-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Facebook marketing: Fix duplicating records during insights lookback period #13047
Changes from 17 commits
6353c19
638a2ae
5f441d8
3838e0a
4e6a887
f6cfaaa
0b02887
8edabdb
1e93615
b1ce5a0
d445d9c
9df644c
b3fe9f1
1c962dc
0ca0924
2ccb7ab
8c188f6
58be0f6
23f38b1
22ac460
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,9 +102,17 @@ def read_records( | |
stream_state: Mapping[str, Any] = None, | ||
) -> Iterable[Mapping[str, Any]]: | ||
"""Waits for current job to finish (slice) and yield its result""" | ||
|
||
date_start = stream_state and stream_state.get("date_start") | ||
if date_start: | ||
date_start = pendulum.parse(date_start).date() | ||
|
||
job = stream_slice["insight_job"] | ||
for obj in job.get_result(): | ||
yield obj.export_all_data() | ||
record = obj.export_all_data() | ||
if date_start and pendulum.parse(record["updated_time"]).date() <= date_start: | ||
continue | ||
yield record | ||
|
||
self._completed_slices.add(job.interval.start) | ||
if job.interval.start == self._next_cursor_value: | ||
|
@@ -152,9 +160,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late | |
|
||
def _date_intervals(self) -> Iterator[pendulum.Date]: | ||
"""Get date period to sync""" | ||
if self._end_date < self._next_cursor_value: | ||
today = pendulum.today(tz="UTC").date() | ||
end_date = min(self._end_date, today) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps since we are going to prevent looking at "today" again once that date has already been looked up, the earliest day we look at should be "yesterday" (e.g. today - 1). This way, we won't try to look up data for a partial day without being able to update it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have changed logic to sync only for yesterday |
||
if end_date < self._next_cursor_value: | ||
return | ||
date_range = self._end_date - self._next_cursor_value | ||
date_range = end_date - self._next_cursor_value | ||
yield from date_range.range("days", self.time_increment) | ||
|
||
def _advance_cursor(self): | ||
|
@@ -173,9 +183,14 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]: | |
:return: | ||
""" | ||
|
||
today = pendulum.today(tz="UTC").date() | ||
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD | ||
|
||
for ts_start in self._date_intervals(): | ||
if ts_start in self._completed_slices: | ||
continue | ||
if ts_start < refresh_date: | ||
continue | ||
self._completed_slices.remove(ts_start) | ||
ts_end = ts_start + pendulum.duration(days=self.time_increment - 1) | ||
interval = pendulum.Period(ts_start, ts_end) | ||
yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params) | ||
|
@@ -215,7 +230,7 @@ def _get_start_date(self) -> pendulum.Date: | |
|
||
:return: the first date to sync | ||
""" | ||
today = pendulum.today().date() | ||
today = pendulum.today(tz="UTC").date() | ||
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD | ||
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
|
||
from typing import Any, MutableMapping | ||
|
||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.streams import Stream | ||
|
||
|
||
def read_full_refresh(stream_instance: Stream):
    """Drain a stream in full-refresh mode and return every record as a list.

    Iterates all slices produced by the stream and concatenates the records
    read from each slice, in slice order.
    """
    records = []
    # `stream_slice` (not `slice`) — avoid shadowing the builtin.
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        # read_records returns an iterable; extend consumes it directly.
        records.extend(stream_instance.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh))
    return records
|
||
|
||
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
    """Drain a stream in incremental mode, seeded with ``stream_state``.

    Side effect: ``stream_state`` is cleared and refilled in place with the
    stream's final state, so callers can chain successive incremental reads
    through the same mapping.

    :return: list of all records read across all slices
    """
    records = []
    stream_instance.state = stream_state
    # `stream_slice` (not `slice`) — avoid shadowing the builtin.
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state):
        records.extend(stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state))
    # Mutate the caller's mapping rather than rebinding, so the update is visible outside.
    stream_state.clear()
    stream_state.update(stream_instance.state)
    return records
|
||
|
||
class FakeInsightAsyncJobManager:
    """Test double for the async-job manager: every pre-baked job is reported as completed."""

    def __init__(self, jobs, **kwargs):
        # Extra kwargs mirror the real manager's constructor and are ignored.
        self.jobs = jobs

    def completed_jobs(self):
        """Yield each stored job as if it had already finished."""
        for job in self.jobs:
            yield job
|
||
|
||
class FakeInsightAsyncJob:
    """Test double for an insight async job: one job yields one record for its interval."""

    # Class-level registry shared by all instances: date_start -> overridden updated_time.
    updated_insights = {}

    @classmethod
    def update_insight(cls, date_start, updated_time):
        """Record that the insight starting at ``date_start`` was updated at ``updated_time``."""
        cls.updated_insights[date_start] = updated_time

    def __init__(self, interval, **kwargs):
        # Only the interval matters for the fake; the real job's other kwargs are ignored.
        self.interval = interval

    def get_result(self):
        """The job object doubles as its own single result."""
        return [self]

    def export_all_data(self):
        """Build the record dict; ``updated_time`` defaults to ``date_start`` unless overridden."""
        start = str(self.interval.start)
        updated = self.updated_insights.get(start, start)
        return {"date_start": start, "updated_time": updated}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice - this seems to be the main logical change