diff --git a/pyresample/spherical.py b/pyresample/spherical.py
index f6c715072..3a7c04dc3 100644
--- a/pyresample/spherical.py
+++ b/pyresample/spherical.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2013 - 2021 Pyresample developers
+# Copyright (c) 2013 - 2022 Pyresample developers
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,29 +15,346 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Some generalized spherical functions.
+"""Spherical Geometry Classes for spherical computation à la Shapely sytle.
-base type is a numpy array of size (n, 2) (2 for lon and lats)
+Base type is a numpy array of size (n, 2) (2 for lon and lats)
+(lon,lat) units are expected as radians.
+import copy
import logging
+import warnings
import numpy as np
+import pyproj
+from shapely.geometry import MultiLineString # LineString
+from shapely.geometry import MultiPolygon, Polygon
+from pyresample.utils.shapely import (
+ _check_valid_extent,
+ _close_vertices,
+ _get_extent_from_vertices,
+ _unclose_vertices,
+ bounds_from_extent,
+ get_antimeridian_safe_line_vertices,
+ get_idx_antimeridian_crossing,
+ get_non_overlapping_list_extents,
+ remove_duplicated_vertices,
logger = logging.getLogger(__name__)
+EPSILON = 0.0000001
+# ----------------------------------------------------------------------------.
+def unwrap_longitude_degree(x, period=360):
+ """Unwrap longitude array."""
+ x = np.asarray(x)
+ mod = period / 2
+ return (x + mod) % (2 * mod) - mod
+def unwrap_radians(x, period=2 * np.pi):
+ """Unwrap radians array between -period/2 and period/2."""
+ x = np.asarray(x)
+ mod = period / 2
+ return (x + mod) % (2 * mod) - mod
+def _hdistance_matrix(vertices, ref_vertices):
+ """Compute a distance matrix between vertices using haverstine formula.
+ Vertices matrix must have shape (n,2) with (lon,lat) in radians !
+ The returned distance matrix has shape n_vertices x n_ref_vertices)
+ """
+ lon = vertices[:, 0]
+ lat = vertices[:, 1]
+ ref_lat = ref_vertices[:, 1]
+ diff_lon = lon[:, None] - ref_vertices[:, 0]
+ diff_lat = lat[:, None] - ref_vertices[:, 1]
+ d = np.sin(diff_lat / 2)**2 + np.cos(lat[:, None]) * np.cos(ref_lat) * np.sin(diff_lon / 2)**2
+ return d
+def get_list_extent_from_vertices(vertices, radians=False):
+ """Get list of extent for SExtent creation."""
+ vertices = _close_vertices(vertices)
+ cross_indices = get_idx_antimeridian_crossing(vertices, radians=radians)
+ if len(cross_indices) == 0:
+ return [_get_extent_from_vertices(vertices)]
+ else:
+ if radians:
+ vertices = np.rad2deg(vertices)
+ # Hereafter, computations are done in degrees
+ l_extent = []
+ list_vertices = get_antimeridian_safe_line_vertices(vertices)
+ list_vertices[0] = np.vstack((list_vertices[-1], list_vertices[0]))
+ del list_vertices[len(list_vertices) - 1]
+ l_extent = [_get_extent_from_vertices(v) for v in list_vertices]
+ # TODO If North Pole enclosing add 90 (or pi/2) to y_max
+ # TODO If South Pole enclosing, add -90 (or pi/2) to y_min
+ # Union the list of extents in order to be non-overlapping
+ if len(l_extent) > 1:
+ l_extent = get_non_overlapping_list_extents(l_extent)
+ # Backconversion to radians if asked
+ if radians:
+ l_extent = [np.deg2rad(ext) for ext in l_extent]
+ return l_extent
+def _polygon_contain_point(vertices, point):
+ # - Should not use/call a SPolygon method/class !
+ # - To be used in get_list_extent_from_vertices
+ # - To be called within a SPolygon.contains_point method
+ return None
+def _check_extent_topology_validity(list_extent):
+ p = MultiPolygon([Polygon.from_bounds(*bounds_from_extent(ext)) for ext in list_extent])
+ try:
+ p.intersects(p)
+ except Exception: # TODO: specify Error !!! Shapely TopologicalError
+ raise ValueError("The Sextent is not valid. The composing extents must not overlap each other.")
+def get_list_connected_pairs(pairs):
+ """Return a list of connected element pairs.
+ Given a list of pairs, it returns a list of connected pairs.
+ Example: [(1,0), (0,2), (4,5), (3,1)] --> [[1,0,2,3], [4,5]]
+ Taken from:
+ https://stackoverflow.com/questions/28980797/given-n-tuples-representing-pairs-return-a-list-with-connected-tuples
+ """
+ lists_by_element = {}
+ def make_new_list_for(x, y):
+ lists_by_element[x] = lists_by_element[y] = [x, y]
+ def add_element_to_list(lst, el):
+ lst.append(el)
+ lists_by_element[el] = lst
+ def merge_lists(lst1, lst2):
+ merged_list = lst1 + lst2
+ for el in merged_list:
+ lists_by_element[el] = merged_list
+ for x, y in pairs:
+ xList = lists_by_element.get(x)
+ yList = lists_by_element.get(y)
+ if not xList and not yList:
+ make_new_list_for(x, y)
+ if xList and not yList:
+ add_element_to_list(xList, y)
+ if yList and not xList:
+ add_element_to_list(yList, x)
+ if xList and yList and xList != yList:
+ merge_lists(xList, yList)
+ # return the unique lists present in the dictionary
+ return list(set(tuple(el) for el in lists_by_element.values()))
+def _cascade_polygon_union(list_polygons):
+ """Union list of SPolygon(s) togethers."""
+ import itertools
+ # Retrieve possible polygon intersections
+ n_polygons = len(list_polygons)
+ list_combo_pairs = list(itertools.combinations(range(n_polygons), 2))
+ idx_combo_pairs_inter = np.where([list_polygons[i].intersects(list_polygons[j]) for i, j in list_combo_pairs])[0]
+ # If no intersection, returns all polygons
+ if len(idx_combo_pairs_inter) == 0:
+ return SMultiPolygon(list_polygons)
+ # Otherwise, union intersecting polygons
+ list_combo_pairs_inter = [list_combo_pairs[i] for i in idx_combo_pairs_inter]
+ list_inter_indices = get_list_connected_pairs(list_combo_pairs_inter)
+ list_union = []
+ for inter_indices in list_inter_indices:
+ union_p = list_polygons[inter_indices[0]]
+ for i in inter_indices[1:]:
+ union_p = list_polygons[i].union(union_p) # This return always a SPolygon
+ list_union.append(union_p)
+ # Identify non-intersecting polygons
+ idx_poly_inter = np.unique(list_combo_pairs_inter)
+ idx_poly_non_inter = list(set(range(n_polygons)).difference(set(idx_poly_inter)))
+ # Add non-intersecting polygons
+ if len(idx_poly_non_inter) > 0:
+ _ = [list_union.append(list_polygons[i]) for i in idx_poly_non_inter]
-class SCoordinate(object):
+ # Return unioned polygon
+ return SMultiPolygon(list_union)
+class SExtent(object):
+ """Spherical Extent.
+ SExtent longitudes are defined between -180 and 180 degree.
+ A spherical geometry crossing the anti-meridian will have an SExtent
+ composed of [..., 180, ...,...] and [-180, ..., ..., ...]
+ Intersection between SExtents does not include touching extents !
+ The extents composing an SExtent can not intersect/overlap each other.
+ There is not an upper limit on the number of extents composing SExtent.
+ They just need to not overlap.
+ """
+ def __init__(self, *args):
+ list_extent = list(list(locals().values())[1:][0])
+ self.list_extent = list_extent
+ if len(list_extent) == 0:
+ raise ValueError("No argument passed to SExtent.")
+ if isinstance(list_extent[0], (int, float, str)):
+ raise TypeError("You need to pass [lon_min, lon_max, lat_min, lat_max] list(s) to SExtent.")
+ if list_extent[0] is None:
+ raise ValueError("SExtent does not accept None as input argument.")
+ if len(list_extent[0]) == 0:
+ raise ValueError("An empty extent passed to SExtent.")
+ # Check valid data range
+ list_extent = [_check_valid_extent(ext) for ext in list_extent]
+ # Check extents does not overlaps
+ _check_extent_topology_validity(self.list_extent)
+ # If wraps around the earth across all longitudes, specify a unique -180, 180 extent
+ # if np.any([(ext[0] == -180) & (ext[1] == 180) for ext in list_extent]):
+ # lat_min = min([ext[2] for ext in list_extent])
+ # lat_max = max([ext[3] for ext in list_extent])
+ # list_extent = [[-180, 180, lat_min, lat_max]]
+ self.list_extent = list_extent
+ def __str__(self):
+ """Get simplified representation of SExtent."""
+ return str(self.list_extent)
+ def __repr__(self):
+ """Get simplified representation of SExtent."""
+ return str(self.list_extent)
+ def __iter__(self):
+ """Get list_extent iterator."""
+ return self.list_extent.__iter__()
+ @property
+ def is_global(self):
+ """Check if the extent is global."""
+ if len(self.list_extent) != 1:
+ return False
+ if self.list_extent[0] == [-180, 180, -90, 90]:
+ return True
+ else:
+ return False
+ def intersects(self, other):
+ """Check if SExtent is intersecting the other SExtent.
+ Touching extent are considered to not intersect !
+ """
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.intersects() expects a SExtent class instance.")
+ bl = (self.to_shapely().intersects(other.to_shapely()) and
+ not self.to_shapely().touches(other.to_shapely()))
+ return bl
+ def disjoint(self, other):
+ """Check if SExtent does not intersect (and do not touch) the other SExtent."""
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.intersects() expects a SExtent class instance.")
+ return self.to_shapely().disjoint(other.to_shapely())
+ def within(self, other):
+ """Check if SExtent is within another SExtent."""
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.within() expects a SExtent class instance.")
+ return self.to_shapely().within(other.to_shapely())
+ def contains(self, other):
+ """Check if the SExtent contains another SExtent."""
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.contains() expects a SExtent class instance.")
+ return self.to_shapely().contains(other.to_shapely())
+ def touches(self, other):
+ """Check if SExtent external touches another SExtent."""
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.touches() expects a SExtent class instance.")
+ return self.to_shapely().touches(other.to_shapely())
+ def equals(self, other):
+ """Check if SExtent is equals to SExtent."""
+ if not isinstance(other, SExtent):
+ raise TypeError("SExtent.equals() expects a SExtent class instance.")
+ return self.to_shapely().equals(other.to_shapely())
+ def to_shapely(self):
+ """Return the shapely extent rectangle(s) polygon(s)."""
+ return MultiPolygon([Polygon.from_bounds(*bounds_from_extent(ext)) for ext in self.list_extent])
+ def plot(self, ax=None, facecolor='orange', edgecolor='black', alpha=0.4, **kwargs):
+ """Plot the SLine using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Retrieve shapely polygon
+ geom = self.to_shapely()
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Add extent polygon
+ ax.add_geometries([geom], crs=ccrs.PlateCarree(),
+ facecolor=facecolor, edgecolor=edgecolor, alpha=alpha,
+ **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines()
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
+class SPoint(object):
"""Spherical coordinates.
The ``lon`` and ``lat`` coordinates should be provided in radians.
def __init__(self, lon, lat):
self.lon = lon
self.lat = lat
+ @property
+ def vertices(self):
+ """Return SPoint vertices in a ndarray of shape [1,2]."""
+ return np.array([self.lon, self.lat])[None, :]
def cross2cart(self, point):
"""Compute the cross product, and convert to cartesian coordinates."""
lat1 = self.lat
@@ -65,7 +382,10 @@ def to_cart(self):
def distance(self, point):
- """Get distance using Vincenty formula."""
+ """Get distance using Vincenty formula.
+ The result must be multiplied by Earth radius to obtain distance in m or km.
+ """
dlambda = self.lon - point.lon
num = ((np.cos(point.lat) * np.sin(dlambda)) ** 2 +
(np.cos(self.lat) * np.sin(point.lat) -
@@ -77,7 +397,10 @@ def distance(self, point):
return np.arctan2(num ** .5, den)
def hdistance(self, point):
- """Get distance using Haversine formula."""
+ """Get distance using Haversine formula.
+ The result must be multiplied by Earth radius to obtain distance in m or km.
+ """
return 2 * np.arcsin((np.sin((point.lat - self.lat) / 2.0) ** 2.0 +
np.cos(point.lat) * np.cos(self.lat) *
np.sin((point.lon - self.lon) / 2.0) ** 2.0) ** .5)
@@ -102,6 +425,34 @@ def __iter__(self):
"""Get iterator over lon/lat pairs."""
return zip([self.lon, self.lat]).__iter__()
+ def plot(self, ax=None, color='blue', alpha=1, **kwargs):
+ """Plot the SPoint using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Plot Points
+ ax.scatter(x=np.rad2deg(self.vertices[:, 0]),
+ y=np.rad2deg(self.vertices[:, 1]),
+ color=color,
+ alpha=alpha, **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines()
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
class CCoordinate(object):
"""Cartesian coordinates."""
@@ -167,49 +518,92 @@ def __rmul__(self, other):
def to_spherical(self):
"""Convert to Spherical coordinate object."""
- return SCoordinate(np.arctan2(self.cart[1], self.cart[0]),
- np.arcsin(self.cart[2]))
-EPSILON = 0.0000001
+ return SPoint(np.arctan2(self.cart[1], self.cart[0]),
+ np.arcsin(self.cart[2]))
-def modpi(val, mod=np.pi):
- """Put *val* between -*mod* and *mod*."""
- return (val + mod) % (2 * mod) - mod
+class SArc(object):
+ """An arc of the great circle between two points.
-class Arc(object):
- """An arc of the great circle between two points."""
+ A GreatCircle Arc is defined as the shortest tracks between two points.
+ An arc is defined as the shortest tracks between two points.
+ """
def __init__(self, start, end):
self.start, self.end = start, end
+ def __hash__(self):
+ """Define SArc hash to enable LRU caching of arc.intersection."""
+ return hash((self.start.lon, self.start.lat, self.end.lon, self.end.lat))
def __eq__(self, other):
"""Check equality."""
if self.start == other.start and self.end == other.end:
- return 1
- return 0
+ return True
+ return False
def __ne__(self, other):
"""Check not equal comparison."""
return not self.__eq__(other)
def __str__(self):
- """Get simplified representation."""
+ """Get simplified representation in lat/lon degrees."""
return str(self.start) + " -> " + str(self.end)
def __repr__(self):
- """Get simplified representation."""
+ """Get simplified representation in lat/lon degrees."""
return str(self.start) + " -> " + str(self.end)
+ @property
+ def vertices(self):
+ """Get start SPoint and end SPoint vertices array."""
+ return self.start.vertices, self.end.vertices
+ def to_line(self):
+ """Convert to SLine."""
+ vertices = np.vstack(self.vertices)
+ return SLine(vertices)
+ def to_shapely(self):
+ """Convert to Shapely MultiLineString."""
+ return self.to_line().to_shapely()
+ def plot(self, ax=None, edgecolor='black', alpha=0.4, **kwargs):
+ """Plot the SArc using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Retrieve shapely polygon
+ geom = self.to_shapely()
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Plot polygon
+ ax.add_geometries([geom], crs=ccrs.Geodetic(),
+ facecolor='none',
+ edgecolor=edgecolor, alpha=alpha,
+ **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines() # ax.add_feature(cfeature.COASTLINE.with_scale('110m'), linewidth=0.75)
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
def angle(self, other_arc):
- """Oriented angle between two arcs.
+ """Get oriented angle between two consecutive arcs.
Angle in radians. A straight line will be 0. A clockwise path
will be a negative angle and counter-clockwise will be positive.
if self.start == other_arc.start:
a__ = self.start
@@ -247,18 +641,22 @@ def angle(self, other_arc):
return angle
- def intersections(self, other_arc):
+ def _great_circle_intersections(self, other_arc):
"""Give the two intersections of the greats circles defined by the current arc and *other_arc*.
From http://williams.best.vwh.net/intersect.htm
if self.end.lon - self.start.lon > np.pi:
+ self = copy.deepcopy(self)
self.end.lon -= 2 * np.pi
if other_arc.end.lon - other_arc.start.lon > np.pi:
+ other_arc = copy.deepcopy(other_arc)
other_arc.end.lon -= 2 * np.pi
if self.end.lon - self.start.lon < -np.pi:
+ self = copy.deepcopy(self)
self.end.lon += 2 * np.pi
if other_arc.end.lon - other_arc.start.lon < -np.pi:
+ other_arc = copy.deepcopy(other_arc)
other_arc.end.lon += 2 * np.pi
ea_ = self.start.cross2cart(self.end).normalize()
@@ -269,8 +667,8 @@ def intersections(self, other_arc):
np.sqrt(cross.cart[0] ** 2 + cross.cart[1] ** 2))
lon = np.arctan2(cross.cart[1], cross.cart[0])
- return (SCoordinate(lon, lat),
- SCoordinate(modpi(lon + np.pi), -lat))
+ return (SPoint(lon, lat),
+ SPoint(unwrap_radians(lon + np.pi), -lat))
def intersects(self, other_arc):
"""Check if the current arc and the *other_arc* intersect.
@@ -279,73 +677,349 @@ def intersects(self, other_arc):
return bool(self.intersection(other_arc))
- def intersection(self, other_arc):
- """Return where, if the current arc and the *other_arc* intersect.
+ def intersection_point(self, other_arc):
+ """Compute the intersection point between two arcs.
- None is returned if there is not intersection. An arc is defined
- as the shortest tracks between two points.
+ If arc and *other_arc* intersect, it returns the intersection SPoint.
+ If arc and *other_arc* does not intersect, it returns None.
if self == other_arc:
return None
+ great_circles_intersection_spoints = self._great_circle_intersections(other_arc)
- for i in self.intersections(other_arc):
- a__ = self.start
- b__ = self.end
- c__ = other_arc.start
- d__ = other_arc.end
+ for spoint in great_circles_intersection_spoints:
+ a = self.start
+ b = self.end
+ c = other_arc.start
+ d = other_arc.end
- ab_ = a__.hdistance(b__)
- cd_ = c__.hdistance(d__)
+ ab_dist = a.hdistance(b)
+ cd_dist = c.hdistance(d)
+ ap_dist = a.hdistance(spoint)
+ bp_dist = b.hdistance(spoint)
+ cp_dist = c.hdistance(spoint)
+ dp_dist = d.hdistance(spoint)
+ if (((spoint in (a, b)) or (abs(ap_dist + bp_dist - ab_dist) < EPSILON)) and
+ ((spoint in (c, d)) or (abs(cp_dist + dp_dist - cd_dist) < EPSILON))):
+ return spoint
- if(((i in (a__, b__)) or
- (abs(a__.hdistance(i) + b__.hdistance(i) - ab_) < EPSILON)) and
- ((i in (c__, d__)) or
- (abs(c__.hdistance(i) + d__.hdistance(i) - cd_) < EPSILON))):
- return i
return None
- def get_next_intersection(self, arcs, known_inter=None):
- """Get the next intersection between the current arc and *arcs*."""
- res = []
+ def intersection(self, other_arc):
+ """Give the two intersections of the greats circles defined by the current arc and *other_arc*.
+ Use _great_circle_intersections instead.
+ """
+ warnings.warn("'SArc.intersection' is deprecated, use 'intersection_point' instead.",
+ PendingDeprecationWarning)
+ return self.intersection_point(other_arc)
+ def intersections(self, other_arc):
+ """Compute the intersection point between two arcs.
+ Use intersection_point instead.
+ """
+ warnings.warn("'SArc.intersections' is deprecated, use '_great_circle_intersections' instead.",
+ PendingDeprecationWarning)
+ return self._great_circle_intersections(other_arc)
+ def get_next_intersection(self, arcs, known_intersection_spoint=None):
+ """Get the next intersection between the current arc and *arcs*.
+ It return a tuple with the intersecting point and the arc within *arcs*
+ that intersect the self arc.
+ """
+ list_intersection_spoint_arc = []
for arc in arcs:
- inter = self.intersection(arc)
- if (inter is not None and
- inter != arc.end and
- inter != self.end):
- res.append((inter, arc))
+ spoint = self.intersection_point(arc)
+ if (spoint is not None and spoint != arc.end and spoint != self.end):
+ list_intersection_spoint_arc.append((spoint, arc))
def dist(args):
"""Get distance key."""
return self.start.distance(args[0])
take_next = False
- for inter, arc in sorted(res, key=dist):
- if known_inter is not None:
- if known_inter == inter:
+ for spoint, arc in sorted(list_intersection_spoint_arc, key=dist):
+ if known_intersection_spoint is not None:
+ if known_intersection_spoint == spoint:
take_next = True
elif take_next:
- return inter, arc
+ return spoint, arc
- return inter, arc
+ return spoint, arc
return None, None
+ def midpoint(self, ellips='WGS84'):
+ """Return the SArc midpoint coordinate."""
+ geod = pyproj.Geod(ellps=ellips)
+ lon_start = self.start.lon
+ lon_end = self.end.lon
+ lat_start = self.start.lat
+ lat_end = self.end.lat
+ lon_mid, lat_mid = geod.npts(lon_start, lat_start, lon_end, lat_end, npts=1, radians=True)[0]
+ return SPoint(lon_mid, lat_mid)
+ def segmentize(self, npts=0, max_distance=0, ellips='WGS84'):
+ """Segmentize the spherical SArc.
-class SphPolygon:
- """Spherical polygon.
+ It returns an SLine.
- Represents a polygon on a spherical geoid. Initialise with
- an ndarray of shape ``[N, 2]`` where the first column contains longitudes
- and the second column contains latitudes. The units should be in radians.
- The inside of the polygon is defined by the vertices being defined clockwise
- around it.
+ npts or max_distance are mutually exclusively. Specify one of them.
+ max_distance must be provided in kilometers.
+ """
+ if npts != 0:
+ npts = npts + 2 # + 2 to account for initial and terminus
+ geod = pyproj.Geod(ellps=ellips)
+ lon_start = self.start.lon
+ lon_end = self.end.lon
+ lat_start = self.start.lat
+ lat_end = self.end.lat
+ points = geod.inv_intermediate(lon_start, lat_start, lon_end, lat_end,
+ del_s=max_distance,
+ npts=npts,
+ radians=True,
+ initial_idx=0, terminus_idx=0)
+ lons, lats = (points.lons, points.lats)
+ lons = np.asarray(lons)
+ lats = np.asarray(lats)
+ vertices = np.stack((lons, lats)).T
+ return SLine(vertices)
+class SLine(object):
+ """A spherical line composed of great circle arcs."""
+ def __init__(self, vertices):
+ """Initialise SLine object.
+ Parameters
+ ----------
+ vertices : np.ndarray
+ Array of shape ``[N, 2]`` with ``N`` points describing a line.
+ The first column describes longitudes, the second the latitudes.
+ Units should be in radians.
+ """
+ # Check vertices shape is correct
+ if not isinstance(vertices, np.ndarray):
+ raise TypeError("SLine expects a numpy ndarray.")
+ vertices = remove_duplicated_vertices(vertices)
+ if len(vertices.shape) != 2 or vertices.shape[1] != 2 or vertices.shape[0] < 2:
+ raise ValueError("SLine expects a numpy ndarray with shape (>=2, 2).")
+ # Ensure vertices precision
+ vertices = vertices.astype(np.float64, copy=False)
+ # Define SLine composing arcs
+ lon = vertices[:, 0]
+ lat = vertices[:, 1]
+ list_arcs = []
+ for i in range(len(lon) - 1):
+ list_arcs.append(SArc(SPoint(lon[i], lat[i]),
+ SPoint(lon[i + 1], lat[i + 1])))
+ self.vertices = vertices
+ self.lon = lon
+ self.lat = lat
+ self._list_arcs = list_arcs
+ def __getitem__(self, i):
+ """Subset SLine.
+ If an integer is provided, it return the corresponding SArc.
+ If using slices, it return the SLine subset.
+ """
+ if isinstance(i, int):
+ return self._list_arcs[i]
+ if isinstance(i, slice): # i.e. sline[1:5]
+ n_indices = i.stop - i.start
+ if n_indices < 2:
+ raise ValueError("Keep at least 2 SArc when subsetting SLine.")
+ import copy
+ i = slice(i.start, i.stop + 1, i.step)
+ self = copy.copy(self)
+ self.vertices = self.vertices[i, :]
+ self.lon = self.lon[i]
+ self.lat = self.lat[i]
+ self._list_arcs = self._list_arcs[i]
+ return self
+ else:
+ raise TypeError("Either subset the SLine with an integer "
+ "(to get an SArc) or with a slice to retrieve "
+ "a subsetted SLine.")
- The optional second argument ``radius`` indicates the radius of the
- spherical geoid on which calculations occur.
+ def __len__(self):
+ """Return the number of SArc(s) composing SLine."""
+ return len(self._list_arcs)
+ def __iter__(self):
+ """Return iterator of SLine composing SArc(s)."""
+ return self._list_arcs.__iter__()
+ def get_parallel_lines(self, distance=0, ellips="WGS84"):
+ """Return 2 SLine(s) at a given distance parallel to the current SLine.
+ The distance should be provided in km.
+ """
+ if distance <= 0:
+ raise ValueError("SLine.get_parallel_lines expects a "
+ "distance in km larger than 0.")
+ geod = pyproj.Geod(ellps=ellips)
+ lon_start = self.vertices[0:-1, 0]
+ lat_start = self.vertices[0:-1, 1]
+ lon_end = self.vertices[1:, 0]
+ lat_end = self.vertices[1:, 1]
+ # Retrieve forward azimuths
+ az12, _, _ = geod.inv(lon_start, lat_start,
+ lon_end, lat_end,
+ radians=True)
+ # Define orthogonal azimuth
+ az12_top = az12 - np.pi / 2
+ az12_bottom = az12 + np.pi / 2
+ # Include last vertex to maintain the same number of vertices
+ lon_start = np.append(lon_start, self.vertices[-1, 0])
+ lat_start = np.append(lat_start, self.vertices[-1, 1])
+ az12_top = np.append(az12_top, az12_top[-1])
+ az12_bottom = np.append(az12_bottom, az12_bottom[-1])
+ # Retrieve the point coordinate at dist distance and forward azimuth
+ dist = np.ones(az12_top.shape) * distance * 1000 # convert km to m
+ lon_top, lat_top, _ = geod.fwd(lon_start, lat_start,
+ az12_top,
+ dist=dist,
+ radians=True)
+ lon_bottom, lat_bottom, _ = geod.fwd(lon_start, lat_start,
+ az12_bottom,
+ dist=dist,
+ radians=True)
+ # Assemble line vertices
+ top_vertices = np.stack((lon_top, lat_top)).T
+ bottom_vertices = np.stack((lon_bottom, lat_bottom)).T
+ return (SLine(top_vertices), SLine(bottom_vertices))
+ def buffer(self, distance, ellips="WGS84"):
+ """Return a SPolygon buffering on both sides of SLine.
+ The distance must be provided in kilometers.
+ """
+ left_sline, right_sline = self.get_parallel_lines(distance=distance,
+ ellips=ellips)
+ # Assemble polygon vertices
+ polygon_vertices = np.vstack((left_sline.vertices,
+ right_sline.vertices[::-1, :]))
+ polygon_vertices = np.vstack((polygon_vertices,
+ polygon_vertices[0, :]))
+ return SPolygon(polygon_vertices)
+ def segmentize(self, npts=0, max_distance=0, ellips='WGS84'):
+ """Subdivide each SArc of SLine in n steps."""
+ list_vertices = [arc.segmentize(npts=npts,
+ max_distance=max_distance,
+ ellips=ellips).vertices[:-1, :] for arc in self._list_arcs[0:-1]]
+ list_vertices.append(self._list_arcs[-1].segmentize(npts=npts,
+ max_distance=max_distance,
+ ellips=ellips).vertices)
+ vertices = np.vstack(list_vertices)
+ return SLine(vertices)
+ def intersects(self, other):
+ """Return True if two SLine intersects each other."""
+ if not isinstance(other, SLine):
+ raise TypeError("SLine.intersects expects an SLine.")
+ for arc1 in self._list_arcs:
+ for arc2 in other:
+ spoint = arc1.intersection_point(arc2)
+ if spoint is not None:
+ return True
+ return False
+ def intersection_points(self, other):
+ """Return the intersection points between two SLines."""
+ if not isinstance(other, SLine):
+ raise TypeError("SLine.intersection_points expects an SLine.")
+ list_spoints = []
+ for arc1 in self._list_arcs:
+ for arc2 in other:
+ spoint = arc1.intersection_point(arc2)
+ if spoint is not None:
+ list_spoints.append(spoint)
+ if len(list_spoints) == 0:
+ list_spoints = None
+ return list_spoints
+ def to_shapely(self):
+ """Convert to Shapely MultiLineString."""
+ list_vertices = get_antimeridian_safe_line_vertices(np.rad2deg(self.vertices))
+ return MultiLineString(list_vertices)
+ def _repr_svg_(self):
+ """Display SLine in the Ipython terminal."""
+ return self.to_shapely()._repr_svg_()
+ def plot(self, ax=None, edgecolor='black', alpha=0.4, **kwargs):
+ """Plot the SLine using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Retrieve shapely polygon
+ geom = self.to_shapely()
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Plot polygon
+ ax.add_geometries([geom], crs=ccrs.Geodetic(),
+ facecolor='none',
+ edgecolor=edgecolor, alpha=alpha,
+ **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines()
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
+class SPolygon:
+ """Spherical Polygon.
+ A Spherical Polygon is a collection of points, each of which
+ are connected by edges which consist of arcs of great circles.
+ The points are ordered clockwise, such that if you walk along
+ the surface of the earth along the polygon's edges, the
+ 'inside' of the polygon will lie to your right.
+ Spherical Polygon is initialised with a polygon vertices
+ ndarray of shape ``[N, 2]``.
+ The first column must contains longitudes, the second column
+ must contains latitudes.
+ The units must be in radians.
+ The inside of the polygon is defined by the vertices
+ being defined clockwise around it.
+ No check is performed to assess vertices order !
+ The Polygon is valid only if is not self-intersecting with itself.
+ If the last vertex is equal to the first, the last vertex
+ is removed automatically.
- def __init__(self, vertices, radius=1):
+ def __init__(self, vertices, check_validity=False):
"""Initialise SphPolygon object.
@@ -353,38 +1027,96 @@ def __init__(self, vertices, radius=1):
points describing a polygon clockwise. First column
describes longitudes, second column describes latitudes. Units
should be in radians.
- radius (optional, number): Radius of spherical planet.
- self.vertices = vertices.astype(np.float64, copy=False)
+ # Check vertices shape is correct
+ if not isinstance(vertices, np.ndarray):
+ raise TypeError("SPolygon expects a numpy ndarray.")
+ if len(vertices.shape) != 2 or vertices.shape[1] != 2:
+ raise ValueError("SPolygon expects an array of shape (n, 2).")
+ if vertices.shape[0] < 3:
+ raise ValueError("SPolygon expects an array of shape (>=3, 2).")
+ # Check last vertices is not equal to first
+ vertices = _unclose_vertices(vertices)
+ # Remove duplicated vertices
+ vertices = remove_duplicated_vertices(vertices, tol=1e-08)
+ # Check it has at least 3 vertices
+ if vertices.shape[0] < 3:
+ raise ValueError("This is likely an error caused by current tol "
+ "argument in remove_duplicated_vertices. "
+ "Please report issue.")
+ # Define SExtent
+ self.extent = SExtent(*get_list_extent_from_vertices(np.rad2deg(vertices),
+ radians=False))
+ # Define vertices
+ vertices = vertices.astype(np.float64, copy=False) # important !
+ self.vertices = vertices
self.lon = self.vertices[:, 0]
self.lat = self.vertices[:, 1]
- self.radius = radius
- self.cvertices = np.array([np.cos(self.lat) * np.cos(self.lon),
- np.cos(self.lat) * np.sin(self.lon),
- np.sin(self.lat)]).T * radius
- self.x__ = self.cvertices[:, 0]
- self.y__ = self.cvertices[:, 1]
- self.z__ = self.cvertices[:, 2]
+ # Add geoms for compatibility with SMultiPolygon
+ self.geoms = [self]
+ # Check for not self-intersection (valid geometry)
+ # - This function is quite slow
+ if check_validity:
+ if not self.is_valid_geometry:
+ raise ValueError("The provided vertices cause an invalid "
+ "self-intersecting polygon.")
+ @property
+ def is_valid_geometry(self):
+ """Check that there are not self intersections."""
+ arcs = [edge for edge in self.aedges()]
+ list_edges_intersection_spoints = self._get_all_arc_intersection_points(arcs, arcs)
+ if len(list_edges_intersection_spoints) == 0:
+ return True
+ else:
+ return False
+ def __getitem__(self, i):
+ """Get the SPolygon.
+ Defined for compatibility with SMultiPolygon. The only valid i value is 0.
+ """
+ return self.geoms[i]
+ def __len__(self):
+ """Get the number of SPolygon.
+ Defined for compatibility with SMultiPolygon. It returns 1.""
+ """
+ return len(self.geoms)
+ def __iter__(self):
+ """Get an iterator returning the SPolygon.
+ Defined for compatibility with SMultiPolygon. It returns SPolygon.""
+ """
+ return self.geoms.__iter__()
+ def __str__(self):
+ """Get numpy representation of vertices."""
+ return str(np.rad2deg(self.vertices))
def invert(self):
"""Invert the polygon."""
self.vertices = np.flipud(self.vertices)
- self.cvertices = np.flipud(self.cvertices)
self.lon = self.vertices[:, 0]
self.lat = self.vertices[:, 1]
- self.x__ = self.cvertices[:, 0]
- self.y__ = self.cvertices[:, 1]
- self.z__ = self.cvertices[:, 2]
def inverse(self):
"""Return an inverse of the polygon."""
- return SphPolygon(np.flipud(self.vertices), radius=self.radius)
+ return SPolygon(np.flipud(self.vertices))
def aedges(self):
"""Get generator over the edges, in arcs of Coordinates."""
for (lon_start, lat_start), (lon_stop, lat_stop) in self.edges():
- yield Arc(SCoordinate(lon_start, lat_start),
- SCoordinate(lon_stop, lat_stop))
+ yield SArc(SPoint(lon_start, lat_start),
+ SPoint(lon_stop, lat_stop))
def edges(self):
"""Get generator over the edges, in geographical coordinates."""
@@ -392,33 +1124,33 @@ def edges(self):
yield (self.lon[i], self.lat[i]), (self.lon[i + 1], self.lat[i + 1])
yield (self.lon[i + 1], self.lat[i + 1]), (self.lon[0], self.lat[0])
- def area(self):
- """Find the area of a polygon.
+ def _area(self):
+ """Find the area of the polygon in units of square radii.
+ Note that the earth is not exactly spherical.
+ The equatorial radius is 6378 km, while the polar radius is 6357 km.
+ For more accurate area computations, use the area() function.
The inside of the polygon is defined by having the vertices enumerated
clockwise around it.
+ A polygon containing the entire planet would have area 4π.
- Uses the algorithm described in [bev1987]_.
- .. [bev1987] , Michael Bevis and Greg Cambareri,
+ Uses the algorithm described in [bev1987].
+ .. Michael Bevis and Greg Cambareri,
"Computing the area of a spherical polygon of arbitrary shape",
in *Mathematical Geology*, May 1987, Volume 19, Issue 4, pp 335-346.
- Note: The article mixes up longitudes and latitudes in equation 3! Look
- at the fortran code appendix for the correct version.
+ Note: The article mixes up longitudes and latitudes in equation 3!
+ Look at the fortran code appendix for the correct version.
+ The units are the square of the radius passed to the constructor.
- The units are the square of the radius passed to the constructor. For
- example, to calculate the area in km² of a polygon near the equator of a
+ For example, to calculate the area in km² of a polygon near the equator of a
spherical planet with a radius of 6371 km (similar to Earth):
- >>> pol = SphPolygon(np.deg2rad(np.array([[0., 0.], [0., 1.], [1., 1.], [1., 0.]])),
- radius=6371)
- >>> print(pol.area())
+ >>> radius = 6371 # [km]
+ >>> pol = SPolygon(np.deg2rad(np.array([[0., 0.], [0., 1.], [1., 1.], [1., 0.]])))
+ >>> print(pol.area()*radius**2 # [km²]
- If `SphPolygon` was constructed without passing any units, the result
- has units of square radii (i.e., the polygon containing the entire
- planet would have area 4π).
phi_a = self.lat
phi_p = self.lat.take(np.arange(len(self.lat)) + 1, mode="wrap")
@@ -440,131 +1172,702 @@ def area(self):
alpha = new_lons_a - new_lons_b
alpha[alpha < 0] += 2 * np.pi
- return (sum(alpha) - (len(self.lon) - 2) * np.pi) * self.radius ** 2
+ return (sum(alpha) - (len(self.lon) - 2) * np.pi)
- def _bool_oper(self, other, sign=1):
- """Perform a boolean operation on this and *other* polygons.abs.
+ def area(self, ellps="WGS84"):
+ """Calculate the polygon area [in km²] using pyproj.
- By default, or when sign is 1, the union is perfomed. If sign is -1,
- the intersection of the polygons is returned.
+ ellps argument enable to specify custom ellipsoid for accurate area calculation.
+ """
+ geod = pyproj.Geod(ellps=ellps) # "sphere", maybe be more flexibile?
+ pyproj_area, _ = geod.polygon_area_perimeter(self.vertices[::-1, 0],
+ self.vertices[::-1, 1],
+ radians=True)
+ return pyproj_area
+ def perimeter(self, ellps="WGS84"):
+ """Calculate the polygon perimeter [in km] using pyproj.
- The algorithm works this way: Find an intersection between the two
- polygons. If none can be found, then the two polygons are either not
- overlapping, or one is entirely included in the other. Otherwise,
- follow the edges of a polygon until another intersection is
- encountered, at which point you start following the edges of the other
- polygon, and so on until you come back to the first intersection. In
- which direction to follow the edges of the polygons depends if you are
- interested in the union or the intersection of the two polygons.
+ ellps argument enable to specify custom ellipsoid for accurate perimeter calculation.
- def rotate_arcs(start_arc, arcs):
- idx = arcs.index(start_arc)
- return arcs[idx:] + arcs[:idx]
+ geod = pyproj.Geod(ellps=ellps) # "sphere", maybe be more flexibile?
+ _, pyproj_perimeter = geod.polygon_area_perimeter(self.vertices[::-1, 0],
+ self.vertices[::-1, 1],
+ radians=True)
+ return pyproj_perimeter
+ def equals_exact(self, other, rtol=1e-05, atol=1e-08):
+ """Check if self SPolygon is exactly equal to the other SPolygon.
+ This method uses exact coordinate equality, which requires coordinates
+ to be equal (within specified tolerance) and in the same order.
+ This is in contrast with the ``equals`` function which checks for spatial
+ (topological) equality.
+ """
+ if not isinstance(other, (SPolygon, SMultiPolygon, type(None))):
+ raise NotImplementedError
+ if isinstance(other, SMultiPolygon) or isinstance(other, type(None)):
+ return False
+ if (self.vertices.shape == other.vertices.shape and
+ np.allclose(self.vertices, other.vertices, rtol=rtol, atol=atol)):
+ return True
+ else:
+ return False
- arcs1 = [edge for edge in self.aedges()]
- arcs2 = [edge for edge in other.aedges()]
+ def _is_inside(self, other):
+ """Check if the polygon is entirely inside (not touching) the other.
+ This method must be used only if no intersection between self and other !
+ """
+ # Check first if extent is within
+ if not self.extent.within(other.extent):
+ return False
- nodes = []
+ # --------------------------------------------------------------------.
+ # Define antipodal points of first two vertices
+ anti_lon_0 = self.lon[0] + np.pi
+ if anti_lon_0 > np.pi:
+ anti_lon_0 -= np.pi * 2
- # find the first intersection, to start from.
- for edge1 in arcs1:
- inter, edge2 = edge1.get_next_intersection(arcs2)
- if inter is not None and inter != edge1.end and inter != edge2.end:
- break
+ anti_lon_1 = self.lon[1] + np.pi
+ if anti_lon_1 > np.pi:
+ anti_lon_1 -= np.pi * 2
- # if no intersection is found, find out if the one poly is included in
- # the other.
- if inter is None:
- polys = [0, self, other]
- if self._is_inside(other):
- return polys[-sign]
- if other._is_inside(self):
- return polys[sign]
+ anti_lat_0 = -self.lat[0]
+ anti_lat_1 = -self.lat[1]
+ # --------------------------------------------------------------------.
+ # Define ????
+ arc1 = SArc(SPoint(self.lon[1],
+ self.lat[1]),
+ SPoint(anti_lon_0,
+ anti_lat_0))
+ arc2 = SArc(SPoint(anti_lon_0,
+ anti_lat_0),
+ SPoint(anti_lon_1,
+ anti_lat_1))
+ arc3 = SArc(SPoint(anti_lon_1,
+ anti_lat_1),
+ SPoint(self.lon[0],
+ self.lat[0]))
+ # --------------------------------------------------------------------.
+ # Do what ???
+ other_arcs = [edge for edge in other.aedges()]
+ for arc in [arc1, arc2, arc3]:
+ inter, other_arc = arc.get_next_intersection(other_arcs)
+ if inter is not None:
+ sarc = SArc(arc.start, inter)
+ earc = SArc(inter, other_arc.end)
+ return sarc.angle(earc) < 0
+ return other._area() > (2 * np.pi)
+ @staticmethod
+ def _get_all_arc_intersection_points(arcs1, arcs2):
+ """Get the list of SArc(s) at intersection points.
+ It returns a list of format [(SPoint, SArc1, SArc2), ...]
+ """
+ list_edges_intersection_spoints = []
+ for arc1 in arcs1:
+ for arc2 in arcs2:
+ spoint = arc1.intersection_point(arc2)
+ if (spoint is not None and spoint != arc1.end and spoint != arc2.end):
+ list_edges_intersection_spoints.append((spoint, arc1, arc2))
+ return list_edges_intersection_spoints
+ def _bool_oper(self, other, sign=1):
+ """Perform a boolean operation on this and *other* polygons.
+ By default, or when sign is 1, the union is performed.
+ If sign is -1, the intersection of the polygons is returned.
+ The algorithm works this way:
+ 1. Find an intersection between the two polygons.
+ 2. If none can be found, then the two polygons are either not
+ overlapping, or one is entirely included in the other.
+ 3. Otherwise, follow the edges of a polygon until another
+ intersection is encountered, at which point you start following
+ the edges of the other polygon, and so on until you come
+ back to the first intersection.
+ In which direction to follow the edges of the polygons depends if
+ being interested in the union or the intersection of the two polygons.
+ The routine can deal with multiple intersections, but can return
+ unexpected results when the union is performed between 2 SPolygons
+ that have more than a single intersection.
+ """
+ # Iterate over other SMultiPolygon
+ if isinstance(other, SMultiPolygon):
+ if sign == -1: # intersection
+ return SMultiPolygon([self.intersection(p) for p in other])
+ else: # union
+ # First identify which polygons intersects
+ list_polygons = self.geoms + other.geoms
+ return _cascade_polygon_union(list_polygons)
+ # Logical speed ups
+ if isinstance(other, type(None)):
return None
- # starting from the intersection, follow the edges of one of the
- # polygons.
+ # If wishing the intersection and the extents do not intersects
+ if sign == -1 and not self.extent.intersects(other.extent):
+ return None
- while True:
- arcs1 = rotate_arcs(edge1, arcs1)
- arcs2 = rotate_arcs(edge2, arcs2)
+ # If vertices are equal, return one polygon
+ if self.equals_exact(other):
+ return self
- narcs1 = arcs1 + [edge1]
- narcs2 = arcs2 + [edge2]
+ # --------------------------------------------------------------------.
+ # Get list of SArc(s) for each polygon
+ arcs1 = [edge for edge in self.aedges()]
+ arcs2 = [edge for edge in other.aedges()]
- arc1 = Arc(inter, edge1.end)
- arc2 = Arc(inter, edge2.end)
+ # --------------------------------------------------------------------.
+ # Get list of intersection points and asssociated intersecting arcs
+ # - If arc1==arc2 (same direction) --> No intersection point !
+ # - It excludes intersection points which are equal to both the end
+ # coordinate of the arcs.
+ # - It includes intersection points if arc1 and arc2 are
+ # touching/overlapping but have different directions.
+ # TODO: This can still cause problems in the edge case where
+ # two polygons are just touching !!!
+ # The output of the intersection could result in a line ...
+ # The occurence of this is limited by the use the
+ # self.extent.intersects(other.extent) check above
+ list_edges_intersection_spoints = self._get_all_arc_intersection_points(arcs1, arcs2)
+ # --------------------------------------------------------------------.
+ # If no intersection points:
+ # - Find out if one poly is included in the other (or viceversa).
+ # - Or if the polygons are disjointm return None
+ n_intersection_points = len(list_edges_intersection_spoints)
+ if n_intersection_points == 0:
+ polys = ['dummy', self, other]
+ if self._is_inside(other): # within / equals (topologically)
+ return polys[-sign]
+ if other._is_inside(self): # contain / equals (topologically)
+ return polys[sign]
+ if sign == 1: # if searching for union
+ return SMultiPolygon([self, other])
+ # if sign=-1 (searching for intersection)
+ return None # disjoint
+ # --------------------------------------------------------------------.
+ # Define function to reorder the list of SArc(s)
+ def reorder_arcs(start_arc, arcs):
+ """Return a list of SArc starting with start_arc."""
+ idx = arcs.index(start_arc)
+ return arcs[idx:] + arcs[:idx]
+ # --------------------------------------------------------------------.
+ # Starting from an intersection point, follow the edges of a polygon.
+ # Restart the loop if not all intersection points have been encountered
+ list_intersection_spoints = [tpl[0] for tpl in list_edges_intersection_spoints]
+ original_list_intersection_spoints = list_intersection_spoints.copy()
+ original_arcs1 = arcs1.copy()
+ original_arcs2 = arcs2.copy()
+ # Initialize
+ list_vertices = []
+ while len(list_intersection_spoints) > 0:
+ # (Re)initialize
+ idx = original_list_intersection_spoints.index(list_intersection_spoints[0])
+ intersection_spoint, edge1, edge2 = list_edges_intersection_spoints[idx]
+ arcs1 = original_arcs1
+ arcs2 = original_arcs2
+ nodes = []
+ # Reorder arcs so that intersecting arc is at first position
+ arcs1 = reorder_arcs(edge1, arcs1)
+ arcs2 = reorder_arcs(edge2, arcs2)
+ # "Close" the polygon with the first arc
+ c_arcs1 = arcs1 + [edge1]
+ c_arcs2 = arcs2 + [edge2]
+ # Get the first SArc from intersection point to original SArc end
+ arc1 = SArc(intersection_spoint, edge1.end)
+ arc2 = SArc(intersection_spoint, edge2.end)
+ # Swap arcs depending on intersection/union
if np.sign(arc1.angle(arc2)) != sign:
arcs1, arcs2 = arcs2, arcs1
- narcs1, narcs2 = narcs2, narcs1
- nodes.append(inter)
- for edge1 in narcs1:
- inter, edge2 = edge1.get_next_intersection(narcs2, inter)
- if inter is not None:
+ c_arcs1, c_arcs2 = c_arcs2, c_arcs1
+ # Append start coordinate to nodes list
+ nodes.append(intersection_spoint)
+ # Update the list of intersection points to still encounter
+ try:
+ list_intersection_spoints.remove(intersection_spoint)
+ except Exception: # TODO ValueError? AttributeError
+ pass
+ # Travel along a list of arcs till next intersection
+ for edge1 in c_arcs1:
+ intersection_spoint, edge2 = edge1.get_next_intersection(c_arcs2, intersection_spoint)
+ # When reaching another intersection point, exit the for loop
+ if intersection_spoint is not None:
+ # Otherwise add the edges as vertices of the intersecting/union polygon
elif len(nodes) > 0 and edge1.end not in [nodes[-1], nodes[0]]:
- if inter is None and len(nodes) > 2 and nodes[-1] == nodes[0]:
- nodes = nodes[:-1]
- break
- if inter == nodes[0]:
- break
- return SphPolygon(np.array([(node.lon, node.lat) for node in nodes]), radius=self.radius)
+ # If returned to the starting point, the polygon has been identified.
+ # - Remove last node if equal to the first
+ if intersection_spoint is None:
+ if len(nodes) > 2 and nodes[-1] == nodes[0]:
+ nodes = nodes[:-1]
+ elif intersection_spoint == nodes[0]:
+ # Add identified polygon vertices to the list of polygons
+ # - Ensure longitudes are within [-pi, pi]
+ vertices = np.array([(unwrap_radians(node.lon), node.lat) for node in nodes])
+ list_vertices.append(vertices)
+ # Remove duplicate vertices
+ list_vertices = [remove_duplicated_vertices(v, tol=1e-08) for v in list_vertices]
+ # Check more than two nodes have been found (otherwise line ...)
+ list_vertices = [v for v in list_vertices if len(v) > 2]
+ if len(list_vertices) == 0:
+ return None
+ # Retrieve list of SPolygon
+ list_SPolygons = [SPolygon(v) for v in list_vertices]
+ # If performing union, the first list element might contain the exterior,
+ # while the others the interiors (which are currently discarded)
+ if sign == 1 and len(list_SPolygons) > 1:
+ print("Union of polygons whose boundaries intersects in more than 4 points might be misleading.")
+ list_SPolygons = [list_SPolygons[0]]
+ # Return the SPolygon/SMultiPolygons
+ return SMultiPolygon(list_SPolygons)
def union(self, other):
"""Return the union of this and `other` polygon.
- NB! If the two polygons do not overlap (they have nothing in common) None is returned.
+ If the two polygons do not overlap (they have nothing in common),
+ a MultiPolygon is returned.
return self._bool_oper(other, 1)
def intersection(self, other):
- """Return the intersection of this and `other` polygon."""
+ """Return the intersection of this and `other` polygon.
+ NB! If the two polygons do not intersect (they have nothing in common)
+ None is returned.
+ """
return self._bool_oper(other, -1)
- def _is_inside(self, other):
- """Check if the polygon is entirely inside the other.
+ def within(self, other):
+ """Check if the polygon is entirely inside the other."""
+ # Check input instance
+ if not isinstance(other, (SPolygon, SMultiPolygon, type(None))):
+ raise NotImplementedError
+ # Iterate over other SMultiPolygon
+ if isinstance(other, SMultiPolygon):
+ return np.any([self.within(p) for p in other])
+ # Logical speed ups
+ if isinstance(other, type(None)):
+ return False
+ # Compute intersection
+ intersection = self.intersection(other)
+ # Check same area
+ if not isinstance(intersection, type(None)):
+ return np.abs(intersection._area() - self._area()) < EPSILON
+ else:
+ return False
+ def contains(self, other):
+ """Check if the polygon contains entirely the other polygon."""
+ if isinstance(other, SMultiPolygon):
+ return np.all([p.within(self) for p in other])
+ elif isinstance(other, SPolygon):
+ return other.within(self)
+ else:
+ raise NotImplementedError
- Should be used with :meth:`inter` first to check if the is a
- known intersection.
+ def equals(self, other):
+ """Test spatial topological equality between two SPolygon.
+ If A is within B and B is within A, A and B are considered equal.
- anti_lon_0 = self.lon[0] + np.pi
- if anti_lon_0 > np.pi:
- anti_lon_0 -= np.pi * 2
+ if not isinstance(other, (SPolygon, SMultiPolygon, type(None))):
+ raise NotImplementedError
+ if isinstance(other, SMultiPolygon) or isinstance(other, type(None)):
+ return False
+ if self.equals_exact(other):
+ return True
+ if self.within(other) and other.within(self):
+ return True
+ else:
+ return False
+ def intersects(self, other):
+ """Return True if intersect, within, contains, or equals other."""
+ intersection = self.intersection(other)
+ if isinstance(intersection, type(None)):
+ return False
+ if intersection:
+ return True
+ else:
+ return False
- anti_lon_1 = self.lon[1] + np.pi
- if anti_lon_1 > np.pi:
- anti_lon_1 -= np.pi * 2
+ def disjoint(self, other):
+ """Return True if the two polygons does not intersect."""
+ return not self.intersects(other)
- arc1 = Arc(SCoordinate(self.lon[1],
- self.lat[1]),
- SCoordinate(anti_lon_0,
- -self.lat[0]))
+ def overlap_fraction(self, other):
+ """Get the fraction of the current polygon covered by the *other* polygon."""
+ intersect_poly = self.intersection(other)
+ if intersect_poly is None:
+ return 0
+ else:
+ return intersect_poly._area() / self._area()
- arc2 = Arc(SCoordinate(anti_lon_0,
- -self.lat[0]),
- SCoordinate(anti_lon_1,
- -self.lat[1]))
+ def overlap_rate(self, other):
+ """Get the fraction of *other" polygon covered by the current polygon."""
+ intersect_poly = self.intersection(other)
+ if intersect_poly is None:
+ return 0
+ else:
+ return intersect_poly._area() / other._area()
+ def intersection_points(self, other):
+ """Get intersection points between 2 polygons."""
+ sline = self.to_line()
+ if isinstance(other, SLine):
+ other_slines = [other]
+ if isinstance(other, SPolygon):
+ other_slines = [other.to_line()]
+ if isinstance(other, SMultiPolygon):
+ other_slines = [o.to_line() for o in other]
+ list_spoints = [sline.intersection_points(o_sline) for o_sline in other_slines]
+ list_spoints = [item for sublist in list_spoints if sublist is not None for item in sublist]
+ # list_spoints = [p for p in list_spoints if p is not None]
+ return list_spoints
+ def to_line(self):
+ """Convert SPolygon to SLine."""
+ return SLine(_close_vertices(self.vertices))
+ def buffer(self, distance, ellips="WGS84"):
+ """Return an buffered SPolygon.
+ If distance [in km] is positive, it enlarge the SPolygon.
+ If distance [in km] is negative, it decrease the size of the SPolygon.
+ """
+ flag_sign = np.sign(distance)
+ distance = np.abs(distance)
+ left_sline, right_sline = self.to_line().get_parallel_lines(distance=distance, ellips=ellips)
+ if flag_sign == -1:
+ return SPolygon(right_sline.vertices)
+ else:
+ return SPolygon(left_sline.vertices)
+ def segmentize(self, npts=0, max_distance=0, ellips='WGS84'):
+ """Subdivide each SArc in n steps."""
+ sline = self.to_line()
+ list_vertices = [arc.segmentize(npts=npts,
+ max_distance=max_distance,
+ ellips=ellips).vertices[:-1, :] for arc in sline._list_arcs[0:-1]]
+ list_vertices.append(sline._list_arcs[-1].segmentize(npts=npts,
+ max_distance=max_distance,
+ ellips=ellips).vertices)
+ vertices = np.vstack(list_vertices)
+ return SPolygon(vertices)
+ def to_shapely(self):
+ """Convert to Shapely Polygon."""
+ return Polygon(np.rad2deg(self.vertices)[::-1])
+ def _repr_svg_(self):
+ """Display the SPolygon in the Ipython terminal."""
+ return self.to_shapely()._repr_svg_()
+ def plot(self, ax=None, facecolor=None, edgecolor='black', alpha=0.4, **kwargs):
+ """Plot the SPolygon using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Retrieve shapely polygon
+ geom = self.to_shapely()
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Plot polygon
+ ax.add_geometries([geom], crs=ccrs.Geodetic(),
+ facecolor=facecolor,
+ edgecolor=edgecolor,
+ alpha=alpha,
+ **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines()
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
+class SMultiPolygon():
+ """A collection of one or more SPolygons.
+ The collection of SPolygon(s) must not overlap !
+ If component polygons overlap the collection is `invalid` and
+ some operations on it may fail.
+ """
- arc3 = Arc(SCoordinate(anti_lon_1,
- -self.lat[1]),
- SCoordinate(self.lon[0],
- self.lat[0]))
+ def __new__(cls, polygons, check_validity=False):
+ """Create SPolygon or SMultiPolygon object.
- other_arcs = [edge for edge in other.aedges()]
- for arc in [arc1, arc2, arc3]:
- inter, other_arc = arc.get_next_intersection(other_arcs)
- if inter is not None:
- sarc = Arc(arc.start, inter)
- earc = Arc(inter, other_arc.end)
- return sarc.angle(earc) < 0
- return other.area() > (2 * np.pi * other.radius ** 2)
+ If polygons is a list with only a single SPolygon, it returns a SPolygon.
+ None value present in the list are removed.
+ """
+ # Check providing a list of SPolygon only (and eventually None)
+ if not isinstance(polygons, list):
+ raise TypeError("Not providing a list of SPolygons to SMultiPolygon.")
+ if not np.all([isinstance(p, (SPolygon, type(None))) for p in polygons]):
+ raise ValueError("SMultiPolygon accepts only a list of SPolygon.")
+ # Remove possible None
+ idx_polygons = np.where([not isinstance(p, type(None)) for p in polygons])[0]
+ # If only None, return None
+ # - This is important when _bool_oper returns None (i.e. not intersecting)
+ if len(idx_polygons) == 0:
+ return None
- def __str__(self):
- """Get numpy representation of vertices."""
- return str(np.rad2deg(self.vertices))
+ # If a single SPolygon, return SPolygon
+ polygons = [polygons[i] for i in idx_polygons]
+ if len(polygons) == 1:
+ return polygons[0]
+ # Else create SMultiPolygon
+ return super().__new__(cls)
+ def __init__(self, polygons, check_validity=False):
+ self.geoms = polygons
+ # Check that polygons does not overlap
+ # - This function is quite slow
+ if check_validity:
+ if not self.is_valid_geometry:
+ raise ValueError("The SPolygon(s) composing SMultiPolygon must not overlap.")
+ @property
+ def is_valid_geometry(self):
+ """Check that there are not self intersections."""
+ # First check for non self-intersection in each single SPolygon
+ if not np.all([p.is_valid_geometry for p in self.geoms]):
+ return False
+ # Then, check the SPolygons does not overlap
+ out = self.union(self)
+ if len(out) == len(self):
+ return True
+ else:
+ False
+ def __getitem__(self, i):
+ """Get a specific SPolygon composing SMultiPolygon."""
+ return self.geoms[i]
+ def __len__(self):
+ """Get the number of SPolygon composing SMultiPolygon."""
+ return len(self.geoms)
+ def __iter__(self):
+ """Get an iterator returning the SPolygons composing SMultiPolygon."""
+ return self.geoms.__iter__()
+ def _area(self):
+ """Compute the area of the polygon in units of square radii."""
+ return np.sum([p._area() for p in self.geoms])
+ def area(self):
+ """Compute the polygon area [in km²] using pyproj."""
+ return np.sum([p.area() for p in self.geoms])
+ def union(self, other):
+ """Return the union of this and `other` polygon."""
+ list_polygons = self.geoms + other.geoms
+ union_p = list_polygons[0]
+ for p in list_polygons[1:]:
+ union_p = p.union(union_p) # ensure call to SPolygon.union
+ return union_p
+ def intersection(self, other):
+ """Return the intersection of this and `other` polygon."""
+ p_inter = SMultiPolygon([p.intersection(other) for p in self.geoms])
+ # Tentative union of intersections if a MultiPolygon
+ if isinstance(p_inter, SMultiPolygon):
+ p_inter = p_inter.geoms[0].union(p_inter.geoms[1:])
+ return p_inter
+ def overlap_rate(self, other):
+ """Get how much the current polygon overlaps the *other* polygon."""
+ intersect_polys = ([p.intersection(other) for p in self.geoms])
+ list_polygons = [p for p in intersect_polys if (p is not None)]
+ if len(list_polygons) == 0:
+ return 0
+ intersect_area = np.sum([p.area() for p in list_polygons])
+ return intersect_area / other.area()
+ def intersects(self, other):
+ """Return True if intersect, within, contains, or equals other."""
+ return np.any([p.intersects(other) for p in self.geoms])
+ def disjoint(self, other):
+ """Return True if the two polygons does not intersect."""
+ return np.all([p.disjoint(other) for p in self.geoms])
+ def within(self, other):
+ """Check if the polygon is entirely inside the other."""
+ return np.all([p.within(other) for p in self.geoms])
+ def contains(self, other):
+ """Check if the polygon contains entirely the other polygon."""
+ return np.any([p.contains(other) for p in self.geoms])
+ def equals_exact(self, other, rtol=1e-05, atol=1e-08):
+ """Check if SMultiPolygon is exactly equal to the other SMultiPolygon.
+ This method uses exact geometry and coordinate equality,
+ which requires geometries and coordinates to be equal
+ (within specified tolerance) and in the same order.
+ This is in contrast with the ``equals`` function which checks for
+ spatial (topological) equality.
+ """
+ if not isinstance(other, (SPolygon, SMultiPolygon, type(None))):
+ raise NotImplementedError
+ if isinstance(other, SPolygon) or isinstance(other, type(None)):
+ return False
+ if len(self.geoms) != len(other.geoms):
+ return False
+ return np.all([s.equals_exact(o, rtol=rtol, atol=atol) for s, o in zip(self.geoms, other.geoms)])
+ def equals(self, other):
+ """Test spatial topological equality between two SMultiPolygon.
+ If A is within B and B is within A, A and B are considered equal.
+ The geometries within SMultiPolygon must not be necessary in the same order.
+ """
+ if not isinstance(other, (SPolygon, SMultiPolygon, type(None))):
+ raise NotImplementedError
+ if isinstance(other, SPolygon) or isinstance(other, type(None)):
+ return False
+ if len(self.geoms) != len(other.geoms):
+ return False
+ if self.equals_exact(other):
+ return True
+ if (np.all([np.any([o.equals(s) for s in self.geoms]) for i, o in enumerate(other.geoms)]) and
+ np.all([np.any([o.equals(s) for o in other.geoms]) for i, s in enumerate(self.geoms)])):
+ return True
+ else:
+ return False
+ def intersection_points(self, other):
+ """Get intersection points between 2 polygons."""
+ list_spoints = [p.intersection_points(other) for p in self.geoms]
+ list_spoints = [item for sublist in list_spoints if sublist is not None for item in sublist]
+ return list_spoints
+ def to_shapely(self):
+ """Convert to Shapely MultiPolygon."""
+ return MultiPolygon([p.to_shapely() for p in self.geoms])
+ def _repr_svg_(self):
+ """Display SPolygons in the Ipython terminal."""
+ return self.to_shapely()._repr_svg_()
+ def plot(self, ax=None, facecolor=None, edgecolor='black', alpha=0.4, **kwargs):
+ """Plot the SPolygon using Cartopy."""
+ import cartopy.crs as ccrs
+ import matplotlib.pyplot as plt
+ # Retrieve shapely polygon
+ bbox_geom = self.to_shapely()
+ # Create figure if ax not provided
+ ax_not_provided = False
+ if ax is None:
+ ax_not_provided = True
+ proj_crs = ccrs.PlateCarree()
+ fig, ax = plt.subplots(subplot_kw=dict(projection=proj_crs))
+ # Add swath polygon
+ ax.add_geometries([bbox_geom],
+ crs=ccrs.Geodetic(),
+ facecolor=facecolor,
+ edgecolor=edgecolor,
+ alpha=alpha,
+ **kwargs)
+ # Beautify plot by default
+ if ax_not_provided:
+ ax.stock_img()
+ ax.coastlines()
+ gl = ax.gridlines(draw_labels=True, linestyle='--')
+ gl.xlabels_top = False
+ gl.ylabels_right = False
+ return ax
+# ----------------------------------------------------------------------------.
+# Backward Compatibilites
+class SCoordinate(SPoint):
+ """Introduce SCoordinate for backcompatibility reasons."""
+ def __init_subclass__(self):
+ """Get a dummy docstring for flake8 hook isort sake."""
+ warnings.warn("SCoordinate is deprecated. Use SPoint instead.",
+ PendingDeprecationWarning, 2)
+class Arc(SArc):
+ """Introduce Arc for backcompatibility reasons."""
+ def __init_subclass__(self):
+ """Get a dummy docstring for flake8 hook isort sake."""
+ warnings.warn("Arc is deprecated. Use SArc instead.",
+ PendingDeprecationWarning, 2)
+class SphPolygon(SPolygon): # TODO: radius can not be passed anymore
+ """Introduce SphPolygon for backcompatibility reasons."""
+ def __init_subclass__(self):
+ """Get a dummy docstring for flake8 hook isort sake."""
+ warnings.warn("SphPolygon is deprecated. Use SPolygon instead.",
+ PendingDeprecationWarning, 2)
+ def area(self):
+ """Redefine area for backcompatibility reasons."""
+ return SPolygon._area(self)
diff --git a/pyresample/utils/shapely.py b/pyresample/utils/shapely.py
new file mode 100644
index 000000000..38902f08c
--- /dev/null
+++ b/pyresample/utils/shapely.py
@@ -0,0 +1,601 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2022-2022 Pyresample developers
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see .
+"""Tools to deal with geometry operations."""
+import math
+import numpy as np
+import shapely
+import shapely.ops
+from shapely.geometry import MultiPoint # Point
+from shapely.geometry import (
+ GeometryCollection,
+ LineString,
+ MultiLineString,
+ MultiPolygon,
+ Polygon,
+# ---------------------------------------------------------------------------.
+# Vertices Processing
+def _close_vertices(vertices):
+ """Ensure last vertex is equal to first."""
+ if not np.allclose(vertices[0, :], vertices[-1, :]):
+ return np.vstack((vertices, vertices[0, :]))
+ else:
+ return vertices
+def _unclose_vertices(vertices):
+ """Ensure last vertex is not equal to first."""
+ if np.allclose(vertices[0, :], vertices[-1, :]):
+ return vertices[:-1, :]
+ else:
+ return vertices
+def remove_duplicated_vertices(arr, tol=1e-08):
+ """Remove sequential duplicated (lon, lat) coordinates."""
+ x = arr[:, 0]
+ y = arr[:, 1]
+ x_idx = np.where(np.abs(np.ediff1d(x)) <= tol)[0] + 1
+ y_idx = np.where(np.abs(np.ediff1d(y)) <= tol)[0] + 1
+ duplicate_idx = np.intersect1d(x_idx, y_idx)
+ if len(duplicate_idx) > 0:
+ print("There are duplicated vertices... removing it.") # TODO remove
+ arr = np.delete(arr, duplicate_idx, 0)
+ if len(arr) == 0:
+ raise ValueError("All duplicated values.")
+ return arr
+# ---------------------------------------------------------------------------.
+# Antimeridian Processing
+def get_idx_antimeridian_crossing(vertices, radians=False):
+ """Return the indices at which the boundary cross the antimeridian.
+ Assumption:
+ - Any two consecutive points with over 180 degrees difference in
+ longitude are assumed to cross the antimeridian.
+ """
+ if radians:
+ thr = np.pi
+ else:
+ thr = 180
+ idx_crossing = np.where(np.abs(np.diff(vertices[:, 0])) > thr)[0]
+ return idx_crossing
+def get_number_antimeridian_crossing(vertices, radians=False):
+ """Return the number of times the boundary cross the antimeridian.
+ Assumption:
+ - Any two consecutive points with over 180 degrees difference in
+ longitude are assumed to cross the antimeridian.
+ """
+ idx_crossing = get_idx_antimeridian_crossing(vertices, radians=radians)
+ return len(idx_crossing)
+def is_antimeridian_crossing(vertices, radians=False):
+ """Check if antimeridian crossing.
+ Assumption:
+ - Any two consecutive points with over 180 degrees difference in
+ longitude are assumed to cross the antimeridian.
+ """
+ n_crossing = get_number_antimeridian_crossing(vertices, radians=radians)
+ if n_crossing == 0:
+ return False
+ else:
+ return True
+def _get_antimeridian_crossing_point(coord_start, coord_end):
+ """Compute the point where the arc cross the antimeridian.
+ It expects coordinates in degree !
+ """
+ from pyresample.spherical import SArc, SPoint
+ # Retrieve start and end lon/lat coordinates of the arc crossing the antimeridian
+ lon_start = coord_start[0]
+ lat_start = coord_start[1]
+ lon_end = coord_end[0]
+ lat_end = coord_end[1]
+ # Define spherical arcs
+ antimeridian_arc = SArc(SPoint(np.deg2rad(180), np.deg2rad(-90)),
+ SPoint(np.deg2rad(180), np.deg2rad(90))
+ )
+ crossing_arc = SArc(SPoint(np.deg2rad(lon_start), np.deg2rad(lat_start)),
+ SPoint(np.deg2rad(lon_end), np.deg2rad(lat_end))
+ )
+ # Retrieve crossing point
+ crossing_point = crossing_arc.intersection(antimeridian_arc)
+ clat = np.rad2deg(crossing_point.lat)
+ # Identify direction
+ # -1 --> toward East --> 180
+ # 1 --> toward West --> -180
+ direction = math.copysign(1, lon_end - lon_start)
+ if direction == -1:
+ clon = 180
+ else:
+ clon = -180
+ # Crossing point at the antimeridian
+ split_coord = [clon, clat]
+ # Return
+ return split_coord
+def get_antimeridian_safe_line_vertices(vertices):
+ """Split lines at the antimeridian.
+ It return a list of line vertices not crossing the antimeridian.
+ It expects vertices in degree !
+ """
+ # Retrieve line vertices when crossing the antimeridian
+ idx_crossing = np.where(np.abs(np.diff(vertices[:, 0])) > 180)[0]
+ # If no crossing, return vertices
+ n_crossing = len(idx_crossing)
+ if n_crossing == 0:
+ return [vertices]
+ # Split line at anti-meridians
+ previous_idx = 0
+ previous_split_coord = None
+ list_vertices = []
+ for i in range(0, n_crossing):
+ # - Retrieve coordinates around anti-meridian crossing
+ tmp_idx_crossing = idx_crossing[i]
+ coord1 = vertices[tmp_idx_crossing]
+ coord2 = vertices[tmp_idx_crossing + 1]
+ # - Retrieve anti-meridian split coordinates
+ split_coord = _get_antimeridian_crossing_point(coord1, coord2)
+ # - Retrieve line vertices
+ new_coords = vertices[previous_idx:tmp_idx_crossing + 1, :]
+ if previous_split_coord is None:
+ new_vertices = np.vstack((new_coords, split_coord))
+ else:
+ new_vertices = np.vstack((previous_split_coord, new_coords, split_coord))
+ # - Update previous idx
+ previous_idx = tmp_idx_crossing + 1
+ # - Update previous split coords
+ previous_split_coord = split_coord
+ previous_split_coord[0] = -180 if split_coord[0] == 180 else 180
+ # - Append polygon vertices to the list
+ list_vertices.append(new_vertices)
+ # Add last vertices
+ new_coords = vertices[previous_idx:, :]
+ new_vertices = np.vstack((previous_split_coord, new_coords))
+ list_vertices.append(new_vertices)
+ # Return list of vertices
+ return list_vertices
+def get_antimeridian_safe_polygon_vertices(vertices):
+ """Split polygons at the antimeridian.
+ It return a list of polygon vertices not crossing the antimeridian.
+ Each vertices array is unwrapped (last vertex is not equal to first.)
+ If the polygon enclose a pole, the processing can fail or return wrong output
+ without warnings. !!!
+ It also does not account for holes in the polygons !
+ It expects vertices in degree !
+ """
+ from spherical1 import unwrap_longitude_degree
+ # Wrap vertices
+ vertices = _close_vertices(vertices)
+ # Ensure 180 longitude is converted to -180
+ # - So that if longitude are [-180 180, -180 180, ...] are considered equal !
+ vertices[:, 0] = unwrap_longitude_degree(vertices[:, 0])
+ # Retrieve line vertices when crossing the antimeridian
+ idx_crossing = np.where(np.abs(np.diff(vertices[:, 0])) > 180)[0]
+ # If no crossing, return vertices
+ n_crossing = len(idx_crossing)
+ if n_crossing == 0:
+ return [_unclose_vertices(vertices)]
+ if n_crossing == 1:
+ # print("Can not deal with polygons enclosing the poles yet")
+ # raise NotImplementedError # TODO
+ return [_unclose_vertices(vertices)]
+ # Check that there are an even number of antimeridian crossing points
+ if (n_crossing % 2) != 0:
+ raise ValueError("Expecting a even number of antimeridian crossing point of polygon vertices.")
+ # Split polygons at anti-meridians
+ previous_idx = 0
+ previous_idx_rev = -1
+ previous_split_coord = None
+ previous_split_coord_rev = None
+ list_vertices = []
+ for i in range(0, int(n_crossing / 2) + 1):
+ # - Define index for reverse coordinate
+ j = -1 - i
+ # - Retrieve coordinates around anti-meridian crossing
+ tmp_idx_crossing = idx_crossing[i]
+ tmp_idx_crossing_rev = idx_crossing[j]
+ coord1 = vertices[tmp_idx_crossing]
+ coord2 = vertices[tmp_idx_crossing + 1]
+ coord1_rev = vertices[tmp_idx_crossing_rev + 1]
+ coord2_rev = vertices[tmp_idx_crossing_rev]
+ # - Retrieve anti-meridian split coordinates
+ split_coord = _get_antimeridian_crossing_point(coord1, coord2)
+ split_coord_rev = _get_antimeridian_crossing_point(coord1_rev, coord2_rev)
+ # - Retrieve polygon vertices
+ new_coords = vertices[previous_idx:tmp_idx_crossing + 1, :]
+ new_coords_rev = vertices[tmp_idx_crossing_rev + 1:previous_idx_rev, :]
+ if i != int(n_crossing / 2):
+ if previous_split_coord is None:
+ new_vertices = np.vstack((new_coords, split_coord, split_coord_rev, new_coords_rev))
+ else:
+ new_vertices = np.vstack((previous_split_coord, new_coords, split_coord,
+ split_coord_rev, new_coords_rev, previous_split_coord_rev))
+ else:
+ new_vertices = np.vstack((previous_split_coord, new_coords, split_coord))
+ # - Update previous idx
+ previous_idx = tmp_idx_crossing + 1
+ previous_idx_rev = tmp_idx_crossing_rev
+ # - Update previous split coords
+ previous_split_coord = split_coord
+ previous_split_coord_rev = split_coord_rev
+ previous_split_coord[0] = -180 if split_coord[0] == 180 else 180
+ previous_split_coord_rev[0] = -180 if split_coord_rev[0] == 180 else 180
+ # - Append polygon vertices to the list
+ list_vertices.append(new_vertices)
+ # Return list of (unwrapped) vertices
+ list_vertices = [_unclose_vertices(vertices) for vertices in list_vertices]
+ return list_vertices
+# --------------------------------------------------------------------------.
+# Extent tools
+def bounds_from_extent(extent):
+ """Get shapely bounds from a matplotlib/cartopy extent.
+ # Shapely bounds
+ bounds = [min_x, min_y, max_x, max_y]
+ # Matplotlib extent
+ extent = [min_x, max_x, min_y, max_y]
+ """
+ bounds = [extent[0], extent[2], extent[1], extent[3]]
+ return bounds
+def extent_from_bounds(bounds, x_margin=None, y_margin=None):
+ """Get matplotlib/cartopy extent from shapely bounds.
+ x_margin and ymargin enable to extend the extent by custom degrees.
+ # Shapely bounds
+ bounds = [min_x, min_y, max_x, max_y]
+ # Matplotlib extent
+ extent = [min_x, max_x, min_y, max_y]
+ """
+ extent = [bounds[0], bounds[2], bounds[1], bounds[3]]
+ if (x_margin is not None) or (y_margin is not None):
+ extent = extend_extent(extent, x_margin=x_margin, y_margin=y_margin)
+ return extent
+def extend_extent(extent, x_margin=0.1, y_margin=0.1):
+ """Extend an extent on x and y sides by x/y degree.
+ Extent is defined as: [x_min, x_max, y_min, y_max]
+ x and y are clipped at [-180, 180] and [-90,90]
+ """
+ if x_margin is None:
+ x_margin = 0
+ if y_margin is None:
+ y_margin = 0
+ extent[0] = extent[0] - x_margin
+ extent[1] = extent[1] + x_margin
+ extent[2] = extent[2] - y_margin
+ extent[3] = extent[3] + y_margin
+ if extent[0] < -180:
+ extent[0] = -180
+ if extent[1] > 180:
+ extent[1] = 180
+ if extent[2] < -90:
+ extent[2] = -90
+ if extent[3] > 90:
+ extent[3] = 90
+ return extent
+def _get_extent_from_vertices(vertices):
+ """Compute extent using min/max on lon and lat columns."""
+ extent = [np.min(vertices[:, 0]),
+ np.max(vertices[:, 0]),
+ np.min(vertices[:, 1]),
+ np.max(vertices[:, 1])]
+ return extent
+def get_indices_with_side_duplicates(x):
+ """Return indices with duplicate values on both sides."""
+ indices = [] # [0, len(y)-1]
+ for i in range(1, len(x) - 1):
+ if x[i - 1] == x[i] and x[i] == x[i + 1]:
+ indices.append(i)
+ return indices
+def simplify_rectangle_vertices(vertices):
+ """Simplify rectangle vertices to corners vertices."""
+ vertices = _unclose_vertices(vertices)
+ # Remove duplicated vertices along lats (>3 sequential duplicates)
+ while vertices[0, 1] == vertices[-1, 1]:
+ vertices = np.roll(vertices, 1, axis=0)
+ idx_to_remove = get_indices_with_side_duplicates(vertices[:, 1])
+ vertices = np.delete(vertices, idx_to_remove, axis=0)
+ # Remove duplicated vertices along lons (>3 sequential duplicates)
+ while vertices[0, 0] == vertices[-1, 0]:
+ vertices = np.roll(vertices, 1, axis=0)
+ idx_to_remove = get_indices_with_side_duplicates(vertices[:, 0])
+ vertices = np.delete(vertices, idx_to_remove, axis=0)
+ return vertices
+def get_rectangle_splitter_line(start, end):
+ """Create a LineString to split rectilinear polygons."""
+ if start[0] == end[0]:
+ line = LineString([(start[0], -90), (start[0], 90)])
+ else:
+ line = LineString([(-180, start[1]), (180, start[1])])
+ return line
+def decompose_rectilinear_polygons_into_rectangles(polygon):
+ """Decompose rectilinear polygons into many rectangles."""
+ polygon = shapely.ops.unary_union(polygon)
+ if isinstance(polygon, MultiPolygon):
+ list_polygons = list(polygon.geoms)
+ else:
+ list_polygons = [polygon]
+ # Initialize lists
+ list_extent_polygons = []
+ list_to_split = []
+ # Identify polygons with > 4 vertices to be splitted
+ for p in list_polygons:
+ vertices = np.array(p.exterior.coords)
+ vertices = simplify_rectangle_vertices(vertices)
+ # If only -180 and 180 longitudes, infer rectangle vertices from extent
+ if np.all(np.isin(vertices[:, 0], [-180, 180])):
+ extent = _get_extent_from_vertices(vertices)
+ p = Polygon.from_bounds(*bounds_from_extent(extent))
+ else:
+ p = Polygon(vertices)
+ if len(p.exterior.coords) == 5:
+ list_extent_polygons.append(p)
+ else:
+ list_to_split.append(p)
+ # Initialize coords index
+ i = 0
+ while len(list_to_split) > 0:
+ # Get a polygon with more than 4 vertices
+ polygon_to_split = list_to_split[0]
+ list_coords = list(polygon_to_split.exterior.coords)
+ start, end = list_coords[i:i + 2]
+ splitter_line = get_rectangle_splitter_line(start, end)
+ splitted_geom = shapely.ops.split(polygon_to_split, splitter_line)
+ if len(splitted_geom.geoms) == 1:
+ splitted_geom = Polygon(splitted_geom.geoms[0])
+ else:
+ splitted_geom = MultiPolygon(splitted_geom)
+ # If some splitting occur, update list_to_split
+ if not splitted_geom.equals_exact(polygon_to_split, tolerance=1e-8):
+ del list_to_split[0]
+ i = 0
+ if isinstance(splitted_geom, MultiPolygon) or isinstance(splitted_geom, GeometryCollection):
+ for geom in splitted_geom.geoms:
+ vertices = np.array(geom.exterior.coords)
+ vertices = simplify_rectangle_vertices(vertices)
+ geom = Polygon(vertices)
+ if len(geom.exterior.coords) == 5:
+ list_extent_polygons.append(geom)
+ else:
+ list_to_split.append(geom)
+ elif isinstance(splitted_geom, Polygon):
+ if len(splitted_geom.exterior.coords) == 5:
+ list_extent_polygons.append(splitted_geom)
+ raise ValueError("This should not happen.")
+ else:
+ raise ValueError("This should not happen.")
+ else:
+ i += 1
+ return MultiPolygon(list_extent_polygons)
+def get_non_overlapping_list_extents(list_extent):
+ """Given a list of extents, return a list of non-overlapping extents."""
+ p = MultiPolygon([Polygon.from_bounds(*bounds_from_extent(ext)) for ext in list_extent])
+ p = shapely.ops.unary_union(p)
+ p = decompose_rectilinear_polygons_into_rectangles(p)
+ if isinstance(p, MultiPolygon):
+ list_extent = [extent_from_bounds(geom.bounds) for geom in p.geoms]
+ else: # Polygon
+ list_extent = [extent_from_bounds(p.bounds)]
+ return list_extent
+def _check_valid_extent(extent, use_radians=False):
+ """Check lat/lon extent validity."""
+ if len(extent) != 4:
+ raise ValueError("'extent' must have length 4: [lon_min, lon_max, lat_min, lat_max].")
+ if not isinstance(extent, (tuple, list, np.ndarray)):
+ raise TypeError("'extent' must be a list, tuple or np.array. [lon_min, lon_max, lat_min, lat_max].")
+ extent = np.array(extent)
+ # Check extent order validity
+ if extent[0] > extent[1]:
+ raise ValueError('extent[0] (aka lon_min) must be smaller than extent[1] (aka lon_max).')
+ if extent[2] > extent[3]:
+ raise ValueError('extent[2] (aka lat_min) must be smaller than extent[2] (aka lat_max).')
+ # Check min max values
+ if use_radians:
+ if extent[0] < -np.pi:
+ raise ValueError('extent[0] (aka lon_min) must be equal or larger than -π.')
+ if extent[1] > np.pi:
+ raise ValueError('extent[1] (aka lon_max) must be equal or smaller than π.')
+ if extent[2] < -np.pi / 2:
+ raise ValueError('extent[2] (aka lat_min) must be equal or larger than -π/2.')
+ if extent[3] < -np.pi / 2:
+ raise ValueError('extent[3] (aka lat_max) must be equal or larger than π/2.')
+ else:
+ if extent[0] < -180:
+ raise ValueError('extent[0] (aka lon_min) must be equal or larger than -180.')
+ if extent[1] > 180:
+ raise ValueError('extent[1] (aka lon_max) must be equal or smaller than 180.')
+ if extent[2] < -90:
+ raise ValueError('extent[2] (aka lat_min) must be equal or larger than -90.')
+ if extent[3] < -180:
+ raise ValueError('extent[3] (aka lat_max) must be equal or larger than 90.')
+ return extent.tolist()
+# --------------------------------------------------------------------------.
+# Shapely utils à la POSTGIS / sf style
+def st_add_x_offset(geom, offset):
+ """Add an offset to x coordinates."""
+ if isinstance(geom, (MultiPolygon, MultiLineString, MultiPoint)):
+ return type(geom)([shapely.affinity.translate(geom[i], xoff=offset) for i in range(len(geom.geoms))])
+ else:
+ return shapely.affinity.translate(geom, xoff=offset)
+def st_add_y_offset(geom, offset):
+ """Add an offset on y coordinates."""
+ if isinstance(geom, (MultiPolygon, MultiLineString, MultiPoint)):
+ return type(geom)([shapely.affinity.translate(geom[i], yoff=offset) for i in range(len(geom.geoms))])
+ else:
+ return shapely.affinity.translate(geom, yoff=offset)
+def st_polygon_clockwise(geom):
+ """Ensure shapely Polygon or MultiPolygon to be clockwise oriented."""
+ # Check geometry type
+ if not isinstance(geom, (Polygon, MultiPolygon)):
+ raise TypeError("Expects Polygon or MultiPolygon.")
+ if isinstance(geom, MultiPolygon):
+ return MultiPolygon([shapely.geometry.polygon.orient(geom[i], -1) for i in range(len(geom.geoms))])
+ else:
+ return shapely.geometry.polygon.orient(geom, -1)
+def st_polygon_counterclockwise(geom):
+ """Ensure shapely Polygon or MultiPolygon to be counterclockwise oriented."""
+ # Check geometry type
+ if not isinstance(geom, (Polygon, MultiPolygon)):
+ raise TypeError("Expects Polygon or MultiPolygon.")
+ if isinstance(geom, MultiPolygon):
+ return MultiPolygon([shapely.geometry.polygon.orient(geom[i], 1) for i in range(len(geom.geoms))])
+ else:
+ return shapely.geometry.polygon.orient(geom, 1)
+def st_polygon_antimeridian_safe(geom):
+ """Sanitize shapely polygons crossing the antimeridian.
+ Given a Shapely Polygon or MultiPolygon representation of a polygon,
+ returns a MultiPolygon of 'antimeridian-safe' constituent polygons splitted at the anti-meridian.
+ The returned MultiPolygon ensure compliance with GeoJSON standards
+ GeoJSON standards: https://tools.ietf.org/html/rfc7946#section-3.1.9
+ Assumptions:
+ - Any two consecutive points with over 180 degrees difference in
+ longitude are assumed to cross the antimeridian.
+ - The polygon can wrap multiple time across the globe and cross the antimeridian on multiple occasions.
+ - If the polygon enclose a pole, the processing can fail or return wrong output
+ without warnings. !!!
+ - Does not account for holes in the polygons !
+ Returns:
+ MultiPolygon: antimeridian-safe polygon(s)
+ """
+ # Check geometry type
+ if not isinstance(geom, (shapely.geometry.Polygon, shapely.geometry.MultiPolygon)):
+ raise TypeError("Expects Polygon or MultiPolygon.")
+ # Get list of vertices
+ if isinstance(geom, shapely.geometry.Polygon):
+ list_vertices = [np.array(geom.exterior.coords)]
+ else:
+ list_vertices = [np.array(geom.geoms[i].exterior.coords) for i in range(len(geom.geoms))]
+ # Split lines at the antimeridian
+ list_vertices = [get_antimeridian_safe_polygon_vertices(vertices) for vertices in list_vertices]
+ # Flat the list
+ list_vertices = [item for sublist in list_vertices for item in sublist]
+ # Return a MultiLineString object
+ return MultiPolygon([Polygon(vertices) for vertices in list_vertices])
+def st_line_antimeridian_safe(geom):
+ """Sanitize shapely lines crossing the antimeridian.
+ Given a Shapely LineString or MultiLineString representation of a Line,
+ returns a MultiLineString of 'antimeridian-safe' constituent lines splitted at the anti-meridian.
+ The returned MultiString ensure compliance with GeoJSON standards
+ GeoJSON standards: https://tools.ietf.org/html/rfc7946#section-3.1.9
+ Assumptions:
+ - Any two consecutive points with over 180 degrees difference in
+ longitude are assumed to cross the antimeridian.
+ - The line can wrap multiple time across the globe and cross the antimeridian on multiple occasions.
+ Returns:
+ MultiLineString: antimeridian-safe line(s)
+ """
+ # Check geometry type
+ if not isinstance(geom, (shapely.geometry.LineString, shapely.geometry.MultiLineString)):
+ raise TypeError("Expects LineString or MultiLineString.")
+ # Get list of vertices
+ if isinstance(geom, shapely.geometry.LineString):
+ list_vertices = [np.array(geom.coords)]
+ else:
+ list_vertices = [np.array(geom.geoms[i].coords) for i in range(len(geom.geoms))]
+ # Split lines at the antimeridian
+ list_vertices = [get_antimeridian_safe_line_vertices(vertices) for vertices in list_vertices]
+ # Flat the list
+ list_vertices = [item for sublist in list_vertices for item in sublist]
+ # Return a MultiLineString object
+ return MultiLineString(list_vertices)
+# --------------------------------------------------------------------------.