Skip to content

Commit

Permalink
fix hanging in case of timeout with empty reply from scanner (ydb-platfo…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 25, 2025
1 parent c678726 commit 5d4df03
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu

void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
auto g = Stats->MakeGuard("ack");
Y_ABORT_UNLESS(!AckReceivedInstant);
AFL_VERIFY(!AckReceivedInstant);
AckReceivedInstant = TMonotonic::Now();

AFL_VERIFY(ev->Get()->Generation == ScanGen)("ev_gen", ev->Get()->Generation)("scan_gen", ScanGen);

ChunksLimiter = TChunksLimiter(ev->Get()->FreeSpace, ev->Get()->MaxChunksCount);
Y_ABORT_UNLESS(ev->Get()->MaxChunksCount == 1);
AFL_VERIFY(ev->Get()->MaxChunksCount == 1);
ACFL_DEBUG("event", "TEvScanDataAck")("info", ChunksLimiter.DebugString());
if (ScanIterator) {
if (!!ScanIterator->GetAvailableResultsCount() && !*ScanIterator->GetAvailableResultsCount()) {
Expand Down Expand Up @@ -183,6 +183,7 @@ void TColumnShardScan::HandleScan(TEvents::TEvWakeup::TPtr& /*ev*/) {
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);

if (TMonotonic::Now() >= GetDeadline()) {
SendScanError("ColumnShard scanner timeout");
Finish(NColumnShard::TScanCounters::EStatusFinish::Deadline);
} else {
ScheduleWakeup(GetDeadline());
Expand Down Expand Up @@ -220,6 +221,9 @@ bool TColumnShardScan::ProduceResults() noexcept {

std::shared_ptr<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
if (!resultOpt) {
if (!!AckReceivedInstant) {
LastResultInstant = TMonotonic::Now();
}
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
}
Expand Down Expand Up @@ -370,25 +374,20 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {

PageFaults = 0;

LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " send ScanData to " << ScanComputeActorId << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen
<< " tablet: " << TabletId << " bytes: " << Bytes << " rows: " << Rows << " page faults: " << Result->PageFaults
<< " finished: " << Result->Finished << " pageFault: " << Result->PageFault << " arrow schema:\n"
<< (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : ""));

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "send_data")("compute_actor_id", ScanComputeActorId)("bytes", Bytes)("rows", Rows)(
"faults", Result->PageFaults)("finished", Result->Finished)("fault", Result->PageFault)(
"schema", (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : ""));
Finished = Result->Finished;
if (Finished) {
ALS_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)
<< "Scanner finished " << ScanActorId << " and sent to " << ScanComputeActorId << " packs: " << PacksSum << " txId: " << TxId
<< " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " bytes: " << Bytes << "/" << BytesSum
<< " rows: " << Rows << "/" << RowsSum << " page faults: " << Result->PageFaults << " finished: " << Result->Finished
<< " pageFault: " << Result->PageFault << " stats:" << Stats->ToJson()
<< ";iterator:" << (ScanIterator ? ScanIterator->DebugString(false) : "NO");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "scan_finished")("compute_actor_id", ScanComputeActorId)("packs_sum", PacksSum)(
"bytes", Bytes)("bytes_sum", BytesSum)("rows", Rows)("rows_sum", RowsSum)("faults", Result->PageFaults)(
"finished", Result->Finished)("fault", Result->PageFault)("stats", Stats->ToJson())(
"iterator", (ScanIterator ? ScanIterator->DebugString(false) : "NO"));
Result->StatsOnFinished = std::make_shared<TScanStatsOwner>(ScanIterator->GetStats());
} else {
Y_ABORT_UNLESS(ChunksLimiter.Take(Bytes));
AFL_VERIFY(ChunksLimiter.Take(Bytes));
Result->RequestedBytesLimitReached = !ChunksLimiter.HasMore();
Y_ABORT_UNLESS(AckReceivedInstant);
AFL_VERIFY(AckReceivedInstant);
ScanCountersPool.AckWaitingInfo(TMonotonic::Now() - *AckReceivedInstant);
}
ReadMetadataRange->OnReplyConstruction(TabletId, *Result);
Expand All @@ -410,6 +409,8 @@ void TColumnShardScan::SendScanError(const TString& reason) {
ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_RESULT_UNAVAILABLE, msg);
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "scan_finish")("compute_actor_id", ScanComputeActorId)("stats", Stats->ToJson())(
"iterator", (ScanIterator ? ScanIterator->DebugString(false) : "NO"));

Send(ScanComputeActorId, ev.Release());
}
Expand All @@ -421,6 +422,8 @@ void TColumnShardScan::Finish(const NColumnShard::TScanCounters::EStatusFinish s
AFL_VERIFY(StartInstant);
ScanCountersPool.OnScanFinished(status, TMonotonic::Now() - *StartInstant);
ReportStats();
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "scan_finish")("compute_actor_id", ScanComputeActorId)("stats", Stats->ToJson())(
"iterator", (ScanIterator ? ScanIterator->DebugString(false) : "NO"));
PassAway();
}

Expand Down

0 comments on commit 5d4df03

Please sign in to comment.