From b7da373477350eb97b4de8ec16fd59106e753010 Mon Sep 17 00:00:00 2001 From: Keyna Rafael <95432445+keyn4@users.noreply.github.com> Date: Wed, 3 Jul 2024 11:45:13 -0500 Subject: [PATCH] add support to send empty fields for unified sinks (#20) --- target_salesforce_v3/client.py | 19 +++++++++-- target_salesforce_v3/sinks.py | 58 ++++++++++++++++++++++++++++++---- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/target_salesforce_v3/client.py b/target_salesforce_v3/client.py index c9a8ef7..3a8c314 100644 --- a/target_salesforce_v3/client.py +++ b/target_salesforce_v3/client.py @@ -152,6 +152,9 @@ def upsert_record(self, record: dict, context: dict) -> None: # Getting custom fields from record # self.process_custom_fields(record) + if not record: + return "", True, {"state": "no fields to post or update"} + fields = self.sf_fields_description() for field in fields["external_ids"]: @@ -301,10 +304,12 @@ def validate_output(self, mapping): # raise MissingRequiredFieldException(req_field) return payload - def query_sobject(self, query, fields): + def query_sobject(self, query, fields=None): params = {"q": query} response = self.request_api("GET", endpoint="query", params=params) response = response.json()["records"] + if not fields: + return response return [{k: v for k, v in r.items() if k in fields} for r in response] def process_custom_fields(self, record) -> None: @@ -419,4 +424,14 @@ def update_field_permissions(self,permission_set_id, sobject_type, field_name): } response = self.request_api("POST", endpoint="composite", request_data=payload, headers={"Content-Type": "application/json"}) - self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}") \ No newline at end of file + self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}") + + + def map_only_empty_fields(self, mapping, sobject_name, lookup_field): + fields = ",".join([field for field in mapping.keys()]) + data = self.query_sobject( + query = f"SELECT {fields} from {sobject_name} WHERE {lookup_field}", + ) + if data: + mapping = {k:v for k,v in mapping.items() if not data[0].get(k) or k == "Id"} + return mapping \ No newline at end of file diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index b6d1743..daf5a37 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -34,6 +34,7 @@ def reference_data(self): def preprocess_record(self, record: dict, context: dict): + # Parse data if isinstance(record.get("addresses"), str): record["addresses"] = json.loads(record["addresses"]) @@ -67,6 +68,7 @@ def preprocess_record(self, record: dict, context: dict): # fields = self.sf_fields_description + # map data mapping = { "FirstName": record.get("first_name"), "LastName": record.get("last_name"), @@ -98,12 +100,17 @@ def preprocess_record(self, record: dict, context: dict): elif self.contact_type == "Lead": mapping.update({"Company": record.get("company_name")}) + + # check if record already exists + lookup_field = None if record.get('id'): # If contact has an Id will use it to updatev mapping.update({"Id": record['id']}) + lookup_field = f"Id = '{record['id']}'" elif record.get("external_id"): external_id = record["external_id"] mapping[external_id["name"]] = external_id["value"] + lookup_field = f"{external_id['name']} = '{external_id['value']}'" else: # If no Id we'll use email to search for an existing record if record.get('email'): @@ -113,7 +120,9 @@ def preprocess_record(self, record: dict, context: dict): fields = ['Name', 'Id'] ) if data: - mapping.update({"Id":data[0].get("Id")}) + id = data[0].get("Id") + mapping.update({"Id":id}) + lookup_field = f"Id = '{id}'" if record.get('campaigns'): self.campaigns = record['campaigns'] @@ -178,7 +187,14 @@ def preprocess_record(self, record: dict, context: dict): ) mapping["AccountId"] = next(account_id, None) - return self.validate_output(mapping) + # validate mapping + mapping = self.validate_output(mapping) + + # If flag only_upsert_empty_fields is true, only upsert empty fields + if self.config.get("only_upsert_empty_fields") and lookup_field: + mapping = self.map_only_empty_fields(mapping, self.contact_type, lookup_field) + + return mapping def upsert_record(self, record, context): """Process the record.""" @@ -367,11 +383,19 @@ def preprocess_record(self, record, context): cf['name'] += '__c' mapping.update({cf['name']:cf['value']}) + lookup_field = None if record.get("external_id"): external_id = record["external_id"] mapping[external_id["name"]] = external_id["value"] + lookup_field = f'{external_id["name"]} = {external_id["value"]}' - return self.validate_output(mapping) + mapping = self.validate_output(mapping) + + # If flag only_upsert_empty_fields is true, only upsert empty fields + if self.config.get("only_upsert_empty_fields") and lookup_field: + mapping = self.map_only_empty_fields(mapping, "Opportunity", lookup_field) + + return mapping class CompanySink(SalesforceV3Sink): @@ -564,7 +588,15 @@ def preprocess_record(self, record, context): cf['name'] += '__c' mapping.update({cf['name']:cf['value']}) - return self.validate_output(mapping) + mapping = self.validate_output(mapping) + + # If flag only_upsert_empty_fields is true, only upsert empty fields + if self.config.get("only_upsert_empty_fields") and mapping.get("Id"): + lookup_field = f"Id = {mapping['Id']}" + mapping = self.map_only_empty_fields(mapping, "Campaign", lookup_field) + + return mapping + def upsert_record(self, record, context): """Process the record.""" @@ -651,7 +683,14 @@ def preprocess_record(self, record, context) -> dict: cf['name'] += '__c' mapping.update({cf['name']:cf['value']}) - return self.validate_output(mapping) + mapping = self.validate_output(mapping) + + # If flag only_upsert_empty_fields is true, only upsert empty fields + if self.config.get("only_upsert_empty_fields") and mapping.get("Id"): + lookup_field = f"Id = {mapping['Id']}" + mapping = self.map_only_empty_fields(mapping, "CampaignMember", lookup_field) + + return mapping def get_campaign_member_id(self, contact_id, campaign_id, contact_lookup = 'ContactId'): @@ -702,7 +741,14 @@ def preprocess_record(self, record, context): cf['name'] += '__c' mapping.update({cf['name']:cf['value']}) - return self.validate_output(mapping) + mapping = self.validate_output(mapping) + + # If flag only_upsert_empty_fields is true, only upsert empty fields + if self.config.get("only_upsert_empty_fields") and mapping.get("Id"): + lookup_field = f"Id = {mapping['Id']}" + mapping = self.map_only_empty_fields(mapping, "Task", lookup_field) + + return mapping class FallbackSink(SalesforceV3Sink):