Skip to content

Commit 68c0830

Browse files
Merge pull request #2 from GuillaumeVadeBe/feature/add-batching
Added batching
2 parents b7b2756 + b205507 commit 68c0830

File tree

1 file changed

+40
-4
lines changed

1 file changed

+40
-4
lines changed

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSinkExtension.cs

+40-4
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,49 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
2121
var serviceClient = new TableServiceClient(settings.ConnectionString);
2222
var tableClient = serviceClient.GetTableClient(settings.Table);
2323

24-
await tableClient.CreateIfNotExistsAsync();
24+
await tableClient.CreateIfNotExistsAsync(cancellationToken);
2525

26-
await Parallel.ForEachAsync(dataItems, cancellationToken, async (item, token) =>
26+
var entities = new List<TableEntity>();
27+
28+
await foreach (var item in dataItems.WithCancellation(cancellationToken))
2729
{
2830
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
29-
await tableClient.AddEntityAsync(entity, token);
30-
});
31+
entities.Add(entity);
32+
33+
if (entities.Count == 100)
34+
{
35+
await InnerWriteAsync(entities, tableClient, logger, cancellationToken);
36+
entities.Clear();
37+
}
38+
}
39+
}
40+
41+
private static async Task InnerWriteAsync(List<TableEntity> tableEntities, TableClient tableClient, ILogger logger, CancellationToken cancellationToken)
42+
{
43+
var transactionsActions = tableEntities.Select(e => new TableTransactionAction(TableTransactionActionType.Add, e));
44+
45+
try
46+
{
47+
await tableClient.SubmitTransactionAsync(transactionsActions, cancellationToken);
48+
return;
49+
}
50+
catch (Exception e)
51+
{
52+
logger.LogError(e, "Batch transaction failed, processing entities one by one instead.");
53+
}
54+
55+
foreach (var entity in tableEntities)
56+
{
57+
try
58+
{
59+
// Do an upsert here because there could already be some successful entities added from the batch
60+
await tableClient.UpsertEntityAsync(entity, TableUpdateMode.Replace, cancellationToken);
61+
}
62+
catch (Exception e)
63+
{
64+
logger.LogError(e, "Adding a single entity failed, continuing with other entities.");
65+
}
66+
}
3167
}
3268

3369
public IEnumerable<IDataExtensionSettings> GetSettings()

0 commit comments

Comments
 (0)