前言

本文使用RxJava2.x,仅作为入门笔记,后续持续更新。本文所有代码使用junit测试可直接运行

集成

1
2
3
//RxJava+RxAndroid
implementation "io.reactivex.rxjava2:rxjava:2.2.10"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

简介

RxJava2.x有2种观察模式:

  1. Observable->Observer不支持背压
  2. Flowable->Subscriber支持背压

被观察者也叫做上游,下游即为观察者
被观察者可以想象成通过流的形式发送数据

HelloWorld

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Test
public void hello() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onComplete();
emitter.onNext("5");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}

这里用的是Observer,如果我们只关心onNext事件,可以使用Consumer来替换Observer

  1. emitter用来发送流
  2. Disposable 可以切断上游和下游的联系,但是上游还会发送事件

    线程切换

  3. subscribeOn()指定上游发送事件的线程(多次指定,只有第一次有效)
  4. observeOn()下游接受事件的线程(可以多次切换,与doOnNext对应)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    @Test
    public void threadChange() {
    Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onNext("4");
    emitter.onComplete();
    emitter.onNext("5");
    }
    }).subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .doOnNext(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    System.out.println("doOnNext: Thread = " + Thread.currentThread() + "value =" + s);
    }
    })
    .observeOn(Schedulers.newThread())
    .subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    System.out.println("subscribe: Thread = " + Thread.currentThread() + "value =" + s);
    }
    });

    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

流的转化

  1. map-对每个流操作后继续流出(改变每个流的输入继续当输入。可改变输入的类型和值)
  2. flatMap-对每个流可进行转换,转换为新的Observable(根据输入重新开始流,不保证顺序)
  3. concatMap-与flatMap类似,保证顺序
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Test
    public void mapTest() {
    Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("1");
    emitter.onNext("2");
    emitter.onNext("3");
    emitter.onNext("4");
    emitter.onComplete();
    emitter.onNext("5");
    }
    }).map(new Function<String, String>() {
    @Override
    public String apply(String s) throws Exception {
    return s + "+map";
    }
    }).flatMap(new Function<String, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(String s) throws Exception {
    return Observable.just(s + "+flatMap");
    }
    }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    System.out.println("subscribe: Thread = " + Thread.currentThread() + "value =" + s);
    }
    });
    }

流的组合

  • zip-将2个流组合起来

    必须拿到2个流才能组合,所以下游收到的流次数即为发送最少的上游的次数

    一个流结束,整个过程结束同时上游也会结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Test
public void zipTest() {
Observable.zip(Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onNext("5");
emitter.onComplete();
emitter.onNext("6");
}
}).subscribeOn(Schedulers.io()), Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
Thread.sleep(10);
emitter.onNext("B");
Thread.sleep(10);
emitter.onNext("C");
Thread.sleep(10);
emitter.onNext("D");
Thread.sleep(10);
emitter.onComplete();
Thread.sleep(10);
emitter.onNext("E");
}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + "-" + s2;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}

流的过滤

我们可以根据条件过滤掉上游的数据,这样上游相当于会丢失一部分数据

  • filter-对流进行过滤,满足条件可通过(可减少下游压力)
  • sample-周期的去取最近的一次(如果想取周期的第一次可以使用throttleFirst)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
     @Test
    public void filterTest() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    for (int i = 0; i < 100; i++) {
    emitter.onNext(i);
    Thread.sleep(10);
    }
    emitter.onComplete();
    }
    }).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
    return integer % 2 == 0;
    }
    })
    // .throttleFirst(50,TimeUnit.MILLISECONDS)
    .sample(50, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
    System.out.println(integer);
    }
    });

    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

流的条件运算

计算多个观测对象的运算符

  • TakeUntil-第二个观察对象发出流或者终止后,结束订阅,丢弃观测对象的任何流(不是终止观测对象)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @Test
    public void takeUntilTest() {
    Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    for (int i = 0; i < 100; i++) {
    System.out.println("onNext==" + i);
    emitter.onNext(String.valueOf(i));
    Thread.sleep(10);
    }
    }
    }).takeUntil(Observable.interval(100, 10, TimeUnit.MILLISECONDS).doOnNext(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
    System.out.println(aLong);
    }
    }))
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
    System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
    System.out.println("error" + e);
    }

    @Override
    public void onComplete() {
    System.out.println("complete");
    }
    });

    try {
    Thread.sleep(10000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

Flowable

另一种观察模式是使用Flowable,同时他支持背压。建议只在需要的时候使用,其他情况还是使用Observable
先来看例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Test
public void backPressureTest() {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// System.out.println(Thread.currentThread());
for (int i = 0; i < 128; i++) {
// if (emitter.requested() > 0) {
emitter.onNext(String.valueOf(i));
// }
}
emitter.onComplete();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
// .observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
// System.out.println(Thread.currentThread());
s.request(90);
}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable t) {
System.out.println("error" + t);
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
  • 使用Subscriber来订阅。也可以使用Consumer(但这样其实与用Observable没有什么区别)
  • 与Observable不同的是:
    1. 上游可以知道下游的处理能力,来做出相应的策略(FlowableEmitter.requested()即为下游可处理的事件)
    2. 上游的流可以存放起来等待下游处理(由bufferSize控制存放的最大量)
  • 其实我们使用的时候关心的核心就是2个:下游的处理能力、上游的缓存最大数量
  1. 默认下游的处理能力为0
  2. 增加下游处理能力:使用Subscription .request()来增加
  3. 减少下游处理能力:当上游发送流即emitter.onNext()时
  4. 上游的缓存默认为0
  5. 当上游无法缓存时,会走背压策略即BackpressureStrategy即create时的第二个参数
  6. 可以通过observeOn指定缓存最大数量,调用但是不指定内部会有默认值128
  • Subscription和FlowableEmitter
    这里贴出一个结论(如何得到的结论,以后会进行源码分析)
  1. 当不切换线程或者只指定上游的线程即subscribeOn时。Subscription和FlowableEmitter是同一个对象

    我们Subscription.request()设置后上游的FlowableEmitter.requested()会立马改变

  2. 当上游和下游都指定了线程。Subscription和FlowableEmitter是不同的对象,这里就涉及到下游的处理能力同步给上游的问题

    开始上游FlowableEmitter的requested()为128
    下游Subscription,request()96以后,上游的FlowableEmitter会更新到128
    即不是实时同步的

后续问题

关于背压的后续分析问题:

  1. Subscription和FlowableEmitter分别是什么时候赋值的
  2. 不同线程的时候Subscription和FlowableEmitter如何同步