Integrating Azure Identities with Confluent Cloud Workload Identities

SPOUD
May 21, 2024

In the realm of cloud services, securing access is crucial. This guide provides a straightforward approach to integrating Azure Managed Identities with Confluent Cloud Workload Identities. Azure Managed Identities offer a secure, credential-free method for authenticating Azure services, while Confluent Cloud brings managed event streaming capabilities, essential for modern, data-driven applications. This integration simplifies identity management, ensuring secure and seamless communication between Azure services and Confluent Cloud. This post, based on direct experience, will navigate through configuring Azure components and setting up Confluent Cloud to work with Azure identities.

Prerequisites for Integrating Azure Managed Identities with Confluent Cloud

Before diving into the integration process, ensure you have the following prerequisites in place:

  1. Azure Subscription: An active Azure subscription is necessary to access Azure resources and services. If you don’t have one, you can sign up for a free Azure account.
  2. Confluent Cloud Account: A Confluent Cloud account is required to manage and configure Workload Identities. Sign up at [Confluent Cloud](https://www.confluent.io/confluent-cloud/).
  3. Confluent Cloud Standard or Dedicated Cluster: On Confluent Cloud, workload identities are only supported on Standard and Dedicated clusters.
  4. Azure Active Directory (AD): Ensure Azure AD is set up in your Azure account for managing user identities and permissions.
  5. Permissions: Adequate permissions in Azure AD to create and manage Enterprise Applications, App Registrations, and Managed Identities.
  6. Basic Understanding of Identity and Access Management (IAM): A foundational understanding of IAM concepts, especially in the context of Azure and Confluent Cloud, is helpful.
  7. Virtual Machine Setup: Knowledge of how to set up and configure a virtual machine in Azure.

Having these prerequisites in place will ensure a smooth and successful integration process.

For a detailed explanation on each prerequisite and its significance in the integration process, refer to [Azure Documentation](https://docs.microsoft.com/azure/) and [Confluent Cloud Documentation](https://docs.confluent.io/cloud/current/).

Terms

The terms are easy to mix up, since “Workload Identity” is used by Confluent Cloud and Azure for two different things. Confluent Cloud uses the term for the configuration of Identity Providers and Identity Pools. In Azure, it is a feature that assigns a Managed Identity to a Kubernetes Service Account. In this guide, “Workload Identity” mostly refers to the Azure feature. We refer to the Confluent Cloud feature as “Confluent Cloud Workload Identities”.

Setting Up Azure Components

Before we begin, we need to set up an App Registration. With this App Registration, we will be able to request JWTs from Azure, which can then be used to get a user principal in Confluent Cloud. If you also wish to use SAML-based SSO (Single Sign-On) for your users, you can create an Enterprise Application. This Enterprise Application will create an App Registration which can also be used for OIDC. Otherwise, you can skip the following two steps, as it should be sufficient to create the App Registration directly.

Creating an Enterprise Application

Navigate to Azure AD to set up an Enterprise Application. This application will act as a link between Azure AD and Confluent Cloud for SAML-based single sign-on (SSO).

From there click Create your own application to create a new Enterprise Application:

Configuring SAML

Enable Single sign-on on Confluent Cloud:

In the Azure Enterprise Application settings under Single sign-on, configure SAML-based authentication. Fill in the details provided by Confluent Cloud in the previous step.

Download the SAML metadata from Azure and upload it to Confluent Cloud, then finish the process in Confluent Cloud.

Now you should be able to sign in a user from your Azure AD via single sign-on. If you create a test user, make sure to assign an e-mail address.

Confluent Cloud documentation: https://docs.confluent.io/cloud/current/access-management/authenticate/sso/overview.html

Configure App Registration

For Workload Identities to work, it is important to enable token version v2 for Azure JWTs. To configure this, navigate to the Manifest of your App Registration. There you can edit the JSON definition of the App Registration.

Set accessTokenAcceptedVersion to 2 and acceptMappedClaims to true.

Note: If you already have tokens from the App Registration from before the change, they are cached and not recreated. You can create a new Managed Identity in order to get a new token immediately, or you need to wait for the cache to expire (24h is the TTL).

Configuring Confluent Cloud Workload Identities

In order to use the Workload Identities feature in Confluent Cloud you will need the App Registration from above.

Start by adding the Identity Provider.

Fill in the Azure Tenant ID. Then you can import the URIs from the Tenant.

Be careful with the JWKS URI: the imported value is not correct if you want to use claims mapping. In that case it needs to include the app ID, like this:

https://login.microsoftonline.com/<TenantID>/discovery/keys?appid=<AppId>

Identity Pools

After the Identity Provider is registered, you can create Identity Pools. Identity Pools define a filter expression on the OIDC claims. If a token's claims match the filter of the requested Identity Pool, the ACLs and roles of that pool are applied.

Note: by default, there is a limit of 100 Identity Pools per Identity Provider on Confluent Cloud.

Example Pool:

Identity pools are mapped to user principals in Kafka clusters. The principal has the form User:<IdentityPoolId>, e.g. User:pool-7yzk. It’s important to know that Identity Pool IDs are not meant to be secret or hidden. They can be guessed easily. So, in order to secure access to the cluster, you need to define appropriate filter rules on each Identity Pool.
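
To make the idea of a claims filter more concrete, here is a small Python sketch of the kind of check a pool filter performs on a token. This is not Confluent Cloud's filter evaluator, and the two conditions used here (the Tenant ID appears in iss, and aud equals the App Registration's client ID) are assumptions chosen for illustration. The function name pool_filter_matches is hypothetical.

```python
def pool_filter_matches(claims: dict, tenant_id: str, app_id: str) -> bool:
    """Illustrative stand-in for an Identity Pool filter.

    Assumed rule: the issuer must contain the tenant ID and the
    audience must be the App Registration's client ID.
    """
    issuer_ok = tenant_id in claims.get("iss", "")
    audience_ok = claims.get("aud") == app_id
    return issuer_ok and audience_ok


# Claims taken from the example token shown in this guide.
claims = {
    "iss": "https://login.microsoftonline.com/b52245db-4800-4792-975a-1d9ed49512f2/v2.0",
    "aud": "5e20030f-3a3c-4aea-83bc-e6aa925a148f",
}

print(pool_filter_matches(
    claims,
    tenant_id="b52245db-4800-4792-975a-1d9ed49512f2",
    app_id="5e20030f-3a3c-4aea-83bc-e6aa925a148f",
))
```

A filter that only checks the issuer admits every identity in the tenant, which is why narrowing it (for example on aud or sub) matters when the pool ID itself is guessable.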

Create ACLs

You can set ACLs on Identity Pools. For example, grant the Identity Pool pool-7yzk read and write permissions on all topics of the cluster lkc-w25n5g whose names start with the prefix acl-test:

confluent kafka acl create --cluster lkc-w25n5g  --principal User:pool-7yzk --allow --prefix --topic acl-test --operations describe,describe-configs,read,write

This returns the following output:

    Principal    | Permission |    Operation     | Resource Type | Resource Name | Pattern Type
-----------------+------------+------------------+---------------+---------------+---------------
  User:pool-7yzk | ALLOW      | DESCRIBE         | TOPIC         | acl-test      | PREFIXED
  User:pool-7yzk | ALLOW      | DESCRIBE_CONFIGS | TOPIC         | acl-test      | PREFIXED
  User:pool-7yzk | ALLOW      | READ             | TOPIC         | acl-test      | PREFIXED
  User:pool-7yzk | ALLOW      | WRITE            | TOPIC         | acl-test      | PREFIXED

We can also check the ACLs for the Identity Pool:

confluent kafka acl list --cluster lkc-w25n5g  --principal User:pool-7yzk

which returns the same output as above.

E2E Verification

With the settings above, any identity within the Azure Tenant (the issuer claim contains the Tenant ID) can read the topics on our test cluster that the ACLs allow.

To verify this, we create a topic called test and run the kafka-topics.sh command on a virtual machine that logs in with an Azure Managed Identity.

By the end of this guide, the following command will list the topics:

kafka-topics.sh --bootstrap-server pkc-p11xm.us-east-1.aws.confluent.cloud:9092 --command-config client.properties --list

Setup User-Assigned Managed Identity

First we need to create a user-assigned Managed Identity for our VM. The advantage of this type of identity is that it is not tied to the lifecycle of the individual virtual machines and can be used by, for example, multiple such VMs.

If, instead of using a user-assigned Managed Identity, you prefer to employ a system-assigned Managed Identity, whose lifecycle is tied to that of the virtual machine, you can skip this step. This will slightly change the API calls that we need to do in the following sections. In such cases, we highlight the differences to the user-assigned Managed Identity approach.

Once the identity is created, we go to the Managed Identity and copy its Client ID:

Setup virtual machine

Note: Kubernetes pod-identities would work the same way, therefore we will not cover this here.

Next, create a Virtual Machine in the same region as the Managed Identity. For demonstration purposes, we use Linux (Ubuntu 20.04).

In the VM settings, you will find the Identity entry, where you can add the previously created Managed Identity.

This will allow you to query the Azure Instance Metadata Service from within the VM to get a token for the Managed Identity. See https://learn.microsoft.com/en-us/azure/virtual-machines/instance-metadata-service?tabs=linux for more information.

Install Kafka Tools

Make sure you have installed Java and the Kafka CLI tools.

sudo apt update && sudo apt install -y jq default-jre
cd /opt
sudo wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz -O kafka.tgz
sudo tar xzvf kafka.tgz
cd -
echo "export PATH=\$PATH:/opt/kafka_2.13-3.6.1/bin/" >> ~/.bashrc
source ~/.bashrc

Verify token

Now we can log in to the VM over SSH or with the Azure CLI. From within the VM, we can get an access token from the Azure Instance Metadata Service, as our Kafka client will eventually do. To do so, run the following curl command:

curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f&client_id=2c10dec2-da22-424a-b2ab-3c01766e37f3' -H Metadata:true -s

Here, the resource query parameter is the Application (client) ID found in the App Registration metadata, and client_id is the Client ID of your user-assigned Managed Identity. If you want to use a system-assigned Managed Identity, simply omit the client_id parameter. Take care to also omit this parameter in the following code blocks where we use the same curl call.

You should see JSON output like the following:

{"access_token":"eyJ0eXAiOiJ...","token_type":"Bearer"}
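
The query parameters of the token request can also be assembled programmatically, which makes the difference between the two identity types explicit. The following Python sketch only builds the URL and does not call the metadata service. The function name imds_token_url is a hypothetical helper, not part of any Azure SDK.

```python
from typing import Optional
from urllib.parse import urlencode

# Instance Metadata Service token endpoint, as used in the curl call above.
IMDS_TOKEN_ENDPOINT = "http://169.254.169.254/metadata/identity/oauth2/token"


def imds_token_url(resource: str,
                   client_id: Optional[str] = None,
                   api_version: str = "2018-02-01") -> str:
    """Build the token URL.

    resource  : Application (client) ID of the App Registration.
    client_id : Client ID of a user-assigned Managed Identity;
                leave as None for a system-assigned identity.
    """
    params = {"api-version": api_version, "resource": resource}
    if client_id:
        params["client_id"] = client_id
    return f"{IMDS_TOKEN_ENDPOINT}?{urlencode(params)}"


# User-assigned identity (values from this guide).
print(imds_token_url("5e20030f-3a3c-4aea-83bc-e6aa925a148f",
                     client_id="2c10dec2-da22-424a-b2ab-3c01766e37f3"))
# System-assigned identity: no client_id parameter.
print(imds_token_url("5e20030f-3a3c-4aea-83bc-e6aa925a148f"))
```

When requesting the token with such a URL, remember that the service also expects the Metadata:true header, as in the curl command.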

With some jq magic you can have it parsed:

curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f&client_id=2c10dec2-da22-424a-b2ab-3c01766e37f3' -H Metadata:true -s \
| jq -r '.access_token | split(".") | .[0],.[1] | @base64d | fromjson'

This results in the decoded JWT token like this:

{
"typ": "JWT",
"alg": "RS256",
"kid": "w0adWJKQMACS2gTkXGXha_vyF4M"
}
{
"aud": "5e20030f-3a3c-4aea-83bc-e6aa925a148f",
"iss": "https://login.microsoftonline.com/b52245db-4800-4792-975a-1d9ed49512f2/v2.0",
"iat": 1705930970,
"nbf": 1705930970,
"exp": 1706017670,
"aio": "E2VgYCi4UeUvsCN5qU/UDp4Nbays09ROGPxkiNRw3M6cxeouuA8A",
"azp": "2c10dec2-da22-424a-b2ab-3c01766e37f3",
"azpacr": "2",
"oid": "61965619-0270-4683-bc43-554e4872bae6",
"rh": "0.Aa4A20UitQBIkkeXWh2e1JUS8g8DIF48OupKg7zmqpJaFI-rAAA.",
"sub": "61965619-0270-4683-bc43-554e4872bae6",
"tid": "b52245db-4800-4792-975a-1d9ed49512f2",
"uti": "dAxBlF29vkKBfY1oDbeBAA",
"ver": "2.0"
}

Make sure you have a version 2 token ("ver": "2.0"). If you have a v1 token, follow the instructions above to set up a v2 token.

Accessing the REST API

You can now access the Kafka cluster over the Kafka protocol or over the REST API.

To run a curl request against the REST API, run the following command:

curl -H "Confluent-Identity-Pool-Id: pool-7yzk" -H "Authorization: Bearer $(
curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f&client_id=2c10dec2-da22-424a-b2ab-3c01766e37f3' -H Metadata:true -s | jq -r ".access_token"
)" https://pkc-p11xm.us-east-1.aws.confluent.cloud:443/kafka/v3/clusters/lkc-w25n5g/topics

This should return a list of Kafka topics the Managed Identity is allowed to see by the access rules on the requested Identity Pool.

Here we need to set the Confluent-Identity-Pool-Id header to the Identity Pool ID we want to use. In this case pool-7yzk.
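
For scripted access, the same request can be prepared with Python's standard library. The sketch below only constructs the request object with the two headers and does not send it, so it runs without network access. The function name topics_request is hypothetical, and the access token is a placeholder you would obtain as shown above.

```python
import urllib.request


def topics_request(rest_endpoint: str, cluster_id: str,
                   pool_id: str, access_token: str) -> urllib.request.Request:
    """Prepare a GET request listing the topics of a cluster."""
    url = f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics"
    return urllib.request.Request(url, headers={
        # Azure-issued JWT of the Managed Identity.
        "Authorization": f"Bearer {access_token}",
        # Identity Pool whose filter and permissions should apply.
        "Confluent-Identity-Pool-Id": pool_id,
    })


req = topics_request(
    "https://pkc-p11xm.us-east-1.aws.confluent.cloud:443",
    "lkc-w25n5g",
    "pool-7yzk",
    "<access token>",  # placeholder
)
print(req.full_url)
# To actually send it: urllib.request.urlopen(req)
```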

Access Kafka on the Kafka protocol

In order to use the Managed Identity we need to configure the Kafka client with a configuration that tells it all the details.

With Confluent Cloud, we use the SASL_SSL security protocol. We then need to define the token endpoint with the correct resource (App Registration) and Client ID (Managed Identity).

The Managed Identity callback handler needs to be installed and configured. See https://github.com/confluentinc/confluent-oauth-extensions

Unfortunately, the handler is not part of the official Kafka distribution, so we need to install it manually.

git clone https://github.com/confluentinc/confluent-oauth-extensions.git
cd confluent-oauth-extensions
gradle jar

# for a VM:
scp -o PubkeyAuthentication=no build/libs/confluent-oauth-extensions-1.1-SNAPSHOT.jar adminuser@<IP>:/home/adminuser
# within the VM:
sudo cp confluent-oauth-extensions-1.1-SNAPSHOT.jar /opt/kafka_2.13-3.6.1/libs/
# for a Kubernetes pod:
kubectl cp build/libs/confluent-oauth-extensions-1.1-SNAPSHOT.jar pod:/opt/kafka/libs/

In the JAAS config, we need to specify the logical cluster name and the Identity Pool we want to use.

The following command will create the needed config file.

cat > client.properties <<EOF
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f&client_id=2c10dec2-da22-424a-b2ab-3c01766e37f3
sasl.login.callback.handler.class=io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='ignored' \
clientSecret='ignored' \
extension_logicalCluster='lkc-w25n5g' \
extension_identityPoolId='pool-7yzk';
EOF

Once you have this config, it’s time to check whether it works. Make sure that the uploaded jar file is on the CLASSPATH of the Kafka client.

Run the command kafka-topics.sh --bootstrap-server pkc-p11xm.us-east-1.aws.confluent.cloud:9092 --command-config client.properties --list.

This should return a list of topics the Managed Identity is allowed to see by the access rules on the requested Identity Pool.
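
If you manage several clusters or Identity Pools, the client.properties shown above can be generated instead of written by hand. The following Python sketch renders the same keys from three parameters. The function name render_client_properties is hypothetical, and the handler class name is simply copied from the configuration above.

```python
def render_client_properties(token_endpoint_url: str,
                             logical_cluster: str,
                             identity_pool_id: str) -> str:
    """Render a client.properties for the Managed Identity login."""
    # clientId/clientSecret are required by the login module but ignored here.
    jaas = (
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
        "clientId='ignored' clientSecret='ignored' "
        f"extension_logicalCluster='{logical_cluster}' "
        f"extension_identityPoolId='{identity_pool_id}';"
    )
    lines = [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=OAUTHBEARER",
        f"sasl.oauthbearer.token.endpoint.url={token_endpoint_url}",
        "sasl.login.callback.handler.class="
        "io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


props = render_client_properties(
    "http://169.254.169.254/metadata/identity/oauth2/token"
    "?api-version=2018-02-01"
    "&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f"
    "&client_id=2c10dec2-da22-424a-b2ab-3c01766e37f3",
    logical_cluster="lkc-w25n5g",
    identity_pool_id="pool-7yzk",
)
print(props)
```

Writing the result to a file (for example with open("client.properties", "w")) gives the same input for kafka-topics.sh as the heredoc version.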

Use Workload Identities on Kubernetes

In order to use Workload Identities, you need to register your Kubernetes cluster in Azure. For testing, you can follow the guide here: https://learn.microsoft.com/en-us/azure/aks/learn/tutorial-kubernetes-workload-identity

Once you have the Kubernetes cluster registered, you can create a Workload Identity and assign it to a Kubernetes service account.

With the following command, you can create a pod which will be assigned the Managed Identity:

cat << EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-toolbox
spec:
  replicas: 1
  selector:
    matchLabels:
      app: 'kafka-toolbox'
  template:
    metadata:
      labels:
        app: 'kafka-toolbox'
    spec:
      serviceAccountName: ${SERVICE_ACCOUNT_NAME}
      containers:
        - name: kafka-toolbox
          image: spoud/kafka-toolbox
          command: ["/bin/sh"]
          args: ["-c", "while true; do sleep 30; done"]
          imagePullPolicy: Always
          volumeMounts:
            - name: kubernetes-federation-token
              mountPath: /var/run/secrets/tokens
          env:
            - name: AZURE_CLIENT_ID
              value: ${USER_ASSIGNED_CLIENT_ID}
            - name: AZURE_TENANT_ID
              value: ${AZURE_TENANT_ID}
            - name: CONFLUENT_CLOUD_APP_ID
              value: ${CONFLUENT_CLOUD_APP_ID}
            - name: AZURE_FEDERATED_TOKEN_FILE
              value: "/var/run/secrets/tokens/kubernetes-federation-token"
            - name: AZURE_AUTHORITY_HOST
              value: "https://login.microsoftonline.com/"
      nodeSelector:
        kubernetes.io/os: linux
      volumes:
        - name: kubernetes-federation-token
          projected:
            sources:
              - serviceAccountToken:
                  path: kubernetes-federation-token
                  # AAD does not accept tokens valid for more than 1 hour; the default
                  # service account token is valid for a full day, so it cannot be used directly.
                  expirationSeconds: 3000
                  audience: api://AzureADTokenExchange
EOF

Then you can use the Workload Identity in your Kubernetes cluster to access Confluent Cloud.

Connect to the Kubernetes pod:

kubectl exec -ti kafka-toolbox-547bd896-bxs58 -- bash

Verify the token

IDENTITY_TOKEN=$(cat /var/run/secrets/tokens/kubernetes-federation-token)
curl -v -s --location --request GET "$AZURE_AUTHORITY_HOST/$AZURE_TENANT_ID/oauth2/v2.0/token" \
--form 'grant_type="client_credentials"' \
--form 'client_id="'$AZURE_CLIENT_ID'"' \
--form 'scope="'$CONFLUENT_CLOUD_APP_ID'/.default"' \
--form 'client_assertion_type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer"' \
--form 'client_assertion="'$IDENTITY_TOKEN'"' > token.json
jq -r '.access_token | split(".") | .[0],.[1] | @base64d | fromjson' token.json

This should return a token with the version 2.0.

{
"typ": "JWT",
"alg": "RS256",
"kid": "w0adWJKQMACS2gTkXGXha_vyF4M"
}
{
"aud": "5e20030f-3a3c-4aea-83bc-e6aa925a148f",
"iss": "https://login.microsoftonline.com/b52245db-4800-4792-975a-1d9ed49512f2/v2.0",
"iat": 1713873580,
"nbf": 1713873580,
"exp": 1713960280,
"aio": "E2NgYLg/szm2JG4Lj2v1JXVvJifRHXZWb3OzOr9HH2nc4v7+4D0A",
"azp": "da4ec9ae-b5c9-46be-96a2-077a500ad963",
"azpacr": "2",
"oid": "5012af54-d7eb-4729-8771-c0cdc188189c",
"rh": "0.Aa4A20UitQBIkkeXWh2e1JUS8g8DIF48OupKg7zmqpJaFI-rAAA.",
"sub": "5012af54-d7eb-4729-8771-c0cdc188189c",
"tid": "b52245db-4800-4792-975a-1d9ed49512f2",
"uti": "Tytb4JIXckqx1Rpn47XmAA",
"ver": "2.0"
}

Access Kafka

Now you can access the Kafka cluster with the token from the Managed Identity.

cat > client.properties <<EOF
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=${AZURE_AUTHORITY_HOST}${AZURE_TENANT_ID}/oauth2/v2.0/token
sasl.login.callback.handler.class=io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler

sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='ignored' \
clientSecret='ignored' \
useWorkloadIdentity='true' \
scope='${CONFLUENT_CLOUD_APP_ID}/.default' \
extension_logicalCluster='lkc-68v788' \
extension_identityPoolId='pool-7yzk';
EOF

Then you can run the following command to list the topics:

kafka-topics.sh --bootstrap-server pkc-12576z.us-west2.gcp.confluent.cloud:9092 --command-config client.properties --list

This should return a list of topics the Managed Identity is allowed to see by the access rules on the requested Identity Pool.

Troubleshooting

If you get the error

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler for configuration sasl.login.callback.handler.class: Class io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler could not be found.

this means that the confluent-oauth-extensions jar is missing from the classpath. Check whether you have it installed and it’s in the right place.

ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (pkc-p11xm.us-east-1.aws.confluent.cloud/34.233.240.208:9092) failed authentication due to: {"status":"JWT_PROCESSING_FAILED"} (org.apache.kafka.clients.NetworkClient)

This means that you are most likely sending a v1 JWT to Confluent Cloud. Check the token version and the App Registration settings.

JWT_HEADER_KID_LOOKUP_FAILED

This most likely means that the JWKS URI is not set correctly. Check whether the kid from your token appears in the JWKS URL configured on the Confluent Cloud Identity Provider.
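
The kid check can be scripted once you have downloaded the JWKS document from the configured URL. The Python sketch below works on an already loaded JWKS (a JSON object with a keys array) and a token string. The function names are hypothetical, and the sample token and key set are made up so the example runs offline.

```python
import base64
import json


def token_kid(token: str) -> str:
    """Read the key ID (kid) from the JWT header, without verification."""
    header_b64 = token.split(".")[0]
    padded = header_b64 + "=" * (-len(header_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))["kid"]


def kid_in_jwks(token: str, jwks: dict) -> bool:
    """True if the token's kid is listed in the JWKS document."""
    kid = token_kid(token)
    return any(key.get("kid") == kid for key in jwks.get("keys", []))


# Offline demo data: an unsigned token header and a minimal key set.
header = {"typ": "JWT", "alg": "RS256", "kid": "w0adWJKQMACS2gTkXGXha_vyF4M"}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b"=").decode()
sample_token = f"{header_b64}.e30.signature"  # "e30" is base64url for {}
sample_jwks = {"keys": [{"kty": "RSA", "kid": "w0adWJKQMACS2gTkXGXha_vyF4M"}]}

print(kid_in_jwks(sample_token, sample_jwks))
```

If this returns False for your real token and the JWKS behind the configured URL, the appid query parameter described earlier is a likely thing to check.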

Enable Debug logging

If something is not working, you may want to enable debug logging for the Kafka CLI tools:

  • Copy /opt/kafka/config/tools-log4j.properties, e.g. into your home directory.
  • Change log4j.rootLogger=WARN, stderr to log4j.rootLogger=DEBUG, stderr.
  • Make sure the tools use it by setting the KAFKA_OPTS variable: export KAFKA_OPTS="-Dlog4j.configuration=file:/home/adminuser/tools-log4j.properties"
  • Make sure that the Schema Registry and Avro tools use the new log configuration by setting the SCHEMA_REGISTRY_LOG4J_OPTS variable: export SCHEMA_REGISTRY_LOG4J_OPTS="-Dlog4j.configuration=file:/home/adminuser/tools-log4j.properties"

Access Schema Registry

Using the same token of the Managed Identity we can also access the Schema Registry in Confluent Cloud. For this to work we need to set the target-sr-cluster header to the logical cluster name of the Schema Registry as well as the header for the identity pool Confluent-Identity-Pool-Id.

curl -H 'Confluent-Identity-Pool-Id: pool-7yzk' -H 'target-sr-cluster: lsrc-kjn9nv' -H "Authorization: Bearer ${TOKEN}" https://psrc-1wydj.us-east-2.aws.confluent.cloud/subjects

Kafka Client configuration

In order to use the Schema Registry with the Kafka client, you need to set the schema.registry.url property in the client configuration. We also need (for now) a custom implementation of the credential source. This is because the Schema Registry client does not support the Managed Identity out of the box, and the SASL_OAUTHBEARER_INHERIT credential source does not seem to support the sasl.login.callback.handler.class property.

The implementation can be found here https://github.com/spike83/confluent-oauth-extensions/tree/aks-workload-id-support

echo '{"make": "Ford", "model": "Mustang", "price": 10000}' |kafka-avro-console-producer --bootstrap-server pkc-12576z.us-west2.gcp.confluent.cloud:9092 \
--property schema.registry.url=https://psrc-1wydj.us-east-2.aws.confluent.cloud \
--property bearer.auth.credentials.source='CUSTOM' \
--property bearer.auth.custom.provider.class=io.confluent.oauth.azure.managedidentity.RegistryBearerAuthCredentialProvider \
--property bearer.auth.logical.cluster='lsrc-kjn9nv' \
--producer.config client.properties --reader-config client.properties --topic cars \
--property value.schema='{"type": "record", "name": "Car", "namespace": "io.spoud.training", "fields": [{"name": "make", "type": "string"}, {"name": "model", "type": "string"}, {"name": "price", "type": "int", "default": 0}]}'

This will register the schema in the Schema Registry and produce an Avro message to the topic cars.

kafka-avro-console-consumer --bootstrap-server pkc-12576z.us-west2.gcp.confluent.cloud:9092 \
--property schema.registry.url=https://psrc-1wydj.us-east-2.aws.confluent.cloud \
--property bearer.auth.credentials.source='CUSTOM' \
--property bearer.auth.custom.provider.class=io.confluent.oauth.azure.managedidentity.RegistryBearerAuthCredentialProvider \
--property bearer.auth.logical.cluster='lsrc-kjn9nv' \
--consumer.config client.properties --formatter-config client.properties --topic cars \
--from-beginning --group test1

This will consume the messages from the topic cars.

Use Confluent Cloud API

In order to use the Confluent Cloud API with the Managed Identity, you need to get a token from the Confluent Cloud STS API. To do so, call the following endpoint with these parameters:

curl --request POST --url https://api.confluent.cloud/sts/v1/oauth2/token \
--header 'content-type: application/x-www-form-urlencoded' \
--data grant_type=urn:ietf:params:oauth:grant-type:token-exchange \
--data subject_token=$(
curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f' -H Metadata:true -s | jq -r ".access_token"
) \
--data identity_pool_id=pool-7yzk \
--data subject_token_type=urn:ietf:params:oauth:token-type:jwt \
--data requested_token_type=urn:ietf:params:oauth:token-type:access_token \
| jq -r '.access_token | split(".") | .[0],.[1] | @base64d | fromjson'

Here we get the following token back:

{
"alg": "RS256",
"jku": "https://auth-static.confluent.io/jwks",
"kid": "3597f47b-b718-11ee-9855-d6392fd35e8b",
"typ": "JWT"
}
{
"organizationId": <OrgId>,
"orgResourceId": "<OrgResourceId>",
"userId": 0,
"userResourceId": "pool-7yzk",
"externalIdentityId": "crn://confluent.cloud/organization=<OrgResourceId>/identity-provider=op-Nz4/identity=5c409794-7d52-4a0a-bf67-e7f2cdd4ed43",
"scope": "https://api.confluent.cloud/",
"aud": "CONTROL_PLANE",
"exp": 1706105689,
"jti": "3c3bc113-b34a-4a26-98b0-60f7771a7fa5",
"iat": 1706104789,
"iss": "Confluent",
"sub": "pool-7yzk",
"may_act": {
"principals": [
"pool-7yzk"
]
}
}
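
The token exchange request above can also be assembled in Python. This sketch only builds the form-encoded body with the same fields as the curl call and does not send it. The function name token_exchange_body is hypothetical, and the subject token is a placeholder for the Azure-issued JWT.

```python
from urllib.parse import parse_qs, urlencode

STS_TOKEN_URL = "https://api.confluent.cloud/sts/v1/oauth2/token"


def token_exchange_body(subject_token: str, identity_pool_id: str) -> str:
    """Build the application/x-www-form-urlencoded body for the exchange."""
    return urlencode({
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "subject_token": subject_token,          # Azure JWT of the identity
        "identity_pool_id": identity_pool_id,    # pool to act as
        "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
    })


body = token_exchange_body("<azure access token>", "pool-7yzk")
# Round-trip the body to show the fields that will be posted.
for name, values in parse_qs(body).items():
    print(name, "=", values[0])
```

The body would be sent as a POST to STS_TOKEN_URL with the content-type header application/x-www-form-urlencoded, just like the curl version.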

We then use this token to call the Confluent Cloud API, as in this example:

token=$(
curl --request POST --url https://api.confluent.cloud/sts/v1/oauth2/token \
--header 'content-type: application/x-www-form-urlencoded' \
--data grant_type=urn:ietf:params:oauth:grant-type:token-exchange \
--data subject_token=$(
curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5e20030f-3a3c-4aea-83bc-e6aa925a148f' -H Metadata:true -s | jq -r ".access_token"
) \
--data identity_pool_id=pool-7yzk \
--data subject_token_type=urn:ietf:params:oauth:token-type:jwt \
--data requested_token_type=urn:ietf:params:oauth:token-type:access_token \
| jq -r '.access_token'
)

curl -X GET 'https://api.confluent.cloud/org/v2/environments' \
--header "Authorization: Bearer ${token}"

With the Workload Identity on Kubernetes, it looks like this:

IDENTITY_TOKEN=$(cat /var/run/secrets/tokens/kubernetes-federation-token)
azure_token=$(curl -v -s --location --request GET "$AZURE_AUTHORITY_HOST/$AZURE_TENANT_ID/oauth2/v2.0/token" \
--form 'grant_type="client_credentials"' \
--form 'client_id="'$AZURE_CLIENT_ID'"' \
--form 'scope="'$CONFLUENT_CLOUD_APP_ID'/.default"' \
--form 'client_assertion_type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer"' \
--form 'client_assertion="'$IDENTITY_TOKEN'"' | \
jq -r '.access_token'
)
token=$(
curl --request POST --url https://api.confluent.cloud/sts/v1/oauth2/token \
--header 'content-type: application/x-www-form-urlencoded' \
--data grant_type=urn:ietf:params:oauth:grant-type:token-exchange \
--data subject_token=${azure_token} \
--data identity_pool_id=pool-7yzk \
--data subject_token_type=urn:ietf:params:oauth:token-type:jwt \
--data requested_token_type=urn:ietf:params:oauth:token-type:access_token \
| jq -r '.access_token'
)
curl -X GET 'https://api.confluent.cloud/org/v2/environments' \
--header "Authorization: Bearer ${token}"

Audit log

If you look into the audit log of your Confluent Cloud organization you can see the following entries:

For the User Assigned Managed Identity:

{
"datacontenttype": "application/json",
"data": {
"serviceName": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"methodName": "kafka.Authentication",
"resourceName": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"authenticationInfo": {
"principal": "User:pool-7yzk",
"metadata": {
"mechanism": "SASL_SSL/OAUTHBEARER",
"identifier": "61965619-0270-4683-bc43-554e4872bae6"
},
"principalResourceId": "pool-7yzk",
"identity": "crn://confluent.cloud/organization=<OrgResourceId>/identity-provider=op-Nz4/identity=8c01c49c-5cc8-470a-831c-2e017be4cc97"
},
"requestMetadata": {
"connection_id": "170593838857300087"
},
"result": {
"status": "SUCCESS"
},
"clientAddress": [
{
"ip": "IP"
}
]
},
"subject": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"specversion": "1.0",
"id": "c860c844-ca8d-4282-aeb0-5896338e8a15",
"source": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"time": "2024-01-22T15:46:30.468632921Z",
"type": "io.confluent.kafka.server/authentication"
}

Here, the identifier is the object ID of the Managed Identity in Azure Entra ID.

For the System Assigned Managed Identity:

{
"datacontenttype": "application/json",
"data": {
"serviceName": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"methodName": "kafka.Authentication",
"resourceName": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"authenticationInfo": {
"principal": "User:pool-7yzk",
"metadata": {
"mechanism": "SASL_SSL/OAUTHBEARER",
"identifier": "5c409794-7d52-4a0a-bf67-e7f2cdd4ed43"
},
"principalResourceId": "pool-7yzk",
"identity": "crn://confluent.cloud/organization=<OrgResourceId>/identity-provider=op-Nz4/identity=5c409794-7d52-4a0a-bf67-e7f2cdd4ed43"
},
"requestMetadata": {
"connection_id": "170593818112000023"
},
"result": {
"status": "SUCCESS"
},
"clientAddress": [
{
"ip": "IP"
}
]
},
"subject": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"specversion": "1.0",
"id": "208a40c0-a5b4-4b1d-ab0a-d214aa95e890",
"source": "crn://confluent.cloud/organization=<OrgResourceId>/environment=env-0jzjnp/cloud-cluster=lkc-w25n5g/kafka=lkc-w25n5g",
"time": "2024-01-22T15:43:02.196118060Z",
"type": "io.confluent.kafka.server/authentication"
}

Here, the identifier is the object ID of the virtual machine in Azure Entra ID.

Refreshing the credentials

Since we deal with short-lived credentials (the tokens in the logs below are valid for about 24 hours), the credentials need to be refreshed. The Kafka client does this for us automatically. We can observe the refresh in the Kafka client logs:

[2024-04-20 09:48:51,731] INFO Initiating re-login for 5012af54-d7eb-4729-8771-c0cdc188189c, logout() still needs to be called on a previous login = true (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-20 09:48:51,943] INFO [Principal=5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential valid from Sat Apr 20 09:43:51 UTC 2024 to Sun Apr 21 09:48:51 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-20 09:48:51,943] INFO [Principal=:5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential re-login sleeping until: Sun Apr 21 05:35:09 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-21 05:35:09,134] INFO Initiating re-login for 5012af54-d7eb-4729-8771-c0cdc188189c, logout() still needs to be called on a previous login = true (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-21 05:35:09,417] INFO [Principal=5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential valid from Sun Apr 21 05:30:09 UTC 2024 to Mon Apr 22 05:35:09 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-21 05:35:09,417] INFO [Principal=:5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential re-login sleeping until: Mon Apr 22 01:45:08 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-22 01:45:08,895] INFO Initiating re-login for 5012af54-d7eb-4729-8771-c0cdc188189c, logout() still needs to be called on a previous login = true (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-22 01:45:09,126] INFO [Principal=5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential valid from Mon Apr 22 01:40:08 UTC 2024 to Tue Apr 23 01:45:08 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2024-04-22 01:45:09,126] INFO [Principal=:5012af54-d7eb-4729-8771-c0cdc188189c]: Expiring credential re-login sleeping until: Mon Apr 22 21:39:44 UTC 2024 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)

Where 5012af54-d7eb-4729-8771-c0cdc188189c is the id of the Managed Identity.
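
The re-login times in these logs fit the Kafka client's default refresh settings: sasl.login.refresh.window.factor defaults to 0.8 and sasl.login.refresh.window.jitter to 0.05, so a refresh is scheduled somewhere between 80% and 85% of the remaining credential lifetime. The following Python sketch is a simplified model of that window (it ignores the client's minimum-period and buffer settings) and checks the first re-login from the log against it. The function name refresh_window is hypothetical.

```python
from datetime import datetime


def refresh_window(login_time: datetime, expiry: datetime,
                   factor: float = 0.8, jitter: float = 0.05) -> tuple:
    """Return the (earliest, latest) expected re-login time.

    Simplified model: the refresh happens after a random fraction
    between `factor` and `factor + jitter` of the remaining lifetime.
    """
    lifetime = expiry - login_time
    earliest = login_time + lifetime * factor
    latest = login_time + lifetime * (factor + jitter)
    return earliest, latest


# Values from the first login in the log above.
login = datetime(2024, 4, 20, 9, 48, 51)
expiry = datetime(2024, 4, 21, 9, 48, 51)
observed_relogin = datetime(2024, 4, 21, 5, 35, 9)

earliest, latest = refresh_window(login, expiry)
print("expected window:", earliest, "to", latest)
print("observed re-login inside window:", earliest <= observed_relogin <= latest)
```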

Questions

Q: Can we create an identity pool for each application if we have, let’s say, 2000 applications in an organization?

A: The default limit on Identity Pools per Identity Provider is low (100, as noted above), but it is adjustable. You would need to contact Confluent Support to increase it.

Q: Can developers with their personal Azure accounts use the Identity Pool?

A: For this to work, the developer needs a way to obtain a token for the App Registration. The Azure CLI can do this once it has been authorized on the App Registration by adding its client ID, 04b07795-8ddb-461a-bbee-02f9e1bf7b46. The developer can then use the Azure CLI to get a token for the App Registration and use that token to get a token for the Identity Pool. The developer must be able to reach the Confluent Cloud cluster and log in with the Azure CLI from the same machine, which may require additional setup depending on your network. A PoC example is available here: https://github.com/spike83/confluent-oauth-extensions/tree/poc-user-credential
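As a rough illustration of the first step only, the following sketch asks the Azure CLI for an access token through `az account get-access-token`. It assumes the developer has already run `az login` and that the App Registration exposes an Application ID URI; the function names and the `api://...` value used with them are hypothetical placeholders, not values from this setup. It is not the approach used in the linked PoC.

```python
import subprocess


def build_token_command(resource):
    """Build the Azure CLI call that prints only the raw access token."""
    return [
        "az", "account", "get-access-token",
        "--resource", resource,    # Application ID URI of the App Registration (assumed)
        "--query", "accessToken",  # select only the token field from the JSON result
        "--output", "tsv",         # plain text, no quotes
    ]


def fetch_token(resource):
    """Run the Azure CLI and return the token; raises if the CLI call fails."""
    result = subprocess.run(
        build_token_command(resource),
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

Calling `fetch_token("api://<app-registration-client-id>")` would return the token string if the Azure CLI is authorized for the App Registration as described above. How that token is then handed to the Kafka client depends on the login callback handler in use.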

Conclusion

Integrating Azure Managed Identities with Confluent Cloud Workload Identities removes static credentials from the path between Azure workloads and Confluent Cloud. This guide has walked you through configuring the Azure components and aligning them with Confluent Cloud, setting a foundation for secure, streamlined cloud operations.

The integration of Azure Managed Identities with Confluent Cloud Workload Identities represents more than just a technical configuration; it signifies a strategic approach to cloud security and identity management. By leveraging Azure’s robust identity capabilities alongside Confluent Cloud’s event streaming services, organizations can achieve a higher level of security and efficiency.

Key Takeaways:

  • The seamless integration enhances security, eliminating the need for manual credential management.
  • It simplifies operational complexities, making cloud management more efficient.
  • The guide provides a practical roadmap for IT professionals and developers for implementing this integration in their cloud architectures.

Looking forward, the landscape of cloud computing continues to evolve rapidly. Staying abreast of such integrations is crucial for maintaining secure, efficient, and modern cloud environments. This guide serves as a stepping stone for further experimentation and exploration in the vast domain of cloud technologies.

We encourage you to continue exploring the capabilities of Azure Managed Identities and Confluent Cloud, pushing the boundaries of what can be achieved in your cloud solutions. For more in-depth information, ongoing updates, and advanced techniques, refer to the Azure Documentation and Confluent Cloud Documentation.

Thank you for embarking on this journey of discovery and integration with us. We hope this guide has empowered you with the knowledge and confidence to enhance your cloud solutions, and we look forward to seeing how you apply these learnings in your cloud endeavors.

References

https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/overview.html

https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/configure-clients-oauth.html

https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/access-rest-apis.html

https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/best-practices.html

https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/access-rest-apis.html#use-the-ak-rest-api-with-your-identity-provider

https://docs.confluent.io/cloud/current/api.html#section/Authentication/External-OAuth

https://www.adamkielar.pl/posts/how-to-use-an-azure-ad-workload-identity-on-azure-kubernetes-service/

https://github.com/confluentinc/confluent-oauth-extensions

https://github.com/nniikkoollaaii/workload-identity-kafka-sasl-oauthbearer

https://github.com/spike83/confluent-oauth-extensions/tree/aks-workload-id-support
