Skip to content

Commit

Permalink
REF: Multithread safe Transformer (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
snowman2 authored Mar 19, 2021
1 parent 8a19f39 commit 95ec69f
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Change Log
3.1.0
-----
* DEP: Minimum supported Python version 3.7 (issue #790)
* REF: Multithread safe CRS (issue #782)
* REF: Multithread safe CRS, Proj, & Transformer (issue #782)
* BUG: Disallow NaN values with AreaOfInterest & BBox (issue #788)
* ENH: Pretty format PROJ string support (issue #764)
* ENH: Added :meth:`pyproj.transformer.Transformer.to_proj4` (pull #798)
Expand Down
8 changes: 4 additions & 4 deletions pyproj/proj.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import warnings
from typing import Any, Optional, Tuple, Type

from pyproj._transformer import Factors, _Transformer
from pyproj.compat import cstrencode, pystrdecode
from pyproj._transformer import Factors
from pyproj.compat import pystrdecode
from pyproj.crs import CRS
from pyproj.enums import TransformDirection
from pyproj.list import get_proj_operations_map
from pyproj.transformer import Transformer
from pyproj.transformer import Transformer, TransformerFromPipeline
from pyproj.utils import _convertback, _copytobuffer

pj_list = get_proj_operations_map()
Expand Down Expand Up @@ -131,7 +131,7 @@ def __init__(
projstring = self.crs.to_proj4() or self.crs.srs

self.srs = re.sub(r"\s\+?type=crs", "", projstring).strip()
super().__init__(_Transformer.from_pipeline(cstrencode(self.srs)))
super().__init__(TransformerFromPipeline(self.srs))

def __call__(
self,
Expand Down
140 changes: 132 additions & 8 deletions pyproj/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
"TransformerGroup",
"AreaOfInterest",
]
import threading
import warnings
from abc import ABC, abstractmethod
from array import array
from dataclasses import dataclass
from itertools import chain, islice
from pathlib import Path
from typing import Any, Iterable, Iterator, List, Optional, Tuple, Union
Expand All @@ -31,12 +34,102 @@
from pyproj.utils import _convertback, _copytobuffer


class TransformerMaker(ABC):
"""
.. versionadded:: 3.1
Base class for generating new instances
of the Cython _Transformer class for
thread safety in the Transformer class.
"""

@abstractmethod
def __call__(self) -> _Transformer:
"""
Returns
-------
_Transformer
"""
raise NotImplementedError


@dataclass(frozen=True)
class TransformerUnsafe(TransformerMaker):
"""
.. versionadded:: 3.1
Returns the original Cython _Transformer
and is not thread-safe.
"""

transformer: _Transformer

def __call__(self) -> _Transformer:
"""
Returns
-------
_Transformer
"""
return self.transformer


@dataclass(frozen=True)
class TransformerFromCRS(TransformerMaker):
"""
.. versionadded:: 3.1
Generates a Cython _Transformer class from input CRS data.
"""

crs_from: CRS
crs_to: CRS
skip_equivalent: bool
always_xy: bool
area_of_interest: Optional[AreaOfInterest]

def __call__(self) -> _Transformer:
"""
Returns
-------
_Transformer
"""
return _Transformer.from_crs(
self.crs_from._crs,
self.crs_to._crs,
skip_equivalent=self.skip_equivalent,
always_xy=self.always_xy,
area_of_interest=self.area_of_interest,
)


@dataclass(frozen=True)
class TransformerFromPipeline(TransformerMaker):
"""
.. versionadded:: 3.1
Generates a Cython _Transformer class from input pipeline data.
"""

proj_pipeline: str

def __call__(self) -> _Transformer:
"""
Returns
-------
_Transformer
"""
return _Transformer.from_pipeline(cstrencode(self.proj_pipeline))


class TransformerGroup(_TransformerGroup):
"""
The TransformerGroup is a set of possible transformers from one CRS to another.
.. versionadded:: 2.3.0
.. warning:: CoordinateOperation and Transformer objects
returned are not thread-safe.
From PROJ docs::
The operations are sorted with the most relevant ones first: by
Expand Down Expand Up @@ -86,7 +179,7 @@ def __init__(
area_of_interest=area_of_interest,
)
for iii, transformer in enumerate(self._transformers):
self._transformers[iii] = Transformer(transformer)
self._transformers[iii] = Transformer(TransformerUnsafe(transformer))

@property
def transformers(self) -> List["Transformer"]:
Expand Down Expand Up @@ -168,6 +261,18 @@ def __repr__(self) -> str:
)


class TransformerLocal(threading.local):
"""
Threading local instance for cython _Transformer class.
For more details, see:
https://github.com/pyproj4/pyproj/issues/782
"""

def __init__(self):
self.transformer = None # Initialises in each thread


class Transformer:
"""
The Transformer class is for facilitating re-using
Expand All @@ -180,14 +285,33 @@ class Transformer:
"""

def __init__(self, base_transformer: Optional[_Transformer] = None) -> None:
if not isinstance(base_transformer, _Transformer):
def __init__(
self,
transformer_maker: Union[TransformerMaker, None] = None,
) -> None:
if not isinstance(transformer_maker, TransformerMaker):
ProjError.clear()
raise ProjError(
"Transformer must be initialized using: "
"'from_crs', 'from_pipeline', or 'from_proj'."
)
self._transformer = base_transformer

self._local = TransformerLocal()
self._local.transformer = transformer_maker()
self._transformer_maker = transformer_maker

@property
def _transformer(self):
"""
The Cython _Transformer object for this thread.
Returns
-------
_Transformer
"""
if self._local.transformer is None:
self._local.transformer = self._transformer_maker()
return self._local.transformer

@property
def name(self) -> str:
Expand Down Expand Up @@ -370,9 +494,9 @@ def from_crs(
"""
return Transformer(
_Transformer.from_crs(
CRS.from_user_input(crs_from)._crs,
CRS.from_user_input(crs_to)._crs,
TransformerFromCRS(
CRS.from_user_input(crs_from),
CRS.from_user_input(crs_to),
skip_equivalent=skip_equivalent,
always_xy=always_xy,
area_of_interest=area_of_interest,
Expand All @@ -395,7 +519,7 @@ def from_pipeline(proj_pipeline: str) -> "Transformer":
Transformer
"""
return Transformer(_Transformer.from_pipeline(cstrencode(proj_pipeline)))
return Transformer(TransformerFromPipeline(proj_pipeline))

def transform(
self,
Expand Down
16 changes: 16 additions & 0 deletions test/test_proj.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import concurrent.futures
import math
import os
import sys
Expand Down Expand Up @@ -565,3 +566,18 @@ def test_radians():
proj(math.radians(-145.5), math.radians(1.0), radians=True),
(-5632642.22547495, 1636571.4883145525),
)


@pytest.mark.skipif(
pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe."
)
def test_proj_multithread():
# https://github.com/pyproj4/pyproj/issues/782
trans = Proj("EPSG:3857")

def transform(num):
return trans(1, 2)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for result in executor.map(transform, range(10)):
pass
34 changes: 34 additions & 0 deletions test/test_transformer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import os
from functools import partial
from glob import glob
Expand Down Expand Up @@ -1117,3 +1118,36 @@ def test_transformer_group__download_grids__directory(
],
any_order=True,
)


@pytest.mark.skipif(
pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe."
)
def test_transformer_multithread__pipeline():
# https://github.com/pyproj4/pyproj/issues/782
trans = Transformer.from_pipeline(
"+proj=pipeline +step +inv +proj=cart +ellps=WGS84 "
"+step +proj=unitconvert +xy_in=rad +xy_out=deg"
)

def transform(num):
return trans.transform(-2704026.010, -4253051.810, 3895878.820)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for result in executor.map(transform, range(10)):
pass


@pytest.mark.skipif(
pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe."
)
def test_transformer_multithread__crs():
# https://github.com/pyproj4/pyproj/issues/782
trans = Transformer.from_crs(4326, 3857)

def transform(num):
return trans.transform(1, 2)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for result in executor.map(transform, range(10)):
pass

0 comments on commit 95ec69f

Please sign in to comment.