Data replication with Debezium

SPOUD
Sep 6, 2024 · 8 min read


Debezium is a reliable tool for generating Change Data Capture (CDC) events for the most popular databases, such as Postgres, MySQL, Oracle and MongoDB. The extensive documentation and the numerous blog posts by the Debezium team provide thorough information about its functionality and its various applications. There are plenty of step-by-step tutorials on the internet for setting up a CDC system. Putting together a simple CDC pipeline is no sorcery. However, the devil is in the details, as usual. As a developer, you will eventually stumble upon particular problems where internet resources can only help you to a limited degree.

SPOUD supports customers in building CDC pipelines. One such case was fascinating: we were asked to help a customer set up a data replication system between two databases, with Kafka as the data channel. The system we had to design was subject to strict constraints that forced us to deal with non-trivial problems.

This blog post is the first of a series. In each, we’ll examine a specific problem that arose during design and development. We assume that the reader already knows about Kafka Connect and Debezium.

System Overview

The system we had to design was supposed to replicate data from an Oracle database to a Postgres database, with Kafka as the transport channel in between. We decided to rely on Debezium running in a Kafka Connect cluster to connect Oracle with Kafka, because we needed all data changes, transaction information and schema changes.
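For illustration, a minimal Oracle source connector configuration for this setup could look roughly like the following sketch. Hostnames, credentials, schema and table names are placeholders, and exact property names differ slightly between Debezium versions (this follows the 2.x naming):

{
...
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "oracle-host",
"database.port": "1521",
"database.user": "dbz-user",
"database.password": "********",
"database.dbname": "ORCLCDB",
"topic.prefix": "dev.application",
"table.include.list": "my-schema.accounts",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "dev.application.schema-history",
...
}

The internal schema history topic is where the connector keeps track of the evolving table schemas over time.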

On the other end of the pipeline, connecting Kafka with Postgres was a more challenging decision. The project’s requirements prevented us from using a simple JDBC sink connector. One reason was that transaction boundaries had to be respected in the target database as well. We ended up implementing a custom application that applies each transaction as a whole to Postgres.

Figure: System overview

Data integrity with transactional changes

Changes to a database’s data are wrapped in transactions that are eventually committed or rolled back. Only once a transaction is committed do its changes become visible to all clients. The intermediate state of an open transaction must only be visible to the client performing the changes.

Say we are managing a database in a banking environment. A table in our database stores the current balance of each bank account. Frank wants to transfer money from his account to John’s. This transaction involves two updates: one that deducts the transfer amount from Frank’s account and one that adds it to John’s account. The state between these two operations violates data integrity because the total amount of money the bank manages no longer equals the amount before the transaction. This intermediate state is temporary and must not be visible to the public.

How do you design a replication system with Kafka if you have to respect data integrity in the target database at every point? A simple setup with Debezium cannot fulfil these requirements because of the following issues:

  1. If you distribute the change events across multiple topics (the default in Debezium) or even a single topic with multiple partitions, the consumer cannot guarantee that they will be processed in the same order as they were produced. Therefore, without certain precautions, the target database might end up in a state that never existed in the source database.
  2. If all change events are produced to a single topic with only one partition (thus guaranteeing total order), the target database might still violate data integrity because there are no transaction boundaries, which makes intermediate states publicly visible.

Data integrity cannot be guaranteed without specific measures on our part. Several approaches can be taken to tackle these problems.

Solution 1: outbox pattern

One approach to dealing with these problems is a design pattern called the outbox pattern. The idea of this pattern is to have a table containing high-level events that describe the data changes. In the money transfer example above, the client would not just update both bank accounts but also insert a MoneyTransfer event into the outbox table, containing information about the source account, the target account and the transfer amount. The source connector then captures changes only from this table. Every message includes the information required to perform the database changes: everything needed to recreate the transaction is compacted into one row. Thus, the transaction boundary is the message itself.
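Debezium even ships a ready-made SMT for this pattern, the outbox event router. As a rough sketch, the source connector could be restricted to a hypothetical my-schema.outbox table and routed through that SMT like this (the table name and column names such as aggregatetype, aggregateid and payload are assumptions based on the table layout suggested in the Debezium outbox documentation; the option values shown are in fact the defaults):

{
...
"table.include.list": "my-schema.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.table.field.event.key": "aggregateid",
"transforms.outbox.table.field.event.payload": "payload",
...
}

The router then emits one self-contained message per outbox row, typically to a topic derived from the aggregate type.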

Though it solves the described problems, it also has some implications. You’d have to create a separate outbox table and add additional application logic to fill it. Depending on the circumstances, this might be impossible. Say a separate team is responsible for the database, and your team is only a data consumer. You will likely not have the authority to create a table or change the application logic, nor will you receive it.

If you build data replication with an outbox pattern, you must implement application logic twice. In the money transfer example above, an application must execute the two updates in the source database and add a corresponding row to the outbox table. To replicate the same state in the target database, another application has to read the outbox event, translate it into the same update statements and apply it to the target database. The same applies to every type of event.

You must be wary of the synchronization between producers and consumers of the outbox events. They need to have a strict contract about the messages they exchange. Event types and their payload need to be designed carefully so that the producer knows what events need to be published and the consumer knows what events it can expect to arrive. Introducing new events requires changes in both producers and consumers.
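As an illustration of such a contract, a hypothetical MoneyTransfer payload for the example above might look like this (all field names and values are made up for this example):

{
"eventType": "MoneyTransfer",
"sourceAccount": "frank-account-id",
"targetAccount": "john-account-id",
"amount": 100.00,
"currency": "CHF"
}

The consumer has to translate this single message back into the two balance updates on the target database, which is exactly the duplicated application logic described above.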

Solution 2: separate transaction events

Debezium offers a configuration property, provide.transaction.metadata, which is false by default. When set to true, Debezium writes transaction boundaries, a BEGIN and an END message per transaction, into a separate topic. For detailed information about the transaction metadata, have a look at the Debezium documentation.
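Enabling it is a small addition to the connector configuration:

{
...
"provide.transaction.metadata": "true",
...
}

A transaction END event then looks roughly like the following (the structure follows the Debezium documentation, but exact fields and the format of the transaction id vary between versions and connectors; the values here are made up):

{
"status": "END",
"id": "571:53195832",
"event_count": 2,
"data_collections": [
{ "data_collection": "my-schema.accounts", "event_count": 2 }
]
}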

In the end, you get one topic containing the transaction data and a topic for each table containing data change events. To achieve data integrity in the target database, the messages in those topics must be merged so that each transaction can be written as a single entity. The transaction messages dictate the system’s timing; the data change events are only attached to the transaction.

A component that merges transaction and data change events can be implemented in various ways, be it as a Kafka Streams application or as an application that reads from the topics and writes to the target database directly. What they all have in common is the need to merge the messages into transaction packages. The more topics they must consider, the more complex and time-consuming the logic to synchronize the events becomes.

Solution 3: mixing transaction and data change events

This approach follows the same idea as solution 2 but tries to be smarter about the distribution of data change events. Debezium internally uses a transaction buffer. There are four database events Debezium reacts to with respect to this buffer:

  1. Start of a new transaction: Debezium opens a new entry for it in its transaction buffer.
  2. Data change (any DML statement): Debezium looks for the data change’s transaction in its buffer and appends the message.
  3. Transaction commit: Debezium looks for the transaction in its buffer and emits a begin message, a message for each data change and an end message.
  4. Transaction rollback: Debezium removes the transaction’s entry from the buffer and drops its accumulated data changes.

Whenever Debezium receives a transaction commit, it subsequently emits all the messages of that transaction. If Debezium already knows the order of events within a transaction, why do we bother distributing them across multiple topics only to merge them back together later on? Why not use the order that Debezium constructs for us?

To achieve that, we start by configuring Debezium to emit transaction and data change events to the same topic. The name of the transaction topic is defined as ${topic.prefix}.${topic.transaction}. By defining

{
...
"topic.prefix": "dev.application",
"topic.transaction":"CDC",
...
}

the transaction events will be produced to the topic dev.application.cdc.

Next, we want to redirect the data change events to the same topic. By default, Debezium emits the changes of each table to a table-specific topic named ${topic.prefix}.${db-schema}.${table}. Debezium has no built-in way to change those names. Instead, we can make use of Single Message Transforms (SMTs).

{
...
"transforms":"reroute",
"transforms.reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.reroute.topic.regex":"(.*)\\.my-schema\\.(.*)",
"transforms.reroute.topic.replacement":"$1.cdc",
...
}

With this SMT, each message whose topic name contains the schema name will be rerouted to dev.application.cdc. In the end, all messages are produced to the same topic. However, messages from the same transaction can still be distributed across multiple partitions.

One approach to solving this is to choose a keying strategy so that transaction and data change events belonging to the same transaction receive the same key and thus end up in the same partition. Out of the box, Debezium offers no way to change the message key to the transaction ID. Here, we have to rely on SMTs like ValueToKey to adjust the key before a message is sent to Kafka.
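A rough sketch of such a keying chain is shown below. It is meant to run before the reroute transform from the previous snippet (i.e. listed before it in the transforms property) so that it can still be limited to the table topics via a topic-name predicate. The transform and predicate names are our own; ValueToKey, ExtractField and TopicNameMatches are standard Kafka Connect classes. The chain copies the transaction block that Debezium adds to every change event (when transaction metadata is enabled) into the key and then unwraps it down to the transaction id:

{
...
"transforms": "txToKey,unwrapTx,extractTxId,reroute",
"transforms.txToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.txToKey.fields": "transaction",
"transforms.txToKey.predicate": "isTableTopic",
"transforms.unwrapTx.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.unwrapTx.field": "transaction",
"transforms.unwrapTx.predicate": "isTableTopic",
"transforms.extractTxId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractTxId.field": "id",
"transforms.extractTxId.predicate": "isTableTopic",
"predicates": "isTableTopic",
"predicates.isTableTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isTableTopic.pattern": "(.*)\\.my-schema\\.(.*)",
...
}

Two caveats apply to this sketch: Debezium’s delete tombstones have a null value and would have to be excluded (or disabled via tombstones.on.delete), and the transaction BEGIN/END events carry their own key, so making both kinds of messages land in the same partition also requires the keys to end up byte-identical, which may call for re-keying the transaction events as well or for a custom partitioner.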

Another approach to the distributed-message problem is to assign only one partition to the topic. This has the advantage that the system can even guarantee the order across transactions. But at this point, all your Kafka engineering alarm bells should be ringing! Topics with a single partition are dangerous and should be avoided if possible. Why?

  • The one partition automatically becomes a hot partition: all traffic, reading and writing, goes to the same broker. High traffic can drive that broker to its resource limits for CPU, memory, network or disk, and the broker can become your application’s performance bottleneck. Hot partitions can also cause the so-called noisy neighbour problem: a broker is a machine shared among multiple applications or tenants. If a hot partition occupies a significant share of the broker’s resources, the partitions of other applications have fewer resources available for their workload. An application may therefore see a performance loss even though the bottleneck is caused by another application, possibly belonging to another tenant.
  • The consumer of a single-partition topic cannot be scaled horizontally, since at most one consumer per consumer group can read from a partition.

There are cases where topics with a single partition might be necessary. Just be aware of the implications!

Conclusion

Keeping data integrity intact across multiple systems is challenging, especially when a distributed system like Kafka is involved. We have to make certain adjustments to achieve this.

We showed three different approaches to dealing with data integrity while reading changes from a database. Each has its advantages and disadvantages. Choosing the one that matches your constraints takes careful consideration. We recommend exploring the solutions in the order they are presented in this post and carefully examining whether they can be applied to your situation.

Coming up

Once Debezium is running, it needs to be properly monitored. In the next blog post, we will explore monitoring to gain insight into your system’s stability, performance, and potential bottlenecks.
