Skip to content

Commit

Permalink
Update receive/send link creation logic and improve tracing (Azure#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjkwak authored Jan 2, 2019
1 parent 9d04a98 commit c37c848
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
package com.microsoft.azure.eventhubs.impl;

import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,18 +21,28 @@ public BaseLinkHandler(final AmqpLink amqpLink) {
@Override
public void onLinkLocalClose(Event event) {
final Link link = event.getLink();
final ErrorCondition condition = link.getCondition();

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s]", link.getName()));
TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s], errorCondition[%s], errorDescription[%s]",
link.getName(),
condition != null ? condition.getCondition() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}

closeSession(link);
closeSession(link, link.getCondition());
}

@Override
public void onLinkRemoteClose(Event event) {
final Link link = event.getLink();
final ErrorCondition condition = link.getCondition();

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s]", link.getName()));
TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s], errorCondition[%s], errorDescription[%s]",
link.getName(),
condition != null ? condition.getCondition() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}

handleRemoteLinkClosed(event);
Expand All @@ -44,8 +51,13 @@ public void onLinkRemoteClose(Event event) {
@Override
public void onLinkRemoteDetach(Event event) {
final Link link = event.getLink();
final ErrorCondition condition = link.getCondition();

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s]", link.getName()));
TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s], errorCondition[%s], errorDescription[%s]",
link.getName(),
condition != null ? condition.getCondition() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}

handleRemoteLinkClosed(event);
Expand All @@ -66,21 +78,34 @@ public void processOnClose(Link link, Exception exception) {
this.underlyingEntity.onError(exception);
}

private void closeSession(Link link) {
if (link.getSession() != null && link.getSession().getLocalState() != EndpointState.CLOSED)
link.getSession().close();
private void closeSession(Link link, ErrorCondition condition) {
final Session session = link.getSession();

if (session != null && session.getLocalState() != EndpointState.CLOSED) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("closeSession for linkName[%s], errorCondition[%s], errorDescription[%s]",
link.getName(),
condition != null ? condition.getCondition() : "n/a",
condition != null ? condition.getDescription() : "n/a"));
}

session.setCondition(condition);
session.close();
}
}

private void handleRemoteLinkClosed(final Event event) {
final Link link = event.getLink();

final ErrorCondition condition = link.getRemoteCondition();

if (link.getLocalState() != EndpointState.CLOSED) {
link.setCondition(condition);
link.close();
}

final ErrorCondition condition = link.getRemoteCondition();
this.processOnClose(link, condition);

closeSession(link);
this.closeSession(link, condition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

public class FaultTolerantObject<T extends IOObject> {

final Operation<T> openTask;
final Operation<Void> closeTask;
final Queue<OperationResult<T, Exception>> openCallbacks;
final Queue<OperationResult<Void, Exception>> closeCallbacks;
private final Operation<T> openTask;
private final Operation<Void> closeTask;
private final Queue<OperationResult<T, Exception>> openCallbacks;
private final Queue<OperationResult<Void, Exception>> closeCallbacks;

T innerObject;
boolean creatingNewInnerObject;
boolean closingInnerObject;
private T innerObject;
private boolean creatingNewInnerObject;
private boolean closingInnerObject;

public FaultTolerantObject(
final Operation<T> openAsync,
Expand All @@ -30,7 +30,7 @@ public FaultTolerantObject(
}

// should be invoked from reactor thread
public T unsafeGetIfOpened() {
T unsafeGetIfOpened() {

if (innerObject != null && innerObject.getState() == IOObject.IOObjectState.OPENED)
return innerObject;
Expand All @@ -47,29 +47,33 @@ public void runOnOpenedObject(
@Override
public void onEvent() {
if (!creatingNewInnerObject
&& (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED || innerObject.getState() == IOObject.IOObjectState.CLOSING)) {
&& (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED ||
innerObject.getState() == IOObject.IOObjectState.CLOSING)) {
creatingNewInnerObject = true;
openCallbacks.offer(openCallback);
openTask.run(new OperationResult<T, Exception>() {
@Override
public void onComplete(T result) {
creatingNewInnerObject = false;
innerObject = result;
for (OperationResult<T, Exception> callback : openCallbacks)
callback.onComplete(result);

openCallbacks.clear();
}

@Override
public void onError(Exception error) {
creatingNewInnerObject = false;
for (OperationResult<T, Exception> callback : openCallbacks)
callback.onError(error);

openCallbacks.clear();
}
});
try {
openCallbacks.offer(openCallback);
openTask.run(new OperationResult<T, Exception>() {
@Override
public void onComplete(T result) {
innerObject = result;
for (OperationResult<T, Exception> callback : openCallbacks)
callback.onComplete(result);

openCallbacks.clear();
}

@Override
public void onError(Exception error) {
for (OperationResult<T, Exception> callback : openCallbacks)
callback.onError(error);

openCallbacks.clear();
}
});
} finally {
creatingNewInnerObject = false;
}
} else if (innerObject != null && innerObject.getState() == IOObject.IOObjectState.OPENED) {
openCallback.onComplete(innerObject);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
private volatile CompletableFuture<?> closeTimer;
private int prefetchCount;
private Exception lastKnownLinkError;
private String linkCreationTime;

private MessageReceiver(final MessagingFactory factory,
final String name,
Expand Down Expand Up @@ -111,8 +112,8 @@ public void run() {
if (MessageReceiver.this.shouldScheduleOperationTimeoutTimer()) {
TimeoutTracker timeoutTracker = topWorkItem.getTimeoutTracker();

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
getClientId(),
Expand Down Expand Up @@ -203,7 +204,6 @@ public String getReceivePath() {
}

private CompletableFuture<MessageReceiver> createLink() {
this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
try {
this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() {
@Override
Expand Down Expand Up @@ -257,8 +257,8 @@ public CompletableFuture<Collection<Message>> receive(final int maxMessageCount)
}

if (this.shouldScheduleOperationTimeoutTimer()) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
this.getClientId(),
Expand Down Expand Up @@ -289,10 +289,10 @@ public void onOpenComplete(Exception exception) {
if (exception == null) {
if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
this.linkOpen.getWork().complete(this);
if (this.openTimer != null)
this.openTimer.cancel(false);
}

this.cancelOpenTimer();

if (this.getIsClosingOrClosed()) {
return;
}
Expand All @@ -307,7 +307,7 @@ public void onOpenComplete(Exception exception) {
this.sendFlow(this.prefetchCount - this.prefetchedMessages.size());

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]",
TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]",
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
}
} else {
Expand Down Expand Up @@ -346,15 +346,22 @@ public void onEvent() {
} else if (exception instanceof EventHubException && !((EventHubException) exception).getIsTransient()) {
this.cancelOpen(exception);
}
} else {
this.cancelOpenTimer();
}
}
}

private void cancelOpen(final Exception completionException) {
this.setClosed();
ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), completionException, this);
if (this.openTimer != null)
this.cancelOpenTimer();
}

private void cancelOpenTimer() {
if (this.openTimer != null && !this.openTimer.isCancelled()) {
this.openTimer.cancel(false);
}
}

@Override
Expand All @@ -378,7 +385,6 @@ public void onReceiveComplete(Delivery delivery) {
@Override
public void onError(final Exception exception) {
this.prefetchedMessages.clear();
this.underlyingFactory.deregisterForConnectionError(this.receiveLink);

if (this.getIsClosingOrClosed()) {
if (this.closeTimer != null)
Expand All @@ -396,6 +402,15 @@ public void onError(final Exception exception) {
"Entity(%s): client encountered transient error for unknown reasons, please retry the operation.", this.receivePath))
: exception;

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], onError: %s",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(),
completionException));
}

this.onOpenComplete(completionException);

final WorkItem<Collection<Message>> workItem = this.pendingReceives.peek();
Expand Down Expand Up @@ -456,10 +471,24 @@ private void scheduleOperationTimer(final TimeoutTracker tracker) {
}

private void createReceiveLink() {
if (creatingLink)
return;
synchronized (this.errorConditionLock) {
if (this.creatingLink) {
return;
}

this.creatingLink = true;
}

this.creatingLink = true;
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"clientId[%s], path[%s], operationTimeout[%s], creating a receive link",
this.getClientId(), this.receivePath, this.operationTimeout));
}

this.linkCreationTime = Instant.now().toString();

this.scheduleLinkOpenTimeout(TimeoutTracker.create(this.operationTimeout));

final Consumer<Session> onSessionOpen = new Consumer<Session>() {
@Override
Expand Down Expand Up @@ -499,6 +528,11 @@ public void accept(Session session) {

final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this);
BaseHandler.setHandler(receiver, handler);

if (MessageReceiver.this.receiveLink != null) {
MessageReceiver.this.underlyingFactory.deregisterForConnectionError(MessageReceiver.this.receiveLink);
}

MessageReceiver.this.underlyingFactory.registerForConnectionError(receiver);

receiver.open();
Expand All @@ -512,10 +546,11 @@ public void accept(Session session) {
final BiConsumer<ErrorCondition, Exception> onSessionOpenFailed = new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition t, Exception u) {
if (t != null)
if (t != null) {
onError((t.getCondition() != null) ? ExceptionUtil.toException(t) : null);
else if (u != null)
} else if (u != null) {
onError(u);
}
}
};

Expand Down Expand Up @@ -588,6 +623,8 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) {
this.openTimer = timer.schedule(
new Runnable() {
public void run() {
creatingLink = false;

if (!linkOpen.getWork().isDone()) {
final Receiver link;
final Exception lastReportedLinkError;
Expand Down Expand Up @@ -679,6 +716,10 @@ private void operationTimeoutTimerFired() {

@Override
public void onClose(ErrorCondition condition) {
if (this.receiveLink != null) {
this.underlyingFactory.deregisterForConnectionError(MessageReceiver.this.receiveLink);
}

final Exception completionException = (condition != null && condition.getCondition() != null) ? ExceptionUtil.toException(condition) : null;
this.onError(completionException);
}
Expand Down Expand Up @@ -769,7 +810,6 @@ private final class CreateAndReceive extends DispatchHandler {

@Override
public void onEvent() {

receiveWork.onEvent();

if (!MessageReceiver.this.getIsClosingOrClosed()
Expand Down
Loading

0 comments on commit c37c848

Please sign in to comment.