From 97901beb9f6ebb2cfb1fc7d18025df6bbff366b7 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Thu, 26 Oct 2017 15:35:28 +0800
Subject: [PATCH 1/2] fix train ft

---
 demo/word2vec/{train.py => train_ft.py} | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
 rename demo/word2vec/{train.py => train_ft.py} (98%)

diff --git a/demo/word2vec/train.py b/demo/word2vec/train_ft.py
similarity index 98%
rename from demo/word2vec/train.py
rename to demo/word2vec/train_ft.py
index 429512ec..df02cce3 100644
--- a/demo/word2vec/train.py
+++ b/demo/word2vec/train_ft.py
@@ -14,7 +14,7 @@
 # then you can use different size of embedding.
 
 # NOTE: must change this to your own username on paddlecloud.
-USERNAME = "wuyi05@baidu.com"
+USERNAME = "your-username"
 DC = os.getenv("PADDLE_CLOUD_CURRENT_DATACENTER")
 common.DATA_HOME = "/pfs/%s/home/%s" % (DC, USERNAME)
 TRAIN_FILES_PATH = os.path.join(common.DATA_HOME, "imikolov")
@@ -63,7 +63,7 @@ def cluster_reader_recordio_from_master(etcd_endpoints):
     then read from cloud storage.
     '''
     TRAIN_FILES_PATTERN = os.path.join(TRAIN_FILES_PATH, "train-*")
-    return paddle.reader.creator.cloud_reader(TRAIN_FILES_PATTERN, etcd_endpoints)
+    return paddle.reader.creator.cloud_reader([TRAIN_FILES_PATTERN], etcd_endpoints)
 
 
 def wordemb(inlayer):
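The one-line change above is the whole of the first patch: paddle.reader.creator.cloud_reader takes a list of file patterns as its first argument, so the single glob string must be wrapped in a list, as the patch's own call sites show. A minimal sketch of the corrected call; the path and the etcd endpoint below are illustrative stand-ins, not values from a real cluster:

    import os
    import paddle.v2 as paddle

    # Illustrative values; on paddlecloud these are derived from the environment.
    TRAIN_FILES_PATH = "/pfs/dc1/home/your-username/imikolov"
    TRAIN_FILES_PATTERN = os.path.join(TRAIN_FILES_PATH, "train-*")
    etcd_endpoints = "http://127.0.0.1:2379"

    # Note the brackets: cloud_reader expects a list of patterns, not a bare string.
    reader_creator = paddle.reader.creator.cloud_reader([TRAIN_FILES_PATTERN],
                                                        etcd_endpoints)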
From 1b8d9aacaf66aa8abe922207992997c22ee4d1ae Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Thu, 26 Oct 2017 17:26:51 +0800
Subject: [PATCH 2/2] update w2v demo

---
 demo/word2vec/train.py    | 145 ++++++++++++++++++++++++++++++++++++++
 demo/word2vec/train_ft.py |  56 ++++-----------
 2 files changed, 159 insertions(+), 42 deletions(-)
 create mode 100644 demo/word2vec/train.py

diff --git a/demo/word2vec/train.py b/demo/word2vec/train.py
new file mode 100644
index 00000000..9f3d4186
--- /dev/null
+++ b/demo/word2vec/train.py
@@ -0,0 +1,145 @@
+import math
+import pickle
+import glob
+import os
+import sys
+import paddle.v2 as paddle
+import paddle.v2.dataset.common as common
+
+embsize = 32
+hiddensize = 256
+N = 5
+
+# NOTE: You need to generate and split the dataset, then put it under your
+# cloud storage; after that you can use different sizes of embedding.
+
+# NOTE: must change this to your own username on paddlecloud.
+USERNAME = "your-username"
+DC = os.getenv("PADDLE_CLOUD_CURRENT_DATACENTER")
+common.DATA_HOME = "/pfs/%s/home/%s" % (DC, USERNAME)
+TRAIN_FILES_PATH = os.path.join(common.DATA_HOME, "imikolov", "imikolov_train-*")
+WORD_DICT_PATH = os.path.join(common.DATA_HOME, "imikolov/word_dict.pickle")
+
+TRAINER_ID = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))
+TRAINER_COUNT = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS", "-1"))
+
+def prepare_dataset():
+    word_dict = paddle.dataset.imikolov.build_dict()
+    with open(WORD_DICT_PATH, "w") as fn:
+        pickle.dump(word_dict, fn)
+    # NOTE: convert should be done by another job.
+
+def cluster_reader_recordio(trainer_id, trainer_count):
+    '''
+    Read from a cloud dataset stored in RecordIO format;
+    each trainer will read a subset of the files of the whole dataset.
+    '''
+    import recordio
+    def reader():
+        file_list = glob.glob(TRAIN_FILES_PATH)
+        file_list.sort()
+        my_file_list = []
+        # pick the files belonging to the current trainer_id
+        for idx, f in enumerate(file_list):
+            if idx % trainer_count == trainer_id:
+                my_file_list.append(f)
+        for f in my_file_list:
+            print "processing ", f
+            rec_reader = recordio.reader(f)  # renamed to avoid shadowing reader()
+            record_raw = rec_reader.read()
+            while record_raw:
+                yield pickle.loads(record_raw)
+                record_raw = rec_reader.read()
+            rec_reader.close()
+    return reader
+
+def wordemb(inlayer):
+    wordemb = paddle.layer.table_projection(
+        input=inlayer,
+        size=embsize,
+        param_attr=paddle.attr.Param(
+            name="_proj",
+            initial_std=0.001,
+            learning_rate=1,
+            l2_rate=0, ))
+    return wordemb
+
+
+def main():
+    paddle.init(use_gpu=False, trainer_count=1)
+    # load the word dict from cloud storage
+    with open(WORD_DICT_PATH) as fn:
+        word_dict = pickle.load(fn)
+    dict_size = len(word_dict)
+    firstword = paddle.layer.data(
+        name="firstw", type=paddle.data_type.integer_value(dict_size))
+    secondword = paddle.layer.data(
+        name="secondw", type=paddle.data_type.integer_value(dict_size))
+    thirdword = paddle.layer.data(
+        name="thirdw", type=paddle.data_type.integer_value(dict_size))
+    fourthword = paddle.layer.data(
+        name="fourthw", type=paddle.data_type.integer_value(dict_size))
+    nextword = paddle.layer.data(
+        name="fifthw", type=paddle.data_type.integer_value(dict_size))
+
+    Efirst = wordemb(firstword)
+    Esecond = wordemb(secondword)
+    Ethird = wordemb(thirdword)
+    Efourth = wordemb(fourthword)
+
+    contextemb = paddle.layer.concat(input=[Efirst, Esecond, Ethird, Efourth])
+    hidden1 = paddle.layer.fc(
+        input=contextemb,
+        size=hiddensize,
+        act=paddle.activation.Sigmoid(),
+        layer_attr=paddle.attr.Extra(drop_rate=0.5),
+        bias_attr=paddle.attr.Param(learning_rate=2),
+        param_attr=paddle.attr.Param(
+            initial_std=1. / math.sqrt(embsize * 8), learning_rate=1))
+    predictword = paddle.layer.fc(
+        input=hidden1,
+        size=dict_size,
+        bias_attr=paddle.attr.Param(learning_rate=2),
+        act=paddle.activation.Softmax())
+
+    def event_handler(event):
+        if isinstance(event, paddle.event.EndIteration):
+            if event.batch_id % 100 == 0:
+                result = trainer.test(
+                    paddle.batch(
+                        # NOTE: if you're going to use cluster test files,
+                        # prepare them on the storage first
+                        paddle.dataset.imikolov.test(word_dict, N), 32))
+                print "Pass %d, Batch %d, Cost %f, %s, Testing metrics %s" % (
+                    event.pass_id, event.batch_id, event.cost, event.metrics,
+                    result.metrics)
+
+    cost = paddle.layer.classification_cost(input=predictword, label=nextword)
+    parameters = paddle.parameters.create(cost)
+    adam_optimizer = paddle.optimizer.Adam(
+        learning_rate=3e-3,
+        regularization=paddle.optimizer.L2Regularization(8e-4))
+    trainer = paddle.trainer.SGD(cost,
+                                 parameters,
+                                 adam_optimizer)
+
+    trainer.train(
+        paddle.batch(cluster_reader_recordio(TRAINER_ID, TRAINER_COUNT), 32),
+        num_passes=30,
+        event_handler=event_handler)
+
+
+if __name__ == '__main__':
+    usage = "python train.py [prepare|train]"
+    if len(sys.argv) != 2:
+        print usage
+        exit(1)
+
+    if TRAINER_ID == -1 or TRAINER_COUNT == -1:
+        print "no cloud environment found; this demo must run on paddlecloud"
+        exit(1)
+
+    if sys.argv[1] == "prepare":
+        prepare_dataset()
+    elif sys.argv[1] == "train":
+        main()
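The new train.py above keeps the glob-based reader: every trainer lists the same sorted shard files and takes every trainer_count-th one, so the resulting shards are disjoint and together cover the whole dataset. A self-contained sketch of just that assignment logic, with file names invented for illustration:

    def shard(file_list, trainer_id, trainer_count):
        # trainer k of n takes files k, k+n, k+2n, ... of the sorted list
        return [f for idx, f in enumerate(sorted(file_list))
                if idx % trainer_count == trainer_id]

    files = ["imikolov_train-%05d" % i for i in range(10)]
    assert shard(files, 0, 4) == ["imikolov_train-00000",
                                  "imikolov_train-00004",
                                  "imikolov_train-00008"]
    assert shard(files, 3, 4) == ["imikolov_train-00003", "imikolov_train-00007"]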
diff --git a/demo/word2vec/train_ft.py b/demo/word2vec/train_ft.py
index df02cce3..a6737dab 100644
--- a/demo/word2vec/train_ft.py
+++ b/demo/word2vec/train_ft.py
@@ -5,6 +5,7 @@
 import sys
 import paddle.v2 as paddle
 import paddle.v2.dataset.common as common
+from paddle.reader.creator import cloud_reader
 
 embsize = 32
 hiddensize = 256
@@ -17,54 +18,21 @@
 USERNAME = "your-username"
 DC = os.getenv("PADDLE_CLOUD_CURRENT_DATACENTER")
 common.DATA_HOME = "/pfs/%s/home/%s" % (DC, USERNAME)
-TRAIN_FILES_PATH = os.path.join(common.DATA_HOME, "imikolov")
+TRAIN_FILES_PATH = os.path.join(common.DATA_HOME, "imikolov", "imikolov_train-*")
 WORD_DICT_PATH = os.path.join(common.DATA_HOME, "imikolov/word_dict.pickle")
 
 TRAINER_ID = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))
 TRAINER_COUNT = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS", "-1"))
 
+etcd_ip = os.getenv("ETCD_IP")
+etcd_endpoint = "http://" + etcd_ip + ":" + "2379"
+trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))
+
 def prepare_dataset():
     word_dict = paddle.dataset.imikolov.build_dict()
     with open(WORD_DICT_PATH, "w") as fn:
         pickle.dump(word_dict, fn)
-    # convert will also split the dataset by line-count
-    common.convert(TRAIN_FILES_PATH,
-                   paddle.dataset.imikolov.train(word_dict, N),
-                   1000, "train")
-
-def cluster_reader_recordio(trainer_id, trainer_count):
-    '''
-    read from cloud dataset which is stored as recordio format
-    each trainer will read a subset of files of the whole dataset.
-    '''
-    import recordio
-    def reader():
-        TRAIN_FILES_PATTERN = os.path.join(TRAIN_FILES_PATH, "train-*")
-        file_list = glob.glob(TRAIN_FILES_PATTERN)
-        file_list.sort()
-        my_file_list = []
-        # read files for current trainer_id
-        for idx, f in enumerate(file_list):
-            if idx % trainer_count == trainer_id:
-                my_file_list.append(f)
-        for f in my_file_list:
-            print "processing ", f
-            reader = recordio.reader(f)
-            record_raw = reader.read()
-            while record_raw:
-                yield pickle.loads(record_raw)
-                record_raw = reader.read()
-            reader.close()
-    return reader
-
-def cluster_reader_recordio_from_master(etcd_endpoints):
-    '''
-    call paddle master's RPC to get recordio metadata,
-    then read from cloud storage.
-    '''
-    TRAIN_FILES_PATTERN = os.path.join(TRAIN_FILES_PATH, "train-*")
-    return paddle.reader.creator.cloud_reader([TRAIN_FILES_PATTERN], etcd_endpoints)
-
+    # NOTE: convert should be done by another job.
 
 def wordemb(inlayer):
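The removed common.convert call above is what used to produce the RecordIO shards; the new comment defers that work to a separate job. For reference, a minimal sketch of what such a conversion job writes and how it is read back, assuming the same recordio package the demos import (the path and samples are made up for illustration):

    import pickle
    import recordio

    # write pickled 5-gram samples, one record each, as the convert job would
    w = recordio.writer("/tmp/imikolov_train-00000")
    for sample in [(1, 2, 3, 4, 5), (2, 3, 4, 5, 6)]:
        w.write(pickle.dumps(sample))
    w.close()

    # read them back the same way cluster_reader_recordio does
    r = recordio.reader("/tmp/imikolov_train-00000")
    record_raw = r.read()
    while record_raw:
        print pickle.loads(record_raw)
        record_raw = r.read()
    r.close()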
@@ -132,10 +100,14 @@ def event_handler(event):
     adam_optimizer = paddle.optimizer.Adam(
         learning_rate=3e-3,
         regularization=paddle.optimizer.L2Regularization(8e-4))
-    trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer)
+    trainer = paddle.trainer.SGD(cost,
+                                 parameters,
+                                 adam_optimizer,
+                                 is_local=False,
+                                 pserver_spec=etcd_endpoint,
+                                 use_etcd=True)
 
     trainer.train(
-        # NOTE: use either cluster_reader_recordio or cluster_reader_recordio_from_master
-        paddle.batch(cluster_reader_recordio(TRAINER_ID, TRAINER_COUNT), 32),
+        paddle.batch(cloud_reader([TRAIN_FILES_PATH], etcd_endpoint), 32),
         num_passes=30,
         event_handler=event_handler)
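This final hunk is the heart of the second patch: instead of each trainer globbing its own shard of RecordIO files, the master hands out file chunks through etcd, and the trainer runs in non-local mode against parameter servers. A condensed sketch of the resulting wiring, assuming cost, parameters, adam_optimizer and event_handler are built exactly as in the demo (the keyword arguments mirror the patch itself; the endpoint and path values are illustrative):

    import os
    import paddle.v2 as paddle
    from paddle.reader.creator import cloud_reader

    # Assumes cost, parameters, adam_optimizer and event_handler exist as in
    # train_ft.py; only the trainer wiring differs from a local run.
    etcd_endpoint = "http://" + os.getenv("ETCD_IP") + ":2379"
    TRAIN_FILES_PATH = "/pfs/dc1/home/your-username/imikolov/imikolov_train-*"

    trainer = paddle.trainer.SGD(cost,
                                 parameters,
                                 adam_optimizer,
                                 is_local=False,             # ship gradients to pservers
                                 pserver_spec=etcd_endpoint,  # pservers found via etcd
                                 use_etcd=True)
    trainer.train(
        # the master dispatches file chunks matching the pattern to trainers
        paddle.batch(cloud_reader([TRAIN_FILES_PATH], etcd_endpoint), 32),
        num_passes=30,
        event_handler=event_handler)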