diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 1df0e984968..67e29dadbcb 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -265,23 +265,12 @@ void injectFailPointForLocalRead([[maybe_unused]] const SelectQueryInfo & query_ }); } -String genErrMsgForLocalRead( - const ManageableStoragePtr & storage, - const KeyspaceID keyspace_id, - const TableID & table_id, - const TableID & logical_table_id) +String genErrMsgForLocalRead(const KeyspaceID keyspace_id, const TableID & table_id, const TableID & logical_table_id) { return table_id == logical_table_id - ? fmt::format( - "(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={})", - storage->getDatabaseName(), - storage->getTableName(), - keyspace_id, - table_id) + ? fmt::format("(while creating read sources from storage, keyspace_id={} table_id={})", keyspace_id, table_id) : fmt::format( - "(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={} logical_table_id={})", - storage->getDatabaseName(), - storage->getTableName(), + "(while creating read sources from storage, keyspace_id={} table_id={} logical_table_id={})", keyspace_id, table_id, logical_table_id); @@ -1081,14 +1070,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal else { // Throw an exception for TiDB / TiSpark to retry - e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id)); + e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id)); throw; } } catch (DB::Exception & e) { /// Other unknown exceptions - e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id)); + e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id)); throw; } } @@ -1164,14 +1153,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal else { // Throw an exception for TiDB / TiSpark to retry - e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id)); + e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id)); throw; } } catch (DB::Exception & e) { /// Other unknown exceptions - e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id)); + e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id)); throw; } } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 274d81b75b0..c8ae0b27805 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -58,6 +58,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -218,8 +219,6 @@ DeltaMergeStore::DeltaMergeStore( , path_pool(std::make_shared( global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))) , settings(settings_) - , db_name(db_name_) - , table_name(table_name_) , keyspace_id(keyspace_id_) , physical_table_id(physical_table_id_) , is_common_handle(is_common_handle_) @@ -230,6 +229,12 @@ DeltaMergeStore::DeltaMergeStore( , next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY) , log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_))) { + { + std::unique_lock lock(mtx_table_meta); + table_meta.db_name = db_name_; + table_meta.table_name = table_name_; + } + replica_exist.store(has_replica); // for mock test, table_id_ should be DB::InvalidTableID NamespaceID ns_id = physical_table_id == DB::InvalidTableID ? TEST_NAMESPACE_ID : physical_table_id; @@ -323,9 +328,9 @@ void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, Stri { path_pool->rename(new_database_name, new_table_name); - // TODO: replacing these two variables is not atomic, but could be good enough? - table_name.swap(new_table_name); - db_name.swap(new_database_name); + std::unique_lock lock(mtx_table_meta); + table_meta.table_name.swap(new_table_name); + table_meta.db_name.swap(new_database_name); } void DeltaMergeStore::dropAllSegments(bool keep_first_segment) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index bbeab6ca744..5932eb8ca10 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -277,8 +278,12 @@ class DeltaMergeStore : private boost::noncopyable void setUpBackgroundTask(const DMContextPtr & dm_context); - const String & getDatabaseName() const { return db_name; } - const String & getTableName() const { return table_name; } + TableNameMeta getTableMeta() const + { + std::shared_lock lock(mtx_table_meta); + return TableNameMeta{table_meta.db_name, table_meta.table_name}; + } + String getIdent() const { return fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id); } void rename(String new_path, String new_database_name, String new_table_name); @@ -776,8 +781,8 @@ class DeltaMergeStore : private boost::noncopyable Settings settings; StoragePoolPtr storage_pool; - String db_name; - String table_name; + mutable std::shared_mutex mtx_table_meta; + TableNameMeta table_meta; const KeyspaceID keyspace_id; const TableID physical_table_id; @@ -818,7 +823,7 @@ class DeltaMergeStore : private boost::noncopyable mutable std::shared_mutex read_write_mutex; LoggerPtr log; -}; // namespace DM +}; using DeltaMergeStorePtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index a31c2a5a80a..7d126e64f8a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -695,11 +695,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c auto segment_bytes = segment->getEstimatedBytes(); if (segment_rows >= dm_context->segment_limit_rows || segment_bytes >= dm_context->segment_limit_bytes) { - LOG_TRACE( - log, - "GC - Merge skipped because current segment is not small, segment={} table={}", - segment->simpleInfo(), - table_name); + LOG_TRACE(log, "GC - Merge skipped because current segment is not small, segment={}", segment->simpleInfo()); return {}; } @@ -708,13 +704,12 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c { LOG_TRACE( log, - "GC - Merge skipped because cannot find adjacent segments to merge, segment={} table={}", - segment->simpleInfo(), - table_name); + "GC - Merge skipped because cannot find adjacent segments to merge, segment={}", + segment->simpleInfo()); return {}; } - LOG_INFO(log, "GC - Trigger Merge, segment={} table={}", segment->simpleInfo(), table_name); + LOG_INFO(log, "GC - Trigger Merge, segment={}", segment->simpleInfo()); auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread); if (new_segment) { @@ -738,11 +733,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta( // The segment we just retrieved may be dropped from the map. Let's verify it again before creating a snapshot. if (!isSegmentValid(lock, segment)) { - LOG_TRACE( - log, - "GC - Skip checking MergeDelta because not valid, segment={} table={}", - segment->simpleInfo(), - table_name); + LOG_TRACE(log, "GC - Skip checking MergeDelta because not valid, segment={}", segment->simpleInfo()); return {}; } @@ -750,11 +741,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta( = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); if (!segment_snap) { - LOG_TRACE( - log, - "GC - Skip checking MergeDelta because snapshot failed, segment={} table={}", - segment->simpleInfo(), - table_name); + LOG_TRACE(log, "GC - Skip checking MergeDelta because snapshot failed, segment={}", segment->simpleInfo()); return {}; } } @@ -823,26 +810,24 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta( if (!should_compact) { - LOG_TRACE(log, "GC - MergeDelta skipped, segment={} table={}", segment->simpleInfo(), table_name); + LOG_TRACE(log, "GC - MergeDelta skipped, segment={}", segment->simpleInfo()); return {}; } LOG_INFO( log, - "GC - Trigger MergeDelta, compact_reason={} segment={} table={}", + "GC - Trigger MergeDelta, compact_reason={} segment={}", GC::toString(compact_reason), - segment->simpleInfo(), - table_name); + segment->simpleInfo()); auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap); if (!new_segment) { LOG_DEBUG( log, - "GC - MergeDelta aborted, compact_reason={} segment={} table={}", + "GC - MergeDelta aborted, compact_reason={} segment={}", GC::toString(compact_reason), - segment->simpleInfo(), - table_name); + segment->simpleInfo()); return {}; } @@ -872,8 +857,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options) DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire); LOG_TRACE( log, - "GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}", - table_name, + "GC on table start, check_key={} options={} gc_safe_point={} max_gc_limit={}", next_gc_check_key.toDebugString(), gc_options.toString(), gc_safe_point, @@ -932,7 +916,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options) if (!new_seg) { - LOG_TRACE(log, "GC - Skipped segment, segment={} table={}", segment->simpleInfo(), table_name); + LOG_TRACE(log, "GC - Skipped segment, segment={}", segment->simpleInfo()); continue; } @@ -940,7 +924,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options) } catch (Exception & e) { - e.addMessage(fmt::format("Error while GC segment, segment={} table={}", segment->info(), table_name)); + e.addMessage( + fmt::format("Error while GC segment, segment={} log_ident={}", segment->info(), log->identifier())); e.rethrow(); } } diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index e887e4dddc2..2de1e8977e6 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -93,7 +93,7 @@ class IManageableStorage : public IStorage /// Return true is data dir exist virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } - virtual ::TiDB::StorageEngine engineType() const = 0; + virtual TiDB::StorageEngine engineType() const = 0; virtual String getDatabaseName() const = 0; @@ -185,7 +185,7 @@ class IManageableStorage : public IStorage throw Exception( "Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); - }; + } /// The `block_decoding_schema_epoch` is just an internal version for `DecodingStorageSchemaSnapshot`, /// And it has no relation with the table schema version. diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index e59ba187ceb..75c64b40368 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -467,8 +467,7 @@ class DMBlockOutputStream : public IBlockOutputStream } catch (DB::Exception & e) { - e.addMessage( - fmt::format("(while writing to table `{}`.`{}`)", store->getDatabaseName(), store->getTableName())); + e.addMessage(fmt::format("(while writing to table `{}`)", store->getIdent())); throw; } @@ -1622,12 +1621,12 @@ String StorageDeltaMerge::getTableName() const { if (storeInited()) { - return _store->getTableName(); + return _store->getTableMeta().table_name; } std::lock_guard lock(store_mutex); if (storeInited()) { - return _store->getTableName(); + return _store->getTableMeta().table_name; } return table_column_info->table_name; } @@ -1636,12 +1635,12 @@ String StorageDeltaMerge::getDatabaseName() const { if (storeInited()) { - return _store->getDatabaseName(); + return _store->getTableMeta().db_name; } std::lock_guard lock(store_mutex); if (storeInited()) { - return _store->getDatabaseName(); + return _store->getTableMeta().db_name; } return table_column_info->db_name; } diff --git a/dbms/src/Storages/TableNameMeta.h b/dbms/src/Storages/TableNameMeta.h new file mode 100644 index 00000000000..c2aa8b2bd39 --- /dev/null +++ b/dbms/src/Storages/TableNameMeta.h @@ -0,0 +1,28 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ + +struct TableNameMeta +{ + String db_name; + String table_name; +}; + +} // namespace DB