Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of spurious wakeups in BufferManagerReadStream #41

Merged
merged 2 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import java.nio.ByteBuffer;

import com.sun.corba.ee.spi.orb.ORBData;
import com.sun.corba.ee.spi.transport.ByteBufferPool;
import com.sun.corba.ee.spi.orb.ORB;
import com.sun.corba.ee.spi.logging.ORBUtilSystemException;
Expand All @@ -21,6 +22,7 @@
import com.sun.corba.ee.spi.trace.Transport;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

import org.glassfish.pfl.tf.spi.annotation.InfoMethod;

Expand Down Expand Up @@ -79,6 +81,7 @@ private void underflowMessage(String msg, int rid) {
public ByteBuffer underflow(ByteBuffer byteBuffer) {

ByteBuffer result;
long startNanos = System.nanoTime();

synchronized (fragmentQueue) {

Expand All @@ -87,6 +90,11 @@ public ByteBuffer underflow(ByteBuffer byteBuffer) {
throw new RequestCanceledException(cancelReqId);
}

ORBData orbData = orb.getORBData();
int timeoutMillis = orbData.fragmentReadTimeout();
long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
long waitNanos = timeoutNanos;

while (fragmentQueue.size() == 0) {

if (endOfStream) {
Expand All @@ -95,12 +103,17 @@ public ByteBuffer underflow(ByteBuffer byteBuffer) {

boolean interrupted = false;
try {
fragmentQueue.wait(orb.getORBData().fragmentReadTimeout());
orbData.waitNanos(fragmentQueue, waitNanos);
} catch (InterruptedException e) {
interrupted = true;
}

if (!interrupted && fragmentQueue.size() == 0) {
// be robust against spurious wakeups: only throw timeout exception if time has really elapsed
// or if unable to measure elapsed time because the clock went backwards
long elapsedNanos = System.nanoTime() - startNanos;
waitNanos = elapsedNanos < 0 ? 0L : timeoutNanos - elapsedNanos;

if (!interrupted && waitNanos <= 0 && fragmentQueue.size() == 0) {
throw wrapper.bufferReadManagerTimeout();
}

Expand Down
6 changes: 6 additions & 0 deletions orbmain/src/main/java/com/sun/corba/ee/spi/orb/ORBData.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import org.glassfish.pfl.basic.contain.Pair ;

import java.util.concurrent.TimeUnit;

// Which attributes should become setters? NOT everything, but only
// those that we think might actually be useful to set. This may change
// over time. On the other hande, essentially everything should be readable.
Expand Down Expand Up @@ -381,6 +383,10 @@ public void setIIOPPrimaryToContactInfo(
@ManagedAttribute
@Description( "True if ORBD should not be used in this ORB instance")
public boolean disableORBD() ;

default void waitNanos(Object obj, long waitNanos) throws InterruptedException {
TimeUnit.NANOSECONDS.timedWait(obj, waitNanos);
}
}

// End of file.
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,17 @@ static abstract class ORBDataFake implements ORBData {

@Override
public int fragmentReadTimeout() {
if (asynchronousAction != null) asynchronousAction.exec();
return 1;
}

@Override
public void waitNanos(Object obj, long waitNanos) throws InterruptedException {
if (asynchronousAction != null) {
asynchronousAction.exec();
}
ORBData.super.waitNanos(obj, waitNanos);
}

@Override
public int getGIOPBuffMgrStrategy(GIOPVersion gv) {
return gv.equals(GIOPVersion.V1_0) ? BufferManagerFactory.GROW : BufferManagerFactory.STREAM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ public int fragmentReadTimeout() {
public int getMaxReadByteBufferSizeThreshold() {
return 500;
}

@Override
public void waitNanos(Object obj, long waitNanos) throws InterruptedException {
ORBData.super.waitNanos(obj, waitNanos);
}
}

static abstract class OrbFake extends ORB {
Expand Down