Skip to content

Commit fde0479

Browse files
authored
Merge pull request #87 from AzureCosmosDB/develop
Reading from cloud stores; JSON array null fix
2 parents f458054 + d00bcc9 commit fde0479

29 files changed

+342
-121
lines changed

Extensions/AwsS3/Cosmos.DataTransfer.AwsS3Storage/AwsS3DataSink.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
1111
var settings = config.Get<AwsS3SinkSettings>();
1212
settings.Validate();
1313

14-
logger.LogInformation("Saving file to AWS S3 Bucket '{BucketName}'", settings.S3BucketName);
14+
logger.LogInformation("Saving file {File} to AWS S3 Bucket '{BucketName}'", settings.FileName, settings.S3BucketName);
1515

16-
S3Writer.InitializeS3Client(settings.S3AccessKey, settings.S3SecretKey, settings.S3Region);
16+
using var s3 = new S3Client(settings.S3AccessKey, settings.S3SecretKey, settings.S3Region);
1717
await using var stream = new MemoryStream();
1818
await writeToStream(stream);
19-
await S3Writer.WriteToS3(settings.S3BucketName, stream, settings.FileName, cancellationToken);
19+
await s3.WriteToS3(settings.S3BucketName, stream, settings.FileName, cancellationToken);
2020
}
2121

2222
public IEnumerable<IDataExtensionSettings> GetSettings()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System.Runtime.CompilerServices;
2+
using Cosmos.DataTransfer.Interfaces;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace Cosmos.DataTransfer.AwsS3Storage;
7+
8+
/// <summary>
/// Binary storage source that exposes a single file stored in an AWS S3 bucket as a stream.
/// </summary>
public class AwsS3DataSource : IComposableDataSource
{
    /// <summary>
    /// Binds <see cref="AwsS3SourceSettings"/> from <paramref name="config"/>, validates them and
    /// yields the one stream opened for the configured bucket and file.
    /// </summary>
    /// <param name="config">Configuration section holding the source settings.</param>
    /// <param name="logger">Logger that receives the informational "Reading file" message.</param>
    /// <param name="cancellationToken">Token forwarded to the S3 read call.</param>
    /// <returns>An asynchronous sequence containing the stream returned by the S3 client.</returns>
    public async IAsyncEnumerable<Stream?> ReadSourceAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var sourceSettings = config.Get<AwsS3SourceSettings>();
        sourceSettings.Validate();

        logger.LogInformation("Reading file {File} from AWS S3 Bucket '{BucketName}'", sourceSettings.FileName, sourceSettings.S3BucketName);

        // The yield sits inside the using scope, so the client is only disposed once the
        // consumer resumes (or disposes) the iterator after receiving the stream.
        using (var client = new S3Client(sourceSettings.S3AccessKey, sourceSettings.S3SecretKey, sourceSettings.S3Region))
        {
            yield return await client.ReadFromS3(sourceSettings.S3BucketName, sourceSettings.FileName, cancellationToken);
        }
    }

    /// <summary>
    /// Returns an empty instance of the settings type used by this source.
    /// </summary>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new AwsS3SourceSettings();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using Cosmos.DataTransfer.Interfaces.Manifest;
3+
using System.ComponentModel.DataAnnotations;
4+
5+
namespace Cosmos.DataTransfer.AwsS3Storage;
6+
7+
/// <summary>
/// Settings for reading a file from an AWS S3 bucket. Every property is required.
/// </summary>
public class AwsS3SourceSettings : IDataExtensionSettings
{
    /// <summary>Name of the file to read from the bucket.</summary>
    [Required]
    public string FileName { get; set; } = null!;

    /// <summary>AWS region name, for example <c>us-west-1</c>.</summary>
    [Required]
    public string S3Region { get; set; } = null!;

    /// <summary>Name of the S3 bucket that holds the file.</summary>
    [Required]
    public string S3BucketName { get; set; } = null!;

    /// <summary>AWS access key; marked as a sensitive value.</summary>
    [Required, SensitiveValue]
    public string S3AccessKey { get; set; } = null!;

    /// <summary>AWS secret key; marked as a sensitive value.</summary>
    [Required, SensitiveValue]
    public string S3SecretKey { get; set; } = null!;
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Amazon;
2+
using Amazon.S3;
3+
using Amazon.S3.Transfer;
4+
using System.Threading;
5+
6+
namespace Cosmos.DataTransfer.AwsS3Storage
7+
{
8+
/// <summary>
/// Small wrapper around an <see cref="IAmazonS3"/> client that uploads streams to, and opens
/// read streams from, S3 buckets. Disposing this object disposes the wrapped client.
/// </summary>
public class S3Client : IDisposable
{
    private readonly IAmazonS3 _client;

    /// <summary>
    /// Creates the underlying S3 client for the given credentials and region.
    /// </summary>
    /// <param name="accessKey">AWS access key.</param>
    /// <param name="secretKey">AWS secret key.</param>
    /// <param name="regionName">Region system name resolved through <see cref="RegionEndpoint.GetBySystemName(string)"/>.</param>
    public S3Client(string accessKey, string secretKey, string regionName)
    {
        _client = new AmazonS3Client(accessKey, secretKey, RegionEndpoint.GetBySystemName(regionName));
    }

    /// <summary>
    /// Uploads <paramref name="data"/> to <paramref name="bucketName"/> under the key <paramref name="filename"/>.
    /// </summary>
    /// <param name="bucketName">Target bucket.</param>
    /// <param name="data">Stream whose content is uploaded.</param>
    /// <param name="filename">Object key to write.</param>
    /// <param name="cancellationToken">Token forwarded to the upload call.</param>
    public async Task WriteToS3(string bucketName, Stream data, string filename, CancellationToken cancellationToken)
    {
        var transfer = new TransferUtility(_client);
        await transfer.UploadAsync(data, bucketName, filename, cancellationToken);
    }

    /// <summary>
    /// Opens a stream over the object <paramref name="filename"/> in <paramref name="bucketName"/>.
    /// </summary>
    /// <param name="bucketName">Bucket to read from.</param>
    /// <param name="filename">Object key to read.</param>
    /// <param name="cancellationToken">Token forwarded to the open call.</param>
    /// <returns>The stream returned by the transfer utility.</returns>
    public async Task<Stream> ReadFromS3(string bucketName, string filename, CancellationToken cancellationToken)
    {
        var transfer = new TransferUtility(_client);
        return await transfer.OpenStreamAsync(bucketName, filename, cancellationToken);
    }

    /// <summary>Disposes the wrapped S3 client.</summary>
    public void Dispose() => _client.Dispose();
}
36+
}

Extensions/AwsS3/Cosmos.DataTransfer.AwsS3Storage/S3Writer.cs

-23
This file was deleted.

Extensions/AwsS3/README.md

+15-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
1-
# AWS S3 Extension (beta)
1+
# AWS S3 Extension
22

3-
The AWS S3 extension provides writing of formatted files to AWS S3 buckets.
3+
The AWS S3 extension supports reading formatted files from, and writing them to, AWS S3 buckets.
44

55
> **Note**: This is a Binary Storage Extension that is only used in combination with File Format extensions.
66
77
## Settings
88

9-
Sink settings require all parameters shown below.
9+
Source and Sink settings require the parameters shown below.
10+
11+
### Source
12+
13+
```json
14+
{
15+
"FileName": "",
16+
"S3Region": "us-west-1",
17+
"S3BucketName": "",
18+
"S3AccessKey": "",
19+
"S3SecretKey": ""
20+
}
21+
```
1022

1123
### Sink
1224

Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
using Cosmos.DataTransfer.Interfaces;
1+
using Azure.Storage.Blobs;
2+
using Cosmos.DataTransfer.Interfaces;
23
using Microsoft.Extensions.Configuration;
34
using Microsoft.Extensions.Logging;
5+
using Azure.Storage.Blobs.Specialized;
6+
using Azure.Storage.Blobs.Models;
47

58
namespace Cosmos.DataTransfer.AzureBlobStorage
69
{
@@ -12,10 +15,20 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
1215
settings.Validate();
1316

1417
logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);
15-
await BlobWriter.InitializeAzureBlobClient(settings.ConnectionString, settings.ContainerName, settings.BlobName, cancellationToken);
16-
await using var stream = new MemoryStream();
17-
await writeToStream(stream);
18-
await BlobWriter.WriteToAzureBlob(stream.ToArray(), settings.MaxBlockSizeinKB, cancellationToken);
18+
19+
var account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
20+
await account.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
21+
var blob = account.GetBlockBlobClient(settings.BlobName);
22+
23+
await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions
24+
{
25+
BufferSize = settings.MaxBlockSizeinKB * 1024L,
26+
ProgressHandler = new Progress<long>(l =>
27+
{
28+
logger.LogInformation("Transferred {UploadedBytes} bytes to Azure Blob", l);
29+
})
30+
}, cancellationToken);
31+
await writeToStream(blobStream);
1932
}
2033

2134
public IEnumerable<IDataExtensionSettings> GetSettings()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using System.Runtime.CompilerServices;
2+
using Azure.Storage.Blobs;
3+
using Cosmos.DataTransfer.Interfaces;
4+
using Microsoft.Extensions.Configuration;
5+
using Microsoft.Extensions.Logging;
6+
using Azure.Storage.Blobs.Specialized;
7+
using Azure.Storage.Blobs.Models;
8+
9+
namespace Cosmos.DataTransfer.AzureBlobStorage;
10+
11+
/// <summary>
/// Binary storage source that exposes a single blob from an Azure Blob Storage container as a stream.
/// </summary>
public class AzureBlobDataSource : IComposableDataSource
{
    // The setting is expressed in kilobytes while the blob read options expect a size in bytes.
    private const int BytesPerKilobyte = 1024;

    /// <summary>
    /// Binds <see cref="AzureBlobSourceSettings"/> from <paramref name="config"/>, validates them and
    /// yields a read stream for the configured blob. Yields nothing when the blob does not exist.
    /// </summary>
    /// <param name="config">Configuration section holding the source settings.</param>
    /// <param name="logger">Logger that receives progress and warning messages.</param>
    /// <param name="cancellationToken">Token forwarded to the blob existence check and open call.</param>
    /// <returns>An asynchronous sequence with at most one stream.</returns>
    /// <exception cref="OverflowException">
    /// Thrown when <c>ReadBufferSizeInKB</c> is too large to be expressed as a byte count in an <see cref="int"/>.
    /// </exception>
    public async IAsyncEnumerable<Stream?> ReadSourceAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var settings = config.Get<AzureBlobSourceSettings>();
        settings.Validate();

        logger.LogInformation("Reading file '{File}' from Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);

        var account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
        var blob = account.GetBlockBlobClient(settings.BlobName);
        var existsResponse = await blob.ExistsAsync(cancellationToken: cancellationToken);
        if (!existsResponse.Value)
        {
            // Surface the missing blob instead of silently producing an empty source.
            logger.LogWarning("File '{File}' was not found in Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);
            yield break;
        }

        var readStream = await blob.OpenReadAsync(new BlobOpenReadOptions(false)
        {
            // Convert KB to bytes; a null setting stays null so the SDK default applies.
            // checked() turns an oversized value into an exception rather than a negative size.
            BufferSize = checked(settings.ReadBufferSizeInKB * BytesPerKilobyte),
        }, cancellationToken: cancellationToken);

        yield return readStream;
    }

    /// <summary>
    /// Returns an empty instance of the settings type used by this source.
    /// </summary>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new AzureBlobSourceSettings();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using System.ComponentModel.DataAnnotations;
3+
using Cosmos.DataTransfer.Interfaces.Manifest;
4+
5+
namespace Cosmos.DataTransfer.AzureBlobStorage;
6+
7+
/// <summary>
/// Settings for reading a blob from an Azure Blob Storage container.
/// </summary>
public class AzureBlobSourceSettings : IDataExtensionSettings
{
    /// <summary>Storage account connection string; marked as a sensitive value.</summary>
    [Required, SensitiveValue]
    public string ConnectionString { get; set; } = null!;

    /// <summary>Name of the container that holds the blob.</summary>
    [Required]
    public string ContainerName { get; set; } = null!;

    /// <summary>Name of the blob to read.</summary>
    [Required]
    public string BlobName { get; set; } = null!;

    /// <summary>Optional read buffer size setting for the blob stream; unset when <c>null</c>.</summary>
    public int? ReadBufferSizeInKB { get; set; }
}

Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/BlobWriter.cs

-63
This file was deleted.

Extensions/AzureBlob/README.md

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,29 @@
1-
# Azure Blob Storage Extension (beta)
1+
# Azure Blob Storage Extension
22

3-
The Azure Blob Storage extension provides writing of formatted files to Azure Blob Storage containers.
3+
The Azure Blob Storage extension supports reading formatted files from, and writing them to, Azure Blob Storage containers.
44

55
> **Note**: This is a Binary Storage Extension that is only used in combination with File Format extensions.
66
77
## Settings
88

9-
Sink settings require all parameters shown below. An optional `MaxBlockSizeInKB` parameter can also be specified to control the transfer.
9+
Source and Sink settings require the parameters shown below.
10+
11+
### Source
12+
13+
An optional `ReadBufferSizeInKB` parameter can be used to control stream buffering.
14+
15+
```json
16+
{
17+
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=...",
18+
"ContainerName": "",
19+
"BlobName": ""
20+
}
21+
```
1022

1123
### Sink
1224

25+
An optional `MaxBlockSizeInKB` parameter can also be specified to control the transfer.
26+
1327
```json
1428
{
1529
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=...",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Cosmos.DataTransfer.AwsS3Storage;
2+
using Cosmos.DataTransfer.Common;
3+
using Cosmos.DataTransfer.Interfaces;
4+
using System.ComponentModel.Composition;
5+
6+
namespace Cosmos.DataTransfer.CsvExtension;
7+
8+
/// <summary>
/// Source extension that pairs <see cref="AwsS3DataSource"/> with <see cref="CsvFormatReader"/>,
/// exported as an <see cref="IDataSourceExtension"/>.
/// </summary>
[Export(typeof(IDataSourceExtension))]
public class CsvAwsS3Source : CompositeSourceExtension<AwsS3DataSource, CsvFormatReader>
{
    /// <summary>Display name of this extension.</summary>
    public override string DisplayName
    {
        get { return "CSV-AWSS3"; }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Cosmos.DataTransfer.AzureBlobStorage;
2+
using Cosmos.DataTransfer.Common;
3+
using Cosmos.DataTransfer.Interfaces;
4+
using System.ComponentModel.Composition;
5+
6+
namespace Cosmos.DataTransfer.CsvExtension;
7+
8+
/// <summary>
/// Source extension that pairs <see cref="AzureBlobDataSource"/> with <see cref="CsvFormatReader"/>,
/// exported as an <see cref="IDataSourceExtension"/>.
/// </summary>
[Export(typeof(IDataSourceExtension))]
public class CsvAzureBlobSource : CompositeSourceExtension<AzureBlobDataSource, CsvFormatReader>
{
    /// <summary>Display name of this extension.</summary>
    public override string DisplayName
    {
        get { return "CSV-AzureBlob"; }
    }
}

Extensions/Csv/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ Supported storage sinks:
1313

1414
Supported storage sources:
1515
- File - **Csv**
16+
- Azure Blob Storage - **Csv-AzureBlob**
17+
- AWS S3 - **Csv-AwsS3**
1618

1719
## Settings
1820

0 commit comments

Comments
 (0)