From fcfaf21b57cc6de352e9b40bf53b7a4de5783acf Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 9 Aug 2021 20:49:12 +0530 Subject: [PATCH 01/12] added best practices --- .circleci/config.yml | 10 ++-------- tap_shopify/streams/base.py | 7 ++++--- tap_shopify/streams/collects.py | 4 ++-- tap_shopify/streams/metafields.py | 4 ++-- tap_shopify/streams/order_refunds.py | 4 ++-- tests/base.py | 2 +- tests/test_pagination.py | 30 ++++++++++++++++++++++++---- 7 files changed, 39 insertions(+), 22 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a47d9d1d..303c8484 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -5,7 +5,7 @@ orbs: jobs: build: docker: - - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4 + - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester steps: - checkout - run: @@ -32,13 +32,7 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-shopify \ - --target=target-stitch \ - --orchestrator=stitch-orchestrator \ - --email=harrison+sandboxtest@stitchdata.com \ - --password=$SANDBOX_PASSWORD \ - --client-id=50 \ - tests + run-test --tap=tap-shopify tests - slack/notify-on-failure: only_for_branches: master diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 4c5ee577..ed98d066 100644 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -86,6 +86,7 @@ class Stream(): replication_object = None # Status parameter override option status_key = None + results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) def get_bookmark(self): bookmark = (singer.get_bookmark(Context.state, @@ -127,7 +128,7 @@ def get_objects(self): stop_time = singer.utils.now().replace(microsecond=0) date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE)) - results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) + # results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) # Page through till the end of the resultset while updated_at_min < stop_time: @@ -151,7 +152,7 @@ def get_objects(self): "since_id": since_id, "updated_at_min": updated_at_min, "updated_at_max": updated_at_max, - "limit": results_per_page, + "limit": self.results_per_page, status_key: "any" } @@ -169,7 +170,7 @@ def get_objects(self): # You know you're at the end when the current page has # less than the request size limits you set. - if len(objects) < results_per_page: + if len(objects) < self.results_per_page: # Save the updated_at_max as our bookmark as we've synced all rows up in our # window and can move forward. Also remove the since_id because we want to # restart at 1. diff --git a/tap_shopify/streams/collects.py b/tap_shopify/streams/collects.py index 078f2bde..a2c28262 100644 --- a/tap_shopify/streams/collects.py +++ b/tap_shopify/streams/collects.py @@ -20,7 +20,7 @@ def get_objects(self): while True: query_params = { "since_id": since_id, - "limit": RESULTS_PER_PAGE, + "limit": self.results_per_page, } objects = self.call_api(query_params) @@ -38,7 +38,7 @@ def get_objects(self): obj.id, since_id)) yield obj - if len(objects) < RESULTS_PER_PAGE: + if len(objects) < self.results_per_page: # Update the bookmark at the end of the last page self.update_bookmark(max_bookmark) break diff --git a/tap_shopify/streams/metafields.py b/tap_shopify/streams/metafields.py index 9b960759..f96b3a06 100644 --- a/tap_shopify/streams/metafields.py +++ b/tap_shopify/streams/metafields.py @@ -20,7 +20,7 @@ def get_metafields(parent_object, since_id): # This call results in an HTTP request - the parent object never has a # cache of this data so we have to issue that request. return parent_object.metafields( - limit=RESULTS_PER_PAGE, + limit=Context.get_results_per_page(RESULTS_PER_PAGE), since_id=since_id) class Metafields(Stream): @@ -46,7 +46,7 @@ def get_objects(self): raise OutOfOrderIdsError("metafield.id < since_id: {} < {}".format( metafield.id, since_id)) yield metafield - if len(metafields) < RESULTS_PER_PAGE: + if len(metafields) < self.results_per_page: break if metafields[-1].id != max([o.id for o in metafields]): raise OutOfOrderIdsError("{} is not the max id in metafields ({})".format( diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py index 690915b3..59f8ef7b 100644 --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -15,7 +15,7 @@ class OrderRefunds(Stream): def get_refunds(self, parent_object, since_id): return self.replication_object.find( order_id=parent_object.id, - limit=RESULTS_PER_PAGE, + limit=self.results_per_page, since_id=since_id, order='id asc') @@ -33,7 +33,7 @@ def get_objects(self): raise OutOfOrderIdsError("refund.id < since_id: {} < {}".format( refund.id, since_id)) yield refund - if len(refunds) < RESULTS_PER_PAGE: + if len(refunds) < self.results_per_page: break if refunds[-1].id != max([o.id for o in refunds]): raise OutOfOrderIdsError("{} is not the max id in refunds ({})".format( diff --git a/tests/base.py b/tests/base.py index d35ef8d3..fb67e167 100644 --- a/tests/base.py +++ b/tests/base.py @@ -43,7 +43,7 @@ def get_properties(self, original: bool = True): 'shop': 'stitchdatawearhouse', 'date_window_size': 30, # BUG: https://jira.talendforge.org/browse/TDL-13180 - 'results_per_page': '50' + # 'results_per_page': '50' } if original: diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 4959a066..4cfbb2b3 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -16,6 +16,25 @@ def __init__(self, *args, **kwargs): def name(self): return "tap_tester_shopify_pagination_test" + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'start_date': '2017-07-01T00:00:00Z', + 'shop': 'stitchdatawearhouse', + 'date_window_size': 30, + # BUG: https://jira.talendforge.org/browse/TDL-13180 + 'results_per_page': '50' + } + + if original: + return return_value + + # This test needs the new connections start date to be larger than the default + assert self.start_date > return_value["start_date"] + + return_value["start_date"] = self.start_date + return_value['shop'] = 'talenddatawearhouse' + return return_value def test_run(self): with self.subTest(store="store_1"): @@ -56,15 +75,18 @@ def pagination_test(self, conn_id, testable_streams): record_count_by_stream = self.run_sync(conn_id) actual_fields_by_stream = runner.examine_target_output_for_fields() + api_limit = self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) + for stream in testable_streams: with self.subTest(stream=stream): # verify that we can paginate with all fields selected stream_metadata = self.expected_metadata().get(stream, {}) - minimum_record_count = stream_metadata.get( - self.API_LIMIT, - self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) - ) + # minimum_record_count = stream_metadata.get( + # self.API_LIMIT, + # self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) + # ) + minimum_record_count = 100 if stream == 'transactions' else api_limit self.assertGreater( record_count_by_stream.get(stream, -1), minimum_record_count, From fc909d81b60f4b115874af288598668566267483 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 9 Aug 2021 20:53:09 +0530 Subject: [PATCH 02/12] resolved pylint --- tap_shopify/streams/collects.py | 1 - tap_shopify/streams/order_refunds.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tap_shopify/streams/collects.py b/tap_shopify/streams/collects.py index a2c28262..83342ee5 100644 --- a/tap_shopify/streams/collects.py +++ b/tap_shopify/streams/collects.py @@ -2,7 +2,6 @@ import singer from singer import utils from tap_shopify.streams.base import (Stream, - RESULTS_PER_PAGE, OutOfOrderIdsError) from tap_shopify.context import Context diff --git a/tap_shopify/streams/order_refunds.py b/tap_shopify/streams/order_refunds.py index 59f8ef7b..523cf57c 100644 --- a/tap_shopify/streams/order_refunds.py +++ b/tap_shopify/streams/order_refunds.py @@ -2,7 +2,6 @@ from singer.utils import strftime, strptime_to_utc from tap_shopify.context import Context from tap_shopify.streams.base import (Stream, - RESULTS_PER_PAGE, shopify_error_handling, OutOfOrderIdsError) From 498c62025ad466e8f7dfdfa164caaae67e84dea6 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 9 Aug 2021 21:34:04 +0530 Subject: [PATCH 03/12] test: run only pagination test --- .circleci/config.yml | 2 +- tests/test_pagination.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 303c8484..9d565f80 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,7 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-shopify tests + run-test --tap=tap-shopify tests/test_pagination.py - slack/notify-on-failure: only_for_branches: master diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 4cfbb2b3..f3fdefba 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -23,7 +23,7 @@ def get_properties(self, original: bool = True): 'shop': 'stitchdatawearhouse', 'date_window_size': 30, # BUG: https://jira.talendforge.org/browse/TDL-13180 - 'results_per_page': '50' + 'results_per_page': 50 } if original: @@ -75,7 +75,7 @@ def pagination_test(self, conn_id, testable_streams): record_count_by_stream = self.run_sync(conn_id) actual_fields_by_stream = runner.examine_target_output_for_fields() - api_limit = self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) + api_limit = self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE) for stream in testable_streams: with self.subTest(stream=stream): From d196597a45ae534438d5a9893febddd39a7d818d Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 9 Aug 2021 21:53:03 +0530 Subject: [PATCH 04/12] test: run only pagination test --- tests/test_pagination.py | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/tests/test_pagination.py b/tests/test_pagination.py index f3fdefba..55b2e28a 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -16,25 +16,10 @@ def __init__(self, *args, **kwargs): def name(self): return "tap_tester_shopify_pagination_test" - def get_properties(self, original: bool = True): - """Configuration properties required for the tap.""" - return_value = { - 'start_date': '2017-07-01T00:00:00Z', - 'shop': 'stitchdatawearhouse', - 'date_window_size': 30, - # BUG: https://jira.talendforge.org/browse/TDL-13180 - 'results_per_page': 50 - } - - if original: - return return_value - - # This test needs the new connections start date to be larger than the default - assert self.start_date > return_value["start_date"] - - return_value["start_date"] = self.start_date - return_value['shop'] = 'talenddatawearhouse' - return return_value + def get_properties(self, *args, **kwargs): + props = super().get_properties(*args, **kwargs) + props['results_per_page'] = 50 + return props def test_run(self): with self.subTest(store="store_1"): From ca11e98cda27b369269a4d7c9093a4ed3f8c5880 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 10 Aug 2021 09:30:02 +0530 Subject: [PATCH 05/12] test: run all tests --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9d565f80..303c8484 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,7 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-shopify tests/test_pagination.py + run-test --tap=tap-shopify tests - slack/notify-on-failure: only_for_branches: master From 6b59b64e95d52122e51168e033e4b57991cf7450 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 10 Aug 2021 09:56:46 +0530 Subject: [PATCH 06/12] test: run all tests --- tests/test_pagination.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 55b2e28a..855e81ec 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -18,7 +18,7 @@ def name(self): def get_properties(self, *args, **kwargs): props = super().get_properties(*args, **kwargs) - props['results_per_page'] = 50 + props['results_per_page'] = '50' return props def test_run(self): @@ -60,7 +60,7 @@ def pagination_test(self, conn_id, testable_streams): record_count_by_stream = self.run_sync(conn_id) actual_fields_by_stream = runner.examine_target_output_for_fields() - api_limit = self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE) + api_limit = int(self.get_properties().get('results_per_page', self.DEFAULT_RESULTS_PER_PAGE)) for stream in testable_streams: with self.subTest(stream=stream): From c9808a87c99007b14df66104125c8e047563a40a Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 10 Aug 2021 10:14:27 +0530 Subject: [PATCH 07/12] test: run only paginaiton test --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 303c8484..9d565f80 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,7 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-shopify tests + run-test --tap=tap-shopify tests/test_pagination.py - slack/notify-on-failure: only_for_branches: master From 4a4451885ab252b5ca14482ee993b909c30d4fc2 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 10 Aug 2021 13:04:32 +0530 Subject: [PATCH 08/12] test: updated variable page size setting --- tap_shopify/streams/base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index ed98d066..9024f6f6 100644 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -86,7 +86,10 @@ class Stream(): replication_object = None # Status parameter override option status_key = None - results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) + results_per_page = None + + def __init__(self): + self.results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) def get_bookmark(self): bookmark = (singer.get_bookmark(Context.state, From 34239ba6f02cd4f0e3bc61adeea543085568364a Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 10 Aug 2021 13:10:32 +0530 Subject: [PATCH 09/12] run all the tests --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9d565f80..303c8484 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,7 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-test --tap=tap-shopify tests/test_pagination.py + run-test --tap=tap-shopify tests - slack/notify-on-failure: only_for_branches: master From 4bbe1510bf5d6bb2d698227902f63c9a3697ad62 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Fri, 27 Aug 2021 12:18:37 +0530 Subject: [PATCH 10/12] made changes according to comments --- tap_shopify/streams/base.py | 1 - tests/test_pagination.py | 4 ---- 2 files changed, 5 deletions(-) diff --git a/tap_shopify/streams/base.py b/tap_shopify/streams/base.py index 9024f6f6..04488bf6 100644 --- a/tap_shopify/streams/base.py +++ b/tap_shopify/streams/base.py @@ -131,7 +131,6 @@ def get_objects(self): stop_time = singer.utils.now().replace(microsecond=0) date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE)) - # results_per_page = Context.get_results_per_page(RESULTS_PER_PAGE) # Page through till the end of the resultset while updated_at_min < stop_time: diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 855e81ec..f6050f27 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -67,10 +67,6 @@ def pagination_test(self, conn_id, testable_streams): # verify that we can paginate with all fields selected stream_metadata = self.expected_metadata().get(stream, {}) - # minimum_record_count = stream_metadata.get( - # self.API_LIMIT, - # self.get_properties().get('result_per_page', self.DEFAULT_RESULTS_PER_PAGE) - # ) minimum_record_count = 100 if stream == 'transactions' else api_limit self.assertGreater( record_count_by_stream.get(stream, -1), From aedbea86b54c625d842a60c30a4d3c1967c75a69 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 7 Sep 2021 16:12:02 +0530 Subject: [PATCH 11/12] Added integration test for start date and bookmark both are provided --- tests/test_start_date_and_bookmark.py | 68 +++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 tests/test_start_date_and_bookmark.py diff --git a/tests/test_start_date_and_bookmark.py b/tests/test_start_date_and_bookmark.py new file mode 100644 index 00000000..42363843 --- /dev/null +++ b/tests/test_start_date_and_bookmark.py @@ -0,0 +1,68 @@ +import os +import tap_tester.menagerie as menagerie +import tap_tester.runner as runner + +from base import BaseTapTest + +class TestStartDateAndBookmark(BaseTapTest): + """Test bookmark is used if start date and bookmark both provided in sync""" + + def name(self): + return "tap_tester_shopify_startdate_bookmark_test" + + + def get_properties(self, original: bool = True): + return { + 'start_date': '2017-07-01T00:00:00Z', + 'shop': 'talenddatawearhouse', + 'date_window_size': 30, + 'results_per_page': '50' + } + + @staticmethod + def get_credentials(original_credentials: bool = True): + return { + 'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE') + } + + def test_run(self): + conn_id = self.create_connection() + + # Select all incremental streams and all fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + incremental_streams = {key for key, value in self.expected_replication_method().items() + if value == self.INCREMENTAL} + + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) + + # Set state with bookmark greater than start date + state = dict() + original_bookmark_value = '2021-04-01T00:00:00Z' + state['bookmarks'] = {stream: {next(iter(self.expected_replication_keys()[stream])): original_bookmark_value} + for stream in incremental_streams} + + menagerie.set_state(conn_id, state) + + # Run a sync job using orchestrator + sync_record_count = self.run_sync(conn_id) + + # Synced records + sync_records = runner.get_records_from_target_output() + + sync_bookmarks = menagerie.get_state(conn_id) + + for stream in incremental_streams: + with self.subTest(stream=stream): + # record messages + sync_messages = sync_records.get(stream, {'messages': []}).get('messages') + + replication_key = next(iter(self.expected_replication_keys()[stream])) + + # Verify that replication key value for all the data are greater than bookmark key + # so it verify that start date is not considered. + for message in sync_messages: + replication_key_value = message.get('data').get(replication_key) + self.assertLess(original_bookmark_value, replication_key_value, + msg="Record with lesser replication key value than bookmark was found.") From d626909976cb9ab2ed678f12783f2baed7d9f229 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Fri, 10 Sep 2021 11:29:24 +0530 Subject: [PATCH 12/12] Removed startdate&bookmark test as it's implicitly covered in bookmark test --- tests/test_start_date_and_bookmark.py | 68 --------------------------- 1 file changed, 68 deletions(-) delete mode 100644 tests/test_start_date_and_bookmark.py diff --git a/tests/test_start_date_and_bookmark.py b/tests/test_start_date_and_bookmark.py deleted file mode 100644 index 42363843..00000000 --- a/tests/test_start_date_and_bookmark.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import tap_tester.menagerie as menagerie -import tap_tester.runner as runner - -from base import BaseTapTest - -class TestStartDateAndBookmark(BaseTapTest): - """Test bookmark is used if start date and bookmark both provided in sync""" - - def name(self): - return "tap_tester_shopify_startdate_bookmark_test" - - - def get_properties(self, original: bool = True): - return { - 'start_date': '2017-07-01T00:00:00Z', - 'shop': 'talenddatawearhouse', - 'date_window_size': 30, - 'results_per_page': '50' - } - - @staticmethod - def get_credentials(original_credentials: bool = True): - return { - 'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE') - } - - def test_run(self): - conn_id = self.create_connection() - - # Select all incremental streams and all fields within streams - found_catalogs = menagerie.get_catalogs(conn_id) - incremental_streams = {key for key, value in self.expected_replication_method().items() - if value == self.INCREMENTAL} - - our_catalogs = [catalog for catalog in found_catalogs if - catalog.get('tap_stream_id') in incremental_streams] - self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) - - # Set state with bookmark greater than start date - state = dict() - original_bookmark_value = '2021-04-01T00:00:00Z' - state['bookmarks'] = {stream: {next(iter(self.expected_replication_keys()[stream])): original_bookmark_value} - for stream in incremental_streams} - - menagerie.set_state(conn_id, state) - - # Run a sync job using orchestrator - sync_record_count = self.run_sync(conn_id) - - # Synced records - sync_records = runner.get_records_from_target_output() - - sync_bookmarks = menagerie.get_state(conn_id) - - for stream in incremental_streams: - with self.subTest(stream=stream): - # record messages - sync_messages = sync_records.get(stream, {'messages': []}).get('messages') - - replication_key = next(iter(self.expected_replication_keys()[stream])) - - # Verify that replication key value for all the data are greater than bookmark key - # so it verify that start date is not considered. - for message in sync_messages: - replication_key_value = message.get('data').get(replication_key) - self.assertLess(original_bookmark_value, replication_key_value, - msg="Record with lesser replication key value than bookmark was found.")