import argparse
from time import sleep
from time import time

from sqlalchemy import text

from app import db
from app import logger
from recordthresher.datacite import DataCiteRaw
from recordthresher.datacite_doi_record import DataCiteDoiRecord
from util import elapsed
from util import safe_commit

import endpoint  # magic

"""
Ingest a DataCite DOI record into the database with this command:
heroku local:run python -- queue_datacite_doi.py --doi 10.5281/zenodo.123456
"""


class QueueDataCiteRecords:
    def worker_run(self, **kwargs):
        single_id = kwargs.get("doi", None)
        chunk_size = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", None)

        if limit is None:
            limit = float("inf")

        if single_id:
            # single-DOI mode: look the DOI up in the raw DataCite table,
            # build a record from it, and save it
            doi = DataCiteRaw.query.filter(DataCiteRaw.id == single_id).scalar().id
            if record := DataCiteDoiRecord.from_doi(doi):
                db.session.merge(record)
                safe_commit(db) or logger.info("COMMIT fail")
        else:
            num_updated = 0

            while num_updated < limit:
                start_time = time()

                dois = self.fetch_queue_chunk(chunk_size)

                if not dois:
                    logger.info('no queued datacite works ready. waiting...')
                    sleep(5)
                    continue

                for doi in dois:
                    if record := DataCiteDoiRecord.from_doi(doi):
                        db.session.merge(record)

                # remove queue rows whose raw record hasn't changed since we claimed them
                db.session.execute(
                    text('''
                        delete from recordthresher.datacite_doi_record_queue q
                        using recordthresher.datacite w
                        where q.doi = w.id
                        and q.started > w.created_date
                        and q.doi = any(:dois)
                    ''').bindparams(dois=dois)
                )

                # release any rows that survived the delete so they get picked up again
                db.session.execute(
                    text('''
                        update recordthresher.datacite_doi_record_queue q
                        set started = null
                        where q.doi = any(:dois)
                    ''').bindparams(dois=dois)
                )

                commit_start_time = time()
                safe_commit(db) or logger.info("commit fail")
                logger.info(f'commit took {elapsed(commit_start_time, 2)} seconds')

                num_updated += chunk_size
                logger.info(f'processed {len(dois)} datacite works in {elapsed(start_time, 2)} seconds')
    def fetch_queue_chunk(self, chunk_size):
        logger.info("looking for new jobs")

        # claim a chunk of unstarted DOIs: mark them started and return them,
        # with "for update skip locked" so concurrent workers skip rows
        # another worker has already claimed
        queue_query = text("""
            with queue_chunk as (
                select doi
                from recordthresher.datacite_doi_record_queue
                where started is null
                order by rand
                limit :chunk
                for update skip locked
            )
            update recordthresher.datacite_doi_record_queue q
            set started = now()
            from queue_chunk
            where q.doi = queue_chunk.doi
            returning q.doi;
        """).bindparams(chunk=chunk_size)

        job_time = time()
        doi_list = [row[0] for row in db.engine.execute(queue_query.execution_options(autocommit=True)).all()]
        logger.info(f'got {len(doi_list)} ids, took {elapsed(job_time)} seconds')

        return doi_list
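
    # A minimal sketch of the queue table the query above assumes. Column
    # names are taken from the SQL; the types are guesses, not the repo's
    # actual migration:
    #
    #   create table recordthresher.datacite_doi_record_queue (
    #       doi text primary key,
    #       started timestamp without time zone,
    #       rand double precision
    #   );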


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--doi', nargs="?", type=str, help="doi you want to update")
    parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many dois to update")
    parser.add_argument('--chunk', "-ch", nargs="?", default=500, type=int, help="how many dois to update at once")
    parsed_args = parser.parse_args()

    my_queue = QueueDataCiteRecords()
    my_queue.worker_run(**vars(parsed_args))
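
# Example invocations, assuming the heroku local setup shown in the module
# docstring (the flags come from the argparse block above):
#
#   # re-ingest a single DOI
#   heroku local:run python -- queue_datacite_doi.py --doi 10.5281/zenodo.123456
#
#   # drain the queue in chunks of 200, stopping after roughly 1000 records
#   heroku local:run python -- queue_datacite_doi.py --limit 1000 --chunk 200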