Self-Service CDC with Kafka Connect, Crossplane, and Strimzi

SPOUD
13 min read · Feb 14, 2024

At SPOUD, we offer Data-Streaming related services and consulting to our clients. Oftentimes, we are invited to support companies that are looking to start using Apache Kafka. We find that different companies and often even different teams within said companies need help with setting up Kafka for very similar use-cases. Doing this from scratch for each team at a customer does not scale. Teams will slow down and spend hours or potentially even days implementing use-cases that have already been implemented by other teams.

One such use-case is the setup of Change Data Capture (CDC) with Kafka Connect. CDC is a technique that triggers an event whenever a change is made to a database table. For example, if an application user changes their address in the database, an event will be published to Kafka, and all services that are interested in this change can subscribe to the corresponding Kafka topic and be notified about it. With this approach, interested parties do not need to poll the database for changes, add triggers, or implement some custom notification logic between individual services, which would introduce tight coupling between them.

Typically, CDC is not something that is set up once for the entire organization, but rather something that different teams will want to set up independently of each other, as the degree of Kafka adoption can vary greatly between teams. We believe that the approach of setting this up from scratch for each team is not ideal. In the rest of this article, we will explore how Crossplane can be used to tackle this problem in a way that is more in line with the idea of platform engineering.

Platform Engineering

Platform engineering is the modern approach to dealing with the problem of teams needing to set up identical infrastructure for their applications. Instead of requesting said infrastructure from the team responsible for it (e.g. your Kafka team), the responsible team provides some kind of self-service tool that product teams can use to spin up the required infrastructure themselves. A good self-service tool offers a variety of benefits:

  1. Less bureaucracy, shorter wait times: Developer time is typically a very expensive resource. Infrastructure often is not. It is a real issue if a developer is blocked because somebody with the permissions to create a topic or a user hasn’t done it yet. If developers are able to provision the Kafka topics, users, Connect clusters etc. themselves instead of filing support tickets and waiting for someone to take care of them, their idle times are minimized.
  2. Less know-how needed to deliver an adequate solution: Even though some tools in the Kafka ecosystem are marketed as real time-savers (which they ultimately are), sending someone unfamiliar with these tools to set them up is bound to cause a lot of frustration. If good knowledge of Kafka and the surrounding technologies is a precondition to Kafka’s adoption within the organization, then said adoption will progress at a snail’s pace, as people will first need to be trained. A self-service tool, on the other hand, can provide opinionated abstractions. These abstractions hide away the complexity of the underlying infrastructure and make it easy for developers to get a working solution up and running.
  3. Compliance with company policies: Companies often have policies in place that govern how infrastructure should be set up. These may come in the form of specific naming conventions that make it easy to see which team a resource belongs to. These may also be security-related constraints. We may want to restrict the ability to create infrastructure to specific users. A self-service tool is able to perform additional validation to verify that these constraints are met.

Crossplane

We have recently started experimenting with Crossplane. Crossplane is a Kubernetes framework that allows teams to create such self-service tools. At a high level, Crossplane is a tool that allows infrastructure to be defined in the form of Kubernetes custom resources. Providers from the Crossplane ecosystem are able to detect these custom resources and then provision the infrastructure that they describe. For example, Crossplane has official providers for the major cloud platforms (AWS, Azure, GCP). This makes it possible to define, say, S3 buckets or Lambda functions using Kubernetes custom resources.

On top of that, Crossplane introduces the concept of composite resource definitions (XRDs). A composite resource, as the name suggests, is a resource that is made up of multiple underlying resources. Compositions are handy because oftentimes multiple pieces of infrastructure must be provisioned in order for them to be usable. For example, if we want to deploy a serverless function to Google Cloud, we need to define a Cloud Function. But a Cloud Function alone is not able to do much. We also need to define a service account that the function will authenticate as. We must also define (or reference an existing) Artifact Registry repository where the container image of the function will be stored. Instead of defining these resources individually, the platform team could provide an XRD called ServerlessFunction, which automatically provisions everything that is needed to run a Cloud Function.

Another popular tool for provisioning infrastructure is Terraform. Readers familiar with Terraform may wonder how the two tools compare and why one would consider using Crossplane over Terraform. After all, Terraform is also able to support the above use-case with its concept of modules. This is a topic that deserves its own article. The team behind Crossplane has already written a comprehensive comparison of the two tools.

In the next section, we will tackle the challenge of creating an XRD that configures Change Data Capture (CDC) between a MySQL database and an Apache Kafka Cluster.

Change Data Capture

To set up MySQL CDC with Strimzi, we need to configure three Strimzi custom resources: a KafkaUser, a KafkaConnect cluster, and a KafkaConnector. The provisioned KafkaUser must have its ACLs configured in such a manner that it is able to write to all the required topics (the status, offsets, and configs topics for Kafka Connect as well as the topics that will hold the actual change data). The container image for our Connect cluster needs to include the binaries of the relevant connector (in our case, the Debezium MySQL source connector). Finally, the connector needs to be provided with some connector-specific configuration.
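To give an impression of what this involves, here is a rough sketch of a hand-written KafkaConnect resource that covers the requirements above. It is an illustration only and not the manifest used later in this article: the resource name, the bootstrap address, the container registry, the push secret, and the Debezium version in the download URL are all assumptions.

```yaml
# Hypothetical hand-written KafkaConnect resource; every name below is an assumption.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  namespace: kafka
  annotations:
    # lets Strimzi manage connectors through KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  authentication:
    type: scram-sha-512
    username: my-connect-user       # name of the KafkaUser
    passwordSecret:
      secretName: my-connect-user   # secret that Strimzi generates for the KafkaUser
      password: password            # key inside that secret
  config:
    # internal topics and group; the KafkaUser's ACLs must cover these names
    group.id: my-connect
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
  build:
    # Strimzi builds a Connect image that contains the listed plugins
    output:
      type: docker
      image: registry.example.com/my-team/my-connect:latest
      pushSecret: registry-credentials
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.1.Final/debezium-connector-mysql-2.4.1.Final-plugin.tar.gz
```

A matching KafkaUser and KafkaConnector would still have to be written on top of this, and all three resources must agree on names and topic prefixes.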

Setting up all of the above requires a solid understanding of Kafka Connect and could take hours to get right. However, at its core, what we are trying to achieve here is quite simple. We have a database that we want to read from, and we have a Kafka cluster (managed by the platform team) that we want to write to. The platform team can abstract away all the details required for talking to Kafka. All the product team must provide is a way for the CDC connector to talk to their database. So ideally, instead of setting up 3 low-level Strimzi custom resources, we want to set up only a very small and simple one:

apiVersion: spoud.io/v1alpha1
kind: MySqlCDC
metadata:
  name: users
spec:
  # database server connection details
  databaseHostname: mysql.tt-27-project.svc.cluster.local
  databasePort: 3306
  databaseUser: mysql-user
  databasePassword: XXXXXXXX
  # name of the database to watch
  databaseName: users

In response to this resource, we want Crossplane to take care of provisioning the following resources:

  • A KafkaUser called mysql-cdc-connect-XXX where XXX is the name of the MySqlCDC resource. The user must have all ACL permissions to topics prefixed with mysql-cdc-connect-XXX, as well as Read permissions to all consumer groups with the same prefix.
  • A KafkaConnect cluster called connect-cluster-XXX where XXX is the name of the MySqlCDC resource. It will pick up the secret of the generated KafkaUser and use it to authenticate with our Kafka cluster. Furthermore, the Connect cluster must be configured to store its offsets, configs, and status in topics prefixed with mysql-cdc-connect-XXX, so that the KafkaUser can access them. Finally, we will need to configure the Connect cluster to include the Debezium MySQL source connector.
  • A KafkaConnector called mysql-cdc-connect-XXX-connector where XXX is the name of the MySqlCDC resource. The connector configuration is where most of the options from the above YAML file will end up. In addition to that, we must also inject the credentials of our KafkaUser into the configuration.

Crossplane Composite Resource Definition (XRD)

The first step towards implementing the above is to create a Crossplane Composite Resource Definition (XRD). The XRD will define the schema of our custom MySqlCDC resource. In this schema, we describe which fields the resource will have, what their types are, and which of them are required.

apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: mysqlcdcs.spoud.io
spec:
  group: spoud.io
  names:
    kind: MySqlCDC
    plural: mysqlcdcs
  versions:
    - name: v1alpha1
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                databaseHostname:
                  type: string
                databasePort:
                  type: integer
                  default: 3306
                databaseUser:
                  type: string
                databasePassword:
                  type: string
                databaseName:
                  type: string
              required:
                - databaseHostname
                - databaseUser
                - databasePassword
                - databaseName
      served: true
      referenceable: true
  claimNames:
    kind: MySqlCDCClaim
    plural: mysqlcdcclaims

If we were to apply the above XRD to our cluster, we would be able to create MySqlCDC resources. However, these resources would not do anything yet. We must also define a composition that describes how the MySqlCDC resource should be translated into the underlying infrastructure.

Note: The claimNames section allows us to create claims of type MySqlCDCClaim. In essence, a claim is a handle that points to a MySqlCDC resource. Claims are namespace-scoped and by using them, we can avoid name collisions. For example, if we create a MySqlCDCClaim called users then Crossplane will provision a MySqlCDC resource called users-XXX where XXX is a random suffix (e.g. users-vww9t). If another team creates a claim with the same name in their own namespace, then their underlying MySqlCDC resource will have a different suffix.
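As a small illustration, a claim created by a product team could look like the sketch below. The namespace team-a is an assumption made for this example. The spec fields are the ones defined in the XRD above.

```yaml
# Hypothetical claim; the namespace "team-a" is an assumption.
apiVersion: spoud.io/v1alpha1
kind: MySqlCDCClaim
metadata:
  name: users
  namespace: team-a
spec:
  databaseHostname: mysql.tt-27-project.svc.cluster.local
  databasePort: 3306
  databaseUser: mysql-user
  databasePassword: XXXXXXXX
  databaseName: users
```

Another team could apply a claim with the same name in its own namespace without clashing with this one.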

Defining the Composition

You can view the full composition manifest on GitHub, but let’s take a look at some excerpts from it to get a better understanding of how it works. Our composition spec contains two sections: resources and patchSets. The resources section describes the infrastructure that we want to provision in response to a MySqlCDC resource. Here is the base definition of our KafkaUser resource:

- name: connect-user
  base:
    apiVersion: kubernetes.crossplane.io/v1alpha1
    kind: Object
    metadata:
      name: connect-user-xxx # will be patched
    spec:
      providerConfigRef:
        name: kubernetes-provider
      forProvider:
        manifest:
          apiVersion: kafka.strimzi.io/v1beta2
          kind: KafkaUser
          metadata:
            namespace: kafka
            name: connect-user-xxx # will be patched
            labels:
              # let's assume that the organization has a single kafka cluster
              # (otherwise we could also patch this)
              strimzi.io/cluster: poc-cluster
          spec:
            authentication:
              type: scram-sha-512
            authorization:
              type: simple
              acls:
                - resource:
                    type: topic
                    name: "patchme" # will be patched
                    patternType: prefix
                  operations:
                    - Describe
                    - Read
                    - Write
                    - Create
                    - Delete
                  host: "*"
                - resource:
                    type: group
                    name: "patchme" # will be patched
                    patternType: prefix
                  operations:
                    - Read
                  host: "*"

Notice how some of the fields are marked with # will be patched comments. These are the fields where we want to plug in values from the MySqlCDC resource. We will get to that in a bit. Before that, let's take a look at the following part of the definition:

apiVersion: kubernetes.crossplane.io/v1alpha1
kind: Object
metadata:
  name: connect-user-xxx # will be patched
spec:
  providerConfigRef:
    name: kubernetes-provider
  forProvider:
    manifest:
      ...

This excerpt tells Crossplane that we want to provision a Kubernetes object. Crossplane’s Kubernetes provider detects resources of kind Object and applies the attached manifest to the configured Kubernetes cluster. In the manifest section itself, we can see that we are defining a KafkaUser custom resource. Strimzi will respond to the creation of a KafkaUser resource by creating a new user in the Kafka cluster.
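The providerConfigRef in the excerpt points to a ProviderConfig named kubernetes-provider, which tells the Kubernetes provider which cluster to talk to and how to authenticate. The article does not show this resource, so the following is only a minimal sketch. It assumes that the provider manages the same cluster it runs in and uses its own service account.

```yaml
# Hypothetical ProviderConfig for Crossplane's Kubernetes provider.
# Assumption: the provider targets the cluster it runs in (InjectedIdentity).
apiVersion: kubernetes.crossplane.io/v1alpha1
kind: ProviderConfig
metadata:
  name: kubernetes-provider
spec:
  credentials:
    source: InjectedIdentity
```

With this setup, the provider's service account would additionally need RBAC permissions to manage the Strimzi resources in the kafka namespace.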

Underneath the base resource definition, we have a patches section that allows us to plug in values from the MySqlCDC resource into the KafkaUser manifest. We apply the following patches to our KafkaUser:

patches:
  - type: CombineFromComposite # 1
    toFieldPath: metadata.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s-user"
      variables:
        - fromFieldPath: metadata.name
  - type: PatchSet # 2
    patchSetName: name-patchset
  - type: CombineFromComposite # 3
    toFieldPath: spec.forProvider.manifest.spec.authorization.acls[0].resource.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s"
      variables:
        - fromFieldPath: metadata.name
  - type: CombineFromComposite # 4
    toFieldPath: spec.forProvider.manifest.spec.authorization.acls[1].resource.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s"
      variables:
        - fromFieldPath: metadata.name

Let’s go through these patches one by one:

  1. This patch is of the type CombineFromComposite, which means that it will take a value from the MySqlCDC composite resource and plug it into the base resource definition according to the specified strategy. In this case, we want to set the metadata.name field of the Object resource to mysql-cdc-connect-XXX-user where XXX is the name of our MySqlCDC resource.
  2. Apply the patch set called name-patchset. We defined this patch set earlier in the patchSets section. A patch set is a reusable collection of one or more patches. The name-patchset takes the metadata.name field from the MySqlCDC resource and plugs it into the name field of the relevant resource. Because we want to use this logic for multiple resources, we defined it as a patch set so we do not have to define the same patch from scratch for each resource. Here is the full patchSets section:

patchSets:
  - name: name-patchset
    patches:
      - type: CombineFromComposite
        toFieldPath: spec.forProvider.manifest.metadata.name
        combine:
          strategy: string
          string:
            fmt: "mysql-cdc-connect-%s"
          variables:
            - fromFieldPath: metadata.name

  3. This patch sets the name field of the first ACL rule to mysql-cdc-connect-XXX where XXX is the name of the MySqlCDC resource (the patch reads metadata.name). So if the resource is called users, then the generated ACL will allow the KafkaUser to perform all of the listed operations on topics prefixed with the string mysql-cdc-connect-users.
  4. Similar to the previous patch, this one generates the prefix for the second ACL rule, which will allow the KafkaUser to commit offsets for the relevant consumer groups.

The remaining resources in the composition follow the same pattern. We define Object resources, which apply manifests to our Kubernetes cluster, that provision KafkaConnect and KafkaConnector resources. Where necessary, we apply patches to plug in values from the MySqlCDC resource.
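To make this pattern more tangible, here is a sketch of how the composition as a whole could be laid out, using the KafkaConnector as the example resource. This is not the manifest from our repository. The composition name, the resource entry name, the database.server.id value, and the choice of Debezium options are assumptions, and the bracket notation used for config keys that contain dots should be checked against the Crossplane version in use.

```yaml
# Hypothetical sketch of the overall composition layout (connector resource only).
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: mysqlcdc-strimzi # assumed name
spec:
  compositeTypeRef: # ties this composition to the MySqlCDC XRD
    apiVersion: spoud.io/v1alpha1
    kind: MySqlCDC
  patchSets:
    - name: name-patchset
      patches:
        - type: CombineFromComposite
          toFieldPath: spec.forProvider.manifest.metadata.name
          combine:
            strategy: string
            string:
              fmt: "mysql-cdc-connect-%s"
            variables:
              - fromFieldPath: metadata.name
  resources:
    # the KafkaUser and KafkaConnect entries are omitted in this sketch
    - name: connector
      base:
        apiVersion: kubernetes.crossplane.io/v1alpha1
        kind: Object
        metadata:
          name: connector-xxx # will be patched
        spec:
          providerConfigRef:
            name: kubernetes-provider
          forProvider:
            manifest:
              apiVersion: kafka.strimzi.io/v1beta2
              kind: KafkaConnector
              metadata:
                namespace: kafka
                name: connector-xxx # will be patched
                labels:
                  strimzi.io/cluster: patchme # name of the KafkaConnect cluster
              spec:
                class: io.debezium.connector.mysql.MySqlConnector
                tasksMax: 1
                config:
                  database.hostname: patchme
                  database.port: 3306
                  database.server.id: 1001 # assumed value
                  topic.prefix: patchme
                  # database.user, database.password, database.include.list and the
                  # schema.history.internal.* settings are omitted for brevity
      patches:
        - type: PatchSet
          patchSetName: name-patchset
        - type: CombineFromComposite
          toFieldPath: spec.forProvider.manifest.metadata.labels[strimzi.io/cluster]
          combine:
            strategy: string
            string:
              fmt: "mysql-cdc-connect-%s"
            variables:
              - fromFieldPath: metadata.name
        - type: CombineFromComposite
          toFieldPath: spec.forProvider.manifest.spec.config[topic.prefix]
          combine:
            strategy: string
            string:
              fmt: "mysql-cdc-connect-%s"
            variables:
              - fromFieldPath: metadata.name
        # plain copies of user-supplied fields
        - type: FromCompositeFieldPath
          fromFieldPath: spec.databaseHostname
          toFieldPath: spec.forProvider.manifest.spec.config[database.hostname]
        - type: FromCompositeFieldPath
          fromFieldPath: spec.databasePort
          toFieldPath: spec.forProvider.manifest.spec.config[database.port]
```

The FromCompositeFieldPath patches copy values that the product team provides, while the CombineFromComposite patches derive names that the platform team controls.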

Putting it all together

Upon applying the above XRD and composition, we can create MySqlCDC resources. Suppose we have a MySQL server reachable at mysql.tt-27-project.svc.cluster.local:3306 with a database called users that we want to watch for changes.

If we apply our example MySqlCDC resource, we can observe the provisioning of the CDC infrastructure:

$ kubectl get Object
NAME                                KIND             PROVIDERCONFIG        SYNCED   READY   AGE
mysql-cdc-connect-users-cluster     KafkaConnect     kubernetes-provider   True     True    50s
mysql-cdc-connect-users-connector   KafkaConnector   kubernetes-provider   True     True    50s
mysql-cdc-connect-users-user        KafkaUser        kubernetes-provider   True     True    50s

Here we can see that Object resources have been created. These resources in turn define a KafkaConnect cluster, a KafkaConnector, and a KafkaUser, which the Crossplane Kubernetes provider will provision for us.

By querying the running pods, we can see that our Connect worker image is being built:

$ kubectl get po
NAME                                    READY   STATUS    RESTARTS   AGE
mysql-cdc-connect-users-connect-build   1/1     Running   0          3m52s

Once the image is built, the connect cluster will start up and the connector will be deployed:

$ kubectl get po
NAME                                READY   STATUS    RESTARTS   AGE
mysql-cdc-connect-users-connect-0   1/1     Running   0          3m31s
$ kubectl get KafkaConnector
NAME                      CLUSTER                   CONNECTOR CLASS                              MAX TASKS   READY
mysql-cdc-connect-users   mysql-cdc-connect-users   io.debezium.connector.mysql.MySqlConnector   1           True

Finally, let us test that the connector is actually working.

$ export KAFKA_USER_PW=$(kubectl get secret/mysql-cdc-connect-users -n kafka -o jsonpath="{ .data.password }" | base64 -d)
$ kubectl run -it --rm kafka-consumer \
    --image=bitnami/kafka:latest \
    --env KAFKA_USER_PW="${KAFKA_USER_PW}" \
    -n kafka -- /bin/bash

Then, inside the container, run the kafka console consumer to consume the change events from the USERS table:

$ kafka-console-consumer.sh --topic mysql-cdc-connect-users.users.USERS --from-beginning \
    --consumer-property sasl.mechanism=SCRAM-SHA-512 \
    --consumer-property security.protocol=SASL_PLAINTEXT \
    --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"mysql-cdc-connect-users\" password=\"$KAFKA_USER_PW\";" \
    --bootstrap-server poc-cluster-kafka-bootstrap:9092 \
    --group mysql-cdc-connect-users-test

The consumer will now wait for new messages to arrive. If we insert a new user into the USERS table, we will see the corresponding change event in JSON format in the console consumer. We inserted a user called Demo User who lives at Demo Street 42 and this insertion has resulted in an event with the following payload:

...
"payload": {
  "before": null,
  "after": {
    "name": "Demo User",
    "address": "Demo Street 42"
  },
  "source": {
    "version": "2.4.1.Final",
    "connector": "mysql",
    "name": "mysql-cdc-connect-users",
    "ts_ms": 1703260631000,
    "snapshot": "false",
    "db": "users",
    "sequence": null,
    "table": "USERS",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000005",
    "pos": 669,
    "row": 0,
    "thread": 172441,
    "query": null
  },
  "op": "c",
  "ts_ms": 1703260631276,
  "transaction": null
}
...

Wait, couldn’t we have done all this with Helm or Kustomize?

Users familiar with the Kubernetes ecosystem may ask why we just went through the trouble of defining a resource that provisions a resource that, in turn, finally provisions the resources that we want. Couldn’t we have simply used Kustomize or a templating solution like Helm to achieve the same effect?

The fact is that with Crossplane we are able to set up CDC without granting every user the ability to, for example, create Kafka Users. This is especially important in the world of Strimzi. As we have seen before, the permissions of the user are defined directly in the user manifest. This means that if we allow regular users to create KafkaUser objects, which would be needed for the Helm solution, we also allow them to grant their users all kinds of permissions. This is problematic from a security standpoint. Some topics may contain confidential or personally identifiable information. We certainly do not want just anyone to be able to help themselves to this data.

With Crossplane, the resources are provisioned by the Crossplane operator in a namespace that is outside of most users’ reach. We are also able to use Kubernetes RBAC to impose granular restrictions on who may create what resources. Furthermore, the Crossplane self-service can be adapted even if we decide to move some of the infrastructure from Kubernetes to a managed solution (e.g. if we replace Strimzi with Confluent). The fact that Crossplane seamlessly integrates with Kubernetes RBAC means that we could even control through Kubernetes who is allowed to create what infrastructure outside of Kubernetes.
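As an illustration of such a restriction, the following sketch grants the members of one team the right to manage MySqlCDCClaim resources in their own namespace and nothing else. The namespace team-a and the group team-a-developers are assumptions made for this example.

```yaml
# Hypothetical RBAC sketch; namespace and group names are assumptions.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: mysqlcdc-claim-editor
  namespace: team-a
rules:
  - apiGroups: ["spoud.io"]
    resources: ["mysqlcdcclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: mysqlcdc-claim-editor
  namespace: team-a
subjects:
  - kind: Group
    name: team-a-developers
    apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: mysqlcdc-claim-editor
  apiGroup: rbac.authorization.k8s.io
```

Because the role does not mention KafkaUser or any other Strimzi resource, team members can request CDC pipelines but cannot hand out Kafka permissions themselves.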

Conclusion

In this article, we have seen how Crossplane can be used to create a self-service API for provisioning Kafka Connectors. We have managed to hide away the complexity of setting up CDC with Kafka Connect behind a simple custom resource. While our minimal example is already sufficient to get started with CDC, we have only scratched the surface of what Crossplane can do. In our example, we have used Crossplane’s built-in templating mechanism to define our composition. The Crossplane community, however, also created ways to define compositions using Helm-like Go templates, which can be used to create more complex compositions. With this, we could, for example, add a databaseType parameter to our composite resource and use it to configure a universal CDC solution that supports not only MySQL, but also other databases. In the end, it is up to the platform team to decide how much configurability they want to expose to the end user.

As Crossplane evolves, we will likely see more ways to make the above example even cleaner. A feature to watch out for is the EnvironmentConfig resource, which is in alpha at the time of writing. This feature will allow us to define ConfigMap-like resources that can be referenced in patches in the composition. With this resource, it will be possible to separate secrets and other (potentially shared) configuration from the MySqlCDC resource itself.

Want to talk to the author about it? Reach out to us: https://www.linkedin.com/company/spoud.io / info@spoud.io / www.spoud.io
