Skip to content

Commit 4582951

Browse files
committed
Add Parquet File option as source and sink
1 parent f8184a7 commit 4582951

12 files changed

+339
-0
lines changed

Core/Cosmos.DataTransfer.Core/migrationsettings.json

+5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
"Source": null,
33
"Sink": null,
44
"SourceSettings": {
5+
"FilePath": "test.parquet",
6+
"ConnectionString": "data source=localhost;TrustServerCertificate=true;Initial Catalog=Stackoverflow2010; user id=sa; password=HasanSavran@gmail.com",
7+
"QueryText": "SELECT TOP 50000 * FROM [StackOverflow2010].[dbo].[Posts]"
8+
59
},
610
"SinkSettings": {
11+
"FilePath": "test2.parquet"
712
},
813
"Operations": [
914
//{

CosmosDbDataMigrationTool.sln

+10
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.SqlServ
5454
EndProject
5555
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.SqlServerExtension.UnitTests", "Extensions\SqlServer\Cosmos.DataTransfer.SqlServerExtension.UnitTests\Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj", "{3E4C4ABF-D8C2-4997-A719-E756483C8D63}"
5656
EndProject
57+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Parquet", "Parquet", "{2BD63BEE-CEE8-4FF3-8A2E-373D9F4E2FE8}"
58+
EndProject
59+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.ParquetExtension", "Extensions\Parquet\Cosmos.DataTransfer.ParquetExtension\Cosmos.DataTransfer.ParquetExtension.csproj", "{B83FEBC7-08B6-4C25-831A-EA0C08794F67}"
60+
EndProject
5761
Global
5862
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5963
Debug|Any CPU = Debug|Any CPU
@@ -108,6 +112,10 @@ Global
108112
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Debug|Any CPU.Build.0 = Debug|Any CPU
109113
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Release|Any CPU.ActiveCfg = Release|Any CPU
110114
{3E4C4ABF-D8C2-4997-A719-E756483C8D63}.Release|Any CPU.Build.0 = Release|Any CPU
115+
{B83FEBC7-08B6-4C25-831A-EA0C08794F67}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
116+
{B83FEBC7-08B6-4C25-831A-EA0C08794F67}.Debug|Any CPU.Build.0 = Debug|Any CPU
117+
{B83FEBC7-08B6-4C25-831A-EA0C08794F67}.Release|Any CPU.ActiveCfg = Release|Any CPU
118+
{B83FEBC7-08B6-4C25-831A-EA0C08794F67}.Release|Any CPU.Build.0 = Release|Any CPU
111119
EndGlobalSection
112120
GlobalSection(SolutionProperties) = preSolution
113121
HideSolutionNode = FALSE
@@ -127,6 +135,8 @@ Global
127135
{2F075279-E8B0-4A6E-A8D9-2058B7DEC671} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201}
128136
{7A020621-77E6-4DD1-B230-50A46B4BB2B1} = {2F075279-E8B0-4A6E-A8D9-2058B7DEC671}
129137
{3E4C4ABF-D8C2-4997-A719-E756483C8D63} = {2F075279-E8B0-4A6E-A8D9-2058B7DEC671}
138+
{2BD63BEE-CEE8-4FF3-8A2E-373D9F4E2FE8} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201}
139+
{B83FEBC7-08B6-4C25-831A-EA0C08794F67} = {2BD63BEE-CEE8-4FF3-8A2E-373D9F4E2FE8}
130140
EndGlobalSection
131141
GlobalSection(ExtensibilityGlobals) = postSolution
132142
SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
<Target Name="PublishToExtensionsFolder" AfterTargets="Build" Condition=" '$(Configuration)' == 'Debug' ">
10+
<Exec Command="dotnet publish --configuration $(Configuration) --no-build -p:PublishProfile=FolderProfile" />
11+
</Target>
12+
<ItemGroup>
13+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
14+
<PackageReference Include="Parquet.Net" Version="4.8.1" />
15+
<PackageReference Include="System.ComponentModel.Composition" Version="7.0.0" />
16+
</ItemGroup>
17+
<ItemGroup>
18+
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
19+
</ItemGroup>
20+
21+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using Cosmos.DataTransfer.ParqExtension.Settings;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.Logging;
5+
using Parquet.Schema;
6+
using Parquet;
7+
using System.ComponentModel.Composition;
8+
using Parquet.Data;
9+
using System.Data;
10+
11+
namespace Cosmos.DataTransfer.ParquetExtension
12+
{
13+
/// <summary>
/// Sink extension that buffers every incoming <see cref="IDataItem"/> column-by-column in memory
/// and then writes all buffered columns to a Parquet file as a single row group.
/// </summary>
[Export(typeof(IDataSinkExtension))]
public class ParqDataSinkExtension : IDataSinkExtension
{
    // Sentinel type for a column whose values have all been null so far.
    // Same value as Type.Missing.GetType(), which ParquetDataCol compares against.
    private static readonly Type UnknownColumnType = typeof(System.Reflection.Missing);

    public string DisplayName => "Parquet";

    /// <summary>Columns collected during the current <see cref="WriteAsync"/> call, in first-seen order.</summary>
    public List<ParquetDataCol> parquetDataCols = new List<ParquetDataCol>();

    // Number of items processed so far; every column must hold exactly this many values.
    private int _rowCount;

    /// <summary>
    /// Reads all <paramref name="dataItems"/>, builds one Parquet column per distinct field name and
    /// writes them to the file named by the <c>FilePath</c> setting.
    /// </summary>
    /// <param name="dataItems">Items to write; each item contributes one row.</param>
    /// <param name="config">Configuration bound to <see cref="ParquetSinkSettings"/>.</param>
    /// <param name="dataSource">Not used by this sink.</param>
    /// <param name="logger">Receives progress messages and warnings about skipped columns.</param>
    /// <param name="cancellationToken">Observed while enumerating items and while writing columns.</param>
    public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken)
    {
        var settings = config.Get<ParquetSinkSettings>();
        // NOTE(review): Validate is declared outside this file; confirm how it treats null settings.
        settings.Validate();
        if (settings?.FilePath == null)
        {
            return;
        }

        var filePath = settings.FilePath;
        logger.LogInformation("Writing to file '{FilePath}'", filePath);

        // Start from a clean slate so a second call on the same instance does not
        // append its rows to the columns collected by an earlier call.
        parquetDataCols.Clear();
        _rowCount = 0;

        await foreach (var item in dataItems.WithCancellation(cancellationToken))
        {
            ProcessColumns(item);
        }

        var writableColumns = CreateParquetColumns(logger);
        if (writableColumns.Count == 0)
        {
            logger.LogWarning("No writable columns were found; file '{FilePath}' was not written", filePath);
            return;
        }

        var schema = CreateSchema(writableColumns);
        await SaveFile(schema, writableColumns, filePath, cancellationToken);
        logger.LogInformation("Completed writing data to file '{FilePath}'", filePath);
    }

    /// <summary>
    /// Appends the values of one item to the buffered columns, keeping every column the same length:
    /// a column first seen on a later item is back-filled with nulls, and a column that the item
    /// does not contain receives a null for this row.
    /// </summary>
    private void ProcessColumns(IDataItem item)
    {
        foreach (var col in item.GetFieldNames())
        {
            var colval = item.GetValue(col);
            var coltype = colval?.GetType() ?? UnknownColumnType;

            var current = parquetDataCols.FirstOrDefault(c => c.ColumnName == col);
            if (current == null)
            {
                current = new ParquetDataCol(col, coltype);
                // Rows processed before this field first appeared have no value for it.
                for (var i = 0; i < _rowCount; i++)
                {
                    current.ColumnData.Add(null!);
                }
                parquetDataCols.Add(current);
            }
            else if (coltype != UnknownColumnType && current.ColumnType != coltype)
            {
                // The most recently seen non-null value decides the column type.
                current.ColumnType = coltype;
                current.ParquetDataType = new DataField(col, coltype, true);
            }

            // ColumnData is declared as IList<object> but stores null for missing values.
            current.ColumnData.Add(colval!);
        }

        _rowCount++;

        // Pad columns that this item did not supply so all columns stay aligned by row.
        foreach (var column in parquetDataCols)
        {
            while (column.ColumnData.Count < _rowCount)
            {
                column.ColumnData.Add(null!);
            }
        }
    }

    /// <summary>
    /// Builds the typed Parquet column for every buffered column that has a supported type and
    /// returns those columns. Columns that cannot be written are logged and left out.
    /// </summary>
    private List<ParquetDataCol> CreateParquetColumns(ILogger logger)
    {
        var writable = new List<ParquetDataCol>();
        foreach (var column in parquetDataCols)
        {
            var data = BuildColumnData(column);
            if (data == null)
            {
                logger.LogWarning("Skipping column '{ColumnName}': type '{ColumnType}' cannot be written by the Parquet sink", column.ColumnName, column.ColumnType);
                continue;
            }

            column.ParquetDataColumn = new Parquet.Data.DataColumn(column.ParquetDataType, data);
            writable.Add(column);
        }
        return writable;
    }

    /// <summary>
    /// Converts the buffered values of <paramref name="column"/> to a typed array, or returns
    /// <c>null</c> when the column has no known type or its type is not handled here.
    /// </summary>
    private static Array? BuildColumnData(ParquetDataCol column)
    {
        // ParquetDataType is never assigned when every value seen for the column was null.
        if (column.ParquetDataType == null)
        {
            return null;
        }

        var clrType = column.ParquetDataType.ClrType;
        var values = column.ColumnData;

        if (clrType == typeof(string)) return ToStringArray(values);
        if (clrType == typeof(int)) return ToNullableArray<int>(values);
        if (clrType == typeof(short)) return ToNullableArray<short>(values);
        if (clrType == typeof(long)) return ToNullableArray<long>(values);
        if (clrType == typeof(DateTime)) return ToNullableArray<DateTime>(values);
        if (clrType == typeof(bool)) return ToNullableArray<bool>(values);
        // typeof(float).Name is "Single", so a name-based "Float" match never fires.
        if (clrType == typeof(float)) return ToNullableArray<float>(values);
        if (clrType == typeof(double)) return ToNullableArray<double>(values);
        if (clrType == typeof(decimal)) return ToNullableArray<decimal>(values);

        return null;
    }

    /// <summary>
    /// Copies <paramref name="values"/> into a <c>T?[]</c>. Values that are boxed as a different type
    /// (possible when the column type changed while items were processed) are converted with
    /// <see cref="Convert.ChangeType(object, Type, IFormatProvider)"/> instead of failing the unboxing cast.
    /// </summary>
    private static T?[] ToNullableArray<T>(IList<object> values) where T : struct
    {
        var result = new T?[values.Count];
        for (var i = 0; i < values.Count; i++)
        {
            object? value = values[i];
            if (value is null)
            {
                result[i] = null;
            }
            else if (value is T typed)
            {
                result[i] = typed;
            }
            else
            {
                result[i] = (T)Convert.ChangeType(value, typeof(T), System.Globalization.CultureInfo.InvariantCulture);
            }
        }
        return result;
    }

    /// <summary>Copies <paramref name="values"/> into a string array, formatting non-string values invariantly.</summary>
    private static string?[] ToStringArray(IList<object> values)
    {
        var result = new string?[values.Count];
        for (var i = 0; i < values.Count; i++)
        {
            object? value = values[i];
            result[i] = value is null
                ? null
                : value as string ?? Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
        }
        return result;
    }

    /// <summary>Creates a schema from exactly the columns that will be written, in the same order.</summary>
    private static ParquetSchema CreateSchema(IEnumerable<ParquetDataCol> columns)
    {
        var fields = new List<Field>();
        foreach (var column in columns)
        {
            fields.Add(column.ParquetDataType);
        }
        return new ParquetSchema(fields);
    }

    /// <summary>Writes <paramref name="columns"/> to <paramref name="filepath"/> as one row group.</summary>
    private static async Task SaveFile(ParquetSchema schema, IEnumerable<ParquetDataCol> columns, string filepath, CancellationToken cancellationToken)
    {
        // File.Create truncates an existing file; File.OpenWrite would leave stale
        // trailing bytes behind when the new content is shorter than the old file.
        using Stream fs = File.Create(filepath);
        using ParquetWriter writer = await ParquetWriter.CreateAsync(schema, fs);
        using ParquetRowGroupWriter groupWriter = writer.CreateRowGroup();
        foreach (var col in columns)
        {
            await groupWriter.WriteColumnAsync(col.ParquetDataColumn, cancellationToken);
        }
    }
}
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using Cosmos.DataTransfer.ParqExtension.Settings;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.Logging;
5+
using Parquet.Schema;
6+
using Parquet;
7+
using System.ComponentModel.Composition;
8+
using System.Runtime.CompilerServices;
9+
using Parquet.Data;
10+
11+
namespace Cosmos.DataTransfer.ParquetExtension
12+
{
13+
/// <summary>
/// Source extension that reads a Parquet file and yields one <see cref="IDataItem"/> per row.
/// </summary>
[Export(typeof(IDataSourceExtension))]
public class ParquetDataSourceExtension : IDataSourceExtension
{
    public string DisplayName => "Parquet";

    /// <summary>
    /// Opens the file named by the <c>FilePath</c> setting and streams its rows, one row group at a time.
    /// </summary>
    /// <param name="config">Configuration bound to <see cref="ParquetSourceSettings"/>.</param>
    /// <param name="logger">Receives progress messages and row-count warnings.</param>
    /// <param name="cancellationToken">Checked before each row group and each yielded row.</param>
    /// <returns>One item per row, keyed by the column field names.</returns>
    public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var settings = config.Get<ParquetSourceSettings>();
        // NOTE(review): Validate is declared outside this file; confirm how it treats null settings.
        settings.Validate();
        if (settings?.FilePath == null)
        {
            yield break;
        }

        var filePath = settings.FilePath;
        logger.LogInformation("Reading file '{FilePath}'", filePath);

        using Stream fs = System.IO.File.OpenRead(filePath);
        using ParquetReader reader = await ParquetReader.CreateAsync(fs);
        var fields = reader.Schema.GetDataFields();

        // Each row group is read and emitted on its own. Collecting the columns of every
        // row group into one list would repeat each field name once per group.
        for (int groupIndex = 0; groupIndex < reader.RowGroupCount; groupIndex++)
        {
            cancellationToken.ThrowIfCancellationRequested();

            using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(groupIndex);
            var columns = new List<Parquet.Data.DataColumn>();
            var numberOfRows = 0;
            foreach (DataField field in fields)
            {
                var column = await rowGroupReader.ReadColumnAsync(field);
                if (columns.Count == 0)
                {
                    // The first column of the group defines how many rows are emitted.
                    numberOfRows = column.Data.Length;
                }
                else if (column.Data.Length != numberOfRows)
                {
                    logger.LogWarning("Number of rows in '{colname}' column does not match the rest.", column.Field.Name);
                }
                columns.Add(column);
            }

            for (var row = 0; row < numberOfRows; row++)
            {
                cancellationToken.ThrowIfCancellationRequested();

                var values = new Dictionary<string, object?>(columns.Count);
                foreach (var column in columns)
                {
                    // A shorter column yields null instead of indexing past its end.
                    values[column.Field.Name] = row < column.Data.Length ? column.Data.GetValue(row) : null;
                }
                yield return new ParquetDictionaryDataItem(values);
            }
        }

        logger.LogInformation("Completed reading '{FilePath}'", filePath);
    }
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
3+
namespace Cosmos.DataTransfer.ParquetExtension
4+
{
5+
/// <summary>
/// <see cref="IDataItem"/> implementation backed by a dictionary of field name to value.
/// </summary>
public class ParquetDictionaryDataItem : IDataItem
{
    /// <summary>Wraps <paramref name="items"/>; the dictionary is stored by reference, not copied.</summary>
    public ParquetDictionaryDataItem(IDictionary<string, object?> items)
    {
        Items = items;
    }

    /// <summary>Field values keyed by field name.</summary>
    public IDictionary<string, object?> Items { get; set; }

    /// <summary>Returns the keys of <see cref="Items"/>.</summary>
    public IEnumerable<string> GetFieldNames() => Items.Keys;

    /// <summary>Returns the value stored under <paramref name="fieldName"/>, or <c>null</c> when the key is absent.</summary>
    public object? GetValue(string fieldName)
        => Items.TryGetValue(fieldName, out var found) ? found : null;
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
// Top-level statement: prints a startup banner to standard output.
Console.WriteLine("Starting Parquet extension");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<!--
3+
https://go.microsoft.com/fwlink/?LinkID=208121.
4+
-->
5+
<Project>
6+
<PropertyGroup>
7+
<Configuration>Release</Configuration>
8+
<Platform>Any CPU</Platform>
9+
<PublishDir>..\..\..\Core\Cosmos.DataTransfer.Core\bin\Debug\net6.0\Extensions</PublishDir>
10+
<PublishProtocol>FileSystem</PublishProtocol>
11+
<_TargetId>Folder</_TargetId>
12+
</PropertyGroup>
13+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using System.ComponentModel.DataAnnotations;
3+
4+
namespace Cosmos.DataTransfer.ParqExtension.Settings
5+
{
6+
/// <summary>
/// Settings for the Parquet extension, bound from configuration.
/// </summary>
public class ParquetSinkSettings : IDataExtensionSettings
{
    // TODO: add an option to set a custom row group size for very large files.
    // public int? CustomRowGroupSize { get; set; }

    /// <summary>Path of the Parquet file; marked as required for validation.</summary>
    [Required]
    public string? FilePath { get; set; }
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using System.ComponentModel.DataAnnotations;
3+
4+
namespace Cosmos.DataTransfer.ParqExtension.Settings
5+
{
6+
/// <summary>
/// Settings for reading a Parquet file, bound from configuration.
/// </summary>
public class ParquetSourceSettings : IDataExtensionSettings
{
    /// <summary>Path of the Parquet file; marked as required for validation.</summary>
    [Required]
    public string? FilePath { get; set; }
}
11+
}

Interfaces/Cosmos.DataTransfer.Interfaces/Cosmos.DataTransfer.Interfaces.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<ItemGroup>
1111
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
1212
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
13+
<PackageReference Include="Parquet.Net" Version="4.8.1" />
1314
</ItemGroup>
1415

1516
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using Parquet.Data;
2+
using Parquet.Schema;
3+
4+
namespace Cosmos.DataTransfer.Interfaces
5+
{
6+
/// <summary>
/// Holds the name, CLR type, raw values and Parquet field/column objects of a single column.
/// </summary>
public class ParquetDataCol
{
    /// <summary>Name of the column.</summary>
    public string ColumnName { get; set; }

    /// <summary>
    /// CLR type of the column values; <see cref="System.Reflection.Missing"/> acts as the
    /// "no type known" marker.
    /// </summary>
    public Type ColumnType { get; set; }

    /// <summary>Raw values of the column, in insertion order.</summary>
    public IList<object> ColumnData { get; set; }

    /// <summary>Parquet field for the column; only assigned once a real type is known.</summary>
    public DataField ParquetDataType { get; set; }

    /// <summary>Parquet column object; not assigned by this class.</summary>
    public Parquet.Data.DataColumn ParquetDataColumn { get; set; }

    /// <summary>Creates a column with no name, the "no type known" marker and an empty value list.</summary>
    public ParquetDataCol()
    {
        // Type.Missing is an instance of System.Reflection.Missing, so this is its runtime type.
        ColumnType = typeof(System.Reflection.Missing);
        ColumnData = new List<object>();
    }

    /// <summary>
    /// Creates a named column with an empty value list. A nullable Parquet field is created
    /// unless <paramref name="coltype"/> is the "no type known" marker.
    /// </summary>
    /// <param name="name">Column name, also used as the Parquet field name.</param>
    /// <param name="coltype">CLR type of the values, or the marker type when unknown.</param>
    public ParquetDataCol(string name, Type coltype)
    {
        ColumnName = name;
        ColumnType = coltype;
        ColumnData = new List<object>();

        if (coltype == typeof(System.Reflection.Missing))
        {
            return;
        }

        ParquetDataType = new DataField(name, coltype, true);
    }
}
38+
}

0 commit comments

Comments
 (0)