March 12, 2024

KSML v0.8: new features for Kafka Streams in Low Code environments

KSML is a wrapper language for Kafka Streams. It allows for easy specification and running of Kafka Streams applications, without requiring Java programming. It was first released in 2021 and is available as open source under the Apache License v2 on GitHub.

Recently version 0.8.0 was released, bringing a number of interesting improvements. In this article we will give a quick introduction to KSML and then zoom in on the features of the new release.

Introduction to KSML for Kafka Streams

Kafka Streams provides an easy way for Java developers to write streaming applications. But as powerful as the framework is, the requirement to write Java code always remained. Even simple operations, like adding or removing a field in a Kafka message, require developers to set up a new Java project, write a lot of boilerplate code, set up and maintain build pipelines, and manage the application in production.

KSML allows developers to skip most of this work by expressing the desired functionality in YAML. KSML is not intended as a full replacement for complex Kafka Streams code, or to compete directly with other stream processing frameworks like Flink. It is meant to ease the life of development teams in use cases where simplicity and quick development matter more than the power of heavier, more feature-complete frameworks.

One of the main advantages of KSML is that it is fully declarative. This means that common developer responsibilities – like opening and closing connections to Kafka – are handled by the framework. All developers need to worry about is how to transform input messages to output messages. To illustrate this, let’s look at a few common use cases.

Example producer

Before we can process data, we need data to process. KSML allows the specification of data producers, which generate data on demand and write it to a specified output topic. Let's set up a producer whose messages we can then process in the examples below.

# This example shows how to generate data and have it sent to a target topic in AVRO format.
functions:
  generate_sensordata_message:
    type: generator
    globalCode: |
      import time
      import random
      sensorCounter = 0
    code: |
      global sensorCounter
      key = "sensor"+str(sensorCounter)           # Set the key to return ("sensor0" to "sensor9")
      sensorCounter = (sensorCounter+1) % 10      # Increase the counter for next iteration

      # Generate some random sensor measurement data
      types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
                1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
                2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
                3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
                4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
              }

      # Build the result value using any of the above measurement types
      value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
      value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
      value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
      value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
    expression: (key, value)                      # Return a message tuple with the key and value
    resultType: (string, json)                    # Indicate the type of key and value

producers:
  sensordata_avro_producer:
    generator: generate_sensordata_message
    interval: 444
    to:
      topic: ksml_sensordata_avro
      keyType: string
      valueType: avro:SensorData

This definition generates messages for the Kafka topic ksml_sensordata_avro, with keyType string and valueType avro:SensorData. It does so by calling the function generate_sensordata_message every 444 milliseconds. The function illustrates the use of global variables in Python functions: here, a global counter auto-increments the number of the sensor we generate data for. The sensor name is returned as the key of the Kafka message, while the value is composed by combining a number of random fields into an emulated sensor reading.
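
Because KSML function bodies are ordinary Python, the generator logic can also be tried out on its own before it goes into a definition. Below is a minimal standalone sketch of the same logic, plain Python with no KSML runtime involved (a plain list replaces the integer-keyed dict above, being the more idiomatic input for random.choice):

import json
import random
import time

sensor_counter = 0

def generate_sensordata_message():
    # Same logic as the KSML generator above, minus the KSML runtime.
    global sensor_counter
    key = "sensor" + str(sensor_counter)        # "sensor0" .. "sensor9"
    sensor_counter = (sensor_counter + 1) % 10
    measurement = random.choice([
        {"type": "AREA", "unit": random.choice(["m2", "ft2"]), "value": str(random.randrange(1000))},
        {"type": "HUMIDITY", "unit": random.choice(["g/m3", "%"]), "value": str(random.randrange(100))},
        {"type": "LENGTH", "unit": random.choice(["m", "ft"]), "value": str(random.randrange(1000))},
        {"type": "STATE", "unit": "state", "value": random.choice(["off", "on"])},
        {"type": "TEMPERATURE", "unit": random.choice(["C", "F"]), "value": str(random.randrange(-100, 100))},
    ])
    value = {"name": key, "timestamp": str(round(time.time() * 1000)), **measurement}
    value["color"] = random.choice(["black", "blue", "red", "yellow", "white"])
    value["owner"] = random.choice(["Alice", "Bob", "Charlie", "Dave", "Evan"])
    value["city"] = random.choice(["Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden"])
    return key, value

for _ in range(3):
    key, value = generate_sensordata_message()
    print(key, json.dumps(value))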

When the KSML producer runs, the generator's output looks like this:

2024-03-09T11:06:31,273Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:31,274Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor0, value=SensorData: {"city":"Utrecht", "color":"yellow", "name":"sensor0", "owner":"Dave", "timestamp":1709982391273, "type":"LENGTH", "unit":"ft", "value":"821"}
2024-03-09T11:06:31,292Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646423
2024-03-09T11:06:31,711Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:31,712Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor1, value=SensorData: {"city":"Xanten", "color":"black", "name":"sensor1", "owner":"Dave", "timestamp":1709982391711, "type":"HUMIDITY", "unit":"g/m3", "value":"74"}
2024-03-09T11:06:31,728Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646424
2024-03-09T11:06:32,162Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:32,162Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor2, value=SensorData: {"city":"Utrecht", "color":"black", "name":"sensor2", "owner":"Bob", "timestamp":1709982392162, "type":"HUMIDITY", "unit":"%", "value":"31"}
2024-03-09T11:06:32,182Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515151
2024-03-09T11:06:32,606Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:32,607Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor3, value=SensorData: {"city":"Utrecht", "color":"red", "name":"sensor3", "owner":"Evan", "timestamp":1709982392607, "type":"LENGTH", "unit":"m", "value":"802"}
2024-03-09T11:06:32,623Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515152
2024-03-09T11:06:33,045Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,046Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor4, value=SensorData: {"city":"Xanten", "color":"blue", "name":"sensor4", "owner":"Charlie", "timestamp":1709982393045, "type":"TEMPERATURE", "unit":"C", "value":"-31"}
2024-03-09T11:06:33,067Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515153
2024-03-09T11:06:33,493Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,494Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor5, value=SensorData: {"city":"Utrecht", "color":"white", "name":"sensor5", "owner":"Charlie", "timestamp":1709982393494, "type":"LENGTH", "unit":"ft", "value":"548"}
2024-03-09T11:06:33,510Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 0, offset 1515154
2024-03-09T11:06:33,936Z INFO  i.a.k.r.backend.KafkaProducerRunner  Calling generate_sensordata_message
2024-03-09T11:06:33,937Z INFO  i.a.k.r.backend.ExecutableProducer   Message: key=sensor6, value=SensorData: {"city":"Amsterdam", "color":"blue", "name":"sensor6", "owner":"Alice", "timestamp":1709982393937, "type":"LENGTH", "unit":"ft", "value":"365"}
2024-03-09T11:06:33,953Z INFO  i.a.k.r.backend.ExecutableProducer   Produced message to ksml_sensordata_avro, partition 1, offset 1646425

Example processing

The KSML repository contains numerous examples of how to process messages with Kafka Streams. Let's look at two of them.

Filtering messages

If we want to filter messages on certain criteria, for instance keeping only blue sensors, a typical KSML definition would look like this:

# This example shows how to filter messages from a simple stream. Here we only let "blue sensors"
# pass and discard other messages after logging.
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  sensor_filtered:
    topic: ksml_sensordata_filtered
    keyType: string
    valueType: avro:SensorData

functions:
  sensor_is_blue:
    type: predicate
    code: |
      if value is None:
        log.warn("No value in message with key={}", key)
        return False
      if value["color"] != "blue":
        log.warn("Not a blue sensor, color: {}", value["color"])
        return False
    expression: True

pipelines:
  filter_pipeline:
    from: sensor_source
    via:
      - type: filter
        if: sensor_is_blue
      - type: peek
        forEach:
          code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
    to: sensor_filtered

The predicate checks every Kafka message: it first verifies that the message has a value (in Python: value is not None) and then returns True or False depending on whether the message's color field is set to blue.
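
For reference, the same predicate can be expressed as a standalone Python sketch and tested outside KSML; here print() stands in for the log variable that the KSML runtime provides:

def sensor_is_blue(key, value):
    # Return True only for messages describing a blue sensor.
    if value is None:
        print("No value in message with key=" + key)
        return False
    if value.get("color") != "blue":
        print("Not a blue sensor, color: " + str(value.get("color")))
        return False
    return True

# Quick checks with sample messages:
assert sensor_is_blue("sensor0", {"color": "blue"})
assert not sensor_is_blue("sensor1", {"color": "red"})
assert not sensor_is_blue("sensor2", None)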

Converting messages

Another common use case is to receive messages in one format and output them in another. This is done as follows:

# This example shows how to read from an AVRO topic and write to an XML topic.
pipelines:
  # This pipeline copies data literally, but converts the data type due to the different
  # definitions of the input and output topic valueType.
  avro_to_xml:
    from:
      topic: ksml_sensordata_avro
      keyType: string
      valueType: avro:SensorData
    to:
      topic: ksml_sensordata_xml
      keyType: string
      valueType: xml:SensorData

This example shows the power of the internal data representation KSML uses, as it allows for easy data mapping between different commonly used Kafka message formats.
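
As a conceptual illustration of that idea (a sketch only, not KSML's actual implementation), the snippet below holds one record as a neutral Python dict and renders it in two notations:

import json
import xml.etree.ElementTree as ET

# One record in a notation-neutral internal form:
record = {"name": "sensor0", "type": "LENGTH", "unit": "ft", "value": "821"}

# Rendered as JSON notation:
print(json.dumps(record))

# Rendered as XML notation:
root = ET.Element("SensorData")
for field, text in record.items():
    ET.SubElement(root, field).text = text
print(ET.tostring(root, encoding="unicode"))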

Other examples

More examples of data stream processing can be found in the KSML repository on GitHub, including:

  • branching topologies;
  • routing messages to specific topics based on their contents;
  • modifying fields between input topics and output topics;
  • data analysis, including counting and aggregating messages;
  • joining streams with other streams and/or tables; and
  • manual state store usage, which is part of the 0.8 release described below.

What’s new in KSML 0.8

The 0.8 release is a big update from the 0.2.x series, bringing many feature and stability improvements. It also marks the point where most of the language and framework are considered stable.

Let’s look at some of the most interesting feature additions.

Syntax validation and completion

The internal parsing logic was completely overhauled and is now able to generate its own JSON Schema for the KSML language. This allows developers to load a ksml.json file into their IDEs and get syntax validation and code completion.

For example, in IntelliJ this can be done by adding the JSON Schema to the JSON Schema Mappings, found at the bottom right of the main IDE window.

Once the schema is added and applied to your KSML definition files, IntelliJ will perform syntax checking and provide suggestions for completing your definitions.

As a second benefit, the JSON Schema of the KSML language can also be converted into Markdown format, giving users an always-up-to-date and complete language specification. An online version of the spec is available in the KSML documentation.

Better data types, schema handling and notation support

The code for data types, schema handling and (de)serialization was heavily reworked. It now separates data types from their external representations (called notations), such as AVRO, CSV and XML. This makes it easier to add other notations (e.g. Protobuf) in the future.

Another useful addition is automatic conversion between types. This helps when a function returns JSON but the output topic is written in AVRO format, when a returned string should be interpreted as CSV, JSON or XML, or when one schema holds a field as type string while another schema expects it as type long. In nearly all practical situations where KSML is able to convert type A into type B, it will do so automatically.
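
Conceptually, such a conversion resembles the sketch below: a simplified, illustrative stand-in (not KSML's internal code) that coerces a field value to the type a target schema expects.

def coerce(value, target_type):
    # Map schema type names to Python conversion functions.
    converters = {
        "string": str,                                   # anything -> string
        "long": int,                                     # e.g. "123" -> 123
        "double": float,                                 # e.g. "3.14" -> 3.14
        "boolean": lambda v: str(v).lower() in ("true", "1"),
    }
    return converters[target_type](value)

print(coerce("1709982391273", "long"))   # string field -> long field
print(coerce(42, "string"))              # long field -> string field
print(coerce("3.14", "double"))          # string field -> double field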

Improved support for state stores

KSML definitions are now able to specify their own state stores, which in turn can be referenced in Python functions. Handling of these scenarios has improved in several ways:

  • Manual state stores can now be referenced in custom pipeline functions;
  • The name of the state store is automatically available as a variable in Python; and
  • State stores can be used completely ‘side-effect-free’: when you store an AVRO object in a state store, KSML no longer tries to register the AVRO schema in a schema registry.

An example of the improved state store handling is shown below.

streams:
  sensor_source_avro:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData

stores:
  last_sensor_data_store:
    type: keyValue
    keyType: string
    valueType: json
    persistent: false
    historyRetention: 1h
    caching: false
    logging: false

functions:
  process_message:
    type: forEach
    code: |
      last_value = last_sensor_data_store.get(key)
      if last_value is not None:
        log.info("Found last value: {} = {}", key, last_value)
      last_sensor_data_store.put(key, value)
      if value is not None:
        log.info("Stored new value: {} = {}", key, value)
    stores:
      - last_sensor_data_store

pipelines:
  process_message:
    from: sensor_source_avro
    forEach: process_message

This example reads data from the ksml_sensordata_avro topic and sends every message to the process_message function. That function uses the defined last_sensor_data_store, an in-memory key/value store (persistent: false). It starts by looking up the Kafka message key in the state store, logs the previous value if one is found, and then stores the new message value in the state store.

Of course, real-life examples would do more with previous data found in the state store. But this example shows how one can use state stores to build stateful message processing.
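
For comparison, the same stateful pattern can be written as a plain-Python sketch, with a dict standing in for the state store that KSML provides as last_sensor_data_store; this only illustrates the get/put flow:

last_sensor_data_store = {}

def process_message(key, value):
    # Look up the previous value for this key, then store the new one.
    last_value = last_sensor_data_store.get(key)
    if last_value is not None:
        print("Found last value:", key, "=", last_value)
        # A real application might compare old and new values here,
        # e.g. to detect a sensor switching state.
    last_sensor_data_store[key] = value
    if value is not None:
        print("Stored new value:", key, "=", value)

process_message("sensor0", {"type": "STATE", "value": "on"})
process_message("sensor0", {"type": "STATE", "value": "off"})  # now finds the previous value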

Logging support in Python functions

Where previous KSML versions relied on print() statements, KSML 0.8 provides a log variable for writing messages to the application log. This variable is backed by a Java Logger and can be used in the same way. It supports the following operations:

error(message: str, value_params...) --> sends an error message to the log
warn(message: str, value_params...) --> sends a warning message to the log
info(message: str, value_params...) --> sends an informational message to the log
debug(message: str, value_params...) --> sends a debug message to the log
trace(message: str, value_params...) --> sends a trace message to the log

The message may contain curly-bracket placeholders {}, which are substituted by the value parameters. Examples:

log.error("Something went completely bad here!")
log.info("Received message from topic: key={}, value={}", key, value)
log.debug("I'm printing five variables here: {}, {}, {}, {}, {}. Lovely isn't it?", 1, 2, 3, "text", {"json":"is cool"})

Output of the above statements looks like:

[LOG TIMESTAMP] ERROR function.name   Something went completely bad here!
[LOG TIMESTAMP] INFO  function.name   Received message from topic: key=123, value={"key":"value"}
[LOG TIMESTAMP] DEBUG function.name   I'm printing five variables here: 1, 2, 3, text, {"json":"is cool"}. Lovely isn't it?

Lots of small language updates

  • Improved readability for store types, filter operations and windowing operations.
  • Introduction of the “as” operation, which allows for pipeline referencing and chaining.
  • Type inference in situations where function parameters or result types can be derived from the context. This allows for shorter scripts and better readability of KSML definitions.

Miscellaneous updates

  • Configuration file updates, allowing for running multiple definitions in a single runner (each in its own namespace).
  • Examples updated to reflect the latest definition format.
  • Documentation updated.

Conclusion and Roadmap

While KSML 0.8 is a terrific upgrade from the 0.2.x series, the development team already has a list of features to work on for 0.9. Among these are:

  • Protobuf support
  • Improvements on the Observability features
    • Logging
      • Configurable logging (via logback.xml)
      • JSON export
      • OpenTelemetry logging
      • LogLevel service
    • Metrics
      • Prometheus JMX exporter agent support
      • OpenTelemetry OTLP support
    • OpenTelemetry Tracing
      • Jaeger
      • Zipkin
      • StdOut
  • Topology description endpoint
  • Status / health service
  • Helm Charts



Jeroen van Disseldorp
CEO/CTO
