From b044724db710dabf5bea23195b599eab3ba46bb3 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 22 May 2018 14:35:02 +0800 Subject: [PATCH 01/32] update fluid Train API param_path to checkpoint_config --- python/paddle/fluid/trainer.py | 50 +++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 7da123dd92ed9..01c40bb90e464 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -27,11 +27,8 @@ from transpiler import distribute_transpiler __all__ = [ - 'Trainer', - 'BeginEpochEvent', - 'EndEpochEvent', - 'BeginStepEvent', - 'EndStepEvent', + 'Trainer', 'BeginEpochEvent', 'EndEpochEvent', 'BeginStepEvent', + 'EndStepEvent', 'CheckpointConfig' ] @@ -59,6 +56,17 @@ def __init__(self, epoch_id, step_id, metrics): self.metrics = metrics +class CheckpointConfig(object): + def __init__(self, + checkpoint_dir=None, + max_num_checkpoints=3, + save_interval_secs=600): + if checkpoint_dir is None: + self.checkpoint_dir = os.getcwd() + self.max_num_checkpoints = max_num_checkpoints + self.save_interval_secs = save_interval_secs + + def check_and_get_place(place): """ Check the type of place or get the default place @@ -97,9 +105,9 @@ class Trainer(object): def __init__(self, train_func, optimizer, - param_path=None, place=None, - parallel=False): + parallel=False, + checkpoint_config=None): self.__stop = False self.parallel = parallel # 1. we need to generate a framework.Program by calling @@ -108,6 +116,16 @@ def __init__(self, if not isinstance(optimizer, opt_module.Optimizer): raise TypeError("The optimizer should be an instance of Optimizer") + # config for checkpoint + # only chief worker will save variables + self.chief = True + self.checkpoint = checkpoint_config + if self.checkpoint and not isinstance(self.checkpoint, + CheckpointConfig): + raise TypeError( + "The checkpoint_config shoule be an instance of CheckpointConfig" + ) + self.scope = core.Scope() self.startup_program = framework.Program() @@ -136,9 +154,10 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if param_path: - # load params from param_path into scope - io.load_persistables(exe, dirname=param_path) + if self.checkpoint: + exe = executor.Executor(place) + io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, + self.startup_program) def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS @@ -146,6 +165,7 @@ def _transpile_nccl2_dist(self): self.nccl_id_var = None else: self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + self.chief = self.trainer_id == 0 port = os.getenv("PADDLE_PSERVER_PORT") worker_ips = os.getenv("PADDLE_TRAINER_IPS") worker_endpoints = [] @@ -194,6 +214,7 @@ def _dist_transpile_if_necessary(self, optimize_ops, params_grads): # the unique trainer id, starting from 0, needed by trainer # only trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self.chief = self.trainer_id == 0 # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") with self._prog_and_scope_guard(): @@ -263,6 +284,14 @@ def save_params(self, param_path): exe = executor.Executor(self.place) io.save_persistables(exe, dirname=param_path) + def _save_checkpoint(self): + if self.checkpoint and self.chief: + exe = executor.Executor(self.place) + io.save_checkpoint(exe, self.checkpoint.checkpoint_dir, + self.checkpoint.max_num_checkpoints, + self.checkpoint.save_interval_secs, + self.train_program) + @contextlib.contextmanager def _prog_and_scope_guard(self): with framework.program_guard( @@ -309,6 +338,7 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): else: metrics = exe.run(feed=data, fetch_list=[]) event_handler(EndStepEvent(epoch_id, step_id, metrics)) + self._save_checkpoint() event_handler(EndEpochEvent(epoch_id)) def _test_by_executor(self, reader, feed_order, fetch_list): From dca0b6d9ccc5b770e78a0903839f2ed89d79be58 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 23 May 2018 19:50:25 +0800 Subject: [PATCH 02/32] restore param_path --- python/paddle/fluid/trainer.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 01c40bb90e464..24254b4980c13 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -105,6 +105,7 @@ class Trainer(object): def __init__(self, train_func, optimizer, + param_path=None, place=None, parallel=False, checkpoint_config=None): @@ -120,8 +121,8 @@ def __init__(self, # only chief worker will save variables self.chief = True self.checkpoint = checkpoint_config - if self.checkpoint and not isinstance(self.checkpoint, - CheckpointConfig): + if self.checkpoint and \ + not isinstance(self.checkpoint, CheckpointConfig): raise TypeError( "The checkpoint_config shoule be an instance of CheckpointConfig" ) @@ -159,6 +160,10 @@ def __init__(self, io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, self.startup_program) + if param_path: + # load params from param_path into scope + io.load_persistables(exe, dirname=param_path) + def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS if "PADDLE_TRAINER_IPS" not in os.environ: From 514b2427edbd30013ca1783769af18fb96ffb626 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Mon, 28 May 2018 20:08:23 +0800 Subject: [PATCH 03/32] add save/load persist_vars_without_grad --- python/paddle/fluid/io.py | 46 +++++++++++++++++++++++----------- python/paddle/fluid/trainer.py | 3 ++- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 8e58e5eb794e1..f626039363a68 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -24,7 +24,8 @@ 'save_vars', 'save_params', 'save_persistables', 'load_vars', 'load_params', 'load_persistables', 'save_inference_model', 'load_inference_model', 'get_inference_program', 'save_checkpoint', 'load_checkpoint', - 'clean_checkpoint' + 'clean_checkpoint', 'load_persist_vars_without_grad', + 'save_persist_vars_without_grad' ] @@ -455,6 +456,33 @@ def get_parameter_value_by_name(name, executor, program=None): return get_parameter_value(var, executor) +def load_persist_vars_without_grad(executor, dirname, program): + """ + load_persist_vars_without_grad will load variables from a directory by an executor, + the variable named end with "@GRAD" will not be loaded. + """ + load_vars( + executor, + dirname=dirname, + main_program=program, + predicate=_is_checkpoint_var, + filename=None) + + +def save_persist_vars_without_grad(executor, dirname, program): + """ + save_persist_vars_without_grad will save variables to a directory by an executor, + the variable named end with "@GRAD" will not be saved. + """ + save_vars( + executor, + dirname=dirname, + main_program=program, + vars=None, + predicate=_is_checkpoint_var, + filename=None) + + SUCCESS_MARK_FILENAME = "_SUCCESS" CHECKPOINT_PREFIX = "checkpoint" CHECKPOINT_SEPARATOR = "_" @@ -491,13 +519,7 @@ def save_checkpoint(executor, serial += 1 cur_dir = _get_serial_dir(serial, checkpoint_dir) - save_vars( - executor, - dirname=cur_dir, - main_program=main_program, - vars=None, - predicate=_is_checkpoint_var, - filename=None) + load_persist_vars_without_grad(executor, cur_dir, main_program) _write_success(cur_dir) _lru_delete(checkpoint_dir, max_num_checkpoints) @@ -521,13 +543,7 @@ def load_checkpoint(executor, checkpoint_dir=None, main_program=None): return cur_dir = _get_serial_dir(serial, checkpoint_dir) - - load_vars( - executor, - dirname=cur_dir, - main_program=main_program, - predicate=_is_checkpoint_var, - filename=None) + load_persist_vars_without_grad(executor, cur_dir, main_program) def clean_checkpoint(checkpoint_dir, delete_dir=False): diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 24254b4980c13..b4b7b75b96e31 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -162,7 +162,8 @@ def __init__(self, if param_path: # load params from param_path into scope - io.load_persistables(exe, dirname=param_path) + io.load_persist_vars_without_grad( + exe, dirname=param_path, program=self.startup_program) def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS From 5eea5db95fb6eaca2db9a0af63e871a9fc29c6bf Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 29 May 2018 14:37:59 +0800 Subject: [PATCH 04/32] optimized checkpoint and save_model --- python/paddle/fluid/__init__.py | 1 + python/paddle/fluid/io.py | 61 +++++++++++++++------------------ python/paddle/fluid/trainer.py | 40 +++++++++++++++------ 3 files changed, 58 insertions(+), 44 deletions(-) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 859605d005328..aece8fc149099 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -26,6 +26,7 @@ from trainer import EndEpochEvent from trainer import BeginStepEvent from trainer import EndStepEvent +from trainer import CheckpointConfig import inferencer from inferencer import Inferencer diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index f626039363a68..aa039bdfaa416 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -491,7 +491,6 @@ def save_persist_vars_without_grad(executor, dirname, program): def save_checkpoint(executor, checkpoint_dir=None, max_num_checkpoints=3, - save_interval_secs=600, main_program=None): """ Save Checkpoint will save persistable LodTensor variables from main_program in checkpoint directory, @@ -511,15 +510,10 @@ def save_checkpoint(executor, if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) - serial = _get_lastest_checkpoint_dir(checkpoint_dir) - if serial >= 0 and not _interval_secs_exceed( - _get_serial_dir(serial, checkpoint_dir), save_interval_secs): - return - - serial += 1 - cur_dir = _get_serial_dir(serial, checkpoint_dir) + serial = _get_lastest_checkpoint_dir(checkpoint_dir) + 1 + cur_dir = _get_serial_dir(checkpoint_dir, serial) - load_persist_vars_without_grad(executor, cur_dir, main_program) + save_persist_vars_without_grad(executor, cur_dir, main_program) _write_success(cur_dir) _lru_delete(checkpoint_dir, max_num_checkpoints) @@ -542,7 +536,7 @@ def load_checkpoint(executor, checkpoint_dir=None, main_program=None): if serial < 0: return - cur_dir = _get_serial_dir(serial, checkpoint_dir) + cur_dir = _get_serial_dir(checkpoint_dir, serial) load_persist_vars_without_grad(executor, cur_dir, main_program) @@ -559,11 +553,6 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): os.rmdir(checkpoint_dir) -def _get_serial_dir(serial, checkpoint_dir): - serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial) - return os.path.join(checkpoint_dir, serial_folder) - - def _is_checkpoint_var(var): """ the checkpoint will not save or load all the variables. @@ -582,29 +571,37 @@ def _is_checkpoint_var(var): return var.persistable -def _interval_secs_exceed(dirname, save_interval_secs): - dir_time = os.path.getmtime(dirname) - if save_interval_secs > (time.time() - dir_time): - return False - return True +def _get_dir_serial(dirname): + _, serial = dirname.split(CHECKPOINT_SEPARATOR) + + serial_num = -1 + try: + serial_num = int(serial) + except ValueError: + serial_num = -1 + return serial_num + + +def _get_serial_dir(dirname, serial): + serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial) + return os.path.join(dirname, serial_folder) def _lru_delete(dirname, max_num_checkpoints=3): dirs = os.listdir(dirname) - serials = [] + serial_map = {} for serial in dirs: - try: - serials.append(int(serial)) - except ValueError: - continue + serial_num = _get_dir_serial(serial) + serial_map[serial_num] = serial - if len(serials) <= max_num_checkpoints: + if len(serial_map.keys()) <= max_num_checkpoints: return + serials = serial_map.keys() serials.sort(reverse=True) serials = serials[max_num_checkpoints:] for serial in serials: - cur_dir = os.path.join(dirname, str(serial)) + cur_dir = _get_serial_dir(dirname, serial) shutil.rmtree(cur_dir) @@ -633,20 +630,18 @@ def has_success(checkpoint_dir, cur_dir): """ is _SUCCESS in this dir """ - _, serial = cur_dir.split(CHECKPOINT_SEPARATOR) - try: - int(serial) - except ValueError: + serial = _get_dir_serial(cur_dir) + if serial == -1: return -1 if not os.path.isdir(os.path.join(checkpoint_dir, cur_dir)): return -1 success_path = os.path.join( - _get_serial_dir(serial, checkpoint_dir), SUCCESS_MARK_FILENAME) + _get_serial_dir(checkpoint_dir, serial), SUCCESS_MARK_FILENAME) if os.path.isfile(success_path): - return int(serial) + return serial if not os.path.isdir(checkpoint_dir): return -1 diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index b4b7b75b96e31..3cf96ac251132 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -60,11 +60,24 @@ class CheckpointConfig(object): def __init__(self, checkpoint_dir=None, max_num_checkpoints=3, - save_interval_secs=600): + epoch_interval=1, + step_interval=10): if checkpoint_dir is None: self.checkpoint_dir = os.getcwd() + else: + self.checkpoint_dir = checkpoint_dir + self.max_num_checkpoints = max_num_checkpoints - self.save_interval_secs = save_interval_secs + + if epoch_interval < 1: + self.epoch_interval = 1 + else: + self.epoch_interval = epoch_interval + + if step_interval < 1: + self.step_interval = 10 + else: + self.step_interval = step_interval def check_and_get_place(place): @@ -290,14 +303,6 @@ def save_params(self, param_path): exe = executor.Executor(self.place) io.save_persistables(exe, dirname=param_path) - def _save_checkpoint(self): - if self.checkpoint and self.chief: - exe = executor.Executor(self.place) - io.save_checkpoint(exe, self.checkpoint.checkpoint_dir, - self.checkpoint.max_num_checkpoints, - self.checkpoint.save_interval_secs, - self.train_program) - @contextlib.contextmanager def _prog_and_scope_guard(self): with framework.program_guard( @@ -343,8 +348,9 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): ]) else: metrics = exe.run(feed=data, fetch_list=[]) + event_handler(EndStepEvent(epoch_id, step_id, metrics)) - self._save_checkpoint() + self._save_checkpoint(epoch_id, step_id) event_handler(EndEpochEvent(epoch_id)) def _test_by_executor(self, reader, feed_order, fetch_list): @@ -384,6 +390,18 @@ def _get_or_create_parallel_executor(self): loss_name=self.train_func_outputs[0].name) return self._get_parallel_executor() + def _save_checkpoint(self, epoch_id, step_id): + if not self.checkpoint or not self.chief: + return + + if epoch_id % self.checkpoint.epoch_interval == 0 and step_id % self.checkpoint.step_interval == 0: + exe = executor.Executor(self.place) + io.save_checkpoint( + executor=exe, + checkpoint_dir=self.checkpoint.checkpoint_dir, + max_num_checkpoints=self.checkpoint.max_num_checkpoints, + main_program=self.train_program) + def build_feed_var_list(program, feed_order): if not isinstance(program, framework.Program): From 5f5d6a9dc7eaf2e1c5b069454497d11a28701ddb Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 29 May 2018 16:01:26 +0800 Subject: [PATCH 05/32] optimized checkpoint and save_model --- python/paddle/fluid/io.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index aa039bdfaa416..bd3c2e3d9a3bb 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -489,9 +489,9 @@ def save_persist_vars_without_grad(executor, dirname, program): def save_checkpoint(executor, - checkpoint_dir=None, - max_num_checkpoints=3, - main_program=None): + checkpoint_dir, + main_program=None, + max_num_checkpoints=3): """ Save Checkpoint will save persistable LodTensor variables from main_program in checkpoint directory, the directory named by serial number from 0 to (n -1), save_checkpoint use LRU strategy @@ -500,12 +500,11 @@ def save_checkpoint(executor, :param executor :param checkpoint_dir - :param max_num_checkpoints - :param save_interval_secs :param main_program + :param max_num_checkpoints """ if checkpoint_dir is None: - checkpoint_dir = os.getcwd() + raise ValueError("The values of 'checkpoint_dir' should not be None") if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) @@ -518,7 +517,7 @@ def save_checkpoint(executor, _lru_delete(checkpoint_dir, max_num_checkpoints) -def load_checkpoint(executor, checkpoint_dir=None, main_program=None): +def load_checkpoint(executor, checkpoint_dir, main_program=None): """ Load checkpoint from a directory by executor, it will find the most recent saved checkpoint file and load it auto. @@ -529,7 +528,7 @@ def load_checkpoint(executor, checkpoint_dir=None, main_program=None): """ if checkpoint_dir is None: - checkpoint_dir = os.getcwd() + raise ValueError("The values of 'checkpoint_dir' should not be None") serial = _get_lastest_checkpoint_dir(checkpoint_dir) @@ -546,7 +545,7 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): delete_dir only works when the directory is empty, otherwise, OSError is raised. """ if checkpoint_dir is None: - checkpoint_dir = os.getcwd() + raise ValueError("The values of 'checkpoint_dir' should not be None") _lru_delete(checkpoint_dir, max_num_checkpoints=0) if delete_dir and not os.listdir(checkpoint_dir): From ad9dfeb0180b40905d245354e733e750009cc173 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 29 May 2018 20:28:40 +0800 Subject: [PATCH 06/32] bug fix and optimize --- python/paddle/fluid/io.py | 153 +++++++++++++++++++++++++-------- python/paddle/fluid/trainer.py | 52 +++++++++-- 2 files changed, 162 insertions(+), 43 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index bd3c2e3d9a3bb..ed560304e25fd 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -456,40 +456,18 @@ def get_parameter_value_by_name(name, executor, program=None): return get_parameter_value(var, executor) -def load_persist_vars_without_grad(executor, dirname, program): - """ - load_persist_vars_without_grad will load variables from a directory by an executor, - the variable named end with "@GRAD" will not be loaded. - """ - load_vars( - executor, - dirname=dirname, - main_program=program, - predicate=_is_checkpoint_var, - filename=None) - - -def save_persist_vars_without_grad(executor, dirname, program): - """ - save_persist_vars_without_grad will save variables to a directory by an executor, - the variable named end with "@GRAD" will not be saved. - """ - save_vars( - executor, - dirname=dirname, - main_program=program, - vars=None, - predicate=_is_checkpoint_var, - filename=None) - - SUCCESS_MARK_FILENAME = "_SUCCESS" CHECKPOINT_PREFIX = "checkpoint" +MODEL_DIR = "__model__" +TRAINER_PREFIX = "trainer" CHECKPOINT_SEPARATOR = "_" def save_checkpoint(executor, checkpoint_dir, + trainer_id, + is_chief=False, + trainer_args=None, main_program=None, max_num_checkpoints=3): """ @@ -502,22 +480,35 @@ def save_checkpoint(executor, :param checkpoint_dir :param main_program :param max_num_checkpoints + :param is_chief """ if checkpoint_dir is None: raise ValueError("The values of 'checkpoint_dir' should not be None") + if trainer_args and not isinstance(trainer_args, dict): + raise TypeError("The type of 'trainer_args' should be dict") + if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) serial = _get_lastest_checkpoint_dir(checkpoint_dir) + 1 cur_dir = _get_serial_dir(checkpoint_dir, serial) - save_persist_vars_without_grad(executor, cur_dir, main_program) - _write_success(cur_dir) + if is_chief: + save_persist_vars_without_grad(executor, cur_dir, main_program) + + save_trainer_args(cur_dir, trainer_id, trainer_args) _lru_delete(checkpoint_dir, max_num_checkpoints) -def load_checkpoint(executor, checkpoint_dir, main_program=None): +def need_load_checkpoint(checkpoint_dir): + serial = _get_lastest_checkpoint_dir(checkpoint_dir) + if serial < 0: + return None + return serial + + +def load_checkpoint(executor, checkpoint_dir, serial, main_program): """ Load checkpoint from a directory by executor, it will find the most recent saved checkpoint file and load it auto. @@ -528,14 +519,17 @@ def load_checkpoint(executor, checkpoint_dir, main_program=None): """ if checkpoint_dir is None: - raise ValueError("The values of 'checkpoint_dir' should not be None") + raise ValueError( + "The values of 'checkpoint_dir' or 'serial' should not be None") - serial = _get_lastest_checkpoint_dir(checkpoint_dir) + if serial is None or serial < 0: + raise ValueError("The values of 'serial' should not be None or <0 ") - if serial < 0: - return + if main_program is None: + raise ValueError("The values of 'main_program'should not be None") cur_dir = _get_serial_dir(checkpoint_dir, serial) + cur_dir = _get_model_dir(cur_dir) load_persist_vars_without_grad(executor, cur_dir, main_program) @@ -552,6 +546,68 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): os.rmdir(checkpoint_dir) +def load_persist_vars_without_grad(executor, dirname, program, nest=True): + """ + load_persist_vars_without_grad will load variables from a directory by an executor, + the variable named end with "@GRAD" will not be loaded. + """ + + if nest: + dirname = _get_model_dir(dirname) + + load_vars( + executor, + dirname=dirname, + main_program=program, + predicate=_is_checkpoint_var, + filename=None) + + +def save_persist_vars_without_grad(executor, dirname, program): + """ + save_persist_vars_without_grad will save variables to a directory by an executor, + the variable named end with "@GRAD" will not be saved. + """ + cur_dir = _get_model_dir(dirname) + save_vars( + executor, + dirname=cur_dir, + main_program=program, + vars=None, + predicate=_is_checkpoint_var, + filename=None) + _write_success(cur_dir) + + +def save_trainer_args(dirname, trainer_id, trainer_args): + if not isinstance(trainer_args, dict): + raise TypeError("The type of 'trainer_args' should be dict") + cur_dir = _get_trainer_dir(dirname, trainer_id) + + for name, value in trainer_args.iteritems(): + args_file = os.path.join(cur_dir, name) + with open(args_file, 'w') as f: + f.write(str(value)) + _write_success(cur_dir) + + +def load_trainer_args(checkpoint_dir, serial, trainer_id, trainer_args): + cur_dir = _get_serial_dir(checkpoint_dir, serial) + cur_dir = _get_trainer_dir(cur_dir, trainer_id) + + if not isinstance(trainer_args, list): + raise TypeError("The type of 'trainer_args' should be list") + + ret_values = [] + + for arg in trainer_args: + cur_file = os.path.join(cur_dir, arg) + with open(cur_file, 'r') as f: + contents = f.read() + ret_values.append(contents.strip()) + return ret_values + + def _is_checkpoint_var(var): """ the checkpoint will not save or load all the variables. @@ -583,7 +639,31 @@ def _get_dir_serial(dirname): def _get_serial_dir(dirname, serial): serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial) - return os.path.join(dirname, serial_folder) + serial_dir = os.path.join(dirname, serial_folder) + + if not os.path.isdir(serial_dir): + os.makedirs(serial_dir) + + return serial_dir + + +def _get_model_dir(dirname): + model_dir = os.path.join(dirname, MODEL_DIR) + + if not os.path.isdir(model_dir): + os.makedirs(model_dir) + + return model_dir + + +def _get_trainer_dir(dirname, trainer_id): + trainer_folder = TRAINER_PREFIX + CHECKPOINT_SEPARATOR + str(trainer_id) + trainer_dir = os.path.join(dirname, trainer_folder) + + if not os.path.isdir(trainer_dir): + os.makedirs(trainer_dir) + + return trainer_dir def _lru_delete(dirname, max_num_checkpoints=3): @@ -638,7 +718,8 @@ def has_success(checkpoint_dir, cur_dir): return -1 success_path = os.path.join( - _get_serial_dir(checkpoint_dir, serial), SUCCESS_MARK_FILENAME) + _get_serial_dir(checkpoint_dir, serial), MODEL_DIR, + SUCCESS_MARK_FILENAME) if os.path.isfile(success_path): return serial diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 3cf96ac251132..206d582cdcaaf 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -79,6 +79,9 @@ def __init__(self, else: self.step_interval = step_interval + self.epoch_id = 0 + self.step_id = 0 + def check_and_get_place(place): """ @@ -132,6 +135,7 @@ def __init__(self, # config for checkpoint # only chief worker will save variables + self.trainer_id = 0 self.chief = True self.checkpoint = checkpoint_config if self.checkpoint and \ @@ -139,6 +143,8 @@ def __init__(self, raise TypeError( "The checkpoint_config shoule be an instance of CheckpointConfig" ) + self.load_checkpoint_serial = io.need_load_checkpoint( + self.checkpoint.checkpoint_dir) self.scope = core.Scope() @@ -168,15 +174,25 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if self.checkpoint: + if self.load_checkpoint_serial: exe = executor.Executor(place) io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, + self.load_checkpoint_serial, self.startup_program) - if param_path: + epoch_id, step_id = io.load_trainer_args( + self.checkpoint.checkpoint_dir, self.load_checkpoint_serial, + self.trainer_id, ["epoch_id", "step_id"]) + self.checkpoint.epoch_id = int(epoch_id) + self.checkpoint.step_id = int(step_id) + + if param_path and os.path.isdir(param_path): # load params from param_path into scope io.load_persist_vars_without_grad( - exe, dirname=param_path, program=self.startup_program) + exe, + dirname=param_path, + program=self.startup_program, + nest=False) def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS @@ -333,11 +349,20 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): self._train_by_any_executor(event_handler, exe, num_epochs, reader) def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): - for epoch_id in range(num_epochs): + epochs = [ + epoch_id for epoch_id in range(num_epochs) + if epoch_id >= self.checkpoint.epoch_id + ] + for epoch_id in epochs: event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): if self.__stop: + self._clean_checkpoint() return + + if self.checkpoint and self.checkpoint.step_id >= step_id and self.checkpoint.epoch_id == epoch_id: + continue + begin_event = BeginStepEvent(epoch_id, step_id) event_handler(begin_event) if begin_event.fetch_metrics: @@ -352,6 +377,7 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): event_handler(EndStepEvent(epoch_id, step_id, metrics)) self._save_checkpoint(epoch_id, step_id) event_handler(EndEpochEvent(epoch_id)) + self._clean_checkpoint() def _test_by_executor(self, reader, feed_order, fetch_list): with executor.scope_guard(self.scope): @@ -390,17 +416,29 @@ def _get_or_create_parallel_executor(self): loss_name=self.train_func_outputs[0].name) return self._get_parallel_executor() + def _clean_checkpoint(self): + if not self.checkpoint: + return + io.clean_checkpoint(checkpoint_dir=self.checkpoint.checkpoint_dir) + def _save_checkpoint(self, epoch_id, step_id): - if not self.checkpoint or not self.chief: + if not self.checkpoint: return if epoch_id % self.checkpoint.epoch_interval == 0 and step_id % self.checkpoint.step_interval == 0: + trainer_args = {} + trainer_args["epoch_id"] = epoch_id + trainer_args["step_id"] = step_id + exe = executor.Executor(self.place) io.save_checkpoint( executor=exe, checkpoint_dir=self.checkpoint.checkpoint_dir, - max_num_checkpoints=self.checkpoint.max_num_checkpoints, - main_program=self.train_program) + trainer_id=self.trainer_id, + is_chief=self.chief, + trainer_args=trainer_args, + main_program=self.train_program, + max_num_checkpoints=self.checkpoint.max_num_checkpoints) def build_feed_var_list(program, feed_order): From 486e1e337d05679a22b389840136b9f07714646b Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 29 May 2018 20:36:45 +0800 Subject: [PATCH 07/32] bug fix and optimize --- python/paddle/fluid/trainer.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 206d582cdcaaf..35bb8ded5d906 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -79,8 +79,9 @@ def __init__(self, else: self.step_interval = step_interval - self.epoch_id = 0 - self.step_id = 0 + self._epoch_id = 0 + self._step_id = 0 + self._load_serial = None def check_and_get_place(place): @@ -174,17 +175,17 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if self.load_checkpoint_serial: + if self.checkpoint._load_serial: exe = executor.Executor(place) io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, - self.load_checkpoint_serial, + self.checkpoint._load_serial, self.startup_program) epoch_id, step_id = io.load_trainer_args( self.checkpoint.checkpoint_dir, self.load_checkpoint_serial, self.trainer_id, ["epoch_id", "step_id"]) - self.checkpoint.epoch_id = int(epoch_id) - self.checkpoint.step_id = int(step_id) + self.checkpoint._epoch_id = int(epoch_id) + self.checkpoint._step_id = int(step_id) if param_path and os.path.isdir(param_path): # load params from param_path into scope @@ -351,7 +352,7 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): epochs = [ epoch_id for epoch_id in range(num_epochs) - if epoch_id >= self.checkpoint.epoch_id + if epoch_id >= self.checkpoint._epoch_id ] for epoch_id in epochs: event_handler(BeginEpochEvent(epoch_id)) @@ -360,7 +361,8 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): self._clean_checkpoint() return - if self.checkpoint and self.checkpoint.step_id >= step_id and self.checkpoint.epoch_id == epoch_id: + if self.checkpoint and self.checkpoint._load_serial \ + and self.checkpoint._step_id >= step_id and self.checkpoint._epoch_id == epoch_id: continue begin_event = BeginStepEvent(epoch_id, step_id) From 9086043090f80ee7695d043e84fbe8068b2f76e7 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 29 May 2018 20:52:01 +0800 Subject: [PATCH 08/32] bug fix and optimize --- python/paddle/fluid/io.py | 1 - python/paddle/fluid/trainer.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index ed560304e25fd..2925e8eb2894f 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -529,7 +529,6 @@ def load_checkpoint(executor, checkpoint_dir, serial, main_program): raise ValueError("The values of 'main_program'should not be None") cur_dir = _get_serial_dir(checkpoint_dir, serial) - cur_dir = _get_model_dir(cur_dir) load_persist_vars_without_grad(executor, cur_dir, main_program) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 35bb8ded5d906..5ca93821e229b 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -144,7 +144,7 @@ def __init__(self, raise TypeError( "The checkpoint_config shoule be an instance of CheckpointConfig" ) - self.load_checkpoint_serial = io.need_load_checkpoint( + self.checkpoint._load_serial = io.need_load_checkpoint( self.checkpoint.checkpoint_dir) self.scope = core.Scope() @@ -182,7 +182,7 @@ def __init__(self, self.startup_program) epoch_id, step_id = io.load_trainer_args( - self.checkpoint.checkpoint_dir, self.load_checkpoint_serial, + self.checkpoint.checkpoint_dir, self.checkpoint._load_serial, self.trainer_id, ["epoch_id", "step_id"]) self.checkpoint._epoch_id = int(epoch_id) self.checkpoint._step_id = int(step_id) From 0211c5df0a12de2647b339dc0a8c36d35209a1a3 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 11:45:25 +0800 Subject: [PATCH 09/32] bug fix --- python/paddle/fluid/trainer.py | 17 +++++++++-------- tools/codestyle/docstring_checker.pyc | Bin 0 -> 12561 bytes 2 files changed, 9 insertions(+), 8 deletions(-) create mode 100644 tools/codestyle/docstring_checker.pyc diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 5ca93821e229b..34db9b39b7757 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -139,13 +139,14 @@ def __init__(self, self.trainer_id = 0 self.chief = True self.checkpoint = checkpoint_config - if self.checkpoint and \ - not isinstance(self.checkpoint, CheckpointConfig): - raise TypeError( - "The checkpoint_config shoule be an instance of CheckpointConfig" - ) - self.checkpoint._load_serial = io.need_load_checkpoint( - self.checkpoint.checkpoint_dir) + if self.checkpoint: + if not isinstance(self.checkpoint, CheckpointConfig): + raise TypeError( + "The checkpoint_config shoule be an instance of CheckpointConfig" + ) + else: + self.checkpoint._load_serial = io.need_load_checkpoint( + self.checkpoint.checkpoint_dir) self.scope = core.Scope() @@ -175,7 +176,7 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if self.checkpoint._load_serial: + if self.checkpoint and self.checkpoint._load_serial: exe = executor.Executor(place) io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, self.checkpoint._load_serial, diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1f0255b763c8d154b7cc17d2d525148dfea3b42d GIT binary patch literal 12561 zcmdT~Npl;=7488*kbt;or54N9SSu6>B(+$!8Cn)4%CZuRQVr#(M2iK3m?1gT00T}B zEYg-Lm69vVF}LI&q$*W8rE*K<5Z`=Een4`}DTh=}i7WZO*Mk8_(T-GhiZsb-cE9fF z<@MX&WdAWx{QTCy2Q`)aWbywB9{o!cu2QooE#+oZV5wPDOg*D!Gg8m0Agk&*HJelQ z9yQyeERM*jAg}7ZYPQ$3dsNV;>IF4hFm23MRI^3p=GCWIVux~jmDjHp@M<3+PnEhq zJD}Wx@^S(#rqDs|Wu~-)Tbw{K71ned`HjVEOJ41v7umkHTiSE&IJ9d7+0C_hDQwto zShLN@QyP`Ez#H-ZgP-YA2Ghix@M=|i$uw~f+O60Rbjf1(5Q>|Vct7;rWQghDd#<-o zZ3VIG*WwyS^C2Fu;nDk1JXUHMlUS@gwfILg&B)4nHwHM~qOW7@UbgH-dL-G_PIN{| zXC>*33EX&Yvzt760>6QoN~kF*^a^UD?`zCD8dn>Ok34_$uK9oN{FV8t@3uy2VSRMb zk4G27Q61G-U24q>^iI`v1J9JB$-g)Z13g*`T~F{h3e+0Rws$xJocCeX_RbHXu8C%q zi3=#x&DRLy4?C~o(f3h!pb3xyN+B|mNS4>mN=~_`=hUYl#~$VO2r%EKq+U`IhxH|t z1d0O{5mHpnOsU8v#!;j@uxmk8YkMgSTy4iop53fQBp;k9As5>Vyt>zj^+?giPkO`L zvfS{?sKj!;xLDk9B_kJ}B^g_;!ab89rb{`LPL_jN3vl6}2vCpES}pLZk-0?nYA;yW z7FSrQ_zgdM)5vr;vfYiGn(*bFirn^0iA*;w z*qMwLwUE&D)oQ&NfY5r;=Xh}|YUr3`?o@s4X<;FU*D9}7@tPpf=?7@55$I4joWi5I zL1hgjzk)}hCE|~VPHBmF1p1z2Nl}k86ctZ$YPF;ugS2wW2Iyf%MH5N;$P<8tH(tjp zEBn+vv}Q`2b{eQv$|r>xvLZ0>8iHg&kD$>ou2lVo3o;jK47`uLKv=cK+t>cb-c}=1 zcC+Zk71`Se4x@R;%F0{))*h>94Q6UYBk6>6Dr9*c$2Y?FDMkW^%aFcGJ%L;qAd**m zEMRnM?h9fW_+19Z(cPosta{u->_gf-v{ZCUtsGMgGntI!#pF3=8B!?6k3+Jfpw@%uNM1ZL)!~cSf_XtX-F!bY1NkO zsTrtUs?oQ01Dd8wAR9T%x01Ei zPjxqHdA7eGFJPiXp6gbv=4rjq3WBxH1VZ;A){3+k1Hv_KCHP0N)UrNL|1KI%hkQcAqYsJ#c+ow1^x`)DN2ZFw|eT8<`MRC)1}k;k^> z$;f9snU@$Y^Bc}a79?d~Bb?Um>g_I?oSM6$pwq#IP(?6#p|YMrouC4(KgeBo6eSTv*7Q;$I8;7hp|KN3eJ8(`Do^qhi0$Z57&JGv{_J>+@ipC00yTXumQjZ z1vUiOPJ!(LY`4Jn0Jc|P`v4mj*nYqc2<#wWhXnQ#V21_vGGMO=>`TCW6OezQ^9G42B2PdR5q}-EI*{R%9$}NdMxl6gH1-M(e-;v55<(`qsUge&Z z%0A_mr7}!^a+Jusj$$g*#(GJD12kcpYjnliES4hA39*MPP7f! z?S^O52H^(V{x5kYhGC_m2-Fnkp)4+6Y@!gMgQ!_4#$E z;33e(>CJti2CK9oc5T;Q(;Z#VXE4DnUu(p&h{1$B!lM&+a{ zVTzmHnT6>DxHpmUTr9Y@C{>r9a zmXVs)A#Qz>!Rbnu^l=QjAGWyJR^ZwQBiSD&R`iEN1VT*TlbkCA5{JP-rnT5s7_^|E7biXNHq`AvTMhT8KTmH-ENLPPDT+l4`&*vH* z{Ua2{=A{I%(g2XluNNLJ-cuaGGf0)j2 z<^;ViJT|;VJZG#1aNFDV(?qB_7qOslQ<5WScqQ)8p}FEPanX@qGGZDRAVDG#Pf)Uz zI-*=vIG{B2MQ(04)L+HMev2mwic=UJw~kmoNv%dbMc<2$+`!LJF*g9019*=1q^O5> z1WvQOgQut?Og_sP?$n$k5nNzef}p^Y3B+O$Zsq{Pw)o4aVO~;JMWbJGNxgDOP$n}g zqv{?nyd?hb-?61`+HPym)M!J3(exYR{u|Vt40b4eNdb8W%@a z&TA|v>YaB`OqUq-G^+FlYob}1>N5mMTm@9fP3K(#g?ewI)`>@&$@a0=X7MVEV<<$N z)LIdgF>TNiKwGI=9J+YF$CIegAyJDZH(s zknceuUs!tpC2|50{80nVsh_}Bf%M3zALET3Y6?R*Dlb`|_YLVoR3qmA*Sn7kFmC{f z!X!h+qBs#kr7qNeM9~(|Z6Y;+@x-<(As=K<*cZo6(_e+7hSB1qb*!})u=m6eY#V)@ z8D5C`HZ}Kn{WY2$L@<$i0gq;E2Wo3>F9T-4rV-M>)(@-|k;?77E8i4hxd3VsX$TZ4 zH8Lju8i5HT+5mAOwKZm&xVICFUc*~$MwkD}eHe1m+Xlsz;T*IFJgeT#@0^rB}dT=mSrI5kXLTSu_MV4s<07oQXa#9dmbJ zI;6~m>8P~yA5e4*p)R&-^Ol51+Hnl~)W|8j!z<6pZ3tBuHw|3-|6n-b7#1TA!y4UG zMsLap>-E&+h~R$-z2bf0d?ruzF2{F7|7OI65nIG`&J>pZ8<%HVPxVr+zl7%iWIAYy z97QvVGMyBte~2O$vtxBk2*851ae}4??jNQHh>K*EVYX^zC-cQH!(gP`L4%ojeT{$_ zgwg5L!@fuf5Yhclgvby}-C!#g43WVBC=_7I#rduFY^w7rCBf z_qT;yC$QOJsFD9ouGE}-M9S{QD~m?TA~gy%3snp=7@2~(%bUpBe#E2s=?-jS_;V1X zQ)*>czSV&=hNP7)XmJbBdyof^!)QUuGVt1vug=9Zx6NdU2JZQd`jt} z`tsH=KByZ)rhG4LY=nxBTLrO*_}vYC-oA`ch5WYJ8DYVcjWfoA7M}SUsp9j?O-b?~ zqfRgX&h43-cW>UFo_w!z>-x;h|9;Nfr!$I#*fHeHR}>3_gcy)S+;4c=C=`m)aJ2 zIJ7fEJo#ZM!5`O6_cMHX{Us#ev|_;P?ADOdgWvC1*8PExmkWfRSSb4`5I8Z xXLe5WP2PmumofcJdWFZ@76=ydnJ|fg%Y)ce@_|IZ)sK7#^C11%LeKD>{{o7gqwN3y literal 0 HcmV?d00001 From 0deb6f90baa5dab02b5ff1cbc98dcaf7fae9b80b Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 14:20:51 +0800 Subject: [PATCH 10/32] annotation optimized and code style optimized --- python/paddle/fluid/io.py | 22 +++++++++++++++++++++- python/paddle/fluid/trainer.py | 12 ++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 2925e8eb2894f..d52c9a882368e 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -478,9 +478,10 @@ def save_checkpoint(executor, :param executor :param checkpoint_dir + :param trainer_id + :param is_chief :param main_program :param max_num_checkpoints - :param is_chief """ if checkpoint_dir is None: raise ValueError("The values of 'checkpoint_dir' should not be None") @@ -502,6 +503,11 @@ def save_checkpoint(executor, def need_load_checkpoint(checkpoint_dir): + """ + If the directory have checkpoint files, it will return lastest checkpoint directory serial number + + :param checkpoint_dir + """ serial = _get_lastest_checkpoint_dir(checkpoint_dir) if serial < 0: return None @@ -515,6 +521,7 @@ def load_checkpoint(executor, checkpoint_dir, serial, main_program): :param executor :param checkpoint_dir + :param serial :param main_program """ @@ -536,7 +543,11 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): """ clean the checkpoint dir, when the train exits normally, the trainer will call clean_checkpoint to delete checkpoint directory saved before. delete_dir only works when the directory is empty, otherwise, OSError is raised. + + :param checkpoint_dir + :param delete_dir """ + if checkpoint_dir is None: raise ValueError("The values of 'checkpoint_dir' should not be None") _lru_delete(checkpoint_dir, max_num_checkpoints=0) @@ -549,6 +560,11 @@ def load_persist_vars_without_grad(executor, dirname, program, nest=True): """ load_persist_vars_without_grad will load variables from a directory by an executor, the variable named end with "@GRAD" will not be loaded. + + :param executor + :param dirname + :param program + :param nest """ if nest: @@ -566,6 +582,10 @@ def save_persist_vars_without_grad(executor, dirname, program): """ save_persist_vars_without_grad will save variables to a directory by an executor, the variable named end with "@GRAD" will not be saved. + + :param executor + :param dirname + :param program """ cur_dir = _get_model_dir(dirname) save_vars( diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 34db9b39b7757..6d8d4a3e43bad 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -79,8 +79,8 @@ def __init__(self, else: self.step_interval = step_interval - self._epoch_id = 0 - self._step_id = 0 + self.epoch_id = 0 + self.step_id = 0 self._load_serial = None @@ -185,8 +185,8 @@ def __init__(self, epoch_id, step_id = io.load_trainer_args( self.checkpoint.checkpoint_dir, self.checkpoint._load_serial, self.trainer_id, ["epoch_id", "step_id"]) - self.checkpoint._epoch_id = int(epoch_id) - self.checkpoint._step_id = int(step_id) + self.checkpoint.epoch_id = int(epoch_id) + self.checkpoint.step_id = int(step_id) if param_path and os.path.isdir(param_path): # load params from param_path into scope @@ -353,7 +353,7 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): epochs = [ epoch_id for epoch_id in range(num_epochs) - if epoch_id >= self.checkpoint._epoch_id + if epoch_id >= self.checkpoint.epoch_id ] for epoch_id in epochs: event_handler(BeginEpochEvent(epoch_id)) @@ -363,7 +363,7 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): return if self.checkpoint and self.checkpoint._load_serial \ - and self.checkpoint._step_id >= step_id and self.checkpoint._epoch_id == epoch_id: + and self.checkpoint.step_id >= step_id and self.checkpoint.epoch_id == epoch_id: continue begin_event = BeginStepEvent(epoch_id, step_id) From d712af25dcee298a1bd1fda1bba6a1f0ed001ab0 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 16:29:05 +0800 Subject: [PATCH 11/32] add distribute config --- python/paddle/fluid/trainer.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 6d8d4a3e43bad..e98672f3187da 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -81,7 +81,8 @@ def __init__(self, self.epoch_id = 0 self.step_id = 0 - self._load_serial = None + self.load_serial = None + self.is_pserver = False def check_and_get_place(place): @@ -145,7 +146,7 @@ def __init__(self, "The checkpoint_config shoule be an instance of CheckpointConfig" ) else: - self.checkpoint._load_serial = io.need_load_checkpoint( + self.checkpoint.load_serial = io.need_load_checkpoint( self.checkpoint.checkpoint_dir) self.scope = core.Scope() @@ -176,17 +177,18 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if self.checkpoint and self.checkpoint._load_serial: + if self.checkpoint and self.checkpoint.load_serial: exe = executor.Executor(place) io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, - self.checkpoint._load_serial, + self.checkpoint.load_serial, self.startup_program) - epoch_id, step_id = io.load_trainer_args( - self.checkpoint.checkpoint_dir, self.checkpoint._load_serial, - self.trainer_id, ["epoch_id", "step_id"]) - self.checkpoint.epoch_id = int(epoch_id) - self.checkpoint.step_id = int(step_id) + if not self.checkpoint.is_pserver: + epoch_id, step_id = io.load_trainer_args( + self.checkpoint.checkpoint_dir, self.checkpoint.load_serial, + self.trainer_id, ["epoch_id", "step_id"]) + self.checkpoint.epoch_id = int(epoch_id) + self.checkpoint.step_id = int(step_id) if param_path and os.path.isdir(param_path): # load params from param_path into scope @@ -259,6 +261,9 @@ def _dist_transpile_if_necessary(self, optimize_ops, params_grads): t.transpile( trainer_id, pservers=pserver_endpoints, trainers=trainers) if training_role == "PSERVER": + if self.checkpoint: + self.is_pserver = True + self.train_program = t.get_pserver_program(current_endpoint) self.startup_program = t.get_startup_program(current_endpoint, self.train_program) @@ -362,7 +367,7 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): self._clean_checkpoint() return - if self.checkpoint and self.checkpoint._load_serial \ + if self.checkpoint and self.checkpoint.load_serial \ and self.checkpoint.step_id >= step_id and self.checkpoint.epoch_id == epoch_id: continue From b44ede803387c0e292322ba140468599a9136352 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 19:26:12 +0800 Subject: [PATCH 12/32] bug fix --- python/paddle/fluid/trainer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index e98672f3187da..b4f719855f479 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -252,14 +252,14 @@ def _dist_transpile_if_necessary(self, optimize_ops, params_grads): current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port # the unique trainer id, starting from 0, needed by trainer # only - trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) self.chief = self.trainer_id == 0 # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") with self._prog_and_scope_guard(): t = distribute_transpiler.DistributeTranspiler() t.transpile( - trainer_id, pservers=pserver_endpoints, trainers=trainers) + self.trainer_id, pservers=pserver_endpoints, trainers=trainers) if training_role == "PSERVER": if self.checkpoint: self.is_pserver = True From 94eaf94cf57ec2cc951d046e847b69c348b8f9c9 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 21:51:16 +0800 Subject: [PATCH 13/32] bug fix about lru and save --- python/paddle/fluid/io.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index d52c9a882368e..8e10b01a4abb0 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -495,11 +495,11 @@ def save_checkpoint(executor, serial = _get_lastest_checkpoint_dir(checkpoint_dir) + 1 cur_dir = _get_serial_dir(checkpoint_dir, serial) + save_trainer_args(cur_dir, trainer_id, trainer_args) + if is_chief: save_persist_vars_without_grad(executor, cur_dir, main_program) - - save_trainer_args(cur_dir, trainer_id, trainer_args) - _lru_delete(checkpoint_dir, max_num_checkpoints) + _lru_delete(checkpoint_dir, max_num_checkpoints) def need_load_checkpoint(checkpoint_dir): @@ -639,7 +639,13 @@ def _is_checkpoint_var(var): var.desc.type() == core.VarDesc.VarType.RAW: return False - if var.name.endswith("@GRAD"): + if "@GRAD" in var.name: + return False + + if ".trainer_" in var.name: + return False + + if ".block" in var.name: return False return var.persistable From e44c278e60603c37640a0a352f4bbb7f8363bebc Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 21:55:12 +0800 Subject: [PATCH 14/32] bug fix about clean --- python/paddle/fluid/trainer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index b4f719855f479..69577a98fbd5a 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -425,7 +425,7 @@ def _get_or_create_parallel_executor(self): return self._get_parallel_executor() def _clean_checkpoint(self): - if not self.checkpoint: + if not self.checkpoint and not self.chief: return io.clean_checkpoint(checkpoint_dir=self.checkpoint.checkpoint_dir) From bca4da422582990b4308932d2c20274cdb6c5a60 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 30 May 2018 21:56:54 +0800 Subject: [PATCH 15/32] cancle only chief delete files --- python/paddle/fluid/io.py | 3 ++- python/paddle/fluid/trainer.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 8e10b01a4abb0..62e3046db6486 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -499,7 +499,8 @@ def save_checkpoint(executor, if is_chief: save_persist_vars_without_grad(executor, cur_dir, main_program) - _lru_delete(checkpoint_dir, max_num_checkpoints) + + _lru_delete(checkpoint_dir, max_num_checkpoints) def need_load_checkpoint(checkpoint_dir): diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 69577a98fbd5a..b4f719855f479 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -425,7 +425,7 @@ def _get_or_create_parallel_executor(self): return self._get_parallel_executor() def _clean_checkpoint(self): - if not self.checkpoint and not self.chief: + if not self.checkpoint: return io.clean_checkpoint(checkpoint_dir=self.checkpoint.checkpoint_dir) From 46f2688f3051b0bbeb070d05159922e8b689720e Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Thu, 31 May 2018 09:53:41 +0800 Subject: [PATCH 16/32] bug fix --- python/paddle/fluid/trainer.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index b4f719855f479..3354d77acec9e 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -356,10 +356,14 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): self._train_by_any_executor(event_handler, exe, num_epochs, reader) def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): - epochs = [ - epoch_id for epoch_id in range(num_epochs) - if epoch_id >= self.checkpoint.epoch_id - ] + if self.checkpoint: + epochs = [ + epoch_id for epoch_id in range(num_epochs) + if epoch_id >= self.checkpoint.epoch_id + ] + else: + epochs = [epoch_id for epoch_id in range(num_epochs)] + for epoch_id in epochs: event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): From 7973d9b4b5b3ef032c13410401b8c368220cd21d Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Fri, 1 Jun 2018 10:09:31 +0800 Subject: [PATCH 17/32] bug fix --- python/paddle/fluid/trainer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 3354d77acec9e..72168886fdba4 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -178,10 +178,11 @@ def __init__(self, exe.run(self.startup_program) if self.checkpoint and self.checkpoint.load_serial: - exe = executor.Executor(place) - io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, - self.checkpoint.load_serial, - self.startup_program) + with self._prog_and_scope_guard(): + exe = executor.Executor(place) + io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, + self.checkpoint.load_serial, + self.startup_program) if not self.checkpoint.is_pserver: epoch_id, step_id = io.load_trainer_args( From c06f43bbb6aec4ae12d514ca92a77aed0d473882 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Mon, 4 Jun 2018 15:20:06 +0800 Subject: [PATCH 18/32] add annotation about _is_checkpoint_var --- python/paddle/fluid/io.py | 5 +++-- tools/codestyle/docstring_checker.pyc | Bin 12561 -> 12561 bytes 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 62e3046db6486..75146fe3269ad 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -639,13 +639,14 @@ def _is_checkpoint_var(var): var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ var.desc.type() == core.VarDesc.VarType.RAW: return False - + # @GRAD are named for gradient varibales, checkpoint will not save it. if "@GRAD" in var.name: return False - + # .trainer_ are named for distribute trian variables, checkpoint will not save it. if ".trainer_" in var.name: return False + # .block is named for distribute trian variables, checkpoint will not save it. if ".block" in var.name: return False diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc index 1f0255b763c8d154b7cc17d2d525148dfea3b42d..a27d3c9a8cccab8552d510578debb2df04eb53bb 100644 GIT binary patch delta 16 XcmbQ3G%<;t`7|BNbF9rm> delta 16 XcmbQ3G%<;t`7|BNbF*pSi From 08e5f0ae482c1e70dc74c4677e5cb699b38c433e Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Mon, 4 Jun 2018 16:10:11 +0800 Subject: [PATCH 19/32] rename need_load_checkpoint to get_latest_checkpoint_serial --- python/paddle/fluid/io.py | 4 ++-- python/paddle/fluid/trainer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 75146fe3269ad..111907b57516a 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -25,7 +25,7 @@ 'load_persistables', 'save_inference_model', 'load_inference_model', 'get_inference_program', 'save_checkpoint', 'load_checkpoint', 'clean_checkpoint', 'load_persist_vars_without_grad', - 'save_persist_vars_without_grad' + 'save_persist_vars_without_grad', 'get_latest_checkpoint_serial' ] @@ -503,7 +503,7 @@ def save_checkpoint(executor, _lru_delete(checkpoint_dir, max_num_checkpoints) -def need_load_checkpoint(checkpoint_dir): +def get_latest_checkpoint_serial(checkpoint_dir): """ If the directory have checkpoint files, it will return lastest checkpoint directory serial number diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 72168886fdba4..3c32ec1de8a97 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -146,7 +146,7 @@ def __init__(self, "The checkpoint_config shoule be an instance of CheckpointConfig" ) else: - self.checkpoint.load_serial = io.need_load_checkpoint( + self.checkpoint.load_serial = io.get_latest_checkpoint_serial( self.checkpoint.checkpoint_dir) self.scope = core.Scope() From bfdcf18707c79f2cc29b0903cb9f4fab2e907490 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Mon, 4 Jun 2018 21:10:38 +0800 Subject: [PATCH 20/32] grammar optimized. --- python/paddle/fluid/io.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 111907b57516a..b5d96441bcf93 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -492,7 +492,7 @@ def save_checkpoint(executor, if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) - serial = _get_lastest_checkpoint_dir(checkpoint_dir) + 1 + serial = _get_latest_checkpoint_dir(checkpoint_dir) + 1 cur_dir = _get_serial_dir(checkpoint_dir, serial) save_trainer_args(cur_dir, trainer_id, trainer_args) @@ -505,11 +505,11 @@ def save_checkpoint(executor, def get_latest_checkpoint_serial(checkpoint_dir): """ - If the directory have checkpoint files, it will return lastest checkpoint directory serial number + If the directory have checkpoint files, it will return latest checkpoint directory serial number :param checkpoint_dir """ - serial = _get_lastest_checkpoint_dir(checkpoint_dir) + serial = _get_latest_checkpoint_dir(checkpoint_dir) if serial < 0: return None return serial @@ -639,14 +639,14 @@ def _is_checkpoint_var(var): var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ var.desc.type() == core.VarDesc.VarType.RAW: return False - # @GRAD are named for gradient varibales, checkpoint will not save it. + # @GRAD are named for gradient variables, checkpoint will not save it. if "@GRAD" in var.name: return False - # .trainer_ are named for distribute trian variables, checkpoint will not save it. + # .trainer_ are named for distribute train variables, checkpoint will not save it. if ".trainer_" in var.name: return False - # .block is named for distribute trian variables, checkpoint will not save it. + # .block is named for distribute train variables, checkpoint will not save it. if ".block" in var.name: return False @@ -656,7 +656,6 @@ def _is_checkpoint_var(var): def _get_dir_serial(dirname): _, serial = dirname.split(CHECKPOINT_SEPARATOR) - serial_num = -1 try: serial_num = int(serial) except ValueError: @@ -723,7 +722,7 @@ def _write_success(dirname): f.write(now) -def _get_lastest_checkpoint_dir(checkpoint_dir): +def _get_latest_checkpoint_dir(checkpoint_dir): """ get the latest file in checkpoint directory, the _SUCCESS file must exist in the directory From 9735f25011b04116d271861fde8df05def81c3ce Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 14:47:13 +0800 Subject: [PATCH 21/32] optimized --- python/paddle/fluid/io.py | 44 +++++++++++++--------------------- python/paddle/fluid/trainer.py | 8 +++---- 2 files changed, 20 insertions(+), 32 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index b5d96441bcf93..5abadc73f76b5 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -492,7 +492,7 @@ def save_checkpoint(executor, if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) - serial = _get_latest_checkpoint_dir(checkpoint_dir) + 1 + serial = get_latest_checkpoint_serial(checkpoint_dir) + 1 cur_dir = _get_serial_dir(checkpoint_dir, serial) save_trainer_args(cur_dir, trainer_id, trainer_args) @@ -503,18 +503,6 @@ def save_checkpoint(executor, _lru_delete(checkpoint_dir, max_num_checkpoints) -def get_latest_checkpoint_serial(checkpoint_dir): - """ - If the directory have checkpoint files, it will return latest checkpoint directory serial number - - :param checkpoint_dir - """ - serial = _get_latest_checkpoint_dir(checkpoint_dir) - if serial < 0: - return None - return serial - - def load_checkpoint(executor, checkpoint_dir, serial, main_program): """ Load checkpoint from a directory by executor, @@ -527,17 +515,16 @@ def load_checkpoint(executor, checkpoint_dir, serial, main_program): """ if checkpoint_dir is None: - raise ValueError( - "The values of 'checkpoint_dir' or 'serial' should not be None") + raise ValueError("The values of 'checkpoint_dir' should not be None") if serial is None or serial < 0: raise ValueError("The values of 'serial' should not be None or <0 ") if main_program is None: - raise ValueError("The values of 'main_program'should not be None") + raise ValueError('main_program should not be None.') cur_dir = _get_serial_dir(checkpoint_dir, serial) - load_persist_vars_without_grad(executor, cur_dir, main_program) + load_persist_vars_without_grad(executor, cur_dir, main_program, True) def clean_checkpoint(checkpoint_dir, delete_dir=False): @@ -557,18 +544,21 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): os.rmdir(checkpoint_dir) -def load_persist_vars_without_grad(executor, dirname, program, nest=True): +def load_persist_vars_without_grad(executor, + dirname, + program, + has_model_dir=False): """ load_persist_vars_without_grad will load variables from a directory by an executor, the variable named end with "@GRAD" will not be loaded. - :param executor - :param dirname - :param program - :param nest + :param executor executor for load the value + :param dirname the checkpoint directory + :param program will load all variables in program + :param has_model_dir if has_model_dir is True, will load variables from sub directory named __model__ """ - if nest: + if has_model_dir: dirname = _get_model_dir(dirname) load_vars( @@ -584,9 +574,9 @@ def save_persist_vars_without_grad(executor, dirname, program): save_persist_vars_without_grad will save variables to a directory by an executor, the variable named end with "@GRAD" will not be saved. - :param executor - :param dirname - :param program + :param executor executor for load the value + :param dirname the checkpoint directory + :param program will load all variables in program """ cur_dir = _get_model_dir(dirname) save_vars( @@ -722,7 +712,7 @@ def _write_success(dirname): f.write(now) -def _get_latest_checkpoint_dir(checkpoint_dir): +def get_latest_checkpoint_serial(checkpoint_dir): """ get the latest file in checkpoint directory, the _SUCCESS file must exist in the directory diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 3c32ec1de8a97..fbdd28f53efcd 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -146,8 +146,9 @@ def __init__(self, "The checkpoint_config shoule be an instance of CheckpointConfig" ) else: - self.checkpoint.load_serial = io.get_latest_checkpoint_serial( + serial = io.get_latest_checkpoint_serial( self.checkpoint.checkpoint_dir) + self.checkpoint.load_serial = serial if serial >= 0 else None self.scope = core.Scope() @@ -194,10 +195,7 @@ def __init__(self, if param_path and os.path.isdir(param_path): # load params from param_path into scope io.load_persist_vars_without_grad( - exe, - dirname=param_path, - program=self.startup_program, - nest=False) + exe, dirname=param_path, program=self.startup_program) def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS From be16af3b04b3052e35e6d9157cec302274a629a4 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 14:48:15 +0800 Subject: [PATCH 22/32] delete pyc --- tools/codestyle/docstring_checker.pyc | Bin 12561 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tools/codestyle/docstring_checker.pyc diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc deleted file mode 100644 index a27d3c9a8cccab8552d510578debb2df04eb53bb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12561 zcmdT~%WoUU8J{I7N~9&rPx%oi-uOw^rX)Y&B&yDAw|-TX1?xj-N!7Warm=5bYj{N50^(C+V(2Hze+imT+b{yJug6!5>yc9NV zH>}%c|-bj7cn3o>=@Nnkurg-h%;-x9IB_yO%9{kseL9wF{k5 z(pgP9V*)px+w3Nfp1^NnrV?sO3cZ5b*n1kYj>Wa+;zyo8cGvtrf8pwU&3D_Q^{_Fv z=*MG=;h2uz%QP)JX z%ESef>E`Q%@rRw)@aX#}JkSJ40i_TbNhHf_XClMO0$BUR*40xSEj*&ytKSSK*#X5HqD5N+-*~tOdAmPz0z)XssT2wa8o|d$kuV zY>O+bR{f?QSF6XlDeg>J!&Zs|p5{7xrcf+)2pzON*==Nc8`<7QPEGpqPE~HZQXjap;KBS9)Z5cSyI%a3`ND`oLViZM`P=n*s;#+9nybV24qje+-}7YM7idHdS`*xPDk z%5E0DxGH-a#bGq>SXp^%(AsMit(}=V(MUQWoeEiA!10aneS(p|;WDJJQjZ~5hKS_V zJ_{I~p8K3w27Z@;adh{oIIAA@5&Muf4=okFt5%MyrkPAe@?!EFvkWPe^VE(QB;IM&)FRgwsrajM^2|QyuX|cAw1Z%+JAu%Bh_xau#(?mQCP&H^ot;DkHCbmRhq_mb zERy3*IJ*g;QtO;^!Xf$$_%o>A869#(2w<90a%*v?Tr;#NWuW(v7N|6rkTvXJGk!z-=hkssPg*YN0zxHRA? z^}4()-EKNt#?6Ai4<9Q_haScby(>8T1?8ieS00-E>Hu8#1<+p#2{K?(QJtM$9%Kes9_A2+RRQ4(N zoK*HJw=9(r`jcZs-VGGfp*Ge_5*(lj(^{h|-f`&>kPwSUoOl{1W_%KgkSKlA)@2n>*M3+H92UyyOJAv8rEKH!=kZi$k@k5kJ&9R@Fw6Yfd9!*1yhYKp(I zsh4G>rgey0-(+yQ(q(-DgYJiIZnhn`Ho{2u2Z>N$5x(l1! z>%5GcsP(*^xFWr?qzk_)czkMMF*82sI5E9%iWh0_GLlh(qs^B8;yKdQUlSKJi~aMt zjz|9hg|RuQ%MtlAUMS0qItcmt5U%5Cn&uV4a!Oy42IhPso+>pwd^_or_u;F-!Gi$+ zXElTQ#CKcYhA)}+OT3iZ(VwFD(1%E=F0`BC%^`AW~Tm51wQ0p`mCnuP&qm_3#v-d<`Ufnx6*&oh<9({{~(t+4=N+Wb%5^3MR&`%tKXq0ESt%M|3<1&3*uB}7XLkB<;7=U7LyZa0ic z`_6d{g>h$zC+7;Ac(yd+%$=Bn8#C@#Xm|Xj&gs6{C#2djCzPl)xcApzc1B*(x?5xe zav_mJXcz~BLJ19oh)hqZR?D22G7NBp6MPv;_j^2v3LO^ZIhYx;jzE1Lu?9@)fy#xa&T`}$3PZRc;h(_U zDhl}?67q$$2T&p>5Wyca(46`)Y!yh4jQSDY*rBE|groA3^?A>bK14Ng4sg8(xB&A8 zkSI(tWGspkAyn!@{YMlX0o@@|6BtiyyAtw2_N0Aj{0#k7IBFOzK3dmWdk%X~48e}k z*PY?HsBcqqkJn$P*+B#oxfk(h#&)2#=JqjQ25cH34QzeSS`n$-$-DAR5ta*}CXt3f zky4}M@~;_~Frp0*7gAeewuyT?x#%^$)mC)nuiS?rC(Z2_r5qAaamfE1$pnWUa4$Cs z)jd7f=ntENyIi^#O-Tmmc4H%Va#@Mw$>f5PE{3)SInH(L=}&kP{wpBHOqh;JOaB2y*AVJqyAE$jc%&1@uuqSkw!6IYjNFD$g>loswf_%>6OLgq>M*R) zOJ($?jIdr$O^yiu7tkx-7tUw$RBv;9SM+a2To|!MOy^8v>A!J#ru9@m<@yVF4nn4b zrpQq=qbSo!f%^L>Vlg{b$AthaSQ{s4df@(HdVsh{RvBiiR(3I83^NQy${jSAnb+3| zs343^ryll2N`Q#&e zZo;9w`8H!F_`DEFGO-3=q;s?UVh1+C4pmvb$+f2E-jpwe&tE+{{)gYL@Ilj2=U9FY{SOn#q;{1Uv6bQ6Z51?er5457O9WZD^0oWdh_kY ziR9ZJ99vJ=&PBkxj3uTb!pBG=m6+5y1B)q!i)@$Z8%feyqEyV)*O&ajjl5J6$P@8n!7>8nbH&Hg6BL@@;HxoP0{@ zq6YHT2tKGAMy7lpZES>!Pgn)9i1^(NecpkLQHA`r*%@WQl#Mgaf)<|n8ma2@%S}o0 zAfrw%|IY2ot-H5w&rH2jefLJCGIjGt_13h67r%Ss{=M7IbPw3M&xvSy>9xju7>HTf zLMAZs<2AXDhQ>Fdlp0}6zD#r8MBTZ{Vu}UBpUyQFbl=5?5QER58g(d~9G<+Z)upyY z9uDoy5Rd*2ihMS26`<0o+WDKs=Z!^t(NsjOU&`I6@iDD_YlxZt1Cry<7Yc(Tg^|ME z!r{VU`IJvC#<+X_s!=Y(SjFV0M>{EDsvqTm43wl0l3w|kd;uvzZElvIP}RdANWKP? y@0p!5e3LgJ_hn2!lV0JmwgrNPd?rm|;L1+yD)~TS&>BQOgn5v`Y@u)D&VK>kVxyJ- From eea5762e26a9a6ae2d9642830031028e5952af45 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 17:04:17 +0800 Subject: [PATCH 23/32] add checkpoint unittest --- .../fluid/tests/unittests/test_checkpoint.py | 72 ++++++++++++++++++ tools/codestyle/docstring_checker.pyc | Bin 0 -> 12561 bytes 2 files changed, 72 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/test_checkpoint.py create mode 100644 tools/codestyle/docstring_checker.pyc diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint.py b/python/paddle/fluid/tests/unittests/test_checkpoint.py new file mode 100644 index 0000000000000..b8d82c59b4e2c --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_checkpoint.py @@ -0,0 +1,72 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import unittest + + +class TestCheckpoint(unittest.TestCase): + def setUp(self): + self.dirname = "/tmp/ckpt" + self.max_num_checkpoints = 3 + self.epoch_interval = 1 + self.step_interval = 1 + self.trainer_id = 0 + self.chief = self.trainer_id == 0 + self.place = fluid.CPUPlace() + self.epoch_id = 100 + self.step_id = 20 + + def test_checkpoint(self): + self.save_checkpoint() + serial = fluid.io.get_latest_checkpoint_serial(self.dirname) + self.assertTrue(serial >= 0) + trainer_args = ["epoch_id", "step_id"] + epoch_id, step_id = fluid.io.load_trainer_args( + self.dirname, serial, self.trainer_id, trainer_args) + self.assertEqual(self.step_id, step_id) + self.assertEqual(self.epoch_id, epoch_id) + + program = fluid.Program() + with fluid.program_guard(program): + exe = fluid.Executor(self.place) + fluid.io.load_checkpoint(exe, self.dirname, serial, program) + + fluid.io.clean_checkpoint(self.dirname, delete_dir=True) + + def save_checkpoint(self): + config = fluid.CheckpointConfig(self.dirname, self.max_num_checkpoints, + self.epoch_interval, self.step_interval) + + trainer_args = {} + trainer_args["epoch_id"] = self.epoch_id + trainer_args["step_id"] = self.step_id + + program = fluid.Program() + with fluid.program_guard(program): + program.global_block().create_var( + name="scale_0", + psersistable=True, + dtype="float32", + shape=[32, 32]) + + exe = fluid.Executor(self.place) + for i in xrange(10): + fluid.io.save_checkpoint( + exe, config.checkpoint_dir, self.trainer_id, self.chief, + trainer_args, program, config.max_num_checkpoints) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a27d3c9a8cccab8552d510578debb2df04eb53bb GIT binary patch literal 12561 zcmdT~%WoUU8J{I7N~9&rPx%oi-uOw^rX)Y&B&yDAw|-TX1?xj-N!7Warm=5bYj{N50^(C+V(2Hze+imT+b{yJug6!5>yc9NV zH>}%c|-bj7cn3o>=@Nnkurg-h%;-x9IB_yO%9{kseL9wF{k5 z(pgP9V*)px+w3Nfp1^NnrV?sO3cZ5b*n1kYj>Wa+;zyo8cGvtrf8pwU&3D_Q^{_Fv z=*MG=;h2uz%QP)JX z%ESef>E`Q%@rRw)@aX#}JkSJ40i_TbNhHf_XClMO0$BUR*40xSEj*&ytKSSK*#X5HqD5N+-*~tOdAmPz0z)XssT2wa8o|d$kuV zY>O+bR{f?QSF6XlDeg>J!&Zs|p5{7xrcf+)2pzON*==Nc8`<7QPEGpqPE~HZQXjap;KBS9)Z5cSyI%a3`ND`oLViZM`P=n*s;#+9nybV24qje+-}7YM7idHdS`*xPDk z%5E0DxGH-a#bGq>SXp^%(AsMit(}=V(MUQWoeEiA!10aneS(p|;WDJJQjZ~5hKS_V zJ_{I~p8K3w27Z@;adh{oIIAA@5&Muf4=okFt5%MyrkPAe@?!EFvkWPe^VE(QB;IM&)FRgwsrajM^2|QyuX|cAw1Z%+JAu%Bh_xau#(?mQCP&H^ot;DkHCbmRhq_mb zERy3*IJ*g;QtO;^!Xf$$_%o>A869#(2w<90a%*v?Tr;#NWuW(v7N|6rkTvXJGk!z-=hkssPg*YN0zxHRA? z^}4()-EKNt#?6Ai4<9Q_haScby(>8T1?8ieS00-E>Hu8#1<+p#2{K?(QJtM$9%Kes9_A2+RRQ4(N zoK*HJw=9(r`jcZs-VGGfp*Ge_5*(lj(^{h|-f`&>kPwSUoOl{1W_%KgkSKlA)@2n>*M3+H92UyyOJAv8rEKH!=kZi$k@k5kJ&9R@Fw6Yfd9!*1yhYKp(I zsh4G>rgey0-(+yQ(q(-DgYJiIZnhn`Ho{2u2Z>N$5x(l1! z>%5GcsP(*^xFWr?qzk_)czkMMF*82sI5E9%iWh0_GLlh(qs^B8;yKdQUlSKJi~aMt zjz|9hg|RuQ%MtlAUMS0qItcmt5U%5Cn&uV4a!Oy42IhPso+>pwd^_or_u;F-!Gi$+ zXElTQ#CKcYhA)}+OT3iZ(VwFD(1%E=F0`BC%^`AW~Tm51wQ0p`mCnuP&qm_3#v-d<`Ufnx6*&oh<9({{~(t+4=N+Wb%5^3MR&`%tKXq0ESt%M|3<1&3*uB}7XLkB<;7=U7LyZa0ic z`_6d{g>h$zC+7;Ac(yd+%$=Bn8#C@#Xm|Xj&gs6{C#2djCzPl)xcApzc1B*(x?5xe zav_mJXcz~BLJ19oh)hqZR?D22G7NBp6MPv;_j^2v3LO^ZIhYx;jzE1Lu?9@)fy#xa&T`}$3PZRc;h(_U zDhl}?67q$$2T&p>5Wyca(46`)Y!yh4jQSDY*rBE|groA3^?A>bK14Ng4sg8(xB&A8 zkSI(tWGspkAyn!@{YMlX0o@@|6BtiyyAtw2_N0Aj{0#k7IBFOzK3dmWdk%X~48e}k z*PY?HsBcqqkJn$P*+B#oxfk(h#&)2#=JqjQ25cH34QzeSS`n$-$-DAR5ta*}CXt3f zky4}M@~;_~Frp0*7gAeewuyT?x#%^$)mC)nuiS?rC(Z2_r5qAaamfE1$pnWUa4$Cs z)jd7f=ntENyIi^#O-Tmmc4H%Va#@Mw$>f5PE{3)SInH(L=}&kP{wpBHOqh;JOaB2y*AVJqyAE$jc%&1@uuqSkw!6IYjNFD$g>loswf_%>6OLgq>M*R) zOJ($?jIdr$O^yiu7tkx-7tUw$RBv;9SM+a2To|!MOy^8v>A!J#ru9@m<@yVF4nn4b zrpQq=qbSo!f%^L>Vlg{b$AthaSQ{s4df@(HdVsh{RvBiiR(3I83^NQy${jSAnb+3| zs343^ryll2N`Q#&e zZo;9w`8H!F_`DEFGO-3=q;s?UVh1+C4pmvb$+f2E-jpwe&tE+{{)gYL@Ilj2=U9FY{SOn#q;{1Uv6bQ6Z51?er5457O9WZD^0oWdh_kY ziR9ZJ99vJ=&PBkxj3uTb!pBG=m6+5y1B)q!i)@$Z8%feyqEyV)*O&ajjl5J6$P@8n!7>8nbH&Hg6BL@@;HxoP0{@ zq6YHT2tKGAMy7lpZES>!Pgn)9i1^(NecpkLQHA`r*%@WQl#Mgaf)<|n8ma2@%S}o0 zAfrw%|IY2ot-H5w&rH2jefLJCGIjGt_13h67r%Ss{=M7IbPw3M&xvSy>9xju7>HTf zLMAZs<2AXDhQ>Fdlp0}6zD#r8MBTZ{Vu}UBpUyQFbl=5?5QER58g(d~9G<+Z)upyY z9uDoy5Rd*2ihMS26`<0o+WDKs=Z!^t(NsjOU&`I6@iDD_YlxZt1Cry<7Yc(Tg^|ME z!r{VU`IJvC#<+X_s!=Y(SjFV0M>{EDsvqTm43wl0l3w|kd;uvzZElvIP}RdANWKP? y@0p!5e3LgJ_hn2!lV0JmwgrNPd?rm|;L1+yD)~TS&>BQOgn5v`Y@u)D&VK>kVxyJ- literal 0 HcmV?d00001 From 951fa7441c7eff3596735ac55dda01288870aab6 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 17:04:45 +0800 Subject: [PATCH 24/32] add checkpoint unittest --- tools/codestyle/docstring_checker.pyc | Bin 12561 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tools/codestyle/docstring_checker.pyc diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc deleted file mode 100644 index a27d3c9a8cccab8552d510578debb2df04eb53bb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12561 zcmdT~%WoUU8J{I7N~9&rPx%oi-uOw^rX)Y&B&yDAw|-TX1?xj-N!7Warm=5bYj{N50^(C+V(2Hze+imT+b{yJug6!5>yc9NV zH>}%c|-bj7cn3o>=@Nnkurg-h%;-x9IB_yO%9{kseL9wF{k5 z(pgP9V*)px+w3Nfp1^NnrV?sO3cZ5b*n1kYj>Wa+;zyo8cGvtrf8pwU&3D_Q^{_Fv z=*MG=;h2uz%QP)JX z%ESef>E`Q%@rRw)@aX#}JkSJ40i_TbNhHf_XClMO0$BUR*40xSEj*&ytKSSK*#X5HqD5N+-*~tOdAmPz0z)XssT2wa8o|d$kuV zY>O+bR{f?QSF6XlDeg>J!&Zs|p5{7xrcf+)2pzON*==Nc8`<7QPEGpqPE~HZQXjap;KBS9)Z5cSyI%a3`ND`oLViZM`P=n*s;#+9nybV24qje+-}7YM7idHdS`*xPDk z%5E0DxGH-a#bGq>SXp^%(AsMit(}=V(MUQWoeEiA!10aneS(p|;WDJJQjZ~5hKS_V zJ_{I~p8K3w27Z@;adh{oIIAA@5&Muf4=okFt5%MyrkPAe@?!EFvkWPe^VE(QB;IM&)FRgwsrajM^2|QyuX|cAw1Z%+JAu%Bh_xau#(?mQCP&H^ot;DkHCbmRhq_mb zERy3*IJ*g;QtO;^!Xf$$_%o>A869#(2w<90a%*v?Tr;#NWuW(v7N|6rkTvXJGk!z-=hkssPg*YN0zxHRA? z^}4()-EKNt#?6Ai4<9Q_haScby(>8T1?8ieS00-E>Hu8#1<+p#2{K?(QJtM$9%Kes9_A2+RRQ4(N zoK*HJw=9(r`jcZs-VGGfp*Ge_5*(lj(^{h|-f`&>kPwSUoOl{1W_%KgkSKlA)@2n>*M3+H92UyyOJAv8rEKH!=kZi$k@k5kJ&9R@Fw6Yfd9!*1yhYKp(I zsh4G>rgey0-(+yQ(q(-DgYJiIZnhn`Ho{2u2Z>N$5x(l1! z>%5GcsP(*^xFWr?qzk_)czkMMF*82sI5E9%iWh0_GLlh(qs^B8;yKdQUlSKJi~aMt zjz|9hg|RuQ%MtlAUMS0qItcmt5U%5Cn&uV4a!Oy42IhPso+>pwd^_or_u;F-!Gi$+ zXElTQ#CKcYhA)}+OT3iZ(VwFD(1%E=F0`BC%^`AW~Tm51wQ0p`mCnuP&qm_3#v-d<`Ufnx6*&oh<9({{~(t+4=N+Wb%5^3MR&`%tKXq0ESt%M|3<1&3*uB}7XLkB<;7=U7LyZa0ic z`_6d{g>h$zC+7;Ac(yd+%$=Bn8#C@#Xm|Xj&gs6{C#2djCzPl)xcApzc1B*(x?5xe zav_mJXcz~BLJ19oh)hqZR?D22G7NBp6MPv;_j^2v3LO^ZIhYx;jzE1Lu?9@)fy#xa&T`}$3PZRc;h(_U zDhl}?67q$$2T&p>5Wyca(46`)Y!yh4jQSDY*rBE|groA3^?A>bK14Ng4sg8(xB&A8 zkSI(tWGspkAyn!@{YMlX0o@@|6BtiyyAtw2_N0Aj{0#k7IBFOzK3dmWdk%X~48e}k z*PY?HsBcqqkJn$P*+B#oxfk(h#&)2#=JqjQ25cH34QzeSS`n$-$-DAR5ta*}CXt3f zky4}M@~;_~Frp0*7gAeewuyT?x#%^$)mC)nuiS?rC(Z2_r5qAaamfE1$pnWUa4$Cs z)jd7f=ntENyIi^#O-Tmmc4H%Va#@Mw$>f5PE{3)SInH(L=}&kP{wpBHOqh;JOaB2y*AVJqyAE$jc%&1@uuqSkw!6IYjNFD$g>loswf_%>6OLgq>M*R) zOJ($?jIdr$O^yiu7tkx-7tUw$RBv;9SM+a2To|!MOy^8v>A!J#ru9@m<@yVF4nn4b zrpQq=qbSo!f%^L>Vlg{b$AthaSQ{s4df@(HdVsh{RvBiiR(3I83^NQy${jSAnb+3| zs343^ryll2N`Q#&e zZo;9w`8H!F_`DEFGO-3=q;s?UVh1+C4pmvb$+f2E-jpwe&tE+{{)gYL@Ilj2=U9FY{SOn#q;{1Uv6bQ6Z51?er5457O9WZD^0oWdh_kY ziR9ZJ99vJ=&PBkxj3uTb!pBG=m6+5y1B)q!i)@$Z8%feyqEyV)*O&ajjl5J6$P@8n!7>8nbH&Hg6BL@@;HxoP0{@ zq6YHT2tKGAMy7lpZES>!Pgn)9i1^(NecpkLQHA`r*%@WQl#Mgaf)<|n8ma2@%S}o0 zAfrw%|IY2ot-H5w&rH2jefLJCGIjGt_13h67r%Ss{=M7IbPw3M&xvSy>9xju7>HTf zLMAZs<2AXDhQ>Fdlp0}6zD#r8MBTZ{Vu}UBpUyQFbl=5?5QER58g(d~9G<+Z)upyY z9uDoy5Rd*2ihMS26`<0o+WDKs=Z!^t(NsjOU&`I6@iDD_YlxZt1Cry<7Yc(Tg^|ME z!r{VU`IJvC#<+X_s!=Y(SjFV0M>{EDsvqTm43wl0l3w|kd;uvzZElvIP}RdANWKP? y@0p!5e3LgJ_hn2!lV0JmwgrNPd?rm|;L1+yD)~TS&>BQOgn5v`Y@u)D&VK>kVxyJ- From 3b5e3f9be4b97f15aac809b851cb328bbf424437 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 18:05:06 +0800 Subject: [PATCH 25/32] update checkpoint unittest --- python/paddle/fluid/tests/unittests/test_checkpoint.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint.py b/python/paddle/fluid/tests/unittests/test_checkpoint.py index b8d82c59b4e2c..150e8822d577b 100644 --- a/python/paddle/fluid/tests/unittests/test_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_checkpoint.py @@ -14,6 +14,7 @@ import paddle.fluid as fluid import unittest +import os class TestCheckpoint(unittest.TestCase): @@ -35,8 +36,8 @@ def test_checkpoint(self): trainer_args = ["epoch_id", "step_id"] epoch_id, step_id = fluid.io.load_trainer_args( self.dirname, serial, self.trainer_id, trainer_args) - self.assertEqual(self.step_id, step_id) - self.assertEqual(self.epoch_id, epoch_id) + self.assertEqual(self.step_id, int(step_id)) + self.assertEqual(self.epoch_id, int(epoch_id)) program = fluid.Program() with fluid.program_guard(program): @@ -44,6 +45,7 @@ def test_checkpoint(self): fluid.io.load_checkpoint(exe, self.dirname, serial, program) fluid.io.clean_checkpoint(self.dirname, delete_dir=True) + self.assertFalse(os.path.isdir(self.dirname)) def save_checkpoint(self): config = fluid.CheckpointConfig(self.dirname, self.max_num_checkpoints, From 6db240d78b3b515a1b2d885e8cc6d8e0b2ffd638 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 19:25:55 +0800 Subject: [PATCH 26/32] update trainer about epoch_id and step id --- python/paddle/fluid/trainer.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index fbdd28f53efcd..4ffc2064581b7 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -188,7 +188,7 @@ def __init__(self, if not self.checkpoint.is_pserver: epoch_id, step_id = io.load_trainer_args( self.checkpoint.checkpoint_dir, self.checkpoint.load_serial, - self.trainer_id, ["epoch_id", "step_id"]) + self.trainer_id, self._get_checkpoint_load_args()) self.checkpoint.epoch_id = int(epoch_id) self.checkpoint.step_id = int(step_id) @@ -432,22 +432,33 @@ def _clean_checkpoint(self): return io.clean_checkpoint(checkpoint_dir=self.checkpoint.checkpoint_dir) + def _get_checkpoint_load_args(self): + """ + epoch_id and step_id are runtime arguments, they are not variables, will load them independently. + """ + return ["epoch_id", "step_id"] + + def _get_checkpoint_save_args(self, epoch_id, step_id): + """ + epoch_id and step_id are runtime arguments, they are not variables, will save them independently. + """ + trainer_args = {} + trainer_args["epoch_id"] = epoch_id + trainer_args["step_id"] = step_id + return trainer_args + def _save_checkpoint(self, epoch_id, step_id): if not self.checkpoint: return if epoch_id % self.checkpoint.epoch_interval == 0 and step_id % self.checkpoint.step_interval == 0: - trainer_args = {} - trainer_args["epoch_id"] = epoch_id - trainer_args["step_id"] = step_id - exe = executor.Executor(self.place) io.save_checkpoint( executor=exe, checkpoint_dir=self.checkpoint.checkpoint_dir, trainer_id=self.trainer_id, is_chief=self.chief, - trainer_args=trainer_args, + trainer_args=self._get_checkpoint_save_args(epoch_id, step_id), main_program=self.train_program, max_num_checkpoints=self.checkpoint.max_num_checkpoints) From f28f41dbcdb0479d98682b94eb13db95112de424 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 19:40:41 +0800 Subject: [PATCH 27/32] update io.py annotations and codes --- python/paddle/fluid/io.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 5abadc73f76b5..8fcc7787091b4 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -483,11 +483,11 @@ def save_checkpoint(executor, :param main_program :param max_num_checkpoints """ - if checkpoint_dir is None: - raise ValueError("The values of 'checkpoint_dir' should not be None") + if checkpoint_dir.strip() is None: + raise ValueError("'checkpoint_dir' should not be None") - if trainer_args and not isinstance(trainer_args, dict): - raise TypeError("The type of 'trainer_args' should be dict") + if trainer_args: + assert isinstance(trainer_args, dict) if not os.path.isdir(checkpoint_dir): os.makedirs(checkpoint_dir) @@ -514,11 +514,11 @@ def load_checkpoint(executor, checkpoint_dir, serial, main_program): :param main_program """ - if checkpoint_dir is None: - raise ValueError("The values of 'checkpoint_dir' should not be None") + if checkpoint_dir.strip() is None: + raise ValueError("'checkpoint_dir' should not be None") if serial is None or serial < 0: - raise ValueError("The values of 'serial' should not be None or <0 ") + raise ValueError("'serial' should not be None or <0 ") if main_program is None: raise ValueError('main_program should not be None.') @@ -536,8 +536,8 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): :param delete_dir """ - if checkpoint_dir is None: - raise ValueError("The values of 'checkpoint_dir' should not be None") + if checkpoint_dir.strip() is None: + raise ValueError("'checkpoint_dir' should not be None") _lru_delete(checkpoint_dir, max_num_checkpoints=0) if delete_dir and not os.listdir(checkpoint_dir): @@ -590,8 +590,8 @@ def save_persist_vars_without_grad(executor, dirname, program): def save_trainer_args(dirname, trainer_id, trainer_args): - if not isinstance(trainer_args, dict): - raise TypeError("The type of 'trainer_args' should be dict") + assert isinstance(trainer_args, dict) + cur_dir = _get_trainer_dir(dirname, trainer_id) for name, value in trainer_args.iteritems(): @@ -602,12 +602,11 @@ def save_trainer_args(dirname, trainer_id, trainer_args): def load_trainer_args(checkpoint_dir, serial, trainer_id, trainer_args): + assert isinstance(trainer_args, list) + cur_dir = _get_serial_dir(checkpoint_dir, serial) cur_dir = _get_trainer_dir(cur_dir, trainer_id) - if not isinstance(trainer_args, list): - raise TypeError("The type of 'trainer_args' should be list") - ret_values = [] for arg in trainer_args: From 53409a29d889903ec1414d72f0455fe4ef6588a6 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Tue, 5 Jun 2018 22:00:30 +0800 Subject: [PATCH 28/32] code optimized --- python/paddle/fluid/trainer.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 4ffc2064581b7..9882d5cda04d2 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -141,14 +141,10 @@ def __init__(self, self.chief = True self.checkpoint = checkpoint_config if self.checkpoint: - if not isinstance(self.checkpoint, CheckpointConfig): - raise TypeError( - "The checkpoint_config shoule be an instance of CheckpointConfig" - ) - else: - serial = io.get_latest_checkpoint_serial( - self.checkpoint.checkpoint_dir) - self.checkpoint.load_serial = serial if serial >= 0 else None + assert isinstance(self.checkpoint, CheckpointConfig) + serial = io.get_latest_checkpoint_serial( + self.checkpoint.checkpoint_dir) + self.checkpoint.load_serial = serial if serial >= 0 else None self.scope = core.Scope() @@ -385,8 +381,8 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): else: metrics = exe.run(feed=data, fetch_list=[]) - event_handler(EndStepEvent(epoch_id, step_id, metrics)) self._save_checkpoint(epoch_id, step_id) + event_handler(EndStepEvent(epoch_id, step_id, metrics)) event_handler(EndEpochEvent(epoch_id)) self._clean_checkpoint() From 2f44585e831578b58b53ce5d4b6adcb0275530ce Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 6 Jun 2018 17:26:51 +0800 Subject: [PATCH 29/32] code optimized --- python/paddle/fluid/io.py | 42 +++++++------ .../fluid/tests/unittests/test_checkpoint.py | 3 +- python/paddle/fluid/trainer.py | 60 +++++++++---------- 3 files changed, 52 insertions(+), 53 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 8fcc7787091b4..34c527b62f4ee 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -476,14 +476,14 @@ def save_checkpoint(executor, to keep numbers of checkpoint directory, the numbers of checkpoint directory are max_num_checkpoints at most, The interval between two saved checkpoints must greater than save_interval_secs. - :param executor - :param checkpoint_dir - :param trainer_id - :param is_chief - :param main_program - :param max_num_checkpoints - """ - if checkpoint_dir.strip() is None: + :param executor executor for save the value + :param checkpoint_dir the checkpoint directory + :param trainer_id currect trainer id + :param is_chief if the trainer id equals 0, the is_chief will be true + :param main_program will save all variables in program + :param max_num_checkpoints will keep numbers of checkpoint serials not bigger than max_num_checkpoints + """ + if checkpoint_dir is None: raise ValueError("'checkpoint_dir' should not be None") if trainer_args: @@ -500,7 +500,7 @@ def save_checkpoint(executor, if is_chief: save_persist_vars_without_grad(executor, cur_dir, main_program) - _lru_delete(checkpoint_dir, max_num_checkpoints) + _scroll_delete(checkpoint_dir, max_num_checkpoints) def load_checkpoint(executor, checkpoint_dir, serial, main_program): @@ -508,13 +508,13 @@ def load_checkpoint(executor, checkpoint_dir, serial, main_program): Load checkpoint from a directory by executor, it will find the most recent saved checkpoint file and load it auto. - :param executor - :param checkpoint_dir - :param serial - :param main_program + :param executor executor for load the value + :param checkpoint_dir the checkpoint directory + :param serial the serial folder in checkpoint directory will be load + :param main_program will load all variables in program """ - if checkpoint_dir.strip() is None: + if checkpoint_dir is None: raise ValueError("'checkpoint_dir' should not be None") if serial is None or serial < 0: @@ -536,9 +536,9 @@ def clean_checkpoint(checkpoint_dir, delete_dir=False): :param delete_dir """ - if checkpoint_dir.strip() is None: + if checkpoint_dir is None: raise ValueError("'checkpoint_dir' should not be None") - _lru_delete(checkpoint_dir, max_num_checkpoints=0) + _scroll_delete(checkpoint_dir, max_num_checkpoints=0) if delete_dir and not os.listdir(checkpoint_dir): os.rmdir(checkpoint_dir) @@ -681,7 +681,7 @@ def _get_trainer_dir(dirname, trainer_id): return trainer_dir -def _lru_delete(dirname, max_num_checkpoints=3): +def _scroll_delete(dirname, max_num_checkpoints=3): dirs = os.listdir(dirname) serial_map = {} for serial in dirs: @@ -717,7 +717,7 @@ def get_latest_checkpoint_serial(checkpoint_dir): :param checkpoint_dir """ - if not checkpoint_dir.strip(): + if not checkpoint_dir: return -1 def has_success(checkpoint_dir, cur_dir): @@ -726,10 +726,8 @@ def has_success(checkpoint_dir, cur_dir): """ serial = _get_dir_serial(cur_dir) - if serial == -1: - return -1 - - if not os.path.isdir(os.path.join(checkpoint_dir, cur_dir)): + if serial == -1 or not os.path.isdir( + os.path.join(checkpoint_dir, cur_dir)): return -1 success_path = os.path.join( diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint.py b/python/paddle/fluid/tests/unittests/test_checkpoint.py index 150e8822d577b..cf70dfd448363 100644 --- a/python/paddle/fluid/tests/unittests/test_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_checkpoint.py @@ -15,11 +15,12 @@ import paddle.fluid as fluid import unittest import os +import tempfile class TestCheckpoint(unittest.TestCase): def setUp(self): - self.dirname = "/tmp/ckpt" + self.dirname = tempfile.mktemp() self.max_num_checkpoints = 3 self.epoch_interval = 1 self.step_interval = 1 diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 9882d5cda04d2..e5cec4c76af2f 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -132,19 +132,18 @@ def __init__(self, # 1. we need to generate a framework.Program by calling # program_func. Reference: fluid.program_guard in # test_word2vec.py - if not isinstance(optimizer, opt_module.Optimizer): - raise TypeError("The optimizer should be an instance of Optimizer") + assert isinstance(optimizer, opt_module.Optimizer) # config for checkpoint # only chief worker will save variables self.trainer_id = 0 self.chief = True - self.checkpoint = checkpoint_config - if self.checkpoint: - assert isinstance(self.checkpoint, CheckpointConfig) + self.checkpoint_cfg = checkpoint_config + if self.checkpoint_cfg: + assert isinstance(self.checkpoint_cfg, CheckpointConfig) serial = io.get_latest_checkpoint_serial( - self.checkpoint.checkpoint_dir) - self.checkpoint.load_serial = serial if serial >= 0 else None + self.checkpoint_cfg.checkpoint_dir) + self.checkpoint_cfg.load_serial = serial if serial >= 0 else None self.scope = core.Scope() @@ -174,19 +173,20 @@ def __init__(self, exe = executor.Executor(place) exe.run(self.startup_program) - if self.checkpoint and self.checkpoint.load_serial: + if self.checkpoint_cfg and self.checkpoint_cfg.load_serial: with self._prog_and_scope_guard(): exe = executor.Executor(place) - io.load_checkpoint(exe, self.checkpoint.checkpoint_dir, - self.checkpoint.load_serial, + io.load_checkpoint(exe, self.checkpoint_cfg.checkpoint_dir, + self.checkpoint_cfg.load_serial, self.startup_program) - if not self.checkpoint.is_pserver: + if not self.checkpoint_cfg.is_pserver: epoch_id, step_id = io.load_trainer_args( - self.checkpoint.checkpoint_dir, self.checkpoint.load_serial, - self.trainer_id, self._get_checkpoint_load_args()) - self.checkpoint.epoch_id = int(epoch_id) - self.checkpoint.step_id = int(step_id) + self.checkpoint_cfg.checkpoint_dir, + self.checkpoint_cfg.load_serial, self.trainer_id, + self._get_checkpoint_load_args()) + self.checkpoint_cfg.epoch_id = int(epoch_id) + self.checkpoint_cfg.step_id = int(step_id) if param_path and os.path.isdir(param_path): # load params from param_path into scope @@ -256,7 +256,7 @@ def _dist_transpile_if_necessary(self, optimize_ops, params_grads): t.transpile( self.trainer_id, pservers=pserver_endpoints, trainers=trainers) if training_role == "PSERVER": - if self.checkpoint: + if self.checkpoint_cfg: self.is_pserver = True self.train_program = t.get_pserver_program(current_endpoint) @@ -351,10 +351,10 @@ def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): self._train_by_any_executor(event_handler, exe, num_epochs, reader) def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): - if self.checkpoint: + if self.checkpoint_cfg: epochs = [ epoch_id for epoch_id in range(num_epochs) - if epoch_id >= self.checkpoint.epoch_id + if epoch_id >= self.checkpoint_cfg.epoch_id ] else: epochs = [epoch_id for epoch_id in range(num_epochs)] @@ -366,8 +366,8 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): self._clean_checkpoint() return - if self.checkpoint and self.checkpoint.load_serial \ - and self.checkpoint.step_id >= step_id and self.checkpoint.epoch_id == epoch_id: + if self.checkpoint_cfg and self.checkpoint_cfg.load_serial \ + and self.checkpoint_cfg.step_id >= step_id and self.checkpoint_cfg.epoch_id == epoch_id: continue begin_event = BeginStepEvent(epoch_id, step_id) @@ -381,10 +381,12 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): else: metrics = exe.run(feed=data, fetch_list=[]) - self._save_checkpoint(epoch_id, step_id) + if self.checkpoint_cfg: + self._save_checkpoint(epoch_id, step_id) event_handler(EndStepEvent(epoch_id, step_id, metrics)) event_handler(EndEpochEvent(epoch_id)) - self._clean_checkpoint() + if self.checkpoint_cfg: + self._clean_checkpoint() def _test_by_executor(self, reader, feed_order, fetch_list): with executor.scope_guard(self.scope): @@ -424,9 +426,8 @@ def _get_or_create_parallel_executor(self): return self._get_parallel_executor() def _clean_checkpoint(self): - if not self.checkpoint: - return - io.clean_checkpoint(checkpoint_dir=self.checkpoint.checkpoint_dir) + assert self.checkpoint_cfg + io.clean_checkpoint(checkpoint_dir=self.checkpoint_cfg.checkpoint_dir) def _get_checkpoint_load_args(self): """ @@ -444,19 +445,18 @@ def _get_checkpoint_save_args(self, epoch_id, step_id): return trainer_args def _save_checkpoint(self, epoch_id, step_id): - if not self.checkpoint: - return + assert self.checkpoint_cfg - if epoch_id % self.checkpoint.epoch_interval == 0 and step_id % self.checkpoint.step_interval == 0: + if epoch_id % self.checkpoint_cfg.epoch_interval == 0 and step_id % self.checkpoint_cfg.step_interval == 0: exe = executor.Executor(self.place) io.save_checkpoint( executor=exe, - checkpoint_dir=self.checkpoint.checkpoint_dir, + checkpoint_dir=self.checkpoint_cfg.checkpoint_dir, trainer_id=self.trainer_id, is_chief=self.chief, trainer_args=self._get_checkpoint_save_args(epoch_id, step_id), main_program=self.train_program, - max_num_checkpoints=self.checkpoint.max_num_checkpoints) + max_num_checkpoints=self.checkpoint_cfg.max_num_checkpoints) def build_feed_var_list(program, feed_order): From 7fbddaa64a086d1cd9bf3a9811b2b153918ed84a Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 6 Jun 2018 20:41:21 +0800 Subject: [PATCH 30/32] bug fix --- python/paddle/fluid/trainer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 444162664daeb..5230ded7db1ff 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -365,7 +365,8 @@ def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): if self.__stop: - self._clean_checkpoint() + if self.checkpoint_cfg: + self._clean_checkpoint() return if self.checkpoint_cfg and self.checkpoint_cfg.load_serial \ From 9e026a93cff29f1d49fac900b3110968da8594cf Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Thu, 7 Jun 2018 16:59:53 +0800 Subject: [PATCH 31/32] remove chief --- python/paddle/fluid/io.py | 6 ++---- python/paddle/fluid/trainer.py | 5 +---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 34c527b62f4ee..6323c9899e008 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -466,7 +466,6 @@ def get_parameter_value_by_name(name, executor, program=None): def save_checkpoint(executor, checkpoint_dir, trainer_id, - is_chief=False, trainer_args=None, main_program=None, max_num_checkpoints=3): @@ -478,8 +477,7 @@ def save_checkpoint(executor, :param executor executor for save the value :param checkpoint_dir the checkpoint directory - :param trainer_id currect trainer id - :param is_chief if the trainer id equals 0, the is_chief will be true + :param trainer_id currect trainer id, if id is equal to 0, the trainer is chief :param main_program will save all variables in program :param max_num_checkpoints will keep numbers of checkpoint serials not bigger than max_num_checkpoints """ @@ -497,7 +495,7 @@ def save_checkpoint(executor, save_trainer_args(cur_dir, trainer_id, trainer_args) - if is_chief: + if trainer_id == 0: save_persist_vars_without_grad(executor, cur_dir, main_program) _scroll_delete(checkpoint_dir, max_num_checkpoints) diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 5230ded7db1ff..2737f1c70d6d3 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -136,7 +136,6 @@ def __init__(self, # config for checkpoint # only chief worker will save variables self.trainer_id = 0 - self.chief = True self.checkpoint_cfg = checkpoint_config if self.checkpoint_cfg: assert isinstance(self.checkpoint_cfg, CheckpointConfig) @@ -201,7 +200,6 @@ def _transpile_nccl2_dist(self): self.nccl_id_var = None else: self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) - self.chief = self.trainer_id == 0 port = os.getenv("PADDLE_PSERVER_PORT") worker_ips = os.getenv("PADDLE_TRAINER_IPS") worker_endpoints = [] @@ -250,7 +248,7 @@ def _dist_transpile_if_necessary(self, optimize_ops, params_grads): # the unique trainer id, starting from 0, needed by trainer # only self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - self.chief = self.trainer_id == 0 + # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") with self._prog_and_scope_guard(): @@ -456,7 +454,6 @@ def _save_checkpoint(self, epoch_id, step_id): executor=exe, checkpoint_dir=self.checkpoint_cfg.checkpoint_dir, trainer_id=self.trainer_id, - is_chief=self.chief, trainer_args=self._get_checkpoint_save_args(epoch_id, step_id), main_program=self.train_program, max_num_checkpoints=self.checkpoint_cfg.max_num_checkpoints) From 5c8397a88fe6b062be0c0725bbd14a2c8d4fc2e9 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Fri, 8 Jun 2018 19:49:10 +0800 Subject: [PATCH 32/32] remove chief in test --- python/paddle/fluid/tests/unittests/test_checkpoint.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint.py b/python/paddle/fluid/tests/unittests/test_checkpoint.py index cf70dfd448363..e22400a045ced 100644 --- a/python/paddle/fluid/tests/unittests/test_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_checkpoint.py @@ -66,9 +66,9 @@ def save_checkpoint(self): exe = fluid.Executor(self.place) for i in xrange(10): - fluid.io.save_checkpoint( - exe, config.checkpoint_dir, self.trainer_id, self.chief, - trainer_args, program, config.max_num_checkpoints) + fluid.io.save_checkpoint(exe, config.checkpoint_dir, + self.trainer_id, trainer_args, program, + config.max_num_checkpoints) if __name__ == '__main__':