using MessagePipe.Internal;
using System;
using System.Collections.Generic;

namespace MessagePipe
{
    /// <summary>
    /// Keyed publisher/subscriber facade. Publishing is forwarded straight to the
    /// shared <see cref="MessageBrokerCore{TKey, TMessage}"/>; subscribing first wraps
    /// the handler with its filters through <see cref="FilterAttachedMessageHandlerFactory"/>.
    /// </summary>
    /// <typeparam name="TKey">Type of the key that messages are routed by.</typeparam>
    /// <typeparam name="TMessage">Type of the message delivered to handlers.</typeparam>
    [Preserve]
    public class MessageBroker<TKey, TMessage> : IPublisher<TKey, TMessage>, ISubscriber<TKey, TMessage>
    {
        readonly MessageBrokerCore<TKey, TMessage> core;
        readonly FilterAttachedMessageHandlerFactory handlerFactory;

        /// <summary>Creates a broker over the given core and handler factory.</summary>
        /// <param name="core">Core that stores the subscriptions and dispatches messages.</param>
        /// <param name="handlerFactory">Factory used to attach filters to a handler on subscribe.</param>
        [Preserve]
        public MessageBroker(MessageBrokerCore<TKey, TMessage> core, FilterAttachedMessageHandlerFactory handlerFactory)
        {
            this.core = core;
            this.handlerFactory = handlerFactory;
        }

        /// <summary>Publishes <paramref name="message"/> to the handlers subscribed under <paramref name="key"/>.</summary>
        /// <param name="key">Key whose subscribers should receive the message.</param>
        /// <param name="message">Message to deliver.</param>
        public void Publish(TKey key, TMessage message)
        {
            core.Publish(key, message);
        }

        /// <summary>
        /// Subscribes <paramref name="handler"/> under <paramref name="key"/>, wrapped with
        /// <paramref name="filters"/> by the handler factory.
        /// </summary>
        /// <param name="key">Key to subscribe under.</param>
        /// <param name="handler">Handler to invoke for messages published with that key.</param>
        /// <param name="filters">Filters passed to the handler factory together with the handler.</param>
        /// <returns>The subscription object returned by the core; dispose it to unsubscribe.</returns>
        public IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters)
        {
            return core.Subscribe(key, handlerFactory.CreateMessageHandler(handler, filters));
        }
    }

    /// <summary>
    /// Stores the per-key handler lists and performs the dispatch. All mutations of the
    /// key table happen while holding a single private gate object.
    /// </summary>
    /// <typeparam name="TKey">Type of the key that messages are routed by.</typeparam>
    /// <typeparam name="TMessage">Type of the message delivered to handlers.</typeparam>
    [Preserve]
    public class MessageBrokerCore<TKey, TMessage> : IDisposable
    {
        readonly Dictionary<TKey, HandlerHolder> handlerGroup;
        readonly MessagePipeDiagnosticsInfo diagnotics;
        readonly HandlingSubscribeDisposedPolicy handlingSubscribeDisposedPolicy;
        readonly object gate;
        bool isDisposed;

        /// <summary>Creates an empty core.</summary>
        /// <param name="diagnotics">Diagnostics sink that is told about subscribe/unsubscribe events.</param>
        /// <param name="options">Options; only <c>HandlingSubscribeDisposedPolicy</c> is read here.</param>
        [Preserve]
        public MessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
        {
            this.handlerGroup = new Dictionary<TKey, HandlerHolder>();
            this.diagnotics = diagnotics;
            this.handlingSubscribeDisposedPolicy = options.HandlingSubscribeDisposedPolicy;
            this.gate = new object();
        }

        /// <summary>
        /// Invokes every handler currently registered under <paramref name="key"/>.
        /// Does nothing when no holder exists for the key.
        /// </summary>
        /// <param name="key">Key whose handlers should be invoked.</param>
        /// <param name="message">Message passed to each handler.</param>
        public void Publish(TKey key, TMessage message)
        {
            IMessageHandler<TMessage>[] handlers;
            lock (gate)
            {
                if (!handlerGroup.TryGetValue(key, out var holder))
                {
                    return;
                }

                handlers = holder.GetHandlers();
            }

            // Handlers run after the gate has been released; empty (null) slots are skipped.
            for (int i = 0; i < handlers.Length; i++)
            {
                handlers[i]?.Handle(message);
            }
        }

        /// <summary>
        /// Registers <paramref name="handler"/> under <paramref name="key"/>, creating the
        /// holder for that key on first use.
        /// </summary>
        /// <param name="key">Key to subscribe under.</param>
        /// <param name="handler">Handler to register.</param>
        /// <returns>
        /// A subscription whose disposal removes the handler. When this core has already been
        /// disposed, the result of the configured <c>HandlingSubscribeDisposedPolicy</c> is returned instead.
        /// </returns>
        public IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler)
        {
            lock (gate)
            {
                if (isDisposed)
                {
                    return handlingSubscribeDisposedPolicy.Handle(nameof(MessageBrokerCore<TKey, TMessage>));
                }

                if (!handlerGroup.TryGetValue(key, out var holder))
                {
                    handlerGroup[key] = holder = new HandlerHolder(this);
                }

                return holder.Subscribe(key, handler);
            }
        }

        /// <summary>
        /// Marks the core as disposed and disposes every handler holder. Calling it again is a no-op.
        /// </summary>
        public void Dispose()
        {
            lock (gate)
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    foreach (var handlers in handlerGroup.Values)
                    {
                        handlers.Dispose();
                    }
                }
            }
        }

        // Similar to the keyless MessageBrokerCore, but here a holder has to be removed from
        // the key table once the last subscription for its key has been disposed.
        sealed class HandlerHolder : IDisposable, IHandlerHolderMarker
        {
            readonly FreeList<IMessageHandler<TMessage>> handlers;
            readonly MessageBrokerCore<TKey, TMessage> core;

            public HandlerHolder(MessageBrokerCore<TKey, TMessage> core)
            {
                this.handlers = new FreeList<IMessageHandler<TMessage>>();
                this.core = core;
            }

            /// <summary>Returns the handler array exposed by the underlying free list.</summary>
            public IMessageHandler<TMessage>[] GetHandlers() => handlers.GetValues();

            /// <summary>
            /// Adds <paramref name="handler"/> to this holder and reports the new subscription
            /// to diagnostics. The only caller in this file invokes it while holding the core's gate.
            /// </summary>
            public IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler)
            {
                var subscriptionKey = handlers.Add(handler);
                var subscription = new Subscription(key, subscriptionKey, this);
                core.diagnotics.IncrementSubscribe(this, subscription);
                return subscription;
            }

            /// <summary>
            /// Disposes the handler list; when that succeeds, the reported count is removed from diagnostics.
            /// </summary>
            public void Dispose()
            {
                lock (core.gate)
                {
                    if (handlers.TryDispose(out var count))
                    {
                        core.diagnotics.RemoveTargetDiagnostics(this, count);
                    }
                }
            }

            /// <summary>Handle for a single registered handler; disposing it unsubscribes.</summary>
            sealed class Subscription : IDisposable
            {
                bool isDisposed;
                readonly TKey key;
                readonly int subscriptionKey;
                readonly HandlerHolder holder;

                public Subscription(TKey key, int subscriptionKey, HandlerHolder holder)
                {
                    this.key = key;
                    this.subscriptionKey = subscriptionKey;
                    this.holder = holder;
                }

                /// <summary>
                /// Removes the handler from its holder and, when the holder becomes empty, removes
                /// the holder from the core's key table. Only the first call has any effect.
                /// </summary>
                public void Dispose()
                {
                    // The disposed flag is tested and set under the gate so that two threads
                    // disposing the same subscription cannot both run the removal and
                    // decrement the diagnostics counter twice.
                    lock (holder.core.gate)
                    {
                        if (isDisposed)
                        {
                            return;
                        }

                        isDisposed = true;

                        // Once the core itself is disposed its holders have already been
                        // disposed, so there is nothing left to remove.
                        if (holder.core.isDisposed)
                        {
                            return;
                        }

                        holder.handlers.Remove(subscriptionKey, false);
                        holder.core.diagnotics.DecrementSubscribe(holder, this);
                        if (holder.handlers.GetCount() == 0)
                        {
                            holder.core.handlerGroup.Remove(key);
                        }
                    }
                }
            }
        }
    }

    // Singleton, Scoped variation

    /// <summary>Broker variant exposed through the singleton publisher/subscriber interfaces.</summary>
    [Preserve]
    public class SingletonMessageBroker<TKey, TMessage> : MessageBroker<TKey, TMessage>, ISingletonPublisher<TKey, TMessage>, ISingletonSubscriber<TKey, TMessage>
    {
        /// <summary>Passes both arguments through to <see cref="MessageBroker{TKey, TMessage}"/>.</summary>
        public SingletonMessageBroker(SingletonMessageBrokerCore<TKey, TMessage> core, FilterAttachedMessageHandlerFactory handlerFactory)
            : base(core, handlerFactory)
        {
        }
    }

    /// <summary>Core type consumed by <see cref="SingletonMessageBroker{TKey, TMessage}"/>; adds no behavior of its own.</summary>
    [Preserve]
    public class SingletonMessageBrokerCore<TKey, TMessage> : MessageBrokerCore<TKey, TMessage>
    {
        /// <summary>Passes both arguments through to <see cref="MessageBrokerCore{TKey, TMessage}"/>.</summary>
        public SingletonMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
            : base(diagnotics, options)
        {
        }
    }

    /// <summary>Broker variant exposed through the scoped publisher/subscriber interfaces.</summary>
    [Preserve]
    public class ScopedMessageBroker<TKey, TMessage> : MessageBroker<TKey, TMessage>, IScopedPublisher<TKey, TMessage>, IScopedSubscriber<TKey, TMessage>
    {
        /// <summary>Passes both arguments through to <see cref="MessageBroker{TKey, TMessage}"/>.</summary>
        public ScopedMessageBroker(ScopedMessageBrokerCore<TKey, TMessage> core, FilterAttachedMessageHandlerFactory handlerFactory)
            : base(core, handlerFactory)
        {
        }
    }

    /// <summary>Core type consumed by <see cref="ScopedMessageBroker{TKey, TMessage}"/>; adds no behavior of its own.</summary>
    [Preserve]
    public class ScopedMessageBrokerCore<TKey, TMessage> : MessageBrokerCore<TKey, TMessage>
    {
        /// <summary>Passes both arguments through to <see cref="MessageBrokerCore{TKey, TMessage}"/>.</summary>
        public ScopedMessageBrokerCore(MessagePipeDiagnosticsInfo diagnotics, MessagePipeOptions options)
            : base(diagnotics, options)
        {
        }
    }
}