Add parallel processing to MoleculeCSVDataset using joblib #67

Merged (3 commits, Aug 21, 2020)
Changes from 2 commits
29 changes: 21 additions & 8 deletions python/dgllife/data/csv_dataset.py
@@ -10,6 +10,7 @@
 import os
 
 from dgl.data.utils import save_graphs, load_graphs
+from dgllife.utils import pmap
 
 __all__ = ['MoleculeCSVDataset']
 
@@ -51,9 +52,13 @@ class MoleculeCSVDataset(object):
         Print a message every time ``log_every`` molecules are processed. Default to 1000.
     init_mask : bool
         Whether to initialize a binary mask indicating the existence of labels. Default to True.
+    n_jobs : int
+        Degree of parallelism for pre processing. Uses joblib backend. Default to 1.
+        Should not be greater than num_cpus for efficiency.
     """
     def __init__(self, df, smiles_to_graph, node_featurizer, edge_featurizer, smiles_column,
-                 cache_file_path, task_names=None, load=True, log_every=1000, init_mask=True):
+                 cache_file_path, task_names=None, load=True, log_every=1000, init_mask=True,
+                 n_jobs=1):
         self.df = df
         self.smiles = self.df[smiles_column].tolist()
         if task_names is None:
@@ -63,10 +68,10 @@ def __init__(self, df, smiles_to_graph, node_featurizer, edge_featurizer, smiles
         self.n_tasks = len(self.task_names)
         self.cache_file_path = cache_file_path
         self._pre_process(smiles_to_graph, node_featurizer, edge_featurizer,
-                          load, log_every, init_mask)
+                          load, log_every, init_mask, n_jobs)
 
     def _pre_process(self, smiles_to_graph, node_featurizer,
-                     edge_featurizer, load, log_every, init_mask):
+                     edge_featurizer, load, log_every, init_mask, n_jobs=1):
         """Pre-process the dataset
 
         * Convert molecules from smiles format into DGLGraphs
@@ -92,6 +97,8 @@ def _pre_process(self, smiles_to_graph, node_featurizer,
             Print a message every time ``log_every`` molecules are processed.
         init_mask : bool
             Whether to initialize a binary mask indicating the existence of labels.
+        n_jobs : int
+            Degree of parallelism for pre processing.
         """
         if os.path.exists(self.cache_file_path) and load:
             # DGLGraphs have been constructed before, reload them
@@ -103,11 +110,17 @@
         else:
             print('Processing dgl graphs from scratch...')
             self.graphs = []
-            for i, s in enumerate(self.smiles):
-                if (i + 1) % log_every == 0:
-                    print('Processing molecule {:d}/{:d}'.format(i+1, len(self)))
-                self.graphs.append(smiles_to_graph(s, node_featurizer=node_featurizer,
-                                                   edge_featurizer=edge_featurizer))
+            if n_jobs > 1:
+                self.graphs = pmap(smiles_to_graph,
+                                   self.smiles,
+                                   node_featurizer=node_featurizer,
+                                   edge_featurizer=edge_featurizer)
+            else:
+                for i, s in enumerate(self.smiles):
+                    if (i + 1) % log_every == 0:
+                        print('Processing molecule {:d}/{:d}'.format(i+1, len(self)))
+                    self.graphs.append(smiles_to_graph(s, node_featurizer=node_featurizer,
+                                                       edge_featurizer=edge_featurizer))
             _label_values = self.df[self.task_names].values
             # np.nan_to_num will also turn inf into a very large number
             self.labels = F.zerocopy_from_numpy(np.nan_to_num(_label_values).astype(np.float32))
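For anyone trying the new argument, here is a minimal usage sketch (not part of the diff). It assumes a CSV with a `smiles` column and a single label column; the file and column names (`molecules.csv`, `logP`) are made up for illustration, while `smiles_to_bigraph` and `CanonicalAtomFeaturizer` are existing utilities in `dgllife.utils`.

```python
import pandas as pd

from dgllife.data import MoleculeCSVDataset
from dgllife.utils import smiles_to_bigraph, CanonicalAtomFeaturizer

# Hypothetical CSV with a 'smiles' column and one label column 'logP'
df = pd.read_csv('molecules.csv')

dataset = MoleculeCSVDataset(
    df=df,
    smiles_to_graph=smiles_to_bigraph,
    node_featurizer=CanonicalAtomFeaturizer(),
    edge_featurizer=None,
    smiles_column='smiles',
    cache_file_path='molecules_dglgraph.bin',
    task_names=['logP'],
    n_jobs=4)  # graph construction now fans out over 4 joblib workers
```

With `n_jobs=1` (the default) the behavior is unchanged from the sequential loop, so existing callers are unaffected.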
20 changes: 19 additions & 1 deletion python/dgllife/utils/__init__.py
@@ -3,11 +3,29 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 
+from joblib import Parallel, delayed, cpu_count
+
 from .analysis import *
 from .complex_to_graph import *
 from .early_stop import *
 from .eval import *
 from .featurizers import *
-from .mol_to_graph import *
 from .io import *
+from .mol_to_graph import *
 from .splitters import *
+
+
+def pmap(pickleable_fn, data, n_jobs=cpu_count() - 1, verbose=1, **kwargs):
+    """
+    Parallel map using joblib.
+
+    :param pickleable_fn: Fn to map over data.
+    :param data: Data to be mapped over.
+    :param n_jobs: CPU parallelism, uses 1 less than number detected by default.
+    :param verbose: Job logging verbosity, set to 0 to silence.
+    :param kwargs: Additional args for f
+    :return: Mapped output.
+    """
+    return Parallel(n_jobs=n_jobs, verbose=verbose)(
+        delayed(pickleable_fn)(d, **kwargs) for d in data
+    )
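For context, a small sketch of how `pmap` behaves, mirroring the way `_pre_process` calls it above. The SMILES strings are toy inputs; `smiles_to_bigraph` and `CanonicalAtomFeaturizer` are existing `dgllife.utils` functions. Any extra keyword arguments are forwarded to every call of the mapped function via `**kwargs`.

```python
from dgllife.utils import pmap, smiles_to_bigraph, CanonicalAtomFeaturizer

smiles = ['CCO', 'c1ccccc1', 'CC(=O)O']  # toy input list

# Builds one DGLGraph per SMILES string using 2 joblib worker processes;
# node_featurizer is forwarded to every smiles_to_bigraph call.
graphs = pmap(smiles_to_bigraph,
              smiles,
              n_jobs=2,
              node_featurizer=CanonicalAtomFeaturizer())
```

Note that the mapped function must be pickleable (e.g. defined at module level), since joblib ships it to worker processes.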
4 changes: 3 additions & 1 deletion python/setup.py
@@ -45,7 +45,9 @@ def get_lib_path():
         'numpy>=1.14.0',
         'scipy>=1.1.0',
         'networkx>=2.1',
-        'hyperopt'
+        'hyperopt',
+        'dgl',
+        'joblib'
     ],
     url='https://github.com/awslabs/dgl-lifesci',
     classifiers=[

A reviewer (Contributor) commented on the 'dgl' entry:

I don't recommend putting "dgl" here. The reason is that there are different variants of "dgl" for CPU/CUDA 9.0/CUDA 10.0, etc., and it's better to let people install the version they want themselves.
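To make the reviewer's suggestion concrete, a possible follow-up (not part of this diff) would be to drop 'dgl' from install_requires and document a manual install step instead, since the CPU and CUDA builds of DGL ship as separate packages (e.g. `dgl` vs. `dgl-cu101` on PyPI at the time). A sketch under that assumption:

```python
from setuptools import setup, find_packages

# Dependency list if 'dgl' is left out, per the review comment; users install
# the DGL build matching their hardware themselves, e.g.
#   pip install dgl          # CPU build
#   pip install dgl-cu101    # CUDA 10.1 build
setup(
    name='dgllife',
    packages=find_packages(),
    install_requires=[
        'numpy>=1.14.0',
        'scipy>=1.1.0',
        'networkx>=2.1',
        'hyperopt',
        'joblib',
    ],
)
```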