
Commit

[MINOR] Update cocode algorithms for CLA
This commit adds a new memorizer that relies on an array sized to the
number of columns to compress, instead of a single hashmap holding all
entries. The memory footprint is the same, but performance is much
improved because it allows constant-time deletion of all memorized
column groups that contain a combination with the given columns.

The technique first allocates an array with one slot per column, and
each index gets its own hashmap containing the column groups associated
with it. When combining column groups, the lowest index among the
combined columns determines which array-index hashmap the combined
group is added to. Once a combination is chosen, the buckets at the
lowest index of each combined column group are reset, and the combined
column group is inserted.

The result is constant-time O(1) deletion and insertion in the memorizer.
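
For illustration, a minimal sketch of this array-of-hashmaps scheme (the
class and method names below are placeholders for illustration, not the
actual MemorizerV2 API introduced by the commit):

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Sketch of the bucketed memoization idea: each column index owns one
     * hashmap ("bucket"), and a memoized combination is stored in the bucket
     * of its lowest column index, so all memoized entries rooted at that
     * index can be dropped in O(1) by swapping in a fresh map.
     */
    class BucketedMemorizer<K, V> {
        private final Map<K, V>[] buckets;

        @SuppressWarnings("unchecked")
        BucketedMemorizer(int numColumns) {
            buckets = new HashMap[numColumns];
            for(int i = 0; i < numColumns; i++)
                buckets[i] = new HashMap<>();
        }

        /** Memoize a combined group under the lowest column index it contains. */
        void put(int lowestColumn, K key, V value) {
            buckets[lowestColumn].put(key, value);
        }

        V get(int lowestColumn, K key) {
            return buckets[lowestColumn].get(key);
        }

        /** Invalidate every memoized combination rooted at this column index. */
        void resetBucket(int lowestColumn) {
            buckets[lowestColumn] = new HashMap<>();
        }
    }

When a combination of two groups with lowest indexes i and j is chosen,
resetBucket(i) and resetBucket(j) drop their stale candidates, and the
combined group is then inserted under min(i, j).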
Baunsgaard committed Apr 9, 2024
1 parent 3449285 commit 48de384
Showing 10 changed files with 277 additions and 89 deletions.
@@ -26,6 +26,10 @@
import org.apache.sysds.runtime.compress.estim.AComEst;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;

/**
* Main abstract class for the co-coding of columns to combine different compression statistics and calculate the
* combinations of columns
*/
public abstract class AColumnCoCoder {

protected static final Log LOG = LogFactory.getLog(AColumnCoCoder.class.getName());
@@ -34,8 +38,7 @@ public abstract class AColumnCoCoder {
protected final ACostEstimate _cest;
protected final CompressionSettings _cs;

protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator,
CompressionSettings cs) {
protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) {
_sest = sizeEstimator;
_cest = costEstimator;
_cs = cs;
@@ -37,14 +37,14 @@

public class CoCodeGreedy extends AColumnCoCoder {

private final Memorizer mem;
private final MemorizerV2 mem;

protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) {
super(sizeEstimator, costEstimator, cs);
mem = new Memorizer(sizeEstimator);
mem = new MemorizerV2(sizeEstimator, sizeEstimator.getNumColumns());
}

protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, Memorizer mem) {
protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, MemorizerV2 mem) {
super(sizeEstimator, costEstimator, cs);
this.mem = mem;
}
@@ -93,16 +93,22 @@ private List<CompressedSizeInfoColGroup> coCodeBruteForce(List<CompressedSizeInf
for(int j = i + 1; j < workSet.size(); j++) {
final ColIndexes c1 = workSet.get(i);
final ColIndexes c2 = workSet.get(j);
final double costC1 = _cest.getCost(mem.get(c1));
final double costC2 = _cest.getCost(mem.get(c2));
final CompressedSizeInfoColGroup c1i = mem.get(c1);
final CompressedSizeInfoColGroup c2i = mem.get(c2);

final double costC1 = _cest.getCost(c1i);
final double costC2 = _cest.getCost(c2i);

mem.incst1();
final int maxCombined = c1i.getNumVals() * c2i.getNumVals();

// Pruning filter : skip dominated candidates
// Since even if the entire size of one of the column lists is removed,
// it still does not improve compression.
// In the case of workload we relax the requirement for the filter.
if(-Math.min(costC1, costC2) > changeInCost)
if(-Math.min(costC1, costC2) > changeInCost // change in cost cannot possibly be better.
|| (maxCombined < 0) // int overflow
|| (maxCombined > c1i.getNumRows())) // higher combined number of rows.
continue;

// Combine the two column groups.
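
As an aside, the new cardinality guard added above can be read in
isolation as follows (a standalone sketch with made-up numbers, not
SystemDS code; it covers only the overflow and row-count checks):

    // Distinct-value counts of two hypothetical column groups, and the row count.
    int numValsC1 = 60_000;
    int numValsC2 = 50_000;
    int numRows = 1_000_000;

    // The product 3_000_000_000 does not fit in an int and wraps to
    // -1_294_967_296, so the negativity check catches the overflow.
    int maxCombined = numValsC1 * numValsC2;

    // Skip the candidate on overflow, or when the combined dictionary could
    // not have fewer entries than the matrix has rows.
    boolean skip = maxCombined < 0 || maxCombined > numRows;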
@@ -206,10 +212,20 @@ protected CombineTask(ColIndexes c1, ColIndexes c2) {
}

@Override
public Object call() {
final IColIndex c = _c1._indexes.combine(_c2._indexes);
final ColIndexes cI = new ColIndexes(c);
mem.getOrCreate(cI, _c1, _c2);
public Object call() throws Exception {
final CompressedSizeInfoColGroup c1i = mem.get(_c1);
final CompressedSizeInfoColGroup c2i = mem.get(_c2);
if(c1i != null && c2i != null) {
final int maxCombined = c1i.getNumVals() * c2i.getNumVals();

if(maxCombined < 0 // int overflow
|| maxCombined > c1i.getNumRows()) // higher combined than number of rows.
return null;

final IColIndex c = _c1._indexes.combine(_c2._indexes);
final ColIndexes cI = new ColIndexes(c);
mem.getOrCreate(cI, _c1, _c2);
}
return null;
}
}
@@ -21,6 +21,7 @@

import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.cost.ACostEstimate;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.estim.AComEst;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
@@ -37,33 +38,43 @@ protected CoCodeHybrid(AComEst sizeEstimator, ACostEstimate costEstimator, Compr
@Override
protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
final int startSize = colInfos.getInfo().size();
final int pqColumnThreashold = Math.max(128, (_sest.getNumColumns() / startSize) * 100);
LOG.error(pqColumnThreashold);

if(startSize == 1)
return colInfos; // nothing to join when there only is one column
else if(startSize <= 16) {// Greedy all compare all if small number of columns
LOG.debug("Hybrid chose to do greedy cocode because of few columns");
LOG.debug("Hybrid chose to do greedy CoCode because of few columns");
CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
return colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
}
else if(startSize > 1000)
return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k));
LOG.debug("Using Hybrid Cocode Strategy: ");
else if(startSize > 1000) {
CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold);

return colInfos.setInfo(pq.join(colInfos.getInfo(), 1, k));
}
LOG.debug("Using Hybrid CoCode Strategy: ");

final int PriorityQueGoal = startSize / 5;
if(PriorityQueGoal > 30) { // hybrid if there is a large number of columns to begin with
Timing time = new Timing(true);
colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, PriorityQueGoal, k));
LOG.debug("Que based time: " + time.stop());
CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold);
colInfos.setInfo(pq.join(colInfos.getInfo(), PriorityQueGoal, k));
final int pqSize = colInfos.getInfo().size();
if(pqSize <= PriorityQueGoal * 2) {
time = new Timing(true);

LOG.debug("Que based time: " + time.stop());
if(pqSize < PriorityQueGoal || (pqSize < startSize && _cest instanceof ComputationCostEstimator)) {
CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
LOG.debug("Greedy time: " + time.stop());
}
return colInfos;
}
else // If somewhere in between use the que based approach only.
return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k));

else {
LOG.debug("Using only Greedy based since Nr Column groups: " + startSize + " is not large enough");
CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
return colInfos;
}
}
}
@@ -48,27 +48,30 @@ public class CoCodePriorityQue extends AColumnCoCoder {

private static final int COL_COMBINE_THRESHOLD = 1024;

protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) {
private final int lastCombineThreshold;

protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs,
int lastCombineThreshold) {
super(sizeEstimator, costEstimator, cs);
this.lastCombineThreshold = lastCombineThreshold;
}

@Override
protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
colInfos.setInfo(join(colInfos.getInfo(), _sest, _cest, 1, k));
colInfos.setInfo(join(colInfos.getInfo(), 1, k));
return colInfos;
}

protected static List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
ACostEstimate cEst, int minNumGroups, int k) {
protected List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups, int minNumGroups, int k) {

if(groups.size() > COL_COMBINE_THRESHOLD && k > 1)
return combineMultiThreaded(groups, sEst, cEst, minNumGroups, k);
return combineMultiThreaded(groups, _sest, _cest, minNumGroups, k);
else
return combineSingleThreaded(groups, sEst, cEst, minNumGroups);
return combineSingleThreaded(groups, _sest, _cest, minNumGroups);
}

private static List<CompressedSizeInfoColGroup> combineMultiThreaded(List<CompressedSizeInfoColGroup> groups,
AComEst sEst, ACostEstimate cEst, int minNumGroups, int k) {
private List<CompressedSizeInfoColGroup> combineMultiThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
ACostEstimate cEst, int minNumGroups, int k) {
final ExecutorService pool = CommonThreadPool.get(k);
try {
final List<PQTask> tasks = new ArrayList<>();
@@ -90,18 +93,18 @@ private static List<CompressedSizeInfoColGroup> combineMultiThreaded(List<Compre
catch(Exception e) {
throw new DMLCompressionException("Failed parallel priority que cocoding", e);
}
finally{
finally {
pool.shutdown();
}
}

private static List<CompressedSizeInfoColGroup> combineSingleThreaded(List<CompressedSizeInfoColGroup> groups,
AComEst sEst, ACostEstimate cEst, int minNumGroups) {
private List<CompressedSizeInfoColGroup> combineSingleThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst,
ACostEstimate cEst, int minNumGroups) {
return combineBlock(groups, 0, groups.size(), sEst, cEst, minNumGroups);
}

private static List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSizeInfoColGroup> groups, int start,
int end, AComEst sEst, ACostEstimate cEst, int minNumGroups) {
private List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSizeInfoColGroup> groups, int start, int end,
AComEst sEst, ACostEstimate cEst, int minNumGroups) {
Queue<CompressedSizeInfoColGroup> que = getQue(end - start, cEst);

for(int i = start; i < end; i++) {
@@ -113,7 +116,7 @@ private static List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSize
return combineBlock(que, sEst, cEst, minNumGroups);
}

private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst,
private List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst,
ACostEstimate cEst, int minNumGroups) {

List<CompressedSizeInfoColGroup> ret = new ArrayList<>();
@@ -133,29 +136,29 @@ private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSiz
if(costOfJoin < costIndividual) {
que.poll();
int numColumns = g.getColumns().size();
if(numColumns > 128){
if(numColumns > lastCombineThreshold) {
lastCombine++;
ret.add(g);
}
else{
else {
lastCombine = 0;
que.add(g);
}
}
else{
else {
lastCombine++;
ret.add(l);
}
}
else{
else {
lastCombine++;
ret.add(l);
}

l = que.poll();
groupNr = ret.size() + que.size();
}
while(que.peek() != null){
while(que.peek() != null) {
// empty que
ret.add(l);
l = que.poll();
@@ -180,7 +183,7 @@ private static double getCost(CompressedSizeInfoColGroup x, ACostEstimate cEst)
return cEst.getCost(x) + x.getColumns().avgOfIndex() / 100000;
}

protected static class PQTask implements Callable<List<CompressedSizeInfoColGroup>> {
protected class PQTask implements Callable<List<CompressedSizeInfoColGroup>> {

private final List<CompressedSizeInfoColGroup> _groups;
private final int _start;
@@ -63,16 +63,18 @@ public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, Compress
AColumnCoCoder co = createColumnGroupPartitioner(cs.columnPartitioner, est, costEstimator, cs);

// Find out if any of the groups are empty.
final boolean containsEmptyOrConst = containsEmptyOrConst(colInfos);
final boolean containsEmptyConstOrIncompressable = containsEmptyConstOrIncompressable(colInfos);

// if there are no empty or const columns then try cocode algorithms for all columns
if(!containsEmptyOrConst)
if(!containsEmptyConstOrIncompressable)
return co.coCodeColumns(colInfos, k);
else {
// filtered empty groups
final List<IColIndex> emptyCols = new ArrayList<>();
// filtered const groups
final List<IColIndex> constCols = new ArrayList<>();
// incompressable groups
final List<IColIndex> incompressable = new ArrayList<>();
// filtered groups -- in the end starting with all groups
final List<CompressedSizeInfoColGroup> groups = new ArrayList<>();

@@ -85,13 +87,15 @@ public static CompressedSizeInfo findCoCodesByPartitioning(AComEst est, Compress
emptyCols.add(g.getColumns());
else if(g.isConst())
constCols.add(g.getColumns());
else if(g.isIncompressable())
incompressable.add(g.getColumns());
else
groups.add(g);
}

// overwrite groups.
colInfos.compressionInfo = groups;

// cocode remaining groups
if(!groups.isEmpty()) {
colInfos = co.coCodeColumns(colInfos, k);
@@ -109,14 +113,19 @@ else if(g.isConst())
colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.CONST));
}

if(incompressable.size() > 0) {
final IColIndex idx = ColIndexFactory.combineIndexes(incompressable);
colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.UNCOMPRESSED));
}

return colInfos;

}
}

private static boolean containsEmptyOrConst(CompressedSizeInfo colInfos) {
private static boolean containsEmptyConstOrIncompressable(CompressedSizeInfo colInfos) {
for(CompressedSizeInfoColGroup g : colInfos.compressionInfo)
if(g.isEmpty() || g.isConst())
if(g.isEmpty() || g.isConst() || g.isIncompressable())
return true;
return false;
}
@@ -133,9 +142,9 @@ private static AColumnCoCoder createColumnGroupPartitioner(PartitionerType type,
case STATIC:
return new CoCodeStatic(est, costEstimator, cs);
case PRIORITY_QUE:
return new CoCodePriorityQue(est, costEstimator, cs);
return new CoCodePriorityQue(est, costEstimator, cs, 128);
default:
throw new RuntimeException("Unsupported column group partitioner: " + type.toString());
throw new RuntimeException("Unsupported column group partition technique: " + type.toString());
}
}
}
@@ -42,9 +42,9 @@ public boolean equals(Object that) {
}

public boolean contains(ColIndexes a, ColIndexes b) {

if(a == null || b == null)
return false;
return _indexes.contains(a._indexes.get(0)) || _indexes.contains(b._indexes.get(0));
return _indexes.contains(a._indexes.get(0)) //
|| _indexes.contains(b._indexes.get(0));
}
}