Skip to content

Commit

Permalink
reduce loop
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Mar 19, 2024
1 parent f602dac commit d01e4c6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 41 deletions.
58 changes: 25 additions & 33 deletions pyiron_base/jobs/datamining.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import List, Tuple

from pyiron_base.utils.deprecate import deprecate
from pyiron_base.jobs.job.generic import GenericJob
from pyiron_base.jobs.job.generic import GenericJob, _get_executor
from pyiron_base.jobs.job.extension import jobstatus
from pyiron_base.storage.hdfio import FileHDFio
from pyiron_base.jobs.master.generic import get_function_from_string
Expand Down Expand Up @@ -241,7 +241,7 @@ def _get_new_functions(self, file: FileHDFio) -> Tuple[List, List]:
new_system_functions = []
return new_user_functions, new_system_functions

def create_table(self, file, job_status_list, executor=None, enforce_update=False):
def create_table(self, file, job_status_list, executor_type=None, cores=None, enforce_update=False):
"""
Create or update the table.
Expand Down Expand Up @@ -285,7 +285,8 @@ def create_table(self, file, job_status_list, executor=None, enforce_update=Fals
df_new_keys = self._iterate_over_job_lst(
job_id_lst=self._get_job_ids(),
function_lst=function_lst,
executor=executor,
executor_type=executor_type,
cores=cores,
)
if len(df_new_keys) > 0:
self._df = pandas.concat([self._df, df_new_keys], axis="columns")
Expand All @@ -300,7 +301,8 @@ def create_table(self, file, job_status_list, executor=None, enforce_update=Fals
df_new_ids = self._iterate_over_job_lst(
job_id_lst=new_jobs,
function_lst=self.add._function_lst,
executor=executor,
executor_type=executor_type,
cores=cores,
)
if len(df_new_ids) > 0:
self._df = pandas.concat([self._df, df_new_ids], ignore_index=True)
Expand Down Expand Up @@ -366,7 +368,8 @@ def _iterate_over_job_lst(
self,
job_id_lst: List,
function_lst: List,
executor: concurrent.futures.Executor = None,
executor_type: concurrent.futures.Executor = None,
cores: int = None,
) -> List[dict]:
"""
Apply functions to job.
Expand All @@ -390,15 +393,16 @@ def _iterate_over_job_lst(
]
for job_id in job_id_lst
]
if executor is not None:
future_lst = []
print("start loop")
for job_tuple in job_to_analyse_lst:
print(job_tuple)
future_lst.append(executor.submit(_apply_list_of_functions_on_job, job_tuple))
print("wait for futures")
diff_dict_lst = [f.result() for f in future_lst]
print("futures done")
if executor_type is not None:
with _get_executor(executor_type=executor_type, max_workers=cores) as exe:
future_lst = []
print("start loop")
for job_tuple in job_to_analyse_lst:
print(job_tuple)
future_lst.append(exe.submit(_apply_list_of_functions_on_job, job_tuple))
print("wait for futures")
diff_dict_lst = [f.result() for f in future_lst]
print("futures done")
else:
diff_dict_lst = list(
tqdm(
Expand Down Expand Up @@ -810,25 +814,13 @@ def update_table(self, job_status_list=None):
with self.project_hdf5.open("input") as hdf5_input:
if self._executor_type is None and self.server.cores > 1:
self._executor_type = "pympipool.mpi.executor.PyMPIExecutor"
if self._executor_type is not None:
print("Before executor")
with self._get_executor(max_workers=self.server.cores) as exe:
print("Before function")
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
executor=exe,
)
print("After function")
print("After executor")
else:
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
executor=None,
)
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
executor_type=self._executor_type,
cores=self.server.cores,
)
self.to_hdf()
self._pyiron_table._df.to_csv(
os.path.join(self.working_directory, "pyirontable.csv"), index=False
Expand Down
20 changes: 12 additions & 8 deletions pyiron_base/jobs/job/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,14 +1530,7 @@ def _reload_update_master(self, project, master_id):
del self

def _get_executor(self, max_workers=None):
    """
    Create the executor configured for this job.

    Thin wrapper which forwards the job's executor type to the module-level
    ``_get_executor`` function; validation and instantiation happen there.

    Args:
        max_workers (int, optional): forwarded unchanged as ``max_workers``.

    Returns:
        The object returned by the module-level ``_get_executor`` function.

    Raises:
        ValueError: if ``self._executor_type`` is None.
        TypeError: if ``self._executor_type`` is not a string.
    """
    # Inside a method body the bare name resolves to the module-level
    # function of the same name, not to this method, so this is not recursive.
    return _get_executor(
        executor_type=self._executor_type, max_workers=max_workers
    )


class GenericError(object):
Expand All @@ -1564,3 +1557,14 @@ def _print_error(self, file_name, string="", print_yes=True):
return ""
elif print_yes:
return string.join(self._job[file_name])


def _get_executor(executor_type, max_workers=None):
if executor_type is None:
raise ValueError(
"No executor type defined - Please set self.executor_type."
)
elif isinstance(executor_type, str):
return import_class(executor_type)(max_workers=max_workers)
else:
raise TypeError("The self.executor_type has to be a string.")

0 comments on commit d01e4c6

Please sign in to comment.