Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現(xiàn)中(如RxJava),它同時充當(dāng)了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。
由于一個Subject訂閱一個Observable,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說,它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果,Subject可以把原來那個"冷"的Observable變成"熱"的。
針對不同的場景一共有四種類型的Subject。他們并不是在所有的實現(xiàn)中全部都存在,而且一些實現(xiàn)使用其它的命名約定(例如,在RxScala中Subject被稱作PublishSubject)。
一個AsyncSubject只在原始Observable完成后,發(fā)射來自原始Observable的最后一個值。(如果原始Observable沒有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會把這最后一個值發(fā)射給任何后續(xù)的觀察者。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.png" alt="" />
然而,如果原始的Observable因為發(fā)生了錯誤而終止,AsyncSubject將不會發(fā)射任何數(shù)據(jù),只是簡單的向前傳遞這個錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.e.png" alt="" />
當(dāng)觀察者訂閱BehaviorSubject時,它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時還沒有收到任何數(shù)據(jù),它會發(fā)射一個默認值),然后繼續(xù)發(fā)射其它任何來自原始Observable的數(shù)據(jù)。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.png" alt="" />
然而,如果原始的Observable因為發(fā)生了一個錯誤而終止,BehaviorSubject將不會發(fā)射任何數(shù)據(jù),只是簡單的向前傳遞這個錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.e.png" alt="" />
PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者。需要注意的是,PublishSubject可能會一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù)(除非你可以阻止它發(fā)生),因此這里有一個風(fēng)險:在Subject被創(chuàng)建后到有觀察者訂閱它之前這個時間段內(nèi),一個或多個數(shù)據(jù)可能會丟失。如果要確保來自原始Observable的所有數(shù)據(jù)都被分發(fā),你需要這樣做:或者使用Create創(chuàng)建那個Observable以便手動給它引入"冷"Observable的行為(當(dāng)所有觀察者都已經(jīng)訂閱時才開始發(fā)射數(shù)據(jù)),或者改用ReplaySubject。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.png" alt="" />
如果原始的Observable因為發(fā)生了一個錯誤而終止,PublishSubject將不會發(fā)射任何數(shù)據(jù),只是簡單的向前傳遞這個錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.e.png" alt="" />
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時訂閱的。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間后會丟棄舊的數(shù)據(jù)(原始Observable發(fā)射的)。
如果你把ReplaySubject當(dāng)作一個觀察者使用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(非順序)調(diào)用,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。
http://wiki.jikexueyuan.com/project/rx-docs/images/S.ReplaySubject.png" alt="" />
假設(shè)你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你可以調(diào)用它的asObservable方法,這個方法返回一個Observable。具體使用方法可以參考Javadoc文檔。
如果你把 Subject
當(dāng)作一個 Subscriber
使用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(非順序)調(diào)用,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。
要避免此類問題,你可以將 Subject
轉(zhuǎn)換為一個 SerializedSubject
,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );