Writing operators
Writing operators for IAsyncEnumerable is both easier and harder than working with IObservables or Reactive Streams Publishers. It is easier for intermediate operators such as Map and Filter, where async/await and compiler-generated state machines line up pretty well. It is harder for operators working with multiple sources or time, because one can only await one thing at a time in an async method, and often it is unclear what to await first or how many awaitable primitives to introduce.
In this wiki, I'll explain how to write such easy and complicated operators.
Despite the popularity of Rx.NET for push-based declarative data processing and the success of the non-blocking backpressure-enabled RxJava libraries, there is always the buzz that maybe it is possible to have convenient backpressure via coroutines, continuations, async-await or fibers.
One option is to make IEnumerable/IEnumerator support async/await and let the compiler work out the resumption state machine. We only have to await the next item and the concurrency is taken care of.
There were several proposals about such an async enumerable, and the latest one that seems to be winning is defined as follows.
The proposal introduces 3 new interfaces backing the concept of async sequences as well as language/compiler changes. We don't care about the latter as we don't have any such changes ready, but we can work with the interfaces right away as the necessary async/await support is already part of the released C# language.
The new interfaces and their methods were named distinctively with an Async word so that they can be implemented without conflict on a class that already supports the synchronous IEnumerable/IEnumerator.
IAsyncDisposable is the analogue of the IDisposable interface, but instead of a blocking or fire-and-forget disposal, DisposeAsync has to be called and awaited to make sure all resources associated with an async sequence are released before continuing.
public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}
Note that the design uses ValueTask instead of Task, as there are likely enough cases where the method can run synchronously, so the execution can avoid creating task objects and continuation handlers altogether.
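To make the ValueTask point concrete, here is a minimal sketch, not taken from the proposal. It redeclares IAsyncDisposable locally (in a made-up SketchValueTask namespace) so that it compiles on its own; InMemoryResource and SlowResource are hypothetical classes invented for illustration. The first completes synchronously without allocating a Task, the second wraps a real Task only because its cleanup is genuinely asynchronous.

```csharp
using System.Threading.Tasks;

namespace SketchValueTask
{
    // Same shape as the interface in the text; declared locally so the
    // sketch does not depend on a particular framework version.
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }

    // Hypothetical resource with nothing asynchronous to release.
    public sealed class InMemoryResource : IAsyncDisposable
    {
        public bool Disposed { get; private set; }

        public ValueTask DisposeAsync()
        {
            Disposed = true;
            // default(ValueTask) is already completed; no Task is allocated.
            return default(ValueTask);
        }
    }

    // Hypothetical resource whose cleanup really is asynchronous.
    public sealed class SlowResource : IAsyncDisposable
    {
        public bool Disposed { get; private set; }

        public ValueTask DisposeAsync()
        {
            // Only this path pays for a Task and its continuation.
            return new ValueTask(CleanupAsync());
        }

        private async Task CleanupAsync()
        {
            await Task.Delay(10);
            Disposed = true;
        }
    }
}
```

A caller awaits both the same way; for InMemoryResource the returned ValueTask reports IsCompletedSuccessfully immediately, so the await does not need to suspend.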
IAsyncEnumerator is analogous to IEnumerator: it allows moving to the next item, awaiting this to happen, and then getting the actual value if there is one.
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    ValueTask<bool> MoveNextAsync();
    T Current { get; }
}
IAsyncEnumerator extends IAsyncDisposable, and the consumer is required to await DisposeAsync whenever it is done with the async enumerator.
There are a couple of rules to be honored, which will be quite important later when coordination is required:
- MoveNextAsync must be called non-concurrently and non-overlappingly. This means that one has to wait for the method's returned task to signal true before calling it again. When used with async/await, this is guaranteed by the compiler-generated state machine. Otherwise, calls to this method must be serialized externally.
- Current is only valid if the task evaluates to true after a MoveNextAsync. It is invalid to read it before the first MoveNextAsync call, after a false task, or after the call to DisposeAsync.
- DisposeAsync cannot be called concurrently with MoveNextAsync; it may be called only once the pending task has evaluated to true, false, or failure. In addition, it has to be awaited and is expected to return only when the whole chain above it has been disposed as well, in a cascading fashion.
- The methods should not throw exceptions themselves, but they can return a failed/failing task.
Unfortunately, DisposeAsync does not serve as an anytime cancellation like IDisposable.Dispose() in Rx.NET, and since it must not run concurrently with MoveNextAsync, we have to serialize their execution externally whenever we have something that a plain async/await loop could not handle.
Finally, IAsyncEnumerable is the equivalent of IEnumerable, with a single method to get the IAsyncEnumerator. This, in theory, allows lazy and deferred async sequences which can be run as many times as necessary and even concurrently with each other. Only interacting with the IAsyncEnumerator has to be serialized via async/await or external means.
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator();
}
This method has to return the async enumerator synchronously and not throw any exception.
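One way to honor both "return synchronously, do not throw" and "return a failed task instead of throwing" is sketched below. It is an assumption-laden illustration: the interfaces are redeclared locally (in a made-up SketchError namespace) with the shapes from the text, and Error<T> is a hypothetical source invented here. The failure is captured at construction and only surfaces through the task returned by MoveNextAsync.

```csharp
using System;
using System.Threading.Tasks;

namespace SketchError
{
    // Interfaces with the same shape as in the text, declared locally.
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }

    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    // Hypothetical source that always fails with the given exception.
    public sealed class Error<T> : IAsyncEnumerable<T>
    {
        private readonly Exception _error;

        public Error(Exception error)
        {
            _error = error;
        }

        // Returns synchronously and never throws.
        public IAsyncEnumerator<T> GetAsyncEnumerator()
        {
            return new ErrorEnumerator(_error);
        }

        private sealed class ErrorEnumerator : IAsyncEnumerator<T>
        {
            private readonly Exception _error;

            public ErrorEnumerator(Exception error)
            {
                _error = error;
            }

            // Never valid to read, because no task ever yields true.
            public T Current
            {
                get { return default(T); }
            }

            // The method itself does not throw; the returned task is faulted.
            public ValueTask<bool> MoveNextAsync()
            {
                return new ValueTask<bool>(Task.FromException<bool>(_error));
            }

            public ValueTask DisposeAsync()
            {
                return default(ValueTask);
            }
        }
    }
}
```

A consumer sees the exception only when it awaits MoveNextAsync, which keeps error handling in one place: the await inside its loop.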
Unfortunately, this is the only level where eager cancellation support can be injected, but unlike the Rx.NET scheme, it can't be made known to the downstream consumers within the flow. One has to keep injecting CancellationTokens into such operators.
There is an additional problem with such injections. One can't inject different CancellationTokens into different GetAsyncEnumerator() calls: the tokens have to live on the IAsyncEnumerable level, and cancelling a token will cancel all running async enumerators. Usually one would like to correlate cancellation with individual running async enumerators (which one gets from IObservable.Subscribe quite clearly) so that only some runs could be cancelled.
The solution is most likely to defer the creation of the entire IAsyncEnumerable itself, so that cancellation can be dedicated to a single run at the point when an async enumerator is requested by some consumer.
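A minimal sketch of that deferral idea follows, under stated assumptions: the interfaces are redeclared locally (in a made-up SketchDefer namespace) with the shapes from the text, and both Defer<T> and the token-aware Ticks source are hypothetical names invented for illustration. The factory runs once per GetAsyncEnumerator() call, so each run can be given its own CancellationToken.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace SketchDefer
{
    // Interfaces with the same shape as in the text, declared locally.
    public interface IAsyncDisposable { ValueTask DisposeAsync(); }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }

    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    // Hypothetical operator: builds a fresh sequence for every enumerator.
    // Simplification: a throwing factory would violate the "must not throw"
    // rule; a fuller version would route that failure into MoveNextAsync.
    public sealed class Defer<T> : IAsyncEnumerable<T>
    {
        private readonly Func<IAsyncEnumerable<T>> _factory;

        public Defer(Func<IAsyncEnumerable<T>> factory) { _factory = factory; }

        public IAsyncEnumerator<T> GetAsyncEnumerator()
        {
            return _factory().GetAsyncEnumerator();
        }
    }

    // Hypothetical endless counter (1, 2, 3, ...) bound to one token.
    public sealed class Ticks : IAsyncEnumerable<int>
    {
        private readonly CancellationToken _token;

        public Ticks(CancellationToken token) { _token = token; }

        public IAsyncEnumerator<int> GetAsyncEnumerator()
        {
            return new TicksEnumerator(_token);
        }

        private sealed class TicksEnumerator : IAsyncEnumerator<int>
        {
            private readonly CancellationToken _token;

            public TicksEnumerator(CancellationToken token) { _token = token; }

            public int Current { get; private set; }

            public ValueTask<bool> MoveNextAsync()
            {
                if (_token.IsCancellationRequested)
                {
                    // Report cancellation through the task, not by throwing.
                    return new ValueTask<bool>(Task.FromCanceled<bool>(_token));
                }
                Current++;
                return new ValueTask<bool>(true);
            }

            public ValueTask DisposeAsync() { return default(ValueTask); }
        }
    }
}
```

For example, a factory such as `() => { var cts = new CancellationTokenSource(); /* keep cts somewhere */ return new Ticks(cts.Token); }` gives every enumerator its own source, so cancelling one run leaves the others untouched. How the consumer gets hold of the matching CancellationTokenSource is left open here.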