*: Remove RegionPtrWithBlock #9735

Merged (8 commits) on Dec 19, 2024
2 changes: 0 additions & 2 deletions dbms/src/Debug/DBGInvoker.cpp
@@ -101,8 +101,6 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("region_snapshot", MockRaftCommand::dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", MockRaftCommand::dbgFuncRegionSnapshotWithData);
regSchemalessFunc("region_snapshot_pre_handle_block", /**/ MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock);
regSchemalessFunc("region_snapshot_apply_block", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyBlock);
regSchemalessFunc("region_snapshot_pre_handle_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles);
regSchemalessFunc(
"region_snapshot_pre_handle_file_pks",
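Only the two block-based registrations are dropped here; the DTFile-based `region_snapshot_pre_handle_file*` hooks stay registered. For readers unfamiliar with DBGInvoker, the sketch below illustrates the name-to-handler registration pattern these lines rely on, using simplified stand-in types rather than the real `Context`/`ASTs`/`Printer` signature.

```cpp
// Minimal sketch of the registration pattern used by DBGInvoker, with
// simplified stand-in types. Removing a debug feature then only requires
// dropping its registration line and its handler, which is what this PR does.
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

using DebugArgs = std::vector<std::string>;
using DebugFunc = std::function<void(const DebugArgs &)>;

class DebugInvoker
{
public:
    void regSchemalessFunc(const std::string & name, DebugFunc func) { funcs[name] = std::move(func); }

    void invoke(const std::string & name, const DebugArgs & args) const
    {
        auto it = funcs.find(name);
        if (it == funcs.end())
            throw std::runtime_error("unknown debug function: " + name);
        it->second(args);
    }

private:
    std::unordered_map<std::string, DebugFunc> funcs;
};

int main()
{
    DebugInvoker invoker;
    // Only the surviving DTFile-based hook is registered; the *_block variants are gone.
    invoker.regSchemalessFunc("region_snapshot_pre_handle_file", [](const DebugArgs & args) {
        std::cout << "pre-handle snapshot to DTFiles, " << args.size() << " args\n";
    });
    invoker.invoke("region_snapshot_pre_handle_file", {"db", "tbl", "4"});
}
```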
10 changes: 0 additions & 10 deletions dbms/src/Debug/dbgKVStore/dbgFuncMockRaftCommand.h
@@ -62,16 +62,6 @@ struct MockRaftCommand
// ./storage-client.sh "DBGInvoke region_ingest_sst(database_name, table_name, region_id, start, end)"
static void dbgFuncIngestSST(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (pre-handle)
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_pre_handle_block(database_name, table_name, region_id, start, end, handle_id1, tso1, del1, r1_c1, r1_c2, ..., handle_id2, tso2, del2, r2_c1, r2_c2, ... )"
static void dbgFuncRegionSnapshotPreHandleBlock(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (apply a pre-handle snapshot)
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_apply_block(region_id)"
static void dbgFuncRegionSnapshotApplyBlock(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Simulate a region pre-handle snapshot data to DTFiles
// Usage:
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file(database_name, table_name, region_id, start, end, schema_string, pk_name[, test-fields=1, cfs="write,default"])"
180 changes: 12 additions & 168 deletions dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/setThreadName.h>
@@ -87,9 +88,10 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
// Get start key and end key form multiple column if it is clustered_index.
std::vector<Field> start_keys;
std::vector<Field> end_keys;
const auto & pk_idx_cols = table_info.getPrimaryIndexInfo().idx_cols;
for (size_t i = 0; i < handle_column_size; i++)
{
auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];
auto & column_info = table_info.columns[pk_idx_cols[i].offset];
auto start_field
= RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[3 + i]).value);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
@@ -108,8 +110,7 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)

const size_t len = table->table_info.columns.size() + 3;

if ((args_end - args_begin) % len)
throw Exception("Number of insert values and columns do not match.", ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK_MSG((((args_end - args_begin) % len) == 0), "Number of insert values and columns do not match.");

// Parse row values
for (auto it = args_begin; it != args_end; it += len)
@@ -170,9 +171,15 @@ void MockRaftCommand::dbgFuncRegionSnapshotWithData(Context & context, const AST
auto table_id = region->getMappedTableID();
auto cnt = region->writeCFCount();

// Mock to apply a snapshot with data in `region`
// Mock to apply a snapshot with committed rows in `region`
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>(region, tmt);
tmt.getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(region, tmt);
// Decode the committed rows into Block and flush to the IStorage layer.
// This does not ensure the atomicity of "apply snapshot". But we only use it for writing tests now.
if (auto region_applied = tmt.getKVStore()->getRegion(region_id); region_applied)
{
tmt.getRegionTable().tryWriteBlockByRegion(region_applied);
}
output(fmt::format("put region #{}, range{} to table #{} with {} records", region_id, range_string, table_id, cnt));
}
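The block-based shortcut is gone, so the mock command now performs the two steps explicitly: apply the snapshot through the DTFile-based path, then decode and flush the committed rows via the region table. Below is a minimal sketch of that flow with hypothetical stand-in types, not the real KVStore/RegionTable signatures.

```cpp
// Two-step mock flow: step 1 applies the snapshot via the (only remaining)
// DTFile-based path; step 2 decodes the committed rows and writes them to the
// storage layer. The two steps are not atomic, which is acceptable for a
// debug/testing helper. All names below are simplified stand-ins.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <optional>

struct MockRegion
{
    uint64_t id;
    size_t committed_rows;
};
using MockRegionPtr = std::shared_ptr<MockRegion>;

struct MockKVStore
{
    // Stand-in for checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles> + getRegion.
    std::optional<MockRegionPtr> applySnapshotFiles(const MockRegionPtr & region)
    {
        std::cout << "apply snapshot files for region " << region->id << "\n";
        return region;
    }
};

struct MockRegionTable
{
    // Stand-in for RegionTable::tryWriteBlockByRegion.
    void flushCommittedRows(const MockRegionPtr & region)
    {
        std::cout << "decode and flush " << region->committed_rows << " committed rows\n";
    }
};

int main()
{
    MockKVStore kvstore;
    MockRegionTable region_table;
    auto region = std::make_shared<MockRegion>(MockRegion{1, 3});

    // Step 1: apply the pre-handled snapshot (DTFile path).
    if (auto applied = kvstore.applySnapshotFiles(region); applied)
        // Step 2: flush the decoded rows separately; not atomic with step 1.
        region_table.flushCommittedRows(*applied);
}
```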

@@ -488,30 +495,10 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG
struct GlobalRegionMap
{
using Key = std::string;
using BlockVal = std::pair<RegionPtr, RegionPtrWithBlock::CachePtr>;
std::unordered_map<Key, BlockVal> regions_block;
using SnapPath = std::pair<RegionPtr, std::vector<DM::ExternalDTFileInfo>>;
std::unordered_map<Key, SnapPath> regions_snap_files;
std::mutex mutex;

void insertRegionCache(const Key & name, BlockVal && val)
{
auto _ = std::lock_guard(mutex);
regions_block[name] = std::move(val);
}
BlockVal popRegionCache(const Key & name)
{
auto _ = std::lock_guard(mutex);
if (auto it = regions_block.find(name); it == regions_block.end())
throw Exception(std::string(__PRETTY_FUNCTION__) + " ... " + name);
else
{
auto ret = std::move(it->second);
regions_block.erase(it);
return ret;
}
}

void insertRegionSnap(const Key & name, SnapPath && val)
{
auto _ = std::lock_guard(mutex);
@@ -533,149 +520,6 @@ struct GlobalRegionMap

static GlobalRegionMap GLOBAL_REGION_MAP;
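After the deletion, GlobalRegionMap only tracks pre-handled snapshots as lists of external DTFiles. A self-contained sketch of the surviving shape, with simplified stand-ins for RegionPtr and DM::ExternalDTFileInfo:

```cpp
// Sketch of the remaining GlobalRegionMap: only the
// "region snapshot -> external DTFile list" entries are kept, guarded by a
// mutex. Types are simplified stand-ins, not the real TiFlash types.
#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct SketchGlobalRegionMap
{
    using Key = std::string;
    using SnapPath = std::pair<int /*region id*/, std::vector<std::string> /*DTFile paths*/>;

    void insertRegionSnap(const Key & name, SnapPath && val)
    {
        std::lock_guard lock(mutex);
        regions_snap_files[name] = std::move(val);
    }

    SnapPath popRegionSnap(const Key & name)
    {
        std::lock_guard lock(mutex);
        auto it = regions_snap_files.find(name);
        if (it == regions_snap_files.end())
            throw std::runtime_error("no pre-handled snapshot named " + name);
        auto ret = std::move(it->second);
        regions_snap_files.erase(it);
        return ret;
    }

private:
    std::unordered_map<Key, SnapPath> regions_snap_files;
    std::mutex mutex;
};
```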

/// Mock to pre-decode snapshot to block then apply

/// Pre-decode region data into block cache and remove committed data from `region`
RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & region, Context & context)
{
auto keyspace_id = region->getKeyspaceID();
const auto & tmt = context.getTMTContext();
{
Timestamp gc_safe_point = 0;
if (auto pd_client = tmt.getPDClient(); !pd_client->isMock())
{
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(
pd_client,
keyspace_id,
false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
/**
* In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
* If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
*/
region->tryCompactionFilter(gc_safe_point);
}
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
data_list_read = ReadRegionCommitCache(region, true);
if (!data_list_read)
return nullptr;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ILLFORMAT_RAFT_ROW)
{
// br or lighting may write illegal data into tikv, skip pre-decode and ingest sst later.
LOG_WARNING(
Logger::get(__PRETTY_FUNCTION__),
"Got error while reading region committed cache: {}. Skip pre-decode and keep original cache.",
e.displayText());
// set data_list_read and let apply snapshot process use empty block
data_list_read = RegionDataReadInfoList();
}
else
throw;
}

TableID table_id = region->getMappedTableID();
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Block res_block;

const auto atomic_decode = [&](bool force_decode) -> bool {
Stopwatch watch;
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr || storage->isTombstone())
{
if (!force_decode) // Need to update.
return false;
if (storage == nullptr) // Table must have just been GC-ed.
return true;
}

/// Get a structure read lock throughout decode, during which schema must not change.
TableStructureLockHolder lock;
try
{
lock = storage->lockStructureForShare(getThreadNameAndID());
}
catch (DB::Exception & e)
{
// If the storage is physical dropped (but not removed from `ManagedStorages`) when we want to decode snapshot, consider the decode done.
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
return true;
else
throw;
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot
= storage->getSchemaSnapshotAndBlockForDecoding(lock, false, true).first;
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
return reader.read(res_block, *data_list_read, force_decode);
};

/// In TiFlash, the actions between applying raft log and schema changes are not strictly synchronized.
/// There could be a chance that some raft logs come after a table gets tombstoned. Take care of it when
/// decoding data. Check the test case for more details.
FAIL_POINT_PAUSE(FailPoints::pause_before_apply_raft_snapshot);

if (!atomic_decode(false))
{
tmt.getSchemaSyncerManager()->syncSchemas(context, keyspace_id);

if (!atomic_decode(true))
throw Exception(
"Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
ErrorCodes::LOGICAL_ERROR);
}

RemoveRegionCommitCache(region, *data_list_read);

return std::make_unique<RegionPreDecodeBlockData>(std::move(res_block), schema_version, std::move(*data_list_read));
}

void MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock(
Context & context,
const ASTs & args,
DBGInvoker::Printer output)
{
FmtBuffer fmt_buf;
auto region = GenDbgRegionSnapshotWithData(context, args);
const auto region_name = "__snap_" + std::to_string(region->id());
fmt_buf.fmtAppend("pre-handle {} snapshot with data {}", region->toString(false), region->dataInfo());
auto & tmt = context.getTMTContext();
auto block_cache = GenRegionPreDecodeBlockData(region, tmt.getContext());
fmt_buf.append(", pre-decode block cache");
fmt_buf.fmtAppend(
" {{ schema_version: ?, data_list size: {}, block row: {} col: {} bytes: {} }}",
block_cache->data_list_read.size(),
block_cache->block.rows(),
block_cache->block.columns(),
block_cache->block.bytes());
GLOBAL_REGION_MAP.insertRegionCache(region_name, {region, std::move(block_cache)});
output(fmt_buf.toString());
}

void MockRaftCommand::dbgFuncRegionSnapshotApplyBlock(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
{
throw Exception("Args not matched, should be: region-id", ErrorCodes::BAD_ARGUMENTS);
}

auto region_id = static_cast<RegionID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args.front()).value));
auto [region, block_cache] = GLOBAL_REGION_MAP.popRegionCache("__snap_" + std::to_string(region_id));
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>(
{region, std::move(block_cache)},
tmt);

output(fmt::format("success apply {} with block cache", region->id()));
}


/// Mock to pre-decode snapshot to DTFile(s) then apply

// Simulate a region pre-handle snapshot data to DTFiles
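The deleted block path above carried the decode-retry logic: decode with the locally cached schema first, and only if that fails, sync the schema and decode again with force_decode enabled. The surviving snapshot paths keep the same idea. Here is a minimal, self-contained sketch of that retry pattern; all names are simplified stand-ins, not the real TiFlash API.

```cpp
// Decode-retry pattern: try with the cached schema, and on failure sync the
// schema once and force-decode. A second failure is treated as a logical error.
#include <iostream>
#include <stdexcept>

// Pretend decoder: fails when the cached schema is stale and force_decode is off.
static bool atomicDecode(bool schema_up_to_date, bool force_decode)
{
    if (!schema_up_to_date && !force_decode)
        return false; // caller must sync the schema before retrying
    return true;
}

static void syncSchemas(bool & schema_up_to_date)
{
    std::cout << "sync schema\n";
    schema_up_to_date = true;
}

int main()
{
    bool schema_up_to_date = false;

    if (!atomicDecode(schema_up_to_date, /*force_decode=*/false))
    {
        syncSchemas(schema_up_to_date);
        if (!atomicDecode(schema_up_to_date, /*force_decode=*/true))
            throw std::runtime_error("pre-decode region cache to block failed");
    }
    std::cout << "decode succeeded\n";
}
```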
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgKVStore/dbgFuncRegion.cpp
@@ -81,7 +81,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_info, region_id, start_keys, end_keys);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

output(fmt::format(
"put region #{}, range{} to table #{} with kvstore.onSnapshot",
@@ -96,7 +96,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_id, region_id, start, end);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

output(fmt::format(
"put region #{}, range[{}, {}) to table #{} with kvstore.onSnapshot",
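These `onSnapshot` call sites only change their template argument: the wrapper type tells KVStore how the snapshot payload was pre-handled, and with RegionPtrWithBlock gone the DTFile wrapper is the only option left. Below is a minimal sketch of that dispatch pattern, using hypothetical stand-in types rather than the real KVStore API.

```cpp
// onSnapshot is parameterized by a wrapper type describing the pre-handled
// snapshot payload. After this PR, only the DTFile-style wrapper remains.
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct RegionStub
{
    uint64_t id;
};

struct PtrWithSnapshotFiles
{
    const RegionStub * region;
    std::vector<std::string> external_files; // pre-handled DTFiles
};

template <typename RegionPtrWrap>
void onSnapshot(const RegionPtrWrap & wrapped)
{
    std::cout << "apply snapshot for region " << wrapped.region->id << " from "
              << wrapped.external_files.size() << " DTFile(s)\n";
}

int main()
{
    RegionStub region{4};
    // The debug helpers now always instantiate the DTFile flavour, even with an
    // empty file list (the nullptr/0 arguments in the real call sites).
    onSnapshot<PtrWithSnapshotFiles>({&region, {}});
}
```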
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgNaturalDag.cpp
@@ -232,7 +232,7 @@ void NaturalDag::buildTables(Context & context)
auto raft_index = RAFT_INIT_LOG_INDEX;
region_meta.setApplied(raft_index, RAFT_INIT_LOG_TERM);
RegionPtr region_ptr = RegionBench::makeRegion(std::move(region_meta));
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region_ptr, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region_ptr, nullptr, 0, tmt);

auto & pairs = region.pairs;
for (auto & pair : pairs)
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
@@ -581,7 +581,7 @@ void concurrentBatchInsert(
Regions regions
= createRegions(table_info.id, concurrent_num, key_num_each_region, handle_begin, curr_max_region_id + 1);
for (const RegionPtr & region : regions)
debug_kvstore.onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
debug_kvstore.onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

std::list<std::thread> threads;
for (Int64 i = 0; i < concurrent_num; i++, handle_begin += key_num_each_region)
26 changes: 8 additions & 18 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
@@ -106,7 +106,7 @@ static void inline writeCommittedBlockDataIntoStorage(
template <typename ReadList>
static inline bool atomicReadWrite(
AtomicReadWriteCtx & rw_ctx,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
bool force_decode)
{
@@ -147,9 +147,7 @@
should_handle_version_col = false;
}

// Currently, RegionPtrWithBlock with a not-null CachePtr is only used in debug functions
// to apply a pre-decoded snapshot. So it will not take place here.
// In short, we always decode here because there is no pre-decode cache.
// Decode `data_list_read` according to the schema snapshot into `Block`
{
LOG_TRACE(
rw_ctx.log,
@@ -169,6 +167,7 @@
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_decode)
.Observe(rw_ctx.region_decode_cost / 1000.0);
}

if constexpr (std::is_same_v<ReadList, RegionDataReadInfoList>)
{
RUNTIME_CHECK(block_ptr != nullptr);
@@ -198,12 +197,12 @@

template DM::WriteResult writeRegionDataToStorage<RegionUncommittedDataList>(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionUncommittedDataList & data_list_read,
const LoggerPtr & log);
template DM::WriteResult writeRegionDataToStorage<RegionDataReadInfoList>(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionDataReadInfoList & data_list_read,
const LoggerPtr & log);

@@ -212,7 +211,7 @@ template DM::WriteResult writeRegionDataToStorage<RegionDataReadInfoList>(
template <typename ReadList>
DM::WriteResult writeRegionDataToStorage(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
const LoggerPtr & log)
{
@@ -431,21 +430,12 @@ static inline void reportUpstreamLatency(const RegionDataReadInfoList & data_lis

DM::WriteResult RegionTable::writeCommittedByRegion(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionDataReadInfoList & data_list_to_remove,
const LoggerPtr & log,
bool lock_region)
{
std::optional<RegionDataReadInfoList> maybe_data_list_read = std::nullopt;
if (region.pre_decode_cache)
{
// If schema version changed, use the kv data to rebuild block cache
maybe_data_list_read = std::move(region.pre_decode_cache->data_list_read);
}
else
{
maybe_data_list_read = ReadRegionCommitCache(region, lock_region);
}
std::optional<RegionDataReadInfoList> maybe_data_list_read = ReadRegionCommitCache(region, lock_region);

if (!maybe_data_list_read.has_value())
return std::nullopt;
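With `pre_decode_cache` removed, `writeCommittedByRegion` no longer branches between a pre-decoded block and the region's commit cache; it always reads the commit cache from the region itself. A minimal sketch of the simplified path, with stand-in types rather than the real RegionTable API:

```cpp
// Simplified read path: take a plain region pointer and always read the
// committed key-values from the region's commit cache. The old branch on a
// pre-decoded block cache attached to a wrapper type is gone.
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

struct RegionSketch
{
    std::vector<int> committed_kvs; // stand-in for the write-CF commit cache
};
using RegionSketchPtr = std::shared_ptr<RegionSketch>;

static std::optional<std::vector<int>> readRegionCommitCache(const RegionSketchPtr & region)
{
    if (region->committed_kvs.empty())
        return std::nullopt;
    return region->committed_kvs;
}

static void writeCommittedByRegion(const RegionSketchPtr & region)
{
    // Previously: if (region.pre_decode_cache) use it; else read the cache.
    // Now there is a single path.
    auto maybe_data = readRegionCommitCache(region);
    if (!maybe_data)
        return;
    std::cout << "decode and write " << maybe_data->size() << " committed rows\n";
}

int main()
{
    auto region = std::make_shared<RegionSketch>(RegionSketch{{1, 2, 3}});
    writeCommittedByRegion(region);
}
```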
3 changes: 1 addition & 2 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.h
@@ -27,7 +27,6 @@ class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;
class TMTContext;
struct RegionPtrWithBlock;

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);
void RemoveRegionCommitCache(
@@ -48,7 +47,7 @@ Block GenRegionBlockDataWithSchema(
template <typename ReadList>
DM::WriteResult writeRegionDataToStorage(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
const LoggerPtr & log);

2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Decode/RegionTable.cpp
@@ -250,7 +250,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
}
}

RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtrWithBlock & region)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & region)
{
const RegionID region_id = region->id();
