Skip to content

Commit

Permalink
Remove spurious StoreOffset and drop support for Co-operative Sticky …
Browse files Browse the repository at this point in the history
…load balancing (#3059)
  • Loading branch information
iancooper authored Apr 19, 2024
1 parent a8a13e1 commit e0cb877
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,18 @@ public KafkaMessageConsumer(
})
.SetPartitionsRevokedHandler((consumer, list) =>
{
consumer.Commit(list);
try
{
_consumer?.Commit(list);
}
catch (KafkaException error)
{
s_logger.LogError(
"Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
);
}

var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();

s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions));
Expand Down Expand Up @@ -237,7 +248,6 @@ public void Acknowledge(Message message)
s_logger.LogInformation("Storing offset {Offset} to topic {Topic} for partition {ChannelName}",
new Offset(topicPartitionOffset.Offset + 1).Value, topicPartitionOffset.TopicPartition.Topic,
topicPartitionOffset.TopicPartition.Partition.Value);
_consumer.StoreOffset(offset);
_offsetStorage.Add(offset);

if (_offsetStorage.Count % _maxBatchSize == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public KafkaSubscription (
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
Expand All @@ -170,6 +170,10 @@ public KafkaSubscription (
NumPartitions = numOfPartitions;
ReplicationFactor = replicationFactor;
PartitionAssignmentStrategy = partitionAssignmentStrategy;

if (PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
throw new ArgumentOutOfRangeException("partitionAssignmentStrategy",
"CooperativeSticky is not supported for with manual commits, see https://github.com/confluentinc/librdkafka/issues/4059");
}
}

Expand Down Expand Up @@ -226,7 +230,7 @@ public KafkaSubscription(
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin)
: base(typeof(T), name, channelName, routingKey, groupId, bufferSize, noOfPerformers, timeoutInMilliseconds,
requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, offsetDefault, commitBatchSize,
sessionTimeoutMs, maxPollIntervalMs, sweepUncommittedOffsetsIntervalMs, isolationLevel, runAsync,
Expand Down

0 comments on commit e0cb877

Please sign in to comment.