Merge branch 'branch-3.0' into amory-test
amorynan authored Nov 25, 2024
2 parents 294afdc + 74b9658 commit dce7a30
Showing 97 changed files with 29,527 additions and 1,032 deletions.
9 changes: 2 additions & 7 deletions README.md
@@ -182,15 +182,10 @@ In terms of optimizers, Doris uses a combination of CBO and RBO. RBO supports co

**Apache Doris graduated from the Apache incubator and became a Top-Level Project in June 2022**.

Currently, the Apache Doris community has gathered more than 400 contributors from nearly 200 companies across different industries, with close to 100 active contributors per month.


[![Monthly Active Contributors](https://contributor-overtime-api.apiseven.com/contributors-svg?chart=contributorMonthlyActivity&repo=apache/doris)](https://www.apiseven.com/en/contributor-graph?chart=contributorMonthlyActivity&repo=apache/doris)

[![Contributor over time](https://contributor-overtime-api.apiseven.com/contributors-svg?chart=contributorOverTime&repo=apache/doris)](https://www.apiseven.com/en/contributor-graph?chart=contributorOverTime&repo=apache/doris)

We deeply appreciate 🔗[community contributors](https://github.com/apache/doris/graphs/contributors) for their contributions to Apache Doris.

[![contrib graph](https://contrib.rocks/image?repo=apache/doris)](https://github.com/apache/doris/graphs/contributors)

## 👨‍👩‍👧‍👦 Users

Apache Doris now has a wide user base in China and around the world, and as of today, **Apache Doris is used in production environments in thousands of companies worldwide.** More than 80% of the top 50 Internet companies in China in terms of market capitalization or valuation have been using Apache Doris for a long time, including Baidu, Meituan, Xiaomi, Jingdong, Bytedance, Tencent, NetEase, Kwai, Sina, 360, Mihoyo, and Ke Holdings. It is also widely used in some traditional industries such as finance, energy, manufacturing, and telecommunications.
5 changes: 3 additions & 2 deletions be/src/common/config.cpp
@@ -950,8 +950,6 @@ DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
// the queue size of the s3 scanner thread pool
DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
DEFINE_mInt64(block_cache_wait_timeout_ms, "1000");
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");

// limit the queue of pending batches which will be sent by a single nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
@@ -1043,6 +1041,9 @@ DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
11 changes: 9 additions & 2 deletions be/src/common/config.h
@@ -1004,8 +1004,6 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes);
// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);
DECLARE_mInt64(block_cache_wait_timeout_ms);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);

DECLARE_Bool(enable_brpc_builtin_services);

@@ -1084,6 +1082,15 @@ DECLARE_Bool(enable_ttl_cache_evict_using_lru);
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file cache.
// Make your choice based on the following considerations:
// If your file cache is ample enough to accommodate all the data in your database,
// enable this option; otherwise, it is recommended to leave it disabled.
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);

// inverted index searcher cache
// cache entry stay time after lookup
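To make the guidance on `enable_file_cache_keep_base_compaction_output` concrete, the decision rule in the comment reduces to a simple capacity check. A minimal sketch, with hypothetical names, not part of this commit:

#include <cstdint>

// Hypothetical helper (illustration only): keep base compaction output in the
// file cache only when the cache can hold the entire dataset, as the config.h
// comment advises.
bool should_keep_base_compaction_output(int64_t file_cache_capacity_bytes,
                                        int64_t total_data_bytes) {
    return file_cache_capacity_bytes >= total_data_bytes;
}

Since this is a BE config, in practice it would be toggled in conf/be.conf (e.g. enable_file_cache_keep_base_compaction_output = true) rather than computed at runtime.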
8 changes: 6 additions & 2 deletions be/src/olap/compaction.cpp
@@ -1181,8 +1181,12 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx.compaction_level = _engine.cumu_compaction_policy(compaction_policy)
->new_compaction_level(_input_rowsets);
}

ctx.write_file_cache = compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION;
// We presume that the data involved in cumulative compaction is sufficiently 'hot'
// and should always be retained in the cache.
// TODO(gavin): Ensure that the retention of hot data is implemented with precision.
ctx.write_file_cache = (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) ||
(config::enable_file_cache_keep_base_compaction_output &&
compaction_type() == ReaderType::READER_BASE_COMPACTION);
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical));
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
68 changes: 67 additions & 1 deletion be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -27,11 +27,14 @@ namespace doris {

uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
uint32_t score = 0;
uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();

int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is held
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
@@ -50,6 +53,12 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
// collect the rowsets of cumulative part
score += rs_meta->get_compaction_score();
if (rs_meta->compaction_level() == 0) {
level0_total_size += rs_meta->total_disk_size();
level0_score += rs_meta->get_compaction_score();
} else {
checked_rs_metas.push_back(rs_meta);
}
}
}

@@ -64,7 +73,64 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}

return score;
// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}

// Condition 3: the level-1 rowsets reach compaction_goal_size
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
return a->version().first < b->version().first;
});
int32_t rs_meta_count = 0;
int64_t continuous_size = 0;
for (const auto& rs_meta : checked_rs_metas) {
rs_meta_count++;
continuous_size += rs_meta->total_disk_size();
if (rs_meta_count >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter time_series_compaction_time_threshold_seconds
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
return score;
}
} else if (score > 0) {
// If the compaction process has never been successfully executed,
// the condition that triggers compaction based on the last successful compaction time (condition 4) would never be met
tablet->set_last_cumu_compaction_success_time(now);
}

// Condition 5: If there is a continuous set of empty rowsets, prioritize merging.
std::vector<RowsetSharedPtr> input_rowsets;
std::vector<RowsetSharedPtr> candidate_rowsets =
tablet->pick_candidate_rowsets_to_cumulative_compaction();
tablet->calc_consecutive_empty_rowsets(
&input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets.empty()) {
return score;
}

return 0;
}

void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
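Net effect of the rewrite above: `calc_cumulative_compaction_score` now reports the raw score only when at least one of the five trigger conditions holds, and returns 0 otherwise. A condensed restatement of that gate, with hypothetical names, for illustration only:

#include <cstdint>

// Hypothetical condensation of the five triggers (illustration only).
// Returns true when the computed score should be reported; false means the
// policy returns 0 and compaction is deferred for this tablet.
bool time_series_should_compact(int64_t level0_total_size, int64_t goal_size_bytes,
                                int64_t level0_score, int64_t file_count_threshold,
                                bool level1_reaches_goal,             // condition 3
                                bool time_threshold_exceeded,         // condition 4
                                bool has_consecutive_empty_rowsets) { // condition 5
    return level0_total_size >= goal_size_bytes     // condition 1: goal size reached
           || level0_score >= file_count_threshold  // condition 2: enough input files
           || level1_reaches_goal || time_threshold_exceeded ||
           has_consecutive_empty_rowsets;
}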
2 changes: 1 addition & 1 deletion be/src/olap/memtable_memory_limiter.cpp
@@ -141,7 +141,7 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
_flush_active_memtables(need_flush);
}
} while (_hard_limit_reached() && !_load_usage_low());
} while (_hard_limit_reached());
g_memtable_memory_limit_waiting_threads << -1;
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
20 changes: 11 additions & 9 deletions be/src/olap/parallel_scanner_builder.cpp
@@ -24,6 +24,7 @@
#include "cloud/config.h"
#include "common/status.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/segment_loader.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"

@@ -63,21 +64,18 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
auto rowset = reader->rowset();
const auto rowset_id = rowset->rowset_id();

DCHECK(_segment_cache_handles.contains(rowset_id));
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
const auto& segments_rows = _all_segments_rows[rowset_id];

if (rowset->num_rows() == 0) {
continue;
}

const auto& segments = segment_cache_handle.get_segments();
int segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments.size(); ++i) {
const auto& segment = segments[i];
for (size_t i = 0; i != segments_rows.size(); ++i) {
const size_t rows_of_segment = segments_rows[i];
RowRanges row_ranges;
const size_t rows_of_segment = segment->num_rows();
int64_t offset_in_segment = 0;

// try to split large segments into RowRanges
@@ -125,15 +123,15 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
// A non-empty `row_ranges` means some rows in this segment have not yet been added into `split`.
if (!row_ranges.is_empty()) {
DCHECK_GT(rows_collected, 0);
DCHECK_EQ(row_ranges.to(), segment->num_rows());
DCHECK_EQ(row_ranges.to(), rows_of_segment);
split.segment_row_ranges.emplace_back(std::move(row_ranges));
}
}

DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
split.segment_offsets.first = segment_start;
split.segment_offsets.second = segments.size();
split.segment_offsets.second = segments_rows.size();
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second - split.segment_offsets.first);
@@ -181,11 +179,15 @@ Status ParallelScannerBuilder::_load() {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
SegmentCacheHandle segment_cache_handle;
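// The cache handle is now local to _load(): segments stay pinned in the
// segment cache only while their row counts are copied out, instead of being
// held in a member map for the builder's whole lifetime.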

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
}
_total_rows += rowset->num_rows();
}
}
2 changes: 1 addition & 1 deletion be/src/olap/parallel_scanner_builder.h
@@ -83,7 +83,7 @@ class ParallelScannerBuilder {

size_t _rows_per_scanner {_min_rows_per_scanner};

std::map<RowsetId, SegmentCacheHandle> _segment_cache_handles;
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;

std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeState* _state;
7 changes: 4 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -606,7 +606,7 @@ Status VTabletWriterV2::close(Status exec_status) {
// close_wait on all non-incremental streams, even if this is not the last sink,
// because some per-instance data structures are now shared among all sinks
// due to sharing delta writers and load stream stubs.
_close_wait(false);
RETURN_IF_ERROR(_close_wait(false));

// send CLOSE_LOAD on all incremental streams if this is the last sink.
// this must happen after all non-incremental streams are closed,
@@ -616,7 +616,7 @@ }
}

// close_wait on all incremental streams, even if this is not the last sink.
_close_wait(true);
RETURN_IF_ERROR(_close_wait(true));

// calculate and submit commit info
if (is_last_sink) {
@@ -665,7 +665,7 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}

void VTabletWriterV2::_close_wait(bool incremental) {
Status VTabletWriterV2::_close_wait(bool incremental) {
SCOPED_TIMER(_close_load_timer);
auto st = _load_stream_map->for_each_st(
[this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
@@ -690,6 +690,7 @@ void VTabletWriterV2::_close_wait(bool incremental) {
if (!st.ok()) {
LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
}
return st;
}

void VTabletWriterV2::_calc_tablets_to_commit() {
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,7 +148,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {

void _calc_tablets_to_commit();

void _close_wait(bool incremental);
Status _close_wait(bool incremental);

void _cancel(Status status);

@@ -404,7 +404,7 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor
const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
cumulative_compaction_policy);

EXPECT_EQ(9, score);
EXPECT_EQ(0, score);
}

TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_score_big_rowset) {
16 changes: 4 additions & 12 deletions cloud/src/recycler/checker.cpp
@@ -168,25 +168,17 @@ int Checker::start() {
auto ctime_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
ret = checker->do_check();
int ret1 = checker->do_check();

int ret2 = 0;
if (config::enable_inverted_check) {
if (ret == 0) {
ret = checker->do_inverted_check();
}
}

if (ret < 0) {
// If ret < 0, it means that a temporary error occurred during the check process.
// The check job should not be considered finished, and the next round of check job
// should be retried as soon as possible.
return;
ret2 = checker->do_inverted_check();
}

// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
ip_port_, ret == 0, ctime_ms);
ip_port_, ret1 == 0 && ret2 == 0, ctime_ms);
}
{
std::lock_guard lock(mtx_);
@@ -434,9 +434,12 @@ private boolean processDropColumn(DropColumnClause alterClause, OlapTable olapTa
// drop bloom filter column
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
Set<String> newBfCols = new HashSet<>();
Set<String> newBfCols = null;
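// Allocate lazily: if every bloom filter column is dropped, newBfCols stays
// null, so the bloom filter info is cleared rather than set to an empty set.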
for (String bfCol : bfCols) {
if (!bfCol.equalsIgnoreCase(dropColName)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(bfCol);
}
}
@@ -2912,6 +2915,25 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
olapTable.getName(), jobId, isReplay);
}
// For bloom filters, rebuild the bloom filter info from the table schema during replay
if (isReplay) {
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
List<Column> columns = olapTable.getBaseSchema();
Set<String> newBfCols = null;
for (String bfCol : bfCols) {
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(bfCol)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(column.getName());
}
}
}
olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
}
}
}

public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
@@ -225,10 +225,6 @@ public String getTypeName() {

@Override
public String toString() {
if (!leftRejectEdges.isEmpty() || !rightRejectEdges.isEmpty()) {
return String.format("<%s --%s-- %s>[%s , %s]", LongBitmap.toString(leftExtendedNodes),
this.getTypeName(), LongBitmap.toString(rightExtendedNodes), leftRejectEdges, rightRejectEdges);
}
return String.format("<%s --%s-- %s>", LongBitmap.toString(leftExtendedNodes),
this.getTypeName(), LongBitmap.toString(rightExtendedNodes));
}