feat(airflow): Add grouping by namespace option to kedro-airflow #1031
base: main
Changes from 12 commits
1e34dd1
7e03406
9eeca85
bf5f9fe
42bb0f2
b9a01e0
d4f9327
b51354d
d1cda4d
35b81b1
d178566
43ff4bc
339f3ba
e1f598e
ab2af51
@@ -113,3 +113,37 @@ def dfs(cur_node_name: str, component: int) -> None:
                group_dependencies[new_name_parent].append(new_name_child)

    return group_to_seq, group_dependencies


def group_by_namespace(
    pipeline: Pipeline,
) -> tuple[dict[str, list[Node]], dict[str, list[str]]]:
    """
    Groups nodes based on their namespace using Pipeline's grouped_nodes_by_namespace property.
    Non-namespaced nodes are assigned to a default namespace.
    """
    nodes_by_namespace: dict[str, list[Node]] = {}
    dependencies_by_namespace: dict[str, list[str]] = {}

    grouped_nodes = pipeline.grouped_nodes_by_namespace

    node_to_namespace = {}

    for group_name, group_info in grouped_nodes.items():
        ns = group_info["name"] if group_info["type"] == "namespace" else "__default__"
        nodes_by_namespace.setdefault(ns, [])
        dependencies_by_namespace.setdefault(ns, [])
        nodes_by_namespace[ns].extend(group_info["nodes"])
        for node in group_info["nodes"]:
            node_to_namespace[node] = ns

    for ns, nodes in nodes_by_namespace.items():
        dependent_namespaces = set()
        for node in nodes:
            for parent in pipeline.node_dependencies.get(node, []):
                parent_ns = node_to_namespace.get(parent, "__default__")
                if parent_ns != ns:
                    dependent_namespaces.add(parent_ns)
        dependencies_by_namespace[ns] = list(dependent_namespaces)
Why do you need to do this?

    return nodes_by_namespace, dependencies_by_namespace
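To make the behaviour of the hunk above easier to discuss, here is a minimal sketch of the same grouping logic on plain data. It is not the PR's code: strings stand in for Kedro `Node` objects, the `grouped` and `deps` dictionaries are hypothetical inputs that only mimic the `"name"` / `"type"` / `"nodes"` keys the diff reads, and `sorted()` replaces `list()` purely so the output is deterministic.

```python
def group_by_namespace_sketch(grouped_nodes, node_dependencies):
    """Toy version of the grouping in the diff, using strings instead of Node objects."""
    nodes_by_namespace = {}
    node_to_namespace = {}

    # First pass: assign every node to its namespace (or the shared default).
    for group_info in grouped_nodes.values():
        ns = group_info["name"] if group_info["type"] == "namespace" else "__default__"
        nodes_by_namespace.setdefault(ns, []).extend(group_info["nodes"])
        for node in group_info["nodes"]:
            node_to_namespace[node] = ns

    # Second pass: lift node-level parent edges to namespace-level edges.
    dependencies_by_namespace = {}
    for ns, nodes in nodes_by_namespace.items():
        parent_namespaces = set()
        for node in nodes:
            for parent in node_dependencies.get(node, ()):
                parent_ns = node_to_namespace.get(parent, "__default__")
                if parent_ns != ns:
                    parent_namespaces.add(parent_ns)
        dependencies_by_namespace[ns] = sorted(parent_namespaces)

    return nodes_by_namespace, dependencies_by_namespace


# Hypothetical input: two namespaces, "train" consumes the output of "ingest".
grouped = {
    "ingest": {"name": "ingest", "type": "namespace", "nodes": ["ingest.load", "ingest.clean"]},
    "train": {"name": "train", "type": "namespace", "nodes": ["train.fit"]},
}
deps = {"ingest.clean": {"ingest.load"}, "train.fit": {"ingest.clean"}}

nodes, dependencies = group_by_namespace_sketch(grouped, deps)
print(dependencies)
```

The second pass is what turns node-to-node edges into group-to-group edges; edges that stay inside one namespace are dropped because they do not constrain the order of the grouped Airflow tasks.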
@@ -121,6 +121,13 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str)
    help="Group nodes with at least one MemoryDataset as input/output together, "
    "as they do not persist between Airflow operators.",
)
@click.option(
    "-gn",
    "--group-by-namespace",
    is_flag=True,
    default=False,
    help="Groups nodes based on their namespace using Kedro's grouped_nodes_by_namespace property.",
)
@click.option(
    "--tags",
    type=str,

@@ -149,6 +156,7 @@ def create( # noqa: PLR0913, PLR0912
    target_path,
    jinja_file,
    group_in_memory,
    group_by_namespace,
    tags,
    params,
    conf_source,

@@ -218,6 +226,8 @@ def create( # noqa: PLR0913, PLR0912
        # topological sort order obtained from pipeline.nodes, see group_memory_nodes()
        # implementation
        nodes, dependencies = group_memory_nodes(context.catalog, pipeline)
    elif group_by_namespace:
        nodes, dependencies = group_by_namespace(pipeline)
Should this call the method that @ankatiyar added to the framework?

it's calling

Ahh I see now. I guess I didn't expect there to still be so much code needed to process the result from

I get this error:

Ah good call, I was trying to solve that.

    else:
        # To keep the order of nodes and dependencies deterministic - nodes are
        # iterated in the topological sort order obtained from pipeline.nodes and
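In the hunks above, the new `create` parameter and the helper it calls share the name `group_by_namespace`. The thread does not show which error was reported, so the following is only a sketch of what Python does when a boolean parameter shadows a module-level function of the same name. Everything here is a stand-in: the helper body, the alias `_group_nodes_by_namespace`, and `create_fixed` are hypothetical and not taken from the PR.

```python
def group_by_namespace(pipeline):
    """Stand-in for the module-level helper; returns a dummy grouping."""
    return {"ns": list(pipeline)}, {"ns": []}


# Hypothetical alias, bound before any function parameter can shadow the name.
_group_nodes_by_namespace = group_by_namespace


def create(group_by_namespace=False):
    """Inside this function the name refers to the bool flag, not the helper."""
    try:
        return group_by_namespace(["node_a"])
    except TypeError as exc:
        return f"TypeError: {exc}"


def create_fixed(group_by_namespace=False):
    """Same flag name, but the helper is reached through the alias."""
    if group_by_namespace:
        return _group_nodes_by_namespace(["node_a"])
    return None


shadowed = create(group_by_namespace=True)
fixed = create_fixed(group_by_namespace=True)
print(shadowed)
print(fixed)
```

Renaming either the flag's Python parameter or the imported helper avoids the clash; the alias above is just one way to show it without changing the CLI option name.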
I don't think you need to set a default. Ideally it should be -

Which is also how the grouped_by_namespace property returns its dict. I think the issue with the default is that it could create a circular dependency in the execution order. For example, when pipeline1 is not namespaced (i.e. is default), pipeline2 is namespaced, and pipeline3 is not namespaced, the execution order would be impossible to determine.

That's a question that I had, yeah. It didn't follow the behavior of the results that come from the grouped_nodes_by_namespace function, but it made sense when I was writing it.
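The circular-dependency concern in this thread can be illustrated with a small sketch. It assumes a three-step chain in which a non-namespaced node feeds a namespaced one, which feeds another non-namespaced node; the group names `a`, `ns` and `c` and the helper `has_cycle` are hypothetical. Treating each non-namespaced node as its own group is an assumption about the alternative being suggested, not something the thread spells out.

```python
def has_cycle(dependencies):
    """Return True if the group-level dependency graph contains a cycle (DFS colouring)."""
    WHITE, GREY, BLACK = 0, 1, 2
    state = {group: WHITE for group in dependencies}

    def visit(group):
        state[group] = GREY
        for parent in dependencies.get(group, []):
            parent_state = state.get(parent, WHITE)
            if parent_state == GREY:
                return True  # back edge: we returned to a group still being explored
            if parent_state == WHITE and visit(parent):
                return True
        state[group] = BLACK
        return False

    return any(state[group] == WHITE and visit(group) for group in dependencies)


# Chain: a (no namespace) -> ns.b (namespace "ns") -> c (no namespace).

# Collapsing a and c into one "__default__" group makes the two groups depend on each other.
with_shared_default = {"__default__": ["ns"], "ns": ["__default__"]}

# Keeping each non-namespaced node as its own group preserves the linear order.
with_separate_groups = {"a": [], "ns": ["a"], "c": ["ns"]}

print(has_cycle(with_shared_default))
print(has_cycle(with_separate_groups))
```

With the shared default group the two Airflow tasks would each have to wait for the other, which is why no valid execution order exists in that case.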