Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Table Name; Feature Info #9

Merged
merged 6 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 97 additions & 3 deletions paddle/fluid/distributed/service/graph_py_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,25 @@ std::vector<std::string> GraphPyService::split(std::string& str,
return res;
}


// Registers one feature (name / dtype / shape) for the graph table named
// `table_name`. The four table_feat_conf_* vectors are parallel arrays:
// entry i of each describes the same feature.
// NOTE: the request is silently dropped when `table_name` was never added
// to table_id_map — callers get no error for a typo'd table name.
void GraphPyService::add_table_feat_conf(std::string table_name,
                                         std::string feat_name,
                                         std::string feat_dtype,
                                         int32_t feat_shape) {
  if (this->table_id_map.count(table_name)) {
    // Parameters arrive by value, so move them into the config vectors
    // instead of copying the strings a second time.
    this->table_feat_conf_table_name.push_back(std::move(table_name));
    this->table_feat_conf_feat_name.push_back(std::move(feat_name));
    this->table_feat_conf_feat_dtype.push_back(std::move(feat_dtype));
    this->table_feat_conf_feat_shape.push_back(feat_shape);
  }
}


void GraphPyService::set_up(std::string ips_str, int shard_num,
std::vector<std::string> node_types,
std::vector<std::string> edge_types) {
set_shard_num(shard_num);
set_num_node_types(node_types.size());
// set_client_Id(client_id);
// set_rank(rank);

Expand Down Expand Up @@ -121,7 +136,31 @@ ::paddle::distributed::PSParameter GraphPyServer::GetServerProto() {
VLOG(0) << " make a new table " << tuple.second;
::paddle::distributed::TableParameter* sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(sparse_table_proto, tuple.second);
std::vector<std::string > feat_name;
std::vector<std::string > feat_dtype;
std::vector<int32_t> feat_shape;
for(size_t i=0; i<this->table_feat_conf_table_name.size(); i++) {
if(tuple.first == table_feat_conf_table_name[i]) {
feat_name.push_back(table_feat_conf_feat_name[i]);
feat_dtype.push_back(table_feat_conf_feat_dtype[i]);
feat_shape.push_back(table_feat_conf_feat_shape[i]);
}
}
std::string table_type;
if(tuple.second < this->num_node_types) {
table_type = "node";
}
else {
table_type = "edge";
}

GetDownpourSparseTableProto(sparse_table_proto,
tuple.second,
tuple.first,
table_type,
feat_name,
feat_dtype,
feat_shape);
}

return server_fleet_desc;
Expand All @@ -137,11 +176,38 @@ ::paddle::distributed::PSParameter GraphPyClient::GetWorkerProto() {
worker_proto->mutable_downpour_worker_param();

for (auto& tuple : this->table_id_map) {
VLOG(0) << " make a new table " << tuple.second;
::paddle::distributed::TableParameter* worker_sparse_table_proto =
downpour_worker_proto->add_downpour_table_param();
GetDownpourSparseTableProto(worker_sparse_table_proto, tuple.second);
std::vector<std::string > feat_name;
std::vector<std::string > feat_dtype;
std::vector<int32_t> feat_shape;
for(size_t i=0; i<this->table_feat_conf_table_name.size(); i++) {
if(tuple.first == table_feat_conf_table_name[i]) {
feat_name.push_back(table_feat_conf_feat_name[i]);
feat_dtype.push_back(table_feat_conf_feat_dtype[i]);
feat_shape.push_back(table_feat_conf_feat_shape[i]);
}
}
std::string table_type;
if(tuple.second < this->num_node_types) {
table_type = "node";
}
else {
table_type = "edge";
}

GetDownpourSparseTableProto(worker_sparse_table_proto,
tuple.second,
tuple.first,
table_type,
feat_name,
feat_dtype,
feat_shape);
}



::paddle::distributed::ServerParameter* server_proto =
worker_fleet_desc.mutable_server_param();
::paddle::distributed::DownpourServerParameter* downpour_server_proto =
Expand All @@ -155,11 +221,38 @@ ::paddle::distributed::PSParameter GraphPyClient::GetWorkerProto() {
server_service_proto->set_server_thread_num(12);

for (auto& tuple : this->table_id_map) {
VLOG(0) << " make a new table " << tuple.second;
::paddle::distributed::TableParameter* sparse_table_proto =
downpour_server_proto->add_downpour_table_param();
GetDownpourSparseTableProto(sparse_table_proto, tuple.second);
std::vector<std::string > feat_name;
std::vector<std::string > feat_dtype;
std::vector<int32_t> feat_shape;
for(size_t i=0; i<this->table_feat_conf_table_name.size(); i++) {
if(tuple.first == table_feat_conf_table_name[i]) {
feat_name.push_back(table_feat_conf_feat_name[i]);
feat_dtype.push_back(table_feat_conf_feat_dtype[i]);
feat_shape.push_back(table_feat_conf_feat_shape[i]);
}
}
std::string table_type;
if(tuple.second < this->num_node_types) {
table_type = "node";
}
else {
table_type = "edge";
}

GetDownpourSparseTableProto(sparse_table_proto,
tuple.second,
tuple.first,
table_type,
feat_name,
feat_dtype,
feat_shape);
}



return worker_fleet_desc;
}
void GraphPyClient::load_edge_file(std::string name, std::string filepath,
Expand Down Expand Up @@ -232,3 +325,4 @@ std::vector<GraphNode> GraphPyClient::pull_graph_list(std::string name,
}
}
}

26 changes: 25 additions & 1 deletion paddle/fluid/distributed/service/graph_py_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ class GraphPyService {
protected:
std::vector<std::string> server_list, port_list, host_sign_list;
int server_size, shard_num;
// Count of node-type tables; a table id below this value is a "node"
// table, the rest are "edge" tables (see GetServerProto/GetWorkerProto).
int num_node_types;
// Maps a table name to its numeric table id.
std::unordered_map<std::string, uint32_t> table_id_map;
// Parallel arrays: entry i of each vector describes one feature
// (owning table name, feature name, dtype, shape). Appended together
// by add_table_feat_conf().
std::vector<std::string> table_feat_conf_table_name;
std::vector<std::string> table_feat_conf_feat_name;
std::vector<std::string> table_feat_conf_feat_dtype;
std::vector<int32_t> table_feat_conf_feat_shape;

// std::thread *server_thread, *client_thread;

// std::shared_ptr<paddle::distributed::PSServer> pserver_ptr;
Expand All @@ -65,25 +71,43 @@ class GraphPyService {
// Sets the number of shards each table is split into.
void set_shard_num(int shard_num) { this->shard_num = shard_num; }
void GetDownpourSparseTableProto(
::paddle::distributed::TableParameter* sparse_table_proto,
uint32_t table_id) {
uint32_t table_id,
std::string table_name,
std::string table_type,
std::vector<std::string> feat_name,
std::vector<std::string> feat_dtype,
std::vector<int32_t> feat_shape) {
sparse_table_proto->set_table_id(table_id);
sparse_table_proto->set_table_class("GraphTable");
sparse_table_proto->set_shard_num(shard_num);
sparse_table_proto->set_type(::paddle::distributed::PS_SPARSE_TABLE);
::paddle::distributed::TableAccessorParameter* accessor_proto =
sparse_table_proto->mutable_accessor();

::paddle::distributed::CommonAccessorParameter* common_proto =
sparse_table_proto->mutable_common();

// Set GraphTable Parameter
common_proto->set_table_name(table_name);
common_proto->set_name(table_type);
for(size_t i = 0;i < feat_name.size();i ++) {
common_proto->add_params(feat_dtype[i]);
common_proto->add_dims(feat_shape[i]);
common_proto->add_attributes(feat_name[i]);
}

accessor_proto->set_accessor_class("CommMergeAccessor");
}

// Sets the number of parameter servers.
void set_server_size(int server_size) { this->server_size = server_size; }
// Sets how many of the registered tables are node-type tables.
void set_num_node_types(int num_node_types) { this->num_node_types = num_node_types; }
// Returns the stored server count. BUG FIX: the original parameter
// shadowed the member and was returned unchanged, so callers always got
// their own argument back. The parameter is now ignored (and defaulted)
// and kept only so existing call sites keep compiling.
int get_server_size(int /*ignored*/ = 0) { return this->server_size; }
// Tokenizes `str` on the single-character separator `pattern`.
std::vector<std::string> split(std::string& str, const char pattern);
// Configures the service: parses `ips_str` into server endpoints
// (separator handled by split(); see the .cc), stores the shard count,
// and records how many node-type tables exist via set_num_node_types().
void set_up(std::string ips_str, int shard_num,
std::vector<std::string> node_types,
std::vector<std::string> edge_types);

// Registers one feature (name/dtype/shape) for the table named
// `node_type`; silently ignored when that table name is unknown
// (see the .cc implementation).
void add_table_feat_conf(std::string node_type, std::string feat_name, std::string feat_dtype, int32_t feat_shape);
};
class GraphPyServer : public GraphPyService {
public:
Expand Down
16 changes: 16 additions & 0 deletions paddle/fluid/distributed/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,22 @@ int32_t GraphTable::initialize() {
and _shard_idx to server
rank
*/
auto common = _config.common();

this->table_name = common.table_name();
this->table_type = common.name();
VLOG(0) << " init graph table type " << this->table_type << " table name " << this->table_name;
int feat_conf_size = static_cast<int>(common.attributes().size());
for(int i=0; i<feat_conf_size;i ++) {
auto & f_name= common.attributes()[i];
auto & f_shape = common.dims()[i];
auto & f_dtype = common.params()[i];
this->feat_name.push_back(f_name);
this->feat_shape.push_back(f_shape);
this->feat_dtype.push_back(f_dtype);
VLOG(0) << "init graph table feat conf name:"<< f_name << " shape:" << f_shape << " dtype:" << f_dtype;
}

shard_num = _config.shard_num();
VLOG(0) << "in init graph table shard num = " << shard_num << " shard_idx"
<< _shard_idx;
Expand Down
7 changes: 7 additions & 0 deletions paddle/fluid/distributed/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class GraphTable : public SparseTable {
size_t shard_start, shard_end, server_num, shard_num_per_table, shard_num;
const int task_pool_size_ = 11;
const int random_sample_nodes_ranges = 3;

// Per-feature schema, parallel arrays loaded from the table's common
// proto in initialize(): attributes -> feat_name, params -> feat_dtype,
// dims -> feat_shape.
std::vector<std::string > feat_name;
std::vector<std::string > feat_dtype;
std::vector<int32_t > feat_shape;
// Table identity from the common proto; table_type is presumably
// "node" or "edge" (set by GraphPyService) — confirm against callers.
std::string table_name;
std::string table_type;

std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
};
}
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/pybind/fleet_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,16 @@ void BindGraphPyServer(py::module* m) {
py::class_<GraphPyServer>(*m, "GraphPyServer")
.def(py::init<>())
.def("start_server", &GraphPyServer::start_server)
.def("set_up", &GraphPyServer::set_up);
.def("set_up", &GraphPyServer::set_up)
.def("add_table_feat_conf", &GraphPyServer::add_table_feat_conf);
}
void BindGraphPyClient(py::module* m) {
py::class_<GraphPyClient>(*m, "GraphPyClient")
.def(py::init<>())
.def("load_edge_file", &GraphPyClient::load_edge_file)
.def("load_node_file", &GraphPyClient::load_node_file)
.def("set_up", &GraphPyClient::set_up)
.def("add_table_feat_conf", &GraphPyClient::add_table_feat_conf)
.def("pull_graph_list", &GraphPyClient::pull_graph_list)
.def("start_client", &GraphPyClient::start_client)
.def("batch_sample_neighboors", &GraphPyClient::batch_sample_neighboors)
Expand Down