Skip to content

Commit

Permalink
Removes migration logic (#8255)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Jan 25, 2023
1 parent ee49d48 commit 04fe311
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 552 deletions.
2 changes: 1 addition & 1 deletion src/prefect/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from prefect.infrastructure import Infrastructure, Process
from prefect.logging.loggers import flow_run_logger
from prefect.orion import schemas
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.states import Scheduled
from prefect.tasks import Task
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
Expand Down
11 changes: 1 addition & 10 deletions src/prefect/orion/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS
Expand Down Expand Up @@ -85,15 +85,6 @@ async def create_deployment(
session=session,
work_pool_name=deployment.work_pool_name,
)
elif deployment.work_queue_name:
# If just a work queue name was provided, we assume this deployment is using
# an agent and create a queue in the default agents work pool. This is a
# legacy case and can be removed once agents are removed.
_, work_pool_queue = await models.work_queues._ensure_work_queue_exists(
session=session, name=deployment.work_queue_name, db=db
)
if work_pool_queue:
deployment_dict["work_pool_queue_id"] = work_pool_queue.id

deployment = schemas.core.Deployment(**deployment_dict)
# check to see if relevant blocks exist, allowing us throw a useful error message
Expand Down
36 changes: 0 additions & 36 deletions src/prefect/orion/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from prefect.orion.utilities.server import method_paths_from_routes
from prefect.settings import (
PREFECT_DEBUG_MODE,
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS,
PREFECT_MEMO_STORE_PATH,
PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION,
PREFECT_ORION_DATABASE_CONNECTION_URL,
Expand Down Expand Up @@ -366,40 +365,6 @@ async def add_block_types():
except Exception as exc:
logger.warn(f"Error occurred during block auto-registration: {exc!r}")

async def migrate_work_queues():
"""Duplicates work queues to work pool queues"""
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.models.configuration import (
read_configuration,
write_configuration,
)
from prefect.orion.models.workers_migration import migrate_all_work_queues
from prefect.orion.schemas.core import Configuration

db = provide_database_interface()
session = await db.session()

async with session.begin():
migration_status_configuration = await read_configuration(
session=session, key="WORK_POOL_QUEUE_MIGRATION", db=db
)
has_run = (
migration_status_configuration.value.get("has_run", False)
if migration_status_configuration is not None
else False
)
if not has_run:
await migrate_all_work_queues(session=session, db=db)
await write_configuration(
session=session,
configuration=Configuration(
key="WORK_POOL_QUEUE_MIGRATION",
value={"has_run": True},
),
db=db,
)

async def start_services():
"""Start additional services when the Orion API starts up."""

Expand Down Expand Up @@ -470,7 +435,6 @@ def on_service_exit(service, task):
run_migrations,
add_block_types,
start_services,
migrate_work_queues,
],
on_shutdown=[stop_services],
)
Expand Down
33 changes: 10 additions & 23 deletions src/prefect/orion/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import prefect.orion.schemas as schemas
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS
Expand Down Expand Up @@ -104,28 +103,16 @@ async def _get_work_pool_queue_id_from_name(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Work pool queue '{work_pool_name}/{work_pool_queue_name}' not found.",
)
if work_pool_name == DEFAULT_AGENT_WORK_POOL_NAME:
work_pool = await models.workers_migration.get_or_create_default_agent_work_pool(
session=session
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool.id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)
else:
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)

return work_pool_queue.id

Expand Down
1 change: 0 additions & 1 deletion src/prefect/orion/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
saved_searches,
logs,
workers,
workers_migration,
work_queues,
agents,
)
15 changes: 6 additions & 9 deletions src/prefect/orion/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,6 @@ async def update_deployment(
session=session,
work_pool_name=deployment.work_pool_name,
)
elif deployment.work_queue_name:
# If just a work queue name was provided, we assume this deployment is using
# an agent and create a queue in the default agents work pool. This is a
# legacy case and can be removed once agents are removed.
_, work_pool_queue = await models.work_queues._ensure_work_queue_exists(
session=session, name=update_data["work_queue_name"], db=db
)
if work_pool_queue:
update_data["work_pool_queue_id"] = work_pool_queue.id

update_stmt = (
sa.update(db.Deployment)
Expand All @@ -190,6 +181,12 @@ async def update_deployment(
session=session, deployment_id=deployment_id, db=db, auto_scheduled_only=True
)

# create work queue if it doesn't exist
if update_data.get("work_queue_name"):
await models.work_queues._ensure_work_queue_exists(
session=session, name=update_data["work_queue_name"], db=db
)

return result.rowcount > 0


Expand Down
13 changes: 0 additions & 13 deletions src/prefect/orion/models/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from prefect.orion.schemas.responses import OrchestrationResult, SetStateStatus
from prefect.orion.schemas.states import State
from prefect.orion.utilities.schemas import PrefectBaseModel
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand Down Expand Up @@ -65,18 +64,6 @@ async def create_flow_run(
created=now,
)

# If the flow run has a work queue name but no worker pool queue id, migrate it
# This is unusual and would only come from legacy internal systems.
if (
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value()
and flow_run_dict.get("work_queue_name")
and not flow_run_dict.get("work_pool_queue_id")
):
work_pool_queue = await models.workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_name=flow_run_dict["work_queue_name"]
)
flow_run_dict["work_pool_queue_id"] = work_pool_queue.id

# if no idempotency key was provided, create the run directly
if not flow_run.idempotency_key:
model = db.FlowRun(**flow_run_dict)
Expand Down
74 changes: 9 additions & 65 deletions src/prefect/orion/models/work_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
from prefect.orion.database.dependencies import inject_db
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.exceptions import ObjectNotFoundError
from prefect.orion.models import workers_migration
from prefect.orion.schemas.states import StateType
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand All @@ -45,11 +43,6 @@ async def create_work_queue(
session.add(model)
await session.flush()

if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value() and model.filter is None:
await workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_id=model.id
)

return model


Expand Down Expand Up @@ -153,20 +146,6 @@ async def update_work_queue(
)
result = await session.execute(update_stmt)

# update the related work pool queue
wpq_updates = {
k: v for k, v in update_data.items() if k in ("is_paused", "concurrency_limit")
}
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value() and wpq_updates:
wpq = await workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_id=work_queue_id
)
await models.workers.update_work_pool_queue(
session=session,
work_pool_queue_id=wpq.id,
work_pool_queue=schemas.actions.WorkPoolQueueUpdate(**wpq_updates),
)

return result.rowcount > 0


Expand All @@ -184,19 +163,6 @@ async def delete_work_queue(
Returns:
bool: whether or not the WorkQueue was deleted
"""
# delete the related work pool queue
try:
wpq = await workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_id=work_queue_id
)
await models.workers.delete_work_pool_queue(
session=session, work_pool_queue_id=wpq.id
)
except ObjectNotFoundError:
# The work queue doesn't exist so there isn't a corresponding work pool
# queue to delete.
pass

result = await session.execute(
delete(db.WorkQueue).where(db.WorkQueue.id == work_queue_id)
)
Expand Down Expand Up @@ -229,28 +195,14 @@ async def get_runs_in_work_queue(
raise ObjectNotFoundError(f"Work queue with id {work_queue_id} not found.")

if work_queue.filter is None:
# If workers are enabled, ensure that a corresponding work pool queue exists
# and retrieve runs from that work pool queue.
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
worker_flow_run_response = (
await workers_migration.get_runs_from_work_pool_queue(
session=session,
work_queue_id=work_queue_id,
scheduled_before=scheduled_before,
limit=limit,
)
)
return [r.flow_run for r in worker_flow_run_response]
# Otherwise get runs from the legacy work queue.
else:
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()

# if the work queue has a filter, it's a deprecated tag-based work queue
# and uses an old approach
Expand Down Expand Up @@ -357,15 +309,7 @@ async def _ensure_work_queue_exists(
work_queue=schemas.core.WorkQueue(name=name),
)

# Ensure the corresponding work pool queue exists
work_pool_queue = None
# Filter-based work queues cannot be migrated
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value() and work_queue.filter is None:
work_pool_queue = await models.workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_name=name
)

return work_queue, work_pool_queue
return work_queue


@inject_db
Expand Down
6 changes: 4 additions & 2 deletions src/prefect/orion/models/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.database.orm_models import ORMWorker, ORMWorkPool, ORMWorkPoolQueue

DEFAULT_AGENT_WORK_POOL_NAME = "default-agent-pool"

# -----------------------------------------------------
# --
# --
# -- Worker Pools
# -- Work Pools
# --
# --
# -----------------------------------------------------
Expand Down Expand Up @@ -232,7 +234,7 @@ async def get_scheduled_flow_runs(
# -----------------------------------------------------
# --
# --
# -- work pool queues
# -- Work Pool Queues
# --
# --
# -----------------------------------------------------
Expand Down
Loading

0 comments on commit 04fe311

Please sign in to comment.