using System;
using System.Collections.Generic;
using System.Threading;
using Cysharp.Threading.Tasks;
#if !UNITY_2018_3_OR_NEWER
using System.Threading.Channels;
#endif

namespace MessagePipe
{
    public static partial class SubscriberExtensions
    {
        /// <summary>
        /// Exposes the messages delivered to <paramref name="subscriber"/> as an async sequence.
        /// A subscription is created each time the returned sequence is enumerated and is
        /// disposed when the enumerator is disposed.
        /// </summary>
        /// <param name="subscriber">The subscriber to read messages from.</param>
        /// <param name="filters">Filters passed through to the subscription.</param>
        /// <returns>An async sequence wrapping the subscription.</returns>
        public static IUniTaskAsyncEnumerable<TMessage> AsAsyncEnumerable<TMessage>(this IAsyncSubscriber<TMessage> subscriber, params AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            return new AsyncEnumerableAsyncSubscriber<TMessage>(subscriber, filters);
        }

        /// <summary>
        /// Exposes the messages delivered to a buffered <paramref name="subscriber"/> as an async sequence.
        /// The subscription is started with SubscribeAsync each time the sequence is enumerated
        /// and is disposed when the enumerator is disposed.
        /// </summary>
        /// <param name="subscriber">The buffered subscriber to read messages from.</param>
        /// <param name="filters">Filters passed through to the subscription.</param>
        /// <returns>An async sequence wrapping the subscription.</returns>
        public static IUniTaskAsyncEnumerable<TMessage> AsAsyncEnumerable<TMessage>(this IBufferedAsyncSubscriber<TMessage> subscriber, params AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            return new BufferedAsyncEnumerableAsyncSubscriber<TMessage>(subscriber, filters);
        }

        /// <summary>
        /// Exposes the messages delivered to <paramref name="subscriber"/> for <paramref name="key"/>
        /// as an async sequence. A subscription is created each time the returned sequence is
        /// enumerated and is disposed when the enumerator is disposed.
        /// </summary>
        /// <param name="subscriber">The keyed subscriber to read messages from.</param>
        /// <param name="key">The key passed to the subscription.</param>
        /// <param name="filters">Filters passed through to the subscription.</param>
        /// <returns>An async sequence wrapping the subscription.</returns>
        public static IUniTaskAsyncEnumerable<TMessage> AsAsyncEnumerable<TKey, TMessage>(this IAsyncSubscriber<TKey, TMessage> subscriber, TKey key, params AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            return new AsyncEnumerableAsyncSubscriber<TKey, TMessage>(key, subscriber, filters);
        }
    }

    /// <summary>
    /// Async sequence over a keyless subscriber; subscribes on every GetAsyncEnumerator call.
    /// </summary>
    internal class AsyncEnumerableAsyncSubscriber<TMessage> : IUniTaskAsyncEnumerable<TMessage>
    {
        readonly IAsyncSubscriber<TMessage> subscriber;
        readonly AsyncMessageHandlerFilter<TMessage>[] filters;

        public AsyncEnumerableAsyncSubscriber(IAsyncSubscriber<TMessage> subscriber, AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            this.subscriber = subscriber;
            this.filters = filters;
        }

        public IUniTaskAsyncEnumerator<TMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            // The enumerator needs the subscription handle before the subscription exists,
            // so it is given a single-assignment holder that is filled in right after Subscribe.
            var disposable = DisposableBag.CreateSingleAssignment();
            var e = new AsyncMessageHandlerEnumerator<TMessage>(disposable, cancellationToken);
            disposable.Disposable = subscriber.Subscribe(e, filters);
            return e;
        }
    }

    /// <summary>
    /// Async sequence over a buffered subscriber; starts SubscribeAsync on every GetAsyncEnumerator call.
    /// </summary>
    internal class BufferedAsyncEnumerableAsyncSubscriber<TMessage> : IUniTaskAsyncEnumerable<TMessage>
    {
        readonly IBufferedAsyncSubscriber<TMessage> subscriber;
        readonly AsyncMessageHandlerFilter<TMessage>[] filters;

        public BufferedAsyncEnumerableAsyncSubscriber(IBufferedAsyncSubscriber<TMessage> subscriber, AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            this.subscriber = subscriber;
            this.filters = filters;
        }

        public IUniTaskAsyncEnumerator<TMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            var disposable = DisposableBag.CreateSingleAssignment();
            var e = new AsyncMessageHandlerEnumerator<TMessage>(disposable, cancellationToken);
            var task = subscriber.SubscribeAsync(e, filters);

            // GetAsyncEnumerator is synchronous, so the subscription handle is attached
            // in the background once SubscribeAsync completes.
            SetDisposableAsync(task, disposable).Forget();
            return e;
        }

        // async UniTaskVoid + Forget() instead of async void: the fire-and-forget work stays
        // on the UniTask infrastructure rather than the standard async void machinery.
        async UniTaskVoid SetDisposableAsync(UniTask<IDisposable> task, SingleAssignmentDisposable d)
        {
            d.Disposable = await task;
        }
    }

    /// <summary>
    /// Async sequence over a keyed subscriber; subscribes for the stored key on every GetAsyncEnumerator call.
    /// </summary>
    internal class AsyncEnumerableAsyncSubscriber<TKey, TMessage> : IUniTaskAsyncEnumerable<TMessage>
    {
        readonly TKey key;
        readonly IAsyncSubscriber<TKey, TMessage> subscriber;
        readonly AsyncMessageHandlerFilter<TMessage>[] filters;

        public AsyncEnumerableAsyncSubscriber(TKey key, IAsyncSubscriber<TKey, TMessage> subscriber, AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            this.key = key;
            this.subscriber = subscriber;
            this.filters = filters;
        }

        public IUniTaskAsyncEnumerator<TMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            var disposable = DisposableBag.CreateSingleAssignment();
            var e = new AsyncMessageHandlerEnumerator<TMessage>(disposable, cancellationToken);
            disposable.Disposable = subscriber.Subscribe(key, e, filters);
            return e;
        }
    }

    /// <summary>
    /// Acts as both the message handler registered with the subscriber and the enumerator
    /// handed to the consumer: handled messages are written to an unbounded channel and
    /// read back out by MoveNextAsync.
    /// </summary>
    internal class AsyncMessageHandlerEnumerator<TMessage> : IUniTaskAsyncEnumerator<TMessage>, IAsyncMessageHandler<TMessage>
    {
        readonly Channel<TMessage> channel;
        readonly CancellationToken cancellationToken;
        readonly SingleAssignmentDisposable singleAssignmentDisposable;

        // Message dequeued by the most recent successful MoveNextAsync.
        TMessage current;
        bool hasCurrent;

        public AsyncMessageHandlerEnumerator(SingleAssignmentDisposable singleAssignmentDisposable, CancellationToken cancellationToken)
        {
            this.singleAssignmentDisposable = singleAssignmentDisposable;
            this.cancellationToken = cancellationToken;
#if !UNITY_2018_3_OR_NEWER
            this.channel = Channel.CreateUnbounded<TMessage>(new UnboundedChannelOptions()
            {
                SingleWriter = true,
                SingleReader = true,
                AllowSynchronousContinuations = true
            });
#else
            this.channel = Channel.CreateSingleConsumerUnbounded<TMessage>();
#endif
        }

        /// <summary>
        /// The message obtained by the last successful MoveNextAsync. Reading it repeatedly
        /// returns the same message and does not consume further messages from the channel.
        /// </summary>
        /// <exception cref="InvalidOperationException">No message is currently available.</exception>
        TMessage IUniTaskAsyncEnumerator<TMessage>.Current
        {
            get
            {
                if (hasCurrent)
                {
                    return current;
                }

                throw new InvalidOperationException("Message is not buffered in Channel.");
            }
        }

        /// <summary>
        /// Waits until a message is available (or the enumerator's token is cancelled) and
        /// dequeues it so that Current can return it.
        /// </summary>
        async UniTask<bool> IUniTaskAsyncEnumerator<TMessage>.MoveNextAsync()
        {
            hasCurrent = false;
            current = default;

            while (await channel.Reader.WaitToReadAsync(cancellationToken))
            {
                // Dequeue here, exactly once per advance, instead of inside Current.
                if (channel.Reader.TryRead(out var message))
                {
                    current = message;
                    hasCurrent = true;
                    return true;
                }
            }

            return false;
        }

        /// <summary>
        /// Queues the published message for the enumerator; completes synchronously.
        /// </summary>
        UniTask IAsyncMessageHandler<TMessage>.HandleAsync(TMessage message, CancellationToken cancellationToken)
        {
            channel.Writer.TryWrite(message);
            return default;
        }

        /// <summary>
        /// Disposes the subscription handle so that no further messages are delivered.
        /// </summary>
        UniTask IUniTaskAsyncDisposable.DisposeAsync()
        {
            singleAssignmentDisposable.Dispose(); // unsubscribe message.
            return default;
        }
    }
}