
Intermittent test failure (datacube geometry hashing for dask?) #123

Closed
benjimin opened this issue Sep 22, 2021 · 5 comments
Comments

@benjimin
Collaborator

benjimin commented Sep 22, 2021

Intermittent build test failure, where test_api.py::test_wit elicits a UnicodeDecodeError ("invalid start byte" for UTF-8) from pyproj.

The traceback passes through witprocess cal_area, dask core, and datacube geometry __hash__ and to_wkt.

Previously noted in #122 (comment)
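To see why a dask compute ends up calling CRS.to_wkt() at all: dask's scheduler probes every task argument with hash(), and datacube's GeoBox hash folds in the CRS, whose hash is the hash of its WKT string. A minimal sketch of that chain, where MiniCRS and MiniGeoBox are hypothetical stand-ins for the datacube.utils.geometry classes:

```python
class MiniCRS:
    """Hypothetical stand-in for datacube's CRS wrapper."""
    def __init__(self, wkt):
        self._wkt = wkt

    def to_wkt(self):
        return self._wkt  # delegates to pyproj in the real implementation

    def __hash__(self):
        return hash(self.to_wkt())  # as in _base.py:259

class MiniGeoBox:
    """Hypothetical stand-in for datacube's GeoBox."""
    def __init__(self, shape, crs):
        self.shape, self.crs = shape, crs

    def __hash__(self):
        return hash((*self.shape, self.crs))  # as in _base.py:1069

def ishashable(x):
    """Mirrors dask.core.ishashable: dask probes task args with hash()."""
    try:
        hash(x)
        return True
    except TypeError:
        return False

gbox = MiniGeoBox((1841, 1890), MiniCRS("EPSG:4326"))
```

So every dask graph execution that carries a geobox argument can trigger a pyproj to_wkt call, which is where the decode error surfaces.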

@benjimin benjimin changed the title Intermittent test failure (geometry hashing for dask?) Intermittent test failure (datacube geometry hashing for dask?) Sep 22, 2021
@benjimin
Collaborator Author

datacube_wps/processes/witprocess.py:60: in process_data
    re_wit = cal_area(aggregated)
datacube_wps/processes/witprocess.py:212: in cal_area
    re = pd.merge(re, ((aggregated.TCW > wet_threshold).astype('int')
/env/lib/python3.8/site-packages/xarray/core/dataarray.py:929: in load
    ds = self._to_temp_dataset().load(**kwargs)
/env/lib/python3.8/site-packages/xarray/core/dataset.py:865: in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
/env/lib/python3.8/site-packages/dask/base.py:570: in compute
    results = schedule(dsk, keys, **kwargs)
/env/lib/python3.8/site-packages/dask/threaded.py:79: in get
    results = get_async(
/env/lib/python3.8/site-packages/dask/local.py:517: in get_async
    raise_exception(exc, tb)
...
/env/lib/python3.8/site-packages/dask/core.py:122: in _execute_task
    elif not ishashable(arg):
/env/lib/python3.8/site-packages/dask/core.py:20: in ishashable
    hash(x)
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:1069: in __hash__
    return hash((*self.shape, self.crs, self.affine))
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:259: in __hash__
    return hash(self.to_wkt())
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:190: in to_wkt
    return self._crs.to_wkt(pretty=pretty, version=version)
pyproj/_crs.pyx:457: in pyproj._crs.Base.to_wkt
pyproj/_crs.pyx:120: in pyproj._crs._to_wkt
pyproj/_crs.pyx:24: in pyproj._crs.cstrdecode
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 3: invalid start byte
Full error log:
=================================== FAILURES ===================================
___________________________________ test_wit ___________________________________

    def test_wit():
        catalog = read_process_catalog("datacube-wps-config.yaml")
        wit_proc = [entry for entry in catalog if isinstance(entry, WIT)][0]
        poly = Geometry(
            {
                "type": "Polygon",
                "coordinates": [
                    [
                        (147.28271484375003, -35.89238773935897),
                        (147.03277587890628, -35.663990911348115),
                        (146.65237426757815, -35.90684930677119),
                        (147.09182739257815, -36.15894422111004),
                        (147.28271484375003, -35.89238773935897),
                    ]
                ],
            },
            crs=CRS("EPSG:4326"),
        )
>       results = wit_proc.query_handler(time=("2019-03-05", "2019-07-10"), feature=poly)

tests/test_api.py:88: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
datacube_wps/processes/__init__.py:65: in log_wrapper
    result = func(*args, **kwargs)
datacube_wps/processes/__init__.py:520: in query_handler
    df = self.process_data(data, {"time": time, "feature": feature, **parameters})
datacube_wps/processes/__init__.py:65: in log_wrapper
    result = func(*args, **kwargs)
datacube_wps/processes/witprocess.py:60: in process_data
    re_wit = cal_area(aggregated)
datacube_wps/processes/witprocess.py:212: in cal_area
    re = pd.merge(re, ((aggregated.TCW > wet_threshold).astype('int')
/env/lib/python3.8/site-packages/xarray/core/dataarray.py:929: in load
    ds = self._to_temp_dataset().load(**kwargs)
/env/lib/python3.8/site-packages/xarray/core/dataset.py:865: in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
/env/lib/python3.8/site-packages/dask/base.py:570: in compute
    results = schedule(dsk, keys, **kwargs)
/env/lib/python3.8/site-packages/dask/threaded.py:79: in get
    results = get_async(
/env/lib/python3.8/site-packages/dask/local.py:517: in get_async
    raise_exception(exc, tb)
/env/lib/python3.8/site-packages/dask/local.py:325: in reraise
    raise exc
/env/lib/python3.8/site-packages/dask/local.py:223: in execute_task
    result = _execute_task(task, data)
/env/lib/python3.8/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/env/lib/python3.8/site-packages/dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
/env/lib/python3.8/site-packages/dask/core.py:122: in _execute_task
    elif not ishashable(arg):
/env/lib/python3.8/site-packages/dask/core.py:20: in ishashable
    hash(x)
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:1069: in __hash__
    return hash((*self.shape, self.crs, self.affine))
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:259: in __hash__
    return hash(self.to_wkt())
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:190: in to_wkt
    return self._crs.to_wkt(pretty=pretty, version=version)
pyproj/_crs.pyx:457: in pyproj._crs.Base.to_wkt
    ???
pyproj/_crs.pyx:120: in pyproj._crs._to_wkt
    ???
pyproj/_crs.pyx:24: in pyproj._crs.cstrdecode
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

cstr = b'`4F\xc1\xbe\x7f'

    def pystrdecode(cstr):
        """
        Decode a string to a python string.
        """
        try:
>           return cstr.decode("utf-8")
E           UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 3: invalid start byte

/env/lib/python3.8/site-packages/pyproj/compat.py:23: UnicodeDecodeError
----------------------------- Captured stdout call -----------------------------
mask all touch True
query_handler self: <datacube_wps.processes.witprocess.WIT object at 0x7fbec793ab50>
query_handler time: ('2019-03-05', '2019-07-10')
query_handler feature: Geometry({'type': 'Polygon', 'coordinates': (((147.28271484375003, -35.89238773935897), (147.03277587890628, -35.663990911348115), (146.65237426757815, -35.90684930677119), (147.09182739257815, -36.15894422111004), (147.28271484375003, -35.89238773935897)),)}, CRS('EPSG:4326'))
process_data self: <datacube_wps.processes.witprocess.WIT object at 0x7fbec793ab50>
process_data data: <xarray.Dataset>
Dimensions:      (time: 8, y: 1841, x: 1890)
Coordinates:
  * time         (time) datetime64[ns] 2019-03-09T23:56:30.301619 ... 2019-06...
  * y            (y) float64 -3.976e+06 -3.976e+06 ... -4.031e+06 -4.031e+06
  * x            (x) float64 1.319e+06 1.319e+06 ... 1.376e+06 1.376e+06
    spatial_ref  int32 3577
Data variables:
    bs           (time, y, x) uint8 dask.array<chunksize=(1, 1841, 1890), meta=np.ndarray>
    pv           (time, y, x) uint8 dask.array<chunksize=(1, 1841, 1890), meta=np.ndarray>
    npv          (time, y, x) uint8 dask.array<chunksize=(1, 1841, 1890), meta=np.ndarray>
    TCW          (time, y, x) float32 dask.array<chunksize=(1, 1841, 1890), meta=np.ndarray>
    water        (time, y, x) int16 dask.array<chunksize=(1, 1841, 1890), meta=np.ndarray>
Attributes:
    crs:      EPSG:3577
process_data parameters: {'time': ('2019-03-05', '2019-07-10'), 'feature': Geometry(POLYGON ((147.28271484375 -35.89238773935897, 147.0327758789063 -35.66399091134812, 146.6523742675782 -35.90684930677119, 147.0918273925782 -36.15894422111004, 147.28271484375 -35.89238773935897)), EPSG:4326)}
feature in wit Geometry({'type': 'Polygon', 'coordinates': (((147.28271484375003, -35.89238773935897), (147.03277587890628, -35.663990911348115), (146.65237426757815, -35.90684930677119), (147.09182739257815, -36.15894422111004), (147.28271484375003, -35.89238773935897)),)}, CRS('EPSG:4326'))
geobox of data GeoBox(Geometry({'type': 'Polygon', 'coordinates': (((146.62347849895943, -35.70259916046566), (146.69357389976432, -36.19700984174693), (147.31910391574698, -36.1370963930901), (147.24608026203015, -35.64305237905194), (146.62347849895943, -35.70259916046566)),)}, CRS('EPSG:4326')))
polygon area 1744720

@benjimin
Collaborator Author

benjimin commented Sep 23, 2021

This failure is intermittent and definitely not deterministic. On different occurrences, the same error relates to a slightly different byte sequence.

cstr = b' \xf16\\\t\x7f'
...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf1 in position 1: invalid continuation byte

I'm not sure whether this is caused by some kind of thread race in what dask is doing, or whether it simply means there is a memory error (like a buffer overflow or similar corruption).

@woodcockr
Member

I’ve not looked at the code, but PROJ CRS objects are not threadsafe, and your error looks very much like that problem: you can’t create CRS objects and then pass them into a dask worker. You will need to modify the code to create the CRS object in the thread using it. I usually pass the CRS EPSG code in and then create the CRS in the code running on the worker.
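The pattern described above can be sketched with the standard library alone. FakeCRS is a hypothetical stand-in for pyproj.CRS (which, before pyproj 3.1, was not safe to share across threads); the point is that only a plain EPSG integer crosses the thread boundary, and the CRS object is constructed inside each worker:

```python
from concurrent.futures import ThreadPoolExecutor

class FakeCRS:
    """Hypothetical stand-in for pyproj.CRS; the real class should not
    be shared between threads on pyproj < 3.1."""
    def __init__(self, epsg):
        self.epsg = epsg

    def to_wkt(self):
        return f"EPSG:{self.epsg}"

def task(epsg_code):
    # Build the CRS inside the worker thread, so no CRS object ever
    # crosses a thread boundary.
    crs = FakeCRS(epsg_code)
    return crs.to_wkt()

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(task, [4326] * 8))
```

With the real pyproj, the equivalent would be passing the integer code into each dask task and calling CRS.from_epsg there, rather than closing over a shared CRS instance.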

@benjimin
Collaborator Author

benjimin commented Sep 23, 2021

It seems this was a known symptom of pyproj not being threadsafe (and had impacted other dask applications).

It looks like this was supposed to be mostly fixed in pyproj 3.1.0 earlier this year.

...and it looks like the build was using pyproj 2.6.1.

benjimin added a commit that referenced this issue Sep 23, 2021
To address #123 wish to use pyproj 3.1.0+
(i.e. make CRS threadsafe enough for dask datacube xarray operations)
@benjimin
Collaborator Author

Now, how to verify a fix of an intermittent problem...

I estimate the fault was previously occurring in around 10% of builds, so it could probably be forced to occur by wrapping the pytest invocation in a loop (say, 50 repeats), if further investigation is warranted.
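A minimal sketch of that repeat-until-failure idea; repeat_until_failure is a hypothetical helper, and the real command would be the pytest invocation (e.g. pytest tests/test_api.py::test_wit). At ~10% failure probability per run, 50 repeats should surface the fault with probability 1 - 0.9**50, about 99.5%:

```python
import subprocess
import sys

def repeat_until_failure(command, repeats=50):
    """Run `command` up to `repeats` times; return the run number of the
    first nonzero exit, or None if every run passed."""
    for i in range(1, repeats + 1):
        if subprocess.run(command).returncode != 0:
            return i
    return None

# Probability of reproducing an ~10%-per-run intermittent fault in 50 runs.
odds = 1 - 0.9 ** 50
```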

Tentatively closing as an upstream issue; build is currently using pyproj 3.2.1.

emmaai pushed a commit that referenced this issue Oct 26, 2021
To address #123 wish to use pyproj 3.1.0+
(i.e. make CRS threadsafe enough for dask datacube xarray operations)