Skip to content

Commit

Permalink
correction balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Jan 26, 2025
1 parent 6011de9 commit b50d004
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
11 changes: 11 additions & 0 deletions ydb/core/tx/conveyor/service/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ class TProcess {
YDB_ACCESSOR_DEF(TDequePriorityFIFO, Tasks);
ui32 LinksCount = 0;
public:
void CleanCPUMetric() {
CPUTime = 0;
}

bool DecRegistration() {
AFL_VERIFY(LinksCount);
--LinksCount;
Expand Down Expand Up @@ -148,7 +152,14 @@ class TDistributor: public TActorBootstrapped<TDistributor> {
void HandleMain(TEvInternal::TEvTaskProcessedResult::TPtr& ev);

void AddProcess(const ui64 processId) {
ProcessesOrdered.clear();
AFL_VERIFY(Processes.emplace(processId, TProcess(processId)).second);
for (auto&& i : Processes) {
i.second.CleanCPUMetric();
if (i.second.GetTasks().size()) {
ProcessesOrdered.emplace(i.second.GetAddress());
}
}
}

void AddCPUTime(const ui64 processId, const TDuration d) {
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/limiter/grouped_memory/service/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ std::vector<std::shared_ptr<TAllocationInfo>> TGrouppedAllocations::AllocatePoss
return result;
}

bool TAllocationGroups::Allocate(const bool isPriorityProcess, TProcessMemoryScope& process, const ui32 allocationsLimit) {
bool TAllocationGroups::Allocate(const bool isPriorityProcess, TProcessMemoryScope& scope, const ui32 allocationsLimit) {
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "try_allocation")("limit", allocationsLimit)(
"external_process_id", process.ExternalProcessId)("forced_internal_group_id", process.GroupIds.GetMinInternalIdOptional())(
"external_scope_id", process.ExternalScopeId)("forced_external_group_id", process.GroupIds.GetMinExternalIdOptional());
"external_process_id", scope.ExternalProcessId)("forced_internal_group_id", scope.GroupIds.GetMinInternalIdOptional())(
"external_scope_id", scope.ExternalScopeId)("forced_external_group_id", scope.GroupIds.GetMinExternalIdOptional());
ui32 allocationsCount = 0;
while (true) {
std::vector<ui64> toRemove;
for (auto it = Groups.begin(); it != Groups.end();) {
const ui64 internalGroupId = it->first;
const bool forced = isPriorityProcess && internalGroupId == process.GroupIds.GetMinInternalIdVerified();
const bool forced = isPriorityProcess && internalGroupId == scope.GroupIds.GetMinInternalIdVerified();
std::vector<std::shared_ptr<TAllocationInfo>> allocated;
if (forced) {
allocated = it->second.ExtractAllocationsToVector();
Expand All @@ -38,7 +38,7 @@ bool TAllocationGroups::Allocate(const bool isPriorityProcess, TProcessMemorySco
break;
}
for (auto&& i : allocated) {
if (!i->Allocate(process.OwnerActorId)) {
if (!i->Allocate(scope.OwnerActorId)) {
toRemove.emplace_back(i->GetIdentifier());
} else if (!forced) {
AFL_VERIFY(++allocationsCount <= allocationsLimit)("count", allocationsCount)("limit", allocationsLimit);
Expand All @@ -56,7 +56,7 @@ bool TAllocationGroups::Allocate(const bool isPriorityProcess, TProcessMemorySco
}
}
for (auto&& i : toRemove) {
process.UnregisterAllocation(i);
scope.UnregisterAllocation(i);
}
if (toRemove.empty() || allocationsCount == allocationsLimit) {
break;
Expand Down

0 comments on commit b50d004

Please sign in to comment.