
2 posts tagged with "Scala"


Javier Montón · 7 min read

This post is about Kafka Connect, Mirror Maker 2, how they manage offsets, and how to deal with them.

Kafka Offsets

When a consumer starts consuming messages from Kafka, it will probably use a consumer-group and Kafka will store the offset of the last message consumed by that consumer-group. This offset is stored in a Kafka topic called __consumer_offsets.

By doing this, when the consumer restarts, it can start consuming messages from the last offset it consumed.

There are tools that allow us to manage these offsets, like the command-line scripts shipped with Kafka, and in any case, the consumer can always decide which offsets to consume from. This means that the consumer can set the offset to the beginning, the end, or any other position. Many tools even allow searching for offsets based on timestamps.
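For example, a minimal Scala sketch using the plain kafka-clients consumer API could seek to the beginning or to a timestamp like this (topic, partition, group and bootstrap servers are placeholders):

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object SeekExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder
  props.put("group.id", "my-consumer-group")       // placeholder
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer       = new KafkaConsumer[String, String](props)
  val partitions     = List(new TopicPartition("my-example-topic", 0))
  val javaPartitions = partitions.asJava
  consumer.assign(javaPartitions)

  // Option 1: jump to the earliest available offset
  consumer.seekToBeginning(javaPartitions)

  // Option 2: look up the offsets matching a timestamp and seek to them
  val oneHourAgo = java.lang.Long.valueOf(System.currentTimeMillis() - 3600 * 1000)
  val byTime     = consumer.offsetsForTimes(partitions.map(tp => tp -> oneHourAgo).toMap.asJava)
  byTime.asScala.foreach { case (tp, ts) => if (ts != null) consumer.seek(tp, ts.offset()) }

  consumer.poll(Duration.ofSeconds(1))
  consumer.close()
}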

Monitoring offset lag

The consumer-offset-lag is a metric provided by Kafka, based on the difference between the last offset consumed by the consumer and the last offset produced by the producer. Most monitoring tools, like DataDog, expose this value, and it is very useful to know whether the consumer is lagging, meaning that it is not consuming messages as fast as they are produced, or whether it is down.
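If we want to compute this lag ourselves, a minimal sketch using the Kafka AdminClient could look like this (bootstrap servers and group id are placeholders):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, OffsetSpec}

object ConsumerLag extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder
  val admin = Admin.create(props)

  // Offsets committed by the consumer group, per partition
  val committed = admin
    .listConsumerGroupOffsets("my-consumer-group") // placeholder
    .partitionsToOffsetAndMetadata()
    .get()
    .asScala

  // Latest (end) offsets on the broker for the same partitions
  val latestSpec: OffsetSpec = OffsetSpec.latest()
  val endOffsets = admin
    .listOffsets(committed.keys.map(tp => tp -> latestSpec).toMap.asJava)
    .all()
    .get()
    .asScala

  committed.foreach { case (tp, meta) =>
    println(s"$tp lag=${endOffsets(tp).offset() - meta.offset()}")
  }
  admin.close()
}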

But Mirror Maker 2 (MM2) cannot be monitored through this metric; more on that below.

Kafka Connect Offsets

note

When we talk about Kafka Connect, we are talking about the distributed version of Kafka Connect, not the standalone version.

Kafka Connect manages offsets in its own way. When a consumer is started by Kafka Connect, it will store the offsets in a Kafka topic called connect-offsets by default, although the topic name can be configured through the offset.storage.topic property.
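For reference, the relevant part of a distributed worker configuration could look like this (the topic name and replication factor are just example values):

# connect-distributed.properties (excerpt, example values)
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3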

tip

When a connector is created in Kafka Connect, it will track its offsets in this topic, but it will also use a regular consumer group, so the offsets will also be tracked in __consumer_offsets. This means that we can monitor Kafka Connect sinks through the consumer-offset-lag metric.

Mirror Maker 2 (MM2) Offsets

Mirror Maker 2 is a tool provided by Kafka to replicate messages from one Kafka cluster to another. It is meant for complete replication between clusters, but it can also be used to copy only some topics.

note

MM2 can be run in different ways, but here we are talking about the Kafka Connect way, where MM2 is run as a Kafka Connect connector.

If we think about MM2 and the data it needs to consume and produce, it is not obvious how offsets are managed. To start with, it reads from one cluster and writes to another, so how does it manage the offsets?

By default, MM2 will create a topic called mm2-offset-syncs.<cluster-alias>.internal and, as far as I know, it cannot be renamed.

tip

While working with MM2, it is recommended to install the connectors in the "target" cluster, so the "source" cluster will be the external one.

By default, MM2 will create the aforementioned topic in the "source" cluster, where it will store the offsets of the last messages consumed and produced. But as we can see, the "source" cluster is "external" to where Kafka Connect is installed, and that might cause issues in cases where the "source" cluster is not managed by us. For example, we might not have write or create permissions, so we cannot create the topic.

The destination of mm2-offset-syncs.<cluster-alias>.internal can be defined by the offset-syncs.topic.location property which accepts source (default) and target.
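For example, a MirrorSourceConnector configuration submitted to the Kafka Connect REST API could set it like this (aliases, bootstrap servers and topic list are placeholders):

{
  "name": "mm2-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "my-source-cluster-alias",
    "target.cluster.alias": "my-target-cluster-alias",
    "source.cluster.bootstrap.servers": "source-broker:9092",
    "target.cluster.bootstrap.servers": "target-broker:9092",
    "topics": "my-example-topic",
    "offset-syncs.topic.location": "target"
  }
}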

note

When a consumer is created by MM2, which is a Kafka Connect connector, it will store the offsets both in mm2-offset-syncs.<cluster-alias>.internal and in connect-offsets. This is very important if we want to manipulate offsets.

warning

MM2 consumers do not use a group.id: they do not join any Kafka consumer group, and their consumed offsets won't be stored in __consumer_offsets. This also means that we cannot monitor MM2 through the consumer-offset-lag metric.

Mixing Kafka Connect and MM2

If we look at the offsets stored both by Kafka Connect and MM2 in their topics, we can see the following:

Kafka Connect topic

If we look at the connect-offsets topic, we can see that the offsets are stored in JSON format, with the following structure:

  • key is a structure that contains the connector name, the partition, the topic, and the cluster.
[
  "my-connector-name",
  {
    "cluster": "my-source-cluster-alias",
    "partition": 3,
    "topic": "my-example-topic"
  }
]
  • And the value is a JSON with the offset:
{
  "offset": 1234
}
note

No matter where we store the offsets (source or target), Kafka Connect will show the "source cluster alias" as this is where the Kafka consumer is created.

MM2 topic

If we look at the mm2-offset-syncs.<cluster-alias>.internal topic, we can see that it uses its own format to store the offsets:

  • key is the connector name, but with a few extra bytes that represent some structure defined inside the code
  • value is just an Int64, which represents the offset

Manipulating these offsets is not really recommended, as we could mess up the connectors, but it is possible.

How to reset offsets in Mirror Maker 2

If we need to reset the offsets in MM2, we might think that deleting the topic mm2-offset-syncs.<cluster-alias>.internal will do the trick, but it won't, as offsets are also stored in Kafka Connect's topic. So, we need to reset the offsets in both topics.

There is a lot of misinformation about how to reset offsets in Kafka Connect: the official docs are not very clear about it, and Kafka Connect has long lacked tooling for this. Typically, removing the connector and re-creating it with a different name will do the trick, but we might want to keep the same name.

Manual edit of offsets

We can manually produce a message in the connect-offsets topic to reset offsets, and the right way of doing it is to send a tombstone. We can check the messages currently in the topic, identify the key for the connector we want, and produce the same key with a null value.

note

To reset offsets completely, we do not specify offset: 0; we send a null value.
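A sketch of producing such a tombstone with a plain Scala producer, assuming the default connect-offsets topic and the key shown earlier, could look like this (the key must match the stored one byte for byte, so in practice it is safer to copy it from an existing record):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ResetConnectOffsets extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder
  props.put("key.serializer", classOf[StringSerializer].getName)
  props.put("value.serializer", classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)

  // The exact key already stored in connect-offsets for the connector we want to reset
  val key =
    """["my-connector-name",{"cluster":"my-source-cluster-alias","partition":3,"topic":"my-example-topic"}]"""

  // A null value (tombstone) removes the stored offset for that key
  producer.send(new ProducerRecord[String, String]("connect-offsets", key, null))
  producer.flush()
  producer.close()
}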

REST API to reset offsets

Starting from Kafka 3.6.0, the Kafka Connect REST API can also manage connector offsets, including resetting them. The feature is described in KIP-875, although it is still barely covered in the official docs. If you are using Confluent Platform, Kafka 3.6.0 is included starting from version 7.6.0.

If we use this version, we can reset the offsets with a few curl calls: first stop the connector, then delete its offsets, and finally resume it.

curl -X PUT http://localhost:8083/connectors/my-connector-name/stop
curl -X DELETE http://localhost:8083/connectors/my-connector-name/offsets
curl -X PUT http://localhost:8083/connectors/my-connector-name/resume

We can also check the currently stored offsets:

curl -X GET http://localhost:8083/connectors/my-connector-name/offsets | jq

TL;DR

To reset offsets in MM2, you need to:

  • Stop, pause or remove the connectors
  • Delete or truncate the topic mm2-offset-syncs.<cluster-alias>.internal
  • Reset the offsets in the connect-offsets topic, either manually or through the REST API for the desired connector
  • Start the connectors again
warning

Deleting the mm2-offset-syncs.<cluster-alias>.internal topic will not reset the offsets of other connectors you have configured in MM2, as they fall back to the connect-offsets topic. Still, be careful and do this at your own risk; things might change in the future and this could stop being true.

Javier Montón · 9 min read

Big Data Types is a library that can safely convert types between different Big Data systems.

The power of the library

The library implements a few abstract types that can hold any kind of structure, and, using type-class derivation, it can convert between multiple types without any code relating them. In other words, there is no need to implement a transformation from type A to type B; the library will do it for you.

As an example, let's say we have a generic type called Generic, and we want to convert from type A to type B. If we implement the conversion from A to Generic and the conversion from Generic to B, we can automatically convert from A to B, even though there is not a single line of code mixing A and B.

We can also do the opposite, we can convert from B to A by implementing the conversion from B to Generic and the conversion from Generic to A. Now we can convert between A and B as we wish.

Now comes the good part of this. If we introduce a new type C and we want to have conversions, we would need to convert from A to C, and from B to C and the opposite (4 new implementations). If now we introduce D, we would need to implement the conversion from A to D, from B to D and from C to D and the opposite (6 new implementations). This is not scalable, and it is not maintainable.

Having this Generic type means that when we introduce C, we only need to implement the conversion from C to Generic and from Generic to C, without worrying at all about other implementations or types. Moreover, it is likely that the conversion will be very similar to others, so we can reuse some of the code.
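As a purely hypothetical illustration (these types and functions are not the library's real API), the idea looks like this:

// Hypothetical formats: each one only knows how to talk to Generic
case class Generic(fields: Map[String, String])
case class A(columns: List[(String, String)])
case class B(schema: Vector[(String, String)])

def aToGeneric(a: A): Generic = Generic(a.columns.toMap)
def genericToB(g: Generic): B = B(g.fields.toVector)

// A -> B exists "for free" as the composition of the two conversions above
def aToB(a: A): B = genericToB(aToGeneric(a))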

tip

It is important to know that one of these types is the Scala types themselves. So if we want to convert from Scala types (like case classes) to another type, we only need to implement Generic -> newType

How the library works

Modules

As mentioned, the library has multiple modules, each one of them representing a different system with its own types. Each module implements the conversion from and to Generic.

For now, the modules are core (for Scala types and common code), BigQuery, Cassandra, Circe, and Spark.

To use the library, only the modules that are needed should be imported. For example, if we want to convert from Scala types to BigQuery types, we only need the BigQuery module (the core module is always included as a dependency). If we want to convert from Spark to BigQuery, we need to import both the Spark and BigQuery modules.
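For reference, pulling a couple of modules into an sbt build looks roughly like this; the coordinates and version below are an assumption and the exact ones should be taken from the project's README:

// build.sbt (coordinates and version are assumptions, check the project's README)
libraryDependencies ++= Seq(
  "io.github.data-tools" %% "big-data-types-bigquery" % "x.y.z",
  "io.github.data-tools" %% "big-data-types-spark"    % "x.y.z"
)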

Generic type

The Generic type is called SqlType and it's implemented as a sealed trait that can hold any kind of structure. In Scala 3, this type is implemented as an enum, but both represent the same thing.

Repeated values

Usually, there are two ways of implementing a repeated value like an Array. Some systems use a type like Array or List and others flag a basic type with repeated. The implementation of this SqlType uses the latter, so any basic type can have a mode that can be Required, Nullable, or Repeated. This is closer to the BigQuery implementation.

note

This implementation does not allow for Nullable and Repeated at the same time, but a Repeated type can have 0 elements.

Nested values

The SqlStruct can hold a list of records, including other SqlStruct, meaning that we can have nested structures.
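A simplified sketch of this Generic side (not the library's exact definitions) could look like this:

// Simplified sketch, not the library's exact code
sealed trait SqlTypeMode
case object Required extends SqlTypeMode
case object Nullable extends SqlTypeMode
case object Repeated extends SqlTypeMode

sealed trait SqlType
case class SqlInt(mode: SqlTypeMode = Required) extends SqlType
case class SqlString(mode: SqlTypeMode = Required) extends SqlType
// A struct holds named fields, which can themselves be structs (nested records)
case class SqlStruct(records: List[(String, SqlType)], mode: SqlTypeMode = Required) extends SqlType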

Type-class derivation

Type-classes are a way of implementing "ad-hoc polymorphism". This means that we can implement behaviour for a type without having to modify the type itself. In Scala, we achieve this through implicits.

The interesting part of type-classes for this library is that we can derive a type-class for a type without having to implement it.

For example, we can create a simple type-class:

trait MyTypeClass[A] {
  def doSomething(a: A): String
}
tip

In Scala, a type-class is usually encoded as a trait with a type parameter.

Then, we can implement our type-class for an Int type:

implicit val myTypeClassForInt: MyTypeClass[Int] = new MyTypeClass[Int] {
  override def doSomething(a: Int): String = "This is my int" + a.toString
}
tip

Since Scala 2.12, there is a simplified (SAM) syntax for this when the trait has a single abstract method:

implicit val myTypeClassForInt: MyTypeClass[Int] = (a: Int) => "This is my int" + a.toString

We can do similar for other types:

implicit val myTypeClassForString: MyTypeClass[String] = new MyTypeClass[String] {
  override def doSomething(a: String): String = "This is my String" + a
}

Now, if we want to use our type-class with a List[Int] or a List[String], we would need to implement instances for both List[Int] and List[String]. But if we implement the type-class for List[A], where A is any type that already has an instance, the compiler can derive the implementation for List[Int] and List[String] automatically, and for lists of any other type already implemented.

implicit def myTypeClassForList[A](implicit myTypeClassForA: MyTypeClass[A]): MyTypeClass[List[A]] =
  new MyTypeClass[List[A]] {
    override def doSomething(a: List[A]): String = a.map(myTypeClassForA.doSomething).mkString(",")
  }
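With these instances in scope, the compiler assembles the instance for List[Int] (or List[String], or even List[List[Int]]) when we summon it:

val listOfInts: MyTypeClass[List[Int]] = implicitly[MyTypeClass[List[Int]]]
println(listOfInts.doSomething(List(1, 2, 3)))
// This is my int1,This is my int2,This is my int3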

Similarly, if we want to have a case class like:

case class MyClass(a: Int, b: String)

We would need to implement the type-class for MyClass. But, if we implement the type-class for a generic Product type, the compiler can derive the implementation for MyClass automatically, and for any other case class that has types already implemented.
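For illustration, the derived instance for MyClass would be morally equivalent to writing this by hand (using the myTypeClassForInt and myTypeClassForString instances defined above):

implicit val myTypeClassForMyClass: MyTypeClass[MyClass] = new MyTypeClass[MyClass] {
  override def doSomething(value: MyClass): String =
    myTypeClassForInt.doSomething(value.a) + "," + myTypeClassForString.doSomething(value.b)
}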

note

Implementing the conversion for a Product type is more complex than implementing it for a List type, and usually Shapeless is the library we use to do this in Scala 2.

In Scala 3, the language already allows us to derive the type-class for a Product type, so we don't need to use Shapeless.

In big-data-types we have the implementation for all basic types, including iterables and Product types, for both Scala 2 and Scala 3.

Implementing a new type

To implement a new type, we need to implement the conversion from and to Generic type. There is a complete guide, step by step, with examples, in the official documentation

As a quick example, let's say we want to implement a new type called MyType. We need to implement the conversions MyType -> Generic and Generic -> MyType.

tip

Both conversions are not strictly needed: if we only need Scala -> MyType, we only have to implement Generic -> MyType, because the library already has the conversion Scala -> Generic. The same happens with other types; for example, BigQuery -> MyType will also work automatically.

To do that, we need a type-class that works with our type. This will be different depending on the type we want to implement. For example:

trait GenericToMyType[A] {
  def getType: MyTypeObject
}

Maybe our type works with a List at the top level, as Spark does, so instead, we will do:

trait GenericToMyType[A] {
  def getType: List[MyTypeObject]
}
tip

getType can be renamed to anything meaningful, like toMyType or myTypeSchema

And we need to implement this type-class for all the (Generic) SqlType types:

implicit val genericToMyTypeForInt: GenericToMyType[SqlInt] = new GenericToMyType[SqlInt] {
  override def getType: MyTypeObject = MyIntType
}

Using conversions

The defined type-classes allow you to convert MyType -> Generic by doing this:

val int: SqlInt = SqlTypeConversion[MyIntType].getType

And Generic -> MyType by doing this:

val int: MyIntType = SqlTypeToBigQuery[SqlInt].getType

This works well when we deal with type definitions, like case classes, and we don't have an instance of them. For example, a case class definition can be converted into a BigQuery schema, ready to be used for table creation.

But, sometimes, our types work with instances rather than definitions, and we need to use them to convert to other types.

There is another type-class, available for all implemented types, that allows us to work with instances. In general, this type-class can be implemented by reusing code from the other one, but this one takes an argument: an instance of the type we want to convert from.

trait SqlInstanceToMyType[A] {
  def myTypeSchema(value: A): MyTypeObject
}

Implementing this type-class allows us to use the conversion like this:

val mySchema: MyTypeObject = SqlInstanceToMyType.myTypeSchema(theOtherType)

But this syntax is not very friendly, so we can use extension methods to make it more readable.

Extension methods

Extension methods in Scala 2 are done through implicit classes and allow us to create new methods for existing types.

In the library, we implement extension methods for Generic -> SpecificType, and the interesting part, again, is that we don't need to implement A -> B directly; the compiler can derive it for us.

implicit class InstanceSyntax[A: SqlInstanceToMyType](value: A) {
  def asMyType: MyTypeObject = SqlInstanceToMyType[A].myTypeSchema(value)
}

and suddenly, we can use the conversion like this:

val mySchema: MyTypeObject = theOtherType.asMyType

And this is a syntax that can be easier to use. For example, if we work with Spark and BigQuery, we can do the following:

val sparkDf: DataFrame = ???
val bigQuerySchema = sparkDf.schema.asBigQuery

More types to come

The library has only a few types implemented (BigQuery, Spark, Cassandra, and Circe), but implementing a new type is fairly easy, and a new type automatically gets methods to convert it into any other type already implemented. As the list of types grows, the number of possible conversions grows quadratically, and the library becomes more powerful.

Some types that could be potentially implemented:

  • Avro
  • Parquet
  • Athena (AWS)
  • Redshift (AWS)
  • Snowflake
  • RDS (relational databases)
  • Protobuf
  • ElasticSearch templates
  • ...

Some types could have restrictions, but they could be implemented differently. For example, a type conversion could be implemented as a String conversion, where the string is a "Create table" statement for a specific database, and then any other implemented type could automatically be printed as a "Create table" statement.