Skip to content

Commit

Permalink
Merge f5d27a1 into 1e8be6b
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 22, 2024
2 parents 1e8be6b + f5d27a1 commit 6fc5c69
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 141 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TTestEvictionBase {
UNIT_ASSERT_GT(columnRawBytes, 0);
}

TestHelper->SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
TestHelper->SetTiering("/Root/olapStore/olapTable", "Root/tier1", "timestamp");
csController->WaitActualization(TDuration::Seconds(5));

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ std::shared_ptr<NKikimr::NOlap::IBlobsStorageOperator> IStoragesManager::GetOper
void IStoragesManager::OnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
AFL_VERIFY(tiers);
for (auto&& i : tiers->GetManagers()) {
GetOperatorGuarantee(i.first)->OnTieringModified(tiers);
GetOperatorGuarantee(i.first.GetPath())->OnTieringModified(tiers);
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
Tiers->Start(Tiers);
if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
for (const auto& [id, tier] : tiersSnapshot) {
Tiers->UpdateTierConfig(tier, CanonizePath(id), false);
Tiers->UpdateTierConfig(tier, NTiers::TExternalStorageId(id), false);
}
}
BackgroundSessionsManager = std::make_shared<NOlap::NBackground::TSessionsManager>(
Expand All @@ -131,7 +131,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
void TColumnShard::Handle(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
for (const auto& [id, tier] : tiersSnapshot) {
Tiers->UpdateTierConfig(tier, CanonizePath(id), false);
Tiers->UpdateTierConfig(tier, NTiers::TExternalStorageId(id), false);
}
}

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
}

{
THashSet<TString> usedTiers;
THashSet<NTiers::TExternalStorageId> usedTiers;
TTableInfo table(pathId);
if (tableProto.HasTtlSettings()) {
const auto& ttlSettings = tableProto.GetTtlSettings();
Expand Down Expand Up @@ -452,7 +452,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
schema = alterProto.GetSchema();
}

THashSet<TString> usedTiers;
THashSet<NTiers::TExternalStorageId> usedTiers;
if (alterProto.HasTtlSettings()) {
const auto& ttlSettings = alterProto.GetTtlSettings();
*tableVerProto.MutableTtlSettings() = ttlSettings;
Expand Down Expand Up @@ -1582,10 +1582,10 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:
Execute(new TTxRemoveSharedBlobs(this, blobs, NActors::ActorIdFromProto(ev->Get()->Record.GetSourceActorId()), ev->Get()->Record.GetStorageId()), ctx);
}

void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<TString>& usedTiers) {
void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tiers", JoinStrings(usedTiers.begin(), usedTiers.end(), ","));
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tier_count", usedTiers.size());
Tiers->EnablePathId(pathId, usedTiers);
} else {
Tiers->DisablePathId(pathId);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
putStatus.OnYellowChannels(Executor());
}

void ActivateTiering(const ui64 pathId, const THashSet<TString>& usedTiers);
void ActivateTiering(const ui64 pathId, const THashSet<NTiers::TExternalStorageId>& usedTiers);
void OnTieringModified(const std::optional<ui64> pathId = {});

public:
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ TTiering::TTieringContext TTiering::GetTierToMove(const std::shared_ptr<arrow::S
std::optional<TDuration> nextTierDuration;
for (auto& tierRef : GetOrderedTiers()) {
auto& tierInfo = tierRef.Get();
if (skipEviction && tierInfo.GetName() != NTiering::NCommon::DeleteTierName) {
if (skipEviction && tierInfo.GetStorageId()) {
continue;
}
const TString tierName = tierInfo.GetStorageId() ? tierInfo.GetStorageId()->GetPath() : NTiering::NCommon::DeleteTierName;
auto mpiOpt = tierInfo.ScalarToInstant(max);
Y_ABORT_UNLESS(mpiOpt);
const TInstant maxTieringPortionInstant = *mpiOpt;
const TDuration dWaitLocal = maxTieringPortionInstant - tierInfo.GetEvictInstant(now);
if (!dWaitLocal) {
return TTieringContext(tierInfo.GetName(), tierInfo.GetEvictInstant(now) - maxTieringPortionInstant, nextTierName, nextTierDuration);
return TTieringContext(tierName, tierInfo.GetEvictInstant(now) - maxTieringPortionInstant, nextTierName, nextTierDuration);
} else {
nextTierName = tierInfo.GetName();
nextTierName = tierName;
nextTierDuration = dWaitLocal;
}
}
Expand Down
61 changes: 21 additions & 40 deletions ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/tx/tiering/tier/identifier.h>

#include <ydb/library/formats/arrow/common/validation.h>

Expand All @@ -16,7 +17,7 @@ namespace NKikimr::NOlap {

class TTierInfo {
private:
YDB_READONLY_DEF(TString, Name);
YDB_READONLY_DEF(std::optional<NColumnShard::NTiers::TExternalStorageId>, StorageId);
YDB_READONLY_DEF(TString, EvictColumnName);
YDB_READONLY_DEF(TDuration, EvictDuration);

Expand All @@ -27,12 +28,12 @@ class TTierInfo {
return NTiering::NCommon::DeleteTierName;
}

TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0)
: Name(tierName)
TTierInfo(const std::optional<NColumnShard::NTiers::TExternalStorageId>& storage, TDuration evictDuration, const TString& column,
ui32 unitsInSecond = 0)
: StorageId(storage)
, EvictColumnName(column)
, EvictDuration(evictDuration)
, TtlUnitsInSecond(unitsInSecond) {
Y_ABORT_UNLESS(!!Name);
Y_ABORT_UNLESS(!!EvictColumnName);
}

Expand Down Expand Up @@ -72,7 +73,8 @@ class TTierInfo {

TString GetDebugString() const {
TStringBuilder sb;
sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";serializer=";
sb << "storage=" << (StorageId ? StorageId->GetPath() : "") << ";duration=" << EvictDuration << ";column=" << EvictColumnName
<< ";serializer=";
if (Serializer) {
sb << Serializer->DebugString();
} else {
Expand All @@ -95,19 +97,19 @@ class TTierRef {
if (Info->GetEvictDuration() > b.Info->GetEvictDuration()) {
return true;
} else if (Info->GetEvictDuration() == b.Info->GetEvictDuration()) {
if (Info->GetName() == NTiering::NCommon::DeleteTierName) {
if (!Info->GetStorageId()) {
return true;
} else if (b.Info->GetName() == NTiering::NCommon::DeleteTierName) {
} else if (!b.Info->GetStorageId()) {
return false;
}
return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
return *Info->GetStorageId() > *b.Info->GetStorageId(); // add stability: smaller name is hotter
}
return false;
}

bool operator == (const TTierRef& b) const {
return Info->GetEvictDuration() == b.Info->GetEvictDuration()
&& Info->GetName() == b.Info->GetName();
&& Info->GetStorageId() == b.Info->GetStorageId();
}

const TTierInfo& Get() const {
Expand All @@ -124,8 +126,7 @@ class TTierRef {

class TTiering {
using TProto = NKikimrSchemeOp::TColumnDataLifeCycle::TTtl;
using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
TTiersMap TierByName;
using TTiersMap = THashMap<NColumnShard::NTiers::TExternalStorageId, std::shared_ptr<TTierInfo>>;
TSet<TTierRef> OrderedTiers;
std::optional<TString> TTLColumnName;
public:
Expand Down Expand Up @@ -169,18 +170,6 @@ class TTiering {

TTieringContext GetTierToMove(const std::shared_ptr<arrow::Scalar>& max, const TInstant now, const bool skipEviction) const;

const TTiersMap& GetTierByName() const {
return TierByName;
}

std::shared_ptr<TTierInfo> GetTierByName(const TString& name) const {
auto it = TierByName.find(name);
if (it == TierByName.end()) {
return nullptr;
}
return it->second;
}

const TSet<TTierRef>& GetOrderedTiers() const {
return OrderedTiers;
}
Expand All @@ -203,20 +192,10 @@ class TTiering {
return false;
}

TierByName.emplace(tier->GetName(), tier);
OrderedTiers.emplace(tier);
return true;
}

std::optional<NArrow::NSerialization::TSerializerContainer> GetSerializer(const TString& name) const {
auto it = TierByName.find(name);
if (it != TierByName.end()) {
Y_ABORT_UNLESS(!name.empty());
return it->second->GetSerializer();
}
return {};
}

TConclusionStatus DeserializeFromProto(const TProto& serialized) {
if (serialized.HasExpireAfterBytes()) {
return TConclusionStatus::Fail("TTL by size is not supported.");
Expand Down Expand Up @@ -264,24 +243,26 @@ class TTiering {

TString GetDebugString() const {
TStringBuilder sb;
sb << "[";
for (auto&& i : OrderedTiers) {
sb << i.Get().GetDebugString() << "; ";
}
sb << "]";
return sb;
}

THashSet<TString> GetUsedTiers() const {
THashSet<TString> tiers;
for (const auto& [name, info] : TierByName) {
if (name != NTiering::NCommon::DeleteTierName) {
tiers.emplace(name);
THashSet<NColumnShard::NTiers::TExternalStorageId> GetUsedTiers() const {
THashSet<NColumnShard::NTiers::TExternalStorageId> tiers;
for (const auto& tier : OrderedTiers) {
if (const auto& storageId = tier.Get().GetStorageId()) {
tiers.emplace(*storageId);
}
}
return tiers;
}

static THashSet<TString> GetUsedTiers(const TProto& ttlSettings) {
THashSet<TString> usedTiers;
static THashSet<NColumnShard::NTiers::TExternalStorageId> GetUsedTiers(const TProto& ttlSettings) {
THashSet<NColumnShard::NTiers::TExternalStorageId> usedTiers;
for (const auto& tier : ttlSettings.GetTiers()) {
if (tier.HasEvictToExternalStorage()) {
usedTiers.emplace(CanonizePath(tier.GetEvictToExternalStorage().GetStorage()));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/scheme/tiering/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(

PEERDIR(
ydb/core/formats/arrow/serializer
ydb/core/tx/tiering/tier
)

END()
2 changes: 1 addition & 1 deletion ydb/core/tx/tiering/abstract/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace NKikimr::NColumnShard {

const NTiers::TManager& ITiersManager::GetManagerVerified(const TString& tierId) const {
const NTiers::TManager& ITiersManager::GetManagerVerified(const NTiers::TExternalStorageId& tierId) const {
auto* result = GetManagerOptional(tierId);
AFL_VERIFY(result)("tier_id", tierId);
return *result;
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/tiering/abstract/manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <util/generic/string.h>
#include <map>
#include <ydb/core/tx/tiering/tier/identifier.h>

namespace NKikimr::NColumnShard {
namespace NTiers {
Expand All @@ -9,9 +10,9 @@ class TManager;

class ITiersManager {
public:
const NTiers::TManager& GetManagerVerified(const TString& tierId) const;
virtual const NTiers::TManager* GetManagerOptional(const TString& tierId) const = 0;
virtual const std::map<TString, NTiers::TManager>& GetManagers() const = 0;
const NTiers::TManager& GetManagerVerified(const NTiers::TExternalStorageId& tierId) const;
virtual const NTiers::TManager* GetManagerOptional(const NTiers::TExternalStorageId& tierId) const = 0;
virtual const std::map<NTiers::TExternalStorageId, NTiers::TManager>& GetManagers() const = 0;
virtual ~ITiersManager() = default;
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tiering/abstract/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(

PEERDIR(
ydb/library/actors/core
ydb/core/tx/tiering/tier
)

END()
Loading

0 comments on commit 6fc5c69

Please sign in to comment.