Skip to content

Commit

Permalink
✨ Computational backend: connect to resource tracking via RabbitMQ (🗃️,
Browse files Browse the repository at this point in the history
⚠️) (#4570)
  • Loading branch information
sanderegg authored Aug 9, 2023
1 parent 74636e0 commit 59767b4
Show file tree
Hide file tree
Showing 46 changed files with 1,701 additions and 379 deletions.
8 changes: 3 additions & 5 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# files and folders recursively
.codeclimate.yml @sanderegg @pcrespov
.env-* @mrnicegyu11 @YuryHrytsuk
.travis.yml @sanderegg
Makefile @pcrespov @sanderegg


Expand All @@ -21,10 +20,9 @@ Makefile @pcrespov @sanderegg
/packages/service-library/ @pcrespov
/packages/settings-library/ @pcrespov @sanderegg
/requirements/ @pcrespov @matusdrobuliak66
/scripts/json-schema-to-openapi-schema @sanderegg
/services/agent/ @GitHK
/services/api-server/ @pcrespov
/services/autoscaling/ @sanderegg @pcrespov
/services/autoscaling/ @sanderegg
/services/catalog/ @pcrespov @sanderegg
/services/clusters-keeper/ @sanderegg
/services/datcore-adapter/ @sanderegg
Expand All @@ -37,10 +35,10 @@ Makefile @pcrespov @sanderegg
/services/resource-usage-tracker/ @matusdrobuliak66
/services/static-webserver/ @GitHK
/services/static-webserver/client/ @odeimaiz
/services/storage/ @sanderegg @pcrespov
/services/storage/ @sanderegg
/services/web/server/ @pcrespov @sanderegg @GitHK @matusdrobuliak66
/tests/environment-setup/ @pcrespov
/tests/performance/ @pcrespov
/tests/performance/ @pcrespov @sanderegg
/tests/public-api/ @pcrespov
requirements/* @pcrespov
tools/* @pcrespov
4 changes: 4 additions & 0 deletions packages/models-library/src/models_library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class DockerLabelKey(ConstrainedStr):
# good practice: use reverse DNS notation
regex: re.Pattern[str] | None = DOCKER_LABEL_KEY_REGEX

@classmethod
def from_key(cls, key: str) -> "DockerLabelKey":
return cls(key.lower().replace("_", "-"))


class DockerGenericTag(ConstrainedStr):
# NOTE: https://docs.docker.com/engine/reference/commandline/tag/#description
Expand Down
2 changes: 0 additions & 2 deletions packages/models-library/src/models_library/projects_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class RunningState(str, Enum):
PENDING = "PENDING"
WAITING_FOR_RESOURCES = "WAITING_FOR_RESOURCES"
STARTED = "STARTED"
RETRY = "RETRY"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
ABORTED = "ABORTED"
Expand All @@ -34,7 +33,6 @@ def is_running(self) -> bool:
RunningState.PENDING,
RunningState.WAITING_FOR_RESOURCES,
RunningState.STARTED,
RunningState.RETRY,
)


Expand Down
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ class RabbitResourceTrackingBaseMessage(RabbitMessageBase):
service_run_id: str = Field(
..., description="uniquely identitifies the service run"
)
created_at: datetime.datetime = Field(..., description="message creation datetime")
created_at: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.now(datetime.timezone.utc),
description="message creation datetime",
)

def routing_key(self) -> str | None:
return None
Expand Down Expand Up @@ -219,3 +222,10 @@ class RabbitResourceTrackingStoppedMessage(RabbitResourceTrackingBaseMessage):
...,
description=f"{SimcorePlatformStatus.BAD} if simcore failed to run the service properly",
)


RabbitResourceTrackingMessages = (
RabbitResourceTrackingStartedMessage
| RabbitResourceTrackingStoppedMessage
| RabbitResourceTrackingHeartbeatMessage
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""add heartbeat timestamps
Revision ID: 6da4357ce10f
Revises: 9b33ef4c690a
Create Date: 2023-08-07 06:31:14.681513+00:00
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "6da4357ce10f"
down_revision = "9b33ef4c690a"
branch_labels = None
depends_on = None


modified_timestamp_trigger = sa.DDL(
"""
DROP TRIGGER IF EXISTS trigger_auto_update on comp_tasks;
CREATE TRIGGER trigger_auto_update
BEFORE INSERT OR UPDATE ON comp_tasks
FOR EACH ROW EXECUTE PROCEDURE comp_tasks_auto_update_modified();
"""
)

update_modified_timestamp_procedure = sa.DDL(
"""
CREATE OR REPLACE FUNCTION comp_tasks_auto_update_modified()
RETURNS TRIGGER AS $$
BEGIN
NEW.modified := current_timestamp;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"comp_tasks",
sa.Column("last_heartbeat", sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
"comp_tasks",
sa.Column(
"created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
)
op.add_column(
"comp_tasks",
sa.Column(
"modified",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
)
# ### end Alembic commands ###
op.execute(update_modified_timestamp_procedure)
op.execute(modified_timestamp_trigger)


def downgrade():
op.execute(sa.DDL("DROP TRIGGER IF EXISTS trigger_auto_update on comp_tasks;"))
op.execute(sa.DDL("DROP FUNCTION comp_tasks_auto_update_modified();"))
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("comp_tasks", "modified")
op.drop_column("comp_tasks", "created")
op.drop_column("comp_tasks", "last_heartbeat")
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ def upgrade():

conn = op.get_bind()
result = conn.execute(
f"SELECT * FROM pg_enum WHERE enumtypid = (SELECT oid FROM pg_type WHERE typname = '{enum_type_name}') AND enumlabel = '{new_value}'"
sa.DDL(
f"SELECT * FROM pg_enum WHERE enumtypid = (SELECT oid FROM pg_type WHERE typname = '{enum_type_name}') AND enumlabel = '{new_value}'"
)
)
value_exists = result.fetchone() is not None

if not value_exists:
# Step 1: Use ALTER TYPE to add the new value to the existing enum
op.execute(f"ALTER TYPE {enum_type_name} ADD VALUE '{new_value}'")
op.execute(sa.DDL(f"ALTER TYPE {enum_type_name} ADD VALUE '{new_value}'"))


def downgrade():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from ._common import (
column_created_datetime,
column_modified_datetime,
register_modified_datetime_auto_update_trigger,
)
from .base import metadata
from .comp_pipeline import StateType

Expand Down Expand Up @@ -75,9 +80,17 @@ class NodeClass(enum.Enum):
sa.Column("submit", sa.DateTime, doc="UTC timestamp for task submission"),
sa.Column("start", sa.DateTime, doc="UTC timestamp when task started"),
sa.Column("end", sa.DateTime, doc="UTC timestamp for task completion"),
sa.Column(
"last_heartbeat",
sa.DateTime(timezone=True),
doc="UTC timestamp for last task running check",
),
column_created_datetime(timezone=True),
column_modified_datetime(timezone=True),
sa.UniqueConstraint("project_id", "node_id", name="project_node_uniqueness"),
)

register_modified_datetime_auto_update_trigger(comp_tasks)

DB_PROCEDURE_NAME: str = "notify_comp_tasks_changed"
DB_TRIGGER_NAME: str = f"{DB_PROCEDURE_NAME}_event"
Expand Down
1 change: 1 addition & 0 deletions packages/postgres-database/tests/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ services:
volumes:
postgres_data:
name: ${POSTGRES_DATA_VOLUME}
external: true
6 changes: 3 additions & 3 deletions packages/postgres-database/tests/test_comp_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def test_listen_query(
db_connection, task, outputs=updated_output, state=StateType.ABORTED
)
tasks = await _assert_notification_queue_status(db_notification_queue, 1)
assert tasks[0]["changes"] == ["outputs", "state"]
assert tasks[0]["changes"] == ["modified", "outputs", "state"]
assert (
tasks[0]["data"]["outputs"] == updated_output
), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"
Expand All @@ -116,7 +116,7 @@ async def test_listen_query(
await _update_comp_task_with(db_connection, task, outputs=updated_output)
await _update_comp_task_with(db_connection, task, outputs=updated_output)
tasks = await _assert_notification_queue_status(db_notification_queue, 1)
assert tasks[0]["changes"] == ["outputs"]
assert tasks[0]["changes"] == ["modified", "outputs"]
assert (
tasks[0]["data"]["outputs"] == updated_output
), f"the data received from the database is {tasks[0]}, expected new output is {updated_output}"
Expand All @@ -132,7 +132,7 @@ async def test_listen_query(
tasks = await _assert_notification_queue_status(db_notification_queue, NUM_CALLS)

for n, output in enumerate(update_outputs):
assert tasks[n]["changes"] == ["outputs"]
assert tasks[n]["changes"] == ["modified", "outputs"]
assert (
tasks[n]["data"]["outputs"] == output
), f"the data received from the database is {tasks[n]}, expected new output is {output}"
2 changes: 2 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ async def subscribe(
Raises:
aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with different type is used
Returns:
queue name
"""

assert self._channel_pool # nosec
Expand Down
8 changes: 6 additions & 2 deletions packages/service-library/tests/rabbitmq/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ async def _assert_message_received(
reraise=True,
):
with attempt:
# NOTE: this sleep is here to ensure that there are not multiple messages coming in
await asyncio.sleep(1)
print(
f"--> waiting for rabbitmq message [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]"
)
assert mocked_message_parser.call_count == expected_call_count
if expected_call_count == 1:
assert expected_message
Expand All @@ -112,6 +113,9 @@ async def _assert_message_received(
else:
assert expected_message
mocked_message_parser.assert_any_call(expected_message.message.encode())
print(
f"<-- rabbitmq message received after [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]"
)


async def test_rabbit_client_pub_sub_message_is_lost_if_no_consumer_present(
Expand Down
Loading

0 comments on commit 59767b4

Please sign in to comment.