Skip to content

Commit

Permalink
[Enhancement] add data cache to pos-delete file
Browse files Browse the repository at this point in the history
Signed-off-by: stephen <[email protected]>
  • Loading branch information
stephen-shelby committed Oct 14, 2024
1 parent 22e44de commit 9e5f0bd
Show file tree
Hide file tree
Showing 17 changed files with 479 additions and 330 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/hdfs_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ Status HdfsScanner::_build_scanner_context() {
ctx.extended_columns.emplace_back(std::move(column));
}

ctx.tuple_desc = _scanner_params.tuple_desc;
ctx.slot_descs = _scanner_params.tuple_desc->slots();
ctx.scan_range = _scanner_params.scan_range;
ctx.runtime_filter_collector = _scanner_params.runtime_filter_collector;
ctx.min_max_conjunct_ctxs = _scanner_params.min_max_conjunct_ctxs;
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/hdfs_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ struct HdfsScannerContext {
return case_sensitive ? name : boost::algorithm::to_lower_copy(name);
}

const TupleDescriptor* tuple_desc = nullptr;
std::vector<SlotDescriptor*> slot_descs;
std::unordered_map<SlotId, std::vector<ExprContext*>> conjunct_ctxs_by_slot;

// materialized column read from parquet file
Expand Down Expand Up @@ -381,10 +381,11 @@ class HdfsScanner {
void move_split_tasks(std::vector<pipeline::ScanSplitContextPtr>* split_tasks);
bool has_split_tasks() const { return _scanner_ctx.has_split_tasks; }

protected:
static StatusOr<std::unique_ptr<RandomAccessFile>> create_random_access_file(
std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream,
std::shared_ptr<io::CacheInputStream>& cache_input_stream, const OpenFileOptions& options);

protected:
Status open_random_access_file();
static CompressionTypePB get_compression_type_from_path(const std::string& filename);

Expand Down
16 changes: 12 additions & 4 deletions be/src/exec/hdfs_scanner_orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,20 @@ bool OrcRowReaderFilter::filterOnPickStringDictionary(
Status HdfsOrcScanner::build_iceberg_delete_builder() {
if (_scanner_params.deletes.empty()) return Status::OK();
SCOPED_RAW_TIMER(&_app_stats.iceberg_delete_file_build_ns);
const IcebergDeleteBuilder iceberg_delete_builder(_scanner_params.fs, _scanner_params.path, &_need_skip_rowids,
_scanner_params.datacache_options);
auto iceberg_delete_builder =
std::make_unique<IcebergDeleteBuilder>(&_need_skip_rowids, _runtime_state, _scanner_params);

for (const auto& tdelete_file : _scanner_params.deletes) {
RETURN_IF_ERROR(iceberg_delete_builder.build_orc(_runtime_state->timezone(), *tdelete_file));
for (const auto& delete_file : _scanner_params.deletes) {
if (delete_file->file_content == TIcebergFileContent::POSITION_DELETES) {
RETURN_IF_ERROR(iceberg_delete_builder->build_orc(*delete_file));
} else {
const auto s = strings::Substitute("Unsupported iceberg file content: $0 in the scanner thread",
delete_file->file_content);
LOG(WARNING) << s;
return Status::InternalError(s);
}
}

_app_stats.iceberg_delete_files_per_scan += _scanner_params.deletes.size();
return Status::OK();
}
Expand Down
15 changes: 11 additions & 4 deletions be/src/exec/hdfs_scanner_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ static const std::string kParquetProfileSectionPrefix = "Parquet";
Status HdfsParquetScanner::do_init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params) {
if (!scanner_params.deletes.empty()) {
SCOPED_RAW_TIMER(&_app_stats.iceberg_delete_file_build_ns);
std::unique_ptr<IcebergDeleteBuilder> iceberg_delete_builder(new IcebergDeleteBuilder(
scanner_params.fs, scanner_params.path, &_need_skip_rowids, scanner_params.datacache_options));
for (const auto& tdelete_file : scanner_params.deletes) {
RETURN_IF_ERROR(iceberg_delete_builder->build_parquet(runtime_state->timezone(), *tdelete_file));
auto iceberg_delete_builder =
std::make_unique<IcebergDeleteBuilder>(&_need_skip_rowids, runtime_state, scanner_params);
for (const auto& delete_file : scanner_params.deletes) {
if (delete_file->file_content == TIcebergFileContent::POSITION_DELETES) {
RETURN_IF_ERROR(iceberg_delete_builder->build_parquet(*delete_file));
} else {
const auto s = strings::Substitute("Unsupported iceberg file content: $0 in the scanner thread",
delete_file->file_content);
LOG(WARNING) << s;
return Status::InternalError(s);
}
}
_app_stats.iceberg_delete_files_per_scan += scanner_params.deletes.size();
} else if (scanner_params.paimon_deletion_file != nullptr) {
Expand Down
292 changes: 235 additions & 57 deletions be/src/exec/iceberg/iceberg_delete_builder.cpp

Large diffs are not rendered by default.

102 changes: 19 additions & 83 deletions be/src/exec/iceberg/iceberg_delete_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,104 +14,40 @@

#pragma once

#include <utility>

#include "block_cache/cache_options.h"
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "fs/fs.h"
#include "gutil/strings/substitute.h"
#include "exec/hdfs_scanner.h"
#include "runtime/descriptors.h"

namespace starrocks {
struct IcebergColumnMeta;

class DeleteBuilder {
public:
DeleteBuilder(FileSystem* fs, const DataCacheOptions& datacache_options)
: _fs(fs), _datacache_options(datacache_options){};
virtual ~DeleteBuilder() = default;

protected:
FileSystem* _fs;
const DataCacheOptions& _datacache_options;
};

class PositionDeleteBuilder : public DeleteBuilder {
public:
PositionDeleteBuilder(FileSystem* fs, const DataCacheOptions& datacache_options)
: DeleteBuilder(fs, datacache_options) {}
~PositionDeleteBuilder() override = default;

virtual Status build(const std::string& timezone, const std::string& file_path, int64_t file_length,
std::set<int64_t>* need_skip_rowids) = 0;
};

class ORCPositionDeleteBuilder final : public PositionDeleteBuilder {
class IcebergDeleteBuilder {
public:
ORCPositionDeleteBuilder(FileSystem* fs, const DataCacheOptions& datacache_options, std::string datafile_path)
: PositionDeleteBuilder(fs, datacache_options), _datafile_path(std::move(datafile_path)) {}
~ORCPositionDeleteBuilder() override = default;

Status build(const std::string& timezone, const std::string& delete_file_path, int64_t file_length,
std::set<int64_t>* need_skip_rowids) override;
IcebergDeleteBuilder(std::set<int64_t>* need_skip_rowids, RuntimeState* state,
const HdfsScannerParams& scanner_params)
: _need_skip_rowids(need_skip_rowids), _params(scanner_params), _runtime_state(state) {}

private:
std::string _datafile_path;
};
~IcebergDeleteBuilder() = default;

class ParquetPositionDeleteBuilder final : public PositionDeleteBuilder {
public:
ParquetPositionDeleteBuilder(FileSystem* fs, const DataCacheOptions& datacache_options, std::string datafile_path)
: PositionDeleteBuilder(fs, datacache_options), _datafile_path(std::move(datafile_path)) {}
~ParquetPositionDeleteBuilder() override = default;
Status build_orc(const TIcebergDeleteFile& delete_file) const;

Status build(const std::string& timezone, const std::string& delete_file_path, int64_t file_length,
std::set<int64_t>* need_skip_rowids) override;
Status build_parquet(const TIcebergDeleteFile& delete_file) const;

private:
std::string _datafile_path;
};
StatusOr<std::unique_ptr<RandomAccessFile>> open_random_access_file(
const TIcebergDeleteFile& delete_file, HdfsScanStats& fs_scan_stats, HdfsScanStats& app_scan_stats,
std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream,
std::shared_ptr<io::CacheInputStream>& cache_input_stream) const;

class IcebergDeleteBuilder {
public:
IcebergDeleteBuilder(FileSystem* fs, std::string datafile_path, std::set<int64_t>* need_skip_rowids,
const DataCacheOptions& datacache_options = DataCacheOptions())
: _fs(fs),
_datafile_path(std::move(datafile_path)),
_need_skip_rowids(need_skip_rowids),
_datacache_options(datacache_options) {}
~IcebergDeleteBuilder() = default;

Status build_orc(const std::string& timezone, const TIcebergDeleteFile& delete_file) const {
if (delete_file.file_content == TIcebergFileContent::POSITION_DELETES) {
return ORCPositionDeleteBuilder(_fs, _datacache_options, _datafile_path)
.build(timezone, delete_file.full_path, delete_file.length, _need_skip_rowids);
} else {
const auto s = strings::Substitute("Unsupported iceberg file content: $0 in the scanner thread",
delete_file.file_content);
LOG(WARNING) << s;
return Status::InternalError(s);
}
}
static void update_delete_file_io_counter(
RuntimeProfile* parent_profile, const HdfsScanStats& app_stats, const HdfsScanStats& fs_stats,
const std::shared_ptr<io::CacheInputStream>& cache_input_stream,
const std::shared_ptr<io::SharedBufferedInputStream>& shared_buffered_input_stream);
Status fill_skip_rowids(const ChunkPtr& chunk) const;

Status build_parquet(const std::string& timezone, const TIcebergDeleteFile& delete_file) const {
if (delete_file.file_content == TIcebergFileContent::POSITION_DELETES) {
return ParquetPositionDeleteBuilder(_fs, _datacache_options, _datafile_path)
.build(timezone, delete_file.full_path, delete_file.length, _need_skip_rowids);
} else {
auto s = strings::Substitute("Unsupported iceberg file content: $0 in the scanner thread",
delete_file.file_content);
LOG(WARNING) << s;
return Status::InternalError(s);
}
}

private:
FileSystem* _fs;
std::string _datafile_path;
std::set<int64_t>* _need_skip_rowids;
const DataCacheOptions _datacache_options;
const HdfsScannerParams& _params;
RuntimeState* _runtime_state;
};

class IcebergDeleteFileMeta {
Expand Down
13 changes: 8 additions & 5 deletions be/src/formats/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,7 @@ bool FileReader::_filter_group_with_bloom_filter_min_max_conjuncts(const tparque
if (_scanner_ctx->runtime_filter_collector) {
std::vector<SlotDescriptor*> min_max_slots(1);

const TupleDescriptor& tuple_desc = *(_scanner_ctx->tuple_desc);
const std::vector<SlotDescriptor*>& slots = tuple_desc.slots();
const std::vector<SlotDescriptor*>& slots = _scanner_ctx->slot_descs;

for (auto& it : _scanner_ctx->runtime_filter_collector->descriptors()) {
RuntimeFilterProbeDescriptor* rf_desc = it.second;
Expand Down Expand Up @@ -470,8 +469,13 @@ bool FileReader::_filter_group_with_more_filter(const tparquet::RowGroup& row_gr
StatisticsHelper::StatSupportedFilter filter_type;
for (auto ctx : kv.second) {
if (StatisticsHelper::can_be_used_for_statistics_filter(ctx, filter_type)) {
const TupleDescriptor& tuple_desc = *(_scanner_ctx->tuple_desc);
SlotDescriptor* slot = tuple_desc.get_slot_by_id(kv.first);
SlotDescriptor* slot = nullptr;
for (auto s : _scanner_ctx->slot_descs) {
if (s->id() == kv.first) {
slot = s;
}
}

if (UNLIKELY(slot == nullptr)) {
// it shouldn't be here, just some defensive code
DCHECK(false) << "couldn't find slot id " << kv.first << " in tuple desc";
Expand Down Expand Up @@ -656,7 +660,6 @@ Status FileReader::_init_group_readers() {
const HdfsScannerContext& fd_scanner_ctx = *_scanner_ctx;

// _group_reader_param is used by all group readers
_group_reader_param.tuple_desc = fd_scanner_ctx.tuple_desc;
_group_reader_param.conjunct_ctxs_by_slot = fd_scanner_ctx.conjunct_ctxs_by_slot;
_group_reader_param.timezone = fd_scanner_ctx.timezone;
_group_reader_param.stats = fd_scanner_ctx.stats;
Expand Down
1 change: 0 additions & 1 deletion be/src/formats/parquet/group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct GroupReaderParam {
const SlotId slot_id() const { return slot_desc->id(); }
};

const TupleDescriptor* tuple_desc = nullptr;
// conjunct_ctxs that column is materialized in group reader
std::unordered_map<SlotId, std::vector<ExprContext*>> conjunct_ctxs_by_slot;

Expand Down
1 change: 0 additions & 1 deletion be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ set(EXEC_FILES
./exec/es/es_query_builder_test.cpp
./exec/es/es_scan_reader_test.cpp
./exec/es/es_scroll_parser_test.cpp
./exec/iceberg/iceberg_delete_builder_test.cpp
./exec/iceberg/iceberg_table_sink_operator_test.cpp
./exec/paimon/paimon_delete_file_builder_test.cpp
./exec/workgroup/scan_task_queue_test.cpp
Expand Down
45 changes: 0 additions & 45 deletions be/test/exec/iceberg/iceberg_delete_builder_test.cpp

This file was deleted.

5 changes: 3 additions & 2 deletions be/test/formats/parquet/column_converter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ class ColumnConverterTest : public testing::Test {

Utils::SlotDesc slot_descs[] = {{col_name, col_type}, {""}};

ctx->tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs);
Utils::make_column_info_vector(ctx->tuple_desc, &ctx->materialized_columns);
TupleDescriptor* tuple_desc = Utils::create_tuple_descriptor(_runtime_state, &_pool, slot_descs);
Utils::make_column_info_vector(tuple_desc, &ctx->materialized_columns);
ctx->slot_descs = tuple_desc->slots();
ctx->scan_range = (_create_scan_range(filepath));
// --------------finish init context---------------

Expand Down
Loading

0 comments on commit 9e5f0bd

Please sign in to comment.