forked from AzureCosmosDB/data-migration-desktop-tool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAzureBlobDataSink.cs
82 lines (67 loc) · 3.51 KB
/
AzureBlobDataSink.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using System.Diagnostics;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace Cosmos.DataTransfer.AzureBlobStorage
{
/// <summary>
/// Data sink that streams the output produced by a formatter into a single
/// block blob inside an Azure Storage container.
/// </summary>
public class AzureBlobDataSink : IComposableDataSink
{
    /// <summary>
    /// Opens a write stream on the configured blob, hands it to
    /// <paramref name="writeToStream"/>, and logs transfer progress plus a final summary.
    /// </summary>
    /// <param name="writeToStream">Callback that writes the payload to the supplied blob stream.</param>
    /// <param name="config">Configuration that is bound to <see cref="AzureBlobSinkSettings"/>.</param>
    /// <param name="dataSource">Source extension; not used by this sink.</param>
    /// <param name="logger">Receives connection, progress, and summary messages.</param>
    /// <param name="cancellationToken">Forwarded to container creation and to opening the blob stream.</param>
    /// <returns>A task that completes once the blob stream has been written and disposed.</returns>
    public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
    {
        var settings = config.Get<AzureBlobSinkSettings>();
        settings.Validate();

        BlobContainerClient account;
        if (settings.UseRbacAuth)
        {
            // The placeholders are filled with the setting *names* (via nameof), not their values.
            logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}", settings.AccountEndpoint, nameof(AzureBlobSinkSettings.UseRbacAuth), nameof(AzureBlobSinkSettings.EnableInteractiveCredentials));
            var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
            var baseUri = new Uri(settings.AccountEndpoint);
            // NOTE(review): relative resolution replaces the last path segment when the endpoint
            // has a path without a trailing slash - confirm endpoints are always host-only.
            var blobContainerUri = new Uri(baseUri, settings.ContainerName);
#pragma warning restore CS8604 // Restore warning
            account = new BlobContainerClient(blobContainerUri, credential);
        }
        else
        {
            logger.LogInformation("Connecting to Storage account using {ConnectionString}", nameof(AzureBlobSinkSettings.ConnectionString));
            account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
        }

        await account.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
        var blob = account.GetBlockBlobClient(settings.BlobName);

        logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);

        // Progress messages are throttled to at most one per interval.
        var lastLogTime = DateTime.UtcNow;
        var logInterval = TimeSpan.FromMinutes(1);
        long totalBytes = 0;

        var writeOptions = new BlockBlobOpenWriteOptions
        {
            BufferSize = settings.MaxBlockSizeinKB * 1024L,
            ProgressHandler = new Progress<long>(l =>
            {
                if (DateTime.UtcNow - lastLogTime >= logInterval)
                {
                    logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} MiB to Azure Blob", settings.BlobName, (double) l / 1024/1024);
                    lastLogTime = DateTime.UtcNow;
                }
                totalBytes = l;
            })
        };

        var swWrite = new Stopwatch();

        // The stream is scoped explicitly so that it is flushed and disposed BEFORE the
        // summary below is produced; otherwise the elapsed time would exclude the final
        // flush and the byte count could miss data that is only uploaded on dispose.
        await using (var blobStream = await blob.OpenWriteAsync(true, writeOptions, cancellationToken))
        {
            swWrite.Start();
            await writeToStream(blobStream);
        }
        swWrite.Stop();

        // Progress<T> may deliver callbacks asynchronously, so totalBytes is best-effort.
        if (totalBytes != 0)
        {
            var totalMib = (double) totalBytes / 1024 / 1024;
            logger.LogInformation("{BlobName}: transferred {TotalMiB:F2} MiB to Azure Blob in {TotalTime} seconds.", settings.BlobName, totalMib, swWrite.Elapsed.TotalSeconds);
        }
    }

    /// <summary>
    /// Yields a single default <see cref="AzureBlobSinkSettings"/> instance.
    /// </summary>
    /// <returns>A sequence containing one new settings object.</returns>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new AzureBlobSinkSettings();
    }
}
}