Skip to content

Commit

Permalink
Fix partition split condition for kinesis protocol (#12447)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Dec 24, 2024
1 parent a3bc91c commit c7b02fe
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
4 changes: 3 additions & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
const auto writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
const auto sourceIdWindow = TDuration::Seconds(std::min<ui32>(5, Config.GetPartitionStrategy().GetScaleThresholdSeconds()));
const auto sourceIdCount = SourceIdCounter.Count(ctx.Now() - sourceIdWindow);
const auto canSplit = sourceIdCount > 1 || (sourceIdCount == 1 && SourceIdCounter.LastValue().empty() /* kinesis */);

PQ_LOG_D("TPartition::CheckScaleStatus"
<< " splitMergeAvgWriteBytes# " << SplitMergeAvgWriteBytes->GetValue()
<< " writeSpeedUsagePercent# " << writeSpeedUsagePercent
<< " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds()
<< " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed
<< " sourceIdCount=" << sourceIdCount
<< " canSplit=" << canSplit
<< " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
Expand All @@ -616,7 +618,7 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent() && sourceIdCount > 1) {
if (splitEnabled && canSplit && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
PQ_LOG_D("TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"."
<< " Partition: " << Partition);
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,8 @@ size_t TLastCounter::Count(const TInstant& expirationTime) {
});
}

const TString& TLastCounter::LastValue() const {
return Values.back().Value;
}

} // NKikimr::NPQ
1 change: 1 addition & 0 deletions ydb/core/persqueue/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TLastCounter {
public:
void Use(const TString& value, const TInstant& now);
size_t Count(const TInstant& expirationTime);
const TString& LastValue() const;

private:
struct Data {
Expand Down

0 comments on commit c7b02fe

Please sign in to comment.