From 663ab6a557a1a3053a5861cb1c6be98b6cbf6f61 Mon Sep 17 00:00:00 2001 From: marcosmarxm Date: Thu, 9 Jun 2022 17:11:25 -0300 Subject: [PATCH 1/4] change logic for counting records --- .../connectors/source-salesforce/Dockerfile | 2 +- .../source-salesforce/source_salesforce/streams.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index aadd91b9170ff..e7bbb0550892a 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.9 +LABEL io.airbyte.version=1.0.10 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index a506a26327937..22ec66f84191f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -312,8 +312,8 @@ def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[ chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix") for chunk in chunks: chunk = chunk.replace({nan: None}).to_dict(orient="records") - for n, row in enumerate(chunk, 1): - yield n, row + for row in chunk: + yield row except pd.errors.EmptyDataError as e: self.logger.info(f"Empty data received. {e}") yield from [] @@ -382,12 +382,15 @@ def read_records( count = 0 record: Mapping[str, Any] = {} - for count, record in self.read_with_chunks(self.download_data(url=job_full_url)): + for record in self.read_with_chunks(self.download_data(url=job_full_url)): + count += 1 yield record self.delete_job(url=job_full_url) if count < self.page_size: - # this is a last page + # Salesforce doesn't give a next token or something to know the request was + # the last page. The connectors will sync batches in `page_size` and + # considers that batch is smaller than the `page_size` it must be the last page. break next_page_token = self.next_page_token(record) From c9ef296251a6d55e41d6f761fee6dc4db7c4ce4e Mon Sep 17 00:00:00 2001 From: marcosmarxm Date: Thu, 9 Jun 2022 17:13:21 -0300 Subject: [PATCH 2/4] update doc --- docs/integrations/sources/salesforce.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 7242ea0a8949a..29e5feae1bb03 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -117,6 +117,7 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------| +| 1.0.10 | 2022-06-09 | [13658](https://github.com/airbytehq/airbyte/pull/13658) | Correct logic to sync stream larger than page size | | 1.0.9 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions | | 1.0.8 | 2022-05-04 | [12576](https://github.com/airbytehq/airbyte/pull/12576) | Decode responses as utf-8 and fallback to ISO-8859-1 if needed | | 1.0.7 | 2022-05-03 | [12552](https://github.com/airbytehq/airbyte/pull/12552) | Decode responses as ISO-8859-1 instead of utf-8 | From 7339e982eeef2a2c033dbe6f9756600eaac689eb Mon Sep 17 00:00:00 2001 From: marcosmarxm Date: Thu, 9 Jun 2022 17:49:56 -0300 Subject: [PATCH 3/4] correct unit test --- .../connectors/source-salesforce/unit_tests/api_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 0888317eee549..961de6aae7542 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -215,7 +215,7 @@ def test_download_data_filter_null_bytes(stream_config, stream_api): m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') res = list(stream.read_with_chunks(stream.download_data(url=job_full_url))) - assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": False})] + assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": False}] def test_check_connection_rate_limit(stream_config): @@ -427,7 +427,7 @@ def test_csv_reader_dialect_unix(): with requests_mock.Mocker() as m: m.register_uri("GET", url + "/results", text=text) - result = [dict(i[1]) for i in stream.read_with_chunks(stream.download_data(url))] + result = [i for i in stream.read_with_chunks(stream.download_data(url))] assert result == data From bb6f547f0464cf2bb8451bb702d182ca289e597f Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Tue, 14 Jun 2022 11:20:02 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d0f680ec71cea..98bbbd245e4a1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -802,7 +802,7 @@ - name: Salesforce sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962 dockerRepository: airbyte/source-salesforce - dockerImageTag: 1.0.9 + dockerImageTag: 1.0.10 documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce icon: salesforce.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 856f5a6e76593..19d4bfd23cedf 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7646,7 +7646,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-salesforce:1.0.9" +- dockerImage: "airbyte/source-salesforce:1.0.10" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce" connectionSpecification: