Skip to content

Commit

Permalink
KAFKA-9105; Add back truncateHead method to ProducerStateManager (#7599)
Browse files Browse the repository at this point in the history
The truncateHead method was removed from ProducerStateManager by github.com/apache/kafka/commit/c49775b. This meant that snapshots were no longer removed when the log start offset increased, even though the intent of that change was to remove snapshots but preserve the in-memory mapping. This patch adds the required functionality back.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
a0x8o committed Oct 25, 2019
1 parent 0fe7c60 commit a509685
Show file tree
Hide file tree
Showing 133 changed files with 3,727 additions and 1,632 deletions.
6 changes: 3 additions & 3 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

<!-- Generator -->
<suppress checks="CyclomaticComplexity|BooleanExpressionComplexity"
files="(SchemaGenerator|MessageDataGenerator).java"/>
files="(SchemaGenerator|MessageDataGenerator|FieldSpec).java"/>
<suppress checks="NPathComplexity"
files="(FieldSpec).java"/>
<suppress checks="JavaNCSS"
files="(ApiMessageType).java"/>
files="(ApiMessageType).java|MessageDataGenerator.java"/>

<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
Expand All @@ -34,7 +34,7 @@
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>

<suppress checks="ParameterNumber"
files="NetworkClient.java"/>
files="NetworkClient.java|FieldSpec.java"/>
<suppress checks="ParameterNumber"
files="KafkaConsumer.java"/>
<suppress checks="ParameterNumber"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPa
* @param options The options to carry removing members' information.
* @return The MembershipChangeResult.
*/
MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options);

/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.Set;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.KafkaFuture.BaseFunction;
import org.apache.kafka.common.KafkaFuture.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

Expand All @@ -35,50 +33,65 @@
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
private final KafkaFuture<Map<TopicPartition, Errors>> future;
private final Set<TopicPartition> partitions;

DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {

DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
this.future = future;
this.partitions = partitions;
}

/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
if (!partitions.contains(partition)) {
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
}
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
@Override
public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!topicPartitions.containsKey(partition)) {
result.completeExceptionally(new IllegalArgumentException(
"Group offset deletion for partition \"" + partition +
"\" was not attempted"));
} else {
final Errors error = topicPartitions.get(partition);
if (error == Errors.NONE) {
result.complete(null);
} else {
result.completeExceptionally(error.exception());
}
}

this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
result.complete(null);
}
});

return result;
}

/**
* Return a future which succeeds only if all the deletions succeed.
* If not, the first partition error shall be returned.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
@Override
public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
return null;
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
for (TopicPartition partition : partitions) {
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
return;
}
}
result.complete(null);
}
});
return result;
}

private boolean maybeCompleteExceptionally(Map<TopicPartition, Errors> partitionLevelErrors,
TopicPartition partition,
KafkaFutureImpl<Void> result) {
Throwable exception = KafkaAdminClient.getSubLevelError(partitionLevelErrors, partition,
"Offset deletion result for partition \"" + partition + "\" was not included in the response");
if (exception != null) {
result.completeExceptionally(exception);
return true;
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
Expand Down Expand Up @@ -158,6 +160,7 @@
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
Expand Down Expand Up @@ -2653,7 +2656,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId));
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDescribeConsumerGroupsCall(context));
() -> getDescribeConsumerGroupsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
}

Expand Down Expand Up @@ -2980,7 +2983,7 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou
new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture);

Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getListConsumerGroupOffsetsCall(context));
() -> getListConsumerGroupOffsetsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);

return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
Expand Down Expand Up @@ -3052,7 +3055,7 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI
ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDeleteConsumerGroupsCall(context));
() -> getDeleteConsumerGroupsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
}

Expand Down Expand Up @@ -3104,7 +3107,7 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
if (groupIdIsUnrepresentable(groupId)) {
future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
groupId + "' cannot be represented in a request."));
return new DeleteConsumerGroupOffsetsResult(future);
return new DeleteConsumerGroupOffsetsResult(future, partitions);
}

final long startFindCoordinatorMs = time.milliseconds();
Expand All @@ -3113,10 +3116,10 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);

Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDeleteConsumerGroupOffsetsCall(context, partitions));
() -> getDeleteConsumerGroupOffsetsCall(context, partitions));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);

return new DeleteConsumerGroupOffsetsResult(future);
return new DeleteConsumerGroupOffsetsResult(future, partitions);
}

private Call getDeleteConsumerGroupOffsetsCall(
Expand Down Expand Up @@ -3162,13 +3165,10 @@ void handleResponse(AbstractResponse abstractResponse) {
return;

final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data.topics().forEach(topic -> {
topic.partitions().forEach(partition -> {
partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode()));
});
});
response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())))
);

context.future().complete(partitions);
}
Expand Down Expand Up @@ -3277,7 +3277,7 @@ public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) {

ReassignablePartition reassignablePartition = new ReassignablePartition()
.setPartitionIndex(partitionIndex)
.setReplicas(reassignment.map(NewPartitionReassignment::targetBrokers).orElse(null));
.setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null));
reassignablePartitions.add(reassignablePartition);
}

Expand Down Expand Up @@ -3468,33 +3468,32 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
}

@Override
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId,
RemoveMemberFromConsumerGroupOptions options) {
public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options) {
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());

KafkaFutureImpl<RemoveMemberFromGroupResult> future = new KafkaFutureImpl<>();
KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();

ConsumerGroupOperationContext<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context =
ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);

Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getRemoveMembersFromGroupCall(context));
() -> getRemoveMembersFromGroupCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);

return new MembershipChangeResult(future);
return new RemoveMembersFromConsumerGroupResult(future, options.members());
}


private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) {
return new Call("leaveGroup",
context.deadline(),
new ConstantNodeIdProvider(context.node().get().id())) {
@Override
LeaveGroupRequest.Builder createRequest(int timeoutMs) {
return new LeaveGroupRequest.Builder(context.groupId(),
context.options().getMembers());
context.options().members().stream().map(
MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
}

@Override
Expand All @@ -3507,16 +3506,19 @@ void handleResponse(AbstractResponse abstractResponse) {
return;
}

// If error is transient coordinator error, retry
Errors error = response.error();
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
}

final RemoveMemberFromGroupResult membershipChangeResult =
new RemoveMemberFromGroupResult(response, context.options().getMembers());
if (handleGroupRequestError(response.topLevelError(), context.future()))
return;

context.future().complete(membershipChangeResult);
final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
for (MemberResponse memberResponse : response.memberResponses()) {
// We set member.id to empty here explicitly, so that the lookup will succeed as user doesn't
// know the exact member.id.
memberErrors.put(new MemberIdentity()
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGroupInstanceId(memberResponse.groupInstanceId()),
Errors.forCode(memberResponse.errorCode()));
}
context.future().complete(memberErrors);
}

@Override
Expand Down Expand Up @@ -3736,4 +3738,15 @@ void handleFailure(Throwable throwable) {
return calls;
}

/**
* Get a sub level error when the request is in batch. If given key was not found,
* return an {@link IllegalArgumentException}.
*/
static <K> Throwable getSubLevelError(Map<K, Errors> subLevelErrors, K subKey, String keyNotFoundMsg) {
if (!subLevelErrors.containsKey(subKey)) {
return new IllegalArgumentException(keyNotFoundMsg);
} else {
return subLevelErrors.get(subKey).exception();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class ListPartitionReassignmentsResult {
private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> future;

public ListPartitionReassignmentsResult(KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments) {
ListPartitionReassignmentsResult(KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments) {
this.future = reassignments;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.requests.JoinGroupRequest;

import java.util.Objects;

/**
* A struct containing information about the member to be removed.
*/
public class MemberToRemove {
private final String groupInstanceId;

public MemberToRemove(String groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}

@Override
public boolean equals(Object o) {
if (o instanceof MemberToRemove) {
MemberToRemove otherMember = (MemberToRemove) o;
return this.groupInstanceId.equals(otherMember.groupInstanceId);
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(groupInstanceId);
}

MemberIdentity toMemberIdentity() {
return new MemberIdentity()
.setGroupInstanceId(groupInstanceId)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}

public String groupInstanceId() {
return groupInstanceId;
}
}
Loading

0 comments on commit a509685

Please sign in to comment.