From bc81684ce3cf8363ec3e0bafa2148a7a65e448e4 Mon Sep 17 00:00:00 2001 From: Dan Mosora <30501696+dmosorast@users.noreply.github.com> Date: Tue, 24 Mar 2020 13:44:20 -0400 Subject: [PATCH] Move to API version 2019-10 and bump SDK version to latest (#54) * Move to API version 2019-07 and bump SDK version to latest * Move tests to this repo and bump api version to 2019-10 * Fix start date test --- .circleci/config.yml | 16 +- Makefile | 2 +- setup.py | 2 +- tap_shopify/__init__.py | 3 +- tap_shopify/schemas/collects.json | 6 - tests/base.py | 278 ++++++++++++++++++ tests/sync_rows.py | 72 +++++ tests/test_automatic_fields.py | 66 +++++ tests/test_bookmarks.py | 138 +++++++++ tests/test_discovery.py | 164 +++++++++++ tests/test_full_replication.py | 95 ++++++ tests/test_pagination.py | 71 +++++ tests/test_start_date.py | 122 ++++++++ .../test_transaction_canonicalize.py | 0 14 files changed, 1018 insertions(+), 17 deletions(-) create mode 100644 tests/base.py create mode 100644 tests/sync_rows.py create mode 100644 tests/test_automatic_fields.py create mode 100644 tests/test_bookmarks.py create mode 100644 tests/test_discovery.py create mode 100644 tests/test_full_replication.py create mode 100644 tests/test_pagination.py create mode 100644 tests/test_start_date.py rename tests/{ => unittests}/test_transaction_canonicalize.py (100%) diff --git a/.circleci/config.yml b/.circleci/config.yml index 52562534..4bcecbd9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2 jobs: build: docker: - - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester + - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4 steps: - checkout - run: @@ -24,13 +24,13 @@ jobs: aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh source dev_env.sh source /usr/local/share/virtualenvs/tap-tester/bin/activate - run-a-test --tap=tap-shopify \ - --target=target-stitch \ - --orchestrator=stitch-orchestrator \ - --email=harrison+sandboxtest@stitchdata.com \ - --password=$SANDBOX_PASSWORD \ - --client-id=50 \ - tap_tester.suites.shopify + run-test --tap=tap-shopify \ + --target=target-stitch \ + --orchestrator=stitch-orchestrator \ + --email=harrison+sandboxtest@stitchdata.com \ + --password=$SANDBOX_PASSWORD \ + --client-id=50 \ + tests workflows: version: 2 commit: diff --git a/Makefile b/Makefile index f4d2f81e..21529fb9 100644 --- a/Makefile +++ b/Makefile @@ -2,4 +2,4 @@ test: pylint tap_shopify -d missing-docstring - nosetests + nosetests tests/unittests diff --git a/setup.py b/setup.py index 43fb1594..debb6e8f 100755 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ classifiers=["Programming Language :: Python :: 3 :: Only"], py_modules=["tap_shopify"], install_requires=[ - "ShopifyAPI==3.1.0", + "ShopifyAPI==7.0.1", "singer-python==5.4.1", ], extras_require={ diff --git a/tap_shopify/__init__.py b/tap_shopify/__init__.py index 0a6f4506..e36be07c 100644 --- a/tap_shopify/__init__.py +++ b/tap_shopify/__init__.py @@ -20,7 +20,8 @@ def initialize_shopify_client(): api_key = Context.config['api_key'] shop = Context.config['shop'] - session = shopify.Session(shop, api_key) + version = '2019-10' + session = shopify.Session(shop, version, api_key) shopify.ShopifyResource.activate_session(session) def get_abs_path(path): diff --git a/tap_shopify/schemas/collects.json b/tap_shopify/schemas/collects.json index c8070f0b..acaab19c 100644 --- a/tap_shopify/schemas/collects.json +++ b/tap_shopify/schemas/collects.json @@ 
-20,12 +20,6 @@ ], "format": "date-time" }, - "featured": { - "type": [ - "null", - "boolean" - ] - }, "position": { "type": [ "null", diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 00000000..e4c48d48 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,278 @@ +""" +Setup expectations for test sub classes +Run discovery for as a prerequisite for most tests +""" +import unittest +import os +from datetime import datetime as dt +from datetime import timezone as tz + +from tap_tester import connections, menagerie, runner + + +class BaseTapTest(unittest.TestCase): + """ + Setup expectations for test sub classes + Run discovery for as a prerequisite for most tests + """ + + REPLICATION_KEYS = "valid-replication-keys" + PRIMARY_KEYS = "table-key-properties" + FOREIGN_KEYS = "table-foreign-key-properties" + REPLICATION_METHOD = "forced-replication-method" + API_LIMIT = "max-row-limit" + INCREMENTAL = "INCREMENTAL" + FULL = "FULL_TABLE" + START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + + @staticmethod + def name(): + """The name of the test within the suite""" + return "test_name" + + @staticmethod + def tap_name(): + """The name of the tap""" + return "tap-shopify" + + @staticmethod + def get_type(): + """the expected url route ending""" + return "platform.shopify" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'start_date': '2017-07-01T00:00:00Z', + 'shop': 'stitchdatawearhouse', + 'date_window_size': 30 + } + + if original: + return return_value + + # This test needs the new connections start date to be larger than the default + assert self.start_date > return_value["start_date"] + + return_value["start_date"] = self.start_date + return return_value + + @staticmethod + def get_credentials(): + """Authentication information for the test account""" + return { + 'api_key': os.getenv('TAP_SHOPIFY_API_KEY') + } + + def expected_metadata(self): + """The expected streams and metadata about the streams""" + + default = { + self.REPLICATION_KEYS: {"updated_at"}, + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.API_LIMIT: 250} + + meta = default.copy() + meta.update({self.FOREIGN_KEYS: {"owner_id", "owner_resource"}}) + + return { + "abandoned_checkouts": default, + "collects": default, + "custom_collections": default, + "customers": default, + "orders": default, + "order_refunds": { + self.REPLICATION_KEYS: {"created_at"}, + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.API_LIMIT: 250}, + "products": default, + "metafields": meta, + "transactions": { + self.REPLICATION_KEYS: {"created_at"}, + self.PRIMARY_KEYS: {"id"}, + self.FOREIGN_KEYS: {"order_id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.API_LIMIT: 250} + } + + def expected_streams(self): + """A set of expected stream names""" + return set(self.expected_metadata().keys()) + + def child_streams(self): + """ + Return a set of streams that are child streams + based on having foreign key metadata + """ + return {stream for stream, metadata in self.expected_metadata().items() + if metadata.get(self.FOREIGN_KEYS)} + + def expected_primary_keys(self): + """ + return a dictionary with key of table name + and value as a set of primary key fields + """ + return {table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_replication_keys(self): + """ + return a dictionary with key of table name + and value as a set of 
replication key fields + """ + return {table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_foreign_keys(self): + """ + return a dictionary with key of table name + and value as a set of foreign key fields + """ + return {table: properties.get(self.FOREIGN_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_replication_method(self): + """return a dictionary with key of table name nd value of replication method""" + return {table: properties.get(self.REPLICATION_METHOD, None) + for table, properties + in self.expected_metadata().items()} + + def setUp(self): + """Verify that you have set the prerequisites to run the tap (creds, etc.)""" + missing_envs = [x for x in [os.getenv('TAP_SHOPIFY_API_KEY')] if x is None] + if missing_envs: + raise Exception("set environment variables") + + def test_run(self): + """ + Default Test Setup + Remove previous connections (with the same name) + Create a new connection (with the properties and credentials above) + Run discovery and ensure it completes successfully + """ + self.do_test(self.create_connection()) + + def do_test(self, conn_id): + """A placeholder test to override in sub-class tests""" + + ######################### + # Helper Methods # + ######################### + + def create_connection(self, original_properties: bool = True): + """Create a new connection with the test name""" + # Create the connection + conn_id = connections.ensure_connection(self, original_properties) + + # Run a check job using orchestrator (discovery) + check_job_name = runner.run_check_mode(self, conn_id) + + # Assert that the check job succeeded + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + return conn_id + + def run_sync(self, conn_id): + """ + Run a sync job and make sure it exited properly. + Return a dictionary with keys of streams synced + and values of records synced for each stream + """ + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_primary_keys()) + return sync_record_count + + @staticmethod + def local_to_utc(date: dt): + """Convert a datetime with timezone information to utc""" + utc = dt(date.year, date.month, date.day, date.hour, date.minute, + date.second, date.microsecond, tz.utc) + + if date.tzinfo and hasattr(date.tzinfo, "_offset"): + utc += date.tzinfo._offset + + return utc + + def max_bookmarks_by_stream(self, sync_records): + """ + Return the maximum value for the replication key for each stream + which is the bookmark expected value. + + Comparisons are based on the class of the bookmark value. 
Dates will be + string compared which works for ISO date-time strings + """ + max_bookmarks = {} + for stream, batch in sync_records.items(): + + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + stream_bookmark_key = stream_bookmark_key.pop() + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + max_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if max_bookmarks[stream][stream_bookmark_key] is None: + max_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value > max_bookmarks[stream][stream_bookmark_key]: + max_bookmarks[stream][stream_bookmark_key] = bk_value + return max_bookmarks + + def min_bookmarks_by_stream(self, sync_records): + """Return the minimum value for the replication key for each stream""" + min_bookmarks = {} + for stream, batch in sync_records.items(): + + upsert_messages = [m for m in batch.get('messages') if m['action'] == 'upsert'] + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len(stream_bookmark_key) == 1 # There shouldn't be a compound replication key + (stream_bookmark_key, ) = stream_bookmark_key + + bk_values = [message["data"].get(stream_bookmark_key) for message in upsert_messages] + min_bookmarks[stream] = {stream_bookmark_key: None} + for bk_value in bk_values: + if bk_value is None: + continue + + if min_bookmarks[stream][stream_bookmark_key] is None: + min_bookmarks[stream][stream_bookmark_key] = bk_value + + if bk_value < min_bookmarks[stream][stream_bookmark_key]: + min_bookmarks[stream][stream_bookmark_key] = bk_value + return min_bookmarks + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.start_date = self.get_properties().get("start_date") diff --git a/tests/sync_rows.py b/tests/sync_rows.py new file mode 100644 index 00000000..c3371725 --- /dev/null +++ b/tests/sync_rows.py @@ -0,0 +1,72 @@ +import os +import uuid +import singer +from tap_tester.scenario import (SCENARIOS) +import tap_tester.connections as connections +import tap_tester.menagerie as menagerie +import tap_tester.runner as runner +from base import ShopifyTest +import unittest +from functools import reduce +from singer import metadata + +LOGGER = singer.get_logger() + +# The token used to authenticate our requests was generated on +# [2018-09-18](https://github.com/stitchdata/environments/commit/82609cef972fd631c628b8eb733f37eea8f5d4f4). +# If it ever expires, you'll need to login to Shopify via the 1Password +# login creds, setup a Shopify integration on your VM, and copy the new +# token out of the connections credentials into the environments repo. 
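# --- Editor's illustration (not part of the patch) ---
# A minimal sketch of why the plain ">" / "<" comparisons used by
# max_bookmarks_by_stream and min_bookmarks_by_stream in tests/base.py are safe
# for bookmark values: ISO 8601 UTC timestamps in a single fixed format sort
# lexicographically in the same order as chronologically.
bookmark_values = [
    "2019-10-01T00:00:00Z",
    "2020-03-24T13:44:20Z",
    "2018-09-18T08:15:00Z",
]
# Plain string comparison picks the chronologically latest/earliest value, which
# is exactly what those helpers record as the expected bookmark per stream.
assert max(bookmark_values) == "2020-03-24T13:44:20Z"
assert min(bookmark_values) == "2018-09-18T08:15:00Z"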
+class ShopifySyncRows(ShopifyTest): + def name(self): + return "tap_tester_shopify_sync_rows" + + def expected_sync_streams(self): + return { + 'orders', + } + + def expected_pks(self): + return { + 'orders': {'id'}, + } + + def do_test(self, conn_id): + # Select our catalogs + our_catalogs = [c for c in self.found_catalogs if c.get('tap_stream_id') in self.expected_sync_streams()] + for c in our_catalogs: + c_annotated = menagerie.get_annotated_schema(conn_id, c['stream_id']) + c_metadata = metadata.to_map(c_annotated['metadata']) + connections.select_catalog_and_fields_via_metadata(conn_id, c, c_annotated, [], []) + + # Clear state before our run + menagerie.set_state(conn_id, {}) + + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + record_count_by_stream = runner.examine_target_output_file(self, conn_id, self.expected_sync_streams(), self.expected_pks()) + replicated_row_count = reduce(lambda accum,c : accum + c, record_count_by_stream.values()) + self.assertGreater(replicated_row_count, 0, msg="failed to replicate any data: {}".format(record_count_by_stream)) + print("total replicated row count: {}".format(replicated_row_count)) + + # Ensure all records have a value for PK(s) + records = runner.get_records_from_target_output() + for stream in self.expected_sync_streams(): + messages = records.get(stream).get('messages') + for m in messages: + pk_set = self.expected_pks()[stream] + for pk in pk_set: + self.assertIsNotNone(m.get('data', {}).get(pk), msg="oh no! {}".format(m)) + + bookmarks = menagerie.get_state(conn_id)['bookmarks'] + + self.assertTrue('orders' in bookmarks) + + +SCENARIOS.add(ShopifySyncRows) diff --git a/tests/test_automatic_fields.py b/tests/test_automatic_fields.py new file mode 100644 index 00000000..9a47b5c1 --- /dev/null +++ b/tests/test_automatic_fields.py @@ -0,0 +1,66 @@ +""" +Test that with no fields selected for a stream automatic fields are still replicated +""" + +from tap_tester import runner, menagerie + +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class MinimumSelectionTest(BaseTapTest): + """Test that with no fields selected for a stream automatic fields are still replicated""" + + def name(self): + return "tap_tester_shopify_no_fields_test" + + def do_test(self, conn_id): + """ + Verify that for each stream you can get multiple pages of data + when no fields are selected and only the automatic fields are replicated. + + PREREQUISITE + For EACH stream add enough data that you surpass the limit of a single + fetch of data. For instance if you have a limit of 250 records ensure + that 251 (or more) records have been posted for that stream. 
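        Illustration (editor's note, using the expectations from BaseTapTest in
        tests/base.py): the "automatic fields" asserted below are the union of
        primary, replication, and foreign keys, e.g. for the transactions stream:

            primary_keys = {"id"}
            replication_keys = {"created_at"}
            foreign_keys = {"order_id"}
            automatic_fields = primary_keys | replication_keys | foreign_keys
            # -> {"id", "created_at", "order_id"}; with no fields selected these
            # are the only fields each replicated record should contain.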
+ """ + incremental_streams = {key for key, value in self.expected_replication_method().items() + if value == self.INCREMENTAL} + + # Select all streams and no fields within streams + # IF THERE ARE NO AUTOMATIC FIELDS FOR A STREAM + # WE WILL NEED TO UPDATE THE BELOW TO SELECT ONE + found_catalogs = menagerie.get_catalogs(conn_id) + untested_streams = self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'}) + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams.difference( + untested_streams)] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=False) + + # Run a sync job using orchestrator + record_count_by_stream = self.run_sync(conn_id) + + actual_fields_by_stream = runner.examine_target_output_for_fields() + + for stream in self.expected_streams().difference(untested_streams): + with self.subTest(stream=stream): + + # verify that you get more than a page of data + # SKIP THIS ASSERTION FOR STREAMS WHERE YOU CANNOT GET + # MORE THAN 1 PAGE OF DATA IN THE TEST ACCOUNT + self.assertGreater( + record_count_by_stream.get(stream, -1), + self.expected_metadata().get(stream, {}).get(self.API_LIMIT, 0), + msg="The number of records is not over the stream max limit") + + # verify that only the automatic fields are sent to the target + self.assertEqual( + actual_fields_by_stream.get(stream, set()), + self.expected_primary_keys().get(stream, set()) | + self.expected_replication_keys().get(stream, set()) | + self.expected_foreign_keys().get(stream, set()), + msg="The fields sent to the target are not the automatic fields" + ) + + +SCENARIOS.add(MinimumSelectionTest) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py new file mode 100644 index 00000000..f6b3114b --- /dev/null +++ b/tests/test_bookmarks.py @@ -0,0 +1,138 @@ +""" +Test tap sets a bookmark and respects it for the next sync of a stream +""" +from datetime import datetime as dt + +from dateutil.parser import parse + +from tap_tester import menagerie, runner +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class BookmarkTest(BaseTapTest): + """Test tap sets a bookmark and respects it for the next sync of a stream""" + + def name(self): + return "tap_tester_shopify_bookmark_test" + + def do_test(self, conn_id): + """ + Verify that for each stream you can do a sync which records bookmarks. + That the bookmark is the maximum value sent to the target for the replication key. 
+ That a second sync respects the bookmark + All data of the second sync is > the bookmark from the first sync + The number of records in the 2nd sync is less then the first (This assumes that + new data added to the stream is done at a rate slow enough that you haven't + doubled the amount of data from the start date to the first sync between + the first sync and second sync run in this test) + + Verify that only data for incremental streams is sent to the target + + PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key + """ + # Select all streams and no fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + incremental_streams = {key for key, value in self.expected_replication_method().items() + if value == self.INCREMENTAL} + + # Our test data sets for Shopify do not have any abandoned_checkouts + untested_streams = self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'}) + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams.difference( + untested_streams)] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=False) + + # Run a sync job using orchestrator + first_sync_record_count = self.run_sync(conn_id) + + # verify that the sync only sent records to the target for selected streams (catalogs) + self.assertEqual(set(first_sync_record_count.keys()), + incremental_streams.difference(untested_streams)) + + first_sync_state = menagerie.get_state(conn_id) + + # Get data about actual rows synced + first_sync_records = runner.get_records_from_target_output() + first_max_bookmarks = self.max_bookmarks_by_stream(first_sync_records) + first_min_bookmarks = self.min_bookmarks_by_stream(first_sync_records) + + # Run a second sync job using orchestrator + second_sync_record_count = self.run_sync(conn_id) + + # Get data about rows synced + second_sync_records = runner.get_records_from_target_output() + second_min_bookmarks = self.min_bookmarks_by_stream(second_sync_records) + + # THIS MAKES AN ASSUMPTION THAT CHILD STREAMS DO NOT HAVE BOOKMARKS. 
+ # ADJUST IF NECESSARY + for stream in incremental_streams.difference(untested_streams): + with self.subTest(stream=stream): + + # get bookmark values from state and target data + stream_bookmark_key = self.expected_replication_keys().get(stream, set()) + assert len( + stream_bookmark_key) == 1 # There shouldn't be a compound replication key + stream_bookmark_key = stream_bookmark_key.pop() + + state_value = first_sync_state.get("bookmarks", {}).get( + stream, {None: None}).get(stream_bookmark_key) + target_value = first_max_bookmarks.get( + stream, {None: None}).get(stream_bookmark_key) + target_min_value = first_min_bookmarks.get( + stream, {None: None}).get(stream_bookmark_key) + + try: + # attempt to parse the bookmark as a date + if state_value: + if isinstance(state_value, str): + state_value = self.local_to_utc(parse(state_value)) + if isinstance(state_value, int): + state_value = self.local_to_utc(dt.utcfromtimestamp(state_value)) + + if target_value: + if isinstance(target_value, str): + target_value = self.local_to_utc(parse(target_value)) + if isinstance(target_value, int): + target_value = self.local_to_utc(dt.utcfromtimestamp(target_value)) + + if target_min_value: + if isinstance(target_min_value, str): + target_min_value = self.local_to_utc(parse(target_min_value)) + if isinstance(target_min_value, int): + target_min_value = self.local_to_utc( + dt.utcfromtimestamp(target_min_value)) + + except (OverflowError, ValueError, TypeError): + print("bookmarks cannot be converted to dates, comparing values directly") + + # verify that there is data with different bookmark values - setup necessary + self.assertGreaterEqual(target_value, target_min_value, + msg="Data isn't set up to be able to test bookmarks") + + # verify state agrees with target data after 1st sync + self.assertGreaterEqual(state_value, target_value, + msg="The bookmark value isn't correct based on target data") + + # verify that you get less data the 2nd time around + self.assertGreater( + first_sync_record_count.get(stream, 0), + second_sync_record_count.get(stream, 0), + msg="second syc didn't have less records, bookmark usage not verified") + + # verify all data from 2nd sync >= 1st bookmark + target_value = second_min_bookmarks.get( + stream, {None: None}).get(stream_bookmark_key) + try: + if target_value: + if isinstance(target_value, str): + target_value = self.local_to_utc(parse(target_value)) + if isinstance(target_value, int): + target_value = self.local_to_utc(dt.utcfromtimestamp(target_value)) + + except (OverflowError, ValueError, TypeError): + print("bookmarks cannot be converted to dates, comparing values directly") + +SCENARIOS.add(BookmarkTest) diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 00000000..5ca829be --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,164 @@ +""" +Test tap discovery +""" +import re + +from tap_tester import menagerie + +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class DiscoveryTest(BaseTapTest): + """ Test the tap discovery """ + + def name(self): + return "tap_tester_shopify_discovery_test" + + def do_test(self, conn_id): + """ + Verify that discover creates the appropriate catalog, schema, metadata, etc. 
+ + • Verify number of actual streams discovered match expected + • Verify the stream names discovered were what we expect + • Verify stream names follow naming convention + streams should only have lowercase alphas and underscores + • verify there is only 1 top level breadcrumb + • verify replication key(s) + • verify primary key(s) + • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + • verify the actual replication matches our expected replication method + • verify that primary, replication and foreign keys + are given the inclusion of automatic (metadata and annotated schema). + • verify that all other fields have inclusion of available (metadata and schema) + """ + + # Verify number of actual streams discovered match expected + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len(found_catalogs), 0, + msg="unable to locate schemas for connection {}".format(conn_id)) + self.assertEqual(len(found_catalogs), + len(self.expected_streams()), + msg="Expected {} streams, actual was {} for connection {}," + " actual {}".format( + len(self.expected_streams()), + len(found_catalogs), + found_catalogs, + conn_id)) + + # Verify the stream names discovered were what we expect + found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} + self.assertEqual(set(self.expected_streams()), + set(found_catalog_names), + msg="Expected streams don't match actual streams") + + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming") + + for stream in self.expected_streams(): + with self.subTest(stream=stream): + catalog = next(iter([catalog for catalog in found_catalogs + if catalog["stream_name"] == stream])) + assert catalog # based on previous tests this should always be found + + schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + metadata = schema_and_metadata["metadata"] + schema = schema_and_metadata["annotated-schema"] + + # verify the stream level properties are as expected + # verify there is only 1 top level breadcrumb + stream_properties = [item for item in metadata if item.get("breadcrumb") == []] + self.assertTrue(len(stream_properties) == 1, + msg="There is more than one top level breadcrumb") + + # verify replication key(s) + self.assertEqual( + set(stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, [])), + self.expected_replication_keys()[stream], + msg="expected replication key {} but actual is {}".format( + self.expected_replication_keys()[stream], + set(stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: None}).get( + self.REPLICATION_KEYS, [])))) + + # verify primary key(s) + self.assertEqual( + set(stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])), + self.expected_primary_keys()[stream], + msg="expected primary key {} but actual is {}".format( + self.expected_primary_keys()[stream], + set(stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, [])))) + + # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + actual_replication_method = stream_properties[0].get( + "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + if stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: 
[]}).get(self.REPLICATION_KEYS, []): + + self.assertTrue(actual_replication_method == self.INCREMENTAL, + msg="Expected INCREMENTAL replication " + "since there is a replication key") + else: + self.assertTrue(actual_replication_method == self.FULL, + msg="Expected FULL replication " + "since there is no replication key") + + # verify the actual replication matches our expected replication method + self.assertEqual( + self.expected_replication_method().get(stream, None), + actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, + self.expected_replication_method().get(stream, None))) + + expected_primary_keys = self.expected_primary_keys()[stream] + expected_replication_keys = self.expected_replication_keys()[stream] + expected_automatic_fields = expected_primary_keys | expected_replication_keys + + # verify that primary, replication and foreign keys + # are given the inclusion of automatic in annotated schema. + actual_automatic_fields = {key for key, value in schema["properties"].items() + if value.get("inclusion") == "automatic"} + self.assertEqual(expected_automatic_fields, + actual_automatic_fields, + msg="expected {} automatic fields but got {}".format( + expected_automatic_fields, + actual_automatic_fields)) + + # verify that all other fields have inclusion of available + # This assumes there are no unsupported fields for SaaS sources + self.assertTrue( + all({value.get("inclusion") == "available" for key, value + in schema["properties"].items() + if key not in actual_automatic_fields}), + msg="Not all non key properties are set to available in annotated schema") + + # verify that primary, replication and foreign keys + # are given the inclusion of automatic in metadata. 
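                # Editor's illustration (not part of the patch): discovery metadata
                # is a list of breadcrumb/metadata pairs, where breadcrumb[1] is the
                # field name for field-level entries, e.g.:
                example_metadata = [
                    {"breadcrumb": [], "metadata": {"table-key-properties": ["id"]}},
                    {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
                    {"breadcrumb": ["properties", "note"], "metadata": {"inclusion": "available"}},
                ]
                example_automatic = {item["breadcrumb"][1] for item in example_metadata
                                     if item["breadcrumb"]
                                     and item["metadata"].get("inclusion") == "automatic"}
                # -> {"id"}; the assertion below builds the same set from the real
                # metadata and compares it to the expected key fields.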
+ actual_automatic_fields = \ + {item.get("breadcrumb", ["properties", None])[1] + for item in metadata + if item.get("metadata").get("inclusion") == "automatic"} + self.assertEqual(expected_automatic_fields, + actual_automatic_fields, + msg="expected {} automatic fields but got {}".format( + expected_automatic_fields, + actual_automatic_fields)) + + # verify that all other fields have inclusion of available + # This assumes there are no unsupported fields for SaaS sources + self.assertTrue( + all({item.get("metadata").get("inclusion") == "available" + for item in metadata + if item.get("breadcrumb", []) != [] + and item.get("breadcrumb", ["properties", None])[1] + not in actual_automatic_fields}), + msg="Not all non key properties are set to available in metadata") + + +SCENARIOS.add(DiscoveryTest) diff --git a/tests/test_full_replication.py b/tests/test_full_replication.py new file mode 100644 index 00000000..74c898f9 --- /dev/null +++ b/tests/test_full_replication.py @@ -0,0 +1,95 @@ +""" +Test tap gets all records for streams with full replication +""" +import json + +from tap_tester import menagerie, runner + +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class FullReplicationTest(BaseTapTest): + """Test tap gets all records for streams with full replication""" + + def name(self): + return "tap_tester_shopify_full_test" + + def do_test(self, conn_id): + """ + Verify that a bookmark doesn't exist for the stream + Verify that the second sync includes the same number or more records than the first sync + Verify that all records in the first sync are included in the second sync + Verify that the sync only sent records to the target for selected streams (catalogs) + + PREREQUISITE + For EACH stream that is fully replicated there are multiple rows of data with + different values for the replication key + """ + # Select all streams and no fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + full_streams = {key for key, value in self.expected_replication_method().items() + if value == self.FULL} + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in full_streams] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) + + # Run a sync job using orchestrator + first_sync_record_count = self.run_sync(conn_id) + + # verify that the sync only sent records to the target for selected streams (catalogs) + self.assertEqual(set(first_sync_record_count.keys()), full_streams) + + first_sync_state = menagerie.get_state(conn_id) + + # Get the set of records from a first sync + first_sync_records = runner.get_records_from_target_output() + + # Run a second sync job using orchestrator + second_sync_record_count = self.run_sync(conn_id) + + # Get the set of records from a second sync + second_sync_records = runner.get_records_from_target_output() + + # THIS MAKES AN ASSUMPTION THAT CHILD STREAMS DO NOT NEED TESTING. 
+ # ADJUST IF NECESSARY + for stream in full_streams.difference(self.child_streams()): + with self.subTest(stream=stream): + + # verify there is no bookmark values from state + state_value = first_sync_state.get("bookmarks", {}).get(stream) + self.assertIsNone(state_value) + + # verify that there is more than 1 record of data - setup necessary + self.assertGreater(first_sync_record_count.get(stream, 0), 1, + msg="Data isn't set up to be able to test full sync") + + # verify that you get the same or more data the 2nd time around + self.assertGreaterEqual( + second_sync_record_count.get(stream, 0), + first_sync_record_count.get(stream, 0), + msg="second syc didn't have more records, full sync not verified") + + # verify all data from 1st sync included in 2nd sync + first_data = [record["data"] for record + in first_sync_records.get(stream, {}).get("messages", {"data": {}})] + second_data = [record["data"] for record + in second_sync_records.get(stream, {}).get("messages", {"data": {}})] + + same_records = 0 + for first_record in first_data: + first_value = json.dumps(first_record, sort_keys=True) + + for compare_record in second_data: + compare_value = json.dumps(compare_record, sort_keys=True) + + if first_value == compare_value: + second_data.remove(compare_record) + same_records += 1 + break + + self.assertEqual(len(first_data), same_records, + msg="Not all data from the first sync was in the second sync") + + +SCENARIOS.add(FullReplicationTest) diff --git a/tests/test_pagination.py b/tests/test_pagination.py new file mode 100644 index 00000000..b24d1751 --- /dev/null +++ b/tests/test_pagination.py @@ -0,0 +1,71 @@ +""" +Test tap pagination of streams +""" +from tap_tester import menagerie, runner + +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class PaginationTest(BaseTapTest): + """ Test the tap pagination to get multiple pages of data """ + + def name(self): + return "tap_tester_shopify_pagination_test" + + def do_test(self, conn_id): + """ + Verify that for each stream you can get multiple pages of data + and that when all fields are selected more than the automatic fields are replicated. + + PREREQUISITE + For EACH stream add enough data that you surpass the limit of a single + fetch of data. For instance if you have a limit of 250 records ensure + that 251 (or more) records have been posted for that stream. 
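        A rough sketch (editor's note) of the arithmetic behind the record-count
        assertion below, using the 250-record page cap recorded as API_LIMIT in
        tests/base.py; the 2019-10 REST Admin API pages with cursor-style
        page_info links rather than page numbers, but this check is agnostic to
        the pagination mechanism:

            import math

            api_limit = 250
            records_synced = 251                                     # hypothetical
            pages_required = math.ceil(records_synced / api_limit)   # -> 2
            # records_synced > api_limit therefore implies the tap walked at
            # least one additional page for the stream.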
+ """ + # Select all streams and all fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + incremental_streams = {key for key, value in self.expected_replication_method().items() + if value == self.INCREMENTAL} + + untested_streams = self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'}) + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams.difference( + untested_streams)] + + + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) + # Run a sync job using orchestrator + record_count_by_stream = self.run_sync(conn_id) + actual_fields_by_stream = runner.examine_target_output_for_fields() + + for stream in self.expected_streams().difference(untested_streams): + with self.subTest(stream=stream): + + # verify that we can paginate with all fields selected + self.assertGreater( + record_count_by_stream.get(stream, -1), + self.expected_metadata().get(stream, {}).get(self.API_LIMIT, 0), + msg="The number of records is not over the stream max limit") + + # verify that the automatic fields are sent to the target + self.assertTrue( + actual_fields_by_stream.get(stream, set()).issuperset( + self.expected_primary_keys().get(stream, set()) | + self.expected_replication_keys().get(stream, set()) | + self.expected_foreign_keys().get(stream, set())), + msg="The fields sent to the target don't include all automatic fields" + ) + + # verify we have more fields sent to the target than just automatic fields + # SKIP THIS ASSERTION IF ALL FIELDS ARE INTENTIONALLY AUTOMATIC FOR THIS STREAM + self.assertTrue( + actual_fields_by_stream.get(stream, set()).symmetric_difference( + self.expected_primary_keys().get(stream, set()) | + self.expected_replication_keys().get(stream, set()) | + self.expected_foreign_keys().get(stream, set())), + msg="The fields sent to the target don't include non-automatic fields" + ) + + +SCENARIOS.add(PaginationTest) diff --git a/tests/test_start_date.py b/tests/test_start_date.py new file mode 100644 index 00000000..bfcded25 --- /dev/null +++ b/tests/test_start_date.py @@ -0,0 +1,122 @@ +""" +Test that the start_date configuration is respected +""" + +from functools import reduce + +from dateutil.parser import parse + +from tap_tester import menagerie, runner + +from tap_tester.scenario import SCENARIOS +from base import BaseTapTest + + +class StartDateTest(BaseTapTest): + """ + Test that the start_date configuration is respected + + • verify that a sync with a later start date has at least one record synced + and less records than the 1st sync with a previous start date + • verify that each stream has less records than the earlier start date sync + • verify all data from later start data has bookmark values >= start_date + • verify that the minimum bookmark sent to the target for the later start_date sync + is greater than or equal to the start date + """ + + def name(self): + return "tap_tester_shopify_start_date_test" + + def do_test(self, conn_id): + """Test we get a lot of data back based on the start date configured in base""" + + # Select all streams and all fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + incremental_streams = {key for key, value in self.expected_replication_method().items() + if value == self.INCREMENTAL} + + # IF THERE ARE STREAMS THAT SHOULD NOT BE TESTED + # REPLACE THE EMPTY SET BELOW WITH THOSE STREAMS + untested_streams = 
self.child_streams().union({'abandoned_checkouts', 'collects', 'metafields', 'transactions', 'order_refunds'}) + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams.difference( + untested_streams)] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) + + # Run a sync job using orchestrator + first_sync_record_count = self.run_sync(conn_id) + first_total_records = reduce(lambda a, b: a + b, first_sync_record_count.values()) + + # Count actual rows synced + first_sync_records = runner.get_records_from_target_output() + + # set the start date for a new connection based off bookmarks largest value + first_max_bookmarks = self.max_bookmarks_by_stream(first_sync_records) + bookmark_list = [next(iter(book.values())) for stream, book in first_max_bookmarks.items()] + bookmark_dates = [] + for bookmark in bookmark_list: + try: + bookmark_dates.append(parse(bookmark)) + except (ValueError, OverflowError, TypeError): + pass + + if not bookmark_dates: + # THERE WERE NO BOOKMARKS THAT ARE DATES. + # REMOVE CODE TO FIND A START DATE AND ENTER ONE MANUALLY + raise ValueError + + largest_bookmark = reduce(lambda a, b: a if a > b else b, bookmark_dates) + self.start_date = self.local_to_utc(largest_bookmark) \ + .replace(hour=0, minute=0, second=0) \ + .strftime(self.START_DATE_FORMAT) + + # create a new connection with the new start_date + conn_id = self.create_connection(original_properties=False) + + # Select all streams and all fields within streams + found_catalogs = menagerie.get_catalogs(conn_id) + our_catalogs = [catalog for catalog in found_catalogs if + catalog.get('tap_stream_id') in incremental_streams.difference( + untested_streams)] + self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True) + + # Run a sync job using orchestrator + second_sync_record_count = self.run_sync(conn_id) + second_total_records = reduce(lambda a, b: a + b, second_sync_record_count.values(), 0) + second_sync_records = runner.get_records_from_target_output() + second_min_bookmarks = self.min_bookmarks_by_stream(second_sync_records) + + # verify that at least one record synced and less records synced than the 1st connection + self.assertGreater(second_total_records, 0) + self.assertLess(second_total_records, first_total_records) + + for stream in incremental_streams.difference(untested_streams): + with self.subTest(stream=stream): + + # verify that each stream has less records than the first connection sync + self.assertGreater( + first_sync_record_count.get(stream, 0), + second_sync_record_count.get(stream, 0), + msg="second had more records, start_date usage not verified") + + # verify all data from 2nd sync >= start_date + target_mark = second_min_bookmarks.get(stream, {"mark": None}) + target_value = next(iter(target_mark.values())) # there should be only one + + if target_value: + + # it's okay if there isn't target data for a stream + try: + target_value = self.local_to_utc(parse(target_value)) + + # verify that the minimum bookmark sent to the target for the second sync + # is greater than or equal to the start date + self.assertGreaterEqual(target_value, + self.local_to_utc(parse(self.start_date))) + + except (OverflowError, ValueError, TypeError): + print("bookmarks cannot be converted to dates, " + "can't test start_date for {}".format(stream)) + + +SCENARIOS.add(StartDateTest) diff --git a/tests/test_transaction_canonicalize.py b/tests/unittests/test_transaction_canonicalize.py similarity index 
100% rename from tests/test_transaction_canonicalize.py rename to tests/unittests/test_transaction_canonicalize.py
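For reference, the client-initialization change in this patch boils down to the
following (a minimal sketch mirroring tap_shopify/__init__.py after the change;
the shop name and API key below are placeholders):

    import shopify

    config = {"shop": "example-shop", "api_key": "<private-app-api-key>"}

    # ShopifyAPI 7.0.1 builds sessions against an explicit API version; this
    # patch pins the tap to the 2019-10 version of the REST Admin API.
    session = shopify.Session(config["shop"], "2019-10", config["api_key"])
    shopify.ShopifyResource.activate_session(session)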