// Copyright (c) All contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MessagePack.LZ4;
using Nerdbank.Streams;

namespace MessagePack
{
    /// <summary>
    /// High-Level API of MessagePack for C#.
    /// </summary>
    [System.Diagnostics.CodeAnalysis.SuppressMessage("ApiDesign", "RS0026:Do not add multiple public overloads with optional parameters", Justification = "Each overload has sufficiently unique required parameters.")]
    public static partial class MessagePackSerializer
    {
        /// <summary>
        /// In <see cref="MessagePackCompression.Lz4Block"/> mode, payloads shorter than this many bytes
        /// are written uncompressed (see <see cref="ToLZ4BinaryCore"/>).
        /// </summary>
        private const int LZ4NotCompressionSizeInLz4BlockType = 64;

        /// <summary>
        /// Upper bound for the size hint passed to the buffer when reading from a seekable stream.
        /// </summary>
        private const int MaxHintSize = 1024 * 1024;

        /// <summary>
        /// Gets or sets the default set of options to use when not explicitly specified for a method call.
        /// </summary>
        /// <value>The default value is <see cref="MessagePackSerializerOptions.Standard"/>.</value>
        /// <remarks>
        /// This is an AppDomain or process-wide setting.
        /// If you're writing a library, you should NOT set or rely on this property but should instead pass
        /// in <see cref="MessagePackSerializerOptions.Standard"/> (or the required options) explicitly to every method call
        /// to guarantee appropriate behavior in any application.
        /// If you are an app author, realize that setting this property impacts the entire application so it should only be
        /// set once, and before any use of <see cref="MessagePackSerializer"/> occurs.
        /// </remarks>
        public static MessagePackSerializerOptions DefaultOptions { get; set; } = MessagePackSerializerOptions.Standard;

        /// <summary>
        /// A thread-local, recyclable array that may be used for short bursts of code.
        /// </summary>
        [ThreadStatic]
        private static byte[] scratchArray;

        /// <summary>
        /// Serializes a given value with the specified buffer writer.
        /// </summary>
        /// <typeparam name="T">The type of value to serialize.</typeparam>
        /// <param name="writer">The buffer writer to serialize with.</param>
        /// <param name="value">The value to serialize.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
        public static void Serialize<T>(IBufferWriter<byte> writer, T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            var fastWriter = new MessagePackWriter(writer)
            {
                CancellationToken = cancellationToken,
            };

            Serialize(ref fastWriter, value, options);
            fastWriter.Flush();
        }

        /// <summary>
        /// Serializes a given value with the specified msgpack writer.
        /// </summary>
        /// <typeparam name="T">The type of value to serialize.</typeparam>
        /// <param name="writer">The writer to serialize with. Its <c>OldSpec</c> setting is restored before this method returns.</param>
        /// <param name="value">The value to serialize.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
        public static void Serialize<T>(ref MessagePackWriter writer, T value, MessagePackSerializerOptions options = null)
        {
            options = options ?? DefaultOptions;

            // The options may override the writer's OldSpec flag for the duration of this call only.
            bool originalOldSpecValue = writer.OldSpec;
            if (options.OldSpec.HasValue)
            {
                writer.OldSpec = options.OldSpec.Value;
            }

            try
            {
                // Fixed-size primitives are never routed through LZ4, even when compression is enabled.
                if (options.Compression.IsCompression() && !PrimitiveChecker<T>.IsMessagePackFixedSizePrimitive)
                {
                    using (var scratchRental = options.SequencePool.Rent())
                    {
                        // Serialize into a scratch buffer first, then compress that buffer into the real writer.
                        var scratch = scratchRental.Value;
                        MessagePackWriter scratchWriter = writer.Clone(scratch);
                        options.Resolver.GetFormatterWithVerify<T>().Serialize(ref scratchWriter, value, options);
                        scratchWriter.Flush();
                        ToLZ4BinaryCore(scratch, ref writer, options.Compression);
                    }
                }
                else
                {
                    options.Resolver.GetFormatterWithVerify<T>().Serialize(ref writer, value, options);
                }
            }
            catch (Exception ex)
            {
                throw new MessagePackSerializationException($"Failed to serialize {typeof(T).FullName} value.", ex);
            }
            finally
            {
                writer.OldSpec = originalOldSpecValue;
            }
        }

        /// <summary>
        /// Serializes a given value and returns the result as a byte array.
        /// </summary>
        /// <typeparam name="T">The type of value to serialize.</typeparam>
        /// <param name="value">The value to serialize.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>A byte array with the serialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
        public static byte[] Serialize<T>(T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            // Lazily allocate the per-thread scratch array on first use and hand it to the writer.
            byte[] array = scratchArray;
            if (array == null)
            {
                scratchArray = array = new byte[65536];
            }

            options = options ?? DefaultOptions;
            var msgpackWriter = new MessagePackWriter(options.SequencePool, array)
            {
                CancellationToken = cancellationToken,
            };
            Serialize(ref msgpackWriter, value, options);
            return msgpackWriter.FlushAndGetArray();
        }

        /// <summary>
        /// Serializes a given value to the specified stream.
        /// </summary>
        /// <typeparam name="T">The type of value to serialize.</typeparam>
        /// <param name="stream">The stream to serialize to.</param>
        /// <param name="value">The value to serialize.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
        public static void Serialize<T>(Stream stream, T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            options = options ?? DefaultOptions;
            cancellationToken.ThrowIfCancellationRequested();
            using (SequencePool.Rental sequenceRental = options.SequencePool.Rent())
            {
                // The value is fully serialized into the rented sequence before anything is written to the stream.
                Serialize<T>(sequenceRental.Value, value, options, cancellationToken);

                try
                {
                    foreach (ReadOnlyMemory<byte> segment in sequenceRental.Value.AsReadOnlySequence)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        stream.Write(segment.Span);
                    }
                }
                catch (Exception ex)
                {
                    throw new MessagePackSerializationException("Error occurred while writing the serialized data to the stream.", ex);
                }
            }
        }

        /// <summary>
        /// Serializes a given value to the specified stream.
        /// </summary>
        /// <typeparam name="T">The type of value to serialize.</typeparam>
        /// <param name="stream">The stream to serialize to.</param>
        /// <param name="value">The value to serialize.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>A task that completes with the result of the async serialization operation.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during serialization.</exception>
        public static async Task SerializeAsync<T>(Stream stream, T value, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            options = options ?? DefaultOptions;
            cancellationToken.ThrowIfCancellationRequested();
            using (SequencePool.Rental sequenceRental = options.SequencePool.Rent())
            {
                // Serialization itself is synchronous; only the stream writes are awaited.
                Serialize<T>(sequenceRental.Value, value, options, cancellationToken);

                try
                {
                    foreach (ReadOnlyMemory<byte> segment in sequenceRental.Value.AsReadOnlySequence)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await stream.WriteAsync(segment, cancellationToken).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    throw new MessagePackSerializationException("Error occurred while writing the serialized data to the stream.", ex);
                }
            }
        }

        /// <summary>
        /// Deserializes a value of a given type from a sequence of bytes.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="byteSequence">The sequence to deserialize from.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        public static T Deserialize<T>(in ReadOnlySequence<byte> byteSequence, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            var reader = new MessagePackReader(byteSequence)
            {
                CancellationToken = cancellationToken,
            };
            return Deserialize<T>(ref reader, options);
        }

        /// <summary>
        /// Deserializes a value of a given type from a msgpack reader.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="reader">The reader to deserialize from.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        public static T Deserialize<T>(ref MessagePackReader reader, MessagePackSerializerOptions options = null)
        {
            options = options ?? DefaultOptions;

            try
            {
                if (options.Compression.IsCompression())
                {
                    using (var msgPackUncompressedRental = options.SequencePool.Rent())
                    {
                        var msgPackUncompressed = msgPackUncompressedRental.Value;
                        if (TryDecompress(ref reader, msgPackUncompressed))
                        {
                            // The payload was LZ4-wrapped: deserialize from the decompressed bytes.
                            MessagePackReader uncompressedReader = reader.Clone(msgPackUncompressed.AsReadOnlySequence);
                            return options.Resolver.GetFormatterWithVerify<T>().Deserialize(ref uncompressedReader, options);
                        }
                        else
                        {
                            // No LZ4 wrapper was found: treat the data as plain msgpack.
                            return options.Resolver.GetFormatterWithVerify<T>().Deserialize(ref reader, options);
                        }
                    }
                }
                else
                {
                    return options.Resolver.GetFormatterWithVerify<T>().Deserialize(ref reader, options);
                }
            }
            catch (Exception ex)
            {
                throw new MessagePackSerializationException($"Failed to deserialize {typeof(T).FullName} value.", ex);
            }
        }

        /// <summary>
        /// Deserializes a value of a given type from a sequence of bytes.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="buffer">The buffer to deserialize from.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        public static T Deserialize<T>(ReadOnlyMemory<byte> buffer, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            var reader = new MessagePackReader(buffer)
            {
                CancellationToken = cancellationToken,
            };
            return Deserialize<T>(ref reader, options);
        }

        /// <summary>
        /// Deserializes a value of a given type from a sequence of bytes.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="buffer">The memory to deserialize from.</param>
        /// <param name="bytesRead">The number of bytes read.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        public static T Deserialize<T>(ReadOnlyMemory<byte> buffer, out int bytesRead, CancellationToken cancellationToken = default) => Deserialize<T>(buffer, options: null, out bytesRead, cancellationToken);

        /// <summary>
        /// Deserializes a value of a given type from a sequence of bytes.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="buffer">The memory to deserialize from.</param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="bytesRead">The number of bytes read.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        public static T Deserialize<T>(ReadOnlyMemory<byte> buffer, MessagePackSerializerOptions options, out int bytesRead, CancellationToken cancellationToken = default)
        {
            var reader = new MessagePackReader(buffer)
            {
                CancellationToken = cancellationToken,
            };
            T result = Deserialize<T>(ref reader, options);

            // Slicing validates that the consumed count actually fits inside the supplied buffer.
            bytesRead = buffer.Slice(0, (int)reader.Consumed).Length;
            return result;
        }

        // NOTE(review): the original remarks on the two stream overloads below pointed at a specific type to use
        // for multi-value streams, but that cross-reference was lost. Presumably MessagePackStreamReader - confirm.

        /// <summary>
        /// Deserializes the entire content of a <see cref="Stream"/>.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="stream">
        /// The stream to deserialize from.
        /// The entire stream will be read, and the first msgpack token deserialized will be returned.
        /// If <see cref="Stream.CanSeek"/> is true on the stream, its position will be set to just after the last deserialized byte.
        /// </param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        /// <remarks>
        /// If multiple top-level msgpack data structures are expected on the stream, use a reader that yields
        /// one structure at a time instead of this method.
        /// </remarks>
        public static T Deserialize<T>(Stream stream, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            options = options ?? DefaultOptions;

            // Fast path: a MemoryStream with an exposed buffer is read in place without copying.
            if (TryDeserializeFromMemoryStream(stream, options, cancellationToken, out T result))
            {
                return result;
            }

            using (var sequenceRental = options.SequencePool.Rent())
            {
                var sequence = sequenceRental.Value;
                try
                {
                    int bytesRead;
                    do
                    {
                        cancellationToken.ThrowIfCancellationRequested();

                        // For seekable streams, hint at the remaining length (capped); otherwise let the buffer decide.
                        Span<byte> span = sequence.GetSpan(stream.CanSeek ? (int)Math.Min(MaxHintSize, stream.Length - stream.Position) : 0);
                        bytesRead = stream.Read(span);
                        sequence.Advance(bytesRead);
                    }
                    while (bytesRead > 0);
                }
                catch (Exception ex)
                {
                    throw new MessagePackSerializationException("Error occurred while reading from the stream.", ex);
                }

                return DeserializeFromSequenceAndRewindStreamIfPossible<T>(stream, options, sequence, cancellationToken);
            }
        }

        /// <summary>
        /// Deserializes the entire content of a <see cref="Stream"/>.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="stream">
        /// The stream to deserialize from.
        /// The entire stream will be read, and the first msgpack token deserialized will be returned.
        /// If <see cref="Stream.CanSeek"/> is true on the stream, its position will be set to just after the last deserialized byte.
        /// </param>
        /// <param name="options">The options. Use <c>null</c> to use default options.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
        /// <remarks>
        /// If multiple top-level msgpack data structures are expected on the stream, use a reader that yields
        /// one structure at a time instead of this method.
        /// </remarks>
        public static async ValueTask<T> DeserializeAsync<T>(Stream stream, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
        {
            options = options ?? DefaultOptions;

            // Fast path: a MemoryStream with an exposed buffer is read in place without copying.
            if (TryDeserializeFromMemoryStream(stream, options, cancellationToken, out T result))
            {
                return result;
            }

            using (var sequenceRental = options.SequencePool.Rent())
            {
                var sequence = sequenceRental.Value;
                try
                {
                    int bytesRead;
                    do
                    {
                        // For seekable streams, hint at the remaining length (capped); otherwise let the buffer decide.
                        Memory<byte> memory = sequence.GetMemory(stream.CanSeek ? (int)Math.Min(MaxHintSize, stream.Length - stream.Position) : 0);
                        bytesRead = await stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false);
                        sequence.Advance(bytesRead);
                    }
                    while (bytesRead > 0);
                }
                catch (Exception ex)
                {
                    throw new MessagePackSerializationException("Error occurred while reading from the stream.", ex);
                }

                return DeserializeFromSequenceAndRewindStreamIfPossible<T>(stream, options, sequence, cancellationToken);
            }
        }

        /// <summary>
        /// The shape shared by the LZ4 encode and decode routines passed to <see cref="LZ4Operation"/>.
        /// </summary>
        /// <param name="input">The bytes to transform.</param>
        /// <param name="output">The buffer that receives the transformed bytes.</param>
        /// <returns>The number of bytes written to <paramref name="output"/>.</returns>
        private delegate int LZ4Transform(ReadOnlySpan<byte> input, Span<byte> output);

        // Cached delegates so the method groups are not converted to a new delegate on every call.
        private static readonly LZ4Transform LZ4CodecEncode = LZ4Codec.Encode;
        private static readonly LZ4Transform LZ4CodecDecode = LZ4Codec.Decode;

        /// <summary>
        /// Deserializes directly from the underlying buffer of a <see cref="MemoryStream"/> when that buffer is accessible.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="stream">The stream to inspect.</param>
        /// <param name="options">The options to deserialize with.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <param name="result">Receives the deserialized value, or the default value when this method returns <c>false</c>.</param>
        /// <returns><c>true</c> if the stream was a <see cref="MemoryStream"/> with an exposed buffer and the value was deserialized; otherwise <c>false</c>.</returns>
        private static bool TryDeserializeFromMemoryStream<T>(Stream stream, MessagePackSerializerOptions options, CancellationToken cancellationToken, out T result)
        {
            cancellationToken.ThrowIfCancellationRequested();
            if (stream is MemoryStream ms && ms.TryGetBuffer(out ArraySegment<byte> streamBuffer))
            {
                // Start at the stream's current position rather than the beginning of the buffer.
                result = Deserialize<T>(streamBuffer.AsMemory(checked((int)ms.Position)), options, out int bytesRead, cancellationToken);

                // Emulate that we had actually "read" from the stream.
                ms.Seek(bytesRead, SeekOrigin.Current);
                return true;
            }

            result = default;
            return false;
        }

        /// <summary>
        /// Deserializes one value from a fully buffered sequence, then moves a seekable stream back
        /// over any bytes that were buffered but not consumed.
        /// </summary>
        /// <typeparam name="T">The type of value to deserialize.</typeparam>
        /// <param name="streamToRewind">The stream the sequence was read from.</param>
        /// <param name="options">The options to deserialize with.</param>
        /// <param name="sequence">The bytes that were read from <paramref name="streamToRewind"/>.</param>
        /// <param name="cancellationToken">A cancellation token.</param>
        /// <returns>The deserialized value.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="streamToRewind"/> is <c>null</c>.</exception>
        private static T DeserializeFromSequenceAndRewindStreamIfPossible<T>(Stream streamToRewind, MessagePackSerializerOptions options, ReadOnlySequence<byte> sequence, CancellationToken cancellationToken)
        {
            if (streamToRewind is null)
            {
                throw new ArgumentNullException(nameof(streamToRewind));
            }

            var reader = new MessagePackReader(sequence)
            {
                CancellationToken = cancellationToken,
            };
            T result = Deserialize<T>(ref reader, options);

            if (streamToRewind.CanSeek && !reader.End)
            {
                // Reverse the stream as many bytes as we left unread.
                int bytesNotRead = checked((int)reader.Sequence.Slice(reader.Position).Length);
                streamToRewind.Seek(-bytesNotRead, SeekOrigin.Current);
            }

            return result;
        }

        /// <summary>
        /// Performs LZ4 compression or decompression.
        /// </summary>
        /// <param name="input">The input for the operation.</param>
        /// <param name="output">The buffer to write the result of the operation.</param>
        /// <param name="lz4Operation">The LZ4 codec transformation.</param>
        /// <returns>The number of bytes written to the <paramref name="output"/>.</returns>
        private static int LZ4Operation(in ReadOnlySequence<byte> input, Span<byte> output, LZ4Transform lz4Operation)
        {
            ReadOnlySpan<byte> inputSpan;
            byte[] rentedInputArray = null;
            if (input.IsSingleSegment)
            {
                inputSpan = input.First.Span;
            }
            else
            {
                // The codec delegates take a single span, so multi-segment input is first copied into one pooled array.
                rentedInputArray = ArrayPool<byte>.Shared.Rent((int)input.Length);
                input.CopyTo(rentedInputArray);
                inputSpan = rentedInputArray.AsSpan(0, (int)input.Length);
            }

            try
            {
                return lz4Operation(inputSpan, output);
            }
            finally
            {
                if (rentedInputArray != null)
                {
                    ArrayPool<byte>.Shared.Return(rentedInputArray);
                }
            }
        }

        /// <summary>
        /// Detects an LZ4 wrapper (either a single Lz4Block extension or an Lz4BlockArray) at the reader's
        /// current position and, if present, consumes it and writes the decompressed bytes to <paramref name="writer"/>.
        /// </summary>
        /// <param name="reader">The reader positioned at the value to inspect. It is advanced only when an LZ4 wrapper is found.</param>
        /// <param name="writer">The buffer writer that receives the decompressed msgpack bytes.</param>
        /// <returns><c>true</c> if LZ4 data was found and decompressed; otherwise <c>false</c>.</returns>
        private static bool TryDecompress(ref MessagePackReader reader, IBufferWriter<byte> writer)
        {
            if (!reader.End)
            {
                // Try to find LZ4Block
                if (reader.NextMessagePackType == MessagePackType.Extension)
                {
                    MessagePackReader peekReader = reader.CreatePeekReader();
                    ExtensionHeader header = peekReader.ReadExtensionFormatHeader();
                    if (header.TypeCode == ThisLibraryExtensionTypeCodes.Lz4Block)
                    {
                        // Read the extension using the original reader, so we "consume" it.
                        ExtensionResult extension = reader.ReadExtensionFormat();
                        var extReader = new MessagePackReader(extension.Data);

                        // The first part of the extension payload is a MessagePack-encoded Int32 that
                        // tells us the length the data will be AFTER decompression.
                        int uncompressedLength = extReader.ReadInt32();

                        // The rest of the payload is the compressed data itself.
                        ReadOnlySequence<byte> compressedData = extReader.Sequence.Slice(extReader.Position);

                        Span<byte> uncompressedSpan = writer.GetSpan(uncompressedLength).Slice(0, uncompressedLength);
                        int actualUncompressedLength = LZ4Operation(compressedData, uncompressedSpan, LZ4CodecDecode);
                        Debug.Assert(actualUncompressedLength == uncompressedLength, "Unexpected length of uncompressed data.");
                        writer.Advance(actualUncompressedLength);
                        return true;
                    }
                }

                // Try to find LZ4BlockArray
                if (reader.NextMessagePackType == MessagePackType.Array)
                {
                    MessagePackReader peekReader = reader.CreatePeekReader();
                    var arrayLength = peekReader.ReadArrayHeader();
                    if (arrayLength != 0 && peekReader.NextMessagePackType == MessagePackType.Extension)
                    {
                        ExtensionHeader header = peekReader.ReadExtensionFormatHeader();
                        if (header.TypeCode == ThisLibraryExtensionTypeCodes.Lz4BlockArray)
                        {
                            // switch peekReader as original reader.
                            reader = peekReader;

                            // Read from [Ext(98:int,int...), bin,bin,bin...]
                            var sequenceCount = arrayLength - 1;
                            var uncompressedLengths = ArrayPool<int>.Shared.Rent(sequenceCount);
                            try
                            {
                                // The extension payload lists the uncompressed length of every block up front.
                                for (int i = 0; i < sequenceCount; i++)
                                {
                                    uncompressedLengths[i] = reader.ReadInt32();
                                }

                                // Each remaining array element is one compressed block, decoded in order.
                                for (int i = 0; i < sequenceCount; i++)
                                {
                                    var uncompressedLength = uncompressedLengths[i];
                                    var lz4Block = reader.ReadBytes();
                                    Span<byte> uncompressedSpan = writer.GetSpan(uncompressedLength).Slice(0, uncompressedLength);
                                    var actualUncompressedLength = LZ4Operation(lz4Block.Value, uncompressedSpan, LZ4CodecDecode);
                                    Debug.Assert(actualUncompressedLength == uncompressedLength, "Unexpected length of uncompressed data.");
                                    writer.Advance(actualUncompressedLength);
                                }

                                return true;
                            }
                            finally
                            {
                                ArrayPool<int>.Shared.Return(uncompressedLengths);
                            }
                        }
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Writes already-serialized msgpack data to <paramref name="writer"/> using the requested LZ4 layout.
        /// </summary>
        /// <param name="msgpackUncompressedData">The serialized, uncompressed msgpack bytes.</param>
        /// <param name="writer">The writer that receives the (possibly compressed) output.</param>
        /// <param name="compression">The LZ4 layout to produce.</param>
        /// <exception cref="ArgumentException">Thrown when <paramref name="compression"/> is neither Lz4Block nor Lz4BlockArray.</exception>
        private static void ToLZ4BinaryCore(in ReadOnlySequence<byte> msgpackUncompressedData, ref MessagePackWriter writer, MessagePackCompression compression)
        {
            if (compression == MessagePackCompression.Lz4Block)
            {
                // Small payloads are emitted as-is, without any LZ4 wrapper.
                if (msgpackUncompressedData.Length < LZ4NotCompressionSizeInLz4BlockType)
                {
                    writer.WriteRaw(msgpackUncompressedData);
                    return;
                }

                var maxCompressedLength = LZ4Codec.MaximumOutputLength((int)msgpackUncompressedData.Length);
                var lz4Span = ArrayPool<byte>.Shared.Rent(maxCompressedLength);
                try
                {
                    int lz4Length = LZ4Operation(msgpackUncompressedData, lz4Span, LZ4CodecEncode);

                    // Accounts for the uncompressed-length Int32 written right after the extension header
                    // (presumably 1 type byte + 4 payload bytes - confirm against MessagePackWriter.WriteInt32).
                    const int LengthOfUncompressedDataSizeHeader = 5;
                    writer.WriteExtensionFormatHeader(new ExtensionHeader(ThisLibraryExtensionTypeCodes.Lz4Block, LengthOfUncompressedDataSizeHeader + (uint)lz4Length));
                    writer.WriteInt32((int)msgpackUncompressedData.Length);
                    writer.WriteRaw(lz4Span.AsSpan(0, lz4Length));
                }
                finally
                {
                    ArrayPool<byte>.Shared.Return(lz4Span);
                }
            }
            else if (compression == MessagePackCompression.Lz4BlockArray)
            {
                // Write to [Ext(98:int,int...), bin,bin,bin...]
                var sequenceCount = 0;
                var extHeaderSize = 0;
                foreach (var item in msgpackUncompressedData)
                {
                    sequenceCount++;
                    extHeaderSize += GetUInt32WriteSize((uint)item.Length);
                }

                writer.WriteArrayHeader(sequenceCount + 1);
                writer.WriteExtensionFormatHeader(new ExtensionHeader(ThisLibraryExtensionTypeCodes.Lz4BlockArray, extHeaderSize));
                {
                    // Extension payload: the uncompressed length of every segment, in order.
                    foreach (var item in msgpackUncompressedData)
                    {
                        writer.Write(item.Length);
                    }
                }

                foreach (var item in msgpackUncompressedData)
                {
                    // Reserve 5 bytes in front of the compressed data for the Bin32 header, which can
                    // only be filled in once the compressed length is known.
                    var maxCompressedLength = LZ4Codec.MaximumOutputLength(item.Length);
                    var lz4Span = writer.GetSpan(maxCompressedLength + 5);
                    int lz4Length = LZ4Codec.Encode(item.Span, lz4Span.Slice(5, lz4Span.Length - 5));
                    WriteBin32Header((uint)lz4Length, lz4Span);
                    writer.Advance(lz4Length + 5);
                }
            }
            else
            {
                throw new ArgumentException("Invalid MessagePackCompression Code. Code:" + compression);
            }
        }

        /// <summary>
        /// Gets the encoded size, in bytes, used when budgeting the Lz4BlockArray extension header for a length value.
        /// </summary>
        /// <param name="value">The value whose encoded size is required.</param>
        /// <returns>1, 2, 3 or 5, depending on which range <paramref name="value"/> falls into.</returns>
        private static int GetUInt32WriteSize(uint value)
        {
            if (value <= MessagePackRange.MaxFixPositiveInt)
            {
                return 1;
            }
            else if (value <= byte.MaxValue)
            {
                return 2;
            }
            else if (value <= ushort.MaxValue)
            {
                return 3;
            }
            else
            {
                return 5;
            }
        }

        /// <summary>
        /// Writes a 5-byte Bin32 header (type code followed by the length in big-endian order) at the start of a span.
        /// </summary>
        /// <param name="value">The length to encode in the header.</param>
        /// <param name="span">The destination; must be at least 5 bytes long.</param>
        private static void WriteBin32Header(uint value, Span<byte> span)
        {
            unchecked
            {
                span[0] = MessagePackCode.Bin32;

                // Write to highest index first so the JIT skips bounds checks on subsequent writes.
                span[4] = (byte)value;
                span[3] = (byte)(value >> 8);
                span[2] = (byte)(value >> 16);
                span[1] = (byte)(value >> 24);
            }
        }

        /// <summary>
        /// Caches, once per closed generic type, whether <typeparamref name="T"/> is one of the
        /// fixed-size primitives that bypass compression.
        /// </summary>
        /// <typeparam name="T">The type being serialized.</typeparam>
        private static class PrimitiveChecker<T>
        {
            public static readonly bool IsMessagePackFixedSizePrimitive;

            static PrimitiveChecker()
            {
                IsMessagePackFixedSizePrimitive = IsMessagePackFixedSizePrimitiveTypeHelper(typeof(T));
            }
        }

        /// <summary>
        /// Determines whether a type is one of the built-in numeric, <see cref="bool"/> or <see cref="char"/> primitives listed below.
        /// </summary>
        /// <param name="type">The type to test.</param>
        /// <returns><c>true</c> if <paramref name="type"/> is one of the listed primitives; otherwise <c>false</c>.</returns>
        private static bool IsMessagePackFixedSizePrimitiveTypeHelper(Type type)
        {
            return type == typeof(short)
                || type == typeof(int)
                || type == typeof(long)
                || type == typeof(ushort)
                || type == typeof(uint)
                || type == typeof(ulong)
                || type == typeof(float)
                || type == typeof(double)
                || type == typeof(bool)
                || type == typeof(byte)
                || type == typeof(sbyte)
                || type == typeof(char)
                ;
        }
    }
}