KSML v0.8: new features for Kafka Streams in Low Code environments
KSML is a wrapper language for Kafka Streams. It allows for easy specification and running of Kafka Streams applications, without requiring Java programming. It was first released in 2021 and is available as open source under the Apache License v2 on Github. Recently version 0.8.0 was released, which brings a number of interesting improvements. In […]
On this page
KSML is a wrapper language for Kafka Streams. It allows for easy specification and running of Kafka Streams applications, without requiring Java programming. It was first released in 2021 and is available as open source under the Apache License v2 on Github.
Recently version 0.8.0 was released, which brings a number of interesting improvements. In this article we will do a quick introduction of KSML and then zoom in on the features in the new release.
Introduction to KSML for Kafka Streams
Kafka Streams provides an easy way for Java developers to write streaming applications. But as powerful as the framework is, the requirement for writing Java code always remained. Even simple operations, like add or removing a field to a Kafka message, require developers to set up new Java projects, create a lot of boilerplate code, set up and maintain build pipelines and manage the application in production.
KSML allows developers to skip most of this work by expressing the desired functionality in YAML. KSML is not intended to be a full replacement of complex Kafka Streams code, or to compete directly with other stream processing frameworks like Flink. It is meant to ease the life of development teams for use cases where simplicity and quick development outweigh the overkill of heavier and more feature-complete frameworks.
One of the main advantages of KSML is that it is fully declarative. This means that common developer responsibilities – like opening and closing connections to Kafka – are handled by the framework. All developers need to worry about is how to transform input messages to output messages. To illustrate this, let’s look at a few common use cases.
Example producer
Before we can process data, we need to have data to process. KSML allows for the specification of data producers, which generate data on-demand and produce it to a specified output topic. Let’s set up one that allows us to process these messages further below.
# This example shows how to generate data and have it sent to a target topic in AVRO format.
functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate some random sensor measurement data
types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
}
# Build the result value using any of the above measurement types
value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value
producers:
sensordata_avro_producer:
generator: generate_sensordata_message
interval: 444
to:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
This definition generates messages for Kafka topic ksml_sensordata_avro, with keyType string and valueType avro:SensorData. It does so by calling the function generate_sensordata_message every 444 milliseconds. The function illustrates how to use global variables inside Python functions. In this case, a global variable is used inside the generator function to auto-increment the sensor number we generate data for. The sensor name is returned as the key of the Kafka message. The value is composed by combining a number of random fields together into an emulated sensor reading.
When run, the output of the generator looks like:
2024-03-09T11:06:31,273Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:31,274Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor0, value=SensorData: {"city":"Utrecht", "color":"yellow", "name":"sensor0", "owner":"Dave", "timestamp":1709982391273, "type":"LENGTH", "unit":"ft", "value":"821"}
2024-03-09T11:06:31,292Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 1, offset 1646423
2024-03-09T11:06:31,711Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:31,712Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor1, value=SensorData: {"city":"Xanten", "color":"black", "name":"sensor1", "owner":"Dave", "timestamp":1709982391711, "type":"HUMIDITY", "unit":"g/m3", "value":"74"}
2024-03-09T11:06:31,728Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 1, offset 1646424
2024-03-09T11:06:32,162Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:32,162Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor2, value=SensorData: {"city":"Utrecht", "color":"black", "name":"sensor2", "owner":"Bob", "timestamp":1709982392162, "type":"HUMIDITY", "unit":"%", "value":"31"}
2024-03-09T11:06:32,182Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 0, offset 1515151
2024-03-09T11:06:32,606Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:32,607Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor3, value=SensorData: {"city":"Utrecht", "color":"red", "name":"sensor3", "owner":"Evan", "timestamp":1709982392607, "type":"LENGTH", "unit":"m", "value":"802"}
2024-03-09T11:06:32,623Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 0, offset 1515152
2024-03-09T11:06:33,045Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:33,046Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor4, value=SensorData: {"city":"Xanten", "color":"blue", "name":"sensor4", "owner":"Charlie", "timestamp":1709982393045, "type":"TEMPERATURE", "unit":"C", "value":"-31"}
2024-03-09T11:06:33,067Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 0, offset 1515153
2024-03-09T11:06:33,493Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:33,494Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor5, value=SensorData: {"city":"Utrecht", "color":"white", "name":"sensor5", "owner":"Charlie", "timestamp":1709982393494, "type":"LENGTH", "unit":"ft", "value":"548"}
2024-03-09T11:06:33,510Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 0, offset 1515154
2024-03-09T11:06:33,936Z INFO i.a.k.r.backend.KafkaProducerRunner Calling generate_sensordata_message
2024-03-09T11:06:33,937Z INFO i.a.k.r.backend.ExecutableProducer Message: key=sensor6, value=SensorData: {"city":"Amsterdam", "color":"blue", "name":"sensor6", "owner":"Alice", "timestamp":1709982393937, "type":"LENGTH", "unit":"ft", "value":"365"}
2024-03-09T11:06:33,953Z INFO i.a.k.r.backend.ExecutableProducer Produced message to ksml_sensordata_avro, partition 1, offset 1646425
Example processing
The KSML repository contains numerous examples of how to process messages with Kafka Streams. Let’s look at two examples.
Filtering messages
If we want to filter on certain messages, for instance on blue sensors, then a typical KSML definition would look like this:
# This example shows how to filter messages from a simple stream. Here we only let "blue sensors" pass and discard
# other messages after logging.
streams:
sensor_source:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
sensor_filtered:
topic: ksml_sensordata_filtered
keyType: string
valueType: avro:SensorData
functions:
sensor_is_blue:
type: predicate
code: |
if value == None:
log.warn("No value in message with key={}", key)
return False
if value["color"] != "blue":
log.warn("Unknown color: {}", value["color"])
return False
expression: True
pipelines:
filter_pipeline:
from: sensor_source
via:
- type: filter
if: sensor_is_blue
- type: peek
forEach:
code: log.info("MESSAGE ACCEPTED - key={}, value={}", key, value)
to: sensor_filtered
This script looks at every Kafka message, determines if there is a value in the message (in Python: value != None) and returns True or False depending on whether the color field in the message is set to blue.
Converting messages
Another common use case is to receive messages in one format and to output them in another. This is done like below:
# This example shows how to read from an AVRO topic and write to an XML topic
pipelines:
# This pipeline copies data literally, but converts the data type due to the different
# definitions of the input and output topic valueType.
avro_to_xml:
from:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
to:
topic: ksml_sensordata_xml
keyType: string
valueType: xml:SensorData
This example shows the power of the internal data representation KSML uses, as it allows for easy data mapping between different commonly used Kafka message formats.
Other examples
More examples on data stream processing can be found in the KSML repository on Github, including:
- branching topologies ;
- routing messages to specific topics based on their contents ;
- modifying fields between input topics and output topics ;
- data analysis, including counting and aggregating messages ;
- joining streams with other streams and/or tables ; and
- manual state store usage, which is actually part of the 0.8 release described below.
What’s new in KSML 0.8
The 0.8 release is a big update, coming from 0.2.x. It brings lots of feature and stability improvements. It also marks a point in time, where most of the language and framework are considered stable.
Let’s look at some of the most interesting feature additions.
Syntax validation and completion
Internal parsing logic was completely overhauled and is now able to generate its own JSON Schema for the KSML language. The allows developers to load a ksml.json file into their IDEs and perform syntax validation and code completion.
For example, in IntelliJ this can be done by adding the JSON Schema to the JSON Schema Mappings, which you can find at bottom right in the main IDE window.
Once the definition is added and applied to the KSML definition files, IntelliJ will perform syntax checking and provide suggestions for completing your definition.
As a second benefit, the JSON Schema of the KSML definition can also be converted into Markdown format, giving users an always-up-to-date and complete language specification. See here for the online version of the spec.
Better data types, schema handling and notation support
The code for data types, schema handling and (de)serialization was worked on heavily. It now separates the data types from external representations (called notations) like AVRO, CSV and XML. This makes it easier to add other notations (eg. Protobuf) in the future.
Another useful addition was the automatic conversion between types. This is useful when a function returns a json but the output topic is written to in AVRO format. Or when you return a string, which should be interpreted as CSV, JSON or XML. Or when one schema contains a field of type string, but another schema expects it as type long. In nearly all practical situations where KSML is able to convert type A into type B, it will do so automatically.
Improved support for state stores
KSML definitions are now able to specify their own state stores, which in turn can be referenced in Python functions. There is better handling of such scenarios, through:
- Manual state stores can now be referenced in custom pipeline functions ;
- The name of the state store is automatically available as a variable in Python ; and
- State stores can be used completely ‘side-effect-free’. This means that if you want to store an AVRO object in a state store, KSML will not try to register the AVRO schema in a schema registry anymore.
An example of the improved state store handling is shown below.
streams:
sensor_source_avro:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
stores:
last_sensor_data_store:
type: keyValue
keyType: string
valueType: json
persistent: false
historyRetention: 1h
caching: false
logging: false
functions:
process_message:
type: forEach
code: |
last_value = last_sensor_data_store.get(key)
if last_value != None:
log.info("Found last value: {} = {}", key, last_value)
last_sensor_data_store.put(key, value)
if value != None:
log.info("Stored new value: {} = {}", key, value)
stores:
- last_sensor_data_store
pipelines:
process_message:
from: sensor_source_avro
forEach: process_message
This example reads data from the ksml_sensordata_avro topic and sends every message to the process_message function. This function uses the defined last_sensor_data_store, which is an in-memory key/value state (persistent = false). The function starts with looking up the Kafka message key in the state store. If an entry is found, it will log that fact. Then it stores the new message value in the state store.
Of course, real-life examples would do more with previous data found in the state store. But this example shows how one can use state stores to build stateful message processing.
Logging support in Python functions
Where previous KSML versions relied on print() statements, KSML 0.8 lets users use the log variable to output messages to the application log. This log variable is backed by a Java Logger and can be used in the same way. It supports the following operations:
error(message: str, value_params...) --> sends and error message to the log
warn(message: str, value_params...) --> sends and warning message to the log
info(message: str, value_params...) --> sends and informational message to the log
debug(message: str, value_params...) --> sends and debug message to the log
trace(message: str, value_params...) --> sends and trace message to the log
The message contains double curly brackets {}, which will be substituted by the value parameters. Examples are:
log.error("Something went completely bad here!")
log.info("Received message from topic: key={}, value={}", key, value)
log.debug("I'm printing five variables here: {}, {}, {}, {}, {}. Lovely isn't it?", 1, 2, 3, "text", {"json":"is cool"})
Output of the above statements looks like:
[LOG TIMESTAMP] ERROR function.name Something went completely bad here!
[LOG TIMESTAMP] INFO function.name Received message from topic: key=123, value={"key":"value"}
[LOG TIMESTAMP] DEBUG function.name I'm printing five variables here: 1, 2, 3, text, {"json":"is cool"}. Lovely isn't it?
Lots of small language updates
- Improved readability for store types, filter operations and windowing operations.
- Introduction of the “as” operation, which allows for pipeline referencing and chaining.
- Type inference in situations where function parameters or result types can be derived from the context. This allows for shorter scripts and better readability of KSML definitions.
Miscellaneous updates
- Configuration file updates, allowing for running multiple definitions in a single runner (each in its own namespace).
- Examples updated to reflect the latest definition format.
- Documentation updated.
Conclusion and Roadmap
While KSML 0.8 is a terrific upgrade coming from the 0.2.x series, the development team already has a series of features they will be working on towards 0.9. Amongst these are:
- Protobuf support
- Improvements on the Observability features
- Logging
- Configurable logging (via logback.xml)
- JSON export
- OpenTelemetry logging
- LogLevel service
- Metrics
- Prometheus JMX exporter agent support
- OpenTelemetry OLTP support
- OpenTelemetry Tracing
- Jaeger
- Zipkin
- StdOut
- Logging
- Topology description endpoint
- Status / health service
- Helm Charts
Use the following links to get to KSML resources:
Download the Whitepaper
Download nowAnswers to your questions about Axual’s All-in-one Kafka Platform
Are you curious about our All-in-one Kafka platform? Dive into our FAQs
for all the details you need, and find the answers to your burning questions.
The 0.8.0 release brought several enhancements, including syntax validation and code completion through JSON Schema support, improved data type handling, and enhanced state store support. Additionally, it introduced logging functionality via a built-in logging variable, allowing developers to easily output messages to application logs, and made the language more intuitive with readability improvements and type inference.
KSML allows for various message processing scenarios, such as filtering messages based on specific criteria (e.g., color) and converting message formats (e.g., from AVRO to XML). Through declarative definitions, users can create pipelines that specify the input and output topics, data types, and any transformation logic needed, making it straightforward to implement complex data processing tasks without diving into the underlying Java code.
Related blogs
Let’s dive into the highlights of the 2024.3 release and see how we’re equipping you to confidently handle the next season of data challenges.
API-first approach has emerged as a strategic methodology that prioritizes the design and development of APIs as the foundation for building applications. Combined with Kafka, a distributed streaming platform, this approach becomes even more powerful, enabling organizations to create scalable, real-time, and event-driven systems.
the Internet of Things (IoT) is a significant innovation, revolutionizing how devices interact and share data. IoT technology makes our world more connected and intelligent, from smart home gadgets to industrial sensors. However, as the number of IoT devices grows, managing their communication and integration becomes increasingly complex. This is where API management comes into play, transforming and connecting IoT technology in profound ways.