Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally process transactions in background #593

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 23 additions & 120 deletions gramps_webapi/api/resources/transactions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Gramps Web API - A RESTful API for the Gramps genealogy program
#
# Copyright (C) 2021-2023 David Straub
# Copyright (C) 2021-2024 David Straub
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -20,35 +20,19 @@
"""Raw database transaction API resource."""

import json
from typing import Dict

from flask import Response, request
from gramps.gen.db import DbTxn
from gramps.gen.db.base import DbReadBase
from flask_jwt_extended import get_jwt_identity
from gramps.gen.db.dbconst import TXNADD, TXNDEL, TXNUPD
from gramps.gen.errors import HandleError
from gramps.gen.lib.serialize import from_json, to_json
from gramps.gen.merge.diff import diff_items
from webargs import fields

from ...auth.const import PERM_ADD_OBJ, PERM_DEL_OBJ, PERM_EDIT_OBJ
from ...types import ResponseReturnValue
from ..auth import require_permissions
from ..search import (
SearchIndexer,
get_search_indexer,
SemanticSearchIndexer,
get_semantic_search_indexer,
)
from ..util import (
abort_with_message,
check_quota_people,
get_db_handle,
get_tree_from_jwt_or_fail,
update_usage_people,
use_args,
)
from ..tasks import AsyncResult, make_task_response, process_transactions, run_task
from ..util import abort_with_message, use_args, get_tree_from_jwt_or_fail
from . import ProtectedResource
from .util import app_has_semantic_search, reverse_transaction, transaction_to_json
from .util import reverse_transaction

trans_code = {"delete": TXNDEL, "add": TXNADD, "update": TXNUPD}

Expand All @@ -60,10 +44,11 @@ class TransactionsResource(ProtectedResource):
{
"undo": fields.Boolean(load_default=False),
"force": fields.Boolean(load_default=False),
"background": fields.Boolean(load_default=False),
},
location="query",
)
def post(self, args) -> Response:
def post(self, args) -> ResponseReturnValue:
"""Post the transaction."""
require_permissions([PERM_ADD_OBJ, PERM_EDIT_OBJ, PERM_DEL_OBJ])
payload = request.json
Expand All @@ -72,108 +57,26 @@ def post(self, args) -> Response:
is_undo = args["undo"]
if is_undo:
payload = reverse_transaction(payload)
db_handle = get_db_handle(readonly=False)
num_people_deleted = sum(
item["type"] == "delete" and item["_class"] == "Person" for item in payload
)
num_people_added = sum(
item["type"] == "add" and item["_class"] == "Person" for item in payload
)
num_people_new = num_people_added - num_people_deleted
check_quota_people(to_add=num_people_new)
with DbTxn("Raw transaction", db_handle) as trans:
for item in payload:
try:
class_name = item["_class"]
trans_type = item["type"]
handle = item["handle"]
old_data = item["old"]
if not args["force"] and not self.old_unchanged(
db_handle, class_name, handle, old_data
):
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(409, "Object has changed")
new_data = item["new"]
if new_data:
new_obj = from_json(json.dumps(new_data))
if trans_type == "delete":
self.handle_delete(trans, class_name, handle)
if (
class_name == "Person"
and handle == db_handle.get_default_handle()
):
db_handle.set_default_person_handle(None)
elif trans_type == "add":
self.handle_add(trans, class_name, new_obj)
elif trans_type == "update":
self.handle_commit(trans, class_name, new_obj)
else:
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(400, "Unexpected transaction type")
except (KeyError, UnicodeDecodeError, json.JSONDecodeError, TypeError):
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(400, "Error while processing transaction")
trans_dict = transaction_to_json(trans)
if num_people_new:
update_usage_people()
# update search index
tree = get_tree_from_jwt_or_fail()
indexer: SearchIndexer = get_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
indexer.delete_object(handle, class_name)
else:
indexer.add_or_update_object(handle, db_handle, class_name)
if app_has_semantic_search():
semantic_indexer: SemanticSearchIndexer = get_semantic_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
semantic_indexer.delete_object(handle, class_name)
else:
semantic_indexer.add_or_update_object(handle, db_handle, class_name)
user_id = get_jwt_identity()
if args["background"]:
task = run_task(
process_transactions,
tree=tree,
user_id=user_id,
payload=payload,
force=args["force"],
)
if isinstance(task, AsyncResult):
return make_task_response(task)
return task, 200
trans_dict = process_transactions(
tree=tree, user_id=user_id, payload=payload, force=args["force"]
)
res = Response(
response=json.dumps(trans_dict),
status=200,
mimetype="application/json",
)
res.headers.add("X-Total-Count", str(len(trans_dict)))
return res

def handle_delete(self, trans: DbTxn, class_name: str, handle: str) -> None:
"""Handle a delete action."""
del_func = trans.db.method("remove_%s", class_name)
del_func(handle, trans)

def handle_commit(self, trans: DbTxn, class_name: str, obj) -> None:
"""Handle an update action."""
com_func = trans.db.method("commit_%s", class_name)
com_func(obj, trans)

def handle_add(self, trans: DbTxn, class_name: str, obj) -> None:
"""Handle an add action."""
if class_name != "Tag" and not obj.gramps_id:
abort_with_message(400, "Gramps ID missing")
self.handle_commit(trans, class_name, obj)

def old_unchanged(
self, db: DbReadBase, class_name: str, handle: str, old_data: Dict
) -> bool:
"""Check if the "old" object is still unchanged."""
handle_func = db.method("get_%s_from_handle", class_name)
try:
obj = handle_func(handle)
except HandleError:
if old_data is None:
return True
return False
obj_dict = json.loads(to_json(obj))
if diff_items(class_name, old_data, obj_dict):
return False
return True
128 changes: 125 additions & 3 deletions gramps_webapi/api/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#
# Gramps Web API - A RESTful API for the Gramps genealogy program
#
# Copyright (C) 2021-2023 David Straub
# Copyright (C) 2021-2024 David Straub
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -20,6 +19,7 @@

from __future__ import annotations

import json
import os
import uuid
from gettext import gettext as _
Expand All @@ -29,6 +29,11 @@
from celery import shared_task, Task
from celery.result import AsyncResult
from flask import current_app
from gramps.gen.db import DbTxn
from gramps.gen.lib.serialize import from_json, to_json
from gramps.gen.db.base import DbReadBase
from gramps.gen.errors import HandleError
from gramps.gen.merge.diff import diff_items

from gramps_webapi.api.search.indexer import SearchIndexer, SemanticSearchIndexer

Expand All @@ -40,9 +45,15 @@
from .media_importer import MediaImporter
from .report import run_report
from .resources.delete import delete_all_objects
from .resources.util import dry_run_import, run_import
from .resources.util import (
app_has_semantic_search,
dry_run_import,
run_import,
transaction_to_json,
)
from .search import get_search_indexer, get_semantic_search_indexer
from .util import (
abort_with_message,
check_quota_people,
close_db,
get_config,
Expand Down Expand Up @@ -410,3 +421,114 @@ def delete_objects(
self, title="Updating semantic search index..."
),
)


@shared_task(bind=True)
def process_transactions(
self, tree: str, user_id: str, payload: list[dict], force: bool
):
"""Process a set of database transactions, updating search indices as needed."""
num_people_deleted = sum(
item["type"] == "delete" and item["_class"] == "Person" for item in payload
)
num_people_added = sum(
item["type"] == "add" and item["_class"] == "Person" for item in payload
)
num_people_new = num_people_added - num_people_deleted
check_quota_people(to_add=num_people_new, tree=tree, user_id=user_id)
db_handle = get_db_outside_request(
tree=tree, view_private=True, readonly=False, user_id=user_id
)
with DbTxn("Raw transaction", db_handle) as trans:
for item in payload:
try:
class_name = item["_class"]
trans_type = item["type"]
handle = item["handle"]
old_data = item["old"]
if not force and not old_unchanged(
db_handle, class_name, handle, old_data
):
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(409, "Object has changed")
new_data = item["new"]
if new_data:
new_obj = from_json(json.dumps(new_data))
if trans_type == "delete":
handle_delete(trans, class_name, handle)
if (
class_name == "Person"
and handle == db_handle.get_default_handle()
):
db_handle.set_default_person_handle(None)
elif trans_type == "add":
handle_add(trans, class_name, new_obj)
elif trans_type == "update":
handle_commit(trans, class_name, new_obj)
else:
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(400, "Unexpected transaction type")
except (KeyError, UnicodeDecodeError, json.JSONDecodeError, TypeError):
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(400, "Error while processing transaction")
trans_dict = transaction_to_json(trans)
if num_people_new:
update_usage_people(tree=tree, user_id=user_id)
# update search index
indexer: SearchIndexer = get_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
indexer.delete_object(handle, class_name)
else:
indexer.add_or_update_object(handle, db_handle, class_name)
# update semantic search index
if app_has_semantic_search():
semantic_indexer: SemanticSearchIndexer = get_semantic_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
semantic_indexer.delete_object(handle, class_name)
else:
semantic_indexer.add_or_update_object(handle, db_handle, class_name)
return trans_dict


def handle_delete(trans: DbTxn, class_name: str, handle: str) -> None:
"""Handle a delete action."""
del_func = trans.db.method("remove_%s", class_name)
del_func(handle, trans)


def handle_commit(trans: DbTxn, class_name: str, obj) -> None:
"""Handle an update action."""
com_func = trans.db.method("commit_%s", class_name)
com_func(obj, trans)


def handle_add(trans: DbTxn, class_name: str, obj) -> None:
"""Handle an add action."""
if class_name != "Tag" and not obj.gramps_id:
abort_with_message(400, "Gramps ID missing")
handle_commit(trans, class_name, obj)


def old_unchanged(db: DbReadBase, class_name: str, handle: str, old_data: Dict) -> bool:
"""Check if the "old" object is still unchanged."""
handle_func = db.method("get_%s_from_handle", class_name)
assert handle_func is not None, "No handle function found"
try:
obj = handle_func(handle)
except HandleError:
if old_data is None:
return True
return False
obj_dict = json.loads(to_json(obj))
if diff_items(class_name, old_data, obj_dict):
return False
return True
Loading