import time
import requests
import re
from functools import partial
from itertools import islice
from streamexecutors import StreamThreadPoolExecutor

try:
    from streamexecutors import secrets
    auth = secrets.GITHUB_AUTHKEY
except ImportError:
    auth = None
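
# secrets.py is an optional local module; a minimal sketch of what it might
# contain (hypothetical value - any credentials object accepted by requests'
# `auth=` parameter works, e.g. a (username, token) tuple):
#
#     # streamexecutors/secrets.py
#     GITHUB_AUTHKEY = ('<github-username>', '<personal-access-token>')
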
# TODO: add test to show how many items were processed by each stage
# TODO: change get_urls to an infinite stream
def wordcount_pipeline():
    ex = StreamThreadPoolExecutor()

    # note: without authentication, the GitHub API rate limit is 60 requests per hour
    def get_urls():
        print(' downloading recently updated repos')
        response = requests.get('https://api.github.com/events', auth=auth)
        if response.status_code != 200 or type(response.json()) != list:
            raise RuntimeError('Github API request failed')
        for event in response.json():
            yield 'http://github.com/' + event['repo']['name']

    def download(url):
        print(' downloading', url)
        return requests.get(url)

    def count_word(word, response):
        '''Count the number of occurrences of word in the page.

        Return:
            Dictionary with the url of the page and the number of word occurrences.
        '''
        print(' counting words in', response.url)
        return {'url': response.url, 'count': response.text.count(word)}

    def upload(json_obj):
        '''Upload data to httpbin.org.

        Return:
            The copy of the post form data as received back from the server.
        '''
        print(' uploading', json_obj['url'])
        response = requests.post('http://httpbin.org/post', data=json_obj)
        return response.json()['form']

    # All ex.map calls are non-blocking.
    # Pause downloading when there are 2 downloaded pages waiting to be searched.
    pages = ex.map(download, get_urls(), buffer_size=1)
    # Pause searching when there are 2 results waiting to be uploaded.
    counts = ex.map(partial(count_word, 'python'), pages, buffer_size=1)
    # Pause uploading when there are 2 responses waiting to be iterated through.
    upload_results = ex.map(upload, counts, buffer_size=1)
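
    # For reference, a purely sequential, single-threaded sketch of the same data
    # flow (no background workers, no buffering) would be:
    #
    #     upload_results = (upload(count_word('python', download(url)))
    #                       for url in get_urls())
    #
    # The executor version above instead keeps the stages running in background
    # threads and uses buffer_size to pause a stage when its downstream buffer
    # is full.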

    # Processing continues in the background until the buffer_size limits are reached.
    print('main thread busy')
    time.sleep(5)

    # islice is lazy - nothing is consumed until its results are needed.
    first2 = islice(upload_results, 2)
    print('main thread iterates through results')

    # Greedily consume the results. If the sleep was long enough to fill the buffer, this won't block.
    result = list(first2)

    # As we consume the results, the processing will immediately continue.
    print(result)

    # As the pipeline objects go out of scope, the executor will cancel all pending tasks
    # and wait for tasks in progress to complete. This delays interpreter exit.

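
# To try the pipeline end to end, requests and the streamexecutors package must be
# importable in the current environment; the script can then be run directly:
#     python wordcount.py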
if __name__ == '__main__':
    wordcount_pipeline()