Change Data Capture with PostgreSQL, Debezium and Axual Part 1 – Preparing PostgreSQL
Most companies rely heavily on the data stored in their databases. If data isn’t stored correctly, business processes can be corrupted and you might make decisions based on outdated or incorrect information.
This makes the data and changes made to it an interesting source of events.
But getting data from a database into a streaming platform has its challenges. This blog will explain how to prepare PostgreSQL for Change Data Capture with Debezium on the Axual Platform.
PostgreSQL, or Postgres, is an open source relational database management system and one of the most popular databases in use. Its popularity comes from the fact that it can easily be set up on a single machine, but can also be scaled massively and support concurrent access by many users. PostgreSQL can be found in big and small companies because of its features, its performance, and the fact that most cloud providers have a PostgreSQL Database as a Service (DBaaS) offering. But getting data from PostgreSQL and loading it into the Axual Platform, or another Kafka based streaming platform, can be challenging. Change Data Capture is an approach often considered too complex, but it has some distinct advantages when compared to other solutions.
Assessing the possible solutions
There are several solutions available to architects and developers for getting data from a database into a streaming platform, each with its own advantages and disadvantages.
Solution 1 – Change Data Capture
Change Data Capture (CDC) is a pattern where a process identifies and captures changes made to data, then sends the captured changes to another process or system for processing. CDC solutions use the internal change tracking features of databases, allowing them to capture and produce record deletion events as well as record insertions and updates. Many databases also allow transactions and changes to table definitions to be captured. Debezium is a CDC tool for Kafka based systems that can connect to and process changes from a relatively wide range of databases, and is packaged as a Kafka Connect source connector. The advantage of CDC is this complete view of changes: inserts, updates and deletes of records, as well as transactions and sometimes data model changes. A disadvantage is that CDC tools often support only a limited number of database vendors, as specific internal knowledge is required to connect to and use the internal change tracking features of a database.
Solution 2 – JDBC Source connectors with Kafka Connect
Kafka Connect is of course one of the default solutions to investigate for getting data from an external system into Kafka and back. JDBC, or Java Database Connectivity, is an API that allows applications to access a database in a generic way, without requiring knowledge of the native protocols used by the database. Database vendors often provide JDBC Drivers, which translate JDBC API calls to the database specific format to handle connections and queries. There are JDBC Source connectors available for Kafka Connect that read from tables by regularly performing SQL queries to determine if there are changes. These changes are then produced to topics by Kafka Connect. A major advantage of these connectors is that they are generic: the solution works on most databases, because most database vendors supply JDBC Drivers. A disadvantage is that the change capture capabilities are limited by the use of SQL statements. Tables need a special column to determine for each record if and when it was changed, and you cannot capture the deletion of a record without additional logic in the database, such as triggers.
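As a sketch of how such a connector polls for changes, assume a hypothetical table changes_demo with a last_updated timestamp column (this table and column are made up for illustration and are not part of the example later in this article). The connector periodically runs a query along these lines:

```sql
-- Hypothetical polling query as issued by a JDBC source connector.
-- The placeholder "?" is bound to the highest last_updated value
-- seen in the previous poll, so only new or changed rows are returned.
SELECT *
FROM changes_demo
WHERE last_updated > ?
ORDER BY last_updated ASC;
```

A row that is deleted between two polls never matches this query, which is exactly why deletions cannot be captured this way without extra logic such as triggers.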
Solution 3 – Developing a custom application
A custom application can be developed that connects to the database, scans the schemas, tables and data inside the tables, and writes the results to one or more Kafka topics. The greatest advantage of this solution is that, since this application is custom made, it can be optimised for a specific database and use any database specific feature. A disadvantage of developing a custom application is that it can become very complex and hard to maintain, as it is also very hard to design and implement an application like this to be reusable and scalable. Another disadvantage of this approach is that it is almost impossible to determine if a record was deleted from a table, unless the database offers special features for that.
Example use case with Change Data Capture
Most organisations have a process for reporting and processing incident reports. In our use case the incident reports are created and updated by an application that stores the data in a PostgreSQL database. Change Data Capture will be used to capture the creation, updates and deletion of reports in the database and publish them on the Axual Platform to allow other systems to consume and process the events.
The table IncidentReports from the Incidents schema will be read by the Debezium Connector in Kafka Connect, which will load any new, updated or deleted data entries to the appropriate topic in the Kafka Cluster. The incident report changes should go to the topic cdc-incident-reports. The transaction metadata events that are captured from the database should be sent to the topic cdc-transactions.
Preparing the PostgreSQL database
The Debezium connector for PostgreSQL can scan schemas for changes in tables by using the PostgreSQL Logical Decoding feature. This feature makes it possible to extract the data changes from the transaction log and process these changes with a plugin. PostgreSQL versions 10 and later already have a default plugin installed, called pgoutput. An alternative approach is using the decoderbufs and wal2json plugins. The database administrator/operator should be involved in the selection and installation of these plugins, as they can have an operational impact.
For this guide the default pgoutput plugin will be used.
Prerequisites
The following resources are needed to prepare the database for the Change Data Capture example:
A PostgreSQL database, preferably version 10 or newer
Terminal access to the database server with a user that can read and write the configuration files of the database and execute the Postgres CLI commands psql and pg_ctl reload
Administrator access to the database, or any user that can create schemas, tables, roles and grants
A SQL Client application, for example psql on the machine running the database
Creating the database schema and table
For these steps the SQL client is required. The example code will use the psql client on the database server.
Open a connection to the PostgreSQL server as an administrator:

psql -U <username> -d <database name> -h <hostname> -p <port number>

A prompt should appear to enter the password.
Create the schema INCIDENTS:

CREATE SCHEMA INCIDENTS;
Create the IncidentReports table:

CREATE TABLE INCIDENTS."IncidentReports" (
    "IncidentId"  INTEGER PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
    "Reporter"    VARCHAR(100) NOT NULL,
    "Description" VARCHAR(255) NOT NULL,
    "Report"      TEXT NOT NULL,
    "Status"      VARCHAR(100) NOT NULL DEFAULT 'OPEN'
);
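Optionally, a sample row can be inserted to verify the table definition; the values below are made up for illustration:

```sql
-- Insert a test incident report; "IncidentId" and "Status" fall back
-- to their defaults (a generated identity and 'OPEN' respectively)
INSERT INTO INCIDENTS."IncidentReports" ("Reporter", "Description", "Report")
VALUES ('jdoe', 'Printer offline', 'The office printer no longer responds.');
```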
The replication identity needs to be set to make sure that Debezium receives both the changed record and the record as it was before the change:

ALTER TABLE INCIDENTS."IncidentReports" REPLICA IDENTITY FULL;
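The setting can be verified by reading the replica identity of the table from the pg_class catalog; the value 'f' means FULL:

```sql
-- relreplident: d = default, n = nothing, f = full, i = index
SELECT relreplident
FROM pg_class
WHERE oid = 'incidents."IncidentReports"'::regclass;
```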
Creating the replication user and setting ownership
PostgreSQL has specific requirements for the grants, or permissions, needed by a user to perform replication the way Debezium uses it. These grants are:
Replication, to allow the user to set up replication connections
Create on the database, to allow creation of the publication resources used to publish change events
Ownership of the schema and table, to determine the state of and any changes to the table and schema
Most of these requirements can be met by granting them to the user, but changing the ownership of a schema or table can disrupt other activities on the database. Groups can be created to prevent that. The group will own those resources, and the new replication user and the original owner will be members of that group. This ensures that there will be no unintended disruptions for other systems.
Open a connection to the PostgreSQL server as an administrator:

psql -U <username> -d <database name> -h <hostname> -p <port number>

A prompt should appear to enter the password.
Create the user cdcUser with password cdcDemo to be used for replication:

CREATE ROLE "cdcUser" WITH REPLICATION LOGIN PASSWORD 'cdcDemo';

Create the new role that will own the INCIDENTS schema and IncidentReports table:

CREATE ROLE CDC_REPLICATION_GROUP;

Add the cdcUser to the group:

GRANT CDC_REPLICATION_GROUP TO "cdcUser";

Add the user that owns the schema and table to the group; in this case it’s the username used in step 1:

GRANT CDC_REPLICATION_GROUP TO <username>;

Set the new group as owner of the schema to allow the scan for changes:

ALTER SCHEMA INCIDENTS OWNER TO CDC_REPLICATION_GROUP;

Set the new group as owner of the table:

ALTER TABLE INCIDENTS."IncidentReports" OWNER TO CDC_REPLICATION_GROUP;

Give the new user create access on the database to create publications:

GRANT CREATE ON DATABASE <database> TO "cdcUser";
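The ownership changes can be checked from the catalog views. Both queries should report cdc_replication_group as the owner; note that unquoted identifiers such as INCIDENTS and CDC_REPLICATION_GROUP are folded to lowercase by PostgreSQL:

```sql
-- Owner of the schema
SELECT nspname AS schema, rolname AS owner
FROM pg_namespace n
JOIN pg_roles r ON r.oid = n.nspowner
WHERE nspname = 'incidents';

-- Owner of the table
SELECT tableowner
FROM pg_tables
WHERE schemaname = 'incidents' AND tablename = 'IncidentReports';
```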
Enabling Logical Decoding and allowing connections
Several changes are required to the PostgreSQL configuration files to enable the Logical Decoding feature and to allow replication connections.
Open a terminal on the machine hosting PostgreSQL
Go to the directory containing the database files and open the file postgresql.conf in a file editor
Search for the WAL settings and make sure the following configuration is used:

wal_level=logical
Save the postgresql.conf file and exit the file editor
Restart the PostgreSQL server if you had to add or change an existing setting. Logical Decoding should now be active.
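The active setting can be checked from any SQL session; the result should be logical:

```sql
-- Show the write-ahead log level currently in effect
SHOW wal_level;
```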
Go to the directory containing the database files and open the file pg_hba.conf in a file editor
Add the following entry to the replication block to allow the user cdcUser to connect for replication from any host:

host    replication    cdcUser    all    scram-sha-256
Reload the configuration using the following command; the database directory is usually the directory where the configuration files are stored:

pg_ctl -D <path to database directory> reload
Verify that a replication connection can be created using the cdcUser credentials:

psql "dbname=<database_name> replication=database" -c "IDENTIFY_SYSTEM;" -U cdcUser -h <hostname> -p <port_number>

A prompt to enter the password should appear; use cdcDemo. An output similar to this should be shown:

      systemid       | timeline |  xlogpos  | dbname
---------------------+----------+-----------+---------
 7119453166424051742 |        1 | 0/17424D0 | axualdb
(1 row)
Next steps
After completing the steps in this article the database is prepared for connections from Debezium. The next article will focus on using Debezium for PostgreSQL in the Axual Platform.