Skip to content

Commit

Permalink
[MINOR] Bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
phaniarnab committed Dec 25, 2023
1 parent 24d2c6d commit c842072
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 41 deletions.
70 changes: 31 additions & 39 deletions src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,27 +161,23 @@ else if (e.isRDDPersist()) {
return false; //the executing thread removed this entry from cache

//Reuse the cached RDD (local or persisted at the executors)
switch(e.getCacheStatus()) {
case TOPERSISTRDD:
//Change status to PERSISTEDRDD on the second hit
//putValueRDD method will save the RDD and call persist
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
//Cannot reuse rdd as already garbage collected
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
return false;
case PERSISTEDRDD:
//Reuse the persisted intermediate at the executors
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
//Safely cleanup the child RDDs if this RDD is persisted already
//If reused 3 times and still not persisted, move to Spark asynchronously
if (probeRDDDistributed(e))
LineageSparkCacheEviction.cleanupChildRDDs(e);
else
LineageSparkCacheEviction.moveToSpark(e);
break;
default:
return false;
if (e.getCacheStatus() == LineageCacheStatus.TOPERSISTRDD) { //second hit
//Cannot reuse rdd as already garbage collected
//putValueRDD will save the RDD and call persist
if (DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd();
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
return false;
}
//Reuse from third hit onwards (status == PERSISTEDRDD)
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
//Set the cached data characteristics to the output matrix object
ec.getMatrixObject(outName).updateDataCharacteristics(rdd.getDataCharacteristics());
//Safely cleanup the child RDDs if this RDD is persisted already
//If reused 3 times and still not persisted, move to Spark asynchronously
if (probeRDDDistributed(e))
LineageSparkCacheEviction.cleanupChildRDDs(e);
else
LineageSparkCacheEviction.moveToSpark(e);
}
else { //TODO handle locks on gpu objects
Pointer gpuPtr = e.getGPUPointer();
Expand Down Expand Up @@ -288,8 +284,10 @@ else if (e.isRDDPersist()) {
RDDObject rdd = e.getRDDObject();
if (rdd == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
//Set the data characteristics and hdfs file to the output matrix object
MetaDataFormat md = new MetaDataFormat(rdd.getDataCharacteristics(),FileFormat.BINARY);
boundValue = new MatrixObject(ValueType.FP64, boundVarName, md);
String filename = rdd.getHDFSFilename() != null ? rdd.getHDFSFilename() : boundVarName;
boundValue = new MatrixObject(ValueType.FP64, filename, md);
((MatrixObject) boundValue).setRDDHandle(rdd);
}
else if (e.isScalarValue()) {
Expand Down Expand Up @@ -343,6 +341,7 @@ else if (e.isRDDPersist()) {
//Cannot reuse rdd as already garbage collected
//putValue method will save the RDD and call persist
//while caching the original instruction
if (DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd(); //increase miss count
return false;
case PERSISTEDRDD:
//Reuse the persisted intermediate at the executors
Expand Down Expand Up @@ -800,10 +799,6 @@ private static void putValueRDD(Instruction inst, LineageItem instLI, ExecutionC
if (!probe(instLI))
return;
LineageCacheEntry centry = _cache.get(instLI);
// Remember the 1st hit and put the RDD in the cache the 2nd time
if (centry.getCacheStatus() != LineageCacheStatus.EMPTY //first hit
&& centry.getCacheStatus() != LineageCacheStatus.PERSISTEDRDD) //second hit
return;
// Avoid reuse chkpoint, which is unnecessary
if (inst.getOpcode().equalsIgnoreCase("chkpoint")) {
removePlaceholder(instLI);
Expand All @@ -816,27 +811,24 @@ private static void putValueRDD(Instruction inst, LineageItem instLI, ExecutionC
return;
}

// Filter out Spark instructions with broadcast input
// TODO: This code avoids one crash. Remove once fixed.
if (!opToPersist && !allInputsSpark(inst, ec)) {
removePlaceholder(instLI);
return;
}

// Get the RDD handle of the RDD
CacheableData<?> cd = ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
RDDObject rddObj = cd.getRDDHandle();
// Save the metadata. Required for estimating cached space overhead.
// Save the metadata and hdfs filename. Required during reuse and space management.
rddObj.setDataCharacteristics(cd.getDataCharacteristics());
rddObj.setHDFSFilename(cd.getFileName());
// Set the RDD object in the cache
switch(centry.getCacheStatus()) {
case EMPTY: //first hit
// Do not save the child RDDS (incl. broadcast vars) on the first hit.
// Let them be garbage collected via rmvar. Save them on the second hit
// by disabling garbage collection on this and the child RDDs.
centry.setRDDValue(rddObj, computetime); //rddObj will be garbage collected
break;
case PERSISTEDRDD: //second hit
// Cache right away if delayed caching is disabled
if (LineageCacheConfig.isDelayedCachingRDD()) {
// Do not save the child RDDS (incl. broadcast vars) on the first hit.
// Let them be garbage collected via rmvar. Save them on the second hit
// by disabling garbage collection on this and the child RDDs.
centry.setRDDValue(rddObj, computetime); //rddObj will be garbage collected
break;
} //else, fall through and cache
case TOPERSISTRDD: //second hit
// Replace the old RDD (GCed) with the new one
centry.setRDDValue(rddObj);
// Set the correct status to indicate the RDD is marked to be persisted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public static boolean isNone() {
// Note, delayed caching helps in reducing lineage caching/probing overhead for use cases with
// no reusable instructions, but is anti-productive for use cases with repeating patterns (eg. scoring).
private static boolean DELAYED_CACHING_GPU = true;
private static boolean DELAYED_CACHING_RDD = true;

//-------------DISK SPILLING RELATED CONFIGURATIONS--------------//

Expand Down Expand Up @@ -409,6 +410,10 @@ public static boolean isDelayedCachingGPU() {
return DELAYED_CACHING_GPU;
}

/**
 * Indicates whether delayed (second-hit) caching of RDD intermediates
 * is enabled, i.e., whether an RDD is cached only once it is hit again.
 *
 * @return true if RDD caching is deferred to the second hit
 */
public static boolean isDelayedCachingRDD() {
	return DELAYED_CACHING_RDD;
}

public static void setCachePolicy(LineageCachePolicy policy) {
// TODO: Automatic tuning of weights.
switch(policy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,15 @@ protected static void updateSize(long space, boolean addspace) {
_cachesize -= space;
}

/**
 * Drains the eviction queue, removing or spilling every remaining
 * entry from the given lineage cache.
 *
 * @param cache backing map of the lineage cache to clear
 */
public static void removeAll(Map<LineageItem, LineageCacheEntry> cache) {
	for (;;) {
		if (weightedQueue.isEmpty())
			break;
		// pollFirst may race with concurrent removals and return null
		LineageCacheEntry entry = weightedQueue.pollFirst();
		if (entry != null)
			removeOrSpillEntry(cache, entry, false);
	}
}

/**
 * Checks whether the requested additional space still fits
 * within the configured cache limit.
 *
 * @param spaceNeeded additional bytes requested
 * @return true if the current cache size plus the request is within the limit
 */
protected static boolean isBelowThreshold(long spaceNeeded) {
	long projectedSize = spaceNeeded + _cachesize;
	return projectedSize <= CACHE_LIMIT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class LineageCacheStatistics {
private static final LongAdder _numHitsRddPersist = new LongAdder();
private static final LongAdder _numRddPersist = new LongAdder();
private static final LongAdder _numRddUnpersist = new LongAdder();
private static final LongAdder _numHitsDelRdd = new LongAdder();

public static void reset() {
_numHitsMem.reset();
Expand Down Expand Up @@ -85,6 +86,7 @@ public static void reset() {
_numHitsRddPersist.reset();
_numRddPersist.reset();
_numRddUnpersist.reset();
_numHitsDelRdd.reset();
}

public static void incrementMemHits() {
Expand Down Expand Up @@ -233,7 +235,7 @@ public static void incrementGpuDel() {
}

/**
 * Increments the count of cache hits on GPU pointers that were delayed
 * for caching or already deleted/recycled before they could be reused.
 */
public static void incrementDelHitsGpu() {
	// Number of hits on pointers that are delayed for caching or deleted/recycled before
	_numHitsDelGpu.increment();
}

Expand Down Expand Up @@ -268,6 +270,11 @@ public static void incrementRDDUnpersists() {
_numRddUnpersist.increment();
}

/**
 * Increments the count of cache hits on RDD entries that were delayed
 * for caching or already evicted, i.e., hits that could not be served.
 */
public static void incrementDelHitsRdd() {
	_numHitsDelRdd.increment();
}

public static String displayHits() {
StringBuilder sb = new StringBuilder();
sb.append(_numHitsMem.longValue());
Expand Down Expand Up @@ -366,6 +373,8 @@ public static String displaySparkPersist() {
sb.append(_numRddPersist.longValue());
sb.append("/");
sb.append(_numRddUnpersist.longValue());
sb.append("/");
sb.append(_numHitsDelRdd.longValue());
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ private static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, Linea
private static void setSparkStorageLimit() {
// Set the limit only during the first RDD caching to avoid context creation
// Cache size = 70% of unified Spark memory = 0.7 * 0.6 = 42%.
// TODO: Reduce to avoid disk spilling. 80% of storage.
if (SPARK_STORAGE_LIMIT == 0) {
long unifiedSparkMem = (long) SparkExecutionContext.getDataMemoryBudget(false, true);
SPARK_STORAGE_LIMIT = (long)(unifiedSparkMem * 0.7d);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/apache/sysds/utils/Statistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ public static String display(int maxHeavyHitters)
}
if (LineageCacheStatistics.ifSparkStats()) {
sb.append("LinCache Spark (Col/Loc/Dist): \t" + LineageCacheStatistics.displaySparkHits() + ".\n");
sb.append("LinCache Spark (Per/Unper): \t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
sb.append("LinCache Spark (Per/Unper/Del):\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
}
sb.append("LinCache writes (Mem/FS/Del): \t" + LineageCacheStatistics.displayWtrites() + ".\n");
sb.append("LinCache FStimes (Rd/Wr): \t" + LineageCacheStatistics.displayFSTime() + " sec.\n");
Expand Down

0 comments on commit c842072

Please sign in to comment.