Hello,
I'm using Dask to chunk xarrays and pass them into the metpy.calc.wet_bulb_temperature function and am getting a cryptic error message that I believe is due to something breaking internally in the wetbulb function. I understand from #1479 that Dask integration is an ongoing project, so my intention is just to document this issue.
import metpy.calc
import xarray as xr
from dask.distributed import Client, LocalCluster
# Set up an adaptive Dask cluster and client
cluster = LocalCluster(dashboard_address=':8901', memory_limit='5GB', threads_per_worker=1)
cluster.adapt(minimum=1, maximum=10)  # scale between 1 and 10 workers
client = Client(cluster)
# Load pressure, temperature, and water vapor mixing ratio
def init_data(chunks_dict):
    ds_press = xr.open_dataset('cesmLE1/atm/proc/tseries/monthly/PS/b.e11.B20TRC5CNBDRD.f09_g16.001.cam.h0.PS.185001-200512.nc', chunks=chunks_dict)
    ds_temp = xr.open_dataset('cesmLE1/atm/proc/tseries/daily/TREFHT/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h1.TREFHT.20810101-21001231.nc', chunks=chunks_dict)
    ds_qbot = xr.open_dataset('cesmLE1/atm/proc/tseries/daily/QBOT/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h1.QBOT.20810101-21001231.nc', chunks=chunks_dict)
    return ds_temp, ds_press, ds_qbot
temp, press, qbot = init_data(chunks_dict={'lat': 20, 'lon': 20})  # partition into 20x20 lat/lon chunks
# Get time mean of pressure
press_mean = press.mean('time')
press_mean['PS'] = press_mean.PS.assign_attrs(units='Pa')
# Calculate vapor pressure and then dewpoint
vapor_pressure = metpy.calc.vapor_pressure(press_mean['PS'], qbot['QBOT'])
dewpoint = metpy.calc.dewpoint(vapor_pressure)
# Calculate wetbulb
wetbulb = metpy.calc.wet_bulb_temperature(press_mean['PS'], temp['TREFHT'], dewpoint)
The Dask cluster works fine when calculating dewpoint and for the first set of metpy.calc.wet_bulb_temperature tasks (opening the datasets, broadcasting, etc.), but then it freezes at the task shown in the screenshot and returns the following error:
/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/dask/array/core.py:1395: FutureWarning: The `numpy.place` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
FutureWarning,
/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/dask/array/core.py:1395: FutureWarning: The `numpy.place` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
FutureWarning,
/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/dask/array/core.py:1395: FutureWarning: The `numpy.place` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
FutureWarning,
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
handshake = await asyncio.wait_for(comm.read(), 1)
File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
yield from waiter
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
raise CommClosedError() from e
distributed.comm.core.CommClosedError
(The same "Task exception was never retrieved" / CommClosedError traceback is printed two more times.)
The process actually keeps running after this, but the Dask dashboard shows that none of the workers are being used.
Can I just check quickly: MetPy v1.0.1 doesn't use Dask yet for wet-bulb temperature calculations, does it? None of my workers are doing anything when I use it (passing in chunked xarray DataArrays).
@aurelgriesser Due to the current implementation of moist_lapse, wet_bulb_temperature iterates point-by-point through the arrays using nditer; there are any number of places in that chain of calls where the Dask arrays end up converted into plain NumPy arrays. We definitely want to improve this in the somewhat near future.
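In the meantime, one possible workaround is to push the computation down to individual chunks with xarray.apply_ufunc, so the point-by-point nditer loop only ever sees in-memory NumPy blocks. This is just an untested sketch: the variable names (press_mean, temp, dewpoint) are taken from the snippet at the top of this issue, and the units attached inside the wrapper (Pa, kelvin, degC) are assumptions that need to match your actual data (if dewpoint already carries pint units, strip them first, e.g. with dewpoint.metpy.dequantify(), so plain arrays reach the wrapper).

import xarray as xr
import metpy.calc
from metpy.units import units

def _wet_bulb_block(p, t, td):
    # Attach units to the raw NumPy blocks, compute wet-bulb temperature,
    # and return a plain NumPy array so dask can reassemble the chunks.
    # NOTE: the units here (Pa, K, degC) are assumptions about the inputs.
    return metpy.calc.wet_bulb_temperature(
        p * units.Pa, t * units.kelvin, td * units.degC
    ).magnitude

# dask='parallelized' applies the wrapper to each chunk and broadcasts the
# time-mean pressure against the daily temperature/dewpoint fields.
wetbulb = xr.apply_ufunc(
    _wet_bulb_block,
    press_mean['PS'], temp['TREFHT'], dewpoint,
    dask='parallelized',
    output_dtypes=[float],
)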
@aurelgriesser To speed up the wet-bulb calculations, I ended up precomputing wet-bulb temperature over a grid of reasonable temperature, pressure, and dewpoint values. The result is a 3D volume of calculated wet-bulb temperatures that can be queried for any combination of temperature, pressure, and dewpoint.
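For reference, here is a minimal sketch of that lookup-table idea using SciPy's RegularGridInterpolator. The grid bounds, resolutions, and sample query points below are illustrative assumptions, not the values from my actual run; a real table would use a finer grid.

import numpy as np
import metpy.calc
from metpy.units import units
from scipy.interpolate import RegularGridInterpolator

# 1. Coarse axes spanning the expected range of each input (illustrative).
press_axis = np.linspace(50000.0, 105000.0, 12)   # Pa
temp_axis = np.linspace(230.0, 330.0, 26)         # K
dewp_axis = np.linspace(-60.0, 40.0, 26)          # degC

# 2. Evaluate wet-bulb temperature once over the full 3D grid.
P, T, Td = np.meshgrid(press_axis, temp_axis, dewp_axis, indexing='ij')
Td = np.minimum(Td, T - 273.15)  # cap dewpoint at the air temperature to avoid unphysical cells
table = metpy.calc.wet_bulb_temperature(
    P * units.Pa, T * units.kelvin, Td * units.degC
).magnitude

# 3. Query the precomputed volume for arbitrary combinations of
#    pressure, temperature, and dewpoint (placeholder points shown).
lookup = RegularGridInterpolator((press_axis, temp_axis, dewp_axis), table)
p_points = np.array([101325.0, 85000.0])   # Pa
t_points = np.array([300.0, 285.0])        # K
td_points = np.array([20.0, 5.0])          # degC
wetbulb = lookup(np.column_stack([p_points, t_points, td_points]))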
Relevant packages in my Conda environment: