Skip to content

Commit

Permalink
fix accessors fetching queue processing (ydb-platform#12936)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 3e9b9a0 commit 9174611
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1628,6 +1628,7 @@ message TColumnShardConfig {
optional bool AllowNullableColumnsInPK = 29 [default = false];
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
optional bool UseSlicesFilter = 31 [default = true];
optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000];
}

message TSchemeShardConfig {
Expand Down
37 changes: 24 additions & 13 deletions ydb/core/tx/columnshard/data_accessor/manager.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
#include "manager.h"

#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

namespace NKikimr::NOlap::NDataAccessorControl {

void TLocalManager::DrainQueue() {
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
std::optional<ui64> lastPathId;
IGranuleDataAccessor* lastDataAccessor = nullptr;
ui32 countToFlight = 0;
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
while (PortionsAskInFlight + countToFlight < NYDBTest::TControllers::GetColumnShardController()->GetLimitForPortionsMetadataAsk() &&
PortionsAsk.size()) {
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
PortionsAsk.pop_front();
continue;
}
auto p = PortionsAsk.front().ExtractPortion();
PortionsAsk.pop_front();
if (!lastPathId || *lastPathId != p->GetPathId()) {
Expand All @@ -24,18 +23,30 @@ void TLocalManager::DrainQueue() {
lastDataAccessor = it->second.get();
}
}
auto it = RequestsByPortion.find(p->GetPortionId());
if (it == RequestsByPortion.end()) {
continue;
}
if (!lastDataAccessor) {
auto it = RequestsByPortion.find(p->GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
if (!i->IsFetched() && !i->IsAborted()) {
i->AddError(p->GetPathId(), "path id absent");
}
}
RequestsByPortion.erase(it);
} else {
portionsToAsk[p->GetPathId()].emplace_back(p);
++countToFlight;
bool toAsk = false;
for (auto&& i : it->second) {
if (!i->IsFetched() && !i->IsAborted()) {
toAsk = true;
}
}
if (!toAsk) {
RequestsByPortion.erase(it);
} else {
portionsToAsk[p->GetPathId()].emplace_back(p);
++countToFlight;
}
}
}
for (auto&& i : portionsToAsk) {
Expand All @@ -46,7 +57,7 @@ void TLocalManager::DrainQueue() {
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
if (!i->IsFetched() && !i->IsAborted()) {
i->AddAccessor(accessor);
}
}
Expand Down Expand Up @@ -110,4 +121,4 @@ void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
DrainQueue();
}

} // namespace NKikimr::NOlap
} // namespace NKikimr::NOlap::NDataAccessorControl
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
return result;
}

bool IsAborted() const {
AFL_VERIFY(HasSubscriber());
auto flag = Subscriber->GetAbortionFlag();
return flag && flag->Val();
}

const std::shared_ptr<const TAtomicCounter>& GetAbortionFlag() const {
AFL_VERIFY(HasSubscriber());
return Subscriber->GetAbortionFlag();
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class ICSController {
virtual TDuration DoGetUsedSnapshotLivetime(const TDuration defaultValue) const {
return defaultValue;
}
virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const {
return defaultValue;
}
virtual TDuration DoGetOverridenGCPeriod(const TDuration defaultValue) const {
return defaultValue;
}
Expand Down Expand Up @@ -189,6 +192,11 @@ class ICSController {
virtual void OnSelectShardingFilter() {
}

ui64 GetLimitForPortionsMetadataAsk() const {
const ui64 defaultValue = GetConfig().GetLimitForPortionsMetadataAsk();
return DoGetLimitForPortionsMetadataAsk(defaultValue);
}

TDuration GetCompactionActualizationLag() const {
const TDuration defaultValue = TDuration::MilliSeconds(GetConfig().GetCompactionActualizationLagMs());
return DoGetCompactionActualizationLag(defaultValue);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class TController: public TReadOnlyController {
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideMaxReadStaleness);
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
YDB_ACCESSOR(std::optional<ui64>, OverrideLimitForPortionsMetadataAsk, 1);

YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);

EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
Expand Down Expand Up @@ -135,6 +137,11 @@ class TController: public TReadOnlyController {
protected:
virtual ::NKikimr::NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const ::NKikimr::NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& actions) const override;

virtual ui64 DoGetLimitForPortionsMetadataAsk(const ui64 defaultValue) const override {
return OverrideLimitForPortionsMetadataAsk.value_or(defaultValue);
}


virtual ui64 DoGetMemoryLimitScanPortion(const ui64 defaultValue) const override {
return OverrideMemoryLimitForPortionReading.value_or(defaultValue);
}
Expand Down

0 comments on commit 9174611

Please sign in to comment.