Skip to content

Commit

Permalink
Change to run grib_to_netcdf on skookum
Browse files Browse the repository at this point in the history
grib_to_netcdf, after refactor to use HRDPS continental grid subdomain that
is cropped to just what we need to the SalishSeaCast domain, has a much
smaller memory demand. So, the dask cluster on salish is no longer required.
  • Loading branch information
douglatornell committed Apr 7, 2023
1 parent 41348a9 commit 352961a
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 44 deletions.
1 change: 0 additions & 1 deletion config/nowcast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,6 @@ zmq:
"salish.eos.ubc.ca:5580", "salish.eos.ubc.ca:5581", "salish.eos.ubc.ca:5582",
"salish.eos.ubc.ca:5583", "salish.eos.ubc.ca:5584", "salish.eos.ubc.ca:5585",
]
grib_to_netcdf: "salish.eos.ubc.ca:5562"


message registry:
Expand Down
12 changes: 2 additions & 10 deletions nowcast/next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,11 @@ def after_crop_gribs(msg, config, checklist):
}
if msg.type == "success 06":
next_workers["success 06"].append(
NextWorker(
"nowcast.workers.grib_to_netcdf",
args=["forecast2"],
host="salish-nowcast",
)
NextWorker("nowcast.workers.grib_to_netcdf", args=["forecast2"])
)
if msg.type == "success 12":
next_workers["success 12"].append(
NextWorker(
"nowcast.workers.grib_to_netcdf",
args=["nowcast+"],
host="salish-nowcast",
)
NextWorker("nowcast.workers.grib_to_netcdf", args=["nowcast+"])
)
return next_workers[msg.type]

Expand Down
3 changes: 0 additions & 3 deletions nowcast/workers/grib_to_netcdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from pathlib import Path

import arrow
import dask.distributed
import numpy
from nemo_nowcast import NowcastWorker
import xarray
Expand Down Expand Up @@ -110,7 +109,6 @@ def grib_to_netcdf(parsed_args, config, *args):
# So, we compensate for that here.
acpc_index = var_names.index(["APCP_Sfc", "unknown", "precip"])
var_names[acpc_index] = ["APCP_Sfc", "t", "precip"]
dask_client = dask.distributed.Client(config["weather"]["dask cluster"])
match run_type:
case "nowcast+":
logger.info(
Expand Down Expand Up @@ -224,7 +222,6 @@ def grib_to_netcdf(parsed_args, config, *args):
fcst=True,
)
_update_checklist(nc_file, checklist, fcst=True)
dask_client.close()
return checklist


Expand Down
1 change: 0 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ def test_log_aggregator_ports(self, prod_config):
"salish.eos.ubc.ca:5584",
"salish.eos.ubc.ca:5585",
],
"grib_to_netcdf": "salish.eos.ubc.ca:5562",
}


Expand Down
12 changes: 2 additions & 10 deletions tests/test_next_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,7 @@ def mock_now():
workers = next_workers.after_crop_gribs(
Message("crop_gribs", "success 06"), config, checklist
)
expected = [
NextWorker(
"nowcast.workers.grib_to_netcdf", ["forecast2"], host="salish-nowcast"
),
]
expected = [NextWorker("nowcast.workers.grib_to_netcdf", ["forecast2"])]
assert workers == expected

def test_success_12(self, config, checklist, monkeypatch):
Expand All @@ -459,11 +455,7 @@ def mock_now():
workers = next_workers.after_crop_gribs(
Message("crop_gribs", "success 12"), config, checklist
)
expected = [
NextWorker(
"nowcast.workers.grib_to_netcdf", ["nowcast+"], host="salish-nowcast"
),
]
expected = [NextWorker("nowcast.workers.grib_to_netcdf", ["nowcast+"])]
assert workers == expected


Expand Down
19 changes: 0 additions & 19 deletions tests/workers/test_grib_to_netcdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def config(base_config):
ops dir: forcing/atmospheric/continental2.5/nemo_forcing/
file template: "hrdps_{:y%Ym%md%d}.nc"
dask cluster: tcp://142.103.36.12:4386
"""
)
)
Expand Down Expand Up @@ -193,7 +192,6 @@ def test_weather_section(self, prod_config):
== "/results/forcing/atmospheric/continental2.5/nemo_forcing/"
)
assert weather["file template"] == "hrdps_{:y%Ym%md%d}.nc"
assert weather["dask cluster"] == "tcp://142.103.36.12:4386"
assert (
weather["monitoring image"]
== "/results/nowcast-sys/figures/monitoring/wg.png"
Expand Down Expand Up @@ -239,20 +237,6 @@ def test_failure(self, run_type, caplog):
class TestGribToNetcdf:
"""Unit test for grib_to_netcdf() function."""

@staticmethod
@pytest.fixture
def mock_dask_client(monkeypatch):
def _mock_dask_client(address):
class MockClient:
def close(self):
pass

return MockClient()

monkeypatch.setattr(
grib_to_netcdf.dask.distributed, "Client", _mock_dask_client
)

@staticmethod
@pytest.fixture
def mock_open_dataset(monkeypatch):
Expand Down Expand Up @@ -336,7 +320,6 @@ def test_log_messages(
self,
run_type,
full_grid,
mock_dask_client,
mock_open_dataset,
mock_calc_nemo_var_ds,
mock_combine_by_coords,
Expand Down Expand Up @@ -365,7 +348,6 @@ def test_log_messages(
def test_nowcast_checklist(
self,
full_grid,
mock_dask_client,
mock_open_dataset,
mock_calc_nemo_var_ds,
mock_combine_by_coords,
Expand Down Expand Up @@ -399,7 +381,6 @@ def test_nowcast_checklist(
def test_forecast2_checklist(
self,
full_grid,
mock_dask_client,
mock_open_dataset,
mock_calc_nemo_var_ds,
mock_combine_by_coords,
Expand Down

0 comments on commit 352961a

Please sign in to comment.