Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to only upsert empty fields #15

Merged
merged 2 commits into from
May 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 43 additions & 16 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,10 @@ def preprocess_record(self, record, context):
class FallbackSink(SalesforceV3Sink):
endpoint = "sobjects/"

@property
def lookup_fields_dict(self):
return self.config.get("lookup_fields") or {}

@property
def name(self):
return self.stream_name
Expand Down Expand Up @@ -745,22 +749,45 @@ def preprocess_record(self, record, context):
return {}
record["object_type"] = object_type

# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]
for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})

if req.json().get("searchRecords"):
record["Id"] = req.json()["searchRecords"][0]["Id"]
break

# If lookup_fields dict exist in config use it to check if the record exists in Salesforce
object_lookup_field = self.lookup_fields_dict.get(object_type)
# check if the lookup field exists for the object
object_lookup_field = object_lookup_field if object_lookup_field in fields else None
# check if the record has a value for the lookup field
lookup_value = record.get(object_lookup_field)

req = None
# lookup for record with field from config
if object_lookup_field and lookup_value:
query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"])
query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
else:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]

for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})
req = req.json().get("searchRecords")
if req:
break

# if record already exists add its Id for patching
if req:
existing_record = req[0]
# if flag only_upsert_empty_fields is true, only send fields with currently empty values
if self.config.get("only_upsert_empty_fields"):
record = {k:v for k,v in record.items() if not existing_record.get(k)}
record["Id"] = existing_record["Id"]
hsyyid marked this conversation as resolved.
Show resolved Hide resolved
return record

def upsert_record(self, record, context):
Expand Down
Loading