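A Complete Guide to RxJava Operators
This article is based on the RxJava 2.0 documentation. GitHub: https://github.com/ReactiveX/RxJava
Pay attention to the differences between RxJava 1.x and 2.x.
1 Basic usage of RxJava2
Let's start with some sample code: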
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.e(TAG, "subscribe thread = " + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onNext(4);
            emitter.onComplete();
        }
    })
            // everything downstream of this observeOn (the map below) runs on an io thread
            .observeOn(Schedulers.io())
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    Log.e(TAG, "apply thread = " + Thread.currentThread().getName() + ", integer = " + integer);
                    return String.valueOf(integer);
                }
            })
            // subscribeOn decides the thread on which the source's subscribe() runs
            .subscribeOn(AndroidSchedulers.mainThread())
            // the Observer's onNext/onError/onComplete callbacks run on a new thread
            .observeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e(TAG, "onSubscribe thread = " + Thread.currentThread().getName());
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "onNext thread = " + Thread.currentThread().getName() + ", String = " + s);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError thread = " + Thread.currentThread().getName() + ", Throwable = " + e);
                }

                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete thread = " + Thread.currentThread().getName());
                }
            });
E/RxJavaActivity: onSubscribe thread = main
E/RxJavaActivity: subscribe thread = main
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 1
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 2
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 3
E/RxJavaActivity: apply thread = RxCachedThreadScheduler-1, integer = 4
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 1
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 2
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 3
E/RxJavaActivity: onNext thread = RxNewThreadScheduler-1, String = 4
E/RxJavaActivity: onComplete thread = RxNewThreadScheduler-1
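These are the three steps of using RxJava:
1. Create the Observable. When the Observable is created, the callback receives an ObservableEmitter, the emitter used to send data (onNext) and notifications (onError/onComplete).
2. Create the Observer. The Observer has a callback method, onSubscribe, which receives a Disposable that can be used to cancel the subscription.
3. Establish the subscription: observable.subscribe(observer).
RxJava2 still keeps the simplified subscribe overloads, so you can pick whichever fits your needs (for example, subscribing with a Consumer).
RxJava2 also introduces a new class, Flowable, dedicated to handling backpressure, although backpressure itself is not a concept newly introduced in RxJava 2.x. Backpressure refers to the problems that arise when the producer emits faster than the consumer can process. A common Android example is click events: clicking too quickly can cause the click to be handled twice.
RxJava 2.x split backpressure handling out into its own type, named Flowable. As a result, Observable no longer supports backpressure.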
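To make the "producer faster than consumer" problem concrete, here is a minimal plain-JDK sketch. It does not use RxJava or Flowable; the class name BackpressureSketch and the method dropWhenFull are made up for this illustration. A producer pushes items into a bounded buffer while the consumer is stalled, and whatever does not fit is dropped. Dropping is only one possible way to cope with overflow; buffering more or signalling an error are others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Plain-JDK model of a fast producer feeding a stalled consumer through a bounded buffer. */
final class BackpressureSketch {

    /**
     * Offers the values 1..produced to a buffer of the given capacity while
     * nothing is consumed, and returns the values that had to be dropped.
     */
    static List<Integer> dropWhenFull(int produced, int capacity) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> dropped = new ArrayList<>();
        for (int i = 1; i <= produced; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!buffer.offer(i)) {
                dropped.add(i);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println(dropWhenFull(5, 3)); // prints [4, 5]
    }
}
```

With a capacity of 3 and 5 produced items, the first three fit into the buffer and the last two are lost, which is the kind of situation a backpressure-aware type is meant to handle explicitly.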
2 Operators in RxJava
2.1 Blocking operators
Other code can run only after the blocking Observable has finished executing.
- forEach - invoke a function on each item emitted by the Observable; block until the Observable completes
- first/firstOrDefault - block until the Observable emits an item, then return the first item emitted by the Observable or a default item if the Observable did not emit an item
- last/lastOrDefault - block until the Observable completes, then return the last item emitted by the Observable or a default item if there is no last item
- mostRecent - returns an iterable that always returns the item most recently emitted by the Observable
- next - returns an iterable that blocks until the Observable emits another item, then returns that item
- latest - returns an iterable that blocks until or unless the Observable emits an item that has not been returned by the iterable, then returns that item
- single - if the Observable completes after emitting a single item, return that item, otherwise throw an exception
- singleOrDefault - if the Observable completes after emitting a single item, return that item, otherwise return a default item
- toFuture - convert the Observable into a Future
- toIterable - convert the sequence emitted by the Observable into an Iterable
- getIterator - convert the sequence emitted by the Observable into an Iterator
Sample code:
    // blockingForEach runs the Consumer on the calling thread and returns only after completion
    Observable.just(1, 2, 3).observeOn(Schedulers.io()).blockingForEach(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, integer + " - " + Thread.currentThread().getName());
        }
    });
    Log.e(TAG, "done - " + Thread.currentThread().getName());
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 1 - main
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 2 - main
02-12 16:33:13.083 7539-7539/? E/RxJavaActivity: 3 - main
02-12 16:33:13.087 7539-7539/? E/RxJavaActivity: done - main
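Note: this part has changed in RxJava2:
toBlocking().y - inlined as blockingY() operators, except toFuture
In other words, in RxJava2 the operators above are called directly on the Observable with a blocking prefix (blockingXXX), for example Observable.just(...).blockingForEach.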
2.2 Combining operators
- startWith - emit a specified sequence of items before beginning to emit the items from the Observable
- merge - combine multiple Observables into one
- mergeDelayError - combine multiple Observables into one, allowing error-free Observables to continue before propagating errors
- zip - combine sets of items emitted by two or more Observables together via a specified function and emit items based on the results of this function
- combineLatest - when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
- join and groupJoin - combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable
  Items emitted by both the target Observable and the source Observable have a validity period. For example, the target emits an item (a) that is valid for 3s. 2s later the source emits an item (b); since 2s < 3s, the target's item is still valid, so the two can be combined into ab. After another 2s the source emits another item (c); by now 4s have passed in total and the target's item a has expired, so no combination is possible…
- switchOnNext - convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables
  Switch subscribes to an Observable that emits Observables. It observes one of those Observables at a time: the Observable returned by Switch unsubscribes from the previously emitted Observable and starts emitting items from the most recent one.
  Note: the unsubscription from the previous Observable happens when the source Observable emits a new Observable, not when that new Observable emits an item. This means that items emitted by the previous Observable after the new one has been produced are dropped (like the yellow circle in the marble diagram).
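The validity-window example given for join (item a valid for 3s, b arriving after 2s, c after 4s) can be replayed with a small plain-JDK simulation. This is only a sketch of the pairing rule, not RxJava's join implementation: the class JoinWindowSketch, its Emission type and the single validSeconds parameter are assumptions made for this illustration, and only the target side is given a window here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK simulation of the time-window pairing rule described for join. */
final class JoinWindowSketch {

    /** One emitted value and the time (in seconds) at which it was emitted. */
    static final class Emission {
        final String value;
        final long timeSeconds;

        Emission(String value, long timeSeconds) {
            this.value = value;
            this.timeSeconds = timeSeconds;
        }
    }

    /**
     * Pairs every source emission with each target emission that is still
     * valid, i.e. was emitted less than validSeconds before it.
     */
    static List<String> join(List<Emission> target, List<Emission> source, long validSeconds) {
        List<String> out = new ArrayList<>();
        for (Emission s : source) {
            for (Emission t : target) {
                long age = s.timeSeconds - t.timeSeconds;
                if (age >= 0 && age < validSeconds) {
                    out.add(t.value + s.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emission> target = Arrays.asList(new Emission("a", 0));
        List<Emission> source = Arrays.asList(new Emission("b", 2), new Emission("c", 4));
        System.out.println(join(target, source, 3)); // prints [ab]
    }
}
```

Only ab is produced: b arrives while a is still valid, whereas c arrives after a has expired.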
Example of a combining operator:
    Observable.just(1, 2, 3)
            .startWith(0) // 0 is emitted before 1, 2, 3
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept " + integer);
                }
            });
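Two of the combining operators in this section, zip and combineLatest, are easy to confuse. The following plain-JDK sketch models the difference as described in the list: zip pairs items by position, while combineLatest emits on every arrival once both sides have produced something. It does not call RxJava; the class name, the method names and the "A:"/"B:" event encoding are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-JDK model contrasting the pairing rules of zip and combineLatest. */
final class ZipVsCombineLatestSketch {

    /** zip: pairs the i-th item of a with the i-th item of b; stops at the shorter list. */
    static List<String> zip(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            out.add(a.get(i) + b.get(i));
        }
        return out;
    }

    /**
     * combineLatest: events lists arrivals in time order, each encoded as
     * "A:value" or "B:value". Once both sides have emitted at least once,
     * every new arrival is combined with the latest value of the other side.
     */
    static List<String> combineLatest(List<String> events) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (String event : events) {
            String value = event.substring(2);
            if (event.startsWith("A:")) {
                latestA = value;
            } else {
                latestB = value;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zip(Arrays.asList("1", "2"), Arrays.asList("x", "y")));    // prints [1x, 2y]
        System.out.println(combineLatest(Arrays.asList("A:1", "B:x", "A:2", "B:y"))); // prints [1x, 2x, 2y]
    }
}
```

For the same four arrivals, zip yields two pairs, whereas combineLatest yields three combinations because the arrival of 2 is immediately combined with the latest value x.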
2.3 Conditional and Boolean operators
Conditional operators
- amb - given two or more source Observables, emits all of the items from the first of these Observables to emit an item
- defaultIfEmpty - emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items
- skipUntil - discard items emitted by a source Observable until a second Observable emits an item, then emit the remainder of the source Observable's items
- skipWhile - discard items emitted by an Observable until a specified condition is false, then emit the remainder
- takeUntil - emits the items from the source Observable until a second Observable emits an item or issues a notification
- takeWhile and takeWhileWithIndex - emit items emitted by an Observable as long as a specified condition is true, then skip the remainder
Boolean operators
- all - determine whether all items emitted by an Observable meet some criteria
- contains - determine whether an Observable emits a particular item or not
- exists and isEmpty - determine whether an Observable emits any items or not
- sequenceEqual - test the equality of the sequences emitted by two Observables
2.4 Connectable Observable
A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
- ConnectableObservable.connect - instructs a Connectable Observable to begin emitting items
- Observable.publish - represents an Observable as a Connectable Observable
- Observable.replay - ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items
- ConnectableObservable.refCount - makes a Connectable Observable behave like an ordinary Observable
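The "subscribe first, emit only on connect()" behaviour can be modelled in a few lines of plain Java. The sketch below is not RxJava's ConnectableObservable; ConnectableSketch is a made-up class that only mirrors the idea of deferring emission until every intended subscriber has registered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-JDK model of a source that emits nothing until connect() is called. */
final class ConnectableSketch<T> {
    private final List<T> items;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ConnectableSketch(List<T> items) {
        this.items = items;
    }

    /** Registers a subscriber; nothing is emitted at this point. */
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Starts emission: each item is delivered to every subscriber registered so far. */
    void connect() {
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    public static void main(String[] args) {
        ConnectableSketch<Integer> source = new ConnectableSketch<>(Arrays.asList(1, 2));
        source.subscribe(i -> System.out.println("A got " + i));
        source.subscribe(i -> System.out.println("B got " + i));
        // Nothing has been printed yet; emission starts only here.
        source.connect();
    }
}
```

Because both subscribers are registered before connect() is called, both see the complete sequence.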
2.5 Error Handling Operators
There are a variety of operators that you can use to react to or recover from onError notifications from Observables. For example, you might:
1. swallow the error and switch over to a backup Observable to continue the sequence
2. swallow the error and emit a default item
3. swallow the error and immediately try to restart the failed Observable
4. swallow the error and try to restart the failed Observable after some back-off interval
The relevant operators are:
- onErrorResumeNext - instructs an Observable to emit a sequence of items if it encounters an error
- onErrorReturn - instructs an Observable to emit a particular item when it encounters an error
- onExceptionResumeNext - instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
- retry - if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
- retryWhen - if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
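The fourth strategy above, restarting after a back-off interval, is sketched here in plain Java rather than with retry or retryWhen. The helper retryWithBackoff, its parameters and the doubling delay are assumptions chosen for this illustration; an RxJava pipeline would express the same idea through its own operators.

```java
import java.util.function.Supplier;

/** Plain-JDK sketch of "retry after a growing back-off interval". */
final class RetryBackoffSketch {

    /**
     * Runs task until it succeeds or maxAttempts is reached. Between attempts
     * it sleeps initialDelayMillis, then twice that, and so on.
     */
    static <T> T retryWithBackoff(Supplier<T> task, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and propagate the last error
                }
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", ie);
            }
            delay *= 2; // back off longer before the next attempt
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints ok after 3 attempts
    }
}
```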
2.6 Filtering
- filter - filter items emitted by an Observable
- takeLast - only emit the last n items emitted by an Observable
- last - emit only the last item emitted by an Observable
- lastOrDefault - emit only the last item emitted by an Observable, or a default value if the source Observable is empty
- takeLastBuffer - emit the last n items emitted by an Observable, as a single list item
- skip - ignore the first n items emitted by an Observable
- skipLast - ignore the last n items emitted by an Observable
- take - emit only the first n items emitted by an Observable
- first and takeFirst - emit only the first item emitted by an Observable, or the first item that meets some condition
- firstOrDefault - emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty
- elementAt - emit item n emitted by the source Observable
- elementAtOrDefault - emit item n emitted by the source Observable, or a default item if the source Observable emits fewer than n items
- sample or throttleLast - emit the most recent items emitted by an Observable within periodic time intervals
- throttleFirst - emit the first items emitted by an Observable within periodic time intervals
- throttleWithTimeout or debounce - only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items
- timeout - emit items from a source Observable, but issue an exception if no item is emitted in a specified timespan
- distinct - suppress duplicate items emitted by the source Observable
- distinctUntilChanged - suppress duplicate consecutive items emitted by the source Observable
- ofType - emit only those items from the source Observable that are of a particular class
- ignoreElements - discard the items emitted by the source Observable and only pass through the error or completed notification
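The difference between distinct and distinctUntilChanged in the list above is whether duplicates are suppressed globally or only when consecutive. A plain-JDK sketch over lists (not RxJava; the class and method names merely echo the operator names for readability) makes this visible:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/** Plain-JDK model of the duplicate-suppression rules of distinct and distinctUntilChanged. */
final class DistinctSketch {

    /** Keeps only the first occurrence of each value, preserving order. */
    static <T> List<T> distinct(List<T> items) {
        return new ArrayList<>(new LinkedHashSet<>(items));
    }

    /** Drops a value only when it equals the value immediately before it. */
    static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (out.isEmpty() || !Objects.equals(out.get(out.size() - 1), item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 1, 2, 1, 3, 3);
        System.out.println(distinct(input));             // prints [1, 2, 3]
        System.out.println(distinctUntilChanged(input)); // prints [1, 2, 1, 3]
    }
}
```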
2.7 Aggregate operators
- concat - concatenate two or more Observables sequentially
- count and countLong - counts the number of items emitted by an Observable and emits this count
- reduce - apply a function to each emitted item, sequentially, and emit only the final accumulated value
- collect - collect items emitted by the source Observable into a single mutable data structure and return an Observable that emits this structure
- toList - collect all items from an Observable and emit them as a single List
- toSortedList - collect all items from an Observable and emit them as a single, sorted List
- toMap - convert the sequence of items emitted by an Observable into a map keyed by a specified key function
- toMultiMap - convert the sequence of items emitted by an Observable into a map, keyed by a specified key function, whose values are ArrayLists of items
2.8 Observable Creation
- just - convert an object or several objects into an Observable that emits that object or those objects
- from - convert an Iterable, a Future, or an Array into an Observable
- create - advanced use only! create an Observable from scratch by means of a function, consider fromEmitter instead
- fromEmitter - create safe, backpressure-enabled, unsubscription-supporting Observable via a function and push events.
- defer - do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
- range - create an Observable that emits a range of sequential integers
- interval - create an Observable that emits a sequence of integers spaced by a given time interval
- timer - create an Observable that emits a single item after a given delay
- empty - create an Observable that emits nothing and then completes
- error - create an Observable that emits nothing and then signals an error
- never - create an Observable that emits nothing at all
2.9 Transformational
- map - transform the items emitted by an Observable by applying a function to each of them
- flatMap, concatMap, and flatMapIterable - transform the items emitted by an Observable into Observables (or Iterables), then flatten this into a single Observable
- switchMap - transform the items emitted by an Observable into Observables, and mirror those items emitted by the most-recently transformed Observable
- scan - apply a function to each item emitted by an Observable, sequentially, and emit each successive value
- groupBy - divide an Observable into a set of Observables that emit groups of items from the original Observable, organized by key
- buffer - periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
- window - periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
- cast - cast all items from the source Observable into a particular type before reemitting them
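scan in this section and reduce in section 2.7 apply the same kind of accumulator function; they differ in that scan emits every intermediate value while reduce emits only the final one. The plain-JDK sketch below models this on lists. It is an illustration of the semantics, not RxJava code, and ScanVsReduceSketch with its two static methods is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-JDK model of the difference between scan and reduce. */
final class ScanVsReduceSketch {

    /** scan: the first item is passed through, then every accumulated value is emitted. */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    /** reduce: only the final accumulated value; an empty input gives an empty result. */
    static <T> List<T> reduce(List<T> items, BinaryOperator<T> accumulator) {
        List<T> all = scan(items, accumulator);
        return all.isEmpty() ? all : all.subList(all.size() - 1, all.size());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        System.out.println(scan(input, Integer::sum));   // prints [1, 3, 6, 10]
        System.out.println(reduce(input, Integer::sum)); // prints [10]
    }
}
```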
2.10 Utility Operators
- materialize - convert an Observable into a list of Notifications
- dematerialize - convert a materialized Observable back into its non-materialized form
- timestamp - attach a timestamp to every item emitted by an Observable
- serialize - force an Observable to make serialized calls and to be well-behaved
- cache - remember the sequence of items emitted by the Observable and emit the same sequence to future Subscribers
- observeOn - specify on which Scheduler a Subscriber should observe the Observable
- subscribeOn - specify which Scheduler an Observable should use when its subscription is invoked
- doOnEach - register an action to take whenever an Observable emits an item
- doOnCompleted - register an action to take when an Observable completes successfully
- doOnError - register an action to take when an Observable completes with an error
- doOnTerminate - register an action to take when an Observable completes, either successfully or with an error
- doOnSubscribe - register an action to take when an observer subscribes to an Observable
- doOnUnsubscribe - register an action to take when an observer unsubscribes from an Observable
- finallyDo - register an action to take when an Observable completes
- delay - shift the emissions from an Observable forward in time by a specified amount
- delaySubscription - hold a Subscriber's subscription request for a specified amount of time before passing it on to the source Observable
- timeInterval - emit the time lapsed between consecutive emissions of a source Observable
- using - create a disposable resource that has the same lifespan as an Observable
- single - if the Observable completes after emitting a single item, return that item, otherwise throw an exception
- singleOrDefault - if the Observable completes after emitting a single item, return that item, otherwise return a default item
- repeat - create an Observable that emits a particular item or sequence of items repeatedly
- repeatWhen - create an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable
3 Operator decision tree
This tree can help you find the ReactiveX Observable operator you’re looking for.
I want to create a new Observable
…that emits a particular item: Just
…that was returned from a function called at subscribe-time: Start
…that was returned from an Action, Callable, Runnable, or something of that sort, called at subscribe-time: From
…after a specified delay: Timer
…that pulls its emissions from a particular Array, Iterable, or something like that: From
…by retrieving it from a Future: Start
…that obtains its sequence from a Future: From
…that emits a sequence of items repeatedly: Repeat
…from scratch, with custom logic: Create
…for each observer that subscribes: Defer
…that emits a sequence of integers: Range
…at particular intervals of time: Interval
…after a specified delay: Timer
…that completes without emitting items: Empty
…that does nothing at all: Never
I want to create an Observable by combining other Observables
…and emitting all of the items from all of the Observables in whatever order they are received: Merge
…and emitting all of the items from all of the Observables, one Observable at a time: Concat
…by combining the items from two or more Observables sequentially to come up with new items to emit
…whenever each of the Observables has emitted a new item: Zip
…whenever any of the Observables has emitted a new item: CombineLatest
…whenever an item is emitted by one Observable in a window defined by an item emitted by another: Join
…by means of Pattern and Plan intermediaries: And/Then/When
…and emitting the items from only the most-recently emitted of those Observables: Switch
I want to emit the items from an Observable after transforming them
…one at a time with a function: Map
…by emitting all of the items emitted by corresponding Observables: FlatMap
…one Observable at a time, in the order they are emitted: ConcatMap
…based on all of the items that preceded them: Scan
…by attaching a timestamp to them: Timestamp
…into an indicator of the amount of time that lapsed before the emission of the item: TimeInterval
I want to shift the items emitted by an Observable forward in time before reemitting them: Delay
I want to transform items and notifications from an Observable into items and reemit them
…by wrapping them in Notification objects: Materialize
…which I can then unwrap again with: Dematerialize
I want to ignore all items emitted by an Observable and only pass along its completed/error notification: IgnoreElements
I want to mirror an Observable but prefix items to its sequence: StartWith
…only if its sequence is empty: DefaultIfEmpty
I want to collect items from an Observable and reemit them as buffers of items: Buffer
…containing only the last items emitted: TakeLastBuffer
I want to split one Observable into multiple Observables: Window
…so that similar items end up on the same Observable: GroupBy
I want to retrieve a particular item emitted by an Observable:
…the last item emitted before it completed: Last
…the sole item it emitted: Single
…the first item it emitted: First
I want to reemit only certain items from an Observable
…by filtering out those that do not match some predicate: Filter
…that is, only the first item: First
…that is, only the first items: Take
…that is, only the last item: Last
…that is, only item n: ElementAt
…that is, only those items after the first items
…that is, after the first n items: Skip
…that is, until one of those items matches a predicate: SkipWhile
…that is, after an initial period of time: Skip
…that is, after a second Observable emits an item: SkipUntil
…that is, those items except the last items
…that is, except the last n items: SkipLast
…that is, until one of those items matches a predicate: TakeWhile
…that is, except items emitted during a period of time before the source completes: SkipLast
…that is, except items emitted after a second Observable emits an item: TakeUntil
…by sampling the Observable periodically: Sample
…by only emitting items that are not followed by other items within some duration: Debounce
…by suppressing items that are duplicates of already-emitted items: Distinct
…if they immediately follow the item they are duplicates of: DistinctUntilChanged
…by delaying my subscription to it for some time after it begins emitting items: DelaySubscription
I want to reemit items from an Observable only on condition that it was the first of a collection of Observables to emit an item: Amb
I want to evaluate the entire sequence of items emitted by an Observable
…and emit a single boolean indicating if all of the items pass some test: All
…and emit a single boolean indicating if the Observable emitted any item (that passes some test): Contains
…and emit a single boolean indicating if the Observable emitted no items: IsEmpty
…and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable: SequenceEqual
…and emit the average of all of their values: Average
…and emit the sum of all of their values: Sum
…and emit a number indicating how many items were in the sequence: Count
…and emit the item with the maximum value: Max
…and emit the item with the minimum value: Min
…by applying an aggregation function to each item in turn and emitting the result: Scan
I want to convert the entire sequence of items emitted by an Observable into some other data structure: To
I want an operator to operate on a particular Scheduler: SubscribeOn
…when it notifies observers: ObserveOn
I want an Observable to invoke a particular action when certain events occur: Do
I want an Observable that will notify observers of an error: Throw
…if a specified period of time elapses without it emitting an item: Timeout
I want an Observable to recover gracefully
…from a timeout by switching to a backup Observable: Timeout
…from an upstream error notification: Catch
…by attempting to resubscribe to the upstream Observable: Retry
I want to create a resource that has the same lifespan as the Observable: Using
I want to subscribe to an Observable and receive a Future that blocks until the Observable completes: Start
I want an Observable that does not start emitting items to subscribers until asked: Publish
…and then only emits the last item in its sequence: PublishLast
…and then emits the complete sequence, even to those who subscribe after the sequence has begun: Replay
…but I want it to go away once all of its subscribers unsubscribe: RefCount
…and then I want to ask it to start: Connect