Skip to content

Commit

Permalink
Merge branch 'master' into support_pipeline_table_scan_fullstack
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Apr 20, 2023
2 parents 7833379 + 4ee1412 commit b516e56
Show file tree
Hide file tree
Showing 92 changed files with 1,425 additions and 778 deletions.
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 139 files
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
M(StoreSizeCapacity) \
M(StoreSizeAvailable) \
M(StoreSizeUsed) \
M(StoreSizeUsedRemote) \
M(DT_DeltaMerge) \
M(DT_DeltaCompact) \
M(DT_DeltaFlush) \
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ namespace DB
M(random_pipeline_model_operator_run_failpoint) \
M(random_pipeline_model_cancel_failpoint) \
M(random_spill_to_disk_failpoint) \
M(random_restore_from_disk_failpoint)
M(random_restore_from_disk_failpoint) \
M(random_exception_when_connect_local_tunnel)
namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ TiFlashMetrics::TiFlashMetrics()
= prometheus::BuildGauge().Name(current_metrics_prefix + name).Help("System current metric " + name).Register(*registry);
registered_current_metrics.push_back(&family.Add({}));
}

auto prometheus_name = TiFlashMetrics::current_metrics_prefix + std::string("StoreSizeUsed");
registered_keypace_store_used_family = &prometheus::BuildGauge().Name(prometheus_name).Help("Store size used of keyspace").Register(*registry);
store_used_total_metric = &registered_keypace_store_used_family->Add({{"keyspace_id", ""}, {"type", "all_used"}});
}

} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/ProcessCollector.h>
#include <Common/TiFlashBuildInfo.h>
#include <Common/nocopyable.h>
#include <common/types.h>
#include <prometheus/counter.h>
#include <prometheus/exposer.h>
#include <prometheus/gateway.h>
Expand Down Expand Up @@ -535,6 +536,11 @@ class TiFlashMetrics
std::vector<prometheus::Gauge *> registered_current_metrics;
std::unordered_map<std::string, prometheus::Gauge *> registered_async_metrics;

prometheus::Family<prometheus::Gauge> * registered_keypace_store_used_family;
using KeyspaceID = UInt32;
std::unordered_map<KeyspaceID, prometheus::Gauge *> registered_keypace_store_used_metrics;
prometheus::Gauge * store_used_total_metric;

public:
#define MAKE_METRIC_MEMBER_M(family_name, help, type, ...) \
MetricFamily<prometheus::type> family_name = MetricFamily<prometheus::type>(*registry, #family_name, #help, {__VA_ARGS__});
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
static const std::unordered_map<ASTTableJoin::Kind, String> join_type_map{
{ASTTableJoin::Kind::Inner, "Inner"},
{ASTTableJoin::Kind::Left, "Left"},
{ASTTableJoin::Kind::Right, "Right"},
{ASTTableJoin::Kind::LeftOuter, "Left"},
{ASTTableJoin::Kind::RightOuter, "Right"},
{ASTTableJoin::Kind::Full, "Full"},
{ASTTableJoin::Kind::Cross, "Cross"},
{ASTTableJoin::Kind::Comma, "Comma"},
{ASTTableJoin::Kind::Anti, "Anti"},
{ASTTableJoin::Kind::LeftSemi, "Left_Semi"},
{ASTTableJoin::Kind::LeftAnti, "Left_Anti"},
{ASTTableJoin::Kind::Cross_Left, "Cross_Left"},
{ASTTableJoin::Kind::Cross_Right, "Cross_Right"},
{ASTTableJoin::Kind::LeftOuterSemi, "Left_Semi"},
{ASTTableJoin::Kind::LeftOuterAnti, "Left_Anti"},
{ASTTableJoin::Kind::Cross_LeftOuter, "Cross_Left"},
{ASTTableJoin::Kind::Cross_RightOuter, "Cross_Right"},
{ASTTableJoin::Kind::Cross_Anti, "Cross_Anti"},
{ASTTableJoin::Kind::Cross_LeftSemi, "Cross_LeftSemi"},
{ASTTableJoin::Kind::Cross_LeftAnti, "Cross_LeftAnti"},
{ASTTableJoin::Kind::Cross_LeftOuterSemi, "Cross_LeftSemi"},
{ASTTableJoin::Kind::Cross_LeftOuterAnti, "Cross_LeftAnti"},
{ASTTableJoin::Kind::NullAware_Anti, "NullAware_Anti"},
{ASTTableJoin::Kind::NullAware_LeftSemi, "NullAware_LeftSemi"},
{ASTTableJoin::Kind::NullAware_LeftAnti, "NullAware_LeftAnti"},
{ASTTableJoin::Kind::NullAware_LeftOuterSemi, "NullAware_LeftSemi"},
{ASTTableJoin::Kind::NullAware_LeftOuterAnti, "NullAware_LeftAnti"},
{ASTTableJoin::Kind::RightSemi, "RightSemi"},
{ASTTableJoin::Kind::RightAnti, "RightAnti"},
};
Expand Down
24 changes: 12 additions & 12 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace DB
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const JoinPtr & join_,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
const String & req_id,
UInt64 max_block_size_)
: log(Logger::get(req_id))
Expand All @@ -34,7 +34,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
RUNTIME_CHECK_MSG(original_join != nullptr, "join ptr should not be null.");
RUNTIME_CHECK_MSG(original_join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0");

probe_exec.set(HashJoinProbeExec::build(original_join, input, non_joined_stream_index, max_block_size_));
probe_exec.set(HashJoinProbeExec::build(original_join, input, scan_hash_map_after_probe_stream_index, max_block_size_));
probe_exec->setCancellationHook([&]() { return isCancelledOrThrowIfKilled(); });

ProbeProcessInfo header_probe_process_info(0);
Expand All @@ -44,7 +44,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(

void HashJoinProbeBlockInputStream::readSuffixImpl()
{
LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, non joined rows {}", joined_rows + non_joined_rows, joined_rows, non_joined_rows);
LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, scan hash map rows {}", joined_rows + scan_hash_map_rows, joined_rows, scan_hash_map_rows);
}

Block HashJoinProbeBlockInputStream::getHeader() const
Expand Down Expand Up @@ -75,9 +75,9 @@ void HashJoinProbeBlockInputStream::onCurrentProbeDone()
switchStatus(probe_exec->onProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::WAIT_PROBE_FINISH);
}

void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone()
void HashJoinProbeBlockInputStream::onCurrentScanHashMapDone()
{
switchStatus(probe_exec->onNonJoinedFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN);
switchStatus(probe_exec->onScanHashMapAfterProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN);
}

void HashJoinProbeBlockInputStream::tryGetRestoreJoin()
Expand All @@ -96,10 +96,10 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin()
void HashJoinProbeBlockInputStream::onAllProbeDone()
{
auto & cur_probe_exec = *probe_exec;
if (cur_probe_exec.needOutputNonJoinedData())
if (cur_probe_exec.needScanHashMap())
{
cur_probe_exec.onNonJoinedStart();
switchStatus(ProbeStatus::READ_NON_JOINED_DATA);
cur_probe_exec.onScanHashMapAfterProbeStart();
switchStatus(ProbeStatus::READ_SCAN_HASH_MAP_DATA);
}
else
{
Expand Down Expand Up @@ -147,13 +147,13 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
onAllProbeDone();
break;
}
case ProbeStatus::READ_NON_JOINED_DATA:
case ProbeStatus::READ_SCAN_HASH_MAP_DATA:
{
auto block = probe_exec->fetchNonJoined();
non_joined_rows += block.rows();
auto block = probe_exec->fetchScanHashMapData();
scan_hash_map_rows += block.rows();
if (!block)
{
onCurrentReadNonJoinedDataDone();
onCurrentScanHashMapDone();
break;
}
return block;
Expand Down
72 changes: 36 additions & 36 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const JoinPtr & join_,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
const String & req_id,
UInt64 max_block_size_);

Expand All @@ -60,47 +60,47 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
* |
* ▼
* -----------------
* has non_joined data | | no non_joined data
* has scan_after_probe data | | no scan_after_probe data
* ▼ ▼
* WAIT_PROBE_FINISH FINISHED
* |
* ▼
* READ_NON_JOINED_DATA
* READ_SCAN_HASH_MAP_DATA
* |
* ▼
* FINISHED
*
* spill enabled:
* |-------------------> WAIT_BUILD_FINISH
* | |
* | ▼
* | PROBE
* | |
* | ▼
* | WAIT_PROBE_FINISH
* | |
* | ▼
* | ---------------
* | has non_joined data | | no non_joined data
* | ▼ |
* | READ_NON_JOINED_DATA |
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | ▼
* | GET_RESTORE_JOIN
* | |
* | ▼
* | ---------------
* | has restored join | | no restored join
* | ▼ ▼
* | RESTORE_BUILD FINISHED
* | |
* -----------------------|
* |-------------------> WAIT_BUILD_FINISH
* | |
* |
* | PROBE
* | |
* |
* | WAIT_PROBE_FINISH
* | |
* |
* | ---------------
* |has scan_hash_map data | | no scan_hash_map data
* | ▼ |
* | READ_SCAN_HASH_MAP_DATA |
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* |
* | GET_RESTORE_JOIN
* | |
* |
* | ---------------
* | has restored join | | no restored join
* | ▼ ▼
* | RESTORE_BUILD FINISHED
* | |
* ------------------------|
*
*/
enum class ProbeStatus
Expand All @@ -110,7 +110,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
WAIT_PROBE_FINISH, /// wait probe finish
GET_RESTORE_JOIN, /// try to get restore join
RESTORE_BUILD, /// build for restore join
READ_NON_JOINED_DATA, /// output non joined data
READ_SCAN_HASH_MAP_DATA, /// output scan hash map after probe data
FINISHED, /// the final state
};

Expand All @@ -119,7 +119,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
std::tuple<size_t, Block> getOneProbeBlock();
void onCurrentProbeDone();
void onAllProbeDone();
void onCurrentReadNonJoinedDataDone();
void onCurrentScanHashMapDone();
void tryGetRestoreJoin();

private:
Expand All @@ -132,7 +132,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
HashJoinProbeExecHolder probe_exec;
ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH};
size_t joined_rows = 0;
size_t non_joined_rows = 0;
size_t scan_hash_map_rows = 0;

Block header;
};
Expand Down
Loading

0 comments on commit b516e56

Please sign in to comment.