Skip to content

Commit

Permalink
Merge pull request #1 from eyazrgeotab/client-certificate
Browse files Browse the repository at this point in the history
Client certificate support and example
  • Loading branch information
aaront authored Nov 12, 2021
2 parents 5fa0a7c + afde445 commit 42e68d3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 9 deletions.
18 changes: 16 additions & 2 deletions mygeotab/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
server="my.geotab.com",
timeout=DEFAULT_TIMEOUT,
proxies=None,
cert=None
):
"""Initialize the MyGeotab API object with credentials.
Expand All @@ -56,6 +57,8 @@ def __init__(
:type timeout: float or None
:param proxies: The proxies dictionary to apply to the request.
:type proxies: dict or None
:param cert: The path to client certificate. A single path to .pem file or a Tuple (.cer file, .key file).
:type cert: str or Tuple or None
:raise Exception: Raises an Exception if a username, or one of the session_id or password is not provided.
"""
if username is None:
Expand All @@ -68,6 +71,7 @@ def __init__(
self.timeout = timeout
self._proxies = proxies
self.__reauthorize_count = 0
self._cert = cert

@property
def _server(self):
Expand Down Expand Up @@ -105,7 +109,12 @@ def call(self, method, **parameters):

try:
result = _query(
self._server, method, params, self.timeout, verify_ssl=self._is_verify_ssl, proxies=self._proxies
self._server,
method, params,
self.timeout,
verify_ssl=self._is_verify_ssl,
proxies=self._proxies,
cert=self._cert
)
if result is not None:
self.__reauthorize_count = 0
Expand Down Expand Up @@ -219,6 +228,7 @@ def authenticate(self):
self.timeout,
verify_ssl=self._is_verify_ssl,
proxies=self._proxies,
cert=self._cert
)
if result:
if "path" not in result and self.credentials.session_id:
Expand Down Expand Up @@ -307,7 +317,7 @@ def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
)


def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True, proxies=None):
def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True, proxies=None, cert=None):
"""Formats and performs the query against the API.
:param server: The MyGeotab server.
Expand All @@ -322,6 +332,8 @@ def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True,
:type verify_ssl: bool
:param proxies: The proxies dictionary to apply to the request.
:type proxies: dict or None
:param cert: The path to client certificate. A single path to .pem file or a Tuple (.cer file, .pem file)
:type cert: str or Tuple or None
:raise MyGeotabException: Raises when an exception occurs on the MyGeotab server.
:raise TimeoutException: Raises when the request does not respond after some time.
:raise urllib2.HTTPError: Raises when there is an HTTP status code that indicates failure.
Expand All @@ -332,6 +344,8 @@ def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True,
headers = get_headers()
with requests.Session() as session:
session.mount("https://", GeotabHTTPAdapter())
if cert:
session.cert = cert
try:
response = session.post(
api_endpoint,
Expand Down
22 changes: 18 additions & 4 deletions mygeotab/py3/api_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
server="my.geotab.com",
timeout=DEFAULT_TIMEOUT,
proxies=None,
cert=None
):
"""
Initialize the asynchronous MyGeotab API object with credentials.
Expand All @@ -46,9 +47,10 @@ def __init__(
:param server: The server ie. my23.geotab.com. Optional as this usually gets resolved upon authentication.
:param timeout: The timeout to make the call, in seconds. By default, this is 300 seconds (or 5 minutes).
:param proxies: The proxies dictionary to apply to the request.
:param cert: The path to client certificate. A single path to .pem file or a Tuple (.cer file, .pem file)
:raise Exception: Raises an Exception if a username, or one of the session_id or password is not provided.
"""
super().__init__(username, password, database, session_id, server, timeout, proxies=proxies)
super().__init__(username, password, database, session_id, server, timeout, proxies=proxies, cert=cert)

async def call_async(self, method, **parameters):
"""Makes an async call to the API.
Expand All @@ -68,7 +70,7 @@ async def call_async(self, method, **parameters):
params["credentials"] = self.credentials.get_param()

try:
result = await _query(self._server, method, params, verify_ssl=self._is_verify_ssl)
result = await _query(self._server, method, params, verify_ssl=self._is_verify_ssl, cert=self._cert)
if result is not None:
self.__reauthorize_count = 0
return result
Expand Down Expand Up @@ -181,14 +183,15 @@ async def server_call_async(method, server, timeout=DEFAULT_TIMEOUT, verify_ssl=
return await _query(server, method, parameters, timeout=timeout, verify_ssl=verify_ssl)


async def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True):
async def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl=True, cert=None):
"""Formats and performs the asynchronous query against the API
:param server: The server to query.
:param method: The method name.
:param parameters: A dict of parameters to send
:param timeout: The timeout to make the call, in seconds. By default, this is 300 seconds (or 5 minutes).
:param verify_ssl: Whether or not to verify SSL connections
:param cert: The path to client certificate. A single path to .pem file or a Tuple (.cer file, .pem file)
:return: The JSON-decoded result from the server
:raise MyGeotabException: Raises when an exception occurs on the MyGeotab server
:raise TimeoutException: Raises when the request does not respond after some time.
Expand All @@ -197,7 +200,18 @@ async def _query(server, method, parameters, timeout=DEFAULT_TIMEOUT, verify_ssl
api_endpoint = api.get_api_url(server)
params = dict(id=-1, method=method, params=parameters)
headers = get_headers()
conn = aiohttp.TCPConnector(ssl=ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) if verify_ssl else False)

ssl_context = False
if verify_ssl or cert:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
if cert:
if isinstance(cert, str):
ssl_context.load_cert_chain(cert)
elif isinstance(cert, tuple):
cer, key = cert
ssl_context.load_cert_chain(cer, key)

conn = aiohttp.TCPConnector(ssl=ssl_context)
try:
async with aiohttp.ClientSession(connector=conn) as session:
response = await session.post(
Expand Down
9 changes: 7 additions & 2 deletions tests/test_api_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from mygeotab import API, server_call_async
from mygeotab.exceptions import MyGeotabException, TimeoutException
from tests.test_api_call import SERVER, USERNAME, PASSWORD, DATABASE, TRAILER_NAME
from tests.test_api_call import SERVER, USERNAME, PASSWORD, DATABASE, CER_FILE, KEY_FILE, PEM_FILE, TRAILER_NAME

ASYNC_TRAILER_NAME = "async {name}".format(name=TRAILER_NAME)

Expand All @@ -20,8 +20,13 @@

@pytest.fixture(scope="session")
def async_populated_api():
cert = None
if CER_FILE and KEY_FILE:
cert = (CER_FILE, KEY_FILE)
elif PEM_FILE:
cert = PEM_FILE
if USERNAME and PASSWORD:
session = API(USERNAME, password=PASSWORD, database=DATABASE, server=SERVER)
session = API(USERNAME, password=PASSWORD, database=DATABASE, server=SERVER, cert=cert)
try:
session.authenticate()
except MyGeotabException as exception:
Expand Down
10 changes: 9 additions & 1 deletion tests/test_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
PASSWORD = os.environ.get("MYGEOTAB_PASSWORD")
DATABASE = os.environ.get("MYGEOTAB_DATABASE")
SERVER = os.environ.get("MYGEOTAB_SERVER")
CER_FILE = os.environ.get("MYGEOTAB_CERTIFICATE_CER")
KEY_FILE = os.environ.get("MYGEOTAB_CERTIFICATE_KEY")
PEM_FILE = os.environ.get("MYGEOTAB_CERTIFICATE_PEM")
TRAILER_NAME = "mygeotab-python test trailer"

FAKE_USERNAME = "fakeusername"
Expand All @@ -21,8 +24,13 @@

@pytest.fixture(scope="session")
def populated_api():
cert = None
if CER_FILE and KEY_FILE:
cert = (CER_FILE, KEY_FILE)
elif PEM_FILE:
cert = PEM_FILE
if USERNAME and PASSWORD:
session = api.API(USERNAME, password=PASSWORD, database=DATABASE, server=SERVER)
session = api.API(USERNAME, password=PASSWORD, database=DATABASE, server=SERVER, cert=cert)
try:
session.authenticate()
except api.MyGeotabException as exception:
Expand Down

0 comments on commit 42e68d3

Please sign in to comment.