Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Add comments and refine logging about WaitCheckRegionReady #9153

Merged
merged 9 commits into from
Jun 24, 2024
30 changes: 4 additions & 26 deletions dbms/src/Common/Checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,11 @@ struct ChecksumFrame
{
size_t bytes;
typename Algorithm::HashType checksum;
uint8_t
pad[alignof(size_t) > alignof(typename Algorithm::HashType)
// clang-format off
uint8_t pad[alignof(size_t) > alignof(typename Algorithm::HashType)
? alignof(size_t) - sizeof(typename Algorithm::HashType)
: 0];
// clang-format on
uint8_t data[0];
};

Expand Down Expand Up @@ -172,11 +173,9 @@ using FrameUnion = std::aligned_union_t<
struct UnifiedDigestBase
{
virtual void update(const void * data, size_t length) = 0;
virtual bool compareB64(const std::string & data) = 0;
virtual bool compareRaw(std::string_view data) = 0;
virtual bool compareRaw(const void * data) = 0;
virtual bool compareFrame(const FrameUnion & frame) = 0;
[[nodiscard]] virtual std::string base64() const = 0;
[[nodiscard]] virtual std::string raw() const = 0;
virtual ~UnifiedDigestBase() = default;
virtual size_t hashSize() const = 0;
Expand All @@ -195,16 +194,6 @@ class UnifiedDigest : public UnifiedDigestBase
public:
void update(const void * data, size_t length) override { backend.update(data, length); }

bool compareB64(const std::string & data) override
{
auto checksum = backend.checksum();
auto input = std::istringstream{data};
auto decoder = Poco::Base64Decoder{input};
decltype(checksum) target = {};
decoder.read(reinterpret_cast<char *>(&target), sizeof(target));
return checksum == target;
}

bool compareRaw(const void * data) override
{
auto checksum = backend.checksum();
Expand All @@ -227,22 +216,11 @@ class UnifiedDigest : public UnifiedDigestBase
[[nodiscard]] std::string raw() const override
{
auto checksum = backend.checksum();
std::string data(sizeof(checksum), ' ');
std::string data(sizeof(checksum), '\0');
::memcpy(data.data(), &checksum, sizeof(checksum));
return data;
}

[[nodiscard]] std::string base64() const override
{
auto output = std::ostringstream{};
{
auto encoder = Poco::Base64Encoder{output};
auto checksum = backend.checksum();
encoder.write(reinterpret_cast<char *>(&checksum), sizeof(checksum));
}
return output.str();
}

[[nodiscard]] size_t hashSize() const override { return Backend::hash_size; }
[[nodiscard]] size_t headerSize() const override { return sizeof(ChecksumFrame<Backend>); }
void reset() override { backend = Backend{}; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
(*p_process_keys_bytes) += (key.len + value.len);
reader->next();
}
auto sec = sw.elapsedMilliseconds();
const auto sec = sw.elapsedSeconds();
LOG_DEBUG(
log,
"Done loading all kvpairs, CF={} offset={} processed_bytes={} write_cf_offset={} region_id={} split_id={} "
"snapshot_index={} elapsed_sec={} speed={}",
"snapshot_index={} elapsed_sec={:.3f} speed={}",
CFToName(cf),
(*p_process_keys),
(*p_process_keys_bytes),
Expand All @@ -308,11 +308,11 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// We keep an assumption that rowkeys are memory-comparable and they are asc sorted in the SST file
if (!last_loaded_rowkey->empty() && *last_loaded_rowkey > *rowkey_to_be_included)
{
auto sec = sw.elapsedMilliseconds();
const auto sec = sw.elapsedSeconds();
LOG_DEBUG(
log,
"Done loading, CF={} offset={} processed_bytes={} write_cf_offset={} last_loaded_rowkey={} "
"rowkey_to_be_included={} region_id={} snapshot_index={} elapsed_sec={} speed={}",
"rowkey_to_be_included={} region_id={} snapshot_index={} elapsed_sec={:.3f} speed={}",
CFToName(cf),
(*p_process_keys),
(*p_process_keys_bytes),
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ class KVStoreTaskLock : private boost::noncopyable
};

void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t &, double, double, double);
void WaitCheckRegionReadyImpl(
const TMTContext &,
KVStore & kvstore,
const std::atomic_size_t &,
double,
double,
double);

} // namespace DB
60 changes: 38 additions & 22 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Poco/Message.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/Read/ReadIndexWorkerImpl.h>
Expand Down Expand Up @@ -124,38 +125,38 @@ std::tuple<WaitIndexStatus, double> Region::waitIndex(
}
}

void WaitCheckRegionReady(
void WaitCheckRegionReadyImpl(
const TMTContext & tmt,
KVStore & kvstore,
const std::atomic_size_t & terminate_signals_counter,
double wait_tick_time,
double max_wait_tick_time,
double get_wait_region_ready_timeout_sec)
{
constexpr double batch_read_index_time_rate = 0.2; // part of time for waiting shall be assigned to batch-read-index
// part of time for waiting shall be assigned to batch-read-index
static constexpr double BATCH_READ_INDEX_TIME_RATE = 0.2;
auto log = Logger::get(__FUNCTION__);

LOG_INFO(
log,
"start to check regions ready, min-wait-tick {}s, max-wait-tick {}s, wait-region-ready-timeout {:.3f}s",
"start to check regions ready, min_wait_tick={:.3f}s max_wait_tick={:.3f}s wait_region_ready_timeout={:.3f}s",
wait_tick_time,
max_wait_tick_time,
get_wait_region_ready_timeout_sec);

std::unordered_set<RegionID> remain_regions;
std::unordered_map<RegionID, uint64_t> regions_to_check;
Stopwatch region_check_watch;
size_t total_regions_cnt = 0;
{
kvstore.traverseRegions(
[&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
total_regions_cnt = remain_regions.size();
}
while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec * batch_read_index_time_rate
kvstore.traverseRegions(
[&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
const size_t total_regions_cnt = remain_regions.size();

while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec * BATCH_READ_INDEX_TIME_RATE
&& terminate_signals_counter.load(std::memory_order_relaxed) == 0)
{
// Generate the read index requests
std::vector<kvrpcpb::ReadIndexRequest> batch_read_index_req;
for (auto it = remain_regions.begin(); it != remain_regions.end();)
for (auto it = remain_regions.begin(); it != remain_regions.end(); /**/)
{
auto region_id = *it;
if (auto region = kvstore.getRegion(region_id); region)
Expand All @@ -165,9 +166,12 @@ void WaitCheckRegionReady(
}
else
{
// Remove the region that no longer exists
it = remain_regions.erase(it);
}
}

// Record the latest commit index in TiKV
auto read_index_res = kvstore.batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
for (auto && [resp, region_id] : read_index_res)
{
Expand All @@ -179,7 +183,7 @@ void WaitCheckRegionReady(
need_retry = false;
LOG_DEBUG(
log,
"neglect error region_id={} not found {} epoch not match {}",
"neglect error, region_id={} not_found={} epoch_not_match={}",
region_id,
region_error.has_region_not_found(),
region_error.has_epoch_not_match());
Expand All @@ -202,15 +206,17 @@ void WaitCheckRegionReady(

LOG_INFO(
log,
"{} regions need to fetch latest commit-index in next round, sleep for {:.3f}s",
"{} regions need to fetch latest commit-index in next round, sleep for {:.3f}s, tot_regions={}",
remain_regions.size(),
wait_tick_time);
wait_tick_time,
total_regions_cnt);
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<Int64>(wait_tick_time * 1000)));
wait_tick_time = std::min(max_wait_tick_time, wait_tick_time * 2);
}

if (!remain_regions.empty())
{
// Timed out while fetching the latest commit index from TiKV
FmtBuffer buffer;
buffer.joinStr(
remain_regions.begin(),
Expand All @@ -223,15 +229,18 @@ void WaitCheckRegionReady(
remain_regions.size(),
buffer.toString());
}

// Wait until all regions have caught up with TiKV or the timeout is reached
do
{
for (auto it = regions_to_check.begin(); it != regions_to_check.end();)
for (auto it = regions_to_check.begin(); it != regions_to_check.end(); /**/)
{
auto [region_id, latest_index] = *it;
if (auto region = kvstore.getRegion(region_id); region)
{
if (region->appliedIndex() >= latest_index)
{
// The region has already caught up
it = regions_to_check.erase(it);
}
else
Expand All @@ -241,6 +250,7 @@ void WaitCheckRegionReady(
}
else
{
// The region is removed from this instance
it = regions_to_check.erase(it);
}
}
Expand All @@ -250,9 +260,10 @@ void WaitCheckRegionReady(

LOG_INFO(
log,
"{} regions need to apply to latest index, sleep for {:.3f}s",
"{} regions need to apply to latest index, sleep for {:.3f}s, tot_regions={}",
regions_to_check.size(),
wait_tick_time);
wait_tick_time,
total_regions_cnt);
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<Int64>(wait_tick_time * 1000)));
wait_tick_time = std::min(max_wait_tick_time, wait_tick_time * 2);
} while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec
Expand All @@ -271,6 +282,7 @@ void WaitCheckRegionReady(
}
else
{
// The region was removed from this instance while waiting for the latest index
b.fmtAppend("{},{},none", e.first, e.second);
}
},
Expand All @@ -282,11 +294,14 @@ void WaitCheckRegionReady(
buffer.toString());
}

LOG_INFO(
const auto total_elapse = region_check_watch.elapsedSeconds();
const auto log_level = total_elapse > 60.0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_INFORMATION;
LOG_IMPL(
log,
"finish to check {} regions, time cost {:.3f}s",
total_regions_cnt,
region_check_watch.elapsedSeconds());
log_level,
"finish to check regions, time_cost={:.3f}s tot_regions={}",
total_elapse,
total_regions_cnt);
}

void WaitCheckRegionReady(
Expand All @@ -295,13 +310,14 @@ void WaitCheckRegionReady(
const std::atomic_size_t & terminate_signals_counter)
{
// wait interval to check region ready; not recommended to modify except for testing
// TODO: Move this hidden config to TMTContext
auto wait_region_ready_tick = tmt.getContext().getConfigRef().getUInt64("flash.wait_region_ready_tick", 0);
auto wait_region_ready_timeout_sec = static_cast<double>(tmt.waitRegionReadyTimeout());
const double max_wait_tick_time = 0 == wait_region_ready_tick ? 20.0 : wait_region_ready_timeout_sec;
double min_wait_tick_time = 0 == wait_region_ready_tick
? 2.5
: static_cast<double>(wait_region_ready_tick); // default tick in TiKV is about 2s (without hibernate-region)
return WaitCheckRegionReady(
return WaitCheckRegionReadyImpl(
tmt,
kvstore,
terminate_signals_counter,
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Types.h>
#include <common/logger_useful.h>

#include <ext/scope_guard.h>
Expand Down Expand Up @@ -131,9 +132,10 @@ std::string Region::getDebugString() const
{
const auto & meta_snap = meta.dumpRegionMetaSnapshot();
return fmt::format(
"[region_id={} index={} table_id={} ver={} conf_ver={} state={} peer={} range={}]",
"[region_id={} index={} {}table_id={} ver={} conf_ver={} state={} peer={} range={}]",
id(),
meta.appliedIndex(),
((keyspace_id == NullspaceID) ? "" : fmt::format("keyspace={} ", keyspace_id)),
mapped_table_id,
meta_snap.ver,
meta_snap.conf_ver,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST_F(RegionKVStoreOldTest, ReadIndex)
const std::atomic_size_t terminate_signals_counter{};
std::thread t([&]() {
notifier.wake();
WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 20, 20 * 60);
WaitCheckRegionReadyImpl(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 20, 20 * 60);
});
SCOPE_EXIT({
t.join();
Expand Down Expand Up @@ -162,7 +162,7 @@ TEST_F(RegionKVStoreOldTest, ReadIndex)
const std::atomic_size_t terminate_signals_counter{};
std::thread t([&]() {
notifier.wake();
WaitCheckRegionReady(
WaitCheckRegionReadyImpl(
ctx.getTMTContext(),
kvs,
terminate_signals_counter,
Expand Down