forked from atheistpiece/ultimate-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.py
81 lines (57 loc) · 2.14 KB
/
async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
from dataclasses import dataclass
from datetime import datetime
from uuid import uuid4
# Module-level constants
_MILLISECOND = .001
_HOUR = 3600
@dataclass
class JobRecord:
    """Job record with useful metadata.

    Attributes:
        guid: Identifier of the job.
        queued_at: Time at which the job was queued.
        started_at: Time at which the job was started.
    """

    guid: str
    queued_at: datetime
    started_at: datetime
def _is_valid_record(record):
"""Check whether job record is valid or not."""
return record.queued_at < record.started_at
def _current_time():
"""Return current time that is timezone-naive."""
return datetime.now()
async def start_job(delay: float, job_id: str) -> "JobRecord":
    """Start job_id after a certain delay in seconds.

    The current time is captured and printed once when the job is queued
    and once more after sleeping for ``delay`` seconds.

    Args:
        delay: Seconds to sleep between queueing and starting the job.
        job_id: Identifier of the job; only its first 16 characters are
            shown in the printed messages.

    Returns:
        A JobRecord built from ``job_id``, the queue time and the start time.
    """
    # Both log lines show the same shortened identifier
    short_id = job_id[:16]

    queue_time = _current_time()
    print(f"{queue_time} -> Queue job {short_id}...")

    await asyncio.sleep(delay)

    start_time = _current_time()
    print(f"{start_time} -> Start job {short_id}...")

    return JobRecord(job_id, queue_time, start_time)
async def schedule_jobs():
    """Schedule jobs concurrently.

    Walks through three scenarios in order: awaiting a single coroutine,
    cancelling a task before it completes, and gathering a batch of ten
    coroutines. Each scenario is checked with assertions.

    Raises:
        AssertionError: If any of the checks below does not hold.
    """
    print(f"{_current_time()} -> Send kickoff email")

    # Create a job which also represents a coroutine
    single_job = start_job(_MILLISECOND, uuid4().hex)
    assert asyncio.iscoroutine(single_job)

    # Grab a job record from the coroutine
    single_record = await single_job
    assert _is_valid_record(single_record)

    # Task is a wrapped coroutine which also represents a future
    single_task = asyncio.create_task(start_job(_HOUR, uuid4().hex))
    assert asyncio.isfuture(single_task)

    # Futures are different from coroutines in that they can be cancelled
    single_task.cancel()
    try:
        await single_task
    except asyncio.CancelledError:
        # asyncio.CancelledError is the public name for this exception;
        # it avoids depending on the asyncio.exceptions submodule path
        assert single_task.cancelled()

    # Gather coroutines for batch start
    batch_jobs = [start_job(0.01, uuid4().hex) for _ in range(10)]
    batch_records = await asyncio.gather(*batch_jobs)

    # We get the same amount of records as we have coroutines
    assert len(batch_records) == len(batch_jobs)
    for batch_record in batch_records:
        assert _is_valid_record(batch_record)

    print(f"{_current_time()} -> Send confirmation email")
def main():
    """Run schedule_jobs to completion via asyncio.run."""
    asyncio.run(schedule_jobs())


if __name__ == "__main__":
    main()