From ec3633d80024d04c8c1264d3210726cc52fe33b3 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 10 May 2022 16:34:43 +0800 Subject: [PATCH 01/14] Add random failpoints for Tunnel & Reciever Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 5 ++++ dbms/src/Flash/EstablishCall.cpp | 8 ++++++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 30 +++++++++++++++++++- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 10 ++++++- dbms/src/Flash/Mpp/MPPTunnel.cpp | 19 ++++++++++++- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 ++++++ 6 files changed, 77 insertions(+), 3 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index c6c3caa44ad..0ea45473755 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -105,6 +105,10 @@ std::unordered_map> FailPointHelper::f M(pause_after_copr_streams_acquired) \ M(pause_before_server_merge_one_delta) +#define APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) \ + M(random_tunnel_failpoint) \ + M(random_receiver_failpoint) + namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; @@ -112,6 +116,7 @@ APPLY_FOR_FAILPOINTS_ONCE(M) APPLY_FOR_FAILPOINTS(M) APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) +APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) #undef M } // namespace FailPoints diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 8af81e30962..e27882057c1 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -13,12 +13,18 @@ // limitations under the License. #include +#include #include #include #include namespace DB { +namespace FailPoints +{ +extern const char random_tunnel_failpoint[]; +} // namespace FailPoints + EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) : service(service) , cq(cq) @@ -60,6 +66,7 @@ void EstablishCallData::tryFlushOne() void EstablishCallData::responderFinish(const grpc::Status & status) { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); if (*is_shutdown) finishTunnelAndResponder(); else @@ -71,6 +78,7 @@ void EstablishCallData::initRpc() std::exception_ptr eptr = nullptr; try { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this); } catch (...) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index f194afee31f..8320651c968 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -15,13 +15,24 @@ #include #include #include +#include #include #include #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { +namespace FailPoints +{ +extern const char random_receiver_failpoint[]; +} // namespace FailPoints + namespace { String getReceiverStateStr(const ExchangeReceiverState & s) @@ -477,13 +488,30 @@ void ExchangeReceiverBase::readLoop(const Request & req) recv_msg->req_info = req_info; recv_msg->source_index = req.source_index; bool success = reader->read(recv_msg->packet); + fiu_do_on(FailPoints::random_receiver_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 11) + success = false; + }); if (!success) break; has_data = true; if (recv_msg->packet->has_error()) throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg()); - if (!msg_channel.push(std::move(recv_msg))) + bool push_success = msg_channel.push(std::move(recv_msg)); + fiu_do_on(FailPoints::random_receiver_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 71) + push_success = false; + }); + if (!push_success) { meet_error = true; auto local_state = getState(); diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index b0e49792d1a..af65305fa42 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -50,6 +51,11 @@ struct RpcTypeTraits<::mpp::EstablishMPPConnectionRequest> namespace DB { +namespace FailPoints +{ +extern const char random_receiver_failpoint[]; +} // namespace FailPoints + namespace { struct GrpcExchangePacketReader : public ExchangePacketReader @@ -218,7 +224,9 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques if (request.is_local) { auto [tunnel, status] = establishMPPConnectionLocal(request.req.get(), task_manager); - if (!status.ok()) + bool status_ok = status.ok(); + fiu_do_on(FailPoints::random_receiver_failpoint, status_ok = false;); + if (!status_ok) { throw Exception("Exchange receiver meet error : " + status.error_message()); } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 826e7fea88a..c922f406dbe 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -16,15 +16,22 @@ #include #include #include +#include #include #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { namespace FailPoints { extern const char exception_during_mpp_close_tunnel[]; +extern const char random_tunnel_failpoint[]; } // namespace FailPoints template @@ -194,7 +201,16 @@ void MPPTunnelBase::sendJob(bool need_lock) MPPDataPacketPtr res; while (send_queue.pop(res)) { - if (!writer->write(*res)) + bool write_success = writer->write(*res); + fiu_do_on(FailPoints::random_tunnel_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 17) + write_success = false; + }); + if (!write_success) { err_msg = "grpc writes failed."; break; @@ -322,6 +338,7 @@ void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lockset_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->close("Cancel"); +<<<<<<< HEAD mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread +======= + mpp_tunnel_ptr->getThreadManager->wait(); // Join local read thread +>>>>>>> 009ed811b (Fix gtest unstable issue) GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 2); //Second for err msg GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); @@ -487,7 +491,11 @@ try data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->writeDone(); +<<<<<<< HEAD mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread +======= + mpp_tunnel_ptr->getThreadManager->wait(); // Join local read thread +>>>>>>> 009ed811b (Fix gtest unstable issue) GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); From 26d26f76846dbb57c08f9362f6e6363fd5b92ec1 Mon Sep 17 00:00:00 2001 From: yibin Date: Sun, 1 May 2022 09:27:21 +0800 Subject: [PATCH 02/14] Fix format issue Signed-off-by: yibin --- dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e5f9e763990..47ce2ee6ee6 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -469,11 +469,7 @@ try data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->close("Cancel"); -<<<<<<< HEAD mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread -======= - mpp_tunnel_ptr->getThreadManager->wait(); // Join local read thread ->>>>>>> 009ed811b (Fix gtest unstable issue) GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 2); //Second for err msg GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); @@ -491,11 +487,7 @@ try data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); mpp_tunnel_ptr->writeDone(); -<<<<<<< HEAD mpp_tunnel_ptr->getThreadManager()->wait(); // Join local read thread -======= - mpp_tunnel_ptr->getThreadManager->wait(); // Join local read thread ->>>>>>> 009ed811b (Fix gtest unstable issue) GTEST_ASSERT_EQ(mpp_tunnel_ptr->getFinishFlag(), true); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 1); GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); From 43f7693edd7adc5ec9c3377479416b59af0656da Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 12 May 2022 18:56:48 +0800 Subject: [PATCH 03/14] Add join & aggregate failpoints Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 7 ++++- .../DataStreams/SharedQueryBlockInputStream.h | 19 ++++++++++++ dbms/src/DataStreams/SizeLimits.cpp | 20 ++++++++++++- dbms/src/Interpreters/Aggregator.cpp | 19 ++++++++++++ dbms/src/Interpreters/Join.cpp | 30 ++++++++++++++++++- 5 files changed, 92 insertions(+), 3 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 0ea45473755..265588ff117 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -107,7 +107,12 @@ std::unordered_map> FailPointHelper::f #define APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) \ M(random_tunnel_failpoint) \ - M(random_receiver_failpoint) + M(random_receiver_failpoint) \ + M(random_limit_check_failpoint) \ + M(random_join_failpoint) \ + M(random_aggregate_failpoint) \ + M(random_sharedquery_failpoint) + namespace FailPoints { diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index e7cece67f0b..c6203ce942d 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -18,12 +18,23 @@ #include #include #include +#include #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { +namespace FailPoints +{ +extern const char random_sharedquery_failpoint[]; +} // namespace FailPoints + /** This block input stream is used by SharedQuery. * It enable multiple threads read from one stream. */ @@ -136,6 +147,14 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream in->readPrefix(); while (true) { + fiu_do_on(FailPoints::random_sharedquery_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 13) + throw Exception("Fail point shared query is triggered.", ErrorCodes::FAIL_POINT_ERROR); + }); Block block = in->read(); // in is finished or queue is canceled if (!block || !queue.push(block)) diff --git a/dbms/src/DataStreams/SizeLimits.cpp b/dbms/src/DataStreams/SizeLimits.cpp index 7dd5e1524ba..06281ad1661 100644 --- a/dbms/src/DataStreams/SizeLimits.cpp +++ b/dbms/src/DataStreams/SizeLimits.cpp @@ -15,15 +15,33 @@ #include #include #include +#include #include +#ifdef FIU_ENABLE +#include +#include +#endif namespace DB { +namespace FailPoints +{ +extern const char random_limit_check_failpoint[]; +} // namespace FailPoints bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const { - if (max_rows && rows > max_rows) + bool rows_exceed_limit = max_rows && rows > max_rows; + fiu_do_on(FailPoints::random_limit_check_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 53) + rows_exceed_limit = false; + }); + if (rows_exceed_limit) { if (overflow_mode == OverflowMode::THROW) throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 6e067b88d81..e3650345f09 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,10 @@ #include #include +#ifdef FIU_ENABLE +#include +#include +#endif namespace ProfileEvents { @@ -61,6 +66,10 @@ extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS; extern const int LOGICAL_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char random_aggregate_failpoint[]; +} // namespace FailPoints AggregatedDataVariants::~AggregatedDataVariants() { @@ -330,6 +339,7 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const * In order that then everything is properly destroyed, we "roll back" some of the created states. * The code is not very convenient. */ + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_failpoint); aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); } catch (...) @@ -1521,6 +1531,15 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream if (current_bucket_num >= NUM_BUCKETS) return {}; + fiu_do_on(FailPoints::random_aggregate_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 41) + throw Exception("Fail point aggregate is triggered.", ErrorCodes::FAIL_POINT_ERROR); + }); + AggregatedDataVariantsPtr & first = data[0]; if (current_bucket_num == -1) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 820618a6e8b..28a47d5421f 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,9 +27,21 @@ #include #include #include +#include +#include "executeQuery.h" + +#ifdef FIU_ENABLE +#include +#include +#endif namespace DB { +namespace FailPoints +{ +extern const char random_join_failpoint[]; +} // namespace FailPoints + namespace ErrorCodes { extern const int UNKNOWN_SET_DATA_VARIANT; @@ -621,6 +634,14 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( } for (size_t insert_index = 0; insert_index < segment_index_info.size(); insert_index++) { + fiu_do_on(FailPoints::random_join_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 31) + throw Exception("Fail point join build is triggered.", ErrorCodes::FAIL_POINT_ERROR); + }); size_t segment_index = (insert_index + stream_index) % segment_index_info.size(); if (segment_index == segment_size) { @@ -1513,7 +1534,14 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const default: throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } - + fiu_do_on(FailPoints::random_join_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/100 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 100)(rng); + if (num == 67) + throw Exception("Fail point join prob is triggered.", ErrorCodes::FAIL_POINT_ERROR); + }); for (size_t i = 0; i < num_columns_to_add; ++i) { const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); From 88d37a7dae5d4d41bdd4d9673e41fe6f0f514ad8 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 12 May 2022 21:09:20 +0800 Subject: [PATCH 04/14] Add thread pool failpoints Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 4 ++-- dbms/src/Common/wrapInvocable.h | 21 ++++++++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 265588ff117..301ef1bdf97 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -111,8 +111,8 @@ std::unordered_map> FailPointHelper::f M(random_limit_check_failpoint) \ M(random_join_failpoint) \ M(random_aggregate_failpoint) \ - M(random_sharedquery_failpoint) - + M(random_sharedquery_failpoint) \ + M(random_thread_failpoint) namespace FailPoints { diff --git a/dbms/src/Common/wrapInvocable.h b/dbms/src/Common/wrapInvocable.h index d6cee519835..b6d1c99d25f 100644 --- a/dbms/src/Common/wrapInvocable.h +++ b/dbms/src/Common/wrapInvocable.h @@ -15,9 +15,20 @@ #pragma once #include +#include + +#ifdef FIU_ENABLE +#include +#include +#endif namespace DB { +namespace FailPoints +{ +extern const char random_thread_failpoint[]; +} // namespace FailPoints + template inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&... args) { @@ -33,7 +44,15 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&.. args = std::make_tuple(std::forward(args)...)]() mutable { MemoryTrackerSetter setter(propagate_memory_tracker, memory_tracker); // run the task with the parameters provided - return std::apply(std::move(func), std::move(args)); + std::apply(std::move(func), std::move(args)); + fiu_do_on(FailPoints::random_thread_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/1000 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 1000)(rng); + if (num == 241) + throw Exception("Fail point aggregate is triggered.", ErrorCodes::FAIL_POINT_ERROR); + }); }; return capture; From 1ad20aa9aa00d410b8d1696c16ae6be86ef157db Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 13 May 2022 11:02:19 +0800 Subject: [PATCH 05/14] Add taskmanager query failpoints Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 5 ++++- dbms/src/Common/wrapInvocable.h | 7 +++++-- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 19 +++++++++++++++++++ dbms/src/Interpreters/executeQuery.cpp | 7 ++++++- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 301ef1bdf97..92c9d2be073 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -112,7 +112,10 @@ std::unordered_map> FailPointHelper::f M(random_join_failpoint) \ M(random_aggregate_failpoint) \ M(random_sharedquery_failpoint) \ - M(random_thread_failpoint) + M(random_thread_failpoint) \ + M(random_interpreter_failpoint) \ + M(random_scheduler_failpoint) \ + M(random_task_manager_failpoint) namespace FailPoints { diff --git a/dbms/src/Common/wrapInvocable.h b/dbms/src/Common/wrapInvocable.h index b6d1c99d25f..5ac131d2622 100644 --- a/dbms/src/Common/wrapInvocable.h +++ b/dbms/src/Common/wrapInvocable.h @@ -44,6 +44,7 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&.. args = std::make_tuple(std::forward(args)...)]() mutable { MemoryTrackerSetter setter(propagate_memory_tracker, memory_tracker); // run the task with the parameters provided + #ifdef FIU_ENABLE std::apply(std::move(func), std::move(args)); fiu_do_on(FailPoints::random_thread_failpoint, { // Since the code will run very frequently, then other failpoint might have no chance to trigger @@ -51,10 +52,12 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&.. pcg64 rng(randomSeed()); int num = std::uniform_int_distribution(0, 1000)(rng); if (num == 241) - throw Exception("Fail point aggregate is triggered.", ErrorCodes::FAIL_POINT_ERROR); + throw Exception("Fail point thread is triggered.", ErrorCodes::FAIL_POINT_ERROR); }); + #else + return std::apply(std::move(func), std::move(args)); + #endif }; - return capture; } } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 531f8f7a10d..6dc902cc025 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -20,8 +21,18 @@ #include #include +#ifdef FIU_ENABLE +#include +#include +#endif + namespace DB { +namespace FailPoints +{ +extern const char random_task_manager_failpoint[]; +} // namespace FailPoints + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , log(&Poco::Logger::get("TaskManager")) @@ -50,6 +61,14 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: it = query_it->second->task_map.find(id); return it != query_it->second->task_map.end(); }); + fiu_do_on(FailPoints::random_task_manager_failpoint, { + // Since the code will run very frequently, then other failpoint might have no chance to trigger + // so internally low down the possibility to 1/1000 + pcg64 rng(randomSeed()); + int num = std::uniform_int_distribution(0, 1000)(rng); + if (num == 127) + ret = false; + }); if (cancelled) { errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id()); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 96cfc0a58ae..e1f060ab17c 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -53,7 +54,10 @@ extern const int LOGICAL_ERROR; extern const int QUERY_IS_TOO_LARGE; extern const int INTO_OUTFILE_NOT_ALLOWED; } // namespace ErrorCodes - +namespace FailPoints +{ +extern const char random_interpreter_failpoint[]; +} // namespace FailPoints namespace { void checkASTSizeLimits(const IAST & ast, const Settings & settings) @@ -226,6 +230,7 @@ std::tuple executeQueryImpl( context.setProcessListElement(&process_list_entry->get()); } + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); auto interpreter = query_src.interpreter(context, stage); res = interpreter->execute(); From 5fb687d267e78d00b474f17072360da751d750e8 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 13 May 2022 11:47:43 +0800 Subject: [PATCH 06/14] Add failpoints to produce situation that MPPTask destructed in CancelMPPTask grpc thread Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 92c9d2be073..12fc6131c1e 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -114,7 +114,7 @@ std::unordered_map> FailPointHelper::f M(random_sharedquery_failpoint) \ M(random_thread_failpoint) \ M(random_interpreter_failpoint) \ - M(random_scheduler_failpoint) \ + M(random_task_lifecycle_failpoint) \ M(random_task_manager_failpoint) namespace FailPoints diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 40f03ff79ba..0ba02b75ba2 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -51,6 +51,7 @@ extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[]; extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[]; extern const char exception_during_mpp_write_err_to_tunnel[]; extern const char force_no_local_region_for_mpp_task[]; +extern const char random_task_lifecycle_failpoint[]; } // namespace FailPoints MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) @@ -397,7 +398,15 @@ void MPPTask::runImpl() writeErrToAllTunnels(err_msg); } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); - unregisterTask(); + // unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed + // by grpc CancelMPPTask thread; + bool unregister = true; + fiu_do_on(FailPoints::random_task_lifecycle_failpoint, { + if (!err_msg.empty()) + unregister = false; + }); + if (unregister) + unregisterTask(); if (switchStatus(RUNNING, FINISHED)) LOG_INFO(log, "finish task"); From 9a56e2a84f7774572cb97faac9cb3d07011b26d5 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 15 Jun 2022 15:47:51 +0800 Subject: [PATCH 07/14] Add RandomFailPoint cfg support and some refact Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 76 +++++++++++++++---- dbms/src/Common/FailPoint.h | 19 ++++- dbms/src/Common/wrapInvocable.h | 23 ------ .../DataStreams/SharedQueryBlockInputStream.h | 14 +--- dbms/src/DataStreams/SizeLimits.cpp | 14 +--- dbms/src/Flash/EstablishCall.cpp | 5 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 29 ++----- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 10 +-- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 14 +--- dbms/src/Flash/Mpp/MPPTunnel.cpp | 21 +---- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 10 ++- dbms/src/Interpreters/Aggregator.cpp | 19 +---- dbms/src/Interpreters/Join.cpp | 26 +------ dbms/src/Server/Server.cpp | 2 + 14 files changed, 114 insertions(+), 168 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 12fc6131c1e..89ff516627e 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -13,15 +13,19 @@ // limitations under the License. #include - +#include #include #include #include +#include +#include +#include +#include +#include namespace DB { std::unordered_map> FailPointHelper::fail_point_wait_channels; - #define APPLY_FOR_FAILPOINTS_ONCE(M) \ M(exception_between_drop_meta_and_data) \ M(exception_between_alter_data_and_meta) \ @@ -105,17 +109,21 @@ std::unordered_map> FailPointHelper::f M(pause_after_copr_streams_acquired) \ M(pause_before_server_merge_one_delta) -#define APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) \ - M(random_tunnel_failpoint) \ - M(random_receiver_failpoint) \ - M(random_limit_check_failpoint) \ - M(random_join_failpoint) \ - M(random_aggregate_failpoint) \ - M(random_sharedquery_failpoint) \ - M(random_thread_failpoint) \ - M(random_interpreter_failpoint) \ - M(random_task_lifecycle_failpoint) \ - M(random_task_manager_failpoint) +#define APPLY_FOR_RANDOM_FAILPOINTS(M) \ + M(random_tunnel_wait_timeout_failpoint) \ + M(random_tunnel_init_rpc_failure_failpoint) \ + M(random_receiver_sync_msg_push_failure_failpoint) \ + M(random_receiver_async_msg_push_failure_failpoint) \ + M(random_limit_check_failpoint) \ + M(random_join_build_failpoint) \ + M(random_join_prob_failpoint) \ + M(random_aggregate_create_state_failpoint) \ + M(random_aggregate_merge_failpoint) \ + M(random_sharedquery_failpoint) \ + M(random_interpreter_failpoint) \ + M(random_task_lifecycle_failpoint) \ + M(random_task_manager_failpoint) \ + M(random_min_tso_scheduler_failpoint) namespace FailPoints { @@ -124,7 +132,7 @@ APPLY_FOR_FAILPOINTS_ONCE(M) APPLY_FOR_FAILPOINTS(M) APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) -APPLY_FOR_RANDOM_FAILPOINTS_ENABLED_OUTSIDE(M) +APPLY_FOR_RANDOM_FAILPOINTS(M) #undef M } // namespace FailPoints @@ -188,7 +196,7 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name) #undef M #undef SUB_M - throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR); + throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR); } void FailPointHelper::disableFailPoint(const String & fail_point_name) @@ -213,6 +221,40 @@ void FailPointHelper::wait(const String & fail_point_name) ptr->wait(); } } + +void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +{ + String random_fail_point_cfg = config.getString("flash.random_fail_points", ""); + if (random_fail_point_cfg.empty()) + return; + + Poco::StringTokenizer string_tokens(random_fail_point_cfg, ","); + for (const auto & string_token : string_tokens) + { + Poco::StringTokenizer pair_tokens(string_token, "-"); + RUNTIME_ASSERT((pair_tokens.count() == 2), log, "RandomFailPoints config should be FailPointA-RatioA,FailPointB-RatioB,... format"); + double rate = atof(pair_tokens[1].c_str()); //NOLINT(cert-err34-c): check conversion error manually + RUNTIME_ASSERT((0 <= rate && rate <= 1.0), log, "RandomFailPoint trigger rate should in [0,1], while {}", rate); + enableRandomFailPoint(pair_tokens[0], rate); + } + LOG_FMT_INFO(log, "Enable RandomFailPoints: {}", random_fail_point_cfg); +} + +void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) +{ +#define SUB_M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \ + return; \ + } + +#define M(NAME) SUB_M(NAME) + APPLY_FOR_RANDOM_FAILPOINTS(M) +#undef M + + throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR); +} #else class FailPointChannel { @@ -223,6 +265,10 @@ void FailPointHelper::enableFailPoint(const String &) {} void FailPointHelper::disableFailPoint(const String &) {} void FailPointHelper::wait(const String &) {} + +void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) {} + +void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) {} #endif } // namespace DB diff --git a/dbms/src/Common/FailPoint.h b/dbms/src/Common/FailPoint.h index 2cf40ad55e4..7556cd5b15e 100644 --- a/dbms/src/Common/FailPoint.h +++ b/dbms/src/Common/FailPoint.h @@ -21,6 +21,15 @@ #include +namespace Poco +{ +class Logger; +namespace Util +{ +class LayeredConfiguration; +} +} // namespace Poco + namespace DB { namespace ErrorCodes @@ -35,7 +44,6 @@ extern const int FAIL_POINT_ERROR; // When `fail_point` is enabled, wait till it is disabled #define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);) - class FailPointChannel; class FailPointHelper { @@ -46,6 +54,15 @@ class FailPointHelper static void wait(const String & fail_point_name); + /* + * For Server RandomFailPoint test usage. When FIU_ENABLE is defined, this function does the following work: + * 1. Return if TiFlash config has empty flash.random_fail_points cfg + * 2. Parse flash.random_fail_points, which expect to has "FailPointA-RatioA,FailPointB-RatioB,..." format + * 3. Call enableRandomFailPoint method with parsed FailPointName and Rate + */ + static void initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + + static void enableRandomFailPoint(const String & fail_point_name, double rate); private: static std::unordered_map> fail_point_wait_channels; }; diff --git a/dbms/src/Common/wrapInvocable.h b/dbms/src/Common/wrapInvocable.h index 5ac131d2622..1c93bb3e782 100644 --- a/dbms/src/Common/wrapInvocable.h +++ b/dbms/src/Common/wrapInvocable.h @@ -15,20 +15,9 @@ #pragma once #include -#include - -#ifdef FIU_ENABLE -#include -#include -#endif namespace DB { -namespace FailPoints -{ -extern const char random_thread_failpoint[]; -} // namespace FailPoints - template inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&... args) { @@ -44,19 +33,7 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&.. args = std::make_tuple(std::forward(args)...)]() mutable { MemoryTrackerSetter setter(propagate_memory_tracker, memory_tracker); // run the task with the parameters provided - #ifdef FIU_ENABLE - std::apply(std::move(func), std::move(args)); - fiu_do_on(FailPoints::random_thread_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/1000 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 1000)(rng); - if (num == 241) - throw Exception("Fail point thread is triggered.", ErrorCodes::FAIL_POINT_ERROR); - }); - #else return std::apply(std::move(func), std::move(args)); - #endif }; return capture; } diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index c6203ce942d..7d426921d45 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -23,11 +23,6 @@ #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints @@ -147,14 +142,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream in->readPrefix(); while (true) { - fiu_do_on(FailPoints::random_sharedquery_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 13) - throw Exception("Fail point shared query is triggered.", ErrorCodes::FAIL_POINT_ERROR); - }); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_sharedquery_failpoint); Block block = in->read(); // in is finished or queue is canceled if (!block || !queue.push(block)) diff --git a/dbms/src/DataStreams/SizeLimits.cpp b/dbms/src/DataStreams/SizeLimits.cpp index 06281ad1661..9a03d7985a9 100644 --- a/dbms/src/DataStreams/SizeLimits.cpp +++ b/dbms/src/DataStreams/SizeLimits.cpp @@ -18,11 +18,6 @@ #include #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints @@ -33,14 +28,7 @@ extern const char random_limit_check_failpoint[]; bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const { bool rows_exceed_limit = max_rows && rows > max_rows; - fiu_do_on(FailPoints::random_limit_check_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 53) - rows_exceed_limit = false; - }); + fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = false;); if (rows_exceed_limit) { if (overflow_mode == OverflowMode::THROW) diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index e27882057c1..11e85393db7 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -22,7 +22,7 @@ namespace DB { namespace FailPoints { -extern const char random_tunnel_failpoint[]; +extern const char random_tunnel_init_rpc_failure_failpoint[]; } // namespace FailPoints EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) @@ -66,7 +66,6 @@ void EstablishCallData::tryFlushOne() void EstablishCallData::responderFinish(const grpc::Status & status) { - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); if (*is_shutdown) finishTunnelAndResponder(); else @@ -78,7 +77,7 @@ void EstablishCallData::initRpc() std::exception_ptr eptr = nullptr; try { - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_failpoint); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_init_rpc_failure_failpoint); service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this); } catch (...) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 8320651c968..1b269a1a5ef 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -21,16 +21,12 @@ #include #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints { -extern const char random_receiver_failpoint[]; +extern const char random_receiver_sync_msg_push_failure_failpoint[]; +extern const char random_receiver_async_msg_push_failure_failpoint[]; } // namespace FailPoints namespace @@ -268,7 +264,9 @@ class AsyncRequestHandler : public UnaryCallback recv_msg->packet = std::move(packet); recv_msg->source_index = request->source_index; recv_msg->req_info = req_info; - if (!msg_channel->push(std::move(recv_msg))) + bool push_success = msg_channel->push(std::move(recv_msg)); + fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_success = false;); + if (!push_success) return false; // can't reuse packet since it is sent to readers. packet = std::make_shared(); @@ -488,14 +486,6 @@ void ExchangeReceiverBase::readLoop(const Request & req) recv_msg->req_info = req_info; recv_msg->source_index = req.source_index; bool success = reader->read(recv_msg->packet); - fiu_do_on(FailPoints::random_receiver_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 11) - success = false; - }); if (!success) break; has_data = true; @@ -503,14 +493,7 @@ void ExchangeReceiverBase::readLoop(const Request & req) throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg()); bool push_success = msg_channel.push(std::move(recv_msg)); - fiu_do_on(FailPoints::random_receiver_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 71) - push_success = false; - }); + fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_success = false;); if (!push_success) { meet_error = true; diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index af65305fa42..b0e49792d1a 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include @@ -51,11 +50,6 @@ struct RpcTypeTraits<::mpp::EstablishMPPConnectionRequest> namespace DB { -namespace FailPoints -{ -extern const char random_receiver_failpoint[]; -} // namespace FailPoints - namespace { struct GrpcExchangePacketReader : public ExchangePacketReader @@ -224,9 +218,7 @@ ExchangePacketReaderPtr GRPCReceiverContext::makeReader(const ExchangeRecvReques if (request.is_local) { auto [tunnel, status] = establishMPPConnectionLocal(request.req.get(), task_manager); - bool status_ok = status.ok(); - fiu_do_on(FailPoints::random_receiver_failpoint, status_ok = false;); - if (!status_ok) + if (!status.ok()) { throw Exception("Exchange receiver meet error : " + status.error_message()); } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 6dc902cc025..80d729838bc 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -21,11 +21,6 @@ #include #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints @@ -61,14 +56,7 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: it = query_it->second->task_map.find(id); return it != query_it->second->task_map.end(); }); - fiu_do_on(FailPoints::random_task_manager_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/1000 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 1000)(rng); - if (num == 127) - ret = false; - }); + fiu_do_on(FailPoints::random_task_manager_failpoint, ret = false;); if (cancelled) { errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id()); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index c922f406dbe..30badedb340 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -21,17 +21,13 @@ #include #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints { extern const char exception_during_mpp_close_tunnel[]; -extern const char random_tunnel_failpoint[]; +extern const char random_tunnel_write_failure_failpoint[]; +extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints template @@ -201,16 +197,7 @@ void MPPTunnelBase::sendJob(bool need_lock) MPPDataPacketPtr res; while (send_queue.pop(res)) { - bool write_success = writer->write(*res); - fiu_do_on(FailPoints::random_tunnel_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 17) - write_success = false; - }); - if (!write_success) + if (!writer->write(*res)) { err_msg = "grpc writes failed."; break; @@ -338,7 +325,7 @@ void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lock +#include #include #include namespace DB { +namespace FailPoints +{ +extern const char random_min_tso_scheduler_failpoint[]; +} // namespace FailPoints + constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); constexpr UInt64 OS_THREAD_SOFT_LIMIT = 100000; @@ -193,7 +199,9 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } else { - if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here. + bool is_tso_min = tso <= min_tso; + fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_tso_min = true;); + if (is_tso_min) /// the min_tso query should fully run, otherwise throw errors here. { has_error = true; auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index e3650345f09..738232444c9 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -38,11 +38,6 @@ #include #include -#ifdef FIU_ENABLE -#include -#include -#endif - namespace ProfileEvents { extern const Event ExternalAggregationWritePart; @@ -68,7 +63,8 @@ extern const int LOGICAL_ERROR; namespace FailPoints { -extern const char random_aggregate_failpoint[]; +extern const char random_aggregate_create_state_failpoint[]; +extern const char random_aggregate_merge_failpoint[]; } // namespace FailPoints AggregatedDataVariants::~AggregatedDataVariants() @@ -339,7 +335,7 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const * In order that then everything is properly destroyed, we "roll back" some of the created states. * The code is not very convenient. */ - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_failpoint); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_create_state_failpoint); aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); } catch (...) @@ -1531,14 +1527,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream if (current_bucket_num >= NUM_BUCKETS) return {}; - fiu_do_on(FailPoints::random_aggregate_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 41) - throw Exception("Fail point aggregate is triggered.", ErrorCodes::FAIL_POINT_ERROR); - }); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_merge_failpoint); AggregatedDataVariantsPtr & first = data[0]; diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 28a47d5421f..ae1ad648165 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -30,16 +30,12 @@ #include #include "executeQuery.h" -#ifdef FIU_ENABLE -#include -#include -#endif - namespace DB { namespace FailPoints { -extern const char random_join_failpoint[]; +extern const char random_join_build_failpoint[]; +extern const char random_join_prob_failpoint[]; } // namespace FailPoints namespace ErrorCodes @@ -634,14 +630,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( } for (size_t insert_index = 0; insert_index < segment_index_info.size(); insert_index++) { - fiu_do_on(FailPoints::random_join_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 31) - throw Exception("Fail point join build is triggered.", ErrorCodes::FAIL_POINT_ERROR); - }); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_build_failpoint); size_t segment_index = (insert_index + stream_index) % segment_index_info.size(); if (segment_index == segment_size) { @@ -1534,14 +1523,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const default: throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } - fiu_do_on(FailPoints::random_join_failpoint, { - // Since the code will run very frequently, then other failpoint might have no chance to trigger - // so internally low down the possibility to 1/100 - pcg64 rng(randomSeed()); - int num = std::uniform_int_distribution(0, 100)(rng); - if (num == 67) - throw Exception("Fail point join prob is triggered.", ErrorCodes::FAIL_POINT_ERROR); - }); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); for (size_t i = 0; i < num_columns_to_add; ++i) { const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3e2c29de76c..72f02c35fad 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -982,6 +983,7 @@ int Server::main(const std::vector & /*args*/) Poco::Logger * log = &logger(); #ifdef FIU_ENABLE fiu_init(0); // init failpoint + FailPointHelper::initRandomFailPoints(config(), log); #endif UpdateMallocConfig(log); From 4d817fc8e5843f3d1640034abec6476d366607c2 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 15 Jun 2022 17:47:05 +0800 Subject: [PATCH 08/14] Fix format issue Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 23 ++++++++++--------- dbms/src/Common/FailPoint.h | 1 + .../DataStreams/SharedQueryBlockInputStream.h | 2 +- dbms/src/DataStreams/SizeLimits.cpp | 13 +++++++---- dbms/src/Flash/EstablishCall.cpp | 2 +- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 2 +- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 2 +- dbms/src/Interpreters/Aggregator.cpp | 2 +- dbms/src/Interpreters/Join.cpp | 3 ++- dbms/src/Interpreters/executeQuery.cpp | 2 +- dbms/src/Server/Server.cpp | 2 +- 12 files changed, 31 insertions(+), 25 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 89ff516627e..e106480aa91 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include -#include -#include -#include -#include +#include #include #include -#include +#include #include +#include + +#include +#include +#include namespace DB { @@ -242,11 +243,11 @@ void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & co void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) { -#define SUB_M(NAME) \ - if (fail_point_name == FailPoints::NAME) \ - { \ - fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \ - return; \ +#define SUB_M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \ + return; \ } #define M(NAME) SUB_M(NAME) diff --git a/dbms/src/Common/FailPoint.h b/dbms/src/Common/FailPoint.h index 7556cd5b15e..31df2dbdcd2 100644 --- a/dbms/src/Common/FailPoint.h +++ b/dbms/src/Common/FailPoint.h @@ -63,6 +63,7 @@ class FailPointHelper static void initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); static void enableRandomFailPoint(const String & fail_point_name, double rate); + private: static std::unordered_map> fail_point_wait_channels; }; diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index 7d426921d45..d7c0707b5aa 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -14,11 +14,11 @@ #pragma once +#include #include #include #include #include -#include #include #include diff --git a/dbms/src/DataStreams/SizeLimits.cpp b/dbms/src/DataStreams/SizeLimits.cpp index 9a03d7985a9..c83f18b257d 100644 --- a/dbms/src/DataStreams/SizeLimits.cpp +++ b/dbms/src/DataStreams/SizeLimits.cpp @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include #include +#include +#include + #include namespace DB @@ -33,7 +34,8 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti { if (overflow_mode == OverflowMode::THROW) throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows) - + ", current rows: " + formatReadableQuantity(rows), exception_code); + + ", current rows: " + formatReadableQuantity(rows), + exception_code); else return false; } @@ -42,7 +44,8 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti { if (overflow_mode == OverflowMode::THROW) throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes) - + ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), exception_code); + + ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), + exception_code); else return false; } @@ -50,4 +53,4 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti return true; } -} +} // namespace DB diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 11e85393db7..89857a2407e 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include #include #include #include diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 1b269a1a5ef..ec8bde51469 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -13,9 +13,9 @@ // limitations under the License. #include +#include #include #include -#include #include #include #include diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 80d729838bc..1b999c44f74 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include #include #include diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index caf793a07f3..967bfcecfa3 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include #include #include diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 738232444c9..455c01ad99e 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -17,9 +17,9 @@ #include #include #include +#include #include #include -#include #include #include #include diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index ae1ad648165..04d264b57b1 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -17,8 +17,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -28,6 +28,7 @@ #include #include #include + #include "executeQuery.h" namespace DB diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index e1f060ab17c..78ad4b41ce6 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include -#include #include #include #include diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 72f02c35fad..df635e441c2 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -27,7 +28,6 @@ #include #include #include -#include #include #include #include From 0eed31ea6a4bbd1c38015ebd9787daaf9e5f47ef Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 16 Jun 2022 09:13:20 +0800 Subject: [PATCH 09/14] Fix format issue Signed-off-by: yibin --- dbms/src/Interpreters/Aggregator.cpp | 4 ++-- dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 455c01ad99e..6659504c624 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1022,7 +1022,7 @@ Block Aggregator::prepareBlockAndFill( aggregate_columns[i] = header.getByName(aggregate_column_name).type->createColumn(); /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states. - ColumnAggregateFunction & column_aggregate_func = assert_cast(*aggregate_columns[i]); + auto & column_aggregate_func = assert_cast(*aggregate_columns[i]); for (auto & pool : data_variants.aggregates_pools) column_aggregate_func.addArena(pool); @@ -1508,7 +1508,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream Block getHeader() const override { return aggregator.getHeader(final); } - ~MergingAndConvertingBlockInputStream() + ~MergingAndConvertingBlockInputStream() override { LOG_FMT_TRACE(&Poco::Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish"); diff --git a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp index c9b871c67d2..00ba6e16767 100644 --- a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp +++ b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp @@ -398,7 +398,7 @@ class PageStorageControlV3 if (entry_or_del.isEntry() && it->second->type == EditRecordType::VAR_ENTRY) { (void)blob_store; - try + tryA { PageIDAndEntryV3 to_read_entry; const PageEntryV3 & entry = entry_or_del.entry; From 88d54d27960a7235b274629df07e3b4bd5d0c39d Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 16 Jun 2022 09:39:42 +0800 Subject: [PATCH 10/14] Fix format issue Signed-off-by: yibin --- dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp index 00ba6e16767..c9b871c67d2 100644 --- a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp +++ b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp @@ -398,7 +398,7 @@ class PageStorageControlV3 if (entry_or_del.isEntry() && it->second->type == EditRecordType::VAR_ENTRY) { (void)blob_store; - tryA + try { PageIDAndEntryV3 to_read_entry; const PageEntryV3 & entry = entry_or_del.entry; From cb2a868023c5a6d0c11f88ac4d75fe4d65e50cee Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 17 Jun 2022 08:41:32 +0800 Subject: [PATCH 11/14] Update dbms/src/Interpreters/Join.cpp Remove useless head files Co-authored-by: JaySon --- dbms/src/Interpreters/Join.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 04d264b57b1..181ebcaaa64 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -29,7 +29,6 @@ #include #include -#include "executeQuery.h" namespace DB { From 29cc539fb76ef87cd521db68aac98a5de1d53677 Mon Sep 17 00:00:00 2001 From: yibin Date: Mon, 20 Jun 2022 11:24:00 +0800 Subject: [PATCH 12/14] Update according to review comments Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 2 +- dbms/src/DataStreams/SizeLimits.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 4 ++-- dbms/src/Flash/Mpp/MPPTunnel.cpp | 1 - 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index e106480aa91..92c7434cb9c 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -123,7 +123,7 @@ std::unordered_map> FailPointHelper::f M(random_sharedquery_failpoint) \ M(random_interpreter_failpoint) \ M(random_task_lifecycle_failpoint) \ - M(random_task_manager_failpoint) \ + M(random_task_manager_find_task_failure_failpoint) \ M(random_min_tso_scheduler_failpoint) namespace FailPoints diff --git a/dbms/src/DataStreams/SizeLimits.cpp b/dbms/src/DataStreams/SizeLimits.cpp index c83f18b257d..4d1bfaae997 100644 --- a/dbms/src/DataStreams/SizeLimits.cpp +++ b/dbms/src/DataStreams/SizeLimits.cpp @@ -29,7 +29,7 @@ extern const char random_limit_check_failpoint[]; bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const { bool rows_exceed_limit = max_rows && rows > max_rows; - fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = false;); + fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = true;); if (rows_exceed_limit) { if (overflow_mode == OverflowMode::THROW) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1b999c44f74..3df4af5de5f 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -25,7 +25,7 @@ namespace DB { namespace FailPoints { -extern const char random_task_manager_failpoint[]; +extern const char random_task_manager_find_task_failure_failpoint[]; } // namespace FailPoints MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) @@ -56,7 +56,7 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: it = query_it->second->task_map.find(id); return it != query_it->second->task_map.end(); }); - fiu_do_on(FailPoints::random_task_manager_failpoint, ret = false;); + fiu_do_on(FailPoints::random_task_manager_find_task_failure_failpoint, ret = false;); if (cancelled) { errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id()); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 30badedb340..6ddd50c2666 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -26,7 +26,6 @@ namespace DB namespace FailPoints { extern const char exception_during_mpp_close_tunnel[]; -extern const char random_tunnel_write_failure_failpoint[]; extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints From 1297fdd62f3d4eb13945924426d1dccfcccdbc68 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 21 Jun 2022 09:36:30 +0800 Subject: [PATCH 13/14] Add missed undef Signed-off-by: yibin --- dbms/src/Common/FailPoint.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 92c7434cb9c..c5d7bf8d68f 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -253,6 +253,7 @@ void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, doub #define M(NAME) SUB_M(NAME) APPLY_FOR_RANDOM_FAILPOINTS(M) #undef M +#undef SUB_M throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR); } From c50d821b2a30f4aba1cc6459fd03740f2b6dc98c Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 21 Jun 2022 14:54:56 +0800 Subject: [PATCH 14/14] Remove useless include head statement Signed-off-by: yibin --- dbms/src/Flash/Mpp/MPPTunnel.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 6ddd50c2666..13a7eaad95e 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include