Real-time Financial Alerts at Rabobank with Apache Kafka’s Streams API

This article, written by Axual founder Jeroen van Disseldorp, discusses the use of Apache Kafka’s Streams API for sending out alerts to customers of Rabobank. Rabobank is based in the Netherlands with over 900 locations worldwide, 48,000 employees, and €681B in assets. It is a bank by and for customers, a cooperative bank, a socially responsible bank. It aims to be market leader across all financial markets in the Netherlands. Rabobank is also committed to being a leading bank in the field of food and agriculture worldwide. Rabobank provides financial products and services to millions of customers around the world.

For the past few years, Rabobank has been actively investing in becoming a real-time, event-driven bank. If you are familiar with banking processes, you will understand that this is quite a step. Many banking processes are implemented as batch jobs on not-so-commodity hardware, so the migration effort is immense. But as said, Rabobank picked up this challenge and defined the Business Event Bus (BEB) as the place where business events from across the organization are shared between applications. They chose Apache Kafka as the main engine underneath and wrote their own BEB client library to provide application developers with features like easy message producing/consuming and disaster recovery.

Rabobank uses an Active-Active Kafka setup, where Kafka clusters in multiple data centers are mirrored symmetrically. Upon data center failure or operator intervention, BEB clients—including the Kafka Streams based applications discussed in this article—may be switched over from one Kafka cluster to another without requiring a restart. This allows for 24×7 continued operations during disaster scenarios and planned maintenance windows. The BEB client library implements the switching mechanisms for producers, consumers and streams applications.
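BEB’s client library is Rabobank-internal, so its switching code is not public, but the idea can be sketched with plain Java: keep the identity of the active cluster behind an atomic reference and rotate it on failover, rebuilding clients against the new cluster. All names below (ClusterSwitcher and friends) are hypothetical illustrations, not BEB’s actual API.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of BEB-style cluster switching: the real library
// recreates Kafka producers, consumers and streams applications against
// the newly active cluster, without restarting the application.
class ClusterSwitcher {
    private final List<String> bootstrapServers; // one entry per data center
    private final AtomicInteger active = new AtomicInteger(0);

    ClusterSwitcher(List<String> bootstrapServers) {
        this.bootstrapServers = List.copyOf(bootstrapServers);
    }

    /** Bootstrap address of the currently active cluster. */
    String activeCluster() {
        return bootstrapServers.get(active.get());
    }

    /** Called on data center failure or operator intervention. */
    void switchOver() {
        active.updateAndGet(i -> (i + 1) % bootstrapServers.size());
        // In the real library, this is where clients would be rebuilt
        // against activeCluster().
    }
}
```

The rebuild-on-switch step is what makes a restart unnecessary; the sketch only models the cluster bookkeeping around it.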

Rabo Alerts is a system formed by a set of microservices that produce, consume and/or stream messages from BEB. All data types and code discussed below can be found in a GitHub repository. This post will simplify source code listings to some extent (removing unused fields for example), but the listings still reflect the actual code running in production.


The case: Rabo Alerts

Rabo Alerts is a service that alerts Rabobank customers whenever interesting financial events occur. A simple example is an alert when a certain amount is debited from or credited to your account, but more complex events exist as well. Customers configure alerts based on their preferences, and alerts are sent via three channels: email, SMS and mobile push notifications. It is worth noting that Rabo Alerts is not a new or pilot service: it has been in production for over ten years and is available to millions of account holders.

Challenges

The former implementation of Rabo Alerts resided on mainframe systems. All processing steps were batch-oriented: depending on the alert type, the mainframe would derive the alerts to be sent anywhere from every couple of minutes to only a few times per day. The implementation was very stable and reliable, but there were two issues that Rabobank wanted to solve: (1) lack of flexibility and (2) lack of speed.

Flexibility for adapting to new business requirements was low, because changing the supported alerts or adding new (and smarter) alerts required a lot of effort. Rabobank’s pace of introducing new features in its online environment has increased heavily in the past years, so an inflexible alerting solution was becoming increasingly problematic.

Speed of alert delivery was also an issue: the old implementation could take anywhere from 5 minutes up to 4-5 hours to deliver alerts to customers, depending on alert type and batch execution windows. Ten years ago one could argue this was fast enough, but today customer expectations are much higher. The time window in which Rabobank can present “relevant information” to the customer is much smaller than it was ten years ago.

So the question arose of how the existing mechanism could be redesigned to become more extensible and faster. And of course the redesigned Rabo Alerts would also need to be robust and stable, so that it could properly serve its existing user base of millions of customers.

Starting small

For the past year we have been redesigning and reimplementing the alerting mechanisms using Kafka and Kafka’s Streams API. Since the entire Rabo Alerts service is quite big, we decided to start with four easy but heavily used alerts:

  • Balance Above Threshold
  • Balance Below Threshold
  • Credited Above Threshold
  • Debited Above Threshold

Each of these alerts can be derived from the stream of payment details from the Current Account systems. Customers can enable these alerts and configure their own threshold per alert. For instance: “send me an SMS when my balance drops below €100” or “send me a push message when someone credits me more than €1000” (often used for salary deposit notifications).
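At their core, these four alerts are simple threshold comparisons on an account entry. A minimal sketch of two of them, with hypothetical method and parameter names (the production checks live in the BalanceAlertsGenerator class discussed below):

```java
import java.math.BigDecimal;
import java.util.Optional;

// Illustrative threshold checks for two of the four alert types.
// Names and signatures are hypothetical, not the production API.
class ThresholdChecks {
    /** "Credited Above Threshold": fires when a credit booking exceeds the threshold. */
    static Optional<String> creditedAbove(BigDecimal amount, boolean isCredit, BigDecimal threshold) {
        if (isCredit && amount.compareTo(threshold) > 0) {
            return Optional.of("CreditedAbove " + threshold);
        }
        return Optional.empty();
    }

    /** "Balance Below Threshold": fires when the balance after booking drops below the threshold. */
    static Optional<String> balanceBelow(BigDecimal balanceAfter, BigDecimal threshold) {
        if (balanceAfter.compareTo(threshold) < 0) {
            return Optional.of("BalanceBelow " + threshold);
        }
        return Optional.empty();
    }
}
```

Returning an Optional per check mirrors how the real generator methods can decline to produce an alert when the entry does not match the customer’s criteria.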

Here are some screenshots that illustrate how Rabo Alerts are configured through the mobile banking app.

Alerting topology

Our first step was to redesign the alerting process. The basic flow goes like this:

  • Tap into the stream of transactions coming from the payment factory. This results in a stream of account entries. Note that a payment transaction always consists of two account entries: a Debit booking and a Credit booking.
  • For every account entry, execute the following steps:
    1. Translate the account number to the list of customers that have read permissions on the account.
    2. For every one of those customers:
      a. Look up whether the customer has configured Rabo Alerts for the given account number.
      b. If so, check whether this account entry matches the customer’s alert criteria.
      c. If so, send the customer an alert on the configured channels (email, push and/or SMS).

  • Step 1 requires a link with the core banking systems that execute the transactions.
  • Step 2a requires a lookup table containing all customer permissions on all accounts.
  • Step 2b requires a lookup table containing the Rabo Alerts settings for all customers.

Using this flow and requirements, we started drawing the following topic flow graph:

All the white boxes in the picture are Kafka topics, listing their Avro key/value data types. Most data types are self-explanatory, but the following data types are worth mentioning:

  • CustomerAlertSettings: Alert settings of a specific customer. These settings contain:
    • CustomerAlertAddresses: List of channels and addresses to which customer alerts are sent. Mobile push addresses are represented by CustomerId here, since the actual list of registered mobile devices is resolved later in the message sending process.
    • CustomerAccountAlertSettings: List of account-specific alert configurations for this customer. The list specifies which alerts the customer wants to receive for this account, and with which thresholds.
  • ChannelType: Enumeration of available channel types, currently being EMAIL, PUSH and SMS.
  • AccountEntry: One booking on one payment account. An account entry is half of a payment transaction, so it’s either the Debit booking on one account, or the Credit booking on the other.
  • OutboundMessage: The contents of the message being sent to a customer. It contains a message type and parameters, but not its addressing. That information is carried in the key of the Outbound topics.
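The production schemas are defined in Avro, but their rough shape can be approximated with plain Java records. The field names below are illustrative guesses based on the descriptions above, not the actual schema definitions:

```java
import java.util.List;

// Plain-Java approximation of the Avro data types; field names are
// illustrative, the real definitions live in the Avro schemas.
enum ChannelType { EMAIL, PUSH, SMS }

record CustomerAlertAddress(ChannelType channel, String address) {}

record CustomerAccountAlertSettings(String accountNumber, String alertType, long threshold) {}

record CustomerAlertSettings(
        List<CustomerAlertAddress> addresses,
        List<CustomerAccountAlertSettings> accountAlertSettings) {}
```

Keeping addresses and account-level settings together in one CustomerAlertSettings object is what allows a single lookup per customer in the topology below.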

The blue boxes are standalone applications (one might call them microservices), implemented as runnable jars using Spring Boot and deployed on a managed platform. Together they provide all of the functionality needed to implement Rabo Alerts:

  • Alert Settings Manager: fills the compacted topic that contains all alert settings per customer.
  • Account Authorization Manager: an account is not tied 1:1 to a customer, but may be viewed by multiple users. For example, shared accounts between spouses or business accounts with different authorizations for employees can have arbitrary account/user authorization relationships. This application fills a compacted topic that links an account number to the authorized customer IDs. It also operates in real time, so that changes in authorizations take effect immediately in the alerts that are sent out.
  • Account Entry Bridge: retrieves the stream of all payments from Rabobank’s mainframe-based payment factory via IBM MQ and forwards them onto a Kafka topic.
  • Alerting: core alerting service, see below.
  • Device Resolver: an auxiliary application that looks up all of a customer’s mobile devices in an external system and writes the same message out to another topic, once per device (keyed by PushId). This lookup could also be done via another compacted topic, but it is implemented as a remote service call for various reasons.
  • Senders: each Sender consumes messages from a channel-bound topic and sends them to the addressed customers. Each channel is assigned its own Kafka topic to decouple channel failures from one another. This allows alerts to still be sent out via push when, for example, the email server is down.
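The decoupling effect of per-channel topics can be illustrated with an in-memory analogy: give each channel its own queue and let each Sender drain only its own queue, so a backlog on email never delays push. This is a simplified stdlib sketch, not the Kafka-based implementation:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy of per-channel topics: each channel has its own queue,
// so a backlog on one channel never blocks delivery on the others.
class ChannelDispatcher {
    private final Map<String, BlockingQueue<String>> channels = Map.of(
            "email", new LinkedBlockingQueue<>(),
            "sms", new LinkedBlockingQueue<>(),
            "push", new LinkedBlockingQueue<>());

    /** Publish a message onto one channel's queue (the "topic"). */
    void publish(String channel, String message) {
        channels.get(channel).add(message);
    }

    /** Messages waiting on a channel, e.g. piling up while its sender is down. */
    int backlog(String channel) {
        return channels.get(channel).size();
    }

    /** A channel's Sender drains only its own queue; null when empty. */
    String send(String channel) {
        return channels.get(channel).poll();
    }
}
```

With Kafka, the same isolation comes for free: each Sender is an independent consumer on its own topic, with its own offsets.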

Show me the code

The Kafka Streams code for Alerting consists of only two classes.

The first class is the BalanceAlertsTopology. This class defines the main Kafka Streams topology using a given KStreamBuilder. It implements BEB’s TopologyFactory, a custom interface used by BEB’s client library to generate a new Kafka Streams topology after the application starts or when it is directed to switch over to another Kafka cluster (data center switch/failover).
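TopologyFactory is part of Rabobank’s internal BEB client library and is not publicly documented; a plausible minimal shape (purely a guess for illustration) is a single factory method that receives the builder, so the library can invoke it again after a cluster switch:

```java
// Hypothetical reconstruction of BEB's TopologyFactory: the client library
// calls createTopology() on startup and again after each cluster switch.
// The builder type is a generic parameter here so the sketch stays
// self-contained; in reality it would be Kafka's KStreamBuilder.
@FunctionalInterface
interface TopologyFactory<B> {
    void createTopology(B builder);
}
```

Making it a functional interface means a topology can be registered as a simple lambda or method reference.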

KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
        builder.<AccountId, AccountEntry>stream(accountEntryStream)
                .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
                    if (isNull(customerIds)) {
                        return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
                    } else {
                        return customerIds.getCustomerIds().stream()
                                .map(customerId -> KeyValue.pair(customerId, accountEntry))
                                .collect(toList());
                    }
                })
                .flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer)
                .through(customerIdToAccountEntryStream)
                .leftJoin(alertSettings, Pair::with)
                .flatMapValues(
                        (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
                                BalanceAlertsGenerator.generateAlerts(
                                        accountEntryAndSettings.getValue0(),
                                        accountEntryAndSettings.getValue1())
                );

// Send all Email messages from addressedMessages
addressedMessages
        .filter((e, kv) -> kv.key instanceof EmailAddress)
        .map((k, v) -> v)
        .to(emailMessageStream);

// Send all Sms messages from addressedMessages
addressedMessages
        .filter((e, kv) -> kv.key instanceof PhoneNumber)
        .map((k, v) -> v)
        .to(smsMessageStream);

// Send all Push messages from addressedMessages
// (CustomerId is later resolved to a list of the customer's mobile devices)
addressedMessages
        .filter((e, kv) -> kv.key instanceof CustomerId)
        .map((k, v) -> v)
        .to(customerPushMessageStream);

The topology defines a number of steps:

  • The topology starts by consuming from the account entry stream. For each account entry, the first leftJoin looks up which customers have access to that specific account, and the result is stored in an intermediate topic, using CustomerId as key and AccountEntry as value. The semantics of the topic are “for this customer (key), this account entry (value) needs processing”.
  • The second leftJoin and the flatMapValues are executed per customer: we look up the customer’s alert settings and ask a helper class to generate OutboundMessages if this account entry matches the customer’s interests.
  • The three filter/map/to chains at the end take the stream of all OutboundMessages and separate them into one topic per channel.

The magic of alert generation is implemented in the BalanceAlertsGenerator helper class, called from the flatMapValues step above. Its main method is generateAlerts(), which receives an account entry and the alert settings of a customer who is authorized to view the account. Here’s the code:

public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(
        AccountEntry accountEntry, CustomerAlertSettings settings) {
    /* Generates addressed alerts for an AccountEntry, using the alert settings, in these steps:
     * 1) Settings are for a specific account, so drop AccountEntries not for this account
     * 2) Match each setting with all alerts to generate appropriate messages
     * 3) Address the generated messages
     */
    if (settings == null) {
        return new ArrayList<>();
    }
    return settings.getAccountAlertSettings().stream()
            .filter(accountAlertSettings -> matchAccount(accountEntry, accountAlertSettings))
            .flatMap(accountAlertSettings -> accountAlertSettings.getSettings().stream())
            .flatMap(accountAlertSetting -> Stream.of(
                    generateBalanceAbove(accountEntry, accountAlertSetting),
                    generateBalanceBelow(accountEntry, accountAlertSetting),
                    generateCreditedAbove(accountEntry, accountAlertSetting),
                    generateDebitedAbove(accountEntry, accountAlertSetting)))
            .filter(Optional::isPresent).map(Optional::get)
            .flatMap(messageWithChannels -> mapAddresses(messageWithChannels.getValue0(), settings.getAddresses())
                    .map(address -> KeyValue.pair(address, messageWithChannels.getValue1())))
            .collect(toList());
}

The method executes these steps:

  • The outer stream() iterates over all account-related alert settings (one object per account).
  • The filter() with matchAccount() matches the account number from the settings against the account number in the account entry.
  • The first flatMap() streams the individual settings for the account.
  • The second flatMap() constructs the series of messages to be sent, along with the list of channels to send each message to, using a separate generate method for every alert type. The result is a stream of Optionals of Pair<List<ChannelType>, OutboundMessage>.
  • The filter(Optional::isPresent).map(Optional::get) step drops empty results and unwraps the rest.
  • mapAddresses() looks up the customer’s addresses for the given channels, turning the stream into KeyValue<address, OutboundMessage> pairs.
  • Finally, collect() gathers all results into a Java List.

Other helper methods in the same class are:

  • matchAccount(): matches an account entry with account alerts settings by comparing the account number and currency.
  • generateBalanceAbove/Below(): generates BalanceAbove/Below alert messages.
  • generateDebited/CreditedAbove(): generates Debited/CreditedAbove alert messages.
  • mapAddresses(): looks up the customer’s alert addresses for a given list of channels.
  • buildMessage(): builds an OutboundMessage.

Together with a few additional classes to wrap this functionality in a standalone application, that’s all there is to it!

First test run

After a first rudimentary implementation, we took the setup for a test drive. The big remaining question was how fast it would be. It amazed us, even though our expectations started out high! The whole round trip from payment order confirmation to the alert arriving on a mobile device takes only one to two seconds, more often closer to one than two. This round trip includes the time taken by the payment factory (validation of the payment order, transaction processing), so response times may vary somewhat depending on the payment factory workload at that point in time. The whole alerting chain—from an account entry posted on Kafka up to the Senders pushing out messages to customers—is typically executed within 120 milliseconds. In the sending phase, push alerts are fastest, taking only 100-200 milliseconds to arrive on the customer’s mobile device. Email and SMS are somewhat slower channels, with messages arriving 2-4 seconds after the Senders push them out. Compare that to the old setup, where it would typically take several minutes up to a few hours for an alert to be delivered.

The following video demonstrates the speed of alert delivery using my personal test account. Note that although I use it for testing, this is a regular Rabobank payment account that runs in production!

 
