深入浅出反应式编程原理,反应式编程入门

原创 吴就业 129 0 2020-11-20

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/0187df0671004511a97d248978a7329e

作者:吴就业
链接:https://wujiuye.com/article/0187df0671004511a97d248978a7329e
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年11月20日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

原标题《往简单的方向深入理解,或许反应式编程更容易入门》

反应式编程虽然能提升性能,有诸多好处,却也带来一些弊端,增加代码的复杂度、高度的API侵入(相当于依赖了一个JDK)。

笔者个人认为,反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。当然,这并不是劝说大家从入门到放弃。反应式编程适合做对性能要求高的中间件,或者脱离业务的底层系统,例如网关、消息推送服务。

反应式编程Reactor库完全实现了Reactive Streams规范,Reactive Streams定义了反应式编程的规范,如果你到Github查看它,你将只会看到这四个接口:PublisherSubscriberSubscriptionProcessor

在了解这几个接口之前,我们需要先了解什么是反应式编程。

本文观点仅站在笔者的个人角度理解,正确性与笔者的水平有关,读者在阅读本篇文章过程中,如果有疑惑的地方欢迎留言探讨!

以往阻塞式编程我们发起远程调用等I/O操作都是阻塞当前线程以等待接口的响应,待接收到响应后再消费响应结果。在等待过程中该线程会一直处于空闲状态,而如果是反应式编程,在发起请求后,当前线程就会转去做别的事情,直到接收到响应结果,再发布响应结果给订阅者继续消费响应结果,当然,发起网络请求不在Reactor库的职责范围内。

反应式编程,由发布者通过发布数据传递给订阅者消费数据。

反应式流指的是一个原始数据经过多重操作或者转化后,最终被订阅者消费。而每一步操作或转化也都是一次数据的发布订阅,这些发布订阅按顺序组合到一起就构成了反应式流。

Reactive Streams规范

由于这几个接口只是Reactive Streams定义的规范,详细执行过程需要结合Reactive Streams规范的实现库理解,因此我们先简单熟悉一下这几个接口,然后再介绍Reactor如何实现这些接口。

Publisher(发布者)

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

如果接触过WebFlux或者用过Reactor库应该对subscribe方法不陌生,尽管还不了解工作原理,但至少我们知道,只有调用Mono/Fluxsubscribe方法才会触发整个流执行。

即便没有接触过WebFlux或者Reactor,我们也应该都接触过Java8提供的StreamJava8 Stream只有遇到终止操作才会触发流的执行,而反应式编程Publishersubscribe方法就相当于Java8 Stream的终止操作。

在没有调用Publisher#subscribe方法之前,一切操作都只是我们定义的执行计划,执行计划制定整个流的执行过程。

Subscriber(订阅者)

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

Reactive Streams规范定义订阅者(Subscriber)可以订阅四种事件:

Subscription(订阅)

public interface Subscription {
    void request(long n);
    void cancel();
}

订阅操作(Subscription),相当于是一个场景类。

Reactor是如何实现Reactive Streams规范的

一个简单的Mono使用例子如下。

public class MonoStu{
    public static void main(String[] args){
        // (1)
        Mono.just(1)
        // (2)
            .subscribe(System.out::println);
    }
}

我们将一步步分析此案例的执行流程,以此了解Reactor是如何实现Reactive Streams规范的。

阅读本文不需要读者去翻阅源码,当然,如果能结合源码一起看效果更佳。Mono源码在reactor-core库的reactor.core.publisher包下。

public abstract class Mono<T> 
       implements Publisher<T> {
}

Mono是一个抽象类,它实现了Reactive Streams规范的Publisher接口,并扩展发布者的操作以提供流式编程(反应式流)。

我们先看Mono#just静态方法:

public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
		return onAssembly(new MonoJust<>(data));
	}
}

初次阅读源码,并且在不了解Reactor库的情况下,笔者不建议大家去纠结onAssembly方法,这也是学习方法,先掌握主干,再去关心细枝末节,所以我们选择忽略onAssembly方法。

忽略onAssembly之后的Mono#just静态方法(后文同):

public abstract class Mono<T> implements Publisher<T> {
    public static <T> Mono<T> just(T data) {
		return new MonoJust<>(data);
	}
}

just方法返回一个MonoJust对象,此类继承Mono。类似于我们使用Builder构造者模式,每调用一个方法都会返回this,直到调用build方法。只是ReactorMono#just返回的是一个新的Mono对象。

本着阅读源码先掌握主干的宗旨,我们去掉了MonoJust实现的其它接口,只关心它继承Mono实现的方法。经过精简后MonoJust的源码如下。

final class MonoJust<T> extends Mono<T> {

	final T value;

	MonoJust(T value) {
		this.value = value;
	}
    // 把注意力集中到这
	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		actual.onSubscribe(Operators.scalarSubscription(actual, value));
	}
}

由于Mono也是一个发布者,它实现了Publisher接口,所以我们重点关注的是MonoJust实现的subscribe方法。

如上面源码所示,MonoJust#subscribe方法的入参是一个CoreSubscriber,说明在父类(Mono)已经实现过Publisher接口的subscribe方法,源码如下。

public abstract class Mono<T> implements Publisher<T> {
    @Override
	public final void subscribe(Subscriber<? super T> actual) {
        // 原本是onLastAssembly(this) ,
        // 我们忽略onLastAssembly,直接使用this
		this.subscribe(Operators.toCoreSubscriber(actual));
	}
    // 子类MonoJust实现
	public abstract void subscribe(CoreSubscriber<? super T> actual);
}

现在我们只知道CoreSubscriber应该是Subscriber的子类,目前来说,了解到这一层足够了。

我们先继续看案例中的下一句#.subscribe(System.out::println),然后再回头分析MonoJust#subscribe方法。

Mono提供很多个subscribe方法的重载,无论我们使用哪个重载方法,最后都会调用Mono实现Publisher接口的subscribe方法,也就会调用到Mono子类实现的subscribe方法。

此案例中,我们调用subscribe传入的是一个lambda,对应是实现Consumer接口的accept方法,该Consumer最终被包装成LambdaMonoSubscriber,代码如下。

public abstract class Mono<T> implements Publisher<T> {
    public final Disposable subscribe(Consumer<? super T> consumer) {
        // 创建LambdaMonoSubscriber
		return subscribeWith(new LambdaMonoSubscriber<>(consumer, null, null, null));
	}
}

请把注意力集中在consumernew LambdaMonoSubscriber()以及subscribeWith方法上,不要走神哦,因为逻辑有点绕,一走神就看不懂了。

subscribeWith方法源码如下:

public abstract class Mono<T> implements Publisher<T> {
    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
        // 调用的是Mono实现Publisher接口的subscribe方法
		subscribe(subscriber);
        // 刚入门不用考虑返回值Disposable,这是用于取消订阅的
		// return subscriber;
	}
}

subscribeWith调用Mono实现Publisher接口的subscribe方法,因此LambdaMonoSubscriber必然是一个Subscriber

到此,我们应该关心发布者Mono是如何传递数据给到订阅者(LambdaMonoSubscriber)的,为此我们需要回头分析MonoJust#subscribe方法。

final class MonoJust<T> extends Mono<T> {
	final T value;
    
	MonoJust(T value) {
	    this.value = value;
	}
    
    // 参数actual是LambdaMonoSubscriber
	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
        //  创建Subscription
        Subscription subscription = Operators.scalarSubscription(actual, value);
		actual.onSubscribe(subscription);
	}
}

此处subscribe的参数actual就是订阅者LambdaMonoSubscriber对象,发布者MonoJust直接在subscribe方法中调用订阅者的onSubscribe方法。

Reactive Streams规范中,订阅者SubscriberonSubscribe方法要求传入的是一个Subscription,所以MonoJust#subscribe方法中需要将真实订阅者LambdaMonoSubscriber和数据value封装成一个Subscription,这个Subscription同时也是一个Subscriber

LambdaMonoSubscriber会在onSubscribe方法中调用Subscriptionrequest请求数据。

LambdaMonoSubscriber源码如下,我们忽略LambdaMonoSubscriber构造方法传入的空参数,以简化LambdaMonoSubscriber的代码,便于阅读。

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    
     // consumer是案例传递的:System.out::println
	final Consumer<? super T>  consumer;
    // 现在不要去考虑为什么要使用volatile
	volatile Subscription subscription;

	LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer) {
		this.consumer = consumer;
	}

    // s = MonoJust中的Operators.scalarSubscription(actual, value)返回值
	@Override
	public final void onSubscribe(Subscription s) {
	    this.subscription = s;
        // 请求数据
	    s.request(Long.MAX_VALUE);
	}

}

由于我们研究的是Mono,所以onSubscribe方法请求获取所有的数据s.request(Long.MAX_VALUE)

在分析此方法之前,得要知道Operators.scalarSubscription返回的Subscription是一个ScalarSubscription(同步订阅的实现)。

static final class ScalarSubscription<T> {
 
        // 这里是LambdaMonoSubscriber
		final CoreSubscriber<? super T> actual;
        // 数据
		final T value;
		
		ScalarSubscription(CoreSubscriber<? super T> actual, T value) {
			this.value = Objects.requireNonNull(value, "value");
			this.actual = Objects.requireNonNull(actual, "actual");
		}
		
		@Override
		public void request(long n) {
		    Subscriber<? super T> a = actual;
		    a.onNext(value);
		    a.onComplete();
		}	
}

为了简单,笔者又把ScalarSubscription代码去掉了很多,这样看起来较为容易理解。

真实订阅者actual与数据value都是在构造方法中传入的,在案例中,actual就是LambdaMonoSubscribervalue就是调用Mono#just传入的1

当调用LambdaMonoSubscriber#onSubscribe方法时,ScalarSubscription#request被调用,request方法中直接调用真实订阅者actualLambdaMonoSubscriber)的onNext方法传递数据value,并且在onNext方法执行结束之后,订阅者actualLambdaMonoSubscriber)的onComplete方法被调用。

此处订阅者actualLambdaMonoSubscriberLambdaMonoSubscriber#onNext方法经过笔者修剪后的源码如下:

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {
    @Override
    public final void onNext(T x) {
        if (consumer != null) {
            try {
                // 执行main方法中subscribe传入的lambda==>System.out::println
                consumer.accept(x);
            }catch (Throwable t) {
                Operators.onErrorDropped(t, Context.empty());
            }
        }
    }
}

onNext中调用的consumer正是我们传递的lambda表达式,也就是打印输出订阅到的数据。

此案例分析并未看到异步的实现,这是因为我们往最简单的不需要异步的场景分析。

总结此案例的执行流程如下:

现在我们再来丰富下案例:

public class MonoStu{
    public static void main(String[] args){
        // (1)
        Mono.just(1)
        // (2)
            .map(String::valueOf)
        // (3)
            .subscribe(System.out::println);
    }
}

我们增加了map(String::valueOf)操作,计划在订阅到MonoJust传递的数据时,将数据转为字符串,再将转化为字符串后的数据传递给真实订阅者订阅。

我们已经知道Mono.just返回的是一个MonoJust,那么map返回的是什么呢?

以下是笔者修剪后的Mono#map源码:

public abstract class Mono<T> implements Publisher<T> {
    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
		return new MonoMap<>(this, mapper);
	}
}

可见, map返回的是一个MonoMap

final class MonoMap<T, R> extends MonoOperator<T, R> {

	final Function<? super T, ? extends R> mapper;
	
	MonoMap(Mono<? extends T> source, Function<? super T, ? extends R> mapper) {
		super(source);
		this.mapper = mapper;
	}

	@Override
	public void subscribe(CoreSubscriber<? super R> actual) {
		source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));
	}
}

这里出现了一个新的抽象类MonoOperatorMonoOperator我们以后会用得很多,例如:R2DBC动态数据源切换、SentinelWebFlux的支持都会用到。

MonoOperator是将一个Monosource)转为一个新的Mono(此处是MonoMap),当MonoMapsubscribe方法被调用时再调用sourcesubscribe方法,这样就能将两个发布者Mono串连起来了。

由于逻辑比较绕,我们先整理下逻辑:

首先案例中,我们是先创建MonoJust,然后才创建MonoMap,所以最后调用的是MonoMapsubscribe方法;

在(1)之后,先是MonoMapsubscribe方法被执行,然后由MonoMap调用MonoJustsubscribe

// source是MonoJust
source.subscribe(new FluxMap.MapSubscriber<>(actual, mapper));

在(2)之后,就先是MonoJustonSubscribe方法被调用,所以MonoJustonNext方法先被执行,然后再到MonoMaponSubscribe方法被调用,所以MonoMaponNext方法后执行;

所以,在此案例中,MonoMapsubscribe方法传递的参数才是真实的订阅者System.out::println,而sourceMonoJust。在MonoMap#subscribe方法被调用时,先调用MonoJustsubscribe方法,并将真实订阅者封装成FluxMap.MapSubscriber传递给MonoJust,让MonoJust认为FluxMap.MapSubscriber是真实订阅者,当FluxMap.MapSubscriberonSubscribe方法被调用时,再由它调用真实订阅者的onSubscribe方法。其实是用了委托设计模式。

到此,复杂一点的例子我们也分析完成了,实际上,很多的操作都是通过MonoOperator实现,这也是能实现上下文Context传递的原因,所以接下来我们将分析Context的实现。

拨开反应式编程中Context实现的神秘面纱

根据上一节总结的多个操作(发布-订阅)组合成一个流的执行流程为:顺序操作、倒序订阅、顺序消费数据,试想如何让一个Context在流中传递呢?

我们画个图来理解:

一个使用Context的简单案例:

public class ContextUseMain{
  private static void testMono() {
        // MonoMap
        Mono<Integer> mono = Mono.just(1)
                .subscriberContext(context -> {
                    // 可以获取到xxxx
                    System.out.println(context.get("xxxx").toString());
                    return context;
                })
                .map(x -> x * x);
        // MonoSubscriberContext
        mono = mono.subscriberContext(context -> context.put("xxxx", System.currentTimeMillis()));
        // MonoMap
        mono = mono.map(x -> (x + 1) * 2)
                 .subscriberContext(context -> {
                     // 这里会报空指针
                    System.out.println(context.get("xxxx").toString());
                    return context;
                });
        // 开始订阅
        mono.subscribe(System.out::println);
    }
}

mono.subscriberContext()为什么能够获取到Context,首先subscriberContext返回的是一个MonoSubscriberContext

public abstract class Mono<T> implements Publisher<T> {
    public final Mono<T> subscriberContext(Function<Context, Context> doOnContext) {
		return new MonoSubscriberContext<>(this, doOnContext);
    }
}

MonoSubscriberContext类继承MonoOperator,源码如下:

final class MonoSubscriberContext<T> extends MonoOperator<T, T> implements Fuseable {

	final Function<Context, Context> doOnContext;

	MonoSubscriberContext(Mono<? extends T> source,
			Function<Context, Context> doOnContext) {
		super(source);
		this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
	}

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		// (1) 获取前一个订阅者的Context
        Context c = actual.currentContext();
		try {
            // (2) 回调方法,可往Context put key-value,返回新的Context
			c = doOnContext.apply(c);
		}
		catch (Throwable t) {
			Operators.error(actual, Operators.onOperatorError(t, actual.currentContext()));
			return;
		}
        // (3) ContextStartSubscriber即是Subscription,也是CoreSubscriber
        Subscription  subscription = new FluxContextStart.ContextStartSubscriber<>(actual, c);
        // 调用前一个Mono的subscribe方法
		source.subscribe(subscription);
	}

}

CoreSubscriber提供获取ContextAPI:

public interface CoreSubscriber<T> extends Subscriber<T> {
	default Context currentContext(){
		return Context.empty();
	}
}

FluxContextStart.ContextStartSubscriber源码如下(有修剪):

static final class ContextStartSubscriber<T> implements ConditionalSubscriber<T>, InnerOperator<T, T>,  QueueSubscription<T> {

    final CoreSubscriber<? super T>        actual;
    final Context                          context;

    ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
        this.actual = actual;
        this.context = context;
    }

    @Override
	public Context currentContext() {
        return this.context;
    }
}

Context.empty()创建的是Context0

public interface Context {
    static Context empty() {
		return Context0.INSTANCE;
    }
}

再来看Context0put方法:

final class Context0 implements Context {
	@Override
	public Context put(Object key, Object value) {
		return new Context1(key, value);
	}
}

可见,调用Context0#put创建的是Context1

Context1源码如下:

class Context1 implements Context, Map.Entry<Object, Object> {

	final Object key;
	final Object value;

	Context1(Object key, Object value) {
		this.key = key;
		this.value = value;
	}
}

如果继续调用Context1#put创建的是Context2

Context2源码如下:

final class Context2 implements Context {

	final Object key1;
	final Object value1;
	final Object key2;
	final Object value2;
	Context2(Object key1, Object value1, Object key2, Object value2) {
		this.key1 = key1;
		this.value1 = value1;
		this.key2 = key2;
		this.value2 = value2;
	}
}

再继续调用Context2#put创建的是Context3,往下亦如此。

每次put返回一个新的Context目的是避免多线程加锁,同时也能实现数据隔离:后续操作(Mono)不能获取之前操作(Monoput的数据。

委托模式的使用与BaseSubscriber

在介绍BaseSubscriber之前,我们先学习一个新的API:transformMono/Fluxtransform方法允许将一个Mono/Flux转为一个MonoOperator/FluxOperator,将订阅委托给该MonoOperator/FluxOperator

看个简单的案例:

public class BaseSubscriberUseMain{
    public static void main(String[] args){
         Mono<?> mono = createMono();
        // transform 将原mono转为新的mono
        mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) {
                source.subscribe(actual);
            }
        });
        mono.subscribe();
    }
}

这样我们就可以将MonoOperatorsubscribe方法参数传递的订阅者替换为我们自己实现的订阅者,修改后的案例如下:

public class BaseSubscriberUseMain{
    public static void main(String[] args){
         Mono<?> mono = createMono();
        // transform 将原mono转为新的mono
        mono = mono.transform((Function<Mono<?>, Publisher<?>>) m -> new MonoOperator(m) {
            @Override
            public void subscribe(CoreSubscriber actual) {
                source.subscribe(new ActualSubscriberDelegater(actual));
            }
        });
        mono.subscribe();
    }
}

ActualSubscriberDelegater继承BaseSubscriber,实现hook方法:

public class ActualSubscriberDelegater<T> extends BaseSubscriber<T> {

    /**
     * 真实的订阅者
     */
    private CoreSubscriber<? super T> actual;

    public ActualSubscriberDelegater(CoreSubscriber<? super T> actual) {
        super();
        this.actual = actual;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        actual.onSubscribe(subscription);
    }

    @Override
    protected void hookOnNext(T value) {
        // 获取订阅者的Context
        Long time = (Long) actual.currentContext().get("xxxx");
        actual.onNext(value);
    }

    @Override
    protected void hookOnComplete() {
        actual.onComplete();
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        actual.onError(throwable);
    }
}

看起来ActualSubscriberDelegater代理了真实订阅者的所有行为,但这不是代理模式,而是委托模式。

这样一来,ActualSubscriberDelegater就可用来实现调试、打印日记等操作。

Alibaba Sentinel也是通过transform API与实现BaseSubscriber组合使用适配反应式Reactor库的,我们也可以使用此统计一个接口的执行耗时。

spring-data-r2dbc实现多数据源动态切换

hotkit-r2dbc是笔者个人的开源项目,封装spring-data-r2dbc多数据源动态切换的实现。

在了解ReactorContext之后,再来看笔者是如何为hotkit-r2dbc实现多数据源动态切换就会觉得很简单,看完之后,你不需要使用hotkit-r2dbc,也能自己为spring-data-r2dbc实现动态数据源切换。

spring-data-r2dbc提供连接工厂路由类:AbstractRoutingConnectionFactory,我们只需要继承AbstractRoutingConnectionFactory并实现它的determineCurrentLookupKey方法,在该方法被调用时返回正确的数据源key即可。

/**
 * ConnectionFactory路由
 *
 * @author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

    private final static String DB_KEY = "HOTKIT-R2DBC-DB";

    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {      
        // ....
        setTargetConnectionFactories(connectionFactoryMap);
        //....
    }
    
    @Override
    protected Mono<Object> determineCurrentLookupKey() {
        return Mono.subscriberContext().handle((context, sink) -> {
            if (context.hasKey(DB_KEY)) {
                sink.next(context.get(DB_KEY));
            }
       });
    }

}

当然,这样还不行,还需要提供一个方法,让切面可以获取到Context并写入数据源,所以完整的连接工厂路由器应该是这样的:

/**
 * ConnectionFactory路由
 *
 * @author wujiuye 2020/11/03
 */
public class HotkitR2dbcRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

    private final static String DB_KEY = "HOTKIT-R2DBC-DB";

    public HotkitR2dbcRoutingConnectionFactory(Map<String, ConnectionFactory> connectionFactoryMap) {      
        // ....
        setTargetConnectionFactories(connectionFactoryMap);
        //....
    }
 
    // 写入数据源
    public static <T> Mono<T> putDataSource(Mono<T> mono, String dataSource) {
        return mono.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }
    
    // 写入数据源
    public static <T> Flux<T> putDataSource(Flux<T> flux, String dataSource) {
        return flux.subscriberContext(context -> context.put(DB_KEY, dataSource));
    }

    @Override
    protected Mono<Object> determineCurrentLookupKey() {
       return Mono.subscriberContext().handle((context, sink) -> {
            if (context.hasKey(DB_KEY)) {
                sink.next(context.get(DB_KEY));
            }
       });
    }
}

切面类的实现如下:

@Component
@Aspect
@Order(Ordered.HIGHEST_PRECEDENCE)
public class DynamicDataSourceAop {

    @Pointcut(value = "@annotation(com.wujiuye.hotkit.r2dbc.annotation.R2dbcDataBase)")
    public void point() {
    }

    @Around(value = "point()")
    public Object aroudAop(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        R2dbcDataBase dataSource = method.getAnnotation(R2dbcDataBase.class);
        // 方法返回值类型为Mono
        if (method.getReturnType() == Mono.class) {
            return HotkitR2dbcRoutingConnectionFactory.putDataSource((Mono<?>) pjp.proceed(), dataSource.value());
        }
        // 方法返回值类型为Flux 
        else {
            return HotkitR2dbcRoutingConnectionFactory.putDataSource((Flux<?>) pjp.proceed(), dataSource.value());
        }
    }
}

切面在目标方法执行完成返回Mono或者Flux之后,才调用返回值MonoFluxsubscriberContext方法将返回值Mono转为一个MonoSubscriberContext,或者将返回值Flux转为一个FluxContextStart

以返回值类型为Mono为例,在MonoSubscriberContext#subscribe方法的回调Function中为Context写入数据源,在Mono最终被订阅时,由MonoSubscriberContextContext传递给订阅者。

注意数据源切面与事务切面的顺序问题,避免事务不生效。

用一个测试用例说明数据源切换的时机:

public class RoutingTest extends SupporSpringBootTest {

    @Resource
    private DatabaseClient client;
    
    @Test
    public void test() throws InterruptedException {
         Mono<Void> operation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
                .bind("id", "joe")
                .bind("name", "Joe")
                .bind("age", 34)
                .fetch()
                .rowsUpdated()
                .then();
        // 切换数据源
        Mono<Void> dbOperation = HotkitR2dbcRoutingConnectionFactory.putDataSource(operation,MasterSlaveMode.Slave);
        dbOperation.subscribe();
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

}

其中发布者operation在真正执行Sql之前肯定是先要从连接工厂路由器获取连接工厂的发布者,也就是连接工厂路由器的determineCurrentLookupKey被调用。

因此,在operation转为dbOperation之后,订阅dbOperation时为dbOperation的订阅者写入Contextput数据源,Context在流中传递,最终operation的订阅者就能调用determineCurrentLookupKey方法从订阅者的Context获取到数据源,这就是多数据源切换的实现原理。

End

本篇只介绍Mono的同步订阅流程,这是因为同步订阅更易于理解,Mono相比Flux也更容易介绍。

笔者建议:在了解本篇介绍的知识点之后,可阅读Flux的源码,或者实现异步订阅的源码,加深对反应式编程库Reactor库的理解。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

替换Shiro框架后,上线就Bug了,异步线程获取不到Session

我们将原有项目的登录授权功能从Shiro切换到接入SSO单点登录服务并非一帆风顺,因为系统多了,总有一些让我们预想不到的骚操作。

如何实现SSO单点登录

随着公司业务的发展,子系统越来越多,实现SSO单点登录的需求就愈加迫切。本篇介绍笔者如何实现SSO单点登录系统。

如何并行消费Kafka拉取的数据库Binlog,提升吞吐量

本篇介绍如何并行消费Kafka拉取的数据库Binlog,以及使用Kafka订阅Binlog字段值获取防坑指南(阿里云DTS)。

如何使用Kafka订阅数据库的实时Binlog

订阅Binlog的目的在于,实现实时的缓存更新、处理复杂逻辑数据实时同步到Elasticsearch或其它库-表等业务场景,本篇介绍如何使用Kafka订阅数据库的实时Binlog。

Spring Data R2DBC快速上手指南

本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。

使用Spring WebFlux + R2DBC搭建消息推送服务

消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。