Skip to content

Commit

Permalink
batch writing fixes (#13246)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 10, 2025
1 parent 1d239d7 commit b754b20
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 27 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2949,7 +2949,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
Tests::NCommon::TLoggerInit(kikimr)
.SetComponents({ NKikimrServices::TX_COLUMNSHARD_WRITE, NKikimrServices::TX_COLUMNSHARD }, "CS")
.SetPriority(NActors::NLog::PRI_DEBUG)
.Initialize();

TLocalHelper(kikimr).CreateTestOlapTables();

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,8 @@ message TColumnShardConfig {
optional bool UseSlicesFilter = 31 [default = true];
optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000];
optional uint64 WritingBufferVolumeMb = 33 [default = 32];
optional uint64 WritingInFlightRequestsCountLimit = 34 [default = 1000];
optional uint64 WritingInFlightRequestBytesLimit = 35 [default = 128000000];
}

message TSchemeShardConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
operation->OnWriteFinish(txc, {}, true);
} else {
operation->OnWriteFinish(txc, InsertWriteIds, false);
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(InsertWriteIds, operation->GetWriteId());
}
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(InsertWriteIds, operation->GetWriteId());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const ui64
return EOverloadStatus::Disk;
}
ui64 txLimit = Settings.OverloadTxInFlight;
ui64 writesLimit = Settings.OverloadWritesInFlight;
ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
const ui64 writesLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetWritingInFlightRequestsCountLimit() : 1000;
const ui64 writesSizeLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetWritingInFlightRequestBytesLimit() : (((ui64)128) << 20);
if (txLimit && Executor()->GetStats().TxInFly > txLimit) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)(
"limit", txLimit);
Expand Down Expand Up @@ -152,8 +152,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
} else {
auto operation = OperationsManager->GetOperation((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
auto operation = OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(),
ev->Get()->GetWriteResultStatus(), ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails");
ctx.Send(writeMeta.GetSource(), result.release(), 0, operation->GetCookie());
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ struct TSettings {
TControlWrapper CacheDataAfterIndexing;
TControlWrapper CacheDataAfterCompaction;
static constexpr ui64 OverloadTxInFlight = 1000;
static constexpr ui64 OverloadWritesInFlight = 1000;
static constexpr ui64 OverloadWritesSizeInFlight = 128 * 1024 * 1024;

TSettings()
: BlobWriteGrouppingEnabled(1, 0, 1)
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class TWriteCounters: public TCommonCountersOwner {
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
NMonitoring::THistogramPtr HistogramDurationQueueWait;
NMonitoring::THistogramPtr HistogramBatchDataCount;
NMonitoring::THistogramPtr HistogramBatchDataSize;

public:
const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize;
Expand All @@ -44,13 +46,20 @@ class TWriteCounters: public TCommonCountersOwner {
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBatchDataCount = TBase::GetHistogram("Write/Batch/Size/Count", NMonitoring::ExponentialHistogram(18, 2, 1));
HistogramBatchDataSize = TBase::GetHistogram("Write/Batch/Size/Bytes", NMonitoring::ExponentialHistogram(18, 2, 128));
}

// Accounts one incoming write of `dataSize` bytes.
// Adds the size to VolumeWriteData and collects it into both "Write/Incoming/ByBytes" histograms:
// the Count histogram is fed with a second argument of 1, the Bytes histogram with `dataSize`.
void OnIncomingData(const ui64 dataSize) const {
VolumeWriteData->Add(dataSize);
HistogramBytesWriteDataCount->Collect((i64)dataSize, 1);
HistogramBytesWriteDataBytes->Collect((i64)dataSize, dataSize);
}

// Accounts one flushed write batch.
// `count` is collected into HistogramBatchDataCount ("Write/Batch/Size/Count") and
// `dataSize` into HistogramBatchDataSize ("Write/Batch/Size/Bytes"); each Collect call
// passes 1 as its second argument.
void OnAggregationWrite(const ui64 count, const ui64 dataSize) const {
HistogramBatchDataCount->Collect((i64)count, 1);
HistogramBatchDataSize->Collect((i64)dataSize, 1);
}
};

class TCSCounters: public TCommonCountersOwner {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void TActor::Flush() {
}

void TActor::Handle(TEvFlushBuffer::TPtr& /*ev*/) {
if (AppDataVerified().ColumnShardConfig.HasWritingBufferDurationMs()) {
if (AppDataVerified().ColumnShardConfig.HasWritingBufferDurationMs() && AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs()) {
FlushDuration = TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs());
} else {
FlushDuration = std::nullopt;
Expand Down Expand Up @@ -67,10 +67,12 @@ void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) {

// Sends all buffered write units to the insert conveyor as a single task.
// Does nothing when Units is empty.
// `tabletId` is forwarded to the TBuildPackSlicesTask constructor.
void TWriteAggregation::Flush(const ui64 tabletId) {
if (Units.size()) {
// Report the batch (unit count and SumSize) before Units is moved out below.
Context.GetWritingCounters()->OnAggregationWrite(Units.size(), SumSize);
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<TBuildPackSlicesTask>(std::move(Units), Context, PathId, tabletId, ModificationType);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
// Units was passed through std::move above; clear() leaves it explicitly empty for the next batch.
Units.clear();
// Reset the accumulated size together with the units it described.
SumSize = 0;
}
}

Expand Down
19 changes: 8 additions & 11 deletions ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
auto operation = std::make_shared<TWriteOperation>(0, writeId, lockId, cookie, status, TInstant::Seconds(createdAtSec),
granuleShardingVersionId, NEvWrite::EModificationType::Upsert, false);
operation->FromProto(metaProto);
LinkInsertWriteIdToOperationWriteId(operation->GetInsertWriteIds(), operation->GetWriteId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "register_operation_on_load")("operation_id", operation->GetWriteId());
AFL_VERIFY(operation->GetStatus() != EOperationStatus::Draft);

AFL_VERIFY(Operations.emplace(operation->GetWriteId(), operation).second);
LinkInsertWriteIdToOperationWriteId(operation->GetInsertWriteIds(), operation->GetWriteId());

auto it = LockFeatures.find(lockId);
if (it == LockFeatures.end()) {
it = LockFeatures.emplace(lockId, TLockFeatures(lockId, 0)).first;
Expand Down Expand Up @@ -143,14 +144,6 @@ void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const u
OnTransactionFinishOnComplete(aborted, *lock, txId);
}

// Looks up a write operation by its operation write id.
// Returns the entry stored in Operations, or nullptr when `writeId` is not registered.
TWriteOperation::TPtr TOperationsManager::GetOperation(const TOperationWriteId writeId) const {
auto it = Operations.find(writeId);
if (it == Operations.end()) {
return nullptr;
}
return it->second;
}

void TOperationsManager::OnTransactionFinishOnExecute(
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
for (auto&& op : operations) {
Expand Down Expand Up @@ -179,8 +172,10 @@ void TOperationsManager::RemoveOperationOnExecute(const TWriteOperation::TPtr& o

void TOperationsManager::RemoveOperationOnComplete(const TWriteOperation::TPtr& op) {
for (auto&& i : op->GetInsertWriteIds()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "remove_by_insert_id")("id", i)("operation_id", op->GetWriteId());
AFL_VERIFY(InsertWriteIdToOpWriteId.erase(i));
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "remove_operation")("operation_id", op->GetWriteId());
Operations.erase(op->GetWriteId());
}

Expand Down Expand Up @@ -210,7 +205,9 @@ TWriteOperation::TPtr TOperationsManager::RegisterOperation(const ui64 pathId, c
auto writeId = BuildNextOperationWriteId();
auto operation = std::make_shared<TWriteOperation>(pathId, writeId, lockId, cookie, EOperationStatus::Draft, AppData()->TimeProvider->Now(),
granuleShardingVersionId, mType, portionsWriting);
Y_ABORT_UNLESS(Operations.emplace(operation->GetWriteId(), operation).second);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "register_operation")("operation_id", operation->GetWriteId())(
"last", LastWriteId);
AFL_VERIFY(Operations.emplace(operation->GetWriteId(), operation).second);
GetLockVerified(operation->GetLockId()).MutableWriteOperations().emplace_back(operation);
GetLockVerified(operation->GetLockId()).AddWrite();
return operation;
Expand Down
18 changes: 14 additions & 4 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,25 +134,35 @@ class TOperationsManager {

TWriteOperation::TPtr GetOperationByInsertWriteIdVerified(const TInsertWriteId insertWriteId) const {
auto it = InsertWriteIdToOpWriteId.find(insertWriteId);
AFL_VERIFY(it != InsertWriteIdToOpWriteId.end());
AFL_VERIFY(it != InsertWriteIdToOpWriteId.end())("write_id", insertWriteId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "ask_by_insert_id")("write_id", insertWriteId)("operation_id", it->second);
return GetOperationVerified(it->second);
}

// Registers every id from `insertions` in InsertWriteIdToOpWriteId as belonging to `operationId`.
// Preconditions enforced here:
//  - the operation must already be registered (GetOperationVerified aborts otherwise);
//  - the operation's own GetInsertWriteIds() must equal `insertions`, so the operation has to
//    be filled with its insert write ids before it is linked.
void LinkInsertWriteIdToOperationWriteId(const std::vector<TInsertWriteId>& insertions, const TOperationWriteId operationId) {
const auto op = GetOperationVerified(operationId);
AFL_VERIFY(op->GetInsertWriteIds() == insertions)("operation_data", JoinSeq(", ", op->GetInsertWriteIds()))(
"expected", JoinSeq(", ", insertions));
for (auto&& i : insertions) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "add_by_insert_id")("id", i)("operation_id", operationId);
// NOTE(review): the emplace result is ignored; presumably an already-present id keeps its old mapping — confirm this is intended.
InsertWriteIdToOpWriteId.emplace(i, operationId);
}
}
bool Load(NTabletFlatExecutor::TTransactionContext& txc);
void AddEventForTx(TColumnShard& owner, const ui64 txId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
void AddEventForLock(TColumnShard& owner, const ui64 lockId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);

TWriteOperation::TPtr GetOperation(const TOperationWriteId writeId) const;
TWriteOperation::TPtr GetOperationVerified(const TOperationWriteId writeId) const {
return TValidator::CheckNotNull(GetOperationOptional(writeId));
auto result = GetOperationOptional(writeId);
AFL_VERIFY(!!result)("op_id", writeId);
return result;
}
TWriteOperation::TPtr GetOperationOptional(const TOperationWriteId writeId) const {
return GetOperation(writeId);
auto it = Operations.find(writeId);
if (it == Operations.end()) {
return nullptr;
}
return it->second;
}
void CommitTransactionOnExecute(
TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot);
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/changes/cleanup_portions.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
Expand Down Expand Up @@ -455,9 +456,9 @@ void TestWriteOverload(const TestTableDescription& table) {
UNIT_ASSERT(testBlob.size() > NOlap::TCompactionLimits::MAX_BLOB_SIZE / 2);
UNIT_ASSERT(testBlob.size() < NOlap::TCompactionLimits::MAX_BLOB_SIZE);

const ui64 overloadSize = NColumnShard::TSettings::OverloadWritesSizeInFlight;
const ui64 overloadSize = NKikimrConfig::TColumnShardConfig().GetWritingInFlightRequestBytesLimit();
ui32 toCatch = overloadSize / testBlob.size() + 1;
UNIT_ASSERT_VALUES_EQUAL(toCatch, 22);
UNIT_ASSERT_VALUES_EQUAL(toCatch, 21);
TDeque<TAutoPtr<IEventHandle>> capturedWrites;

auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
Expand Down Expand Up @@ -531,7 +532,8 @@ void TestWriteReadDup(const TestTableDescription& table = {}) {
for (ui32 i = 0; i <= 5; ++i) {
std::vector<ui64> writeIds;
++txId;
UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert, txId));
UNIT_ASSERT(
WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert, txId));
ProposeCommit(runtime, sender, txId, writeIds, txId);
txIds.insert(txId);
}
Expand Down

0 comments on commit b754b20

Please sign in to comment.