你可以在這里找到JVM平臺(tái)幾種語(yǔ)言的例子 language adaptor:
下面的示例從一個(gè)字符串列表創(chuàng)建一個(gè)Observable,然后使用一個(gè)方法訂閱這個(gè)Observable。
public static void hello(String... names) {
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Hello " + s + "!");
}
});
}
hello("Ben", "George");
Hello Ben!
Hello George!
def hello(String[] names) {
Observable.from(names).subscribe { println "Hello ${it}!" }
}
hello("Ben", "George")
Hello Ben!
Hello George!
(defn hello
[&rest]
(-> (Observable/from &rest)
(.subscribe #(println (str "Hello " % "!")))))
(hello ["Ben" "George"])
Hello Ben!
Hello George!
import rx.lang.scala.Observable
def hello(names: String*) {
Observable.from(names) subscribe { n =>
println(s"Hello $n!")
}
}
hello("Ben", "George")
Hello Ben!
Hello George!
要使用RxJava,首先你需要?jiǎng)?chuàng)建Observable(它們發(fā)射數(shù)據(jù)序列),使用Observable操作符變換那些Observables,獲取嚴(yán)格符合你要求的數(shù)據(jù),然后觀察并處理對(duì)這些數(shù)據(jù)序列(通過(guò)實(shí)現(xiàn)觀察者或訂閱者,然后訂閱變換后的Observable)。
要?jiǎng)?chuàng)建Observable,你可以手動(dòng)實(shí)現(xiàn)Observable的行為,也可以傳遞一個(gè)函數(shù)給create(?)
,還可以使用這些 創(chuàng)建操作符 將一個(gè)已有的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable。
你可以使用just(?)
和from(?)
方法將對(duì)象,列表,對(duì)象屬性轉(zhuǎn)換為發(fā)射那些對(duì)象的Observable:
Observable<String> o = Observable.from("a", "b", "c");
def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);
Observable<String> o = Observable.just("one object");
轉(zhuǎn)換后的Observable每發(fā)射一項(xiàng)數(shù)據(jù),會(huì)同步地調(diào)用任何訂閱者的onNext()
方法,最后會(huì)調(diào)用訂閱者的onCompleted()
方法。
create(?)
創(chuàng)建一個(gè)Observable使用 create(?)
方法,你可以創(chuàng)建你自己的Observable,可以實(shí)現(xiàn)異步I/O,計(jì)算操作,甚至是無(wú)限的數(shù)據(jù)流。
/**
* 這個(gè)例子展示了一個(gè)自定義的Observable,當(dāng)有訂閱時(shí)他會(huì)阻塞當(dāng)前線程。
*/
def customObservableBlocking() {
return Observable.create { aSubscriber ->
50.times { i ->
if (!aSubscriber.unsubscribed) {
aSubscriber.onNext("value_${i}")
}
}
// after sending all values we complete the sequence
if (!aSubscriber.unsubscribed) {
aSubscriber.onCompleted()
}
}
}
// To see output:
customObservableBlocking().subscribe { println(it) }
The following example uses Groovy to create an Observable that emits 75 strings.
下面的例子使用Groovy
創(chuàng)建了一個(gè)發(fā)射75個(gè)字符串的Observable。
為了讓它更清楚,例子很詳細(xì),使用靜態(tài)類型和匿名內(nèi)部類Func1
:
/**
* This example shows a custom Observable that does not block
* when subscribed to as it spawns a separate thread.
*/
def customObservableNonBlocking() {
return Observable.create({ subscriber ->
Thread.start {
for (i in 0..<75) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext("value_${i}")
}
// after sending all values we complete the sequence
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
} as Observable.OnSubscribe)
}
// To see output:
customObservableNonBlocking().subscribe { println(it) }
這是一個(gè)用Clojure
寫的例子,使用Future(而不是直接用線程),實(shí)現(xiàn)很簡(jiǎn)潔:
(defn customObservableNonBlocking []
"This example shows a custom Observable that does not block
when subscribed to as it spawns a separate thread.
returns Observable<String>"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
; after sending all values we complete the sequence
(-> subscriber .onCompleted))
))
))
; To see output
(.subscribe (customObservableNonBlocking) #(println %))
這個(gè)例子從維基百科網(wǎng)站抓取文章,每抓取一篇會(huì)調(diào)用一次onNext
:
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
"Fetch a list of Wikipedia articles asynchronously.
return Observable<String> of HTML"
(Observable/create
(fn [subscriber]
(let [f (future
(doseq [articleName wikipediaArticleNames]
(-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
; after sending response to onnext we complete the sequence
(-> subscriber .onCompleted))
))))
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
回到Groovy
,同樣是從維基百科抓取文章,這兒使用閉包代替匿名內(nèi)部類:
/*
* Fetch a list of Wikipedia articles asynchronously.
*/
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
return Observable.create { subscriber ->
Thread.start {
for (articleName in wikipediaArticleNames) {
if (subscriber.unsubscribed) {
return
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
}
if (!subscriber.unsubscribed) {
subscriber.onCompleted()
}
}
return subscriber
}
}
fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
.subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
結(jié)果:
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
<!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...
Note that all of the above examples ignore error handling, for brevity. See below for examples that include error handling.
More information can be found on the [[Observable]] and [[Creating Observables|Creating-Observables]] pages.
注意:為了簡(jiǎn)潔,上面的所有例子都忽略了錯(cuò)誤處理,查看下面包含錯(cuò)誤處理的例子。
更多的信息可以在這里找到:Observable
和 Creating Observables
。
RxJava讓你可以鏈?zhǔn)绞褂?code>操作符用來(lái)轉(zhuǎn)換和組合多個(gè)Observables。
The following example, in Groovy, uses a previously defined, asynchronous Observable that emits 75 items, skips over the first 10 of these (skip(10)
), then takes the next 5 (take(5)
), and transforms them (map(...)
) before subscribing and printing the items:
下面是一個(gè)Groovy
的例子,使用之前的定義,它會(huì)異步發(fā)射75個(gè)字符串,跳過(guò)最開始的10個(gè)((skip(10)
),然后獲取接下來(lái)的5個(gè)(take(5)
),在訂閱之前使用map()
轉(zhuǎn)換它們,然后打印結(jié)果字符串。
/**
* Asynchronously calls 'customObservableNonBlocking' and defines
* a chain of operators to apply to the callback sequence.
*/
def simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform"})
.subscribe({ println "onNext => " + it})
}
輸出結(jié)果
onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform
這里有一個(gè)圖例解釋了轉(zhuǎn)換過(guò)程:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.1.png" width="640" height="536" />這一個(gè)例子使用Clojure
,使用了三個(gè)異步的Observable,其中一個(gè)依賴另一個(gè),使用zip
組合這三個(gè)發(fā)射的數(shù)據(jù)項(xiàng)為一個(gè)單個(gè)數(shù)據(jù)項(xiàng),最后使用map()
轉(zhuǎn)換這個(gè)結(jié)果:
(defn getVideoForUser [userId videoId]
"Get video metadata for a given userId
- video metadata
- video bookmark position
- user data
return Observable<Map>"
(let [user-observable (-> (getUser userId)
(.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
bookmark-observable (-> (getVideoBookmark userId videoId)
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
; getVideoMetadata requires :language from user-observable so nest inside map function
video-metadata-observable (-> user-observable
(.mapMany
; fetch metadata after a response from user-observable is received
(fn [user-map]
(getVideoMetadata videoId (:language user-map)))))]
; now combine 3 observables using zip
(-> (Observable/zip bookmark-observable video-metadata-observable user-observable
(fn [bookmark-map metadata-map user-map]
{:bookmark-map bookmark-map
:metadata-map metadata-map
:user-map user-map}))
; and transform into a single response object
(.map (fn [data]
{:video-id videoId
:video-metadata (:metadata-map data)
:user-id userId
:language (:language (:user-map data))
:bookmark (:viewed-position (:bookmark-map data))
})))))
輸出是這樣的:
{:video-id 78965,
:video-metadata {:video-id 78965, :title House of Cards: Episode 1,
:director David Fincher, :duration 3365},
:user-id 12345, :language es-us, :bookmark 0}
這里有一個(gè)圖例解釋了這個(gè)過(guò)程:
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/Composition.2.png" width="640" height="742" />The following example, in Groovy, comes from Ben Christensen’s QCon presentation on the evolution of the Netflix API. It combines two Observables with the merge
operator, then uses the reduce
operator to construct a single item out of the resulting sequence, then transforms that item with map
before emitting it:
下面的例子使用Groovy
,來(lái)自這里 Ben Christensen’s QCon presentation on the evolution of the Netflix API,它使用merge
操作結(jié)合兩個(gè)Observables,使用reduce
操作符從結(jié)果序列構(gòu)建一個(gè)單獨(dú)的結(jié)果數(shù)據(jù)項(xiàng),然后在發(fā)射之前,使用map()
變換那個(gè)結(jié)果。
public Observable getVideoSummary(APIVideo video) {
def seed = [id:video.id, title:video.getTitle()];
def bookmarkObservable = getBookmark(video);
def artworkObservable = getArtworkImageUrl(video);
return( Observable.merge(bookmarkObservable, artworkObservable)
.reduce(seed, { aggregate, current -> aggregate << current })
.map({ [(video.id.toString() : it] }))
}
這里也有一個(gè)圖例解釋reduce
從多個(gè)Observable的結(jié)果構(gòu)建一個(gè)單一結(jié)構(gòu)的過(guò)程:
這里是另一個(gè)版本的維基百科的例子,包含錯(cuò)誤處理代碼:
/*
* Fetch a list of Wikipedia articles asynchronously, with error handling.
*/
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
return Observable.create({ subscriber ->
Thread.start {
try {
for (articleName in wikipediaArticleNames) {
if (true == subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
}
if (false == subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch(Throwable t) {
if (false == subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
return (subscriber);
}
});
}
下面的例子使用Groovy
,注意錯(cuò)誤發(fā)生時(shí)現(xiàn)在是如何調(diào)用onError(Throwable t)
的,下面的代碼傳遞給subscribe()
第二個(gè)方法用戶處理onError
通知:
fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
.subscribe(
{ println "--- Article ---\n" + it.substring(0, 125) },
{ println "--- Error ---\n" + it.getMessage() })
查看 錯(cuò)誤處理操作符
這一夜了解更多RxJava中的錯(cuò)誤處理技術(shù),包括使用 onErrorResumeNext()
和onErrorReturn()
等方法,它們讓你可以從錯(cuò)誤中恢復(fù)。
這里是一個(gè)Groovy
的例子:
myModifiedObservable = myObservable.onErrorResumeNext({ t ->
Throwable myThrowable = myCustomizedThrowableCreator(t);
return (Observable.error(myThrowable));
});