RxJava Basics


What is RxJava?

Open-source repositories

RxAndroid

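To understand RxJava, first get to know Rx. Rx stands for Reactive Extensions. It is built on the observer pattern and is a programming model whose goal is to provide a consistent programming interface that helps developers handle asynchronous data streams more easily. ReactiveX.io defines Rx as an API for asynchronous programming with observable streams; ReactiveX combines the best ideas from the observer pattern, the iterator pattern, and functional programming. Rx has spread to many languages, which is how we got RxJava, Rx.NET, RxJS, RxSwift, Rx.rb, RxPHP, and others.

Official description: "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"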

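  • RxJava is built on the observer pattern.

Let's first make the observer pattern clear with an example: "press the switch and the lamp lights up".

In this event the lamp is the observer and the switch is the observable; the lamp watches the state of the switch through the wire and reacts accordingly.

  • The switch (the observable) produces the events (the "on" and "off" events). It is active and is the starting point of the whole flow.
  • The lamp (the observer) handles the events ("light on" and "light off"). It is passive and is the end point of the whole flow.
  • Between the start and end points, while events are in transit, they can be processed, filtered, transformed, merged, and so on (not shown in the figure; covered later).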
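
The switch-and-lamp relationship can be sketched in plain Java without any Rx library. The names SwitchListener, Switch and Lamp are made up for this illustration and are not part of RxJava; this is only a minimal sketch of the observer pattern, not how RxJava implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface: anything that wants to hear about the switch.
interface SwitchListener {
    void onToggle(boolean on);
}

// The observable: produces "on" / "off" events and pushes them to its listeners.
class Switch {
    private final List<SwitchListener> listeners = new ArrayList<>();

    void addListener(SwitchListener listener) {
        listeners.add(listener);
    }

    void press(boolean on) {
        for (SwitchListener listener : listeners) {
            listener.onToggle(on);
        }
    }
}

// The observer: reacts passively to whatever the switch emits.
class Lamp implements SwitchListener {
    private boolean lit;

    @Override
    public void onToggle(boolean on) {
        lit = on;
    }

    boolean isLit() {
        return lit;
    }
}

class ObserverPatternSketch {
    public static void main(String[] args) {
        Switch lampSwitch = new Switch();
        Lamp lamp = new Lamp();
        lampSwitch.addListener(lamp); // the "wire" between switch and lamp

        lampSwitch.press(true);
        System.out.println("lamp lit: " + lamp.isLit());
        lampSwitch.press(false);
        System.out.println("lamp lit: " + lamp.isLit());
    }
}
```

The lamp never polls the switch; it only reacts when an event is pushed to it, which is the relationship RxJava builds on.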

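What is reactive programming?

Reactive programming is a programming paradigm built around asynchronous data streams. A data stream is like a river: it can be observed, filtered, and manipulated, or merged with another stream into a new one for a new consumer.

A key concept in reactive programming is the event. Events can be awaited, can trigger procedures, and can trigger other events. Events are the only proper way to map our real world onto software: if the room is too hot, we open a window. Likewise, when a weather app receives new data from the server, it needs to update the UI that shows the weather; when a car's lane-departure system detects that the car has drifted out of its lane, it warns the driver to correct course. Both are responses to events.

Why RxJava?

For example, a requirement to pull the latest list of residential communities from the server might be written like this: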

new Thread() {
    @Override
    public void run() {
        // Fetch the community list from the server
        List<Community> communities = getCommunitiesFromServer();
        for (Community community : communities) {
            List<House> houses = community.houses;
            for (final House house : houses) {
                if (house.price >= 5000000) {
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            // Add the house information to the screen
                            addHouseInformationToScreen(house);
                        }
                    });
                }
            }
        }
    }
}.start();

With RxJava it looks like this:

// Note: getCommunitiesFromServer() is evaluated eagerly on the calling thread here;
// wrap the source in Observable.defer(...) if the request itself must run on Schedulers.io().
Observable.from(getCommunitiesFromServer())
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.houses);
            }
        })
        .filter(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return house.price >= 5000000;
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                // Add the house information to the screen
                addHouseInformationToScreen(house);
            }
        });

Simplified with lambdas:

Observable.from(getCommunitiesFromServer())
        .flatMap(community -> Observable.from(community.houses))
        .filter(house -> house.price >= 5000000)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::addHouseInformationToScreen);

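Basic elements of RxJava

Basic usage of RxJava

Operators

Create

create is probably the most common operator. It produces an Observable. To keep things easy to follow, the rest of this tutorial calls the Observable the emitter (upstream) and the Observer the receiver (downstream).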


Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        Log.e(TAG, "Observable emit 1" + "\n");
        e.onNext(1);
        Log.e(TAG, "Observable emit 2" + "\n");
        e.onNext(2);
        Log.e(TAG, "Observable emit 3" + "\n");
        e.onNext(3);
        e.onComplete();
        Log.e(TAG, "Observable emit 4" + "\n");
        e.onNext(4);
    }
}).subscribe(new Observer<Integer>() {
    private int i;
    private Disposable mDisposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
        mDisposable = d;
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        Log.e(TAG, "onNext : value : " + integer + "\n");
        i++;
        if (i == 2) {
            // RxJava 2.x adds Disposable, which cuts the connection so that
            // the Observer no longer receives upstream events
            mDisposable.dispose();
            Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete" + "\n");
    }
});

Result:
onSubscribe : false
Observable emit 1
onNext : value : 1
Observable emit 2
onNext : value : 2
onNext : isDisposable : true
Observable emit 3
Observable emit 4

Map

map applies a function to every event the emitter sends, so that each event is transformed by that function.


Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(@NonNull Integer integer) throws Exception {
        return "This is result " + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        mRxOperatorsText.append("accept : " + s + "\n");
        Log.e(TAG, "accept : " + s + "\n");
    }
});

Result:
07-10 14:15:49.857 26872-26872/com.nanchen.rxjava2examples E/RxMapActivity: accept : This is result 1
07-10 14:15:49.859 26872-26872/com.nanchen.rxjava2examples E/RxMapActivity: accept : This is result 2
07-10 14:15:49.861 26872-26872/com.nanchen.rxjava2examples E/RxMapActivity: accept : This is result 3

Concat

Put simply, concat connects two or more emitters so that they behave like a single emitter; their emissions do not interleave.

  Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });

Result:
07-10 14:24:10.147 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 1
07-10 14:24:10.150 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 2
07-10 14:24:10.154 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 3
07-10 14:24:10.158 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 4
07-10 14:24:10.162 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 5
07-10 14:24:10.166 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 6

FlatMap

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
               e.onNext(1);
               e.onNext(2);
               e.onNext(3);
           }
       }).flatMap(new Function<Integer, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
               List<String> list = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   list.add("I am value " + integer);
               }
               int delayTime = (int) (1 + Math.random() * 10);
               return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
           }
       }).subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(@NonNull String s) throws Exception {
                       Log.e(TAG, "flatMap : accept : " + s + "\n");
                       mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                   }
               });

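Result: nine "I am value N" strings arrive, three for each of 1, 2 and 3; because every group is delayed by a random amount, the order of the groups is not guaranteed.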

ConcatMap

The only difference between concatMap and flatMap is that concatMap preserves order, so let's simply replace flatMap with concatMap to verify this.

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "concatMap : accept : " + s + "\n");
                        mRxOperatorsText.append("concatMap : accept : " + s + "\n");
                    }
                });

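Result: the same nine strings arrive, always in source order: the three for 1, then the three for 2, then the three for 3.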
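
One rough way to picture the ordering difference without RxJava is the model below. The class and method names are hypothetical, and the model assumes every inner source starts at the same moment and delivers its items together after its delay, which real flatMap does not promise. It is a mental model, not the library's algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MapOrderSketch {
    // concatMap model: inner results are appended strictly in source order,
    // no matter how long each inner source takes.
    static List<String> concatMapOrder(int[] items, int repeats) {
        List<String> out = new ArrayList<>();
        for (int item : items) {
            for (int r = 0; r < repeats; r++) {
                out.add("I am value " + item);
            }
        }
        return out;
    }

    // flatMap model: all inner sources run at once, so the one with the
    // shortest delay is delivered first.
    static List<String> flatMapOrder(int[] items, int[] delaysMs, int repeats) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            indices.add(i);
        }
        // List.sort is stable, so equal delays keep source order.
        indices.sort(Comparator.comparingInt(i -> delaysMs[i]));
        List<String> out = new ArrayList<>();
        for (int index : indices) {
            for (int r = 0; r < repeats; r++) {
                out.add("I am value " + items[index]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] items = {1, 2, 3};
        int[] delaysMs = {8, 2, 5}; // made-up delays for the illustration
        System.out.println(concatMapOrder(items, 1));
        System.out.println(flatMapOrder(items, delaysMs, 1));
    }
}
```

With the made-up delays above, the concatMap model yields the values for 1, 2, 3 in that order, while the flatMap model yields 2, 3, 1.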

Distinct

As the name suggests, this operator removes duplicates.

Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });

Result: the subscriber receives 1, 2, 3, 4 and 5, each exactly once.

Filter

emit only those items from an Observable that pass a predicate test

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("filter : " + integer + "\n");
                Log.e(TAG, "filter : " + integer + "\n");
            }
        });

Result: the subscriber receives 20, 65 and 19.

just

just is a simple emitter that calls onNext() for each of its arguments in turn.

Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        mRxOperatorsText.append("accept : onNext : " + s + "\n");
                        Log.e(TAG,"accept : onNext : " + s + "\n" );
                    }
                });

Result: the subscriber receives "1", "2" and "3" in order.

buffer

periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
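The buffer operator takes two parameters, buffer(count, skip). It splits the Observable's items into buffers of at most count items, starting a new buffer every skip items, and returns an Observable that emits those buffers.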

Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        mRxOperatorsText.append("buffer value : ");
                        Log.e(TAG, "buffer value : " );
                        for (Integer i : integers) {
                            mRxOperatorsText.append(i + "");
                            Log.e(TAG, i + "");
                        }
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "\n");
                    }
                });

Result: three buffers are emitted: [1, 2, 3], [3, 4, 5] and [5].
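
To see how count and skip interact, here is a plain-Java sketch that mimics buffer(count, skip) on a finite list. BufferSketch and its buffer method are hypothetical helpers written for this illustration, not RxJava code, and the sketch only models a source that has already completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BufferSketch {
    // A new buffer starts every `skip` items and holds at most `count` items;
    // buffers near the end of the list may be shorter than `count`.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(source, 3, 2)); // [[1, 2, 3], [3, 4, 5], [5]]
    }
}
```

When skip is smaller than count the buffers overlap, as with (3, 2) here; when skip is larger than count some items fall between buffers and are dropped.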

timer

create an Observable that emits a particular item after a given delay

Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
Observable.timer(2, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) // timer runs off the main thread by default (on the computation scheduler), so switch back to the main thread
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
            }
        });

Output: after the 2-second delay the subscriber receives a single value, 0.

Interval

create an Observable that emits a sequence of integers spaced by a given time interval

Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
Observable.interval(3,2, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) // interval runs off the main thread by default (on the computation scheduler), so switch back to the main thread
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
                Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
            }
        });

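Output: the first value, 0, arrives 3 seconds after subscription; after that the counter increases by one every 2 seconds.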

skip

suppress the first n items emitted by an Observable

skip(count) skips the first count items and delivers the rest.

Observable.just(1,2,3,4,5)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("skip : "+integer + "\n");
                        Log.e(TAG, "skip : "+integer + "\n");
                    }
                });

Result: the subscriber receives 3, 4 and 5.

take

emit only the first n items emitted by an Observable

take accepts a long parameter count and delivers at most count items.

Flowable.fromArray(1,2,3,4,5)
                .take(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("take : "+integer + "\n");
                        Log.e(TAG, "accept: take : "+integer + "\n" );
                    }
                });

Result: the subscriber receives 1 and 2.
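
For readers who know java.util.stream, skip and take behave much like Stream.skip and Stream.limit on a finite list. The sketch below uses only the JDK; SkipTakeSketch is a hypothetical name, and the comparison is an analogy, since streams are pulled synchronously while an Observable pushes items over time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SkipTakeSketch {
    // Analogue of skip(count): drop the first `count` items, keep the rest.
    static List<Integer> skip(List<Integer> source, long count) {
        return source.stream().skip(count).collect(Collectors.toList());
    }

    // Analogue of take(count): keep at most the first `count` items.
    static List<Integer> take(List<Integer> source, long count) {
        return source.stream().limit(count).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(skip(source, 2)); // [3, 4, 5]
        System.out.println(take(source, 2)); // [1, 2]
    }
}
```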

Debounce

only emit an item from an Observable if a particular timespan has passed without it emitting another item

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("debounce :" + integer + "\n");
                        Log.e(TAG,"debounce :" + integer + "\n");
                    }
                });

Result: the subscriber receives 2, 4 and 5.
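
Why 1 and 3 are dropped becomes clearer when the timeline is replayed by hand. The following plain-Java sketch is a simplified model, not RxJava's implementation: DebounceSketch is a hypothetical helper, the emission times are derived from the Thread.sleep calls in the example (0, 400, 905, 1005 and 1610 ms), and the model assumes the source completes normally so that the last pending item is flushed.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceSketch {
    // An item is delivered only if the next item arrives more than
    // timeoutMs after it; the final item is delivered when the source completes.
    static List<Integer> debounce(int[] values, long[] emitTimesMs, long timeoutMs) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            if (isLast || emitTimesMs[i + 1] - emitTimesMs[i] > timeoutMs) {
                delivered.add(values[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5};
        long[] emitTimesMs = {0, 400, 905, 1005, 1610};
        System.out.println(debounce(values, emitTimesMs, 500)); // [2, 4, 5]
    }
}
```

Item 1 is followed by 2 after only 400 ms and item 3 by 4 after 100 ms, so both are replaced before the 500 ms quiet period elapses.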

last

emit only the last item (or the last item that meets some condition) emitted by an Observable

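The last operator emits only the final item of the Observable, or the last item that meets some condition. In RxJava 2, last(defaultItem) takes the value to emit when the source is empty, which is what the 4 in the example below is.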

Observable.just(1, 2, 3)
                .last(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("last : " + integer + "\n");
                        Log.e(TAG, "last : " + integer + "\n");
                    }
                });

Result: the subscriber receives 3; the default value 4 would only be used if the source were empty.

merge

combine multiple Observables into one by merging their emissions

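merge does what the name suggests; if you are familiar with version control tools you certainly know the merge command. In Rx, merge combines multiple Observables into one. It accepts varargs and also an Iterable of sources. Note the difference from concat: merge does not wait for emitter A to finish sending all its events before emitter B starts sending.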

Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("merge :" + integer + "\n");
                        Log.e(TAG, "accept: merge :" + integer + "\n" );
                    }
                });

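Result: because both just sources emit synchronously here, the subscriber receives 1, 2, 3, 4 and 5 in that order.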

reduce

reduce processes the items one at a time with a function, and can take a seed as the initial value.

Observable.just(1, 2, 3)
               .reduce(new BiFunction<Integer, Integer, Integer>() {
                   @Override
                   public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                       return integer + integer2;
                   }
               }).subscribe(new Consumer<Integer>() {
           @Override
           public void accept(@NonNull Integer integer) throws Exception {
               mRxOperatorsText.append("reduce : " + integer + "\n");
               Log.e(TAG, "accept: reduce : " + integer + "\n");
           }
       });

Result: the subscriber receives 6.

scan

apply a function to each item emitted by an Observable, sequentially, and emit each successive value

scan works like reduce above; the only difference is that reduce cares only about the final result, whereas scan faithfully emits every intermediate step.

Observable.just(1, 2, 3)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("scan " + integer + "\n");
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });

Result: the subscriber receives 1, 3 and 6.
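
The relationship between the two operators can be shown in plain Java: reduce is simply the last value that scan would produce. ScanReduceSketch is a hypothetical helper for this illustration, working on a finite list without a seed; it is a sketch of the idea, not RxJava's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanReduceSketch {
    // scan: the first item passes through as-is, then every accumulated
    // value is recorded as it is produced.
    static List<Integer> scan(List<Integer> source, BinaryOperator<Integer> accumulator) {
        List<Integer> steps = new ArrayList<>();
        Integer running = null;
        for (Integer item : source) {
            running = (running == null) ? item : accumulator.apply(running, item);
            steps.add(running);
        }
        return steps;
    }

    // reduce: the same fold, but only the final value is reported
    // (null here stands for "no value" on an empty source).
    static Integer reduce(List<Integer> source, BinaryOperator<Integer> accumulator) {
        List<Integer> steps = scan(source, accumulator);
        return steps.isEmpty() ? null : steps.get(steps.size() - 1);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        System.out.println(scan(source, Integer::sum));   // [1, 3, 6]
        System.out.println(reduce(source, Integer::sum)); // 6
    }
}
```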

zip

combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Retrofit retrofit = new Retrofit.Builder()
             .baseUrl("http://lol.zhangyoubao.com/apis/rest/RolesService/")
             .addConverterFactory(GsonConverterFactory.create())
             .addCallAdapterFactory(RxJava2CallAdapterFactory.create()).build();

     LolInterface lolInterfaceRequest = retrofit.create(LolInterface.class);

     Observable<LolDataList> observable1 = lolInterfaceRequest.reqLolData().subscribeOn(Schedulers.io());

     Observable<LolDataList> observable2 = lolInterfaceRequest.reqLolData().subscribeOn(Schedulers.io());


     Observable.zip(observable1, observable2, new BiFunction<LolDataList, LolDataList, String>() {
         @Override
         public String apply(@NonNull LolDataList lolDataList, @NonNull LolDataList lolDataList2) throws Exception {
             return "合并后的数据为 lolDataList1="+lolDataList.getData().get(0).getDisplayName() + ",lolDataList2="+lolDataList2.getData().get(0).getDisplayName();
         }
     }).subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Consumer<String>() {
                 @Override
                 public void accept(@NonNull String s) throws Exception {
                     Log.e(TAG, "accept: 成功:" + s+"\n");
                 }
             }, new Consumer<Throwable>() {
                 @Override
                 public void accept(@NonNull Throwable throwable) throws Exception {
                     Log.e(TAG, "accept: 失败:" + throwable+"\n");
                 }
             });

Result:

accept: 成功:合并后的数据为 lolDataList1=灭世魔神 诺提勒斯,lolDataList2=灭世魔神 诺提勒斯

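subscribeOn

subscribeOn specifies the thread on which events are emitted.

If the emitting thread is specified several times, only the first one takes effect: when subscribeOn() is called more than once, only the first call counts and the rest are ignored.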

observeOn

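observeOn specifies the thread on which the subscriber receives events.

Specifying the receiving thread several times is allowed: every call to observeOn() switches the downstream thread once more.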
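
The idea of producing a value on one thread and handling it on another can be tried with the JDK alone. The sketch below is an analogy built on CompletableFuture and two single-thread executors; it is not how RxJava schedulers are implemented, and the thread names io-thread and ui-thread are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {
    static String run() {
        // One executor plays the role of the emitting thread, the other the receiving thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // roughly the role of subscribeOn: where the value is produced
                    .supplyAsync(() -> "produced on " + Thread.currentThread().getName(), io)
                    // roughly the role of observeOn: where the downstream step runs
                    .thenApplyAsync(s -> s + ", consumed on " + Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // produced on io-thread, consumed on ui-thread
    }
}
```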

Thread switching

Observable.create(new ObservableOnSubscribe<Integer>() {
         @Override
         public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
             Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
             e.onNext(1);
             e.onComplete();
         }
     }).subscribeOn(Schedulers.newThread())
             .subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .doOnNext(new Consumer<Integer>() {
                 @Override
                 public void accept(@NonNull Integer integer) throws Exception {
                     Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                 }
             })
             .observeOn(Schedulers.io())
             .subscribe(new Consumer<Integer>() {
                 @Override
                 public void accept(@NonNull Integer integer) throws Exception {
                     Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                 }
             });

Result:

07-14 11:43:11.669 8550-8611/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-14 11:43:11.670 8550-8550/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-14 11:43:11.671 8550-8612/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2

Example scenarios

A simple network request

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
                Log.e(TAG, "create 线程:" + Thread.currentThread().getName() + "\n");
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {

                        Log.e(TAG, "map 线程:" + Thread.currentThread().getName() + "\n");
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:转换前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
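                        // RxJava 2 does not allow null values in a stream, so fail explicitly
                        throw new IllegalStateException("Request failed or empty body, HTTP " + response.code());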
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext 线程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");

                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
                    }
                });

Result:

07-13 09:13:08.664 16010-16578/com.nanchen.rxjava2examples E/RxNetSingleActivity: map 线程:RxCachedThreadScheduler-1
    map:转换前:okhttp3.internal.http.RealResponseBody@3e17a0f
07-13 09:13:08.691 16010-16578/com.nanchen.rxjava2examples E/RxNetSingleActivity: create 线程:RxCachedThreadScheduler-1
07-13 09:13:08.691 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: doOnNext 线程:main
07-13 09:13:08.702 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: doOnNext: 保存成功:MobileAddress{error_code=10005, reason='应用未审核超时,请提交认证', result=null}
07-13 09:13:08.737 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: subscribe 线程:main
07-13 09:13:08.741 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: 成功:MobileAddress{error_code=10005, reason='应用未审核超时,请提交认证', result=null}

Running several network requests in sequence

Observable.create(new ObservableOnSubscribe<LolDataList>() {
           @Override
           public void subscribe(ObservableEmitter<LolDataList> e) throws Exception {
               Log.e(TAG, "create 线程:" + Thread.currentThread().getName() + "\n");
               Request.Builder builder = new Request.Builder()
                       .url("http://lol.zhangyoubao.com/apis/rest/RolesService/new_paper_skin?roleid=19&i_=863472021700411&t_=1433993799448&p_=30689&v_=400500&a_=lol&pkg_=com.anzogame.lol&d_=android&osv_=16&cha=TEST&u_=&")
                       .get();
               Request request = builder.build();
               Call call = new OkHttpClient().newCall(request);
               okhttp3.Response response = call.execute();
               if (response.isSuccessful()) {
                   ResponseBody body = response.body();
                   if (body != null) {
                       LolDataList dataList =  new Gson().fromJson(body.string(), LolDataList.class);
                       e.onNext(dataList);
                   }
               }

           }
       }).subscribeOn(Schedulers.io())
       .doOnNext(new Consumer<LolDataList>() {
           @Override
           public void accept(LolDataList lolDataList) throws Exception {
               // Save the data here
           }
       })
       .flatMap(new Function<LolDataList, ObservableSource<LolDataList>>() {
           @Override
           public ObservableSource<LolDataList> apply(LolDataList lolDataList) throws Exception {
               Request.Builder builder = new Request.Builder()
                       .url("http://lol.zhangyoubao.com/apis/rest/RolesService/new_paper_skin?roleid=19&i_=863472021700411&t_=1433993799448&p_=30689&v_=400500&a_=lol&pkg_=com.anzogame.lol&d_=android&osv_=16&cha=TEST&u_=&")
                       .get();
               Request request = builder.build();
               Call call = new OkHttpClient().newCall(request);
               okhttp3.Response response = call.execute();
               LolDataList dataList = null;
               if (response.isSuccessful()) {
                   ResponseBody body = response.body();
                   if (body != null) {
                        dataList =  new Gson().fromJson(body.string(), LolDataList.class);
                   }
               }
               Log.e(TAG, "flatMap 线程:" + Thread.currentThread().getName() + "\n");
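               // Observable.just(null) would throw in RxJava 2, so emit nothing when the request failed
               return dataList == null ? Observable.<LolDataList>empty() : Observable.just(dataList);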
           }
       })
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<LolDataList>() {
           @Override
           public void accept(LolDataList lolDataList) throws Exception {
               Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n"+lolDataList.getData().size());

           }
       }, new Consumer<Throwable>() {
           @Override
           public void accept(Throwable throwable) throws Exception {

           }
       });

Result:

13 16:48:53.667 9180-9382/? E/RxCaseFlatMapActivity: create 线程:RxCachedThreadScheduler-2
07-13 16:48:54.104 9180-9382/? E/RxCaseFlatMapActivity: flatMap 线程:RxCachedThreadScheduler-2
07-13 16:48:54.105 9180-9180/? E/RxCaseFlatMapActivity: subscribe 线程:main6

Implementing a heartbeat with an interval task

mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
              .doOnNext(new Consumer<Long>() {
                  @Override
                  public void accept(@NonNull Long aLong) throws Exception {
                      Log.e(TAG, "accept: doOnNext : "+aLong );
                  }
              })
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Consumer<Long>() {
                  @Override
                  public void accept(@NonNull Long aLong) throws Exception {
                      Log.e(TAG, "accept: 设置文本 :"+aLong );
                      mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n");
                  }
              });

Result:

07-14 11:01:19.050 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 0
07-14 11:01:19.052 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :0
07-14 11:01:20.048 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 1
07-14 11:01:20.050 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :1
07-14 11:01:21.050 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 2
07-14 11:01:21.052 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :2
07-14 11:01:22.049 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 3
07-14 11:01:22.051 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :3

Resources

RxJava documentation (Chinese edition)