Skip to content

Commit

Permalink
ddl: Atomic rename DeltaMergeStore::db_name in memory (#9246) (#9688)
Browse files Browse the repository at this point in the history
ref #9233

Make renaming `DeltaMergeStore::db_name` to be atomic

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang authored Dec 3, 2024
1 parent a2db730 commit e30ce75
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 66 deletions.
25 changes: 7 additions & 18 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,12 @@ void injectFailPointForLocalRead([[maybe_unused]] const SelectQueryInfo & query_
});
}

String genErrMsgForLocalRead(
const ManageableStoragePtr & storage,
const KeyspaceID keyspace_id,
const TableID & table_id,
const TableID & logical_table_id)
String genErrMsgForLocalRead(const KeyspaceID keyspace_id, const TableID & table_id, const TableID & logical_table_id)
{
return table_id == logical_table_id
? fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
keyspace_id,
table_id)
? fmt::format("(while creating read sources from storage, keyspace_id={} table_id={})", keyspace_id, table_id)
: fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={} logical_table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
"(while creating read sources from storage, keyspace_id={} table_id={} logical_table_id={})",
keyspace_id,
table_id,
logical_table_id);
Expand Down Expand Up @@ -1081,14 +1070,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down Expand Up @@ -1164,14 +1153,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <memory>
#include <mutex>

namespace ProfileEvents
{
Expand Down Expand Up @@ -218,8 +219,6 @@ DeltaMergeStore::DeltaMergeStore(
, path_pool(std::make_shared<StoragePathPool>(
global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)))
, settings(settings_)
, db_name(db_name_)
, table_name(table_name_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, is_common_handle(is_common_handle_)
Expand All @@ -230,6 +229,12 @@ DeltaMergeStore::DeltaMergeStore(
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))
{
{
std::unique_lock lock(mtx_table_meta);
table_meta.db_name = db_name_;
table_meta.table_name = table_name_;
}

replica_exist.store(has_replica);
// for mock test, table_id_ should be DB::InvalidTableID
NamespaceID ns_id = physical_table_id == DB::InvalidTableID ? TEST_NAMESPACE_ID : physical_table_id;
Expand Down Expand Up @@ -323,9 +328,9 @@ void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, Stri
{
path_pool->rename(new_database_name, new_table_name);

// TODO: replacing these two variables is not atomic, but could be good enough?
table_name.swap(new_table_name);
db_name.swap(new_database_name);
std::unique_lock lock(mtx_table_meta);
table_meta.table_name.swap(new_table_name);
table_meta.db_name.swap(new_database_name);
}

void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <Storages/Page/PageStorage_fwd.h>
#include <Storages/TableNameMeta.h>
#include <TiDB/Schema/TiDB.h>

#include <queue>
Expand Down Expand Up @@ -277,8 +278,12 @@ class DeltaMergeStore : private boost::noncopyable

void setUpBackgroundTask(const DMContextPtr & dm_context);

const String & getDatabaseName() const { return db_name; }
const String & getTableName() const { return table_name; }
TableNameMeta getTableMeta() const
{
std::shared_lock lock(mtx_table_meta);
return TableNameMeta{table_meta.db_name, table_meta.table_name};
}
String getIdent() const { return fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id); }

void rename(String new_path, String new_database_name, String new_table_name);

Expand Down Expand Up @@ -776,8 +781,8 @@ class DeltaMergeStore : private boost::noncopyable
Settings settings;
StoragePoolPtr storage_pool;

String db_name;
String table_name;
mutable std::shared_mutex mtx_table_meta;
TableNameMeta table_meta;

const KeyspaceID keyspace_id;
const TableID physical_table_id;
Expand Down Expand Up @@ -818,7 +823,7 @@ class DeltaMergeStore : private boost::noncopyable
mutable std::shared_mutex read_write_mutex;

LoggerPtr log;
}; // namespace DM
};

using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

Expand Down
45 changes: 15 additions & 30 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,11 +695,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->segment_limit_rows || segment_bytes >= dm_context->segment_limit_bytes)
{
LOG_TRACE(
log,
"GC - Merge skipped because current segment is not small, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Merge skipped because current segment is not small, segment={}", segment->simpleInfo());
return {};
}

Expand All @@ -708,13 +704,12 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
{
LOG_TRACE(
log,
"GC - Merge skipped because cannot find adjacent segments to merge, segment={} table={}",
segment->simpleInfo(),
table_name);
"GC - Merge skipped because cannot find adjacent segments to merge, segment={}",
segment->simpleInfo());
return {};
}

LOG_INFO(log, "GC - Trigger Merge, segment={} table={}", segment->simpleInfo(), table_name);
LOG_INFO(log, "GC - Trigger Merge, segment={}", segment->simpleInfo());
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
Expand All @@ -738,23 +733,15 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(
// The segment we just retrieved may be dropped from the map. Let's verify it again before creating a snapshot.
if (!isSegmentValid(lock, segment))
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because not valid, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because not valid, segment={}", segment->simpleInfo());
return {};
}

segment_snap
= segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
if (!segment_snap)
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because snapshot failed, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because snapshot failed, segment={}", segment->simpleInfo());
return {};
}
}
Expand Down Expand Up @@ -823,26 +810,24 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(

if (!should_compact)
{
LOG_TRACE(log, "GC - MergeDelta skipped, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - MergeDelta skipped, segment={}", segment->simpleInfo());
return {};
}

LOG_INFO(
log,
"GC - Trigger MergeDelta, compact_reason={} segment={} table={}",
"GC - Trigger MergeDelta, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap);

if (!new_segment)
{
LOG_DEBUG(
log,
"GC - MergeDelta aborted, compact_reason={} segment={} table={}",
"GC - MergeDelta aborted, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
return {};
}

Expand Down Expand Up @@ -872,8 +857,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire);
LOG_TRACE(
log,
"GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}",
table_name,
"GC on table start, check_key={} options={} gc_safe_point={} max_gc_limit={}",
next_gc_check_key.toDebugString(),
gc_options.toString(),
gc_safe_point,
Expand Down Expand Up @@ -932,15 +916,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)

if (!new_seg)
{
LOG_TRACE(log, "GC - Skipped segment, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - Skipped segment, segment={}", segment->simpleInfo());
continue;
}

gc_segments_num++;
}
catch (Exception & e)
{
e.addMessage(fmt::format("Error while GC segment, segment={} table={}", segment->info(), table_name));
e.addMessage(
fmt::format("Error while GC segment, segment={} log_ident={}", segment->info(), log->identifier()));
e.rethrow();
}
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class IManageableStorage : public IStorage
/// Return true is data dir exist
virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); }

virtual ::TiDB::StorageEngine engineType() const = 0;
virtual TiDB::StorageEngine engineType() const = 0;

virtual String getDatabaseName() const = 0;

Expand Down Expand Up @@ -185,7 +185,7 @@ class IManageableStorage : public IStorage
throw Exception(
"Method getDecodingSchemaSnapshot is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
};
}

/// The `block_decoding_schema_epoch` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,7 @@ class DMBlockOutputStream : public IBlockOutputStream
}
catch (DB::Exception & e)
{
e.addMessage(
fmt::format("(while writing to table `{}`.`{}`)", store->getDatabaseName(), store->getTableName()));
e.addMessage(fmt::format("(while writing to table `{}`)", store->getIdent()));
throw;
}

Expand Down Expand Up @@ -1622,12 +1621,12 @@ String StorageDeltaMerge::getTableName() const
{
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
return table_column_info->table_name;
}
Expand All @@ -1636,12 +1635,12 @@ String StorageDeltaMerge::getDatabaseName() const
{
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
return table_column_info->db_name;
}
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/TableNameMeta.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

namespace DB
{

struct TableNameMeta
{
String db_name;
String table_name;
};

} // namespace DB

0 comments on commit e30ce75

Please sign in to comment.