Skip to content

Commit ca46a05

Browse files
authored
feat(airflow): Add --conf-source to DAG created with kedro-airflow (kedro-org#712)
* Add --conf-source to kedro-airflow Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * Add conf_source to operator Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * Set default behaviour Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * fix tests Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * fix tests Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * Release notes + remove print Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * Turn off path exist check Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> * Turn off path resolving Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com> --------- Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com>
1 parent f157e1a commit ca46a05

File tree

4 files changed

+34
-1
lines changed

4 files changed

+34
-1
lines changed

kedro-airflow/RELEASE.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Upcoming Release
2+
* Added support to specify `--conf-source` which would point to the runtime configuration directory to be used for running the DAG in airflow. This configuration path is added to the generated DAG.
23

34
# Release 0.9.0
45
* Sort DAGs to make sure `kedro airflow create` is deterministic.

kedro-airflow/kedro_airflow/airflow_dag_template.j2

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class KedroOperator(BaseOperator):
2020
node_name: str | list[str],
2121
project_path: str | Path,
2222
env: str,
23+
conf_source: str,
2324
*args, **kwargs
2425
) -> None:
2526
super().__init__(*args, **kwargs)
@@ -28,10 +29,11 @@ class KedroOperator(BaseOperator):
2829
self.node_name = node_name
2930
self.project_path = project_path
3031
self.env = env
32+
self.conf_source = conf_source
3133

3234
def execute(self, context):
3335
configure_project(self.package_name)
34-
with KedroSession.create(self.project_path, env=self.env) as session:
36+
with KedroSession.create(self.project_path, env=self.env, conf_source=self.conf_source) as session:
3537
if isinstance(self.node_name, str):
3638
self.node_name = [self.node_name]
3739
session.run(self.pipeline_name, node_names=self.node_name)
@@ -41,6 +43,8 @@ env = "{{ env }}"
4143
pipeline_name = "{{ pipeline_name }}"
4244
project_path = Path.cwd()
4345
package_name = "{{ package_name }}"
46+
conf_source = "{{ conf_source }}" or Path.cwd() / "conf"
47+
4448

4549
# Using a DAG context manager, you don't have to specify the dag property of each task
4650
with DAG(
@@ -68,6 +72,7 @@ with DAG(
6872
node_name={% if node_list | length > 1 %}[{% endif %}{% for node in node_list %}"{{ node.name | safe }}"{% if not loop.last %}, {% endif %}{% endfor %}{% if node_list | length > 1 %}]{% endif %},
6973
project_path=project_path,
7074
env=env,
75+
conf_source=conf_source,
7176
),
7277
{% endfor %} }
7378

kedro-airflow/kedro_airflow/plugin.py

+12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
`--tags tag1,tag2`."""
3535
DEFAULT_RUN_ENV = "local"
3636
DEFAULT_PIPELINE = "__default__"
37+
CONF_SOURCE_HELP = """Path to the configuration folder or archived file to be used in the Airflow DAG."""
3738

3839

3940
@click.group(name="Kedro-Airflow")
@@ -134,6 +135,12 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
134135
help=PARAMS_ARG_HELP,
135136
callback=_split_params,
136137
)
138+
@click.option(
139+
"--conf-source",
140+
type=click.Path(exists=False, file_okay=True, resolve_path=False),
141+
help=CONF_SOURCE_HELP,
142+
default=None,
143+
)
137144
@click.pass_obj
138145
def create( # noqa: PLR0913, PLR0912
139146
metadata: ProjectMetadata,
@@ -144,9 +151,13 @@ def create( # noqa: PLR0913, PLR0912
144151
group_in_memory,
145152
tags,
146153
params,
154+
conf_source,
147155
convert_all: bool,
148156
):
149157
"""Create an Airflow DAG for a project"""
158+
159+
if conf_source is None:
160+
conf_source = ""
150161
if convert_all and pipeline_names != (DEFAULT_PIPELINE,):
151162
raise click.BadParameter(
152163
"The `--all` and `--pipeline` option are mutually exclusive."
@@ -228,6 +239,7 @@ def create( # noqa: PLR0913, PLR0912
228239
pipeline_name=name,
229240
package_name=package_name,
230241
pipeline=pipeline,
242+
conf_source=conf_source,
231243
**dag_config,
232244
).dump(str(dag_filename))
233245

kedro-airflow/tests/test_plugin.py

+15
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,18 @@ def test_create_airflow_all_and_pipeline(cli_runner, metadata):
376376
"Error: Invalid value: The `--all` and `--pipeline` option are mutually exclusive."
377377
in result.stdout
378378
)
379+
380+
381+
def test_create_airflow_conf_source(cli_runner, metadata):
382+
command = ["airflow", "create", "--conf-source", "conf"]
383+
result = cli_runner.invoke(commands, command, obj=metadata)
384+
assert result.exit_code == 0
385+
dag_file = metadata.project_path / "airflow_dags" / "fake_project_dag.py"
386+
387+
assert dag_file.exists()
388+
389+
expected_airflow_dag = 'conf_source = "conf" or Path.cwd() / "conf"'
390+
with dag_file.open(encoding="utf-8") as f:
391+
dag_code = [line.strip() for line in f.read().splitlines()]
392+
assert expected_airflow_dag in dag_code
393+
dag_file.unlink()

0 commit comments

Comments (0)