Skip to content

Commit

Permalink
ATProto: first pass at polling posts (timelines)
Browse files Browse the repository at this point in the history
for #694
  • Loading branch information
snarfed committed Nov 14, 2023
1 parent 9c5adab commit a5b5078
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 3 deletions.
63 changes: 63 additions & 0 deletions atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,13 @@ def poll_notifications():
Uses the ``listNotifications`` endpoint, which is intended for end users. 🤷
https://github.com/bluesky-social/atproto/discussions/1538
TODO: unify with poll_posts
"""
repos = {r.key.id(): r for r in AtpRepo.query()}
logger.info(f'Got {len(repos)} repos')
if not repos:
return

# TODO: switch from atproto_did to copies
users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
Expand Down Expand Up @@ -459,3 +463,62 @@ def poll_notifications():
# User yet.

return 'OK'


# URL route is registered in hub.py
def poll_posts():
"""Fetches and enqueueus new posts from the AppView for our users.
Uses the ``getTimeline`` endpoint, which is intended for end users. 🤷
TODO: unify with poll_notifications
"""
repos = {r.key.id(): r for r in AtpRepo.query()}
logger.info(f'Got {len(repos)} repos')
if not repos:
return

# TODO: switch from atproto_did to copies
users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
for cls in set(PROTOCOLS.values())
if cls and cls != ATProto))

# TODO: convert to Session for connection pipelining!
client = Client(f'https://{os.environ["APPVIEW_HOST"]}',
headers={'User-Agent': USER_AGENT})

for user in users:
logging.debug(f'Fetching notifs for {user.key.id()}')

# TODO: store and use cursor
# seenAt would be easier, but they don't support it yet
# https://github.com/bluesky-social/atproto/issues/1636
repo = repos[user.atproto_did]
client.session['accessJwt'] = service_jwt(os.environ['APPVIEW_HOST'],
repo_did=user.atproto_did,
privkey=repo.signing_key)
resp = client.app.bsky.feed.getTimeline()
for item in resp['feed']:
uri = item['post']['uri']
logger.debug(f'Got {uri}: {json_dumps(item, indent=2)}')

# TODO: handle reposts once we have a URI for them
# https://github.com/bluesky-social/atproto/issues/1811
#
# TODO: verify sig. skipping this for now because we're getting
# these from the AppView, which is trusted, specifically we expect
# the BGS and/or the AppView already checked sigs.
obj = Object.get_or_create(id=uri, bsky=item['post'],
source_protocol=ATProto.ABBREV)
if not obj.status:
obj.status = 'new'
obj.add('feed', user.key)
obj.put()

common.create_task(queue='receive', obj=obj.key.urlsafe(),
authed_as=user.atproto_did)
# note that we don't pass a user param above. it's the acting user,
# which is different for every notif, and may not actually have a BF
# User yet.

return 'OK'
4 changes: 4 additions & 0 deletions hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def health_check():
view_func=atproto.poll_notifications,
methods=['POST'])

app.add_url_rule('/queue/atproto-poll-posts',
view_func=atproto.poll_posts,
methods=['POST'])

@app.post('/queue/atproto-commit')
@flask_util.cloud_tasks_only
def atproto_commit():
Expand Down
95 changes: 92 additions & 3 deletions tests/test_atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import copy
import logging
from unittest import skip
from unittest.mock import call, MagicMock, patch
from unittest.mock import ANY, call, MagicMock, patch

from arroba.datastore_storage import AtpBlock, AtpRemoteBlob, AtpRepo, DatastoreStorage
from arroba.did import encode_did_key
Expand Down Expand Up @@ -704,8 +704,8 @@ def test_send_translates_ids(self, mock_create_task):
@patch('requests.get')
def test_poll_notifications(self, mock_get, mock_create_task):
user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a')
user_b = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:c')
user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c')

Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY)
Expand Down Expand Up @@ -809,3 +809,92 @@ def test_poll_notifications(self, mock_get, mock_create_task):
self.assertEqual(follow, follow_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=follow_obj.key.urlsafe(), authed_as='did:plc:a')

@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
def test_poll_posts(self, mock_get, mock_create_task):
user_a = self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a')
user_b = self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:b')
user_c = self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:c')
Repo.create(self.storage, 'did:plc:a', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:b', signing_key=ATPROTO_KEY)
Repo.create(self.storage, 'did:plc:c', signing_key=ATPROTO_KEY)

post_view = {
'$type': 'app.bsky.feed.defs#postView',
'uri': 'at://did:web:alice.com/app.bsky.feed.post/123',
'cid': 'TODO',
'record': {
'$type': 'app.bsky.feed.post',
'text': 'My original post',
'createdAt': '2007-07-07T03:04:05',
},
'author': {
'$type': 'app.bsky.actor.defs#profileViewBasic',
'did': 'did:web:alice.com',
'handle': 'alice.com',
},
}

mock_get.side_effect = [
requests_response({
'cursor': '...',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
}],
}),
requests_response({
**DID_DOC,
'id': 'did:plc:alice.com',
}),
requests_response({
'cursor': '...',
'feed': [],
}),
requests_response({
'cursor': '...',
'feed': [{
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': post_view,
'reason': {
'$type': 'app.bsky.feed.defs#reasonRepost',
'by': {
'$type': 'app.bsky.actor.defs#profileViewBasic',
'did': 'did:web:bob.com',
'handle': 'bob.com',
},
'indexedAt': '2022-01-02T03:04:05+00:00',
},
}],
}),
]

resp = self.post('/queue/atproto-poll-posts', client=hub.app.test_client())
self.assertEqual(200, resp.status_code)

get_timeline = call(
'https://api.bsky-sandbox.dev/xrpc/app.bsky.feed.getTimeline',
json=None,
headers={
'Content-Type': 'application/json',
'User-Agent': common.USER_AGENT,
'Authorization': ANY,
})
self.assertEqual([
get_timeline,
self.req('https://alice.com/.well-known/did.json'),
get_timeline,
get_timeline,
], mock_get.call_args_list)

post_obj = Object.get_by_id('at://did:web:alice.com/app.bsky.feed.post/123')
self.assertEqual(post_view, post_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=post_obj.key.urlsafe(), authed_as='did:plc:a')

# TODO
# repost_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456')
# self.assertEqual(repost, repost_obj.bsky)
# self.assert_task(mock_create_task, 'receive', '/queue/receive',
# obj=repost_obj.key.urlsafe(), authed_as='did:plc:eve')

0 comments on commit a5b5078

Please sign in to comment.