Mp airflow improvements #44

Open · wants to merge 6 commits into main
2 changes: 2 additions & 0 deletions docs/_sidebar.md
@@ -21,6 +21,7 @@
- [DAGs - Generate DAGs from yml](/how-tos/airflow/generate-dags-from-yml.md)
- [DAGs - Run ADF Pipelines](/how-tos/airflow/run-adf-pipeline.md)
- [DAGs - Run Airbyte sync jobs](/how-tos/airflow/run-airbyte-sync-jobs.md)
- [DAGs - Run Amazon S3 jobs](/how-tos/airflow/run-amazon-s3-jobs.md)
- [DAGs - Run dbt](/how-tos/airflow/run-dbt.md)
- [DAGs - Run Databricks Notebooks](/how-tos/airflow/run-databricks-notebook.md)
- [DAGs - Run Fivetran sync jobs](/how-tos/airflow/run-fivetran-sync-jobs.md)
@@ -86,6 +87,7 @@
- [Service Connections](/reference/admin-menu/service_connections.md)
- [Users](/reference/admin-menu/users.md)
- [Airflow](/reference/airflow/)
- [Airflow Best Practices](/reference/airflow/airflow-best-practices.md)
- [Airflow Config Defaults](/reference/airflow/airflow-config-defaults.md)
- [DAG Generators](/reference/airflow/dag-generators.md)
- [Datacoves Operators](/reference/airflow/datacoves-operator.md)
Binary file added docs/how-tos/airflow/assets/s3_sample_data.jpg
Binary files added: additional screenshots under docs/how-tos/airflow/assets/ referenced by the new S3 how-to (table schema, empty table, copied data, additional column, and JSON variant table images).
5 changes: 3 additions & 2 deletions docs/how-tos/airflow/dynamically-set-schedule.md
@@ -1,8 +1,8 @@
# How to Dynamically Set the Schedule Interval

The default schedule for DAG development is `paused`. However, there may be scenarios where this default configuration doesn't align with your requirements. For instance, you might forget to add/adjust the schedule interval before deploying to production, leading to unintended behaviors.
By default, DAGs are created in a `paused` state in Airflow, though you can control this per DAG with the `is_paused_on_creation` option. However, in some cases, you might forget to configure the schedule interval before pushing your DAG to production, which can lead to unexpected results.

To mitigate such risks, a practical approach is to dynamically configure the schedule according to the environment development or production. This can be done by implementing a function named `get_schedule`. This function will determine the appropriate schedule based on the current environment, ensuring that DAGs operate correctly across different stages of deployment.
To avoid this, a more flexible approach is to dynamically set the schedule interval based on your environment (development or production). By using a function called `get_schedule`, you can ensure that the correct schedule is applied in each environment, which reduces mistakes and keeps scheduling behavior consistent across deployments.

Here is how to achieve this:

@@ -60,6 +60,7 @@ from orchestrate.python_scripts.get_schedule import get_schedule
"email": "gomezn@example.com",
"email_on_failure": True,
},
is_paused_on_creation=True,
catchup=False,
tags=["version_8"],
description="Datacoves Sample dag",
136 changes: 136 additions & 0 deletions docs/how-tos/airflow/run-amazon-s3-jobs.md
@@ -0,0 +1,136 @@
# Loading S3 Files into Snowflake

## Schema Evolution

Snowflake has a built-in feature for [schema evolution](https://docs.snowflake.com/en/user-guide/data-load-schema-evolution). It allows us to create the initial table using a file as a template; Snowflake then automatically handles any changes to the schema going forward.

### Caveats and limitations

Schema evolution is [limited to adding a maximum of 10 columns or evolving no more than 1 schema per COPY operation](https://docs.snowflake.com/en/user-guide/data-load-schema-evolution#usage-notes).

A column created as `NUMBER` cannot later accept `VARCHAR` values, but a column created as `VARCHAR` can accept `NUMBER` values, since they are simply loaded as `VARCHAR`.

**Step 1:** In a Snowflake SQL worksheet, navigate to the database and schema where you will be loading the data.
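
For example, assuming a database named `MY_DATABASE` and a schema named `MY_SCHEMA` (placeholder names, substitute your own), that would be:

```
USE DATABASE MY_DATABASE; -- replace with your database
USE SCHEMA MY_SCHEMA;     -- replace with your schema
```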

**Step 2:** Create a custom file format for the data that you are planning to load. In this example we are using a custom CSV file format.

```
CREATE OR REPLACE FILE FORMAT MY_CSV_FORMAT
TYPE = 'CSV'
PARSE_HEADER = TRUE
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
```
**Step 3:** Now we will create a Snowflake stage and link our custom file format to it.
```
CREATE OR REPLACE STAGE TEST_STAGE
FILE_FORMAT = MY_CSV_FORMAT;
```

**Step 4:** We are now going to upload a CSV file to this Snowflake stage that we will use as a template to create the table where data will be loaded. You can do this via the Snowflake UI or by using the SnowSQL command line tool.

This example will use SnowSQL and our data looks like this:

![Sample data](assets/s3_sample_data.jpg)

The SnowSQL command to upload the file to the Snowflake stage is below:
```
PUT file://test_file.csv @TEST_STAGE;
```

Here, `test_file.csv` is in the same folder from which we are running the SnowSQL command.
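
To confirm the upload, you can list the contents of the stage; by default `PUT` compresses the file, so it will appear as `test_file.csv.gz`:

```
LIST @TEST_STAGE;
```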

**Step 5:** Once the file has been uploaded to the stage, we create the initial table using it as a template and enable schema evolution.

```
CREATE OR REPLACE TABLE TEST_TABLE
ENABLE_SCHEMA_EVOLUTION = TRUE
USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM TABLE(
        INFER_SCHEMA(
            LOCATION => '@TEST_STAGE/test_file.csv.gz',
            FILE_FORMAT => 'my_csv_format'
        )
    )
);
```

The TEST_TABLE schema now looks like this:

![test_table](assets/s3_test_table_schema.jpg)

However, the table does not have any data loaded into it.

![Empty test_table](assets/s3_test_table_empty.jpg)
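
A quick count confirms the table is still empty at this point:

```
SELECT COUNT(*) FROM TEST_TABLE;
```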

**Step 6:** To load the data from the file we used as a template, we use the following `COPY INTO` statement.

```
COPY INTO TEST_TABLE
FROM '@TEST_STAGE/test_file.csv.gz'
FILE_FORMAT = 'my_csv_format'
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
```

And we can now see the data in the table:

![test_table copied](assets/s3_test_table_copied.jpg)

**Step 7:** Now we’re going to load another file into TEST_TABLE that has an additional column.

![Test data additional column](assets/s3_test_table_additional_col.jpg)

Again, we will use the SnowSQL PUT command seen below:

```
PUT file://test_file_copy.csv @TEST_STAGE;
```

**Step 8:** Now we can run another COPY INTO statement that references the new file.
```
COPY INTO TEST_TABLE
FROM '@TEST_STAGE/test_file_copy.csv.gz'
FILE_FORMAT = 'my_csv_format'
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
```

And now the table has an additional column called COUNTRY_CODE:

![test_table additional column](assets/s3_test_table_additional_call_snowflake.jpg)
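
If you prefer to check in SQL rather than the UI, describing the table shows the evolved column list:

```
DESCRIBE TABLE TEST_TABLE;
```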

## Loading JSON data into a variant column

If the data that you want to load is in JSON format and the schema is likely to change, a recommended pattern is to load the JSON into a single Snowflake variant column. This allows you to parse out and model the data downstream without having to worry about schema changes (see the example at the end of this section).

**Step 1:** Create a Snowflake table with a single variant column.

```
CREATE OR REPLACE TABLE VARIANT_TABLE (MAIN_COLUMN VARIANT);
```

**Step 2:** Now create a custom file format for the JSON data.

```
CREATE OR REPLACE FILE FORMAT MY_JSON_FORMAT
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE;
```

**Step 3:** After this we create a Snowflake stage that uses the JSON custom file format.

```
CREATE OR REPLACE STAGE JSON_STAGE
FILE_FORMAT = MY_JSON_FORMAT;
```

**Step 4:** We can now load the staged JSON files using the following `COPY INTO` command.

```
COPY INTO VARIANT_TABLE
FROM @JSON_STAGE;
```

Our variant table now looks like this:

![json variant table](assets/json_variant_table.jpg)
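
From here the JSON can be parsed out and modeled downstream with Snowflake's semi-structured query syntax. As a sketch (the field names `id`, `name`, and `items` are hypothetical placeholders, not fields from this sample data):

```
SELECT
    MAIN_COLUMN:id::NUMBER   AS id,    -- cast top-level fields out of the variant
    MAIN_COLUMN:name::STRING AS name,
    item.value::STRING       AS item   -- one row per element of the items array
FROM VARIANT_TABLE,
    LATERAL FLATTEN(INPUT => MAIN_COLUMN:items) item;
```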

2 changes: 0 additions & 2 deletions docs/how-tos/airflow/run-dbt.md
@@ -24,8 +24,6 @@ See [Datacoves Operators](reference/airflow/datacoves-operator.md) for more info

**Step 2:** Paste in the code below and be sure to replace information such as name, email and model name with your own.

>[!NOTE]The name of the DAG will be the name you set for the file. ie) my_sample_dag.py = my_sample_dag

### Python version

```python
2 changes: 1 addition & 1 deletion docs/how-tos/datacoves/how_to_configure_azure_DevOps.md
@@ -110,7 +110,7 @@ You will need to gather the following Application information to configure your

### Secret Based Authentication

You should have the secret value you save in an [earlier section](how-tos/datacoves/how_to_clone_with_azure.md#secret)
You should have the secret value you saved in an [earlier section](how-tos/datacoves/how_to_configure_azure_DevOps.md#secret)

### Repo SSH and HTTP urls

4 changes: 2 additions & 2 deletions docs/how-tos/datacoves/how_to_projects.md
@@ -22,11 +22,11 @@ A Project configuration consists of the following fields:
- **Azure HTTPS Clone url** Cloning url found in Azure DevOps Portal
- **Tenant ID:** ID found in Azure Portal
- **Application ID:** ID found in Azure Portal
- **Client Secret:** This will be the [secret value](how-tos/datacoves/how_to_clone_with_azure.md#secret) found in Azure Portal.
- **Client Secret:** This will be the [secret value](how-tos/datacoves/how_to_configure_azure_DevOps.md#secret) found in Azure Portal.
- **Release Branch:** This will be the branch you would like to clone. It should be `main`

- **Azure DataOps Certificate** When Azure DataOps Certificate is selected, a certificate is needed for secure communication. See this [how-to guide on configuring Azure DevOps](how-tos/datacoves/how_to_configure_azure_DevOps.md) for detailed configuration information.
- **Certificate PEM file**: You will need to copy the PEM file to your desktop and [upload in Azure](how-tos/datacoves/how_to_clone_with_azure.md#certificate).
- **Certificate PEM file**: You will need to copy the PEM file to your desktop and [upload in Azure](how-tos/datacoves/how_to_configure_azure_DevOps.md#certificate).
- **Git SSH url:** Cloning url found in Azure DevOps Portal
- **Azure HTTPS Clone url** Cloning url found in Azure DevOps Portal
- **Tenant ID:** ID found in Azure Portal
32 changes: 32 additions & 0 deletions docs/reference/airflow/airflow-best-practices.md
@@ -0,0 +1,32 @@
# Airflow Best Practices

This page should serve as a reference for tips and tricks that we recommend for the best Airflow experience. Please read the official [Airflow Best Practices doc](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html) first.

## Table of Contents
- [Start Date](/reference/airflow/airflow-best-practices.md#start-date)


### Start Date

Do not use [dynamic start dates](https://infinitelambda.com/airflow-start-date-execution-date/). Always set your start date to the day before or earlier and set `catchup=False` to avoid unintended catchup runs:

```python
from pendulum import datetime
from airflow.decorators import dag

@dag(
    default_args={
        "start_date": datetime(2023, 12, 29),  # Set this to the day before or earlier
        "owner": "Noel Gomez",
        "email": "gomezn@example.com",
        "email_on_failure": True,
    },
    dag_id="sample_dag",
    schedule="@daily",
    catchup=False,  # Set this to False to avoid additional catchup runs
    tags=["version_1"],
    description="Datacoves Sample dag",
)
...
```