在Spring WebFlux WEB服务中使用服务器发送事件
在Spring WebFlux WEB服务中,使用Server-Sent Events。
達成的目標
使用Spring WebFlux构建支持SSE的WEB服务来增进理解。
实现
在本地环境(Ubuntu)上构建和启动Spring WebFlux应用程序。
使用curl命令从Web服务访问并确认Server-Sent Events的功能。
技术背景
什么是Spring WebFlux?
这是用于开发异步和反应式web应用程序的Spring Framework模块。
Spring WebFlux的主要特点和优势如下:
异步和反应式的请求处理
Spring WebFlux使用称为响应式流的机制来异步处理请求。这样可以在提供较少资源的情况下实现较高的吞吐量。
WebFlux服务器
Spring WebFlux包含了基于Netty的WebFlux服务器。这个服务器可以实现较高的吞吐量和较低的延迟。
支持Reactive Streams API
Spring WebFlux符合被称为Reactive Streams API的规范,可以与其他使用Reactive Streams API的库进行集成。这可以实现更灵活、可扩展的应用程序开发。
Server-Sent Events (SSE) 是什么?
服务器推送事件(SSE)是一种专门用于服务器向客户端进行单向实时通信的技术,服务器可以实时推送更新信息给客户端。通过维持长时间连接,客户端可以实时接收来自服务器的更新信息。
SSE 主要用于需要实时信息传递的 Web 应用程序。例如,以下情况可能会使用 SSE:
社交媒体的实时更新
在像社交媒体这样的 Web 应用程序中,需要实时更新用户发布的新信息以及所关注的人的活动信息。使用 SSE,服务器可以每次接收到新信息都将该信息推送给客户端。
股票信息的实时更新
在像股票市场这样的行业中,需要实时信息,如股票价格的波动。使用 SSE,可以实时传递股票价格和交易更新信息。
在线游戏的实时更新
在线游戏需要实时传递玩家的行动和其他玩家的动作等实时信息。使用 SSE,可以实时传递信息给玩家。
这些只是示例,在需要实时信息传递的各种类型的 Web 应用程序中,可能会使用 SSE。
开发环境
-
- Windows 11 Home 22H2 を使用しています。
WSL の Ubuntu を操作していきますので macOS の方も参考にして頂けます。
> wsl –version
WSL版本:1.0.3.0
内核版本:5.15.79.1
WSLg版本:1.0.47Ubuntu
$ lsb_release -a
没有可用的LSB模块。
发行商ID:Ubuntu
描述:Ubuntu 22.04.1 LTS
版本:22.04
Java JDK ※ 最小構成 Java JDK的安装和Hello World!
$ java -version
openjdk版本“11.0.17”2022-10-18
OpenJDK运行时环境(构建11.0.17+8-post-Ubuntu-1ubuntu222.04)
OpenJDK 64位服务器VM(构建11.0.17+8-post-Ubuntu-1ubuntu222.04,混合模式,共享)
Maven ※ 最小構成 Maven的安装和Hello World!
$ mvn -version
Apache Maven 3.6.3
Maven主目录:/usr/share/maven
Java版本:11.0.17,供应商:Ubuntu,运行时:/usr/lib/jvm/java-11-openjdk-amd64
在这篇文章中,主要是通过Ubuntu的终端进行操作。
创建Spring MVC WEB服务时需要考虑的[并发处理]。
※ 我们将使用Spring Boot进行创建。
※ 这是为了比较同步处理和异步处理而创建的,请忽略不需要的部分?♂️
使用curl命令来验证Spring MVC应用程序
※ 我将从另一个终端确认。
※ 为了方便阅读,我在部分文字间加入了换行。
如果获取到对象(Map)
$ curl -v http://localhost:8080/one
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /one HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:52:42 GMT
<
* Connection #0 to host localhost left intact
{"message":"Hello Object!"}
日志文件
2023-02-21 19:52:30.673 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:52:30.682 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.248 seconds (JVM running for 1.459)
2023-02-21 19:52:40.319 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:52:40.321 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:52:42.338 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.getOne:26 - Sending message: {message=Hello Object!}
可以推测该线程名为”http-nio-8080-exec-1″的线程是Spring MVC中Tomcat处理的线程。
如果获取到一个列表
$ curl -v http://localhost:8080/list
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /list HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:41:12 GMT
<
* Connection #0 to host localhost left intact
[{"message":"Hello List 1!"},{"message":"Hello List 2!"},{"message":"Hello List 3!"},{"message":"Hello List 4!"},{"message":"Hello List 5!"}]
日志文件
2023-02-21 19:40:50.087 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:40:50.095 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.194 seconds (JVM running for 1.405)
2023-02-21 19:41:02.171 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:41:02.174 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:41:04.192 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 1!}
2023-02-21 19:41:06.196 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 2!}
2023-02-21 19:41:08.197 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 3!}
2023-02-21 19:41:10.198 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 4!}
2023-02-21 19:41:12.200 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 5!}
可以推测出,从线程名http-nio-8080-exec-1来看,该线程是Spring MVC中Tomcat处理的线程。
假设数据会以一次获取的方式在客户端(curl)端时间经过一段时间后被获取。
使用Spring WebFlux创建非同步处理的WEB服务
※ 我们会使用Spring Boot来创建。
创建项目文件夹
假设~/tmp/async-spring-webflux是项目文件夹。
$ cd ~
$ mkdir -p tmp/async-spring-webflux
$ cd ~/tmp/async-spring-webflux
创建应用程序类
※ 为了简化构成,我描述了所有的要素。
$ mkdir -p src/main/java/com/example/springwebflux
$ vim src/main/java/com/example/springwebflux/SpringbootApplication.java
文件的内容 de
package com.example.springwebflux;
import java.time.Duration;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
@GetMapping("/mono")
public Mono<Map<String, String>> getMono() {
return Mono.just(
Map.of("message", "Hello Mono!"))
.delayElement(Duration.ofSeconds(2))
.doOnNext(map -> log.info("Sending message: {}", map));
}
@GetMapping("/flux")
public Flux<Map<String, String>> getFlux() {
return Flux.range(1, 5)
.map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
.delayElements(Duration.ofSeconds(2))
.doOnNext(map -> log.info("Sending message: {}", map));
}
@GetMapping("/flux-sse")
public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
return Flux.range(1, 5)
.map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
.delayElements(Duration.ofSeconds(2))
.doOnNext(map -> log.info("Sending message: {}", map))
.map(map -> ServerSentEvent.builder(map).build())
.concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
}
}
※ 每个处理都设置了2秒的延迟。
※ 提示:如果使用Spring WebFlux在服务器端实现SSE,需要明确地向接收端的客户端发送 “event: end”。
Mono<Map<String, String>> 是表示单个异步值的 Reactive Streams 类型。在这种情况下,它表示单个 Map 对象。
Mono.just(Map.of(“message”, “Hello Mono!”)) 创建一个具有 “message” 键和其值的 Map 对象,并将其包装在 Mono 中返回。
.delayElement(Duration.ofSeconds(2)) 是将 Mono 转换为在响应中延迟 2 秒的操作符。
.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。
因此,该端点将在等待 2 秒后返回包含 “Hello Mono!” 消息的 Map 对象作为单个值。然后,日志将输出类似于 “Sending message: {message=Hello Mono!}” 的消息。
给客户端返回多个异步值响应的端点
@GetMapping(“/flux”) 注释将接收 HTTP GET 请求,并映射到 “/flux” 端点。
Flux<Map<String, String>> 是表示多个异步值的 Reactive Streams 类型。在这种情况下,它表示多个 Map 对象。
Flux.range(1, 5) 生成一个包含从 1 到 5 的整数的 Flux。
.map(idx -> Map.of(“message”, “Hello Flux ” + idx + “!”)) 是用于创建包含带有 “message” 键和其值的 Map 对象的操作符。
.delayElements(Duration.ofSeconds(2)) 是一个操作符,用于将每个元素延迟 2 秒。
.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。
因此,该端点将返回包含以 2 秒间隔通知的带有 “Hello Flux {index}!” 消息的 Map 对象的多个值。然后,日志将输出类似于 “Sending message: {message=Hello Flux {index}!}” 的消息。
使用 Server-Sent Events(SSE)协议异步发送数据给客户端的端点
@GetMapping(“/flux-sse”) 注释将接收 HTTP GET 请求,并映射到 “/flux-sse” 端点。
Flux<ServerSentEvent<Map<String, String>>> 是表示多个异步值的 Reactive Streams 类型,并使用 Server-Sent Events 格式。在这种情况下,它表示多个 Map 对象。
Flux.range(1, 5) 生成一个包含从 1 到 5 的整数的 Flux。
.map(idx -> Map.of(“message”, “Hello Flux ” + idx + “!”)) 是用于创建包含带有 “message” 键和其值的 Map 对象的操作符。
.delayElements(Duration.ofSeconds(2)) 是一个用于将每个元素延迟 2 秒的操作符。
.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。
.map(map -> ServerSentEvent.builder(map).build()) 是用于将每个 Map 对象转换为 ServerSentEvent 对象的操作符。
.concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event(“end”).build())) 是一个操作符,用于在流的末尾添加一个 ServerSentEvent 对象。这样可以发送一个表示 SSE 流结束的 event: end 标记。
因此,该端点将以 2 秒间隔通知的带有 “Hello Flux {index}!” 消息的 Map 对象作为多个值,使用 Server-Sent Events 格式返回。然后,日志将输出类似于 “Sending message: {message=Hello Flux {index}!}” 的消息。
创建pom.xml
$ vim pom.xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.7.8
com.example
sync-spring-webflux
1.0
sync-spring-webflux
11
UTF-8
org.springframework.boot
spring-boot-starter-webflux
org.projectlombok
lombok
1.18.26
provided
ch.qos.logback
logback-core
1.2.6
ch.qos.logback
logback-classic
1.2.6
org.slf4j
slf4j-api
1.7.30
app
org.springframework.boot
spring-boot-maven-plugin
创建Spring Boot配置文件
- application.properties
创建日志设置文件
- logback-spring.xml
※ 您可以参考之前的文章来创建上述两个设置文件。
运行Spring WebFlux应用程序
Java 应用程序构建
$ mvn clean install
按下Ctrl + C即可停止Java应用程序。
$ mvn spring-boot:run
用curl命令确认Spring WebFlux应用程序。
※ 我会从另一个终端确认。
※ 为了方便阅读,我增加了一些换行。
如果以普通的HTTP请求单色进行请求的情况下
$ curl -v http://localhost:8080/mono
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /mono HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
< Content-Length: 25
<
* Connection #0 to host localhost left intact
{"message":"Hello Mono!"}
日志文件
2023-02-21 19:09:36.821 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:09:36.829 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.095 seconds (JVM running for 1.321)
2023-02-21 19:09:44.125 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$0:46 - Sending message: {message=Hello Mono!}
从线程名为 “parallel-1” 可以推断出 Spring WebFlux 在并行处理线程上处理 Mono。
如果用普通的HTTP请求Flux
$ curl -v http://localhost:8080/flux
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
<
* Connection #0 to host localhost left intact
[{"message":"Hello Flux 1!"},{"message":"Hello Flux 2!"},{"message":"Hello Flux 3!"},{"message":"Hello Flux 4!"},{"message":"Hello Flux 5!"}]
日志文件
2023-02-21 18:57:26.755 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 18:57:26.763 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.314)
2023-02-21 18:57:39.522 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 1!}
2023-02-21 18:57:41.527 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 2!}
2023-02-21 18:57:43.529 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 3!}
2023-02-21 18:57:45.531 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 4!}
2023-02-21 18:57:47.533 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 5!}
从标题为”parallel-n”的线程中可以推测出,Spring WebFlux使用并行处理线程以每2秒处理一次Flux。
在客户端(curl)端,数据将在一段时间后一次性获取。
如果使用 Server-Sent Events 指定的 HTTP 请求 Flux
※ 添加”Accept: text/event-stream” 头部。
$ curl -v http://localhost:8080/flux-sse -H 'Accept: text/event-stream'
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux-sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: text/event-stream
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: text/event-stream;charset=UTF-8
<
data:{"message":"Hello Flux 1!"}
data:{"message":"Hello Flux 2!"}
data:{"message":"Hello Flux 3!"}
data:{"message":"Hello Flux 4!"}
data:{"message":"Hello Flux 5!"}
event:end
* Connection #0 to host localhost left intact
日志文件
2023-02-21 19:25:11.977 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:25:11.985 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.313)
2023-02-21 19:25:18.690 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 1!}
2023-02-21 19:25:20.708 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 2!}
2023-02-21 19:25:22.712 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 3!}
2023-02-21 19:25:24.717 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 4!}
2023-02-21 19:25:26.722 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 5!}
※ Spring WebFlux 使用并行处理线程来处理Flux,每2秒处理一次。
※ 客户端(curl)以实时方式逐个获取数据。
概括
-
- Spring WebFlux で Server-Sent Events を実装する初歩的なWEBサービスを実装することが出来ました。
- 今後この Server-Sent Events を使用するクライアント側のアプリを実装して理解を深ようと思いました。
请提供需要修改的原文。
-
- W3C Server-Sent Events
-
- HTML Living Standard 9.2 Server-sent events
- MDN サーバー送信イベント