Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add random failpoint in critical paths #4876

Merged
merged 18 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Poco/String.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/defines.h>
#include <common/logger_useful.h>

#include <boost/core/noncopyable.hpp>
#include <condition_variable>
Expand All @@ -21,7 +27,6 @@
namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

#define APPLY_FOR_FAILPOINTS_ONCE(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
Expand Down Expand Up @@ -105,13 +110,30 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)

#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_init_rpc_failure_failpoint) \
M(random_receiver_sync_msg_push_failure_failpoint) \
M(random_receiver_async_msg_push_failure_failpoint) \
M(random_limit_check_failpoint) \
M(random_join_build_failpoint) \
M(random_join_prob_failpoint) \
M(random_aggregate_create_state_failpoint) \
M(random_aggregate_merge_failpoint) \
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_lifecycle_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint)

namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
APPLY_FOR_FAILPOINTS_ONCE(M)
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
APPLY_FOR_RANDOM_FAILPOINTS(M)
#undef M
} // namespace FailPoints

Expand Down Expand Up @@ -175,7 +197,7 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
#undef M
#undef SUB_M

throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}

void FailPointHelper::disableFailPoint(const String & fail_point_name)
Expand All @@ -200,6 +222,41 @@ void FailPointHelper::wait(const String & fail_point_name)
ptr->wait();
}
}

void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
{
String random_fail_point_cfg = config.getString("flash.random_fail_points", "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there some test plans? how about open failpoint for ci test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plan to add a regular cluster test, since some failpoints need to be test under parrel workloads.

if (random_fail_point_cfg.empty())
return;

Poco::StringTokenizer string_tokens(random_fail_point_cfg, ",");
for (const auto & string_token : string_tokens)
{
Poco::StringTokenizer pair_tokens(string_token, "-");
RUNTIME_ASSERT((pair_tokens.count() == 2), log, "RandomFailPoints config should be FailPointA-RatioA,FailPointB-RatioB,... format");
double rate = atof(pair_tokens[1].c_str()); //NOLINT(cert-err34-c): check conversion error manually
RUNTIME_ASSERT((0 <= rate && rate <= 1.0), log, "RandomFailPoint trigger rate should in [0,1], while {}", rate);
enableRandomFailPoint(pair_tokens[0], rate);
}
LOG_FMT_INFO(log, "Enable RandomFailPoints: {}", random_fail_point_cfg);
}

void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate)
{
#define SUB_M(NAME) \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need an #undef?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, done.

if (fail_point_name == FailPoints::NAME) \
{ \
fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \
return; \
}

#define M(NAME) SUB_M(NAME)
APPLY_FOR_RANDOM_FAILPOINTS(M)
#undef M
#undef SUB_M

throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR);
}
#else
class FailPointChannel
{
Expand All @@ -210,6 +267,10 @@ void FailPointHelper::enableFailPoint(const String &) {}
void FailPointHelper::disableFailPoint(const String &) {}

void FailPointHelper::wait(const String &) {}

void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) {}

void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) {}
#endif

} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Common/FailPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@

#include <unordered_map>

namespace Poco
{
class Logger;
namespace Util
{
class LayeredConfiguration;
}
} // namespace Poco

namespace DB
{
namespace ErrorCodes
Expand All @@ -35,7 +44,6 @@ extern const int FAIL_POINT_ERROR;
// When `fail_point` is enabled, wait till it is disabled
#define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);)


class FailPointChannel;
class FailPointHelper
{
Expand All @@ -46,6 +54,16 @@ class FailPointHelper

static void wait(const String & fail_point_name);

/*
* For Server RandomFailPoint test usage. When FIU_ENABLE is defined, this function does the following work:
* 1. Return if TiFlash config has empty flash.random_fail_points cfg
* 2. Parse flash.random_fail_points, which expect to has "FailPointA-RatioA,FailPointB-RatioB,..." format
* 3. Call enableRandomFailPoint method with parsed FailPointName and Rate
*/
static void initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log);

static void enableRandomFailPoint(const String & fail_point_name, double rate);

private:
static std::unordered_map<String, std::shared_ptr<FailPointChannel>> fail_point_wait_channels;
};
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Common/wrapInvocable.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&..
// run the task with the parameters provided
return std::apply(std::move(func), std::move(args));
};

return capture;
}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FailPoint.h>
#include <Common/MPMCQueue.h>
#include <Common/ThreadFactory.h>
#include <Common/ThreadManager.h>
Expand All @@ -24,6 +25,11 @@

namespace DB
{
namespace FailPoints
{
extern const char random_sharedquery_failpoint[];
} // namespace FailPoints

/** This block input stream is used by SharedQuery.
* It enable multiple threads read from one stream.
*/
Expand Down Expand Up @@ -136,6 +142,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
in->readPrefix();
while (true)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_sharedquery_failpoint);
Block block = in->read();
// in is finished or queue is canceled
if (!block || !queue.push(block))
Expand Down
23 changes: 16 additions & 7 deletions dbms/src/DataStreams/SizeLimits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/SizeLimits.h>
#include <Common/formatReadable.h>
#include <Common/Exception.h>
#include <string>
#include <Common/FailPoint.h>
#include <Common/formatReadable.h>
#include <DataStreams/SizeLimits.h>

#include <string>

namespace DB
{
namespace FailPoints
{
extern const char random_limit_check_failpoint[];
} // namespace FailPoints

bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const
{
if (max_rows && rows > max_rows)
bool rows_exceed_limit = max_rows && rows > max_rows;
fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = true;);
if (rows_exceed_limit)
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows)
+ ", current rows: " + formatReadableQuantity(rows), exception_code);
+ ", current rows: " + formatReadableQuantity(rows),
exception_code);
else
return false;
}
Expand All @@ -36,12 +44,13 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes)
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), exception_code);
+ ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes),
exception_code);
else
return false;
}

return true;
}

}
} // namespace DB
7 changes: 7 additions & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/EstablishCall.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/Utils.h>

namespace DB
{
namespace FailPoints
{
extern const char random_tunnel_init_rpc_failure_failpoint[];
} // namespace FailPoints

EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr<std::atomic<bool>> & is_shutdown)
: service(service)
, cq(cq)
Expand Down Expand Up @@ -71,6 +77,7 @@ void EstablishCallData::initRpc()
std::exception_ptr eptr = nullptr;
try
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_init_rpc_failure_failpoint);
service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this);
}
catch (...)
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/CPUAffinityManager.h>
#include <Common/FailPoint.h>
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
Expand All @@ -22,6 +23,12 @@

namespace DB
{
namespace FailPoints
{
extern const char random_receiver_sync_msg_push_failure_failpoint[];
extern const char random_receiver_async_msg_push_failure_failpoint[];
} // namespace FailPoints

namespace
{
String getReceiverStateStr(const ExchangeReceiverState & s)
Expand Down Expand Up @@ -257,7 +264,9 @@ class AsyncRequestHandler : public UnaryCallback<bool>
recv_msg->packet = std::move(packet);
recv_msg->source_index = request->source_index;
recv_msg->req_info = req_info;
if (!msg_channel->push(std::move(recv_msg)))
bool push_success = msg_channel->push(std::move(recv_msg));
fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_success = false;);
if (!push_success)
return false;
// can't reuse packet since it is sent to readers.
packet = std::make_shared<MPPDataPacket>();
Expand Down Expand Up @@ -483,7 +492,9 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
if (recv_msg->packet->has_error())
throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg());

if (!msg_channel.push(std::move(recv_msg)))
bool push_success = msg_channel.push(std::move(recv_msg));
fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_success = false;);
if (!push_success)
{
meet_error = true;
auto local_state = getState();
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[];
extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_during_mpp_write_err_to_tunnel[];
extern const char force_no_local_region_for_mpp_task[];
extern const char random_task_lifecycle_failpoint[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
Expand Down Expand Up @@ -397,7 +398,15 @@ void MPPTask::runImpl()
writeErrToAllTunnels(err_msg);
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
// unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed
// by grpc CancelMPPTask thread;
bool unregister = true;
fiu_do_on(FailPoints::random_task_lifecycle_failpoint, {
if (!err_msg.empty())
unregister = false;
});
if (unregister)
unregisterTask();

if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <fmt/core.h>
Expand All @@ -22,6 +23,11 @@

namespace DB
{
namespace FailPoints
{
extern const char random_task_manager_find_task_failure_failpoint[];
} // namespace FailPoints

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, log(&Poco::Logger::get("TaskManager"))
Expand Down Expand Up @@ -50,6 +56,7 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::
it = query_it->second->task_map.find(id);
return it != query_it->second->task_map.end();
});
fiu_do_on(FailPoints::random_task_manager_find_task_failure_failpoint, ret = false;);
if (cancelled)
{
errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id());
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/FailPoint.h>
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Common/randomSeed.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless header file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/Utils.h>
#include <fmt/core.h>
Expand All @@ -25,6 +26,7 @@ namespace DB
namespace FailPoints
{
extern const char exception_during_mpp_close_tunnel[];
extern const char random_tunnel_wait_timeout_failpoint[];
} // namespace FailPoints

template <typename Writer>
Expand Down Expand Up @@ -322,6 +324,7 @@ void MPPTunnelBase<Writer>::waitUntilConnectedOrFinished(std::unique_lock<std::m
auto res = cv_for_connected_or_finished.wait_for(lk, timeout, connected_or_finished);
LOG_FMT_TRACE(log, "end waitUntilConnectedOrFinished");

fiu_do_on(FailPoints::random_tunnel_wait_timeout_failpoint, res = false;);
if (!res)
throw Exception(tunnel_id + " is timeout");
}
Expand Down
Loading