-
Notifications
You must be signed in to change notification settings - Fork 35
[WIP] Async FP #308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[WIP] Async FP #308
Changes from all commits
03fd4fe
f63ba03
4339411
8e6b8c1
f8d6ca1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,7 @@ def get_input_sign(cls): | |
default=CollectData.default_optional_parameter, | ||
), | ||
"labeled_data": Artifact(List[Path]), | ||
"async_labeled_data": Artifact(List[Path], optional=True), | ||
"iter_data": Artifact(List[Path]), | ||
Comment on lines
+53
to
54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard optional async_labeled_data to avoid KeyError; simplify concatenation Accessing ip["async_labeled_data"] raises KeyError when the optional artifact is absent. Use .get() and default to an empty list. Then drop the inner “or []” in the concatenation. Apply: @@ class CollectData(OP):
- "async_labeled_data": Artifact(List[Path], optional=True),
+ "async_labeled_data": Artifact(List[Path], optional=True),
@@ def execute(self, ip: OPIO) -> OPIO:
- async_labeled_data = ip["async_labeled_data"]
+ async_labeled_data = ip.get("async_labeled_data") or []
@@ def execute(self, ip: OPIO) -> OPIO:
- for ii in labeled_data + (async_labeled_data or []):
+ for ii in labeled_data + async_labeled_data: Also applies to: 95-100 🤖 Prompt for AI Agents
|
||
} | ||
) | ||
|
@@ -91,10 +92,11 @@ def execute( | |
type_map = ip["type_map"] | ||
mixed_type = ip["optional_parameter"]["mixed_type"] | ||
labeled_data = ip["labeled_data"] | ||
async_labeled_data = ip["async_labeled_data"] | ||
iter_data = ip["iter_data"] | ||
|
||
ms = dpdata.MultiSystems(type_map=type_map) | ||
for ii in labeled_data: | ||
for ii in labeled_data + (async_labeled_data or []): | ||
if ii and len(list(ii.rglob("fparam.npy"))) > 0: | ||
setup_ele_temp(False) | ||
if ii and len(list(ii.rglob("aparam.npy"))) > 0: | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -96,6 +96,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select_confs_config: dict = normalize_step_dict({}), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
collect_data_config: dict = normalize_step_dict({}), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
upload_python_packages: Optional[List[os.PathLike]] = None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_fp: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._input_parameters = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"block_id": InputParameter(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -115,6 +116,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"init_models": InputArtifact(optional=True), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"init_data": InputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"iter_data": InputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"async_confs": InputArtifact(optional=True), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self._output_parameters = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"exploration_report": OutputParameter(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -123,6 +125,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"models": OutputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"iter_data": OutputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"trajs": OutputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"async_confs": OutputArtifact(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super().__init__( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -163,6 +166,7 @@ def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select_confs_config=select_confs_config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
collect_data_config=collect_data_config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
upload_python_packages=upload_python_packages, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_fp=async_fp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@property | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -198,6 +202,7 @@ def _block_cl( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select_confs_config: dict = normalize_step_dict({}), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
collect_data_config: dict = normalize_step_dict({}), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
upload_python_packages: Optional[List[os.PathLike]] = None, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_fp: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select_confs_config = deepcopy(select_confs_config) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
collect_data_config = deepcopy(collect_data_config) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -231,7 +236,25 @@ def _block_cl( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
["%s" % block_steps.inputs.parameters["block_id"], "prep-run-train"] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_steps.add(prep_run_dp_train) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if async_fp: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_fp_step = Step( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
name=name + "-async-prep-run-fp", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
template=prep_run_fp_op, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parameters={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"block_id": f"{block_steps.inputs.parameters['block_id']}-async", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"fp_config": block_steps.inputs.parameters["fp_config"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"type_map": block_steps.inputs.parameters["type_map"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
artifacts={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"confs": block_steps.inputs.artifacts["async_confs"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_steps.add([prep_run_dp_train, async_fp_step]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_steps.add(prep_run_dp_train) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async_labeled_data = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+239
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Gate the async FP step when no async_confs are provided to avoid runtime failures If async_fp is True but the block receives no async_confs (typical for the first iteration), PrepRunFp may receive a missing/empty artifact and fail. Use an Argo “when” guard leveraging argo_len so the step runs only when input exists. Apply this diff: - if async_fp:
- async_fp_step = Step(
+ if async_fp:
+ async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
+ # Run only when upstream provided async_confs
+ when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else: Follow-up: if you adopt the conf_selector change to return [] when confs are empty, consider adding a similar guard to the normal prep-run-fp step keyed by the length of select_confs.outputs.artifacts["confs"] to keep the DAG robust when the split is 100% async. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
prep_run_explore = Step( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
name=name + "-prep-run-explore", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -309,6 +332,7 @@ def _block_cl( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
artifacts={ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"iter_data": block_steps.inputs.artifacts["iter_data"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"labeled_data": prep_run_fp.outputs.artifacts["labeled_data"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"async_labeled_data": async_labeled_data, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
key=step_keys["collect-data"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executor=collect_data_executor, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -328,5 +352,8 @@ def _block_cl( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_steps.outputs.artifacts["trajs"]._from = prep_run_explore.outputs.artifacts[ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"trajs" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
block_steps.outputs.artifacts["async_confs"]._from = select_confs.outputs.artifacts[ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"async_confs" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return block_steps |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Validate async_ratio in constructor to prevent out-of-bounds selection and runtime errors
Negative or >1 ratios can lead to ValueError in random.sample and undefined split behavior. Guard early.
Apply this diff:
📝 Committable suggestion
🤖 Prompt for AI Agents