Skip to content

Commit

Permalink
refactor schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed May 15, 2023
1 parent 037b9c6 commit 0a88d40
Show file tree
Hide file tree
Showing 16 changed files with 29 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cpp/core/compute/Backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Backend : public std::enable_shared_from_this<Backend> {

virtual std::shared_ptr<Datasource>
getDatasource(const std::string& filePath, const std::string& fileName, std::shared_ptr<arrow::Schema> schema) {
return std::make_shared<Datasource>(filePath, fileName, schema);
throw GlutenException("Not implement getDatasource");
}

std::unordered_map<std::string, std::string> getConfMap() {
Expand Down
3 changes: 1 addition & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -999,8 +999,7 @@ Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper
jlong cSchema) {
JNI_METHOD_START
auto datasource = glutenDatasourceHolder.lookup(instanceId);
auto schema = datasource->inspectSchema();
GLUTEN_THROW_NOT_OK(arrow::ExportSchema(*schema.get(), reinterpret_cast<struct ArrowSchema*>(cSchema)));
datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
JNI_METHOD_END()
}

Expand Down
9 changes: 3 additions & 6 deletions cpp/core/operators/writer/Datasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <arrow/c/abi.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>
Expand All @@ -36,14 +37,10 @@ class Datasource {
virtual ~Datasource() = default;

virtual void init(const std::unordered_map<std::string, std::string>& sparkConfs) {}
virtual std::shared_ptr<arrow::Schema> inspectSchema() {
return nullptr;
}
virtual void inspectSchema(struct ArrowSchema* out) = 0;
virtual void write(const std::shared_ptr<ColumnarBatch>& cb) {}
virtual void close() {}
virtual std::shared_ptr<arrow::Schema> getSchema() {
return nullptr;
}
virtual std::shared_ptr<arrow::Schema> getSchema() = 0;

private:
std::string filePath_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/BenchmarkUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

#include "BenchmarkUtils.h"

#include "compute/VeloxBackend.h"
#include "compute/VeloxInitializer.h"
#include "config/GlutenConfig.h"
#include "velox/dwio/common/Options.h"
Expand Down
3 changes: 1 addition & 2 deletions cpp/velox/benchmarks/BenchmarkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
#pragma once

#include <benchmark/benchmark.h>

#include <velox/substrait/SubstraitToVeloxPlan.h>
#include <cstdlib>
#include <filesystem>
#include <fstream>

#include <thread>
#include <utility>

#include "compute/ProtobufUtils.h"
#include "utils/exception.h"
#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"

DECLARE_bool(print_result);
DECLARE_string(write_file);
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ auto BM_Generic = [](::benchmark::State& state,
auto resultIter = backend->getResultIterator(
gluten::defaultMemoryAllocator().get(), "/tmp/test-spill", std::move(inputIters), conf);
auto veloxPlan = std::dynamic_pointer_cast<gluten::VeloxBackend>(backend)->getVeloxPlan();
auto outputSchema = toArrowSchema(veloxPlan->outputType());
ArrowSchema cSchema;
toArrowSchema(veloxPlan->outputType(), &cSchema);
GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema));
ArrowWriter writer{FLAGS_write_file};
state.PauseTiming();
if (!FLAGS_write_file.empty()) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/benchmarks/QueryBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/

#include <benchmark/benchmark.h>
// Because we should include velox Abi.h first, otherwise it will conflicts with arrow abi.h
#include <compute/VeloxBackend.h>

#include "BenchmarkUtils.h"
#include "compute/ArrowTypeUtils.h"
#include "compute/VeloxBackend.h"
#include "compute/VeloxPlanConverter.h"

using namespace facebook;
Expand Down
9 changes: 6 additions & 3 deletions cpp/velox/compute/ArrowTypeUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ using namespace facebook;

namespace gluten {

void toArrowSchema(const std::shared_ptr<const velox::RowType>& rowType, struct ArrowSchema* out) {
exportToArrow(velox::BaseVector::create(rowType, 0, getDefaultVeloxLeafMemoryPool().get()), *out);
}

std::shared_ptr<arrow::Schema> toArrowSchema(const std::shared_ptr<const velox::RowType>& rowType) {
ArrowSchema arrowSchema{};
exportToArrow(velox::BaseVector::create(rowType, 0, getDefaultVeloxLeafMemoryPool().get()), arrowSchema);
ArrowSchema arrowSchema;
toArrowSchema(rowType, &arrowSchema);
GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema));
return outputSchema;
}

} // namespace gluten
2 changes: 2 additions & 0 deletions cpp/velox/compute/ArrowTypeUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

namespace gluten {

void toArrowSchema(const std::shared_ptr<const facebook::velox::RowType>& rowType, struct ArrowSchema* out);

std::shared_ptr<arrow::Schema> toArrowSchema(const std::shared_ptr<const facebook::velox::RowType>& rowType);

} // namespace gluten
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxColumnarToRowConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <arrow/array/array_base.h>
#include <arrow/buffer.h>
#include <arrow/c/abi.h>
#include <arrow/type_traits.h>
#include <arrow/util/decimal.h>

Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxInitializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#endif
#include "velox/common/memory/MmapAllocator.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/exec/Operator.h"
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/compute/VeloxInitializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <folly/executors/IOThreadPoolExecutor.h>

#include "VeloxColumnarToRowConverter.h"
#include "WholeStageResultIterator.h"
#include "velox/common/caching/AsyncDataCache.h"

namespace gluten {

Expand Down Expand Up @@ -60,6 +60,8 @@ class VeloxInitializer {

std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;

const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";

// memory cache
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxParquetDatasource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::str
parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink), *(pool_), 2048, properities, queryCtx);
}

std::shared_ptr<arrow::Schema> VeloxParquetDatasource::inspectSchema() {
void VeloxParquetDatasource::inspectSchema(struct ArrowSchema* out) {
velox::dwio::common::ReaderOptions readerOptions(pool_.get());
auto format = velox::dwio::common::FileFormat::PARQUET;
readerOptions.setFileFormat(format);
Expand All @@ -122,7 +122,7 @@ std::shared_ptr<arrow::Schema> VeloxParquetDatasource::inspectSchema() {
std::make_unique<velox::dwio::common::BufferedInput>(
std::make_shared<velox::dwio::common::ReadFileInputStream>(readFile), *pool_.get()),
readerOptions);
return toArrowSchema(reader->rowType());
toArrowSchema(reader->rowType(), out);
}

void VeloxParquetDatasource::close() {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxParquetDatasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class VeloxParquetDatasource final : public Datasource {
: Datasource(filePath, fileName, schema), filePath_(filePath), fileName_(fileName), schema_(schema) {}

void init(const std::unordered_map<std::string, std::string>& sparkConfs) override;
std::shared_ptr<arrow::Schema> inspectSchema() override;
void inspectSchema(struct ArrowSchema* out) override;
void write(const std::shared_ptr<ColumnarBatch>& cb) override;
void close() override;
std::shared_ptr<arrow::Schema> getSchema() override {
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace gluten {

namespace {
// Velox configs
const std::string kHiveConnectorId = "test-hive";
const std::string kSpillEnabled = "spark.gluten.sql.columnar.backend.velox.spillEnabled";
const std::string kAggregationSpillEnabled = "spark.gluten.sql.columnar.backend.velox.aggregationSpillEnabled";
const std::string kJoinSpillEnabled = "spark.gluten.sql.columnar.backend.velox.joinSpillEnabled";
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/compute/WholeStageResultIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

namespace gluten {

static const std::string kHiveConnectorId = "test-hive";

class WholeStageResultIterator : public ColumnarBatchIterator {
public:
WholeStageResultIterator(
Expand Down

0 comments on commit 0a88d40

Please sign in to comment.