Skip to content

Commit

Permalink
[CH] Ignore unstabe uts and add more message when failed. (apache#7821)
Browse files Browse the repository at this point in the history
* read data from orc file format - ignore reading except date32

* dumpPlan and dumpMessage

* fix due to comments
  • Loading branch information
baibaichen authored Nov 5, 2024
1 parent 34a88dd commit f498fe7
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1037,12 +1037,13 @@ class GlutenClickHouseFileFormatSuite
)
}

test("read data from orc file format") {
test("read data from orc file format - except date32") {
val filePath = s"$orcDataPath/all_data_types_with_non_primitive_type.snappy.orc"
val orcFileFormat = "orc"
val sql =
s"""
| select *
| select string_field, int_field, long_field, float_field, double_field, short_field,
| byte_field, boolean_field, decimal_field
| from $orcFileFormat.`$filePath`
| where long_field > 30
|""".stripMargin
Expand Down
50 changes: 50 additions & 0 deletions cpp-ch/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,59 @@
#include <Formats/FormatSettings.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <google/protobuf/json/json.h>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/CHUtil.h>
#include <Common/logger_useful.h>

namespace pb_util = google::protobuf::util;

namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
{
if (!logger)
{
logger = getLogger("SerializedPlanParser");
if (!logger)
return;
}

if (!force && !logger->debug())
return;

auto out = local_engine::PlanUtil::explainPlan(plan);
if (force) // force
LOG_ERROR(logger, "clickhouse plan:\n{}", out);
else
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}

void dumpMessage(const google::protobuf::Message & message, const char * type, bool force, LoggerPtr logger)
{
if (!logger)
{
logger = getLogger("SubstraitPlan");
if (!logger)
return;
}

if (!force && !logger->debug())
return;
pb_util::JsonOptions options;
std::string json;
if (auto s = google::protobuf::json::MessageToJsonString(message, &json, options); !s.ok())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);

if (force) // force
LOG_ERROR(logger, "{}:\n{}", type, json);
else
LOG_DEBUG(logger, "{}:\n{}", type, json);
}

void headBlock(const DB::Block & block, size_t count)
{
std::cout << "============Block============" << std::endl;
Expand Down
12 changes: 12 additions & 0 deletions cpp-ch/local-engine/Common/DebugUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@

#include <Core/Block.h>

namespace google::protobuf
{
class Message;
}
namespace DB
{
class QueryPlan;
}
namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
void dumpMessage(const google::protobuf::Message & message, const char * type, bool force = false, LoggerPtr = nullptr);

void headBlock(const DB::Block & block, size_t count = 10);
String printBlock(const DB::Block & block, size_t count = 10);

Expand Down
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Common/GlutenConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <config.pb.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/logger_useful.h>

namespace local_engine
Expand All @@ -45,7 +46,7 @@ std::map<std::string, std::string> SparkConfigs::load(std::string_view plan, boo
auto configMaps = local_engine::BinaryToMessage<gluten::ConfigMap>(plan);

if (!processStart)
logDebugMessage(configMaps, "Update Config Map Plan");
debug::dumpMessage(configMaps, "Update Config Map Plan");

for (const auto & pair : configMaps.configs())
configs.emplace(pair.first, pair.second);
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/BlockTypeUtils.h>

#include <Common/DebugUtils.h>

namespace DB
{
Expand Down Expand Up @@ -77,7 +77,7 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra
else
{
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_info);
logDebugMessage(extension_table, "extension_table");
debug::dumpMessage(extension_table, "extension_table");
}
MergeTreeRelParser mergeTreeParser(parser_context, getContext());
query_plan = mergeTreeParser.parseReadRel(std::make_unique<DB::QueryPlan>(), read, extension_table);
Expand Down Expand Up @@ -131,7 +131,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
else
{
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(split_info);
logDebugMessage(local_files, "local_files");
debug::dumpMessage(local_files, "local_files");
}
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
Expand Down
40 changes: 13 additions & 27 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,20 @@
#include <string>
#include <string_view>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnSet.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Field.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/Context.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryPriorities.h>
#include <Join/StorageJoinFromReadBuffer.h>
#include <Operator/BlocksBufferPoolTransform.h>
#include <Parser/ExpressionParser.h>
#include <Parser/FunctionParser.h>
Expand All @@ -73,6 +58,7 @@
#include <google/protobuf/wrappers.pb.h>
#include <Common/BlockTypeUtils.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/GlutenConfig.h>
#include <Common/JNIUtils.h>
Expand Down Expand Up @@ -121,13 +107,17 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
{
ActionsDAG actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
NamesWithAliases aliases;
auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan size {}.",
"Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
cols.getNames().size(),
root_rel.root().names_size());
}
for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
actions_dag.project(aliases);
Expand All @@ -144,13 +134,14 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
const auto & original_cols = original_header.getColumnsWithTypeAndName();
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch output schema. plan column size {} [header: '{}'], subtrait plan size {}[schema: {}].",
"Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.",
original_cols.size(),
original_header.dumpStructure(),
output_schema.types_size(),
dumpMessage(output_schema));
root_rel.root().names_size());
}
bool need_final_project = false;
ColumnsWithTypeAndName final_cols;
Expand Down Expand Up @@ -192,7 +183,7 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel

QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
{
logDebugMessage(plan, "substrait plan");
debug::dumpMessage(plan, "substrait::Plan");
//parseExtensions(plan.extensions());
if (plan.relations_size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found");
Expand All @@ -213,12 +204,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
PlanUtil::checkOuputType(*query_plan);
#endif

if (auto * logger = &Poco::Logger::get("SerializedPlanParser"); logger->debug())
{
auto out = PlanUtil::explainPlan(*query_plan);
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}

debug::dumpPlan(*query_plan);
return query_plan;
}

Expand Down
57 changes: 0 additions & 57 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp

This file was deleted.

12 changes: 7 additions & 5 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <string>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/Exception.h>

namespace DB::ErrorCodes
Expand Down Expand Up @@ -67,9 +68,10 @@ Message BinaryToMessage(const std::string_view binary)
return message;
}

void logDebugMessage(const google::protobuf::Message & message, const char * type);

std::string dumpMessage(const google::protobuf::Message & message);

std::string toString(const google::protobuf::Any & any);
inline std::string toString(const google::protobuf::Any & any)
{
google::protobuf::StringValue sv;
sv.ParseFromString(any.value());
return sv.value();
}
} // namespace local_engine
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include <Poco/StringTokenizer.h>

#include <write_optimization.pb.h>
#include <Poco/StringTokenizer.h>
#include <Common/DebugUtils.h>

using namespace DB;
using namespace local_engine;
Expand Down Expand Up @@ -228,7 +228,7 @@ MergeTreeTableInstance::MergeTreeTableInstance(const google::protobuf::Any & any
MergeTreeTableInstance::MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table)
: MergeTreeTableInstance(extension_table.detail())
{
logDebugMessage(extension_table, "merge_tree_table");
debug::dumpMessage(extension_table, "merge_tree_table");
}

SparkStorageMergeTreePtr MergeTreeTableInstance::restoreStorage(const ContextMutablePtr & context) const
Expand Down

0 comments on commit f498fe7

Please sign in to comment.