Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-1434] [VL] Remove unused arrow code and add GLUTEN_CHECK and GLUTEN_DCHECK #1611

Merged
merged 4 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions cpp/core/compute/Backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,20 @@ class Backend : public std::enable_shared_from_this<Backend> {
const std::vector<std::shared_ptr<ResultIterator>>& inputs,
const std::unordered_map<std::string, std::string>& sessionConf) = 0;

bool parsePlan(const uint8_t* data, int32_t size) {
return parsePlan(data, size, -1, -1, -1);
void parsePlan(const uint8_t* data, int32_t size) {
parsePlan(data, size, -1, -1, -1);
}

/// Parse and cache the plan.
/// Return true if parsed successfully.
bool parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) {
void parsePlan(const uint8_t* data, int32_t size, int32_t stageId, int32_t partitionId, int64_t taskId) {
#ifdef GLUTEN_PRINT_DEBUG
auto buf = std::make_shared<arrow::Buffer>(data, size);
auto maybePlanJson = substraitFromPbToJson("Plan", *buf);
if (maybePlanJson.status().ok()) {
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; "
<< maybePlanJson.ValueOrDie() << std::endl;
} else {
std::cout << "Error parsing substrait plan to json: " << maybePlanJson.status().ToString() << std::endl;
}
auto jsonPlan = substraitFromPbToJson("Plan", data, size);
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
std::cout << "Task stageId: " << stageId << ", partitionId: " << partitionId << ", taskId: " << taskId << "; "
<< jsonPlan << std::endl;
#endif
return parseProtobuf(data, size, &substraitPlan_);
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
}

// Just for benchmark
Expand Down Expand Up @@ -109,7 +104,7 @@ class Backend : public std::enable_shared_from_this<Backend> {

virtual std::shared_ptr<Datasource>
getDatasource(const std::string& filePath, const std::string& fileName, std::shared_ptr<arrow::Schema> schema) {
return std::make_shared<Datasource>(filePath, fileName, schema);
throw GlutenException("Not implement getDatasource");
}

std::unordered_map<std::string, std::string> getConfMap() {
Expand Down
14 changes: 8 additions & 6 deletions cpp/core/compute/ProtobufUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <google/protobuf/util/type_resolver_util.h>
#include <fstream>

#include "utils/exception.h"

namespace gluten {

// Common for both projector and filters.
Expand All @@ -43,7 +45,7 @@ inline google::protobuf::util::TypeResolver* getGeneratedTypeResolver() {
return typeResolver.get();
}

arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_view typeName, std::string_view json) {
std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json) {
std::string typeUrl = "/substrait." + std::string(typeName);

google::protobuf::io::ArrayInputStream jsonStream{json.data(), static_cast<int>(json.size())};
Expand All @@ -55,22 +57,22 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_
google::protobuf::util::JsonToBinaryStream(getGeneratedTypeResolver(), typeUrl, &jsonStream, &outStream);

if (!status.ok()) {
return arrow::Status::Invalid("JsonToBinaryStream returned ", status);
throw GlutenException("JsonToBinaryStream returned " + status.ToString());
}
return arrow::Buffer::FromString(std::move(out));
return out;
}

arrow::Result<std::string> substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf) {
std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size) {
std::string typeUrl = "/substrait." + std::string(typeName);

google::protobuf::io::ArrayInputStream bufStream{buf.data(), static_cast<int>(buf.size())};
google::protobuf::io::ArrayInputStream bufStream{data, size};

std::string out;
google::protobuf::io::StringOutputStream outStream{&out};

auto status = google::protobuf::util::BinaryToJsonStream(getGeneratedTypeResolver(), typeUrl, &bufStream, &outStream);
if (!status.ok()) {
return arrow::Status::Invalid("BinaryToJsonStream returned ", status);
throw GlutenException("BinaryToJsonStream returned " + status.ToString());
}
return out;
}
Expand Down
12 changes: 2 additions & 10 deletions cpp/core/compute/ProtobufUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,15 @@

#pragma once

#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include <google/protobuf/message.h>
#include <string>

namespace gluten {

// Common for both projector and filters.
bool parseProtobuf(const uint8_t* buf, int bufLen, google::protobuf::Message* msg);

arrow::Result<std::shared_ptr<arrow::Buffer>> substraitFromJsonToPb(std::string_view typeName, std::string_view json);

arrow::Result<std::string> substraitFromPbToJson(std::string_view typeName, const arrow::Buffer& buf);
std::string substraitFromJsonToPb(std::string_view typeName, std::string_view json);

// Write a Protobuf message into a specified file with JSON format.
// void MessageToJSONFile(const google::protobuf::Message& message, const std::string& file_path);
std::string substraitFromPbToJson(std::string_view typeName, const uint8_t* data, int32_t size);

} // namespace gluten
242 changes: 0 additions & 242 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,191 +53,6 @@ static inline jmethodID getStaticMethodId(JNIEnv* env, jclass thisClass, const c
return ret;
}

// Returns the Arrow offset type that pairs with a variable-length parent
// type (binary / large binary / list / large list), or nullptr when the
// parent type has no offsets buffer.
static inline std::shared_ptr<arrow::DataType> getOffsetDataType(std::shared_ptr<arrow::DataType> parentType) {
  const auto typeId = parentType->id();
  if (typeId == arrow::BinaryType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::BinaryType>::OffsetType>();
  }
  if (typeId == arrow::LargeBinaryType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::LargeBinaryType>::OffsetType>();
  }
  if (typeId == arrow::ListType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::ListType>::OffsetType>();
  }
  if (typeId == arrow::LargeListType::type_id) {
    return std::make_shared<arrow::TypeTraits<arrow::LargeListType>::OffsetType>();
  }
  return nullptr;
}

// True when T derives from arrow::FixedWidthType, i.e. every value of the
// type occupies a fixed number of bytes. The argument exists only for type
// deduction and is never read, so it is taken by const reference (the
// original took it by value, copying arbitrarily heavy T) and the function
// is constexpr since the answer is a compile-time property of T.
template <typename T>
inline constexpr bool isFixedWidthType(const T&) {
  return std::is_base_of<arrow::FixedWidthType, T>::value;
}

// Recursively records a (length, null_count) pair for `column`, then — for
// list-typed columns — for each nested values array as well.
static inline arrow::Status appendNodes(
    std::shared_ptr<arrow::Array> column,
    std::vector<std::pair<int64_t, int64_t>>* nodes) {
  nodes->emplace_back(column->length(), column->null_count());
  const auto typeId = column->type()->id();
  if (typeId == arrow::Type::LIST || typeId == arrow::Type::LARGE_LIST) {
    auto asList = std::dynamic_pointer_cast<arrow::ListArray>(column);
    RETURN_NOT_OK(appendNodes(asList->values(), nodes));
  }
  return arrow::Status::OK();
}

// Recursively collects the raw buffers backing `column`. A list column
// contributes its null bitmap and value-offsets buffers followed by the
// buffers of its nested values array; any other column contributes every
// buffer of its ArrayData as-is.
static inline arrow::Status appendBuffers(
    std::shared_ptr<arrow::Array> column,
    std::vector<std::shared_ptr<arrow::Buffer>>* buffers) {
  const auto typeId = column->type()->id();
  if (typeId == arrow::Type::LIST || typeId == arrow::Type::LARGE_LIST) {
    auto asList = std::dynamic_pointer_cast<arrow::ListArray>(column);
    buffers->push_back(asList->null_bitmap());
    buffers->push_back(asList->value_offsets());
    RETURN_NOT_OK(appendBuffers(asList->values(), buffers));
  } else {
    for (const auto& buffer : column->data()->buffers) {
      buffers->push_back(buffer);
    }
  }
  return arrow::Status::OK();
}

// Forces bit `fixRow` of the buffer (treated as a bitmap: size bytes * 8
// bits) to true, reallocating the buffer one byte larger first when the
// requested bit lies past the current capacity. Used by makeArrayData to
// preserve the final offset row that ListArray::FromArrays would otherwise
// drop.
// NOTE(review): assumes *inBuf is only ever a bitmap-style buffer — confirm
// against callers.
static inline arrow::Status fixOffsetBuffer(std::shared_ptr<arrow::Buffer>* inBuf, int fixRow) {
  // Nothing to fix for an absent or empty buffer.
  if ((*inBuf) == nullptr || (*inBuf)->size() == 0)
    return arrow::Status::OK();
  // Target bit out of range: grow by one byte and copy the existing bytes.
  if ((*inBuf)->size() * 8 <= fixRow) {
    ARROW_ASSIGN_OR_RAISE(auto valid_copy, arrow::AllocateBuffer((*inBuf)->size() + 1));
    std::memcpy(valid_copy->mutable_data(), (*inBuf)->data(), static_cast<size_t>((*inBuf)->size()));
    (*inBuf) = std::move(valid_copy);
  }
  // Set the single bit at index `fixRow` to true.
  arrow::bit_util::SetBitsTo(const_cast<uint8_t*>((*inBuf)->data()), fixRow, 1, true);
  return arrow::Status::OK();
}

// Builds an ArrayData of `type` by consuming buffers from `inBufs`,
// advancing *bufIdxPtr past each buffer used. numRows == -1 means "infer
// the row count from buffer sizes". List types recurse into an offsets
// array plus a child values array; among nested types only LIST/LARGE_LIST
// are supported. Returns Invalid when inBufs runs out of entries.
static inline arrow::Status makeArrayData(
    std::shared_ptr<arrow::DataType> type,
    int numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> inBufs,
    int inBufsLen,
    std::shared_ptr<arrow::ArrayData>* arrData,
    int* bufIdxPtr) {
  if (arrow::is_nested(type->id())) {
    // Maybe ListType, MapType, StructType or UnionType
    switch (type->id()) {
      case arrow::Type::LIST:
      case arrow::Type::LARGE_LIST: {
        auto offsetDataType = getOffsetDataType(type);
        auto listType = std::dynamic_pointer_cast<arrow::ListType>(type);
        auto childType = listType->value_type();
        std::shared_ptr<arrow::ArrayData> childArrayData, offsetArrayData;
        // Create the offsets array first.
        // Chendi: ListArray::FromArrays removes the last row from the offset
        // array (see the CleanListOffsets function in array_nested.cc), so
        // fixOffsetBuffer re-validates that final row beforehand.
        RETURN_NOT_OK(fixOffsetBuffer(&inBufs[*bufIdxPtr], numRows));
        RETURN_NOT_OK(makeArrayData(offsetDataType, numRows + 1, inBufs, inBufsLen, &offsetArrayData, bufIdxPtr));
        auto offsetArray = arrow::MakeArray(offsetArrayData);
        // create child data array
        RETURN_NOT_OK(makeArrayData(childType, -1, inBufs, inBufsLen, &childArrayData, bufIdxPtr));
        auto childArray = arrow::MakeArray(childArrayData);
        auto listArray = arrow::ListArray::FromArrays(*offsetArray, *childArray).ValueOrDie();
        *arrData = listArray->data();

      } break;
      default:
        return arrow::Status::NotImplemented("MakeArrayData for type ", type->ToString(), " is not supported yet.");
    }

  } else {
    int64_t nullCount = arrow::kUnknownNullCount;
    std::vector<std::shared_ptr<arrow::Buffer>> buffers;
    if (*bufIdxPtr >= inBufsLen) {
      return arrow::Status::Invalid("insufficient number of in_buf_addrs");
    }
    // An empty validity buffer means every value is non-null.
    if (inBufs[*bufIdxPtr]->size() == 0) {
      nullCount = 0;
    }
    // First buffer: validity bitmap.
    buffers.push_back(inBufs[*bufIdxPtr]);
    *bufIdxPtr += 1;

    if (arrow::is_binary_like(type->id())) {
      if (*bufIdxPtr >= inBufsLen) {
        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
      }

      // Binary-like types carry an extra offsets buffer before the data.
      buffers.push_back(inBufs[*bufIdxPtr]);
      auto offsetsSize = inBufs[*bufIdxPtr]->size();
      *bufIdxPtr += 1;
      // Infer rows from the offsets: int32 offsets, one extra trailing entry.
      if (numRows == -1)
        numRows = offsetsSize / 4 - 1;
    }

    if (*bufIdxPtr >= inBufsLen) {
      return arrow::Status::Invalid("insufficient number of in_buf_addrs");
    }
    // Last buffer: the values themselves.
    auto valueSize = inBufs[*bufIdxPtr]->size();
    buffers.push_back(inBufs[*bufIdxPtr]);
    *bufIdxPtr += 1;
    // Infer rows from the value buffer via the type's fixed bit width.
    if (numRows == -1) {
      numRows = valueSize * 8 / arrow::bit_width(type->id());
    }

    *arrData = arrow::ArrayData::Make(type, numRows, std::move(buffers), nullCount);
  }
  return arrow::Status::OK();
}

// Assembles a RecordBatch from a flat buffer list: one ArrayData is built
// per schema field, with `bufferIdx` advancing through `inBufs` as each
// column consumes its buffers.
static inline arrow::Status makeRecordBatch(
    const std::shared_ptr<arrow::Schema>& schema,
    int numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> inBufs,
    int inBufsLen,
    std::shared_ptr<arrow::RecordBatch>* batch) {
  std::vector<std::shared_ptr<arrow::ArrayData>> columns;
  columns.reserve(schema->num_fields());
  int bufferIdx = 0;

  for (const auto& field : schema->fields()) {
    std::shared_ptr<arrow::ArrayData> columnData;
    RETURN_NOT_OK(makeArrayData(field->type(), numRows, inBufs, inBufsLen, &columnData, &bufferIdx));
    columns.push_back(columnData);
  }

  *batch = arrow::RecordBatch::Make(schema, numRows, columns);
  return arrow::Status::OK();
}

// Overload taking raw JNI buffer addresses and sizes. Each non-zero address
// is wrapped in a non-owning arrow::Buffer (the caller retains ownership of
// the memory); a zero address yields an empty placeholder buffer so that
// buffer indices still line up. Delegates to the buffer-vector overload.
// Fixes vs. original: removed the unused local `arrays`, replaced the bare
// `new arrow::Buffer` with std::make_shared (single allocation,
// exception-safe), and reserved the vector up front.
static inline arrow::Status makeRecordBatch(
    const std::shared_ptr<arrow::Schema>& schema,
    int numRows,
    int64_t* inBufAddrs,
    int64_t* inBufSizes,
    int inBufsLen,
    std::shared_ptr<arrow::RecordBatch>* batch) {
  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
  buffers.reserve(inBufsLen);
  for (int i = 0; i < inBufsLen; i++) {
    if (inBufAddrs[i] != 0) {
      buffers.push_back(std::make_shared<arrow::Buffer>(reinterpret_cast<uint8_t*>(inBufAddrs[i]), inBufSizes[i]));
    } else {
      buffers.push_back(std::make_shared<arrow::Buffer>(nullptr, 0));
    }
  }
  return makeRecordBatch(schema, numRows, buffers, inBufsLen, batch);
}

static inline std::string jStringToCString(JNIEnv* env, jstring string) {
int32_t jlen, clen;
clen = env->GetStringUTFLength(string);
Expand Down Expand Up @@ -292,63 +107,6 @@ static inline arrow::Result<arrow::Compression::type> getCompressionType(JNIEnv*
return compression_type;
}

// Decompresses one IPC body buffer. The buffer layout is an 8-byte
// little-endian int64 uncompressed length followed by the compressed
// payload; the output buffer is allocated from `pool` and stored in *out.
// Returns Invalid when the codec produces fewer bytes than the header
// promised.
static inline arrow::Status decompressBuffer(
    const arrow::Buffer& buffer,
    arrow::util::Codec* codec,
    std::shared_ptr<arrow::Buffer>* out,
    arrow::MemoryPool* pool) {
  const uint8_t* data = buffer.data();
  // Payload size excludes the 8-byte length prefix.
  int64_t compressedSize = buffer.size() - sizeof(int64_t);
  int64_t uncompressedSize = arrow::bit_util::FromLittleEndian(arrow::util::SafeLoadAs<int64_t>(data));
  ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressedSize, pool));

  int64_t actualDecompressed;
  ARROW_ASSIGN_OR_RAISE(
      actualDecompressed,
      codec->Decompress(compressedSize, data + sizeof(int64_t), uncompressedSize, uncompressed->mutable_data()));
  // A short decompression means the stream is truncated or corrupted.
  if (actualDecompressed != uncompressedSize) {
    return arrow::Status::Invalid(
        "Failed to fully decompress buffer, expected ",
        uncompressedSize,
        " bytes but decompressed ",
        actualDecompressed);
  }
  *out = std::move(uncompressed);
  return arrow::Status::OK();
}

// Decompresses every buffer in `buffers` in place with the codec for
// `compression`, optionally in parallel per options.use_threads. Bit i of
// `bufMask` marks a buffer that was already rebuilt uncompressed on the
// Java side; such buffers are copied rather than decompressed.
// NOTE(review): `schemaFields` is accepted but never read here — possibly
// kept for interface symmetry; confirm with callers.
static inline arrow::Status decompressBuffers(
    arrow::Compression::type compression,
    const arrow::ipc::IpcReadOptions& options,
    const uint8_t* bufMask,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    const std::vector<std::shared_ptr<arrow::Field>>& schemaFields) {
  std::unique_ptr<arrow::util::Codec> codec;
  ARROW_ASSIGN_OR_RAISE(codec, arrow::util::Codec::Create(compression));

  // Handles a single buffer; invoked (possibly concurrently) per index.
  auto decompressOne = [&buffers, &bufMask, &codec, &options](int i) {
    // Missing or empty buffers need no work.
    if (buffers[i] == nullptr || buffers[i]->size() == 0) {
      return arrow::Status::OK();
    }
    // if the buffer has been rebuilt to uncompressed on java side, return
    if (arrow::bit_util::GetBit(bufMask, i)) {
      ARROW_ASSIGN_OR_RAISE(auto valid_copy, buffers[i]->CopySlice(0, buffers[i]->size()));
      buffers[i] = valid_copy;
      return arrow::Status::OK();
    }

    // Every compressed buffer carries an 8-byte length prefix, so anything
    // shorter cannot be a valid compressed message.
    if (buffers[i]->size() < 8) {
      return arrow::Status::Invalid(
          "Likely corrupted message, compressed buffers "
          "are larger than 8 bytes by construction");
    }
    RETURN_NOT_OK(decompressBuffer(*buffers[i], codec.get(), &buffers[i], options.memory_pool));
    return arrow::Status::OK();
  };

  return ::arrow::internal::OptionalParallelFor(options.use_threads, static_cast<int>(buffers.size()), decompressOne);
}

static inline void attachCurrentThreadAsDaemonOrThrow(JavaVM* vm, JNIEnv** out) {
int getEnvStat = vm->GetEnv(reinterpret_cast<void**>(out), jniVersion);
if (getEnvStat == JNI_EDETACHED) {
Expand Down
7 changes: 2 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKerne
}

auto backend = gluten::createBackend();
if (!backend->parsePlan(planData, planSize, stageId, partitionId, taskId)) {
gluten::jniThrow("Failed to parse plan.");
}
backend->parsePlan(planData, planSize, stageId, partitionId, taskId);

auto confs = getConfMap(env, confArr);

Expand Down Expand Up @@ -1001,8 +999,7 @@ Java_io_glutenproject_spark_sql_execution_datasources_velox_DatasourceJniWrapper
jlong cSchema) {
JNI_METHOD_START
auto datasource = glutenDatasourceHolder.lookup(instanceId);
auto schema = datasource->inspectSchema();
GLUTEN_THROW_NOT_OK(arrow::ExportSchema(*schema.get(), reinterpret_cast<struct ArrowSchema*>(cSchema)));
datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
JNI_METHOD_END()
}

Expand Down
Loading