Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization: 9~15% improvement in KinesisDataFetcher wall-time after #1034

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@
@KinesisClientInternalApi
public class KinesisShardDetector implements ShardDetector {

/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER;

static {
AWS_EXCEPTION_MANAGER = new AWSExceptionManager();
AWS_EXCEPTION_MANAGER.add(KinesisException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(LimitExceededException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceInUseException.class, t -> t);
AWS_EXCEPTION_MANAGER.add(ResourceNotFoundException.class, t -> t);
}

@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
Expand All @@ -78,7 +94,7 @@ public class KinesisShardDetector implements ShardDetector {
private volatile Map<String, Shard> cachedShardMap = null;
private volatile Instant lastCacheUpdateTime;
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
private final AtomicInteger cacheMisses = new AtomicInteger(0);

@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
Expand Down Expand Up @@ -186,12 +202,6 @@ public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
}

private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);

ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
Expand All @@ -211,7 +221,7 @@ private ListShardsResponse listShards(ShardFilter shardFilter, final String next
try {
result = getListShardsResponse(request);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: check if this is the correct behavior for Interrupted Exception
log.debug("Interrupted exception caught, shutdown initiated, returning null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import software.amazon.kinesis.annotations.KinesisClientInternalApi;

/**
*
* Traverses a {@code Throwable} class inheritance in search of a mapping
* function which will convert that throwable into a {@code RuntimeException}.
* If no mapping function is found, the default function will be applied.
*/
@KinesisClientInternalApi
public class AWSExceptionManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
Expand All @@ -48,7 +46,6 @@
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.DataFetcherResult;
import software.amazon.kinesis.retrieval.DataRetrievalUtil;
import software.amazon.kinesis.retrieval.IteratorBuilder;
import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
Expand All @@ -66,6 +63,14 @@ public class KinesisDataFetcher implements DataFetcher {
private static final String METRICS_PREFIX = "KinesisDataFetcher";
private static final String OPERATION = "ProcessTask";

/**
* Reusable {@link AWSExceptionManager}.
* <p>
* N.B. This instance is mutable, but thread-safe for <b>read-only</b> use.
* </p>
*/
private static final AWSExceptionManager AWS_EXCEPTION_MANAGER = createExceptionManager();

@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull @Getter
Expand All @@ -91,8 +96,6 @@ public KinesisDataFetcher(KinesisAsyncClient kinesisClient, String streamName, S

/**
* Note: This method has package level access for testing purposes.
*
* @return nextIterator
*/
@Getter(AccessLevel.PACKAGE)
private String nextIterator;
Expand Down Expand Up @@ -233,8 +236,6 @@ private void advanceIteratorTo(final String sequenceNumber,
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
}

final AWSExceptionManager exceptionManager = createExceptionManager();

GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request;
Expand All @@ -256,7 +257,7 @@ private void advanceIteratorTo(final String sequenceNumber,
nextIterator = getNextIterator(request);
success = true;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
throw new RuntimeException(e);
Expand Down Expand Up @@ -328,7 +329,6 @@ public String getNextIterator(GetShardIteratorRequest request) throws ExecutionE

@Override
public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
final AWSExceptionManager exceptionManager = createExceptionManager();
GetRecordsRequest request = getGetRecordsRequest(nextIterator);

final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
Expand All @@ -341,7 +341,7 @@ public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
success = true;
return response;
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
throw AWS_EXCEPTION_MANAGER.apply(e.getCause());
} catch (InterruptedException e) {
// TODO: Check behavior
log.debug("{} : Interrupt called on method, shutdown initiated", streamAndShardId);
Expand All @@ -355,7 +355,7 @@ public GetRecordsResponse getRecords(@NonNull final String nextIterator) {
}
}

private AWSExceptionManager createExceptionManager() {
private static AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(KinesisException.class, t -> t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -154,14 +153,13 @@ public final void testInitializeInvalid() throws Exception {
testInitializeAndFetch("foo", null, INITIAL_POSITION_LATEST);
}

private CompletableFuture<GetShardIteratorResponse> makeGetShardIteratorResonse(String shardIterator)
throws InterruptedException, ExecutionException {
private CompletableFuture<GetShardIteratorResponse> makeGetShardIteratorResponse(String shardIterator) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed typo: s/Resonse/Response/

return CompletableFuture
.completedFuture(GetShardIteratorResponse.builder().shardIterator(shardIterator).build());
}

@Test
public void testadvanceIteratorTo() throws KinesisClientLibException, InterruptedException, ExecutionException {
public void testAdvanceIteratorTo() throws KinesisClientLibException {
final Checkpointer checkpoint = mock(Checkpointer.class);
final String iteratorA = "foo";
final String iteratorB = "bar";
Expand All @@ -172,8 +170,9 @@ public void testadvanceIteratorTo() throws KinesisClientLibException, Interrupte
.forClass(GetShardIteratorRequest.class);

when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorA)).thenReturn(makeGetShardIteratorResonse(iteratorA))
.thenReturn(makeGetShardIteratorResonse(iteratorB));
.thenReturn(makeGetShardIteratorResponse(iteratorA))
.thenReturn(makeGetShardIteratorResponse(iteratorA))
.thenReturn(makeGetShardIteratorResponse(iteratorB));
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqA));

kinesisDataFetcher.initialize(seqA, null);
Expand Down Expand Up @@ -203,7 +202,7 @@ private GetShardIteratorRequest makeGetShardIteratorRequest(String shardIterator
}

@Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws InterruptedException, ExecutionException {
public void testAdvanceIteratorToTrimHorizonLatestAndAtTimestamp(){
final ArgumentCaptor<GetShardIteratorRequest> requestCaptor = ArgumentCaptor
.forClass(GetShardIteratorRequest.class);
final String iteratorHorizon = "TRIM_HORIZON";
Expand All @@ -218,9 +217,9 @@ public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() throws Interr
tsReq.toBuilder().timestamp(INITIAL_POSITION_AT_TIMESTAMP.getTimestamp().toInstant()).build());

when(kinesisClient.getShardIterator(requestCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iteratorHorizon))
.thenReturn(makeGetShardIteratorResonse(iteratorLatest))
.thenReturn(makeGetShardIteratorResonse(iteratorAtTimestamp));
.thenReturn(makeGetShardIteratorResponse(iteratorHorizon))
.thenReturn(makeGetShardIteratorResponse(iteratorLatest))
.thenReturn(makeGetShardIteratorResponse(iteratorAtTimestamp));

kinesisDataFetcher.advanceIteratorTo(ShardIteratorType.TRIM_HORIZON.toString(), INITIAL_POSITION_TRIM_HORIZON);
assertEquals(iteratorHorizon, kinesisDataFetcher.getNextIterator());
Expand Down Expand Up @@ -261,7 +260,7 @@ public void testGetRecordsWithResourceNotFoundException() throws Exception {

// Set up proxy mock methods
when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
.thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
Expand Down Expand Up @@ -302,7 +301,6 @@ public void testGetRecordsThrowsSdkException() throws Exception {

// Call records of dataFetcher which will throw an exception
getRecordsRetrievalStrategy.getRecords(MAX_RECORDS);

}

@Test
Expand All @@ -318,7 +316,7 @@ public void testNonNullGetRecords() throws Exception {
final CompletableFuture<GetRecordsResponse> future = mock(CompletableFuture.class);

when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(nextIterator));
.thenReturn(makeGetShardIteratorResponse(nextIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(future);
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(
new ExecutionException(ResourceNotFoundException.builder().message("Test Exception").build()));
Expand All @@ -331,8 +329,7 @@ public void testNonNullGetRecords() throws Exception {
assertEquals(expectedRecordsRequest.shardIterator(), recordsCaptor.getValue().shardIterator());
}

private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records)
throws InterruptedException, ExecutionException {
private CompletableFuture<GetRecordsResponse> makeGetRecordsResponse(String nextIterator, List<Record> records) {
List<ChildShard> childShards = new ArrayList<>();
if(nextIterator == null) {
childShards = createChildShards();
Expand Down Expand Up @@ -368,7 +365,6 @@ public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException
final String initialIterator = "InitialIterator";
final String nextIterator1 = "NextIteratorOne";
final String nextIterator2 = "NextIteratorTwo";
final String nextIterator3 = "NextIteratorThree";
final CompletableFuture<GetRecordsResponse> nonAdvancingResult1 = makeGetRecordsResponse(initialIterator, null);
final CompletableFuture<GetRecordsResponse> nonAdvancingResult2 = makeGetRecordsResponse(nextIterator1, null);
final CompletableFuture<GetRecordsResponse> finalNonAdvancingResult = makeGetRecordsResponse(nextIterator2,
Expand All @@ -378,7 +374,7 @@ public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException
final CompletableFuture<GetRecordsResponse> finalAdvancingResult = makeGetRecordsResponse(null, null);

when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(initialIterator));
.thenReturn(makeGetShardIteratorResponse(initialIterator));
when(kinesisClient.getRecords(recordsCaptor.capture())).thenReturn(nonAdvancingResult1, advancingResult1,
nonAdvancingResult2, advancingResult2, finalNonAdvancingResult, finalAdvancingResult);

Expand All @@ -397,8 +393,6 @@ public void testFetcherDoesNotAdvanceWithoutAccept() throws InterruptedException
assertAdvanced(finalAdvancingResult.get(), nextIterator2, null);
verify(kinesisClient, times(6)).getRecords(any(GetRecordsRequest.class));



reset(kinesisClient);

DataFetcherResult terminal = kinesisDataFetcher.getRecords();
Expand Down Expand Up @@ -444,7 +438,7 @@ public void testRestartIteratorUsesAfterSequenceNumberIteratorType() throws Exce
ArgumentCaptor.forClass(GetShardIteratorRequest.class);

when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
thenReturn(makeGetShardIteratorResonse(iterator));
thenReturn(makeGetShardIteratorResponse(iterator));

kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
Expand Down Expand Up @@ -547,10 +541,9 @@ private void testInitializeAndFetch(final String iteratorType, final String seqN
} else if (iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString())) {
expectedIteratorRequest = expectedIteratorRequest.toBuilder().startingSequenceNumber(seqNo).build();
}
final GetRecordsRequest expectedRecordsRequest = makeGetRecordsRequest(iterator);

when(kinesisClient.getShardIterator(iteratorCaptor.capture()))
.thenReturn(makeGetShardIteratorResonse(iterator));
.thenReturn(makeGetShardIteratorResponse(iterator));

when(kinesisClient.getRecords(recordsCaptor.capture()))
.thenReturn(makeGetRecordsResponse(null, expectedRecords));
Expand Down