Skip to content

Commit

Permalink
Fix PyG Loaders by properly supporting multi_get_tensor (#2860)
Browse files Browse the repository at this point in the history
Fixes the currently-broken `pyg_hetero_mag` notebook, which fails due to the sampling output not matching what the API expects.  Note: this does use an optional import of PyG, similar to how other code optionally uses `torch` or `cudf`.

Also adds an in-place `fillna` function for `PropertyGraph` and `MGPropertyGraph`.  There is a separate issue to do this for `RemoteGraph` (rapidsai/graph_dl#97).

Also removes an unintended dependence on `cugraph` by adding `is_multi_gpu()` methods to `PropertyGraph` and `MGPropertyGraph` so whether a graph is MG can be determined without importing `cugraph`.

Closes rapidsai/graph_dl#78
Closes rapidsai/graph_dl#77
Closes rapidsai/graph_dl#63

Merge after #2832

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Erik Welch (https://github.com/eriknw)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - Erik Welch (https://github.com/eriknw)

URL: #2860
  • Loading branch information
alexbarghi-nv authored Nov 17, 2022
1 parent 435e1c3 commit 6574e55
Show file tree
Hide file tree
Showing 11 changed files with 570 additions and 192 deletions.
27 changes: 18 additions & 9 deletions notebooks/gnn/pyg_hetero_mag.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"import rmm\n",
"\n",
"rmm.reinitialize(pool_allocator=True,initial_pool_size=5e+9, maximum_pool_size=20e+9)"
Expand Down Expand Up @@ -86,7 +85,7 @@
" last_offset += num_nodes\n",
" \n",
" blank_df = cudf.DataFrame({'id':range(vertex_offsets[node_type], vertex_offsets[node_type] + num_nodes)})\n",
" blank_df.id = blank_df.id.astype('int32')\n",
" blank_df.id = blank_df.id.astype('int64')\n",
" if isinstance(pG, MGPropertyGraph):\n",
" blank_df = dask_cudf.from_cudf(blank_df, npartitions=2)\n",
" pG.add_vertex_data(blank_df, vertex_col_name='id', type_name=node_type)\n",
Expand All @@ -113,11 +112,14 @@
" feature_df = cudf.DataFrame(node_features)\n",
" feature_df.columns = [str(c) for c in range(feature_df.shape[1])]\n",
" feature_df['id'] = range(vertex_offset, vertex_offset + node_features.shape[0])\n",
" feature_df.id = feature_df.id.astype('int32')\n",
" feature_df.id = feature_df.id.astype('int64')\n",
" if isinstance(pG, MGPropertyGraph):\n",
" feature_df = dask_cudf.from_cudf(feature_df, npartitions=2)\n",
"\n",
" pG.add_vertex_data(feature_df, vertex_col_name='id', type_name=node_type)"
" pG.add_vertex_data(feature_df, vertex_col_name='id', type_name=node_type)\n",
"\n",
"# Fill in an empty value for vertices without properties.\n",
"pG.fillna_vertices(0.0)"
]
},
{
Expand All @@ -141,8 +143,8 @@
" eidx = [n + vertex_offset_src for n in eidx[0]], [n + vertex_offset_dst for n in eidx[1]]\n",
"\n",
" edge_df = cudf.DataFrame({'src':eidx[0], 'dst':eidx[1]})\n",
" edge_df.src = edge_df.src.astype('int32')\n",
" edge_df.dst = edge_df.dst.astype('int32')\n",
" edge_df.src = edge_df.src.astype('int64')\n",
" edge_df.dst = edge_df.dst.astype('int64')\n",
" edge_df['type'] = edge_type\n",
" if isinstance(pG, MGPropertyGraph):\n",
" edge_df = dask_cudf.from_cudf(edge_df, npartitions=2)\n",
Expand All @@ -167,7 +169,7 @@
"source": [
"y_df = cudf.DataFrame(data[1]['paper'], columns=['y'])\n",
"y_df['id'] = range(vertex_offsets['paper'], vertex_offsets['paper'] + len(y_df))\n",
"y_df.id = y_df.id.astype('int32')\n",
"y_df.id = y_df.id.astype('int64')\n",
"if isinstance(pG, MGPropertyGraph):\n",
" y_df = dask_cudf.from_cudf(y_df, npartitions=2)\n",
"\n",
Expand Down Expand Up @@ -219,15 +221,15 @@
" shuffle=True,\n",
" batch_size=50,\n",
" node_sampler=sampler,\n",
" input_nodes='author'\n",
" input_nodes=('author', graph_store.get_vertex_index('author'))\n",
")\n",
"\n",
"test_loader = NodeLoader(\n",
" data=(feature_store, graph_store),\n",
" shuffle=True,\n",
" batch_size=50,\n",
" node_sampler=sampler,\n",
" input_nodes='author'\n",
" input_nodes=('author', graph_store.get_vertex_index('author'))\n",
")\n"
]
},
Expand Down Expand Up @@ -357,6 +359,13 @@
" train_acc = test()\n",
" print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, Train: {train_acc:.4f}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
10 changes: 5 additions & 5 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ def uniform_neighbor_sample(
start_list = [start_list]

if isinstance(start_list, list):
start_list = cudf.Series(start_list, dtype="int32")
# FIXME: ensure other sequence types (eg. cudf Series) can be handled.
if start_list.dtype != "int32":
raise ValueError(
f"'start_list' must have int32 values, " f"got: {start_list.dtype}"
start_list = cudf.Series(
start_list,
dtype=input_graph.edgelist.edgelist_df[
input_graph.renumber_map.renumbered_src_col_name
].dtype,
)

# fanout_vals must be a host array!
Expand Down
38 changes: 38 additions & 0 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,37 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None):

return None

def fillna_vertices(self, val=0):
"""
Fills empty vertex property values with the given value, zero by default.
Fills in-place.
Parameters
----------
val : object, Series, or dict
The object that will replace "na". Default = 0. If a dict or
Series is passed, the index or keys are the columns to fill
and the values are the fill value for the corresponding column.
"""
self.__vertex_prop_dataframe = self.__vertex_prop_dataframe.fillna(
val
).persist()

def fillna_edges(self, val=0):
"""
Fills empty edge property values with the given value, zero by default.
Fills in-place.
Parameters
----------
val : object, Series, or dict
The object that will replace "na". Default = 0. If a dict or
Series is passed, the index or keys are the columns to fill
and the values are the fill value for the corresponding column.
"""

self.__edge_prop_dataframe = self.__edge_prop_dataframe.fillna(val).persist()

def select_vertices(self, expr, from_previous_selection=None):
raise NotImplementedError

Expand Down Expand Up @@ -1172,6 +1203,13 @@ def renumber_edges_by_type(self):
rv["stop"] -= 1 # Make inclusive
return rv[["start", "stop"]]

def is_multi_gpu(self):
"""
Return True if this is a multi-gpu graph. Always returns True for
MGPropertyGraph.
"""
return True

@classmethod
def is_multigraph(cls, df):
"""
Expand Down
Loading

0 comments on commit 6574e55

Please sign in to comment.