Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Job Store > Job Manager #335

Merged
merged 4 commits into from
Oct 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions docs/glossary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ Let's go through a few words and their meaning.
App
Application
This is meant to be the main entry point of Procrastinate. The app knows
all the tasks of your project, and thanks to the job store, it knows how
all the tasks of your project, and thanks to the job manager, it knows how
to launch jobs to execute your tasks (see `App`).

Job Store
The job store responsibility is to store and retrieve jobs. In Procrastinate, the
job store will store your jobs in your PostgreSQL database.
Job Manager
The job manager responsibility is to operate on jobs in the database. This
includes both read and write operations.

Schema
The schema designates all the tables, relations, indexes, procedures, etc. in the
database. Applying the schema means installing all those objects in the database.
An evolution in the schema (modifying the table structure, or the procedures) is
called a migration.

This term is not to be confused with that of PostgreSQL. In PostgreSQL a database
contains one or more *schemas*, which in turn contains tables. Schemas in PostgreSQL
are namespaces for objects of the database. See the `PostgreSQL Schema
documentation`_ for more detail.
The schema designates all the tables, relations, indexes, procedures, etc. in
the database. Applying the schema means installing all those objects in the
database. An evolution in the schema (modifying the table structure, or the
procedures) is called a migration.

This term is not to be confused with that of PostgreSQL. In PostgreSQL a
database contains one or more *schemas*, which in turn contains tables. Schemas
in PostgreSQL are namespaces for objects of the database. See the `PostgreSQL
Schema documentation`_ for more detail.

Lock
When configuring a job using `Task.configure` you can attach a lock to the job.
Expand Down
2 changes: 1 addition & 1 deletion docs/howto/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ To use it, you can do::
# See the jobs created:
print(app.connector.jobs)

# Reset the store between tests:
# Reset the "in-memory pseudo-database" between tests:
app.connector.reset()
10 changes: 5 additions & 5 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from procrastinate import admin
from procrastinate import connector as connector_module
from procrastinate import exceptions, healthchecks, jobs
from procrastinate import exceptions, healthchecks, jobs, manager
from procrastinate import retry as retry_module
from procrastinate import schema, store, utils
from procrastinate import schema, utils

if TYPE_CHECKING:
from procrastinate import tasks, worker
Expand Down Expand Up @@ -97,9 +97,9 @@ def __init__(
self.worker_defaults = worker_defaults or {}
periodic_defaults = periodic_defaults or {}

self.job_store = store.JobStore(connector=self.connector)
self.job_manager = manager.JobManager(connector=self.connector)
self.periodic_deferrer = periodic.PeriodicDeferrer(
job_store=self.job_store, **periodic_defaults
job_manager=self.job_manager, **periodic_defaults
)

self._register_builtin_tasks()
Expand Down Expand Up @@ -249,7 +249,7 @@ def configure_task(
except KeyError as exc:
if allow_unknown:
return tasks.configure_task(
name=name, job_store=self.job_store, **kwargs
name=name, job_manager=self.job_manager, **kwargs
)
raise exceptions.TaskNotFound from exc

Expand Down
2 changes: 1 addition & 1 deletion procrastinate/builtin_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def remove_old_jobs(
failed jobs will also be deleted.
"""
assert context.app
await context.app.job_store.delete_old_jobs(
await context.app.job_manager.delete_old_jobs(
nb_hours=max_hours, queue=queue, include_error=remove_error
)

Expand Down
16 changes: 8 additions & 8 deletions procrastinate/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from procrastinate import types

if TYPE_CHECKING:
from procrastinate import store # noqa
from procrastinate import manager # noqa

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,14 +105,14 @@ def call_string(self):

class JobDeferrer:
"""
The main purpose of ``JobDeferrer`` is to get a hold of the job_store and the job,
so that we can call ``defer`` without having to specify the job_store, and the job
doesn't need a job_store property.
The main purpose of ``JobDeferrer`` is to get a hold of the job_manager and the job,
so that we can call ``defer`` without having to specify the job_manager, and the job
doesn't need a job_manager property.
"""

def __init__(self, job_store: "store.JobStore", job: Job):
def __init__(self, job_manager: "manager.JobManager", job: Job):
self.job = job
self.job_store = job_store
self.job_manager = job_manager

def make_new_job(self, **task_kwargs: types.JSONValue) -> Job:
final_kwargs = self.job.task_kwargs.copy()
Expand Down Expand Up @@ -140,14 +140,14 @@ async def defer_async(self, **task_kwargs: types.JSONValue) -> int:
# Make sure this code stays synchronized with .defer()
job = self.make_new_job(**task_kwargs)
self._log_before_defer_job(job=job)
id = await self.job_store.defer_job_async(job=job)
id = await self.job_manager.defer_job_async(job=job)
self._log_after_defer_job(job=job.evolve(id=id))
return id

def defer(self, **task_kwargs: types.JSONValue) -> int:
# Make sure this code stays synchronized with .defer_async()
job = self.make_new_job(**task_kwargs)
self._log_before_defer_job(job=job)
id = self.job_store.defer_job(job=job)
id = self.job_manager.defer_job(job=job)
self._log_after_defer_job(job=job.evolve(id=id))
return id
2 changes: 1 addition & 1 deletion procrastinate/store.py → procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def get_channel_for_queues(queues: Optional[Iterable[str]] = None) -> Iterable[s
return ["procrastinate_queue#" + queue for queue in queues]


class JobStore:
class JobManager:
def __init__(self, connector: connector.BaseConnector):
self.connector = connector

Expand Down
8 changes: 4 additions & 4 deletions procrastinate/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import attr
import croniter

from procrastinate import exceptions, store, tasks
from procrastinate import exceptions, manager, tasks

# The maximum delay after which tasks will be considered as
# outdated, and ignored.
Expand All @@ -32,9 +32,9 @@ def croniter(self) -> croniter.croniter:


class PeriodicDeferrer:
def __init__(self, job_store: store.JobStore, max_delay: float = MAX_DELAY):
def __init__(self, job_manager: manager.JobManager, max_delay: float = MAX_DELAY):
self.periodic_tasks: List[PeriodicTask] = []
self.job_store = job_store
self.job_manager = job_manager
# {task_name: defer_timestamp}
self.last_defers: Dict[str, int] = {}
self.max_delay = max_delay
Expand Down Expand Up @@ -152,7 +152,7 @@ async def defer_jobs(self, jobs_to_defer: Iterable[TaskAtTime]) -> None:
"""
for task, timestamp in jobs_to_defer:
try:
job_id = await self.job_store.defer_periodic_job(
job_id = await self.job_manager.defer_periodic_job(
task=task, defer_timestamp=timestamp
)
except exceptions.AlreadyEnqueued:
Expand Down
10 changes: 5 additions & 5 deletions procrastinate/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import logging
from typing import Any, Callable, Dict, List, Optional

from procrastinate import app, exceptions, jobs
from procrastinate import app, exceptions, jobs, manager
from procrastinate import retry as retry_module
from procrastinate import store, types, utils
from procrastinate import types, utils

logger = logging.getLogger(__name__)

Expand All @@ -21,7 +21,7 @@ def load_task(path: str) -> "Task":
def configure_task(
*,
name: str,
job_store: store.JobStore,
job_manager: manager.JobManager,
lock: Optional[str] = None,
queueing_lock: Optional[str] = None,
task_kwargs: Optional[types.JSONDict] = None,
Expand All @@ -46,7 +46,7 @@ def configure_task(
task_kwargs=task_kwargs,
scheduled_at=schedule_at,
),
job_store=job_store,
job_manager=job_manager,
)


Expand Down Expand Up @@ -178,7 +178,7 @@ def configure(
"""
return configure_task(
name=self.name,
job_store=self.app.job_store,
job_manager=self.app.job_manager,
lock=lock if lock is not None else self.lock,
queueing_lock=(
queueing_lock if queueing_lock is not None else self.queueing_lock
Expand Down
3 changes: 2 additions & 1 deletion procrastinate/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __init__(self):

def reset(self):
"""
Removes anything the store contains, to ensure test independence.
Removes anything the in-memory pseudo-database contains, to ensure test
independence.
"""
self.jobs: Dict[int, JobRow] = {}
self.events: Dict[int, List[EventRow]] = {}
Expand Down
8 changes: 4 additions & 4 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(

# Handling the info about the currently running task.
self.known_missing_tasks: Set[str] = set()
self.job_store = self.app.job_store
self.job_manager = self.app.job_manager

if name:
self.logger = logger.getChild(name)
Expand Down Expand Up @@ -74,7 +74,7 @@ def context_for_worker(
def listener(self):
assert self.notify_event
return utils.task_context(
awaitable=self.job_store.listen_for_jobs(
awaitable=self.job_manager.listen_for_jobs(
event=self.notify_event, queues=self.queues
),
name="listener",
Expand Down Expand Up @@ -119,7 +119,7 @@ async def run(self) -> None:
async def single_worker(self, worker_id: int):
current_timeout = self.timeout * (worker_id + 1)
while not self.stop_requested:
job = await self.job_store.fetch_job(self.queues)
job = await self.job_manager.fetch_job(self.queues)
if job:
await self.process_job(job=job, worker_id=worker_id)
else:
Expand Down Expand Up @@ -168,7 +168,7 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None:
extra=context.log_extra(action="task_not_found", exception=str(exc)),
)
finally:
await self.job_store.finish_job(
await self.job_manager.finish_job(
job=job, status=status, scheduled_at=next_attempt_scheduled_at
)

Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ def app(not_opened_app):


@pytest.fixture
def job_store(app):
return app.job_store
def job_manager(app):
return app.job_manager


@pytest.fixture
Expand Down
20 changes: 10 additions & 10 deletions tests/integration/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest

from procrastinate import admin as admin_module
from procrastinate import jobs, store
from procrastinate import jobs, manager

pytestmark = pytest.mark.asyncio

Expand All @@ -13,20 +13,20 @@ def admin(aiopg_connector):


@pytest.fixture
def pg_job_store(aiopg_connector):
return store.JobStore(connector=aiopg_connector)
def pg_job_manager(aiopg_connector):
return manager.JobManager(connector=aiopg_connector)


@pytest.fixture
async def fixture_jobs(pg_job_store, job_factory):
async def fixture_jobs(pg_job_manager, job_factory):
j1 = job_factory(
queue="q1",
lock="lock1",
queueing_lock="queueing_lock1",
task_name="task_foo",
task_kwargs={"key": "a"},
)
j1 = attr.evolve(j1, id=await pg_job_store.defer_job_async(j1))
j1 = attr.evolve(j1, id=await pg_job_manager.defer_job_async(j1))

j2 = job_factory(
queue="q1",
Expand All @@ -35,8 +35,8 @@ async def fixture_jobs(pg_job_store, job_factory):
task_name="task_bar",
task_kwargs={"key": "b"},
)
j2 = attr.evolve(j2, id=await pg_job_store.defer_job_async(j2))
await pg_job_store.finish_job(j2, jobs.Status.FAILED)
j2 = attr.evolve(j2, id=await pg_job_manager.defer_job_async(j2))
await pg_job_manager.finish_job(j2, jobs.Status.FAILED)

j3 = job_factory(
queue="q2",
Expand All @@ -45,13 +45,13 @@ async def fixture_jobs(pg_job_store, job_factory):
task_name="task_foo",
task_kwargs={"key": "c"},
)
j3 = attr.evolve(j3, id=await pg_job_store.defer_job_async(j3))
await pg_job_store.finish_job(j3, jobs.Status.SUCCEEDED)
j3 = attr.evolve(j3, id=await pg_job_manager.defer_job_async(j3))
await pg_job_manager.finish_job(j3, jobs.Status.SUCCEEDED)

return [j1, j2, j3]


async def test_list_jobs_dict(fixture_jobs, admin, pg_job_store):
async def test_list_jobs_dict(fixture_jobs, admin, pg_job_manager):
j1, *_ = fixture_jobs
assert (await admin.list_jobs_async())[0] == {
"id": j1.id,
Expand Down
Loading