Skip to content

Commit

Permalink
fix(141): handle after_flush_postexec when creating version objects
Browse files Browse the repository at this point in the history
when creating version objects if a person has created after_flush_postexec
hook, which keeps calling `after_flush` untill it exhausts 100 attempts
or session is no longer dirty, this is picked up by mapper as after_update
which within same transaction adds a update operation type, so we have a
check if target is already in UoW we continue with operation type that
it is in untill transaction is completed.
  • Loading branch information
indiVar0508 committed Aug 8, 2024
1 parent 5f1e098 commit ffd2fb8
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
8 changes: 6 additions & 2 deletions sqlalchemy_history/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def add_update(self, target):
del state_copy[rel_key]

if state_copy:
self.add(Operation(target, Operation.UPDATE))

if target in self:
# If already in current transaction and some event hook did a update
# prior to commit hook, continue with operation type as it is
self.add(Operation(target, self[self.format_key(target)].type))
else:
self.add(Operation(target, Operation.UPDATE))
def add_delete(self, target):
self.add(Operation(target, Operation.DELETE))
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import datetime
import sqlalchemy as sa
from copy import copy

from tests import TestCase
from sqlalchemy_history import version_class


class TestBug141(TestCase):
# ref: https://github.com/corridor/sqlalchemy-history/issues/141
def create_models(self):
article_author_table = sa.Table(
"article_author",
self.Model.metadata,
sa.Column(
"article_id", sa.Integer, sa.ForeignKey("article.id"), primary_key=True, nullable=False
),
sa.Column("author_id", sa.Integer, sa.ForeignKey("author.id"), primary_key=True, nullable=False),
sa.Column(
"created_date",
sa.DateTime,
nullable=False,
server_default=sa.func.current_timestamp(),
default=lambda: datetime.datetime.now(datetime.timezone.utc)
),
)

class Article(self.Model):
__tablename__ = "article"
__versioned__ = copy(self.options)

id = sa.Column(
sa.Integer, sa.Sequence(f"{__tablename__}_seq", start=1), autoincrement=True, primary_key=True
)
name = sa.Column(sa.Unicode(255), nullable=False)
content = sa.Column(sa.UnicodeText)

class Author(self.Model):
__tablename__ = "author"
__versioned__ = copy(self.options)

id = sa.Column(
sa.Integer, sa.Sequence(f"{__tablename__}_seq", start=1), autoincrement=True, primary_key=True
)
name = sa.Column(sa.Unicode(255))
article_id = sa.Column(sa.Integer, sa.ForeignKey(Article.id))
articles = sa.orm.relationship(Article, secondary=article_author_table, backref="authors")

self.Article = Article
self.Author = Author
self.article_author_table = article_author_table

def test_add_record(self):
article = self.Article(name="Article 1")
author = self.Author(name="Author 1", articles=[article])
@sa.event.listens_for(self.session, 'after_flush_postexec')
def after_flush_postexec(session, flush_context):
if author.name != "yoyoyoyoyo":
author.name = "yoyoyoyoyo"
self.session.add(article)
self.session.add(author)
self.session.commit()

versioned_objs = self.session.query(version_class(self.Author)).all()
assert len(versioned_objs) == 1
assert versioned_objs[0].operation_type == 0
assert versioned_objs[0].name == "yoyoyoyoyo"
author.name = "sdfeoinfe"
self.session.add(author)
self.session.commit()
versioned_objs = self.session.query(version_class(self.Author)).all()
assert len(versioned_objs) == 2
assert versioned_objs[0].operation_type == 0
assert versioned_objs[1].operation_type == 1
assert versioned_objs[0].name == versioned_objs[1].name == "yoyoyoyoyo"
sa.event.remove(self.session, "after_flush_postexec", after_flush_postexec)

0 comments on commit ffd2fb8

Please sign in to comment.