Grosse Datenpakete und Kafka

Die Event-Streaming-Plattform wird zurzeit sehr gehypt und als Lösung für allerlei Problemstellungen gehandelt.

Autor: Anchou Bockhorn

Wie jede Technologie hat aber auch Kafka seine Limitationen - eine davon ist die maximale Paketgrösse von 1 MB. Dies ist zwar nur eine Standardeinstellung, sollte aber nicht ohne Weiteres geändert werden.

Wenn man danach googelt, ist es nicht einfach herauszufinden, wie man die Problematik der grossen Pakete lösen kann und was die Vor- und Nachteile der verschiedenen Ansätze sind. Aus diesem Grund haben wir uns entschlossen, eine Anleitung mit Best Practices zusammen zu tragen und eine möglichst kurze Antwort auf die Frage zu geben.

Falls Sie die technischen Details nicht interessieren, nehme ich das Resultat meiner Recherchen gleich vorweg:

Es gibt 3 Möglichkeiten:

  1. Die Standardeinstellung anpassen
    Da schon bei einigen grösseren Messages (10 MB) die Performance des Clusters beeinträchtigt werden kann, ist dies nicht zu empfehlen.
  2. Die Datenpakete in kleinere Einheiten unterteilen
    Dies ist möglich, erhöht aber die Komplexität deutlich und auch den Bedarf an Memory beim Consumer.
  3. Nur Referenzen auf die Daten an den Kafka-Cluster schicken und die Datenpakete in einem anderen Gefäss speichern
    Dies ist für die Mehrheit der Anwendungsfälle die empfohlene Herangehensweise. Es erhöht die Komplexität der Applikation nur geringfügig und erhält alle Eigenschaften, die Kafka so interessant machen.

Im Folgenden gehe ich detailliert auf die verschiedenen Lösungsansätze ein und wie sie umgesetzt werden können.

1.    Standardeinstellung anpassen (message.max.bytes)

Mit dem Vergrössern des Wertes von message.max.bytes ist es aber noch nicht getan. Es müssen vier Einstellungen angepasst werden: 

  1. Producer: max.request.size
  2. Broker: replica.fetch.max.bytes
  3. Broker: message.max.bytes
  4. Consumer: fetch.message.max.bytes

Nachdem man diese Einstellungen angepasst hat, kann man technisch gesehen Nachrichten bis zum neu gesetzten Limit verarbeiten. Allerdings wird man schnell Performance-Probleme feststellen. Ab 10 MB werden Beeinträchtigungen deutlich merkbar (1). Man kann der Problematik entgegenwirken, indem man den Heapspace drastisch erhöht. Allerdings gibt es dafür keine guten Faustregeln. Dazu kommt noch, dass es verschiedene Komponenten betrifft: Producer, Consumer und Broker.

2.    Datenpakete in kleine Einheiten unterteilen

Dies ist eine nahe liegende Herangehensweise, da man keine zusätzlichen Technologien braucht und somit die Performance allein vom Kafka-Cluster abhängt. Allerdings bedingt dieses Vorgehen einige Anpassungen an Producer, zusätzliche Logik und erhöhten Memory-Bedarf beim Consumer. 
Die zusätzliche Komplexität entsteht dadurch, dass im Normalfall die Segmente einer Nachricht nicht hintereinander ins Log geschrieben, sondern durch Segmente anderer Message unterbrochen werden.
 

interrupting_segements.svg
Abbildung 1: Die ersten zwei Segmente der Message 1 werden von einem Segment der Message 2 unterbrochen

Anmerkung: Einzig beim Spezialfall, dass nur ein einzelner idempotenter Producer pro Partition schreibt, sind die Segmente einer Message konsekutiv. Unter diesen Umständen ist die Umsetzung vergleichsweise simpel und erfordert vom Consumer höchstens so viel Memory, wie eine Message gross sein kann.


Producer:

Der Producer zerteilt die Message in Segmente und versieht diese mit zusätzlichen Metadaten. Einerseits braucht es einen Segment-Counter, damit der Consumer die Segmente in der richtigen Reihenfolge zusammensetzen kann. Anderseits braucht es eine Message-ID, damit die Segmente einer Message zugeordnet werden können. Und drittens sollte man darauf achten, dass man die Anzahl der Segmente und eine Checksumme mitschickt, damit das Ganze einfach und konsistent wieder zusammengesetzt werden kann.

Da der Producer das Versenden der Message durch das Aufrufen des Producer Callbacks erst bestätigt, nachdem das letzte Segment erfolgreich übermittelt wurde, ist es wichtig, dass diese ID aus der Message selbst deterministisch abgeleitet werden kann. Das wird für den Fall gemacht, dass ein Producer abstürzt, nachdem er schon einige Segmente versendet hat.

producer_fault.svg
Abbildung 2: Der Producer stürzt ab, bevor er alle Segmente einer Message übertragen konnte

Wenn dann ein anderer Producer übernimmt, kann er nicht wissen, welche Segmente schon ans Cluster geschickt wurden. Daher übermittelt er die gesamte Message nochmals. Somit werden die ersten Segmente doppelt gesendet. Dies muss vom Consumer festgestellt werden können. Durch eine ID, die aus der Message abgeleitet wird, kann das gewährleistet werden. Im Fall einer zufälligen UUID wäre das hingegen nicht möglich.

producer_resume.svg
Abbildung 3: Der Producer, der übernimmt, weiss nicht welche Segmente schon übertragen wurden und beginnt daher von vorne. Dadurch werden die ersten Segmente doppelt versendet.

Consumer:

Da die Segmente einer Message durch Segmente einer anderen Message unterbrochen werden können, muss der Consumer die Segmente der nachfolgenden Messages solange buffern, bis er die Message, von der er als erstes ein Segment erhalten hat, vollständig gelesen hat. Erst wenn alle Segmente einer Message wieder zur ursprünglichen Nachricht zusammengefügt sind, kann diese verarbeitet werden. 
Auch der Consumer Offset wird erst dann aufdatiert und das auf den Offset des ältesten Segments der im Buffer verbleibenden Messages.

consumer_offset.svg
Abbildung 4: Alle Segmente werden solange im Consumer gebuffert, bis die Message, von der als Erstes ein Segment gelesen wurde, komplett ist. Dann wird der Consumer Offset bis zum ältesten im Buffer verbliebenen Segments aufdatiert.

Will man sicherstellen, dass der Consumer zuverlässig arbeitet, darf der Segment-Buffer im Consumer nie grösser werden als das verfügbare Memory. Der Bedarf an Memory kann vermindert werden, indem man die Message als erstes verarbeitet, welche als Erstes komplett ist. Das heisst, nicht wie oben beschrieben, die Message verarbeiten, von welcher man als Erstes ein Segment erhalten hat. Allerdings ist dann das Consumer Offset Handling massiv komplexer und das Vorgehen hat weitere Limitationen (2).

Man kann den Consumer auch so bauen, dass der Memory-Bedarf auf die Grösse der ursprünglichen Nachrichten beschränkt ist. Allerdings erfordert das dann einen mehrstufigen Consumer mit noch mehr Logik. Wenn Sie dazu mehr erfahren möchten, können Sie mich gerne persönlich kontaktieren.

3.    Referenzen statt Daten verarbeiten

Das sicherste und einfachste Vorgehen ist, die Daten in einen externen Speicher abzulegen und lediglich die Referenz auf die extern gespeicherten Daten mit Kafka zu verarbeiten. Der Consumer liest die Referenzen zu den Daten und holt diese beim externen Speicher ab. Der externe Speicher muss natürlich ausfallsicher und schnell sein, da in diesem System das Schreiben und Lesen im externen Speicher der limitierende Faktor ist. Daher empfiehlt es sich, ein Cloud Storage zu verwenden, welches hochverfügbar ist und paralleles Schreiben und Lesen erlaubt.

reference_processing.svg
Abbildung 5: Die Daten werden vom Producer in einen externen Speicher geschrieben und vom Consumer von dort gelesen

Es stellt sich noch die Frage, wann die Daten im externen Speicher gelöscht werden sollen. Wenn nur ein Consumer von einer Partition liest, können die Daten gleich nach dem Lesen, bzw. dem Aufdatieren des Consumer Offsets gelöscht werden. Andernfalls empfiehlt es sich, dieselbe Retention Time für die Daten zu wählen, wie für die Topic, wo die Referenzen liegen.

Empfehlung je nach Anwendungsfall

Die Standardeinstellung der maximal 1 MB grossen Pakete zu verändern, macht nur Sinn, wenn die Pakete geringfügig grösser sind – weniger als 10 MB. Mit Performance- und Load-Tests muss bei dem Vorgehen zudem getestet werden, ob und wie viel zusätzliche Ressourcen (Heapspace etc.) benötigt werden.

Das Segmentieren der Nachrichten in kleinere Pakete empfiehlt sich, falls man ohne zusätzliche Technologien auskommen will und bereit ist, in komplexere Consumers zu investieren. Gleichzeitig sollte man hier beachten, wie die Produzenten geartet sind. Wenn es sehr wahrscheinlich ist, dass viele grosse Nachrichten gleichzeitig versandt werden, sollte man gegebenenfalls auf ein anderes Muster ausweichen. 

In allen anderen Fällen ist es zu empfehlen, die Daten extern abzulegen und lediglich die Referenzen zu den Daten mit Kafka zu verarbeiten.

 

(1)  Siehe Stackoverflow Beitrag
(2)  Mehr zu diesem Thema finden Sie hier: Präsentation von Jiangjie Qin