
Design doc: master server #1953

Merged: 5 commits merged into PaddlePaddle:develop on May 12, 2017
Conversation

@helinwang (Contributor) commented May 2, 2017

Maybe it is easier to review here.

@@ -21,7 +21,7 @@

### File Preprocessing

Before a dataset can be used for training, the files need to be converted into PaddlePaddle's internal cluster storage format (SSTable) first. We provide two conversion methods:
Before a dataset can be used for training, the files need to be converted into PaddlePaddle's internal cluster storage format (RecordIO) first. We provide two conversion methods:

- A library for local conversion is provided to users, who can write a program to perform the conversion.
Collaborator:

  1. Users convert the data locally and then upload it
  2. Users upload the data first, then run the conversion program on the cluster

Contributor (Author):

Done.

@@ -0,0 +1,89 @@
# Design Doc: Master Process

For an overview of the master process's role, please refer to the [distributed training design doc](./README.md). In this design doc we will discuss the master process in more detail. The master will be implemented in [golang](https://golang.org/).
Collaborator:

in golang ==> in Go

Contributor (Author):

Done


<img src="src/dataset.png"/>

A dataset is represented by a list of files in *RecordIO* format on the distributed filesystem, each RecordIO file consists of multiple *blocks*, and each block has multiple data instances.
Collaborator:

A dataset is represented by a list of files in RecordIO format on the distributed filesystem,

==>

A dataset is a list of files in RecordIO format.

A dataset is itself a list of files, not representationally.

Collaborator:

each RecordIO file consists of multiple blocks, and each block has multiple data instances.

==>

A RecordIO file consists of chunks, whereas each chunk consists of some records.

It is chunks, not blocks. And RecordIO files consist of records; it is us/PaddlePaddle who treat records as data instances.

Contributor (Author):

Done.
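To make the terminology above concrete, here is a minimal, hypothetical sketch (in Go, since the master is written in Go) of how chunks and tasks could be described; the type and field names are illustrative, not the design doc's actual definitions.

```go
package dataset

// Chunk identifies one chunk inside a RecordIO file on the distributed
// filesystem. Names and fields are illustrative only.
type Chunk struct {
	Path  string // path of the RecordIO file
	Index int    // index of the chunk within that file
}

// Task is the master's unit of dispatch: a set of chunks handed to one
// trainer. A dataset is simply all chunks of all its RecordIO files.
type Task struct {
	ID     int
	Chunks []Chunk
}
```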


### Task Queue Creation

1. Each trainer will make an RPC call (using [golang rpc](https://golang.org/pkg/net/rpc/)) to the master process, telling it the RecordIO files representing the dataset specified by the user. Since every trainer will tell the master process the same dataset, only the first RPC call will be honored.
Collaborator:

golang => Go's rpc package

Contributor (Author):

Done.


The RPC interface is:
```go
func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error {
```
Contributor:

Can we define the "dataset" as a list of files under the same path? Then we may not need `Paths []string` here.

@Yancey1989 (Contributor) commented May 6, 2017:

How about changing `Paths []string` to `Path string`, such as `/home/random_images-*-of-*`?


@Yancey1989 please see #1953 (comment) - Helin

Contributor (Author):

The client needs to implement the parsing logic for wildcards anyway: users can train on RecordIO files locally on their own computers, and there is no master when training locally.
Since the client already implements it, I recommend that we do not implement it again in the server by changing `Paths []string` to `Path string`.
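To illustrate the "only the first RPC call will be honored" behavior with Go's net/rpc, here is a minimal sketch; the RPCServer fields, the duplicate-call check, and the listen address are assumptions for illustration, not the actual implementation.

```go
package main

import (
	"net"
	"net/rpc"
	"sync"
)

type RPCServer struct {
	mu       sync.Mutex
	reported bool
	paths    []string
}

// ReportDataset matches net/rpc's required method signature. Every trainer
// reports the same dataset, so only the first call is honored.
func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.reported {
		return nil // ignore duplicate reports from other trainers
	}
	m.paths = append([]string(nil), Paths...)
	m.reported = true
	// Task queue creation from the reported RecordIO files would start here.
	return nil
}

func main() {
	if err := rpc.Register(&RPCServer{}); err != nil {
		panic(err)
	}
	l, err := net.Listen("tcp", ":8080") // illustrative address
	if err != nil {
		panic(err)
	}
	rpc.Accept(l) // serve trainer connections
}
```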


The task queues need to be persisted on [etcd](https://github.com/coreos/etcd) for fault recovery. Since the task queues only change once a task is completed or timed out, which is not very frequent, we can afford to synchronize with etcd every time the task queues change.

We will serialize the task queues data structure with [gob encoding](https://golang.org/pkg/encoding/gob/), compress with gzip, and save into etcd synchronously under key `/task_queues`.
Contributor:

Since there are two copies of the task queue data, one in etcd and one in the master process's memory, the task queue guarantees "at least once" data dispatch. That means there will be no data loss, but some data may be replayed to trainers.

This should be mentioned.

Contributor (Author):

I thought the "at least once" data dispatch is due to the possibility of a task timeout, where the master dispatches the task again for a retry. Can you explain more about why having "2 copies of task queue data" will cause "at least once" data dispatch?

@typhoonzero (Contributor) commented May 7, 2017:

Yes, you are right! Timeouts bring the "at least once" guarantee. But the "2 copies of task queue data" can also make it possible.

The master process may encounter:

```
def dispatch_task():
    task = taskQueues.Todo.dequeue()
    taskQueues.Pending[task] = new taskState
    # if master goes down here
    taskQueues.writeToEtcd()
```

If the master goes down before syncing the data to etcd, then when the master is restarted by Kubernetes and loads the queue data from etcd, it will dispatch the task again.

@helinwang (Contributor, Author) commented May 9, 2017:

@typhoonzero The trainer's RPC call will fail in that case, and it will retry after the master restarts. When the master restarts, the tasks are all in the todo queue, so none are re-dispatched. And when the trainer retries, the task will be marked done. So I think in this case the "2 copies of task queue data" problem will not happen.

Contributor:

Got it!

So are you adding a note about "at least once" at the end of the "timeout" section?

Contributor (Author):

Good idea! Done.
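As a rough sketch of the serialize-compress-save step quoted above (gob encoding, gzip, synchronous write to etcd under `/task_queues`), assuming the etcd clientv3 client and an illustrative taskQueues layout; the exact import path and struct fields depend on the etcd version and the real implementation.

```go
package persist

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/gob"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// taskQueues is an illustrative stand-in for the master's real queue type.
type taskQueues struct {
	Todo    []int
	Pending map[int]int64 // task ID -> dispatch time (illustrative)
	Done    []int
}

// saveTaskQueues gob-encodes the queues, gzips the result, and writes it
// synchronously to etcd under the key /task_queues.
func saveTaskQueues(cli *clientv3.Client, q taskQueues) error {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if err := gob.NewEncoder(zw).Encode(q); err != nil {
		return err
	}
	if err := zw.Close(); err != nil { // flush the gzip stream
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_, err := cli.Put(ctx, "/task_queues", buf.String())
	return err
}
```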


```python
# ...
reader = paddle.reader.creator.SSTable("/home/random_images-*-of-*")
reader = paddle.reader.creator.RecordIO("/home/random_images-*-of-*")
```
Contributor:

If this is a directory in the cloud, wouldn't it be clearer to change it to "pfs://home/random_images-*-of-*"?

@helinwang (Contributor, Author) commented May 7, 2017:

I think pfs:// only has to be a concept for the fileserver client, to distinguish between remote and local.
Since we always mount the user's home directory in the same place, users can just think of it as a local directory. I think there is no need to add the concept of pfs for users to understand. More concepts mean a steeper learning curve for users.

Collaborator:

I second both of you, @helinwang and @Yancey1989, that from the perspective of a user program running in a Pod, it is only I/O with the local filesystem, as

  1. the home directory should have been mapped to the Pod-local directory /home, and
  2. some shared directories, e.g., the pre-downloaded paddle.v2.dataset data, should have been mapped to the Pod-local directory /common,

and from the perspective of our client tool paddle, it has to refer to files in the distributed filesystem in a special format. But

  • I don't prefer pfs:///home/$USER/cifa/..., because if we are going to be compatible with the URL standard, we need 3 slashes in the above URL.
  • Instead, I prefer /pfs/$DATACENTER/home/$USER/cifa/..., which has been used at Google.

Contributor (Author):

@gongweibao Please take a look at @wangkuiyi's comment; /pfs/ seems better than pfs:// when using the command line tool.

Contributor:

Agree with /pfs, thanks @wangkuiyi @helinwang!

@gongweibao (Contributor) commented May 9, 2017:

There is one thing I don't quite understand:

pfs:///home/$USER/cifa/...

  • In our commands, for example mv:
    mv [OPTION]... <LocalPath> <PFSPath> or <PFSPath> <LocalPath> or <PFSPath> <PFSPath>

  If we use /pfs and a user happens to have a local directory with the same name, we cannot tell whether a path is local or remote.

@helinwang (Contributor, Author) commented May 9, 2017:

@gongweibao I think we do not need to worry about that; hardcode it so that any path starting with /pfs/ is treated as remote.
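A tiny sketch of the convention agreed on here: hardcode that any path beginning with /pfs/ refers to the distributed filesystem, everything else is local. The function name is hypothetical.

```go
package pfs

import "strings"

// isRemotePath reports whether a path refers to the distributed filesystem,
// using the hardcoded /pfs/ prefix convention discussed above.
func isRemotePath(p string) bool {
	return strings.HasPrefix(p, "/pfs/")
}

// Example: isRemotePath("/pfs/datacenter1/home/user/data") == true,
//          isRemotePath("/home/user/data") == false.
```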



### Task Retry Logic

When a task is dispatched to the trainer, the master will schedule a function for execution after the timeout duration (based on the moving average of task completion time). If the task entry is still in the pending queue, its timeout counter will increase by one, and the task will be moved to the todo queue. If the timeout counter is above the threshold, the master will log the error and discard the task.
Contributor:

For the timeout, there are at least two possible cases: a network split and a slow task. For the first case, Kubernetes will recover the trainer process; for the second, the master process should tell the trainer to stop the task, so does the trainer also need an RPC interface?

Contributor (Author):

How about just allowing the trainer to continue working on the slow task :) (we can tolerate a task being trained twice, since SGD is a stochastic algorithm).

Contributor:

If a task is trained twice, the DONE queue will have many duplicate tasks. Should the DONE queue use `map[int]TaskEntry` instead of `[]TaskEntry`?

Contributor (Author):

@Yancey1989 Good question! I think we can implement it as follows: when receiving a "task done" message, check whether the task is in the pending queue; if not, just ignore it. Then there will not be any duplicate tasks inside the done queue.

Contributor:

Got it!
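To tie the retry discussion together, here is a rough Go sketch of the timeout and "ignore if not pending" bookkeeping; the types, the fixed retry threshold, and the timeout parameter are assumptions for illustration (the real master would derive the timeout from a moving average of task completion time).

```go
package master

import (
	"log"
	"sync"
	"time"
)

type task struct {
	id       int
	timeouts int
}

type master struct {
	mu        sync.Mutex
	todo      []task
	pending   map[int]task
	done      []task
	threshold int // max allowed timeouts before a task is discarded
}

func newMaster(threshold int) *master {
	return &master{pending: make(map[int]task), threshold: threshold}
}

// dispatch moves a task from todo to pending and schedules a timeout check.
func (m *master) dispatch(timeout time.Duration) (task, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.todo) == 0 {
		return task{}, false
	}
	t := m.todo[0]
	m.todo = m.todo[1:]
	m.pending[t.id] = t
	time.AfterFunc(timeout, func() { m.onTimeout(t.id) })
	return t, true
}

// onTimeout requeues a still-pending task, or discards it after too many
// timeouts; tasks that already finished are left alone.
func (m *master) onTimeout(id int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	t, ok := m.pending[id]
	if !ok {
		return // already finished, nothing to do
	}
	delete(m.pending, id)
	t.timeouts++
	if t.timeouts > m.threshold {
		log.Printf("task %d timed out too many times, discarding", id)
		return
	}
	m.todo = append(m.todo, t)
}

// taskFinished ignores reports for tasks that are no longer pending, so the
// done queue never holds duplicates even if a task was trained twice.
func (m *master) taskFinished(id int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	t, ok := m.pending[id]
	if !ok {
		return
	}
	delete(m.pending, id)
	m.done = append(m.done, t)
}
```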

@helinwang changed the title from "Design doc: master process" to "Design doc: master server" on May 9, 2017
@wangkuiyi (Collaborator) left a comment:

LGTM

@helinwang merged commit 4764303 into PaddlePaddle:develop on May 12, 2017
@helinwang deleted the master_design branch on May 12, 2017 18:10