
3 posts tagged with "MSK"


· 4 min read
Javier Montón

When working with Kafka, increasing or decreasing the number of brokers isn't as trivial as it seems. If you add a new broker, it will just sit there doing nothing: you have to manually reassign partitions of your topics to it. But you don't want to simply move some topics wholesale to the new broker; you want to spread partitions and their replicas evenly across all your brokers. You also want the number of leader partitions balanced across all your brokers.

Reassign partitions

To reassign partitions to different brokers, you can use the Kafka binaries (bin/kafka-reassign-partitions.sh), but it isn't trivial if you have to reassign thousands of topics.

The binary file has three operations:

  • --generate. This will generate a plan to reassign partitions.
  • --execute. This will execute the plan.
  • --verify. This will verify the status of the reassignment.
tip

A throttle can be set to avoid overloading the brokers. The throttle remains in place after the reassignment until a --verify is run once the reassignment has finished, so it's highly recommended to keep running --verify until you are sure all the partitions have been reassigned and the throttle has been removed.

To create a plan, you have to pass a JSON file with the topics you want to reassign and the brokers you want to reassign them to. e.g.:

{
  "topics": [
    { "topic": "foo1" },
    { "topic": "foo2" }
  ],
  "version": 1
}

And it will generate a plan like this:

{
  "version": 1,
  "partitions": [
    { "topic": "foo1", "partition": 0, "replicas": [2, 1], "log_dirs": ["any"] },
    { "topic": "foo1", "partition": 1, "replicas": [1, 3], "log_dirs": ["any"] },
    { "topic": "foo1", "partition": 2, "replicas": [3, 4], "log_dirs": ["any"] },
    { "topic": "foo2", "partition": 0, "replicas": [4, 2], "log_dirs": ["any"] },
    { "topic": "foo2", "partition": 1, "replicas": [2, 1], "log_dirs": ["any"] },
    { "topic": "foo2", "partition": 2, "replicas": [1, 3], "log_dirs": ["any"] }
  ]
}

The tool expects the list of brokers (1,2,3,4,5...) as input, and produces this JSON with the whole list of partitions and replicas.
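To make the flow concrete, here is a sketch that writes the topics file and shows a full --generate invocation. The broker IDs and bootstrap server are made up, and the kafka-reassign-partitions.sh call is only shown as a comment since it needs a live cluster:

```shell
# Write the topics file that --generate expects (topic names from the example above).
cat > /tmp/topics-to-move.json <<'EOF'
{
  "topics": [{ "topic": "foo1" }, { "topic": "foo2" }],
  "version": 1
}
EOF

# Against a live cluster you would then run something like:
# bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
#   --topics-to-move-json-file /tmp/topics-to-move.json \
#   --broker-list "1,2,3,4" --generate
cat /tmp/topics-to-move.json
```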

warning

The first number in "replicas":[1,3] is the preferred leader of the partition; the rest are the followers. This is very important because you might end up with more leader partitions on one broker than on others, increasing its workload.

Problem with Kafka Reassign tool

When you need to reassign a big cluster, you might find some issues with the --generate command:

  • The plan generated is completely random. Running it twice for a topic will produce different results.
  • The plan generated is not always optimal. It might assign more partitions to one broker than to another.

On a cluster with thousands of partitions this might be fine, as randomizing the partitions will probably be enough to balance them across the brokers, but it might not be completely optimal. Also, if you need to run this for a lot of topics in batches, you don't want to run the --generate command for the same topic twice; you would be reassigning a topic that was already reassigned.

Building a custom reassign plan

To manage partitions more properly, a custom tool can be built, where partitions are defined based on the topic name and the list of brokers. By doing this, reassigning partitions on the same topic twice won't produce any changes. A tool like that can be used to manage the reassignment of partitions in a more controlled way.
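As a sketch of that idea (broker IDs, topic name, partition count, and replication factor below are all made up): give partition p the replicas starting at broker index p mod N. The output depends only on the inputs, so running it twice produces the identical plan:

```shell
# Deterministic round-robin assignment: partition p gets rf replicas
# starting at broker index (p mod N). A real tool could additionally offset
# the start index by a hash of the topic name to balance leaders across topics.
brokers=(1 2 3 4); n=${#brokers[@]}; rf=2
topic=foo1; partitions=3
plan=""
for ((p = 0; p < partitions; p++)); do
  reps=()
  for ((r = 0; r < rf; r++)); do
    reps+=("${brokers[$(((p + r) % n))]}")
  done
  plan+="{\"topic\":\"$topic\",\"partition\":$p,\"replicas\":[$(IFS=,; echo "${reps[*]}")]}"$'\n'
done
printf '%s' "$plan"
```

Because the mapping is a pure function of the topic name, partition count, and broker list, re-running it over already-reassigned topics is a no-op.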

Balance Leader partitions

After reassigning a lot of partitions, the leader partitions might not be well balanced across your brokers: one broker can end up with more leader partitions than the others, which translates into more workload. (Screenshot from Kafka-UI not shown.)

If you wait, the cluster will probably rebalance the leader partitions on its own (if auto.leader.rebalance.enable=true is set).

In order to force a rebalance, you can use the bin/kafka-leader-election.sh binary.

This is an example of the CPU usage of brokers before and after the leader election. (Chart not shown.)

e.g.:

$ bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

--election-type can be preferred or unclean. preferred moves leadership back to the preferred replica (the first one in the replica list), while unclean allows a replica that is not fully in sync to become the leader, which can lose data.

TL;DR

To reassign partitions to new brokers:

  • Use the bin/kafka-reassign-partitions.sh with a list of topics, brokers and the --generate command.
  • Use the bin/kafka-reassign-partitions.sh with the generated JSON and the --execute command.
  • Use the bin/kafka-reassign-partitions.sh with the generated JSON and the --verify command.
  • Use the bin/kafka-leader-election.sh to balance the leader partitions across your brokers.

· 7 min read
Javier Montón

This post is about Kafka Connect, Mirror Maker 2, how they manage offsets, and how to deal with them.

Kafka Offsets

When a consumer starts consuming messages from Kafka, it will probably use a consumer-group and Kafka will store the offset of the last message consumed by that consumer-group. This offset is stored in a Kafka topic called __consumer_offsets.

By doing this, when the consumer restarts, it can start consuming messages from the last offset it consumed.

There are tools that let us manage these offsets, like the binaries shipped with Kafka, and in any case the consumer can always decide which offset to consume from: the beginning, the end, or any arbitrary offset. Many tools even allow searching for offsets by timestamp.
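For example, Kafka's own kafka-consumer-groups.sh can rewind a group to a timestamp. The group and topic names below are made up, and the command is only printed here because it needs a live broker:

```shell
# --dry-run previews the new offsets; replace it with --execute to apply them.
cmd='bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092
  --group my-group --topic my-topic
  --reset-offsets --to-datetime 2024-01-01T00:00:00.000 --dry-run'
echo "$cmd"
```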

Monitoring offset lag

The consumer-offset-lag is a metric provided by Kafka, based on the difference between the last offset consumed by the consumer group and the last offset produced to the topic. Most monitoring tools, like DataDog, expose this value, and it is very useful for knowing whether the consumer is lagging (not consuming messages as fast as they are produced) or is down.

But Mirror Maker 2 (MM2) cannot be monitored through this metric; see below.

Kafka Connect Offsets

note

When we talk about Kafka Connect, we are talking about the distributed version of Kafka Connect, not the standalone version.

Kafka Connect manages offsets in its own way. When a consumer is started by Kafka Connect, the offsets are stored in a Kafka topic called connect-offsets by default, although this can be configured through the offset.storage.topic property.

tip

When a connector is created in Kafka Connect, it tracks offsets in its own topic, but it also uses a regular consumer group, so the offsets are also tracked in __consumer_offsets. This also means that we can monitor Kafka Connect sinks through the consumer-offset-lag metric.
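For a sink connector, that consumer group is named connect-<connector name>, so a hypothetical sink called my-jdbc-sink can be inspected like any other group. The describe command is only printed here, as it needs a live broker:

```shell
# Kafka Connect names each sink's consumer group "connect-<connector name>".
group="connect-my-jdbc-sink"
echo "bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $group"
```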

Mirror Maker 2 (MM2) Offsets

Mirror Maker 2 is a tool provided by Kafka to replicate messages from one Kafka cluster to another. It is meant for complete replication between clusters, but it can also be used to copy only some topics.

note

MM2 can be run in different ways, but here we are talking about the Kafka Connect way, where MM2 is run as a Kafka Connect connector.

If we think about MM2 and the data it needs to consume and produce, we might wonder how offsets are managed: it reads from one cluster and writes to another, so how does it track the offsets?

By default, MM2 will create a topic called mm2-offset-syncs.<cluster-alias>.internal and, as far as I know, it cannot be renamed.

tip

While working with MM2, it is recommended to install the connectors in the "target" cluster, so the "source" cluster will be the external one.

By default, MM2 creates the aforementioned topic in the "source" cluster, where it stores the offsets of the last messages consumed and produced. But the "source" cluster is external to where Kafka Connect is installed, and that can cause issues when the "source" cluster is not managed by us: for example, we might not have write or create permissions, so we cannot create the topic.

The location of mm2-offset-syncs.<cluster-alias>.internal can be defined with the offset-syncs.topic.location property, which accepts source (default) and target.

note

When a consumer is created by MM2, which is a Kafka Connect connector, it stores the offsets both in mm2-offset-syncs.<cluster-alias>.internal and in connect-offsets. This is very important if we want to manipulate offsets.

warning

MM2 consumers do not use a group.id: they do not use any Kafka consumer group, and their consumed offsets won't be stored in __consumer_offsets. This also means that we cannot monitor MM2 through the consumer-offset-lag metric.

Mixing Kafka Connect and MM2

If we look at the offsets stored both by Kafka Connect and MM2 in their topics, we can see the following:

Kafka Connect topic

If we look at the connect-offsets topic, we can see that the offsets are stored in JSON format, with the following structure:

  • The key is a structure that contains the connector name, the partition, the topic, and the cluster:

[
  "my-connector-name",
  {
    "cluster": "my-source-cluster-alias",
    "partition": 3,
    "topic": "my-example-topic"
  }
]
  • And the value is a JSON with the offset:

{
  "offset": 1234
}
note

No matter where we store the offsets (source or target), Kafka Connect will show the "source cluster alias" as this is where the Kafka consumer is created.

MM2 topic

If we look at the mm2-offset-syncs.<cluster-alias>.internal topic, we can see MM2 uses its own binary format to store the offsets:

  • The key is the connector name plus a few extra bytes, which represent a structure defined inside the code
  • The value is just an Int64 representing the offset

Manipulating offsets by hand is not really recommended, as we could mess up the connectors, but it is possible.

How to reset offsets in Mirror Maker 2

If we need to reset the offsets in MM2, we might think that deleting the topic mm2-offset-syncs.<cluster-alias>.internal will do the trick, but it won't, as offsets are also stored in Kafka Connect's own topic. So we need to reset the offsets in both topics.

There is a lot of confusion about how to reset offsets in Kafka Connect: the docs are not very clear about it, and Kafka Connect has long lacked tooling for it. Typically, removing the connector and recreating it with a different name will do the trick, but we might want to keep the same name.

Manual edit of offsets

We can manually produce a message to the connect-offsets topic to reset offsets, and the right way of doing it is to send a tombstone: check the messages currently in the topic, identify the connector we want, and send the same key with a null value.

note

To reset offsets completely, we do not send offset: 0; we send a null value.
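kafka-console-producer cannot easily send a null value, so one option is kcat (formerly kafkacat): -K sets the key delimiter and -Z turns the empty value after it into a real tombstone. The key below reuses the hypothetical record shown earlier; the kcat call itself is commented out because it needs a live broker:

```shell
# Key copied verbatim from the connect-offsets record we want to reset,
# followed by the '|' delimiter and an empty value (the tombstone).
payload='["my-connector-name",{"cluster":"my-source-cluster-alias","partition":3,"topic":"my-example-topic"}]|'
printf '%s\n' "$payload"
# Against a live cluster (not run here):
# printf '%s\n' "$payload" | kcat -P -b localhost:9092 -t connect-offsets -K '|' -Z
```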

REST API to reset offsets

Starting from Kafka 3.6.0, Kafka Connect has a REST API to manage connector offsets, and it is possible to reset them through it. It is described in KIP-875, although it is still not present in the official docs. If you are using Confluent Platform, Kafka 3.6.0 is included starting from version 7.6.0.

If we use this version, we can reset offsets with a couple of curl calls: first stop the connector, then reset the offsets.

curl -X PUT http://localhost:8083/connectors/my-connector-name/stop
curl -X DELETE http://localhost:8083/connectors/my-connector-name/offsets

We can also check the current offsets:

curl -X GET http://localhost:8083/connectors/my-connector-name/offsets | jq
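Putting the whole flow together (connector name and worker URL are made up): after deleting the offsets, the connector must be resumed, which is a PUT to /resume. The calls are only printed here since they need a live Connect worker:

```shell
base="http://localhost:8083/connectors/my-connector-name"
for step in "PUT $base/stop" "DELETE $base/offsets" "PUT $base/resume"; do
  echo "curl -X $step"
done
```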

TL;DR

To reset offsets in MM2, you need to:

  • Stop, pause or remove the connectors
  • Delete or truncate the topic mm2-offset-syncs.<cluster-alias>.internal
  • Reset the offsets in the connect-offsets topic, either manually or through the REST API for the desired connector
  • Start the connectors again
warning

Deleting the topic mm2-offset-syncs.<cluster-alias>.internal will not reset the offsets for other connectors configured in MM2, as they fall back to the connect-offsets topic. Still, do this at your own risk: things might change in the future and this could become false.

· 20 min read
Javier Montón

A guide to move data from Kafka to an AWS RDS using Kafka Connect and the JDBC Sink Connector with IAM Auth.

Kafka Connect

For these examples, we are using Confluent's Kafka Connect on its Docker version, as we are going to deploy it in a Kubernetes cluster.

Single and distributed modes

Kafka Connect comes with two modes of execution: single (standalone) and distributed. In single mode everything runs in one process, while distributed mode spreads connectors and their tasks across multiple worker processes. Distributed mode is the recommended one for production environments, as it provides better scalability and fault tolerance. In the case of K8s, it means we will be using more than one pod to run Kafka Connect.

warning

Be aware that these two modes use different classpaths, so if you are making changes inside the Docker image and running single mode locally but distributed mode in production, you might get different results. I strongly recommend checking manually which classpaths are used in each case, with something like

ps aux | grep java

And you will get something like this:

java -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/log/kafka -Dlog4j.configuration=file:/etc/kafka/connect-log4j.properties -cp /etc/kafka-connect/jars/*:/usr/share/java/kafka/*:/usr/share/java/confluent-common/*:/usr/share/java/kafka-serde-tools/*:/usr/share/java/monitoring-interceptors/*:/usr/bin/../share/java/kafka/*:/usr/bin/../share/java/confluent-telemetry/* org.apache.kafka.connect.cli.ConnectDistributed /etc/kafka-connect/kafka-connect.properties

And you'll find all the directories (after -cp) included in the running Kafka Connect.

  • Note that a folder called cp-base-new is widely used in single mode, but it is not very well documented.
  • Setting your deployment to 1 replica will run Kafka Connect in single mode, while setting it to 2 or more will run it in distributed mode.

Deploying in K8s

This should be fairly straightforward, as we are using Confluent's Kafka Connect Docker image, which is already prepared to be deployed in K8s. Confluent provides a Helm chart as an example, so it should be easy. You can also create your own.

Using MSK (Kafka)

If you are using the AWS's Kafka version, MSK, and you are authenticating using IAM, you will need to do a few things:

  • Configure some environment variables in Kafka Connect
  • Add the required AWS libraries to the classpath

Environment variables

CONNECT_BOOTSTRAP_SERVERS will have the brokers, as usual, but using the 9098 port.

You need to specify the IAM callback handler as well as SASL:

CONNECT_SASL_CLIENT_CALLBACK_HANDLER_CLASS = software.amazon.msk.auth.iam.IAMClientCallbackHandler
CONNECT_SASL_MECHANISM = AWS_MSK_IAM
CONNECT_SECURITY_PROTOCOL = SASL_SSL

Also, you have to provide a JAAS config with the credentials. You can find more info in the AWS documentation. For IAM, something like this should work:

CONNECT_SASL_JAAS_CONFIG = 
software.amazon.msk.auth.iam.IAMLoginModule required
awsRoleArn="arn:aws:iam::{account}:role/{role}"
awsStsRegion="{region}";

If you do this in yaml for Helm, it will look like this:

  - name: CONNECT_SASL_JAAS_CONFIG
value: >-
software.amazon.msk.auth.iam.IAMLoginModule required
awsRoleArn="arn:aws:iam::{account}:role/{role}"
awsStsRegion="{region}";

When Kafka Connect creates a new connector, it will use its own credentials configuration, so if you want to have the same IAM auth, you will need to add the same values to these environment variables:

  • CONNECT_CONSUMER_SASL_CLIENT_CALLBACK_HANDLER_CLASS
  • CONNECT_CONSUMER_SASL_MECHANISM
  • CONNECT_CONSUMER_SECURITY_PROTOCOL
  • CONNECT_CONSUMER_SASL_JAAS_CONFIG
tip

If you are using your own Helm template, you could create some variables for these values, so you can reuse them in the different environment variables, to avoid writing them twice.
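For example, a sketch with a hypothetical Helm named template (the account/role/region placeholders are kept as-is):

```yaml
{{- define "connect.jaasConfig" -}}
software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::{account}:role/{role}" awsStsRegion="{region}";
{{- end }}
# ...then reuse it for both variables:
- name: CONNECT_SASL_JAAS_CONFIG
  value: {{ include "connect.jaasConfig" . | quote }}
- name: CONNECT_CONSUMER_SASL_JAAS_CONFIG
  value: {{ include "connect.jaasConfig" . | quote }}
```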

Formatting logs as JSON

Logs are very important, and a good format is key to reading and processing them easily. In production we usually want them as JSON, and Kafka Connect does not make this as easy as we might expect.

If you only want to change the log level or tweak the format a bit, you can use the environment variables available for that, described in the docs. But if you want to properly format all logs as JSON, you will need a few more steps.

Using JSONEventLayoutV1

Kafka Connect uses Log4j 1 (not Log4j 2), so we will need a log4j.properties file to configure it. Confluent ships a patched version of the original Log4j 1 that is supposed to fix some vulnerabilities.

We can use a dependency that automatically converts all of our logs into JSON, like log4j-jsonevent-layout. See Adding libraries for more info about how to add new libraries. If we have this library in the classpath, we can now use the JSONEventLayoutV1 in our log4j.properties file. Like:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=net.logstash.log4j.JSONEventLayoutV1
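Note that the appender only takes effect once the root logger points at it; a minimal complete file could look like this (the INFO level is an assumption):

```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=net.logstash.log4j.JSONEventLayoutV1
```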

Properties files

Confluent will tell you that you can modify the template for logs in /etc/confluent/docker/log4j.properties.template, but you might need some extra steps if you want all logs as JSON.

  • Template for most of the logs, as described, in /etc/confluent/docker/log4j.properties.template
  • Logs from the "Admin Client" in /etc/kafka/log4j.properties
  • Some tool logs in /etc/confluent/docker/tools-log4j.properties.template
  • Some startup logs in /etc/kafka-connect/log4j.properties
  • There are also some random logs not using Log4j, they are defined in /usr/lib/jvm/jre/conf/logging.properties

If you want to format everything to JSON, I would recommend entering inside the docker image, looking for those files, and changing them as desired. Your Dockerfile could then replace them while creating the image.
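For example, a Dockerfile could overwrite them at build time (the image tag and the names of the local properties files are hypothetical):

```dockerfile
FROM confluentinc/cp-kafka-connect:7.6.0
# One JSON-formatted log4j.properties, copied over each file found above
COPY log4j-json.properties /etc/confluent/docker/log4j.properties.template
COPY log4j-json.properties /etc/kafka/log4j.properties
COPY log4j-json.properties /etc/kafka-connect/log4j.properties
COPY tools-log4j-json.properties /etc/confluent/docker/tools-log4j.properties.template
```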

warning

There are still some logs during the start-up not formatted as JSON. Confluent's Kafka Connect is using a Python script to start up the service, and that service is using some prints that do not belong to any Log4j, so they are not formatted in any way.

If you want to format those prints too, you will need to do something else as they don't have any configuration files. You could use a sed command to replace them, or you could modify the cub.py file in your image with the desired format.

Adding plugins

Adding plugins should be straightforward; the documentation explains it pretty well. Note that you can add plugins to the default plugins folder, or change the plugins path with

plugin.path=/usr/local/share/kafka/plugins

In any case, copying files into a Docker image by hand limits the flexibility of the solution, so I would recommend building a project where you can declare all the dependencies you need, so that libraries and plugins are built and copied into the Docker image during CI. This way you can use Gradle, Maven, SBT, or any other build tool to manage your dependencies, upgrade versions, and build plugins.

tip

Note that Plugins and libraries are not included in the same path, so I would recommend building a different project for each. For example, we could build a main project that can build the Kafka Connect image with their libraries and a subproject that can build plugins in a different folder. Then, the Dockerfile could easily copy both folders into the image in the right paths.

If you build a project like that, to add the JDBC plugin, for example, in Gradle you only need to add this:

dependencies {
    implementation("io.confluent:kafka-connect-jdbc:10.7.4")
}

Adding libraries

As mentioned earlier, libraries must go on the classpath, not in the plugins folder. If you are using a project to build your libraries and plugins, you can use one of the many shadow/assembly plugins to pack all the dependencies into a single .jar that can be copied into the Docker image.

For example, with Gradle, we could include the AWS library needed for IAM authentication, and the Log4j JSON formatter, like this:

dependencies {
    implementation("software.amazon.msk:aws-msk-iam-auth:1.1.7")
    implementation("net.logstash.log4j:jsonevent-layout:1.7")
}

Using a plugin to build a fat jar, everything should be included in one .jar file that we can copy into the Docker image.

tip

For the JDBC Sink, we will need to also include a Driver and more libraries in case we want to use IAM Auth with RDS, we will see that later.

Kafka Connect REST API

By default, Kafka Connect exposes its REST API on port 8083. You can find more info about the API in the official documentation.

If you want to control who can access the API or change its port, you can use the CONNECT_LISTENERS and/or CONNECT_REST_ADVERTISED_PORT environment variables. For example, if you want to change the port to 8084, you could do this:

  - name: CONNECT_REST_ADVERTISED_PORT
value: "8084"

Also, you can even open the API in multiple ports, by doing this:

  - name: CONNECT_LISTENERS
value: "http://0.0.0.0:8084,http://0.0.0.0:8085"
- name: CONNECT_REST_ADVERTISED_PORT
value: "8084"

Securing the API

Kafka Connect's REST API lacks security options: it only allows basic authentication, which might not be what you are looking for. Also, the code seems to have several places where it does an if-else to check whether basic auth is enabled or not.

But, there is also another way we can use to build our own security layer.

JAX-RS Security Extensions

Without going into too much detail: Kafka Connect, as well as Schema Registry, uses JAX-RS to build its REST API, and JAX-RS allows us to add our own security extensions. Following this pattern, we can add a simple filter that checks whether the caller is authenticated and returns a 401 error if not.

As for how to authenticate a user, different methods are possible depending on the setup. For example, we could call the AWS IAM API to check whether a user has permissions, or, since we are deploying in Kubernetes, rely on Kubernetes identities, which allow us to authenticate pods using a JWT token.

To do this, you have to create a JAX-RS filter and then register it in Kafka Connect. Once your filter is ready, you can register it by implementing the ConnectRestExtension interface:

class MySecurityExtension implements ConnectRestExtension { /* register the filter in register() */ }

This class will need to be packed with your libraries and included in the classpath, as we did with other libraries.

JDBC Sink Connector

We need to download the plugin and add it to the plugins folder. By default, that's /usr/share/confluent-hub-components/.

You can get the .jar with wget and copy it into the Docker image, in the aforementioned folder. Or, as suggested earlier, if you are building a project with a build tool like Gradle, you can let it download all the plugins you might need. We only need to add the dependency:

dependencies {
    implementation("io.confluent:kafka-connect-jdbc:10.7.4")
}

And build the .jar. Then, we can copy it inside the Docker image.

Drivers

The plugin alone is not enough to connect to a database; we also need the driver. In our case, we are using PostgreSQL on RDS, so we need the Postgres driver.

info

Several drivers are already included in the Kafka Connect image, but they are not on the default classpath, so if we try to run the connector without adding the driver properly, we will get an error like No suitable driver found. They are placed in /usr/share/confluent-hub-components/, but, as we can see with something like ps aux | grep java, they are not included in the classpath. So, we have three options:

  • Move the driver to the classpath
  • Add the drivers' folder to the classpath
  • Find our own driver and copy it inside the Docker image, in the classpath

I would go for the third option, which gives us more flexibility about which version of the driver we want to use.

So, we can download the driver and pack it with our libraries, and then copy it inside the Docker image:

implementation("org.postgresql:postgresql:42.7.1")
tip

Note that the JDBC Sink has to be placed in the plugins folder, while the driver has to be placed on the library classpath.

IAM Auth

If you are using IAM Auth with RDS, you will need to add some extra libraries to the classpath. You can find more info about this in the AWS's documentation.

The bad news is that a plain driver cannot use IAM Auth: if you try to connect to the database with IAM Auth, you will get an error like The server requested password-based authentication, but no password was provided. You would need to create a token manually and pass it with the connection.

The good news is that there is a library created by AWS that acts as a wrapper for your JDBC Drivers, adding extra features, including IAM Auth.

To use IAM Auth, we only need to add this driver to the classpath, and change a bit our JDBC URL.

Add the dependency (also needed libraries for AWS RDS):

implementation("software.amazon.jdbc:aws-advanced-jdbc-wrapper:2.3.2")
implementation("software.amazon.awssdk:rds:2.20.145")

You can follow their docs, but in our case, we will need to change the JDBC URL to add a couple of things:

  • URL using the new driver
  • The IAM Auth flag enabled

jdbc:aws-wrapper:postgresql://{host}:{port}/postgres?wrapperPlugins=iam
tip

Some tips:

  • This JDBC URL goes inside the connector's configuration
  • A username is still required, and it should be the same as the role used to connect to the database
  • You can also help the wrapper to find the dialect used, by adding &wrapperDialect=rds-pg
  • You can also help Kafka Connect to find the dialect used, by adding another property to your connector's configuration: dialect.name: "PostgreSqlDatabaseDialect"

META-INF/services and multiple drivers

At this point, we are including both the PostgreSQL JDBC driver and the wrapper in the classpath, and both are JDBC drivers. If we ship them as separate .jar files, everything is fine, but if we are building a fat jar, we might hit an issue: each driver ships a file called META-INF/services/java.sql.Driver containing its driver class name. If our fat jar does not merge these files so that both classes are listed, we will get an error like No suitable driver found.

Depending on the building tool and the plugin used, we might need to add some extra configuration to merge these files. For example, in Gradle we could need to add something like this:

mergeServiceFiles()

Or in the SBT Assembly plugin, we could need to add something like:

assembly / assemblyMergeStrategy := MergeStrategy.concat

Topic & Destination table

The JDBC Sink Connector allows us to decide which topics and tables we want to use, and we have two ways of doing it:

  • One topic / table per connector. In this case, we can directly write the topic and table names in the connector's configuration.
  • Multiple topics / tables per connector. In this case, we will need to use a pattern for topics and another for tables.

One topic / table per connector

This is the easiest way, we only need to add the topic and table names in the connector's configuration, like this:

topics: "my.first.topic"
table.name.format: "first_table"

Using patterns for topics and table names

The JDBC Sink does some magic to map topics to tables, but it's not always what we want. For example, if we have a topic called my.topic, it will take my as the schema name and topic as the table name. More details about table-name parsing can be found in the docs.

But we likely use a naming pattern for our topics, especially if we are building a data lake, so we might want to derive table names from a different pattern. For example, we could have a topic called my.first.topic and want to write to a table called first_table in our database. This can still be achieved using a router and the table.name.format property.
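A sketch of that my.first.topic → first_table mapping using the RegexRouter SMT shipped with Kafka (the transform alias route is arbitrary); the router rewrites the topic name, and table.name.format then picks it up:

```json
{
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "my\\.(.*)\\.topic",
  "transforms.route.replacement": "$1_table",
  "table.name.format": "${topic}"
}
```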

tip

Be aware that not all types accepted in your Kafka topics are accepted in your database, JDBC Driver and/or the JDBC Sink. For example, the list of valid types from the perspective of the JDBC Sink is here.

Case insensitive

By default, the JDBC Sink quotes all table and column names, which is usually fine. However, PostgreSQL is case-insensitive only when quotes are not used, so if your data from Kafka contains uppercase letters (for example, camelCase field names) and you do not want to use quotes in your database or while querying, you should disable quoting in Kafka Connect with:

quote.sql.identifiers=never

Deploy new connectors in K8s

Deploying new connectors can be tricky, especially in Kubernetes. Kafka Connect exposes an API that we can use to create new connectors, but we have to "manually" make calls to create, update, or delete them. This is not the best way to integrate with our CI/CD, especially if the CI runs outside our K8s cluster.

Ideally, we would want to have a configuration file in our repository, that can be updated and automatically deployed during our CI.

connect-operator

There is a solution for this: Confluent's connect-operator. Although it is not very robust, it does the job.

This is based on the shell-operator, a Kubernetes operator that can be deployed in our cluster and configured to "listen" for specific events, like new deployments, changes in config maps or whatever we want. Specifically, this connect-operator is designed to listen for changes in a config map, and then it will create, update, or delete connectors based on the content of that config map.

In other words, we can put our Connector's configuration in a config map, and then the connect-operator will create the connector for us.

tip

The connect-operator does not need to be deployed together with Kafka Connect, it is an independent pod that will be running in our cluster, it can be in the same namespace or not. It also can listen for config-maps attached to our Kafka Connect or to any other deployment, all depending on our configuration and K8s permissions.

warning

The connect-operator is a nice tool that does the job, but it isn't very robust. For example, it does not check if a connector creation has failed or not, it only checks if the connector exists or not, sends a curl to the REST API, and then it assumes that everything is fine. In any case, it is just a bash script using JQ for configuration, so it can be easily modified to fit our needs.

Configuring the connect-operator

As the connect-operator is based on the shell-operator, it expects a configuration file in YAML format, where we can define the events we want to listen to.

By default, the operator is called in two ways:

  • At startup, it will be called with a flag --config and it has to return the configuration file in YAML format that specifies the events we want to listen to.
  • When an event is triggered, our script will be triggered with the event as a parameter.

Listening to config maps

The config that we have to return to listen for config map changes should look something like this:

configVersion: v1
kubernetes:
  - name: ConnectConfigMapMonitor
    apiVersion: v1
    kind: ConfigMap
    executeHookOnEvent: ["Added", "Deleted", "Modified"]
    jqFilter: ".data"
    labelSelector:
      matchLabels:
        destination: $YOUR_DESTINATION
    namespace:
      nameSelector:
        matchNames: ["$YOUR_NAMESPACE"]

YOUR_DESTINATION must be the same as the label used in the config map, and YOUR_NAMESPACE must be the same as the namespace where the config map is deployed.

info

The default connect-operator has a config option to enable or disable the connector, but it enables or disables all your connectors at once, so I prefer to skip that part, as I want to have multiple connectors in my config maps.

note

The configuration looks different from a standard K8s configuration, but the shell-operator can handle it and there is no need to declare a new CRD with that structure.

Config Map

The config map must contain the connectors' configuration in JSON, the same format you would use with the REST API. I would suggest building a Helm template for config maps, so you can write your connectors' configuration in YAML and let Helm convert it to JSON.

Something like this should work in Helm:

```yaml
{{- if .Values.configMap }}
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.yourDeploymentName }}-connectors
  labels:
    destination: {{ .Values.yourDeploymentName }} # this has to match the label in the connect-operator config
data:
{{- range $key, $value := .Values.configMap.data }}
  {{ $key }}: {{ $value | toJson | quote | indent 6 | trim }}
{{- end }}
{{- end }}
```

After having this Helm template, we can write our Connector's config like this:

```yaml
configMap:
  data:
    my-connector-name:
      name: "my-connector-name"
      config:
        # JDBC Config
        name: "my-connector-name"
        connector.class: "io.confluent.connect.jdbc.JdbcSinkConnector"
        # using IAM Auth
        connection.url: "jdbc:aws-wrapper:postgresql://{host}:{port}/postgres?wrapperPlugins=iam&wrapperDialect=rds-pg"
        connection.user: env.USERNAME
        dialect.name: "PostgreSqlDatabaseDialect"
        topics: "my-topic"
        tasks.max: "4"
        # ...
```

tip

The config map can be attached to your Kafka Connect deployment or to any other deployment; what matters is that the connect-operator can find it.

Once it is deployed as a config-map, the connect-operator will create the connector for us.
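Under the hood, creating or updating a connector boils down to a call to the Kafka Connect REST API. A sketch of that call, where `PUT /connectors/{name}/config` creates the connector if it doesn't exist or updates its config otherwise; the host, port, and connector name here are illustrative:

```shell
# Where our (hypothetical) Kafka Connect REST API lives
CONNECT_URL="http://kafka-connect:8083"

# Build the endpoint for a given connector name
connector_endpoint() {
  echo "${CONNECT_URL}/connectors/$1/config"
}

# The actual request the operator sends (not run here):
#   curl -s -X PUT -H "Content-Type: application/json" \
#     --data @my-connector-name.json \
#     "$(connector_endpoint my-connector-name)"
connector_endpoint my-connector-name
# → http://kafka-connect:8083/connectors/my-connector-name/config
```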

Replacing variables

The connect-operator uses jq to replace variables. They can be stored in config files inside the Docker image, passed as arguments, or read from environment variables. Keeping them in config files inside the Docker image looks weird to me: why should our operator know about the context of our connectors?

These are some examples of replacing variables, but you can find more details in the jq documentation:

Variables:

```yaml
connection.user: $username
```

Environment variables:

```yaml
connection.user: env.USERNAME
```

Note that these values are not between quotes.

Variables inside strings:

```yaml
connection.user: "prefix_\(env.USERNAME)_suffix"
```

tip

If you are parsing this with Helm, you might need to put the string between single quotes; otherwise, Helm will fail on the `\(` sequence.
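A quick way to see these substitutions in action is to run jq directly; this mirrors what the operator's jq call does. `USERNAME` is just an illustrative variable here:

```shell
# jq exposes environment variables through the built-in `env` object:
# env.USERNAME outside quotes becomes the raw value,
# while \(env.USERNAME) interpolates it inside a string.
export USERNAME=kafka_user

jq -n '{
  "connection.user": env.USERNAME,
  "connection.password": "prefix_\(env.USERNAME)_suffix"
}'
# connection.user resolves to "kafka_user",
# connection.password to "prefix_kafka_user_suffix"
```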

RBAC permissions to read config maps

If your connect-operator runs in a different deployment than the config map, you will need to give it permission to read the config map. This can be achieved with a Role and a RoleBinding created using Helm.

Something like this needs to be created:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ .Values.yourApp }}-configmap-read-role
  namespace: {{ .Release.Namespace }}
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["list", "watch"] # List and Watch all configmaps; get only the ones in resourceNames
- apiGroups: [""]
  resources: ["configmaps"]
  resourceNames: ["your-destination"] # the name of the config map holding the connectors
  verbs: ["get", "watch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: {{ .Values.yourApp }}-read-configmaps
  namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
  name: {{ .Values.yourApp }}
roleRef:
  kind: Role
  name: {{ .Values.yourApp }}-configmap-read-role
  apiGroup: rbac.authorization.k8s.io
```

Custom Kafka groups for your connectors

By default, Kafka Connect will create a consumer group for each connector, using the connector's name with `connect-` as a prefix. This is not very flexible, as we might want our own group names. For example, if we are sharing a Kafka cluster with other teams, we might want our own group names to avoid conflicts. Or we could have our own naming convention enforced with ACLs in Kafka.

To choose a group name, we have to change two configurations.

First, we have to set an environment variable that allows connectors to override some client configs, including the group id:

```
CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All
```
tip

This can be on your deployment file or on your Dockerfile.

Then, we can add the group name to our connector's configuration:

```yaml
consumer.override.group.id: "my-custom-group-name"
```
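For instance, in a K8s deployment manifest the override policy could be set like this. This is only a fragment, and assumes the Confluent image convention of mapping `CONNECT_`-prefixed environment variables to worker configs:

```yaml
# Kafka Connect container spec (fragment)
env:
- name: CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
  value: "All"
```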