Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: support disaggregated mode without S3 #7480

Merged
merged 17 commits into from
May 18, 2023
6 changes: 4 additions & 2 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,19 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
}


SourceOps MockStorage::getSourceOpsFromDeltaMerge(
void MockStorage::buildExecFromDeltaMerge(
PipelineExecutorStatus & exec_status_,
PipelineExecGroupBuilder & group_builder,
Context & context,
Int64 table_id,
size_t concurrency,
bool keep_order)
{
auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order);
// Currently don't support test for late materialization
return storage->readSourceOps(
storage->read(
exec_status_,
group_builder,
column_names,
query_info,
context,
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Operators/Operator.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -101,8 +103,9 @@ class MockStorage
const FilterConditions * filter_conditions = nullptr,
bool keep_order = false);

SourceOps getSourceOpsFromDeltaMerge(
void buildExecFromDeltaMerge(
PipelineExecutorStatus & exec_status_,
PipelineExecGroupBuilder & group_builder,
Context & context,
Int64 table_id,
size_t concurrency = 1,
Expand Down
131 changes: 80 additions & 51 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,41 +278,32 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
executeImpl(pipeline);
}

SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status)
void DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
prepare(); // learner read

return executeImpl(exec_status);
return executeImpl(exec_status, group_builder);
}

SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status)
void DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

SourceOps source_ops;
if (!mvcc_query_info->regions_query_info.empty())
{
source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size);
buildLocalExec(exec_status, group_builder, context.getSettingsRef().max_block_size);
}

// Should build `remote_requests` and `nullSourceOp` under the protection of `table_structure_lock`.
if (source_ops.empty())
{
source_ops.emplace_back(std::make_unique<NullSourceOp>(
exec_status,
storage_for_logical_table->getSampleBlockForColumns(required_columns),
log->identifier()));
}

// Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because
// `buildLocalSourceOps` will setup `region_retry_from_local_region` and we must
// Note that `buildRemoteRequests` must be called after `buildLocalExec` because
// `buildLocalExec` will set up `region_retry_from_local_region` and we must
// retry those regions or there will be data loss.
auto remote_requests = buildRemoteRequests(scan_context);

if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means RN is sending requests with stale region info, we simply reject the request
Expand All @@ -328,32 +319,34 @@ SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_statu
RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
}

// A failpoint to test pause before alter lock released
FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);

// Release alter locks
// The DeltaTree engine ensures that once the read operators are created, the caller can get a consistent result
// from those operators even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, and keep the drop lock so that the storage is not dropped during reading.
const TableLockHolders drop_locks = releaseAlterLocks();

remote_read_sources_start_index = source_ops.size();
size_t remote_read_start_index = group_builder.concurrency();

if (!remote_requests.empty())
buildRemoteSourceOps(source_ops, exec_status, remote_requests);
buildRemoteExec(exec_status, group_builder, remote_requests);

if (group_builder.empty())
{
group_builder.addConcurrency(std::make_unique<NullSourceOp>(exec_status, storage_for_logical_table->getSampleBlockForColumns(required_columns), log->identifier()));
// reset remote_read_start_index for null_source_if_empty.
remote_read_start_index = 1;
}

for (const auto & lock : drop_locks)
dagContext().addTableLock(lock);

FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

return source_ops;
}

void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_start_index, generated_column_infos, log);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = group_builder.getCurrentHeader();
Expand All @@ -362,15 +355,15 @@ void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status,
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local source, there is no need to execute cast and push down filter, return directly.
/// But we should make sure that the analyzer is initialized before return.
if (remote_read_sources_start_index == 0)
if (remote_read_start_index == 0)
return;
/// handle timezone/duration cast for local table scan.
executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);
executeCastAfterTableScan(exec_status, group_builder, remote_read_start_index);

/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log);
::DB::executePushedDownFilter(exec_status, group_builder, remote_read_start_index, filter_conditions, *analyzer, log);
/// TODO: record profile
}
}
Expand Down Expand Up @@ -508,16 +501,16 @@ void DAGStorageInterpreter::prepare()
void DAGStorageInterpreter::executeCastAfterTableScan(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
size_t remote_read_sources_start_index)
size_t remote_read_start_index)
{
// execute timezone cast or duration cast if needed for local table scan
auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
if (has_cast)
{
RUNTIME_CHECK(remote_read_sources_start_index <= group_builder.group.size());
RUNTIME_CHECK(remote_read_start_index <= group_builder.group.size());
size_t i = 0;
// local sources
while (i < remote_read_sources_start_index)
while (i < remote_read_start_index)
{
auto & group = group_builder.group[i++];
group.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), extra_cast));
Expand Down Expand Up @@ -618,9 +611,9 @@ void DAGStorageInterpreter::buildRemoteStreams(const std::vector<RemoteRequest>
}
}

void DAGStorageInterpreter::buildRemoteSourceOps(
SourceOps & source_ops,
void DAGStorageInterpreter::buildRemoteExec(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
const std::vector<RemoteRequest> & remote_requests)
{
std::vector<pingcap::coprocessor::CopTask> all_tasks = buildCopTasks(remote_requests);
Expand All @@ -644,7 +637,7 @@ void DAGStorageInterpreter::buildRemoteSourceOps(
auto coprocessor_reader = std::make_shared<CoprocessorReader>(schema, cluster, tasks, has_enforce_encode_type, 1, tiflash_label_filter);
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);

source_ops.emplace_back(std::make_unique<CoprocessorReaderSourceOp>(exec_status, log->identifier(), coprocessor_reader));
group_builder.addConcurrency(std::make_unique<CoprocessorReaderSourceOp>(exec_status, log->identifier(), coprocessor_reader));
task_start = task_end;
}

Expand Down Expand Up @@ -935,15 +928,18 @@ DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
return table_snap;
}

SourceOps DAGStorageInterpreter::buildLocalSourceOpsForPhysicalTable(
DM::Remote::DisaggPhysicalTableReadSnapshotPtr
DAGStorageInterpreter::buildLocalExecForPhysicalTable(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
const TableID & table_id,
const SelectQueryInfo & query_info,
size_t max_block_size)
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
return {};
return table_snap;

RUNTIME_CHECK(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end());
auto & storage = storages_with_structure_lock[table_id].storage;
Expand All @@ -953,28 +949,46 @@ SourceOps DAGStorageInterpreter::buildLocalSourceOpsForPhysicalTable(
{
try
{
/// TODO: consider disaggregated task
auto source_ops = storage->readSourceOps(
exec_status,
required_columns,
query_info,
context,
max_block_size,
max_streams);
if (!dag_context.is_disaggregated_task)
{
storage->read(
exec_status,
group_builder,
required_columns,
query_info,
context,
max_block_size,
max_streams);
}
else
{
// build a snapshot on write node
StorageDeltaMergePtr delta_merge_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
RUNTIME_CHECK_MSG(delta_merge_storage != nullptr, "delta_merge_storage which cast from storage is null");
table_snap = delta_merge_storage->writeNodeBuildRemoteReadSnapshot(required_columns, query_info, context, max_streams);
// TODO: could be shared on the logical table level
table_snap->output_field_types = std::make_shared<std::vector<tipb::FieldType>>();
*table_snap->output_field_types = collectOutputFieldTypes(*dag_context.dag_request);
RUNTIME_CHECK(table_snap->output_field_types->size() == table_snap->column_defines->size(),
table_snap->output_field_types->size(),
table_snap->column_defines->size());
}

injectFailPointForLocalRead(query_info);
// After building the read operators from storage, we need to validate whether Regions have changed after learner read
// (by calling `validateQueryInfo`). In case the key ranges of Regions have changed (Region merge/split), those operators
// may contain data different from what is expected.
validateQueryInfo(*query_info.mvcc_query_info, learner_read_snapshot, tmt, log);
return source_ops;
break;
}
catch (RegionException & e)
{
query_info.mvcc_query_info->scan_context->total_local_region_num -= e.unavailable_region.size();
/// Recover from region exception for batchCop/MPP
if (dag_context.isBatchCop() || dag_context.isMPPTask())
{
// clean all operator from local because we are not sure the correctness of those operators
group_builder.reset();
if (likely(checkRetriableForBatchCopOrMPP(table_id, query_info, e, num_allow_retry)))
continue;
else
Expand All @@ -994,7 +1008,7 @@ SourceOps DAGStorageInterpreter::buildLocalSourceOpsForPhysicalTable(
throw;
}
}
return {};
return table_snap;
}

void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size)
Expand Down Expand Up @@ -1054,34 +1068,49 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
}
}

SourceOps DAGStorageInterpreter::buildLocalSourceOps(
void DAGStorageInterpreter::buildLocalExec(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
size_t max_block_size)
{
const DAGContext & dag_context = *context.getDAGContext();
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return {};
return;
const auto table_query_infos = generateSelectQueryInfos();

auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>();
// TODO Improve the performance of partition table in extreme case.
// ref https://github.com/pingcap/tiflash/issues/4474
SourceOps source_ops;
for (const auto & table_query_info : table_query_infos)
{
PipelineExecGroupBuilder builder;
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
auto table_snap = buildLocalExecForPhysicalTable(exec_status, builder, table_id, query_info, max_block_size);
if (table_snap)
{
disaggregated_snap->addTask(table_id, std::move(table_snap));
}

auto table_source_ops = buildLocalSourceOpsForPhysicalTable(exec_status, table_id, query_info, max_block_size);
source_ops.insert(source_ops.end(), std::make_move_iterator(table_source_ops.begin()), std::make_move_iterator(table_source_ops.end()));
group_builder.merge(std::move(builder));
}

LOG_DEBUG(
log,
"local sourceOps built, is_disaggregated_task={}",
dag_context.is_disaggregated_task);

return source_ops;
if (dag_context.is_disaggregated_task)
{
// register the snapshot to manager
auto snaps = context.getSharedContextDisagg()->wn_snapshot_manager;
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}
}

std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAGStorageInterpreter::getAndLockStorages(Int64 query_schema_version)
Expand Down
Loading