This project provides a complete data pipeline using Azure services, enabling seamless data ingestion, processing, and querying. The infrastructure is provisioned using Terraform, ensuring easy deployment and configuration.
✨ This setup ensures a scalable, manageable, and efficient data pipeline on Azure! ✨
This Databricks workflow, together with Azure Data Factory, processes car sales data using the Medallion Architecture, covering the Silver (processed data) and Gold (analytical data model) layers.
Silver Layer (Processed Data)
- Reads raw data from Azure Data Lake Storage (ADLS) - Bronze Layer.
- Performs data transformations, including:
  - Extracting `model_category` from `Model_ID`
  - Calculating `RevPerUnit` (Revenue per Unit Sold)
- Writes transformed data back to the Silver Layer in ADLS.
Gold Layer (Data Model)
- Creates Dimension Tables: `dim_branch`, `dim_dealer`, `dim_model`, `dim_date`.
- Builds the Fact Table (`fact_sales`) by joining with the dimensions.
- Implements Delta Tables (SCD Type 1 - UPSERT) for incremental updates.
- Uses a Watermarking Strategy to handle incremental data loads.
- `dbutils.widgets` are used to define the `incremental_flag`.
- The Fact Table (`fact_sales`) uses a Merge (UPSERT) strategy with Delta Tables.
- The workflow consists of:
  - Silver Data Processing
  - Creating the Dimension Tables (`dim_branch`, `dim_date`, `dim_dealer`, `dim_model`)
  - Building the Fact Table (`fact_sales`)
- Runs on a Databricks Cluster and uses Databricks SQL for querying.
- Delta Tables for efficient storage and incremental processing.
- Parquet Format for optimized read/write performance.
- Data Validation via schema enforcement and joins.
- Databricks SQL Queries for efficient querying and reporting.
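As an illustration of the kind of reporting this enables, the sketch below runs an aggregate query over the Gold layer from a notebook. It is only a hedged example: the fact table name (`factsales`) and the assumption that it keeps `Revenue` and `Units_Sold` alongside its dimension keys come from the notebook descriptions later in this README, not from a query shipped with the project.

```python
# Illustrative reporting query over the Gold layer (assumed table/column names):
# total revenue and units sold per branch, highest-revenue branches first.
display(spark.sql("""
    SELECT b.BranchName,
           SUM(f.Revenue)    AS Total_Revenue,
           SUM(f.Units_Sold) AS Total_Units
    FROM cars_catalog.gold.factsales AS f
    JOIN cars_catalog.gold.dim_branch AS b
      ON f.dim_branch_key = b.dim_branch_key
    GROUP BY b.BranchName
    ORDER BY Total_Revenue DESC
"""))
```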
- Implement CI/CD Pipelines using GitHub Actions + Databricks Repos.
- Enable Monitoring & Alerting with Databricks Job Alerts / Azure Monitor.
This workflow ensures a scalable, efficient, and maintainable data pipeline for analytics.
All resources in this project are deployed on Azure and managed with Terraform, using modules from the Terraform Registry.
- Deploy Azure Database Server
  - Terraform Path: `/infra/00-infra-azure-mysql`
  - Creates an Azure MySQL Server as the main data source.
- Deploy Azure Databricks Workspace
  - Terraform Path: `/infra/01-infra-adb-workspace`
  - Sets up an Azure Databricks Workspace for data transformation and analytics.
- Setup Unity Catalog for Governance
  - Terraform Path: `/infra/02-infra-adb-unity-catalog`
  - Implements Unity Catalog for data governance and access control.
- Deploy Azure Data Factory Workspace
  - Terraform Path: `/infra/03-infra-adf-workspace`
  - Creates an Azure Data Factory (ADF) instance for data ingestion and orchestration.
- Azure MySQL Server
- Azure MySQL Database
- Azure Databricks Workspace
- Storage Account (Datalake, Unity Catalog, Audit For MySQL)
- Azure Data Factory
- Azure Virtual Machine (Spark Cluster)
1. Change the dataset file (see Source Data Pipeline Flow).
2. Run the Incremental Pipeline (see Incremental Data Pipeline Flow).
3. Run the Databricks Workflow (see Databricks Workflow).
- Source Data Ingestion
- Data Sink
- Mapping
  - Data is mapped between the source dataset and the Azure MySQL Database.
- Run Jobs
  - Triggers the ADF Pipeline to process data.
- Query Processed Data
  - The processed data is ready to use.
For more information about the Source Pipeline.
Overview
This pipeline is designed to perform incremental data loading by leveraging lookup activities to determine the last and current load timestamps. The data is then copied and processed before updating the watermark.
- Lookup last_load
- Lookup current_load
- Copy_Increm_Data
  - Query: `SELECT * FROM source_cars_data WHERE Date_ID > '@{activity('last_load').output.value[0].last_load}' AND Date_ID <= '@{activity('current_load').output.value[0].max_date}'`
- WatermarkUpdate
- Query Processed Data
  - The processed data is ready to use.
Once the job completes, the watermark table is updated.
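For reference, the same bounded-window logic can be sketched in PySpark. This is only an illustration: the pipeline itself performs this step with ADF Lookup and Copy activities as described above, the JDBC URL and credentials below are placeholders, and `watermark_table` is an assumed name.

```python
# Hedged PySpark illustration of the incremental window used by the ADF pipeline.
# Connection details are placeholders for the actual Azure MySQL server.
jdbc_url = "jdbc:mysql://<mysql-server>.mysql.database.azure.com:3306/<database>"
props = {"user": "<user>", "password": "<password>",
         "driver": "com.mysql.cj.jdbc.Driver"}

# Last successfully loaded watermark (watermark_table is an assumed name)
last_load = spark.read.jdbc(jdbc_url, "watermark_table", properties=props) \
    .collect()[0]["last_load"]

# Current maximum Date_ID in the source table
current_load = spark.read.jdbc(
    jdbc_url,
    "(SELECT MAX(Date_ID) AS max_date FROM source_cars_data) AS t",
    properties=props,
).collect()[0]["max_date"]

# Copy only the rows that arrived since the last load
incremental_df = spark.read.jdbc(
    jdbc_url,
    f"(SELECT * FROM source_cars_data "
    f"WHERE Date_ID > '{last_load}' AND Date_ID <= '{current_load}') AS t",
    properties=props,
)
```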
Overview
Databricks with Unity Catalog is a unified data governance solution designed to manage, secure, and organize data across multiple cloud environments. It extends Databricks capabilities by adding a centralized metadata layer, fine-grained access controls, and robust security features to ensure data compliance and governance at scale.
For more information about Databricks with Unity Catalog for Data Governance.
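As a minimal sketch of what that governance looks like in practice, catalogs, schemas, and grants can be managed with SQL from a notebook. The `cars_catalog.gold` names come from the notebooks below, while the `data_analysts` group is a hypothetical principal used only for illustration.

```python
# Minimal Unity Catalog governance sketch (illustrative only).
# Create the catalog and schema used by the Gold-layer notebooks.
spark.sql("CREATE CATALOG IF NOT EXISTS cars_catalog")
spark.sql("CREATE SCHEMA IF NOT EXISTS cars_catalog.gold")

# Fine-grained access control: read-only access on a Gold table for an
# example analyst group (hypothetical principal name).
spark.sql("GRANT USE CATALOG ON CATALOG cars_catalog TO `data_analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA cars_catalog.gold TO `data_analysts`")
spark.sql("GRANT SELECT ON TABLE cars_catalog.gold.dim_branch TO `data_analysts`")
```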
This Databricks notebook processes raw data from the Bronze layer, applies transformations, and writes the cleaned data to the Silver layer for further analysis.
- Reads raw data from the Bronze layer stored in Azure Data Lake Storage (ADLS).
- Uses Parquet format for optimized storage and performance.
# Read the raw Bronze-layer data from ADLS in Parquet format
df = spark.read.format('parquet') \
    .option('inferSchema', True) \
    .load('abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/bronze/rawdata')
- Extracts `model_category` from `Model_ID`.
- Ensures `Units_Sold` is stored as a StringType.
- Calculates `RevPerUnit` by dividing `Revenue` by `Units_Sold`.
from pyspark.sql.functions import col, split, sum
from pyspark.sql.types import StringType
# Derive model_category from the Model_ID prefix
df = df.withColumn('model_category', split(col('Model_ID'), '-')[0])
# Keep Units_Sold as a StringType, as described above
df = df.withColumn('Units_Sold', col('Units_Sold').cast(StringType()))
# Revenue per unit sold
df = df.withColumn('RevPerUnit', col('Revenue')/col('Units_Sold'))
- Displays data interactively.
- Aggregates total units sold by Year and BranchName.
display(df.groupBy('Year', 'BranchName').agg(sum('Units_Sold').alias('Total_Units')).sort('Year', 'Total_Units', ascending=[1,0]))
- Writes the transformed data to the Silver layer in Parquet format.
- Uses overwrite mode to refresh data.
df.write.format('parquet') \
.mode('overwrite') \
.option('path', 'abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver/') \
.save()
- Reads the processed Silver data using SQL.
SELECT * FROM parquet.`abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver`
This Databricks notebook is designed for processing and managing dimension tables in the Gold Layer of the data lake. It supports both initial loads and incremental updates using Slowly Changing Dimension (SCD) Type 1 (Upsert).
This notebook structure applies to the following Gold Dimension tables:
gold_dim_branch
gold_dim_date
gold_dim_dealer
gold_dim_model
- Uses `dbutils.widgets` to determine whether the run is initial (`0`) or incremental (`1`).
dbutils.widgets.text('incremental_flag', '0')
incremental_flag = dbutils.widgets.get('incremental_flag')
- Reads the source data from the Silver Layer.
- Extracts relevant columns for the dimension table.
df_src = spark.sql('''
select distinct(Branch_ID) as Branch_ID, BranchName
from parquet.`abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver`
''')
- If the dimension table exists, retrieve the current records.
- Otherwise, create an empty schema.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    df_sink = spark.sql('''
        SELECT dim_branch_key, Branch_ID, BranchName
        FROM cars_catalog.gold.dim_branch
    ''')
else:
    df_sink = spark.sql('''
        SELECT 1 as dim_branch_key, Branch_ID, BranchName
        FROM parquet.`abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver`
        WHERE 1=0
    ''')
- Existing Records (`df_filter_old`): already present in the dimension table.
- New Records (`df_filter_new`): need to be inserted with a surrogate key.
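The split below assumes a `df_filter` DataFrame that carries the existing surrogate key next to each incoming source row. A minimal sketch of how it can be derived (a left join on `Branch_ID`; the join shown here is an assumption rather than code copied from the notebook):

```python
from pyspark.sql.functions import col

# Left-join incoming source rows to the existing dimension: matched rows keep
# their surrogate key, unmatched rows get a null dim_branch_key.
df_filter = df_src.join(df_sink, df_src['Branch_ID'] == df_sink['Branch_ID'], 'left') \
    .select(df_src['Branch_ID'], df_src['BranchName'], df_sink['dim_branch_key'])
```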
df_filter_old = df_filter.filter(col('dim_branch_key').isNotNull())
df_filter_new = df_filter.filter(col('dim_branch_key').isNull()).select(df_src['Branch_ID'], df_src['BranchName'])
- Fetch the max surrogate key from the existing table.
- Assign a new key for incremental records.
from pyspark.sql.functions import monotonically_increasing_id

if incremental_flag == '0':
    max_value = 1
else:
    max_value_df = spark.sql("select max(dim_branch_key) from cars_catalog.gold.dim_branch")
    max_value = max_value_df.collect()[0][0] + 1
df_filter_new = df_filter_new.withColumn('dim_branch_key', max_value + monotonically_increasing_id())
- Combines existing and new records into a final DataFrame.
df_final = df_filter_new.union(df_filter_old)
- Uses Delta Lake Merge to update existing records and insert new ones.
from delta.tables import DeltaTable
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    delta_tbl = DeltaTable.forPath(spark, "abfss://gold@carcddatalake.dfs.core.windows.net/dim_branch")
    delta_tbl.alias("trg").merge(df_final.alias("src"), "trg.dim_branch_key = src.dim_branch_key") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    df_final.write.format("delta") \
        .mode('overwrite') \
        .option("path", "abfss://gold@carcddatalake.dfs.core.windows.net/dim_branch") \
        .saveAsTable("cars_catalog.gold.dim_branch")
- Query the updated dimension table.
SELECT * FROM cars_catalog.gold.dim_branch;
- The logic applies to all dimension tables (`dim_branch`, `dim_date`, `dim_dealer`, `dim_model`).
- Ensure that `incremental_flag` is set correctly before running.
- The SCD Type 1 Upsert ensures that the latest data is always reflected in the Gold layer.
The notebooks for the other dimension tables are located in the repository at /codepipelin/adb-workspace.
This notebook is responsible for creating and updating the Fact Sales Table in the Gold Layer of the data lake. It integrates Silver Layer data with all necessary Dimension Tables (DIMS) to build a structured fact table.
- The notebook starts by reading raw transactional data from the Silver Layer stored in Azure Data Lake Storage (ADLS).
- All required dimension tables are fetched from the Gold Layer:
  - `dim_dealer`
  - `dim_branch`
  - `dim_model`
  - `dim_date`
- The fact table is built by joining the Silver data with the respective Dimension Tables to bring in their keys (see the sketch below).
- The final fact table is written into the Gold Layer as a Delta Table with support for incremental updates using the following logic:
  - If the table exists, an upsert (MERGE) is performed based on matching keys.
  - If the table does not exist, the data is written as a new table.
- A final query is run to verify that the `factsales` table has been correctly updated.
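A minimal PySpark sketch of that join-and-merge flow is shown below. It is an illustration only: the `Dealer_ID` and `Date_ID` join columns and the `dim_*_key` names of the other dimensions are assumed by analogy with `dim_branch`, and the retained measure columns (`Revenue`, `Units_Sold`, `RevPerUnit`) come from the Silver-layer description rather than from the actual notebook at /codepipelin/adb-workspace/gold_fact_sales.py.

```python
from delta.tables import DeltaTable

# Read the Silver data and the Gold dimensions (paths/tables as described below).
df_silver = spark.read.format('parquet') \
    .load('abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver')
df_branch = spark.sql('SELECT * FROM cars_catalog.gold.dim_branch')
df_dealer = spark.sql('SELECT * FROM cars_catalog.gold.dim_dealer')
df_model  = spark.sql('SELECT * FROM cars_catalog.gold.dim_model')
df_date   = spark.sql('SELECT * FROM cars_catalog.gold.dim_date')

# Join Silver rows to each dimension to pick up the surrogate keys
# (join columns and key names are assumptions).
df_fact = df_silver \
    .join(df_branch, 'Branch_ID', 'left') \
    .join(df_dealer, 'Dealer_ID', 'left') \
    .join(df_model,  'Model_ID',  'left') \
    .join(df_date,   'Date_ID',   'left') \
    .select('dim_branch_key', 'dim_dealer_key', 'dim_model_key', 'dim_date_key',
            'Revenue', 'Units_Sold', 'RevPerUnit')

# Upsert into the Gold fact table if it exists, otherwise create it.
gold_path = 'abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/gold/factsales'
if spark.catalog.tableExists('cars_catalog.gold.factsales'):
    DeltaTable.forPath(spark, gold_path).alias('trg') \
        .merge(df_fact.alias('src'),
               'trg.dim_branch_key = src.dim_branch_key AND '
               'trg.dim_dealer_key = src.dim_dealer_key AND '
               'trg.dim_model_key = src.dim_model_key AND '
               'trg.dim_date_key = src.dim_date_key') \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    df_fact.write.format('delta').mode('overwrite') \
        .option('path', gold_path) \
        .saveAsTable('cars_catalog.gold.factsales')
```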
- Source Data (Silver Layer): `abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/silver`
- Target Data (Gold Layer - Fact Sales Table): `abfss://dev-catalog@adbcdadventustorageaccuc.dfs.core.windows.net/gold/factsales`
- Apache Spark (PySpark)
- Azure Data Lake Storage (ADLS)
- Delta Lake
- Databricks SQL
- This notebook ensures Slowly Changing Dimension (SCD) Type 1 processing for updates.
- Data integrity is maintained using JOIN operations and Delta Lake Merge.
- The notebook is designed to run in both initial load and incremental update modes.
Next Steps:
- Ensure all dimension tables are properly updated before running this notebook.
- Validate `factsales` table data in Databricks SQL after execution.
The Gold fact notebook is located at /codepipelin/adb-workspace/gold_fact_sales.py.
This workflow is designed to process and transform car sales data through different stages in a Databricks environment. The workflow consists of multiple Databricks notebooks, each responsible for handling specific tasks within the data pipeline.
The workflow follows a structured process to transform Silver Data into a structured Fact Table by leveraging dimension tables.
- Reads raw sales data from the Silver Layer.
- Performs necessary data transformations and data cleaning.
- Serves as a source for dimension and fact tables.
Each dimension table extracts and structures specific attributes from the Silver Layer to facilitate data normalization and improve query performance.
- Dim_Branch
  - Extracts and stores unique Branch details.
- Dim_Date
  - Creates a structured Date dimension for time-based analysis.
- Dim_Dealer
  - Stores Dealer-related information.
- Dim_Model
  - Extracts and organizes Model details.
- Fact_Sales
  - Combines Silver Data with all Dimension Tables.
  - Stores sales transactions along with foreign keys from dimensions.
  - Ensures historical tracking and enables analytical queries.
- Silver_Data Notebook runs first to prepare the source data.
- Each Dimension Notebook (Dim_Branch, Dim_Date, Dim_Dealer, Dim_Model) runs in parallel to extract relevant information.
- Once all dimensions are ready, the Fact_Sales Notebook runs to construct the final fact table.
- Databricks Notebooks
- Apache Spark (PySpark SQL, DataFrames)
- Delta Lake (for incremental and historical tracking)
- Azure Data Lake Storage (ADLS) (for storing Silver and Gold data)
- Modular Design: each step is handled in a separate notebook for better manageability.
- Parallel Processing: dimensions are processed in parallel for faster execution.
- Incremental Updates: uses Delta Lake for efficient data updates and historical tracking.
- Optimized Query Performance: the fact table is structured for efficient reporting and analysis.
- Implement orchestration using Databricks Workflows or Azure Data Factory (ADF).
- Enhance data validation before ingestion.
- Optimize storage and indexing for better performance.
This workflow ensures a scalable, high-performance data model for analyzing car sales effectively.