2.1 对Flux.subscribe订阅逻辑的解读

下面直接看看Flux.subscribe的订阅逻辑相关源码,对其进行一一分析,从而引出相关内容:

在这里,Consumer参数类型使用了java.util.function.Consumer,是从JDK 8开始使用的。先跳过LambdaSubscriber的定义,通过onLastAssembly方法可以知道,我们提前自定义了一个钩子函数Hooks.onLastOperatorHook,这样在每次发生订阅时都会进行统一的动作操作,可以认为这是一个拦截器。其典型应用是测试时的应用,在本书第10章中,将会看到Reactor 3测试库的编写实现,其中会涉及onLastAssembly方法的使用细节。

2.1.1 对CoreSubscriber的解读

从上面的源码可以发现,LambdaSubscriber实现了CoreSubscriber接口。根据该接口,可以衍生出各种各样的订阅者,对于生产者的Publisher接口,也是如此,所以从上面源码最后一行可以看到,Flux留了一个抽象的subscribe方法,留给具体的实现类来实现。只是最初的Publisher#subscribe(Subscriber)的参数类型Subscriber变为了CoreSubscriber,而为了保证Rx标准的统一,CoreSubscriber继承了org.reactivestreams.Subscriber接口,同时加入了一些Reactor 3特有的Context功能实现。

下面对CoreSubscriber来进行分析:

从这个接口的注释可以知道,如果订阅者发出的元素请求数量小于或等于0,则请求不会产生onError事件,而只会简单地忽略错误。我们可以将其与RxJava 2中的实现进行对比:

可以看到,当元素请求数量n小于或等于0时,会产生一个onError事件,并标明此事件为参数异常事件类型。Reactor 3.1+中的实现方式如下:

从上面的实现源码可以看出,这里仅记录了日志,并没有产生onError事件。

另外,从currentContext的定义可知,其主要用于元素下发过程中的中间操作或者中间定义的订阅者上。内部涉及的Context,主要用于存储此订阅者产生订阅到结束这一过程中的信息(比如异常信息、临时中间变量),这些信息可以被订阅者获取,其有点类似于ThreadLocal,但它是针对多线程调度下Reactor特有的东西,后面会用专门的篇幅来进行介绍。知道了这些内容,再回到当前环境下,在这里,根据具体的错误操作或者丢弃操作来做一些具体的设置,比如:

上述源码涉及两块内容:

其一,如果已经结束下发,那么采用放弃策略Operators.onNextDropped。

其二,如果往队列中添加元素失败,那么针对这个异常包装出一个错误事件Operators.onOperatorError,用于下发错误(可以在FluxPublishOn.PublishOnConditionalSubscriber#checkTerminated中看到该过程),Operators.onNextDropped和Operators.onOperatorError的内部都使用了actual.currentContext。此处,只分析Operators.onNextDropped,剩下的内容请读者自行探索:

在默认的情况下,由actual.currentContext可知,传入的context会产生一个Context0实例,这里的0代表其中没有键值对,而假如这个数是1,则代表有一个键值对,这个Context0实例是不可变的,调用该Context0实例的put方法,也就是重新生成一个全新的实例,这样也就保证了整个操作上下文的安全(因为原来那个对象并未发生改变,也就是那些管理着原来那个对象的线程根本无须担心对象会发生改变)。相关源码大家可以自行查阅,此处点到为止。

在这里,如果默认情况下不存在Hooks.KEYONNEXT_DROPPED这个key,它会返回一个null,这时会将Hooks.onNextDroppedHook赋值给hook,并在下一个if判断中调用执行该hook。假如Reactor全局环境下并没有设定Hooks.onNextDroppedHook这个钩子函数的实现,而且此时开启了Debug日志管理,则进入log.debug,同样也可以自行设定actual.currentContext,这样也就做到了自定义CoreSubscriber所特有的异常处理机制。

以上就是Reactor 3.1+中对异常的一些处理,未来在我们开发拓展API的时候,可以适当地使用这些处理方法。

2.1.2 对LambdaSubscriber的解读

在产生订阅时往往会自定义一些元素消费操作,这些操作会被Reactor 3包装成一个LambdaSubscriber类型的实例。这个类中有一些值得我们学习的亮点:

从传统的订阅逻辑来看,首先会调用onSubscribe方法,如果没有定义subscriptionConsumer,默认会最大化元素请求数量。在消费下发元素的时候调用onNext方法,其中的代码逻辑比较简单,不再赘述。

接下来,将要介绍的是LambdaSubscriber中的一种很实用的使用原子类的方法,即AtomicXxxFieldUpdater的技法应用,该技法可以直接应用于实际的项目。

2.1.3 AtomicXxxFieldUpdater的技法应用

在《Java编程方法论:响应式RxJava与代码设计实战》一书中,提及的RxJava 2的源码中也大量使用了原子类的一些特性用法,但它们往往是基于类级别的操作,这就导致其相对不灵活。假如一个类中需要定义两个或更多类型的原子类,仅仅将类本身定义为原子类来进行操作是完全不够的。那么有没有一个既可以基于类本身又可以与多个原子类相关的方便操作呢?本节就来展示这种技法。

首先,定义一个volatile变量:volatile Subscription subscription。

然后,因为此变量是一个Subscription类型对象,所以通过AtomicReferenceFieldUpdater.new Updater(LambdaSubscriber.class,Subscription.class,"subscription")将其加入原子类管理字段中,也就是LambdaSubscriber.class类下的一个类型为Subscription.class的volatile变量字段,其字段名称为subscription,并得到一个AtomicReferenceFieldUpdater类型的变量S。

最后,通过S.getAndSet(this,Operators.cancelledSubscription())将此subscription变量的值通过原子类操作进行改变。此操作返回的是修改subscription之后的值。

这其中到底发生了什么,下面试着探讨一下相关源码:

从newUpdater方法的注释可以看出,在3个参数中,tclass为所操作的目标类的类型,vclass为所操作目标类中字段的类型,fieldName为所操作目标字段的名字。这个方法返回一个AtomicReferenceFieldUpdaterImpl类型实例。

而在AtomicReferenceFieldUpdaterImpl构造器内,进行了一系列反射操作,以及一些对类和字段的权限判定和异常判断。请注意,这里的this.offset的初始值就是我们所传递字段在对象分配地址中的相对位置(可以这么理解:我摸到你的头顶,然后鼻子距离头顶的offset是10cm的话,我就很容易通过相对距离找到鼻子),这个相对位置在内存加载class字节码的时候就已经确定了。另外,所传递的目标字段必须是引用类型的,而且目标字段必须是volatile变量。CAS就是针对volatile的特性来做的基于内存级别的更加直接的优化操作,我们在实际应用的时候,切记将目标字段定义为volatile变量是一个大前提。其在JDK中的应用场景非常多,比如Varhandler操作(基于JDK 9+)。所以,此处的代码依然是可以进行迭代的,迭代方式请参考AQS的设计(java.util.concurrent.locks.AbstractQueuedSynchronizer):

通过AQS的设计可以发现,好的代码库也是把JDK源码中的一些设计拿来用而已,而且我们也可以找到迭代权威代码库的切入点,不断提升自己的水平。

接着来看看S.getAndSet(this,Operators.cancelledSubscription()),下面是它的源码:

此处,根据类生成的对象分配地址和此地址的偏移量offset,找到此对象在JVM中存储的定义字段的索引,获取这个字段对应的值,然后与新值一起执行CAS操作。这么说可能有些抽象,下面来看看this.offset=U.objectFieldOffset(field)的相关源码:

从这段源码的注释可以知道,不要指望对内存地址的相对偏移量执行任何改变操作,另外,一个对象中的两个字段的偏移量是不可能相同的。

这也是原子类操作的核心概念,但是很多图书和博客都没把这点讲清楚。由此可见,Reactor也应用了JDK 8中的一些东西。