Skip to content

Commit

Permalink
Source marketo: retry job creation instead of skipping (#15683)
Browse files Browse the repository at this point in the history
* #14149 source marketo: retry job creation instead of skipping

* #14149 source marketo: upd changelog

* #14149 source marketo: increase timeouts for SATs

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Aug 17, 2022
1 parent e40ee3f commit b811d8c
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5182,7 +5182,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:0.1.4"
- dockerImage: "airbyte/source-marketo:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["activities_visit_webpage"]
timeout_seconds: 3600
timeout_seconds: 4800
expect_records:
path: "integration_tests/expected_records.txt"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 3600
timeout_seconds: 4800
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
timeout_seconds: 4800
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"company": {
"type": ["string", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,8 @@ def stream_slices(

export = self.create_export(param)

status, export_id = export.get("status", "").lower(), export.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job for data slice {date_slice}!")
continue
date_slice["id"] = export_id
yield date_slice
date_slice["id"] = export["exportId"]
return date_slices

def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool:
while True:
Expand Down Expand Up @@ -270,6 +266,16 @@ class MarketoExportCreate(MarketoStream):
def path(self, **kwargs) -> str:
return f"bulk/v1/{self.stream_name}/export/create.json"

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 429 or 500 <= response.status_code < 600:
return True
record = next(self.parse_response(response, {}))
status, export_id = record.get("status", "").lower(), record.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job! Status is {status}!")
return True
return False

def request_body_json(self, **kwargs) -> Optional[Mapping]:
params = {"format": "CSV"}
if self.param:
Expand Down Expand Up @@ -382,7 +388,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": False,
"additionalProperties": True,
"properties": properties,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def mock_requests(requests_mock):

@pytest.fixture
def config():
start_date = pendulum.now().subtract(days=100).strftime("%Y-%m-%dT%H:%M:%SZ")
start_date = pendulum.now().subtract(days=75).strftime("%Y-%m-%dT%H:%M:%SZ")
config = {
"client_id": "client-id",
"client_secret": "********",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def test_create_export_job(send_email_stream, caplog):
{"endAt": ANY, "id": "cd465f55", "startAt": ANY},
{"endAt": ANY, "id": "232aafb4", "startAt": ANY},
]
assert "Failed to create export job for data slice " in caplog.records[-1].message
assert "Failed to create export job! Status is failed!" in caplog.records[-1].message
15 changes: 8 additions & 7 deletions docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ We're almost there! Armed with your Endpoint & Identity URLs and your Client ID

## CHANGELOG

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------|
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------|
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |

0 comments on commit b811d8c

Please sign in to comment.