fmt
Signed-off-by: CalvinNeo <[email protected]>
CalvinNeo committed Jul 3, 2023
1 parent 6aa3dc6 commit ec97a43
Showing 9 changed files with 90 additions and 58 deletions.
18 changes: 10 additions & 8 deletions dbms/src/Common/FailPoint.cpp
@@ -101,7 +101,9 @@ namespace DB
M(force_stop_background_checkpoint_upload) \
M(skip_seek_before_read_dmfile) \
M(exception_after_large_write_exceed) \
M(proactive_flush_force_set_type)
M(proactive_flush_force_set_type) \
M(proactive_flush_between_persist_cache_and_region) \
M(proactive_flush_between_persist_regions)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
@@ -114,13 +116,13 @@ namespace DB
M(pause_after_copr_streams_acquired_once) \
M(pause_before_register_non_root_mpp_task)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_query_init) \
#define APPLY_FOR_PAUSEABLE_FAILPOINTS(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_query_init) \
M(proactive_flush_before_persist_region) \
M(passive_flush_before_persist_region)

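For readers unfamiliar with the pattern above: the failpoint lists are X-macros. Each APPLY_FOR_* macro takes another macro M and applies it to every failpoint name, so a single list can generate both the extern const char declarations and their definitions. A minimal, self-contained sketch of the technique with illustrative names (not TiFlash's actual failpoints):

#include <iostream>

// The list is declared once...
#define APPLY_FOR_FAILPOINTS(M) \
    M(pause_before_flush)       \
    M(force_flush_error)

// ...and expanded several times with different definitions of M.
#define DECLARE_FAILPOINT(name) extern const char name[];
#define DEFINE_FAILPOINT(name) const char name[] = #name;

APPLY_FOR_FAILPOINTS(DECLARE_FAILPOINT) // extern const char pause_before_flush[]; ...
APPLY_FOR_FAILPOINTS(DEFINE_FAILPOINT) // const char pause_before_flush[] = "pause_before_flush"; ...

int main()
{
    // Both identifiers exist and carry their own names as values.
    std::cout << pause_before_flush << " / " << force_flush_error << std::endl;
    return 0;
}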
2 changes: 2 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
@@ -261,6 +261,8 @@ namespace DB
F(type_write, {{"type", "write"}}, ExpBuckets{0.001, 2, 30})) \
M(tiflash_raft_write_data_to_storage_duration_seconds, "Bucketed histogram of writing region into storage layer", Histogram, \
F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_raft_raft_log_lag_count, "Bucketed histogram of the Raft log gap (applied index minus truncated index) when applying write command Raft logs", Histogram, \
F(type_compact_index, {{"type", "compact_index"}}, EqualWidthBuckets{0, 200, 5})) \
/* required by DBaaS */ \
M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \
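The new tiflash_raft_raft_log_lag_count histogram above is declared with EqualWidthBuckets{0, 200, 5}. The standalone sketch below only illustrates the general idea of equal-width histogram buckets; reading the three parameters as {start, bucket count, bucket width} is an assumption made for illustration, not a statement about TiFlash's actual EqualWidthBuckets layout.

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical helper: upper bounds of equal-width histogram buckets.
std::vector<double> equalWidthBounds(double start, size_t num_buckets, double width)
{
    std::vector<double> bounds;
    for (size_t i = 1; i <= num_buckets; ++i)
        bounds.push_back(start + width * static_cast<double>(i));
    return bounds;
}

int main()
{
    const auto bounds = equalWidthBounds(0, 200, 5); // 5, 10, ..., 1000
    const double observed_gap = 137; // e.g. applied index minus truncated index
    size_t bucket = 0;
    while (bucket < bounds.size() && observed_gap > bounds[bucket])
        ++bucket;
    std::cout << "gap " << observed_gap << " falls into bucket " << bucket
              << " (upper bound " << bounds[bucket] << ")" << std::endl;
    return 0;
}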
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
@@ -755,9 +755,12 @@ void MockRaftStoreProxy::doApply(
auto res = kvs.tryFlushRegionData(region_id, false, true, tmt, index, term, region->getApply().truncated_state().index(), region->getApply().truncated_state().term());
auto compact_index = cmd.admin().request.compact_log().compact_index();
auto compact_term = cmd.admin().request.compact_log().compact_term();
if (!res) {
if (!res)
{
LOG_DEBUG(log, "mock pre exec reject");
} else {
}
else
{
region->updateTruncatedState(compact_index, compact_term);
LOG_DEBUG(log, "mock pre exec success, update to {},{}", compact_index, compact_term);
}
35 changes: 23 additions & 12 deletions dbms/src/Storages/Transaction/KVStore.cpp
@@ -47,6 +47,8 @@ namespace FailPoints
extern const char force_fail_in_flush_region_data[];
extern const char proactive_flush_before_persist_region[];
extern const char passive_flush_before_persist_region[];
extern const char proactive_flush_between_persist_cache_and_region[];
extern const char proactive_flush_between_persist_regions[];
} // namespace FailPoints

KVStore::KVStore(Context & context)
@@ -447,15 +449,20 @@ bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 fl

auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

auto gap = region_compact_log_gap.load();
if(index > truncated_index + gap && flush_if_possible) {
// This rarely happens when there are too many raft logs, which don't trigger a proactive flush.
LOG_INFO(log, "{} flush region due to tryFlushRegionData, index {} term {} truncated_index {} truncated_term {} log gap {}", curr_region.toString(false), index, term, truncated_index, truncated_term, gap);
return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term);
}

LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);
auto current_gap = index - truncated_index;
GET_METRIC(tiflash_raft_raft_log_lag_count, type_compact_index).Observe(current_gap);
auto gap_threshold = region_compact_log_gap.load();
if (flush_if_possible)
{
if (index > truncated_index + gap_threshold)
{
// This rarely happens when there are too many raft logs, which don't trigger a proactive flush.
LOG_INFO(log, "{} flush region due to tryFlushRegionData, index {} term {} truncated_index {} truncated_term {} gap {}/{}", curr_region.toString(false), index, term, truncated_index, truncated_term, current_gap, gap_threshold);
return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term);
}

LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);
}
return false;
}
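A minimal standalone sketch of the flush decision added above, with simplified names: gap_threshold stands in for region_compact_log_gap, and the real code also observes the gap into the tiflash_raft_raft_log_lag_count histogram before deciding.

#include <cstdint>
#include <iostream>

// Simplified stand-in for the check in KVStore::canFlushRegionDataImpl:
// force a flush when the applied raft index has run ahead of the last
// truncated (compacted) index by more than the configured gap threshold.
bool shouldForceFlush(uint64_t applied_index, uint64_t truncated_index, uint64_t gap_threshold, bool flush_if_possible)
{
    if (!flush_if_possible)
        return false;
    // current_gap is the quantity the new histogram observes in the real code.
    const uint64_t current_gap = applied_index - truncated_index;
    return current_gap > gap_threshold; // same as applied_index > truncated_index + gap_threshold
}

int main()
{
    // An applied index 250 entries ahead of the truncated index exceeds a
    // threshold of 200, so the region would be flushed proactively.
    std::cout << std::boolalpha << shouldForceFlush(1250, 1000, 200, true) << std::endl; // true
    std::cout << std::boolalpha << shouldForceFlush(1100, 1000, 200, true) << std::endl; // false
    return 0;
}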

@@ -936,10 +943,6 @@ FileUsageStatistics KVStore::getFileUsageStatistics() const
return region_persister->getFileUsageStatistics();
}

// We need to get the applied index before flushing cache, and can't hold the region task lock while flushing cache, to avoid hanging write cmd apply.
// 1. store applied index and applied term,
// 2. flush cache,
// 3. notify regions to compact log and store the flushed state with the applied index/term recorded before flushing cache.
void KVStore::proactiveFlushCacheAndRegion(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, KeyspaceID keyspace_id, TableID table_id, bool is_background)
{
if (is_background)
@@ -1034,6 +1037,14 @@ void KVStore::proactiveFlushCacheAndRegion(TMTContext & tmt, const DM::RowKeyRan
{
LOG_DEBUG(log, "extra segment of region {} to flush, region range:[{},{}], flushed segment range:[{},{}]", region.first, region_rowkey_range.getStart().toDebugString(), region_rowkey_range.getEnd().toDebugString(), rowkey_range.getStart().toDebugString(), rowkey_range.getEnd().toDebugString());
// Both flushCache and persistRegion should be protected by region task lock.
// We could avoid calling flushCache while holding a region lock, if we saved some meta info before flushing the cache.
// Merely storing applied_index is not enough, since some cmds lead to modification of other meta data.
// After flushCache, we would persist the region and notify Proxy with the previously stored meta info.
// However, this solution still involves the region task lock in this function.
// Meanwhile, other write/admin cmds may be executed; they require that we acquire the lock here:
// For write cmds, we would need to support replay at the KVStore level, e.g. by enhancing duplicate key detection.
// For admin cmds, they can cause insertion/deletion of regions, so they can't be replayed currently.

auto region_task_lock = region_manager.genRegionTaskLock(region_id);
storage->flushCache(tmt.getContext(), std::get<2>(region.second));
persistRegion(*region_ptr, std::make_optional(&region_task_lock), "triggerCompactLog");
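A toy sketch of the ordering constraint stated in the comment above: both the cache flush and the region persist happen under one region task lock, so no concurrent write/admin cmd can slip in between them. The mutex and the two functions below are stand-ins, not the real KVStore types.

#include <iostream>
#include <mutex>

// Stand-ins for the real region task lock and storage operations.
std::mutex region_task_mutex;

void flushCache() { std::cout << "flush segment cache" << std::endl; }
void persistRegion() { std::cout << "persist region meta" << std::endl; }

void flushAndPersistUnderOneLock()
{
    // Holding the lock across both steps keeps the persisted region state
    // consistent with the flushed cache; releasing it in between would let
    // another write/admin cmd change the region before persistRegion runs.
    std::lock_guard<std::mutex> lock(region_task_mutex);
    flushCache();
    persistRegion();
}

int main()
{
    flushAndPersistUnderOneLock();
    return 0;
}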
25 changes: 19 additions & 6 deletions dbms/src/Storages/Transaction/tests/gtest_proactive_flush.cpp
@@ -18,10 +18,11 @@ namespace DB
{
namespace tests
{
TEST_F(RegionKVStoreTest, ProactiveFlushLiveness)
try

std::tuple<uint64_t, uint64_t, uint64_t> RegionKVStoreTest::prepareForProactiveFlushTest()
{
auto & ctx = TiFlashTestEnv::getGlobalContext();
// Allow a large enough segment size.
ctx.getSettingsRef().dt_segment_limit_rows = 1000000;
ctx.getSettingsRef().dt_segment_limit_size = 1000000;
ctx.getSettingsRef().dt_segment_delta_cache_limit_rows = 0;
@@ -30,8 +31,6 @@ try
UInt64 region_id2 = 7;
TableID table_id;
KVStore & kvs = getKVS();
ASSERT_EQ(&ctx.getTMTContext().getContext(), &ctx);
ASSERT_EQ(ctx.getTMTContext().getContext().getSettingsRef().dt_segment_limit_size, 1000000);
MockRaftStoreProxy::FailCond cond;
{
initStorages();
@@ -57,6 +56,20 @@ try
ctx.getTMTContext().getRegionTable().updateRegion(*kvr1);
ctx.getTMTContext().getRegionTable().updateRegion(*kvr2);
}
return std::make_tuple(table_id, region_id, region_id2);
}

TEST_F(RegionKVStoreTest, ProactiveFlushLiveness)
try
{
auto & ctx = TiFlashTestEnv::getGlobalContext();
auto tp = prepareForProactiveFlushTest();
auto table_id = std::get<0>(tp);
auto region_id = std::get<1>(tp);
auto region_id2 = std::get<2>(tp);
MockRaftStoreProxy::FailCond cond;
KVStore & kvs = getKVS();

std::shared_ptr<std::atomic<size_t>> ai = std::make_shared<std::atomic<size_t>>();
{
// A fg flush and a bg flush will not deadlock.
@@ -171,10 +184,10 @@ CATCH
TEST_F(RegionKVStoreTest, ProactiveFlushRecover)
try
{
auto ctx = TiFlashTestEnv::getGlobalContext();
{
// Safe to abort between flushing regions.
}
{
} {
// Safe to abort between flushCache and persistRegion.
}
}
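An aside on the tuple returned by prepareForProactiveFlushTest: the test above unpacks it with repeated std::get<> calls; a structured binding (C++17) expresses the same thing more tersely. A standalone sketch with a hypothetical stand-in function:

#include <cstdint>
#include <iostream>
#include <tuple>

// Hypothetical stand-in for RegionKVStoreTest::prepareForProactiveFlushTest().
std::tuple<uint64_t, uint64_t, uint64_t> prepare()
{
    return {100, 1, 7}; // table_id, region_id, region_id2
}

int main()
{
    // One declaration instead of three std::get<> calls.
    auto [table_id, region_id, region_id2] = prepare();
    std::cout << table_id << " " << region_id << " " << region_id2 << std::endl;
    return 0;
}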
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/tests/kvstore_helper.h
@@ -141,6 +141,7 @@ class RegionKVStoreTest : public ::testing::Test
}

protected:
std::tuple<uint64_t, uint64_t, uint64_t> prepareForProactiveFlushTest();
static void testRaftSplit(KVStore & kvs, TMTContext & tmt);
static void testRaftMerge(KVStore & kvs, TMTContext & tmt);
static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt);
38 changes: 18 additions & 20 deletions dbms/src/Storages/tests/hit_log.cpp
@@ -12,28 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <map>
#include <list>
#include <iostream>

#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeDateTime.h>

#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/TabSeparatedRowInputStream.h>
#include <DataStreams/TabSeparatedRowOutputStream.h>
#include <DataStreams/copyData.h>

#include <Storages/StorageLog.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Interpreters/Context.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageLog.h>

#include <Interpreters/Context.h>
#include <iostream>
#include <list>
#include <map>


using namespace DB;
Expand All @@ -42,8 +38,7 @@ using namespace DB;
int main(int argc, char ** argv)
try
{
NamesAndTypesList names_and_types_list
{
NamesAndTypesList names_and_types_list{
{"WatchID", std::make_shared<DataTypeUInt64>()},
{"JavaEnable", std::make_shared<DataTypeUInt8>()},
{"Title", std::make_shared<DataTypeString>()},
@@ -114,7 +109,10 @@ try
/// create a hit log table

StoragePtr table = StorageLog::create(
"./", "HitLog", ColumnsDescription{names_and_types_list}, DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
"./",
"HitLog",
ColumnsDescription{names_and_types_list},
DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
table->startup();

/// create a description of how to read data from the tab separated dump
7 changes: 4 additions & 3 deletions dbms/src/Storages/tests/remove_symlink_directory.cpp
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <port/unistd.h>
#include <iostream>
#include <Common/Exception.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Common/Exception.h>
#include <port/unistd.h>

#include <iostream>


int main(int, char **)
15 changes: 8 additions & 7 deletions dbms/src/Storages/tests/seek_speed_test.cpp
@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Stopwatch.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/Stopwatch.h>
#include <Poco/File.h>
#include <iostream>

#include <algorithm>
#include <iomanip>
#include <iostream>
#include <vector>
#include <algorithm>

/** We test the hypothesis that skipping unnecessary parts of seek-forward never degrades overall read speed.
* Before the measurements, it is desirable to discard disk cache: `echo 3 > /proc/sys/vm/drop_caches`.
@@ -35,7 +36,7 @@ int main(int argc, const char ** argv)
if (argc < 5 || argc > 6)
{
std::cerr << "Usage:\n"
<< argv[0] << " file bytes_in_block min_skip_bytes max_skip_bytes [buffer_size]" << std::endl;
<< argv[0] << " file bytes_in_block min_skip_bytes max_skip_bytes [buffer_size]" << std::endl;
return 0;
}

@@ -72,11 +73,11 @@ int main(int argc, const char ** argv)
}
watch.stop();

std::cout << checksum << std::endl; /// don't optimize
std::cout << checksum << std::endl; /// don't optimize

std::cout << "Read " << bytes_read << " out of " << size << " bytes in "
<< std::setprecision(4) << watch.elapsedSeconds() << " seconds ("
<< bytes_read / watch.elapsedSeconds() / 1000000 << " MB/sec.)" << std::endl;
<< std::setprecision(4) << watch.elapsedSeconds() << " seconds ("
<< bytes_read / watch.elapsedSeconds() / 1000000 << " MB/sec.)" << std::endl;

return 0;
}
