Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor runtime DAG APIs and add methods to return task pathspecs #2321

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 111 additions & 92 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1191,142 +1191,161 @@ def _iter_filter(self, x):
# exclude private data artifacts
return x.id[0] != "_"

def _get_matching_pathspecs(self, steps, metadata_key, metadata_pattern):
    """
    Yield pathspecs of tasks from specified steps that match a given metadata pattern.

    Parameters
    ----------
    steps : List[Step]
        List of Step objects to search for tasks.
    metadata_key : str
        Metadata key to filter tasks on (e.g., 'foreach-execution-path').
    metadata_pattern : str
        Regular expression pattern to match against the metadata value.

    Yields
    ------
    str
        Pathspec of each task whose metadata value for the specified key matches the pattern.
    """
    # Only the flow and run identifiers of the current object are needed;
    # the step comes from each entry in `steps`.
    flow_id, run_id, _, _ = self.path_components

    for step in steps:
        # The metadata provider does the filtering and hands back pathspecs,
        # which are forwarded unchanged (no Task objects are built here).
        yield from self._metaflow.metadata.filter_tasks_by_metadata(
            flow_id, run_id, step.id, metadata_key, metadata_pattern
        )

@property
def parent_task_pathspecs(self) -> Iterator[str]:
    """
    Yields pathspecs of all parent tasks of the current task.

    Yields
    ------
    str
        Pathspec of the parent task of the current task
    """
    steps = list(self.parent.parent_steps)
    if not steps:
        # No parent steps: yield nothing, and skip the metadata lookups below.
        return

    foreach_key = "foreach-execution-path"
    current_path = self.metadata_dict.get(foreach_key, "")

    if len(steps) > 1:
        # Static join - use exact path matching
        pattern = current_path or ".*"
    else:
        # Handle single step case: compare this task's foreach path with
        # the path of a task from the parent step to decide how to match.
        flow_id, run_id, _, _ = self.path_components
        target_task = Step(
            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
        ).task
        target_path = target_task.metadata_dict.get(foreach_key)

        if not target_path or not current_path:
            # (Current task, "A:10") and (Parent task, "")
            # Pattern: ".*"
            pattern = ".*"
        else:
            current_depth = len(current_path.split(","))
            target_depth = len(target_path.split(","))

            if current_depth < target_depth:
                # Foreach join
                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13,C:21")
                # Pattern: "A:10,B:13,.*"
                pattern = f"{current_path},.*"
            else:
                # Foreach split or linear step
                # Option 1:
                # (Current task, "A:10,B:13,C:21") and (Parent task, "A:10,B:13")
                # Option 2:
                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13")
                # Pattern: "A:10,B:13"
                pattern = ",".join(current_path.split(",")[:target_depth])

    yield from self._get_matching_pathspecs(steps, foreach_key, pattern)

@property
def child_task_pathspecs(self) -> Iterator[str]:
    """
    Yields pathspecs of all child tasks of the current task.

    Yields
    ------
    str
        Pathspec of the child task of the current task
    """
    steps = list(self.parent.child_steps)
    if not steps:
        # No child steps: yield nothing, and skip the metadata lookups below.
        return

    foreach_key = "foreach-execution-path"
    current_path = self.metadata_dict.get(foreach_key, "")

    if len(steps) > 1:
        # Static split - use exact path matching
        pattern = current_path or ".*"
    else:
        # Handle single step case: compare this task's foreach path with
        # the path of a task from the child step to decide how to match.
        flow_id, run_id, _, _ = self.path_components
        target_task = Step(
            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
        ).task
        target_path = target_task.metadata_dict.get(foreach_key)

        if not target_path or not current_path:
            # (Current task, "A:10") and (Child task, "")
            # Pattern: ".*"
            pattern = ".*"
        else:
            current_depth = len(current_path.split(","))
            target_depth = len(target_path.split(","))

            if current_depth < target_depth:
                # Foreach split
                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13,C:21")
                # Pattern: "A:10,B:13,.*"
                pattern = f"{current_path},.*"
            else:
                # Foreach join or linear step
                # Option 1:
                # (Current task, "A:10,B:13,C:21") and (Child task, "A:10,B:13")
                # Option 2:
                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13")
                # Pattern: "A:10,B:13"
                pattern = ",".join(current_path.split(",")[:target_depth])

    yield from self._get_matching_pathspecs(steps, foreach_key, pattern)

@property
def parent_tasks(self) -> Iterator["Task"]:
    """
    Yields all parent tasks of the current task, if any exist.

    Each pathspec produced by `parent_task_pathspecs` is wrapped in a
    `Task` constructed with `_namespace_check=False`.

    Yields
    ------
    Task
        Parent task of the current task
    """
    for pathspec in self.parent_task_pathspecs:
        yield Task(pathspec=pathspec, _namespace_check=False)

@property
def child_tasks(self) -> Iterator["Task"]:
    """
    Yields all child tasks of the current task, if any exist.

    Each pathspec produced by `child_task_pathspecs` is wrapped in a
    `Task` constructed with `_namespace_check=False`.

    Yields
    ------
    Task
        Child task of the current task
    """
    for pathspec in self.child_task_pathspecs:
        yield Task(pathspec=pathspec, _namespace_check=False)

@property
def metadata(self) -> List[Metadata]:
Expand Down
10 changes: 9 additions & 1 deletion test/core/tests/runtime_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ def _equals_task(task1, task2):
for name, value in type(task1).__dict__.items()
if isinstance(value, property)
if name
not in ["parent_tasks", "child_tasks", "metadata", "data", "artifacts"]
not in [
"parent_tasks",
"parent_task_pathspecs",
"child_tasks",
"child_task_pathspecs",
"metadata",
"data",
"artifacts",
]
]

for prop_name in properties:
Expand Down
Loading