Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8942
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed May 20, 2024
1 parent acdbe72 commit 489e0a7
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
51 changes: 51 additions & 0 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,23 @@ class LocalAdmissionController final : private boost::noncopyable
background_threads.emplace_back([this] { this->watchGAC(); });
}

<<<<<<< HEAD
~LocalAdmissionController() { stop(); }
=======
~LocalAdmissionController() { safeStop(); }

void safeStop()
{
try
{
stop();
}
catch (...)
{
LOG_ERROR(log, "stop server id({}) failed: {}", unique_client_id, getCurrentExceptionMessage(false));
}
}
>>>>>>> 0b0cc4527b (catch exception of LocalAdmissionController::stop() (#8942))

void consumeCPUResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
{
Expand Down Expand Up @@ -523,7 +539,11 @@ class LocalAdmissionController final : private boost::noncopyable
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

private:
<<<<<<< HEAD
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
=======
void stop()
>>>>>>> 0b0cc4527b (catch exception of LocalAdmissionController::stop() (#8942))
{
assert(!stopped);

Expand Down Expand Up @@ -602,6 +622,37 @@ class LocalAdmissionController final : private boost::noncopyable
getCurrentExceptionMessage(false));
}
}
<<<<<<< HEAD
=======
LOG_INFO(log, "LAC stopped done: final report size: {}", acquire_infos.size());
}

void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
{
if (unlikely(stopped))
return;

// When tidb_enable_resource_control is disabled, resource group name is empty.
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(name);
if unlikely (!group)
{
LOG_DEBUG(log, "cannot consume ru for {}, maybe it has been deleted", name);
return;
}

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
{
{
std::lock_guard lock(mu);
low_token_resource_groups.insert(name);
}
cv.notify_one();
}
>>>>>>> 0b0cc4527b (catch exception of LocalAdmissionController::stop() (#8942))
}

// If we cannot get GAC resp for DEGRADE_MODE_DURATION seconds, enter degrade mode.
Expand Down
65 changes: 65 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,71 @@ int Server::main(const std::vector<std::string> & /*args*/)
GET_METRIC(tiflash_server_info, start_time).Set(ts.epochTime());
}

<<<<<<< HEAD
=======
// For test mode, TaskScheduler and LAC is controlled by test case.
// TODO: resource control is not supported for WN. So disable pipeline model and LAC.
const bool init_pipeline_and_lac
= !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
if (init_pipeline_and_lac)
{
#ifdef DBMS_PUBLIC_GTEST
LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
#else
LocalAdmissionController::global_instance
= std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
#endif

auto get_pool_size = [](const auto & setting) {
return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
};
TaskSchedulerConfig config{
{get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
settings.pipeline_cpu_task_thread_pool_queue_type},
{get_pool_size(settings.pipeline_io_task_thread_pool_size),
settings.pipeline_io_task_thread_pool_queue_type},
};
RUNTIME_CHECK(!TaskScheduler::instance);
TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
}

SCOPE_EXIT({
if (init_pipeline_and_lac)
{
assert(TaskScheduler::instance);
TaskScheduler::instance.reset();
// Stop LAC instead of reset, because storage layer still needs it.
// Workload will not be throttled when LAC is stopped.
// It's ok because flash service has already been destructed, so throllting is meaningless.
assert(LocalAdmissionController::global_instance);
LocalAdmissionController::global_instance->safeStop();
}
});

if (settings.enable_async_grpc_client)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

/// startup grpc server to serve raft and/or flash services.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

SCOPE_EXIT({
// Stop LAC for AutoScaler managed CN before FlashGrpcServerHolder is destructed.
// Because AutoScaler it will kill tiflash process when port of flash_server_addr is down.
// And we want to make sure LAC is cleanedup.
// The effects are there will be no resource control during [lac.safeStop(), FlashGrpcServer destruct done],
// but it's basically ok, that duration is small(normally 100-200ms).
if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() && use_autoscaler
&& LocalAdmissionController::global_instance)
LocalAdmissionController::global_instance->safeStop();
});

>>>>>>> 0b0cc4527b (catch exception of LocalAdmissionController::stop() (#8942))
tmt_context.setStatusRunning();

try
Expand Down

0 comments on commit 489e0a7

Please sign in to comment.