Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье вы узнаете, как использовать библиотеку потоков данных TPL для реализации шаблона производителя-потребителя. В этом шаблоне производитель отправляет сообщения в блок сообщений, а потребитель считывает сообщения из этого блока.
Замечание
Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) включена в .NET 6 и более поздние версии. Для проектов .NET Framework и .NET Standard необходимо установить 📦 пакет NuGet System.Threading.Tasks.Dataflow.
Example
В следующем примере показана базовая модель производителя-потребителя, использующая поток данных. Метод Produce записывает массивы, содержащие случайные байты данных в System.Threading.Tasks.Dataflow.ITargetBlock<TInput> объект, и Consume метод считывает байты из System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> объекта. Выполняя действия с интерфейсами ISourceBlock<TOutput> и ITargetBlock<TInput>, а не с их производными типами, вы можете написать повторно используемый код, который способен работать с различными типами блоков потока данных. В этом примере используется BufferBlock<T> класс.
BufferBlock<T> Поскольку класс выступает как в качестве исходного блока, так и в качестве целевого блока, производитель и потребитель могут использовать общий объект для передачи данных.
Метод Produce вызывает метод Post в процессе цикла для синхронной записи данных в целевой блок.
Produce После записи всех данных в целевой блок метод вызывает Complete метод, чтобы указать, что блок никогда не будет иметь дополнительных данных. Метод Consume использует асинхронные операторы и операторы await (Async и Await в Visual Basic) для асинхронного вычисления общего количества байтов, полученных от ISourceBlock<TOutput> объекта. Для асинхронной работы метод Consume вызывает метод OutputAvailableAsync, чтобы получить уведомление, когда в исходном блоке доступны данные и когда в нём больше не будет доступных данных.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Надежное программирование
В предыдущем примере используется только один потребитель для обработки исходных данных. Если в приложении есть несколько потребителей, используйте TryReceive метод для чтения данных из исходного блока, как показано в следующем примере.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Метод TryReceive возвращается False , когда данные недоступны. Если несколько потребителей должны одновременно получить доступ к блоку источника, этот механизм гарантирует, что данные по-прежнему доступны после вызова OutputAvailableAsync.