Skip to content

Commit

Permalink
Backoff on 504. (#17)
Browse files Browse the repository at this point in the history
* Adds logging to response (#8)

* Adds journal_lines stream (#9)

* Adds journal_lines stream

* Adds a validation on line_mdata

* Adds config flag to enable journal_lines_stream

* Move to Jenkins

* Adds fk for JournalLines in order to match with Journal (#10)

* Feature/journal lines as stream (#11)

* Fix Xero requirements

* Handle XeroUnauthorizedError properly

* add ability to filter by start_date

* Backoff for RemoteDisconnect and ConnectionError (#15)

* Backoff on 504.

---------

Co-authored-by: Vinicius Mesel <[email protected]>
Co-authored-by: Hassan Syyid <[email protected]>
  • Loading branch information
3 people authored Mar 6, 2024
1 parent 90b5445 commit 260fcff
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 8 deletions.
10 changes: 10 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
pipeline {
agent {label 'linux'}
stages {
stage('Deploy connector') {
steps {
sh('cd /var/lib/jenkins/ && ./deploy-connector.sh')
}
}
}
}
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
from setuptools import setup, find_packages

setup(name="tap-xero",
version="2.2.5",
version="2.2.12",
description="Singer.io tap for extracting data from the Xero API",
author="Stitch",
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_xero"],
install_requires=[
"singer-python==5.9.0",
"requests==2.20.0",
"requests==2.29.0",
],
extras_require={
'dev': [
Expand Down
19 changes: 14 additions & 5 deletions tap_xero/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import pytz
import backoff
import singer
from http.client import RemoteDisconnected
from requests.exceptions import ConnectionError,ReadTimeout,ChunkedEncodingError
from urllib3.exceptions import ProtocolError

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -185,7 +188,7 @@ def __init__(self, config):
self.access_token = None

def refresh_credentials(self, config, config_path):

LOGGER.info("Refreshing OAuth credentials")
header_token = b64encode((config["client_id"] + ":" + config["client_secret"]).encode('utf-8'))

headers = {
Expand All @@ -203,6 +206,7 @@ def refresh_credentials(self, config, config_path):
raise_for_error(resp)
else:
resp = resp.json()
LOGGER.info(f"Completed refresh of OAuth tokens. response={resp}")

# Write to config file
config['refresh_token'] = resp["refresh_token"]
Expand All @@ -211,8 +215,8 @@ def refresh_credentials(self, config, config_path):
self.tenant_id = config['tenant_id']


@backoff.on_exception(backoff.expo, (json.decoder.JSONDecodeError, XeroInternalError), max_tries=3)
@backoff.on_exception(retry_after_wait_gen, XeroTooManyInMinuteError, giveup=is_not_status_code_fn([429]), jitter=None, max_tries=3)
@backoff.on_exception(backoff.expo, (json.decoder.JSONDecodeError, XeroInternalError,RemoteDisconnected,ConnectionError,ReadTimeout,ChunkedEncodingError,ProtocolError), max_tries=3)
@backoff.on_exception(retry_after_wait_gen, XeroTooManyInMinuteError, giveup=is_not_status_code_fn([429,504]), jitter=None, max_tries=3)
def check_platform_access(self, config, config_path):

# Validating the authentication of the provided configuration
Expand All @@ -233,8 +237,8 @@ def check_platform_access(self, config, config_path):
raise_for_error(response)


@backoff.on_exception(backoff.expo, (json.decoder.JSONDecodeError, XeroInternalError), max_tries=3)
@backoff.on_exception(retry_after_wait_gen, XeroTooManyInMinuteError, giveup=is_not_status_code_fn([429]), jitter=None, max_tries=3)
@backoff.on_exception(backoff.expo, (json.decoder.JSONDecodeError, XeroInternalError,RemoteDisconnected,ConnectionError,ReadTimeout,ChunkedEncodingError,ProtocolError), max_tries=3)
@backoff.on_exception(retry_after_wait_gen, XeroTooManyInMinuteError, giveup=is_not_status_code_fn([429,504]), jitter=None, max_tries=3)
def filter(self, tap_stream_id, since=None, **params):
xero_resource_name = tap_stream_id.title().replace("_", "")
is_report = False
Expand All @@ -245,6 +249,8 @@ def filter(self, tap_stream_id, since=None, **params):
headers = {"Accept": "application/json",
"Authorization": "Bearer " + self.access_token,
"Xero-tenant-id": self.tenant_id}
if params.get("headers"):
headers.update(params.pop("headers"))
if self.user_agent:
headers["User-Agent"] = self.user_agent
if since:
Expand Down Expand Up @@ -285,6 +291,9 @@ def raise_for_error(resp):
elif error_code in (403, 401):
api_message = ERROR_CODE_EXCEPTION_MAPPING[error_code]["message"]
message = "HTTP-error-code: {}, Error: {}".format(error_code, api_message)
elif error_code == 504:
message = "HTTP-error-code: 504, Error: Gateway Timeout"
raise ReadTimeout(message)
else:
# Forming a response message for raising custom exception
try:
Expand Down
5 changes: 5 additions & 0 deletions tap_xero/schemas/journals.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
"JournalLines": {
"items": {
"properties": {
"JournalID": {
"type": [
"string"
]
},
"JournalLineID": {
"type": [
"null",
Expand Down
50 changes: 49 additions & 1 deletion tap_xero/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from . import transform
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from tap_xero.client import XeroUnauthorizedError
from http.client import RemoteDisconnected
from requests.exceptions import ConnectionError,ReadTimeout,ChunkedEncodingError
from urllib3.exceptions import ProtocolError


LOGGER = singer.get_logger()
Expand All @@ -32,13 +36,18 @@ class RateLimitException(Exception):


@backoff.on_exception(backoff.expo,
RateLimitException,
(RateLimitException,RemoteDisconnected,ConnectionError,ReadTimeout,ChunkedEncodingError,ProtocolError),
max_tries=10,
factor=2)
def _make_request(ctx, tap_stream_id, filter_options=None, attempts=0):
filter_options = filter_options or {}
try:
return _request_with_timer(tap_stream_id, ctx.client, filter_options)
except XeroUnauthorizedError as e:
if attempts == 1:
raise e
ctx.refresh_credentials()
return _make_request(ctx, tap_stream_id, filter_options, attempts + 1)
except HTTPError as e:
if e.response.status_code == 401:
if attempts == 1:
Expand Down Expand Up @@ -178,6 +187,10 @@ def sync(self, ctx):
journal_number = ctx.get_bookmark(bookmark) or 0
while True:
filter_options = {"offset": journal_number}
if ctx.config.get("start_date"):
start_date = parse(ctx.config.get("start_date"))
filter_options['headers'] = {"If-Modified-Since": start_date.strftime("%Y-%m-%dT%H:%M:%SZ")}

records = _make_request(ctx, self.tap_stream_id, filter_options)
logging.info("Got {} records: {}".format(
len(records), records
Expand All @@ -191,6 +204,41 @@ def sync(self, ctx):
if not records or len(records) < FULL_PAGE_SIZE:
break

def write_records(self, records, ctx):
""""Custom implementation from the write records method available in Stream class"""
stream = ctx.catalog.get_stream(self.tap_stream_id)
schema = stream.schema.to_dict()
lines_schema = schema["properties"].get("JournalLines", {}).get("items")
lines_schema["JournalID"] = schema["properties"]["JournalID"]
lines_stream_id = "{}_lines".format(self.tap_stream_id)
mdata = stream.metadata
try:
line_mdata = [i for i in mdata if "JournalLines" in i.get("breadcrumb", [])]
except IndexError:
line_mdata = None

if line_mdata:
singer.write_schema(
lines_stream_id,
lines_schema,
["JournalLineID"]
)

if line_mdata is None:
line_mdata = []

for rec in records:
with Transformer() as transformer:
rec = transformer.transform(rec, schema, metadata.to_map(mdata))
singer.write_record(self.tap_stream_id, rec)
if "JournalLines" in rec and len(line_mdata) > 0 and ctx.config.get("journal_lines_stream") in ["true", True]:
for line in rec["JournalLines"]:
with Transformer() as transformer:
line["JournalID"] = rec["JournalID"]
line = transformer.transform(line, lines_schema, metadata.to_map(line_mdata))
singer.write_record(lines_stream_id, line)
self.metrics(records)


class LinkedTransactions(Stream):
"""The Linked Transactions endpoint is a special case. It supports
Expand Down

0 comments on commit 260fcff

Please sign in to comment.