Channels: Thread-Safe Async Communication
August 27, 2024
Channels provide high-performance, thread-safe communication between producers and consumers with backpressure support and configurable buffering strategies.
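Available since: .NET Core 2.1 via the System.Threading.Channels NuGet package; included in the framework since .NET Core 3.0. ReadAllAsync requires .NET Core 3.0 or .NET Standard 2.1.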
Basic Producer-Consumer
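using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
var writer = channel.Writer;
var reader = channel.Reader;
// Producer
await writer.WriteAsync(42);
writer.Complete(); // Signals that no more items will be written
// Consumer: WaitToReadAsync returns false once the channel is completed and empty
while (await reader.WaitToReadAsync())
{
    while (reader.TryRead(out int item))
    {
        Console.WriteLine(item);
    }
}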
Bounded Channel with Backpressure
var options = new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
    SingleWriter = false
};
var channel = Channel.CreateBounded<WorkItem>(options);
// When the channel is full, WriteAsync waits asynchronously (no thread is blocked)
// until a consumer frees a slot
async Task ProduceWork()
{
    for (int i = 0; i < 1000; i++)
    {
        await channel.Writer.WriteAsync(new WorkItem(i));
    }
    channel.Writer.Complete();
}
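To make the backpressure in FullMode.Wait concrete, here is a minimal sketch using a capacity of 1. The class and member names (BackpressureDemo, RunAsync, and the tuple fields) are hypothetical and exist only for illustration; the channel calls are the standard System.Threading.Channels API.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Hypothetical helper: shows how a full bounded channel pushes back on writers.
    public static async Task<(bool FirstAccepted, bool SecondAccepted, bool PendingCompletedEarly)> RunAsync()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        bool firstAccepted = channel.Writer.TryWrite(1);   // room available
        bool secondAccepted = channel.Writer.TryWrite(2);  // channel full: rejected

        // WriteAsync returns a pending ValueTask instead of blocking the thread.
        ValueTask pending = channel.Writer.WriteAsync(2);
        bool pendingCompletedEarly = pending.IsCompleted;

        await channel.Reader.ReadAsync(); // consuming one item frees a slot
        await pending;                    // the waiting write can now complete

        return (firstAccepted, secondAccepted, pendingCompletedEarly);
    }
}
```

In this sketch the first TryWrite is accepted, the second is rejected because the channel is full, and the WriteAsync call stays pending until the reader takes an item.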
Pipeline Processing
public static async Task ProcessPipeline<T>(
    ChannelReader<T> input,
    ChannelWriter<T> output,
    Func<T, Task<T>> processor)
{
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            var processed = await processor(item);
            await output.WriteAsync(processed);
        }
        output.Complete();
    }
    catch (Exception ex)
    {
        // Pass the failure downstream so readers of the output do not wait forever
        output.Complete(ex);
        throw;
    }
}
// Usage
var stage1 = Channel.CreateBounded<string>(50);
var stage2 = Channel.CreateBounded<string>(50);
// The stage runs in the background; it ends when stage1.Writer is completed
_ = ProcessPipeline(stage1.Reader, stage2.Writer, item =>
    Task.FromResult(item.ToUpper()));
Fan-Out Pattern
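// Requires: using System.Linq;
// Broadcasts every item to all destinations, so each destination sees the full stream
public static async Task FanOut<T>(
    ChannelReader<T> source,
    params ChannelWriter<T>[] destinations)
{
    await foreach (var item in source.ReadAllAsync())
    {
        // Waits for the slowest destination before reading the next item
        var tasks = destinations.Select(dest => dest.WriteAsync(item).AsTask());
        await Task.WhenAll(tasks);
    }
    foreach (var dest in destinations)
        dest.Complete();
}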
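FanOut above copies every item to every destination. When the goal is to spread work across workers instead, a common alternative is to let several consumers read from the same channel, so that each item is handled by exactly one of them. The following is a sketch under that assumption; CompetingConsumers, RunAsync, and the parameter names are hypothetical.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumers
{
    // Hypothetical helper: starts workerCount readers on one channel and
    // returns every item that was handled, sorted for easy inspection.
    public static async Task<int[]> RunAsync(int itemCount, int workerCount)
    {
        // SingleReader defaults to false, so concurrent readers are allowed.
        var channel = Channel.CreateBounded<int>(10);
        var handled = new ConcurrentBag<int>();

        // ToArray forces the workers to start before the producer begins writing.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    handled.Add(item); // stand-in for real work
                }
            }))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete(); // lets every worker's loop finish

        await Task.WhenAll(workers);
        return handled.OrderBy(x => x).ToArray();
    }
}
```

Which worker receives a given item is not deterministic, but every item is delivered once, which is why the result is sorted before it is returned.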
Producer-Consumer with Cancellation
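public async Task ProcessItems(CancellationToken cancellationToken)
{
    var channel = Channel.CreateBounded<WorkItem>(100);
    // Producer task. The token is not passed to Task.Run itself, so the
    // finally block always runs and the writer is always completed.
    var producer = Task.Run(async () =>
    {
        try
        {
            for (int i = 0; !cancellationToken.IsCancellationRequested; i++)
            {
                await channel.Writer.WriteAsync(new WorkItem(i), cancellationToken);
                await Task.Delay(100, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on cancellation; the writer is completed below
        }
        finally
        {
            channel.Writer.Complete();
        }
    });
    // Consumer: throws OperationCanceledException when the token is cancelled
    await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
    {
        await ProcessItem(item);
    }
    await producer; // Surfaces any non-cancellation failure from the producer
}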
AsyncEnumerable Bridge
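// Extension methods must live in a static class.
// On .NET Core 3.0+ the built-in reader.ReadAllAsync() already provides this behavior.
public static class ChannelReaderExtensions
{
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await reader.WaitToReadAsync(cancellationToken))
        {
            while (reader.TryRead(out var item))
            {
                yield return item;
            }
        }
    }
}
// Usage
await foreach (var item in channel.Reader.ToAsyncEnumerable())
{
    await ProcessItem(item);
}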
Drop-Oldest Strategy
var options = new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.DropOldest,
    AllowSynchronousContinuations = false
};
var channel = Channel.CreateBounded<LogEntry>(options);
// Never waits: when the channel is full, the oldest entry is dropped to make room.
// TryWrite returns false only after the channel has been completed.
channel.Writer.TryWrite(new LogEntry("New message"));
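The effect of DropOldest is easiest to see with a small capacity. The sketch below writes five integers into a channel of capacity 3 and records what was dropped. It assumes .NET 6 or later, where CreateBounded accepts an item-dropped callback as a second argument; DropOldestDemo, Run, and the tuple field names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class DropOldestDemo
{
    // Hypothetical helper: returns the items still readable and the items dropped.
    public static (List<int> Remaining, List<int> Dropped) Run()
    {
        var dropped = new List<int>();
        var options = new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        };
        // Second argument: callback invoked for each dropped item (assumes .NET 6+).
        var channel = Channel.CreateBounded<int>(options, item => dropped.Add(item));

        for (int i = 1; i <= 5; i++)
        {
            channel.Writer.TryWrite(i); // accepted even when the channel is full
        }
        channel.Writer.Complete();

        // Buffered items stay readable after the writer is completed.
        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return (remaining, dropped);
    }
}
```

Here the channel ends up holding 3, 4, and 5, while 1 and 2 are reported through the callback. For a log buffer this means the newest entries survive, at the cost of silently losing the oldest ones unless the callback records them.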
Channels excel at decoupling producers from consumers, handling backpressure, and building async processing pipelines with clean cancellation support.