Skip to content

Commit

Permalink
Controlling number of exemplars in ND
Browse files Browse the repository at this point in the history
When number of exemplars found is more than `nd_bins` do the following:
- find the closest exemplar to the exemplar #1;
- adjust `delta` as `delta += min_distance`;
- if the distance between two exemplars is less than new `delta` merge
them and store the mapping info;
- adjust exemplar's info for all the relevant members at the end.
  • Loading branch information
Oleksiy Kononenko committed Aug 20, 2018
1 parent 8237ac7 commit 5385071
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 33 deletions.
120 changes: 93 additions & 27 deletions c/extras/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,30 @@


PyObject* aggregate(PyObject*, PyObject* args) {
int32_t n_bins, nx_bins, ny_bins, max_dimensions;
int32_t n_bins, nx_bins, ny_bins, nd_bins, max_dimensions;
unsigned int seed;
PyObject* dt;

if (!PyArg_ParseTuple(args, "OiiiiI:aggregate", &dt, &n_bins, &nx_bins, &ny_bins,
&max_dimensions, &seed)) return nullptr;
if (!PyArg_ParseTuple(args, "OiiiiiI:aggregate", &dt, &n_bins, &nx_bins, &ny_bins,
&nd_bins, &max_dimensions, &seed)) return nullptr;

DataTable* dt_in = PyObj(dt).as_datatable();
DataTablePtr dt_members = nullptr;


Aggregator agg(n_bins, nx_bins, ny_bins, max_dimensions, seed);
Aggregator agg(n_bins, nx_bins, ny_bins, nd_bins, max_dimensions, seed);
dt_members = agg.aggregate(dt_in);

return pydatatable::wrap(dt_members.release());
}


Aggregator::Aggregator(int32_t n_bins_in, int32_t nx_bins_in, int32_t ny_bins_in,
int32_t max_dimensions_in, unsigned int seed_in) :
int32_t nd_bins_in, int32_t max_dimensions_in, unsigned int seed_in) :
n_bins(n_bins_in),
nx_bins(nx_bins_in),
ny_bins(ny_bins_in),
nd_bins(nd_bins_in),
max_dimensions(max_dimensions_in),
seed(seed_in)
{
Expand Down Expand Up @@ -107,14 +108,13 @@ void Aggregator::aggregate_exemplars(DataTable* dt_exemplars, DataTablePtr& dt_m
d_counts[i] = offsets[i+1] - offsets[i];
}

// Replacing group ids with the exemplar ids for 1D and 2D aggregations
if (dt_exemplars->ncols < 3) {
#pragma omp parallel for schedule(dynamic)
for (size_t i = 0; i < gb_members.ngroups(); ++i) {
for (size_t j = 0; j < static_cast<size_t>(d_counts[i]); ++j) {
size_t member_shift = static_cast<size_t>(offsets[i]) + j;
d_members[ri_members_indices[member_shift]] = static_cast<int32_t>(i);
}
// Replacing group ids with the actual exemplar ids for 1D and 2D aggregations
// After re-mapping also need this for ND
#pragma omp parallel for schedule(dynamic)
for (size_t i = 0; i < gb_members.ngroups(); ++i) {
for (size_t j = 0; j < static_cast<size_t>(d_counts[i]); ++j) {
size_t member_shift = static_cast<size_t>(offsets[i]) + j;
d_members[ri_members_indices[member_shift]] = static_cast<int32_t>(i);
}
}
dt_members->columns[0]->get_stats()->reset();
Expand Down Expand Up @@ -297,24 +297,26 @@ void Aggregator::group_2d_mixed(bool cont_index, DataTablePtr& dt_exemplars, Dat
for (int32_t j = offsets_cat[i]; j < offsets_cat[i+1]; ++j) {
double v_cont = ISNA<double>(d_cont[gi_cat[j]])? 0 : d_cont[gi_cat[j]];
d_groups[gi_cat[j]] = group_cat_id +
static_cast<int32_t>(normx_factor * v_cont + normx_shift);
static_cast<int32_t>(normx_factor * v_cont + normx_shift);
}
}
}


// Leland's algorithm for N-dimensional aggregation, see [1-2] for more details
// N-dimensional aggregation, see [1-3] for more details
// [1] https://www.cs.uic.edu/~wilkinson/Publications/outliers.pdf
// [2] https://github.com/h2oai/vis-data-server/blob/master/library/src/main/java/com/h2o/data/Aggregator.java
// [3] https://mathoverflow.net/questions/308018/coverage-of-balls-on-random-points-in-euclidean-space?answertab=active#tab-top
void Aggregator::group_nd(DataTablePtr& dt_exemplars, DataTablePtr& dt_members) {
auto d = static_cast<int32_t>(dt_exemplars->ncols);
int64_t ndims = std::min(max_dimensions, d);
double* exemplar = new double[ndims];
double* member = new double[ndims];
double* pmatrix = nullptr;
std::vector<double*> exemplars;
std::vector<ex> exemplars;
std::vector<int64_t> ids;
double delta, distance = 0.0;
auto d_counts = static_cast<int32_t*>(dt_members->columns[0]->data_w());
auto d_members = static_cast<int32_t*>(dt_members->columns[0]->data_w());
double radius2 = (d / 6.0) - 1.744 * sqrt(7.0 * d / 180.0);
double radius = (d > 4)? .5 * sqrt(radius2) : .5 / pow(100.0, 1.0 / d);

Expand All @@ -327,33 +329,97 @@ void Aggregator::group_nd(DataTablePtr& dt_exemplars, DataTablePtr& dt_members)
}

delta = radius * radius;
exemplars.push_back(exemplar);
d_counts[0] = 0;

ex e = {0, exemplar};
exemplars.push_back(e);
ids.push_back(e.id);
d_members[0] = static_cast<int32_t>(e.id);

for (int32_t i = 1; i < dt_exemplars->nrows; ++i) {
if (dt_exemplars->ncols > max_dimensions) project_row(dt_exemplars, member, i, pmatrix);
else normalize_row(dt_exemplars, member, i);

for (size_t exemplar_id = 0; exemplar_id < exemplars.size(); ++exemplar_id) {
distance = calculate_distance(member, exemplars[exemplar_id], ndims, delta);
for (size_t j = 0; j < exemplars.size(); ++j) {
distance = calculate_distance(member, exemplars[j].coords, ndims, delta);
if (distance < delta) {
d_counts[i] = static_cast<int32_t>(exemplar_id);
d_members[i] = static_cast<int32_t>(exemplars[j].id);
break;
}
}

if (distance >= delta) {
d_counts[i] = static_cast<int32_t>(exemplars.size());
exemplars.push_back(member);
e = (ex) {static_cast<int64_t>(ids.size()), member};
exemplars.push_back(e);
ids.push_back(e.id);
d_members[i] = static_cast<int32_t>(e.id);
member = new double[ndims];
if (exemplars.size() > static_cast<size_t>(nd_bins)) {
adjust_delta(delta, exemplars, ids, ndims);
}
}
}

adjust_members(ids, dt_members);

delete[] pmatrix;
delete[] member;
#pragma omp parallel for schedule(static)
for (size_t i= 0; i < exemplars.size(); ++i) {
delete[] exemplars[i];
delete[] exemplars[i].coords;
}
}


void Aggregator::adjust_members(std::vector<int64_t>& ids, DataTablePtr& dt_members) {
auto d_members = static_cast<int32_t*>(dt_members->columns[0]->data_w());
size_t* map = new size_t[ids.size()];

#pragma omp parallel for schedule(static)
for (size_t i = 0; i < ids.size(); ++i) {
if (ids[i] == static_cast<int64_t>(i)) {
map[i] = i;
} else {
map[i] = calculate_map(ids, i);
}
}

#pragma omp parallel for schedule(static)
for (int64_t i = 0; i < dt_members->nrows; ++i) {
d_members[i] = static_cast<int32_t>(map[d_members[i]]);
}
}


size_t Aggregator::calculate_map(std::vector<int64_t>& ids, size_t id) {
if (id == static_cast<size_t>(ids[id])){
return id;
} else {
return calculate_map(ids, static_cast<size_t>(ids[id]));
}
}


void Aggregator::adjust_delta(double& delta, std::vector<ex>& exemplars, std::vector<int64_t>& ids, int64_t ndims) {
double min_distance = std::numeric_limits<double>::max();

for (auto it1 = exemplars.begin(); it1 < exemplars.end() - 1; ++it1) {
std::vector<ex>::iterator min_it = exemplars.end();
for (auto it2 = it1 + 1; it2 < exemplars.end(); ++it2) {
double distance = calculate_distance((*it1).coords, (*it2).coords, ndims, delta, it1 != exemplars.begin());
if (distance < min_distance ) {
if (it1 == exemplars.begin()){
min_distance = distance;
delta += min_distance;
}
min_it = it2;
}
}

if (min_it != exemplars.end()) {
ids[static_cast<size_t>((*min_it).id)] = (*it1).id;
delete[] (*min_it).coords;
exemplars.erase(min_it);
}
}
}

Expand All @@ -375,15 +441,15 @@ void Aggregator::adjust_radius(DataTablePtr& dt_exemplars, double& radius) {
}


double Aggregator::calculate_distance(double* e1, double* e2, int64_t ndims, double delta) {
double Aggregator::calculate_distance(double* e1, double* e2, int64_t ndims, double delta, bool early_exit /*=true*/) {
double sum = 0.0;
int32_t n = 0;

for (int i = 0; i < ndims; ++i) {
if (ISNA<double>(e1[i]) || ISNA<double>(e2[i])) continue;
++n;
sum += (e1[i] - e2[i]) * (e1[i] - e2[i]);
if (sum > delta) return sum;
if (early_exit && sum > delta) return sum;
}

return sum * ndims / n;
Expand Down
16 changes: 12 additions & 4 deletions c/extras/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
#include <random>
#include "py_datatable.h"

struct ex {
int64_t id;
double* coords;
};


class Aggregator {
public:
Aggregator(int32_t, int32_t, int32_t, int32_t, unsigned int);
Aggregator(int32_t, int32_t, int32_t, int32_t, int32_t, unsigned int);
DataTablePtr aggregate(DataTable*);
static constexpr double epsilon = 1.0e-15;
static void set_norm_coeffs(double&, double&, double, double, int32_t);
Expand All @@ -25,6 +30,7 @@ class Aggregator {
int32_t n_bins;
int32_t nx_bins;
int32_t ny_bins;
int32_t nd_bins;
int32_t max_dimensions;
unsigned int seed;

Expand All @@ -38,9 +44,11 @@ class Aggregator {
void group_2d_mixed(bool, DataTablePtr&, DataTablePtr&);
void aggregate_exemplars(DataTable*, DataTablePtr&);


size_t calculate_map(std::vector<int64_t>&, size_t);
void adjust_members(std::vector<int64_t>&, DataTablePtr&);
void adjust_delta(double&, std::vector<ex>&, std::vector<int64_t>&, int64_t);
void normalize_row(DataTablePtr&, double*, int32_t);
double calculate_distance(double*, double*, int64_t, double);
double calculate_distance(double*, double*, int64_t, double, bool early_exit=true);
void adjust_radius(DataTablePtr&, double&);
double* generate_pmatrix(DataTablePtr&);
void project_row(DataTablePtr&, double*, int32_t, double*);
Expand All @@ -49,5 +57,5 @@ class Aggregator {

DECLARE_FUNCTION(
aggregate,
"aggregate(self, n_bins=500, nx_bins=50, ny_bins=50, max_dimensions=25, seed=0)\n\n",
"aggregate(self, n_bins=500, nx_bins=50, ny_bins=50, nd_bins = 500, max_dimensions=25, seed=0)\n\n",
dt_EXTRAS_AGGREGATOR_cc)
4 changes: 2 additions & 2 deletions datatable/extras/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datatable import Frame
from datatable.lib import core

def aggregate(self, n_bins=500, nx_bins=50, ny_bins=50, max_dimensions=25, seed=0):
def aggregate(self, n_bins=500, nx_bins=50, ny_bins=50, nd_bins=500, max_dimensions=50, seed=0):
"""
Aggregate datatable in-place.
Expand All @@ -31,7 +31,7 @@ def aggregate(self, n_bins=500, nx_bins=50, ny_bins=50, max_dimensions=25, seed=
with the number of members for each exemplar. The function returns
a new one-column datatable that contains exemplar_ids for each of the original rows.
"""
dt_members = core.aggregate(self._dt, n_bins, nx_bins, ny_bins, max_dimensions, seed)
dt_members = core.aggregate(self._dt, n_bins, nx_bins, ny_bins, nd_bins, max_dimensions, seed)
names_exemplars = self.names + ("count",)
names_members = ("exemplar_id")
self.__init__(self.internal, names_exemplars)
Expand Down

0 comments on commit 5385071

Please sign in to comment.