Asynchronous messaging between beans

Quarkus allows different beans to interact using asynchronous messages, which promotes loose coupling. Messages are sent to virtual addresses. Quarkus offers three delivery mechanisms:

  • Point-to-Point - send the message, one consumer receives it. If several consumers listen to the address, a round robin is applied;

  • Publish/Subscribe - publish a message, all the consumers listening to the address receive it;

  • Request/Reply - send the message and expect a response. The receiver can respond to the message asynchronously.

All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks of reactive systems, which promise better performance, reduced developer burden, better isolation between services, and improved recovery from failure.

The asynchronous message passing feature in Quarkus allows replying to messages, which is not supported by Reactive Messaging. However, it is limited to single-event behavior (no stream) and to local messages.
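To make the difference between point-to-point and publish/subscribe delivery concrete, here is a minimal plain-Java sketch. It is a toy model only: ToyBus and its methods are hypothetical names invented for illustration, and this is not how the Vert.x event bus is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy, in-memory model of address-based delivery. All names are hypothetical;
// this is NOT the Vert.x event bus, only an illustration of the semantics.
class ToyBus {
    private final Map<String, List<Consumer<String>>> consumers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    // Register a consumer on a virtual address.
    void consumer(String address, Consumer<String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Point-to-point: exactly one consumer gets the message, chosen round-robin.
    void send(String address, String message) {
        List<Consumer<String>> list = consumers.getOrDefault(address, List.of());
        if (list.isEmpty()) {
            return; // nobody is listening; this toy model simply drops the message
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        list.get(index % list.size()).accept(message);
    }

    // Publish/subscribe: every consumer on the address gets the message.
    void publish(String address, String message) {
        for (Consumer<String> handler : consumers.getOrDefault(address, List.of())) {
            handler.accept(message);
        }
    }

    // Small demo returning a log of which consumer received which message.
    static List<String> demo() {
        ToyBus bus = new ToyBus();
        List<String> log = new ArrayList<>();
        bus.consumer("greetings", m -> log.add("A got " + m));
        bus.consumer("greetings", m -> log.add("B got " + m));
        bus.send("greetings", "one");    // delivered to A only
        bus.send("greetings", "two");    // delivered to B only
        bus.publish("greetings", "all"); // delivered to both A and B
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With two consumers on the same address, the two send calls alternate between them, while the single publish call reaches both.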

Add extension

This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature. Add the extension in the Terminal using this command:

mvn quarkus:add-extension -Dextensions="vertx"

You should see:

[INFO] [SUCCESS] ✅  Extension io.quarkus:quarkus-vertx has been installed

Eclipse Vert.x is a toolkit for building reactive applications. It is designed to be lightweight and embeddable. Vert.x defines a reactive execution model and provides a large ecosystem. Quarkus integrates Vert.x to implement different reactive features, such as asynchronous message passing (the subject of this exercise) and a non-blocking HTTP client. Basically, Quarkus uses Vert.x as its reactive engine. Many reactive features in Quarkus do not expose Vert.x directly, but it is used underneath. You can also access the managed Vert.x instance and benefit from the Vert.x ecosystem.

Create RESTful resource

We’ll start by creating a new asynchronous endpoint. Open the PersonResource class and add a new field which will provide access to the Vert.x event bus which is used to pass messages between components:

    @Inject EventBus bus;

You’ll also need to add more import statements at the top:

import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.inject.Inject;

Next, create two new endpoints in the same class: one creates a new person in our database given a name, and the other finds a person by name:

    @POST
    @Path("/{name}")
    public Uni<Person> addPerson(String name) {
        return bus.<Person>request("add-person", name) (1)
                .onItem().transform(response -> response.body()); (2)
    }

    @GET
    @Path("/name/{name}")
    public Person byName(String name) {
        return Person.find("name", name).firstResult();
    }
1 Send the name to the add-person address
2 When we get the reply, extract the body and send it as the response to the user

And add the imports:

import io.smallrye.mutiny.Uni;
import jakarta.ws.rs.POST;

This code uses Mutiny reactive types. If you are not familiar with Mutiny, check out Mutiny - an intuitive reactive programming library.

This uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the JAX-RS endpoint, we are sending a message. This message is consumed by another bean and the response is sent using the reply mechanism.

The EventBus object provides methods to:

  • Send a message to a specific address - one single consumer receives the message

  • Publish a message to a specific address - all consumers receive the message

  • Send a message and expect a reply

With this endpoint we can POST to the /person/joe endpoint to create a new person given the name.

Try it, and fail

With our endpoint implemented and our app still running in Live Coding mode, confirm the new endpoint fails using the Terminal to execute:

curl -i -X POST http://localhost:8080/person/joe

This will fail with a 500 Internal Server Error. If you look at the Live Coding terminal, you’ll also see the reason:

ERROR [org.jbo.res.res.i18n] (executor-thread-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: (NO_HANDLERS,-1) No handlers for address add-person

We posted the message to the Vert.x event bus at the add-person address, but there’s nothing to receive it!

Create consumer

Create a new class file in the org.acme.people.service package called PersonService.java. Use the following code to implement our message consumer:

package org.acme.people.service;

import java.time.LocalDate;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.acme.people.model.EyeColor;
import org.acme.people.model.Person;

import io.quarkus.vertx.ConsumeEvent;

@ApplicationScoped
public class PersonService {

    @ConsumeEvent(value = "add-person", blocking = true) (1)
    @Transactional
    public Person addPerson(String name) {
        LocalDate birth = LocalDate.now().plusWeeks(Math.round(Math.floor(Math.random() * 20 * 52 * -1)));
        EyeColor color = EyeColor.values()[(int)(Math.floor(Math.random() * EyeColor.values().length))];
        Person p = new Person();
        p.birth = birth;
        p.eyes = color;
        p.name = name;
        Person.persist(p); (2)
        return p; (3)
    }

}
1 By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop. Since our method will block to wait for the transaction, we use blocking = true to force this consumer to be run in a worker thread.
2 A new Person entity is created and persisted
3 The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message.
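The idea behind blocking = true can be pictured with a small plain-Java sketch: one thread stands in for the event loop, a small pool stands in for the worker threads, and a flag decides where a handler runs. ToyDispatcher, its methods, and the thread names are hypothetical and only illustrate the principle; this is not how Quarkus or Vert.x dispatch events internally.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of "event loop vs. worker thread" dispatch. All names are
// hypothetical; this is NOT the Quarkus or Vert.x implementation.
class ToyDispatcher {
    // A single thread standing in for the event loop: handlers here must not block.
    private final ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-event-loop"));
    // A small pool standing in for worker threads, where blocking is acceptable.
    private final ExecutorService workers =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "toy-worker"));

    // Run the handler on the worker pool when blocking is true, else on the event loop.
    // The handler here just reports the name of the thread it ran on.
    Future<String> dispatch(boolean blocking) {
        ExecutorService target = blocking ? workers : eventLoop;
        return target.submit(() -> Thread.currentThread().getName());
    }

    void shutdown() {
        eventLoop.shutdown();
        workers.shutdown();
    }

    // Convenience helper: dispatch once and return the name of the executing thread.
    static String threadFor(boolean blocking) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        try {
            return dispatcher.dispatch(blocking).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking=false -> " + threadFor(false));
        System.out.println("blocking=true  -> " + threadFor(true));
    }
}
```

In this model, a handler that waits on a database transaction would stall every other event if it ran on the single event-loop thread, which is why such handlers are moved to the worker pool.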

This bean receives the name, creates and persists a new Person entity, and then returns that Person as the reply (or a well-defined failure if things go wrong).

Let’s try our test again:

curl -s -X POST http://localhost:8080/person/joe  | jq

You should get back Joe!

{
  "id": 1004,(1)
  "birth": "2000-03-15",
  "eyes": "BROWN",(2)
  "name": "joe"
}
1 The id may be different since it’s auto-generated
2 The eye color you see here may be different, since it’s randomly generated in the addPerson() method you added!

Now let’s re-confirm Joe is present:

curl -s http://localhost:8080/person/name/joe | jq

You should also get back Joe!

{
  "id": 1004,
  "birth": "2000-03-15",
  "eyes": "BROWN",(1)
  "name": "joe"
}
1 The eye color you see here may be different, since it’s randomly generated in the addPerson() method you added!

To better understand, let’s detail how the HTTP request/response has been handled:

  1. The request is received by the addPerson method

  2. A message containing the desired name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the reply is received by the sender, the content is written to the HTTP response
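The five steps above can be modelled in a few lines of plain Java, using CompletableFuture in place of the event bus reply. This is a simplified sketch under stated assumptions: ToyRequestReply, handleHttpPost, and the "200"/"500" strings are hypothetical stand-ins, and in the exercise the real work is done by the Vert.x event bus and Mutiny's Uni.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model of request/reply over named addresses. All names are hypothetical;
// in the exercise this role is played by the Vert.x event bus.
class ToyRequestReply {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Steps 3 and 4: a consumer computes the reply for messages on its address.
    void consumer(String address, Function<String, String> handler) {
        handlers.put(address, handler);
    }

    // Step 2: send a message and get a future that completes with the reply.
    CompletableFuture<String> request(String address, String body) {
        Function<String, String> handler = handlers.get(address);
        if (handler == null) {
            // Models the "No handlers for address" failure seen earlier in the exercise.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("No handlers for address " + address));
        }
        return CompletableFuture.supplyAsync(() -> handler.apply(body));
    }

    // Steps 1 and 5: the "endpoint" sends the request and turns the reply
    // (or the failure) into a response string.
    static String handleHttpPost(ToyRequestReply bus, String name) {
        return bus.request("add-person", name)
                .handle((reply, failure) -> failure == null
                        ? "200 " + reply
                        : "500 " + failure.getMessage())
                .join();
    }

    public static void main(String[] args) {
        ToyRequestReply bus = new ToyRequestReply();
        System.out.println(handleHttpPost(bus, "joe")); // no consumer registered yet
        bus.consumer("add-person", name -> "created " + name);
        System.out.println(handleHttpPost(bus, "joe")); // consumer replies
    }
}
```

The first call fails because nothing listens on the address, which matches the earlier failed attempt; once a consumer is registered, the same call completes with its reply.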

Congratulations!

In this exercise you learned how Quarkus allows different beans to interact using asynchronous messages. We’ll take this to the next level in the next exercise.