Core Java: Parallel, aber richtig – Wie Java-Collectors unter Last bestehen

vor 11 Stunden 1

Manchmal reicht es nicht aus, dass Code funktioniert – er muss auch unter Last funktionieren. In modernen Anwendungen, die große Datenmengen verarbeiten, steht Entwicklerinnen und Entwicklern mit der Streams-API in Java ein elegantes, deklaratives Werkzeug zur Verfügung, um Daten in Pipelines zu transformieren, zu filtern und schließlich zu aggregieren. Die Vorstellung, mit wenigen Zeilen komplexe Datenoperationen zu beschreiben, ist nicht nur verführerisch, sondern tatsächlich realistisch. Doch was passiert, wenn diese Operationen auf Millionen von Einträgen treffen? Wenn die Ausführung in mehreren Threads parallel erfolgen soll, um Zeit zu sparen und Mehrkernsysteme effektiv zu nutzen?

Genau an dieser Stelle rückt ein Konzept in den Vordergrund, das oft zu wenig Beachtung findet: der Collector. Er ist das Element am Ende einer Stream-Pipeline, das bestimmt, was mit den verarbeiteten Daten geschehen soll. Und obwohl die API einfach erscheint – collect(Collectors.toList()) – verbirgt sich dahinter eine Architektur, die in paralleler Ausführung ganz eigene Herausforderungen mit sich bringt.

Im Folgenden geht es daher nicht nur um die Syntax oder die Mechanik von Collectoren, sondern um ein tiefes Verständnis für die Bedingungen, unter denen sie korrekt und effizient zum Einsatz kommen. Wir schauen auf Standardlösungen des JDK (Java Development Kit), diskutieren individuelle Implementierungen, zeigen typische Fehler – und kommen letztlich zu der Frage: Wie viel Parallelisierung verträgt ein Collector, ohne dass es gefährlich wird?

Die Streams-API von Java vermittelt auf den ersten Blick den Eindruck, dass sich das Sammeln von Ergebnissen – das sogenannte terminale Aggregieren – problemlos parallelisieren lässt. Doch hinter der Methode collect(...) verbirgt sich mehr als nur syntaktische Bequemlichkeit. Sie ist eine koordinierte Zusammenarbeit zwischen einem Datenstrom und einem Collector – einem Objekt, das aus Einzelteilen ein Ganzes formt.

Ein Collector besteht im Kern aus vier funktionalen Komponenten: dem supplier, der für jeden Teilprozess einen neuen Zwischenspeicher bereitstellt; dem accumulator, der Elemente in diesen Zwischenspeicher einspeist; dem combiner, der mehrere Zwischenspeicher zu einem zusammenführt; und schließlich dem finisher, der das Endergebnis produziert. Während supplier und accumulator auch in sequenziellen Streams essenziell sind, tritt der combiner erst dann in Aktion, wenn mehrere Threads unabhängig voneinander gesammelt haben – also bei einem parallelStream().

Hier liegt der erste fundamentale Unterschied zwischen sequenzieller und paralleler Verarbeitung: In einem sequenziellen Stream genügt es, schrittweise in einen einzigen Speicher zu akkumulieren. In der parallelen Variante hingegen entstehen mehrere voneinander isolierte Zwischenspeicher, deren Inhalte später konfliktfrei zu einem Endergebnis verschmolzen werden müssen. Dieses Verschmelzen geschieht durch den combiner – und genau an dieser Stelle entscheidet sich, ob ein Collector für parallele Verarbeitung tauglich ist oder nicht.

Die Tauglichkeit hängt von mehreren Eigenschaften ab: Die Operationen müssen assoziativ sein, also unabhängig von der Kombination der Zwischenergebnisse dasselbe Resultat liefern. Zudem darf kein geteilter Zustand ohne Synchronisierung vorliegen. Und nicht zuletzt müssen die einzelnen Schritte deterministisch und frei von Seiteneffekten bleiben – andernfalls wird aus einer Parallelisierung schnell eine Quelle subtiler Fehler.

Das Wissen um diese strukturellen Anforderungen ist der erste Schritt zu einem bewussten Einsatz paralleler Verarbeitung. Denn nur wer verstanden hat, wie Collector und Stream im Zusammenspiel funktionieren, kann abschätzen, wann ein Performancegewinn möglich ist – und wann man sich stattdessen instabile oder schlicht falsche Ergebnisse einhandelt.

Stellen wir uns vor, ein Stream wird parallel ausgeführt – etwa über ein großes Dataset, das in mehrere Segmente aufgeteilt ist. Jedes dieser Segmente wird nun unabhängig verarbeitet. Was trivial klingt, hat tiefgreifende Implikationen: Sobald mehrere Threads gleichzeitig sammeln, dürfen sich deren Zwischenergebnisse nicht in die Quere kommen. Die Verantwortung für die Korrektheit liegt beim Collector – genauer: bei seiner strukturellen und funktionalen Ausgestaltung.

Die erste grundlegende Eigenschaft ist Assoziativität. Ein combiner-Aufruf muss unabhängig von der Reihenfolge konsistente Ergebnisse liefern. combine(a, b) und combine(b, a) müssen äquivalente Resultate erzeugen. Das ist notwendig, weil die Reihenfolge der Kombination in einem parallelen Kontext vom Scheduler abhängt – und somit unvorhersagbar ist.

Der zweite Punkt betrifft den Zugriff auf Speicherstrukturen. Sobald ein Collector während der Akkumulation einen gemeinsamen, veränderbaren Zustand nutzt – etwa eine nicht synchronisierte Liste oder Map – entsteht ein potenzieller Hotspot für Race Conditions. Der Collector muss entweder ausschließlich mit lokalen, thread-isolierten Zwischenspeichern arbeiten oder sich auf nebenläufige Datenstrukturen stützen, wie etwa ConcurrentHashMap, LongAdder oder explizit synchronisierte Wrapper.

Darüber hinaus ist auch Determinismus ein wesentliches Kriterium: Eine parallele Ausführung darf nicht zu unterschiedlichen Ergebnissen führen – weder inhaltlich noch strukturell. Insbesondere bei ungeordneten Strukturen wie HashSet oder HashMap ist Vorsicht geboten, da die Iterationsreihenfolge variieren kann – was bei Collectors.joining() oder Collectors.toMap() problematisch wird, wenn die Anwendung auf Ordnung angewiesen ist.

Die drei Anforderungen Assoziativität, isolierter Zustand und Determinismus bilden den technischen Prüfstein für parallele Collectoren. Sie sind nicht optional, sondern grundlegend. Wer sie ignoriert, riskiert schwer zu reproduzierende Fehler, unvollständige Ergebnisse oder performante, aber semantisch falsche Ausgaben.

Beispiele aus der Java-Standardbibliothek: Ein naheliegender Weg, um das abstrakte Konzept paralleler Collectoren greifbar zu machen, führt über die bereits in der Java-Standardbibliothek enthaltenen Collectors. Viele Entwickler nutzen Collectors.toList(), toSet() oder joining() nahezu täglich – selten jedoch im Wissen darum, ob und wie sich diese Collectoren in einem parallelen Kontext verhalten.

Ein einfaches Beispiel: Der Collector Collectors.toList() nutzt intern eine ArrayList. Diese ist nicht thread-sicher. Folglich ist das Ergebnis bei paralleler Verwendung potenziell inkonsistent, sofern nicht intern für Isolation der Zwischenspeicher gesorgt ist.

public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>(ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID); }

Tatsächlich funktioniert dieser Collector in parallelen Streams dennoch korrekt, weil die Streams-API jedem Thread seinen eigenen Akkumulationsbereich zuteilt und erst am Ende über einen kombinierten Merge-Prozess zusammenführt. Der entscheidende Punkt liegt also nicht in der Datenstruktur selbst, sondern in ihrer kontrollierten Isolierung.

Weniger robust zeigt sich Collectors.groupingBy(...). Diese Variante basiert auf einer HashMap, die nicht für gleichzeitigen Zugriff ausgelegt ist. Wird dieser Collector ohne Schutzmaßnahmen in einem parallelStream() eingesetzt, drohen Race Conditions. Die Standardlösung dafür lautet Collectors.groupingByConcurrent(...), die intern auf ConcurrentHashMap setzt und somit für gleichzeitigen Zugriff konzipiert ist.

public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier) { return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList()); }

Ein Blick auf die Signatur dieser Methode zeigt bereits die Intention:

Map<Integer, List<String>> result = namen.parallelStream() .collect(Collectors.groupingByConcurrent(String::length));

In diesem Beispiel werden Strings nach ihrer Länge gruppiert – in einer parallel verarbeitbaren Weise. Entscheidend ist, dass sowohl die Map-Implementierung als auch der Akkumulationsprozess thread-safe sind.

Ebenso interessant ist Collectors.toConcurrentMap(...), der explizit dafür vorgesehen ist, große Mengen von Key-Value-Paaren parallel zu aggregieren. Hier ist die Kombination von Schlüsselkonflikten und der richtige Umgang mit Merge-Funktionen von besonderem Interesse.

Die Erkenntnis aus diesen Beispielen lautet: Nicht jeder Standard-Collector ist per se für Parallelität geeignet. Nur weil eine Methode aus dem Collectors-Baukasten stammt, bedeutet das nicht, dass sie in jeder Ausführungskonfiguration korrekt funktioniert. Der Kontext entscheidet – und mit ihm die verwendete Datenstruktur, das Verhalten des combiner und die Art der Akkumulation.

Wer also aus einem Stream nicht nur ein beliebiges Ergebnis, sondern ein korrektes und performantes Ergebnis ziehen will, sollte die Wahl seines Collectors ebenso sorgfältig treffen wie das Filterkriterium am Anfang der Pipeline.

So mächtig die vorgefertigten Collectors der Java-Standardbibliothek auch sein mögen, manchmal reichen sie für spezifische Anforderungen nicht aus. Besonders wenn domänenspezifische Aggregationen, spezialisierte Datenstrukturen oder nicht-triviale Reduktionslogik benötigt werden, lohnt sich ein Blick auf die Möglichkeit, eigene Collector-Implementierungen zu erstellen.

In der Regel lässt sich ein eigener Collector mit der statischen Methode Collector.of(...) erstellen. Diese Methode erwartet fünf Parameter: einen Supplier<A>, der einen neuen Akkumulator erzeugt; einen BiConsumer<A, T>, der ein Element in den Akkumulator einfügt; einen BinaryOperator<A> zum Kombinieren zweier Akkumulatoren; optional eine Function<A, R> zur Konvertierung des Ergebnisses; und schließlich ein Array Collector.Characteristics..., das Metainformationen wie CONCURRENT oder UNORDERED bereitstellt.

Ein einfacher, aber aussagekräftiger Collector könnte etwa Zeichenketten parallel zu einer ConcurrentLinkedQueue sammeln:

Collector<String, ?, Queue<String>> toConcurrentQueue() { return Collector.of( ConcurrentLinkedQueue::new, Queue::add, (left, right) -> { left.addAll(right); return left; }, Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED ); }

Dieser Collector ist sowohl CONCURRENT als auch UNORDERED, das bedeutet: Er kann von mehreren Threads gleichzeitig beschrieben werden, ohne dass die Einfügereihenfolge garantiert werden muss. Wichtig ist dabei, dass ConcurrentLinkedQueue als thread-sichere Datenstruktur fungiert und die Operation addAll ebenfalls nebenläufig unkritisch ist.

Doch auch komplexere Szenarien sind denkbar, etwa das parallele Ermitteln von statistischen Kennzahlen (Minimum, Maximum, Durchschnitt) über eine Datenmenge. In solchen Fällen kann ein record als Akkumulatorstruktur dienen, der in sich bereits alle benötigten Teilzustände kapselt. Der combiner muss dann lediglich diese Strukturen feldweise konsolidieren.

Eigene Collector-Implementierungen zwingen dazu, sich mit der Parallelisierbarkeit der genutzten Datenstrukturen und der Kombinierbarkeit der Aggregationslogik intensiv auseinanderzusetzen. Das ist kein Nachteil, sondern ein wertvoller Lerneffekt. Denn nur wer versteht, was ein Collector im Inneren macht, kann ihn bewusst und sicher einsetzen.

Wer Collectoren im Parallelisierungskontext produktiv einsetzen möchte, sollte einige bewährte Strategien berücksichtigen – nicht als starre Regeln, sondern als Orientierungsrahmen für robuste und effiziente Implementierungen.

Ein erster Grundsatz lautet: Nur parallelisieren, wenn ein echter Nutzen zu erwarten ist. Kleine Datenmengen, triviale Transformationen oder IO-gebundene Prozesse profitieren in der Regel nicht von parallelStream(). Im Gegenteil: Der Overhead des Thread-Managements kann den potenziellen Performancegewinn sogar übersteigen. Eine Parallelisierung lohnt sich erst dann, wenn die zu verarbeitenden Datenmengen hinreichend groß und die Operationen CPU-intensiv sind.

Zweitens: Nur thread-sichere oder isolierte Datenstrukturen verwenden. Das bedeutet entweder, dass jeder Thread seinen eigenen Akkumulator nutzt – was die Streams-API intern unterstützt – oder dass explizit nebenläufige Datenstrukturen wie ConcurrentHashMap, ConcurrentLinkedQueue oder atomare Wrapper eingesetzt werden.

Drittens: Collectors gezielt auswählen. Die Standardbibliothek bietet mit groupingByConcurrent, toConcurrentMap oder mapping leistungsfähige Werkzeuge, die speziell für den parallelen Einsatz konzipiert wurden. Wer darüber hinaus eigene Lösungen entwickelt, sollte besonderes Augenmerk auf den combiner und die Assoziativität der Logik legen.

Viertens: Ergebnisse validieren – insbesondere bei neuen oder komplexen Pipelines. Parallele Streams verhalten sich nicht deterministisch in der Ausführung, deshalb sind Tests in unterschiedlichen Auslastungsszenarien und unter variierender Last notwendig. Das gilt vor allem dann, wenn Entwicklerinnen oder Entwickler Collectoren selbst entwickeln oder anpassen.

Und nicht zuletzt: Messen statt vermuten. Tools wie JMH (Java Microbenchmark Harness), Flight Recorder oder async-profiler helfen dabei, realistische Aussagen über die Performancevorteile zu treffen. Parallelisierung ohne Metriken ist wie Blindflug mit Rückenwind – vielleicht schneller, aber womöglich in die falsche Richtung.

(Bild: Playful Creatives / Adobe Stock)

Am 14. Oktober findet die betterCode() Java 2025 statt. Bei der von iX und dpunkt.verlag ausgerichteten Online-Konferenz dreht sich alles um das für September geplante Java 25, das auch als LTS-Release verfügbar sein wird. Außerdem gibt es eine Keynote und ein Panel zu 30 Jahren Java und einen Vortrag zu ML für Java-Anwendungen.

Gesamten Artikel lesen