Skip to content

Commit

Permalink
cherry-pick #2431 #2461 to release-5.1 (#2564)
Browse files Browse the repository at this point in the history
* More debug info for DeltaTree (query_id, snapshot lifetime) (#2431)
* Fix deadlock on `removeExpiredSnapshots` (#2461)
  • Loading branch information
JaySon-Huang authored Aug 4, 2021
1 parent 051405c commit 9f65bd8
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 80 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store)
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired)

namespace FailPoints
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace FailPoints
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_after_copr_streams_acquired[];
extern const char minimum_block_size_for_cross_join[];
} // namespace FailPoints

Expand Down Expand Up @@ -382,6 +383,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, DAGPipeline
}
});
}
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
}

void DAGQueryBlockInterpreter::readFromLocalStorage( //
Expand Down
41 changes: 19 additions & 22 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <Databases/IDatabase.h>
#include <IO/UncompressedCache.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageMergeTree.h>
#include <common/config_common.h>

Expand Down Expand Up @@ -135,16 +137,10 @@ void AsynchronousMetrics::update()
{
auto databases = context.getDatabases();

size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
size_t max_merges_in_queue = 0;

size_t sum_queue_size = 0;
size_t sum_inserts_in_queue = 0;
size_t sum_merges_in_queue = 0;

size_t max_absolute_delay = 0;
size_t max_relative_delay = 0;
double max_dt_stable_oldest_snapshot_lifetime = 0.0;
double max_dt_delta_oldest_snapshot_lifetime = 0.0;
double max_dt_meta_oldest_snapshot_lifetime = 0.0;
size_t max_dt_background_tasks_length = 0;

size_t max_part_count_for_partition = 0;

Expand All @@ -154,24 +150,25 @@ void AsynchronousMetrics::update()
{
auto & table = iterator->table();

if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
if (auto dt_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(table); dt_storage)
{
auto stat = dt_storage->getStore()->getStat();
calculateMax(max_dt_stable_oldest_snapshot_lifetime, stat.storage_stable_oldest_snapshot_lifetime);
calculateMax(max_dt_delta_oldest_snapshot_lifetime, stat.storage_delta_oldest_snapshot_lifetime);
calculateMax(max_dt_meta_oldest_snapshot_lifetime, stat.storage_meta_oldest_snapshot_lifetime);
calculateMax(max_dt_background_tasks_length, stat.background_tasks_length);
}
else if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForPartition());
}
}
}

set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);

set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);

set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);

set("MaxDTStableOldestSnapshotLifetime", max_dt_stable_oldest_snapshot_lifetime);
set("MaxDTDeltaOldestSnapshotLifetime", max_dt_delta_oldest_snapshot_lifetime);
set("MaxDTMetaOldestSnapshotLifetime", max_dt_meta_oldest_snapshot_lifetime);
set("MaxDTBackgroundTasksLength", max_dt_background_tasks_length);
set("MaxPartCountForPartition", max_part_count_for_partition);
}

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct DMContext : private boost::noncopyable
const bool enable_relevant_place;
const bool enable_skippable_place;

const String query_id;

public:
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
Expand All @@ -80,7 +82,8 @@ struct DMContext : private boost::noncopyable
const NotCompress & not_compress_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const String & query_id_ = "")
: db_context(db_context_),
metrics(db_context.getTiFlashMetrics()),
path_pool(path_pool_),
Expand All @@ -103,9 +106,11 @@ struct DMContext : private boost::noncopyable
read_delta_only(settings.dt_read_delta_only),
read_stable_only(settings.dt_read_stable_only),
enable_relevant_place(settings.dt_enable_relevant_place),
enable_skippable_place(settings.dt_enable_skippable_place)
enable_skippable_place(settings.dt_enable_skippable_place),
query_id(query_id_)
{
}

};

} // namespace DM
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
else
{
after_segment_read(dm_context, cur_segment);
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment->segmentId()) + "]");
LOG_TRACE(log, "Finish reading segment [" << cur_segment->segmentId() << "]");
cur_segment = {};
cur_stream = {};
}
Expand Down
22 changes: 13 additions & 9 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_)
bool is_common_handle_,
const String & query_id_ = "")
: version_limit(version_limit_),
is_common_handle(is_common_handle_),
header(toEmptyBlock(read_columns)),
query_id(query_id_),
log(&Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
{
children.push_back(input);
Expand All @@ -48,8 +50,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
"Total rows: " << total_rows << ", pass: " << DB::toString((Float64)passed_rows * 100 / total_rows, 2)
<< "%, complete pass: " << DB::toString((Float64)complete_passed * 100 / total_blocks, 2)
<< "%, complete not pass: " << DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2)
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2)
<< "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) << "%");
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) //
<< "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) //
<< "%, read tso: " << version_limit
<< ", query id: " << (query_id.empty() ? String("<non-query>") : query_id));
}

void readPrefix() override;
Expand Down Expand Up @@ -89,8 +93,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted);
effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0);
if (filter[i])
gc_hint_version
= std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
gc_hint_version = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
}
else
{
Expand Down Expand Up @@ -167,9 +170,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}

private:
UInt64 version_limit;
bool is_common_handle;
Block header;
const UInt64 version_limit;
const bool is_common_handle;
const Block header;
const String query_id;

size_t handle_col_pos;
size_t version_col_pos;
Expand Down Expand Up @@ -207,7 +211,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t not_clean_rows = 0;
size_t effective_num_rows = 0;

Logger * log;
Poco::Logger * const log;
};
} // namespace DM
} // namespace DB
13 changes: 6 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void DeltaMergeStore::shutdown()
LOG_TRACE(log, "Shutdown DeltaMerge end [" << db_name << "." << table_name << "]");
}

DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings)
DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id)
{
std::shared_lock lock(read_write_mutex);

Expand All @@ -374,7 +374,8 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
settings.not_compress_columns,
is_common_handle,
rowkey_column_size,
db_settings);
db_settings,
query_id);
return DMContextPtr(ctx);
}

Expand Down Expand Up @@ -913,7 +914,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
{
SegmentReadTasks tasks;

auto dm_context = newDMContext(db_context, db_settings);
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

{
std::shared_lock lock(read_write_mutex);
Expand Down Expand Up @@ -965,9 +966,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
size_t expected_block_size,
const SegmentIdSet & read_segments)
{
LOG_DEBUG(log, "Read with " << sorted_ranges.size() << " ranges");

auto dm_context = newDMContext(db_context, db_settings);
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments);

Expand Down Expand Up @@ -2266,7 +2265,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(DMContext & dm_c

LOG_DEBUG(log,
__FUNCTION__ << " [sorted_ranges: " << sorted_ranges.size() << "] [tasks before split: " << tasks.size()
<< "] [tasks final : " << result_tasks.size() << "] [ranges final: " << total_ranges << "]");
<< "] [tasks final: " << result_tasks.size() << "] [ranges final: " << total_ranges << "]");

return result_tasks;
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ class DeltaMergeStore : private boost::noncopyable
RegionSplitRes getRegionSplitPoint(DMContext & dm_context, const RowKeyRange & check_range, size_t max_region_size, size_t split_size);

private:
DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings);

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");

bool pkIsHandle() const { return original_table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; }

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex

stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stream, read_range, 0);
stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_MVCC>>(
stream, columns_to_read, max_version, is_common_handle);
stream, columns_to_read, max_version, is_common_handle, dm_context.query_id);

return stream;
};
Expand All @@ -395,7 +395,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex
if (read_ranges.size() == 1)
{
LOG_TRACE(log,
"Segment [" << DB::toString(segment_id) << "] is read by max_version: " << max_version << ", 1"
"Segment [" << segment_id << "] is read by max_version: " << max_version << ", 1"
<< " range: " << DB::DM::toDebugString(read_ranges));
RowKeyRange real_range = rowkey_range.shrink(read_ranges[0]);
if (real_range.none())
Expand All @@ -414,8 +414,8 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex
}

LOG_TRACE(log,
"Segment [" << DB::toString(segment_id) << "] is read by max_version: " << max_version << ", "
<< DB::toString(streams.size()) << " ranges: " << DB::DM::toDebugString(read_ranges));
"Segment [" << segment_id << "] is read by max_version: " << max_version << ", " << streams.size()
<< " ranges: " << DB::DM::toDebugString(read_ranges));

if (streams.empty())
stream = std::make_shared<EmptyBlockInputStream>(toEmptyBlock(*read_info.read_columns));
Expand Down Expand Up @@ -1495,7 +1495,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
auto place_handle_range = skippable_place ? RowKeyRange::startFrom(first_rowkey, is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

auto compacted_index = update_delta_tree.getCompactedEntries();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Storages/Page/VersionSet/PageEntriesVersionSet.h>
#include <Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.h>

#include <stack>

#ifdef FIU_ENABLE
#include <Common/randomSeed.h>

#include <pcg_random.hpp>
#include <thread>
#endif


namespace CurrentMetrics
{
extern const Metric PSMVCCSnapshotsList;
} // namespace CurrentMetrics

namespace DB
{
namespace FailPoints
{
extern const char random_slow_page_storage_list_all_live_files[];
} // namespace FailPoints

//==========================================================================================
// PageEntriesVersionSetWithDelta
Expand Down Expand Up @@ -48,7 +61,7 @@ std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>>
PageEntriesVersionSetWithDelta::listAllLiveFiles(std::unique_lock<std::shared_mutex> && lock, bool need_scan_page_ids)
{
/// Collecting live files is costly, so we save the SnapshotPtrs and scan them without holding the lock.
(void)lock; // Note read_write_mutex must be hold.
// Note: read_write_mutex must be held.
std::vector<SnapshotPtr> valid_snapshots;
const size_t snapshots_size_before_clean = snapshots.size();
double longest_living_seconds = 0.0;
Expand All @@ -63,21 +76,25 @@ PageEntriesVersionSetWithDelta::listAllLiveFiles(std::unique_lock<std::shared_mu
}
else
{
fiu_do_on(FailPoints::random_slow_page_storage_list_all_live_files, {
pcg64 rng(randomSeed());
std::chrono::milliseconds ms{std::uniform_int_distribution(0, 900)(rng)}; // 0~900 milliseconds
std::this_thread::sleep_for(ms);
});
const auto snapshot_lifetime = snapshot_or_invalid->elapsedSeconds();
if (snapshot_lifetime > longest_living_seconds)
{
longest_living_seconds = snapshot_lifetime;
longest_living_from_thread_id = snapshot_or_invalid->t_id;
}
// Save valid snapshot.
valid_snapshots.emplace_back(snapshot_or_invalid);
valid_snapshots.emplace_back(snapshot_or_invalid); // Save valid snapshot and release them without lock later
iter++;
}
}
// Create a temporary latest snapshot by using `current`
valid_snapshots.emplace_back(std::make_shared<Snapshot>(this, current));

lock.unlock(); // Notice: unlock
lock.unlock(); // Notice: unlock and we should free those valid snapshots without locking

// Add 1 to offset the temporary snapshot of `current`, which is counted in `valid_snapshots`
const size_t num_invalid_snapshot_to_clean = snapshots_size_before_clean + 1 - valid_snapshots.size();
Expand Down
Loading

0 comments on commit 9f65bd8

Please sign in to comment.