Skip to content

Commit

Permalink
fewer parameters and better memory handling
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvanBrocard committed Nov 9, 2021
1 parent f8fb7d0 commit d25cc51
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 552 deletions.
137 changes: 53 additions & 84 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

#include "kmeans.h"

// #ifndef DPU_BINARY
// #define DPU_BINARY "src/dpu_kmeans/dpu_program/kmeans_dpu_kernel" /**< filename of the binary sent to the kernel */
// #end

/**
* @brief Computes the lowest common multiple of two integers.
*
Expand All @@ -46,33 +42,34 @@ int get_lcm(int n1, int n2)
/**
* @brief Computes the appropriate task size for DPU tasklets.
*
* @param nfeatures Number of features.
* @param npointperdpu Number of points per DPU.
* @param p Algorithm parameters.
* @return The task size in bytes.
*/
unsigned int get_task_size(int nfeatures, unsigned int npointperdpu)
static unsigned int get_task_size(Params *p)
{
unsigned int task_size_in_points;
unsigned int task_size_in_bytes;
unsigned int task_size_in_features;

// how many points we can fit in w_features
unsigned int max_task_size = (WRAM_FEATURES_SIZE / sizeof(int_feature)) / nfeatures;
/* how many points we can fit in w_features */
unsigned int max_task_size = (WRAM_FEATURES_SIZE / sizeof(int_feature)) / p->nfeatures;

// number of tasks as the smallest multiple of NR_TASKLETS higher than npointperdu / max_task_size
unsigned int ntasks = (npointperdpu + max_task_size - 1) / max_task_size;
/* number of tasks as the smallest multiple of NR_TASKLETS higher than npointperdu / max_task_size */
unsigned int ntasks = (p->npointperdpu + max_task_size - 1) / max_task_size;
ntasks = ((ntasks + NR_TASKLETS - 1) / NR_TASKLETS) * NR_TASKLETS;

// task size has to be at least 1
task_size_in_points = (((npointperdpu + ntasks - 1) / ntasks) < max_task_size) ? ((npointperdpu + ntasks - 1) / ntasks) : max_task_size;
/* task size has to be at least 1 */
task_size_in_points = (((p->npointperdpu + ntasks - 1) / ntasks) < max_task_size)
? ((p->npointperdpu + ntasks - 1) / ntasks)
: max_task_size;
if (task_size_in_points == 0)
task_size_in_points = 1;

task_size_in_features = task_size_in_points * nfeatures;
task_size_in_features = task_size_in_points * p->nfeatures;
task_size_in_bytes = task_size_in_features * sizeof(int_feature);

// task size in bytes must be a multiple of 8 for DMA alignment and also a multiple of number of features x byte size of integers
int lcm = get_lcm(sizeof(int_feature) * nfeatures, 8);
/* task size in bytes must be a multiple of 8 for DMA alignment and also a multiple of number of features x byte size of integers */
int lcm = get_lcm(sizeof(int_feature) * p->nfeatures, 8);
task_size_in_bytes = (task_size_in_bytes + lcm - 1) / lcm * lcm;
if (task_size_in_bytes > WRAM_FEATURES_SIZE)
{
Expand All @@ -86,20 +83,19 @@ unsigned int get_task_size(int nfeatures, unsigned int npointperdpu)
/**
* @brief Loads a binary in the DPUs.
*
* @param allset set of all assigned DPUs
* @param p Algorithm parameters.
* @param DPU_BINARY path to the binary
* @param ndpu number of DPUs
*/
void load_kernel(dpu_set *allset, const char *DPU_BINARY, uint32_t *ndpu)
void load_kernel(Params *p, const char *DPU_BINARY)
{
DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, NULL, allset));
DPU_ASSERT(dpu_get_nr_dpus(*allset, ndpu));
DPU_ASSERT(dpu_load(*allset, DPU_BINARY, NULL));
DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, NULL, &p->allset));
DPU_ASSERT(dpu_get_nr_dpus(p->allset, &p->ndpu));
DPU_ASSERT(dpu_load(p->allset, DPU_BINARY, NULL));
}

void free_dpus(dpu_set *allset)
void free_dpus(Params *p)
{
DPU_ASSERT(dpu_free(*allset));
DPU_ASSERT(dpu_free(p->allset));
}

/**
Expand All @@ -108,109 +104,85 @@ void free_dpus(dpu_set *allset)
* @return Number of iterations to reach the best RMSE.
*/
int cluster(
uint64_t npoints, /**< [in] number of data points */
uint64_t npadded, /**< [in] number of data points with padding */
int nfeatures, /**< [in] number of attributes for each point */
uint32_t ndpu, /**< [in] number of available DPUs */
Params *p,
float **features_float, /**< [in] array: [npadded][nfeatures] */
int_feature **features_int, /**< [in] array: [npadded][nfeatures] */
int min_nclusters, /**< [in] min to max number of clusters */
int max_nclusters, /**< [in] max number of clusters */
float scale_factor, /**< [in] scale factor used in preprocessing */
float threshold, /**< [in] loop terminating factor */
int *best_nclusters, /**< [out] number between min and max with lowest RMSE */
float ***cluster_centres, /**< [out] [best_nclusters][nfeatures] */
float *min_rmse, /**< [out] minimum RMSE */
int isRMSE, /**< [in] calculate RMSE */
int isOutput, /**< [in] whether or not to print runtime information */
int nloops, /**< [in] number of iteration for each number of clusters */
int *log_iterations, /**< [out] log of the number of iterations */
double *log_time, /**< [out] log of the time taken */
dpu_set *allset) /**< [in] pointer to the set of all assigned DPUs */
double *log_time) /**< [out] log of the time taken */
{
unsigned int nclusters; /* number of clusters k */
unsigned int log_index = 0; /* index of the current nclusters iteration */
int index = 0; /* number of iteration to reach the best RMSE */
float rmse; /* RMSE for each clustering */
uint8_t *membership; /* which cluster a data point belongs to */
float **tmp_cluster_centres; /* hold coordinates of cluster centers */
unsigned int npointperdpu = npadded / ndpu; /* number of points per DPU */
float min_rmse_ref = FLT_MAX; /* reference min_rmse value */
struct timeval cluster_timing; /* clustering time for a given nclusters */
unsigned int nclusters; /* number of clusters k */
unsigned int log_index = 0; /* index of the current nclusters iteration */
int index = 0; /* number of iteration to reach the best RMSE */
float rmse; /* RMSE for each clustering */
float **tmp_cluster_centres; /* hold coordinates of cluster centers */
float min_rmse_ref = FLT_MAX; /* reference min_rmse value */
struct timeval cluster_timing; /* clustering time for a given nclusters */

/* parameters to calculate once here and send to the DPUs. */
unsigned int task_size_in_points;
unsigned int task_size_in_bytes;
unsigned int task_size_in_features;

/* allocate memory for membership */
membership = (uint8_t *)malloc(npadded * sizeof(uint8_t));

/* =============== DPUs initialization =============== */
/* compute the iteration variables for the DPUs */

task_size_in_bytes = get_task_size(nfeatures, npointperdpu);
task_size_in_bytes = get_task_size(p);

/* realign task size in features and points */
task_size_in_features = task_size_in_bytes / sizeof(int_feature);
task_size_in_points = task_size_in_features / nfeatures;
task_size_in_points = task_size_in_features / p->nfeatures;

/* send computation parameters to the DPUs */
DPU_ASSERT(dpu_broadcast_to(*allset, "nfeatures_host", 0, &nfeatures, sizeof(nfeatures), DPU_XFER_DEFAULT));
// DPU_ASSERT(dpu_broadcast_to(*allset, "npoints", 0, &npointperdpu, sizeof(npointperdpu), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(p->allset, "nfeatures_host", 0, &p->nfeatures, sizeof(p->nfeatures), DPU_XFER_DEFAULT));

DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_points_host", 0, &task_size_in_points, sizeof(task_size_in_points), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_bytes_host", 0, &task_size_in_bytes, sizeof(task_size_in_bytes), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_features_host", 0, &task_size_in_features, sizeof(task_size_in_features), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_points_host", 0, &task_size_in_points, sizeof(task_size_in_points), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_bytes_host", 0, &task_size_in_bytes, sizeof(task_size_in_bytes), DPU_XFER_DEFAULT));
DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_features_host", 0, &task_size_in_features, sizeof(task_size_in_features), DPU_XFER_DEFAULT));

if (isOutput)
if (p->isOutput)
{
printf("points per DPU : %d\n", npointperdpu);
printf("tasks per DPU: %d\n", npointperdpu / task_size_in_points);
printf("points per DPU : %d\n", p->npointperdpu);
printf("tasks per DPU: %d\n", p->npointperdpu / task_size_in_points);
printf("task size in points : %d\n", task_size_in_points);
printf("task size in bytes : %d\n", task_size_in_bytes);
}

/* allocate memory for device communication */
allocateMemory(npadded, ndpu);
/* =============== end DPUs initialization =============== */

if (isOutput)
if (p->isOutput)
printf("\nStarting calculation\n\n");

/* sweep k from min to max_nclusters to find the best number of clusters */
for (nclusters = min_nclusters; nclusters <= max_nclusters; nclusters++)
for (nclusters = p->min_nclusters; nclusters <= p->max_nclusters; nclusters++)
{
int total_iterations = 0;

if (nclusters > npoints)
if (nclusters > p->npoints)
break; /* cannot have more clusters than points */

cluster_timing.tv_sec = 0;
cluster_timing.tv_usec = 0;

allocateClusters(p, nclusters);

/* iterate nloops times for each number of clusters */
for (int i_init = 0; i_init < nloops; i_init++)
for (int i_init = 0; i_init < p->nloops; i_init++)
{
struct timeval tic, toc;
int iterations_counter = 0;
gettimeofday(&tic, NULL); // `timing = omp_get_wtime();` returns absurd values

gettimeofday(&tic, NULL); // `timing = omp_get_wtime();` returned absurd values

tmp_cluster_centres = kmeans_clustering(
p,
features_int,
features_float,
nfeatures,
npoints,
npadded,
nclusters,
ndpu,
scale_factor,
threshold,
isOutput,
membership,
&iterations_counter,
i_init,
allset);
i_init);

gettimeofday(&toc, NULL);
cluster_timing.tv_sec += toc.tv_sec - tic.tv_sec;
Expand All @@ -231,16 +203,15 @@ int cluster(
// printf("\n");

/* find the number of clusters with the best RMSE */
if (isRMSE || min_nclusters != max_nclusters || nloops > 1)
if (p->isRMSE || p->min_nclusters != p->max_nclusters || p->nloops > 1)
{
rmse = rms_err(
p,
features_float,
nfeatures,
npoints,
tmp_cluster_centres,
nclusters);

if (isOutput)
if (p->isOutput)
printf("RMSE for nclusters = %d : %f\n", nclusters, rmse);

if (rmse < min_rmse_ref)
Expand Down Expand Up @@ -269,17 +240,15 @@ int cluster(
}
}

deallocateClusters();

/* logging number of iterations and time taken */
double cluster_time = ((double)(cluster_timing.tv_sec * 1000000 + cluster_timing.tv_usec)) / 1000000;
log_iterations[log_index] = total_iterations;
log_time[log_index] = cluster_time;
log_index++;
}

deallocateMemory();

free(membership);

/* DEBUG: print best clusters */
// printf("best nclusters: %d\n", *best_nclusters);
// printf("trying\n");
Expand Down
1 change: 1 addition & 0 deletions src/dpu_kmeans/_kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def _kmeans(self):
False,
self.verbose,
self.n_init,
self.max_iter,
log_iterations,
log_time,
)
Expand Down
Loading

0 comments on commit d25cc51

Please sign in to comment.