2.3 蛇行走位的QueueSubscription

从QueueSubscription接口的注释可以知道,它主要用于对那些内部有队列支持的subscription进行优化。

对于那些有固定数量大小的同步源序列,可以通过拉取的方式来发射元素,在很多情况下这样避免了请求计算的开销。而那些异步源序列,则同时扮演了queue和subscription两种角色,其中的大部分操作都是将元素存储至一个又一个队列数组中(队列满了则再创建一个队列插入上一个队列的队尾)。这样下发和创建数组都是需要进行计算的,比如前面介绍过的reactor.core.publisher.FluxCreate.BufferAsyncSink#next操作,里面包含了queue.offer(t),可支持背压,然后根据元素请求数量在一个循环体中调用queue.poll方法来完成元素的下发。

2.3.1 无界队列SpscLinkedArrayQueue

在这里,先来看看Reactor 3中对RxJava 2中SpscLinkedArrayQueue部分的迭代。通过本节,我们可以对《Java编程方法论:响应式RxJava与代码设计实战》“Flowable与背压”一章的“BackpressureStrategy.BUFFER策略”一节中所述的内容有更深一步的理解(没有看过这部分内容也不影响理解本节知识)。

首先来看看源码:

在这里,针对多线程下的安全操作,分别对索引和producerArray进行原子化,接着设计一个无界队列,如图2-3所示。

图2-3

首先得到一个原子类型的数组,把该数组最后一个位置空出来,用于存储下一个数组的引用。然后假设在存储一个元素A时,若发现将该元素存储到数组中会造成数组内部空间占满,那么此时在A要存储的位置放置一个关键字元素NEXT(一个final常量),用于标记此数组已满,该关键字元素表示:请到本数组最后一个位置获取下一个数组的引用和NEXT在原数组中相对位置的偏移量,并在下一个数组相同的偏移量位置存入元素A。

在设定队列中单个数组长度的时候(假如为c),为了更好地适应无界队列场景并确定所存储元素在数组中的位置,c的大小必须为2N。因为我们要存储一个指向下一个数组的引用,同时也要存储一个标志位NEXT,而这个NEXT又与我们所存储元素的位置相关,所以我们给设定的数组长度加1,即c+1,并将其作为实际的数组长度,而一个数组中实际能存储的元素数量仅为c-1,所以设定一个变量mask,通过二进制计算来专门确定当前下发元素在数组中要存储的位置,如图2-4所示(图中的length为我们设定的传入数组的长度)。

图2-4

在将元素存储到这个数组中的时候,如何存储NEXT标志位常量元素呢?这时就要腾出一个位置,我们使用this.mask=c-1空出了两个位置(对于this.producerArray数组来说,c已经代表了这个数组最后一个位置的下标,那么c-1就代表了倒数第2位的下标)。接着通过(index+1)&mask来确定我们所插入元素在该数组中的下一个位置的偏移量(也可以理解为在该数组中位置的下标,此处的index是要存储元素在整个无界队列中的位置),用来判断我们是否需要切换新的数组。从上面的代码注释来看,假如传入的linkSize为128,那么c就是128,数组长度就是129,而this.mask就是127,当index的值等于127的时候,(index+1)&mask就等于0,此时指向的就是数组中下标为0的位置。

这样,当要存储元素所在的index是一个很大的数时,就可以通过计算得到该数在当下正在操作的数组中的偏移量(也就是数组中对应位置的下标),接着判断其下一个位置中是否有元素(即(index+1)&mask位置中是否有元素),这里如果执行的是添加元素操作,那么当下一个位置中有元素时,则说明当前数组已满,这时就创建新数组b,并将这个元素存储在新数组的偏移量大小为index&mask的位置,然后在原数组内的这个位置存储NEXT标志位常量元素,并在原数组的最后一个位置存储新数组的引用,将这个新数组b赋值给this.producerArray。因为第一个原数组的引用已存储于this.consumerArray(可查看上面定义的构造器),所以不用担心其会丢失。

之所以要求必须使用2N的长度,是因为只有这样,在长度减1之后才能得到一个以0开头后面全都是1的二进制数,在使用任意合法范围内的正整数与之进行&运算时,所得结果都会在0至2N-1范围内。

在这里,有读者可能会产生疑问,如果有一个b数组,可参考图2-4中下面的数组,NEXT标志位常量元素就不能存储于倒数第2位了(因为倒数第2位已经被触发创建b数组的那个元素所占据),所以这个标志位常量元素索性就存储于b数组的倒数第3位,然后在下一个新数组c的倒数第3位存储新元素,当又有一个新元素要往数组c中存储的时候,其会存储在数组c中的倒数第2位,再来一个新元素就存储于c的正数第1位(也就是c数组中下标为0的位置),依此类推。最后那个位置总是固定空出,留给下一个数组的引用,而NEXT标志位常量元素的意义是,你应该去下一个数组的这个偏移量位置找元素。

注意:请区分总index长度和相对每个数组的index长度(这个相对index长度就是具体每个数组的偏移量offset),这对理解通过SpscLinkedArrayQueue中的poll方法来拉取元素的过程很有帮助。

接下来看看源码实现:

在这里,利用原子类控制来实现了多线程操作下的安全保障,利用&操作来达到了类似于环形队列般的位置锁定(类似于%取余操作),也提高了底层代码的性能,毕竟数组的创建和存储,以及获取元素的操作,都需要很大的计算量。最后,利用NEXT常量元素来做标志位,标记下一个元素位置。这样可以穿针引线,达到了蛇形走位的目的。

2.3.2 QueueSubscription.requestFusion的催化效应

有些时候,队列只是一个形式,其并不会真正地产生、存储或获取元素,更多的是为了匹配大的架构代码规则。可以思考一下,当基于对状态值感应的情况下发元素时,真的不需要存储元素,从主观的角度来说,这里无法接受多个线程同时请求下发元素(虽然在RxJava 2中是可以做到的),也就是说Reactor 3和RxJava 2不一样,Reactor 3中的实现过程和RxJava 2相比,加了一个布尔型变量的判断控制。那么该如何将消费控制在一个单线程中呢?即便加入调度操作也不好使,在不改变RxJava 2源码大的格局形式的情况下,该如何对Reactor 3进行创新(应该说在源头进行Bug的修复,毕竟多线程请求很容易发生状态值异常,这点有多线程编程基础的读者都应该很清楚)呢?下面就来看看QueueSubscription.requestFusion带给我们的催化效应吧。

先来看一段源码:

在这里,可以看到peek、add、offer这3个方法都是默认方法,而且这3个方法的默认实现都是抛出异常,它们就是用来给QueueSubscription接口定基调的东西,告诉我们这个接口并不支持我们所熟悉的背压。结合上下文可知,它需要一个用来判断是同步还是异步请求拉取策略的方法,这就是requestFusion。

从int requestFusion(int requestedMode)方法的注释可以知道,作为一个订阅者(即消费者),下游操作会调用上一个操作实现的requestFusion方法,你传入的参数所代表的模式可以从SYNC、ASYNC或者ANY等任选其一(但绝不能是NONE)。而上游的操作对于requestFusion方法的具体实现,所返回的模式结果应该是NONE、SYNC或ASYNC三者之一(但绝不能是ANY)。

到此可以知道,requestFusion方法就是用来做策略判断的,根据订阅者(也就是中间操作)所传入的支持的模式,源做出相应的反馈,比如支持异步request请求的话就回馈一个ASYNC,也就是说,这个模式归根结底还是根据源所支持的类型来进行选择的。

下面来看看FluxGenerate.requestFusion的实现:

可以看到,其策略是返回一个Fuseable.NONE,因为RxJava 2的FlowableGenerate.GeneratorSubscription并没有实现BasicIntQueueSubscription接口(我们可以理解为RxJava在此不做请求模式设定),所以Reactor 3中GeneratorSubscription实现的QueueSubscription接口表示在非特殊情况下不对模式做任何设定,在requestFusion方法实现中,if条件语句不成立的情况下,默认返回一个NONE策略标志。

至此,对QueueSubscription的讲解暂时告一段落,更多关于其在多线程下的实际应用,将放在讲解调度器的时候具体讲解。