Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COPDS-1489: remove old requests #99

Merged
merged 7 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions alembic/versions/6ee20703d353_cascading_delete_on_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""cascading delete on events.

Revision ID: 6ee20703d353
Revises: 8924bc485ad5
Create Date: 2024-03-28 12:07:05.247016

"""
import datetime

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision = "6ee20703d353"
down_revision = "8924bc485ad5"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_column("system_requests", "request_id")
op.create_primary_key("system_requests_pkey", "system_requests", ["request_uid"])
op.drop_constraint("events_request_uid_fkey", "events")
op.drop_index("ix_system_requests_request_uid", "system_requests_pkey")
op.create_foreign_key(
"events_request_uid_fkey",
"events",
"system_requests",
["request_uid"],
["request_uid"],
ondelete="CASCADE",
)
op.add_column(
"adaptor_properties",
sa.Column("timestamp", sa.TIMESTAMP, default=sa.func.now()),
)
now_str = datetime.datetime.now().isoformat()
op.execute(
f"update adaptor_properties set timestamp='{now_str}' where timestamp is null"
)


def downgrade() -> None:
# do only on empty table
op.drop_constraint("events_request_uid_fkey", "events")
op.drop_constraint("system_requests_pkey", "system_requests")
op.add_column(
"system_requests", sa.Column("request_id", sa.Integer, primary_key=True)
)
op.create_foreign_key(
"events_request_uid_fkey",
"events",
"system_requests",
["request_uid"],
["request_uid"],
)
64 changes: 27 additions & 37 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Events(BaseModel):
event_type = sa.Column(sa.Text, index=True)
request_uid = sa.Column(
sa.dialects.postgresql.UUID(False),
sa.ForeignKey("system_requests.request_uid"),
sa.ForeignKey("system_requests.request_uid", ondelete="CASCADE"),
index=True,
)
message = sa.Column(sa.Text)
Expand All @@ -61,19 +61,15 @@ class AdaptorProperties(BaseModel):
hash = sa.Column(sa.Text, primary_key=True)
config = sa.Column(JSONB)
form = sa.Column(JSONB)
timestamp = sa.Column(sa.TIMESTAMP, default=sa.func.now())


class SystemRequest(BaseModel):
"""System Request ORM model."""

__tablename__ = "system_requests"

request_id = sa.Column(sa.Integer, primary_key=True)
request_uid = sa.Column(
sa.dialects.postgresql.UUID(False),
index=True,
unique=True,
)
request_uid = sa.Column(sa.dialects.postgresql.UUID(False), primary_key=True)
process_id = sa.Column(sa.Text, index=True)
user_uid = sa.Column(sa.Text, index=True)
status = sa.Column(status_enum)
Expand Down Expand Up @@ -102,11 +98,13 @@ class SystemRequest(BaseModel):
),
{},
)
# https://github.com/sqlalchemy/sqlalchemy/issues/11063#issuecomment-2008101926
__mapper_args__ = {"eager_defaults": False}

# joined is temporary
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="select")
events = sa.orm.relationship(Events, lazy="select")
events = sa.orm.relationship(Events, lazy="select", passive_deletes=True)

@property
def age(self):
Expand Down Expand Up @@ -507,27 +505,24 @@ def generate_adaptor_properties_hash(
).hexdigest()


def get_adaptor_properties(
adaptor_properties_hash: str,
session: sa.orm.Session,
) -> AdaptorProperties | None:
try:
statement = sa.select(AdaptorProperties.hash).where(
AdaptorProperties.hash == adaptor_properties_hash
)
return session.execute(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
return None


def add_adaptor_properties(
def ensure_adaptor_properties(
hash: str,
config: dict[str, Any],
form: dict[str, Any],
session: sa.orm.Session,
):
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)
) -> None:
"""Create adaptor properties (if not exists) or update its timestamp."""
try:
statement = (
AdaptorProperties.__table__.update()
.returning(AdaptorProperties.hash)
.where(AdaptorProperties.__table__.c.hash == hash)
.values(timestamp=datetime.datetime.now())
)
session.execute(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)


def add_event(
Expand Down Expand Up @@ -559,18 +554,12 @@ def create_request(
request_uid: str | None = None,
) -> dict[str, Any]:
"""Create a request."""
if (
get_adaptor_properties(
adaptor_properties_hash=adaptor_properties_hash, session=session
)
is None
):
add_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
ensure_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
metadata["resources"] = resources
metadata["qos_tags"] = qos_tags
request = SystemRequest(
Expand Down Expand Up @@ -670,6 +659,7 @@ def init_database(connection_string: str, force: bool = False) -> sa.engine.Engi
conn = engine.connect()
if "system_requests" not in conn.execute(query).scalars().all():
force = True
conn.close()
if force:
# cleanup and create the schema
BaseModel.metadata.drop_all(engine)
Expand Down
52 changes: 52 additions & 0 deletions cads_broker/entry_points.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Module for entry points."""
import datetime
import os
from typing import Any, Optional

Expand All @@ -10,6 +11,57 @@
app = typer.Typer()


@app.command()
def remove_old_requests(
connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
) -> None:
"""Remove records from the system_requests table older than `older_than_days`.

Parameters
----------
connection_string: something like 'postgresql://user:password@netloc:port/dbname'
older_than_days: minimum age (in days) to consider a record to be removed
"""
if not connection_string:
dbsettings = config.ensure_settings(config.dbsettings)
connection_string = dbsettings.connection_string
engine = sa.create_engine(connection_string)
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
# clean system requests and (via cascading delete) events
with engine.begin() as conn:
database.logger.info("deleting old system_requests and events...")
stmt = sa.delete(database.SystemRequest).where(
database.SystemRequest.created_at <= time_delta
)
result = conn.execute(stmt)
conn.commit()
num_requests_deleted = result.rowcount
database.logger.info(
f"{num_requests_deleted} old system requests "
f"successfully removed from the broker database."
)
# clean adaptor_properties
with engine.begin() as conn:
try:
database.logger.info("deleting old adaptor_properties...")
stmt_ap_delete = sa.delete(database.AdaptorProperties).where(
database.AdaptorProperties.timestamp <= time_delta
)
result = conn.execute(stmt_ap_delete)
conn.commit()
num_ap_deleted = result.rowcount
database.logger.info(
f"{num_ap_deleted} old adaptor properties "
f"successfully removed from the broker database."
)
return
except sa.exc.IntegrityError:
database.logger.error(
"cannot remove some old records from table adaptor_properties."
)
raise


@app.command()
def info(connection_string: Optional[str] = None) -> None:
"""Test connection to the database located at URI `connection_string`.
Expand Down
25 changes: 25 additions & 0 deletions tests/test_02_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,33 @@ def test_create_request(session_obj: sa.orm.sessionmaker) -> None:
db.SystemRequest.request_uid == request_dict["request_uid"]
)
request = session.scalars(statement).one()
adaptor_properties = request.adaptor_properties
assert adaptor_properties.hash == "adaptor_properties_hash"
initial_timestamp = adaptor_properties.timestamp
assert request.request_uid == request_dict["request_uid"]
assert request.user_uid == request_dict["user_uid"]
# create again a new request using the same adaptor properties: timestamp updated
with session_obj() as session:
request_dict = db.create_request(
user_uid="abc456",
setup_code="",
entry_point="sum",
request={},
metadata={},
process_id="submit-workflow",
session=session,
portal="c3s",
adaptor_config={"dummy_config": {"foo": "bar"}},
adaptor_form={},
adaptor_properties_hash="adaptor_properties_hash",
)
statement = sa.select(db.SystemRequest).where(
db.SystemRequest.request_uid == request_dict["request_uid"]
)
request = session.scalars(statement).one()
adaptor_properties = request.adaptor_properties
assert adaptor_properties.hash == "adaptor_properties_hash"
adaptor_properties.timestamp > initial_timestamp


def test_get_request(session_obj: sa.orm.sessionmaker) -> None:
Expand Down
2 changes: 0 additions & 2 deletions tests/test_20_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import random
import uuid
from typing import Any

Expand Down Expand Up @@ -32,7 +31,6 @@ def mock_system_request(
adaptor_properties_hash: str = "adaptor_properties_hash",
) -> db.SystemRequest:
system_request = db.SystemRequest(
request_id=random.randrange(1, 100),
request_uid=request_uid or str(uuid.uuid4()),
status=status,
created_at=created_at,
Expand Down
Loading
Loading