
· 7 min read
Javier Montón

Kafka Connect is a tool that allows you to stream data between Apache Kafka and other systems. Sometimes the data needs to be converted from Protobuf to something else; other times, it needs to be converted to Protobuf.

Protobuf Converters

Confluent has a Protobuf Converter that can be used with any Kafka Connect Source or Sink, but it isn't as simple as it seems.

If you enable:

value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081

If you use this in a Sink connector, Kafka Connect will understand how to deserialize the Protobuf message in Kafka, but how will it write it to the sink?

If you enable this in a Source connector, Kafka Connect will be able to serialize the message to Protobuf, but what was it expecting from the Source?

In some connectors, like JDBC, it might be obvious that the database has a proper schema defined, but in others, like S3 or SQS, there is no schema defined. So how will Kafka Connect know how to read and write the data?

How Converters work

Inside Kafka Connect, the data is represented using the classes in org.apache.kafka.connect.data, which can be the Struct class or other basic types. The converters expect to work with these classes and will transform them into the desired format, like Protobuf, or the other way around.

If you define the Schema as a simple String, the Converter will understand that you only have a single String field. If you work with JSON, the data should first be translated into a Struct.
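As an illustration, this is a minimal sketch (in Scala, against the org.apache.kafka.connect.data classes) of what a record looks like inside Kafka Connect; the schema and field names are just examples:

import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}

// Schema describing the record: field names and types are illustrative
val schema: Schema = SchemaBuilder.struct().name("Example")
  .field("id", Schema.STRING_SCHEMA)
  .field("value", Schema.INT32_SCHEMA)
  .build()

// The record itself, which is what converters receive or produce
val record: Struct = new Struct(schema)
  .put("id", "12345")
  .put("value", 10)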

Sink & Protobuf Converter

In a Sink, the ProtobufConverter (the same as the Avro converter or any other) transforms the data read from Kafka into this Struct class, with its fields and schema. From there, the Sink connector decides how to write these objects to the destination. If we use a destination that requires a schema, like an RDS, the connector will likely know (and probably need to know) how to write these structs.

If we use a plugin that writes into a schemaless system, like S3 or SQS, nothing forces the connector to write JSON or any particular format: it is the connector that needs to transform the struct into JSON if that's what we want.

Source & Protobuf Converter

Here is where it gets tricky. If we use a Protobuf converter in a Source connector, we are telling the converter to transform the Struct into Protobuf, but the connector needs to produce this Struct. In a structured source, like an RDS, it is easy to know how to produce the Struct, but in a schemaless source, like S3 or SQS, it is not that easy: the Source connector should be expecting something like JSON, and it will need to parse it properly, generating a schema and a struct; otherwise, the Protobuf converter won't know what to do.

At EF, we have modified an SQS Source connector to allow automatic conversions from JSON to Struct, so the Protobuf or any other converter can work properly.

warning

Note that automatically parsing JSON to be converted into a Struct or Protobuf is not trivial due to JSON's loose typing. For example, a numeric timestamp in JSON is no different from any other numeric value, so you cannot know whether it's a Timestamp, an Integer, or a Long. JSON doesn't have enums, so a String cannot be converted into an Enumeration automatically, and so on.

But that's not all: how will this Protobuf Converter interact with Schema Registry? There are several issues depending on the configuration.

Schema Registry & Protobuf Converter

Confluent's ProtobufConverter is designed to use the following configuration, which is the default one:

value.converter.auto.register.schemas=true
value.converter.use.latest.version=false
value.converter.latest.compatibility.strict=true

In this way, a Source Connector in Kafka Connect will produce a new Protobuf Schema and register it in Schema Registry. But the schema registered will be something similar to this:

syntax = "proto3";
message ConnectDefault1 {
string id = 1;
int32 value = 2;
repeated string tags = 3;
google.protobuf.Timestamp updatedAt = 4;
ConnectDefault2 sub = 5;

message ConnectDefault2 {
string name = 1;
string description = 2;
int32 value = 3;
}
}

As we can see, it decides to create messages named ConnectDefault1 and ConnectDefault2, which is fine if we don't care much about controlling our own schemas.

If a new message has a different schema than the first registered, and it's not compatible (following our compatibility configurations), it will fail.

But what happens if we don't want Kafka Connect to write the schema in Schema Registry? We could disable that option, so this would be our config:

value.converter.auto.register.schemas=false
value.converter.use.latest.version=true
value.converter.latest.compatibility.strict=true

In this situation, we are registering the schema on our side; maybe we are calling our main object MainObject rather than ConnectDefault1. But here is where the issues start to appear: the ProtobufConverter will try to validate the schema against Schema Registry, and it might fail for several reasons. For example, the nested object might be recognized as a different type, because the converter generates a ConnectDefault2 while our registered schema uses a MySubObject.

tip

We can use the SetSchemaMetadata SMT to set the schema name and namespace. But we still have issues with the order of the fields, more details later.
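As a sketch, assuming the schema we registered ourselves uses a message called MainObject in the package com.mycompany (hypothetical names), the SMT can be configured in the connector like this:

transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.mycompany.MainObject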

So, maybe we can set value.converter.latest.compatibility.strict=false if we know the schemas are the same?

We could do it, yes. Then the ProtobufConverter won't compare the schemas; it will serialize the message and send it to Kafka. If the message has exactly the same schema, it will work; if not, it will still send the message to Kafka.

And here is probably the worst scenario: let's imagine that a new event comes with its fields in a different order. We know that the Struct holds information about the field names and types, so it shouldn't cause any issue. We also know that an input message in a format like JSON also specifies the field names, so far so good. But Protobuf cares about the order of the fields, not just about their names, and the ProtobufConverter doesn't know what the expected order is.

Look at the previous Protobuf definition: if a new message comes with id and value in a different order, the ProtobufConverter will produce a schema like this:

syntax = "proto3";
message ConnectDefault1 {
int32 value = 1;
string id = 2;
repeated string tags = 3;
google.protobuf.Timestamp updatedAt = 4;
ConnectDefault2 sub = 5;

message ConnectDefault2 {
string name = 1;
string description = 2;
int32 value = 3;
}
}

id and value are now in a different order. If value.converter.latest.compatibility.strict is set to true, this will cause an error; if it is set to false, the event will be produced to Kafka, but our consumers will read it wrong.

warning

Even if we use value.converter.latest.compatibility.strict=true, if we send a message that matches the schema in the order of the fields and their types, but not in the field names, it will still produce the message. For example, if our message looks like this:

{
  "anotherfield": "12345",
  "value": 10,
  "tags": ["exampleTag1", "exampleTag2"],
  "updatedAt": "2023-10-01T12:34:56Z"
}

The ProtobufConverter will serialize this message and produce it to Kafka. When a consumer reads it, it will use the schema in Schema Registry, which has id rather than anotherfield, and it will show the data under that field. This can become a big issue if our schema has a lot of fields with the same type, like string: if a message has unordered fields, they will end up mixed.

Conclusion

If we are using a Source Connector with ProtobufConverter, we should use the following configuration

value.converter.auto.register.schemas=false
value.converter.use.latest.version=true
value.converter.latest.compatibility.strict=true

and ensure that the source messages always have the same schema, including the order of the fields.

Otherwise, the system becomes too fragile: we could get a lot of failures, or we could produce messages we won't be able to read. Even with this configuration, we should ensure that messages in the source don't change the order of the fields, as this will cause issues with Protobuf.

· 4 min read
Javier Montón

When working with Kafka, increasing or decreasing the number of brokers isn't as trivial as it seems. If you add a new broker, it will just sit there doing nothing: you have to manually reassign partitions of your topics to the new broker. But you don't want to simply move some topics entirely to your new broker; you want to spread your partitions and their replicas evenly across all your brokers. You also want the number of leader partitions balanced across all your brokers.

Reassign partitions

To reassign partitions to different brokers, you can use the Kafka binaries (bin/kafka-reassign-partitions.sh), but it isn't trivial if you have to reassign thousands of topics.

The binary file has three operations:

  • --generate. This will generate a plan to reassign partitions.
  • --execute. This will execute the plan.
  • --verify. This will verify the status of the reassignment.
tip

A throttle can be set to avoid overloading the brokers. The throttle will remain in the cluster after the reassignment until a --verify is run once the reassignment has finished, so it's highly recommended to keep running --verify until you are sure all the partitions have been reassigned.

To create a plan, you have to pass a JSON file with the topics you want to reassign and the brokers you want to reassign them to. e.g.:

{
  "topics": [
    { "topic": "foo1" },
    { "topic": "foo2" }
  ],
  "version": 1
}

And it will generate a plan like this:

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[2,1],"log_dirs":["any"]},
{"topic":"foo1","partition":1,"replicas":[1,3],"log_dirs":["any"]},
{"topic":"foo1","partition":2,"replicas":[3,4],"log_dirs":["any"]},
{"topic":"foo2","partition":0,"replicas":[4,2],"log_dirs":["any"]},
{"topic":"foo2","partition":1,"replicas":[2,1],"log_dirs":["any"]},
{"topic":"foo2","partition":2,"replicas":[1,3],"log_dirs":["any"]}]
}

The --generate command expects the list of target brokers (1,2,3,4,5...) together with the topics JSON, while --execute and --verify take this generated JSON with the whole list of partitions and replicas, as shown below.
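As a sketch, assuming the two JSON files above are saved as topics.json and reassignment.json (hypothetical names) and a local bootstrap server, the three operations look like this:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --topics-to-move-json-file topics.json --broker-list "1,2,3,4" --generate

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassignment.json --execute --throttle 50000000

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassignment.json --verify

The --throttle value is optional and is expressed in bytes per second.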

warning

The first broker in "replicas":[1,3] is the preferred leader of the partition; the rest are the followers. This is very important because you might end up with more leader partitions on one broker than on others, increasing its workload.

Problem with Kafka Reassign tool

When you need to reassign a big cluster, you might find some issues with the --generate command:

  • The plan generated is completely random. Running it twice for a topic will produce different results.
  • The plan generated is not always optimal. It might assign more partitions to one broker than to another.

On a cluster with thousands of partitions this might be OK, as randomizing the partitions will probably be enough to balance them across the brokers, but it might not be completely optimal. Also, if you need to run this for a lot of topics and you want to do it in batches, you don't want to run the --generate command for a topic twice; in that case, you would be reassigning a topic that was already reassigned.

Building a custom reassign plan

To manage partitions more properly, a custom tool can be built, where partitions are defined based on the topic name and the list of brokers. By doing this, reassigning partitions on the same topic twice won't produce any changes. A tool like that can be used to manage the reassignment of partitions in a more controlled way.
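A minimal sketch of that idea in Scala (the function name and the assignment strategy are illustrative, not a production-ready algorithm): the starting broker is derived from the topic name, so running it again for the same topic and broker list always yields the same plan.

// Deterministic reassignment sketch: same topic + same broker list => same plan.
// The first broker of each replica list is the preferred leader.
def plan(topic: String, partitions: Int, replicationFactor: Int, brokers: List[Int]): Map[Int, List[Int]] = {
  val start = ((topic.hashCode % brokers.size) + brokers.size) % brokers.size
  (0 until partitions).map { p =>
    p -> (0 until replicationFactor).map(r => brokers((start + p + r) % brokers.size)).toList
  }.toMap
}

// plan("foo1", partitions = 3, replicationFactor = 2, brokers = List(1, 2, 3, 4))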

Balance Leader partitions

After reassigning a lot of partitions, the leader partitions might not be well-balanced across your brokers. This means that a broker might have more leader partitions than other brokers, which translates into more workload; this is easy to spot in a tool like Kafka-UI.

If you wait, the cluster probably will rebalance the leader partitions on its own (if auto.leader.rebalance.enable=true is set).

In order to force a rebalance, you can use the bin/kafka-leader-election.sh binary.

The difference can also be seen in the CPU usage of the brokers before and after the leader election.

e.g.:

$ bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

--election-type can be preferred or unclean. preferred will try to move leadership back to the preferred replica (the first one in the replica list), while unclean allows electing a replica that is not in sync when no in-sync replica is available.

TL;DR

To reassign partitions to new brokers:

  • Use the bin/kafka-reassign-partitions.sh with a list of topics, brokers and the --generate command.
  • Use the bin/kafka-reassign-partitions.sh with the generated JSON and the --execute command.
  • Use the bin/kafka-reassign-partitions.sh with the generated JSON and the --verify command.
  • Use the bin/kafka-leader-election.sh to balance the leader partitions across your brokers.

· 7 min read
Javier Montón

This post is about Kafka Connect, Mirror Maker 2, how they manage offsets, and how to deal with them.

Kafka Offsets

When a consumer starts consuming messages from Kafka, it will probably use a consumer-group and Kafka will store the offset of the last message consumed by that consumer-group. This offset is stored in a Kafka topic called __consumer_offsets.

By doing this, when the consumer restarts, it can start consuming messages from the last offset it consumed.

There are tools that allow us to manage these offsets, like the command-line tools provided by Kafka, and in any case the consumer can always decide which offsets to consume from. This means that the consumer can set the offset to the beginning, the end, or any other position. Many tools even allow searching for offsets based on timestamps.
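For example, with the tools shipped with Kafka, the offsets of a consumer group can be inspected and reset (a sketch, assuming a local broker and hypothetical group and topic names):

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group \
    --topic my-topic --reset-offsets --to-datetime 2023-10-01T00:00:00.000 --execute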

Monitoring offset lag

The consumer-offset-lag is a metric provided by Kafka, based on the difference between the last offset consumed by the consumer and the last offset produced by the producer. Most monitoring tools, like Datadog, expose this value, and it is very useful to know whether the consumer is lagging (meaning that it is not consuming messages as fast as they are produced) or is down.

But Mirror Maker 2 (MM2) cannot be monitored through this metric; see below.

Kafka Connect Offsets

note

When we talk about Kafka Connect, we are talking about the distributed version of Kafka Connect, not the standalone version.

Kafka Connect manages offsets in its own way. When a consumer is started by Kafka Connect, it will store its offsets in a Kafka topic called connect-offsets by default, although this can be configured through the offset.storage.topic property.

tip

When a connector is created in Kafka Connect, it will track the offsets in its own topic, but it will also use a regular consumer group, so the offsets will also be tracked in __consumer_offsets. This also means that we can monitor Kafka Connect sinks through the consumer-offset-lag metric.

Mirror Maker 2 (MM2) Offsets

Mirror Maker 2 is a tool provided by Kafka to replicate messages from one Kafka cluster to another. It is meant to be used for a complete replication between clusters, but it can also be used to copy only some topics.

note

MM2 can be run in different ways, but here we are talking about the Kafka Connect way, where MM2 is run as a Kafka Connect connector.

If we think about MM2 and the data it needs to consume and produce, we can have doubts about how offsets are managed. To start with, it needs to read from one cluster and write to another, so how does it manage the offsets?

By default, MM2 will create a topic called mm2-offset-syncs.<cluster-alias>.internal and, as far as I know, it cannot be renamed.

tip

While working with MM2, it is recommended to install the connectors in the "target" cluster, so the "source" cluster will be the external one.

By default, MM2 will create the aforementioned topic in the "source" cluster, where it will store the offsets of the last messages consumed and produced. But as we can see, the "source" cluster is "external" to where Kafka Connect is installed, and that might cause some issues in cases where the "source" cluster is not managed by us. For example, we might not have write or create access, so we cannot create the topic.

The destination of mm2-offset-syncs.<cluster-alias>.internal can be defined by the offset-syncs.topic.location property which accepts source (default) and target.
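As a sketch, in a MirrorSourceConnector configuration this would look something like the following (the aliases and bootstrap servers are placeholders):

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source
target.cluster.alias=target
source.cluster.bootstrap.servers={source-brokers}
target.cluster.bootstrap.servers={target-brokers}
topics=my-topic.*
offset-syncs.topic.location=target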

note

When a consumer is created by MM2, which is a Kafka Connect connector, it will store the offsets both in mm2-offset-syncs.<cluster-alias>.internal and in connect-offsets. This is very important if we want to manipulate offsets.

warning

MM2 consumers do not use a group.id; they do not use any Kafka consumer group, and their consumed offsets won't be stored in __consumer_offsets. This also means that we cannot monitor MM2 through the consumer-offset-lag metric.

Mixing Kafka Connect and MM2

If we look at the offsets stored both by Kafka Connect and MM2 in their topics, we can see the following:

Kafka Connect topic

If we look at the connect-offsets topic, we can see that the offsets are stored in JSON format, with the following structure:

  • key is a structure that contains the connector name, the partition, the topic, and the cluster.
[
  "my-connector-name",
  {
    "cluster": "my-source-cluster-alias",
    "partition": 3,
    "topic": "my-example-topic"
  }
]
  • And the value is a JSON with the offset:
{
  "offset": 1234
}
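To inspect these keys and values, something like the console consumer can be used (a sketch, assuming a local broker and the default topic name):

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-offsets \
    --from-beginning --property print.key=true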
note

No matter where we store the offsets (source or target), Kafka Connect will show the "source cluster alias" as this is where the Kafka consumer is created.

MM2 topic

If we look at the mm2-offset-syncs.<cluster-alias>.internal topic, we can see that MM2 uses its own format to store the offsets:

  • key is the connector name, but it has a few extra bytes, which represent some structure defined inside the code
  • value is just an Int64, which represents the offset

Manually managing these offsets is not really recommended, as we could mess up the connectors, but it is possible.

How to reset offsets in Mirror Maker 2

If we need to reset the offsets in MM2, we might think that deleting the topic mm2-offset-syncs.<cluster-alias>.internal will do the trick, but it won't, as offsets are also stored in Kafka Connect's topic. So, we need to reset the offsets in both topics.

There is a lot of misinformation about how to reset offsets in Kafka Connect; the docs are not very clear about it, and Kafka Connect has been lacking tools to work with offsets. Typically, removing the connector and creating it again with a different name will do the trick, but we might want to keep the same name.

Manual edit of offsets

We can manually produce a message in the connect-offsets topic to reset offsets, and the right way of doing it is to send a tombstone. We can check the messages we have right now, identify the connector we want, and send the same key with a null value.

note

To reset offsets completely, we do not send offset: 0; we send a null value.

REST API to reset offsets

Starting from Kafka 3.6.0, the Kafka Connect REST API can also manage connector offsets. This is described in KIP-875, but it is still not fully covered in the official docs. If you are using Confluent Platform, Kafka 3.6.0 is included starting from version 7.6.0.

If we use this version, we can reset offsets with a couple of curl calls: first we need to stop the connector, and then reset the offsets.

curl -X PUT http://localhost:8083/connectors/my-connector-name/stop
curl -X DELETE http://localhost:8083/connectors/my-connector-name/offsets

We can also check the current offsets:

curl -X GET http://localhost:8083/connectors/my-connector-name/offsets | jq

TL;DR

To reset offsets in MM2, you need to:

  • Stop, pause or remove the connectors
  • Delete or truncate the topic mm2-offset-syncs.<cluster-alias>.internal
  • Reset the offsets in the connect-offsets topic, either manually or through the REST API for the desired connector
  • Start the connectors again
warning

Deleting the topic mm2-offset-syncs.<cluster-alias>.internal will not reset the offsets for other connectors you have configured in MM2, as they fall back to the connect-offsets topic. Still, be careful and do this at your own risk: things might change in the future and this could stop being true.

· 20 min read
Javier Montón

A guide to move data from Kafka to an AWS RDS using Kafka Connect and the JDBC Sink Connector with IAM Auth.

Kafka Connect

For these examples, we are using Confluent's Kafka Connect in its Docker version, as we are going to deploy it in a Kubernetes cluster.

Single and distributed modes

Kafka Connect comes with two modes of execution: single (standalone) and distributed. The main difference between them is that the single mode runs a single worker in one JVM, while the distributed mode runs several workers, typically on different machines or pods, that share the connectors and their tasks. The distributed mode is the recommended one for production environments, as it provides better scalability and fault tolerance. In the case of K8s, it means we will be using more than one pod to run Kafka Connect.

warning

Be aware that these two modes use different classpaths, so if you are making changes inside the Docker image and you are running the single mode locally but distributed in production, you might get different results. I strongly recommend checking manually what the classpath is in each case, using something like:

ps aux | grep java

And you will get something like this:

java -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/log/kafka -Dlog4j.configuration=file:/etc/kafka/connect-log4j.properties -cp /etc/kafka-connect/jars/*:/usr/share/java/kafka/*:/usr/share/java/confluent-common/*:/usr/share/java/kafka-serde-tools/*:/usr/share/java/monitoring-interceptors/*:/usr/bin/../share/java/kafka/*:/usr/bin/../share/java/confluent-telemetry/* org.apache.kafka.connect.cli.ConnectDistributed /etc/kafka-connect/kafka-connect.properties

And you'll find all the directories (after -cp) included in the running Kafka Connect.

  • Note that a folder called cp-base-new is widely used in the Single mode, but is not very well documented.
  • Setting your deployment to 1 replica will run Kafka Connect in Single mode, while setting it to 2 or more will run it in Distributed mode.

Deploying in K8s

This should be fairly straightforward, as we are using Confluent's Kafka Connect Docker image, which is already prepared to be deployed in K8s. Confluent provides a Helm chart as an example, so it should be easy. You can also create your own.

Using MSK (Kafka)

If you are using AWS's managed Kafka, MSK, and you are authenticating using IAM, you will need to do a few things:

  • Configure some environment variables in Kafka Connect
  • Add the required AWS libraries to the classpath

Environment variables

CONNECT_BOOTSTRAP_SERVERS will have the brokers, as usual, but using port 9098.

You need to specify the IAM callback handler as well as SASL:

CONNECT_SASL_CLIENT_CALLBACK_HANDLER_CLASS = software.amazon.msk.auth.iam.IAMClientCallbackHandler
CONNECT_SASL_MECHANISM = AWS_MSK_IAM
CONNECT_SECURITY_PROTOCOL = SASL_SSL

Also, you have to provide a JAAS configuration with the credentials. You can find more info about this in AWS's documentation. For IAM, something like this should work:

CONNECT_SASL_JAAS_CONFIG = 
software.amazon.msk.auth.iam.IAMLoginModule required
awsRoleArn="arn:aws:iam::{account}:role/{role}"
awsStsRegion="{region}";

If you do this in yaml for Helm, it will look like this:

  - name: CONNECT_SASL_JAAS_CONFIG
    value: >-
      software.amazon.msk.auth.iam.IAMLoginModule required
      awsRoleArn="arn:aws:iam::{account}:role/{role}"
      awsStsRegion="{region}";

When Kafka Connect creates a new connector, it will use its own credentials configuration, so if you want to have the same IAM auth, you will need to add the same values to these environment variables:

  • CONNECT_CONSUMER_SASL_CLIENT_CALLBACK_HANDLER_CLASS
  • CONNECT_CONSUMER_SASL_MECHANISM
  • CONNECT_CONSUMER_SECURITY_PROTOCOL
  • CONNECT_CONSUMER_SASL_JAAS_CONFIG
tip

If you are using your own Helm template, you could create some variables for these values, so you can reuse them in the different environment variables, to avoid writing them twice.

Formatting logs as JSON

Logs are very important, and having a good format is key to being able to read and process them easily. Usually, in production, we want to have them as JSON, and Kafka Connect does not make it as easy as we might expect.

If you only want to change the log level or format your logs a bit, you can use the environment variables available for that, described in their docs, but if you want to properly format all logs as JSON, you will need to do a few more things.

Using JSONEventLayoutV1

Kafka Connect uses Log4j 1 (not Log4j 2), so we will need to use a log4j.properties file to configure it. Confluent uses a patched version of the original Log4j 1 that is supposed to fix some vulnerabilities.

We can use a dependency that automatically converts all of our logs into JSON, like log4j-jsonevent-layout. See Adding libraries for more info about how to add new libraries. If we have this library in the classpath, we can now use the JSONEventLayoutV1 in our log4j.properties file. Like:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=net.logstash.log4j.JSONEventLayoutV1

Properties files

Confluent will tell you that you can modify the template for logs in /etc/confluent/docker/log4j.properties.template, but you might need some extra steps if you want all logs as JSON.

  • Template for most of the logs, as described, in /etc/confluent/docker/log4j.properties.template
  • Logs from the "Admin Client" in /etc/kafka/log4j.properties
  • Some tool logs in /etc/confluent/docker/tools-log4j.properties.template
  • Some startup logs in /etc/kafka-connect/log4j.properties
  • There are also some random logs not using Log4j, they are defined in /usr/lib/jvm/jre/conf/logging.properties

If you want to format everything to JSON, I would recommend entering inside the docker image, looking for those files, and changing them as desired. Your Dockerfile could then replace them while creating the image.

warning

There are still some logs during the start-up not formatted as JSON. Confluent's Kafka Connect is using a Python script to start up the service, and that service is using some prints that do not belong to any Log4j, so they are not formatted in any way.

If you want to format those prints too, you will need to do something else as they don't have any configuration files. You could use a sed command to replace them, or you could modify the cub.py file in your image with the desired format.

Adding plugins

Adding plugins should be straightforward; the documentation explains pretty well how to do it. Note that you can add plugins inside the plugins folder, or you can change the plugins folder location with:

plugin.path=/usr/local/share/kafka/plugins

In any case, copying and pasting files into a Docker image can limit the flexibility of the solution a bit, so I would recommend building a project where you can add all the dependencies that you need, meaning that libraries and plugins are built and copied inside the Docker image during CI, as sketched below. By doing this, you will be able to use Gradle, Maven, SBT, or any other build tool to manage your dependencies, upgrade versions, and build plugins.
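As a minimal sketch, assuming the build tool leaves the fat jar with the libraries in build/libs and the plugins in build/plugins (hypothetical paths, and the image tag is just an example), the Dockerfile could look like this:

FROM confluentinc/cp-kafka-connect:7.6.0
# Libraries go to a folder that is already part of the classpath
COPY build/libs/*.jar /etc/kafka-connect/jars/
# Plugins go to the default plugins folder
COPY build/plugins/ /usr/share/confluent-hub-components/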

tip

Note that Plugins and libraries are not included in the same path, so I would recommend building a different project for each. For example, we could build a main project that can build the Kafka Connect image with their libraries and a subproject that can build plugins in a different folder. Then, the Dockerfile could easily copy both folders into the image in the right paths.

If you build a project like that, to add the JDBC plugin, for example, in Gradle you only need to add this:

dependencies {
implementation("io.confluent:kafka-connect-jdbc:10.7.4")
}

Adding libraries

As mentioned earlier, libraries must go in the classpath, not in the plugins folder. If you are using a project to build your libraries and plugins, you can use one of the many build plugins that pack all the dependencies into a single .jar that can be copied into the Docker image.

For example, with Gradle, we could include the AWS library needed for IAM authentication, and the Log4j JSON formatter, like this:

dependencies {
implementation("software.amazon.msk:aws-msk-iam-auth:1.1.7")
implementation("net.logstash.log4j:jsonevent-layout:1.7")
}

Using a plugin to build a fat jar, everything should be included in one .jar file that we can copy into the Docker image.

tip

For the JDBC Sink, we will also need to include a driver, and more libraries if we want to use IAM Auth with RDS; we will see that later.

Kafka Connect REST API

By default, Kafka Connect exposes its REST API in the port 8083. You can find more info about the API in the official documentation.

If you want to control who can access the API or change its port, you can use the CONNECT_LISTENERS and/or CONNECT_REST_ADVERTISED_PORT environment variables. For example, if you want to change the port to 8084, you could do this:

  - name: CONNECT_REST_ADVERTISED_PORT
    value: "8084"

Also, you can even expose the API on multiple ports, by doing this:

  - name: CONNECT_LISTENERS
    value: "http://0.0.0.0:8084,http://0.0.0.0:8085"
  - name: CONNECT_REST_ADVERTISED_PORT
    value: "8084"

Securing the API

Kafka Connect's REST API lacks security options; it only allows you to use basic authentication, which might not be what you are looking for. Also, the code seems to have several places with an if-else checking whether basic auth is enabled or not.

But there is another way to build our own security layer.

JAX-RS Security Extensions

Without going into too much detail: Kafka Connect, as well as Schema Registry, uses JAX-RS to build its REST API, and JAX-RS allows us to add our own security extensions. Following this pattern, we can add a simple filter that checks whether the user is authenticated and, if not, returns a 401 error.

As for how to authenticate a user, we can use different methods, depending on our setup. For example, we could call the AWS IAM API to check whether a user has permission, or, as we are deploying this in Kubernetes, we could rely on Kubernetes identities, which allow us to authenticate pods using a JWT token.

To do this, you have to create a JAX-RS plugin and then register it in Kafka Connect. Once your plugin is ready, you can register it by implementing ConnectRestExtension:

class MySecurityExtension extends ConnectRestExtension {}

This class will need to be packed with your libraries and included in the classpath, as we did with other libraries.
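A minimal sketch in Scala of what such an extension could look like (MyAuthFilter and its header check are hypothetical; the real authentication logic would go there):

import java.util.{Map => JMap}
import javax.ws.rs.container.{ContainerRequestContext, ContainerRequestFilter}
import javax.ws.rs.core.Response
import org.apache.kafka.connect.rest.{ConnectRestExtension, ConnectRestExtensionContext}

// Hypothetical JAX-RS filter: rejects any request without an Authorization header
class MyAuthFilter extends ContainerRequestFilter {
  override def filter(ctx: ContainerRequestContext): Unit =
    if (ctx.getHeaderString("Authorization") == null)
      ctx.abortWith(Response.status(Response.Status.UNAUTHORIZED).build())
}

class MySecurityExtension extends ConnectRestExtension {
  // Hook the filter into Kafka Connect's JAX-RS resources
  override def register(ctx: ConnectRestExtensionContext): Unit = ctx.configurable().register(new MyAuthFilter)
  override def configure(configs: JMap[String, _]): Unit = ()
  override def version(): String = "1.0"
  override def close(): Unit = ()
}

The worker also needs to load the extension; this is usually done through the rest.extension.classes worker property (CONNECT_REST_EXTENSION_CLASSES in the Docker image), and, depending on the Kafka version, a ServiceLoader manifest (META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension) pointing to the class may also be needed.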

JDBC Sink Connector

We need to download the plugin and add it to the plugins' folder. By default, it's /usr/share/confluent-hub-components/.

You can get the .jar with wget and copy it inside the Docker image, in the aforementioned folder. Or, as suggested earlier, if you are building a project with a build tool like Gradle, you can let it download all the plugins you might need from Maven repositories. We only need to add the dependency:

dependencies {
implementation("io.confluent:kafka-connect-jdbc:10.7.4")
}

And build the .jar. Then, we can copy it inside the Docker image.

Drivers

The plugin alone is not enough to connect to a database; we also need the driver. In our case, we are using PostgreSQL on RDS, so we will need the Postgres driver.

info

Several drivers are already included in the Kafka Connect image, but they are not inside the default classpath, so if we try to run the connector without adding the driver properly, we will get an error like No suitable driver found. They are placed in /usr/share/confluent-hub-components/, but as we can see using something like ps aux | grep java, they are not included in the classpath. So, we have three options:

  • Move the driver to the classpath
  • Add the drivers' folder to the classpath
  • Find our own driver and copy it inside the Docker image, in the classpath

I would go for the third option, which gives us more flexibility about which version of the driver we want to use.

So, we can download the driver and pack it with our libraries, and then copy it inside the Docker image:

implementation("org.postgresql:postgresql:42.7.1")
tip

Note that the JDBC Sink plugin has to be placed in the plugins folder, while the driver has to be placed in the library classpath.

IAM Auth

If you are using IAM Auth with RDS, you will need to add some extra libraries to the classpath. You can find more info about this in the AWS's documentation.

The bad news is that a plain driver cannot use IAM Auth; if you try to connect to the database using IAM Auth, you will get an error like The server requested password-based authentication, but no password was provided. You would need to generate a token manually and pass it through the connection.

The good news is that there is a library created by AWS that acts as a wrapper for your JDBC Drivers, adding extra features, including IAM Auth.

To use IAM Auth, we only need to add this driver to the classpath and change our JDBC URL a bit.

Add the dependency (plus the required AWS RDS libraries):

implementation("software.amazon.jdbc:aws-advanced-jdbc-wrapper:2.3.2")
implementation("software.amazon.awssdk:rds:2.20.145")

You can follow their docs, but in our case, we will need to change the JDBC URL to add a couple of things:

  • The jdbc:aws-wrapper: prefix of the new driver
  • The IAM Auth plugin flag (wrapperPlugins=iam)

jdbc:aws-wrapper:postgresql://{host}:{port}/postgres?wrapperPlugins=iam
tip

Some tips:

  • This JDBC URL goes inside the connector's configuration
  • A username is still required, and it should be the same as the role used to connect to the database
  • You can also help the wrapper to find the dialect used, by adding &wrapperDialect=rds-pg
  • You can also help Kafka Connect to find the dialect used, by adding another property in your connector's configuration: dialect.name: "PostgreSqlDatabaseDialect"

META-INF/services and multiple drivers

At this point, we are including the PostgreSQL JDBC driver and the wrapper in the classpath, and both are JDBC drivers. If we include them as separate .jar files, everything should be fine, but if we are building a fat jar, we might have some issues. Each of these drivers ships a file called META-INF/services/java.sql.Driver that contains the name of its driver class. If our fat jar does not merge them to include both classes, we will get an error like No suitable driver found.

Depending on the build tool and the plugin used, we might need to add some extra configuration to merge these files. For example, in Gradle (with the Shadow plugin) we might need to add this to the shadowJar task:

mergeServiceFiles()

Or in the SBT Assembly plugin, we could need to add something like:

assembly / assemblyMergeStrategy := MergeStrategy.concat

Topic & Destination table

The JDBC Sink Connector allows us to decide which topics and tables we want to use, and we have two ways of doing it:

  • One topic / table per connector. In this case, we can directly write the topic and table names in the connector's configuration.
  • Multiple topics / tables per connector. In this case, we will need to use a pattern for topics and another for tables.

One topic / table per connector

This is the easiest way, we only need to add the topic and table names in the connector's configuration, like this:

topics: "my.first.topic"
table.name.format: "first_table"

Using patterns for topics and table names

The JDBC Sink does some magic to map topics to tables, but it's not always what we want. For example, if we have a topic called my.topic, it will take my as the schema name and topic as the table name. More details about table-name parsing can be found in their docs.

But we likely use a pattern for our topics, especially if we are building a data lake, so we might want to create tables based on a different pattern. For example, we could have a topic called my.first.topic and we want to write to a table called first_table in our database. This can still be achieved using a router SMT together with the table.name.format property, as sketched below.
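As a sketch, the RegexRouter SMT can rename the topic before the sink maps it to a table; the regex and replacement below are illustrative, turning my.first.topic into first_table:

transforms: "route"
transforms.route.type: "org.apache.kafka.connect.transforms.RegexRouter"
transforms.route.regex: "my\\.(.*)\\.topic"
transforms.route.replacement: "$1_table"
table.name.format: "${topic}"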

tip

Be aware that not all types accepted in your Kafka topics are accepted by your database, the JDBC driver, and/or the JDBC Sink. For example, the list of valid types from the perspective of the JDBC Sink is here.

Case insensitive

By default, the JDBC Sink will quote all table and column names, which is usually fine. However, PostgreSQL is only case insensitive when quotes are not used, so if your data from Kafka comes with uppercase letters (for example, if you are using camelCase) and you are not using quotes in your database, or you don't want to use them while querying, you should disable quoting in Kafka Connect with:

quote.sql.identifiers=never

Deploy new connectors in K8s

Deploying new connectors can be tricky, especially if you are using Kubernetes. Kafka Connect exposes an API that we can use to create new connectors, but we have to "manually" make some calls to create, update, or delete connectors. This is not the best way of integrating something into our CI/CD, especially if our CI runs outside our K8s cluster.

Ideally, we would want to have a configuration file in our repository, that can be updated and automatically deployed during our CI.

connect-operator

There is a solution for this: Confluent's connect-operator. Although the solution is not very robust, it does the job.

This is based on the shell-operator, a Kubernetes operator that can be deployed in our cluster and configured to "listen" for specific events, like new deployments, changes in config maps, or whatever we want. Specifically, this connect-operator is designed to listen for changes in a config map, and it will then create, update, or delete connectors based on the content of that config map.

In other words, we can put our Connector's configuration in a config map, and then the connect-operator will create the connector for us.

tip

The connect-operator does not need to be deployed together with Kafka Connect; it is an independent pod running in our cluster, in the same namespace or not. It can also listen for config maps attached to our Kafka Connect deployment or to any other deployment, all depending on our configuration and K8s permissions.

warning

The connect-operator is a nice tool that does the job, but it isn't very robust. For example, it does not check whether a connector creation has failed: it only checks if the connector exists, sends a curl to the REST API, and then assumes that everything is fine. In any case, it is just a bash script using jq for configuration, so it can easily be modified to fit our needs.

Configuring the connect-operator

As the connect-operator is based on the shell-operator, it expects a configuration file in YAML format, where we can define the events we want to listen to.

By default, the operator is called in two ways:

  • At startup, it will be called with a flag --config and it has to return the configuration file in YAML format that specifies the events we want to listen to.
  • When an event is triggered, our script will be triggered with the event as a parameter.

Listening to config maps

The config that we have to return in order to listen for config map changes should look something like this:

configVersion: v1
kubernetes:
  - name: ConnectConfigMapMonitor
    apiVersion: v1
    kind: ConfigMap
    executeHookOnEvent: ["Added", "Deleted", "Modified"]
    jqFilter: ".data"
    labelSelector:
      matchLabels:
        destination: $YOUR_DESTINATION
    namespace:
      nameSelector:
        matchNames: ["$YOUR_NAMESPACE"]

YOUR_DESTINATION must be the same as the label used in the config map, and YOUR_NAMESPACE must be the same as the namespace where the config map is deployed.

info

The default connect-operator has a config to enable or disable the connector, but it is done in a way that will enable or disable all your connectors at once, so I prefer to skip that part as I want to have multiple connectors in my config maps.

note

The configuration looks different from a standard K8s configuration, but the shell-operator can handle it and there is no need to declare a new CRD with that structure.

Config Map

The config map must have the connectors' configuration in JSON, in the same way you would use it in the REST API. I would suggest building a Helm template for config maps, so you can write your connector configuration in YAML and then convert it to JSON using Helm.

Something like this should work in Helm:

{{- if .Values.configMap }}
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    destination: {{ .Values.your-deployment-name }} # this has to match with the label in the connect-operator config
data:
  {{- range $key, $value := .Values.configMap.data }}
  {{ $key }}: {{ $value | toJson | quote | indent 6 | trim }}
  {{- end }}
{{- end }}

After having this Helm template, we can write our Connector's config like this:

configMap:
  data:
    my-connector-name:
      name: "my-connector-name"
      config:
        # JDBC Config
        name: "my-connector-name"
        connector.class: "io.confluent.connect.jdbc.JdbcSinkConnector"
        # using IAM Auth
        connection.url: "jdbc:aws-wrapper:postgresql://{host}:{port}/postgres?wrapperPlugins=iam&wrapperDialect=rds-pg"
        connection.user: env.USERNAME
        dialect.name: "PostgreSqlDatabaseDialect"
        topics: "my-topic"
        tasks.max: "4"
        # ...
tip

The config-map can be attached to your Kafka Connect deployment, or to any other deployment; what matters is that the connect-operator can find it.

Once it is deployed as a config-map, the connect-operator will create the connector for us.

Replacing variables

The connect-operator uses jq to replace variables; they can be stored in config files inside the Docker image, passed as arguments, or set as environment variables. Having them in config files inside the Docker image looks weird to me: why should our operator know about the context of our connectors?

These are some examples of replacing variables, but you can find more details in the jq documentation:

Variables:

connection.user: $username

Environment variables:

connection.user: env.USERNAME

Note that they are not between quotes.

Variables inside strings:

connection.user: "prefix_\(env.USERNAME)_suffix"
tip

If you are parsing this with Helm, you might need to put the string between single quotes; otherwise, Helm will fail on \(

RBAC permissions to read config maps

If your connect-operator lives in a different deployment than the config map, you will need to give it permission to read the config map. This can be achieved with a Role and a RoleBinding in Helm.

Something like this needs to be created:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ .Values.your-app }}-configmap-read-role
  namespace: {{ .Release.Namespace }}
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["list", "watch"] # List and Watch all configmaps, get only the ones specified in resourceNames
  - apiGroups: [""]
    resources: ["configmaps"]
    resourceNames: ["your-destination"] # this is the deployment having the config-map
    verbs: ["get", "watch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {{ .Values.your-app }}-read-configmaps
  namespace: {{ .Release.Namespace }}
subjects:
  - kind: ServiceAccount
    name: {{ .Values.your-app }}
roleRef:
  kind: Role
  name: {{ .Values.your-app }}-configmap-read-role
  apiGroup: rbac.authorization.k8s.io

Custom Kafka groups for your connectors

By default, Kafka Connect will create a consumer group for each sink connector, using the connector's name as the group name with a connect- prefix. This is not very flexible, as we might want to have our own group names. For example, if we are sharing Kafka clusters with other teams, we might want our own group names to avoid conflicts, or we might have our own naming convention enforced with ACLs in Kafka.

To decide a group name, we have to change two configurations:

First, we have to set an environment variable that allows connectors to override some configs, including the group name:

CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All
tip

This can be on your deployment file or on your Dockerfile.

Then, we can add the group name to our connector's configuration:

consumer.override.group.id: "my-custom-group-name"

· 9 min read
Javier Montón

Big Data Types is a library that can safely convert types between different Big Data systems.

The power of the library

The library implements a few abstract types that can hold any kind of structure, and, using type-class derivation, it can convert between multiple types without any code relating them directly. In other words, there is no need to implement a transformation from type A to type B; the library will do it for you.

As an example, let's say we have a generic type called Generic. Now we want to convert from type A to type B. If we implement the conversion from A to Generic and the conversion from Generic to B, automatically we can convert from A to B although there is no single line of code mixing A and B.

We can also do the opposite, we can convert from B to A by implementing the conversion from B to Generic and the conversion from Generic to A. Now we can convert between A and B as we wish.

Now comes the good part. If we introduce a new type C and we want to have conversions, we would need to convert from A to C and from B to C, and the opposite (4 new implementations). If we then introduce D, we would need to implement the conversions from A to D, from B to D, and from C to D, and the opposite (6 new implementations). This is not scalable, and it is not maintainable.

Having this Generic type means that when we introduce C, we only need to implement the conversion from C to Generic and from Generic to C, without worrying at all about other implementations or types. Moreover, it is likely that the conversion will be very similar to the existing ones, so we can reuse some of the code.

tip

It is important to know that one of these types is the Scala types themselves. So if we want to convert from Scala types (like case classes) to another type, we only need to implement Generic -> newType

How the library works

Modules

As mentioned, the library has multiple modules, each one of them representing a different system with its own types. Each module implements the conversion from and to Generic.

For now, the modules are core (for Scala types and common code), BigQuery, Cassandra, Circe, and Spark.

To use the library, only the modules that are needed should be imported. For example, if we want to convert from Scala types to BigQuery types, we only need the BigQuery module (the Core module is always included as a dependency). If we want to convert from Spark to BigQuery, we need to import both the Spark and BigQuery modules.

Generic type

The Generic type is called SqlType and it's implemented as a sealed trait that can hold any kind of structure. In Scala 3, this type is implemented as an enum, but both represent the same thing.

Repeated values

Usually, there are two ways of implementing a repeated value like an Array. Some systems use a type like Array or List, and others flag a basic type as repeated. The implementation of this SqlType uses the latter, so any basic type has a mode that can be Required, Nullable, or Repeated. This is closer to the BigQuery implementation.

note

This implementation does not allow for Nullable and Repeated at the same time, but a Repeated type can have 0 elements.

Nested values

The SqlStruct can hold a list of records, including other SqlStructs, meaning that we can have nested structures, as in the sketch below.
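As an illustration only (a simplified sketch, not the library's exact definitions), such a generic representation could look like this:

sealed trait SqlTypeMode
case object Required extends SqlTypeMode
case object Nullable extends SqlTypeMode
case object Repeated extends SqlTypeMode

sealed trait SqlType
case class SqlInt(mode: SqlTypeMode = Required) extends SqlType
case class SqlString(mode: SqlTypeMode = Required) extends SqlType
case class SqlStruct(records: List[(String, SqlType)], mode: SqlTypeMode = Required) extends SqlType

// A nested structure: a person with a repeated list of addresses
val person = SqlStruct(List(
  "name"      -> SqlString(),
  "addresses" -> SqlStruct(List("street" -> SqlString(), "number" -> SqlInt()), Repeated)
))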

Type-class derivation

Type-classes are a way of implementing "ad-hoc polymorphism". This means that we can implement behaviour for a type without having to modify the type itself. In Scala, we achieve this through implicits.

The interesting part of type-classes for this library is that we can derive a type-class for a type without having to implement it.

For example, we can create a simple type-class:

trait MyTypeClass[A] {
  def doSomething(a: A): String
}
tip

A type-class is always a trait with a generic type.

Then, we can implement our type-class for an Int type:

implicit val myTypeClassForInt: MyTypeClass[Int] = new MyTypeClass[Int] {
  override def doSomething(a: Int): String = "This is my int" + a.toString
}
tip

Scala 2.13 has a simplified syntax for this when there is only one method in the trait:

implicit val myTypeClassForInt: MyTypeClass[Int] = (a: Int) => "This is my int" + a.toString

We can do similar for other types:

implicit val myTypeClassForString: MyTypeClass[String] = new MyTypeClass[String] {
  override def doSomething(a: String): String = "This is my String" + a
}

Now, if we want to have a List[Int] or a List[String] and use our type-class, we would need to implement it for both List[Int] and List[String]. But if we implement the type-class for List[A], where A is any type, the compiler can derive the implementation for List[Int] and List[String] automatically, and for any other type already implemented.

implicit def myTypeClassForList[A](implicit myTypeClassForA: MyTypeClass[A]): MyTypeClass[List[A]] = new MyTypeClass[List[A]] {
  override def doSomething(a: List[A]): String = a.map(myTypeClassForA.doSomething).mkString(",")
}

Similarly, if we want to have a case class like:

case class MyClass(a: Int, b: String)

We would need to implement the type-class for MyClass. But if we implement the type-class for a generic Product type, the compiler can derive the implementation for MyClass automatically, and for any other case class whose fields have types already implemented.

note

Implementing the conversion for a Product type is more complex than implementing it for a List type, and usually Shapeless is the library we use to do this in Scala 2.

In Scala 3, the language already allows us to derive the type-class for a Product type, so we don't need to use Shapeless.

In big-data-types we have the implementation for all basic types, including iterables and Product types here for Scala 2 and here for Scala 3.

Implementing a new type

To implement a new type, we need to implement the conversion from and to the Generic type. There is a complete guide, step by step, with examples, in the official documentation.

A quick example, let's say we want to implement a new type called MyType. We need to implement the conversion MyType -> Generic and Generic -> MyType.

tip

Both conversions are not strictly needed: if we only need Scala -> MyType, we only need to implement Generic -> MyType, because the library already has the conversion Scala -> Generic. The same happens with other types; for example, BigQuery -> MyType will also be ready automatically.

To do that, we need a type-class that works with our type. This will be different depending on the type we want to implement. For example:

trait GenericToMyType[A] {
  def getType: MyTypeObject
}

Maybe our type works with a List at the top level, as Spark does, so instead, we will do:

trait GenericToMyType[A] {
  def getType: List[MyTypeObject]
}
tip

getType can be renamed to anything meaningful, like toMyType or myTypeSchema

And we need to implement this type-class for all the (Generic) SqlType types:

implicit val genericToMyTypeForInt: GenericToMyType[SqlInt] = new GenericToMyType[SqlInt] {
  override def getType: MyTypeObject = MyIntType
}

Using conversions

The defined type-classes allow you to convert MyType -> Generic by doing this:

val int: SqlInt = SqlTypeConversion[MyIntType].getType

And Generic -> MyType by doing this:

val int: MyTypeObject = GenericToMyType[SqlInt].getType

This works well when we deal with type definitions, like case classes, and we don't have an instance of them. For example, a case class definition can be converted into a BigQuery schema, ready to be used for table creation.

But, sometimes, our types work with instances rather than definitions, and we need to use them to convert to other types.

There is another type-class, implemented for all the available types, that allows working with instances. In general, this type-class can be implemented reusing code from the previous one, but it expects an argument: an instance of the type we want to convert from.

trait SqlInstanceToMyType[A] {
  def myTypeSchema(value: A): MyTypeObject
}

Implementing this type-class allows us to use the conversion like this:

val mySchema: MyTypeObject = SqlInstanceToMyType.myTypeSchema(theOtherType)

But this syntax is not very friendly, and we can use extension methods to make it more readable.

Extension methods

Extension methods in Scala 2 are implemented through implicit classes and allow us to add new methods to existing types.

In the library, we implement extension methods for Generic -> SpecificType, and the interesting part, again, is that we don't need to implement A -> B directly; the compiler can derive it for us.

implicit class InstanceSyntax[A: SqlInstanceToMyType](value: A) {
  def asMyType: MyTypeObject = SqlInstanceToMyType[A].myTypeSchema(value)
}

and suddenly, we can use the conversion like this:

val mySchema: MyTypeObject = theOtherType.asMyType

And this is a syntax that is easier to use. For example, if we work with Spark and BigQuery, we can do the following:

val sparkDf: DataFrame = ???
val bigQuerySchema = sparkDf.schema.asBigQuery

More types to come

The library has only a few types implemented (BigQuery, Spark, Cassandra, and Circe), but implementing a new type is fairly easy, and the new type automatically gets methods to convert it into any other type already implemented. As the library grows, the number of available conversions grows rapidly, and the library becomes more powerful.

Some types that could be potentially implemented:

  • Avro
  • Parquet
  • Athena (AWS)
  • Redshift (AWS)
  • Snowflake
  • RDS (relational databases)
  • Protobuf
  • ElasticSearch templates
  • ...

Some types could have some restrictions, but they could be implemented differently. For example, a type could be implemented as a conversion to String, where the String is a "CREATE TABLE" statement for a specific database; then any other type already in the library could automatically be printed as a "CREATE TABLE" statement.