diff --git a/CHANGELOG.md b/CHANGELOG.md index 527e6bdcdc..23d5d61197 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ - PR #2848: Fix typo in Python docstring for UMAP - PR #2856: Fix LabelEncoder for filtered input - PR #2855: Updates for RMM being header only +- PR #2844: Fix for OPG KNN Classifier & Regressor - PR #2880: Fix bugs in Auto-ARIMA when s==None - PR #2877: TSNE exception for n_components > 2 - PR #2879: Update unit test for LabelEncoder on filtered input diff --git a/cpp/src/knn/knn_opg_common.cu b/cpp/src/knn/knn_opg_common.cu index 3f0893f60f..cae7fc190e 100644 --- a/cpp/src/knn/knn_opg_common.cu +++ b/cpp/src/knn/knn_opg_common.cu @@ -54,6 +54,7 @@ #include #include +#include #include #include @@ -67,10 +68,22 @@ namespace opg { namespace knn_common { +/** + * This function copies the labels associated to the locally merged indices + * from the index partitions to a merged array of labels + * @param[out] out merged labels + * @param[in] knn_indices merged indices + * @param[in] parts unmerged labels in partitions + * @param[in] offsets array splitting the partitions making it possible + * to identify the origin partition of an nearest neighbor index + * @param[in] cur_batch_size current batch size + * @param[in] n_parts number of partitions + * @param[in] n_labels number of labels to write (batch_size * n_outputs) + */ template -__global__ void copy_outputs_kernel(T *out, int64_t *knn_indices, T **parts, - int64_t *offsets, size_t cur_batch_size, - int n_parts, int n_labels) { +__global__ void copy_label_outputs_from_index_parts_kernel( + T *out, int64_t *knn_indices, T **parts, int64_t *offsets, + size_t cur_batch_size, int n_parts, int n_labels) { int64_t i = (blockIdx.x * TPB_X) + threadIdx.x; if (i >= n_labels) return; int64_t nn_idx = knn_indices[i]; @@ -83,17 +96,20 @@ __global__ void copy_outputs_kernel(T *out, int64_t *knn_indices, T **parts, } template -void copy_outputs(T *out, int64_t *knn_indices, - std::vector> &y, size_t cur_batch_size, - int k, int n_outputs, int n_features, int my_rank, - std::vector &idxPartsToRanks, - std::shared_ptr alloc, cudaStream_t stream) { +void copy_label_outputs_from_index_parts(T *out, int64_t *knn_indices, + std::vector> &y, + size_t cur_batch_size, int k, + int n_outputs, int my_rank, + Matrix::PartDescriptor &index_desc, + std::shared_ptr alloc, + cudaStream_t stream) { const int TPB_X = 256; - int n_labels = cur_batch_size * k; dim3 grid(raft::ceildiv(n_labels, TPB_X)); dim3 blk(TPB_X); + std::vector &idxPartsToRanks = + index_desc.partsToRanks; int64_t offset = 0; std::vector offsets_h; for (auto &rsp : idxPartsToRanks) { @@ -114,10 +130,102 @@ void copy_outputs(T *out, int64_t *knn_indices, } raft::update_device(parts_d.data(), parts_h.data(), n_parts, stream); - copy_outputs_kernel<<>>( - out + (o * n_labels), knn_indices, parts_d.data(), offsets_d.data(), - cur_batch_size, n_parts, n_labels); + copy_label_outputs_from_index_parts_kernel + <<>>(out + (o * n_labels), knn_indices, + parts_d.data(), offsets_d.data(), + cur_batch_size, n_parts, n_labels); + } +} + +/** + * This function copies the labels associated to the merged indices + * from the unmerged to a merged (n_ranks times smaller) array of labels + * @param[out] outputs merged labels + * @param[in] knn_indices merged indices + * @param[in] unmerged_outputs unmerged labels + * @param[in] unmerged_knn_indices unmerged indices + * @param[in] offsets array splitting the partitions making it possible + * to identify the origin partition of an nearest neighbor index + * @param[in] parts_to_ranks get rank index from index partition index, + * informative to find positions as the unmerged arrays are built + * so that ranks are in order (unlike partitions) + * @param[in] nearest_neighbors number of nearest neighbors to look for in query + * @param[in] n_outputs number of targets + * @param[in] n_labels number of labels to write (batch_size * n_outputs) + * @param[in] n_parts number of index partitions + * @param[in] n_ranks number of index ranks + */ +template +__global__ void merge_labels_kernel(T *outputs, int64_t *knn_indices, + T *unmerged_outputs, + int64_t *unmerged_knn_indices, + int64_t *offsets, int *parts_to_ranks, + int nearest_neighbors, int n_outputs, + int n_labels, int n_parts, int n_ranks) { + int64_t i = (blockIdx.x * TPB_X) + threadIdx.x; + if (i >= n_labels) return; + int64_t nn_idx = knn_indices[i]; + int part_idx = 0; + for (; part_idx < n_parts && nn_idx >= offsets[part_idx]; part_idx++) + ; + part_idx = min(max((int)0, part_idx - 1), n_parts - 1); + int rank_idx = parts_to_ranks[part_idx]; + int inbatch_idx = i / nearest_neighbors; + int64_t elm_idx = (rank_idx * n_labels) + inbatch_idx * nearest_neighbors; + for (int k = 0; k < nearest_neighbors; k++) { + if (nn_idx == unmerged_knn_indices[elm_idx + k]) { + for (int o = 0; o < n_outputs; o++) { + outputs[(o * n_labels) + i] = + unmerged_outputs[(o * n_ranks * n_labels) + elm_idx + k]; + } + return; + } + } +} + +template +void merge_labels(T *output, int64_t *knn_indices, T *unmerged_outputs, + int64_t *unmerged_knn_indices, int cur_batch_size, + int nearest_neighbors, int n_outputs, + Matrix::PartDescriptor &index_desc, + std::shared_ptr alloc, cudaStream_t stream) { + const int TPB_X = 256; + int n_labels = cur_batch_size * nearest_neighbors; + dim3 grid(raft::ceildiv(n_labels, TPB_X)); + dim3 blk(TPB_X); + + std::set idxRanks = index_desc.uniqueRanks(); + std::vector &idxPartsToRanks = + index_desc.partsToRanks; + + int offset = 0; + std::vector offsets_h; + for (auto &rsp : idxPartsToRanks) { + offsets_h.push_back(offset); + offset += rsp->size; + } + device_buffer offsets_d(alloc, stream, offsets_h.size()); + raft::update_device(offsets_d.data(), offsets_h.data(), offsets_h.size(), + stream); + + std::vector parts_to_ranks_h; + for (auto &rsp : idxPartsToRanks) { + int i = 0; + for (int rank : idxRanks) { + if (rank == rsp->rank) { + parts_to_ranks_h.push_back(i); + } + ++i; + } } + device_buffer parts_to_ranks_d(alloc, stream, parts_to_ranks_h.size()); + raft::update_device(parts_to_ranks_d.data(), parts_to_ranks_h.data(), + parts_to_ranks_h.size(), stream); + + merge_labels_kernel<<>>( + output, knn_indices, unmerged_outputs, unmerged_knn_indices, + offsets_d.data(), parts_to_ranks_d.data(), nearest_neighbors, n_outputs, + n_labels, idxPartsToRanks.size(), idxRanks.size()); } template @@ -132,31 +240,31 @@ void launch_local_operation(T *out, int64_t *knn_indices, std::vector y, template <> void launch_local_operation( - int *out, int64_t *knn_indices, std::vector y, size_t total_labels, - size_t cur_batch_size, int k, const std::shared_ptr alloc, + int *out, int64_t *knn_indices, std::vector y, size_t n_index_rows, + size_t n_query_rows, int k, const std::shared_ptr alloc, cudaStream_t stream, cudaStream_t *int_streams, int n_int_streams, bool probas_only, std::vector *probas, std::vector *uniq_labels, std::vector *n_unique) { if (probas_only) { MLCommon::Selection::class_probs<32, true>( - *probas, nullptr, y, total_labels, cur_batch_size, k, *uniq_labels, + *probas, nullptr, y, n_index_rows, n_query_rows, k, *uniq_labels, *n_unique, alloc, stream, &int_streams[0], n_int_streams); } else { MLCommon::Selection::knn_classify<32, true>( - out, nullptr, y, total_labels, cur_batch_size, k, *uniq_labels, *n_unique, + out, nullptr, y, n_index_rows, n_query_rows, k, *uniq_labels, *n_unique, alloc, stream, &int_streams[0], n_int_streams); } } template <> void launch_local_operation( - float *out, int64_t *knn_indices, std::vector y, size_t total_labels, - size_t cur_batch_size, int k, const std::shared_ptr alloc, + float *out, int64_t *knn_indices, std::vector y, size_t n_index_rows, + size_t n_query_rows, int k, const std::shared_ptr alloc, cudaStream_t stream, cudaStream_t *int_streams, int n_int_streams, bool probas_only, std::vector *probas, std::vector *uniq_labels, std::vector *n_unique) { MLCommon::Selection::knn_regress( - out, nullptr, y, total_labels, cur_batch_size, k, stream, &int_streams[0], + out, nullptr, y, n_index_rows, n_query_rows, k, stream, &int_streams[0], n_int_streams); } @@ -168,7 +276,6 @@ void perform_local_operation(T *out, int64_t *knn_indices, T *labels, std::vector *uniq_labels = nullptr, std::vector *n_unique = nullptr) { size_t n_labels = cur_batch_size * k; - size_t total_labels = n_outputs * n_labels; std::vector y(n_outputs); for (int o = 0; o < n_outputs; o++) { @@ -184,9 +291,9 @@ void perform_local_operation(T *out, int64_t *knn_indices, T *labels, int_streams[i] = h.get_internal_stream(i); } - launch_local_operation(out, knn_indices, y, total_labels, cur_batch_size, - k, alloc, stream, int_streams, n_int_streams, - probas_only, probas, uniq_labels, n_unique); + launch_local_operation(out, knn_indices, y, n_labels, cur_batch_size, k, + alloc, stream, int_streams, n_int_streams, probas_only, + probas, uniq_labels, n_unique); } template @@ -195,8 +302,7 @@ void reduce(raft::handle_t &handle, std::vector *> *out, std::vector *out_D, device_buffer &res, device_buffer &res_I, device_buffer &res_D, Matrix::PartDescriptor &index_desc, size_t cur_batch_size, int k, - int n_outputs, int local_parts_completed, int cur_batch, - size_t total_n_processed, std::set idxRanks, int my_rank, + int n_outputs, int local_parts_completed, size_t total_n_processed, bool probas_only = false, std::vector> *probas = nullptr, std::vector *uniq_labels = nullptr, @@ -205,13 +311,14 @@ void reduce(raft::handle_t &handle, std::vector *> *out, cudaStream_t stream = h.get_stream(); const auto alloc = h.get_device_allocator(); + std::set idxRanks = index_desc.uniqueRanks(); device_buffer trans(alloc, stream, idxRanks.size()); CUDA_CHECK(cudaMemsetAsync(trans.data(), 0, idxRanks.size() * sizeof(int64_t), stream)); size_t batch_offset = total_n_processed * k; - T *output = nullptr; + T *outputs = nullptr; int64_t *indices = nullptr; float *distances = nullptr; @@ -225,12 +332,16 @@ void reduce(raft::handle_t &handle, std::vector *> *out, indices = indices_b->data(); distances = distances_b->data(); - auto &probas_part = probas->at(local_parts_completed); - for (float *ptr : probas_part) { - probas_with_offsets.push_back(ptr + batch_offset); + std::vector &probas_part = probas->at(local_parts_completed); + for (int i = 0; i < n_outputs; i++) { + float *ptr = probas_part[i]; + int n_unique_classes = n_unique->at(i); + probas_with_offsets.push_back(ptr + + (total_n_processed * n_unique_classes)); } } else { - output = out->at(local_parts_completed)->ptr + batch_offset; + outputs = + out->at(local_parts_completed)->ptr + (n_outputs * total_n_processed); indices = out_I->at(local_parts_completed)->ptr + batch_offset; distances = out_D->at(local_parts_completed)->ptr + batch_offset; } @@ -239,9 +350,15 @@ void reduce(raft::handle_t &handle, std::vector *> *out, indices, cur_batch_size, idxRanks.size(), k, stream, trans.data()); - perform_local_operation(output, indices, res.data(), cur_batch_size, k, - n_outputs, handle, probas_only, &probas_with_offsets, - uniq_labels, n_unique); + device_buffer merged_outputs_b(alloc, stream, + n_outputs * cur_batch_size * k); + T *merged_outputs = merged_outputs_b.data(); + merge_labels(merged_outputs, indices, res.data(), res_I.data(), + cur_batch_size, k, n_outputs, index_desc, alloc, stream); + + perform_local_operation(outputs, indices, merged_outputs, cur_batch_size, + k, n_outputs, handle, probas_only, + &probas_with_offsets, uniq_labels, n_unique); if (probas_only) { delete indices_b; @@ -313,19 +430,20 @@ void broadcast_query(float *query, size_t batch_input_elms, int part_rank, try { comm.waitall(requests.size(), requests.data()); } catch (raft::exception &e) { - std::cout << "FAILURE!" << std::endl; + CUML_LOG_DEBUG("FAILURE!"); } } /** - * All non-root index ranks send the results for the current - * query batch to the root rank for the batch. - */ + * All non-root index ranks send the results for the current + * query batch to the root rank for the batch. + */ template void exchange_results(device_buffer &res, device_buffer &res_I, device_buffer &res_D, const raft::comms::comms_t &comm, int part_rank, std::set idxRanks, cudaStream_t stream, + std::shared_ptr alloc, size_t cur_batch_size, int k, int n_outputs, int local_parts_completed) { int my_rank = comm.get_rank(); @@ -347,13 +465,11 @@ void exchange_results(device_buffer &res, device_buffer &res_I, for (size_t o = 0; o < n_outputs; o++) { comm.isend(res.data() + (o * batch_elms), batch_elms, part_rank, 0, requests.data() + request_idx); - request_idx++; + ++request_idx; } } else { bool part_rank_is_idx = idxRanks.find(part_rank) != idxRanks.end(); - int idx_rank_size = idxRanks.size(); - - int num_received = 0; + size_t idx_rank_size = idxRanks.size(); // if root rank is an index, it will already have // query data, so no need to receive from it. @@ -361,15 +477,41 @@ void exchange_results(device_buffer &res, device_buffer &res_I, res_I.resize(batch_elms * idx_rank_size, stream); res_D.resize(batch_elms * idx_rank_size, stream); if (part_rank_is_idx) { - num_received = 1; // root rank will take the zeroth slot --idx_rank_size; + int i = 0; + for (int rank : idxRanks) { + if (rank == my_rank) { + size_t batch_offset = batch_elms * i; + + // Indices and distances are stored in rank order + raft::copy_async(res_I.data() + batch_offset, res_I.data(), + batch_elms, stream); + raft::copy_async(res_D.data() + batch_offset, res_D.data(), + batch_elms, stream); + + device_buffer tmp_res(alloc, stream, n_outputs * batch_elms); + raft::copy_async(tmp_res.data(), res.data(), tmp_res.size(), stream); + + for (int o = 0; o < n_outputs; ++o) { + // Outputs are stored in target order and then in rank order + raft::copy_async( + res.data() + (o * idxRanks.size() * batch_elms) + batch_offset, + tmp_res.data() + (o * batch_elms), batch_elms, stream); + } + CUDA_CHECK(cudaStreamSynchronize(stream)); + break; + } + i++; + } } + int num_received = 0; requests.resize((2 + n_outputs) * idx_rank_size); for (int rank : idxRanks) { if (rank != my_rank) { size_t batch_offset = batch_elms * num_received; + // Indices and distances are stored in rank order comm.irecv(res_I.data() + batch_offset, batch_elms, rank, 0, requests.data() + request_idx); ++request_idx; @@ -378,11 +520,17 @@ void exchange_results(device_buffer &res, device_buffer &res_I, ++request_idx; for (size_t o = 0; o < n_outputs; o++) { + // Outputs are stored in target order and then in rank order T *r = res.data() + (o * idxRanks.size() * batch_elms) + batch_offset; comm.irecv(r, batch_elms, rank, 0, requests.data() + request_idx); ++request_idx; } - + ++num_received; + } else if (part_rank_is_idx) { + /** + * Prevents overwriting data when the owner of currently + * processed query partition has itself some index partition(s) + */ ++num_received; } } @@ -391,7 +539,7 @@ void exchange_results(device_buffer &res, device_buffer &res_I, try { comm.waitall(requests.size(), requests.data()); } catch (raft::exception &e) { - std::cout << "FAILURE!" << std::endl; + CUML_LOG_DEBUG("FAILURE!"); } } @@ -447,17 +595,12 @@ void opg_knn(raft::handle_t &handle, std::vector *> *out, if (cur_batch == total_batches - 1) cur_batch_size = part_n_rows - (cur_batch * batch_size); - if (my_rank == part_rank && verbose) { - std::cout << "Root Rank is " << my_rank << std::endl; - } + if (my_rank == part_rank) CUML_LOG_DEBUG("Root Rank is %d", my_rank); /** - * Root broadcasts batch to all other ranks - */ - if (verbose) { - std::cout << "Rank " << my_rank << ": Performing Broadcast" - << std::endl; - } + * Root broadcasts batch to all other ranks + */ + CUML_LOG_DEBUG("Rank %d: Performing Broadcast", my_rank); int my_rank = comm.get_rank(); device_buffer part_data(allocator, stream, 0); @@ -497,8 +640,8 @@ void opg_knn(raft::handle_t &handle, std::vector *> *out, bool my_rank_is_idx = idxRanks.find(my_rank) != idxRanks.end(); /** - * Send query to index partitions - */ + * Send query to index partitions + */ if (my_rank == part_rank || my_rank_is_idx) broadcast_query(cur_query_ptr, batch_input_elms, part_rank, idxRanks, comm, stream); @@ -508,11 +651,9 @@ void opg_knn(raft::handle_t &handle, std::vector *> *out, device_buffer res_D(allocator, stream); if (my_rank_is_idx) { /** - * All index ranks perform local KNN - */ - if (verbose) - std::cout << "Rank " << my_rank << ": Performing Local KNN" - << std::endl; + * All index ranks perform local KNN + */ + CUML_LOG_DEBUG("Rank %d: Performing Local KNN", my_rank); size_t batch_knn_elms = k * cur_batch_size; @@ -534,44 +675,45 @@ void opg_knn(raft::handle_t &handle, std::vector *> *out, handle.get_device_allocator(), cur_batch_size, k, cur_query_ptr, rowMajorIndex, rowMajorQuery); - // Synchronize before running labels copy - CUDA_CHECK(cudaStreamSynchronize(stream)); - - copy_outputs(res.data(), res_I.data(), y, (size_t)cur_batch_size, - (int)k, (int)n_outputs, (int)idx_desc.N, my_rank, - idx_desc.partsToRanks, handle.get_device_allocator(), - stream); + copy_label_outputs_from_index_parts( + res.data(), res_I.data(), y, (size_t)cur_batch_size, (int)k, + (int)n_outputs, my_rank, idx_desc, handle.get_device_allocator(), + stream); // Synchronize before sending CUDA_CHECK(cudaStreamSynchronize(stream)); + CUDA_CHECK(cudaPeekAtLastError()); } - /** - * Ranks exchange results. - * Partition owner receives. All other ranks send. - */ - if (verbose) - std::cout << "Rank " << my_rank << ": Exchanging results" << std::endl; - exchange_results(res, res_I, res_D, comm, part_rank, idxRanks, stream, - cur_batch_size, k, n_outputs, local_parts_completed); + if (part_rank == my_rank || my_rank_is_idx) { + /** + * Ranks exchange results. + * Each rank having index partition(s) sends + * its local results (my_rank_is_idx) + * Additionally the owner of currently processed query partition + * receives and performs a reduce even if it has + * no index partition (part_rank == my_rank) + */ + CUML_LOG_DEBUG("Rank %d: Exchanging results", my_rank); + exchange_results(res, res_I, res_D, comm, part_rank, idxRanks, stream, + handle.get_device_allocator(), cur_batch_size, k, + n_outputs, local_parts_completed); + } /** - * Root rank performs local reduce - */ + * Root rank performs local reduce + */ if (part_rank == my_rank) { - if (verbose) - std::cout << "Rank " << my_rank << ": Performing Reduce" << std::endl; + CUML_LOG_DEBUG("Rank %d: Performing Reduce", my_rank); reduce(handle, out, out_I, out_D, res, res_I, res_D, idx_desc, - cur_batch_size, k, n_outputs, local_parts_completed, cur_batch, - total_n_processed, idxRanks, my_rank, probas_only, probas, - uniq_labels, n_unique); + cur_batch_size, k, n_outputs, local_parts_completed, + total_n_processed, probas_only, probas, uniq_labels, n_unique); CUDA_CHECK(cudaStreamSynchronize(stream)); CUDA_CHECK(cudaPeekAtLastError()); - if (verbose) - std::cout << "Rank " << my_rank << ": Finished Reduce" << std::endl; + CUML_LOG_DEBUG("Rank %d: Finished Reduce", my_rank); } total_n_processed += cur_batch_size; @@ -611,52 +753,6 @@ template void opg_knn(raft::handle_t &handle, std::vector *uniq_labels, std::vector *n_unique, bool probas_only); -template void reduce( - raft::handle_t &handle, std::vector *> *out, - std::vector *> *out_I, - std::vector *out_D, device_buffer &res, - device_buffer &res_I, device_buffer &res_D, - Matrix::PartDescriptor &index_desc, size_t cur_batch_size, int k, - int n_outputs, int local_parts_completed, int cur_batch, - size_t total_n_processed, std::set idxRanks, int my_rank, - bool probas_only, std::vector> *probas, - std::vector *uniq_labels, std::vector *n_unique); - -template void reduce( - raft::handle_t &handle, std::vector *> *out, - std::vector *> *out_I, - std::vector *out_D, device_buffer &res, - device_buffer &res_I, device_buffer &res_D, - Matrix::PartDescriptor &index_desc, size_t cur_batch_size, int k, - int n_outputs, int local_parts_completed, int cur_batch, - size_t total_n_processed, std::set idxRanks, int my_rank, - bool probas_only, std::vector> *probas, - std::vector *uniq_labels, std::vector *n_unique); - -template void exchange_results( - device_buffer &res, device_buffer &res_I, - device_buffer &res_D, const raft::comms::comms_t &comm, int part_rank, - std::set idxRanks, cudaStream_t stream, size_t cur_batch_size, int k, - int n_outputs, int local_parts_completed); - -template void exchange_results( - device_buffer &res, device_buffer &res_I, - device_buffer &res_D, const raft::comms::comms_t &comm, int part_rank, - std::set idxRanks, cudaStream_t stream, size_t cur_batch_size, int k, - int n_outputs, int local_parts_completed); - -template void copy_outputs( - int *out, int64_t *knn_indices, std::vector> &y, - size_t cur_batch_size, int k, int n_outputs, int n_features, int my_rank, - std::vector &idxPartsToRanks, - std::shared_ptr alloc, cudaStream_t stream); - -template void copy_outputs( - float *out, int64_t *knn_indices, std::vector> &y, - size_t cur_batch_size, int k, int n_outputs, int n_features, int my_rank, - std::vector &idxPartsToRanks, - std::shared_ptr alloc, cudaStream_t stream); - }; // namespace knn_common }; // namespace opg }; // namespace KNN diff --git a/cpp/src_prims/selection/knn.cuh b/cpp/src_prims/selection/knn.cuh index 4a72a7cbec..7ca783cdab 100644 --- a/cpp/src_prims/selection/knn.cuh +++ b/cpp/src_prims/selection/knn.cuh @@ -472,8 +472,19 @@ void class_probs(std::vector &out, const int64_t *knn_indices, * knn_indices and labels */ device_buffer y_normalized(allocator, stream, n_index_rows); - MLCommon::Label::make_monotonic(y_normalized.data(), y[i], n_index_rows, - stream, allocator); + + /* + * Appending the array of unique labels to the original labels array + * to prevent make_monotonic function from producing misleading results + * due to the absence of some of the unique labels in the labels array + */ + device_buffer y_tmp(allocator, stream, n_index_rows + n_unique_labels); + raft::update_device(y_tmp.data(), y[i], n_index_rows, stream); + raft::update_device(y_tmp.data() + n_index_rows, uniq_labels[i], + n_unique_labels, stream); + + MLCommon::Label::make_monotonic(y_normalized.data(), y_tmp.data(), + y_tmp.size(), stream, allocator); raft::linalg::unaryOp( y_normalized.data(), y_normalized.data(), n_index_rows, [] __device__(int input) { return input - 1; }, stream); diff --git a/python/cuml/dask/neighbors/kneighbors_classifier.py b/python/cuml/dask/neighbors/kneighbors_classifier.py index f65f4d6275..7fd8d073c3 100644 --- a/python/cuml/dask/neighbors/kneighbors_classifier.py +++ b/python/cuml/dask/neighbors/kneighbors_classifier.py @@ -252,9 +252,9 @@ def score(self, X, y, convert_dtype=True): ------- score """ - labels, _, _ = self.predict(X, convert_dtype=convert_dtype) - diff = (labels == y) if self.data_handler.datatype == 'cupy': + preds, _, _ = self.predict(X, convert_dtype=convert_dtype) + diff = (preds == y) mean = da.mean(diff) return mean.compute() else: diff --git a/python/cuml/dask/neighbors/kneighbors_regressor.py b/python/cuml/dask/neighbors/kneighbors_regressor.py index 09bf880d36..25e97d75d6 100644 --- a/python/cuml/dask/neighbors/kneighbors_regressor.py +++ b/python/cuml/dask/neighbors/kneighbors_regressor.py @@ -221,10 +221,12 @@ def score(self, X, y): ------- score """ - labels, _, _ = self.predict(X, convert_dtype=True) - diff = (labels == y) if self.data_handler.datatype == 'cupy': - mean = da.mean(diff) - return mean.compute() + preds, _, _ = self.predict(X, convert_dtype=True) + y_mean = y.mean(axis=0) + residual_sss = ((y - preds) ** 2).sum(axis=0) + total_sss = ((y - y_mean) ** 2).sum(axis=0) + r2_score = da.mean(1 - (residual_sss / total_sss)) + return r2_score.compute() else: raise ValueError("Only Dask arrays are supported") diff --git a/python/cuml/neighbors/kneighbors_regressor_mg.pyx b/python/cuml/neighbors/kneighbors_regressor_mg.pyx index accb7990c3..d615ec9226 100644 --- a/python/cuml/neighbors/kneighbors_regressor_mg.pyx +++ b/python/cuml/neighbors/kneighbors_regressor_mg.pyx @@ -101,7 +101,7 @@ class KNeighborsRegressorMG(KNeighborsMG): query, query_parts_to_ranks, query_nrows, ncols, rank, convert_dtype) - output = self.gen_local_output(data, convert_dtype, dtype='int32') + output = self.gen_local_output(data, convert_dtype, dtype='float32') query_cais = input['cais']['query'] local_query_rows = list(map(lambda x: x.shape[0], query_cais)) diff --git a/python/cuml/test/dask/test_kneighbors_classifier.py b/python/cuml/test/dask/test_kneighbors_classifier.py index 0b702b944d..12e8b341c7 100644 --- a/python/cuml/test/dask/test_kneighbors_classifier.py +++ b/python/cuml/test/dask/test_kneighbors_classifier.py @@ -43,12 +43,12 @@ def generate_dask_array(np_array, n_parts): @pytest.fixture( scope="module", params=[ - unit_param({'n_samples': 1000, 'n_features': 30, + unit_param({'n_samples': 3000, 'n_features': 30, 'n_classes': 5, 'n_targets': 2}), - quality_param({'n_samples': 5000, 'n_features': 100, - 'n_classes': 12, 'n_targets': 4}), - stress_param({'n_samples': 12000, 'n_features': 40, - 'n_classes': 5, 'n_targets': 2}) + quality_param({'n_samples': 8000, 'n_features': 35, + 'n_classes': 12, 'n_targets': 3}), + stress_param({'n_samples': 20000, 'n_features': 40, + 'n_classes': 12, 'n_targets': 4}) ]) def dataset(request): X, y = make_multilabel_classification( @@ -69,18 +69,14 @@ def dataset(request): if len(new_x) >= request.param['n_samples']: break X = X[new_x] + noise = np.random.normal(0, 1.2, X.shape) + X += noise y = np.array(new_y) - return train_test_split(X, y, test_size=0.33) + return train_test_split(X, y, test_size=0.1) -def accuracy_score(y_true, y_pred): - assert y_pred.shape[0] == y_true.shape[0] - assert y_pred.shape[1] == y_true.shape[1] - return np.mean(y_pred == y_true) - - -def match_test(output1, output2): +def exact_match(output1, output2): l1, i1, d1 = output1 l2, i2, d2 = output2 l2 = l2.squeeze() @@ -90,56 +86,54 @@ def match_test(output1, output2): assert i1.shape == i2.shape assert d1.shape == d2.shape - # Distances should strictly match - assert np.array_equal(d1, d2) + # Distances should match + d1 = np.round(d1, 4) + d2 = np.round(d2, 4) + assert np.mean(d1 == d2) > 0.98 - # Indices might differ for equivalent distances - for i in range(d1.shape[0]): - idx_set1, idx_set2 = (set(), set()) - dist = 0. - for j in range(d1.shape[1]): - if d1[i, j] > dist: - assert idx_set1 == idx_set2 - idx_set1, idx_set2 = (set(), set()) - dist = d1[i, j] - idx_set1.add(i1[i, j]) - idx_set2.add(i2[i, j]) - # the last set of indices is not guaranteed + # Indices should match + correct_queries = (i1 == i2).all(axis=1) + assert np.mean(correct_queries) > 0.95 - # As indices might differ, labels can also differ - # assert np.mean((l1 == l2)) > 0.6 + # Labels should match + correct_queries = (l1 == l2).all(axis=1) + assert np.mean(correct_queries) > 0.95 def check_probabilities(l_probas, d_probas): assert len(l_probas) == len(d_probas) for i in range(len(l_probas)): assert l_probas[i].shape == d_probas[i].shape + assert np.array_equal(l_probas[i], d_probas[i]) @pytest.mark.parametrize("datatype", ['dask_array', 'dask_cudf']) -@pytest.mark.parametrize("n_neighbors", [1, 3, 6]) -@pytest.mark.parametrize("n_parts", [None, 2, 3, 5]) -@pytest.mark.parametrize("batch_size", [256, 512, 1024]) -def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): +@pytest.mark.parametrize("n_neighbors", [1, 3, 8]) +@pytest.mark.parametrize("n_parts", [2, 4, 12]) +@pytest.mark.parametrize("batch_size", [128, 1024]) +def test_predict_and_score(dataset, datatype, n_neighbors, + n_parts, batch_size, client): X_train, X_test, y_train, y_test = dataset + np_y_test = y_test l_model = lKNNClf(n_neighbors=n_neighbors) l_model.fit(X_train, y_train) l_distances, l_indices = l_model.kneighbors(X_test) l_labels = l_model.predict(X_test) local_out = (l_labels, l_indices, l_distances) - - if not n_parts: - n_parts = len(client.has_what().keys()) + handmade_local_score = np.mean(y_test == l_labels) + handmade_local_score = round(handmade_local_score, 3) X_train = generate_dask_array(X_train, n_parts) X_test = generate_dask_array(X_test, n_parts) y_train = generate_dask_array(y_train, n_parts) + y_test = generate_dask_array(y_test, n_parts) if datatype == 'dask_cudf': X_train = to_dask_cudf(X_train, client) X_test = to_dask_cudf(X_test, client) y_train = to_dask_cudf(y_train, client) + y_test = to_dask_cudf(y_test, client) d_model = dKNNClf(client=client, n_neighbors=n_neighbors, batch_size=batch_size) @@ -147,6 +141,9 @@ def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): d_labels, d_indices, d_distances = \ d_model.predict(X_test, convert_dtype=True) distributed_out = da.compute(d_labels, d_indices, d_distances) + if datatype == 'dask_array': + distributed_score = d_model.score(X_test, y_test) + distributed_score = round(distributed_score, 3) if datatype == 'dask_cudf': distributed_out = list(map(lambda o: o.as_matrix() @@ -154,66 +151,29 @@ def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): else o.to_array()[..., np.newaxis], distributed_out)) - match_test(local_out, distributed_out) - assert accuracy_score(y_test, distributed_out[0]) > 0.12 - - -@pytest.mark.skip(reason="Sometimes incorrect labels are returned") -@pytest.mark.parametrize("datatype", ['dask_array']) -@pytest.mark.parametrize("n_neighbors", [1, 2, 3]) -@pytest.mark.parametrize("n_parts", [None, 2, 3, 5]) -def test_score(dataset, datatype, n_neighbors, n_parts, client): - X_train, X_test, y_train, y_test = dataset - - if not n_parts: - n_parts = len(client.has_what().keys()) + exact_match(local_out, distributed_out) - X_train = generate_dask_array(X_train, n_parts) - X_test = generate_dask_array(X_test, n_parts) - y_train = generate_dask_array(y_train, n_parts) - y_test = generate_dask_array(y_test, n_parts) - - if datatype == 'dask_cudf': - X_train = to_dask_cudf(X_train, client) - X_test = to_dask_cudf(X_test, client) - y_train = to_dask_cudf(y_train, client) - y_test = to_dask_cudf(y_test, client) - - d_model = dKNNClf(client=client, n_neighbors=n_neighbors) - d_model.fit(X_train, y_train) - d_labels, d_indices, d_distances = \ - d_model.predict(X_test, convert_dtype=True) - distributed_out = da.compute(d_labels, d_indices, d_distances) - - if datatype == 'dask_cudf': - distributed_out = list(map(lambda o: o.as_matrix() - if isinstance(o, DataFrame) - else o.to_array()[..., np.newaxis], - distributed_out)) - cuml_score = d_model.score(X_test, y_test) - - if datatype == 'dask_cudf': - y_test = y_test.compute().as_matrix() + if datatype == 'dask_array': + assert distributed_score == handmade_local_score else: - y_test = y_test.compute() - manual_score = np.mean(y_test == distributed_out[0]) - - assert cuml_score == manual_score + y_pred = distributed_out[0] + handmade_distributed_score = np.mean(np_y_test == y_pred) + handmade_distributed_score = round(handmade_distributed_score, 3) + assert handmade_distributed_score == handmade_local_score @pytest.mark.parametrize("datatype", ['dask_array', 'dask_cudf']) -@pytest.mark.parametrize("n_neighbors", [1, 3, 6]) -@pytest.mark.parametrize("n_parts", [None, 2, 3, 5]) -def test_predict_proba(dataset, datatype, n_neighbors, n_parts, client): +@pytest.mark.parametrize("n_neighbors", [1, 3, 8]) +@pytest.mark.parametrize("n_parts", [2, 4, 12]) +@pytest.mark.parametrize("batch_size", [128, 1024]) +def test_predict_proba(dataset, datatype, n_neighbors, + n_parts, batch_size, client): X_train, X_test, y_train, y_test = dataset l_model = lKNNClf(n_neighbors=n_neighbors) l_model.fit(X_train, y_train) l_probas = l_model.predict_proba(X_test) - if not n_parts: - n_parts = len(client.has_what().keys()) - X_train = generate_dask_array(X_train, n_parts) X_test = generate_dask_array(X_test, n_parts) y_train = generate_dask_array(y_train, n_parts) diff --git a/python/cuml/test/dask/test_kneighbors_regressor.py b/python/cuml/test/dask/test_kneighbors_regressor.py index 6b8d54b4e1..4f9685d8e2 100644 --- a/python/cuml/test/dask/test_kneighbors_regressor.py +++ b/python/cuml/test/dask/test_kneighbors_regressor.py @@ -29,6 +29,7 @@ from cuml.dask.common.dask_arr_utils import to_dask_cudf from cudf.core.dataframe import DataFrame import numpy as np +from sklearn.metrics import r2_score def generate_dask_array(np_array, n_parts): @@ -43,12 +44,12 @@ def generate_dask_array(np_array, n_parts): @pytest.fixture( scope="module", params=[ - unit_param({'n_samples': 1000, 'n_features': 30, + unit_param({'n_samples': 3000, 'n_features': 30, 'n_classes': 5, 'n_targets': 2}), - quality_param({'n_samples': 5000, 'n_features': 100, - 'n_classes': 12, 'n_targets': 4}), - stress_param({'n_samples': 12000, 'n_features': 40, - 'n_classes': 5, 'n_targets': 2}) + quality_param({'n_samples': 8000, 'n_features': 35, + 'n_classes': 12, 'n_targets': 3}), + stress_param({'n_samples': 20000, 'n_features': 40, + 'n_classes': 12, 'n_targets': 4}) ]) def dataset(request): X, y = make_multilabel_classification( @@ -69,69 +70,64 @@ def dataset(request): if len(new_x) >= request.param['n_samples']: break X = X[new_x] - y = np.array(new_y) + noise = np.random.normal(0, 1.2, X.shape) + X += noise + y = np.array(new_y, dtype=np.float32) - return train_test_split(X, y, test_size=0.33) + return train_test_split(X, y, test_size=0.1) -def accuracy_score(y_true, y_pred): - assert y_pred.shape[0] == y_true.shape[0] - assert y_pred.shape[1] == y_true.shape[1] - return np.mean(y_pred == y_true) - - -def match_test(output1, output2): - o1, i1, d1 = output1 - o2, i2, d2 = output2 +def exact_match(output1, output2): + l1, i1, d1 = output1 + l2, i2, d2 = output2 + l2 = l2.squeeze() # Check shapes - assert o1.shape == o2.shape + assert l1.shape == l2.shape assert i1.shape == i2.shape assert d1.shape == d2.shape - # Distances should strictly match - assert np.array_equal(d1, d2) + # Distances should match + d1 = np.round(d1, 4) + d2 = np.round(d2, 4) + assert np.mean(d1 == d2) > 0.98 - # Indices might differ for equivalent distances - for i in range(d1.shape[0]): - idx_set1, idx_set2 = (set(), set()) - dist = 0. - for j in range(d1.shape[1]): - if d1[i, j] > dist: - assert idx_set1 == idx_set2 - idx_set1, idx_set2 = (set(), set()) - dist = d1[i, j] - idx_set1.add(i1[i, j]) - idx_set2.add(i2[i, j]) - # the last set of indices is not guaranteed + # Indices should match + correct_queries = (i1 == i2).all(axis=1) + assert np.mean(correct_queries) > 0.95 - # As indices might differ, outputs can also differ + # Labels should match + correct_queries = (l1 == l2).all(axis=1) + assert np.mean(correct_queries) > 0.95 @pytest.mark.parametrize("datatype", ['dask_array', 'dask_cudf']) -@pytest.mark.parametrize("n_neighbors", [1, 3, 6]) -@pytest.mark.parametrize("n_parts", [None, 2, 3, 5]) -@pytest.mark.parametrize("batch_size", [128, 512, 1024]) -def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): +@pytest.mark.parametrize("n_neighbors", [1, 3, 8]) +@pytest.mark.parametrize("n_parts", [2, 4, 12]) +@pytest.mark.parametrize("batch_size", [128, 1024]) +def test_predict_and_score(dataset, datatype, n_neighbors, + n_parts, batch_size, client): X_train, X_test, y_train, y_test = dataset + np_y_test = y_test l_model = lKNNReg(n_neighbors=n_neighbors) l_model.fit(X_train, y_train) l_distances, l_indices = l_model.kneighbors(X_test) l_outputs = l_model.predict(X_test) local_out = (l_outputs, l_indices, l_distances) - - if not n_parts: - n_parts = len(client.has_what().keys()) + handmade_local_score = r2_score(y_test, l_outputs) + handmade_local_score = round(float(handmade_local_score), 3) X_train = generate_dask_array(X_train, n_parts) X_test = generate_dask_array(X_test, n_parts) y_train = generate_dask_array(y_train, n_parts) + y_test = generate_dask_array(y_test, n_parts) if datatype == 'dask_cudf': X_train = to_dask_cudf(X_train, client) X_test = to_dask_cudf(X_test, client) y_train = to_dask_cudf(y_train, client) + y_test = to_dask_cudf(y_test, client) d_model = dKNNReg(client=client, n_neighbors=n_neighbors, batch_size=batch_size) @@ -139,6 +135,9 @@ def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): d_outputs, d_indices, d_distances = \ d_model.predict(X_test, convert_dtype=True) distributed_out = da.compute(d_outputs, d_indices, d_distances) + if datatype == 'dask_array': + distributed_score = d_model.score(X_test, y_test) + distributed_score = round(float(distributed_score), 3) if datatype == 'dask_cudf': distributed_out = list(map(lambda o: o.as_matrix() @@ -146,47 +145,12 @@ def test_predict(dataset, datatype, n_neighbors, n_parts, batch_size, client): else o.to_array()[..., np.newaxis], distributed_out)) - match_test(local_out, distributed_out) - accuracy_score(local_out[0], distributed_out[0]) > 0.12 - - -@pytest.mark.parametrize("datatype", ['dask_array']) -@pytest.mark.parametrize("n_neighbors", [1, 3, 8]) -@pytest.mark.parametrize("n_parts", [None, 2, 3, 5]) -def test_score(dataset, datatype, n_neighbors, n_parts, client): - X_train, X_test, y_train, y_test = dataset - - if not n_parts: - n_parts = len(client.has_what().keys()) + exact_match(local_out, distributed_out) - X_train = generate_dask_array(X_train, n_parts) - X_test = generate_dask_array(X_test, n_parts) - y_train = generate_dask_array(y_train, n_parts) - y_test = generate_dask_array(y_test, n_parts) - - if datatype == 'dask_cudf': - X_train = to_dask_cudf(X_train, client) - X_test = to_dask_cudf(X_test, client) - y_train = to_dask_cudf(y_train, client) - y_test = to_dask_cudf(y_test, client) - - d_model = dKNNReg(client=client, n_neighbors=n_neighbors) - d_model.fit(X_train, y_train) - d_outputs, d_indices, d_distances = \ - d_model.predict(X_test, convert_dtype=True) - distributed_out = da.compute(d_outputs, d_indices, d_distances) - - if datatype == 'dask_cudf': - distributed_out = list(map(lambda o: o.as_matrix() - if isinstance(o, DataFrame) - else o.to_array()[..., np.newaxis], - distributed_out)) - cuml_score = d_model.score(X_test, y_test) - - if datatype == 'dask_cudf': - y_test = y_test.compute().as_matrix() + if datatype == 'dask_array': + assert distributed_score == handmade_local_score else: - y_test = y_test.compute() - manual_score = accuracy_score(y_test, distributed_out[0]) - - assert cuml_score == manual_score + y_pred = distributed_out[0] + handmade_distributed_score = float(r2_score(np_y_test, y_pred)) + handmade_distributed_score = round(handmade_distributed_score, 3) + assert handmade_distributed_score == handmade_local_score