
Move sync_mode device ctx from grpc server #10881

Merged: 43 commits into PaddlePaddle:develop from cleanprefetch, Jun 1, 2018

Conversation

@gongweibao (Contributor) commented on May 23, 2018:

Fix part of #10804.

@gongweibao changed the title to [WIP]Move sync_mode device ctx from grpc server on May 23, 2018
@gongweibao changed the title to Move sync_mode device ctx from grpc server on May 24, 2018
@typhoonzero (Contributor) left a comment:

It seems there are still compile errors; those need to be fixed first.

const ReceivedMessage Get() { return var_recv_queue_.Pop(); }

void Push(const std::string& msg_name) {
var_recv_queue_.Push(std::make_pair(msg_name, nullptr));
Reviewer:

Maybe we can get rid of the queues and have a member function Process that does the "real" processing, like:

void Process(const std::string& msg_name, framework::Variable* var);
virtual void ProcessImpl(const std::string& msg_name, framework::Variable* var);

Then operators can define classes that inherit from this one and override ProcessImpl to do the job when a message arrives.
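
A minimal sketch of that template-method shape (RPCProcessorBase is an illustrative name; framework::Variable is Paddle's, everything else here is assumed):

class RPCProcessorBase {
 public:
  virtual ~RPCProcessorBase() {}

  // Entry point called by the server when a message arrives; no queue needed.
  void Process(const std::string& msg_name, framework::Variable* var) {
    ProcessImpl(msg_name, var);
  }

 protected:
  // Operators subclass this and override ProcessImpl to do the real work.
  virtual void ProcessImpl(const std::string& msg_name,
                           framework::Variable* var) = 0;
};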

Author:

Done!

} else {
request_.reset(new VariableResponse(scope, dev_ctx_, true));
request_.reset(new VariableResponse(rpc_processor->scope(),
rpc_processor->dev_ctx(), true));
Reviewer:

request_.reset(new VariableResponse(rpc_processor->scope(),
        rpc_processor->dev_ctx(), !rpc_processor->sync_mode()));

Author:

Done.

executor_->RunPreparedContext(prefetch_ctx_, scope_);

SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply_);
rpc_processor_->RequestPrefetch(request_.get(), &reply);
Reviewer:

reply_?

Author:

Done. Thanks!

void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq,
int rpc_id,
std::function<void()> TryToRegisterNewOne) {
TryToRegisterNewOne();
Reviewer:

Isn't this line unnecessary? We only register a new one before the existing one is deleted?
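
For context, a hedged sketch of the usual async-gRPC completion-queue loop; the question above is where TryToRegisterNewOne() belongs so that one pending call is always outstanding:

void HandleLoop(::grpc::ServerCompletionQueue* cq,
                std::function<void()> TryToRegisterNewOne) {
  void* tag = nullptr;
  bool ok = false;
  while (cq->Next(&tag, &ok)) {
    // Re-register a fresh call slot as soon as one completes, so the
    // server never stops accepting this RPC.
    TryToRegisterNewOne();
    // ... process the completed call identified by tag ...
  }
}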

Author:

Merging feels great in the moment, but debugging afterwards is hell!
Thanks!

Author:

Done. Thanks.


VLOG(3) << "HandleRequest for " << rpc_id << " get Next";

PADDLE_ENFORCE(tag);
Reviewer:

The tag is now an id starting from 0, smaller than the max buffer size.
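
In other words, PADDLE_ENFORCE(tag) would reject the perfectly valid id 0; a range check is what is wanted here. A sketch (kMaxBufSize is an illustrative stand-in):

void* tag = nullptr;
bool ok = false;
cq->Next(&tag, &ok);
// The tag is a small integer id packed into the void*, not a heap pointer.
int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
PADDLE_ENFORCE(req_id >= 0 && req_id < kMaxBufSize);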

Author:

Done.

PADDLE_ENFORCE(tag);

if (rpc_processor_->sync_mode()) {
if (!is_shut_down_ &&
Reviewer:

Perhaps !is_shut_down_ should be in the previous if, and this one should be an if-else?

Author:

Done.

void AsyncGRPCServer::WaitCond(const std::string& rpc_name) {
std::unique_lock<std::mutex> lock(this->barrier_mutex_);
auto it = barrier_.find(rpc_name);
assert(it != barrier_.end());
Reviewer:

These two lines are equivalent to barrier_.at()?
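
A sketch of the suggested form; at() folds the lookup and the check into one call:

void AsyncGRPCServer::WaitCond(const std::string& rpc_name) {
  std::unique_lock<std::mutex> lock(this->barrier_mutex_);
  // at() replaces find() + assert(): it throws std::out_of_range if
  // rpc_name was never registered.
  auto& barrier = barrier_.at(rpc_name);
  // ... wait on the barrier as before ...
}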

Author:

Done.

@@ -418,15 +396,29 @@ void AsyncGRPCServer::HandleRequest(
}
}

void AsyncGRPCServer::WaitCond(int cond) {
void AsyncGRPCServer::RegisterBarrier(const std::string& rpc_name,
int cond_id) {
std::unique_lock<std::mutex> lock(this->barrier_mutex_);
Reviewer:

Do we really need a map? It seems we only have a few RPC methods; is an enum enough?

Author:

Done. A set is enough!

GRPCProcessorCtx() : exit_flag_(false), fan_in_(-1) { clear_to_init(); }
virtual ~GRPCProcessorCtx() {}

bool ProcessSendImpl(const std::string& msg_name, framework::Variable* var,
Reviewer:

All handler functions can be stored in an unordered_map<std::string, std::function>, and the matching handler called when an RPC request arrives.
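
A hedged sketch of that registry idea; the handler signature below is an assumption, not the PR's final interface:

#include <functional>
#include <string>
#include <unordered_map>

using RPCHandler =
    std::function<bool(const std::string& msg_name, framework::Variable* var)>;

std::unordered_map<std::string, RPCHandler> handlers;

bool Dispatch(const std::string& rpc_name, const std::string& msg_name,
              framework::Variable* var) {
  auto it = handlers.find(rpc_name);
  // Invoke the registered handler when the RPC request arrives.
  return it != handlers.end() && it->second(msg_name, var);
}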

bool ProcessPrefetchImpl(const std::string& msg_name, framework::Scope* scope,
framework::Variable** var);

void SetFanIn(int fan_in) { fan_in_ = fan_in; }
Reviewer:

For easier understanding, this could be called SetNumClients; then WaitBarrier(std::string method) can wait until all the clients have finished sending the barrier.
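
A minimal sketch of that naming, assuming a condition-variable counter (per-method bookkeeping omitted; needs <mutex> and <condition_variable>):

class Barrier {
 public:
  void SetNumClients(int n) { num_clients_ = n; }

  // Each client's barrier message bumps the counter.
  void IncreaseBarrier() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++arrived_;
    }
    cv_.notify_all();
  }

  // Blocks until every client has sent its barrier.
  void WaitBarrier() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return arrived_ >= num_clients_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int num_clients_ = 0;
  int arrived_ = 0;
};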

Author:

Done.


bool IsExit() {
std::unique_lock<std::mutex> lock(this->mutex_);
return exit_flag_;
Reviewer:

Can make this atomic

Author:

exit_flag_ needs to be protected together with the condition variable.
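
The author's point, sketched: an atomic flag alone cannot wake threads blocked on the condition variable, and flipping it outside the mutex risks a lost wakeup between a waiter's check and its wait (Ready() stands in for the rest of the predicate):

// Waiter side: the check and the block happen atomically with the unlock.
void Wait() {
  std::unique_lock<std::mutex> lock(mutex_);
  condition_.wait(lock, [this] { return exit_flag_ || Ready(); });
}

// Setter side: flip the flag under the same mutex, otherwise the notify
// can fire between the waiter's check and its wait and be lost.
void SetExit() {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    exit_flag_ = true;
  }
  condition_.notify_all();
}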

std::mutex mutex_;

// send
std::condition_variable condition_send_;
Reviewer:

Only one condition is needed here for all statuses. The "condition" is the current state of the RPC server, and it changes only when a client barrier is reached.

Author:

Done.


private:
// status
bool exit_flag_;
Reviewer:

A handler class should have no exit_flag; the RPC server should own that, along with the condition and the state changes.

Author:

Done.


SerializeToByteBuffer(var_name, var, *dev_ctx_, &reply_);
local_scope_ = &rpc_processor_->scope()->NewScope();
Reviewer:

Add a TODO to see if this local_scope_ is needed?

Author:

Done.

std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, g_rpc_service.get()));

sleep(10);
Reviewer:

Does WaitServerReady() work here?

Author:

No.

namespace detail {

void GRPCProcessorCtx::SetExit() {
LOG(WARNING) << "GRPCProcessorCtx SetExit!";
Reviewer:

INFO?

Author:

Done.

exit_flag = true;
while (true) {
if (rpc_processor_->IsExit()) {
LOG(WARNING) << "get exit!rpc_processor break!";
Reviewer:

INFO?

Author:

Done.

LOG(INFO) << "received terminate message and exit";
exit_flag = true;
while (true) {
if (rpc_processor_->IsExit()) {
Reviewer:

Has rpc_service been shut down at this point? Maybe it should wait for rpc_service?

Author:

Stop() makes both of them exit, so can we check just one?

executor_->RunPreparedContext((*grad_to_prepared_ctx_)[msg_name].get(),
scope);
} catch (std::exception& e) {
LOG(ERROR) << "async: run sub program error " << e.what();
Reviewer:

return false here?

Author:

Thanks. Done.


if (var == nullptr) {
LOG(ERROR) << "sync: Can not find server side var: " << msg_name;
PADDLE_THROW("sync: Can not find server side var");
Reviewer:

return false or throw?

Author:

Thanks. Done.

LOG(WARNING) << "GRPCProcessorCtx SetExit!";
std::unique_lock<std::mutex> lock(mutex_);
exit_flag_ = true;
condition_send_.notify_all();
Reviewer:

It's better to unlock the mutex before calling notify_all; otherwise the woken thread immediately blocks again on the still-held mutex. Reference: https://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html
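
A sketch of that shape for the SetExit above; the lock is scoped so notify_all runs after the mutex is released:

void GRPCProcessorCtx::SetExit() {
  LOG(INFO) << "GRPCProcessorCtx SetExit!";
  {
    std::unique_lock<std::mutex> lock(mutex_);
    exit_flag_ = true;
  }  // mutex_ released here ...
  condition_send_.notify_all();  // ... so woken threads can acquire it at once
}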

Author:

Done. Thanks!

}

void GRPCProcessorCtx::IncreaseBatchBarrierSend() {
std::unique_lock<std::mutex> lock(mutex_);
Reviewer:

A condition variable should be paired with a mutex, but mutex_ is used everywhere here; maybe you can use separate mutexes to protect these condition variables.

Author:

Kept only one! Thanks! Done!


GRPCProcessorCtx* rpc_processor_;

// barrier
Reviewer:

This comment needs more detail; the single word "barrier" is confusing.

Author:

Changed it to a readable name.
Thanks! Done!

virtual void WaitCond(int cond) = 0;

protected:
std::string address_;
Reviewer:

Maybe changing this to bind_address_ would be better for a server?

Author:

Done.

VLOG(3) << "batch_barrier_:" << batch_barrier_;
}

void clear_to_init() {
Reviewer:

Call this Reset

Author:

Done.

// barrier
void WaitBarrier() {
std::unique_lock<std::mutex> lock(this->mutex_);
condition_.wait(
Reviewer:

Curious why the barriers cannot be put in RPCServer?

Author:

Done.

@@ -0,0 +1,158 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Reviewer:

I've written an example at https://gist.github.com/typhoonzero/fc2489231c8e3a29d9ac7c895c8f4aae showing how to abstract the RPC server. This implementation can also remove the redundant "blocking queue".

Author:

Done. Thanks.

@typhoonzero (Contributor):

Please fix the compile errors first

framework::Executor* executor() { return executor_; }
std::vector<framework::Variable*>& sparse_vars() { return sparse_vars_; }

// request handler
Reviewer:

Maybe detailed comments are needed here, and also in the header files, so developers can get to know more about the interface.

Author:

Done.

std::shared_ptr<framework::ExecutorPrepareContext>>*
grad_to_prepared_ctx_;

// get
Reviewer:

get?

Author:

Done.

@@ -0,0 +1,91 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Reviewer:

I think we can now move this file under fluid/framework as a general interface.

Author:

Will do in the next PR.

}

void RPCServer::SavePort() const {
// NOTE: default write file to /tmp/paddle.selected_port
Reviewer:

Can delete this line now.

Author:

Done.

RequestHandler* handler, int thread_num) {
rpc_call_map_[rpc_name] = handler;
rpc_thread_num_[rpc_name] = thread_num;
PADDLE_ENFORCE((thread_num >= 0 && thread_num <= 24),
Reviewer:

24 is hard-coded; maybe we don't need this check here?

Author:

Done.

namespace operators {
namespace detail {

bool GrpcRequestSendHandler::Handle(void* input, void* output) {
Reviewer:

Maybe we should pass the inputs below to the Handle function, so that the handler does not depend on the RPC implementation:

bool Handle(const std::string& varname, Variable* var, Variable* outvar);
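
A sketch of that decoupling; the override below is illustrative, not the PR's final code:

class RequestHandler {
 public:
  virtual ~RequestHandler() {}
  // No gRPC types appear in the interface, so concrete handlers stay
  // independent of the RPC implementation.
  virtual bool Handle(const std::string& varname, framework::Variable* var,
                      framework::Variable* outvar) = 0;
};

class RequestSendHandler final : public RequestHandler {
 public:
  bool Handle(const std::string& varname, framework::Variable* var,
              framework::Variable* outvar) override {
    // ... send-side logic operating on plain variables ...
    return true;
  }
};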

Author:

Done

@typhoonzero (Contributor) left a comment:

LGTM basically, please fix those minor comments so that we can merge it.

// SerializeToByteBuffer(varname, outvar,
// *request_handler_->dev_ctx(), &reply_);
// }
virtual bool Handle(const std::string& varname, framework::Scope* scope,
Reviewer:

Curious: we already have a member scope, so why do we need to pass a scope parameter?

Author:

RequestSend and RequestPrefetch use the scope from VariableResponse.
Such as https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/operators/detail/grpc_server.cc#L198.

namespace operators {
namespace detail {

class RequestSendHandler final : public RequestHandler {
Reviewer:

request_handler_impl.cc doesn't seem very large, so we could omit the .cc file or just put it in the listen_and_serv operator.

Author:

Other files depend on them.

std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::StartServer, g_rpc_service.get()));

// FIXME(gongwb): don't use hard time.
Reviewer:

Call WaitServerReady

Author:

It should wait for the data transformation to end, so I added the FIXME here.

void ShutdownQueue();
void ShutDownImpl() override;
Reviewer:

Can ShutDownImpl, ShutdownQueue, and HandleRequest be private? They are only used inside the grpc_server class.

Author:

Done.

// The implementation is in request_handler_impl.
// example:
// std::string varname = request_.varname();
// VLOG(3) << "RequestGet " << varname;
Reviewer:

The example doesn't need the VLOG.

Author:

Done.

rpc_service_->ShutDown();
break;
}
sleep(1);
Reviewer:

Why sleep(1)?

Author:

To avoid a busy loop.

rpc_service_.reset(new detail::AsyncGRPCServer(endpoint, sync_mode));
// request_handler_.reset(new detail::GRPCRequestSendHandler(sync_mode));
rpc_service_.reset(new detail::AsyncGRPCServer(endpoint, fan_in));
request_send_handler_.reset(new detail::RequestSendHandler(sync_mode));
Reviewer:

request_send_handler => send_request_handler ?

Author:

The name matches the class RequestSendHandler.

@typhoonzero (Contributor) left a comment:

LGTM, we can finally merge this work now.

@gongweibao gongweibao merged commit 4fb7cc7 into PaddlePaddle:develop Jun 1, 2018
@gongweibao gongweibao deleted the cleanprefetch branch June 11, 2018 10:53