Skip to content

Commit

Permalink
fix multi_pairs (PaddlePaddle#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
huwei02 authored and danleifeng committed Sep 12, 2023
1 parent b9dfe5f commit e04ad0b
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 51 deletions.
106 changes: 71 additions & 35 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <chrono>
#include <set>
#include <sstream>
#include <tuple>

#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
Expand Down Expand Up @@ -1354,7 +1355,7 @@ void GraphTable::dbh_graph_edge_partition() {
dest_edge_ids.push_back(k);
}
}
int src_fea_idx = feature_to_id[node_type[0]];
int src_fea_idx = node_type_str_to_node_types_idx[node_type[0]];
std::vector<std::future<int>> shard_tasks;
for (size_t part_id = 0; part_id < shard_num; ++part_id) {
shard_tasks.push_back(
Expand Down Expand Up @@ -1462,13 +1463,13 @@ void GraphTable::dbh_graph_edge_partition() {
void GraphTable::dbh_graph_feature_partition() {
VLOG(0) << "start to process dbh feature shard";
std::vector<std::future<int>> tasks;
for (auto &it : this->feature_to_id) {
auto node_idx = it.second;
for (auto &it : this->node_type_str_to_node_types_idx) {
auto node_types_idx = it.second;
for (size_t i = 0; i < shard_num_per_server; i++) {
tasks.push_back(
load_node_edge_task_pool->enqueue([&, node_idx, i, this]() -> int {
load_node_edge_task_pool->enqueue([&, node_types_idx, i, this]() -> int {
std::vector<uint64_t> remove_keys;
auto &shards = feature_shards[node_idx][i]->get_bucket();
auto &shards = feature_shards[node_types_idx][i]->get_bucket();
for (auto node : shards) {
uint64_t id = node->get_id();
// Keep keys that appear in the edge table and their hash-mapped keys; delete the rest
Expand All @@ -1479,7 +1480,7 @@ void GraphTable::dbh_graph_feature_partition() {
}
}
for (auto &key : remove_keys) {
feature_shards[node_idx][i]->delete_node(key);
feature_shards[node_types_idx][i]->delete_node(key);
}
return 0;
}));
Expand Down Expand Up @@ -2028,11 +2029,11 @@ void GraphTable::fix_feature_node_shards(bool load_slot) {
auto &shards = (load_slot) ? feature_shards: node_shards;
VLOG(0) << "begin fix " << ((load_slot) ? "feature ": "")
<< "node type count=" << shards.size() << ", edge count=" << edge_shards.size();
std::vector<std::future<std::pair<size_t, size_t>>> tasks;
std::vector<std::future<std::tuple<size_t, size_t, size_t>>> tasks;
for (size_t idx = 0; idx < shards.size(); ++idx) {
for (size_t j = 0; j < shards[idx].size(); ++j) {
tasks.push_back(load_node_edge_task_pool->enqueue(
[this, idx, j, load_slot]() -> std::pair<size_t, size_t> {
[this, idx, j, load_slot]() -> std::tuple<size_t, size_t, size_t> {
size_t cnt = 0;
size_t edge_node_cnt = 0;
auto &features = (load_slot) ? feature_shards[idx][j] : node_shards[idx][j];
Expand All @@ -2056,20 +2057,32 @@ void GraphTable::fix_feature_node_shards(bool load_slot) {
size_t total = features->get_size();
VLOG(5) << "fix total edge node count=" << edge_node_cnt
<< ", total feature node count=" << total
<< ", node idx=" << idx << ", shard id=" << j << ", count=" << cnt;
return {total, cnt};
<< ", node_types_idx=" << idx << ", shard id=" << j << ", add_count=" << cnt;
return std::make_tuple(total, cnt, idx);
}));
}
}
size_t total = 0;
size_t add_cnt = 0;
std::vector<size_t> add_cnt_vec;
std::vector<size_t> total_vec;
add_cnt_vec.assign(shards.size(), 0);
total_vec.assign(shards.size(), 0);
for (auto &t : tasks) {
auto pair = t.get();
total += pair.first;
add_cnt += pair.second;
total += std::get<0>(pair);
add_cnt += std::get<1>(pair);
int node_types_idx = std::get<2>(pair);
total_vec[node_types_idx] += std::get<0>(pair);
add_cnt_vec[node_types_idx] += std::get<1>(pair);
}
VLOG(0) << "fix node count=" << total
<< ", add count=" << add_cnt << ", with slot=" << load_slot;
for (size_t i = 0; i < shards.size(); ++i) {
VLOG(1) << "node_type[" << node_types_[i] << "] node_type_idx[" << i
<< "] orig[" << total_vec[i] - add_cnt_vec[i] << "] add_cnt["
<< add_cnt_vec[i] << "] total[" << total_vec[i] << "]";
}
}
int32_t GraphTable::load_node_and_edge_file(
std::string etype2files,
Expand Down Expand Up @@ -2357,8 +2370,8 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file_parallel(
continue;
}
std::string parse_node_type = vals[0].to_string();
auto it = feature_to_id.find(parse_node_type);
if (it == feature_to_id.end()) {
auto it = node_type_str_to_node_types_idx.find(parse_node_type);
if (it == node_type_str_to_node_types_idx.end()) {
VLOG(1) << parse_node_type << "type error, please check, line["
<< line << "] file[" << path << "]";
continue;
Expand Down Expand Up @@ -2449,12 +2462,13 @@ int32_t GraphTable::load_nodes(const std::string &path,
VLOG(0) << "node_type not specified, loading edges to "
<< id_to_feature[0] << " part";
} else {
if (feature_to_id.find(node_type) == feature_to_id.end()) {
if (node_type_str_to_node_types_idx.find(node_type)
== node_type_str_to_node_types_idx.end()) {
VLOG(0) << "node_type " << node_type
<< " is not defined, nothing will be loaded";
return 0;
}
idx = feature_to_id[node_type];
idx = node_type_str_to_node_types_idx[node_type];
}
for (auto path : paths) {
VLOG(2) << "Begin GraphTable::load_nodes(), path[" << path << "]";
Expand All @@ -2469,7 +2483,7 @@ int32_t GraphTable::load_nodes(const std::string &path,
return -1;
}

VLOG(0) << valid_count << "/" << count << " nodes in node_type[ " << node_type
VLOG(0) << valid_count << "/" << count << " nodes in node_type[" << node_type
<< "] are loaded successfully!";
return 0;
}
Expand Down Expand Up @@ -3338,6 +3352,15 @@ int32_t GraphTable::Initialize(const TableParameter &config,
return Initialize(graph);
}

// Returns the node-type name stored at position |node_types_idx| in
// node_types_ (the order node types were listed in the graph config;
// node_types_ is filled in GraphTable::Initialize). No bounds checking
// is performed — callers must pass a valid index.
std::string GraphTable::node_types_idx_to_node_type_str(int node_types_idx) {
  return node_types_[node_types_idx];
}

// Resolves a graph_type_keys_ index (the cnt assigned in
// build_graph_type_keys) to its node-type name: first map the index to a
// node-types idx via index_to_type_, then delegate the string lookup.
// NOTE(review): operator[] default-inserts 0 for an unknown index —
// presumably callers only pass indices registered during key building.
std::string GraphTable::index_to_node_type_str(int index) {
  return node_types_idx_to_node_type_str(index_to_type_[index]);
}

void GraphTable::load_node_weight(int type_id, int idx, std::string path) {
auto paths = ::paddle::string::split_string<std::string>(path, ";");
int64_t count = 0;
Expand Down Expand Up @@ -3396,6 +3419,7 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {

auto graph_feature = graph.graph_feature();
auto node_types = graph.node_types();
node_types_.assign(node_types.begin(), node_types.end());
auto edge_types = graph.edge_types();

#if defined(PADDLE_WITH_HETERPS) && defined(PADDLE_WITH_GLOO)
Expand Down Expand Up @@ -3424,7 +3448,7 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
feat_dtype.resize(node_types.size());
VLOG(0) << "got " << node_types.size() << " node types in total";
for (int k = 0; k < node_types.size(); k++) {
feature_to_id[node_types[k]] = k;
node_type_str_to_node_types_idx[node_types[k]] = k;
auto node_type = node_types[k];
auto feature = graph_feature[k];
id_to_feature.push_back(node_type);
Expand Down Expand Up @@ -3458,13 +3482,13 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
<< " shape:" << f_shape << " dtype:" << f_dtype;
}
}
nodeid_to_edgeids_.resize(feature_to_id.size());
nodeid_to_edgeids_.resize(node_type_str_to_node_types_idx.size());
for (auto &obj : edge_to_id) {
size_t pos = obj.first.find("2");
CHECK(pos != std::string::npos);
std::string nodetype = obj.first.substr(0, pos);
auto it = feature_to_id.find(nodetype);
CHECK(it != feature_to_id.end());
auto it = node_type_str_to_node_types_idx.find(nodetype);
CHECK(it != node_type_str_to_node_types_idx.end());
nodeid_to_edgeids_[it->second].push_back(obj.second);
VLOG(0) << "add edge [" << obj.first
<< "=" << obj.second << "] to ["
Expand Down Expand Up @@ -3603,27 +3627,36 @@ void GraphTable::calc_edge_type_limit() {
}

void GraphTable::build_graph_type_keys() {
VLOG(0) << "begin build_graph_type_keys";
VLOG(0) << "begin build_graph_type_keys, feature size="
<< this->node_type_str_to_node_types_idx.size();
graph_type_keys_.clear();
graph_type_keys_.resize(this->feature_to_id.size());
graph_type_keys_.resize(this->node_type_str_to_node_types_idx.size());

int cnt = 0;
for (auto &it : this->feature_to_id) {
auto node_idx = it.second;
uint64_t total_key = 0;
for (auto &it : this->node_type_str_to_node_types_idx) {
auto node_types_idx = it.second;
std::vector<std::vector<uint64_t>> keys;
this->get_all_id(GraphTableType::FEATURE_TABLE, node_idx, 1, &keys);
type_to_index_[node_idx] = cnt;
this->get_all_id(GraphTableType::FEATURE_TABLE, node_types_idx, 1, &keys);
type_to_index_[node_types_idx] = cnt;
index_to_type_[cnt] = node_types_idx;
total_key += keys[0].size();
VLOG(1) << "node_type[" << node_types_[node_types_idx] << "] node_types_idx["
<< node_types_idx << "] index[" << type_to_index_[node_types_idx]
<< "] graph_type_keys_[" << cnt << "]_size=" << keys[0].size()
<< " total_key[" << total_key << "]";
graph_type_keys_[cnt++] = std::move(keys[0]);
}
VLOG(0) << "finish build_graph_type_keys";

VLOG(0) << "begin insert feature into graph_total_keys";
VLOG(0) << "begin insert feature into graph_total_keys, feature size="
<< this->node_type_str_to_node_types_idx.size();
// build feature embedding id
for (auto &it : this->feature_to_id) {
auto node_idx = it.second;
for (auto &it : this->node_type_str_to_node_types_idx) {
auto node_types_idx = it.second;
std::vector<std::vector<uint64_t>> keys;
this->get_all_feature_ids(
GraphTableType::FEATURE_TABLE, node_idx, 1, &keys);
GraphTableType::FEATURE_TABLE, node_types_idx, 1, &keys);
graph_total_keys_.insert(
graph_total_keys_.end(), keys[0].begin(), keys[0].end());
}
Expand All @@ -3635,14 +3668,17 @@ void GraphTable::build_graph_type_keys() {
void GraphTable::build_node_iter_type_keys() {
VLOG(0) << "enter build_node_iter_type_keys";
graph_type_keys_.clear();
graph_type_keys_.resize(this->feature_to_id.size());
graph_type_keys_.resize(this->node_type_str_to_node_types_idx.size());

int cnt = 0;
for (auto &it : this->feature_to_id) {
auto node_idx = it.second;
for (auto &it : this->node_type_str_to_node_types_idx) {
auto node_types_idx = it.second;
std::vector<std::vector<uint64_t>> keys;
this->get_all_id(GraphTableType::NODE_TABLE, node_idx, 1, &keys);
this->get_all_id(GraphTableType::NODE_TABLE, node_types_idx, 1, &keys);
graph_type_keys_[cnt++] = std::move(keys[0]);
VLOG(1) << "node_type[" << node_types_[node_types_idx] << "] node_types_idx["
<< node_types_idx << "] index[" << type_to_index_[node_types_idx]
<< "] graph_type_keys_num[" << keys[0].size() << "]";
}
VLOG(0) << "finish build_node_iter_type_keys";
}
Expand Down
6 changes: 5 additions & 1 deletion paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,10 +778,14 @@ class GraphTable : public Table {
void fennel_graph_feature_partition();
void fix_feature_node_shards(bool load_slot);
void stat_graph_edge_info(int type);
std::string node_types_idx_to_node_type_str(int node_types_idx);
std::string index_to_node_type_str(int index);

std::vector<uint64_t> graph_total_keys_;
std::vector<std::vector<uint64_t>> graph_type_keys_;
std::unordered_map<int, int> type_to_index_;
std::unordered_map<int, int> index_to_type_;
std::vector<std::string> node_types_;
robin_hood::unordered_set<uint64_t> unique_all_edge_keys_;
// node 2 rank
GraphNodeRank egde_node_rank_;
Expand All @@ -807,7 +811,7 @@ class GraphTable : public Table {
// int float_fea_num_{-1};
std::vector<std::unordered_map<std::string, int32_t>> feat_id_map;
std::vector<std::unordered_map<std::string, int32_t>> float_feat_id_map;
std::unordered_map<std::string, int> feature_to_id, edge_to_id;
std::unordered_map<std::string, int> node_type_str_to_node_types_idx, edge_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::string table_name;
std::string table_type;
Expand Down
8 changes: 6 additions & 2 deletions paddle/fluid/framework/data_feed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -4260,7 +4260,7 @@ bool FillInferBuf(
cudaStream_t stream) {
auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
auto &global_infer_node_type_start =
gpu_graph_ptr->global_infer_node_type_start_[conf.gpuid];
gpu_graph_ptr->global_infer_node_type_start_[tensor_pair_idx][conf.gpuid];
auto &infer_cursor =
gpu_graph_ptr->infer_cursor_[tensor_pair_idx][conf.thread_id];
*total_row_ptr = 0;
Expand All @@ -4269,6 +4269,8 @@ bool FillInferBuf(
std::set<int> all_node_type_index_set;
for (auto &node_type : all_node_type) {
all_node_type_index_set.insert(type_to_index[node_type]);
VLOG(1) << "add all_node_type_index_set: "
<< gpu_graph_ptr->node_types_idx_to_node_type_str(node_type);
}

if (infer_cursor < h_device_keys_len.size()) {
Expand Down Expand Up @@ -4371,7 +4373,7 @@ bool GraphDataGenerator::DoWalkForInfer() {
infer_flag &= FillInferBuf(h_device_keys_len_[tensor_pair_idx],
d_device_keys_[tensor_pair_idx],
conf_,
gpu_graph_ptr->all_node_type_[tensor_pair_idx],
gpu_graph_ptr->all_node_type_,
tensor_pair_idx,
&total_row_[tensor_pair_idx],
&infer_node_start_[tensor_pair_idx],
Expand All @@ -4383,6 +4385,8 @@ bool GraphDataGenerator::DoWalkForInfer() {
d_uniq_node_num_,
place_,
sample_stream_);
VLOG(1) << "aft FillInferBuf, total_row[" << tensor_pair_idx << "] = "
<< total_row_[tensor_pair_idx];
cudaStreamSynchronize(sample_stream_);
}

Expand Down
Loading

0 comments on commit e04ad0b

Please sign in to comment.