using MessagePipe.Internal;
using System;
using System.Linq;
using System.Threading;
using Cysharp.Threading.Tasks;

namespace MessagePipe
{
    /// <summary>
    /// <see cref="IDistributedPublisher{TKey, TMessage}"/> implementation that delegates every
    /// publish to the injected <see cref="IAsyncPublisher{TKey, TMessage}"/>.
    /// </summary>
    /// <typeparam name="TKey">Type of the key a message is published under.</typeparam>
    /// <typeparam name="TMessage">Type of the published message.</typeparam>
    [Preserve]
    public sealed class InMemoryDistributedPublisher<TKey, TMessage> : IDistributedPublisher<TKey, TMessage>
    {
        readonly IAsyncPublisher<TKey, TMessage> publisher;

        /// <summary>Creates a publisher that forwards to <paramref name="publisher"/>.</summary>
        /// <param name="publisher">The keyed async publisher that receives every message.</param>
        [Preserve]
        public InMemoryDistributedPublisher(IAsyncPublisher<TKey, TMessage> publisher)
        {
            this.publisher = publisher;
        }

        /// <summary>Forwards the message to the wrapped publisher and returns its task.</summary>
        /// <param name="key">Key to publish under.</param>
        /// <param name="message">Message to publish.</param>
        /// <param name="cancellationToken">Token passed through to the wrapped publisher.</param>
        /// <returns>The task returned by the wrapped publisher's <c>PublishAsync</c>.</returns>
        public UniTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default)
        {
            return publisher.PublishAsync(key, message, cancellationToken);
        }
    }

    /// <summary>
    /// <see cref="IDistributedSubscriber{TKey, TMessage}"/> implementation that registers handlers
    /// on the injected <see cref="IAsyncSubscriber{TKey, TMessage}"/>. Synchronous handlers and
    /// filters are wrapped in async bridges before being registered.
    /// </summary>
    /// <typeparam name="TKey">Type of the key handlers subscribe to.</typeparam>
    /// <typeparam name="TMessage">Type of the received message.</typeparam>
    [Preserve]
    public sealed class InMemoryDistributedSubscriber<TKey, TMessage> : IDistributedSubscriber<TKey, TMessage>
    {
        readonly IAsyncSubscriber<TKey, TMessage> subscriber;

        /// <summary>Creates a subscriber that registers on <paramref name="subscriber"/>.</summary>
        /// <param name="subscriber">The keyed async subscriber that handlers are registered on.</param>
        [Preserve]
        public InMemoryDistributedSubscriber(IAsyncSubscriber<TKey, TMessage> subscriber)
        {
            this.subscriber = subscriber;
        }

        /// <summary>Subscribes a synchronous handler by wrapping it in an async bridge.</summary>
        /// <param name="key">Key to subscribe to.</param>
        /// <param name="handler">Synchronous handler to invoke for each message.</param>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A task whose result disposes the subscription when disposed.</returns>
        public UniTask<IUniTaskAsyncDisposable> SubscribeAsync(TKey key, IMessageHandler<TMessage> handler, CancellationToken cancellationToken = default)
        {
            var subscription = subscriber.Subscribe(key, new AsyncMessageHandlerBridge<TMessage>(handler));
            return new UniTask<IUniTaskAsyncDisposable>(new AsyncDisposableBridge(subscription));
        }

        /// <summary>
        /// Subscribes a synchronous handler together with synchronous filters; both the handler
        /// and each filter are wrapped in async bridges.
        /// </summary>
        /// <param name="key">Key to subscribe to.</param>
        /// <param name="handler">Synchronous handler to invoke for each message.</param>
        /// <param name="filters">Synchronous filters; each one is wrapped individually.</param>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A task whose result disposes the subscription when disposed.</returns>
        public UniTask<IUniTaskAsyncDisposable> SubscribeAsync(TKey key, IMessageHandler<TMessage> handler, MessageHandlerFilter<TMessage>[] filters, CancellationToken cancellationToken = default)
        {
            var bridgedFilters = filters.Select(x => new AsyncMessageHandlerFilterBridge<TMessage>(x)).ToArray();
            var subscription = subscriber.Subscribe(key, new AsyncMessageHandlerBridge<TMessage>(handler), bridgedFilters);
            return new UniTask<IUniTaskAsyncDisposable>(new AsyncDisposableBridge(subscription));
        }

        /// <summary>Subscribes an asynchronous handler directly on the wrapped subscriber.</summary>
        /// <param name="key">Key to subscribe to.</param>
        /// <param name="handler">Asynchronous handler to invoke for each message.</param>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A task whose result disposes the subscription when disposed.</returns>
        public UniTask<IUniTaskAsyncDisposable> SubscribeAsync(TKey key, IAsyncMessageHandler<TMessage> handler, CancellationToken cancellationToken = default)
        {
            var subscription = subscriber.Subscribe(key, handler);
            return new UniTask<IUniTaskAsyncDisposable>(new AsyncDisposableBridge(subscription));
        }

        /// <summary>Subscribes an asynchronous handler with asynchronous filters.</summary>
        /// <param name="key">Key to subscribe to.</param>
        /// <param name="handler">Asynchronous handler to invoke for each message.</param>
        /// <param name="filters">Asynchronous filters passed through unchanged.</param>
        /// <param name="cancellationToken">Not used by this implementation.</param>
        /// <returns>A task whose result disposes the subscription when disposed.</returns>
        public UniTask<IUniTaskAsyncDisposable> SubscribeAsync(TKey key, IAsyncMessageHandler<TMessage> handler, AsyncMessageHandlerFilter<TMessage>[] filters, CancellationToken cancellationToken = default)
        {
            var subscription = subscriber.Subscribe(key, handler, filters);
            return new UniTask<IUniTaskAsyncDisposable>(new AsyncDisposableBridge(subscription));
        }
    }

    /// <summary>
    /// Exposes an <see cref="IDisposable"/> through <see cref="IUniTaskAsyncDisposable"/>.
    /// </summary>
    internal sealed class AsyncDisposableBridge : IUniTaskAsyncDisposable
    {
        readonly IDisposable disposable;

        public AsyncDisposableBridge(IDisposable disposable)
        {
            this.disposable = disposable;
        }

        /// <summary>Disposes the wrapped instance synchronously and returns a default <see cref="UniTask"/>.</summary>
        public UniTask DisposeAsync()
        {
            disposable.Dispose();
            return default;
        }
    }

    /// <summary>
    /// Exposes a synchronous <see cref="IMessageHandler{T}"/> through <see cref="IAsyncMessageHandler{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the handled message.</typeparam>
    internal sealed class AsyncMessageHandlerBridge<T> : IAsyncMessageHandler<T>
    {
        readonly IMessageHandler<T> handler;

        public AsyncMessageHandlerBridge(IMessageHandler<T> handler)
        {
            this.handler = handler;
        }

        /// <summary>
        /// Invokes the wrapped handler synchronously and returns a default <see cref="UniTask"/>.
        /// The cancellation token is not passed to the wrapped handler.
        /// </summary>
        public UniTask HandleAsync(T message, CancellationToken cancellationToken)
        {
            handler.Handle(message);
            return default;
        }
    }

    /// <summary>
    /// Exposes a synchronous <see cref="MessageHandlerFilter{T}"/> as an
    /// <see cref="AsyncMessageHandlerFilter{T}"/>, copying the wrapped filter's <c>Order</c>.
    /// </summary>
    /// <typeparam name="T">Type of the filtered message.</typeparam>
    internal sealed class AsyncMessageHandlerFilterBridge<T> : AsyncMessageHandlerFilter<T>
    {
        readonly MessageHandlerFilter<T> filter;

        public AsyncMessageHandlerFilterBridge(MessageHandlerFilter<T> filter)
        {
            this.filter = filter;
            // Keep the bridge at the same position in the filter ordering as the wrapped filter.
            this.Order = filter.Order;
        }

        /// <summary>
        /// Runs the wrapped synchronous filter, handing it a continuation that calls
        /// <paramref name="next"/> with the supplied cancellation token.
        /// </summary>
        public override UniTask HandleAsync(T message, CancellationToken cancellationToken, Func<T, CancellationToken, UniTask> next)
        {
            // NOTE(review): the UniTask returned by next is not awaited or otherwise observed here;
            // presumably MessageHandlerFilter<T>.Handle takes a synchronous continuation, so any
            // asynchronous remainder of the pipeline is not tracked by the returned task - confirm.
            filter.Handle(message, x => next(x, cancellationToken));
            return default;
        }
    }
}