From 48de384bb3dca3e63f35b654e907e9ecaf5d747c Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Tue, 9 Apr 2024 20:16:50 +0200 Subject: [PATCH] [MINOR] Update cocode algorithms for CLA This commit adds a new memorizer that relies on an array sized to the number of columns to compress, instead of a single hashmap holding all column groups. The memory footprint is the same, but performance is much improved because it allows constant-time deletion of all memorized column groups that contain a combination involving the given columns. The technique first allocates an array whose length is the number of columns; each index gets its own hashmap containing the column groups associated with it. Then, when combining column groups, the lowest index of all the combined columns determines which array index's hashmap the combined group is added to. Once a combination is chosen, the buckets at the lowest index of each of the combined column groups are reset, and the combined column group is inserted. The result is constant-time O(1) deletion and insertion in the memorizer. --- .../compress/cocode/AColumnCoCoder.java | 7 +- .../runtime/compress/cocode/CoCodeGreedy.java | 36 ++++-- .../runtime/compress/cocode/CoCodeHybrid.java | 33 ++++-- .../compress/cocode/CoCodePriorityQue.java | 43 +++---- .../compress/cocode/CoCoderFactory.java | 23 ++-- .../runtime/compress/cocode/ColIndexes.java | 4 +- .../runtime/compress/cocode/Memorizer.java | 13 ++- .../runtime/compress/cocode/MemorizerV2.java | 110 ++++++++++++++++++ .../sysds/runtime/compress/estim/AComEst.java | 76 +++++++----- .../estim/CompressedSizeInfoColGroup.java | 21 ++++ 10 files changed, 277 insertions(+), 89 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java index fc13e16f659..cfe1b1b55ec 100644 --- 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java @@ -26,6 +26,10 @@ import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +/** + * Main abstract class for the co-coding of columns to combine different compression statistics and calculate the + * combinations of columns + */ public abstract class AColumnCoCoder { protected static final Log LOG = LogFactory.getLog(AColumnCoCoder.class.getName()); @@ -34,8 +38,7 @@ public abstract class AColumnCoCoder { protected final ACostEstimate _cest; protected final CompressionSettings _cs; - protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator, - CompressionSettings cs) { + protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { _sest = sizeEstimator; _cest = costEstimator; _cs = cs; diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java index d5d6c6936e7..45f5654ab2c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java @@ -37,14 +37,14 @@ public class CoCodeGreedy extends AColumnCoCoder { - private final Memorizer mem; + private final MemorizerV2 mem; protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { super(sizeEstimator, costEstimator, cs); - mem = new Memorizer(sizeEstimator); + mem = new MemorizerV2(sizeEstimator, sizeEstimator.getNumColumns()); } - protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, Memorizer mem) { + protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, MemorizerV2 mem) { super(sizeEstimator, costEstimator, 
cs); this.mem = mem; } @@ -93,16 +93,22 @@ private List coCodeBruteForce(List changeInCost) + if(-Math.min(costC1, costC2) > changeInCost // change in cost cannot possibly be better. + || (maxCombined < 0) // int overflow + || (maxCombined > c1i.getNumRows())) // higher combined number of rows. continue; // Combine the two column groups. @@ -206,10 +212,20 @@ protected CombineTask(ColIndexes c1, ColIndexes c2) { } @Override - public Object call() { - final IColIndex c = _c1._indexes.combine(_c2._indexes); - final ColIndexes cI = new ColIndexes(c); - mem.getOrCreate(cI, _c1, _c2); + public Object call() throws Exception { + final CompressedSizeInfoColGroup c1i = mem.get(_c1); + final CompressedSizeInfoColGroup c2i = mem.get(_c2); + if(c1i != null && c2i != null) { + final int maxCombined = c1i.getNumVals() * c2i.getNumVals(); + + if(maxCombined < 0 // int overflow + || maxCombined > c1i.getNumRows()) // higher combined than number of rows. + return null; + + final IColIndex c = _c1._indexes.combine(_c2._indexes); + final ColIndexes cI = new ColIndexes(c); + mem.getOrCreate(cI, _c1, _c2); + } return null; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java index 6dc53739d24..c2d3dc9667c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java @@ -21,6 +21,7 @@ import org.apache.sysds.runtime.compress.CompressionSettings; import org.apache.sysds.runtime.compress.cost.ACostEstimate; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; @@ -37,33 +38,43 @@ protected CoCodeHybrid(AComEst sizeEstimator, ACostEstimate costEstimator, Compr @Override 
protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) { final int startSize = colInfos.getInfo().size(); + final int pqColumnThreashold = Math.max(128, (_sest.getNumColumns() / startSize) * 100); + LOG.error(pqColumnThreashold); + if(startSize == 1) return colInfos; // nothing to join when there only is one column else if(startSize <= 16) {// Greedy all compare all if small number of columns - LOG.debug("Hybrid chose to do greedy cocode because of few columns"); + LOG.debug("Hybrid chose to do greedy CoCode because of few columns"); CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); return colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); } - else if(startSize > 1000) - return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k)); - LOG.debug("Using Hybrid Cocode Strategy: "); + else if(startSize > 1000) { + CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold); + + return colInfos.setInfo(pq.join(colInfos.getInfo(), 1, k)); + } + LOG.debug("Using Hybrid CoCode Strategy: "); final int PriorityQueGoal = startSize / 5; if(PriorityQueGoal > 30) { // hybrid if there is a large number of columns to begin with Timing time = new Timing(true); - colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, PriorityQueGoal, k)); - LOG.debug("Que based time: " + time.stop()); + CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold); + colInfos.setInfo(pq.join(colInfos.getInfo(), PriorityQueGoal, k)); final int pqSize = colInfos.getInfo().size(); - if(pqSize <= PriorityQueGoal * 2) { - time = new Timing(true); + + LOG.debug("Que based time: " + time.stop()); + if(pqSize < PriorityQueGoal || (pqSize < startSize && _cest instanceof ComputationCostEstimator)) { CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); LOG.debug("Greedy time: " + time.stop()); } return colInfos; } - else // If somewhere 
in between use the que based approach only. - return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k)); - + else { + LOG.debug("Using only Greedy based since Nr Column groups: " + startSize + " is not large enough"); + CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); + colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); + return colInfos; + } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java index b600c697dbc..ca7135c262c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java @@ -48,27 +48,30 @@ public class CoCodePriorityQue extends AColumnCoCoder { private static final int COL_COMBINE_THRESHOLD = 1024; - protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { + private final int lastCombineThreshold; + + protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, + int lastCombineThreshold) { super(sizeEstimator, costEstimator, cs); + this.lastCombineThreshold = lastCombineThreshold; } @Override protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) { - colInfos.setInfo(join(colInfos.getInfo(), _sest, _cest, 1, k)); + colInfos.setInfo(join(colInfos.getInfo(), 1, k)); return colInfos; } - protected static List join(List groups, AComEst sEst, - ACostEstimate cEst, int minNumGroups, int k) { + protected List join(List groups, int minNumGroups, int k) { if(groups.size() > COL_COMBINE_THRESHOLD && k > 1) - return combineMultiThreaded(groups, sEst, cEst, minNumGroups, k); + return combineMultiThreaded(groups, _sest, _cest, minNumGroups, k); else - return combineSingleThreaded(groups, sEst, cEst, minNumGroups); + return combineSingleThreaded(groups, _sest, _cest, 
minNumGroups); } - private static List combineMultiThreaded(List groups, - AComEst sEst, ACostEstimate cEst, int minNumGroups, int k) { + private List combineMultiThreaded(List groups, AComEst sEst, + ACostEstimate cEst, int minNumGroups, int k) { final ExecutorService pool = CommonThreadPool.get(k); try { final List tasks = new ArrayList<>(); @@ -90,18 +93,18 @@ private static List combineMultiThreaded(List combineSingleThreaded(List groups, - AComEst sEst, ACostEstimate cEst, int minNumGroups) { + private List combineSingleThreaded(List groups, AComEst sEst, + ACostEstimate cEst, int minNumGroups) { return combineBlock(groups, 0, groups.size(), sEst, cEst, minNumGroups); } - private static List combineBlock(List groups, int start, - int end, AComEst sEst, ACostEstimate cEst, int minNumGroups) { + private List combineBlock(List groups, int start, int end, + AComEst sEst, ACostEstimate cEst, int minNumGroups) { Queue que = getQue(end - start, cEst); for(int i = start; i < end; i++) { @@ -113,7 +116,7 @@ private static List combineBlock(List combineBlock(Queue que, AComEst sEst, + private List combineBlock(Queue que, AComEst sEst, ACostEstimate cEst, int minNumGroups) { List ret = new ArrayList<>(); @@ -133,21 +136,21 @@ private static List combineBlock(Queue 128){ + if(numColumns > lastCombineThreshold) { lastCombine++; ret.add(g); } - else{ + else { lastCombine = 0; que.add(g); } } - else{ + else { lastCombine++; ret.add(l); } } - else{ + else { lastCombine++; ret.add(l); } @@ -155,7 +158,7 @@ private static List combineBlock(Queue> { + protected class PQTask implements Callable> { private final List _groups; private final int _start; diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java index abd12d3f6a8..6c560fb9792 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java +++ 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java @@ -63,16 +63,18 @@ public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, Compress AColumnCoCoder co = createColumnGroupPartitioner(cs.columnPartitioner, est, costEstimator, cs); // Find out if any of the groups are empty. - final boolean containsEmptyOrConst = containsEmptyOrConst(colInfos); + final boolean containsEmptyConstOrIncompressable = containsEmptyConstOrIncompressable(colInfos); // if there are no empty or const columns then try cocode algorithms for all columns - if(!containsEmptyOrConst) + if(!containsEmptyConstOrIncompressable) return co.coCodeColumns(colInfos, k); else { // filtered empty groups final List emptyCols = new ArrayList<>(); // filtered const groups final List constCols = new ArrayList<>(); + // incompressable groups + final List incompressable = new ArrayList<>(); // filtered groups -- in the end starting with all groups final List groups = new ArrayList<>(); @@ -85,13 +87,15 @@ public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, Compress emptyCols.add(g.getColumns()); else if(g.isConst()) constCols.add(g.getColumns()); + else if(g.isIncompressable()) + incompressable.add(g.getColumns()); else groups.add(g); } // overwrite groups. 
colInfos.compressionInfo = groups; - + // cocode remaining groups if(!groups.isEmpty()) { colInfos = co.coCodeColumns(colInfos, k); @@ -109,14 +113,19 @@ else if(g.isConst()) colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.CONST)); } + if(incompressable.size() > 0) { + final IColIndex idx = ColIndexFactory.combineIndexes(incompressable); + colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.UNCOMPRESSED)); + } + return colInfos; } } - private static boolean containsEmptyOrConst(CompressedSizeInfo colInfos) { + private static boolean containsEmptyConstOrIncompressable(CompressedSizeInfo colInfos) { for(CompressedSizeInfoColGroup g : colInfos.compressionInfo) - if(g.isEmpty() || g.isConst()) + if(g.isEmpty() || g.isConst() || g.isIncompressable()) return true; return false; } @@ -133,9 +142,9 @@ private static AColumnCoCoder createColumnGroupPartitioner(PartitionerType type, case STATIC: return new CoCodeStatic(est, costEstimator, cs); case PRIORITY_QUE: - return new CoCodePriorityQue(est, costEstimator, cs); + return new CoCodePriorityQue(est, costEstimator, cs, 128); default: - throw new RuntimeException("Unsupported column group partitioner: " + type.toString()); + throw new RuntimeException("Unsupported column group partition technique: " + type.toString()); } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java index dcdcbe464c0..910c640d72c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java @@ -42,9 +42,9 @@ public boolean equals(Object that) { } public boolean contains(ColIndexes a, ColIndexes b) { - if(a == null || b == null) return false; - return _indexes.contains(a._indexes.get(0)) || _indexes.contains(b._indexes.get(0)); + return _indexes.contains(a._indexes.get(0)) // 
+ || _indexes.contains(b._indexes.get(0)); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java index db77a32bf68..9ac0d5c9485 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.sysds.runtime.compress.DMLCompressionException; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; @@ -50,8 +51,6 @@ public CompressedSizeInfoColGroup get(ColIndexes c) { } public void remove(ColIndexes c1, ColIndexes c2) { - mem.remove(c1); - mem.remove(c2); Iterator> i = mem.entrySet().iterator(); while(i.hasNext()) { final ColIndexes eci = i.next().getKey(); @@ -60,7 +59,7 @@ public void remove(ColIndexes c1, ColIndexes c2) { } } - public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2){ + public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2) { CompressedSizeInfoColGroup g = mem.get(cI); st2++; if(g == null) { @@ -69,7 +68,11 @@ public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColI if(left != null && right != null) { st3++; g = _sEst.combine(cI._indexes, left, right); - + if(g != null) { + if(g.getNumVals() < 0) + throw new DMLCompressionException( + "Combination returned less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g); + } synchronized(this) { mem.put(cI, g); } @@ -88,7 +91,7 @@ public void incst4() { } public String stats() { - return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: "+ st4; + return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: " + st4; } public void resetStats() { diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java new file mode 100644 index 00000000000..b63a3657fc4 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.compress.cocode; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.estim.AComEst; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; + +public class MemorizerV2 { + private final AComEst _sEst; + + private final Map[] mem; + private int st1 = 0, st2 = 0, st3 = 0, st4 = 0; + + @SuppressWarnings("unchecked") + public MemorizerV2(AComEst sEst, int nCol) { + _sEst = sEst; + mem = new Map[nCol]; + } + + public void put(CompressedSizeInfoColGroup g) { + put(new ColIndexes(g.getColumns()), g); + } + + public void put(ColIndexes key, CompressedSizeInfoColGroup val) { + final IColIndex gi = key._indexes; + final int bucketID = gi.get(0); + Map bucket = mem[bucketID]; + if(bucket == null) + bucket = mem[bucketID] = new HashMap<>(); + bucket.put(key, val); + } + + public CompressedSizeInfoColGroup get(ColIndexes c) { + return mem[c._indexes.get(0)].get(c); + } + + public void remove(ColIndexes c1, ColIndexes c2) { + mem[c1._indexes.get(0)] = null; + mem[c2._indexes.get(0)] = null; + } + + public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2) { + CompressedSizeInfoColGroup g = get(cI); + st2++; + if(g == null) { + final CompressedSizeInfoColGroup left = get(c1); + final CompressedSizeInfoColGroup right = get(c2); + if(left != null && right != null) { + st3++; + g = _sEst.combine(cI._indexes, left, right); + if(g != null) { + if(g.getNumVals() < 0) + throw new DMLCompressionException( + "Combination returned less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g); + } + synchronized(this) { + put(cI, g); + } + } + + } + return g; + } + + public void incst1() { + st1++; + } + + public void incst4() { + st4++; + } + + public String stats() { + return " possible: " + st1 + " requests: 
" + st2 + " combined: " + st3 + " outSecond: " + st4; + } + + public void resetStats() { + st1 = 0; + st2 = 0; + st3 = 0; + st4 = 0; + } + + @Override + public String toString() { + return mem.toString(); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java index 03cf173a13a..832725f328f 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -63,11 +62,21 @@ protected AComEst(MatrixBlock data, CompressionSettings cs) { _cs = cs; } - protected int getNumRows() { + /** + * Get the number of rows in the overall compressing block. + * + * @return The number of rows + */ + public int getNumRows() { return _cs.transposed ? _data.getNumColumns() : _data.getNumRows(); } - protected int getNumColumns() { + /** + * Get the number of columns in the overall compressing block. + * + * @return The number of cols + */ + public int getNumColumns() { return _cs.transposed ? _data.getNumRows() : _data.getNumColumns(); } @@ -221,6 +230,13 @@ public void clearNNZ() { protected abstract CompressedSizeInfoColGroup combine(IColIndex combinedColumns, CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int maxDistinct); + /** + * Collect the compressed size for all individual columns using the available k parallelism degree. + * + * @param clen The number of total columns + * @param k The parallelization degree + * @return A list of the individual columns compressibility. 
+ */ protected List CompressedSizeInfoColGroup(int clen, int k) { if(k <= 1) return CompressedSizeInfoColGroupSingleThread(clen); @@ -228,6 +244,12 @@ protected List CompressedSizeInfoColGroup(int clen, return CompressedSizeInfoColGroupParallel(clen, k); } + /** + * Compress the column groups using a single thread. + * + * @param clen the number of total columns + * @return A list of the individual columns compressibility. + */ private List CompressedSizeInfoColGroupSingleThread(int clen) { List ret = new ArrayList<>(clen); if(!_cs.transposed && !_data.isEmpty() && _data.isInSparseFormat()) @@ -237,6 +259,13 @@ private List CompressedSizeInfoColGroupSingleThread( return ret; } + /** + * Collect the compressed size for all individual columns using the available k parallelism degree. + * + * @param clen The number of total columns + * @param k The parallelization degree + * @return A list of the individual columns compressibility. + */ private List CompressedSizeInfoColGroupParallel(int clen, int k) { final ExecutorService pool = CommonThreadPool.get(k); try { @@ -249,15 +278,22 @@ private List CompressedSizeInfoColGroupParallel(int CompressedSizeInfoColGroup[] res = new CompressedSizeInfoColGroup[clen]; final int blkz = Math.max(1, clen / (k * 10)); - final ArrayList tasks = new ArrayList<>(clen / blkz + 1); + final ArrayList> tasks = new ArrayList<>(clen / blkz + 1); if(blkz != 1) LOG.debug("Extracting column samples in blocks of " + blkz); - for(int col = 0; col < clen; col += blkz) - tasks.add(new SizeEstimationTask(res, col, Math.min(clen, col + blkz))); + for(int col = 0; col < clen; col += blkz) { + final int start = col; + final int end = Math.min(clen, col + blkz); + tasks.add(pool.submit(() -> { + for(int c = start; c < end; c++) + res[c] = getColGroupInfo(new SingleIndex(c)); + return null; + })); + } - for(Future f : pool.invokeAll(tasks)) + for(Future f : tasks) f.get(); return Arrays.asList(res); @@ -265,35 +301,11 @@ private List 
CompressedSizeInfoColGroupParallel(int catch(Exception e) { throw new DMLCompressionException("Multithreaded first extraction failed", e); } - finally{ + finally { pool.shutdown(); } } - private class SizeEstimationTask implements Callable { - final CompressedSizeInfoColGroup[] _res; - final int _cs; - final int _ce; - - private SizeEstimationTask(CompressedSizeInfoColGroup[] res, int cs, int ce) { - _res = res; - _cs = cs; - _ce = ce; - } - - @Override - public Object call() { - try { - for(int c = _cs; c < _ce; c++) - _res[c] = getColGroupInfo(new SingleIndex(c)); - return null; - } - catch(Exception e) { - throw new DMLCompressionException("ColGroup extraction failed", e); - } - } - } - @Override public String toString() { return this.getClass().getSimpleName(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index 1168147b3d2..4fbf9b0ee4d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -129,6 +129,10 @@ public CompressedSizeInfoColGroup(IColIndex columns, int nRows, CompressionType _sizes.put(ct, (double) ColGroupSizes.estimateInMemorySizeCONST(columns.size(), columns.isContiguous(), 1.0, false)); break; + case UNCOMPRESSED: + _sizes.put(ct, (double) ColGroupSizes.estimateInMemorySizeUncompressed(nRows, columns.isContiguous(), + columns.size(), 1.0)); + break; default: throw new DMLCompressionException("Invalid instantiation of const Cost"); } @@ -206,6 +210,10 @@ public IEncode getMap() { return _map; } + public void setMap(IEncode map) { + _map = map; + } + public boolean containsZeros() { return _facts.numOffs < _facts.numRows; } @@ -229,6 +237,10 @@ public boolean isConst() { return _bestCompressionType == CompressionType.CONST || _sizes.containsKey(CompressionType.CONST); } + 
public boolean isIncompressable() { + return _bestCompressionType == CompressionType.UNCOMPRESSED; + } + private static double getCompressionSize(IColIndex cols, CompressionType ct, EstimationFactors fact) { int nv; final int numCols = cols.size(); @@ -284,6 +296,15 @@ public String toString() { sb.append(" Sizes: " + _sizes); sb.append(" facts: " + _facts); sb.append(" mapIsNull: " + (_map == null)); + if(_map != null) { + String s = _map.toString(); + if(s.length() > 1000) { + sb.append(s, 0, 1000); + } + else { + sb.append(s); + } + } return sb.toString(); }