KAFKA-18646: Null records in fetch response breaks librdkafka (#18726)
Ensure we always return empty records (including cases where an error is returned).
We also remove `nullable` from `records` since it is effectively expected to be
non-null by a large percentage of clients in the wild.

This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.

Added an integration test that fails without this fix, and updated many
existing tests to set `records` to `MemoryRecords.EMPTY` instead of leaving it as `null`.
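
As a minimal illustration of the guarantee this change restores (a hypothetical snippet, not part of the commit; `FetchResponse.partitionResponse` and the generated accessors are the only APIs assumed from the codebase), an error partition response now always carries `MemoryRecords.EMPTY`:

```java
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

// Hypothetical demo class, not part of this commit.
public class EmptyRecordsCheck {
    public static void main(String[] args) {
        // Build a per-partition error response the way the fixed
        // FetchResponse.partitionResponse does.
        FetchResponseData.PartitionData partitionData =
            FetchResponse.partitionResponse(0, Errors.TOPIC_AUTHORIZATION_FAILED);

        // Clients such as librdkafka dereference the records field even on the
        // error path, so it must be MemoryRecords.EMPTY rather than null.
        if (partitionData.records() == null) {
            throw new IllegalStateException("records must never be null");
        }
        System.out.println("records on error path: " + partitionData.records());
    }
}
```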

Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur <[email protected]>
ijuma authored Jan 29, 2025
1 parent 97a2280 commit ca5d2cf
Showing 5 changed files with 42 additions and 6 deletions.
@@ -196,7 +196,8 @@ public static FetchResponseData.PartitionData partitionResponse(int partition, E
return new FetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(error.code())
-        .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+        .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+        .setRecords(MemoryRecords.EMPTY);
}

/**
@@ -285,4 +286,4 @@ private static FetchResponseData toMessage(Errors error,
.setSessionId(sessionId)
.setResponses(topicResponseList);
}
-}
+}
@@ -106,7 +106,7 @@
]},
{ "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
"about": "The preferred read replica for the consumer to use on its next fetch request."},
{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
{ "name": "Records", "type": "records", "versions": "0+", "about": "The record data."}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,
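To sketch what dropping `nullableVersions` implies for callers (a hypothetical snippet mirroring the calls used in the tests below; the topic name and session id are made up), every partition must be given a non-null records value, typically `MemoryRecords.EMPTY`, before the response is serialized:

```java
import java.util.LinkedHashMap;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;

// Hypothetical round-trip sketch, not part of this commit.
public class FetchResponseRoundTrip {
    public static void main(String[] args) {
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseData =
            new LinkedHashMap<>();
        responseData.put(
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0)),
            new FetchResponseData.PartitionData()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code())
                // Required now that the schema no longer allows null records.
                .setRecords(MemoryRecords.EMPTY));

        // Serialize and re-parse, as the updated tests below do.
        FetchResponse parsed = FetchResponse.parse(
            FetchResponse.of(Errors.NONE, 0, 123, responseData)
                .serialize(ApiKeys.FETCH.latestVersion()),
            ApiKeys.FETCH.latestVersion());
        System.out.println(parsed.data());
    }
}
```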
@@ -2024,7 +2024,8 @@ private FetchResponse createFetchResponse(int sessionId) {
.setPartitionIndex(1)
.setHighWatermark(1000000)
.setLogStartOffset(0)
-            .setAbortedTransactions(abortedTransactions));
+            .setAbortedTransactions(abortedTransactions)
+            .setRecords(MemoryRecords.EMPTY));
return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId,
responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion());
}
@@ -2048,7 +2049,8 @@ private FetchResponse createFetchResponse(boolean includeAborted) {
.setPartitionIndex(1)
.setHighWatermark(1000000)
.setLogStartOffset(0)
-            .setAbortedTransactions(abortedTransactions));
+            .setAbortedTransactions(abortedTransactions)
+            .setRecords(MemoryRecords.EMPTY));
return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, INVALID_SESSION_ID,
responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion());
}
@@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
@@ -59,6 +59,7 @@ import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.coordinator.group.GroupConfig
+import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable

import scala.collection.mutable
@@ -808,6 +809,34 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
}

+  @Test
+  def testFetchConsumerRequest(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+
+    val request = createFetchRequest
+    val topicNames = getTopicNames().asJava
+
+    def partitionDatas(response: AbstractResponse): Iterable[FetchResponseData.PartitionData] = {
+      assertTrue(response.isInstanceOf[FetchResponse])
+      response.asInstanceOf[FetchResponse].responseData(topicNames, ApiKeys.FETCH.latestVersion).values().asScala
+    }
+
+    removeAllClientAcls()
+    val resources = Set(topicResource.resourceType, clusterResource.resourceType)
+    val failedResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = false)
+    val failedPartitionDatas = partitionDatas(failedResponse)
+    assertEquals(1, failedPartitionDatas.size)
+    // Some clients (like librdkafka) always expect non-null records - even for the cases where an error is returned
+    failedPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+
+    val readAcls = topicReadAcl(topicResource)
+    addAndVerifyAcls(readAcls, topicResource)
+    val succeededResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
+    val succeededPartitionDatas = partitionDatas(succeededResponse)
+    assertEquals(1, succeededPartitionDatas.size)
+    succeededPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+  }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum: String): Unit = {
@@ -1802,6 +1802,8 @@ FetchResponseData divergingFetchResponse(
partitionData.divergingEpoch()
.setEpoch(divergingEpoch)
.setEndOffset(divergingEpochEndOffset);

+                partitionData.setRecords(MemoryRecords.EMPTY);
}
);
}
@@ -1830,6 +1832,8 @@ FetchResponseData snapshotFetchResponse(
partitionData.snapshotId()
.setEpoch(snapshotId.epoch())
.setEndOffset(snapshotId.offset());

+                partitionData.setRecords(MemoryRecords.EMPTY);
}
);
}
