Skip to content

Commit

Permalink
Fix fetch record from master failed (#2848)
Browse files Browse the repository at this point in the history
Fix fetch record from master
  • Loading branch information
Yancey1989 authored Jul 17, 2017
1 parent 1032ef1 commit 83f263e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
14 changes: 11 additions & 3 deletions go/cmd/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/namsral/flag"
log "github.com/sirupsen/logrus"
"github.com/topicai/candy"

"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
Expand All @@ -20,11 +21,18 @@ func main() {
port := flag.Int("port", 8080, "port of the master server.")
ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
taskTimeoutDur := flag.Duration("task-timout-dur", 20*time.Minute, "task timout duration.")
taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
flag.Parse()

level, e := log.ParseLevel(*logLevel)
candy.Must(e)

log.SetLevel(level)

if *endpoints == "" {
log.Warningln("-endpoints not set, fault tolerance not be enabled.")
}
Expand Down
6 changes: 4 additions & 2 deletions python/paddle/v2/dataset/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,10 @@ def close_writers(w):
def write_data(w, lines):
random.shuffle(lines)
for i, d in enumerate(lines):
d = cPickle.dumps(d)
w[i % num_shards].write(d)
# FIXME(Yancey1989):
# dumps with protocol: pickle.HIGHEST_PROTOCOL
o = pickle.dumps(d)
w[i % num_shards].write(o)

w = open_writers()
lines = []
Expand Down

0 comments on commit 83f263e

Please sign in to comment.