Experiment scripts #447
@@ -0,0 +1,3 @@
nohup.out
*.log
out/
@@ -0,0 +1,132 @@
#!/bin/bash
DEFAULT_JOBNAME_PREFIX="mnist"
CPU="10"
MEMORY="8Gi"
PSCPU="6"
PSMEMORY="5Gi"
JOB_COUNT=${JOB_COUNT:-1}
FAULT_TOLERANT=${FAULT_TOLERANT:-OFF}
PASSES=${PASSES:-1}
DETAILS=${DETAILS:-OFF}

function submit_general_job() {
    paddlecloud submit -jobname $1 \
        -cpu $CPU \
        -gpu 0 \
        -memory $MEMORY \
        -parallelism 20 \
        -pscpu $PSCPU \
        -pservers 10 \
        -psmemory $PSMEMORY \
        -entry "python ./train.py train" \
        ./mnist
}

function submit_ft_job() {
    paddlecloud submit -jobname $1 \
        -cpu $CPU \
        -gpu 0 \
        -memory $MEMORY \
        -parallelism 2 \
        -pscpu $PSCPU \
        -pservers 10 \
        -psmemory $PSMEMORY \
        -entry "python ./train_ft.py train" \
        -faulttolerant \
        ./mnist
    sleep 2
    sed "s/<jobname>/$1/g" k8s/trainingjob.yaml.tmpl | kubectl create -f -
}

function usage() {
    echo "usage: control_case1.sh <action>"
    echo "  action[required]: str[start|stop], will start or stop all the jobs."
    echo "env var:"
    echo "  JOB_COUNT[optional]: int, the number of jobs to submit, default is 1."
    echo "  FAULT_TOLERANT[optional]: str[ON|OFF], whether to submit fault-tolerant jobs, default is OFF."
    echo "  PASSES[optional]: int, the number of passes to run."
    echo "  DETAILS[optional]: str[ON|OFF], print detailed monitoring information."
}

function start() {
    echo "JOB_COUNT: "$JOB_COUNT
    echo "FAULT_TOLERANT: "$FAULT_TOLERANT
    echo "PASSES: "$PASSES
    echo "DETAILS: "$DETAILS
    # Following https://apple.stackexchange.com/a/193156,
    # we need to put GNU coreutils on the PATH on macOS (for stdbuf below).
    if [ $(uname) == "Darwin" ]
    then
        export PATH=/usr/local/opt/coreutils/libexec/gnubin:$PATH
    fi
    rm -rf ./out > /dev/null
    mkdir ./out > /dev/null
    rm -f ./experiment.log > /dev/null
    for ((pass=0; pass<$PASSES; pass++))
    do
        echo "Run pass "$pass
        PASSE_NUM=$pass FAULT_TOLERANT=$FAULT_TOLERANT JOB_COUNT=$JOB_COUNT \
            stdbuf -oL nohup python python/main.py run_case1 &> ./out/pass$pass.log &

        for ((j=0; j<$JOB_COUNT; j++))
        do
            if [ "$FAULT_TOLERANT" == "ON" ]
            then
                submit_ft_job $DEFAULT_JOBNAME_PREFIX$j $JOB_COUNT
            else
                submit_general_job $DEFAULT_JOBNAME_PREFIX$j $JOB_COUNT
            fi
            sleep 2
        done
        # wait for all jobs to finish
        python python/main.py wait_for_finished
        # stop all jobs
        stop
        # wait for all jobs to be cleaned up
        python python/main.py wait_for_cleaned
        # wait for the data collector to exit and write the pass file
        while true
        do
            FILE=./out/$DEFAULT_JOBNAME_PREFIX-pass$pass
            if [ ! -f $FILE ]; then
                echo "waiting for collector to exit and generate file "$FILE
                sleep 5
            else
                break
            fi
        done
    done
    python python/main.py generate_report
    rm -f ./out/$DEFAULT_JOBNAME_PREFIX-pass*
}

function stop() {
    for ((i=0; i<$JOB_COUNT; i++))
    do
        echo "kill" $DEFAULT_JOBNAME_PREFIX$i
        paddlecloud kill $DEFAULT_JOBNAME_PREFIX$i
        if [ "$FAULT_TOLERANT" == "ON" ]
        then
            sed "s/<jobname>/$DEFAULT_JOBNAME_PREFIX$i/g" k8s/trainingjob.yaml.tmpl | kubectl delete -f -
        fi
        sleep 2
    done
}

ACTION=${1}

case $ACTION in
    start)
        start
        ;;
    stop)
        stop
        ;;
    --help)
        usage
        ;;
    *)
        usage
        ;;
esac
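As a quick sanity check, the script is driven entirely through its environment variables; a minimal sketch, assuming paddlecloud and kubectl are already configured on the client (the values below are illustrative):

    # submit two fault-tolerant jobs and run three passes of test case 1
    JOB_COUNT=2 FAULT_TOLERANT=ON PASSES=3 ./control_case1.sh start

    # tear the same jobs down again
    JOB_COUNT=2 FAULT_TOLERANT=ON ./control_case1.sh stop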
@@ -0,0 +1,33 @@
apiVersion: paddlepaddle.org/v1
kind: TrainingJob
metadata:
  name: <jobname>
spec:
  image: "paddlepaddle/paddlecloud-job"
  port: 7164
  ports_num: 1
  ports_num_for_sparse: 1
  fault_tolerant: true
  trainer:
    entrypoint: "python train.py"
    workspace: "/home/job-1/"
    passes: 10
    min-instance: 2
    max-instance: 20
    resources:
      limits:
        cpu: "15"
        memory: "8Gi"
      requests:
        cpu: "10"
        memory: "8Gi"
  pserver:
    min-instance: 2
    max-instance: 2
    resources:
      limits:
        cpu: "9"
        memory: "5Gi"
      requests:
        cpu: "6"
        memory: "5Gi"
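For reference, control_case1.sh instantiates this template by substituting the <jobname> placeholder and piping the result to kubectl; a minimal sketch with an illustrative job name:

    # create the TrainingJob resource for one job
    sed "s/<jobname>/mnist0/g" k8s/trainingjob.yaml.tmpl | kubectl create -f -

    # the stop path deletes the same resource
    sed "s/<jobname>/mnist0/g" k8s/trainingjob.yaml.tmpl | kubectl delete -f -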
@@ -0,0 +1,160 @@
from PIL import Image

Review thread on this file:
- Do not copy this file, just reuse the file under …
- Very much agree that it's good to avoid duplicate code. However the … For example, I think we need to avoid only the truly duplicate code; otherwise, if two irrelevant components share the same code, it adds unnecessary coupling between the two components. Just my thoughts, open for discussion.
- Agree!
- Maybe we can change the folder name to a general name, such as …

import numpy as np
import paddle.v2 as paddle
import paddle.v2.dataset.common as common
import os
import sys
import glob
import pickle


DC = os.getenv("PADDLE_CLOUD_CURRENT_DATACENTER")
common.DATA_HOME = "/pfs/%s/public/idl/users/dl/paddlecloud/public/dataset" % DC

Review thread on this path:
- This special path is only used for the internal CPU cluster; I will change it to a general path before merging.
- Thanks!

TRAIN_FILES_PATH = os.path.join(common.DATA_HOME, "mnist")
TEST_FILES_PATH = os.path.join(common.DATA_HOME, "mnist")

TRAINER_ID = int(os.getenv("PADDLE_INIT_TRAINER_ID", "-1"))
NUM_GRADIENT_SERVERS = int(os.getenv("PADDLE_INIT_NUM_GRADIENT_SERVERS", "-1"))

def prepare_dataset():
    # convert will also split the dataset by line count
    common.convert(TRAIN_FILES_PATH,
                   paddle.dataset.mnist.train(),
                   2000, "train")
    #common.convert(TEST_FILES_PATH,
    #               paddle.dataset.mnist.test(),
    #               1, "test")

def cluster_reader_recordio(trainer_id, trainer_count, flag):
    '''
    Read from the cloud dataset, which is stored in RecordIO format;
    each trainer reads a subset of the files of the whole dataset.
    '''
    import recordio
    def reader():
        PATTERN_STR = "%s-[0-9]*" % flag
        FILES_PATTERN = os.path.join(TRAIN_FILES_PATH, PATTERN_STR)
        file_list = glob.glob(FILES_PATTERN)
        file_list.sort()
        my_file_list = []
        # pick the files that belong to the current trainer_id
        for idx, f in enumerate(file_list):
            if idx % trainer_count == trainer_id:
                my_file_list.append(f)
        for f in my_file_list:
            print "processing ", f
            r = recordio.reader(f)
            record_raw = r.read()
            while record_raw:
                yield pickle.loads(record_raw)
                record_raw = r.read()
            r.close()
    return reader
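# Illustration only (not part of the original file): the modulo rule above
# gives each trainer a disjoint subset of the shard files. With ten
# hypothetical shard names and four trainers, trainer 1 would read
# shards 1, 5, and 9.
def _sharding_example():
    file_list = ["train-%05d" % i for i in range(10)]  # hypothetical names
    trainer_id, trainer_count = 1, 4
    # same selection logic as cluster_reader_recordio
    return [f for idx, f in enumerate(file_list)
            if idx % trainer_count == trainer_id]
    # -> ['train-00001', 'train-00005', 'train-00009']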
def softmax_regression(img):
    predict = paddle.layer.fc(
        input=img, size=10, act=paddle.activation.Softmax())
    return predict


def multilayer_perceptron(img):
    # The first fully-connected layer
    hidden1 = paddle.layer.fc(input=img, size=128, act=paddle.activation.Relu())
    # The second fully-connected layer and the corresponding activation function
    hidden2 = paddle.layer.fc(
        input=hidden1, size=64, act=paddle.activation.Relu())
    # The third fully-connected layer; note that the hidden size should be 10,
    # which is the number of unique digits
    predict = paddle.layer.fc(
        input=hidden2, size=10, act=paddle.activation.Softmax())
    return predict


def convolutional_neural_network(img):
    # first conv layer
    conv_pool_1 = paddle.networks.simple_img_conv_pool(
        input=img,
        filter_size=5,
        num_filters=20,
        num_channel=1,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Relu())
    # second conv layer
    conv_pool_2 = paddle.networks.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        num_channel=20,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Relu())
    # fully-connected layer
    predict = paddle.layer.fc(
        input=conv_pool_2, size=10, act=paddle.activation.Softmax())
    return predict


def main():
    paddle.init(trainer_count=1)

    # define the network topology
    images = paddle.layer.data(
        name='pixel', type=paddle.data_type.dense_vector(784))
    label = paddle.layer.data(
        name='label', type=paddle.data_type.integer_value(10))

    # Here we can build the prediction network in different ways. Please
    # choose one by uncommenting the corresponding line.
    # predict = softmax_regression(images)
    # predict = multilayer_perceptron(images)
    predict = convolutional_neural_network(images)

    cost = paddle.layer.classification_cost(input=predict, label=label)

    parameters = paddle.parameters.create(cost)

    optimizer = paddle.optimizer.Momentum(
        learning_rate=0.1 / 128.0,
        momentum=0.9,
        regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128))

    trainer = paddle.trainer.SGD(
        cost=cost, parameters=parameters, update_equation=optimizer)

    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 100 == 0:
                print "Pass %d, Batch %d, Cost %f, %s" % (
                    event.pass_id, event.batch_id, event.cost, event.metrics)
        if isinstance(event, paddle.event.EndPass):
            result = trainer.test(
                reader=paddle.batch(
                    paddle.dataset.mnist.test(),
                    batch_size=2))
            print "Test with Pass %d, Cost %f, %s\n" % (
                event.pass_id, result.cost, result.metrics)

    trainer.train(
        reader=paddle.batch(
            cluster_reader_recordio(TRAINER_ID, NUM_GRADIENT_SERVERS, "train"),
            batch_size=128),
        event_handler=event_handler,
        num_passes=20)

if __name__ == '__main__':
    usage = "python train.py [prepare|train]"
    if len(sys.argv) != 2:
        print usage
        exit(1)

    if TRAINER_ID == -1 or NUM_GRADIENT_SERVERS == -1:
        print "no cloud environment found, must run on cloud"
        exit(1)

    if sys.argv[1] == "prepare":
        prepare_dataset()
    elif sys.argv[1] == "train":
        main()
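For reference, the file's two entry points, as implied by the usage string above and the -entry flag in control_case1.sh (paths illustrative):

    # one-off, on the cloud: shard the MNIST training set into RecordIO files
    python ./train.py prepare

    # per-trainer entrypoint, as passed via `paddlecloud submit -entry`
    python ./train.py train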
Review thread on submit_general_job:
- We can keep submit_general_job, but do we need to use it in the experiment?
- Following TestCase1 of the design, we will compare the general jobs against the fault-tolerant jobs, so maybe we need this function?
- I see. I meant that we currently use the fault-tolerant flag to indicate whether to scale the job. We could also run the experiment by simply not scaling the fault-tolerant job (but always starting a fault-tolerant job), so we would not need the general job. But that's fine; this is just a minor difference.