Event hub stream binder e2e tests guide

Warren Zhu edited this page Dec 7, 2018 · 5 revisions

Produce in default async mode and consume in MANUAL checkpoint mode with latest startPosition

  1. Use the event hub binder sample as the starting point.
  2. Change SourceExample as below to send several messages at once:
    @PostMapping("/messages")
    public String postMessage(@RequestParam String message) {
        IntStream.range(0, 6).forEach(i -> {
            Map<String, Object> headers = new HashMap<>();
            headers.put("index", i);
            this.source.output().send(new GenericMessage<>(message + i, headers));
        });
        return message;
    }
  1. Enable debug-level logging in application.properties: logging.level.com.microsoft.azure.spring=DEBUG
  2. Run the sample. The startup logs should show:
EventHubTemplate startPosition becomes: LATEST
EventHubTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=MANUAL, checkpointCount=0)
DefaultMessageHandler sync becomes: false
  1. Post a message to the /messages endpoint to produce.
  2. You should see logs like below:

The index header and the received message should be out of order, since this is async mode.

GenericMessage [payload=byte[6], headers={index=4, id=96589f7d-45d8-3083-e18b-7f118145db3d, contentType=application/json, timestamp=1542943635113}] sent successfully in async mode
New message received: 'hello3'
Message 'hello3' successfully checkpointed
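
The out-of-order logs above follow from how asynchronous completion callbacks work. The sketch below is a plain-Java illustration only, not the binder's implementation: the class AsyncSendOrderSketch and its method are hypothetical names, and the completion order passed in stands in for the order in which event hub acknowledges sends. It shows that a callback attached to each pending send fires in completion order, not in send order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncSendOrderSketch {

    /**
     * Creates one pending "send" per index, attaches a callback that records the index,
     * then completes the sends in the given order. Returns the order the callbacks fired in.
     */
    static List<Integer> completionLogOrder(int count, int[] completionOrder) {
        List<CompletableFuture<Void>> sends = new ArrayList<>();
        List<Integer> logged = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int index = i;
            CompletableFuture<Void> send = new CompletableFuture<>();
            // The callback plays the role of the "sent successfully in async mode" log line.
            send.thenRun(() -> logged.add(index));
            sends.add(send);
        }
        // Assumption: acknowledgements arrive in an arbitrary order chosen by the caller.
        for (int i : completionOrder) {
            sends.get(i).complete(null);
        }
        return logged;
    }

    public static void main(String[] args) {
        // Six sends, as in the modified SourceExample, acknowledged out of order.
        System.out.println(completionLogOrder(6, new int[] {4, 0, 3, 1, 5, 2}));
        // prints [4, 0, 3, 1, 5, 2]
    }
}
```

The sends are issued as 0 to 5, yet the recorded order matches the acknowledgement order, which is why index=4 can be logged first.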

Consume in RECORD checkpoint mode

  1. Change checkpoint-mode to RECORD. Remove the checkpointer parameter.
  2. Run the sample.
EventHubTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=RECORD, checkpointCount=0)
  1. Post a message to produce.
  2. You should see each message checkpointed once, like below:
Consumer group 'cg1' checkpointed {body=hello1, offset=3592, sequenceNumber=38, enqueuedTime=2018-11-23T03:36:21.058Z} on partition 0 in RECORD mode
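
Checking "each message checkpointed once" by eye gets tedious with many messages. A minimal sketch of a log check follows, assuming the log lines have exactly the format shown above and have already been collected into a list. RecordCheckpointLogCheck is a hypothetical helper and is not part of the sample or the binder.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RecordCheckpointLogCheck {
    // Captures sequenceNumber (group 1) and partition (group 2) from a RECORD checkpoint log line.
    private static final Pattern CHECKPOINT = Pattern.compile(
            "checkpointed \\{.*sequenceNumber=(\\d+),.*\\} on partition (\\d+) in RECORD mode");

    /** Counts checkpoint log lines per "partition/sequenceNumber" key. */
    static Map<String, Integer> countCheckpoints(List<String> logLines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : logLines) {
            Matcher m = CHECKPOINT.matcher(line);
            if (m.find()) {
                counts.merge(m.group(2) + "/" + m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** True when no message appears in more than one checkpoint log line. */
    static boolean eachCheckpointedOnce(List<String> logLines) {
        return countCheckpoints(logLines).values().stream().allMatch(c -> c == 1);
    }
}
```

The check only covers messages that appear in the log, so an empty log passes trivially. Compare the number of keys with the number of messages posted to catch missing checkpoints.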

Consume in BATCH checkpoint mode

  1. Change checkpoint-mode to BATCH. Remove the checkpointer parameter.
  2. Run the sample.
EventHubTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=BATCH, checkpointCount=0)
  1. Post a message to produce.
  2. You should see each batch checkpointed once, like below. Currently each batch contains only one message; how batching works is still to be confirmed with event hub:
Consumer group 'cg1' checkpointed {body=hello1, offset=3592, sequenceNumber=38, enqueuedTime=2018-11-23T03:36:21.058Z} on partition 0 in BATCH mode

Consume in PARTITION_COUNT checkpoint mode

  1. Change checkpoint-mode to PARTITION_COUNT. Set spring.cloud.stream.eventhub.bindings.input.consumer.checkpoint-count=3
  2. Run the sample.
EventHubTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=PARTITION_COUNT, checkpointCount=3)
  1. Post a message to produce. You might need to post multiple times when the event hub has many partitions.
  2. You should see that two consecutive checkpoints on the same partition differ in sequenceNumber by the checkpoint count:
Consumer group 'cg3' checkpointed {body=hello2, offset=66304, sequenceNumber=1182, enqueuedTime=2018-12-07T06:17:01.102Z} on partition 3 in PARTITION_COUNT mode
Consumer group 'cg3' checkpointed {body=hello0, offset=66592, sequenceNumber=1185, enqueuedTime=2018-12-07T06:17:37.515Z} on partition 3 in PARTITION_COUNT mode
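
In the two lines above, the sequence numbers on partition 3 are 1182 and 1185, a gap of 3, which matches checkpoint-count=3. A minimal sketch of automating that comparison follows, assuming the log format shown above. PartitionCountLogCheck and its methods are hypothetical helpers, not part of the binder.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PartitionCountLogCheck {
    // Captures sequenceNumber (group 1) and partition (group 2) from a PARTITION_COUNT log line.
    private static final Pattern CHECKPOINT = Pattern.compile(
            "sequenceNumber=(\\d+),.*\\} on partition (\\d+) in PARTITION_COUNT mode");

    /** Returns, per partition, the gaps between consecutive checkpointed sequence numbers. */
    static Map<Integer, List<Long>> sequenceGaps(List<String> logLines) {
        Map<Integer, Long> lastSeen = new HashMap<>();
        Map<Integer, List<Long>> gaps = new HashMap<>();
        for (String line : logLines) {
            Matcher m = CHECKPOINT.matcher(line);
            if (!m.find()) {
                continue; // not a checkpoint line
            }
            long seq = Long.parseLong(m.group(1));
            int partition = Integer.parseInt(m.group(2));
            Long previous = lastSeen.put(partition, seq);
            if (previous != null) {
                gaps.computeIfAbsent(partition, p -> new ArrayList<>()).add(seq - previous);
            }
        }
        return gaps;
    }

    /** True when every gap on every partition equals the configured checkpoint count. */
    static boolean gapsMatch(List<String> logLines, long checkpointCount) {
        return sequenceGaps(logLines).values().stream()
                .flatMap(List::stream)
                .allMatch(gap -> gap == checkpointCount);
    }
}
```

Feeding it the two log lines above yields a single gap of 3 for partition 3, so gapsMatch(lines, 3) returns true.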

Produce in sync mode

  1. Change spring.cloud.stream.eventhub.bindings.output.producer.sync to true
  2. Run the sample.
DefaultMessageHandler sync becomes: true
Started DefaultMessageHandler with properties: {sendTimeout=ValueExpression [value=10000], destination=eventhub2, sync=true}
  1. Post a message to produce.
  2. You should see the messages sent one by one, with an increasing index header:
GenericMessage [payload=byte[6], headers={index=0, id=96589f7d-45d8-3083-e18b-7f118145db3d, contentType=application/json, timestamp=1542943635113}] sent successfully in sync mode
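
To confirm the increasing order without reading every line, the index header can be pulled out of the "sent successfully" log lines and checked. The sketch below assumes the log format shown above and that the lines come from a single POST, since each request restarts the index at 0. SendLogIndexCheck is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SendLogIndexCheck {
    // Matches the index header wherever it appears inside headers={...}.
    private static final Pattern INDEX = Pattern.compile("[{ ]index=(\\d+)[,}]");

    /** Extracts the index header from each "sent successfully" log line, in log order. */
    static List<Integer> indexes(List<String> logLines) {
        List<Integer> result = new ArrayList<>();
        for (String line : logLines) {
            if (!line.contains("sent successfully")) {
                continue; // skip unrelated log lines
            }
            Matcher m = INDEX.matcher(line);
            if (m.find()) {
                result.add(Integer.parseInt(m.group(1)));
            }
        }
        return result;
    }

    /** True when each value is larger than the one before it. */
    static boolean strictlyIncreasing(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) <= values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

In sync mode strictlyIncreasing(indexes(lines)) is expected to hold for one request; in async mode it may not.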

Consume in earliest startPosition

  1. Change group to a new consumer group and checkpoint-storage-account to a new account, so that no earlier checkpoint exists.
  2. Change spring.cloud.stream.eventhub.bindings.input.consumer.start-position to earliest
  3. Run the sample.
  4. You should receive all messages sent before.
EventHubTemplate startPosition becomes: EARLIEST