Skip to content

Commit

Permalink
Merge branch 'fix-test_send_recv' of https://github.com/jacquesqiao/P…
Browse files Browse the repository at this point in the history
…addle into fix_server_shutdown
  • Loading branch information
typhoonzero committed Mar 30, 2018
2 parents f6de248 + e727cdb commit 52439d9
Show file tree
Hide file tree
Showing 32 changed files with 923 additions and 281 deletions.
6 changes: 6 additions & 0 deletions cmake/generic.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,9 @@ function(grpc_library TARGET_NAME)
get_filename_component(PROTO_WE ${grpc_library_PROTO} NAME_WE)
get_filename_component(PROTO_PATH ${ABS_PROTO} PATH)

#FIXME(putcn): the follwoing line is supposed to generate *.pb.h and cc, but
# somehow it didn't. line 602 to 604 is to patching this. Leaving this here
# for now to enable dist CI.
protobuf_generate_cpp(grpc_proto_srcs grpc_proto_hdrs "${ABS_PROTO}")
set(grpc_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/${PROTO_WE}.grpc.pb.cc")
set(grpc_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/${PROTO_WE}.grpc.pb.h")
Expand All @@ -597,6 +600,9 @@ function(grpc_library TARGET_NAME)
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" -I "${PROTO_PATH}"
--plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN}" "${ABS_PROTO}"
COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
ARGS --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" -I "${PROTO_PATH}"
"${ABS_PROTO}"
DEPENDS "${ABS_PROTO}" ${PROTOBUF_PROTOC_EXECUTABLE} extern_grpc)

# FIXME(typhoonzero): grpc generated code do not generate virtual-dtor, mark it
Expand Down
43 changes: 38 additions & 5 deletions doc/fluid/design/concepts/cpp_data_feeding.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ To solve this problem, we introduce `ReaderHolder` as a wrapper. It acts as an e

To create and invoke readers, some new ops are introduced:

### CreateReaderOp
### Operators That Create Readers

Each reader has its creation op. File readers' creation ops have no input and yield the created file reader as its output. Decorated readers' creation ops take the underlying readers as inputs and then yield new decorated readers.

Expand Down Expand Up @@ -153,19 +153,52 @@ double_buffer_reader = create_double_buffer_op(batch_reader)
The forwarding ops of the corresponding `main_program` would be like this:

```
while_op {
not_completed = true
pass_count = 0
while_op(not_completed) {
has_next = has_next_op(double_buffer_reader)
if_else_op(has_next) {
batch_data = read_op(double_buffer_reader)
... (subsequent training ops)
} else {
reset_op(double_buffer_reader)
increase_op(pass_count)
not_completed = less_than_op(pass_count, reqiured_pass_num)
}
}
```

Two important considerations for these programs are as follows:
A few important considerations for these programs are as follows:

1. The multiple\_reader is the batch\_reader's underlying reader, and the batch\_reader is the double\_buffer\_reader's underlying reader. `read_op`, `has_next_op` and other reader related ops will only invoke the top-most reader. In this case, it's the double\_buffer\_reader.
1. `not_completed`, `pass_count` and other variables shown above are all Fluid Variables.

2. All readers exist in both `startup_program` and `main_program`. And they are persistable.
2. The multiple\_reader is the batch\_reader's underlying reader, and the batch\_reader is the double\_buffer\_reader's underlying reader. `read_op`, `has_next_op` and other reader related ops will only invoke the top-most reader. In this case, it's the double\_buffer\_reader.

3. All readers exist in both `startup_program` and `main_program`. And they are persistable.

### Simplify Configuration by MultiPassReader

The Program configuration mentioned above is complicated. Users need to be very familiar to concepts of Program and Block to prevent making mistakes in their code. To make the usage of C++ readers more friendly to new users, we introduce `MultiPassReader`.

`MultiPassReader` is a decorated reader. A multi-pass reader is used to continuously yield data for several training passes. It takes the number of passes to run as one of its attributes('pass_num') and maintains a counter to record how many passes it has completed. Each time its underlying reader reaches the EOF, the multi-pass reader checks whether it has completed the training of given number of pass. If not, the underlying reader will be re-initialized and starts a new pass automatically. Before completing the whole training, the return of MultiPassReader's `HasNext()` will always be `true`.

With `MultiPassReader`, the startup program would be like this:

```
multiple_reader = open_files_op(...)
batch_reader = create_batch_reader_op(multiple_reader)
multi_pass_reader = create_multi_pass_reader_op(batch_reader)
double_buffer_reader = create_double_buffer_op(multi_pass_reader)
... (other initializers)
```

The forwarding part of the corresponding `main_program` would be like this:

```
not_completed = true
while_op(not_completed) {
batch_data = read_op(double_buffer_reader)
... (subsequent training ops)
not_completed = has_next_op(double_buffer_reader)
}
```
139 changes: 139 additions & 0 deletions doc/fluid/design/concurrent/channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Channel Design

## Introduction

A Channel is a data structure that allows for synchronous interprocess
communication via message passing. It is a fundemental component of CSP
(communicating sequential processes), and allows for users to pass data
between threads without having to worry about synchronization.

## How to use it

Paddle offers python APIs to open and close channels, along with sending
and receiving data to/from a channel.

### Create a channel

Creates a new channel that takes in variables of a specific dtype.

- **fluid.make_channel(dtype, capacity=0)**
- **dtype**: The data type of variables being sent/received through channel
- **capacity**: The capacity of the channel. A capacity of 0 represents
an unbuffered channel. Capacity > 0 represents a buffered channel

```
ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR, 10)
```

### Close a channel

Closes a channel. Any pending senders and receivers will be awoken during
this time. Receivers can still receive from a closed channel, but senders
are not allowed to send any additional data to the channel (Paddle will
raise an exception if users try to send to a closed channel.)

- **fluid.channel_close(channel)**

```
fluid.channel_close(ch)
```

### Send data to a channel

Sends a variable to a channel. Currently, variables of dtype `LoDTensor`,
`LoDRankTable`, `LoDTensorArray`, `SelectedRows`, `ReaderHolder`, and
`ChannelHolder` are supported.

By default, the data of the Variable is moved from the sender to the receiver,
however the user can optionally copy the data before performing the send.

- **channel_send(channel, variable, is_copy=False)**
- **channel**: The channel to send the variable to
- **variable**: The variable to send to the channel
- **is_copy**: If set to True, channel_send will perform a variable assign
to copy the source variable to a new variable to be sent.

```
ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
var = fill_constant(shape=[1],dtype=core.VarDesc.VarType.INT32, value=100)
fluid.channel_send(ch, var, True)
```

### Receive data from a channel

Receives a variable from a channel. The data of the variable is moved to the
receiving variable.

- **channel_recv(channel, return_variable)**
- **channel**: The channel to receive the variable from
- **return_variable**: The destination variable used to store the data of the
variable received from the channel

```
ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR)
var = fill_constant(shape=[1],dtype=core.VarDesc.VarType.INT32, value=-1)
fluid.channel_recv(ch, var)
```

## How it Works

Channels provides a simple interface for different threads to share data.
To support the synchronization requirements, channels utilizes a series of
internal queues, locks, and conditional variables.

### QueueMessage

QueueMessage encapsulates the state of the channel send/receive operation to be
put in the **sendq/recvq**. It contains a condition variable used to lock the
thread (when there are no available sends/receives). In addition, it contains
a callback function to notify a thread when the QueueMessage is being
processed by the channel.

### Queues

- **buff_**: This queue holds the data buffer in a buffered channel. The
capacity is set to the capacity of the channel. This data buffer is not
used in an unbuffered channel.

- **sendq**: This queue holds the QueueMessage of any pending senders of a
channel. When a thread performs a channel_send operation on the channel, the
channel_send operation will put a new QueueMessage on the sendq and block the
current thread under two conditions:
1. The channel is buffered and is full
2. The channel is unbuffered and does not have a receiver

- **recvq**: This queue holds the QueueMessage of any pending receivers of a
channel. When a thread performs a channel_recv operation on the channel, the
channel_recv operation will put a new QueueMessage on the recvq and block the
current thread under two conditions:
1. The channel is buffered and there is no data on the buff_
2. The channel is unbuffered and does not have a sender

### State diagram

#### Channel Send

<p align="center">
<img src="./images/channel_send.png"/><br/>
</p>

#### Channel Receive

<p align="center">
<img src="./images/channel_recv.png"/><br/>
</p>

## Limitations and Considerations

### Variable Copy

In golang, variables in channels are copied from the sender to the receiver.
In Paddle, the data from our variables are **moved** from sender to receiver.
As a result, these variables should not be used after they are sent. We
provide a flag in channel_send method to allow users to copy the variable to
be sent before it is sent.

Please note that this is acheived by adding an **assign** operator and creating
a temporary variable that is sent in place of the original variable. Please
note that **assign** operator has limited support for only certain variables
datatypes.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion paddle/fluid/framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ cc_test(init_test SRCS init_test.cc DEPS init)
cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_context framework_proto)
cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc)

cc_test(channel_test SRCS channel_test.cc)
# cc_test(channel_test SRCS channel_test.cc)
cc_test(tuple_test SRCS tuple_test.cc )
cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op channel_close_op channel_create_op
channel_send_op channel_recv_op sum_op select_op elementwise_add_op compare_op
Expand Down
49 changes: 43 additions & 6 deletions paddle/fluid/framework/block_desc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,52 @@ void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return;
}
auto get_vars = [](std::deque<std::unique_ptr<OpDesc>>::iterator &op,
std::vector<std::string> &v) {
auto in_names = (*op)->InputArgumentNames();
v.insert(v.end(), in_names.begin(), in_names.end());
auto out_names = (*op)->OutputArgumentNames();
v.insert(v.end(), out_names.begin(), out_names.end());
std::sort(v.begin(), v.end());
auto last = std::unique(v.begin(), v.end());
v.erase(last, v.end());
};
need_update_ = true;
for (auto it = ops_.begin() + s; it != ops_.begin() + e; it++) {
auto names = (*it)->InputArgumentNames();
for (auto n : names) {
// TODO(typhoonzero): delete vars if no other op use it.
VLOG(3) << "deleting var " << n;

for (size_t i = s; i < e; i++) {
// since remove op one by one, every time remove the first op.
auto op = ops_.begin() + s;

// collect input and output variables from current delete op
std::vector<std::string> cur_vars;
get_vars(op, cur_vars);

// remove current op
ops_.erase(ops_.begin() + s);

// collect input and output variables from other ops
std::vector<std::string> other_vars;
for (auto it = ops_.begin(); it != ops_.end(); it++) {
get_vars(it, other_vars);
}

// variables should be deleted
std::vector<std::string> delete_vars;
// delete_vars = cur_vars - cur_vars ^ other_input_vars
std::set_difference(cur_vars.begin(), cur_vars.end(), other_vars.begin(),
other_vars.end(),
std::inserter(delete_vars, delete_vars.end()));
// remove variables
for (size_t i = 0; i < delete_vars.size(); i++) {
auto name = delete_vars[i];
auto it = vars_.find(name);
PADDLE_ENFORCE(it != vars_.end(),
"%s is not in variable list, it should not be deleted",
name);
vars_.erase(it);
VLOG(3) << "deleting variable " << name;
}
}
ops_.erase(ops_.begin() + s, ops_.begin() + e);
}

std::vector<OpDesc *> BlockDesc::AllOps() const {
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/framework/block_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class BlockDesc {

OpDesc *InsertOp(size_t index);

/*
* Remove Op and its input/output variables.
* Note that for either input or ouput variable, if it is also an input or
* output variable of other ops, we should remain it.
*/
void RemoveOp(size_t s, size_t e);

std::vector<OpDesc *> AllOps() const;
Expand Down
36 changes: 36 additions & 0 deletions paddle/fluid/operators/activation_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,36 @@ Floor Activation Operator.
}
};

class CosOpMaker : public framework::OpProtoAndCheckerMaker {
public:
CosOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: framework::OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "Input of Cosine operator");
AddOutput("Out", "Output of Cosine operator");
AddComment(R"DOC(
Cosine Activation Operator.
$out = cos(x)$
)DOC");
}
};

class SinOpMaker : public framework::OpProtoAndCheckerMaker {
public:
SinOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: framework::OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "Input of Sine operator");
AddOutput("Out", "Output of Sine operator");
AddComment(R"DOC(
Sine Activation Operator.
$out = sin(x)$
)DOC");
}
};

class RoundOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RoundOpMaker(OpProto *proto, OpAttrChecker *op_checker)
Expand Down Expand Up @@ -561,6 +591,12 @@ REGISTER_OP(ceil, ops::ActivationOp, ops::CeilOpMaker, ceil_grad,
REGISTER_OP(floor, ops::ActivationOp, ops::FloorOpMaker, floor_grad,
ops::ActivationOpGrad);

REGISTER_OP(cos, ops::ActivationOp, ops::CosOpMaker, cos_grad,
ops::ActivationOpGrad);

REGISTER_OP(sin, ops::ActivationOp, ops::SinOpMaker, sin_grad,
ops::ActivationOpGrad);

REGISTER_OP(round, ops::ActivationOp, ops::RoundOpMaker, round_grad,
ops::ActivationOpGrad);

Expand Down
Loading

0 comments on commit 52439d9

Please sign in to comment.