Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass DataStore Upsert Original pSQL Error Messages #188

Merged
Merged 10 commits into the base branch
Feb 10, 2025
1 change: 1 addition & 0 deletions changes/188.canada.changes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DataStore upsert error dicts now pass along the original pSQL error message and code, stored under `upsert_info`
297 changes: 234 additions & 63 deletions ckanext/datastore/backend/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
from __future__ import annotations

import itertools
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
import re
import string

from typing_extensions import TypeAlias

Expand Down Expand Up @@ -91,6 +95,16 @@
_UPSERT = 'upsert'
_UPDATE = 'update'

# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
# Regexes that pull the constrained key names, the offending values, and
# the referenced table name out of a pSQL foreign-key error detail line,
# e.g.:  DETAIL:  Key (child_id)=(42) is not present in table "parent".
# Raw strings are used so '\(' is a real regex escape rather than an
# invalid Python string escape (no W605 suppression needed).
FK_DETAILS_MATCH__KEYS = re.compile(
    r'(?<=DETAIL:).*(\((.*)\))=')
FK_DETAILS_MATCH__VALUES = re.compile(
    r'(?<=DETAIL:).*(\((.*)\))')
FK_DETAILS_MATCH__TABLE = re.compile(r'(?<=DETAIL:).*"(.*?)"')


if not os.environ.get('DATASTORE_LOAD'):
ValidationError = toolkit.ValidationError # type: ignore
Expand All @@ -106,6 +120,71 @@ def __init__(self, error_dict: ErrorDict):
_engines = {}


# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
class PartialFormat(Dict[Any, Any]):
    """
    Mapping for partial string formatting.

    When used with ``string.Formatter.vformat`` (or ``str.format_map``),
    any placeholder whose key is absent from the mapping is substituted
    back as the literal placeholder text (e.g. ``{refTable}``) instead
    of raising ``KeyError``, so templates survive partial substitution.
    """
    def __missing__(self, key: str) -> str:
        # Re-emit the unknown placeholder untouched for later formatting.
        return '{%s}' % key


# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
def _parse_constraint_error_from_psql_error(exception: Exception,
                                            error_message: str) -> Dict[str, Any]:
    """
    Parses the pSQL original constraint error string to determine
    the referenced/referencing keys, values, and table. Formatting
    the passed error message with the values.

    :param exception: the caught exception. The original pSQL message is
        read from ``exception.orig`` when present (SQLAlchemy errors),
        otherwise from ``exception.error_dict['info']['orig']``
        (re-raised ValidationErrors).
    :param error_message: message template which may contain the
        ``{refKeys}``, ``{refValues}`` and ``{refTable}`` placeholders.

    Returns a dict with the formatted error message and extra
    constraint info parsed from the pSQL error message.
    """
    sql_original_msg = None
    sql_original_code = None
    if hasattr(exception, 'orig'):
        sql_original_msg = str(exception.orig)
        # be tolerant of drivers/exceptions that do not expose pgcode
        sql_original_code = getattr(exception.orig, 'pgcode', None)
    elif hasattr(exception, 'error_dict') and 'info' in exception.error_dict \
            and 'orig' in exception.error_dict['info']:
        # for the scenario if an extension wants to reparse
        # the sql error with a new custom error message
        sql_original_msg = exception.error_dict['info']['orig']
        sql_original_code = exception.error_dict['info'].get('pgcode')
    if sql_original_msg is None:
        # Neither source of the original message was available: fall back
        # to the exception text instead of crashing below with a
        # TypeError from re.search(pattern, None).
        sql_original_msg = str(exception)
    keys_match = re.search(FK_DETAILS_MATCH__KEYS, sql_original_msg)
    values_match = re.search(FK_DETAILS_MATCH__VALUES, sql_original_msg)
    table_match = re.search(FK_DETAILS_MATCH__TABLE, sql_original_msg)
    ref_keys = keys_match.group(2) if keys_match else None
    ref_values = values_match.group(2) if values_match else None
    ref_resource = table_match.group(1) if table_match else None
    error_message = toolkit._(error_message)  # gettext before formatting
    formatter = string.Formatter()
    # partial replacements to handle custom error messages
    # that may not contain all the replacement keys.
    mapping = PartialFormat(refKeys=ref_keys,
                            refValues=ref_values,
                            refTable=ref_resource)
    return {
        'errors': {
            'foreign_constraint': [formatter.vformat(error_message, (), mapping)]
        },
        'constraint_info': {
            'ref_keys': ref_keys,
            'ref_values': ref_values,
            'ref_resource': ref_resource
        },
        'info': {
            'orig': sql_original_msg,
            'pgcode': sql_original_code
        }
    }


def literal_string(s: str):
"""
Return s as a postgres literal string
Expand Down Expand Up @@ -331,44 +410,97 @@ def _get_fields(connection: Any, resource_id: str):
# (canada fork only): foreign keys
# TODO: upstream contrib!!
def _get_foreign_constraints(connection, resource_id, return_constraint_names=False):
    u'''
    Return a list of foreign constraint names or the info for the constraints.

    :param connection: an open database connection (write engine — the
        pg_constraint lookups require elevated permissions).
    :param resource_id: name of the datastore table to inspect.
    :param return_constraint_names: when True, return only the names of
        FK constraints on ``resource_id``; otherwise return one dict per
        constraint with update/delete rules, child/parent tables and
        their ordered column lists.
    '''
    foreign_constraints = []
    if return_constraint_names:
        # Just the constraint names for FKs declared on this table.
        foreign_constraints_sql = sa.text('''
            SELECT
                p.conname as constraint_name
            FROM pg_constraint p
            LEFT JOIN pg_class c1 ON c1.oid = p.confrelid
            LEFT JOIN pg_class c2 ON c2.oid = p.conrelid
            WHERE p.contype = 'f'
            AND c2.relname = {0}
            ORDER BY p.oid;
        '''.format(literal_string(resource_id)))
        foreign_constraints_results = connection.execute(foreign_constraints_sql)
        foreign_constraints += [r.constraint_name for r in foreign_constraints_results.fetchall()]
    else:
        # Constraints where this table is the referencing (child) side.
        # LATERAL unnest with ordinality keeps multi-column FK columns
        # paired up in declaration order.
        foreign_constraints_sql = sa.text('''
            SELECT
                con.conname AS constraint_name,
                child_table.relname AS child_table,
                child_cols.attname AS child_column,
                parent_table.relname AS parent_table,
                parent_cols.attname AS parent_column,
                rc.update_rule as update_rule,
                rc.delete_rule as delete_rule
            FROM pg_constraint con
            JOIN information_schema.referential_constraints rc ON
                con.conname = rc.constraint_name
            JOIN pg_class child_table ON con.conrelid = child_table.oid
            JOIN pg_class parent_table ON con.confrelid = parent_table.oid
            JOIN LATERAL unnest(con.conkey) WITH ORDINALITY AS child_col_num(child_attnum, ord) ON true
            JOIN LATERAL unnest(con.confkey) WITH ORDINALITY AS parent_col_num(parent_attnum, ord)
                ON child_col_num.ord = parent_col_num.ord
            JOIN pg_attribute child_cols ON child_cols.attrelid = con.conrelid
                AND child_cols.attnum = child_col_num.child_attnum
            JOIN pg_attribute parent_cols ON parent_cols.attrelid = con.confrelid
                AND parent_cols.attnum = parent_col_num.parent_attnum
            WHERE con.contype = 'f'
            AND child_table.relname = {0}
            ORDER BY con.conname, child_col_num.ord;
        '''.format(literal_string(resource_id)))
        foreign_constraints_results = connection.execute(foreign_constraints_sql)
        db_results = foreign_constraints_results.fetchall()
        # Constraints where this table is the referenced (parent) side,
        # found via information_schema.constraint_column_usage.
        foreign_constraint_usages_sql = sa.text('''
            SELECT DISTINCT
                con.conname AS constraint_name,
                child_table.relname AS child_table,
                child_cols.attname AS child_column,
                parent_table.relname AS parent_table,
                parent_cols.attname AS parent_column,
                rc.update_rule as update_rule,
                rc.delete_rule as delete_rule
            FROM pg_constraint con
            JOIN information_schema.constraint_column_usage colus ON colus.constraint_name = con.conname
            JOIN information_schema.referential_constraints rc ON
                con.conname = rc.constraint_name
            JOIN pg_class child_table ON con.conrelid = child_table.oid
            JOIN pg_class parent_table ON con.confrelid = parent_table.oid
            JOIN LATERAL unnest(con.conkey) WITH ORDINALITY AS child_col_num(child_attnum, ord) ON true
            JOIN LATERAL unnest(con.confkey) WITH ORDINALITY AS parent_col_num(parent_attnum, ord)
                ON child_col_num.ord = parent_col_num.ord
            JOIN pg_attribute child_cols ON child_cols.attrelid = con.conrelid
                AND child_cols.attnum = child_col_num.child_attnum
            JOIN pg_attribute parent_cols ON parent_cols.attrelid = con.confrelid
                AND parent_cols.attnum = parent_col_num.parent_attnum
            WHERE con.contype = 'f'
            AND colus.table_name = {0}
            ORDER BY con.conname;
        '''.format(literal_string(resource_id)))
        foreign_constraint_usages_results = connection.execute(foreign_constraint_usages_sql)
        db_results += foreign_constraint_usages_results.fetchall()
        # Group by constraints and output in the correct List order.
        # This way the JSON will always appear in the correct order,
        # and the child_columns and parent_columns array indices match
        # up like ordinal positions.
        foreign_constraints_by_name = {}
        for result in db_results:
            if result.constraint_name not in foreign_constraints_by_name:
                foreign_constraints_by_name[result.constraint_name] = {
                    'update_rule': result.update_rule,
                    'delete_rule': result.delete_rule,
                    'child_table': result.child_table,
                    'child_columns': [],
                    'parent_table': result.parent_table,
                    'parent_columns': []
                }
            foreign_constraints_by_name[result.constraint_name]['child_columns'].append(result.child_column)
            foreign_constraints_by_name[result.constraint_name]['parent_columns'].append(result.parent_column)
        foreign_constraints += foreign_constraints_by_name.values()
    return foreign_constraints


Expand Down Expand Up @@ -1351,8 +1483,20 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
try:
context['connection'].execute(sa.text(sql_string), rows)
except (DatabaseError, DataError) as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
errmsg = _programming_error_summary(err)
if 'violates foreign key constraint' in errmsg:
_ = lambda x:x
errmsg = _(
'Cannot insert records ({refValues}) because '\
'they do not exist in the referenced table. '\
'Referencing {refKeys} from {refTable}.')
raise ValidationError(dict(
_parse_constraint_error_from_psql_error(err, errmsg),
records_row=num))
raise ValidationError({
'records': [_programming_error_summary(err)],
'records': [errmsg],
'records_row': num,
})

Expand Down Expand Up @@ -1444,8 +1588,20 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
sa.text(sql_string),
{**used_values, **unique_values})
except DatabaseError as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
errmsg = _programming_error_summary(err)
if 'violates foreign key constraint' in errmsg:
_ = lambda x:x
errmsg = _(
'Cannot insert records ({refValues}) because '\
'they do not exist in the referenced table. '\
'Referencing {refKeys} from {refTable}.')
raise ValidationError(dict(
_parse_constraint_error_from_psql_error(err, errmsg),
records_row=num))
raise ValidationError({
'records': [_programming_error_summary(err)],
'records': [errmsg],
'records_row': num,
})

Expand Down Expand Up @@ -1490,8 +1646,20 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
context['connection'].execute(
sa.text(insert_string), values)
except DatabaseError as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
errmsg = _programming_error_summary(err)
if 'violates foreign key constraint' in errmsg:
_ = lambda x:x
errmsg = _(
'Cannot insert records ({refValues}) because '\
'they do not exist in the referenced table. '\
'Referencing {refKeys} from {refTable}.')
raise ValidationError(dict(
_parse_constraint_error_from_psql_error(err, errmsg),
records_row=num))
raise ValidationError({
'records': [_programming_error_summary(err)],
'records': [errmsg],
'records_row': num,
})

Expand Down Expand Up @@ -2238,15 +2406,14 @@ def delete(self, context: Context, data_dict: dict[str, Any]):
#FIXME: flagged issue, referencing someone else's table will limit their table from the constraints.
except (IntegrityError, ProgrammingError, InternalError) as e:
if e.orig.pgcode == _PG_ERR_CODE['row_referenced_constraint'] or e.orig.pgcode == _PG_ERR_CODE['table_referenced_constraint']:
# FIXME: better error message??
raise ValidationError({
'foreign_constraints': ['Cannot delete records or table'
' because of a reference to another table.'],
'info': {
'orig': str(e.orig),
'pgcode': e.orig.pgcode
}
})
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
_ = lambda x:x
errmsg = _(
'Cannot delete records or table because of '
'a reference to another table. '
'Referencing {refKeys}({refValues}) from {refTable}.')
raise ValidationError(_parse_constraint_error_from_psql_error(e, errmsg))
raise

def create(
Expand Down Expand Up @@ -2318,15 +2485,14 @@ def create(
# TODO: upstream contrib!!
except ProgrammingError as e:
if e.orig.pgcode == _PG_ERR_CODE['no_unique_constraint']:
# FIXME: better error message??
raise ValidationError({
'foreign_constraints': ['Cannot insert records or create index'
' because of a foreign constraint'],
'info': {
'orig': str(e.orig),
'pgcode': e.orig.pgcode
}
})
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
_ = lambda x:x
errmsg = _(
'Cannot insert records ({refValues}) or create index because '\
'they do not exist in the referenced table. '\
'Referencing {refKeys} from {refTable}.')
raise ValidationError(_parse_constraint_error_from_psql_error(e, errmsg))
raise
except DataError as e:
raise ValidationError(cast(ErrorDict, {
Expand Down Expand Up @@ -2460,11 +2626,6 @@ def resource_fields(self, id: str) -> dict[str, Any]:
aliases.append(alias[0])
info['meta']['aliases'] = aliases

# (canada fork only): foreign keys
# TODO: upstream contrib!!
#FIXME: fix later, multiple column foreign keys and ordered dict/lists?
#info['foreign_keys'] = _get_foreign_constraints(engine, id)

# get the data dictionary for the resource
with engine.connect() as conn:
data_dictionary = _result_fields(
Expand Down Expand Up @@ -2523,8 +2684,18 @@ def resource_fields(self, id: str) -> dict[str, Any]:
'is_index': row.is_index,
'uniquekey': row.uniquekey,
'foreignkeys': row.foreignkeys}

schemainfo[colname] = colinfo

# (canada fork only): foreign keys
# TODO: upstream contrib!!
# constraint info requires higher perms
write_engine = self._get_write_engine()
with write_engine.connect() as conn:
fk_info = _get_foreign_constraints(conn, id)
if fk_info:
info['meta']['foreignkeys'] = fk_info

for field in data_dictionary:
if field['id'].startswith('_'):
continue
Expand Down