At SPOUD, we offer data-streaming related services and consulting to our clients. Oftentimes, we are invited to support companies that are looking to start using Apache Kafka. We find that different companies, and often even different teams within the same company, need help with setting up Kafka for very similar use-cases. Doing this from scratch for each team at a customer does not scale: teams slow down and spend hours or even days implementing use-cases that other teams have already solved.
One such use-case is the setup of Change Data Capture (CDC) with Kafka Connect. CDC is a technique that triggers an event whenever a change is made to a database table. For example, if an application user changes their address in the database, an event will be published to Kafka, and all services that are interested in this change can subscribe to the corresponding Kafka topic and be notified about it. With this approach, interested parties do not need to poll the database for changes, add triggers, or implement custom notification logic between individual services, which would introduce tight coupling between them.
Typically, CDC is not something that is set up once for the entire organization, but rather something that different teams will want to set up independently of each other, as the degree of Kafka adoption can vary greatly between teams. We believe that the approach of setting this up from scratch for each team is not ideal. In the rest of this article, we will explore how Crossplane can be used to tackle this problem in a way that is more in line with the idea of platform engineering.
Platform Engineering
Platform engineering is the modern approach to the problem of teams needing to set up identical infrastructure for their applications. Instead of product teams requesting said infrastructure from the team responsible for it (e.g. your Kafka team), that team provides some kind of self-service tool that product teams can use to spin up the required infrastructure themselves. A good self-service tool offers a variety of benefits:
- Less bureaucracy, shorter wait times: Developer time is typically a very expensive resource. Infrastructure often is not. It is a real issue if a developer is blocked because somebody with the permissions to create a topic or a user hasn’t done it yet. If developers are able to provision Kafka topics, users, Connect clusters, etc. themselves instead of filing support tickets and waiting for someone to take care of them, their idle times are minimized.
- Less know-how needed to deliver an adequate solution: Even though some tools in the Kafka ecosystem are marketed as real time-savers (which they ultimately are), sending someone unfamiliar with these tools to set them up is bound to cause a lot of frustration. If good knowledge of Kafka and the surrounding technologies is a precondition to Kafka’s adoption within the organization, then said adoption will progress at a snail’s pace, as people will first need to be trained. A self-service tool, on the other hand, can provide opinionated abstractions. These abstractions hide away the complexity of the underlying infrastructure and make it easy for developers to get a working solution up and running.
- Compliance with company policies: Companies often have policies in place that govern how infrastructure should be set up. These may come in the form of specific naming conventions that make it easy to see which team a resource belongs to. These may also be security-related constraints. We may want to restrict the ability to create infrastructure to specific users. A self-service tool is able to perform additional validation to verify that these constraints are met.
Crossplane
We have recently started experimenting with Crossplane, a Kubernetes framework that allows teams to create such self-service tools. At a high level, Crossplane lets you define infrastructure in the form of Kubernetes custom resources. Providers from the Crossplane ecosystem detect these custom resources and provision the infrastructure that they describe. For example, Crossplane has official providers for the major cloud platforms (AWS, Azure, GCP), which makes it possible to define S3 buckets or Lambda functions as Kubernetes custom resources.
On top of that, Crossplane introduces the concept of composite resource definitions (XRDs). A composite resource, as the name suggests, is a resource that is made up of multiple underlying resources. Composite resources are handy because oftentimes several pieces of infrastructure must be provisioned together before any of them is usable. For example, if we want to deploy a serverless function to Google Cloud, we need to define a Cloud Function. But a Cloud Function alone is not able to do much. We also need to define a service account that the function will authenticate as, and we must define a new (or reference an existing) Artifact Registry repository where the container image of the function will be stored. Instead of defining these resources individually, the platform team could provide an XRD called ServerlessFunction, which automatically provisions everything that is needed to run a Cloud Function.
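To make this more concrete, a team-facing ServerlessFunction resource could look something like the following sketch. The API group, kind, and field names here are hypothetical; the platform team would define them in its own XRD:
apiVersion: platform.example.org/v1alpha1 # hypothetical API group
kind: ServerlessFunction
metadata:
  name: image-resizer
spec:
  # only the things the product team actually cares about
  runtime: python311
  entryPoint: resize
  region: europe-west6
  sourceArchiveUrl: gs://example-bucket/image-resizer.zip # placeholder bucket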
Another popular tool for provisioning infrastructure is Terraform. Readers familiar with Terraform may wonder how the two tools compare and why one would consider using Crossplane over Terraform. After all, Terraform is also able to support the above use-case with its concept of modules. This is a topic that deserves its own article. The team behind Crossplane has already written a comprehensive comparison of the two tools.
In the next section, we will tackle the challenge of creating an XRD that configures Change Data Capture (CDC) between a MySQL database and an Apache Kafka Cluster.
Change Data Capture
To set up MySQL CDC with Strimzi, we need to configure three custom resources: a KafkaUser, a KafkaConnect cluster, and a KafkaConnector. The provisioned Kafka user must have its ACLs configured in such a manner that it is able to write to all the required topics (the status, offsets, and configs topics for Kafka Connect, as well as the topics that will hold the actual change data). The container image for our Connect cluster needs to include the binaries of the relevant connector (in our case, the Debezium MySQL source connector). Finally, the connector needs to be provided with some connector-specific configuration.
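To give a sense of what is being abstracted away, here is a rough sketch of what the KafkaConnect part alone might look like when using Strimzi's build mechanism to bundle the Debezium plugin. The names, registry, and plugin URL below are placeholders for illustration, not the exact manifest that our composition generates:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster # placeholder name
  namespace: kafka
  annotations:
    # let KafkaConnector custom resources manage connectors on this cluster
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: poc-cluster-kafka-bootstrap:9092
  authentication:
    type: scram-sha-512
    username: my-connect-user # the KafkaUser provisioned alongside this cluster
    passwordSecret:
      secretName: my-connect-user
      password: password
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
  build:
    # Strimzi builds a Connect image that includes the Debezium MySQL plugin
    output:
      type: docker
      image: registry.example.com/connect-debezium:latest # placeholder registry
    plugins:
      - name: debezium-mysql
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.1.Final/debezium-connector-mysql-2.4.1.Final-plugin.tar.gz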
Setting up all of the above requires a solid understanding of Kafka Connect and could take hours to get right. However, at its core, what we are trying to achieve here is quite simple. We have a database that we want to read from, and we have a Kafka cluster (managed by the platform team) that we want to write to. The platform team can abstract away all the details required for talking to Kafka. All the product team must provide is a way for the CDC connector to talk to their database. So ideally, instead of setting up three low-level Strimzi custom resources, we want to set up a single small and simple one:
apiVersion: spoud.io/v1alpha1
kind: MySqlCDC
metadata:
  name: users
spec:
  # database server connection details
  databaseHostname: mysql.tt-27-project.svc.cluster.local
  databasePort: 3306
  databaseUser: mysql-user
  databasePassword: XXXXXXXX
  # name of the database to watch
  databaseName: users
In response to this resource, we want Crossplane to take care of provisioning the following resources:
- A KafkaUser called mysql-cdc-connect-XXX, where XXX is the name of the MySqlCDC resource. The user must have all ACL permissions on topics prefixed with mysql-cdc-connect-XXX, as well as Read permissions on all consumer groups with the same prefix.
- A KafkaConnect cluster called connect-cluster-XXX, where XXX is the name of the MySqlCDC resource. The cluster will pick up the secret of the generated KafkaUser and use it to authenticate with our Kafka cluster. Furthermore, the Connect cluster must be configured to store its offsets, configs, and status in topics prefixed with mysql-cdc-connect-XXX, so that the KafkaUser can access them. Finally, we will need to configure the Connect cluster to include the Debezium MySQL source connector.
- A KafkaConnector called mysql-cdc-connect-XXX-connector, where XXX is the name of the MySqlCDC resource. The connector configuration is where most of the options from the above YAML file will end up. In addition to that, we must also inject the credentials of our KafkaUser into the configuration.
Crossplane Custom Resource Definition (XRD)
The first step towards implementing the above is to create a Crossplane Composite Resource Definition (XRD). The XRD will define the schema of our custom MySqlCDC resource. In this schema, we describe which fields the resource will have, what their types are, and which of them are required.
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: mysqlcdcs.spoud.io
spec:
  group: spoud.io
  names:
    kind: MySqlCDC
    plural: mysqlcdcs
  versions:
    - name: v1alpha1
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                databaseHostname:
                  type: string
                databasePort:
                  type: integer
                  default: 3306
                databaseUser:
                  type: string
                databasePassword:
                  type: string
                databaseName:
                  type: string
              required:
                - databaseHostname
                - databaseUser
                - databasePassword
                - databaseName
      served: true
      referenceable: true
  claimNames:
    kind: MySqlCDCClaim
    plural: mysqlcdcclaims
If we were to apply the above XRD to our cluster, we would be able to create MySqlCDC resources. However, these resources would not do anything yet. We must also define a composition that describes how the MySqlCDC resource should be translated into the underlying infrastructure.
Note: The claimNames section allows us to create claims of type MySqlCDCClaim. In essence, a claim is a handle that points to a MySqlCDC resource. Claims are namespace-scoped, and by using them, we can avoid name collisions. For example, if we create a MySqlCDCClaim called users, then Crossplane will provision a MySqlCDC resource called users-XXX, where XXX is a random suffix (e.g. users-vww9t). If another team creates a claim with the same name in their own namespace, then their underlying MySqlCDC resource will have a different suffix.
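For illustration, a namespaced claim corresponding to our example resource could look roughly like this (the team-a namespace is just a placeholder):
apiVersion: spoud.io/v1alpha1
kind: MySqlCDCClaim
metadata:
  name: users
  namespace: team-a # claims are namespace-scoped
spec:
  databaseHostname: mysql.tt-27-project.svc.cluster.local
  databasePort: 3306
  databaseUser: mysql-user
  databasePassword: XXXXXXXX
  databaseName: users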
Defining the Composition
You can view the full composition manifest on GitHub, but let’s take a look at some excerpts from it to get a better understanding of how it works. Our composition spec contains two sections: resources and patchSets. The resources section describes the infrastructure that we want to provision in response to a MySqlCDC resource. Here is the base definition of our KafkaUser resource:
- name: connect-user
  base:
    apiVersion: kubernetes.crossplane.io/v1alpha1
    kind: Object
    metadata:
      name: connect-user-xxx # will be patched
    spec:
      providerConfigRef:
        name: kubernetes-provider
      forProvider:
        manifest:
          apiVersion: kafka.strimzi.io/v1beta2
          kind: KafkaUser
          metadata:
            namespace: kafka
            name: connect-user-xxx # will be patched
            labels:
              # let's assume that the organization has a single kafka cluster
              # (otherwise we could also patch this)
              strimzi.io/cluster: poc-cluster
          spec:
            authentication:
              type: scram-sha-512
            authorization:
              type: simple
              acls:
                - resource:
                    type: topic
                    name: "patchme" # will be patched
                    patternType: prefix
                  operations:
                    - Describe
                    - Read
                    - Write
                    - Create
                    - Delete
                  host: "*"
                - resource:
                    type: group
                    name: "patchme" # will be patched
                    patternType: prefix
                  operations:
                    - Read
                  host: "*"
Notice how some of the fields are marked with "# will be patched" comments. These are the fields where we want to plug in values from the MySqlCDC resource. We will get to that in a bit. Before that, let's take a look at the following part of the definition:
apiVersion: kubernetes.crossplane.io/v1alpha1
kind: Object
metadata:
  name: connect-user-xxx # will be patched
spec:
  providerConfigRef:
    name: kubernetes-provider
  forProvider:
    manifest:
      ...
This excerpt tells Crossplane that we want to provision a Kubernetes object. Crossplane’s Kubernetes provider detects resources of kind Object and applies the attached manifest to the configured Kubernetes cluster. In the manifest section itself, we can see that we are defining a KafkaUser custom resource. Strimzi will respond to the creation of a KafkaUser resource by creating a new user in the Kafka cluster.
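For reference, the kubernetes-provider that the Object references via providerConfigRef is a ProviderConfig of the Crossplane Kubernetes provider. A minimal sketch, assuming the provider simply reuses its own in-cluster service account (a kubeconfig supplied via a Secret would work as well):
apiVersion: kubernetes.crossplane.io/v1alpha1
kind: ProviderConfig
metadata:
  name: kubernetes-provider
spec:
  credentials:
    # reuse the provider pod's own service account to talk to the local cluster
    source: InjectedIdentity
Note that the provider's service account also needs RBAC permissions to manage the Strimzi resources it applies.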
Underneath the base resource definition, we have a patches section that allows us to plug in values from the MySqlCDC resource into the KafkaUser manifest. We apply the following patches to our KafkaUser:
patches:
  - type: CombineFromComposite # 1
    toFieldPath: metadata.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s-user"
      variables:
        - fromFieldPath: metadata.name
  - type: PatchSet # 2
    patchSetName: name-patchset
  - type: CombineFromComposite # 3
    toFieldPath: spec.forProvider.manifest.spec.authorization.acls[0].resource.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s"
      variables:
        - fromFieldPath: metadata.name
  - type: CombineFromComposite # 4
    toFieldPath: spec.forProvider.manifest.spec.authorization.acls[1].resource.name
    combine:
      strategy: string
      string:
        fmt: "mysql-cdc-connect-%s"
      variables:
        - fromFieldPath: metadata.name
Let’s go through these patches one by one:
1. This patch is of the type CombineFromComposite, which means that it will take a value from the MySqlCDC composite resource and plug it into the base resource definition according to the specified strategy. In this case, we want to set the metadata.name field of the Object resource to mysql-cdc-connect-XXX-user, where XXX is the name of our MySqlCDC resource.
2. Apply the patch set called name-patchset, which we defined earlier in the patchSets section. A patch set is a reusable collection of one or more patches. The name-patchset takes the metadata.name field from the MySqlCDC resource and plugs it into the name field of the relevant resource. Because we want to use this logic for multiple resources, we defined it as a patch set so that we do not have to define the same patch from scratch for each resource. Here is the full patchSets section:
patchSets:
  - name: name-patchset
    patches:
      - type: CombineFromComposite
        toFieldPath: spec.forProvider.manifest.metadata.name
        combine:
          strategy: string
          string:
            fmt: "mysql-cdc-connect-%s"
          variables:
            - fromFieldPath: metadata.name
3. This patch sets the name field of the first ACL rule to mysql-cdc-connect-XXX, where XXX is the name of the MySqlCDC resource. So if the resource is called users, the generated ACL will allow the KafkaUser to perform all operations on topics prefixed with the string mysql-cdc-connect-users.
4. Similar to the previous patch, this one generates the prefix for the second ACL rule, which will allow the KafkaUser to commit offsets for the relevant consumer groups.
The remaining resources in the composition follow the same pattern. We define Object resources that apply manifests to our Kubernetes cluster, provisioning the KafkaConnect cluster and the KafkaConnector. Where necessary, we apply patches to plug in values from the MySqlCDC resource.
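For illustration, the base of the KafkaConnector resource could look roughly like the sketch below. The option names are standard Debezium MySQL connector settings; the exact base in the full composition on GitHub may differ slightly:
- name: cdc-connector
  base:
    apiVersion: kubernetes.crossplane.io/v1alpha1
    kind: Object
    spec:
      providerConfigRef:
        name: kubernetes-provider
      forProvider:
        manifest:
          apiVersion: kafka.strimzi.io/v1beta2
          kind: KafkaConnector
          metadata:
            namespace: kafka
            name: patchme # will be patched
            labels:
              # must point at the generated KafkaConnect cluster
              strimzi.io/cluster: patchme # will be patched
          spec:
            class: io.debezium.connector.mysql.MySqlConnector
            tasksMax: 1
            config:
              # Debezium MySQL connection settings, patched from the MySqlCDC spec
              database.hostname: patchme # will be patched
              database.port: 3306 # will be patched
              database.user: patchme # will be patched
              database.password: patchme # will be patched
              database.include.list: patchme # will be patched
              topic.prefix: patchme # will be patched
              # schema history and other Debezium settings omitted in this sketch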
Putting it all together
Upon applying the above XRD and composition, we can create MySqlCDC resources. Suppose we have a MySQL server reachable at mysql.tt-27-project.svc.cluster.local:3306 with a database called users that we want to watch for changes.
If we apply our example MySqlCDC resource, we can observe the provisioning of the CDC infrastructure:
$ kubectl get Object
NAME KIND PROVIDERCONFIG SYNCED READY AGE
mysql-cdc-connect-users-cluster KafkaConnect kubernetes-provider True True 50s
mysql-cdc-connect-users-connector KafkaConnector kubernetes-provider True True 50s
mysql-cdc-connect-users-user KafkaUser kubernetes-provider True True 50s
Here we can see that Object resources have been created. These resources in turn define a KafkaConnect cluster, a KafkaConnector, and a KafkaUser, which the Crossplane Kubernetes provider will provision for us.
By querying the running pods, we can see that our Connect worker image is being built:
$ kubectl get po
NAME READY STATUS RESTARTS AGE
mysql-cdc-connect-users-connect-build 1/1 Running 0 3m52s
Once the image is built, the connect cluster will start up and the connector will be deployed:
$ kubectl get po
NAME READY STATUS RESTARTS AGE
mysql-cdc-connect-users-connect-0 1/1 Running 0 3m31s
$ kubectl get KafkaConnector
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY
mysql-cdc-connect-users mysql-cdc-connect-users io.debezium.connector.mysql.MySqlConnector 1 True
Finally, let us test that the connector is actually working.
$ export KAFKA_USER_PW=$(kubectl get secret/mysql-cdc-connect-users -n kafka -o jsonpath="{ .data.password }" | base64 -d)
$ kubectl run -it --rm kafka-consumer \
--image=bitnami/kafka:latest \
--env KAFKA_USER_PW="${KAFKA_USER_PW}" \
-n kafka -- /bin/bash
Then, inside the container, run the Kafka console consumer to consume the change events from the USERS table:
$ kafka-console-consumer.sh --topic mysql-cdc-connect-users.users.USERS --from-beginning \
--consumer-property sasl.mechanism=SCRAM-SHA-512 \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"mysql-cdc-connect-users\" password=\"$KAFKA_USER_PW\";" \
--bootstrap-server poc-cluster-kafka-bootstrap:9092 \
--group mysql-cdc-connect-users-test
The consumer will now wait for new messages to arrive. If we insert a new user into the USERS table, we will see the corresponding change event in JSON format in the console consumer. We inserted a user called Demo User who lives at Demo Street 42, and this insertion resulted in an event with the following payload:
...
"payload": {
  "before": null,
  "after": {
    "name": "Demo User",
    "address": "Demo Street 42"
  },
  "source": {
    "version": "2.4.1.Final",
    "connector": "mysql",
    "name": "mysql-cdc-connect-users",
    "ts_ms": 1703260631000,
    "snapshot": "false",
    "db": "users",
    "sequence": null,
    "table": "USERS",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000005",
    "pos": 669,
    "row": 0,
    "thread": 172441,
    "query": null
  },
  "op": "c",
  "ts_ms": 1703260631276,
  "transaction": null
}
...
Wait, couldn’t we have done all this with Helm or Kustomize?
Users familiar with the Kubernetes ecosystem might ask why we just went through the trouble of defining a resource that provisions a resource that, in turn, provisions the resources we actually want. Couldn’t we have simply used Kustomize or a templating solution like Helm to achieve the same effect?
The fact is that with Crossplane we are able to set up CDC without granting every user the ability to, for example, create Kafka users. This is especially important in the world of Strimzi. As we have seen before, the permissions of the user are defined directly in the user manifest. This means that if we allow regular users to create KafkaUser objects, which would be needed for the Helm solution, we also allow them to grant their users all kinds of permissions. This is problematic from a security standpoint. Some topics may contain confidential or personally identifiable information. We certainly do not want just anyone to be able to help themselves to this data.
With Crossplane, the resources are provisioned by the Crossplane operator in a namespace that is outside of most users’ reach. We are also able to use Kubernetes RBAC to impose granular restrictions on who may create what resources. Furthermore, the Crossplane self-service can be adapted even if we decide to move some of the infrastructure from Kubernetes to a managed solution (e.g. if we replace Strimzi with Confluent). The fact that Crossplane seamlessly integrates with Kubernetes RBAC means that we could even control through Kubernetes who is allowed to create what infrastructure outside of Kubernetes.
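To make the RBAC point concrete, a namespaced Role along the following lines (names are illustrative) would let a product team manage MySqlCDC claims without ever being allowed to touch KafkaUser or KafkaConnect objects directly. A corresponding RoleBinding would then grant this Role to the team's users or service accounts:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: cdc-self-service
  namespace: team-a # the product team's namespace (placeholder)
rules:
  # allow managing only the high-level claim, not the underlying Strimzi resources
  - apiGroups: ["spoud.io"]
    resources: ["mysqlcdcclaims"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]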
Conclusion
In this article, we have seen how Crossplane can be used to create a self-service API for provisioning Kafka connectors. We have managed to hide away the complexity of setting up CDC with Kafka Connect behind a simple custom resource. While our minimal example is already sufficient to get started with CDC, we have only scratched the surface of what Crossplane can do. In our example, we used Crossplane’s built-in templating mechanism to define our composition. The Crossplane community, however, has also created ways to define compositions using Helm-like Go templates, which can be used to create more complex compositions. With this, we could, for example, add a databaseType parameter to our composite resource and use it to configure a universal CDC solution that supports not only MySQL but also other databases. In the end, it is up to the platform team to decide how much configurability they want to expose to the end user.
As Crossplane evolves, we will likely see more ways to make the above example even cleaner. A feature to watch out for is the EnvironmentConfig resource, which is in alpha at the time of writing. This feature will allow us to define ConfigMap-like resources that can be referenced in patches in the composition. With this resource, it will be possible to separate secrets and other (potentially shared) configuration from the MySqlCDC resource itself.
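To sketch the idea, an EnvironmentConfig holding shared Kafka settings could look roughly like this (based on the alpha API at the time of writing; the data keys are our own invention):
apiVersion: apiextensions.crossplane.io/v1alpha1
kind: EnvironmentConfig
metadata:
  name: kafka-environment
data:
  # shared values that compositions could pull in via environment patches
  kafkaClusterName: poc-cluster
  kafkaBootstrapServers: poc-cluster-kafka-bootstrap:9092
  kafkaNamespace: kafka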
Want to talk to the author about it? Reach out to us: https://www.linkedin.com/company/spoud.io / info@spoud.io / www.spoud.io