2.6 将常见的监听器改造成响应式结构

很多读者对监听器一直都比较抗拒,其实完全没必要,你只需将它看作容器里的一个bean即可。其注册过程,其实就是将这个bean添加进一个容器的过程,这个容器可以是一个List对象,也可以是一个Set对象,还可以是一个Map对象,根据不同的使用场景而变化。

当产生事件时,根据事件类型,获取相应的支持处理该类型的监听器,得到监听器后进行遍历处理即可。此处不打算编写太复杂的Demo,监听器的注册过程只是一个简单的赋值操作,下面先定义接口:

然后,对MyEventProcessor接口进行实现:

在这里,通过register(MyEventListener eventListener)方法定义了注册过程,通过dataChunk(String...values)方法下发事件。我们把这个下发事件的过程放在一个全新的线程中执行,这样可以达到异步效果,同样processComplete也是由所定义的executor执行的。

最后,对监听器MyEventListener进行响应式的异步实现:

我们创建了一个源,自定义源的核心逻辑在于next和complete操作,其下发类型由next方法所传递的参数类型决定。确定了这些后,下面需要确定业务逻辑,主要是注册操作。然后思考,平时监听器需要一个相应的Handler来处理事件,那么此处为了简化逻辑,将本应由Handler处理的直接由监听器对应的方法实现,主要是处理、消费所下发的事件。

假如是在Spring中操作,可以理解为给此监听器内的相应Handler进行赋值。假如这个监听器对应了一个List集合的事件处理Handlers,那么就可以在自定义的Subscriber的onNext方法内根据特定匹配条件来选择最合适的Handler。

我们之前介绍过FluxCreate中的源码,在这里,回顾一下与此处内容相关的部分源码:

可以看到,只有在产生订阅的时候才会执行Consumer<?super FluxSink>source所代表的逻辑。对于监听器Demo来说,通过Flux.create得到的源会在产生订阅后引发执行source.accept,这时相应Handler的定义才会正式注册,在这里不会为新建的监听器对象分配内存,以此做到按需分配内存,提高程序性能。理解这个逻辑之后,可以知道事件分发也只有在产生订阅之后才可以进行,即调用myEventProcessor.dataChunk("foo","bar","baz");同样,若想解除监听,调用Flux.create所传参数中定义的myEventProcessor.processComplete即可解除订阅关系。