Skip to content

Commit

Permalink
upgrade all tasks to dpdispatcher (#749)
Browse files Browse the repository at this point in the history
* upgrade all tasks to dpdispatcher

This commit upgrades init_reaction and init_surf to use dpdispatcher

* fix method args

* fix typo

* change the variable name from `work_dir` to `work_path`
  • Loading branch information
njzjz authored Jun 9, 2022
1 parent c25cea3 commit 828024b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 22 deletions.
38 changes: 21 additions & 17 deletions dpgen/data/reaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
output: data
"""

import argparse
import warnings
import glob
import json
import os
import random

import dpdata
from dpgen import dlog
from dpgen.dispatcher.Dispatcher import make_dispatcher
from dpgen.dispatcher.Dispatcher import make_submission_compat
from dpgen.remote.decide_machine import convert_mdata
from dpgen.generator.run import create_path, make_fp_task_name
from dpgen.util import sepline

Expand Down Expand Up @@ -73,14 +74,15 @@ def make_lmp(jdata):
return lmp_string


def run_reaxff(jdata, mdata, dispatcher, log_file="reaxff_log"):
def run_reaxff(jdata, mdata, log_file="reaxff_log"):
work_path = reaxff_path
reaxff_command = "{} -in {}".format(mdata["reaxff_command"], lmp_path)
run_tasks = glob.glob(os.path.join(work_path, 'task.*'))
run_tasks.sort()
run_tasks = [os.path.basename(ii) for ii in run_tasks]

dispatcher.run_jobs(mdata['reaxff_resources'],
make_submission_compat(mdata['reaxff_machine'],
mdata['reaxff_resources'],
[reaxff_command],
work_path,
run_tasks,
Expand All @@ -89,7 +91,8 @@ def run_reaxff(jdata, mdata, dispatcher, log_file="reaxff_log"):
[ff_path, data_init_path, control_path, lmp_path],
[trj_path],
outlog=log_file,
errlog=log_file)
errlog=log_file,
api_version=mdata.get("api_version", "0.9"))


def link_trj(jdata):
Expand All @@ -102,7 +105,7 @@ def link_trj(jdata):
os.path.join(task_path, trj_path)))


def run_build_dataset(jdata, mdata, dispatcher, log_file="build_log"):
def run_build_dataset(jdata, mdata, log_file="build_log"):
work_path = build_path
build_command = "{cmd} -n {dataset_name} -a {type_map} -d {lammpstrj} -c {cutoff} -s {dataset_size} -k \"{qmkeywords}\" --nprocjob {nprocjob} --nproc {nproc}".format(
cmd=mdata["build_command"],
Expand All @@ -119,7 +122,8 @@ def run_build_dataset(jdata, mdata, dispatcher, log_file="build_log"):
run_tasks.sort()
run_tasks = [os.path.basename(ii) for ii in run_tasks]

dispatcher.run_jobs(mdata['build_resources'],
make_submission_compat(mdata['build_machine'],
mdata['build_resources'],
[build_command],
work_path,
run_tasks,
Expand All @@ -128,7 +132,8 @@ def run_build_dataset(jdata, mdata, dispatcher, log_file="build_log"):
[trj_path],
[f"dataset_{dataset_name}_gjf"],
outlog=log_file,
errlog=log_file)
errlog=log_file,
api_version=mdata.get("api_version", "0.9"))


def link_fp_input():
Expand All @@ -146,7 +151,6 @@ def link_fp_input():

def run_fp(jdata,
mdata,
dispatcher,
log_file="output",
forward_common_files=[]):
fp_command = mdata['fp_command']
Expand All @@ -162,7 +166,8 @@ def run_fp(jdata,

run_tasks = [os.path.basename(ii) for ii in fp_run_tasks]

dispatcher.run_jobs(mdata['fp_resources'],
make_submission_compat(mdata['fp_machine'],
mdata['fp_resources'],
[fp_command],
work_path,
run_tasks,
Expand All @@ -171,7 +176,8 @@ def run_fp(jdata,
["input"],
[log_file],
outlog=log_file,
errlog=log_file)
errlog=log_file,
api_version=mdata.get("api_version", "0.9"))


def convert_data(jdata):
Expand All @@ -198,6 +204,7 @@ def gen_init_reaction(args):
with open(args.MACHINE, "r") as fp:
mdata = json.load(fp)

mdata = convert_mdata(mdata, ["reaxff", "build", "fp"])
record = "record.reaction"
iter_rec = -1
numb_task = 7
Expand All @@ -213,18 +220,15 @@ def gen_init_reaction(args):
elif ii == 0:
link_reaxff(jdata)
elif ii == 1:
dispatcher = make_dispatcher(mdata["reaxff_machine"])
run_reaxff(jdata, mdata, dispatcher)
run_reaxff(jdata, mdata)
elif ii == 2:
link_trj(jdata)
elif ii == 3:
dispatcher = make_dispatcher(mdata["build_machine"])
run_build_dataset(jdata, mdata, dispatcher)
run_build_dataset(jdata, mdata)
elif ii == 4:
link_fp_input()
elif ii == 5:
dispatcher = make_dispatcher(mdata["fp_machine"])
run_fp(jdata, mdata, dispatcher)
run_fp(jdata, mdata)
elif ii == 6:
convert_data(jdata)
with open(record, "a") as frec:
Expand Down
11 changes: 6 additions & 5 deletions dpgen/data/surf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

import time
import warnings
import os,json,shutil,re,glob,argparse
import numpy as np
import subprocess as sp
Expand All @@ -12,7 +12,7 @@
from dpgen import dlog
from dpgen import ROOT_PATH
from dpgen.remote.decide_machine import convert_mdata
from dpgen.dispatcher.Dispatcher import Dispatcher, make_dispatcher
from dpgen.dispatcher.Dispatcher import make_submission_compat
#-----PMG---------
from pymatgen.io.vasp import Poscar
from pymatgen.core import Structure, Element
Expand Down Expand Up @@ -565,15 +565,16 @@ def run_vasp_relax(jdata, mdata):
run_tasks = [ii.replace(work_dir+"/", "") for ii in relax_run_tasks]

#dlog.info(run_tasks)
dispatcher = make_dispatcher(mdata['fp_machine'], mdata['fp_resources'], work_dir, run_tasks, fp_group_size)
dispatcher.run_jobs(fp_resources,
make_submission_compat(mdata['fp_machine'],
fp_resources,
[fp_command],
work_dir,
run_tasks,
fp_group_size,
forward_common_files,
forward_files,
backward_files)
backward_files,
api_version=mdata.get("api_version", "0.9"))

def gen_init_surf(args):
try:
Expand Down
79 changes: 79 additions & 0 deletions dpgen/dispatcher/Dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from distutils.version import LooseVersion
import os,sys,time,random,json,glob
import warnings
from typing import List
from dpdispatcher import Task, Submission, Resources, Machine
from dpgen.dispatcher.LocalContext import LocalSession
Expand Down Expand Up @@ -406,3 +407,81 @@ def mdata_arginfo() -> List[Argument]:
return [
command_arginfo, machine_arginfo, resources_arginfo,
]


def make_submission_compat(
        machine: dict,
        resources: dict,
        commands: List[str],
        work_path: str,
        run_tasks: List[str],
        group_size: int,
        forward_common_files: List[str],
        forward_files: List[str],
        backward_files: List[str],
        outlog: str="log",
        errlog: str="err",
        api_version: str="0.9",
    ) -> None:
    """Make submission with compatibility of both dispatcher API v0 and v1.
    If `api_version` is less than 1.0, use `make_dispatcher`. If
    `api_version` is larger than or equal to 1.0, use `make_submission`.
    Parameters
    ----------
    machine : dict
        machine dict
    resources : dict
        resource dict
    commands : list[str]
        list of commands
    work_path : str
        working directory
    run_tasks : list[str]
        list of paths to running tasks
    group_size : int
        group size
    forward_common_files : list[str]
        forwarded common files shared for all tasks
    forward_files : list[str]
        forwarded files for each task
    backward_files : list[str]
        backwarded files for each task
    outlog : str, default=log
        path to log from stdout
    errlog : str, default=err
        path to log from stderr
    api_version : str, default=0.9
        API version. 1.0 is recommended
    """
    if LooseVersion(api_version) < LooseVersion('1.0'):
        # legacy (pre-1.0) dispatcher path, kept for backward compatibility
        warnings.warn("the dpdispatcher will be updated to new version. "
            "And the interface may be changed. Please check the documents for more details")
        # bug fix: the original referenced the undefined name `work_dir`
        # (the parameter was renamed to `work_path`), which raised a
        # NameError whenever this branch was taken.
        dispatcher = make_dispatcher(machine, resources, work_path, run_tasks, group_size)
        dispatcher.run_jobs(resources,
                        commands,
                        work_path,
                        run_tasks,
                        group_size,
                        forward_common_files,
                        forward_files,
                        backward_files,
                        outlog=outlog,
                        errlog=errlog)

    elif LooseVersion(api_version) >= LooseVersion('1.0'):
        # dpdispatcher >= 1.0: build a Submission object and run it to completion
        submission = make_submission(
            machine,
            resources,
            commands=commands,
            work_path=work_path,
            run_tasks=run_tasks,
            group_size=group_size,
            forward_common_files=forward_common_files,
            forward_files=forward_files,
            backward_files=backward_files,
            outlog=outlog,
            errlog=errlog)
        submission.run_submission()

0 comments on commit 828024b

Please sign in to comment.