From 69ae1cab00b89600e14035082a356cc94e6ff628 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 14 Jul 2022 09:53:05 +0800 Subject: [PATCH] Refact MPPTunnel class to encapsulate different tunnel mode (#5286) close pingcap/tiflash#5095 --- dbms/src/Common/MPMCQueue.h | 24 +- dbms/src/Flash/EstablishCall.cpp | 19 +- dbms/src/Flash/EstablishCall.h | 5 +- dbms/src/Flash/FlashService.cpp | 1 - dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 18 +- dbms/src/Flash/Mpp/MPPTunnel.cpp | 397 ++++++++++--------- dbms/src/Flash/Mpp/MPPTunnel.h | 289 +++++++++----- dbms/src/Flash/Mpp/PacketWriter.h | 4 + dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 331 ++++++++-------- 9 files changed, 601 insertions(+), 487 deletions(-) diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index e005c363eae..42aad66e807 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -80,9 +80,15 @@ class MPMCQueue // Copy and move it is dangerous. DISALLOW_COPY_AND_MOVE(MPMCQueue); - /// Block until: - /// 1. Pop succeeds with a valid T: return true. - /// 2. The queue is cancelled or finished: return false. + /* + * | Previous Status | Empty | Behavior | + * |------------------|------------|--------------------------| + * | Normal | Yes | Block | + * | Normal | No | Pop and return true | + * | Finished | Yes | return false | + * | Finished | No | Pop and return true | + * | Cancelled | Yes/No | return false | + * */ ALWAYS_INLINE bool pop(T & obj) { return popObj(obj); @@ -105,10 +111,14 @@ class MPMCQueue return popObj(obj); } - /// Block until: - /// 1. Push succeeds and return true. - /// 2. The queue is cancelled and return false. - /// 3. The queue has finished and return false. + /* + * | Previous Status | Full | Behavior | + * |------------------|------------|--------------------------| + * | Normal | Yes | Block | + * | Normal | No | Push and return true | + * | Finished | Yes/No | return false | + * | Cancelled | Yes/No | return false | + * */ template ALWAYS_INLINE bool push(U && u) { diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 2f8c7c15f56..6dece74d007 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -55,13 +55,13 @@ void EstablishCallData::tryFlushOne() // check whether there is a valid msg to write { std::unique_lock lk(mu); - if (ready && mpp_tunnel->isSendQueueNextPopNonBlocking()) //not ready or no packet + if (ready && async_tunnel_sender->isSendQueueNextPopNonBlocking()) //not ready or no packet ready = false; else return; } // there is a valid msg, do single write operation - mpp_tunnel->sendJob(false); + async_tunnel_sender->sendOne(); } void EstablishCallData::responderFinish(const grpc::Status & status) @@ -117,7 +117,7 @@ void EstablishCallData::writeDone(const ::grpc::Status & status) state = FINISH; if (stopwatch) { - LOG_FMT_INFO(mpp_tunnel->getLogger(), "connection for {} cost {} ms.", mpp_tunnel->id(), stopwatch->elapsedMilliseconds()); + LOG_FMT_INFO(async_tunnel_sender->getLogger(), "connection for {} cost {} ms.", async_tunnel_sender->getTunnelId(), stopwatch->elapsedMilliseconds()); } responderFinish(status); } @@ -141,9 +141,10 @@ void EstablishCallData::cancel() void EstablishCallData::finishTunnelAndResponder() { state = FINISH; - if (mpp_tunnel) + if (async_tunnel_sender) { - mpp_tunnel->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", mpp_tunnel->id()), true); //trigger mpp tunnel finish work + async_tunnel_sender->consumerFinish(fmt::format("{}: finishTunnelAndResponder called.", + async_tunnel_sender->getTunnelId())); //trigger mpp tunnel finish work } grpc::Status status(static_cast(GRPC_STATUS_UNKNOWN), "Consumer exits unexpected, grpc writes failed."); responder.Finish(status, this); @@ -162,11 +163,11 @@ void EstablishCallData::proceed() else if (state == PROCESSING) { std::unique_lock lk(mu); - if (mpp_tunnel->isSendQueueNextPopNonBlocking()) + if (async_tunnel_sender->isSendQueueNextPopNonBlocking()) { ready = false; lk.unlock(); - mpp_tunnel->sendJob(true); + async_tunnel_sender->sendOne(); } else ready = true; @@ -186,9 +187,9 @@ void EstablishCallData::proceed() } } -void EstablishCallData::attachTunnel(const std::shared_ptr & mpp_tunnel_) +void EstablishCallData::attachAsyncTunnelSender(const std::shared_ptr & async_tunnel_sender_) { stopwatch = std::make_shared(); - this->mpp_tunnel = mpp_tunnel_; + this->async_tunnel_sender = async_tunnel_sender_; } } // namespace DB diff --git a/dbms/src/Flash/EstablishCall.h b/dbms/src/Flash/EstablishCall.h index 3b81b9da6c1..4a62e5f0c6b 100644 --- a/dbms/src/Flash/EstablishCall.h +++ b/dbms/src/Flash/EstablishCall.h @@ -23,6 +23,7 @@ namespace DB { class MPPTunnel; +class AsyncTunnelSender; class AsyncFlashService; class SyncPacketWriter : public PacketWriter @@ -65,7 +66,7 @@ class EstablishCallData : public PacketWriter void cancel(); - void attachTunnel(const std::shared_ptr & mpp_tunnel_); + virtual void attachAsyncTunnelSender(const std::shared_ptr & async_tunnel_sender_) override; // Spawn a new EstablishCallData instance to serve new clients while we process the one for this EstablishCallData. // The instance will deallocate itself as part of its FINISH state. @@ -115,7 +116,7 @@ class EstablishCallData : public PacketWriter FINISH }; CallStatus state; // The current serving state. - std::shared_ptr mpp_tunnel = nullptr; + std::shared_ptr async_tunnel_sender; std::shared_ptr stopwatch; }; } // namespace DB diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 422c6b47cfa..28e40d29d7c 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -297,7 +297,6 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon Stopwatch stopwatch; if (calldata) { - calldata->attachTunnel(tunnel); // In async mode, this function won't wait for the request done and the finish event is handled in EstablishCallData. tunnel->connect(calldata); LOG_FMT_DEBUG(tunnel->getLogger(), "connect tunnel successfully in async way"); diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index b0e49792d1a..5adefe25f92 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -114,26 +114,26 @@ struct AsyncGrpcExchangePacketReader : public AsyncExchangePacketReader struct LocalExchangePacketReader : public ExchangePacketReader { - MPPTunnelPtr tunnel; + LocalTunnelSenderPtr local_tunnel_sender; - explicit LocalExchangePacketReader(const std::shared_ptr & tunnel_) - : tunnel(tunnel_) + explicit LocalExchangePacketReader(const LocalTunnelSenderPtr & local_tunnel_sender_) + : local_tunnel_sender(local_tunnel_sender_) {} /// put the implementation of dtor in .cpp so we don't need to put the specialization of /// pingcap::kv::RpcCall in header file. ~LocalExchangePacketReader() override { - if (tunnel) + if (local_tunnel_sender) { // In case that ExchangeReceiver throw error before finish reading from mpp_tunnel - tunnel->consumerFinish("Receiver closed"); + local_tunnel_sender->consumerFinish("Receiver closed"); } } bool read(MPPDataPacketPtr & packet) override { - MPPDataPacketPtr tmp_packet = tunnel->readForLocal(); + MPPDataPacketPtr tmp_packet = local_tunnel_sender->readForLocal(); bool success = tmp_packet != nullptr; if (success) packet = tmp_packet; @@ -142,7 +142,9 @@ struct LocalExchangePacketReader : public ExchangePacketReader ::grpc::Status finish() override { - tunnel.reset(); + if (local_tunnel_sender) + local_tunnel_sender->consumerFinish("Receiver finished!"); + local_tunnel_sender.reset(); return ::grpc::Status::OK; } }; @@ -222,7 +224,7 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques { throw Exception("Exchange receiver meet error : " + status.error_message()); } - return std::make_shared(tunnel); + return std::make_shared(tunnel->getLocalTunnelSender()); } else { diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 16fe4ae42cc..ab26d4d29f3 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -28,8 +28,7 @@ extern const char exception_during_mpp_close_tunnel[]; extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints -template -MPPTunnelBase::MPPTunnelBase( +MPPTunnel::MPPTunnel( const mpp::TaskMeta & receiver_meta_, const mpp::TaskMeta & sender_meta_, const std::chrono::seconds timeout_, @@ -37,45 +36,33 @@ MPPTunnelBase::MPPTunnelBase( bool is_local_, bool is_async_, const String & req_id) - : connected(false) - , finished(false) - , is_local(is_local_) - , is_async(is_async_) - , timeout(timeout_) - , tunnel_id(fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id())) - , input_streams_num(input_steams_num_) - , send_queue(std::max(5, input_steams_num_ * 5)) // MPMCQueue can benefit from a slightly larger queue size - , thread_manager(newThreadManager()) - , log(Logger::get("MPPTunnel", req_id, tunnel_id)) -{ - RUNTIME_ASSERT(!(is_local && is_async), log, "is_local: {}, is_async: {}.", is_local, is_async); - GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); -} + : MPPTunnel(fmt::format("tunnel{}+{}", sender_meta_.task_id(), receiver_meta_.task_id()), timeout_, input_steams_num_, is_local_, is_async_, req_id) +{} -template -MPPTunnelBase::MPPTunnelBase( +MPPTunnel::MPPTunnel( const String & tunnel_id_, const std::chrono::seconds timeout_, int input_steams_num_, bool is_local_, bool is_async_, const String & req_id) - : connected(false) - , finished(false) - , is_local(is_local_) - , is_async(is_async_) + : status(TunnelStatus::Unconnected) , timeout(timeout_) , tunnel_id(tunnel_id_) - , input_streams_num(input_steams_num_) - , send_queue(std::max(5, input_steams_num_ * 5)) // MPMCQueue can benefit from a slightly larger queue size - , thread_manager(newThreadManager()) + , send_queue(std::make_shared>(std::max(5, input_steams_num_ * 5))) // MPMCQueue can benefit from a slightly larger queue size , log(Logger::get("MPPTunnel", req_id, tunnel_id)) { - RUNTIME_ASSERT(!(is_local && is_async), log, "is_local: {}, is_async: {}.", is_local, is_async); + RUNTIME_ASSERT(!(is_local_ && is_async_), log, "is_local: {}, is_async: {}.", is_local_, is_async_); + if (is_local_) + mode = TunnelSenderMode::LOCAL; + else if (is_async_) + mode = TunnelSenderMode::ASYNC_GRPC; + else + mode = TunnelSenderMode::SYNC_GRPC; + GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Increment(); } -template -MPPTunnelBase::~MPPTunnelBase() +MPPTunnel::~MPPTunnel() { SCOPE_EXIT({ GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Decrement(); @@ -84,9 +71,9 @@ MPPTunnelBase::~MPPTunnelBase() { { std::unique_lock lock(mu); - if (finished) + if (status == TunnelStatus::Finished) { - LOG_FMT_TRACE(log, "already finished!"); + LOG_DEBUG(log, "already finished!"); return; } @@ -95,9 +82,7 @@ MPPTunnelBase::~MPPTunnelBase() finishSendQueue(); } LOG_FMT_TRACE(log, "waiting consumer finish!"); - waitForConsumerFinish(/*allow_throw=*/false); - LOG_FMT_TRACE(log, "waiting child thread finished!"); - thread_manager->wait(); + waitForSenderFinish(/*allow_throw=*/false); } catch (...) { @@ -106,32 +91,36 @@ MPPTunnelBase::~MPPTunnelBase() LOG_FMT_TRACE(log, "destructed tunnel obj!"); } -template -void MPPTunnelBase::finishSendQueue() +void MPPTunnel::finishSendQueue() { - bool flag = send_queue.finish(); - if (flag && !is_local && is_async) - writer->tryFlushOne(); + bool flag = send_queue->finish(); + if (flag && mode == TunnelSenderMode::ASYNC_GRPC) + { + async_tunnel_sender->tryFlushOne(); + } } /// exit abnormally, such as being cancelled. -template -void MPPTunnelBase::close(const String & reason) +void MPPTunnel::close(const String & reason) { { std::unique_lock lk(mu); - if (finished) + switch (status) + { + case TunnelStatus::Unconnected: + status = TunnelStatus::Finished; + cv_for_status_changed.notify_all(); return; - if (connected) + case TunnelStatus::Connected: { if (!reason.empty()) { try { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_close_tunnel); - send_queue.push(std::make_shared(getPacketWithError(reason))); - if (!is_local && is_async) - writer->tryFlushOne(); + send_queue->push(std::make_shared(getPacketWithError(reason))); + if (mode == TunnelSenderMode::ASYNC_GRPC) + async_tunnel_sender->tryFlushOne(); } catch (...) { @@ -139,34 +128,37 @@ void MPPTunnelBase::close(const String & reason) } } finishSendQueue(); + break; } - else - { - finished = true; - cv_for_connected_or_finished.notify_all(); + case TunnelStatus::WaitingForSenderFinish: + break; + case TunnelStatus::Finished: return; + default: + RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", status); } } - waitForConsumerFinish(/*allow_throw=*/false); + waitForSenderFinish(/*allow_throw=*/false); } // TODO: consider to hold a buffer -template -void MPPTunnelBase::write(const mpp::MPPDataPacket & data, bool close_after_write) +void MPPTunnel::write(const mpp::MPPDataPacket & data, bool close_after_write) { LOG_FMT_TRACE(log, "ready to write"); { - std::unique_lock lk(mu); - waitUntilConnectedOrFinished(lk); - if (finished) - throw Exception("write to tunnel which is already closed," + consumer_state.getError()); + { + std::unique_lock lk(mu); + waitUntilConnectedOrFinished(lk); + if (status == TunnelStatus::Finished) + throw Exception("write to tunnel which is already closed," + tunnel_sender->getConsumerFinishMsg()); + } - if (send_queue.push(std::make_shared(data))) + if (send_queue->push(std::make_shared(data))) { connection_profile_info.bytes += data.ByteSizeLong(); connection_profile_info.packets += 1; - if (!is_local && is_async) - writer->tryFlushOne(); + if (mode == TunnelSenderMode::ASYNC_GRPC) + async_tunnel_sender->tryFlushOne(); if (close_after_write) { finishSendQueue(); @@ -176,157 +168,98 @@ void MPPTunnelBase::write(const mpp::MPPDataPacket & data, bool close_af } } // push failed, wait consumer for the final state - waitForConsumerFinish(/*allow_throw=*/true); + waitForSenderFinish(/*allow_throw=*/true); } -template -void MPPTunnelBase::sendJob(bool need_lock) -{ - RUNTIME_ASSERT(!is_local, log, "should not reach sendJob for local tunnels"); - if (!is_async) - { - GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment(); - GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value())); - } - String err_msg; - try - { - /// TODO(fzh) reuse it later - MPPDataPacketPtr res; - while (send_queue.pop(res)) - { - if (!writer->write(*res)) - { - err_msg = "grpc writes failed."; - break; - } - else - { - if (is_async) - return; - } - } - } - catch (Exception & e) - { - err_msg = e.message(); - } - catch (std::exception & e) - { - err_msg = e.what(); - } - catch (...) - { - err_msg = "fatal error in sendJob()"; - } - if (!err_msg.empty()) - { - /// append tunnel id to error message - err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); - LOG_ERROR(log, err_msg); - } - consumerFinish(err_msg, need_lock); - if (is_async) - writer->writeDone(grpc::Status::OK); - else - { - GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement(); - } -} - - /// done normally and being called exactly once after writing all packets -template -void MPPTunnelBase::writeDone() +void MPPTunnel::writeDone() { - LOG_FMT_TRACE(log, "ready to finish, is_local: {}", is_local); + LOG_FMT_TRACE(log, "ready to finish, is_local: {}", mode == TunnelSenderMode::LOCAL); { std::unique_lock lk(mu); - if (finished) - throw Exception("write to tunnel which is already closed," + consumer_state.getError()); + if (status == TunnelStatus::Finished) + throw Exception("write to tunnel which is already closed," + tunnel_sender->getConsumerFinishMsg()); /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lk); finishSendQueue(); } - waitForConsumerFinish(/*allow_throw=*/true); -} - -template -std::shared_ptr MPPTunnelBase::readForLocal() -{ - RUNTIME_ASSERT(is_local, log, "should not reach readForLocal for remote tunnels"); - MPPDataPacketPtr res; - if (send_queue.pop(res)) - return res; - consumerFinish(""); - return nullptr; + waitForSenderFinish(/*allow_throw=*/true); } -template -void MPPTunnelBase::connect(Writer * writer_) +void MPPTunnel::connect(PacketWriter * writer) { { std::unique_lock lk(mu); - if (connected) - throw Exception("MPPTunnel has connected"); - if (finished) - throw Exception("MPPTunnel has finished"); + if (status != TunnelStatus::Unconnected) + throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString())); LOG_FMT_TRACE(log, "ready to connect"); - if (is_local) - RUNTIME_ASSERT(writer_ == nullptr, log); - else + switch (mode) { - writer = writer_; - if (!is_async) - { - // communicate send_thread through `consumer_state` - // NOTE: if the thread creation failed, `connected` will still be `false`. - thread_manager->schedule(true, "MPPTunnel", [this] { - sendJob(); - }); - } + case TunnelSenderMode::LOCAL: + RUNTIME_ASSERT(writer == nullptr, log); + local_tunnel_sender = std::make_shared(mode, send_queue, nullptr, log, tunnel_id); + tunnel_sender = local_tunnel_sender; + break; + case TunnelSenderMode::SYNC_GRPC: + RUNTIME_ASSERT(writer != nullptr, log, "Sync writer shouldn't be null"); + sync_tunnel_sender = std::make_shared(mode, send_queue, writer, log, tunnel_id); + sync_tunnel_sender->startSendThread(); + tunnel_sender = sync_tunnel_sender; + break; + case TunnelSenderMode::ASYNC_GRPC: + RUNTIME_ASSERT(writer != nullptr, log, "Async writer shouldn't be null"); + async_tunnel_sender = std::make_shared(mode, send_queue, writer, log, tunnel_id); + tunnel_sender = async_tunnel_sender; + writer->attachAsyncTunnelSender(async_tunnel_sender); + break; + default: + RUNTIME_ASSERT(false, log, "Unsupported TunnelSenderMode: {}", mode); } - connected = true; - cv_for_connected_or_finished.notify_all(); + status = TunnelStatus::Connected; + cv_for_status_changed.notify_all(); } LOG_DEBUG(log, "connected"); } -template -void MPPTunnelBase::waitForFinish() +void MPPTunnel::waitForFinish() { - waitForConsumerFinish(/*allow_throw=*/true); + waitForSenderFinish(/*allow_throw=*/true); } -template -void MPPTunnelBase::waitForConsumerFinish(bool allow_throw) +void MPPTunnel::waitForSenderFinish(bool allow_throw) { #ifndef NDEBUG { std::unique_lock lock(mu); - assert(connected); + assert(status != TunnelStatus::Unconnected); } #endif LOG_FMT_TRACE(log, "start wait for consumer finish!"); - String err_msg = consumer_state.getError(); // may blocking + { + std::unique_lock lock(mu); + status = TunnelStatus::WaitingForSenderFinish; + } + String err_msg = tunnel_sender->getConsumerFinishMsg(); // may blocking + { + std::unique_lock lock(mu); + status = TunnelStatus::Finished; + } if (allow_throw && !err_msg.empty()) throw Exception("Consumer exits unexpected, " + err_msg); LOG_FMT_TRACE(log, "end wait for consumer finish!"); } -template -void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lock & lk) +void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock & lk) { - auto connected_or_finished = [&] { - return connected || finished; + auto not_unconnected = [&] { + return (status != TunnelStatus::Unconnected); }; if (timeout.count() > 0) { LOG_FMT_TRACE(log, "start waitUntilConnectedOrFinished"); - auto res = cv_for_connected_or_finished.wait_for(lk, timeout, connected_or_finished); + auto res = cv_for_status_changed.wait_for(lk, timeout, not_unconnected); LOG_FMT_TRACE(log, "end waitUntilConnectedOrFinished"); - fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, res = false;); if (!res) throw Exception(tunnel_id + " is timeout"); @@ -334,38 +267,126 @@ void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lock -void MPPTunnelBase::consumerFinish(const String & err_msg, bool need_lock) +StringRef MPPTunnel::statusToString() +{ + switch (status) + { + case TunnelStatus::Unconnected: + return "Unconnected"; + case TunnelStatus::Connected: + return "Connected"; + case TunnelStatus::WaitingForSenderFinish: + return "WaitingForSenderFinish"; + case TunnelStatus::Finished: + return "Finished"; + default: + RUNTIME_ASSERT(false, log, "Unknown TaskStatus {}", status); + } +} + +void TunnelSender::consumerFinish(const String & msg) { - // must finish send_queue outside of the critical area to avoid deadlock with write. LOG_FMT_TRACE(log, "calling consumer Finish"); - send_queue.finish(); - auto rest_work = [this, &err_msg] { - // it's safe to call it multiple times - if (finished && consumer_state.errHasSet()) - return; - finished = true; - // must call setError in the critical area to keep consistent with `finished` from outside. - consumer_state.setError(err_msg); - cv_for_connected_or_finished.notify_all(); - }; - if (need_lock) + send_queue->finish(); + consumer_state.setMsg(msg); +} + +SyncTunnelSender::~SyncTunnelSender() +{ + LOG_FMT_TRACE(log, "waiting child thread finished!"); + thread_manager->wait(); +} + +void SyncTunnelSender::sendJob() +{ + GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment(); + GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value())); + String err_msg; + try { - std::unique_lock lk(mu); - rest_work(); + MPPDataPacketPtr res; + while (send_queue->pop(res)) + { + if (!writer->write(*res)) + { + err_msg = "grpc writes failed."; + break; + } + } } - else - rest_work(); + catch (...) + { + err_msg = getCurrentExceptionMessage(true); + } + if (!err_msg.empty()) + { + err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); + LOG_ERROR(log, err_msg); + trimStackTrace(err_msg); + } + consumerFinish(err_msg); + GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement(); +} + +void SyncTunnelSender::startSendThread() +{ + thread_manager = newThreadManager(); + thread_manager->schedule(true, "MPPTunnel", [this] { + sendJob(); + }); } -/// Explicit template instantiations - to avoid code bloat in headers. -template class MPPTunnelBase; +void AsyncTunnelSender::tryFlushOne() +{ + writer->tryFlushOne(); +} + +void AsyncTunnelSender::sendOne() +{ + String err_msg; + bool queue_empty_flag = false; + try + { + MPPDataPacketPtr res; + queue_empty_flag = !send_queue->pop(res); + if (!queue_empty_flag) + { + if (!writer->write(*res)) + { + err_msg = "grpc writes failed."; + } + } + } + catch (...) + { + err_msg = getCurrentExceptionMessage(true); + } + if (!err_msg.empty()) + { + err_msg = fmt::format("{} meet error: {}", tunnel_id, err_msg); + LOG_ERROR(log, err_msg); + trimStackTrace(err_msg); + } + if (!err_msg.empty() || queue_empty_flag) + { + consumerFinish(err_msg); + writer->writeDone(grpc::Status::OK); + } +} +LocalTunnelSender::MPPDataPacketPtr LocalTunnelSender::readForLocal() +{ + MPPDataPacketPtr res; + if (send_queue->pop(res)) + return res; + consumerFinish(""); + return nullptr; +} } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index bdc60a97f5a..419358b8bce 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -44,45 +44,163 @@ namespace DB { namespace tests { -class MPPTunnelTest; -class TestMPPTunnelBase; +class TestMPPTunnel; } // namespace tests class EstablishCallData; +enum class TunnelSenderMode +{ + SYNC_GRPC, // Using sync grpc writer + LOCAL, // Expose internal memory access, no grpc writer needed + ASYNC_GRPC // Using async grpc writer +}; + +/// TunnelSender is responsible for consuming data from Tunnel's internal send_queue and do the actual sending work +/// After TunnelSend finished its work, either normally or abnormally, set ConsumerState to inform Tunnel +class TunnelSender : private boost::noncopyable +{ +public: + using MPPDataPacketPtr = std::shared_ptr; + using DataPacketMPMCQueuePtr = std::shared_ptr>; + virtual ~TunnelSender() = default; + TunnelSender(TunnelSenderMode mode_, DataPacketMPMCQueuePtr send_queue_, PacketWriter * writer_, const LoggerPtr log_, const String & tunnel_id_) + : mode(mode_) + , send_queue(send_queue_) + , writer(writer_) + , log(log_) + , tunnel_id(tunnel_id_) + { + } + DataPacketMPMCQueuePtr getSendQueue() + { + return send_queue; + } + void consumerFinish(const String & err_msg); + String getConsumerFinishMsg() + { + return consumer_state.getMsg(); + } + bool isConsumerFinished() + { + return consumer_state.msgHasSet(); + } + const LoggerPtr & getLogger() const { return log; } + String getTunnelId() + { + return tunnel_id; + } + +protected: + /// TunnelSender use consumer state to inform tunnel that whether sender has finished its work + class ConsumerState + { + public: + ConsumerState() + : future(promise.get_future()) + { + } + String getMsg() + { + future.wait(); + return future.get(); + } + void setMsg(const String & msg) + { + bool old_value = false; + if (!msg_has_set.compare_exchange_strong(old_value, true, std::memory_order_seq_cst, std::memory_order_relaxed)) + return; + promise.set_value(msg); + } + bool msgHasSet() const + { + return msg_has_set.load(); + } + + private: + std::promise promise; + std::shared_future future; + std::atomic msg_has_set{false}; + }; + TunnelSenderMode mode; + DataPacketMPMCQueuePtr send_queue; + ConsumerState consumer_state; + PacketWriter * writer; + const LoggerPtr log; + String tunnel_id; +}; + +/// SyncTunnelSender maintains a new thread itself to consume and send data +class SyncTunnelSender : public TunnelSender +{ +public: + using Base = TunnelSender; + using Base::Base; + virtual ~SyncTunnelSender(); + void startSendThread(); + +private: + friend class tests::TestMPPTunnel; + void sendJob(); + std::shared_ptr thread_manager; +}; + +/// AsyncTunnelSender is mainly triggered by the Async PacketWriter which handles GRPC request/response in async mode, send one element one time +class AsyncTunnelSender : public TunnelSender +{ +public: + using Base = TunnelSender; + using Base::Base; + void tryFlushOne(); + void sendOne(); + bool isSendQueueNextPopNonBlocking() { return send_queue->isNextPopNonBlocking(); } +}; + +/// LocalTunnelSender just provide readForLocal method to return one element one time +/// LocalTunnelSender is owned by the associated ExchangeReceiver +class LocalTunnelSender : public TunnelSender +{ +public: + using Base = TunnelSender; + using Base::Base; + MPPDataPacketPtr readForLocal(); +}; + +using TunnelSenderPtr = std::shared_ptr; +using SyncTunnelSenderPtr = std::shared_ptr; +using AsyncTunnelSenderPtr = std::shared_ptr; +using LocalTunnelSenderPtr = std::shared_ptr; + /** - * MPPTunnelBase represents the sender of an exchange connection. - * - * (Deprecated) It is designed to be a template class so that we can mock a MPPTunnel without involving gRPC. + * MPPTunnel represents the sender of an exchange connection. * - * The lifecycle of a MPPTunnel can be indicated by `connected` and `finished`: - * | Stage | `connected` | `finished` | - * |--------------------------------|-------------|------------| - * | After constructed | false | false | - * | After `close` before `connect` | false | true | - * | After `connect` | true | false | - * | After `consumerFinish` | true | true | + * The lifecycle of a MPPTunnel can be indicated by TunnelStatus: + * | Previous Status | Event | New Status | + * |------------------------|-----------------|------------------------| + * | NaN | Construction | Unconnected | + * | Unconnected | Close | Finished | + * | Unconnected | Connection | Connected | + * | Connected | WriteDone | WaitingForSenderFinish | + * | Connected | Close | WaitingForSenderFinish | + * | Connected | Encounter error | WaitingForSenderFinish | + * | WaitingForSenderFinish | Sender Finished | Finished | * - * To be short: before `connect`, only `close` can finish a MPPTunnel; after `connect`, only `consumerFinish` can. + * To be short: before connect, only close can finish a MPPTunnel; after connect, only Sender Finish can. * - * Each MPPTunnel has a consumer to consume data. There're two kinds of consumers: local and remote. - * - Remote consumer is owned by MPPTunnel itself. MPPTunnel will create a thread and run `sendLoop`. - * - Local consumer is owned by the associated ExchangeReceiver (in the same process). + * Each MPPTunnel has a Sender to consume data. There're three kinds of senders: sync_remote, local and async_remote. * - * The protocol between MPPTunnel and consumer: + * The protocol between MPPTunnel and Sender: * - All data will be pushed into the `send_queue`, including errors. - * - MPPTunnel may close `send_queue` to notify consumer normally finish. - * - Consumer may close `send_queue` to notify MPPTunnel that an error occurs. - * - After `connect` only the consumer can set `finished` to `true`. - * - Consumer's state is saved in `consumer_state` and be available after consumer finished. + * - MPPTunnel may finish `send_queue` to notify Sender normally finish. + * - Sender may finish `send_queue` to notify MPPTunnel that an error occurs. + * - After `status` turned to Connected only when Sender finish its work, MPPTunnel can set its 'status' to Finished. * - * NOTE: to avoid deadlock, `waitForConsumerFinish` should be called outside of the protection of `mu`. + * NOTE: to avoid deadlock, `waitForSenderFinish` should be called outside of the protection of `mu`. */ -template -class MPPTunnelBase : private boost::noncopyable +class MPPTunnel : private boost::noncopyable { public: - MPPTunnelBase( + MPPTunnel( const mpp::TaskMeta & receiver_meta_, const mpp::TaskMeta & sender_meta_, std::chrono::seconds timeout_, @@ -91,7 +209,16 @@ class MPPTunnelBase : private boost::noncopyable bool is_async_, const String & req_id); - ~MPPTunnelBase(); + // For gtest usage + MPPTunnel( + const String & tunnel_id_, + std::chrono::seconds timeout_, + int input_steams_num_, + bool is_local_, + bool is_async_, + const String & req_id); + + ~MPPTunnel(); const String & id() const { return tunnel_id; } @@ -101,122 +228,68 @@ class MPPTunnelBase : private boost::noncopyable // finish the writing. void writeDone(); - std::shared_ptr readForLocal(); - /// close() finishes the tunnel, if the tunnel is connected already, it will /// write the error message to the tunnel, otherwise it just close the tunnel void close(const String & reason); // a MPPConn request has arrived. it will build connection by this tunnel; - void connect(Writer * writer_); + void connect(PacketWriter * writer); // wait until all the data has been transferred. void waitForFinish(); const ConnectionProfileInfo & getConnectionProfileInfo() const { return connection_profile_info; } - bool isLocal() const { return is_local; } + bool isLocal() const { return mode == TunnelSenderMode::LOCAL; } + bool isAsync() const { return mode == TunnelSenderMode::ASYNC_GRPC; } const LoggerPtr & getLogger() const { return log; } - // do finish work for consumer, if need_lock is false, it means it has been protected by a mutex lock. - void consumerFinish(const String & err_msg, bool need_lock = true); - - bool isSendQueueNextPopNonBlocking() { return send_queue.isNextPopNonBlocking(); } - - // In async mode, do a singe send operation when Writer::TryWrite() succeeds. - // In sync mode, as a background task to keep sending until done. - void sendJob(bool need_lock = true); + TunnelSenderPtr getTunnelSender() { return tunnel_sender; } + SyncTunnelSenderPtr getSyncTunnelSender() { return sync_tunnel_sender; } + AsyncTunnelSenderPtr getAsyncTunnelSender() { return async_tunnel_sender; } + LocalTunnelSenderPtr getLocalTunnelSender() { return local_tunnel_sender; } private: - friend class tests::MPPTunnelTest; - friend class tests::TestMPPTunnelBase; - // For gtest usage - MPPTunnelBase( - const String & tunnel_id_, - std::chrono::seconds timeout_, - int input_steams_num_, - bool is_local_, - bool is_async_, - const String & req_id); + friend class tests::TestMPPTunnel; + // TODO(hyb): Extract Cancelled status from Finished to distinguish Completed and Cancelled situation + enum class TunnelStatus + { + Unconnected, // Not connect to any writer, not able to accept new data + Connected, // Connected to some writer, accepting data + WaitingForSenderFinish, // Accepting all data already, wait for sender to finish + Finished // Final state, no more work to do + }; + StringRef statusToString(); void finishSendQueue(); void waitUntilConnectedOrFinished(std::unique_lock & lk); - void waitForConsumerFinish(bool allow_throw); + void waitForSenderFinish(bool allow_throw); std::mutex mu; - std::condition_variable cv_for_connected_or_finished; - - bool connected; // if the exchange in has connected this tunnel. - - bool finished; // if the tunnel has finished its connection. - - bool is_local; // if the tunnel is used for local environment - - bool is_async; // if the tunnel is used for async server. + std::condition_variable cv_for_status_changed; - Writer * writer; + TunnelStatus status; std::chrono::seconds timeout; // tunnel id is in the format like "tunnel[sender]+[receiver]" String tunnel_id; - int input_streams_num; - using MPPDataPacketPtr = std::shared_ptr; - MPMCQueue send_queue; - - std::shared_ptr thread_manager; - - /// Consumer can be sendLoop or local receiver. - class ConsumerState - { - public: - ConsumerState() - : future(promise.get_future()) - { - } - - // before finished, must be called without protection of mu - String getError() - { - future.wait(); - return future.get(); - } - - void setError(const String & err_msg) - { - promise.set_value(err_msg); - err_has_set = true; - } - - bool errHasSet() const - { - return err_has_set.load(); - } - - private: - std::promise promise; - std::shared_future future; - std::atomic err_has_set{false}; - }; - ConsumerState consumer_state; - + using DataPacketMPMCQueuePtr = std::shared_ptr>; + DataPacketMPMCQueuePtr send_queue; ConnectionProfileInfo connection_profile_info; - const LoggerPtr log; + TunnelSenderMode mode; // Tunnel transfer data mode + TunnelSenderPtr tunnel_sender; // Used to refer to one of sync/async/local_tunnel_sender which is not nullptr, just for coding convenience + // According to mode value, among the sync/async/local_tunnel_senders, only the responding sender is not null and do actual work + SyncTunnelSenderPtr sync_tunnel_sender; + AsyncTunnelSenderPtr async_tunnel_sender; + LocalTunnelSenderPtr local_tunnel_sender; }; - -class MPPTunnel : public MPPTunnelBase -{ -public: - using Base = MPPTunnelBase; - using Base::Base; -}; - using MPPTunnelPtr = std::shared_ptr; } // namespace DB diff --git a/dbms/src/Flash/Mpp/PacketWriter.h b/dbms/src/Flash/Mpp/PacketWriter.h index 2333f124ef6..0cf45889ca1 100644 --- a/dbms/src/Flash/Mpp/PacketWriter.h +++ b/dbms/src/Flash/Mpp/PacketWriter.h @@ -27,6 +27,7 @@ namespace DB { // PacketWriter is a common interface of both sync and async gRPC writer. // It is used as the template parameter of `MPPTunnel`. +class AsyncTunnelSender; class PacketWriter { public: @@ -42,6 +43,9 @@ class PacketWriter // If it is not ready, caller can't write a packet. virtual void tryFlushOne() {} + // Attach async sender to async writer so that async writer can use it to get/transfer DataPacket and set consumer finish msg directly + virtual void attachAsyncTunnelSender(const std::shared_ptr &) {} + // Finish rpc with a status. Needed by async writer. For sync writer it is useless but not harmful. virtual void writeDone(const ::grpc::Status & /*status*/) {} }; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 706c17ed036..d2604c462e1 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -26,44 +26,6 @@ namespace DB { namespace tests { -class MPPTunnelTest : public MPPTunnelBase -{ -public: - using Base = MPPTunnelBase; - using Base::Base; - MPPTunnelTest( - const String & tunnel_id_, - std::chrono::seconds timeout_, - int input_steams_num_, - bool is_local_, - bool is_async_, - const String & req_id) - : Base(tunnel_id_, timeout_, input_steams_num_, is_local_, is_async_, req_id) - {} - void setFinishFlag(bool flag) - { - finished = flag; - } - bool getFinishFlag() - { - return finished; - } - bool getConnectFlag() - { - return connected; - } - std::shared_ptr getThreadManager() - { - return thread_manager; - } - LoggerPtr getLog() - { - return log; - } -}; - -using MPPTunnelTestPtr = std::shared_ptr; - class MockWriter : public PacketWriter { bool write(const mpp::MPPDataPacket & packet) override @@ -86,29 +48,36 @@ class MockFailedWriter : public PacketWriter struct MockLocalReader { - MPPTunnelTestPtr tunnel; + LocalTunnelSenderPtr local_sender; std::vector write_packet_vec; + std::shared_ptr thread_manager; - explicit MockLocalReader(const MPPTunnelTestPtr & tunnel_) - : tunnel(tunnel_) - {} + explicit MockLocalReader(const LocalTunnelSenderPtr & local_sender_) + : local_sender(local_sender_) + , thread_manager(newThreadManager()) + { + thread_manager->schedule(true, "LocalReader", [this] { + this->read(); + }); + } ~MockLocalReader() { - if (tunnel) + if (local_sender) { // In case that ExchangeReceiver throw error before finish reading from mpp_tunnel - LOG_FMT_TRACE(tunnel->getLog(), "before mocklocalreader invoking consumerFinish!"); - tunnel->consumerFinish("Receiver closed"); - LOG_FMT_TRACE(tunnel->getLog(), "after mocklocalreader invoking consumerFinish!"); + LOG_FMT_TRACE(local_sender->getLogger(), "before mocklocalreader invoking consumerFinish!"); + local_sender->consumerFinish("Receiver closed"); + LOG_FMT_TRACE(local_sender->getLogger(), "after mocklocalreader invoking consumerFinish!"); } + thread_manager->wait(); } void read() { while (true) { - MPPDataPacketPtr tmp_packet = tunnel->readForLocal(); + MPPDataPacketPtr tmp_packet = local_sender->readForLocal(); bool success = tmp_packet != nullptr; if (success) { @@ -125,25 +94,32 @@ using MockLocalReaderPtr = std::shared_ptr; struct MockTerminateLocalReader { - MPPTunnelTestPtr tunnel; + LocalTunnelSenderPtr local_sender; + std::shared_ptr thread_manager; - explicit MockTerminateLocalReader(const MPPTunnelTestPtr & tunnel_) - : tunnel(tunnel_) - {} + explicit MockTerminateLocalReader(const LocalTunnelSenderPtr & local_sender_) + : local_sender(local_sender_) + , thread_manager(newThreadManager()) + { + thread_manager->schedule(true, "LocalReader", [this] { + this->read(); + }); + } ~MockTerminateLocalReader() { - if (tunnel) + if (local_sender) { // In case that ExchangeReceiver throw error before finish reading from mpp_tunnel - tunnel->consumerFinish("Receiver closed"); + local_sender->consumerFinish("Receiver closed"); } + thread_manager->wait(); } void read() const { - MPPDataPacketPtr tmp_packet = tunnel->readForLocal(); - tunnel->consumerFinish("Receiver closed"); + MPPDataPacketPtr tmp_packet = local_sender->readForLocal(); + local_sender->consumerFinish("Receiver closed"); } }; using MockTerminateLocalReaderPtr = std::shared_ptr; @@ -152,29 +128,32 @@ using MockTerminateLocalReaderPtr = std::shared_ptr; class MockAsyncWriter : public PacketWriter { public: - explicit MockAsyncWriter(MPPTunnelTestPtr tunnel_) - : tunnel(tunnel_) - {} + explicit MockAsyncWriter() {} + bool write(const mpp::MPPDataPacket & packet) override { write_packet_vec.push_back(packet.data()); // Simulate the async process, write success then check if exist msg, then write again - if (tunnel->isSendQueueNextPopNonBlocking()) + if (async_sender->isSendQueueNextPopNonBlocking()) { - tunnel->sendJob(false); + async_sender->sendOne(); } return true; } void tryFlushOne() override { - if (ready && tunnel->isSendQueueNextPopNonBlocking()) + if (ready && async_sender->isSendQueueNextPopNonBlocking()) { - tunnel->sendJob(false); + async_sender->sendOne(); } ready = true; } - MPPTunnelTestPtr tunnel; + void attachAsyncTunnelSender(const std::shared_ptr & async_tunnel_sender_) override + { + async_sender = async_tunnel_sender_; + } + AsyncTunnelSenderPtr async_sender; std::vector write_packet_vec; bool ready = false; }; @@ -182,34 +161,37 @@ class MockAsyncWriter : public PacketWriter class MockFailedAsyncWriter : public PacketWriter { public: - explicit MockFailedAsyncWriter(MPPTunnelTestPtr tunnel_) - : tunnel(tunnel_) - {} + explicit MockFailedAsyncWriter() {} bool write(const mpp::MPPDataPacket & packet) override { write_packet_vec.push_back(packet.data()); // Simulate the async process, write success then check if exist msg, then write again - if (tunnel->isSendQueueNextPopNonBlocking()) + if (async_sender->isSendQueueNextPopNonBlocking()) { - tunnel->sendJob(false); + async_sender->sendOne(); } return false; } void tryFlushOne() override { - if (ready && tunnel->isSendQueueNextPopNonBlocking()) + if (ready && async_sender->isSendQueueNextPopNonBlocking()) { - tunnel->sendJob(false); + async_sender->sendOne(); } ready = true; } - MPPTunnelTestPtr tunnel; + + void attachAsyncTunnelSender(const std::shared_ptr & async_tunnel_sender_) override + { + async_sender = async_tunnel_sender_; + } + AsyncTunnelSenderPtr async_sender; std::vector write_packet_vec; bool ready = false; }; -class TestMPPTunnelBase : public testing::Test +class TestMPPTunnel : public testing::Test { protected: virtual void SetUp() override { timeout = std::chrono::seconds(10); } @@ -217,163 +199,186 @@ class TestMPPTunnelBase : public testing::Test std::chrono::seconds timeout; public: - MPPTunnelTestPtr constructRemoteSyncTunnel() + MPPTunnelPtr constructRemoteSyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, false, String("0")); return tunnel; } - MPPTunnelTestPtr constructLocalSyncTunnel() + MPPTunnelPtr constructLocalSyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, true, false, String("0")); return tunnel; } - static MockLocalReaderPtr connectLocalSyncTunnel(MPPTunnelTestPtr mpp_tunnel_ptr) + static MockLocalReaderPtr connectLocalSyncTunnel(MPPTunnelPtr mpp_tunnel_ptr) { mpp_tunnel_ptr->connect(nullptr); - MockLocalReaderPtr local_reader_ptr = std::make_shared(mpp_tunnel_ptr); - mpp_tunnel_ptr->getThreadManager()->schedule(true, "LocalReader", [local_reader_ptr] { - local_reader_ptr->read(); - }); + MockLocalReaderPtr local_reader_ptr = std::make_shared(mpp_tunnel_ptr->getLocalTunnelSender()); return local_reader_ptr; } - MPPTunnelTestPtr constructRemoteAsyncTunnel() + MPPTunnelPtr constructRemoteAsyncTunnel() { - auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, String("0")); + auto tunnel = std::make_shared(String("0000_0001"), timeout, 2, false, true, String("0")); return tunnel; } + + void waitSyncTunnelSenderThread(SyncTunnelSenderPtr sync_tunnel_sender) + { + sync_tunnel_sender->thread_manager->wait(); + } + + void setTunnelFinished(MPPTunnelPtr tunnel) + { + tunnel->status = MPPTunnel::TunnelStatus::Finished; + } + + bool getTunnelConnectedFlag(MPPTunnelPtr tunnel) + { + return tunnel->status != MPPTunnel::TunnelStatus::Unconnected && tunnel->status != MPPTunnel::TunnelStatus::Finished; + } + + bool getTunnelFinishedFlag(MPPTunnelPtr tunnel) + { + return tunnel->status == MPPTunnel::TunnelStatus::Finished; + } + + bool getTunnelSenderConsumerFinishedFlag(TunnelSenderPtr sender) + { + return sender->isConsumerFinished(); + } }; -TEST_F(TestMPPTunnelBase, ConnectWhenFinished) +TEST_F(TestMPPTunnel, ConnectWhenFinished) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - mpp_tunnel_ptr->setFinishFlag(true); + setTunnelFinished(mpp_tunnel_ptr); mpp_tunnel_ptr->connect(nullptr); GTEST_FAIL(); } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "MPPTunnel has finished"); + GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected or finished: Finished"); } -TEST_F(TestMPPTunnelBase, ConnectWhenConnected) +TEST_F(TestMPPTunnel, ConnectWhenConnected) { try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->connect(writer_ptr.get()); GTEST_FAIL(); } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected"); + GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected or finished: Connected"); } } -TEST_F(TestMPPTunnelBase, CloseBeforeConnect) +TEST_F(TestMPPTunnel, CloseBeforeConnect) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), false); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } CATCH -TEST_F(TestMPPTunnelBase, CloseAfterClose) +TEST_F(TestMPPTunnel, CloseAfterClose) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH -TEST_F(TestMPPTunnelBase, ConnectWriteCancel) +TEST_F(TestMPPTunnel, ConnectWriteCancel) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->close("Cancel"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 2); //Second for err msg GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, ConnectWriteWithCloseFlag) +TEST_F(TestMPPTunnel, ConnectWriteWithCloseFlag) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr, true); mpp_tunnel_ptr->waitForFinish(); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, ConnectWriteWriteDone) +TEST_F(TestMPPTunnel, ConnectWriteWriteDone) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->writeDone(); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, ConsumerFinish) +TEST_F(TestMPPTunnel, ConsumerFinish) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->consumerFinish(""); - mpp_tunnel_ptr->getThreadManager()->wait(); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + mpp_tunnel_ptr->getSyncTunnelSender()->consumerFinish(""); + waitSyncTunnelSenderThread(mpp_tunnel_ptr->getSyncTunnelSender()); + + GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, WriteError) +TEST_F(TestMPPTunnel, WriteError) { try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -386,14 +391,14 @@ TEST_F(TestMPPTunnelBase, WriteError) } } -TEST_F(TestMPPTunnelBase, WriteAfterFinished) +TEST_F(TestMPPTunnel, WriteAfterFinished) { try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->close("Canceled"); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); @@ -408,122 +413,119 @@ TEST_F(TestMPPTunnelBase, WriteAfterFinished) } /// Test Local MPPTunnel -TEST_F(TestMPPTunnelBase, LocalConnectWhenFinished) +TEST_F(TestMPPTunnel, LocalConnectWhenFinished) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); - mpp_tunnel_ptr->setFinishFlag(true); + setTunnelFinished(mpp_tunnel_ptr); mpp_tunnel_ptr->connect(nullptr); GTEST_FAIL(); } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "MPPTunnel has finished"); + GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected or finished: Finished"); } -TEST_F(TestMPPTunnelBase, LocalConnectWhenConnected) +TEST_F(TestMPPTunnel, LocalConnectWhenConnected) { try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->connect(nullptr); GTEST_FAIL(); } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected"); + GTEST_ASSERT_EQ(e.message(), "MPPTunnel has connected or finished: Connected"); } } -TEST_F(TestMPPTunnelBase, LocalCloseBeforeConnect) +TEST_F(TestMPPTunnel, LocalCloseBeforeConnect) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), false); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } CATCH -TEST_F(TestMPPTunnelBase, LocalCloseAfterClose) +TEST_F(TestMPPTunnel, LocalCloseAfterClose) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->close("Canceled"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH -TEST_F(TestMPPTunnelBase, LocalConnectWriteCancel) +TEST_F(TestMPPTunnel, LocalConnectWriteCancel) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->close("Cancel"); - mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + local_reader_ptr->thread_manager->wait(); // Join local read thread + GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 2); //Second for err msg GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, LocalConnectWriteWriteDone) +TEST_F(TestMPPTunnel, LocalConnectWriteWriteDone) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->writeDone(); - mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + local_reader_ptr->thread_manager->wait(); // Join local read thread + GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); - LOG_FMT_TRACE(mpp_tunnel_ptr->getLog(), "basic logic done!"); + LOG_FMT_TRACE(mpp_tunnel_ptr->getLogger(), "basic logic done!"); } CATCH -TEST_F(TestMPPTunnelBase, LocalConsumerFinish) +TEST_F(TestMPPTunnel, LocalConsumerFinish) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->consumerFinish(""); - mpp_tunnel_ptr->getThreadManager()->wait(); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + mpp_tunnel_ptr->getTunnelSender()->consumerFinish(""); + local_reader_ptr->thread_manager->wait(); // Join local read thread + GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, LocalReadTerminate) +TEST_F(TestMPPTunnel, LocalReadTerminate) { try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); mpp_tunnel_ptr->connect(nullptr); - MockTerminateLocalReaderPtr local_reader_ptr = std::make_shared(mpp_tunnel_ptr); - mpp_tunnel_ptr->getThreadManager()->schedule(true, "LocalReader", [local_reader_ptr] { - local_reader_ptr->read(); - }); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + MockTerminateLocalReaderPtr local_reader_ptr = std::make_shared(mpp_tunnel_ptr->getLocalTunnelSender()); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -536,13 +538,13 @@ TEST_F(TestMPPTunnelBase, LocalReadTerminate) } } -TEST_F(TestMPPTunnelBase, LocalWriteAfterFinished) +TEST_F(TestMPPTunnel, LocalWriteAfterFinished) { try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->close(""); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); @@ -557,13 +559,14 @@ TEST_F(TestMPPTunnelBase, LocalWriteAfterFinished) } /// Test Async MPPTunnel -TEST_F(TestMPPTunnelBase, AsyncConnectWriteCancel) +TEST_F(TestMPPTunnel, AsyncConnectWriteCancel) try { auto mpp_tunnel_ptr = constructRemoteAsyncTunnel(); - std::unique_ptr async_writer_ptr = std::make_unique(mpp_tunnel_ptr); + std::unique_ptr async_writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(async_writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); @@ -571,56 +574,56 @@ try data_packet_ptr->set_data("Second"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->close("Cancel"); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec.size(), 3); //Third for err msg GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec[0], "First"); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec[1], "Second"); } CATCH -TEST_F(TestMPPTunnelBase, AsyncConnectWriteWriteDone) +TEST_F(TestMPPTunnel, AsyncConnectWriteWriteDone) try { auto mpp_tunnel_ptr = constructRemoteAsyncTunnel(); - std::unique_ptr async_writer_ptr = std::make_unique(mpp_tunnel_ptr); + std::unique_ptr async_writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(async_writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->writeDone(); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec[0], "First"); } CATCH -TEST_F(TestMPPTunnelBase, AsyncConsumerFinish) +TEST_F(TestMPPTunnel, AsyncConsumerFinish) try { auto mpp_tunnel_ptr = constructRemoteAsyncTunnel(); - std::unique_ptr async_writer_ptr = std::make_unique(mpp_tunnel_ptr); + std::unique_ptr async_writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(async_writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->consumerFinish(""); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); + mpp_tunnel_ptr->getTunnelSender()->consumerFinish(""); + GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); GTEST_ASSERT_EQ(dynamic_cast(async_writer_ptr.get())->write_packet_vec.size(), 0); } CATCH -TEST_F(TestMPPTunnelBase, AsyncWriteError) +TEST_F(TestMPPTunnel, AsyncWriteError) { try { auto mpp_tunnel_ptr = constructRemoteAsyncTunnel(); - std::unique_ptr async_writer_ptr = std::make_unique(mpp_tunnel_ptr); + std::unique_ptr async_writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(async_writer_ptr.get()); - GTEST_ASSERT_EQ(mpp_tunnel_ptr->getConnectFlag(), true); + GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -636,4 +639,4 @@ TEST_F(TestMPPTunnelBase, AsyncWriteError) } } // namespace tests -} // namespace DB +} // namespace DB \ No newline at end of file