Aggregation
In a Sink Connector scenario, there are, sometimes, use cases where an end user want to aggregate his Kafka record before sending them to an external system. Usually this can be done by defining a batch size or a timeout and once the aggregation has been completed, sent the aggregate records collection to the external system.
In Apache Camel it exists the Aggregate EIP implementation and in Camel-Kafka-connector we wanted to leverage what we already have in the plain Apache Camel project.
We introduced then the following options in the Sink Connector Configuration:
camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
camel.aggregation.size=10
camel.aggregation.timeout=5000
So you’ll be able to define your own AggregationStrategy by writing it through extending the AggregationStrategy Camel class or you can use one of the Aggregator provided by Camel.
We are working on adding some Aggregator out of the box in camel-kafka-connector.
How Does an aggregator look like?
An Aggregator is something like the following:
package org.apache.camel.kafkaconnector.aggregator;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
public class StringAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// lets append the old body to the new body
if (oldExchange == null) {
return newExchange;
}
String body = oldExchange.getIn().getBody(String.class);
if (body != null) {
Message newIn = newExchange.getIn();
String newBody = newIn.getBody(String.class);
if (newBody != null) {
body += System.lineSeparator() + newBody;
}
newIn.setBody(body);
}
return newExchange;
}
}
And you may think oldExchange and newExchange like records arriving to the Aggregator.
So in this case each newExchange body will be concatenated with the oldExchange body and separated through the System line separator.
This process will go ahead until the batch size has been completed or the timeout has been reached.
How Can I extend an existing connector for adding an aggregator?
Take a look at Extending Connectors Documentation