Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/new-routing'
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Wiggins authored and Roy Wiggins committed Aug 21, 2024
2 parents 36c828f + 44cd60f commit 79b38c8
Show file tree
Hide file tree
Showing 32 changed files with 1,655 additions and 299 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.1-beta.11
0.3.1-beta.12
7 changes: 6 additions & 1 deletion alembic/versions/1c51b42f13d5_self_tests_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@


def upgrade():
op.execute("ALTER TABLE tests ADD COLUMN IF NOT EXISTS rule_type character varying NULL")
connection = op.get_bind()
dialect = connection.dialect
if dialect.name == "sqlite":
op.execute("ALTER TABLE tests ADD COLUMN rule_type character varying NULL")
else:
op.execute("ALTER TABLE tests ADD COLUMN IF NOT EXISTS rule_type character varying NULL")


def downgrade():
Expand Down
10 changes: 8 additions & 2 deletions alembic/versions/31a5db4f993e_subtasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@


def upgrade():
op.execute("ALTER TABLE tasks ADD COLUMN IF NOT EXISTS parent_id character varying NULL")
op.execute("ALTER TABLE task_events ADD COLUMN IF NOT EXISTS client_timestamp float NULL")
connection = op.get_bind()
dialect = connection.dialect
if dialect.name == "sqlite":
op.execute("ALTER TABLE tasks ADD COLUMN parent_id character varying NULL")
op.execute("ALTER TABLE task_events ADD COLUMN client_timestamp float NULL")
else:
op.execute("ALTER TABLE tasks ADD COLUMN IF NOT EXISTS parent_id character varying NULL")
op.execute("ALTER TABLE task_events ADD COLUMN IF NOT EXISTS client_timestamp float NULL")


def downgrade():
Expand Down
10 changes: 9 additions & 1 deletion alembic/versions/6041e3878f32_self_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@


def upgrade():
connection = op.get_bind()
dialect = connection.dialect
print(dialect.name)
if dialect.name == "sqlite":
jsonb = sa.Text() # type: ignore
else:
jsonb = postgresql.JSONB(astext_type=sa.Text()) # type: ignore

op.create_table(
"tests",
sa.Column("id", sa.String()),
Expand All @@ -26,7 +34,7 @@ def upgrade():
sa.Column("time_begin", sa.DateTime(), nullable=True),
sa.Column("time_end", sa.DateTime(), nullable=True),
sa.Column("status", sa.String(), nullable=True),
sa.Column("data", JSONB(), nullable=True),
sa.Column("data", jsonb, nullable=True),
sa.PrimaryKeyConstraint("id"),
)

Expand Down
11 changes: 9 additions & 2 deletions alembic/versions/9c38f4f15a29_processor_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@


def upgrade():
connection = op.get_bind()
dialect = connection.dialect
if dialect.name == "sqlite":
jsonb = sa.Text # type: ignore
else:
jsonb = postgresql.JSONB # type: ignore

op.create_table(
"processor_outputs",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
Expand All @@ -28,8 +35,8 @@ def upgrade():
sa.Column("task_mrn", sa.String),
sa.Column("module", sa.String),
sa.Column("index", sa.Integer),
sa.Column("settings", postgresql.JSONB),
sa.Column("output", postgresql.JSONB),
sa.Column("settings", jsonb),
sa.Column("output", jsonb),

)

Expand Down
7 changes: 6 additions & 1 deletion alembic/versions/af102cd510bd_remove_task_fk.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@


def upgrade():
op.drop_constraint("task_events_task_fk", "task_events", type_="foreignkey")
connection = op.get_bind()
dialect = connection.dialect
if dialect.name == "sqlite":
pass
else:
op.drop_constraint("task_events_task_fk", "task_events", type_="foreignkey")


def downgrade():
Expand Down
23 changes: 17 additions & 6 deletions alembic/versions/ee4575e2cf40_tasks_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,36 @@ def create_table(table_name, *params) -> None:

def upgrade():
# ### commands auto generated by Alembic - please adjust! ###

connection = op.get_bind()
dialect = connection.dialect
if dialect.name == "sqlite":
jsonb = sa.Text() # type: ignore
else:
jsonb = postgresql.JSONB(astext_type=sa.Text()) # type: ignore
create_table(
"tasks",
sa.Column("id", sa.String(), nullable=False),
sa.Column("time", sa.DateTime(), nullable=True),
sa.Column("series_uid", sa.String(), nullable=True),
sa.Column("study_uid", sa.String(), nullable=True),
sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("data", jsonb, nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.execute("ALTER TABLE dicom_series ADD COLUMN IF NOT EXISTS study_uid character varying NULL")
if dialect.name == "sqlite":
op.execute("ALTER TABLE dicom_series ADD COLUMN study_uid character varying NULL")
else:
op.execute("ALTER TABLE dicom_series ADD COLUMN IF NOT EXISTS study_uid character varying NULL")
# op.add_column("dicom_series", sa.Column("study_uid", sa.String(), nullable=True))

op.rename_table("series_events", "task_events")
op.add_column("task_events", sa.Column("task_id", sa.String(), nullable=True))
# op.drop_column('task_events', 'series_uid')
op.execute("ALTER SEQUENCE series_events_id_seq RENAME TO task_events_id_seq")
op.execute("ALTER INDEX series_events_pkey RENAME TO task_events_pkey")
op.create_foreign_key("task_events_task_fk", "task_events", "tasks", ["task_id"], ["id"])
if dialect.name == "sqlite":
pass
else:
op.execute("ALTER SEQUENCE series_events_id_seq RENAME TO task_events_id_seq")
op.execute("ALTER INDEX series_events_pkey RENAME TO task_events_pkey")
op.create_foreign_key("task_events_task_fk", "task_events", "tasks", ["task_id"], ["id"])
# ### end Alembic commands ###


Expand Down
Binary file modified bin/ubuntu22.04/getdcmtags
Binary file not shown.
35 changes: 16 additions & 19 deletions bookkeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

# Standard python includes
import contextlib
import os
from pathlib import Path
import subprocess
Expand Down Expand Up @@ -402,37 +403,33 @@ async def store_processor_output(request) -> JSONResponse:
###################################################################################


app = Starlette(debug=bk_config.DEBUG_MODE, routes=router)
app.add_middleware(
AuthenticationMiddleware,
backend=TokenAuth(),
on_error=lambda _, exc: PlainTextResponse(str(exc), status_code=401),
)
app.mount("/query", query.query_app)


@app.on_event("startup")
async def startup() -> None:
"""Connects to database on startup. If the database does not exist, it will
be created."""
@contextlib.asynccontextmanager
async def lifespan(app):
await database.connect()
create_database()
bk_config.set_api_key()


@app.on_event("shutdown")
async def shutdown() -> None:
"""Disconnect from database on shutdown."""
yield
await database.disconnect()


@app.exception_handler(500)
async def server_error(request, exc) -> Response:
"""
Return an HTTP 500 page.
"""
return JSONResponse({"error": "Internal server error"}, status_code=500)

exception_handlers = {
500: server_error
}


app = Starlette(debug=bk_config.DEBUG_MODE, routes=router, lifespan=lifespan, exception_handlers=exception_handlers)
app.add_middleware(
AuthenticationMiddleware,
backend=TokenAuth(),
on_error=lambda _, exc: PlainTextResponse(str(exc), status_code=401),
)
app.mount("/query", query.query_app)

def main(args=sys.argv[1:]) -> None:
if "--reload" in args or os.getenv("MERCURE_ENV", "PROD").lower() == "dev":
Expand Down
5 changes: 3 additions & 2 deletions bookkeeping/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"""

# Standard python includes
from sqlalchemy.dialects.postgresql import JSONB
import sqlalchemy
from sqlalchemy.sql import func
import databases
Expand All @@ -25,7 +24,9 @@
database = databases.Database(bk_config.DATABASE_URL)
metadata = sqlalchemy.MetaData(schema=bk_config.DATABASE_SCHEMA)


# SQLite does not support JSONB natively, so we use TEXT instead
JSONB = sqlalchemy.types.Text() if 'sqlite://' in bk_config.DATABASE_URL else sqlalchemy.dialects.postgresql.JSONB
#
mercure_events = sqlalchemy.Table(
"mercure_events",
metadata,
Expand Down
6 changes: 5 additions & 1 deletion common/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,9 @@ def __del__(self) -> None:

def free(self) -> None:
if self.lockCreated:
self.lockfile.unlink()
try:
self.lockfile.unlink()
except FileNotFoundError:
# Lock file was already removed by someone else
pass
self.lockCreated = False
8 changes: 5 additions & 3 deletions common/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,21 @@ async def do_post(endpoint, kwargs, catch_errors=False) -> None:
return
logger.debug(f"Posting to {endpoint}: {kwargs}")
try:
async with aiohttp.ClientSession(headers={"Authorization": f"Token {api_key}"}) as session:
async with aiohttp.ClientSession(headers={"Authorization": f"Token {api_key}"},
timeout=aiohttp.ClientTimeout(total=None, connect=120, sock_connect=120, sock_read=120)
) as session:
async with session.post(bookkeeper_address + "/" + endpoint, **kwargs) as resp:
logger.debug(f"Response from {endpoint}: {resp.status}")
if resp.status != 200:
logger.warning(
f"Failed POST request {kwargs} to bookkeeper endpoint {endpoint}: status: {resp.status}"
)
except aiohttp.client.ClientError as e:
logger.error(f"Failed POST request to bookkeeper endpoint {endpoint}: {e}")
logger.error(f"Failed POST request to {bookkeeper_address}/{endpoint}: {e}")
if not catch_errors:
raise
except asyncio.TimeoutError as e:
logger.error(f"Failed POST request to bookkeeper endpoint {endpoint} with timeout: {e}")
logger.error(f"Failed POST request to {bookkeeper_address}/{endpoint} with timeout: {e}")
if not catch_errors:
raise

Expand Down
1 change: 0 additions & 1 deletion common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ class Config(BaseModel, Compat):
server_time: str = "UTC"
local_time: str = "UTC"


class TaskInfo(BaseModel, Compat):
action: Literal["route", "both", "process", "discard", "notification"]
uid: str
Expand Down
Loading

0 comments on commit 79b38c8

Please sign in to comment.