Skip to content

Commit

Permalink
More async InMemoryFileStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored and ejsmith committed Feb 15, 2021
1 parent b6b76c2 commit faed558
Showing 1 changed file with 123 additions and 56 deletions.
179 changes: 123 additions & 56 deletions src/Foundatio/Storage/InMemoryFileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -11,8 +13,9 @@

namespace Foundatio.Storage {
public class InMemoryFileStorage : IWritableStream {
private readonly Dictionary<string, Tuple<FileSpec, byte[]>> _storage = new Dictionary<string, Tuple<FileSpec, byte[]>>(StringComparer.OrdinalIgnoreCase);
private readonly object _lock = new object();
private readonly Dictionary<string, Tuple<FileSpec, byte[]>> _storage = new (StringComparer.OrdinalIgnoreCase);
private readonly SemaphoreSlim _semaphore = new (initialCount: 1, maxCount: 1);

private readonly ISerializer _serializer;

public InMemoryFileStorage() : this(o => o) {}
Expand Down Expand Up @@ -44,34 +47,41 @@ public RegisteringMemoryStream(InMemoryFileStorage storage, string path) {
void IDisposable.Dispose() {
if (Length > 0) {
var contents = ToArray();

lock (_memoryStorage._lock) {
_memoryStorage._storage[_path] = Tuple.Create(new FileSpec {
var entry = Tuple.Create(new FileSpec {
Created = SystemClock.UtcNow,
Modified = SystemClock.UtcNow,
Path = _path,
Size = contents.Length
}, contents);

_memoryStorage.Lock();
try {
_memoryStorage._storage[_path] = entry;

if (_memoryStorage._storage.Count > _memoryStorage.MaxFiles)
_memoryStorage._storage.Remove(_memoryStorage._storage.OrderByDescending(kvp => kvp.Value.Item1.Created).First().Key);
} finally {
_memoryStorage.Unlock();
}
}

base.Dispose();
}
}

public Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
public async Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

path = path.NormalizePath();
lock (_lock) {
if (!_storage.ContainsKey(path))
return Task.FromResult<Stream>(null);

return Task.FromResult<Stream>(new MemoryStream(_storage[path].Item2));
await LockAsync(cancellationToken);
try {
return !_storage.ContainsKey(path)
? null
: new MemoryStream(_storage[path].Item2);
} finally {
Unlock();
}
}

Expand Down Expand Up @@ -99,102 +109,129 @@ public Task<bool> ExistsAsync(string path) {
return Task.FromResult(_storage.ContainsKey(path));
}

private static byte[] ReadBytes(Stream input) {
using (var ms = new MemoryStream()) {
input.CopyTo(ms);
return ms.ToArray();
}
private static async ValueTask<byte[]> ReadBytesAsync(Stream input) {
using var ms = new MemoryStream();

await input.CopyToAsync(ms);
return ms.ToArray();
}

public Task<bool> SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) {
public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

if (stream == null)
throw new ArgumentNullException(nameof(stream));

path = path.NormalizePath();
var contents = ReadBytes(stream);
var contents = await ReadBytesAsync(stream);
if (contents.Length > MaxFileSize)
throw new ArgumentException(String.Format("File size {0} exceeds the maximum size of {1}.", contents.Length.ToFileSizeDisplay(), MaxFileSize.ToFileSizeDisplay()));
throw new ArgumentException($"File size {contents.Length.ToFileSizeDisplay()} exceeds the maximum size of {MaxFileSize.ToFileSizeDisplay()}.");

lock (_lock) {
_storage[path] = Tuple.Create(new FileSpec {
var entry = Tuple.Create(new FileSpec {
Created = SystemClock.UtcNow,
Modified = SystemClock.UtcNow,
Path = path,
Size = contents.Length
}, contents);

await LockAsync(cancellationToken);
try {
_storage[path] = entry;

if (_storage.Count > MaxFiles)
_storage.Remove(_storage.OrderByDescending(kvp => kvp.Value.Item1.Created).First().Key);
} finally {
Unlock();
}

return Task.FromResult(true);
return true;
}

public Task<bool> RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default) {
public async Task<bool> RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));
if (String.IsNullOrEmpty(newPath))
throw new ArgumentNullException(nameof(newPath));

path = path.NormalizePath();
newPath = newPath.NormalizePath();
lock (_lock) {
if (!_storage.ContainsKey(path))
return Task.FromResult(false);

_storage[newPath] = _storage[path];
_storage[newPath].Item1.Path = newPath;
_storage[newPath].Item1.Modified = SystemClock.UtcNow;
await LockAsync(cancellationToken);
try {
if (!_storage.TryGetValue(path, out var entry)) {
return false;
}

Debug.Assert(entry != null);

entry.Item1.Path = newPath;
entry.Item1.Modified = SystemClock.UtcNow;

_storage[newPath] = entry;

_storage.Remove(path);
} finally {
Unlock();
}

return Task.FromResult(true);
return true;
}

public Task<bool> CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default) {
public async Task<bool> CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));
if (String.IsNullOrEmpty(targetPath))
throw new ArgumentNullException(nameof(targetPath));

path = path.NormalizePath();
targetPath = targetPath.NormalizePath();
lock (_lock) {
if (!_storage.ContainsKey(path))
return Task.FromResult(false);

_storage[targetPath] = _storage[path];
_storage[targetPath].Item1.Path = targetPath;
_storage[targetPath].Item1.Modified = SystemClock.UtcNow;
await LockAsync(cancellationToken);
try {
if (!_storage.TryGetValue(path, out var entry))
return false;

var newEntry = Tuple.Create(new FileSpec {
Created = entry.Item1.Created,
Modified = SystemClock.UtcNow,
Path = targetPath,
Size = entry.Item1.Size
}, entry.Item2);

_storage[targetPath] = newEntry;

} finally {
Unlock();
}

return Task.FromResult(true);
return true;
}

public Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default) {
public async Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

path = path.NormalizePath();
lock (_lock) {
if (!_storage.ContainsKey(path))
return Task.FromResult(false);

_storage.Remove(path);
await LockAsync(cancellationToken);
try {
return _storage.Remove(path);
} finally {
Unlock();
}

return Task.FromResult(true);
}

public Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken cancellation = default) {
public async Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(searchPattern) || searchPattern == "*") {
lock(_lock)
await LockAsync(cancellationToken);
try {
_storage.Clear();
} finally {
Unlock();
}

return Task.FromResult(0);
return 0;
}

searchPattern = searchPattern.NormalizePath();
Expand All @@ -206,15 +243,21 @@ public Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken
searchPattern = Path.Combine(searchPattern, "*");

var regex = new Regex("^" + Regex.Escape(searchPattern).Replace("\\*", ".*?") + "$");
lock (_lock) {
var keys = _storage.Keys.Where(k => regex.IsMatch(k)).Select(k => _storage[k].Item1).ToList();
await LockAsync(cancellationToken);
try {
var keys = _storage.Keys.Where(k => regex.IsMatch(k))
.Select(k => _storage[k].Item1)
.ToList();

foreach (var key in keys) {
_storage.Remove(key.Path);
count++;
}
} finally {
Unlock();
}

return Task.FromResult(count);
return count;
}

public async Task<PagedFileListResult> GetPagedFileListAsync(int pageSize = 100, string searchPattern = null, CancellationToken cancellationToken = default) {
Expand All @@ -226,12 +269,12 @@ public async Task<PagedFileListResult> GetPagedFileListAsync(int pageSize = 100,

searchPattern = searchPattern.NormalizePath();

var result = new PagedFileListResult(s => Task.FromResult(GetFiles(searchPattern, 1, pageSize)));
var result = new PagedFileListResult(s => GetFilesAsync(searchPattern, 1, pageSize, cancellationToken));
await result.NextPageAsync().AnyContext();
return result;
}

private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {
private async Task<NextPageResult> GetFilesAsync(string searchPattern, int page, int pageSize, CancellationToken cancellationToken) {
var list = new List<FileSpec>();
int pagingLimit = pageSize;
int skip = (page - 1) * pagingLimit;
Expand All @@ -240,8 +283,17 @@ private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {

var regex = new Regex("^" + Regex.Escape(searchPattern).Replace("\\*", ".*?") + "$");

lock (_lock)
list.AddRange(_storage.Keys.Where(k => regex.IsMatch(k)).Select(k => _storage[k].Item1).Skip(skip).Take(pagingLimit).ToList());

await LockAsync(cancellationToken);
try {
list.AddRange(_storage.Keys.Where(k => regex.IsMatch(k))
.Select(k => _storage[k].Item1)
.Skip(skip)
.Take(pagingLimit)
.ToList());
} finally {
Unlock();
}

bool hasMore = false;
if (list.Count == pagingLimit) {
Expand All @@ -253,12 +305,27 @@ private NextPageResult GetFiles(string searchPattern, int page, int pageSize) {
Success = true,
HasMore = hasMore,
Files = list,
NextPageFunc = hasMore ? s => Task.FromResult(GetFiles(searchPattern, page + 1, pageSize)) : (Func<PagedFileListResult, Task<NextPageResult>>)null
NextPageFunc = hasMore ? s => GetFilesAsync(searchPattern, page + 1, pageSize, cancellationToken) : null
};
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ConfiguredTaskAwaitable LockAsync(CancellationToken cancellationToken)
=> _semaphore.WaitAsync(cancellationToken).AnyContext();

private void Lock()
=> _semaphore.Wait();

private void Unlock()
=> _semaphore.Release();

public void Dispose() {
_storage?.Clear();
Lock();
try {
_storage?.Clear();
} finally {
Unlock();
}
}
}
}

0 comments on commit faed558

Please sign in to comment.