A Short Example of Real-Time Event Streaming using Spring Webflux3 min read

Reactive

Overview

In this article, we’re going to implement a short example of Real-Time Event Streaming using Spring Webflux. This example will showcase how to create an endpoint that produces one event per second, and the client that simply logs the event as they come.

1. Maven Dependencies

Below highlighted are the key maven dependencies used in this example.

1.1 spring-boot-starter-webflux

Starter for building WebFlux applications using Spring Framework’s Reactive Web support.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>${version}</version>
</dependency>

1.2 spring-boot-starter-parent

Declaring parent boot dependency allows us to manage consistency in terms of java versions, and also in controlling versions of a dependency for all the child projects.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>${version}</version>
</dependency>

2. Entity Class

Let’s first create an Event class to store id and date.

public class Event {
	private long id;
	private Date date;

	public Event(long id, Date date) {
		this.setId(id);
		this.setDate(date);
	}

    // standard getters, setters
}

3. The Client

Here we are simply using a WebClient that acts as an entry point for requests. Inside the createWebClient() method, a WebClient is created with base URL on port 8080. Afterwards, we use a commandLineRunner and inject our WebClient into it.
Now, using this client we prepared a GET request to the server followed by the type of media(text/event-stream) that the client is accepting. Next method i.e. exchange() sends the HTTP request and returns the client response.

@SpringBootApplication
public class Client {
	
	@Bean
	WebClient createWebClient() {
		return WebClient.create("http://localhost:8080");
	}

	@Bean
	CommandLineRunner logger(WebClient webclient) {
	return args -> {
		webclient.get().uri("/StreamOfEvents")
		.accept(MediaType.TEXT_EVENT_STREAM)
                .exchange().flatMapMany(clientRes -> clientRes
                .bodyToFlux(Event.class))
		.subscribe(System.out::println);
		};
	}

	public static void main(String[] args) {
		new SpringApplicationBuilder(Client.class)
               .properties(Collections.singletonMap("server.port",
               "8081")).run(args);
	}

}

Note, that we have used bodyToFlux() method, this is simply because we are expecting flux of events.

4. The Endpoint

Our Server class i.e. Endpoint is making use of two Flux objects, the first flux being responsible for creating Events with id as currentTimeMillis() and date as a new Date() object. While another flux object is used to generate a new value every second.

@SpringBootApplication
@RestController
public class Endpoint {

	@GetMapping("/StreamOfEvents/{id}")
	Mono eventById(@PathVariable long id) {
		return Mono.just(new Event(id, new Date()));
	}

	@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, 
        value = "/StreamOfEvents")
	Flux events() {
	   Flux eventFlux = Flux.fromStream(Stream.
	   generate(() -> new Event(System.currentTimeMillis(),
	   new Date())));
	   Flux durationFlux = Flux.interval(Duration.ofSeconds(1));
	   return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
	}

	public static void main(String[] args) {
		SpringApplication.run(Endpoint.class, args);
	}
}

As there is a need to send events every second, we now need to combine both these Flux into a single return value, which is done using Flux.zip() method.

5. Running the example

In order to test the example, run both Client and Endpoint class. Then from the Command Prompt(Windows), run:

C:\Users\User>curl localhost:8080/streamOfEvents

Conclusion

This article provided with an insight into the Reactive Programming using Spring Webflux which is a Spring way of reactive programming.

We discussed in brief, how to:

  • Configure reactive client and server.
  • Use flux objects to generate a stream of events.

The full source code of this example is available on GitHub.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.