ZK_Framework/Assets/Plugins/MessagePack/MessagePackStreamReader.cs

210 lines
8.9 KiB
C#

// Copyright (c) All contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Nerdbank.Streams;
namespace MessagePack
{
/// <summary>
/// Reads one or more messagepack data structures from a <see cref="Stream"/>.
/// </summary>
/// <remarks>
/// This class is *not* thread-safe. Do not call more than one member at once and be sure any call completes (including asynchronous tasks)
/// before calling the next one.
/// </remarks>
public partial class MessagePackStreamReader : IDisposable
{
private readonly Stream stream;
private readonly bool leaveOpen;
private SequencePool.Rental sequenceRental;
private SequencePosition? endOfLastMessage;
/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReader"/> class.
/// </summary>
/// <param name="stream">The stream to read from. This stream will be disposed of when this <see cref="MessagePackStreamReader"/> is disposed.</param>
public MessagePackStreamReader(Stream stream)
: this(stream, leaveOpen: false)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReader"/> class.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="leaveOpen">If true, leaves the stream open after this <see cref="MessagePackStreamReader"/> is disposed; otherwise, false.</param>
public MessagePackStreamReader(Stream stream, bool leaveOpen)
: this(stream, leaveOpen, SequencePool.Shared)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReader"/> class.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="leaveOpen">If true, leaves the stream open after this <see cref="MessagePackStreamReader"/> is disposed; otherwise, false.</param>
/// <param name="sequencePool">The pool to rent a <see cref="Sequence{T}"/> object from.</param>
public MessagePackStreamReader(Stream stream, bool leaveOpen, SequencePool sequencePool)
{
if (sequencePool == null)
{
throw new ArgumentNullException(nameof(sequencePool));
}
this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
this.leaveOpen = leaveOpen;
this.sequenceRental = sequencePool.Rent();
}
/// <summary>
/// Gets any bytes that have been read since the last complete message returned from <see cref="ReadAsync(CancellationToken)"/>.
/// </summary>
public ReadOnlySequence<byte> RemainingBytes => this.endOfLastMessage.HasValue ? this.ReadData.AsReadOnlySequence.Slice(this.endOfLastMessage.Value) : this.ReadData.AsReadOnlySequence;
/// <summary>
/// Gets the sequence that we read data from the <see cref="stream"/> into.
/// </summary>
private Sequence<byte> ReadData => this.sequenceRental.Value;
/// <summary>
/// Reads the next whole (top-level) messagepack data structure.
/// </summary>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>
/// A task whose result is the next whole data structure from the stream, or <c>null</c> if the stream ends.
/// The returned sequence is valid until this <see cref="MessagePackStreamReader"/> is disposed or
/// until this method is called again, whichever comes first.
/// </returns>
/// <remarks>
/// When <c>null</c> is the result of the returned task,
/// any extra bytes read (between the last complete message and the end of the stream) will be available via the <see cref="RemainingBytes"/> property.
/// </remarks>
public async ValueTask<ReadOnlySequence<byte>?> ReadAsync(CancellationToken cancellationToken)
{
this.RecycleLastMessage();
while (true)
{
// Check if we have a complete message and return it if we have it.
// We do this before reading anything since a previous read may have brought in several messages.
cancellationToken.ThrowIfCancellationRequested();
if (this.TryReadNextMessage(out ReadOnlySequence<byte> completeMessage))
{
return completeMessage;
}
if (!await this.TryReadMoreDataAsync(cancellationToken).ConfigureAwait(false))
{
// We've reached the end of the stream.
// We already checked for a complete message with what we already had, so evidently it's not a complete message.
return null;
}
}
}
/// <summary>
/// Arranges for the next read operation to start by reading from the underlying <see cref="Stream"/>
/// instead of any data buffered from a previous read.
/// </summary>
/// <remarks>
/// This is appropriate if the underlying <see cref="Stream"/> has been repositioned such that
/// any previously buffered data is no longer applicable to what the caller wants to read.
/// </remarks>
public void DiscardBufferedData()
{
this.sequenceRental.Value.Reset();
this.endOfLastMessage = default;
}
/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes of managed and unmanaged resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> if this instance is being disposed; <see langword="false"/> if it is being finalized.</param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (!this.leaveOpen)
{
this.stream.Dispose();
}
this.sequenceRental.Dispose();
this.sequenceRental = default;
}
}
/// <summary>
/// Recycle memory from a previously returned message.
/// </summary>
private void RecycleLastMessage()
{
if (this.endOfLastMessage.HasValue)
{
// A previously returned message can now be safely recycled since the caller wants more.
this.ReadData.AdvanceTo(this.endOfLastMessage.Value);
this.endOfLastMessage = null;
}
}
/// <summary>
/// Read more data from the stream into the <see cref="ReadData"/> buffer.
/// </summary>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns><c>true</c> if more data was read; <c>false</c> if the end of the stream had already been reached.</returns>
private async Task<bool> TryReadMoreDataAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Memory<byte> buffer = this.ReadData.GetMemory(sizeHint: 0);
int bytesRead = 0;
try
{
bytesRead = await this.stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
return bytesRead > 0;
}
finally
{
// Keep our state clean in case the caller wants to call us again.
this.ReadData.Advance(bytesRead);
}
}
/// <summary>
/// Checks whether the content in <see cref="ReadData"/> include a complete messagepack structure.
/// </summary>
/// <param name="completeMessage">Receives the sequence of the first complete data structure found, if any.</param>
/// <returns><c>true</c> if a complete data structure was found; <c>false</c> otherwise.</returns>
private bool TryReadNextMessage(out ReadOnlySequence<byte> completeMessage)
{
if (this.ReadData.Length > 0)
{
var reader = new MessagePackReader(this.ReadData);
// Perf opportunity: instead of skipping from the start each time, we could incrementally skip across tries
// possibly as easy as simply keeping a count of how many tokens still need to be skipped (that we know about).
if (reader.TrySkip())
{
this.endOfLastMessage = reader.Position;
completeMessage = reader.Sequence.Slice(0, reader.Position);
return true;
}
}
completeMessage = default;
return false;
}
}
}