Skip to content

Commit

Permalink
[GLUTEN-1434] [VL] Remove unused arrow code and add GLUTEN_CHECK and …
Browse files Browse the repository at this point in the history
…GLUTEN_DCHECK (#1611)
  • Loading branch information
jinchengchenghh authored May 16, 2023
1 parent bfbd42c commit 5facfbd
Show file tree
Hide file tree
Showing 29 changed files with 95 additions and 624 deletions.
23 changes: 9 additions & 14 deletions cpp/core/compute/Backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,20 @@ class Backend : public std::enable_shared_from_this<Backend> {
const std::vector<std::shared_ptr<ResultIterator>>& inputs,
const std::unordered_map<std::string, std::string>& sessionConf) = 0;

bool parsePlan(const uint8_t* data, int32_t size) {
return parsePlan(data, size, -1, -1, -1);
void parsePlan(const uint8_t* data, int32_t size) {
parsePlan(data, size, -1, -1, -1);
}

/// Parse and cache the plan.
/// Throws GlutenException if the plan cannot be parsed.
bool parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) {
void parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) {
#ifdef GLUTEN_PRINT_DEBUG
auto buf = std::make_shared<arrow::Buffer>(data, size);
auto maybePlanJson = substraitFromPbToJson("Plan", *buf);
if (maybePlanJson.status().ok()) {
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; "
<< maybePlanJson.ValueOrDie() << std::endl;
} else {
std::cout << "Error parsing substrait plan to json: " << maybePlanJson.status().ToString() << std::endl;
}
auto jsonPlan = substraitFromPbToJson("Plan", data, size);
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; "
<< jsonPlan << std::endl;
#endif
return parseProtobuf(data, size, &substraitPlan_);
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
}

// Just for benchmark
Expand Down Expand Up @@ -109,7 +104,7 @@ class Backend : public std::enable_shared_from_this<Backend> {

virtual std::shared_ptr<Datasource>
getDatasource(const std::string& filePath, const std::string& fileName, std::shared_ptr<arrow::Schema> schema) {
return std::make_shared<Datasource>(filePath, fileName, schema);
throw GlutenException("Not implement getDatasource");
}

std::unordered_map<std::string, std::string> getConfMap() {
Expand Down
14 changes: 8 additions & 6 deletions cpp/core/compute/ProtobufUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <google/protobuf/util/type_resolver_util.h>
#include <fstream>

#include "utils/exception.h"

namespace gluten {

// Common for both projector and filters.
Expand All @@ -43,7 +45,7 @@ inline google::protobuf::util::TypeResolver* getGeneratedTypeResolver() {
return typeResolver.get();
}

arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_view typeName, std::string_view json) {
std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json) {
std::string typeUrl = "/substrait." + std::string(typeName);

google::protobuf::io::ArrayInputStream jsonStream{json.data(), static_cast<int>(json.size())};
Expand All @@ -55,22 +57,22 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_
google::protobuf::util::JsonToBinaryStream(getGeneratedTypeResolver(), typeUrl, &jsonStream, &outStream);

if (!status.ok()) {
return arrow::Status::Invalid("JsonToBinaryStream returned ", status);
throw GlutenException("JsonToBinaryStream returned " + status.ToString());
}
return arrow::Buffer::FromString(std::move(out));
return out;
}

arrow::Result<std::string> substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf) {
std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size) {
std::string typeUrl = "/substrait." + std::string(typeName);

google::protobuf::io::ArrayInputStream bufStream{buf.data(), static_cast<int>(buf.size())};
google::protobuf::io::ArrayInputStream bufStream{data, size};

std::string out;
google::protobuf::io::StringOutputStream outStream{&out};

auto status = google::protobuf::util::BinaryToJsonStream(getGeneratedTypeResolver(), typeUrl, &bufStream, &outStream);
if (!status.ok()) {
return arrow::Status::Invalid("BinaryToJsonStream returned ", status);
throw GlutenException("BinaryToJsonStream returned " + status.ToString());
}
return out;
}
Expand Down
12 changes: 2 additions & 10 deletions cpp/core/compute/ProtobufUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,15 @@

#pragma once

#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include <google/protobuf/message.h>
#include <string>

namespace gluten {

// Common for both projector and filters.
bool parseProtobuf(const uint8_t* buf, int bufLen, google::protobuf::Message* msg);

arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_view typeName, std::string_view json);

arrow::Result<std::string> substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf);
std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json);

// Write a Protobuf message into a specified file with JSON format.
// void MessageToJSONFile(const google::protobuf::Message& message, const std::string& file_path);
std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size);

} // namespace gluten
242 changes: 0 additions & 242 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,191 +53,6 @@ static inline jmethodID getStaticMethodId(JNIEnv* env, jclass thisClass, const c
return ret;
}

// Maps a variable-width parent type to the Arrow type that describes its
// offsets buffer (int32 for Binary/List, int64 for the Large variants).
// Returns nullptr for types that carry no offsets buffer.
static inline std::shared_ptr<arrow::DataType> getOffsetDataType(std::shared_ptr<arrow::DataType> parentType) {
  const auto typeId = parentType->id();
  if (typeId == arrow::BinaryType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::BinaryType>::OffsetType>();
  }
  if (typeId == arrow::LargeBinaryType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::LargeBinaryType>::OffsetType>();
  }
  if (typeId == arrow::ListType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::ListType>::OffsetType>();
  }
  if (typeId == arrow::LargeListType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::LargeListType>::OffsetType>();
  }
  return nullptr;
}

// Reports whether T is a fixed-width Arrow type (derives from
// arrow::FixedWidthType). The result depends only on T; the argument is used
// solely for template-argument deduction and is otherwise ignored.
template <typename T>
inline bool isFixedWidthType(T _) {
return std::is_base_of<arrow::FixedWidthType, T>::value;
}

// Appends a (length, null_count) pair for `column` to `nodes`, then recurses
// into the values child for (large) list columns so that nested arrays are
// recorded depth-first.
static inline arrow::Status appendNodes(
    std::shared_ptr<arrow::Array> column,
    std::vector<std::pair<int64_t, int64_t>>* nodes) {
  nodes->emplace_back(column->length(), column->null_count());
  const auto typeId = column->type()->id();
  if (typeId == arrow::Type::LIST || typeId == arrow::Type::LARGE_LIST) {
    auto asList = std::dynamic_pointer_cast<arrow::ListArray>(column);
    RETURN_NOT_OK(appendNodes(asList->values(), nodes));
  }
  return arrow::Status::OK();
}

// Collects the physical buffers backing `column` into `buffers`.
// (Large) list columns contribute their null bitmap and value offsets and
// then recurse into the values child; every other type contributes each
// buffer of its ArrayData as-is (entries may be null for absent buffers).
static inline arrow::Status appendBuffers(
    std::shared_ptr<arrow::Array> column,
    std::vector<std::shared_ptr<arrow::Buffer>>* buffers) {
  const auto typeId = column->type()->id();
  if (typeId == arrow::Type::LIST || typeId == arrow::Type::LARGE_LIST) {
    auto asList = std::dynamic_pointer_cast<arrow::ListArray>(column);
    buffers->push_back(asList->null_bitmap());
    buffers->push_back(asList->value_offsets());
    RETURN_NOT_OK(appendBuffers(asList->values(), buffers));
  } else {
    for (const auto& buffer : column->data()->buffers) {
      buffers->push_back(buffer);
    }
  }
  return arrow::Status::OK();
}

// Ensures bit `fixRow` of *inBuf can be set, then sets it.
// Used on a list column's offsets/validity buffer before the offsets array is
// built (see makeArrayData): ListArray::FromArrays drops the last offset row,
// so the bit at `fixRow` is forced to 1 here to compensate.
// NOTE(review): the buffer is addressed bit-wise (size() * 8 bits); if bit
// `fixRow` lies past the end, the buffer is reallocated one byte larger and
// the original contents copied over before the bit is set.
static inline arrow::Status fixOffsetBuffer(std::shared_ptr<arrow::Buffer>* inBuf, int fixRow) {
// Nothing to fix for an absent or empty buffer.
if ((*inBuf) == nullptr || (*inBuf)->size() == 0)
return arrow::Status::OK();
if ((*inBuf)->size() * 8 <= fixRow) {
// Grow by one byte so bit `fixRow` is addressable; keep existing bytes.
ARROW_ASSIGN_OR_RAISE(auto valid_copy, arrow::AllocateBuffer((*inBuf)->size() + 1));
std::memcpy(valid_copy->mutable_data(), (*inBuf)->data(), static_cast<size_t>((*inBuf)->size()));
(*inBuf) = std::move(valid_copy);
}
// Set the single bit at index `fixRow` to true.
arrow::bit_util::SetBitsTo(const_cast<uint8_t*>((*inBuf)->data()), fixRow, 1, true);
return arrow::Status::OK();
}

// Reconstructs an arrow::ArrayData of `type` from a flat sequence of buffers.
// `inBufs`/`inBufsLen` hold all buffers for the whole batch; *bufIdxPtr is a
// cursor into that sequence and is advanced by however many buffers this
// column (and its children, for list types) consumes.
// `numRows` may be -1, in which case the row count is inferred from buffer
// sizes (offsets buffer for binary-like types, value buffer bit-width
// otherwise). Only LIST/LARGE_LIST are supported among nested types.
static inline arrow::Status makeArrayData(
std::shared_ptr<arrow::DataType> type,
int numRows,
std::vector<std::shared_ptr<arrow::Buffer>> inBufs,
int inBufsLen,
std::shared_ptr<arrow::ArrayData>* arrData,
int* bufIdxPtr) {
if (arrow::is_nested(type->id())) {
// Maybe ListType, MapType, StructType or UnionType
switch (type->id()) {
case arrow::Type::LIST:
case arrow::Type::LARGE_LIST: {
auto offsetDataType = getOffsetDataType(type);
auto listType = std::dynamic_pointer_cast<arrow::ListType>(type);
auto childType = listType->value_type();
std::shared_ptr<arrow::ArrayData> childArrayData, offsetArrayData;
// create offset array
// Chendi: For some reason, for ListArray::FromArrays will remove last
// row from offset array, refer to array_nested.cc CleanListOffsets
// function
RETURN_NOT_OK(fixOffsetBuffer(&inBufs[*bufIdxPtr], numRows));
// Offsets array has numRows + 1 entries (one past-the-end offset).
RETURN_NOT_OK(makeArrayData(offsetDataType, numRows + 1, inBufs, inBufsLen, &offsetArrayData, bufIdxPtr));
auto offsetArray = arrow::MakeArray(offsetArrayData);
// create child data array
// -1: let recursion infer the child's length from its buffers.
RETURN_NOT_OK(makeArrayData(childType, -1, inBufs, inBufsLen, &childArrayData, bufIdxPtr));
auto childArray = arrow::MakeArray(childArrayData);
auto listArray = arrow::ListArray::FromArrays(*offsetArray, *childArray).ValueOrDie();
*arrData = listArray->data();

} break;
default:
return arrow::Status::NotImplemented("MakeArrayData for type ", type->ToString(), " is not supported yet.");
}

} else {
int64_t nullCount = arrow::kUnknownNullCount;
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
if (*bufIdxPtr >= inBufsLen) {
return arrow::Status::Invalid("insufficient number of in_buf_addrs");
}
// First buffer is the validity bitmap; empty means no nulls.
if (inBufs[*bufIdxPtr]->size() == 0) {
nullCount = 0;
}
buffers.push_back(inBufs[*bufIdxPtr]);
*bufIdxPtr += 1;

if (arrow::is_binary_like(type->id())) {
// Binary-like types carry an extra int32 offsets buffer.
if (*bufIdxPtr >= inBufsLen) {
return arrow::Status::Invalid("insufficient number of in_buf_addrs");
}

buffers.push_back(inBufs[*bufIdxPtr]);
auto offsetsSize = inBufs[*bufIdxPtr]->size();
*bufIdxPtr += 1;
if (numRows == -1)
// size / sizeof(int32) offsets, minus the trailing past-the-end entry.
numRows = offsetsSize / 4 - 1;
}

if (*bufIdxPtr >= inBufsLen) {
return arrow::Status::Invalid("insufficient number of in_buf_addrs");
}
// Value buffer.
auto valueSize = inBufs[*bufIdxPtr]->size();
buffers.push_back(inBufs[*bufIdxPtr]);
*bufIdxPtr += 1;
if (numRows == -1) {
// Infer row count from the value buffer and the type's bit width.
numRows = valueSize * 8 / arrow::bit_width(type->id());
}

*arrData = arrow::ArrayData::Make(type, numRows, std::move(buffers), nullCount);
}
return arrow::Status::OK();
}

// Assembles a RecordBatch from a flat buffer sequence by building ArrayData
// for each schema field in order. A single cursor tracks how many buffers
// each field (and its nested children) consumed.
static inline arrow::Status makeRecordBatch(
    const std::shared_ptr<arrow::Schema>& schema,
    int numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> inBufs,
    int inBufsLen,
    std::shared_ptr<arrow::RecordBatch>* batch) {
  std::vector<std::shared_ptr<arrow::ArrayData>> columns;
  int bufferCursor = 0;
  for (int fieldIdx = 0; fieldIdx < schema->num_fields(); fieldIdx++) {
    std::shared_ptr<arrow::ArrayData> columnData;
    RETURN_NOT_OK(
        makeArrayData(schema->field(fieldIdx)->type(), numRows, inBufs, inBufsLen, &columnData, &bufferCursor));
    columns.push_back(std::move(columnData));
  }
  *batch = arrow::RecordBatch::Make(schema, numRows, columns);
  return arrow::Status::OK();
}

// Wraps raw (address, size) pairs coming from JNI into non-owning
// arrow::Buffer views and delegates to the buffer-based overload above.
// A zero address is represented by an empty placeholder buffer so buffer
// positions stay aligned with the schema's expectations.
// Fixes vs. original: removed the unused local `arrays` vector and reserved
// capacity for the buffer vector up front.
static inline arrow::Status makeRecordBatch(
    const std::shared_ptr<arrow::Schema>& schema,
    int numRows,
    int64_t* inBufAddrs,
    int64_t* inBufSizes,
    int inBufsLen,
    std::shared_ptr<arrow::RecordBatch>* batch) {
  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
  buffers.reserve(inBufsLen);
  for (int i = 0; i < inBufsLen; i++) {
    if (inBufAddrs[i] != 0) {
      // Non-owning view over caller-managed memory; caller keeps it alive.
      buffers.push_back(
          std::make_shared<arrow::Buffer>(reinterpret_cast<uint8_t*>(inBufAddrs[i]), inBufSizes[i]));
    } else {
      buffers.push_back(std::make_shared<arrow::Buffer>(nullptr, 0));
    }
  }
  return makeRecordBatch(schema, numRows, buffers, inBufsLen, batch);
}

static inline std::string jStringToCString(JNIEnv* env, jstring string) {
int32_t jlen, clen;
clen = env->GetStringUTFLength(string);
Expand Down Expand Up @@ -292,63 +107,6 @@ static inline arrow::Result<arrow::Compression::type> getCompressionType(JNIEnv*
return compression_type;
}

// Decompresses one IPC-style buffer into *out using `codec`, allocating the
// uncompressed buffer from `pool`.
// Wire layout: the first 8 bytes are the little-endian int64 uncompressed
// length; the compressed payload follows. Fails if the codec reports a
// decompressed size different from that header value.
static inline arrow::Status decompressBuffer(
const arrow::Buffer& buffer,
arrow::util::Codec* codec,
std::shared_ptr<arrow::Buffer>* out,
arrow::MemoryPool* pool) {
const uint8_t* data = buffer.data();
// Payload size excludes the 8-byte length prefix.
int64_t compressedSize = buffer.size() - sizeof(int64_t);
int64_t uncompressedSize = arrow::bit_util::FromLittleEndian(arrow::util::SafeLoadAs<int64_t>(data));
ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressedSize, pool));

int64_t actualDecompressed;
ARROW_ASSIGN_OR_RAISE(
actualDecompressed,
codec->Decompress(compressedSize, data + sizeof(int64_t), uncompressedSize, uncompressed->mutable_data()));
if (actualDecompressed != uncompressedSize) {
return arrow::Status::Invalid(
"Failed to fully decompress buffer, expected ",
uncompressedSize,
" bytes but decompressed ",
actualDecompressed);
}
*out = std::move(uncompressed);
return arrow::Status::OK();
}

// Decompresses every compressed buffer in `buffers` in place, optionally in
// parallel (per options.use_threads).
// `bufMask` is a bitmap over buffer indices: a set bit means the Java side
// already rebuilt that buffer uncompressed, so it is only copied, not
// decompressed. `schemaFields` is currently unused by the body.
static inline arrow::Status decompressBuffers(
arrow::Compression::type compression,
const arrow::ipc::IpcReadOptions& options,
const uint8_t* bufMask,
std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
const std::vector<std::shared_ptr<arrow::Field>>& schemaFields) {
std::unique_ptr<arrow::util::Codec> codec;
ARROW_ASSIGN_OR_RAISE(codec, arrow::util::Codec::Create(compression));

// Handles a single buffer; invoked per index by OptionalParallelFor below.
auto decompressOne = [&buffers, &bufMask, &codec, &options](int i) {
// Absent/empty buffers need no work.
if (buffers[i] == nullptr || buffers[i]->size() == 0) {
return arrow::Status::OK();
}
// if the buffer has been rebuilt to uncompressed on java side, return
if (arrow::bit_util::GetBit(bufMask, i)) {
// Copy so the result owns its memory independent of the input slice.
ARROW_ASSIGN_OR_RAISE(auto valid_copy, buffers[i]->CopySlice(0, buffers[i]->size()));
buffers[i] = valid_copy;
return arrow::Status::OK();
}

// A compressed buffer must hold at least the 8-byte length prefix.
if (buffers[i]->size() < 8) {
return arrow::Status::Invalid(
"Likely corrupted message, compressed buffers "
"are larger than 8 bytes by construction");
}
RETURN_NOT_OK(decompressBuffer(*buffers[i], codec.get(), &buffers[i], options.memory_pool));
return arrow::Status::OK();
};

// Runs serially or on the thread pool depending on options.use_threads.
return ::arrow::internal::OptionalParallelFor(options.use_threads, static_cast<int>(buffers.size()), decompressOne);
}

static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) {
int getEnvStat = vm->GetEnv(reinterpret_cast<void**>(out), jniVersion);
if (getEnvStat == JNI_EDETACHED) {
Expand Down
7 changes: 2 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKerne
}

auto backend = gluten::createBackend();
if (!backend->parsePlan(planData, planSize, stageId, partitionId, taskId)) {
gluten::jniThrow("Failed to parse plan.");
}
backend->parsePlan(planData, planSize, stageId, partitionId, taskId);

auto confs = getConfMap(env, confArr);

Expand Down Expand Up @@ -1001,8 +999,7 @@ Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper
jlong cSchema) {
JNI_METHOD_START
auto datasource = glutenDatasourceHolder.lookup(instanceId);
auto schema = datasource->inspectSchema();
GLUTEN_THROW_NOT_OK(arrow::ExportSchema(*schema.get(), reinterpret_cast<struct ArrowSchema*>(cSchema)));
datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
JNI_METHOD_END()
}

Expand Down
Loading

0 comments on commit 5facfbd

Please sign in to comment.