Skip to content

Commit 778d005

Browse files
committed
introduce DataWriteMode.Delete to undo inserts to the wrong container
1 parent 434fa79 commit 778d005

File tree

3 files changed

+28
-5
lines changed

3 files changed

+28
-5
lines changed

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs

+15-4
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private static Task<ItemResult> AddItemAsync(Container container, ExpandoObject
151151
return t.Result;
152152
}
153153

154-
return new ItemResult(null, HttpStatusCode.InternalServerError);
154+
return new ItemResult(null, mode, HttpStatusCode.InternalServerError);
155155
}, cancellationToken);
156156
return task;
157157
}
@@ -179,14 +179,24 @@ private static async Task<ItemResult> PopulateItem(Container container, ExpandoO
179179
var upsertResponse = await container.UpsertItemAsync(item, cancellationToken: cancellationToken);
180180
statusCode = upsertResponse.StatusCode;
181181
break;
182+
case DataWriteMode.DeleteStream:
183+
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
184+
var deleteMessage = await container.DeleteItemStreamAsync(itemId, new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
185+
statusCode = deleteMessage.StatusCode;
186+
break;
187+
case DataWriteMode.Delete:
188+
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
189+
var deleteResponse = await container.DeleteItemAsync<ExpandoObject>(itemId, new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
190+
statusCode = deleteResponse.StatusCode;
191+
break;
182192
}
183193

184194
if (statusCode == null)
185195
{
186196
throw new ArgumentOutOfRangeException(nameof(mode), $"Invalid data write mode specified: {mode}");
187197
}
188198

189-
return new ItemResult(itemId, statusCode.Value);
199+
return new ItemResult(itemId, mode, statusCode.Value);
190200
}
191201

192202
private static MemoryStream CreateItemStream(ExpandoObject item)
@@ -205,9 +215,10 @@ public IEnumerable<IDataExtensionSettings> GetSettings()
205215
yield return new CosmosSinkSettings();
206216
}
207217

208-
public record ItemResult(string? Id, HttpStatusCode StatusCode)
218+
public record ItemResult(string? Id, DataWriteMode DataWriteMode, HttpStatusCode StatusCode)
209219
{
210-
public bool IsSuccess => StatusCode is HttpStatusCode.OK or HttpStatusCode.Created;
220+
public bool IsSuccess => StatusCode is HttpStatusCode.OK or HttpStatusCode.Created ||
221+
(StatusCode is HttpStatusCode.NoContent or HttpStatusCode.NotFound && DataWriteMode is DataWriteMode.Delete or DataWriteMode.DeleteStream);
211222
public int ItemCount => IsSuccess ? 1 : 0;
212223
}
213224
}

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/DataWriteMode.cs

+2
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ public enum DataWriteMode
66
Insert,
77
UpsertStream,
88
Upsert,
9+
DeleteStream,
10+
Delete
911
}
1012
}

Extensions/Cosmos/README.md

+11-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,17 @@ Or with RBAC:
4444
}
4545
```
4646

47-
Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present. The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting. `ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service. For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`. The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, or `Upsert`. The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment.
47+
Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present.
48+
49+
The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting.
50+
51+
`ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service.
52+
53+
For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`.
54+
55+
The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, `Upsert`, `DeleteStream` or `Delete`. The latter is useful to undo inserts that might have been written to the wrong container by mistake, if you cannot delete the entire container because it might contain other unrelated data.
56+
57+
The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment.
4858

4959
### Sink
5060

0 commit comments

Comments
 (0)