
Summary: The data collected, processed and stored across industries is constantly being generated and updated. Database tables can receive new data from applications or events at intervals of fractions of a second. With this in mind, IRI developed Ripcurrent as an add-on feature in Voracity to monitor, move and process database data that changes in real time.

What is IRI Ripcurrent?

Ripcurrent is a Java application developed by IRI that combines the embedded Debezium engine with the streaming capability of IRI CoSort’s SortCL program to respond in real time to database change events, replicating data to downstream destinations, optionally with transformation rules (e.g., PII masking) applied consistently based on the classification of the data. Ripcurrent integrates with Debezium to track changes from multiple databases; it bundles Debezium connectors for MySQL, SQL Server, PostgreSQL and Oracle. Debezium also supports MongoDB, Db2 and Vitess, but more work is needed before Ripcurrent can support these.

Ripcurrent automatically causes SortCL to respond to inserted, updated, or deleted rows of data. This allows a set of equivalent targets (typically in a lower-level environment) to be synchronized with the source tables, optionally with consistent field-level transformations (such as masking) applied to the data. Ripcurrent is thus a seamless analog for processing changes to flat files in real time, and an alternative approach for incrementally moving and masking changed database data.

IRI Ripcurrent detects changes whenever rows are inserted, updated, or deleted in any table in a database, except for tables or schemas that have been filtered out via configuration properties passed to Ripcurrent. Other database change events, such as a change in the structure of a table, are also monitored and recorded, but only events that involve a change, addition, or deletion of data in a table cause IRI Ripcurrent to respond.

Why use IRI Ripcurrent?

The ability to replicate and/or mask data from database tables to other database tables or files has long been supported by IRI software, usually through IRI Workbench, the graphical user interface for all IRI (structured) data management products. The “Schema Data Class Job Wizard” in IRI Workbench – where data in a schema is searched and grouped into data classes – can be combined with masking rules. The wizard generates a batch file or shell script that performs the bulk masking operations defined in FieldShield scripts. However, if changes are made to the database, there is no easy way to replicate them without re-running that script. And if tables or columns have been added to the database, they are ignored unless the wizard is run again to generate a new script and the appropriate IRI FieldShield scripts. The need to stay in sync with such database changes motivated the development of IRI Ripcurrent: for those who want to keep downstream target(s) in sync with a source database, for example for testing purposes, Ripcurrent provides the functionality of a data-class database masking job, but dynamically.
Streaming database changes with Debezium: a prototype

In the first part of this blog post, we presented the technical possibilities for detecting database changes and showed how the tool Debezium can be used together with the Apache Kafka platform to stream such change events and make them available to other applications. Now a small prototype will be developed piece by piece to demonstrate how Debezium works.

The architecture is structured as follows: a local SQL Server instance hosts a database with a single table named CdcDemo, which contains a few records. In addition, one instance of Apache Kafka and one instance of Kafka Connect are set up. In the Kafka instance, a topic for the change events will be created later, while Kafka Connect will run the SQL Server connector from Debezium. Finally, the change data from the topic will be read by two applications and displayed in simplified form on the console. The two consumers are intended to show that Debezium’s messages can also be processed in parallel.

Figure 1: Architecture of the prototype
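Roughly sketched, the data in this setup flows as follows:

  SQL Server (table CdcDemo)
        |  transaction log
        v
  Kafka Connect (Debezium SQL Server connector)
        |  change events
        v
  Apache Kafka (topic for change events, 2 partitions)
        |                        |
        v                        v
  Consumer 1               Consumer 2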

Preparations in SQL Server

First of all, the foundation for demonstrating Change Data Capture must be laid, i.e. a small database must be created. To do this, a table is created in a database of a local SQL Server instance with the following command:

CREATE TABLE CdcDemo (
    Id INT PRIMARY KEY,
    Surname VARCHAR(50) NULL,
    Forename VARCHAR(50) NULL
)

Now some records can be inserted into the table. In this example, we use the names of famous authors:

Id   Surname   Forename
101  Lindgren  Astrid
102  King      Stephen
103  Kästner   Erich
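
These rows can be inserted with ordinary INSERT statements, for example:

INSERT INTO CdcDemo (Id, Surname, Forename) VALUES (101, 'Lindgren', 'Astrid');
INSERT INTO CdcDemo (Id, Surname, Forename) VALUES (102, 'King', 'Stephen');
INSERT INTO CdcDemo (Id, Surname, Forename) VALUES (103, 'Kästner', 'Erich');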

The next step is to make the preparations specific to the Debezium connector. In the case of SQL Server, this means that both the database and the table must be enabled for Change Data Capture. This is done by executing the following two system procedures:

EXEC sys.sp_cdc_enable_db

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'CdcDemo',
    @role_name = N'CdcRole'

In this example, dbo is the name of the schema that contains the CdcDemo table. Access to the change data is controlled via the CdcRole database role.
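
Whether CDC is actually enabled can be verified via the SQL Server catalog views; a quick sanity check (not strictly required for the prototype) looks like this:

SELECT name, is_cdc_enabled FROM sys.databases;
SELECT name, is_tracked_by_cdc FROM sys.tables;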

Set up Kafka and Debezium

Once the preparations in SQL Server have been completed successfully, the infrastructure required for Debezium can be set up. Before doing so, however, make sure that a current version of the Java runtime is installed. Now Apache Kafka can be downloaded from the official download page and unpacked into any folder; an installation is not necessary. After that, the SQL Server connector must be downloaded from Debezium and likewise unzipped into a folder. Once Apache Kafka and Debezium have been downloaded, a new configuration for the connector can be created as a Java properties file:

name=srcsys1-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=123.123.123.123
database.port=1433
database.user=cdc-demo-user
database.password=cdc-demo-password
database.dbname=cdc-demo-db
database.server.name=srcsys1
table.whitelist=dbo.CdcDemo
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.srcsys1

Especially important is the connector.class setting: it tells Kafka Connect which of the previously downloaded executable connectors to use. The class names of the Debezium connectors can be found in the respective documentation. Furthermore, database.server.name determines the logical name that Debezium uses for the database, which matters for the later naming of the topic in Kafka. The table.whitelist setting specifies all tables that the Debezium connector should monitor. An explanation of all other parameters can be found in the Debezium documentation.

Next, the configuration file of Kafka Connect, located in the config folder of the Kafka installation, must be adapted. Since only one instance is needed for this example, the file connect-standalone.properties is used. The default settings can largely be kept; only the plugin.path property must be set to the path of the downloaded Debezium connector. Important: this is not the path to the JAR files, but to the folder one level above them, since Kafka Connect can run multiple connectors located in that folder at the same time.

For Apache Kafka itself, one more small change in the server.properties configuration file makes sense. Since two consumers will ultimately process the Debezium messages, it also makes sense to increase the number of partitions for a topic to two. Change events are then written to either the first or the second partition, and the partitions are each assigned to one consumer, so that the messages are processed in parallel but not twice. To implement this, set the parameter num.partitions to 2.
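The two edits could look like this (the connector path below is only an example and depends on where the archive was unpacked):

# config/connect-standalone.properties
# Folder that contains the unpacked Debezium connector directory,
# not the connector's JAR files themselves (example path)
plugin.path=/opt/kafka/connectors

# config/server.properties
# Two partitions so that the two consumers can read in parallel
num.partitions=2

Now that the components involved have been configured, the instances can be started. The order in which they are started is important: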

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
$ ./bin/kafka-server-start.sh config/server.properties
$ ./bin/connect-standalone.sh config/connect-standalone.properties

First, ZooKeeper is started, which is responsible for coordinating the Kafka instances. Then a Kafka server is run, which registers itself with ZooKeeper. Finally, Kafka Connect is started together with the Debezium connector; the connector properties file created earlier is passed to connect-standalone.sh as an additional argument.
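
Once everything is running, a quick check (with a reasonably current Kafka version) shows whether Debezium has created its topics:

$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092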

Implementing a consumer

By now, the infrastructure for Debezium is complete; all that is missing is a consumer that processes the messages from Kafka. For this purpose, an example .NET Core console application is programmed using the Confluent.Kafka library, based on the library’s introductory example on GitHub. In addition, there is a method that displays the JSON messages read from Kafka in condensed form on the console.

using Confluent.Kafka;
using Newtonsoft.Json.Linq;
using System;
using System.Threading;

namespace StreamKafka
{
    class Program
    {
        static void Main(string[] args)
        {
            var config = new ConsumerConfig
            {
                GroupId = "streamer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            // The message key is ignored; the value is the Debezium JSON event.
            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe("srcsys1.dbo.CdcDemo");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cts.Token);
                            // Tombstone messages have a null value and are skipped.
                            if (consumeResult.Message.Value != null)
                                Console.WriteLine($"[{consumeResult.TopicPartitionOffset}] " +
                                                  ProcessMessage(consumeResult.Message.Value));
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    consumer.Close();
                }
            }
        }

        // Reduces a Debezium change event to a one-line summary of the operation.
        static string ProcessMessage(string jsonString)
        {
            var jsonObject = JObject.Parse(jsonString);
            var payload = jsonObject["payload"];

            string returnString = "";
            char operation = payload["op"].ToString()[0];

            switch (operation)
            {
                case 'c': // create/insert: only the "after" state exists
                    returnString += "INSERT: ";
                    returnString += $"{payload["after"]["Id"]} | {payload["after"]["Surname"]} | {payload["after"]["Forename"]}";
                    break;
                case 'd': // delete: only the "before" state exists
                    returnString += "DELETE: ";
                    returnString += $"{payload["before"]["Id"]} | {payload["before"]["Surname"]} | {payload["before"]["Forename"]}";
                    break;
                case 'u': // update: show the transition from "before" to "after"
                    returnString += "UPDATE: ";
                    returnString += $"{payload["before"]["Id"]} | {payload["before"]["Surname"]} | {payload["before"]["Forename"]} --> " +
                                    $"{payload["after"]["Id"]} | {payload["after"]["Surname"]} | {payload["after"]["Forename"]}";
                    break;
                default: // 'r' (read) events from the initial snapshot
                    returnString += $"{payload["after"]["Id"]} | {payload["after"]["Surname"]} | {payload["after"]["Forename"]}";
                    break;
            }

            return returnString;
        }
    }
}

There are a couple of interesting places in the source code. First, a configuration is created for the consumer. It contains a GroupId, represented by a string. The group is used to divide the work among the consumers, since applications in the same group do not process messages twice. Next, the consumer subscribes to the topic srcsys1.dbo.CdcDemo, which Debezium automatically set up in Kafka beforehand; the topic name is derived from the server and table parameters specified in the Debezium configuration. The consumer then enters an infinite loop in which messages are read, processed and output.
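For reference, the payload of a Debezium update event for this table looks roughly like the following (heavily shortened; the exact envelope depends on the connector version and configuration, and the values here are illustrative):

{
  "payload": {
    "before": { "Id": 102, "Surname": "King", "Forename": "Stephen" },
    "after":  { "Id": 102, "Surname": "King", "Forename": "Stephen Edwin" },
    "source": { "connector": "sqlserver", "db": "cdc-demo-db", "table": "CdcDemo" },
    "op": "u",
    "ts_ms": 1628157642000
  }
}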

Testing the prototype

Now all the necessary components for this prototype are installed, configured and implemented, and it is time to try it out. To do this, it is advisable to first start two instances of the implemented consumer and only then run Kafka and Debezium as described above. Once all components are up, the Debezium connector takes a snapshot of the database table and writes the corresponding messages to Kafka, where the two consumers are already waiting. They should produce output similar to the following figure.

Figure 2: The consumers output the snapshot of the database table.

Briefly on the meaning of the output: the information in the square brackets before the actual record indicates the topic, the partition number and the offset of the respective message. It can be seen that each consumer only handles the messages of one partition. Which partition a record is assigned to is decided by a hash-and-modulo calculation on the message key, which Debezium sets to the record’s primary key.

Now you can test how Debezium reacts to changes to the table. Using SQL Server Management Studio, INSERT, UPDATE and DELETE commands can be executed on the database; just a short time after a statement is issued, the consumers should respond and produce corresponding output. After a few DML commands (examples are shown below), the output might look like the following:

Figure 3: Consumer console output after some changes to the table.

One question should be clarified at the end: can race conditions occur due to the partitioning of the messages? In other words, could changes to the same record “overtake” each other across the two partitions and thus be processed in the wrong order? The answer is no; Debezium has already thought of this. Since the change events are assigned to partitions based on their primary key, as described above, change data concerning the same record always ends up in the same partition, one after the other, and is processed there by a single consumer in the correct order.
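
For example, DML statements like the following (values purely illustrative) could be used to trigger the change events shown in Figure 3:

INSERT INTO CdcDemo (Id, Surname, Forename) VALUES (104, 'Rowling', 'Joanne');
UPDATE CdcDemo SET Forename = 'Stephen Edwin' WHERE Id = 102;
DELETE FROM CdcDemo WHERE Id = 104;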

Conclusion

The example has shown that using Debezium together with Apache Kafka to stream database changes is quite easy to implement. Insert, modify, and delete commands can be processed with it in near real-time. In addition to the examples shown in this prototype, there is also the ability to stream changes to the data schema. For this, Debezium creates a separate topic in Kafka. It is important to note that the prototype presented is only a minimal example. For the productive use of Debezium, it is necessary to scale the corresponding components in such a way that a certain degree of fault tolerance and fail-safety is guaranteed.

This article was written by:

Richard Mogwitz

Richard Mogwitz is studying Applied Computer Science at HTW Dresden and joined ZEISS Digital Innovation as a working student in 2019. He is mainly involved in .NET application development, but also in web application programming with Blazor and Angular.

Definition: What is Debezium?

Debezium is an open source framework for Change Data Capture (CDC). It is a distributed platform consisting of various components and services that captures changes in databases and makes them available to other applications in the form of event streams. Usually Debezium is used together with Apache Kafka and Kafka Connect, but standalone use is also possible. Using the events provided, other applications can react to changes in the data of a database, for example by updating indexes or keeping parallel data sets current.

The connection to the databases and their continuous monitoring is realized via connectors, which are available for many different database management systems. Debezium is fault-tolerant, fast and hardly affects the performance of the monitored database. MySQL, MongoDB, PostgreSQL, Oracle, SQL Server, Cassandra, Db2 and Vitess are currently supported. Debezium is licensed under the Apache 2.0 license; the name and logo are trademarks of Red Hat. Current versions of the software (as of August 2021) are Debezium 1.5 (Stable), 1.6 (Latest Stable) and 1.7 (Development).

General information about Change Data Capture (CDC)

The term Change Data Capture, abbreviated CDC, describes concepts and methods for continuously monitoring data sources, capturing changes in data, and preparing and providing this information so that it can be used by other services and applications. Data changes can be deletions, new entries or data updates. Typical application areas are databases, Big Data and data warehouses. CDC provides the captured changes in the form of asynchronous events. The events provide information about the previous and current state of the changed record, as well as metadata such as timestamps. Debezium is a CDC framework that provides the required software for Change Data Capture.

Architecture and Functionality of Debezium

The architecture concept of Debezium is fault-tolerant and based on a distributed system of different services and components. Monitoring processes, connectors and event streaming can be distributed across different systems. Replication and recording mechanisms ensure that events are not lost if individual systems have problems. Debezium works with so-called log-based Change Data Capture, which uses the transaction logs of the databases as the source of information for data changes.

Usually, Debezium is used together with the distributed streaming platform Apache Kafka and Kafka Connect. In the Debezium framework, Kafka performs the task of processing streams in real time, organizing data exchange and performing replication. Debezium’s source connectors deliver the detected database changes to Kafka; each connector is responsible for monitoring one database server. Downstream, other applications (consumers) read the Kafka events and process them in the desired form. A Debezium system consists of a cluster of multiple Kafka brokers, and the Debezium connectors are associated with clusters of Kafka Connect services. The number of Kafka brokers and clusters depends on the volume of events to be processed, the number of database tables to be monitored, and the number of consumers.

The process of capturing a database change and streaming an event happens in several steps. When data changes are written to a database, the database’s transaction log file is updated. Debezium monitors this log and immediately captures any changes. In the next step, Debezium creates a Kafka record with the detected data change and publishes it to Kafka, where it is available to consumers.

Debezium does not have to be used together with Kafka. It is possible to embed Debezium connectors into applications standalone; in this way, applications get direct monitoring access to a database and can react to data changes without Kafka streaming. The disadvantage of the standalone method is that the system is less fault-tolerant and features and scalability are limited.

Possible applications of Debezium

Change Data Capture using the Debezium framework has numerous potential applications. It can be used wherever an application needs to respond directly to a change in a database’s dataset. The events provided can be used for various purposes. For example, caches, indexes or parallel datasets can be updated. Typical use cases are:

  • Realization of a data store consistent with the source database
  • Creation of local data copies
  • Data synchronization
  • Updating the cache
  • Propagating data between microservices
  • Building a full-text search index
  • Parallel writing of data to multiple databases
  • Database sharing
  • Data aggregation in a data warehouse
  • Seamless migration of databases and datasets
  • Streaming database updates to multiple different applications without giving them direct access to the databases

Interview: Change Data Capture with Debezium

There are many interesting people in the Java community who are driving development forward with their involvement in Java Specification Requests (JSRs) and open source projects. I would like to introduce some of them here one by one and talk to them about their projects. This time I talked to Gunnar Morling about Change Data Capture and the open source project Debezium.

Thorben Janssen: Thank you very much for being available for an interview again. Some readers may have missed the earlier interview with you about the Bean Validation 2.0 specification. Can you please introduce yourself again briefly?

Gunnar Morling: I am an open source software developer at Red Hat. Initially, I worked as part of the Hibernate team on projects such as Hibernate Validator and Hibernate Search, and led the development of Bean Validation 2.0 (JSR 380). As part of the Java EE platform, the Bean Validation specification is currently being transferred to Jakarta EE and will be developed further under the umbrella of the Eclipse Foundation. For more than two years now I have been working on Debezium, an open source platform for Change Data Capture. Some readers may also know me from conferences like JavaLand or from my work on other projects such as MapStruct and Deptective.

Janssen: Since microservices have become increasingly popular, there has also been more talk about patterns for exchanging data between different data sources. One of these is Change Data Capture (CDC). What exactly is it all about, and what is the basic idea behind it?

Morling: The idea of Change Data Capture is quickly explained: whenever something changes in a database – for example, a customer is created, an order is updated or a delivery is deleted – this change is captured and distributed as an event to interested consumers. These events describe the old and new state of the affected record, as well as metadata such as the timestamp of the change, the transaction ID, and more. In the interest of loose coupling, the change events are typically transmitted asynchronously to the consumers via message brokers such as Apache Kafka.

Janssen: And what does an ideal use case for CDC look like?

Morling: The use cases for CDC are extremely diverse. On the one hand, you can react directly to the events and, for example, invalidate corresponding entries in a cache. Streaming queries also fall into this category: here, predefined queries (e.g. “What is the accumulated order value of the furniture product category within the last 60 minutes?”) are automatically re-evaluated whenever a relevant change event occurs, such as “purchase order created”. The continuously updated results of such a streaming query can then be used, for example, to update a dashboard live or to implement alerting when the inventory of a product falls below a certain threshold. On the other hand, change event data can also be used to update other data stores and keep them consistent with the source database. This could be, for example, a full-text search index in Elasticsearch, a data warehouse or other databases for analysis purposes. Audit logs or optimized read models in a CQRS architecture (Command Query Responsibility Segregation) can also be generated via CDC. In the context of microservices, Change Data Capture can be used to propagate data changes between different services. Services can then, for example, keep a copy of other services’ data in their own local database, avoiding synchronous calls between services.
Janssen: With the Debezium project, you provide an implementation of the CDC pattern. How does that work exactly, and with which data sources can I use it?

Morling: Debezium implements what is called log-based CDC, which means that the source of the change events are the transaction logs of the database. This offers great advantages over alternative approaches such as polling (i.e., repeatedly running queries to identify new or changed records):

  • All changes are guaranteed to be captured, including short successive updates to the same record as well as deletions.
  • Change events are identified from the log in a very efficient way without having a negative performance impact on other queries.
  • No changes to the data schema, such as a “Last Updated” column, are required.
  • Depending on the configuration and database, the previous state of a changed record can be captured as well as metadata such as a change timestamp, the originating query, etc.

Debezium is based on Kafka Connect, a framework for implementing and executing Kafka connectors, which provides, among other things, connector management (configuration, start, stop, etc.), monitoring and high availability via clustering of Kafka Connect instances. Fault tolerance is also taken into account: if, for example, a connector crashes or simply has to be restarted for an update, no change events are lost; the log is simply read on from the last position processed. In the context of Kafka Connect, there are a number of so-called sink connectors that can be used with Debezium to transfer data changes to other databases, Elasticsearch, Hadoop clusters, and so on. In addition to deployment via Kafka Connect, Debezium can also be used as a library in any Java application, which users employ, for example, to stream change events to Amazon Kinesis. Debezium supports various databases such as MySQL, PostgreSQL, SQL Server and MongoDB. The community is also currently working on a connector for Apache Cassandra.

Janssen: What do I need to do if I want to use Debezium?

Morling: A good starting point is the Debezium tutorial, which provides the necessary components (a database such as MySQL, Apache Kafka, Kafka Connect, Debezium) as container images and walks through the steps from configuring a connector to displaying change events in Kafka. For production use based on Kubernetes/OpenShift, the Strimzi project is recommended, which provides a Kubernetes operator for running Kafka and Kafka Connect. Commercial support for this is available from Red Hat if required; this also applies to Debezium itself (currently available as a Developer Preview).

Janssen: Currently, Debezium is available in version 0.9. Is it already suitable for production use? What is still missing for version 1.0?

Morling: The community is already using the various Debezium connectors in production on a large scale; users report deployments with hundreds of databases in some cases. As a development team, we are therefore very keen to ensure compatibility with previous versions in new releases and to enable upgrades that are as smooth as possible. Currently we are working on version 0.10, which among other things includes some unification of the event formats of the various connectors and a first version of the Cassandra connector. Debezium 1.0 should follow next; here the focus will mainly be on unifying the configuration options of the connectors and extending the automated test suite to further improve the stability of the connectors. Some keywords for possible future enhancements are integration with the CloudEvents standard, integration with additional message brokers, support for creating denormalized data views, and more. We are very much guided by the needs of the community and are constantly adapting the roadmap.

Janssen: Where can I learn more about the project?

Morling: The central point of contact is our website. There, the connectors can be downloaded, and you will find the reference documentation and the tutorial mentioned above for a quick start. In the blog, we provide information about new releases and advanced topics such as demos of specific CDC use cases. The community is available to answer questions via a mailing list and chat room, and of course you can find @debezium on Twitter.

Janssen: And where can people find you?

Morling: I can be reached through those channels and also on Twitter at @gunnarmorling.
Janssen: Thank you for the interview and continued success with Debezium.




