@@ -23,18 +23,41 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
23
23
24
24
await tableClient . CreateIfNotExistsAsync ( cancellationToken ) ;
25
25
26
+ await foreach ( var batch in GetBatches ( dataItems , settings ) . WithCancellation ( cancellationToken ) )
27
+ {
28
+ await InnerWriteAsync ( batch , tableClient , logger , cancellationToken ) ;
29
+ }
30
+ }
31
+
32
+ private static async IAsyncEnumerable < List < TableEntity > > GetBatches ( IAsyncEnumerable < IDataItem > dataItems , AzureTableAPIDataSinkSettings settings )
33
+ {
26
34
var entities = new List < TableEntity > ( ) ;
35
+ var first = true ;
36
+ var partitionKey = string . Empty ;
27
37
28
- await foreach ( var item in dataItems . WithCancellation ( cancellationToken ) )
38
+ await foreach ( var item in dataItems )
29
39
{
30
- var entity = item . ToTableEntity ( settings . PartitionKeyFieldName , settings . RowKeyFieldName ) ;
31
- entities . Add ( entity ) ;
40
+ var tableEntity = item . ToTableEntity ( settings . PartitionKeyFieldName , settings . RowKeyFieldName ) ;
41
+
42
+ if ( first )
43
+ {
44
+ partitionKey = tableEntity . PartitionKey ;
45
+ first = false ;
46
+ }
32
47
33
- if ( entities . Count == 100 )
48
+ if ( ! tableEntity . PartitionKey . Equals ( partitionKey ) || entities . Count == 100 )
34
49
{
35
- await InnerWriteAsync ( entities , tableClient , logger , cancellationToken ) ;
36
- entities . Clear ( ) ;
50
+ yield return entities ;
51
+ entities = new List < TableEntity > ( ) ;
52
+ partitionKey = tableEntity . PartitionKey ;
37
53
}
54
+
55
+ entities . Add ( tableEntity ) ;
56
+ }
57
+
58
+ if ( entities . Count > 0 )
59
+ {
60
+ yield return entities ;
38
61
}
39
62
}
40
63
0 commit comments