diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 1c4a524795..a73b9aea53 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -1071,6 +1071,7 @@ private IpcPublication addIpcPublication( rawLog, publicationUnblockTimeoutNs, context.systemCounters(), + nanoClock, isExclusive); ipcPublications.add(publication); diff --git a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java index b07f9523d0..a885a27226 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java @@ -20,6 +20,7 @@ import io.aeron.logbuffer.LogBufferDescriptor; import io.aeron.logbuffer.LogBufferUnblocker; import org.agrona.collections.ArrayUtil; +import org.agrona.concurrent.NanoClock; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.concurrent.status.AtomicCounter; import org.agrona.concurrent.status.Position; @@ -50,12 +51,12 @@ enum Status private final int termWindowLength; private final int positionBitsToShift; private final int initialTermId; - private long tripLimit = 0; - private long consumerPosition = 0; - private long lastConsumerPosition = 0; - private long timeOfLastConsumerPositionChange = 0; - private long cleanPosition = 0; - private long timeOfLastStatusChange = 0; + private long tripLimit; + private long consumerPosition; + private long lastConsumerPosition; + private long timeOfLastConsumerPositionChangeNs; + private long cleanPosition; + private long timeOfLastStatusChangeNs; private int refCount = 0; private boolean reachedEndOfLife = false; private final boolean isExclusive; @@ -64,6 +65,7 @@ enum Status private ReadablePosition[] subscriberPositions = EMPTY_POSITIONS; private final Position publisherLimit; private final UnsafeBuffer metaDataBuffer; + private final NanoClock nanoClock; private final RawLog rawLog; private final AtomicCounter unblockedPublications; @@ -75,6 +77,7 @@ public IpcPublication( final RawLog rawLog, final long unblockTimeoutNs, final SystemCounters systemCounters, + final NanoClock nanoClock, final boolean isExclusive) { this.registrationId = registrationId; @@ -83,6 +86,7 @@ public IpcPublication( this.isExclusive = isExclusive; this.termBuffers = rawLog.termBuffers(); this.initialTermId = initialTermId(rawLog.metaData()); + this.nanoClock = nanoClock; final int termLength = rawLog.termLength(); this.termBufferLength = termLength; @@ -98,6 +102,7 @@ public IpcPublication( consumerPosition = producerPosition(); lastConsumerPosition = consumerPosition; cleanPosition = consumerPosition; + timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime(); } public int sessionId() @@ -143,6 +148,11 @@ public void close() public void addSubscriber(final ReadablePosition subscriberPosition) { + if (subscriberPositions.length == 0) + { + timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime(); + } + subscriberPositions = ArrayUtil.add(subscriberPositions, subscriberPosition); } @@ -236,13 +246,13 @@ public void onTimeEvent(final long timeNs, final long timeMs, final DriverConduc if (isDrained()) { status = Status.LINGER; - timeOfLastStatusChange = timeNs; + timeOfLastStatusChangeNs = timeNs; conductor.transitionToLinger(this); } break; case LINGER: - if (timeNs > (timeOfLastStatusChange + PUBLICATION_LINGER_NS)) + if (timeNs > (timeOfLastStatusChangeNs + PUBLICATION_LINGER_NS)) { reachedEndOfLife = true; conductor.cleanupIpcPublication(this); @@ -256,14 +266,14 @@ public boolean hasReachedEndOfLife() return reachedEndOfLife; } - public void timeOfLastStateChange(final long time) + public void timeOfLastStateChange(final long timeNs) { - timeOfLastStatusChange = time; + timeOfLastStatusChangeNs = timeNs; } public long timeOfLastStateChange() { - return timeOfLastStatusChange; + return timeOfLastStatusChangeNs; } public void delete() @@ -318,8 +328,8 @@ private void checkForBlockedPublisher(final long timeNs) { if (consumerPosition == lastConsumerPosition) { - if (producerPosition() > consumerPosition && - timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs)) + if (timeNs > (timeOfLastConsumerPositionChangeNs + unblockTimeoutNs) && + producerPosition() > consumerPosition) { if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition)) { @@ -329,7 +339,7 @@ private void checkForBlockedPublisher(final long timeNs) } else { - timeOfLastConsumerPositionChange = timeNs; + timeOfLastConsumerPositionChangeNs = timeNs; lastConsumerPosition = consumerPosition; } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index b1b0bf46a3..1070a7aa4d 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -198,6 +198,7 @@ public NetworkPublication( lastSenderPosition = senderPosition.get(); cleanPosition = lastSenderPosition; + timeOfLastActivityNs = nowNs; } public void close() @@ -565,8 +566,8 @@ private void checkForBlockedPublisher(final long timeNs, final long senderPositi { if (senderPosition == lastSenderPosition) { - if (producerPosition() > senderPosition && - timeNs > (timeOfLastActivityNs + unblockTimeoutNs)) + if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs) && + producerPosition() > senderPosition) { if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition)) {