diff --git a/src/net/named_data/jndn/util/RttEstimator.java b/src/net/named_data/jndn/util/RttEstimator.java new file mode 100644 index 00000000..74e96ae7 --- /dev/null +++ b/src/net/named_data/jndn/util/RttEstimator.java @@ -0,0 +1,136 @@ +package net.named_data.jndn.util; + +/** + * RTTEstimator is a utility class which uses Round-Trip times to calculates retransmission timeout + * + * This class implements the "Mean-Deviation" RTT estimator, as discussed in RFC 6298, + * with the modifications to RTO calculation described in RFC 7323 Appendix G. + */ + +public class RttEstimator { + + public static class Options { + + public double alpha = 0.125; // weight of exponential moving average for smoothed RTT + public double beta = 0.25; // weight of exponential moving average for RTT variation + public double initialRto = 1000.0; // initial RTO value in milliseconds + public double minRto = 200.0; // lower bound of RTO in milliseconds + public double maxRto = 60000.0; // upper bound of RTO in milliseconds + public int k = 4; // RTT variation multiplier used when calculating RTO + public int rtoBackoffMultiplier = 2; // RTO multiplier used in backoff operation + + } + + /** + * Creates an RTT estimator. + * + * Configures the RTT estimator with the default parameters. + */ + public RttEstimator() { + this(new Options()); + } + + /** + * Create an RTT Estimator + * + * Configures the RTT Estimator + * @param options_ Parameters for configuration. + */ + RttEstimator(Options options_) { + this.options_ = options_; + rto_ = options_.initialRto; + } + + /** + * Record a new RTT measurement. + * + * @param rtt the sampled RTT + * @param nExpectedSamples number of expected samples, must be greater than 0. + * It should be set to the current number of in-flight Interests. Please + * refer to Appendix G of RFC 7323 for details. + * NOTE: Do not call this function with RTT samples from retransmitted Interests + * (per Karn's algorithm). + */ + void + addMeasurement(double rtt, int nExpectedSamples) { + if (nRttSamples_ == 0) { // first measurement + sRtt_ = rtt; + rttVar_ = sRtt_ / 2; + } + else { + double alpha = options_.alpha / nExpectedSamples; + double beta = options_.beta / nExpectedSamples; + rttVar_ = (1 - beta) * rttVar_ + beta * Math.abs(sRtt_ - rtt); + sRtt_ = (1 - alpha) * sRtt_ + alpha * rtt; + } + + rto_ = sRtt_ + options_.k * rttVar_; + rto_ = clamp(rto_, options_.minRto, options_.maxRto); + + rttAvg_ = (nRttSamples_ * rttAvg_ + rtt) / (nRttSamples_ + 1); + rttMax_ = Math.max(rtt, rttMax_); + rttMin_ = Math.max(rtt, rttMin_); + nRttSamples_++; + } + + /** + * Backoff RTO by a factor of Options.rtoBackoffMultiplier. + */ + void + backoffRto() + { + rto_ = clamp(rto_ * options_.rtoBackoffMultiplier, + options_.minRto, options_.maxRto); + } + + + private static double clamp + (double val, double min, double max) { + return Math.max(min, Math.min(max, val)); + } + + /** + * Returns the estimated RTO value. + */ + double + getEstimatedRto() + { + return rto_; + } + + /** + * Returns the minimum RTT observed. + */ + double + getMinRtt() + { + return rttMin_; + } + + /** + * Returns the maximum RTT observed. + */ + double + getMaxRtt() + { + return rttMax_; + } + + /** + * Returns the average RTT. + */ + double + getAvgRtt() + { + return rttAvg_; + } + + private final Options options_; + private double sRtt_ = Double.NaN; // smoothed round-trip time + private double rttVar_ = Double.NaN; // round-trip time variation + private double rto_ = 0; // retransmission timeout + private double rttMin_ = Double.MAX_VALUE; + private double rttMax_ = Double.MIN_VALUE; + private double rttAvg_ = 0.0; + private long nRttSamples_ = 0; // number of RTT samples +} diff --git a/src/net/named_data/jndn/util/SegmentFetcher.java b/src/net/named_data/jndn/util/SegmentFetcher.java index a0497bec..2dbbe9c5 100644 --- a/src/net/named_data/jndn/util/SegmentFetcher.java +++ b/src/net/named_data/jndn/util/SegmentFetcher.java @@ -1,5 +1,6 @@ /** * Copyright (C) 2015-2019 Regents of the University of California. + * * @author: Jeff Thompson * @author: From ndn-cxx util/segment-fetcher https://github.com/named-data/ndn-cxx * @@ -22,50 +23,55 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; import java.util.logging.Level; import java.util.logging.Logger; import net.named_data.jndn.Data; import net.named_data.jndn.Face; import net.named_data.jndn.Interest; import net.named_data.jndn.Name; +import net.named_data.jndn.NetworkNack; import net.named_data.jndn.OnData; +import net.named_data.jndn.OnNetworkNack; import net.named_data.jndn.OnTimeout; import net.named_data.jndn.encoding.EncodingException; import net.named_data.jndn.security.KeyChain; -import net.named_data.jndn.security.OnVerified; import net.named_data.jndn.security.OnDataValidationFailed; +import net.named_data.jndn.security.OnVerified; +import net.named_data.jndn.security.ValidatorConfigError; +import net.named_data.jndn.security.v2.*; /** * SegmentFetcher is a utility class to fetch the latest version of segmented data. * - * SegmentFetcher assumes that the data is named /{prefix}/{version}/{segment}, + * SegmentFetcher assumes that segments in the object are named `///`, * where: - * - {prefix} is the specified name prefix, - * - {version} is an unknown version that needs to be discovered, and - * - {segment} is a segment number. (The number of segments is unknown and is - * controlled by the `FinalBlockId` field in at least the last Data packet. - * - * The following logic is implemented in SegmentFetcher: - * - * 1. Express the first Interest to discover the version: - * - * Interest: /{prefix}?ChildSelector=1&MustBeFresh=true + * - `` is the specified prefix, + * - `` is an unknown version that needs to be discovered, and + * - `` is a segment number (the number of segments in the object is unknown until a Data + * packet containing the `FinalBlockId` field is received). * - * 2. Infer the latest version of the Data: {version} = Data.getName().get(-2) + * SegmentFetcher implements the following logic: * - * 3. If the segment number in the retrieved packet == 0, go to step 5. + * 1. Express an Interest to discover the latest version of the object: * - * 4. Send an Interest for segment 0: + * Interest: `/?ndn.CanBePrefix=true&ndn.MustBeFresh=true` * - * Interest: /{prefix}/{version}/{segment=0} + * 2. Infer the latest version of the object: ` = Data.getName().get(-2)` * - * 5. Keep sending Interests for the next segment while the retrieved Data does - * not have a FinalBlockId or the FinalBlockId != Data.getName().get(-1). + * 3. Keep sending Interests for future segments until an error occurs or the number of segments + * indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start + * at segment 1 if segment 0 was received in response to the Interest expressed in step 2; + * otherwise, retrieval will start at segment 0. By default, congestion control will be used to + * manage the Interest window size. Interests expressed in this step will follow this Name + * format: * - * Interest: /{prefix}/{version}/{segment=(N+1))} + * Interest: `///` * - * 6. Call the OnComplete callback with a blob that concatenates the content + * 4. Call the OnComplete callback with a blob that concatenates the content * from all the segmented objects. * * If an error occurs during the fetching process, the OnError callback is called @@ -77,6 +83,7 @@ * - `SEGMENT_VERIFICATION_FAILED`: if any retrieved segment fails * the user-provided VerifySegment callback or KeyChain verifyData. * - `IO_ERROR`: for I/O errors when sending an Interest. + * - 'NACK_ERROR': Unknown network error occurred, * * In order to validate individual segments, a KeyChain needs to be supplied. * If verifyData fails, the fetching process is aborted with @@ -97,347 +104,775 @@ * ... * }}); */ -public class SegmentFetcher implements OnData, OnDataValidationFailed, OnTimeout { - public enum ErrorCode { - INTEREST_TIMEOUT, - DATA_HAS_NO_SEGMENT, - SEGMENT_VERIFICATION_FAILED, - IO_ERROR - } - - public interface OnComplete { - void onComplete(Blob content); - } - - public interface VerifySegment { - boolean verifySegment(Data data); - } - - public interface OnError { - void onError(ErrorCode errorCode, String message); - } - - /** - * DontVerifySegment may be used in fetch to skip validation of Data packets. - */ - public static final VerifySegment DontVerifySegment = new VerifySegment() { - public boolean verifySegment(Data data) { - return true; - }}; - - /** - * Initiate segment fetching. For more details, see the documentation for - * the class. - * @param face This calls face.expressInterest to fetch more segments. - * @param baseInterest An Interest for the initial segment of the requested - * data, where baseInterest.getName() has the name prefix. - * This interest may include a custom InterestLifetime and selectors that will - * propagate to all subsequent Interests. The only exception is that the - * initial Interest will be forced to include selectors "ChildSelector=1" and - * "MustBeFresh=true" which will be turned off in subsequent Interests. - * @param verifySegment When a Data packet is received this calls - * verifySegment.verifySegment(data). If it returns false then abort fetching - * and call onError.onError with ErrorCode.SEGMENT_VERIFICATION_FAILED. If - * data validation is not required, use DontVerifySegment. - * NOTE: The library will log any exceptions thrown by this callback, but for - * better error handling the callback should catch and properly handle any - * exceptions. - * @param onComplete When all segments are received, call - * onComplete.onComplete(content) where content is the concatenation of the - * content of all the segments. - * NOTE: The library will log any exceptions thrown by this callback, but for - * better error handling the callback should catch and properly handle any - * exceptions. - * @param onError Call onError.onError(errorCode, message) for timeout or an - * error processing segments. - * NOTE: The library will log any exceptions thrown by this callback, but for - * better error handling the callback should catch and properly handle any - * exceptions. - */ - public static void - fetch - (Face face, Interest baseInterest, VerifySegment verifySegment, - OnComplete onComplete, OnError onError) - { - new SegmentFetcher(face, null, verifySegment, onComplete, onError) - .fetchFirstSegment(baseInterest); - } - - /** - * Initiate segment fetching. For more details, see the documentation for - * the class. - * @param face This calls face.expressInterest to fetch more segments. - * @param baseInterest An Interest for the initial segment of the requested - * data, where baseInterest.getName() has the name prefix. - * This interest may include a custom InterestLifetime and selectors that will - * propagate to all subsequent Interests. The only exception is that the - * initial Interest will be forced to include selectors "ChildSelector=1" and - * "MustBeFresh=true" which will be turned off in subsequent Interests. - * @param validatorKeyChain When a Data packet is received this calls - * validatorKeyChain.verifyData(data). If validation fails then abort - * fetching and call onError with SEGMENT_VERIFICATION_FAILED. This does not - * make a copy of the KeyChain; the object must remain valid while fetching. - * If validatorKeyChain is null, this does not validate the data packet. - * @param onComplete When all segments are received, call - * onComplete.onComplete(content) where content is the concatenation of the - * content of all the segments. - * NOTE: The library will log any exceptions thrown by this callback, but for - * better error handling the callback should catch and properly handle any - * exceptions. - * @param onError Call onError.onError(errorCode, message) for timeout or an - * error processing segments. - * NOTE: The library will log any exceptions thrown by this callback, but for - * better error handling the callback should catch and properly handle any - * exceptions. - */ - public static void - fetch +public class SegmentFetcher implements OnData, OnDataValidationFailed, OnTimeout, OnNetworkNack { + + public static class Options { + + // if true, window size is kept at `initCwnd` + public boolean useConstantCwnd = false; + // lifetime of sent Interests in milliseconds - independent of Interest timeout + public int interestLifetime = 4000; + // initial congestion window size + public double initCwnd = 1.0; + // maximum allowed time between successful receipt of segments in millisecond + public int maxTimeout = 60000; + // initial slow start threshold + public double initSsthresh = Double.MAX_VALUE; + // additive increase step (in segments) + public double aiStep = 1.0; + // multiplicative decrease coefficient + public double mdCoef = 0.5; + // disable Conservative Window Adaptation + public boolean disableCwa = false; + // reduce cwnd_ to initCwnd when loss event occurs + public boolean resetCwndToInit = false; + // disable window decrease after congestion mark received + public boolean ignoreCongMarks = false; + // max window size for sending interests + public int maxWindowSize = Integer.MAX_VALUE; + // if true, Interest timeout is kept at `maxTimeout` + public boolean useConstantInterestTimeout = false; + // options for RTT estimator + public RttEstimator.Options rttOptions = new RttEstimator.Options(); + + } + + private enum SegmentState { + FirstInterest, // the first Interest for this segment has been sent + InRetxQueue, // the segment is awaiting Interest retransmission + Retransmitted // one or more retransmitted Interests have been sent for this segment + } + + class PendingSegment { + + public SegmentState state; + public long sendTime; + public long pendingInterestId; + public boolean inFlight; + + public PendingSegment(SegmentState state, long sendTime, long pendingInterestId, boolean inFlight) { + this.state = state; + this.sendTime = sendTime; + this.pendingInterestId = pendingInterestId; + this.inFlight = inFlight; + } + } + + public enum ErrorCode { + INTEREST_TIMEOUT, + DATA_HAS_NO_SEGMENT, + SEGMENT_VERIFICATION_FAILED, + IO_ERROR, + NACK_ERROR + } + + public interface OnComplete { + void onComplete(Blob content); + } + + public interface VerifySegment { + boolean verifySegment(Data data); + } + + public interface OnError { + void onError(SegmentFetcher.ErrorCode errorCode, String message); + } + + /** + * DontVerifySegment may be used in fetch to skip validation of Data packets. + */ + public static final SegmentFetcher.VerifySegment DontVerifySegment = new SegmentFetcher.VerifySegment() { + public boolean verifySegment(Data data) { + return true; + } + }; + + /** + * Initiate segment fetching. For more details, see the documentation for + * the class. + * @param face This calls face.expressInterest to fetch more segments. + * @param baseInterest Interest for the initial segment of requested data. + * This interest may include a custom InterestLifetime and parameters that + * will propagate to all subsequent Interests. The only exception is that the + * initial Interest will be forced to include the "CanBePrefix=true" and + * "MustBeFresh=true" parameters, which will not be included in subsequent + * interests. + * @param options A set of options to control the sending and receiving of packets + * in the AIMD pipelining. + * @param verifySegment When a Data packet is received this calls + * verifySegment.verifySegment(data). If it returns false then abort fetching + * and call onError.onError with ErrorCode.SEGMENT_VERIFICATION_FAILED. If + * data validation is not required, use DontVerifySegment. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + * @param onComplete When all segments are received, call + * onComplete.onComplete(content) where content is the concatenation of the + * content of all the segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + * @param onError Call onError.onError(errorCode, message) for timeout or an + * error processing segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + */ + public static void fetch + (Face face, Interest baseInterest, Options options, SegmentFetcher.VerifySegment verifySegment, + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + new SegmentFetcher(face, null, options, null, verifySegment, onComplete, onError) + .construct(baseInterest); + } + + public static void fetch + (Face face, Interest baseInterest, SegmentFetcher.VerifySegment verifySegment, + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + fetch(face, baseInterest, new Options(), verifySegment, onComplete, onError); + } + + /** + * Initiate segment fetching. For more details, see the documentation for + * the class. + * @param face This calls face.expressInterest to fetch more segments. + * @param baseInterest Interest for the initial segment of requested data. + * This interest may include a custom InterestLifetime and parameters that + * will propagate to all subsequent Interests. The only exception is that the + * initial Interest will be forced to include the "CanBePrefix=true" and + * "MustBeFresh=true" parameters, which will not be included in subsequent + * interests. + * @param options A set of options to control the sending and receiving of packets + * in the AIMD pipelining. + * @param validatorKeyChain When a Data packet is received this calls + * validatorKeyChain.verifyData(data). If validation fails then abort + * fetching and call onError with SEGMENT_VERIFICATION_FAILED. This does not + * make a copy of the KeyChain; the object must remain valid while fetching. + * If validatorKeyChain is null, this does not validate the data packet. + * @param onComplete When all segments are received, call + * onComplete.onComplete(content) where content is the concatenation of the + * content of all the segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + * @param onError Call onError.onError(errorCode, message) for timeout or an + * error processing segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + */ + public static void fetch + (Face face, Interest baseInterest, Options options, KeyChain validatorKeyChain, + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + new SegmentFetcher + (face, validatorKeyChain, options, null, DontVerifySegment, onComplete, onError) + .construct(baseInterest); + } + + public static void fetch (Face face, Interest baseInterest, KeyChain validatorKeyChain, - OnComplete onComplete, OnError onError) - { - new SegmentFetcher - (face, validatorKeyChain, DontVerifySegment, onComplete, onError) - .fetchFirstSegment(baseInterest); - } - - /** - * Create a new SegmentFetcher to use the Face. See the static fetch method - * for details. If validatorKeyChain is not null, use it and ignore - * verifySegment. After creating the SegmentFetcher, call fetchFirstSegment. - * @param face This calls face.expressInterest to fetch more segments. - * @param validatorKeyChain If this is not null, use its verifyData instead of - * the verifySegment callback. - * @param verifySegment When a Data packet is received this calls - * verifySegment.verifySegment(data). If it returns false then abort fetching - * and call onError.onError with ErrorCode.SEGMENT_VERIFICATION_FAILED. - * @param onComplete When all segments are received, call - * onComplete.onComplete(content) where content is the concatenation of the - * content of all the segments. - * @param onError Call onError.onError(errorCode, message) for timeout or an - * error processing segments. - */ - private SegmentFetcher - (Face face, KeyChain validatorKeyChain, VerifySegment verifySegment, - OnComplete onComplete, OnError onError) - { - face_ = face; - validatorKeyChain_ = validatorKeyChain; - verifySegment_ = verifySegment; - onComplete_ = onComplete; - onError_ = onError; - } - - private void - fetchFirstSegment(Interest baseInterest) - { - Interest interest = new Interest(baseInterest); - interest.setChildSelector(1); - interest.setMustBeFresh(true); - - try { - face_.expressInterest(interest, this, this); - } catch (IOException ex) { - try { - onError_.onError - (ErrorCode.IO_ERROR, "I/O error fetching the first segment " + ex); - } catch (Throwable exception) { - logger_.log(Level.SEVERE, "Error in onError", exception); - } - } - } - - private void - fetchNextSegment(Interest originalInterest, Name dataName, long segment) - { - // Start with the original Interest to preserve any special selectors. - Interest interest = new Interest(originalInterest); - // Changing a field clears the nonce so that the library will generate a new one. - interest.setChildSelector(0); - interest.setMustBeFresh(false); - interest.setName(dataName.getPrefix(-1).appendSegment(segment)); - try { - face_.expressInterest(interest, this, this); - } catch (IOException ex) { - try { - onError_.onError - (ErrorCode.IO_ERROR, "I/O error fetching the next segment " + ex); - } catch (Throwable exception) { - logger_.log(Level.SEVERE, "Error in onError", exception); - } - } - } - - public void - onData(final Interest originalInterest, Data data) - { - if (validatorKeyChain_ != null) { - try { - final SegmentFetcher thisSegmentFetcher = this; - validatorKeyChain_.verifyData - (data, - new OnVerified() { - public void onVerified(Data localData) { - thisSegmentFetcher.onVerified(localData, originalInterest); - } - }, - this); - } catch (Throwable ex) { + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + fetch(face, baseInterest, new Options(), validatorKeyChain, onComplete, + onError); + } + + /** + * Initiate segment fetching. For more details, see the documentation for + * the class. + * @param face This calls face.expressInterest to fetch more segments. + * @param baseInterest Interest for the initial segment of requested data. + * This interest may include a custom InterestLifetime and parameters that + * will propagate to all subsequent Interests. The only exception is that the + * initial Interest will be forced to include the "CanBePrefix=true" and + * "MustBeFresh=true" parameters, which will not be included in subsequent + * interests. + * @param options A set of options to control the sending and receiving of packets + * in the AIMD pipelining. + * @param validator The Validator, the fetcher will use to validate data. + * The caller must ensure the validator remains valid until either #onComplete + * or #onError has been signaled. + * @param onComplete When all segments are received, call + * onComplete.onComplete(content) where content is the concatenation of the + * content of all the segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + * @param onError Call onError.onError(errorCode, message) for timeout or an + * error processing segments. + * NOTE: The library will log any exceptions thrown by this callback, but for + * better error handling the callback should catch and properly handle any + * exceptions. + */ + public static void fetch + (Face face, Interest baseInterest, Options options, Validator validator, + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + new SegmentFetcher(face, null, options, validator, null, onComplete, onError) + .construct(baseInterest); + } + + public static void fetch + (Face face, Interest baseInterest, Validator validator, + SegmentFetcher.OnComplete onComplete, SegmentFetcher.OnError onError) { + new SegmentFetcher(face, null, new Options(), validator, null, onComplete, onError) + .construct(baseInterest); + } + + /** + * Create a new SegmentFetcher to use the Face. See the static fetch method + * for details. If validatorKeyChain is not null, use it and ignore + * verifySegment. After creating the SegmentFetcher, call fetchFirstSegment. + * + * @param face This calls face.expressInterest to fetch more segments. + * @param validatorKeyChain If this is not null, use its verifyData instead of + * the verifySegment callback. + * @param verifySegment When a Data packet is received this calls + * verifySegment.verifySegment(data). If it returns false then abort fetching + * and call onError.onError with ErrorCode.SEGMENT_VERIFICATION_FAILED. + * @param onComplete When all segments are received, call + * onComplete.onComplete(content) where content is the concatenation of the + * content of all the segments. + * @param onError Call onError.onError(errorCode, message) for timeout or an + * error processing segments. + */ + private SegmentFetcher + (Face face, KeyChain validatorKeyChain, Options options, Validator validator, + SegmentFetcher.VerifySegment verifySegment, SegmentFetcher.OnComplete onComplete, + SegmentFetcher.OnError onError) { + this.options_ = options; + face_ = face; + validator_ = validator; + validatorKeyChain_ = validatorKeyChain; + verifySegment_ = verifySegment; + onComplete_ = onComplete; + onError_ = onError; + } + + private void construct(Interest baseInterest) { + rttEstimator_ = new RttEstimator(options_.rttOptions); + cwnd_ = options_.initCwnd; + ssThresh_ = options_.initSsthresh; + timeLastSegmentReceived_ = System.currentTimeMillis(); + fetchFirstSegment(baseInterest, false); + } + + private void fetchFirstSegment(Interest baseInterest, boolean isRetransmission) { + Interest interest = new Interest(baseInterest); + interest.setCanBePrefix(true); + interest.setMustBeFresh(true); + if (isRetransmission) { + interest.refreshNonce(); + } + try { - onError_.onError - (ErrorCode.SEGMENT_VERIFICATION_FAILED, - "Error in KeyChain.verifyData " + ex.getMessage()); - } catch (Throwable ex2) { - logger_.log(Level.SEVERE, "Error in onError", ex2); + sendInterest(0, interest, isRetransmission); + + } catch (IOException ex) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.IO_ERROR, "I/O error fetching the first segment " + ex); + } catch (Throwable exception) { + logger_.log(Level.SEVERE, "Error in onError", exception); + } + } + } + + private void fetchSegmentsInWindow(Interest originalInterest) { + if (checkAllSegmentsReceived()) { + // All segments have been retrieved + finalizeFetch(); + return; } - } - } - else { - boolean verified = false; - try { - verified = verifySegment_.verifySegment(data); - } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in verifySegment", ex); - } - if (!verified) { + + double availableWindowSize = cwnd_ - nSegmentsInFlight_; + Map segmentsToRequest = new HashMap(); // The boolean indicates whether a retx or not + + while (availableWindowSize > 0) { + + if (!retxQueue_.isEmpty()) { + Long key = retxQueue_.element(); + retxQueue_.remove(); + segmentsToRequest.put(key, true); + } else if (nSegments_ == -1 || nextSegmentNum_ < nSegments_) { + if (receivedSegments_.containsKey(nextSegmentNum_)) { + // Don't request a segment a second time if received in response to first "discovery" Interest + nextSegmentNum_++; + continue; + } + if (allSegmentSent_) { + break; + } + segmentsToRequest.put(nextSegmentNum_++, false); + } else { + break; + } + availableWindowSize--; + } + + for (Map.Entry segment : segmentsToRequest.entrySet()) { + // Start with the original Interest to preserve any special selectors. + Interest interest = new Interest(originalInterest); + interest.setName(versionedDataName_.getPrefix(-1).appendSegment(segment.getKey())); + interest.setCanBePrefix(false); + interest.setMustBeFresh(false); + interest.refreshNonce(); + + try { + sendInterest(segment.getKey(), interest, segment.getValue()); + } catch (IOException ex) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.IO_ERROR, "I/O error fetching the next segment " + ex); + } catch (Throwable exception) { + logger_.log(Level.SEVERE, "Error in onError", exception); + } + } + } + } + + private void sendInterest(long segNum, final Interest interest, boolean isRetransmission) throws IOException { + int timeout = options_.useConstantInterestTimeout ? options_.interestLifetime : getEstimatedRto(); + interest.setInterestLifetimeMilliseconds(timeout); + + long pendingInterestId = face_.expressInterest(interest, this, this, this); + ++nSegmentsInFlight_; + + if (isRetransmission) { + updateRetransmittedSegment(segNum, pendingInterestId); + return; + } + + pendingSegments_.put(segNum, new PendingSegment(SegmentState.FirstInterest, + System.currentTimeMillis() ,pendingInterestId, true)); + highInterest_ = segNum; + } + + private void updateRetransmittedSegment(long segNum, final long pendingInterestId) { + PendingSegment pendingSegmentIt = pendingSegments_.get(segNum); + pendingSegmentIt.state = SegmentState.Retransmitted; + pendingSegmentIt.pendingInterestId = pendingInterestId; + pendingSegmentIt.sendTime = System.currentTimeMillis(); + pendingSegmentIt.inFlight = true; + } + + private int getEstimatedRto() { + // We don't want an Interest timeout greater than the maximum allowed timeout between the + // successful receipt of segments + return Math.min(options_.maxTimeout, (int) rttEstimator_.getEstimatedRto()); + } + + private Long findFirstEntry() { + Map.Entry o = (Map.Entry) pendingSegments_.entrySet().toArray()[0]; + return o.getKey(); + } + + private boolean checkAllSegmentsReceived() { + boolean haveReceivedAllSegments = false; + + if (nSegments_ != -1 && receivedSegments_.size() >= nSegments_) { + haveReceivedAllSegments = true; + // Verify that all segments in window have been received. If not, send Interests for missing segments. + for (long i = 0; i < nSegments_; i++) { + if (!receivedSegments_.containsKey(i)) { + retxQueue_.offer(i); + return false; + } + } + } + return haveReceivedAllSegments; + } + + private void finalizeFetch() { + // We are finished. + // Get the total size and concatenate to get content. + int totalSize = 0; + for (long i = 0; i < nSegments_; ++i) { + totalSize += (receivedSegments_.get(i)).size(); + } + + ByteBuffer content = ByteBuffer.allocate(totalSize); + for (long i = 0; i < nSegments_; ++i) { + if (receivedSegments_.get(i).size() != 0) + content.put((receivedSegments_.get(i)).buf()); + } + content.flip(); + clean(); + try { - onError_.onError - (ErrorCode.SEGMENT_VERIFICATION_FAILED, "Segment verification failed"); + onComplete_.onComplete(new Blob(content, false)); } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in onError", ex); + logger_.log(Level.SEVERE, "Error in onComplete", ex); } - return; - } - - onVerified(data, originalInterest); - } - } - - public void - onVerified(Data data, Interest originalInterest) - { - if (!endsWithSegmentNumber(data.getName())) { - // We don't expect a name without a segment number. Treat it as a bad packet. - try { - onError_.onError - (ErrorCode.DATA_HAS_NO_SEGMENT, - "Got an unexpected packet without a segment number: " + data.getName().toUri()); - } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in onError", ex); - } - } - else { - long currentSegment; - try { - currentSegment = data.getName().get(-1).toSegment(); - } - catch (EncodingException ex) { + } + + public void onData(final Interest originalInterest, Data data) { + if (shouldStop()) return; + + nSegmentsInFlight_--; + Name.Component currentSegmentComponent = data.getName().get(-1); + if (!currentSegmentComponent.isSegment()) { + onError_.onError + (SegmentFetcher.ErrorCode.DATA_HAS_NO_SEGMENT, "Data Name has no segment number"); + return; + } + + long currentSegment; try { - onError_.onError - (ErrorCode.DATA_HAS_NO_SEGMENT, - "Error decoding the name segment number " + - data.getName().get(-1).toEscapedString() + ": " + ex); - } catch (Throwable exception) { - logger_.log(Level.SEVERE, "Error in onError", exception); + currentSegment = currentSegmentComponent.toSegment(); + } catch (EncodingException e) { + onError_.onError(SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, "Data Name has no segment number"); + e.printStackTrace(); + return; + } + + // The first received Interest could have any segment ID + final long pendingSegmentIt; + if (receivedSegments_.size() > 0) { + if (receivedSegments_.containsKey(currentSegment)) + return; + pendingSegmentIt = currentSegment; + } else { + pendingSegmentIt = findFirstEntry(); } - return; - } - - long expectedSegmentNumber = contentParts_.size(); - if (currentSegment != expectedSegmentNumber) { - // Try again to get the expected segment. This also includes the case - // where the first segment is not segment 0. - fetchNextSegment(originalInterest, data.getName(), expectedSegmentNumber); - } - else { - // Save the content and check if we are finished. - contentParts_.add(data.getContent()); - - if (data.getMetaInfo().getFinalBlockId().getValue().size() > 0) { - long finalSegmentNumber; - try { - finalSegmentNumber = data.getMetaInfo().getFinalBlockId().toSegment(); - } - catch (EncodingException ex) { + + PendingSegment ps = pendingSegments_.get(pendingSegmentIt); + ps.inFlight = false; + + if (validatorKeyChain_ != null) { try { - onError_.onError - (ErrorCode.DATA_HAS_NO_SEGMENT, - "Error decoding the FinalBlockId segment number " + - data.getMetaInfo().getFinalBlockId().toEscapedString() + ": " + ex); - } catch (Throwable exception) { - logger_.log(Level.SEVERE, "Error in onError", exception); + final SegmentFetcher thisSegmentFetcher = this; + validatorKeyChain_.verifyData + (data, + new OnVerified() { + public void onVerified(Data localData) { + thisSegmentFetcher.onVerified(localData, originalInterest, pendingSegmentIt); + } + }, + this); + } catch (Throwable ex) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, + "Error in KeyChain.verifyData " + ex.getMessage()); + } catch (Throwable ex2) { + logger_.log(Level.SEVERE, "Error in onError", ex2); + } } + } else if(validator_ != null){ + try { + validator_.validate(data, new DataValidationSuccessCallback() { + @Override + public void successCallback(Data data) { + onVerified(data, originalInterest, pendingSegmentIt); + } + }, new DataValidationFailureCallback() { + @Override + public void failureCallback(Data data, ValidationError error) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, + "Segment verification failed"); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in onError", ex); + } + } + }); + } catch (CertificateV2.Error | ValidatorConfigError error) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, + "Error in KeyChain.verifyData " + error.getMessage()); + } catch (Throwable ex2) { + logger_.log(Level.SEVERE, "Error in onError", ex2); + } + } + } + else { + boolean verified = false; + try { + verified = verifySegment_.verifySegment(data); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in verifySegment", ex); + } + if (!verified) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, "Segment verification failed"); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in onError", ex); + } + return; + } + onVerified(data, originalInterest, pendingSegmentIt); + } + } + + private void onVerified(Data data, Interest originalInterest, long pendingSegmentIt) { + if (shouldStop()) return; + + if (!endsWithSegmentNumber(data.getName())) { + // We don't expect a name without a segment number. Treat it as a bad packet. + try { + onError_.onError + (SegmentFetcher.ErrorCode.DATA_HAS_NO_SEGMENT, + "Got an unexpected packet without a segment number: " + data.getName().toUri()); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in onError", ex); + } + } else { + long currentSegment; + try { + // It was verified in onData that the last Data name component is a segment number + currentSegment = data.getName().get(-1).toSegment(); + } catch (EncodingException ex) { + try { + onError_.onError + (SegmentFetcher.ErrorCode.DATA_HAS_NO_SEGMENT, + "Error decoding the name segment number " + + data.getName().get(-1).toEscapedString() + ": " + ex); + } catch (Throwable exception) { + logger_.log(Level.SEVERE, "Error in onError", exception); + } + return; + } + + // We update the last receive time here instead of in the segment received callback so that the + // transfer will not fail to terminate if we only received invalid Data packets. + timeLastSegmentReceived_ = System.currentTimeMillis(); + + if (pendingSegments_.get(pendingSegmentIt).state == SegmentState.FirstInterest) { + rttEstimator_.addMeasurement(timeLastSegmentReceived_ - pendingSegments_.get(pendingSegmentIt).sendTime, + Math.max(nSegmentsInFlight_ + 1, 1)); + } + + // Remove from pending segments map + pendingSegments_.remove(pendingSegmentIt); + + // Copy data in segment to temporary buffer + receivedSegments_.put(currentSegment, data.getContent()); + + if (receivedSegments_.size() == 1) { + versionedDataName_ = data.getName(); + if (currentSegment == 0) { + // We received the first segment in response, so we can increment the next segment number + nextSegmentNum_++; + } + } + + if (data.getMetaInfo().getFinalBlockId().getValue().size() > 0) { + try { + nSegments_ = data.getMetaInfo().getFinalBlockId().toSegment() + 1; + } catch (EncodingException ex) { + try { + onError_.onError + (ErrorCode.DATA_HAS_NO_SEGMENT, + "Error decoding the FinalBlockId segment number " + + data.getMetaInfo().getFinalBlockId().toEscapedString() + ": " + ex); + } catch (Throwable exception) { + logger_.log(Level.SEVERE, "Error in onError", exception); + } + return; + } + } + + if (highData_ < currentSegment) { + highData_ = currentSegment; + } + + if (data.getCongestionMark() > 0 && !options_.ignoreCongMarks) { + windowDecrease(); + } else { + windowIncrease(); + } + fetchSegmentsInWindow(originalInterest); + } + + } + + private void windowIncrease() { + if (options_.useConstantCwnd || cwnd_ == options_.maxWindowSize) { return; - } + } - if (currentSegment == finalSegmentNumber) { - // We are finished. + if (cwnd_ < ssThresh_) { + cwnd_ += options_.aiStep; // additive increase + } else { + cwnd_ += options_.aiStep / cwnd_; // congestion avoidance + } + } + + private void windowDecrease() { + if (options_.disableCwa || highData_ > recPoint_) { + recPoint_ = highInterest_; + + if (options_.useConstantCwnd) { + return; + } + + // Refer to RFC 5681, Section 3.1 for the rationale behind the code below + ssThresh_ = Math.max(MIN_SSTHRESH, cwnd_ * options_.mdCoef); // multiplicative decrease + cwnd_ = options_.resetCwndToInit ? options_.initCwnd : ssThresh_; + } + } + + public void onDataValidationFailed(Data data, String reason) { + if (shouldStop()) return; + + try { + onError_.onError + (SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED, + "Segment verification failed for " + data.getName().toUri() + + " . Reason: " + reason); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in onError", ex); + } + } - // Get the total size and concatenate to get content. - int totalSize = 0; - for (int i = 0; i < contentParts_.size(); ++i) - totalSize += ((Blob)contentParts_.get(i)).size(); - ByteBuffer content = ByteBuffer.allocate(totalSize); - for (int i = 0; i < contentParts_.size(); ++i) - content.put(((Blob)contentParts_.get(i)).buf()); - content.flip(); + @Override + public void onNetworkNack(Interest interest, NetworkNack networkNack) { + if (shouldStop()) return; + + switch (networkNack.getReason()) { + case DUPLICATE: + case CONGESTION: + afterNackOrTimeout(interest); + break; + default: + try { + onError_.onError + (ErrorCode.NACK_ERROR, + "Nack Error"); + } catch (Throwable ex) { + logger_.log(Level.SEVERE, "Error in onError", ex); + } + break; + } + } + + public void onTimeout(Interest interest) { + if (shouldStop()) return; + + long segNum; + try { + segNum = interest.getName().get(-1).toSegment(); + } catch (EncodingException e) { + e.printStackTrace(); + return; + } + if(pendingSegments_.containsKey(segNum) && pendingSegments_.get(segNum).inFlight){ + afterNackOrTimeout(interest); + } + } + private void afterNackOrTimeout(Interest interest) { + if (System.currentTimeMillis() >= timeLastSegmentReceived_ + options_.maxTimeout) { + // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments try { - onComplete_.onComplete(new Blob(content, false)); + onError_.onError + (ErrorCode.INTEREST_TIMEOUT, + "Time out for interest " + interest.getName().toUri()); } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in onComplete", ex); + logger_.log(Level.SEVERE, "Error in onError", ex); } return; - } } + Name.Component lastNameComponent = interest.getName().get(-1); + long currentSegment; + if (lastNameComponent.isSegment()) { + try { + currentSegment = lastNameComponent.toSegment(); + } catch (EncodingException e) { + e.printStackTrace(); + return; + } + + } else { + // First Interest + currentSegment = findFirstEntry(); + } + + rttEstimator_.backoffRto(); + + if (pendingSegments_.containsKey(currentSegment)) { + // Cancel timeout event and set status to InRetxQueue + PendingSegment pendingSegmentIt = pendingSegments_.get(currentSegment); + pendingSegmentIt.inFlight = false; + pendingSegmentIt.state = SegmentState.InRetxQueue; + nSegmentsInFlight_--; + } else return; + + if (receivedSegments_.size() == 0) { + // Resend first Interest (until maximum receive timeout exceeded) + fetchFirstSegment(interest, true); + } else { + windowDecrease(); + retxQueue_.offer(currentSegment); + fetchSegmentsInWindow(interest); + } + } + + public boolean isStopped() { + return stop_; + } + + /** + * Stop fetching packets and clear the received data. + */ + public void stop() { + stop_ = true; + } + + /** + * Check if we should stop fetching interests. + * @return The current state of stop_. + */ + private boolean shouldStop() { + if(stop_) + clean(); + return stop_; + } + + /** + * Clean the data received + */ + private void clean() { + pendingSegments_.clear(); // cancels pending Interests and timeout events + receivedSegments_.clear(); // remove the received segments + } + + /** + * Check if the last component in the name is a segment number. + * + * @param name The name to check. + * @return True if the name ends with a segment number, otherwise false. + */ + private static boolean + endsWithSegmentNumber(Name name) { + return name.size() >= 1 && name.get(-1).isSegment(); + } + + private double cwnd_ = 1; + private final Options options_; + private double ssThresh_; + private final Face face_; + private RttEstimator rttEstimator_; - // Fetch the next segment. - fetchNextSegment(originalInterest, data.getName(), expectedSegmentNumber + 1); - } - } - } - - public void - onDataValidationFailed(Data data, String reason) - { - try { - onError_.onError - (ErrorCode.SEGMENT_VERIFICATION_FAILED, - "Segment verification failed for " + data.getName().toUri() + - " . Reason: " + reason); - } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in onError", ex); - } - } - - public void - onTimeout(Interest interest) - { - try { - onError_.onError - (ErrorCode.INTEREST_TIMEOUT, - "Time out for interest " + interest.getName().toUri()); - } catch (Throwable ex) { - logger_.log(Level.SEVERE, "Error in onError", ex); - } - } - - /** - * Check if the last component in the name is a segment number. - * @param name The name to check. - * @return True if the name ends with a segment number, otherwise false. - */ - private static boolean - endsWithSegmentNumber(Name name) - { - return name.size() >= 1 && name.get(-1).isSegment(); - } - - // Use a non-template ArrayList so it works with older Java compilers. - private final ArrayList contentParts_ = new ArrayList(); // of Blob - private final Face face_; - private final KeyChain validatorKeyChain_; - private final VerifySegment verifySegment_; - private final OnComplete onComplete_; - private final OnError onError_; - private static final Logger logger_ = Logger.getLogger(SegmentFetcher.class.getName()); -} + private long highData_ = 0; + private long recPoint_ = 0; + private long highInterest_ = 0; + private int nSegmentsInFlight_ = 0; + private long nSegments_ = -1; + private Map pendingSegments_ = new HashMap(); + private Map receivedSegments_ = new HashMap(); + private Queue retxQueue_ = new LinkedList<>(); + private long nextSegmentNum_ = 0; + private long timeLastSegmentReceived_ = 0; + private Name versionedDataName_; + private boolean allSegmentSent_ = false; + private boolean stop_ = false; + private static final double MIN_SSTHRESH = 2.0; + private final Validator validator_; + private final KeyChain validatorKeyChain_; + private final SegmentFetcher.VerifySegment verifySegment_; + private final SegmentFetcher.OnComplete onComplete_; + private final SegmentFetcher.OnError onError_; + private static final Logger logger_ = Logger.getLogger(SegmentFetcher.class.getName()); +} \ No newline at end of file diff --git a/tests/src/net/named_data/jndn/tests/unit_tests/TestSegmentFetcher.java b/tests/src/net/named_data/jndn/tests/unit_tests/TestSegmentFetcher.java new file mode 100644 index 00000000..dc5b819e --- /dev/null +++ b/tests/src/net/named_data/jndn/tests/unit_tests/TestSegmentFetcher.java @@ -0,0 +1,105 @@ +package net.named_data.jndn.tests.unit_tests; + + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import net.named_data.jndn.*; +import net.named_data.jndn.security.v2.ValidationPolicyAcceptAll; +import net.named_data.jndn.security.v2.Validator; +import net.named_data.jndn.util.Blob; +import net.named_data.jndn.util.SegmentFetcher; +import src.net.named_data.jndn.tests.integration_tests.ValidatorFixture; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; + +/** + * Test sending interests and receiving segmented data + * + * @author Ritik Kumar + */ + + +public class TestSegmentFetcher { + + public final ValidatorFixture.TestFace face_ = new ValidatorFixture.TestFace(); + private ConcurrentHashMap> cacheMap_; + private Name name_ = new Name("/localhost/nfd/location/%FD%00/%00%00"); + private int nSegments_ = 10; + + @Before + public void setUp() throws Exception { + + cacheMap_ = new ConcurrentHashMap<>(); + + final ArrayList data = new ArrayList<>(); + + cacheMap_.put("key", data); + byte[] segment_buffer = new byte[200]; + MetaInfo meta_info = new MetaInfo(); + Name.Component finalBlockId = Name.Component.fromSegment(nSegments_-1); + meta_info.setFinalBlockId(finalBlockId); + + for (int i = 0; i < nSegments_ ; i++){ + Data d = new Data(name_.getPrefix(-1).appendSegment(i)); + d.setMetaInfo(meta_info); + d.setContent(new Blob(segment_buffer)); + + data.add(d); + } + + face_.processInterest_ = new ValidatorFixture.TestFace.ProcessInterest() { + public void processInterest + (final Interest interest, final OnData onData, OnTimeout onTimeout, + OnNetworkNack onNetworkNack) { + if (cacheMap_.containsKey("key")){ + Thread th = new Thread(){ + @Override + public void run() { + try { + Thread.sleep(30); + } catch (InterruptedException e) { + e.printStackTrace(); + } + for(Data d: data){ + if (interest.matchesName(d.getName())){ + onData.onData(interest, d); + } + } + } + }; + + th.start(); + } + } + }; + } + + @Test + public void fetch() { + Interest baseInterest = new Interest(name_); + + SegmentFetcher.OnComplete onComplete = new SegmentFetcher.OnComplete() { + @Override + public void onComplete(Blob content) { + assertEquals(content.size(), 200 * nSegments_); + } + }; + + SegmentFetcher.OnError onError = new SegmentFetcher.OnError() { + @Override + public void onError(SegmentFetcher.ErrorCode errorCode, String message) { + System.out.println("onError: " + message); + } + }; + + SegmentFetcher.VerifySegment verifySegment = new SegmentFetcher.VerifySegment() { + @Override + public boolean verifySegment(Data data) { + return true; + } + }; + + SegmentFetcher.fetch(face_, baseInterest, new Validator(new ValidationPolicyAcceptAll()), onComplete, onError); + } +} \ No newline at end of file