Channel 是干什么的#
The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations.
Channels are an implementation of the producer/consumer conceptual programming model.
以上是微軟官方的解釋 channels。用中文說(shuō)的話就是這個(gè)類提供了在生產(chǎn)者跟消費(fèi)者之間異步傳統(tǒng)數(shù)據(jù)的能力,簡(jiǎn)單來(lái)說(shuō)可以認(rèn)為是一個(gè)內(nèi)存消息隊(duì)列。
示例 1#
下面是一個(gè)簡(jiǎn)單的示例,說(shuō)明如何使用 Channel 類來(lái)創(chuàng)建一個(gè)生產(chǎn)者-消費(fèi)者模型:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(1000); // 模擬生產(chǎn)者需要一些時(shí)間來(lái)生成數(shù)據(jù)
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費(fèi)者接收到: {item}");
}
});
await Task.WhenAll(producer, consumer);
}
在這個(gè)例子中,我們創(chuàng)建了一個(gè)無(wú)界的通道,然后創(chuàng)建了兩個(gè)任務(wù),一個(gè)是生產(chǎn)者,一個(gè)是消費(fèi)者。生產(chǎn)者每秒生成一個(gè)數(shù)字,然后寫入通道。消費(fèi)者從通道中讀取數(shù)據(jù)并打印出來(lái)。當(dāng)生產(chǎn)者完成寫入后,它會(huì)調(diào)用 channel.Writer.Complete() 來(lái)通知消費(fèi)者沒(méi)有更多的數(shù)據(jù)可以讀取。
示例 2#
你可以使用 Channel.CreateBounded(capacity) 方法來(lái)創(chuàng)建一個(gè)有界的通道,其中 capacity 參數(shù)指定了通道的容量。當(dāng)通道滿時(shí),嘗試寫入的操作將會(huì)阻塞,直到有空間可用。
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<int>(5); // 創(chuàng)建一個(gè)容量為5的有界通道
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生產(chǎn)者生成了: {i}");
await Task.Delay(1000); // 模擬生產(chǎn)者需要一些時(shí)間來(lái)生成數(shù)據(jù)
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費(fèi)者接收到: {item}");
await Task.Delay(2000); // 模擬消費(fèi)者需要一些時(shí)間來(lái)處理數(shù)據(jù)
}
});
await Task.WhenAll(producer, consumer);
}
在這個(gè)例子中,我們創(chuàng)建了一個(gè)容量為5的有界通道。生產(chǎn)者每秒生成一個(gè)數(shù)字,然后寫入通道。消費(fèi)者從通道中讀取數(shù)據(jù)并打印出來(lái),但消費(fèi)者處理數(shù)據(jù)的速度比生產(chǎn)者慢,所以當(dāng)通道滿時(shí),生產(chǎn)者的 WriteAsync 操作將會(huì)阻塞,直到消費(fèi)者讀取了一些數(shù)據(jù),使得通道有空間可用。
示例 3#
下面是一個(gè)示例,展示了如何在多個(gè)生產(chǎn)者和消費(fèi)者之間共享一個(gè)通道:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
// 創(chuàng)建兩個(gè)生產(chǎn)者
var producer1 = Produce(channel.Writer, id: 1);
var producer2 = Produce(channel.Writer, id: 2);
// 創(chuàng)建兩個(gè)消費(fèi)者
var consumer1 = Consume(channel.Reader, id: 1);
var consumer2 = Consume(channel.Reader, id: 2);
// 等待所有生產(chǎn)者和消費(fèi)者完成
await Task.WhenAll(producer1, producer2, consumer1, consumer2);
}
static async Task Produce(ChannelWriter<int> writer, int id)
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"生產(chǎn)者{id}生成了: {i}");
await Task.Delay(1000); // 模擬生產(chǎn)者需要一些時(shí)間來(lái)生成數(shù)據(jù)
}
writer.Complete();
}
static async Task Consume(ChannelReader<int> reader, int id)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"消費(fèi)者{id}接收到: {item}");
await Task.Delay(2000); // 模擬消費(fèi)者需要一些時(shí)間來(lái)處理數(shù)據(jù)
}
}
在這個(gè)例子中,我們創(chuàng)建了兩個(gè)生產(chǎn)者和兩個(gè)消費(fèi)者,它們都共享同一個(gè)通道。這是一個(gè)非常重要使用模式。因?yàn)楫?dāng)我們使用消息隊(duì)列的時(shí)候往往會(huì)有多個(gè)生產(chǎn)者跟多個(gè)消費(fèi)者。我們可以通過(guò)控制生產(chǎn)者生產(chǎn)的速度來(lái)控制推入隊(duì)列的數(shù)據(jù)量。我們還可以通過(guò)控制消費(fèi)者的數(shù)量來(lái)控制消費(fèi)數(shù)據(jù)的速度,從而來(lái)調(diào)節(jié)系統(tǒng)的流量,達(dá)到消峰填谷的作用。
總結(jié)#
Channel 類是 .NET CORE 3.0 后新加入的類。為我們提供了便利的生產(chǎn)者/消費(fèi)者模式實(shí)現(xiàn)方案。相當(dāng)于是一個(gè)進(jìn)程內(nèi)的內(nèi)存隊(duì)列,而且它沒(méi)有持久化,純內(nèi)存操作,性能是非常非常高的。當(dāng)我們面對(duì)真正的高并發(fā)的時(shí)候可以為我們的系統(tǒng)提供吞吐量。當(dāng)然代價(jià)是內(nèi)存跟放棄一些實(shí)時(shí)性。
作者:Agile.Zhou
出處:https://www.cnblogs.com/kklldog/p/18201013/channel-in-net
版權(quán):本作品采用「署名-非商業(yè)性使用-相同方式共享 4.0 國(guó)際」許可協(xié)議進(jìn)行許可。
該文章在 2024/5/27 8:50:42 編輯過(guò)