使用一個(gè)函數(shù)從頭開(kāi)始創(chuàng)建一個(gè)Observable
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/create.c.png" alt="create" />
你可以使用Create
操作符從頭開(kāi)始創(chuàng)建一個(gè)Observable,給這個(gè)操作符傳遞一個(gè)接受觀察者作為參數(shù)的函數(shù),編寫(xiě)這個(gè)函數(shù)讓它的行為表現(xiàn)為一個(gè)Observable--恰當(dāng)?shù)恼{(diào)用觀察者的onNext,onError和onCompleted方法。
一個(gè)形式正確的有限Observable必須嘗試調(diào)用觀察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再調(diào)用觀察者的任何其它方法。
http://wiki.jikexueyuan.com/project/rx-docs/images/operators/create.png" alt="create" />
RxJava將這個(gè)操作符實(shí)現(xiàn)為 create
方法。
建議你在傳遞給create
方法的函數(shù)中檢查觀察者的isUnsubscribed
狀態(tài),以便在沒(méi)有觀察者的時(shí)候,讓你的Observable停止發(fā)射數(shù)據(jù)或者做昂貴的運(yùn)算。
示例代碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
create
方法默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
create(OnSubscribe)