Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async operations to serializer. Add ability to get read or write streams from IFileStorage #260

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Foundatio.JsonNet/JsonNetSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Foundatio.Serializer {
Expand All @@ -21,5 +23,14 @@ public object Deserialize(Stream inputStream, Type objectType) {
using (var reader = new JsonTextReader(sr))
return _serializer.Deserialize(reader, objectType);
}

public Task SerializeAsync(object data, Stream outputStream, CancellationToken cancellationToken) {
Serialize(data, outputStream);
return Task.CompletedTask;
}

public ValueTask<object> DeserializeAsync(Stream inputStream, Type objectType, CancellationToken cancellationToken) {
return new ValueTask<object>(Deserialize(inputStream, objectType));
}
}
}
18 changes: 14 additions & 4 deletions src/Foundatio.MessagePack/MessagePackSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using MessagePack.Resolvers;

Expand All @@ -11,12 +13,20 @@ public MessagePackSerializer(MessagePackSerializerOptions options = null) {
_options = options ?? MessagePackSerializerOptions.Standard.WithResolver(ContractlessStandardResolver.Instance);
}

public void Serialize(object data, Stream output) {
MessagePack.MessagePackSerializer.Serialize(data.GetType(), output, data, _options);
public void Serialize(object data, Stream outputStream) {
MessagePack.MessagePackSerializer.Serialize(data.GetType(), outputStream, data, _options);
}

public object Deserialize(Stream input, Type objectType) {
return MessagePack.MessagePackSerializer.Deserialize(objectType, input, _options);
public object Deserialize(Stream inputStream, Type objectType) {
return MessagePack.MessagePackSerializer.Deserialize(objectType, inputStream, _options);
}

public Task SerializeAsync(object data, Stream outputStream, CancellationToken cancellationToken) {
return MessagePack.MessagePackSerializer.SerializeAsync(data.GetType(), outputStream, data, _options);
}

public ValueTask<object> DeserializeAsync(Stream inputStream, Type objectType, CancellationToken cancellationToken) {
return MessagePack.MessagePackSerializer.DeserializeAsync(objectType, inputStream, _options);
}
}
}
18 changes: 14 additions & 4 deletions src/Foundatio.Utf8Json/Utf8JsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Utf8Json;
using Utf8Json.Resolvers;

Expand All @@ -11,12 +13,20 @@ public Utf8JsonSerializer(IJsonFormatterResolver resolver = null) {
_formatterResolver = resolver ?? StandardResolver.Default;
}

public void Serialize(object data, Stream output) {
JsonSerializer.NonGeneric.Serialize(data.GetType(), output, data, _formatterResolver);
public void Serialize(object data, Stream outputStream) {
JsonSerializer.NonGeneric.Serialize(data.GetType(), outputStream, data, _formatterResolver);
}

public object Deserialize(Stream input, Type objectType) {
return JsonSerializer.NonGeneric.Deserialize(objectType, input, _formatterResolver);
public object Deserialize(Stream inputStream, Type objectType) {
return JsonSerializer.NonGeneric.Deserialize(objectType, inputStream, _formatterResolver);
}

public Task SerializeAsync(object data, Stream outputStream, CancellationToken cancellationToken) {
return JsonSerializer.NonGeneric.SerializeAsync(data.GetType(), outputStream, data, _formatterResolver);
}

public ValueTask<object> DeserializeAsync(Stream inputStream, Type objectType, CancellationToken cancellationToken) {
return new ValueTask<object>(JsonSerializer.NonGeneric.DeserializeAsync(objectType, inputStream, _formatterResolver));
}
}
}
5 changes: 5 additions & 0 deletions src/Foundatio/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public static ConfiguredTaskAwaitable<TResult> AnyContext<TResult>(this Task<TRe
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredValueTaskAwaitable<TResult> AnyContext<TResult>(this ValueTask<TResult> task) {
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredCancelableAsyncEnumerable<TResult> AnyContext<TResult>(this IAsyncEnumerable<TResult> source) {
return source.ConfigureAwait(continueOnCapturedContext: false);
Expand Down
11 changes: 9 additions & 2 deletions src/Foundatio/Serializer/ISerializer.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Serializer {
public interface ISerializer {
object Deserialize(Stream data, Type objectType);
void Serialize(object value, Stream output);
void Serialize(object value, Stream outputStream);
object Deserialize(Stream inputStream, Type objectType);
Task SerializeAsync(object data, Stream outputStream, CancellationToken cancellationToken);
ValueTask<object> DeserializeAsync(Stream inputStream, Type objectType, CancellationToken cancellationToken);
}

/// <summary>
/// Marker interface indicating that the underlying serialization format is text based (ie. JSON)
/// </summary>
public interface ITextSerializer : ISerializer {}

public static class DefaultSerializer {
Expand Down
10 changes: 10 additions & 0 deletions src/Foundatio/Serializer/SystemTextJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.IO;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace Foundatio.Serializer {
public class SystemTextJsonSerializer : ITextSerializer {
Expand Down Expand Up @@ -33,6 +35,14 @@ public object Deserialize(Stream inputStream, Type objectType) {
using var reader = new StreamReader(inputStream);
return JsonSerializer.Deserialize(reader.ReadToEnd(), objectType, _deserializeOptions);
}

public Task SerializeAsync(object data, Stream outputStream, CancellationToken cancellationToken) {
return JsonSerializer.SerializeAsync(outputStream, data, data.GetType(), _serializeOptions, cancellationToken);
}

public ValueTask<object> DeserializeAsync(Stream inputStream, Type objectType, CancellationToken cancellationToken) {
return JsonSerializer.DeserializeAsync(inputStream, objectType, _deserializeOptions, cancellationToken);
}
}

public class ObjectToInferredTypesConverter : JsonConverter<object> {
Expand Down
9 changes: 7 additions & 2 deletions src/Foundatio/Storage/ActionableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@

namespace Foundatio.Storage {
public class ActionableStream : Stream {
private readonly Action _disposeAction;
private readonly Action<Stream> _disposeAction;
private readonly Stream _stream;

protected override void Dispose(bool disposing) {
try {
_disposeAction.Invoke();
_disposeAction.Invoke(_stream);
} catch { /* ignore if these are already disposed; this is to make sure they are */ }

_stream.Dispose();
base.Dispose(disposing);
}

public ActionableStream(Stream stream, Action disposeAction) {
_stream = stream ?? throw new ArgumentNullException();
_disposeAction = s => disposeAction();
}

public ActionableStream(Stream stream, Action<Stream> disposeAction) {
_stream = stream ?? throw new ArgumentNullException();
_disposeAction = disposeAction;
}
Expand Down
20 changes: 18 additions & 2 deletions src/Foundatio/Storage/FolderFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,33 @@ public FolderFileStorage(Builder<FolderFileStorageOptionsBuilder, FolderFileStor
public string Folder { get; set; }
ISerializer IHaveSerializer.Serializer => _serializer;

public Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
public Task<Stream> GetFileStreamAsync(string path, FileAccess access, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

path = path.NormalizePath();
path = Path.Combine(Folder, path);

try {
return Task.FromResult<Stream>(File.OpenRead(Path.Combine(Folder, path)));
if (access == FileAccess.Read) {
return Task.FromResult<Stream>(File.OpenRead(path));
} else if (access == FileAccess.Write) {
string directory = Path.GetDirectoryName(path);
if (directory != null)
Directory.CreateDirectory(directory);

return Task.FromResult<Stream>(File.OpenWrite(path));
} else {
string directory = Path.GetDirectoryName(path);
if (directory != null)
Directory.CreateDirectory(directory);

return Task.FromResult<Stream>(File.Open(path, FileMode.OpenOrCreate, FileAccess.ReadWrite));
}
} catch (IOException ex) when (ex is FileNotFoundException || ex is DirectoryNotFoundException) {
if (_logger.IsEnabled(LogLevel.Trace))
_logger.LogTrace(ex, "Error trying to get file stream: {Path}", path);

return Task.FromResult<Stream>(null);
}
}
Expand Down
30 changes: 24 additions & 6 deletions src/Foundatio/Storage/IFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@

namespace Foundatio.Storage {
public interface IFileStorage : IHaveSerializer, IDisposable {
Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default);
Task<Stream> GetFileStreamAsync(string path, FileAccess access, CancellationToken cancellationToken = default);
Task<FileSpec> GetFileInfoAsync(string path);
Task<bool> ExistsAsync(string path);
Task<bool> SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default);
Task<bool> RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default);
Task<bool> CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default);
Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -91,12 +90,31 @@ public class FileSpec {
}

public static class FileStorageExtensions {
public static Task<bool> SaveObjectAsync<T>(this IFileStorage storage, string path, T data, CancellationToken cancellationToken = default) {
public static Task<Stream> GetFileStreamAsync(this IFileStorage storage, string path, CancellationToken cancellationToken = default) {
return storage.GetFileStreamAsync(path, FileAccess.Read, cancellationToken);
}

public static async Task<bool> SaveFileAsync(this IFileStorage storage, string path, Stream inputStream, CancellationToken cancellationToken = default) {
using (var stream = await storage.GetFileStreamAsync(path, FileAccess.Write, cancellationToken).AnyContext()) {
if (stream == null)
throw new IOException("Unable to get writable file stream from storage.");

await inputStream.CopyToAsync(stream);
return true;
}
}

public static async Task<bool> SaveObjectAsync<T>(this IFileStorage storage, string path, T data, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

var bytes = storage.Serializer.SerializeToBytes(data);
return storage.SaveFileAsync(path, new MemoryStream(bytes), cancellationToken);
using (var stream = await storage.GetFileStreamAsync(path, FileAccess.Write, cancellationToken).AnyContext()) {
if (stream == null)
throw new IOException("Unable to get writable file stream from storage.");

await storage.Serializer.SerializeAsync(data, stream, cancellationToken).AnyContext();
return true;
}
}

public static async Task<T> GetObjectAsync<T>(this IFileStorage storage, string path, CancellationToken cancellationToken = default) {
Expand All @@ -105,7 +123,7 @@ public static async Task<T> GetObjectAsync<T>(this IFileStorage storage, string

using (var stream = await storage.GetFileStreamAsync(path, cancellationToken).AnyContext()) {
if (stream != null)
return storage.Serializer.Deserialize<T>(stream);
return (T)await storage.Serializer.DeserializeAsync(stream, typeof(T), cancellationToken).AnyContext();
}

return default;
Expand Down
69 changes: 57 additions & 12 deletions src/Foundatio/Storage/InMemoryFileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class InMemoryFileStorage : IFileStorage {
private readonly object _lock = new object();
private readonly ISerializer _serializer;

public InMemoryFileStorage() : this(o => o) {}
public InMemoryFileStorage() : this(o => o) { }

public InMemoryFileStorage(InMemoryFileStorageOptions options) {
if (options == null)
Expand All @@ -26,23 +26,68 @@ public InMemoryFileStorage(InMemoryFileStorageOptions options) {
_serializer = options.Serializer ?? DefaultSerializer.Instance;
}

public InMemoryFileStorage(Builder<InMemoryFileStorageOptionsBuilder, InMemoryFileStorageOptions> config)
public InMemoryFileStorage(Builder<InMemoryFileStorageOptionsBuilder, InMemoryFileStorageOptions> config)
: this(config(new InMemoryFileStorageOptionsBuilder()).Build()) { }

public long MaxFileSize { get; set; }
public long MaxFiles { get; set; }
ISerializer IHaveSerializer.Serializer => _serializer;

public Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
public Task<Stream> GetFileStreamAsync(string path, FileAccess access, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

path = path.NormalizePath();
lock (_lock) {
if (access == FileAccess.Read) {
if (!_storage.ContainsKey(path))
return Task.FromResult<Stream>(null);

return Task.FromResult<Stream>(new MemoryStream(_storage[path].Item2));
lock (_lock) {
return Task.FromResult<Stream>(new MemoryStream(_storage[path].Item2));
}
} else if (access == FileAccess.Write) {
return Task.FromResult<Stream>(new ActionableStream(new MemoryStream(), s => {
lock (_lock) {
var memoryStream = (MemoryStream)s;
if (memoryStream.Length > 0) {
var contents = memoryStream.ToArray();

_storage[path] = Tuple.Create(new FileSpec {
Created = SystemClock.UtcNow,
Modified = SystemClock.UtcNow,
Path = path,
Size = contents.Length
}, contents);

if (_storage.Count > MaxFiles)
_storage.Remove(_storage.OrderByDescending(kvp => kvp.Value.Item1.Created).First().Key);
}
}
}));
} else {
MemoryStream ms;
lock (_lock) {
ms = _storage.ContainsKey(path) ? new MemoryStream(_storage[path].Item2) : new MemoryStream();
}

return Task.FromResult<Stream>(new ActionableStream(ms, s => {
lock (_lock) {
var memoryStream = (MemoryStream)s;
if (memoryStream.Length > 0) {
var contents = memoryStream.ToArray();

_storage[path] = Tuple.Create(new FileSpec {
Created = SystemClock.UtcNow,
Modified = SystemClock.UtcNow,
Path = path,
Size = contents.Length
}, contents);

if (_storage.Count > MaxFiles)
_storage.Remove(_storage.OrderByDescending(kvp => kvp.Value.Item1.Created).First().Key);
}
}
}));
}
}

Expand Down Expand Up @@ -154,7 +199,7 @@ public Task<bool> DeleteFileAsync(string path, CancellationToken cancellationTok

public Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken cancellation = default) {
if (String.IsNullOrEmpty(searchPattern) || searchPattern == "*") {
lock(_lock)
lock (_lock)
_storage.Clear();

return Task.FromResult(0);
Expand All @@ -163,7 +208,7 @@ public Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken
searchPattern = searchPattern.NormalizePath();
int count = 0;

if (searchPattern[searchPattern.Length - 1] == Path.DirectorySeparatorChar)
if (searchPattern[searchPattern.Length - 1] == Path.DirectorySeparatorChar)
searchPattern = $"{searchPattern}*";
else if (!searchPattern.EndsWith(Path.DirectorySeparatorChar + "*") && !Path.HasExtension(searchPattern))
searchPattern = Path.Combine(searchPattern, "*");
Expand Down Expand Up @@ -200,23 +245,23 @@ private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {
int skip = (page - 1) * pagingLimit;
if (pagingLimit < Int32.MaxValue)
pagingLimit += 1;

var regex = new Regex("^" + Regex.Escape(searchPattern).Replace("\\*", ".*?") + "$");

lock (_lock)
list.AddRange(_storage.Keys.Where(k => regex.IsMatch(k)).Select(k => _storage[k].Item1).Skip(skip).Take(pagingLimit).ToList());

bool hasMore = false;
if (list.Count == pagingLimit) {
hasMore = true;
list.RemoveAt(pagingLimit - 1);
}

return new NextPageResult {
Success = true,
HasMore = hasMore,
Success = true,
HasMore = hasMore,
Files = list,
NextPageFunc = hasMore ? s => Task.FromResult(GetFiles(searchPattern, page + 1, pageSize)) : (Func<PagedFileListResult, Task<NextPageResult>>)null
NextPageFunc = hasMore ? s => Task.FromResult(GetFiles(searchPattern, page + 1, pageSize)) : (Func<PagedFileListResult, Task<NextPageResult>>)null
};
}

Expand Down
Loading