响应式编程的backpressure

reactor3

通常我们Publisher获取数据到subscriber订阅,是通过push模式进行的,但是实际上订阅者也可以主要拉取数据,形成push+pull的模式,主动拉取在响应式编程里面叫做backpressure(背压)。reactor3中的关键字是request(request(long)),也就是告诉Publisher我一次可以处理多少数据,如果你有就一次给我这么多,没有的话就会等待发布者push(所以实际上会是push+pull的模式)。request(Long.MAX_VALUE)意味着无限需求,Publisher会以最快的速度发送数据(也意味着订阅端认为来多少数据自己都是可以处理的)。

可以看下代码示例:

期望获取10个,实际上只能生产5个。

不过现在这个方法已经被弃用了,现在可以尝试下下面的方法:

take方法之前说过,类似stream的limit的意思。但是这里带了个第二个参数,其实可以理解为是否启用背压(其实这里会更严格的遵守背压,实际上没有用到request),默认take(2)这样,是不启用的。其中的区别在源码的注释里也写的很清楚了。启用背压的话,flux在生产数据的时候当生产到指定数值之后就不会继续生产了,这样的好处是不会产生多余的无用的数据,但是不启用的背压的话,虽然你只要两个数据,但是实际上还是会产生5个数据。

如果看api,还是会发现有个方法是limitRequest,实际上已经被上述方法替代了。

当然在频率控制上,还有个方法是limitRate,如果我们总共100个request,设置limitRate(10),则上游会最多10次把数据分批生产出来。

其实我没还可以对Flux的示例调用一个log方法,可以打印出更详细的调用情况。

要想使用request,还可以重新BaseSubscriber的hookOnSubscribe方法,看默认实现其实是subscription.request(Long.MAX_VALUE)

也就是无限需求。具体示例可以看下列代码:

这里重写了hookOnSubscribe方法,我们每次获取两个元素,重写hookOnNext,也就是说在拿到对应元素的时候要做什么,我们这里做的一件事就是打印这个值。以上例来看,每次获取两个元素,有个count来确认我们是否需要继续request,一开始request2个元素,那么就是能获取到两个元素,后面count2之后,如果不重新request的话,那就断了,没法继续获取元素了。所以需要重新request(2)来继续获取后面的两个元素。

©原创文章,转载请注明来源: 赵伊凡's Blog
©本文链接地址: 响应式编程的backpressure

发表评论

电子邮件地址不会被公开。 必填项已用*标注