Service bus topic stream binder e2e tests guide

Warren Zhu edited this page Nov 27, 2018 · 1 revision

Produce in default async mode and consume in MANUAL checkpoint mode

  1. Use the Service Bus topic binder sample as a starting point.
  2. Change SourceExample as below to send several messages at once:
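    // Requires java.util.HashMap, java.util.Map, java.util.stream.IntStream
    // and org.springframework.messaging.support.GenericMessage.
    // Sends six messages per request, each tagged with an "index" header
    // so the send order can be traced in the logs.
    @PostMapping("/messages")
    public String postMessage(@RequestParam String message) {
        IntStream.range(0, 6).forEach(i -> {
            Map<String, Object> headers = new HashMap<>();
            headers.put("index", i);
            this.source.output().send(new GenericMessage<>(message + i, headers));
        });
        return message;
    }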
  3. Enable debug-level logging in application.properties: logging.level.com.microsoft.azure.spring=DEBUG
  4. Run the sample. The logs should include:
ServiceBusTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=MANUAL, checkpointCount=0)
DefaultMessageHandler sync becomes: false
  5. Post a message to produce.
  6. You should see logs like below:

The index header and the received message may appear out of order, since this is async mode.

GenericMessage [payload=byte[6], headers={index=4, id=96589f7d-45d8-3083-e18b-7f118145db3d, contentType=application/json, timestamp=1542943635113}] sent successfully in async mode
New message received: 'hello3'
Message 'hello3' successfully checkpointed
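
The out-of-order logs above can be modelled with a small plain-Java sketch. It is not the binder's code: the class AsyncSendOrderSketch and the method loggedOrder are hypothetical names, and the sketch assumes only that in async mode the "sent successfully" line is written from a completion callback, so lines appear in acknowledgement order rather than send order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model, not part of the binder: each "send" is a pending future
// and its log entry is recorded only when that future completes.
final class AsyncSendOrderSketch {

    // Returns the indexes in the order their completion callbacks fire.
    // ackOrder lists which pending send is acknowledged first, second, and so on.
    static List<Integer> loggedOrder(int[] ackOrder) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        List<Integer> logged = new ArrayList<>();
        for (int i = 0; i < ackOrder.length; i++) {
            final int index = i;
            CompletableFuture<Void> send = new CompletableFuture<>();
            // Runs when the future completes, like an async-mode log callback.
            send.thenRun(() -> logged.add(index));
            pending.add(send);
        }
        // Complete the sends in acknowledgement order, not in send order.
        for (int acked : ackOrder) {
            pending.get(acked).complete(null);
        }
        return logged;
    }

    public static void main(String[] args) {
        // Sends were started as 0..5 but acknowledged in a different order.
        System.out.println(loggedOrder(new int[] {4, 3, 0, 5, 1, 2}));
        // prints [4, 3, 0, 5, 1, 2]
    }
}
```

The logged order follows the acknowledgement order exactly, which is why index=4 can be reported before index=0 even though index 0 was sent first.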

Consume in RECORD checkpoint mode

  1. Change checkpoint-mode to RECORD and remove the checkpointer parameter.
  2. Run the sample. The logs should include:
ServiceBusTemplate checkpoint config becomes: CheckpointConfig(checkpointMode=RECORD, checkpointCount=0)
  3. Post a message to produce.
  4. You should see each message checkpointed once, as below. raw_id should equal the id of the produced message:
Consumer group 'cg1' of topic 'topic1' checkpointed GenericMessage [payload=byte[6], headers={id=373acb51-ff5c-5c1b-0b42-7503072ef1b4, raw_id=250a2005-65ea-67ae-3dd9-ba6c6a8e00c4, contentType=application/json, timestamp=1543284909638}] in RECORD mode
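
Comparing ids by eye is error-prone, so the raw_id check can be scripted. The following is a minimal sketch that assumes the log lines keep the "headers={name=value, ...}" shape shown above. RawIdCheckSketch, header and rawIdMatches are hypothetical helper names, not part of the binder or the sample.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading header values out of the debug log lines.
final class RawIdCheckSketch {

    // Extracts a header value from a "headers={name=value, ...}" log line.
    // The leading "{" or space keeps "id" from matching inside "raw_id".
    static Optional<String> header(String logLine, String name) {
        Pattern p = Pattern.compile("[{ ]" + Pattern.quote(name) + "=([^,}]+)");
        Matcher m = p.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // True when the consumer's raw_id equals the id logged by the producer.
    static boolean rawIdMatches(String producerLine, String consumerLine) {
        Optional<String> producedId = header(producerLine, "id");
        Optional<String> rawId = header(consumerLine, "raw_id");
        return producedId.isPresent() && producedId.equals(rawId);
    }
}
```

Pass the producer's "sent successfully" line and the consumer's "checkpointed" line for the same message, taken from the same run. A false result means either the ids differ or one of the headers was not found.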

Produce in sync mode

  1. Change spring.cloud.stream.eventhub.bindings.output.producer.sync to true.
  2. Run the sample. The logs should include:
DefaultMessageHandler sync becomes: true
Started DefaultMessageHandler with properties: {sendTimeout=ValueExpression [value=10000], destination=topic1, sync=true}
  3. Post a message to produce.
  4. You should see messages sent one by one with increasing index headers:
GenericMessage [payload=byte[6], headers={index=0, id=96589f7d-45d8-3083-e18b-7f118145db3d, contentType=application/json, timestamp=1542943635113}] sent successfully in sync mode
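
The ordering expectation for sync mode can also be checked mechanically. This sketch assumes the producer log lines contain "sent successfully" and an "index=N" header, as in the examples in this guide. IndexOrderSketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that checks the order of "index" headers in producer logs.
final class IndexOrderSketch {

    private static final Pattern INDEX = Pattern.compile("[{ ]index=(\\d+)");

    // Collects the index header of every "sent successfully" line, in log order.
    static List<Integer> sentIndexes(List<String> logLines) {
        List<Integer> indexes = new ArrayList<>();
        for (String line : logLines) {
            if (!line.contains("sent successfully")) {
                continue; // skip consumer and startup lines
            }
            Matcher m = INDEX.matcher(line);
            if (m.find()) {
                indexes.add(Integer.parseInt(m.group(1)));
            }
        }
        return indexes;
    }

    // True when each value is greater than the one before it.
    static boolean isStrictlyIncreasing(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) <= values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

With sync set to true, isStrictlyIncreasing(sentIndexes(lines)) is expected to be true for the lines from one POST request. In async mode it may be false.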