在ReactiveX中,一個觀察者(Observer)訂閱一個可觀察對象(Observable)。觀察者對Observable發(fā)射的數(shù)據(jù)或數(shù)據(jù)序列作出響應。這種模式可以極大地簡化并發(fā)操作,因為它創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,在未來某個時刻響應Observable的通知,不需要阻塞等待Observable發(fā)射數(shù)據(jù)。
這篇文章會解釋什么是響應式編程模式(reactive pattern),以及什么是可觀察對象(Observables)和觀察者(observers),其它幾篇文章會展示如何用操作符組合和改變Observable的行為。
http://wiki.jikexueyuan.com/project/rx-docs/images/legend.png" alt="Observable" />
在很多軟件編程任務中,或多或少你都會期望你寫的代碼能按照編寫的順序,一次一個的順序執(zhí)行和完成。但是在ReactiveX中,很多指令可能是并行執(zhí)行的,之后他們的執(zhí)行結(jié)果才會被觀察者捕獲,順序是不確定的。為達到這個目的,你定義一種獲取和變換數(shù)據(jù)的機制,而不是調(diào)用一個方法。在這種機制下,存在一個可觀察對象(Observable),觀察者(Observer)訂閱(Subscribe)它,當數(shù)據(jù)就緒時,之前定義的機制就會分發(fā)數(shù)據(jù)給一直處于等待狀態(tài)的觀察者哨兵。
這種方法的優(yōu)點是,如果你有大量的任務要處理,它們互相之間沒有依賴關系。你可以同時開始執(zhí)行它們,不用等待一個完成再開始下一個(用這種方式,你的整個任務隊列能耗費的最長時間,不會超過任務里最耗時的那個)。
有很多術語可用于描述這種異步編程和設計模式,在在本文里我們使用這些術語:一個觀察者訂閱一個可觀察對象 (An observer subscribes to an Observable)。通過調(diào)用觀察者的方法,Observable發(fā)射數(shù)據(jù)或通知給它的觀察者。
在其它的文檔和場景里,有時我們也將Observer叫做Subscriber、Watcher、Reactor。這個模型通常被稱作Reactor模式。
本文使用類似于Groovy的偽代碼舉例,但是ReactiveX有多種語言的實現(xiàn)。
普通的方法調(diào)用(不是某種異步方法,也不是Rx中的并行調(diào)用),流程通常是這樣的:
用代碼描述就是:
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
在異步模型中流程更像這樣的:
用代碼描述就是:
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
Subscribe方法用于將觀察者連接到Observable,你的觀察者需要實現(xiàn)以下方法的一個子集:
onNext(T item)
Observable調(diào)用這個方法發(fā)射數(shù)據(jù),方法的參數(shù)就是Observable發(fā)射的數(shù)據(jù),這個方法可能會被調(diào)用多次,取決于你的實現(xiàn)。
onError(Exception ex)
當Observable遇到錯誤或者無法返回期望的數(shù)據(jù)時會調(diào)用這個方法,這個調(diào)用會終止Observable,后續(xù)不會再調(diào)用onNext和onCompleted,onError方法的參數(shù)是拋出的異常。
onComplete
正常終止,如果沒有遇到錯誤,Observable在最后一次調(diào)用onNext之后調(diào)用此方法。
根據(jù)Observable協(xié)議的定義,onNext可能會被調(diào)用零次或者很多次,最后會有一次onCompleted或onError調(diào)用(不會同時),傳遞數(shù)據(jù)給onNext通常被稱作發(fā)射,onCompleted和onError被稱作通知。
下面是一個更完整的例子:
def myOnNext = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
在一些ReactiveX實現(xiàn)中,有一個特殊的觀察者接口Subscriber,它有一個unsubscribe方法。調(diào)用這個方法表示你不關心當前訂閱的Observable了,因此Observable可以選擇停止發(fā)射新的數(shù)據(jù)項(如果沒有其它觀察者訂閱)。
取消訂閱的結(jié)果會傳遞給這個Observable的操作符鏈,而且會導致這個鏈條上的每個環(huán)節(jié)都停止發(fā)射數(shù)據(jù)項。這些并不保證會立即發(fā)生,然而,對一個Observable來說,即使沒有觀察者了,它也可以在一個while循環(huán)中繼續(xù)生成并嘗試發(fā)射數(shù)據(jù)項。
ReactiveX的每種特定語言的實現(xiàn)都有自己的命名偏好,雖然不同的實現(xiàn)之間有很多共同點,但并不存在一個統(tǒng)一的命名標準。
而且,在某些場景中,一些名字有不同的隱含意義,或者在某些語言看來比較怪異。
例如,有一個onEvent命名模式(onNext, onCompleted, onError),在一些場景中,這些名字可能意味著事件處理器已經(jīng)注冊。然而在ReactiveX里,他們是事件處理器的名字。
Observable什么時候開始發(fā)射數(shù)據(jù)序列?這取決于Observable的實現(xiàn),一個"熱"的Observable可能一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個觀察者可以確保會收到整個數(shù)據(jù)序列。
在一些ReactiveX實現(xiàn)里,還存在一種被稱作Connectable的Observable,不管有沒有觀察者訂閱它,這種Observable都不會開始發(fā)射數(shù)據(jù),除非Connect方法被調(diào)用。
對于ReactiveX來說,Observable和Observer僅僅是個開始,它們本身不過是標準觀察者模式的一些輕量級擴展,目的是為了更好的處理事件序列。
ReactiveX真正強大的地方在于它的操作符,操作符讓你可以變換、組合、操縱和處理Observable發(fā)射的數(shù)據(jù)。
Rx的操作符讓你可以用聲明式的風格組合異步操作序列,它擁有回調(diào)的所有效率優(yōu)勢,同時又避免了典型的異步系統(tǒng)中嵌套回調(diào)的缺點。
下面是常用的操作符列表:
這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實現(xiàn)或可選的模塊。
在RxJava中,一個實現(xiàn)了_Observer_接口的對象可以訂閱(subscribe)一個Observable 類的實例。訂閱者(subscriber)對Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應。這種模式簡化了并發(fā)操作,因為它不需要阻塞等待Observable發(fā)射數(shù)據(jù),而是創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,哨兵在未來某個時刻響應Observable的通知。