本篇文章写于2020年11月20日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。
原标题《往简单的方向深入理解,或许反应式编程更容易入门》
反应式编程虽然能提升性能,有诸多好处,却也带来一些弊端,增加代码的复杂度、高度的API
侵入(相当于依赖了一个JDK
)。
笔者个人认为,反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。当然,这并不是劝说大家从入门到放弃。反应式编程适合做对性能要求高的中间件,或者脱离业务的底层系统,例如网关、消息推送服务。
Reactive Streams
规范Reactor
是如何实现Reactive Streams
规范的- 拨开反应式编程中
Context
实现的神秘面纱 - 委托模式的使用与
BaseSubscriber
- 为
spring-data-r2dbc
实现多数据源动态切换
反应式编程Reactor
库完全实现了Reactive Streams
规范,Reactive Streams
定义了反应式编程的规范,如果你到Github
查看它,你将只会看到这四个接口:Publisher
、Subscriber
、Subscription
、Processor
。
在了解这几个接口之前,我们需要先了解什么是反应式编程。
本文观点仅站在笔者的个人角度理解,正确性与笔者的水平有关,读者在阅读本篇文章过程中,如果有疑惑的地方欢迎留言探讨!
以往阻塞式编程我们发起远程调用等I/O操作都是阻塞当前线程以等待接口的响应,待接收到响应后再消费响应结果。在等待过程中该线程会一直处于空闲状态,而如果是反应式编程,在发起请求后,当前线程就会转去做别的事情,直到接收到响应结果,再发布响应结果给订阅者继续消费响应结果,当然,发起网络请求不在Reactor
库的职责范围内。
反应式编程,由发布者通过发布数据传递给订阅者消费数据。
反应式流指的是一个原始数据经过多重操作或者转化后,最终被订阅者消费。而每一步操作或转化也都是一次数据的发布订阅,这些发布订阅按顺序组合到一起就构成了反应式流。
Reactive Streams
规范
Publisher
:发布者;Subscriber
:订阅者;Subscription
:订阅,用于连接发布者和订阅者;Processor
:处理器,即是订阅者也是发布者;
由于这几个接口只是Reactive Streams
定义的规范,详细执行过程需要结合Reactive Streams
规范的实现库理解,因此我们先简单熟悉一下这几个接口,然后再介绍Reactor
如何实现这些接口。
Publisher(发布者)
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
subscribe
:订阅者订阅发布者;
如果接触过WebFlux
或者用过Reactor
库应该对subscribe
方法不陌生,尽管还不了解工作原理,但至少我们知道,只有调用Mono/Flux
的subscribe
方法才会触发整个流执行。
即便没有接触过WebFlux
或者Reactor
,我们也应该都接触过Java8
提供的Stream
。Java8 Stream
只有遇到终止操作才会触发流的执行,而反应式编程Publisher
的subscribe
方法就相当于Java8 Stream
的终止操作。
在没有调用Publisher#subscribe
方法之前,一切操作都只是我们定义的执行计划,执行计划制定整个流的执行过程。
Subscriber(订阅者)
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
Reactive Streams
规范定义订阅者(Subscriber
)可以订阅四种事件:
onSubscribe
:通常由Publisher
调用,被此订阅者订阅的发布者执行Publisher#subscribe
方法时调用;onNext
:通常由Subscription
调用,Subscription#request
请求到数据时调用;onError
:通常由onNext
方法调用,当onNext
捕获到数据消费异常时被调用;onComplete
:通常由Subscription
调用,在数据全部被正常消费完成、没有错误导致订阅终止时被调用;
Subscription(订阅)
public interface Subscription {
void request(long n);
void cancel();
}
订阅操作(Subscription
),相当于是一个场景类。
request
:订阅者调用此方法请求指定数量的数据,在请求到数据时调用订阅者的onNext
方法传递数据给订阅者,通常在Subscriber
的onSubscribe
方法中被调用;cancel
:通常由订阅者调用此方法来取消订阅,此方法被调用后request
不再产生数据、不再触发订阅者的onNext
;
Reactor是如何实现Reactive Streams规范的
一个简单的Mono
使用例子如下。
public class MonoStu{
public static void main(String[] args){
// (1)
Mono.just(1)
// (2)
.subscribe(System.out::println);
}
}
我们将一步步分析此案例的执行流程,以此了解Reactor
是如何实现Reactive Streams
规范的。
阅读本文不需要读者去翻阅源码,当然,如果能结合源码一起看效果更佳。Mono
源码在reactor-core
库的reactor.core.publisher
包下。
public abstract class Mono<T>
implements Publisher<T> {
}
Mono
是一个抽象类,它实现了Reactive Streams
规范的Publisher
接口,并扩展发布者的操作以提供流式编程(反应式流)。
我们先看Mono#just
静态方法:
public abstract class Mono<T> implements Publisher<T> {
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
}
初次阅读源码,并且在不了解Reactor
库的情况下,笔者不建议大家去纠结onAssembly
方法,这也是学习方法,先掌握主干,再去关心细枝末节,所以我们选择忽略onAssembly
方法。
忽略onAssembly
之后的Mono#just
静态方法(后文同):
public abstract class Mono<T> implements Publisher<T> {
public static <T> Mono<T> just(T data) {
return new MonoJust<>(data);
}
}
just
方法返回一个MonoJust
对象,此类继承Mono
。类似于我们使用Builder
构造者模式,每调用一个方法都会返回this
,直到调用build
方法。只是Reactor
的Mono#just
返回的是一个新的Mono
对象。
本着阅读源码先掌握主干的宗旨,我们去掉了MonoJust
实现的其它接口,只关心它继承Mono
实现的方法。经过精简后MonoJust
的源码如下。
final class MonoJust<T> extends Mono<T> {
final T value;
MonoJust(T value) {
this.value = value;
}
// 把注意力集中到这
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
}
由于Mono
也是一个发布者,它实现了Publisher
接口,所以我们重点关注的是MonoJust
实现的subscribe
方法。
如上面源码所示,MonoJust#subscribe
方法的入参是一个CoreSubscriber
,说明在父类(Mono
)已经实现过Publisher
接口的subscribe
方法,源码如下。
public abstract class Mono<T> implements Publisher<T> {
@Override
public final void subscribe(Subscriber<? super T> actual) {
// 原本是onLastAssembly(this) ,
// 我们忽略onLastAssembly,直接使用this
this.subscribe(Operators.toCoreSubscriber(actual));
}
// 子类MonoJust实现
public abstract void subscribe(CoreSubscriber<? super T> actual);
}
现在我们只知道CoreSubscriber
应该是Subscriber
的子类,目前来说,了解到这一层足够了。
我们先继续看案例中的下一句#.subscribe(System.out::println)
,然后再回头分析MonoJust#subscribe
方法。
Mono
提供很多个subscribe
方法的重载,无论我们使用哪个重载方法,最后都会调用Mono
实现Publisher
接口的subscribe
方法,也就会调用到Mono
子类实现的subscribe
方法。
此案例中,我们调用subscribe
传入的是一个lambda
,对应是实现Consumer
接口的accept
方法,该Consumer
最终被包装成LambdaMonoSubscriber
,代码如下。
public abstract class Mono<T> implements Publisher<T> {
public final Disposable subscribe(Consumer<? super T> consumer) {
// 创建LambdaMonoSubscriber
return subscribeWith(new LambdaMonoSubscriber<>(consumer, null, null, null));
}
}
请把注意力集中在consumer
、new LambdaMonoSubscriber()
以及subscribeWith
方法上,不要走神哦,因为逻辑有点绕,一走神就看不懂了。
subscribeWith
方法源码如下:
public abstract class Mono<T> implements Publisher<T> {
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
// 调用的是Mono实现Publisher接口的subscribe方法
subscribe(subscriber);
// 刚入门不用考虑返回值Disposable,这是用于取消订阅的
// return subscriber;
}
}
subscribeWith
调用Mono
实现Publisher
接口的subscribe
方法,因此LambdaMonoSubscriber
必然是一个Subscriber
。
到此,我们应该关心发布者Mono
是如何传递数据给到订阅者(LambdaMonoSubscriber
)的,为此我们需要回头分析MonoJust#subscribe
方法。
final class MonoJust<T> extends Mono<T> {
final T value;
MonoJust(T value) {
this.value = value;
}
// 参数actual是LambdaMonoSubscriber
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
// 创建Subscription
Subscription subscription = Operators.scalarSubscription(actual, value);
actual.onSubscribe(subscription);
}
}
此处subscribe
的参数actual
就是订阅者LambdaMonoSubscriber
对象,发布者MonoJust
直接在subscribe
方法中调用订阅者的onSubscribe
方法。
Reactive Streams
规范中,订阅者Subscriber
的onSubscribe
方法要求传入的是一个Subscription
,所以MonoJust#subscribe
方法中需要将真实订阅者LambdaMonoSubscriber
和数据value
封装成一个Subscription
,这个Subscription
同时也是一个Subscriber
。
LambdaMonoSubscriber
会在onSubscribe
方法中调用Subscription
的request
请求数据。
LambdaMonoSubscriber
源码如下,我们忽略LambdaMonoSubscriber
构造方法传入的空参数,以简化LambdaMonoSubscriber
的代码,便于阅读。
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
// consumer是案例传递的:System.out::println
final Consumer<? super T> consumer;
// 现在不要去考虑为什么要使用volatile
volatile Subscription subscription;
LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer) {
this.consumer = consumer;
}
// s = MonoJust中的Operators.scalarSubscription(actual, value)返回值
@Override
public final void onSubscribe(Subscription s) {
this.subscription = s;
// 请求数据
s.request(Long.MAX_VALUE);
}
}
由于我们研究的是Mono
,所以onSubscribe
方法请求获取所有的数据s.request(Long.MAX_VALUE)
。
在分析此方法之前,得要知道Operators.scalarSubscription
返回的Subscription
是一个ScalarSubscription
(同步订阅的实现)。
static final class ScalarSubscription<T> {
// 这里是LambdaMonoSubscriber
final CoreSubscriber<? super T> actual;
// 数据
final T value;
ScalarSubscription(CoreSubscriber<? super T> actual, T value) {
this.value = Objects.requireNonNull(value, "value");
this.actual = Objects.requireNonNull(actual, "actual");
}
@Override
public void request(long n) {
Subscriber<? super T> a = actual;
a.onNext(value);
a.onComplete();
}
}
为了简单,笔者又把ScalarSubscription
代码去掉了很多,这样看起来较为容易理解。
真实订阅者actual
与数据value
都是在构造方法中传入的,在案例中,actual
就是LambdaMonoSubscriber
,value
就是调用Mono#just
传入的1
。
当调用LambdaMonoSubscriber#onSubscribe
方法时,ScalarSubscription#request
被调用,request
方法中直接调用真实订阅者actual
(LambdaMonoSubscriber
)的onNext
方法传递数据value
,并且在onNext
方法执行结束之后,订阅者actual
(LambdaMonoSubscriber
)的onComplete
方法被调用。
此处订阅者actual
是LambdaMonoSubscriber
,LambdaMonoSubscriber#onNext
方法经过笔者修剪后的源码如下:
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
@Override
public final void onNext(T x) {
if (consumer != null) {
try {
// 执行main方法中subscribe传入的lambda==>System.out::println
consumer.accept(x);
}catch (Throwable t) {
Operators.onErrorDropped(t, Context.empty());
}
}
}
}
onNext
中调用的consumer
正是我们传递的lambda
表达式,也就是打印输出订阅到的数据。
此案例分析并未看到异步的实现,这是因为我们往最简单的不需要异步的场景分析。
总结此案例的执行流程如下:
- 1、调用
Mono#just
创建一个MonoJust
发布者,并且参数传递的value
将是该发布者需要发布的数据; - 2、调用
MonoJust#subscribe(Consumber lambda)
传递一个消费者消费数据,而该消费者会被包装成一个订阅者LambdaMonoSubscriber
; - 3、
MonoJust#subscribe(CoreSubscriber)
被调用,在该方法中调用了Operators.scalarSubscription(actual, value)
创建ScalarSubscriptin
, 并且调用了订阅者LambdaMonoSubscriber
的onSubscribe
方法; - 4、
LambdaMonoSubscriber#onSubscribe
被调用,在该方法中调用了ScalarSubscription#request
请求数据; - 5、
ScalarSubscription#request
调用真实订阅者LambdaMonoSubscriber
的onNext
方法并传递数据,在onNext
方法执行完成后调用onComplete
方法; - 6、
LambdaMonoSubscriber#onNext
调用案例中的lambda
表达式消费数据。
现在我们再来丰富下案例:
public class MonoStu{
public static void main(String[] args){
// (1)
Mono.just(1)
// (2)
.map(String::valueOf)
// (3)
.subscribe(System.out::println);
}
}
我们增加了map(String::valueOf)
操作,计划在订阅到MonoJust
传递的数据时,将数据转为字符串,再将转化为字符串后的数据传递给真实订阅者订阅。
我们已经知道Mono.just
返回的是一个MonoJust
,那么map
返回的是什么呢?
以下是笔者修剪后的Mono#map
源码:
public abstract class Mono<T> implements Publisher<T> {
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
return new MonoMap<>(this, mapper);
}
}
可见, map
返回的是一个MonoMap
:
final class MonoMap<T, R> extends MonoOperator<T, R> {
final Function<? super T, ? extends R> mapper;
MonoMap(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = mapper;
}
@Override
public void subscribe(CoreSubscriber<? super R> actual) {
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
}
}
这里出现了一个新的抽象类MonoOperator
,MonoOperator
我们以后会用得很多,例如:R2DBC
动态数据源切换、Sentinel
对WebFlux
的支持都会用到。
MonoOperator
是将一个Mono
(source
)转为一个新的Mono
(此处是MonoMap
),当MonoMap
的subscribe
方法被调用时再调用source
的subscribe
方法,这样就能将两个发布者Mono
串连起来了。
由于逻辑比较绕,我们先整理下逻辑:
- 1、顺序操作
首先案例中,我们是先创建MonoJust
,然后才创建MonoMap
,所以最后调用的是MonoMap
的subscribe
方法;
- 2、倒序订阅
在(1)之后,先是MonoMap
的subscribe
方法被执行,然后由MonoMap
调用MonoJust
的subscribe
;
// source是MonoJust
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
- 3、顺序消费数据
在(2)之后,就先是MonoJust
的onSubscribe
方法被调用,所以MonoJust
的onNext
方法先被执行,然后再到MonoMap
的onSubscribe
方法被调用,所以MonoMap
的onNext
方法后执行;
所以,在此案例中,MonoMap
的subscribe
方法传递的参数才是真实的订阅者System.out::println
,而source
是MonoJust
。在MonoMap#subscribe
方法被调用时,先调用MonoJust
的subscribe
方法,并将真实订阅者封装成FluxMap.MapSubscriber
传递给MonoJust
,让MonoJust
认为FluxMap.MapSubscriber
是真实订阅者,当FluxMap.MapSubscriber
的onSubscribe
方法被调用时,再由它调用真实订阅者的onSubscribe
方法。其实是用了委托设计模式。
到此,复杂一点的例子我们也分析完成了,实际上,很多的操作都是通过MonoOperator
实现,这也是能实现上下文Context
传递的原因,所以接下来我们将分析Context
的实现。
拨开反应式编程中Context实现的神秘面纱
根据上一节总结的多个操作(发布-订阅)组合成一个流的执行流程为:顺序操作、倒序订阅、顺序消费数据,试想如何让一个Context
在流中传递呢?
- 倒序订阅:假设在流的中间某个
Mono
创建Context
,可通过subscribe
方法层层往上传递Context
; - 顺序消费:在创建
Context
的Mono
之前的Mono
都可以使用到这个Context
,而在这个Mono
之后的Mono
就获取不到该Context
;
我们画个图来理解:
一个使用Context
的简单案例:
public class ContextUseMain{
private static void testMono() {
// MonoMap
Mono<Integer> mono = Mono.just(1)
.subscriberContext(context -> {
// 可以获取到xxxx
System.out.println(context.get("xxxx").toString());
return context;
})
.map(x -> x * x);
// MonoSubscriberContext
mono = mono.subscriberContext(context -> context.put("xxxx", System.currentTimeMillis()));
// MonoMap
mono = mono.map(x -> (x + 1) * 2)
.subscriberContext(context -> {
// 这里会报空指针
System.out.println(context.get("xxxx").toString());
return context;
});
// 开始订阅
mono.subscribe(System.out::println);
}
}
mono.subscriberContext()
为什么能够获取到Context
,首先subscriberContext
返回的是一个MonoSubscriberContext
:
public abstract class Mono<T> implements Publisher<T> {
public final Mono<T> subscriberContext(Function<Context, Context> doOnContext) {
return new MonoSubscriberContext<>(this, doOnContext);
}
}
MonoSubscriberContext
类继承MonoOperator
,源码如下:
final class MonoSubscriberContext<T> extends MonoOperator<T, T> implements Fuseable {
final Function<Context, Context> doOnContext;
MonoSubscriberContext(Mono<? extends T> source,
Function<Context, Context> doOnContext) {
super(source);
this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
// (1) 获取前一个订阅者的Context
Context c = actual.currentContext();
try {
// (2) 回调方法,可往Context put key-value,返回新的Context
c = doOnContext.apply(c);
}
catch (Throwable t) {
Operators.error(actual, Operators.onOperatorError(t, actual.currentContext()));
return;
}
// (3) ContextStartSubscriber即是Subscription,也是CoreSubscriber
Subscription subscription = new FluxContextStart.ContextStartSubscriber<>(actual, c);
// 调用前一个Mono的subscribe方法
source.subscribe(subscription);
}
}
- (1):调用订阅者的
currentContext
获取订阅者的Context
,如果不存在则会返回Context.empty()
(一个空的Context
);
CoreSubscriber
提供获取Context
的API
:
public interface CoreSubscriber<T> extends Subscriber<T> {
default Context currentContext(){
return Context.empty();
}
}
(2):调用
Function
获取新的Context
,如果往currentContext
获取的Context
put
一个key-value
,那么就会创建新的Context
;(3):
FluxContextStart.ContextStartSubscriber
即是Subscription
也是Subscriber
。在构造方法中传递了Context
,所以,如果在FluxContextStart.ContextStartSubscriber
中重写currentContext
方法,就能获取到Context
;
FluxContextStart.ContextStartSubscriber
源码如下(有修剪):
static final class ContextStartSubscriber<T> implements ConditionalSubscriber<T>, InnerOperator<T, T>, QueueSubscription<T> {
final CoreSubscriber<? super T> actual;
final Context context;
ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
this.actual = actual;
this.context = context;
}
@Override
public Context currentContext() {
return this.context;
}
}
Context.empty()
创建的是Context0
:
public interface Context {
static Context empty() {
return Context0.INSTANCE;
}
}
再来看Context0
的put
方法:
final class Context0 implements Context {
@Override
public Context put(Object key, Object value) {
return new Context1(key, value);
}
}
可见,调用Context0#put
创建的是Context1
。
Context1
源码如下:
class Context1 implements Context, Map.Entry<Object, Object> {
final Object key;
final Object value;
Context1(Object key, Object value) {
this.key = key;
this.value = value;
}
}
如果继续调用Context1#put
创建的是Context2
。
Context2
源码如下:
final class Context2 implements Context {
final Object key1;
final Object value1;
final Object key2;
final Object value2;
Context2(Object key1, Object value1, Object key2, Object value2) {
this.key1 = key1;
this.value1 = value1;
this.key2 = key2;
this.value2 = value2;
}
}
再继续调用Context2#put
创建的是Context3
,往下亦如此。
每次put
返回一个新的Context
目的是避免多线程加锁,同时也能实现数据隔离:后续操作(Mono
)不能获取之前操作(Mono
)put
的数据。
委托模式的使用与BaseSubscriber
在介绍BaseSubscriber
之前,我们先学习一个新的API:transform
。Mono/Flux
的transform
方法允许将一个Mono/Flux
转为一个MonoOperator/FluxOperator
,将订阅委托给该MonoOperator/FluxOperator
;
看个简单的案例:
public class BaseSubscriberUseMain{
public static void main(String[] args){
Mono<?> mono = createMono();
// transform 将原mono转为新的mono
mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
@Override
public void subscribe(CoreSubscriber actual) {
source.subscribe(actual);
}
});
mono.subscribe();
}
}
这样我们就可以将MonoOperator
的subscribe
方法参数传递的订阅者替换为我们自己实现的订阅者,修改后的案例如下:
public class BaseSubscriberUseMain{
public static void main(String[] args){
Mono<?> mono = createMono();
// transform 将原mono转为新的mono
mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
@Override
public void subscribe(CoreSubscriber actual) {
source.subscribe(new ActualSubscriberDelegater(actual));
}
});
mono.subscribe();
}
}
ActualSubscriberDelegater
继承BaseSubscriber
,实现hook
方法:
public class ActualSubscriberDelegater<T> extends BaseSubscriber<T> {
/**
* 真实的订阅者
*/
private CoreSubscriber<? super T> actual;
public ActualSubscriberDelegater(CoreSubscriber<? super T> actual) {
super();
this.actual = actual;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
actual.onSubscribe(subscription);
}
@Override
protected void hookOnNext(T value) {
// 获取订阅者的Context
Long time = (Long) actual.currentContext().get("xxxx");
actual.onNext(value);
}
@Override
protected void hookOnComplete() {
actual.onComplete();
}
@Override
protected void hookOnError(Throwable throwable) {
actual.onError(throwable);
}
}
看起来ActualSubscriberDelegater
代理了真实订阅者的所有行为,但这不是代理模式,而是委托模式。
hookOnSubscribe
:该方法在父类的onSubscribe
方法被调用时调用;hookOnNext
:该方法在父类的onNext
方法被调用时调用;hookOnComplete
:该方法在父类的onComplete
方法被调用时调用;hookOnError
:该方法在父类的onError
方法被调用时调用;
这样一来,ActualSubscriberDelegater
就可用来实现调试、打印日记等操作。
Alibaba Sentinel
也是通过transform API
与实现BaseSubscriber
组合使用适配反应式Reactor
库的,我们也可以使用此统计一个接口的执行耗时。
为spring-data-r2dbc
实现多数据源动态切换
hotkit-r2dbc
是笔者个人的开源项目,封装spring-data-r2dbc
多数据源动态切换的实现。
Github链接
:https://github.com/wujiuye/hotkit-r2dbc
在了解Reactor
的Context
之后,再来看笔者是如何为hotkit-r2dbc
实现多数据源动态切换就会觉得很简单,看完之后,你不需要使用hotkit-r2dbc
,也能自己为spring-data-r2dbc
实现动态数据源切换。
spring-data-r2dbc
提供连接工厂路由类:AbstractRoutingConnectionFactory
,我们只需要继承AbstractRoutingConnectionFactory
并实现它的determineCurrentLookupKey
方法,在该方法被调用时返回正确的数据源key
即可。
/**
* ConnectionFactory路由
*
* @author wujiuye 2020/11/03
*/
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
private final static String DB_KEY = "HOTKIT-R2DBC-DB";
public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {
// ....
setTargetConnectionFactories(connectionFactoryMap);
//....
}
@Override
protected Mono<Object> determineCurrentLookupKey() {
return Mono.subscriberContext().handle((context, sink) -> {
if (context.hasKey(DB_KEY)) {
sink.next(context.get(DB_KEY));
}
});
}
}
当然,这样还不行,还需要提供一个方法,让切面可以获取到Context
并写入数据源,所以完整的连接工厂路由器应该是这样的:
/**
* ConnectionFactory路由
*
* @author wujiuye 2020/11/03
*/
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {
private final static String DB_KEY = "HOTKIT-R2DBC-DB";
public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {
// ....
setTargetConnectionFactories(connectionFactoryMap);
//....
}
// 写入数据源
public static <T> Mono<T> putDataSource(Mono<T> mono, String dataSource) {
return mono.subscriberContext(context -> context.put(DB_KEY, dataSource));
}
// 写入数据源
public static <T> Flux<T> putDataSource(Flux<T> flux, String dataSource) {
return flux.subscriberContext(context -> context.put(DB_KEY, dataSource));
}
@Override
protected Mono<Object> determineCurrentLookupKey() {
return Mono.subscriberContext().handle((context, sink) -> {
if (context.hasKey(DB_KEY)) {
sink.next(context.get(DB_KEY));
}
});
}
}
切面类的实现如下:
@Component
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DynamicDataSourceAop {
@Pointcut(value = "@annotation(com.wujiuye.hotkit.r2dbc.annotation.R2dbcDataBase)")
public void point() {
}
@Around(value = "point()")
public Object aroudAop(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
R2dbcDataBase dataSource = method.getAnnotation(R2dbcDataBase.class);
// 方法返回值类型为Mono
if (method.getReturnType() == Mono.class) {
return HotkitR2dbcRoutingConnectionFactory.putDataSource((Mono<?>) pjp.proceed(), dataSource.value());
}
// 方法返回值类型为Flux
else {
return HotkitR2dbcRoutingConnectionFactory.putDataSource((Flux<?>) pjp.proceed(), dataSource.value());
}
}
}
切面在目标方法执行完成返回Mono
或者Flux
之后,才调用返回值Mono
或Flux
的subscriberContext
方法将返回值Mono
转为一个MonoSubscriberContext
,或者将返回值Flux
转为一个FluxContextStart
。
以返回值类型为Mono
为例,在MonoSubscriberContext#subscribe
方法的回调Function
中为Context
写入数据源,在Mono
最终被订阅时,由MonoSubscriberContext
将Context
传递给订阅者。
注意数据源切面与事务切面的顺序问题,避免事务不生效。
用一个测试用例说明数据源切换的时机:
public class RoutingTest extends SupporSpringBootTest {
@Resource
private DatabaseClient client;
@Test
public void test() throws InterruptedException {
Mono<Void> operation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34)
.fetch()
.rowsUpdated()
.then();
// 切换数据源
Mono<Void> dbOperation = HotkitR2dbcRoutingConnectionFactory.putDataSource(operation,MasterSlaveMode.Slave);
dbOperation.subscribe();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
其中发布者operation
在真正执行Sql
之前肯定是先要从连接工厂路由器获取连接工厂的发布者,也就是连接工厂路由器的determineCurrentLookupKey
被调用。
因此,在operation
转为dbOperation
之后,订阅dbOperation
时为dbOperation
的订阅者写入Context
并put
数据源,Context
在流中传递,最终operation
的订阅者就能调用determineCurrentLookupKey
方法从订阅者的Context
获取到数据源,这就是多数据源切换的实现原理。
End
本篇只介绍Mono
的同步订阅流程,这是因为同步订阅更易于理解,Mono
相比Flux
也更容易介绍。
笔者建议:在了解本篇介绍的知识点之后,可阅读Flux
的源码,或者实现异步订阅的源码,加深对反应式编程库Reactor
库的理解。