-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-13615: Add Locations stream and TDL-13614: Add Inventory Levels stream #114
Changes from 21 commits
d3a0119
405e383
3a72f74
5934d4f
8f88a01
5971bdf
8e16fa7
f3dc1a3
8737d6e
9880757
58fe001
9f44ddc
92dd92c
f4bd376
1e9867c
985b79f
04585d8
8d35ce3
0a53d8b
073579a
b74e624
839f585
40a9d90
9901649
2f7d93a
7400413
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
{ | ||
"properties": { | ||
"available": { | ||
"type": ["null", "integer"] | ||
}, | ||
"inventory_item_id": { | ||
"type": ["null", "integer"] | ||
}, | ||
"updated_at": { | ||
"type": ["null", "string"], | ||
"format": "date-time" | ||
}, | ||
"location_id": { | ||
"type": ["null", "integer"] | ||
}, | ||
"admin_graphql_api_id": { | ||
"type": ["null", "string"] | ||
} | ||
}, | ||
"type": "object" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{ | ||
"$ref": "definitions.json#/location" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import shopify | ||
from singer.utils import strftime, strptime_to_utc | ||
from tap_shopify.streams.base import (Stream, | ||
RESULTS_PER_PAGE, | ||
shopify_error_handling) | ||
from tap_shopify.context import Context | ||
|
||
class InventoryLevels(Stream):
    """Stream of Shopify inventory levels, fetched per parent location.

    Records have a compound primary key (location_id, inventory_item_id)
    and are replicated incrementally on 'updated_at'.
    """
    name = 'inventory_levels'
    replication_key = 'updated_at'
    key_properties = ['location_id', 'inventory_item_id']
    replication_object = shopify.InventoryLevel

    @shopify_error_handling
    def api_call_for_inventory_levels(self, parent_object_id, bookmark):
        """Return one page of inventory levels for a location.

        Retried via shopify_error_handling on transient API errors.
        """
        return self.replication_object.find(
            updated_at_min=bookmark,
            limit=RESULTS_PER_PAGE,
            location_ids=parent_object_id
        )

    def get_inventory_levels(self, parent_object, bookmark):
        """Yield every inventory level for one location, following pagination."""
        inventory_page = self.api_call_for_inventory_levels(parent_object, bookmark)
        yield from inventory_page

        while inventory_page.has_next_page():
            inventory_page = inventory_page.next_page()
            yield from inventory_page

    def get_objects(self):
        """Yield inventory levels across all of the store's locations."""
        bookmark = self.get_bookmark()

        selected_parent = Context.stream_objects['locations']()
        selected_parent.name = "inventory_level_locations"

        # Get all locations data, as the location id is used for Inventory Level.
        # If we only fetched locations updated after the bookmark, there would be
        # a possibility of data loss for Inventory Level, because a location is
        # not updated when an Inventory Level inside it is updated.
        for parent_object in selected_parent.get_locations_data():
            # Delegate directly to the child generator instead of a manual
            # for/yield loop (review feedback: use "yield from").
            yield from self.get_inventory_levels(parent_object.id, bookmark)

    def sync(self):
        """Emit records updated on/after the bookmark, then advance the bookmark."""
        bookmark = self.get_bookmark()
        max_bookmark = bookmark
        for inventory_level in self.get_objects():
            inventory_level_dict = inventory_level.to_dict()
            replication_value = strptime_to_utc(inventory_level_dict[self.replication_key])
            if replication_value >= bookmark:
                yield inventory_level_dict

            # Track the greatest replication value seen so far.
            if replication_value > max_bookmark:
                max_bookmark = replication_value

        self.update_bookmark(strftime(max_bookmark))

Context.stream_objects['inventory_levels'] = InventoryLevels
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import shopify | ||
from singer import utils | ||
from tap_shopify.streams.base import (Stream, shopify_error_handling) | ||
from tap_shopify.context import Context | ||
|
||
class Locations(Stream):
    """Stream of Shopify store locations, replicated incrementally on updated_at."""
    name = 'locations'
    replication_object = shopify.Location

    @shopify_error_handling
    def get_locations_data(self):
        """Yield every location, walking the paginated API response."""
        # NOTE: the Location API documentation does not offer filtering
        # parameters like the inventory levels endpoint, so find() takes none.
        page = self.replication_object.find()
        yield from page

        while page.has_next_page():
            page = page.next_page()
            yield from page

    def sync(self):
        """Emit locations updated on/after the bookmark, then advance the bookmark."""
        bookmark = self.get_bookmark()
        max_bookmark = bookmark

        for record in self.get_locations_data():
            record_dict = record.to_dict()
            record_ts = utils.strptime_to_utc(record_dict[self.replication_key])

            if record_ts >= bookmark:
                yield record_dict

            # Update max bookmark if the replication value of the
            # current location is greater.
            if record_ts > max_bookmark:
                max_bookmark = record_ts

        self.update_bookmark(utils.strftime(max_bookmark))

Context.stream_objects['locations'] = Locations
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,17 @@ def name(self): | |
|
||
|
||
def test_run(self):
    """Run the pagination test against both test stores."""
    # Skip the 'locations' stream: there is not much info about the
    # limit of records returned in one page.
    # Documentation: https://help.shopify.com/en/manual/locations/setting-up-your-locations
    skipped_streams = {'locations'}

    with self.subTest(store="store_1"):
        conn_id = self.create_connection(original_credentials=True)
        self.pagination_test(conn_id, self.store_1_streams - skipped_streams)

    with self.subTest(store="store_2"):
        conn_id = self.create_connection(original_properties=False, original_credentials=False)
        self.pagination_test(conn_id, self.store_2_streams - skipped_streams)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this test could you please add the assertions that verify records are unique across pages by checking primary key values. It should leverage tuples in case of any compound primary keys and the assertCountEqual method. https://github.com/singer-io/tap-hubspot/blob/290e5c051839b01e43739d25e1c46b6ddef5a749/tests/test_hubspot_pagination_test.py#L118-L127 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done the changes. |
||
|
||
|
||
def pagination_test(self, conn_id, testable_streams): | ||
|
@@ -55,6 +59,7 @@ def pagination_test(self, conn_id, testable_streams): | |
# Run a sync job using orchestrator | ||
record_count_by_stream = self.run_sync(conn_id) | ||
actual_fields_by_stream = runner.examine_target_output_for_fields() | ||
sync_records = runner.get_records_from_target_output() | ||
|
||
for stream in testable_streams: | ||
with self.subTest(stream=stream): | ||
|
@@ -70,10 +75,13 @@ def pagination_test(self, conn_id, testable_streams): | |
minimum_record_count, | ||
msg="The number of records is not over the stream max limit") | ||
|
||
expected_pk = self.expected_primary_keys() | ||
sync_messages = sync_records.get(stream, {'messages': []}).get('messages') | ||
|
||
# verify that the automatic fields are sent to the target | ||
self.assertTrue( | ||
actual_fields_by_stream.get(stream, set()).issuperset( | ||
self.expected_primary_keys().get(stream, set()) | | ||
expected_pk.get(stream, set()) | | ||
self.expected_replication_keys().get(stream, set()) | | ||
self.expected_foreign_keys().get(stream, set())), | ||
msg="The fields sent to the target don't include all automatic fields" | ||
|
@@ -83,8 +91,18 @@ def pagination_test(self, conn_id, testable_streams): | |
# SKIP THIS ASSERTION IF ALL FIELDS ARE INTENTIONALLY AUTOMATIC FOR THIS STREAM | ||
self.assertTrue( | ||
actual_fields_by_stream.get(stream, set()).symmetric_difference( | ||
self.expected_primary_keys().get(stream, set()) | | ||
expected_pk.get(stream, set()) | | ||
self.expected_replication_keys().get(stream, set()) | | ||
self.expected_foreign_keys().get(stream, set())), | ||
msg="The fields sent to the target don't include non-automatic fields" | ||
) | ||
|
||
# Verify we did not duplicate any records across pages | ||
records_pks_set = {tuple([message.get('data').get(primary_key) | ||
for primary_key in expected_pk.get(stream, set())]) | ||
for message in sync_messages} | ||
records_pks_list = [tuple([message.get('data').get(primary_key) | ||
for primary_key in expected_pk.get(stream, set())]) | ||
for message in sync_messages] | ||
self.assertCountEqual(records_pks_set, records_pks_list, | ||
msg=f"We have duplicate records for {stream}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import unittest | ||
from unittest import mock | ||
from singer.utils import strptime_to_utc | ||
from tap_shopify.context import Context | ||
|
||
INVENTORY_LEVEL_OBJECT = Context.stream_objects['inventory_levels']() | ||
|
||
class Location():
    """Minimal stand-in for a shopify Location object used by the tests."""

    def __init__(self, id):
        # Expose only the attribute the stream code reads (parent_object.id).
        self.id = id
|
||
class InventoryLevels():
    """Minimal stand-in for a shopify InventoryLevel record used by the tests."""

    def __init__(self, id, updated_at):
        self.id = id
        self.updated_at = updated_at

    def to_dict(self):
        # Same dict shape the stream's sync() consumes from to_dict().
        return {"id": self.id, "updated_at": self.updated_at}
|
||
# Fixture records spanning four consecutive days, used across the tests below.
LEVEL_1 = InventoryLevels("inv_level1", "2021-08-11T01:57:05-04:00")
LEVEL_2 = InventoryLevels("inv_level2", "2021-08-12T01:57:05-04:00")
LEVEL_3 = InventoryLevels("inv_level3", "2021-08-13T01:57:05-04:00")
LEVEL_4 = InventoryLevels("inv_level4", "2021-08-14T01:57:05-04:00")

@mock.patch("tap_shopify.streams.base.Stream.get_bookmark")
class TestInventoryItems(unittest.TestCase):
    """Unit tests for the inventory_levels stream's get_objects and sync."""

    @mock.patch("tap_shopify.streams.locations.Locations.get_locations_data")
    @mock.patch("tap_shopify.streams.inventory_levels.InventoryLevels.get_inventory_levels")
    def test_get_objects_with_locations(self, mock_get_inventory_levels, mock_parent_object, mock_get_bookmark):
        '''
        Verify that expected data should be emitted for inventory_levels if locations found.
        '''
        expected_inventory_levels = [LEVEL_1, LEVEL_2, LEVEL_3, LEVEL_4]
        mock_parent_object.return_value = [Location("location1"), Location("location2")]
        # One batch of levels per mocked parent location.
        mock_get_inventory_levels.side_effect = [[LEVEL_1, LEVEL_2], [LEVEL_3, LEVEL_4]]

        actual_inventory_levels = list(INVENTORY_LEVEL_OBJECT.get_objects())

        # Verify that it returns inventory_levels for all locations
        self.assertEqual(actual_inventory_levels, expected_inventory_levels)

    @mock.patch("tap_shopify.streams.locations.Locations.get_locations_data")
    @mock.patch("tap_shopify.streams.inventory_levels.InventoryLevels.get_inventory_levels")
    def test_get_objects_with_no_locations(self, mock_get_inventory_levels, mock_parent_object, mock_get_bookmark):
        '''
        Verify that no data should be emitted for inventory_levels if no locations found.
        '''
        # No data for parent stream location
        mock_parent_object.return_value = []

        actual_inventory_levels = list(INVENTORY_LEVEL_OBJECT.get_objects())

        # No get_inventory_levels should be called and no data should be returned
        self.assertEqual(actual_inventory_levels, [])
        self.assertEqual(mock_get_inventory_levels.call_count, 0)

    @mock.patch("tap_shopify.streams.inventory_levels.InventoryLevels.get_objects")
    def test_sync(self, mock_get_objects, mock_get_bookmark):
        '''
        Verify that only data updated after specific bookmark are yielded from sync.
        '''
        mock_get_objects.return_value = [LEVEL_1, LEVEL_2, LEVEL_3, LEVEL_4]
        # Bookmark falls between LEVEL_2 and LEVEL_3.
        mock_get_bookmark.return_value = strptime_to_utc("2021-08-13T01:05:05-04:00")
        expected_sync = [LEVEL_3.to_dict(), LEVEL_4.to_dict()]

        actual_sync = list(INVENTORY_LEVEL_OBJECT.sync())

        # Verify that only 2 records sync
        self.assertEqual(actual_sync, expected_sync)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change it to "Add the read_inventory scope for the access token, then re-authorize the connection to sync and get the Inventory Levels / Inventory Items streams."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated