Skip to content

Commit b5239a7

Browse files
authored
Merge pull request #44 from AzureCosmosDB/develop
JSON Source async Streams
2 parents f8184a7 + 9068e5a commit b5239a7

File tree

5 files changed

+152
-32
lines changed

5 files changed

+152
-32
lines changed

Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonSinkTests.cs

+77
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,88 @@ public async Task WriteAsync_WithFlatObjects_WritesToValidFile()
4444
Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Name == "Two"));
4545
Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Name == "Three"));
4646
}
47+
48+
[TestMethod]
49+
public async Task WriteAsync_WithSourceDates_PreservesDateFormats()
50+
{
51+
var sink = new JsonDataSinkExtension();
52+
53+
var now = DateTime.UtcNow;
54+
var randomTime = DateTime.UtcNow.AddMinutes(Random.Shared.NextDouble() * 10000);
55+
var data = new List<DictionaryDataItem>
56+
{
57+
new(new Dictionary<string, object?>
58+
{
59+
{ "Id", 1 },
60+
{ "Created", now },
61+
}),
62+
new(new Dictionary<string, object?>
63+
{
64+
{ "Id", 2 },
65+
{ "Created", DateTime.UnixEpoch },
66+
}),
67+
new(new Dictionary<string, object?>
68+
{
69+
{ "Id", 3 },
70+
{ "Created", randomTime },
71+
}),
72+
};
73+
string outputFile = $"{now:yy-MM-dd}_DateOutput.json";
74+
var config = TestHelpers.CreateConfig(new Dictionary<string, string>
75+
{
76+
{ "FilePath", outputFile }
77+
});
78+
79+
await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);
80+
81+
string json = await File.ReadAllTextAsync(outputFile);
82+
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(json);
83+
84+
Assert.IsTrue(outputData.Any(o => o.Id == 1 && o.Created == now));
85+
Assert.IsTrue(outputData.Any(o => o.Id == 2 && o.Created == DateTime.UnixEpoch));
86+
Assert.IsTrue(outputData.Any(o => o.Id == 3 && o.Created == randomTime));
87+
}
88+
89+
90+
[TestMethod]
91+
public async Task WriteAsync_WithDateArray_PreservesDateFormats()
92+
{
93+
var sink = new JsonDataSinkExtension();
94+
95+
var now = DateTime.UtcNow;
96+
var randomTime = DateTime.UtcNow.AddMinutes(Random.Shared.NextDouble() * 10000);
97+
var data = new List<DictionaryDataItem>
98+
{
99+
new(new Dictionary<string, object?>
100+
{
101+
{ "Id", 1 },
102+
{ "Dates", new[] { now, randomTime, DateTime.UnixEpoch } },
103+
})
104+
};
105+
106+
string outputFile = $"{now:yy-MM-dd}_DateArrayOutput.json";
107+
var config = TestHelpers.CreateConfig(new Dictionary<string, string>
108+
{
109+
{ "FilePath", outputFile }
110+
});
111+
112+
await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);
113+
114+
string json = await File.ReadAllTextAsync(outputFile);
115+
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(json);
116+
117+
Assert.AreEqual(now, outputData.Single().Dates.ElementAt(0));
118+
Assert.AreEqual(randomTime, outputData.Single().Dates.ElementAt(1));
119+
Assert.AreEqual(DateTime.UnixEpoch, outputData.Single().Dates.ElementAt(2));
120+
}
121+
47122
}
48123

49124
public class TestDataObject
50125
{
51126
public int Id { get; set; }
52127
public string? Name { get; set; }
128+
public DateTime? Created { get; set; }
129+
public List<DateTime>? Dates { get; set; }
53130
}
54131
}

Extensions/Json/Cosmos.DataTransfer.JsonExtension/Cosmos.DataTransfer.JsonExtension.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
1212
<PackageReference Include="System.ComponentModel.Composition" Version="6.0.0" />
13+
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
1314
</ItemGroup>
1415

1516
<ItemGroup>

Extensions/Json/Cosmos.DataTransfer.JsonExtension/JsonDataSourceExtension.cs

+62-29
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.ComponentModel.Composition;
22
using System.Runtime.CompilerServices;
3-
using System.Text;
43
using System.Text.Json;
54
using Cosmos.DataTransfer.Interfaces;
65
using Cosmos.DataTransfer.JsonExtension.Settings;
@@ -23,13 +22,16 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
2322
if (File.Exists(settings.FilePath))
2423
{
2524
logger.LogInformation("Reading file '{FilePath}'", settings.FilePath);
26-
var list = await ReadFileAsync(settings.FilePath, logger, cancellationToken);
25+
var list = ReadFileAsync(settings.FilePath, logger, cancellationToken);
2726

2827
if (list != null)
2928
{
30-
foreach (var listItem in list)
29+
await foreach (var listItem in list.WithCancellation(cancellationToken))
3130
{
32-
yield return new JsonDictionaryDataItem(listItem);
31+
if (listItem != null)
32+
{
33+
yield return new JsonDictionaryDataItem(listItem);
34+
}
3335
}
3436
}
3537
}
@@ -40,13 +42,16 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
4042
foreach (string filePath in files.OrderBy(f => f))
4143
{
4244
logger.LogInformation("Reading file '{FilePath}'", filePath);
43-
var list = await ReadFileAsync(filePath, logger, cancellationToken);
45+
var list = ReadFileAsync(filePath, logger, cancellationToken);
4446

4547
if (list != null)
4648
{
47-
foreach (var listItem in list)
49+
await foreach (var listItem in list.WithCancellation(cancellationToken))
4850
{
49-
yield return new JsonDictionaryDataItem(listItem);
51+
if (listItem != null)
52+
{
53+
yield return new JsonDictionaryDataItem(listItem);
54+
}
5055
}
5156
}
5257
}
@@ -63,15 +68,18 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
6368
yield break;
6469
}
6570

66-
var json = await response.Content.ReadAsStringAsync(cancellationToken);
71+
var json = await response.Content.ReadAsStreamAsync(cancellationToken);
6772

68-
var list = await ReadJsonItemsAsync(json, logger, cancellationToken);
73+
var list = ReadJsonItemsAsync(json, logger, cancellationToken);
6974

7075
if (list != null)
7176
{
72-
foreach (var listItem in list)
77+
await foreach (var listItem in list.WithCancellation(cancellationToken))
7378
{
74-
yield return new JsonDictionaryDataItem(listItem);
79+
if (listItem != null)
80+
{
81+
yield return new JsonDictionaryDataItem(listItem);
82+
}
7583
}
7684
}
7785
}
@@ -85,45 +93,70 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
8593
}
8694
}
8795

88-
private static async Task<List<Dictionary<string, object?>>?> ReadFileAsync(string filePath, ILogger logger, CancellationToken cancellationToken)
96+
private static IAsyncEnumerable<Dictionary<string, object?>?>? ReadFileAsync(string filePath, ILogger logger, CancellationToken cancellationToken)
8997
{
90-
var jsonText = await File.ReadAllTextAsync(filePath, cancellationToken);
91-
return await ReadJsonItemsAsync(jsonText, logger, cancellationToken);
98+
var jsonFile = File.OpenRead(filePath);
99+
return ReadJsonItemsAsync(jsonFile, logger, cancellationToken);
92100
}
93101

94-
private static async Task<List<Dictionary<string, object?>>?> ReadJsonItemsAsync(string jsonText, ILogger logger, CancellationToken cancellationToken)
102+
private static IAsyncEnumerable<Dictionary<string, object?>?>? ReadJsonItemsAsync(Stream jsonStream, ILogger logger, CancellationToken cancellationToken)
95103
{
104+
if (jsonStream is { CanSeek: true, Length: < 10485760L })
105+
{
106+
// test for single item in JSON
107+
var singleItemList = ReadSingleItemAsync(jsonStream, logger);
108+
if (singleItemList != null)
109+
{
110+
return singleItemList.ToAsyncEnumerable();
111+
}
112+
}
113+
96114
try
97115
{
98-
using MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(jsonText));
99-
return await JsonSerializer.DeserializeAsync<List<Dictionary<string, object?>>>(stream, cancellationToken: cancellationToken);
116+
jsonStream.Seek(0, SeekOrigin.Begin);
117+
return JsonSerializer.DeserializeAsyncEnumerable<Dictionary<string, object?>>(jsonStream, cancellationToken: cancellationToken);
100118
}
101-
catch (Exception ex)
119+
catch (Exception)
102120
{
103121
// list failed
104122
}
105123

106-
var list = new List<Dictionary<string, object?>>();
124+
return null;
125+
}
126+
127+
private static IEnumerable<Dictionary<string, object?>?>? ReadSingleItemAsync(Stream stream, ILogger logger)
128+
{
129+
Dictionary<string, object?>? item;
107130
try
108131
{
109-
using MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(jsonText));
110-
var item = await JsonSerializer.DeserializeAsync<Dictionary<string, object?>>(stream, cancellationToken: cancellationToken);
111-
if (item != null)
112-
{
113-
list.Add(item);
114-
}
132+
item = JsonSerializer.Deserialize<Dictionary<string, object?>>(stream);
115133
}
116-
catch (Exception ex)
134+
catch (Exception)
117135
{
118136
// single item failed
137+
return null;
119138
}
120139

121-
if (!list.Any())
140+
if (item != null)
141+
{
142+
return new[] { item };
143+
}
144+
145+
string textContent;
146+
try
147+
{
148+
var chars = new char[50];
149+
new StreamReader(stream).ReadBlock(chars, 0, chars.Length);
150+
textContent = new string(chars);
151+
}
152+
catch (Exception ex)
122153
{
123-
logger.LogWarning("No records read from '{Content}'", jsonText);
154+
logger.LogWarning(ex, "Failed to read stream");
155+
textContent = "<error>";
124156
}
157+
logger.LogWarning("No records read from '{Content}'", textContent);
125158

126-
return list;
159+
return null;
127160
}
128161
}
129162
}

Interfaces/Cosmos.DataTransfer.Interfaces/DataItemJsonConverter.cs

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Text;
1+
using System.Collections;
2+
using System.Text;
23
using System.Text.Json;
34

45
namespace Cosmos.DataTransfer.Interfaces;
@@ -52,7 +53,7 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
5253
{
5354
WriteDataItem(writer, child, includeNullFields, fieldName);
5455
}
55-
else if (fieldValue is IEnumerable<object> children)
56+
else if (fieldValue is not string && fieldValue is IEnumerable children)
5657
{
5758
writer.WriteStartArray(fieldName);
5859
foreach (object arrayItem in children)
@@ -69,6 +70,10 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
6970
{
7071
writer.WriteBooleanValue(boolean);
7172
}
73+
else if (arrayItem is DateTime date)
74+
{
75+
writer.WriteStringValue(date.ToString("O"));
76+
}
7277
else
7378
{
7479
writer.WriteStringValue(arrayItem.ToString());
@@ -84,6 +89,10 @@ private static void WriteFieldValue(Utf8JsonWriter writer, string fieldName, obj
8489
{
8590
writer.WriteBoolean(fieldName, boolean);
8691
}
92+
else if (fieldValue is DateTime date)
93+
{
94+
writer.WriteString(fieldName, date.ToString("O"));
95+
}
8796
else
8897
{
8998
writer.WriteString(fieldName, fieldValue.ToString());

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The Azure Cosmos DB Desktop Data Migration Tool is an open-source project contai
2626

2727
## Quick Installation
2828

29-
To use the tool, download the latest zip file for your platform (win-x64, mac-x64, or linux-x64) from [Releases](releases) and extract all files to your desired install location. To begin a data transfer operation, first populate the `migrationsettings.json` file with appropriate settings for your data source and sink (see [detailed instructions](#using-the-command-line) below), and then run the application from a command line: `dmt.exe` on Windows or `dmt` on other platforms.
29+
To use the tool, download the latest zip file for your platform (win-x64, mac-x64, or linux-x64) from [Releases](https://github.com/AzureCosmosDB/data-migration-desktop-tool/releases) and extract all files to your desired install location. To begin a data transfer operation, first populate the `migrationsettings.json` file with appropriate settings for your data source and sink (see [detailed instructions](#using-the-command-line) below), and then run the application from a command line: `dmt.exe` on Windows or `dmt` on other platforms.
3030

3131
## Extension documentation
3232

0 commit comments

Comments
 (0)