using MessagePipe.Internal;
using System;
using System.Collections.Generic;
using System.Threading;
using Cysharp.Threading.Tasks;

// NOTE(review): the generic type arguments in this file (<TKey, TMessage> on the brokers and
// publisher/subscriber interfaces, <TMessage> on handlers/filters/AsyncHandlerWhenAll) are written
// out according to how TKey and TMessage are used by the members below. Confirm the arities against
// the declarations of the referenced interfaces and helper types.
namespace MessagePipe
{
    /// <summary>
    /// Keyed asynchronous message broker. A thin facade that forwards publishing to
    /// <see cref="AsyncMessageBrokerCore{TKey, TMessage}"/> and wraps handlers with filters
    /// (via <see cref="FilterAttachedAsyncMessageHandlerFactory"/>) before subscribing them.
    /// </summary>
    /// <typeparam name="TKey">Type of the key that messages are routed by.</typeparam>
    /// <typeparam name="TMessage">Type of the published message.</typeparam>
    [Preserve]
    public class AsyncMessageBroker<TKey, TMessage> : IAsyncPublisher<TKey, TMessage>, IAsyncSubscriber<TKey, TMessage>
    {
        readonly AsyncMessageBrokerCore<TKey, TMessage> core;
        readonly FilterAttachedAsyncMessageHandlerFactory handlerFactory;

        /// <summary>Creates a broker on top of the given core and handler factory.</summary>
        /// <param name="core">Core that stores subscriptions and dispatches messages.</param>
        /// <param name="handlerFactory">Factory used to combine a handler with its filters.</param>
        [Preserve]
        public AsyncMessageBroker(AsyncMessageBrokerCore<TKey, TMessage> core, FilterAttachedAsyncMessageHandlerFactory handlerFactory)
        {
            this.core = core;
            this.handlerFactory = handlerFactory;
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the handlers subscribed to <paramref name="key"/>
        /// without awaiting them. Forwards to the core.
        /// </summary>
        public void Publish(TKey key, TMessage message, CancellationToken cancellationToken)
        {
            core.Publish(key, message, cancellationToken);
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the handlers subscribed to <paramref name="key"/>
        /// using the core's default publish strategy. Forwards to the core.
        /// </summary>
        /// <returns>The task returned by the core.</returns>
        public UniTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken)
        {
            return core.PublishAsync(key, message, cancellationToken);
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the handlers subscribed to <paramref name="key"/>
        /// using the given <paramref name="publishStrategy"/>. Forwards to the core.
        /// </summary>
        /// <returns>The task returned by the core.</returns>
        public UniTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken)
        {
            return core.PublishAsync(key, message, publishStrategy, cancellationToken);
        }

        /// <summary>
        /// Subscribes <paramref name="handler"/> to <paramref name="key"/>. The handler is first passed,
        /// together with <paramref name="filters"/>, through the handler factory; the resulting handler
        /// is what gets registered in the core.
        /// </summary>
        /// <returns>The subscription object returned by the core; dispose it to unsubscribe.</returns>
        public IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> handler, params AsyncMessageHandlerFilter<TMessage>[] filters)
        {
            return core.Subscribe(key, handlerFactory.CreateAsyncMessageHandler(handler, filters));
        }
    }

    /// <summary>
    /// Stores handlers grouped by key and dispatches published messages to them.
    /// All mutation of the handler table is done while holding a private gate object.
    /// </summary>
    /// <typeparam name="TKey">Type of the key that messages are routed by.</typeparam>
    /// <typeparam name="TMessage">Type of the published message.</typeparam>
    [Preserve]
    public class AsyncMessageBrokerCore<TKey, TMessage> : IDisposable
    {
        readonly Dictionary<TKey, HandlerHolder> handlerGroup;
        readonly MessagePipeDiagnosticsInfo diagnotics;
        readonly AsyncPublishStrategy defaultAsyncPublishStrategy;
        readonly HandlingSubscribeDisposedPolicy handlingSubscribeDisposedPolicy;
        readonly object gate;
        bool isDisposed;

        /// <summary>Creates an empty core.</summary>
        /// <param name="diagnotics">Diagnostics sink notified about subscribe/unsubscribe events.</param>
        /// <param name="options">
        /// Supplies the default publish strategy and the policy applied when subscribing after disposal.
        /// </param>
        [Preserve]
        public AsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
        {
            this.handlerGroup = new Dictionary<TKey, HandlerHolder>();
            this.diagnotics = diagnotics;
            this.defaultAsyncPublishStrategy = options.DefaultAsyncPublishStrategy;
            this.handlingSubscribeDisposedPolicy = options.HandlingSubscribeDisposedPolicy;
            this.gate = new object();
        }

        /// <summary>
        /// Invokes every handler registered for <paramref name="key"/> and calls <c>Forget()</c> on each
        /// returned task instead of awaiting it. Does nothing when no holder exists for the key.
        /// </summary>
        public void Publish(TKey key, TMessage message, CancellationToken cancellationToken)
        {
            IAsyncMessageHandler<TMessage>[] handlers;
            lock (gate)
            {
                if (!handlerGroup.TryGetValue(key, out var holder))
                {
                    return;
                }

                handlers = holder.GetHandlers();
            }

            // Handlers are invoked outside the lock. Slots in the array may be null, hence the `?.`.
            for (int i = 0; i < handlers.Length; i++)
            {
                handlers[i]?.HandleAsync(message, cancellationToken).Forget();
            }
        }

        /// <summary>
        /// Publishes using the default strategy taken from <see cref="MessagePipeOptions"/> at construction.
        /// </summary>
        public UniTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken)
        {
            return PublishAsync(key, message, defaultAsyncPublishStrategy, cancellationToken);
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to every handler registered for <paramref name="key"/>.
        /// With <see cref="AsyncPublishStrategy.Sequential"/> each non-null handler is awaited one after
        /// another in array order; with any other strategy the handlers are handed to
        /// <c>AsyncHandlerWhenAll</c> and that is awaited (presumably completing once all handlers
        /// have completed — see that type). Completes immediately when no holder exists for the key.
        /// </summary>
        public async UniTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken)
        {
            IAsyncMessageHandler<TMessage>[] handlers;
            lock (gate)
            {
                if (!handlerGroup.TryGetValue(key, out var holder))
                {
                    return;
                }

                handlers = holder.GetHandlers();
            }

            if (publishStrategy == AsyncPublishStrategy.Sequential)
            {
                foreach (var item in handlers)
                {
                    if (item != null)
                    {
                        await item.HandleAsync(message, cancellationToken);
                    }
                }
            }
            else
            {
                await new AsyncHandlerWhenAll<TMessage>(handlers, message, cancellationToken);
            }
        }

        /// <summary>
        /// Registers <paramref name="handler"/> under <paramref name="key"/>, creating the per-key holder
        /// on first use. If this core has already been disposed, the result of the configured
        /// <see cref="HandlingSubscribeDisposedPolicy"/> is returned instead.
        /// </summary>
        /// <returns>A subscription; disposing it removes the handler.</returns>
        public IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> handler)
        {
            lock (gate)
            {
                if (isDisposed)
                {
                    return handlingSubscribeDisposedPolicy.Handle(nameof(AsyncMessageBrokerCore<TKey, TMessage>));
                }

                if (!handlerGroup.TryGetValue(key, out var holder))
                {
                    handlerGroup[key] = holder = new HandlerHolder(this);
                }

                return holder.Subscribe(key, handler);
            }
        }

        /// <summary>
        /// Marks the core as disposed and disposes every per-key holder. Subsequent calls are no-ops.
        /// </summary>
        public void Dispose()
        {
            lock (gate)
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    foreach (var handlers in handlerGroup.Values)
                    {
                        handlers.Dispose();
                    }
                }
            }
        }

        // Similar to the keyless MessageBrokerCore, but the per-key entry has to be removed
        // once the last subscription for that key is disposed.
        sealed class HandlerHolder : IDisposable, IHandlerHolderMarker
        {
            readonly FreeList<IAsyncMessageHandler<TMessage>> handlers;
            readonly AsyncMessageBrokerCore<TKey, TMessage> core;

            public HandlerHolder(AsyncMessageBrokerCore<TKey, TMessage> core)
            {
                this.handlers = new FreeList<IAsyncMessageHandler<TMessage>>();
                this.core = core;
            }

            /// <summary>Returns the handler array exposed by the underlying list.</summary>
            public IAsyncMessageHandler<TMessage>[] GetHandlers() => handlers.GetValues();

            /// <summary>
            /// Adds <paramref name="handler"/> to this holder and reports the new subscription to
            /// diagnostics. Does not lock by itself; the core calls it while holding its gate.
            /// </summary>
            public IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> handler)
            {
                var subscriptionKey = handlers.Add(handler);
                var subscription = new Subscription(key, subscriptionKey, this);
                core.diagnotics.IncrementSubscribe(this, subscription);
                return subscription;
            }

            /// <summary>
            /// Disposes the handler list and, if that succeeded, removes this holder's diagnostics entries.
            /// </summary>
            public void Dispose()
            {
                // `lock` is reentrant for the owning thread, so this is safe when the core's Dispose,
                // which already holds the gate, calls into here.
                lock (core.gate)
                {
                    if (handlers.TryDispose(out var count))
                    {
                        core.diagnotics.RemoveTargetDiagnostics(this, count);
                    }
                }
            }

            sealed class Subscription : IDisposable
            {
                bool isDisposed;
                readonly TKey key;
                readonly int subscriptionKey;
                readonly HandlerHolder holder;

                public Subscription(TKey key, int subscriptionKey, HandlerHolder holder)
                {
                    this.key = key;
                    this.subscriptionKey = subscriptionKey;
                    this.holder = holder;
                }

                /// <summary>
                /// Removes the handler from its holder, updates diagnostics, and drops the holder from
                /// the core's table when it has no handlers left. Safe to call more than once.
                /// </summary>
                public void Dispose()
                {
                    var core = holder.core;
                    lock (core.gate)
                    {
                        // The flag is checked and set under the gate so that concurrent Dispose calls
                        // cannot both pass the check and remove the same slot twice.
                        if (isDisposed)
                        {
                            return;
                        }

                        isDisposed = true;

                        // Once the core is disposed the holder has already been torn down.
                        if (core.isDisposed)
                        {
                            return;
                        }

                        // Second argument kept as in the original call; see FreeList.Remove for its meaning.
                        holder.handlers.Remove(subscriptionKey, false);
                        core.diagnotics.DecrementSubscribe(holder, this);

                        if (holder.handlers.GetCount() == 0)
                        {
                            core.handlerGroup.Remove(key);
                        }
                    }
                }
            }
        }
    }

    // Singleton, Scoped variation

    /// <summary>Broker variant bound to <see cref="SingletonAsyncMessageBrokerCore{TKey, TMessage}"/>.</summary>
    [Preserve]
    public class SingletonAsyncMessageBroker<TKey, TMessage> : AsyncMessageBroker<TKey, TMessage>, ISingletonAsyncPublisher<TKey, TMessage>, ISingletonAsyncSubscriber<TKey, TMessage>
    {
        public SingletonAsyncMessageBroker(SingletonAsyncMessageBrokerCore<TKey, TMessage> core, FilterAttachedAsyncMessageHandlerFactory handlerFactory)
            : base(core, handlerFactory)
        {
        }
    }

    /// <summary>Core type used by <see cref="SingletonAsyncMessageBroker{TKey, TMessage}"/>; adds no behavior.</summary>
    [Preserve]
    public class SingletonAsyncMessageBrokerCore<TKey, TMessage> : AsyncMessageBrokerCore<TKey, TMessage>
    {
        public SingletonAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
            : base(diagnotics, options)
        {
        }
    }

    /// <summary>Broker variant bound to <see cref="ScopedAsyncMessageBrokerCore{TKey, TMessage}"/>.</summary>
    [Preserve]
    public class ScopedAsyncMessageBroker<TKey, TMessage> : AsyncMessageBroker<TKey, TMessage>, IScopedAsyncPublisher<TKey, TMessage>, IScopedAsyncSubscriber<TKey, TMessage>
    {
        public ScopedAsyncMessageBroker(ScopedAsyncMessageBrokerCore<TKey, TMessage> core, FilterAttachedAsyncMessageHandlerFactory handlerFactory)
            : base(core, handlerFactory)
        {
        }
    }

    /// <summary>Core type used by <see cref="ScopedAsyncMessageBroker{TKey, TMessage}"/>; adds no behavior.</summary>
    [Preserve]
    public class ScopedAsyncMessageBrokerCore<TKey, TMessage> : AsyncMessageBrokerCore<TKey, TMessage>
    {
        public ScopedAsyncMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
            : base(diagnotics, options)
        {
        }
    }
}