Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Atomic rename DeltaMergeStore::db_name in memory #9246

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,12 @@ void injectFailPointForLocalRead([[maybe_unused]] const SelectQueryInfo & query_
});
}

String genErrMsgForLocalRead(
const ManageableStoragePtr & storage,
const KeyspaceID keyspace_id,
const TableID & table_id,
const TableID & logical_table_id)
String genErrMsgForLocalRead(const KeyspaceID keyspace_id, const TableID & table_id, const TableID & logical_table_id)
{
return table_id == logical_table_id
? fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
keyspace_id,
table_id)
? fmt::format("(while creating read sources from storage, keyspace_id={} table_id={})", keyspace_id, table_id)
: fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={} logical_table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
"(while creating read sources from storage, keyspace_id={} table_id={} logical_table_id={})",
keyspace_id,
table_id,
logical_table_id);
Expand Down Expand Up @@ -1084,14 +1073,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down Expand Up @@ -1167,14 +1156,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ DeltaMergeStore::DeltaMergeStore(
, path_pool(std::make_shared<StoragePathPool>(
global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)))
, settings(settings_)
, db_name(db_name_)
, table_name(table_name_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, is_common_handle(is_common_handle_)
Expand All @@ -232,6 +230,12 @@ DeltaMergeStore::DeltaMergeStore(
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))
{
{
auto meta = table_meta.lockExclusive();
meta->db_name = db_name_;
meta->table_name = table_name_;
}

replica_exist.store(has_replica);
// for mock test, table_id_ should be DB::InvalidTableID
NamespaceID ns_id = physical_table_id == DB::InvalidTableID ? TEST_NAMESPACE_ID : physical_table_id;
Expand Down Expand Up @@ -325,9 +329,9 @@ void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, Stri
{
path_pool->rename(new_database_name, new_table_name);

// TODO: replacing these two variables is not atomic, but could be good enough?
table_name.swap(new_table_name);
db_name.swap(new_database_name);
auto meta = table_meta.lockExclusive();
meta->table_name.swap(new_table_name);
meta->db_name.swap(new_database_name);
}

void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h>
#include <Storages/Page/PageStorage_fwd.h>
#include <Storages/TableNameMeta.h>
#include <TiDB/Schema/TiDB.h>

#include <queue>
Expand Down Expand Up @@ -277,8 +278,12 @@ class DeltaMergeStore : private boost::noncopyable

void setUpBackgroundTask(const DMContextPtr & dm_context);

const String & getDatabaseName() const { return db_name; }
const String & getTableName() const { return table_name; }
TableNameMeta getTableMeta() const
{
auto meta = table_meta.lockShared();
return TableNameMeta{meta->db_name, meta->table_name};
}
String getIdent() const { return fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id); }

void rename(String new_path, String new_database_name, String new_table_name);

Expand Down Expand Up @@ -796,8 +801,7 @@ class DeltaMergeStore : private boost::noncopyable
Settings settings;
StoragePoolPtr storage_pool;

String db_name;
String table_name;
SharedMutexProtected<TableNameMeta> table_meta;

const KeyspaceID keyspace_id;
const TableID physical_table_id;
Expand Down Expand Up @@ -838,7 +842,7 @@ class DeltaMergeStore : private boost::noncopyable
mutable std::shared_mutex read_write_mutex;

LoggerPtr log;
}; // namespace DM
};

using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

Expand Down
45 changes: 15 additions & 30 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->segment_limit_rows || segment_bytes >= dm_context->segment_limit_bytes)
{
LOG_TRACE(
log,
"GC - Merge skipped because current segment is not small, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Merge skipped because current segment is not small, segment={}", segment->simpleInfo());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The logger of DeltaMergeStore contains the keyspace and table_id as a suffix. So we don't need to log down the table_name in these methods.

, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))

return {};
}

Expand All @@ -713,13 +709,12 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
{
LOG_TRACE(
log,
"GC - Merge skipped because cannot find adjacent segments to merge, segment={} table={}",
segment->simpleInfo(),
table_name);
"GC - Merge skipped because cannot find adjacent segments to merge, segment={}",
segment->simpleInfo());
return {};
}

LOG_INFO(log, "GC - Trigger Merge, segment={} table={}", segment->simpleInfo(), table_name);
LOG_INFO(log, "GC - Trigger Merge, segment={}", segment->simpleInfo());
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
Expand All @@ -743,23 +738,15 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(
// The segment we just retrieved may be dropped from the map. Let's verify it again before creating a snapshot.
if (!isSegmentValid(lock, segment))
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because not valid, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because not valid, segment={}", segment->simpleInfo());
return {};
}

segment_snap
= segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
if (!segment_snap)
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because snapshot failed, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because snapshot failed, segment={}", segment->simpleInfo());
return {};
}
}
Expand Down Expand Up @@ -828,26 +815,24 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(

if (!should_compact)
{
LOG_TRACE(log, "GC - MergeDelta skipped, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - MergeDelta skipped, segment={}", segment->simpleInfo());
return {};
}

LOG_INFO(
log,
"GC - Trigger MergeDelta, compact_reason={} segment={} table={}",
"GC - Trigger MergeDelta, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap);

if (!new_segment)
{
LOG_DEBUG(
log,
"GC - MergeDelta aborted, compact_reason={} segment={} table={}",
"GC - MergeDelta aborted, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
return {};
}

Expand Down Expand Up @@ -877,8 +862,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire);
LOG_TRACE(
log,
"GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}",
table_name,
"GC on table start, check_key={} options={} gc_safe_point={} max_gc_limit={}",
next_gc_check_key.toDebugString(),
gc_options.toString(),
gc_safe_point,
Expand Down Expand Up @@ -937,15 +921,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)

if (!new_seg)
{
LOG_TRACE(log, "GC - Skipped segment, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - Skipped segment, segment={}", segment->simpleInfo());
continue;
}

gc_segments_num++;
}
catch (Exception & e)
{
e.addMessage(fmt::format("Error while GC segment, segment={} table={}", segment->info(), table_name));
e.addMessage(
fmt::format("Error while GC segment, segment={} log_ident={}", segment->info(), log->identifier()));
e.rethrow();
}
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class IManageableStorage : public IStorage
/// Return true is data dir exist
virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); }

virtual ::TiDB::StorageEngine engineType() const = 0;
virtual TiDB::StorageEngine engineType() const = 0;

virtual String getDatabaseName() const = 0;

Expand Down Expand Up @@ -186,7 +186,7 @@ class IManageableStorage : public IStorage
throw Exception(
"Method getDecodingSchemaSnapshot is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
};
}

/// The `block_decoding_schema_epoch` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ class DMBlockOutputStream : public IBlockOutputStream
}
catch (DB::Exception & e)
{
e.addMessage(
fmt::format("(while writing to table `{}`.`{}`)", store->getDatabaseName(), store->getTableName()));
e.addMessage(fmt::format("(while writing to table `{}`)", store->getIdent()));
throw;
}

Expand Down Expand Up @@ -1457,12 +1456,12 @@ String StorageDeltaMerge::getTableName() const
{
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
return table_column_info->table_name;
}
Expand All @@ -1471,12 +1470,12 @@ String StorageDeltaMerge::getDatabaseName() const
{
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
return table_column_info->db_name;
}
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/TableNameMeta.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

namespace DB
{

struct TableNameMeta
{
String db_name;
String table_name;
};

} // namespace DB