diff --git a/changes/437.changed b/changes/437.changed new file mode 100644 index 000000000..7704dce4e --- /dev/null +++ b/changes/437.changed @@ -0,0 +1 @@ +Changed the Infoblox `utils.client` to make API calls using `requests.Session` instead of `requests.request`. \ No newline at end of file diff --git a/nautobot_ssot/integrations/infoblox/utils/client.py b/nautobot_ssot/integrations/infoblox/utils/client.py index 2e43312f9..689055795 100644 --- a/nautobot_ssot/integrations/infoblox/utils/client.py +++ b/nautobot_ssot/integrations/infoblox/utils/client.py @@ -1,7 +1,6 @@ """All interactions with infoblox.""" # pylint: disable=too-many-lines from __future__ import annotations -import copy import json import ipaddress import logging @@ -10,6 +9,7 @@ from collections import defaultdict from typing import Optional import requests +from requests.auth import HTTPBasicAuth from requests.exceptions import HTTPError from requests.compat import urljoin from dns import reversename @@ -113,41 +113,60 @@ def __init__( raise InvalidUrlScheme(scheme=parsed_url.scheme) else: self.url = parsed_url.geturl() - self.username = username - self.password = password - self.verify_ssl = verify_ssl + self.auth = HTTPBasicAuth(username, password) self.wapi_version = wapi_version - self.cookie = cookie - if self.verify_ssl is False: + self.session = self._init_session(verify_ssl=verify_ssl, cookie=cookie) + + def _init_session(self, verify_ssl: bool, cookie: Optional[dict]) -> requests.Session: + """Initialize requests Session object that is used across all the API calls. + + Args: + verify_ssl (bool): whether to verify SSL cert for https calls + cookie (dict): optional dict with cookies to set on the Session object + + Returns: + initialized session object + """ + if verify_ssl is False: requests.packages.urllib3.disable_warnings( # pylint: disable=no-member requests.packages.urllib3.exceptions.InsecureRequestWarning # pylint: disable=no-member ) # pylint: disable=no-member self.headers = {"Content-Type": "application/json"} - self.extra_vars = {} + session = requests.Session() + if cookie and isinstance(cookie, dict): + session.cookies.update(cookie) + session.verify = verify_ssl + session.headers.update(self.headers) + session.auth = self.auth + + return session def _request(self, method, path, **kwargs): - """Return a response object after making a request to by other methods. + """Return a response object after making a request by a specified method. Args: - method (str): Request HTTP method to call with requests. + method (str): Request HTTP method to call with Session.request. path (str): URL path to call. Returns: :class:`~requests.Response`: Response from the API. """ - kwargs["verify"] = self.verify_ssl - kwargs["headers"] = self.headers api_path = f"/wapi/{self.wapi_version}/{path}" url = urljoin(self.url, api_path) - if self.cookie: - resp = requests.request( - method, url, cookies=self.cookie, timeout=PLUGIN_CFG["infoblox_request_timeout"], **kwargs - ) + if self.session.cookies.get("ibapauth"): + self.session.auth = None else: - kwargs["auth"] = requests.auth.HTTPBasicAuth(self.username, self.password) - resp = requests.request(method, url, timeout=PLUGIN_CFG["infoblox_request_timeout"], **kwargs) - self.cookie = copy.copy(resp.cookies.get_dict("ibapauth")) + self.session.auth = self.auth + + resp = self.session.request(method, url, timeout=PLUGIN_CFG["infoblox_request_timeout"], **kwargs) + # Infoblox provides meaningful error messages for error codes >= 400 + if resp.status_code >= 400: + try: + err_msg = resp.json() + except json.decoder.JSONDecodeError: + err_msg = resp.text + logger.error(err_msg) resp.raise_for_status() return resp