Skip to content

Commit 4140082

Browse files
Merge pull request #5 from GuillaumeVadeBe/feature/test-parquet
Logging improvements
2 parents 8eab0a9 + 0a7c0a3 commit 4140082

File tree

3 files changed

+33
-18
lines changed

3 files changed

+33
-18
lines changed

Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs

+27 -4
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
1-
using Azure.Identity;
1+
using System.Diagnostics;
2+
using Azure.Identity;
23
using Azure.Storage.Blobs;
34
using Azure.Storage.Blobs.Models;
45
using Azure.Storage.Blobs.Specialized;
@@ -18,7 +19,7 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
1819
BlobContainerClient account;
1920
if (settings.UseRbacAuth)
2021
{
21-
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}'", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));
22+
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));
2223

2324
var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
2425
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
@@ -30,7 +31,7 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
3031
}
3132
else
3233
{
33-
logger.LogInformation("Connecting to Storage account using {ConnectionString}'", nameof(AzureBlobSourceSettings.ConnectionString));
34+
logger.LogInformation("Connecting to Storage account using {ConnectionString}", nameof(AzureBlobSourceSettings.ConnectionString));
3435

3536
account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
3637
}
@@ -40,15 +41,37 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
4041

4142
logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);
4243

44+
var lastLogTime = DateTime.UtcNow;
45+
var logInterval = TimeSpan.FromMinutes(1);
46+
long totalBytes = 0;
47+
4348
await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions
4449
{
4550
BufferSize = settings.MaxBlockSizeinKB * 1024L,
4651
ProgressHandler = new Progress<long>(l =>
4752
{
48-
logger.LogInformation("Transferred {UploadedBytes} bytes to Azure Blob", l);
53+
if (DateTime.UtcNow - lastLogTime >= logInterval)
54+
{
55+
logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024);
56+
lastLogTime = DateTime.UtcNow;
57+
}
58+
59+
totalBytes = l;
4960
})
61+
5062
}, cancellationToken);
63+
64+
var swWrite = new Stopwatch();
65+
swWrite.Start();
5166
await writeToStream(blobStream);
67+
swWrite.Stop();
68+
69+
if (totalBytes != 0)
70+
{
71+
var totalMib = (double) totalBytes / 1024 / 1024;
72+
73+
logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.TotalSeconds);
74+
}
5275
}
5376

5477
public IEnumerable<IDataExtensionSettings> GetSettings()

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSourceExtension.cs

+5 -14
Original file line number | Diff line number | Diff line change
@@ -23,23 +23,14 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
2323
var serviceClient = new TableServiceClient(settings.ConnectionString);
2424
var tableClient = serviceClient.GetTableClient(settings.Table);
2525

26-
//Pageable<TableEntity> queryResultsFilter = tableClient.Query<TableEntity>(filter: $"PartitionKey eq '{partitionKey}'");
27-
AsyncPageable<TableEntity> queryResults;
28-
if (!string.IsNullOrWhiteSpace(settings.QueryFilter)) {
29-
queryResults = tableClient.QueryAsync<TableEntity>(filter: settings.QueryFilter);
30-
} else {
31-
queryResults = tableClient.QueryAsync<TableEntity>();
32-
}
26+
var queryResults = !string.IsNullOrWhiteSpace(settings.QueryFilter)
27+
? tableClient.QueryAsync<TableEntity>(filter: settings.QueryFilter, cancellationToken: cancellationToken)
28+
: tableClient.QueryAsync<TableEntity>(cancellationToken: cancellationToken);
3329

34-
var enumerator = queryResults.GetAsyncEnumerator();
35-
while (await enumerator.MoveNextAsync())
30+
await foreach (var entity in queryResults.WithCancellation(cancellationToken))
3631
{
37-
yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
32+
yield return new AzureTableAPIDataItem(entity, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
3833
}
39-
//do
40-
//{
41-
// yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
42-
//} while (await enumerator.MoveNextAsync());
4334
}
4435

4536
public IEnumerable<IDataExtensionSettings> GetSettings()

Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs

+1
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ public ParquetDataCol(string name, Type coltype)
3333
{
3434
ColumnName = name;
3535
ColumnType = coltype;
36+
3637
if (coltype != System.Type.Missing.GetType())
3738
{
3839
ParquetDataType = MapDataType(name, coltype);

0 commit comments

Comments
 (0)