有时候我们还是会有同步的需求的,比如我们需要通过Flux获取一个列表,但是在我获取到第一个数据的时候,我就去做一件事,但是没获得之前我这里就都是阻塞的。这里刚好和我前段时间的一个需求类似,我需要获取一些信息,然后通过长链接依次转发出去,但是长链接是采用的一个懒创建的方式,在没有获得数据的时候是不创建的,如果获得到第一个数据的时候就可以创建了。
异步到同步阻塞(Reactive to Block)
如果针对异步代码,我们想回到阻塞方式有办法吗,答案是有的,我们本节的开始也说了一个简单的例子。
想要将reactive代码变为阻塞的,有一系列block方法。
如果是简单的就是将这个代码变为阻塞调用,那么就是直接调用block方法就好,这个方式是Mono才有的,我们可以阻塞获取数据并执行。如果是Flux,会有两个方法,分别是blockFirst和blockLast,也就是说,在获取到第一个元素的时候就进行处理,还是获取到最后的元素再处理数据。
当然这三个方法都还有对应的一个带参方法,参数是超时时间,也就是如果超过指定时间还没有得到想要的数据的话,就会抛出一个RuntimeException。
这里还有个阻塞方法,就是Flux的toIterable,我们可以将Flux返回的数据转为一个Iterable,但是要转肯定是要所有的数据都得到以后才能转了,所以这个方法也是个阻塞方法。
类似的还有一个,就是toStream,就是转为jdk8的stream,也是个阻塞的实现。而Mono由于是一个,所以也可以通过toFuture转为Future。
同步阻塞到异步(Block to Reactive)
针对已有的同步代码,如果其不支持响应式,我们怎么处理呢?
比如我的方法中有一系列操作,其中一个是jdbc并且不支持响应式编程,那么我们常规做这个的时候可以开个线程,单独去发起此类需求,在数据准备好了之后我们再接下来去处理。
而reactor其实帮我们想好了类似的情况应该怎么做,这就需要用到defer方法了。这个方法类似懒加载,正常处理我们都是声明了之后就要去搞了,但是defer包装的Publisher是在实际订阅(subscribe)的时候才去执行。下面针对这个方法举个小例子:
1 2 3 4 5 6 7 8 |
Mono<Date> mono1 = Mono.just(new Date()); Mono<Date> mono2 = Mono.defer(() -> Mono.just(new Date())); mono1.subscribe(System.out::println); mono2.subscribe(System.out::println); Thread.sleep(3000); mono1.subscribe(System.out::println); mono2.subscribe(System.out::println); |
这个数据的打印结果如下:
1 2 3 4 |
Tue Jan 18 18:01:29 CST 2022 Tue Jan 18 18:01:29 CST 2022 Tue Jan 18 18:01:29 CST 2022 Tue Jan 18 18:01:32 CST 2022 |
可以发现,3秒后第二遍订阅mono1的时候,时间和之前是一样的,但是mono2由于使用了defer,时间就延后了3秒。所以记住,just是声明的时候就已经定义好了,而defer有延迟的意思,在订阅的时候才现执行。
假设对于数据库的处理,我们可以这么做:
1 |
Flux.defer(() -> Flux.fromIterable(repository.findAll())); |
因为这个数据库操作是阻塞的代码,所以我们通过这种方式来声明延迟的阻塞执行,当然现在也有人开发了对应的非阻塞sdk,mysql之类的也支持该模式,所以如果可以的话还是建议使用响应式的数据库操作会更好。
当然对于这种耗时的操作,我们还有别的办法来处理,先上代码:
1 |
Flux.fromIterable(list).publishOn(Schedulers.boundedElastic()).doOnNext(user -> repository.save(user)).then().subscribe(); |
这里我们首先有个已经是响应式编程的Flux<User>了,这里我用了Flux#fromIterable来模拟的,然后通过publishOn方法可以进行线程的切换,这部分内容我将会在下一节讲解,包括其中的参数,目前只需要知道,publishOn可以让后续的流程进行线程切换(Worker线程),切换的线程就是其中的参数,另外publishOn也是可以重复调用的,可以支持多次切换线程。Schedulers.boundedElastic()一般是用于跑阻塞线程的。
接着我们通过doOnNext来对其中的元素进行的数据库的新增操作,调用then来转为Mono<Void>,最后调用subscribe来启动Publisher。
©原创文章,转载请注明来源: 赵伊凡's Blog
©本文链接地址: 响应式编程的异步与同步阻塞