
Kafka Streams - Why you should use named state stores

Javier Montón · 7 min read

Why should you name your state stores? Because otherwise, you'll lose data.

But first, a bit of context.

Kafka Streams

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.

Unlike other stream processing frameworks, Kafka Streams is not a separate processing cluster but a library that runs within your application. This means you don't need to set up and manage a separate cluster - your application becomes the stream processing engine.

Kafka Streams gives you simple operations like:

  • map - transform each record
  • filter - keep only records that match certain conditions
  • join - combine data from different sources
  • aggregate - group and summarize data

It also handles the hard stuff automatically, like recovering from failures and making sure data isn't lost or processed twice.
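
To make this concrete, here is a minimal sketch of the DSL (the topic names are made up for illustration):

    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("clicks")              // hypothetical input topic
      .filter((_, value) => value.nonEmpty)          // keep only non-empty events
      .map((key, value) => (key, value.toUpperCase)) // transform each record
      .to("clicks-normalized")                       // hypothetical output topic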

State Stores

State stores are a fundamental concept in Kafka Streams that allow you to maintain local state within your stream processing application. They are essentially key-value stores that are backed by Kafka topics for durability and fault tolerance.

By default, you don't need to think much about them: Kafka Streams automatically creates and manages state stores for you when you use operations like groupBy, aggregate, or join.

State stores are automatically managed by Kafka Streams, including:

  • Persistence to local disk (RocksDB by default)
  • Backup to Kafka topics (changelog topics)
  • Restoration during application restarts
  • Rebalancing when scaling up/down
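
For example, even a simple count is stateful: behind the scenes it creates a state store and a changelog topic without you declaring either. A sketch, reusing the builder from above and a hypothetical events topic:

    val counts: KTable[String, Long] =
      builder
        .stream[String, String]("events") // hypothetical topic
        .groupByKey                       // no repartition needed: the key is untouched
        .count()                          // creates a state store + changelog automatically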

As an example, a common scenario is when you want to perform a join between two streams. One way of doing it is to use a KTable, which is a representation of a stream as a table. When you create a KTable from a topic, Kafka Streams automatically creates a state store to maintain the current state of that table. Then, you can join a KStream with a KTable.

When joining a KStream with a KTable, Kafka Streams will perform the join for each record in the KStream against the current state of the KTable.
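
As a hypothetical illustration of those semantics (keys and values are made up):

    // KTable state so far:    "user-1" -> "Alice"
    // KStream record arrives: ("user-1", "clicked")
    //   -> the join fires with tableValue = "Alice"
    // KStream record arrives: ("user-2", "clicked")
    //   -> no entry in the KTable: an inner join emits nothing,
    //      a leftJoin fires with tableValue = null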

warning

This means that if the KTable is not up to date, or if it is missing data for any reason, the join might not produce the expected results.

Internal Topics

When you use stateful operations in Kafka Streams, the framework automatically creates internal topics in Kafka to support fault tolerance and state management. These topics are not meant to be consumed directly by other applications.

The main types of internal topics are:

Changelog Topics

These topics store the complete state of your state stores. Every change to a state store is logged to its corresponding changelog topic. The naming pattern is:

<application.id>-<store-name>-changelog
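
For example, with application.id set to my-app and a store named user-table, the changelog topic is:

my-app-user-table-changelog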

Repartition Topics

Created when data needs to be repartitioned for operations like joins or aggregations. The naming pattern is:

<application.id>-<operation>-repartition
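
For example, a groupBy that changes the key produces an auto-generated name like my-app-KSTREAM-KEY-SELECT-0000000002-repartition (the exact name here is illustrative), while naming the grouping with Grouped.as("orders-by-customer") yields my-app-orders-by-customer-repartition.
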
danger

Here's the problem: if you don't give your state stores names, Kafka Streams will make up names automatically. But these automatic names can change when you update your code or your Kafka Streams version, and that means you'll lose past data.

For example, let's say you have an app that joins a KTable with a KStream. If you redeploy your app and the changelog topic gets a new name, you'll lose all the data that was stored in your KTable. Your joins will stop working because the old data is gone.

Code Examples

Let's take a common code example. If you do these operations in Kafka Streams:

  • Create a KStream from a topic
  • Do a stateless operation like map or filter
  • Create a KTable from this KStream
  • Perform a left join with another KStream

In code, it will look like this:

    val sourceStream: KStream[String, A] = builder.stream[String, A]("test-topic")
    val mappedStream: KStream[String, B] = sourceStream.map { (key, value) =>
      // assume some transformation logic
      val transformedValue = ???
      (key, transformedValue)
    }
    // .toTable will create a state store
    val kTable: KTable[String, B] = mappedStream.toTable
    val secondStream: KStream[String, C] = builder.stream[String, C]("test-topic-2")
    val joinedStream: KStream[String, D] = secondStream.leftJoin(kTable) { (streamValue, tableValue) =>
      // Join logic
      ???
    }

When this code runs:

  1. Kafka Streams creates a state store to maintain the current state of the KTable
  2. A changelog topic my-application-KSTREAM-TOTABLE-STATE-STORE-0000000007-changelog is created
  3. Every update to the KTable updates both the local state store and the changelog topic
  4. If the application restarts, the state store is rebuilt from the changelog topic

But now, what happens if we modify the code, for example, the map operation? Kafka Streams will treat it as a new KTable and create a new state store with a new name, like this:

my-application-KSTREAM-TOTABLE-STATE-STORE-0000000042-changelog

When the application restarts, it will rebuild the KTable in memory using the data from the new changelog topic, but the old data will be lost, as it was stored in the previous changelog topic. Practically speaking, we have lost all the previous data in the KTable. Now, new records in the KStream that try to find a match on data processed before our re-deployment will not find it, leading to missing results.
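
One way to catch this before it bites is to print the topology description, which lists every processor and state store, including the auto-generated names:

    val topology = builder.build()
    // TopologyDescription shows the generated store names, e.g.
    // KSTREAM-TOTABLE-STATE-STORE-0000000007, so a rename can be
    // spotted in code review before a deployment loses data.
    println(topology.describe())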

Naming State Stores

The solution is simple: give your state stores names.

You can (and you should) provide explicit names for your state stores using the Materialized class.

The above example can be modified to use explicit names for the state store:

    val kTable: KTable[String, String] = mappedStream.toTable(
      Materialized.as("my-stream-TOTABLE-my-table-name")
    )

Full example:

    val sourceStream: KStream[String, A] = builder.stream[String, A]("test-topic")
    val mappedStream: KStream[String, String] = sourceStream.map { (key, value) =>
      // assume some transformation logic
      val transformedValue = ???
      (key, transformedValue)
    }
    // .toTable will create a state store, now with an explicit name
    val kTable: KTable[String, String] = mappedStream.toTable(
      Materialized.as[String, String, ByteArrayKeyValueStore]("my-stream-TOTABLE-my-table-name")
    )
    val secondStream: KStream[String, B] = builder.stream[String, B]("test-topic-2")
    val joinedStream: KStream[String, C] = secondStream.leftJoin(kTable) { (streamValue, tableValue) =>
      // Join logic
      ???
    }
warning

Note that the Materialized class has other methods, like Materialized.with, which allow you to specify some configuration, but not the name of the state store.
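
As a sketch of the difference (store name and types are illustrative):

    // Materialized.`with` only sets serdes; the store name stays auto-generated:
    mappedStream.toTable(Materialized.`with`[String, String, ByteArrayKeyValueStore])

    // Materialized.as pins the store (and changelog topic) name:
    mappedStream.toTable(
      Materialized.as[String, String, ByteArrayKeyValueStore]("my-stream-TOTABLE-my-table-name")
    )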

warning

Also, note that the class Named, which you can use like Named.as("my-name"), is not the same as Materialized.as("my-name"): Named gives a name to the operation, not to the state store. It can name other internal topics, like *-repartition topics, but it does not affect state stores.

More options can be set on a state store; a bigger example would be:

    stream.toTable(
      Named.as("TABLE-table-name"),
      Materialized
        .as[K, V, ByteArrayKeyValueStore]("TABLE-table-name")
        .withKeySerde(keySerde)
        .withValueSerde(valueSerde)
        .withStoreType(storeType)
    )

The same problem happens with other operations like aggregate.

Without explicit naming, Kafka Streams generates names like:

KSTREAM-AGGREGATE-STATE-STORE-0000000003

The corresponding changelog topic would be:

my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog

After a re-deployment, if the state store name changes, this will lead to wrong aggregation results.
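
An aggregation store can be pinned the same way. A sketch, with a hypothetical orders topic and store name (imports and builder as in the earlier examples):

    val orders: KStream[String, Long] = builder.stream[String, Long]("orders") // hypothetical topic
    val totals: KTable[String, Long] = orders
      .groupByKey
      .aggregate(0L)((_, amount, total) => total + amount)(
        // Pins the changelog topic to <application.id>-AGGREGATE-orders-total-changelog
        Materialized.as[String, Long, ByteArrayKeyValueStore]("AGGREGATE-orders-total")
      )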

tip

Use descriptive names that clearly indicate what the state store contains. This makes debugging, monitoring, and maintenance much easier. For example, add your operation to the name, like TOTABLE or AGGREGATE, plus any additional context that helps you understand the purpose of the state store.

Benefits of Named State Stores

We could list several benefits, but among them, the most important one is: You won't lose data silently.

TL;DR

  • Always name your state stores, or you'll lose data without knowing it.

If you think this might already be happening to you, check all the topics named like *-changelog in your Kafka cluster and look for any with outdated data: that could indicate a state store was renamed at some point, leaving one *-changelog topic unused and another with less data than expected.
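
A quick way to sketch that check programmatically with the Kafka AdminClient (the broker address is a placeholder):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
    import scala.jdk.CollectionConverters._

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    val admin = AdminClient.create(props)
    // Every changelog topic in the cluster; an orphaned one that no longer
    // receives writes can be a sign that a state store was renamed.
    admin.listTopics().names().get().asScala
      .filter(_.endsWith("-changelog"))
      .foreach(println)
    admin.close()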