using MessagePipe.Internal; using System; using System.Runtime.CompilerServices; using System.Threading; using Cysharp.Threading.Tasks; namespace MessagePipe { [Preserve] public class AsyncMessageBroker : IAsyncPublisher, IAsyncSubscriber { readonly AsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; [Preserve] public AsyncMessageBroker(AsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) { this.core = core; this.handlerFactory = handlerFactory; } public void Publish(TMessage message, CancellationToken cancellationToken) { core.Publish(message, cancellationToken); } public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { return core.PublishAsync(message, cancellationToken); } public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { return core.PublishAsync(message, publishStrategy, cancellationToken); } public IDisposable Subscribe(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters) { return core.Subscribe(handlerFactory.CreateAsyncMessageHandler(handler, filters)); } } [Preserve] public class AsyncMessageBrokerCore : IDisposable, IHandlerHolderMarker { FreeList> handlers; readonly MessagePipeDiagnosticsInfo diagnotics; readonly AsyncPublishStrategy defaultAsyncPublishStrategy; readonly HandlingSubscribeDisposedPolicy handlingSubscribeDisposedPolicy; readonly object gate = new object(); bool isDisposed; [Preserve] public AsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options) { this.handlers = new FreeList>(); this.defaultAsyncPublishStrategy = options.DefaultAsyncPublishStrategy; this.handlingSubscribeDisposedPolicy = options.HandlingSubscribeDisposedPolicy; this.diagnotics = diagnotics; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Publish(TMessage message, CancellationToken cancellationToken) { var array = handlers.GetValues(); for (int i = 0; i < array.Length; i++) { array[i]?.HandleAsync(message, cancellationToken).Forget(); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { return PublishAsync(message, defaultAsyncPublishStrategy, cancellationToken); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public async UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { var array = handlers.GetValues(); if (publishStrategy == AsyncPublishStrategy.Sequential) { foreach (var item in array) { if (item != null) { await item.HandleAsync(message, cancellationToken); } } } else { await new AsyncHandlerWhenAll(array, message, cancellationToken); } } public IDisposable Subscribe(IAsyncMessageHandler handler) { lock (gate) { if (isDisposed) return handlingSubscribeDisposedPolicy.Handle(nameof(AsyncMessageBrokerCore)); var subscriptionKey = handlers.Add(handler); var subscription = new Subscription(this, subscriptionKey); diagnotics.IncrementSubscribe(this, subscription); return subscription; } } public void Dispose() { lock (gate) { // Dispose is called when scope is finished. if (!isDisposed && handlers.TryDispose(out var count)) { isDisposed = true; diagnotics.RemoveTargetDiagnostics(this, count); } } } sealed class Subscription : IDisposable { bool isDisposed; readonly AsyncMessageBrokerCore core; readonly int subscriptionKey; public Subscription(AsyncMessageBrokerCore core, int subscriptionKey) { this.core = core; this.subscriptionKey = subscriptionKey; } public void Dispose() { if (!isDisposed) { isDisposed = true; lock (core.gate) { core.handlers.Remove(subscriptionKey, true); core.diagnotics.DecrementSubscribe(core, this); } } } } } [Preserve] public sealed class BufferedAsyncMessageBroker : IBufferedAsyncPublisher, IBufferedAsyncSubscriber { readonly BufferedAsyncMessageBrokerCore core; readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory; [Preserve] public BufferedAsyncMessageBroker(BufferedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) { this.core = core; this.handlerFactory = handlerFactory; } public void Publish(TMessage message, CancellationToken cancellationToken) { core.Publish(message, cancellationToken); } public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { return core.PublishAsync(message, cancellationToken); } public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { return core.PublishAsync(message, publishStrategy, cancellationToken); } public UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) { return SubscribeAsync(handler, Array.Empty>(), cancellationToken); } public UniTask SubscribeAsync(IAsyncMessageHandler handler, AsyncMessageHandlerFilter[] filters, CancellationToken cancellationToken) { handler = handlerFactory.CreateAsyncMessageHandler(handler, filters); return core.SubscribeAsync(handler, cancellationToken); } } [Preserve] public sealed class BufferedAsyncMessageBrokerCore { static readonly bool IsValueType = typeof(TMessage).IsValueType; readonly AsyncMessageBrokerCore core; TMessage lastMessage; [Preserve] public BufferedAsyncMessageBrokerCore(AsyncMessageBrokerCore core) { this.core = core; this.lastMessage = default; } public void Publish(TMessage message, CancellationToken cancellationToken) { lastMessage = message; core.Publish(message, cancellationToken); } public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { lastMessage = message; return core.PublishAsync(message, cancellationToken); } public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { lastMessage = message; return core.PublishAsync(message, publishStrategy, cancellationToken); } public async UniTask SubscribeAsync(IAsyncMessageHandler handler, CancellationToken cancellationToken) { if (IsValueType || lastMessage != null) { await handler.HandleAsync(lastMessage, cancellationToken); } return core.Subscribe(handler); } } // Singleton, Scoped variation [Preserve] public class SingletonAsyncMessageBroker : AsyncMessageBroker, ISingletonAsyncPublisher, ISingletonAsyncSubscriber { [Preserve] public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) : base(core, handlerFactory) { } } [Preserve] public class ScopedAsyncMessageBroker : AsyncMessageBroker, IScopedAsyncPublisher, IScopedAsyncSubscriber { [Preserve] public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore core, FilterAttachedAsyncMessageHandlerFactory handlerFactory) : base(core, handlerFactory) { } } [Preserve] public class SingletonAsyncMessageBrokerCore : AsyncMessageBrokerCore { [Preserve] public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) : base(diagnostics, options) { } } [Preserve] public class ScopedAsyncMessageBrokerCore : AsyncMessageBrokerCore { [Preserve] public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnostics, MessagePipeOptions options) : base(diagnostics, options) { } } }