Skip to content

Commit

Permalink
Update fetcher with global periodic RTO
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-ritik committed Jun 13, 2019
1 parent 0360ad9 commit 2067b9c
Showing 1 changed file with 76 additions and 54 deletions.
130 changes: 76 additions & 54 deletions src/net/named_data/jndn/util/SegmentFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public static class Options {
public double aiStep = 1.0;
// multiplicative decrease coefficient
public double mdCoef = 0.5;
// disable Conservative Window Adaptation
// interval for checking retransmission timer in millisecond
public int rtoCheckInterval = 10;
// disable Conservative Window Adaptation
public boolean disableCwa = false;
// reduce cwnd_ to initCwnd when loss event occurs
public boolean resetCwndToInit = false;
Expand All @@ -149,14 +151,13 @@ class PendingSegment {
public SegmentState state;
public long sendTime;
public long pendingInterestId;
public ScheduledFuture scheduledFuture;
public long rto;

public PendingSegment(SegmentState state, long sendTime, long pendingInterestId,
ScheduledFuture scheduledFuture) {
public PendingSegment(SegmentState state, long sendTime, long pendingInterestId, long rto) {
this.state = state;
this.sendTime = sendTime;
this.pendingInterestId = pendingInterestId;
this.scheduledFuture = scheduledFuture;
this.rto = rto;
}
}

Expand Down Expand Up @@ -349,17 +350,19 @@ public boolean verifySegment(Data data) {
}

private void construct(Interest baseInterest) {
scheduledThreadPoolExecutor_.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor_. setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
rttEstimator_ = new RttEstimator(options_.rttOptions);
cwnd_ = options_.initCwnd;
ssThresh_ = options_.initSsthresh;
timeLastSegmentReceived_ = System.currentTimeMillis();
fetchFirstSegment(baseInterest, false);
baseInterest_ = baseInterest;

fetchFirstSegment(false);

face_.callLater(options_.rtoCheckInterval, rtoTimeoutRunnable_);
}

private void fetchFirstSegment(Interest baseInterest, boolean isRetransmission) {
Interest interest = new Interest(baseInterest);
private void fetchFirstSegment(boolean isRetransmission) {
Interest interest = new Interest(baseInterest_);
interest.setCanBePrefix(true);
interest.setMustBeFresh(true);
interest.setInterestLifetimeMilliseconds(options_.interestLifetime);
Expand All @@ -380,7 +383,7 @@ private void fetchFirstSegment(Interest baseInterest, boolean isRetransmission)
}
}

private void fetchSegmentsInWindow(Interest originalInterest) {
private void fetchSegmentsInWindow() {
if (checkAllSegmentsReceived()) {
// All segments have been retrieved
finalizeFetch();
Expand Down Expand Up @@ -411,7 +414,7 @@ private void fetchSegmentsInWindow(Interest originalInterest) {

for (Map.Entry<Long, Boolean> segment : segmentsToRequest.entrySet()) {
// Start with the original Interest to preserve any special selectors.
Interest interest = new Interest(originalInterest);
Interest interest = new Interest(baseInterest_);
interest.setName(versionedDataName_.getPrefix(-1).appendSegment(segment.getKey()));
interest.setCanBePrefix(false);
interest.setMustBeFresh(false);
Expand All @@ -437,32 +440,22 @@ private void sendInterest(long segNum, final Interest interest, boolean isRetran
long pendingInterestId = face_.expressInterest(interest, this, this, this);
++nSegmentsInFlight_;

Runnable timeoutRunnable = new Runnable() {
@Override
public void run() {
OnRTO(interest);
}
};

ScheduledFuture scheduledFuture =scheduledThreadPoolExecutor_.schedule(timeoutRunnable, timeout, TimeUnit.MILLISECONDS);

if (isRetransmission) {
updateRetransmittedSegment(segNum, pendingInterestId, scheduledFuture);
updateRetransmittedSegment(segNum, pendingInterestId, timeout);
return;
}

pendingSegments_.put(segNum, new PendingSegment(SegmentState.FirstInterest,
System.currentTimeMillis() ,pendingInterestId, scheduledFuture));
System.currentTimeMillis() ,pendingInterestId, timeout));
highInterest_ = segNum;
}

private void updateRetransmittedSegment(long segNum, final long pendingInterestId, ScheduledFuture scheduledFuture) {
private void updateRetransmittedSegment(long segNum, final long pendingInterestId, long rtoTimeout) {
PendingSegment pendingSegmentIt = pendingSegments_.get(segNum);
pendingSegmentIt.state = SegmentState.Retransmitted;
pendingSegmentIt.pendingInterestId = pendingInterestId;
pendingSegmentIt.sendTime = System.currentTimeMillis();
pendingSegmentIt.scheduledFuture.cancel(false);
pendingSegmentIt.scheduledFuture = scheduledFuture;
pendingSegmentIt.rto = rtoTimeout;
}

private int getEstimatedRto() {
Expand Down Expand Up @@ -548,9 +541,6 @@ public void onData(final Interest originalInterest, Data data) {
pendingSegmentIt = findFirstEntry();
}

PendingSegment ps = pendingSegments_.get(pendingSegmentIt);
ps.scheduledFuture.cancel(false);

if (validatorKeyChain_ != null) {
try {
final SegmentFetcher thisSegmentFetcher = this;
Expand Down Expand Up @@ -674,7 +664,7 @@ private void onVerified(Data data, Interest originalInterest, long pendingSegmen
} else {
windowIncrease();
}
fetchSegmentsInWindow(originalInterest);
fetchSegmentsInWindow();
}

}
Expand Down Expand Up @@ -725,7 +715,7 @@ public void onNetworkNack(Interest interest, NetworkNack networkNack) {
switch (networkNack.getReason()) {
case DUPLICATE:
case CONGESTION:
afterNackOrTimeout(interest);
afterNack(interest);
break;
default:
try {
Expand Down Expand Up @@ -760,14 +750,61 @@ public void onTimeout(Interest interest) {
}
}

private void OnRTO(Interest interest) {
if (shouldStop()) return;
private Runnable rtoTimeoutRunnable_ = new Runnable() {
@Override
public void run() {
if (shouldStop()) return;

boolean hasTimeout = false;

afterNackOrTimeout(interest);
for (Map.Entry<Long, PendingSegment> entry : pendingSegments_.entrySet()) {
PendingSegment ps = entry.getValue();
if (ps.state != SegmentState.InRetxQueue) { // skip segments already in the retx queue
long timeElapsed = System.currentTimeMillis() - ps.sendTime;
if (timeElapsed > ps.rto) { // timer expired?
hasTimeout = true;
enqueueForRetransmission(entry.getKey());
}
}
}

if (hasTimeout) {
afterNackOrTimeout();
}

// schedule the next check after predefined interval
face_.callLater(options_.rtoCheckInterval, rtoTimeoutRunnable_ );

}
};

private boolean enqueueForRetransmission(Long segmentNumber) {
if (pendingSegments_.containsKey(segmentNumber)) {
// Cancel timeout event and set status to InRetxQueue
PendingSegment pendingSegmentIt = pendingSegments_.get(segmentNumber);
pendingSegmentIt.state = SegmentState.InRetxQueue;
nSegmentsInFlight_--;
} else return false;

if (receivedSegments_.size() != 0) {
retxQueue_.offer(segmentNumber);
}

return true;
}

private void afterNackOrTimeout() {
rttEstimator_.backoffRto();
if (receivedSegments_.size() == 0) {
// Resend first Interest (until maximum receive timeout exceeded)
fetchFirstSegment(true);
} else {
windowDecrease();
fetchSegmentsInWindow();
}
}

private void afterNackOrTimeout(Interest interest) {
private void afterNack(Interest interest) {
if (System.currentTimeMillis() >= timeLastSegmentReceived_ + options_.maxTimeout) {
// Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
try {
Expand Down Expand Up @@ -796,22 +833,10 @@ private void afterNackOrTimeout(Interest interest) {

rttEstimator_.backoffRto();

if (pendingSegments_.containsKey(currentSegment)) {
// Cancel timeout event and set status to InRetxQueue
PendingSegment pendingSegmentIt = pendingSegments_.get(currentSegment);
pendingSegmentIt.state = SegmentState.InRetxQueue;
pendingSegmentIt.scheduledFuture.cancel(false);
nSegmentsInFlight_--;
} else return;
if(!enqueueForRetransmission(currentSegment))
return;

if (receivedSegments_.size() == 0) {
// Resend first Interest (until maximum receive timeout exceeded)
fetchFirstSegment(interest, true);
} else {
windowDecrease();
retxQueue_.offer(currentSegment);
fetchSegmentsInWindow(interest);
}
afterNackOrTimeout();
}

public boolean isStopped() {
Expand Down Expand Up @@ -842,7 +867,6 @@ private boolean shouldStop() {
private void clean() {
pendingSegments_.clear(); // cancels pending Interests and timeout events
receivedSegments_.clear(); // remove the received segments
scheduledThreadPoolExecutor_.shutdownNow();
}

/**
Expand All @@ -862,12 +886,10 @@ private void clean() {
private final Face face_;
private RttEstimator rttEstimator_;

private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor_ =
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);

private long highData_ = 0;
private long recPoint_ = 0;
private long highInterest_ = 0;
private Interest baseInterest_;
private int nSegmentsInFlight_ = 0;
private long nSegments_ = -1;
private Map<Long, PendingSegment> pendingSegments_ = new HashMap();
Expand Down

0 comments on commit 2067b9c

Please sign in to comment.