Skip to content

Commit

Permalink
Merge 93fce54 into 8a6d04b
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jan 10, 2025
2 parents 8a6d04b + 93fce54 commit 7db614f
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 79 deletions.
4 changes: 2 additions & 2 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1138,8 +1138,8 @@ struct TEvBlobStorage {
ui32 Generation = 0; // tablet generation
};

using TForceBlockTabletData = TTabletData;
using TReaderTabletData = TTabletData;
struct TForceBlockTabletData : TTabletData { using TTabletData::TTabletData; };
struct TReaderTabletData : TTabletData { using TTabletData::TTabletData; };

std::optional<TReaderTabletData> ReaderTabletData;
std::optional<TForceBlockTabletData> ForceBlockTabletData;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ namespace NKikimr::NBlobDepot {
NLog::EPriority WatchdogPriority = NLog::PRI_WARN;
bool Destroyed = false;
std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
ui32 BlockChecksRemain = 3;

static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);

Expand All @@ -377,6 +378,9 @@ namespace NKikimr::NBlobDepot {
virtual void OnIdAllocated(bool /*success*/) {}
virtual void OnDestroy(bool /*success*/) {}

NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, std::optional<ui32> generation,
ui32 *blockedGeneration = nullptr);

protected: // reading logic
struct TReadContext;
struct TReadArg {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/blob_depot/agent/query.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "agent_impl.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

Expand Down Expand Up @@ -209,6 +210,20 @@ namespace NKikimr::NBlobDepot {
DoDestroy();
}

NKikimrProto::EReplyStatus TBlobDepotAgent::TQuery::CheckBlockForTablet(ui64 tabletId, std::optional<ui32> generation,
ui32 *blockedGeneration) {
const NKikimrProto::EReplyStatus status = Agent.BlocksManager.CheckBlockForTablet(tabletId, generation, this,
blockedGeneration);
if (status != NKikimrProto::OK) {
if (status != NKikimrProto::UNKNOWN) {
EndWithError(status, "block race detected");
} else if (!--BlockChecksRemain) {
EndWithError(NKikimrProto::ERROR, "failed to obtain blocked generation");
}
}
return status;
}

void TBlobDepotAgent::TQuery::DoDestroy() {
Y_ABORT_UNLESS(!Destroyed);
Destroyed = true;
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/blob_depot/agent/storage_collect_garbage.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#include "agent_impl.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) {
class TCollectGarbageQuery : public TBlobStorageQuery<TEvBlobStorage::TEvCollectGarbage> {
ui32 BlockChecksRemain = 3;
ui32 KeepIndex = 0;
ui32 NumKeep;
ui32 DoNotKeepIndex = 0;
Expand All @@ -21,13 +19,8 @@ namespace NKikimr::NBlobDepot {
NumKeep = Request.Keep ? Request.Keep->size() : 0;
NumDoNotKeep = Request.DoNotKeep ? Request.DoNotKeep->size() : 0;

const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, Request.RecordGeneration, this, nullptr);
if (status == NKikimrProto::OK) {
if (CheckBlockForTablet(Request.TabletId, Request.RecordGeneration) == NKikimrProto::OK) {
IssueCollectGarbage();
} else if (status != NKikimrProto::UNKNOWN) {
EndWithError(status, "block race detected");
} else if (!--BlockChecksRemain) {
EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
}
}

Expand Down
30 changes: 8 additions & 22 deletions ydb/core/blob_depot/agent/storage_discover.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
#include "agent_impl.h"
#include "blocks.h"
#include "blob_mapping_cache.h"

namespace NKikimr::NBlobDepot {

template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) {
class TDiscoverQuery : public TBlobStorageQuery<TEvBlobStorage::TEvDiscover> {
ui32 GetBlockedGenerationRetriesRemain = 10;

bool DoneWithBlockedGeneration = false;
bool DoneWithData = false;

Expand All @@ -27,17 +24,15 @@ namespace NKikimr::NBlobDepot {

GenerateInitialResolve();
IssueResolve();
CheckBlockedGeneration();
}

if (Request.DiscoverBlockedGeneration) {
const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, std::nullopt, this, &BlockedGeneration);
if (status == NKikimrProto::OK) {
DoneWithBlockedGeneration = true;
} else if (status != NKikimrProto::UNKNOWN) {
EndWithError(status, "tablet was deleted");
}
} else {
DoneWithBlockedGeneration = true;
void CheckBlockedGeneration() {
if (!DoneWithBlockedGeneration) {
DoneWithBlockedGeneration = !Request.DiscoverBlockedGeneration ||
CheckBlockForTablet(Request.TabletId, std::nullopt, &BlockedGeneration) == NKikimrProto::OK;
}
CheckIfDone();
}

void GenerateInitialResolve() {
Expand Down Expand Up @@ -85,16 +80,7 @@ namespace NKikimr::NBlobDepot {
void OnUpdateBlock() override {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "OnUpdateBlock", (AgentId, Agent.LogId),
(QueryId, GetQueryId()));

const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, std::nullopt, this, &BlockedGeneration);
if (status == NKikimrProto::OK) {
DoneWithBlockedGeneration = true;
CheckIfDone();
} else if (status != NKikimrProto::UNKNOWN) {
EndWithError(status, "tablet was deleted");
} else if (!--GetBlockedGenerationRetriesRemain) {
EndWithError(NKikimrProto::ERROR, "too many retries to obtain blocked generation");
}
CheckBlockedGeneration();
}

void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) {
Expand Down
31 changes: 22 additions & 9 deletions ydb/core/blob_depot/agent/storage_get.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "agent_impl.h"
#include "blob_mapping_cache.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

Expand All @@ -22,6 +21,24 @@ namespace NKikimr::NBlobDepot {
using TBlobStorageQuery::TBlobStorageQuery;

void Initiate() override {
if (const auto& blk = Request.ReaderTabletData) {
if (CheckBlockForTablet(blk->Id, blk->Generation) != NKikimrProto::OK) {
return;
}
}

if (const auto& blk = Request.ForceBlockTabletData; blk && blk->Generation) {
ui32 blockedGeneration;
if (CheckBlockForTablet(blk->Id, std::nullopt, &blockedGeneration) != NKikimrProto::OK) {
return;
}
if (blockedGeneration != blk->Generation) {
// this can happen only in distributed storage, but not possible in BlobDepot
return EndWithError(NKikimrProto::ERROR, "incorrect blocked generation provided for"
" ForceBlockTabletData in TEvGet query to BlobDepot");
}
}

if (IS_LOG_PRIORITY_ENABLED(NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) {
for (ui32 i = 0; i < Request.QuerySize; ++i) {
const auto& q = Request.Queries[i];
Expand All @@ -34,14 +51,6 @@ namespace NKikimr::NBlobDepot {
TGroupId::FromValue(Agent.VirtualGroupId));
AnswersRemain = Request.QuerySize;

if (Request.ReaderTabletData) {
auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr);
if (status == NKikimrProto::BLOCKED) {
EndWithError(status, "Fail TEvGet due to BLOCKED tablet generation");
return;
}
}

for (ui32 i = 0; i < Request.QuerySize; ++i) {
auto& query = Request.Queries[i];

Expand All @@ -65,6 +74,10 @@ namespace NKikimr::NBlobDepot {
CheckAndFinish();
}

void OnUpdateBlock() override {
Initiate();
}

bool ProcessSingleResult(ui32 queryIdx, const TKeyResolved& result) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (AgentId, Agent.LogId),
(QueryId, GetQueryId()), (QueryIdx, queryIdx), (Result, result));
Expand Down
24 changes: 5 additions & 19 deletions ydb/core/blob_depot/agent/storage_get_block.cpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
#include "agent_impl.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGetBlock>(std::unique_ptr<IEventHandle> ev) {
class TGetBlockQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGetBlock> {
ui32 BlockedGeneration = 0;
ui32 RetriesRemain = 10;

private:
void TryGetBlock() {
const auto status = Agent.BlocksManager.CheckBlockForTablet(Request.TabletId, std::nullopt, this, &BlockedGeneration);
if (status == NKikimrProto::OK) {
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvGetBlockResult>(NKikimrProto::OK, Request.TabletId, BlockedGeneration));
} else if (status != NKikimrProto::UNKNOWN) {
EndWithError(status, "BlobDepot tablet is unreachable");
}
}

public:
using TBlobStorageQuery::TBlobStorageQuery;

void Initiate() override {
TryGetBlock();
if (CheckBlockForTablet(Request.TabletId, std::nullopt, &BlockedGeneration) == NKikimrProto::OK) {
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvGetBlockResult>(NKikimrProto::OK,
Request.TabletId, BlockedGeneration));
}
}

void OnUpdateBlock() override {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA52, "OnUpdateBlock", (AgentId, Agent.LogId),
(QueryId, GetQueryId()));

if (!--RetriesRemain) {
EndWithError(NKikimrProto::ERROR, "too many retries to get blocked generation");
return;
}
TryGetBlock();
Initiate();
}

void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse /*response*/) override {
Expand Down
10 changes: 2 additions & 8 deletions ydb/core/blob_depot/agent/storage_put.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "agent_impl.h"
#include "blocks.h"

namespace NKikimr::NBlobDepot {

Expand All @@ -9,7 +8,6 @@ namespace NKikimr::NBlobDepot {
const bool SuppressFooter = true;
const bool IssueUncertainWrites = false;

std::vector<ui32> BlockChecksRemain;
ui32 PutsInFlight = 0;
bool PutsIssued = false;
bool WaitingForCommitBlobSeq = false;
Expand Down Expand Up @@ -44,7 +42,7 @@ namespace NKikimr::NBlobDepot {
return EndWithError(NKikimrProto::ERROR, "blob id is zero");
}

BlockChecksRemain.resize(1 + Request.ExtraBlockChecks.size(), 3); // set number of tries for every block
BlockChecksRemain = (1 + Request.ExtraBlockChecks.size()) * 3; // set number of tries for every block
CheckBlocks();
}

Expand All @@ -54,13 +52,9 @@ namespace NKikimr::NBlobDepot {
const auto *blkp = i ? &Request.ExtraBlockChecks[i - 1] : nullptr;
const ui64 tabletId = blkp ? blkp->first : Request.Id.TabletID();
const ui32 generation = blkp ? blkp->second : Request.Id.Generation();
const auto status = Agent.BlocksManager.CheckBlockForTablet(tabletId, generation, this, nullptr);
const auto status = CheckBlockForTablet(tabletId, generation);
if (status == NKikimrProto::OK) {
continue;
} else if (status != NKikimrProto::UNKNOWN) {
return EndWithError(status, "block race detected");
} else if (!--BlockChecksRemain[i]) {
return EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
} else {
someBlocksMissing = true;
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/dsproxy/mock/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ namespace NFake {
}

TEvBlobStorage::TEvGetResult* Handle(TEvBlobStorage::TEvGet *msg) {
if (const auto& blk = msg->ReaderTabletData) {
if (IsBlocked(blk->Id, blk->Generation)) {
auto response = msg->MakeErrorResponse(NKikimrProto::BLOCKED, "block race detected", GroupId);
return response.release();
}
}
if (const auto& blk = msg->ForceBlockTabletData; blk && blk->Generation) {
auto it = Blocks.find(blk->Id);
Y_VERIFY_S(it != Blocks.end() && it->second == blk->Generation, "incorrect ForceBlockTabletData"
<< " expected Generation# " << blk->Generation
<< " having Generation# " << (it != Blocks.end() ? ToString(it->second) : "none"));
}

// prepare result structure holding the returned data
auto result = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, msg->QuerySize, GroupId);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ namespace NKikimr {
ShowInternals = 2,
};

using TForceBlockTabletData = TEvBlobStorage::TEvGet::TTabletData;
using TForceBlockTabletData = TEvBlobStorage::TEvGet::TForceBlockTabletData;

struct TExtremeQuery : std::tuple<TLogoBlobID, ui32, ui32, const ui64*> {
TExtremeQuery(const TLogoBlobID &logoBlobId, ui32 sh, ui32 sz, const ui64 *cookie = nullptr)
Expand Down
19 changes: 9 additions & 10 deletions ydb/core/blobstorage/vdisk/skeleton/skeleton_block_and_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
namespace NKikimr {

class TBlockAndGetActor : public TActorBootstrapped<TBlockAndGetActor> {
private:
static constexpr auto VBLOCK_DEFAULT_DEADLINE_SECONDS = 50;
public:
TBlockAndGetActor() = delete;
explicit TBlockAndGetActor(
Expand All @@ -40,13 +38,15 @@ class TBlockAndGetActor : public TActorBootstrapped<TBlockAndGetActor> {

void Bootstrap() {
// create TEvVBlock request
const TInstant deadline = Request->Get()->Record.GetMsgQoS().HasDeadlineSeconds()
? TInstant::Seconds(Request->Get()->Record.GetMsgQoS().GetDeadlineSeconds())
: TInstant::Max();

auto request = std::make_unique<TEvBlobStorage::TEvVBlock>(
Request->Get()->Record.GetForceBlockTabletData().GetId(),
Request->Get()->Record.GetForceBlockTabletData().GetGeneration(),
VDiskIDFromVDiskID(Request->Get()->Record.GetVDiskID()),
Request->Get()->Record.GetMsgQoS().HasDeadlineSeconds() ?
TInstant::Seconds(Request->Get()->Record.GetMsgQoS().GetDeadlineSeconds()) :
TInstant::Seconds(VBLOCK_DEFAULT_DEADLINE_SECONDS)
deadline
);

// send TEvVBlock request
Expand All @@ -62,11 +62,10 @@ class TBlockAndGetActor : public TActorBootstrapped<TBlockAndGetActor> {

void Handle(TEvBlobStorage::TEvVBlockResult::TPtr &ev) {
switch (ev->Get()->Record.GetStatus()) {
case NKikimrProto::OK:
break;
case NKikimrProto::ALREADY:
break;
default: {
case NKikimrProto::OK:
case NKikimrProto::ALREADY:
break;
default: {
// we failed to block required generation, so return failure
auto response = NErrBuilder::ErroneousResult(
VCtx,
Expand Down

0 comments on commit 7db614f

Please sign in to comment.