Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

More debug info for DeltaTree (query_id, snapshot lifetime) #2431

Merged
merged 6 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store)
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired)

namespace FailPoints
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace FailPoints
extern const char region_exception_after_read_from_storage_some_error[];
extern const char region_exception_after_read_from_storage_all_error[];
extern const char pause_after_learner_read[];
extern const char pause_after_copr_streams_acquired[];
extern const char minimum_block_size_for_cross_join[];
} // namespace FailPoints

Expand Down Expand Up @@ -383,6 +384,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, DAGPipeline
}
});
}
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
}

void DAGQueryBlockInterpreter::readFromLocalStorage( //
Expand Down
41 changes: 19 additions & 22 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <Databases/IDatabase.h>
#include <IO/UncompressedCache.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageMergeTree.h>
#include <common/config_common.h>

Expand Down Expand Up @@ -135,16 +137,10 @@ void AsynchronousMetrics::update()
{
auto databases = context.getDatabases();

size_t max_queue_size = 0;
size_t max_inserts_in_queue = 0;
size_t max_merges_in_queue = 0;

size_t sum_queue_size = 0;
size_t sum_inserts_in_queue = 0;
size_t sum_merges_in_queue = 0;

size_t max_absolute_delay = 0;
size_t max_relative_delay = 0;
double max_dt_stable_oldest_snapshot_lifetime = 0.0;
double max_dt_delta_oldest_snapshot_lifetime = 0.0;
double max_dt_meta_oldest_snapshot_lifetime = 0.0;
size_t max_dt_background_tasks_length = 0;

size_t max_part_count_for_partition = 0;

Expand All @@ -154,24 +150,25 @@ void AsynchronousMetrics::update()
{
auto & table = iterator->table();

if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
if (auto dt_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(table); dt_storage)
{
auto stat = dt_storage->getStore()->getStat();
calculateMax(max_dt_stable_oldest_snapshot_lifetime, stat.storage_stable_oldest_snapshot_lifetime);
calculateMax(max_dt_delta_oldest_snapshot_lifetime, stat.storage_delta_oldest_snapshot_lifetime);
calculateMax(max_dt_meta_oldest_snapshot_lifetime, stat.storage_meta_oldest_snapshot_lifetime);
calculateMax(max_dt_background_tasks_length, stat.background_tasks_length);
}
else if (StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForPartition());
}
}
}

set("ReplicasMaxQueueSize", max_queue_size);
set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
set("ReplicasMaxMergesInQueue", max_merges_in_queue);

set("ReplicasSumQueueSize", sum_queue_size);
set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
set("ReplicasSumMergesInQueue", sum_merges_in_queue);

set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
set("ReplicasMaxRelativeDelay", max_relative_delay);

set("MaxDTStableOldestSnapshotLifetime", max_dt_stable_oldest_snapshot_lifetime);
set("MaxDTDeltaOldestSnapshotLifetime", max_dt_delta_oldest_snapshot_lifetime);
set("MaxDTMetaOldestSnapshotLifetime", max_dt_meta_oldest_snapshot_lifetime);
set("MaxDTBackgroundTasksLength", max_dt_background_tasks_length);
set("MaxPartCountForPartition", max_part_count_for_partition);
}

Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct DMContext : private boost::noncopyable
const bool enable_relevant_place;
const bool enable_skippable_place;

const String query_id;

public:
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
Expand All @@ -80,7 +82,8 @@ struct DMContext : private boost::noncopyable
const NotCompress & not_compress_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const String & query_id_ = "")
: db_context(db_context_),
metrics(db_context.getTiFlashMetrics()),
path_pool(path_pool_),
Expand All @@ -103,13 +106,12 @@ struct DMContext : private boost::noncopyable
read_delta_only(settings.dt_read_delta_only),
read_stable_only(settings.dt_read_stable_only),
enable_relevant_place(settings.dt_enable_relevant_place),
enable_skippable_place(settings.dt_enable_skippable_place)
{
}
RateLimiterPtr getWriteLimiter() const
enable_skippable_place(settings.dt_enable_skippable_place),
query_id(query_id_)
{
return db_context.getWriteLimiter();
}

/// Returns the write rate limiter obtained from the database context held by this DMContext.
RateLimiterPtr getWriteLimiter() const
{
    return db_context.getWriteLimiter();
}
};

} // namespace DM
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
else
{
after_segment_read(dm_context, cur_segment);
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment->segmentId()) + "]");
LOG_TRACE(log, "Finish reading segment [" << cur_segment->segmentId() << "]");
cur_segment = {};
cur_stream = {};
}
Expand Down
22 changes: 13 additions & 9 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefines & read_columns,
UInt64 version_limit_,
bool is_common_handle_)
bool is_common_handle_,
const String & query_id_ = "")
: version_limit(version_limit_),
is_common_handle(is_common_handle_),
header(toEmptyBlock(read_columns)),
query_id(query_id_),
log(&Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">"))
{
children.push_back(input);
Expand All @@ -48,8 +50,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
"Total rows: " << total_rows << ", pass: " << DB::toString((Float64)passed_rows * 100 / total_rows, 2)
<< "%, complete pass: " << DB::toString((Float64)complete_passed * 100 / total_blocks, 2)
<< "%, complete not pass: " << DB::toString((Float64)complete_not_passed * 100 / total_blocks, 2)
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2)
<< "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) << "%");
<< "%, not clean: " << DB::toString((Float64)not_clean_rows * 100 / passed_rows, 2) //
<< "%, effective: " << DB::toString((Float64)effective_num_rows * 100 / passed_rows, 2) //
<< "%, read tso: " << version_limit
<< ", query id: " << (query_id.empty() ? String("<non-query>") : query_id));
}

void readPrefix() override;
Expand Down Expand Up @@ -89,8 +93,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted);
effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0);
if (filter[i])
gc_hint_version
= std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
gc_hint_version = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
}
else
{
Expand Down Expand Up @@ -167,9 +170,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
}

private:
UInt64 version_limit;
bool is_common_handle;
Block header;
const UInt64 version_limit;
const bool is_common_handle;
const Block header;
const String query_id;

size_t handle_col_pos;
size_t version_col_pos;
Expand Down Expand Up @@ -207,7 +211,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t not_clean_rows = 0;
size_t effective_num_rows = 0;

Logger * log;
Poco::Logger * const log;
};
} // namespace DM
} // namespace DB
13 changes: 6 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void DeltaMergeStore::shutdown()
LOG_TRACE(log, "Shutdown DeltaMerge end [" << db_name << "." << table_name << "]");
}

DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings)
DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id)
{
std::shared_lock lock(read_write_mutex);

Expand All @@ -374,7 +374,8 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
settings.not_compress_columns,
is_common_handle,
rowkey_column_size,
db_settings);
db_settings,
query_id);
return DMContextPtr(ctx);
}

Expand Down Expand Up @@ -913,7 +914,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
{
SegmentReadTasks tasks;

auto dm_context = newDMContext(db_context, db_settings);
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

{
std::shared_lock lock(read_write_mutex);
Expand Down Expand Up @@ -965,9 +966,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
size_t expected_block_size,
const SegmentIdSet & read_segments)
{
LOG_DEBUG(log, "Read with " << sorted_ranges.size() << " ranges");

auto dm_context = newDMContext(db_context, db_settings);
auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId());

SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments);

Expand Down Expand Up @@ -2265,7 +2264,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges(DMContext & dm_c

LOG_DEBUG(log,
__FUNCTION__ << " [sorted_ranges: " << sorted_ranges.size() << "] [tasks before split: " << tasks.size()
<< "] [tasks final : " << result_tasks.size() << "] [ranges final: " << total_ranges << "]");
<< "] [tasks final: " << result_tasks.size() << "] [ranges final: " << total_ranges << "]");

return result_tasks;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ class DeltaMergeStore : private boost::noncopyable
private:
#endif

DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings);
DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id="");

/// True when the original table handle column's id differs from EXTRA_HANDLE_COLUMN_ID.
bool pkIsHandle() const
{
    const bool uses_extra_handle = (original_table_handle_define.id == EXTRA_HANDLE_COLUMN_ID);
    return !uses_extra_handle;
}

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex

stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stream, read_range, 0);
stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_MVCC>>(
stream, columns_to_read, max_version, is_common_handle);
stream, columns_to_read, max_version, is_common_handle, dm_context.query_id);

return stream;
};
Expand All @@ -393,7 +393,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex
if (read_ranges.size() == 1)
{
LOG_TRACE(log,
"Segment [" << DB::toString(segment_id) << "] is read by max_version: " << max_version << ", 1"
"Segment [" << segment_id << "] is read by max_version: " << max_version << ", 1"
<< " range: " << DB::DM::toDebugString(read_ranges));
RowKeyRange real_range = rowkey_range.shrink(read_ranges[0]);
if (real_range.none())
Expand All @@ -412,8 +412,8 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex
}

LOG_TRACE(log,
"Segment [" << DB::toString(segment_id) << "] is read by max_version: " << max_version << ", "
<< DB::toString(streams.size()) << " ranges: " << DB::DM::toDebugString(read_ranges));
"Segment [" << segment_id << "] is read by max_version: " << max_version << ", " << streams.size()
<< " ranges: " << DB::DM::toDebugString(read_ranges));

if (streams.empty())
stream = std::make_shared<EmptyBlockInputStream>(toEmptyBlock(*read_info.read_columns));
Expand Down Expand Up @@ -1489,7 +1489,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
auto place_handle_range = skippable_place ? RowKeyRange::startFrom(first_rowkey, is_common_handle, rowkey_column_size)
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);
: RowKeyRange::newAll(is_common_handle, rowkey_column_size);

auto compacted_index = update_delta_tree.getCompactedEntries();

Expand Down
49 changes: 47 additions & 2 deletions metrics/grafana/tiflash_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,22 @@
"intervalFactor": 1,
"legendFormat": "write block",
"refId": "B"
},
{
"expr": "sum(rate(tiflash_system_profile_event_DMWriteBlock{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, type)",
"format": "time_series",
"hide": true,
"intervalFactor": 1,
"legendFormat": "write block-{{instance}}",
"refId": "C"
},
{
"expr": "sum(increase(tiflash_storage_command_count{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance, type)",
"format": "time_series",
"hide": true,
"intervalFactor": 1,
"legendFormat": "{{type}}-{{instance}}",
"refId": "D"
}
],
"thresholds": [],
Expand Down Expand Up @@ -3588,7 +3604,12 @@
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"seriesOverrides": [
{
"alias": "/max_snapshot_lifetime/",
"yaxis": 2
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
Expand Down Expand Up @@ -3671,6 +3692,30 @@
"intervalFactor": 1,
"legendFormat": "place_index-{{instance}}",
"refId": "H"
},
{
"expr": "tiflash_system_asynchronous_metric_MaxDTDeltaOldestSnapshotLifetime{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"hide": false,
"intervalFactor": 1,
"legendFormat": "max_snapshot_lifetime-{{instance}}",
"refId": "K"
},
{
"expr": "tiflash_system_asynchronous_metric_MaxDTStableOldestSnapshotLifetime{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"hide": true,
"intervalFactor": 1,
"legendFormat": "max_snapshot_lifetime_stable-{{instance}}",
"refId": "L"
},
{
"expr": "tiflash_system_asynchronous_metric_MaxDTMetaOldestSnapshotLifetime{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"hide": true,
"intervalFactor": 1,
"legendFormat": "max_snapshot_lifetime_meta-{{instance}}",
"refId": "M"
}
],
"thresholds": [],
Expand Down Expand Up @@ -3701,7 +3746,7 @@
"show": true
},
{
"format": "short",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
Expand Down