定期收集Observable的數(shù)據(jù)放進一個數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個值。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer.png" alt="buffer" />
Buffer
操作符將一個Observable變換為另一個,原來的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。Buffer
操作符在很多語言特定的實現(xiàn)中有很多種變體,它們在如何緩存這個問題上存在區(qū)別。
注意:如果原來的Observable發(fā)射了一個onError
通知,Buffer
會立即傳遞這個通知,而不是首先發(fā)射緩存的數(shù)據(jù),即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Window
操作符與Buffer
類似,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進單獨的Observable,而不是放進一個數(shù)據(jù)結構。
在RxJava中有許多Buffer
的變體:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer3.png" alt="buffer3" />
buffer(count)
以列表(List)的形式發(fā)射非重疊的緩存,每一個緩存至多包含來自原始Observable的count項數(shù)據(jù)(最后發(fā)射的列表數(shù)據(jù)可能少于count項)
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer4.png" alt="buffer4" />
buffer(count,?skip)
從原始Observable的第一項數(shù)據(jù)開始創(chuàng)建新的緩存,此后每當收到skip
項數(shù)據(jù),用count
項數(shù)據(jù)填充緩存:開頭的一項和后續(xù)的count-1
項,它以列表(List)的形式發(fā)射緩存,取決于count
和skip
的值,這些緩存可能會有重疊部分(比如skip < count時),也可能會有間隙(比如skip > count時)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer1.png" alt="buffer1" />
當它訂閱原來的Observable時,buffer(bufferClosingSelector)
開始將數(shù)據(jù)收集到一個List
,然后它調用bufferClosingSelector
生成第二個Observable,當?shù)诙€Observable發(fā)射一個TClosing
時,buffer
發(fā)射當前的List
,然后重復這個過程:開始組裝一個新的List
,然后調用bufferClosingSelector
創(chuàng)建一個新的Observable并監(jiān)視它。它會一直這樣做直到原來的Observable執(zhí)行完成。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer8.png" alt="buffer8" />
buffer(boundary)
監(jiān)視一個名叫boundary
的Observable,每當這個Observable發(fā)射了一個值,它就創(chuàng)建一個新的List
開始收集來自原始Observable的數(shù)據(jù)并發(fā)射原來的List
。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer2.png" alt="buffer2" />
buffer(bufferOpenings,?bufferClosingSelector)
監(jiān)視這個叫bufferOpenings
的Observable(它發(fā)射BufferOpening
對象),每當bufferOpenings
發(fā)射了一個數(shù)據(jù)時,它就創(chuàng)建一個新的List
開始收集原始Observable的數(shù)據(jù),并將bufferOpenings
傳遞給closingSelector
函數(shù)。這個函數(shù)返回一個Observable。buffer
監(jiān)視這個Observable,當它檢測到一個來自這個Observable的數(shù)據(jù)時,就關閉List
并且發(fā)射它自己的數(shù)據(jù)(之前的那個List)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer5.png" alt="buffer5" />
buffer(timespan,?unit)
定期以List
的形式發(fā)射新的數(shù)據(jù),每個時間段,收集來自原始Observable的數(shù)據(jù)(從前面一個數(shù)據(jù)包裹之后,或者如果是第一個數(shù)據(jù)包裹,從有觀察者訂閱原來的Observale之后開始)。還有另一個版本的buffer
接受一個Scheduler
參數(shù),默認情況下會使用computation
調度器。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer6.png" alt="buffer6" />
每當收到來自原始Observable的count項數(shù)據(jù),或者每過了一段指定的時間后,buffer(timespan,?unit,?count)
就以List
的形式發(fā)射這期間的數(shù)據(jù),即使數(shù)據(jù)項少于count項。還有另一個版本的buffer
接受一個Scheduler
參數(shù),默認情況下會使用computation
調度器。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/buffer7.png" alt="buffer7" />
buffer(timespan,?timeshift,?unit)
在每一個timeshift
時期內(nèi)都創(chuàng)建一個新的List
,然后用原始Observable發(fā)射的每一項數(shù)據(jù)填充這個列表(在把這個List
當做自己的數(shù)據(jù)發(fā)射前,從創(chuàng)建時開始,直到過了timespan
這么長的時間)。如果timespan
長于timeshift
,它發(fā)射的數(shù)據(jù)包將會重疊,因此可能包含重復的數(shù)據(jù)項。
還有另一個版本的buffer
接受一個Scheduler
參數(shù),默認情況下會使用computation
調度器。
你可以使用Buffer
操作符實現(xiàn)反壓backpressure
(意思是,處理這樣一個Observable:它產(chǎn)生數(shù)據(jù)的速度可能比它的觀察者消費數(shù)據(jù)的速度快)。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer2.png" alt="bp.buffer2" />
Buffer操作符可以將大量的數(shù)據(jù)序列縮減為較少的數(shù)據(jù)緩存序列,讓它們更容易處理。例如,你可以按固定的時間間隔,定期關閉和發(fā)射來自一個爆發(fā)性Observable的數(shù)據(jù)緩存。這相當于一個緩沖區(qū)。
示例代碼
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/bp.buffer1.png" alt="bp.buffer1" />
或者,如果你想更進一步,可以在爆發(fā)期將數(shù)據(jù)收集到緩存,然后在爆發(fā)期終止時發(fā)射這些數(shù)據(jù),使用 Debounce
操作符給buffer
操作符發(fā)射一個緩存關閉指示器(buffer closing indicator
)可以做到這一點。
代碼示例:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);