Overview
In this article, we’re going to implement a short example of Real-Time Event Streaming using Spring Webflux. This example will showcase how to create an endpoint that produces one event per second, and the client that simply logs the event as they come.
1. Maven Dependencies
Below highlighted are the key maven dependencies used in this example.
1.1 spring-boot-starter-webflux
Starter for building WebFlux applications using Spring Framework’s Reactive Web support.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <version>${version}</version> </dependency>
1.2 spring-boot-starter-parent
Declaring parent boot dependency allows us to manage consistency in terms of java versions, and also in controlling versions of a dependency for all the child projects.
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${version}</version> </dependency>
2. Entity Class
Let’s first create an Event class to store id and date.
public class Event { private long id; private Date date; public Event(long id, Date date) { this.setId(id); this.setDate(date); } // standard getters, setters }
3. The Client
Here we are simply using a WebClient that acts as an entry point for requests. Inside the createWebClient() method, a WebClient is created with base URL on port 8080. Afterwards, we use a commandLineRunner and inject our WebClient into it.
Now, using this client we prepared a GET request to the server followed by the type of media(text/event-stream) that the client is accepting. Next method i.e. exchange() sends the HTTP request and returns the client response.
@SpringBootApplication public class Client { @Bean WebClient createWebClient() { return WebClient.create("http://localhost:8080"); } @Bean CommandLineRunner logger(WebClient webclient) { return args -> { webclient.get().uri("/StreamOfEvents") .accept(MediaType.TEXT_EVENT_STREAM) .exchange().flatMapMany(clientRes -> clientRes .bodyToFlux(Event.class)) .subscribe(System.out::println); }; } public static void main(String[] args) { new SpringApplicationBuilder(Client.class) .properties(Collections.singletonMap("server.port", "8081")).run(args); } }
Note, that we have used bodyToFlux() method, this is simply because we are expecting flux of events.
4. The Endpoint
Our Server class i.e. Endpoint is making use of two Flux objects, the first flux being responsible for creating Events with id as currentTimeMillis() and date as a new Date() object. While another flux object is used to generate a new value every second.
@SpringBootApplication @RestController public class Endpoint { @GetMapping("/StreamOfEvents/{id}") Mono eventById(@PathVariable long id) { return Mono.just(new Event(id, new Date())); } @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/StreamOfEvents") Flux events() { Flux eventFlux = Flux.fromStream(Stream. generate(() -> new Event(System.currentTimeMillis(), new Date()))); Flux durationFlux = Flux.interval(Duration.ofSeconds(1)); return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1); } public static void main(String[] args) { SpringApplication.run(Endpoint.class, args); } }
As there is a need to send events every second, we now need to combine both these Flux into a single return value, which is done using Flux.zip() method.
5. Running the example
In order to test the example, run both Client and Endpoint class. Then from the Command Prompt(Windows), run:
C:\Users\User>curl localhost:8080/streamOfEvents
Conclusion
This article provided with an insight into the Reactive Programming using Spring Webflux which is a Spring way of reactive programming.
We discussed in brief, how to:
- Configure reactive client and server.
- Use flux objects to generate a stream of events.
The full source code of this example is available on GitHub.