Commit 0d028ca

fix tests: accidentally was reading from /opt/mercure, fixing required delaying resolving database connection information until runtime

add basic booting test for bookkeeper
Roy Wiggins authored and Roy Wiggins committed Oct 22, 2024
1 parent 8960f75 commit 0d028ca
Showing 17 changed files with 483 additions and 375 deletions.
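In short: this commit moves configuration reading and database wiring out of import time and into functions that run at startup, so tests no longer pick up /opt/mercure by accident. Condensed from the new main() in bookkeeper.py shown below (timezone setup omitted), the boot order now looks roughly like this:

    import uvicorn
    import bookkeeping.config as bk_config
    import bookkeeping.database as db
    from bookkeeper import create_app
    from common import config

    bk_config.read_bookkeeper_config()  # resolve bookkeeper.env now, not at import
    db.init_database()                  # bind the database connection at runtime
    config.read_config()
    app = create_app()                  # builds and mounts the Starlette app
    uvicorn.run(app, host=bk_config.BOOKKEEPER_HOST, port=bk_config.BOOKKEEPER_PORT)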
24 changes: 17 additions & 7 deletions alembic/env.py
@@ -10,18 +10,16 @@
 # this is the Alembic Config object, which provides
 # access to the values within the .ini file in use.
 config = context.config
-config.set_main_option('sqlalchemy.url', os.getenv("DATABASE_URL", "not set"))
+_os_env_database_url = os.getenv("DATABASE_URL")
+if _os_env_database_url is not None:
+    config.set_main_option('sqlalchemy.url', _os_env_database_url)
 
 # Interpret the config file for Python logging.
 # This line sets up loggers basically.
-fileConfig(config.config_file_name) # type: ignore
+# fileConfig(config.config_file_name) # type: ignore
 
 # add your model's MetaData object here
 # for 'autogenerate' support
-import bookkeeper
 # from myapp import mymodel
 # target_metadata = mymodel.Base.metadata
-target_metadata = bookkeeper.metadata
 
 # other values from the config, defined by the needs of env.py,
 # can be acquired:
@@ -41,7 +39,15 @@ def run_migrations_offline() -> None:
     script output.
     """
-    url = config.get_main_option("sqlalchemy.url")
+    import bookkeeping.database as db
+    # url = config.get_main_option("sqlalchemy.url")
+    url = os.environ.get('DATABASE_URL')
+    schema = os.environ.get('DATABASE_SCHEMA')
+    if schema == 'None':
+        schema = None
+    db.init_database(url, schema)
+    target_metadata = db.metadata
 
     context.configure(
         url=url,
         target_metadata=target_metadata,
Expand All @@ -60,6 +66,10 @@ def run_migrations_online() -> None:
and associate a connection with the context.
"""
import bookkeeping.database as db
db.init_database(os.environ.get('DATABASE_URL'), os.environ.get('DATABASE_SCHEMA'))
target_metadata = db.metadata

connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
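With this change, run_migrations_offline() takes its connection information from the DATABASE_URL and DATABASE_SCHEMA environment variables rather than from a value baked into the alembic config at import time. A hypothetical way to drive an offline run under the new scheme (the URL and schema values below are made-up examples, not from the commit):

    import os
    import subprocess

    env = {
        **os.environ,
        "DATABASE_URL": "postgresql://mercure@localhost/mercure",  # assumed example URL
        "DATABASE_SCHEMA": "None",  # env.py maps the literal string 'None' to None
    }
    # "--sql" selects alembic's offline mode, which exercises run_migrations_offline()
    subprocess.run(["alembic", "upgrade", "head", "--sql"], check=True, env=env)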
103 changes: 54 additions & 49 deletions bookkeeper.py
@@ -2,7 +2,7 @@
 bookkeeper.py
 =============
 The bookkeeper service of mercure, which receives notifications from all mercure services
-and stores the information in a Postgres database.
+and stores the information in a database.
 """
 
 # Standard python includes
@@ -32,7 +32,7 @@
 from common import config
 import common.monitor as monitor
 from common.constants import mercure_defs
-from bookkeeping.database import *
+import bookkeeping.database as db
 import bookkeeping.query as query
 import bookkeeping.config as bk_config
 from decoRouter import Router as decoRouter
@@ -63,18 +63,14 @@ async def verify(self, token: str):
 ###################################################################################
 
 
-def create_database() -> None:
-    """Creates all tables in the database if they do not exist."""
-    subprocess.run(
-        ["alembic", "upgrade", "head"],
-        check=True,
-        env={
-            **os.environ,
-            "PATH": "/opt/mercure/env/bin:" + os.environ["PATH"],
-            "DATABASE_URL": bk_config.DATABASE_URL,
-        },
-    )
+from alembic.config import Config
+from alembic import command
+
+def create_database() -> None:
+    alembic_cfg = Config()
+    alembic_cfg.set_main_option('script_location', os.path.dirname(os.path.realpath(__file__))+'/alembic')
+    alembic_cfg.set_main_option('sqlalchemy.url', bk_config.DATABASE_URL)
+    command.upgrade(alembic_cfg, 'head')
 
 
 ###################################################################################
@@ -83,7 +79,7 @@ def create_database() -> None:
 
 # async def execute_db_operation(operation) -> None:
 #     global connection
-#     """Executes a previously prepared database operation."""
+#     """Executes a previously prepared db.database operation."""
 #     try:
 #         connection.execute(operation)
 #     except:
@@ -106,10 +102,10 @@ async def post_mercure_event(request) -> JSONResponse:
     severity = int(payload.get("severity", monitor.severity.INFO))
     description = payload.get("description", "")
 
-    query = mercure_events.insert().values(
+    query = db.mercure_events.insert().values(
         sender=sender, event=event, severity=severity, description=description, time=datetime.datetime.now()
     )
-    result = await database.execute(query)
+    result = await db.database.execute(query)
     logger.debug(result)
     return JSONResponse({"ok": ""})
 
@@ -138,16 +134,16 @@ async def processor_logs(request) -> JSONResponse:
     if (logs_folder_str := config.mercure.processing_logs.logs_file_store) and (
         logs_path := Path(logs_folder_str)
     ).exists():
-        query = processor_logs_table.insert().values(task_id=task_id, module_name=module_name, time=time, logs=None)
-        result = await database.execute(query)
+        query = db.processor_logs_table.insert().values(task_id=task_id, module_name=module_name, time=time, logs=None)
+        result = await db.database.execute(query)
 
         logs_path = logs_path / task_id
         logs_path.mkdir(exist_ok=True)
         logs_file = logs_path / f"{module_name}.{str(result)}.txt"
         logs_file.write_text(logs, encoding="utf-8")
     else:
-        query = processor_logs_table.insert().values(task_id=task_id, module_name=module_name, time=time, logs=logs)
-        result = await database.execute(query)
+        query = db.processor_logs_table.insert().values(task_id=task_id, module_name=module_name, time=time, logs=logs)
+        result = await db.database.execute(query)
 
     logger.debug(result)
     return JSONResponse({"ok": ""})
@@ -163,10 +159,10 @@ async def post_webgui_event(request) -> JSONResponse:
     user = payload.get("user", "UNKNOWN")
     description = payload.get("description", "")
 
-    query = webgui_events.insert().values(
+    query = db.webgui_events.insert().values(
         sender=sender, event=event, user=user, description=description, time=datetime.datetime.now()
     )
-    await database.execute(query)
+    await db.database.execute(query)
     # tasks = BackgroundTasks()
     # tasks.add_task(execute_db_operation, operation=query)
     return JSONResponse({"ok": ""})
@@ -181,10 +177,10 @@ async def register_dicom(request) -> JSONResponse:
     file_uid = payload.get("file_uid", "")
     series_uid = payload.get("series_uid", "")
 
-    query = dicom_files.insert().values(
+    query = db.dicom_files.insert().values(
         filename=filename, file_uid=file_uid, series_uid=series_uid, time=datetime.datetime.now()
     )
-    result = await database.execute(query)
+    result = await db.database.execute(query)
     logger.debug(f"Result: {result}")
 
     # tasks = BackgroundTasks()
@@ -194,7 +190,7 @@ async def register_dicom(request) -> JSONResponse:
 
 async def parse_and_submit_tags(payload) -> None:
     """Helper function that reads series information from the request body."""
-    query = dicom_series.insert().values(
+    query = db.dicom_series.insert().values(
         time=datetime.datetime.now(),
         series_uid=payload.get("SeriesInstanceUID", ""),
         study_uid=payload.get("StudyInstanceUID", ""),
@@ -227,7 +223,7 @@ async def parse_and_submit_tags(payload) -> None:
         tag_softwareversions=payload.get("SoftwareVersions", ""),
         tag_stationname=payload.get("StationName", ""),
     )
-    await database.execute(query)
+    await db.database.execute(query)
 
 
 @router.post("/register-series")
@@ -251,7 +247,7 @@ async def register_task(request) -> JSONResponse:
     # Registering the task ordinarily happens first, but if "update-task"
     # came in first, we need to update the task instead. So we do an upsert.
     query = (
-        insert(tasks_table)
+        insert(db.tasks_table)
         .values(
             id=payload["id"],
             series_uid=payload["series_uid"],
@@ -267,7 +263,7 @@ async def register_task(request) -> JSONResponse:
         )
     )
 
-    await database.execute(query)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -295,14 +291,14 @@ async def update_task(request) -> JSONResponse:
     # Ordinarily, update-task is called on an existing task. But if the task is
     # not yet registered, we need to create it. So we use an upsert here.
     query = (
-        insert(tasks_table)
+        insert(db.tasks_table)
         .values(**update_values)
         .on_conflict_do_update( # update if exists
             index_elements=["id"],
             set_=update_values,
         )
     )
-    await database.execute(query)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -314,7 +310,7 @@ async def test_begin(request) -> JSONResponse:
     type = payload.get("type", "route")
     rule_type = payload.get("rule_type", "series")
     task_id = payload.get("task_id", None)
-    query_a = insert(tests_table).values(
+    query_a = insert(db.tests_table).values(
         id=id, time_begin=datetime.datetime.now(), type=type, status="begin", task_id=task_id, rule_type=rule_type
     )
 
@@ -324,7 +320,7 @@ async def test_begin(request) -> JSONResponse:
             "task_id": task_id or query_a.excluded.task_id,
         },
     )
-    await database.execute(query)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -335,8 +331,8 @@ async def test_end(request) -> JSONResponse:
     id = payload["id"]
     status = payload.get("status", "")
 
-    query = tests_table.update(tests_table.c.id == id).values(time_end=datetime.datetime.now(), status=status)
-    await database.execute(query)
+    query = db.tests_table.update(db.tests_table.c.id == id).values(time_end=datetime.datetime.now(), status=status)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -373,7 +369,7 @@ async def post_task_event(request) -> JSONResponse:
     info = payload.get("info", "")
     task_id = payload.get("task_id")
 
-    query = task_events.insert().values(
+    query = db.task_events.insert().values(
         sender=sender,
         event=event,
         task_id=task_id,
@@ -384,7 +380,7 @@ async def post_task_event(request) -> JSONResponse:
         time=event_time,
         client_timestamp=client_timestamp,
     )
-    await database.execute(query)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -393,8 +389,8 @@ async def post_task_event(request) -> JSONResponse:
 async def store_processor_output(request) -> JSONResponse:
     payload = dict(await request.json())
     values_dict = {k:payload[k] for k in ("task_id", "task_acc", "task_mrn", "module", "index", "settings", "output")}
-    query = processor_outputs_table.insert().values(**values_dict)
-    await database.execute(query)
+    query = db.processor_outputs_table.insert().values(**values_dict)
+    await db.database.execute(query)
     return JSONResponse({"ok": ""})
 
 
@@ -405,11 +401,12 @@ async def store_processor_output(request) -> JSONResponse:
 
 @contextlib.asynccontextmanager
 async def lifespan(app):
-    await database.connect()
+    await db.database.connect()
+    assert db.metadata
     create_database()
     bk_config.set_api_key()
     yield
-    await database.disconnect()
+    await db.database.disconnect()
 
 
 async def server_error(request, exc) -> Response:
Expand All @@ -422,14 +419,19 @@ async def server_error(request, exc) -> Response:
500: server_error
}

app = None

app = Starlette(debug=bk_config.DEBUG_MODE, routes=router, lifespan=lifespan, exception_handlers=exception_handlers)
app.add_middleware(
AuthenticationMiddleware,
backend=TokenAuth(),
on_error=lambda _, exc: PlainTextResponse(str(exc), status_code=401),
)
app.mount("/query", query.query_app)
def create_app() -> Starlette:
global app
bk_config.read_bookkeeper_config()
app = Starlette(debug=bk_config.DEBUG_MODE, routes=router, lifespan=lifespan, exception_handlers=exception_handlers)
app.add_middleware(
AuthenticationMiddleware,
backend=TokenAuth(),
on_error=lambda _, exc: PlainTextResponse(str(exc), status_code=401),
)
app.mount("/query", query.query_app)
return app

def main(args=sys.argv[1:]) -> None:
if "--reload" in args or os.getenv("MERCURE_ENV", "PROD").lower() == "dev":
@@ -445,15 +447,18 @@ def main(args=sys.argv[1:]) -> None:
     logger.info("")
 
     try:
+        bk_config.read_bookkeeper_config()
+        db.init_database()
         config.read_config()
         query.set_timezone_conversion()
+        app = create_app()
+        uvicorn.run(app, host=bk_config.BOOKKEEPER_HOST, port=bk_config.BOOKKEEPER_PORT)
 
     except Exception as e:
         logger.error(f"Could not read configuration file: {e}")
         logger.info("Going down.")
         sys.exit(1)
 
-    uvicorn.run(app, host=bk_config.BOOKKEEPER_HOST, port=bk_config.BOOKKEEPER_PORT)
-
 
 if __name__ == "__main__":
     main()
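The "basic booting test" named in the commit message is not among the hunks shown on this page, but with create_app() available it could look roughly like the following sketch (the config folder, the sqlite URL, and the test body are assumptions, not the commit's actual test):

    import os

    # Point the service at disposable locations *before* importing the modules,
    # since bookkeeping.config computes config_filename at import time.
    os.environ["MERCURE_CONFIG_FOLDER"] = "/tmp/mercure-test-config"  # assumed
    os.environ["DATABASE_URL"] = "sqlite:///./test-bookkeeper.db"     # assumed

    from starlette.testclient import TestClient

    import bookkeeper
    import bookkeeping.config as bk_config
    import bookkeeping.database as db

    def test_bookkeeper_boots():
        bk_config.read_bookkeeper_config()  # mirror the order used in main()
        db.init_database()
        app = bookkeeper.create_app()
        # Entering the client context runs the lifespan: the database connect,
        # the alembic upgrade, and API-key setup all have to succeed.
        with TestClient(app):
            pass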
33 changes: 21 additions & 12 deletions bookkeeping/config.py
@@ -6,7 +6,7 @@
 
 # Standard python includes
 import os
-from typing import Any
+from typing import Any, Optional
 import daiquiri
 
 # Starlette-related includes
@@ -15,17 +15,26 @@
 
 # Create local logger instance
 logger = daiquiri.getLogger("config")
 
 
-bookkeeper_config = Config((os.getenv("MERCURE_CONFIG_FOLDER") or "/opt/mercure/config") + "/bookkeeper.env")
-
-BOOKKEEPER_PORT = bookkeeper_config("PORT", cast=int, default=8080)
-BOOKKEEPER_HOST = bookkeeper_config("HOST", default="0.0.0.0")
-DATABASE_URL = bookkeeper_config("DATABASE_URL", default="postgresql://mercure@localhost")
-DATABASE_SCHEMA: Any = bookkeeper_config("DATABASE_SCHEMA", default=None)
-DEBUG_MODE = bookkeeper_config("DEBUG", cast=bool, default=False)
-API_KEY = None
+bookkeeper_config: Config
+config_filename:str = (os.getenv("MERCURE_CONFIG_FOLDER") or "/opt/mercure/config") + "/bookkeeper.env"
+DATABASE_URL: str
+BOOKKEEPER_PORT: int
+BOOKKEEPER_HOST: str
+DATABASE_SCHEMA: Optional[str]
+API_KEY: Optional[str]
+DEBUG_MODE: bool
+
+def read_bookkeeper_config() -> Config:
+    global bookkeeper_config, BOOKKEEPER_PORT, BOOKKEEPER_HOST, DATABASE_URL, DATABASE_SCHEMA, DEBUG_MODE, API_KEY
+    bookkeeper_config = Config(config_filename)
+
+    BOOKKEEPER_PORT = bookkeeper_config("PORT", cast=int, default=8080)
+    BOOKKEEPER_HOST = bookkeeper_config("HOST", default="0.0.0.0")
+    DATABASE_URL = bookkeeper_config("DATABASE_URL", default="postgresql://mercure@localhost")
+    DATABASE_SCHEMA = bookkeeper_config("DATABASE_SCHEMA", default=None)
+    DEBUG_MODE = bookkeeper_config("DEBUG", cast=bool, default=False)
+    API_KEY = None
+    return bookkeeper_config
 
 def set_api_key() -> None:
     global API_KEY
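One practical consequence of the bookkeeping/config.py change: importing the module no longer reads bookkeeper.env, but config_filename is still computed at import time, so MERCURE_CONFIG_FOLDER must be set before the first import. A small sketch of that ordering (the folder path is a made-up example):

    import os
    os.environ["MERCURE_CONFIG_FOLDER"] = "/tmp/mercure-test-config"  # before the import

    import bookkeeping.config as bk_config  # importing reads nothing yet

    bk_config.read_bookkeeper_config()  # values are resolved here, at runtime
    print(bk_config.DATABASE_URL, bk_config.BOOKKEEPER_PORT)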