From 73b6551d4ffbaed4a411fedeabd544f1a0684be4 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Mon, 9 Dec 2024 17:41:47 -0300 Subject: [PATCH] Improve error message when upsert records (#27) --- target_salesforce_v3/client.py | 17 +++++++++-- target_salesforce_v3/sinks.py | 53 +++++++++++++++++----------------- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/target_salesforce_v3/client.py b/target_salesforce_v3/client.py index d10e40b..bb7f01a 100644 --- a/target_salesforce_v3/client.py +++ b/target_salesforce_v3/client.py @@ -10,6 +10,7 @@ from backports.cached_property import cached_property from datetime import datetime +from requests import PreparedRequest from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.sinks import RecordSink @@ -114,7 +115,7 @@ def check_salesforce_limits(self, response): ) def _request( self, http_method, endpoint, params=None, request_data=None, headers=None - ) -> requests.PreparedRequest: + ) -> requests.Response: """Prepare a request object.""" url = self.url(endpoint) headers = self.http_headers @@ -140,7 +141,7 @@ def _request( self.validate_response(response) return response - def request_api(self, http_method, endpoint=None, params=None, request_data=None, headers=None): + def request_api(self, http_method, endpoint=None, params=None, request_data=None, headers=None) -> requests.Response: """Request records from REST endpoint(s), returning response records.""" resp = self._request(http_method, endpoint, params, request_data, headers) self.check_salesforce_limits(resp) @@ -439,4 +440,14 @@ def map_only_empty_fields(self, mapping, sobject_name, lookup_field): ) if data: mapping = {k:v for k,v in mapping.items() if not data[0].get(k) or k == "Id"} - return mapping \ No newline at end of file + return mapping + + + def log_error_message(self, sink_name: str, response: requests.Response, e: Exception): + try: + response_content = response.json() + except ValueError: + response_content = response.text if hasattr(response, 'text') else str(response) + + error_message = f"[{response.status_code}] Sink: {sink_name} | Error: {e}, URL: {response.url}, Body: {response.request.body}, Response: {response_content}" + self.logger.exception(error_message) diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index 3432cf0..4b8cc44 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -254,7 +254,7 @@ def upsert_record(self, record, context): self.assign_to_topic(id,self.topics) return id, True, state_updates except Exception as e: - self.logger.exception("Error while attempting to create Contact") + self.log_error_message(self.name, response, e) raise e def validate_response(self, response): @@ -320,8 +320,7 @@ def assign_to_topic(self,contact_id,topics:list) -> None: # Means it's already in the topic if "DUPLICATE_VALUE" in str(e): return - - self.logger.exception("Error encountered while creating TopicAssignment") + self.log_error_message(self.name, response, e) raise e def assign_to_campaign(self,contact_id,campaigns:list) -> None: @@ -376,8 +375,7 @@ def assign_to_campaign(self,contact_id,campaigns:list) -> None: id = data.get("id") self.logger.info(f"CampaignMember created with id: {id}") except Exception as e: - self.logger.exception("Error encountered while creating CampaignMember") - error = f"error: {e}, response: {response.json()}" + self.log_error_message(self.name, response, e) raise e @@ -724,7 +722,7 @@ def upsert_record(self, record, context): self.logger.info(f"{self.name} created with id: {id}") return id, True, state_updates except Exception as e: - self.logger.exception("Error encountered while creating campaign") + self.log_error_message(self.name, response, e) raise e @@ -1013,7 +1011,7 @@ def upsert_record(self, record, context): self.logger.info(f"{object_type} updated with id: {id}") return id, True, state_updates except Exception as e: - self.logger.exception(f"Error encountered while updating {object_type}") + self.log_error_message(self.name, response, e) if len(possible_update_fields) > 0: for id_field in possible_update_fields: @@ -1037,28 +1035,31 @@ def upsert_record(self, record, context): self.link_attachment_to_object(id, linked_object_id) return id, True, state_updates except Exception as e: - if "INVALID_FIELD_FOR_INSERT_UPDATE" in str(e): - try: - fields = json.loads(str(e))[0]['fields'] - except: - raise Exception(f"Attempted to write read-only fields. Unable to extract read-only fields to retry request: {str(e)}") - - self.logger.warning(f"Attempted to write read-only fields: {fields}. Removing them and retrying.") - # append read-only field to a list - if not self._target.read_only_fields.get(self.stream_name): - self._target.read_only_fields[self.stream_name] = [] - self._target.read_only_fields[self.stream_name].extend(fields) - # remove read-only fields from record - for f in fields: - record.pop(f, None) - # retry + if "INVALID_FIELD_FOR_INSERT_UPDATE" not in str(e): + self.log_error_message(self.name, response, e) + raise e + try: + fields = json.loads(str(e))[0]['fields'] + except: + raise Exception(f"Attempted to write read-only fields. Unable to extract read-only fields to retry request: {str(e)}") + + self.logger.warning(f"Attempted to write read-only fields: {fields}. Removing them and retrying.") + # append read-only field to a list + if not self._target.read_only_fields.get(self.stream_name): + self._target.read_only_fields[self.stream_name] = [] + self._target.read_only_fields[self.stream_name].extend(fields) + # remove read-only fields from record + for f in fields: + record.pop(f, None) + # retry + try: response = self.request_api("POST", endpoint=endpoint, request_data=record) id = response.json().get("id") self.logger.info(f"{object_type} created with id: {id}") return id, True, state_updates - - self.logger.exception(f"Error encountered while creating {object_type}") - raise e + except Exception as e: + self.log_error_message(self.name, response, e) + raise e def link_attachment_to_object(self, file_id, linked_object_id): @@ -1088,4 +1089,4 @@ def link_attachment_to_object(self, file_id, linked_object_id): self.logger.info(f"File with id {file_id} succesfully linked to object with id {linked_object_id}. Link id {response.json()['id']}") except Exception as e: self.logger.info(f"Failed while trying to link file {file_id} and object {linked_object_id}") - raise e \ No newline at end of file + raise e