.NET 高性能緩沖隊(duì)列實(shí)現(xiàn) BufferQueue
當(dāng)前位置:點(diǎn)晴教程→知識管理交流
→『 技術(shù)文檔交流 』
前言BufferQueue 是一個用 .NET 編寫的高性能的緩沖隊(duì)列實(shí)現(xiàn),支持多線程并發(fā)操作。 項(xiàng)目地址:https://github.com/eventhorizon-cli/BufferQueue 項(xiàng)目是從 mocha 項(xiàng)目中獨(dú)立出來的一個組件,經(jīng)過修改以提供更通用的緩沖隊(duì)列功能。 目前支持的緩沖區(qū)類型為內(nèi)存緩沖區(qū),后續(xù)會考慮支持更多類型的緩沖區(qū)。 適用場景生產(chǎn)者和消費(fèi)者之間的速度不一致,需要并發(fā)批量處理數(shù)據(jù)的場景。 因?yàn)槟壳爸挥袃?nèi)存版本,不適用于不允許數(shù)據(jù)丟失的業(yè)務(wù)場景。 功能說明支持創(chuàng)建多個 Topic,每個 Topic 可以有多種數(shù)據(jù)類型。每一對 Topic 和數(shù)據(jù)類型對應(yīng)一個獨(dú)立的緩沖區(qū)。 支持創(chuàng)建多個 Consumer Group,每個 Consumer Group 的消費(fèi)進(jìn)度都是獨(dú)立的。支持多個 Consumer Group 并發(fā)消費(fèi)同一個 Topic。 支持同一個 Consumer Group 創(chuàng)建多個 Consumer,以負(fù)載均衡的方式消費(fèi)數(shù)據(jù)。 支持?jǐn)?shù)據(jù)的批量消費(fèi),可以一次性獲取多條數(shù)據(jù)。 支持 pull 模式和 push 模式兩種消費(fèi)模式。 pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費(fèi)者在收到數(shù)據(jù)后自動提交消費(fèi)進(jìn)度,如果消費(fèi)失敗不會重試。manual commit 模式下,消費(fèi)者需要手動提交消費(fèi)進(jìn)度,如果消費(fèi)失敗只要不提交進(jìn)度就可以重試。 需要注意的是,當(dāng)前版本出于簡化實(shí)現(xiàn)的考慮,暫不支持消費(fèi)者的動態(tài)擴(kuò)容和縮容,需要在創(chuàng)建消費(fèi)者時指定消費(fèi)者數(shù)量。 使用示例安裝 Nuget 包: dotnet add package BufferQueue 項(xiàng)目基于 Microsoft.Extensions.DependencyInjection,使用時需要先注冊服務(wù)。 BufferQueue 支持兩種消費(fèi)模式:pull 模式和 push 模式。
pull 模式的消費(fèi)者示例:
push 模式的消費(fèi)者示例: 通過 BufferPushCustomer 特性注冊 push 模式的消費(fèi)者。 push consumer 會被注冊到 DI 容器中,可以通過構(gòu)造函數(shù)注入其他服務(wù),可以通過設(shè)置 ServiceLifetime 來控制 consumer 的生命周期。 BufferPushCustomerAttribute 中的 concurrency 參數(shù)用于設(shè)置 push consumer 的消費(fèi)并發(fā)數(shù),對應(yīng) pull consumer 的 consumerNumber。
Producer 示例: 通過 IBufferQueue 獲取到指定的 Producer,然后調(diào)用 ProduceAsync 方法發(fā)送數(shù)據(jù)。
BufferQueue 內(nèi)部設(shè)計概述Topic 的隔離BufferQueue 有以下的特性: 同一個數(shù)據(jù)類型 下的 不同 Topic 的 BufferQueue 互不干擾。 同一個 Topic 下的 不同數(shù)據(jù)類型 的 BufferQueue 互不干擾。
這個特性是通過以下兩層接口設(shè)計實(shí)現(xiàn)的: IBufferQueue:根據(jù) TopicName 和 類型參數(shù) T 將請求轉(zhuǎn)發(fā)給具體的 IBufferQueue<T> 實(shí)現(xiàn)(借助 KeyedService 實(shí)現(xiàn)),其中參數(shù) T 代表 Buffer 所承載的數(shù)據(jù)實(shí)體的類型。 IBufferQueue<T>:具體的 BufferQueue 實(shí)現(xiàn),負(fù)責(zé)管理 Topic 下的數(shù)據(jù)。屬于 Buffer 模塊的內(nèi)部實(shí)現(xiàn),不對外暴露。
Partition 的設(shè)計為了保證消費(fèi)速度,BufferQueue 將數(shù)據(jù)劃分為多個 Partition,每個 Partition 都是一個獨(dú)立的隊(duì)列,每個 Partition 都有一個對應(yīng)的消費(fèi)者線程。 Producer 以輪詢的方式往每個 Partition 中寫入數(shù)據(jù)。 Consumer 最多不允許超過 Partition 的數(shù)量,Partition 按平均分配到組內(nèi)每個 Customer 上。 當(dāng)一個 Consumer 被分配了多個 Partition 時,以輪訓(xùn)的方式進(jìn)行消費(fèi)。 每個 Partition 上會記錄不同消費(fèi)組的消費(fèi)進(jìn)度,不同組之間的消費(fèi)進(jìn)度互不干擾。
對并發(fā)的支持Producer 支持并發(fā)寫入。 Consumer 消費(fèi)時是綁定 Partition 的,為保證能正確管理 Partition 的消費(fèi)進(jìn)度,Consumer 不支持并發(fā)消費(fèi)。 如果要增加消費(fèi)速度,需創(chuàng)建多個 Consumer。 Partition 的動態(tài)擴(kuò)容Partition 的基本組成單元是 Segment,Segment 代表保存數(shù)據(jù)的數(shù)組,多個 Segment 通過鏈表的形式組合成一個 Partition。 當(dāng)一個 Segment 寫滿后,通過在其后面追加一個 Segment 實(shí)現(xiàn)擴(kuò)容。 Segment 中用于保存數(shù)據(jù)的數(shù)組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內(nèi)唯一的自增 Offset。
Segment 的回收機(jī)制每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經(jīng)被所有消費(fèi)組消費(fèi)完,回收最后一個消費(fèi)完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。
Benchmark測試環(huán)境:Apple M2 Max 64GB 寫入性能測試與 BlockingCollection 對比并發(fā),并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12, partitionNumber 為 1 和 12。 測試結(jié)果
在并發(fā)寫入時,BufferQueue 的寫入性能明顯優(yōu)于 BlockingCollection。 消費(fèi)性能測試pull 模式 consumer 與 BlockingCollection 對比并發(fā)讀取性能,并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12,partitionNumber 為 12。 測試結(jié)果
在批量消費(fèi)時,隨著批量大小的增加,BufferQueue 的消費(fèi)性能優(yōu)勢更加明顯。 轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/18331018 該文章在 2024/8/5 9:51:53 編輯過 |
關(guān)鍵字查詢
相關(guān)文章
正在查詢... |