Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Logging improvements #5

Merged
merged 12 commits into from
Dec 19, 2024
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Azure.Identity;
using System.Diagnostics;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Expand All @@ -18,7 +19,7 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
BlobContainerClient account;
if (settings.UseRbacAuth)
{
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}'", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));

var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
Expand All @@ -30,7 +31,7 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
}
else
{
logger.LogInformation("Connecting to Storage account using {ConnectionString}'", nameof(AzureBlobSourceSettings.ConnectionString));
logger.LogInformation("Connecting to Storage account using {ConnectionString}", nameof(AzureBlobSourceSettings.ConnectionString));

account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
}
Expand All @@ -40,15 +41,37 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur

logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);

var lastLogTime = DateTime.UtcNow;
var logInterval = TimeSpan.FromMinutes(1);
long totalBytes = 0;

await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions
{
BufferSize = settings.MaxBlockSizeinKB * 1024L,
ProgressHandler = new Progress<long>(l =>
{
logger.LogInformation("Transferred {UploadedBytes} bytes to Azure Blob", l);
if (DateTime.UtcNow - lastLogTime >= logInterval)
{
logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024);
lastLogTime = DateTime.UtcNow;
}

totalBytes = l;
})

}, cancellationToken);

var swWrite = new Stopwatch();
swWrite.Start();
await writeToStream(blobStream);
swWrite.Stop();

if (totalBytes != 0)
{
var totalMib = (double) totalBytes / 1024 / 1024;

logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.TotalSeconds);
}
}

public IEnumerable<IDataExtensionSettings> GetSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,14 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
var serviceClient = new TableServiceClient(settings.ConnectionString);
var tableClient = serviceClient.GetTableClient(settings.Table);

//Pageable<TableEntity> queryResultsFilter = tableClient.Query<TableEntity>(filter: $"PartitionKey eq '{partitionKey}'");
AsyncPageable<TableEntity> queryResults;
if (!string.IsNullOrWhiteSpace(settings.QueryFilter)) {
queryResults = tableClient.QueryAsync<TableEntity>(filter: settings.QueryFilter);
} else {
queryResults = tableClient.QueryAsync<TableEntity>();
}
var queryResults = !string.IsNullOrWhiteSpace(settings.QueryFilter)
? tableClient.QueryAsync<TableEntity>(filter: settings.QueryFilter, cancellationToken: cancellationToken)
: tableClient.QueryAsync<TableEntity>(cancellationToken: cancellationToken);

var enumerator = queryResults.GetAsyncEnumerator();
while (await enumerator.MoveNextAsync())
await foreach (var entity in queryResults.WithCancellation(cancellationToken))
{
yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
yield return new AzureTableAPIDataItem(entity, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
}
//do
//{
// yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName);
//} while (await enumerator.MoveNextAsync());
}

public IEnumerable<IDataExtensionSettings> GetSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public Parquet.Data.DataColumn ParquetDataColumn { get; set; }

/// <summary>
/// Parameterless constructor. Only <c>ColumnType</c> is assigned here; it is set to the
/// type returned by <c>Type.Missing.GetType()</c>.
/// </summary>
/// <remarks>
/// <c>ColumnName</c>, <c>ParquetDataType</c> and <c>ParquetDataColumn</c> are not assigned in this
/// constructor, which is what the three CI nullable warnings reported on this line refer to.
/// NOTE(review): the other constructor in this class compares its <c>coltype</c> argument against
/// the same <c>Type.Missing.GetType()</c> value before mapping a Parquet data type, so this value
/// appears to act as a "no type known yet" marker — confirm against the rest of the class.
/// </remarks>
public ParquetDataCol()

Check warning on line 27 in Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs

View workflow job for this annotation

GitHub Actions / Build and test .NET projects

Non-nullable property 'ColumnName' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 27 in Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs

View workflow job for this annotation

GitHub Actions / Build and test .NET projects

Non-nullable property 'ParquetDataType' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 27 in Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs

View workflow job for this annotation

GitHub Actions / Build and test .NET projects

Non-nullable property 'ParquetDataColumn' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
{
// Placeholder type: the runtime type of the Type.Missing value, not a real column type.
ColumnType = Type.Missing.GetType();
}
Expand All @@ -33,6 +33,7 @@
{
ColumnName = name;
ColumnType = coltype;

if (coltype != System.Type.Missing.GetType())
{
ParquetDataType = MapDataType(name, coltype);
Expand Down
Loading