Facebook Marketing performance improvement #8385

Closed
wants to merge 36 commits into from
36 commits
57b47cb
Facebook Marketing performance improvement
Dec 1, 2021
69b2958
add comments and little refactoring
Dec 3, 2021
440fc6e
fix after performance test
Dec 6, 2021
dff2421
Check job status in advance, some other changes.
Dec 10, 2021
4112279
Bump rc version
Dec 10, 2021
939d646
fix usage of AsyncJob methods, correct batch job retry
eugene-kulak Dec 22, 2021
0f8439d
bump dev version
eugene-kulak Dec 22, 2021
86ce7db
revert extra diff with master
eugene-kulak Dec 22, 2021
fa650b8
Merge remote-tracking branch 'origin/master' into drezchykov/fb-perf
eugene-kulak Dec 22, 2021
d8f57d6
fix bad merge
eugene-kulak Dec 22, 2021
5c7e18f
isort
eugene-kulak Dec 22, 2021
c9deade
fix flake
eugene-kulak Dec 22, 2021
07acea5
fix unit tests
eugene-kulak Dec 22, 2021
213ac36
fix integration tests with the new config
eugene-kulak Dec 23, 2021
93a03e1
improve job status handling, limit concurrency to 10
eugene-kulak Dec 28, 2021
47ad837
WIP
eugene-kulak Jan 5, 2022
483f5cd
fix campaign jobs, refactor manager
eugene-kulak Jan 13, 2022
4faabcb
big refactoring of async jobs, support random order of slices
eugene-kulak Jan 20, 2022
ea1ac91
update source _read_incremental to hook new state logic
eugene-kulak Jan 20, 2022
c7ca0e3
fix issues with timeout
eugene-kulak Jan 20, 2022
f66ce53
remove debugging and clean up, improve retry logic
eugene-kulak Jan 21, 2022
cc5baea
Merge remote-tracking branch 'origin/master' into drezchykov/fb-perf
eugene-kulak Jan 21, 2022
218db5c
merge changes from #8234
eugene-kulak Jan 21, 2022
2e56c22
fix call super _read_increment
eugene-kulak Jan 22, 2022
8d560d8
generalize batch execution, add use_batch flag
eugene-kulak Jan 22, 2022
3c0754d
fixing unittests
eugene-kulak Jan 23, 2022
76b529c
fix integration tests
eugene-kulak Jan 23, 2022
ca42a4e
fix tests for AsyncJob
eugene-kulak Jan 23, 2022
15e73f1
fix unit_tests and batch yield
eugene-kulak Jan 23, 2022
d09994f
fix SAT
eugene-kulak Jan 23, 2022
06a75ef
fix flake
eugene-kulak Jan 23, 2022
ae503c3
format
eugene-kulak Jan 23, 2022
8df71c9
refactor file layout and fix formatting
eugene-kulak Jan 23, 2022
01f8bd4
format
eugene-kulak Jan 23, 2022
3cbd8fc
format
eugene-kulak Jan 23, 2022
a969afa
improve coverage, do some refactoring of spec
eugene-kulak Jan 24, 2022
@@ -39,7 +39,7 @@ From the Airbyte repository root, run:
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/facebook-marketing)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_facebook_marketing/spec.json` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `sample_files/sample_config.json` for a sample config file.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source facebook-marketing test creds`
and place them into `secrets/config.json`.
@@ -49,7 +49,7 @@ and place them into `secrets/config.json`.
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image
@@ -73,7 +73,7 @@ Then run any of the connector commands as follows:
docker run --rm airbyte/source-facebook-marketing:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-facebook-marketing:20211222-rc1 read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
@@ -13,21 +13,10 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 600
empty_streams: ["videos"]
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
future_state_path: "integration_tests/future_state.json"
full_refresh:
- config_path: "secrets/config.json"
Review comment (Contributor): This means we test all streams from the catalog, and because we are using the new PK-based comparison, we no longer need to ignore some fields.

configured_catalog_path: "integration_tests/configured_catalog.json"
# The Ad Insights API returns estimated metrics, which are calculated from other metrics.
# Sometimes the API doesn't return an estimated metric. E.g., cost_per_estimated_ad_recallers is calculated
# as the total amount spent divided by the estimated ad recall lift rate. When the second metric is zero,
# the API may or may not return a value. This causes the sequential reads test to fail,
# because one read response contains the metric and the other doesn't.
# Therefore, fields like this need to be ignored in API responses.
ignored_fields:
"ads_insights_age_and_gender": ["cost_per_estimated_ad_recallers"]
"ad_creatives": ["thumbnail_url"]
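
The review comment above contrasts primary-key-based comparison with whole-record comparison. A minimal sketch of that idea follows, assuming records are plain dicts; the helper names `index_by_pk` and `missing_keys` are hypothetical and are not taken from the acceptance-test code.

```python
from typing import Any, Dict, Iterable, List, Mapping, Sequence, Tuple

Record = Mapping[str, Any]


def index_by_pk(records: Iterable[Record], pk: Sequence[str]) -> Dict[Tuple[Any, ...], Record]:
    """Map each record's primary-key tuple to the record itself."""
    return {tuple(record.get(field) for field in pk): record for record in records}


def missing_keys(first: Iterable[Record], second: Iterable[Record], pk: Sequence[str]) -> List[Tuple[Any, ...]]:
    """Return primary keys present in the first read but absent from the second."""
    first_index = index_by_pk(first, pk)
    second_index = index_by_pk(second, pk)
    return sorted(key for key in first_index if key not in second_index)


# Two hypothetical sequential reads: the estimated metric is absent the second time.
read_1 = [{"id": "1", "cost_per_estimated_ad_recallers": 0.5}, {"id": "2"}]
read_2 = [{"id": "1"}, {"id": "2"}]

# Whole-record equality would flag record "1"; key-based comparison does not.
assert missing_keys(read_1, read_2, pk=["id"]) == []
```

Under this kind of comparison, a field that appears in only one of the two reads does not fail the check, which is presumably why the `ignored_fields` entries become unnecessary.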
@@ -64,31 +64,29 @@
"stream": {
"name": "ads_insights",
"json_schema": {},
"default_cursor_field": ["date_start"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["date_start"],
"source_defined_primary_key": null,
"namespace": null
"source_defined_primary_key": []
},
"sync_mode": "incremental",
"primary_key": [],
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ads_insights_age_and_gender",
"name": "ads_insights_platform_and_device",
"json_schema": {},
"default_cursor_field": ["date_start"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["date_start"],
"source_defined_primary_key": null,
"namespace": null
"source_defined_primary_key": []
},
"sync_mode": "incremental",
"primary_key": [],
"cursor_field": ["date_start"],
"destination_sync_mode": "append",
"primary_key": null
"destination_sync_mode": "append"
}
]
}
@@ -7,8 +7,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": null,
@@ -22,13 +21,11 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
"destination_sync_mode": "append"
},
{
"stream": {
@@ -37,8 +34,7 @@
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_time"],
"source_defined_primary_key": [["id"]],
"namespace": null
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": null,
@@ -26,4 +26,7 @@ def config_with_wrong_account_fixture(config):

@pytest.fixture(scope="session", name="config_with_include_deleted")
def config_with_include_deleted_fixture(config):
return {**config, "include_deleted": True}
new_config = {**config, "include_deleted": True}
new_config.pop("_limit", None)
new_config.pop("end_date", None)
return new_config
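
The updated fixture builds a derived config instead of returning a one-line dict literal. A small standalone sketch of the same pattern is shown below; the function name `with_include_deleted` and the sample values are illustrative assumptions, not part of the PR.

```python
from typing import Any, Dict


def with_include_deleted(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config with include_deleted enabled and optional limits removed."""
    new_config = {**config, "include_deleted": True}  # shallow copy; the original dict is untouched
    new_config.pop("_limit", None)  # the None default avoids KeyError when the key is absent
    new_config.pop("end_date", None)
    return new_config


# Hypothetical base config for illustration only.
base = {"account_id": "111111111111111", "end_date": "2017-01-26T00:00:00Z"}
derived = with_include_deleted(base)
```

Because the copy is made before the keys are popped, the session-scoped `config` fixture that other tests share keeps its `end_date` and `_limit` values.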
@@ -5,20 +5,10 @@
"title": "Source Facebook Marketing",
"type": "object",
"properties": {
"account_id": {
"title": "Account Id",
"description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
"type": "string"
},
"access_token": {
"title": "Access Token",
"description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information",
"airbyte_secret": true,
"type": "string"
},
"start_date": {
"title": "Start Date",
"description": "The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
"order": 0,
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-25T00:00:00Z"],
"type": "string",
@@ -27,42 +17,44 @@
"end_date": {
"title": "End Date",
"description": "The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.",
"order": 1,
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-26T00:00:00Z"],
"type": "string",
"format": "date-time"
},
"fetch_thumbnail_images": {
"title": "Fetch Thumbnail Images",
"description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url",
"default": false,
"type": "boolean"
"account_id": {
"title": "Account ID",
"description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
"order": 2,
"examples": ["111111111111111"],
"type": "string"
},
"access_token": {
"title": "Access Token",
"description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information",
"order": 3,
"airbyte_secret": true,
"type": "string"
},
"include_deleted": {
"title": "Include Deleted",
"description": "Include data from deleted campaigns, ads, and adsets",
"description": "Include data from deleted Campaigns, Ads, and AdSets",
"default": false,
"order": 4,
"type": "boolean"
},
"insights_lookback_window": {
"title": "Insights Lookback Window",
"description": "The attribution window for the actions",
"default": 28,
"minimum": 0,
"maximum": 28,
"type": "integer"
},
"insights_days_per_job": {
"title": "Insights Days Per Job",
"description": "Number of days to sync in one job (the more data you have, the smaller this parameter should be)",
"default": 7,
"minimum": 1,
"maximum": 30,
"type": "integer"
"fetch_thumbnail_images": {
"title": "Fetch Thumbnail Images",
"description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url",
"default": false,
"order": 5,
"type": "boolean"
},
"custom_insights": {
"title": "Custom Insights",
"description": "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
"description": "A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
"order": 6,
"type": "array",
"items": {
"title": "InsightConfig",
@@ -105,48 +97,7 @@
}
}
},
"required": ["account_id", "access_token", "start_date"],
"definitions": {
"InsightConfig": {
"title": "InsightConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name value of insight",
"type": "string"
},
"fields": {
"title": "Fields",
"description": "A list of chosen fields for fields parameter",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"breakdowns": {
"title": "Breakdowns",
"description": "A list of chosen breakdowns for breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"action_breakdowns": {
"title": "Action Breakdowns",
"description": "A list of chosen action_breakdowns for action_breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["name"]
}
}
"required": ["start_date", "account_id", "access_token"]
},
"supportsIncremental": true,
"supported_destination_sync_modes": ["append"],
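
The `pattern` attached to `start_date` and `end_date` in the spec can be checked outside of a JSON Schema validator. The sketch below uses Python's `re` module as a stand-in; the helper name `matches_spec_pattern` is hypothetical, and real validation in the connector goes through the spec itself rather than this function.

```python
import re

# Same regular expression as the "pattern" field in the spec.
DATE_TIME_PATTERN = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$")


def matches_spec_pattern(value: str) -> bool:
    """Check only the shape of the string, not whether it is a real calendar date."""
    return DATE_TIME_PATTERN.match(value) is not None


assert matches_spec_pattern("2017-01-25T00:00:00Z")
assert not matches_spec_pattern("2017-01-25")
```

Note that the pattern constrains digits and separators only, so a string such as `2017-13-45T99:99:99Z` also matches; rejecting impossible dates is left to the `date-time` format check or to the date parsing done later.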
@@ -4,10 +4,10 @@


import copy
import logging
from typing import Any, List, MutableMapping, Set, Tuple

import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from source_facebook_marketing.source import SourceFacebookMarketing

@@ -52,7 +52,7 @@ def test_streams_with_include_deleted(self, stream_name, deleted_id, config_with
assert deleted_records, f"{stream_name} stream should have deleted records returned"
assert is_specific_deleted_pulled, f"{stream_name} stream should have a deleted record with id={deleted_id}"

@pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 1), ("ad_sets", 1)])
@pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("ad_sets", 1)])
def test_streams_with_include_deleted_and_state(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state):
"""Should ignore state because of include_deleted enabled"""
catalog = self.slice_catalog(configured_catalog, {stream_name})
@@ -92,7 +92,7 @@ def slice_catalog(catalog: ConfiguredAirbyteCatalog, streams: Set[str]) -> Confi
def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]:
records = []
states = []
for message in SourceFacebookMarketing().read(AirbyteLogger(), conf, catalog, state=state):
for message in SourceFacebookMarketing().read(logging.getLogger("airbyte"), conf, catalog, state=state):
if message.type == Type.RECORD:
records.append(message)
elif message.type == Type.STATE:
@@ -6,9 +6,9 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.35",
"cached_property~=1.5",
"facebook_business~=12.0",
"airbyte-cdk==0.1.47",
"cached_property==1.5.2",
"facebook_business==12.0.1",
"pendulum>=2,<3",
]

@@ -5,24 +5,41 @@
import json
import logging
from time import sleep
from typing import Tuple

import backoff
import pendulum
from cached_property import cached_property
from facebook_business import FacebookAdsApi
from facebook_business.adobjects import user as fb_user
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.common import FacebookAPIException
from source_facebook_marketing.streams.common import retry_pattern

logger = logging.getLogger("airbyte")


class FacebookAPIException(Exception):
"""General class for all API errors"""


backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)


class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""

call_rate_threshold = 90 # maximum percentage of call limit utilization
call_rate_threshold = 95 # maximum percentage of call limit utilization
pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit

# Insights async jobs throttle, from 1 to 100
_ads_insights_throttle: Tuple[float, float]

@property
def ads_insights_throttle(self) -> Tuple[float, float]:
return self._ads_insights_throttle

@staticmethod
def parse_call_rate_header(headers):
usage = 0
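
The `backoff_policy` defined in this hunk passes `backoff.expo` with `max_tries=5` and `factor=5`. As a rough illustration of what those numbers imply, the sketch below computes the un-jittered exponential schedule `factor * base**n` with the library's default base of 2. The function `expo_delays` is a hypothetical helper written for this example; `retry_pattern` is a project helper whose internals are not shown in this diff, and `backoff` applies full jitter by default, so actual waits may be shorter than these upper bounds.

```python
from typing import List


def expo_delays(max_tries: int, factor: float, base: float = 2) -> List[float]:
    """Upper bounds of the waits between attempts: factor * base**n for each retry."""
    # max_tries counts attempts, so there are at most max_tries - 1 waits.
    return [factor * base**n for n in range(max_tries - 1)]


delays = expo_delays(max_tries=5, factor=5)
assert delays == [5, 10, 20, 40]  # seconds, before any jitter is applied
```

With these settings a call that keeps failing would wait at most 75 seconds in total before the final exception is raised.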
@@ -86,6 +103,19 @@ def handle_call_rate_limit(self, response, params):
logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
sleep(pause_interval.total_seconds())

def _update_insights_throttle_limit(self, response: FacebookResponse):
"""
Every response to an /insights call contains the x-fb-ads-insights-throttle
header, which reports the current throttle state for async insights
jobs for the current app and ad account. We use this information to adjust
the number of running async jobs for optimal performance.
"""
ads_insights_throttle = response.headers().get("x-fb-ads-insights-throttle")
if ads_insights_throttle:
ads_insights_throttle = json.loads(ads_insights_throttle)
self._ads_insights_throttle = ads_insights_throttle.get("app_id_util_pct", 0), ads_insights_throttle.get("acc_id_util_pct", 0)

@backoff_policy
def call(
self,
method,
@@ -98,6 +128,7 @@ def call(
):
"""Makes an API call, delegate actual work to parent class and handles call rates"""
response = super().call(method, path, params, headers, files, url_override, api_version)
self._update_insights_throttle_limit(response)
self.handle_call_rate_limit(response, params)
return response
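
The header handling added in `_update_insights_throttle_limit` can be exercised in isolation. The following is a minimal sketch, assuming the header value is a JSON object with the two keys read in the diff; the function name `parse_insights_throttle` and the sample header value are made up for illustration.

```python
import json
from typing import Mapping, Optional, Tuple


def parse_insights_throttle(headers: Mapping[str, str]) -> Optional[Tuple[float, float]]:
    """Return (app utilization %, account utilization %), or None if the header is absent."""
    raw = headers.get("x-fb-ads-insights-throttle")
    if not raw:
        return None
    payload = json.loads(raw)
    # Missing keys fall back to 0, mirroring the defaults used in the diff.
    return payload.get("app_id_util_pct", 0), payload.get("acc_id_util_pct", 0)


# Sample header value invented for this example.
sample = {"x-fb-ads-insights-throttle": '{"app_id_util_pct": 12.5, "acc_id_util_pct": 40}'}
throttle = parse_insights_throttle(sample)
assert throttle == (12.5, 40)
```

Returning `None` makes the "no header seen yet" case explicit. As far as this hunk shows, `_ads_insights_throttle` is only assigned once such a header arrives, so callers of the `ads_insights_throttle` property may need to account for that.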
