From f764cb64e70decef52c26472ba36b205a92006b1 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 11 May 2023 17:27:44 +0000 Subject: [PATCH 1/4] remove unused arrow code and optimize parse plan --- cpp/core/compute/Backend.h | 21 +- cpp/core/compute/ProtobufUtils.cc | 14 +- cpp/core/compute/ProtobufUtils.h | 12 +- cpp/core/jni/JniCommon.h | 242 ----------------------- cpp/core/jni/JniWrapper.cc | 4 +- cpp/core/shuffle/ArrowShuffleWriter.cc | 1 + cpp/core/shuffle/ArrowShuffleWriter.h | 1 + cpp/core/tests/BackendTest.cc | 21 +- cpp/core/utils/exception.h | 13 ++ cpp/velox/benchmarks/BenchmarkUtils.cc | 6 +- cpp/velox/benchmarks/BenchmarkUtils.h | 9 +- cpp/velox/benchmarks/GenericBenchmark.cc | 16 +- cpp/velox/benchmarks/QueryBenchmark.cc | 9 +- cpp/velox/tests/OrcTest.cc | 4 +- 14 files changed, 58 insertions(+), 315 deletions(-) diff --git a/cpp/core/compute/Backend.h b/cpp/core/compute/Backend.h index d4a467597d53..b5d66b0e2b60 100644 --- a/cpp/core/compute/Backend.h +++ b/cpp/core/compute/Backend.h @@ -44,25 +44,20 @@ class Backend : public std::enable_shared_from_this { const std::vector>& inputs, const std::unordered_map& sessionConf) = 0; - bool parsePlan(const uint8_t* data, int32_t size) { - return parsePlan(data, size, -1, -1, -1); + void parsePlan(const uint8_t* data, int32_t size) { + parsePlan(data, size, -1, -1, -1); } /// Parse and cache the plan. /// Return true if parsed successfully. - bool parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) { + void parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) { #ifdef GLUTEN_PRINT_DEBUG - auto buf = std::make_shared(data, size); - auto maybePlanJson = substraitFromPbToJson("Plan", *buf); - if (maybePlanJson.status().ok()) { - std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; - std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; " - << maybePlanJson.ValueOrDie() << std::endl; - } else { - std::cout << "Error parsing substrait plan to json: " << maybePlanJson.status().ToString() << std::endl; - } + auto jsonPlan = substraitFromPbToJson("Plan", data, size); + std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl; + std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; " + << jsonPlan << std::endl; #endif - return parseProtobuf(data, size, &substraitPlan_); + GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed"); } // Just for benchmark diff --git a/cpp/core/compute/ProtobufUtils.cc b/cpp/core/compute/ProtobufUtils.cc index ce5c69d0bc60..408f25cd6173 100644 --- a/cpp/core/compute/ProtobufUtils.cc +++ b/cpp/core/compute/ProtobufUtils.cc @@ -21,6 +21,8 @@ #include #include +#include "utils/exception.h" + namespace gluten { // Common for both projector and filters. @@ -43,7 +45,7 @@ inline google::protobuf::util::TypeResolver* getGeneratedTypeResolver() { return typeResolver.get(); } -arrow::Result> substraitFromJsonToPb(std::string_view typeName, std::string_view json) { +std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json) { std::string typeUrl = "/substrait." + std::string(typeName); google::protobuf::io::ArrayInputStream jsonStream{json.data(), static_cast(json.size())}; @@ -55,22 +57,22 @@ arrow::Result> substraitFromJsonToPb(std::string_ google::protobuf::util::JsonToBinaryStream(getGeneratedTypeResolver(), typeUrl, &jsonStream, &outStream); if (!status.ok()) { - return arrow::Status::Invalid("JsonToBinaryStream returned ", status); + throw GlutenException("JsonToBinaryStream returned " + status.ToString()); } - return arrow::Buffer::FromString(std::move(out)); + return out; } -arrow::Result substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf) { +std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size) { std::string typeUrl = "/substrait." + std::string(typeName); - google::protobuf::io::ArrayInputStream bufStream{buf.data(), static_cast(buf.size())}; + google::protobuf::io::ArrayInputStream bufStream{data, size}; std::string out; google::protobuf::io::StringOutputStream outStream{&out}; auto status = google::protobuf::util::BinaryToJsonStream(getGeneratedTypeResolver(), typeUrl, &bufStream, &outStream); if (!status.ok()) { - return arrow::Status::Invalid("BinaryToJsonStream returned ", status); + throw GlutenException("BinaryToJsonStream returned " + status.ToString()); } return out; } diff --git a/cpp/core/compute/ProtobufUtils.h b/cpp/core/compute/ProtobufUtils.h index 07aee914f480..fd42fffe8c48 100644 --- a/cpp/core/compute/ProtobufUtils.h +++ b/cpp/core/compute/ProtobufUtils.h @@ -17,23 +17,15 @@ #pragma once -#include -#include -#include - #include #include namespace gluten { -// Common for both projector and filters. bool parseProtobuf(const uint8_t* buf, int bufLen, google::protobuf::Message* msg); -arrow::Result> substraitFromJsonToPb(std::string_view typeName, std::string_view json); - -arrow::Result substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf); +std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json); -// Write a Protobuf message into a specified file with JSON format. -// void MessageToJSONFile(const google::protobuf::Message& message, const std::string& file_path); +std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size); } // namespace gluten diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index d8b6f4fbc534..0739bdb24c39 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -53,191 +53,6 @@ static inline jmethodID getStaticMethodId(JNIEnv* env, jclass thisClass, const c return ret; } -static inline std::shared_ptr getOffsetDataType(std::shared_ptr parentType) { - switch (parentType->id()) { - case arrow::BinaryType::type_id: - return std::make_shared::OffsetType>(); - case arrow::LargeBinaryType::type_id: - return std::make_shared::OffsetType>(); - case arrow::ListType::type_id: - return std::make_shared::OffsetType>(); - case arrow::LargeListType::type_id: - return std::make_shared::OffsetType>(); - default: - return nullptr; - } -} - -template -inline bool isFixedWidthType(T _) { - return std::is_base_of::value; -} - -static inline arrow::Status appendNodes( - std::shared_ptr column, - std::vector>* nodes) { - auto type = column->type(); - (*nodes).push_back(std::make_pair(column->length(), column->null_count())); - switch (type->id()) { - case arrow::Type::LIST: - case arrow::Type::LARGE_LIST: { - auto listArray = std::dynamic_pointer_cast(column); - RETURN_NOT_OK(appendNodes(listArray->values(), nodes)); - } break; - default: { - } break; - } - return arrow::Status::OK(); -} - -static inline arrow::Status appendBuffers( - std::shared_ptr column, - std::vector>* buffers) { - auto type = column->type(); - switch (type->id()) { - case arrow::Type::LIST: - case arrow::Type::LARGE_LIST: { - auto listArray = std::dynamic_pointer_cast(column); - (*buffers).push_back(listArray->null_bitmap()); - (*buffers).push_back(listArray->value_offsets()); - RETURN_NOT_OK(appendBuffers(listArray->values(), buffers)); - } break; - default: { - for (auto& buffer : column->data()->buffers) { - (*buffers).push_back(buffer); - } - } break; - } - return arrow::Status::OK(); -} - -static inline arrow::Status fixOffsetBuffer(std::shared_ptr* inBuf, int fixRow) { - if ((*inBuf) == nullptr || (*inBuf)->size() == 0) - return arrow::Status::OK(); - if ((*inBuf)->size() * 8 <= fixRow) { - ARROW_ASSIGN_OR_RAISE(auto valid_copy, arrow::AllocateBuffer((*inBuf)->size() + 1)); - std::memcpy(valid_copy->mutable_data(), (*inBuf)->data(), static_cast((*inBuf)->size())); - (*inBuf) = std::move(valid_copy); - } - arrow::bit_util::SetBitsTo(const_cast((*inBuf)->data()), fixRow, 1, true); - return arrow::Status::OK(); -} - -static inline arrow::Status makeArrayData( - std::shared_ptr type, - int numRows, - std::vector> inBufs, - int inBufsLen, - std::shared_ptr* arrData, - int* bufIdxPtr) { - if (arrow::is_nested(type->id())) { - // Maybe ListType, MapType, StructType or UnionType - switch (type->id()) { - case arrow::Type::LIST: - case arrow::Type::LARGE_LIST: { - auto offsetDataType = getOffsetDataType(type); - auto listType = std::dynamic_pointer_cast(type); - auto childType = listType->value_type(); - std::shared_ptr childArrayData, offsetArrayData; - // create offset array - // Chendi: For some reason, for ListArray::FromArrays will remove last - // row from offset array, refer to array_nested.cc CleanListOffsets - // function - RETURN_NOT_OK(fixOffsetBuffer(&inBufs[*bufIdxPtr], numRows)); - RETURN_NOT_OK(makeArrayData(offsetDataType, numRows + 1, inBufs, inBufsLen, &offsetArrayData, bufIdxPtr)); - auto offsetArray = arrow::MakeArray(offsetArrayData); - // create child data array - RETURN_NOT_OK(makeArrayData(childType, -1, inBufs, inBufsLen, &childArrayData, bufIdxPtr)); - auto childArray = arrow::MakeArray(childArrayData); - auto listArray = arrow::ListArray::FromArrays(*offsetArray, *childArray).ValueOrDie(); - *arrData = listArray->data(); - - } break; - default: - return arrow::Status::NotImplemented("MakeArrayData for type ", type->ToString(), " is not supported yet."); - } - - } else { - int64_t nullCount = arrow::kUnknownNullCount; - std::vector> buffers; - if (*bufIdxPtr >= inBufsLen) { - return arrow::Status::Invalid("insufficient number of in_buf_addrs"); - } - if (inBufs[*bufIdxPtr]->size() == 0) { - nullCount = 0; - } - buffers.push_back(inBufs[*bufIdxPtr]); - *bufIdxPtr += 1; - - if (arrow::is_binary_like(type->id())) { - if (*bufIdxPtr >= inBufsLen) { - return arrow::Status::Invalid("insufficient number of in_buf_addrs"); - } - - buffers.push_back(inBufs[*bufIdxPtr]); - auto offsetsSize = inBufs[*bufIdxPtr]->size(); - *bufIdxPtr += 1; - if (numRows == -1) - numRows = offsetsSize / 4 - 1; - } - - if (*bufIdxPtr >= inBufsLen) { - return arrow::Status::Invalid("insufficient number of in_buf_addrs"); - } - auto valueSize = inBufs[*bufIdxPtr]->size(); - buffers.push_back(inBufs[*bufIdxPtr]); - *bufIdxPtr += 1; - if (numRows == -1) { - numRows = valueSize * 8 / arrow::bit_width(type->id()); - } - - *arrData = arrow::ArrayData::Make(type, numRows, std::move(buffers), nullCount); - } - return arrow::Status::OK(); -} - -static inline arrow::Status makeRecordBatch( - const std::shared_ptr& schema, - int numRows, - std::vector> inBufs, - int inBufsLen, - std::shared_ptr* batch) { - std::vector> arrays; - auto numFields = schema->num_fields(); - int bufIdx = 0; - - for (int i = 0; i < numFields; i++) { - auto field = schema->field(i); - std::shared_ptr arrayData; - RETURN_NOT_OK(makeArrayData(field->type(), numRows, inBufs, inBufsLen, &arrayData, &bufIdx)); - arrays.push_back(arrayData); - } - - *batch = arrow::RecordBatch::Make(schema, numRows, arrays); - return arrow::Status::OK(); -} - -static inline arrow::Status makeRecordBatch( - const std::shared_ptr& schema, - int numRows, - int64_t* inBufAddrs, - int64_t* inBufSizes, - int inBufsLen, - std::shared_ptr* batch) { - std::vector> arrays; - std::vector> buffers; - for (int i = 0; i < inBufsLen; i++) { - if (inBufAddrs[i] != 0) { - auto data = - std::shared_ptr(new arrow::Buffer(reinterpret_cast(inBufAddrs[i]), inBufSizes[i])); - buffers.push_back(data); - } else { - buffers.push_back(std::make_shared(nullptr, 0)); - } - } - return makeRecordBatch(schema, numRows, buffers, inBufsLen, batch); -} - static inline std::string jStringToCString(JNIEnv* env, jstring string) { int32_t jlen, clen; clen = env->GetStringUTFLength(string); @@ -292,63 +107,6 @@ static inline arrow::Result getCompressionType(JNIEnv* return compression_type; } -static inline arrow::Status decompressBuffer( - const arrow::Buffer& buffer, - arrow::util::Codec* codec, - std::shared_ptr* out, - arrow::MemoryPool* pool) { - const uint8_t* data = buffer.data(); - int64_t compressedSize = buffer.size() - sizeof(int64_t); - int64_t uncompressedSize = arrow::bit_util::FromLittleEndian(arrow::util::SafeLoadAs(data)); - ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressedSize, pool)); - - int64_t actualDecompressed; - ARROW_ASSIGN_OR_RAISE( - actualDecompressed, - codec->Decompress(compressedSize, data + sizeof(int64_t), uncompressedSize, uncompressed->mutable_data())); - if (actualDecompressed != uncompressedSize) { - return arrow::Status::Invalid( - "Failed to fully decompress buffer, expected ", - uncompressedSize, - " bytes but decompressed ", - actualDecompressed); - } - *out = std::move(uncompressed); - return arrow::Status::OK(); -} - -static inline arrow::Status decompressBuffers( - arrow::Compression::type compression, - const arrow::ipc::IpcReadOptions& options, - const uint8_t* bufMask, - std::vector>& buffers, - const std::vector>& schemaFields) { - std::unique_ptr codec; - ARROW_ASSIGN_OR_RAISE(codec, arrow::util::Codec::Create(compression)); - - auto decompressOne = [&buffers, &bufMask, &codec, &options](int i) { - if (buffers[i] == nullptr || buffers[i]->size() == 0) { - return arrow::Status::OK(); - } - // if the buffer has been rebuilt to uncompressed on java side, return - if (arrow::bit_util::GetBit(bufMask, i)) { - ARROW_ASSIGN_OR_RAISE(auto valid_copy, buffers[i]->CopySlice(0, buffers[i]->size())); - buffers[i] = valid_copy; - return arrow::Status::OK(); - } - - if (buffers[i]->size() < 8) { - return arrow::Status::Invalid( - "Likely corrupted message, compressed buffers " - "are larger than 8 bytes by construction"); - } - RETURN_NOT_OK(decompressBuffer(*buffers[i], codec.get(), &buffers[i], options.memory_pool)); - return arrow::Status::OK(); - }; - - return ::arrow::internal::OptionalParallelFor(options.use_threads, static_cast(buffers.size()), decompressOne); -} - static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) { int getEnvStat = vm->GetEnv(reinterpret_cast(out), jniVersion); if (getEnvStat == JNI_EDETACHED) { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 62400d5289ae..4ffca7a3a12b 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -353,9 +353,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKerne } auto backend = gluten::createBackend(); - if (!backend->parsePlan(planData, planSize, stageId, partitionId, taskId)) { - gluten::jniThrow("Failed to parse plan."); - } + backend->parsePlan(planData, planSize, stageId, partitionId, taskId); auto confs = getConfMap(env, confArr); diff --git a/cpp/core/shuffle/ArrowShuffleWriter.cc b/cpp/core/shuffle/ArrowShuffleWriter.cc index 9405b8a9b5ea..74d9d3d6b15e 100644 --- a/cpp/core/shuffle/ArrowShuffleWriter.cc +++ b/cpp/core/shuffle/ArrowShuffleWriter.cc @@ -34,6 +34,7 @@ #include "utils/compression.h" #include "utils/macros.h" +using namespace arrow; namespace gluten { using arrow::internal::checked_cast; diff --git a/cpp/core/shuffle/ArrowShuffleWriter.h b/cpp/core/shuffle/ArrowShuffleWriter.h index c50299629ba7..4dabdb019bb2 100644 --- a/cpp/core/shuffle/ArrowShuffleWriter.h +++ b/cpp/core/shuffle/ArrowShuffleWriter.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include diff --git a/cpp/core/tests/BackendTest.cc b/cpp/core/tests/BackendTest.cc index 455c4abeac8f..751570f42709 100644 --- a/cpp/core/tests/BackendTest.cc +++ b/cpp/core/tests/BackendTest.cc @@ -18,6 +18,7 @@ #include "compute/Backend.h" #include +#include "TestUtils.h" namespace gluten { @@ -41,20 +42,12 @@ class DummyBackend final : public Backend { } hasNext_ = false; - std::unique_ptr tmp; - std::unique_ptr builder; - std::shared_ptr array; - GLUTEN_THROW_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(), arrow::float64(), &tmp)); - builder.reset(arrow::internal::checked_cast(tmp.release())); - - GLUTEN_THROW_NOT_OK(builder->Append(1000)); - GLUTEN_THROW_NOT_OK(builder->Finish(&array)); - std::vector> retTypes = {arrow::field("res", arrow::float64())}; - auto batch = arrow::RecordBatch::Make(arrow::schema(retTypes), 1, {array}); - std::unique_ptr cSchema = std::make_unique(); - std::unique_ptr cArray = std::make_unique(); - GLUTEN_THROW_NOT_OK(arrow::ExportRecordBatch(*batch, cArray.get(), cSchema.get())); - return std::make_shared(std::move(cSchema), std::move(cArray)); + auto fArrInt32 = arrow::field("f_int32", arrow::int32()); + auto rbSchema = arrow::schema({fArrInt32}); + const std::vector inputDataArr = {R"([1, 2,3])"}; + std::shared_ptr inputBatchArr; + makeInputBatch(inputDataArr, rbSchema, &inputBatchArr); + return std::make_shared(inputBatchArr); } private: diff --git a/cpp/core/utils/exception.h b/cpp/core/utils/exception.h index 7a876e013a9d..0f97d81f7f5b 100644 --- a/cpp/core/utils/exception.h +++ b/cpp/core/utils/exception.h @@ -37,6 +37,19 @@ #define GLUTEN_ASSIGN_OR_THROW(lhs, rexpr) \ GLUTEN_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr); +#define GLUTEN_CHECK(expr, errMessage) \ + do { \ + if (UNLIKELY(!(expr))) { \ + throw gluten::GlutenException(errMessage); \ + } \ + } while (0) + +#ifndef NDEBUG +#define GLUTEN_DCHECK(expr, errMessage) GLUTEN_CHECK(expr, errMessage) +#else +#define GLUTEN_DCHECK(expr, errMessage) GLUTEN_CHECK(true) +#endif + namespace gluten { class GlutenException final : public std::runtime_error { diff --git a/cpp/velox/benchmarks/BenchmarkUtils.cc b/cpp/velox/benchmarks/BenchmarkUtils.cc index 7f9a25577d12..03b68ebb1981 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/BenchmarkUtils.cc @@ -46,15 +46,15 @@ void initVeloxBackend() { initVeloxBackend(bmConfMap); } -arrow::Result> getPlanFromFile(const std::string& filePath) { +std::string getPlanFromFile(const std::string& filePath) { // Read json file and resume the binary data. std::ifstream msgJson(filePath); std::stringstream buffer; buffer << msgJson.rdbuf(); std::string msgData = buffer.str(); - auto maybePlan = gluten::substraitFromJsonToPb("Plan", msgData); - return maybePlan; + return gluten::substraitFromJsonToPb("Plan", msgData); + ; } std::shared_ptr getSplitInfos( diff --git a/cpp/velox/benchmarks/BenchmarkUtils.h b/cpp/velox/benchmarks/BenchmarkUtils.h index afac0e7312aa..ecdc1f3564d5 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/BenchmarkUtils.h @@ -20,7 +20,7 @@ #include #include -#include +// #include #include #include #include @@ -31,6 +31,7 @@ #include #include "compute/ProtobufUtils.h" +#include "utils/exception.h" #include "velox/common/memory/Memory.h" DECLARE_bool(print_result); @@ -56,7 +57,7 @@ inline std::string getExampleFilePath(const std::string& fileName) { } // Get the location of a file generated by Java unittest. -inline arrow::Result getGeneratedFilePath(const std::string& fileName) { +inline std::string getGeneratedFilePath(const std::string& fileName) { std::string currentPath = std::filesystem::current_path().c_str(); auto generatedFilePath = currentPath + "/../../../../backends-velox/generated-native-benchmark/"; std::filesystem::directory_entry filePath{generatedFilePath + fileName}; @@ -74,11 +75,11 @@ inline arrow::Result getGeneratedFilePath(const std::string& fileNa } } } - return arrow::Status::Invalid("Could not get generated file from given path: " + fileName); + throw gluten::GlutenException("Could not get generated file from given path: " + fileName); } /// Read binary data from a json file. -arrow::Result> getPlanFromFile(const std::string& filePath); +std::string getPlanFromFile(const std::string& filePath); /// Get the file paths, starts, lengths from a directory. /// Use fileFormat to specify the format to read, eg., orc, parquet. diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 9e90a6f43d30..ace2b01ed43d 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -55,13 +55,7 @@ auto BM_Generic = [](::benchmark::State& state, setCpu(state.thread_index()); } const auto& filePath = getExampleFilePath(substraitJsonFile); - auto maybePlan = getPlanFromFile(filePath); - if (!maybePlan.ok()) { - state.SkipWithError(maybePlan.status().message().c_str()); - return; - } - auto plan = std::move(maybePlan).ValueOrDie(); - + auto plan = getPlanFromFile(filePath); auto startTime = std::chrono::steady_clock::now(); int64_t collectBatchTime = 0; @@ -80,7 +74,7 @@ auto BM_Generic = [](::benchmark::State& state, }); } - backend->parsePlan(plan->data(), plan->size()); + backend->parsePlan(reinterpret_cast(plan.data()), plan.size()); auto resultIter = backend->getResultIterator( gluten::defaultMemoryAllocator().get(), "/tmp/test-spill", std::move(inputIters), conf); auto veloxPlan = std::dynamic_pointer_cast(backend)->getVeloxPlan(); @@ -227,9 +221,9 @@ int main(int argc, char** argv) { << std::endl; std::cout << "Running example..." << std::endl; inputFiles.resize(2); - GLUTEN_ASSIGN_OR_THROW(substraitJsonFile, getGeneratedFilePath("example.json")); - GLUTEN_ASSIGN_OR_THROW(inputFiles[0], getGeneratedFilePath("example_orders")); - GLUTEN_ASSIGN_OR_THROW(inputFiles[1], getGeneratedFilePath("example_lineitem")); + substraitJsonFile = getGeneratedFilePath("example.json"); + inputFiles[0] = getGeneratedFilePath("example_orders"); + inputFiles[1] = getGeneratedFilePath("example_lineitem"); } else { substraitJsonFile = argv[1]; abortIfFileNotExists(substraitJsonFile); diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index fbbfe34e9eca..617a1b8c92d8 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -60,12 +60,7 @@ auto bm = [](::benchmark::State& state, const std::string& jsonFile, const std::string& fileFormat) { const auto& filePath = getFilePath("plan/" + jsonFile); - auto maybePlan = getPlanFromFile(filePath); - if (!maybePlan.ok()) { - state.SkipWithError(maybePlan.status().message().c_str()); - return; - } - auto plan = std::move(maybePlan).ValueOrDie(); + auto plan = getPlanFromFile(filePath); std::vector> scanInfos; scanInfos.reserve(datasetPaths.size()); @@ -78,7 +73,7 @@ auto bm = [](::benchmark::State& state, auto backend = std::dynamic_pointer_cast(gluten::createBackend()); state.ResumeTiming(); - backend->parsePlan(plan->data(), plan->size()); + backend->parsePlan(reinterpret_cast(plan.data()), plan.size()); auto resultIter = getResultIterator(gluten::defaultMemoryAllocator().get(), backend, scanInfos); auto veloxPlan = std::dynamic_pointer_cast(backend)->getVeloxPlan(); auto outputSchema = getOutputSchema(veloxPlan); diff --git a/cpp/velox/tests/OrcTest.cc b/cpp/velox/tests/OrcTest.cc index 3e0c3ca279dc..96dec2067cb3 100644 --- a/cpp/velox/tests/OrcTest.cc +++ b/cpp/velox/tests/OrcTest.cc @@ -100,8 +100,8 @@ arrow::Status parquet2Orc(unsigned index, const std::string& parquetFile, const void testWriteOrc() { std::vector inputFiles(kFileNum); - GLUTEN_ASSIGN_OR_THROW(inputFiles[0], getGeneratedFilePath("example_orders")); - GLUTEN_ASSIGN_OR_THROW(inputFiles[1], getGeneratedFilePath("example_lineitem")); + inputFiles[0] = getGeneratedFilePath("example_orders"); + inputFiles[1] = getGeneratedFilePath("example_lineitem"); ASSERT_EQ(inputFiles.size(), orcTestData.entries.size()); From 19af9a953f970ce27a95cb1b8958626645996929 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 11 May 2023 18:02:30 +0000 Subject: [PATCH 2/4] remove unused arrow code --- cpp/velox/CMakeLists.txt | 1 - cpp/velox/benchmarks/BenchmarkUtils.cc | 8 - cpp/velox/benchmarks/BenchmarkUtils.h | 7 +- cpp/velox/benchmarks/GenericBenchmark.cc | 3 +- cpp/velox/benchmarks/QueryBenchmark.cc | 5 +- cpp/velox/compute/ArrowTypeUtils.cc | 39 ++-- cpp/velox/compute/ArrowTypeUtils.h | 3 - cpp/velox/compute/VeloxBackend.cc | 1 - cpp/velox/compute/VeloxBridge.cc | 228 ----------------------- cpp/velox/compute/VeloxBridge.h | 16 -- cpp/velox/compute/VeloxPlanConverter.cc | 1 - 11 files changed, 17 insertions(+), 295 deletions(-) delete mode 100644 cpp/velox/compute/VeloxBridge.cc delete mode 100644 cpp/velox/compute/VeloxBridge.h diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 1e9e14a7ddc1..b737bbc001a8 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -204,7 +204,6 @@ set(VELOX_SRCS compute/VeloxPlanConverter.cc compute/VeloxRowToColumnarConverter.cc compute/VeloxParquetDatasource.cc - compute/VeloxBridge.cc memory/VeloxMemoryPool.cc memory/VeloxColumnarBatch.cc ) diff --git a/cpp/velox/benchmarks/BenchmarkUtils.cc b/cpp/velox/benchmarks/BenchmarkUtils.cc index 03b68ebb1981..8e746219d343 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/BenchmarkUtils.cc @@ -102,14 +102,6 @@ void abortIfFileNotExists(const std::string& filepath) { } } -std::shared_ptr getOutputSchema(std::shared_ptr planNode) { - ArrowSchema arrowSchema{}; - exportToArrow( - velox::BaseVector::create(planNode->outputType(), 0, gluten::getDefaultVeloxLeafMemoryPool().get()), arrowSchema); - GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema)); - return outputSchema; -} - bool endsWith(const std::string& data, const std::string& suffix) { return data.find(suffix, data.size() - suffix.size()) != std::string::npos; } diff --git a/cpp/velox/benchmarks/BenchmarkUtils.h b/cpp/velox/benchmarks/BenchmarkUtils.h index ecdc1f3564d5..38221ff61186 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/BenchmarkUtils.h @@ -19,9 +19,6 @@ #include -#include -// #include -#include #include #include #include @@ -95,6 +92,4 @@ void abortIfFileNotExists(const std::string& filepath); /// Return whether the data ends with suffix. bool endsWith(const std::string& data, const std::string& suffix); -void setCpu(uint32_t cpuindex); - -std::shared_ptr getOutputSchema(std::shared_ptr plan); +void setCpu(uint32_t cpuindex); \ No newline at end of file diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index ace2b01ed43d..d7b50e679a05 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -28,6 +28,7 @@ #include "BatchStreamIterator.h" #include "BatchVectorIterator.h" #include "BenchmarkUtils.h" +#include "compute/ArrowTypeUtils.h" #include "compute/VeloxBackend.h" #include "compute/VeloxPlanConverter.h" #include "config/GlutenConfig.h" @@ -78,7 +79,7 @@ auto BM_Generic = [](::benchmark::State& state, auto resultIter = backend->getResultIterator( gluten::defaultMemoryAllocator().get(), "/tmp/test-spill", std::move(inputIters), conf); auto veloxPlan = std::dynamic_pointer_cast(backend)->getVeloxPlan(); - auto outputSchema = getOutputSchema(veloxPlan); + auto outputSchema = toArrowSchema(veloxPlan->outputType()); ArrowWriter writer{FLAGS_write_file}; state.PauseTiming(); if (!FLAGS_write_file.empty()) { diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 617a1b8c92d8..655ddae19970 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -16,9 +16,10 @@ */ #include -#include #include "BenchmarkUtils.h" +#include "compute/ArrowTypeUtils.h" +#include "compute/VeloxBackend.h" #include "compute/VeloxPlanConverter.h" using namespace facebook; @@ -76,7 +77,7 @@ auto bm = [](::benchmark::State& state, backend->parsePlan(reinterpret_cast(plan.data()), plan.size()); auto resultIter = getResultIterator(gluten::defaultMemoryAllocator().get(), backend, scanInfos); auto veloxPlan = std::dynamic_pointer_cast(backend)->getVeloxPlan(); - auto outputSchema = getOutputSchema(veloxPlan); + auto outputSchema = toArrowSchema(veloxPlan->outputType()); while (resultIter->hasNext()) { auto array = resultIter->next()->exportArrowArray(); auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema); diff --git a/cpp/velox/compute/ArrowTypeUtils.cc b/cpp/velox/compute/ArrowTypeUtils.cc index 854aa59b5d1a..03b0194a7af0 100644 --- a/cpp/velox/compute/ArrowTypeUtils.cc +++ b/cpp/velox/compute/ArrowTypeUtils.cc @@ -15,41 +15,24 @@ * limitations under the License. */ +#include +#include + #include "ArrowTypeUtils.h" +#include "memory/VeloxMemoryPool.h" +#include "utils/exception.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/arrow/Bridge.h" using namespace facebook; namespace gluten { -std::shared_ptr toArrowType(const velox::TypePtr& type) { - switch (type->kind()) { - case velox::TypeKind::INTEGER: - return arrow::int32(); - case velox::TypeKind::BIGINT: - return arrow::int64(); - case velox::TypeKind::REAL: - return arrow::float32(); - case velox::TypeKind::DOUBLE: - return arrow::float64(); - case velox::TypeKind::VARCHAR: - return arrow::utf8(); - case velox::TypeKind::VARBINARY: - return arrow::utf8(); - case velox::TypeKind::TIMESTAMP: - return arrow::timestamp(arrow::TimeUnit::MICRO); - default: - throw std::runtime_error("Type conversion is not supported."); - } -} - std::shared_ptr toArrowSchema(const std::shared_ptr& rowType) { - std::vector> fields; - auto size = rowType->size(); - fields.reserve(size); - for (auto i = 0; i < size; ++i) { - fields.push_back(arrow::field(rowType->nameOf(i), toArrowType(rowType->childAt(i)))); - } - return arrow::schema(fields); + ArrowSchema arrowSchema{}; + exportToArrow(velox::BaseVector::create(rowType, 0, getDefaultVeloxLeafMemoryPool().get()), arrowSchema); + GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema)); + return outputSchema; } } // namespace gluten \ No newline at end of file diff --git a/cpp/velox/compute/ArrowTypeUtils.h b/cpp/velox/compute/ArrowTypeUtils.h index 5b62141a073c..acf6bfc78fb2 100644 --- a/cpp/velox/compute/ArrowTypeUtils.h +++ b/cpp/velox/compute/ArrowTypeUtils.h @@ -18,14 +18,11 @@ #pragma once #include -#include #include "velox/type/Type.h" namespace gluten { -std::shared_ptr toArrowType(const facebook::velox::TypePtr& type); - std::shared_ptr toArrowSchema(const std::shared_ptr& rowType); } // namespace gluten diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 6aedaa01873b..9931dca3dc1f 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -19,7 +19,6 @@ #include #include "ArrowTypeUtils.h" -#include "VeloxBridge.h" #include "arrow/c/bridge.h" #include "compute/Backend.h" #include "compute/ResultIterator.h" diff --git a/cpp/velox/compute/VeloxBridge.cc b/cpp/velox/compute/VeloxBridge.cc deleted file mode 100644 index c1f2bad1ebd3..000000000000 --- a/cpp/velox/compute/VeloxBridge.cc +++ /dev/null @@ -1,228 +0,0 @@ - -#include "VeloxBridge.h" - -#include -#include -#include -#include -#include -#include - -#include "arrow/array.h" -#include "arrow/buffer.h" -#include "arrow/c/helpers.h" -#include "arrow/extension_type.h" -#include "arrow/memory_pool.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" -#include "arrow/stl_allocator.h" -#include "arrow/type_traits.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/checked_cast.h" -#include "arrow/util/key_value_metadata.h" -#include "arrow/util/logging.h" -#include "arrow/util/macros.h" -#include "arrow/util/small_vector.h" -#include "arrow/util/value_parsing.h" -#include "arrow/visit_type_inline.h" - -namespace gluten { - -namespace { - -class ExportedArrayStreamByArray { - public: - struct PrivateData { - explicit PrivateData(std::shared_ptr reader, std::shared_ptr schema) - : reader_(std::move(reader)), schema_(schema) {} - - std::shared_ptr reader_; - std::string last_error_; - std::shared_ptr schema_; - - PrivateData() = default; - ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData); - }; - - explicit ExportedArrayStreamByArray(struct ArrowArrayStream* stream) : stream_(stream) {} - - arrow::Status getSchema(struct ArrowSchema* outSchema) { - return ExportSchema(*schema(), outSchema); - } - - arrow::Status getNext(struct ArrowArray* outArray) { - std::shared_ptr array; - RETURN_NOT_OK(reader()->Next().Value(&array)); - if (array == nullptr) { - // End of stream - ArrowArrayMarkReleased(outArray); - } else { - ArrowArrayMove(array.get(), outArray); - } - return arrow::Status::OK(); - } - - const char* getLastError() { - const auto& lastError = privateData()->last_error_; - return lastError.empty() ? nullptr : lastError.c_str(); - } - - void release() { - if (ArrowArrayStreamIsReleased(stream_)) { - return; - } - DCHECK_NE(privateData(), nullptr); - delete privateData(); - - ArrowArrayStreamMarkReleased(stream_); - } - - // C-compatible callbacks - - static int staticGetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* outSchema) { - ExportedArrayStreamByArray self{stream}; - return self.toCError(self.getSchema(outSchema)); - } - - static int staticGetNext(struct ArrowArrayStream* stream, struct ArrowArray* outArray) { - ExportedArrayStreamByArray self{stream}; - return self.toCError(self.getNext(outArray)); - } - - static void staticRelease(struct ArrowArrayStream* stream) { - ExportedArrayStreamByArray{stream}.release(); - } - - static const char* staticGetLastError(struct ArrowArrayStream* stream) { - return ExportedArrayStreamByArray{stream}.getLastError(); - } - - private: - int toCError(const arrow::Status& status) { - if (ARROW_PREDICT_TRUE(status.ok())) { - privateData()->last_error_.clear(); - return 0; - } - privateData()->last_error_ = status.ToString(); - switch (status.code()) { - case arrow::StatusCode::IOError: - return EIO; - case arrow::StatusCode::NotImplemented: - return ENOSYS; - case arrow::StatusCode::OutOfMemory: - return ENOMEM; - default: - return EINVAL; // Fallback for Invalid, TypeError, etc. - } - } - - PrivateData* privateData() { - return reinterpret_cast(stream_->private_data); - } - - const std::shared_ptr reader() { - return privateData()->reader_; - } - - const std::shared_ptr schema() { - return privateData()->schema_; - } - - struct ArrowArrayStream* stream_; -}; - -} // namespace - -arrow::Status exportArrowArray( - std::shared_ptr schema, - std::shared_ptr reader, - struct ArrowArrayStream* out) { - out->get_schema = ExportedArrayStreamByArray::staticGetSchema; - out->get_next = ExportedArrayStreamByArray::staticGetNext; - out->get_last_error = ExportedArrayStreamByArray::staticGetLastError; - out->release = ExportedArrayStreamByArray::staticRelease; - out->private_data = new ExportedArrayStreamByArray::PrivateData{std::move(reader), std::move(schema)}; - return arrow::Status::OK(); -} - -////////////////////////////////////////////////////////////////////////// -// C stream import - -// namespace { - -// class ArrayStreamArrayReader : public ArrowArray { -// public: -// explicit ArrayStreamArrayReader(struct ArrowArrayStream* stream) { -// ArrowArrayStreamMove(stream, &stream_); -// DCHECK(!ArrowArrayStreamIsReleased(&stream_)); -// } - -// ~ArrayStreamArrayReader() { -// ArrowArrayStreamRelease(&stream_); -// DCHECK(ArrowArrayStreamIsReleased(&stream_)); -// } - -// std::shared_ptr schema() const override { return CacheSchema(); } - -// Status ReadNext(std::shared_ptr* batch) override { -// struct ArrowArray c_array; -// RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array))); -// if (ArrowArrayIsReleased(&c_array)) { -// // End of stream -// batch->reset(); -// return Status::OK(); -// } else { -// return ImportRecordBatch(&c_array, CacheSchema()).Value(batch); -// } -// } - -// private: -// std::shared_ptr CacheSchema() const { -// if (!schema_) { -// struct ArrowSchema c_schema; -// ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, -// &c_schema))); schema_ = ImportSchema(&c_schema).ValueOrDie(); -// } -// return schema_; -// } - -// Status StatusFromCError(int errno_like) const { -// if (ARROW_PREDICT_TRUE(errno_like == 0)) { -// return Status::OK(); -// } -// StatusCode code; -// switch (errno_like) { -// case EDOM: -// case EINVAL: -// case ERANGE: -// code = StatusCode::Invalid; -// break; -// case ENOMEM: -// code = StatusCode::OutOfMemory; -// break; -// case ENOSYS: -// code = StatusCode::NotImplemented; -// default: -// code = StatusCode::IOError; -// break; -// } -// const char* last_error = stream_.get_last_error(&stream_); -// return Status(code, last_error ? std::string(last_error) : ""); -// } - -// mutable struct ArrowArrayStream stream_; -// mutable std::shared_ptr schema_; -// }; - -// } // namespace - -// Result> ImportArrowArray( -// struct ArrowArrayStream* stream) { -// if (ArrowArrayStreamIsReleased(stream)) { -// return Status::Invalid("Cannot import released ArrowArrayStream"); -// } -// // XXX should we call get_schema() here to avoid crashing on error? -// return std::make_shared(stream); -// } - -} // namespace gluten diff --git a/cpp/velox/compute/VeloxBridge.h b/cpp/velox/compute/VeloxBridge.h deleted file mode 100644 index d7d2dc3749e2..000000000000 --- a/cpp/velox/compute/VeloxBridge.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include "arrow/c/bridge.h" -#include "arrow/util/iterator.h" - -namespace gluten { - -using ArrowArrayIterator = arrow::Iterator>; - -ARROW_EXPORT -arrow::Status exportArrowArray( - std::shared_ptr schema, - std::shared_ptr reader, - struct ArrowArrayStream* out); - -} // namespace gluten diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index d2c85bcad129..47e9b4be195a 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -19,7 +19,6 @@ #include #include "ArrowTypeUtils.h" -#include "VeloxBridge.h" #include "arrow/c/bridge.h" #include "compute/ResultIterator.h" #include "compute/RowVectorStream.h" From 2b7682ae6cbaac76ea4679a58d49cbe699ca7bce Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 15 May 2023 09:27:42 +0000 Subject: [PATCH 3/4] refactor schema --- cpp/core/compute/Backend.h | 2 +- cpp/core/jni/JniWrapper.cc | 3 +-- cpp/core/operators/writer/Datasource.h | 9 +++------ cpp/velox/benchmarks/BenchmarkUtils.cc | 2 +- cpp/velox/benchmarks/BenchmarkUtils.h | 3 +-- cpp/velox/benchmarks/GenericBenchmark.cc | 4 +++- cpp/velox/benchmarks/QueryBenchmark.cc | 3 ++- cpp/velox/compute/ArrowTypeUtils.cc | 9 ++++++--- cpp/velox/compute/ArrowTypeUtils.h | 2 ++ cpp/velox/compute/VeloxColumnarToRowConverter.cc | 1 + cpp/velox/compute/VeloxInitializer.cc | 1 + cpp/velox/compute/VeloxInitializer.h | 4 +++- cpp/velox/compute/VeloxParquetDatasource.cc | 4 ++-- cpp/velox/compute/VeloxParquetDatasource.h | 2 +- cpp/velox/compute/WholeStageResultIterator.cc | 1 + cpp/velox/compute/WholeStageResultIterator.h | 2 -- 16 files changed, 29 insertions(+), 23 deletions(-) diff --git a/cpp/core/compute/Backend.h b/cpp/core/compute/Backend.h index b5d66b0e2b60..36ab41a83cde 100644 --- a/cpp/core/compute/Backend.h +++ b/cpp/core/compute/Backend.h @@ -104,7 +104,7 @@ class Backend : public std::enable_shared_from_this { virtual std::shared_ptr getDatasource(const std::string& filePath, const std::string& fileName, std::shared_ptr schema) { - return std::make_shared(filePath, fileName, schema); + throw GlutenException("Not implement getDatasource"); } std::unordered_map getConfMap() { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 4ffca7a3a12b..4b596d915e4c 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -999,8 +999,7 @@ Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper jlong cSchema) { JNI_METHOD_START auto datasource = glutenDatasourceHolder.lookup(instanceId); - auto schema = datasource->inspectSchema(); - GLUTEN_THROW_NOT_OK(arrow::ExportSchema(*schema.get(), reinterpret_cast(cSchema))); + datasource->inspectSchema(reinterpret_cast(cSchema)); JNI_METHOD_END() } diff --git a/cpp/core/operators/writer/Datasource.h b/cpp/core/operators/writer/Datasource.h index ecbf5f014ab3..f79ed18e4ca5 100644 --- a/cpp/core/operators/writer/Datasource.h +++ b/cpp/core/operators/writer/Datasource.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -36,14 +37,10 @@ class Datasource { virtual ~Datasource() = default; virtual void init(const std::unordered_map& sparkConfs) {} - virtual std::shared_ptr inspectSchema() { - return nullptr; - } + virtual void inspectSchema(struct ArrowSchema* out) = 0; virtual void write(const std::shared_ptr& cb) {} virtual void close() {} - virtual std::shared_ptr getSchema() { - return nullptr; - } + virtual std::shared_ptr getSchema() = 0; private: std::string filePath_; diff --git a/cpp/velox/benchmarks/BenchmarkUtils.cc b/cpp/velox/benchmarks/BenchmarkUtils.cc index 8e746219d343..32aa3c06ab7c 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/BenchmarkUtils.cc @@ -16,7 +16,7 @@ */ #include "BenchmarkUtils.h" - +#include "compute/VeloxBackend.h" #include "compute/VeloxInitializer.h" #include "config/GlutenConfig.h" #include "velox/dwio/common/Options.h" diff --git a/cpp/velox/benchmarks/BenchmarkUtils.h b/cpp/velox/benchmarks/BenchmarkUtils.h index 38221ff61186..3a2a9c320bae 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/BenchmarkUtils.h @@ -18,18 +18,17 @@ #pragma once #include - #include #include #include #include - #include #include #include "compute/ProtobufUtils.h" #include "utils/exception.h" #include "velox/common/memory/Memory.h" +#include "velox/dwio/common/tests/utils/DataFiles.h" DECLARE_bool(print_result); DECLARE_string(write_file); diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index d7b50e679a05..d9c86ae59968 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -79,7 +79,9 @@ auto BM_Generic = [](::benchmark::State& state, auto resultIter = backend->getResultIterator( gluten::defaultMemoryAllocator().get(), "/tmp/test-spill", std::move(inputIters), conf); auto veloxPlan = std::dynamic_pointer_cast(backend)->getVeloxPlan(); - auto outputSchema = toArrowSchema(veloxPlan->outputType()); + ArrowSchema cSchema; + toArrowSchema(veloxPlan->outputType(), &cSchema); + GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema)); ArrowWriter writer{FLAGS_write_file}; state.PauseTiming(); if (!FLAGS_write_file.empty()) { diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc index 655ddae19970..edcaa65f8809 100644 --- a/cpp/velox/benchmarks/QueryBenchmark.cc +++ b/cpp/velox/benchmarks/QueryBenchmark.cc @@ -16,10 +16,11 @@ */ #include +// Because we should include velox Abi.h first, otherwise it will conflicts with arrow abi.h +#include #include "BenchmarkUtils.h" #include "compute/ArrowTypeUtils.h" -#include "compute/VeloxBackend.h" #include "compute/VeloxPlanConverter.h" using namespace facebook; diff --git a/cpp/velox/compute/ArrowTypeUtils.cc b/cpp/velox/compute/ArrowTypeUtils.cc index 03b0194a7af0..41f0df0bc3c5 100644 --- a/cpp/velox/compute/ArrowTypeUtils.cc +++ b/cpp/velox/compute/ArrowTypeUtils.cc @@ -28,11 +28,14 @@ using namespace facebook; namespace gluten { +void toArrowSchema(const std::shared_ptr& rowType, struct ArrowSchema* out) { + exportToArrow(velox::BaseVector::create(rowType, 0, getDefaultVeloxLeafMemoryPool().get()), *out); +} + std::shared_ptr toArrowSchema(const std::shared_ptr& rowType) { - ArrowSchema arrowSchema{}; - exportToArrow(velox::BaseVector::create(rowType, 0, getDefaultVeloxLeafMemoryPool().get()), arrowSchema); + ArrowSchema arrowSchema; + toArrowSchema(rowType, &arrowSchema); GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema)); return outputSchema; } - } // namespace gluten \ No newline at end of file diff --git a/cpp/velox/compute/ArrowTypeUtils.h b/cpp/velox/compute/ArrowTypeUtils.h index acf6bfc78fb2..a1601725845b 100644 --- a/cpp/velox/compute/ArrowTypeUtils.h +++ b/cpp/velox/compute/ArrowTypeUtils.h @@ -23,6 +23,8 @@ namespace gluten { +void toArrowSchema(const std::shared_ptr& rowType, struct ArrowSchema* out); + std::shared_ptr toArrowSchema(const std::shared_ptr& rowType); } // namespace gluten diff --git a/cpp/velox/compute/VeloxColumnarToRowConverter.cc b/cpp/velox/compute/VeloxColumnarToRowConverter.cc index 5370950d5cb4..f0bd4381266e 100644 --- a/cpp/velox/compute/VeloxColumnarToRowConverter.cc +++ b/cpp/velox/compute/VeloxColumnarToRowConverter.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include diff --git a/cpp/velox/compute/VeloxInitializer.cc b/cpp/velox/compute/VeloxInitializer.cc index 882cad4de604..e5218481f0b5 100644 --- a/cpp/velox/compute/VeloxInitializer.cc +++ b/cpp/velox/compute/VeloxInitializer.cc @@ -31,6 +31,7 @@ #include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h" #endif #include "velox/common/memory/MmapAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/exec/Operator.h" diff --git a/cpp/velox/compute/VeloxInitializer.h b/cpp/velox/compute/VeloxInitializer.h index 487503af64d6..ab7a53933c66 100644 --- a/cpp/velox/compute/VeloxInitializer.h +++ b/cpp/velox/compute/VeloxInitializer.h @@ -24,7 +24,7 @@ #include #include "VeloxColumnarToRowConverter.h" -#include "WholeStageResultIterator.h" +#include "velox/common/caching/AsyncDataCache.h" namespace gluten { @@ -60,6 +60,8 @@ class VeloxInitializer { std::unique_ptr ssdCacheExecutor_; std::unique_ptr ioExecutor_; + + const std::string kHiveConnectorId = "test-hive"; const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled"; // memory cache diff --git a/cpp/velox/compute/VeloxParquetDatasource.cc b/cpp/velox/compute/VeloxParquetDatasource.cc index c8535978054d..1a72f83ab5f6 100644 --- a/cpp/velox/compute/VeloxParquetDatasource.cc +++ b/cpp/velox/compute/VeloxParquetDatasource.cc @@ -107,7 +107,7 @@ void VeloxParquetDatasource::init(const std::unordered_map(std::move(sink), *(pool_), 2048, properities, queryCtx); } -std::shared_ptr VeloxParquetDatasource::inspectSchema() { +void VeloxParquetDatasource::inspectSchema(struct ArrowSchema* out) { velox::dwio::common::ReaderOptions readerOptions(pool_.get()); auto format = velox::dwio::common::FileFormat::PARQUET; readerOptions.setFileFormat(format); @@ -122,7 +122,7 @@ std::shared_ptr VeloxParquetDatasource::inspectSchema() { std::make_unique( std::make_shared(readFile), *pool_.get()), readerOptions); - return toArrowSchema(reader->rowType()); + toArrowSchema(reader->rowType(), out); } void VeloxParquetDatasource::close() { diff --git a/cpp/velox/compute/VeloxParquetDatasource.h b/cpp/velox/compute/VeloxParquetDatasource.h index 11b1f22ccc48..f2cec14f9952 100644 --- a/cpp/velox/compute/VeloxParquetDatasource.h +++ b/cpp/velox/compute/VeloxParquetDatasource.h @@ -51,7 +51,7 @@ class VeloxParquetDatasource final : public Datasource { : Datasource(filePath, fileName, schema), filePath_(filePath), fileName_(fileName), schema_(schema) {} void init(const std::unordered_map& sparkConfs) override; - std::shared_ptr inspectSchema() override; + void inspectSchema(struct ArrowSchema* out) override; void write(const std::shared_ptr& cb) override; void close() override; std::shared_ptr getSchema() override { diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 90f71265ca04..8dec309b8a03 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -14,6 +14,7 @@ namespace gluten { namespace { // Velox configs +const std::string kHiveConnectorId = "test-hive"; const std::string kSpillEnabled = "spark.gluten.sql.columnar.backend.velox.spillEnabled"; const std::string kAggregationSpillEnabled = "spark.gluten.sql.columnar.backend.velox.aggregationSpillEnabled"; const std::string kJoinSpillEnabled = "spark.gluten.sql.columnar.backend.velox.joinSpillEnabled"; diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index bd52d113565a..8b40a3ce6711 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -10,8 +10,6 @@ namespace gluten { -static const std::string kHiveConnectorId = "test-hive"; - class WholeStageResultIterator : public ColumnarBatchIterator { public: WholeStageResultIterator( From cf7f5dcd95d62445028a17bcbf276fd6ce2995be Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 15 May 2023 16:18:00 +0000 Subject: [PATCH 4/4] fix code style --- cpp/velox/benchmarks/BenchmarkUtils.h | 2 +- cpp/velox/compute/ArrowTypeUtils.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/velox/benchmarks/BenchmarkUtils.h b/cpp/velox/benchmarks/BenchmarkUtils.h index 3a2a9c320bae..73373a7cec22 100644 --- a/cpp/velox/benchmarks/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/BenchmarkUtils.h @@ -91,4 +91,4 @@ void abortIfFileNotExists(const std::string& filepath); /// Return whether the data ends with suffix. bool endsWith(const std::string& data, const std::string& suffix); -void setCpu(uint32_t cpuindex); \ No newline at end of file +void setCpu(uint32_t cpuindex); diff --git a/cpp/velox/compute/ArrowTypeUtils.cc b/cpp/velox/compute/ArrowTypeUtils.cc index 41f0df0bc3c5..27cb71c24bbf 100644 --- a/cpp/velox/compute/ArrowTypeUtils.cc +++ b/cpp/velox/compute/ArrowTypeUtils.cc @@ -38,4 +38,4 @@ std::shared_ptr toArrowSchema(const std::shared_ptr