Skip to content

Commit

Permalink
Add tests for the json dumps and loads functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Éric Lemoine committed Jan 24, 2020
1 parent 00646ec commit 6d07b0e
Showing 1 changed file with 97 additions and 1 deletion.
98 changes: 97 additions & 1 deletion tests/integration/test_pg_store.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import decimal
import functools
import json
import random
import string

import pendulum
import pytest

from procrastinate import aiopg_connector, jobs

pytestmark = pytest.mark.asyncio
Expand Down Expand Up @@ -50,6 +52,64 @@ async def test_fetch_job(pg_job_store, job):
assert await pg_job_store.fetch_job(queues=["queue_a"], loads=None) == job


@pytest.mark.parametrize(
    "job",
    [
        jobs.Job(
            id=2,
            queue="queue_a",
            task_name="task_2",
            lock="lock_2",
            task_kwargs={"c": "d", "dt": datetime.datetime(2020, 1, 24, 14, 39, 12)},
        ),
        jobs.Job(
            id=2,
            queue="queue_a",
            task_name="task_3",
            lock="lock_3",
            task_kwargs={"i": "j", "dt": datetime.datetime(2020, 1, 24, 14, 39, 13)},
            scheduled_at=pendulum.datetime(2000, 1, 1),
        ),
    ],
)
async def test_fetch_job_with_loads_function(pg_job_store, job):
    """A job deferred with a custom "dumps" round-trips through a matching "loads"."""

    def serialize_datetime(value):
        # json.dumps "default" hook: only datetime objects get special treatment.
        if not isinstance(value, datetime.datetime):
            raise TypeError()
        return value.isoformat()

    def deserialize_datetime(mapping):
        # json.loads "object_hook": restore the "dt" entry to a datetime object.
        if "dt" in mapping:
            mapping["dt"] = datetime.datetime.fromisoformat(mapping["dt"])
        return mapping

    custom_dumps = functools.partial(json.dumps, default=serialize_datetime)
    custom_loads = functools.partial(json.loads, object_hook=deserialize_datetime)

    # Defer and fetch a first job so the job under test is not the only row.
    first_job = jobs.Job(
        id=1,
        queue="queue_a",
        task_name="task_1",
        lock="lock_1",
        task_kwargs={"a": "b", "dt": datetime.datetime(2020, 1, 24, 14, 39, 11)},
    )
    await pg_job_store.defer_job(first_job, dumps=custom_dumps)
    # close the connection for fetch_job to create a new one with our "loads" function
    await pg_job_store.close_connection()
    await pg_job_store.fetch_job(queues=None, loads=custom_loads)

    # Now add the job we're testing
    await pg_job_store.defer_job(job, dumps=custom_dumps)
    # close the connection for fetch_job to create a new one with our "loads" function
    await pg_job_store.close_connection()
    assert await pg_job_store.fetch_job(queues=["queue_a"], loads=custom_loads) == job


@pytest.mark.parametrize(
"job",
[
Expand Down Expand Up @@ -400,6 +460,42 @@ async def test_defer_job(pg_job_store, get_all):
]


async def test_defer_job_with_dumps_function(pg_job_store, get_all):
    """A custom "dumps" controls how non-JSON-native task kwargs are stored."""
    queue = "marsupilami"
    when = datetime.datetime(2020, 1, 24, 14, 39, 10)
    amount = decimal.Decimal("2.45")
    job = jobs.Job(
        id=0,
        queue=queue,
        task_name="bob",
        lock="sher",
        task_kwargs={"a": 1, "b": 2, "c": when, "d": amount},
    )

    def serialize(value):
        # json.dumps "default" hook: datetimes and decimals become strings.
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, decimal.Decimal):
            return str(value)
        raise TypeError()

    pk = await pg_job_store.defer_job(
        job=job, dumps=functools.partial(json.dumps, default=serialize)
    )

    rows = await get_all(
        "procrastinate_jobs", "id", "args", "status", "lock", "task_name"
    )
    expected = {
        "id": pk,
        "args": {"a": 1, "b": 2, "c": "2020-01-24T14:39:10", "d": "2.45"},
        "status": "todo",
        "lock": "sher",
        "task_name": "bob",
    }
    assert rows == [expected]


async def test_execute_query(pg_job_store):
await pg_job_store.execute_query(
"COMMENT ON TABLE \"procrastinate_jobs\" IS 'foo' "
Expand Down

0 comments on commit 6d07b0e

Please sign in to comment.