Skip to content

Commit

Permalink
Source Marketo: process fail during creation of an export job (#13930)
Browse files Browse the repository at this point in the history
* #9322 source Marketo: process fail during creation of an export job

* #9322 source marketo: upd changelog

* #9322 source marketo: fix unit test

* #9322 source marketo: fix SATs

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
davydov-d and octavia-squidington-iii authored Jun 21, 2022
1 parent de0cf89 commit e8a5c7f
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4799,7 +4799,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:0.1.3"
- dockerImage: "airbyte/source-marketo:0.1.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
empty_streams: ["activities_visit_webpage"]
timeout_seconds: 3600
expect_records:
path: "integration_tests/expected_records.txt"
Expand Down

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-marketo/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest-mock~=3.6.1",
"requests-mock",
"source-acceptance-test",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
"workspace": {
"type": ["null", "string"]
},
"headStart": {
"type": ["null", "boolean"]
},
"folder": {
"type": ["object", "null"],
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
)
}

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
"""
Override default stream_slices CDK method to provide date_slices as page chunks for data fetch.
Returns list of dict, example: [{
Expand Down Expand Up @@ -172,7 +172,9 @@ def get_export_status(self, stream_slice):
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"bulk/v1/{self.stream_name}/export/{stream_slice['id']}/file.json"

def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
def stream_slices(
self, sync_mode, stream_state: MutableMapping[str, Any] = None, **kwargs
) -> Iterable[Optional[MutableMapping[str, any]]]:
date_slices = super().stream_slices(sync_mode, stream_state, **kwargs)

for date_slice in date_slices:
Expand All @@ -182,8 +184,12 @@ def stream_slices(self, sync_mode, stream_state: Mapping[str, Any] = None, **kwa

export = self.create_export(param)

date_slice["id"] = export["exportId"]
return date_slices
status, export_id = export.get("status", "").lower(), export.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job for data slice {date_slice}!")
continue
date_slice["id"] = export_id
yield date_slice

def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool:
while True:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
"client_id": {
"title": "Client ID",
"type": "string",
"title": "Client ID",
"description": "The Client ID of your Marketo developer application. See <a href=\"https://docs.airbyte.io/integrations/sources/marketo\"> the docs </a> for info on how to obtain this.",
"order": 0,
"airbyte_secret": true
},
"client_secret": {
"title": "Client Secret",
"type": "string",
"title": "Client Secret",
"description": "The Client Secret of your Marketo developer application. See <a href=\"https://docs.airbyte.io/integrations/sources/marketo\"> the docs </a> for info on how to obtain this.",
"order": 1,
"airbyte_secret": true
Expand All @@ -35,7 +33,6 @@
"title": "Start Date",
"type": "string",
"order": 2,
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2020-09-25T00:00:00Z"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pendulum
import pytest
from source_marketo.source import Activities, MarketoAuthenticator


@pytest.fixture(autouse=True)
def mock_requests(requests_mock):
requests_mock.register_uri(
"GET", "https://602-euo-598.mktorest.com/identity/oauth/token", json={"access_token": "token", "expires_in": 3600}
)
requests_mock.register_uri(
"POST",
"https://602-euo-598.mktorest.com/bulk/v1/activities/export/create.json",
[
{"json": {"result": [{"exportId": "2c09ce6d", "format": "CSV", "status": "Created", "createdAt": "2022-06-20T08:44:08Z"}]}},
{"json": {"result": [{"exportId": "cd465f55", "format": "CSV", "status": "Created", "createdAt": "2022-06-20T08:45:08Z"}]}},
{"json": {"result": [{"exportId": "null", "format": "CSV", "status": "Failed", "createdAt": "2022-06-20T08:46:08Z"}]}},
{"json": {"result": [{"exportId": "232aafb4", "format": "CSV", "status": "Created", "createdAt": "2022-06-20T08:47:08Z"}]}},
],
)


@pytest.fixture
def config():
start_date = pendulum.now().subtract(days=100).strftime("%Y-%m-%dT%H:%M:%SZ")
config = {
"client_id": "client-id",
"client_secret": "********",
"domain_url": "https://602-EUO-598.mktorest.com",
"start_date": start_date,
"window_in_days": 30,
}
config["authenticator"] = MarketoAuthenticator(config)
return config


@pytest.fixture
def send_email_stream(config):
activity = {
"id": 6,
"name": "send_email",
"description": "Send Marketo Email to a person",
"primaryAttribute": {"name": "Mailing ID", "dataType": "integer"},
"attributes": [
{"name": "Campaign Run ID", "dataType": "integer"},
{"name": "Choice Number", "dataType": "integer"},
{"name": "Has Predictive", "dataType": "boolean"},
{"name": "Step ID", "dataType": "integer"},
{"name": "Test Variant", "dataType": "integer"},
],
}
stream_name = f"activities_{activity['name']}"
cls = type(stream_name, (Activities,), {"activity": activity})
return cls(config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from unittest.mock import ANY

from airbyte_cdk.models.airbyte_protocol import SyncMode


def test_create_export_job(send_email_stream, caplog):
caplog.set_level(logging.WARNING)
slices = list(send_email_stream.stream_slices(sync_mode=SyncMode.incremental))
assert slices == [
{"endAt": ANY, "id": "2c09ce6d", "startAt": ANY},
{"endAt": ANY, "id": "cd465f55", "startAt": ANY},
{"endAt": ANY, "id": "232aafb4", "startAt": ANY},
]
assert "Failed to create export job for data slice " in caplog.records[-1].message
35 changes: 18 additions & 17 deletions docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ This connector can be used to sync the following tables from Marketo:

### Data type mapping

| Integration Type | Airbyte Type | Notes |
| :--- | :--- | :--- |
| `array` | `array` | primitive arrays are converted into arrays of the types described in this table |
| `int`, `long` | `number` | |
| `object` | `object` | |
| `string` | `string` | \`\` |
| Namespaces | No | |
| Integration Type | Airbyte Type | Notes |
|:-----------------|:-------------|:--------------------------------------------------------------------------------|
| `array` | `array` | primitive arrays are converted into arrays of the types described in this table |
| `int`, `long` | `number` | |
| `object` | `object` | |
| `string` | `string` | \`\` |
| Namespaces | No | |

### Features

Feature

| Supported?\(Yes/No\) | Notes |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental - Append Sync | Yes |
| Supported?\(Yes/No\) | Notes |
|:--------------------------|:------|
| Full Refresh Sync | Yes |
| Incremental - Append Sync | Yes |

### Performance considerations

Expand Down Expand Up @@ -89,10 +89,11 @@ We're almost there! Armed with your Endpoint & Identity URLs and your Client ID

## CHANGELOG

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------|
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |

0 comments on commit e8a5c7f

Please sign in to comment.