diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4d5c4af70255af..5421ce65961d48 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1316,4 +1316,7 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");
 
 CONF_mInt32(apply_version_slow_log_sec, "30");
 
+// Max time add_chunk() may stall waiting for load memory to drain: 1200000 ms = 1200 s.
+CONF_mInt32(wait_load_memory_full_ms, "1200000");
+
 } // namespace starrocks::config
diff --git a/be/src/runtime/lake_tablets_channel.cpp b/be/src/runtime/lake_tablets_channel.cpp
index 979fa1ccdbd1fa..a3aacf8357f5fd 100644
--- a/be/src/runtime/lake_tablets_channel.cpp
+++ b/be/src/runtime/lake_tablets_channel.cpp
@@ -349,6 +349,23 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
             // Do NOT return
             break;
         }
+        // Back pressure OlapTableSink when load memory is full: poll the mem tracker
+        // (and its parent) every 100ms until memory drains or the configured cap is hit.
+        size_t wait_cnt = 0;
+        while (_mem_tracker->limit_exceeded() ||
+               (_mem_tracker->parent() != nullptr && _mem_tracker->parent()->limit_exceeded())) {
+            auto t1 = std::chrono::steady_clock::now();
+            if (wait_cnt * 100 > config::wait_load_memory_full_ms) {
+                LOG(INFO) << "LakeTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
+                          << " wait tablet " << tablet_id << " " << (wait_cnt * 100)
+                          << "ms because load memory is full ";
+                break;
+            }
+            bthread_usleep(100000); // 100ms
+            wait_memtable_flush_time_us +=
+                    std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - t1)
+                            .count();
+            wait_cnt++;
+        }
         dw->write(chunk, row_indexes + from, size, [&](const Status& st) {
             context->update_status(st);
             count_down_latch.count_down();
diff --git a/be/src/storage/lake/delta_writer.cpp b/be/src/storage/lake/delta_writer.cpp
index 20a0679a261b77..55ee94b80dcc72 100644
--- a/be/src/storage/lake/delta_writer.cpp
+++ b/be/src/storage/lake/delta_writer.cpp
@@ -340,6 +340,7 @@ Status DeltaWriterImpl::check_partial_update_with_sort_key(const Chunk& chunk) {
 
 Status DeltaWriterImpl::write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size) {
     SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
+    auto t0 = butil::gettimeofday_ms();
     if (_mem_table == nullptr) {
         // When loading memory usage is larger than hard limit, we will reject new loading task.
         if (!config::enable_new_load_on_memory_limit_exceeded &&
@@ -355,14 +356,24 @@ Status DeltaWriterImpl::write(const Chunk& chunk, const uint32_t* indexes, uint3
     _last_write_ts = butil::gettimeofday_s();
     Status st;
     bool full = _mem_table->insert(chunk, indexes, 0, indexes_size);
+    auto t1 = butil::gettimeofday_ms();
     if (_mem_tracker->limit_exceeded()) {
         VLOG(2) << "Flushing memory table due to memory limit exceeded";
         st = flush();
+        LOG(INFO) << "DeltaWriterImpl write time: " << t1 - t0 << " flush time: " << butil::gettimeofday_ms() - t1
+                  << " chunk : " << chunk.num_rows() << " " << chunk.bytes_usage() << " tid " << _tablet_id;
     } else if (_mem_tracker->parent() && _mem_tracker->parent()->limit_exceeded()) {
         VLOG(2) << "Flushing memory table due to parent memory limit exceeded";
         st = flush();
+        LOG(INFO) << "DeltaWriterImpl write2 time: " << t1 - t0 << " flush time: " << butil::gettimeofday_ms() - t1
+                  << " chunk : " << chunk.num_rows() << " " << chunk.bytes_usage() << " tid " << _tablet_id;
     } else if (full) {
         st = flush_async();
+        LOG(INFO) << "DeltaWriterImpl write3 time: " << t1 - t0 << " flush time: " << butil::gettimeofday_ms() - t1
+                  << " chunk : " << chunk.num_rows() << " " << chunk.bytes_usage() << " tid " << _tablet_id;
+    } else {
+        LOG(INFO) << "DeltaWriterImpl write4 time: " << t1 - t0 << " flush time: " << butil::gettimeofday_ms() - t1
+                  << " chunk : " << chunk.num_rows() << " " << chunk.bytes_usage() << " tid " << _tablet_id;
     }
     return st;
 }
diff --git a/be/src/storage/memtable_flush_executor.cpp b/be/src/storage/memtable_flush_executor.cpp
index d2ee64013d0708..b997005f56d269 100644
--- a/be/src/storage/memtable_flush_executor.cpp
+++ b/be/src/storage/memtable_flush_executor.cpp
@@ -175,6 +175,8 @@ int MemTableFlushExecutor::calc_max_threads_for_lake_table(const std::vector<Dat
     }
     int data_dir_num = static_cast<int>(data_dirs.size());
     data_dir_num = std::max(1, data_dir_num);
     data_dir_num = std::min(8, data_dir_num);
+    LOG(INFO) << "calc_max_threads_for_lake_table " << config::lake_flush_thread_num_per_store << " "
+              << CpuInfo::num_cores() << " " << data_dirs.size() << " result : " << data_dir_num << " * " << threads;
     return data_dir_num * threads;
 }