12 Mar 2024

KSML v0.8: new features for Kafka Streams in Low Code environments

KSML is a wrapper language for Kafka Streams. It allows for easy specification and running of Kafka Streams applications, without requiring Java programming. It was first released in 2021 and is available as open source under the Apache License v2 on Github.

Recently version 0.8.0 was released, which brings a number of interesting improvements. In this article we will do a quick introduction of KSML and then zoom in on the features in the new release.

Introduction to KSML for Kafka Streams

Kafka Streams provides an easy way for Java developers to write streaming applications. But as powerful as the framework is, the requirement to write Java code always remained. Even simple operations, like adding or removing a field in a Kafka message, require developers to set up new Java projects, create a lot of boilerplate code, set up and maintain build pipelines, and manage the application in production.

KSML allows developers to skip most of this work by expressing the desired functionality in YAML. KSML is not intended as a full replacement for complex Kafka Streams code, nor does it compete directly with other stream processing frameworks like Flink. It is meant to make life easier for development teams in use cases where simplicity and development speed matter more than the feature set of heavier, more feature-complete frameworks.
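To get a feel for the language, here is a minimal sketch of the field manipulation mentioned above. It is illustrative only: the topic names are hypothetical, and the transformValue operation and valueTransformer function type are assumptions on our part, so check the language spec for the exact names in your KSML version.

streams:
  input_stream:
    topic: some_input_topic             # hypothetical topic
    keyType: string
    valueType: json
  output_stream:
    topic: some_output_topic            # hypothetical topic
    keyType: string
    valueType: json

pipelines:
  remove_field:
    from: input_stream
    via:
      - type: transformValue            # assumed operation name
        mapper:
          type: valueTransformer        # assumed function type
          code: |
            if value is not None:
              value.pop("obsolete_field", None)  # drop the field if present
          expression: value             # return the (modified) value
    to: output_stream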

One of the main advantages of KSML is that it is fully declarative. This means that common developer responsibilities – like opening and closing connections to Kafka – are handled by the framework. All developers need to worry about is how to transform input messages to output messages. To illustrate this, let’s look at a few common use cases.

Example producer

Before we can process data, we need data to process. KSML allows for the specification of data producers, which generate data on demand and produce it to a specified output topic. Let's set up one whose messages we can process further below.

# This example shows how to generate data and have it sent to a target topic in AVRO format.

functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter
      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration
      # Generate some random sensor measurement data
      types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
                1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
                2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
                3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
                4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
              }
      # Build the result value using any of the above measurement types
      value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
      value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
      value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
      value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Indicate the type of key and value

producers:
  sensordata_avro_producer:
    generator: generate_sensordata_message
    interval: 444
    to:
      topic: ksml_sensordata_avro
      keyType: string
      valueType: avro:SensorData

This definition generates messages for the Kafka topic ksml_sensordata_avro, with keyType string and valueType avro:SensorData. It does so by calling the function generate_sensordata_message every 444 milliseconds. The function illustrates how to use global variables inside Python functions: here, a global variable auto-increments the number of the sensor we generate data for. The sensor name is returned as the key of the Kafka message, while the value is composed by combining a number of random fields into a simulated sensor reading.

When run, the output of the generator looks like:

2024-03-09T11:06:31,273Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:31,274Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor0, value=SensorData: {"city":"Utrecht", "color":"yellow", "name":"sensor0", "owner":"Dave", "timestamp":1709982391273, "type":"LENGTH", "unit":"ft", "value":"821"}
2024-03-09T11:06:31,292Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646423
2024-03-09T11:06:31,711Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:31,712Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor1, value=SensorData: {"city":"Xanten", "color":"black", "name":"sensor1", "owner":"Dave", "timestamp":1709982391711, "type":"HUMIDITY", "unit":"g/m3", "value":"74"}
2024-03-09T11:06:31,728Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646424
2024-03-09T11:06:32,162Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:32,162Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor2, value=SensorData: {"city":"Utrecht", "color":"black", "name":"sensor2", "owner":"Bob", "timestamp":1709982392162, "type":"HUMIDITY", "unit":"%", "value":"31"}
2024-03-09T11:06:32,182Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515151
2024-03-09T11:06:32,606Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:32,607Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor3, value=SensorData: {"city":"Utrecht", "color":"red", "name":"sensor3", "owner":"Evan", "timestamp":1709982392607, "type":"LENGTH", "unit":"m", "value":"802"}
2024-03-09T11:06:32,623Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515152
2024-03-09T11:06:33,045Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,046Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor4, value=SensorData: {"city":"Xanten", "color":"blue", "name":"sensor4", "owner":"Charlie", "timestamp":1709982393045, "type":"TEMPERATURE", "unit":"C", "value":"-31"}
2024-03-09T11:06:33,067Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515153
2024-03-09T11:06:33,493Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,494Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor5, value=SensorData: {"city":"Utrecht", "color":"white", "name":"sensor5", "owner":"Charlie", "timestamp":1709982393494, "type":"LENGTH", "unit":"ft", "value":"548"}
2024-03-09T11:06:33,510Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515154
2024-03-09T11:06:33,936Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,937Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor6, value=SensorData: {"city":"Amsterdam", "color":"blue", "name":"sensor6", "owner":"Alice", "timestamp":1709982393937, "type":"LENGTH", "unit":"ft", "value":"365"}
2024-03-09T11:06:33,953Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646425

Example processing

The KSML repository contains numerous examples of how to process messages with Kafka Streams. Let's look at two of them.

Filtering messages

If we want to filter on certain messages, for instance on blue sensors, then a typical KSML definition would look like this:

# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.

streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value is None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Unknown color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

This definition looks at every Kafka message, first checks that the message has a value (in Python: value is not None), and then returns True or False depending on whether the color field in the message is set to blue. When neither check causes an early return, the expression True is evaluated, letting the message pass the filter.

Converting messages

Another common use case is to receive messages in one format and output them in another. This can be done as follows:

# This example shows how to read from an AVRO topic and write to an XML topic

pipelines:
  # This pipeline copies data literally, but converts the data type due to the different
  # definitions of the input and output topic valueType.
  avro_to_xml:
    from:
      topic: ksml_sensordata_avro
      keyType: string
      valueType: avro:SensorData
    to:
      topic: ksml_sensordata_xml
      keyType: string
      valueType: xml:SensorData

This example shows the power of the internal data representation KSML uses, as it allows for easy data mapping between different commonly used Kafka message formats.

Other examples

More examples of data stream processing can be found in the KSML repository on GitHub.

What’s new in KSML 0.8

The 0.8 release is a big update compared to the previous 0.2.x series. It brings many feature and stability improvements, and it marks the point where most of the language and framework are considered stable.

Let’s look at some of the most interesting feature additions.

Syntax validation and completion

The internal parsing logic was completely overhauled and is now able to generate its own JSON Schema for the KSML language. This allows developers to load a ksml.json file into their IDE and get syntax validation and code completion.

For example, in IntelliJ this can be done by adding the JSON Schema to the JSON Schema Mappings, which can be found at the bottom right of the main IDE window.

Once the definition is added and applied to the KSML definition files, IntelliJ will perform syntax checking and provide suggestions for completing your definition.

As a second benefit, the JSON Schema of the KSML definition can also be converted into Markdown format, giving users an always-up-to-date and complete language specification. See here for the online version of the spec.

Better data types, schema handling and notation support

The code for data types, schema handling and (de)serialization was heavily reworked. It now separates internal data types from their external representations (called notations), such as AVRO, CSV and XML. This makes it easier to add other notations (e.g. Protobuf) in the future.

Another useful addition is automatic conversion between types. This helps when a function returns JSON but the output topic is written in AVRO format, when you return a string that should be interpreted as CSV, JSON or XML, or when one schema contains a field of type string while another schema expects a long. In nearly all practical situations where KSML is able to convert type A into type B, it will do so automatically.
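As a minimal sketch of this behavior (the topic names are hypothetical), the pipeline below reads JSON values from one topic and writes them to an AVRO topic. Because the two topic definitions declare different valueTypes, KSML converts each value automatically, just as in the AVRO-to-XML example above; this assumes the JSON fields match the SensorData schema.

streams:
  sensor_json:
    topic: ksml_sensordata_json         # hypothetical input topic with JSON values
    keyType: string
    valueType: json
  sensor_avro:
    topic: ksml_sensordata_avro_copy    # hypothetical output topic with AVRO values
    keyType: string
    valueType: avro:SensorData

pipelines:
  json_to_avro:
    from: sensor_json
    to: sensor_avro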

Improved support for state stores

KSML definitions are now able to specify their own state stores, which in turn can be referenced in Python functions, and the handling of such scenarios has improved in several ways.

An example of the improved state store handling is shown below.

streams:
  sensor_source_avro:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData

stores:
  last_sensor_data_store:
    type: keyValue
    keyType: string
    valueType: json
    persistent: false
    historyRetention: 1h
    caching: false
    logging: false

functions:
  process_message:
    type: forEach
    code: |
      last_value = last_sensor_data_store.get(key)
      if last_value is not None:
        log.info("Found last value: {} = {}", key, last_value)
      last_sensor_data_store.put(key, value)
      if value is not None:
        log.info("Stored new value: {} = {}", key, value)
    stores:
      - last_sensor_data_store

pipelines:
  process_message:
    from: sensor_source_avro
    forEach: process_message

This example reads data from the ksml_sensordata_avro topic and sends every message to the process_message function. This function uses the defined last_sensor_data_store, an in-memory key/value store (persistent: false). The function starts by looking up the Kafka message key in the state store and logs the previous value if one is found. It then stores the new message value in the state store.

Of course, real-life applications would do more with the previous data found in the state store, but this example shows how state stores can be used to build stateful message processing.
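As a slightly more realistic sketch, the function below uses the same store to warn whenever a sensor changes owner between two consecutive readings. The comparison logic is our own illustration, not part of the release; it only relies on the get/put store methods shown above.

functions:
  detect_owner_change:
    type: forEach
    code: |
      last_value = last_sensor_data_store.get(key)
      # Compare the previous reading (if any) with the current one
      if last_value is not None and value is not None and last_value["owner"] != value["owner"]:
        log.warn("Sensor {} changed owner: {} -> {}", key, last_value["owner"], value["owner"])
      # Remember the current reading for the next comparison
      last_sensor_data_store.put(key, value)
    stores:
      - last_sensor_data_store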

Logging support in Python functions

Where previous KSML versions relied on print() statements, KSML 0.8 provides a log variable for writing messages to the application log. This variable is backed by a Java Logger and can be used in the same way. It supports the following operations:

error(message: str, value_params...) --> sends an error message to the log
warn(message: str, value_params...) --> sends a warning message to the log
info(message: str, value_params...) --> sends an informational message to the log
debug(message: str, value_params...) --> sends a debug message to the log
trace(message: str, value_params...) --> sends a trace message to the log

The message may contain curly bracket placeholders {}, which are substituted by the value parameters. Examples:

log.error("Something went completely bad here!")
log.info("Received message from topic: key={}, value={}", key, value)
log.debug("I'm printing five variables here: {}, {}, {}, {}, {}. Lovely isn't it?", 1, 2, 3, "text", {"json":"is cool"})

Output of the above statements looks like:

[LOG TIMESTAMP] ERROR function.name   Something went completely bad here!
[LOG TIMESTAMP] INFO  function.name   Received message from topic: key=123, value={"key":"value"}
[LOG TIMESTAMP] DEBUG function.name   I'm printing five variables here: 1, 2, 3, text, {"json":"is cool"}. Lovely isn't it?

Lots of small language updates

Miscellaneous updates

Conclusion and Roadmap

While KSML 0.8 is a terrific upgrade from the 0.2.x series, the development team already has a set of features planned on the road to 0.9.

KSML's source code, documentation and examples can be found in the KSML repository on GitHub.
