Skip to content

Commit

Permalink
Merge branch 'branch-2.1' into sync_unique_case
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Jul 15, 2024
2 parents 3082339 + 8df2432 commit 1620484
Show file tree
Hide file tree
Showing 401 changed files with 10,796 additions and 5,405 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ be/tags
be/test/olap/test_data/tablet_meta_test.hdr
be/.devcontainer/
be/src/apache-orc/
zoneinfo/

## tools
tools/ssb-tools/ssb-data/
Expand Down
2 changes: 1 addition & 1 deletion be/src/clucene
16 changes: 6 additions & 10 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,9 @@ DEFINE_mInt32(max_fill_rate, "2");

DEFINE_mInt32(double_resize_threshold, "23");

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines
// with large memory larger than 16G.
// Turn up max. On machines with more than 16G memory, more memory buffers will be reserved for Full GC.
// Turn down max. will use as much memory as possible.
DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "6871947673");

DEFINE_Int64(memtable_limiter_reserved_memory_bytes, "838860800");

// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
DEFINE_mString(process_minor_gc_size, "10%");
Expand All @@ -132,6 +129,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");

DEFINE_mBool(disable_memory_gc, "false");

DEFINE_mBool(enable_stacktrace_in_allocator_check_failed, "false");

DEFINE_mInt64(large_memory_check_bytes, "2147483648");

DEFINE_mBool(enable_memory_orphan_check, "true");
Expand Down Expand Up @@ -554,6 +553,7 @@ DEFINE_String(pprof_profile_dir, "${DORIS_HOME}/log");
// for jeprofile in jemalloc
DEFINE_mString(jeprofile_dir, "${DORIS_HOME}/log");
DEFINE_mBool(enable_je_purge_dirty_pages, "true");
DEFINE_mString(je_dirty_pages_mem_limit_percent, "5%");

// to forward compatibility, will be removed later
DEFINE_mBool(enable_token_check, "true");
Expand Down Expand Up @@ -1152,10 +1152,6 @@ DEFINE_mBool(enable_workload_group_memory_gc, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");

// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
DEFINE_Bool(use_doris_tzfile, "false");

// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");

Expand Down
22 changes: 13 additions & 9 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,16 @@ DECLARE_mInt32(max_fill_rate);

DECLARE_mInt32(double_resize_threshold);

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 1.6G,
// actual low water mark=min(1.6G, MemTotal * 10%), avoid wasting too much memory on machines
// with large memory larger than 16G.
// Turn up max. On machines with more than 16G memory, more memory buffers will be reserved for Full GC.
// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 6.4G,
// actual low water mark=min(6.4G, MemTotal * 5%), avoid wasting too much memory on machines
// with large memory larger than 128G.
// Turn up max. On machines with more than 128G memory, more memory buffers will be reserved for Full GC.
// Turn down max. will use as much memory as possible.
DECLARE_Int64(max_sys_mem_available_low_water_mark_bytes);

// reserve a small amount of memory so we do not trigger MinorGC
DECLARE_Int64(memtable_limiter_reserved_memory_bytes);

// The size of the memory that gc wants to release each time, as a percentage of the mem limit.
DECLARE_mString(process_minor_gc_size);
DECLARE_mString(process_full_gc_size);
Expand All @@ -171,11 +174,14 @@ DECLARE_mString(process_full_gc_size);
// used memory and the exec_mem_limit will be canceled.
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DECLARE_mBool(enable_query_memory_overcommit);
//waibibabu

// gc will release cache, cancel task, and task will wait for gc to release memory,
// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true
DECLARE_mBool(disable_memory_gc);

// Allocator check failed log stacktrace if not catch exception
DECLARE_mBool(enable_stacktrace_in_allocator_check_failed);

// malloc or new large memory larger than large_memory_check_bytes, default 2G,
// will print a warning containing the stacktrace, but not prevent memory alloc.
// If is -1, disable large memory check.
Expand Down Expand Up @@ -606,6 +612,8 @@ DECLARE_String(pprof_profile_dir);
DECLARE_mString(jeprofile_dir);
// Purge all unused dirty pages for all arenas.
DECLARE_mBool(enable_je_purge_dirty_pages);
// Purge all unused Jemalloc dirty pages for all arenas when exceed je_dirty_pages_mem_limit and process exceed soft limit.
DECLARE_mString(je_dirty_pages_mem_limit_percent);

// to forward compatibility, will be removed later
DECLARE_mBool(enable_token_check);
Expand Down Expand Up @@ -1228,10 +1236,6 @@ DECLARE_Bool(enable_flush_file_cache_async);
// Remove predicate that is always true for a segment.
DECLARE_Bool(ignore_always_true_predicate_for_segment);

// Dir of default timezone files
DECLARE_String(default_tzfiles_path);
DECLARE_Bool(use_doris_tzfile);

// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);

Expand Down
50 changes: 27 additions & 23 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
Expand Down Expand Up @@ -192,27 +193,30 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();
doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
// Refresh allocator memory metrics.
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
#ifdef USE_JEMALLOC
if (doris::MemInfo::je_dirty_pages_mem() > doris::MemInfo::je_dirty_pages_mem_limit() &&
GlobalMemoryArbitrator::is_exceed_soft_mem_limit()) {
doris::MemInfo::notify_je_purge_dirty_pages();
}
#endif
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif

// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();

// Refresh mem tracker each type counter.
doris::MemTrackerLimiter::refresh_global_counter();

// Refresh allocator memory metrics.
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif

ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
Expand All @@ -229,38 +233,38 @@ void Daemon::memory_gc_thread() {
if (config::disable_memory_gc) {
continue;
}
auto sys_mem_available = doris::MemInfo::sys_mem_available();
auto sys_mem_available = doris::GlobalMemoryArbitrator::sys_mem_available();
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();

// GC excess memory for resource groups that not enable overcommit
auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
auto tg_free_mem = doris::MemoryReclamation::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
process_memory_usage -= tg_free_mem;

if (memory_full_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
process_memory_usage >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
std::string mem_info =
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str();
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format(
"[MemoryGC] start full GC, {}.",
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
process_memory_usage >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
std::string mem_info =
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format(
"[MemoryGC] start minor GC, {}.",
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else {
Expand Down
39 changes: 39 additions & 0 deletions be/src/http/action/clear_cache_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/clear_cache_action.h"

#include <sstream>
#include <string>

#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "runtime/memory/cache_manager.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

void ClearDataCacheAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
CacheManager::instance()->clear_once();
HttpChannel::send_reply(req, HttpStatus::OK, "");
}

} // end namespace doris
35 changes: 35 additions & 0 deletions be/src/http/action/clear_cache_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "http/http_handler.h"

namespace doris {

class HttpRequest;

class ClearDataCacheAction : public HttpHandler {
public:
ClearDataCacheAction() = default;

~ClearDataCacheAction() override = default;

void handle(HttpRequest* req) override;
};

} // end namespace doris
5 changes: 5 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "gutil/strings/substitute.h"
#include "http/action/tablets_info_action.h"
#include "http/web_page_handler.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/easy_json.h"
Expand Down Expand Up @@ -155,6 +156,10 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
MemTrackerLimiter::Type::SCHEMA_CHANGE);
} else if (iter->second == "other") {
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER);
} else if (iter->second == "reserved_memory") {
GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots);
} else if (iter->second == "all") {
MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
}
} else {
(*output) << "<h4>*Notice:</h4>\n";
Expand Down
53 changes: 26 additions & 27 deletions be/src/io/fs/stream_sink_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
<< ", data_length: " << bytes_req;

std::span<const Slice> slices {data, data_cnt};
size_t stream_index = 0;
size_t fault_injection_skipped_streams = 0;
bool ok = false;
bool skip_stream = false;
Status st;
for (auto& stream : _streams) {
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (stream_index >= 2) {
skip_stream = true;
if (fault_injection_skipped_streams < 1) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (stream_index >= 1) {
skip_stream = true;
if (fault_injection_skipped_streams < 2) {
fault_injection_skipped_streams++;
continue;
}
});
if (!skip_stream) {
st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
_bytes_appended, slices);
}
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (stream_index >= 2) {
st = Status::InternalError("stream sink file writer append data failed");
}
stream_index++;
skip_stream = false;
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (stream_index >= 1) {
st = Status::InternalError("stream sink file writer append data failed");
}
stream_index++;
skip_stream = false;
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", {
st = Status::InternalError("stream sink file writer append data failed");
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{ continue; });
st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended,
slices);
ok = ok || st.ok();
if (!st.ok()) {
LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id()
Expand Down Expand Up @@ -116,8 +100,23 @@ Status StreamSinkFileWriter::finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
size_t fault_injection_skipped_streams = 0;
bool ok = false;
for (auto& stream : _streams) {
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
if (fault_injection_skipped_streams < 1) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
if (fault_injection_skipped_streams < 2) {
fault_injection_skipped_streams++;
continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{ continue; });
auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
_bytes_appended, {}, true);
ok = ok || st.ok();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
{_max_version_schema, update_schema}, _max_version_schema, final_schema,
check_column_size));
_max_version_schema = final_schema;
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_full_schema();
return Status::OK();
}

Expand Down
Loading

0 comments on commit 1620484

Please sign in to comment.