diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py index 459494f60922b..bb3c8c1863c90 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py @@ -37,8 +37,9 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth -from airbyte_cdk.sources.streams.http.exceptions import RequestBodyException +from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS +from airbyte_cdk.sources.streams.http.rate_limiting import default_backoff_handler from Crypto.Cipher import AES from source_amazon_seller_partner.auth import AWSSignature @@ -182,6 +183,18 @@ def request_headers(self) -> Mapping[str, Any]: def path(self, document_id: str) -> str: return f"{self.path_prefix}/documents/{document_id}" + def should_retry(self, response: requests.Response) -> bool: + return response.status_code == 429 or 500 <= response.status_code < 600 + + @default_backoff_handler(max_tries=5, factor=5) + def _send_request(self, request: requests.PreparedRequest) -> requests.Response: + response: requests.Response = self._session.send(request) + if self.should_retry(response): + raise DefaultBackoffException(request=request, response=response) + else: + response.raise_for_status() + return response + def _create_prepared_request( self, path: str, http_method: str = "GET", headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None ) -> requests.PreparedRequest: @@ -215,7 +228,7 @@ def _create_report(self) -> Mapping[str, Any]: headers=dict(request_headers, **self.authenticator.get_auth_header()), data=json_lib.dumps(report_data), ) - report_response = self._session.send(create_report_request) + report_response = self._send_request(create_report_request) return report_response.json()[self.data_field] def _retrieve_report(self, report_id: str) -> Mapping[str, Any]: @@ -224,7 +237,7 @@ def _retrieve_report(self, report_id: str) -> Mapping[str, Any]: path=f"{self.path_prefix}/reports/{report_id}", headers=dict(request_headers, **self.authenticator.get_auth_header()), ) - retrieve_report_response = self._session.send(retrieve_report_request) + retrieve_report_response = self._send_request(retrieve_report_request) report_payload = retrieve_report_response.json().get(self.data_field, {}) return report_payload @@ -290,7 +303,7 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: headers=dict(request_headers, **self.authenticator.get_auth_header()), params=self.request_params(), ) - response = self._session.send(request) + response = self._send_request(request) yield from self.parse_response(response) else: logger.warn(f"There are no report document related in stream `{self.name}`. Report body {report_payload}") diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_repots_streams_rate_limits.py b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_repots_streams_rate_limits.py new file mode 100644 index 0000000000000..5e316c075b604 --- /dev/null +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/test_repots_streams_rate_limits.py @@ -0,0 +1,81 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import pytest +import requests +from airbyte_cdk.sources.streams.http.auth import NoAuth +from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException +from source_amazon_seller_partner.auth import AWSSignature +from source_amazon_seller_partner.streams import MerchantListingsReports + + +@pytest.fixture +def reports_stream(): + aws_signature = AWSSignature( + service="execute-api", + aws_access_key_id="AccessKeyId", + aws_secret_access_key="SecretAccessKey", + aws_session_token="SessionToken", + region="US", + ) + stream = MerchantListingsReports( + url_base="https://test.url", + aws_signature=aws_signature, + replication_start_date="2017-01-25T00:00:00Z", + marketplace_ids=["id"], + authenticator=NoAuth(), + ) + return stream + + +def test_reports_stream_should_retry(mocker, reports_stream): + response = requests.Response() + response.status_code = 429 + mocker.patch.object(requests.Session, "send", return_value=response) + should_retry = reports_stream.should_retry(response=response) + + assert should_retry is True + + +def test_reports_stream_send_request(mocker, reports_stream): + response = requests.Response() + response.status_code = 200 + mocker.patch.object(requests.Session, "send", return_value=response) + + assert response == reports_stream._send_request(request=requests.PreparedRequest()) + + +def test_reports_stream_send_request_backoff_exception(mocker, caplog, reports_stream): + response = requests.Response() + response.status_code = 429 + mocker.patch.object(requests.Session, "send", return_value=response) + + with pytest.raises(DefaultBackoffException): + reports_stream._send_request(request=requests.PreparedRequest()) + + assert "Backing off _send_request(...) for 5.0s" in caplog.text + assert "Backing off _send_request(...) for 10.0s" in caplog.text + assert "Backing off _send_request(...) for 20.0s" in caplog.text + assert "Backing off _send_request(...) for 40.0s" in caplog.text + assert "Giving up _send_request(...) after 5 tries" in caplog.text diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/unit_test.py deleted file mode 100644 index b8a8150b507fd..0000000000000 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/unit_tests/unit_test.py +++ /dev/null @@ -1,27 +0,0 @@ -# -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -# - - -def test_example_method(): - assert True