diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index 6ff11f5..579e715 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -78,7 +78,9 @@ def preprocess_record(self, record: dict, context: dict): "Salutation": salutation, "Birthdate": birthdate, "OwnerId": record.get("owner_id"), - "HasOptedOutOfEmail": record.get("unsubscribed"), + "HasOptedOutOfEmail": record.get("unsubscribed") + if record.get("unsubscribed") is not None + else record.get("subscribe_status") == "unsubscribed", "NumberOfEmployees": record.get("number_of_employees"), "Website": record.get("website"), "Industry": industry, @@ -707,6 +709,10 @@ def preprocess_record(self, record, context): class FallbackSink(SalesforceV3Sink): endpoint = "sobjects/" + @property + def lookup_fields_dict(self): + return self.config.get("lookup_fields") or {} + @property def name(self): return self.stream_name @@ -756,29 +762,51 @@ def preprocess_record(self, record, context): return {} record["object_type"] = object_type - # Try to find object instance using email - email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"] - email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)] - for email_to_check in email_values: - # Escape special characters on email - for char in ["+", "-"]: - if char in email_to_check: - email_to_check = email_to_check.replace(char, f"\{char}") - - query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"]) - req = self.request_api("GET", "search/", params={"q": query}) - - if req.json().get("searchRecords"): - record["Id"] = req.json()["searchRecords"][0]["Id"] - break - + # If lookup_fields dict exist in config use it to check if the record exists in Salesforce + object_lookup_field = self.lookup_fields_dict.get(object_type) + # check if the lookup field exists for the object + object_lookup_field = object_lookup_field if object_lookup_field in fields else None + # check if the record has a value for the lookup field + lookup_value = record.get(object_lookup_field) + + req = None + # lookup for record with field from config + if object_lookup_field and lookup_value: + query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"]) + query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'" + req = self.request_api("GET", "queryAll", params={"q": query}) + req = req.json().get("records") + # lookup for record with email fields + else: + # Try to find object instance using email + email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"] + email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)] + + for email_to_check in email_values: + # Escape special characters on email + for char in ["+", "-"]: + if char in email_to_check: + email_to_check = email_to_check.replace(char, f"\{char}") + + query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"]) + req = self.request_api("GET", "search/", params={"q": query}) + req = req.json().get("searchRecords") + if req: + break + + # if record already exists add its Id for patching + if req: + existing_record = req[0] + # if flag only_upsert_empty_fields is true, only send fields with currently empty values + if self.config.get("only_upsert_empty_fields"): + record = {k:v for k,v in record.items() if not existing_record.get(k)} + record["Id"] = existing_record["Id"] return record except TargetSalesforceQuotaExceededException as e: return {"error": str(e)} else: return {"error": "Unprocessed record due to requests exceeded API rate limits"} - def upsert_record(self, record, context): # Not process records if target hit API rate limits