RxJava初探
前言
本文使用RxJava2.x,仅作为入门笔记,后续持续更新。本文所有代码使用junit测试可直接运行
集成
1 | //RxJava+RxAndroid |
简介
RxJava2.x有2种观察模式:
- Observable->Observer不支持背压
- Flowable->Subscriber支持背压
被观察者也叫做上游,下游即为观察者
被观察者可以想象成通过流的形式发送数据
HelloWorld
1 |
|
这里用的是Observer,如果我们只关心onNext事件,可以使用Consumer来替换Observer
- emitter用来发送流
- Disposable 可以切断上游和下游的联系,但是上游还会发送事件
线程切换
- subscribeOn()指定上游发送事件的线程(多次指定,只有第一次有效)
- observeOn()下游接受事件的线程(可以多次切换,与doOnNext对应)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void threadChange() {
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onComplete();
emitter.onNext("5");
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
public void accept(String s) throws Exception {
System.out.println("doOnNext: Thread = " + Thread.currentThread() + "value =" + s);
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
System.out.println("subscribe: Thread = " + Thread.currentThread() + "value =" + s);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
流的转化
- map-对每个流操作后继续流出(改变每个流的输入继续当输入。可改变输入的类型和值)
- flatMap-对每个流可进行转换,转换为新的Observable(根据输入重新开始流,不保证顺序)
- concatMap-与flatMap类似,保证顺序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void mapTest() {
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onComplete();
emitter.onNext("5");
}
}).map(new Function<String, String>() {
public String apply(String s) throws Exception {
return s + "+map";
}
}).flatMap(new Function<String, ObservableSource<String>>() {
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s + "+flatMap");
}
}).subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
System.out.println("subscribe: Thread = " + Thread.currentThread() + "value =" + s);
}
});
}
流的组合
- zip-将2个流组合起来
必须拿到2个流才能组合,所以下游收到的流次数即为发送最少的上游的次数
一个流结束,整个过程结束同时上游也会结束
1 |
|
流的过滤
我们可以根据条件过滤掉上游的数据,这样上游相当于会丢失一部分数据
- filter-对流进行过滤,满足条件可通过(可减少下游压力)
- sample-周期的去取最近的一次(如果想取周期的第一次可以使用throttleFirst)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void filterTest() {
Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 100; i++) {
emitter.onNext(i);
Thread.sleep(10);
}
emitter.onComplete();
}
}).filter(new Predicate<Integer>() {
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
// .throttleFirst(50,TimeUnit.MILLISECONDS)
.sample(50, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
流的条件运算
计算多个观测对象的运算符
- TakeUntil-第二个观察对象发出流或者终止后,结束订阅,丢弃观测对象的任何流(不是终止观测对象)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void takeUntilTest() {
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 100; i++) {
System.out.println("onNext==" + i);
emitter.onNext(String.valueOf(i));
Thread.sleep(10);
}
}
}).takeUntil(Observable.interval(100, 10, TimeUnit.MILLISECONDS).doOnNext(new Consumer<Long>() {
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
}))
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
public void onSubscribe(Disposable d) {
}
public void onNext(String s) {
System.out.println(s);
}
public void onError(Throwable e) {
System.out.println("error" + e);
}
public void onComplete() {
System.out.println("complete");
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Flowable
另一种观察模式是使用Flowable,同时他支持背压。建议只在需要的时候使用,其他情况还是使用Observable
先来看例子
1 |
|
- 使用Subscriber来订阅。也可以使用Consumer(但这样其实与用Observable没有什么区别)
- 与Observable不同的是:
- 上游可以知道下游的处理能力,来做出相应的策略(FlowableEmitter.requested()即为下游可处理的事件)
- 上游的流可以存放起来等待下游处理(由bufferSize控制存放的最大量)
- 其实我们使用的时候关心的核心就是2个:下游的处理能力、上游的缓存最大数量
- 默认下游的处理能力为0
- 增加下游处理能力:使用Subscription .request()来增加
- 减少下游处理能力:当上游发送流即emitter.onNext()时
- 上游的缓存默认为0
- 当上游无法缓存时,会走背压策略即BackpressureStrategy即create时的第二个参数
- 可以通过observeOn指定缓存最大数量,调用但是不指定内部会有默认值128
- Subscription和FlowableEmitter
这里贴出一个结论(如何得到的结论,以后会进行源码分析)
- 当不切换线程或者只指定上游的线程即subscribeOn时。Subscription和FlowableEmitter是同一个对象
我们Subscription.request()设置后上游的FlowableEmitter.requested()会立马改变
- 当上游和下游都指定了线程。Subscription和FlowableEmitter是不同的对象,这里就涉及到下游的处理能力同步给上游的问题
开始上游FlowableEmitter的requested()为128
下游Subscription,request()96以后,上游的FlowableEmitter会更新到128
即不是实时同步的
后续问题
关于背压的后续分析问题:
- Subscription和FlowableEmitter分别是什么时候赋值的
- 不同线程的时候Subscription和FlowableEmitter如何同步