Skip to content

Commit

Permalink
Optimize apply speed under heavy write pressure (#4883)
Browse files Browse the repository at this point in the history
ref #4728
  • Loading branch information
lidezhu authored Jul 4, 2022
1 parent a89222a commit 6da631c
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 27 deletions.
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan

bool DeltaValueSpace::flush(DMContext & context)
{
bool v = false;
if (!is_flushing.compare_exchange_strong(v, true))
{
// other thread is flushing, just return.
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo());
return false;
}
SCOPE_EXIT({
bool v = true;
if (!is_flushing.compare_exchange_strong(v, false))
throw Exception(simpleInfo() + " is expected to be flushing", ErrorCodes::LOGICAL_ERROR);
});

LOG_FMT_DEBUG(log, "{}, Flush start", info());

/// We have two types of data needed to flush to disk:
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class DeltaValueSpace
/// Note that those things can not be done at the same time.
std::atomic_bool is_updating = false;

/// Note that it's safe to do multiple flush concurrently but only one of them can succeed,
/// and other thread's work is just a waste of resource.
/// So we only allow one flush task running at any time to aviod waste resource.
std::atomic_bool is_flushing = false;

std::atomic<size_t> last_try_flush_rows = 0;
std::atomic<size_t> last_try_flush_bytes = 0;
std::atomic<size_t> last_try_compact_column_files = 0;
Expand Down Expand Up @@ -159,6 +164,8 @@ class DeltaValueSpace
size_t getTotalCacheBytes() const;
size_t getValidCacheRows() const;

bool isFlushing() const { return is_flushing; }

bool isUpdating() const { return is_updating; }

bool tryLockUpdating()
Expand Down
35 changes: 29 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed)
{
RowKeyRange cur_range = range;
while (!cur_range.none())
{
RowKeyRange segment_range;

// Keep trying until succeeded.
// Keep trying until succeeded if needed.
while (true)
{
SegmentPtr segment;
Expand All @@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
{
break;
}
else if (!try_until_succeed)
{
return false;
}
}

cur_range.setStart(segment_range.end);
}
return true;
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
Expand Down Expand Up @@ -1347,6 +1352,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3;
/// For write thread, we want to avoid foreground flush to block the process of apply raft command.
/// So we increase the threshold of foreground flush for write thread.
if (thread_type == ThreadType::Write)
{
should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 10 || unsaved_bytes >= delta_cache_limit_bytes * 10;
}

bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) //
&& (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows
Expand Down Expand Up @@ -1404,9 +1415,16 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
else if (should_background_flush)
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
/// It's meaningless to add more flush tasks if the segment is flushing.
/// Because only one flush task can proceed at any time.
/// And after the current flush task finished,
/// it will call `checkSegmentUpdate` again to check whether there is more flush task to do.
if (!segment->isFlushing())
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
}
}
}

Expand Down Expand Up @@ -1502,7 +1520,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_compact = [&]() {
if (should_compact)
/// Compact task should be a really low priority task.
/// And if the segment is flushing,
/// we should avoid adding background compact task to reduce lock contention on the segment and save disk throughput.
/// And after the current flush task complete,
/// it will call `checkSegmentUpdate` again to check whether there is other kinds of task to do.
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}});
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
/// Try flush all data in `range` to disk and return whether the task succeed.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
flushCache(dm_context, range);
return flushCache(dm_context, range, try_until_succeed);
}

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);
bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
///
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ class Segment : private boost::noncopyable

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);

bool isFlushing() const { return delta->isFlushing(); }

RowsAndBytes getRowsAndBytesInRange(
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class IManageableStorage : public IStorage

virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {}
virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; }

virtual BlockInputStreamPtr status() { return {}; }

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context)

void StorageDeltaMerge::flushCache(const Context & context)
{
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size));
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true);
}

void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush)
bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed)
{
getAndMaybeInitStore()->flushCache(context, range_to_flush);
return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed);
}

void StorageDeltaMerge::mergeDelta(const Context & context)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class StorageDeltaMerge

void flushCache(const Context & context) override;

void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override;
bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) override;

/// Merge delta into the stable layer for all segments.
///
Expand Down
27 changes: 17 additions & 10 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> &
callback(region.first, region.second);
}

void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log)
bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed)
{
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
Expand All @@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
"tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored",
region.toString(),
table_id);
return;
return true;
}

try
Expand All @@ -151,14 +151,15 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
return storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
return true;
}

void KVStore::tryPersist(RegionID region_id)
Expand Down Expand Up @@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and perist mem data.
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if thhere is little data in mem, wait until time interval reached threshold.
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
Expand All @@ -381,11 +382,17 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(

if (check_sync_log())
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
}
return EngineStoreApplyRes::None;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable

void tryPersist(RegionID region_id);

static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log);
static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true);

size_t regionSize() const;
EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void removeObsoleteDataInStorage(
auto rowkey_range
= DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize());
dm_storage->deleteRange(rowkey_range, context->getSettingsRef());
dm_storage->flushCache(*context, rowkey_range); // flush to disk
dm_storage->flushCache(*context, rowkey_range, /*try_until_succeed*/ true); // flush to disk
}
catch (DB::Exception & e)
{
Expand Down

0 comments on commit 6da631c

Please sign in to comment.