From 9174611344d8ace575d0304ef5fc649080dbf159 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Dec 2024 07:34:56 +0300 Subject: [PATCH] fix accessors fetching queue processing (#12936) --- ydb/core/protos/config.proto | 1 + .../tx/columnshard/data_accessor/manager.cpp | 37 ++++++++++++------- .../tx/columnshard/data_accessor/request.h | 6 +++ .../tx/columnshard/hooks/abstract/abstract.h | 8 ++++ .../tx/columnshard/hooks/testing/controller.h | 7 ++++ 5 files changed, 46 insertions(+), 13 deletions(-) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e7787490dfe8..bd66f8fb984f 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1628,6 +1628,7 @@ message TColumnShardConfig { optional bool AllowNullableColumnsInPK = 29 [default = false]; optional uint32 RestoreDataOnWriteTimeoutSeconds = 30; optional bool UseSlicesFilter = 31 [default = true]; + optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000]; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index 182f8d81fbed..7f0b7bdc1510 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -1,18 +1,17 @@ #include "manager.h" +#include + namespace NKikimr::NOlap::NDataAccessorControl { void TLocalManager::DrainQueue() { - THashMap> portionsToAsk; std::optional lastPathId; IGranuleDataAccessor* lastDataAccessor = nullptr; ui32 countToFlight = 0; - while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + while (PortionsAskInFlight + countToFlight < NYDBTest::TControllers::GetColumnShardController()->GetLimitForPortionsMetadataAsk() && + PortionsAsk.size()) { + THashMap> portionsToAsk; while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { - if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) { - PortionsAsk.pop_front(); - continue; - } auto p = PortionsAsk.front().ExtractPortion(); PortionsAsk.pop_front(); if (!lastPathId || *lastPathId != p->GetPathId()) { @@ -24,18 +23,30 @@ void TLocalManager::DrainQueue() { lastDataAccessor = it->second.get(); } } + auto it = RequestsByPortion.find(p->GetPortionId()); + if (it == RequestsByPortion.end()) { + continue; + } if (!lastDataAccessor) { - auto it = RequestsByPortion.find(p->GetPortionId()); - AFL_VERIFY(it != RequestsByPortion.end()); for (auto&& i : it->second) { - if (!i->IsFetched()) { + if (!i->IsFetched() && !i->IsAborted()) { i->AddError(p->GetPathId(), "path id absent"); } } RequestsByPortion.erase(it); } else { - portionsToAsk[p->GetPathId()].emplace_back(p); - ++countToFlight; + bool toAsk = false; + for (auto&& i : it->second) { + if (!i->IsFetched() && !i->IsAborted()) { + toAsk = true; + } + } + if (!toAsk) { + RequestsByPortion.erase(it); + } else { + portionsToAsk[p->GetPathId()].emplace_back(p); + ++countToFlight; + } } } for (auto&& i : portionsToAsk) { @@ -46,7 +57,7 @@ void TLocalManager::DrainQueue() { auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); AFL_VERIFY(it != RequestsByPortion.end()); for (auto&& i : it->second) { - if (!i->IsFetched()) { + if (!i->IsFetched() && !i->IsAborted()) { i->AddAccessor(accessor); } } @@ -110,4 +121,4 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { DrainQueue(); } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap::NDataAccessorControl diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 2d5f29ad2040..6be2665e8be1 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -232,6 +232,12 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounterGetAbortionFlag(); + return flag && flag->Val(); + } + const std::shared_ptr& GetAbortionFlag() const { AFL_VERIFY(HasSubscriber()); return Subscriber->GetAbortionFlag(); diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 46ee0ba1c336..94b4eca7e4d2 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -92,6 +92,9 @@ class ICSController { virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const { return defaultValue; } + virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const { + return defaultValue; + } virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const { return defaultValue; } @@ -189,6 +192,11 @@ class ICSController { virtual void OnSelectShardingFilter() { } + ui64 GetLimitForPortionsMetadataAsk() const { + const ui64 defaultValue = GetConfig().GetLimitForPortionsMetadataAsk(); + return DoGetLimitForPortionsMetadataAsk(defaultValue); + } + TDuration GetCompactionActualizationLag() const { const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetCompactionActualizationLagMs()); return DoGetCompactionActualizationLag(defaultValue); diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index f9bf371afa4d..1430c3a002c0 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -24,6 +24,8 @@ class TController: public TReadOnlyController { YDB_ACCESSOR_DEF(std::optional, OverrideTasksActualizationLag); YDB_ACCESSOR_DEF(std::optional, OverrideMaxReadStaleness); YDB_ACCESSOR(std::optional, OverrideMemoryLimitForPortionReading, 100); + YDB_ACCESSOR(std::optional, OverrideLimitForPortionsMetadataAsk, 1); + YDB_ACCESSOR_DEF(std::optional, OverrideBlobPutResultOnWriteValue); EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force; @@ -135,6 +137,11 @@ class TController: public TReadOnlyController { protected: virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override; + virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override { + return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue); + } + + virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const override { return OverrideMemoryLimitForPortionReading.value_or(defaultValue); }