Skip to content

Commit

Permalink
accessors fetching control (#12859)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 23, 2024
1 parent c6318e6 commit ac47d76
Show file tree
Hide file tree
Showing 21 changed files with 254 additions and 95 deletions.
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class TColumnPortionsAccumulator {
const std::shared_ptr<TResultAccumulator> Result;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const std::set<ui32> ColumnTagsRequested;
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
return Default<std::shared_ptr<const TAtomicCounter>>();
}

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override {
THashMap<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
return Default<std::shared_ptr<const TAtomicCounter>>();
}

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
AFL_VERIFY(ResourcesGuard);
Expand Down Expand Up @@ -1422,13 +1425,13 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
std::vector<NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
: TBase(self)
, FetchCallback(fetchCallback)
, Consumer(consumer)
{
for (auto&& i : portions) {
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
PortionsByPath[i->GetPathId()].emplace_back(i);
}
}

Expand Down
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@

namespace NKikimr::NOlap::NDataAccessorControl {

THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
void IGranuleDataAccessor::AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
AFL_VERIFY(portions.size());
return DoAskData(portions, callback, consumer);
DoAskData(portions, callback, consumer);
}

TDataCategorized IGranuleDataAccessor::AnalyzeData(
const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
return DoAnalyzeData(portions, consumer);
}

void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,27 @@ class TActorAccessorsCallback: public IAccessorCallback {
}
};

class TDataCategorized {
private:
YDB_READONLY_DEF(std::vector<TPortionInfo::TConstPtr>, PortionsToAsk);
YDB_READONLY_DEF(std::vector<TPortionDataAccessor>, CachedAccessors);

public:
void AddToAsk(const TPortionInfo::TConstPtr& p) {
PortionsToAsk.emplace_back(p);
}
void AddFromCache(const TPortionDataAccessor& accessor) {
CachedAccessors.emplace_back(accessor);
}
};

class IGranuleDataAccessor {
private:
const ui64 PathId;

virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
virtual void DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) = 0;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;

public:
Expand All @@ -39,8 +54,9 @@ class IGranuleDataAccessor {
: PathId(pathId) {
}

THashMap<ui64, TPortionDataAccessor> AskData(
void AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer);
TDataCategorized AnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer);
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
return DoModifyPortions(add, remove);
}
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,12 @@ class TEvUnregisterController

class TEvAskTabletDataAccessors: public NActors::TEventLocal<TEvAskTabletDataAccessors, NColumnShard::TEvPrivate::EEv::EvAskTabletDataAccessors> {
private:
using TPortions = THashMap<ui64, TPortionInfo::TConstPtr>;
YDB_ACCESSOR_DEF(TPortions, Portions);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TConstPtr>, Portions);
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IAccessorCallback>, Callback);
YDB_READONLY_DEF(TString, Consumer);

public:
explicit TEvAskTabletDataAccessors(const THashMap<ui64, TPortionInfo::TConstPtr>& portions,
explicit TEvAskTabletDataAccessors(const std::vector<TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback, const TString& consumer)
: Portions(portions)
, Callback(callback)
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

namespace NKikimr::NOlap::NDataAccessorControl::NInMem {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
void TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/, const TString& /*consumer*/) {
THashMap<ui64, TPortionDataAccessor> accessors;
AFL_VERIFY(portions.empty());
}

TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& /*consumer*/) {
TDataCategorized result;
for (auto&& i : portions) {
auto it = Accessors.find(i->GetPortionId());
AFL_VERIFY(it != Accessors.end());
accessors.emplace(i->GetPortionId(), it->second);
result.AddFromCache(it->second);
}
return accessors;
return result;
}

void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
Expand All @@ -22,4 +26,4 @@ void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
}
}

}
} // namespace NKikimr::NOlap::NDataAccessorControl::NInMem
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/in_mem/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ class TCollector: public IGranuleDataAccessor {
private:
using TBase = IGranuleDataAccessor;
THashMap<ui64, TPortionDataAccessor> Accessors;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback,
const TString& consumer) override;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
const std::vector<ui64>& remove) override;

Expand Down
22 changes: 12 additions & 10 deletions ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@
#include <ydb/core/tx/columnshard/data_accessor/events.h>
namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
void TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
THashMap<ui64, TPortionDataAccessor> accessors;
THashMap<ui64, TPortionInfo::TConstPtr> portionsToDirectAsk;
if (portions.size()) {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portions, callback, consumer));
}
}

TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
TDataCategorized result;
for (auto&& p : portions) {
auto it = AccessorsCache.Find(p->GetPortionId());
if (it != AccessorsCache.End() && it.Key() == p->GetPortionId()) {
accessors.emplace(p->GetPortionId(), it.Value());
result.AddFromCache(it.Value());
} else {
portionsToDirectAsk.emplace(p->GetPortionId(), p);
result.AddToAsk(p);
}
}
if (portionsToDirectAsk.size()) {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback, consumer));
}
return accessors;
return result;
}

void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/data_accessor/local_db/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ class TCollector: public IGranuleDataAccessor {

TLRUCache<ui64, TPortionDataAccessor, TNoopDelete, TMetadataSizeProvider> AccessorsCache;
using TBase = IGranuleDataAccessor;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) override;

public:
Expand Down
108 changes: 108 additions & 0 deletions ydb/core/tx/columnshard/data_accessor/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,112 @@

namespace NKikimr::NOlap::NDataAccessorControl {

void TLocalManager::DrainQueue() {
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
std::optional<ui64> lastPathId;
IGranuleDataAccessor* lastDataAccessor = nullptr;
ui32 countToFlight = 0;
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
PortionsAsk.pop_front();
continue;
}
auto p = PortionsAsk.front().ExtractPortion();
PortionsAsk.pop_front();
if (!lastPathId || *lastPathId != p->GetPathId()) {
lastPathId = p->GetPathId();
auto it = Managers.find(p->GetPathId());
if (it == Managers.end()) {
lastDataAccessor = nullptr;
} else {
lastDataAccessor = it->second.get();
}
}
if (!lastDataAccessor) {
auto it = RequestsByPortion.find(p->GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
i->AddError(p->GetPathId(), "path id absent");
}
}
RequestsByPortion.erase(it);
} else {
portionsToAsk[p->GetPathId()].emplace_back(p);
++countToFlight;
}
}
for (auto&& i : portionsToAsk) {
auto it = Managers.find(i.first);
AFL_VERIFY(it != Managers.end());
auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE");
for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) {
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
i->AddAccessor(accessor);
}
}
RequestsByPortion.erase(it);
AFL_VERIFY(countToFlight);
--countToFlight;
}
if (dataAnalyzed.GetPortionsToAsk().size()) {
it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE");
}
}
}
PortionsAskInFlight += countToFlight;
}

void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
for (auto&& pathId : request->GetPathIds()) {
auto portions = request->StartFetching(pathId);
for (auto&& [_, i] : portions) {
auto itRequest = RequestsByPortion.find(i->GetPortionId());
if (itRequest == RequestsByPortion.end()) {
AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector<std::shared_ptr<TDataAccessorsRequest>>({request})).second);
PortionsAsk.emplace_back(i, request->GetAbortionFlag());
} else {
itRequest->second.emplace_back(request);
}
}
}
DrainQueue();
}

void TLocalManager::DoRegisterController(std::unique_ptr<IGranuleDataAccessor>&& controller, const bool update) {
if (update) {
auto it = Managers.find(controller->GetPathId());
if (it != Managers.end()) {
it->second = std::move(controller);
}
} else {
AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second);
}
}

void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
{
auto it = Managers.find(accessor.GetPortionInfo().GetPathId());
AFL_VERIFY(it != Managers.end());
it->second->ModifyPortions({ accessor }, {});
}
{
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
if (it != RequestsByPortion.end()) {
for (auto&& i : it->second) {
i->AddAccessor(accessor);
}
AFL_VERIFY(PortionsAskInFlight);
--PortionsAskInFlight;
}
RequestsByPortion.erase(it);
}
DrainQueue();
}

} // namespace NKikimr::NOlap
Loading

0 comments on commit ac47d76

Please sign in to comment.