Skip to content

Commit

Permalink
ChangeFeedProcessorFix - IllegalArgumentException (#34618)
Browse files Browse the repository at this point in the history
* add delay for error case as well
* skip child leases assignement for split scenario


---------

Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
xinlian12 and annie-mac authored Apr 30, 2023
1 parent 658c6d5 commit 0a831e6
Show file tree
Hide file tree
Showing 26 changed files with 875 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void faultInjectionServerErrorRuleTests_OperationType(OperationType opera
cosmosDiagnostics,
operationType,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
serverGoneRuleId,
true);

Expand Down Expand Up @@ -294,7 +294,7 @@ public void faultInjectionServerErrorRuleTests_OperationTypeImpactAddresses(Oper
cosmosDiagnostics,
operationType,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
writeRegionServerGoneRuleId,
true);
} else {
Expand Down Expand Up @@ -394,7 +394,7 @@ public void faultInjectionServerErrorRuleTests_Region() throws JsonProcessingExc
cosmosDiagnostics,
OperationType.Read,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
localRegionRuleId,
true
);
Expand Down Expand Up @@ -456,7 +456,7 @@ public void faultInjectionServerErrorRuleTests_Partition() throws JsonProcessing
cosmosDiagnostics,
OperationType.Query,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
feedRangeRuleId,
true
);
Expand Down Expand Up @@ -798,7 +798,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit() throws JsonProcessingE
cosmosDiagnostics,
OperationType.Read,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
hitLimitRuleId,
true
);
Expand Down Expand Up @@ -922,7 +922,7 @@ public void faultInjectionServerErrorRuleTests_includePrimary() throws JsonProce
cosmosDiagnostics,
OperationType.Create,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
serverGoneIncludePrimaryRuleId,
true);

Expand All @@ -932,7 +932,7 @@ public void faultInjectionServerErrorRuleTests_includePrimary() throws JsonProce
cosmosDiagnostics,
OperationType.Upsert,
HttpConstants.StatusCodes.GONE,
HttpConstants.SubStatusCodes.UNKNOWN,
HttpConstants.SubStatusCodes.SERVER_GENERATED_410,
serverGoneIncludePrimaryRuleId,
true);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,41 @@
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneMergeHandler;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.routing.Range;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.test.StepVerifier;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
public class FeedRangeOneMergeHandlerTests {
public class FeedRangeGoneMergeHandlerTests {
@DataProvider(name = "maxScaleCountArgProvider")
public static Object[][] maxScaleCountArgProvider() {
return new Object[][]{
// maxScaleCount
{ 0 },
{ 1 }
};
}

@Test(groups = "unit")
public void feedRangeGoneMergeHandler_constructor() {
FeedRangeEpkImpl feedRangeForLeaseWithGoneException = new FeedRangeEpkImpl(
new Range<>("AA", "BB", true, false));

ServiceItemLeaseV1 leaseWithGoneException =
new ServiceItemLeaseV1()
.withLeaseToken("AA-BB")
.withFeedRange(feedRangeForLeaseWithGoneException);
leaseWithGoneException.setId("TestLease-" + UUID.randomUUID());

FeedRangeGoneMergeHandler mergeHandler = new FeedRangeGoneMergeHandler(
leaseWithGoneException,
new PartitionKeyRange("1", "AA", "CC"));

assertThat(mergeHandler.shouldSkipDirectLeaseAssignment()).isFalse();
assertThat(mergeHandler.shouldDeleteCurrentLease()).isFalse();
}

@Test(groups = "unit")
public void mergeHandlerForEpkBasedLease() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,44 @@ public Object[][] secondChildLeaseSuccessArgProvider() {
};
}

@DataProvider(name = "maxScaleCountArgProvider")
public static Object[][] maxScaleCountArgProvider() {
return new Object[][]{
// maxScaleCount
{ 0 },
{ 1 }
};
}

@Test(groups = "unit", dataProvider = "maxScaleCountArgProvider")
public void feedRangeGoneSplitHandler_constructor(int maxScaleCount) {
ServiceItemLeaseV1 leaseWithGoneException =
new ServiceItemLeaseV1()
.withLeaseToken("AA-CC")
.withFeedRange(new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false)));
leaseWithGoneException.setId("TestLease-" + UUID.randomUUID());

List<PartitionKeyRange> childRanges = new ArrayList<>();
// using a min less than AA to check we are using the min of the lease with gone exception
childRanges.add(new PartitionKeyRange("1", "", "BB"));
// using a max larger than CC to check we are using the max of the lease with gone exception
childRanges.add(new PartitionKeyRange("2", "BB", "FF"));

FeedRangeGoneSplitHandler splitHandler = new FeedRangeGoneSplitHandler(
leaseWithGoneException,
childRanges,
Mockito.mock(LeaseManager.class),
maxScaleCount);

if (maxScaleCount > 0) {
assertThat(splitHandler.shouldSkipDirectLeaseAssignment()).isTrue();
} else {
assertThat(splitHandler.shouldSkipDirectLeaseAssignment()).isFalse();
}

assertThat(splitHandler.shouldDeleteCurrentLease()).isTrue();
}

@Test(groups = "unit", dataProvider = "secondChildLeaseSuccessArgProvider")
public void splitHandlerForEpkBasedLease(boolean secondChildLeaseSuccess) {
// Testing an imaginary scenario FeedRange "AA-CC" has been split into "''-BB", "BB-FF"
Expand Down Expand Up @@ -74,8 +112,8 @@ public void splitHandlerForEpkBasedLease(boolean secondChildLeaseSuccess) {
FeedRangeGoneSplitHandler splitHandler = new FeedRangeGoneSplitHandler(
leaseWithGoneException,
childRanges,
leaseManagerMock
);
leaseManagerMock,
0);

if (secondChildLeaseSuccess) {
StepVerifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.routing.Range;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -25,13 +26,24 @@
import java.util.UUID;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class PartitionControllerImplTests {
@Test(groups = "unit")
public void handleSplit() throws InterruptedException {

@DataProvider(name = "shouldSkipDirectLeaseAssignmentArgProvider")
public static Object[][] shouldSkipDirectLeaseAssignmentArgProvider() {
return new Object[][]{
// shouldSkipDirectLeaseAssignment
{ true },
{ false }
};
}

@Test(groups = "unit", dataProvider = "shouldSkipDirectLeaseAssignmentArgProvider")
public void handleSplit(boolean shouldSkipDirectLeaseAssignment) throws InterruptedException {
LeaseContainer leaseContainer = Mockito.mock(LeaseContainer.class);
when(leaseContainer.getOwnedLeases()).thenReturn(Flux.empty());

Expand Down Expand Up @@ -76,7 +88,8 @@ public void handleSplit() throws InterruptedException {
synchronizer,
lease,
Arrays.asList(childLease1, childLease2),
true);
true,
shouldSkipDirectLeaseAssignment);

StepVerifier.create(partitionController.initialize()).verifyComplete();
StepVerifier.create(partitionController.addOrUpdateLease(lease))
Expand All @@ -87,28 +100,53 @@ public void handleSplit() throws InterruptedException {
// add some waiting time here so that we can capture all the calls
Thread.sleep(500);

// Verify total three leases are acquired
verify(leaseManager, times(1)).acquire(lease);
verify(leaseManager, times(1)).acquire(childLease1);
verify(leaseManager, times(1)).acquire(childLease2);

// Verify partitionSupervisor is created for each lease
verify(partitionSupervisorFactory, times(1)).create(lease);
verify(partitionSupervisorFactory, times(1)).create(childLease1);
verify(partitionSupervisorFactory, times(1)).create(childLease2);

// Verify only the lease with feedRangeGone exception will be deleted from lease container
verify(leaseManager, times(1)).delete(lease);
verify(leaseManager, Mockito.never()).delete(childLease1);
verify(leaseManager, Mockito.never()).delete(childLease2);

// Verify at the end, all the leases will be released
verify(leaseManager, times(1)).release(lease);
verify(leaseManager, times(1)).release(childLease1);
verify(leaseManager, times(1)).release(childLease2);

verify(leaseManager, Mockito.never()).updateProperties(Mockito.any());
verify(feedRangeGoneHandler, times(1)).handlePartitionGone();
if (shouldSkipDirectLeaseAssignment) {
// Verify only parent lease is acquired
verify(leaseManager, times(1)).acquire(lease);
verify(leaseManager, never()).acquire(childLease1);
verify(leaseManager, never()).acquire(childLease2);

// Verify partitionSupervisor is created for parent lease
verify(partitionSupervisorFactory, times(1)).create(lease);
verify(partitionSupervisorFactory, never()).create(childLease1);
verify(partitionSupervisorFactory, never()).create(childLease2);

// Verify only the lease with feedRangeGone exception will be deleted from lease container
verify(leaseManager, times(1)).delete(lease);
verify(leaseManager, Mockito.never()).delete(childLease1);
verify(leaseManager, Mockito.never()).delete(childLease2);

// Verify at the end, only parent lease will be released
verify(leaseManager, times(1)).release(lease);
verify(leaseManager, never()).release(childLease1);
verify(leaseManager, never()).release(childLease2);

verify(leaseManager, Mockito.never()).updateProperties(Mockito.any());
verify(feedRangeGoneHandler, times(1)).handlePartitionGone();
} else {
// Verify total three leases are acquired
verify(leaseManager, times(1)).acquire(lease);
verify(leaseManager, times(1)).acquire(childLease1);
verify(leaseManager, times(1)).acquire(childLease2);

// Verify partitionSupervisor is created for each lease
verify(partitionSupervisorFactory, times(1)).create(lease);
verify(partitionSupervisorFactory, times(1)).create(childLease1);
verify(partitionSupervisorFactory, times(1)).create(childLease2);

// Verify only the lease with feedRangeGone exception will be deleted from lease container
verify(leaseManager, times(1)).delete(lease);
verify(leaseManager, Mockito.never()).delete(childLease1);
verify(leaseManager, Mockito.never()).delete(childLease2);

// Verify at the end, all the leases will be released
verify(leaseManager, times(1)).release(lease);
verify(leaseManager, times(1)).release(childLease1);
verify(leaseManager, times(1)).release(childLease2);

verify(leaseManager, Mockito.never()).updateProperties(Mockito.any());
verify(feedRangeGoneHandler, times(1)).handlePartitionGone();
}
}


Expand Down Expand Up @@ -151,6 +189,7 @@ public void handleMerge() throws InterruptedException {
synchronizer,
lease,
Arrays.asList(lease), // For merge with epkBased lease, we are going to reuse the lease
false,
false);

StepVerifier.create(partitionController.initialize()).verifyComplete();
Expand Down Expand Up @@ -256,10 +295,12 @@ private FeedRangeGoneHandler setDefaultFeedRangeGoneHandlerBehavior(
PartitionSynchronizer partitionSynchronizer,
ServiceItemLeaseV1 leaseWithException,
List<ServiceItemLeaseV1> newLeases,
boolean shouldDeleteLeaseWithException){
boolean shouldDeleteLeaseWithException,
boolean shouldSkipDirectLeaseAssignment){
FeedRangeGoneHandler feedRangeGoneHandler = Mockito.mock(FeedRangeGoneHandler.class);
when(feedRangeGoneHandler.handlePartitionGone()).thenReturn(Flux.fromIterable(newLeases));
when(feedRangeGoneHandler.shouldDeleteCurrentLease()).thenReturn(shouldDeleteLeaseWithException);
when(feedRangeGoneHandler.shouldSkipDirectLeaseAssignment()).thenReturn(shouldSkipDirectLeaseAssignment);

when(partitionSynchronizer.getFeedRangeGoneHandler(leaseWithException)).thenReturn(Mono.just(feedRangeGoneHandler));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.PartitionController;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class PartitionLoadBalancerImplTests {
private final int PARTITION_LOAD_BALANCER_TIMEOUT = 5000;

@DataProvider(name = "loadBalancingSucceededArgProvider")
public static Object[][] loadBalancingSucceededArgProvider() {
return new Object[][]{
// load balancing call succeeded
{ false },
{ true }
};
}

@Test(groups = "unit", dataProvider = "loadBalancingSucceededArgProvider")
public void run(boolean loadBalancingSucceeded) throws InterruptedException {
PartitionController partitionControllerMock = Mockito.mock(PartitionController.class);
LeaseContainer leaseContainerMock = Mockito.mock(LeaseContainer.class);
PartitionLoadBalancingStrategy partitionLoadBalancingStrategyMock = Mockito.mock(PartitionLoadBalancingStrategy.class);

ServiceItemLeaseV1 lease = new ServiceItemLeaseV1().withLeaseToken("1");
lease.setId("TestLease-" + UUID.randomUUID());

List<Lease> allLeases = Arrays.asList(lease);
Mockito.when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(allLeases));

if (loadBalancingSucceeded) {
Mockito
.when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases))
.thenReturn(allLeases)
.thenReturn(Arrays.asList());
} else {
Mockito
.when(partitionLoadBalancingStrategyMock.selectLeasesToTake(allLeases))
.thenThrow(new IllegalArgumentException("Something is wrong"));
}

Mockito.when(partitionControllerMock.shutdown()).thenReturn(Mono.empty());

PartitionLoadBalancerImpl partitionLoadBalancerImpl =
new PartitionLoadBalancerImpl(
partitionControllerMock,
leaseContainerMock,
partitionLoadBalancingStrategyMock,
Duration.ofSeconds(2),
Schedulers.boundedElastic()
);

partitionLoadBalancerImpl
.start()
.timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Thread.sleep(Duration.ofSeconds(5).toMillis());
Mockito.verify(leaseContainerMock, Mockito.atMost(3)).getAllLeases();
partitionLoadBalancerImpl
.stop()
.timeout(Duration.ofMillis(PARTITION_LOAD_BALANCER_TIMEOUT))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
Loading

0 comments on commit 0a831e6

Please sign in to comment.