Skip to content

Commit 69efd3e

Browse files
authoredJul 5, 2023
Merge pull request #76 from AzureCosmosDB/develop
Extension improvements
2 parents ee76585 + ae71d46 commit 69efd3e

27 files changed

+178
-438
lines changed
 

‎Core/Cosmos.DataTransfer.Core/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public static async Task<int> Main(string[] args)
4141
Sink = ctx.BindingContext.ParseResult.GetValueForOption(rootCommand.Options.ElementAt(1)) as string,
4242
Settings = ctx.BindingContext.ParseResult.GetValueForOption(rootCommand.Options.ElementAt(2)) as FileInfo
4343
};
44-
await handler.InvokeAsync(ctx);
44+
ctx.ExitCode = await handler.InvokeAsync(ctx);
4545
}
4646
});
4747

‎Core/Cosmos.DataTransfer.Core/RunCommand.cs

+61-58
Original file line numberDiff line numberDiff line change
@@ -74,60 +74,71 @@ public async Task<int> InvokeAsync(InvocationContext context)
7474
{
7575
CancellationToken cancellationToken = context.GetCancellationToken();
7676

77-
var configuredOptions = _configuration.Get<DataTransferOptions>() ?? new DataTransferOptions();
78-
var combinedConfig = BuildSettingsConfiguration(_configuration,
79-
Settings?.FullName ?? configuredOptions.SettingsPath,
80-
string.IsNullOrEmpty(Source ?? configuredOptions.Source) && string.IsNullOrEmpty(Sink ?? configuredOptions.Sink),
81-
cancellationToken);
77+
try
78+
{
79+
var configuredOptions = _configuration.Get<DataTransferOptions>() ?? new DataTransferOptions();
80+
var combinedConfig = await BuildSettingsConfiguration(_configuration,
81+
Settings?.FullName ?? configuredOptions.SettingsPath,
82+
string.IsNullOrEmpty(Source ?? configuredOptions.Source) && string.IsNullOrEmpty(Sink ?? configuredOptions.Sink),
83+
cancellationToken);
8284

83-
var options = combinedConfig.Get<DataTransferOptions>();
85+
var options = combinedConfig.Get<DataTransferOptions>();
8486

85-
string extensionsPath = _extensionLoader.GetExtensionFolderPath();
86-
CompositionContainer container = _extensionLoader.BuildExtensionCatalog(extensionsPath);
87+
string extensionsPath = _extensionLoader.GetExtensionFolderPath();
88+
CompositionContainer container = _extensionLoader.BuildExtensionCatalog(extensionsPath);
8789

88-
var sources = _extensionLoader.LoadExtensions<IDataSourceExtension>(container);
89-
var sinks = _extensionLoader.LoadExtensions<IDataSinkExtension>(container);
90+
var sources = _extensionLoader.LoadExtensions<IDataSourceExtension>(container);
91+
var sinks = _extensionLoader.LoadExtensions<IDataSinkExtension>(container);
9092

91-
cancellationToken.ThrowIfCancellationRequested();
93+
cancellationToken.ThrowIfCancellationRequested();
9294

93-
var source = GetExtensionSelection(Source ?? options.Source, sources, "Source", cancellationToken);
94-
cancellationToken.ThrowIfCancellationRequested();
95-
var sink = GetExtensionSelection(Sink ?? options.Sink, sinks, "Sink", cancellationToken);
96-
cancellationToken.ThrowIfCancellationRequested();
95+
var source = await GetExtensionSelection(Source ?? options.Source, sources, "Source", cancellationToken);
96+
cancellationToken.ThrowIfCancellationRequested();
97+
var sink = await GetExtensionSelection(Sink ?? options.Sink, sinks, "Sink", cancellationToken);
98+
cancellationToken.ThrowIfCancellationRequested();
9799

98-
var sourceConfig = combinedConfig.GetSection("SourceSettings");
99-
var sinkConfig = GetSinkConfig(combinedConfig);
100-
var operationConfigs = combinedConfig.GetSection("Operations");
101-
var operations = operationConfigs?.GetChildren().ToList();
102-
if (operations?.Any() == true)
103-
{
104-
foreach (var operationConfig in operations)
100+
var sourceConfig = combinedConfig.GetSection("SourceSettings");
101+
var sinkConfig = GetSinkConfig(combinedConfig);
102+
var operationConfigs = combinedConfig.GetSection("Operations");
103+
var operations = operationConfigs?.GetChildren().ToList();
104+
bool succeeded = true;
105+
if (operations?.Any() == true)
105106
{
106-
var operationSource = operationConfig.GetSection("SourceSettings");
107-
var sourceBuilder = new ConfigurationBuilder().AddConfiguration(sourceConfig);
108-
if (operationSource.Exists())
109-
{
110-
sourceBuilder.AddConfiguration(operationSource);
111-
}
112-
var operationSink = GetSinkConfig(operationConfig);
113-
var sinkBuilder = new ConfigurationBuilder().AddConfiguration(sinkConfig);
114-
if (operationSink.Exists())
107+
foreach (var operationConfig in operations)
115108
{
116-
sinkBuilder.AddConfiguration(operationSink);
109+
var operationSource = operationConfig.GetSection("SourceSettings");
110+
var sourceBuilder = new ConfigurationBuilder().AddConfiguration(sourceConfig);
111+
if (operationSource.Exists())
112+
{
113+
sourceBuilder.AddConfiguration(operationSource);
114+
}
115+
var operationSink = GetSinkConfig(operationConfig);
116+
var sinkBuilder = new ConfigurationBuilder().AddConfiguration(sinkConfig);
117+
if (operationSink.Exists())
118+
{
119+
sinkBuilder.AddConfiguration(operationSink);
120+
}
121+
succeeded &= await ExecuteDataTransferOperation(source,
122+
sourceBuilder.Build(),
123+
sink,
124+
sinkBuilder.Build(),
125+
cancellationToken);
117126
}
118-
await ExecuteDataTransferOperation(source,
119-
sourceBuilder.Build(),
120-
sink,
121-
sinkBuilder.Build(),
122-
cancellationToken);
123127
}
128+
else
129+
{
130+
succeeded = await ExecuteDataTransferOperation(source, sourceConfig, sink, sinkConfig, cancellationToken);
131+
}
132+
133+
return succeeded ? 0 : 1;
124134
}
125-
else
135+
catch (OperationCanceledException ex)
126136
{
127-
await ExecuteDataTransferOperation(source, sourceConfig, sink, sinkConfig, cancellationToken);
137+
_logger.LogDebug(ex, "Operation canceled.");
138+
Console.WriteLine();
139+
Console.WriteLine("Operation canceled. Exiting.");
140+
return 1;
128141
}
129-
130-
return 0;
131142
}
132143

133144
private static IConfigurationSection GetSinkConfig(IConfiguration combinedConfig)
@@ -148,7 +159,7 @@ private static IConfigurationSection GetSinkConfig(IConfiguration combinedConfig
148159
return config;
149160
}
150161

151-
private async Task ExecuteDataTransferOperation(IDataSourceExtension source, IConfiguration sourceConfig, IDataSinkExtension sink, IConfiguration sinkConfig, CancellationToken cancellationToken)
162+
private async Task<bool> ExecuteDataTransferOperation(IDataSourceExtension source, IConfiguration sourceConfig, IDataSinkExtension sink, IConfiguration sinkConfig, CancellationToken cancellationToken)
152163
{
153164
_logger.LogDebug("Loaded {SettingCount} settings for source {SourceName}:\n\t\t{SettingList}",
154165
sourceConfig.AsEnumerable().Count(),
@@ -168,19 +179,21 @@ private async Task ExecuteDataTransferOperation(IDataSourceExtension source, ICo
168179
await sink.WriteAsync(data, sinkConfig, source, _loggerFactory.CreateLogger(sink.GetType().Name), cancellationToken);
169180

170181
_logger.LogInformation("Data transfer complete");
182+
return true;
171183
}
172184
catch (Exception ex)
173185
{
174186
_logger.LogError(ex, "Data transfer failed");
187+
return false;
175188
}
176189
}
177190

178-
private static T GetExtensionSelection<T>(string? selectionName, List<T> extensions, string inputPrompt, CancellationToken cancellationToken)
191+
private static async Task<T> GetExtensionSelection<T>(string? selectionName, List<T> extensions, string inputPrompt, CancellationToken cancellationToken)
179192
where T : class, IDataTransferExtension
180193
{
181194
if (!string.IsNullOrWhiteSpace(selectionName))
182195
{
183-
var extension = extensions.FirstOrDefault(s => selectionName.Equals(s.DisplayName, StringComparison.OrdinalIgnoreCase));
196+
var extension = extensions.FirstOrDefault(s => s.MatchesExtensionSelection(selectionName));
184197
if (extension != null)
185198
{
186199
Console.WriteLine($"Using {extension.DisplayName} {inputPrompt}");
@@ -197,18 +210,18 @@ private static T GetExtensionSelection<T>(string? selectionName, List<T> extensi
197210

198211
string? selection = "";
199212
int input;
200-
while (!int.TryParse(selection, out input) || input > extensions.Count)
213+
while (!int.TryParse(selection, out input) || input > extensions.Count || input <= 0)
201214
{
202215
cancellationToken.ThrowIfCancellationRequested();
203-
selection = Console.ReadLine();
216+
selection = await Console.In.ReadLineAsync(cancellationToken);
204217
}
205218

206219
T selected = extensions[input - 1];
207220
Console.WriteLine($"Using {selected.DisplayName} {inputPrompt}");
208221
return selected;
209222
}
210223

211-
private IConfiguration BuildSettingsConfiguration(IConfiguration configuration, string? settingsPath, bool promptForFile, CancellationToken cancellationToken)
224+
private async Task<IConfiguration> BuildSettingsConfiguration(IConfiguration configuration, string? settingsPath, bool promptForFile, CancellationToken cancellationToken)
212225
{
213226
IConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
214227
if (!string.IsNullOrEmpty(settingsPath) && File.Exists(settingsPath))
@@ -220,7 +233,7 @@ private IConfiguration BuildSettingsConfiguration(IConfiguration configuration,
220233
else if (promptForFile)
221234
{
222235
Console.Write("Path to settings file? (leave empty to skip): ");
223-
var path = Console.ReadLine();
236+
var path = await Console.In.ReadLineAsync(cancellationToken);
224237
cancellationToken.ThrowIfCancellationRequested();
225238
if (!string.IsNullOrWhiteSpace(path))
226239
{
@@ -234,16 +247,6 @@ private IConfiguration BuildSettingsConfiguration(IConfiguration configuration,
234247
.AddConfiguration(configuration)
235248
.Build();
236249
}
237-
238-
private static bool IsYesResponse(string? response)
239-
{
240-
if (response?.Equals("y", StringComparison.CurrentCultureIgnoreCase) == true)
241-
return true;
242-
if (response?.Equals("yes", StringComparison.CurrentCultureIgnoreCase) == true)
243-
return true;
244-
245-
return false;
246-
}
247250
}
248251
}
249252
}

‎CosmosDbDataMigrationTool.sln

+9
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ EndProject
5555
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.SqlServerExtension.UnitTests", "Extensions\SqlServer\Cosmos.DataTransfer.SqlServerExtension.UnitTests\Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj", "{3E4C4ABF-D8C2-4997-A719-E756483C8D63}"
5656
EndProject
5757
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Parquet", "Parquet", "{2BD63BEE-CEE8-4FF3-8A2E-373D9F4E2FE8}"
58+
ProjectSection(SolutionItems) = preProject
59+
Extensions\Parquet\README.md = Extensions\Parquet\README.md
60+
EndProjectSection
5861
EndProject
5962
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.ParquetExtension", "Extensions\Parquet\Cosmos.DataTransfer.ParquetExtension\Cosmos.DataTransfer.ParquetExtension.csproj", "{B83FEBC7-08B6-4C25-831A-EA0C08794F67}"
6063
EndProject
@@ -85,6 +88,12 @@ EndProject
8588
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CsvExtension", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension\Cosmos.DataTransfer.CsvExtension.csproj", "{6A3FB90C-B837-4724-A406-214D4CEA686F}"
8689
EndProject
8790
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.CsvExtension.UnitTests", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension.UnitTests\Cosmos.DataTransfer.CsvExtension.UnitTests.csproj", "{40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}"
91+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{BCBBAF22-0CB5-416B-8C80-03AB2FC4D0A0}"
92+
ProjectSection(SolutionItems) = preProject
93+
Contributing.md = Contributing.md
94+
ExampleConfigs.md = ExampleConfigs.md
95+
README.md = README.md
96+
EndProjectSection
8897
EndProject
8998
Global
9099
GlobalSection(SolutionConfigurationPlatforms) = preSolution

‎Extensions/Csv/Cosmos.DataTransfer.CsvExtension/CsvFileSink.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
namespace Cosmos.DataTransfer.CsvExtension;
88

99
[Export(typeof(IDataSinkExtension))]
10-
public class CsvFileSink : CompositeSinkExtension<FileDataSink, CsvFormatWriter>
10+
public class CsvFileSink : CompositeSinkExtension<FileDataSink, CsvFormatWriter>, IAliasedDataTransferExtension
1111
{
12-
public override string DisplayName => "CSV-File";
12+
public override string DisplayName => "CSV";
13+
public IEnumerable<string> Aliases => new[] { "CSV-File" };
1314
}

‎Extensions/Csv/Cosmos.DataTransfer.CsvExtension/CsvFileSource.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
namespace Cosmos.DataTransfer.CsvExtension;
66

77
[Export(typeof(IDataSourceExtension))]
8-
public class CsvFileSource : CompositeSourceExtension<FileDataSource, CsvFormatReader>
8+
public class CsvFileSource : CompositeSourceExtension<FileDataSource, CsvFormatReader>, IAliasedDataTransferExtension
99
{
10-
public override string DisplayName => "CSV-File";
10+
public override string DisplayName => "CSV";
11+
public IEnumerable<string> Aliases => new[] { "CSV-File" };
1112
}

‎Extensions/Csv/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ The CSV extension provides formatter capabilities for reading from and writing t
77
> **Note**: When specifying the CSV extension as the Source or Sink property in configuration, utilize the names listed below.
88
99
Supported storage sinks:
10-
- File - **Csv-File**
10+
- File - **Csv**
1111
- Azure Blob Storage - **Csv-AzureBlob**
1212
- AWS S3 - **Csv-AwsS3**
1313

1414
Supported storage sources:
15-
- File - **Csv-File**
15+
- File - **Csv**
1616

1717
## Settings
1818

‎Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonFileSinkTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public async Task WriteAsync_WithFlatObjects_WritesToValidFile()
3636
{ "FilePath", outputFile }
3737
});
3838

39-
await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonDataSourceExtension(), NullLogger.Instance);
39+
await sink.WriteAsync(data.ToAsyncEnumerable(), config, new JsonFileSource(), NullLogger.Instance);
4040

4141
var outputData = JsonConvert.DeserializeObject<List<TestDataObject>>(await File.ReadAllTextAsync(outputFile));
4242

‎Extensions/Json/Cosmos.DataTransfer.JsonExtension.UnitTests/JsonRoundTripTests.cs

-86
This file was deleted.

0 commit comments

Comments
 (0)