Skip to content

Commit

Permalink
Merge pull request #51 from eddvrs/master
Browse files Browse the repository at this point in the history
Pushshift API updates
  • Loading branch information
mattpodolak authored Dec 21, 2022
2 parents 540a67f + 3d9f0da commit f1156cd
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 35 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ pmaw.code-workspace
pmaw.egg-info

/**/__pycache__
/**/cache
/**/cache
.idea/
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ A user-defined function can be provided using the `filter_fn` parameter for eith

## Unsupported Parameters

- `sort='asc'` is unsupported as it can have unexpected results
- `before` and `after` only support epoch time (float or int)
- `order='asc'` is unsupported as it can have unexpected results
- `until` and `since` only support epoch time (float or int)
- `aggs` are unsupported, as **PMAW** is intended to be used for collecting large numbers of submissions or comments. Use [PSAW](https://github.com/dmarx/psaw) for aggregation requests.

### Feature Requests
Expand Down Expand Up @@ -180,7 +180,7 @@ A user-defined function can be provided using the `filter_fn` parameter for eith

### Keyword Arguments

- Unlike the Pushshift API, the `before` and `after` keyword arguments must be in epoch time
- Unlike the Pushshift API, the `until` and `since` keyword arguments must be in epoch time
- `limit` is the number of submissions/comments to return. If set to `None` or if the set `limit` is higher than the number of available submissions/comments for the provided parameters then `limit` will be set to the amount available.
- Other accepted parameters are covered in the Pushshift documentation for [submissions](https://github.com/pushshift/api#searching-submissions) and [comments](https://github.com/pushshift/api#searching-comments).

Expand Down Expand Up @@ -287,7 +287,7 @@ reddit = praw.Reddit(
)

api_praw = PushshiftAPI(praw=reddit)
comments = api_praw.search_comments(q="quantum", subreddit="science", limit=100, before=1629990795)
comments = api_praw.search_comments(q="quantum", subreddit="science", limit=100, until=1629990795)
```

## Custom Filter
Expand Down Expand Up @@ -339,11 +339,11 @@ If you expect that your query may be interrupted while its running, setting `saf
from pmaw import PushshiftAPI

api = PushshiftAPI()
posts = api.search_submissions(subreddit="science", limit=700000, before=1613234822, safe_exit=True)
posts = api.search_submissions(subreddit="science", limit=700000, until=1613234822, safe_exit=True)
print(f'{len(posts)} posts retrieved from Pushshift')
```

A `before` value is required to load previous responses / requests when using non-id search, as `before` is set to the current time when the `search` method is called, which would result in a different set of parameters then when you last ran the search despite all other parameters being the same.
A `until` value is required to load previous responses / requests when using non-id search, as `until` is set to the current time when the `search` method is called, which would result in a different set of parameters then when you last ran the search despite all other parameters being the same.

### Loading Cache with Key

Expand Down
32 changes: 22 additions & 10 deletions pmaw/PushshiftAPIBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,14 @@ def _get(self, url, payload={}):

@property
def shards_are_down(self):
shards = self.metadata_.get('shards')
try:
shards = self.metadata_['es'].get('_shards')
except KeyError:
return True

if shards is None:
return
return True

return shards['successful'] != shards['total']

def _multithread(self, check_total=False):
Expand Down Expand Up @@ -146,13 +151,13 @@ def _futures_handler(self, futures, check_total):
break

# handle time slicing logic
if 'before' in payload and 'after' in payload:
before = payload['before']
after = payload['after']
if 'until' in payload and 'since' in payload:
until = payload['until']
since = payload['since']
log.debug(
f"Time slice from {after} - {before} returned {len(data)} results")
f"Time slice from {since} - {until} returned {len(data)} results")
total_results = self.resp_dict.get(
(after, before), 0)
(since, until), 0)
log.debug(
f'{total_results} total results for this time slice')
# calculate remaining results
Expand All @@ -171,10 +176,10 @@ def _futures_handler(self, futures, check_total):
# Fix issue where Pushshift occasionally reports remaining results that it is
# unable to return - len(data) == 0 when this happens
if len(data) > 0:
before = data[-1]['created_utc']
until = data[-1]['created_utc']
# generate payloads
self.req.gen_slices(
url, payload, after, before, num)
url, payload, since, until, num)

except HTTPError as exc:
log.debug(f"Request Failed -- {exc}")
Expand Down Expand Up @@ -249,7 +254,14 @@ def _search(self,
# check to see how many results are remaining
self.req.req_list.appendleft((url, self.req.payload))
self._multithread(check_total=True)
total_avail = self.metadata_.get('total_results', 0)
if len(self.metadata_) != 0:
try:
total_avail = self.metadata_['es']['hits']['total']['value']
except KeyError:
log.info(f'Result(s) in Pushshift undetermined')
total_avail = 0 # ¯\_(ツ)_/¯
else:
total_avail = 0 # ¯\_(ツ)_/¯

if self.req.limit is None:
log.info(f'{total_avail} result(s) available in Pushshift')
Expand Down
28 changes: 16 additions & 12 deletions pmaw/Request.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ def __init__(self, payload, filter_fn, kind, max_results_per_request, max_ids_pe
if filter_fn is not None and not callable(filter_fn):
raise ValueError('filter_fn must be a callable function')

if safe_exit and self.payload.get('before', None) is None:
if safe_exit and self.payload.get('until', None) is None:
# warn the user not to use safe_exit without setting before,
# doing otherwise will make it impossible to resume without modifying
# future query to use before value from first run
before = int(dt.datetime.now().timestamp())
payload['before'] = before
payload['until'] = before
warnings.warn(f'Using safe_exit without setting before value is not recommended. Setting before to {before}')

if self.praw is not None:
Expand Down Expand Up @@ -186,21 +186,25 @@ def _add_nec_args(self, payload):
payload['size'] = self.max_results_per_request

if 'sort' not in payload:
payload['sort'] = 'desc'
elif payload.get('sort') != 'desc':
payload['sort'] = 'created_utc'
elif payload.get('sort') != 'created_utc':
err_msg = "Support for non-default sort has not been implemented as it may cause unexpected results"
raise NotImplementedError(err_msg)

if 'order' not in payload:
payload['order'] = 'asc'

if 'metadata' not in payload:
payload['metadata'] = 'true'
if 'before' not in payload:
payload['before'] = int(dt.datetime.now().timestamp())
if 'until' not in payload:
payload['until'] = int(dt.datetime.now().timestamp())
if 'filter' in payload:
if not isinstance(payload['filter'], list):
if isinstance(payload['filter'], str):
payload['filter'] = [payload['filter']]
else:
payload['filter'] = list(payload['filter'])

# make sure that the created_utc field is returned
if 'created_utc' not in payload['filter']:
payload['filter'].append('created_utc')
Expand Down Expand Up @@ -255,22 +259,22 @@ def gen_url_payloads(self, url, batch_size, search_window):
self.req_list.extend(url_payloads)

else:
if 'after' not in self.payload:
if 'since' not in self.payload:
search_window = dt.timedelta(days=search_window)
num = batch_size
before = self.payload['before']
before = self.payload['until']
after = int((dt.datetime.fromtimestamp(
before) - search_window).timestamp())

# set before to after for future time slices
self.payload['before'] = after
self.payload['until'] = after

else:
before = self.payload['before']
after = self.payload['after']
before = self.payload['until']
after = self.payload['since']

# set before to avoid repeated time slices when there are missed responses
self.payload['before'] = after
self.payload['until'] = after
num = batch_size

# generate payloads
Expand Down
12 changes: 6 additions & 6 deletions pmaw/utils/slices.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
log = logging.getLogger(__name__)


def timeslice(after, before, num):
def timeslice(since, until, num):
log.debug(
f'Generating {num} slices between {after} and {before}')
return [int((before-after)*i/num + after) for i in range(num+1)]
f'Generating {num} slices between {since} and {until}')
return [int((until-since)*i/num + since) for i in range(num+1)]


def mapslice(payload, after, before):
payload['before'] = before
payload['after'] = after
def mapslice(payload, since, until):
payload['until'] = until
payload['since'] = since
return payload

0 comments on commit f1156cd

Please sign in to comment.