Skip to content

Commit

Permalink
fix batch consuming issue (#25664)
Browse files Browse the repository at this point in the history
  • Loading branch information
jialigit authored Nov 25, 2021
1 parent 7264c48 commit 1f36a79
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.test.eventhubs.stream.binder;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.TestPropertySource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(classes = EventHubBinderConsumingBatchModeIT.TestConfig.class)
@TestPropertySource(properties =
{
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.checkpoint-mode=BATCH",
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.max-batch-size=10",
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.max-wait-time=2s",
"spring.cloud.stream.bindings.consume-in-0.destination=test-eventhub-consuming-batch",
"spring.cloud.stream.bindings.supply-out-0.destination=test-eventhub-consuming-batch",
"spring.cloud.azure.eventhub.checkpoint-container=test-eventhub-consuming-batch",
"spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true"
})
public class EventHubBinderConsumingBatchModeIT {

private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderConsumingBatchModeIT.class);

private static final String MESSAGE = UUID.randomUUID().toString();

private static final CountDownLatch LATCH = new CountDownLatch(1);

@Autowired
private Sinks.Many<Message<String>> many;

@EnableAutoConfiguration
public static class TestConfig {

@Bean
public Sinks.Many<Message<String>> many() {
return Sinks.many().unicast().onBackpressureBuffer();
}

@Bean
public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
return () -> many.asFlux()
.doOnNext(m -> LOGGER.info("Manually sending message {}", m.getPayload()))
.doOnError(t -> LOGGER.error("Error encountered", t));
}

@Bean
public Consumer<Message<List<String>>> consume() {
return message -> {
List<String> payload = message.getPayload();
LOGGER.info("EventHubBinderBatchModeIT: New message received: '{}'", payload);
if (payload.contains(EventHubBinderConsumingBatchModeIT.MESSAGE)) {
LATCH.countDown();
}
};
}
}

@Test
public void testSendAndReceiveMessage() throws InterruptedException {
LOGGER.info("EventHubBinderBatchModeIT begin.");
EventHubBinderConsumingBatchModeIT.LATCH.await(15, TimeUnit.SECONDS);
LOGGER.info("Send a message:" + MESSAGE + ".");
many.emitNext(new GenericMessage<>("\"" + MESSAGE + "\""), Sinks.EmitFailureHandler.FAIL_FAST);
assertThat(EventHubBinderConsumingBatchModeIT.LATCH.await(600, TimeUnit.SECONDS)).isTrue();
LOGGER.info("EventHubBinderBatchModeIT end.");
}
}
39 changes: 38 additions & 1 deletion sdk/spring/azure-spring-cloud-test-eventhubs/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@
"status": "Active"
}
},
{
"type": "Microsoft.EventHub/namespaces/eventhubs",
"apiVersion": "2017-04-01",
"name": "[concat(variables('eventHubsNamespaceName'), '/test-eventhub-consuming-batch')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]"
],
"properties": {
"messageRetentionInDays": 1,
"partitionCount": 1,
"status": "Active"
}
},
{
"type": "Microsoft.EventHub/namespaces/eventhubs",
"apiVersion": "2017-04-01",
Expand Down Expand Up @@ -172,6 +186,17 @@
],
"properties": {}
},
{
"type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups",
"apiVersion": "2017-04-01",
"name": "[concat(variables('eventHubsNamespaceName'), '/test-eventhub-consuming-batch/$Default')]",
"location": "[variables('location')]",
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubsNamespaceName'), 'test-eventhub-consuming-batch')]",
"[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]"
],
"properties": {}
},
{
"type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups",
"apiVersion": "2017-04-01",
Expand Down Expand Up @@ -311,6 +336,18 @@
"publicAccess": "None"
}
},
{
"type": "Microsoft.Storage/storageAccounts/blobServices/containers",
"apiVersion": "2019-06-01",
"name": "[concat(variables('storageAccountName'), '/default/test-eventhub-consuming-batch')]",
"dependsOn": [
"[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storageAccountName'), 'default')]",
"[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]"
],
"properties": {
"publicAccess": "None"
}
},
{
"type": "Microsoft.Storage/storageAccounts/blobServices/containers",
"apiVersion": "2019-06-01",
Expand Down Expand Up @@ -374,4 +411,4 @@
"value": "[listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespaceName'), variables('eventHubsNamespaceKeyName')), '2017-04-01').primaryConnectionString]"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -23,9 +24,9 @@
public class BatchCheckpointManager extends CheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(BatchCheckpointManager.class);
private static final String CHECKPOINT_FAIL_MSG = "Consumer group '%s' failed to checkpoint offset %s of message "
+ "%s on partition %s, last checkpointed message is %s";
+ "on partition %s in batch mode";
private static final String CHECKPOINT_SUCCESS_MSG =
"Consumer group '%s' checkpointed offset %s of message %s on partition %s in %s " + "mode";
"Consumer group '%s' succeed to checkpoint offset %s of message on partition %s in batch mode";

private final ConcurrentHashMap<String, EventData> lastEventByPartition = new ConcurrentHashMap<>();

Expand All @@ -51,15 +52,16 @@ public void completeBatch(EventContext context) {
}

public void onMessages(EventBatchContext context) {
EventData lastEvent = getLastEnqueuedEvent(context);
EventData lastEvent = getLastEventFromBatch(context);
if (lastEvent == null) {
return;
}
Long offset = lastEvent.getOffset();
String partitionId = context.getPartitionContext().getPartitionId();
String consumerGroup = context.getPartitionContext().getConsumerGroup();
context.updateCheckpointAsync()
.doOnError(t -> logCheckpointFail(context, offset, lastEvent,
lastEventByPartition.get(context.getPartitionContext().getPartitionId()), t))
.doOnSuccess(v -> {
this.lastEventByPartition.put(context.getPartitionContext().getPartitionId(), lastEvent);
logCheckpointSuccess(context, offset, lastEvent);
})
.doOnError(t -> logCheckpointFail(consumerGroup, partitionId, offset, t))
.doOnSuccess(v -> logCheckpointSuccess(consumerGroup, partitionId, offset))
.subscribe();
}

Expand All @@ -68,27 +70,25 @@ protected Logger getLogger() {
return LOG;
}

void logCheckpointFail(EventBatchContext context, Long offset, EventData lastEnqueuedEvent,
EventData lastCheckpointedEvent, Throwable t) {
if (getLogger().isWarnEnabled()) {
getLogger().warn(String
.format(CHECKPOINT_FAIL_MSG, context.getPartitionContext().getConsumerGroup(), offset,
lastEnqueuedEvent, lastCheckpointedEvent, context.getPartitionContext().getPartitionId()), t);
}
void logCheckpointFail(String consumerGroup, String partitionId, Long offset, Throwable t) {
getLogger().warn(String
.format(CHECKPOINT_FAIL_MSG, consumerGroup, offset, partitionId), t);
}

void logCheckpointSuccess(EventBatchContext context, Long offset, EventData lastEnqueuedEvent) {
void logCheckpointSuccess(String consumerGroup, String partitionId, Long offset) {
if (getLogger().isDebugEnabled()) {
getLogger().debug(String
.format(CHECKPOINT_SUCCESS_MSG, context.getPartitionContext().getConsumerGroup(), offset,
lastEnqueuedEvent, context.getPartitionContext().getPartitionId(),
this.checkpointConfig.getCheckpointMode()));
.format(CHECKPOINT_SUCCESS_MSG, consumerGroup, offset, partitionId));
}
}

EventData getLastEnqueuedEvent(EventBatchContext context) {
private EventData getLastEventFromBatch(EventBatchContext context) {
List<EventData> events = context.getEvents();
return events.get(events.size() - 1);
if (CollectionUtils.isEmpty(events)) {
return null;
}
EventData lastEvent = events.get(events.size() - 1);
return lastEvent;

}
}

0 comments on commit 1f36a79

Please sign in to comment.