// Copyright (c) All contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Nerdbank.Streams;
namespace MessagePack
{
///
/// Reads one or more messagepack data structures from a .
///
///
/// This class is *not* thread-safe. Do not call more than one member at once and be sure any call completes (including asynchronous tasks)
/// before calling the next one.
///
public partial class MessagePackStreamReader : IDisposable
{
private readonly Stream stream;
private readonly bool leaveOpen;
private SequencePool.Rental sequenceRental;
private SequencePosition? endOfLastMessage;
///
/// Initializes a new instance of the class.
///
/// The stream to read from. This stream will be disposed of when this is disposed.
public MessagePackStreamReader(Stream stream)
: this(stream, leaveOpen: false)
{
}
///
/// Initializes a new instance of the class.
///
/// The stream to read from.
/// If true, leaves the stream open after this is disposed; otherwise, false.
public MessagePackStreamReader(Stream stream, bool leaveOpen)
: this(stream, leaveOpen, SequencePool.Shared)
{
}
///
/// Initializes a new instance of the class.
///
/// The stream to read from.
/// If true, leaves the stream open after this is disposed; otherwise, false.
/// The pool to rent a object from.
public MessagePackStreamReader(Stream stream, bool leaveOpen, SequencePool sequencePool)
{
if (sequencePool == null)
{
throw new ArgumentNullException(nameof(sequencePool));
}
this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
this.leaveOpen = leaveOpen;
this.sequenceRental = sequencePool.Rent();
}
///
/// Gets any bytes that have been read since the last complete message returned from .
///
public ReadOnlySequence RemainingBytes => this.endOfLastMessage.HasValue ? this.ReadData.AsReadOnlySequence.Slice(this.endOfLastMessage.Value) : this.ReadData.AsReadOnlySequence;
///
/// Gets the sequence that we read data from the into.
///
private Sequence ReadData => this.sequenceRental.Value;
///
/// Reads the next whole (top-level) messagepack data structure.
///
/// A cancellation token.
///
/// A task whose result is the next whole data structure from the stream, or null if the stream ends.
/// The returned sequence is valid until this is disposed or
/// until this method is called again, whichever comes first.
///
///
/// When null is the result of the returned task,
/// any extra bytes read (between the last complete message and the end of the stream) will be available via the property.
///
public async ValueTask?> ReadAsync(CancellationToken cancellationToken)
{
this.RecycleLastMessage();
while (true)
{
// Check if we have a complete message and return it if we have it.
// We do this before reading anything since a previous read may have brought in several messages.
cancellationToken.ThrowIfCancellationRequested();
if (this.TryReadNextMessage(out ReadOnlySequence completeMessage))
{
return completeMessage;
}
if (!await this.TryReadMoreDataAsync(cancellationToken).ConfigureAwait(false))
{
// We've reached the end of the stream.
// We already checked for a complete message with what we already had, so evidently it's not a complete message.
return null;
}
}
}
///
/// Arranges for the next read operation to start by reading from the underlying
/// instead of any data buffered from a previous read.
///
///
/// This is appropriate if the underlying has been repositioned such that
/// any previously buffered data is no longer applicable to what the caller wants to read.
///
public void DiscardBufferedData()
{
this.sequenceRental.Value.Reset();
this.endOfLastMessage = default;
}
///
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
///
/// Disposes of managed and unmanaged resources.
///
/// if this instance is being disposed; if it is being finalized.
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (!this.leaveOpen)
{
this.stream.Dispose();
}
this.sequenceRental.Dispose();
this.sequenceRental = default;
}
}
///
/// Recycle memory from a previously returned message.
///
private void RecycleLastMessage()
{
if (this.endOfLastMessage.HasValue)
{
// A previously returned message can now be safely recycled since the caller wants more.
this.ReadData.AdvanceTo(this.endOfLastMessage.Value);
this.endOfLastMessage = null;
}
}
///
/// Read more data from the stream into the buffer.
///
/// A cancellation token.
/// true if more data was read; false if the end of the stream had already been reached.
private async Task TryReadMoreDataAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Memory buffer = this.ReadData.GetMemory(sizeHint: 0);
int bytesRead = 0;
try
{
bytesRead = await this.stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
return bytesRead > 0;
}
finally
{
// Keep our state clean in case the caller wants to call us again.
this.ReadData.Advance(bytesRead);
}
}
///
/// Checks whether the content in include a complete messagepack structure.
///
/// Receives the sequence of the first complete data structure found, if any.
/// true if a complete data structure was found; false otherwise.
private bool TryReadNextMessage(out ReadOnlySequence completeMessage)
{
if (this.ReadData.Length > 0)
{
var reader = new MessagePackReader(this.ReadData);
// Perf opportunity: instead of skipping from the start each time, we could incrementally skip across tries
// possibly as easy as simply keeping a count of how many tokens still need to be skipped (that we know about).
if (reader.TrySkip())
{
this.endOfLastMessage = reader.Position;
completeMessage = reader.Sequence.Slice(0, reader.Position);
return true;
}
}
completeMessage = default;
return false;
}
}
}