diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 4c5ee577..04488bf6 100644 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -86,6 +86,10 @@ class Stream(): replication_object = None # Status parameter override option status_key = None + results_per_page = None + + def __init__(self): + self.results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) def get_bookmark(self): bookmark = (singer.get_bookmark(Context.state, @@ -127,7 +131,6 @@ def get_objects(self): stop_time = singer.utils.now().replace(microsecond=0) date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE)) - results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) # Page through till the end of the resultset while updated_at_min < stop_time: @@ -151,7 +154,7 @@ def get_objects(self): "since_id": since_id, "updated_at_min": updated_at_min, "updated_at_max": updated_at_max, - "limit": results_per_page, + "limit": self.results_per_page, status_key: "any" } @@ -169,7 +172,7 @@ def get_objects(self): # You know you're at the end when the current page has # less than the request size limits you set. - if len(objects) < results_per_page: + if len(objects) < self.results_per_page: # Save the updated_at_max as our bookmark as we've synced all rows up in our # window and can move forward. Also remove the since_id because we want to # restart at 1. diff --git a/tap_shopify/streams/collects.py b/tap_shopify/streams/collects.py index 078f2bde..83342ee5 100644 --- a/tap_shopify/streams/collects.py +++ b/tap_shopify/streams/collects.py @@ -2,7 +2,6 @@ import singer from singer import utils from tap_shopify.streams.base import (Stream, - RESULTS_PER_PAGE, OutOfOrderIdsError) from tap_shopify.context import Context @@ -20,7 +19,7 @@ def get_objects(self): while True: query_params = { "since_id": since_id, - "limit": RESULTS_PER_PAGE, + "limit": self.results_per_page, } objects = self.call_api(query_params) @@ -38,7 +37,7 @@ def get_objects(self): obj.id, since_id)) yield obj - if len(objects) < RESULTS_PER_PAGE: + if len(objects) < self.results_per_page: # Update the bookmark at the end of the last page self.update_bookmark(max_bookmark) break diff --git a/tap_shopify/streams/metafields.py b/tap_shopify/streams/metafields.py index 9b960759..f96b3a06 100644 --- a/tap_shopify/streams/metafields.py +++ b/tap_shopify/streams/metafields.py @@ -20,7 +20,7 @@ def get_metafields(parent_object, since_id): # This call results in an HTTP request - the parent object never has a # cache of this data so we have to issue that request. return parent_object.metafields( - limit=RESULTS_PER_PAGE, + limit=Context.get_results_per_page(RESULTS_PER_PAGE), since_id=since_id) class Metafields(Stream): @@ -46,7 +46,7 @@ def get_objects(self): raise OutOfOrderIdsError("metafield.id < since_id: {} < {}".format( metafield.id, since_id)) yield metafield - if len(metafields) < RESULTS_PER_PAGE: + if len(metafields) < self.results_per_page: break if metafields[-1].id != max([o.id for o in metafields]): raise OutOfOrderIdsError("{} is not the max id in metafields ({})".format( diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py index 690915b3..523cf57c 100644 --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -2,7 +2,6 @@ from singer.utils import strftime, strptime_to_utc from tap_shopify.context import Context from tap_shopify.streams.base import (Stream, - RESULTS_PER_PAGE, shopify_error_handling, OutOfOrderIdsError) @@ -15,7 +14,7 @@ class OrderRefunds(Stream): def get_refunds(self, parent_object, since_id): return self.replication_object.find( order_id=parent_object.id, - limit=RESULTS_PER_PAGE, + limit=self.results_per_page, since_id=since_id, order='id asc') @@ -33,7 +32,7 @@ def get_objects(self): raise OutOfOrderIdsError("refund.id < since_id: {} < {}".format( refund.id, since_id)) yield refund - if len(refunds) < RESULTS_PER_PAGE: + if len(refunds) < self.results_per_page: break if refunds[-1].id != max([o.id for o in refunds]): raise OutOfOrderIdsError("{} is not the max id in refunds ({})".format( diff --git a/tests/base.py b/tests/base.py index d35ef8d3..fb67e167 100644 --- a/tests/base.py +++ b/tests/base.py @@ -43,7 +43,7 @@ def get_properties(self, original: bool = True): 'shop': 'stitchdatawearhouse', 'date_window_size': 30, # BUG: https://jira.talendforge.org/browse/TDL-13180 - 'results_per_page': '50' + # 'results_per_page': '50' } if original: diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 4959a066..f6050f27 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -16,6 +16,10 @@ def __init__(self, *args, **kwargs): def name(self): return "tap_tester_shopify_pagination_test" + def get_properties(self, *args, **kwargs): + props = super().get_properties(*args, **kwargs) + props['results_per_page'] = '50' + return props def test_run(self): with self.subTest(store="store_1"): @@ -56,15 +60,14 @@ def pagination_test(self, conn_id, testable_streams): record_count_by_stream = self.run_sync(conn_id) actual_fields_by_stream = runner.examine_target_output_for_fields() + api_limit = int(self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE)) + for stream in testable_streams: with self.subTest(stream=stream): # verify that we can paginate with all fields selected stream_metadata = self.expected_metadata().get(stream, {}) - minimum_record_count = stream_metadata.get( - self.API_LIMIT, - self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) - ) + minimum_record_count = 100 if stream == 'transactions' else api_limit self.assertGreater( record_count_by_stream.get(stream, -1), minimum_record_count,