Skip to content

Commit c8ad5a8

Browse files
authored
Merge pull request #68 from AzureCosmosDB/develop
Minor fixes
2 parents 681c9a4 + 5c73141 commit c8ad5a8

File tree

9 files changed

+101
-69
lines changed

9 files changed

+101
-69
lines changed

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension.UnitTests/CosmosDataSinkExtensionTests.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
namespace Cosmos.DataTransfer.CosmosExtension.UnitTests
1+
using Cosmos.DataTransfer.Interfaces;
2+
3+
namespace Cosmos.DataTransfer.CosmosExtension.UnitTests
24
{
35
[TestClass]
46
public class CosmosDataSinkExtensionTests
57
{
68
[TestMethod]
7-
public void BuildObject_WithNestedArrays_WorksCorrectly()
9+
public void BuildDynamicObjectTree_WithNestedArrays_WorksCorrectly()
810
{
911
var item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
1012
{
@@ -34,7 +36,7 @@ public void BuildObject_WithNestedArrays_WorksCorrectly()
3436
}
3537
});
3638

37-
dynamic obj = CosmosDataSinkExtension.BuildObject(item)!;
39+
dynamic obj = item.BuildDynamicObjectTree()!;
3840

3941
Assert.AreEqual(typeof(object[]), obj.array.GetType());
4042
Assert.AreEqual(2, obj.array.Length);

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj

-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@
77
<OutputType>Exe</OutputType>
88
</PropertyGroup>
99

10-
<ItemGroup>
11-
<InternalsVisibleTo Include="Cosmos.DataTransfer.CosmosExtension.UnitTests" />
12-
</ItemGroup>
13-
1410
<ItemGroup>
1511
<PackageReference Include="Azure.Identity" Version="1.6.0" />
1612
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.34.0" />

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs

+1-55
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
using System.ComponentModel.Composition;
22
using System.Diagnostics;
33
using System.Dynamic;
4-
using System.Globalization;
5-
using System.Reflection;
64
using System.Text;
75
using System.Text.RegularExpressions;
8-
using Azure.Identity;
96
using Cosmos.DataTransfer.Interfaces;
107
using Microsoft.Azure.Cosmos;
118
using Microsoft.Extensions.Configuration;
@@ -95,7 +92,7 @@ void ReportCount(int i)
9592
}
9693
}
9794

98-
var convertedObjects = dataItems.Select(di => BuildObject(di, true)).Where(o => o != null).OfType<ExpandoObject>();
95+
var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(true)).Where(o => o != null).OfType<ExpandoObject>();
9996
var batches = convertedObjects.Buffer(settings.BatchSize);
10097
var retry = GetRetryPolicy(settings.MaxRetryCount, settings.InitialRetryDurationMs);
10198
await foreach (var batch in batches.WithCancellation(cancellationToken))
@@ -176,57 +173,6 @@ private static MemoryStream CreateItemStream(ExpandoObject item)
176173
return ((IDictionary<string, object?>)item)[propertyName]?.ToString();
177174
}
178175

179-
internal static ExpandoObject? BuildObject(IDataItem? source, bool requireStringId = false)
180-
{
181-
if (source == null)
182-
return null;
183-
184-
var fields = source.GetFieldNames().ToList();
185-
var item = new ExpandoObject();
186-
if (requireStringId && !fields.Contains("id", StringComparer.CurrentCultureIgnoreCase))
187-
{
188-
item.TryAdd("id", Guid.NewGuid().ToString());
189-
}
190-
foreach (string field in fields)
191-
{
192-
object? value = source.GetValue(field);
193-
var fieldName = field;
194-
if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId)
195-
{
196-
value = value?.ToString();
197-
fieldName = "id";
198-
}
199-
else if (value is IDataItem child)
200-
{
201-
value = BuildObject(child);
202-
}
203-
else if (value is IEnumerable<object?> array)
204-
{
205-
value = BuildArray(array);
206-
}
207-
208-
item.TryAdd(fieldName, value);
209-
}
210-
211-
return item;
212-
213-
static object BuildArray(IEnumerable<object?> array)
214-
{
215-
return array.Select(dataItem =>
216-
{
217-
if (dataItem is IDataItem childObject)
218-
{
219-
return BuildObject(childObject);
220-
}
221-
else if (dataItem is IEnumerable<object?> array)
222-
{
223-
return BuildArray(array);
224-
}
225-
return dataItem;
226-
}).ToArray();
227-
}
228-
}
229-
230176
public IEnumerable<IDataExtensionSettings> GetSettings()
231177
{
232178
yield return new CosmosSinkSettings();

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs

+8-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Globalization;
66
using System.Reflection;
77
using Azure.Core;
8+
using System.Text.RegularExpressions;
89

910
namespace Cosmos.DataTransfer.CosmosExtension
1011
{
@@ -44,16 +45,22 @@ private static string CreateUserAgentString(string displayName, string? sourceDi
4445
// Assembly.GetExecutingAssembly().GetName().Version,
4546
// context.SourceName, context.SinkName,
4647
// isShardedImport ? Resources.ShardedImportDesignator : String.Empty)
48+
string sourceName = StripSpecialChars(sourceDisplayName ?? "");
49+
string sinkName = StripSpecialChars(displayName);
4750

4851
var entryAssembly = Assembly.GetEntryAssembly();
4952
bool isShardedImport = false;
5053
string userAgentString = string.Format(CultureInfo.InvariantCulture, "{0}-{1}-{2}-{3}{4}",
5154
entryAssembly == null ? "dtr" : entryAssembly.GetName().Name,
5255
Assembly.GetExecutingAssembly().GetName().Version,
53-
sourceDisplayName, displayName,
56+
sourceName, sinkName,
5457
isShardedImport ? "-Sharded" : string.Empty);
5558
return userAgentString;
5659
}
60+
// Matches every character that is not a regex "word" character (\w: letters, digits, underscore).
// Built once and reused so the pattern is not re-resolved on each call.
private static readonly Regex NonWordCharacterPattern = new Regex("[^\\w]", RegexOptions.Compiled);

/// <summary>
/// Removes every non-word character (anything other than letters, digits and underscore) from a display name.
/// </summary>
/// <param name="displayName">Name to sanitize.</param>
/// <returns>The input with all non-word characters removed; an empty string stays empty.</returns>
private static string StripSpecialChars(string displayName)
{
    return NonWordCharacterPattern.Replace(displayName, "");
}
5764

5865
public static async Task VerifyContainerAccess(Container? container, string? name, ILogger logger, CancellationToken cancellationToken)
5966
{

Extensions/Cosmos/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The Cosmos data transfer extension provides source and sink capabilities for rea
88

99
Source and sink require settings used to locate and access the Cosmos DB account. This can be done in one of two ways:
1010
- Using a `ConnectionString` that includes an AccountEndpoint and AccountKey
11-
- Using RBAC (Role Based Access Control) by setting `UseRbac` to true and specifying `AccountEndpoint` and optionally `EnableInteractiveCredentials` to prompt the user to log in to Azure if default credentials are not available.
11+
- Using RBAC (Role-Based Access Control) by setting `UseRbacAuth` to true and specifying `AccountEndpoint`. Optionally, set `EnableInteractiveCredentials` to prompt the user to log in to Azure if default credentials are not available.
1212

1313
Source and sink settings also both require parameters to specify the data location within a Cosmos DB account:
1414
- `Database`
@@ -33,7 +33,7 @@ Or with RBAC:
3333

3434
```json
3535
{
36-
"UseRbac": true,
36+
"UseRbacAuth": true,
3737
"AccountEndpoint": "https://...",
3838
"EnableInteractiveCredentials": true,
3939
"Database":"myDb",

Extensions/Json/Cosmos.DataTransfer.JsonExtension/JsonDictionaryDataItem.cs

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public IEnumerable<string> GetFieldNames()
3636
JsonValueKind kind = element.ValueKind;
3737
switch (kind)
3838
{
39+
case JsonValueKind.Null:
40+
return null;
3941
case JsonValueKind.String:
4042
return element.GetString();
4143
case JsonValueKind.Number:

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
2424
var batchSize = settings.BatchSize ?? 1000;
2525

2626
var objects = new List<BsonDocument>();
27-
await foreach (var item in dataItems)
27+
int itemCount = 0;
28+
await foreach (var item in dataItems.WithCancellation(cancellationToken))
2829
{
29-
var dict = item.GetFieldNames().ToDictionary(key => key, key => item.GetValue(key));
30+
var dict = item.BuildDynamicObjectTree();
3031
objects.Add(new BsonDocument(dict));
32+
itemCount++;
3133

3234
if (objects.Count == batchSize)
3335
{
3436
await repo.AddRange(objects);
37+
logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection);
3538
objects.Clear();
3639
}
3740
}
@@ -40,6 +43,11 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
4043
{
4144
await repo.AddRange(objects);
4245
}
46+
47+
if (itemCount > 0)
48+
logger.LogInformation("Added {ItemCount} total items to collection '{Collection}'", itemCount, settings.Collection);
49+
else
50+
logger.LogWarning("No items added to collection '{Collection}'", settings.Collection);
4351
}
4452
}
4553

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,28 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
2727

2828
foreach (var collection in collectionNames)
2929
{
30-
await foreach (var item in EnumerateCollectionAsync(context, collection))
30+
await foreach (var item in EnumerateCollectionAsync(context, collection, logger).WithCancellation(cancellationToken))
3131
{
3232
yield return item;
3333
}
3434
}
3535
}
3636
}
3737

38-
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName)
38+
/// <summary>
/// Yields every record of one collection as a <see cref="MongoDataItem"/>, logging when the read starts
/// and how many items were produced.
/// </summary>
/// <param name="context">Context used to obtain the repository for the collection.</param>
/// <param name="collectionName">Name of the collection to read.</param>
/// <param name="logger">Logger that receives the start and summary messages.</param>
/// <param name="cancellationToken">
/// Token checked before each record is yielded. Because it is marked with
/// <c>EnumeratorCancellation</c>, a token supplied by the consumer through <c>WithCancellation</c>
/// is observed here as well.
/// </param>
/// <returns>An asynchronous sequence with one item per record in the collection.</returns>
/// <remarks>
/// The summary message is written only when the sequence is enumerated to the end; a consumer that
/// stops early (or cancels) never reaches it.
/// </remarks>
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, ILogger logger, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    logger.LogInformation("Reading collection '{Collection}'", collectionName);
    var collection = context.GetRepository<BsonDocument>(collectionName);
    int itemCount = 0;

    // The queryable is enumerated synchronously, so the token has to be polled explicitly;
    // without this check a cancellation request would be ignored until the collection is exhausted.
    foreach (var record in collection.AsQueryable())
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return new MongoDataItem(record);
        itemCount++;
    }

    if (itemCount > 0)
    {
        logger.LogInformation("Read {ItemCount} items from collection '{Collection}'", itemCount, collectionName);
    }
    else
    {
        logger.LogWarning("No items read from collection '{Collection}'", collectionName);
    }
}
4653

4754
public IEnumerable<IDataExtensionSettings> GetSettings()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System.Dynamic;
2+
3+
namespace Cosmos.DataTransfer.Interfaces;
4+
5+
public static class DataItemExtensions
{
    /// <summary>
    /// Given a source IDataItem, builds a dynamic object tree including child objects and arrays
    /// </summary>
    /// <param name="source">Item to convert. A null source produces a null result.</param>
    /// <param name="requireStringId">
    /// If true, adds a new GUID "id" field to any top level items where one is not already present.
    /// An existing field whose name matches "id" case-insensitively is stored under the key "id"
    /// with its value converted to a string. Nested items are converted without this option.
    /// </param>
    /// <returns>A dynamic object containing the entire data structure, or null when <paramref name="source"/> is null.</returns>
    /// <remarks>
    /// The returned ExpandoObject can be used directly as an IDictionary.
    /// Fields are added with TryAdd, so when two fields map to the same key the first one wins.
    /// </remarks>
    public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false)
    {
        if (source == null)
        {
            return null;
        }

        var fields = source.GetFieldNames().ToList();
        var item = new ExpandoObject();

        // "id" is a fixed identifier, not user-facing text, so it is matched ordinally.
        // A culture-sensitive comparison makes the result depend on the machine's locale
        // (e.g. under Turkish casing rules "ID" does not match "id").
        if (requireStringId && !fields.Contains("id", StringComparer.OrdinalIgnoreCase))
        {
            item.TryAdd("id", Guid.NewGuid().ToString());
        }

        foreach (string field in fields)
        {
            object? value = source.GetValue(field);
            var fieldName = field;
            if (requireStringId && string.Equals(field, "id", StringComparison.OrdinalIgnoreCase))
            {
                // Normalize the key to lower-case "id" and the value to a string; a null value stays null.
                value = value?.ToString();
                fieldName = "id";
            }
            else if (value is IDataItem child)
            {
                value = BuildDynamicObjectTree(child);
            }
            else if (value is IEnumerable<object?> array)
            {
                value = BuildArray(array);
            }

            item.TryAdd(fieldName, value);
        }

        return item;

        // Converts each element of a sequence, recursing into child items and nested sequences,
        // and materializes the result as an object array.
        static object BuildArray(IEnumerable<object?> array)
        {
            return array.Select(dataItem =>
            {
                switch (dataItem)
                {
                    case IDataItem childObject:
                        return BuildDynamicObjectTree(childObject);
                    case IEnumerable<object?> childArray:
                        return BuildArray(childArray);
                    default:
                        return dataItem;
                }
            }).ToArray();
        }
    }
}

0 commit comments

Comments
 (0)