From e291bb645c98c9e47d676a2519838bf7ced8f9df Mon Sep 17 00:00:00 2001 From: Arnab Phani Date: Tue, 27 Aug 2024 13:10:08 +0200 Subject: [PATCH] [SYSTEMDS-2650] Standardize dedup lineage trace serialization format This patch standardizes the dedup patch serialization format, regardless of if they are captured by the lineage() built-in or via a write operator. Previously, all patches were stored in a separate file. Now, we always create a single file, containing first the global trace, followed by the patches. This change also allows us to copy the traces from the console (lineage()) and store them in a single file for future recomputation. --- .../controlprogram/caching/FrameObject.java | 2 +- .../controlprogram/caching/MatrixObject.java | 2 +- .../controlprogram/caching/TensorObject.java | 2 +- .../cp/AggregateUnaryCPInstruction.java | 2 +- .../sysds/runtime/lineage/LineageMap.java | 9 ++++----- .../sysds/runtime/lineage/LineageParser.java | 20 ++++++++++++++++++- .../lineage/LineageRecomputeUtils.java | 17 ++++++++-------- .../functions/lineage/LineageCodegenTest.java | 2 +- .../lineage/LineageTraceBuiltinTest.java | 2 +- .../lineage/LineageTraceDedupTest.java | 3 +-- .../lineage/LineageTraceExecSparkTest.java | 4 ++-- .../lineage/LineageTraceExecTest.java | 2 +- .../lineage/LineageTraceFunctionTest.java | 2 +- .../lineage/LineageTraceGPUTest.java | 2 +- .../lineage/LineageTraceParforTest.java | 2 +- 15 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 945021626ea..582bb64dd8c 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -308,7 +308,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt) @Override protected FrameBlock reconstructByLineage(LineageItem li) throws IOException { return ((FrameObject) LineageRecomputeUtils - .parseNComputeLineageTrace(li.getData(), null)) + .parseNComputeLineageTrace(li.getData())) .acquireReadAndRelease(); } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index 29f2c1cb599..f58b315e68f 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -588,7 +588,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String output @Override protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException { return ((MatrixObject) LineageRecomputeUtils - .parseNComputeLineageTrace(Explain.explain(li), null)) + .parseNComputeLineageTrace(Explain.explain(li))) .acquireReadAndRelease(); } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java index 13665f65a23..8908f55d065 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java @@ -202,7 +202,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt) @Override protected TensorBlock reconstructByLineage(LineageItem li) throws IOException { return ((TensorObject) LineageRecomputeUtils - .parseNComputeLineageTrace(li.getData(), null)) + .parseNComputeLineageTrace(li.getData())) .acquireReadAndRelease(); } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java index 78bc7d132d5..920a79b77dc 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java @@ -185,7 +185,7 @@ else if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) { LineageItem li = ec.getLineageItem(input1); String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) : - Explain.explain(li) + LineageDedupUtils.mergeExplainDedupBlocks(ec); + Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec); ec.setScalarOutput(outputName, new StringObject(out)); break; } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java index df4d0c2e1be..41875bdfdf6 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java @@ -245,10 +245,9 @@ private void processWriteLI(CPOperand input1, CPOperand input2, ExecutionContext LineageItem li = get(input1); String fName = ec.getScalarInput(input2.getName(), Types.ValueType.STRING, input2.isLiteral()).getStringValue(); - if (DMLScript.LINEAGE_DEDUP) { - // gracefully serialize the dedup maps without decompressing - LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup"); - } - LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage"); + // Combine the global trace and dedup patches in a single file. + String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) : + Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec); + LineageItemUtils.writeTraceToHDFS(out, fName + ".lineage"); } } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java index 439fe1f10da..8acfbdc9e9e 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java @@ -122,7 +122,7 @@ private static LineageItem parseLineageInstruction(Long id, String str, Map loopPatchMap = new HashMap<>(); - public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) { - if (DEBUG) { + public static Data parseNComputeLineageTrace(String mainTrace) { + if (DEBUG) System.out.println(mainTrace); - System.out.println(dedupPatches); - } - LineageItem root = LineageParser.parseLineageTrace(mainTrace); - if (dedupPatches != null) - LineageParser.parseLineageTraceDedup(dedupPatches); + + // Separate the global trace and the dedup patches + String[] patches = LineageParser.separateMainAndDedupPatches(mainTrace); + LineageItem root = LineageParser.parseLineageTrace(patches[0]); //global trace + if (patches.length > 1) + LineageParser.parseLineageTraceDedup(patches[1]); // Disable GPU execution. TODO: Support GPU boolean GPUenabled = false; diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java index cc88317d14d..59cda3cc695 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java @@ -100,7 +100,7 @@ private void testLineageTrace(String testname) { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); TestUtils.compareMatrices(dmlfile, tmp, 1e-6); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java index 84cea6d74a6..3e3845b311e 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java @@ -78,7 +78,7 @@ private void testLineageTraceBuiltin(String testname) { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java index f372cd3aa97..c9a6beb1f03 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java @@ -152,8 +152,7 @@ public void testLineageTrace(String testname) { //deserialize, generate program and execute String Rtrace = readDMLLineageFromHDFS("R"); - String RDedupPatches = readDMLLineageDedupFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); //match the original and recomputed results HashMap orig = readDMLMatrixFromOutputDir("R"); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java index 88456dd9626..4009cea6848 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java @@ -116,12 +116,12 @@ public void testLineageTraceSpark(String testname) { TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li)); //generate program - Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null); + Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage); HashMap X_dmlfile = readDMLMatrixFromOutputDir("X"); MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease(); TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6); - Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null); + Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage); HashMap Y_dmlfile = readDMLMatrixFromOutputDir("Y"); MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease(); TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java index 7e19d1db8d7..127426b5bda 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java @@ -127,7 +127,7 @@ private void testLineageTraceExec(String testname) { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) { double val1 = readDMLScalarFromOutputDir("R").get(new CellIndex(1,1)); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java index 763e73591e0..53fc1b54ab3 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java @@ -86,7 +86,7 @@ private void testLineageTraceFunction(String testname) { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java index 031ed65f96b..a6cb707b214 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java @@ -83,7 +83,7 @@ private void testLineageTraceExec(String testname) { String Rtrace = readDMLLineageFromHDFS("R"); AutomatedTestBase.TEST_GPU = false; //NOTE: the generated program is CP-only. - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java index 48c39b0bdff..3abd25ce63b 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java @@ -160,7 +160,7 @@ private void testLineageTraceParFor(int ncol, String testname) { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();