ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,最初是LINQ的一個擴展,由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)的團隊開發(fā),在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流,Rx庫支持.NET、JavaScript和C++,Rx近幾年越來越流行了,現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責(zé)維護,比較流行的有RxJava/RxJS/Rx.NET,社區(qū)網(wǎng)站是 reactivex.io。
微軟給的定義是,Rx是一個函數(shù)庫,讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序,使用Rx,開發(fā)者可以用Observables表示異步數(shù)據(jù)流,用LINQ操作符查詢異步數(shù)據(jù)流, 用Schedulers參數(shù)化異步數(shù)據(jù)流的并發(fā)處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。
ReactiveX.io給的定義是,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口,ReactiveX結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程的精華。
很多公司都在使用ReactiveX,例如Microsoft、Netflix、Github、Trello、SoundCloud。
ReactiveX不僅僅是一個編程接口,它是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言。
Rx擴展了觀察者模式用于支持數(shù)據(jù)和事件序列,添加了一些操作符,它讓你可以聲明式的組合這些序列,而無需關(guān)注底層的實現(xiàn):如線程、同步、線程安全、并發(fā)數(shù)據(jù)結(jié)構(gòu)和非阻塞IO。
Observable通過使用最佳的方式訪問異步數(shù)據(jù)序列填補了這個間隙
單個數(shù)據(jù) | 多個數(shù)據(jù) | |
---|---|---|
同步 | T getData() |
Iterable<T> getData() |
異步 | Future<T> getData() |
Observable<T> getData() |
Rx的Observable模型讓你可以像使用集合數(shù)據(jù)一樣操作異步事件流,對異步事件流使用各種簡單、可組合的操作。
對于單層的異步操作來說,Java中Future對象的處理方式是非常簡單有效的,但是一旦涉及到嵌套,它們就開始變得異常繁瑣和復(fù)雜。使用Future很難很好的組合帶條件的異步執(zhí)行流程(考慮到運行時各種潛在的問題,甚至可以說是不可能的),當(dāng)然,要想實現(xiàn)還是可以做到的,但是非常困難,或許你可以用Future.get()
,但這樣做,異步執(zhí)行的優(yōu)勢就完全沒有了。從另一方面說,Rx的Observable一開始就是為組合異步數(shù)據(jù)流準備的。
Rx的Observable不僅支持處理單獨的標量值(就像Future可以做的),也支持數(shù)據(jù)序列,甚至是無窮的數(shù)據(jù)流。Observable
是一個抽象概念,適用于任何場景。Observable擁有它的近親Iterable的全部優(yōu)雅與靈活。
Observable是異步的雙向push,Iterable是同步的單向pull,對比:
事件 | Iterable(pull) | Observable(push) |
---|---|---|
獲取數(shù)據(jù) | T next() |
onNext(T) |
異常處理 | throws Exception |
onError(Exception) |
任務(wù)完成 | !hasNext() |
onCompleted() |
Rx對于對于并發(fā)性或異步性沒有任何特殊的偏好,Observable可以用任何方式實現(xiàn),線程池、事件循環(huán)、非阻塞IO、Actor模式,任何滿足你的需求的,你擅長或偏好的方式都可以。無論你選擇怎樣實現(xiàn)它,無論底層實現(xiàn)是阻塞的還是非阻塞的,客戶端代碼將所有與Observable的交互都當(dāng)做是異步的。
Observable是如何實現(xiàn)的?
public Observable<data> getData();
從Observer的視角看,這些都無所謂,重要的是:使用Rx,你可以改變你的觀念,你可以在完全不影響Observable程序庫使用者的情況下,徹底的改變Observable的底層實現(xiàn)。
回調(diào)在不阻塞任何事情的情況下,解決了Future.get()
過早阻塞的問題。由于響應(yīng)結(jié)果一旦就緒Callback就會被調(diào)用,它們天生就是高效率的。不過,就像使用Future一樣,對于單層的異步執(zhí)行來說,回調(diào)很容易使用,對于嵌套的異步組合,它們顯得非常笨拙。
Rx在大量的編程語言中都有實現(xiàn),并尊重實現(xiàn)語言的風(fēng)格,而且更多的實現(xiàn)正在飛速增加。
Rx提供了一系列的操作符,你可以使用它們來過濾(filter)、選擇(select)、變換(transform)、結(jié)合(combine)和組合(compose)多個Observable,這些操作符讓執(zhí)行和復(fù)合變得非常高效。
你可以把Observable當(dāng)做Iterable的推送方式的等價物,使用Iterable,消費者從生產(chǎn)者那拉取數(shù)據(jù),線程阻塞直至數(shù)據(jù)準備好。使用Observable,在數(shù)據(jù)準備好時,生產(chǎn)者將數(shù)據(jù)推送給消費者。數(shù)據(jù)可以同步或異步的到達,這種方式更靈活。
下面的例子展示了相似的高階函數(shù)在Iterable和Observable上的應(yīng)用
// Iterable
getDataFromLocalMemory()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.forEach({ println "next => " + it })
// Observable
getDataFromNetwork()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.subscribe({ println "onNext => " + it })
Observable類型給GOF的觀察者模式添加了兩種缺少的語義,這樣就和Iterable類型中可用的操作一致了:
onCompleted
方法)onError
方法)有了這兩種功能,Rx就能使Observable與Iterable保持一致了,唯一的不同是數(shù)據(jù)流的方向。任何對Iterable的操作,你都可以對Observable使用。
這里給出一些名詞的翻譯