最后更新: 2025-7-8
次查看
字数 17078阅读时长 43 分钟
type
Post
status
Published
date
Apr 10, 2021
slug
21
summary
webFlux 初识LambdaLambda 表达式,有时候也称为匿名函数或箭头函数,几乎在当前的各种主流的编程语言中都有它的身影。Java8 中引入 Lambda 表达式,使原本需要用匿名类实现接口
tags
Spring
Java
响应式
异步编程
WebFlux
category
技术分享
icon
password
musicId
catalog
archives
sort
0

webFlux 初识

Lambda

Lambda 表达式,有时候也称为匿名函数或箭头函数,几乎在当前的各种主流的编程语言中都有它的身影。Java8 中引入 Lambda 表达式,使原本需要用匿名类实现接口来传递行为,现在通过 Lambda 可以更直观的表达。
  • Lambda 表达式,也可称为闭包。闭包就是一个定义在函数内部的函数,闭包使得变量即使脱离了该函数的作用域范围也依然能被访问到。
  • Lambda 表达式的本质只是一个”语法糖”,由编译器推断并帮你转换包装为常规的代码,因此你可以使用更少的代码来实现同样的功能。
  • Lambda 表达式是一个匿名函数,即没有函数名的函数。有些函数如果只是临时一用,而且它的业务逻辑也很简单时,就没必要非给它取个名字不可。
  • Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中).
Lambda 表达式语法如下:形参列表=>函数体(函数体多于一条语句的可用大括号括起)。在Java里就是**() -> {}**:
Lambda表达式的重要特征:
  • Lambda 表达式主要用来定义行内执行的方法类型接口,例如,一个简单方法接口。
  • Lambda表达式是通过函数式接口(必须有且仅有一个抽象方法声明)识别的
  • 可选类型声明:不需要声明参数类型,编译器可以统一识别参数值。
  • 可选的参数圆括号:一个参数无需定义圆括号,但多个参数需要定义圆括号。
  • 可选的大括号:如果主体包含了一个语句,就不需要使用大括号。
  • 可选的返回关键字:如果主体只有一个表达式返回值,则编译器会自动返回值,大括号需要指定表达式返回一个值。
Lambda表达式中的变量作用域:
  • 访问权限与匿名对象的方式非常类似。只能够访问局部对应的外部区域的局部final变量,以及成员变量和静态变量
  • 在Lambda表达式中能访问域外的局部非final变量、但不能修改Lambda域外的局部非final变量。因为在Lambda表达式中,Lambda域外的局部非final变量会在编译的时候,会被隐式地当做final变量来处理
  • Lambda表达式内部无法访问接口默认(default)方法
例子:使用Java 8之前的方法来实现对一个列表进行排序:
Java 8 Lambda 表达式:

类型推断

通常 Lambda 表达式的参数并不需要显示声明类型。那么对于给定的Lambda表达式,程序如何知道对应的是哪个函数接口以及参数的类型呢?编译器通过 Lambda 表达式所在的上下文来进行目标类型推断,通过检查 Lambda 表达式的入参类型及返回类型,和对应的目标类型的方法签名是否一致,推导出合适的函数接口。比如:
在上面的例子中,对于传入 map 方法的 Lamda 表达式,从 Stream 的类型上下文可以推导出入参是 String 类型,从函数的返回值可以推导出出参是整形类型,因此可推导出对应的函数接口类型为 Function;对于传入 filter 方法的 Lamda 表达式,从 pipeline 的上下文可得知入参是整形类型,因此可推导出函数接口 Predicate。

方法引用

Java 8 中还可以通过方法引用表示 Lambda 表达式。方法引用是用来直接访问类或者实例的已经存在的方法或者构造方法。Java 8 允许你通过"::"关键字获取方法或者构造函数的引用。方法引用提供了一种引用而不执行方法的方式,它需要由兼容的函数式接口构成目标类型上下文。计算时,方法引用会创建函数式接口的一个实例。常用的方法引用有:
  • 静态方法引用:ClassName::methodName
  • 实例对象上的方法引用:instanceReference::methodName
  • 类上的方法引用:ClassName::methodName
  • 构造方法引用:Class::new
  • 数组构造方法引用:TypeName[]::new
例子:

函数式接口

Java 8 中采用函数式接口作为Lambda 表达式的目标类型。函数式接口(Functional Interface)是一个有且仅有一个抽象方法声明接口。任意只包含一个抽象方法的接口,我们都可以用来做成Lambda表达式。每个与之对应的lambda表达式必须要与抽象方法的声明相匹配。函数式接口与其他普通接口的区别:
  • 函数式接口中只能有一个抽象方法(这里不包括与Object的方法重名的方法)
  • 接口中唯一抽象方法的命名并不重要,因为函数式接口就是对某一行为进行抽象,主要目的就是支持 Lambda 表达式
  • 自定义函数式接口时,应当在接口前加上**@FunctionalInterface**标注(虽然不加也不会有错误)。编译器会注意到这个标注,如果你的接口中定义了第二个抽象方法的话,编译器会抛出异常。

函数式编程

Java来讲,从命令式编程到函数式编程的关键转变是从Java8多了一个funtcion包开始,在此基础上的stream更好的诠释了这一点,而之后java 9 的reactor,再到spring5的webflux都是在其基础上一步步演变的

java.util.function

notion image

Function

标注为FunctionalInterface的接口被称为函数式接口,该接口只能有一个自定义方法,但是可以包括从object类继承而来的方法。如果一个接口只有一个方法,则编译器会认为这就是一个函数式接口。
是否是一个函数式接口,需要注意的有以下几点:
  • 该注解只能标记在”有且仅有一个抽象方法”的接口上。
  • JDK8接口中的静态方法和默认方法,都不算是抽象方法。
  • 接口默认继承java.lang.Object,所以如果接口显示声明覆盖了Object中方法,那么也不算抽象方法。
  • 该注解不是必须的,如果一个接口符合”函数式接口”定义,那么加不加该注解都没有影响。加上该注解能够更好地让编译器进行检查。如果编写的不是函数式接口,但是加上了@FunctionInterface,那么编译器会报错。
  • 在一个接口中定义两个自定义的方法,就会产生Invalid ‘@FunctionalInterface’ annotation; FunctionalInterfaceTest is not a functional interface错误.

响应式

响应式流(Reactive Streams)通过定义一组实体,接口和互操作方法,给出了实现异步非阻塞背压的标准。第三方遵循这个标准来实现具体的解决方案,常见的有Reactor,RxJava,Akka Streams,Ratpack等。
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式
一个通用的流处理架构一般会是这样的(生产者产生数据,对数据进行中间处理,消费者拿到数据消费)
notion image
  • 数据来源,一般称为生产者(Producer)
  • 数据的目的地,一般称为消费者(Consumer)
  • 在处理时,对数据执行某些操作一个或多个处理阶段。(Processor)
规范定义了4个接口
notion image
在响应式流上提到了back pressure(背压)这么一个概念。在响应式流实现异步非阻塞是基于生产者和消费者模式的,而生产者消费者很容易出现的一个问题就是:生产者生产数据多了,就把消费者给压垮了
通俗就是: 消费者能告诉生产者自己需要多少量的数据。这里就是Subscription接口所做的事
特质
即时响应性: :只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。
  • *回弹性:**系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。
弹性: 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性
  • *消息驱动:**反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
notion image
大型系统由多个较小型的系统所构成, 因此整体效用取决于它们的构成部分的反应式属性。 这意味着, 反应式系统应用着一些设计原则,使这些属性能在所有级别的规模上生效,而且可组合。

Reactive

在reactor中有两个最基本的概念,发布者和订阅者,可以理解为生产者和消费者的概念。在Reactor中发布者有两个,一个是Flux,一个是Mono。 Flux代表的是0-N个元素的响应式序列,而Mono代表的是0-1个的元素的结果。
在Reactive中
  • Publisher(发布者)相当于生产者(Producer)
  • Subscriber(订阅者)相当于消费者(Consumer)
  • Processor就是在发布者与订阅者之间处理数据用的

Mono (返回0或1个元素)

Mono 是响应流 Publisher 具有基础 rx 操作符。可以成功发布元素或者错误。如图所示:
notion image

常用方法

Flux (返回0-n个元素)

Flux 是响应流 Publisher 具有基础 rx 操作符。可以成功发布 0 到 N 个元素或者错误。Flux 其实是 Mono 的一个补充。如图所示:
notion image
所以要注意:如果知道 Publisher 是 0 或 1 个,则用 Mono。
Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 可以发布 Iterable 类型的元素。
notion image
当调用just方法,查看源码可以得知,返回的是一个Flux<T>对象,当次数为0直接返回空,为1 的时候单独处理,其余的通过onAssembly方法处理返回fluxarrary对象。
流程
notion image

内置的 Processor

Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber)。 所以你能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。
一直在聊响应式流的四个接口中的三个:Publisher、Subscriber、Subscription,唯独Processor迟迟没有提及。原因在于想用好它们不太容易,多数情况下,我们应该进行避免使用Processor,通常来说仅用于一些特殊场景。
Reactor Core 内置多种 Processor。这些 processor 具有不同的语法,大概分为三类。
  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):这些 processors 只能通过直接 调用 Sink 的方法来推送数据。
  • 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):这些 processors 既可以直接调用 Sink 方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据。
  • 异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了 RingBuffer 的数据结构来缓存多个来自上游的数据,因此更加有健壮性。
异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。
 

DirectProcessor

DirectProcessor 可以将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 即可。另一方面,它的不足是无法处理背压。所以,当DirectProcessor推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个IllegalStateException。
一旦 Processor 结束(通常通过调用它的 Sink 的 error(Throwable) 或 complete() 方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。
 

UnicastProcessor

UnicastProcessor可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者(上一节的例子通过publish转换成了ConnectableFlux,所以可以接入两个订阅者)。
UnicastProcessor有多种选项,因此提供多种不同的create静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。
可以通过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。如果给出的队列是有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。
在“有限的”队列中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

EmitterProcessor

EmitterProcessor能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个发布者并同步获得数据。
最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由bufferSize定义。 之后如果仍然没有订阅者订阅它并消费数据,对onNext的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。
因此第一个订阅者会收到最多bufferSize个元素。然而之后,后续接入的订阅者只能获取到它们开始订阅之后推送的数据。这个内部的缓存会继续用于背压的目的。
默认情况下,如果所有的订阅者都取消了订阅,它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create 静态工厂方法的 autoCancel 参数来配置。

ReplayProcessor

ReplayProcessor会缓存直接通过自身的 Sink 推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。
可以通过多种配置方式创建它:
  • 缓存一个元素(cacheLast)。
  • 缓存一定个数的历史元素(create(int)),所有的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。

TopicProcessor

TopicProcessor是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置shared(build() 的 share(boolean) 配置)。
如果你企图在并发环境下通过并发的上游发布者调用TopicProcessor的onNext、 onComplete,或onError方法,就必须配置shared。否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。
TopicProcessor能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出onError或onComplete信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法executor指定,通过提供一个有限线程数的 ExecutorService来限制这一个数。
这个 processor 基于一个RingBuffer数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在RingBuffer中的索引。
这个 processor 也有一个autoCancel构造器方法:如果设置为true(默认的),那么当 所有的订阅者取消之后,上游发布者也就被取消了。

WorkQueueProcessor

WorkQueueProcessor也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置shared(它多数构造器配置与TopicProcessor相同)。
它放松了对响应式流规范的兼容,但是好处就在于相对于TopicProcessor来说需要更少的资源。 它仍然基于RingBuffer,但是不再要求每一个订阅者都关联一个线程,因此相对于TopicProcessor来说更具扩展性。
代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式(无法保证完全公平的循环分发)。
WorkQueueProcessor多数构造器方法与TopicProcessor相同,比如autoCancel、share, 以及waitStrategy。下游订阅者的最大数目同样由构造器executor配置的ExecutorService 决定。
  • *注意:**最好不要有太多订阅者订阅WorkQueueProcessor,因为这会锁住 processor。如果你需要限制订阅者数量,最好使用一个ThreadPoolExecutor或 ForkJoinPool。这个 processor 能够检测到(线程池)容量并在订阅者过多时抛出异常。

完成信号

对于 Flux和Mono来说,just 是数据完成的信号,那如果不是通过just声明的数据流,没有这种数据准备完成的信号,那么这个流就是一个无限流。除了我们手动声明数据准备的完成,错误信号也标志这整个流的完成。
还有一种情况就是当Flux和Mono没有发出任何一个元素,而是直接发出了完成信号,那么这个流就是一个空的流,像这样。
还有很重要的一点就是 Flux.just(1,2,4) 只是定义了一个数据流而已,在subscribe() 之前的操作什么也不会发生,类似 Stream的惰性求值,在中止操作之前的操作都不会触发。
例如打印声明的数据流需要这样做
另外 subscribe 时,还可以指定错误的回调处理,以及数据处理完的完成回调
notion image
所以可以这样写
流程:
notion image

流量控制(背压)

上面提到了一个问题,当生产者生产的速度远远大于消费者消费的的速度的时候,会引发任务大量堆积的情况,最终压垮整个管道。
notion image
那么响应式是怎么解决这个问题的,通过背压(back pressure)的机制,如下图
notion image
这种下游可以向上游反馈自己消费能力的机制就叫做背压,具体背压的原理和运行机制会在后面的实战中带入,因为很多刚接触这种概念的同学只听理论的话会一时很难理解。
通过 Reactor提供的BaseSubscriber来进行自定义我们自己流量控制的subscriber

Reactor中的多线程

在我们java的传统的编程中,对于线程之间的调度有封装好的线程池工具类供我们使用,或者我们可以通过线程池的构造函数定义自己的线程池,这一切都让多线程的调度都变得很容易,那么在reactor中怎么处理线程的调度
4.1 Schedulers
在reactor中处理线程调度的不叫thread pool,而是Schedulers(调度器),通过调度器就可以创建出供我们直接使用的多线程环境。
4.1.1 Schedulers.immediate()
在当前线程中使用
4.1.2 Schedulers.single()
创建一个可重用的单线程环境,该方法的所有调用者都会重复使用同一个线程。
4.1.3 Schedulers.elastic()
创建一个弹性线程池,会重用空闲线程,当线程池空闲时间过长就会自动废弃掉。通常使用的场景是给一个阻塞的任务分配自己的线程,从而不会影响到其他任务的执行。
4.1.4 Schedulers.parallel()
创建一个固定大小的线程池,线程池大小和cpu个数相等。
来看一个具体使用的实例,通过 Schedulers.elastic() 将一个同步阻塞的方法改写成异步的。
简单分析上述代码,通过fromCallable声明 一个callable 的mono,然后通过subscribeOn 切换环境,调度任务到单独的弹性线程池工作。

错误处理

在传统的编程中,我们处理单个接口错误的方式,可能是 try-catch-finally的方式,也可能是try-winth-resource的语法糖,这些在reactor中变得不太一样。下面来说一说reactor中的几种错误处理方式。
5.1 onErrorReturn
onErrorReturn在发生错误的时候,会提供一个缺省值,类似于安全取值的问题,但是这个在响应式流里面会更加实用。
这样就可以在处理一些未知元素的时候,又不想让未知因素中止程序的继续运行,就可以采取这种方式。
5.2 onErrorResume
在发生错误的时候,提供一个新的流或者值返回,这样说可能不太清楚,看代码。
类似于错误的一个callback;
5.3 onErrorMap
上面的都是我们去提供缺省的方法或值处理错误,但是有的时候,我们需要抛出错误,但是需要将错误包装一下,可读性好一点,也就是抛出自定义异常。
5.4 doOnError 记录错误日志
在发生错误的时候我们需要记录日志,在reactor里面专门独立出api记录错误日志
doOnError 对于流里面的元素只读,也就是他不会对流里面的任务元素操作,记录日志后,会讲错误信号继续抛到后面,让后面去处理。
5.5 finally 确保做一些事情
有的时候我们想要像传统的同步代码那样使用finally去做一些事情,比如关闭http连接,清理资源,那么在reactor中怎么去做finally
或者当我们打开一个连接需要关闭资源的时候,还可以这样写
使用using函数的三个参数,获取client,发送请求,关闭资源。
5.6 retry 重试机制
当遇到一些不可控因素导致的程序失败,但是代码逻辑确实是正确的,这个时候需要重试机制。
但是需要注意的是重试不是从错误的地方开始重试,相当于对publisher 的重订阅,也就是从零开始重新执行一遍,所以无法达到类似于断点续传的功能,所以使用场景还是有限制。

如何调试reactor

在我们传统阻塞代码里面,调试(Debug)的时候是一件非常简单的事情,通过打断点,得到相关的stack 的信息,就可以很清楚的知道错误信息(不过在多线程的环境下去打断点,需要切换线程环境,也有点麻烦)。
但是在reactor 环境下去调试代码并不是一件简单的事情,最常见的就是 一个 Flux流,怎么去得到每个元素信息,怎么去知道在管道里面下一个元素是什么,每个元素是否像期望的那样做了操作。所以这也是从传统编程切换到响应式编程的难点,开发人员需要花时间去学习这个操作,但是感觉难受总是好的,因为做什么都太容易的话,自己会长期止步于此,像早期的EJB到j2ee,ssh -> ssm -> spring boot -> spring cloud ,从微服务->service mesh -> serve less ,到现在一些一线大厂盛行的中台。也许这一次就是改变自己的时候。
言归正传,关于比较复杂的调试后期再说,我们先从最基本的单元测试开始。官方推荐的工具是 StepVerifier
  1. 创建测试的异步流
  1. 测试期望的值
  1. 测试是否完成
  1. 验证
我们通常使用create方法创建基于Flux或Mono的StepVerifier,然后就可以进行以下测试:
  • 测试期望发出的下一个信号。如果收到其他信号(或者信号与期望不匹配),整个测试就会 失败(AssertionError),如expectNext(T...)或expectNextCount(long)。`
  • 处理(consume)下一个信号。当你想要跳过部分序列或者当你想对信号内容进行自定义的校验的时候会用到它,可以使用consumeNextWith(Consumer<T>)。
  • 其他操作,比如暂停或运行一段代码。比如,你想对测试状态或内容进行调整或处理, 你可能会用到thenAwait(Duration)和then(Runnable)。
对于终止事件,相应的期望方法(如expectComplete()、expectError(),及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了。最后你只能对 StepVerifier 进行一些额外的配置并 触发校验(通常调用verify()及其变体方法)。
从StepVerifier内部实现来看,它订阅了待测试的 Flux 或 Mono,然后将序列中的每个信号与测试 场景的期望进行比对。如果匹配的话,测试成功。如果有不匹配的情况,则抛出AssertionError异常。
响应式流是一种基于时间的数据流。许多时候,待测试的数据流存在延迟,从而持续一段时间。如果这种场景比较多的话,那么会导致自动化测试运行时间较长。因此StepVerifier提供了可以操作“虚拟时间”的测试方式,这时候需要使用StepVerifier.withVirtualTime来构造。
为了提高 StepVerifier 正常起作用的概率,它一般不接收一个简单的 Flux 作为输入,而是接收 一个Supplier,从而可以在配置好订阅者之后 “懒创建”待测试的 flux,如:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1))) //... 继续追加期望方法
有两种处理时间的期望方法,无论是否配置虚拟时间都是可用的:
  • thenAwait(Duration)会暂停校验步骤(允许信号延迟发出)。
  • expectNoEvent(Duration)同样让序列持续一定的时间,期间如果有任何信号发出则测试失败。 在普通的测试中,两个方法都会基于给定的持续时间暂停线程的执行。而如果是在虚拟时间模式下就相应地使用虚拟时间。
  1. expectNoEvent 将订阅(subscription)也认作一个事件。假设你用它作为第一步,如果检测 到有订阅信号,也会失败。这时候可以使用expectSubscription().expectNoEvent(duration) 来代替;
  1. 期待“一天”内没有信号发生;
  1. verify或变体方法最终会返回一个Duration,这是实际的测试时长。
3.1 map
这里的map和java 8 stream的map是同一个意思,用于元素的转换,像这样
还是之前的代码,只是对每一个元素都自增加一,这里就不多说了,对lambada熟悉的同学都了解。
3.2 flatmap
flatmap也是对元素的转换,但是不同的是flatmap是将元素转换为流,再将流合并为一个大的流。
tips :flatmap 和 map的区别 从源码上来看 map就是一个function函数,输入一个输出一个,对于flatmap来讲它接受的是输出为Publisher的function,也就是说对于flatmap来讲 输入一个值,输出的是一个Publisher,所以map是一对一的关系,而flatmap 是一对多或者多对多的关系,并且两者输出也不一样。那flatmap 的应用场景在哪里,例如一个接口,入参是List<id>,用户id 的集合,需求是返回每个id 对应的具体信息,所以代码看起来就是这样 xx.flatmap(id->getUserInfoById(id))
3.3 filter
reactor 的filter和java 8 stream 的filter是一样的,就不多说了,这里过滤掉值为2 的
3.4 zip
这个是操作可能看起来比较陌生,顾名思义,“压缩”就是将多个流一对一的合并起来,还有一个原因,因为在每个flux流或者mono流里面,各个流的速度是不一样,zip还有个作用就是将两个流进行同步对齐。例如我们这里在加入另一个流,这个流会不停的发出元素,为了让大家可以感受到同,这里限制另一个流的速度为没1秒发出一个元素,这样合并的流也会向另一个流对齐。
上面讲的这四个是比较常用的,还有很多。

jdk9的响应式规范

JDK 9提供了对于Reactive的完整支持,JDK9也定义了上述提到的四个接口,在java.util.concurrent包上
Flow的源码

webFlux

Spring WebFlux 是 Spring 5 引入的响应式 Web 框架,它基于 Reactive Streams 标准,提供了异步非阻塞的编程模型,能够处理大量并发请求,提高系统的吞吐量和响应性能。
响应式编程是异步非阻塞的(是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式)
notion image
以往根据不同的应用场景选择不同的技术,有的场景适合用于同步阻塞的,有的场景适合用于异步非阻塞的。而Spring5提供了一整套响应式(非阻塞)的技术栈供我们使用(包括Web控制器、权限控制、数据访问层等等)。
响应式一般用Netty或者Servlet 3.1的容器(因为支持异步非阻塞),而Servlet技术栈用的是Servlet容器
notion image
Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux
WebFlux使用的响应式流并不是用JDK9平台的,而是Reactor响应式流库为啥?因为人家是兄弟公司!

Spring web的注解路由

从调用者(浏览器)的角度而言,是感知不到有什么变化的,因为都是得等待5s才返回数据。但是,从服务端的日志我们可以看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。
这正是WebFlux的好处:能够以固定的线程来处理高并发(充分发挥机器的性能)。
 

SSE路由

WebFlux还支持服务器推送(SSE - >Server Send Event),我们来看个例子:
效果就是每秒会给浏览器推送数据:大内密探00x
 

路由函数(Router Functions)

Spring WebFlux 提供了路由函数的方式来定义请求路由,这是一种函数式的路由定义方式,相比于传统的注解方式更加灵活。
 

核心组件

1.HttpHandler
是一种带有处理 HTTP 请求和响应方法的简单契约。
2.WebHandler
notion image
webHandler显得有一些抽象,我们可以通过对比SpringMVC的一些组件帮助大家理解一下在WebFlux中各个组件的作用:
notion image
notion image
 

处理函数(Handler Functions)

处理函数是处理请求的具体逻辑,它接收请求并返回响应。
 

WebClient

WebClient 是 Spring WebFlux 提供的非阻塞的 HTTP 客户端,用于发起异步 HTTP 请求。
 
 

错误处理

在 WebFlux 中,可以通过全局异常处理器来处理请求过程中发生的异常。
 

请求处理流程

notion image
RouterFunctionMapping中有private RouterFunction<?> routerFunction;这里面表面看起来只有一个Bean,其实它里面组合了非常多的RouterFunction,它是如何根据用户的请求找到对应的Function的呢?
关键部分就是通过它的成员变量routerFunction的route方法来找,其实就是通过用户写的predicate来判断请求是否相符合,如果符合就返回一个Mono<HandlerFunction<T>>
notion image

总结

反应式编程框架主要采用了观察者模式,而 Spring Reactor的核心则是对观察者模式的一种衍伸。关于观察者模式的架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这时就出现了两种情况:
  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件好比观察者在等米下锅,程序等待)。
  • 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。为了方便下面理解Mono和Flux,也可以理解为Publisher(发布者也可以理解为被观察者)主动推送数据给Subscriber(订阅者也可以叫观察者),如果Publisher发布消息太快,超过了Subscriber的处理速度,如何处理。这时就出现了Backpressure(背压—–指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略)
  • WebFlux提升的其实往往是系统的伸缩性,对于速度的提升没有太多的明显。
  • Reactive 编程尽管没有新增大量的代码,然而编码(和调试)却是变得更为复杂
  • 现在面临的最大问题是缺少文档。在开发应用中给我们造成了最大障碍。且Spring WebFlux 尚未证明自身明显地优于Spring MVC
你有什么值得分享的高效学习方法?API网关之Gateway
Loading...