LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

深入淺出JavaScript庫---RxJS

admin
2024年11月11日 19:36 本文熱度 108

前言

這個庫在angular中已經(jīng)集成了,所以使用起來有良好的代碼提示,但是在Vue中不行,一點提示都沒有,下面的代碼都在Vue項目中使用,以此分享自己在學(xué)習(xí)的體會:

一、初始RxJS

(1)安裝與導(dǎo)入

命令

npm install rxjs

按需導(dǎo)入:

import { Observer } from "rxjs";

(2)Observable的工作

說明:?Observable可以理解成被觀察者,Observer就是觀察者,連接兩者的橋梁就是Observable對象的函數(shù)subscribe,同時RxJS中的數(shù)據(jù)流就是Observable對象,它實現(xiàn)了觀察者模式和迭代器模式,這里聊聊前者。

=> 觀察者模式 <=

解決問題:?它需要解決在一個持續(xù)產(chǎn)生事件的系統(tǒng)中,如何分割功能,讓不同模塊只需要處理一部分邏輯。

解決方法:?將邏輯分為發(fā)布者和觀察者,發(fā)布者只管生產(chǎn)事件,之后將事件上注冊一個觀察者,至于事件如何被觀察者處理它不關(guān)心;同樣觀察者只管將接收到的事件處理掉,不關(guān)心它是如何產(chǎn)生的。

與RxJS的聯(lián)系:?Observable對象就是一個發(fā)布者,通過函數(shù)subscribe將其與觀察者Observer聯(lián)系起來。

import { of } from "rxjs";
// of操作符會返回一個observable對象,將傳入的內(nèi)容依次發(fā)射出來;
// 此時scoure$就是一個發(fā)布者,它產(chǎn)生的事件就是三個整數(shù)
const scoure$ = of(1, 2, 3);
// 這里console.log作為觀察者,將傳給它的內(nèi)容輸出出來,
// 不管數(shù)據(jù)是怎么產(chǎn)生的
scoure$.subscribe(console.log);
處理步驟:
產(chǎn)生事件:這是發(fā)布者的責(zé)任,也就是Observable對象的工作。?響應(yīng)事件:這是觀察者的責(zé)任,也就是由subscribe的參數(shù)決定。 發(fā)布者如何關(guān)聯(lián)觀察者:也就是何時調(diào)用subscribe。

=> 迭代器模式 <=

說明:?它提供一個通用的接口來遍歷數(shù)據(jù)集合的對象,并且讓使用者不用關(guān)心這個數(shù)據(jù)集合是如何實現(xiàn)的。從數(shù)據(jù)消費的角度,迭代器實現(xiàn)分為拉和推兩種,簡單理解就是拉取數(shù)據(jù)和推送數(shù)據(jù),RxJS屬于后者,它作為迭代器的使用者,并不需要主動去從Observable 中拉數(shù)據(jù),而是只要subscribe上Observable對象之后,然后就能夠收到消息的推送。

=> 創(chuàng)造Observable <=

執(zhí)行過程:?創(chuàng)建一個Observable,也就是創(chuàng)建一個發(fā)布者,這個發(fā)布者接收一個onSubscribe用于與觀察者產(chǎn)生聯(lián)系,當(dāng)發(fā)布者通過subscribe將其注冊給觀察者后,這個函數(shù)就會執(zhí)行,函數(shù)的參數(shù)就是觀察者對象,對這個對象的唯一要求就是需要存在next屬性,屬性的值是一個函數(shù),用來接收傳過來的數(shù)據(jù)

// 0.用于定義發(fā)布者
import { Observable } from "rxjs";
// 4.觸發(fā)后這個函數(shù)的參數(shù)是觀察者的一個包裝,
// ? 它與觀察者并不等價
const onSubscribe = (observer) => {
 ?observer.next(1);
 ?observer.next(2);
 ?observer.next(3);
};
// 1.這里創(chuàng)建一個發(fā)布者,它存在一個onSubscribe函數(shù)與
// ? 觀察者產(chǎn)生聯(lián)系
const source$ = new Observable(onSubscribe);
// 2.創(chuàng)建一個觀察者,有一個next屬性用于接收傳過來的值
const theObserver = {
 ?next: (item) => console.log(item),
};
// 3.通過subscribe函數(shù)將發(fā)布者和觀察者聯(lián)系起來,此時發(fā)
// ? 布者中的onSubscribe函數(shù)會被觸發(fā)
source$.subscribe(theObserver);


=> 延遲的Observable <=

舉例:?如何讓上面的例子中推送每個正整數(shù)之間有一定的時間間隔?

思考:?這個邏輯放在哪個部分更合適?

解釋:?按照分工,發(fā)布者產(chǎn)生數(shù)據(jù),觀察者處理數(shù)據(jù),這樣一來發(fā)布者控制推送數(shù)據(jù)的節(jié)奏也很合理。

const onSubscribe = (observer) => {
 ?let number = 1;
 ?const handle = setInterval(() => {
 ? ?observer.next(number++);
 ? ?if (number > 3) {
 ? ? ?clearInterval(handle);
 ? ?}
 ?}, 1000);
};

結(jié)論:?發(fā)布者推送數(shù)據(jù)可以有時間間隔,這樣使得異步操作十分容易,因為對于觀察者,只需要被動接受推送數(shù)據(jù)來處理,再不用關(guān)心數(shù)據(jù)何時產(chǎn)生。

=> 永無止境的Observable <=

說明:?其實發(fā)布者發(fā)射的數(shù)據(jù)可以是無窮的,每次發(fā)布者使用next發(fā)射出一個數(shù)據(jù),這個數(shù)據(jù)會被觀察者接收然后消化掉,所以不會存在數(shù)據(jù)堆積;如果發(fā)布者的next方法停止調(diào)用,只能表示發(fā)布者此時不會發(fā)射數(shù)據(jù)出去,但并不代表之后不會發(fā)射數(shù)據(jù);如果需要明確發(fā)布就不會再有新數(shù)據(jù)產(chǎn)生了,還需要多個Observable完結(jié)的方式。

const onSubscribe = (observer) => {
 ?let number = 1;
 ?const handle = setInterval(() => {
 ? ?observer.next(number++);
 ?}, 1000);
};

=> Observable的完結(jié) <=

說明:?觀察者的next方法只能表示現(xiàn)在推送的數(shù)據(jù)是什么,并不能表示后面沒有更多數(shù)據(jù)了,也就是沒辦法完全停止它推送數(shù)據(jù),但是在RxJS中,可以使用觀察者的complete方法來完成。

import { Observable } from "rxjs";
const onSubscribe = (observer) => {
 ?let number = 1;
 ?const handle = setInterval(() => {
 ? ?observer.next(number++);
 ? ?if (number > 3) {
 ? ? ?clearInterval(handle);
 ? ? ?// 使用函數(shù)完全停止數(shù)據(jù)的發(fā)送
 ? ? ?observer.complete();
 ? ?}
 ?}, 1000);
};
const source$ = new Observable(onSubscribe);
const theObserver = {
 ?next: (item) => console.log(item),
 ?// 定義函數(shù)來讓發(fā)布者完全停止數(shù)據(jù)的傳輸
 ?complete: () => console.log("No More Data"),
};
source$.subscribe(theObserver);


=> 錯誤的Observable <=

說明:?理想情況下,發(fā)布者只管生產(chǎn)數(shù)據(jù)給觀察者來消耗,但是,難免有時候發(fā)布者會遇到了異常情況,而且這種異常情況不是生產(chǎn)者所能夠處理并恢復(fù)正常的,發(fā)布者在這時候沒法再正常工作了,就需要通知對應(yīng)的觀察者發(fā)生了這個異常情況,如果只是簡單地調(diào)用 complete,觀察者只會知道沒有更多數(shù)據(jù),卻不知道沒有更多數(shù)據(jù)的原因是因為遭遇了異常,所以,我們還要在發(fā)布者和觀察者的交流渠道中增加一個新的函數(shù)error。

import { Observable } from "rxjs/Observable";
const onSubscribe = (observer) => {
 ?observer.next(1);
 ?// 此時發(fā)布者出現(xiàn)不能自己解決的錯誤,調(diào)用方法通知觀察者,
 ?// 此時發(fā)布者已經(jīng)進(jìn)入完結(jié)的狀態(tài),后面所調(diào)用的next和complete
 ?// 都會失效
 ?observer.error("Someting Wrong");
 ?observer.complete();
};
const source$ = new Observable(onSubscribe);
const theObserver = {
 ?next: (item) => console.log(item),
 ?// 用來處理錯誤信息
 ?error: (err) => console.log(err),
 ?complete: () => console.log("No More Data"),
};
source$.subscribe(theObserver);
在RxJS中,一個發(fā)布者對象只有一種終結(jié)狀態(tài),要么是complete,要么是error,一旦進(jìn)入出錯狀態(tài),這個發(fā)布者對象也就終結(jié)了,再不會調(diào)用對應(yīng)觀察者的next函數(shù),也不會再調(diào)用觀察者的complete函數(shù);同樣,如果一個發(fā)布者對象進(jìn)入了完結(jié)狀態(tài),也不能再調(diào)用觀察者的next和error。
此外,一個觀察者對象,里面可以存在next、error、complete三個方法,用于接受發(fā)布者的三種不同事件,如果不關(guān)心某種事件,可以不實現(xiàn)對應(yīng)的方法;比如對于一個永遠(yuǎn)不會結(jié)束的發(fā)布者, 真的沒有必要提供complete方法,因為它永遠(yuǎn)不會被調(diào)用到;但是對于錯誤,觀察者是無法察覺發(fā)布者會出現(xiàn)什么錯情況的,所以error方法還是需要。

(3)退訂Observable

說明:?有時候需要斷開發(fā)布者與觀察者之間的聯(lián)系,這個操作就叫做退訂,在發(fā)布者的onSubscribe函數(shù)執(zhí)行的時候,它可以返回一個對象,對象上可以有一個unsubscribe函數(shù),執(zhí)行這個函數(shù)來進(jìn)行退訂操作。

import { Observable } from "rxjs";
const onSubscribe = (observer) => {
 ?let number = 1;
 ?const handle = setInterval(() => {
 ? ?observer.next(number++);
 ?}, 1000);
 ?return {
 ? ?unsubscribe: () => {
 ? ? ?clearInterval(handle);
 ? ?},
 ?};
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe((item) => console.log(item));
setTimeout(() => {
 ?subscription.unsubscribe();
}, 3500);

注意:?退訂函數(shù)執(zhí)行后,表示觀察者不再接受發(fā)布者推送的數(shù)據(jù),但是發(fā)布者并沒有停止推送數(shù)據(jù),因為發(fā)布者并沒有到達(dá)終結(jié)狀態(tài),也就是沒有調(diào)用complete或者是error方法,此時只是發(fā)布者推送的數(shù)據(jù)觀察者不接收而已,看下面的例子:

import { Observable } from "rxjs";
const onSubscribe = (observer) => {
 ?let number = 1;
 ?const handle = setInterval(() => {
 ? ?// 將發(fā)布者發(fā)射的數(shù)據(jù)打印出來
 ? ?console.log("in onSubscribe ", number);
 ? ?observer.next(number++);
 ?}, 1000);
 ?return {
 ? ?unsubscribe: () => {
 ? ? ?// 這里不清除定時器,讓發(fā)布者繼續(xù)產(chǎn)生數(shù)據(jù)
 ? ? ?// clearInterval(handle);
 ? ?},
 ?};
};
const source$ = new Observable(onSubscribe);
// 每次觀察者執(zhí)行的時候打印出收到的數(shù)據(jù)
const subscription = source$.subscribe((item) => console.log(item));
setTimeout(() => {
 ?subscription.unsubscribe();
}, 3500);
發(fā)布者產(chǎn)生的事件,只有觀察者通過訂閱之后才會收到,在退訂之后就不會收到。

(4)了解兩種Observable

說明:?這里介紹的是Hot Observable和Cold Observable。

場景:?假設(shè)每個發(fā)布者對象有兩個觀察者對象來訂閱, 而且這兩個觀察者對象并不是同時訂閱,第一個觀察者對象訂閱N秒鐘之后,第二個觀察者對象才訂閱同一個發(fā)布者對象,而且,在這N秒鐘之內(nèi),發(fā)布者對象已經(jīng)吐出了一些數(shù)據(jù),此時對這吐出的數(shù)據(jù)有兩種處理:

Hot Observable:只需要接受從訂閱那一刻開始發(fā)布者產(chǎn)生的數(shù)據(jù)就行;有點類似在電視上面看節(jié)目,你所看到的內(nèi)容是節(jié)目當(dāng)前這一刻開始的,之前的節(jié)目你是看不見的,假如你的家人跟你一起看,那么你們看到的節(jié)目是一樣的,這就可以理解為獲取數(shù)據(jù)的數(shù)據(jù)源是相同的 Cold Observable:不能錯過,需要獲取發(fā)布者之前產(chǎn)生的數(shù)據(jù),也就是每次都需要獲取發(fā)布者完整的數(shù)據(jù),可以理解為每次得到的數(shù)據(jù)與之前的數(shù)據(jù)之間并不存在聯(lián)系,是相互獨立的,也就是每次會得到獨立的數(shù)據(jù)源,就像你在手機(jī)應(yīng)用市場下載游戲,跟你在同樣地方下載的游戲是一樣的。

理解:?那么就可以得到這樣的結(jié)果,如果Cold Observable沒有訂閱者,數(shù)據(jù)不會真正的產(chǎn)生,就像你如果不主動下載游戲,你手機(jī)上不可能玩到的;而對于Hot Observable在沒有訂閱者的時候,數(shù)據(jù)依然產(chǎn)生,只不過不傳入數(shù)據(jù)管道而已,就像電視機(jī)節(jié)目,節(jié)目一直存在與此,只是你沒切換到那個頻道觀看而已。

(5)操作符簡介

說明:?一個發(fā)布者對象就是一個數(shù)據(jù)流,在RxJS中數(shù)據(jù)流一般使用$開頭來命名;在一個復(fù)雜問題里面,數(shù)據(jù)流并不會直接交給觀察者來處理,在這途中會使用一系列內(nèi)置的函數(shù)來處理數(shù)據(jù),這些函數(shù)可以理解為操作符;就像一個管道,數(shù)據(jù)從管道的一段流入,途徑管道各個環(huán)節(jié),當(dāng)數(shù)據(jù)到達(dá)觀察者的時候,已經(jīng)被管道操作過,有的數(shù)據(jù)已經(jīng)被中途過濾拋棄掉了,有的數(shù)據(jù)已經(jīng)被改變了原來的形態(tài),而且最后的數(shù)據(jù)可能來自多個數(shù)據(jù)源,最后觀察者只需要處理能夠達(dá)到終點的數(shù)據(jù)。

說明:?在數(shù)據(jù)管道中流淌的數(shù)據(jù)就像是水,從上游流向下游。對一個操作符來說,上游可能是一個數(shù)據(jù)源,也可能是其他操作符,下游可能是最終的觀察者,也可能是另一個操作符,每一個操作符之間都是獨立的,正因為如此,所以可以對操作符進(jìn)行任意組合,從而產(chǎn)生各種功能的數(shù)據(jù)管道。


6)理解彈珠圖

作用:?RxJS中每一個發(fā)布者是一個數(shù)據(jù)流,簡單的數(shù)據(jù)流可以由大腦想象出來,但是復(fù)雜的可就不好像了,此時就可以使用彈珠圖來具體的方式來描述數(shù)據(jù)流,看兩張圖:

說明:?這個彈珠圖所表示的數(shù)據(jù)流,每間隔一段時間吐出一個遞增的正整數(shù),吐出到3的時候結(jié)束。因為每一個吐出來的數(shù)據(jù)都像是一個彈珠,所以這種表達(dá)方式叫做彈珠圖。在彈珠圖中,每個彈珠之間的間隔,代表的是吐出數(shù)據(jù)之間的時間間隔,通過這種形式,能夠很形象地看清楚每個發(fā)布者對象中數(shù)據(jù)的分布。 根據(jù)彈珠圖的傳統(tǒng),豎杠符號|代表的是數(shù)據(jù)流的完結(jié),對應(yīng)調(diào)用complete函數(shù),數(shù)據(jù)流吐出數(shù)據(jù)3之后立刻就完結(jié)了。 符號×代表數(shù)據(jù)流中的異常,對應(yīng)于調(diào)用下游的error函數(shù)。

注意:?為了描述操作符的功能,彈珠圖中往往會出現(xiàn)多條時間軸,因為各部分操作符的工作都是把上游的數(shù)據(jù)轉(zhuǎn)為傳給下游的數(shù)據(jù),在彈珠圖上必須把上下游的數(shù)據(jù)流都展現(xiàn)出來,此外,編寫彈珠圖可以去此處,后面如果存在彈珠圖的地方所使用的代碼復(fù)制到此處就可以看到了。

二、實現(xiàn)操作符

理解:?一個操作符是返回一個Observable對象的函數(shù),不過,有的操作符是根據(jù)其他Observable對象產(chǎn)生返回的Observable對象,有的操作符則是利用其他類型輸出產(chǎn)生返回的Observable對象,還有一些操作符不需要輸出就可以憑空創(chuàng)造一個Observable對象,這里以實現(xiàn)一個操作符來慢慢理解什么是操作符。

(1)實現(xiàn)操作符函數(shù)

說明:?每一個操作符是一個函數(shù),不管函數(shù)的功能是怎樣的,它需要包含以下功能點,這里實現(xiàn)map操作符為例

返回?個全新的Observable對象。 需要存在訂閱和退訂的操作。 處理異常情況。 及時釋放資源。

=> 返回Observable對象 <=

分析:?首先map操作符的功能是遍歷得到的數(shù)據(jù),通過傳入的參數(shù)函數(shù)來處理這些數(shù)據(jù),看下面的例子:

// 這里的函數(shù)參數(shù)將數(shù)據(jù)的每一個值都乘以2,
// 如果source$是?個 1、2、3的序列,
// 那么map返回的序列就是2、4、6,根據(jù)函數(shù)式編程的原則,
// map函數(shù)是不會修改原始的數(shù)據(jù)的,同時其返回值是?個全
// 新的Observable對象,這樣可以保持原始Observable對象的狀態(tài)
// 避免不可預(yù)料的行為
const result$ = source$.map(x => x * 2);

實現(xiàn):?根據(jù)上面的分析可以得到下面這個函數(shù)

// 這里的project就是傳遞給map操作符的函數(shù)參數(shù)
function map(project) {
 ?// map中利?new關(guān)鍵字創(chuàng)造了?個Observable對象,
 ?// 函數(shù)返回的結(jié)果就是這個對象,如此?來,
 ?// map可以?于鏈?zhǔn)秸{(diào)?,可以在后?調(diào)?其他的操作符,
 ?// 或者調(diào)?subscribe增加Observer。
 ?return new Observable((observer) => {
 ? ?// 假設(shè)此處this表示發(fā)布者對象,訂閱后數(shù)據(jù)就會交給觀察者了
 ? ?this.subscribe({
 ? ? ?next: (value) => observer.next(project(value)),
 ? ? ?error: (err) => observer.error(error),
 ? ? ?complete: () => observer.complete(),
 ? ?});
 ?});
}

=> 退訂處理 <=

說明:?作為一個通用的操作符,無法預(yù)料上游Observable是如何實現(xiàn)的,上游完全可能在被訂閱時分配了特殊資源,如果不明確地告訴上游這些資源再也用不著了的話,它也不會釋放這些資源,此時就會造成資源的泄露,所以下游退訂那些資源,就要告訴上游退訂那些資源。

function map(project) {
 ?return new Observable((observer) => {
 ? ?const sub = this.subscribe({
 ? ? ?next: (value) => observer.next(project(value)),
 ? ? ?error: (err) => observer.error(error),
 ? ? ?complete: () => observer.complete(),
 ? ?});
 ? ?return {
 ? ? ?// 根據(jù)前面的了解這里需要存在一個unsubscribe
 ? ? ?// 方法用于退訂
 ? ? ?unsubscribe: () => {
 ? ? ? ?sub.unsubscribe();
 ? ? ?},
 ? ?};
 ?});
}

=> 處理異常 <=

說明:?上面代碼中的參數(shù)project可以輸入的情況有很多,可能存在執(zhí)行的時候不合理的代碼,此時就會出現(xiàn)異常,此時需要捕獲異常錯誤,把異常錯誤沿著數(shù)據(jù)流往下游傳遞,最終如何處理交給觀察者來決定。

function map(project) {
 ?return new Observable((observer) => {
 ? ?const sub = this.subscribe({
 ? ? ?next: (value) => {
 ? ? ? ?try {
 ? ? ? ? ?observer.next(project(value));
 ? ? ? ?} catch (err) {
 ? ? ? ? ?observer.error(err);
 ? ? ? ?}
 ? ? ?},
 ? ? ?error: (err) => observer.error(error),
 ? ? ?complete: () => observer.complete(),
 ? ?});
 ? ?return {
 ? ? ?unsubscribe: () => {
 ? ? ? ?sub.unsubscribe();
 ? ? ?},
 ? ?};
 ?});
}

(2)關(guān)聯(lián)Observable

使用原型:?這個操作符在使用的時候需要一個Observable對象實例,因此這個操作符是一個實例操作符,此時使用打補丁的方式關(guān)聯(lián)發(fā)布者對象的格式為Observable.prototype.操作符 = 操作符函數(shù),既然有實例操作符,當(dāng)然也有靜態(tài)操作符,它不需要Observable實例就可以使用,它的打補丁的格式為Observable.操作符 = 操作符函數(shù),這個的弊端在于會影響每一個Observable對象。

Observable.prototype.map = map;

使用call和bind:?解決上面的問題,可以讓我們?定義的操作符只對指定的 Observable對象可?,這時就可以?bind,當(dāng)然也可以使用call

// 一般使用
const result$ = map.bind(Observable對象)(x => x * 2);
// 鏈?zhǔn)秸{(diào)用
const result$ = map.bind(
 ? ? ? ? ? ? ? ? ? ?map.bind(Observable對象)((x) => x * 2)
 ? ? ? ? ? ? ? ?)((x) => x + 1);
// 一般使用
onst result$ = map.call(Observable對象, x => x * 2);
// 鏈?zhǔn)秸{(diào)用
const result$ = map.call(
 ? ?map.call(Observable對象, (x) => x * 2),
 ? ?(x) => x * 2
);

3)改進(jìn)操作符

說明:?如果遵循函數(shù)式編程思想,需要使用純函數(shù),也就是函數(shù)執(zhí)行的結(jié)果完全由輸入的參數(shù)決定,但是上面定義的函數(shù)中存在this,這是一個不確定的因素,也就是這個函數(shù)不屬于純函數(shù)了,所以在此處需要改進(jìn)一下。

=> 缺陷 <=

說明:?在現(xiàn)代網(wǎng)頁開發(fā)的過程中,都會經(jīng)過打包才發(fā)布到產(chǎn)品環(huán)境,在打包的過程中會使用Tree-Shaking這個工具來去除代碼中沒有使用的代碼,比如那些引入的變量但是并沒有使用這種的;但是這個工具對于RxJS來說沒什么用,這是因為Tree Shaking只能做靜態(tài)代碼檢查,并不是在程序運行時去檢測這個函數(shù)是否被真的調(diào)用,只有這個函數(shù)在任何代碼中間都沒有引用過,才認(rèn)為這個函數(shù)不會被引用。然而,RxJS中任何一個操作符都是掛在 Observable類上或者Observable.prototype上的,賦值給Observable或者 Observable.prototype上某個屬性在Tree Shaking看來就是被引用,所以,所有的操作符,不管真實運用時是否被調(diào)用,都會被Tree Shaking認(rèn)為是會用到的代碼,也就不會當(dāng)做死代碼刪除;其次上面關(guān)聯(lián)Observable的方式是直接添加到其原型上面,由于全局存在一個Observable對象,就跟window對象一樣,像上面添加屬性和方法是不可取的,是會帶來隱患的。

=> 不"打補丁" <=

說明:?開發(fā)RxJS庫的規(guī)則的其中一條就是不能使用打補丁的方式使操作符函數(shù)與Observable對象關(guān)聯(lián)起來。如果是實例操作符,可以使用前面介紹過的bind/call,讓一個操作符函數(shù)只對一個具體的Observable對象生效;如果是靜態(tài)操作符,直接使用就好。

// 這里使用上面實現(xiàn)的map函數(shù)實現(xiàn)一個double操作符
import { Observable, of } from "rxjs";
function map(project) {
 ?return new Observable((observer) => {
 ? ?const sub = this.subscribe({
 ? ? ?next: (value) => {
 ? ? ? ?try {
 ? ? ? ? ?observer.next(project(value));
 ? ? ? ?} catch (err) {
 ? ? ? ? ?observer.error(err);
 ? ? ? ?}
 ? ? ?},
 ? ? ?error: (err) => observer.error(error),
 ? ? ?complete: () => observer.complete(),
 ? ?});
 ? ?return {
 ? ? ?unsubscribe: () => {
 ? ? ? ?sub.unsubscribe();
 ? ? ?},
 ? ?};
 ?});
}
Observable.prototype.double = function () {
 ? ?// 將當(dāng)前的Observable對象作為this值傳遞給map函數(shù)
 ? ?return map.call(this, (x) => x * 2);
};
// of操作符用于創(chuàng)建一個Observable對象
const source$ = of(1, 2, 3);
const result$ = source$.double().subscribe((res) => console.log(res));

(4)lettable/pipeable操作符

原因:?上面使用call/bind方法在函數(shù)體內(nèi)還是會使用this,函數(shù)還是不純,其次call的返回類型是無法確定的,在ts中只能使用any表示,因此會讓其失去類型檢查。

說明:?從RxJS v5.5.0開始,加上了這種更先進(jìn)的操作符定義和使用方式,稱為pipeable操作符,也曾經(jīng)被稱為lettable操作符,但是因為字面上太難理解,所以改成pipeable。

=> let操作符 <=

作用:?實際上就是把上游的Observable對象作為參數(shù)傳遞給let操作符里面的參數(shù)進(jìn)行處理,處理完之后將返回的Observable交給下游來訂閱。

// 下面的map函數(shù)就是上面寫的那個,這是以前的寫法,現(xiàn)在不支持,
import {Observable} from 'rxjs/Observable'; 
import 'rxjs/add/observable/of'; 
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/let'; 
const source$ = Observable.of(1, 2, 3); 
// 雖然map的使?是通過給Observable打補丁導(dǎo)?的,
// 但是map是直接作?于參數(shù)obs$,?不是作?于this,
// 所以,double$是?個純函數(shù)。
const double$ = obs$ => obs$.map(x => x * 2); 
// let的參數(shù)是?個函數(shù),在這個例?中函數(shù)參數(shù)名為double$,
// 這個函數(shù)名也以$為后綴,代表它返回的是?個Observable對象,
// double$同樣接受?個Observable對象作為輸?參數(shù),也就是說,
// double$的功能就是根據(jù)?個Observable對象產(chǎn)??個新的
// Observable對象。
const result$ = source$.let(double$);

過程:?let起到連接上游下游的作用,真正的工作完全由函數(shù)參數(shù)map來執(zhí)行。

處理之前的map函數(shù):?此時map的實現(xiàn)部分也看不到對this的訪問,而是用一個參數(shù)obs$代替了 this,這樣,在數(shù)據(jù)管道中上游的Observable是以參數(shù)形式傳遞,而不是靠 this來獲得,讓map徹底成了一個純函數(shù)。map執(zhí)行返回的結(jié)果是一個函數(shù),接受一個Observable對象返回一個 Observable 對象,正好滿足let的參數(shù)要求。

const map = (fn) => (obs$) =>
 ?new Observable((observer) =>
 ? ?obs$.subscribe({
 ? ? ?next: (value) => observer.next(fn(value)),
 ? ? ?error: (err) => observer.error(error),
 ? ? ?complete: () => observer.complete(),
 ? ?})
 ?);

好處:?由于每一個lettable操作符都是純函數(shù),而且也不會被作為補丁掛在Observable上,Tree Shaking就能夠找到根本不會被使用的操作符。

=> pipe操作符 <=

原因:?要導(dǎo)入let這個操作符,又不得不用傳統(tǒng)的打補丁或者使用call的方式,使用起來要導(dǎo)入很麻煩;所以創(chuàng)建了pipe操作符,它可以滿足let具備的功能。使用pipe只需像使用let那樣導(dǎo)入模塊,任何Observable對象都保持pipe,此外還有管道功能,可以把多個lettable操作符串接起來,形成數(shù)據(jù)管道。

import { map, filter, of } from "rxjs";
const source$ = of(1, 2, 3);
// 可以一次使用多個操作符
const result$ = source$.pipe(
 ?filter((x) => x % 2 === 0),
 ?map((x) => x * 2)
);
result$.subscribe(console.log);

三、創(chuàng)建數(shù)據(jù)流

(1)創(chuàng)建類操作符

說明:?這里所說的創(chuàng)造,是指這些操作符不依賴于其他Observable對象,這些操作符可以憑空或者根據(jù)其他數(shù)據(jù)源創(chuàng)造出?個Observable對象,其次創(chuàng)建類操作符往往不會從其他Observable對象獲取數(shù)據(jù),因為在數(shù)據(jù)管道中它自己就是數(shù)據(jù)流的源頭,基于這些特性大部分的創(chuàng)建類操作符都是靜態(tài)操作符。

(2)創(chuàng)建同步數(shù)據(jù)流

說明:?對于同步的Observable對象,需要關(guān)心的是存在哪些數(shù)據(jù)和數(shù)據(jù)之間的先后順序,由于數(shù)據(jù)之間的時間間隔不存在因此不需要考慮時間方面的問題。

=> of操作符 <=

作用:?可以輕松創(chuàng)建指定數(shù)據(jù)集合的Observable對象;

參數(shù):?of(數(shù)據(jù)1,數(shù)據(jù)2,數(shù)據(jù)3...);

注意:?of操作符所產(chǎn)生的Observable對象被訂閱后會將參數(shù)依次吐出來,吐完之后會調(diào)用complete方法;吐的這個過程是同步的,也就是所有數(shù)據(jù)之間是不存在間隔的。

const { of } = Rx;
of(1).pipe();
const { of } = Rx;
of(1, 2, 3).pipe();


值:?of產(chǎn)生的是Cold Observable,對于每一個Observer都會重復(fù)吐出同樣的一組數(shù)據(jù),所以可以反復(fù)使用。


=> range操作符 <=

作用:?對需要產(chǎn)生多個很長連續(xù)數(shù)字序列的場景,就是得上range這個操作符了,range的含義就是“范圍”,只需要指定一個范圍的開始值和長度,range 就能夠產(chǎn)生這個范圍內(nèi)的依次+1的數(shù)字序列;同樣數(shù)據(jù)吐完之后會調(diào)用complete方法。

參數(shù):?range(序列開始的任意數(shù)字,序列的長度)

const { range } = Rx;
range(1, 100).pipe();

局限性:?無法規(guī)定每次遞增的大小

=> generate操作符 <=

作用:?類似一個for循環(huán),設(shè)定一個初始值,每次遞增這個值,直到滿足某個條件的時候才中止循環(huán),同時,循環(huán)體內(nèi)可以根據(jù)當(dāng)前值產(chǎn)生數(shù)據(jù)。

參數(shù):?generate(初始值, 條件判斷函數(shù), 值如何增加函數(shù), 返回結(jié)果處理函數(shù))

// 假設(shè)存在這樣的for循環(huán):產(chǎn)??個?10?的所有偶數(shù)的平?
const result = [];
for (let i = 2; i < 10; i += 2) {
 ?result.push(i * i);
}
// 使用generate類似實現(xiàn)
const { generate } = Rx;
generate(
 ? ?2, // 初始值,相當(dāng)于for循環(huán)中的i=2
 ? ?value => value < 10, //繼續(xù)的條件,相當(dāng)于for中的條件判斷
 ? ?value => value + 2, //每次值的遞增
 ? ?value => value * value // 產(chǎn)?的結(jié)果
).pipe();

注意:?使用時需要保證后面三個函數(shù)參數(shù)為純函數(shù)

=> repeat操作符 <=

作用:?重復(fù)上游Observable中的數(shù)據(jù)n

參數(shù)1:?repeat(重復(fù)的次數(shù))

const { of } = Rx;
const { repeat } = RxOperators;
of(1, 2, 3).pipe(repeat(100))

參數(shù)2:?repeat({count: 重復(fù)的次數(shù), delay: 數(shù)據(jù)的時間間隔})

import { of, repeat } from "rxjs";
of(1, 2, 3)
 ?.pipe(
 ? ?repeat({
 ? ? ?count: 10,
 ? ? ?delay: 1000,
 ? ?})
 ?)
 ?.subscribe((res) => console.log(res));

注意:?保證上游Observable對象一定會完結(jié)。

=> EMPTY常量 <=

作用:?產(chǎn)生一個直接完結(jié)的Observable對象,沒有參數(shù),不產(chǎn)生任何數(shù)據(jù),直接完結(jié)。

const { EMPTY ?} = Rx;
EMPTY.pipe()


=> throwError操作符 <=

作用:?它所產(chǎn)生的Observable對象也是什么都不做,直接出錯,拋出的錯誤就是throw的參數(shù)

參數(shù):?throwError(錯誤程序)

const { throwError ?} = Rx;
throwError(new Error('這是一個錯誤')).pipe()


=> NEVER常量 <=

作用:?產(chǎn)生的Observable對象就真的是什么都不做,既不吐出數(shù)據(jù),也不完結(jié),也不產(chǎn)生錯誤,就這樣待著,一直到永遠(yuǎn)

const { NEVER ?} = Rx;
NEVER.pipe()


(3)創(chuàng)建異步數(shù)據(jù)流

說明:?就是創(chuàng)建異步的Observable對象,不光要考慮產(chǎn)生什么數(shù)據(jù),還需要考慮數(shù)據(jù)之間的時間間隔了

=> interval操作符 <=

作用:?定時從Observable對象吐出一個數(shù)據(jù),如果不主動結(jié)束就一直執(zhí)行

參數(shù):?interval(吐數(shù)據(jù)的間隔毫秒數(shù))

const { interval } = Rx;
interval(1000).pipe()
注意:
它所創(chuàng)建的數(shù)據(jù)流不會自動完結(jié),也就是不會調(diào)用complete方法,要想結(jié)束只能夠執(zhí)行退訂操作了 其次這個異步數(shù)據(jù)序列總是從0開始遞增的; 最后它與原生的setinterval的地位是等價的

=> timer操作符 <=

作用:?產(chǎn)生的Observable對象在指定毫秒之后會吐出一個數(shù)據(jù),執(zhí)行后立即結(jié)束

參數(shù):?timer(毫秒數(shù) / 一個Date對象, 時間間隔)

// 明確延時產(chǎn)?數(shù)據(jù)的時間間隔
const { timer } = Rx;
timer(1000).pipe()
// 明確的是?個時間點
const { timer } = Rx;
timer(
 ?new Date(
 ? ?new Date().getTime() + 1000
 ?)
).pipe()
const { timer } = Rx;
timer(1000,2000).pipe()
注意:
如果使用第二個參數(shù),產(chǎn)生的數(shù)據(jù)流跟interval類似,只不過產(chǎn)生第一個數(shù)據(jù)的時間間隔由第一個參數(shù)決定,后面產(chǎn)生數(shù)據(jù)的時間間隔由第二個參數(shù)決定;如果兩個參數(shù)一致,那就是interval了

=> from操作符 <=

作用:?以把任何可迭代對象都轉(zhuǎn)化為Observable對象

參數(shù):?from(任何可迭代對象)

const { from } = Rx;
from([1,2,3]).pipe()
const { from } = Rx;
from('abc').pipe()

注意:?在from的眼中,把輸出參數(shù)都當(dāng)做一個Iterable來看待,所以上面的字符串a(chǎn)bc在from看來就和數(shù)組['a','b','c']沒有區(qū)別

import { from } from "rxjs";
const promise = Promise.resolve("good");
const source$ = from(promise);
source$.subscribe(
 ?console.log,
 ?(error) => console.log("catch", error),
 ?() => console.log("complete")
);
import { from } from "rxjs";
const promise = Promise.reject("error");
const source$ = from(promise);
source$.subscribe(
 ?console.log,
 ?(error) => console.log("catch", error),
 ?() => console.log("complete")
);

解釋:?如果from的參數(shù)是promise,當(dāng)promsie成功結(jié)束,from產(chǎn)生的Observable對象就會吐出Promise成功的結(jié)果,并且立刻結(jié)束,如果以失敗而告終的時候,from產(chǎn)生的Observable對象也會立刻產(chǎn)生失敗事件

=> fromEvent操作符 <=

作用1:?在網(wǎng)頁開發(fā)中,可以把DOM中的事件轉(zhuǎn)化為Observable對象中的數(shù)據(jù)

參數(shù)1:?fromEvent(事件源, 事件名稱)

// 希望點擊id為clickMe的按鈕時,id為text的div中的數(shù)字會增加1, 
// 連續(xù)點擊那個按鈕,對應(yīng)數(shù)字會持續(xù)增加
<template>
 ?<div>
 ? ?<button id="clickMe">Click Me</button>
 ? ?<div id="text">0</div>
 ?</div>
</template>
<script setup>
import { fromEvent } from "rxjs";
import { onMounted } from "vue";
let clickCount = 0;
onMounted(() => {
 ?const event$ = fromEvent(
 ? ? ?document.querySelector("#clickMe"), 
 ? ? ?"click"
 ?);
 ?event$.subscribe(() => {
 ? ?document
 ? ? ? ?.querySelector("#text")
 ? ? ? ?.innerText = ++clickCount;
 ?});
});
</script>
<style></style>

說明:?網(wǎng)頁開發(fā)中事件源一般是DOM節(jié)點

// 這里展示從Node.js的events中獲得數(shù)據(jù)
import { fromEvent } from "rxjs";
// 這個模塊需要使用 npm i events 安裝一下
import EventEmitter from "events";
const emitter = new EventEmitter();
// 只接受數(shù)據(jù)源中事件為"msg"的數(shù)據(jù)
const source$ = fromEvent(emitter, "msg");
source$.subscribe(
 ?console.log,
 ?(error) => console.log("catch", error),
 ?() => console.log("complete")
);
// emitter的emit函數(shù)發(fā)送任何名稱的事件,
// 第?個參數(shù)就是事件名稱,第?個參數(shù)是數(shù)據(jù)
emitter.emit("msg", 1);
emitter.emit("msg", 2);
emitter.emit("another-msg", "oops");
emitter.emit("msg", 3);

注意:?fromEvent產(chǎn)生的是Hot Observable,也就是數(shù)據(jù)的產(chǎn)生和訂閱是無關(guān)的,如果在訂閱之前調(diào)用emitter.emit,那有沒有Observer這些數(shù)據(jù)都會立刻吐出來,等不到訂閱的時候,當(dāng)添加了Observer的時候,仍然什么數(shù)據(jù)都獲得不到。

import { fromEvent } from "rxjs";
import EventEmitter from "events";
const emitter = new EventEmitter();
const source$ = fromEvent(emitter, "msg");
// 在訂閱之前發(fā)射數(shù)據(jù)
emitter.emit("msg", 1);
emitter.emit("msg", 2);
emitter.emit("another-msg", "oops");
emitter.emit("msg", 3);
source$.subscribe(
 ?console.log,
 ?(error) => console.log("catch", error),
 ?() => console.log("complete")
);


=> fromEventPattern操作符 <=

作用:?用于處理的Observable對象被訂閱退訂時的動作

參數(shù):?fromEventPattern(被訂閱時觸發(fā)的函數(shù), 被退訂時觸發(fā)的函數(shù))

import { fromEventPattern } from "rxjs";
import EventEmitter from "events";
const emitter = new EventEmitter();
// handler參數(shù)可以理解為觀察者對象中的next方法
const addHandler = (handler) => {
 ?// 監(jiān)聽事件源中的msg事件,每次觸發(fā)事件執(zhí)行next方法
 ?emitter.addListener("msg", handler);
};
const removeHandler = (handler) => {
 ?// 與上面相反,會移除msg事件上面的next方法
 ?emitter.removeListener("msg", handler);
};
const source$ = fromEventPattern(addHandler, removeHandler);
const subscription = source$.subscribe(
 ?console.log,
 ?(error) => console.log("catch", error),
 ?() => console.log("complete")
);
emitter.emit("msg", "hello");
emitter.emit("msg", "world");
// 取消訂閱后emitter上面監(jiān)聽的事件被取消掉,
// 所以此處的值并不會出現(xiàn)在Observable對象里面
subscription.unsubscribe();
emitter.emit("msg", "end");

說明:?它提供的就是一種模式,不管數(shù)據(jù)源是怎樣的行為,最后的產(chǎn)出都是一個Observable對象

=> ajax操作符 <=

作用:?用于發(fā)送請求并根據(jù)結(jié)果返回Observable對象

參數(shù):?ajax('請求的地址')

// 根據(jù)github上的api獲取RxJS項?獲得的Start的數(shù)量
<template>
 ?<div>
 ? ?<button id="getStar">Get RxJS Star Count</button>
 ? ?<div id="text"></div>
 ?</div>
</template>
<script setup>
import { onMounted } from "vue";
import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
onMounted(() => {
 ?fromEvent(
 ? ? ?document.querySelector("#getStar"),
 ? ? ?"click"
 ?).subscribe(
 ? ? ?() => {
 ? ? ? ? ?ajax("https://api.github.com/repos/ReactiveX/rxjs")
 ? ? ? ? ?.subscribe(
 ? ? ? ? ? ? ?(value) => {
 ? ? ? ? ? ? ? ? ?const starCount = 
 ? ? ? ? ? ? ? ? ? ? ?value.response.stargazers_count;
 ? ? ? ? ? ? ? ? ? ? ?
 ? ? ? ? ? ? ? ? ?document.querySelector("#text").innerText = 
 ? ? ? ? ? ? ? ? ? ? ?starCount;
 ? ? ? ? ? });
 ?});
});
</script>


=> defer操作符 <=

作用:?用于延遲執(zhí)行某些操作

參數(shù):?defer(一個函數(shù),這個函數(shù)會在被訂閱時調(diào)用)

// 延遲發(fā)送請求
import { defer } from "rxjs";
import { ajax } from "rxjs/ajax";
defer(
 ? ?() => ajax("https://api.github.com/repos/ReactiveX/rxjs")
 ? ? ? ? ? ?.subscribe(
 ? ? ? ? ? ? ? ?(res) => console.log(res)
 ? ? ? ? ? ?)
);


四、合并數(shù)據(jù)流

(1)合并類操作符

說明:?其作用在于將有多個Observable對象作為數(shù)據(jù)來源,把不同來源的數(shù)據(jù)根據(jù)不同的規(guī)則合并到一個Observable對象中。

=> concat操作符 <=

作用:?把多個Observable中的數(shù)據(jù)內(nèi)容依次合并,合并的時候原有數(shù)據(jù)不變

參數(shù):?concat(數(shù)據(jù)1, 數(shù)據(jù)2, 數(shù)據(jù)3...)

import { concat, of } from "rxjs";
const source1$ = of(1, 2, 3);
const source2$ = of(4, 5, 6);
concat(source1$, source2$)
 ? ?.subscribe(
 ? ? ? ?(res) => console.log(res)
 ? ?);

注意:?它在工作的時候,會先從第一個Observable對象中獲取數(shù)據(jù),等它complete之后,再從下一個對象中去數(shù)據(jù),直到取完所有的,此時,如果其中有一個對象是不完結(jié)的狀態(tài),那么它之后的Observable對象就不會有被取到的機(jī)會。

=> merge操作符 <=

作用:?一定性訂閱上游所有的Observable對象,只要有數(shù)據(jù)傳遞下來,這個數(shù)據(jù)就會被傳遞給下游,也就是數(shù)據(jù)采取先到先出的原則,同時合并的時候原有數(shù)據(jù)不變

參數(shù):?merge(數(shù)據(jù)1, 數(shù)據(jù)2, 數(shù)據(jù)3, ... 可選數(shù)字參數(shù))

場景一:合并異步數(shù)據(jù)流
import { merge, of, repeat, pipe } from "rxjs";
// 隔700ms重復(fù)一個A,重復(fù)的次數(shù)為5次
const source1$ = of("A").pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?repeat(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 700 }
 ? ? ? ? ? ? ? ? ? ? ? ?));
// 隔800ms重復(fù)一個B,重復(fù)的次數(shù)為5次
const source2$ = of("B").pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?repeat(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 800 }
 ? ? ? ? ? ? ? ? ? ? ? ?));
const merged$ = merge(source1$, source2$);
merged$.subscribe((res) => console.log(res));
場景二: 同步限流

解釋:?此時需要用到最后的參數(shù),參數(shù)是一個數(shù)字,表示可以同時合并的個數(shù)

import { merge, of, repeat, pipe } from "rxjs";
// 隔700ms重復(fù)一個A,重復(fù)的次數(shù)為5次
const source1$ = of("A").pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?repeat(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 700 }
 ? ? ? ? ? ? ? ? ? ? ? ?));
// 隔800ms重復(fù)一個B,重復(fù)的次數(shù)為5次
const source2$ = of("B").pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?repeat(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 800 }
 ? ? ? ? ? ? ? ? ? ? ? ?));
 ? ? ? ? ? ? ? ? ? ? ? ?
// 隔900ms重復(fù)一個C,重復(fù)的次數(shù)為5次
const source3$ = of("C").pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?repeat(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?{ count: 5, delay: 900 }
 ? ? ? ? ? ? ? ? ? ? ? ?));
 ? ? ? ? ? ? ? ? ? ? ? ?
// 限定合并的個數(shù)為2
const merged$ = merge(source1$, source2$, source3$, 2);
merged$.subscribe((res) => console.log(res));
場景三:合并多個事件

說明:?一個元素存在click事件和touch事件,對應(yīng)網(wǎng)頁和移動設(shè)備,假如其事件處理程序的邏輯一致,此時就可以分別使用fromEvent獲取單個事件流,之后用merge合并成一個數(shù)據(jù)流,就可以集中管理了

// 可以像這樣處理
const click$ = fromEvent(element, 'click'); 
const touchend$ = fromEvent(element, 'touchend'); 
merge(click$, touchend$).subscribe(處理函數(shù));

=> zip操作符 <=

作用:?將數(shù)據(jù)流里面的數(shù)據(jù)一一對應(yīng)并使用數(shù)組組合起來

參數(shù):?zip(數(shù)據(jù)流1, 數(shù)據(jù)流2, 數(shù)據(jù)流3...)

場景一: 一對一合并
import { interval, of, zip } from "rxjs";
// 一個異步數(shù)據(jù)流,產(chǎn)生的數(shù)據(jù)是無限的
const source1$ = interval(1000);
// 一個同步數(shù)據(jù)流,產(chǎn)生的數(shù)據(jù)流是有限
const source2$ = of("a", "b", "c");
// 將兩個數(shù)據(jù)流合并
zip(source1$, source2$)
 ? ?.subscribe(
 ? ? ? ?(res) => console.log(res),
 ? ? ? ?null,
 ? ? ? ?() => console.log('complete')
 ? ?);

注意:?這里數(shù)據(jù)的匹配是一一對應(yīng)的,所以數(shù)據(jù)少的那個Observable決定zip產(chǎn)生數(shù)據(jù)的個數(shù);然后在對應(yīng)的時候需要雙方都有數(shù)據(jù)才能夠?qū)?yīng),這也是為什么上面的打印會隔1s才打印。

問題: 數(shù)據(jù)積壓

說明:?如果某個上游source1$吐出數(shù)據(jù)的速度很快,而另一個上游source2$吐出數(shù)據(jù)的速度很慢,那zip就不得不先存儲source1$吐出的數(shù)據(jù),因為RxJS的工作方式是“推”, Observable把數(shù)據(jù)推給下游之后就沒有責(zé)任保存數(shù)據(jù)了。被source1$推送了數(shù)據(jù)之后,zip就有責(zé)任保存這些數(shù)據(jù),等著和source2$未來吐出的數(shù)據(jù)配對。假如source2$遲遲不吐出數(shù)據(jù),那么zip就會一直保存source1$沒有配對的數(shù)據(jù),然而這時候source1$可能會持續(xù)地產(chǎn)生數(shù)據(jù),最后zip積壓的數(shù)據(jù)就會越來越多,占用的內(nèi)存也就越來越多。對于數(shù)據(jù)量比較小的Observable對象,這樣的數(shù)據(jù)積壓還可以忍受,但是對于超大量的數(shù)據(jù)流,使用zip就不得不考慮潛在的內(nèi)存壓力問題。

=> combineLatest操作符 <=

作用:?合并上游所有Observable一個最新的數(shù)據(jù),也就是它返回值是一個數(shù)組

參數(shù):?combineLatest([數(shù)據(jù)1, 數(shù)據(jù)2, 數(shù)據(jù)3 ...], 處理函數(shù))

場景一: 基本使用
import { combineLatest, timer } from "rxjs";
// 隔1s產(chǎn)生一個數(shù)字
const firstTimer = timer(1000, 1000);
// 隔2s產(chǎn)生一個數(shù)字
const secondTimer = timer(1000, 2000);
// 合并上面的數(shù)據(jù)流
const combinedTimers = combineLatest([firstTimer, secondTimer]);
combinedTimers.subscribe((value) => console.log(value));

注意:?首先還是一一對應(yīng)的關(guān)系,也就是如果一個數(shù)據(jù)源還沒發(fā)射值出來,那么會等待它將值發(fā)射出來,如果值沒有改變并且操作沒有完結(jié)的話,發(fā)射的值將一直是這一個,所以只有所有的Observable對象完結(jié),combineLatest才會給下游一個complete信號,表示不會有任何數(shù)據(jù)更新了

場景二: 合并同步數(shù)據(jù)流
const firstTimer = of("a", "b", "c");
const secondTimer = of(1, 2, 3);
const combinedTimers = combineLatest([firstTimer, secondTimer]);
combinedTimers.subscribe((value) => console.log(value));

工作方式:?combineLatest在工作的時候,會按照順序依次訂閱所有上游的Observable對象,只有所有上游Observable對象都已經(jīng)吐出數(shù)據(jù)了,才會給下游傳遞所有上游“最新數(shù)據(jù)”組合的數(shù)據(jù)

解釋:?由于of產(chǎn)生的同步數(shù)據(jù)流,在被訂閱時就會吐出所有數(shù)據(jù),最后一個吐出的數(shù)據(jù)是字符串c,這也就是最新的數(shù)據(jù),然后訂閱下一個對象,下一個對象會依次吐出數(shù)據(jù),然后跟上一個對象的最新數(shù)據(jù)c結(jié)合,這就得到了上面看到的內(nèi)容

場景三:定制下游數(shù)據(jù)

說明:?這里就需要啊使用處理函數(shù)了,這個函數(shù)的參數(shù)就是每一個數(shù)據(jù)源的最新值,其返回值就是下游所接受到的數(shù)據(jù),如果沒有返回值,則下游收到的數(shù)據(jù)為undefined

import { combineLatest, timer, of } from "rxjs";
const firstTimer = of("a", "b", "c");
const secondTimer = of(1, 2, 3);
const combinedTimers = combineLatest(
 ?[firstTimer, secondTimer],
 ?(res1, res2, res3) => {
 ? ?// 上面只有兩個數(shù)據(jù)源,所以參數(shù)只會前兩個有值
 ? ?console.log(res1, res2, res3);
 ?}
);
combinedTimers.subscribe();


=> withLatestFrom操作符 <=

說明:?這個的作用于combineLatest是類似的,只不過下游推送數(shù)據(jù)只能由一個上游Observable對象驅(qū)動,也就是調(diào)用withLatestFrom的那個Observable對象起到主導(dǎo)數(shù)據(jù)產(chǎn)生節(jié)奏的作用,作為參數(shù)的Observable對象只能貢獻(xiàn)數(shù)據(jù),不能控制產(chǎn)生數(shù)據(jù)的時機(jī)

參數(shù):?數(shù)據(jù)源1.withLatestFrom(數(shù)據(jù)源2)

import { withLatestFrom, timer, pipe, map } from "rxjs";
// 每隔兩秒產(chǎn)生100、200、300這樣的數(shù)字
const source1$ = timer(0, 2000)
 ? ? ? ? ? ? ? ? ? ?.pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?map((x) => 100 * x)
 ? ? ? ? ? ? ? ? ? ?);
// 每隔一秒產(chǎn)生0、1、2這樣的數(shù)字
const source2$ = timer(500, 1000);
// 后面的處理函數(shù)將它們想加起來
const result$ = source1$
 ? ? ? ? ? ? ? ? ? ?.pipe(
 ? ? ? ? ? ? ? ? ? ? ? ?withLatestFrom(
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?source2$, 
 ? ? ? ? ? ? ? ? ? ? ? ? ? ?(a, b) => a + b
 ? ? ? ? ? ? ? ? ? ? ? ?)
 ? ? ? ? ? ? ? ? ? ?);
result$.subscribe(console.log);


=> race操作符 <=

作用:?以O(shè)bservable產(chǎn)生第一個數(shù)據(jù)的時間為準(zhǔn),只留下最快的那一個,當(dāng)然,使用的所有數(shù)據(jù)也是最快的那一個

參數(shù):?race(數(shù)據(jù)源1, 數(shù)據(jù)源2, 數(shù)據(jù)源3 ...)

import { timer, race, pipe, map } from "rxjs";
// 立即開始產(chǎn)生數(shù)據(jù)a
const source1$ = timer(0, 2000).pipe(map(() => "a"));
// 500ms后開始產(chǎn)生數(shù)據(jù)b
const source2$ = timer(500, 1000).pipe(map(() => "b"));
// 比賽
const winner$ = race(source1$, source2$);
winner$.subscribe(console.log);

=> startWith操作符 <=

作用:?在讓?個Observable對象在被訂閱的時候,總是先同步吐出指定的若?個數(shù)據(jù)

參數(shù):?數(shù)據(jù)源.startWith(參數(shù)1, 參數(shù)2, 參數(shù)3 ...)

import { of, startWith } from "rxjs";
of(1000)
 ?.pipe(startWith("timer start", 1, 2))
 ?.subscribe((x) => console.log(x));

=> forkJoin操作符 <=

作用:?等待所有參數(shù)Observable對象的最后?個數(shù)據(jù),將其合并成一個數(shù)組發(fā)射出去

參數(shù):?forkJoin(對象 / 數(shù)組)

import { forkJoin, of, timer } from "rxjs";
// 下面會在四秒后返回結(jié)果
forkJoin({
 ?foo: of(1, 2, 3, 4),
 ?bar: Promise.resolve(8),
 ?baz: timer(4000),
}).subscribe((res) => console.log(res));

(2)高階Observable

說明:?簡單理解就是一個Observable中存在Observable,它有一個特點就是高階Observable完結(jié)不代表其里面的Observable完結(jié)

=> concatAll操作符 <=

說明:?這個操作符針對高階Observable,也是依次訂閱Observable內(nèi)部的Observable取值結(jié)合,訂閱的過程中如果上一個Observable沒有完結(jié)就不會訂閱下一個Observable對象。其他操作可以參照concat

參數(shù):?沒有參數(shù)

import { of, concatAll } from "rxjs";
const source = of(
 ? ? ? ? ? ? ? ? ? ?of(1, 2, 3),
 ? ? ? ? ? ? ? ? ? ?of(4, 5, 6),
 ? ? ? ? ? ? ? ? ? ?of(7, 8, 9)
 ? ? ? ? ? ? ? ?);
source.pipe(concatAll())
 ? ? ? ?.subscribe(
 ? ? ? ? ? ?(val) => console.log(val)
 ? ? ? ?);

=> mergeAll操作符 <=

說明:?針對高階Observable,在合并的時候,依次訂閱其內(nèi)部的Observable對象,那個對象有數(shù)據(jù)傳下來,這個數(shù)據(jù)就會傳遞給下游;它可以傳遞一個數(shù)字來限定合并的最大流的個數(shù)。其他操作可以參照merge

參數(shù):?mergeAll(數(shù)字)

import { of, mergeAll, repeat } from "rxjs";
// 這里A延遲復(fù)制的時間比B長,所以第二次打印的時候B在前面
const source = of(
 ?of("A").pipe(
 ? ? ? ? ? ? ?repeat({ count: 5, delay: 800 })
 ? ? ? ? ?),
 ?of("B").pipe(
 ? ? ? ? ? ? ?repeat({ count: 5, delay: 700 })
 ? ? ? ? ?)
);
source.pipe(mergeAll())
 ? ? ? ? ? ?.subscribe(
 ? ? ? ? ? ? ? ?(val) => console.log(val)
 ? ? ? ? ? ?);

=> zipAll操作符 <=

說明:?對高階Observable使用的時候,將數(shù)據(jù)流里面的數(shù)據(jù)一一對應(yīng)并使用數(shù)組組合起來。其它操作可以參考zip

參數(shù):?zipAll(處理函數(shù))

import { of, zipAll } from "rxjs";
const source = of(of("A", "B", "C"), of(1, 2, 3), of("X", "Y", "Z"));
source
 ?.pipe(
 ? ?// 可以接收一個處理函數(shù),每個參數(shù)對應(yīng)返回值的每一項
 ? ?zipAll((a, b, c) => {
 ? ? ?// 這里將參數(shù)打印出來,如果沒有返回值則下游將收不到值
 ? ? ?console.log(a, b, c);
 ? ?})
 ?)
 ?.subscribe();

=> combineLatestAll操作符 <=

說明:?在處理高階Observable的時候,將其內(nèi)部Observable產(chǎn)生的最新數(shù)據(jù)一一對應(yīng)并用數(shù)組的形式返回出來。其它操作可以參考combineLatest

參數(shù):?combineLatestAll(處理函數(shù))

import { of, combineLatestAll } from "rxjs";
const source = of(of("A", "B", "C"), of(1, 2, 3), of("X", "Y", "Z"));
source
 ?.pipe(
 ? ?// 可以接收一個處理函數(shù),每個參數(shù)對應(yīng)返回值的每一項
 ? ?combineLatestAll((a, b, c) => {
 ? ? ?// 這里將參數(shù)打印出來,如果沒有返回值則下游將收不到值
 ? ? ?console.log(a, b, c);
 ? ?})
 ?)
 ?.subscribe();

五、輔助類操作符

(1)數(shù)學(xué)類操作符

說明:?這里介紹的操作符會遍歷上游Observable對象中吐出的所有數(shù)據(jù)才給下游傳遞數(shù)據(jù), 也就是說,它們只有在上游完結(jié)的時候,才給下游傳遞唯?數(shù)據(jù)。

=> count操作符 <=

作用:?用于統(tǒng)計上游Observable對象吐出的所有數(shù)據(jù)個數(shù),所以上游的Observable需要完結(jié)

參數(shù):?count(過濾函數(shù))

import { of, interval, count } from "rxjs";
// 可以完結(jié)
of(1000, 1)
 ?.pipe(
 ? ? ?// 此時過濾出數(shù)據(jù)為1的數(shù)量
 ? ? ?count(
 ? ? ? ? ?(val) => val === 1
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));
// 無法完結(jié)
interval(1000)
 ?.pipe(count())
 ?.subscribe((res) => console.log(res));

=> max和min操作符 <=

作用:?找出上游數(shù)據(jù)中的最大值最小值

參數(shù):?max(比較函數(shù)) / min(比較函數(shù))

import { of, max } from "rxjs";
of(
 ? ?{ age: 7, name: "Foo" },
 ? ?{ age: 5, name: "Bar" },
 ? ?{ age: 9, name: "Beer" }
).pipe(
 ? ?// 返回值為正 => a > b
 ? ?// 返回值為0 => a = b
 ? ?// 返回值為負(fù) => a < b
 ? ?max((a, b) => a.age > b.age)
 ?)
 .subscribe(
 ? ? (x) => console.log(x.name)
 );

注意:?如果Observable吐出的數(shù)據(jù)類型是復(fù)雜數(shù)據(jù)類型,?如?個對象,那必須指定?個?較這種復(fù)雜類型??的?法,就像上面使用的那樣

=> reduce操作符 <=

說明:?對上游的每個數(shù)據(jù)進(jìn)行自定義計算,也就是對每一個元素都會調(diào)用一次這個函數(shù)

參數(shù):?reduce((累加的值, 當(dāng)前元素的值) => {}, 初始值)

// 計算 1-100 的和
import { range, reduce } from "rxjs";
range(1, 100)
 ?.pipe(
 ? ? ?reduce(
 ? ? ? ? ?(acc, current) => acc + current,
 ? ? ? ? ?0
 ? ? ?)
 ?)
 ?.subscribe(
 ? ? ?(res) => console.log(res)
 ?);

2)條件操作符

說明:?根據(jù)上游Observable對象的某些條件產(chǎn)生一個新的 Observable對象

=> every操作符 <=

作用:?它接受一個判定函數(shù)作為參數(shù),如果上游所有數(shù)據(jù)都能夠通過這個函數(shù),那么會返回一個包含true值的Observable,有一個通不過就返回一個包含false值的Observable,在吐出結(jié)果后every產(chǎn)生的Observable會立即完結(jié);不要對不會完結(jié)的對象使用

參數(shù):?every(判定函數(shù))

import { every, of } from "rxjs";
of(1, 100)
 ?.pipe(
 ? ? ?every(
 ? ? ? ? ?// 這里判定是否所有值都大于10,顯然1不行
 ? ? ? ? ?(val) => val > 10
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));

=> find和findIndex操作符 <=

作用:?通過一個處理函數(shù)來在上游數(shù)據(jù)中查找滿足條件的數(shù)據(jù),find會吐出找到的上游的數(shù)據(jù),findIndex會吐出滿足判定條件的索引,如果找不到find會吐出undefined后完結(jié),findIndex則會吐出-1后完結(jié);不要對不會完結(jié)的對象使用

參數(shù):?find(處理函數(shù)) / findIndex(處理函數(shù))

import { find, findIndex, of } from "rxjs";
of(1, 100)
 ?.pipe(
 ? ? ?find(
 ? ? ? ? ?(val) => (val = 100)
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));
 ?
of(1, 100)
 ?.pipe(
 ? ? ?findIndex(
 ? ? ? ? ?(val) => (val = 100)
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));

=> isEmpty操作符 <=

作用:?檢測上游Observable對象是不是空的,如果在完結(jié)之前沒有吐出數(shù)據(jù),它就是空的,此時返回一個包含true值的Observable,否則返回一個包含false值的Observable

import { EMPTY, isEmpty, of } from "rxjs";
// 不是空的
of(1)
 ?.pipe(isEmpty())
 ?.subscribe((res) => console.log(res));
 ?
// 是空的
EMPTY.pipe(isEmpty())
 ?.subscribe((res) => console.log(res));

=> defaultIfEmpty操作符 <=

作用:?接受一個默認(rèn)值,如果檢測上游的Observable是空的,則把這個默認(rèn)值傳遞給下游,如果不是空的就把上游的東西傳遞給下游;如果不傳但是上游檢測還是空的,下游就會收到一個undefined

import { defaultIfEmpty, EMPTY, of } from "rxjs";
// 不是空值,不傳參數(shù)
of(1)
 ?.pipe(defaultIfEmpty())
 ?.subscribe((res) => console.log(res));
// 不是空值,傳參數(shù)
of(1)
 ?.pipe(defaultIfEmpty("存在內(nèi)容"))
 ?.subscribe((res) => console.log(res));
// 是空值,不傳參數(shù)
EMPTY.pipe(defaultIfEmpty())
 ?.subscribe((res) => console.log(res));
 ?
// 是空值,傳參數(shù)
EMPTY.pipe(defaultIfEmpty("存在內(nèi)容"))
 ?.subscribe((res) => console.log(res));

六、過濾數(shù)據(jù)流

(1)過濾類操作符

說明:?對上游Observable中所有的數(shù)據(jù)使用判定函數(shù)進(jìn)行操作,決定是否某些元素不能通過進(jìn)入下游,如果對某個元素處理結(jié)果為true,表示能通過,否則就不能通過

=> filter操作符 <=

作用:?跟JavaScript中的filter使用起來是類似的,只不過這里針對的是Observable

參數(shù):?filter(過濾函數(shù))

import { filter, interval } from "rxjs";
source$ = interval(1000)
 ?.pipe(
 ? ? ?// 過濾能被2整除的數(shù)據(jù)
 ? ? ?filter(
 ? ? ? ? ?(x) => x % 2 === 0
 ? ? ?)
 ?)
 ?
source$.subscribe((res) => console.log(res));

注意:?當(dāng)上游產(chǎn)?數(shù)據(jù)的時候,只要這個數(shù)據(jù)滿?判定條件,就會立刻被同步傳給下游。

=> first操作符 <=

作用:?過濾出Observable中第一個滿足條件的值,在沒有找到的時候會拋出一個錯誤,如果不想這個錯誤傳遞給下游可以使用第二個默認(rèn)值,它的作用是在沒找到滿足條件的值的時候?qū)⑦@個值傳遞出去。如果不傳參數(shù)則將第一個數(shù)據(jù)返回出去,

參數(shù):?filter(過濾函數(shù), 默認(rèn)值)

import { first, of } from "rxjs";
// 找不到結(jié)果拋出錯誤,但是給默認(rèn)值
of(1, 3)
 ?.pipe(first((x) => x % 2 === 0, 2))
 ?.subscribe((res) => console.log(res));
// 找到結(jié)果
of(1, 4, 3)
 ?.pipe(first((x) => x % 2 === 0))
 ?.subscribe((res) => console.log(res));
// 找不到結(jié)果拋出錯誤
of(1, 3)
 ?.pipe(first((x) => x % 2 === 0))
 ?.subscribe((res) => console.log(res));

=> last操作符 <=

說明:?這個作用與first相反,它是找最后一個滿足條件的值,使用可以參考first,這里需要注意,使用這個操作符的上游必須完結(jié),否則操作符不知道哪一個是最后一個數(shù)據(jù)

參數(shù):?filter(過濾函數(shù), 默認(rèn)值)

import { last, interval } from "rxjs";
// 這個Observable不會完結(jié),自然也不會拿到結(jié)果
interval(1000)
 ?.pipe(last((x) => x % 2 === 0, 2))
 ?.subscribe((res) => console.log(res));

=> take操作符 <=

作用:?從上游的數(shù)據(jù)中拿指定個數(shù)的數(shù)據(jù),拿完之后就會完結(jié),并將獲取的數(shù)據(jù)返回

參數(shù):?take(需要的個數(shù))

import { interval, of, take } from "rxjs";
// 數(shù)據(jù)不夠拿,那就拿完為止
of("a", "b", "c")
 ?.pipe(take(4))
 ?.subscribe((res) => console.log(res));
// 獲取指定個數(shù)的數(shù)據(jù)
interval(1000)
 ?.pipe(take(4))
 ?.subscribe((res) => console.log(res));

注意:?上游每產(chǎn)生一個數(shù)據(jù)就會立即傳給下游,也就是同步操作的

=> takeLast操作符 <=

作用:?從后往前獲取指定個數(shù)的數(shù)據(jù),之后將數(shù)據(jù)一次性返回出去之后完結(jié)

參數(shù):?takeLast(需要的個數(shù))

import { interval, of, takeLast } from "rxjs";
// 數(shù)據(jù)不夠拿,那就拿完為止
of("a", "b", "c")
 ?.pipe(takeLast(4))
 ?.subscribe((res) => console.log(res));
// 數(shù)據(jù)沒有完結(jié),獲取不到數(shù)據(jù)
interval(1000)
 ?.pipe(takeLast(4))
 ?.subscribe((res) => console.log(res));

注意:?如果上游的Observable對象不會完結(jié)的話,那么是拿不到數(shù)據(jù)的,因為不知道誰是最后一個數(shù)據(jù)

=> takeWhile操作符 <=

說明:?takeWhile接受?個判定函數(shù)作為參數(shù),這個判定函數(shù)有兩個參數(shù),分別代表上游的數(shù)據(jù)和對應(yīng)的序號,takeWhile會吐出上游數(shù)據(jù),直到判定函數(shù)返回false,只要遇到第一個判定函數(shù)返回false的情況, takeWhile產(chǎn)生的Observable就完結(jié)

參數(shù):?takeWhile(判定函數(shù), 布爾值)

// 這里關(guān)注第二個參數(shù)
import { range, takeWhile } from "rxjs";
range(1, 10)
 ?.pipe(
 ? ? ?takeWhile(
 ? ? ? ? ?(val) => val < 3, true
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));
range(1, 10)
 ?.pipe(
 ? ? ?takeWhile(
 ? ? ? ? ?(val) => val < 3, false
 ? ? ?)
 ?)
 ?.subscribe((res) => console.log(res));

注意:?第二個參數(shù)表示是否將第一次導(dǎo)致判定函數(shù)結(jié)果為false的那個值發(fā)射出去,默認(rèn)是false,表示不發(fā)射,true則表示發(fā)射。

=> takeUntil操作符 <=

說明:?它接受一個Observable對象,在這個對象沒有吐出數(shù)據(jù)之前,上游的數(shù)據(jù)會直接傳遞給下游,在參數(shù)對象吐出第一個數(shù)據(jù)時,上游的數(shù)據(jù)就不能傳遞給下游了。其次參數(shù)對象出現(xiàn)錯誤的時候,這個錯誤會傳遞給下游,此時上游數(shù)據(jù)也不能傳遞給下游了

參數(shù):?takeUntil(Observable對象)

// 假如使用interval創(chuàng)建數(shù)據(jù),在第三秒的時候停止
import { interval, takeUntil, timer } from "rxjs";
interval(1000)
 ?.pipe(
 ? ? ?takeUntil(timer(3000))
 ?)
 ?.subscribe((res) => console.log(res));

=> skip操作符 <=

作用:?跳過上游的前n個值,然后從上游的第n+1個值開始傳遞給下游,這個操作符不關(guān)心最后一個值是什么,所以這個操作符的上游不管會不會完結(jié)下游都會有值。

參數(shù):?skip(跳過的個數(shù))

import { interval, skip } from "rxjs";
// 跳過前兩個值
interval(1000)
 ?.pipe(skip(2))
 ?.subscribe((res) => console.log(res));

=> skipLast操作符 <=

作用:?可以理解成去除上游的最后n個值,然后將剩下的值傳遞給下游;

參數(shù):?skipLast(跳過的n個值)

import { interval, skipLast, of } from "rxjs";
// 一個完結(jié)的對象
of("a", "b", "c")
 ?.pipe(skipLast(2))
 ?.subscribe((res) => console.log(res));
// 不會完結(jié)的對象
interval(1000)
 ?.pipe(skipLast(2))
 ?.subscribe((res) => console.log(res));

注意:?上游沒有完結(jié)下游依然可以收到數(shù)據(jù)

=> skipWhile操作符 <=

說明:?它接收一個函數(shù)作為參數(shù),上游的每一個數(shù)據(jù)都會執(zhí)行這個函數(shù),只要有一個數(shù)據(jù)在函數(shù)中的返回值是false,那么這個數(shù)據(jù)之前的數(shù)據(jù)都會被過濾調(diào)用,剩下的數(shù)據(jù)會傳遞給下游。

參數(shù):?skipWhile(處理函數(shù))

import { interval, skipWhile } from "rxjs";
interval(1000)
 ?.pipe(skipWhile((val) => val % 2 === 0))
 ?.subscribe((res) => console.log(res));

=> skipUntil操作符 <=

作用:?用于在一個Observable中跳過一些值,直到另一個Observable發(fā)出了特定的信號或者達(dá)到某種狀態(tài)。

參數(shù):?skipUntil(Observable對象)

import { interval, timer, skipUntil } from "rxjs";
// 創(chuàng)建一個每秒發(fā)出一個值的Observable
const source$ = interval(1000);
// 創(chuàng)建一個在5秒后發(fā)出第一個值的Observable
const trigger$ = timer(5000);
// 使用skipUntil操作符,跳過source$的值,直到trigger$發(fā)出第一個值
const example$ = source$.pipe(skipUntil(trigger$));
const subscription = example$.subscribe((val) => console.log(val));

(2)有損回壓控制

解釋:?如果數(shù)據(jù)管道中某一個環(huán)節(jié)處理數(shù)據(jù)的速度跟不上數(shù)據(jù)涌現(xiàn)的速度,上游無法把數(shù)據(jù)推送給下游,就會在緩沖區(qū)中積壓數(shù)據(jù),這就相當(dāng)于對上游施加了壓力,這就是RxJS世界中的回壓。

處理:?造成這種現(xiàn)象的原因是數(shù)據(jù)管道中某個環(huán)節(jié)數(shù)據(jù)涌?的速度超過了處理速度,那么,既然處理不過來,干脆就舍棄掉某些涌現(xiàn)的數(shù)據(jù),這種方式稱為有損回壓控制

可選的調(diào)度器:
asyncScheduler:這是默認(rèn)的調(diào)度器,它使用setTimeout或setInterval來安排任務(wù)的執(zhí)行。它適用于異步操作。 queueScheduler:這個調(diào)度器會按順序執(zhí)行任務(wù),并且會等待當(dāng)前任務(wù)完成后才執(zhí)行下一個任務(wù)。適用于同步操作。 animationFrameScheduler:這個調(diào)度器會根據(jù)瀏覽器的刷新率來執(zhí)行任務(wù),通常用于實現(xiàn)動畫效果或者對性能要求較高的操作。 asapScheduler:這個調(diào)度器會盡可能快地在當(dāng)前執(zhí)行棧中執(zhí)行任務(wù),但是會在微任務(wù)隊列中等待其他任務(wù)完成后執(zhí)行。適用于需要盡快執(zhí)行的任務(wù)。 TestScheduler:這是用于測試的調(diào)度器,可以用來模擬時間的流逝,方便測試 RxJS 代碼。
可選參數(shù)對象:
leading:布爾值,表示是否在節(jié)流周期的開始時立即發(fā)出第一個值。默認(rèn)為true。 trailing:布爾值,表示是否在節(jié)流周期結(jié)束時發(fā)出最后一個值。默認(rèn)為false。

=> throttleTime操作符 <=

說明:?在一個時間范圍內(nèi),上游傳遞給下游的數(shù)據(jù)只能傳遞一個;這里參數(shù)如果只傳一個,其它值都會使用默認(rèn)值;

參數(shù):?throttleTime(時間范圍, 調(diào)度器, 可選參數(shù)對象)

import { 
 ? ? ? ?interval,
 ? ? ? ?throttleTime,
 ? ? ? ?asyncScheduler
} from "rxjs";
// 這里每隔1s產(chǎn)生一個數(shù)字
interval(1000)
 ?.pipe(
 ? ? ?throttleTime(
 ? ? ? ? ? ? ? ? ? ? ?2000,
 ? ? ? ? ? ? ? ? ? ? ?asyncScheduler,
 ? ? ? ? ? ? ? ? ? ? ?// trailing為true時產(chǎn)生的結(jié)果是:2、4、6...
 ? ? ? ? ? ? ? ? ? ? ?// leading為true時產(chǎn)生的結(jié)果是:3、6、9...
 ? ? ? ? ? ? ? ? ? ? ?{ leading: false, trailing: true }
 ? ? ? ? ? ? ? ? ?)
 ? ? ?)
 ?.subscribe((res) => console.log(res));

=> debounceTime操作符 <=

說明:?在一個時間范圍內(nèi),一直有數(shù)據(jù)產(chǎn)生一直不會將數(shù)據(jù)傳遞給下游,只有在這個時間外產(chǎn)生的第一個數(shù)據(jù)才會傳遞給下游;所以產(chǎn)生數(shù)據(jù)的間隔需要大于這個時間范圍才可以

參數(shù):?throttleTime(時間范圍, 調(diào)度器)

import { interval, debounceTime, asyncScheduler } from "rxjs";
// 這里的值如果比2000還小那么就不會有數(shù)據(jù)打印出來
interval(4000)
 ?.pipe(debounceTime(2000, asyncScheduler))
 ?.subscribe((res) => console.log(res));

=> throttle和debounce操作符 <=

作用:?這兩個都是使用Observable中的數(shù)據(jù)來控制流量,區(qū)別在于時機(jī)不同而已

參數(shù):?throttle(處理函數(shù), 可選參數(shù)對象)

參數(shù):?debounce(處理函數(shù))

// 這里以throttle為例
import { interval, timer, throttle } from "rxjs";
const source$ = interval(1000);
// 處理函數(shù)的參數(shù)只能拿到上游的數(shù)據(jù)
const durationSelector = (value) => {
 ?console.log(`# call durationSelector with ${value}`);
 ?return timer(2000);
};
const result$ = source$.pipe(throttle(durationSelector));
result$.subscribe(console.log);

理解:?當(dāng)source$產(chǎn)生第一個數(shù)據(jù)0的時候,throttle就和throttleTime一樣,毫不 猶豫地把這個數(shù)據(jù)0傳給了下游,在此之前會將這個數(shù)據(jù)0作為參數(shù)調(diào)用 durationSelector,然后訂閱durationSelector返回的Observable對象,在這個 Observable對象產(chǎn)生第一個對象之前,所有上游傳過來的數(shù)據(jù)都會被丟棄,于是,source$產(chǎn)生的數(shù)據(jù)1就被丟棄了,因為durationSelector返回的 Observable對象被訂閱之后2000毫秒才會產(chǎn)生數(shù)據(jù)。 這個過程,相當(dāng)于throttle每往下游傳遞一個數(shù)據(jù),都關(guān)上了上下游之間閘門,只有當(dāng)durationSelector產(chǎn)生數(shù)據(jù)的時候才打開這個閘門。到了2000毫秒的時刻,durationSelector第二次被調(diào)用產(chǎn)生的Observable對象終于產(chǎn)生了多個數(shù)據(jù),閘門被打開,source$產(chǎn)生的第三個數(shù)據(jù)2正好趕上,被 傳遞給了下游,同時關(guān)上閘門,這時候throttle會立刻退訂上一次 durationSelector返回的Observable對象,重新將數(shù)據(jù)2作為參數(shù)調(diào)用 durationSelector來獲得一個新的Observable對象,這個新的Observable對象產(chǎn)生數(shù)據(jù)的時候,閘門才會再次打開??梢姡琩urationSelector產(chǎn)生Observable對象只有第一個產(chǎn)生的數(shù)據(jù)會有作用,而且這個數(shù)據(jù)的產(chǎn)生時機(jī)是關(guān)鍵,至于這個數(shù)據(jù)是個什么值不重要。

=> auditTime和audit操作符 <=

說明:?這兩個都是在一個時間內(nèi),將最后一個產(chǎn)生的值發(fā)射出去,其余的值會被忽略掉。它們之間的區(qū)別是一個使用時間范圍管理,一個使用函數(shù)管理

參數(shù):?auditTime(時間范圍, 可選參數(shù)對象)

參數(shù):?audit(處理函數(shù))

import { interval, auditTime } from "rxjs";
interval(1000)
 ? ?.pipe(auditTime(3000))
 ? ?.subscribe(
 ? ? ? ?(val) => console.log("auditTime:", val)
 ? ?); ? ? ? ? ?
// 第一個3s:0、1、2、3 --> 三秒末也是四秒初發(fā)出值3
// 第二個3s:4、5、6、7 --> 六秒末也是七秒初發(fā)出值7
// ...

理解:?上面的時間寫3s,所以在第一個3s內(nèi)產(chǎn)生了值0、1、2,在第3s結(jié)束的時候,產(chǎn)生了值3,根據(jù)定義,所以第一個3s發(fā)出的值是3,在物理上,第n秒結(jié)束的時候,也就是第n+1秒開始的時候,所以下一個3s是從第四秒開始,然后這個時間內(nèi)產(chǎn)生4、5、6,第7s結(jié)束的時候,產(chǎn)生值7,將其傳遞給下游...后面的值都是這樣產(chǎn)生的,也就是它發(fā)出一個值傳遞到下游之后,它會等待下一個值到達(dá),才會開始其計時

=> sampleTime和sample操作符 <=

說明:?sampleTime的作用是搜尋一個時間范圍內(nèi)的最后一個數(shù)據(jù),將其傳遞給下游,如果這個時間范圍里面沒有值則不會傳值到下游,然后繼續(xù)下一個時間范圍的搜尋; 而sample有點不同,它的參數(shù)接收一個Observable對象來控制Observable,這個參數(shù)被稱為notifier,當(dāng)notifier產(chǎn)生一個數(shù)據(jù)的時候, sample就從上游拿最后一個產(chǎn)生的數(shù)據(jù)傳給下游。

參數(shù):?sampleTime(時間范圍, 調(diào)度器)

參數(shù):?sample(observable對象)

interval(1000)
 ?.pipe(sampleTime(2000))
 ?.subscribe((res) => console.log("sampleTime:", res));

理解:?上面數(shù)據(jù)是每隔1s產(chǎn)生一個,然后我搜尋時間范圍是2s,第一個2s,產(chǎn)生值0、1,將1傳遞出去,繼續(xù)第二個2s的搜尋,產(chǎn)生值2、3,將3傳遞出去...以此類推

(3)去重

=> distinct操作符 <=

作用:?上游同樣的數(shù)據(jù)只有第一次產(chǎn)生時會傳給下游,其余的都被舍棄掉了,判斷是否相等使用的是===

參數(shù):?distinct(一個函數(shù)來定制需要對比什么屬性, 一個Observable對象用于清空數(shù)據(jù))

場景一: 基本使用
import { distinct, of } from "rxjs";
of(1, 3, 2, 5, 7, 1, 2)
 ?.pipe(distinct())
 ?.subscribe((res) => console.log(res));


場景二:?對對象使用

import { distinct, of } from "rxjs";
of(
 ?{ name: "RxJS", version: "v4" },
 ?{ name: "React", version: "v15" },
 ?{ name: "React", version: "v16" },
 ?{ name: "RxJS", version: "v5" }
)
 ?// 這里規(guī)定數(shù)據(jù)中的name字段相同就算相同數(shù)據(jù)
 ?.pipe(distinct((x) => x.name))
 ?.subscribe((res) => console.log(res));

第二個參數(shù):?distinct在運作的時候自己會先創(chuàng)建一個集合,里面存放上游的不同數(shù)據(jù),每次上游傳遞一個數(shù)據(jù)出來就對比集合中是否有元素跟它相等,相等就舍棄,如果上游數(shù)據(jù)無限多切都是不同的,那么這個集合就會有無限的數(shù)據(jù)在里面,這就存在數(shù)據(jù)壓力,為了解決這個問題,可以使用第二個可選參數(shù),當(dāng)這個Observable對象產(chǎn)生數(shù)據(jù)的時候,這個集合中的數(shù)據(jù)就會被清空。

=> distinctUntilChanged操作符 <=

作用:?將上游中的連續(xù)數(shù)據(jù)過濾掉

參數(shù):?distinctUntilChanged(比較函數(shù))

import { distinctUntilChanged, of } from "rxjs";
of(
 ?{ name: "RxJS", version: "v4" },
 ?{ name: "React", version: "v15" },
 ?{ name: "React", version: "v16" },
 ?{ name: "RxJS", version: "v5" }
)
 ?// a表示上一個值,b表示當(dāng)前值
 ?.pipe(distinctUntilChanged((a, b) => a.name === b.name))
 ?.subscribe((res) => console.log(res));

注意:?比較函數(shù)需要返回布爾值來確定由哪些屬性決定數(shù)據(jù)相等

(4)其它

=> ignoreElements操作符 <=

作用:?忽略上游所有元素,只關(guān)心complete和error事件

參數(shù):?沒有參數(shù)

import { ignoreElements, of } from "rxjs";
of(1, 2, 3)
 ?.pipe(ignoreElements())
 ?.subscribe((res) => console.log(res));

=> elementAt操作符 <=

說明:?把上游數(shù)據(jù)當(dāng)數(shù)組,只獲取指定下標(biāo)的那?個數(shù)據(jù),如果找不到,則拋出一個錯誤事件,如果不想出現(xiàn)錯誤,可以使用第二個參數(shù),在找不到的時候,會將第二個參數(shù)做為默認(rèn)值傳遞給下游

參數(shù):?elementAt(下標(biāo), 默認(rèn)值)

import { elementAt, of } from "rxjs";
of(1, 2, 3)
 ?.pipe(elementAt(3, "使用默認(rèn)值作為數(shù)據(jù)傳遞給下游"))
 ?.subscribe((res) => console.log(res));

=> single操作符 <=

作用:?檢查上游是否只有一個滿足對應(yīng)條件的數(shù)據(jù),如果答案為是,就向下游傳遞這個數(shù)據(jù);如果答案為否,就向下游傳遞一個異常

參數(shù):?single(過濾函數(shù))

import { of, single } from "rxjs";
of(1, 2, 3)
 ?.pipe(single((x) => x % 2 === 0))
 ?.subscribe((res) => console.log(res));

七、轉(zhuǎn)化數(shù)據(jù)流

(1)映射數(shù)據(jù)

理解:?映射數(shù)據(jù)是最簡單的轉(zhuǎn)化形式。假如上游的數(shù)據(jù)是A、B、C、D的序列,那么可以認(rèn)為經(jīng)過轉(zhuǎn)化類操作符之后,就會變成f(A)、f(B)、f(C)、f(D)的序列,其中f是一個函數(shù),作用于上游數(shù)據(jù)之后,產(chǎn)生的就是傳給下游新的數(shù)據(jù)

=> map操作符 <=

說明:?它接受一個函數(shù)作為參數(shù),這個函數(shù)通常稱為project,指定了數(shù)據(jù)映射的邏輯,每當(dāng)上游推下來一個數(shù)據(jù),map就把這個數(shù)據(jù)作為參數(shù)傳給map的參數(shù)函數(shù),然后再把函數(shù)執(zhí)行的返回值推給下游

參數(shù):?map(處理函數(shù))

import { of, map } from "rxjs";
of(1, 2, 3)
 ?.pipe(
 ? ?map((item, index) => {
 ? ? ?// 處理函數(shù)的item表示當(dāng)前值,index表示當(dāng)前值得索引
 ? ? ?console.log(item, index);
 ? ?})
 ?)
 ?.subscribe();

2)無損回壓控制

說明:?把上游在一段時間內(nèi)產(chǎn)生的數(shù)據(jù)放到一個數(shù)據(jù)集合中,當(dāng)時機(jī)合適時,把緩存的數(shù)據(jù)匯聚到一個數(shù)組或者Observable對象傳給下游,這就是無損回壓控制

=> windowTime和bufferTime操作符 <=

作用:?用一個參數(shù)來指定產(chǎn)生緩沖窗口的時間間隔,以此緩存上游的數(shù)據(jù)

參數(shù):?windowTime(劃分區(qū)塊間隔, 內(nèi)部區(qū)塊開始間隔, 最多緩存數(shù)據(jù)個數(shù))

參數(shù):?bufferTime(劃分區(qū)塊間隔, 內(nèi)部區(qū)塊開始間隔, 最多緩存數(shù)據(jù)個數(shù))

場景一: 基本使用
import { timer, windowTime } from "rxjs";
const source$ = timer(0, 1000);
const result$ = source$.pipe(windowTime(4000));

理解:?windowTime的參數(shù)是4000,也就會把時間劃分為連續(xù)的4000毫秒長度區(qū)塊,在每個時間區(qū)塊中,上游傳下來的數(shù)據(jù)不會直接送給下游,而是在該時間區(qū)塊的開始就新創(chuàng)建一個Observable對象推送給下游,然后在這個時間區(qū)塊內(nèi)上游產(chǎn)生的數(shù)據(jù)放到這個新創(chuàng)建的Observable對象中。在每個4000毫秒的時間區(qū)間內(nèi),上游的每個數(shù)據(jù)都被傳送給對應(yīng)時間區(qū)間的內(nèi)部Observable對象中,當(dāng)4000毫秒時間一到,這個區(qū)間的內(nèi)部Observable對象就會完結(jié),將結(jié)果打印出來會發(fā)現(xiàn)控制臺每隔1000毫秒打印一個數(shù)字出來,因此windowTime把上游數(shù)據(jù)傳遞出去是不需要延遲的

import { bufferTime, timer } from "rxjs";
const source$ = timer(0, 1000);
const result$ = source$
 ?.pipe(bufferTime(4000))
 ?.subscribe((res) => console.log(res));

理解:?bufferTime產(chǎn)?的是普通的Observable對象,其中的數(shù)據(jù)是數(shù)組形式, bufferTime會把時間區(qū)塊內(nèi)的數(shù)據(jù)緩存,在時間區(qū)塊結(jié)束的時候把所有緩存的數(shù)據(jù)放在一個數(shù)組再傳給下游,在控制臺你會看見每隔4秒打印一個數(shù)組,因此bufferTime把上游數(shù)據(jù)傳遞出去是需要延遲的

場景二: 第二個參數(shù)

作用:?指定每個時間區(qū)塊開始的時間間隔。

import { timer, windowTime } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowTime(4000, 2000)).subscribe();

理解:?windowTime使用第二個參數(shù)200之后,產(chǎn)生內(nèi)部Observable的頻率更高了,每200毫秒就會產(chǎn)生一個內(nèi)部Observable對象, 而且各內(nèi)部Observable對象中的數(shù)據(jù)會重復(fù),例如數(shù)據(jù)2和3就同時出現(xiàn)在第一個和第二個內(nèi)部Observable對象中

import { bufferTime, timer } from "rxjs";
const source$ = timer(0, 1000);
source$
 ? ?.pipe(bufferTime(4000, 2000, 2))
 ? ?.subscribe(console.log);

理解:?對于bufferTime,因為需要緩存上游數(shù)據(jù),不管參數(shù)設(shè)定的數(shù)據(jù)區(qū)間有多短,都無法預(yù)期在這段時間內(nèi)上游會產(chǎn)生多少數(shù)據(jù),如果上游在短時間內(nèi)爆發(fā)出很多數(shù)據(jù),那就會給bufferTime很大的內(nèi)存壓力,為了防止出現(xiàn)這種情況可以使用第三個可選參數(shù)來指定每個時間區(qū)間內(nèi)緩存的最多數(shù)據(jù)個數(shù)。

注意:?如果第一個參數(shù)比第二個參數(shù)大,那么就有可能出現(xiàn)數(shù)據(jù)重復(fù),如果第二個參數(shù)比第一個參數(shù)大,那么就有可能出現(xiàn)上游數(shù)據(jù)的丟失。之所以說“有可能”,是因為丟失或者重疊的時間區(qū)塊中可能上游沒有產(chǎn)生數(shù)據(jù),所以也就不會引起上游數(shù)據(jù)的丟失和重復(fù)。從這個意義上說來,windowTime和bufferTime如果用上了第二個參數(shù),也未必是“止損”的回壓控制。

=> windowCount和bufferCount操作符 <=

作用:?根據(jù)數(shù)據(jù)個數(shù)來決定內(nèi)部的一個Observabe需要保存多少數(shù)據(jù)。

參數(shù):?windowCount(時間區(qū)間長度, 隔幾個數(shù)據(jù)重新開一個區(qū)間)

import { timer, windowCount } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowCount(4)).subscribe(console.log);
import { timer, windowCount } from "rxjs";
const source$ = timer(0, 1000);
source$.pipe(windowCount(4, 5)).subscribe(console.log);

理解:?windowCount還支持可選的第二個參數(shù),如果不使用第二個參數(shù),那么所有的時間區(qū)間沒有重疊部分;如果使用了第二個參數(shù),那么第二個參數(shù)依然是時間區(qū)間的長度,但是每間隔第二個參數(shù)毫秒數(shù),就會新開一個時間區(qū)間

說明:?對于bufferCount,和windowCount一樣,區(qū)別只是傳給下游的是緩存數(shù)據(jù)組成的數(shù)組

=> windowWhen和bufferWhen操作符 <=

說明:?它們接受一個函數(shù)作為參數(shù),這個函數(shù)返回一個Observable對象,用于控制上游的數(shù)據(jù)分割,每當(dāng)返回的Observable對象產(chǎn)生數(shù)據(jù)或者完結(jié)時,windowWhen就認(rèn)為是一個緩沖區(qū)塊的結(jié)束,重新開啟一個緩沖窗口。bufferWhen跟這個是類似的

參數(shù):?windowWhen(處理函數(shù))

import { timer, windowWhen } from "rxjs";
const source$ = timer(0, 100);
const closingSelector = () => {
 ?return timer(400);
};
// 被訂閱的時候windowWhen就開始?作,?先開啟?個緩沖
// 窗口,然后?刻調(diào)?closingSelector獲得?個Observable對象,
// 在這個Observable對象輸出數(shù)據(jù)的時候,當(dāng)前的緩沖窗?就關(guān)閉,
// 同時開啟?個新的緩沖窗口,然后再次調(diào)?closingSelector
// 獲得?個Observable對象
source$.pipe(windowWhen(closingSelector));

=> windowToggle和bufferToggle操作符 <=

說明:?利?Observable來控制緩沖窗口的開和關(guān)。它需要兩個參數(shù),第一個參數(shù)是一個Observable對象,當(dāng)產(chǎn)生一個數(shù)據(jù),代表一個緩沖窗口的開始;同時,第二個參數(shù)是一個函數(shù),它也會被調(diào)用,用來獲得緩沖窗口結(jié)束的通知;其次函數(shù)的參數(shù)是第一個參數(shù)產(chǎn)生的數(shù)據(jù),這樣就可以由前一個參數(shù)控制緩沖窗口的開始時機(jī),函數(shù)控制其關(guān)閉時機(jī),從而控制產(chǎn)生高階Observable的節(jié)奏;同理bufferToggle也是類似的

import { timer, windowToggle } from "rxjs";
const source$ = timer(0, 100);
const openings$ = timer(0, 400);
const closingSelector = (value) => {
 ?return value % 2 === 0 ? timer(200) : timer(100);
};
// opening$每400毫秒產(chǎn)??個數(shù)據(jù),所以每400毫秒就會有?個
// 緩沖區(qū)間開始。每當(dāng)opening$產(chǎn)??個數(shù)據(jù)時,closingSelector
// 就會被調(diào)?返回控制對應(yīng)緩沖區(qū)間結(jié)束的Observable對象,
// 如果參數(shù)為偶數(shù),就會延時200毫秒產(chǎn)??個數(shù)據(jù),否則就延時100
// 毫秒產(chǎn)??個數(shù)據(jù)
source$.pipe(windowToggle(openings$, closingSelector));

=> window和buffer操作符 <=

說明:?保持一個Observable類型的參數(shù),稱為notifier$,每當(dāng)notifer$產(chǎn)生一個數(shù)據(jù),既是前一個緩存窗口的結(jié)束,也是后一個緩存窗口的開始;如果這個Observable完結(jié)了,那么window產(chǎn)生的一階Observable對象也會完結(jié),buffer也是類似的

參數(shù):?window(一個Observable對象)

import { timer, window } from "rxjs";
const source$ = timer(0, 100);
// 一個不會完結(jié)的Observable
const notifer$ = timer(400, 400);
source$.pipe(window(notifer$));
import { timer, window } from "rxjs";
const source$ = timer(0, 100);
// 一個會完結(jié)的Observable
const notifer$ = timer(400);
source$.pipe(window(notifer$));

(3)高階map

說明:?傳統(tǒng)map與高階map的區(qū)別在于其函數(shù)參數(shù)的返回值,前者是將一個數(shù)據(jù)映射成另一個數(shù)據(jù),而后者是將一個數(shù)據(jù)轉(zhuǎn)變成一個Observable

import { interval, map } from "rxjs";
const source$ = interval(200);
// 這里每個數(shù)據(jù)都會轉(zhuǎn)換成一個包含數(shù)字0、1、2、3、4的
// Observable對象
source$.pipe(
 ? ? ? ? ? ?map(
 ? ? ? ? ? ? ? ?() => interval(100).take(5)
 ? ? ? ? ? ?)
 ? ? ? ?);

=> concatMap操作符 <=

說明:?可以理解成concatMap = map + concatAll

import { interval, concatMap } from "rxjs";
const source$ = interval(200);
source$.pipe(
 ? ? ? ? ? ?concatMap(
 ? ? ? ? ? ? ? ?() => interval(100).take(5)
 ? ? ? ? ? ?)
 ? ? ? ?);

理解:?第一個內(nèi)部Observable對象中的數(shù)據(jù)被完整傳遞給了 concatMap的下游,但是,第一個產(chǎn)生的內(nèi)部Observable對象沒有那么快處理,只有到第一個內(nèi)部Observable對象完結(jié)之后,concatMap才會去訂閱第二個內(nèi)部Observable,這樣就導(dǎo)致第二個內(nèi)部Observable對象中的數(shù)據(jù)排在了后面,絕不會和第一個內(nèi)部Observable對象中的數(shù)據(jù)交叉。

=> mergeMap操作符 <=

說明:?可以理解成mergeMap = map + mergeAll

注意:?一旦內(nèi)部Observable發(fā)出一個值,它就會立即將該值傳遞給下游觀察者,而不管其他內(nèi)部Observable是否已經(jīng)發(fā)出或者完成了

import { interval, mergeMap, take } from "rxjs";
const source$ = interval(200).take(2);
source$.pipe(
 ? ? ? ? ? ?mergeMap(
 ? ? ? ? ? ? ? ?() => interval(100).take(5)
 ? ? ? ? ? ?)
 ? ? ? ?);

=> switchMap操作符 <=

說明:?可以理解成switchMap = map + switchAll

注意:?后產(chǎn)生的內(nèi)部Observable對象優(yōu)先級總是更高,只要有新的內(nèi)部Observable對象產(chǎn)生,就立刻退訂之前的內(nèi)部 Observable對象,改為從最新的內(nèi)部Observable對象拿數(shù)據(jù)

import { interval, switchMap, take } from "rxjs";
const source$ = interval(200).take(2);
source$.pipe(
 ? ? ? ? ? ?switchMap(
 ? ? ? ? ? ? ? ?() => interval(100).take(5)
 ? ? ? ? ? ?)
 ? ? ? ?);

4)分組

=> groupBy操作符 <=

參數(shù):?groupBy(一個處理函數(shù),用于得到數(shù)據(jù)的key值)

機(jī)制:?對于上游推送下來的任何數(shù)據(jù),檢查這個數(shù)據(jù)的key值,如果這個key值是第一次出現(xiàn),就產(chǎn)生一個新的內(nèi)部Observable對象,同時這個數(shù)據(jù)就是內(nèi)部Observable對象的第一個數(shù)據(jù);如果key值已經(jīng)出現(xiàn)過,就直接把這個數(shù)據(jù)塞給對應(yīng)的內(nèi)部Observable對象

import { groupBy, interval } from "rxjs";
const source$ = interval(200);
source$.pipe(groupBy((val) => val % 2));

理解:?groupBy的函數(shù)參數(shù)取的是參數(shù)除以2的余數(shù),所以會產(chǎn)生兩個key值:0和1。從彈珠圖中可以看到,0和2屬于第一個內(nèi)部 Observable對象,第一個內(nèi)部Observable對象收納所有key值為0的數(shù)據(jù),1 和3屬于第二個內(nèi)部Observable對象,因為它們對應(yīng)的key值為1

=> partition操作符 <=

說明:?partition接受一個判定函數(shù)作為參數(shù),對上游的每個數(shù)據(jù)進(jìn)行判定,滿足條件的放一個Observable對象,不滿足條件的放到另一個Observable對象,就這樣來分組,它返回的是一個數(shù)組,包含兩個元素,第一個元素是容納滿組判定條件的Observable對象,第二個元素當(dāng)然是不滿足判定條件的Observable對象。

參數(shù):?partition(數(shù)據(jù)源, 判定函數(shù))

import { partition, timer } from "rxjs";
const source$ = timer(0, 100);
// 解構(gòu)賦值
const [even$, odd$] = partition(source$, (x) => x % 2 === 0);
even$.subscribe((value) => console.log("even:", value));
odd$.subscribe((value) => console.log("odd:", value));

注意:?使用 partition一般也不會在后面直接使用鏈?zhǔn)秸{(diào)用,需要把結(jié)果以變量存儲,然后分別處理結(jié)果中的兩個Observable對象

(5)累計數(shù)據(jù)

=> scan操作符 <=

說明:?與reduce操作符類似,它也有一個求和函數(shù)參數(shù)和一個可選的seed種子參數(shù)作為求和初始值。scan和reduce的區(qū)別在于scan對上游每一個數(shù)據(jù)都會產(chǎn)生一個求和結(jié)果,reduce是對上游所有數(shù)據(jù)進(jìn)行求和,reduce最多只給下游傳遞一個數(shù)據(jù),如果上游數(shù)據(jù)永不完結(jié),那reduce也永遠(yuǎn)不會產(chǎn)生數(shù)據(jù),scan完全可以處理一個永不完結(jié)的上游Observable對象

參數(shù):?scan(求和函數(shù), 初始值)

import { interval, scan } from "rxjs";
const source$ = interval(1000);
source$
 ?.pipe(
 ? ?// sum:上一次求和后的值
 ? ?// current:當(dāng)前需要進(jìn)行求和的值
 ? ?scan((sum, current) => {
 ? ? ?console.log(sum, current);
 ? ? ?return sum + current;
 ? ?})
 ?)
 ?.subscribe();

理解:?scan的規(guī)約函數(shù)參數(shù)把之前求和的值加上當(dāng)前數(shù)據(jù)作為求和結(jié)果,每一次上游產(chǎn)生數(shù)據(jù)的時候,這個求和函數(shù)都會被調(diào)用,結(jié)果會傳給下游,同時結(jié)果也會由scan保存,作為下一次調(diào)用規(guī)約函數(shù)時的sum參數(shù)

=> mergeScan操作符 <=

說明:?它在使用的時候跟scan是類似的,不過它的返回值是一個Observable對象

機(jī)制:?每當(dāng)上游推送一個數(shù)據(jù)下來,mergeScan就調(diào)用一次求和函數(shù),并且訂閱返回的Observable對象,之后,這個Observable對象會使用類似merge的方式與下游合并,此時mergeScan會記住傳給下游的最后一個數(shù)據(jù),當(dāng)上游再次推送數(shù)據(jù)下來的時候,就把最后一次傳遞給下游的數(shù)據(jù)作為求和函數(shù)的sum參數(shù)

注意:?如果mergeScan返回一個復(fù)雜或者不會完結(jié)的Observable對象,可能會導(dǎo)致上游數(shù)據(jù)和返回的Observable對象會交叉?zhèn)鬟f數(shù)據(jù)給下游,這樣那個值是最后一次傳遞給下游的會很難確定,因此在使用的時候返回的Observable里面包含的值盡量簡單

八、錯誤處理

說明:?錯誤異常和數(shù)據(jù)一樣,會沿著數(shù)據(jù)流管道從上游向下游流動,流過所有的過濾類或者轉(zhuǎn)化類操作符,最后會觸發(fā)Observer的error方法,不過也不是所有錯誤都交給Observer處理,不然它需要處理的東西就太多了,此時就需要在數(shù)據(jù)管道中處理掉,這里處理異常有兩類方法:恢復(fù)和重試。在實際應(yīng)用中,重試和恢復(fù)往往配合使用,因為重試往往是有次數(shù)限制的,不能無限重試,如果嘗試了次數(shù)上限之后得到的依然是錯誤異常, 還是要用“恢復(fù)”的方法獲得默認(rèn)值繼續(xù)運算。

恢復(fù):就是本來雖然產(chǎn)生了錯誤異常,但是依然讓運算繼續(xù)下去。最常見的場景就是在獲取某個數(shù)據(jù)的過程中發(fā)生了錯誤,這時候雖然沒有獲得正確數(shù)據(jù),但是用一個默認(rèn)值當(dāng)做返回的結(jié)果,讓運算繼續(xù)。 重試:就是當(dāng)發(fā)現(xiàn)錯誤異常的時候,認(rèn)為這個錯誤只是臨時的,重新嘗試之前發(fā)生錯誤的操作,寄希望于重試之后能夠獲得正常的結(jié)果,其本質(zhì)是在訂閱上游的同時,退訂上一次訂閱的內(nèi)容

=> catchError操作符 <=

作用:?會在管道中捕獲上游傳遞過來的錯誤

參數(shù):?catchError(異常函數(shù))

import { range, map, catchError, of } from "rxjs";
// 產(chǎn)生數(shù)據(jù)1、2、3、4、5
const source$ = range(1, 5);
// 遍歷數(shù)據(jù)發(fā)現(xiàn)在4這個位置會拋出一個錯誤
const error$ = source$.pipe(
 ?map((value) => {
 ? ?if (value === 4) {
 ? ? ?throw new Error("unlucky number 4");
 ? ?}
 ? ?return value;
 ?})
);
// 此時錯誤會被catchError的處理函數(shù)所接收
const catch$ = error$
 ?.pipe(
 ? ?// err:被捕獲的錯誤
 ? ?// caught$:上游緊鄰的那個Observable對象,此處就是指error$了
 ? ?catchError((err, caught$) => {
 ? ?
 ? ? ?// 函數(shù)的返回值是一個Observable對象,用來替代發(fā)生錯誤的那個數(shù)據(jù),然后傳遞給下游
 ? ? ?return of(8);
 ? ?})
 ?)
 ?// 錯誤被catchError捕獲處理,所以此處不存在錯誤
 ?.subscribe(console.log);

注意:?異常函數(shù)的第一個參數(shù)caught$比較有意思,因為它代表的是上游的 Observable對象,如果異常函數(shù)就返回caught$的話,相當(dāng)于讓上游Observable 重新試一遍,所以,catch這個操作符其實不光有恢復(fù)的功能,也有重試的功能

=> retry操作符 <=

第一種參數(shù): 直接傳一個數(shù)字

說明:?它可以讓上游的Observable重新試一遍,以達(dá)到重試的目的,它接受一個數(shù)值參數(shù)number,number等于指定重試的次數(shù), 如果number為負(fù)數(shù)或者沒有number參數(shù),那么就是無限次retry,直到上游不再拋出錯誤異常為止

參數(shù):?retry(重試的次數(shù))

注意:?retry調(diào)用應(yīng)該有一個正整數(shù)的參數(shù),也就是要指定有限次數(shù)的重試,否則,很可能陷入無限循環(huán),畢竟被重試的上游Observable只是有可能重試成功,意思就是也有可能重試不成功,如果真的運氣不好就是重試不成功,也真沒有必要一直重試下去,因為retry通常要限定重試次數(shù),所以retry通常也要和catch配合使用,重試只是增加獲得成功結(jié)果的概率,當(dāng)重試依然沒有結(jié)果的時候,還是要catch上場做恢復(fù)的操作

import { range, map, catchError, of, retry } from "rxjs";
const source$ = range(1, 5);
const error$ = source$.pipe(
 ?map((value) => {
 ? ?if (value === 4) {
 ? ? ?throw new Error("unlucky number 4");
 ? ?}
 ? ?return value;
 ?})
);
const catch$ = error$
 ?.pipe(
 ? ?// 重復(fù)兩次
 ? ?retry(2),
 ? ?catchError((err, caught$) => {
 ? ? ?return of(8);
 ? ?})
 ?)
 ?.subscribe(console.log);
第二種參數(shù): 傳一個配置對象
配置對象的取值:
count: 表示重試的次數(shù)限制。如果未指定,將會無限次重試,直到成功或者遇到無法處理的錯誤 delay: 表示每次重試之間的延遲時間??梢允且粋€數(shù)字,表示固定的延遲時間,也可以是一個函數(shù),接受錯誤對象和重試次數(shù)作為參數(shù),返回一個 Observable 或 Promise,用于動態(tài)計算延遲時間 resetOnSuccess: 表示是否在成功后重置重試計數(shù)。如果設(shè)置為true,則在每次成功后重置重試計數(shù),否則會保持重試計數(shù)直到達(dá)到設(shè)定的重試次數(shù)或者遇到無法處理的錯誤
注意:
delay地方如果寫一個函數(shù)在這里,這個函數(shù)會在發(fā)生錯誤時被調(diào)用,它有兩個參數(shù),一個是err$表示發(fā)生錯誤的對象, 一個是retryCount表示當(dāng)前重試的次數(shù),它需要一個返回值,不然函數(shù)無法正確的獲取錯誤對象,導(dǎo)致重試不會繼續(xù)下去。
如果delay函數(shù)的返回值是一個Observable對象,那么每次這個對象吐出一個數(shù)據(jù),就會重復(fù)一次,由此可以結(jié)合timer類似的操作符來達(dá)到延遲重復(fù)的目的
import { range, map, catchError, of, retry } from "rxjs";
const source$ = range(1, 5);
const error$ = source$.pipe(
 ?map((value) => {
 ? ?if (value === 4) {
 ? ? ?throw new Error("unlucky number 4");
 ? ?}
 ? ?return value;
 ?})
);
const catch$ = error$
 ?.pipe(
 ? ?// 重復(fù)兩次
 ? ?retry({
 ? ? ?count: 2,
 ? ? ?delay: (err$, retryCount) => {
 ? ? ? ?console.log(err$, retryCount);
 ? ? ? ?// 如果這里沒有返回值,下面只會出現(xiàn)一次重復(fù)
 ? ? ? ?return of(1000);
 ? ? ?},
 ? ?}),
 ? ?catchError((err, caught$) => {
 ? ? ?return of(8);
 ? ?})
 ?)
 ?.subscribe();

=> finalize操作符 <=

說明:?它接受一個回調(diào)函數(shù)作為參數(shù),上游無論是完結(jié)還是出現(xiàn)錯誤這個函數(shù)都會執(zhí)行,只不過在一個數(shù)據(jù)流中只會作用一次,同時這個函數(shù)也無法影響數(shù)據(jù)流。

九、多播

說明:?多播就是讓一個數(shù)據(jù)流的內(nèi)容被多個Observer訂閱

(1)數(shù)據(jù)流的關(guān)系

說明:?這里指的是Observable和Observer的關(guān)系,可以理解成前者播放內(nèi)容,后者接受內(nèi)容,播放的形式有單播、廣播和多播

理解概念:
單播: 就是一個播放者對應(yīng)朵個收聽者,一對朵的關(guān)系,例如,你使用微信給你的朋友發(fā)送信息,這就是單播,你發(fā)送的信息只有你的朋友才能收到
廣播: 例如,有一個好消息你不想只分享給一個人,而是想告訴所有的同事或者同學(xué),你就在辦公室或者教室里大聲吼出這個好消息,所有人都聽見了,這就是“廣播”,不過發(fā)布消息的根本不知道聽眾是什么樣的人,于是篩選消息的責(zé)任就完全落在了接收方的人上,以至于難以控制。
多播: 假如有一些八卦消息,你想要分享給一群朋友,但并不想分享給所有人,或者不想在公共場合大聲嚷嚷,于是你在微信上把相關(guān)朋友拉進(jìn)一個群,在群里說出這個消息,只有被選中的朋友才能收到這條消息,這就叫做“多播”

(2)Subject

承上啟下:?根據(jù)第一部分對兩種Observable的理解不難得到Cold Observable實現(xiàn)的是單播,Hot Observable實現(xiàn)的多播

問題:?如何把Cold Observable變成Hot Observable呢

解決:?在函數(shù)式編程的世界里,有一個要求是保持不可變性,所以,要把一個Cold Observable對象轉(zhuǎn)換成一個Hot Observable對象,并不是去改變這個Cold Observable對象本身,而是產(chǎn)生一個新的Observable對象,包裝之前Cold Observable對象,這樣在數(shù)據(jù)流管道中,新的Observable對象就成為了下游,想要Hot數(shù)據(jù)源的Observer要訂閱的是這個作為下游的Observable對象,所以此時需要一個中間人來完成轉(zhuǎn)化,這個中間人就是Subject

中間人的職責(zé):
要提供subscribe方法,讓其他?能夠訂閱一個的數(shù)據(jù)源,相當(dāng)于一個Observable
要能夠有辦法接受推送的數(shù)據(jù),包括Cold Observable推送的數(shù)據(jù),相當(dāng)于一個Observer

=> 雙重身份 <=

說明:?這里說的是它具有具Observable和Observer的性質(zhì),雖然?個Subject對象是一個Observable,但是這兩個之間存在區(qū)別,區(qū)別在于Subject是存在記憶的,也就是它能夠記住有哪些Observer訂閱了自己,Subject有狀態(tài),這個狀態(tài)就是所有Observer的列表,所以,當(dāng)調(diào)用Subject的next函數(shù)時,才可以把消息通知給所有的Observer

import { Subject } from "rxjs";
const subject = new Subject();
// 1號Observer訂閱了subject
const subscription1 = subject.subscribe(
 ?(value) => console.log("on observer 1 data: " + value),
 ?(err) => console.log("on observer 1 error: " + err.message),
 ?() => console.log("on observer 1 complete")
);
// 調(diào)?subject的next推送了數(shù)據(jù)1,這個消息只有1號Observer響應(yīng),
// 因為當(dāng)前只有?個Observer。同時因為next(1)在2號Observer
// 加?之前執(zhí)?,所以2號Observer沒有接收到1
subject.next(1);
// 2號Observer也訂閱了subject
subject.subscribe(
 ?(value) => console.log("on observer 2 data: " + value),
 ?(err) => console.log("on observer 2 error: " + err.message),
 ?() => console.log("on observer 2 complete")
);
// 這時候調(diào)?subject的next?法推送數(shù)據(jù)2,subject現(xiàn)在知道??
// 有兩個Observer,所以會分別推送消息給1號和2號Observer
subject.next(2);
// subject的1號Observer通過unsubscribe?法退訂
subscription1.unsubscribe();
// 這時候subject知道??只有?個2號Observer,
// 所以,當(dāng)調(diào)?complete?法時,只有2號Observer接到通知
subject.complete();

特點:?后加入的觀察者,并不會獲得加入之前Subject對象上通過next推送的數(shù)據(jù)

實現(xiàn)多播:?既然Subject既有Observable又有Observer的特性,那么,可以讓一個Subject對象成為一個Cold Observable對象的下游,其他想要Hot數(shù)據(jù)源就可以訂閱這個Subject對象來達(dá)到轉(zhuǎn)換的目的,以此完成多播的操作。

=> 不能重復(fù)使用 <=

說明:?Subject對象也是一個Observable對象,但是因為它有完結(jié)的狀態(tài),所以不像Cold Observable對象一樣每次被subscribe都是一個新的開始,正因為如此,Subject對象是不能重復(fù)使用的,所謂不能重復(fù)使用,指的是一個 Subject對象一旦被調(diào)用了complete或者error函數(shù),那么,它作為Observable 的生命周期也就結(jié)束了,后續(xù)還想調(diào)用這個Subject對象的next函數(shù)傳遞數(shù)據(jù)給下游,會沒有任何反應(yīng)。

import { Subject } from "rxjs";
const subject = new Subject();
// ?先1號Observer成為subject的下游
subject.subscribe(
 ?(value) => console.log("on observer 1 data: " + value),
 ?(err) => console.log("on observer 1 error: " + err.message),
 ?() => console.log("on observer 1 complete")
);
// 然后通過subject的next函數(shù)傳遞了1和2
subject.next(1);
subject.next(2);
// 緊接著調(diào)?了subject的complete函數(shù),結(jié)束了subject的?命周期
subject.complete();
// 2號Observer也成為subject的下游,但是,這時候subject已經(jīng)完結(jié)了
subject.subscribe(
 ?(value) => console.log("on observer 2 data: " + value),
 ?(err) => console.log("on observer 2 error: " + err.message),
 ?() => console.log("on observer 2 complete")
);
// 后續(xù)通過next傳遞參數(shù)3的調(diào)?,不會傳遞給2號Observer,
// 也不會傳遞給1號Observer,但是可以獲取subject的complete通知,
// 可以這樣認(rèn)為,當(dāng)?個Subject對象的complete函數(shù)被調(diào)?之后,
// 它暴露給下游的Observable對象就是?個由empty變量產(chǎn)?的直接
// 完結(jié)的Observable對象
subject.next(3);

注意:?在Subject的生命周期結(jié)束之后,再次調(diào)用next方法沒有任何反應(yīng),也不會拋出錯誤,這樣可能會認(rèn)為上游所有數(shù)據(jù)都傳遞成功了,這是不合理的,由于Subject是一個Observable,那么它就會存在一個unsubscribe的方法,表示它已經(jīng)不管事了,再次調(diào)用其next方法就會報錯,所以可以像下面這樣達(dá)到警示的目的。

import { Subject, interval, take } from "rxjs";
// tick$會間隔?秒鐘吐出數(shù)據(jù),調(diào)?下游subject的next函數(shù)
const tick$ = interval(1000).pipe(take(5));
const subject = new Subject();
tick$.subscribe(subject);
subject.subscribe((value) => console.log("observer: " + value));
// 在1.5秒的時候subject的unsubscribe函數(shù)被調(diào)?,
// 所以,2秒以后的時間,tick$還要調(diào)?subject的
// next就會拋出?個錯誤異常
setTimeout(() => {
 ?subject.unsubscribe();
}, 1500);

=> 多個上游 <=

說明:?理論上可以用一個Subject合并多個Observable的數(shù)據(jù)流,但是這樣做并不合適,原因在于任何一個上游數(shù)據(jù)流的完結(jié)或者出錯信息都可以終結(jié)Subject對象的生命。

import { Subject, interval, take, map } from "rxjs";
// 這兩個數(shù)據(jù)流都是通過interval產(chǎn)?的Cold Observable對象,
// 每隔?秒鐘吐出?個整數(shù),然后利?map轉(zhuǎn)化為間隔?秒鐘吐出
// ?個固定的字符串,利?take只從兩個數(shù)據(jù)流中分別拿兩個數(shù)據(jù)
const tick1$ = interval(1000).pipe(
 ?map(() => "a"),
 ?take(2)
);
const tick2$ = interval(1000).pipe(
 ?map(() => "b"),
 ?take(2)
);
const subject = new Subject();
tick1$.subscribe(subject);
tick2$.subscribe(subject);
subject.subscribe((value) => console.log("observer 1: " + value));
subject.subscribe((value) => console.log("observer 2: " + value));
// tick1$每隔?秒鐘吐出?個a字符串,吐出兩個之后完結(jié), 
// tick2$同樣每隔?秒鐘吐出?個字符串,只不過吐出的是b,
// 同樣是吐出兩個之后完結(jié)。因為subject訂閱了tick1$和tick2$,
// 所以理論上結(jié)果應(yīng)該是下面這八個值,但其實并不是
// observer 1: a
// observer 2: a
// observer 1: b
// observer 2: b
// observer 1: a
// observer 2: a
// observer 1: b
// observer 2: b

理解:?為tick1$是由take產(chǎn)生的,也就是說在吐出2個數(shù)據(jù)之后就會調(diào)用下游的complete函數(shù),也就是調(diào)用subject的complete函數(shù),此時它已經(jīng)完結(jié),后續(xù)的next的方法是沒有效果的,這也是為什么第二個b不會有效果的原因。

=> 錯誤處理 <=

說明:?如果Subject有多個Observer,并且Subject的某個下游數(shù)據(jù)流產(chǎn)生了一個錯誤異常,而且這個錯誤異常沒有被Observer處理,那這個Subject其他的Observer都會失敗,為了避免這種情況的發(fā)生,每有一個Observer的時候,就需要給它一個處理錯誤的地放就可以解決這個問題了。

十、調(diào)度器Scheduler

(1)作用

作用:?用于控制RxJS數(shù)據(jù)流中數(shù)據(jù)消息的推送節(jié)奏

舉例:?這里以帶Scheduler類型的參數(shù)的操作符range為例,不過使用調(diào)度器的這種寫法已經(jīng)廢棄,這里只是舉例而已

// 不使用調(diào)度器
import { range } from "rxjs";
const source$ = range(1, 3);
console.log("before subscribe");
source$.subscribe(
 ?(value) => console.log("data: ", value),
 ?(error) => console.log("error: ", error),
 ?() => console.log("complete")
);
console.log("after subscribe");

解釋:?因為range是同步輸出數(shù)據(jù),所有當(dāng)Observer添加之后,會把所有數(shù)據(jù)全部吐出,所以上面的代碼也是完全同步執(zhí)行的。

// 使用調(diào)度器,寫法已經(jīng)廢棄
import { range, asapScheduler } from "rxjs";
const source$ = range(1, 3, asapScheduler);
console.log("before subscribe");
source$.subscribe(
 ?(value) => console.log("data: ", value),
 ?(error) => console.log("error: ", error),
 ?() => console.log("complete")
);
console.log("after subscribe");

思考:?所以這里的asapScheduler決定了數(shù)據(jù)推送任務(wù)不是同步執(zhí)行,因為range數(shù)據(jù)的吐出是在after subscribe字符串之后的,那么什么是Scheduler呢?

RxJS中定義Scheduler:
它是一種數(shù)據(jù)結(jié)構(gòu) 它是一個執(zhí)行環(huán)境 它有一個虛擬時鐘
解釋:
所謂Scheduer是?種數(shù)據(jù)結(jié)構(gòu),指的是Scheduler對象可以根據(jù)優(yōu)先級或者其他某種條件來安排任務(wù)執(zhí)行隊列 Scheduler可以指定一個任務(wù)何時何地執(zhí)行,所以它是一個執(zhí)行環(huán)境在RxJS的數(shù)據(jù)流世界中,Scheduler說現(xiàn)在是幾點幾分幾秒,那現(xiàn)在就是幾點幾分幾秒,所以Scheduler就像是這個世界中的權(quán)威標(biāo)準(zhǔn)時鐘,正因為Scheduler提供的虛擬時鐘可以被操縱,所以可以利用Scheduler來控制數(shù)據(jù)流中數(shù)據(jù)的流動節(jié)奏。

(2)內(nèi)置的Scheduler

調(diào)度器說明
null默認(rèn)不使用,代表同步執(zhí)行的情況
queueScheduler利用隊列實現(xiàn),用于迭代操作
asapScheduler在當(dāng)前工作之后,下個工作之前執(zhí)行,用于異步轉(zhuǎn)換
asyncScheduler用于基于時間的操作
animationFrameScheduler用于創(chuàng)建流暢的瀏覽器動畫

(3)支持Scheduler的操作符

=> observeOn操作符 <=

作用:?根據(jù)上游的Observable對象產(chǎn)生出一個新的Observable對象出來,讓這個新的Observable對象吐出的數(shù)據(jù)由指定的Scheduler來控制

參數(shù):?observeOn(調(diào)度器)

import { range, observeOn, asapScheduler } from "rxjs";
const source$ = range(1, 3);
const asapSource$ = source$.pipe(observeOn(asapScheduler));
console.log("before subscribe");
// 訂閱新產(chǎn)生的Observable發(fā)現(xiàn)受調(diào)度器的影響
asapSource$.subscribe(
 ?(value) => console.log("data: ", value),
 ?(error) => console.log("error: ", error),
 ?() => console.log("complete")
);
console.log("after subscribe");
import { range, observeOn, asapScheduler } from "rxjs";
const source$ = range(1, 3);
const asapSource$ = source$.pipe(observeOn(asapScheduler));
console.log("before subscribe");
// 訂閱上游數(shù)據(jù)發(fā)現(xiàn)不受調(diào)度器的影響
source$.subscribe(
 ?(value) => console.log("data: ", value),
 ?(error) => console.log("error: ", error),
 ?() => console.log("complete")
);
console.log("after subscribe");

注意:?observeOn只控制新產(chǎn)生的Observable對象的數(shù)據(jù)推送節(jié)奏,并不能改變上游Observable對象所使用的Scheduler

=> subscribeOn操作符 <=

說明:?這個跟observeOn的區(qū)別在于前者是控制什么時候訂閱Observable對象,而后者是控制Observable對象何時往下游推送數(shù)據(jù),使用和參數(shù)是類似的。


該文章在 2024/11/12 11:11:19 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運作、調(diào)度、堆場、車隊、財務(wù)費用、相關(guān)報表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點,圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務(wù)都免費,不限功能、不限時間、不限用戶的免費OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2024 ClickSun All Rights Reserved

美女性感一二区,天堂久久久久久中文,自拍偷拍亚洲动漫 ,小浪货腿打开水真多视频,国产无套内精在线观看,巨胸喷奶水免费视频,欧美一级粗黑,免费高清欧美一区二区三区,黑人越猛烈欧美XX00动态图,最近亚洲国产网页aⅴ,少妇自拍视频一区,亚洲精品福利视频久久,最新手机国产在线小视频,国产午夜福利无,在线免费观看国产污污污视频,97视频国产中文,在线欧美sm一卡
18以下勿进色禁网站| 老色鬼 欧美精品| h视频无遮挡免费网站| 午夜在线看美女| 一本到高清视频dvd| 韩国主播福利网一区二区三区| 国产鲁鲁精在线观看| 久久久888精品视频| 理论片免费视频观看影片| 污污污国产免费网站| 久久精品女人天堂A| 白后进极品翘臀在线播放| 欧美三级韩国三级日本三斤| 亚洲国产看片| 亚州无吗免费最新视频| 人成年免费视频一区二区| 色多多国产学生妹在线网址| 天堂网2014a天堂网| 偷拍与自偷拍亚洲精品农村的| 大黄蕉 在线欧美| 亚洲色大情网站久久久| 国产在线你懂得| 亚洲韩国精品| 大学生久久香蕉国产线观看| 波多野结衣紧身裙丝袜久久| 午夜激情电影免费在线观看| 69堂精品aⅴ| 第一福利 中文字幕| 国产在线99小视频| 伊人久久综合影院首页| 亚洲日韩中文字幕A∨| 国产激情四射| 2018久久精品免费视频| 99re全部都是精品视频在线| 欧美成 人影院在线观看| 午夜白浆在线视频| 精品国产亚洲一区二区三区| MD传媒新剧国产在线观看| 国产女人叫床高潮视频| 99国产美女露脸口爆吞精| 少妇厨房愉情理伦片视频下载| 亚洲尤物视频在线观看| 又爽又黄又无遮挡裸乳网站| 美女极品粉嫩美鮑20p图| 亚洲呦女| 就去吻亚洲精品日韩都没| 男人j放进女人p全黄在线| 久久www免费人咸| xxxx欧美和曰本| 欧美翘臀一区二区三区| 好紧好爽好大丝袜视频| 草草浮力影院禁止18进入| 日本性爱一区二区三区| yy11111111免费少妇电影院| 精品特色国产自在自亚洲| 理论片鲁丝二区爱情网| 午夜福利国产小电影| 91白浆在线视频| 90后极品粉嫩小泬20p欧美| 亚洲一区波多野结衣二区三区| 性色AⅤ在线观看SWAG| 日本久久久久亚洲中字幕| 亚洲欧美中东在线观看| 翁公在厨房猛烈的撞击| 在线观看免费高清aⅴ片| 偷拍精品视频一区二区三区果冻传| 亚洲色热女| 婬荡交换乱婬官网视频| 自拍 15 国产| 国产日产久久高清欧美一区AB| 爽死视频国产| 亚洲综合福利电影| 一个人看的免费视频在线| 日韩欧美在线综合网另类小说| 色婷婷综合激情中文字幕| 灌满精子的波多野结衣| 乱伦一区| 国产美女嘘嘘嘘嘘嘘| 国产精品永久免费高清| AV天堂免费网址| 婷婷激情婷婷激情产在线精品亚洲| 两个人看的www免费高清| 字幕网中文Aⅴ资源站| 60老妇乱子伦视频| 国产尤物视频91 在线| 2020亚洲а∨天堂在线直播| 精品一区二区三卡四卡网站| YYYY111111少妇影院亚瑟| 9久热久爱免费精品视频在线 | 中文字幕综合视频| 亚洲Va中文字幕久久一区| 国产午夜4k午夜福利网站| 日出白浆视频| 亚洲第一精品立川理恵| 国产美女口交免费视频| yw尤物麻豆国产精品| 亚洲永久网站| 亚洲嫩模综合| 女人一级免费看| 加勒比无 码中字幕| 亚洲精品亚洲人成在线| 天天做天天爱天天爽天天综合| 国产十八禁视频在线网站| 一少妇挑战三黑人4p| 国产精品张柏芝久久| 国产美女爽到喷出水来视频99| 亚洲中文版日本AⅤ| 老司机性色福利精品视频| 日本中字在线视频二区| 特级欧美XXXXX| 国产精品∧v在线观看| 777米奇色狠狠888影| 好大好硬我要喷水了免费视频| 五月婷婷麻豆| 亚洲日本va中文字幕在线不卡| 亚洲日韩AV动态图| 精品久久国产字幕高潮| 久久狠狠操视频| 亚洲影院图| AA级女人大片喷水在线观看 | 亚洲 欧洲 日产 韩国网站 | 白丝国产在线| 又大又硬的免费视频| 国产欧美日韩在线精品| 熟睡被义子侵犯中文字幕 | 大胆人休久久久大胆的做受| 国产色多多Av片| 又黄又大又硬好爽好疼好深视频| 啊啊高h在线观看| 性短视频在线观看免费不卡流畅| 国产萌白酱福利喷水视频在线观看| 免费人成在线观看网站| 韩国AV免费在线观看| 亚洲国产日韩在线yw| 啦啦啦啦免费视频在线观看 | 国产亚洲h小电影| 国产精品福利2020久久| 国产黑丝专区| 亚洲一区二区三区中文字幕 | 亚洲色老头在线| 亚洲五月天堂网| 无遮挡又黄免费视频| 91香蕉在线视频| 毛都没有就被开了视频苞| 2022国产男女视频| 办公室1战4波多野结衣在线| 亚洲日韩中文字幕视频| 免费视频边吃饭,边被操| 韩国午夜理伦三级好看| 夜夜澡人摸人人添人人看| 啊嗯干我在线观看| 久久国产免费2020| 日日碰夜夜操狠狠操| 又大又粗又黄的免费视频,| 国产AV麻豆精品原创| AV高潮在线| 亚洲丁香色婷婷综合欲色啪| 熟女五十路AV| 老熟女激烈的高潮| 亚洲色大成网站www久久九| 亚洲偷少妇熟女| 阿娇13分钟作爱视频| 九九在线精品国产| 国产喷水AⅤ| 国产激情在线观看播放| 亚洲免费二区中文| 国产日韩欧美在线精品| 伊人色在线视频| 深夜在线视频观看免费视频禁18| 4480YY私人精品国产| 一区二区吉泽明步中文字幕| 99国精品午夜福利视频不卡99| 国产在线悠悠视频| 婷婷激情在线| JIZZJIZZ日本护士视频| 亚洲综合婷婷激情| 无福利一区二区三区| 亚洲码在线观看| 草久在线 中文字幕| 国产午夜精品理论片| 欧美a级毛欧美1级a大片免费播放| 天天夜碰日日摸日日澡| 扒开双腿猛进入喷水视频| 扒女人两片毛茸茸黑森林| 亚洲熟女淫妇| 西西人体大胆4444WWW| 国产69堂高清精品| 亚洲欧洲日韩性视频在钱| 久久思思97视频| 天天插夜夜操美女图| AV网址手机在线观看| 大又大粗又爽又黄的视频| 2020狠狠狠狠久久免费观看| 又白又肥的大白屁股在线视频| 国产在视频线精品视频| 肉欲色区一区二区| 少妇自拍视频一区| JizzJizzJizz亚洲成年大全 | 热久久国产最新地址获取 | 99久久精品费精品国产| 26uuuAV高清| www毛茸茸毛茸茸www| 国产久精品搜索视频| 黑人巨鞭大战中国女人视频| 一区二区三区18岁| 白俄罗斯毛茸茸福利| 草草线在线禁18成年在线视频| 伊 在线 色| 太深了太粗了免费视频| 又色又爽又高潮的视频日本| 天天爽爽夜夜爽爽精品视频 | 最新系列国产专区亚洲国产| 久久久久久高潮白浆| xvideos在线视频播放| 亚洲国产爱| 338q西西人体大胆瓣开下部| 国产骚女精品| 久久艹禁精品| YW193尤物在线影院| 久久久精品成人免费观看国产| 日本xx在线免播放器观看| 国产鲁鲁视频在线观看鲁阿鲁 | 综合狠狠综合久久| 最近更新中文字幕第1_| 亚洲的欧美性爱| 国产有码免费视频| 无遮挡粉嫩小屁泬| 亚洲AV日韩综合一区二区三区| 黄h视频在线观看视频| 澳门男人女人做爽爽视频| 婷婷基地五月天六月丁香| А∨天堂一区一本到免费| 九月色综合| 国产午夜视频免费观看| 国产xo免费视频在线观看| 午夜男女无遮挡拍拍流水视频| a网站在线看| aⅴ18精品| 亚洲欧美日韩综合在线一| 国内精品久久免费伊人电影院 | 色婷婷激情综合| 100大片免费看| AV永久免费观看网站| 欧美大BBBB喷白水| 国产美女被高潮网站| 高清人人天天夜夜曰狠狠狠狠| 偷国内自拍视频在线观看手机版| 熟女乱妇| 久久无碼专区国产精品| 成上人色爱欧美一区二区丝瓜视频| 国产人成午夜电影免费观看| 大臿蕉香蕉大视频在线| 出轨的丝袜美女国产视频26| 美女被弄到高潮视频免费| GAy 在线 免费| 久久综合99热国产| 娇妻与老头高潮在线观看| 亚洲伊人久久大香线蕉| 69堂午夜精品视频在线| 噜噜色,噜噜啦,噜噜网| 亚州精品视频在线伊人| 久久久亚洲精品杨幂| 你懂的,在线看| 小蝌蚪国产午夜精品| 正在播放国产真实哭都没用| 亚洲中文字幕一本久道热线在线| 风间由美性色一区二区三区| 亚洲欧美日本久久综合网站点击| 亚洲自偷自拍首页精品| 成年轻人网站免费视频| 日韩欧美亚洲每日更新在线| 亚洲人成网站蜜桔午夜| 持续撞击高潮波多野结衣| 久久天天躁日日躁狠| 好湿好痛好紧的A级视频| 97人人模人人爽人人6| 中文字幕乱码免费熟女| 白浆免费网站| 国产在线播放鲁啊鲁视频| 久久久久久久久久伊春院视频| 国产亚洲精品VA在线| 55夜色66夜色国产亚洲一| 国产在线2019精品| 老司机午夜福利直播| 亚洲va中文字幕不卡| 2020日本精品网站尤物| 熟睡中被义子侵犯在线播放| igao在线播放 国产精品| 亚洲AV电影一区二区三区四区| 亚洲自拍愉拍| 青青青草网站免费| 亚洲国产精品500在线观看| 啊嗯~啊~啊啊啊~在线观看| 国产交换配乱婬视频偷最新章节| 夜夜澡人摸人人添人人看| a天堂在线视频| 国产乱子伦一区二区三区视频播放| 朝鲜毛茸茸片| 欧美成网站| 2012中文字幕第一页| 作爱视频网站久久| 亚洲本道一级视频免费| 少妇喷水综合在线| 久久久噜噜噜久久熟女| 少妇高潮a一级做完| 国产一区二区三区在线视频| 欧美xx精品| 午夜成人1000部免费视频 | 绿帽2021国产在线| 又黄又爽又刺激又免费视频| 热久久视久久精品18| 国产视频污在线观看| 亚洲高清视频在线观看你懂| 呦男呦女八区| 综合日韩精品| 又刺激又黄又爽又舒服的视频| 国产精品视频人人做人人| 亚洲日本va午夜在线电影蜜芽| 2021AV手机在线观看网址| 爱v天堂在线观看| 蜜芽亚洲AV尤物不卡资源| 又大又湿又紧 视频| 亚洲尤物精品一区| 国产免费AV电影在线第一页| 在线看片a免费人成视频| 大学生香蕉国产视频| 杨幂AV喷水在线| 亚洲精品综合网站| 波多野结衣AV久草| www.五月天| 姉欧美日韩国产精品777| 尤物视频在线| 国产精品福利片| aaa爽爽爽片在线观看| 亚州不卡網站| 幻女free性zozo交体内谢深喉| 国产精品农村妇女α片| 色在线天堂| 又大又粗又长外国一级| 日本WWWXXXX在线观看| 精品剧情V国产在线观看| 国产自国产自愉自愉免费24区| 国产精品原创巨作。无遮挡| 亚洲精品综合网站| 亚洲色大成网站永久| 久久久久久久久久伊春院视频| 亚洲一级AV免费直播| 在线v片免费观看视频 | 色久悠悠婷婷综合在线亚洲| 不卡日本高清精品免费观看| 亚洲免费公开| 国产情侣自拍区| 好吊色永久免费播放| 国产一区二区黄| 亚洲色熟女图激情另类图区| 亚洲精品网站在线观看你懂的 | 西西人体44www高清大胆| 免费观看sM重口视频| 中国大成色www永久网站| 中文字幕 动漫精品| 免费无遮挡AV| 大美女的屁股眼喷水视频丶| 新欧美丝袜脚交| 国产黄大片在线观看| 永久免费观看赤裸美女| 开心五月激情网| 国产又色又爽又黄的视频在线| 精品视频一本二本| 亚洲女成aⅴ| www123国产免费自拍视屏| 无套粉嫩国语对白| 国产精品三级一区| 国产调教打屁股网站97| 波多野结衣护士未删减版| 正在播放 国产精品推荐| 久久久久久久综合日本fuck| 草草热草在线9| 国产精品一二区不卡| 最新亚洲人成人在现| 夫目前侵犯波多野结衣中文字幕| 色综合欧美在线视频区| 99rv精品视频在线播放| 午夜刺激福利电影在线观看| 制服丝袜长腿先锋| 亚洲天堂网在线观看| 伊人少妇75在线视频| 手机免费Av片在线播放| 亚洲AⅤ元码一区二区三区| 伊人大香线在线播放| 亚洲伊人久久电影网| 亚洲笫一狠人久久| 亚洲福利视频午夜| 日本嗯啊视频在线观看| 粗大猛烈进出高潮视频二| 久久中文字幕系列| 午夜丝袜鲁丝| 色一鲁中文字幕| 成年片色大黄全免费APP久久| 丰满的少妇HD高清2| 亚洲最大你懂的网站| 国产薄丝脚交视频在线观看| 啊!摁摁~啊!用力~视频| a欧美亚洲日韩在线观看| 婷婷综合久久中文字幕| 在线观看天堂网A片| 永久免费看mv网站入口| 多水的粉嫩小穴视频| 日本小妹被我插的娇喘不停| 大炕上翁熄粗大交换刘雪| 五十路熟女交尾中出| 国内精品久久免费伊人电影院 | 日本激情网| 中国AAAA一级淫片| 亚洲综合图区小说专区| 巨胸喷奶水影院| 成年站免费网站看v片在线| 国产偷窥熟女精品视频大全| 久久久久久精品影院入口| 无遮挡十八禁污污污网站| 国产熟女高潮精选合集| 免费观看性行为视频的网站| 亚州人妖AV一区二区| 国产一进一出视频网站| 又污又黄又刺激的网站| 老司机午夜福利视频免费播放| 国内精品福利丝袜视频| 狼群国产高清在线观看| 国产999视频| 啊疼轻点一区二区三区在线观看| 亚洲天堂网站| 十八禁美女网站| 99久久亚洲综合精品成人| 加勒比高清不卡波多野结衣| 激情五月天四房播播| 真实国产乱子伦对白视频| 少妇yy111111| 2021午夜福利理论片| aⅴ天堂男人在线视频| 日本不卡一区二区不视频| 97一区二区在线精品视频| 一女被多男喷潮在线视频| 国产精品香蕉在线的人| 国产xvideos国产在线| 久久香蕉国产线观看| 国产精品亚洲w码日韩中文| 中文字幕AⅤ中文字幕| 亚洲v天堂v手机在线资料大全| 凹凸导航第一福利| 中文字幕一区木| 国产高潮流白浆喷潮免费| 国产欧美成AⅤ人高清| 在线天堂资源最新版| 亚洲精品福利| 白丝国产在线观看| 午夜三级a三级三点在线观看| 日本一道DVD中文字幕| 国产黑色丝袜视频在线观看| 在线观看h的网址| 操Av爽久| a欧美视频| 香蕉伊蕉伊中文在线视频| 先锋影音AV资源我色资源| 日本一区二区中文字幕| 国产美女a做受大片观看| 粗了大了整进去好爽视频 | 亚洲色在线观看另类| 男女肉粗暴进来120秒动态图| 苍井空一区二区三区| 综合 欧美 亚洲日本| 色多多一区二区在线观看| 嗯啊免费视频在线| 最新午夜男女福利片视频| 丁香五月欧美激情综合站| 国产综合色香蕉精品午夜婷| 久久久久夜夜夜综合国产| 国产在线视频大码| 精品日本久久久久久久久久| 久久久久久精品午夜福利| 专区亚洲欧洲日产国码AV| 久久受www免费人成_看片中文| 日韩日批在线免费播放视频| 禁止18点击进入在线观看片尤物| 夜夜爽夜夜爱| 日韩久久久性Av| 日日摸夜夜添高潮| 妇子伦视频在线观看| 久久精品呦女暗网app| 中文字幕夫妇交换乱叫| 再深点再快点要高潮了在线看| 岛国艾薇视频在线观看品爱| 久艹综合福利网| 换脸欧美一区二区| 精品三级视频a∨在线观看| 天天综合天天做站| AV不卡在线看波多野结衣| 亚洲精品你懂的在线观看| 自拍亚洲欧美在线成电影| 在线精品国精品国产尤物| 亚洲第一SE情网站| 久久精品国产免费观看| 97在线视频公开免费| 我和子发生了性关系视频| 大战兰州熟女邻居| 国产 在线 激情| 国产乱子伦一区二区三区=| 国产极品AV嫩模| 最新中文字幕Av专| A级成年网站| AV在线亚洲一区二区| 亚洲人成电影网站色WWW| 亚洲日本无在线码播放| 啦啦啦在线视频免费播放WWW| 亚洲巨制Av剧情在线| 成年片色大黄全免费APP久久| 亚洲欧洲日产国码综合国产人| 亚洲欧美日韩精品专区卡通| 午夜福利高潮| 亚洲va欧洲va日韩va忘忧草| 中文字幕 尤物视频| 国产干逼逼视频| 拍拍天天夜夜| 亚洲白嫩美女在线| 亚洲AV不卡在线观看播放| 天天爱天天碰| 99国产欧美久久久精品| 国产午夜羞羞| 日本免费一区二区三区最新| 女人下面给男人桶视频| 疯狂做受在线| 粗大进入日本高h视频| 我的公强要了我高潮在线观看| AV不卡一区二区在线直播| 国产精品色色自拍| 久久CAOPORN国产免费| 草裙社区精品视频三区| 国产欧美在线呦| 嘿咻嘿咻边摸边做动态图| 妇子伦视频在线观看| 国产区在线视频明星在线| 一级女性高潮视频| 国产高潮流白浆99ri| 亚州男同网| 国产制服丝袜在线第一页| 99久久久国产精品齐齐齐齐| 黑人按着韩国妞xx视频在线看| 暴力强到最舒服奷伦小说| 又黄又硬又爽视频好看| 久久综合网天天五月天丁香婷 | 尤物tv在线国产剧情| 极品少妇被猛得白浆直流视频| 大量国产私密保健视频 | 天堂最新版在线中文| 岛国AV一区二区三区在线观看 | 2021年新国产免费观看| 看男女日皮视频在线观看| 女人腿张开让男人桶爽肌肌| 宅男噜噜噜66网站高清| 国产欧美国日产在线视频 | 殴美亚洲动漫视频| 日本乱子伦精品免费看| 亚洲影院天堂中文AV色| 尹人大香蕉综合视频| 午夜福利影院网址| free性欧美人与牛| 亚洲香蕉网久久综合国产| 欧美另类极度残忍拳头交| 午夜在线观看的免费网站| caopor一区二区| 最新午夜福利视频| CHINESE白袜喷浆XNXX| 丰满巨臀大屁股白浆| 图片区 小说区 中文| 十八禁娇喘出水| 在线免费观看的小视频| 正在播放国产呦系列(784)| 白浆在线视频| 久久99精品久久久久久清纯| 69堂精品| 亚洲中文字幕毛茸茸| 亚色在线播放| 十八禁娇喘出水| 久久免费精品不卡| 国外欧美一区二区久久| 亚洲国产线看观看| 3344成年站福利在线视频| 少妇把腿扒开让我添69| 有码视频中文字幕| 日本高清不卡中文字幕视频| 手机看片久日韩| 国产社区精品视频| 老师洗澡让我吃她胸的视频| 来吧天天影视色香欲综合网| 狠狠综合久久久久综合| 就要鲁就要鲁夜夜爽| 综合VA欧美激情| 在线观看免费国产丝袜网红| 白浆在线观看国产| 欧洲亚洲色一区二区色99| 护士毛茸茸的性| 亚洲爱V天堂| 自拍三区| 黑人又粗又大一| 影音先锋日韩在线高清AV资源| 中文字幕制服丝袜第57页| 国产在线视频大学生白嫩| 大胆午夜视频| 对白刺激的老熟女露脸| 亚洲红杏在线观看| 12p亚洲喷白浆| 高潮尖叫免费视频| 久久人人爽人人爽人人片dvd| 亚洲一区女教师| 欧美A∨小影院| 女女高潮激烈免费视频| 亚洲片手机在线视频| h高潮娇喘抽搐国产在线看| 777综合| 久久青草精品免费免费| 欧美大胆A级短视频| 69堂国产欧美亚洲| 亚洲国产VA综合在线| 中日韩一区二区三区中文免费视频 | 国产剧情护士的遭遇在线观看| 背德乱辈伦中文字幕日韩电影片| 欧美日韩人人夜夜澡| 欧美老妇肥婆毛多水多性强| 五月天国产| 无遮挡H肉动漫在线播网站| 亚洲国产精品乱码一区二区| 日本aⅴ大伊香蕉精品视频| 国产盗摄免费视频大全| 在线观看黄页网免费视频| AV女天堂| 国产精品你懂的在线播放| 第一福利导航视频| 国产校花处被破的视频| 国产一区二区在线视频播放页| 中文字幕有码不卡视频在线 | a级情欲片在线观看| 露性器全程啪到尾的电影在线| 女人高潮视频..com| 国产精品未满十八禁止观看| 天堂在线诱惑| 欧美中文字幕一区,二区,三区| 娇妻高潮白浆狂涌视频| 欧美在线你懂的| 老师掀开短裙让我挺进| 国产中年熟女大集合| 久久97视频| 亚洲综合一本色一区| 国产农村妇女一级系列| 国产精品福利精品福利短片| 尤物yw午夜国产精品大臿蕉| 99日本精品杨幂| 国产一二区免费| 亚洲无碼香腸视頻| 野战好大好紧好爽快点老头视频| 阳茎进去女人阳道3d视频| 国产亚洲香蕉线播放ΑV38| 亚洲日韩欧美一区、二区| 邻居新婚少妇真紧| 国产久爱视频在线| 五月天婷婷在线视频精品播放| 宅男深夜视频网站| 2021天天狠天天透天干天天| 尤物网站永久在线观看| 杨幂久久精品| 久久综合久久综合九色| 国产青青视频精品久久| 色多多成视频人黄在线观看| 痉挛高潮中文字幕| 在线 午夜 制服 丝袜| 国产女人爽的流水的视频| 一级欧美一级日韩片| 看全色黄大色大片免费久久| 亚洲激情图片| 成年视频xxxxx在线| 亚洲制服日韩专区| 日日射、日日摸| 亚洲无吗福利视频| A级男女性高爱潮试看| 女生高喷母乳中文字幕| 可以免费看AV的网站| 8x成年视频在线观看| 色花都国产第一| 非洲视频中文字幕在线不卡| 国产高清在线精品黑人一区| 97视频在线精品国自产拍| 国产精品偷伦视频免费观看了| JIZZJIZZJIZZ亚洲熟| 好男人WWW在线影视社区| 亚洲多人视频在线播放| 欧美视频嗯啊| 69久久夜色精品国产| 色窝窝蝌蚪网| 久久精品亚洲男人的天堂| 午夜男女刺激爽爽影院| 白浆美女在线观看| 国产精品女同系列在线| 337P粉嫩日本欧洲亚洲大| 久久精品国产欧美日韩| 草裙社区免费视频一二三区| 天天做夜夜做狠狠做| 26uuu欧美日本在线播放| 亚州国产午夜在线视频无| 中文字幕视频在线观看第一页| yy111111少妇影院动漫| 99RE6热视频这里只精品15| 免费喷乳视频| 精品久久久久久久久久香蕉| 伊人久久大香线蕉五月天| 免费gay片敏感小受男男| 中文字幕aⅴ第一页在线| 亚洲一区AV| 人人添人人澡人人澡人人人人| 成年女人大片免费看官方| 国产l精品国产亚洲区| 亚洲男同GAy作爱视频网站| 中文综合在线第二页 | 午在线亚洲男人午在线| 亚洲欧美日韩中文字幕一区二区三区 | 正在播放HEYZO美熟女| 成 人 h在线播放| 亚洲已满18点击进入在线看片| 久久国产线看精品观看yw| 国产一区二区在线爱| 国产鲁鲁色| 亚洲骚妇视频| 亚洲 欧美 聚色| 国产精品石村小月美女视频一区二区三区 | 蝌蚪视频在线国产| 欧洲免费无线码在线观看二区| 二色天堂AV在线| 色综合久久久久网| 国产极品喷水视频| 男女性爽大片视频免费看| 在线乱码一卡二卡三卡| 久久国产情侣露脸精品| 张开腿无遮无挡在线观看| 婷停五月丁香深爱五月 | 小泽玛利亚 一区二区| 午夜在线不卡精品国产| 五月综合影院婷婷综合在线| 欧洲第一页浮力影院| 黑人粗长五月天在线观看| 国内大量揄拍视频| 在线观看亚洲色自拍一区| 另类残虐变态在线视频| 国产白浆喷水在线视频| 波多野结衣1区2区| 午夜福利体验免费体验区| 制服丝袜视频国产一区| free性欧美hd另类精品| 乌克兰18极品xx00| 华裔美女挑战黑人疯狂做| 亚洲va日韩va久久| 青青久久少妇| 亚洲AⅤ日韩AV电影在线观看| 小屁孩cao大人免费看| 国自产精品手机在线观看视频| 成人免费AV不卡在线观看| 拉开腿揉捏花蒂hAV视频| 国产白嫩大屁股免费视频| 又爽又硬出湿水免费视频| 精品亚洲aⅴ一区| 各种高潮VIDEOS2抽搐合集 | Α片在线观看免费| 久热香蕉在线视频免费版| 国产小u女AV| 国产私拍福利在线永久视频| 波多野结衣琪琪网| 午夜热情影院免费观看| 人成乱码一卡二卡三卡| 久久九九刘涛精品视频| 亚洲色永久域网站| AV色天堂网在线观看| 久久久亚洲精品杨幂| 小h片资源免费观看在线| 好男人在线看片神马电影| 白嫩出水在线播放国产| 黑人粗大猛烈进出视频| 正在播放国产强啪| 3d动漫精品福利在线看| 亚洲囯产一区二区三区| 国产点击进入在线影院尤物| 日本三级韩国三级韩级,播放| 国语自产视频在线| 欧美人拍拍动态免费看| 尤物yw193can在线观看| 无忧AV在线播放| 麻豆婷婷射| 免费网站永久在线播放| 久久久生热线品久久久频97色老 | 好吊色永久免费视频大全| 久久综合综合久久综合| 领导不戴套玩弄下属娇妻| 久久少妇野战| 2019最新国产不卡a精品2018| 精品国产精品国产偷麻豆| 中国农村自拍HDXXXX| 亚洲国产日韩A在线乱码最近更新中文字幕| 夜夜爽天天爽人人爽| 人体固定中出し輪姦在线观看| 精品午夜中文字幕在线| 国产真人作爱免费视频播放| 在线观看免费h视频网站| 色综合天天综合网国产| 婷婷久久综合九色综合98| 美女高潮无套内谢| 亚洲中文字幕日韩经典空组| 国产欧美日韩在线观看一区二区| 久久久久久综合精品免费| 未满十八岁禁止入内忘忧草视频| 白浆高潮抽搐视频| 欧美成网站| 国产小视频丝袜在线| 夜夜操欧美视频| 色妞影院在线观看品爱网| 女人张开双腿让男人猛桶| 亚洲最大中文网| 国产swag在线| 禁止的波多野结衣| 2021给个最新网站青草视频在线观看| 啊不要好痛视频国产| 同桌扒开我的内裤摸下面| 国产伦精品一区二区三区视频| 国产女人与公拘交| 久久久久久精品精品69| 伊人色合久久网| 久久电影国产亚洲欧美精品| bl国产免费一区二区三区bl| 成年视频XXXXX在线app| 国产免费人成视频在线播放播| 2021AV手机在线最新| 亚洲国产日韩一区二区在线 | 狠狠cao视频观看| 精品亚洲人人| 日韩欧美亚洲综合久久影院| 亚洲愉拍自拍欧美精品| 嗯啊视频在线看| 亚洲依依成人综合网址首页| 国产精品99久久免费| www亚洲天堂| 亚洲色哟哟在线| 国产午夜精品视频| 久久97久久精品免费看| 色久悠悠婷婷综合在线亚洲| 在线视频欧洲专区| 伊人成色综合网| 调教求饶视频导航| 综合图区自拍另类图片| 不要钱曰批免费视频播放网站 | 一本久道久久综合丁香五月影院官网版 | 26uuu在线亚洲欧美| 日韩90后极品粉嫩小泬20p| 精品国产免费看久久久| 18成禁人看免费无遮挡床震| 国产在线精品一区二区三区不| 无遮挡1000部拍拍拍免费视频| 最猛欧美XXXXBBBB| 窝窝福利精品网| 99久久免费国产精品2021| 好了AV第四电影综合站| 国产自偷自偷手机在线| AV网页手机在线观看| 精品国产品国语在线对白| 亚洲欧洲性AV| 国产污污污影院在线观看免费| 国产91精品久久久久久| 国产在线主播一区| 在线伊人5| 色多多成视频人黄在线观看| 老师被操的在线视频色多多| 殴美xxxxBBBB| AV性天堂高清在线观看| 性荡视频播放在线视频| www大成色永久| 国产精品免费aⅴ片在线观看| 一个人看的www高清视频免费在线| 一级特黄欧美曰皮片| 国产美女遭强高潮网站下载| 亚洲欧洲国产伦综合| 国内精品免费久久久久电影院| 中文字幕熟女| 岛国免费v片在线观看| 在线精品视频成人| 阳茎进去女人阳道视频特黄| 最新国产AV网址大全| 越南美女一级特黄大片| 厕所偷拍太清楚了10P| 欧美最猛性xxxxx69交| 九月丁香婷婷综合在线| 欧美亚洲日本韩国一区| 91在线看喷水| 欧美日韩一区二区三区在线| 一进一出一爽又粗又大动态图| 孕妇孕交日本在线播放| 992tv在线视频国产| 用劲太爽了再深一点| 2020亚洲а∨天堂在线| 2021一级α片免费看| 国产大屁股喷水视频| 欧美性色 播放| 日本S级视频在线| 国产好深好湿在线观看| 8x永久免费观看在线| 国产亚洲美女精品久久久| 成人AV高清不卡在线| 国产自偷在线播放| 成熟女性祼交视频| 91麻豆国产自产在线观看| 亚洲人妖AV免费在线AI| 国产白嫩极品在线观看| 国产亚洲福利在线视频| 国产尤物在线点击进入| 日本少妇被爽到高潮动态图| 免费国产在线精品一区二区三区| GV男男在线观看| 亚洲精品国产自在久| 3Dh版动漫视频在线观看| 国产精品自拍AV| 日韩AⅤ精品一区二区在线| 欧美激情国产精品| 高H猛烈失禁在线视频| 亚洲男男gv手机在线观看| 成年人视频在线看| 国产小呦泬泬99精品| 天天做天天摸天天爽欧美一区| 在线观看揉美女胸网站| 成 人 网站免费| 亚洲精品一二三四区波多野结衣| 国产孕妇福利3在线观看| 亚洲自拍AV不卡| 国产极品AV嫩模| 2012免费观看完整版在线播放| 永久手机版AV在线| 免费午夜福利在线观看视频| 亚洲国产精品乱码一区二区 | 国产精品白嫩美女在线观看| mm翘臀后进式视频| 波多野结衣一区二区AV| 女同性Av网站| 久久国产乱子伦精品免费不卡| 在线看片AⅤ免费观看| 日本嗯啊不要,视频| 护士白浆免费视频| 久久综合九色综合欧洲98| 99久久九九免费观看| 国产区图片区小说区亚洲区| 6080yy国产精品| 亚州日韩穿丝袜在线| 国产精品香蕉网页在线播放| 22222se男人的天堂| 亚洲A∨网址| h色在线观看网站| 高清性欧美ⅩXX护士| 白丝美女在线播放视频网站| 啊…轻点…好爽…用力…国产视频 | 中文字幕调教中文一区| 日本不卡不码在线看| 最新国产午夜精品视频不卡| 高清国语自产拍免费中国国语 | 亚洲国在线观看网| 日本三级韩国三级香港三级首页| 日韩αv精品| 国产爱导航在线| 可以免费看黄不卡的网站| 久爱www免费在线播放| 国产精品麻豆久久久| 大黄香蕉在线视频| 又紧又爽又水的免费视频| 欧美又大又粗又硬又长视频| 国产精品亚洲А∨天堂2021| 丁香婷婷激情亚洲| yw193尤物最新网站| 永久免费 亚洲| 又色又刺激的视频网站| 亚洲国产乱| 国产制丝袜美腿免费视频| 亚洲国产爱A∨在线观看| 99视频精品免视看| 超级碰碰青草免费视频APP| 久久女人视频| 888米奇四色狠狠下| а天堂最新版在线网| 菲菲学院亚洲欧美激情片| 亚洲一区二区免费电影| 看片A免费观看视频| 久久夜色精品国产网站| 成人性动漫在线观看18| 成熟女人网站| 亚洲性爱一区AⅤ| 国产igao视频网在线观看| 欧美ai精品视频| 最全黑人AV导航| 在线国产尤物导航| 波多野结衣免费视频一区二区三区| wwwxxxx国产| 曰韩欧美亚洲美日更新在线| 亚洲成a影院| 强行挺进朋友漂亮的娇妻| 嫒呦碧在线观看| 在线2022Av高清| 青青青亚洲视频播放| 成a人片免费网址| 久久国产综合激情对白| 在线国产精品色| 在线观看日本免费小视频| 玖玖资源站AV最稳定网址| 99久久综合狠狠综合久久止| 亚洲一区二区三区美女| 99久久er热在这里都是精品99| 中文字幕在线播放第二页| 亚洲原创国产AV| 三上悠亚日韩精品一区二区三区| 好男人在线社区www在线观看 | 国产簧片免费在线播放| 亚洲欧美中文字幕在线一区 | 国产欧美日韩在线精品| 国产91白浆四溢| 国产精品自在拍在线播放| 精品福利视频免费一区二区| 日本高清中文字幕免费一区二区| 国产交换配乱婬视频a免费| 91青青草原免费观看| 亚洲国产精品特色大片观看完整版| 精品国产亚洲福| 国产精品一区二区三区高清在线| 激情婷婷丁香9月色综合| 国产精品半夜| 亚洲激情婷婷| ai杨幂被弄高潮在线观看| 阿娇13分钟坐爱视频| 国产麻豆第一页在线观看| 90久久京东热加勒比一二区| 国内情侣作爱视频网站| 黑人freesex又粗又大又长| 无遮挡又爽又刺激的动态图| 久久国产伊人免费| 小泽玛利亚 一区二区| 99久久er热在这里只有精品15| 一级一片高潮在线观看| 久久综合九色综合欧美| 色五月丁香六月欧美综合| 久久国产美女| 女同互添下身视频在线观看| 手机在线观看你懂得| h娇喘喷水抽搐高潮视频| 最新国产蝌蚪视频在线播放不卡| 原创国产AV剧情丝袜秘书| 高清成年美女网免费视频| 亚洲日本欧美日韩高观看| 一区二区三区精油按摩| 亚洲无线观看国产精品| 无遮挡H动漫在线播放| 亚洲欧洲久久精品| 国产在线精品一区二区三区不卡| 久久麻豆国产精品| 激情综合色婷婷| 疯狂放荡的少妇作爱视频| 亚洲最大的激情4438五月| 国产精品草莓视频下载| 十八禁污视频男男| 午夜男女XX00视频福利| 久久夜视频| 国产午夜AV秒播在线观看| 呦女亚洲| 卡通动漫第一页AV天堂| 国产日产久久高清欧美一区| 在线播放观看gv人成视频免费| 无遮挡又黄又高潮的视频| 久久高潮影院| 日日放夜夜高潮| 好大好湿好硬顶到了视频| 18以下勿进色禁网站大全视频| 一卡二卡三卡四卡网址| AV春色天堂| 成年男女免费视频网站无毒| 色老汉免费网站免费视频| 国产后进白嫩翘臀在线视频| 中文字幕一二三区不卡2021| 久久HEZYO久综合亚洲色| 视频午夜福利2000| yy111111少妇影院永久版| 国产91r桃色| mm1313亚洲国产精品软件 | 极品中国少妇被黑| 阿v天堂在线观看| 美女高潮流白浆一区二区三区视频| 亚洲va久久久噜噜噜久久4399| 天天狠天天透天干天天怕| 97热久久免费频精品99| 中文字幕精品区| 欧美男射女视频| AV青青草原国产| 尤物国产在线精品福利三区| 视频中文 在线 日韩 亚洲| 999视频精品全部免费品| 国产免费污网无遮挡在线观看| 国产清纯美女遭强到高潮动漫| 亚洲色哟哟在线| 亚洲国产精品九月天| 2019最新亚洲中文字幕| 2020国自产拍国产小视频| 强奷妇系列中文字幕| 国产盗摄经典盗摄| 坐下来自己慢慢摇| 亚洲啊啊啊视频在线| 99久久精品国产免费看| 中国免费xxxx视频| HD女人奶水授乳milk漫画| 极品少妇高潮在线观看| 久久99国产这有精品| 亚洲欧美日韩在线观看A三区 | 国产精品欧美精品污污| 麻豆精品国产免费观看| 中文字幕天堂中文| XX娇小嫩XX中国XX| 久久婷婷大香萑太香蕉AV人| 最近最新中文字幕大全在线| 影音先锋va色资源站| 丁香五月激情视频| 欧美色a∨视频| 国产私拍福利精品视频推出导航| np在线视频欧美日本在线| 豪妇荡乳1一5潘金莲| 午日日日夜夜夜| 老师让我脱她乳罩摸她乳视频| 又色又硬又黄的美女视频| 2021最新手机在线观看视频| 免费人成网ww555kkk在线| 午夜福利频道国产区| 亚洲成国产人片在线观看| 亚洲国产美国国产综合一区| 亚洲AV一本岛在线播放| 2020亚洲а∨天堂在线观看| 国产欧美日韩在线观看一区二区| 欧美处破摘花精品| 五月天婷婷网亚洲综合在线| free性欧美hd另类精品| 亚洲欧美日韩愉拍自拍美利坚 | 波多野结衣AV久草| 亚洲 日韩 蜜| 国内久久久免费视频| 国产欧美丝袜在线二蜜芽TV| 自拍另类一区| 国产午夜精品自拍| 玖玖资源站AV最稳定网址| 变态另类~第1页| 亚洲最大4438| 变态另类久久变态变态| 天天爱天天做天天爽2021| 久久久久久精品免费免费WEⅠ| 在线观看国产网址你懂| 曰批女人视频在线观看免费| 污网站在线网页免费观看| 黑人大荫蒂bbwbbb| 国产高潮在线看| 国产小U未发育视频| 亚洲不卡女人视频| 在线看视频你懂得| 国产白丝网站精品污在线入口| 亚洲欧洲日产韩国综合| 又又又爽又黄的美女网站| 国产区闺蜜来| 三上悠亚无蔗挡网站| 亚洲国产婷婷综合在线精品| 特黄特色三级在线观看| 国产xxxx69真实实拍| 最新日本a∨中文字幕专区| 亚洲天堂网在线www在线资源| 国产美女精品视频线播放| 思思青青人人草热视频| 九九99线视频在线观看| 亚洲囯产一区二区三区| 免费看亲胸揉胸膜下刺激视频女| 中文字幕久久青青| 舌头伸进我下面很爽的动态图| 国产黑色丝袜视频在线观看| A级成年网站| 韩国激情高潮无遮挡hd| 国产人成精品午夜在线观看| 永久AⅤ网站| 亚洲AV色先锋资源电影网站| 在线日亚洲欧美视频| 亚洲精品m在线观看| 超级乱婬Av片免费网站| 东方AV在线点击进入| 呦泬泬精品导航| 好爽好硬好大偷拍视频| 五十路熟女久久| yy111111少妇影院动漫| 视频午夜福利2000| 欧洲无人区天空码头iv| 久久久久久久岛国免费AⅤ片| 精选国产日本韩国三级| 午夜福利000| 波多野结衣亚洲图片| 亚洲成aⅴ人片久青草影院 | 一进一出抽搐欧美| 7777Av在线| 一区二区三区精品免费视频| 6080YY理论三级在线看视频| 欧美 亚洲 精品 三区| 黑人巨大白妞出浆| 国产美女流白浆| 丝袜久久剧情精品国产| 日本少妇晚上拍拍拍拍| 尤物yw在线视频| 国产美女高潮流白浆视频网站| 与女乱目录伦视频在线观看| 又黄又硬又爽又色的视频| 2021AV天堂网手机版在线| 欧洲Av天堂| 国产一区二区二三区| 亚洲国产日韩在线人| 亚洲 中文 女同| 中文资源AV在线| 999热久久这里只有精品| 久久久久久精品免费亚瑟| 国产美女高潮流白浆视频网站| 国产va免费精品高清在线| 亚洲性人人天天夜夜摸| 丁香婷婷激情综合俺也| 国产福利免费爽爽视频| 无套粉嫩国语对白| 五月天丁香视频在线精品 | 50妺妺窝人体色www| 丁香婷婷激情九月视频| 日韩欧美一区二三区风间由美| 亚洲涩社区| 少妇 午夜 福利| bt在线一区二区| 波多结衣中文字幕| 99久热国产精品视频尤物| 正在播放国产强啪| 性XXXX欧美老妇胖老太肥婆| 伊香蕉大综综综合久久| 国产精品老女人精品视频| 婷婷 五月 香蕉| 最新手机国产在线小视频| 好爽好紧免费30分钟视频| jizz韩国日本护士| AV性天堂高清在线观看| 99综合狠狠| 国产免费AV片在线观看下载| 日韩欧洲免费视频一区| 欧亚熟女手机视频| 久久香蕉精品香蕉| 室友内裤被涂满了春药| 国产成年人拍拍免费视频| CHINESE乱子伦XXXX视| 视频精品在线| 国产尤物yw在线观看| 久久厕所偷窥视频| 95偷拍午夜Tv视频| 尤物视频在线| 国产免费的野战视频| 美女被啪到深处抽搐视频| 久久国产偷任你爽任你| 久久1024| 狼人视频国产在线视频www色| 黑森林精品导航AV网| 亚洲AV最新天堂网址| 狠狠操免费视屏| 131美女爱做高清免费视频| 最刺激的乱惀小说喷水| 国产交换配乱婬视频偷最新章节 | 草裙社区免费视频一二三区| 萌白酱国产一区在线网址播放| 欧美性爱在线| 西西人体444wwf高清大但| 久久精品91麻豆| 国色精品专区精品亚洲| 国产精品十区视频| 印度人粗长硬強行配种| 久久精品亚洲男人的天堂| 在线日韩欧美国产二区| 交换配偶国产精品| 26uuu欧美一区| 在线视频男人的天堂| 午夜福利推油按摩电影| 中文字幕乱换在线| 欧美va天堂吧| 潮中文字幕在线观看| AV日韩精选| 亚瑟国产精品久久| 国内永久福利在线视频| 国产片尤物网站在线观看| 西西人体午夜视频| www日韩欧美| 好多水好硬好紧好爽视频下载| 大成色站www| 丝袜国产在线| 在线观看美女白浆| 国产一卡二卡三卡四卡视频| 另类老熟女hd| 精品国产清自在天天线| 99久久狠狠| 亚洲午夜AV福利| 国产69视频在线播放| 午夜性刺激影院| 亚洲 欧美 日韩 综合AⅤ电影| 欧美精品v欧洲精品| 亚洲 国产 小屁孩| 888米奇在线视频四色| 亚洲精品小说视频高清| 国产偷国产偷亚洲高清日韩| 国产精品亚洲欧美卡通动漫| 又黄又爽的动漫视频网站| 久久99亚洲精品片片| 高清国语自产拍免费| 久久久午夜精品理论片| 二级人成电影免费| 国产免费人成视频在线观看播放| 92免费午夜福利1000合集| 久久大香香蕉国产免费网动漫| 国产放荡社区AV剧情演绎| 豪妇荡乳1一5潘金莲| 888米奇色狠狠俺去啦| 亚洲精品高清国产一线久久| 把日出女人白浆免费视频| 亚洲日本aⅴ片在线观看| 天天做天天摸天天爽天天爱| 污视频在线观看网站| 免费一级最婬荡视频| 高潮gif欧美一区| 亚洲欧洲日韩综合一区在线免费| 欧美性爱在线播放不卡| 伊人久久综合视频| IGAO视频天堂给爱激情| 污污污厕所白慰喷水网站| 精品国产自在精品国产精华天| 波多野结衣AV大高潮在线观看| 亚洲精品少妇30p| 好吊色午夜免费在线观看| 荔枝视频在线观看免费最新 | 久久综合亚洲一区云霸| 天天躁夜夜躁狠狠久久| 久久国产精品一区二区三区| 日本强奷中文字幕在线播放| 尤物yw193国产免费观看| AV女人高潮大喷水在线看| 久久久久久久久美女| 国内精品自线在拍2019不卡| 超黄超污网站| 97偷自拍亚洲综合| 456亚洲人成影院在线观看| 2020国自产拍精品网站不卡| 丁香五月网久久综合| 不卡无遮挡H肉动漫在线播放| 97久久久免费观看| 把腿张开我要添你下面| 欧美XX00后进式在线观看| 国产影院欧美性爱在线观看| 国产国产人在线成免费视频狼人色| 主播视频 国产一区| 天堂在线精品亚洲综合网| 真实处破女AV| 亚洲欧美日韩综合一区二区| 国产女与黑人在线精品| 日产国产欧美高清一区| 欧美性大战久久久久久i一| 久久精品欧美日韩精品手机放 | 又色又爽又黄又浪视频网站| 亚洲男同免费播放网站| 成 人 网 站 免费 在 看| 国产高清乱理伦片中文| 关晓彤高潮18以下禁止观看| 中文字幕aⅤ在线| 亚洲大肥熟女四五十路| 孕妇流白浆喷水视频| 岛国国产在线观看网站| 粉嫩国产尤物在线| 午夜福利日本女| 亚洲欧美日韩在线一区二区三区| 天堂在线看a| 五月天伊人久久大香线蕉| 3344永久在线观看视频| yy11111111手机版少妇`院| 老色AV综合| 性欧美乱妇come| 精品视频网站午夜| 白丝制服在线播放| 国产大胆露出在线视频| 日本亚洲语音视频在线观看 | 国产A久久精品| 麻豆久久婷婷国产综合五月| 国产chinese男男gay视频网| 日本大乳高潮视频在线观看| 中文字幕韩国激情视频网站| 9l国产精品久久久久尤物| 国产午夜亚洲精品不卡下载| 最新91在线老师啪国产| 欧美交换配乱吟粗大视频| 亚洲人成色7777在线观看| BBw下身丰满少妇18XXXX| 尤物动漫视频在线观看网址| 野外巨胸喷奶水视频www网站| 亚洲无线观看国产精品| 亚洲成a 人片| 张开腿无遮挡欧美| 亚洲国产免费男男| 制服丝袜中文字幕在线| 美女胸又www又黄的网站免费| 少妇太爽了在线观看| 999福利精品久久久| 看片在线高清在线观看| 3D黄动漫在线h| 国产无套双飞露脸两女在线高清| 最新久久国产AV| AV天堂永久网| 日日摸日日碰日日狠狠| 熟女毛茸茸bbw、bbw| 在线国产尤物导航| mm1313国产精品久久| 西西人体扒开下部试看120秒| Av不卡一区在线看| 国产一区私拍| 在线免费观看毛多多又长又黑| 亚洲成 人 综合 亚洲欧洲| 久操五月天| 粉嫩虎白女p虎白女在线| 亚洲成女人影观| 嫩模白浆| 久久天堂色| 白丝袜美女脚交国产精品 | 成 人 黄 色 网 站 视频麻豆8| 国产高清AV首播原创麻豆| 99久久99久久| 网站在线观看污| 2020国产精品永久在线| 成年视频一区二区三区| 囯产激情360盗攝一区| 国产女精品视频网站免费蜜芽 | 狠狠se网站| 妺妺窝人体色WWW视频| 成a人片在线视频观看| 极品少妇被猛的白浆直流| 国产欧美va天堂在线观看视频| 国产激情导航| 亚洲老熟女 @ tubeum tv| 日本高清中文字幕在线观线视频| 在线午夜视频网| 中国女人679xxxx高清视频 | 欧美国产一区二区三区| а√天堂网在线亚洲www| aa站在线观看免费| 国产精品性爱| 欧美日韩视频777888| 真实乱子伦露脸自拍| 亚洲成aⅴ人在线观看| 国产一级特黄大片全集在线观看 | 中国老太婆grdnnytube| 中国小younv女younv网站| 在线免费白嫩紧| 亚洲国产女人AV| 白嫩极品女粉嫩喷水视频的 | 成AV人色在线观看| 啊慢点太深了国产在线视频| 国产很爽的超薄丝袜脚交网站| 久久与与欧美视频| 中文字幕你懂的| 成AV日韩精品| 国外欧美一区二区久久| GAY18高中生白袜XNXX动| 老师穿着旗袍肉色丝袜让我玩| 性色aⅴ在线观看免费| 欧美午夜福利视频| 12—14幻女WBBXXXX在线播放| 九九久久香蕉| 久久精品女人天堂AV下载| 在线观看网站深夜3D催精| 粗大猛烈进出高潮免费看| 韩国AV高清在线观看完整版| 亚洲最大激情中文字幕| XXX国产精品XXX| 福利导航大全福利视频| 国产高清拍AV在线| 亚洲图片手机视频手机小说| 极品中国少妇被黑| 免费a级作爱片免费观看美国| 欧美又粗又大免费视:| 亚洲人成色7777在线播放| AV天堂 手机在线观看| h动漫无遮挡成人h视频| h福利小视屏| 夜夜春视频| 亚洲 中文 欧美 日韩 在线| 精品久久中文无| 男人的j桶女人免费网站| 亚洲 中文 女同| 国产高清吹潮免费视频| 午夜在线网站观看小舞被操| 久青草国产97香蕉在线影院| 81cao草棚gao进入| 日本亚洲色大成网站www久久 | 自拍偷区亚洲及综合第一页| 理论片福利片在线观看欧美| 国产乱子伦在线观看| 亚洲私人在线观看| 青青草原伊人AV网| 日韩在线24视频| 亚洲激情四视频| 国产免费网址看AV片| 亚洲成年女人AⅤ| 97夜夜澡人人双人人人喊| 伊人久久综合色97| 中文字幕欧美日韩一区二区三区| 午夜福利在线观看| 亚洲呦齿在线观看| 国产小电影网站在线观看| 一本大道一卡二卡免费视频| 337p日本大胆顶级欧美艺术| h片网站永久免费| 国产精品1024永| 91白浆视频在线| 中国国产高清免费AV片爱| 粉嫩高中生第一次不戴套| 在线丝袜制服性爱视频| 日本口工全彩生肉无遮挡| www伊人久久| 成人夜晚爱做免费观看| 狠狠cao高清免费| 国产真实交换免视频| 午夜激情福利网| 2020国产在视频线自在拍情侣| 按摩午夜福利片| 男人的j把女人的j桶爽了| h色在线免费看| 国产无套进入| 亚洲第一尤物视频在线观看导航| 最好在线观看免费韩国日本电影| 欧美多毛xxxxx性喷潮| 国产又色又爽又黄刺激在线视频| 日本韩国免费一区| 亚洲欧美人兽在线| 成 人 H动 漫在线播放日本| 亚洲永久免费网站在线观看| 色香欲天天天影视综合网| 又爽又黄又无遮挡的激情照片| 下面好紧好想添在线观看| 亚洲视频区蜜月| yw193 国产在线| 免费网禁国产you女网站下载| 777成影片免费观看2020| 中文字幕乳大人与日本| 777电影院米奇888| 国产高h视频| 日本gifXXOO动态图片| 好紧好痛的视频| 在线综合亚洲欧美日韩| 国产精品va尤物在线观看| 波多野结衣好大好紧| 亚洲a∨人成网站高清| 久久亚洲综合精品99国产| 在线看揉美女胸的免费网站| 久久久久国色免费| 伊人五月网在线视频| 国产美女高潮流白浆的视频| 白浆高潮在线| 5858S亚洲色大成网站WWW| 最新精品视频2020在线视频| 国产高清自产拍Av在| 亚洲Av手机在线观| 国产一级高清| 久久精品国产黄瓜| 国产午夜福利在线小视频| 日本丰满少妇一区二区| 久久久久久久69堂| 色爽爽网站免费看| 高潮中出在线资源| 中文字幕aⅴ第一页在线| 女教师巨大乳孔中文字幕| 193尤物视频com| аⅴ天堂亚洲最新版在线中文| AV色爱天堂网| 肏屄日出白浆视频| 亚洲偷少妇熟女| 久久久直播三级| 精品国产爱在线观看| 在线观看污网站| 俄罗斯午夜福利片在线观看| 色波多野结衣AV在线| 国产挤奶水主播在线播放| 粉嫩小泬图片国产20p| 国产麻豆精品第一页| 国产精品亚洲片在线VA| 免费午夜福利电影网| 老司机久久一区二区三区| 国产深喉口爆吞精在线观看| 香蕉国产线看观看| 有码高清在线视频| 亚洲欧美日韩一区在线观看| 思思久久美女软件免费观看| 九月丁香婷婷综合一区 | 久久久久精品国产99久久综合| 在线视频五十路| 国产嗷嗷叫高潮视频| 国产在线草莓aⅴ精品| 久久精品久久久久久不卡齐齐| 国产鲁鲁色| 国偷自产第一区| 浮力影院草草影院CCYYCOM| 亚洲色大成WWW永久网站| 午夜色午夜视频之日本视频| 久久69中文字幕精品| 天啪天啪综合在线视频| 国产三区在线播放| 欧美ai精品视频| 真实国产乱子伦精品一区二区三区| 粗大猛烈进出高潮网站| 波多野结衣紧身裙丝袜AV| 内地china麻豆videos| 99香蕉国产精品偷在线观看| 五十路熟女中出| 久人人爽人人爽人人片AV| 五月天性爱在线观看| 宅男噜噜噜66网欧美| 伊人五月天视频播放| 8x8ⅹ在线永久免费视频| 五月丁香六月狠狠爱综合| 2022免费在线视频网站| 很黄很色很刺激的免费网址| 好大好硬好湿免费视频| 国产在线欧美一区二区| 在线观看污网站| 亚洲中文字幕一本久道热线在线| 国内少妇自拍区视频免费大全| 综合在线视频精品专区| 中文字幕 第2页| 亚洲日韩欧美一区二区三区| 大香伊蕉在人线国产网站47| 波多野结衣好大好紧| 就是色国产成 人 综合 亚洲| 欧美人最猛性XXXXX| 又大又长又硬的免费视屏| caoporn视频在线观看| 国产一级熟女高清| 伊人熟女少妇| 国产亚洲aⅴ在线观看 | 好爽好刺激的视频在线观看| 高潮爽死痉挛白浆视频| 一个人www视频在线国产| 国产亚洲美女精品久久| 国产AV大学生情侣AV| 亚洲精品456播放| 亚洲第九色区在线视频| 自拍肉在线观看| 日本在线h一区| 麻豆娇妻偷情视频| 久久午夜张柏芝| 两根黑人粗大噗嗤噗嗤视频| аⅴ中文天堂最新版在线网站| 自拍午夜精品| 伊人久久大香线视频| 伊人精品久久久大香线蕉| 粗了大了整进去好爽视频| 欧美人成网站免费大全| 欧美色哟哟激情在线视频| 亚洲综合图区在线| 不卡的高清AV影院| 久久综合色中文字幕| 国产亚洲综合| 亚洲婷婷狠狠爱激情综合基地 | YW尤物在线精品视频| 成 人 黄 色 网站 视频免费| 老太bbwwbbww高潮| а√天堂网在线亚洲www| 粗了大了整进去好爽视频| 在线看亚洲十八禁APP| 新婚少妇无套内谢国语播放,| 国产午夜一级| 成年最新天天夜夜人人| H片在线播放免费观看| 污网站在线观看!| 中文字幕国产精品久久久| 婷婷久久香蕉五月综合网| 黑人粗长五月天在线观看| 丁香五月七月综合激情| 安眠药扒开女同学双腿玩弄| 美女AV网址| 两根粗大噗嗤噗嗤欧美精品| 亚洲大学生视频在线播放| 久久国产精品激情视频| 中文字幕日韩精品一区二区三区| 女教师大口吞精视频| 亚洲精品国产品国语在线| 国产00高中生在线无套进入| Jizz国产色系免费| 国产免费制服丝袜AV| 国产在线拍揄自揄视频网站| 无遮掩小视频国产在线观看| 青青草原综合久久大伊人精品| 亚洲一区二区三区草莓| 亚洲国产日韩一区二区在线| 亚洲美女综合久久| 动漫精品一区二区三区视频| 在线观看sm重口一区二区| 深夜XX00美女高潮视频| 在线观看手机AV网站| XXⅩ中国免费视频| 国产大学生AV片在线观看| 337p日本大胆欧美人术艺术69| 国产h在线看| 亚洲色动态视频| 1024你懂的国产国语| 美女视频黄频a免费高清不卡| 香港三日本三级少妇三级2021| 无套中出白嫩少妇| MM131美女黄的久久免费视频| 国产精品永久免费嫩草研究院| 国产黑色丝袜小视频在线| 开心五月丁香花综合网| 我和亲妺在客厅作爱视频网站| 天天夜夜人人爽| 最新久久少妇发布中文地址| 东方AⅤ免费观看久久AV| 欧美 亚洲 精品 三区| 综合亚洲无线码另类MP4| 一本大道久久精品| 亚洲天堂网www在线| 嗯啊国产视频| 国产愉拍视频在线观看| 亚洲第一网站男人都懂2021| 16处破外女视频在线观看| 又粗又大又紧又爽视频| 亚洲熟女少妇视频| 成年女人观看永久免费视频| 久久福利青草狠狠午夜| 波多野结衣被干视频| 精品国际久久久久999| 在线国产视频有码字幕| 天天爱天天做天天爽| 巨大videos喷水| 8x免费视频国产专区| 2018天天弄国产大片| 国产AV剧情遭闺蜜嫉妒陷害| 亚洲va韩国va欧美va| 亚洲人成网站观看在线播放| 一级少妇日本| 一区二区二区视频磨豆腐| 亚洲欧美日韩在线| 波多野结衣AV一区二区在线观看| 精品丝袜国产自在线拍高清| chinese多毛丰满video| 多人强伦姧孕妇免费看| 国产精品福利精品福利短片 | h高潮娇喘抽搐喷水视频免费| 亚洲色伊人久久综合| 青青草原宗合久久大伊人精品视频| 69老司机在线视频| 白丝袜足j丝袜在线观看| 色爽网免费视频| 特级婬片女子高清视频色| 国产91AV免费播放| 国产亚洲视频在线观看| 亚洲天天堂在线观看| 天天做天天欢摸夜夜摸狠狠摸| 免费国产乱理伦片在线观看| igao在线观看视频网| 一区二区免费视频| 成 人 网 站 在线668| 可以免费看AV的网站| 少妇夜夜春夜夜爽夜夜叫| 粗大猛烈进出高潮免费看 | 亚洲乱码尤物193yw影院| 日日摸处处碰夜夜爽| 综合在线观看| 很很鲁在线视频97很很鲁在线视频综合| 997 久久 国产| 又黄又爽又大又猛| 亚洲精品老司机在线观看| 国产欧美现场VA另类| 久久这里有精品国产电影网一区二区 | 春药刺激国产老富婆露脸 | 七月丁香八月婷婷| 亚洲Av男男| 精品午夜福利在线视频| 国产麻豆aⅴ尤物网站尤物| 亚洲成A√人片在线| 国产脚交榨精视频| 强 暴 处 疼 哭 身子视频| iGAo视频网站在线观看| 精品波多野结衣AV字幕在线 | 国内精品久久久久国产盗摄| 手机在线播放AV满18| ass日本少妇p| 男人桶爽女人30分钟视频| AV免费试看| 狠狠久久久| 最近最新高清中文字幕大全| 亚洲综合图片区自拍区| 国产在线qv| 亚洲久本草在线中文字幕| 2020免费国产a国产片高清| 九九九九精品视频在线观看| 邪恶网址在线观看免费喷水| 久久久免费人成| 久久免费视频6| 国语乱码中文字幕| BBBBBXXXXX偷拍| 色窝窝手在线视频| ww视频在线观看免费播放| 日批视频在线| 中文字幕在线看| 好吊妞在线免费视频| 日本激情中文有码| 又长又粗又大黑人免费视频| 国产灌醉视频免费观看| 888米奇影院奇米网狠狠| 日本高清在线观看免费视频| 2022国产高潮在线观看| 福利片国产在线观看| 5678电影网午夜理论片| 尤物日韩在线| 69天堂在线观看免费视频| 宅男噜噜噜66网站| yy111111少妇影院喷水仙踪林| 五月丁香花激情综合网| 影音先锋色AV| 又爽又黄又无遮| 真人作爱试看120分钟30| 亚洲熟女一区二区三区| 国产精品视频二区不卡| 2022在线精品视频网站| 久久香蕉国产线看观看网| 21AV少妇导航| 九月婷婷亚洲综合成人| 综合五月婷婷麻豆| www日日夜夜操| 99色综合| 把老师下面日出水视频| 日韩亚洲国产综合ΑV高清 | 伊人久久大香线蕉AV一区二区| 在线观看亚洲十八禁网站| 伊人狠狠综合| 狂野欧美午夜福利视频| 国内自拍日韩| 久久女视频| 在线观看国产福利91啪| 久久天天谢日日谢| 国产高清午夜国产精品| see国产精品免费| 网友自拍露脸国语对白| 国产无套乱子伦精彩是白视频高清下载| 高清成年美女网免费视频| 不卡的高清AV影院| 在线 日韩 欧美国产社区| 性色AV网站在线观看| 女人脱裤子让男人桶到爽| 亚洲第一天天久久| 久久99精品福利久久久| 99久久er热在这里都是精品66| 国产美女高潮流白浆免费视| 伊人心综合视频| 伊人久久大香线蕉综合直播| 奇米影视777奇米弟四色| 24小时日韩视频在线免费观看 | 亚洲有码视频一区二区| 日本福利导航视频| 国产chinesehdxxxx老太婆| 窝窝影视午夜看片免费| 欧美日韩精品一区二区三区高清视频| 高清A级在线观看视频| 青青草原精品视频美女大全| 国产美女被遭高潮免费视频 | 波多野结衣与乡下老人在线| 激烈的性高湖波多野结衣| 99RE热视频这里只精品4| 东北女人毛多水多视频| 国产精品尤物铁牛tv| 欧美13一14娇小性| www色多多色亚洲色| 粉嫩美女在线看网站| caoporm超免费公开视频| 麻豆久久国产综合五月| 亚洲依依成人精品| 亚洲天堂AV有码在线| 国产丝袜视频网| 国产肛交精品| 69免费性爱视频| 国产露出视频在线观看| 你懂的欧美视频在线观看| 国产卡1卡2卡3麻豆精品免费| 中国性BBBBBxxxxx另类| 在线观看免费视频高h| 日日爱 小视频| 亚洲—日韩aV在线| 午夜精品网址| 亚洲国产真实视频网站| 粉嫩高中生第一次不戴套| 喷潮视频| 国产伦精品一区二区三区| 亚洲国产欧美在线观看| 极品白人性网站| jizz国产大全免费关看| 国产永久免费BBw| 亚洲欧美高清一区二区三区| 国产最新视频网| 风流老太婆大BBwBBWBBWHD| 小蝌蚪亚洲精品国产| 国产欧美18p| 中国毛茸茸BBXX| 插插网天天影视综合网 | 国内在线一区二区三区| 色窝窝亚洲AⅤ网| 免费精品国产一区二区三区| 日本口工全彩生肉无遮挡| 俄罗斯13一16泑女| 国产AV高清网站| 成年网站免费拍拍拍拍| 在线观看亚洲你懂得| 精品国内在视频线2020视频在线看| 黄文一区免费| 亚洲精品456在线观看| 又黄又爽做受视频免费看| 亚洲a在线v| GOGO全球大胆高清luo模在线电影免费视频| 在线视频资源你懂的| 韩国免费a级作爱片试看| 最新日韩AV电影久久网| 香港曰本韩国三级网站| 美女的胸又黄又www网站| 夫の上司に犯波多野结衣| 欧美成 人影院在线观看| 亚洲欧美一区男男| 91青青草原视频| 2022AV网站在线播放| 999热在线精品免费观看| 久久久久久久岛国免费AⅤ片| 伊人久久大香线蕉综合网| 国产精品人成在线观看| 亚洲VA在线∨A天堂VA欧美V...| 一级欧美日韩理论片| 少妇高潮流白浆在线观看| 又黄又涩又高清免费视频网站| 囯模人体直播在线不卡| 欧美老妇BBB| 粉嫩无套白浆第一次| 国产红杏AV在线网站| 日本中文鲁啊鲁在线播放| 黑人video粗暴亚裔娇小| 97色伦在色在线播放三级| 国产短视频在线天堂| 国产精品自在线天天看片| HD女人奶水授乳milk电影| 国产AV二女共侍一夫| 少妇有码在线观看| 国产又刺激又黄又免费的视频 | 日本大综合色| 中文字幕亚洲分区| 国产乱子伦真实| 天堂αv亚洲αⅴ国产αv| 亚洲精品你懂的| 2020年国产最新在线观看| 欧美大黄免费观看视频| 卡通动漫第一页AV天堂| 亚洲国产欧洲综合997久久| 亚洲色大成网站www久久九尤物| 国产又爽又黄又无遮挡网站| 精品三及片网上| 热の国产AV| 大又大粗又爽又黄少妇毛| 白丝jk国产一区二区三区| 日本亚洲语音视频在线观看| 国产农村一级精品视频| 亚洲爱V天堂| 97se亚洲综合在线| 久久青草精品免费免费| 熟女老女人的网站| 亚洲色拍,国产,另类| jk女高中生捆绑调教正在播放 | 又湿又紧又大又爽A视频| AV网站在线观看天堂| 真人高h肉肉视频在线观看| 精品国产免费人成网站| 亚洲色丰满少妇高潮18p| 日韩一区二区三区中文在线观看| 成 人 色综合 综合网站| 97人人添人人澡人人澡人人澡| 好男人社区久久www| 国产精品美女白嫩在线播放 | 欧美日韩一本二本三本在线观看| 亚洲最大视频网站| 2021久久这里有精品免费| 中文字幕无线观看免费 | baoyu永久免费视频网站| 女人露P毛视频·WWW| 国产性生活爱视频| 日日摸夜夜狠狠| 亚洲日本va中文字幕在线直播| gv在线看片| 成人精品免费视频在线观看| 久久久久a v| 进去粗粗硬硬紧紧的好爽在线视频 | 91视觉盛宴在线| 800中文字幕在线免费观看| 色窝视频在线在线视频| 老司机午夜视频十八福利| 亚洲人成18| 亚洲乳头视频不卡| 午夜情视频午夜性视频| 自拍欧美人类综合在线| 聚色我要色综合| 西西人体444www高清大但| 国内精品久久久久久久久齐齐| 亚洲人成色7777在线看| 天堂俺去俺来也WWW色官网| 欧美日韩一级夜添| 国产大学生真实视频在线| 伊人天天躁夜夜躁狠狠| 亚洲综合色在线视频www| 岛国AAAA级午夜福利片不卡| 日本成人字幕在线不卡| 中国XXXX片免费| 国产美女视频一区二区三区| 久章草在线视频免费观看| 97午夜理论电影影院| 孕妇网站在线观看导航| 变态另类调教在线视频区| 国产精品一区二区在线观看| 小泽玛利亚久久一区二区三区| 午日日日夜夜夜| 在线观看免费的你懂的| 正在播放国语对白| 国产高清 在线播放 26uuu| 国产邪恶精品| 亚洲综合天天夜夜久久| 又湿又紧又大A视频| 影音先锋男人网| 亚洲欧美不卡高清在线观看| 国产午夜小电影网站| 天天爽天天摸日本一区二区 | 国产强奷视频在线观看| 久久综合九色综合欧美狠狠| 欧美毛多水多日本一区二区| 国产乱码一二三区四区| 浪潮一区国产| aaaa性bbbb欧美| 成 人 黄 色 网 站 视频麻豆8| 24小时日本中文在线| 性欧美长视频免费观看不卡| 青青草原视频在线观看网站| 加勒比久久综合网高清| 岳的水多肥嫩鲜艳| 在线 欧美 h 三区| 亚洲美女不天堂| 国产在线第一区二区三区| 极品美女国产精品| 羞羞午夜男女爽爽成人影院一| 亚洲精品aⅴ在线观看| 91欧美影院| 激情综合婷婷丁香五月尤物| 在线人成精品免费视频| 久久久国产视频| 色哟哟专区在线观看| 国产黑色丝袜在线看网红| 一级午夜福利| 亚洲中文字幕日产喷水| 大香网伊人久久综合网2018| 在厨房被强行侵犯中文字幕| 992tv在线视频| 久久久888精品视频| 欧美精品一区二区三区免费| 92国产午夜福利1000集20| 一本大道香蕉久在线播放看| 自在线看精品国产高| 亚洲小说图片综合在线专区l| 国产日产欧美A一集在线观看| 自拍偷拍1区2区3区不卡| 波多野结衣AV在线不卡| 在线观看国产一区二区三区| 国产成片一卡二卡三卡| 久久艹国产麻豆| 2021一级α片免费看| 色窝窝蝌蚪在线精品免费| 影音先锋va色资源站| 国产精品igao视频| 2020最新国产在线中文不卡| 异族女人性变态另类| 日本成本人片免费久久| 亚洲另类在线一区二区三区| 日韩AV免费| 国产农村乱子伦精品视频| 太大了太粗到底了h视频| 电视剧 第一页,欧美换爱交换| 一次迷晕三个国产| 亚洲综合一区自偷自拍| yw网站在线观看不卡| 欧洲色一欲网| 亚洲 中文 字幕永久免费| 午夜日本理伦中文字幕片| 国产美女粉嫩无套在线观看| 深夜刺激免费网站| 亚洲精油在线观看| 高中美女粉嫩视频福利在线| 永久天堂网Av手机在线| 国产精品99爱免费视频| 伊人大香线焦狠狠鲁的视频最快| 在线日本妇人成熟免费| AV女人天堂| 最近更新中文字幕第1_| 2019最新国产不卡a精品2018| 国产微拍精品一区| 欧美老肥妇多毛xxxxx| 尤物视频在线导航| 国产美女高潮流白浆的视频| 日本男女交性一区二区三区| 亚洲综合网986| 欧美白人最猛性xxxxx| 久久思思97视频| 亚洲国产午夜精品理论片| 国产精品福利精品福利短片| 国自产偷精品不卡在线| 在线视频播放免费视频| 又黄又刺激色多多| 2019国产品在线视频| 户外露出视频第一区| 国产自国产自愉自愉免费24区 | 国产杨幂AV在线播放| 国产精品福利免费视频不卡| 69堂网址在线影院| 操我在线播放导航| 午夜时刻免费实验区观看| 综合亚洲二| 白丝在线喷水免费| 高跟黑色丝袜国产在线s神| 亚洲一区二区精品观看| 国产一区二区欧美激情| 2020天天色| www.五月天| 永久天堂网免费2021AV手机版| 老色鬼在线精品视频| 波多野结衣高潮感受| 2020久久国产综合精品swag| 亚洲第一视角不卡高清在线| 白丝美女被内谢流白浆视频| h在线观看视频免费网址| 国产亚洲美女精品久久久2020| 最刺激的乱惀小说喷水网| 亚洲男人的天堂在线播放| 中文字幕色综合久久| 国产91AV免费播放| 爽爽视频免费版| 亚洲成a人片在线观看无吗| 亚洲女人的天堂| 久草香蕉97在线视频| 棚户区老熟女自拍视频| 欧美白人性受XXXX性高潮| 中国久久精品一级c片| 亚洲最大AV一区二区三区| 青青草原精品视频美女大全| 日本最新免费区中文| 杨幂国产在线观看| 小说区图片区综合区| 乱理片 最新乱理片2020年飘| 久久超级碰| 日韩免费A视频| 男女真人后进式猛烈小动态图| 国产熟睡乱子伦视频观看软件| 97日日摸夜夜添夜夜添高潮| 国产尤物视频91 在线| 在线观看网站污| 少妇午夜福利视频拍拍拍| 亚洲性爱视频网站| 办公室揉弄震动嗯~动态图 | 后进极品翘臀美女在线播放| 午夜福利图片网站| 野外巨胸喷奶水视频www网站| 小说区图片区综合区| 国产真实露脸多P视频播放| 粉嫩国产白浆在线播放| 成 人免费视频免费观看直播| 国产精品老女人精品视频| 在线丝袜制服性爱视频| 国产福利美女在线| 在线制服丝袜自拍日韩APP| 国产1区2区3区4区不卡| 国产女主播在线观看| 中文字幕无线码| 视频中文字幕第一页| 2022AV网站在线播放| 国产青榴社区视频在线观看| 国产在线拍揄自揄视精品不卡| 大香蕉天天噜在线视频| 国产免费破外女视频| AV在线精品网在线观看| 国产人成午夜免电影费观看| 69久久国产精品视频| 欧美小屁孩cao大人在线播放| 色爱综合另类图区| 欧美综合自拍亚洲综合百度| 成在线人AV| 制服丝袜另类专区制服| 2019中文字幕永久在线视频| 综合久久久综合欧美98| 在线视频第一页日本| 高清不卡一区二区三区香蕉| 久久精品国产99国产精2021| 丝袜美女喷水视频| h视频免费在线观看网址| 国产ktv交换配乱婬视频| 国产精品免费αv视频| 好男人在线社区www在线观看 | 免费人成视频x8x8入口| 日本无遮挡吸乳叫声视频 | 天天爽夜夜欢免费视频| 熟女乱妇| 中国老太婆BBWHD另类| 国产午夜时刻免费观看| 日韩AV中文在线观看导航| 欧洲 亚洲 国产| 久久久久久久久久伊春院视频| 免费视频久久久久| AA级女人大片喷水免费播放| 性,三级国产在线| 思思色免费视频| 午夜热门精品一区二区三区| 亚洲午夜福利在线观看首页| 亚洲一卡2卡三卡4卡 乱码| 亚洲成a人片在线观看中| 伊人久久大香线焦在观看| 一区2区3区4区女尤| 日韩一区二区久久久久久 | 久久AV免费观看| 国产综合一区二区在线观看| 中国女人679xxxx高清视频| AV天堂永久资源防屏蔽网址| 欧美成a网| 精品午夜福利1000在线观看| 在线看片a免费人成视频| 99护士自拍高潮在线| 男女肉粗暴进来120秒动态图| 亚洲а∨天堂2021在线网站| 亲趣三级在线18国产观看视| 伊人琪琪一区二区三区| 日日夜夜嗷嗷叫| 久久AVHD| 国产夜间视频| 中文有码在线嘿嘿视频| 亚洲a欧美| 国产亚洲日韩国网曝| 国产爆浆在线播放| 厨房掀起裙子从后面进去视频| 男人的j桶女人免费网站| 一线欧美精片一区二区三区| 自拍三区| 国产污网站在线观看15| 啦啦啦老逼视频| 国产精品91大屁股白浆一区二区| 国产区在线视频明星在线| 97超级碰碰碰碰久久久久| 亚洲国产综合777| 亚洲日韩中文字幕在线播放| 穿丁字内裤带着震蛋出门小说| 一起色一起碰一起爽| 亚洲444kkkk在线观看最新| 又黑又粗又硬在线视频| 国产福利免费视频不卡| 中文字幕日韩精品亚洲一区| G0G0日本肉体艺术激情| 国产aⅴ一区二区三区| 国产精品午夜理论片| 尤物在线视频国产m| 国产午夜免费不卡精品理论片| 在线播放AV满18| 国产精品jk白丝在线播放| 日本中文字幕有码| 国产免费AV片在线观看下载| 亚洲 欧美 日韩 综合aⅴ视频| 2019年天天夜夜爽| 成人又黄又爽又色的视频| 亚洲观看AV网址| 美AⅤ在线| 在线观看人免费视频| 成人一卡二卡三卡四卡视频| yeyecao亚洲夜夜综合久久| AV网站天堂手机版| 色就色 综合偷拍区第三十七页| 一区二区国产在线观看免费| 天天久久综合色| 色94色欧美SUTE亚洲线路一| 亚洲精品成人a在线观看| 紧身裙丝袜系列中文字幕| 成年网在线观看免费| 国产偷人激情视频在线观看| 日本三级a性爱在线免费观看| 国产va免费精品观看精品| 最近亚洲国产网页AⅤ| 最新地址一二三在线观看| 又大又黄的在线视频播放| 国产大全乱伦不卡中文字幕电影网 | 国产激情一区| 偷国内自拍视频在线观看手机版| 国产尤物喷潮在线观看| 91久久香蕉国产熟女线看观看| 日本三级香港三级韩国三级| 国产精品十区视频| 99久免费精品视频在线观看| 91尤物午夜网站在线播放| 中文字幕无线观看高清| 开心五月婷婷色婷在线| 中国一级特黄刺激刺激爽| 在线可以免费观看的Av蜜| 国产高h视频| jizz中出| 91青青欧美| 亚洲国产aⅴ精品一区二区| 久久不卡日韩美女| 99久久香蕉资源网| 小女生喷白浆在线播放| jiZZjiZZ日本护士水| 午夜福利国产精品久久熟国产| 久久国产午夜理论片| 久久社区久久久久| 狠狠狠久久久久| 狠狠se网站| 香港三日本三级少妇三级99下| 国产精品亚洲а∨天堂免在线| 日本韩国中文在线字慕| 国内少妇自拍区视频免費視色| 国产老肥熟xxxx| 2022手机在线免费观看视频网站| 美女自卫慰免费视频www免费 | 自拍卡通亚洲日本| AV在线网页大全| 亚洲中字慕日产2020| 色天堂最新网址| 亚洲视频你懂的| 野外性XXⅩXX大少妇| 十八禁在线观看无遮挡| 最新国产视频影院| 久久亚洲综合精品99国产| 两个人看的www在线观看| 九七电影院理论片在线观看| AV天堂最新手机网址| 永久69视频免费网址| 夜夜艹欧美| 又爽又黄又无遮挡的视频1000| 好男人在线观看直播视频| 亚洲色婷婷综合久久| 波多野结衣一区二区三区在线观看| 午夜男女XX00视频福利| 国产女与黑人在线播放| 国产原创巨作在线| 久久动漫一精品伊人| 国产失禁大喷潮在线观看| 2022AV网站在线播放| 久久综合亚洲色社区| 天天摸天天做天天爽| 99午夜视频| 丰满少妇被猛烈进入流水 | 538免费视频| 在线观看亚洲AV日韩AV| 69精品手机国产在线| 国产大学生视频合集| 网红三级视频在线| 亚洲人成网站999久久久全网| 亚洲熟女Av导航| 亚洲gay片在线网站| 国内精品伊人久久久久网站下载| 国产精品尤物网在线观看| 午夜电影福利| 国产干逼逼视频| 五月天婷婷在在线视频高清| 网禁拗女稀缺资源在线观看| 扒开双腿猛进入喷水视频| 国产污的网站| 亚洲欧美日韩精品综合网| 亚洲呦女视频| 国产极品白丝后进在线观看| 3D黄动漫在线h| 亚洲AV中文久久精品软件下载| 大香网伊人久久综合网2018| AV永久天堂网| 久久精品国产日本波多野结衣| 久久香蕉国产线看观看精品yw| 97精品伊人久久大香线蕉| 2020最新大胆偷拍美女视频| 尹人九九大色香蕉网站| 白丝jk在线观看喷水| 久久久激情视频| 97日日摸夜夜添夜夜添高潮| 嗯啊啊啊中文对白在线观看| 久久精品女人天堂aaa| 国产极品性奴| 中文字摹草美女Av| 五月婷婷麻豆| 亚洲成色全部综合网站| 女生十八禁不遮不挡网站| 国产一区二区www| 性少妇freesexvideos高清| 黄又色又免费的视频| www免费视频| 亚洲T∨在线观看| 日韩一级欧美一级在线看| 丝袜一区二区| 国产精品视频欧美激情专区| 国产白丝视频在线| 最新四色米奇影视777在线看| 女人天堂视频网| 亚洲狂野国产AV| 狠狠se网站| H肉动漫在线观看| 白浆AV网站导航| 国产国产人免费视频成| 麻豆丝袜脚交在线播放| 成年人片尤物视频| 国产白浆出水精选合集| 阿v天堂网2021在线观看| 影音先锋日韩在线高清AV资源| 中文有码亚洲中文| 波多野结衣初尝黑人巨大| 亚洲Av库在线观看国产| 2022免费在线视频| 变态另类第49页| www亚洲色| 一区二区三区精品视频| 精品1区1区3区4区产品乱码区| 中文字幕三级在| 欧美午夜精品一级| 国产a级特黄的片子免费| 啊啊啊嗯好痛好深啊视频| 国产高清在线视频小尤奈| 成人久久电影| 国产精品自产拍在线观看花钱看 | 在线看片免费人成视久网app| 国产精品二区三区免费播放心| 就去吻亚洲精品国产| 亚洲最大偷拍网| 亚欧视频高清在线播放| 日本大乳高潮视频在线观看| 菲菲学院亚洲欧美激情片| 又黄又刺激的免费网址| 色多多高清视频在线观看网站| 91视频免费观看| 国语自产拍在线视频中文| 亚洲一区电影在线观看| 美女裸胸又黄的网站| 在厨房被夫上司强迫中文| 喷 奶水 乳 码 视频在线| 在线小u 女国产| 在线视频男人的天堂| 国产护士囗交吞精视频| 男人扒开女人下面狂躁小视频| AV网站在线观看白丝| 朝鲜女人大白屁股ASS| 亚洲快速不卡的视频| 国内绝对真实医院偷窥短视频| 亚洲精品嫩草研究院| 午夜大片爽爽爽免费影院| www大成色永久| 亚洲女优一区| 久久久国产午夜理论| 男生把j桶进女的屁股里视频 | 又大又爽 视频免费| 亚洲欧洲国产伦综合| 大屁股少妇18p| 吃女邻居丰满的奶水在线观看 | 夜视频在线导航| 97电影在线免费播放| 337p日本| 日本A∨久久| 2020国产精品熟女天天更新| 第一页偷拍| 阿嗯疼啊午夜视频| 一级a做片性视频| 羞羞影院午夜男女爽爽| 国产亚洲日韩欧一级| 成人永久免费高清视频在线观看| 日本十八禁蜜芽tv网站| 日本视频一区二区三区h| 丁香五月开心婷婷综合缴情| 国产真实交换免视频| 国产亚洲中文日本不卡二区| 日本教师奶水四溅观看| 丁香婷婷色五月基地| 激情婷婷基地| 尤物国产在线| 公交车上拨开少妇内裤进入| 2021在线观看国产精品| 国产激情在线观看免费视频| 亚洲aⅤ视频在线观看| 2022在线免费视频| 丁香六月在线综合激情视频| 超级黑人巨黑吊sv中国女人| 吃女邻居丰满的奶水在线观看 | 亚洲精品偷拍福利视频| 久久精品免费观看国产日| 久久综合精品国产二区无| 国产亚洲精品AⅤ| 久久 女 hd| yy111111少妇光屁股影院免费观看 | AV人人夜夜澡人人爽| 白丝女高中生被c爽哭视频| 十八禁在线看网址| 亚洲午夜精品A| 精品精品精品国产自| 精品国产电影久久九九|