load_ard() task graph creation limited by GeoBox/CRS hash function performance #1230

Closed
csiro-dave opened this issue Feb 16, 2022 · 13 comments · Fixed by #1242
@csiro-dave commented Feb 16, 2022

Expected behaviour

When I run load_ard() on a large region (the Murray-Darling Basin) using distributed dask, I would like to be able to create up to 500,000 tasks for my workflow. That is, I want to fully utilise the dask scheduler's capacity before parallelising the problem across multiple dask clusters, tiles, time slices, etc.

Actual behaviour

On my simplified problem, using dask chunking (1x10240x10240), producing the task graph (6,144 tasks) on the distributed dask cluster takes around 16 seconds. On a real problem, I haven't had the patience to wait for it to finish.

As suggested in the dask manual, I ran %prun over the persist() call to see what is consuming the resources. The step taking most of the time is the cull step of the task-graph optimisation. The cull step identifies tasks that don't need to be executed and removes them from the graph. When culling, dask stores the tasks it identifies in a set, and Python sets rely on object hashes to test membership efficiently, i.e. to work out which objects are used versus unused. In the simplified problem, however, calculating the hash of the CRS consumes most of the processing time (96%).
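A minimal sketch of why culling dominates, using a hypothetical SlowHash class as a stand-in for a GeoBox whose __hash__ calls into pyproj: every set-membership test during graph traversal pays the full cost of hashing the argument.

import time

class SlowHash:
    """Stand-in for a GeoBox whose __hash__ triggers CRS.to_wkt()."""
    def __hash__(self):
        time.sleep(0.005)  # simulates the ~5 ms pyproj to_wkt() call
        return 42

known_keys = {"dc_load_fmask-to_float"}
task_args = [SlowHash() for _ in range(1000)]
start = time.perf_counter()
for arg in task_args:
    _ = arg in known_keys  # each membership test re-hashes the argument
print(f"{time.perf_counter() - start:.1f}s")  # ~5 s of pure hashing overhead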

Why are CRS hashes part of the task graph? As children of the task dc_load_fmask-to_float, two lazy functions (datacube.api.core.fuse_lazy) take a GeoBox as an argument. A GeoBox is a composite object that includes a CRS, so when a GeoBox is hashed, the hash of its CRS is required.

Why is CRS hashing slow? The Open Data Cube CRS is a wrapper around the pyproj CRS, which is in turn a wrapper around PROJ. ODC treats a CRS as an immutable object defined by the value of its contents, and when calculating the hash it uses the hash of the WKT projection text. The underlying PROJ library can produce this text very quickly, but the pyproj call is much slower. I think it might be due to the overhead of transferring data from C++ to Python, or to creating a new proj_context for each new CRS object (to handle multi-threading issues).

The implementation seems well motivated, so I have not been able to come up with a solution. Perhaps during the step that generates multiple GeoBox objects, the CRS should be eagerly converted to WKT for the purposes of the hash, as in the sketch below.
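A minimal sketch of that suggestion, assuming a hypothetical wrapper class (not the actual datacube CRS implementation): compute the WKT once at construction and hash the cached string, so __hash__ never calls into pyproj.

from pyproj import CRS as _CRS

class CRS:
    """Hypothetical ODC-style CRS wrapper with an eager WKT cache."""
    def __init__(self, crs_spec):
        self._crs = _CRS.from_user_input(crs_spec)
        self._wkt = self._crs.to_wkt()  # one-off cost, paid at construction

    def to_wkt(self):
        return self._wkt

    def __hash__(self):
        return hash(self._wkt)  # plain string hash, no pyproj call

    def __eq__(self, other):
        return isinstance(other, CRS) and self._wkt == other._wkt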

Steps to reproduce the behaviour

I have attached a Jupyter notebook:
geobox_hash_performance.zip
The key lines are:

import datacube
from dea_tools.datahandling import load_ard  # load_ard() helper from the DEA tools package (import path may vary)

query = {
    # Site details
    'latitude': (-24.5856075286866, -37.6786722912282),
    'longitude': (138.566245999366, 152.491485595703),
    'time': ('1988-01-01', '1988-02-29'),
    'output_crs': 'EPSG:4326',
    'resolution': (-0.0002698, 0.0002698),
}

dc = datacube.Datacube(app='dc-LS-extract')

# Read in Landsat images and bands
bands = ['nbart_blue', 'nbart_green', 'nbart_red',
         'nbart_nir', 'nbart_swir_1', 'nbart_swir_2']

chunks = {'time': 1, 'latitude': 512 * 2 * 10, 'longitude': 512 * 2 * 10}

ds = load_ard(dc=dc,
              products=['ga_ls5t_ard_3', 'ga_ls7e_ard_3', 'ga_ls8c_ard_3'],
              measurements=bands, **query, group_by='solar_day',
              dask_chunks=chunks)

%prun ds.nbart_blue.persist()
773424 function calls (646389 primitive calls) in 15.597 seconds

Ordered by: internal time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
2664   15.096    0.006   15.128    0.006 {method 'to_wkt' of 'pyproj._crs.Base' objects}
52282/17594    0.056    0.000    3.474    0.000 utils.py:1669(stringify)
10315    0.042    0.000   10.197    0.001 core.py:159(keys_in_tasks)
3    0.035    0.012    5.145    1.715 optimization.py:429(fuse)

A small snippet to reproduce the pyproj performance (takes about 0.5 seconds):

from pyproj import CRS
import timeit

crs = CRS.from_epsg(4326)
print(timeit.timeit(lambda: crs.to_wkt(), number=100))

A small snippet isolating the underlying PROJ performance (takes about 6 milliseconds):

#include <iostream>
#include <chrono>
#include <proj.h>

int main(void)
{
    const char *wkt = nullptr;

    auto begin = std::chrono::steady_clock::now();
    PJ_CONTEXT *m_ctxt = proj_context_create();
    PJ *crs = proj_create_from_database(m_ctxt, "EPSG", "4326",
                                        PJ_CATEGORY_CRS, false, nullptr);
    const char *const options[] = {"MULTILINE=NO", nullptr};
    for (int i = 0; i < 100; i++)
    {
        wkt = proj_as_wkt(m_ctxt, crs, PJ_WKT1_GDAL, options);
    }
    auto end = std::chrono::steady_clock::now();

    std::cout << wkt << std::endl;
    std::cout << "Time difference = "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count()
              << " milliseconds" << std::endl;

    proj_destroy(crs);
    proj_context_destroy(m_ctxt);
}

Environment information

Open Data Cube core, version 1.8.6
pyproj info:
pyproj: 3.2.1
PROJ: 7.2.1
data dir: /usr/local/share/proj
user_data_dir: /home/jovyan/.local/share/proj

System:
python: 3.8.10 (default, Jun 2 2021, 10:49:15) [GCC 9.4.0]
executable: /env/bin/python
machine: Linux-4.14.256-197.484.amzn2.x86_64-x86_64-with-glibc2.29
Python deps:
certifi: 2021.10.08
pip: 21.3.1
setuptools: 58.5.3
Cython: 0.29.24

@csiro-dave (Author)

I hope this issue report is useful. Rob W said that Kirill888 was looking at refactoring the geometry code and might have some insights.

I have done a little more tracing of the Python/C++ calls. Calls to proj_as_wkt are slow because of overheads related to creating sqlite database contexts. (Trace image attached: proj_as_wkt_profile.)

@snowman2 (Contributor)

Related: pyproj4/pyproj#782

@Kirill888 (Member)

> I hope this issue report is useful. Rob W said that Kirill888 was looking at refactoring the geometry code and might have some insights.

@csiro-dave thank you for this detailed analysis. I'm currently working on extracting that part of datacube into a separate library (https://github.com/opendatacube/odc-geo). There has been some significant cleanup and refactoring in there. I will create an issue in that repo and address it there first.

We are already caching the pyproj CRS object as well as the EPSG code, so adding the WKT to that cache should be relatively straightforward:

@cachetools.cached({})
def _make_crs(crs_str: str) -> Tuple[_CRS, Optional[int]]:
    crs = _CRS.from_user_input(crs_str)
    return (crs, crs.to_epsg())
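A sketch of that extension (assuming the same helper name and cache shape as above, with the WKT simply added to the returned tuple; not the actual datacube code):

import cachetools
from typing import Optional, Tuple
from pyproj import CRS as _CRS

@cachetools.cached({})
def _make_crs(crs_str: str) -> Tuple[_CRS, Optional[int], str]:
    crs = _CRS.from_user_input(crs_str)
    # Pay the to_wkt() cost once per distinct CRS string; __hash__ can
    # then use the cached WKT without calling into pyproj.
    return (crs, crs.to_epsg(), crs.to_wkt())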

Another quick-to-implement option is to use the EPSG code for hashing when it is available and only fall back to WKT when it isn't. The current implementation is:

def __hash__(self) -> int:
    return hash(self.to_wkt())

If you can, modify the above to something like:

    def __hash__(self) -> int:
        if self._epsg is not None:
            return self._epsg
        return hash(self.to_wkt())

and rerun your tests and see if that helps.

@snowman2 thanks for the links, I didn't realize that CRS objects and transformers had threading constraints. We should probably switch to thread-local caches then.

@snowman2 (Contributor)

EPSG codes are likely worse than WKT. See #1223

I recommend using the srs attribute if you need performance: https://pyproj4.github.io/pyproj/stable/api/crs/crs.html#pyproj.crs.CRS.srs
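For illustration, a small sketch of what that looks like (the exact string returned by srs may vary between pyproj versions):

from pyproj import CRS

crs = CRS.from_epsg(4326)
# srs is the string the CRS was created from; reading it is a plain
# attribute access with no PROJ/WKT round-trip.
print(crs.srs)        # e.g. 'EPSG:4326'
print(hash(crs.srs))  # cheap, unlike hash(crs.to_wkt())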

@Kirill888 (Member)

> EPSG codes are likely worse than WKT. See #1223

Thanks for the info, but we always have them extracted at construction, and they are cached, unlike the WKT. In the dask context there is really only one CRS object, so this can be solved very well with caching. I don't think scenarios with a large number of different CRS objects are common, so we should try to improve caching first and then look at the pyproj context.

@snowman2 (Contributor)

> I didn't realize that CRS objects and transformers had threading constraints. We should probably switch to thread-local caches then.

They shouldn't have issues now: pyproj4/pyproj#782. It should create a new CRS object for each thread for you.

@Kirill888 (Member)

> They shouldn't have issues now: pyproj4/pyproj#782. It should create a new CRS object for each thread for you.

As in, one global pyproj.CRS object will create a thread-local internal object when accessed from a different thread than where it was originally created?

@snowman2 (Contributor) commented Feb 16, 2022

> As in, one global pyproj.CRS object will create a thread-local internal object when accessed from a different thread than where it was originally created?

Correct: pyproj4/pyproj#793

EDIT: requires pyproj 3.1+
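A quick sanity-check sketch of that behaviour (assumes pyproj >= 3.1): one CRS created on the main thread, used concurrently from worker threads.

from concurrent.futures import ThreadPoolExecutor
from pyproj import CRS

crs = CRS.from_epsg(4326)  # created on the main thread

with ThreadPoolExecutor(max_workers=4) as pool:
    # pyproj recreates the underlying PROJ object per thread internally,
    # so this is safe and every thread sees the same WKT.
    wkts = list(pool.map(lambda _: crs.to_wkt(), range(8)))

assert len(set(wkts)) == 1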

@csiro-dave (Author)

Thank you snowman2 and Kirill888! It looks like pyproj.set_use_global_context(True) makes a huge difference. There is an overhead to re-creating the database context at the PROJ level, and at the pyproj level the thread protections are perhaps a little aggressive if they need to create a new database context for every method call on an object. The example above that took 0.5 seconds now takes 0.001 seconds, and the 16-second persist() call now takes 0.7 seconds. Hooray!

import pyproj
from pyproj import CRS
import timeit

pyproj.set_use_global_context(True)
crs = CRS.from_epsg(4326)
print(timeit.timeit(lambda: crs.to_wkt(), number=100))

Some thoughts on the hash/equals of the CRS: I don't think GeoBox needs to include the CRS in its hash function. If two GeoBoxes have the same shape and transform, they are most likely the same, and in the rare case that they are different, equality will just be called a few extra times. So it might make sense to change

    def __hash__(self):
        return hash((*self._shape, self._crs, self._affine))

to

    def __hash__(self):
        return hash((*self._shape, self._affine))

I don't currently have an environment set up to test the datacube source changes, but they look like they could work too.

@Kirill888 (Member)

Regarding the hash change on the GeoBox class: good point, but maybe replace self._crs with self._crs.epsg; then hash collisions will only happen for non-EPSG geoboxes with the same shape/transform. crs.epsg is either None or an int, and is just a plain Python property lookup without calling into pyproj. A sketch is below.
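A sketch of that variant (hypothetical, mirroring the attribute names used in the snippets above):

    def __hash__(self):
        # crs.epsg is a cached plain property (None or int); no pyproj call.
        return hash((*self._shape, self._crs.epsg, self._affine))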

@RichardScottOZ (Contributor)

Thanks Alan, useful to know.

Kirill888 added a commit that referenced this issue Mar 8, 2022
This is mostly to address slow hashing issue #1230. But also tackles slow
construction described in #1222 without moving away from epsg representation as
a default when available.
@csiro-dave (Author)

Thanks Kirill888, snowman2
