Skip to content

Commit f458054

Browse files
authoredJul 27, 2023
Merge pull request #80 from AzureCosmosDB/feature/cosmos-error-reporting
Improving record level error reporting for Cosmos
2 parents 5ba8a1f + 1cffd38 commit f458054

File tree

1 file changed

+62
-29
lines changed

1 file changed

+62
-29
lines changed
 

‎Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs

+62-29
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
using System.ComponentModel.Composition;
22
using System.Diagnostics;
33
using System.Dynamic;
4+
using System.Net;
45
using System.Text;
5-
using System.Text.RegularExpressions;
66
using Cosmos.DataTransfer.Interfaces;
77
using Microsoft.Azure.Cosmos;
88
using Microsoft.Extensions.Configuration;
@@ -81,6 +81,7 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
8181
await CosmosExtensionServices.VerifyContainerAccess(container, settings.Container, logger, cancellationToken);
8282

8383
int addedCount = 0;
84+
int inputCount = 0;
8485

8586
var timer = Stopwatch.StartNew();
8687
void ReportCount(int i)
@@ -100,15 +101,17 @@ void ReportCount(int i)
100101
var addTasks = batch.Select(item => AddItemAsync(container, item, settings.PartitionKeyPath ?? settings.PartitionKeyPaths?.FirstOrDefault(), settings.WriteMode, retry, logger, cancellationToken)).ToList();
101102

102103
var results = await Task.WhenAll(addTasks);
103-
ReportCount(results.Sum());
104+
ReportCount(results.Sum(i => i.ItemCount));
105+
inputCount += results.Length;
104106
}
105107

106-
logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}");
107-
}
108+
if (addedCount != inputCount)
109+
{
110+
logger.LogWarning("Added {AddedCount} of {TotalCount} total records in {TotalSeconds}s", addedCount, inputCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}");
111+
throw new Exception($"Only {addedCount} of {inputCount} records were added to Cosmos");
112+
}
108113

109-
private static string StripSpecialChars(string displayName)
110-
{
111-
return Regex.Replace(displayName, "[^\\w]", "", RegexOptions.Compiled);
114+
logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}");
112115
}
113116

114117
private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRetryDuration)
@@ -124,44 +127,68 @@ private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRet
124127
return retryPolicy;
125128
}
126129

127-
private static Task<int> AddItemAsync(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken)
130+
private static Task<ItemResult> AddItemAsync(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken)
128131
{
129-
logger.LogTrace("Adding item {Id}", GetPropertyValue(item, "id"));
130-
var task = retryPolicy.ExecuteAsync(() =>
131-
{
132-
switch (mode)
133-
{
134-
case DataWriteMode.InsertStream:
135-
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
136-
return container.CreateItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
137-
case DataWriteMode.Insert:
138-
return container.CreateItemAsync(item, cancellationToken: cancellationToken);
139-
case DataWriteMode.UpsertStream:
140-
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
141-
return container.UpsertItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
142-
case DataWriteMode.Upsert:
143-
return container.UpsertItemAsync(item, cancellationToken: cancellationToken);
144-
}
132+
string? id = GetPropertyValue(item, "id");
133+
logger.LogTrace("Adding item {Id}", id);
145134

146-
throw new ArgumentOutOfRangeException(nameof(mode), $"Invalid data write mode specified: {mode}");
147-
})
135+
var task = retryPolicy.ExecuteAsync(() => PopulateItem(container, item, partitionKeyPath, mode, id, cancellationToken))
148136
.ContinueWith(t =>
149137
{
150-
if (t.IsCompletedSuccessfully)
138+
bool requestSucceeded = t.Result.IsSuccess;
139+
if (t.IsCompletedSuccessfully && requestSucceeded)
151140
{
152-
return 1;
141+
return t.Result;
153142
}
154143

155144
if (t.IsFaulted)
156145
{
157146
logger.LogWarning(t.Exception, "Error adding record: {ErrorMessage}", t.Exception?.Message);
158147
}
148+
else if (!requestSucceeded)
149+
{
150+
logger.LogWarning(t.Exception, "Error adding record {Id}: {ErrorMessage}", t.Result.Id, t.Result.StatusCode);
151+
return t.Result;
152+
}
159153

160-
return 0;
154+
return new ItemResult(null, HttpStatusCode.InternalServerError);
161155
}, cancellationToken);
162156
return task;
163157
}
164158

159+
private static async Task<ItemResult> PopulateItem(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, string? itemId, CancellationToken cancellationToken)
160+
{
161+
HttpStatusCode? statusCode = null;
162+
switch (mode)
163+
{
164+
case DataWriteMode.InsertStream:
165+
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
166+
var insertMessage = await container.CreateItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
167+
statusCode = insertMessage.StatusCode;
168+
break;
169+
case DataWriteMode.Insert:
170+
var insertResponse = await container.CreateItemAsync(item, cancellationToken: cancellationToken);
171+
statusCode = insertResponse.StatusCode;
172+
break;
173+
case DataWriteMode.UpsertStream:
174+
ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath));
175+
var upsertMessage = await container.UpsertItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken);
176+
statusCode = upsertMessage.StatusCode;
177+
break;
178+
case DataWriteMode.Upsert:
179+
var upsertResponse = await container.UpsertItemAsync(item, cancellationToken: cancellationToken);
180+
statusCode = upsertResponse.StatusCode;
181+
break;
182+
}
183+
184+
if (statusCode == null)
185+
{
186+
throw new ArgumentOutOfRangeException(nameof(mode), $"Invalid data write mode specified: {mode}");
187+
}
188+
189+
return new ItemResult(itemId, statusCode.Value);
190+
}
191+
165192
private static MemoryStream CreateItemStream(ExpandoObject item)
166193
{
167194
var json = JsonConvert.SerializeObject(item);
@@ -177,5 +204,11 @@ public IEnumerable<IDataExtensionSettings> GetSettings()
177204
{
178205
yield return new CosmosSinkSettings();
179206
}
207+
208+
public record ItemResult(string? Id, HttpStatusCode StatusCode)
209+
{
210+
public bool IsSuccess => StatusCode is HttpStatusCode.OK or HttpStatusCode.Created;
211+
public int ItemCount => IsSuccess ? 1 : 0;
212+
}
180213
}
181214
}

0 commit comments

Comments
 (0)