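What is RxJava?
To understand RxJava, you should first get to know Rx. Rx stands for Reactive Extensions. It is a programming model built on the observer pattern, and its goal is to provide a consistent programming interface that makes asynchronous data streams easier to handle. ReactiveX.io defines Rx as an API for asynchronous programming with observable streams; ReactiveX combines the best ideas from the observer pattern, the iterator pattern, and functional programming. Rx has spread to many languages, which is why we have RxJava, Rx.NET, RxJS, RxSwift, Rx.rb, RxPHP, and so on.
The official description of RxJava is "a library for composing asynchronous and event-based programs using observable sequences for the Java VM".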
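- RxJava is built on the observer pattern
Let's first make the observer pattern clear with an example: "press the switch and the desk lamp turns on".
In this event, the lamp is the observer and the switch is the observable. The lamp watches the state of the switch through the wire and reacts accordingly.
- The switch (the observable) is the producer of events (it produces the "on" and "off" events). It is active, and it is the starting point of the whole lamp-switching flow.
- The lamp (the observer) is the handler of events (it handles the "light on" and "light off" events). It is passive, and it is the end point of the whole flow.
- Between the starting point and the end point, that is, while events are being passed along, they can be processed, filtered, transformed, merged, and so on (not shown in the figure above; covered later).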
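The switch-and-lamp relationship described above can be sketched in plain Java, without RxJava. The names `SwitchLampSketch`, `Switch`, `Lamp`, and `SwitchObserver` are hypothetical and exist only for this illustration; this is a minimal sketch of the observer pattern, not how RxJava itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class SwitchLampSketch {
    /** Observer side: passively reacts to events pushed by the switch. */
    interface SwitchObserver {
        void onSwitchChanged(boolean on);
    }

    /** Observable side: produces "on"/"off" events and pushes them to its observers. */
    static class Switch {
        private final List<SwitchObserver> observers = new ArrayList<>();

        void subscribe(SwitchObserver observer) {
            observers.add(observer);
        }

        void press(boolean on) {
            // Every registered observer is notified of the new state.
            for (SwitchObserver observer : observers) {
                observer.onSwitchChanged(on);
            }
        }
    }

    /** The lamp is the end point of the flow: it only handles events. */
    static class Lamp implements SwitchObserver {
        private boolean lit;

        @Override
        public void onSwitchChanged(boolean on) {
            lit = on;
        }

        boolean isLit() {
            return lit;
        }
    }

    public static void main(String[] args) {
        Switch wallSwitch = new Switch();
        Lamp lamp = new Lamp();
        wallSwitch.subscribe(lamp); // plays the role of the wire
        wallSwitch.press(true);
        System.out.println("lamp lit: " + lamp.isLit()); // prints "lamp lit: true"
        wallSwitch.press(false);
        System.out.println("lamp lit: " + lamp.isLit()); // prints "lamp lit: false"
    }
}
```

In RxJava terms, `Switch` corresponds roughly to an Observable, `Lamp` to an Observer, and `subscribe` to the act of wiring the two together.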
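What is reactive programming?
Reactive programming is a programming paradigm built around asynchronous data streams. A data stream is like a river: it can be observed, filtered, and manipulated, or merged with another stream into a new stream for a new consumer.
A key concept in reactive programming is the event. Events can be awaited, can trigger procedures, and can trigger other events. Events are the only suitable way to map the real world into our software: if the room is too hot, we open a window. Likewise, when our weather app receives new weather data from the server, we need to update the UI that displays it; when a car's lane-departure system detects that the vehicle has drifted out of its lane, it alerts the driver to correct course. Both are responses to events.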
What makes RxJava good?
For example, given a requirement to pull the latest list of residential communities from the server, you might write it like this:
new Thread() {
    @Override
    public void run() {
        super.run();
        // Fetch the list of communities from the server
        List<Community> communities = getCommunitiesFromServer();
        for (Community community : communities) {
            List<House> houses = community.houses;
            for (final House house : houses) {
                if (house.price >= 5000000) {
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            // Add the house's information to the screen
                            addHouseInformationToScreen(house);
                        }
                    });
                }
            }
        }
    }
}.start();
With RxJava (1.x) it looks like this:
// Note: getCommunitiesFromServer() is evaluated eagerly on the calling thread here,
// before subscribeOn() takes effect; wrap it in Observable.defer(...) to run it on the io scheduler.
Observable.from(getCommunitiesFromServer())
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.houses);
            }
        })
        .filter(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return house.price >= 5000000;
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                // Add the house's information to the screen
                addHouseInformationToScreen(house);
            }
        });
Simplified with lambdas:
Observable.from(getCommunitiesFromServer())
        .flatMap(community -> Observable.from(community.houses))
        .filter(house -> house.price >= 5000000)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::addHouseInformationToScreen);
Basic elements of RxJava
Basic usage of RxJava
Operators
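create is probably the most commonly seen operator. It is mainly used to produce an Observable. To keep things easy to follow, the rest of this tutorial consistently calls the Observable the emitter (upstream) and the Observer the receiver (downstream).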
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        Log.e(TAG, "Observable emit 1" + "\n");
        e.onNext(1);
        Log.e(TAG, "Observable emit 2" + "\n");
        e.onNext(2);
        Log.e(TAG, "Observable emit 3" + "\n");
        e.onNext(3);
        e.onComplete();
        Log.e(TAG, "Observable emit 4" + "\n");
        e.onNext(4);
    }
}).subscribe(new Observer<Integer>() {
    private int i;
    private Disposable mDisposable;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
        mDisposable = d;
    }

    @Override
    public void onNext(@NonNull Integer integer) {
        Log.e(TAG, "onNext : value : " + integer + "\n");
        i++;
        if (i == 2) {
            // RxJava 2.x adds Disposable, which cuts the link so that the
            // Observer no longer receives upstream events
            mDisposable.dispose();
            Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete" + "\n");
    }
});
Result:
onSubscribe : false
The map operator applies a function to every event sent by the emitter, so that each event is transformed according to that function.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(@NonNull Integer integer) throws Exception {
        return "This is result " + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(@NonNull String s) throws Exception {
        mRxOperatorsText.append("accept : " + s + "\n");
        Log.e(TAG, "accept : " + s + "\n");
    }
});
Result:
07-10 14:15:49.857 26872-26872/com.nanchen.rxjava2examples E/RxMapActivity: accept : This is result 1
Concat
Put simply, concat joins two or more emitters so that they behave like a single emitter, and their emissions never interleave.
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "concat : "+ integer + "\n" );
}
});
Output:
07-10 14:24:10.147 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 1
07-10 14:24:10.150 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 2
07-10 14:24:10.154 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 3
07-10 14:24:10.158 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 4
07-10 14:24:10.162 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 5
07-10 14:24:10.166 26872-26872/com.nanchen.rxjava2examples E/RxConcatActivity: concat : 6
FlatMap
transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
mRxOperatorsText.append("flatMap : accept : " + s + "\n");
}
});
Result:
ConcatMap
The only difference between concatMap and flatMap is that concatMap guarantees ordering, so let's simply replace flatMap with concatMap to verify this.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "concatMap : accept : " + s + "\n");
mRxOperatorsText.append("concatMap : accept : " + s + "\n");
}
});
Output:
Distinct
As the name suggests, this operator removes duplicates.
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("distinct : " + integer + "\n");
Log.e(TAG, "distinct : " + integer + "\n");
}
});
Output:
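The idea behind distinct can be sketched in plain Java: remember every item already seen and let an item through only the first time it appears. `DistinctSketch` is a hypothetical name, and this is only a conceptual illustration on a finite list, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctSketch {
    /** Keeps the first occurrence of each item and preserves the original order. */
    static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>();
        List<T> result = new ArrayList<>();
        for (T item : items) {
            // Set.add returns false when the item was already present.
            if (seen.add(item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same input as the distinct() example above.
        System.out.println(distinct(Arrays.asList(1, 1, 1, 2, 2, 3, 4, 5))); // prints [1, 2, 3, 4, 5]
    }
}
```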
Filter
emit only those items from an Observable that pass a predicate test
Observable.just(1, 20, 65, -5, 7, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >= 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("filter : " + integer + "\n");
Log.e(TAG, "filter : " + integer + "\n");
}
});
Result:
just
A simple emitter that calls onNext() once for each item, in order.
Observable.just("1", "2", "3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : onNext : " + s + "\n");
Log.e(TAG,"accept : onNext : " + s + "\n" );
}
});
Result:
buffer
periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
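The buffer operator used here takes two parameters, buffer(count, skip). It splits the items of the Observable into buffers of at most count items, starting a new buffer every skip items (the step size), and emits those buffers as a new Observable.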
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
mRxOperatorsText.append("buffer size : " + integers.size() + "\n");
Log.e(TAG, "buffer size : " + integers.size() + "\n");
mRxOperatorsText.append("buffer value : ");
Log.e(TAG, "buffer value : " );
for (Integer i : integers) {
mRxOperatorsText.append(i + "");
Log.e(TAG, i + "");
}
mRxOperatorsText.append("\n");
Log.e(TAG, "\n");
}
});
Result:
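The windowing rule of buffer(count, skip) can be sketched in plain Java on a finite list. `BufferSketch` and its `buffer` helper are hypothetical names for this illustration; the sketch only mirrors the grouping for a synchronous, finite source and is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferSketch {
    /**
     * Starts a new buffer every `skip` items; each buffer holds at most `count` items.
     * Buffers that are still incomplete when the input ends are emitted as they are.
     */
    static <T> List<List<T>> buffer(List<T> items, int count, int skip) {
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            buffers.add(new ArrayList<>(items.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        // Same input and parameters as the buffer(3, 2) example above.
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 3, 2)); // prints [[1, 2, 3], [3, 4, 5], [5]]
    }
}
```

When skip is smaller than count the buffers overlap, as here; when skip is larger than count, some items fall between buffers and are dropped.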
timer
create an Observable that emits a particular item after a given delay
Log.e(TAG, "timer start : " + TimeUtil.getNowStrTime() + "\n");
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // timer runs on the computation scheduler by default, so switch back to the main thread
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "timer :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
}
});
Output:
Interval
create an Observable that emits a sequence of integers spaced by a given time interval
Log.e(TAG, "interval start : " + TimeUtil.getNowStrTime() + "\n");
Observable.interval(3,2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // interval runs on the computation scheduler by default, so switch back to the main thread
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
mRxOperatorsText.append("interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
Log.e(TAG, "interval :" + aLong + " at " + TimeUtil.getNowStrTime() + "\n");
}
});
Output:
skip
suppress the first n items emitted by an Observable
It skips the first count items and only then starts receiving.
Observable.just(1,2,3,4,5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("skip : "+integer + "\n");
Log.e(TAG, "skip : "+integer + "\n");
}
});
Result:
take
emit only the first n items emitted by an Observable
It takes a long parameter count, meaning that at most count items are received.
Flowable.fromArray(1,2,3,4,5)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("take : "+integer + "\n");
Log.e(TAG, "accept: take : "+integer + "\n" );
}
});
Result:
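For readers who know Java Streams, skip and take have close analogues in `Stream.skip` and `Stream.limit`. The sketch below uses only the standard library; `SkipTakeSketch` is a hypothetical name. Keep in mind that a Stream is pulled synchronously from a collection, while an Observable pushes items over time, so this only illustrates which items survive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SkipTakeSketch {
    /** Analogue of skip(count): drop the first `count` items, keep the rest. */
    static <T> List<T> skip(List<T> items, long count) {
        return items.stream().skip(count).collect(Collectors.toList());
    }

    /** Analogue of take(count): keep at most the first `count` items. */
    static <T> List<T> take(List<T> items, long count) {
        return items.stream().limit(count).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(skip(source, 2)); // prints [3, 4, 5]
        System.out.println(take(source, 2)); // prints [1, 2]
    }
}
```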
Debounce
only emit an item from an Observable if a particular timespan has passed without it emitting another item
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("debounce :" + integer + "\n");
Log.e(TAG,"debounce :" + integer + "\n");
}
});
Result:
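The skip/deliver comments in the debounce example can be checked with a small plain-Java simulation. It takes the emission times implied by the Thread.sleep calls (0, 400, 905, 1005, and 1610 ms) and applies the debounce rule: an item is delivered only if the source stays quiet for the whole window after it, and the last pending item is delivered on completion. `DebounceSketch` is a hypothetical name, and the simulation ignores real timer jitter, so treat it as a sketch of the rule rather than of RxJava's scheduling.

```java
import java.util.ArrayList;
import java.util.List;

public class DebounceSketch {
    /**
     * emitTimesMs[i] is the time at which values[i] is emitted (ascending order).
     * Returns the values that would survive a debounce with the given window.
     */
    static List<Integer> debounce(long[] emitTimesMs, int[] values, long windowMs) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            // The last item is flushed when the source completes; any other item
            // survives only if the next emission arrives after the quiet window.
            if (isLast || emitTimesMs[i + 1] - emitTimesMs[i] >= windowMs) {
                delivered.add(values[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        long[] times = {0, 400, 905, 1005, 1610};
        int[] values = {1, 2, 3, 4, 5};
        System.out.println(debounce(times, values, 500)); // prints [2, 4, 5]
    }
}
```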
last
emit only the last item (or the last item that meets some condition) emitted by an Observable
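The last operator takes only the last item emitted by the Observable, or the last item that satisfies some condition. In the RxJava 2.x call below, the argument 4 is the default item, which is emitted only if the source is empty.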
Observable.just(1, 2, 3)
.last(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("last : " + integer + "\n");
Log.e(TAG, "last : " + integer + "\n");
}
});
Result:
merge
combine multiple Observables into one by merging their emissions
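As the name suggests (anyone familiar with version control tools will know the merge command), merge in Rx combines multiple Observables into one. It accepts varargs and also an Iterable of sources. Note how it differs from concat: it does not wait for emitter A to finish sending all of its events before emitter B starts sending.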
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("merge :" + integer + "\n");
Log.e(TAG, "accept: merge :" + integer + "\n" );
}
});
Result:
reduce
The reduce operator folds the items together one at a time with a function, and can optionally take a seed as the initial value.
Observable.just(1, 2, 3)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("reduce : " + integer + "\n");
Log.e(TAG, "accept: reduce : " + integer + "\n");
}
});
Result:
scan
apply a function to each item emitted by an Observable, sequentially, and emit each successive value
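The scan operator does the same job as reduce above. The only difference is that reduce cares only about the final result, while scan faithfully emits every intermediate step.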
Observable.just(1, 2, 3)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("scan " + integer + "\n");
Log.e(TAG, "accept: scan " + integer + "\n");
}
});
Result:
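The difference between reduce and scan can be made concrete with a plain-Java sketch over a finite list, using the same inputs (1, 2, 3) and the same addition function as the examples above. `ReduceScanSketch` is a hypothetical name; the sketch has no seed value and is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

public class ReduceScanSketch {
    /** scan: emits the running accumulation after every item. */
    static List<Integer> scan(List<Integer> items, BinaryOperator<Integer> accumulator) {
        List<Integer> steps = new ArrayList<>();
        Integer running = null;
        for (Integer item : items) {
            // The first item passes through unchanged; later items are combined with the running value.
            running = (running == null) ? item : accumulator.apply(running, item);
            steps.add(running);
        }
        return steps;
    }

    /** reduce: only the final accumulation, or empty when there were no items. */
    static Optional<Integer> reduce(List<Integer> items, BinaryOperator<Integer> accumulator) {
        List<Integer> steps = scan(items, accumulator);
        if (steps.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(steps.get(steps.size() - 1));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        System.out.println(scan(source, Integer::sum));         // prints [1, 3, 6]
        System.out.println(reduce(source, Integer::sum).get()); // prints 6
    }
}
```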
zip
combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://lol.zhangyoubao.com/apis/rest/RolesService/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()).build();
LolInterface lolInterfaceRequest = retrofit.create(LolInterface.class);
Observable<LolDataList> observable1 = lolInterfaceRequest.reqLolData().subscribeOn(Schedulers.io());
Observable<LolDataList> observable2 = lolInterfaceRequest.reqLolData().subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<LolDataList, LolDataList, String>() {
@Override
public String apply(@NonNull LolDataList lolDataList, @NonNull LolDataList lolDataList2) throws Exception {
return "合并后的数据为 lolDataList1="+lolDataList.getData().get(0).getDisplayName() + ",lolDataList2="+lolDataList2.getData().get(0).getDisplayName();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失败:" + throwable+"\n");
}
});
Result:
accept: 成功:合并后的数据为 lolDataList1=灭世魔神 诺提勒斯,lolDataList2=灭世魔神 诺提勒斯
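subscribeOn
Specifies the thread on which events are emitted.
If the emitting thread is specified more than once, only the first one takes effect. In other words, when subscribeOn() is called several times, only the first call (the one closest to the source) counts and the rest are ignored.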
observeOn
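Specifies the thread on which the subscriber receives events.
The receiving thread can be specified more than once: each call to observeOn() switches the downstream thread again.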
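As a rough analogy using only the standard library, the split between "where the work is produced" and "where the result is consumed" can be sketched with `CompletableFuture` and two executors. The names `ThreadSwitchSketch`, `producer-thread`, and `consumer-thread` are hypothetical; this is not how RxJava schedulers work internally, only an illustration of producing on one thread and handing the result to another.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadSwitchSketch {
    /** Returns {thread that produced the value, thread that consumed it}. */
    static String[] run() {
        ExecutorService producerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "producer-thread"));
        ExecutorService consumerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer-thread"));
        try {
            return CompletableFuture
                    // Roughly like subscribeOn: the upstream work runs on producerPool.
                    .supplyAsync(() -> Thread.currentThread().getName(), producerPool)
                    // Roughly like observeOn: the downstream step runs on consumerPool.
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            consumerPool)
                    .join();
        } finally {
            producerPool.shutdown();
            consumerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("produced on: " + names[0]); // prints "produced on: producer-thread"
        System.out.println("consumed on: " + names[1]); // prints "consumed on: consumer-thread"
    }
}
```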
Thread switching
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
Result:
07-14 11:43:11.669 8550-8611/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-14 11:43:11.670 8550-8550/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-14 11:43:11.671 8550-8612/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
Example scenarios
A simple network request
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
Log.e(TAG, "create 线程:" + Thread.currentThread().getName() + "\n");
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
Log.e(TAG, "map 线程:" + Thread.currentThread().getName() + "\n");
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:转换前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
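// RxJava 2.x does not allow null values, so signal the failure as an error instead
throw new java.io.IOException("Request failed or response body was empty");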
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext 线程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "成功:" + data.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
}
});
Output:
07-13 09:13:08.664 16010-16578/com.nanchen.rxjava2examples E/RxNetSingleActivity: map 线程:RxCachedThreadScheduler-1
map:转换前:okhttp3.internal.http.RealResponseBody@3e17a0f
07-13 09:13:08.691 16010-16578/com.nanchen.rxjava2examples E/RxNetSingleActivity: create 线程:RxCachedThreadScheduler-1
07-13 09:13:08.691 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: doOnNext 线程:main
07-13 09:13:08.702 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: doOnNext: 保存成功:MobileAddress{error_code=10005, reason='应用未审核超时,请提交认证', result=null}
07-13 09:13:08.737 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: subscribe 线程:main
07-13 09:13:08.741 16010-16010/com.nanchen.rxjava2examples E/RxNetSingleActivity: 成功:MobileAddress{error_code=10005, reason='应用未审核超时,请提交认证', result=null}
Running multiple network requests sequentially
Observable.create(new ObservableOnSubscribe<LolDataList>() {
@Override
public void subscribe(ObservableEmitter<LolDataList> e) throws Exception {
Log.e(TAG, "create 线程:" + Thread.currentThread().getName() + "\n");
Request.Builder builder = new Request.Builder()
.url("http://lol.zhangyoubao.com/apis/rest/RolesService/new_paper_skin?roleid=19&i_=863472021700411&t_=1433993799448&p_=30689&v_=400500&a_=lol&pkg_=com.anzogame.lol&d_=android&osv_=16&cha=TEST&u_=&")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
okhttp3.Response response = call.execute();
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
LolDataList dataList = new Gson().fromJson(body.string(), LolDataList.class);
e.onNext(dataList);
}
}
}
}).subscribeOn(Schedulers.io())
.doOnNext(new Consumer<LolDataList>() {
@Override
public void accept(LolDataList lolDataList) throws Exception {
// Handle the data-saving logic here
}
})
.flatMap(new Function<LolDataList, ObservableSource<LolDataList>>() {
@Override
public ObservableSource<LolDataList> apply(LolDataList lolDataList) throws Exception {
Request.Builder builder = new Request.Builder()
.url("http://lol.zhangyoubao.com/apis/rest/RolesService/new_paper_skin?roleid=19&i_=863472021700411&t_=1433993799448&p_=30689&v_=400500&a_=lol&pkg_=com.anzogame.lol&d_=android&osv_=16&cha=TEST&u_=&")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
okhttp3.Response response = call.execute();
LolDataList dataList = null;
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
dataList = new Gson().fromJson(body.string(), LolDataList.class);
}
}
Log.e(TAG, "flatMap 线程:" + Thread.currentThread().getName() + "\n");
return Observable.just(dataList);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<LolDataList>() {
@Override
public void accept(LolDataList lolDataList) throws Exception {
Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n"+lolDataList.getData().size());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
Output:
13 16:48:53.667 9180-9382/? E/RxCaseFlatMapActivity: create 线程:RxCachedThreadScheduler-2
07-13 16:48:54.104 9180-9382/? E/RxCaseFlatMapActivity: flatMap 线程:RxCachedThreadScheduler-2
07-13 16:48:54.105 9180-9180/? E/RxCaseFlatMapActivity: subscribe 线程:main6
Implementing a heartbeat with an interval task
mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 设置文本 :"+aLong );
mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n");
}
});
Result:
07-14 11:01:19.050 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 0
07-14 11:01:19.052 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :0
07-14 11:01:20.048 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 1
07-14 11:01:20.050 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :1
07-14 11:01:21.050 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 2
07-14 11:01:21.052 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :2
07-14 11:01:22.049 7399-7660/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: doOnNext : 3
07-14 11:01:22.051 7399-7399/com.nanchen.rxjava2examples E/RxCaseIntervalActivity: accept: 设置文本 :3