In this repository we provide Matlab implementation of coreset algorithms, used for evaluation in following paper:
k-Means for Streaming and Distributed Big Sparse Data. Artem Barger, and Dan Feldman. Proceedings of the 2016 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2016.
Current project consist of three main modules:
- kmeans-coreset - the Java based implementation of coresets algorithms: 1.1. Uniform coreset 1.2. Non Uniform coreset (Sensitivity) 1.3. Deterministic coreset construction suggested in the paper above
- coreset-communication - definition of the protocol for distributed computation of the coreset
- coreset-server - implementation of server and client side of the distributed protocol which enables coreset computation.
-
All algorithms works with
SparseWeightableVector
which is an implementation of weighted vector inR^d
, extends and implements following interfaces:- SparseSample
public interface SparseSample extends SparseWeightable { void setProbability(final double prob); double getProbability(); }
- SparseWeightable
public interface SparseWeightable { void setWeight(final double weight); double getWeight(); }
- SparseClusterable
public interface SparseClusterable { RealVector getVector(); }
SparseCoresetAlgorithm - is the interface which defines a main API for
coreset algorithm, as follows:
public interface SparseCoresetAlgorithm extends Serializable { List<SparseWeightableVector> takeSample(final List<SparseWeightableVector> pointset); }
where
takeSample
is the method which actually represents coreset computation, it receives a list of weighted poinst and returns new weighted list of points reduced from the original list.We provide three different implementations of this interface:
- Uniform coreset -
SparseUniformCoreset
/** * @param sampleSize - coreset size to sample */ public SparseUniformCoreset(final int t) { this.t = t; }
Constructor which receives parameter of the coreset size to sample.
- Non uniform coreset -
SparseNonUniformCoreset
/** * @param seedingAlgorithm - algorithm used to seed and compute sensitivity * @param sampleSize - coreset size to sample */ public SparseNonUniformCoreset(final SparseSeedingAlgorithm seedingAlgorithm, final int sampleSize) { this.sampleSize = sampleSize; // Default is the k-means++ algorithm which provides (1, log(n)) approximation for k-means problem. this.seedingAlgorithm = seedingAlgorithm; }
Constructor which receives coreset size to sample and intial seeding algorithm (
SparseSeedingAlgorithm
) used for sensitivity computation.public interface SparseSeedingAlgorithm { /** * Seed initial cluster centers. * * @param vectors vectors to select initial centers from. Method should not change input parameter. * @return list of the cluster centers. */ List<SparseCentroidCluster> seed(final List<SparseWeightableVector> vectors); }
- Our algorithm -
SparseKmeansCoresetAlgorithm
/** * @param sampleSize - coreset size to sample */ public SparseKmeansCoresetAlgorithm(int sampleSize) { this.sampleSize = sampleSize; }
Constructor which receives as a parameter coreset size to compute ("sample").
-
Following class defines the streaming algorithm - encapsulated abstraction of the merge-and-reduce tree which allows us to compute coreset in streaming seting.
public class StreamingAlgorithm { private Map<Integer, List<SparseWeightableVector>> coresetTree = Maps.newHashMap(); private SparseCoresetAlgorithm coresetAlgorithm; public StreamingAlgorithm(SparseCoresetAlgorithm coresetAlgorithm) { this.coresetAlgorithm = coresetAlgorithm; } public void addDataset(final List<SparseWeightableVector> dataset) { List<SparseWeightableVector> coreset = coresetAlgorithm.takeSample(dataset); int treeLevel = 0; List<SparseWeightableVector> leaf = coresetTree.get(treeLevel); while (leaf != null) { coresetTree.remove(treeLevel++); coreset.addAll(leaf); coreset = coresetAlgorithm.takeSample(coreset); leaf = coresetTree.get(treeLevel); } coresetTree.put(treeLevel, coreset); } public List<SparseWeightableVector> getTotalCoreset() { if (coresetTree.size() == 1) { for (List<SparseWeightableVector> coreset : coresetTree.values()) { return coreset; } } final List<SparseWeightableVector> treeCoresetView = Lists.newArrayList(); for (Map.Entry<Integer, List<SparseWeightableVector>> each : coresetTree.entrySet()) { treeCoresetView.addAll(each.getValue()); } return coresetAlgorithm.takeSample(treeCoresetView); } }
It accepts concrete implementation of the
SparseCoresetAlgorithm
interface and proceeds with coreset tree construction with streaming batch of points. -
Following defines Apache Thrift structure and services for distributed coreset computation:
namespace java univ.ml.distributed.coreset enum CoresetAlgorithm { UNIFORM, NON_UNIFORM, KMEANS_PLUS_PLUS } struct CoresetPoint { 1: map<i32, double> coords, 2: i32 dim } struct CoresetWeightedPoint { 1: CoresetPoint point, 2: double weight, } struct CoresetPoints { 1: i64 id, 2: list<CoresetWeightedPoint> points } service CoresetService { bool initialize(1:i32 k, 2:i32 sampleSize, 3:CoresetAlgorithm algorithm) oneway void compressPoints(1:CoresetPoints message) CoresetPoints getTotalCoreset() double getEnergy(1: CoresetPoints centers, 2: CoresetPoints points) bool isDone() }
-
CoresetServiceHandler
implements the logic of theCoresetService
define with Apache Thrift above, it's capable to serve incomming batches of streamed data points and maintain coreset merge-and-reduce tree according to the initialization parameters.Protocol definition allows to initialize service handler with three different implementations of the coreset algorithm, the k of kmeans algorithm parameters and the actual coreset size.
Once initialized it will allow to send weithed points from
R^d
to compress them into coreset and construct merge-and-reduce tree. After all it also could be used to compute energy function in the distributed manner, leveraginggetEnergy
method, which receives centers and points and computes sum of squared distances to the provided centers.
In order to build the project please run:
mvn clean install
Compilation will produce two jar files in coreset-serer/target folder:
1.1. client.jar - the client application to distribute points accross servers 1.2. server.jar - the server application to receive batch of streamed points, compute and construct coreset tree.
-
To execute server code you need to run:
java -jar server.jar 9999
where 9999 stands for TCP port server will bind to.
Next to execute client you need to run:
java -jar client.jar -k 32 -algorithm KMEANS_PLUS_PLUS \ -sampleSize 512 -batchSize 1024 \ -hosts serverIP1:9999,serverIP2:9999,...,serverIPN
Where:
- k - kmeans clustering parameter
- algorithm - one of the available agorithms options: UNIFORM, NON_UNIFORM, KMEANS_PLUS_PLUS
- sampleSize - the coreset size to sample
- batchSize - number of points to send in each batch to the remote server
- hosts - comma separated list of the remote hosts to distributed points to
If you'd like to use this implementation, please reference original paper, any feedback send to Artem Barger ([email protected])
The software is released under the MIT License as detailed in kmeans.pyx.