Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: Support pipeline table scan fullstack part 1 #7225

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
c1f9554
tmp save
ywqzzy Mar 9, 2023
5cab781
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Mar 10, 2023
373d0d8
can run simple queries
ywqzzy Mar 10, 2023
fd8087f
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Mar 20, 2023
8243ccd
tmp save
ywqzzy Mar 20, 2023
1450a7a
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 4, 2023
2fab85d
tiny refine.
ywqzzy Apr 4, 2023
841cabd
fmt
ywqzzy Apr 4, 2023
b7a469e
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 4, 2023
dc24eef
fix remote
ywqzzy Apr 4, 2023
c09481f
support generated column
ywqzzy Apr 6, 2023
7c87eaa
disable pipeline
ywqzzy Apr 6, 2023
1845fc1
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 7, 2023
e6314d5
remote read aysnc
ywqzzy Apr 7, 2023
11c520a
client change
ywqzzy Apr 7, 2023
067ceaa
fix cop read
ywqzzy Apr 12, 2023
e33ae47
address comments
ywqzzy Apr 12, 2023
3b98ab0
change client-c
ywqzzy Apr 12, 2023
84f9ad3
fix mpp statistics
ywqzzy Apr 13, 2023
388fa9b
address comments
ywqzzy Apr 13, 2023
e7595c5
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 13, 2023
cb3e3c6
keep order = true
ywqzzy Apr 13, 2023
f075bd7
add test
ywqzzy Apr 13, 2023
e42e849
refine
ywqzzy Apr 14, 2023
7ed8f49
tmp save
ywqzzy Apr 14, 2023
6a4efe4
build sourceOp in compile time
ywqzzy Apr 17, 2023
7c99ab7
remove useless log
ywqzzy Apr 17, 2023
afed2f5
minor refine
ywqzzy Apr 17, 2023
38ccc35
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 17, 2023
a04effe
fix ut
ywqzzy Apr 17, 2023
d473f97
minor refine
ywqzzy Apr 17, 2023
ea9081d
build source in compile time
ywqzzy Apr 17, 2023
09946b5
fmt
ywqzzy Apr 17, 2023
6696c8a
address comments
ywqzzy Apr 18, 2023
5c62263
fix concurrency
ywqzzy Apr 18, 2023
5c0c835
fmt
ywqzzy Apr 18, 2023
26793a5
Merge branch 'master' of https://github.com/pingcap/tiflash into supp…
ywqzzy Apr 18, 2023
091c4d1
keep consistent with master
ywqzzy Apr 18, 2023
7833379
fix build
ywqzzy Apr 18, 2023
b516e56
Merge branch 'master' into support_pipeline_table_scan_fullstack
ywqzzy Apr 20, 2023
76956e0
Merge branch 'master' into support_pipeline_table_scan_fullstack
ti-chi-bot Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/client-c
68 changes: 68 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceHolderTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/GeneratedColumnPlaceHolderTransformAction.h>

namespace DB
{
GeneratedColumnPlaceHolderTransformAction::GeneratedColumnPlaceHolderTransformAction(
const Block & header_,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_)
: generated_column_infos(generated_column_infos_)
{
header = header_;
insertColumns(header, false);
}

Block GeneratedColumnPlaceHolderTransformAction::getHeader() const
{
return header;
}

void GeneratedColumnPlaceHolderTransformAction::checkColumn() const
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

void GeneratedColumnPlaceHolderTransformAction::insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
Comment on lines +50 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
constexpr int col_index_pos = 1;
constexpr int col_name_pos = 2;
constexpr int data_type_pos = 3;
const auto & col_index = std::get<col_index_pos>(ele);
const auto & col_name = std::get<col_name_pos>(ele);
const auto & data_type = std::get<data_type_pos>(ele);

Maybe get tuple's value with meaningful const name is better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe get tuple's value with meaningful const name is better

Only use for 1 time. I will keep it

ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

bool GeneratedColumnPlaceHolderTransformAction::transform(Block & block)
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
{
insertColumns(block, true);
return true;
}

} // namespace DB
40 changes: 40 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceHolderTransformAction.h
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Core/Block.h>

namespace DB
{
class GeneratedColumnPlaceHolderTransformAction
{
public:
GeneratedColumnPlaceHolderTransformAction(
const Block & header_,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_);

bool transform(Block & block);

Block getHeader() const;

void checkColumn() const;

private:
void insertColumns(Block & block, bool insert_data) const;

private:
Block header;
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};
} // namespace DB
38 changes: 7 additions & 31 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/GeneratedColumnPlaceHolderTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
Expand All @@ -32,18 +33,17 @@ class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputSt
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
: action(input->getHeader(), generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }

Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
return action.getHeader();
}

static String getColumnName(UInt64 col_index)
Expand All @@ -54,43 +54,19 @@ class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputSt
protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
action.checkColumn();
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
action.transform(block);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
GeneratedColumnPlaceHolderTransformAction action;
const LoggerPtr log;
};

Expand Down
38 changes: 29 additions & 9 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class CoprocessorReader
, has_enforce_encode_type(has_enforce_encode_type_)
, resp_iter(std::move(tasks), cluster, concurrency, &Poco::Logger::get("pingcap/coprocessor"), tiflash_label_filter_)
, collected(false)
, concurrency_(concurrency)
, concurrency(concurrency)
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
{}

const DAGSchema & getOutputSchema() const { return schema; }
Expand Down Expand Up @@ -115,7 +115,7 @@ class CoprocessorReader
return detail;

detail.packet_bytes = resp->ByteSizeLong();
for (int i = 0; i < chunk_size; i++)
for (int i = 0; i < chunk_size; ++i)
{
Block block;
const tipb::Chunk & chunk = resp->chunks(i);
Expand Down Expand Up @@ -146,17 +146,27 @@ class CoprocessorReader
return detail;
}

// stream_id, decoder_ptr are only meaningful for ExchagneReceiver.
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr<CHBlockChunkDecodeAndSquash> & /*decoder_ptr*/)
std::pair<pingcap::coprocessor::ResponseIter::Result, bool> nonBlockingNext()
{
RUNTIME_CHECK(opened == true);
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
return resp_iter.nonBlockingNext();
}

CoprocessorReaderResult toResult(std::pair<pingcap::coprocessor::ResponseIter::Result, bool> & result_pair,
std::queue<Block> & block_queue,
const Block & header)
{
auto && [result, has_next] = result_pair;

auto && [result, has_next] = resp_iter.next();
if (!result.error.empty())
return {nullptr, true, result.error.message(), false};

if (!has_next)
return {nullptr, false, "", true};
{
if (result.finished)
return {nullptr, false, "", true};
else
return {nullptr, false, "", false};
}

auto resp = std::make_shared<tipb::SelectResponse>();
if (resp->ParseFromString(result.data()))
Expand All @@ -182,14 +192,24 @@ class CoprocessorReader
}
}

// stream_id, decoder_ptr are only meaningful for ExchagneReceiver.
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, size_t /*stream_id*/, std::unique_ptr<CHBlockChunkDecodeAndSquash> & /*decoder_ptr*/)
{
RUNTIME_CHECK(opened == true);

auto && result_pair = resp_iter.next();

return toResult(result_pair, block_queue, header);
}

size_t getSourceNum() const { return 1; }

int getExternalThreadCnt() const { return concurrency_; }
int getExternalThreadCnt() const { return concurrency; }

void close() {}

bool collected = false;
int concurrency_;
int concurrency;
bool opened = false;
};
} // namespace DB
Loading