Kafka for large data packets

The event streaming platform has become indispensable for many large companies, but can it really hold up as a solution for every problem?

Author: Patrick Nick

Like any technology, however, Kafka has its limitations, one of which is the default maximum message size of 1 MB. Although this is only a default setting, it should not be changed lightly.

A quick Google search does not make it easy to find out how to handle large messages in Kafka, or what the advantages and disadvantages of the different approaches are. For this reason, we decided to put together a guide with best practices and give the shortest possible answer to the question.

In case you are not interested in the technical details, here is the result of my research right away:

There are 3 options:

  1. Adjust the default setting
    Since even a few larger messages (>10 MB) can severely degrade the performance of the cluster, this is not recommended.
  2. Divide the data packets into smaller units
    This is possible, but it significantly increases both complexity and memory requirements for the consumer.
  3. Send only references to the data to the Kafka cluster and store the data packets themselves in a separate data store.
    This is the recommended approach for the majority of use cases. It only slightly increases the complexity of the applications and preserves all the features that make Kafka so interesting.

Below, I go into more detail on the different approaches and how they can be implemented.

1. Adjusting the default setting (message.max.bytes)

It is technically possible to lift the default limit by overriding the relevant configurations. Four settings must be adjusted:

  1. Producer: max.request.size
  2. Broker: replica.fetch.max.bytes
  3. Broker: message.max.bytes
  4. Consumer: fetch.message.max.bytes

After adjusting these four settings, you can technically process messages up to the newly set limit. However, you will quickly run into performance problems: from around 10 MB, the degradation becomes clearly noticeable (1). You can counteract this by drastically increasing the heap space, but there are no good rules of thumb for how much is needed, and it affects every component that comes into contact with the large messages: producer, consumer and broker.
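
For illustration, here is a minimal sketch in Java of the client-side half of such a change, assuming an illustrative limit of 10 MB. The broker-side settings belong in the broker configuration (server.properties) and are shown as comments only; note that fetch.message.max.bytes applies to the old consumer, while the newer Java consumer uses max.partition.fetch.bytes.

    import java.util.Properties;

    // Minimal sketch: raising the size limit to an illustrative 10 MB.
    // Broker settings are not Java code; they live in server.properties
    // and are shown here as comments only.
    public class LargeMessageConfig {
        public static void main(String[] args) {
            String limit = Integer.toString(10 * 1024 * 1024); // 10 MB, an illustrative value

            Properties producerProps = new Properties();
            producerProps.put("max.request.size", limit);

            Properties consumerProps = new Properties();
            // Newer Java consumer; the old consumer used fetch.message.max.bytes.
            consumerProps.put("max.partition.fetch.bytes", limit);

            // server.properties (broker):
            //   message.max.bytes=10485760
            //   replica.fetch.max.bytes=10485760
        }
    }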

2. Dividing data packets into smaller units

This is an obvious approach, since no additional technology is needed and performance therefore depends solely on the Kafka cluster.
 
However, only in the special case where a single producer with idempotence enabled writes to each partition are the segments of a message guaranteed to be consecutive (for the lifetime of that producer). Under these circumstances, the implementation is comparatively simple, and the consumer needs at most as much memory as the maximum message size.
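
As a point of reference, enabling idempotence on the producer is a single configuration switch. A minimal sketch of such a producer, with the broker address as a placeholder:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    // Minimal sketch of an idempotent producer; "localhost:9092" is a placeholder.
    public class IdempotentProducerFactory {
        public static KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // No duplicates and stable ordering per partition within a producer session.
            props.put("enable.idempotence", "true");
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            return new KafkaProducer<>(props);
        }
    }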

For more general scenarios, this procedure requires some adjustments to the producer, as well as additional logic and increased memory requirements for the consumer.
 
The additional complexity arises from the fact that, in the general case, the segments of a message are not necessarily written to the log one after the other, but can be interleaved with segments of other messages. We explain what this means for the producer and the consumer below.
 
It is also possible for the first part of a message to arrive after a later part, if its initial transmission failed and it had to be retried by the producer.

interrupting_segements.svg
Figure 1: The first two segments of message 1 are interrupted by a segment of message 2

Producer:

The producer splits the message into segments and attaches additional metadata to them. First, a segment counter is needed so that the consumer can assemble the segments in the correct order. Second, a message ID is needed so that each segment can be assigned to its message. Third, the total number of segments and a checksum should be sent along so that the message can be reassembled easily and verified for consistency.

Since the producer only confirms the message as sent (by invoking the producer callback) once the last segment has been successfully transmitted, it is important that this message ID can be derived deterministically from the message itself. The reason becomes apparent when a producer crashes after it has already sent several segments.

producer_fault.svg
Figure 2: The Producer crashes before it has been able to transmit all segments of a message.

If another producer then takes over, it cannot know which segments have already been sent to the cluster, so it transmits the entire message again. This means that the first segments are sent twice, and the consumer must be able to detect this. An ID derived from the message content makes that possible; with a random UUID, it would not be.

producer_resume.svg
Figure 3: The producer that takes over does not know which segments have already been transmitted and therefore starts from the beginning. As a result, the first segments are sent twice.
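
To make the producer side concrete, here is a minimal sketch in Java. The header names ("msg-id", "segment-no", "segment-count"), the segment size and the use of a SHA-256 hash as both message ID and checksum are assumptions for illustration; any scheme works as long as producer and consumer agree on it.

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch of a segmenting producer. Header names and segment size are
    // illustrative assumptions; the SHA-256 hash serves as a deterministic
    // message ID and doubles as a checksum.
    public class SegmentingProducer {
        private static final int SEGMENT_SIZE = 900 * 1024; // stay below the 1 MB default, incl. overhead

        public static void send(KafkaProducer<byte[], byte[]> producer,
                                String topic, byte[] key, byte[] message) throws Exception {
            // Deterministic ID: a producer that takes over after a crash derives the same ID again.
            byte[] msgId = MessageDigest.getInstance("SHA-256").digest(message);
            int segmentCount = (message.length + SEGMENT_SIZE - 1) / SEGMENT_SIZE;

            for (int i = 0; i < segmentCount; i++) {
                int from = i * SEGMENT_SIZE;
                int to = Math.min(from + SEGMENT_SIZE, message.length);
                ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(topic, key, Arrays.copyOfRange(message, from, to));
                record.headers().add("msg-id", msgId);
                record.headers().add("segment-no", ByteBuffer.allocate(4).putInt(i).array());
                record.headers().add("segment-count", ByteBuffer.allocate(4).putInt(segmentCount).array());
                // Simplification: send synchronously; the message counts as sent
                // only once the last segment has been acknowledged.
                producer.send(record).get();
            }
        }
    }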

Consumer:

Since the segments of a message can be interleaved with segments of other messages, the consumer must buffer the segments of subsequent messages until it has completely read the message from which it first received a segment. Only when all segments of a message have been reassembled into the original message can it be processed.
The consumer offset is then only advanced to the offset of the oldest segment of the messages remaining in the buffer.

consumer_offset.svg
Figure 4: All segments are buffered in the consumer until the message from which a segment was read first is complete. Then the consumer offset is updated to the oldest segment remaining in the buffer.
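
A minimal sketch of such a reassembly buffer, reusing the hypothetical header names from the producer sketch above; offset handling is reduced to exposing the offset that is safe to commit.

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Headers;

    // Sketch of the segment buffer on the consumer side. Header names match
    // the hypothetical producer sketch above; duplicates simply overwrite
    // the segment that is already buffered.
    public class SegmentBuffer {
        private static class Partial {
            long firstOffset;
            int segmentCount;
            final Map<Integer, byte[]> segments = new TreeMap<>();
        }

        private final Map<String, Partial> open = new HashMap<>();

        // Returns the reassembled message if this segment completes one, otherwise null.
        public byte[] add(ConsumerRecord<byte[], byte[]> record) {
            Headers h = record.headers();
            String msgId = Base64.getEncoder().encodeToString(h.lastHeader("msg-id").value());
            int segmentNo = ByteBuffer.wrap(h.lastHeader("segment-no").value()).getInt();
            int segmentCount = ByteBuffer.wrap(h.lastHeader("segment-count").value()).getInt();

            Partial p = open.computeIfAbsent(msgId, id -> new Partial());
            if (p.segments.isEmpty()) p.firstOffset = record.offset();
            p.segmentCount = segmentCount;
            p.segments.put(segmentNo, record.value());
            if (p.segments.size() < p.segmentCount) return null;

            // All segments are present: reassemble in segment order (TreeMap keeps the order).
            open.remove(msgId);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (byte[] part : p.segments.values()) out.write(part, 0, part.length);
            return out.toByteArray();
        }

        // Offset that is safe to commit: the oldest segment still sitting in the
        // buffer, so that a restart re-reads everything not yet reassembled.
        public OptionalLong oldestBufferedOffset() {
            return open.values().stream().mapToLong(x -> x.firstOffset).min();
        }
    }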

To ensure that the consumer works reliably, the segment buffer in the consumer must never grow larger than the available memory. The memory requirement can be reduced by processing whichever message is completed first, rather than the message from which a segment was received first, as described above. However, the consumer offset handling then becomes massively more complex, and the procedure has further limitations (2).

It is also possible to build the consumer in such a way that the memory requirement is limited to the size of the original messages. However, this requires a multi-level consumer with even more logic. If you would like to know more about this, please feel free to contact me personally.

3. Processing references instead of data

The safest and simplest approach is to store the data in an external store and to pass only a reference to the externally stored data through Kafka. The consumer reads the references and retrieves the data from the external storage. The external storage must, of course, be fault-tolerant and fast, since writing to and reading from it becomes the limiting factor in this setup. It is therefore advisable to use a cloud storage service that is highly available and supports parallel writing and reading.

reference_processing.svg
Figure 5: Data is written to external storage by the producer and read from there by the consumer
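
Sketched in Java, the pattern (sometimes called the claim check pattern) looks roughly as follows; BlobStore is a hypothetical wrapper around whatever external storage is used (for example an S3 or GCS client), not a real library interface.

    import java.net.URI;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical abstraction over the external storage; only the reference
    // (a URI) travels through Kafka.
    interface BlobStore {
        URI put(byte[] data);      // upload, returns a stable reference
        byte[] get(URI reference); // download by reference
    }

    public class ReferenceProcessing {
        // Producer side: store the payload first, then publish only the reference.
        static void produce(KafkaProducer<String, String> producer,
                            BlobStore store, String topic, byte[] payload) {
            URI ref = store.put(payload);
            producer.send(new ProducerRecord<>(topic, ref.toString()));
        }

        // Consumer side: resolve the reference back into the payload.
        static byte[] consume(ConsumerRecord<String, String> record, BlobStore store) {
            return store.get(URI.create(record.value()));
        }
    }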

This leaves the question of when the data in the external storage should be deleted. If only one consumer reads from a partition, the data can be deleted immediately after it has been read and the consumer offset has been updated. Otherwise, it is recommended to give the data the same retention time as the topics that hold the references.
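
A minimal sketch of the second variant: the reference topic is created with an explicit retention time, and the lifecycle or expiry rule of the external storage would then be set to the same duration. The topic name, sizing and the seven-day retention are illustrative values.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    // Sketch: create the reference topic with an explicit retention time.
    // The external storage's expiry rule should be set to the same duration.
    public class ReferenceTopicSetup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                NewTopic topic = new NewTopic("data-references", 6, (short) 3) // illustrative sizing
                        .configs(Map.of("retention.ms", Long.toString(7L * 24 * 60 * 60 * 1000)));
                admin.createTopics(Collections.singletonList(topic)).all().get();
            }
        }
    }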

Recommendation depending on the application

Changing the default maximum message size of 1 MB only makes sense if the messages are just slightly larger, i.e. still below 10 MB. Performance and load tests should also be used to determine whether, and how many, additional resources (heap space, etc.) are needed.

Segmenting the messages into smaller packets is recommended if you want to get by without additional technology and are willing to invest in more complex consumers. At the same time, the nature of the producers should be taken into account: if it is likely that many large messages will be sent at the same time, it may be better to switch to another pattern.

In all other cases, it is recommended to store the data externally and only process the references to the data with Kafka.

Sources

  1. See the Stack Overflow article

  2. More on this topic can be found here: Presentation by Jiangjie Qin