[BUG] Reactor exception thrown from event hub producer client #12500

Closed
davegarred opened this issue Jun 24, 2020 · 6 comments · Fixed by #12822
Labels: customer-reported, Event Hubs, question


@davegarred

Describe the bug
I'm getting an IllegalStateException thrown by a Reactor component when trying to send a message using EventHubProducerClient in the Event Hubs Java SDK.

Exception or Stack Trace

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77)
	at reactor.core.publisher.Mono.block(Mono.java:1496)
	at com.azure.messaging.eventhubs.EventHubProducerClient.send(EventHubProducerClient.java:192)
	... (event transformation within our application)
	at com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEventBatch(EventProcessorClientBuilder.java:441)
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:248)
	at com.azure.messaging.eventhubs.PartitionPumpManager.lambda$startPartitionPump$1(PartitionPumpManager.java:172)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:695)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:571)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:971)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1605)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121)
	at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:336)
	at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:232)
	at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:312)
	at reactor.core.publisher.UnicastProcessor.onComplete(UnicastProcessor.java:414)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.drainLoop(FluxWindowTimeout.java:339)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber$ConsumerIndexHolder.run(FluxWindowTimeout.java:440)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
        ...

To Reproduce

  1. Receive an event from an event hub through a processor built with EventProcessorClientBuilder.
  2. Transform event.
  3. Attempt to publish an EventData using EventHubProducerClient.send within the same thread.

Code Snippet

final byte[] messageBytes = message.getBytes(UTF_8);
final EventData eventData = new EventData(messageBytes);
eventHubProducerClient.send(eventData);

Expected behavior
The message should be published or a valid exception should be thrown.

Screenshots
n/a

Setup (please complete the following information):
com.azure:azure-messaging-eventhubs:5.1.1

Additional context
n/a

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • [ x ] Bug Description Added
  • [ x ] Repro Steps Added
  • [ x ] Setup information Added
@davegarred
Author

The problem appears to originate in EventProcessorClient, which delivers an EventBatchContext on a custom Reactor thread of type reactor.core.scheduler.ReactorThreadFactory$NonBlockingThread.

EventHubProducerClient uses additional Reactor components internally, and the blocking call fails at the following check:

reactor.core.publisher.BlockingSingleSubscriber:76
if (Schedulers.isInNonBlockingThread()) {
  throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}

reactor.core.scheduler.Schedulers:398
public static boolean isInNonBlockingThread() {
  return Thread.currentThread() instanceof NonBlocking;
}
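
To make the mechanism concrete, here is a minimal JDK-only sketch of the same guard. It does not use Reactor: NonBlockingMarker, MarkedThread and BlockingGuardDemo are hypothetical stand-ins for Reactor's NonBlocking interface, its NonBlockingThread class and the check in BlockingSingleSubscriber. It only illustrates that the decision depends on the type of the calling thread, not on what the call does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Reactor's NonBlocking marker interface.
interface NonBlockingMarker {}

// Hypothetical stand-in for ReactorThreadFactory$NonBlockingThread.
final class MarkedThread extends Thread implements NonBlockingMarker {
    MarkedThread(Runnable task, String name) {
        super(task, name);
    }
}

final class BlockingGuardDemo {
    // Mirrors the shape of the quoted check: true only on marked threads.
    static boolean isInNonBlockingThread() {
        return Thread.currentThread() instanceof NonBlockingMarker;
    }

    // Simulates a blocking call that refuses to run on a marked thread.
    static String guardedBlock() {
        if (isInNonBlockingThread()) {
            throw new IllegalStateException(
                "blocking is not supported in thread " + Thread.currentThread().getName());
        }
        return "ok";
    }

    // Runs guardedBlock() on a marked or plain thread and reports the outcome.
    static String runOn(boolean marked) {
        AtomicReference<String> result = new AtomicReference<>();
        Runnable task = () -> {
            try {
                result.set(guardedBlock());
            } catch (IllegalStateException e) {
                result.set("rejected: " + e.getMessage());
            }
        };
        Thread thread = marked
            ? new MarkedThread(task, "parallel-demo")
            : new Thread(task, "plain-demo");
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runOn(false)); // plain java.lang.Thread
        System.out.println(runOn(true));  // thread carrying the marker interface
    }
}
```

The same call succeeds on the plain thread and is rejected on the marked one, which matches the difference observed between the two callback types in this issue.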

Workaround

The workaround that we are using for now is to process events individually (accepting EventContext) instead of within a batch (EventBatchContext). With that callback, the event is delivered on a plain java.lang.Thread, so the blocking send succeeds.
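
For anyone who needs to stay on the batch callback, another possible approach is to hand the blocking send off to a thread you own. This is not the workaround described above and not the fix from the linked PR; it is a JDK-only sketch, and HandOffSketch, handOff and the "blocking-sender" thread name are all hypothetical. The Consumer<String> passed in stands in for whatever blocking publish call the application makes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class HandOffSketch {
    // Wraps a blocking send so the callback thread only enqueues the work;
    // the send itself runs on a thread owned by sendExecutor.
    static Consumer<String> handOff(ExecutorService sendExecutor, Consumer<String> blockingSend) {
        return payload -> sendExecutor.execute(() -> blockingSend.accept(payload));
    }

    // Small demonstration: records the payload and the thread that "sent" it.
    static List<String> demo() {
        List<String> sent = new CopyOnWriteArrayList<>();
        ExecutorService sendExecutor =
            Executors.newSingleThreadExecutor(task -> new Thread(task, "blocking-sender"));
        Consumer<String> callback = handOff(sendExecutor,
            payload -> sent.add(payload + "@" + Thread.currentThread().getName()));

        callback.accept("{}"); // would be invoked from the event callback
        sendExecutor.shutdown();
        try {
            if (!sendExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("send did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One trade-off to keep in mind: the callback returns before the send completes, so a checkpoint updated right after the hand-off no longer implies that the outgoing message was published.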

@srnagar
Member

srnagar commented Jun 25, 2020

@davegarred Thanks for reporting the issue. I tried to reproduce it using the code snippet below and was able to send the events successfully. Can you please take a look, let us know if you are doing anything differently, and share a reproducible sample with us?

public static void main(String[] args) {
    String eventHubConnectionString = "REDACTED";
    EventHubProducerClient eventHubProducerClient = new EventHubClientBuilder()
        .connectionString(eventHubConnectionString)
        .buildProducerClient();

    EventProcessorClient processor = new EventProcessorClientBuilder()
        .connectionString(eventHubConnectionString)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .checkpointStore(new SampleCheckpointStore())
        .retry(new AmqpRetryOptions().setMaxRetries(0))
        .processEventBatch(eventBatchContext -> onEventBatch(eventBatchContext, eventHubProducerClient), 100)
        .processError(context -> {
            System.out.println("Error occurred on partition: " +
                context.getPartitionContext().getPartitionId());
        })
        .processPartitionInitialization(initializationContext -> {
            System.out.println("Started receiving on partition: " +
                initializationContext.getPartitionContext().getPartitionId());
        })
        .processPartitionClose(closeContext -> {
            System.out.println("Stopped receiving on partition: " +
                closeContext.getPartitionContext().getPartitionId());
        })
        .buildEventProcessorClient();

    processor.start();
    System.out.println("Processor started");
}

private static void onEventBatch(EventBatchContext eventBatchContext,
    EventHubProducerClient eventHubProducerClient) {
    String partitionId = eventBatchContext.getPartitionContext().getPartitionId();
    int batchSize = eventBatchContext.getEvents().size();
    System.out.println("Received an event batch of size " + batchSize + " on partition " + partitionId);
    eventBatchContext.updateCheckpoint();
    eventHubProducerClient.send(Arrays.asList(new EventData("Event batch received " + batchSize)));
    System.out.println("Successfully sent message from partition " + partitionId);
}

Output:

Processor started
Started receiving on partition: 4
Started receiving on partition: 2
Started receiving on partition: 3
Started receiving on partition: 0
Started receiving on partition: 1
Received an event batch of size 100 on partition 2
Received an event batch of size 100 on partition 3
Received an event batch of size 100 on partition 0
Received an event batch of size 100 on partition 1
Received an event batch of size 100 on partition 4
Successfully sent message from partition 2
Successfully sent message from partition 4
Successfully sent message from partition 3
Successfully sent message from partition 1
Successfully sent message from partition 0
Received an event batch of size 100 on partition 3
Received an event batch of size 100 on partition 1
Received an event batch of size 100 on partition 4
Received an event batch of size 100 on partition 0
Received an event batch of size 100 on partition 2
Successfully sent message from partition 1
Successfully sent message from partition 3
Successfully sent message from partition 4
Successfully sent message from partition 0
Successfully sent message from partition 2

@davegarred
Author

davegarred commented Jun 26, 2020

Hi @srnagar ,
Thanks for taking a look. I have been able to recreate the issue with something similar to what you have.

  private EventProcessorClient getEventProcessorClient() {
    final BlobContainerAsyncClient blobContainerAsyncClient = getBlobContainerAsyncClient(
        this.credential);
    final EventProcessorClient eventProcessorClient =
        new EventProcessorClientBuilder()
            .credential(FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME, credential)
            .consumerGroup(CONSUMER_GROUP_NAME)

            // this is broken
            .processEventBatch(partitionBatchProcessor(producer), 100, Duration.ofSeconds(3))

            // but this returns a correct java.lang.Thread and works correctly
//            .processEvent(partitionProcessor(producer))

            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
            .buildEventProcessorClient();

    return eventProcessorClient;
  }

  private final Consumer<EventBatchContext> partitionBatchProcessor(final Producer producer) {
    return batchContext -> {
      final String tt = Thread.currentThread().getClass().getName();
      // --------------------------- thread type: reactor.core.scheduler.ReactorThreadFactory$NonBlockingThread
      System.out.println("--------------------------- thread type: " + tt);
      batchContext.getEvents().forEach(e -> System.out.println(e.getBodyAsString()));
      // fails in publishMessage
      producer.publishMessage("{}", emptyMap());
      batchContext.updateCheckpoint();
    };
  }

  private final Consumer<EventContext> partitionProcessor(final Producer producer) {
    return eventContext -> {
      final String tt = Thread.currentThread().getClass().getName();
      // --------------------------- thread type: java.lang.Thread
      System.out.println("--------------------------- thread type: " + tt);
      producer.publishMessage("{}", emptyMap());
      eventContext.updateCheckpoint();
      // successfully returns
    };
  }

// in Producer
  public EventData publishMessage(String message, Map<String, String> eventProperties) {
    final EventData eventData = new EventData(message);
    // createBatch() is the blocking call that fails on the non-blocking thread
    EventDataBatch eventDataBatch = this.clientProvider.getClient().createBatch();
    eventDataBatch.tryAdd(eventData);
    this.clientProvider.getClient().send(eventDataBatch);
    return eventData;
  }

The resulting exception:

Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
	at reactor.core.publisher.Mono.block(Mono.java:1690)
	at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:126)
	at com.starbucks.cdx.evh.demo.messaging.producer.Producer.publishMessage(Producer.java:37)
	at com.starbucks.cdx.evh.demo.messaging.consumer.ConsumerClientProvider.lambda$partitionBatchProcessor$1(ConsumerClientProvider.java:87)
	at com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEventBatch(EventProcessorClientBuilder.java:441)
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:248)
	... 22 common frames omitted

My dependencies:

    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
      <version>12.7.0</version>
    </dependency>
    <!-- Event Hubs consumer checkpoint store -->
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.1.1</version>
    </dependency>
    <!--Azure Identity    -->
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
      <version>1.0.7</version>
    </dependency>

I would encourage you to set a breakpoint in your consuming function. You should be able to verify that the thread it runs on is of type reactor.core.scheduler.ReactorThreadFactory$NonBlockingThread.

Thanks,
Dave

@srnagar
Member

srnagar commented Jul 6, 2020

@davegarred I have a PR to fix this issue here - #12822

@davegarred
Author

@davegarred I have a PR to fix this issue here - #12822

Thanks for the help!

@github-actions github-actions bot locked and limited conversation to collaborators Apr 12, 2023
@moarychan
Member

Related PR #43063
