Spring Reactor Tutorial

Overview

In this article, we'll get introduced to the Spring Reactor project and its importance. The idea is to take advantage of the Reactive Streams Specification to build non-blocking reactive applications on the JVM.

Using this knowledge, we'll build a simple reactive application and compare it to a traditional blocking application.

Reactive applications are the "hot new thing" making a lot of applications switch to this model. You can read more about this in The Reactive Manifesto.

Motivation

Conventional APIs are Blocking

Modern applications deal with a high number of concurrent users and data. Moore's law is no longer holding as it used to. The hardware capabilities, although increasing, are not keeping up with modern applications where performance is very important.

Java developers by default write blocking code. It's just how the API was set up. Another example would be the traditional servlet (Tomcat) approach. Each request warrants a new thread which waits for the whole background process to finish in order to send the response back.

This means that our data layer logic is blocking the application by default since Threads idly wait for a response. It's wasteful to not reuse these Threads for a different purpose, while we wait for the response to get back.

Reactor motivationCredit: http://projectreactor.io/learn

Note: This might be a problem if we have limited resources or a process takes too much time to execute.

Asynchronous still Blocks

In Java, you can write code asynchronously using Callbacks and Futures. You can then get and join threads at some later point in time and process the result. Java 8 introduced us with a new class - CompletableFuture, which makes it much easier to coordinate these things.

It works in a simple fashion - when a single process ends, another one starts. After the second one ends, the results are combined into a third process.

This makes it a lot easier to coordinate your application, but it's still ultimately blocking as it creates Threads and waits upon calling a .join() method.

Reactor motivationCredit: http://projectreactor.io/learn

Reactive Programming

What we want is asynchronous and non-blocking. A group of developers from companies like Netflix, Pivotal, RedHat, etc. got together and converged on something called The Reactive Streams Specification.

Project Reactor is Spring's implementation of The Reactive Specification and it's specifically favored by the Spring WebFlux module, although you can use it with other modules like RxJava.

The idea is to operate Asynchronously with Backpressure using Publishers and Subscribers.

Here, we're being introduced to several new concepts! Let's explain them one by one:

  • Publisher - A Publisher is a provider of a potentially unbounded number of elements.
  • Subscriber - A Subscriber listens to that Publisher, asking for new data. Sometimes, it's also referred to as a Consumer.
  • Backpressure - The ability of the Subscriber to let the Publisher know how many requests it can handle at the time. So it's the Subscriber that is responsible for the flow of the data, not the Publisher as it just provides the data.

The Reactor Project offers 2 types of publishers. These are considered the main building blocks of Spring WebFlux:

  • Flux - is a publisher that produces 0 to N values. It could be unbounded. Operations that return multiple elements use this type.
  • Mono - is a publisher that produces 0 to 1 value. Operations that return a single element use this type.

Developing Reactive Applications

With all of the above in mind, let's jump into creating a simple web application and take advantage of this new reactive paradigm!

The simplest way to start with a skeleton Spring Boot project, as always, is using Spring Initializr. Select your preferred version of Spring Boot and add the "Reactive Web" dependency. After this, generate it as a Maven project and you're all set!

Let's define a simple POJO - Greeting:

public class Greeting {
    private String msg;
    // Constructors, getters and setters
}

Defining a Publisher

Alongside it, let's define a simple REST Controller with an adequate mapping:

@RestController
public class GreetReactiveController {
    @GetMapping("/greetings")
    public Publisher<Greeting> greetingPublisher() {
        Flux<Greeting> greetingFlux = Flux.<Greeting>generate(sink -> sink.next(new Greeting("Hello"))).take(50);
        return greetingFlux;
    }
}

Calling Flux.generate() will create a never ending stream of the Greeting object.

The take() method, as the name suggests, will only take the first 50 values from the stream.

It's important to note that the return type of the method is the asynchronous type Publisher<Greeting>.

To test this endpoint, navigate your browser to http://localhost:8080/greetings or use the curl client on your command line - curl localhost:8080/greetings

You'll be prompted with a response that looks something like:

This doesn't look like that big of a deal and we could have simply returned a List<Greeting> to achieve the same visual result.

But again, notice that we are returning a Flux<Greeting>, which is an asynchronous type since that changes everything.

Suppose we had a publisher that returned more than a thousand records, or even more. Think of what the framework has to do. It's given an object of type Greeting, which it has to convert to JSON for the end user.

Had we used the traditional approach with Spring MVC, these objects would keep on accumulating in your RAM and once it collects everything it would return it to the client. This might exceed our RAM capacity and also blocks any other operation from getting processed in the meantime.

When we use Spring WebFlux, the whole internal dynamics get changed. The framework starts subscribing to these records from the publisher and it serializes each item and sends it back to the client in chunks.

We do things asynchronously without creating too many threads and reusing the threads that are waiting for something. The best part is that you don't have to do anything extra for this. In traditional Spring MVC, we could achieve the same by returning AsyncResult, DefferedResult, etc. to get some asynchronicity, but internally Spring MVC had to create a new Thread, which gets blocked since it has to wait.

Server-Sent Events

Another publisher that has been used ever since their arrival is Server-Sent Events.

These events allow a web page to get updates from a server in real-time.

Let's define a simple reactive server:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Greeting> sseGreetings() {
    Flux<Greeting> delayElements = Flux
            .<Greeting>generate(sink -> sink.next(new Greeting("Hello @" + Instant.now().toString())))
            .delayElements(Duration.ofSeconds(1));
    return delayElements;
}

Alternatively, we could've defined this:

@GetMapping(value = "/greetings/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Greeting> events() {
    Flux<Greeting> greetingFlux = Flux.fromStream(Stream.generate(() -> new Greeting("Hello @" + Instant.now().toString())));
    Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
    return Flux.zip(greetingFlux, durationFlux).map(Tuple2::getT1);
}

These methods produce a TEXT_EVENT_STREAM_VALUE which essentially means that the data is being sent in the form of Server-Sent events.

Note that in the first example, we're using a Publisher and in the second example we're using a Flux. A valid question would be:

"Which return type should I use then?"

It's advised to use Flux and Mono over Publisher. Both of these classes are implementations of the Publisher interface originating from Reactive Streams. While you can use them interchangeably, it's more expressive and descriptive to use the implementations.

Free eBook: Git Essentials

Check out our hands-on, practical guide to learning Git, with best-practices, industry-accepted standards, and included cheat sheet. Stop Googling Git commands and actually learn it!

These two examples highlight two ways to create delayed server-sent events:

  • .delayElements()- This method delays each element of the Flux by the given duration
  • .zip() - We're defining a Flux to generate events, and a Flux to generate values each second. By zipping them together, we get a Flux generating event each second.

Navigate to http://localhost:8080/greetings/sse or use a curl client on your command line and you will see a response that looks something like:

Defining a Consumer

Now let's see the consumer side of it. It's worth noting that you don't need to have a reactive publisher in order to use reactive programming on the consuming side:

public class Person {
    private int id;
    private String name;
    // Constructor with getters and setters
}

And then we have a traditional RestController with a single mapping:

@RestController
public class PersonController {
    private static List<Person> personList = new ArrayList<>();
    static {
        personList.add(new Person(1, "John"));
        personList.add(new Person(2, "Jane"));
        personList.add(new Person(3, "Max"));
        personList.add(new Person(4, "Alex"));
        personList.add(new Person(5, "Aloy"));
        personList.add(new Person(6, "Sarah"));
    }

    @GetMapping("/person/{id}")
    public Person getPerson(@PathVariable int id, @RequestParam(defaultValue = "2") int delay)
            throws InterruptedException {
        Thread.sleep(delay * 1000);
        return personList.stream().filter((person) -> person.getId() == id).findFirst().get();
    }
}

We initialized a list of type Person and based on the id passed to our mapping; we filtered that person out using a stream.

You might be alarmed by the usage of Thread.sleep() here, though it's just used to simulate network lag of 2 seconds.

If you're interested in reading more about Java Streams, we've got it covered!

Let's go ahead and create our consumer. Just like the publisher, we can do this easily using Spring Initializr:

Our producer app is running on port 8080. Now let's say that we want to call the /person/{id} endpoint 5 times. We know that, by default, each response takes a 2-second delay due to "network lag".

Let's first do this using the traditional RestTemplate approach:

public class CallPersonUsingRestTemplate {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingRestTemplate.class);
    private static RestTemplate restTemplate = new RestTemplate();

    static {
        String baseUrl = "http://localhost:8080";
        restTemplate.setUriTemplateHandler(new DefaultUriBuilderFactory(baseUrl));
    }

    public static void main(String[] args) {
        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            restTemplate.getForObject("/person/{id}", Person.class, i);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }
}

Lets run it:

As expected it took a little over 10 secs and this is how Spring MVC works by default.

In this day and age, waiting for a little over 10 seconds for a result on a page is unacceptable. This is the difference between keeping a customer/client and losing it due to waiting for too long.

Spring Reactor introduced a new web client to make web requests called WebClient. Compared to RestTemplate, this client has a more functional feel and is fully reactive. It's included in the spring-boot-starter-weblux dependency and it's build to replace RestTemplate in a non-blocking way.

Let's rewrite the same controller, this time, using WebClient:

public class CallPersonUsingWebClient_Step1 {

    private static final Logger logger = LoggerFactory.getLogger(CallPersonUsingWebClient_Step1.class);
    private static String baseUrl = "http://localhost:8080";
    private static WebClient client = WebClient.create(baseUrl);

    public static void main(String[] args) {

        Instant start = Instant.now();

        for (int i = 1; i <= 5; i++) {
            client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class);
        }

        logTime(start);
    }

    private static void logTime(Instant start) {
        logger.debug("Elapsed time: " + Duration.between(start, Instant.now()).toMillis() + "ms");
    }

}

Here, we created a WebClient by passing the baseUrl. Then in the main method, we simply call the endpoint.

get() indicates that we are making a GET request. We know that the response will be a single object, so we're using a Mono as explained before.

Ultimately, we asked Spring to map the response to a Person class:

And nothing happened, as expected.

This is because we are not subscribing. The whole thing is deferred. It's asynchronous but it also doesn't kick off until we call the .subscribe() method. This is a common problem with people who are new to Spring Reactor, so keep an eye out for this.

Let's change our main method and add subscribe:

for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).subscribe();
}

Adding the method prompts us with the wanted result:

The request is sent but the .subscribe() method doesn't sit and wait for the response. Since it doesn't block, it finishes before receiving the response at all.

Could we counter this by chaining .block() at the end of the method calls?

for (int i = 1; i <= 5; i++) {
    client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class).block();
}

Result:

We did get the response this time for each person, though it took over 10 seconds. This defeats the purpose of the application being reactive.

The way to fix all of these problems is simple: We make a list of type Mono and wait for all of them to complete, rather than waiting for each one:

List<Mono<Person>> list = Stream.of(1, 2, 3, 4, 5)
    .map(i -> client.get().uri("/person/{id}", i).retrieve().bodyToMono(Person.class))
    .collect(Collectors.toList());

Mono.when(list).block();

Result:

This is what we're aiming for. This time, it took just over two seconds, even with massive network lag. This increases the efficiency of our application drastically and really is a game-changer.

If you look closely at the threads, Reactor is reusing them rather than creating new ones. This is really important if your application handles many requests in a short span of time.

Conclusion

In this article, we discussed the need for reactive programming and Spring's implementation of it – the Spring Reactor.

Afterwards, we discussed the Spring WebFlux module, that internally uses Reactor, as well as covered concepts like Publisher and Subscriber. Upon this, we built an application that publishes data as a reactive stream and consumes it in another application.

The source code for this tutorial can be found on GitHub.

Last Updated: July 27th, 2023
Was this article helpful?

Make Clarity from Data - Quickly Learn Data Visualization with Python

Learn the landscape of Data Visualization tools in Python - work with Seaborn, Plotly, and Bokeh, and excel in Matplotlib!

From simple plot types to ridge plots, surface plots and spectrograms - understand your data and learn to draw conclusions from it.

© 2013-2024 Stack Abuse. All rights reserved.

AboutDisclosurePrivacyTerms