Commit

Merge pull request #2 from jameshalgren/remote_data_load
Remote data load
arpita0911patel authored Sep 24, 2022
2 parents 6631907 + f3ed9a1 commit c67bfd2
Showing 35 changed files with 3,389 additions and 0 deletions.
Empty file added data/.gitkeep
Empty file.
328 changes: 328 additions & 0 deletions parquet/example_remote_dl_parquet.ipynb
@@ -0,0 +1,328 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"tags": []
},
"outputs": [],
"source": [
"import pandas as pd\n",
"from pyarrow.parquet import ParquetFile\n",
"import dask.dataframe as dd\n",
"import os\n",
"import xarray as xr\n",
"import ujson\n",
"import pprint\n",
"\n",
"#%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# These fs options don't work for http... beware!\n",
"so = dict(mode=\"rb\", anon=True, default_fill_cache=False, default_cache_type=\"first\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def gen_json(u, fs, outf=None):\n",
" with fs.open(u, **so) as infile:\n",
" h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)\n",
" p = u.split(\"/\")\n",
" date = p[3]\n",
" fname = p[5]\n",
" if outf:\n",
" # outf = f'{json_dir}{date}.{fname}.json'\n",
" with open(outf, \"wb\") as f:\n",
" f.write(ujson.dumps(h5chunks.translate()).encode())\n",
" else:\n",
" return h5chunks.translate()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# dir_files = [os.path.join(\"../short_range_18files\", files) for files in os.listdir(\"../short_range_18files\")]\n",
"# dir_files = [os.path.join(\"short_range_2files\", files) for files in os.listdir(\"short_range_2files\")]\n",
"# print(dir_files)\n",
"dir_files = [\n",
" \"nwm.t00z.short_range.channel_rt.f001.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f002.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f003.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f004.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f005.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f006.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f007.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f008.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f009.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f010.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f011.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f012.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f013.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f014.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f015.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f016.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f017.conus.nc\",\n",
" \"nwm.t00z.short_range.channel_rt.f018.conus.nc\",\n",
"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import fsspec\n",
"import xarray as xr\n",
"from kerchunk.hdf import SingleHdf5ToZarr\n",
"\n",
"fs = fsspec.filesystem(\"gcs\", anon=True)\n",
"\n",
"# https://storage.googleapis.com/national-water-model/nwm.20220911/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc\n",
"# gcs_url = \"gcs://national-water-model/nwm.20220911/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc\"\n",
"gcs_url = \"gcs://national-water-model/nwm.20220911/short_range/\"\n",
"\n",
"sr_h5 = []\n",
"for f in dir_files:\n",
" print(f)\n",
" sr_h5.append(gen_json(gcs_url + f, fs))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%time\n",
"fds = []\n",
"for xj in sr_h5:\n",
" backend_args = {\n",
" \"consolidated\": False,\n",
" \"storage_options\": {\n",
" \"fo\": xj,\n",
" # Adding these options returns a properly dimensioned but otherwise null dataframe\n",
" # \"remote_protocol\": \"https\",\n",
" # \"remote_options\": {'anon':True}\n",
" },\n",
" }\n",
" fds.append(\n",
" xr.open_dataset(\n",
" \"reference://\",\n",
" engine=\"zarr\",\n",
" mask_and_scale=False,\n",
" backend_kwargs=backend_args,\n",
" )\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%time\n",
"ds = xr.concat(fds, dim=\"time\")\n",
"ds"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"df = ds[\"streamflow\"].to_dataframe()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# df = df.streamflow\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%time\n",
"# df = pd.Series.to_frame(df)\n",
"\n",
"df.to_parquet(\n",
" \"../data/parquet_all_feature_ids.gzip\", engine=\"pyarrow\", compression=\"gzip\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"ParquetFile(\"../data/parquet_all_feature_ids.gzip\").metadata # num_columns: 3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%time\n",
"data = dd.read_parquet(\n",
" \"../data/parquet_all_feature_ids.gzip\", storage_options={\"anon\": True}\n",
")\n",
"data\n",
"result = data.compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# result = result.loc[:, [101]]\n",
"result = result.loc[:, 100:1032]\n",
"# result= result.loc[:, :, 1000:11000]\n",
"# result= result.loc[:, :, 10000:110000]\n",
"result\n",
"r_xa = result.to_xarray()\n",
"r_xa\n",
"r_xa.plot.scatter(\"time\", \"streamflow\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}