diff --git a/src/cluster.c b/src/cluster.c index 05a260c..7b1a74e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -17,10 +17,6 @@ #include "kmeans.h" -// #ifndef DPU_BINARY -// #define DPU_BINARY "src/dpu_kmeans/dpu_program/kmeans_dpu_kernel" /**< filename of the binary sent to the kernel */ -// #end - /** * @brief Computes the lowest common multiple of two integers. * @@ -46,33 +42,34 @@ int get_lcm(int n1, int n2) /** * @brief Computes the appropriate task size for DPU tasklets. * - * @param nfeatures Number of features. - * @param npointperdpu Number of points per DPU. + * @param p Algorithm parameters. * @return The task size in bytes. */ -unsigned int get_task_size(int nfeatures, unsigned int npointperdpu) +static unsigned int get_task_size(Params *p) { unsigned int task_size_in_points; unsigned int task_size_in_bytes; unsigned int task_size_in_features; - // how many points we can fit in w_features - unsigned int max_task_size = (WRAM_FEATURES_SIZE / sizeof(int_feature)) / nfeatures; + /* how many points we can fit in w_features */ + unsigned int max_task_size = (WRAM_FEATURES_SIZE / sizeof(int_feature)) / p->nfeatures; - // number of tasks as the smallest multiple of NR_TASKLETS higher than npointperdu / max_task_size - unsigned int ntasks = (npointperdpu + max_task_size - 1) / max_task_size; + /* number of tasks as the smallest multiple of NR_TASKLETS higher than npointperdu / max_task_size */ + unsigned int ntasks = (p->npointperdpu + max_task_size - 1) / max_task_size; ntasks = ((ntasks + NR_TASKLETS - 1) / NR_TASKLETS) * NR_TASKLETS; - // task size has to be at least 1 - task_size_in_points = (((npointperdpu + ntasks - 1) / ntasks) < max_task_size) ? ((npointperdpu + ntasks - 1) / ntasks) : max_task_size; + /* task size has to be at least 1 */ + task_size_in_points = (((p->npointperdpu + ntasks - 1) / ntasks) < max_task_size) + ? ((p->npointperdpu + ntasks - 1) / ntasks) + : max_task_size; if (task_size_in_points == 0) task_size_in_points = 1; - task_size_in_features = task_size_in_points * nfeatures; + task_size_in_features = task_size_in_points * p->nfeatures; task_size_in_bytes = task_size_in_features * sizeof(int_feature); - // task size in bytes must be a multiple of 8 for DMA alignment and also a multiple of number of features x byte size of integers - int lcm = get_lcm(sizeof(int_feature) * nfeatures, 8); + /* task size in bytes must be a multiple of 8 for DMA alignment and also a multiple of number of features x byte size of integers */ + int lcm = get_lcm(sizeof(int_feature) * p->nfeatures, 8); task_size_in_bytes = (task_size_in_bytes + lcm - 1) / lcm * lcm; if (task_size_in_bytes > WRAM_FEATURES_SIZE) { @@ -86,20 +83,19 @@ unsigned int get_task_size(int nfeatures, unsigned int npointperdpu) /** * @brief Loads a binary in the DPUs. * - * @param allset set of all assigned DPUs + * @param p Algorithm parameters. * @param DPU_BINARY path to the binary - * @param ndpu number of DPUs */ -void load_kernel(dpu_set *allset, const char *DPU_BINARY, uint32_t *ndpu) +void load_kernel(Params *p, const char *DPU_BINARY) { - DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, NULL, allset)); - DPU_ASSERT(dpu_get_nr_dpus(*allset, ndpu)); - DPU_ASSERT(dpu_load(*allset, DPU_BINARY, NULL)); + DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, NULL, &p->allset)); + DPU_ASSERT(dpu_get_nr_dpus(p->allset, &p->ndpu)); + DPU_ASSERT(dpu_load(p->allset, DPU_BINARY, NULL)); } -void free_dpus(dpu_set *allset) +void free_dpus(Params *p) { - DPU_ASSERT(dpu_free(*allset)); + DPU_ASSERT(dpu_free(p->allset)); } /** @@ -108,109 +104,85 @@ void free_dpus(dpu_set *allset) * @return Number of iterations to reach the best RMSE. */ int cluster( - uint64_t npoints, /**< [in] number of data points */ - uint64_t npadded, /**< [in] number of data points with padding */ - int nfeatures, /**< [in] number of attributes for each point */ - uint32_t ndpu, /**< [in] number of available DPUs */ + Params *p, float **features_float, /**< [in] array: [npadded][nfeatures] */ int_feature **features_int, /**< [in] array: [npadded][nfeatures] */ - int min_nclusters, /**< [in] min to max number of clusters */ - int max_nclusters, /**< [in] max number of clusters */ - float scale_factor, /**< [in] scale factor used in preprocessing */ - float threshold, /**< [in] loop terminating factor */ int *best_nclusters, /**< [out] number between min and max with lowest RMSE */ float ***cluster_centres, /**< [out] [best_nclusters][nfeatures] */ float *min_rmse, /**< [out] minimum RMSE */ - int isRMSE, /**< [in] calculate RMSE */ - int isOutput, /**< [in] whether or not to print runtime information */ - int nloops, /**< [in] number of iteration for each number of clusters */ int *log_iterations, /**< [out] log of the number of iterations */ - double *log_time, /**< [out] log of the time taken */ - dpu_set *allset) /**< [in] pointer to the set of all assigned DPUs */ + double *log_time) /**< [out] log of the time taken */ { - unsigned int nclusters; /* number of clusters k */ - unsigned int log_index = 0; /* index of the current nclusters iteration */ - int index = 0; /* number of iteration to reach the best RMSE */ - float rmse; /* RMSE for each clustering */ - uint8_t *membership; /* which cluster a data point belongs to */ - float **tmp_cluster_centres; /* hold coordinates of cluster centers */ - unsigned int npointperdpu = npadded / ndpu; /* number of points per DPU */ - float min_rmse_ref = FLT_MAX; /* reference min_rmse value */ - struct timeval cluster_timing; /* clustering time for a given nclusters */ + unsigned int nclusters; /* number of clusters k */ + unsigned int log_index = 0; /* index of the current nclusters iteration */ + int index = 0; /* number of iteration to reach the best RMSE */ + float rmse; /* RMSE for each clustering */ + float **tmp_cluster_centres; /* hold coordinates of cluster centers */ + float min_rmse_ref = FLT_MAX; /* reference min_rmse value */ + struct timeval cluster_timing; /* clustering time for a given nclusters */ /* parameters to calculate once here and send to the DPUs. */ unsigned int task_size_in_points; unsigned int task_size_in_bytes; unsigned int task_size_in_features; - /* allocate memory for membership */ - membership = (uint8_t *)malloc(npadded * sizeof(uint8_t)); - /* =============== DPUs initialization =============== */ /* compute the iteration variables for the DPUs */ - task_size_in_bytes = get_task_size(nfeatures, npointperdpu); + task_size_in_bytes = get_task_size(p); /* realign task size in features and points */ task_size_in_features = task_size_in_bytes / sizeof(int_feature); - task_size_in_points = task_size_in_features / nfeatures; + task_size_in_points = task_size_in_features / p->nfeatures; /* send computation parameters to the DPUs */ - DPU_ASSERT(dpu_broadcast_to(*allset, "nfeatures_host", 0, &nfeatures, sizeof(nfeatures), DPU_XFER_DEFAULT)); - // DPU_ASSERT(dpu_broadcast_to(*allset, "npoints", 0, &npointperdpu, sizeof(npointperdpu), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "nfeatures_host", 0, &p->nfeatures, sizeof(p->nfeatures), DPU_XFER_DEFAULT)); - DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_points_host", 0, &task_size_in_points, sizeof(task_size_in_points), DPU_XFER_DEFAULT)); - DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_bytes_host", 0, &task_size_in_bytes, sizeof(task_size_in_bytes), DPU_XFER_DEFAULT)); - DPU_ASSERT(dpu_broadcast_to(*allset, "task_size_in_features_host", 0, &task_size_in_features, sizeof(task_size_in_features), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_points_host", 0, &task_size_in_points, sizeof(task_size_in_points), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_bytes_host", 0, &task_size_in_bytes, sizeof(task_size_in_bytes), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "task_size_in_features_host", 0, &task_size_in_features, sizeof(task_size_in_features), DPU_XFER_DEFAULT)); - if (isOutput) + if (p->isOutput) { - printf("points per DPU : %d\n", npointperdpu); - printf("tasks per DPU: %d\n", npointperdpu / task_size_in_points); + printf("points per DPU : %d\n", p->npointperdpu); + printf("tasks per DPU: %d\n", p->npointperdpu / task_size_in_points); printf("task size in points : %d\n", task_size_in_points); printf("task size in bytes : %d\n", task_size_in_bytes); } - /* allocate memory for device communication */ - allocateMemory(npadded, ndpu); /* =============== end DPUs initialization =============== */ - if (isOutput) + if (p->isOutput) printf("\nStarting calculation\n\n"); /* sweep k from min to max_nclusters to find the best number of clusters */ - for (nclusters = min_nclusters; nclusters <= max_nclusters; nclusters++) + for (nclusters = p->min_nclusters; nclusters <= p->max_nclusters; nclusters++) { int total_iterations = 0; - if (nclusters > npoints) + if (nclusters > p->npoints) break; /* cannot have more clusters than points */ cluster_timing.tv_sec = 0; cluster_timing.tv_usec = 0; + allocateClusters(p, nclusters); + /* iterate nloops times for each number of clusters */ - for (int i_init = 0; i_init < nloops; i_init++) + for (int i_init = 0; i_init < p->nloops; i_init++) { struct timeval tic, toc; int iterations_counter = 0; - gettimeofday(&tic, NULL); // `timing = omp_get_wtime();` returns absurd values + + gettimeofday(&tic, NULL); // `timing = omp_get_wtime();` returned absurd values tmp_cluster_centres = kmeans_clustering( + p, features_int, features_float, - nfeatures, - npoints, - npadded, nclusters, - ndpu, - scale_factor, - threshold, - isOutput, - membership, &iterations_counter, - i_init, - allset); + i_init); gettimeofday(&toc, NULL); cluster_timing.tv_sec += toc.tv_sec - tic.tv_sec; @@ -231,16 +203,15 @@ int cluster( // printf("\n"); /* find the number of clusters with the best RMSE */ - if (isRMSE || min_nclusters != max_nclusters || nloops > 1) + if (p->isRMSE || p->min_nclusters != p->max_nclusters || p->nloops > 1) { rmse = rms_err( + p, features_float, - nfeatures, - npoints, tmp_cluster_centres, nclusters); - if (isOutput) + if (p->isOutput) printf("RMSE for nclusters = %d : %f\n", nclusters, rmse); if (rmse < min_rmse_ref) @@ -269,6 +240,8 @@ int cluster( } } + deallocateClusters(); + /* logging number of iterations and time taken */ double cluster_time = ((double)(cluster_timing.tv_sec * 1000000 + cluster_timing.tv_usec)) / 1000000; log_iterations[log_index] = total_iterations; @@ -276,10 +249,6 @@ int cluster( log_index++; } - deallocateMemory(); - - free(membership); - /* DEBUG: print best clusters */ // printf("best nclusters: %d\n", *best_nclusters); // printf("trying\n"); diff --git a/src/dpu_kmeans/_kmeans.py b/src/dpu_kmeans/_kmeans.py index 248f876..d10d6cc 100644 --- a/src/dpu_kmeans/_kmeans.py +++ b/src/dpu_kmeans/_kmeans.py @@ -51,6 +51,7 @@ def _kmeans(self): False, self.verbose, self.n_init, + self.max_iter, log_iterations, log_time, ) diff --git a/src/kmeans.c b/src/kmeans.c index 80fded6..c389e71 100644 --- a/src/kmeans.c +++ b/src/kmeans.c @@ -56,15 +56,10 @@ static void strip_ext(char *fname) * @brief Reads a binary input file from disk. */ void read_bin_input( + Params *p, /**< Algorithm parameters */ const char *filename, /**< [in] The file name. */ - uint64_t *npoints_out, /**< [out] Number of points. */ - uint64_t *npadded_out, /**< [out] Number of points with padding added. */ - int *nfeatures_out, /**< [out] Number of features. */ - uint32_t ndpu, /**< [in] Number of available DPUs. */ float ***features_out) /**< [out] Vector of features. */ { - uint64_t npoints, npadded; - int nfeatures; float **features; FILE *infile; @@ -75,17 +70,17 @@ void read_bin_input( } /* get nfeatures and npoints */ - fread(&npoints, sizeof(uint64_t), 1, infile); - fread(&nfeatures, sizeof(int), 1, infile); + fread(&p->npoints, sizeof(uint64_t), 1, infile); + fread(&p->nfeatures, sizeof(int), 1, infile); /* rounding the size of the input to the smallest multiple of 8*ndpu larger than npoints */ - npadded = ((npoints + 8 * ndpu - 1) / (8 * ndpu)) * 8 * ndpu; + p->npadded = ((p->npoints + 8 * p->ndpu - 1) / (8 * p->ndpu)) * 8 * p->ndpu; /* allocate space for features[][] and read attributes of all objects */ - features = (float **)malloc(npadded * sizeof(float *)); - features[0] = (float *)malloc(npadded * nfeatures * sizeof(float)); - for (int ipoint = 1; ipoint < npadded; ipoint++) - features[ipoint] = features[ipoint - 1] + nfeatures; + features = (float **)malloc(p->npadded * sizeof(*features)); + features[0] = (float *)malloc(p->npadded * p->nfeatures * sizeof(**features)); + for (int ipoint = 1; ipoint < p->npadded; ipoint++) + features[ipoint] = features[ipoint - 1] + p->nfeatures; /* checking that we managed to assign enough memory */ if (!features[0]) @@ -94,31 +89,22 @@ void read_bin_input( exit(EXIT_FAILURE); } - fread(features[0], sizeof(float), npoints * nfeatures, infile); + fread(features[0], sizeof(float), p->npoints * p->nfeatures, infile); fclose(infile); *features_out = features; - *npoints_out = npoints; - *npadded_out = npadded; - *nfeatures_out = nfeatures; } /** * @brief Reads a text input file from disk. */ void read_txt_input( + Params *p, /**< Algorithm parameters */ const char *filename, /**< [in] The file name. */ - uint64_t *npoints_out, /**< [out] Number of points. */ - uint64_t *npadded_out, /**< [out] Number of points with padding added. */ - int *nfeatures_out, /**< [out] Number of features. */ - uint32_t ndpu, /**< [in] Number of available DPUs. */ float ***features_out) /**< [out] Vector of features. */ { char line[1024]; - uint64_t npoints = 0; - uint64_t npadded; - int nfeatures = 0; float **features; FILE *infile; @@ -129,7 +115,7 @@ void read_txt_input( } while (fgets(line, 1024, infile) != NULL) if (strtok(line, " \t\n") != 0) - npoints++; + p->npoints++; rewind(infile); while (fgets(line, 1024, infile) != NULL) { @@ -137,18 +123,18 @@ void read_txt_input( { /* ignore the id (first attribute): nfeatures = 1; */ while (strtok(NULL, " ,\t\n") != NULL) - nfeatures++; + p->nfeatures++; break; } } /* rounding the size of the input to the smallest multiple of 8*ndpu larger than npoints */ - npadded = ((npoints + 8 * ndpu - 1) / (8 * ndpu)) * 8 * ndpu; + p->npadded = ((p->npoints + 8 * p->ndpu - 1) / (8 * p->ndpu)) * 8 * p->ndpu; /* allocate space for features[] and read attributes of all objects */ - features = (float **)malloc(npadded * sizeof(float *)); - features[0] = (float *)malloc(npadded * nfeatures * sizeof(float)); - for (int ipoint = 1; ipoint < npadded; ipoint++) - features[ipoint] = features[ipoint - 1] + nfeatures; + features = (float **)malloc(p->npadded * sizeof(*features)); + features[0] = (float *)malloc(p->npadded * p->nfeatures * sizeof(**features)); + for (int ipoint = 1; ipoint < p->npadded; ipoint++) + features[ipoint] = features[ipoint - 1] + p->nfeatures; /* checking that we managed to assign enough memory */ if (!features[0]) @@ -162,11 +148,9 @@ void read_txt_input( int ifeature_global = 0; while (fgets(line, 1024, infile) != NULL) { - // if(i%10==0) - // printf("line %d\n", i); if (strtok(line, " \t\n") == NULL) continue; - for (int ifeature = 0; ifeature < nfeatures; ifeature++) + for (int ifeature = 0; ifeature < p->nfeatures; ifeature++) { features[0][ifeature_global] = atof(strtok(NULL, " ,\t\n")); ifeature_global++; @@ -176,26 +160,22 @@ void read_txt_input( fclose(infile); *features_out = features; - *npoints_out = npoints; - *npadded_out = npadded; - *nfeatures_out = nfeatures; } /** * @brief Saves the input data in a binary file for faster access next time. * + * @param p Algorithm parameters. * @param filename_in [in] Name of the input text file. - * @param npoints [in] Number of points. - * @param nfeatures [in] Number of features. * @param features [npoints][nfeatures] Feature array. */ -void save_dat_file(const char *filename_in, uint64_t npoints, int nfeatures, float **features) +void save_dat_file(Params *p, const char *filename_in, float **features) { char *filename = strdup(filename_in); char suffix[] = ".dat"; int n = strlen(filename) + strlen(suffix); - char *dat_name = (char *)malloc(n * sizeof(char)); + char *dat_name = (char *)malloc(n * sizeof(*dat_name)); strcpy(dat_name, filename); strip_ext(dat_name); @@ -205,9 +185,9 @@ void save_dat_file(const char *filename_in, uint64_t npoints, int nfeatures, flo FILE *binfile; binfile = fopen(dat_name, "wb"); - fwrite(&npoints, sizeof(uint64_t), 1, binfile); - fwrite(&nfeatures, sizeof(int), 1, binfile); - fwrite(features[0], sizeof(float), npoints * nfeatures, binfile); + fwrite(&p->npoints, sizeof(p->npoints), 1, binfile); + fwrite(&p->nfeatures, sizeof(p->nfeatures), 1, binfile); + fwrite(features[0], sizeof(*features[0]), p->npoints * p->nfeatures, binfile); fclose(binfile); free(filename); @@ -218,22 +198,18 @@ void save_dat_file(const char *filename_in, uint64_t npoints, int nfeatures, flo * @brief Formats a flat array into a bidimensional representation */ void format_array_input( - uint64_t npoints, /**< [in] Number of points. */ - uint64_t *npadded_out, /**< [out] Number of points with padding. */ - int nfeatures, /**< [in] Number of features. */ - uint32_t ndpu, /**< [in] Number of available DPUs */ + Params *p, /**< Algorithm parameters. */ float *data, /**< [in] The data as a flat table */ float ***features_out) /**< [out] The data as two dimensional table */ { - uint64_t npadded; - npadded = ((npoints + 8 * ndpu - 1) / (8 * ndpu)) * 8 * ndpu; + // uint64_t npadded; + p->npadded = ((p->npoints + 8 * p->ndpu - 1) / (8 * p->ndpu)) * 8 * p->ndpu; - float **features = (float **)malloc(npadded * sizeof(float *)); + float **features = (float **)malloc(p->npadded * sizeof(*features)); features[0] = data; - for (int ipoint = 1; ipoint < npadded; ipoint++) - features[ipoint] = features[ipoint - 1] + nfeatures; + for (int ipoint = 1; ipoint < p->npadded; ipoint++) + features[ipoint] = features[ipoint - 1] + p->nfeatures; - *npadded_out = npadded; *features_out = features; } @@ -242,31 +218,28 @@ void format_array_input( * * @return float Scaling factor applied to the input data. */ -float preprocessing( - float **mean_out, /**< [out] Per-feature average. */ - int nfeatures, /**< [in] Number of features. */ - uint64_t npoints, /**< [in] Number of points. */ - uint64_t npadded, /**< [in] Number of points with padding. */ +void preprocessing( + Params *p, /**< Algorithm parameters */ float **features_float, /**< [in] Features as floats. */ int_feature ***features_int_out, /**< [out] Features as integers. */ - float *threshold, /**< [in,out] Termination criterion for the algorithm. */ int verbose) /**< [in] Whether or not to print runtime information. */ { uint64_t ipoint; int ifeature; float *mean; - double *variance; + float *variance; int_feature **features_int; - double avg_variance; + float avg_variance; float max_feature = 0; - float scale_factor; #ifdef PERF_COUNTER struct timeval tic, toc; gettimeofday(&tic, NULL); #endif + p->npointperdpu = p->npadded / p->ndpu; + /* DEBUG : print features head */ // printf("features head:\n"); // for (int ipoint = 0; ipoint < 10; ipoint++) @@ -277,33 +250,35 @@ float preprocessing( // } // printf("\n"); - mean = (float *)calloc(nfeatures, sizeof(float)); - variance = (double *)calloc(nfeatures, sizeof(double)); + mean = (float *)calloc(p->nfeatures, sizeof(*p->mean)); + variance = (float *)calloc(p->nfeatures, sizeof(*variance)); /* compute mean by feature */ #pragma omp parallel for collapse(2) \ reduction(+ \ - : mean[:nfeatures]) - for (ifeature = 0; ifeature < nfeatures; ifeature++) - for (ipoint = 0; ipoint < npoints; ipoint++) + : mean[:p->nfeatures]) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + for (ipoint = 0; ipoint < p->npoints; ipoint++) mean[ifeature] += features_float[ipoint][ifeature]; #pragma omp parallel for - for (ifeature = 0; ifeature < nfeatures; ifeature++) - mean[ifeature] /= npoints; + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + mean[ifeature] /= p->npoints; + + p->mean = mean; if (verbose) { printf("means = "); - for (ifeature = 0; ifeature < nfeatures; ifeature++) - printf(" %.4f",mean[ifeature]); + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + printf(" %.4f", p->mean[ifeature]); printf("\n"); } /* subtract mean from each feature */ #pragma omp parallel for collapse(2) - for (ipoint = 0; ipoint < npoints; ipoint++) - for (ifeature = 0; ifeature < nfeatures; ifeature++) - features_float[ipoint][ifeature] -= mean[ifeature]; + for (ipoint = 0; ipoint < p->npoints; ipoint++) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + features_float[ipoint][ifeature] -= p->mean[ifeature]; /* ****** discretization ****** */ @@ -311,20 +286,20 @@ float preprocessing( #pragma omp parallel for collapse(2) \ reduction(max \ : max_feature) - for (ipoint = 0; ipoint < npoints; ipoint++) - for (ifeature = 0; ifeature < nfeatures; ifeature++) + for (ipoint = 0; ipoint < p->npoints; ipoint++) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) if (fabsf(features_float[ipoint][ifeature]) > max_feature) max_feature = fabsf(features_float[ipoint][ifeature]); switch (sizeof(int_feature)) { case 1UL: - scale_factor = INT8_MAX / max_feature / 2; + p->scale_factor = INT8_MAX / max_feature / 2; break; case 2UL: - scale_factor = INT16_MAX / max_feature / 2; + p->scale_factor = INT16_MAX / max_feature / 2; break; case 4UL: - scale_factor = INT32_MAX / max_feature / 2; + p->scale_factor = INT32_MAX / max_feature / 2; break; default: printf("Error: unsupported type for int_feature.\n"); @@ -334,14 +309,14 @@ float preprocessing( if (verbose) { printf("max absolute value : %f\n", max_feature); - printf("scale factor = %.4f\n", scale_factor); + printf("scale factor = %.4f\n", p->scale_factor); } /* allocate space for features_int[][] and convert attributes of all objects */ - features_int = (int_feature **)malloc(npadded * sizeof(int_feature *)); - features_int[0] = (int_feature *)malloc(npadded * nfeatures * sizeof(int_feature)); - for (ipoint = 1; ipoint < npadded; ipoint++) - features_int[ipoint] = features_int[ipoint - 1] + nfeatures; + features_int = (int_feature **)malloc(p->npadded * sizeof(*features_int)); + features_int[0] = (int_feature *)malloc(p->npadded * p->nfeatures * sizeof(features_int)); + for (ipoint = 1; ipoint < p->npadded; ipoint++) + features_int[ipoint] = features_int[ipoint - 1] + p->nfeatures; /* checking that we managed to assign enough memory */ if (!features_int[0]) @@ -351,9 +326,9 @@ float preprocessing( } #pragma omp parallel for collapse(2) - for (ipoint = 0; ipoint < npoints; ipoint++) - for (ifeature = 0; ifeature < nfeatures; ifeature++) - features_int[ipoint][ifeature] = lroundf(features_float[ipoint][ifeature] * scale_factor); + for (ipoint = 0; ipoint < p->npoints; ipoint++) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + features_int[ipoint][ifeature] = lroundf(features_float[ipoint][ifeature] * p->scale_factor); /* DEBUG : print features head */ // printf("features head:\n"); @@ -382,23 +357,23 @@ float preprocessing( /* compute variance by feature */ #pragma omp parallel for collapse(2) \ reduction(+ \ - : variance[:nfeatures]) - for (ipoint = 0; ipoint < npoints; ipoint++) - for (ifeature = 0; ifeature < nfeatures; ifeature++) + : variance[:p->nfeatures]) + for (ipoint = 0; ipoint < p->npoints; ipoint++) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) variance[ifeature] += features_float[ipoint][ifeature] * features_float[ipoint][ifeature]; #pragma omp parallel for - for (ifeature = 0; ifeature < nfeatures; ifeature++) - variance[ifeature] /= npoints; + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) + variance[ifeature] /= p->npoints; /* compute average of variance */ avg_variance = 0; #pragma omp parallel for reduction(+ \ : avg_variance) - for (ifeature = 0; ifeature < nfeatures; ifeature++) + for (ifeature = 0; ifeature < p->nfeatures; ifeature++) avg_variance += variance[ifeature]; - avg_variance /= nfeatures; - *threshold *= avg_variance; + avg_variance /= p->nfeatures; + p->threshold *= avg_variance; #ifdef PERF_COUNTER /* compute time spent on preprocessing */ @@ -409,7 +384,7 @@ float preprocessing( if (verbose) { printf("avg_variance = %.4f\n", avg_variance); - printf("threshold = %.4f\n", *threshold); + printf("threshold = %.4f\n", p->threshold); printf("\npreprocessing completed\n\n"); } @@ -431,58 +406,48 @@ float preprocessing( // printf("\n"); // } - *mean_out = mean; *features_int_out = features_int; - - return scale_factor; } /** * @brief Restores the input data to its original state. * - * @param npoints [in] number of points - * @param nfeatures [in] number of features + * @param p [in] Algorithm parameters. * @param features [in,out] array of features - * @param mean [in] average computed during preprocessing */ -void postprocessing(uint64_t npoints, int nfeatures, float **features, float *mean) +void postprocessing(Params *p, float **features_float) { #pragma omp parallel for collapse(2) - for (uint64_t ipoint = 0; ipoint < npoints; ipoint++) - for (int ifeature = 0; ifeature < nfeatures; ifeature++) - features[ipoint][ifeature] += mean[ifeature]; + for (uint64_t ipoint = 0; ipoint < p->npoints; ipoint++) + for (int ifeature = 0; ifeature < p->nfeatures; ifeature++) + features_float[ipoint][ifeature] += p->mean[ifeature]; } /** * @brief Checks for errors in the input * - * @param npoints [in] Number of points. - * @param npadded [in] Number of points with padding. - * @param min_nclusters [in] Minimum number of clusters. - * @param max_nclusters [in] Maximum number of clusters. - * @param nfeatures [in] Number of features. - * @param ndpu [in] Number of available DPUs. + * @param p Algorithm parameters. */ -static void error_check(uint64_t npoints, uint64_t npadded, int min_nclusters, int max_nclusters, int nfeatures, uint32_t ndpu) +static void error_check(Params *p) { - if (npoints < min_nclusters) + if (p->npoints < p->min_nclusters) { - printf("Error: min_nclusters(%d) > npoints(%lu) -- cannot proceed\n", min_nclusters, npoints); + printf("Error: min_nclusters(%d) > npoints(%lu) -- cannot proceed\n", p->min_nclusters, p->npoints); exit(EXIT_FAILURE); } - if ((max_nclusters < min_nclusters) || (max_nclusters > ASSUMED_NR_CLUSTERS)) + if ((p->max_nclusters < p->min_nclusters) || (p->max_nclusters > ASSUMED_NR_CLUSTERS)) { - printf("Error: min_nclusters(%d) > max_nclusters(%lu) or max_nclusters > max clusters allowed(%d) -- cannot proceed\n", min_nclusters, npoints, ASSUMED_NR_CLUSTERS); + printf("Error: min_nclusters(%d) > max_nclusters(%lu) or max_nclusters > max clusters allowed(%d) -- cannot proceed\n", p->min_nclusters, p->npoints, ASSUMED_NR_CLUSTERS); exit(EXIT_FAILURE); } - if (ASSUMED_NR_FEATURES < nfeatures) + if (ASSUMED_NR_FEATURES < p->nfeatures) { - printf("Error: nfeatures(%d) > max clusters allowed(%d) -- cannot proceed\n", nfeatures, ASSUMED_NR_FEATURES); + printf("Error: nfeatures(%d) > max clusters allowed(%d) -- cannot proceed\n", p->nfeatures, ASSUMED_NR_FEATURES); exit(EXIT_FAILURE); } - if (npadded * nfeatures / ndpu > MAX_FEATURE_DPU) + if (p->npadded * p->nfeatures / p->ndpu > MAX_FEATURE_DPU) { - printf("Error: npadded*nfeatures/ndpu(%lu) > max features allowed per dpu(%d) -- cannot proceed\n", npadded * nfeatures / ndpu, MAX_FEATURE_DPU); + printf("Error: npadded*nfeatures/ndpu(%lu) > max features allowed per dpu(%d) -- cannot proceed\n", p->npadded * p->nfeatures / p->ndpu, MAX_FEATURE_DPU); exit(EXIT_FAILURE); } } @@ -490,57 +455,49 @@ static void error_check(uint64_t npoints, uint64_t npadded, int min_nclusters, i /** * @brief Output to array. * + * @param p Algorithm parameters. * @param best_nclusters [in] Best number of clusters according to RMSE. - * @param nfeatures [in] Number of features. * @param cluster_centres [in] Coordinate of clusters centres for the best iteration. - * @param scale_factor [in] Factor that was used in quantizing the data. - * @param mean [in] Mean that was subtracted during preprocessing. * @return float* The return array */ -static float *array_output(int best_nclusters, int nfeatures, float **cluster_centres, float scale_factor, float *mean) +static float *array_output(Params *p, int best_nclusters, float **cluster_centres) { - float *output_clusters = (float *)malloc(best_nclusters * nfeatures * sizeof(float)); +#pragma omp parallel for collapse(2) for (int icluster = 0; icluster < best_nclusters; icluster++) - for (int ifeature = 0; ifeature < nfeatures; ifeature++) - output_clusters[ifeature + icluster * nfeatures] = cluster_centres[icluster][ifeature] + mean[ifeature]; - return output_clusters; + for (int ifeature = 0; ifeature < p->nfeatures; ifeature++) + cluster_centres[icluster][ifeature] = cluster_centres[icluster][ifeature] + p->mean[ifeature]; + + return cluster_centres[0]; } /** - * @brief output to the command line + * @brief Output to the command line. * */ static void cli_output( - int min_nclusters, /**< [in] lower bound of the number of clusters */ - int max_nclusters, /**< [in] upper bound of the number of clusters */ - int nfeatures, /**< [in] number of features */ + Params *p, /**< Algorithm parameters */ float **cluster_centres, /**< [in] coordinate of clusters centres for the best iteration */ - float scale_factor, /**< [in] factor that was used in quantizing the data */ - float *mean, /**< [in] mean that was subtracted during preprocessing */ - int nloops, /**< [in] how many times the algorithm will be executed for each number of clusters */ - int isRMSE, /**< [in] whether or not RMSE is computed */ float rmse, /**< [in] value of the RMSE for the best iteration */ int index) /**< [in] number of trials for the best RMSE */ { - /* cluster center coordinates - :displayed only for when k=1*/ - if (min_nclusters == max_nclusters) + /* print cluster center coordinates */ + if (p->min_nclusters == p->max_nclusters) { printf("\n================= Centroid Coordinates =================\n"); - for (int icluster = 0; icluster < max_nclusters; icluster++) + for (int icluster = 0; icluster < p->max_nclusters; icluster++) { printf("%2d:", icluster); - for (int ifeature = 0; ifeature < nfeatures; ifeature++) - printf(" % 10.6f", cluster_centres[icluster][ifeature] + mean[ifeature]); + for (int ifeature = 0; ifeature < p->nfeatures; ifeature++) + printf(" % 10.6f", cluster_centres[icluster][ifeature]); printf("\n"); } } - printf("Number of Iteration: %d\n", nloops); + printf("Number of Iteration: %d\n", p->nloops); - if (min_nclusters == max_nclusters && isRMSE) + if (p->min_nclusters == p->max_nclusters && p->isRMSE) { - if (nloops != 1) + if (p->nloops != 1) { // single k, multiple iteration printf("Number of trials to approach the best RMSE of %.3f is %d\n", rmse, index + 1); } @@ -557,137 +514,58 @@ static void cli_output( * @return float* The centroids coordinates found by the algorithm. */ float *kmeans_c( + Params *p, /**< Algorithm parameters */ float **features_float, /**< [in] array of features */ int_feature **features_int, /**< [in] array of quantized features */ - int nfeatures, /**< [in] number of feature */ - uint64_t npoints, /**< [in] number of points */ - uint64_t npadded, /**< [in] number of points with padding */ - float scale_factor, /**< [in] scale factor used in quantizaton */ - float threshold, /**< [in] threshold for termination of the algorithm */ - float *mean, /**< [in] feature-wise mean subtracted during preprocessing */ - int max_nclusters, /**< [in] upper bound of the number of clusters */ - int min_nclusters, /**< [in] lower bound of the number of clusters */ - int isRMSE, /**< [in] whether or not RMSE is computed */ - int isOutput, /**< [in] whether or not to print the centroids and runtime information */ - int nloops, /**< [in] how many times the algorithm will be executed for each number of clusters */ int *log_iterations, /**< [out] Number of iterations per nclusters */ double *log_time, /**< [out] Time taken per nclusters */ - uint32_t ndpu, /**< [in] number of assigned DPUs */ - dpu_set *allset, /**< [in] set of all assigned DPUs */ int *best_nclusters) /**< [out] best number of clusters according to RMSE */ { /* Variables for I/O. */ float *output_clusters; /* return pointer */ - // dpu_set allset; /* Set of all available DPUs. */ - /* Data arrays. */ float **cluster_centres = NULL; /* array of centroid coordinates */ - // float *mean; /* feature-wise average of points coordinates */ - // int_feature **cluster_centres_int = NULL; /* array of discretized centroid coordinates */ /* Generated values. */ int index; /* number of iterations on the best run */ float rmse; /* RMSE value */ - // int best_nclusters = 0; /* best number of clusters according to RMSE */ - - /* ============== DPUs init ==============*/ - /* necessary to do it first to know the n° of available DPUs */ - // TO REMOVE: refactored to Container class - // DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, NULL, &allset)); - // DPU_ASSERT(dpu_get_nr_dpus(allset, &ndpu)); - /* ============== DPUs init end ==========*/ - - /* ============== I/O begin ==============*/ - // TO REMOVE: refactored to Container class - // if (fileInput) - // { - // if (isBinaryFile) - // { //Binary file input - // read_bin_input(filename, &npoints, &npadded, &nfeatures, ndpu, &features_float); - // } - // else - // { //Text file input - // read_txt_input(filename, &npoints, &npadded, &nfeatures, ndpu, &features_float); - // /* Saving features as a binary for next time */ - // save_dat_file(filename, npoints, nfeatures, features_float); - // } - // } - // else - // { - // format_array_input(npoints, &npadded, nfeatures, ndpu, data, &features_float); - // } - - if (isOutput) + if (p->isOutput) { - printf("\nNumber of objects without padding: %lu\n", npoints); - printf("Number of objects with padding: %lu\n", npadded); - printf("Number of features: %d\n", nfeatures); - printf("Number of DPUs: %d\n", ndpu); + printf("\nNumber of objects without padding: %lu\n", p->npoints); + printf("Number of objects with padding: %lu\n", p->npadded); + printf("Number of features: %d\n", p->nfeatures); + printf("Number of DPUs: %d\n", p->ndpu); } - /* ============== I/O end ==============*/ - // error check for clusters - error_check(npoints, npadded, min_nclusters, max_nclusters, nfeatures, ndpu); - - /* ======================= pre-processing ===========================*/ - // TO REMOVE: refactored to Container class - // scale_factor = preprocessing(&mean, nfeatures, npoints, npadded, features_float, &features_int, &threshold); - /* ======================= pre-processing end =======================*/ + /* Error check for clusters. */ + error_check(p); /* ======================= core of the clustering ===================*/ cluster_centres = NULL; - index = cluster(npoints, /* number of data points */ - npadded, /* number of data points with padding */ - nfeatures, /* number of features for each point */ - ndpu, /* number of available DPUs */ - features_float, /* array: [npoints][nfeatures] */ - features_int, /* array: [npoints][nfeatures] */ - min_nclusters, /* range of min to max number of clusters */ - max_nclusters, /* range of min to max number of clusters */ - scale_factor, /* scaling factor used in preprocessing */ - threshold, /* loop termination factor */ - best_nclusters, /* [out] number between min and max */ - &cluster_centres, /* [out] [best_nclusters][nfeatures] */ - &rmse, /* Root Mean Squared Error */ - isRMSE, /* calculate RMSE */ - isOutput, /* whether or not to print runtime information */ - nloops, /* number of iteration for each number of clusters */ - log_iterations, /* log of the number of iterations */ - log_time, /* log of the time taken */ - allset); /* set of all DPUs */ + index = cluster( + p, /* Algorithm parameters */ + features_float, /* [in] array: [npoints][nfeatures] */ + features_int, /* [in] array: [npoints][nfeatures] */ + best_nclusters, /* [out] number between min and max */ + &cluster_centres, /* [out] [best_nclusters][nfeatures] */ + &rmse, /* [out] Root Mean Squared Error */ + log_iterations, /* [out] log of the number of iterations */ + log_time /* [out] log of the time taken */ + ); /* =============== Array Output ====================== */ - output_clusters = array_output(*best_nclusters, nfeatures, cluster_centres, scale_factor, mean); + output_clusters = array_output(p, *best_nclusters, cluster_centres); /* =============== Command Line Output =============== */ - if (isOutput) - cli_output(min_nclusters, max_nclusters, nfeatures, cluster_centres, scale_factor, mean, nloops, isRMSE, rmse, index); - - /* =============== Postprocessing ==================== */ - - // TO REMOVE: refactored to Container class - // if (!fileInput) - // postprocessing(npoints, nfeatures, features_float, mean); - - /* free up memory */ - // TO REMOVE: refactored to Container class - // if (fileInput) - // free(features_float[0]); - // free(features_float); - // free(features_int[0]); - // free(features_int); + if (p->isOutput) + cli_output(p, cluster_centres, rmse, index); - // free(cluster_centres_int[0]); - // free(cluster_centres_int); - free(cluster_centres[0]); free(cluster_centres); - // free(mean); - // DPU_ASSERT(dpu_free(allset)); return output_clusters; } diff --git a/src/kmeans.h b/src/kmeans.h index de527b1..a8ed8b9 100644 --- a/src/kmeans.h +++ b/src/kmeans.h @@ -11,6 +11,7 @@ #ifndef _KMEANS_DPU_KERNEL_H_ #include #include +#include typedef struct dpu_set_t dpu_set; @@ -77,134 +78,104 @@ typedef int16_t int_feature; // typedef int32_t int_feature; // #define FEATURETYPE_32 -// Function declarations #ifndef _KMEANS_DPU_KERNEL_H_ +// Parameters holding struct +typedef struct Params{ + uint64_t npoints; + uint64_t npadded; + uint64_t npointperdpu; + int nfeatures; + float scale_factor; + float threshold; + float *mean; + int max_nclusters; + int min_nclusters; + int isRMSE; + int isOutput; + int nloops; + int max_iter; + uint32_t ndpu; + dpu_set allset; + int from_file; +} Params; + +// Function declarations /** @name rmse.c */ /**@{*/ -float rms_err(float **, int, uint64_t, float **, int); +float rms_err(Params *p, float **feature, float **cluster_centres, int nclusters); /**@}*/ /** @name kmeans.c */ /**@{*/ double time_seconds(struct timeval tic, struct timeval toc); void read_bin_input( + Params *p, const char *filename, - uint64_t *npoints_out, - uint64_t *npadded_out, - int *nfeatures_out, - uint32_t ndpu, float ***features_out); void read_txt_input( + Params *p, const char *filename, - uint64_t *npoints_out, - uint64_t *npadded_out, - int *nfeatures_out, - uint32_t ndpu, float ***features_out); -void save_dat_file(const char *filename_in, uint64_t npoints, int nfeatures, float **features); +void save_dat_file(Params *p, const char *filename_in, float **features); void format_array_input( - uint64_t npoints, - uint64_t *npadded_out, - int nfeatures, - uint32_t ndpu, + Params *p, float *data, float ***features_out); -float preprocessing( - float **mean_out, - int nfeatures, - uint64_t npoints, - uint64_t npadded, +void preprocessing( + Params *p, float **features, int_feature ***features_int_out, - float *threshold, int verbose); -void postprocessing(uint64_t npoints, int nfeatures, float **features, float *mean); +void postprocessing(Params *p, float **features); float *kmeans_c( + Params *p, float **features_float, int_feature **features_int, - int nfeatures, - uint64_t npoints, - uint64_t npadded, - float scale_factor, - float threshold, - float *mean, - int max_nclusters, - int min_nclusters, - int isRMSE, - int isOutput, - int nloops, int *log_iterations, double *log_time, - uint32_t ndpu, - dpu_set *allset, int *best_nclusters); /**@}*/ /** @name cluster.c */ /**@{*/ -void load_kernel(dpu_set *allset, const char *DPU_BINARY, uint32_t *ndpu); -void free_dpus(dpu_set *allset); +void load_kernel(Params *p, const char *DPU_BINARY); +void free_dpus(Params *p); int cluster( - uint64_t npoints, - uint64_t npadded, - int nfeatures, - uint32_t ndpu, + Params *p, float **features_float, int_feature **features_int, - int min_nclusters, - int max_nclusters, - float scale_factor, - float threshold, int *best_nclusters, float ***cluster_centres, float *min_rmse, - int isRMSE, - int isOutput, - int nloops, int *log_iterations, - double *log_time, - dpu_set *allset); + double *log_time); /**@}*/ /** @name kmeans_clustering.c */ /**@{*/ +void allocateClusters(Params *p, unsigned int nclusters); +void deallocateClusters(); float **kmeans_clustering( + Params *p, int_feature **features_int, float **features_float, - int nfeatures, - uint64_t npoints, - uint64_t npadded, unsigned int nclusters, - int ndpu, - float scale_factor, - float threshold, - int isOutput, - uint8_t *membership, int *loop, - int iteration, - dpu_set *allset); + int i_init); /**@}*/ /** @name kmeans_dpu.c */ /**@{*/ -void allocateMemory(uint64_t npadded, int ndpu); +void allocateMemory(Params *p); void deallocateMemory(); void populateDpu( - int_feature **feature, - int nfeatures, - uint64_t npoints, - uint64_t npadded, - int ndpu, - dpu_set *allset); + Params *p, + int_feature **feature); void kmeansDpu( - int nfeatures, - uint64_t npoints, - uint64_t npadded, - int ndpu, + Params *p, int nclusters, int64_t new_centers_len[ASSUMED_NR_CLUSTERS], - int64_t new_centers[ASSUMED_NR_CLUSTERS][ASSUMED_NR_FEATURES], - dpu_set *allset); + int64_t new_centers[ASSUMED_NR_CLUSTERS][ASSUMED_NR_FEATURES]); /**@}*/ #endif // ifndef _KMEANS_DPU_KERNEL_H_ diff --git a/src/kmeans_clustering.c b/src/kmeans_clustering.c index 1a810d6..2a842da 100644 --- a/src/kmeans_clustering.c +++ b/src/kmeans_clustering.c @@ -17,18 +17,69 @@ static int64_t new_centers_len[ASSUMED_NR_CLUSTERS]; /**< [nclusters]: no. of points in each cluster */ static int64_t new_centers[ASSUMED_NR_CLUSTERS][ASSUMED_NR_FEATURES]; /**< coordinates of the centroids */ +static float **clusters_float; /**< final cluster coordinates */ +static float **old_clusters_float; /**< previous cluster coordinates */ +static int_feature **clusters_int; /**< integer cluster coordinates */ + +void allocateClusters(Params *p, unsigned int nclusters) +{ + /* making sure we are sending cluster data in multiple of 8 */ + unsigned int features_in_8bytes = 8 / sizeof(int_feature); + int nclusters_round = ((nclusters + features_in_8bytes - 1) / features_in_8bytes) * features_in_8bytes; + + /* allocate space for and initialize exchange variable clusters_int[] */ + clusters_int = (int_feature **)malloc(nclusters_round * sizeof(*clusters_int)); + clusters_int[0] = (int_feature *)malloc(nclusters_round * p->nfeatures * sizeof(**clusters_int)); + for (int i = 1; i < nclusters; i++) + clusters_int[i] = clusters_int[i - 1] + p->nfeatures; + + /* allocate space for and initialize temporary variable old_clusters_float[] */ + old_clusters_float = (float **)malloc(nclusters * sizeof(*old_clusters_float)); + old_clusters_float[0] = (float *)malloc(nclusters * p->nfeatures * sizeof(**old_clusters_float)); + for (int i = 1; i < nclusters; i++) + old_clusters_float[i] = old_clusters_float[i - 1] + p->nfeatures; +} + +void deallocateClusters() +{ + free(old_clusters_float[0]); + free(old_clusters_float); + free(clusters_int[0]); + free(clusters_int); +} + +#ifdef FLT_REDUCE +static uint8_t *membership; + +/** + * @brief Allocates memory for membership table. + * + * @param p Algorithm parameters. + */ +void allocateMembershipTable(Params *p) +{ + membership = (uint8_t *)malloc(p->npadded * sizeof(*membership)); +} + +/** + * @brief Deallocates membership table. + * + */ +void deallocateMembershipTable() +{ + free(membership); +} /** * @brief Performs a final reduction of the centroids coordinates in float format. */ void final_reduction( - float **features_float, /**< array: [npadded][nfeatures] */ + float **features_float, /**< array: [npadded][nfeatures] */ int nfeatures, /**< number of attributes for each point */ uint64_t npoints, /**< number of data points */ uint64_t npadded, /**< number of data points with padding */ unsigned int nclusters, /**< number of clusters in this iteration */ int ndpu, /**< number of available DPUs */ - uint8_t *membership, /**< membership of each point */ float **clusters_float, /**< [out] final centroids coordinates */ dpu_set *allset) /**< pointer to the set of all assigned DPUs */ { @@ -60,8 +111,8 @@ void final_reduction( #pragma omp parallel { - float **clusters_thread = (float **)malloc(nclusters * sizeof(float *)); - clusters_thread[0] = (float *)calloc(nclusters * nfeatures, sizeof(float)); + float **clusters_thread = (float **)malloc(nclusters * sizeof(*clusters_thread)); + clusters_thread[0] = (float *)calloc(nclusters * nfeatures, sizeof(**clusters_thread)); for (int i = 1; i < nclusters; i++) clusters_thread[i] = clusters_thread[i - 1] + nfeatures; @@ -88,6 +139,7 @@ void final_reduction( printf("final reduction time: %f seconds\n\n", time_seconds(tic, toc)); #endif } +#endif /** * @brief Performs one run of the KMeans clustering algorithm. @@ -95,53 +147,39 @@ void final_reduction( * @return The centroids coordinates. */ float **kmeans_clustering( + Params *p, /**< Algorithm parameters.*/ int_feature **features_int, /**< array: [npadded][nfeatures] */ - float **features_float, /**< array: [npadded][nfeatures] */ - int nfeatures, /**< number of attributes for each point */ - uint64_t npoints, /**< number of data points */ - uint64_t npadded, /**< number of data points with padding */ + float **features_float, /**< array: [npadded][nfeatures] */ unsigned int nclusters, /**< number of clusters in this iteration */ - int ndpu, /**< number of available DPUs */ - float scale_factor, /**< [in] scale factor used in preprocessing */ - float threshold, /**< loop terminating factor */ - int isOutput, /**< [in] whether or not to print runtime information */ - uint8_t *membership, /**< [out] cluster membership of each point */ int *loop, /**< [out] number of inner iterations */ - int iteration, /**< index of current outer iteration */ - dpu_set *allset) /**< pointer to the set of all assigned DPUs */ + int i_init) /**< index of current outer iteration */ { - float **clusters_float; /* [out] final cluster coordinates */ - int_feature **clusters_int; /* intermediary cluster coordinates */ - float frob; /* Frobenius norm of the difference in the cluster centers of two consecutive iterations */ - unsigned int npointperdpu = npadded / ndpu; /* number of points per DPU */ + float frob; /* Frobenius norm of the difference in the cluster + centers of two consecutive iterations */ + + float **switch_clusters_float; /* pointer for switching */ /* nclusters should never be > npoints that would guarantee a cluster without points */ - if (nclusters > npoints) - nclusters = npoints; + if (nclusters > p->npoints) + nclusters = p->npoints; /* making sure we are sending cluster data in multiple of 8 */ unsigned int features_in_8bytes = 8 / sizeof(int_feature); int nclusters_round = ((nclusters + features_in_8bytes - 1) / features_in_8bytes) * features_in_8bytes; - /* allocate space for and initialize returning variable clusters_int[] */ - clusters_int = (int_feature **)malloc(nclusters_round * sizeof(int_feature *)); - clusters_int[0] = (int_feature *)malloc(nclusters_round * nfeatures * sizeof(int_feature)); - for (int i = 1; i < nclusters; i++) - clusters_int[i] = clusters_int[i - 1] + nfeatures; - /* allocate space for and initialize returning variable clusters_float[] */ - clusters_float = (float **)malloc(nclusters * sizeof(float *)); - clusters_float[0] = (float *)malloc(nclusters * nfeatures * sizeof(float)); + clusters_float = (float **)malloc(nclusters * sizeof(*clusters_float)); + clusters_float[0] = (float *)malloc(nclusters * p->nfeatures * sizeof(**clusters_float)); for (int i = 1; i < nclusters; i++) - clusters_float[i] = clusters_float[i - 1] + nfeatures; + clusters_float[i] = clusters_float[i - 1] + p->nfeatures; /* initialize the random clusters */ - int iteration_base_index = iteration * nclusters; + int iteration_base_index = i_init * nclusters; for (int icluster = 0; icluster < nclusters; icluster++) - for (int ifeature = 0; ifeature < nfeatures; ifeature++) + for (int ifeature = 0; ifeature < p->nfeatures; ifeature++) { - clusters_int[icluster][ifeature] = features_int[(icluster + iteration_base_index) % npoints][ifeature]; + clusters_int[icluster][ifeature] = features_int[(icluster + iteration_base_index) % p->npoints][ifeature]; clusters_float[icluster][ifeature] = clusters_int[icluster][ifeature]; } @@ -155,32 +193,28 @@ float **kmeans_clustering( // } // printf("\n"); -#ifdef CPU_REDUCE +#ifdef FLT_REDUCE /* initialize the membership to -1 for all */ for (i = 0; i < npoints; i++) membership[i] = -1; #endif /* inform DPUs of the current number of cluters */ - DPU_ASSERT(dpu_broadcast_to(*allset, "nclusters_host", 0, &nclusters, sizeof(nclusters), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "nclusters_host", 0, &nclusters, sizeof(nclusters), DPU_XFER_DEFAULT)); /* iterate until convergence */ do { - DPU_ASSERT(dpu_broadcast_to(*allset, "c_clusters", 0, clusters_int[0], nclusters_round * nfeatures * sizeof(int_feature), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_broadcast_to(p->allset, "c_clusters", 0, clusters_int[0], nclusters_round * p->nfeatures * sizeof(int_feature), DPU_XFER_DEFAULT)); memset(new_centers, 0, sizeof(new_centers)); memset(new_centers_len, 0, sizeof(new_centers_len)); kmeansDpu( - nfeatures, /* number of attributes for each point */ - npoints, /* number of data points */ - npadded, /* number of data points with padding */ - ndpu, /* number of available DPUs */ + p, nclusters, /* number of clusters k */ new_centers_len, /* [out] number of points in each cluster */ - new_centers, /* [out] sum of points coordinates in each cluster */ - allset); + new_centers); /* [out] sum of points coordinates in each cluster */ /* DEBUG : print the centroids on each iteration */ // printf("clusters :\n"); @@ -194,31 +228,52 @@ float **kmeans_clustering( // } // printf("\n"); - /* replace old cluster centers with new_centers */ /* CPU side of reduction */ - frob = 0; + + /* switch old and new cluster pointers */ + if (*loop != 0) + { + switch_clusters_float = old_clusters_float; + old_clusters_float = clusters_float; + clusters_float = switch_clusters_float; + } + + /* replace old cluster centers with new_centers */ // #pragma omp parallel for collapse(2) reduction(+:frob) // awful performance, don't do that for (int i = 0; i < nclusters; i++) - for (int j = 0; j < nfeatures; j++) + for (int j = 0; j < p->nfeatures; j++) if (new_centers_len[i] > 0) { float new_coordinate = (float)new_centers[i][j] / new_centers_len[i]; /* take average i.e. sum/n */ clusters_int[i][j] = lround(new_coordinate); - new_coordinate /= scale_factor; + new_coordinate /= p->scale_factor; clusters_float[i][j] = new_coordinate; - float diff = clusters_float[i][j] - new_coordinate; + } + else + clusters_float[i][j] = old_clusters_float[i][j]; + + /* compute Frobenius norm */ + frob = 0; + if(*loop != 0) + { + for (int i = 0; i < nclusters; i++) + for (int j = 0; j < p->nfeatures; j++) + { + float diff = clusters_float[i][j] - old_clusters_float[i][j]; frob += diff * diff; } + } /* DEBUG : print convergence info */ - // printf("finished loop %d\n", loop); + // printf("finished loop %d\n", *loop); // printf("Frobenius norm = %.12f\n", frob); - // printf("delta = %d\n", delta); - // printf("\n\n"); + // printf("threshold = %f\n", p->threshold); + // printf("\n"); - } while (((*loop)++ < 500) && (frob > threshold)); /* makes sure loop terminates */ + (*loop)++; + } while (((*loop < p->max_iter) && (frob > p->threshold)) || *loop == 1); /* makes sure loop terminates */ - if (isOutput) + if (p->isOutput) printf("iterated %d times\n", *loop); #ifdef FLT_REDUCE @@ -228,7 +283,6 @@ float **kmeans_clustering( npadded, nclusters, ndpu, - membership, clusters_float, allset); #endif diff --git a/src/kmeans_dpu.c b/src/kmeans_dpu.c index b9b1d62..50fd410 100644 --- a/src/kmeans_dpu.c +++ b/src/kmeans_dpu.c @@ -28,16 +28,16 @@ uint64_t (*counters)[HOST_COUNTERS]; /**< performance counters from every DPU */ * @param npadded Number of points with padding. * @param ndpu Number of available DPUs. */ -void allocateMemory(uint64_t npadded, int ndpu) +void allocateMemory(Params *p) { - centers_psum = (int64_t *)malloc(ndpu * ASSUMED_NR_CLUSTERS * ASSUMED_NR_FEATURES * sizeof(int64_t)); - centers_pcount = (int **)malloc(ndpu * sizeof(int *)); - centers_pcount[0] = (int *)malloc(ndpu * ASSUMED_NR_CLUSTERS * sizeof(int)); - for (int i = 1; i < ndpu; i++) + centers_psum = (int64_t *)malloc(p->ndpu * ASSUMED_NR_CLUSTERS * ASSUMED_NR_FEATURES * sizeof(*centers_psum)); + centers_pcount = (int **)malloc(p->ndpu * sizeof(*centers_pcount)); + centers_pcount[0] = (int *)malloc(p->ndpu * ASSUMED_NR_CLUSTERS * sizeof(**centers_pcount)); + for (int i = 1; i < p->ndpu; i++) centers_pcount[i] = centers_pcount[i - 1] + ASSUMED_NR_CLUSTERS; #ifdef PERF_COUNTER - counters = malloc(ndpu * sizeof(uint64_t[HOST_COUNTERS])); + counters = malloc(p->ndpu * sizeof(uint64_t[HOST_COUNTERS])); #endif } @@ -74,39 +74,32 @@ int offset(int feature, int cluster, int dpu, int nfeatures, int nclusters) * @brief Fills the DPUs with their assigned points. */ void populateDpu( - int_feature **feature, /**< array: [npoints][nfeatures] */ - int nfeatures, /**< number of attributes for each point */ - uint64_t npoints, /**< number of real data points */ - uint64_t npadded, /**< number of padded data points */ - int ndpu, /**< number of available DPUs */ - dpu_set *allset) /**< set of assigned DPUs */ + Params *p, /**< Algorithm parameters */ + int_feature **feature) /**< array: [npoints][nfeatures] */ { - /**@{*/ - /** Iteration variables for the DPUs. */ + /* Iteration variables for the DPUs. */ struct dpu_set_t dpu; uint32_t each_dpu; - /**@{*/ - uint64_t npointperdpu = npadded / ndpu; /**< number of points per DPU */ - int *nreal_points; /**< number of real data points on each dpu */ - int64_t remaining_points = npoints; /**< number of yet unassigned points */ + int *nreal_points; /* number of real data points on each dpu */ + int64_t remaining_points = p->npoints; /* number of yet unassigned points */ - DPU_FOREACH(*allset, dpu, each_dpu) + DPU_FOREACH(p->allset, dpu, each_dpu) { int next; - next = each_dpu * npointperdpu; + next = each_dpu * p->npointperdpu; DPU_ASSERT(dpu_prepare_xfer(dpu, feature[next])); } - DPU_ASSERT(dpu_push_xfer(*allset, DPU_XFER_TO_DPU, "t_features", 0, npointperdpu * nfeatures * sizeof(int_feature), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_push_xfer(p->allset, DPU_XFER_TO_DPU, "t_features", 0, p->npointperdpu * p->nfeatures * sizeof(int_feature), DPU_XFER_DEFAULT)); // telling each DPU how many real points it has to process - nreal_points = (int *)malloc(ndpu * sizeof(int)); - for (int idpu = 0; idpu < ndpu; idpu++) + nreal_points = (int *)malloc(p->ndpu * sizeof(*nreal_points)); + for (int idpu = 0; idpu < p->ndpu; idpu++) { - nreal_points[idpu] = (remaining_points <= 0) ? 0 - : (remaining_points > npointperdpu) ? npointperdpu - : remaining_points; - remaining_points -= npointperdpu; + nreal_points[idpu] = (remaining_points <= 0) ? 0 + : (remaining_points > p->npointperdpu) ? p->npointperdpu + : remaining_points; + remaining_points -= p->npointperdpu; } /* DEBUG : print the number of non-padding points assigned to each DPU */ @@ -117,11 +110,11 @@ void populateDpu( // } // printf("\n"); - DPU_FOREACH(*allset, dpu, each_dpu) + DPU_FOREACH(p->allset, dpu, each_dpu) { DPU_ASSERT(dpu_prepare_xfer(dpu, &nreal_points[each_dpu])); } - DPU_ASSERT(dpu_push_xfer(*allset, DPU_XFER_TO_DPU, "npoints", 0, sizeof(int), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_push_xfer(p->allset, DPU_XFER_TO_DPU, "npoints", 0, sizeof(int), DPU_XFER_DEFAULT)); free(nreal_points); } @@ -129,26 +122,20 @@ void populateDpu( * @brief Performs one iteration of the Lloyd algorithm on DPUs and gets the results. */ void kmeansDpu( - int nfeatures, /**< number of attributes for each point */ - uint64_t npoints, /**< number of data points */ - uint64_t npadded, /**< number of data points with padding */ - int ndpu, /**< number of available DPUs */ + Params *p, /**< Algorithm parameters */ int nclusters, /**< number of clusters k */ int64_t new_centers_len[ASSUMED_NR_CLUSTERS], /**< [out] number of elements in each cluster */ - int64_t new_centers[ASSUMED_NR_CLUSTERS][ASSUMED_NR_FEATURES], /**< [out] sum of elements in each cluster */ - dpu_set *allset) /**< pointer to the set of all assigned DPUs */ + int64_t new_centers[ASSUMED_NR_CLUSTERS][ASSUMED_NR_FEATURES]) /**< [out] sum of elements in each cluster */ { struct dpu_set_t dpu; /* Iteration variable for the DPUs. */ uint32_t each_dpu; /* Iteration variable for the DPUs. */ - int npointperdpu = npadded / ndpu; /* number of points per DPU */ - #ifdef PERF_COUNTER uint64_t counters_mean[HOST_COUNTERS] = {0}; #endif //============RUNNING ONE LLOYD ITERATION ON THE DPU============== - DPU_ASSERT(dpu_launch(*allset, DPU_SYNCHRONOUS)); + DPU_ASSERT(dpu_launch(p->allset, DPU_SYNCHRONOUS)); //================================================================ /* DEBUG : read logs */ @@ -192,12 +179,12 @@ void kmeansDpu( #endif /* copy back membership count per dpu (device to host) */ - DPU_FOREACH(*allset, dpu, each_dpu) + DPU_FOREACH(p->allset, dpu, each_dpu) { DPU_ASSERT(dpu_prepare_xfer(dpu, &(centers_pcount[each_dpu][0]))); } int nclusters_even = ((nclusters + 1) / 2) * 2; - DPU_ASSERT(dpu_push_xfer(*allset, DPU_XFER_FROM_DPU, "centers_count_mram", 0, sizeof(int) * nclusters_even, DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_push_xfer(p->allset, DPU_XFER_FROM_DPU, "centers_count_mram", 0, sizeof(int) * nclusters_even, DPU_XFER_DEFAULT)); /* DEBUG : print outputed centroids counts per DPU */ // for (int dpu_id = 0; dpu_id < ndpu; dpu_id++) @@ -210,21 +197,21 @@ void kmeansDpu( // } /* copy back centroids partial averages (device to host) */ - DPU_FOREACH(*allset, dpu, each_dpu) + DPU_FOREACH(p->allset, dpu, each_dpu) { - DPU_ASSERT(dpu_prepare_xfer(dpu, ¢ers_psum[offset(0, 0, each_dpu, nfeatures, nclusters)])); + DPU_ASSERT(dpu_prepare_xfer(dpu, ¢ers_psum[offset(0, 0, each_dpu, p->nfeatures, nclusters)])); } - DPU_ASSERT(dpu_push_xfer(*allset, DPU_XFER_FROM_DPU, "centers_sum_mram", 0, nfeatures * nclusters * sizeof(int64_t), DPU_XFER_DEFAULT)); + DPU_ASSERT(dpu_push_xfer(p->allset, DPU_XFER_FROM_DPU, "centers_sum_mram", 0, p->nfeatures * nclusters * sizeof(int64_t), DPU_XFER_DEFAULT)); - for (int dpu_id = 0; dpu_id < ndpu; dpu_id++) + for (int dpu_id = 0; dpu_id < p->ndpu; dpu_id++) { for (int cluster_id = 0; cluster_id < nclusters; cluster_id++) { /* sum membership counts */ new_centers_len[cluster_id] += centers_pcount[dpu_id][cluster_id]; /* compute the new centroids sum */ - for (int feature_id = 0; feature_id < nfeatures; feature_id++) - new_centers[cluster_id][feature_id] += centers_psum[offset(feature_id, cluster_id, dpu_id, nfeatures, nclusters)]; + for (int feature_id = 0; feature_id < p->nfeatures; feature_id++) + new_centers[cluster_id][feature_id] += centers_psum[offset(feature_id, cluster_id, dpu_id, p->nfeatures, nclusters)]; } } } diff --git a/src/main.cpp b/src/main.cpp index a7fbca1..5aeeaf1 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -37,17 +37,9 @@ namespace py = pybind11; class Container { private: - float *mean; - int nfeatures; - uint64_t npoints; - uint64_t npadded; - uint32_t ndpu; - dpu_set allset; - float scale_factor; - float threshold; + Params p; float **features_float; int_feature **features_int; - bool from_file = false; public: /** @@ -57,7 +49,7 @@ class Container */ void load_kernel(const char *DPU_BINARY) { - ::load_kernel(&allset, DPU_BINARY, &ndpu); + ::load_kernel(&p, DPU_BINARY); } /** * @brief Loads data into the DPU from a file. @@ -67,61 +59,73 @@ class Container */ void load_file_data(const char *filename, bool is_binary_file, float threshold_in, int verbose) { - from_file = true; + p.from_file = true; if (is_binary_file) - read_bin_input(filename, &npoints, &npadded, &nfeatures, ndpu, &features_float); + read_bin_input(&p, filename, &features_float); else - read_txt_input(filename, &npoints, &npadded, &nfeatures, ndpu, &features_float); - save_dat_file(filename, npoints, nfeatures, features_float); + read_txt_input(&p, filename, &features_float); + save_dat_file(&p, filename, features_float); transfer_data(threshold_in, verbose); } /** * @brief Loads data into the DPUs from a python array * * @param data A python ndarray. - * @param npoints_in Number of points. - * @param nfeatures_in Number of features. + * @param npoints Number of points. + * @param nfeatures Number of features. + * @param threshold Parameter to declare convergence. + * @param verbose Verbosity level. */ - void load_array_data(py::array_t data, uint64_t npoints_in, int nfeatures_in, float threshold_in, int verbose) + void load_array_data(py::array_t data, uint64_t npoints, int nfeatures, float threshold, int verbose) { float *data_ptr = (float *)data.request().ptr; - npoints = npoints_in; - nfeatures = nfeatures_in; - format_array_input(npoints, &npadded, nfeatures, ndpu, data_ptr, &features_float); - transfer_data(threshold_in, verbose); + p.from_file = false; + + p.npoints = npoints; + p.nfeatures = nfeatures; + format_array_input(&p, data_ptr, &features_float); + transfer_data(threshold, verbose); } /** * @brief Preprocesses and transfers quantized data to the DPUs * */ - void transfer_data(float threshold_in, int verbose) + void transfer_data(float threshold, int verbose) { - threshold = threshold_in; - scale_factor = preprocessing(&mean, nfeatures, npoints, npadded, features_float, &features_int, &threshold, verbose); - populateDpu(features_int, nfeatures, npoints, npadded, ndpu, &allset); + p.threshold = threshold; + preprocessing(&p, features_float, &features_int, verbose); + populateDpu(&p, features_int); + allocateMemory(&p); + #ifdef FLT_REDUCE + allocateMembershipTable(&p); + #endif } /** * @brief Frees the data. */ - void free_data(bool from_file) + void free_data(bool from_file, bool restore_features) { /* We are NOT freeing the underlying float array if it is managed by python */ if (from_file) free(features_float[0]); - else - postprocessing(npoints, nfeatures, features_float, mean); + else if (restore_features) + postprocessing(&p, features_float); free(features_float); free(features_int[0]); free(features_int); - free(mean); + free(p.mean); + #ifdef FLT_REDUCE + deallocateMembershipTable(); + #endif } /** * @brief Frees the DPUs */ void free_dpus() { - ::free_dpus(&allset); + ::free_dpus(&p); + deallocateMemory(); } py::array_t kmeans_cpp( @@ -130,6 +134,7 @@ class Container int isRMSE, int isOutput, int nloops, + int max_iter, py::array_t log_iterations, py::array_t log_time); }; @@ -145,6 +150,7 @@ py::array_t Container ::kmeans_cpp( int isRMSE, /**< whether or not RMSE is computed */ int isOutput, /**< whether or not to print the centroids */ int nloops, /**< how many times the algorithm will be executed for each number of clusters */ + int max_iter, /**< upper bound of the number of iterations */ py::array_t log_iterations, /**< array logging the iterations per nclusters */ py::array_t log_time) /**< array logging the time taken per nclusters */ { @@ -153,28 +159,23 @@ py::array_t Container ::kmeans_cpp( int *log_iter_ptr = (int *)log_iterations.request().ptr; double *log_time_ptr = (double *)log_time.request().ptr; + p.max_nclusters = max_nclusters; + p.min_nclusters = min_nclusters; + p.isRMSE = isRMSE; + p.isOutput = isOutput; + p.nloops = nloops; + p.max_iter = max_iter; + float *clusters = kmeans_c( + &p, features_float, features_int, - nfeatures, - npoints, - npadded, - scale_factor, - threshold, - mean, - max_nclusters, - min_nclusters, - isRMSE, - isOutput, - nloops, log_iter_ptr, log_time_ptr, - ndpu, - &allset, &best_nclusters); - std::vector shape = {best_nclusters, nfeatures}; - std::vector strides = {(int)sizeof(float) * nfeatures, sizeof(float)}; + std::vector shape = {best_nclusters, p.nfeatures}; + std::vector strides = {(int)sizeof(float) * p.nfeatures, sizeof(float)}; py::capsule free_when_done(clusters, [](void *f) { delete reinterpret_cast(f); }); diff --git a/src/rmse.c b/src/rmse.c index 455c094..754c83e 100644 --- a/src/rmse.c +++ b/src/rmse.c @@ -62,9 +62,8 @@ __inline int find_nearest_point(float *pt, /**< array:[nfeatures] */ * * @return the RMSE */ -float rms_err(float **feature, /**< array:[npoints][nfeatures] */ - int nfeatures, /**< number of features */ - uint64_t npoints, /**< number of points */ +float rms_err(Params *p, + float **feature, /**< array:[npoints][nfeatures] */ float **cluster_centres, /**< array:[nclusters][nfeatures] */ int nclusters) /**< number of clusters */ { @@ -72,6 +71,8 @@ float rms_err(float **feature, /**< array:[npoints][nfeatures] */ int nearest_cluster_index; /* cluster center id with min distance to pt */ float sum_euclid = 0.0; /* sum of Euclidean distance squares */ float ret; /* return value */ + uint64_t npoints = p->npoints; + int nfeatures = p->nfeatures; /* calculate and sum the square of euclidean distance*/ #pragma omp parallel for shared(feature, cluster_centres) \ @@ -91,7 +92,7 @@ float rms_err(float **feature, /**< array:[npoints][nfeatures] */ nfeatures); } /* divide by n, then take sqrt */ - ret = sqrt(sum_euclid / npoints); + ret = sqrt(sum_euclid / p->npoints); return (ret); }