using System;

namespace MessagePipe
{
    /// <summary>
    /// Extension methods that expose MessagePipe subscribers as <see cref="IObservable{T}"/> instances.
    /// </summary>
    public static partial class SubscriberExtensions
    {
        /// <summary>
        /// Wraps an <see cref="ISubscriber{TMessage}"/> in an <see cref="IObservable{T}"/>.
        /// </summary>
        /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
        /// <param name="subscriber">Subscriber that every observer subscription is forwarded to.</param>
        /// <param name="filters">Filters passed unchanged to the subscriber on each subscription.</param>
        /// <returns>An observable whose <c>Subscribe</c> call subscribes to <paramref name="subscriber"/>.</returns>
        public static IObservable<TMessage> AsObservable<TMessage>(this ISubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
        {
            return new ObservableSubscriber<TMessage>(subscriber, filters);
        }

        /// <summary>
        /// Wraps an <see cref="IBufferedSubscriber{TMessage}"/> in an <see cref="IObservable{T}"/>.
        /// </summary>
        /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
        /// <param name="subscriber">Buffered subscriber that every observer subscription is forwarded to.</param>
        /// <param name="filters">Filters passed unchanged to the subscriber on each subscription.</param>
        /// <returns>An observable whose <c>Subscribe</c> call subscribes to <paramref name="subscriber"/>.</returns>
        public static IObservable<TMessage> AsObservable<TMessage>(this IBufferedSubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
        {
            return new ObservableBufferedSubscriber<TMessage>(subscriber, filters);
        }

        /// <summary>
        /// Wraps a keyed <see cref="ISubscriber{TKey, TMessage}"/> in an <see cref="IObservable{T}"/>
        /// bound to a single <paramref name="key"/>.
        /// </summary>
        /// <typeparam name="TKey">Type of the subscription key.</typeparam>
        /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
        /// <param name="subscriber">Keyed subscriber that every observer subscription is forwarded to.</param>
        /// <param name="key">Key passed to the subscriber on each subscription.</param>
        /// <param name="filters">Filters passed unchanged to the subscriber on each subscription.</param>
        /// <returns>An observable whose <c>Subscribe</c> call subscribes to <paramref name="subscriber"/> with <paramref name="key"/>.</returns>
        public static IObservable<TMessage> AsObservable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber, TKey key, params MessageHandlerFilter<TMessage>[] filters)
        {
            return new ObservableSubscriber<TKey, TMessage>(key, subscriber, filters);
        }
    }

    /// <summary>
    /// <see cref="IObservable{T}"/> adapter over a keyed subscriber and one fixed key.
    /// </summary>
    /// <typeparam name="TKey">Type of the subscription key.</typeparam>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    internal sealed class ObservableSubscriber<TKey, TMessage> : IObservable<TMessage>
    {
        readonly TKey key;
        readonly ISubscriber<TKey, TMessage> subscriber;
        readonly MessageHandlerFilter<TMessage>[] filters;

        /// <summary>Stores the key, subscriber and filters used by every later <see cref="Subscribe"/> call.</summary>
        public ObservableSubscriber(TKey key, ISubscriber<TKey, TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
        {
            this.key = key;
            this.subscriber = subscriber;
            this.filters = filters;
        }

        /// <summary>
        /// Subscribes a new handler that forwards each message to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Observer that receives messages through <c>OnNext</c>.</param>
        /// <returns>The disposable returned by the underlying subscriber.</returns>
        public IDisposable Subscribe(IObserver<TMessage> observer)
        {
            // A fresh handler is created per subscription, so each observer gets its own registration.
            return subscriber.Subscribe(key, new ObserverMessageHandler<TMessage>(observer), filters);
        }
    }

    /// <summary>
    /// <see cref="IObservable{T}"/> adapter over an unkeyed subscriber.
    /// </summary>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    internal sealed class ObservableSubscriber<TMessage> : IObservable<TMessage>
    {
        readonly ISubscriber<TMessage> subscriber;
        readonly MessageHandlerFilter<TMessage>[] filters;

        /// <summary>Stores the subscriber and filters used by every later <see cref="Subscribe"/> call.</summary>
        public ObservableSubscriber(ISubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
        {
            this.subscriber = subscriber;
            this.filters = filters;
        }

        /// <summary>
        /// Subscribes a new handler that forwards each message to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Observer that receives messages through <c>OnNext</c>.</param>
        /// <returns>The disposable returned by the underlying subscriber.</returns>
        public IDisposable Subscribe(IObserver<TMessage> observer)
        {
            return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
        }
    }

    /// <summary>
    /// <see cref="IObservable{T}"/> adapter over a buffered subscriber.
    /// </summary>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    internal sealed class ObservableBufferedSubscriber<TMessage> : IObservable<TMessage>
    {
        readonly IBufferedSubscriber<TMessage> subscriber;
        readonly MessageHandlerFilter<TMessage>[] filters;

        /// <summary>Stores the subscriber and filters used by every later <see cref="Subscribe"/> call.</summary>
        public ObservableBufferedSubscriber(IBufferedSubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
        {
            this.subscriber = subscriber;
            this.filters = filters;
        }

        /// <summary>
        /// Subscribes a new handler that forwards each message to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Observer that receives messages through <c>OnNext</c>.</param>
        /// <returns>The disposable returned by the underlying subscriber.</returns>
        public IDisposable Subscribe(IObserver<TMessage> observer)
        {
            return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
        }
    }

    /// <summary>
    /// Message handler that relays every handled message to an <see cref="IObserver{T}"/>.
    /// </summary>
    /// <remarks>
    /// Only <c>OnNext</c> is called here; this handler never calls <c>OnError</c> or <c>OnCompleted</c>.
    /// </remarks>
    /// <typeparam name="TMessage">Type of the messages being relayed.</typeparam>
    internal sealed class ObserverMessageHandler<TMessage> : IMessageHandler<TMessage>
    {
        readonly IObserver<TMessage> observer;

        /// <summary>Stores the observer that messages are relayed to.</summary>
        public ObserverMessageHandler(IObserver<TMessage> observer)
        {
            this.observer = observer;
        }

        /// <summary>Passes <paramref name="message"/> to the observer's <c>OnNext</c>.</summary>
        /// <param name="message">Message to relay.</param>
        public void Handle(TMessage message)
        {
            observer.OnNext(message);
        }
    }
}