diff --git a/docs/catalist.rst b/docs/catalist.rst
new file mode 100644
index 0000000000..1e1c1e3eb7
--- /dev/null
+++ b/docs/catalist.rst
@@ -0,0 +1,53 @@
+Catalist
+========
+
+********
+Overview
+********
+
+The CatalistMatch class allows you to interact with the Catalist M Tool (match) API. Users of this Parsons integration can use the Parsons table format to send input files to the M Tool and receive back a matched version of that table.
+
+.. note::
+    Authentication
+
+    In order to use this class, you must be provided with an OAuth Client ID and Client Secret from Catalist, as well as SFTP credentials. You will also need to have Catalist whitelist the IP address you are using to access the M Tool.
+
+==========
+Quickstart
+==========
+
+To instantiate the CatalistMatch class, you must provide your ``client_id``, ``client_secret``, ``sftp_username`` and ``sftp_password`` values as arguments:
+
+.. code-block:: sh
+
+    # In bash, set your environment variables like so:
+    $ export CATALIST_CLIENT_ID='MY_UUID'
+    $ export CATALIST_CLIENT_SECRET='MY_SECRET'
+    $ export CATALIST_SFTP_USERNAME='MY_USERNAME'
+    $ export CATALIST_SFTP_PASSWORD='MY_PASSWORD'
+
+.. code-block:: python
+
+    import os
+    from parsons import CatalistMatch, Table
+
+    match = CatalistMatch(
+        client_id=os.environ['CATALIST_CLIENT_ID'],
+        client_secret=os.environ['CATALIST_CLIENT_SECRET'],
+        sftp_username=os.environ['CATALIST_SFTP_USERNAME'],
+        sftp_password=os.environ['CATALIST_SFTP_PASSWORD']
+    )
+
+You can then load a CSV as a Parsons Table, submit it for matching, and save the resulting matched table as a CSV:
+
+.. code-block:: python
+
+    source_table = Table.from_csv(source_filepath)
+    result_table = match.match(source_table)
+    result_table.to_csv(result_filepath)
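+
+Matching can take from 10 minutes up to 6 hours or more, so you may not want to block a process on a single ``match()`` call. As a sketch (the ``description`` value here is illustrative), you can instead submit the job and collect the result separately using the ``upload()`` and ``await_completion()`` methods documented below:
+
+.. code-block:: python
+
+    # Submit the table for matching; returns job metadata immediately
+    response = match.upload(source_table, description='my match job')
+
+    # Later, poll until the job finishes and load the matched table
+    result_table = match.await_completion(response['id'])
+    result_table.to_csv(result_filepath)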
+
+***
+API
+***
+
+.. autoclass:: parsons.CatalistMatch
+    :inherited-members:
diff --git a/parsons/__init__.py b/parsons/__init__.py
index b8793ec649..76393fa8ac 100644
--- a/parsons/__init__.py
+++ b/parsons/__init__.py
@@ -43,6 +43,7 @@
     ("parsons.box.box", "Box"),
     ("parsons.braintree.braintree", "Braintree"),
     ("parsons.capitol_canary.capitol_canary", "CapitolCanary"),
+    ("parsons.catalist.catalist", "CatalistMatch"),
     ("parsons.civis.civisclient", "CivisClient"),
     ("parsons.controlshift.controlshift", "Controlshift"),
     ("parsons.copper.copper", "Copper"),
diff --git a/parsons/catalist/__init__.py b/parsons/catalist/__init__.py
new file mode 100644
index 0000000000..d849e988a3
--- /dev/null
+++ b/parsons/catalist/__init__.py
@@ -0,0 +1,3 @@
+from parsons.catalist.catalist import CatalistMatch
+
+__all__ = ["CatalistMatch"]
diff --git a/parsons/catalist/catalist.py b/parsons/catalist/catalist.py
new file mode 100644
index 0000000000..4779f70648
--- /dev/null
+++ b/parsons/catalist/catalist.py
@@ -0,0 +1,441 @@
+"""Utilities for working with the Catalist Match API.
+
+Install dependencies with `pip install parsons[catalist]`
+"""
+
+import base64
+import logging
+import os
+import tempfile
+import time
+import urllib.parse
+from typing import Optional, Union, Dict, List
+from zipfile import ZipFile
+
+import requests
+from parsons.etl import Table
+from parsons.sftp import SFTP
+from parsons.utilities.api_connector import APIConnector
+
+logger = logging.getLogger(__name__)
+
+
+class CatalistMatch:
+    """Connector for working with the Catalist Match API.
+
+    This API allows a trusted third party to submit new files for processing and/or
+    reprocess existing files. It also allows retrieval of processing status. Initial
+    setup of template(s) via the M Tool UI will be required.
+
+    The Catalist Match tool requires OAuth 2.0 client credentials for the API as well
+    as credentials for accessing the Catalist SFTP bucket. Each Catalist client is
+    given their own bucket alias named after a tree species, used for constructing
+    the filepath within the SFTP bucket.
+
+    Accessing the Catalist SFTP bucket and Match API both require the source IP
+    address to be explicitly whitelisted by Catalist.
+
+    Example usage:
+    ```
+    tbl = Table.from_csv(...)
+    client = CatalistMatch(...)
+    match_result = client.match(tbl)
+    ```
+
+    Note that matching can take from 10 minutes up to 6 hours or longer to complete,
+    so you may want to think strategically about how to await completion without
+    straining your compute resources on idling.
+
+    To separate submitting the job and fetching the result:
+    ```
+    tbl = Table.from_csv(...)
+    client = CatalistMatch(...)
+    response = client.upload(tbl)
+    match_result = client.await_completion(response["id"])
+    ```
+    """
+
+    def __init__(
+        self,
+        client_id: str,
+        client_secret: str,
+        sftp_username: str,
+        sftp_password: str,
+    ) -> None:
+        self.client_id = client_id
+        self.client_secret = client_secret
+        self.fetch_token()
+        self.connection = APIConnector("http://api.catalist.us/mapi/")
+        self.sftp = SFTP("t.catalist.us", sftp_username, sftp_password)
+
+    @property
+    def token(self) -> str:
+        """If the token is not yet fetched or has expired, fetch a new token."""
+        if not (self._token and time.time() < self._token_expired_at):
+            self.fetch_token()
+        return self._token
+
+    def fetch_token(self) -> None:
+        """Fetch an auth0 token to be used with the Catalist API."""
+        url = "https://auth.catalist.us/oauth/token"
+        payload = {
+            "grant_type": "client_credentials",
+            "audience": "catalist_api_m_prod",
+        }
+        response = requests.post(
+            url, json=payload, auth=(self.client_id, self.client_secret)
+        )
+        data = response.json()
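+
+        # The token endpoint returns JSON roughly shaped like
+        #   {"access_token": "<token>", "expires_in": 86400}
+        # (field names inferred from the consuming code below; other keys may vary)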
+ """ + local_path = table.to_csv(temp_file_compression="gzip") + hashed_name = hash(time.time()) + remote_path_parts = ["myUploads", f"{hashed_name}.csv.gz"] + if input_subfolder: + if input_subfolder not in self.sftp.list_directory("/myUploads/"): + self.sftp.make_directory("/myUploads/" + input_subfolder) + remote_path_parts.insert(1, input_subfolder) + remote_path = "/".join(remote_path_parts) + + self.sftp.put_file(local_path, remote_path) + + # Loads to Catalist SFTP bucket are expcted in the client's uploads bucket + # So we don't need to explicitly include that part of the path + result = f"file://{remote_path.replace('myUploads/', '')}" + return result + + def match( + self, + table: Table, + export: bool = False, + description: Optional[str] = None, + export_filename_suffix: Optional[str] = None, + input_subfolder: Optional[str] = None, + copy_to_sandbox: bool = False, + static_values: Optional[Dict[str, Union[str, int]]] = None, + ) -> Table: + """Load table to the Catalist Match API, returns matched table. + + This method blocks until the match completes, which can take from 10 minutes to + 6 hours or more depending on concurrent traffic. + + `Args:` + table: Table + Parsons Table for matching. "first_name" and "last_name" columns + are required. Optional columns for matching: last_name, name_suffix, + addr1, addr2, city, state, zip, phone, email, gender_tomatch, dob, + dob_year, matchbackid. + export: bool + Defaults to False + description: str + Optional. Description for the match job. + export_filename_suffix: str + Optional. Adds a suffix to the result filename in the SFTP server. + input_subfolder: str + Optional. Adds a prefix to the filepath of the uploaded input file in + the SFTP server. + copy_to_sandbox: bool + Defaults to False. + static_values: dict + Optional. Any included values are mapped to every row of the input table. + """ + response = self.upload( + table, + export, + description, + export_filename_suffix, + input_subfolder, + copy_to_sandbox, + static_values, + ) + result = self.await_completion(response["id"]) + return result + + def upload( + self, + table: Table, + template_id: str = "48827", + export: bool = False, + description: Optional[str] = None, + export_filename_suffix: Optional[str] = None, + input_subfolder: Optional[str] = None, + copy_to_sandbox: bool = False, + static_values: Optional[Dict[str, Union[str, int]]] = None, + ) -> dict: + """Load table to the Catalist Match API, returns response with job metadata. + + `Args:` + table: Table + Parsons Table for matching. "first_name" and "last_name" columns + are required. Optional columns for matching: last_name, name_suffix, + addr1, addr2, city, state, zip, phone, email, gender_tomatch, dob, + dob_year, matchbackid. + template_id: str + Defaults to 48827, currently the only available template for working + with the Match API. + export: bool + Defaults to False + description: str + Optional. Description for the match job. + export_filename_suffix: str + Optional. Adds a suffix to the result filename in the SFTP server. + input_subfolder: str + Optional. Adds a prefix to the filepath of the uploaded input file in + the SFTP server. + copy_to_sandbox: bool + Defaults to False. + static_values: dict + Optional. Any included values are mapped to every row of the input table. 
+ """ + + self.validate_table(table, template_id) + + # upload table to s3 temp location + sftp_file_path = self.load_table_to_sftp(table, input_subfolder) + sftp_file_path_encoded = base64.b64encode( + sftp_file_path.encode("ascii") + ).decode("ascii") + + if export: + action = "export%2Cpublish" + else: + action = "publish" + + # Create endpoint using options + endpoint_params = [ + "upload", + "template", + template_id, + "action", + action, + "url", + sftp_file_path_encoded, + ] + + if description: + endpoint_params.extend(["description", description]) + + endpoint = "/".join(endpoint_params) + + # Assemble query parameters + query_params: Dict[str, Union[str, int]] = {"token": self.token} + if copy_to_sandbox: + query_params["copyToSandbox"] = "true" + if static_values: + query_params.update(static_values) + if export_filename_suffix: + query_params["subClientName"] = export_filename_suffix + + logger.debug(f"Executing request to endpoint {self.connection.uri + endpoint}") + + endpoint = endpoint + "?" + urllib.parse.urlencode(query_params) + + response = self.connection.get_request(endpoint) + + result = response[0] + + return result + + def action( + self, + file_ids: Union[str, List[str]], + match: bool = False, + export: bool = False, + export_filename_suffix: Optional[str] = None, + copy_to_sandbox: bool = False, + ) -> List[dict]: + """Perform actions on existing files. + + All files must be in Finished status (if the action requested is publish), and + must mapped against the same template. The request will return as soon as the + action has been queued. + + `Args:` + file_ids: str or List[str] + one or more file_ids (found in the `id` key of responses from the + upload() or status() methods) + match: bool + Optional. Defaults to False. If True, will initiate matching. + export: bool + Optional. Defaults to False. If True, will initiate export. + export_filename_suffix: str + Optional. If included, adds a suffix to the filepath of the exported + file in the SFTP server. + copy_to_sandbox: bool + Defaults to False. + + """ + actions = ["publish"] + if match: + actions.append("match") + if export: + actions.append("export") + action = urllib.parse.quote(",".join(actions)) + + endpoint_params = ["action", action] + + if isinstance(file_ids, list): + encoded_files = urllib.parse.quote(",".join(file_ids)) + else: + encoded_files = file_ids + + endpoint_params.extend(["file", encoded_files]) + + endpoint = "/".join(endpoint_params) + + logger.debug(f"Executing request to endpoint {self.connection.uri + endpoint}") + + query_params = {"token": self.token} + if copy_to_sandbox: + query_params["copyToSandbox"] = "true" + if export_filename_suffix: + query_params["subClientName"] = export_filename_suffix + + endpoint = endpoint + "?" + urllib.parse.urlencode(query_params) + + result = self.connection.get_request(endpoint) + + return result + + def status(self, id: str) -> dict: + """Check status of a match job.""" + endpoint = "/".join(["status", "id", id]) + query_params = {"token": self.token} + result = self.connection.get_request(endpoint, params=query_params) + return result + + def await_completion(self, id: str, wait: int = 30) -> Table: + """Await completion of a match job. Return matches when ready. + + This method will poll the status of a match job on a timer until the job is + complete. By default, polls once every 30 seconds. + + Note that match job completion can take from 10 minutes up to 6 hours or more + depending on concurrent traffic. 
+        return result
+
+    def await_completion(self, id: str, wait: int = 30) -> Table:
+        """Await completion of a match job and return the matches when ready.
+
+        This method polls the status of a match job on a timer until the job is
+        complete. By default, it polls once every 30 seconds.
+
+        Note that match job completion can take from 10 minutes up to 6 hours or
+        more depending on concurrent traffic, so consider your strategy for
+        polling for completion.
+        """
+        while True:
+            response = self.status(id)
+            status = response["process"]["processState"]
+            if status in ("Finished", "Error", "Stopped", "Exception"):
+                logger.info(f"Job {id} is complete with status {status}.")
+                break
+
+            logger.info(f"Job {id} has status {status}, awaiting completion.")
+            time.sleep(wait)
+
+        result = self.load_matches(id)
+        return result
+
+    def load_matches(self, id: str) -> Table:
+        """Take a completed job ID, then download and open the match file as a Table.
+
+        The result will be a Table with all the original columns along with
+        columns 'DWID', 'CONFIDENCE', 'ZIP9', and 'STATE'. The original column
+        headers will be prefixed with 'COL#-'.
+        """
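+        # A result header row might look like this (illustrative):
+        #   COL1-first_name, COL2-last_name, ..., DWID, CONFIDENCE, ZIP9, STATE
+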
%s", errors + ) + else: + logger.info("Table structure validated.") diff --git a/parsons/utilities/api_connector.py b/parsons/utilities/api_connector.py index 5c54106cbe..9db696c022 100644 --- a/parsons/utilities/api_connector.py +++ b/parsons/utilities/api_connector.py @@ -238,6 +238,8 @@ def validate_response(self, resp): if resp.status_code >= 400: if resp.reason: message = f"HTTP error occurred ({resp.status_code}): {resp.reason}" + elif resp.text: + message = f"HTTP error occurred ({resp.status_code}): {resp.text}" else: message = f"HTTP error occurred ({resp.status_code})" diff --git a/requirements.txt b/requirements.txt index b8390dfbf9..410e950254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,6 +47,7 @@ black==22.12.0 testfixtures==6.18.5 pytest==7.1.1 pytest-datadir==1.3.0 +pytest-mock>=3.0.0 # Stuff for TMC scripts # TODO Remove when we have a TMC-specific Docker image diff --git a/setup.py b/setup.py index 7d4bd5c462..23239a3198 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ def main(): "azure": ["azure-storage-blob"], "box": ["boxsdk"], "braintree": ["braintree"], + "catalist": ["paramiko"], "civis": ["civis"], "facebook": ["joblib", "facebook-business"], "geocode": ["censusgeocode"], diff --git a/test/test_catalist/conftest.py b/test/test_catalist/conftest.py new file mode 100644 index 0000000000..cb9f13cae9 --- /dev/null +++ b/test/test_catalist/conftest.py @@ -0,0 +1,45 @@ +from unittest.mock import MagicMock +from typing import Generator +import pytest +import requests_mock +import re + + +@pytest.fixture(autouse=True) +def mock_requests() -> Generator[MagicMock, None, None]: + """Replace requests in api_connector with a mock client""" + with requests_mock.Mocker() as mocker: + mocker.post(requests_mock.ANY, json={"test": True}) + mocker.post( + "https://auth.catalist.us/oauth/token", + json={"access_token": "tokenexample", "expires_in": 99999, "test": True}, + ) + mocker.get(requests_mock.ANY, json=[{"test": True}]) + mocker.get( + re.compile("/mapi/status/id/"), + json={"process": {"processState": "Finished"}}, + ) + + yield mocker + + +@pytest.fixture(autouse=True) +def mock_sftp(mocker) -> Generator[MagicMock, None, None]: + """Replace sftp client with a mock client""" + magic_mock = MagicMock() + + mocker.patch("parsons.catalist.catalist.SFTP", new=magic_mock) + + yield mocker + + +@pytest.fixture(autouse=True) +def mock_miscellaneous(mocker) -> Generator[MagicMock, None, None]: + """Replace miscellaneous utilities with mocks to simplify testing""" + magic_mock = MagicMock() + + mocker.patch("parsons.catalist.catalist.ZipFile", new=magic_mock) + mocker.patch("parsons.catalist.catalist.os", new=magic_mock) + mocker.patch("parsons.catalist.catalist.Table", new=magic_mock) + + yield mocker diff --git a/test/test_catalist/test_catalist.py b/test/test_catalist/test_catalist.py new file mode 100644 index 0000000000..834b510f92 --- /dev/null +++ b/test/test_catalist/test_catalist.py @@ -0,0 +1,153 @@ +from parsons import Table, CatalistMatch +import time +import pytest +from unittest.mock import MagicMock + +TEST_CLIENT_ID = "some_client_id" +TEST_CLIENT_SECRET = "some_client_secret" +TEST_SFTP_USERNAME = "username" +TEST_SFTP_PASSWORD = "password" + + +def table_for_test(include_last_name: bool = True) -> Table: + """parsons Table for tests""" + table = Table( + [ + {"first_name": "John", "last_name": "Doe"}, + {"first_name": "Jane", "last_name": "Doe"}, + ] + ) + if not include_last_name: + table = table.cut("first_name") + return table + + +def 
+
+        yield mocker
+
+
+@pytest.fixture(autouse=True)
+def mock_sftp(mocker) -> Generator[MagicMock, None, None]:
+    """Replace the SFTP client with a mock client"""
+    magic_mock = MagicMock()
+
+    mocker.patch("parsons.catalist.catalist.SFTP", new=magic_mock)
+
+    yield mocker
+
+
+@pytest.fixture(autouse=True)
+def mock_miscellaneous(mocker) -> Generator[MagicMock, None, None]:
+    """Replace miscellaneous utilities with mocks to simplify testing"""
+    magic_mock = MagicMock()
+
+    mocker.patch("parsons.catalist.catalist.ZipFile", new=magic_mock)
+    mocker.patch("parsons.catalist.catalist.os", new=magic_mock)
+    mocker.patch("parsons.catalist.catalist.Table", new=magic_mock)
+
+    yield mocker
diff --git a/test/test_catalist/test_catalist.py b/test/test_catalist/test_catalist.py
new file mode 100644
index 0000000000..834b510f92
--- /dev/null
+++ b/test/test_catalist/test_catalist.py
@@ -0,0 +1,153 @@
+from parsons import Table, CatalistMatch
+import time
+import pytest
+from unittest.mock import MagicMock
+
+TEST_CLIENT_ID = "some_client_id"
+TEST_CLIENT_SECRET = "some_client_secret"
+TEST_SFTP_USERNAME = "username"
+TEST_SFTP_PASSWORD = "password"
+
+
+def table_for_test(include_last_name: bool = True) -> Table:
+    """Parsons Table for tests"""
+    table = Table(
+        [
+            {"first_name": "John", "last_name": "Doe"},
+            {"first_name": "Jane", "last_name": "Doe"},
+        ]
+    )
+    if not include_last_name:
+        table = table.cut("first_name")
+    return table
+
+
+def match_client() -> CatalistMatch:
+    """CatalistMatch client for tests, with the token expiry pre-set"""
+    result = CatalistMatch(
+        client_id=TEST_CLIENT_ID,
+        client_secret=TEST_CLIENT_SECRET,
+        sftp_username=TEST_SFTP_USERNAME,
+        sftp_password=TEST_SFTP_PASSWORD,
+    )
+    result._token_expired_at = time.time() + 99999
+    return result
+
+
+class TestCatalist:
+    def test_fixtures_active(self) -> None:
+        """Test to ensure fixtures are active and relevant clients are mocked."""
+        match = match_client()
+        assert isinstance(match.sftp, MagicMock)
+        assert match.connection.request("url", "get").json()[0]["test"]
+
+    def test_validate_table(self) -> None:
+        """Check that the table validation method works as expected."""
+        match = match_client()
+        table = table_for_test()
+        match.validate_table(table)
+
+        # first_name and last_name are required.
+        # We expect an exception raised if last_name is missing.
+        table_to_fail = table_for_test(include_last_name=False)
+        with pytest.raises(ValueError):
+            match.validate_table(table_to_fail)
+
+    def test_load_table_to_sftp(self) -> None:
+        """Check that table load to SFTP executes as expected."""
+        match = match_client()
+        source_table = table_for_test()
+        response = match.load_table_to_sftp(source_table)
+
+        assert response.startswith("file://")
+        assert "myUploads" not in response
+        assert response.endswith(".csv.gz")
+
+        # We expect one call to the SFTP client to put the file
+        assert len(match.sftp.mock_calls) == 1
+        mocked_call = match.sftp.mock_calls[0]
+        called_method = str(mocked_call).split("(")[0].split(".")[1]
+        assert called_method == "put_file"
+        temp_local_file = mocked_call.args[0]
+        remote_path = mocked_call.args[1]
+
+        # Expect the local temp file CSV to be the same as the source table CSV
+        table_to_load = Table.from_csv(temp_local_file)
+        for row_index in range(table_to_load.num_rows):
+            assert source_table[row_index] == table_to_load[row_index]
+
+        # Expect the remote path to be structured as expected
+        assert remote_path.startswith("myUploads/")
+        assert remote_path.endswith(".csv.gz")
+
+    def test_upload(self, mock_requests) -> None:
+        """Mock use of upload() method, check API calls are structured as expected."""
+        match = match_client()
+        source_table = table_for_test()
+
+        # Execute upload
+        match.upload(source_table)
+
+        requested_endpoint = mock_requests._adapter.request_history[1].path
+        requested_queries = mock_requests._adapter.request_history[1].qs
+        requested_base_url = mock_requests._adapter.request_history[1]._url_parts.netloc
+
+        assert requested_base_url == "api.catalist.us"
+        assert set(requested_queries.keys()) == set(["token"])
+        assert requested_queries["token"] == ["tokenexample"]
+        assert requested_endpoint.startswith(
+            "/mapi/upload/template/48827/action/publish/url/"
+        )
+
+    def test_upload_with_options(self, mock_requests) -> None:
+        """Mock use of upload() method with options, check API calls."""
+        match = match_client()
+        source_table = table_for_test()
+
+        # Execute upload
+        match.upload(
+            source_table,
+            copy_to_sandbox=True,
+            static_values={"phone": 123456789},
+        )
+
+        requested_queries = mock_requests._adapter.request_history[1].qs
+
+        assert set(requested_queries.keys()) == set(["token", "copytosandbox", "phone"])
+        assert requested_queries["copytosandbox"] == ["true"]
+        assert requested_queries["phone"] == ["123456789"]
+
+    def test_status(self, mock_requests) -> None:
+        """Mock use of status() method, check API calls are structured as expected."""
+        match = match_client()
+
+        # Check status
+        match.status("12345")
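+
+        # request_history[0] is the token POST issued during client setup;
+        # index 1 is the status GET under test here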
+        requested_endpoint = mock_requests._adapter.request_history[1].path
+        requested_queries = mock_requests._adapter.request_history[1].qs
+        requested_base_url = mock_requests._adapter.request_history[1]._url_parts.netloc
+
+        assert requested_base_url == "api.catalist.us"
+        assert set(requested_queries.keys()) == set(["token"])
+        assert requested_queries["token"] == ["tokenexample"]
+        assert requested_endpoint == "/mapi/status/id/12345"
+
+    def test_load_matches(self) -> None:
+        """Check that the table download method from SFTP executes as expected."""
+        match = match_client()
+
+        # Execute download
+        match.sftp.list_directory = MagicMock(return_value=["example_12345"])
+        match.load_matches("12345")
+
+        # We expect two calls to the SFTP client to list the directory and get the file
+        assert len(match.sftp.mock_calls) == 2
+        first_mocked_call = match.sftp.mock_calls[0]
+        first_called_method = str(first_mocked_call).split("(")[0].split(".")[1]
+        assert first_called_method == "list_directory"
+        assert set(first_mocked_call.args) == set(["/myDownloads/"])
+
+        second_mocked_call = match.sftp.mock_calls[1]
+        second_called_method = str(second_mocked_call).split("(")[0].split(".")[1]
+        assert second_called_method == "get_file"
+        assert set(second_mocked_call.args) == set(["/myDownloads/example_12345"])