Add design doc: concurrent data transfer and kernel execution #7276
````diff
@@ -23,21 +23,20 @@ To support the above description, we need to define a new class: `Channel`. Here
 template <typename T>
 class Channel {
  private:
-  using ChannelElement = std::vector<T>;
   std::size_t capacity_;
   std::size_t bytes_limit_;
   std::size_t current_bytes_;
   std::mutex mu_;
   std::condition_variable empty_cond_var_;
   std::condition_variable full_cond_var_;
-  std::deque<ChannelElement> channel_;
+  std::deque<T> channel_;

  public:
   explicit Channel(std::size_t capacity, std::size_t bytes_limit);

-  void Put(ChannelElement* buffer_element) { ... }
-  void Get(ChannelElement* buffer_element) { ... }
+  void Put(T* buffer_element) { ... }
+  void Get(T* buffer_element) { ... }
   size_t Size() { ... }
   void Clear() { ... }
 };
````

Review comment (on `class Channel`): To implement CSP in Fluid, we need a channel type, which could work with our ...

Reply: OK, I will fix this in the next commit.
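For reference, here is a minimal sketch of how `Put` and `Get` could be implemented on top of the members declared above. The blocking policy (`Put` waits while the channel is full, `Get` waits while it is empty) and the `memory_size()` call on `T` are assumptions for illustration, not part of the reviewed code.

```c++
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Sketch only: a bounded, thread-safe channel. Assumes T exposes a
// memory_size() method (hypothetical) so the byte limit can be tracked.
template <typename T>
class Channel {
 private:
  std::size_t capacity_;
  std::size_t bytes_limit_;
  std::size_t current_bytes_ = 0;
  std::mutex mu_;
  std::condition_variable empty_cond_var_;  // signaled when data arrives
  std::condition_variable full_cond_var_;   // signaled when space frees up
  std::deque<T> channel_;

 public:
  explicit Channel(std::size_t capacity, std::size_t bytes_limit)
      : capacity_(capacity), bytes_limit_(bytes_limit) {}

  // Blocks while the channel is full (by element count or by bytes).
  void Put(T* buffer_element) {
    std::unique_lock<std::mutex> lock(mu_);
    full_cond_var_.wait(lock, [this] {
      return channel_.size() < capacity_ && current_bytes_ < bytes_limit_;
    });
    current_bytes_ += buffer_element->memory_size();
    channel_.push_back(std::move(*buffer_element));
    empty_cond_var_.notify_one();
  }

  // Blocks until at least one element is available.
  void Get(T* buffer_element) {
    std::unique_lock<std::mutex> lock(mu_);
    empty_cond_var_.wait(lock, [this] { return !channel_.empty(); });
    *buffer_element = std::move(channel_.front());
    channel_.pop_front();
    current_bytes_ -= buffer_element->memory_size();
    full_cond_var_.notify_one();
  }

  size_t Size() {
    std::lock_guard<std::mutex> lock(mu_);
    return channel_.size();
  }

  void Clear() {
    std::lock_guard<std::mutex> lock(mu_);
    channel_.clear();
    current_bytes_ = 0;
    full_cond_var_.notify_all();
  }
};
```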
````diff
@@ -48,21 +47,20 @@ class Channel {

 ```
 ...
-chan_list_name = "chan_list"
-with fluid.go(concurrent_program, chan_list_name):
+chan_list_config = fluid.channel_list.config(name="chan_list", type=var.CHANNEL_LIST)
````

Review comment (on `chan_list_config = ...`): I think this program is the most interesting part of this doc, but I don't see the creation of a channel-typed variable like `ch = fluid.channel(...)`.

Reply: In the multi-GPU case, since different GPUs have different addresses, each GPU should own one channel. So I create a `channel_list`, and all the channels are created in this ...
````diff
+
+with fluid.go(concurrent_program, chan_list_config) as go:
````

Review comment (on `with fluid.go(...) as go:`): If we look at the

```go
go foo()
go bar(a, b, c)
```

Here we use ...

Reply: When I wrote this doc, I thought that one program corresponded to one goroutine, and ...

Review comment: Is this syntax valid?

Reply: Yes, this is just pseudo code.

Review comment: I thought about this for a while; now I believe that ...

Reply: This is interesting!
````diff
     image = fluid.layers.data_layer(...)
     label = fluid.layers.data_layer(...)
-    chan_list = fluid.channel_list.make(type=var.CHANNEL_LIST, name=chan_list_name)
     places = get_places()
     with parallel.for(places) as for_loop:
````

Review comment (on `with parallel.for(places) as for_loop:`): It seems to me that we don't need to read data concurrently, there is just one data source (...

Reply: In this design doc, I want to overlap the time of data transfer (from CPU to GPU) and model training on the GPU. As for data reading, there is no consideration here.

Review comment: I see, thanks. I think I understand what the code here is trying to do: load tensors onto different GPU memory for different channels. In the code below,

```
image = fluid.layers.data_layer(...)
label = fluid.layers.data_layer(...)
places = get_places()
with parallel.for(places) as for_loop:
    chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
    fluid.channel.send(chan, data=[for_loop.input(image), for_loop.input(label)])
```

do you mean ...? Btw, I think `parallel.for` should be unrelated to where each block is running; it's just a concurrency syntax, and we need other syntax to specify where each block is running.

Reply: See Paddle/paddle/operators/parallel_do_op.cc, lines 105 to 106 (at 23f5c18). The difference is that `parallel.for(places)` does not need to merge output.

Review comment: @chengduoZH Thanks for your help! Now I understand the situation better.

Reply: @helinwang Thanks for your review!
````diff
-        chan = fluid.channel_list.get_channel(chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
+        chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
         fluid.channel.send(chan, data=[for_loop.input(image), for_loop.input(label)])

-with fluid.go(main_program, chan_list_name):
-    chan_list = fluid.channel_list.make(type=var.CHANNEL_LIST, name=chan_list_name)
+with fluid.go(main_program, chan_list_config) as go:
     places = get_places()
     with parallel.for(places) as for_loop:
-        chan = fluid.channel_list.get_channel(chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
+        chan = fluid.channel_list.get_channel(go.chan_list, type=var.CHANNELTYPE, name="chan_{}".format(for_loop.i))
````

Review comment (on `chan = fluid.channel_list.get_channel(...)`): Maybe reading data into one channel, with multiple consumers consuming from that channel (a single channel, not multiple channels), is enough?

Reply: Data reading has been done on the Python side.
````diff
         image, label = fluid.channel.recv(chan)
         y_predict = fluid.layers.fc(input=image, size=1, act=None)
         cost = fluid.layers.square_error_cost(input=y_predict, label=label)
````
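Since the code above leans on `chan_list` holding one channel per GPU, here is a hedged sketch of what the C++-side `chan_list` variable could look like. The class name `ChannelList`, the create-on-first-use behavior, and the constructor parameters are illustrative assumptions rather than part of the reviewed design, and the sketch reuses the `Channel<T>` type sketched under the first hunk.

```c++
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical container behind `fluid.channel_list`: one named Channel per
// device/place, looked up by names such as "chan_0", "chan_1", ...
// Assumes the Channel<T> type sketched earlier in this thread.
template <typename T>
class ChannelList {
 public:
  // Returns the channel with the given name, creating it on first use.
  Channel<T>* GetChannel(const std::string& name, std::size_t capacity,
                         std::size_t bytes_limit) {
    auto it = channels_.find(name);
    if (it == channels_.end()) {
      it = channels_
               .emplace(name, std::unique_ptr<Channel<T>>(
                                  new Channel<T>(capacity, bytes_limit)))
               .first;
    }
    return it->second.get();
  }

  std::size_t Size() const { return channels_.size(); }

 private:
  std::unordered_map<std::string, std::unique_ptr<Channel<T>>> channels_;
};
```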
````diff
@@ -84,9 +82,9 @@ for i in range(buffer_size):
 ```

 In Python code, we define two `program`s: `concurrent_program`, which is used to send data into the `Channel`, and `main_program`, which is used to get data from the `Channel` and run training. If you're familiar with [`Goroutine`](https://www.golang-book.com/books/intro/10#section1) in the Go language, you'll find that `main_program` and `concurrent_program` behave just like two `Goroutine`s.
````
Review comment (on the paragraph above): If we make ...

```
chan = fluid.channel.make(type=var.LoDTensor)

with fluid.go(concurrent_program, chan):
    image = fluid.layers.data_layer(...)
    label = fluid.layers.data_layer(...)
    places = get_places()
    with parallel.for(places) as for_loop:
        fluid.channel.send(chan, data=[for_loop.input(image), for_loop.input(label)])

with fluid.go(main_program, chan):
    places = get_places()
    with parallel.for(places) as for_loop:
        image, label = fluid.channel.recv(chan)
        y_predict = fluid.layers.fc(input=image, size=1, act=None)
        cost = fluid.layers.square_error_cost(input=y_predict, label=label)
        avg_cost = fluid.layers.mean(x=cost)
```

Reply: Great!
````diff

-In `concurrent_program`, `fluid.channel_list.make` gets a `chan_list` according to the `chan_list_name`. `fluid.channel_list.make` creates a `global_variable` and puts it into `concurrent_program.global_block`. And then `places = fluid.get_places()` gets all available device on the current machine. The `places` is used in `parallel.for`. In `parallel.for`, according to `name="chan_{}".format(for_loop.i)`, `fluid.channel_list.get_channel` gets a `chan` from `chan_list`. After that, `fluid.channle.send` sends `image` and `label` to this `channel`.
+In `concurrent_program`, `fluid.go`'s inputs are a `program` and a `chan_list_config`/`chan_config`. According to the `chan_list_config`/`chan_config`, `fluid.go` calls `fluid.channel_list.make`/`fluid.channel.make` to get a `chan_list`/`channel`. `fluid.channel_list.make`/`fluid.channel.make` creates a `global_variable` and puts it into `program.global_block`. Then `places = fluid.get_places()` gets all available devices on the current machine. The `places` is used in `parallel.for`. In `parallel.for`, according to `name="chan_{}".format(for_loop.i)`, `fluid.channel_list.get_channel` gets a `chan` from the `chan_list`. After that, `fluid.channel.send` sends `image` and `label` to this `channel`.

-In `main_program`, roughly similar to the `concurrent_program`, the difference is that `main_program` gets `image` and `label` from `chan` by `fluid.channle.recv`. The names of `chan_list` in `concurrent_program` and `main_program` are the same, so the two `chan_list` are the same variable at the c++ side.
+`main_program` is roughly similar to `concurrent_program`; the difference is that `main_program` gets `image` and `label` from `chan` by calling `fluid.channel.recv`. The configs of `chan_list` in `concurrent_program` and `main_program` are the same, so the two `chan_list` are the same variable on the C++ side.
````
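As a rough illustration of the overlap the two programs are meant to achieve, here is a hedged sketch that reuses the `Channel<T>` class sketched under the first hunk. `FakeBatch`, the timings, and the thread layout are made up for illustration: one thread stands in for `concurrent_program` and keeps putting batches into the channel, while the other stands in for `main_program`, pulling batches out and "training" on them, so the next transfer can proceed while the current batch is being processed.

```c++
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <thread>

// Reuses the Channel<T> sketch from the first hunk.

struct FakeBatch {
  int id;
  std::size_t memory_size() const { return 4u << 20; }  // pretend 4 MB per batch
};

int main() {
  // Illustrative numbers: at most 2 batches (or 16 MB) buffered at a time.
  Channel<FakeBatch> chan(/*capacity=*/2, /*bytes_limit=*/16u << 20);

  std::thread producer([&chan] {  // plays the role of concurrent_program
    for (int i = 0; i < 8; ++i) {
      FakeBatch batch{i};
      // stand-in for the host-to-device copy
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
      chan.Put(&batch);  // blocks if the channel is full
    }
  });

  std::thread consumer([&chan] {  // plays the role of main_program
    for (int i = 0; i < 8; ++i) {
      FakeBatch batch{};
      chan.Get(&batch);  // blocks until a batch is available
      // stand-in for the forward/backward pass on this batch
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      std::printf("trained on batch %d\n", batch.id);
    }
  });

  producer.join();
  consumer.join();
  return 0;
}
```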
## Reference
Review comment: I've pushed https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/detail/simple_block_queue.h — you can reuse this code or change the name. It now acts as a "channel" internally.

Reply: How do we set the capacity for this queue?