Skip to content

Commit

Permalink
Refact MPPTunnel class to encapsulate different tunnel mode (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yibin87 authored and Lloyd-Pottiger committed Jul 19, 2022
1 parent fba2271 commit 69ae1ca
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 487 deletions.
24 changes: 17 additions & 7 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,15 @@ class MPMCQueue
// Copy and move it is dangerous.
DISALLOW_COPY_AND_MOVE(MPMCQueue);

/// Block until:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
/*
* | Previous Status | Empty | Behavior |
* |------------------|------------|--------------------------|
* | Normal | Yes | Block |
* | Normal | No | Pop and return true |
* | Finished | Yes | return false |
* | Finished | No | Pop and return true |
* | Cancelled | Yes/No | return false |
* */
ALWAYS_INLINE bool pop(T & obj)
{
return popObj<true>(obj);
Expand All @@ -105,10 +111,14 @@ class MPMCQueue
return popObj<false>(obj);
}

/// Block until:
/// 1. Push succeeds and return true.
/// 2. The queue is cancelled and return false.
/// 3. The queue has finished and return false.
/*
* | Previous Status | Full | Behavior |
* |------------------|------------|--------------------------|
* | Normal | Yes | Block |
* | Normal | No | Push and return true |
* | Finished | Yes/No | return false |
* | Cancelled | Yes/No | return false |
* */
template <typename U>
ALWAYS_INLINE bool push(U && u)
{
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ void EstablishCallData::tryFlushOne()
// check whether there is a valid msg to write
{
std::unique_lock lk(mu);
if (ready && mpp_tunnel->isSendQueueNextPopNonBlocking()) //not ready or no packet
if (ready && async_tunnel_sender->isSendQueueNextPopNonBlocking()) //not ready or no packet
ready = false;
else
return;
}
// there is a valid msg, do single write operation
mpp_tunnel->sendJob(false);
async_tunnel_sender->sendOne();
}

void EstablishCallData::responderFinish(const grpc::Status & status)
Expand Down Expand Up @@ -117,7 +117,7 @@ void EstablishCallData::writeDone(const ::grpc::Status & status)
state = FINISH;
if (stopwatch)
{
LOG_FMT_INFO(mpp_tunnel->getLogger(), "connection for {} cost {} ms.", mpp_tunnel->id(), stopwatch->elapsedMilliseconds());
LOG_FMT_INFO(async_tunnel_sender->getLogger(), "connection for {} cost {} ms.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds());
}
responderFinish(status);
}
Expand All @@ -141,9 +141,10 @@ void EstablishCallData::cancel()
void EstablishCallData::finishTunnelAndResponder()
{
state = FINISH;
if (mpp_tunnel)
if (async_tunnel_sender)
{
mpp_tunnel->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", mpp_tunnel->id()), true); //trigger mpp tunnel finish work
async_tunnel_sender->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.",
async_tunnel_sender->getTunnelId())); //trigger mpp tunnel finish work
}
grpc::Status status(static_cast<grpc::StatusCode>(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed.");
responder.Finish(status, this);
Expand All @@ -162,11 +163,11 @@ void EstablishCallData::proceed()
else if (state == PROCESSING)
{
std::unique_lock lk(mu);
if (mpp_tunnel->isSendQueueNextPopNonBlocking())
if (async_tunnel_sender->isSendQueueNextPopNonBlocking())
{
ready = false;
lk.unlock();
mpp_tunnel->sendJob(true);
async_tunnel_sender->sendOne();
}
else
ready = true;
Expand All @@ -186,9 +187,9 @@ void EstablishCallData::proceed()
}
}

void EstablishCallData::attachTunnel(const std::shared_ptr<DB::MPPTunnel> & mpp_tunnel_)
void EstablishCallData::attachAsyncTunnelSender(const std::shared_ptr<DB::AsyncTunnelSender> & async_tunnel_sender_)
{
stopwatch = std::make_shared<Stopwatch>();
this->mpp_tunnel = mpp_tunnel_;
this->async_tunnel_sender = async_tunnel_sender_;
}
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Flash/EstablishCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace DB
{
class MPPTunnel;
class AsyncTunnelSender;
class AsyncFlashService;

class SyncPacketWriter : public PacketWriter
Expand Down Expand Up @@ -65,7 +66,7 @@ class EstablishCallData : public PacketWriter

void cancel();

void attachTunnel(const std::shared_ptr<DB::MPPTunnel> & mpp_tunnel_);
virtual void attachAsyncTunnelSender(const std::shared_ptr<DB::AsyncTunnelSender> & async_tunnel_sender_) override;

// Spawn a new EstablishCallData instance to serve new clients while we process the one for this EstablishCallData.
// The instance will deallocate itself as part of its FINISH state.
Expand Down Expand Up @@ -115,7 +116,7 @@ class EstablishCallData : public PacketWriter
FINISH
};
CallStatus state; // The current serving state.
std::shared_ptr<DB::MPPTunnel> mpp_tunnel = nullptr;
std::shared_ptr<DB::AsyncTunnelSender> async_tunnel_sender;
std::shared_ptr<Stopwatch> stopwatch;
};
} // namespace DB
1 change: 0 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
Stopwatch stopwatch;
if (calldata)
{
calldata->attachTunnel(tunnel);
// In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData.
tunnel->connect(calldata);
LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way");
Expand Down
18 changes: 10 additions & 8 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,26 @@ struct AsyncGrpcExchangePacketReader : public AsyncExchangePacketReader

struct LocalExchangePacketReader : public ExchangePacketReader
{
MPPTunnelPtr tunnel;
LocalTunnelSenderPtr local_tunnel_sender;

explicit LocalExchangePacketReader(const std::shared_ptr<MPPTunnel> & tunnel_)
: tunnel(tunnel_)
explicit LocalExchangePacketReader(const LocalTunnelSenderPtr & local_tunnel_sender_)
: local_tunnel_sender(local_tunnel_sender_)
{}

/// put the implementation of dtor in .cpp so we don't need to put the specialization of
/// pingcap::kv::RpcCall<mpp::EstablishMPPConnectionRequest> in header file.
~LocalExchangePacketReader() override
{
if (tunnel)
if (local_tunnel_sender)
{
// In case that ExchangeReceiver throw error before finish reading from mpp_tunnel
tunnel->consumerFinish("Receiver closed");
local_tunnel_sender->consumerFinish("Receiver closed");
}
}

bool read(MPPDataPacketPtr & packet) override
{
MPPDataPacketPtr tmp_packet = tunnel->readForLocal();
MPPDataPacketPtr tmp_packet = local_tunnel_sender->readForLocal();
bool success = tmp_packet != nullptr;
if (success)
packet = tmp_packet;
Expand All @@ -142,7 +142,9 @@ struct LocalExchangePacketReader : public ExchangePacketReader

::grpc::Status finish() override
{
tunnel.reset();
if (local_tunnel_sender)
local_tunnel_sender->consumerFinish("Receiver finished!");
local_tunnel_sender.reset();
return ::grpc::Status::OK;
}
};
Expand Down Expand Up @@ -222,7 +224,7 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques
{
throw Exception("Exchange receiver meet error : " + status.error_message());
}
return std::make_shared<LocalExchangePacketReader>(tunnel);
return std::make_shared<LocalExchangePacketReader>(tunnel->getLocalTunnelSender());
}
else
{
Expand Down
Loading

0 comments on commit 69ae1ca

Please sign in to comment.