-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcrawler.py
149 lines (121 loc) · 4.48 KB
/
crawler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
This script iterates over a specified collection of nxml articles and extracts a
specified set of data from them.
You can specify the url for the Mongo server, as well as the name of the
database and collection.
You *must* also specify one or more of the extractor functions from the
pubcrawler.extractors module. But specify them by just their name; this package
adds the correct suffix automatically. This should be fixed in a later version,
but it was the only good way to allow an argument from the command line.
You can also specify a -skip_field. You don't have to do this, but it's best to,
because this is what's used to report progress (because of ugly multiprocess
stuff, and because python's Queue.qsize() method is not implemented on macOS).
You can also specify a limit, as well as the number of worker processes you
want.
"""
import multiprocessing as mp
import time
import sys
import pymongo
import pubcrawler.extractors as ex
def chunk_slices(length, by):
items = list(range(0, length + 1, by))
if length % by != 0:
items.append(length)
slices = [slice(items[i], items[i+1]) for i in range(0, len(items)-1)]
return(slices)
def worker(url, db, collection, to_extract, query, index_queue):
articles = pymongo.MongoClient(url)[db][collection]
for i in iter(index_queue.get, 'STOP'):
try:
article = articles.find_one(i)
to_write = ex.combine_extracted_info(article, to_extract)
articles.update_one(i, {'$set': to_write})
except Exception as e:
print("Extraction error: {}".format(e))
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"-u", "--mongo_url", default="localhost", dest="u"
)
parser.add_argument(
"-d", "--mongo_db", default="pmc", dest="d"
)
parser.add_argument(
"-c", "--mongo_collection", default="articlesubset", dest="c"
)
parser.add_argument(
"-x", "--extract", action="append", default=None, dest="x"
)
parser.add_argument(
"-s", "--skip_field", default=None, dest="s"
)
parser.add_argument(
"-w", "--workers", default=4, dest="w"
)
parser.add_argument(
"-l", "--limit", default=None, dest="l"
)
parser.add_argument(
"-b", "--batch_size", default=10000, dest="b"
)
args = parser.parse_args()
print(args)
if args.x is not None:
extractor_funs = [eval(x) for x in ['ex.' + x for x in args.x]]
else:
print("Please specify at least one extractor function", file=sys.stderr)
sys.exit(1)
if args.s is not None:
query = {args.s: {'$exists': False}}
else:
query = {}
batch_size = int(args.b)
num_workers = int(args.w)
print("Making connection.")
articles = pymongo.MongoClient(args.u)[args.d][args.c]
print("Counting...")
remaining_articles = articles.count(query)
while remaining_articles > 0:
print("Remaining articles: {}. Fetching next {}...".format(remaining_articles, batch_size))
cursor = articles.find(query, ["_id"], limit=batch_size)
# print("Enqueueing...")
# t1 = time.time()
queue = mp.Queue()
for i in cursor:
queue.put(i)
for w in range(num_workers):
queue.put('STOP')
# print("Queue construction time: {} seconds.".format(time.time()-t1))
worker_args = (
args.u,
args.d,
args.c,
extractor_funs,
query,
queue,
)
workers = [mp.Process(target=worker, args=worker_args) for w in range(num_workers)]
for w in workers:
w.start()
print("Workers started...")
for w in workers:
w.join()
print("Workers finished. Counting...")
remaining_articles = articles.count(query)
print("Finished.")
# # Chunking, which we don't do any more.
# queue = mp.Queue()
# for i in chunk_slices(num_to_annotate, by = 100):
# queue.put(i)
# for w in range(num_workers):
# queue.put('STOP')
# while not queue.empty():
# print("Still going...")
# # total_for_query_now = articles.count(query)
# # done = total_for_query - total_for_query_now
# # left = num_to_annotate - done
# # print("Annotated {} out of {} articles ({:.2%}). {} remaining.".format(done,
# # num_to_annotate, done / num_to_annotate, left))
# time.sleep(5)