Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-15198: Added best practices #116

Merged
merged 14 commits into from
Sep 30, 2021
10 changes: 2 additions & 8 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ orbs:
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
Expand All @@ -32,13 +32,7 @@ jobs:
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-shopify \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-shopify tests
- slack/notify-on-failure:
only_for_branches: master

Expand Down
10 changes: 7 additions & 3 deletions tap_shopify/streams/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class Stream():
replication_object = None
# Status parameter override option
status_key = None
results_per_page = None

def __init__(self):
self.results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE)

def get_bookmark(self):
bookmark = (singer.get_bookmark(Context.state,
Expand Down Expand Up @@ -127,7 +131,7 @@ def get_objects(self):

stop_time = singer.utils.now().replace(microsecond=0)
date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE))
results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE)
# results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we add this commented-out code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, removed the line


# Page through till the end of the resultset
while updated_at_min < stop_time:
Expand All @@ -151,7 +155,7 @@ def get_objects(self):
"since_id": since_id,
"updated_at_min": updated_at_min,
"updated_at_max": updated_at_max,
"limit": results_per_page,
"limit": self.results_per_page,
status_key: "any"
}

Expand All @@ -169,7 +173,7 @@ def get_objects(self):

# You know you're at the end when the current page has
# less than the request size limits you set.
if len(objects) < results_per_page:
if len(objects) < self.results_per_page:
# Save the updated_at_max as our bookmark as we've synced all rows up in our
# window and can move forward. Also remove the since_id because we want to
# restart at 1.
Expand Down
5 changes: 2 additions & 3 deletions tap_shopify/streams/collects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import singer
from singer import utils
from tap_shopify.streams.base import (Stream,
RESULTS_PER_PAGE,
OutOfOrderIdsError)
from tap_shopify.context import Context

Expand All @@ -20,7 +19,7 @@ def get_objects(self):
while True:
query_params = {
"since_id": since_id,
"limit": RESULTS_PER_PAGE,
"limit": self.results_per_page,
}

objects = self.call_api(query_params)
Expand All @@ -38,7 +37,7 @@ def get_objects(self):
obj.id, since_id))
yield obj

if len(objects) < RESULTS_PER_PAGE:
if len(objects) < self.results_per_page:
# Update the bookmark at the end of the last page
self.update_bookmark(max_bookmark)
break
Expand Down
4 changes: 2 additions & 2 deletions tap_shopify/streams/metafields.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_metafields(parent_object, since_id):
# This call results in an HTTP request - the parent object never has a
# cache of this data so we have to issue that request.
return parent_object.metafields(
limit=RESULTS_PER_PAGE,
limit=Context.get_results_per_page(RESULTS_PER_PAGE),
since_id=since_id)

class Metafields(Stream):
Expand All @@ -46,7 +46,7 @@ def get_objects(self):
raise OutOfOrderIdsError("metafield.id < since_id: {} < {}".format(
metafield.id, since_id))
yield metafield
if len(metafields) < RESULTS_PER_PAGE:
if len(metafields) < self.results_per_page:
break
if metafields[-1].id != max([o.id for o in metafields]):
raise OutOfOrderIdsError("{} is not the max id in metafields ({})".format(
Expand Down
5 changes: 2 additions & 3 deletions tap_shopify/streams/order_refunds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from singer.utils import strftime, strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
RESULTS_PER_PAGE,
shopify_error_handling,
OutOfOrderIdsError)

Expand All @@ -15,7 +14,7 @@ class OrderRefunds(Stream):
def get_refunds(self, parent_object, since_id):
return self.replication_object.find(
order_id=parent_object.id,
limit=RESULTS_PER_PAGE,
limit=self.results_per_page,
since_id=since_id,
order='id asc')

Expand All @@ -33,7 +32,7 @@ def get_objects(self):
raise OutOfOrderIdsError("refund.id < since_id: {} < {}".format(
refund.id, since_id))
yield refund
if len(refunds) < RESULTS_PER_PAGE:
if len(refunds) < self.results_per_page:
break
if refunds[-1].id != max([o.id for o in refunds]):
raise OutOfOrderIdsError("{} is not the max id in refunds ({})".format(
Expand Down
2 changes: 1 addition & 1 deletion tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_properties(self, original: bool = True):
'shop': 'stitchdatawearhouse',
'date_window_size': 30,
# BUG: https://jira.talendforge.org/browse/TDL-13180
'results_per_page': '50'
# 'results_per_page': '50'
}

if original:
Expand Down
15 changes: 11 additions & 4 deletions tests/test_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def __init__(self, *args, **kwargs):
def name(self):
return "tap_tester_shopify_pagination_test"

def get_properties(self, *args, **kwargs):
props = super().get_properties(*args, **kwargs)
props['results_per_page'] = '50'
return props

def test_run(self):
with self.subTest(store="store_1"):
Expand Down Expand Up @@ -56,15 +60,18 @@ def pagination_test(self, conn_id, testable_streams):
record_count_by_stream = self.run_sync(conn_id)
actual_fields_by_stream = runner.examine_target_output_for_fields()

api_limit = int(self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE))

for stream in testable_streams:
with self.subTest(stream=stream):

# verify that we can paginate with all fields selected
stream_metadata = self.expected_metadata().get(stream, {})
minimum_record_count = stream_metadata.get(
self.API_LIMIT,
self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)
)
# minimum_record_count = stream_metadata.get(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this commented-out code if it is not required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the commented code

# self.API_LIMIT,
# self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE)
# )
minimum_record_count = 100 if stream == 'transactions' else api_limit
self.assertGreater(
record_count_by_stream.get(stream, -1),
minimum_record_count,
Expand Down