Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upsert when Snowfakery is used within CumulusCI #644

Merged
merged 6 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cumulusci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,34 @@ flows:
num_records: 5
num_records_tablename: User

test_upserts:
steps:
1:
task: snowfakery
options:
recipe: tests/upsert.yml
set_recently_viewed: False

2:
task: query
options:
object: Contact
query: "select Id,Email,Name from Contact where LastName='Bluth'"
result_file: /tmp/contacts_before.csv

3:
task: snowfakery
options:
recipe: tests/upsert-2.yml
set_recently_viewed: False

4:
task: query
options:
object: Contact
query: "select Id,Email,Name from Contact where LastName='Bluth'"
result_file: /tmp/contacts_after.csv

orgs:
scratch:
person_and_communities:
Expand Down
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,8 @@ An update recipe should have a single top-level object with no `count` on it.
The recipe can take `options` if needed. It will generate the same number of
output rows as input rows.

To do updates in a Salesforce org, refer to the [CumulusCI documentation](https://cumulusci.readthedocs.io/en/stable/data.html#update-data).

## Use Snowfakery with Salesforce

Snowfakery recipes that generate Salesforce records are like any other Snowfakery recipes, but instead use `SObject` names for the `objects`. There are several examples [in the Snowfakery repository](https://github.com/SFDO-Tooling/Snowfakery/tree/main/examples/salesforce).
Expand Down
4 changes: 4 additions & 0 deletions docs/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,7 @@ Files can be used as Salesforce ContentVersions like this:
FirstPublishLocationId:
reference: Account
```

## Other features

To do updates or upserts in a Salesforce org, refer to the [CumulusCI documentation](https://cumulusci.readthedocs.io/en/stable/data.html#update-data).
26 changes: 9 additions & 17 deletions schema/snowfakery_recipe.jsonschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
"just_once": {
"type": "boolean"
},
"update_key": {
"type": "string"
},
"count": {
"anyOf": [
{
Expand Down Expand Up @@ -79,9 +82,7 @@
"type": "string"
}
},
"required": [
"object"
]
"required": ["object"]
},
"include_file": {
"title": "Include File",
Expand All @@ -93,9 +94,7 @@
"type": "string"
}
},
"required": [
"include_file"
]
"required": ["include_file"]
},
"plugin": {
"title": "Plugin Declaration",
Expand All @@ -107,9 +106,7 @@
"type": "string"
}
},
"required": [
"plugin"
]
"required": ["plugin"]
},
"var": {
"title": "Variable",
Expand Down Expand Up @@ -151,10 +148,7 @@
]
}
},
"required": [
"var",
"value"
]
"required": ["var", "value"]
},
"option": {
"title": "Variable",
Expand Down Expand Up @@ -196,9 +190,7 @@
]
}
},
"required": [
"option"
]
"required": ["option"]
},
"macro": {
"title": "Macro",
Expand Down Expand Up @@ -245,4 +237,4 @@
"required": ["snowfakery_version"]
}
}
}
}
8 changes: 8 additions & 0 deletions snowfakery/data_generator_runtime_object_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def __init__(
just_once: bool = False,
fields: Sequence = (),
friends: Sequence = (),
update_key: str = None,
):
self.tablename = tablename
self.nickname = nickname
Expand All @@ -146,6 +147,7 @@ def __init__(
self.fields = fields
self.friends = friends
self.for_each_expr = for_each_expr
self.update_key = update_key

if count_expr and for_each_expr:
raise DataGenSyntaxError(
Expand Down Expand Up @@ -245,6 +247,12 @@ def _generate_row(
"""Generate an individual row"""
id = context.generate_id(self.nickname)
row = {"id": id}

# add a column keeping track of what update_key was specified by
# the template. This allows multiple templates to have different
# update_keys.
if self.update_key:
row["_sf_update_key"] = self.update_key
sobj = ObjectRow(self.tablename, row, index)

context.register_object(sobj, self.nickname, self.just_once)
Expand Down
75 changes: 61 additions & 14 deletions snowfakery/generate_mapping_from_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@
from snowfakery.salesforce import find_record_type_column
from snowfakery.data_generator_runtime import Dependency
from snowfakery.utils.collections import OrderedSet
from snowfakery.parse_recipe_yaml import TableInfo

from .cci_mapping_files.declaration_parser import SObjectRuleDeclaration
from .cci_mapping_files.post_processes import add_after_statements


# One CumulusCI load step derived from the recipe:
#   action      -- "insert" or "upsert"
#   table_name  -- generated table the step loads rows from
#   update_key  -- field used as the upsert key (None for plain inserts)
#   fields      -- column names present in the table
LoadStep = T.NamedTuple(
    "LoadStep",
    [
        ("action", str),
        ("table_name", str),
        ("update_key", T.Optional[str]),
        ("fields", T.Sequence[str]),
    ],
)


def mapping_from_recipe_templates(
summary: ExecutionSummary,
declarations: T.Mapping[str, SObjectRuleDeclaration] = None,
Expand All @@ -27,12 +35,37 @@ def mapping_from_recipe_templates(
table_order = sort_dependencies(
inferred_dependencies, declared_dependencies, tables
)
mappings = mappings_from_sorted_tables(
tables, table_order, reference_fields, declarations
)
load_steps = load_steps_from_tableinfos(tables, table_order)
mappings = mappings_from_load_steps(load_steps, reference_fields, declarations)
return mappings


def load_steps_from_tableinfos(
    tables: T.Dict[str, TableInfo], table_order: T.List[str]
) -> T.List[LoadStep]:
    """Derive the ordered list of CumulusCI load steps from parsed tables.

    Each object template contributes one step: an "upsert" when the
    template declared an ``update_key``, otherwise an "insert".
    Duplicate steps are collapsed via OrderedSet, and the result is
    sorted to match ``table_order`` (the dependency-sorted sequence of
    table names).
    """
    load_steps = OrderedSet()
    for table_name, tableinfo in tables.items():
        for template in tableinfo._templates:
            action = "upsert" if template.update_key else "insert"
            load_steps.add(
                LoadStep(
                    action,
                    table_name,
                    template.update_key,
                    tuple(tableinfo.fields.keys()),
                )
            )

    # Precompute each table's rank: calling table_order.index() inside the
    # sort key is O(n) per comparison, making the sort quadratic for large
    # recipes. A dict lookup keeps it O(n log n).
    rank = {name: idx for idx, name in enumerate(table_order)}
    return sorted(load_steps, key=lambda step: rank[step.table_name])


def remove_person_contact_id(dependencies, tables):
if "Account" in dependencies:
dep_to_person_contact = [
Expand Down Expand Up @@ -123,21 +156,20 @@ def sort_dependencies(inferred_dependencies, declared_dependencies, tables):
return sorted_tables


def mappings_from_sorted_tables(
tables: dict,
table_order: list,
def mappings_from_load_steps(
load_steps: T.List[LoadStep],
reference_fields: dict,
declarations: T.Mapping[str, SObjectRuleDeclaration],
):
"""Generate mapping.yml data structures."""
mappings = {}
for table_name in table_order:
table = tables[table_name]
record_type_col = find_record_type_column(table_name, table.fields.keys())
for load_step in load_steps:
table_name = load_step.table_name
record_type_col = find_record_type_column(table_name, load_step.fields)

fields = {
fieldname: fieldname
for fieldname, fielddef in table.fields.items()
for fieldname in load_step.fields
if (table_name, fieldname) not in reference_fields.keys()
and fieldname != record_type_col
}
Expand All @@ -149,22 +181,37 @@ def mappings_from_sorted_tables(
"table": reference_fields[(table_name, fieldname)],
"key_field": fieldname,
}
for fieldname, fielddef in table.fields.items()
for fieldname in load_step.fields
if (table_name, fieldname) in reference_fields.keys()
}
if table_name == "PersonContact":
sf_object = "Contact"
else:
sf_object = table.name
mapping = {"sf_object": sf_object, "table": table.name, "fields": fields}
sf_object = table_name
mapping = {"sf_object": sf_object, "table": table_name, "fields": fields}
if lookups:
mapping["lookups"] = lookups

sobject_declarations = declarations.get(table_name)
if sobject_declarations:
mapping.update(sobject_declarations.as_mapping())

mappings[f"Insert {table_name}"] = mapping
if load_step.update_key:
mapping["action"] = "upsert"
mapping["update_key"] = load_step.update_key
mapping["filters"] = [f"_sf_update_key = '{load_step.update_key}'"]
step_name = f"Upsert {table_name} on {load_step.update_key}"
else:
step_name = f"Insert {table_name}"
any_other_loadstep_for_this_table_has_update_key = any(
ls
for ls in load_steps
if (ls.table_name == table_name and ls.update_key)
)
if any_other_loadstep_for_this_table_has_update_key:
mapping["filters"] = ["_sf_update_key = NULL"]

mappings[step_name] = mapping

add_after_statements(mappings)
return mappings
20 changes: 17 additions & 3 deletions snowfakery/output_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
from collections import namedtuple, defaultdict
from typing import Dict, Union, Optional, Mapping, Callable, Sequence
import typing as T
from warnings import warn

from sqlalchemy import (
Expand Down Expand Up @@ -342,14 +343,19 @@ def create_or_validate_tables(self, inferred_tables: Dict[str, TableInfo]) -> No

for tablename, model in self.metadata.tables.items():
if tablename in inferred_tables:
self.table_info[tablename] = TableTuple(
table_info = TableTuple(
insert_statement=model.insert(bind=self.engine, inline=True),
fallback_dict={
key: None for key in inferred_tables[tablename].fields.keys()
},
)
# id is special
self.table_info[tablename].fallback_dict.setdefault("id", None)
table_info.fallback_dict.setdefault("id", None)

# See create_tables_from_inferred_fields to see what _sf_update_key are for
if inferred_tables[tablename].has_update_keys:
table_info.fallback_dict.setdefault("_sf_update_key", None)
self.table_info[tablename] = table_info


# backwards-compatible name for CCI
Expand Down Expand Up @@ -404,7 +410,9 @@ def close(self, *args, **kwargs):
self.tempdir.cleanup()


def create_tables_from_inferred_fields(tables, engine, metadata):
def create_tables_from_inferred_fields(
tables: T.Dict[str, TableInfo], engine, metadata
):
"""Create tables based on dictionary of tables->field-list."""
with engine.connect() as conn:
inspector = inspect(engine)
Expand All @@ -423,6 +431,12 @@ def create_tables_from_inferred_fields(tables, engine, metadata):
"id", Integer(), primary_key=True, autoincrement=True
)

# add a column keeping track of what update_key was specified by
# the template. This allows multiple templates to have different
# update_keys.
if table.has_update_keys:
columns.append(Column("_sf_update_key", Unicode(255)))

t = Table(table_name, metadata, id_column, *columns)

if inspector.has_table(table_name):
Expand Down
5 changes: 5 additions & 0 deletions snowfakery/parse_recipe_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, name):
self.name = name
self.fields = {}
self.friends = {}
self.has_update_keys = False
self._templates = []

def register(self, template: ObjectTemplate) -> None:
Expand All @@ -79,6 +80,8 @@ def register(self, template: ObjectTemplate) -> None:
if hasattr(friend, "tablename")
}
)
if template.update_key:
self.has_update_keys = True
self._templates.append(template)

def __repr__(self) -> str:
Expand Down Expand Up @@ -362,6 +365,7 @@ def parse_object_template(yaml_sobj: Dict, context: ParseContext) -> ObjectTempl
"just_once": bool,
"for_each": dict,
"count": (str, int, dict),
"update_key": str,
},
context=context,
)
Expand All @@ -385,6 +389,7 @@ def parse_object_template(yaml_sobj: Dict, context: ParseContext) -> ObjectTempl
sobj_def["nickname"] = nickname = parsed_template.nickname
check_identifier(nickname, yaml_sobj, "Nicknames")
sobj_def["just_once"] = parsed_template.just_once or False
sobj_def["update_key"] = parsed_template.update_key or None
sobj_def["line_num"] = parsed_template.line_num.line_num
sobj_def["filename"] = parsed_template.line_num.filename

Expand Down
Loading