Skip to content

Commit

Permalink
[Feature] Support query deletion vector for delta lake (#53766)
Browse files Browse the repository at this point in the history
Signed-off-by: Youngwb <[email protected]>
(cherry picked from commit 66b78c4)
  • Loading branch information
Youngwb authored and mergify[bot] committed Dec 13, 2024
1 parent d6ba730 commit c43565d
Show file tree
Hide file tree
Showing 20 changed files with 528 additions and 17 deletions.
1 change: 1 addition & 0 deletions be/src/connector/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ add_library(Connector STATIC
utils.cpp
async_flush_stream_poller.cpp
sink_memory_manager.cpp
deletion_vector/deletion_vector.cpp
)
170 changes: 170 additions & 0 deletions be/src/connector/deletion_vector/deletion_vector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "deletion_vector.h"

#include <roaring/roaring64.h>

#include "util/base85.h"
#include "util/uuid_generator.h"

namespace starrocks {

Status starrocks::DeletionVector::fill_row_indexes(std::set<int64_t>* need_skip_rowids) {
if (_deletion_vector_descriptor->__isset.cardinality && _deletion_vector_descriptor->__isset.cardinality == 0) {
return Status::OK();
} else if (is_inline()) {
return deserialized_inline_dv(_deletion_vector_descriptor->pathOrInlineDv, need_skip_rowids);
} else {
std::shared_ptr<io::SharedBufferedInputStream> shared_buffered_input_stream = nullptr;
std::shared_ptr<io::CacheInputStream> cache_input_stream = nullptr;
HdfsScanStats app_scan_stats;
HdfsScanStats fs_scan_stats;

ASSIGN_OR_RETURN(auto path, get_absolute_path(_params.table_location));
int64_t offset = _deletion_vector_descriptor->offset;
int64_t length = _deletion_vector_descriptor->sizeInBytes;

ASSIGN_OR_RETURN(auto dv_file, open_random_access_file(path, fs_scan_stats, app_scan_stats,
shared_buffered_input_stream, cache_input_stream));
// Check the dv size
uint32_t size_from_deletion_vector_file;
RETURN_IF_ERROR(dv_file->read_at_fully(offset, &size_from_deletion_vector_file, DV_SIZE_LENGTH));
offset += DV_SIZE_LENGTH;
// the size_from_deletion_vector_file is big endian byte order
if (LittleEndian::IsLittleEndian()) {
size_from_deletion_vector_file = BigEndian::ToHost32(size_from_deletion_vector_file);
}

if (size_from_deletion_vector_file != length) {
std::stringstream ss;
ss << "DV size mismatch, expected : " << length << " , actual : " << size_from_deletion_vector_file;
return Status::RuntimeError(ss.str());
}

// Check the correctness of magic number
uint32_t magic_number_from_deletion_vector_file;
RETURN_IF_ERROR(dv_file->read_at_fully(offset, &magic_number_from_deletion_vector_file, MAGIC_NUMBER_LENGTH));
// magic_number_from_deletion_vector_file is little endian byte order
if (!LittleEndian::IsLittleEndian()) {
magic_number_from_deletion_vector_file = LittleEndian::ToHost32(magic_number_from_deletion_vector_file);
}
offset += MAGIC_NUMBER_LENGTH;

int64_t serialized_bitmap_length = length - MAGIC_NUMBER_LENGTH;
std::unique_ptr<char[]> deletion_vector(new char[serialized_bitmap_length]);
RETURN_IF_ERROR(dv_file->read_at_fully(offset, deletion_vector.get(), serialized_bitmap_length));

return deserialized_deletion_vector(magic_number_from_deletion_vector_file, std::move(deletion_vector),
serialized_bitmap_length, need_skip_rowids);
}
}

StatusOr<std::unique_ptr<RandomAccessFile>> DeletionVector::open_random_access_file(
const std::string& file_path, HdfsScanStats& fs_scan_stats, HdfsScanStats& app_scan_stats,
std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream,
std::shared_ptr<io::CacheInputStream>& cache_input_stream) const {
const OpenFileOptions options{.fs = _params.fs,
.path = file_path,
.fs_stats = &fs_scan_stats,
.app_stats = &app_scan_stats,
.datacache_options = _params.datacache_options};
ASSIGN_OR_RETURN(auto file,
HdfsScanner::create_random_access_file(shared_buffered_input_stream, cache_input_stream, options));
std::vector<io::SharedBufferedInputStream::IORange> io_ranges{};
int64_t offset = _deletion_vector_descriptor->offset;
int64_t length = _deletion_vector_descriptor->sizeInBytes + DV_SIZE_LENGTH;
while (offset < length) {
const int64_t remain_length = std::min(static_cast<int64_t>(config::io_coalesce_read_max_buffer_size), length);
io_ranges.emplace_back(offset, remain_length);
offset += remain_length;
}

RETURN_IF_ERROR(shared_buffered_input_stream->set_io_ranges(io_ranges));
return file;
}

Status DeletionVector::deserialized_inline_dv(std::string& encoded_bitmap_data,
std::set<int64_t>* need_skip_rowids) const {
ASSIGN_OR_RETURN(auto decoded_bitmap_data, base85_decode(encoded_bitmap_data));
uint32_t inline_magic_number;
memcpy(&inline_magic_number, decoded_bitmap_data.data(), DeletionVector::MAGIC_NUMBER_LENGTH);

int64_t serialized_bitmap_length = decoded_bitmap_data.size() - MAGIC_NUMBER_LENGTH;
std::unique_ptr<char[]> deletion_vector(new char[serialized_bitmap_length]);
memcpy(deletion_vector.get(), decoded_bitmap_data.data() + MAGIC_NUMBER_LENGTH, serialized_bitmap_length);

return deserialized_deletion_vector(inline_magic_number, std::move(deletion_vector), serialized_bitmap_length,
need_skip_rowids);
}

Status DeletionVector::deserialized_deletion_vector(uint32_t magic_number, std::unique_ptr<char[]> serialized_dv,
int64_t serialized_bitmap_length,
std::set<int64_t>* need_skip_rowids) const {
if (magic_number != MAGIC_NUMBER) {
std::stringstream ss;
ss << "Unexpected magic number : " << magic_number;
return Status::RuntimeError(ss.str());
}

// Construct the roaring bitmap of corresponding deletion vector
roaring64_bitmap_t* bitmap =
roaring64_bitmap_portable_deserialize_safe(serialized_dv.get(), serialized_bitmap_length);
if (bitmap == nullptr) {
return Status::RuntimeError("deserialize roaring64 bitmap error");
}

// Construct _need_skip_rowids from bitmap
uint64_t bitmap_cardinality = roaring64_bitmap_get_cardinality(bitmap);
std::unique_ptr<uint64_t[]> bitmap_array(new uint64_t[bitmap_cardinality]);
roaring64_bitmap_to_uint64_array(bitmap, bitmap_array.get());
need_skip_rowids->insert(bitmap_array.get(), bitmap_array.get() + bitmap_cardinality);

roaring64_bitmap_free(bitmap);
return Status::OK();
}

StatusOr<std::string> DeletionVector::get_absolute_path(const std::string& table_location) const {
std::string& storage_type = _deletion_vector_descriptor->storageType;
std::string& path = _deletion_vector_descriptor->pathOrInlineDv;
if (storage_type == "u") {
uint32_t random_prefix_len = path.length() - ENCODED_UUID_LENGTH;
std::string random_prefix = path.substr(0, random_prefix_len);
std::string encoded_uuid = path.substr(random_prefix_len);
ASSIGN_OR_RETURN(auto decoded_uuid, base85_decode(encoded_uuid));
if (decoded_uuid.length() < 16) {
return Status::RuntimeError("decoded uuid length less than 16");
}
boost::uuids::uuid uuid{};
memcpy(uuid.data, decoded_uuid.data(), 8);
memcpy(uuid.data + 8, decoded_uuid.data() + 8, 8);
return assemble_deletion_vector_path(table_location, boost::uuids::to_string(uuid), random_prefix);
} else if (storage_type == "p") {
return path;
} else {
return Status::RuntimeError(fmt::format("unsupported storage type {}", storage_type));
}
}

std::string DeletionVector::assemble_deletion_vector_path(const string& table_location, string&& uuid,
string& prefix) const {
std::string file_name = fmt::format("deletion_vector_{}.bin", uuid);
if (prefix.empty()) {
return fmt::format("{}/{}", table_location, file_name);
} else {
return fmt::format("{}/{}/{}", table_location, prefix, file_name);
}
}

} // namespace starrocks
57 changes: 57 additions & 0 deletions be/src/connector/deletion_vector/deletion_vector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "common/status.h"
#include "exec/hdfs_scanner.h"
#include "fs/fs.h"

namespace starrocks {

class DeletionVector {
public:
DeletionVector(const HdfsScannerParams& scanner_params)
: _deletion_vector_descriptor(scanner_params.deletion_vector_descriptor), _params(scanner_params) {}

Status fill_row_indexes(std::set<int64_t>* need_skip_rowids);
Status deserialized_inline_dv(std::string& encoded_bitmap_data, std::set<int64_t>* need_skip_rowids) const;
StatusOr<std::string> get_absolute_path(const std::string& table_location) const;

const bool is_inline() {
return _deletion_vector_descriptor->__isset.storageType && _deletion_vector_descriptor->storageType == "i";
}

static const int32_t DV_SIZE_LENGTH = 4;
static const int32_t MAGIC_NUMBER_LENGTH = 4;
static const uint32_t MAGIC_NUMBER = 1681511377;
// UUIDs always encode into 20 characters.
static const uint32_t ENCODED_UUID_LENGTH = 20;

private:
StatusOr<std::unique_ptr<RandomAccessFile>> open_random_access_file(
const std::string& file_path, HdfsScanStats& fs_scan_stats, HdfsScanStats& app_scan_stats,
std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream,
std::shared_ptr<io::CacheInputStream>& cache_input_stream) const;

Status deserialized_deletion_vector(uint32_t magic_number, std::unique_ptr<char[]> serialized_dv,
int64_t serialized_bitmap_length, std::set<int64_t>* need_skip_rowids) const;

std::string assemble_deletion_vector_path(const std::string& table_location, std::string&& uuid,
std::string& prefix) const;

const std::shared_ptr<TDeletionVectorDescriptor> _deletion_vector_descriptor;
const HdfsScannerParams& _params;
};
} // namespace starrocks
6 changes: 6 additions & 0 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
scanner_params.fs = _pool.add(fs.release());
scanner_params.path = native_file_path;
scanner_params.file_size = _scan_range.file_length;
scanner_params.table_location = _hive_table->get_base_path();
scanner_params.tuple_desc = _tuple_desc;
scanner_params.materialize_slots = _materialize_slots;
scanner_params.materialize_index_in_chunk = _materialize_index_in_chunk;
Expand Down Expand Up @@ -645,6 +646,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
scanner_params.deletes.emplace_back(&delete_file);
}

if (scan_range.__isset.deletion_vector_descriptor) {
scanner_params.deletion_vector_descriptor =
std::make_shared<TDeletionVectorDescriptor>(scan_range.deletion_vector_descriptor);
}

if (scan_range.__isset.paimon_deletion_file && !scan_range.paimon_deletion_file.path.empty()) {
scanner_params.paimon_deletion_file = std::make_shared<TPaimonDeletionFile>(scan_range.paimon_deletion_file);
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/hdfs_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,10 @@ StatusOr<std::unique_ptr<RandomAccessFile>> HdfsScanner::create_random_access_fi
std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream,
std::shared_ptr<io::CacheInputStream>& cache_input_stream, const OpenFileOptions& options) {
ASSIGN_OR_RETURN(std::unique_ptr<RandomAccessFile> raw_file, options.fs->new_random_access_file(options.path))
const int64_t file_size = options.file_size;
int64_t file_size = options.file_size;
if (file_size < 0) {
ASSIGN_OR_RETURN(file_size, raw_file->stream()->get_size());
}
raw_file->set_size(file_size);
const std::string& filename = raw_file->filename();

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ struct HdfsScannerParams {
std::string path;
// The file size. -1 means unknown.
int64_t file_size = -1;
// the table location
std::string table_location;

const TupleDescriptor* tuple_desc = nullptr;

Expand Down Expand Up @@ -213,6 +215,8 @@ struct HdfsScannerParams {

std::vector<const TIcebergDeleteFile*> deletes;

std::shared_ptr<TDeletionVectorDescriptor> deletion_vector_descriptor = nullptr;

const TIcebergSchema* iceberg_schema = nullptr;

bool is_lazy_materialization_slot(SlotId slot_id) const;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/hdfs_scanner_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/hdfs_scanner_parquet.h"

#include "connector/deletion_vector/deletion_vector.h"
#include "exec/hdfs_scanner.h"
#include "exec/iceberg/iceberg_delete_builder.h"
#include "exec/paimon/paimon_delete_file_builder.h"
Expand Down Expand Up @@ -45,6 +46,9 @@ Status HdfsParquetScanner::do_init(RuntimeState* runtime_state, const HdfsScanne
std::unique_ptr<PaimonDeleteFileBuilder> paimon_delete_file_builder(
new PaimonDeleteFileBuilder(scanner_params.fs, &_need_skip_rowids));
RETURN_IF_ERROR(paimon_delete_file_builder->build(scanner_params.paimon_deletion_file.get()));
} else if (scanner_params.deletion_vector_descriptor != nullptr) {
std::unique_ptr<DeletionVector> dv = std::make_unique<DeletionVector>(scanner_params);
RETURN_IF_ERROR(dv->fill_row_indexes(&_need_skip_rowids));
}
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ set(UTIL_FILES
arrow/utils.cpp
await.cpp
bfd_parser.cpp
base85.cpp
compression/block_compression.cpp
compression/compression_context_pool_singletons.cpp
compression/stream_compression.cpp
Expand Down
77 changes: 77 additions & 0 deletions be/src/util/base85.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The code in this file was modified from https://github.com/artemkin/z85/blob/master/src/z85.c
// Copyright 2013 Stanislav Artemkin <[email protected]>.

#include "base85.h"

typedef unsigned char byte;

static byte base256[] = {0x00, 0x44, 0x00, 0x54, 0x53, 0x52, 0x48, 0x00, 0x4B, 0x4C, 0x46, 0x41, 0x00, 0x3F,
0x3E, 0x45, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x40, 0x00,
0x49, 0x42, 0x4A, 0x47, 0x51, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C,
0x2D, 0x2E, 0x2F, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A,
0x3B, 0x3C, 0x3D, 0x4D, 0x00, 0x4E, 0x43, 0x00, 0x00, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E,
0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C,
0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x4F, 0x00, 0x50, 0x00, 0x00};

namespace starrocks {

char* base85_decode_impl(const char* source, const char* sourceEnd, char* dest) {
byte* src = (byte*)source;
byte* end = (byte*)sourceEnd;
byte* dst = (byte*)dest;
uint32_t value;

for (; src != end; src += 5, dst += 4) {
value = base256[(src[0] - 32) & 127];
value = value * 85 + base256[(src[1] - 32) & 127];
value = value * 85 + base256[(src[2] - 32) & 127];
value = value * 85 + base256[(src[3] - 32) & 127];
value = value * 85 + base256[(src[4] - 32) & 127];

// pack big-endian frame
dst[0] = value >> 24;
dst[1] = (byte)(value >> 16);
dst[2] = (byte)(value >> 8);
dst[3] = (byte)(value);
}

return (char*)dst;
}

StatusOr<std::string> base85_decode(const std::string& source) {
if (source.empty()) {
return Status::RuntimeError("base85 encoded source is empty");
}

size_t input_size = source.size();
if (input_size % 5) {
return Status::RuntimeError("base85 encoded source size error");
}

std::string buf;
size_t buf_size = input_size * 4 / 5;
buf.resize(buf_size);
char* dest = &buf[0];

const size_t decodedBytes = base85_decode_impl(source.data(), source.data() + input_size, dest) - dest;
if (decodedBytes == 0) {
return Status::RuntimeError("base85 decoded failed");
}

return buf;
}
} // namespace starrocks
Loading

0 comments on commit c43565d

Please sign in to comment.