Skip to content

Commit 5dd9523

Browse files
Merge pull request #3 from GuillaumeVadeBe/feature/more-batching
Fixed batching
2 parents 68c0830 + bb78c55 commit 5dd9523

File tree

1 file changed

+29
-6
lines changed

1 file changed

+29
-6
lines changed

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSinkExtension.cs

+29-6
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,41 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
2323

2424
await tableClient.CreateIfNotExistsAsync(cancellationToken);
2525

26+
await Parallel.ForEachAsync(GetBatches(dataItems, settings), new ParallelOptions() { MaxDegreeOfParallelism = 8 }, async (batch, token) =>
27+
{
28+
await InnerWriteAsync(batch, tableClient, logger, token);
29+
});
30+
}
31+
32+
private static async IAsyncEnumerable<List<TableEntity>> GetBatches(IAsyncEnumerable<IDataItem> dataItems, AzureTableAPIDataSinkSettings settings)
33+
{
2634
var entities = new List<TableEntity>();
35+
var first = true;
36+
var partitionKey = string.Empty;
2737

28-
await foreach (var item in dataItems.WithCancellation(cancellationToken))
38+
await foreach (var item in dataItems)
2939
{
30-
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
31-
entities.Add(entity);
40+
var tableEntity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
3241

33-
if (entities.Count == 100)
42+
if (first)
3443
{
35-
await InnerWriteAsync(entities, tableClient, logger, cancellationToken);
36-
entities.Clear();
44+
partitionKey = tableEntity.PartitionKey;
45+
first = false;
3746
}
47+
48+
if (!tableEntity.PartitionKey.Equals(partitionKey) || entities.Count == 100)
49+
{
50+
yield return entities;
51+
entities = new List<TableEntity>();
52+
partitionKey = tableEntity.PartitionKey;
53+
}
54+
55+
entities.Add(tableEntity);
56+
}
57+
58+
if (entities.Count > 0)
59+
{
60+
yield return entities;
3861
}
3962
}
4063

0 commit comments

Comments
 (0)