Skip to content

Commit

Permalink
feat: support SQLAlchemy 2.x (#150)
Browse files Browse the repository at this point in the history
Signed-off-by: Weston Steimel <[email protected]>
  • Loading branch information
westonsteimel authored Apr 17, 2023
1 parent fc69375 commit 01b30b9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ python-dateutil = "^2.8.2"
defusedxml = "^0.7.1"
dataclass-wizard = "^0.22.2"
orjson = "^3.8.6"
SQLAlchemy = "^1.4.46"
SQLAlchemy = ">= 1.4.46, < 3.0" # note: 1.4.x currently required for enterprise
mergedeep = "^1.3.4"
future = "^0.18.3"
importlib-metadata = "^6.1.0"
Expand Down
32 changes: 16 additions & 16 deletions src/vunnel/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,34 +117,34 @@ def db_file_path(self) -> str:
return os.path.join(self.workspace.results_path, self.filename)

def _create_table(self) -> db.Table:
metadata = db.MetaData(self.engine)
metadata = db.MetaData()
table = db.Table(
self.table_name,
metadata,
db.Column("id", db.String(), primary_key=True, index=True),
db.Column("record", db.LargeBinary()),
)
metadata.create_all()
metadata.create_all(self.engine)
return table

def store(self, identifier: str, record: Envelope) -> None:
record_str = orjson.dumps(asdict(record))
conn, table = self.connection()

# upsert the record conditionally based on the skip_duplicates configuration

existing = conn.execute(table.select().where(table.c.id == identifier)).first()
if existing:
if self.skip_duplicates:
self.logger.warning(f"{identifier!r} entry already written (skipping)")
return
self.logger.trace(f"overwriting existing entry: {identifier!r}") # type: ignore[attr-defined]
statement = db.update(table).where(table.c.id == identifier).values(record=record_str)
else:
self.logger.trace(f"writing record to {identifier!r} key") # type: ignore[attr-defined]
statement = db.insert(table).values(id=identifier, record=record_str)

conn.execute(statement)
with conn.begin():
# upsert the record conditionally based on the skip_duplicates configuration
existing = conn.execute(table.select().where(table.c.id == identifier)).first()
if existing:
if self.skip_duplicates:
self.logger.warning(f"{identifier!r} entry already written (skipping)")
return
self.logger.trace(f"overwriting existing entry: {identifier!r}") # type: ignore[attr-defined]
statement = db.update(table).where(table.c.id == identifier).values(record=record_str)
else:
self.logger.trace(f"writing record to {identifier!r} key") # type: ignore[attr-defined]
statement = db.insert(table).values(id=identifier, record=record_str)

conn.execute(statement)

def close(self) -> None:
if self.conn:
Expand Down

0 comments on commit 01b30b9

Please sign in to comment.