diff --git a/lib/fluent/plugin/kafka_producer_ext.rb b/lib/fluent/plugin/kafka_producer_ext.rb index e1b70a2..40dd0a5 100644 --- a/lib/fluent/plugin/kafka_producer_ext.rb +++ b/lib/fluent/plugin/kafka_producer_ext.rb @@ -254,9 +254,8 @@ def assign_partitions! @pending_message_queue.each do |message| partition = message.partition - + partition_count = @cluster.partitions_for(message.topic).count begin - partition_count = @cluster.partitions_for(message.topic).count if partition.nil? partition = @partitioner.call(partition_count, message) diff --git a/lib/fluent/plugin/out_kafka2.rb b/lib/fluent/plugin/out_kafka2.rb index dfc6e44..35a4fd9 100644 --- a/lib/fluent/plugin/out_kafka2.rb +++ b/lib/fluent/plugin/out_kafka2.rb @@ -403,6 +403,7 @@ def write(chunk) rescue Kafka::UnknownTopicOrPartition if @use_default_for_unknown_topic && topic != @default_topic log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic" + producer.clear_buffer topic = @default_topic retry end