Some kafka retention lessons (we learned the hard way)

We’re using Kafka (2.0 on cluster, Java client 1.1) as our messaging backbone. A different cluster is deployed in each environment, and we recently started seeing some weird behaviour in one of our user acceptance environments.

First Problem: Old messages are suddenly reprocessed.

Once in every few version releases, we suddently saw some services re-processing old messages. After a lot of head banging and tedious head scratching, we found out that in Kafka, the retention on the offsets and the retention on the messages is not necessarily the same.

What does it mean? Well, when a messsage is sent to a specific Kafka topic, it’s retained as long as the topic retention. So, if our topic retention is 1 week, then after 1 week it will no longer be available.

The consumer offset, however, is a different story. The consumers offsets are saved in an internal topic called __consumer_offsets, and their retention time is defined in the parameter offsets.retention.minutes in the broker config with a deafult of 24 hours.

So what happened to us is this: Our messages retention was set to 2 weeks, and the offsets retention was 24 hours. After a period of not using the system, we deployed a new version. Once the new version was up, it queried the Kafka topic for it’s latest offset. However, the consumer_offset of this application id was already deleted, and the default behaviour is to read from the begining of the stream – which is exactly what happened to us: This is why we were consuming old messages when we released new versions, and it would only happen if we released versions after more than 24 hours and less than 2 weeks.

Second Problem: The producer attempted to use a producer id which is not currently assigned to its transactional id

This one was even more annoying. We’re using Kafka streams api, which promises an exactly-once message processing. Every once in a while, we’d get the above error after the message has been proccessed. This would cause the Kafka stream to shut down, the app to restart – and then, to process the same message again(!).

Now, this was extremly weird. First of all, it was a violation of our "exactly-once" constraint. In addition, we had no idea what it means!

Lately we started also seeing what seems to be a related error: org.apache.kafka.common.errors.UnknownProducerIdException: Found no record of producerId=16003 on the broker. It is possible that the last message with the producerId=16003 has been removed due to hitting the retention limit.

So we started thinking – what is happening to our producer ids?

Then we discovered this:

The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer’s transactional ID without receiving any transaction status updates from it.

Type: int

Default: 604800000

604800000 ms is 7 days. So basically, if we’ve had a streaming application that had no traffic for 7 days, it’s producer metadata was deleted – and that’s the behaviour we’ve been seeing: the application consumed the message, processed it – and when it tried to commit the transaction and update the offset, it failed. This is why we processed, crushed, and re-processed.

Bottom line

Kafka is a tool built for massive data streaming, and it’s defaults are organized around it. Both these issued occured because this specific environment’s usage pattern is random and is not corresponding with the cdefault configuration.

How to handle runtime exceptions in Kafka Streams

We’ve been using Kafka Streams (1.1.0, Java) as the backbone of our μ-services architecture. We’ve switched to stream mainly because we wanted the exactly-once processing guarantee.

Lately we’ve been having several runtime exceptions that killed the entire stream library and our μ-service.

So the main question was – is this the way to go? After a few back and forth, we realized that the best way to test this is by checking:

  1. What would Kafka do?
  2. Do we still keep our exactly-once promise?

What does Kafka do?

This document is the Kafka Stream Architecture design. After the description of the StreamThread , StreamTask and StandbyTask, there’s a discussion about Exceptions handling, the gist of which is as follows:

First, we can distinguish between recoverable and fatal exceptions. Recoverable exception should be handled internally and never bubble out to the user. For fatal exceptions, Kafka Streams is doomed to fail and cannot start/continue to process data. […] We should never try to handle any fatal exceptions but clean up and shutdown

So, if Kafka threw a Throwable at us, it basically means that the library is doomed to fail, and won’t be able to process data. In our case, since the entire app is built around Kafka, this means killing the entire μ-service, and letting the deployment mechanism just re-deploy another one automatically.

Do we still keep our exactly-once promise?

Now we’re faced with the question whether or not this behaviour might harm our hard-earned exactly-once guarantee.
To answer that question, we first need to understand when the exactly-once is applicable.

exactly-once is applicable from the moment we’re inside the stream – meaning, our message arrived at the first topic, T1. So everything that happens before that is irrelevant: the producer who pushed the message to T1 in the first time could have failed just before sending it, and the message will never arrive (so not even at-least-once is valid) – so this is something we probably need to handle, but that doesn’t have anything to do with streams.

Now, let’s say our message, M, is already inside topic T1. Hooray!

Now we can either fail before reading it, while processing it, and after pushing it.

  • If we failed before reading it, we’re fine. The μ-service will go up again, will use the same appId, and we’ll read the message.
  • If we read it and failed before we even started processing it, we’ll never send the offset commit, so again, we’re fine.
  • If we failed during processing it, again, we’ll never reach the point of updating the offsets (because we commit the processed message together with the consumer offset – so if one didn’t happen, neither did the other)
  • If we failed after sending it – again, we’re fine: even if we didn’t get the ack, both the consumer offset and the new transformed/processed message are out.

Uncaught Exception Handlers

Kafka has 2 (non-overlapping) ways to handle uncaught exceptions:

  • KafkaStreams::setUncaughtExceptionHandler – this function allows you to register an uncaught exception handler, but it will not prevent the stream from dying: it’s only there to allow you to add behaviour to your app in case such an exception indeed happens. This is a good way to inform the rest of your app that it needs to shut itself down / send message somewhere.

  • ProductionExceptionHandler – You can implement this class and add it via the properties: StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG – but in this case, you will need to decide if the stream can keep going or not, and I feel this requires very deep understanding of the internals of the streams, and I’m still not sure when exactly you would want that.


For us, using k8s deployments, with n number of pods of each service being automatically scaled all the time, the best way to handle runtime/unchecked exceptions is to make sure our app goes down with the Kafka Stream library (using the KafkaStreams::setUncaughtExceptionHandler), and letting the deployment service take care of starting the app again.