Skip to content

Commit

Permalink
Support Console interoperation through distconf (#13109)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jan 9, 2025
1 parent 217e49d commit 0859ec3
Show file tree
Hide file tree
Showing 19 changed files with 526 additions and 98 deletions.
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/base/blobstorage_console_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NKikimr {
NKikimrBlobStorage::TEvControllerProposeConfigRequest, TEvBlobStorage::EvControllerProposeConfigRequest> {
TEvControllerProposeConfigRequest() = default;

TEvControllerProposeConfigRequest(const ui32 configHash, const ui32 configVersion) {
TEvControllerProposeConfigRequest(ui64 configHash, ui32 configVersion) {
Record.SetConfigHash(configHash);
Record.SetConfigVersion(configVersion);
}
Expand Down Expand Up @@ -68,6 +68,8 @@ namespace NKikimr {
// Protobuf-backed event carrying the outcome of a config validation request.
struct TEvBlobStorage::TEvControllerValidateConfigResponse : TEventPB<TEvBlobStorage::TEvControllerValidateConfigResponse,
NKikimrBlobStorage::TEvControllerValidateConfigResponse, TEvBlobStorage::EvControllerValidateConfigResponse> {
TEvControllerValidateConfigResponse() = default;

// Plain C++ member kept alongside the protobuf Record. The distconf keeper fills it in (with "pipe disconnected")
// when it answers a pending validation request itself because the pipe to Console was lost.
// NOTE(review): being outside of Record, it is presumably meaningful only for locally delivered events -- confirm.
std::optional<TString> InternalError;
};

struct TEvBlobStorage::TEvControllerReplaceConfigRequest : TEventPB<TEvBlobStorage::TEvControllerReplaceConfigRequest,
Expand Down
44 changes: 44 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "distconf.h"
#include "node_warden_impl.h"
#include <ydb/core/mind/dynamic_nameserver.h>
#include <ydb/library/yaml_config/yaml_config_helpers.h>
#include <ydb/library/yaml_config/yaml_config.h>
#include <library/cpp/streams/zstd/zstd.h>

namespace NKikimr::NStorage {

Expand Down Expand Up @@ -61,16 +64,48 @@ namespace NKikimr::NStorage {

bool TDistributedConfigKeeper::ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config) {
if (!StorageConfig || StorageConfig->GetGeneration() < config.GetGeneration()) {
StorageConfigYaml = StorageConfigFetchYaml = {};
StorageConfigFetchYamlHash = 0;
StorageConfigYamlVersion.reset();

if (config.HasConfigComposite()) {
try {
// parse the composite stream
TStringInput ss(config.GetConfigComposite());
TZstdDecompress zstd(&ss);
StorageConfigYaml = TString::Uninitialized(LoadSize(&zstd));
zstd.LoadOrFail(StorageConfigYaml.Detach(), StorageConfigYaml.size());
StorageConfigFetchYaml = TString::Uninitialized(LoadSize(&zstd));
zstd.LoadOrFail(StorageConfigFetchYaml.Detach(), StorageConfigFetchYaml.size());

// extract _current_ config version
auto metadata = NYamlConfig::GetMetadata(StorageConfigYaml);
Y_DEBUG_ABORT_UNLESS(metadata.Version.has_value());
StorageConfigYamlVersion = metadata.Version.value_or(0);

// and _fetched_ config hash
StorageConfigFetchYamlHash = NYaml::GetConfigHash(StorageConfigFetchYaml);
} catch (const std::exception& ex) {
Y_ABORT("ConfigComposite format incorrect: %s", ex.what());
}
}

StorageConfig.emplace(config);
if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() <= StorageConfig->GetGeneration()) {
ProposedStorageConfig.reset();
}

Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenStorageConfig(*StorageConfig,
ProposedStorageConfig ? &ProposedStorageConfig.value() : nullptr));

if (IsSelfStatic) {
PersistConfig({});
ApplyConfigUpdateToDynamicNodes(false);
}

ConnectToConsole();
SendConfigProposeRequest();

return true;
} else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() &&
StorageConfig->GetFingerprint() != config.GetFingerprint()) {
Expand Down Expand Up @@ -197,6 +232,10 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(!Binding);
} else {
Y_ABORT_UNLESS(RootState == ERootState::INITIAL || RootState == ERootState::ERROR_TIMEOUT);

// we can't have connection to the Console without being the root node
Y_ABORT_UNLESS(!ConsolePipeId);
Y_ABORT_UNLESS(!ConsoleConnected);
}
}
#endif
Expand Down Expand Up @@ -280,6 +319,11 @@ namespace NKikimr::NStorage {
fFunc(TEvents::TSystem::Gone, HandleGone);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
cFunc(TEvents::TSystem::Poison, PassAway);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerProposeConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerConsoleCommitResponse, Handle);
)
for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
UnsubscribeInterconnect(nodeId);
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ namespace NKikimr::NStorage {

// currently active storage config
std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig;
TString StorageConfigYaml; // the part we have to push (unless this is storage-only) to console
TString StorageConfigFetchYaml; // the part we would get if we fetch from console
ui64 StorageConfigFetchYamlHash = 0; // hash of StorageConfigFetchYaml; zero while there is no ConfigComposite
std::optional<ui32> StorageConfigYamlVersion; // metadata version of StorageConfigYaml; unset while there is no ConfigComposite

// base config from config file
NKikimrBlobStorage::TStorageConfig BaseConfig;
Expand Down Expand Up @@ -261,6 +265,17 @@ namespace NKikimr::NStorage {
// child actors
THashSet<TActorId> ChildActors;

// pipe to Console
TActorId ConsolePipeId; // pipe client actor; empty when no pipe has been created
bool ConsoleConnected = false; // set once TEvClientConnected with OK status arrives for ConsolePipeId
bool ConfigCommittedToConsole = false; // set on CommitIsNotNeeded/Committed replies, dropped on pipe error
ui64 ValidateRequestCookie = 0; // last cookie issued for a validate request
ui64 ProposeRequestCookie = 0; // cookie of the most recent propose request
ui64 CommitRequestCookie = 0; // cookie of the most recent commit request; also bumped on pipe error to ignore stale replies
bool ProposeRequestInFlight = false; // true between sending a propose request and handling its response
std::optional<std::tuple<ui64, ui32>> ProposedConfigHashVersion; // (fetch YAML hash, YAML version) of the config being proposed
std::vector<std::tuple<TActorId, TString, ui64>> ConsoleConfigValidationQ; // (requester, yaml, cookie); cookie is zero until the request is actually sent

friend void ::Out<ERootState>(IOutputStream&, ERootState);

public:
Expand Down Expand Up @@ -421,6 +436,23 @@ namespace NKikimr::NStorage {

void Handle(NMon::TEvHttpInfo::TPtr ev);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Console interaction

// Creates the pipe to Console when none exists and this node holds the Scepter; unless enablingDistconf is set,
// the current StorageConfig must also have self-management enabled and contain a state storage config.
void ConnectToConsole(bool enablingDistconf = false);
// Closes the pipe to Console and clears the ConsoleConnected flag.
void DisconnectFromConsole();
// Sends (fetch YAML hash, YAML version) of the current config to Console; does nothing when not connected,
// when a previous propose request is still in flight, or when there is no ConfigComposite yet.
void SendConfigProposeRequest();
// Forwards the validation response to the requester whose queued cookie matches and dequeues that entry.
void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev);
// Handles the reply to the propose request; issues a commit request when Console reports CommitIsNeeded.
void Handle(TEvBlobStorage::TEvControllerProposeConfigResponse::TPtr ev);
// Handles the reply to the commit request; reconnects on SessionMismatch.
void Handle(TEvBlobStorage::TEvControllerConsoleCommitResponse::TPtr ev);
// Pipe established (or failed to establish): on success sends the propose request and all queued validations.
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
// Pipe destroyed: treated as a pipe error when it concerns ConsolePipeId.
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
// Resets all Console-related state, fails pending validation requests and tries to reconnect.
void OnConsolePipeError();
// Queues a YAML validation request for Console; returns false when no pipe to Console could be created.
bool EnqueueConsoleConfigValidation(TActorId queryId, bool enablingDistconf, TString yaml);

// Packs yaml and the fetched config (or, when fetched is not given, yaml with its metadata version incremented)
// into the zstd-compressed ConfigComposite field of config. Returns error text on failure, nullopt on success.
static std::optional<TString> UpdateConfigComposite(NKikimrBlobStorage::TStorageConfig& config, const TString& yaml,
const std::optional<TString>& fetched);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Consistency checking

Expand Down
244 changes: 244 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_console.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
#include "distconf.h"

#include <ydb/library/yaml_config/yaml_config.h>
#include <library/cpp/streams/zstd/zstd.h>

namespace NKikimr::NStorage {

// Creates a tablet pipe client to the Console unless one already exists.
//
// The pipe is created only on the node that holds the Scepter. When enablingDistconf is false, the current
// storage config must additionally have self-management enabled and contain a state storage config.
void TDistributedConfigKeeper::ConnectToConsole(bool enablingDistconf) {
    if (ConsolePipeId) {
        return; // connection is already established
    }
    if (!Scepter) {
        return; // this is not the root node
    }
    if (!enablingDistconf) {
        // these checks are skipped while distconf is being enabled right now
        const bool selfManagementEnabled = StorageConfig && StorageConfig->GetSelfManagementConfig().GetEnabled();
        if (!selfManagementEnabled) {
            return; // no self-management config enabled
        }
        if (!StorageConfig->HasStateStorageConfig()) {
            return; // no way to find Console too
        }
    }

    STLOG(PRI_DEBUG, BS_NODE, NWDC66, "ConnectToConsole: creating pipe to the Console");
    auto *pipeClient = NTabletPipe::CreateClient(SelfId(), MakeConsoleID(),
        NTabletPipe::TClientRetryPolicy::WithRetries());
    ConsolePipeId = Register(pipeClient);
}

void TDistributedConfigKeeper::DisconnectFromConsole() {
NTabletPipe::CloseAndForgetClient(SelfId(), ConsolePipeId);
ConsoleConnected = false;
}

void TDistributedConfigKeeper::SendConfigProposeRequest() {
if (!ConsoleConnected) {
return;
}

if (ProposeRequestInFlight) {
return; // still waiting for previous one
}

if (!StorageConfig || !StorageConfig->HasConfigComposite()) {
return; // no config yet
}

Y_ABORT_UNLESS(StorageConfigYamlVersion);

STLOG(PRI_DEBUG, BS_NODE, NWDC67, "SendConfigProposeRequest: sending propose request to the Console",
(StorageConfigFetchYamlHash, StorageConfigFetchYamlHash),
(StorageConfigYamlVersion, StorageConfigYamlVersion),
(ProposedConfigHashVersion, ProposedConfigHashVersion),
(ProposeRequestCookie, ProposeRequestCookie + 1));

Y_DEBUG_ABORT_UNLESS(!ProposedConfigHashVersion || ProposedConfigHashVersion == std::make_tuple(
StorageConfigFetchYamlHash, *StorageConfigYamlVersion));
ProposedConfigHashVersion.emplace(StorageConfigFetchYamlHash, *StorageConfigYamlVersion);
NTabletPipe::SendData(SelfId(), ConsolePipeId, new TEvBlobStorage::TEvControllerProposeConfigRequest(
StorageConfigFetchYamlHash, *StorageConfigYamlVersion), ++ProposeRequestCookie);
ProposeRequestInFlight = true;
}

// Routes a validation response to the requester whose queued entry carries the same cookie and removes that
// entry from ConsoleConfigValidationQ; responses with an unknown cookie are dropped.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
    const ui64 responseCookie = ev->Cookie;

    // predicate with a side effect: forwards the event to the requester of every matching entry
    auto forwardIfMatches = [&](const std::tuple<TActorId, TString, ui64>& entry) {
        if (std::get<2>(entry) != responseCookie) {
            return false;
        }
        TActivationContext::Send(ev->Forward(std::get<0>(entry)));
        return true;
    };

    auto& queue = ConsoleConfigValidationQ;
    const auto firstRemoved = std::remove_if(queue.begin(), queue.end(), forwardIfMatches);
    queue.erase(firstRemoved, queue.end());
}

// Handles Console's reply to the propose request sent by SendConfigProposeRequest.
//
// Replies that do not match the request in flight are ignored. CommitIsNeeded triggers a commit request carrying
// the current YAML; CommitIsNotNeeded marks the config as committed; HashMismatch/UnexpectedConfig drop the
// tracked proposal.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerProposeConfigResponse::TPtr ev) {
    using TResponse = NKikimrBlobStorage::TEvControllerProposeConfigResponse;

    STLOG(PRI_DEBUG, BS_NODE, NWDC68, "received TEvControllerProposeConfigResponse",
        (ConsoleConnected, ConsoleConnected),
        (ProposeRequestInFlight, ProposeRequestInFlight),
        (Cookie, ev->Cookie),
        (ProposeRequestCookie, ProposeRequestCookie),
        (ProposedConfigHashVersion, ProposedConfigHashVersion),
        (Record, ev->Get()->Record));

    // accept only the answer to the request that is currently in flight
    const bool expectedReply = ConsoleConnected && ProposeRequestInFlight && ev->Cookie == ProposeRequestCookie;
    if (!expectedReply) {
        return;
    }
    ProposeRequestInFlight = false;

    switch (ev->Get()->Record.GetStatus()) {
        case TResponse::HashMismatch:
        case TResponse::UnexpectedConfig:
            // TODO: error condition; restart?
            ProposedConfigHashVersion.reset();
            break;

        case TResponse::CommitIsNeeded: {
            // the proposal being confirmed must still describe the config we currently hold
            const bool proposalIsCurrent = StorageConfig && StorageConfig->HasConfigComposite() &&
                ProposedConfigHashVersion == std::make_tuple(StorageConfigFetchYamlHash, *StorageConfigYamlVersion);
            if (!proposalIsCurrent) {
                const char *err = "proposed config, but something has gone awfully wrong";
                STLOG(PRI_CRIT, BS_NODE, NWDC69, err, (StorageConfig, StorageConfig),
                    (ProposedConfigHashVersion, ProposedConfigHashVersion),
                    (StorageConfigFetchYamlHash, StorageConfigFetchYamlHash),
                    (StorageConfigYamlVersion, StorageConfigYamlVersion));
                Y_DEBUG_ABORT("%s", err);
                return;
            }

            auto commit = std::make_unique<TEvBlobStorage::TEvControllerConsoleCommitRequest>(StorageConfigYaml);
            NTabletPipe::SendData(SelfId(), ConsolePipeId, commit.release(), ++CommitRequestCookie);
            break;
        }

        case TResponse::CommitIsNotNeeded:
            // it's okay, just wait for another configuration change or something like that
            ConfigCommittedToConsole = true;
            break;
    }
}

// Handles Console's reply to the commit request.
//
// Replies arriving while disconnected or carrying a stale cookie are ignored. SessionMismatch forces a
// reconnect, Committed marks the config as committed, NotCommitted changes nothing; in every handled case the
// tracked proposal is dropped afterwards.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerConsoleCommitResponse::TPtr ev) {
    using TResponse = NKikimrBlobStorage::TEvControllerConsoleCommitResponse;

    STLOG(PRI_DEBUG, BS_NODE, NWDC70, "received TEvControllerConsoleCommitResponse",
        (ConsoleConnected, ConsoleConnected),
        (Cookie, ev->Cookie),
        (CommitRequestCookie, CommitRequestCookie),
        (Record, ev->Get()->Record));

    const bool expectedReply = ConsoleConnected && ev->Cookie == CommitRequestCookie;
    if (!expectedReply) {
        return;
    }

    const auto status = ev->Get()->Record.GetStatus();
    if (status == TResponse::SessionMismatch) {
        // start over with a fresh pipe
        DisconnectFromConsole();
        ConnectToConsole();
    } else if (status == TResponse::Committed) {
        ConfigCommittedToConsole = true;
    }
    // TResponse::NotCommitted requires no extra action

    ProposedConfigHashVersion.reset();
}

// Handles the pipe connection result. Events for pipes other than ConsolePipeId are only logged. A failed
// connection is processed as a pipe error; a successful one marks the pipe as connected, proposes the current
// config and sends every queued validation request.
void TDistributedConfigKeeper::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
    const auto *msg = ev->Get();

    STLOG(PRI_DEBUG, BS_NODE, NWDC71, "received TEvClientConnected", (ConsolePipeId, ConsolePipeId),
        (TabletId, msg->TabletId), (Status, msg->Status), (ClientId, msg->ClientId),
        (ServerId, msg->ServerId));

    if (msg->ClientId != ConsolePipeId) {
        return;
    }
    if (msg->Status != NKikimrProto::OK) {
        OnConsolePipeError();
        return;
    }

    Y_ABORT_UNLESS(!ConsoleConnected);
    ConsoleConnected = true;
    SendConfigProposeRequest();

    // requests queued while the pipe was still connecting have no cookie yet; assign one and send them now
    for (auto& [requester, yamlText, requestCookie] : ConsoleConfigValidationQ) {
        Y_ABORT_UNLESS(!requestCookie);
        requestCookie = ++ValidateRequestCookie;
        auto request = std::make_unique<TEvBlobStorage::TEvControllerValidateConfigRequest>(yamlText);
        NTabletPipe::SendData(SelfId(), ConsolePipeId, request.release(), requestCookie);
    }
}

// Handles pipe destruction: when the destroyed pipe is the one to Console, runs the pipe error procedure.
void TDistributedConfigKeeper::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
    const auto *msg = ev->Get();

    STLOG(PRI_DEBUG, BS_NODE, NWDC72, "received TEvClientDestroyed", (ConsolePipeId, ConsolePipeId),
        (TabletId, msg->TabletId), (ClientId, msg->ClientId), (ServerId, msg->ServerId));

    if (msg->ClientId != ConsolePipeId) {
        return; // some other pipe, not our concern
    }
    OnConsolePipeError();
}

// Reacts to a lost (or never established) pipe to Console: forgets all connection state, answers every pending
// validation request with an internal error and attempts to connect again.
void TDistributedConfigKeeper::OnConsolePipeError() {
    // forget everything tied to the lost connection
    ConsolePipeId = {};
    ConsoleConnected = false;
    ConfigCommittedToConsole = false;
    ProposeRequestInFlight = false;
    ProposedConfigHashVersion.reset();
    ++CommitRequestCookie; // to prevent processing any messages

    // cancel any pending requests
    const auto pending = std::exchange(ConsoleConfigValidationQ, {});
    for (const auto& entry : pending) {
        auto response = std::make_unique<TEvBlobStorage::TEvControllerValidateConfigResponse>();
        response->InternalError = "pipe disconnected";
        Send(std::get<0>(entry), response.release());
    }

    ConnectToConsole();
}

// Fills config.ConfigComposite with a zstd-compressed stream of two size-prefixed strings: the given yaml
// followed by the 'fetched' config.
//
// When fetched is not provided, it is synthesized from yaml by replacing its metadata: the cluster defaults to
// "unknown" and the version becomes the current one (zero if absent) plus one.
//
// Returns the exception text if the metadata could not be processed (config is left untouched in that case),
// and an empty optional on success.
std::optional<TString> TDistributedConfigKeeper::UpdateConfigComposite(NKikimrBlobStorage::TStorageConfig& config,
        const TString& yaml, const std::optional<TString>& fetched) {
    TString synthesized; // backs the 'fetched' part when the caller did not supply one

    if (!fetched) { // fill in 'to-be-fetched' version of config with version incremented by one
        try {
            auto meta = NYamlConfig::GetMetadata(yaml);
            meta.Cluster = meta.Cluster.value_or("unknown"); // TODO: fix this
            meta.Version = meta.Version.value_or(0) + 1;
            synthesized = NYamlConfig::ReplaceMetadata(yaml, meta);
        } catch (const std::exception& ex) {
            return ex.what();
        }
    }

    const TString& fetchedPart = fetched ? *fetched : synthesized;

    TStringStream packed;
    {
        // the compressor is scoped so that it is destroyed before the stream content is read
        TZstdCompress compressor(&packed);
        SaveSize(&compressor, yaml.size());
        compressor.Write(yaml);
        SaveSize(&compressor, fetchedPart.size());
        compressor.Write(fetchedPart);
    }
    config.SetConfigComposite(packed.Str());

    return {};
}

// Queues a YAML validation request on behalf of actorId, creating the pipe to Console first when needed.
//
// Returns false (and queues nothing) when there is no pipe and none could be created. If the pipe is already
// connected the request is sent right away; otherwise it keeps a zero cookie and is sent upon connection.
bool TDistributedConfigKeeper::EnqueueConsoleConfigValidation(TActorId actorId, bool enablingDistconf, TString yaml) {
    if (!ConsolePipeId) {
        ConnectToConsole(enablingDistconf);
    }
    if (!ConsolePipeId) {
        return false;
    }

    auto& entry = ConsoleConfigValidationQ.emplace_back(actorId, std::move(yaml), 0);

    if (ConsoleConnected) {
        ui64& entryCookie = std::get<2>(entry);
        entryCookie = ++ValidateRequestCookie;
        auto request = std::make_unique<TEvBlobStorage::TEvControllerValidateConfigRequest>(std::get<1>(entry));
        NTabletPipe::SendData(SelfId(), ConsolePipeId, request.release(), entryCookie);
    }

    return true;
}

}
Loading

0 comments on commit 0859ec3

Please sign in to comment.