Skip to content

Commit

Permalink
Add reports streams rate limit handling logics.
Browse files Browse the repository at this point in the history
Add rate limit unit tests.
  • Loading branch information
htrueman committed Sep 15, 2021
1 parent aeab6a8 commit b49e49c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth
from airbyte_cdk.sources.streams.http.exceptions import RequestBodyException
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException
from airbyte_cdk.sources.streams.http.http import BODY_REQUEST_METHODS
from airbyte_cdk.sources.streams.http.rate_limiting import default_backoff_handler
from Crypto.Cipher import AES
from source_amazon_seller_partner.auth import AWSSignature

Expand Down Expand Up @@ -182,6 +183,18 @@ def request_headers(self) -> Mapping[str, Any]:
def path(self, document_id: str) -> str:
return f"{self.path_prefix}/documents/{document_id}"

def should_retry(self, response: requests.Response) -> bool:
return response.status_code == 429 or 500 <= response.status_code < 600

@default_backoff_handler(max_tries=5, factor=5)
def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
response: requests.Response = self._session.send(request)
if self.should_retry(response):
raise DefaultBackoffException(request=request, response=response)
else:
response.raise_for_status()
return response

def _create_prepared_request(
self, path: str, http_method: str = "GET", headers: Mapping = None, params: Mapping = None, json: Any = None, data: Any = None
) -> requests.PreparedRequest:
Expand Down Expand Up @@ -215,7 +228,7 @@ def _create_report(self) -> Mapping[str, Any]:
headers=dict(request_headers, **self.authenticator.get_auth_header()),
data=json_lib.dumps(report_data),
)
report_response = self._session.send(create_report_request)
report_response = self._send_request(create_report_request)
return report_response.json()[self.data_field]

def _retrieve_report(self, report_id: str) -> Mapping[str, Any]:
Expand All @@ -224,7 +237,7 @@ def _retrieve_report(self, report_id: str) -> Mapping[str, Any]:
path=f"{self.path_prefix}/reports/{report_id}",
headers=dict(request_headers, **self.authenticator.get_auth_header()),
)
retrieve_report_response = self._session.send(retrieve_report_request)
retrieve_report_response = self._send_request(retrieve_report_request)
report_payload = retrieve_report_response.json().get(self.data_field, {})
return report_payload

Expand Down Expand Up @@ -290,7 +303,7 @@ def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(),
)
response = self._session.send(request)
response = self._send_request(request)
yield from self.parse_response(response)
else:
logger.warn(f"There are no report document related in stream `{self.name}`. Report body {report_payload}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import pytest
import requests
from airbyte_cdk.sources.streams.http.auth import NoAuth
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from source_amazon_seller_partner.auth import AWSSignature
from source_amazon_seller_partner.streams import MerchantListingsReports


@pytest.fixture
def reports_stream():
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = MerchantListingsReports(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date="2017-01-25T00:00:00Z",
marketplace_ids=["id"],
authenticator=NoAuth(),
)
return stream


def test_reports_stream_should_retry(mocker, reports_stream):
response = requests.Response()
response.status_code = 429
mocker.patch.object(requests.Session, "send", return_value=response)
should_retry = reports_stream.should_retry(response=response)

assert should_retry is True


def test_reports_stream_send_request(mocker, reports_stream):
response = requests.Response()
response.status_code = 200
mocker.patch.object(requests.Session, "send", return_value=response)

assert response == reports_stream._send_request(request=requests.PreparedRequest())


def test_reports_stream_send_request_backoff_exception(mocker, caplog, reports_stream):

This comment has been minimized.

Copy link
@sherifnada

sherifnada Sep 17, 2021

Contributor

we should override time.sleep to make it a no-op so we don't wait 1min for this to sleep (we should also do that in the CDK tests - can you create an issue?)

response = requests.Response()
response.status_code = 429
mocker.patch.object(requests.Session, "send", return_value=response)

with pytest.raises(DefaultBackoffException):
reports_stream._send_request(request=requests.PreparedRequest())

assert "Backing off _send_request(...) for 5.0s" in caplog.text
assert "Backing off _send_request(...) for 10.0s" in caplog.text
assert "Backing off _send_request(...) for 20.0s" in caplog.text
assert "Backing off _send_request(...) for 40.0s" in caplog.text
assert "Giving up _send_request(...) after 5 tries" in caplog.text

This file was deleted.

0 comments on commit b49e49c

Please sign in to comment.