What is IValueTaskSource?
IValueTaskSource<T> is an interface for objects that can back a ValueTask<T>. Because the backing object is not a Task<T>, it can be pooled and reused, which avoids allocating a new Task<T> for every asynchronous operation. ValueTask<T> itself is a struct that behaves like a discriminated union: it wraps either an already-available result, a Task<T>, or an IValueTaskSource<T> together with a version token. This is particularly useful in high-performance async scenarios where operations often complete synchronously or where allocations in hot paths must be avoided.
How to Use It
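Implement IValueTaskSource<T> when you need custom asynchronous operations whose backing objects can be pooled and reused. Use ValueTask<T> instead of Task<T> when your async methods frequently complete synchronously or when you want to avoid Task allocations in hot paths. Because a pooled source is recycled as soon as its result has been read, a ValueTask<T> backed by one must be consumed exactly once: await it a single time, and do not store it and await it again later.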
Example: Pooled Async Operations
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
/// <summary>
/// A reusable <see cref="IValueTaskSource{T}"/> whose instances are rented from and
/// returned to a static pool, so that an asynchronous operation can be exposed as a
/// <see cref="ValueTask{T}"/> without allocating a new backing object each time.
/// </summary>
/// <remarks>
/// Typical flow: call <see cref="GetFromPool"/>, hand out <see cref="Task"/>, then call
/// <see cref="SetResult"/> or <see cref="SetException"/> exactly once. The instance goes
/// back to the pool when the consumer reads the result through <see cref="GetResult"/>.
/// </remarks>
/// <typeparam name="T">Type of the value produced by the operation.</typeparam>
public class PooledValueTaskSource<T> : IValueTaskSource<T>
{
    // Pool shared by all sources of the same T.
    private static readonly ObjectPool<PooledValueTaskSource<T>> _pool =
        new ObjectPool<PooledValueTaskSource<T>>(() => new PooledValueTaskSource<T>());

    // Mutable struct holding result, continuation and version. It must stay a
    // non-readonly field, otherwise every call would operate on a copy.
    private ManualResetValueTaskSourceCore<T> _core;

    // Private constructor - instances are only created by the pool's factory.
    private PooledValueTaskSource()
    {
        _core = new ManualResetValueTaskSourceCore<T>
        {
            // Do not run the consumer's continuation inline on the thread that completes the operation.
            RunContinuationsAsynchronously = true
        };
    }

    /// <summary>Rents an instance from the pool and prepares it for a new operation.</summary>
    /// <returns>An instance whose core has been reset.</returns>
    public static PooledValueTaskSource<T> GetFromPool()
    {
        var instance = _pool.Get();

        // Reset on rent as well: this covers instances that were handed back through
        // ReturnToPool() directly instead of through GetResult().
        instance._core.Reset();
        return instance;
    }

    /// <summary>Hands this instance back to the pool.</summary>
    public void ReturnToPool()
    {
        _pool.Return(this);
    }

    /// <summary>
    /// Gets a <see cref="ValueTask{T}"/> bound to this source and its current version.
    /// The returned value must be consumed only once.
    /// </summary>
    public ValueTask<T> Task => new ValueTask<T>(this, _core.Version);

    /// <summary>Completes the operation successfully.</summary>
    /// <param name="result">Value handed to the consumer.</param>
    public void SetResult(T result)
    {
        _core.SetResult(result);
    }

    /// <summary>Completes the operation with a failure.</summary>
    /// <param name="exception">Exception rethrown to the consumer from <see cref="GetResult"/>.</param>
    public void SetException(Exception exception)
    {
        _core.SetException(exception);
    }

    /// <summary>
    /// Returns the result of the completed operation (or rethrows its exception) and
    /// recycles this instance.
    /// </summary>
    /// <param name="token">Version token carried by the <see cref="ValueTask{T}"/>.</param>
    /// <returns>The value passed to <see cref="SetResult"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// The token is stale or the operation has not completed yet. In that case the
    /// instance is NOT returned to the pool.
    /// </exception>
    public T GetResult(short token)
    {
        // Validate before touching the pool. GetStatus throws for a token that does not
        // match the current version; a pending operation is rejected explicitly. Pooling
        // on these misuse paths would hand an instance that is still in use (or already
        // pooled) to another caller.
        if (_core.GetStatus(token) == ValueTaskSourceStatus.Pending)
        {
            throw new InvalidOperationException(
                "The result was requested before the operation completed.");
        }

        try
        {
            // Returns the value or rethrows the exception stored by SetException.
            return _core.GetResult(token);
        }
        finally
        {
            // Advance the version BEFORE pooling so that a second read through the same
            // ValueTask fails token validation instead of pooling this instance twice.
            _core.Reset();
            ReturnToPool();
        }
    }

    /// <summary>Reports the state of the operation identified by <paramref name="token"/>.</summary>
    public ValueTaskSourceStatus GetStatus(short token)
    {
        return _core.GetStatus(token);
    }

    /// <summary>Registers the continuation to run when the operation completes.</summary>
    public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _core.OnCompleted(continuation, state, token, flags);
    }
}
/// <summary>
/// Minimal bounded object pool: hands out previously returned instances when it has
/// any, and falls back to the factory otherwise.
/// </summary>
/// <typeparam name="T">Reference type of the pooled objects.</typeparam>
public class ObjectPool<T> where T : class
{
    // Upper bound on the number of retained instances; surplus returns are dropped.
    private const int MaxSize = 100;

    private readonly Func<T> _factory;
    private readonly ConcurrentQueue<T> _pool = new ConcurrentQueue<T>();

    // Number of slots reserved in _pool. Maintained with Interlocked so it is never
    // smaller than the number of queued items.
    private int _count;

    /// <summary>Creates a pool that builds new instances with <paramref name="factory"/>.</summary>
    /// <param name="factory">Called by <see cref="Get"/> whenever the pool is empty.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public ObjectPool(Func<T> factory)
    {
        // Fail here rather than with a NullReferenceException on the first empty Get().
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
    }

    /// <summary>Takes an instance from the pool, or creates one if the pool is empty.</summary>
    /// <returns>A pooled instance, or a new one produced by the factory.</returns>
    public T Get()
    {
        if (_pool.TryDequeue(out var item))
        {
            Interlocked.Decrement(ref _count);
            return item;
        }

        return _factory();
    }

    /// <summary>Offers an instance back to the pool; it is dropped if the pool is full.</summary>
    /// <param name="item">Instance to retain for reuse.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    public void Return(T item)
    {
        if (item is null)
        {
            // A pooled null would be handed out by a later Get().
            throw new ArgumentNullException(nameof(item));
        }

        // Reserve the slot atomically BEFORE enqueueing. A separate "check, then
        // enqueue, then increment" lets concurrent callers all pass the check and
        // grow the queue past MaxSize.
        if (Interlocked.Increment(ref _count) <= MaxSize)
        {
            _pool.Enqueue(item);
        }
        else
        {
            // Pool is full: release the reservation and let the item be collected.
            Interlocked.Decrement(ref _count);
        }
    }
}
/// <summary>
/// Demo service whose methods return <see cref="ValueTask{T}"/>: synchronous fast paths
/// wrap the result directly, and the asynchronous path of <see cref="GetDataAsync"/>
/// is backed by a pooled <c>PooledValueTaskSource&lt;int&gt;</c>.
/// </summary>
public class HighPerformanceAsyncService
{
    private readonly Random _random = new Random();

    // Random is not safe for concurrent use, and the completion code below runs on
    // whatever thread resumes after the delay, so every access goes through this gate.
    private readonly object _randomGate = new object();

    /// <summary>Produces a value for <paramref name="id"/>.</summary>
    /// <param name="id">Identifier; values less than or equal to zero complete synchronously with 0.</param>
    /// <returns>
    /// A <see cref="ValueTask{T}"/> that must be awaited exactly once, because the
    /// asynchronous path is backed by a pooled source.
    /// </returns>
    public ValueTask<int> GetDataAsync(int id)
    {
        // Fast path: wrap the result directly, no source or Task involved.
        if (id <= 0)
        {
            return new ValueTask<int>(0);
        }

        // Async path: use a pooled source.
        return GetDataAsyncCore(id);
    }

    // Rents a source, starts the simulated work and returns the ValueTask bound to it.
    private ValueTask<int> GetDataAsyncCore(int id)
    {
        var source = PooledValueTaskSource<int>.GetFromPool();

        // Capture the ValueTask (and its version token) before the work is started.
        var pending = source.Task;

        // Fire-and-forget: the outcome is delivered through the source, not this Task.
        _ = StartAsyncOperation(source, id);

        return pending;
    }

    // Simulates asynchronous work and completes the source exactly once.
    // Returns Task rather than void so that an unexpected failure while completing
    // the source cannot escape as an unhandled exception.
    private async Task StartAsyncOperation(PooledValueTaskSource<int> source, int id)
    {
        int result;
        try
        {
            // Simulate async work.
            await Task.Delay(NextRandom(1, 100)).ConfigureAwait(false);

            result = id * 2 + NextRandom(1, 10);
        }
        catch (Exception ex)
        {
            // Forward failures of the work itself to the consumer.
            source.SetException(ex);
            return;
        }

        // Kept outside the try block: if SetResult throws, calling SetException on the
        // same source would only throw again.
        source.SetResult(result);
    }

    // Thread-safe wrapper around Random.Next(minValue, maxValue).
    private int NextRandom(int minValue, int maxValue)
    {
        lock (_randomGate)
        {
            return _random.Next(minValue, maxValue);
        }
    }

    /// <summary>Summarises <paramref name="data"/> as a string.</summary>
    /// <param name="data">Values to process; may be null.</param>
    /// <returns>
    /// "empty" for a null or empty array, "single:{value}" for one element (both
    /// synchronously), otherwise "processed:{sum}" after asynchronous processing.
    /// </returns>
    public ValueTask<string> ProcessDataAsync(int[]? data)
    {
        // Fast path: nothing to process.
        if (data is null || data.Length == 0)
        {
            return new ValueTask<string>("empty");
        }

        // A single element is trivial, so it is also answered synchronously.
        if (data.Length == 1)
        {
            return new ValueTask<string>($"single:{data[0]}");
        }

        // Two or more elements: take the asynchronous path.
        return ProcessDataAsyncCore(data);
    }

    // Asynchronous path of ProcessDataAsync: waits briefly, then sums the elements.
    private async ValueTask<string> ProcessDataAsyncCore(int[] data)
    {
        await Task.Delay(10).ConfigureAwait(false);

        int sum = 0;
        foreach (int value in data)
        {
            sum += value;
        }

        return $"processed:{sum}";
    }
}
// Usage example
public class ValueTaskExample
{
    /// <summary>
    /// Calls each path of <c>HighPerformanceAsyncService</c> once, awaiting every
    /// returned ValueTask a single time, and prints the collected results.
    /// </summary>
    public static async Task DemonstrateValueTask()
    {
        HighPerformanceAsyncService svc = new HighPerformanceAsyncService();

        // id <= 0 completes synchronously; positive ids go through the pooled source.
        int syncValue = await svc.GetDataAsync(-1);
        int pooledFirst = await svc.GetDataAsync(5);
        int pooledSecond = await svc.GetDataAsync(10);

        string dataLine = $"Results: {syncValue}, {pooledFirst}, {pooledSecond}";
        Console.WriteLine(dataLine);

        // String processing: null and single-element inputs complete synchronously,
        // the five-element input takes the asynchronous path.
        string forNull = await svc.ProcessDataAsync(null);
        string forSingle = await svc.ProcessDataAsync(new[] { 42 });
        string forMany = await svc.ProcessDataAsync(new[] { 1, 2, 3, 4, 5 });

        string processLine = $"Process results: {forNull}, {forSingle}, {forMany}";
        Console.WriteLine(processLine);
    }
}