Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timeout issue in MsgFactory & thread-safety issue in MsgSender/Receiver #92

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public static CompletableFuture<EventHubClient> createFromConnectionString(final
final EventHubClient eventHubClient = new EventHubClient(connStr);

return MessagingFactory.createFromConnectionString(connectionString.toString(), retryPolicy)
.thenApplyAsync(new Function<MessagingFactory, EventHubClient>() {
.thenApply(new Function<MessagingFactory, EventHubClient>() {
@Override
public EventHubClient apply(MessagingFactory factory) {
eventHubClient.underlyingFactory = factory;
Expand Down Expand Up @@ -205,7 +205,7 @@ public final CompletableFuture<Void> send(final EventData data) {
throw new IllegalArgumentException("EventData cannot be empty.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(data.toAmqpMessage());
Expand Down Expand Up @@ -295,7 +295,7 @@ public final CompletableFuture<Void> send(final Iterable<EventData> eventDatas)
throw new IllegalArgumentException("Empty batch of EventData cannot be sent.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas));
Expand Down Expand Up @@ -371,7 +371,7 @@ public final CompletableFuture<Void> send(final EventData eventData, final Strin
throw new IllegalArgumentException("partitionKey cannot be null");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(eventData.toAmqpMessage(partitionKey));
Expand Down Expand Up @@ -445,7 +445,7 @@ public final CompletableFuture<Void> send(final Iterable<EventData> eventDatas,
String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: {0}", ClientConstants.MAX_PARTITION_KEY_LENGTH));
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey));
Expand Down Expand Up @@ -1222,7 +1222,7 @@ public CompletableFuture<Void> onClose() {
if (this.underlyingFactory != null) {
synchronized (this.senderCreateSync) {
final CompletableFuture<Void> internalSenderClose = this.sender != null
? this.sender.close().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
? this.sender.close().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClient.this.underlyingFactory.close();
Expand All @@ -1242,7 +1242,7 @@ private CompletableFuture<Void> createInternalSender() {
synchronized (this.senderCreateSync) {
if (!this.isSenderCreateStarted) {
this.createSender = MessageSender.create(this.underlyingFactory, StringUtil.getRandomString(), this.eventHubName)
.thenAcceptAsync(new Consumer<MessageSender>() {
.thenAccept(new Consumer<MessageSender>() {
public void accept(MessageSender a) {
EventHubClient.this.sender = a;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static CompletableFuture<PartitionReceiver> create(MessagingFactory factory,
}

final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver, receiverOptions);
return receiver.createInternalReceiver().thenApplyAsync(new Function<Void, PartitionReceiver>() {
return receiver.createInternalReceiver().thenApply(new Function<Void, PartitionReceiver>() {
public PartitionReceiver apply(Void a) {
return receiver;
}
Expand All @@ -136,7 +136,7 @@ private CompletableFuture<Void> createInternalReceiver() throws ServiceBusExcept
StringUtil.getRandomString(),
String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId),
PartitionReceiver.DEFAULT_PREFETCH_COUNT, this)
.thenAcceptAsync(new Consumer<MessageReceiver>() {
.thenAccept(new Consumer<MessageReceiver>() {
public void accept(MessageReceiver r) {
PartitionReceiver.this.internalReceiver = r;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private PartitionSender(MessagingFactory factory, String eventHubName, String pa
static CompletableFuture<PartitionSender> Create(MessagingFactory factory, String eventHubName, String partitionId) throws ServiceBusException {
final PartitionSender sender = new PartitionSender(factory, eventHubName, partitionId);
return sender.createInternalSender()
.thenApplyAsync(new Function<Void, PartitionSender>() {
.thenApply(new Function<Void, PartitionSender>() {
public PartitionSender apply(Void a) {
return sender;
}
Expand All @@ -47,7 +47,7 @@ public PartitionSender apply(Void a) {
private CompletableFuture<Void> createInternalSender() throws ServiceBusException {
return MessageSender.create(this.factory, StringUtil.getRandomString(),
String.format("%s/Partitions/%s", this.eventHubName, this.partitionId))
.thenAcceptAsync(new Consumer<MessageSender>() {
.thenAccept(new Consumer<MessageSender>() {
public void accept(MessageSender a) {
PartitionSender.this.internalSender = a;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand Down Expand Up @@ -100,21 +101,26 @@ private class OpenRequestResponseChannel implements IOperation<RequestResponseCh
@Override
public void run(IOperationResult<RequestResponseChannel, Exception> operationCallback) {

Session session = CBSChannel.this.sessionProvider.getSession(
"cbs-session",
null,
new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition error, Exception exception) {
if (error != null)
operationCallback.onError(new AmqpException(error));
else if (exception != null)
operationCallback.onError(exception);
}
});

if (session == null)
return;

final RequestResponseChannel requestResponseChannel = new RequestResponseChannel(
"cbs",
ClientConstants.CBS_ADDRESS,
CBSChannel.this.sessionProvider.getSession(
"cbs-session",
null,
new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition error, Exception exception) {
if (error != null)
operationCallback.onError(new AmqpException(error));
else if (exception != null)
operationCallback.onError(exception);
}
}));
session);

requestResponseChannel.open(
new IOperationResult<Void, Exception>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ public final void closeSync() throws ServiceBusException {
}
}

protected final void throwIfClosed(Throwable cause) {
protected final void throwIfClosed() {
if (this.getIsClosingOrClosed()) {
throw new IllegalStateException(String.format(Locale.US, "Operation not allowed after the %s instance is Closed.", this.getClass().getName()), cause);
throw new IllegalStateException(String.format(Locale.US, "Operation not allowed after the %s instance is Closed.", this.getClass().getName()), this.getLastKnownError());
}
}

protected Exception getLastKnownError() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class MessageReceiver extends ClientEntity implements IAmqpReceiver
private final ConcurrentLinkedQueue<Message> prefetchedMessages;
private final ReceiveWork receiveWork;
private final CreateAndReceive createAndReceive;
private final Object errorCasesLock;

private int prefetchCount;
private Receiver receiveLink;
Expand Down Expand Up @@ -95,6 +96,7 @@ private MessageReceiver(final MessagingFactory factory,
this.linkOpen = new WorkItem<>(new CompletableFuture<>(), factory.getOperationTimeout());

this.pendingReceives = new ConcurrentLinkedQueue<>();
this.errorCasesLock = new Object();

// onOperationTimeout delegate - per receive call
this.onOperationTimedout = new Runnable() {
Expand Down Expand Up @@ -250,7 +252,7 @@ public void setReceiveTimeout(final Duration value) {
}

public CompletableFuture<Collection<Message>> receive(final int maxMessageCount) {
this.throwIfClosed(this.lastKnownLinkError);
this.throwIfClosed();

if (maxMessageCount <= 0 || maxMessageCount > this.prefetchCount) {
throw new IllegalArgumentException(String.format(Locale.US, "parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)", this.prefetchCount));
Expand Down Expand Up @@ -289,7 +291,9 @@ public void onOpenComplete(Exception exception) {
this.openTimer.cancel(false);
}

this.lastKnownLinkError = null;
synchronized (this.errorCasesLock) {
this.lastKnownLinkError = null;
}

this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());

Expand Down Expand Up @@ -330,11 +334,6 @@ public void onReceiveComplete(Delivery delivery) {
this.receiveWork.onEvent();
}

public void onError(final ErrorCondition error) {
final Exception completionException = ExceptionUtil.toException(error);
this.onError(completionException);
}

@Override
public void onError(final Exception exception) {
this.prefetchedMessages.clear();
Expand All @@ -358,7 +357,9 @@ public void onError(final Exception exception) {

this.linkClose.complete(null);
} else {
this.lastKnownLinkError = exception == null ? this.lastKnownLinkError : exception;
synchronized (this.errorCasesLock) {
this.lastKnownLinkError = exception == null ? this.lastKnownLinkError : exception;
}

final Exception completionException = exception == null
? new ServiceBusException(true, "Client encountered transient error for unknown reasons, please retry the operation.") : exception;
Expand Down Expand Up @@ -452,15 +453,17 @@ public void accept(Session session) {

receiver.open();

MessageReceiver.this.receiveLink = receiver;
synchronized (MessageReceiver.this.errorCasesLock) {
MessageReceiver.this.receiveLink = receiver;
}
}
};

final BiConsumer<ErrorCondition, Exception> onSessionOpenFailed = new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition t, Exception u) {
if (t != null)
onError(t);
onError((t != null && t.getCondition() != null) ? ExceptionUtil.toException(t) : null);
else if (u != null)
onError(u);
}
Expand Down Expand Up @@ -526,12 +529,19 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
new Runnable() {
public void run() {
if (!linkOpen.getWork().isDone()) {
Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", MessageReceiver.this.receiveLink.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()),
MessageReceiver.this.lastKnownLinkError);
final Receiver link;
final Exception lastReportedLinkError;
synchronized (errorCasesLock) {
link = MessageReceiver.this.receiveLink;
lastReportedLinkError = MessageReceiver.this.lastKnownLinkError;
}

final Exception operationTimedout = new TimeoutException(
String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", link.getName(), MessageReceiver.this.receivePath, ZonedDateTime.now()),
lastReportedLinkError);
if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Open"),
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Open"),
operationTimedout);
}

Expand All @@ -549,10 +559,15 @@ private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) {
new Runnable() {
public void run() {
if (!linkClose.isDone()) {
Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", MessageReceiver.this.receiveLink.getName(), ZonedDateTime.now()));
final Receiver link;
synchronized (errorCasesLock) {
link = MessageReceiver.this.receiveLink;
}

final Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", link.getName(), ZonedDateTime.now()));
if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
TRACE_LOGGER.log(Level.WARNING,
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, MessageReceiver.this.receiveLink.getName(), "Close"),
String.format(Locale.US, "receiverPath[%s], linkName[%s], %s call timedout", MessageReceiver.this.receivePath, link.getName(), "Close"),
operationTimedout);
}

Expand All @@ -567,25 +582,27 @@ public void run() {

@Override
public void onClose(ErrorCondition condition) {
if (condition == null || condition.getCondition() == null) {
this.onError((Exception) null);
} else {
this.onError(condition);
}
final Exception completionException = (condition != null && condition.getCondition() != null) ? ExceptionUtil.toException(condition) : null;
this.onError(completionException);
}

@Override
public ErrorContext getContext() {
final Receiver link;
synchronized (this.errorCasesLock) {
link = this.receiveLink;
}

final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone();
final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
: ((this.receiveLink != null) ? this.receiveLink.getName() : null);
final String referenceId = link != null && link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)
? link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString()
: ((link != null) ? link.getName() : null);

ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
final ReceiverContext errorContext = new ReceiverContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null,
this.receivePath,
referenceId,
isLinkOpened ? this.prefetchCount : null,
isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit() : null,
isLinkOpened && link != null ? link.getCredit() : null,
isLinkOpened && this.prefetchedMessages != null ? this.prefetchedMessages.size() : null);

return errorContext;
Expand Down Expand Up @@ -628,6 +645,13 @@ public void onEvent() {
return this.linkClose;
}

@Override
protected Exception getLastKnownError() {
synchronized (this.errorCasesLock) {
return this.lastKnownLinkError;
}
}

private final class ReceiveWork extends DispatchHandler {

@Override
Expand Down
Loading