Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airflow): Add grouping by namespace option to kedro-airflow #1031

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
34 changes: 34 additions & 0 deletions kedro-airflow/kedro_airflow/grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,37 @@ def dfs(cur_node_name: str, component: int) -> None:
group_dependencies[new_name_parent].append(new_name_child)

return group_to_seq, group_dependencies


def group_by_namespace(
    pipeline: Pipeline,
) -> tuple[dict[str, list[Node]], dict[str, list[str]]]:
    """Group a pipeline's nodes by namespace, one group per Airflow task.

    Nodes are grouped via ``Pipeline.grouped_nodes_by_namespace``; nodes
    without a namespace are collected under the ``"__default__"`` group.

    Args:
        pipeline: The Kedro pipeline whose nodes should be grouped.

    Returns:
        A two-tuple of:
            * mapping of group name to the nodes it contains, and
            * mapping of group name to the sorted list of other groups it
              depends on (self-dependencies excluded).

    NOTE(review): folding every non-namespaced node into one ``__default__``
    group can create cyclic group-level dependencies (default -> ns ->
    default); keeping each non-namespaced node as its own task would avoid
    this — TODO confirm the desired behaviour.
    NOTE(review): ``grouped_nodes_by_namespace`` already reports per-group
    dependencies; consider reusing them instead of recomputing below.
    """
    nodes_by_namespace: dict[str, list[Node]] = {}
    dependencies_by_namespace: dict[str, list[str]] = {}

    # Record which group each individual node landed in, so node-level
    # dependencies can be translated into group-level dependencies below.
    node_to_namespace: dict[Node, str] = {}

    for group_info in pipeline.grouped_nodes_by_namespace.values():
        ns = group_info["name"] if group_info["type"] == "namespace" else "__default__"
        nodes_by_namespace.setdefault(ns, []).extend(group_info["nodes"])
        dependencies_by_namespace.setdefault(ns, [])
        for node in group_info["nodes"]:
            node_to_namespace[node] = ns

    # Derive inter-group dependencies from node-level dependencies.
    for ns, nodes in nodes_by_namespace.items():
        parent_groups = {
            node_to_namespace.get(parent, "__default__")
            for node in nodes
            for parent in pipeline.node_dependencies.get(node, [])
        }
        # Edges internal to a group are not inter-group dependencies.
        parent_groups.discard(ns)
        # Sort for deterministic output: raw set iteration order varies
        # across runs, which previously made callers/tests flaky.
        dependencies_by_namespace[ns] = sorted(parent_groups)

    return nodes_by_namespace, dependencies_by_namespace
10 changes: 10 additions & 0 deletions kedro-airflow/kedro_airflow/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
help="Group nodes with at least one MemoryDataset as input/output together, "
"as they do not persist between Airflow operators.",
)
@click.option(
"-gn",
"--group-by-namespace",
is_flag=True,
default=False,
help="Groups nodes based on their namespace using Kedro's grouped_nodes_by_namespace property.",
)
@click.option(
"--tags",
type=str,
Expand Down Expand Up @@ -149,6 +156,7 @@ def create( # noqa: PLR0913, PLR0912
target_path,
jinja_file,
group_in_memory,
group_by_namespace,
tags,
params,
conf_source,
Expand Down Expand Up @@ -218,6 +226,8 @@ def create( # noqa: PLR0913, PLR0912
# topological sort order obtained from pipeline.nodes, see group_memory_nodes()
# implementation
nodes, dependencies = group_memory_nodes(context.catalog, pipeline)
elif group_by_namespace:
nodes, dependencies = group_by_namespace(pipeline)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this call the method that @ankatiyar added to the framework?

Copy link
Contributor Author

@lrcouto lrcouto Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's calling Pipeline.grouped_nodes_by_namespace, I based it on Ankita's prototype. You think it could be called there directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I see now. I guess I didn't expect there would still be so much code needed to process the result from pipeline.grouped_nodes_by_namespace.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this error: TypeError: 'bool' object is not callable
I think it's because the group_by_namespace flag (which is a boolean) and the function group_by_namespace() have the same name. Maybe the function could be called group_namespace_nodes(), similar to group_memory_nodes(). It also hasn't been imported above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good call, I was trying to solve that.

else:
# To keep the order of nodes and dependencies deterministic - nodes are
# iterated in the topological sort order obtained from pipeline.nodes and
Expand Down
95 changes: 94 additions & 1 deletion kedro-airflow/tests/test_node_grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline

from kedro_airflow.grouping import _is_memory_dataset, group_memory_nodes
from kedro_airflow.grouping import (
_is_memory_dataset,
group_by_namespace,
group_memory_nodes,
)


class TestDataset(AbstractDataset):
Expand Down Expand Up @@ -87,6 +91,63 @@ def identity_one_to_one(x):
)


def mock_kedro_pipeline_with_namespaces() -> Pipeline:
    """Build a fixture pipeline mixing namespaced and non-namespaced nodes.

    Three namespaces (``namespace1``-``namespace3``) plus two plain nodes,
    wired through datasets ``ds1``-``ds9``.
    """

    def passthrough(x):
        return x

    def duplicate(x):
        return (x, x)

    def first_of_two(x, y):
        return x

    # One row per node: (func, inputs, outputs, name, namespace).
    # namespace=None matches node()'s default, i.e. no namespace.
    node_specs = [
        (passthrough, "ds1", "ds2", "f1", "namespace1"),
        (duplicate, "ds2", ["ds3", "ds4"], "f2", "namespace1"),
        (passthrough, "ds3", "ds5", "f3", "namespace2"),
        (passthrough, "ds3", "ds6", "f4", "namespace2"),
        (passthrough, "ds4", "ds8", "f5", "namespace3"),
        (passthrough, "ds6", "ds7", "f6", None),
        (first_of_two, ["ds3", "ds6"], "ds9", "f7", None),
    ]

    return Pipeline(
        [
            node(func=func, inputs=inputs, outputs=outputs, name=name, namespace=ns)
            for func, inputs, outputs, name, ns in node_specs
        ]
    )


@pytest.mark.parametrize(
"all_nodes,memory_nodes,expected_nodes,expected_dependencies",
[
Expand Down Expand Up @@ -166,3 +227,35 @@ def test_is_memory_dataset(
assert _is_memory_dataset(mock_catalog, node_name)
else:
assert not _is_memory_dataset(mock_catalog, node_name)


@pytest.mark.parametrize(
    "pipeline, expected_nodes, expected_dependencies",
    [
        (
            mock_kedro_pipeline_with_namespaces(),
            {
                "__default__": ["f6", "f7"],
                "namespace1": ["namespace1.f1", "namespace1.f2"],
                "namespace2": ["namespace2.f3", "namespace2.f4"],
                "namespace3": ["namespace3.f5"],
            },
            {
                "__default__": ["namespace1", "namespace2"],
                "namespace1": [],
                "namespace2": ["namespace1"],
                "namespace3": ["namespace1"],
            },
        ),
    ],
)
def test_group_by_namespace(pipeline, expected_nodes, expected_dependencies):
    """Test grouping of nodes by namespace."""
    nodes_by_namespace, dependencies_by_namespace = group_by_namespace(pipeline)

    # Compare node names rather than Node objects.
    node_names_by_namespace = {
        ns: [node.name for node in nodes] for ns, nodes in nodes_by_namespace.items()
    }
    assert node_names_by_namespace == expected_nodes

    # The implementation derives dependencies from a set, so their list
    # order is not guaranteed — compare order-insensitively to avoid flakes.
    actual_deps = {ns: sorted(deps) for ns, deps in dependencies_by_namespace.items()}
    expected_deps = {ns: sorted(deps) for ns, deps in expected_dependencies.items()}
    assert actual_deps == expected_deps
Loading