Skip to content

Commit

Permalink
Merge pull request #99 from ecmwf-projects/COPDS-1489-old-requests
Browse files Browse the repository at this point in the history
COPDS-1489: remove old requests
  • Loading branch information
alex75 authored Apr 3, 2024
2 parents 8f34be8 + 0df66ea commit 6725fa3
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 39 deletions.
57 changes: 57 additions & 0 deletions alembic/versions/6ee20703d353_cascading_delete_on_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""cascading delete on events.
Revision ID: 6ee20703d353
Revises: 8924bc485ad5
Create Date: 2024-03-28 12:07:05.247016
"""
import datetime

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision = "6ee20703d353"
down_revision = "8924bc485ad5"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_column("system_requests", "request_id")
op.create_primary_key("system_requests_pkey", "system_requests", ["request_uid"])
op.drop_constraint("events_request_uid_fkey", "events")
op.drop_index("ix_system_requests_request_uid", "system_requests_pkey")
op.create_foreign_key(
"events_request_uid_fkey",
"events",
"system_requests",
["request_uid"],
["request_uid"],
ondelete="CASCADE",
)
op.add_column(
"adaptor_properties",
sa.Column("timestamp", sa.TIMESTAMP, default=sa.func.now()),
)
now_str = datetime.datetime.now().isoformat()
op.execute(
f"update adaptor_properties set timestamp='{now_str}' where timestamp is null"
)


def downgrade() -> None:
# do only on empty table
op.drop_constraint("events_request_uid_fkey", "events")
op.drop_constraint("system_requests_pkey", "system_requests")
op.add_column(
"system_requests", sa.Column("request_id", sa.Integer, primary_key=True)
)
op.create_foreign_key(
"events_request_uid_fkey",
"events",
"system_requests",
["request_uid"],
["request_uid"],
)
64 changes: 27 additions & 37 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Events(BaseModel):
event_type = sa.Column(sa.Text, index=True)
request_uid = sa.Column(
sa.dialects.postgresql.UUID(False),
sa.ForeignKey("system_requests.request_uid"),
sa.ForeignKey("system_requests.request_uid", ondelete="CASCADE"),
index=True,
)
message = sa.Column(sa.Text)
Expand All @@ -61,19 +61,15 @@ class AdaptorProperties(BaseModel):
hash = sa.Column(sa.Text, primary_key=True)
config = sa.Column(JSONB)
form = sa.Column(JSONB)
timestamp = sa.Column(sa.TIMESTAMP, default=sa.func.now())


class SystemRequest(BaseModel):
"""System Request ORM model."""

__tablename__ = "system_requests"

request_id = sa.Column(sa.Integer, primary_key=True)
request_uid = sa.Column(
sa.dialects.postgresql.UUID(False),
index=True,
unique=True,
)
request_uid = sa.Column(sa.dialects.postgresql.UUID(False), primary_key=True)
process_id = sa.Column(sa.Text, index=True)
user_uid = sa.Column(sa.Text, index=True)
status = sa.Column(status_enum)
Expand Down Expand Up @@ -102,11 +98,13 @@ class SystemRequest(BaseModel):
),
{},
)
# https://github.com/sqlalchemy/sqlalchemy/issues/11063#issuecomment-2008101926
__mapper_args__ = {"eager_defaults": False}

# joined is temporary
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="select")
events = sa.orm.relationship(Events, lazy="select")
events = sa.orm.relationship(Events, lazy="select", passive_deletes=True)

@property
def age(self):
Expand Down Expand Up @@ -515,27 +513,24 @@ def generate_adaptor_properties_hash(
).hexdigest()


def get_adaptor_properties(
adaptor_properties_hash: str,
session: sa.orm.Session,
) -> AdaptorProperties | None:
try:
statement = sa.select(AdaptorProperties.hash).where(
AdaptorProperties.hash == adaptor_properties_hash
)
return session.execute(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
return None


def add_adaptor_properties(
def ensure_adaptor_properties(
hash: str,
config: dict[str, Any],
form: dict[str, Any],
session: sa.orm.Session,
):
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)
) -> None:
"""Create adaptor properties (if not exists) or update its timestamp."""
try:
statement = (
AdaptorProperties.__table__.update()
.returning(AdaptorProperties.hash)
.where(AdaptorProperties.__table__.c.hash == hash)
.values(timestamp=datetime.datetime.now())
)
session.execute(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)


def add_event(
Expand Down Expand Up @@ -567,18 +562,12 @@ def create_request(
request_uid: str | None = None,
) -> dict[str, Any]:
"""Create a request."""
if (
get_adaptor_properties(
adaptor_properties_hash=adaptor_properties_hash, session=session
)
is None
):
add_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
ensure_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
metadata["resources"] = resources
metadata["qos_tags"] = qos_tags
request = SystemRequest(
Expand Down Expand Up @@ -678,6 +667,7 @@ def init_database(connection_string: str, force: bool = False) -> sa.engine.Engi
conn = engine.connect()
if "system_requests" not in conn.execute(query).scalars().all():
force = True
conn.close()
if force:
# cleanup and create the schema
BaseModel.metadata.drop_all(engine)
Expand Down
52 changes: 52 additions & 0 deletions cads_broker/entry_points.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Module for entry points."""
import datetime
import os
from typing import Any, Optional

Expand All @@ -10,6 +11,57 @@
app = typer.Typer()


@app.command()
def remove_old_requests(
connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
) -> None:
"""Remove records from the system_requests table older than `older_than_days`.
Parameters
----------
connection_string: something like 'postgresql://user:password@netloc:port/dbname'
older_than_days: minimum age (in days) to consider a record to be removed
"""
if not connection_string:
dbsettings = config.ensure_settings(config.dbsettings)
connection_string = dbsettings.connection_string
engine = sa.create_engine(connection_string)
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
# clean system requests and (via cascading delete) events
with engine.begin() as conn:
database.logger.info("deleting old system_requests and events...")
stmt = sa.delete(database.SystemRequest).where(
database.SystemRequest.created_at <= time_delta
)
result = conn.execute(stmt)
conn.commit()
num_requests_deleted = result.rowcount
database.logger.info(
f"{num_requests_deleted} old system requests "
f"successfully removed from the broker database."
)
# clean adaptor_properties
with engine.begin() as conn:
try:
database.logger.info("deleting old adaptor_properties...")
stmt_ap_delete = sa.delete(database.AdaptorProperties).where(
database.AdaptorProperties.timestamp <= time_delta
)
result = conn.execute(stmt_ap_delete)
conn.commit()
num_ap_deleted = result.rowcount
database.logger.info(
f"{num_ap_deleted} old adaptor properties "
f"successfully removed from the broker database."
)
return
except sa.exc.IntegrityError:
database.logger.error(
"cannot remove some old records from table adaptor_properties."
)
raise


@app.command()
def info(connection_string: Optional[str] = None) -> None:
"""Test connection to the database located at URI `connection_string`.
Expand Down
25 changes: 25 additions & 0 deletions tests/test_02_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,33 @@ def test_create_request(session_obj: sa.orm.sessionmaker) -> None:
db.SystemRequest.request_uid == request_dict["request_uid"]
)
request = session.scalars(statement).one()
adaptor_properties = request.adaptor_properties
assert adaptor_properties.hash == "adaptor_properties_hash"
initial_timestamp = adaptor_properties.timestamp
assert request.request_uid == request_dict["request_uid"]
assert request.user_uid == request_dict["user_uid"]
# create again a new request using the same adaptor properties: timestamp updated
with session_obj() as session:
request_dict = db.create_request(
user_uid="abc456",
setup_code="",
entry_point="sum",
request={},
metadata={},
process_id="submit-workflow",
session=session,
portal="c3s",
adaptor_config={"dummy_config": {"foo": "bar"}},
adaptor_form={},
adaptor_properties_hash="adaptor_properties_hash",
)
statement = sa.select(db.SystemRequest).where(
db.SystemRequest.request_uid == request_dict["request_uid"]
)
request = session.scalars(statement).one()
adaptor_properties = request.adaptor_properties
assert adaptor_properties.hash == "adaptor_properties_hash"
adaptor_properties.timestamp > initial_timestamp


def test_get_request(session_obj: sa.orm.sessionmaker) -> None:
Expand Down
2 changes: 0 additions & 2 deletions tests/test_20_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import random
import uuid
from typing import Any

Expand Down Expand Up @@ -32,7 +31,6 @@ def mock_system_request(
adaptor_properties_hash: str = "adaptor_properties_hash",
) -> db.SystemRequest:
system_request = db.SystemRequest(
request_id=random.randrange(1, 100),
request_uid=request_uid or str(uuid.uuid4()),
status=status,
created_at=created_at,
Expand Down
Loading

0 comments on commit 6725fa3

Please sign in to comment.