Skip to content

Commit

Permalink
* fix connection open/close timeout paths
Browse files Browse the repository at this point in the history
* use non ForkJoinPool variants while composing completable futures (Azure#4)
* fix thread-safety in MessageSender & MessageReceiver in error cases(Azure#4)
  • Loading branch information
SreeramGarlapati committed Apr 5, 2017
1 parent b378b50 commit f0e0b65
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static CompletableFuture<EventHubClient> createFromConnectionString(final
final EventHubClient eventHubClient = new EventHubClient(connStr);

return MessagingFactory.createFromConnectionString(connectionString.toString(), retryPolicy)
.thenApplyAsync(new Function<MessagingFactory, EventHubClient>() {
.thenApply(new Function<MessagingFactory, EventHubClient>() {
@Override
public EventHubClient apply(MessagingFactory factory) {
eventHubClient.underlyingFactory = factory;
Expand Down Expand Up @@ -205,7 +205,7 @@ public final CompletableFuture<Void> send(final EventData data) {
throw new IllegalArgumentException("EventData cannot be empty.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(data.toAmqpMessage());
Expand Down Expand Up @@ -295,7 +295,7 @@ public final CompletableFuture<Void> send(final Iterable<EventData> eventDatas)
throw new IllegalArgumentException("Empty batch of EventData cannot be sent.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas));
Expand Down Expand Up @@ -371,7 +371,7 @@ public final CompletableFuture<Void> send(final EventData eventData, final Strin
throw new IllegalArgumentException("partitionKey cannot be null");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(eventData.toAmqpMessage(partitionKey));
Expand Down Expand Up @@ -445,7 +445,7 @@ public final CompletableFuture<Void> send(final Iterable<EventData> eventDatas,
String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: {0}", ClientConstants.MAX_PARTITION_KEY_LENGTH));
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey));
Expand Down Expand Up @@ -1222,7 +1222,7 @@ public CompletableFuture<Void> onClose() {
if (this.underlyingFactory != null) {
synchronized (this.senderCreateSync) {
final CompletableFuture<Void> internalSenderClose = this.sender != null
? this.sender.close().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
? this.sender.close().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.underlyingFactory.close();
Expand All @@ -1242,7 +1242,7 @@ private CompletableFuture<Void> createInternalSender() {
synchronized (this.senderCreateSync) {
if (!this.isSenderCreateStarted) {
this.createSender = MessageSender.create(this.underlyingFactory, StringUtil.getRandomString(), this.eventHubName)
.thenAcceptAsync(new Consumer<MessageSender>() {
.thenAccept(new Consumer<MessageSender>() {
public void accept(MessageSender a) {
EventHubClient.this.sender = a;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static CompletableFuture<PartitionReceiver> create(MessagingFactory factory,
}

final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver, receiverOptions);
return receiver.createInternalReceiver().thenApplyAsync(new Function<Void, PartitionReceiver>() {
return receiver.createInternalReceiver().thenApply(new Function<Void, PartitionReceiver>() {
public PartitionReceiver apply(Void a) {
return receiver;
}
Expand All @@ -136,7 +136,7 @@ private CompletableFuture<Void> createInternalReceiver() throws ServiceBusExcept
StringUtil.getRandomString(),
String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId),
PartitionReceiver.DEFAULT_PREFETCH_COUNT, this)
.thenAcceptAsync(new Consumer<MessageReceiver>() {
.thenAccept(new Consumer<MessageReceiver>() {
public void accept(MessageReceiver r) {
PartitionReceiver.this.internalReceiver = r;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private PartitionSender(MessagingFactory factory, String eventHubName, String pa
static CompletableFuture<PartitionSender> Create(MessagingFactory factory, String eventHubName, String partitionId) throws ServiceBusException {
final PartitionSender sender = new PartitionSender(factory, eventHubName, partitionId);
return sender.createInternalSender()
.thenApplyAsync(new Function<Void, PartitionSender>() {
.thenApply(new Function<Void, PartitionSender>() {
public PartitionSender apply(Void a) {
return sender;
}
Expand All @@ -47,7 +47,7 @@ public PartitionSender apply(Void a) {
private CompletableFuture<Void> createInternalSender() throws ServiceBusException {
return MessageSender.create(this.factory, StringUtil.getRandomString(),
String.format("%s/Partitions/%s", this.eventHubName, this.partitionId))
.thenAcceptAsync(new Consumer<MessageSender>() {
.thenAccept(new Consumer<MessageSender>() {
public void accept(MessageSender a) {
PartitionSender.this.internalSender = a;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand Down Expand Up @@ -100,21 +101,26 @@ private class OpenRequestResponseChannel implements IOperation<RequestResponseCh
@Override
public void run(IOperationResult<RequestResponseChannel, Exception> operationCallback) {

Session session = CBSChannel.this.sessionProvider.getSession(
"cbs-session",
null,
new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition error, Exception exception) {
if (error != null)
operationCallback.onError(new AmqpException(error));
else if (exception != null)
operationCallback.onError(exception);
}
});

if (session == null)
return;

final RequestResponseChannel requestResponseChannel = new RequestResponseChannel(
"cbs",
ClientConstants.CBS_ADDRESS,
CBSChannel.this.sessionProvider.getSession(
"cbs-session",
null,
new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition error, Exception exception) {
if (error != null)
operationCallback.onError(new AmqpException(error));
else if (exception != null)
operationCallback.onError(exception);
}
}));
session);

requestResponseChannel.open(
new IOperationResult<Void, Exception>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class MessageReceiver extends ClientEntity implements IAmqpReceiver
private final ConcurrentLinkedQueue<Message> prefetchedMessages;
private final ReceiveWork receiveWork;
private final CreateAndReceive createAndReceive;
private final Object lastKnownLinkErrorLock;

private int prefetchCount;
private Receiver receiveLink;
Expand Down Expand Up @@ -95,6 +96,7 @@ private MessageReceiver(final MessagingFactory factory,
this.linkOpen = new WorkItem<>(new CompletableFuture<>(), factory.getOperationTimeout());

this.pendingReceives = new ConcurrentLinkedQueue<>();
this.lastKnownLinkErrorLock = new Object();

// onOperationTimeout delegate - per receive call
this.onOperationTimedout = new Runnable() {
Expand Down Expand Up @@ -330,11 +332,6 @@ public void onReceiveComplete(Delivery delivery) {
this.receiveWork.onEvent();
}

public void onError(final ErrorCondition error) {
final Exception completionException = ExceptionUtil.toException(error);
this.onError(completionException);
}

@Override
public void onError(final Exception exception) {
this.prefetchedMessages.clear();
Expand Down Expand Up @@ -460,7 +457,7 @@ public void accept(Session session) {
@Override
public void accept(ErrorCondition t, Exception u) {
if (t != null)
onError(t);
onError((t != null && t.getCondition() != null) ? ExceptionUtil.toException(t) : null);
else if (u != null)
onError(u);
}
Expand Down Expand Up @@ -526,12 +523,19 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
new Runnable() {
public void run() {
if (!linkOpen.getWork().isDone()) {
Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()),
MessageReceiver.this.lastKnownLinkError);
final Receiver link;
final Exception lastReportedLinkError;
synchronized (lastKnownLinkErrorLock) {
link = MessageReceiver.this.receiveLink;
lastReportedLinkError = MessageReceiver.this.lastKnownLinkError;
}

final Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", link.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()),
lastReportedLinkError);
if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Open"),
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Open"),
operationTimedout);
}

Expand All @@ -549,10 +553,15 @@ private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
new Runnable() {
public void run() {
if (!linkClose.isDone()) {
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", MessageReceiver.this.receiveLink.getName(), ZonedDateTime.now()));
final Receiver link;
synchronized (lastKnownLinkErrorLock) {
link = MessageReceiver.this.receiveLink;
}

final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now()));
if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Close"),
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Close"),
operationTimedout);
}

Expand All @@ -567,25 +576,27 @@ public void run() {

@Override
public void onClose(ErrorCondition condition) {
if (condition == null || condition.getCondition() == null) {
this.onError((Exception) null);
} else {
this.onError(condition);
}
final Exception completionException = (condition != null && condition.getCondition() != null) ? ExceptionUtil.toException(condition) : null;
this.onError(completionException);
}

@Override
public ErrorContext getContext() {
final Receiver link;
synchronized (this.lastKnownLinkErrorLock) {
link = this.receiveLink;
}

final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();
final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
: ((this.receiveLink != null) ? this.receiveLink.getName() : null);
final String referenceId = link != null && link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
? link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
: ((link != null) ? link.getName() : null);

ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
final ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
this.receivePath,
referenceId,
isLinkOpened ? this.prefetchCount : null,
isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit() : null,
isLinkOpened && link != null ? link.getCredit() : null,
isLinkOpened && this.prefetchedMessages != null ? this.prefetchedMessages.size() : null);

return errorContext;
Expand Down
Loading

0 comments on commit f0e0b65

Please sign in to comment.