KAFKA-18269: Remove deprecated protocol APIs support (KIP-896, KIP-724) for 4.0 #18291

Merged (2 commits) on Dec 21, 2024
23 changes: 0 additions & 23 deletions clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -16,13 +16,8 @@
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ProduceRequest;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@@ -33,7 +28,6 @@
public class ApiVersions {

private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;

// The maximum finalized feature epoch of all the node api versions.
private long maxFinalizedFeaturesEpoch = -1;
@@ -50,7 +44,6 @@ public static class FinalizedFeaturesInfo

public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
this.nodeApiVersions.put(nodeId, nodeApiVersions);
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
@@ -59,7 +52,6 @@ public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions)

public synchronized void remove(String nodeId) {
this.nodeApiVersions.remove(nodeId);
this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
}

public synchronized NodeApiVersions get(String nodeId) {
@@ -74,19 +66,4 @@ public synchronized FinalizedFeaturesInfo getFinalizedFeaturesInfo()
return new FinalizedFeaturesInfo(maxFinalizedFeaturesEpoch, finalizedFeatures);
}

private byte computeMaxUsableProduceMagic() {
// use a magic version which is supported by all brokers to reduce the chance that
// we will need to convert the messages when they are ready to be sent.
Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
.filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
.map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
.min(Byte::compare);
return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
}

public synchronized byte maxUsableProduceMagic() {
return maxUsableProduceMagic;
}

}
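Note on this file: `maxUsableProduceMagic` existed so the producer could pick a record-batch magic that every known broker's Produce version could accept. With KIP-724/KIP-896 making message format v2 (magic 2) the baseline in 4.0, the answer is always `RecordBatch.CURRENT_MAGIC_VALUE`, so the whole negotiation goes away. Below is a self-contained sketch of the deleted min-over-nodes logic, with brokers modeled as a plain map rather than the real `NodeApiVersions` API:

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the deleted negotiation: pick the lowest magic required by any broker
// that serves Produce, so batches never need client-side down-conversion. Brokers
// are modeled as a map from node id to required magic; illustrative only.
public class MagicNegotiationSketch {
    static byte minUsableMagic(Map<String, Byte> requiredMagicByNode, byte currentMagic) {
        Optional<Byte> min = requiredMagicByNode.values().stream().min(Byte::compare);
        return (byte) Math.min(currentMagic, min.orElse(currentMagic));
    }

    public static void main(String[] args) {
        // A mixed cluster mid-upgrade: one broker still requires message format v1.
        System.out.println(minUsableMagic(Map.of("0", (byte) 2, "1", (byte) 1), (byte) 2)); // 1
    }
}
```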
@@ -111,33 +111,15 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
case NONE:
if (!partition.oldStyleOffsets().isEmpty()) {
// Handle v0 response with offsets
long offset;
if (partition.oldStyleOffsets().size() > 1) {
throw new IllegalStateException("Unexpected partitionData response of length " +
partition.oldStyleOffsets().size());
} else {
offset = partition.oldStyleOffsets().get(0);
}
log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
if (offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(offset, null, Optional.empty());
fetchedOffsets.put(topicPartition, offsetData);
}
} else {
// Handle v1 and later response or v0 without offsets
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
OffsetFetcherUtils.ListOffsetData offsetData = new OffsetFetcherUtils.ListOffsetData(partition.offset(), partition.timestamp(),
leaderEpoch);
fetchedOffsets.put(topicPartition, offsetData);
}
break;
case UNSUPPORTED_FOR_MESSAGE_FORMAT:
@@ -458,4 +440,4 @@ static class ListOffsetData {
this.leaderEpoch = leaderEpoch;
}
}
}
}
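Note: ListOffsets v0 returned a list of offsets per partition (`oldStyleOffsets`); v1 and later return a single offset plus timestamp and leader epoch. With v0 support dropped (KIP-896), only the single-offset path survives. A sketch of that surviving path as a pure function; the `UNKNOWN_*` sentinels mirror the real `ListOffsetsResponse` constants (-1), but `ListOffsetData` here is a simplified stand-in for the generated response type:

```java
import java.util.Optional;

// The surviving v1+ parse path, restated for reference.
public class ListOffsetParseSketch {
    static final long UNKNOWN_OFFSET = -1L;
    static final int UNKNOWN_EPOCH = -1;

    record ListOffsetData(long offset, long timestamp, Optional<Integer> leaderEpoch) {}

    static Optional<ListOffsetData> parse(long offset, long timestamp, int leaderEpoch) {
        if (offset == UNKNOWN_OFFSET)
            return Optional.empty(); // broker returned no offset for this partition
        Optional<Integer> epoch = leaderEpoch == UNKNOWN_EPOCH
                ? Optional.empty() : Optional.of(leaderEpoch);
        return Optional.of(new ListOffsetData(offset, timestamp, epoch));
    }
}
```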
@@ -1004,7 +1004,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
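Note: the size estimate depends on the batch format because v0/v1 and v2 batches have different header and per-record overheads. Since only v2 is possible now, the producer passes the constant instead of consulting `ApiVersions`. A minimal sketch of the surviving call, assuming the clients jar is on the classpath:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;

public class EstimateSketch {
    public static void main(String[] args) {
        byte[] key = "k".getBytes();
        byte[] value = "v".getBytes();
        Header[] headers = new Header[0];
        // Upper bound: v2 batch header plus worst-case per-record framing, so a
        // buffer allocated from this estimate never needs to grow mid-append.
        int upperBound = AbstractRecords.estimateSizeInBytesUpperBound(
                RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, key, value, headers);
        System.out.println("worst-case serialized size: " + upperBound + " bytes");
    }
}
```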
@@ -27,7 +27,6 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
@@ -344,8 +343,8 @@ public RecordAppendResult append(String topic,
}

if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
@@ -408,7 +407,7 @@ private RecordAppendResult appendNewBatch(String topic,
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
@@ -419,12 +418,8 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
}

/**
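Note: `recordsBuilder` loses both its magic parameter and the idempotence guard; that guard was only reachable when an older broker forced magic < 2, which can no longer happen. A runnable sketch of building a v2 batch through the same public API, with `Compression.NONE` standing in for the accumulator's configured compression:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class V2BatchSketch {
    public static void main(String[] args) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024),
                RecordBatch.CURRENT_MAGIC_VALUE, // always v2 in Kafka 4.0 clients
                Compression.NONE,
                TimestampType.CREATE_TIME,
                0L);                             // base offset
        builder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
        MemoryRecords records = builder.build();
        System.out.println("batch size: " + records.sizeInBytes() + " bytes");
    }
}
```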
@@ -871,27 +871,10 @@ private void sendProduceRequest(long now, int destination, short acks, int timeout
return;

final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();

// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
@@ -904,18 +887,13 @@ private void sendProduceRequest(long now, int destination, short acks, int timeout
}

String transactionalId = null;

// When we use transaction V1 protocol in transaction we set the request version upper limit to
// LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1.
boolean useTransactionV1Version = false;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
if (!transactionManager.isTransactionV2Enabled()) {
useTransactionV1Version = true;
}
useTransactionV1Version = !transactionManager.isTransactionV2Enabled();
}

ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
ProduceRequest.Builder requestBuilder = ProduceRequest.builder(
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
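Note: the Sender previously scanned batches for the minimum magic and down-converted before sending; with every batch guaranteed to be v2, both the scan and the down-conversion block are gone. The transaction V1 capping survives. A sketch of the version-cap idea behind `useTransactionV1Version`; the constant is the one named in the comment above, but how the new `ProduceRequest.builder` consumes the flag is an assumption here:

```java
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;

public class ProduceVersionCapSketch {
    // When the producer is transactional but transaction V2 is not enabled, the
    // request's max version is capped so the broker sees a pre-V2 produce request.
    static short maxProduceVersion(boolean useTransactionV1Version) {
        return useTransactionV1Version
                ? ProduceRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
                : ApiKeys.PRODUCE.latestVersion();
    }
}
```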
@@ -234,10 +234,18 @@ public class TopicConfig {
"or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
"configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
"timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";

/**
* @deprecated down-conversion is not possible in Apache Kafka 4.0 and newer, hence this configuration is a no-op,
* and it is deprecated for removal in Apache Kafka 5.0.
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
"down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
"does not apply to any message format conversion that might be required for replication to followers.";

/**
* @deprecated see {@link #MESSAGE_DOWNCONVERSION_ENABLE_CONFIG}.
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
"hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
}
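Note: the config key is kept so existing topic configs keep parsing, but it is deprecated and documented as a no-op. A sketch of what that means for a client, assuming a 4.0 broker on `localhost:9092` and a hypothetical topic name:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class DeprecatedConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Still accepted for compatibility, but a no-op on 4.0 brokers and
            // slated for removal in 5.0.
            NewTopic topic = new NewTopic("demo-topic", 1, (short) 1)
                    .configs(Map.of(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```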
@@ -194,7 +194,7 @@ public enum ApiKeys {
private static boolean shouldRetainsBufferReference(Schema[] requestSchemas) {
boolean requestRetainsBufferReference = false;
for (Schema requestVersionSchema : requestSchemas) {
if (retainsBufferReference(requestVersionSchema)) {
if (requestVersionSchema != null && retainsBufferReference(requestVersionSchema)) {
requestRetainsBufferReference = true;
break;
}
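Note: this null check is needed because KIP-896 removes versions 0-2 of many request schemas; the generated `Schema[]` arrays stay indexed by protocol version, so dropped versions leave null slots. The fix restated as a standalone helper, with the predicate standing in for the private `retainsBufferReference` check:

```java
import java.util.function.Predicate;

import org.apache.kafka.common.protocol.types.Schema;

public class NullSchemaSlotSketch {
    // Schema arrays are indexed by protocol version, so versions removed by KIP-896
    // leave null holes rather than shifting the array; per-version scans must skip them.
    static boolean anyVersionMatches(Schema[] schemasByVersion, Predicate<Schema> check) {
        for (Schema schema : schemasByVersion) {
            if (schema != null && check.test(schema))
                return true;
        }
        return false;
    }
}
```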
@@ -224,29 +224,6 @@ public DescribeConfigsResponse(DescribeConfigsResponseData data) {
this.data = data;
}

// This constructor should only be used after deserialization, it has special handling for version 0
private DescribeConfigsResponse(DescribeConfigsResponseData data, short version) {
super(ApiKeys.DESCRIBE_CONFIGS);
this.data = data;
if (version == 0) {
for (DescribeConfigsResponseData.DescribeConfigsResult result : data.results()) {
for (DescribeConfigsResponseData.DescribeConfigsResourceResult config : result.configs()) {
if (config.isDefault()) {
config.setConfigSource(ConfigSource.DEFAULT_CONFIG.id);
} else {
if (result.resourceType() == ConfigResource.Type.BROKER.id()) {
config.setConfigSource(ConfigSource.STATIC_BROKER_CONFIG.id);
} else if (result.resourceType() == ConfigResource.Type.TOPIC.id()) {
config.setConfigSource(ConfigSource.TOPIC_CONFIG.id);
} else {
config.setConfigSource(ConfigSource.UNKNOWN.id);
}
}
}
}
}
}

@Override
public DescribeConfigsResponseData data() {
return data;
@@ -272,7 +249,7 @@ public Map<Errors, Integer> errorCounts() {
}

public static DescribeConfigsResponse parse(ByteBuffer buffer, short version) {
return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version), version);
return new DescribeConfigsResponse(new DescribeConfigsResponseData(new ByteBufferAccessor(buffer), version));
}

@Override
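Note: DescribeConfigs v0 carried only an `isDefault` flag, so deserialization had to back-fill `configSource` from the resource type; with v0 gone, `parse` constructs the response directly. The deleted back-fill, restated as a pure function for reference (names follow the deleted code):

```java
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource;

public class V0ConfigSourceSketch {
    // v0 only said "is this the default?", so the source was inferred from the
    // resource type; v1+ carries the source explicitly and needs no back-fill.
    static ConfigSource inferV0ConfigSource(boolean isDefault, byte resourceType) {
        if (isDefault)
            return ConfigSource.DEFAULT_CONFIG;
        if (resourceType == ConfigResource.Type.BROKER.id())
            return ConfigSource.STATIC_BROKER_CONFIG;
        if (resourceType == ConfigResource.Type.TOPIC.id())
            return ConfigSource.TOPIC_CONFIG;
        return ConfigSource.UNKNOWN;
    }
}
```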
@@ -30,7 +30,6 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -137,7 +136,6 @@ private ListOffsetsRequest(ListOffsetsRequestData data, short version) {

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version();
short errorCode = Errors.forException(e).code();

List<ListOffsetsTopicResponse> responses = new ArrayList<>();
@@ -148,12 +146,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ListOffsetsPartitionResponse partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(errorCode)
.setPartitionIndex(partition.partitionIndex());
if (versionId == 0) {
partitionResponse.setOldStyleOffsets(Collections.emptyList());
} else {
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
}
partitionResponse.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
partitions.add(partitionResponse);
}
topicResponse.setPartitions(partitions);
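Note: v0 error responses had to populate `oldStyleOffsets` with an empty list, while v1+ uses sentinel offset/timestamp values; removing v0 leaves a single shape. That shape, extracted as a helper for reference (setters and constants as shown in the diff above):

```java
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.requests.ListOffsetsResponse;

public class ListOffsetsErrorSketch {
    // Every partition-level error now carries the v1+ sentinel offset/timestamp
    // instead of v0's empty oldStyleOffsets list.
    static ListOffsetsPartitionResponse errorPartition(int partitionIndex, short errorCode) {
        return new ListOffsetsPartitionResponse()
                .setPartitionIndex(partitionIndex)
                .setErrorCode(errorCode)
                .setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
                .setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP);
    }
}
```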