# Reactive Streams with Quarkus and Kafka

In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Messaging and Apache Kafka, a distributed streaming platform. You will also use Strimzi, which provides an easy way to run an Apache Kafka cluster on Kubernetes and OpenShift. Strimzi is an open source project that is also the upstream of Red Hat's supported Kafka distribution, Streams for Apache Kafka. In this exercise we will use Strimzi in a single-node Kafka cluster setup.

## What is Apache Kafka?

Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:

- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant, durable way.
- Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data

## What is Streams for Apache Kafka?

Streams for Apache Kafka simplifies the process of running Apache Kafka in an OpenShift cluster. It provides container images and Operators for running Kafka on OpenShift. The Streams for Apache Kafka Operators are fundamental to running the Kafka cluster; they are purpose-built with specialist operational knowledge to effectively manage Kafka. The Operators simplify the process of:

- Deploying and running Kafka clusters
- Deploying and running Kafka components
- Configuring access to Kafka
- Securing access to Kafka
- Upgrading Kafka
- Managing brokers
- Creating and managing topics
- Creating and managing users

## The Goal

As mentioned earlier, in this exercise we will not use the Operator; instead we will install Strimzi as a single-node Kafka cluster. We are going to generate (random) names in one component. These names are written to a Kafka topic (names). A second component reads from the names Kafka topic and applies some magic conversion to each name (adding an honorific). The result is sent to an in-memory stream consumed by a JAX-RS resource. The data is sent to the browser using server-sent events and displayed there.
It will look like this:

## Create Kafka Cluster

In the kafka.yaml file in the src/main/kubernetes folder, paste the below:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: names-cluster-kafka-bootstrap
  labels:
    app: names-cluster
    strimzi.io/cluster: names-cluster
spec:
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: names-cluster
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: names-cluster # (1)
  labels:
    app.openshift.io/runtime: amq
    strimzi.io/cluster: names-cluster
    app.kubernetes.io/part-of: strimzi-names-cluster
spec:
  selector:
    matchLabels:
      app: names-cluster
  template:
    metadata:
      labels:
        app: names-cluster
    spec:
      containers:
        - name: names-cluster
          image: quay.io/strimzi/kafka:latest-kafka-3.9.0 # (2)
          command:
            - /bin/sh
            - -c
            - 'export CLUSTER_ID=$(./bin/kafka-storage.sh random-uuid) && ./bin/kafka-storage.sh format -t $CLUSTER_ID -c ./config/kraft/server.properties && ./bin/kafka-server-start.sh ./config/kraft/server.properties --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS}'
          env:
            - name: LOG_DIR
              value: /tmp/logs
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://names-cluster-kafka-bootstrap:9092
          resources:
            limits:
              memory: "1024Mi"
              cpu: "1000m"
          ports:
            - containerPort: 9092
```

1. The name of our Kafka cluster
2. The Strimzi image used for our Kafka cluster

Then run the following command in the terminal in Dev Spaces:

```bash
oc apply -f src/main/kubernetes/kafka.yaml
```

This installs the Kafka cluster in your namespace. You will be able to see it in the Topology view and follow the creation of the cluster. Make sure the Kafka instance pod is up and running (with a dark blue circle):

## Kafka Topics

Usually you would also need to create the Kafka topics your application uses. To keep this exercise simple, we will rely on Kafka's ability to auto-create topics that do not yet exist. This is acceptable in a local or test setup, but not something we recommend for production. Here, Kafka will create the topic our application streams to and from, called names.

If you want to check the creation of your topic (or create other topics) after running the test later in this exercise, you can do so with the commands below. First run:

```bash
oc get pods | grep names-cluster
```

This gives a response similar to:

```
names-cluster-8578688d8-jcml8   1/1     Running   0          34m
```

Copy the identifier after names-cluster- (in this case 8578688d8-jcml8) and run the following command, substituting your unique identifier for xxxxxxxxx-xxxxx:

```bash
oc rsh names-cluster-xxxxxxxxx-xxxxx
```

You can verify the topic is created by listing the topics with this command:

```bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

This lists the topics in the terminal. If you want to create another topic, the command below will do that; change the name at the end of the command:

```bash
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic someothername
```

## Add Quarkus Kafka Extension

With Kafka installed, turn your attention back to the app. Like other exercises, we'll need another extension to integrate with Kafka. Install it with:

```bash
mvn quarkus:add-extension -Dextensions="messaging-kafka"
```

You should see:

```
[INFO] [SUCCESS] ✅ Extension io.quarkus:quarkus-smallrye-reactive-messaging-kafka has been installed
```

This will add the necessary entries in your pom.xml to bring in the Kafka extension.
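For reference, the entry the extension adds to your pom.xml looks roughly like the sketch below. The exact shape may differ slightly in your project; the version is typically managed by the Quarkus BOM, so no explicit version element is needed:

```xml
<!-- Added by quarkus:add-extension; the version is managed by the Quarkus BOM -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
```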
## The Application You Will Build

The app consists of three components that pass messages via Kafka and an in-memory stream, then uses SSE to push messages to the browser. It looks like:

### Create name generator

To start building the app, create a new Java class file in the org.acme.people.stream package called NameGenerator.java. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:

```java
package org.acme.people.stream;

import jakarta.enterprise.context.ApplicationScoped;
import org.acme.people.utils.CuteNameGenerator;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.mutiny.Multi;
import java.time.Duration;

@ApplicationScoped
public class NameGenerator {

    @Outgoing("generated-name")       // (1)
    public Multi<String> generate() { // (2)
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> CuteNameGenerator.generate());
    }
}
```

1. Instructs Reactive Messaging to dispatch the items from the returned stream to generated-name
2. The method returns a Reactive Stream emitting a random name every 5 seconds

The method returns a Reactive Stream. The generated items are sent to the stream named generated-name. This stream is mapped to Kafka using the application.properties file that we will create soon.

### Add honorifics

The name converter reads the names from Kafka and transforms them, adding a random (English) honorific to the beginning of the name. Create a new Java class file in the same package called NameConverter.java. Use this code:

```java
package org.acme.people.stream;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class NameConverter {

    private static final String[] honorifics = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady",
            "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    @Incoming("names")          // (1)
    @Outgoing("my-data-stream") // (2)
    @Broadcast                  // (3)
    public String process(String name) {
        String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }
}
```

1. Indicates that the method consumes the items from the names topic
2. Indicates that the objects returned by the method are sent to the my-data-stream stream
3. Indicates that the items are dispatched to all subscribers

The process method is called for every Kafka record from the names topic (configured in the application configuration). Every result is sent to the my-data-stream in-memory stream.
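Because process() is a plain method with no messaging machinery in its body, you can sanity-check the conversion logic without Kafka or CDI at all. A minimal sketch, assuming the standard quarkus-junit5 test setup already in the project (the test class name and assertions are illustrative):

```java
package org.acme.people.stream;

import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;

class NameConverterTest {

    @Test
    void processPrependsAnHonorific() {
        // Plain instantiation is enough here; no container is required.
        NameConverter converter = new NameConverter();
        String result = converter.process("Winters");

        // The converted name should still end with the original name...
        assertTrue(result.endsWith(" Winters"));
        // ...and be longer than the input, since an honorific was prepended.
        assertTrue(result.length() > "Winters".length());
    }
}
```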
### Expose to front end

Finally, let's bind our stream to a JAX-RS resource. Create a new Java class in the same package called NameResource.java. Use this code:

```java
package org.acme.people.stream;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.reactivestreams.Publisher;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/names")
public class NameResource {

    @Inject
    @Channel("my-data-stream")
    Publisher<String> names;                // (1)

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // (2)
    public Publisher<String> stream() {     // (3)
        return names;
    }
}
```

1. Injects the my-data-stream stream using the @Channel qualifier
2. Indicates that the content is sent using Server-Sent Events
3. Returns the stream (Reactive Stream)

There is a pre-created names.html page for you to use (in the src/main/resources/META-INF/resources directory) which will make a request to this /names/stream endpoint using standard JavaScript running in the browser and draw the resulting names using the D3.js library. The JavaScript that makes this call looks like this (do not copy this into anything!):

```javascript
var source = new EventSource("/names/stream"); // (1)
source.onmessage = function (event) {          // (2)
    console.log("received new name: " + event.data);
    // process new name in event.data
    // ...
    // update the display with the new name
    update();                                  // (3)
};
```

1. Uses your browser's support for the EventSource API (part of the W3C SSE standard) to call the endpoint
2. Each time a message is received via SSE, react to it by running this function
3. Refresh the display using the D3.js library

## Configure application

We need to configure the Kafka connector. This is done in the application.properties file (in the src/main/resources directory). The keys are structured as follows:

```
mp.messaging.[outgoing|incoming].{channel-name}.property=value
```

The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:

- generated-name → sink to which we write the names
- names → source from which we read the names

Add the following values to the app's src/main/resources/application.properties:

```properties
# Configure the Kafka sink (we write to it)
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
%prod.mp.messaging.outgoing.generated-name.topic=names
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Configure the Kafka source (we read from it)
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

%prod.quarkus.openshift.route.tls.insecure-edge-termination-policy=None
%prod.quarkus.openshift.route.tls.termination=edge
```

The bootstrap.servers hostname used here will only make sense (be resolvable via DNS) when this app runs in the same Kubernetes namespace as the Kafka cluster you created earlier. That is why these config values are prefixed with %prod: they apply only in production mode, so the app will not try to initialize Kafka when running in dev mode.

More details about this configuration are available in the Producer configuration and Consumer configuration sections of the Kafka documentation.

What about my-data-stream? It is an in-memory stream, not connected to a message broker.
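For contrast, if you ever did want my-data-stream to flow through Kafka rather than stay in memory, you would give that channel its own connector configuration, just like the two channels above. A purely hypothetical sketch (the topic name my-data-stream-topic is made up for illustration; we do not do this in this exercise):

```properties
# Hypothetical: route my-data-stream through Kafka instead of keeping it in memory.
# With no connector configured (as in this exercise), SmallRye wires the channel in-memory.
%prod.mp.messaging.outgoing.my-data-stream.connector=smallrye-kafka
%prod.mp.messaging.outgoing.my-data-stream.bootstrap.servers=names-cluster-kafka-bootstrap:9092
%prod.mp.messaging.outgoing.my-data-stream.topic=my-data-stream-topic
%prod.mp.messaging.outgoing.my-data-stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

Note that the @Channel injection in NameResource would then also need a matching incoming configuration to read the data back from Kafka; leaving the channel unconfigured keeps the simpler in-memory wiring this exercise relies on.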
## Rebuild and redeploy app to OpenShift

Run the following command, which will build and deploy using the OpenShift extension:

```bash
mvn clean package -DskipTests && \
oc label deployment/people app.kubernetes.io/part-of=people --overwrite && \
oc label dc/postgres-database app.kubernetes.io/part-of=people --overwrite && \
oc annotate deployment/people app.openshift.io/connects-to=postgres-database --overwrite && \
oc rollout status -w deployment/people
```

The build should take a minute or two to complete.

## Test

Our application should be up and running a few seconds after the build completes, generating names. To see if it's working, access the names page. The URL can be found by running this command:

```bash
oc get route people -o=go-template --template='https://{{ .spec.host }}/names.html {{printf "\n"}}'
```

Open a new browser tab and paste the result to view the graphical name cloud powered by Quarkus, MicroProfile, and Kafka. The URL should look something like this:

```
https://people-user-dev.apps.sandbox-m2.ll9k.p1.openshiftapps.com/names.html
```

You should see a cloud of names updating every 5 seconds (it may take a few seconds for it to start!):

It takes a few seconds to establish the connection to Kafka. If you don't see new names generated every 5 seconds, reload the browser page to re-initialize the SSE stream.

These are the original names streamed through Kafka, altered to add a random honorific like Sir or Madam, and displayed in a word cloud for you to enjoy!

## Congratulations!

This exercise has shown how you can interact with Kafka using Quarkus, using MicroProfile Reactive Messaging to build data streaming applications. If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.
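As a final check, you can also consume the SSE stream directly from the terminal instead of the browser. A quick sketch using curl (replace the host with the route host printed by the oc get route command above; -N disables output buffering so events appear as they arrive):

```bash
# Stream server-sent events from the deployed app; press Ctrl+C to stop.
# The host below is an example; substitute your own route host.
curl -N https://people-user-dev.apps.sandbox-m2.ll9k.p1.openshiftapps.com/names/stream
```

You should see a new honorific-prefixed name printed roughly every 5 seconds, confirming the whole pipeline (generator → Kafka → converter → SSE) end to end.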