Skip to content

Commit

Permalink
Fixes target
Browse files Browse the repository at this point in the history
  • Loading branch information
vmesel committed Dec 8, 2023
1 parent 0194733 commit 759fd6a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
18 changes: 13 additions & 5 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class NoCreatableFieldsException(Exception):

class SalesforceV3Sink(HotglueSink, RecordSink):
"""SalesforceV3 target sink class."""
api_version = "v55.0"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_version = self.config.get("api_version", "55.0").replace("v", "")

@property
def http_headers(self) -> dict:
Expand Down Expand Up @@ -158,7 +161,7 @@ def upsert_record(self, record: dict, context: dict) -> None:
return id, True, state_updates
except:
pass

@property
def authenticator(self):
url = self.url()
Expand Down Expand Up @@ -193,7 +196,7 @@ def url(self, endpoint=None):
instance_url = self.config.get("instance_url")
if not instance_url:
raise Exception("instance_url not defined in config")
return f"{instance_url}/services/data/{self.api_version}/{endpoint}"
return f"{instance_url}/services/data/v{self.api_version}/{endpoint}"

def validate_input(self, record: dict):
return self.unified_schema(**record).dict()
Expand Down Expand Up @@ -323,7 +326,11 @@ def add_custom_field(self,cf,label=None):
# `Activity` sObject, so we change `Task` -> `Activity`
sobject = 'Activity'

url = self.url("services/Soap/m/55.0").replace('services/data/v55.0/','')
url = self.url(
f"services/Soap/m/{self.api_version}"
).replace(
f'services/data/v{self.api_version}/',''
)

# If the new custom field is an external id it needs to contain 'externalid'
external_id = 'true' if 'externalid' in cf.lower() else 'false'
Expand Down Expand Up @@ -380,10 +387,11 @@ def update_field_permissions(self,permission_set_id, sobject_type, field_name):
"PermissionsEdit": "true",
"PermissionsRead": "true"
},
"url": "/services/data/v55.0/sobjects/FieldPermissions/",
"url": f"/services/data/v{self.api_version}/sobjects/FieldPermissions/",
"method": "POST"
}
]
}

response = self.request_api("POST", endpoint="composite", request_data=payload, headers={"Content-Type": "application/json"})
self.logger.info(f"Field permission for {field_name} updated for permission set {permission_set_id}, response: {response.text}")
16 changes: 10 additions & 6 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ContactsSink(SalesforceV3Sink):
name = Contact.Stream.name
campaigns = None
contact_type = "Contact"
available_names = ["contacts", "customers"]
available_names = ["contacts", "customers", "contact", "customer"]

@cached_property
def reference_data(self):
Expand Down Expand Up @@ -232,7 +232,7 @@ def validate_response(self, response):
self.logger.info("INFO: This Contact/Lead is already a Campaign Member.")
elif '[{"errorCode":"NOT_FOUND","message":"The requested resource does not exist"}]' in response.text:
self.logger.info("INFO: This Contact/Lead was not found using Email will attempt to create it.")
if '[{"message":"No such column \'HasOptedOutOfEmail\' on sobject of type' in response.text:
elif '[{"message":"No such column \'HasOptedOutOfEmail\' on sobject of type' in response.text:
self.update_field_permissions(profile = 'System Administrator', sobject_type = self.contact_type, field_name=f"{self.contact_type}.HasOptedOutOfEmail")
raise RetriableAPIError(f"DEBUG: HasOptedOutOfEmail column was not found, updating 'Field-Leve Security'\n'System Administrator'[x]")
else:
Expand Down Expand Up @@ -722,7 +722,8 @@ def preprocess_record(self, record, context):
# Check if object exists in Salesforce
object_type = None
req = self.request_api("GET", "sobjects")
for object in req.json().get("sobjects", []):
objects_list = req.json().get("sobjects", [])
for object in objects_list:
is_name = object["name"] == self.stream_name
is_label = object["label"] == self.stream_name
is_label_plural = object["labelPlural"] == self.stream_name
Expand All @@ -732,14 +733,14 @@ def preprocess_record(self, record, context):
break

if not object_type:
self.logger.info(f"Skipping record, because {self.stream_name} was not found on Salesforce.")
return
self.logger.info(f"Record doesn't exist on Salesforce {self.stream_name} was not found on Salesforce.")
return {}

try:
fields = self.get_fields_for_object(object_type)
except MissingObjectInSalesforceError:
self.logger.info("Skipping record, because it was not found on Salesforce.")
return
return {}
record["object_type"] = object_type

# Try to find object instance
Expand All @@ -756,6 +757,9 @@ def preprocess_record(self, record, context):
return record

def upsert_record(self, record, context):
if record == {} or record is None:
return None, False, {}

state_updates = dict()

object_type = record.pop("object_type", None)
Expand Down
2 changes: 1 addition & 1 deletion target_salesforce_v3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_sink_class(self, stream_name: str):
return sink_class

# Search for streams with multiple names
if stream_name.lower() in sink_class.available_names:
if stream_name.lower() in sink_class.available_names: #[name.lower() for name in sink_class.available_names]:
return sink_class

# Adds a fallback sink for streams that are not supported
Expand Down

0 comments on commit 759fd6a

Please sign in to comment.