From c7670ed3c84145be8a351a574b3c6eeffd647291 Mon Sep 17 00:00:00 2001 From: snowman2 Date: Wed, 17 Mar 2021 20:11:58 -0500 Subject: [PATCH] REF: Multithread safe Transformer --- docs/history.rst | 2 +- pyproj/proj.py | 8 +-- pyproj/transformer.py | 140 ++++++++++++++++++++++++++++++++++++--- test/test_proj.py | 16 +++++ test/test_transformer.py | 34 ++++++++++ 5 files changed, 187 insertions(+), 13 deletions(-) diff --git a/docs/history.rst b/docs/history.rst index 47b29825e..e5bd4c779 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -4,7 +4,7 @@ Change Log 3.1.0 ----- * DEP: Minimum supported Python version 3.7 (issue #790) -* REF: Multithread safe CRS (issue #782) +* REF: Multithread safe CRS, Proj, & Transformer (issue #782) * BUG: Disallow NaN values with AreaOfInterest & BBox (issue #788) * ENH: Pretty format PROJ string support (issue #764) * ENH: Added :meth:`pyproj.transformer.Transformer.to_proj4` (pull #798) diff --git a/pyproj/proj.py b/pyproj/proj.py index 4b9fca329..ccbb84b21 100644 --- a/pyproj/proj.py +++ b/pyproj/proj.py @@ -18,12 +18,12 @@ import warnings from typing import Any, Optional, Tuple, Type -from pyproj._transformer import Factors, _Transformer -from pyproj.compat import cstrencode, pystrdecode +from pyproj._transformer import Factors +from pyproj.compat import pystrdecode from pyproj.crs import CRS from pyproj.enums import TransformDirection from pyproj.list import get_proj_operations_map -from pyproj.transformer import Transformer +from pyproj.transformer import Transformer, TransformerFromPipeline from pyproj.utils import _convertback, _copytobuffer pj_list = get_proj_operations_map() @@ -131,7 +131,7 @@ def __init__( projstring = self.crs.to_proj4() or self.crs.srs self.srs = re.sub(r"\s\+?type=crs", "", projstring).strip() - super().__init__(_Transformer.from_pipeline(cstrencode(self.srs))) + super().__init__(TransformerFromPipeline(self.srs)) def __call__( self, diff --git a/pyproj/transformer.py b/pyproj/transformer.py index 6f00feb84..6dc2ab3ff 100644 --- a/pyproj/transformer.py +++ b/pyproj/transformer.py @@ -9,8 +9,11 @@ "TransformerGroup", "AreaOfInterest", ] +import threading import warnings +from abc import ABC, abstractmethod from array import array +from dataclasses import dataclass from itertools import chain, islice from pathlib import Path from typing import Any, Iterable, Iterator, List, Optional, Tuple, Union @@ -31,12 +34,102 @@ from pyproj.utils import _convertback, _copytobuffer +class TransformerMaker(ABC): + """ + .. versionadded:: 3.1 + + Base class for generating new instances + of the Cython _Transformer class for + thread safety in the Transformer class. + """ + + @abstractmethod + def __call__(self) -> _Transformer: + """ + Returns + ------- + _Transformer + """ + raise NotImplementedError + + +@dataclass(frozen=True) +class TransformerUnsafe(TransformerMaker): + """ + .. versionadded:: 3.1 + + Returns the original Cython _Transformer + and is not thread-safe. + """ + + transformer: _Transformer + + def __call__(self) -> _Transformer: + """ + Returns + ------- + _Transformer + """ + return self.transformer + + +@dataclass(frozen=True) +class TransformerFromCRS(TransformerMaker): + """ + .. versionadded:: 3.1 + + Generates a Cython _Transformer class from input CRS data. + """ + + crs_from: CRS + crs_to: CRS + skip_equivalent: bool + always_xy: bool + area_of_interest: Optional[AreaOfInterest] + + def __call__(self) -> _Transformer: + """ + Returns + ------- + _Transformer + """ + return _Transformer.from_crs( + self.crs_from._crs, + self.crs_to._crs, + skip_equivalent=self.skip_equivalent, + always_xy=self.always_xy, + area_of_interest=self.area_of_interest, + ) + + +@dataclass(frozen=True) +class TransformerFromPipeline(TransformerMaker): + """ + .. versionadded:: 3.1 + + Generates a Cython _Transformer class from input pipeline data. + """ + + proj_pipeline: str + + def __call__(self) -> _Transformer: + """ + Returns + ------- + _Transformer + """ + return _Transformer.from_pipeline(cstrencode(self.proj_pipeline)) + + class TransformerGroup(_TransformerGroup): """ The TransformerGroup is a set of possible transformers from one CRS to another. .. versionadded:: 2.3.0 + .. warning:: CoordinateOperation and Transformer objects + returned are not thread-safe. + From PROJ docs:: The operations are sorted with the most relevant ones first: by @@ -86,7 +179,7 @@ def __init__( area_of_interest=area_of_interest, ) for iii, transformer in enumerate(self._transformers): - self._transformers[iii] = Transformer(transformer) + self._transformers[iii] = Transformer(TransformerUnsafe(transformer)) @property def transformers(self) -> List["Transformer"]: @@ -168,6 +261,18 @@ def __repr__(self) -> str: ) +class TransformerLocal(threading.local): + """ + Threading local instance for cython _Transformer class. + + For more details, see: + https://github.com/pyproj4/pyproj/issues/782 + """ + + def __init__(self): + self.transformer = None # Initialises in each thread + + class Transformer: """ The Transformer class is for facilitating re-using @@ -180,14 +285,33 @@ class Transformer: """ - def __init__(self, base_transformer: Optional[_Transformer] = None) -> None: - if not isinstance(base_transformer, _Transformer): + def __init__( + self, + transformer_maker: Union[TransformerMaker, None] = None, + ) -> None: + if not isinstance(transformer_maker, TransformerMaker): ProjError.clear() raise ProjError( "Transformer must be initialized using: " "'from_crs', 'from_pipeline', or 'from_proj'." ) - self._transformer = base_transformer + + self._local = TransformerLocal() + self._local.transformer = transformer_maker() + self._transformer_maker = transformer_maker + + @property + def _transformer(self): + """ + The Cython _Transformer object for this thread. + + Returns + ------- + _Transformer + """ + if self._local.transformer is None: + self._local.transformer = self._transformer_maker() + return self._local.transformer @property def name(self) -> str: @@ -370,9 +494,9 @@ def from_crs( """ return Transformer( - _Transformer.from_crs( - CRS.from_user_input(crs_from)._crs, - CRS.from_user_input(crs_to)._crs, + TransformerFromCRS( + CRS.from_user_input(crs_from), + CRS.from_user_input(crs_to), skip_equivalent=skip_equivalent, always_xy=always_xy, area_of_interest=area_of_interest, @@ -395,7 +519,7 @@ def from_pipeline(proj_pipeline: str) -> "Transformer": Transformer """ - return Transformer(_Transformer.from_pipeline(cstrencode(proj_pipeline))) + return Transformer(TransformerFromPipeline(proj_pipeline)) def transform( self, diff --git a/test/test_proj.py b/test/test_proj.py index f0a90457e..09360784b 100644 --- a/test/test_proj.py +++ b/test/test_proj.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import concurrent.futures import math import os import sys @@ -565,3 +566,18 @@ def test_radians(): proj(math.radians(-145.5), math.radians(1.0), radians=True), (-5632642.22547495, 1636571.4883145525), ) + + +@pytest.mark.skipif( + pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe." +) +def test_proj_multithread(): + # https://github.com/pyproj4/pyproj/issues/782 + trans = Proj("EPSG:3857") + + def transform(num): + return trans(1, 2) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + for result in executor.map(transform, range(10)): + pass diff --git a/test/test_transformer.py b/test/test_transformer.py index 9e4435085..e45e7c2d0 100644 --- a/test/test_transformer.py +++ b/test/test_transformer.py @@ -1,3 +1,4 @@ +import concurrent.futures import os from functools import partial from glob import glob @@ -1117,3 +1118,36 @@ def test_transformer_group__download_grids__directory( ], any_order=True, ) + + +@pytest.mark.skipif( + pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe." +) +def test_transformer_multithread__pipeline(): + # https://github.com/pyproj4/pyproj/issues/782 + trans = Transformer.from_pipeline( + "+proj=pipeline +step +inv +proj=cart +ellps=WGS84 " + "+step +proj=unitconvert +xy_in=rad +xy_out=deg" + ) + + def transform(num): + return trans.transform(-2704026.010, -4253051.810, 3895878.820) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + for result in executor.map(transform, range(10)): + pass + + +@pytest.mark.skipif( + pyproj._datadir._USE_GLOBAL_CONTEXT, reason="Global Context not Threadsafe." +) +def test_transformer_multithread__crs(): + # https://github.com/pyproj4/pyproj/issues/782 + trans = Transformer.from_crs(4326, 3857) + + def transform(num): + return trans.transform(1, 2) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + for result in executor.map(transform, range(10)): + pass