pulsar sink Pulsar Sink

Provided by: "Apache Software Foundation"

Support Level for this Kamelet is: "Stable"

Send documents to Pulsar.

Configuration Options

The following table summarizes the configuration options available for the pulsar-sink Kamelet:

Property Name Description Type Default Example

namespaceName

Pulsar Namespace Name

Required The Pulsar Namespace Name.

string

serviceUrl

Service URL

Required The Pulsar Service URL to point while creating the client from URI.

string

tenant

Tenant Name

Required The Tenant Name.

string

topic

Topic Name

Required The topic name or regexp.

string

topicType

Topic Type

Required The topic type.

Enum values:

* persistent * non-persistent

string

authenticationClass

Authentication Class

The Authentication FQCN to be used while creating the client from URI.

string

authenticationParams

Authentication Params

The Authentication Parameters to be used while creating the client from URI.

string

batchingEnabled

Enable Batching

Control whether automatic batching of messages is enabled for the producer.

boolean

true

batchingMaxMessages

Batching Maximum Messages

The maximum size to batch messages.

integer

1000

batchingMaxPublishDelayMicros

Batching Maximum Publish Delay in Microsecond

The maximum time period within which the messages sent will be batched if batchingEnabled is true.

integer

1000

blockIfQueueFull

Block If Queue Full

Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError.

boolean

false

compressionType

Compression Type

Compression type to use.

Enum values:

* ONE * LZ4 * ZLIB * ZSTD * SNAPPY

string

NONE

initialSequenceId

Initial SequenceId

The first message published will have a sequence Id of initialSequenceId 1.

integer

-1

lazyStartProducer

Number Of Consumer Threads

Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing.

boolean

false

maxPendingMessages

Maximum Pending Messages

Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true.

integer

1000

maxPendingMessagesAcrossPartitions

Maximum Pending Messages Across Partitions

The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.

integer

50000

messageRoutingMode

Message Routing Mode

Message Routing Mode to use.

Enum values:

* SinglePartition * RoundRobinPartition * CustomPartition

string

RoundRobinPartition

producerName

Producer Name

Name of the producer. If unset, lets Pulsar select a unique identifier.

string

sendTimeoutMs

Send Timeout in Milliseconds

Send timeout in milliseconds.

integer

30000

Dependencies

At runtime, the pulsar-sink Kamelet relies upon the presence of the following dependencies:

  • camel:pulsar

  • camel:kamelet

  • camel:core

Camel JBang usage

Prerequisites

  • You’ve installed JBang.

  • You have executed the following command:

jbang app install camel@apache/camel

Supposing you have a file named route.yaml with this content:

- route:
    from:
      uri: "kamelet:timer-source"
      parameters:
        period: 10000
        message: 'test'
      steps:
        - to:
            uri: "kamelet:log-sink"

You can now run it directly through the following command

camel run route.yaml

Camel K Environment Usage

This section describes how you can use the pulsar-sink.

Knative sink

You can use the pulsar-sink Kamelet as a Knative sink by binding it to a Knative object.

pulsar-sink-binding.yaml
apiVersion: camel.apache.org/v1
kind: KameletBinding
metadata:
  name: pulsar-sink-binding
spec:
  source:
    ref:
      kind: Channel
      apiVersion: messaging.knative.dev/v1
      name: mychannel
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: pulsar-sink
    properties:
      namespaceName: The Pulsar Namespace Name
      serviceUrl: The Service URL
      tenant: The Tenant Name
      topic: The Topic Name
      topicType: The Topic Type

Prerequisite

You have Camel K installed on the cluster.

Procedure for using the cluster CLI

  1. Save the pulsar-sink-binding.yaml file to your local drive, and then edit it as needed for your configuration.

  2. Run the sink by using the following command:

    kubectl apply -f pulsar-sink-binding.yaml

Procedure for using the Kamel CLI

Configure and run the sink by using the following command:

kamel bind channel:mychannel -p "sink.namespaceName=The Pulsar Namespace Name" -p "sink.serviceUrl=The Service URL" -p "sink.tenant=The Tenant Name" -p "sink.topic=The Topic Name" -p "sink.topicType=The Topic Type" pulsar-sink

This command creates the KameletBinding in the current namespace on the cluster.

Kafka sink

You can use the pulsar-sink Kamelet as a Kafka sink by binding it to a Kafka topic.

pulsar-sink-binding.yaml
apiVersion: camel.apache.org/v1
kind: KameletBinding
metadata:
  name: pulsar-sink-binding
spec:
  source:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: my-topic
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: pulsar-sink
    properties:
      namespaceName: The Pulsar Namespace Name
      serviceUrl: The Service URL
      tenant: The Tenant Name
      topic: The Topic Name
      topicType: The Topic Type

Prerequisites

  • You’ve installed Strimzi.

  • You’ve created a topic named my-topic in the current namespace.

  • You have Camel K installed on the cluster.

Procedure for using the cluster CLI

  1. Save the pulsar-sink-binding.yaml file to your local drive, and then edit it as needed for your configuration.

  2. Run the sink by using the following command:

    kubectl apply -f pulsar-sink-binding.yaml

Procedure for using the Kamel CLI

Configure and run the sink by using the following command:

kamel bind kafka.strimzi.io/v1beta1:KafkaTopic:my-topic -p "sink.namespaceName=The Pulsar Namespace Name" -p "sink.serviceUrl=The Service URL" -p "sink.tenant=The Tenant Name" -p "sink.topic=The Topic Name" -p "sink.topicType=The Topic Type" pulsar-sink

This command creates the KameletBinding in the current namespace on the cluster.