Skip to content

Commit

Permalink
Interpreter: Print BlockInputStream (#4911)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored May 25, 2022
1 parent 0f56b15 commit 35c8ea1
Show file tree
Hide file tree
Showing 25 changed files with 327 additions and 225 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/FilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ FilterBlockInputStream::FilterBlockInputStream(

Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
expression->executeOnTotals(totals);
Expand Down
23 changes: 23 additions & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.


#include <Common/FmtUtils.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
namespace DB
{
Expand All @@ -25,4 +26,26 @@ Block HashJoinBuildBlockInputStream::readImpl()
return block;
}

void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
static const std::unordered_map<ASTTableJoin::Kind, String> join_type_map{
{ASTTableJoin::Kind::Inner, "Inner"},
{ASTTableJoin::Kind::Left, "Left"},
{ASTTableJoin::Kind::Right, "Right"},
{ASTTableJoin::Kind::Full, "Full"},
{ASTTableJoin::Kind::Cross, "Cross"},
{ASTTableJoin::Kind::Comma, "Comma"},
{ASTTableJoin::Kind::Anti, "Anti"},
{ASTTableJoin::Kind::LeftSemi, "Left_Semi"},
{ASTTableJoin::Kind::LeftAnti, "Left_Anti"},
{ASTTableJoin::Kind::Cross_Left, "Cross_Left"},
{ASTTableJoin::Kind::Cross_Right, "Cross_Right"},
{ASTTableJoin::Kind::Cross_Anti, "Cross_Anti"},
{ASTTableJoin::Kind::Cross_LeftSemi, "Cross_LeftSemi"},
{ASTTableJoin::Kind::Cross_LeftAnti, "Cross_LeftAnti"}};
auto join_type_it = join_type_map.find(join->getKind());
if (join_type_it == join_type_map.end())
throw TiFlashException("Unknown join type", Errors::Coprocessor::Internal);
buffer.fmtAppend(", join_kind = {}", join_type_it->second);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HashJoinBuildBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
JoinPtr join;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/IBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
return res + 1;
}


void IBlockInputStream::dumpTree(FmtBuffer & buffer, size_t indent, size_t multiplier)
{
// todo append getHeader().dumpStructure()
buffer.fmtAppend(
"{}{}{}\n",
"{}{}{}",
String(indent, ' '),
getName(),
multiplier > 1 ? fmt::format(" x {}", multiplier) : "");
if (!extra_info.empty())
buffer.fmtAppend(": <{}>", extra_info);
appendInfo(buffer);
buffer.append("\n");
++indent;

/// If the subtree is repeated several times, then we output it once with the multiplier.
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class IBlockInputStream : private boost::noncopyable
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

void setExtraInfo(String info) { extra_info = info; }

template <typename F>
void forEachChild(F && f)
Expand Down Expand Up @@ -176,6 +177,8 @@ class IBlockInputStream : private boost::noncopyable
}
}

virtual void appendInfo(FmtBuffer & /*buffer*/) const {};

protected:
BlockInputStreams children;
mutable std::shared_mutex children_mutex;
Expand All @@ -188,6 +191,9 @@ class IBlockInputStream : private boost::noncopyable
mutable std::mutex tree_id_mutex;
mutable String tree_id;

/// The info that hints why the inputStream is needed to run.
String extra_info;

/// Get text with names of this source and the entire subtree, this function should only be called after the
/// InputStream tree is constructed.
String getTreeID() const;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ Block LimitBlockInputStream::readImpl()
return res;
}

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
size_t limit;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
return blocks[0].cloneWithColumns(std::move(merged_columns));
}

void MergeSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
SortDescription description;
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

#include <Common/ClickHouseRevision.h>
#include <Common/FmtUtils.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
Expand Down Expand Up @@ -275,4 +278,9 @@ void ParallelAggregatingBlockInputStream::execute()
no_more_keys);
}

void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", max_threads: {}, final: {}", max_threads, final ? "true" : "false");
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream

Block getHeader() const override;

virtual void collectNewThreadCountOfThisLevel(int & cnt) override
void collectNewThreadCountOfThisLevel(int & cnt) override
{
cnt += processor.getMaxThreads();
}
Expand All @@ -62,6 +62,8 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
}

Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;


private:
const LoggerPtr log;
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/DataStreams/PartialSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/sortBlock.h>

#include <Common/FmtUtils.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <Interpreters/sortBlock.h>


namespace DB
{


Block PartialSortingBlockInputStream::readImpl()
{
Block res = children.back()->read();
sortBlock(res, description, limit);
return res;
}


void PartialSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(": limit = {}", limit);
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/DataStreams/PartialSortingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PartialSortingBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
void appendInfo(FmtBuffer & buffer) const override;

private:
SortDescription description;
Expand Down
28 changes: 21 additions & 7 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/FmtUtils.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
Expand Down Expand Up @@ -60,11 +61,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (auto & execution_summary : resp.execution_summaries())
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
{
auto & executor_id = execution_summary.executor_id();
const auto & executor_id = execution_summary.executor_id();
execution_summaries[index][executor_id].time_processed_ns = execution_summary.time_processed_ns();
execution_summaries[index][executor_id].num_produced_rows = execution_summary.num_produced_rows();
execution_summaries[index][executor_id].num_iterations = execution_summary.num_iterations();
Expand All @@ -84,11 +85,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
return;
}
auto & execution_summaries_map = execution_summaries[index];
for (auto & execution_summary : resp.execution_summaries())
for (const auto & execution_summary : resp.execution_summaries())
{
if (execution_summary.has_executor_id())
{
auto & executor_id = execution_summary.executor_id();
const auto & executor_id = execution_summary.executor_id();
if (unlikely(execution_summaries_map.find(executor_id) == execution_summaries_map.end()))
{
LOG_FMT_WARNING(log, "execution {} not found in execution_summaries, this should not happen", executor_id);
Expand Down Expand Up @@ -224,12 +225,12 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
bool isStreamingCall() const { return is_streaming_reader; }
const std::vector<ConnectionProfileInfo> & getConnectionProfileInfos() const { return connection_profile_infos; }

virtual void collectNewThreadCountOfThisLevel(int & cnt) override
void collectNewThreadCountOfThisLevel(int & cnt) override
{
remote_reader->collectNewThreadCount(cnt);
}

virtual void resetNewThreadCountCompute() override
void resetNewThreadCountCompute() override
{
if (collected)
{
Expand All @@ -239,11 +240,24 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

protected:
virtual void readSuffixImpl() override
void readSuffixImpl() override
{
LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows);
remote_reader->close();
}

void appendInfo(FmtBuffer & buffer) const override
{
buffer.append(": schema: {");
buffer.joinStr(
sample_block.begin(),
sample_block.end(),
[](const auto & arg, FmtBuffer & fb) {
fb.fmtAppend("<{}, {}>", arg.name, arg.type->getName());
},
", ");
buffer.append("}");
}
};

using ExchangeReceiverInputStream = TiRemoteBlockInputStream<ExchangeReceiver>;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class DAGContext
explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, initialize_concurrency(concurrency)
, is_mpp_task(false)
, is_mpp_task(true)
, is_root_mpp_task(false)
, tunnel_set(nullptr)
, log(Logger::get(log_identifier))
Expand Down
Loading

0 comments on commit 35c8ea1

Please sign in to comment.