Skip to content

Commit

Permalink
add support to send empty fields for unified sinks (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 authored Jul 3, 2024
1 parent e4c5c38 commit b7da373
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
19 changes: 17 additions & 2 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def upsert_record(self, record: dict, context: dict) -> None:
# Getting custom fields from record
# self.process_custom_fields(record)

if not record:
return "", True, {"state": "no fields to post or update"}

fields = self.sf_fields_description()

for field in fields["external_ids"]:
Expand Down Expand Up @@ -301,10 +304,12 @@ def validate_output(self, mapping):
# raise MissingRequiredFieldException(req_field)
return payload

def query_sobject(self, query, fields):
def query_sobject(self, query, fields=None):
params = {"q": query}
response = self.request_api("GET", endpoint="query", params=params)
response = response.json()["records"]
if not fields:
return response
return [{k: v for k, v in r.items() if k in fields} for r in response]

def process_custom_fields(self, record) -> None:
Expand Down Expand Up @@ -419,4 +424,14 @@ def update_field_permissions(self,permission_set_id, sobject_type, field_name):
}

response = self.request_api("POST", endpoint="composite", request_data=payload, headers={"Content-Type": "application/json"})
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")


def map_only_empty_fields(self, mapping, sobject_name, lookup_field):
fields = ",".join([field for field in mapping.keys()])
data = self.query_sobject(
query = f"SELECT {fields} from {sobject_name} WHERE {lookup_field}",
)
if data:
mapping = {k:v for k,v in mapping.items() if not data[0].get(k) or k == "Id"}
return mapping
58 changes: 52 additions & 6 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def reference_data(self):

def preprocess_record(self, record: dict, context: dict):

# Parse data
if isinstance(record.get("addresses"), str):
record["addresses"] = json.loads(record["addresses"])

Expand Down Expand Up @@ -67,6 +68,7 @@ def preprocess_record(self, record: dict, context: dict):

# fields = self.sf_fields_description

# map data
mapping = {
"FirstName": record.get("first_name"),
"LastName": record.get("last_name"),
Expand Down Expand Up @@ -98,12 +100,17 @@ def preprocess_record(self, record: dict, context: dict):
elif self.contact_type == "Lead":
mapping.update({"Company": record.get("company_name")})


# check if record already exists
lookup_field = None
if record.get('id'):
# If contact has an Id will use it to updatev
mapping.update({"Id": record['id']})
lookup_field = f"Id = '{record['id']}'"
elif record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f"{external_id['name']} = '{external_id['value']}'"
else:
# If no Id we'll use email to search for an existing record
if record.get('email'):
Expand All @@ -113,7 +120,9 @@ def preprocess_record(self, record: dict, context: dict):
fields = ['Name', 'Id']
)
if data:
mapping.update({"Id":data[0].get("Id")})
id = data[0].get("Id")
mapping.update({"Id":id})
lookup_field = f"Id = '{id}'"

if record.get('campaigns'):
self.campaigns = record['campaigns']
Expand Down Expand Up @@ -178,7 +187,14 @@ def preprocess_record(self, record: dict, context: dict):
)
mapping["AccountId"] = next(account_id, None)

return self.validate_output(mapping)
# validate mapping
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, self.contact_type, lookup_field)

return mapping

def upsert_record(self, record, context):
"""Process the record."""
Expand Down Expand Up @@ -367,11 +383,19 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

lookup_field = None
if record.get("external_id"):
external_id = record["external_id"]
mapping[external_id["name"]] = external_id["value"]
lookup_field = f'{external_id["name"]} = {external_id["value"]}'

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and lookup_field:
mapping = self.map_only_empty_fields(mapping, "Opportunity", lookup_field)

return mapping


class CompanySink(SalesforceV3Sink):
Expand Down Expand Up @@ -564,7 +588,15 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "Campaign", lookup_field)

return mapping


def upsert_record(self, record, context):
"""Process the record."""
Expand Down Expand Up @@ -651,7 +683,14 @@ def preprocess_record(self, record, context) -> dict:
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "CampaignMember", lookup_field)

return mapping

def get_campaign_member_id(self, contact_id, campaign_id, contact_lookup = 'ContactId'):

Expand Down Expand Up @@ -702,7 +741,14 @@ def preprocess_record(self, record, context):
cf['name'] += '__c'
mapping.update({cf['name']:cf['value']})

return self.validate_output(mapping)
mapping = self.validate_output(mapping)

# If flag only_upsert_empty_fields is true, only upsert empty fields
if self.config.get("only_upsert_empty_fields") and mapping.get("Id"):
lookup_field = f"Id = {mapping['Id']}"
mapping = self.map_only_empty_fields(mapping, "Task", lookup_field)

return mapping


class FallbackSink(SalesforceV3Sink):
Expand Down

0 comments on commit b7da373

Please sign in to comment.