Skip to content

Commit

Permalink
[branch-2.1](memory) When Load ends, check memory tracker value retur…
Browse files Browse the repository at this point in the history
…ns is equal to 0 (#40850)

pick
#38960
#39908
#40043
#40092
#40016
#40439

---------

Co-authored-by: hui lai <[email protected]>
Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
3 people authored Sep 15, 2024
1 parent 148f385 commit b52b572
Show file tree
Hide file tree
Showing 24 changed files with 214 additions and 83 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

// default is true. if any memory tracking in Orphan mem tracker will report error.
// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time.
// allocator free memory not need to check, because when the thread memory tracker label is Orphan,
// use the tracker saved in Allocator.
DEFINE_mBool(enable_memory_orphan_check, "true");

// The maximum time a thread waits for full GC. Currently only query will wait for full gc.
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// default is true. if any memory tracking in Orphan mem tracker will report error.
// !! not modify the default value of this conf!! otherwise memory errors cannot be detected in time.
// allocator free memory not need to check, because when the thread memory tracker label is Orphan,
// use the tracker saved in Allocator.
DECLARE_mBool(enable_memory_orphan_check);

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
Expand Down Expand Up @@ -349,6 +353,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::GROUP_COMMIT_SCAN_NODE:
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
*node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, descs));
return Status::OK();

Expand Down
23 changes: 17 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}

if (!st.ok() && !ctx->status.ok()) {
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
Expand Down
11 changes: 9 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
Expand Down
9 changes: 5 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema == true) {
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
stream_load_ctx->schema_buffer->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
stream_load_ctx->schema_buffer()->pos /* total_length */);
stream_load_ctx->schema_buffer()->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
RETURN_IF_ERROR(pipe->finish());
*file_reader = std::move(pipe);
} else {
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}

Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
_write_buf = ByteBuffer::allocate(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
sink_ = std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
break;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class ExecEnv {
std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
return _segcompaction_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() {
return _stream_load_pipe_tracker;
}
std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
return _point_query_executor_mem_tracker;
}
Expand Down Expand Up @@ -368,6 +371,7 @@ class ExecEnv {
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker;

// Tracking memory may be shared between multiple queries.
std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree");
_s3_file_buffer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
_stream_load_pipe_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "StreamLoadPipe");
}

void ExecEnv::_register_metrics() {
Expand Down
71 changes: 42 additions & 29 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
"mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
"If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see "
"more information."
"1. For query and load, memory leaks may have occurred, it is expected that the query "
"mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
Expand All @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end.
#ifndef NDEBUG
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
Expand All @@ -140,29 +142,29 @@ MemTrackerLimiter::~MemTrackerLimiter() {
}
_consumption->set(0);
#ifndef NDEBUG
} else if (!_address_sanitizers.empty()) {
LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
} else if (!_address_sanitizers.empty() && !is_group_commit_load) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
#endif
}
memory_memtrackerlimiter_cnt << -1;
}

#ifndef NDEBUG
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(
fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
"consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
"buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(),
buf, size, it->first, it->second.size,
get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace));
}

// if alignment not equal to 0, maybe usable_size > size.
Expand All @@ -174,36 +176,47 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
}

void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size) {
LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
"label: "
<< _label << ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
"{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
"{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf,
size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
it->second.stack_trace));
}
_address_sanitizers.erase(buf);
} else {
LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED");
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf, size,
get_stack_trace(1, "FULL_WITH_INLINE")));
}
}
}

std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
detail += "\n memory not be freed:";
for (const auto& it : _address_sanitizers) {
detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size,
it.second.stack_trace);
auto msg = fmt::format(
"\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
_label, _consumption->current_value(), _consumption->peak_value(), it.first,
it.second.size, it.second.stack_trace);
LOG(INFO) << msg;
detail += msg;
}
detail += "\n incorrect memory alloc and free:";
for (const auto& err_msg : _error_address_sanitizers) {
LOG(INFO) << err_msg;
detail += fmt::format("\n {}", err_msg);
}
return detail;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class MemTrackerLimiter final : public MemTracker {
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
bool is_group_commit_load {false};
#endif

std::string debug_string() override {
Expand Down Expand Up @@ -260,6 +261,7 @@ class MemTrackerLimiter final : public MemTracker {

std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
std::vector<std::string> _error_address_sanitizers;
#endif
};

Expand Down
15 changes: 13 additions & 2 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "common/utils.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/thread_context.h"
#include "util/byte_buffer.h"
#include "util/time.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -118,6 +119,17 @@ class StreamLoadContext {
// also print the load source info if detail is set to true
std::string brief(bool detail = false) const;

Status allocate_schema_buffer() {
if (_schema_buffer == nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
}
return Status::OK();
}

ByteBufferPtr schema_buffer() { return _schema_buffer; }

public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
Expand Down Expand Up @@ -182,8 +194,6 @@ class StreamLoadContext {
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;

ByteBufferPtr schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);

TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;

Expand Down Expand Up @@ -241,6 +251,7 @@ class StreamLoadContext {

private:
ExecEnv* _exec_env = nullptr;
ByteBufferPtr _schema_buffer;
};

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000;
};

// Reset thread memory tracker, otherwise SCOPED_ATTACH_TASK will be called nested, nesting is
// not allowed, first time in on_chunk_data, second time in StreamLoadExecutor::execute_plan_fragment.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());

if (ctx->put_result.__isset.params) {
st = _exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
QuerySource::STREAM_LOAD, exec_fragment);
Expand Down
Loading

0 comments on commit b52b572

Please sign in to comment.