using MessagePipe.Internal; using System; using System.Threading; using Cysharp.Threading.Tasks; namespace MessagePipe { [Preserve] public sealed class EventFactory { readonly MessagePipeOptions options; readonly MessagePipeDiagnosticsInfo diagnosticsInfo; readonly FilterAttachedMessageHandlerFactory handlerFactory; readonly FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory; [Preserve] public EventFactory( MessagePipeOptions options, MessagePipeDiagnosticsInfo diagnosticsInfo, FilterAttachedMessageHandlerFactory handlerFactory, FilterAttachedAsyncMessageHandlerFactory asyncHandlerFactory) { this.options = options; this.diagnosticsInfo = diagnosticsInfo; this.handlerFactory = handlerFactory; this.asyncHandlerFactory = asyncHandlerFactory; } public (IDisposablePublisher, ISubscriber) CreateEvent() { var core = new MessageBrokerCore(diagnosticsInfo, options); var publisher = new DisposablePublisher(core); var subscriber = new MessageBroker(core, handlerFactory); return (publisher, subscriber); } public (IDisposableAsyncPublisher, IAsyncSubscriber) CreateAsyncEvent() { var core = new AsyncMessageBrokerCore(diagnosticsInfo, options); var publisher = new DisposableAsyncPublisher(core); var subscriber = new AsyncMessageBroker(core, asyncHandlerFactory); return (publisher, subscriber); } public (IDisposableBufferedPublisher, IBufferedSubscriber) CreateBufferedEvent(T initialValue) { var innerCore = new MessageBrokerCore(diagnosticsInfo, options); var core = new BufferedMessageBrokerCore(innerCore); var broker = new BufferedMessageBroker(core, handlerFactory); var publisher = new DisposableBufferedPublisher(broker, innerCore); var subscriber = broker; publisher.Publish(initialValue); return (publisher, subscriber); } public (IDisposableBufferedAsyncPublisher, IBufferedAsyncSubscriber) CreateBufferedAsyncEvent(T initialValue) { var innerCore = new AsyncMessageBrokerCore(diagnosticsInfo, options); var core = new BufferedAsyncMessageBrokerCore(innerCore); var broker = new BufferedAsyncMessageBroker(core, asyncHandlerFactory); var publisher = new DisposableBufferedAsyncPublisher(broker, innerCore); var subscriber = broker; publisher.Publish(initialValue, CancellationToken.None); // set initial value is completely sync. return (publisher, subscriber); } } public interface IDisposablePublisher : IPublisher, IDisposable { } public interface IDisposableBufferedPublisher : IBufferedPublisher, IDisposable { } internal class DisposablePublisher : IDisposablePublisher { readonly MessageBrokerCore core; public DisposablePublisher(MessageBrokerCore core) { this.core = core; } public void Publish(TMessage message) { core.Publish(message); } public void Dispose() { core.Dispose(); } } internal class DisposableBufferedPublisher : IDisposableBufferedPublisher { readonly BufferedMessageBroker broker; readonly IDisposable disposable; public DisposableBufferedPublisher(BufferedMessageBroker broker, IDisposable disposable) { this.broker = broker; this.disposable = disposable; } public void Publish(TMessage message) { broker.Publish(message); } public void Dispose() { disposable.Dispose(); } } public interface IDisposableAsyncPublisher : IAsyncPublisher, IDisposable { } public interface IDisposableBufferedAsyncPublisher : IBufferedAsyncPublisher, IDisposable { } internal sealed class DisposableAsyncPublisher : IDisposableAsyncPublisher { readonly AsyncMessageBrokerCore core; public DisposableAsyncPublisher(AsyncMessageBrokerCore core) { this.core = core; } public void Publish(TMessage message, CancellationToken cancellationToken) { core.Publish(message, cancellationToken); } public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { return core.PublishAsync(message, cancellationToken); } public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { return core.PublishAsync(message, publishStrategy, cancellationToken); } public void Dispose() { core.Dispose(); } } internal sealed class DisposableBufferedAsyncPublisher : IDisposableBufferedAsyncPublisher { readonly BufferedAsyncMessageBroker broker; readonly IDisposable disposable; public DisposableBufferedAsyncPublisher(BufferedAsyncMessageBroker broker, IDisposable disposable) { this.broker = broker; this.disposable = disposable; } public void Publish(TMessage message, CancellationToken cancellationToken) { broker.Publish(message, cancellationToken); } public UniTask PublishAsync(TMessage message, CancellationToken cancellationToken) { return broker.PublishAsync(message, cancellationToken); } public UniTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken) { return broker.PublishAsync(message, publishStrategy, cancellationToken); } public void Dispose() { disposable.Dispose(); } } }