PROD-2734 - initial partitioning support (user-defined windows) #5325
Conversation
some self review -
```yaml
partitioning:
  field: billing_address_id
  windows:
    - start: "0"
```
might be nice to allow proper ints here, but we do also need to support timestamps, and easiest for now is just to accept generic strings...
Not sure if this is where the issue is or downstream from this but I ran into the same thing for the SaaS config. You can do this:
```python
class UserDefinedPartitionWindow(BaseModel):
    """Defines a user-defined partition window"""

    start: Union[int, str]
    end: Union[int, str]
    start_inclusive: bool = True
    end_inclusive: bool = True
```
Pydantic will try to parse as an int first, and if that doesn't work it will treat it as a string
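For illustration, a minimal self-contained sketch of that coercion behavior. This mirrors Pydantic v1's left-to-right union coercion, which is what the comment describes; v2's "smart" union mode keeps exact string inputs as strings, so the assertion below is a v1 assumption:

```python
from typing import Union

from pydantic import BaseModel


class UserDefinedPartitionWindow(BaseModel):
    """Defines a user-defined partition window"""

    start: Union[int, str]
    end: Union[int, str]
    start_inclusive: bool = True
    end_inclusive: bool = True


# Numeric strings coerce to int; anything non-numeric falls back to str
# (Pydantic v1 union semantics, as described above).
window = UserDefinedPartitionWindow(start="0", end="2024-01-01 00:00:00 UTC")
print(type(window.start), type(window.end))  # <class 'int'> <class 'str'>
```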
requirements.txt (Outdated)
```diff
@@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
 expandvars==0.9.0
 fastapi[all]==0.111.0
 fastapi-pagination[sqlalchemy]==0.12.25
-fideslang==3.0.4
+fideslang[all] @ git+https://github.com/ethyca/fideslang@64c15da6e742e5c7cecd3b2484d2b66dbb94fce8
```
TODO: update once ethyca/fideslang#21 is finalized and merged
```diff
@@ -458,7 +466,7 @@ def get_update_stmt(
     pk_clauses: List[str],
 ) -> str:
     """Returns a SQL UPDATE statement to fit SQL syntax."""
-    return f"UPDATE {self.node.address.collection} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses)}"
+    return f"UPDATE {self.node.address.collection} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses + ([PARTITION_CLAUSE_TEMPLATE] if self.partitioning else []))}"
```
TODO: could be cleaner
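For a concrete picture of what this composes to, here is a standalone sketch; `PARTITION_CLAUSE_TEMPLATE` is assumed to be the bound-parameter template that the test assertion later in this thread expects, and the table/clause values are illustrative:

```python
# Assumed template, mirroring the test assertion later in this PR
PARTITION_CLAUSE_TEMPLATE = (
    "(:partition_field :greater_than_operand :partition_start"
    " AND :partition_field :less_than_operand :partition_end)"
)

update_clauses = ["name = :name"]
pk_clauses = ["id = :id"]
partitioning = True  # stand-in for self.partitioning

stmt = (
    f"UPDATE customer SET {', '.join(update_clauses)} "
    f"WHERE {' AND '.join(pk_clauses + ([PARTITION_CLAUSE_TEMPLATE] if partitioning else []))}"
)
print(stmt)
# UPDATE customer SET name = :name WHERE id = :id AND (:partition_field
# :greater_than_operand :partition_start AND :partition_field
# :less_than_operand :partition_end)
```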
```diff
@@ -476,7 +484,7 @@ def format_key_map_for_update_stmt(self, fields: List[str]) -> List[str]:

     def generate_update_stmt(
         self, row: Row, policy: Policy, request: PrivacyRequest
-    ) -> Optional[T]:
+    ) -> Optional[T] | List[T]:
```
TODO: probably could be cleaner
```python
logger.info(
    "query = {}, params = {}", Pii(query_str), Pii(update_value_map)
)
return self.format_query_stmt(query_str, update_value_map)
```
same as before, just moved down here into the else.
```python
partitioned_queries = []
logger.info(
    f"Generating {len(partition_var_sets)} partition queries for node '{self.node.address}' in DSR execution"
)
for partition_var_set in partition_var_sets:
    partitioned_update_value_map = update_value_map.copy()
    partitioned_update_value_map.update(partition_var_set)
    logger.info(
        "query = {}, params = {}",
        Pii(query_str),
        Pii(partitioned_update_value_map),
    )
    partitioned_queries.append(
        self.format_query_stmt(query_str, partitioned_update_value_map)
    )
return partitioned_queries
```
injection of the partitioning vars, a new piece
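To make the shape of the new piece concrete, a hedged sketch of what the loop produces; the contents of `partition_var_sets` are assumed from the windows-style YAML above (start/end per user-defined window), and the base params are illustrative:

```python
# Assumed: one var set per user-defined window from the dataset YAML
partition_var_sets = [
    {"partition_start": "0", "partition_end": "100"},
    {"partition_start": "100", "partition_end": "200"},
]
update_value_map = {"name": None, "id": 42}

# Each window's bounds are merged into a copy of the base params,
# yielding one fully bound query per partition window.
bound_param_sets = [
    {**update_value_map, **partition_var_set}
    for partition_var_set in partition_var_sets
]
print(bound_param_sets[0])
# {'name': None, 'id': 42, 'partition_start': '0', 'partition_end': '100'}
```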
```python
results = connection.execute(stmt)
rows = self.cursor_result_to_rows(results)
```
same as before
```python
logger.info(
    f"Executing {len(partition_var_sets)} partition queries for node '{node.address}' in DSR execution"
)
for partition_var_set in partition_var_sets:
    logger.debug(
        f"Executing partition query with start '{partition_var_set['partition_start']}' and end '{partition_var_set['partition_end']}'"
    )
    results = connection.execute(stmt, partition_var_set)
    rows.extend(self.cursor_result_to_rows(results))
```
injection of partitioning vars, the new piece
```python
return create_engine(
    uri,
    credentials_info=credentials_info,
    hide_parameters=self.hide_parameters,
    echo=not self.hide_parameters,
return BigQueryClient(
    credentials=service_account.Credentials.from_service_account_info(
        credentials_info  # pylint: disable=no-member
    )
```
whoops, need to clean up. mistake.
fides Run #10306

Run Properties:
- Project: fides
- Branch Review: refs/pull/5325/merge
- Run status: Passed #10306
- Run duration: 00m 39s
- Commit: 2c672a21ae ℹ️: Merge 445def52ba7bb05fb87210978ede924c1a72f4cc into eb9ba5ee372b244db58c108ead82...
- Committer: Adam Sachs

Test results:
- Failures: 0
- Flaky: 0
- Pending: 0
- Skipped: 0
- Passing: 4
```diff
@@ -458,7 +466,7 @@ def get_update_stmt(
     pk_clauses: List[str],
 ) -> str:
     """Returns a SQL UPDATE statement to fit SQL syntax."""
-    return f"UPDATE {self.node.address.collection} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses)}"
+    return f"UPDATE {self.node.address.collection} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses + ([PARTITION_CLAUSE_TEMPLATE] if self.partitioning else []))}"
```
Make sure to change `self.node.address.collection` to `self._generate_table_name()` once you pull in my change:
return f"UPDATE {self.node.address.collection} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses + ([PARTITION_CLAUSE_TEMPLATE] if self.partitioning else []))}" | |
return f"UPDATE {self._generate_table_name()} SET {', '.join(update_clauses)} WHERE {' AND '.join(pk_clauses + ([PARTITION_CLAUSE_TEMPLATE] if self.partitioning else []))}" |
ah ok, well i've done this on the abstract class, but looks like i missed updating the subclasses that override this method - most importantly, bigquery.
thanks for calling that out!
oh i see, i think this also changed on the delete override PR 👍
probably was a bit more overlap here than anticipated...
```python
text_clause = text_clauses[0]
assert (
    text_clause.text
    == """UPDATE customer SET name = :name WHERE id = :id AND (:partition_field :greater_than_operand :partition_start AND :partition_field :less_than_operand :partition_end)"""
)
```
Same as above, let's add another test to make sure we can generate a namespaced version of the table name
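A runnable sketch of the composition such a test would pin down; the `my_bq_dataset` namespace, the helper name, and the dot-joined format are assumptions for illustration, not taken from this PR:

```python
def generate_table_name(dataset: str, collection: str) -> str:
    # Stand-in for _generate_table_name(): qualify the collection with the
    # configured dataset/namespace when one is present (assumed format).
    return f"{dataset}.{collection}" if dataset else collection


table = generate_table_name("my_bq_dataset", "customer")
stmt = f"UPDATE {table} SET name = :name WHERE id = :id"
assert stmt == "UPDATE my_bq_dataset.customer SET name = :name WHERE id = :id"
```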
Force-pushed from 2e19433 to 774fd99 (compare)
```python
        return partitioned_queries

    return [table.delete().where(*pk_clauses)]
```
fall back to existing behavior if no partitioning
```python
        return partitioned_queries

    return [table.update().where(*pk_clauses).values(**update_value_map)]
```
fall back to existing behavior if no partitioning
Force-pushed from 1441ec7 to 086484c (compare)
Commit 086484c: …ion, for bq only. no validation (yet)
Force-pushed from 086484c to e314302 (compare)
generally this makes sense - we're trying to support querying on a partitioned database with a very narrow scope, bigquery only to start, and the where clause is defined with bigquery syntax in the yaml that is added directly.
Given all that, I'd lean toward keeping everything defined on the bigquery connector/bigquery queryconfig for now, as this is a super-specific implementation
```diff
@@ -339,6 +343,7 @@ def convert_dataset_to_graph(
         data_categories=(
             set(collection.data_categories) if collection.data_categories else set()
         ),
+        partitioning=collection_partitioning,
```
This reminds me, if we end up removing support for the where clauses since it's experimental, we'll have to do it in a way that still allows DSRs in process to run: DSR 3.0 (which our customers should all be running by now) stores a representation of this collection on the request task backing the node in the database, so we don't have to rebuild the graph. If details about the partitioning change, there could still be request tasks in the database that haven't run and that carry the old partitioning configuration, which would fail when they are queued by the worker and the worker goes to rehydrate the collection off of the request task.
thanks for calling that out, great edge case to consider. i don't think there's a whole lot to do right now to anticipate that future state, beyond some code comments to warn the future dev that they'll need to handle that backward compatible case, right? if you have other ideas, would love to hear them to make this as seamless as possible a transition in the (hopefully near) future!
just a callout would be fine!
```python
logger.warning(
    "Partitioning is only supported on BigQuery connectors at this time!"
)
```
this seems useful, I could have added that for bigquery row-level deletes as well
```python
def get_partition_clauses(
    self,
) -> List[str]:
    """
```
Since we for sure only support bigquery at the moment, it felt too early to move this to the generic location, but that's just my opinion
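For intuition, a sketch of the relationship between the configured `where_clauses` and the queries this produces; the clause strings are assumed examples in BigQuery syntax, and the SELECT shape is illustrative only:

```python
# Assumed config: two user-defined partition windows over `created`
where_clauses = [
    "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)"
    " AND `created` <= CURRENT_TIMESTAMP()",
    "`created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)",
]

# get_partition_clauses() effectively hands these back so that each clause
# is appended to one copy of the base query: one query per clause per node.
for clause in where_clauses:
    print(f"SELECT ... FROM customer WHERE email = :email AND ({clause})")
```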
```python
if self.partitioning:
    partition_clauses = self.get_partition_clauses()
    partitioned_queries = []
    logger.info(
        f"Generating {len(partition_clauses)} partition queries for node '{self.node.address}' in DSR execution"
    )

    for partition_clause in partition_clauses:
        partitioned_queries.append(
            table.delete().where(*(pk_clauses + [text(partition_clause)]))
        )

    return partitioned_queries
```
might be able to dry some of this up with the update/deletes, just a thought
i hear you and agree, but the codepaths were already a bit duplicated to begin with, and it'll take a bit more effort to pull apart at the moment. i'll mark as a TODO for us to come back to generally DRYing up the two methods in a future iteration 👍
```python
if query_config.partitioning:
    partition_clauses = query_config.get_partition_clauses()
    logger.info(
        f"Executing {len(partition_clauses)} partition queries for node '{node.address}' in DSR execution"
    )
    for partition_clause in partition_clauses:
        logger.debug(
            f"Executing partition query with partition clause '{partition_clause}'"
        )
        existing_bind_params = stmt.compile().params
        partitioned_stmt = text(
            f"{stmt} AND ({text(partition_clause)})"
        ).params(existing_bind_params)
        results = connection.execute(partitioned_stmt)
        rows.extend(self.cursor_result_to_rows(results))
```
This feels like it shouldn't be here yet while it's only supported for bigquery.
that's fair, but i'd generally prefer not to create parallel implementations for `retrieve_data` (for maintenance purposes) and i'm not seeing a simple way to refactor this otherwise - but maybe i'm not being creative enough!
src/fides/api/graph/config.py (Outdated)
raise ValueError("`where_clauses` must be a list of strings!") | ||
for partition_clause in where_clauses: | ||
if matching := match(PARTITION_CLAUSE_PATTERN, partition_clause): | ||
if matching["field_1"] != matching["field_2"]: |
what are field_1 and field_2? can you add an example?
EDIT: okay got it, parsing your PARTITION_CLAUSE_PATTERN now
what if there is no match then - something didn't fit the pattern and the match was None?
EDIT: nvm, that would be picked up in the `if matching:`
are all partitioning clauses expected to be on the same field as well, or do we just care about the pairs?
> what are field_1 and field_2? can you add an example? EDIT: okay got it, parsing your PARTITION_CLAUSE_PATTERN now

let me still clarify things with some comments so this is easier to understand from a quick glance 👍

> are all partitioning clauses expected to be on the same field as well, or do we just care about the pairs?

yeah, i do think we'd expect that, but i'm not sure that checking for it really helps in preventing malicious input. granted, i don't have a super concrete example of the type of malicious input that the current check prevents, but it does at least put some more bounds on each of the clause strings. i wasn't thinking that checking across strings provides a whole lot of value - but i guess it could help prevent/clarify user error?
just typing out my thoughts...
src/fides/api/graph/config.py (Outdated)
```diff
@@ -443,6 +444,9 @@ class MaskingOverride:
     length: Optional[int]


+PARTITION_CLAUSE_PATTERN = r"`(?P<field_1>[a-zA-Z0-9_]*)` ([<|>][=]?) ([a-zA-Z0-9_\s(),.]*) AND `(?P<field_2>[a-zA-Z0-9_]*)` ([<|>][=]?) ([a-zA-Z0-9_\s(),.]*)$"
```
is any of this bigquery specific? if so we might note that
yeah good call, i think the back ticks may be BQ-specific, and we're already scoped down to BQ in other respects so let me just make that more explicit 👍
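To make the pattern's named groups concrete, a small sketch using the pattern exactly as quoted in the diff above (the later version in this PR is renamed with a `BIGQUERY_` prefix and makes the second conditional optional); the clause string is an assumed example:

```python
from re import match

# Pattern as quoted in the diff above
PARTITION_CLAUSE_PATTERN = (
    r"`(?P<field_1>[a-zA-Z0-9_]*)` ([<|>][=]?) ([a-zA-Z0-9_\s(),.]*)"
    r" AND `(?P<field_2>[a-zA-Z0-9_]*)` ([<|>][=]?) ([a-zA-Z0-9_\s(),.]*)$"
)

clause = (
    "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)"
    " AND `created` <= CURRENT_TIMESTAMP()"
)
matching = match(PARTITION_CLAUSE_PATTERN, clause)
assert matching is not None
print(matching["field_1"], matching["field_2"])  # created created
```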
Reviewing/testing -
`test_bigquery_example_data` looks like a new test failure? that should be quick to resolve, I think it's just looking at table names
```python
PROHIBITED_KEYWORDS = [
    "UNION",
    "INSERT",
    "UPDATE",
    "CREATE",
    "DROP",
    "SELECT",
    "CHAR",
    "HAVING",
    "EXEC",
]
```
great this helps add confidence
To be clear, some notable constraints on the input:
- the clause string must begin by referencing a column name wrapped by backticks (`)
- the clause string must compare that first column with a `<>(=)` operator, and may include at most one other conditional with a `<>(=)` operator that's joined to the first conditional via an AND operator
- if the clause string contains a second conditional, it must reference the same column name as the first conditional, also wrapped by backticks
- column names (wrapped by backticks) must always be on the _left_ side of the `<>(=)` operator in its conditional
V. helpful
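A few illustrative clause strings against those constraints (assumed examples, not taken from the PR's test suite):

```python
# Satisfy the constraints above:
valid = [
    "`id` >= 0 AND `id` < 1000",
    "`created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)",
]

# Violate them:
invalid = [
    "1000 > `id`",                   # column must be on the left of the operator
    "`id` > 0 AND `other_id` < 10",  # second conditional must reference the same column
    "`id` > 0 OR `id` < 10",         # conditionals must be joined via AND
]
```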
if where_clauses := partitioning.get("where_clauses"): | ||
if not isinstance(where_clauses, List) or not all( | ||
isinstance(where_clause, str) for where_clause in where_clauses | ||
): | ||
raise ValueError("`where_clauses` must be a list of strings!") | ||
for partition_clause in where_clauses: | ||
if matching := match( | ||
BIGQUERY_PARTITION_CLAUSE_PATTERN, partition_clause | ||
): | ||
# check that if there are two field comparison sub-clauses, they reference the same field, e.g.: | ||
# "`my_field_1` > 5 AND `my_field_1` <= 10", not "`my_field_1` > 5 AND `my_field_1` <= 10" | ||
if matching["field_2"] is not None and ( | ||
matching["field_1"] != matching["field_2"] | ||
): | ||
raise ValueError( | ||
f"Partition clause must have matching fields. Identified non-matching field references '{matching['field_1']}' and '{matching['field_2']}" | ||
) | ||
|
||
for prohibited_keyword in PROHIBITED_KEYWORDS: | ||
search_str = prohibited_keyword.lower() + r"\s" | ||
if search(search_str, partition_clause.lower()): | ||
raise ValueError( | ||
"Prohibited keyword referenced in partition clause" | ||
) |
I think you've done what you can given that we're allowing a where clause to be entered directly here! Lots of good checks.
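A quick standalone harness for the keyword check above; `check_clause` is a hypothetical wrapper around the loop, and note the regex format check would reject this kind of input anyway, so the keyword scan is defense in depth:

```python
from re import search

PROHIBITED_KEYWORDS = [
    "UNION", "INSERT", "UPDATE", "CREATE", "DROP",
    "SELECT", "CHAR", "HAVING", "EXEC",
]


def check_clause(partition_clause: str) -> None:
    # Mirrors the loop above: reject any clause containing a prohibited
    # keyword followed by whitespace, case-insensitively.
    for prohibited_keyword in PROHIBITED_KEYWORDS:
        search_str = prohibited_keyword.lower() + r"\s"
        if search(search_str, partition_clause.lower()):
            raise ValueError("Prohibited keyword referenced in partition clause")


check_clause("`id` > 0 AND `id` <= 10")  # passes silently
try:
    check_clause("`id` > 0 UNION SELECT * FROM secrets")
except ValueError as exc:
    print(exc)  # Prohibited keyword referenced in partition clause
```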
```python
if (
    query_config.partitioning
):  # only BigQuery supports partitioning, for now
    return self.partitioned_retrieval(query_config, connection, stmt)
```
I do like this, thanks
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #5325      +/-   ##
==========================================
- Coverage   85.44%   85.31%   -0.14%
==========================================
  Files         378      378
  Lines       23896    23971      +75
  Branches     3189     3209      +20
==========================================
+ Hits        20419    20450      +31
- Misses       2892     2931      +39
- Partials      585      590       +5
```

☔ View full report in Codecov by Sentry.
merging as all CI failures already exist in `main`
fides Run #10305

Run Properties:
- Project: fides
- Branch Review: main
- Run status: Passed #10305
- Run duration: 00m 39s
- Commit: c8f5b1f365: PROD-2734 - initial partitioning support (user-defined windows) (#5325)
- Committer: Adam Sachs

Test results:
- Failures: 0
- Flaky: 0
- Pending: 0
- Skipped: 0
- Passing: 4
Closes PROD-2734

Description Of Changes
- leverages the updated `DatasetCollection` partitioning specifications (ethyca/fideslang#21)
- adds a `fides_meta.partitioning.where_clauses` (maybe there's a better name?) field on the `DatasetCollection` config, whose values are (for BQ DSR execution) injected into access and erasure queries
- executes one query per `where_clauses` string per node per DSR request, rather than just one query per node per DSR request
- validates the `where_clauses` input to prevent SQL injection

Configuration is within the `fides_meta` of a `DatasetCollection`, e.g. the illustrative snippet below. `where_clauses` values must strictly match one of the expected formats/structures.
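An illustrative configuration (the collection name and clause values here are assumptions; the structure matches what this PR parses):

```yaml
collections:
  - name: customer
    fides_meta:
      partitioning:
        where_clauses:
          - "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()"
          - "`created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)"
```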
Code Changes
- parses the `partitioning` metadata from the dataset config
- validates `where_clauses` values in the `partitioning` metadata to prevent malicious string injection
- updates `retrieve_data` to call a `partitioned_retrieval` method if the collection has `partitioning` defined (and the connector supports partitioning, i.e. only BigQuery for now)
- implements the `partitioned_retrieval` hook in the `BigQueryConnector` to support executing multiple partitioned queries for DSR access requests against BQ

Steps to Confirm
Pre-Merge Checklist
- `CHANGELOG.md`
- `main`
- `downgrade()` migration is correct and works