Kamelets Developer Guide

Introduction

This document guides you through the process of developing a new Kamelet that can be used by any Apache Camel subproject supporting the Kamelet technology stack and shared with others via Kamelet catalogs, such as the official Apache Camel Kamelets catalog.

We assume that the reader is familiar with the content of the Kamelets User Guide and with Camel K installation and general usage.

Basics

If you have started learning about Kamelets, you've seen that they can be used to create two kinds of connectors:

  • Sources: they produce data that can be injected into a destination

  • Sinks: they consume data and optionally produce a response

When creating a new Kamelet, you should first decide which kind of Kamelet you're going to create, which depends on the use case you have in mind. A Kamelet does a single thing, so if you want to provide both a source and a sink for your system, you need two Kamelets.

In its essence, a Kamelet consists of a single YAML file that contains information on two distinct aspects of the connector:

  • User view: this part contains general documentation about the Kamelet, also covering the parameters that need to be configured in order to use it

  • Runtime aspects: this part tells the Camel runtime how to implement what the Kamelet promises to do. Most of the time, it contains a Camel route template in YAML DSL

We’re ignoring here the part around data types of a Kamelet, which is not fundamental for the Kamelet to work and is still subject to change.
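
Putting the two aspects together, a minimal Kamelet skeleton looks like the following (just a sketch to fix the structure in mind; complete, working examples follow in the rest of this guide):

apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-sample-source
  labels:
    camel.apache.org/kamelet.type: "source" # "source" or "sink"
spec:
  definition: # user view: title, description and configurable parameters
    title: "My Sample Source"
    properties: {}
  template: # runtime aspects: a Camel route template in YAML DSL
    from:
      uri: "timer:tick"
      steps:
        - to: "kamelet:sink"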

We’ll guide you through the process of creating a simple Kamelet by remapping a Camel component, then we’ll go through a much more complicated real-world example.

Creating a simple Kamelet

Since Apache Camel provides more than 300 components out of the box, it’s easy to create a Kamelet starting from one of the components already available. Most of the Kamelets available in the official catalog, in fact, are simple ones that contain only a remapping of the Kamelet properties into Camel endpoint parameters. We’re going to show an example shortly.

Suppose that you want to provide a Kamelet that allows users to search data on Twitter, providing a stream of information about a given keyword. Creating such a Kamelet is a fairly easy task: we can use options of the "camel-twitter" component without adding much processing logic.

So the procedure of writing a simple Kamelet starts with scaffolding a new Kamelet resource, which can be done with the Camel JBang CLI (camel):

camel init twitter-search-source.kamelet.yaml

This produces a YAML file like the following one:

twitter-search-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: twitter-search-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    title: "Timer"
    description: "Produces periodic events with a custom payload"
    required:
      - message
    properties:
      period:
        title: Period
        description: The time interval between two events
        type: integer
        default: 1000
      message:
        title: Message
        description: The message to generate
        type: string
  dataTypes:
    out:
      default: text
      types:
        text:
          mediaType: text/plain
  template:
    from:
      uri: timer:tick
      parameters:
        period: "{{period}}"
      steps:
        - setBody:
            constant: "{{message}}"
        - to: "kamelet:sink"

We need to change the file to do what we want to achieve, that is, to create a route that searches for a given keyword on Twitter.

The route provided in the initial scaffold (timer-to-log) is not what we need, so we change it to the following:

twitter-search-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
# ...
spec:
# ...
  template:
    from:
      uri: "twitter-search:{{keywords}}" (1)
      parameters:
        accessToken: "{{accessToken}}" (2)
        accessTokenSecret: "{{accessTokenSecret}}"
        consumerKey: "{{apiKey}}" (3)
        consumerSecret: "{{apiKeySecret}}"
      steps:
      - marshal: (4)
          json: {}
      - to: "kamelet:sink" (5)
1 keywords is a path parameter of the Camel twitter-search component
2 Some endpoint parameters are just mapped 1-1
3 The Camel component option consumerKey is renamed to apiKey to reflect the actual name in the Twitter developer portal
4 The Camel Twitter component generates Java objects, so we marshal them to JSON
5 A Source Kamelet sends data to the special endpoint "kamelet:sink", that will be replaced at runtime by a different target

The YAML route template above just uses the twitter-search component to do searches on Twitter. We added a marshalling step to JSON because the output of a Kamelet always needs to be something that can be transferred over the wire.

The Kamelet is almost complete: we just need to document the parameters in JSON schema format. We specify them in the definition section under spec:

twitter-search-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: twitter-search-source
# ...
spec:
  definition:
    title: "Twitter Search Source" (1)
    description: |-
      Allows to get all tweets on particular keywords from Twitter.

      It requires tokens that can be obtained by creating an application
      in the Twitter developer portal: https://developer.twitter.com/.
    required: (2)
    - keywords
    - apiKey
    - apiKeySecret
    - accessToken
    - accessTokenSecret
    properties:
      keywords: (3)
        title: Keywords
        description: The keywords to use in the Twitter search (Supports Twitter standard operators)
        type: string
        example: "Apache Camel"
      apiKey:
        title: API Key
        description: The API Key from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password (4)
      apiKeySecret:
        title: API Key Secret
        description: The API Key Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessToken:
        title: Access Token
        description: The Access Token from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessTokenSecret:
        title: Access Token Secret
        description: The Access Token Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
# ...
1 General information about the Kamelet itself in textual format
2 List of required parameters
3 A specification for each one of the parameters (flat structure, no nested options allowed)
4 Optional graphical customization for a specific UI (OpenShift Console)

This is all you need to create a Kamelet that other users can leverage. A few things remain, like setting information about the generated data and other metadata (such as the icon and the provider), and you’re done. The final Kamelet can look like the following:

twitter-search-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: twitter-search-source
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # Truncated (1)
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "source"
    camel.apache.org/kamelet.group: "Twitter"
spec:
  definition:
    title: "Twitter Search Source"
    description: |-
      Allows to get all tweets on particular keywords from Twitter.

      It requires tokens that can be obtained by creating an application
      in the Twitter developer portal: https://developer.twitter.com/.
    required:
    - keywords
    - apiKey
    - apiKeySecret
    - accessToken
    - accessTokenSecret
    properties:
      keywords:
        title: Keywords
        description: The keywords to use in the Twitter search (Supports Twitter standard operators)
        type: string
        example: "Apache Camel"
      apiKey:
        title: API Key
        description: The API Key from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      apiKeySecret:
        title: API Key Secret
        description: The API Key Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessToken:
        title: Access Token
        description: The Access Token from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessTokenSecret:
        title: Access Token Secret
        description: The Access Token Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
  dataTypes: (2)
    out:
      default: json
      types:
        json:
          mediaType: application/json
  template: (3)
    from:
      uri: "twitter-search:{{keywords}}"
      parameters:
        accessToken: "{{accessToken}}"
        accessTokenSecret: "{{accessTokenSecret}}"
        consumerKey: "{{apiKey}}"
        consumerSecret: "{{apiKeySecret}}"
      steps:
      - marshal:
          json: {}
      - to: "kamelet:sink"
1 An icon with an appropriate license, preferably using SVG with base64 data URL encoding. You can encode icons using services like this one
2 The dataTypes section indicates that the Kamelet is going to produce JSON data by default. The Kamelet is able to define multiple data types for in/out/error. The user will then be able to choose one of the data types in a Pipe when referencing the Kamelet.
3 The previous YAML flow

The Kamelet can be shared on the catalog and/or created on a Kubernetes cluster to let users use it.

Trying it out

A simple way to try it out is to apply it on a cluster, together with a simple binding. Assuming that you have a Kubernetes cluster and you’re connected to a namespace where the Camel K operator can act, just create the Kamelet:

kubectl apply -f twitter-search-source.kamelet.yaml

Then you can create a binding like the following one to try it out:

twitter-search-source-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: twitter-search-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: twitter-search-source
    properties:
      keywords: "Apache Camel"
      apiKey: "your own"
      apiKeySecret: "your own"
      accessToken: "your own"
      accessTokenSecret: "your own"
  sink:
    uri: "log:info"

This can be created using:

kubectl apply -f twitter-search-source-binding.yaml

Once created, you can see the logs of the binding using:

kamel logs twitter-search-source-binding

If everything goes right, you should get some tweets in the logs after the integration is created.

Refer to the Kamelets User Guide for more information on how to use it in different contexts (like Knative, Kafka, etc.).

Kamelet versions

The catalog containing a set of Kamelets is generally developed to be used with a given Camel version (see the Apache Camel Kamelets catalog). However, when publishing a Kamelet to the cluster you may want to maintain more than one version for any reason (e.g., to use a different dependency and be able to support multiple runtimes). You can therefore use the .spec.versions parameter to optionally maintain a set of alternative versions besides the main (and default) one.

my-timer-source.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-timer-source
spec:
  definition:
    title: "Timer Example"
  types:
    out:
      mediaType: text/plain
  template:
    from:
      uri: timer:tick
      steps:
        - setBody:
            constant: "Kamelet Main"
        - to: "kamelet:sink"
  versions:
    v2:
      definition:
        title: "Timer Example 2"
      types:
        out:
          mediaType: text/plain
      template:
        from:
          uri: timer:tick
          steps:
            - setBody:
                constant: "Kamelet V2"
            - to: "kamelet:sink"
Make sure the overall content fits into 1 MiB, which is the storage limit for a Custom Resource.

This is a way to handle multiple versions on Kubernetes and may not be supported out of the box by Camel core. If an Integration specifically requires kamelet:my-timer-source?kameletVersion=v2, the operator will mount the proper specification on the running application.

The .spec.versions field may not necessarily be supported by the core, as it’s meant to provide a way to handle versioning on the cluster only. The runtime must be provided with a materialized Kamelet file containing the chosen spec (the operator is in charge of that).
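
For example, an Integration could explicitly request the alternative version through the kameletVersion query parameter (a sketch based on the my-timer-source Kamelet defined above):

my-timer-integration.yaml
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: my-timer-integration
spec:
  flows:
    - from:
        uri: "kamelet:my-timer-source?kameletVersion=v2" # resolves to the v2 spec
        steps:
          - to: "log:info"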

Kamelet data types

A Kamelet usually encapsulates a specific functionality and serves a very opinionated use case with well-defined input parameters and outcome.

In order to enhance the Kamelet’s interoperability with other components, a Kamelet may specify one or more data types for input, output and error scenarios. The declaration of supported Kamelet data types helps users incorporate the Kamelet into their specific applications.

When referencing a Kamelet, users may choose from a list of supported input/output data types in order to obtain the best fit for their individual use case.

Accordingly, each Kamelet may declare all supported input/output data types, each of them providing additional information like header names, content type, content schema and so on.

my-sample-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-sample-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
# ...
  dataTypes:
    out: (1)
      default: application-json (2)
      headers:
        MySpecialCamelHeaderName: (3)
          type: string
          description: Some specific header
      types: (4)
        application-json:
          description: Output type as Json object
          mediaType: application/json
          schema: (5)
            type: object
            description: The Json object representing the my-sample source output
            properties:
              # ...
          dependencies: (6)
            - "camel:jackson"
        text-plain:
          description: Output type as plain text
          mediaType: text/plain
  template:
    from:
      uri: ...
      steps:
        - to: "kamelet:sink"
1 Declared output data types of this Kamelet source
2 The output data type used by default
3 Declaration of output headers with header name, type and description information
4 List of supported output types
5 Optional Json schema describing the application-json data type
6 Optional list of additional dependencies that are required by the data type.

The sample Kamelet above declares two supported output data types, application-json and text-plain. Each declared data type is backed by a specific Apache Camel transformer implementation that is capable of producing the specific output. The respective transformer implementation may be provided by the Kamelet as a utility extension or by the underlying Apache Camel component.

As a result the user may now choose the output data type when referencing the Kamelet in a binding.

my-sample-source-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types: (1)
      out:
        format: text-plain (2)
  sink:
    uri: "log:info"
1 Choose the output data type on the Kamelet source reference in a Pipe.
2 Select text-plain as an output data type of the my-sample-source Kamelet.

The very same concept of data types can also be used on Kamelet sinks and input data types. As soon as the user chooses a specific input data type for a Kamelet, the Pipe processing will try to resolve a matching transformer implementation and apply its logic.
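
For instance, a hypothetical my-sample-sink Kamelet declaring input data types could be bound like this (a sketch mirroring the source example above):

my-sample-sink-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-sink-binding
spec:
  source:
    uri: "timer:tick"
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-sink
    data-types:
      in: # input side of the sink Kamelet
        format: application-json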

By default, the operator uses a data-type-action Kamelet, which has to be available in the catalog. It is provided out of the box when installing the bundled Apache Kamelets catalog; the Pipe will fail if the Kamelet is not available. You can also override the action Kamelet to use by adding the camel.apache.org/kamelet.data.type annotation to the Pipe specification.
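
For instance (a sketch: we assume the annotation value is the name of the replacement action Kamelet; my-custom-data-type-action is a hypothetical Kamelet):

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-binding
  annotations:
    camel.apache.org/kamelet.data.type: "my-custom-data-type-action" # assumed: name of the action Kamelet to use
spec:
  # ... source and sink as in the previous examples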

You may also use the data-type-action Kamelet in your Pipe binding in order to apply a specific data type transformation at any step.

my-sample-source-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: my-sample-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: my-sample-source
    data-types:
      out:
        format: application-json (1)
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: json-deserialize-action (2)
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: resolve-pojo-schema-action (3)
      properties:
        mimeType: "avro/binary"
        schema: >
          { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] }
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: data-type-action (4)
      properties:
        scheme: "camel"
        format: "avro-binary"
  sink:
    uri: "log:info"
1 Choose the application-json output data type on the Kamelet source.
2 Deserialize the Json object with json-deserialize-action.
3 Declare an Avro schema with resolve-pojo-schema-action.
4 Use the data-type-action Kamelet to transform the Json object into Avro using the previously declared schema.

The Pipe in the sample above uses a combination of the Kamelet output data type, Json deserialization and the Avro binary data type to transform the Kamelet source output.

All referenced data types are backed by a specific transformer implementation either provided by the Kamelet itself or by pure Apache Camel functionality.

Creating a complex Kamelet

We’re now going to create a Kamelet with a high degree of complexity, to show how the Kamelet model can also be used to go beyond the functionality provided by a single Camel component.

This example is complicated on purpose and uses several components and EIPs from Apache Camel: luckily your Kamelets will be much simpler than this one.

It will be a Kamelet of type "source", but most of the principles explained here also apply when developing a Kamelet of type "sink". The technical differences between the two scenarios are highlighted in the "Creating a sink Kamelet" section.

We’re going to take a real-world use case of moderate complexity: we want to create a source of earthquake events around the world, taking data from the USGS APIs.

Step 1: write an end-to-end integration

Contrary to what one might expect, the first thing you need to do is to forget about Kamelets and just try to write a Camel K integration that consumes the earthquake data.

You may choose the language that you prefer to write the first integration, even writing it directly in YAML. We write it using the Java DSL because that is the language that most Apache Camel users are familiar with and it’s also supported by the tooling.

For a great developer experience, we suggest using Visual Studio Code with the Camel Extension Pack

We start from scratch by creating an integration file with the Camel JBang CLI:

camel init Earthquake.java

This will scaffold a Java source file with a timer-to-log integration, which we’ll edit according to our needs. A first version of the integration might look like the following:

Earthquake.java
// camel-k: language=java

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {

    from("timer:earthquake?period=10000") (1)
      .setHeader(Exchange.HTTP_METHOD).constant("GET")
      .to("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson") (2)
      .convertBodyTo(String.class)
      .to("log:info"); (3)

  }
}
1 We do a timed poll of the API because there’s no way to consume it directly
2 Look at https://earthquake.usgs.gov/fdsnws/event/1/ for more information about the API. We’re using the GeoJSON format
3 The integration ends in a "log:info" endpoint, because we just want to see if we can contact the API and get some results back

In order to run the integration above, if you have a Kubernetes cluster with Camel K installed, you can use kamel run Earthquake.java, but there’s a simpler solution that just requires your own machine:

camel run Earthquake.java

The camel run command relies on Camel JBang to locally run the integration. The integration will start and begin printing out earthquake data every 10 seconds.

Here is an excerpt of what the integration prints:

{
   "type":"FeatureCollection",
   "metadata":{
      "generated":1614860715000,
      "url":"https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson",
      "title":"USGS Earthquakes",
      "status":200,
      "api":"1.10.3",
      "count":10762
   },
   "features":[
      {
         "type":"Feature",
         "properties":{
            "mag":2.17,
            "place":"27km ENE of Pine Valley, CA",
            "time":1614859396200,
            "updated":1614860064420,
            "url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci39808832",
            "detail":"https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ci39808832&format=geojson",
            "status":"automatic",
            "tsunami":0,
            "sig":72,
            "net":"ci",
            "code":"39808832",
            "ids":",ci39808832,",
            "sources":",ci,",
            "types":",focal-mechanism,nearby-cities,origin,phase-data,scitech-link,",
            "nst":57,
            "dmin":0.04475,
            "rms":0.22,
            "gap":60,
            "magType":"ml",
            "type":"earthquake",
            "title":"M 2.2 - 27km ENE of Pine Valley, CA"
         },
         "geometry":{
            "type":"Point",
            "coordinates":[
               -116.2648333,
               32.9236667,
               3.54
            ]
         },
         "id":"ci39808832"
      }
    ]
}
We’ve truncated the list of "features" to just the first one, but it contains a lot more data

Step 2 (optional): iterate on the integration

Since the integration above produces useful data, its route could be technically used to build a source Kamelet, but there are a few problems we may want to address before publishing it:

  1. It produces a lot of data (10762 events: the last 30 days by default). We may want to emit only events from, say, the last 2 hours by default for this use case: we can add a filter on the query to accomplish this.

  2. It produces a collection of features (earthquake events), while we may want to push the individual features to the destination. We can use Camel’s built-in split and JSONPath support to split the collection into separate entries.

  3. It continuously produces the same data: just wait another 10 seconds and you’ll get the same data again and again (with a shift of 10 seconds over the last 30 days). A good approach here is to filter out duplicates at the source as much as possible. We can store the time when the last update was generated by the server and use it in subsequent queries to obtain only new events. This does not guarantee "exactly once" semantics, because e.g. if the integration is restarted it will lose its in-memory state and start from the beginning, but it prevents sending a high amount of redundant data if the integration is kept alive. To store the time when the last result was generated by the API, we can use one of the in-memory caches that Camel provides, such as camel-caffeine-cache.

We’re going to use an in-memory cache because we need to store a single value. When using stateful data repositories, such as caches, it’s always a good practice to limit their size to a low value and prevent them from growing over time
If end-to-end "exactly once" semantics is needed, you could later add a stateful idempotent repository in the global integration, but these aspects should be external to the Kamelet definition
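
For instance (a sketch; double-check the option name against the camel-caffeine documentation), the cache endpoint used below could cap its size explicitly:

caffeine-cache:cache-${routeId}?key=lastUpdate&maximumSize=1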

Let’s try sorting out these issues in the route (we publish here the final version):

Earthquake.java
// camel-k: language=java

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ClaimCheckOperation;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {

    from("timer:earthquake?period=10000")
      .setHeader("CamelCaffeineAction").constant("GET")
      .toD("caffeine-cache:cache-${routeId}?key=lastUpdate") (1)
      .choice()
        .when().simple("${header.CamelCaffeineActionHasResult}")
          .setProperty("lastUpdate", body())
        .otherwise()
          .setProperty("lastUpdate", simple("${date-with-timezone:now-120m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) (2)
      .end()
      .setHeader(Exchange.HTTP_METHOD).constant("GET")
      .toD("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc") (3)
      .unmarshal().json()
      .setProperty("generated", simple("${body[metadata][generated]}")) (4)
      .setProperty("lastUpdate", simple("${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"))
      .claimCheck(ClaimCheckOperation.Push) (5)
      .setBody().exchangeProperty("lastUpdate")
      .setHeader("CamelCaffeineAction").constant("PUT")
      .toD("caffeine-cache:cache-${routeId}?key=lastUpdate")
      .claimCheck(ClaimCheckOperation.Pop)
      .split().jsonpath("$.features[*]") (6)
        .marshal().json()
        .to("log:info") (7)
      .end();

  }
}
1 We start each poll by checking whether there has been a previous run (and get the corresponding time)
2 If it’s the first run of the integration, we set the clock back 120 minutes from the current time, to get events of the last 2 hours
3 We always include the time from which we want to receive updates in the query to the service
4 The service returns a "generated" field with the timestamp when the response was generated: we’ll use it in subsequent requests
5 We push the current body onto the claim check stack so we can store the "lastUpdate" value in the cache, then we restore the previous body
6 The collection is split into individual features using JSONPath
7 Individual records are sent to the destination (which is "log:info" in this phase). In case an exception is thrown while processing a single entry, the error is sent to the route error handler and the processing continues
Don’t be scared by the complexity of the route, as this is a complicated example by choice: most of the Kamelets in the Kamelet Catalog don’t use any processing logic or EIP
When writing a route like this, you should always think about errors that might happen in the various phases of the execution: here the "lastUpdate" value in the cache is updated after a successful invocation of the API but before the individual exchanges are sent to the destination, so the source is protected from individual errors on the features (which are sent to the route error handler) but continues to process new data even if a single feature can’t be processed.

This integration (which seems complex at first sight, but should still be readable) solves the issues identified above by using multiple features available in Apache Camel (caches, the "Simple" language, the HTTP component, the JSON data format, the splitter EIP, claim check, JSONPath). Even if it’s not recommended to write overly complicated integrations in a Kamelet (i.e. consider writing a plain component if it becomes too complicated and unreadable), you can see here how powerful the Kamelet model is.

We could have written the integration above as multiple routes connected using "direct:" endpoints, but a Kamelet contains a single route template and the mapping is easier if the integration is composed of a single route (it’s also possible to define multiple supporting routes in a Kamelet, but we’re not going to show how to do it here)

Step 3: externalize parameters

The next step in the development is answering the following question: if I were a user instantiating this source, which aspects would I like to configure?

For the example above, there are 2 things that a user may want to configure:

  • period: the time interval between polls to the earthquake API. This may seem a technical issue, but it becomes a business issue when contacting APIs that do rate limiting

  • lookAhead: the number of minutes in the past from which to start receiving events (it affects the events received when the source is first started or restarted)

Those two will become Kamelet parameters, as you might expect, but for the time being let’s refactor the integration to externalize them as standard Camel K properties:

Earthquake.java
// camel-k: language=java property=period=20000 property=lookAhead=120 (1)

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ClaimCheckOperation;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {

    from("timer:earthquake?period={{period}}") (2)
      // ...
      .choice()
        .when().simple("${header.CamelCaffeineActionHasResult}")
          .setProperty("lastUpdate", body())
        .otherwise()
          .setProperty("lastUpdate", simple("${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) (3)
      .end()
      // ...
      .end();

  }
}
1 Modeline header defines the two parameters with a "development" value
2 Placeholder {{period}} is used
3 Placeholder {{lookAhead}} is used

This looks the same as before, but notice that the period and lookAhead parameters are set in the modeline, while the route uses the {{period}} and {{lookAhead}} placeholders instead of the actual values.

As before, this integration can be tested with camel run Earthquake.java (the modeline parameters will be automatically picked up by the Camel JBang CLI).

Step 4 (optional): translate into YAML DSL

The integration is now ready to be turned into a Kamelet, but if you haven’t written it directly in the YAML DSL, you need to convert it before proceeding. The YAML DSL is the default DSL for Kamelets because it provides multiple advantages over the other DSLs, the most important one being the ability to easily compile YAML integrations into Quarkus-based binary executables in the future, with all the performance and resource-utilization advantages that derive from that.

Since we managed to reduce our integration to a single Camel route, converting it to YAML is straightforward:

earthquake.yaml
# camel-k: language=yaml property=period=20000 property=lookAhead=120 dependency=camel-quarkus:caffeine dependency=camel-quarkus:http

- from:
    uri: "timer:earthquake"
    parameters:
      period: "{{period}}"
    steps:
    - setHeader:
        name: CamelCaffeineAction
        constant: GET
    - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
    - choice:
        when:
        - simple: "${header.CamelCaffeineActionHasResult}"
          steps:
          - set-property:
              name: lastUpdate
              simple: "${body}"
        otherwise:
          steps:
          - set-property:
              name: lastUpdate
              simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
    - setHeader:
        name: CamelHttpMethod
        constant: GET
    - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
    - unmarshal:
        json: {}
    - set-property:
        name: generated
        simple: "${body[metadata][generated]}"
    - set-property:
        name: lastUpdate
        simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
    - claim-check:
        operation: Push
    - setBody:
        exchange-property: lastUpdate
    - setHeader:
        name: CamelCaffeineAction
        constant: PUT
    - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
    - claim-check:
        operation: Pop
    - split:
        jsonpath: "$.features[*]"
        steps:
          - marshal:
              json: {}
          - to: "log:info"

If you compare the YAML version of the route to the Java one, you see that they map 1-1.

The Camel Extension Pack for Visual Studio Code helps you write the YAML route by providing auto-completion and error highlighting
Since the YAML DSL is quite new in the Camel ecosystem, it may lack some features available in the Java one; e.g. Camel K is not able to detect some dependencies automatically, so we’ve specified them in the modeline header

This route can be run like the previous one using the Camel JBang CLI:

camel run earthquake.yaml

Step 5: wrap it into a Kamelet

We’re about to write down an "Earthquake Source" Kamelet from the route we’ve built. As a starting point, we can just wrap the previous YAML route into the Kamelet envelope. The result looks like:

earthquake-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: earthquake-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  template: (1)
    from:
      uri: "timer:earthquake"
      parameters:
        period: "{{period}}"
      steps:
      - setHeader:
          name: CamelCaffeineAction
          constant: GET
      - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - choice:
          when:
          - simple: "${header.CamelCaffeineActionHasResult}"
            steps:
            - set-property:
                name: lastUpdate
                simple: "${body}"
          otherwise:
            steps:
            - set-property:
                name: lastUpdate
                simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - setHeader:
          name: CamelHttpMethod
          constant: GET
      - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
      - unmarshal:
          json: {}
      - set-property:
          name: generated
          simple: "${body[metadata][generated]}"
      - set-property:
          name: lastUpdate
          simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - claim-check:
          operation: Push
      - setBody:
          exchange-property: lastUpdate
      - setHeader:
          name: CamelCaffeineAction
          constant: PUT
      - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - claim-check:
          operation: Pop
      - split:
          jsonpath: "$.features[*]"
          steps:
            - marshal:
                json: {}
            - to: "kamelet:sink" (2)
1 The template contains the (single) route we identified before
2 The old reference to "log:info" has been replaced with "kamelet:sink" here

The only difference between the YAML route embedded in the Kamelet and the one identified before is the final sink, which was "log:info" and is now "kamelet:sink", i.e. a placeholder that will be replaced with something else when the Kamelet is actually used (the user decides what the destination of the earthquake events is).

Step 6: describe the parameters

The Kamelet above is incomplete: we need to define the two parameters we’ve identified in the template and also give the Kamelet itself a description. The way to express all this information is via a JSON schema specification in the Kamelet YAML.

earthquake-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: earthquake-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition: (1)
    title: Earthquake Source
    description: |-
      Get data about current earthquake events happening in the world using the USGS API
    properties:
      period: (2)
        title: Period between polls
        description: The interval between fetches to the earthquake API in milliseconds
        type: integer
        default: 60000
      lookAhead: (3)
        title: Look-ahead minutes
        description: The amount of minutes to look ahead when starting the integration afresh
        type: integer
        default: 120
  template:
    from:
      uri: "timer:earthquake"
      # ...
1 The definition part starts with general information about the Kamelet
2 Definition of the period parameter (used with the {{period}} placeholder in the route)
3 Definition of the lookAhead parameter
In other scenarios, you might want to refer to non-required parameters in the Kamelet’s spec.template using the {{?optionalParam}} syntax; this is helpful for those cases where the non-required parameter does not define a default value in the Kamelet’s spec.definition.properties. For more information, refer to the Camel property placeholder syntax in the Camel Core project documentation.
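
For illustration (delay is a hypothetical optional parameter, not part of this Kamelet), an optional parameter without a default could be referenced like this:

  template:
    from:
      uri: "timer:earthquake"
      parameters:
        period: "{{period}}"
        delay: "{{?delay}}" # only applied when the user actually sets 'delay'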

Step 7: add metadata and sugar

We should complete the Kamelet with all the mandatory (and ideally also the optional) settings that are described in the guidelines for contributing Kamelets.

The final result should look like:

earthquake-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: earthquake-source
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64..." # truncated (1)
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "source"
    camel.apache.org/requires.runtime: "camel-quarkus" (2)
spec:
  definition:
    title: Earthquake Source
    description: |-
      Get data about current earthquake events happening in the world using the USGS API
    properties:
      period:
        title: Period between polls
        description: The interval between fetches to the earthquake API in milliseconds
        type: integer
        default: 60000
      lookAhead:
        title: Look-ahead minutes
        description: The amount of minutes to look ahead when starting the integration afresh
        type: integer
        default: 120
  dataTypes: (3)
    out:
      default: json
      types:
        json:
          mediaType: application/json
  dependencies: (4)
    - camel-quarkus:caffeine
    - camel-quarkus:http
  template:
    from:
      uri: "timer:earthquake"
      parameters:
        period: "{{period}}"
      steps:
      - setHeader:
          name: CamelCaffeineAction
          constant: GET
      - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - choice:
          when:
          - simple: "${header.CamelCaffeineActionHasResult}"
            steps:
            - set-property:
                name: lastUpdate
                simple: "${body}"
          otherwise:
            steps:
            - set-property:
                name: lastUpdate
                simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - setHeader:
          name: CamelHttpMethod
          constant: GET
      - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
      - unmarshal:
          json: {}
      - set-property:
          name: generated
          simple: "${body[metadata][generated]}"
      - set-property:
          name: lastUpdate
          simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - claim-check:
          operation: Push
      - setBody:
          exchange-property: lastUpdate
      - setHeader:
          name: CamelCaffeineAction
          constant: PUT
      - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - claim-check:
          operation: Pop
      - split:
          jsonpath: "$.features[*]"
          steps:
            - marshal:
                json: {}
            - to: "kamelet:sink"
1 Add an icon with an appropriate license, preferably using SVG with base64 data URL encoding. You can encode icons using services like this one
2 This marks the Kamelet as dependent on Quarkus, since we’re specifying explicit dependencies on Quarkus artifacts in the dependencies section under spec
3 The dataTypes section indicates that the Kamelet is going to produce JSON data by default. The Kamelet is able to define multiple data types for in/out/error. The user will then be able to choose one of the data types in a Pipe when referencing the Kamelet.
4 Dependencies that we previously specified in the modeline options must now be expressed in the Kamelet spec

The Kamelet is now ready to be used!

Trying it out

You can install the Kamelet on your Kubernetes instance to see if it can be picked up and used by the Camel K runtime.

We assume that you’re connected to a Kubernetes cluster and working on a namespace where the Camel K operator is allowed to materialize integrations.

To create the Kamelet, you can execute:

kubectl apply -f earthquake-source.kamelet.yaml

If the Kamelet is valid, this will result in the Kamelet resource being created in the current namespace.

To check if it works, you can create a simple binding:

earthquake-source-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: earthquake-source-binding
spec:
  source:
    ref: (1)
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: earthquake-source
    properties:
      period: 10000 (2)
  sink:
    uri: "log:info" (3)
1 Kubernetes reference to the previously created Kamelet
2 We redefine the period to speed things up, otherwise the default (60000) is used
3 We just sink into "log:info", but we’re free to change it to anything else
The developer writes Camel DSL to make a Kamelet work, but the end user uses it declaratively, with no idea of the complexity of the development process behind it

Creating this resource will tell the operator to materialize the binding using an integration:

kubectl apply -f earthquake-source-binding.yaml

We can check the logs of the integration using:

kamel logs earthquake-source-binding

If everything went well, you should see the events in the log.

Refer to the Kamelets User Guide for more information on how to use it in different contexts (like Knative, Kafka, etc.).

Creating a sink Kamelet

So far we’ve focused on the steps needed to create Kamelets of type "source", but the same steps can be used for type "sink" Kamelets with some minor changes.

We’re now going to create a "sink" Kamelet and look at the differences. For this part, we’ll write a Telegram sink Kamelet.

Analyze the use cases

Differently from sources, which usually generate a single type of data (or multiple ones depending on some static user parameter), a sink should always take into account that it can be fed dynamically with different types of data.

For example, in the case of a Telegram sink, a user may want to send not only textual data but also images with (or without) a caption.

In order to support sending different kinds of data, the Kamelet should adapt according to the content it receives as input.

We’ll start by writing an end-to-end integration, then we’ll convert it into a Kamelet. This time, we’ll write routes directly in YAML DSL.

For this particular use case, we first created a simple integration to get the Chat ID corresponding to our phone from the bot: more info here.
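
A minimal sketch of such a helper (assuming the bot token is provided as authorizationToken; the incoming body is the Telegram IncomingMessage object, whose chat id we log):

get-chat-id.yaml
- from:
    uri: "telegram:bots"
    parameters:
      authorizationToken: "{{authorizationToken}}"
    steps:
      - log: "Chat ID: ${body.chat.id}" # send any message to the bot and read the ID from the logs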

Let’s start with a simple integration:

telegram.yaml
# camel-k: language=yaml property=chatId=158584902 (1)

- from: (2)
    uri: "direct:endpoint"
    steps:
      - to:
          uri: "telegram:bots"
          parameters:
            authorizationToken: "{{authorizationToken}}"
            chatId: "{{chatId}}"
      - marshal: (3)
          json: {}

- from: (4)
    uri: timer:tick
    parameters:
      period: 5000
    steps:
    - setBody:
        constant: Hello
    - to: direct:endpoint
1 Setting the chatId property directly in the modeline; the authorizationToken will be passed from the command line
2 The route that will become the Kamelet route template
3 We marshal the output as JSON because it may need to be transferred over the wire
4 A testing route to check if the integration works

The end-to-end integration above should be good initial scaffolding. We can run it using the following command:

kamel run telegram.yaml -p authorizationToken=the-token-you-got-from-bot-father

If everything went well, you should get a "Hello" message on your phone every 5 seconds.

Now, let’s check if we can also send an image, by changing the second route:

# first route as before
# ...

- from:
    uri: timer:tick
    parameters:
      period: 5000
    steps:
    - setHeader:
        name: CamelHttpMethod
        constant: GET
    - to: https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png
    - to: direct:endpoint

The intended behavior is that we get the image on our phone via Telegram, but it throws an error instead. This is something that often happens, because standard Camel components are not suited to be used out of the box as connectors.

In this case, the Telegram component requires that a CamelTelegramMediaType header is set to PHOTO_PNG on the exchange in order to accept the image, and that the body is converted to byte[]. But we cannot require that whoever sends a message to the Kamelet obeys all Camel rules. In general, we should follow these guidelines:

  • We SHOULD NOT require that the sender sets Camel-specific bits in the message over the wire (e.g. a CamelTelegramMediaType): we should hide Camel under the covers as much as possible

  • We CAN use the "Content-Type" header to distinguish the type of incoming data

  • We CAN define new headers and allow the users to set them on the incoming message (e.g. when the incoming message is a picture, we can let the sender specify a caption for it in the "text" header)

  • When defining a header, it MUST be documented in the Kamelet definition

  • When defining a header, say "text", we should also account for an additional header named "ce-text": in some contexts, like Knative, only headers allowed by the CloudEvents specification are accepted by the brokers/channels (i.e. a ce- prefix is mandatory)

When applied to the current use case, the main route can be changed into something like this:

- from:
    uri: "direct:endpoint"
    steps:
    - choice: (1)
        when:
        - simple: "${header[Content-Type]} == 'image/png'"
          steps:
          - convert-body-to:
              type: "byte[]"
          - setHeader:
              name: CamelTelegramMediaType
              constant: PHOTO_PNG
        - simple: "${header[Content-Type]} == 'image/jpeg'"
          steps:
          - convert-body-to:
              type: "byte[]"
          - setHeader:
              name: CamelTelegramMediaType
              constant: PHOTO_JPG
        otherwise:
          steps:
          - convert-body-to:
              type: "java.lang.String"
    - choice: (2)
        when:
        - simple: "${header[text]}"
          steps:
          - setHeader:
              name: CamelTelegramMediaTitleCaption
              simple: "${header[text]}"
        - simple: "${header[ce-text]}"
          steps:
          - setHeader:
              name: CamelTelegramMediaTitleCaption
              simple: "${header[ce-text]}"
    - choice: (3)
        when:
        - simple: "${header[chat-id]}"
          steps:
          - setHeader:
              name: CamelTelegramChatId
              simple: "${header[chat-id]}"
        - simple: "${header[ce-chat-id]}"
          steps:
          - setHeader:
              name: CamelTelegramChatId
              simple: "${header[ce-chat-id]}"
    - to:
        uri: "telegram:bots"
        parameters:
          authorizationToken: "{{authorizationToken}}"
          chatId: "{{chatId}}"
    - marshal:
        json: {}
1 We do content-type based conversion into appropriate objects for the component
2 We allow specifying a text or ce-text header to set the image caption
3 We allow overriding the chat ID using a chat-id or ce-chat-id header
It’s not always obvious whether it’s the responsibility of the Kamelet to prepare the exchange to be fed into the Camel producer endpoint or whether the Camel component should be changed to be more flexible. In this case, it seems appropriate to implement things like content-type based conversion and support for streaming content at the component level. The Kamelet above is acceptable for the time being, but it should be simplified once such changes land in the component.

Having defined the main route template, we need to document the Kamelet and its parameters. Here is the final Kamelet:

apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: telegram-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # truncated
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "sink"
    camel.apache.org/kamelet.group: "Telegram"
spec:
  definition: (1)
    title: "Telegram Sink"
    description: |-
      Send a message to a Telegram chat using your Telegram bot as sender.

      To create a bot, contact the @botfather account using the Telegram app.

      This sink supports the following message types:

      - Standard text messages
      - PNG images (`Content-Type` must be set to `image/png`)
      - JPEG images (`Content-Type` must be set to `image/jpeg`)

      The following message headers are also supported:

      - `text` / `ce-text`: when sending an image, the image caption
      - `chat-id` / `ce-chat-id`: to override the default chat where messages are sent to
    required:
      - authorizationToken
    properties:
      authorizationToken:
        title: Token
        description: The token to access your bot on Telegram. You can obtain it from the Telegram @botfather.
        type: string
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      chatId:
        title: Chat ID
        description: The Chat ID where messages should be sent by default
        type: string
  dataTypes: (2)
    out:
      default: json
      types:
        json:
          mediaType: application/json
  template: (3)
    from:
      uri: "kamelet:source"
      steps:
      - choice:
          when:
          - simple: "${header[Content-Type]} == 'image/png'"
            steps:
            - convert-body-to:
                type: "byte[]"
            - setHeader:
                name: CamelTelegramMediaType
                constant: PHOTO_PNG
          - simple: "${header[Content-Type]} == 'image/jpeg'"
            steps:
            - convert-body-to:
                type: "byte[]"
            - setHeader:
                name: CamelTelegramMediaType
                constant: PHOTO_JPG
          otherwise:
            steps:
            - convert-body-to:
                type: "java.lang.String"
      - choice:
          when:
          - simple: "${header[text]}"
            steps:
            - setHeader:
                name: CamelTelegramMediaTitleCaption
                simple: "${header[text]}"
          - simple: "${header[ce-text]}"
            steps:
            - setHeader:
                name: CamelTelegramMediaTitleCaption
                simple: "${header[ce-text]}"
      - choice:
          when:
          - simple: "${header[chat-id]}"
            steps:
            - setHeader:
                name: CamelTelegramChatId
                simple: "${header[chat-id]}"
          - simple: "${header[ce-chat-id]}"
            steps:
            - setHeader:
                name: CamelTelegramChatId
                simple: "${header[ce-chat-id]}"
      - to:
          uri: "telegram:bots"
          parameters:
            authorizationToken: "{{authorizationToken}}"
            chatId: "{{chatId}}"
      - marshal:
          json: {}
1 JSON schema definition of the Kamelet configuration
2 The Kamelet has a single possible output of type JSON
3 The flow identified above as Kamelet route template

Try it out

To try a sink Kamelet, we need to feed it with some data. The best way is to feed it directly with another Kamelet.

So, for example, to send a text message to a chat, we may create a binding like the following:

telegram-text-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-text-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: timer-source
    properties:
      period: 10000
      message: Hello first Kamelet!
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-sink
    properties:
      authorizationToken: "put-your-own"
      chatId: "your-chat-id"

You can create the Kamelet with:

kubectl apply -f telegram-sink.kamelet.yaml

Then apply the binding with:

kubectl apply -f telegram-text-binding.yaml

If everything goes well, you should get a "Hello first Kamelet!" message in your phone every 10 seconds.

To check if we can also receive pictures using the above Kamelet, we can create the following binding:

telegram-image-binding.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: telegram-image-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: http-source
    properties:
      url: "https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png"
      contentType: "image/png"
      period: 10000
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-sink
    properties:
      authorizationToken: "put-your-own"
      chatId: "your-chat-id"

This will create a new integration that forwards the Apache Camel logo to your phone every 10 seconds.

Testing

The most obvious way to test a Kamelet is via an e2e test that verifies that the Kamelet respects its specification.

YAKS is the framework of choice for such e2e tests. You can find more information and documentation starting from the YAKS GitHub repository. Here we’ll provide examples for the Kamelets above.

Testing a source

YAKS allows writing a declarative Gherkin file to specify the behavior of the Kamelet.

Let’s try to test the earthquake Kamelet above; a Gherkin file for it could look like:

earthquake-source.feature
Feature: Kamelet earthquake-source works

  Background:
    Given Disable auto removal of Kamelet resources
    Given Disable auto removal of Kubernetes resources
    Given Camel K resource polling configuration
      | maxAttempts          | 60   |
      | delayBetweenAttempts | 3000 |

  Scenario: Bind Kamelet to service
    Given create Kubernetes service test-service with target port 8080
    And bind Kamelet earthquake-source to uri http://test-service.${YAKS_NAMESPACE}.svc.cluster.local/test
    When create Pipe earthquake-source-uri
    Then Pipe earthquake-source-uri should be available
    And Camel K integration earthquake-source-uri should be running

  Scenario: Verify binding
    Given HTTP server "test-service"
    And HTTP server timeout is 120000 ms
    Then expect HTTP request header: Content-Type="application/json;charset=UTF-8"
    And receive POST /test
    And delete Pipe earthquake-source-uri

As you can see, this is a declarative test that is materialized into something that actually checks that the service generates some data. Checks can also be more detailed than this one, but verifying that the Kamelet generates some JSON data is enough for a "smoke test" that confirms the Kamelet can actually be used.

The test requires that you’re connected to a Kubernetes cluster and also have YAKS installed (refer to the YAKS documentation for more information). We’re also going to use the CLI:

# We assume the Kamelet is already installed in the namespace
yaks run earthquake-source.feature
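If the Kamelet is not yet present in the namespace, you can install it beforehand with kubectl, as we did earlier for the Telegram sink (the file name below is an assumption, matching the source developed above):

kubectl apply -f earthquake-source.kamelet.yaml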

When testing a source, the backbone of the Gherkin file that you’ll write is similar to the one above. Depending on the source under test, you may need to stimulate the production of some data using additional Gherkin steps before verifying that the data has been produced (in our case, it’s better not to try to stimulate an earthquake :D).

Testing a sink

A test for a sink is similar to the one for the source, except that we’re going to generate data to feed it.

To send data to the Kamelet, we can bind it to another Kamelet, the webhook-source, which allows us to send data to it via HTTP. Let’s create a parameterized binding like the following one:

webhook-to-telegram.yaml
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: webhook-to-telegram
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: webhook-source
    properties:
      subpath: message
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: telegram-sink
    properties:
      authorizationToken: "${telegram.authorization.token}"
      chatId: "${telegram.chat.id}"

This will expose an HTTP endpoint that we can use to forward a message to Telegram. It requires two parameters to be set in the YAKS configuration before creation. They can be provided in a simple properties file:

telegram-credentials.properties
telegram.authorization.token=your-own-token
telegram.chat.id=your-own-chat

Then we’re ready to define the feature we want to test, i.e. the ability to send a message via the Telegram API.

An example of "smoke test" can be the following one:

telegram-sink.feature
Feature: Kamelet telegram-sink works

  Background:
    Given Disable auto removal of Kamelet resources
    Given Disable auto removal of Kubernetes resources
    Given Camel K resource polling configuration
      | maxAttempts          | 60   |
      | delayBetweenAttempts | 3000 |


  Scenario: Bind webhook to Kamelet sink
    Given load variables telegram-credentials.properties
    And load Pipe webhook-to-telegram.yaml
    Then Pipe webhook-to-telegram should be available
    And Camel K integration webhook-to-telegram should be running


  Scenario: Send a message to the Telegram Chat
    Given URL: http://webhook-to-telegram.${YAKS_NAMESPACE}.svc.cluster.local
    And HTTP request timeout is 60000 milliseconds
    And wait for GET on path / to return 404
    Given HTTP request headers
     | Content-Type          | text/plain |
    And HTTP request body
    """
    Hello from YAKS!
    """
    When send POST /message
    Then receive HTTP 200 OK
    And delete Pipe webhook-to-telegram

This test only checks that the Telegram API accepts the message created by the test.

This can be run with the following command:

# We assume that both the webhook-source and the telegram-sink Kamelets are already present in the namespace
yaks run telegram-sink.feature --resource webhook-to-telegram.yaml --resource telegram-credentials.properties

If everything goes well, you should receive a message during the test execution.

For a more specific test that also checks the content sent to Telegram, you should add additional Gherkin steps that retrieve and verify the actual message via other Telegram APIs. We won’t go into that much detail for this example, but the Gherkin file above is a good approximation of the backbone you’ll find in tests for Kamelets of type "sink".

KEDA Integration

Kamelets of type source can be augmented with KEDA metadata to automatically configure autoscalers.

The additional KEDA metadata is needed for the following purposes:

  • Map Kamelet properties to corresponding KEDA parameters

  • Distinguish which KEDA parameters are needed for authentication (and need to be placed in a Secret)

  • Mark KEDA parameters as required to signal an error during reconciliation

Basic properties to KEDA parameter mapping

Any Kamelet property can be mapped to a KEDA parameter by simply declaring the mapping in the x-descriptors list. For example:

aws-sqs-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: aws-sqs-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    # ...
    properties:
      queueNameOrArn:
        title: Queue Name
        description: The SQS Queue Name or ARN
        type: string
        x-descriptors:
        - urn:keda:metadata:queueURL (1)
        - urn:keda:required (2)
# ...
1 The Kamelet property queueNameOrArn corresponds to a KEDA metadata parameter named queueURL
2 The queueURL parameter is required by KEDA

In the example above, the queueNameOrArn Kamelet property is declared to correspond to a KEDA metadata parameter named queueURL, using the urn:keda:metadata: prefix. The queueURL parameter is documented in the KEDA AWS SQS Queue scaler together with the other options required by KEDA to configure an autoscaler (it can be a full queue URL or a simple queue name). By using the marker descriptor urn:keda:required, it is also marked as required by KEDA.
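To make the mapping concrete, here is an illustrative sketch of the kind of KEDA ScaledObject that such a mapping could produce; the resource names are hypothetical and the exact object generated by the Camel K operator may differ:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: aws-sqs-source-binding # hypothetical name
spec:
  scaleTargetRef:
    apiVersion: camel.apache.org/v1
    kind: Integration
    name: aws-sqs-source-binding # the integration created from the binding
  triggers:
    - type: aws-sqs-queue
      metadata:
        queueURL: my-queue # value taken from the queueNameOrArn Kamelet property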

The queueURL is a metadata parameter for the autoscaler. In order to configure authentication parameters, the syntax is slightly different:

aws-sqs-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: aws-sqs-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    # ...
    properties:
      # ...
      accessKey:
        title: Access Key
        description: The access key obtained from AWS
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
        - urn:camel:group:credentials
        - urn:keda:authentication:awsAccessKeyID (1)
        - urn:keda:required
# ...
1 The Kamelet property accessKey corresponds to a KEDA authentication parameter named awsAccessKeyID

This time the property mapping uses the urn:keda:authentication: prefix, declaring it as a KEDA authentication parameter. The difference between the two approaches is that authentication parameters will be injected into a secret by the Camel K operator and linked to the KEDA ScaledObject using a TriggerAuthentication (refer to the KEDA documentation for more info).
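Again as an illustration only (names are hypothetical and the generated resources may differ), the authentication mapping above could translate into a Secret referenced by a KEDA TriggerAuthentication:

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: aws-sqs-source-binding-auth # hypothetical name
spec:
  secretTargetRef:
    - parameter: awsAccessKeyID # the KEDA authentication parameter
      name: aws-sqs-source-binding-secret # Secret holding the accessKey value
      key: awsAccessKeyID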

Advanced KEDA property mapping

There are cases where KEDA requires static values to be set in a ScaledObject, or values computed from multiple Kamelet properties. To deal with these cases, it’s possible to use annotations on the Kamelet prefixed with camel.apache.org/keda.metadata. (for metadata parameters) or camel.apache.org/keda.authentication. (for authentication parameters). These annotations can contain plain fixed values or templates (using the Go template syntax).

For example:

my-source.kamelet.yaml
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
  name: my-source
  labels:
    camel.apache.org/kamelet.type: "source"
  annotations:
    camel.apache.org/keda.authentication.sasl: "plaintext" (1)
    camel.apache.org/keda.metadata.queueLength: "5" (2)
    camel.apache.org/keda.metadata.queueAddress: "https://myhost.com/queues/{{.queueName}}" (3)
spec:
  definition:
    # ...
    properties:
      queueName:
        title: Queue Name
        description: The Queue Name
        type: string
# ...
1 An authentication parameter with a fixed value
2 A metadata parameter with a fixed value
3 A metadata parameter with a value computed from a template

When using the template syntax, all Kamelet properties are available as fields. Default values are used for properties that are missing from the user configuration. For example, if a user sets queueName to orders, the queueAddress parameter above resolves to https://myhost.com/queues/orders.

For information on how to use Kamelets with KEDA, see the KEDA section in the user guide.
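As a quick illustration of the user side (the exact mechanism is described there), a binding can typically opt into KEDA via the keda addon trait. A minimal sketch, assuming the KEDA addon trait is available in your Camel K installation and using the log-sink Kamelet from the catalog:

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: aws-sqs-to-log
  annotations:
    trait.camel.apache.org/keda.enabled: "true" # enables the KEDA addon trait
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-sqs-source
    properties:
      queueNameOrArn: my-queue
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: log-sink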