- Reactive Programming with Spring WebFlux
- Spring WebFlux Example using Annotation based programming
- Spring WebFlux Example using Functional Programming
Server-Sent Events Introduction
LogHandler example
Spring Boot dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
Configure application.properties
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=logs
Define the data object
package com.stackstalk;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
@Document
public class LogInfo {

    @Id
    private String logId;
    private Long logTimestamp;
    private String logMessage;
}
Define the repository
package com.stackstalk;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface LogInfoRepository extends ReactiveMongoRepository<LogInfo, String> {
}
Define the Handler Function
Sinks are constructs through which Reactive Streams signals can be pushed programmatically, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an EmitResult enum, so an emission fails atomically when the attempted signal is inconsistent with the spec or with the state of the sink.
There are multiple Sinks categories:
- many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure
- many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
- many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
- one(): a sink that will play a single element to its subscribers
- empty(): a sink that will play a terminal signal only to its subscribers (error or complete)
In this example, we use a replay sink. It is created with Sinks.many().replay().all(), so it replays every element pushed so far to each new subscriber and then continues pushing new data live.
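To make the replay behavior concrete, here is a minimal plain-JDK model of it. This is only a sketch of the semantics, not Reactor's implementation: the class name ReplayAllModel is hypothetical, and the model ignores backpressure, thread safety and terminal signals, all of which the real sink handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Plain-JDK model of "replay all" semantics (hypothetical class, not Reactor).
 * It ignores backpressure, thread safety and terminal signals.
 */
public class ReplayAllModel<T> {

    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    /** Records the value, then pushes it live to every current subscriber. */
    public void emit(T value) {
        history.add(value);
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    /** Replays the whole history to the new subscriber, then registers it for live values. */
    public void subscribe(Consumer<T> subscriber) {
        for (T past : history) {
            subscriber.accept(past);
        }
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        ReplayAllModel<String> sink = new ReplayAllModel<>();

        List<String> early = new ArrayList<>();
        sink.subscribe(early::add);   // subscribes before anything is emitted
        sink.emit("first");
        sink.emit("second");

        List<String> late = new ArrayList<>();
        sink.subscribe(late::add);    // receives "first" and "second" as replay
        sink.emit("third");           // both subscribers receive this live

        System.out.println(early);    // prints [first, second, third]
        System.out.println(late);     // prints [first, second, third]
    }
}
```

Because the history in this model grows without bound, a long-running log stream would probably want a bounded history; Reactor offers variants such as Sinks.many().replay().limit(n) for that purpose.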
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Component
public class LogInfoHandler {

    @Autowired
    private LogInfoRepository logRepository;

    Sinks.Many<LogInfo> logInfoSink = Sinks.many().replay().all();

    public Mono<ServerResponse> addLogInfo(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(LogInfo.class)
                .doOnNext(logInfo -> {
                    logInfoSink.tryEmitNext(logInfo);
                })
                .flatMap(l -> {
                    return ServerResponse.ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(logRepository.save(l), LogInfo.class);
                })
                .log();
    }

    public Mono<ServerResponse> getLogStream(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(logInfoSink.asFlux(), LogInfo.class)
                .log();
    }
}
Define the Router Function
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class LogInfoRouter {

    @Bean
    public RouterFunction<ServerResponse> routes(LogInfoHandler handler) {
        return RouterFunctions.route(RequestPredicates.POST("/logs"), handler::addLogInfo)
                .andRoute(RequestPredicates.GET("/stream"), handler::getLogStream);
    }
}
Define the application
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LogHandlerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LogHandlerApplication.class, args);
    }
}
Putting it all together and testing
Now let us start the application and examine its output. On startup, observe that the Netty server starts on port 8080 and that a reactive MongoDB repository is bootstrapped.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.2)

2022-01-24 15:37:19.428  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication : Starting LogHandlerApplication using Java 16.0.2 on FAKEUSER-M-F1VD with PID 30335 (/Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler/target/classes started by fakeuser in /Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler)
2022-01-24 15:37:19.430  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication : No active profile set, falling back to default profiles: default
2022-01-24 15:37:19.821  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2022-01-24 15:37:19.949  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 124 ms. Found 1 Reactive MongoDB repository interfaces.
2022-01-24 15:37:20.372  INFO 30335 --- [           main] org.mongodb.driver.cluster : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-01-24 15:37:20.612  INFO 30335 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=120133302}
2022-01-24 15:37:20.961  INFO 30335 --- [           main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
2022-01-24 15:37:20.968  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication : Started LogHandlerApplication in 1.871 seconds (JVM running for 2.632)
Open an HTTP connection on the /stream endpoint
This API is used to listen for Server-Sent Events (SSE).
curl --location --request GET 'http://localhost:8080/stream'
The HTTP connection stays open, and each log message is pushed to the client as soon as it is added. The handler streams the events as newline-delimited JSON (MediaType.APPLICATION_NDJSON), so each message arrives as one JSON line. Open multiple connections to observe the replay behavior: every new subscription first receives the messages that were already pushed.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}
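The same stream can also be read from Java instead of curl. The following is a minimal sketch using the JDK's built-in java.net.http.HttpClient; it assumes the application above is running on localhost:8080, and the class name LogStreamClient and helper streamRequest are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class LogStreamClient {

    /** Builds a GET request for the /stream endpoint under the given base URL. */
    public static HttpRequest streamRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/stream"))
                .header("Accept", "application/x-ndjson")
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            // ofLines() exposes the response body as a lazy Stream<String>, one element per line
            HttpResponse<Stream<String>> response = client.send(
                    streamRequest("http://localhost:8080"),
                    HttpResponse.BodyHandlers.ofLines());
            // Blocks for as long as the server keeps the connection open
            response.body().forEach(line -> System.out.println("received: " + line));
        } catch (IOException e) {
            // Assumption: the most likely cause is that the server is not running
            System.err.println("Could not read the stream: " + e.getMessage());
        }
    }
}
```

Since the endpoint emits one JSON document per line, reading the body line by line gives one log message per element without any extra framing logic.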
Add a log message with a POST on /logs
This API is used to add a new log message.
curl --location --request POST 'http://localhost:8080/logs' \
--header 'Content-Type: application/json' \
--data-raw '{
    "logId": "1",
    "logTimestamp": 1642649553432,
    "logMessage": "Hello. This is my first message"
}'
Returns the log message added.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}
In the logs, observe the reactive signals: onSubscribe, request, onNext (once) and onComplete.
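That signal order is what the Reactive Streams contract prescribes for a publisher that emits a single value. It can be reproduced without Reactor using the JDK's java.util.concurrent.Flow interfaces. The sketch below is illustrative only: SingleValuePublisher and RecordingSubscriber are hypothetical classes, not part of Reactor or of the application above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SignalOrderDemo {

    /** Emits exactly one value, Mono-style, once the subscriber requests it. */
    static class SingleValuePublisher<T> implements Flow.Publisher<T> {
        private final T value;

        SingleValuePublisher(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    done = true;
                    if (n <= 0) {
                        subscriber.onError(new IllegalArgumentException("non-positive request"));
                        return;
                    }
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Records every signal it receives, in order. */
    static class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> signals = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            signals.add("onSubscribe");
            signals.add("request(unbounded)");
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            signals.add("onNext(" + item + ")");
        }

        @Override
        public void onError(Throwable throwable) {
            signals.add("onError");
        }

        @Override
        public void onComplete() {
            signals.add("onComplete");
        }
    }

    /** Subscribes a recording subscriber to a one-value publisher and returns the signals seen. */
    public static List<String> run(String value) {
        RecordingSubscriber<String> subscriber = new RecordingSubscriber<>();
        new SingleValuePublisher<>(value).subscribe(subscriber);
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(run("saved"));
        // prints [onSubscribe, request(unbounded), onNext(saved), onComplete]
    }
}
```

The POST handler follows the same pattern: saving one document yields a single onNext followed by onComplete, whereas the /stream endpoint keeps delivering onNext signals and does not complete while the sink stays open.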