Large data packets and Kafka

The event streaming platform Kafka is currently much hyped and is often treated as a solution for all kinds of problems.

Author: Anchou Bockhorn

Like any technology, Kafka has its limitations - one of them is the maximum message size of 1 MB. This is only a default setting, but it should not be changed lightly.

If you search for it, it is not easy to find out how to handle large messages in Kafka and what the advantages and disadvantages of the different approaches are. That's why we decided to put together a guide with best practices and to answer the question as concisely as possible.

If you are not interested in the technical details, here is the result of my research up front:

There are three possibilities:

  1. Adjust the default setting
    Since cluster performance can already suffer noticeably with messages of around 10 MB, this is not recommended.
  2. Divide the data packets into smaller units
    This is possible, but it significantly increases the complexity and also the memory requirements of the consumer.
  3. Send only references to the data to the Kafka cluster and store the data packets in another container
    This is the recommended approach for the majority of applications. It only slightly increases the complexity of the application and retains all the features that make Kafka so interesting.

In the following I will go into detail about the different approaches and how they can be implemented.

1.    Adjust default setting (message.max.bytes)

Increasing the value of message.max.bytes alone is not the end of the story. Four settings have to be adjusted:

  1. Producer: max.request.size
  2. Broker: replica.fetch.max.bytes
  3. Broker: message.max.bytes
  4. Consumer: fetch.message.max.bytes
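
To illustrate, here is a minimal sketch of where these settings live, assuming an example limit of 10 MB and a placeholder broker address. The producer and consumer settings go into the client configuration; the two broker settings belong in server.properties and are shown as comments. Note that the modern Java consumer exposes this limit as max.partition.fetch.bytes, the successor of the older fetch.message.max.bytes.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class LargeMessageConfig {

    // Example limit of 10 MB; anything above the 1 MB default has to be
    // mirrored on the producer, broker and consumer side.
    private static final int MAX_MESSAGE_BYTES = 10 * 1024 * 1024;

    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Producer: maximum size of a single request sent to the broker
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MAX_MESSAGE_BYTES);
        return props;
    }

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumer: maximum amount of data fetched per partition
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, MAX_MESSAGE_BYTES);
        return props;
    }

    // Broker side (server.properties), not settable from the client:
    //   message.max.bytes=10485760
    //   replica.fetch.max.bytes=10485760
}
```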

After adjusting these settings, you can technically process messages up to the newly set limit. However, you will quickly notice performance problems. From about 10 MB upwards, the impairments become clearly noticeable (1). You can counteract the problem by drastically increasing the heap space, but there are no good rules of thumb for how much is needed. In addition, it affects several components: Producer, Consumer and Broker.

2.    Divide data packets into small units

This is an obvious approach, since no additional technologies are needed and performance therefore depends solely on the Kafka cluster. However, it requires adjustments to the producers as well as additional logic and increased memory requirements on the consumer side. The additional complexity arises from the fact that the segments of a message are normally not written consecutively in the log, but are interleaved with segments of other messages.

interrupting_segements.svg
Figure 1: The first two segments of message 1 are interrupted by a segment of message 2

Note: Only in the special case where a single idempotent producer writes to a partition are the segments of a message guaranteed to be consecutive. Under these circumstances, the implementation is comparatively simple and the consumer needs at most as much memory as the largest possible message.


Producer:

The producer splits the message into segments and attaches additional metadata to them. First, a segment counter is required so that the consumer can put the segments back together in the correct order. Second, a message ID is needed so that the segments can be assigned to their message. Third, the total number of segments and a checksum should be included so that the whole message can be reassembled easily and consistently.

Since the producer only confirms the message, by invoking the producer callback, after the last segment has been transmitted successfully, it is important that this ID can be derived deterministically from the message itself. This matters in the event that a producer crashes after it has already sent some segments.

producer_fault.svg
Figure 2: The producer crashes before it could transfer all segments of a message

If another producer then takes over, it cannot know which segments have already been sent to the cluster, so it transmits the entire message again. This means that the first segments are sent twice, and the consumer must be able to detect this. An ID derived from the message guarantees that such duplicates can be recognized; with a random UUID, this would not be possible.

producer_resume.svg
Figure 3: The producer that takes over does not know which segments have already been transferred and therefore starts from the beginning. Thus the first segments are sent twice.
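
To make this more concrete, here is a minimal sketch of such a segmenting producer. The assumptions are my own and purely illustrative: the message ID is derived deterministically as a SHA-256 hash of the payload, the metadata travels in record headers, and the header names and segment size are made up for this example rather than being any established convention.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.HexFormat;
import java.util.zip.CRC32;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SegmentingProducer {

    private static final int SEGMENT_SIZE = 900 * 1024; // stays below the 1 MB default

    public static void send(KafkaProducer<String, byte[]> producer,
                            String topic, String key, byte[] payload) throws Exception {
        // Deterministic message ID: the same payload always yields the same ID, so a
        // producer that takes over after a crash produces recognizable duplicates.
        String messageId = HexFormat.of().formatHex(
                MessageDigest.getInstance("SHA-256").digest(payload));

        CRC32 crc = new CRC32();
        crc.update(payload);
        long checksum = crc.getValue();

        int segmentCount = (payload.length + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
        for (int i = 0; i < segmentCount; i++) {
            byte[] segment = Arrays.copyOfRange(payload, i * SEGMENT_SIZE,
                    Math.min((i + 1) * SEGMENT_SIZE, payload.length));
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, segment);
            // Metadata the consumer needs to reassemble the original message
            record.headers()
                  .add("message-id", messageId.getBytes(StandardCharsets.UTF_8))
                  .add("segment-index", ByteBuffer.allocate(4).putInt(i).array())
                  .add("segment-count", ByteBuffer.allocate(4).putInt(segmentCount).array())
                  .add("checksum", ByteBuffer.allocate(8).putLong(checksum).array());
            // Block per segment for simplicity; the message counts as sent only
            // after the last segment has been acknowledged.
            producer.send(record).get();
        }
    }
}
```

Because all segments are sent with the same key, they end up in the same partition; combined with a single idempotent producer per partition, this also yields the consecutive segments mentioned in the note above.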

Consumer:

Since the segments of one message can be interleaved with segments of other messages, the consumer must buffer the segments of subsequent messages until it has completely read the message from which it first received a segment. Only when all segments of a message have been reassembled into the original message can it be processed.
The consumer offset is then advanced only to the offset of the oldest segment of the messages still remaining in the buffer.

consumer_offset.svg
Figure 4: All segments are buffered in the consumer until the message from which a segment was first read is complete. Then the consumer offset is updated up to the oldest segment remaining in the buffer.
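
A minimal sketch of such a reassembly buffer, reusing the illustrative header names from the producer sketch above; checksum verification and offset handling (advancing only to the oldest segment still in the buffer) are omitted for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Collects segments per message ID and hands back the payload once it is complete.
public class SegmentReassembler {

    private static class PartialMessage {
        int segmentCount = -1;
        final Map<Integer, byte[]> segments = new HashMap<>();
    }

    private final Map<String, PartialMessage> buffer = new HashMap<>();

    /** Returns the reassembled payload, or null while the message is still incomplete. */
    public byte[] accept(ConsumerRecord<String, byte[]> record) {
        String messageId = new String(
                record.headers().lastHeader("message-id").value(), StandardCharsets.UTF_8);
        int index = ByteBuffer.wrap(record.headers().lastHeader("segment-index").value()).getInt();
        int count = ByteBuffer.wrap(record.headers().lastHeader("segment-count").value()).getInt();

        PartialMessage partial = buffer.computeIfAbsent(messageId, id -> new PartialMessage());
        partial.segmentCount = count;
        partial.segments.put(index, record.value()); // duplicate segments simply overwrite each other

        if (partial.segments.size() < partial.segmentCount) {
            return null; // keep buffering; segments of other messages may arrive in between
        }
        buffer.remove(messageId);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < count; i++) {
            out.writeBytes(partial.segments.get(i));
        }
        return out.toByteArray();
    }
}
```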

To ensure that the consumer works reliably, its segment buffer must never grow larger than the available memory. The memory requirement can be reduced by processing whichever message is complete first, i.e. not necessarily the message from which a segment was first received, as described above. However, consumer offset handling then becomes much more complex and the procedure has further limitations (2).

You can also build the consumer in such a way that the memory requirement is limited to the size of the original messages. However, this then requires a multi-level consumer with even more logic. If you want to know more about this, please contact me personally.

3. Process references instead of data

The safest and simplest approach is to store the data in external storage and to process only a reference to the externally stored data with Kafka. The consumer reads the reference and retrieves the data from the external store. The external storage must, of course, be fail-safe and fast, since writing to and reading from it is the limiting factor in this system. We therefore recommend using cloud storage that is highly available and allows parallel reads and writes.

reference_processing.svg
Figure 5: The data is written to external storage by the producer and read from there by the consumer
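
As a sketch of this pattern, here is a minimal example assuming a purely hypothetical BlobStore interface as a stand-in for whatever highly available storage you choose (object store, database, ...).

```java
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReferenceProcessing {

    /** Hypothetical abstraction over the external storage. */
    public interface BlobStore {
        void put(String key, byte[] data);
        byte[] get(String key);
    }

    // Producer side: upload the payload first, then send only the reference through Kafka.
    public static void send(KafkaProducer<String, String> producer, BlobStore store,
                            String topic, byte[] payload) {
        String blobKey = "kafka-payloads/" + UUID.randomUUID();
        store.put(blobKey, payload);                          // the large payload goes to external storage
        producer.send(new ProducerRecord<>(topic, blobKey));  // the Kafka message stays tiny
    }

    // Consumer side: resolve the reference back into the actual payload.
    public static byte[] resolve(ConsumerRecord<String, String> record, BlobStore store) {
        return store.get(record.value());
    }
}
```

The Kafka message then carries only the key under which the payload can be found, so the default size limit never becomes an issue.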

There remains the question of when the data in the external storage should be deleted. If only one consumer reads from a partition, the data can be deleted immediately after it has been read and the consumer offset has been committed. Otherwise, it is advisable to give the data the same retention time as the topic that holds the references.

Recommendation depending on the application

Changing the default maximum message size of 1 MB only makes sense if the messages are only slightly larger, i.e. less than about 10 MB. Performance and load tests must also be used to check whether and how many additional resources (heap space etc.) are required.

Segmenting the messages into smaller packages is recommended if you want to get by without additional technologies and are willing to invest in more complex consumers. At the same time, one should take into account the nature of the producers. If it is very likely that many large messages will be sent at the same time, you may need to switch to a different pattern. 

In all other cases it is recommended to store the data externally and to process only the references to the data with Kafka.

 

(1) See Stack Overflow article
(2) You can find more on this topic here: Presentation by Jiangjie Qin