Skip to content

Commit

Permalink
Adds some required fields checking
Browse files Browse the repository at this point in the history
  • Loading branch information
vmesel committed Nov 9, 2023
1 parent be00317 commit f09b8ca
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 13 deletions.
13 changes: 7 additions & 6 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,15 @@ def url(self, endpoint=None):
def validate_input(self, record: dict):
return self.unified_schema(**record).dict()

@cached_property
def sf_fields(self):
sobject = self.request_api("GET", f"{self.endpoint}/describe/")
def sf_fields(self, object_type=None):
if not object_type:
sobject = self.request_api("GET", f"{self.endpoint}/describe/")
else:
sobject = self.request_api("GET", f"sobjects/{object_type}/describe/")
return [f for f in sobject.json()["fields"]]

@cached_property
def sf_fields_description(self):
fld = self.sf_fields
def sf_fields_description(self, object_type=None):
fld = self.sf_fields(object_type=object_type)
fields = {}
fields["createable"] = [
f["name"] for f in fld if f["createable"] and not f["custom"]
Expand Down
51 changes: 44 additions & 7 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,27 +728,68 @@ def validate_record(self, record, fields):
return new_record

def preprocess_record(self, record, context):
# Check if object exists in Salesforce
object_type = None
req = self.request_api("GET", "sobjects")
for object in req.json().get("sobjects", []):
is_name = object["name"] == self.stream_name
is_label = object["label"] == self.stream_name
is_label_plural = object["labelPlural"] == self.stream_name
if is_name or is_label or is_label_plural:
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")
object_type = object["name"]
break

if not object_type:
self.logger.info(f"Skipping record, because {self.stream_name} was not found on Salesforce.")
return

try:
fields = self.get_fields_for_object(self.stream_name)
fields = self.get_fields_for_object(object_type)
except MissingObjectInSalesforceError:
self.logger.info("Skipping record, because it was not found on Salesforce.")
return
record = self.validate_record(record, fields)
record["object_type"] = object_type
return record

def process_record(self, record, context):
object_type = record.pop("object_type", None)
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")

if record == {}:
self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.")
return

required_fields = []
if record.get("Id"):
fields = ["Id"]
else:
fields = self.sf_fields_description["external_ids"]
fields_desc = self.sf_fields_description(object_type=object_type)
required_fields = fields_desc["required"]
list_fields = [field_list for field_list in fields_desc.values()]
fields = []
for list_field in list_fields:
for item in list_field:
fields.append(item)

endpoint = f"sobjects/{object_type}"

# Checks for required fields
for field in required_fields:
if record.get(field) is None:
self.logger.info(f"Skipping record, because {field} is required.")
return

for field in record.keys():
if field not in fields:
self.logger.info(f"Field {field} doesn't exist on Salesforce.")

endpoint = f"sobjects/{self.stream_name}"

missing_fields = list(set(fields) - set(record.keys()))

if len(missing_fields) > 0.5 * len(fields):
self.logger.info(f"This record may require more fields to be mapped. Missing fields: {missing_fields}")

for field in fields:
if record.get(field):
Expand All @@ -766,10 +807,6 @@ def process_record(self, record, context):
self.logger.info(f"{field} with id {record[field]} does not exist. \nWill attepmt to create it.")
record = update_record

if not record.get("Name") and not record.get("WhatId"):
raise FatalAPIError("ERROR: Campaigns in Salesforce are required to have a 'Name' field")


try:
response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
Expand Down

0 comments on commit f09b8ca

Please sign in to comment.