From caf047ff2ddd57269f34a4fa04e40a675d32d640 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Mon, 16 Dec 2024 15:14:59 +0100 Subject: [PATCH 01/12] Test Parquet sink --- .../Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs | 1 + .../ParquetFormatWriter.cs | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs index ceea2b7..1bfc0cd 100644 --- a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs +++ b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs @@ -33,6 +33,7 @@ public ParquetDataCol(string name, Type coltype) { ColumnName = name; ColumnType = coltype; + if (coltype != System.Type.Missing.GetType()) { ParquetDataType = MapDataType(name, coltype); diff --git a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs index 773d307..10bb836 100644 --- a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs +++ b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs @@ -42,6 +42,10 @@ private void ProcessColumns(IDataItem item, long row) { coltype = colval.GetType(); } + if (col == "Timestamp" && coltype == typeof(DateTimeOffset)) + { + continue; + } if (current == null) { var newcol = new ParquetDataCol(col, coltype); From ba174d9bd07e270c1613ac7e97275bcdbcab40af Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 10:58:45 +0100 Subject: [PATCH 02/12] Lower buffer size --- .../Settings/JsonFormatWriterSettings.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs index e0301a7..05f02b4 100644 --- a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs +++ b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs @@ -6,6 +6,6 @@ public class JsonFormatWriterSettings : IDataExtensionSettings { public bool IncludeNullFields { get; set; } public bool Indented { get; set; } - public int BufferSizeMB { get; set; } = 200; + public int BufferSizeMB { get; set; } = 50; } } \ No newline at end of file From ac0ecab1c5b259d914fe42da100198ab8de91d7f Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 11:25:41 +0100 Subject: [PATCH 03/12] changed logging --- .../AzureBlobDataSink.cs | 30 +++++++++++++++++-- .../Settings/JsonFormatWriterSettings.cs | 2 +- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index 2747b15..d18b614 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -1,4 +1,5 @@ -using Azure.Identity; +using System.Diagnostics; +using Azure.Identity; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; @@ -40,20 +41,45 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName); + var lastLogTime = DateTime.MinValue; + var logInterval = TimeSpan.FromMinutes(1); + long totalBytes = 0; + + var sw = new Stopwatch(); + sw.Start(); await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions { BufferSize = settings.MaxBlockSizeinKB * 1024L, ProgressHandler = new Progress(l => { - logger.LogInformation("Transferred {UploadedBytes} bytes to Azure Blob", l); + if (DateTime.UtcNow - lastLogTime >= logInterval) + { + logger.LogInformation("Transferred {TotalMiB} bytes to Azure Blob", l/1024/1024); + lastLogTime = DateTime.UtcNow; + } + + totalBytes = l; }) + }, cancellationToken); await writeToStream(blobStream); + + sw.Stop(); + + var totalMib = totalBytes / 1024 / 1024; + var rate = totalMib / sw.ElapsedMilliseconds / 1000; + + logger.LogInformation("Transferred {TotalMiB} Mib to Azure Blob in {TotalTime} minutes ({Rate} MiB/s).", totalMib, sw.ElapsedMilliseconds/1000/60, rate); } public IEnumerable GetSettings() { yield return new AzureBlobSinkSettings(); } + + private void LogBytes() + { + + } } } \ No newline at end of file diff --git a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs index 05f02b4..e0301a7 100644 --- a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs +++ b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs @@ -6,6 +6,6 @@ public class JsonFormatWriterSettings : IDataExtensionSettings { public bool IncludeNullFields { get; set; } public bool Indented { get; set; } - public int BufferSizeMB { get; set; } = 50; + public int BufferSizeMB { get; set; } = 200; } } \ No newline at end of file From b32fb8010840892c67e82a784ba9a7d5b7138518 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 11:59:29 +0100 Subject: [PATCH 04/12] improve --- .../AzureBlobDataSink.cs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index d18b614..5b72989 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -41,7 +41,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName); - var lastLogTime = DateTime.MinValue; + var lastLogTime = DateTime.UtcNow; var logInterval = TimeSpan.FromMinutes(1); long totalBytes = 0; @@ -54,7 +54,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur { if (DateTime.UtcNow - lastLogTime >= logInterval) { - logger.LogInformation("Transferred {TotalMiB} bytes to Azure Blob", l/1024/1024); + logger.LogInformation("{BlobName}: transferred {TotalMiB} MiB to Azure Blob", settings.BlobName, l / 1024/1024); lastLogTime = DateTime.UtcNow; } @@ -62,24 +62,23 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur }) }, cancellationToken); + await writeToStream(blobStream); sw.Stop(); - var totalMib = totalBytes / 1024 / 1024; - var rate = totalMib / sw.ElapsedMilliseconds / 1000; + if (totalBytes != 0) + { + var totalMib = totalBytes / 1024 / 1024; + var rate = totalMib / sw.ElapsedMilliseconds / 1000; - logger.LogInformation("Transferred {TotalMiB} Mib to Azure Blob in {TotalTime} minutes ({Rate} MiB/s).", totalMib, sw.ElapsedMilliseconds/1000/60, rate); + logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} minutes ({Rate} MiB/s).", settings.BlobName, totalMib, sw.ElapsedMilliseconds / 1000 / 60, rate); + } } public IEnumerable GetSettings() { yield return new AzureBlobSinkSettings(); } - - private void LogBytes() - { - - } } } \ No newline at end of file From 397ba7fcc4088a482abd98dd7c50e3f86361e31b Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 13:05:51 +0100 Subject: [PATCH 05/12] fix --- .../Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index 5b72989..b7ec397 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -70,7 +70,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur if (totalBytes != 0) { var totalMib = totalBytes / 1024 / 1024; - var rate = totalMib / sw.ElapsedMilliseconds / 1000; + var rate = totalMib / (sw.ElapsedMilliseconds / 1000); logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} minutes ({Rate} MiB/s).", settings.BlobName, totalMib, sw.ElapsedMilliseconds / 1000 / 60, rate); } From 401008dfaed8e8c445a351d240d0bd4e4a01b72b Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 14:01:44 +0100 Subject: [PATCH 06/12] f --- .../Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index b7ec397..ab3c2eb 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -70,9 +70,8 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur if (totalBytes != 0) { var totalMib = totalBytes / 1024 / 1024; - var rate = totalMib / (sw.ElapsedMilliseconds / 1000); - logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} minutes ({Rate} MiB/s).", settings.BlobName, totalMib, sw.ElapsedMilliseconds / 1000 / 60, rate); + logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, sw.ElapsedMilliseconds / 1000); } } From df193e2b1bbb2a3d9e2a1d5f387b8413c67ad29b Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 14:13:13 +0100 Subject: [PATCH 07/12] changed buffersize --- .../AzureBlobDataSink.cs | 6 +++--- .../Settings/JsonFormatWriterSettings.cs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index ab3c2eb..bf66b78 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -54,7 +54,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur { if (DateTime.UtcNow - lastLogTime >= logInterval) { - logger.LogInformation("{BlobName}: transferred {TotalMiB} MiB to Azure Blob", settings.BlobName, l / 1024/1024); + logger.LogInformation("{BlobName}: transferred {TotalMiB} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024); lastLogTime = DateTime.UtcNow; } @@ -69,9 +69,9 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur if (totalBytes != 0) { - var totalMib = totalBytes / 1024 / 1024; + var totalMib = (double) totalBytes / 1024 / 1024; - logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, sw.ElapsedMilliseconds / 1000); + logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, (double) sw.ElapsedMilliseconds / 1000); } } diff --git a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs index e0301a7..d5d370c 100644 --- a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs +++ b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs @@ -6,6 +6,6 @@ public class JsonFormatWriterSettings : IDataExtensionSettings { public bool IncludeNullFields { get; set; } public bool Indented { get; set; } - public int BufferSizeMB { get; set; } = 200; + public int BufferSizeMB { get; set; } = 100; } } \ No newline at end of file From da4ebc4fce64c4f134f5a8affd1047fde1a8ff8d Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Tue, 17 Dec 2024 14:36:27 +0100 Subject: [PATCH 08/12] read improvements --- .../AzureBlobDataSink.cs | 13 ++++++++----- .../AzureTableAPIDataSourceExtension.cs | 19 +++++-------------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index bf66b78..45dd50c 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -45,8 +45,8 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur var logInterval = TimeSpan.FromMinutes(1); long totalBytes = 0; - var sw = new Stopwatch(); - sw.Start(); + var swOpenWrite = new Stopwatch(); + swOpenWrite.Start(); await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions { BufferSize = settings.MaxBlockSizeinKB * 1024L, @@ -62,16 +62,19 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur }) }, cancellationToken); + swOpenWrite.Stop(); + var swWrite = new Stopwatch(); + swWrite.Start(); await writeToStream(blobStream); - - sw.Stop(); + swWrite.Stop(); + logger.LogInformation("{BlobName}: open write for blob took {TotalTime} seconds.", settings.BlobName, swOpenWrite.Elapsed.Seconds); if (totalBytes != 0) { var totalMib = (double) totalBytes / 1024 / 1024; - logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, (double) sw.ElapsedMilliseconds / 1000); + logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.Seconds); } } diff --git a/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSourceExtension.cs b/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSourceExtension.cs index 7184db2..10c5bce 100644 --- a/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSourceExtension.cs +++ b/Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSourceExtension.cs @@ -23,23 +23,14 @@ public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogge var serviceClient = new TableServiceClient(settings.ConnectionString); var tableClient = serviceClient.GetTableClient(settings.Table); - //Pageable queryResultsFilter = tableClient.Query(filter: $"PartitionKey eq '{partitionKey}'"); - AsyncPageable queryResults; - if (!string.IsNullOrWhiteSpace(settings.QueryFilter)) { - queryResults = tableClient.QueryAsync(filter: settings.QueryFilter); - } else { - queryResults = tableClient.QueryAsync(); - } + var queryResults = !string.IsNullOrWhiteSpace(settings.QueryFilter) + ? tableClient.QueryAsync(filter: settings.QueryFilter, cancellationToken: cancellationToken) + : tableClient.QueryAsync(cancellationToken: cancellationToken); - var enumerator = queryResults.GetAsyncEnumerator(); - while (await enumerator.MoveNextAsync()) + await foreach (var entity in queryResults.WithCancellation(cancellationToken)) { - yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName); + yield return new AzureTableAPIDataItem(entity, settings.PartitionKeyFieldName, settings.RowKeyFieldName); } - //do - //{ - // yield return new AzureTableAPIDataItem(enumerator.Current, settings.PartitionKeyFieldName, settings.RowKeyFieldName); - //} while (await enumerator.MoveNextAsync()); } public IEnumerable GetSettings() From 62cba7e2157bb6e54f15aaa0fae4db8748066c50 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Wed, 18 Dec 2024 11:29:45 +0100 Subject: [PATCH 09/12] Logging --- .../AzureBlobDataSink.cs | 8 ++------ .../Settings/JsonFormatWriterSettings.cs | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index 45dd50c..ccd6fdd 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -19,7 +19,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur BlobContainerClient account; if (settings.UseRbacAuth) { - logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}'", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials)); + logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials)); var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials); #pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null @@ -31,7 +31,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur } else { - logger.LogInformation("Connecting to Storage account using {ConnectionString}'", nameof(AzureBlobSourceSettings.ConnectionString)); + logger.LogInformation("Connecting to Storage account using {ConnectionString}", nameof(AzureBlobSourceSettings.ConnectionString)); account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName); } @@ -45,8 +45,6 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur var logInterval = TimeSpan.FromMinutes(1); long totalBytes = 0; - var swOpenWrite = new Stopwatch(); - swOpenWrite.Start(); await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions { BufferSize = settings.MaxBlockSizeinKB * 1024L, @@ -62,14 +60,12 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur }) }, cancellationToken); - swOpenWrite.Stop(); var swWrite = new Stopwatch(); swWrite.Start(); await writeToStream(blobStream); swWrite.Stop(); - logger.LogInformation("{BlobName}: open write for blob took {TotalTime} seconds.", settings.BlobName, swOpenWrite.Elapsed.Seconds); if (totalBytes != 0) { var totalMib = (double) totalBytes / 1024 / 1024; diff --git a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs index d5d370c..e0301a7 100644 --- a/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs +++ b/Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs @@ -6,6 +6,6 @@ public class JsonFormatWriterSettings : IDataExtensionSettings { public bool IncludeNullFields { get; set; } public bool Indented { get; set; } - public int BufferSizeMB { get; set; } = 100; + public int BufferSizeMB { get; set; } = 200; } } \ No newline at end of file From 7ca9df7f47f323c90f66cedf55867bc85298cfc2 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Wed, 18 Dec 2024 11:55:15 +0100 Subject: [PATCH 10/12] Undo parquet changes --- .../ParquetFormatWriter.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs index 10bb836..773d307 100644 --- a/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs +++ b/Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs @@ -42,10 +42,6 @@ private void ProcessColumns(IDataItem item, long row) { coltype = colval.GetType(); } - if (col == "Timestamp" && coltype == typeof(DateTimeOffset)) - { - continue; - } if (current == null) { var newcol = new ParquetDataCol(col, coltype); From 39aec40d809a562494ccdf372f737c021df6dce3 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Wed, 18 Dec 2024 16:55:52 +0100 Subject: [PATCH 11/12] formatting --- .../Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index ccd6fdd..b1a26de 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -52,7 +52,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur { if (DateTime.UtcNow - lastLogTime >= logInterval) { - logger.LogInformation("{BlobName}: transferred {TotalMiB} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024); + logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024); lastLogTime = DateTime.UtcNow; } @@ -70,7 +70,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur { var totalMib = (double) totalBytes / 1024 / 1024; - logger.LogInformation("{BlobName}: transferred {TotalMiB} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.Seconds); + logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.Seconds); } } From 0a7c0a3a070c7475c9247eab3c82b7afb110c9a7 Mon Sep 17 00:00:00 2001 From: Guillaume Van den Berghe Date: Wed, 18 Dec 2024 17:10:33 +0100 Subject: [PATCH 12/12] Fix logging --- .../Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs index b1a26de..a3b548e 100644 --- a/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs +++ b/Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs @@ -70,7 +70,7 @@ public async Task WriteToTargetAsync(Func writeToStream, IConfigur { var totalMib = (double) totalBytes / 1024 / 1024; - logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.Seconds); + logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} Mib to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.TotalSeconds); } }