Pulsar Source
Provided by: "Apache Software Foundation"
Support Level for this Kamelet is: "Stable"
Receive data from Pulsar topics.
Configuration Options
The following table summarizes the configuration options available for the pulsar-source Kamelet:
Name | Description | Type | Default | Example |
---|---|---|---|---|
Pulsar Namespace Name | Required. The Pulsar Namespace Name. | string | | |
Service URL | Required. The Pulsar Service URL to point to while creating the client from URI. | string | | |
Tenant Name | Required. The Tenant Name. | string | | |
Topic Name | Required. The topic name or regexp. | string | | |
Topic Type | Required. The topic type. Enum values: persistent, non-persistent. | string | | |
Authentication Class | The Authentication FQCN to be used while creating the client from URI. | string | | |
Authentication Params | The Authentication Parameters to be used while creating the client from URI. | string | | |
Consumer Name Prefix | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used. | string | cons | |
Consumer Queue Size | Size of the consumer queue. | integer | 10 | |
Dead Letter Topic | Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, the default topic name will be topicName-subscriptionName-DLQ. | string | | |
Maximum Redelivery Count | Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created. | integer | | |
Message Listener | Whether to use the messageListener interface, or to receive messages using a separate thread pool. | boolean | true | |
Negative Ack Redelivery Delay in Microseconds | Delay, in microseconds, before a negatively acknowledged message is redelivered. | integer | 60000000 | |
Number Of Consumer Threads | Number of threads to receive and handle messages when using a separate thread pool. | integer | 1 | |
Number Of Consumers | Number of consumers. | integer | 1 | |
Read Compacted | Enable compacted topic reading. | boolean | false | |
Subscription Initial Position | Control the initial position in the topic of a newly created subscription. Default is latest message. Enum values: EARLIEST, LATEST. | string | LATEST | |
Subscription Name | Name of the subscription to use. | string | subs | |
Subscription Topics Mode | Determines which topics this consumer should be subscribed to: Persistent, Non-Persistent, or both. Only used with pattern subscriptions. Enum values: PersistentOnly, NonPersistentOnly, AllTopics. | string | PersistentOnly | |
Subscription Type | Type of the subscription. Enum values: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED. | string | EXCLUSIVE | |
Topic Pattern | Whether the topic is a pattern (regular expression) that allows the consumer to subscribe to all matching topics in the namespace. | boolean | false | |
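As an illustration of how these options combine, the following is a minimal sketch of a property set for a shared subscription with a dead letter policy. The five required keys are the ones used in the binding examples in this document. The optional keys (subscriptionName, subscriptionType, maxRedeliverCount, deadLetterTopic) are assumed camelCase forms of the option titles above, and all values are hypothetical placeholders, so check them against the Kamelet definition before use.

```yaml
# Hypothetical property values for the pulsar-source Kamelet.
# Required properties (keys taken from the binding examples in this document):
namespaceName: default                 # placeholder namespace
serviceUrl: pulsar://localhost:6650    # placeholder: a broker on the local machine
tenant: public                         # placeholder tenant
topic: orders                          # hypothetical topic name
topicType: persistent                  # one of: persistent, non-persistent

# Optional properties (key names are assumptions derived from the option titles):
subscriptionName: orders-subs          # overrides the default "subs"
subscriptionType: SHARED               # overrides the default EXCLUSIVE
maxRedeliverCount: 5                   # without this, no Dead Letter Policy is created
deadLetterTopic: orders-dlq            # if omitted: topicName-subscriptionName-DLQ
```

Under Pulsar's topic naming scheme, these values would address the topic persistent://public/default/orders.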
Dependencies
At runtime, the pulsar-source Kamelet relies upon the presence of the following dependencies:
- camel:pulsar
- camel:kamelet
- camel:core
Camel JBang usage
Prerequisites
- You’ve installed JBang.
- You have executed the following command:
  jbang app install camel@apache/camel
Supposing you have a file named route.yaml with this content:
- route:
    from:
      uri: "kamelet:timer-source"
      parameters:
        period: 10000
        message: 'test'
      steps:
        - to:
            uri: "kamelet:log-sink"
You can now run it directly with the following command:
camel run route.yaml
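The route above uses timer-source as a generic placeholder. A sketch of the same route consuming from Pulsar might look like the following, assuming a broker reachable at pulsar://localhost:6650 and a topic named my-topic in the public/default namespace. All of these values are hypothetical and should be replaced with your own.

```yaml
# route.yaml: hypothetical route that logs every message read from a Pulsar topic.
- route:
    from:
      uri: "kamelet:pulsar-source"
      parameters:
        namespaceName: "default"                # placeholder namespace
        serviceUrl: "pulsar://localhost:6650"   # placeholder broker URL
        tenant: "public"                        # placeholder tenant
        topic: "my-topic"                       # hypothetical topic name
        topicType: "persistent"                 # persistent or non-persistent
      steps:
        - to:
            uri: "kamelet:log-sink"
```

It is run the same way, with camel run route.yaml.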
Camel K Environment Usage
This section describes how you can use the pulsar-source Kamelet.
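Before binding to a Knative channel or a Kafka topic, it can be useful to confirm that the source connects to Pulsar at all. One way to do that, sketched here under assumptions, is to bind the source to the log-sink Kamelet so that received messages appear in the integration log. The binding name, broker host, and property values are hypothetical.

```yaml
apiVersion: camel.apache.org/v1
kind: KameletBinding
metadata:
  name: pulsar-source-log-binding             # hypothetical binding name
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: pulsar-source
    properties:
      namespaceName: "default"                # placeholder namespace
      serviceUrl: "pulsar://pulsar-broker:6650"   # hypothetical in-cluster broker address
      tenant: "public"                        # placeholder tenant
      topic: "my-topic"                       # hypothetical topic name
      topicType: "persistent"
  sink:
    ref:
      kind: Kamelet                           # log-sink writes each message to the log
      apiVersion: camel.apache.org/v1
      name: log-sink
```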
Knative source
You can use the pulsar-source Kamelet as a Knative source by binding it to a Knative object.
apiVersion: camel.apache.org/v1
kind: KameletBinding
metadata:
  name: pulsar-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: pulsar-source
    properties:
      namespaceName: The Pulsar Namespace Name
      serviceUrl: The Service URL
      tenant: The Tenant Name
      topic: The Topic Name
      topicType: The Topic Type
  sink:
    ref:
      kind: Channel
      apiVersion: messaging.knative.dev/v1
      name: mychannel
Prerequisite
You have Camel K installed on the cluster.
Procedure for using the cluster CLI
- Save the pulsar-source-binding.yaml file to your local drive, and then edit it as needed for your configuration.
- Run the source by using the following command:
  kubectl apply -f pulsar-source-binding.yaml
Procedure for using the Kamel CLI
Configure and run the source by using the following command:
kamel bind channel:mychannel -p "source.namespaceName=The Pulsar Namespace Name" -p "source.serviceUrl=The Service URL" -p "source.tenant=The Tenant Name" -p "source.topic=The Topic Name" -p "source.topicType=The Topic Type" pulsar-source
This command creates the KameletBinding in the current namespace on the cluster.
Kafka source
You can use the pulsar-source Kamelet as a Kafka source by binding it to a Kafka topic.
apiVersion: camel.apache.org/v1
kind: KameletBinding
metadata:
  name: pulsar-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: pulsar-source
    properties:
      namespaceName: The Pulsar Namespace Name
      serviceUrl: The Service URL
      tenant: The Tenant Name
      topic: The Topic Name
      topicType: The Topic Type
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: my-topic
Prerequisites
- You’ve installed Strimzi.
- You’ve created a topic named my-topic in the current namespace.
- You have Camel K installed on the cluster.
Procedure for using the cluster CLI
- Save the pulsar-source-binding.yaml file to your local drive, and then edit it as needed for your configuration.
- Run the source by using the following command:
  kubectl apply -f pulsar-source-binding.yaml
Procedure for using the Kamel CLI
Configure and run the source by using the following command:
kamel bind kafka.strimzi.io/v1beta1:KafkaTopic:my-topic -p "source.namespaceName=The Pulsar Namespace Name" -p "source.serviceUrl=The Service URL" -p "source.tenant=The Tenant Name" -p "source.topic=The Topic Name" -p "source.topicType=The Topic Type" pulsar-source
This command creates the KameletBinding in the current namespace on the cluster.