From 9bf5c8134438abb78c6b62b2d1c2ebc53fe4f706 Mon Sep 17 00:00:00 2001 From: danleifeng <52735331+danleifeng@users.noreply.github.com> Date: Tue, 13 Jun 2023 17:36:42 +0800 Subject: [PATCH] code format; (#311) --- .../ps/table/common_graph_table.cc | 38 +- .../fleet/heter_ps/graph_gpu_ps_table_inl.cu | 1344 +++++++++-------- .../fleet/heter_ps/graph_gpu_wrapper.cu | 204 +-- .../fleet/heter_ps/graph_gpu_wrapper.h | 30 +- .../framework/fleet/heter_ps/heter_comm.h | 103 +- 5 files changed, 952 insertions(+), 767 deletions(-) diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc index bd433477e5a06c..2b65d59a9c7d46 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.cc +++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc @@ -177,8 +177,10 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea( return res; } -paddle::framework::GpuPsCommGraphFloatFea GraphTable::make_gpu_ps_graph_float_fea( - int gpu_id, std::vector &node_ids, int float_slot_num) { +paddle::framework::GpuPsCommGraphFloatFea +GraphTable::make_gpu_ps_graph_float_fea(int gpu_id, + std::vector &node_ids, + int float_slot_num) { size_t shard_num = 64; std::vector> bags(shard_num); std::vector feature_array[shard_num]; @@ -241,8 +243,8 @@ paddle::framework::GpuPsCommGraphFloatFea GraphTable::make_gpu_ps_graph_float_fe for (size_t i = 0; i < shard_num; i++) { tot_len += feature_array[i].size(); } - VLOG(1) << "Loaded float feature table on cpu, float feature_list_size[" << tot_len - << "] node_ids_size[" << node_ids.size() << "]"; + VLOG(1) << "Loaded float feature table on cpu, float feature_list_size[" + << tot_len << "] node_ids_size[" << node_ids.size() << "]"; res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), float_slot_num); unsigned int offset = 0, ind = 0; for (size_t i = 0; i < shard_num; i++) { @@ -1321,7 +1323,9 @@ GraphNode *GraphShard::add_graph_node(Node *node) { return reinterpret_cast(bucket[node_location[id]]); } -FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap, int float_fea_num) { +FeatureNode *GraphShard::add_feature_node(uint64_t id, + bool is_overlap, + int float_fea_num) { if (node_location.find(id) == node_location.end()) { node_location[id] = bucket.size(); if (float_fea_num > 0) { @@ -1945,11 +1949,13 @@ std::pair GraphTable::parse_node_file( size_t index = shard_id - shard_start; int slot_fea_num = 0; - if (feat_name.size() > 0) slot_fea_num = feat_name[idx].size(); + if (feat_name.size() > 0) slot_fea_num = feat_name[idx].size(); int float_fea_num = 0; - if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size(); + if (float_feat_id_map.size() > 0) + float_fea_num = float_feat_id_map[idx].size(); if (load_slot) { - auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num); + auto node = feature_shards[idx][index]->add_feature_node( + id, false, float_fea_num); if (node != NULL) { if (slot_fea_num > 0) node->set_feature_size(slot_fea_num); if (float_fea_num > 0) node->set_float_feature_size(float_fea_num); @@ -2019,9 +2025,11 @@ std::pair GraphTable::parse_node_file( } size_t index = shard_id - shard_start; int float_fea_num = 0; - if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size(); + if (float_feat_id_map.size() > 0) + float_fea_num = float_feat_id_map[idx].size(); if (load_slot) { - auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num); + auto node = feature_shards[idx][index]->add_feature_node( + id, false, float_fea_num); if (node != NULL) { for (int i = 2; i < num; ++i) { auto &v = vals[i]; @@ -2661,7 +2669,7 @@ int GraphTable::parse_feature(int idx, return -1; } return 0; - } + } // else if (dtype == "float64") { // not used // int ret = FeatureNode::parse_value_to_bytes( // fea_fields.begin(), fea_fields.end(), fea_ptr); @@ -2672,8 +2680,9 @@ int GraphTable::parse_feature(int idx, // return 0; // } } else { - VLOG(4) << "feature_name[" << name << "] is not in feat_id_map, ntype_id[" - << idx << "] feat_id_map_size[" << feat_id_map.size() << "]"; + VLOG(4) << "feature_name[" << name + << "] is not in feat_id_map, ntype_id[" << idx + << "] feat_id_map_size[" << feat_id_map.size() << "]"; } } } @@ -3053,8 +3062,7 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { feat_shape[k].push_back(f_shape); feat_dtype[k].push_back(f_dtype); feat_id_map[k][f_name] = feasign_idx++; - } - else if (f_dtype == "float32"){ + } else if (f_dtype == "float32") { if (float_feat_id_map.size() < (size_t)node_types.size()) { float_feat_name.resize(node_types.size()); float_feat_shape.resize(node_types.size()); diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.cu index 82250a143ebc4d..a4c6983b6ae5b2 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.cu +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table_inl.cu @@ -20,8 +20,8 @@ #include "math.h" #pragma once #ifdef PADDLE_WITH_HETERPS -#include "cudf/random.cuh" #include "cudf/block_radix_topk.cuh" +#include "cudf/random.cuh" #include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h" #include "paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h" #define ALIGN_INT64(LEN) (uint64_t((LEN) + 7) & uint64_t(~7)) @@ -336,7 +336,8 @@ __global__ void neighbor_sample_kernel_walking(GpuPsCommGraph graph, for (int idx = 0; idx < sample_len; idx++) { sample_array[offset + idx] = data[data_offset + begin + idx]; if (return_weight) { - weight_array[offset + idx] = (float)weight[data_offset + begin + idx]; + weight_array[offset + idx] = (float)weight[data_offset + begin + +idx]; } } } @@ -347,11 +348,12 @@ __global__ void neighbor_sample_kernel_walking(GpuPsCommGraph graph, // For Weighted Sample template -__global__ void get_actual_size_and_neighbor_count(GpuPsNodeInfo* node_info_list, - int* actual_size, - int* neighbor_count, - int sample_len, - int n) { +__global__ void get_actual_size_and_neighbor_count( + GpuPsNodeInfo* node_info_list, + int* actual_size, + int* neighbor_count, + int sample_len, + int n) { int i = threadIdx.x + blockIdx.x * blockDim.x; if (i >= n) return; int neighbor_len = node_info_list[i].neighbor_size; @@ -365,7 +367,8 @@ __global__ void get_actual_size_and_neighbor_count(GpuPsNodeInfo* node_info_list } } -__device__ __forceinline__ float gen_key_from_weight(const float weight, RandomNumGen &rng) { +__device__ __forceinline__ float gen_key_from_weight(const float weight, + RandomNumGen& rng) { rng.NextValue(); float u = -rng.RandomUniformFloat(1.0f, 0.5f); long long random_num2 = 0; @@ -381,17 +384,17 @@ __device__ __forceinline__ float gen_key_from_weight(const float weight, RandomN } template -__launch_bounds__(BLOCK_SIZE) -__global__ void weighted_sample_large_kernel(GpuPsCommGraph graph, - GpuPsNodeInfo* node_info_list, - uint64_t* res, - const int* target_neighbor_offset, - float* weight_keys_buff, - int n, - int sample_len, - unsigned long long random_seed, - float* weight_array, - bool return_weight) { +__launch_bounds__(BLOCK_SIZE) __global__ + void weighted_sample_large_kernel(GpuPsCommGraph graph, + GpuPsNodeInfo* node_info_list, + uint64_t* res, + const int* target_neighbor_offset, + float* weight_keys_buff, + int n, + int sample_len, + unsigned long long random_seed, + float* weight_array, + bool return_weight) { int i = blockIdx.x; if (i >= n) return; int gidx = threadIdx.x + blockIdx.x * BLOCK_SIZE; @@ -412,18 +415,24 @@ __global__ void weighted_sample_large_kernel(GpuPsCommGraph graph, RandomNumGen rng(gidx, random_seed); // get weight threshold for (int j = threadIdx.x; j < neighbor_len; j += BLOCK_SIZE) { float thread_weight = (float)weight[data_offset + j]; - weight_keys_local_buff[j] = static_cast(gen_key_from_weight(thread_weight, rng)); + weight_keys_local_buff[j] = + static_cast(gen_key_from_weight(thread_weight, rng)); } __syncthreads(); float topk_val; bool topk_is_unique; - using BlockRadixSelectT = BlockRadixTopKGlobalMemory; + using BlockRadixSelectT = + BlockRadixTopKGlobalMemory; __shared__ typename BlockRadixSelectT::TempStorage share_storage; BlockRadixSelectT{share_storage}.radixTopKGetThreshold( - weight_keys_local_buff, sample_len, neighbor_len, topk_val, topk_is_unique); + weight_keys_local_buff, + sample_len, + neighbor_len, + topk_val, + topk_is_unique); __shared__ int cnt; if (threadIdx.x == 0) { @@ -431,8 +440,9 @@ __global__ void weighted_sample_large_kernel(GpuPsCommGraph graph, } __syncthreads(); - // We use atomicAdd 1 operations instead of binaryScan to calculate the write index, - // since we do not need to keep the relative positions of element. + // We use atomicAdd 1 operations instead of binaryScan to calculate the + // write index, since we do not need to keep the relative positions of + // element. if (topk_is_unique) { for (int j = threadIdx.x; j < neighbor_len; j += BLOCK_SIZE) { @@ -480,15 +490,15 @@ __global__ void weighted_sample_large_kernel(GpuPsCommGraph graph, // A-RES algorithm template -__launch_bounds__(BLOCK_SIZE) -__global__ void weighted_sample_kernel(GpuPsCommGraph graph, - GpuPsNodeInfo* node_info_list, - uint64_t* res, - int n, - int sample_len, - unsigned long long random_seed, - float* weight_array, - bool return_weight) { +__launch_bounds__(BLOCK_SIZE) __global__ + void weighted_sample_kernel(GpuPsCommGraph graph, + GpuPsNodeInfo* node_info_list, + uint64_t* res, + int n, + int sample_len, + unsigned long long random_seed, + float* weight_array, + bool return_weight) { int i = blockIdx.x; if (i >= n) return; int gidx = threadIdx.x + blockIdx.x * BLOCK_SIZE; @@ -524,14 +534,16 @@ __global__ void weighted_sample_kernel(GpuPsCommGraph graph, neighbor_idxs[j] = idx; } } - const int valid_count = (neighbor_len < (BLOCK_SIZE * ITEMS_PER_THREAD)) ? - neighbor_len : (BLOCK_SIZE * ITEMS_PER_THREAD); + const int valid_count = (neighbor_len < (BLOCK_SIZE * ITEMS_PER_THREAD)) + ? neighbor_len + : (BLOCK_SIZE * ITEMS_PER_THREAD); BlockRadixTopKT{sort_tmp_storage}.radixTopKToStriped( weight_keys, neighbor_idxs, sample_len, valid_count); __syncthreads(); const int stride = BLOCK_SIZE * ITEMS_PER_THREAD - sample_len; - for (int idx_offset = ITEMS_PER_THREAD * BLOCK_SIZE; idx_offset < neighbor_len; + for (int idx_offset = ITEMS_PER_THREAD * BLOCK_SIZE; + idx_offset < neighbor_len; idx_offset += stride) { #pragma unroll for (int j = 0; j < ITEMS_PER_THREAD; j++) { @@ -543,8 +555,10 @@ __global__ void weighted_sample_kernel(GpuPsCommGraph graph, neighbor_idxs[j] = target_idx; } } - const int iter_valid_count = ((neighbor_len - idx_offset) >= stride) ? - (BLOCK_SIZE * ITEMS_PER_THREAD) : (sample_len + neighbor_len - idx_offset); + const int iter_valid_count = + ((neighbor_len - idx_offset) >= stride) + ? (BLOCK_SIZE * ITEMS_PER_THREAD) + : (sample_len + neighbor_len - idx_offset); BlockRadixTopKT{sort_tmp_storage}.radixTopKToStriped( weight_keys, neighbor_idxs, sample_len, iter_valid_count); __syncthreads(); @@ -555,7 +569,8 @@ __global__ void weighted_sample_kernel(GpuPsCommGraph graph, if (idx < sample_len) { res[offset + idx] = data[data_offset + neighbor_idxs[j]]; if (return_weight) { - weight_array[offset + idx] = (float)weight[data_offset + neighbor_idxs[j]]; + weight_array[offset + idx] = + (float)weight[data_offset + neighbor_idxs[j]]; } } } @@ -600,7 +615,7 @@ __global__ void unweighted_sample_large_kernel(GpuPsCommGraph graph, const int rand_num = rng.RandomMod(j + 1); if (rand_num < sample_len) { atomicMax(reinterpret_cast(res + offset + rand_num), - static_cast(j)); + static_cast(j)); } } __syncthreads(); @@ -642,7 +657,7 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, for (int j = threadIdx.x; j < neighbor_len; j += blockDim.x) { res[offset + j] = data[data_offset + j]; if (return_weight) { - weight_array[offset + j] = (float) weight[data_offset + j]; + weight_array[offset + j] = (float)weight[data_offset + j]; } } } else { @@ -650,7 +665,8 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, uint64_t sa_p[ITEMS_PER_THREAD]; int M = sample_len; int N = neighbor_len; - typedef cub::BlockRadixSort BlockRadixSort; + typedef cub::BlockRadixSort + BlockRadixSort; struct IntArray { int value[BLOCK_DIM * ITEMS_PER_THREAD]; }; @@ -669,7 +685,7 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, for (int j = 0; j < ITEMS_PER_THREAD; j++) { uint32_t idx = j * BLOCK_DIM + threadIdx.x; uint32_t r = idx < M ? rng.RandomMod(N - idx) : N; - sa_p[j] = ((uint64_t) r << 32UL) | idx; + sa_p[j] = ((uint64_t)r << 32UL) | idx; } __syncthreads(); BlockRadixSort(shared_data.temp_storage).SortBlockedToStriped(sa_p); @@ -677,7 +693,7 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, #pragma unroll for (int j = 0; j < ITEMS_PER_THREAD; j++) { int idx = j * BLOCK_DIM + threadIdx.x; - int s = (int) (sa_p[j] >> 32UL); + int s = (int)(sa_p[j] >> 32UL); shared_data.sample_shared_data.s.value[idx] = s; int p = sa_p[j] & 0xFFFFFFFF; shared_data.sample_shared_data.p.value[idx] = p; @@ -691,7 +707,8 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, int si = shared_data.sample_shared_data.s.value[idx]; int si1 = shared_data.sample_shared_data.s.value[idx + 1]; if (idx < M && (idx == M - 1 || si != si1) && si >= N - M) { - shared_data.sample_shared_data.chain.value[N - si - 1] = shared_data.sample_shared_data.p.value[idx]; + shared_data.sample_shared_data.chain.value[N - si - 1] = + shared_data.sample_shared_data.p.value[idx]; } } __syncthreads(); @@ -699,7 +716,8 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, #pragma unroll for (int j = 0; j < ITEMS_PER_THREAD; j++) { int idx = j * BLOCK_DIM + threadIdx.x; - shared_data.sample_shared_data.last_chain_tmp.value[idx] = shared_data.sample_shared_data.chain.value[idx]; + shared_data.sample_shared_data.last_chain_tmp.value[idx] = + shared_data.sample_shared_data.chain.value[idx]; } __syncthreads(); #pragma unroll @@ -707,7 +725,8 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, int idx = j * BLOCK_DIM + threadIdx.x; if (idx < M) { shared_data.sample_shared_data.chain.value[idx] = - shared_data.sample_shared_data.last_chain_tmp.value[shared_data.sample_shared_data.last_chain_tmp.value[idx]]; + shared_data.sample_shared_data.last_chain_tmp.value + [shared_data.sample_shared_data.last_chain_tmp.value[idx]]; } } __syncthreads(); @@ -715,7 +734,8 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, #pragma unroll for (int j = 0; j < ITEMS_PER_THREAD; j++) { int idx = j * BLOCK_DIM + threadIdx.x; - shared_data.sample_shared_data.last_chain_tmp.value[idx] = N - shared_data.sample_shared_data.chain.value[idx] - 1; + shared_data.sample_shared_data.last_chain_tmp.value[idx] = + N - shared_data.sample_shared_data.chain.value[idx] - 1; } __syncthreads(); #pragma unroll @@ -724,8 +744,9 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, int ai; if (idx < M) { int qi = shared_data.sample_shared_data.q.value[idx]; - if (idx == 0 || qi == 0 - || shared_data.sample_shared_data.s.value[qi] != shared_data.sample_shared_data.s.value[qi - 1]) { + if (idx == 0 || qi == 0 || + shared_data.sample_shared_data.s.value[qi] != + shared_data.sample_shared_data.s.value[qi - 1]) { ai = shared_data.sample_shared_data.s.value[qi]; } else { int prev_i = shared_data.sample_shared_data.p.value[qi - 1]; @@ -742,41 +763,43 @@ __global__ void unweighted_sample_kernel(GpuPsCommGraph graph, if (idx < M) { res[offset + idx] = data[data_offset + ai]; if (return_weight) { - weight_array[offset + idx] = (float) weight[data_offset + ai]; + weight_array[offset + idx] = (float)weight[data_offset + ai]; } } } } } -void GpuPsGraphTable::weighted_sample( - GpuPsCommGraph& graph, - GpuPsNodeInfo* node_info_list, - int* actual_size_array, - uint64_t* sample_array, - int* neighbor_count_ptr, - int cur_gpu_id, - int remote_gpu_id, - int sample_size, - int shard_len, - bool need_neighbor_count, - unsigned long long random_seed, - float* weight_array, - bool return_weight) { - platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(cur_gpu_id)); +void GpuPsGraphTable::weighted_sample(GpuPsCommGraph& graph, + GpuPsNodeInfo* node_info_list, + int* actual_size_array, + uint64_t* sample_array, + int* neighbor_count_ptr, + int cur_gpu_id, + int remote_gpu_id, + int sample_size, + int shard_len, + bool need_neighbor_count, + unsigned long long random_seed, + float* weight_array, + bool return_weight) { + platform::CUDAPlace place = + platform::CUDAPlace(resource_->dev_id(cur_gpu_id)); auto cur_stream = resource_->remote_stream(remote_gpu_id, cur_gpu_id); constexpr int BLOCK_SIZE = 256; int grid_size = (shard_len + 127) / 128; if (need_neighbor_count) { - get_actual_size_and_neighbor_count<<< - grid_size, 128, 0, cur_stream>>>(node_info_list, actual_size_array, - neighbor_count_ptr, sample_size, - shard_len); + get_actual_size_and_neighbor_count + <<>>(node_info_list, + actual_size_array, + neighbor_count_ptr, + sample_size, + shard_len); } else { - get_actual_size_and_neighbor_count<<< - grid_size, 128, 0, cur_stream>>>(node_info_list, actual_size_array, - nullptr, sample_size, shard_len); + get_actual_size_and_neighbor_count + <<>>( + node_info_list, actual_size_array, nullptr, sample_size, shard_len); } CUDA_CHECK(cudaStreamSynchronize(cur_stream)); @@ -801,21 +824,36 @@ void GpuPsGraphTable::weighted_sample( target_neighbor_counts * sizeof(float), phi::Stream(reinterpret_cast(cur_stream))); float* target_weights_key_buf_ptr = - reinterpret_cast(target_weights_key_buf->ptr()); - weighted_sample_large_kernel<<>>( - graph, node_info_list, sample_array, neighbor_offset, - target_weights_key_buf_ptr, shard_len, sample_size, random_seed, - weight_array, return_weight); + reinterpret_cast(target_weights_key_buf->ptr()); + weighted_sample_large_kernel + <<>>(graph, + node_info_list, + sample_array, + neighbor_offset, + target_weights_key_buf_ptr, + shard_len, + sample_size, + random_seed, + weight_array, + return_weight); CUDA_CHECK(cudaStreamSynchronize(cur_stream)); } else { - using WeightedSampleFuncType = void (*)(GpuPsCommGraph, GpuPsNodeInfo *, - uint64_t *, int, int, unsigned long long, - float *, bool); + using WeightedSampleFuncType = void (*)(GpuPsCommGraph, + GpuPsNodeInfo*, + uint64_t*, + int, + int, + unsigned long long, + float*, + bool); static const WeightedSampleFuncType func_array[7] = { - weighted_sample_kernel<4, 128>, weighted_sample_kernel<6, 128>, - weighted_sample_kernel<4, 256>, weighted_sample_kernel<5, 256>, - weighted_sample_kernel<6, 256>, weighted_sample_kernel<8, 256>, - weighted_sample_kernel<8, 512>, + weighted_sample_kernel<4, 128>, + weighted_sample_kernel<6, 128>, + weighted_sample_kernel<4, 256>, + weighted_sample_kernel<5, 256>, + weighted_sample_kernel<6, 256>, + weighted_sample_kernel<8, 256>, + weighted_sample_kernel<8, 512>, }; const int block_sizes[7] = {128, 128, 256, 256, 256, 256, 512}; auto choose_func_idx = [](int sample_size) { @@ -834,37 +872,54 @@ void GpuPsGraphTable::weighted_sample( int func_idx = choose_func_idx(sample_size); int block_size = block_sizes[func_idx]; func_array[func_idx]<<>>( - graph, node_info_list, sample_array, shard_len, sample_size, random_seed, - weight_array, return_weight); + graph, + node_info_list, + sample_array, + shard_len, + sample_size, + random_seed, + weight_array, + return_weight); CUDA_CHECK(cudaStreamSynchronize(cur_stream)); } } -void GpuPsGraphTable::unweighted_sample( - GpuPsCommGraph& graph, - GpuPsNodeInfo* node_info_list, - int* actual_size_array, - uint64_t* sample_array, - int cur_gpu_id, - int remote_gpu_id, - int sample_size, - int shard_len, - unsigned long long random_seed, - float* weight_array, - bool return_weight) { - - platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(cur_gpu_id)); +void GpuPsGraphTable::unweighted_sample(GpuPsCommGraph& graph, + GpuPsNodeInfo* node_info_list, + int* actual_size_array, + uint64_t* sample_array, + int cur_gpu_id, + int remote_gpu_id, + int sample_size, + int shard_len, + unsigned long long random_seed, + float* weight_array, + bool return_weight) { + platform::CUDAPlace place = + platform::CUDAPlace(resource_->dev_id(cur_gpu_id)); auto cur_stream = resource_->remote_stream(remote_gpu_id, cur_gpu_id); if (sample_size > SAMPLE_SIZE_THRESHOLD) { unweighted_sample_large_kernel<<>>( - graph, node_info_list, sample_array, actual_size_array, shard_len, sample_size, - random_seed, weight_array, return_weight); + graph, + node_info_list, + sample_array, + actual_size_array, + shard_len, + sample_size, + random_seed, + weight_array, + return_weight); } else { - using UnWeightedSampleFuncType = void (*) (GpuPsCommGraph, GpuPsNodeInfo *, - uint64_t *, int *, int, int, - unsigned long long, float *, - bool); + using UnWeightedSampleFuncType = void (*)(GpuPsCommGraph, + GpuPsNodeInfo*, + uint64_t*, + int*, + int, + int, + unsigned long long, + float*, + bool); static const UnWeightedSampleFuncType func_array[32] = { unweighted_sample_kernel<32, 1>, unweighted_sample_kernel<32, 2>, unweighted_sample_kernel<32, 3>, unweighted_sample_kernel<64, 2>, @@ -883,12 +938,22 @@ void GpuPsGraphTable::unweighted_sample( unweighted_sample_kernel<256, 4>, unweighted_sample_kernel<256, 4>, unweighted_sample_kernel<256, 4>, unweighted_sample_kernel<256, 4>, }; - static const int warp_count_array[32] = {1, 1, 1, 2, 2, 2, 4, 4, 4, 4, 4, 4, 8, 8, 8, 8, - 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8}; + static const int warp_count_array[32] = {1, 1, 1, 2, 2, 2, 4, 4, 4, 4, 4, + 4, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8, 8, 8}; int func_idx = (sample_size - 1) / 32; - func_array[func_idx]<<>>( - graph, node_info_list, sample_array, actual_size_array, shard_len, sample_size, - random_seed, weight_array, return_weight); + func_array[func_idx]<<>>(graph, + node_info_list, + sample_array, + actual_size_array, + shard_len, + sample_size, + random_seed, + weight_array, + return_weight); } CUDA_CHECK(cudaStreamSynchronize(cur_stream)); } @@ -1020,15 +1085,16 @@ void GpuPsGraphTable::move_result_to_source_gpu(int start_index, } } -void GpuPsGraphTable::move_float_result_to_source_gpu(int start_index, - int gpu_num, - int* h_left, - int* h_right, - int* fea_left, - uint32_t* fea_num_list, - uint32_t* actual_feature_size, - float* feature_list, - uint8_t* slot_list) { +void GpuPsGraphTable::move_float_result_to_source_gpu( + int start_index, + int gpu_num, + int* h_left, + int* h_right, + int* fea_left, + uint32_t* fea_num_list, + uint32_t* actual_feature_size, + float* feature_list, + uint8_t* slot_list) { int shard_len[gpu_num]; // NOLINT for (int i = 0; i < gpu_num; i++) { if (h_left[i] == -1 || h_right[i] == -1) { @@ -1056,8 +1122,8 @@ void GpuPsGraphTable::move_float_result_to_source_gpu(int start_index, node.out_stream); MemcpyPeerAsync(reinterpret_cast(slot_list + fea_left[i]), node.val_storage + - sizeof(uint32_t) * (shard_len[i] + shard_len[i] % 2) + - sizeof(float) * fea_num_list[i], + sizeof(uint32_t) * (shard_len[i] + shard_len[i] % 2) + + sizeof(float) * fea_num_list[i], sizeof(uint8_t) * fea_num_list[i], node.out_stream); } @@ -1068,7 +1134,8 @@ void GpuPsGraphTable::move_float_result_to_source_gpu(int start_index, node.out_stream); // MemcpyPeerAsync(reinterpret_cast(actual_slot_size + h_left[i]), // node.val_storage + - // sizeof(uint32_t) * (shard_len[i] + shard_len[i] % 2), + // sizeof(uint32_t) * (shard_len[i] + shard_len[i] % + // 2), // sizeof(uint32_t) * shard_len[i], // node.out_stream); } @@ -1229,7 +1296,8 @@ void GpuPsGraphTable::move_result_to_source_gpu_all_edge_type( node.val_storage + sizeof(int64_t) * shard_len[j] * edge_type_len + sizeof(int) * (shard_len[j] * edge_type_len) + sizeof(float) * (shard_len[j] * sample_size * edge_type_len) + - sizeof(int) * ((shard_len[j] * edge_type_len) * (sample_size + 1)) % 2 + + sizeof(int) * + ((shard_len[j] * edge_type_len) * (sample_size + 1)) % 2 + sizeof(uint64_t) * i * shard_len[j] * sample_size, sizeof(uint64_t) * shard_len[j] * sample_size, node.out_stream); @@ -1303,14 +1371,14 @@ __global__ void fill_float_feature_and_slot(float* dst_feature_list, } __global__ void fill_vari_feature_and_slot(uint64_t* dst_feature_list, - uint8_t* dst_slot_list, - uint64_t* src_feature_list, - uint8_t* src_slot_list, - uint32_t* dst_size_prefix_sum_list, - uint32_t* src_size_prefix_sum_list, - uint32_t* src_size_list, - uint32_t* idx, - int len) { + uint8_t* dst_slot_list, + uint64_t* src_feature_list, + uint8_t* src_slot_list, + uint32_t* dst_size_prefix_sum_list, + uint32_t* src_size_prefix_sum_list, + uint32_t* src_size_list, + uint32_t* idx, + int len) { const size_t i = blockIdx.x * blockDim.x + threadIdx.x; if (i < len) { uint32_t dst_index = dst_size_prefix_sum_list[idx[i]]; @@ -1322,8 +1390,6 @@ __global__ void fill_vari_feature_and_slot(uint64_t* dst_feature_list, } } - - /* TODO: how to optimize it to eliminate the for loop @@ -1364,12 +1430,13 @@ __global__ void fill_dvalues_with_edge_type(uint64_t* d_shard_vals, float* d_weights, int* idx, int sample_size, - int len, // len * edge_type_len - int mod, // len + int len, // len * edge_type_len + int mod, // len bool return_weight) { const size_t i = blockIdx.x * blockDim.x + threadIdx.x; if (i < len) { - int a = i % mod, b = i - i % mod; // a: get actual pos, b: get fill in which edge_type + int a = i % mod, + b = i - i % mod; // a: get actual pos, b: get fill in which edge_type d_actual_sample_size[b + idx[a]] = d_shard_actual_sample_size[i]; size_t offset1 = (b + idx[a]) * sample_size; size_t offset2 = i * sample_size; @@ -1391,13 +1458,14 @@ __global__ void fill_dvalues_with_edge_type_for_all2all( float* d_weights, int* idx, int sample_size, - int n, // len * edge_type_len + int n, // len * edge_type_len int len, int edge_type_len, bool return_weight) { // len const size_t i = blockIdx.x * blockDim.x + threadIdx.x; if (i < n) { - int pos = idx[i % len] * edge_type_len + (int)i / len; // pos: real position + int pos = + idx[i % len] * edge_type_len + (int)i / len; // pos: real position d_actual_sample_size[pos] = d_shard_actual_sample_size[i]; size_t offset1 = pos * sample_size; size_t offset2 = i * sample_size; @@ -1475,7 +1543,8 @@ void GpuPsGraphTable::clear_feature_info(int gpu_id) { if (float_feature_table_num_ > 0) { // float fea idx = 1; - int float_offset = get_table_offset(gpu_id, GraphTableType::FEATURE_TABLE, idx); + int float_offset = + get_table_offset(gpu_id, GraphTableType::FEATURE_TABLE, idx); if (float_offset < tables_.size()) { delete tables_[float_offset]; tables_[float_offset] = NULL; @@ -1532,14 +1601,12 @@ void GpuPsGraphTable::reset_feature_info(int gpu_id, } } - void GpuPsGraphTable::reset_float_feature_info(int gpu_id, size_t capacity, size_t feature_size) { int idx = 1; auto stream = get_local_stream(gpu_id); - int offset = - get_table_offset(gpu_id, GraphTableType::FEATURE_TABLE, idx); + int offset = get_table_offset(gpu_id, GraphTableType::FEATURE_TABLE, idx); if (offset < tables_.size()) { delete tables_[offset]; tables_[offset] = new Table(capacity, stream); @@ -1550,16 +1617,16 @@ void GpuPsGraphTable::reset_float_feature_info(int gpu_id, graph.node_list = NULL; if (graph.feature_list == NULL) { - CUDA_CHECK(cudaMalloc((void**)&graph.feature_list, - feature_size * sizeof(float))); + CUDA_CHECK( + cudaMalloc((void**)&graph.feature_list, feature_size * sizeof(float))); CUDA_CHECK(cudaMalloc((void**)&graph.slot_id_list, ALIGN_INT64(feature_size * sizeof(uint8_t)))); graph.feature_capacity = feature_size; } else if (graph.feature_capacity < feature_size) { cudaFree(graph.feature_list); cudaFree(graph.slot_id_list); - CUDA_CHECK(cudaMalloc((void**)&graph.feature_list, - feature_size * sizeof(float))); + CUDA_CHECK( + cudaMalloc((void**)&graph.feature_list, feature_size * sizeof(float))); CUDA_CHECK(cudaMalloc((void**)&graph.slot_id_list, ALIGN_INT64(feature_size * sizeof(uint8_t)))); graph.feature_capacity = feature_size; @@ -1604,7 +1671,7 @@ void GpuPsGraphTable::build_graph_fea_on_single_gpu(const GpuPsCommGraphFea& g, int gpu_id) { platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id)); size_t capacity = std::max((uint64_t)1, g.node_size) / load_factor_; - int ntype_id = 0; // slot feature + int ntype_id = 0; // slot feature reset_feature_info(gpu_id, capacity, g.feature_size); int offset = get_graph_fea_list_offset(gpu_id); int table_offset = @@ -1645,23 +1712,23 @@ void GpuPsGraphTable::build_graph_fea_on_single_gpu(const GpuPsCommGraphFea& g, << gpu_graph_fea_list_[offset].feature_size; } -void GpuPsGraphTable::build_graph_float_fea_on_single_gpu(const GpuPsCommGraphFloatFea& g, - int gpu_id) { +void GpuPsGraphTable::build_graph_float_fea_on_single_gpu( + const GpuPsCommGraphFloatFea& g, int gpu_id) { platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id)); size_t capacity = std::max((uint64_t)1, g.node_size) / load_factor_; - int ntype_id = 1; // float feature + int ntype_id = 1; // float feature reset_float_feature_info(gpu_id, capacity, g.feature_size); int offset = get_graph_float_fea_list_offset(gpu_id); int table_offset = get_table_offset(gpu_id, GraphTableType::FEATURE_TABLE, ntype_id); if (g.node_size > 0) { - build_ps(gpu_id, - g.node_list, - reinterpret_cast(g.fea_info_list), - g.node_size, - HBMPS_MAX_BUFF, - 8, - table_offset); + build_ps(gpu_id, + g.node_list, + reinterpret_cast(g.fea_info_list), + g.node_size, + HBMPS_MAX_BUFF, + 8, + table_offset); gpu_graph_float_fea_list_[offset].node_size = g.node_size; } else { build_ps(gpu_id, NULL, NULL, 0, HBMPS_MAX_BUFF, 8, table_offset); @@ -1773,10 +1840,10 @@ void GpuPsGraphTable::build_graph_on_single_gpu(const GpuPsCommGraph& g, cudaError_t cudaStatus; if (!FLAGS_enable_neighbor_list_use_uva) { cudaStatus = cudaMalloc(&gpu_graph_list_[offset].neighbor_list, - g.neighbor_size * sizeof(uint64_t)); + g.neighbor_size * sizeof(uint64_t)); } else { cudaStatus = cudaMallocManaged(&gpu_graph_list_[offset].neighbor_list, - g.neighbor_size * sizeof(uint64_t)); + g.neighbor_size * sizeof(uint64_t)); } PADDLE_ENFORCE_EQ(cudaStatus, cudaSuccess, @@ -1796,11 +1863,12 @@ void GpuPsGraphTable::build_graph_on_single_gpu(const GpuPsCommGraph& g, if (g.is_weighted) { cudaError_t cudaStatus = cudaMalloc(&gpu_graph_list_[offset].weight_list, g.neighbor_size * sizeof(half)); - PADDLE_ENFORCE_EQ(cudaStatus, - cudaSuccess, - platform::errors::InvalidArgument( - "failed to allocate memory for graph edge weight on gpu %d", - resource_->dev_id(gpu_id))); + PADDLE_ENFORCE_EQ( + cudaStatus, + cudaSuccess, + platform::errors::InvalidArgument( + "failed to allocate memory for graph edge weight on gpu %d", + resource_->dev_id(gpu_id))); VLOG(0) << "successfully allocate " << g.neighbor_size * sizeof(float) << " bytes of memory for graph-edges-weight on gpu " << resource_->dev_id(gpu_id); @@ -1867,11 +1935,13 @@ void GpuPsGraphTable::build_graph_from_cpu( } if (cpu_graph_list[i].neighbor_size) { if (!FLAGS_enable_neighbor_list_use_uva) { - CUDA_CHECK(cudaMalloc(&gpu_graph_list_[offset].neighbor_list, - cpu_graph_list[i].neighbor_size * sizeof(uint64_t))); + CUDA_CHECK( + cudaMalloc(&gpu_graph_list_[offset].neighbor_list, + cpu_graph_list[i].neighbor_size * sizeof(uint64_t))); } else { - CUDA_CHECK(cudaMallocManaged(&gpu_graph_list_[offset].neighbor_list, - cpu_graph_list[i].neighbor_size * sizeof(uint64_t))); + CUDA_CHECK(cudaMallocManaged( + &gpu_graph_list_[offset].neighbor_list, + cpu_graph_list[i].neighbor_size * sizeof(uint64_t))); } CUDA_CHECK( @@ -1893,38 +1963,38 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v3( NeighborSampleQuery q, bool cpu_switch, bool compress, bool weighted) { if (multi_node_ && FLAGS_enable_graph_multi_node_sampling) { // multi node mode - if (q.sample_step == 1 ) { + if (q.sample_step == 1) { auto result = graph_neighbor_sample_v2(global_device_map[q.gpu_id], - q.table_idx, - q.src_nodes, - q.sample_size, - q.len, - cpu_switch, - compress, - weighted); + q.table_idx, + q.src_nodes, + q.sample_size, + q.len, + cpu_switch, + compress, + weighted); return result; } else { auto result = graph_neighbor_sample_all2all(global_device_map[q.gpu_id], - q.sample_step, - q.table_idx, - q.src_nodes, - q.sample_size, - q.len, - cpu_switch, - compress, - weighted); + q.sample_step, + q.table_idx, + q.src_nodes, + q.sample_size, + q.len, + cpu_switch, + compress, + weighted); return result; } } else { // single node mode auto result = graph_neighbor_sample_v2(global_device_map[q.gpu_id], - q.table_idx, - q.src_nodes, - q.sample_size, - q.len, - cpu_switch, - compress, - weighted); + q.table_idx, + q.src_nodes, + q.sample_size, + q.len, + cpu_switch, + compress, + weighted); return result; } } @@ -1948,16 +2018,15 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_all2all( bool compress, bool weighted) { platform::CUDADeviceGuard guard(gpu_id); - auto &loc = storage_[gpu_id]; + auto& loc = storage_[gpu_id]; auto stream = resource_->local_stream(gpu_id, 0); - loc.alloc(len, sizeof(uint64_t)/*value_bytes*/); - + loc.alloc(len, sizeof(uint64_t) /*value_bytes*/); // all2all mode begins. init resource, partition keys, pull vals by all2all auto pull_size = gather_inter_keys_by_all2all(gpu_id, len, d_keys, stream); - VLOG(2) << "gather_inter_keys_by_all2all finish, pull_size=" << pull_size << ", len=" << len; - + VLOG(2) << "gather_inter_keys_by_all2all finish, pull_size=" << pull_size + << ", len=" << len; // do single-node multi-card sampling auto result = graph_neighbor_sample_v2(gpu_id, @@ -1969,9 +2038,8 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_all2all( compress, weighted); VLOG(2) << "graph_neighbor_sample_v2 local finish" - << ", gpu_id=" << gpu_id - << ", pull_size=" << pull_size - << ", total_sample_size=" << result.total_sample_size; + << ", gpu_id=" << gpu_id << ", pull_size=" << pull_size + << ", total_sample_size=" << result.total_sample_size; // init neighbor result NeighborSampleResult final; @@ -1979,24 +2047,28 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_all2all( final.initialize(sample_size, len, gpu_id); // all2all mode finish, scatter sample values by all2all - scatter_inter_vals_by_all2all_common(gpu_id, - len, - sizeof(uint64_t), //value_bytes - reinterpret_cast(result.val), // in - reinterpret_cast(final.val), // out - reinterpret_cast(loc.d_merged_vals), // tmp hbm - stream); - VLOG(2) << "scatter_inter_vals_by_all2all val finish" << " gpu_id=" << gpu_id; + scatter_inter_vals_by_all2all_common( + gpu_id, + len, + sizeof(uint64_t), // value_bytes + reinterpret_cast(result.val), // in + reinterpret_cast(final.val), // out + reinterpret_cast(loc.d_merged_vals), // tmp hbm + stream); + VLOG(2) << "scatter_inter_vals_by_all2all val finish" + << " gpu_id=" << gpu_id; // all2all mode finish, scatter sample sizes of every node by all2all - scatter_inter_vals_by_all2all_common(gpu_id, - len, - sizeof(int), //value_bytes - reinterpret_cast(result.actual_sample_size), // in - reinterpret_cast(final.actual_sample_size), // out - reinterpret_cast(loc.d_merged_vals), // temp hbm - stream); - VLOG(2) << "scatter_inter_vals_by_all2all actual_sample_size finish" << " gpu_id=" << gpu_id; + scatter_inter_vals_by_all2all_common( + gpu_id, + len, + sizeof(int), // value_bytes + reinterpret_cast(result.actual_sample_size), // in + reinterpret_cast(final.actual_sample_size), // out + reinterpret_cast(loc.d_merged_vals), // temp hbm + stream); + VLOG(2) << "scatter_inter_vals_by_all2all actual_sample_size finish" + << " gpu_id=" << gpu_id; // build final.actual_val if (compress) { @@ -2012,7 +2084,7 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_all2all( reinterpret_cast(cumsum_actual_sample_size->ptr()); CUDA_CHECK( cudaMemsetAsync(cumsum_actual_sample_size_p, 0, sizeof(int), stream)); - //VLOG(0) << "InclusiveSum begin"; + // VLOG(0) << "InclusiveSum begin"; CUDA_CHECK(cub::DeviceScan::InclusiveSum(NULL, temp_storage_bytes, final.actual_sample_size, @@ -2045,11 +2117,10 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_all2all( reinterpret_cast((final.actual_val_mem)->ptr()); VLOG(2) << "sample step:" << sample_step - << ", total_sample_size:" << total_sample_size - << ", len=" << len - << ", final.val=" << final.val - << ", final.actual_val=" << final.actual_val - << ", final.actual_sample_size=" << final.actual_sample_size; + << ", total_sample_size:" << total_sample_size << ", len=" << len + << ", final.val=" << final.val + << ", final.actual_val=" << final.actual_val + << ", final.actual_sample_size=" << final.actual_sample_size; int grid_size = (len - 1) / block_size_ + 1; fill_actual_vals<<>>( @@ -2137,12 +2208,12 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2( reinterpret_cast(d_shard_actual_sample_size->ptr()); split_idx_to_shard(reinterpret_cast(key), - d_idx_ptr, - len, - d_left_ptr, - d_right_ptr, - gpu_id, - stream); + d_idx_ptr, + len, + d_left_ptr, + d_right_ptr, + gpu_id, + stream); heter_comm_kernel_->fill_shard_key( d_shard_keys_ptr, key, d_idx_ptr, len, stream); @@ -2211,14 +2282,13 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2( const dim3 block(WARP_SIZE, BLOCK_WARPS); const dim3 grid((shard_len + TILE_SIZE - 1) / TILE_SIZE); neighbor_sample_kernel_walking - <<>>( - graph, - node_info_list, - actual_size_array, - sample_array, - sample_size, - shard_len, - default_value); + <<>>(graph, + node_info_list, + actual_size_array, + sample_array, + sample_size, + shard_len, + default_value); } else { // Weighted sample. thread_local std::random_device rd; @@ -2227,22 +2297,33 @@ NeighborSampleResult GpuPsGraphTable::graph_neighbor_sample_v2( unsigned long long random_seed = distrib(gen); const bool need_neighbor_count = sample_size > SAMPLE_SIZE_THRESHOLD; - int *neighbor_count_ptr = nullptr; + int* neighbor_count_ptr = nullptr; std::shared_ptr neighbor_count; if (need_neighbor_count) { - neighbor_count = - memory::AllocShared(place, - (shard_len + 1) * sizeof(int), - phi::Stream(reinterpret_cast(cur_stream))); - neighbor_count_ptr = reinterpret_cast(neighbor_count->ptr()); + neighbor_count = memory::AllocShared( + place, + (shard_len + 1) * sizeof(int), + phi::Stream(reinterpret_cast(cur_stream))); + neighbor_count_ptr = reinterpret_cast(neighbor_count->ptr()); } PADDLE_ENFORCE_GT(sample_size, 0, - platform::errors::InvalidArgument("sample_size should be greater than 0.")); - weighted_sample(graph, node_info_list, actual_size_array, sample_array, - neighbor_count_ptr, gpu_id, i, sample_size, shard_len, - need_neighbor_count, random_seed, nullptr, false); + platform::errors::InvalidArgument( + "sample_size should be greater than 0.")); + weighted_sample(graph, + node_info_list, + actual_size_array, + sample_array, + neighbor_count_ptr, + gpu_id, + i, + sample_size, + shard_len, + need_neighbor_count, + random_seed, + nullptr, + false); } } @@ -2444,26 +2525,24 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_sage( bool return_weight) { if (multi_node_ && FLAGS_enable_graph_multi_node_sampling) { // multi node mode - auto result = graph_neighbor_sample_sage_all2all( - gpu_id, - edge_type_len, - key, - sample_size, - len, - edge_type_graphs, - weighted, - return_weight); + auto result = graph_neighbor_sample_sage_all2all(gpu_id, + edge_type_len, + key, + sample_size, + len, + edge_type_graphs, + weighted, + return_weight); return result; } else { - auto result = graph_neighbor_sample_all_edge_type( - gpu_id, - edge_type_len, - key, - sample_size, - len, - edge_type_graphs, - weighted, - return_weight); + auto result = graph_neighbor_sample_all_edge_type(gpu_id, + edge_type_len, + key, + sample_size, + len, + edge_type_graphs, + weighted, + return_weight); return result; } } @@ -2481,7 +2560,7 @@ __global__ void rearange_neighbor_result(uint64_t* val, bool return_weight) { const size_t i = blockIdx.x * blockDim.x + threadIdx.x; if (i < n) { - int pos = (i % edge_type_len) * len + (int) i / edge_type_len; + int pos = (i % edge_type_len) * len + (int)i / edge_type_len; new_ac[pos] = ac[i]; size_t offset1 = pos * sample_size, offset2 = i * sample_size; for (int j = 0; j < ac[i]; j++) { @@ -2502,16 +2581,16 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_sage_all2all( std::vector> edge_type_graphs, bool weighted, bool return_weight) { - platform::CUDADeviceGuard guard(gpu_id); - auto &loc = storage_[gpu_id]; + auto& loc = storage_[gpu_id]; auto stream = resource_->local_stream(gpu_id, 0); - loc.alloc(len, sizeof(uint64_t) * edge_type_len * sample_size); // key_bytes + loc.alloc(len, sizeof(uint64_t) * edge_type_len * sample_size); // key_bytes // all2all mode begins, init resource, partition keys, pull vals by all2all. auto pull_size = gather_inter_keys_by_all2all(gpu_id, len, d_keys, stream); - VLOG(2) << "gather_inter_keys_by_all2all sage finish, pull_size=" << pull_size << ", len=" << len; + VLOG(2) << "gather_inter_keys_by_all2all sage finish, pull_size=" << pull_size + << ", len=" << len; // do single-node multi-card sampling auto result = graph_neighbor_sample_all_edge_type(gpu_id, @@ -2525,55 +2604,58 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_sage_all2all( true); VLOG(2) << "graph_neighbor_sample_all_edge_type local finish" - << ", gpu_id=" << gpu_id - << ", pull_size=" << pull_size; + << ", gpu_id=" << gpu_id << ", pull_size=" << pull_size; // init neighbor result NeighborSampleResultV2 final; final.set_stream(stream); - final.initialize(sample_size, len, edge_type_len, return_weight, - gpu_id); + final.initialize(sample_size, len, edge_type_len, return_weight, gpu_id); VLOG(2) << "Begin scatter_inter_vals_by_all2all_common for val"; // all2all mode finish, scatter sample values by all2all - scatter_inter_vals_by_all2all_common(gpu_id, - len, - sizeof(uint64_t) * edge_type_len * sample_size, // value_bytes - reinterpret_cast(result.val), // in - reinterpret_cast(final.val), // out - reinterpret_cast(loc.d_merged_vals), // tmp hbm - stream, - true); - VLOG(2) << "scatter_inter_vals_by_all2all sage val finish" << " gpu_id=" << gpu_id; + scatter_inter_vals_by_all2all_common( + gpu_id, + len, + sizeof(uint64_t) * edge_type_len * sample_size, // value_bytes + reinterpret_cast(result.val), // in + reinterpret_cast(final.val), // out + reinterpret_cast(loc.d_merged_vals), // tmp hbm + stream, + true); + VLOG(2) << "scatter_inter_vals_by_all2all sage val finish" + << " gpu_id=" << gpu_id; // all2all mode finish, scatter sample sizes of every node by all2all - scatter_inter_vals_by_all2all_common(gpu_id, - len, - sizeof(int) * edge_type_len, // value_bytes - reinterpret_cast(result.actual_sample_size), // in - reinterpret_cast(final.actual_sample_size), // out - reinterpret_cast(loc.d_merged_vals), // tmp hbm - stream, - true); - VLOG(2) << "scatter_inter_vals_by_all2all sage actual_sample_size finish" << " gpu_id=" << gpu_id; + scatter_inter_vals_by_all2all_common( + gpu_id, + len, + sizeof(int) * edge_type_len, // value_bytes + reinterpret_cast(result.actual_sample_size), // in + reinterpret_cast(final.actual_sample_size), // out + reinterpret_cast(loc.d_merged_vals), // tmp hbm + stream, + true); + VLOG(2) << "scatter_inter_vals_by_all2all sage actual_sample_size finish" + << " gpu_id=" << gpu_id; if (return_weight) { - scatter_inter_vals_by_all2all_common(gpu_id, - len, - sizeof(float) * edge_type_len * sample_size, // value_bytes - reinterpret_cast(result.weight), // in - reinterpret_cast(final.weight), // out - reinterpret_cast(loc.d_merged_vals), // tmp hbm - stream, - true); - VLOG(2) << "scatter_inter_vals_by_all2all sage weight finish" << " gpu_id=" << gpu_id; + scatter_inter_vals_by_all2all_common( + gpu_id, + len, + sizeof(float) * edge_type_len * sample_size, // value_bytes + reinterpret_cast(result.weight), // in + reinterpret_cast(final.weight), // out + reinterpret_cast(loc.d_merged_vals), // tmp hbm + stream, + true); + VLOG(2) << "scatter_inter_vals_by_all2all sage weight finish" + << " gpu_id=" << gpu_id; } // Rearange neighbor result. NeighborSampleResultV2 final2; final2.set_stream(stream); - final2.initialize(sample_size, len, edge_type_len, return_weight, - gpu_id); + final2.initialize(sample_size, len, edge_type_len, return_weight, gpu_id); int grid_size_e = (len * edge_type_len - 1) / block_size_ + 1; rearange_neighbor_result<<>>( reinterpret_cast(final.val), @@ -2605,7 +2687,10 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( NeighborSampleResultV2 result; auto stream = resource_->local_stream(gpu_id, 0); result.set_stream(stream); - result.initialize(sample_size, len, edge_type_len, return_weight, + result.initialize(sample_size, + len, + edge_type_len, + return_weight, resource_->dev_id(gpu_id)); if (len == 0) { return result; @@ -2661,21 +2746,20 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( float* d_shard_weight_ptr = nullptr; std::shared_ptr d_shard_weight; if (return_weight) { - d_shard_weight = - memory::AllocShared(place, - sample_size * len * edge_type_len * sizeof(float), - phi::Stream(reinterpret_cast(stream))); - d_shard_weight_ptr = - reinterpret_cast(d_shard_weight->ptr()); + d_shard_weight = memory::AllocShared( + place, + sample_size * len * edge_type_len * sizeof(float), + phi::Stream(reinterpret_cast(stream))); + d_shard_weight_ptr = reinterpret_cast(d_shard_weight->ptr()); } split_idx_to_shard(reinterpret_cast(key), - d_idx_ptr, - len, - d_left_ptr, - d_right_ptr, - gpu_id, - stream); + d_idx_ptr, + len, + d_left_ptr, + d_right_ptr, + gpu_id, + stream); heter_comm_kernel_->fill_shard_key( d_shard_keys_ptr, key, d_idx_ptr, len, stream); @@ -2714,10 +2798,13 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( i, shard_len * sizeof(uint64_t), shard_len * sizeof(uint64_t) * edge_type_len + // key - shard_len * sample_size * sizeof(uint64_t) * edge_type_len + // sample + shard_len * sample_size * sizeof(uint64_t) * + edge_type_len + // sample shard_len * sizeof(int) * edge_type_len + // actual sample size - shard_len * sample_size * sizeof(float) * edge_type_len + // edge weight - sizeof(int) * ((shard_len * edge_type_len * (1 + sample_size)) % 2)); // align, sizeof(int) == sizeof(float) + shard_len * sample_size * sizeof(float) * + edge_type_len + // edge weight + sizeof(int) * ((shard_len * edge_type_len * (1 + sample_size)) % + 2)); // align, sizeof(int) == sizeof(float) } } walk_to_dest(gpu_id, @@ -2764,8 +2851,8 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( float* weight_array_base = nullptr; uint64_t* sample_array_base = nullptr; if (return_weight) { - weight_array_base = - reinterpret_cast(actual_size_base + shard_len * edge_type_len); + weight_array_base = reinterpret_cast(actual_size_base + + shard_len * edge_type_len); sample_array_base = reinterpret_cast( weight_array_base + shard_len * edge_type_len * sample_size + (((shard_len * edge_type_len) * (1 + sample_size)) % 2)); @@ -2781,7 +2868,8 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( unsigned long long random_seed = distrib(gen); PADDLE_ENFORCE_GT(sample_size, 0, - platform::errors::InvalidArgument("sample_size should be greater than 0.")); + platform::errors::InvalidArgument( + "sample_size should be greater than 0.")); if (!weighted) { /*int grid_size_ = (shard_len * edge_type_len - 1) / block_size_ + 1; @@ -2802,16 +2890,25 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( for (int edge_idx = 0; edge_idx < edge_type_len; edge_idx++) { GpuPsNodeInfo* node_info_list = node_info_base + edge_idx * shard_len; int* actual_size_array = actual_size_base + edge_idx * shard_len; - uint64_t* sample_array = sample_array_base + edge_idx * shard_len * sample_size; + uint64_t* sample_array = + sample_array_base + edge_idx * shard_len * sample_size; float* weight_array = nullptr; if (return_weight) { weight_array = weight_array_base + edge_idx * shard_len * sample_size; } int offset = get_graph_list_offset(i, edge_idx); auto graph = gpu_graph_list_[offset]; - unweighted_sample(graph, node_info_list, actual_size_array, sample_array, - gpu_id, i, sample_size, shard_len, random_seed, - weight_array, return_weight); + unweighted_sample(graph, + node_info_list, + actual_size_array, + sample_array, + gpu_id, + i, + sample_size, + shard_len, + random_seed, + weight_array, + return_weight); } } else { // Weighted sample. @@ -2819,16 +2916,17 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( int* neighbor_count_ptr = nullptr; std::shared_ptr neighbor_count; if (need_neighbor_count) { - neighbor_count = - memory::AllocShared(place, - (shard_len + 1) * sizeof(int), - phi::Stream(reinterpret_cast(cur_stream))); - neighbor_count_ptr = reinterpret_cast(neighbor_count->ptr()); + neighbor_count = memory::AllocShared( + place, + (shard_len + 1) * sizeof(int), + phi::Stream(reinterpret_cast(cur_stream))); + neighbor_count_ptr = reinterpret_cast(neighbor_count->ptr()); } for (int edge_idx = 0; edge_idx < edge_type_len; edge_idx++) { GpuPsNodeInfo* node_info_list = node_info_base + edge_idx * shard_len; int* actual_size_array = actual_size_base + edge_idx * shard_len; - uint64_t* sample_array = sample_array_base + edge_idx * shard_len * sample_size; + uint64_t* sample_array = + sample_array_base + edge_idx * shard_len * sample_size; float* weight_array = nullptr; if (return_weight) { weight_array = weight_array_base + edge_idx * shard_len * sample_size; @@ -2836,9 +2934,19 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( int offset = get_graph_list_offset(i, edge_idx); auto graph = gpu_graph_list_[offset]; - weighted_sample(graph, node_info_list, actual_size_array, sample_array, - neighbor_count_ptr, gpu_id, i, sample_size, shard_len, - need_neighbor_count, random_seed, weight_array, return_weight); + weighted_sample(graph, + node_info_list, + actual_size_array, + sample_array, + neighbor_count_ptr, + gpu_id, + i, + sample_size, + shard_len, + need_neighbor_count, + random_seed, + weight_array, + return_weight); } } } @@ -2864,7 +2972,8 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( int grid_size_e = (len * edge_type_len - 1) / block_size_ + 1; if (!for_all2all) { - // vals: [e1, e2, e3], where e1 means all the sample res of len for edge_type 1. + // vals: [e1, e2, e3], where e1 means all the sample res of len for + // edge_type 1. fill_dvalues_with_edge_type<<>>( d_shard_vals_ptr, val, @@ -2879,8 +2988,12 @@ NeighborSampleResultV2 GpuPsGraphTable::graph_neighbor_sample_all_edge_type( return_weight); } else { // different fill mode for all2all - // vals: [node1, node2, node3], where node1 means all the sample res of node1, including all edge_types. - fill_dvalues_with_edge_type_for_all2all<<>>( + // vals: [node1, node2, node3], where node1 means all the sample res of + // node1, including all edge_types. + fill_dvalues_with_edge_type_for_all2all<<>>( d_shard_vals_ptr, val, d_shard_actual_sample_size_ptr, @@ -2950,12 +3063,12 @@ void GpuPsGraphTable::get_node_degree( phi::Stream(reinterpret_cast(stream))); int* d_shard_degree_ptr = reinterpret_cast(d_shard_degree->ptr()); split_idx_to_shard(reinterpret_cast(key), - d_idx_ptr, - len, - d_left_ptr, - d_right_ptr, - gpu_id, - stream); + d_idx_ptr, + len, + d_left_ptr, + d_right_ptr, + gpu_id, + stream); heter_comm_kernel_->fill_shard_key( d_shard_keys_ptr, key, d_idx_ptr, len, stream); CUDA_CHECK(cudaMemcpyAsync(h_left, @@ -3008,10 +3121,7 @@ void GpuPsGraphTable::get_node_degree( reinterpret_cast(node.val_storage); int* node_degree_array = reinterpret_cast(node_info_list + shard_len); int grid_size_ = (shard_len - 1) / block_size_ + 1; - get_node_degree_kernel<<>>( + get_node_degree_kernel<<>>( node_info_list, node_degree_array, shard_len); } for (int i = 0; i < total_gpu; ++i) { @@ -3089,235 +3199,264 @@ int GpuPsGraphTable::get_feature_info_of_nodes( int gpu_id, uint64_t* d_nodes, int node_num, - std::shared_ptr &size_list, - std::shared_ptr &size_list_prefix_sum, + std::shared_ptr& size_list, + std::shared_ptr& size_list_prefix_sum, std::shared_ptr& feature_list, std::shared_ptr& slot_list, bool sage_mode) { - if (node_num == 0) { - return 0; - } - - int all_fea_num = 0; - if (multi_node_) { - if (infer_mode_) { - all_fea_num = get_feature_info_of_nodes_normal(gpu_id, d_nodes, node_num, size_list, - size_list_prefix_sum, feature_list, slot_list); - } else { - if (FLAGS_enable_graph_multi_node_sampling) { - all_fea_num = get_feature_info_of_nodes_all2all(gpu_id, d_nodes, node_num, size_list, - size_list_prefix_sum, feature_list, slot_list, - sage_mode); - } - } - } else { - all_fea_num = get_feature_info_of_nodes_normal(gpu_id, d_nodes, node_num, size_list, - size_list_prefix_sum, feature_list, slot_list); - } - VLOG(2) << "end get feature info of nodes, all_fea_num: " << all_fea_num; - return all_fea_num; + if (node_num == 0) { + return 0; + } + + int all_fea_num = 0; + if (multi_node_) { + if (infer_mode_) { + all_fea_num = get_feature_info_of_nodes_normal(gpu_id, + d_nodes, + node_num, + size_list, + size_list_prefix_sum, + feature_list, + slot_list); + } else { + if (FLAGS_enable_graph_multi_node_sampling) { + all_fea_num = get_feature_info_of_nodes_all2all(gpu_id, + d_nodes, + node_num, + size_list, + size_list_prefix_sum, + feature_list, + slot_list, + sage_mode); + } + } + } else { + all_fea_num = get_feature_info_of_nodes_normal(gpu_id, + d_nodes, + node_num, + size_list, + size_list_prefix_sum, + feature_list, + slot_list); + } + VLOG(2) << "end get feature info of nodes, all_fea_num: " << all_fea_num; + return all_fea_num; } int GpuPsGraphTable::get_feature_info_of_nodes_all2all( - int gpu_id, - uint64_t* d_nodes, - int node_num, - std::shared_ptr &size_list, - std::shared_ptr &size_list_prefix_sum, - std::shared_ptr& feature_list, - std::shared_ptr& slot_list, - bool sage_mode) { - if (node_num == 0) { - return 0; - } - - platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(gpu_id)); - platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id)); - int total_gpu = resource_->total_device(); - auto stream = resource_->local_stream(gpu_id, 0); - auto &loc = storage_[gpu_id]; - auto &res = loc.shard_res; - - size_t pull_size = 0; - //loc.alloc(node_num, sizeof(uint64_t) /*key_bytes*/); - loc.alloc(node_num, sizeof(uint32_t) /*key_bytes*/); - pull_size = gather_inter_keys_by_all2all(gpu_id, - node_num, - d_nodes, - stream); - VLOG(2) << "gather iner keys by all2all, pull size: " << pull_size << ", node num: " << node_num; - - std::shared_ptr d_tmp_feature_list; - std::shared_ptr d_tmp_slot_list; - std::shared_ptr d_tmp_size_list; - std::shared_ptr d_tmp_size_prefixsum_list; - - d_tmp_size_list = - memory::Alloc(place, - pull_size * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - d_tmp_size_prefixsum_list = - memory::Alloc(place, - pull_size * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - int ret = get_feature_info_of_nodes_normal(gpu_id, - loc.d_merged_keys, - pull_size, - d_tmp_size_list, - d_tmp_size_prefixsum_list, - d_tmp_feature_list, - d_tmp_slot_list); - VLOG(2) << "finish feature info of nodes, ret: " << ret; - - uint64_t *d_tmp_feature_list_ptr = - reinterpret_cast(d_tmp_feature_list->ptr()); - uint8_t *d_tmp_slot_list_ptr = reinterpret_cast(d_tmp_slot_list->ptr()); - uint32_t *d_tmp_size_list_ptr = - reinterpret_cast(d_tmp_size_list->ptr()); - uint32_t *d_tmp_size_prefixsum_list_ptr = - reinterpret_cast(d_tmp_size_prefixsum_list->ptr()); - - uint32_t *size_list_ptr = - reinterpret_cast(size_list->ptr()); - uint32_t *size_list_prefix_sum_ptr = - reinterpret_cast(size_list_prefix_sum->ptr()); - - VLOG(2) << "begin scatter size list"; - scatter_inter_vals_by_all2all_common(gpu_id, - node_num, - sizeof(uint32_t), - reinterpret_cast(d_tmp_size_list_ptr), - reinterpret_cast(size_list_ptr), - reinterpret_cast(loc.d_merged_vals), - stream); - VLOG(2) << "end scatter size list"; - std::shared_ptr inter_size_list = - memory::Alloc(place, - node_num * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - uint32_t *inter_size_list_ptr = - reinterpret_cast(inter_size_list->ptr()); - CUDA_CHECK(cudaMemcpyAsync(inter_size_list_ptr, - loc.d_merged_vals, - sizeof(uint32_t) * node_num, - cudaMemcpyDeviceToDevice, - stream)); - - VLOG(2) << "begin calc size list prefix sum"; - size_t storage_bytes = 0; - CUDA_CHECK(cub::DeviceScan::ExclusiveSum( - NULL, storage_bytes, size_list_ptr, size_list_prefix_sum_ptr, node_num, stream)); - CUDA_CHECK(cudaStreamSynchronize(stream)); - auto d_temp_storage_tmp = - memory::Alloc(place, - storage_bytes, - phi::Stream(reinterpret_cast(stream))); - CUDA_CHECK(cub::DeviceScan::ExclusiveSum(d_temp_storage_tmp->ptr(), - storage_bytes, - size_list_ptr, - size_list_prefix_sum_ptr, - node_num, - stream)); - VLOG(2) << "end calc size list prefix sum"; - - std::vector h_feature_size_list(node_num, 0); - CUDA_CHECK(cudaMemcpyAsync( - reinterpret_cast(h_feature_size_list.data()), - size_list_ptr, - sizeof(uint32_t) * node_num, - cudaMemcpyDeviceToHost, - stream)); - int fea_num = 0; - for (size_t i = 0; i < h_feature_size_list.size(); i++) { - fea_num += h_feature_size_list[i]; - } - VLOG(2) << "after calc, total fea num:" << fea_num; - - feature_list = - memory::Alloc(place, - fea_num * sizeof(uint64_t), - phi::Stream(reinterpret_cast(stream))); - uint64_t* feature_list_ptr = - reinterpret_cast(feature_list->ptr()); - slot_list = - memory::Alloc(place, - fea_num * sizeof(uint8_t), - phi::Stream(reinterpret_cast(stream))); - uint8_t* slot_list_ptr = reinterpret_cast(slot_list->ptr()); - - //calc new offset - recalc_local_and_remote_size(gpu_id, pull_size, node_num, d_tmp_size_list_ptr, inter_size_list_ptr, stream); - - VLOG(2) << "begin send feature list"; - std::shared_ptr inter_feature_list = - memory::Alloc(place, - fea_num * sizeof(uint64_t), - phi::Stream(reinterpret_cast(stream))); - uint64_t *inter_feature_list_ptr = - reinterpret_cast(inter_feature_list->ptr()); - - scatter_inter_vals_by_all2all_common(gpu_id, - node_num, - sizeof(uint64_t), - reinterpret_cast(d_tmp_feature_list_ptr), - reinterpret_cast(feature_list_ptr), - reinterpret_cast(inter_feature_list_ptr), - stream, - sage_mode, - true); - VLOG(2) << "end send feature list"; - - VLOG(2) << "begin send slot list"; - std::shared_ptr inter_slot_list = - memory::Alloc(place, - fea_num * sizeof(uint8_t), - phi::Stream(reinterpret_cast(stream))); - uint8_t *inter_slot_list_ptr = - reinterpret_cast(inter_slot_list->ptr()); - - scatter_inter_vals_by_all2all_common(gpu_id, - node_num, - sizeof(uint8_t), - reinterpret_cast(d_tmp_slot_list_ptr), - reinterpret_cast(slot_list_ptr), - reinterpret_cast(inter_slot_list_ptr), - stream, - sage_mode, - true); - VLOG(2) << "end send slot list"; - - auto inter_size_list_prefix_sum = - memory::Alloc(place, - node_num * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - uint32_t* inter_size_list_prefix_sum_ptr = - reinterpret_cast(inter_size_list_prefix_sum->ptr()); - CUDA_CHECK(cub::DeviceScan::ExclusiveSum( - NULL, storage_bytes, inter_size_list_ptr, inter_size_list_prefix_sum_ptr, node_num, stream)); - CUDA_CHECK(cub::DeviceScan::ExclusiveSum(d_temp_storage_tmp->ptr(), - storage_bytes, - inter_size_list_ptr, - inter_size_list_prefix_sum_ptr, - node_num, - stream)); - CUDA_CHECK(cudaStreamSynchronize(stream)); - - int grid_size = (node_num - 1) / block_size_ + 1; - fill_vari_feature_and_slot<<>>( - feature_list_ptr, - slot_list_ptr, - inter_feature_list_ptr, - inter_slot_list_ptr, - size_list_prefix_sum_ptr, - inter_size_list_prefix_sum_ptr, - inter_size_list_ptr, - loc.shard_res.d_local_idx_parted, - node_num); - - VLOG(2) << "end all2all get slot info, node_num: " << node_num << ", pull size: " << pull_size << ", fea num: " << fea_num; - CUDA_CHECK(cudaStreamSynchronize(stream)); - - return fea_num; + int gpu_id, + uint64_t* d_nodes, + int node_num, + std::shared_ptr& size_list, + std::shared_ptr& size_list_prefix_sum, + std::shared_ptr& feature_list, + std::shared_ptr& slot_list, + bool sage_mode) { + if (node_num == 0) { + return 0; + } + + platform::CUDAPlace place = platform::CUDAPlace(resource_->dev_id(gpu_id)); + platform::CUDADeviceGuard guard(resource_->dev_id(gpu_id)); + int total_gpu = resource_->total_device(); + auto stream = resource_->local_stream(gpu_id, 0); + auto& loc = storage_[gpu_id]; + auto& res = loc.shard_res; + + size_t pull_size = 0; + // loc.alloc(node_num, sizeof(uint64_t) /*key_bytes*/); + loc.alloc(node_num, sizeof(uint32_t) /*key_bytes*/); + pull_size = gather_inter_keys_by_all2all(gpu_id, node_num, d_nodes, stream); + VLOG(2) << "gather iner keys by all2all, pull size: " << pull_size + << ", node num: " << node_num; + + std::shared_ptr d_tmp_feature_list; + std::shared_ptr d_tmp_slot_list; + std::shared_ptr d_tmp_size_list; + std::shared_ptr d_tmp_size_prefixsum_list; + + d_tmp_size_list = + memory::Alloc(place, + pull_size * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + d_tmp_size_prefixsum_list = + memory::Alloc(place, + pull_size * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + int ret = get_feature_info_of_nodes_normal(gpu_id, + loc.d_merged_keys, + pull_size, + d_tmp_size_list, + d_tmp_size_prefixsum_list, + d_tmp_feature_list, + d_tmp_slot_list); + VLOG(2) << "finish feature info of nodes, ret: " << ret; + + uint64_t* d_tmp_feature_list_ptr = + reinterpret_cast(d_tmp_feature_list->ptr()); + uint8_t* d_tmp_slot_list_ptr = + reinterpret_cast(d_tmp_slot_list->ptr()); + uint32_t* d_tmp_size_list_ptr = + reinterpret_cast(d_tmp_size_list->ptr()); + uint32_t* d_tmp_size_prefixsum_list_ptr = + reinterpret_cast(d_tmp_size_prefixsum_list->ptr()); + + uint32_t* size_list_ptr = reinterpret_cast(size_list->ptr()); + uint32_t* size_list_prefix_sum_ptr = + reinterpret_cast(size_list_prefix_sum->ptr()); + + VLOG(2) << "begin scatter size list"; + scatter_inter_vals_by_all2all_common( + gpu_id, + node_num, + sizeof(uint32_t), + reinterpret_cast(d_tmp_size_list_ptr), + reinterpret_cast(size_list_ptr), + reinterpret_cast(loc.d_merged_vals), + stream); + VLOG(2) << "end scatter size list"; + std::shared_ptr inter_size_list = + memory::Alloc(place, + node_num * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + uint32_t* inter_size_list_ptr = + reinterpret_cast(inter_size_list->ptr()); + CUDA_CHECK(cudaMemcpyAsync(inter_size_list_ptr, + loc.d_merged_vals, + sizeof(uint32_t) * node_num, + cudaMemcpyDeviceToDevice, + stream)); + + VLOG(2) << "begin calc size list prefix sum"; + size_t storage_bytes = 0; + CUDA_CHECK(cub::DeviceScan::ExclusiveSum(NULL, + storage_bytes, + size_list_ptr, + size_list_prefix_sum_ptr, + node_num, + stream)); + CUDA_CHECK(cudaStreamSynchronize(stream)); + auto d_temp_storage_tmp = + memory::Alloc(place, + storage_bytes, + phi::Stream(reinterpret_cast(stream))); + CUDA_CHECK(cub::DeviceScan::ExclusiveSum(d_temp_storage_tmp->ptr(), + storage_bytes, + size_list_ptr, + size_list_prefix_sum_ptr, + node_num, + stream)); + VLOG(2) << "end calc size list prefix sum"; + + std::vector h_feature_size_list(node_num, 0); + CUDA_CHECK( + cudaMemcpyAsync(reinterpret_cast(h_feature_size_list.data()), + size_list_ptr, + sizeof(uint32_t) * node_num, + cudaMemcpyDeviceToHost, + stream)); + int fea_num = 0; + for (size_t i = 0; i < h_feature_size_list.size(); i++) { + fea_num += h_feature_size_list[i]; + } + VLOG(2) << "after calc, total fea num:" << fea_num; + + feature_list = + memory::Alloc(place, + fea_num * sizeof(uint64_t), + phi::Stream(reinterpret_cast(stream))); + uint64_t* feature_list_ptr = reinterpret_cast(feature_list->ptr()); + slot_list = + memory::Alloc(place, + fea_num * sizeof(uint8_t), + phi::Stream(reinterpret_cast(stream))); + uint8_t* slot_list_ptr = reinterpret_cast(slot_list->ptr()); + + // calc new offset + recalc_local_and_remote_size(gpu_id, + pull_size, + node_num, + d_tmp_size_list_ptr, + inter_size_list_ptr, + stream); + + VLOG(2) << "begin send feature list"; + std::shared_ptr inter_feature_list = + memory::Alloc(place, + fea_num * sizeof(uint64_t), + phi::Stream(reinterpret_cast(stream))); + uint64_t* inter_feature_list_ptr = + reinterpret_cast(inter_feature_list->ptr()); + + scatter_inter_vals_by_all2all_common( + gpu_id, + node_num, + sizeof(uint64_t), + reinterpret_cast(d_tmp_feature_list_ptr), + reinterpret_cast(feature_list_ptr), + reinterpret_cast(inter_feature_list_ptr), + stream, + sage_mode, + true); + VLOG(2) << "end send feature list"; + + VLOG(2) << "begin send slot list"; + std::shared_ptr inter_slot_list = + memory::Alloc(place, + fea_num * sizeof(uint8_t), + phi::Stream(reinterpret_cast(stream))); + uint8_t* inter_slot_list_ptr = + reinterpret_cast(inter_slot_list->ptr()); + + scatter_inter_vals_by_all2all_common( + gpu_id, + node_num, + sizeof(uint8_t), + reinterpret_cast(d_tmp_slot_list_ptr), + reinterpret_cast(slot_list_ptr), + reinterpret_cast(inter_slot_list_ptr), + stream, + sage_mode, + true); + VLOG(2) << "end send slot list"; + + auto inter_size_list_prefix_sum = + memory::Alloc(place, + node_num * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + uint32_t* inter_size_list_prefix_sum_ptr = + reinterpret_cast(inter_size_list_prefix_sum->ptr()); + CUDA_CHECK(cub::DeviceScan::ExclusiveSum(NULL, + storage_bytes, + inter_size_list_ptr, + inter_size_list_prefix_sum_ptr, + node_num, + stream)); + CUDA_CHECK(cub::DeviceScan::ExclusiveSum(d_temp_storage_tmp->ptr(), + storage_bytes, + inter_size_list_ptr, + inter_size_list_prefix_sum_ptr, + node_num, + stream)); + CUDA_CHECK(cudaStreamSynchronize(stream)); + + int grid_size = (node_num - 1) / block_size_ + 1; + fill_vari_feature_and_slot<<>>( + feature_list_ptr, + slot_list_ptr, + inter_feature_list_ptr, + inter_slot_list_ptr, + size_list_prefix_sum_ptr, + inter_size_list_prefix_sum_ptr, + inter_size_list_ptr, + loc.shard_res.d_local_idx_parted, + node_num); + + VLOG(2) << "end all2all get slot info, node_num: " << node_num + << ", pull size: " << pull_size << ", fea num: " << fea_num; + CUDA_CHECK(cudaStreamSynchronize(stream)); + + return fea_num; } int GpuPsGraphTable::get_feature_info_of_nodes_normal( @@ -3603,7 +3742,8 @@ int GpuPsGraphTable::get_feature_info_of_nodes_normal( int grid_size = (node_num - 1) / block_size_ + 1; uint32_t* size_list_ptr = reinterpret_cast(size_list->ptr()); - uint32_t* size_list_prefix_sum_ptr = reinterpret_cast(size_list_prefix_sum->ptr()); + uint32_t* size_list_prefix_sum_ptr = + reinterpret_cast(size_list_prefix_sum->ptr()); fill_size<<>>( size_list_ptr, d_size_list_ptr, d_idx_ptr, node_num); @@ -3616,8 +3756,12 @@ int GpuPsGraphTable::get_feature_info_of_nodes_normal( uint32_t* src_fea_size_prefix_sum_ptr = reinterpret_cast(src_fea_size_prefix_sum->ptr()); CUDA_CHECK(cudaStreamSynchronize(stream)); - CUDA_CHECK(cub::DeviceScan::ExclusiveSum( - NULL, storage_bytes, size_list_ptr, size_list_prefix_sum_ptr, node_num, stream)); + CUDA_CHECK(cub::DeviceScan::ExclusiveSum(NULL, + storage_bytes, + size_list_ptr, + size_list_prefix_sum_ptr, + node_num, + stream)); CUDA_CHECK(cudaStreamSynchronize(stream)); auto d_temp_storage_tmp = memory::Alloc(place, @@ -3780,16 +3924,19 @@ int GpuPsGraphTable::get_float_feature_info_of_nodes( // CUDA_CHECK(cudaStreamSynchronize( // node.in_stream)); // wait for walk_to_dest and memset tables_[table_offset]->get(reinterpret_cast(node.key_storage), - reinterpret_cast(d_fea_info[i]), - static_cast(h_right[i] - h_left[i] + 1), - resource_->remote_stream(i, gpu_id)); + reinterpret_cast(d_fea_info[i]), + static_cast(h_right[i] - h_left[i] + 1), + resource_->remote_stream(i, gpu_id)); dim3 grid((shard_len[i] - 1) / dim_y + 1); dim3 block(1, dim_y); - get_float_features_size<<remote_stream(i, gpu_id)>>>( - reinterpret_cast(d_fea_info[i]), - reinterpret_cast(d_fea_size[i]), - shard_len[i]); + get_float_features_size<<remote_stream(i, gpu_id)>>>( + reinterpret_cast(d_fea_info[i]), + reinterpret_cast(d_fea_size[i]), + shard_len[i]); CUDA_CHECK(cudaMemsetAsync(d_fea_size_prefix_sum[i], 0, @@ -3847,9 +3994,9 @@ int GpuPsGraphTable::get_float_feature_info_of_nodes( dim3 grid((shard_len[i] - 1) / dim_y + 1); dim3 block(1, dim_y); get_float_features_kernel<<remote_stream(i, gpu_id)>>>( + block, + 0, + resource_->remote_stream(i, gpu_id)>>>( graph, reinterpret_cast(d_fea_info[i]), reinterpret_cast(d_fea_size_prefix_sum[i]), @@ -3874,8 +4021,7 @@ int GpuPsGraphTable::get_float_feature_info_of_nodes( memory::Alloc(place, all_fea_num * sizeof(float), phi::Stream(reinterpret_cast(stream))); - float* d_feature_list_ptr = - reinterpret_cast(feature_list_tmp->ptr()); + float* d_feature_list_ptr = reinterpret_cast(feature_list_tmp->ptr()); auto slot_list_tmp = memory::Alloc(place, all_fea_num * sizeof(uint8_t), @@ -3926,14 +4072,13 @@ int GpuPsGraphTable::get_float_feature_info_of_nodes( all_fea_num * sizeof(float), phi::Stream(reinterpret_cast(stream))); - float* d_res_feature_list_ptr = - reinterpret_cast(feature_list->ptr()); + float* d_res_feature_list_ptr = reinterpret_cast(feature_list->ptr()); slot_list = memory::Alloc(place, all_fea_num * sizeof(uint8_t), phi::Stream(reinterpret_cast(stream))); - + uint8_t* d_res_slot_list_ptr = reinterpret_cast(slot_list->ptr()); int grid_size = (node_num - 1) / block_size_ + 1; @@ -3984,7 +4129,6 @@ int GpuPsGraphTable::get_float_feature_info_of_nodes( return all_fea_num; } - int GpuPsGraphTable::get_feature_of_nodes(int gpu_id, uint64_t* d_nodes, uint64_t* d_feature, @@ -4107,18 +4251,14 @@ int GpuPsGraphTable::get_feature_of_nodes(int gpu_id, actual_size_array + shard_len + shard_len % 2); dim3 grid((shard_len - 1) / dim_y + 1); dim3 block(1, dim_y); - get_features_kernel<<>>( - graph, - val_array, - actual_size_array, - feature_array, - d_slot_feature_num_map, - slot_num, - shard_len, - fea_num_per_node); + get_features_kernel<<>>(graph, + val_array, + actual_size_array, + feature_array, + d_slot_feature_num_map, + slot_num, + shard_len, + fea_num_per_node); } for (int i = 0; i < total_gpu; ++i) { diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu index 69757684677aee..17962be5f93ec6 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu @@ -48,49 +48,57 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, std::vector first_node_type_vec; if (first_node_type_str[0] == '[') { - assert(first_node_type_str[first_node_type_str.size() -1] == ']'); - std::string tmp_first_node_type_str(first_node_type_str, 1, first_node_type_str.size() - 2); - auto tmp_first_node_types = paddle::string::split_string(tmp_first_node_type_str, ","); - first_node_type_vec.assign(tmp_first_node_types.begin(), tmp_first_node_types.end()); + assert(first_node_type_str[first_node_type_str.size() - 1] == ']'); + std::string tmp_first_node_type_str( + first_node_type_str, 1, first_node_type_str.size() - 2); + auto tmp_first_node_types = paddle::string::split_string( + tmp_first_node_type_str, ","); + first_node_type_vec.assign(tmp_first_node_types.begin(), + tmp_first_node_types.end()); } else { first_node_type_vec.push_back(first_node_type_str); } tensor_pair_num_ = first_node_type_vec.size(); first_node_type_.resize(tensor_pair_num_); all_node_type_.resize(tensor_pair_num_); - for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; ++tensor_pair_idx) { + for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + ++tensor_pair_idx) { auto &first_node_type = first_node_type_vec[tensor_pair_idx]; auto node_types = paddle::string::split_string(first_node_type, ";"); VLOG(2) << "node_types: " << first_node_type; for (auto &type : node_types) { auto iter = node_to_id.find(type); - PADDLE_ENFORCE_NE( - iter, - node_to_id.end(), - platform::errors::NotFound("(%s) is not found in node_to_id.", type)); + PADDLE_ENFORCE_NE(iter, + node_to_id.end(), + platform::errors::NotFound( + "(%s) is not found in node_to_id.", type)); VLOG(2) << "node_to_id[" << type << "] = " << iter->second; first_node_type_[tensor_pair_idx].push_back(iter->second); all_node_type_[tensor_pair_idx].push_back(iter->second); - } // end for (auto &type : node_types) - } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + } // end for (auto &type : node_types) + } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; std::vector meta_path_vec; if (meta_path_str[0] == '[') { - assert(meta_path_str[meta_path_str.size() -1] == ']'); + assert(meta_path_str[meta_path_str.size() - 1] == ']'); std::string tmp_meta_path(meta_path_str, 1, meta_path_str.size() - 2); - auto tmp_meta_paths = paddle::string::split_string(tmp_meta_path, ","); + auto tmp_meta_paths = + paddle::string::split_string(tmp_meta_path, ","); meta_path_vec.assign(tmp_meta_paths.begin(), tmp_meta_paths.end()); } else { meta_path_vec.push_back(meta_path_str); } assert(tensor_pair_num_ == meta_path_vec.size()); meta_path_.resize(tensor_pair_num_); - for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; ++tensor_pair_idx) { + for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + ++tensor_pair_idx) { auto &meta_path = meta_path_vec[tensor_pair_idx]; - meta_path_[tensor_pair_idx].resize(first_node_type_[tensor_pair_idx].size()); - auto meta_paths = paddle::string::split_string(meta_path, ";"); - + meta_path_[tensor_pair_idx].resize( + first_node_type_[tensor_pair_idx].size()); + auto meta_paths = + paddle::string::split_string(meta_path, ";"); + for (size_t i = 0; i < meta_paths.size(); i++) { auto path = meta_paths[i]; auto edges = paddle::string::split_string(path, "-"); @@ -110,9 +118,9 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, all_node_type_[tensor_pair_idx].push_back(src_node_id); all_node_type_[tensor_pair_idx].push_back(dst_node_id); } - } // end for (auto &edge : edges) { - } // end for (size_t i = 0; i < meta_paths.size(); i++) { - } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + } // end for (auto &edge : edges) { + } // end for (size_t i = 0; i < meta_paths.size(); i++) { + } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; auto paths = paddle::string::split_string(excluded_train_pair, ";"); @@ -133,25 +141,32 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, if (pair_label != "") { pair_label_conf_.assign(id_to_feature.size() * id_to_feature.size(), 0); auto items = paddle::string::split_string(pair_label, ","); - VLOG(2) << "pair_label[" << pair_label << "] id_to_feature.size() = " << id_to_feature.size(); + VLOG(2) << "pair_label[" << pair_label + << "] id_to_feature.size() = " << id_to_feature.size(); for (auto &item : items) { auto sub_items = paddle::string::split_string(item, ":"); int label = std::stoi(sub_items[1]); auto nodes = get_ntype_from_etype(sub_items[0]); auto &edge_src = nodes[0]; auto src_iter = node_to_id.find(edge_src); - PADDLE_ENFORCE_NE(src_iter, edge_to_id.end(), - platform::errors::NotFound( - "(%s) is not found in edge_to_id.", edge_src)); + PADDLE_ENFORCE_NE(src_iter, + edge_to_id.end(), + platform::errors::NotFound( + "(%s) is not found in edge_to_id.", edge_src)); auto &edge_dst = nodes[1]; auto dst_iter = node_to_id.find(edge_dst); - PADDLE_ENFORCE_NE(dst_iter, edge_to_id.end(), - platform::errors::NotFound( - "(%s) is not found in edge_to_id.", edge_dst)); - VLOG(2) << "pair_label_conf[" << src_iter->second << "][" << dst_iter->second << "] = " << label; - pair_label_conf_[src_iter->second * id_to_feature.size() + dst_iter->second] = label; - if (pair_label_conf_[dst_iter->second * id_to_feature.size() + src_iter->second] == 0) { - pair_label_conf_[dst_iter->second * id_to_feature.size() + src_iter->second] = label; + PADDLE_ENFORCE_NE(dst_iter, + edge_to_id.end(), + platform::errors::NotFound( + "(%s) is not found in edge_to_id.", edge_dst)); + VLOG(2) << "pair_label_conf[" << src_iter->second << "][" + << dst_iter->second << "] = " << label; + pair_label_conf_[src_iter->second * id_to_feature.size() + + dst_iter->second] = label; + if (pair_label_conf_[dst_iter->second * id_to_feature.size() + + src_iter->second] == 0) { + pair_label_conf_[dst_iter->second * id_to_feature.size() + + src_iter->second] = label; } } for (int i = 0; i < id_to_feature.size(); ++i) { @@ -174,7 +189,8 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, node_type_start_.resize(tensor_pair_num_); global_infer_node_type_start_.resize(max_dev_id + 1); infer_cursor_.resize(tensor_pair_num_); - for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; ++tensor_pair_idx) { + for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + ++tensor_pair_idx) { finish_node_type_[tensor_pair_idx].resize(max_dev_id + 1); node_type_start_[tensor_pair_idx].resize(max_dev_id + 1); auto &first_node_type = first_node_type_vec[tensor_pair_idx]; @@ -186,7 +202,7 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, auto &infer_node_type_start = global_infer_node_type_start_[i]; auto &finish_node_type = finish_node_type_[tensor_pair_idx][i]; finish_node_type.clear(); - + for (size_t idx = 0; idx < node_to_id.size(); idx++) { infer_node_type_start[idx] = 0; } @@ -198,13 +214,13 @@ void GraphGpuWrapper::init_conf(const std::string &first_node_type_str, infer_cursor_[tensor_pair_idx].push_back(0); cursor_.push_back(0); } - } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; - } // end static std::mutex mutex; + } // end for (int tensor_pair_idx = 0; tensor_pair_idx < tensor_pair_num_; + } // end static std::mutex mutex; } void GraphGpuWrapper::init_type_keys( - std::vector>>& keys, - std::vector>& lens) { + std::vector>> &keys, + std::vector> &lens) { size_t thread_num = device_id_mapping.size(); auto &graph_all_type_total_keys = get_graph_type_keys(); @@ -230,8 +246,8 @@ void GraphGpuWrapper::init_type_keys( } keys[f_idx].resize(thread_num); auto &type_total_key = graph_all_type_total_keys[f_idx]; - VLOG(0) << "graph_all_type_total_keys[" << f_idx << "] = " - << graph_all_type_total_keys[f_idx].size(); + VLOG(0) << "graph_all_type_total_keys[" << f_idx + << "] = " << graph_all_type_total_keys[f_idx].size(); for (size_t j = 0; j < type_total_key.size(); j++) { uint64_t shard = type_total_key[j] % thread_num; tmp_keys[shard].push_back(type_total_key[j]); @@ -247,9 +263,10 @@ void GraphGpuWrapper::init_type_keys( int gpuid = device_id_mapping[j]; auto place = platform::CUDAPlace(gpuid); platform::CUDADeviceGuard guard(gpuid); - keys[f_idx][j] = - memory::AllocShared(place, tmp_keys[j].size() * sizeof(uint64_t), - phi::Stream(reinterpret_cast(stream))); + keys[f_idx][j] = memory::AllocShared( + place, + tmp_keys[j].size() * sizeof(uint64_t), + phi::Stream(reinterpret_cast(stream))); cudaMemcpyAsync(keys[f_idx][j]->ptr(), tmp_keys[j].data(), sizeof(uint64_t) * tmp_keys[j].size(), @@ -336,10 +353,10 @@ void GraphGpuWrapper::init_metapath_total_keys() { int gpuid = device_id_mapping[j]; auto place = platform::CUDAPlace(gpuid); platform::CUDADeviceGuard guard(gpuid); - d_node_iter_graph_metapath_keys_[j] = - memory::AllocShared(place, - tmp_keys[j].size() * sizeof(uint64_t), - phi::Stream(reinterpret_cast(stream))); + d_node_iter_graph_metapath_keys_[j] = memory::AllocShared( + place, + tmp_keys[j].size() * sizeof(uint64_t), + phi::Stream(reinterpret_cast(stream))); cudaMemcpyAsync(d_node_iter_graph_metapath_keys_[j]->ptr(), tmp_keys[j].data(), sizeof(uint64_t) * tmp_keys[j].size(), @@ -497,7 +514,8 @@ void GraphGpuWrapper::set_feature_separator(std::string ch) { } } -void GraphGpuWrapper::set_feature_info(int slot_num_for_pull_feature, int float_slot_num) { +void GraphGpuWrapper::set_feature_info(int slot_num_for_pull_feature, + int float_slot_num) { this->slot_num_for_pull_feature_ = slot_num_for_pull_feature; this->float_slot_num_ = float_slot_num; } @@ -592,19 +610,20 @@ int GraphGpuWrapper::load_node_file(std::string ntype2files, ntype2files, graph_data_local_path, part_num); } -int GraphGpuWrapper::set_node_iter_from_file( - std::string ntype2files, - std::string node_types_file_path, - int part_num, - bool training) { +int GraphGpuWrapper::set_node_iter_from_file(std::string ntype2files, + std::string node_types_file_path, + int part_num, + bool training) { // 1. load cpu node - ((GpuPsGraphTable *)graph_table)->cpu_graph_table_->parse_node_and_load( - ntype2files, node_types_file_path, part_num, false); + ((GpuPsGraphTable *)graph_table) + ->cpu_graph_table_->parse_node_and_load( + ntype2files, node_types_file_path, part_num, false); // 2. init node iter keys on cpu and release cpu node shards. if (type_keys_initialized_) { // release d_graph_all_type_total_keys_ and h_graph_all_type_keys_len_ - for (size_t f_idx = 0; f_idx < d_graph_all_type_total_keys_.size(); f_idx++) { + for (size_t f_idx = 0; f_idx < d_graph_all_type_total_keys_.size(); + f_idx++) { for (size_t j = 0; j < d_graph_all_type_total_keys_[f_idx].size(); j++) { d_graph_all_type_total_keys_[f_idx][j].reset(); } @@ -614,7 +633,8 @@ int GraphGpuWrapper::set_node_iter_from_file( type_keys_initialized_ = false; } - ((GpuPsGraphTable *)graph_table)->cpu_graph_table_->build_node_iter_type_keys(); + ((GpuPsGraphTable *)graph_table) + ->cpu_graph_table_->build_node_iter_type_keys(); ((GpuPsGraphTable *)graph_table)->cpu_graph_table_->clear_node_shard(); // 3. init train or infer type keys. @@ -635,8 +655,7 @@ int GraphGpuWrapper::set_node_iter_from_file( int GraphGpuWrapper::set_node_iter_from_graph(bool training) { // 1. init type keys if (!type_keys_initialized_) { - init_type_keys(d_graph_all_type_total_keys_, - h_graph_all_type_keys_len_); + init_type_keys(d_graph_all_type_total_keys_, h_graph_all_type_keys_len_); type_keys_initialized_ = true; } @@ -740,7 +759,7 @@ void GraphGpuWrapper::init_service() { } #else PADDLE_THROW( - platform::errors::Unavailable("heter ps need compile with GLOO")); + platform::errors::Unavailable("heter ps need compile with GLOO")); #endif #ifdef PADDLE_WITH_CUDA @@ -750,7 +769,7 @@ void GraphGpuWrapper::init_service() { inner_comms_.resize(dev_size); inter_ncclids_.resize(dev_size); platform::dynload::ncclCommInitAll( - &(inner_comms_[0]), dev_size, &device_id_mapping[0]); + &(inner_comms_[0]), dev_size, &device_id_mapping[0]); // init inter comm #ifdef PADDLE_WITH_GLOO inter_comms_.resize(dev_size); @@ -761,10 +780,10 @@ void GraphGpuWrapper::init_service() { } PADDLE_ENFORCE_EQ( - gloo->IsInitialized(), - true, - platform::errors::PreconditionNotMet( - "You must initialize the gloo environment first to use it.")); + gloo->IsInitialized(), + true, + platform::errors::PreconditionNotMet( + "You must initialize the gloo environment first to use it.")); gloo::BroadcastOptions opts(gloo->GetContext()); opts.setOutput(&inter_ncclids_[0], dev_size); opts.setRoot(0); @@ -774,20 +793,22 @@ void GraphGpuWrapper::init_service() { for (int i = 0; i < dev_size; ++i) { platform::CUDADeviceGuard guard(device_id_mapping[i]); platform::dynload::ncclCommInitRank( - &inter_comms_[i], gloo->Size(), inter_ncclids_[i], gloo->Rank()); + &inter_comms_[i], gloo->Size(), inter_ncclids_[i], gloo->Rank()); } PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclGroupEnd()); rank_id_ = gloo->Rank(); node_size_ = gloo->Size(); #else - PADDLE_THROW(platform::errors::Unavailable("heter ps need compile with GLOO")); + PADDLE_THROW( + platform::errors::Unavailable("heter ps need compile with GLOO")); #endif } #endif size_t gpu_num = device_id_mapping.size(); - GpuPsGraphTable *g = new GpuPsGraphTable(resource, id_to_edge.size(), slot_num_for_pull_feature_, float_slot_num_); + GpuPsGraphTable *g = new GpuPsGraphTable( + resource, id_to_edge.size(), slot_num_for_pull_feature_, float_slot_num_); g->init_cpu_table(table_proto, gpu_num); g->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_, rank_id_); g->cpu_graph_table_->set_feature_separator(feature_separator_); @@ -801,8 +822,7 @@ void GraphGpuWrapper::finalize() { reinterpret_cast(graph_table)->show_table_collisions(); } -void GraphGpuWrapper::show_mem(const char* msg) -{ +void GraphGpuWrapper::show_mem(const char *msg) { show_cpu_mem(msg); show_gpu_mem(msg); } @@ -871,12 +891,13 @@ void GraphGpuWrapper::upload_batch(int table_type, g->build_graph_fea_on_single_gpu(sub_graph, i); sub_graph.release_on_cpu(); VLOG(0) << "sub graph fea on gpu " << i << " is built"; - if (float_slot_num > 0) { + if (float_slot_num > 0) { // build float feature - VLOG(0) << "begin make_gpu_ps_graph_float_fea, node_ids[" << i << "]_size[" - << node_ids[i].size() << "]"; + VLOG(0) << "begin make_gpu_ps_graph_float_fea, node_ids[" << i + << "]_size[" << node_ids[i].size() << "]"; GpuPsCommGraphFloatFea float_sub_graph = - g->cpu_graph_table_->make_gpu_ps_graph_float_fea(i, node_ids[i], float_slot_num); + g->cpu_graph_table_->make_gpu_ps_graph_float_fea( + i, node_ids[i], float_slot_num); // sub_graph.display_on_cpu(); VLOG(0) << "begin build_graph_float_fea_on_single_gpu, node_ids[" << i << "]_size[" << node_ids[i].size() << "]"; @@ -921,7 +942,8 @@ std::vector GraphGpuWrapper::get_sub_graph_float_fea( tasks.push_back(upload_task_pool->enqueue([&, i, this]() -> int { GpuPsGraphTable *g = reinterpret_cast(graph_table); sub_graph_float_feas[i] = - g->cpu_graph_table_->make_gpu_ps_graph_float_fea(i, node_ids[i], float_slot_num); + g->cpu_graph_table_->make_gpu_ps_graph_float_fea( + i, node_ids[i], float_slot_num); return 0; })); } @@ -940,8 +962,8 @@ void GraphGpuWrapper::build_gpu_graph_fea(GpuPsCommGraphFea &sub_graph_fea, } // build_gpu_graph_float_fea -void GraphGpuWrapper::build_gpu_graph_float_fea(GpuPsCommGraphFloatFea &sub_graph_float_fea, - int i) { +void GraphGpuWrapper::build_gpu_graph_float_fea( + GpuPsCommGraphFloatFea &sub_graph_float_fea, int i) { GpuPsGraphTable *g = reinterpret_cast(graph_table); g->build_graph_float_fea_on_single_gpu(sub_graph_float_fea, i); sub_graph_float_fea.release_on_cpu(); @@ -950,11 +972,9 @@ void GraphGpuWrapper::build_gpu_graph_float_fea(GpuPsCommGraphFloatFea &sub_grap } NeighborSampleResult GraphGpuWrapper::graph_neighbor_sample_v3( - NeighborSampleQuery q, bool cpu_switch, bool compress, - bool weighted) { + NeighborSampleQuery q, bool cpu_switch, bool compress, bool weighted) { return reinterpret_cast(graph_table) - ->graph_neighbor_sample_v3(q, cpu_switch, compress, - weighted); + ->graph_neighbor_sample_v3(q, cpu_switch, compress, weighted); } NeighborSampleResultV2 GraphGpuWrapper::graph_neighbor_sample_sage( @@ -967,9 +987,14 @@ NeighborSampleResultV2 GraphGpuWrapper::graph_neighbor_sample_sage( bool weighted, bool return_weight) { return reinterpret_cast(graph_table) - ->graph_neighbor_sample_sage( - gpu_id, edge_type_len, key, sample_size, len, edge_type_graphs, - weighted, return_weight); + ->graph_neighbor_sample_sage(gpu_id, + edge_type_len, + key, + sample_size, + len, + edge_type_graphs, + weighted, + return_weight); } std::vector> @@ -1086,10 +1111,15 @@ std::vector GraphGpuWrapper::graph_neighbor_sample( key.size() * sizeof(uint64_t), cudaMemcpyHostToDevice); VLOG(0) << "key_size: " << key.size(); - auto neighbor_sample_res = - reinterpret_cast(graph_table) - ->graph_neighbor_sample_v2( - gpu_id, idx, cuda_key, sample_size, key.size(), false, true, false); + auto neighbor_sample_res = reinterpret_cast(graph_table) + ->graph_neighbor_sample_v2(gpu_id, + idx, + cuda_key, + sample_size, + key.size(), + false, + true, + false); int *actual_sample_size = new int[key.size()]; cudaMemcpy(actual_sample_size, neighbor_sample_res.actual_sample_size, diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h index 27dd7902e641a5..ee32041f01ace8 100644 --- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h @@ -58,7 +58,7 @@ class GraphGpuWrapper { void init_conf(const std::string& first_node_type, const std::string& meta_path, const std::string& excluded_train_pair, - const std::string &pair_label); + const std::string& pair_label); void initialize(); void finalize(); void set_device(std::vector ids); @@ -70,13 +70,18 @@ class GraphGpuWrapper { void upload_batch(int table_type, int slice_num, const std::string& edge_type); - void upload_batch(int table_type, int slice_num, int slot_num, int float_slot_num); + void upload_batch(int table_type, + int slice_num, + int slot_num, + int float_slot_num); std::vector get_sub_graph_fea( - std::vector>& node_ids, int slot_num); // NOLINT + std::vector>& node_ids, int slot_num); // NOLINT std::vector get_sub_graph_float_fea( - std::vector>& node_ids, int float_slot_num); // NOLINT + std::vector>& node_ids, + int float_slot_num); // NOLINT void build_gpu_graph_fea(GpuPsCommGraphFea& sub_graph_fea, int i); // NOLINT - void build_gpu_graph_float_fea(GpuPsCommGraphFloatFea& sub_graph_float_fea, int i); // NOLINT + void build_gpu_graph_float_fea(GpuPsCommGraphFloatFea& sub_graph_float_fea, + int i); // NOLINT void add_table_feat_conf(std::string table_name, std::string feat_name, std::string feat_dtype, @@ -187,15 +192,15 @@ class GraphGpuWrapper { std::shared_ptr& size_list_prefix_sum, std::shared_ptr& feature_list, // NOLINT std::shared_ptr& slot_list, - bool sage_mode = false); // NOLINT + bool sage_mode = false); // NOLINT int get_float_feature_info_of_nodes( int gpu_id, - uint64_t *d_nodes, + uint64_t* d_nodes, int node_num, - uint32_t *size_list, - uint32_t *size_list_prefix_sum, - std::shared_ptr &feature_list, // NOLINT - std::shared_ptr &slot_list); // NOLINT + uint32_t* size_list, + uint32_t* size_list_prefix_sum, + std::shared_ptr& feature_list, // NOLINT + std::shared_ptr& slot_list); // NOLINT void init_metapath(std::string cur_metapath, int cur_metapath_index, int cur_metapath_len); @@ -251,7 +256,8 @@ class GraphGpuWrapper { std::vector>> d_node_iter_graph_all_type_keys_; std::vector> h_node_iter_graph_all_type_keys_len_; - std::vector> d_node_iter_graph_metapath_keys_; + std::vector> + d_node_iter_graph_metapath_keys_; std::vector h_node_iter_graph_metapath_keys_len_; std::map - void MemcpyPeerAsync(void *dst, - const void *src, + template + void MemcpyPeerAsync(void* dst, + const void* src, size_t count, StreamType stream); @@ -576,36 +576,35 @@ class HeterComm { char* d_rev_buff, const cudaStream_t& stream); size_t gather_inter_keys_by_all2all(const int& gpu_id, - const size_t& fea_size, - const KeyType* d_in_keys, - const cudaStream_t& stream); - void scatter_inter_vals_by_all2all(const int& gpu_id, const size_t& fea_size, - const char* d_in_vals, - void* d_out_vals, - const size_t& value_bytes, - void* d_tmp_vals, + const KeyType* d_in_keys, const cudaStream_t& stream); + void scatter_inter_vals_by_all2all(const int& gpu_id, + const size_t& fea_size, + const char* d_in_vals, + void* d_out_vals, + const size_t& value_bytes, + void* d_tmp_vals, + const cudaStream_t& stream); void recalc_local_and_remote_size(const int& gpu_id, const size_t& pull_size, const size_t& node_num, const uint32_t* d_tmp_size_list, const uint32_t* d_inter_size_list, - const cudaStream_t &stream); - - template - void scatter_inter_vals_by_all2all_common( - const int& gpu_id, - const size_t& len, - const size_t& value_bytes, - const T* d_in_vals, - T* d_out_vals, - T* d_tmp_vals, - const cudaStream_t& stream, - bool sage = false, - bool slot = false) { - auto &cache = storage_[gpu_id]; - auto &res = cache.shard_res; + const cudaStream_t& stream); + + template + void scatter_inter_vals_by_all2all_common(const int& gpu_id, + const size_t& len, + const size_t& value_bytes, + const T* d_in_vals, + T* d_out_vals, + T* d_tmp_vals, + const cudaStream_t& stream, + bool sage = false, + bool slot = false) { + auto& cache = storage_[gpu_id]; + auto& res = cache.shard_res; auto h_local_part_sizes = res.h_local_part_sizes.data(); auto h_local_part_offsets = res.h_local_part_offsets.data(); @@ -615,28 +614,30 @@ class HeterComm { size_t total_fea_num = 0; if (rdma_checker_->need_rdma_trans() && !sage) { - // Sage mode can not run this branch currently, otherwise the process will hang here. + // Sage mode can not run this branch currently, otherwise the process will + // hang here. total_fea_num = send_vals_by_all2all_trans(gpu_id, - rank_id_, - node_size_, - reinterpret_cast(d_in_vals), - reinterpret_cast(d_tmp_vals), - value_bytes, - stream); + rank_id_, + node_size_, + reinterpret_cast(d_in_vals), + reinterpret_cast(d_tmp_vals), + value_bytes, + stream); } else { // sage is true, set default to run here. - total_fea_num = send_data_by_all2all(gpu_id, - node_size_, - rank_id_, - value_bytes, - h_remote_part_sizes, - h_remote_part_offsets, - h_local_part_sizes, - h_local_part_offsets, - reinterpret_cast(d_in_vals), - reinterpret_cast(d_tmp_vals), - stream); + total_fea_num = + send_data_by_all2all(gpu_id, + node_size_, + rank_id_, + value_bytes, + h_remote_part_sizes, + h_remote_part_offsets, + h_local_part_sizes, + h_local_part_offsets, + reinterpret_cast(d_in_vals), + reinterpret_cast(d_tmp_vals), + stream); } PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamSynchronize(stream)); @@ -645,15 +646,15 @@ class HeterComm { // slot feature don't need scatter if (!slot) { heter_comm_kernel_->scatter_vals( - reinterpret_cast(d_tmp_vals), // in - reinterpret_cast(d_out_vals), // out - res.d_local_idx_parted, - len, - value_bytes, - stream); + reinterpret_cast(d_tmp_vals), // in + reinterpret_cast(d_out_vals), // out + res.d_local_idx_parted, + len, + value_bytes, + stream); CUDA_CHECK(cudaStreamSynchronize(stream)); } -} + } void scatter_inner_vals_p2p(const size_t& total_fea_num, void* d_out_vals,