Skip to content

Commit

Permalink
Merge branch 'main' into HGI-5922
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 committed Jul 3, 2024
2 parents 7be1e5c + e4c5c38 commit 9dca4c7
Showing 1 changed file with 46 additions and 18 deletions.
64 changes: 46 additions & 18 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def preprocess_record(self, record: dict, context: dict):
"Salutation": salutation,
"Birthdate": birthdate,
"OwnerId": record.get("owner_id"),
"HasOptedOutOfEmail": record.get("unsubscribed"),
"HasOptedOutOfEmail": record.get("unsubscribed")
if record.get("unsubscribed") is not None
else record.get("subscribe_status") == "unsubscribed",
"NumberOfEmployees": record.get("number_of_employees"),
"Website": record.get("website"),
"Industry": industry,
Expand Down Expand Up @@ -707,6 +709,10 @@ def preprocess_record(self, record, context):
class FallbackSink(SalesforceV3Sink):
endpoint = "sobjects/"

@property
def lookup_fields_dict(self):
return self.config.get("lookup_fields") or {}

@property
def name(self):
return self.stream_name
Expand Down Expand Up @@ -756,29 +762,51 @@ def preprocess_record(self, record, context):
return {}
record["object_type"] = object_type

# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]
for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})

if req.json().get("searchRecords"):
record["Id"] = req.json()["searchRecords"][0]["Id"]
break

# If lookup_fields dict exist in config use it to check if the record exists in Salesforce
object_lookup_field = self.lookup_fields_dict.get(object_type)
# check if the lookup field exists for the object
object_lookup_field = object_lookup_field if object_lookup_field in fields else None
# check if the record has a value for the lookup field
lookup_value = record.get(object_lookup_field)

req = None
# lookup for record with field from config
if object_lookup_field and lookup_value:
query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"])
query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
else:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]

for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})
req = req.json().get("searchRecords")
if req:
break

# if record already exists add its Id for patching
if req:
existing_record = req[0]
# if flag only_upsert_empty_fields is true, only send fields with currently empty values
if self.config.get("only_upsert_empty_fields"):
record = {k:v for k,v in record.items() if not existing_record.get(k)}
record["Id"] = existing_record["Id"]
return record
except TargetSalesforceQuotaExceededException as e:
return {"error": str(e)}
else:
return {"error": "Unprocessed record due to requests exceeded API rate limits"}



def upsert_record(self, record, context):
# Not process records if target hit API rate limits
Expand Down

0 comments on commit 9dca4c7

Please sign in to comment.