diff --git a/notebooks/modules/README.md b/notebooks/modules/README.md
new file mode 100644
index 00000000000..29cb47784c7
--- /dev/null
+++ b/notebooks/modules/README.md
@@ -0,0 +1,3 @@
+# This folder contains partial notebooks
+
+The code here is meant to be used to build other notebooks.
\ No newline at end of file
diff --git a/notebooks/modules/SKIP_CI_TESTING b/notebooks/modules/SKIP_CI_TESTING
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/notebooks/modules/mag240m_pg.ipynb b/notebooks/modules/mag240m_pg.ipynb
new file mode 100644
index 00000000000..151a3d5307f
--- /dev/null
+++ b/notebooks/modules/mag240m_pg.ipynb
@@ -0,0 +1,426 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Load the MAG240M Dataset into a Property Graph\n",
+    "\n",
+    "This notebook is meant to stress-test loading data into a property graph.\n",
+    "It requires some setup:\n",
+    "\n",
+    "__num_gpus = the number of GPUs in your cluster. This determines the number of records loaded.__\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# location where the data is downloaded/saved\n",
+    "base_dir = \"../../datasets\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "__The notebook requires that the data has already been downloaded__ (an optional download cell is also provided after the feature flags below):\n",
+    "\n",
+    "\n",
+    "    from ogb.lsc import MAG240MDataset\n",
+    "    dataset = MAG240MDataset(root=base_dir)\n",
+    "    dataset.download()\n",
+    "    "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Feature flags: control which properties and edge types are loaded\n",
+    "skip_edges = False\n",
+    "skip_features = False\n",
+    "\n",
+    "load_paper_features = True\n",
+    "load_paper_labels = True\n",
+    "load_paper_year = True\n",
+    "\n",
+    "load_affiliation_edges = True\n",
+    "load_writes_edges = True\n",
+    "load_cites_edges = True"
+   ]
+  },
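+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional: download the dataset if it is not already present.\n",
+    "# This is a minimal sketch of the snippet shown above; it assumes the `ogb`\n",
+    "# package is installed, and the full download is very large (100+ GB), so it\n",
+    "# is left commented out. `ogb_dataset` is named to avoid clashing with the\n",
+    "# `dataset` object created below.\n",
+    "\n",
+    "# from ogb.lsc import MAG240MDataset\n",
+    "# ogb_dataset = MAG240MDataset(root=base_dir)\n",
+    "# ogb_dataset.download()"
+   ]
+  },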
\n", + "# We recommend using the [cugraph_dev](https://github.com/rapidsai/cugraph/tree/branch-22.10/conda/environments) env through conda\n", + "\n", + "from dask.distributed import Client, wait\n", + "from dask_cuda import LocalCUDACluster\n", + "from cugraph.dask.comms import comms as Comms\n", + "import cugraph.dask as dask_cugraph\n", + "import cugraph\n", + "import dask_cudf\n", + "import cudf\n", + "import time\n", + "import numpy as np" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# size of dataset\n", + "class Dataset:\n", + " def __init__(self):\n", + " self.num_papers = 121751666\n", + " self.num_authors = 122383112\n", + " self.num_institutions = 25721\n", + " self.num_paper_features = 768\n", + " self.num_classes = 153\n", + " self.num_cite_edges = 1297748926\n", + " self.num_write_edges = 386022720\n", + " self.num_affiliated_edges = 44592586\n", + " \n", + " def adjust_by(self, factor):\n", + " self.num_papers = int(self.num_papers * factor)\n", + " self.num_authors = int(self.num_authors * factor)\n", + " self.num_institutions = int(self.num_institutions * factor)\n", + " self.num_paper_features = int(self.num_paper_features * factor)\n", + " self.num_classes = int(self.num_classes * factor)\n", + " self.num_cite_edges = int(self.num_cite_edges * factor)\n", + " self.num_write_edges = int(self.num_write_edges * factor)\n", + " self.num_affiliated_edges = int(self.num_affiliated_edges * factor) \n", + "\n", + "dataset = Dataset()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Directories and Files" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# This NEEDS to be set to the location of the downloaded data\n", + "mag_dir = base_dir + \"/mag240m_kddcup2021/processed\"\n", + "\n", + "# Features\n", + "paper_feature_file = mag_dir + \"/paper/node_feat.npy\"\n", + "paper_label_file = mag_dir + \"/paper/node_label.npy\"\n", + "paper_year_file = mag_dir + \"/paper/node_year.npy\"\n", + "\n", + "# Edges\n", + "auth_institute_file = mag_dir + \"/author___affiliated_with___institution/edge_index.npy\"\n", + "auth_write_file = mag_dir + \"/author___writes___paper/edge_index.npy\"\n", + "auth_cites_file = mag_dir + \"/paper___cites___paper/edge_index.npy\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#!nvidia-smi" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Specify the GPUs to use\n", + "import os\n", + "os.environ[\"CUDA_VISIBLE_DEVICES\"]=\"0\"\n", + "\n", + "# The number oif GPUs in your system - here we are just testing with 1\n", + "num_gpus = 1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# it takes 16 GPUs to fully load the data\n", + "# compute the percent of data to be loaded\n", + "\n", + "# Note, you can adjust the amount of data loaded by increasing or decreasisng this value\n", + "# use a large number to load a small amount per GPU\n", + "\n", + "# This will be used later\n", + "percent_data_factor = num_gpus / 16" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataset.adjust_by(factor=percent_data_factor)" + ] + }, + { + 
"cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# number of features to load?\n", + "# there are 768 features, specify how many to be loaded\n", + "# the code just loadfs feature 0 to N, sequencially\n", + "#num_features = 768\n", + "\n", + "# using just 10 features in this test\n", + "num_features = 10" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup the Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the DASK Cluster\n", + "cluster = LocalCUDACluster()\n", + "client = Client(cluster)\n", + "Comms.initialize(p2p=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create the Property Graph" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from cugraph.experimental import MGPropertyGraph\n", + "from cugraph.experimental import PropertyGraph\n", + "\n", + "pG = PropertyGraph()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load the Paper (Vertex) Features\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def data_load_properties(_PG, file_name, num_rec, name=None, chunk=-1, data_range=None, col_names=None):\n", + " \n", + " # are we using SG or MG PG?\n", + " _use_dask = isinstance(_PG, MGPropertyGraph)\n", + " \n", + " _data = np.lib.format.open_memmap(file_name, mode='r') \n", + " \n", + " if chunk == -1:\n", + " chunk = num_rec\n", + " \n", + " _rec_read = 0\n", + " \n", + " while _rec_read < num_rec:\n", + " _start_id = _rec_read\n", + " _end_id = _start_id + chunk\n", + " \n", + " if (_end_id > num_rec):\n", + " _end_id = num_rec\n", + "\n", + " print(f\"reading {name} data from {_start_id} to {_end_id}\")\n", + " \n", + " if data_range is not None:\n", + " _x = _data[_start_id:_end_id, 0:data_range]\n", + " else:\n", + " _x = _data[_start_id:_end_id]\n", + "\n", + " gdf = cudf.DataFrame(_x, columns=col_names)\n", + " gdf['id'] = gdf.index + _start_id\n", + " gdf.columns = gdf.columns.astype(str)\n", + "\n", + " if _use_dask:\n", + " ddf = dask_cudf.from_cudf(gdf, npartitions=num_gpus)\n", + " else:\n", + " ddf = gdf\n", + "\n", + " pG.add_vertex_data(ddf, vertex_col_name='id', type_name=name)\n", + "\n", + " _rec_read = _end_id " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if not skip_features:\n", + " \n", + " if load_paper_labels:\n", + " data_load_properties(pG, paper_label_file, num_rec=dataset.num_papers, name='paper_label', col_names=[\"label\"])\n", + " print(f\"PG now contains {pG.get_num_vertices()} \")\n", + " \n", + " if load_paper_year:\n", + " data_load_properties(pG, paper_year_file, num_rec=dataset.num_papers, name='paper_year', col_names=[\"year\"])\n", + " print(f\"PG now contains {pG.get_num_vertices()} \")\n", + " \n", + " if load_paper_features:\n", + " data_load_properties(pG, paper_feature_file, num_rec=dataset.num_papers, name='paper_feature', data_range=num_features)\n", + " print(f\"PG now contains {pG.get_num_vertices()} \")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load the Edges" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def data_load_edges(_PG, file_name, num_edges, name=None, chunk=-1, col_names=None):\n", + " 
\n", + " # are we using SG or MG PG?\n", + " _use_dask = isinstance(_PG, MGPropertyGraph)\n", + " \n", + " _data = np.lib.format.open_memmap(file_name, mode='r') \n", + " \n", + " if chunk == -1:\n", + " chunk = num_edges\n", + " \n", + " _rec_read = 0\n", + " \n", + " while _rec_read < num_edges:\n", + " _start_id = _rec_read\n", + " _end_id = _start_id + chunk\n", + " \n", + " if (_end_id > num_edges):\n", + " _end_id = num_edges\n", + "\n", + " print(f\"reading {name} data from {_start_id} to {_end_id}\")\n", + " \n", + " _x = _data[_start_id:_end_id]\n", + "\n", + " gdf = cudf.DataFrame()\n", + " gdf['src'] = _x[0]\n", + " gdf['dst'] = _x[1]\n", + " gdf.columns = gdf.columns.astype(str)\n", + "\n", + " if _use_dask:\n", + " ddf = dask_cudf.from_cudf(gdf, npartitions=num_gpus)\n", + " else:\n", + " ddf = gdf\n", + "\n", + " pG.add_edge_data(ddf, vertex_col_names=['src', 'dst'], type_name=name)\n", + "\n", + " _rec_read = _end_id " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if not skip_edges: \n", + " if load_affiliation_edges:\n", + " data_load_edges(pG, auth_institute_file, num_edges=dataset.num_affiliated_edges, name=\"affiliated\" )\n", + " print(f\"PG now contains {pG.get_num_edges()} \")\n", + "\n", + " if load_writes_edges:\n", + " data_load_edges(pG, auth_write_file, num_edges=dataset.num_write_edges, name=\"writes\" )\n", + " print(f\"PG now contains {pG.get_num_edges()} \")\n", + "\n", + " if load_cites_edges:\n", + " data_load_edges(pG, auth_cites_file, num_edges=dataset.num_cite_edges, name=\"cites\" )\n", + " print(f\"PG now contains {pG.get_num_edges()} \")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"PG now contains {pG.get_num_edges()} \")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "cugraph_dev", + "language": "python", + "name": "cugraph_dev" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}