Use Correct Searchsorted Function and Drop cupy from CuGraphStore in cugraph-pyg #3382

Merged
36 commits merged on Apr 3, 2023
Commits (36)
9a1ef56
pull in files, add skip for all pyg tests
alexbarghi-nv Feb 16, 2023
f0ec571
skip tests on 11.8
alexbarghi-nv Feb 16, 2023
656f5d3
Merge branch 'branch-23.04' into update-ci-for-pyg
alexbarghi-nv Feb 16, 2023
05919f1
update branch
alexbarghi-nv Feb 16, 2023
6da5291
Merge branch 'branch-23.04' into update-pyg-tests
alexbarghi-nv Feb 16, 2023
04e83c5
update skip reason
alexbarghi-nv Feb 16, 2023
3bae1ef
Merge branch 'update-ci-for-pyg' of https://github.com/alexbarghi-nv/…
alexbarghi-nv Feb 16, 2023
4510729
skip mg test on sg
alexbarghi-nv Feb 16, 2023
cd02854
update backend script
alexbarghi-nv Feb 17, 2023
cdc7733
remove backend
alexbarghi-nv Mar 8, 2023
2b36c64
separate the pyg tests, use different env
alexbarghi-nv Mar 8, 2023
447fdd3
minor formatting fix
alexbarghi-nv Mar 8, 2023
610d482
Merge branch 'branch-23.04' into update-ci-for-pyg
alexbarghi-nv Mar 9, 2023
4637af9
update pytorch req to >=2.0
alexbarghi-nv Mar 9, 2023
dd36a95
fix pytorch only to version number
alexbarghi-nv Mar 9, 2023
18d650f
add comments to ci file
alexbarghi-nv Mar 9, 2023
8f4e577
skip testing pyg on arm
alexbarghi-nv Mar 9, 2023
cfa6331
skip tests on arm
alexbarghi-nv Mar 9, 2023
731483c
Merge branch 'branch-23.04' into update-ci-for-pyg
alexbarghi-nv Mar 9, 2023
7903e36
Merge branch 'branch-23.04' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Mar 15, 2023
8e37094
fix merge conflict
alexbarghi-nv Mar 17, 2023
399c7da
Merge branch 'branch-23.04' into update-pyg-tests
alexbarghi-nv Mar 17, 2023
997ae30
Merge branch 'branch-23.04' into update-pyg-tests
alexbarghi-nv Mar 22, 2023
72972bc
resolve merge conflict
alexbarghi-nv Mar 27, 2023
c39e3ea
fix bad merge
alexbarghi-nv Mar 27, 2023
767064d
fix doc issue
alexbarghi-nv Mar 27, 2023
ede2119
add some clarifying code comments
alexbarghi-nv Mar 27, 2023
e2f597e
merge newer changes
alexbarghi-nv Mar 28, 2023
e28ef86
support gpu array input, clean up tests
alexbarghi-nv Mar 28, 2023
a414574
Merge branch 'branch-23.04' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Mar 28, 2023
efc4525
redo the changes
alexbarghi-nv Mar 28, 2023
874bc67
fix backend reference in neighbor sampler, which should probably be d…
alexbarghi-nv Mar 29, 2023
dcea019
Merge branch 'branch-23.04' into cugraph-pyg-drop-cupy
alexbarghi-nv Mar 29, 2023
06e5808
Merge branch 'branch-23.04' into cugraph-pyg-drop-cupy
alexbarghi-nv Apr 3, 2023
9a8b7cc
Merge branch 'branch-23.04' into cugraph-pyg-drop-cupy
alexbarghi-nv Apr 3, 2023
004e632
style
alexbarghi-nv Apr 3, 2023
156 changes: 45 additions & 111 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
@@ -41,14 +41,6 @@
TensorType = Union[Tensor, NdArray, cudf.Series, DaskCudfSeries]


def _torch_as_array(a):
if isinstance(a, cudf.Series):
a = a.to_cupy()
if len(a) == 0:
return torch.as_tensor(a.get()).to("cuda")
return torch.as_tensor(a, device="cuda")


class EdgeLayout(Enum):
COO = "coo"
CSC = "csc"
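
The deleted `_torch_as_array` helper above converted a `cudf.Series` to a CUDA torch tensor; after this change the store calls `torch.as_tensor(..., device="cuda")` directly. A minimal sketch of the same conversion, assuming cudf and a CUDA build of torch are installed (the series values are arbitrary):

```python
import cudf
import torch

s = cudf.Series([10, 20, 30])

# View the series as a cupy array, then wrap it as a torch tensor on the GPU.
# cupy arrays expose __cuda_array_interface__, so this typically avoids a host copy.
t = torch.as_tensor(s.to_cupy(), device="cuda")
print(t.dtype, t.device)  # torch.int64 cuda:0
```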
@@ -196,7 +188,6 @@ def __init__(
F: cugraph.gnn.FeatureStore,
G: Union[Dict[str, Tuple[TensorType]], Dict[str, int]],
num_nodes_dict: Dict[str, int],
backend: str = "torch",
multi_gpu: bool = False,
):
"""
@@ -233,10 +224,6 @@ def __init__(
A dictionary mapping each node type to the count of nodes
of that type in the graph.

backend : ('torch', 'cupy') (Optional, default = 'torch')
The backend that manages tensors (default = 'torch')
Should usually be 'torch' ('torch', 'cupy' supported).

multi_gpu : bool (Optional, default = False)
Whether the store should be backed by a multi-GPU graph.
Requires dask to have been set up.
@@ -245,31 +232,7 @@ def __init__(
if None in G:
raise ValueError("Unspecified edge types not allowed in PyG")

# FIXME drop the cupy backend and remove these checks (#2995)
if backend == "torch":
asarray = _torch_as_array
from torch import int64 as vertex_dtype
from torch import float32 as property_dtype
from torch import searchsorted as searchsorted
from torch import concatenate as concatenate
from torch import arange as arange
elif backend == "cupy":
from cupy import asarray
from cupy import int64 as vertex_dtype
from cupy import float32 as property_dtype
from cupy import searchsorted as searchsorted
from cupy import concatenate as concatenate
from cupy import arange as arange
else:
raise ValueError(f"Invalid backend {backend}.")

self.__backend = backend
self.asarray = asarray
self.vertex_dtype = vertex_dtype
self.property_dtype = property_dtype
self.searchsorted = searchsorted
self.concatenate = concatenate
self.arange = arange
self.__vertex_dtype = torch.int64

self._tensor_attr_cls = CuGraphTensorAttr
self._tensor_attr_dict = defaultdict(list)
@@ -304,12 +267,9 @@ def __init__(
def __make_offsets(self, input_dict):
offsets = {}
offsets["stop"] = [input_dict[v] for v in sorted(input_dict.keys())]
if self.__backend == "cupy":
offsets["stop"] = cupy.array(offsets["stop"])
else:
offsets["stop"] = torch.tensor(offsets["stop"])
if torch.has_cuda:
offsets["stop"] = offsets["stop"].cuda()
offsets["stop"] = torch.tensor(offsets["stop"])
if torch.has_cuda:
offsets["stop"] = offsets["stop"].cuda()

cumsum = offsets["stop"].cumsum(0)
offsets["start"] = cumsum - offsets["stop"]
@@ -466,10 +426,6 @@ def __construct_graph(
def _edge_types_to_attrs(self) -> dict:
return dict(self.__edge_types_to_attrs)

@property
def backend(self) -> str:
return self.__backend

@cached_property
def _is_delayed(self):
if self.__graph is None:
@@ -480,19 +436,20 @@ def get_vertex_index(self, vtypes) -> TensorType:
if isinstance(vtypes, str):
vtypes = [vtypes]

# FIXME always use torch, drop cupy (#2995)
if self.__backend == "torch":
ix = torch.tensor([], dtype=torch.int64)
else:
ix = cupy.array([], dtype="int64")
ix = torch.tensor([], dtype=torch.int64)

if isinstance(self.__vertex_type_offsets, dict):
vtypes = np.searchsorted(self.__vertex_type_offsets["type"], vtypes)
for vtype in vtypes:
start = int(self.__vertex_type_offsets["start"][vtype])
stop = int(self.__vertex_type_offsets["stop"][vtype])
ix = self.concatenate(
[ix, self.arange(start, stop + 1, 1, dtype=self.vertex_dtype)]
ix = torch.concatenate(
[
ix,
torch.arange(
start, stop + 1, 1, dtype=self.__vertex_dtype, device="cuda"
),
]
)

return ix
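
`get_vertex_index` now always accumulates a torch int64 tensor, appending one `torch.arange` range per requested vertex type. A runnable sketch of that loop with made-up `(start, stop)` offsets (the empty accumulator is created on the GPU so `torch.concatenate` sees matching devices; torch >= 2.0 is assumed for `torch.concatenate`):

```python
import torch

ix = torch.tensor([], dtype=torch.int64, device="cuda")
ranges = [(0, 2), (3, 6)]  # hypothetical inclusive (start, stop) per vertex type

for start, stop in ranges:
    # stop is inclusive here, hence the +1 in arange.
    ix = torch.concatenate(
        [ix, torch.arange(start, stop + 1, 1, dtype=torch.int64, device="cuda")]
    )

print(ix)  # tensor([0, 1, 2, 3, 4, 5, 6], device='cuda:0')
```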
@@ -594,25 +551,11 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType
if self._is_delayed:
df = df.compute()

src = self.asarray(df[src_col_name]) - src_offset
dst = self.asarray(df[dst_col_name]) - dst_offset
src = torch.as_tensor(df[src_col_name], device="cuda") - src_offset
dst = torch.as_tensor(df[dst_col_name], device="cuda") - dst_offset

if self.__backend == "torch":
src = src.to(self.vertex_dtype)
dst = dst.to(self.vertex_dtype)
elif self.__backend == "cupy":
src = src.astype(self.vertex_dtype)
dst = dst.astype(self.vertex_dtype)
else:
raise TypeError(f"Invalid backend type {self.__backend}")

if self.__backend == "torch":
src = src.to(self.vertex_dtype)
dst = dst.to(self.vertex_dtype)
else:
# self.__backend == 'cupy'
src = src.astype(self.vertex_dtype)
dst = dst.astype(self.vertex_dtype)
src = src.to(self.__vertex_dtype)
dst = dst.to(self.__vertex_dtype)

if src.shape[0] != dst.shape[0]:
raise IndexError("src and dst shape do not match!")
@@ -689,16 +632,18 @@ def _get_vertex_groups_from_sample(self, nodes_of_interest: cudf.Series) -> dict

"""

nodes_of_interest = self.asarray(nodes_of_interest.sort_values())
nodes_of_interest = torch.as_tensor(
nodes_of_interest.sort_values(), device="cuda"
)

noi_index = {}

vtypes = cudf.Series(self.__vertex_type_offsets["type"])
if len(vtypes) == 1:
noi_index[vtypes[0]] = nodes_of_interest
else:
noi_type_indices = self.searchsorted(
self.asarray(self.__vertex_type_offsets["stop"]),
noi_type_indices = torch.searchsorted(
torch.as_tensor(self.__vertex_type_offsets["stop"], device="cuda"),
nodes_of_interest,
)

@@ -712,7 +657,7 @@ def _get_vertex_groups_from_sample(self, nodes_of_interest: cudf.Series) -> dict
for type_name, ix in noi_types.items():
# store the renumbering for this vertex type
# renumbered vertex id is the index of the old id
ix = self.asarray(ix)
ix = torch.as_tensor(ix, device="cuda")
# subtract off the offsets
noi_index[type_name] = nodes_of_interest[ix] - noi_starts[ix]

@@ -764,14 +709,14 @@ def _get_renumbered_edge_groups_from_sample(
t_pyg_type = list(self.__edge_types_to_attrs.values())[0].edge_type
src_type, _, dst_type = t_pyg_type

sources = self.asarray(sampling_results.sources)
sources = torch.as_tensor(sampling_results.sources, device="cuda")
src_id_table = noi_index[src_type]
src = self.searchsorted(src_id_table, sources)
src = torch.searchsorted(src_id_table, sources)
row_dict[t_pyg_type] = src

destinations = self.asarray(sampling_results.destinations)
destinations = torch.as_tensor(sampling_results.destinations, device="cuda")
dst_id_table = noi_index[dst_type]
dst = self.searchsorted(dst_id_table, destinations)
dst = torch.searchsorted(dst_id_table, destinations)
col_dict[t_pyg_type] = dst
else:
# This will retrieve the single string representation.
@@ -789,27 +734,31 @@ def _get_renumbered_edge_groups_from_sample(
src_type, _, dst_type = pyg_can_edge_type

# Get the de-offsetted sources
sources = self.asarray(sampling_results.sources.iloc[ix])
sources_ix = self.searchsorted(
sources = torch.as_tensor(
sampling_results.sources.iloc[ix], device="cuda"
)
sources_ix = torch.searchsorted(
self.__vertex_type_offsets["stop"], sources
)
sources -= self.__vertex_type_offsets["start"][sources_ix]

# Create the row entry for this type
src_id_table = noi_index[src_type]
src = self.searchsorted(src_id_table, sources)
src = torch.searchsorted(src_id_table, sources)
row_dict[pyg_can_edge_type] = src

# Get the de-offsetted destinations
destinations = self.asarray(sampling_results.destinations.iloc[ix])
destinations_ix = self.searchsorted(
destinations = torch.as_tensor(
sampling_results.destinations.iloc[ix], device="cuda"
)
destinations_ix = torch.searchsorted(
self.__vertex_type_offsets["stop"], destinations
)
destinations -= self.__vertex_type_offsets["start"][destinations_ix]

# Create the col entry for this type
dst_id_table = noi_index[dst_type]
dst = self.searchsorted(dst_id_table, destinations)
dst = torch.searchsorted(dst_id_table, destinations)
col_dict[pyg_can_edge_type] = dst

return row_dict, col_dict
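
`_get_renumbered_edge_groups_from_sample` renumbers edge endpoints by locating each one in the sorted table of sampled ids for its vertex type; `torch.searchsorted` returns the position in that table, which becomes the new contiguous id. A sketch with made-up ids (this only works because every endpoint is guaranteed to appear in the table, which is the case for sampler output):

```python
import torch

src_id_table = torch.tensor([3, 7, 12, 20], device="cuda")  # sorted sampled vertex ids
sources = torch.tensor([12, 3, 20, 7], device="cuda")       # edge sources to renumber

# Each source maps to its index in the id table: [2, 0, 3, 1]
renumbered_src = torch.searchsorted(src_id_table, sources)
```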
Expand All @@ -834,8 +783,7 @@ def create_named_tensor(
vertex_type : str
The vertex type associated with this new tensor property.
dtype : numpy/cupy dtype (i.e. 'int32') or torch dtype (i.e. torch.float)
The datatype of the tensor. Should be a dtype appropriate
for this store's backend. Usually float32/float64.
The datatype of the tensor. Usually float32/float64.
"""
self._tensor_attr_dict[vertex_type].append(
CuGraphTensorAttr(
@@ -895,39 +843,28 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType:
if cols is None:
t = self.__features.get_data(idx, attr.group_name, attr.attr_name)

if self.backend == "torch":
if isinstance(t, np.ndarray):
t = torch.as_tensor(t, device="cuda")
else:
t = t.cuda()
if isinstance(t, np.ndarray):
t = torch.as_tensor(t, device="cuda")
else:
t = cupy.array(t)
t = t.cuda()

return t

else:
t = self.__features.get_data(idx, attr.group_name, cols[0])

if len(t.shape) == 1:
if self.backend == "torch":
t = torch.tensor([t])
else:
t = cupy.array([t])
t = torch.tensor([t])

for col in cols[1:]:
u = self.__features.get_data(idx, attr.group_name, col)

if len(u.shape) == 1:
if self.backend == "torch":
u = torch.tensor([u])
else:
u = cupy.array([u])
u = torch.tensor([u])

t = torch.concatenate([t, u])

if self.backend == "torch":
t = t.cuda()
else:
t = cupy.array(t)
t = t.cuda()
return t

def _multi_get_tensor(self, attrs: List[CuGraphTensorAttr]) -> List[TensorType]:
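
`_get_tensor` now handles host arrays with torch alone: numpy results from the feature store are wrapped with `torch.as_tensor` on the GPU, and anything already a torch tensor is moved with `.cuda()`. A small sketch of that branch with an arbitrary numpy array standing in for a feature column:

```python
import numpy as np
import torch

t = np.asarray([0.1, 0.2, 0.3], dtype=np.float32)  # hypothetical feature column

if isinstance(t, np.ndarray):
    t = torch.as_tensor(t, device="cuda")  # host numpy array -> CUDA tensor
else:
    t = t.cuda()  # already a torch tensor -> just move it to the GPU
```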
@@ -1016,10 +953,7 @@ def get_tensor(self, *args, **kwargs) -> TensorType:
return tensor

def _get_tensor_size(self, attr: CuGraphTensorAttr) -> Union[List, int]:
if self.__backend == "cupy":
return self._get_tensor(attr).size
else:
return self._get_tensor(attr).size()
return self._get_tensor(attr).size()

def get_tensor_size(self, *args, **kwargs) -> Union[List, int]:
r"""
10 changes: 1 addition & 9 deletions python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py
@@ -160,14 +160,6 @@ def __neighbor_sample(
metadata=None,
**kwargs,
) -> Union[dict, HeteroSamplerOutput]:
backend = self.__graph_store.backend
if backend != self.__feature_store.backend:
raise ValueError(
f"Graph store backend {backend}"
f"does not match feature store "
f"backend {self.__feature_store.backend}"
)

if not directed:
raise ValueError("Undirected sampling not currently supported")

@@ -180,7 +172,7 @@ def __neighbor_sample(
# FIXME support variable num neighbors per edge type
num_neighbors = list(num_neighbors.values())[0]

if backend == "torch" and not index.is_cuda:
if not index.is_cuda:
index = index.cuda()

G = self.__graph_store._subgraph(edge_types)
@@ -389,6 +389,7 @@ def test_serialize(multi_edge_multi_vertex_no_graph_1):

F, G, N = multi_edge_multi_vertex_no_graph_1
cugraph_store = CuGraphStore(F, G, N)

cugraph_store_copy = pickle.loads(pickle.dumps(cugraph_store))

for tensor_attr in cugraph_store.get_all_tensor_attrs():
Expand Down