Skip to content

Commit

Permalink
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105) (apache#7809)
Browse files Browse the repository at this point in the history

* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241105)

* Fix Build due to ClickHouse/ClickHouse#71261

* Fix Build due to ClickHouse/ClickHouse#68682

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent 40ec2cc commit 34a88dd
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241101
CH_COMMIT=7cd7bb8ece2
CH_BRANCH=rebase_ch/20241105
CH_COMMIT=500e1e35c0b
21 changes: 18 additions & 3 deletions cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "CrossRelParser.h"
#include <optional>

#include <Core/Settings.h>
#include <Interpreters/CollectJoinOnKeysVisitor.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
Expand All @@ -37,6 +38,10 @@

namespace DB
{
// Extern declaration of the max_block_size setting handle (SettingsUInt64).
// This file reads it as context->getSettingsRef()[Setting::max_block_size]
// when constructing the JoinStep, in place of the previous literal 8192.
namespace Setting
{
extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
Expand Down Expand Up @@ -194,8 +199,15 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
else
{
JoinPtr hash_join = std::make_shared<HashJoin>(table_join, right->getCurrentHeader().cloneEmpty());
QueryPlanStepPtr join_step
= std::make_unique<DB::JoinStep>(left->getCurrentHeader(), right->getCurrentHeader(), hash_join, 8192, 1, false);
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left->getCurrentHeader(),
right->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
1,
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);
join_step->setStepDescription("CROSS_JOIN");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
Expand Down Expand Up @@ -243,7 +255,10 @@ void CrossRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
table_join.setColumnsFromJoinedTable(
right.getCurrentHeader().getNamesAndTypesList(), left_columns_set, getUniqueName("right") + ".");
right.getCurrentHeader().getNamesAndTypesList(),
left_columns_set,
getUniqueName("right") + ".",
left.getCurrentHeader().getNamesAndTypesList());

// fix right table key duplicate
NamesWithAliases right_table_alias;
Expand Down
40 changes: 33 additions & 7 deletions cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace DB
// Extern declarations of the setting handles this file looks up through
// context->getSettingsRef()[Setting::<name>].
namespace Setting
{
// Read in parseJoin into a MultiEnum<DB::JoinAlgorithm>.
extern const SettingsJoinAlgorithm join_algorithm;
// Passed to the JoinStep constructors in place of the previous literal 8192.
extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
{
Expand Down Expand Up @@ -313,8 +314,15 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q

JoinPtr smj_join = std::make_shared<FullSortingMergeJoin>(table_join, right->getCurrentHeader().cloneEmpty(), -1);
MultiEnum<DB::JoinAlgorithm> join_algorithm = context->getSettingsRef()[Setting::join_algorithm];
QueryPlanStepPtr join_step
= std::make_unique<DB::JoinStep>(left->getCurrentHeader(), right->getCurrentHeader(), smj_join, 8192, 1, false);
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left->getCurrentHeader(),
right->getCurrentHeader(),
smj_join,
context->getSettingsRef()[Setting::max_block_size],
1,
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);

join_step->setStepDescription("SORT_MERGE_JOIN");
steps.emplace_back(join_step.get());
Expand Down Expand Up @@ -382,7 +390,11 @@ void JoinRelParser::addConvertStep(TableJoin & table_join, DB::QueryPlan & left,
NameSet left_columns_set;
for (const auto & col : left.getCurrentHeader().getNames())
left_columns_set.emplace(col);
table_join.setColumnsFromJoinedTable(right.getCurrentHeader().getNamesAndTypesList(), left_columns_set, getUniqueName("right") + ".");
table_join.setColumnsFromJoinedTable(
right.getCurrentHeader().getNamesAndTypesList(),
left_columns_set,
getUniqueName("right") + ".",
left.getCurrentHeader().getNamesAndTypesList());

// fix right table key duplicate
NamesWithAliases right_table_alias;
Expand Down Expand Up @@ -772,8 +784,15 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
LOG_INFO(getLogger("JoinRelParser"), "multi join on clauses:\n{}", DB::TableJoin::formatClauses(table_join->getClauses()));

JoinPtr hash_join = std::make_shared<HashJoin>(table_join, right_plan->getCurrentHeader());
QueryPlanStepPtr join_step
= std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(), right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left_plan->getCurrentHeader(),
right_plan->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
1,
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);
join_step->setStepDescription("Multi join on clause hash join");
steps.emplace_back(join_step.get());
std::vector<QueryPlanPtr> plans;
Expand Down Expand Up @@ -806,8 +825,15 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
{
hash_join = std::make_shared<HashJoin>(table_join, right_plan->getCurrentHeader().cloneEmpty());
}
QueryPlanStepPtr join_step
= std::make_unique<DB::JoinStep>(left_plan->getCurrentHeader(), right_plan->getCurrentHeader(), hash_join, 8192, 1, false);
QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
left_plan->getCurrentHeader(),
right_plan->getCurrentHeader(),
hash_join,
context->getSettingsRef()[Setting::max_block_size],
1,
/* required_output_ = */ NameSet{},
false,
/* use_new_analyzer_ = */ false);

join_step->setStepDescription("HASH_JOIN");
steps.emplace_back(join_step.get());
Expand Down
14 changes: 10 additions & 4 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3Common.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/SplittableBzip2ReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
Expand Down Expand Up @@ -321,8 +321,11 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", *file_size}};
auto cache_creator = wrapWithCache(
read_buffer_creator, read_settings, remote_path, *modified_time, *file_size);
size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (*file_size > 0)
buffer_size = std::min(*file_size, buffer_size);
auto cache_hdfs_read = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
std::move(cache_creator), stored_objects, read_settings, nullptr, /* use_external_buffer */ false);
std::move(cache_creator), stored_objects, read_settings, nullptr, /* use_external_buffer */ false, buffer_size);
read_buffer = std::move(cache_hdfs_read);
}

Expand Down Expand Up @@ -406,11 +409,14 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder

DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", object_size}};
auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
std::move(cache_creator), stored_objects, read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
std::move(cache_creator), stored_objects, read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true, 0);

auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
size_t buffer_size = std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
if (object_size > 0)
buffer_size = std::min(object_size, buffer_size);
auto async_reader
= std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), pool_reader, read_settings);
= std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), pool_reader, read_settings, buffer_size);

if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
Expand Down
8 changes: 6 additions & 2 deletions cpp-ch/local-engine/tests/gtest_ch_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ TEST(TestJoin, simple)
for (const auto & column : join->columnsFromJoinedTable())
join->addJoinedColumn(column);

auto columns_from_left_table = left_plan.getCurrentHeader().getNamesAndTypesList();
for (auto & column_from_joined_table : columns_from_left_table)
join->setUsedColumn(column_from_joined_table, JoinTableSide::Left);

auto left_keys = left.getNamesAndTypesList();
join->addJoinedColumnsAndCorrectTypes(left_keys, true);
std::cerr << "after join:\n";
Expand All @@ -122,8 +126,8 @@ TEST(TestJoin, simple)
}
auto hash_join = std::make_shared<HashJoin>(join, right_plan.getCurrentHeader());

QueryPlanStepPtr join_step
= std::make_unique<JoinStep>(left_plan.getCurrentHeader(), right_plan.getCurrentHeader(), hash_join, 8192, 1, false);
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
left_plan.getCurrentHeader(), right_plan.getCurrentHeader(), hash_join, 8192, 1, NameSet{}, false, false);

std::cerr << "join step:" << join_step->getOutputHeader().dumpStructure() << std::endl;

Expand Down

0 comments on commit 34a88dd

Please sign in to comment.