Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dpgen2/entrypoint/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ def fp_args(inputs, run):
doc_inputs_config = "Configuration for preparing vasp inputs"
doc_run_config = "Configuration for running vasp tasks"
doc_task_max = "Maximum number of vasp tasks for each iteration"
doc_async_ratio = "Configuration ratio for async fp"
doc_extra_output_files = "Extra output file names, support wildcards"

return [
Expand All @@ -485,6 +486,13 @@ def fp_args(inputs, run):
doc=doc_run_config,
),
Argument("task_max", int, optional=True, default=10, doc=doc_task_max),
Argument(
"async_ratio",
float,
optional=True,
default=0.0,
doc=doc_async_ratio,
),
Argument(
"extra_output_files",
list,
Expand Down
9 changes: 9 additions & 0 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def make_concurrent_learning_op(
valid_data: Optional[S3Artifact] = None,
train_optional_files: Optional[List[str]] = None,
explore_config: Optional[dict] = None,
async_fp: bool = False,
):
if train_style in ("dp", "dp-dist"):
prep_run_train_op = PrepRunDPTrain(
Expand Down Expand Up @@ -268,6 +269,7 @@ def make_concurrent_learning_op(
select_confs_config=select_confs_config,
collect_data_config=collect_data_config,
upload_python_packages=upload_python_packages,
async_fp=async_fp,
)
# dpgen
dpgen_op = ConcurrentLearning(
Expand Down Expand Up @@ -308,6 +310,7 @@ def get_conf_filters(config):
def make_naive_exploration_scheduler_without_conf(config, explore_style):
model_devi_jobs = config["explore"]["stages"]
fp_task_max = config["fp"]["task_max"]
fp_async_ratio = config["fp"]["async_ratio"]
max_numb_iter = config["explore"]["max_numb_iter"]
fatal_at_max = config["explore"]["fatal_at_max"]
convergence = config["explore"]["convergence"]
Expand All @@ -325,6 +328,7 @@ def make_naive_exploration_scheduler_without_conf(config, explore_style):
report,
fp_task_max,
conf_filters,
fp_async_ratio,
)

for job_ in model_devi_jobs:
Expand Down Expand Up @@ -368,6 +372,7 @@ def make_lmp_naive_exploration_scheduler(config):
type_map = config["inputs"]["type_map"]
numb_models = config["train"]["numb_models"]
fp_task_max = config["fp"]["task_max"]
fp_async_ratio = config["fp"]["async_ratio"]
max_numb_iter = config["explore"]["max_numb_iter"]
fatal_at_max = config["explore"]["fatal_at_max"]
convergence = config["explore"]["convergence"]
Expand All @@ -385,6 +390,7 @@ def make_lmp_naive_exploration_scheduler(config):
report,
fp_task_max,
conf_filters,
fp_async_ratio,
)

sys_configs_lmp = []
Expand Down Expand Up @@ -490,6 +496,7 @@ def workflow_concurrent_learning(
cl_step_config = config["step_configs"]["cl_step_config"]
upload_python_packages = config.get("upload_python_packages", None)
train_optional_files = config["train"].get("optional_files", None)
fp_async_ratio = config["fp"]["async_ratio"]

if train_style == "dp":
init_models_paths = config["train"].get("init_models_paths", None)
Expand Down Expand Up @@ -556,6 +563,7 @@ def workflow_concurrent_learning(
valid_data=valid_data,
train_optional_files=train_optional_files,
explore_config=explore_config,
async_fp=(fp_async_ratio > 0),
)
scheduler = make_naive_exploration_scheduler(config)

Expand Down Expand Up @@ -664,6 +672,7 @@ def workflow_concurrent_learning(
"init_models": init_models,
"init_data": init_data,
"iter_data": iter_data,
"async_confs": upload_artifact([]),
},
)
return dpgen_step
Expand Down
2 changes: 1 addition & 1 deletion dpgen2/exploration/selector/conf_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ def select(
model_devis: Union[List[Path], List[HDF5Dataset]],
type_map: Optional[List[str]] = None,
optional_outputs: Optional[List[Path]] = None,
) -> Tuple[List[Path], ExplorationReport]:
) -> Tuple[List[Path], List[Path], ExplorationReport]:
pass
35 changes: 33 additions & 2 deletions dpgen2/exploration/selector/conf_selector_frame.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import copy
import math
import random
from collections import (
Counter,
)
Expand Down Expand Up @@ -48,19 +50,21 @@ def __init__(
report: ExplorationReport,
max_numb_sel: Optional[int] = None,
conf_filters: Optional[ConfFilters] = None,
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
self.async_ratio = async_ratio

Comment on lines +53 to 60
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate async_ratio in constructor to prevent out-of-bounds selection and runtime errors

Negative or >1 ratios can lead to ValueError in random.sample and undefined split behavior. Guard early.

Apply this diff:

     def __init__(
         self,
         traj_render: TrajRender,
         report: ExplorationReport,
         max_numb_sel: Optional[int] = None,
         conf_filters: Optional[ConfFilters] = None,
-        async_ratio: float = 0.0,
+        async_ratio: float = 0.0,
     ):
         self.max_numb_sel = max_numb_sel
         self.conf_filters = conf_filters
         self.traj_render = traj_render
         self.report = report
-        self.async_ratio = async_ratio
+        # Validate [0, 1] to keep downstream splitting safe
+        if not (0.0 <= async_ratio <= 1.0):
+            raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
+        self.async_ratio = float(async_ratio)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
self.async_ratio = async_ratio
async_ratio: float = 0.0,
):
self.max_numb_sel = max_numb_sel
self.conf_filters = conf_filters
self.traj_render = traj_render
self.report = report
# Validate [0, 1] to keep downstream splitting safe
if not (0.0 <= async_ratio <= 1.0):
raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
self.async_ratio = float(async_ratio)
🤖 Prompt for AI Agents
In dpgen2/exploration/selector/conf_selector_frame.py around lines 53 to 60,
validate the async_ratio constructor argument to ensure it is within [0.0, 1.0];
if async_ratio < 0.0 or async_ratio > 1.0 raise a ValueError with a clear
message indicating the invalid value and expected range so invalid ratios cannot
later cause random.sample or splitting errors.

def select(
self,
trajs: Union[List[Path], List[HDF5Dataset]],
model_devis: Union[List[Path], List[HDF5Dataset]],
type_map: Optional[List[str]] = None,
optional_outputs: Optional[List[Path]] = None,
) -> Tuple[List[Path], ExplorationReport]:
) -> Tuple[List[Path], List[Path], ExplorationReport]:
"""Select configurations

Parameters
Expand All @@ -81,6 +85,8 @@ def select(
-------
confs : List[Path]
The selected configurations, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems. The `list` only has one item.
async_confs : List[Path]
The selected configurations for async fp, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems. The `list` only has one item.
report : ExplorationReport
The exploration report recording the status of the exploration.

Expand All @@ -102,8 +108,33 @@ def select(
optional_outputs,
)

async_confs = []
if self.async_ratio > 0:
async_ms, ms = split_multisystems(ms, self.async_ratio)
if len(async_ms) > 0:
async_out_path = Path("async_confs")
async_out_path.mkdir(exist_ok=True)
async_ms.to_deepmd_npy(async_out_path) # type: ignore
async_confs = [async_out_path]

out_path = Path("confs")
out_path.mkdir(exist_ok=True)
ms.to_deepmd_npy(out_path) # type: ignore

return [out_path], copy.deepcopy(self.report)
return [out_path], async_confs, copy.deepcopy(self.report)


def split_multisystems(ms, ratio):
    """Randomly partition a dpdata.MultiSystems into two parts.

    Each system is split independently: roughly ``ratio`` of its frames go
    into the first returned MultiSystems, the rest into the second.

    Parameters
    ----------
    ms : dpdata.MultiSystems
        The systems to split.
    ratio : float
        Target fraction of frames to place in the selected part.

    Returns
    -------
    selected : dpdata.MultiSystems
        Systems built from the randomly chosen frames.
    unselected : dpdata.MultiSystems
        Systems built from the remaining frames.
    """
    picked = dpdata.MultiSystems()
    remainder = dpdata.MultiSystems()
    for system in ms:
        n_frames = len(system)
        target = n_frames * ratio
        n_pick = math.floor(target)
        # Stochastic rounding: promote by one with probability equal to the
        # fractional part, so the expected count equals n_frames * ratio.
        if random.random() < target - n_pick:
            n_pick += 1
        chosen = random.sample(range(n_frames), n_pick)
        rest = list(set(range(n_frames)).difference(chosen))
        if chosen:
            picked.append(system.sub_system(chosen))
        if rest:
            remainder.append(system.sub_system(rest))
    return picked, remainder
5 changes: 5 additions & 0 deletions dpgen2/flow/dpgen_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_scheduler": OutputParameter(),
Expand Down Expand Up @@ -279,6 +280,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_scheduler": OutputParameter(),
Expand Down Expand Up @@ -376,6 +378,7 @@ def _loop(
"init_models": steps.inputs.artifacts["init_models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": steps.inputs.artifacts["iter_data"],
"async_confs": steps.inputs.artifacts["async_confs"],
},
key=step_keys["block"],
)
Expand Down Expand Up @@ -448,6 +451,7 @@ def _loop(
"init_models": block_step.outputs.artifacts["models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": block_step.outputs.artifacts["iter_data"],
"async_confs": block_step.outputs.artifacts["async_confs"],
},
when="%s == false" % (scheduler_step.outputs.parameters["converged"]),
)
Expand Down Expand Up @@ -552,6 +556,7 @@ def _dpgen(
"init_models": steps.inputs.artifacts["init_models"],
"init_data": steps.inputs.artifacts["init_data"],
"iter_data": steps.inputs.artifacts["iter_data"],
"async_confs": steps.inputs.artifacts["async_confs"],
},
key="--".join(["%s" % id_step.outputs.parameters["block_id"], loop_key]),
)
Expand Down
4 changes: 3 additions & 1 deletion dpgen2/op/collect_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def get_input_sign(cls):
default=CollectData.default_optional_parameter,
),
"labeled_data": Artifact(List[Path]),
"async_labeled_data": Artifact(List[Path], optional=True),
"iter_data": Artifact(List[Path]),
Comment on lines +53 to 54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard optional async_labeled_data to avoid KeyError; simplify concatenation

Accessing ip["async_labeled_data"] raises KeyError when the optional artifact is absent. Use .get() and default to an empty list. Then drop the inner “or []” in the concatenation.

Apply:

@@ class CollectData(OP):
-                "async_labeled_data": Artifact(List[Path], optional=True),
+                "async_labeled_data": Artifact(List[Path], optional=True),
@@ def execute(self, ip: OPIO) -> OPIO:
-        async_labeled_data = ip["async_labeled_data"]
+        async_labeled_data = ip.get("async_labeled_data") or []
@@ def execute(self, ip: OPIO) -> OPIO:
-        for ii in labeled_data + (async_labeled_data or []):
+        for ii in labeled_data + async_labeled_data:

Also applies to: 95-100

🤖 Prompt for AI Agents
In dpgen2/op/collect_data.py around lines 53-54 (and similarly lines 95-100),
the code indexes the optional artifact ip["async_labeled_data"] which raises
KeyError when absent; change those accesses to ip.get("async_labeled_data", [])
so missing keys yield an empty list, and simplify concatenations by removing the
inner "or []" (i.e., concatenate using ip.get("async_labeled_data", []) +
ip["iter_data"] or similar) to avoid redundant fallbacks.

}
)
Expand Down Expand Up @@ -91,10 +92,11 @@ def execute(
type_map = ip["type_map"]
mixed_type = ip["optional_parameter"]["mixed_type"]
labeled_data = ip["labeled_data"]
async_labeled_data = ip["async_labeled_data"]
iter_data = ip["iter_data"]

ms = dpdata.MultiSystems(type_map=type_map)
for ii in labeled_data:
for ii in labeled_data + (async_labeled_data or []):
if ii and len(list(ii.rglob("fparam.npy"))) > 0:
setup_ele_temp(False)
if ii and len(list(ii.rglob("aparam.npy"))) > 0:
Expand Down
4 changes: 3 additions & 1 deletion dpgen2/op/select_confs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def get_output_sign(cls):
{
"report": BigParameter(ExplorationReport),
"confs": Artifact(List[Path]),
"async_confs": Artifact(List[Path]),
}
)

Expand Down Expand Up @@ -88,7 +89,7 @@ def execute(
trajs, model_devis, optional_outputs
)

confs, report = conf_selector.select(
confs, async_confs, report = conf_selector.select(
trajs,
model_devis,
type_map=type_map,
Expand All @@ -99,6 +100,7 @@ def execute(
{
"report": report,
"confs": confs,
"async_confs": async_confs,
}
)

Expand Down
29 changes: 28 additions & 1 deletion dpgen2/superop/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(
select_confs_config: dict = normalize_step_dict({}),
collect_data_config: dict = normalize_step_dict({}),
upload_python_packages: Optional[List[os.PathLike]] = None,
async_fp: bool = False,
):
self._input_parameters = {
"block_id": InputParameter(),
Expand All @@ -115,6 +116,7 @@ def __init__(
"init_models": InputArtifact(optional=True),
"init_data": InputArtifact(),
"iter_data": InputArtifact(),
"async_confs": InputArtifact(optional=True),
}
self._output_parameters = {
"exploration_report": OutputParameter(),
Expand All @@ -123,6 +125,7 @@ def __init__(
"models": OutputArtifact(),
"iter_data": OutputArtifact(),
"trajs": OutputArtifact(),
"async_confs": OutputArtifact(),
}

super().__init__(
Expand Down Expand Up @@ -163,6 +166,7 @@ def __init__(
select_confs_config=select_confs_config,
collect_data_config=collect_data_config,
upload_python_packages=upload_python_packages,
async_fp=async_fp,
)

@property
Expand Down Expand Up @@ -198,6 +202,7 @@ def _block_cl(
select_confs_config: dict = normalize_step_dict({}),
collect_data_config: dict = normalize_step_dict({}),
upload_python_packages: Optional[List[os.PathLike]] = None,
async_fp: bool = False,
):
select_confs_config = deepcopy(select_confs_config)
collect_data_config = deepcopy(collect_data_config)
Expand Down Expand Up @@ -231,7 +236,25 @@ def _block_cl(
["%s" % block_steps.inputs.parameters["block_id"], "prep-run-train"]
),
)
block_steps.add(prep_run_dp_train)
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
Comment on lines +239 to +257
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Gate the async FP step when no async_confs are provided to avoid runtime failures

If async_fp is True but the block receives no async_confs (typical for the first iteration), PrepRunFp may receive a missing/empty artifact and fail. Use an Argo “when” guard leveraging argo_len so the step runs only when input exists.

Apply this diff:

-    if async_fp:
-        async_fp_step = Step(
+    if async_fp:
+        async_fp_step = Step(
             name=name + "-async-prep-run-fp",
             template=prep_run_fp_op,
             parameters={
                 "block_id": f"{block_steps.inputs.parameters['block_id']}-async",
                 "fp_config": block_steps.inputs.parameters["fp_config"],
                 "type_map": block_steps.inputs.parameters["type_map"],
             },
             artifacts={
                 "confs": block_steps.inputs.artifacts["async_confs"],
             },
+            # Run only when upstream provided async_confs
+            when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
             key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
         )
         block_steps.add([prep_run_dp_train, async_fp_step])
         async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
     else:

Follow-up: if you adopt the conf_selector change to return [] when confs are empty, consider adding a similar guard to the normal prep-run-fp step keyed by the length of select_confs.outputs.artifacts["confs"] to keep the DAG robust when the split is 100% async.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
if async_fp:
async_fp_step = Step(
name=name + "-async-prep-run-fp",
template=prep_run_fp_op,
parameters={
"block_id": f"{block_steps.inputs.parameters['block_id']}-async",
"fp_config": block_steps.inputs.parameters["fp_config"],
"type_map": block_steps.inputs.parameters["type_map"],
},
artifacts={
"confs": block_steps.inputs.artifacts["async_confs"],
},
# Run only when upstream provided async_confs
when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
)
block_steps.add([prep_run_dp_train, async_fp_step])
async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
else:
block_steps.add(prep_run_dp_train)
async_labeled_data = None
🤖 Prompt for AI Agents
In dpgen2/superop/block.py around lines 239-257, the async FP Step is created
and added unconditionally when async_fp is True which can cause PrepRunFp to
fail if the artifact async_confs is missing/empty; guard the async_fp_step with
an Argo "when" condition that checks
argo_len(block_steps.inputs.artifacts['async_confs']) > 0 so the step only
executes when async_confs exists and has items (apply the same condition to its
key/name logic as needed), and as a follow-up consider adding a similar
argo_len-based "when" guard to the normal prep-run-fp step using
select_confs.outputs.artifacts['confs'] to protect against 100% async splits.


prep_run_explore = Step(
name=name + "-prep-run-explore",
Expand Down Expand Up @@ -309,6 +332,7 @@ def _block_cl(
artifacts={
"iter_data": block_steps.inputs.artifacts["iter_data"],
"labeled_data": prep_run_fp.outputs.artifacts["labeled_data"],
"async_labeled_data": async_labeled_data,
},
key=step_keys["collect-data"],
executor=collect_data_executor,
Expand All @@ -328,5 +352,8 @@ def _block_cl(
block_steps.outputs.artifacts["trajs"]._from = prep_run_explore.outputs.artifacts[
"trajs"
]
block_steps.outputs.artifacts["async_confs"]._from = select_confs.outputs.artifacts[
"async_confs"
]

return block_steps
1 change: 1 addition & 0 deletions dpgen2/utils/dflow_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def matched_step_key(
re.match(f"iter-[0-9]*--{jj}-[0-9]*", kk)
or re.match(f"iter-[0-9]*--{jj}", kk)
or re.match(f"init--{jj}", kk)
or re.match(f"iter-[0-9]*-async--{jj}", kk)
):
ret.append(kk)
continue
Expand Down
Loading