Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refact MPPTunnel class to encapsulate different tunnel mode #5286

Merged
merged 19 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,15 @@ class MPMCQueue
// Copy and move it is dangerous.
DISALLOW_COPY_AND_MOVE(MPMCQueue);

/// Block until:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
/*
* | Previous Status | Empty | Behavior |
* |------------------|------------|--------------------------|
* | Normal | Yes | Block |
* | Normal | No | Pop and return true |
* | Finished | Yes | return false |
* | Finished | No | Pop and return true |
* | Cancelled | Yes/No | return false |
* */
ALWAYS_INLINE bool pop(T & obj)
{
return popObj<true>(obj);
Expand All @@ -105,10 +111,14 @@ class MPMCQueue
return popObj<false>(obj);
}

/// Block until:
/// 1. Push succeeds and return true.
/// 2. The queue is cancelled and return false.
/// 3. The queue has finished and return false.
/*
* | Previous Status | Full | Behavior |
* |------------------|------------|--------------------------|
* | Normal | Yes | Block |
* | Normal | No | Push and return true |
* | Finished | Yes/No | return false |
* | Cancelled | Yes/No | return false |
* */
template <typename U>
ALWAYS_INLINE bool push(U && u)
{
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ void EstablishCallData::tryFlushOne()
// check whether there is a valid msg to write
{
std::unique_lock lk(mu);
if (ready && mpp_tunnel->isSendQueueNextPopNonBlocking()) //not ready or no packet
if (ready && async_tunnel_sender->isSendQueueNextPopNonBlocking()) //not ready or no packet
ready = false;
else
return;
}
// there is a valid msg, do single write operation
mpp_tunnel->sendJob(false);
async_tunnel_sender->sendOne();
}

void EstablishCallData::responderFinish(const grpc::Status & status)
Expand Down Expand Up @@ -117,7 +117,7 @@ void EstablishCallData::writeDone(const ::grpc::Status & status)
state = FINISH;
if (stopwatch)
{
LOG_FMT_INFO(mpp_tunnel->getLogger(), "connection for {} cost {} ms.", mpp_tunnel->id(), stopwatch->elapsedMilliseconds());
LOG_FMT_INFO(async_tunnel_sender->getLogger(), "connection for {} cost {} ms.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds());
}
responderFinish(status);
}
Expand All @@ -141,9 +141,10 @@ void EstablishCallData::cancel()
void EstablishCallData::finishTunnelAndResponder()
{
state = FINISH;
if (mpp_tunnel)
if (async_tunnel_sender)
{
mpp_tunnel->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", mpp_tunnel->id()), true); //trigger mpp tunnel finish work
async_tunnel_sender->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.",
async_tunnel_sender->getTunnelId())); //trigger mpp tunnel finish work
}
grpc::Status status(static_cast<grpc::StatusCode>(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed.");
responder.Finish(status, this);
Expand All @@ -162,11 +163,11 @@ void EstablishCallData::proceed()
else if (state == PROCESSING)
{
std::unique_lock lk(mu);
if (mpp_tunnel->isSendQueueNextPopNonBlocking())
if (async_tunnel_sender->isSendQueueNextPopNonBlocking())
{
ready = false;
lk.unlock();
mpp_tunnel->sendJob(true);
async_tunnel_sender->sendOne();
}
else
ready = true;
Expand All @@ -186,9 +187,9 @@ void EstablishCallData::proceed()
}
}

void EstablishCallData::attachTunnel(const std::shared_ptr<DB::MPPTunnel> & mpp_tunnel_)
void EstablishCallData::attachAsyncTunnelSender(const std::shared_ptr<DB::AsyncTunnelSender> & async_tunnel_sender_)
{
stopwatch = std::make_shared<Stopwatch>();
this->mpp_tunnel = mpp_tunnel_;
this->async_tunnel_sender = async_tunnel_sender_;
}
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Flash/EstablishCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace DB
{
class MPPTunnel;
class AsyncTunnelSender;
class AsyncFlashService;

class SyncPacketWriter : public PacketWriter
Expand Down Expand Up @@ -65,7 +66,7 @@ class EstablishCallData : public PacketWriter

void cancel();

void attachTunnel(const std::shared_ptr<DB::MPPTunnel> & mpp_tunnel_);
virtual void attachAsyncTunnelSender(const std::shared_ptr<DB::AsyncTunnelSender> & async_tunnel_sender_) override;

// Spawn a new EstablishCallData instance to serve new clients while we process the one for this EstablishCallData.
// The instance will deallocate itself as part of its FINISH state.
Expand Down Expand Up @@ -115,7 +116,7 @@ class EstablishCallData : public PacketWriter
FINISH
};
CallStatus state; // The current serving state.
std::shared_ptr<DB::MPPTunnel> mpp_tunnel = nullptr;
std::shared_ptr<DB::AsyncTunnelSender> async_tunnel_sender;
std::shared_ptr<Stopwatch> stopwatch;
};
} // namespace DB
1 change: 0 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
Stopwatch stopwatch;
if (calldata)
{
calldata->attachTunnel(tunnel);
// In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData.
tunnel->connect(calldata);
LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way");
Expand Down
18 changes: 10 additions & 8 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,26 @@ struct AsyncGrpcExchangePacketReader : public AsyncExchangePacketReader

struct LocalExchangePacketReader : public ExchangePacketReader
{
MPPTunnelPtr tunnel;
LocalTunnelSenderPtr local_tunnel_sender;

explicit LocalExchangePacketReader(const std::shared_ptr<MPPTunnel> & tunnel_)
: tunnel(tunnel_)
explicit LocalExchangePacketReader(const LocalTunnelSenderPtr & local_tunnel_sender_)
: local_tunnel_sender(local_tunnel_sender_)
{}

/// put the implementation of dtor in .cpp so we don't need to put the specialization of
/// pingcap::kv::RpcCall<mpp::EstablishMPPConnectionRequest> in header file.
~LocalExchangePacketReader() override
{
if (tunnel)
if (local_tunnel_sender)
{
// In case that ExchangeReceiver throw error before finish reading from mpp_tunnel
tunnel->consumerFinish("Receiver closed");
local_tunnel_sender->consumerFinish("Receiver closed");
}
}

bool read(MPPDataPacketPtr & packet) override
{
MPPDataPacketPtr tmp_packet = tunnel->readForLocal();
MPPDataPacketPtr tmp_packet = local_tunnel_sender->readForLocal();
bool success = tmp_packet != nullptr;
if (success)
packet = tmp_packet;
Expand All @@ -142,7 +142,9 @@ struct LocalExchangePacketReader : public ExchangePacketReader

::grpc::Status finish() override
{
tunnel.reset();
if (local_tunnel_sender)
local_tunnel_sender->consumerFinish("Receiver finished!");
local_tunnel_sender.reset();
return ::grpc::Status::OK;
}
};
Expand Down Expand Up @@ -222,7 +224,7 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques
{
throw Exception("Exchange receiver meet error : " + status.error_message());
}
return std::make_shared<LocalExchangePacketReader>(tunnel);
return std::make_shared<LocalExchangePacketReader>(tunnel->getLocalTunnelSender());
}
else
{
Expand Down
Loading