Kafka für grosse Datenpakete

Die Event-Streaming-Plattform ist bei den meisten grossen Firmen nicht mehr wegzudenken, aber kann sie wirklich als Lösung für alle Problemstellungen hinhalten?

Autor: Patrick Nick

Wie jede Technologie hat aber auch Kafka seine Limitationen - eine davon ist die maximale Paketgrösse von 1 MB. Dies ist zwar nur eine Standardeinstellung, sollte aber nicht ohne Weiteres geändert werden.

Wenn man danach googelt, ist es nicht einfach herauszufinden, wie man die Problematik der grossen Pakete lösen kann und was die Vor- und Nachteile der verschiedenen Ansätze sind. Aus diesem Grund haben wir uns entschlossen, eine Anleitung mit Best Practices zusammenzutragen und eine möglichst kurze Antwort auf die Frage zu geben.

Falls Sie die technischen Details nicht interessieren, nehme ich das Resultat meiner Recherchen gleich vorweg:

Es gibt 3 Möglichkeiten:

  1. Die Standardeinstellung anpassen
    Da schon bei wenigen grösseren Messages (>10 MB) die Performance des Clusters empfindlich gestört werden kann, ist dies nicht zu empfehlen..
  2. Die Datenpakete in kleinere Einheiten unterteilen
    Dies ist möglich, erhöht aber sowohl Komplexität wie auch den Bedarf an Memory beim Consumer deutlich.
  3. Nur Referenzen auf die Daten an den Kafka-Cluster schicken und die Datenpakete in einem anderen Gefäss speichern
    Dies ist für die Mehrheit der Use Cases die empfohlene Herangehensweise. Es erhöht die Komplexität der Anwendungen nur geringfügig und erhält alle Eigenschaften, die Kafka so interessant machen.

Im Folgenden gehe ich detailliert auf die verschiedenen Lösungsansätze ein und wie sie umgesetzt werden können.

1.    Standardeinstellung anpassen (message.max.bytes)

Es ist technisch möglich, die Standardlimiten auszuhebeln indem man Konfigurationen überschreibt. Es müssen vier Einstellungen angepasst werden:

  1. Producer: max.request.size
  2. Broker: replica.fetch.max.bytes
  3. Broker: message.max.bytes
  4. Consumer: fetch.message.max.bytes

Nachdem man diese vier Einstellungen angepasst hat, kann man technisch gesehen Nachrichten bis zum neu gesetzten Limit verarbeiten. Allerdings wird man schnell Performanceprobleme feststellen. Ab 10 MB werden Beeinträchtigungen deutlich merkbar (1). Man kann der Problematik entgegenwirken, indem man den Heapspace drastisch erhöht. Allerdings gibt es dafür keine guten Faustregeln. Dazu kommt noch, dass es alle Komponenten betrifft welche mit den Datenpaketen in Berührung kommen: Producer, Consumer und Broker.

2.    Datenpakete in kleine Einheiten unterteilen

Dies ist eine naheliegende Herangehensweise, da man keine zusätzlichen Technologien braucht und somit die Performance allein vom Kafka-Cluster abhängt.
 
Aber einzig beim Spezialfall, dass nur ein einzelner Producer mit aktivierter Idempotenz pro Partition schreibt, sind die Segmente einer Message garantiert konsekutiv (für die Lebenszeit eines Producers). Unter diesen Umständen ist die Umsetzung vergleichsweise simpel und erfordert vom Consumer höchstens so viel Memory, wie eine Message gross sein kann.

Für generellere Szenarios bedingt dieses Vorgehen einige Anpassungen am Producer, sowie zusätzliche Logik und erhöhten Memory-Bedarf beim Consumer.
 
Die zusätzliche Komplexität entsteht dadurch, dass im Normalfall die Segmente einer Nachricht nicht notwendigerweise hintereinander ins Log geschrieben, sondern durch Segmente anderer Messages unterbrochen werden können. Diese Situation werden wir im folgenden für den Producer und den Consumer mit etwas mehr Detail darlegen.
 
Weiter ist es auch möglich, dass der erste Teil einer Nachricht nach einem späteren Teil ankommen könnte wenn er zuerst fehlschlug und vom Producer wiederholt werden musste.

interrupting_segements.svg
Abbildung 1: Die ersten zwei Segmente der Message 1 werden von einem Segment der Message 2 unterbrochen

Producer:

Der Producer zerteilt die Message in Segmente und versieht diese mit zusätzlichen Metadaten. Einerseits braucht es einen Segment-Counter, damit der Consumer die Segmente in der richtigen Reihenfolge zusammensetzen kann. Anderseits braucht es eine Message-ID, damit die Segmente einer Message zugeordnet werden können. Und drittens sollte man darauf achten, dass man die Anzahl der Segmente und eine Checksumme mitschickt, damit das Ganze einfach und konsistent wieder zusammengesetzt werden kann.

Da der Producer das Versenden der Message durch das Aufrufen des Producer Callbacks erst bestätigt, nachdem das letzte Segment erfolgreich übermittelt wurde, ist es wichtig, dass diese ID aus der Message selbst deterministisch abgeleitet werden kann. Das wird für den Fall gemacht, dass ein Producer abstürzt, nachdem er schon einige Segmente versendet hat.

producer_fault.svg
Abbildung 2: Der Producer stürzt ab, bevor er alle Segmente einer Message übertragen konnte

Wenn dann ein anderer Producer übernimmt, kann er nicht wissen, welche Segmente schon an den Cluster geschickt wurden. Daher übermittelt er die gesamte Message nochmals. Somit werden die ersten Segmente doppelt gesendet. Dies muss vom Consumer festgestellt werden können. Durch eine ID, die aus der Message abgeleitet wird, kann das gewährleistet werden. Im Fall einer zufälligen UUID wäre das hingegen nicht möglich.

producer_resume.svg
Abbildung 3: Der Producer, der übernimmt, weiss nicht, welche Segmente schon übertragen wurden und beginnt daher von vorne. Dadurch werden die ersten Segmente doppelt versendet.

Consumer:

Da die Segmente einer Message durch Segmente einer anderen Message unterbrochen werden können, muss der Consumer die Segmente der nachfolgenden Messages solange buffern, bis er die Message, von der er als erstes ein Segment erhalten hat, vollständig gelesen hat. Erst wenn alle Segmente einer Message wieder zur ursprünglichen Nachricht zusammengefügt sind, kann diese verarbeitet werden.
Auch der Consumer Offset wird erst dann aufdatiert und das auf den Offset des ältesten Segments der im Buffer verbleibenden Messages.

consumer_offset.svg
Abbildung 4: Alle Segmente werden solange im Consumer gebuffert, bis die Message, von der als Erstes ein Segment gelesen wurde, komplett ist. Dann wird der Consumer Offset bis zum ältesten im Buffer verbliebenen Segments aufdatiert.

Will man sicherstellen, dass der Consumer zuverlässig arbeitet, darf der Segment-Buffer im Consumer nie grösser werden als das verfügbare Memory. Der Bedarf an Memory kann vermindert werden, indem man die Message als erstes verarbeitet, welche als Erstes komplett ist. Das heisst, nicht wie oben beschrieben, die Message verarbeiten, von welcher man als Erstes ein Segment erhalten hat. Allerdings ist dann das Consumer Offset Handling massiv komplexer und das Vorgehen hat weitere Limitationen (2).

Man kann den Consumer auch so bauen, dass der Memory-Bedarf auf die Grösse der ursprünglichen Nachrichten beschränkt ist. Allerdings erfordert das dann einen mehrstufigen Consumer mit noch mehr Logik. Wenn Sie dazu mehr erfahren möchten, können Sie mich gerne persönlich kontaktieren.

3.    Referenzen statt Daten verarbeiten

Das sicherste und einfachste Vorgehen ist, die Daten in einen externen Speicher abzulegen und lediglich die Referenz auf die extern gespeicherten Daten mit Kafka zu verarbeiten. Der Consumer liest die Referenzen zu den Daten und holt diese beim externen Speicher ab. Der externe Speicher muss natürlich ausfallsicher und schnell sein, da in diesem System das Schreiben und Lesen im externen Speicher der limitierende Faktor ist. Daher empfiehlt es sich, ein Cloud Storage zu verwenden, welches hochverfügbar ist und paralleles Schreiben und Lesen erlaubt.

reference_processing.svg
Abbildung 5: Die Daten werden vom Producer in einen externen Speicher geschrieben und vom Consumer von dort gelesen

Es stellt sich noch die Frage, wann die Daten im externen Speicher gelöscht werden sollen. Wenn nur ein Consumer von einer Partition liest, können die Daten gleich nach dem Lesen, bzw. dem Aufdatieren des Consumer Offsets gelöscht werden. Andernfalls empfiehlt es sich, dieselbe Retention Time für die Daten zu wählen, wie für die Topics, wo die Referenzen liegen.

Empfehlung je nach Anwendungsfall

Die Standardeinstellung der maximal 1 MB grossen Pakete zu verändern, macht nur Sinn, wenn die Pakete geringfügig grösser sind – weniger als 10 MB. Mit Performance- und Load-Tests muss bei dem Vorgehen zudem getestet werden, ob und wie viel zusätzliche Ressourcen (Heapspace etc.) benötigt werden.

Das Segmentieren der Nachrichten in kleinere Pakete empfiehlt sich, falls man ohne zusätzliche Technologien auskommen will und bereit ist, in komplexere Consumers zu investieren. Gleichzeitig sollte man hier beachten, wie die Produzenten geartet sind. Wenn es sehr wahrscheinlich ist, dass viele grosse Nachrichten gleichzeitig versandt werden, sollte man gegebenenfalls auf ein anderes Muster ausweichen.

In allen anderen Fällen ist es zu empfehlen, die Daten extern abzulegen und lediglich die Referenzen zu den Daten mit Kafka zu verarbeiten.

 

Quellen

  1.  Siehe Stackoverflow Beitrag
  2.  Mehr zu diesem Thema finden Sie hier: Präsentation von Jiangjie Qin