Skip to content

Commit

Permalink
[improvement] improvement for light weight schema change (apache#10860)
Browse files Browse the repository at this point in the history
* Improvements for dynamic schema:
The schema is no longer used as part of the LRU cache key.
Segments are loaded using the rowset's original schema, not the current read schema.
Column readers and column iterators are generated using the original schema; the read schema is used only if the column is a new column.
Column unique ids are used as keys instead of column ordinals.
Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei authored and miswujian committed Jul 28, 2022
1 parent 3e2bade commit f3dc105
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 47 deletions.
3 changes: 3 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "vec/core/block.h"

namespace doris {
Expand Down Expand Up @@ -83,6 +84,8 @@ class StorageReadOptions {
OlapReaderStatistics* stats = nullptr;
bool use_page_cache = false;
int block_row_max = 4096;

const TabletSchema* tablet_schema = nullptr;
};

// Used to read data in RowBlockV2 one by one
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,15 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
return Status::OK();
}

Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
const TabletSchema* read_tablet_schema) {
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
auto fs = _rowset_meta->fs();
if (!fs) {
if (!fs || _schema == nullptr) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, read_tablet_schema, &segment);
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset "
<< unique_id() << " : " << s.to_string();
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ class BetaRowset : public Rowset {

bool check_file_exist() override;

Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
const TabletSchema* read_tablet_schema);
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);

protected:
BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
}
}
read_options.use_page_cache = read_context->use_page_cache;
read_options.tablet_schema = read_context->tablet_schema;

// load segments
RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
_rowset, &_segment_cache_handle, read_context->tablet_schema,
_rowset, &_segment_cache_handle,
read_context->reader_type == ReaderType::READER_QUERY));

// create iterator for each segment
Expand Down
36 changes: 22 additions & 14 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
// trying to prune the current segment by segment-level zone map
if (read_options.conditions != nullptr) {
for (auto& column_condition : read_options.conditions->columns()) {
int32_t column_id = column_condition.first;
if (_column_readers[column_id] == nullptr ||
!_column_readers[column_id]->has_zone_map()) {
int32_t column_unique_id = _tablet_schema.column(column_condition.first).unique_id();
if (_column_readers.count(column_unique_id) < 1 ||
!_column_readers.at(column_unique_id)->has_zone_map()) {
continue;
}
if (!_column_readers[column_id]->match_condition(column_condition.second)) {
if (!_column_readers.at(column_unique_id)->match_condition(column_condition.second)) {
// any condition not satisfied, return.
iter->reset(new EmptySegmentIterator(schema));
read_options.stats->filtered_segment_number++;
Expand Down Expand Up @@ -168,9 +168,8 @@ Status Segment::_create_column_readers() {
_column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}

_column_readers.resize(_tablet_schema.columns().size());
for (uint32_t ordinal = 0; ordinal < _tablet_schema.num_columns(); ++ordinal) {
auto& column = _tablet_schema.columns()[ordinal];
auto& column = _tablet_schema.column(ordinal);
auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
if (iter == _column_id_to_footer_ordinal.end()) {
continue;
Expand All @@ -181,14 +180,20 @@ Status Segment::_create_column_readers() {
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(opts, _footer.columns(iter->second),
_footer.num_rows(), _file_reader, &reader));
_column_readers[ordinal] = std::move(reader);
_column_readers.emplace(column.unique_id(), std::move(reader));
}
return Status::OK();
}

Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
if (_column_readers[cid] == nullptr) {
const TabletColumn& tablet_column = _tablet_schema.column(cid);
// Do not use cid anymore. For example, suppose the original table schema is colA int, and the user
// then performs the following actions:
// 1. add column b
// 2. drop column b
// 3. add column c
// In the new schema column c's cid == 2,
// but in the old schema column b's cid == 2,
// even though they are not the same column.
Status Segment::new_column_iterator(const TabletColumn& tablet_column, ColumnIterator** iter) {
if (_column_readers.count(tablet_column.unique_id()) < 1) {
if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
return Status::InternalError("invalid nonexistent column without default value.");
}
Expand All @@ -204,12 +209,15 @@ Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
*iter = default_value_iter.release();
return Status::OK();
}
return _column_readers[cid]->new_iterator(iter);
return _column_readers.at(tablet_column.unique_id())->new_iterator(iter);
}

Status Segment::new_bitmap_index_iterator(uint32_t cid, BitmapIndexIterator** iter) {
if (_column_readers[cid] != nullptr && _column_readers[cid]->has_bitmap_index()) {
return _column_readers[cid]->new_bitmap_index_iterator(iter);
Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
BitmapIndexIterator** iter) {
auto col_unique_id = tablet_column.unique_id();
if (_column_readers.count(col_unique_id) > 0 &&
_column_readers.at(col_unique_id)->has_bitmap_index()) {
return _column_readers.at(col_unique_id)->new_bitmap_index_iterator(iter);
}
return Status::OK();
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class Segment : public std::enable_shared_from_this<Segment> {

uint32_t num_rows() const { return _footer.num_rows(); }

Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
Status new_column_iterator(const TabletColumn& tablet_column, ColumnIterator** iter);

Status new_bitmap_index_iterator(uint32_t cid, BitmapIndexIterator** iter);
Status new_bitmap_index_iterator(const TabletColumn& tablet_column, BitmapIndexIterator** iter);

size_t num_short_keys() const { return _tablet_schema.num_short_key_columns(); }

Expand Down Expand Up @@ -133,10 +133,11 @@ class Segment : public std::enable_shared_from_this<Segment> {
// with an old schema.
std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;

// map column unique id ---> column reader
// ColumnReader for each column in TabletSchema. If ColumnReader is nullptr,
// this means that this segment has no data for that column, which may have been added
// after this segment was generated.
std::vector<std::unique_ptr<ColumnReader>> _column_readers;
std::map<int32_t, std::unique_ptr<ColumnReader>> _column_readers;

// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
// create used column iterator
for (auto cid : _seek_schema->column_ids()) {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(cid, &_column_iterators[cid]));
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
&_column_iterators[cid]));
ColumnIteratorOptions iter_opts;
iter_opts.stats = _opts.stats;
iter_opts.file_reader = _file_reader.get();
Expand Down Expand Up @@ -344,7 +345,8 @@ Status SegmentIterator::_init_return_column_iterators() {
}
for (auto cid : _schema.column_ids()) {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(cid, &_column_iterators[cid]));
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
&_column_iterators[cid]));
ColumnIteratorOptions iter_opts;
iter_opts.stats = _opts.stats;
iter_opts.use_page_cache = _opts.use_page_cache;
Expand All @@ -361,8 +363,8 @@ Status SegmentIterator::_init_bitmap_index_iterators() {
}
for (auto cid : _schema.column_ids()) {
if (_bitmap_index_iterators[cid] == nullptr) {
RETURN_IF_ERROR(
_segment->new_bitmap_index_iterator(cid, &_bitmap_index_iterators[cid]));
RETURN_IF_ERROR(_segment->new_bitmap_index_iterator(_opts.tablet_schema->column(cid),
&_bitmap_index_iterators[cid]));
}
}
return Status::OK();
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,16 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::C
}

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle,
const TabletSchema* read_tablet_schema, bool use_cache) {
SegmentLoader::CacheKey cache_key(rowset->rowset_id(), *read_tablet_schema);
if (use_cache && _lookup(cache_key, cache_handle)) {
SegmentCacheHandle* cache_handle, bool use_cache) {
SegmentLoader::CacheKey cache_key(rowset->rowset_id());
if (_lookup(cache_key, cache_handle)) {
cache_handle->owned = false;
return Status::OK();
}
cache_handle->owned = !use_cache;

std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_NOT_OK(rowset->load_segments(&segments, read_tablet_schema));
RETURN_NOT_OK(rowset->load_segments(&segments));

if (use_cache) {
// memory of SegmentLoader::CacheValue will be handled by SegmentLoader
Expand Down
12 changes: 3 additions & 9 deletions be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,11 @@ class SegmentLoader {
public:
// The cache key of segment lru cache
struct CacheKey {
CacheKey(RowsetId rowset_id_, const TabletSchema& tablet_schema)
: rowset_id(rowset_id_), tablet_schema(tablet_schema) {}
CacheKey(RowsetId rowset_id_) : rowset_id(rowset_id_) {}
RowsetId rowset_id;
TabletSchema tablet_schema;

// Encode to a flat binary which can be used as LRUCache's key
std::string encode() const {
TabletSchemaPB tablet_schema_pb;
tablet_schema.to_schema_pb(&tablet_schema_pb);
return rowset_id.to_string() + tablet_schema_pb.SerializeAsString();
}
std::string encode() const { return rowset_id.to_string(); }
};

// The cache value of segment lru cache.
Expand Down Expand Up @@ -89,7 +83,7 @@ class SegmentLoader {
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
const TabletSchema* read_tablet_schema, bool use_cache = false);
bool use_cache = false);

// Try to prune the segment cache if expired.
Status prune();
Expand Down
8 changes: 3 additions & 5 deletions be/test/olap/rowset/beta_rowset_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
std::make_shared<io::S3FileSystem>(properties, "bucket", "test prefix", resource_id);
Aws::SDKOptions aws_options = Aws::SDKOptions {};
Aws::InitAPI(aws_options);

TabletSchema dummy_schema;
// failed to head object
{
Aws::Auth::AWSCredentials aws_cred("ak", "sk");
Expand All @@ -447,7 +445,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);

std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}

Expand All @@ -462,7 +460,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);

std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}

Expand All @@ -477,7 +475,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);

std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}

Expand Down
Loading

0 comments on commit f3dc105

Please sign in to comment.