Skip to content

Commit

Permalink
Add random failpoint in critical paths (pingcap#4876)
Browse files Browse the repository at this point in the history
  • Loading branch information
yibin87 authored and Lloyd-Pottiger committed Jul 19, 2022
1 parent 049f050 commit 4d86eb7
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 17 deletions.
65 changes: 63 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Poco/String.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/defines.h>
#include <common/logger_useful.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
Expand All @@ -21,7 +27,6 @@
namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

#define APPLY_FOR_FAILPOINTS_ONCE(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
Expand Down Expand Up @@ -109,13 +114,30 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_query_init)


#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_init_rpc_failure_failpoint) \
M(random_receiver_sync_msg_push_failure_failpoint) \
M(random_receiver_async_msg_push_failure_failpoint) \
M(random_limit_check_failpoint) \
M(random_join_build_failpoint) \
M(random_join_prob_failpoint) \
M(random_aggregate_create_state_failpoint) \
M(random_aggregate_merge_failpoint) \
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_lifecycle_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint)

namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
APPLY_FOR_FAILPOINTS_ONCE(M)
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M)
APPLY_FOR_PAUSEABLE_FAILPOINTS(M)
APPLY_FOR_RANDOM_FAILPOINTS(M)
#undef M
} // namespace FailPoints

Expand Down Expand Up @@ -179,7 +201,7 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
#undef M
#undef SUB_M

throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
Expand All @@ -204,6 +226,41 @@ void FailPointHelper::wait(const String & fail_point_name)
ptr->wait();
}
}

void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
{
String random_fail_point_cfg = config.getString("flash.random_fail_points", "");
if (random_fail_point_cfg.empty())
return;

Poco::StringTokenizer string_tokens(random_fail_point_cfg, ",");
for (const auto & string_token : string_tokens)
{
Poco::StringTokenizer pair_tokens(string_token, "-");
RUNTIME_ASSERT((pair_tokens.count() == 2), log, "RandomFailPoints config should be FailPointA-RatioA,FailPointB-RatioB,... format");
double rate = atof(pair_tokens[1].c_str()); //NOLINT(cert-err34-c): check conversion error manually
RUNTIME_ASSERT((0 <= rate && rate <= 1.0), log, "RandomFailPoint trigger rate should in [0,1], while {}", rate);
enableRandomFailPoint(pair_tokens[0], rate);
}
LOG_FMT_INFO(log, "Enable RandomFailPoints: {}", random_fail_point_cfg);
}

void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate)
{
#define SUB_M(NAME) \
if (fail_point_name == FailPoints::NAME) \
{ \
fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \
return; \
}

#define M(NAME) SUB_M(NAME)
APPLY_FOR_RANDOM_FAILPOINTS(M)
#undef M
#undef SUB_M

throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}
#else
class FailPointChannel
{
Expand All @@ -214,6 +271,10 @@ void FailPointHelper::enableFailPoint(const String &) {}
void FailPointHelper::disableFailPoint(const String &) {}

void FailPointHelper::wait(const String &) {}

void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) {}

void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) {}
#endif

} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@

#include <unordered_map>

namespace Poco
{
class Logger;
namespace Util
{
class LayeredConfiguration;
}
} // namespace Poco

namespace DB
{
namespace ErrorCodes
Expand All @@ -35,7 +44,6 @@ extern const int FAIL_POINT_ERROR;
// When `fail_point` is enabled, wait till it is disabled
#define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);)


class FailPointChannel;
class FailPointHelper
{
Expand All @@ -46,6 +54,16 @@ class FailPointHelper

static void wait(const String & fail_point_name);

/*
* For Server RandomFailPoint test usage. When FIU_ENABLE is defined, this function does the following work:
* 1. Return if TiFlash config has empty flash.random_fail_points cfg
* 2. Parse flash.random_fail_points, which expect to has "FailPointA-RatioA,FailPointB-RatioB,..." format
* 3. Call enableRandomFailPoint method with parsed FailPointName and Rate
*/
static void initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log);

static void enableRandomFailPoint(const String & fail_point_name, double rate);

private:
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Common/wrapInvocable.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&..
// run the task with the parameters provided
return std::apply(std::move(func), std::move(args));
};

return capture;
}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FailPoint.h>
#include <Common/MPMCQueue.h>
#include <Common/ThreadFactory.h>
#include <Common/ThreadManager.h>
Expand All @@ -24,6 +25,11 @@

namespace DB
{
namespace FailPoints
{
extern const char random_sharedquery_failpoint[];
} // namespace FailPoints

/** This block input stream is used by SharedQuery.
* It enable multiple threads read from one stream.
*/
Expand Down Expand Up @@ -136,6 +142,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
in->readPrefix();
while (true)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_sharedquery_failpoint);
Block block = in->read();
// in is finished or queue is canceled
if (!block || !queue.push(block))
Expand Down
23 changes: 16 additions & 7 deletions dbms/src/DataStreams/SizeLimits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/SizeLimits.h>
#include <Common/formatReadable.h>
#include <Common/Exception.h>
#include <string>
#include <Common/FailPoint.h>
#include <Common/formatReadable.h>
#include <DataStreams/SizeLimits.h>

#include <string>

namespace DB
{
namespace FailPoints
{
extern const char random_limit_check_failpoint[];
} // namespace FailPoints

bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const
{
if (max_rows && rows > max_rows)
bool rows_exceed_limit = max_rows && rows > max_rows;
fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = true;);
if (rows_exceed_limit)
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows)
+ ", current rows: " + formatReadableQuantity(rows), exception_code);
+ ", current rows: " + formatReadableQuantity(rows),
exception_code);
else
return false;
}
Expand All @@ -36,12 +44,13 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes)
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), exception_code);
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes),
exception_code);
else
return false;
}

return true;
}

}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/EstablishCall.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/Utils.h>

namespace DB
{
namespace FailPoints
{
extern const char random_tunnel_init_rpc_failure_failpoint[];
} // namespace FailPoints

EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr<std::atomic<bool>> & is_shutdown)
: service(service)
, cq(cq)
Expand Down Expand Up @@ -71,6 +77,7 @@ void EstablishCallData::initRpc()
std::exception_ptr eptr = nullptr;
try
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_init_rpc_failure_failpoint);
service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this);
}
catch (...)
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/CPUAffinityManager.h>
#include <Common/FailPoint.h>
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
Expand All @@ -22,6 +23,12 @@

namespace DB
{
namespace FailPoints
{
extern const char random_receiver_sync_msg_push_failure_failpoint[];
extern const char random_receiver_async_msg_push_failure_failpoint[];
} // namespace FailPoints

namespace
{
String getReceiverStateStr(const ExchangeReceiverState & s)
Expand Down Expand Up @@ -257,7 +264,9 @@ class AsyncRequestHandler : public UnaryCallback<bool>
recv_msg->packet = std::move(packet);
recv_msg->source_index = request->source_index;
recv_msg->req_info = req_info;
if (!msg_channel->push(std::move(recv_msg)))
bool push_success = msg_channel->push(std::move(recv_msg));
fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_success = false;);
if (!push_success)
return false;
// can't reuse packet since it is sent to readers.
packet = std::make_shared<MPPDataPacket>();
Expand Down Expand Up @@ -483,7 +492,9 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
if (recv_msg->packet->has_error())
throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg());

if (!msg_channel.push(std::move(recv_msg)))
bool push_success = msg_channel.push(std::move(recv_msg));
fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_success = false;);
if (!push_success)
{
meet_error = true;
auto local_state = getState();
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[];
extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_during_mpp_write_err_to_tunnel[];
extern const char force_no_local_region_for_mpp_task[];
extern const char random_task_lifecycle_failpoint[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
Expand Down Expand Up @@ -394,7 +395,15 @@ void MPPTask::runImpl()
writeErrToAllTunnels(err_msg);
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
// unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed
// by grpc CancelMPPTask thread;
bool unregister = true;
fiu_do_on(FailPoints::random_task_lifecycle_failpoint, {
if (!err_msg.empty())
unregister = false;
});
if (unregister)
unregisterTask();

if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <fmt/core.h>
Expand All @@ -22,6 +23,11 @@

namespace DB
{
namespace FailPoints
{
extern const char random_task_manager_find_task_failure_failpoint[];
} // namespace FailPoints

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, log(&Poco::Logger::get("TaskManager"))
Expand Down Expand Up @@ -50,6 +56,7 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::
it = query_it->second->task_map.find(id);
return it != query_it->second->task_map.end();
});
fiu_do_on(FailPoints::random_task_manager_find_task_failure_failpoint, ret = false;);
if (cancelled)
{
errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id());
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace DB
namespace FailPoints
{
extern const char exception_during_mpp_close_tunnel[];
extern const char random_tunnel_wait_timeout_failpoint[];
} // namespace FailPoints

template <typename Writer>
Expand Down Expand Up @@ -322,6 +323,7 @@ void MPPTunnelBase<Writer>::waitUntilConnectedOrFinished(std::unique_lock<std::m
auto res = cv_for_connected_or_finished.wait_for(lk, timeout, connected_or_finished);
LOG_FMT_TRACE(log, "end waitUntilConnectedOrFinished");

fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, res = false;);
if (!res)
throw Exception(tunnel_id + " is timeout");
}
Expand Down
Loading

0 comments on commit 4d86eb7

Please sign in to comment.