Try it out locally

Run Kafka

Install a locally running kafka instance by following Apache Kafka quickstart guide. This usually boils down to:

Set KAFKA_HOME
export KAFKA_HOME=<your kafka install dir>
Start Zookeeper cluster
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Start Kafka broker
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Create "mytopic" topic
$KAFKA_HOME/bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic mytopic

For using the quickstart we’ll use the plugin.path property, so you’ll have to add a path for your connectors.

Open your configuration file located at $KAFKA_HOME/config/connect-standalone.properties

and add a property at the end

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/connectors

At this point you’re able to run the connectors quickstart.

Next, run Camel kafka connectors source and/or sink:

You can use these Kafka utilities to listen or produce from a Kafka topic:

Run an Kafka Consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
Run an interactive CLI kafka producer
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic

Try some examples

For the following examples you need to fetch the camel-kafka-connector project and build it locally. You can either build the entire project with ./mvnw package from the root directory (which may take some time) or build just the connectors you are interested in with the following command from within a connector’s directory.

> cd connectors/camel-log-kafka-connector/
> mvn package -pl camel-timer-kafka-connector -am

Look into the config and docs/examples directories for the configuration files (*.properties) of the examples showcased here. There is also a comprehensive set of examples with instructions on how to run them in this repository.

Simple logger (sink)

Unzip or untar the camel-log-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-log-kafka-connector/target/ a .zip file named camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-log-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip
Run the default sink, just a camel logger:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelLogSinkConnector.properties

Timer (source)

Unzip or untar the camel-timer-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-timer-kafka-connector/target/ a .zip file named camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-timer-kafka-connector/target/camel-log-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-timer-kafka-connector-0.6.0-SNAPSHOT-package.zip

This is an example of a source that produces a message every second to mytopic.

Run the default source, just a camel timer:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTimerSourceConnector.properties

AWS Kinesis (source)

Unzip or untar the camel-aws-kinesis-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-kinesis-kafka-connector/target/ a .zip file named camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-aws-kinesis-kafka-connector/target/camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-kinesis-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example consumes from AWS Kinesis data stream and transfers the payload to mytopic topic in Kafka.

Adjust properties in examples/CamelAWSKinesisSourceConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-kinesis.configuration.access-key=youraccesskey, camel.component.aws-kinesis.configuration.secret-key=yoursecretkey and camel.component.aws-kinesis.configuration.region=yourregion.

Run the AWS Kinesis source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSKinesisSourceConnector.properties

AWS SQS (sink)

Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sqs-kafka-connector/target/ a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example consumes from Kafka topic mytopic and transfers the payload to AWS SQS.

Adjust properties in examples/CamelAWSSQSSinkConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion

Run the AWS SQS sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSinkConnector.properties

AWS SQS (source)

Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sqs-kafka-connector/target/ a .zip file named camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-aws-sqs-kafka-connector/target/camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sqs-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example consumes from AWS SQS queue mysqs and transfers the payload to mytopic topic in Kafka.

Adjust properties in examples/CamelAWSSQSSourceConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sqs.configuration.access-key=youraccesskey, camel.component.aws-sqs.configuration.secret-key=yoursecretkey and camel.component.aws-sqs.configuration.region=yourregion

Run the AWS SQS source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSQSSourceConnector.properties

AWS SNS (sink)

Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-sns-kafka-connector/target/ a .zip file named camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-aws-sns-kafka-connector/target/camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-sns-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example consumes from mytopic Kafka topic and transfers the payload to AWS SNS topic topic.

Adjust properties in examples/CamelAWSSNSSinkConnector.properties for your environment, you need to configure access key, secret key and region by setting camel.component.aws-sns.configuration.access-key=youraccesskey, camel.component.aws-sns.configuration.secret-key=yoursecretkey and camel.component.aws-sns.configuration.region=yourregion

Run the AWS SNS sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSSNSSinkConnector.properties

AWS S3 (source)

Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-aws-s3-kafka-connector/target/ a .zip file named camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-aws-s3-kafka-connector/target/camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-aws-s3-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example fetches objects from AWS S3 in the camel-kafka-connector bucket and transfers the payload to mytopic Kafka topic. This example shows how to implement a custom converter converting from bytes received from S3 to Kafka’s SchemaAndValue.

Adjust properties in examples/CamelAWSS3SourceConnector.properties for your environment, you need to configure access key, secret key and region by adding camel.component.aws-s3.configuration.access-key=youraccesskey, camel.component.aws-s3.configuration.secret-key=yoursecretkey and camel.component.aws-s3.configuration.region=yourregion

Run the AWS S3 source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelAWSS3SourceConnector.properties

Apache Cassandra

Unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-cql-kafka-connector/target/ a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip

This examples require a running Cassandra instance, for simplicity the steps below show how to start Cassandra using Docker. First you’ll need to run a Cassandra instance:

docker run --name master_node --env MAX_HEAP_SIZE='800M' -dt oscerd/cassandra

Next, check and make sure Cassandra is running:

docker exec -ti master_node /opt/cassandra/bin/nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens       Owns (effective)  Host ID                               Rack
UN  172.17.0.2  251.32 KiB  256          100.0%            5126aaad-f143-43e9-920a-0f9540a93967  rack1

To populate the database using to the cqlsh tool, you’ll need a local installation of Cassandra. Download and extract the Apache Cassandra distribution to a directory. We reference the Cassandra installation directory with LOCAL_CASSANDRA_HOME. Here we use version 3.11.4 to connect to the Cassandra instance we started using Docker.

<LOCAL_CASSANDRA_HOME>/bin/cqlsh $(docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node)

Next, execute the following script to create keyspace test, the table users and insert one row into it.

create keyspace test with replication = {'class':'SimpleStrategy', 'replication_factor':3};
use test;
create table users ( id int primary key, name text );
insert into users (id,name) values (1, 'oscerd');
quit;

In the configuration .properties file we use below the IP address of the Cassandra master node needs to be configured, replace the value 172.17.0.2 in the camel.source.url or localhost in camel.sink.url configuration property with the IP of the master node obtained from Docker. Each example uses a different .properties file shown in the command line to run the example.

docker inspect --format='{{ .NetworkSettings.IPAddress }}' master_node

Apache Cassandra (source)

This example polls Cassandra via CSQL (select * from users) in the test keyspace and transfers the result to the mytopic Kafka topic.

Run the Cassandra CQL source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSourceConnector.properties

Apache Cassandra (sink)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-cql-kafka-connector/target/ a .zip file named camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-cql-kafka-connector/target/camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-cql-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example adds data to the users table in Cassandra from the data consumed from the mytopic Kafka topic. Notice how the name column is populated from the Kafka message using CQL command insert into users…​.

Run the Cassandra CQL sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelCassandraQLSinkConnector.properties

Elasticsearch (sink)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-elasticsearch-rest-kafka-connector/target/ a .zip file named camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-elasticsearch-rest-kafka-connector/target/camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-elasticsearch-rest-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example passes data from mytopic Kafka topic to sampleIndexName index in Elasticsearch. Adjust properties in docs/examples/CamelElasticSearchSinkConnector.properties to reflect your environment, for example change the hostAddresses to a valid Elasticsearch instance hostname and port.

For the index operation, it might be necessary to provide or implement a transformer. A sample configuration would be similar to the one below:

transforms=ElasticSearchTransformer

This is the sample Transformer used in the integration test code that transforms Kafka’s ConnectRecord to a Map:

transforms.ElasticSearchTransformer.type=org.apache.camel.kafkaconnector.elasticsearch.sink.transforms.ConnectRecordValueToMapTransformer

This is a configuration for the sample transformer that defines the key used in the map:

transforms.ElasticSearchTransformer.key=MyKey

When the configuration is ready run the sink with:

Run the Elasticsearch sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelElasticSearchSinkConnector.properties

File (sink)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-file-kafka-connector/target/ a .zip file named camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-file-kafka-connector/target/camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-file-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example appends data from mytopic Kafka topic to a file in /tmp/kafkaconnect.txt.

Run the file sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelFileSinkConnector.properties

HTTP (sink)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-http-kafka-connector/target/ a .zip file named camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-http-kafka-connector/target/camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-http-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example sends data from mytopic Kafka topic to a HTTP service. Adjust properties in docs/examples/CamelHttpSinkConnector.properties for your environment, for example configuring the camel.sink.url.

Run the http sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelHttpSinkConnector.properties

JMS (source)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-sjms2-kafka-connector/target/ a .zip file named camel-sjsm2-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-sjsm2-kafka-connector/target/camel-sjms2-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-sjsm2-kafka-connector-0.6.0-SNAPSHOT-package.zip

These are the basic connectors. For camel-sjms2 we have a bunch of provided dependencies we need to add in our path, so run the following commands (note that this is not needed from 0.7.0 onward for ActiveMQ and Artemis JMS clients, as their dependecies are packaged along with the SJMS2 connector):

> cd /home/connectors/camel-sjms2-kafka-connector
> wget https://repo1.maven.org/maven2/org/apache/activemq/activemq-client/5.15.11/activemq-client-5.15.11.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-jms_2.0_spec/1.0-alpha-2/geronimo-jms_2.0_spec-1.0-alpha-2.jar
> wget https://repo1.maven.org/maven2/org/apache/geronimo/specs/geronimo-annotation_1.0_spec/1.1.1/geronimo-annotation_1.0_spec-1.1.1.jar
> wget https://repo1.maven.org/maven2/javax/management/j2ee/management-api/1.1-rev-1/management-api-1.1-rev-1.jar
> wget https://repo1.maven.org/maven2/org/fusesource/hawtbuf/hawtbuf/1.11/hawtbuf-1.11.jar

This example receives messages from a JMS queue named myqueue and transfers them to mytopic Kafka topic. In this example ActiveMQ is used and it’s configured to connect to the broker running on localhost:61616. Adjust properties in examples/CamelJmsSourceConnector.properties for your environment, for example configuring username and password by setting camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword or change the camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.

Run the JMS source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSourceConnector.properties

JMS (sink)

This example receives messages from mytopic Kafka topic and transfers them to JMS queue named myqueue. In this example ActiveMQ is used and it’s configured to connect to the broker running on localhost:61616. You can adjust properties in examples/CamelJmsSinkConnector.properties for your environment, for example configure username and password by adding camel.component.sjms2.connection-factory.userName=yourusername and camel.component.sjms2.connection-factory.password=yourpassword or change the camel.component.sjms2.connection-factory and camel.component.sjms2.connection-factory.brokerURL to reflect your JMS implementation and URL.

Run the JMS sink:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelJmsSinkConnector.properties

Telegram (source)

First thing to do, is unzip or untar the camel-aws-sqs-kafka-connector archive in the plugin.path location. After building the project you should have in connectors/camel-telegram-kafka-connector/target/ a .zip file named camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip

> cd /home/connectors/
> cp connectors/camel-telegram-kafka-connector/target/camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip .
> unzip camel-telegram-kafka-connector-0.6.0-SNAPSHOT-package.zip

This example transfers messages sent to Telegram bot to the mytopic Kafka topic. Adjust to set telegram bot token in examples/CamelTelegramSourceConnector.properties to reflect your bot’s token.

Run the telegram source:
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties docs/examples/CamelTelegramSourceConnector.properties