RxJava2.0 中的背压

背压 backpressure

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,或者说背压是流速控制的一种策略
Backpressure 其实是一种现象:在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。

  • 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
  • 背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。

响应式拉取(reactive pull)

在RxJava的观察者模型中,被观察者是主动的推送数据给观察者,观察者是被动接收的。而响应式拉取则反过来,观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。

观察者可以根据自身实际情况按需拉取数据,而不是被动接收(也就相当于告诉上游观察者把速度慢下来),最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略

RxJava 2.x 更新中,出现了两种观察者模式:

  • Observable ( 被观察者 ) / Observer ( 观察者 )
  • Flowable (被观察者)/ Subscriber (观察者

RxJava 2.x 也为我们保留了简化订阅方法,我们可以根据需求,进行相应的简化订阅,只不过传入对象改为了 Consumer
Consumer 即消费者,用于接收单个值,BiConsumer 则是接收两个值.

Hot and Cold Observables

Hot Observables 和cold Observables并不是严格的概念区分,它只是对于两类Observable形象的描述

  • Cold Observables:指的是那些在订阅之后才开始发送事件的Observable(每个Subscriber都能接收到完整的事件)。
  • Hot Observables:指的是那些在创建了Observable之后,(不管是否订阅)就开始发送事件的Observable

在1.0中我们一般使用的都是Cold Observable,除非特殊需求,才会使用Hot Observable,在这里,Hot Observable这一类是不支持背压的,而是Cold Observable这一类中也有一部分并不支持背压(比如interval,timer等操作符创建的Observable),都是Observable,结果有的支持背压,有的不支持,这就是RxJava1.X的一个问题,但在2.0中,这种问题已经解决了

背压例子

以两根水管举例子:

之前我们所的上游和下游分别是Observable和Observer, 这次不一样的是上游变成了Flowable, 下游变成了Subscriber, 但是水管之间的连接还是通过subscribe(), 我们来看看最基本的用法吧:

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意这句代码
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);

这段代码中,分别创建了一个上游Flowable和下游Subscriber, 上下游工作在同一个线程中, 执行结果:

07-20 16:30:29.642 7679-7679/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onSubscribe
07-20 16:30:29.643 7679-7679/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: emit 1
    onNext: 1
07-20 16:30:29.644 7679-7679/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: emit 2
    onNext: 2
    emit 3
    onNext: 3
    emit complete
07-20 16:30:29.645 7679-7679/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onComplete

在上面的代码中也有这么一句代码:

s.request(Long.MAX_VALUE);      

这句代码有什么用呢, 不要它可以吗? 我们来试试: 去掉这句这行结果:

07-20 16:35:47.474 12931-12931/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onSubscribe
07-20 16:35:47.475 12931-12931/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: emit 1
07-20 16:35:47.487 12931-12931/com.nanchen.rxjava2examples W/RxCaseBackPressActivity: onError: 
    io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
        at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
        at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
        at com.nanchen.rxjava2examples.module.backpress.RxCaseBackPressActivity$1.subscribe(RxCaseBackPressActivity.java:194)
        at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
        at io.reactivex.Flowable.subscribe(Flowable.java:12986)
        at io.reactivex.Flowable.subscribe(Flowable.java:12935)
        at com.nanchen.rxjava2examples.module.backpress.RxCaseBackPressActivity.doSomething(RxCaseBackPressActivity.java:230)
        at com.nanchen.rxjava2examples.module.rxjava2.operators.item.RxOperatorBaseActivity.onViewClicked(RxOperatorBaseActivity.java:47)
        at com.nanchen.rxjava2examples.module.rxjava2.operators.item.RxOperatorBaseActivity_ViewBinding$1.doClick(RxOperatorBaseActivity_ViewBinding.java:38)
        at butterknife.internal.DebouncingOnClickListener.onClick(DebouncingOnClickListener.java:22)
        at android.view.View.performClick(View.java:6303)
        at android.view.View$PerformClick.run(View.java:24828)
        at android.os.Handler.handleCallback(Handler.java:789)
        at android.os.Handler.dispatchMessage(Handler.java:98)
        at android.os.Looper.loop(Looper.java:164)
        at android.app.ActivityThread.main(ActivityThread.java:6798)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.Zygote$MethodAndArgsCaller.run(Zygote.java:240)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:767)
07-20 16:35:47.487 12931-12931/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: emit 2
    emit 3
    emit complete

刚刚发出了第一个请求,就抛出异常了, 为什么呢?首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们 。

那异步线程会报错误么?

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

执行结果:

07-20 16:47:32.549 22878-22878/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onSubscribe
    emit 1
07-20 16:47:32.550 22878-22878/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: emit 2
    emit 3
    emit complete

没有报异常,因为是异步,那么s.request(Long.MAX_VALUE); 什么意思呢,其实Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题,requeset 标识的是下游的处理能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM.

异步线程中上游正确发送了所有事件?

这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.

验证一下:

public static void request(long n) {
        mSubscription.request(n); //在外部调用request请求上游
    }

    public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;  //把Subscription保存起来
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

在界面上增加了一个按钮, 点击一次就调用Subscription.request(1), 来看看运行结果:

上游的缓冲池有多大?

答案是128 在Flowable源码里面可以找到

public abstract class Flowable<T> implements Publisher<T> {
   /** The default buffer size. */
   static final int BUFFER_SIZE;
   static {
       BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
   }

测试一下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 128; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

执行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
  ...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127
zlc.season.rxjava2demo D/TAG: emit 128  //这是第129个事件
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven$7.subscribe(ChapterSeven.java:169)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                                  at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                                  at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                  at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                  at java.lang.Thread.run(Thread.java:761)

当缓冲池里面的内容大于128的时候一样的报异常,这时候就需要一个更大池子了,BackpressureStrategy.BUFFER
这个策略下,这个池子没有上线,可以一直发,直到内存爆掉,不信看例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
...
zlc.season.rxjava2demo D/TAG: emit 997
zlc.season.rxjava2demo D/TAG: emit 998
zlc.season.rxjava2demo D/TAG: emit 999

如果放开数量限制,无线次发送事件,最终会导致内存暴增直到爆掉,所以Flowable用不好就内存泄漏啦

BackpressureStrategy.DROP 策略

Drop就是直接把存不下的事件丢弃

public static void request() {
        mSubscription.request(128);
    }

public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

执行结果:

FLowable内部的默认的水缸大小为128, 因此, 它刚开始肯定会把0-127这128个事件保存起来, 然后丢弃掉其余的事件, 当我们request(128)的时候,下游便会处理掉这128个事件, 那么上游水缸中又会重新装进新的128个事件

BackpressureStrategy.LATEST

Latest就是只保留最新的事件.

Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 10000; i++) {  //只发1w个事件
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {

                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(128);  //一开始就处理掉128个事件
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

Latest的运行结果

从运行结果中可以看到, 除去前面128个事件, 与Drop不同, Latest总是能获取到最后最新的事件, 例如这里我们总是能获得最后一个事件9999.

关于FLowable的策略我们也讲完了, 这些FLowable是我自己创建的, 所以我可以选择策略, 那面对有些FLowable并不是我自己创建的, 该怎么办呢? 比如RxJava中的interval操作符, 这个操作符并不是我们自己创建的, 来看下面这个例子

Flowable.interval(1, TimeUnit.MICROSECONDS)
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Subscriber<Long>() {
                   @Override
                   public void onSubscribe(Subscription s) {
                       Log.d(TAG, "onSubscribe");
                       s.request(Long.MAX_VALUE);
                   }

                   @Override
                   public void onNext(Long aLong) {
                       Log.d(TAG, "onNext: " + aLong);
                       try {
                           Thread.sleep(1000);  //延时1秒
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                   }

                   @Override
                   public void onError(Throwable t) {
                       Log.w(TAG, "onError: ", t);
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete");
                   }
               });

interval操作符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 我们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么:

07-20 17:29:12.547 13561-13561/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onSubscribe
07-20 17:29:12.568 13561-13561/com.nanchen.rxjava2examples D/RxCaseBackPressActivity: onNext: 0
07-20 17:29:13.577 13561-13561/com.nanchen.rxjava2examples W/RxCaseBackPressActivity: onError: 
    io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
        at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
        at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:457)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:307)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:302)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
        at java.lang.Thread.run(Thread.java:764)
07-20 17:29:13.578 13561-13561/com.nanchen.rxjava2examples I/Choreographer: Skipped 61 frames!  The application may be doing too much work on its main thread.

错误信息比较明显,缓冲池子超过了128 了,不能deliver了,呵呵,其实RxJava给我们提供了其他的方法,策略和上面的一样

onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()