Optimize apply speed under heavy write pressure #4883

Merged · 10 commits · Jul 4, 2022
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
@@ -141,6 +141,19 @@ bool DeltaValueSpace::ingestColumnFiles(DMContext & /*context*/, const RowKeyRan

bool DeltaValueSpace::flush(DMContext & context)
{
bool v = false;
if (!is_flushing.compare_exchange_strong(v, true))
{
// other thread is flushing, just return.
LOG_FMT_DEBUG(log, "{}, Flush stop because other thread is flushing", simpleInfo());
return false;
}
SCOPE_EXIT({
bool v = true;
if (!is_flushing.compare_exchange_strong(v, false))
throw Exception(simpleInfo() + " is expected to be flushing", ErrorCodes::LOGICAL_ERROR);
});

LOG_FMT_DEBUG(log, "{}, Flush start", info());

/// We have two types of data needed to flush to disk:
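The `is_flushing` flag added above makes `flush` a single-flight operation: the first caller wins the compare-and-exchange and does the work, while concurrent callers return immediately instead of duplicating it. Below is a minimal standalone sketch of the same pattern; `Flusher` and `doFlushWork` are illustrative names, and a small RAII guard stands in for TiFlash's `SCOPE_EXIT` macro.

#include <atomic>
#include <iostream>

// Sketch of a single-flight flush guard (hypothetical class, not a TiFlash API).
class Flusher
{
public:
    bool flush()
    {
        bool expected = false;
        // Only the first caller flips the flag from false to true; everyone else bails out.
        if (!is_flushing.compare_exchange_strong(expected, true))
            return false; // another thread is already flushing

        // RAII guard: always clear the flag when leaving this scope, even on exception.
        struct ClearFlag
        {
            std::atomic_bool & flag;
            ~ClearFlag() { flag.store(false); }
        } clear_flag{is_flushing};

        doFlushWork();
        return true;
    }

private:
    void doFlushWork() { std::cout << "flushing...\n"; }

    std::atomic_bool is_flushing{false};
};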
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
@@ -77,6 +77,11 @@ class DeltaValueSpace
/// Note that those things can not be done at the same time.
std::atomic_bool is_updating = false;

/// Note that it is safe to run multiple flushes concurrently, but only one of them can succeed,
/// and the other threads' work would just be a waste of resources.
/// So we only allow one flush task to run at any time to avoid wasting resources.
std::atomic_bool is_flushing = false;

std::atomic<size_t> last_try_flush_rows = 0;
std::atomic<size_t> last_try_flush_bytes = 0;
std::atomic<size_t> last_try_compact_column_files = 0;
@@ -159,6 +164,8 @@ class DeltaValueSpace
size_t getTotalCacheBytes() const;
size_t getValidCacheRows() const;

bool isFlushing() const { return is_flushing; }

bool isUpdating() const { return is_updating; }

bool tryLockUpdating()
35 changes: 29 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -980,14 +980,14 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings
checkSegmentUpdate(dm_context, segment, ThreadType::Write);
}

void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range)
bool DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed)
{
RowKeyRange cur_range = range;
while (!cur_range.none())
{
RowKeyRange segment_range;

// Keep trying until succeeded.
// Keep trying until succeeded if needed.
while (true)
{
SegmentPtr segment;
@@ -1010,10 +1010,15 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa
{
break;
}
else if (!try_until_succeed)
{
return false;
}
}

cur_range.setStart(segment_range.end);
}
return true;
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
@@ -1347,6 +1352,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
bool should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 3 || unsaved_bytes >= delta_cache_limit_bytes * 3;
/// For the write thread, we want to avoid a foreground flush blocking the apply of raft commands.
/// So we raise the foreground flush threshold for the write thread.
if (thread_type == ThreadType::Write)
{
should_foreground_flush = unsaved_rows >= delta_cache_limit_rows * 10 || unsaved_bytes >= delta_cache_limit_bytes * 10;
}

bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) //
&& (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows
@@ -1404,9 +1415,16 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
}
else if (should_background_flush)
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
/// It's meaningless to add more flush tasks if the segment is already flushing,
/// because only one flush task can proceed at any time.
/// After the current flush task finishes,
/// it will call `checkSegmentUpdate` again to check whether there is more flush work to do.
if (!segment->isFlushing())
{
delta_last_try_flush_rows = delta_rows;
delta_last_try_flush_bytes = delta_bytes;
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment, {}});
}
}
}

@@ -1502,7 +1520,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
return false;
};
auto try_bg_compact = [&]() {
if (should_compact)
/// The compact task should be a really low priority task.
/// If the segment is flushing,
/// we should avoid adding a background compact task, to reduce lock contention on the segment and save disk throughput.
/// After the current flush task completes,
/// it will call `checkSegmentUpdate` again to check whether there are other kinds of tasks to do.
if (should_compact && !segment->isFlushing())
{
delta_last_try_compact_column_files = column_file_count;
try_add_background_task(BackgroundTask{TaskType::Compact, dm_context, segment, {}});
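The hunks above change two things in `checkSegmentUpdate`: the write (raft apply) thread gets a higher foreground-flush threshold (10x the delta cache limit instead of 3x), and background flush/compact tasks are skipped while the segment is already flushing. A condensed sketch of that decision logic, with the surrounding bookkeeping omitted and all names other than the multipliers invented for illustration, could look like this:

#include <cstddef>

enum class ThreadType { Write, Read };
enum class Action { None, ForegroundFlush, BackgroundFlush };

// Illustrative restatement of the flush decision: the 3x/10x multipliers follow the patch,
// everything else (names, omitted checks) is simplified.
Action decideFlush(ThreadType thread_type,
                   bool segment_is_flushing,
                   bool should_background_flush,
                   size_t unsaved_rows,
                   size_t unsaved_bytes,
                   size_t cache_limit_rows,
                   size_t cache_limit_bytes)
{
    // The write thread tolerates much more unsaved data before it is forced
    // to flush in the foreground, so applying raft commands is rarely blocked.
    const size_t factor = (thread_type == ThreadType::Write) ? 10 : 3;
    const bool should_foreground_flush = unsaved_rows >= cache_limit_rows * factor
        || unsaved_bytes >= cache_limit_bytes * factor;

    if (should_foreground_flush)
        return Action::ForegroundFlush;
    // Don't queue another background flush while one is already running on this segment;
    // the running flush will re-check the segment when it finishes.
    if (should_background_flush && !segment_is_flushing)
        return Action::BackgroundFlush;
    return Action::None;
}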
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -367,14 +367,14 @@ class DeltaMergeStore : private boost::noncopyable
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);

/// Force flush all data to disk.
void flushCache(const Context & context, const RowKeyRange & range)
/// Try to flush all data in `range` to disk and return whether the task succeeded.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
flushCache(dm_context, range);
return flushCache(dm_context, range, try_until_succeed);
}

void flushCache(const DMContextPtr & dm_context, const RowKeyRange & range);
bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
///
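With the new `try_until_succeed` parameter, callers of `flushCache` can choose between a best-effort flush that gives up as soon as a busy segment is hit and the old blocking behaviour. A hypothetical caller is sketched below; the store, context and range objects are assumed to be obtained elsewhere, which is why they are taken as template parameters.

// Best-effort flush wrapper (illustrative only): returns false instead of spinning
// when some segment in `range` cannot be flushed right now, so the caller can
// simply retry on a later occasion.
template <typename StorePtr, typename ContextPtr, typename Range>
bool flushBestEffort(const StorePtr & store, const ContextPtr & dm_context, const Range & range)
{
    return store->flushCache(dm_context, range, /* try_until_succeed */ false);
}

// Blocking flush wrapper: keeps retrying until every segment in `range` is flushed.
template <typename StorePtr, typename ContextPtr, typename Range>
void flushOrWait(const StorePtr & store, const ContextPtr & dm_context, const Range & range)
{
    store->flushCache(dm_context, range, /* try_until_succeed */ true);
}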
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
@@ -300,6 +300,8 @@ class Segment : private boost::noncopyable

void drop(const FileProviderPtr & file_provider, WriteBatches & wbs);

bool isFlushing() const { return delta->isFlushing(); }

RowsAndBytes getRowsAndBytesInRange(
DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
2 changes: 1 addition & 1 deletion dbms/src/Storages/IManageableStorage.h
@@ -68,7 +68,7 @@ class IManageableStorage : public IStorage

virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/) {}
virtual bool flushCache(const Context & /*context*/, const DM::RowKeyRange & /*range_to_flush*/, [[maybe_unused]] bool try_until_succeed = true) { return true; }

virtual BlockInputStreamPtr status() { return {}; }

6 changes: 3 additions & 3 deletions dbms/src/Storages/StorageDeltaMerge.cpp
@@ -775,12 +775,12 @@ void StorageDeltaMerge::checkStatus(const Context & context)

void StorageDeltaMerge::flushCache(const Context & context)
{
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size));
flushCache(context, DM::RowKeyRange::newAll(is_common_handle, rowkey_column_size), /* try_until_succeed */ true);
}

void StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush)
bool StorageDeltaMerge::flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed)
{
getAndMaybeInitStore()->flushCache(context, range_to_flush);
return getAndMaybeInitStore()->flushCache(context, range_to_flush, try_until_succeed);
}

void StorageDeltaMerge::mergeDelta(const Context & context)
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
@@ -73,7 +73,7 @@ class StorageDeltaMerge

void flushCache(const Context & context) override;

void flushCache(const Context & context, const DM::RowKeyRange & range_to_flush) override;
bool flushCache(const Context & context, const DM::RowKeyRange & range_to_flush, bool try_until_succeed) override;

/// Merge delta into the stable layer for all segments.
///
27 changes: 17 additions & 10 deletions dbms/src/Storages/Transaction/KVStore.cpp
@@ -129,7 +129,7 @@ void KVStore::traverseRegions(std::function<void(RegionID, const RegionPtr &)> &
callback(region.first, region.second);
}

void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log)
bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed)
{
auto table_id = region.getMappedTableID();
auto storage = tmt.getStorages().get(table_id);
@@ -139,7 +139,7 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
"tryFlushRegionCacheInStorage can not get table for region {} with table id {}, ignored",
region.toString(),
table_id);
return;
return true;
}

try
@@ -151,14 +151,15 @@ void KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
region.getRange()->getMappedTableID(),
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
storage->flushCache(tmt.getContext(), rowkey_range);
return storage->flushCache(tmt.getContext(), rowkey_range, try_until_succeed);
}
catch (DB::Exception & e)
{
// We can ignore if storage is already dropped.
if (e.code() != ErrorCodes::TABLE_IS_DROPPED)
throw;
}
return true;
}

void KVStore::tryPersist(RegionID region_id)
@@ -366,12 +367,12 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and perist mem data.
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if thhere is little data in mem, wait until time interval reached threshold.
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
@@ -381,11 +382,17 @@

if (check_sync_log())
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
}
return EngineStoreApplyRes::None;
}
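The behavioural change in `handleUselessAdminRaftCmd` is that the compact-log path now attempts a non-blocking flush and returns `None` when the flush cannot complete, deferring region persistence and log compaction to a later command instead of stalling the apply thread. A condensed sketch of that control flow, with the flush and persist steps reduced to caller-supplied callbacks, might look like this:

enum class ApplyRes { None, Persist };

// Illustrative control flow only; `try_flush` stands in for tryFlushRegionCacheInStorage
// and `persist_region` for persistRegion/markCompactLog/cleanApproxMemCacheInfo.
template <typename TryFlush, typename PersistRegion>
ApplyRes handleCompactLog(bool should_sync_log, TryFlush && try_flush, PersistRegion && persist_region)
{
    if (!should_sync_log)
        return ApplyRes::None;

    // Non-blocking flush: if some segment is still flushing, give up for now.
    // The raft log is kept, and a later CompactLog command will try again.
    if (!try_flush(/* try_until_succeed */ false))
        return ApplyRes::None;

    // The cache has been flushed to disk, so it is safe to persist the region
    // state and let the raft log be truncated.
    persist_region();
    return ApplyRes::Persist;
}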
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/KVStore.h
@@ -91,7 +91,7 @@ class KVStore final : private boost::noncopyable

void tryPersist(RegionID region_id);

static void tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log);
static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, Poco::Logger * log, bool try_until_succeed = true);

size_t regionSize() const;
EngineStoreApplyRes handleAdminRaftCmd(raft_cmdpb::AdminRequest && request,
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
@@ -230,7 +230,7 @@ void removeObsoleteDataInStorage(
auto rowkey_range
= DM::RowKeyRange::fromRegionRange(handle_range, table_id, table_id, storage->isCommonHandle(), storage->getRowKeyColumnSize());
dm_storage->deleteRange(rowkey_range, context->getSettingsRef());
dm_storage->flushCache(*context, rowkey_range); // flush to disk
dm_storage->flushCache(*context, rowkey_range, /*try_until_succeed*/ true); // flush to disk
}
catch (DB::Exception & e)
{