如果你想給Observable操作符鏈添加多線程功能,你可以指定操作符(或者特定的Observable)在特定的調(diào)度器(Scheduler)上執(zhí)行。
某些ReactiveX的Observable操作符有一些變體,它們可以接受一個Scheduler參數(shù)。這個參數(shù)指定操作符將它們的部分或全部任務(wù)放在一個特定的調(diào)度器上執(zhí)行。
使用ObserveOn和SubscribeOn操作符,你可以讓Observable在一個特定的調(diào)度器上執(zhí)行,ObserveOn指示一個Observable在一個特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法,SubscribeOn更進(jìn)一步,它指示Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行。
下表展示了RxJava中可用的調(diào)度器種類:
調(diào)度器類型 | 效果 |
---|---|
Schedulers.computation(?) | 用于計算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量 |
Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
Schedulers.immediate(?) | 在當(dāng)前線程立即開始執(zhí)行任務(wù) |
Schedulers.io(?) | 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計算任務(wù),請使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器 |
Schedulers.newThread(?) | 為每個任務(wù)創(chuàng)建一個新線程 |
Schedulers.trampoline(?) | 當(dāng)其它排隊的任務(wù)完成后,在當(dāng)前線程排隊開始執(zhí)行 |
在RxJava中,某些Observable操作符的變體允許你設(shè)置用于操作執(zhí)行的調(diào)度器,其它的則不在任何特定的調(diào)度器上執(zhí)行,或者在一個指定的默認(rèn)調(diào)度器上執(zhí)行。下面的表格個列出了一些操作符的默認(rèn)調(diào)度器:
操作符 | 調(diào)度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan,?count) | computation |
buffer(timespan,?timeshift) | computation |
debounce(timeout,?unit) | computation |
delay(delay,?unit) | computation |
delaySubscription(delay,?unit) | computation |
interval | computation |
repeat | trampoline |
replay(time,?unit) | computation |
replay(buffersize,?time,?unit) | computation |
replay(selector,?time,?unit) | computation |
replay(selector,?buffersize,?time,?unit) | computation |
retry | trampoline |
sample(period,?unit) | computation |
skip(time,?unit) | computation |
skipLast(time,?unit) | computation |
take(time,?unit) | computation |
takeLast(time,?unit) | computation |
takeLast(count,?time,?unit) | computation |
takeLastBuffer(time,?unit) | computation |
takeLastBuffer(count,?time,?unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector,?timeoutSelector) | immediate |
timeout(timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit) | computation |
timeout(firstTimeoutSelector,?timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit,?other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan,?count) | computation |
window(timespan,?timeshift) | computation |
除了將這些調(diào)度器傳遞給RxJava的Observable操作符,你也可以用它們調(diào)度你自己的任務(wù)。下面的示例展示了Scheduler.Worker的用法:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
要調(diào)度遞歸的方法調(diào)用,你可以使用schedule,然后再用schedule(this),示例:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
Worker類的對象實現(xiàn)了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在訂閱取消時停止任務(wù),或者從正在調(diào)度的任務(wù)內(nèi)部取消訂閱,示例:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker同時是Subscription,因此你可以(通常也應(yīng)該)調(diào)用它的unsubscribe方法通知可以掛起任務(wù)和釋放資源了。
你可以使用schedule(action,delayTime,timeUnit)在指定的調(diào)度器上延時執(zhí)行你的任務(wù),下面例子中的任務(wù)將在500毫秒之后開始執(zhí)行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另一個版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法讓你可以安排一個定期執(zhí)行的任務(wù),下面例子的任務(wù)將在500毫秒之后執(zhí)行,然后每250毫秒執(zhí)行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler讓你可以對調(diào)度器的時鐘表現(xiàn)進(jìn)行手動微調(diào)。這對依賴精確時間安排的任務(wù)的測試很有用處。這個調(diào)度器有三個額外的方法: