在Spring WebFlux WEB服务中使用服务器发送事件

在Spring WebFlux WEB服务中,使用Server-Sent Events。

達成的目標

使用Spring WebFlux构建支持SSE的WEB服务来增进理解。

实现

在本地环境(Ubuntu)上构建和启动Spring WebFlux应用程序。
使用curl命令从Web服务访问并确认Server-Sent Events的功能。

技术背景

什么是Spring WebFlux?

这里展示给您。Spring WebFlux

这是用于开发异步和反应式web应用程序的Spring Framework模块。
Spring WebFlux的主要特点和优势如下:

异步和反应式的请求处理
Spring WebFlux使用称为响应式流的机制来异步处理请求。这样可以在提供较少资源的情况下实现较高的吞吐量。

WebFlux服务器
Spring WebFlux包含了基于Netty的WebFlux服务器。这个服务器可以实现较高的吞吐量和较低的延迟。

支持Reactive Streams API
Spring WebFlux符合被称为Reactive Streams API的规范,可以与其他使用Reactive Streams API的库进行集成。这可以实现更灵活、可扩展的应用程序开发。

Server-Sent Events (SSE) 是什么?

您可以展开查看这里。服务器推送事件

服务器推送事件(SSE)是一种专门用于服务器向客户端进行单向实时通信的技术,服务器可以实时推送更新信息给客户端。通过维持长时间连接,客户端可以实时接收来自服务器的更新信息。
SSE 主要用于需要实时信息传递的 Web 应用程序。例如,以下情况可能会使用 SSE:

社交媒体的实时更新
在像社交媒体这样的 Web 应用程序中,需要实时更新用户发布的新信息以及所关注的人的活动信息。使用 SSE,服务器可以每次接收到新信息都将该信息推送给客户端。

股票信息的实时更新
在像股票市场这样的行业中,需要实时信息,如股票价格的波动。使用 SSE,可以实时传递股票价格和交易更新信息。

在线游戏的实时更新
在线游戏需要实时传递玩家的行动和其他玩家的动作等实时信息。使用 SSE,可以实时传递信息给玩家。
这些只是示例,在需要实时信息传递的各种类型的 Web 应用程序中,可能会使用 SSE。

开发环境

    • Windows 11 Home 22H2 を使用しています。

WSL の Ubuntu を操作していきますので macOS の方も参考にして頂けます。

WSL(Microsoft Store应用版)
> wsl –version
WSL版本:1.0.3.0
内核版本:5.15.79.1
WSLg版本:1.0.47Ubuntu
$ lsb_release -a
没有可用的LSB模块。
发行商ID:Ubuntu
描述:Ubuntu 22.04.1 LTS
版本:22.04

Java JDK ※ 最小構成 Java JDK的安装和Hello World!
$ java -version
openjdk版本“11.0.17”2022-10-18
OpenJDK运行时环境(构建11.0.17+8-post-Ubuntu-1ubuntu222.04)
OpenJDK 64位服务器VM(构建11.0.17+8-post-Ubuntu-1ubuntu222.04,混合模式,共享)

Maven ※ 最小構成 Maven的安装和Hello World!
$ mvn -version
Apache Maven 3.6.3
Maven主目录:/usr/share/maven
Java版本:11.0.17,供应商:Ubuntu,运行时:/usr/lib/jvm/java-11-openjdk-amd64

在这篇文章中,主要是通过Ubuntu的终端进行操作。

创建Spring MVC WEB服务时需要考虑的[并发处理]。

※ 我们将使用Spring Boot进行创建。
※ 这是为了比较同步处理和异步处理而创建的,请忽略不需要的部分?‍♂️

请看下面的部署。 创建项目文件夹。 ※将 ~/tmp/sync-spring-mvc 作为项目文件夹。 $ cd ~ $ mkdir -p tmp/sync-spring-mvc $ cd ~/tmp/sync-spring-mvc 创建应用程序类。 ※为了简化结构,将所有元素都写在这里。 $ mkdir -p src/main/java/com/example/springmvc $ vim src/main/java/com/example/springmvc/SpringbootApplication.java 文件内容 SpringbootApplication.java package com.example.springmvc; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @SpringBootApplication public class SpringbootApplication { public static void main(String[] args) { SpringApplication.run(SpringbootApplication.class, args); } @GetMapping(“/one”) public Map<String, String> getOne() throws InterruptedException { Map<String, String> map = Map.of(“message”, “Hello Object!”); TimeUnit.SECONDS.sleep(2); log.info(“Sending message: {}”, map); return map; } @GetMapping(“/list”) public List<Map<String, String>> getList() throws InterruptedException { List<Map<String, String>> list = List.of( Map.of(“message”, “Hello List 1!”), Map.of(“message”, “Hello List 2!”), Map.of(“message”, “Hello List 3!”), Map.of(“message”, “Hello List 4!”), Map.of(“message”, “Hello List 5!”)); list.forEach(map -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { log.error(e.getMessage()); throw new RuntimeException(e); } log.info(“Sending message: {}”, map); }); return list; } } ※每个处理都设置了2秒的延迟。 创建pom.xml $ vim pom.xml 文件内容 pom.xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.7.8 com.example sync-spring-mvc 1.0 sync-spring-mvc 11 UTF-8 org.springframework.boot spring-boot-starter-webflux org.projectlombok lombok 1.18.26 provided ch.qos.logback logback-core 1.2.6 ch.qos.logback logback-classic 1.2.6 org.slf4j slf4j-api 1.7.30 app org.springframework.boot spring-boot-maven-plugin 创建Spring Boot配置文件 application.properties 创建日志配置文件 logback-spring.xml ※可以参考以前的文章创建上述两个配置文件。 运行Spring MVC应用程序 Java应用程序构建 $ mvn clean install 启动Java应用程序(使用Ctrl + C停止) $ mvn spring-boot:run

使用curl命令来验证Spring MVC应用程序

※ 我将从另一个终端确认。
※ 为了方便阅读,我在部分文字间加入了换行。

如果获取到对象(Map)

$ curl -v http://localhost:8080/one
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /one HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:52:42 GMT
<
* Connection #0 to host localhost left intact
{"message":"Hello Object!"}

日志文件

2023-02-21 19:52:30.673 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:52:30.682 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.248 seconds (JVM running for 1.459)
2023-02-21 19:52:40.319 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:52:40.321 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:52:42.338 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.getOne:26 - Sending message: {message=Hello Object!}

可以推测该线程名为”http-nio-8080-exec-1″的线程是Spring MVC中Tomcat处理的线程。

如果获取到一个列表

$ curl -v http://localhost:8080/list
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /list HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200
< Content-Type: application/json
< Transfer-Encoding: chunked
< Date: Tue, 21 Feb 2023 10:41:12 GMT
<
* Connection #0 to host localhost left intact
[{"message":"Hello List 1!"},{"message":"Hello List 2!"},{"message":"Hello List 3!"},{"message":"Hello List 4!"},{"message":"Hello List 5!"}]

日志文件

2023-02-21 19:40:50.087 [INFO ] [main] org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start:220 - Tomcat started on port(s): 8080 (http) with context path ''
2023-02-21 19:40:50.095 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.194 seconds (JVM running for 1.405)
2023-02-21 19:41:02.171 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:525 - Initializing Servlet 'dispatcherServlet'
2023-02-21 19:41:02.174 [INFO ] [http-nio-8080-exec-1] org.springframework.web.servlet.FrameworkServlet.initServletBean:547 - Completed initialization in 1 ms
2023-02-21 19:41:04.192 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 1!}
2023-02-21 19:41:06.196 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 2!}
2023-02-21 19:41:08.197 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 3!}
2023-02-21 19:41:10.198 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 4!}
2023-02-21 19:41:12.200 [INFO ] [http-nio-8080-exec-1] com.example.springmvc.SpringbootApplication.lambda$0:45 - Sending message: {message=Hello List 5!}

可以推测出,从线程名http-nio-8080-exec-1来看,该线程是Spring MVC中Tomcat处理的线程。
假设数据会以一次获取的方式在客户端(curl)端时间经过一段时间后被获取。

使用Spring WebFlux创建非同步处理的WEB服务

※ 我们会使用Spring Boot来创建。

创建项目文件夹

假设~/tmp/async-spring-webflux是项目文件夹。

$ cd ~
$ mkdir -p tmp/async-spring-webflux
$ cd ~/tmp/async-spring-webflux

创建应用程序类

※ 为了简化构成,我描述了所有的要素。

$ mkdir -p src/main/java/com/example/springwebflux
$ vim src/main/java/com/example/springwebflux/SpringbootApplication.java

文件的内容 de

package com.example.springwebflux;

import java.time.Duration;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@CrossOrigin(origins = "*", allowedHeaders = "*")
@RestController
@SpringBootApplication
public class SpringbootApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }

    @GetMapping("/mono")
    public Mono<Map<String, String>> getMono() {
        return Mono.just(
            Map.of("message", "Hello Mono!"))
            .delayElement(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux")
    public Flux<Map<String, String>> getFlux() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map));
    }

    @GetMapping("/flux-sse")
    public Flux<ServerSentEvent<Map<String, String>>> getFluxWithSSE() {
        return Flux.range(1, 5)
            .map(idx -> Map.of("message", "Hello Flux " + idx + "!"))
            .delayElements(Duration.ofSeconds(2))
            .doOnNext(map -> log.info("Sending message: {}", map))
            .map(map -> ServerSentEvent.builder(map).build())
            .concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event("end").build()));
    }
}

※ 每个处理都设置了2秒的延迟。
※ 提示:如果使用Spring WebFlux在服务器端实现SSE,需要明确地向接收端的客户端发送 “event: end”。

说明@GetMapping(“/mono”) 注释将接收 HTTP GET 请求,并映射到 “/mono” 端点。

Mono<Map<String, String>> 是表示单个异步值的 Reactive Streams 类型。在这种情况下,它表示单个 Map 对象。

Mono.just(Map.of(“message”, “Hello Mono!”)) 创建一个具有 “message” 键和其值的 Map 对象,并将其包装在 Mono 中返回。

.delayElement(Duration.ofSeconds(2)) 是将 Mono 转换为在响应中延迟 2 秒的操作符。

.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。

因此,该端点将在等待 2 秒后返回包含 “Hello Mono!” 消息的 Map 对象作为单个值。然后,日志将输出类似于 “Sending message: {message=Hello Mono!}” 的消息。

给客户端返回多个异步值响应的端点

@GetMapping(“/flux”) 注释将接收 HTTP GET 请求,并映射到 “/flux” 端点。

Flux<Map<String, String>> 是表示多个异步值的 Reactive Streams 类型。在这种情况下,它表示多个 Map 对象。

Flux.range(1, 5) 生成一个包含从 1 到 5 的整数的 Flux。

.map(idx -> Map.of(“message”, “Hello Flux ” + idx + “!”)) 是用于创建包含带有 “message” 键和其值的 Map 对象的操作符。

.delayElements(Duration.ofSeconds(2)) 是一个操作符,用于将每个元素延迟 2 秒。

.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。

因此,该端点将返回包含以 2 秒间隔通知的带有 “Hello Flux {index}!” 消息的 Map 对象的多个值。然后,日志将输出类似于 “Sending message: {message=Hello Flux {index}!}” 的消息。

使用 Server-Sent Events(SSE)协议异步发送数据给客户端的端点

@GetMapping(“/flux-sse”) 注释将接收 HTTP GET 请求,并映射到 “/flux-sse” 端点。

Flux<ServerSentEvent<Map<String, String>>> 是表示多个异步值的 Reactive Streams 类型,并使用 Server-Sent Events 格式。在这种情况下,它表示多个 Map 对象。

Flux.range(1, 5) 生成一个包含从 1 到 5 的整数的 Flux。

.map(idx -> Map.of(“message”, “Hello Flux ” + idx + “!”)) 是用于创建包含带有 “message” 键和其值的 Map 对象的操作符。

.delayElements(Duration.ofSeconds(2)) 是一个用于将每个元素延迟 2 秒的操作符。

.doOnNext(map -> log.info(“Sending message: {}”, map)) 是添加副作用以在每次值被通知时将消息输出到日志的操作符。

.map(map -> ServerSentEvent.builder(map).build()) 是用于将每个 Map 对象转换为 ServerSentEvent 对象的操作符。

.concatWith(Mono.just(ServerSentEvent.<Map<String, String>>builder().event(“end”).build())) 是一个操作符,用于在流的末尾添加一个 ServerSentEvent 对象。这样可以发送一个表示 SSE 流结束的 event: end 标记。

因此,该端点将以 2 秒间隔通知的带有 “Hello Flux {index}!” 消息的 Map 对象作为多个值,使用 Server-Sent Events 格式返回。然后,日志将输出类似于 “Sending message: {message=Hello Flux {index}!}” 的消息。

创建pom.xml

$ vim pom.xml
文件内容pom.xml

4.0.0
org.springframework.boot
spring-boot-starter-parent
2.7.8

com.example
sync-spring-webflux
1.0
sync-spring-webflux

11
UTF-8

org.springframework.boot
spring-boot-starter-webflux

org.projectlombok
lombok
1.18.26
provided

ch.qos.logback
logback-core
1.2.6

ch.qos.logback
logback-classic
1.2.6

org.slf4j
slf4j-api
1.7.30

app
org.springframework.boot
spring-boot-maven-plugin

创建Spring Boot配置文件

    application.properties

创建日志设置文件

    logback-spring.xml

※ 您可以参考之前的文章来创建上述两个设置文件。

运行Spring WebFlux应用程序

Java 应用程序构建

$ mvn clean install

按下Ctrl + C即可停止Java应用程序。

$ mvn spring-boot:run

用curl命令确认Spring WebFlux应用程序。

※ 我会从另一个终端确认。
※ 为了方便阅读,我增加了一些换行。

如果以普通的HTTP请求单色进行请求的情况下

$ curl -v http://localhost:8080/mono
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /mono HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
< Content-Length: 25
<
* Connection #0 to host localhost left intact
{"message":"Hello Mono!"}

日志文件

2023-02-21 19:09:36.821 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:09:36.829 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.095 seconds (JVM running for 1.321)
2023-02-21 19:09:44.125 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$0:46 - Sending message: {message=Hello Mono!}

从线程名为 “parallel-1” 可以推断出 Spring WebFlux 在并行处理线程上处理 Mono。

如果用普通的HTTP请求Flux

$ curl -v http://localhost:8080/flux
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: application/json
<
* Connection #0 to host localhost left intact
[{"message":"Hello Flux 1!"},{"message":"Hello Flux 2!"},{"message":"Hello Flux 3!"},{"message":"Hello Flux 4!"},{"message":"Hello Flux 5!"}]

日志文件

2023-02-21 18:57:26.755 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 18:57:26.763 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.314)
2023-02-21 18:57:39.522 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 1!}
2023-02-21 18:57:41.527 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 2!}
2023-02-21 18:57:43.529 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 3!}
2023-02-21 18:57:45.531 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 4!}
2023-02-21 18:57:47.533 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$1:66 - Sending message: {message=Hello Flux 5!}

从标题为”parallel-n”的线程中可以推测出,Spring WebFlux使用并行处理线程以每2秒处理一次Flux。
在客户端(curl)端,数据将在一段时间后一次性获取。

如果使用 Server-Sent Events 指定的 HTTP 请求 Flux

※ 添加”Accept: text/event-stream” 头部。

$ curl -v http://localhost:8080/flux-sse -H 'Accept: text/event-stream'
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /flux-sse HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.81.0
> Accept: text/event-stream
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Vary: Origin
< Vary: Access-Control-Request-Method
< Vary: Access-Control-Request-Headers
< Content-Type: text/event-stream;charset=UTF-8
<
data:{"message":"Hello Flux 1!"}

data:{"message":"Hello Flux 2!"}

data:{"message":"Hello Flux 3!"}

data:{"message":"Hello Flux 4!"}

data:{"message":"Hello Flux 5!"}

event:end

* Connection #0 to host localhost left intact

日志文件

2023-02-21 19:25:11.977 [INFO ] [main] org.springframework.boot.web.embedded.netty.NettyWebServer.start:111 - Netty started on port 8080
2023-02-21 19:25:11.985 [INFO ] [main] org.springframework.boot.StartupInfoLogger.logStarted:61 - Started SpringbootApplication in 1.092 seconds (JVM running for 1.313)
2023-02-21 19:25:18.690 [INFO ] [parallel-1] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 1!}
2023-02-21 19:25:20.708 [INFO ] [parallel-2] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 2!}
2023-02-21 19:25:22.712 [INFO ] [parallel-3] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 3!}
2023-02-21 19:25:24.717 [INFO ] [parallel-4] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 4!}
2023-02-21 19:25:26.722 [INFO ] [parallel-5] com.example.springwebflux.SpringbootApplication.lambda$4:112 - Sending message: {message=Hello Flux 5!}

※ Spring WebFlux 使用并行处理线程来处理Flux,每2秒处理一次。
※ 客户端(curl)以实时方式逐个获取数据。

概括

    • Spring WebFlux で Server-Sent Events を実装する初歩的なWEBサービスを実装することが出来ました。

 

    今後この Server-Sent Events を使用するクライアント側のアプリを実装して理解を深ようと思いました。

请提供需要修改的原文。

    • W3C Server-Sent Events

 

    • HTML Living Standard 9.2 Server-sent events

 

    MDN サーバー送信イベント
广告
将在 10 秒后关闭
bannerAds