rewrite fetch_posts.py from scratch

now it should be properly async: posts are fetched in one task and sent across
a queue to a second task that inserts them into the DB
io 2021-07-26 04:59:04 +00:00
parent c22a493dff
commit 330fdc2809
1 changed file with 176 additions and 188 deletions
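For reference, the shape the commit message describes — one task fetching pages and one task inserting, joined by a queue, with an event that halts fetching once the inserter has caught up — looks roughly like this minimal sketch. It assumes anyio 3.x; the names are illustrative, not taken from this commit.

import anyio

async def fetcher(tx, done_ev, pages):
    # producer: push every post from every page into the stream
    async with tx:
        for page in pages:
            for post in page:
                try:
                    await tx.send(post)
                except anyio.BrokenResourceError:
                    return  # inserter already closed its end; we're done
    done_ev.set()  # ran out of pages without being told to stop

async def inserter(rx, done_ev, seen):
    # consumer: "insert" posts; a duplicate means we've caught up
    async with rx:
        async for post in rx:
            if post in seen:  # stand-in for the UNIQUE-constraint check
                done_ev.set()
                break
            seen.add(post)

async def do_account(pages, seen):
    done_ev = anyio.Event()
    async with anyio.create_task_group() as tg:
        tx, rx = anyio.create_memory_object_stream()
        async with rx, tx:
            tg.start_soon(inserter, rx, done_ev, seen)
            tg.start_soon(fetcher, tx, done_ev, pages)
            await done_ev.wait()
            tg.cancel_scope.cancel()  # halt any remaining fetching

anyio.run(do_account, [[1, 2], [3, 4]], {3})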

fetch_posts.py

@@ -1,223 +1,211 @@
 #!/usr/bin/env python3
-# SPDX-License-Identifier: EUPL-1.2
+# SPDX-License-Identifier: AGPL-3.0-only
 
-import re
 import sys
-import json
 import anyio
-import asqlite
-import sqlite3
-import asyncio
 import aiohttp
-import argparse
-import functions
+import platform
+import pendulum
+import aiosqlite
 import contextlib
-from http import HTTPStatus
-from pleroma import Pleroma, http_session_factory
+from yarl import URL
+from utils import shield
+from pleroma import Pleroma
+from bs4 import BeautifulSoup
+from functools import partial
+from third_party.utils import extract_post_content
 
-PATTERNS = {
-    "handle": re.compile(r'^.*@(.+)'),
-    "base_url": re.compile(r'https?:\/\/(.*)'),
-    "webfinger_template_url": re.compile(r'template="([^"]+)"'),
-    "post_id": re.compile(r'[^\/]+$'),
-}
+USER_AGENT = (
+    'fedi-ebooks; '
+    f'{aiohttp.__version__}; '
+    f'{platform.python_implementation()}/{platform.python_version()}; '
+)
+
+UTC = pendulum.timezone('UTC')
+JSON_CONTENT_TYPE = 'application/json'
+ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
 
-@contextlib.asynccontextmanager
-async def get_db():
-    async with asqlite.connect('toots.db') as conn:
-        async with conn.cursor() as cur:
-            await cur.execute("""
-                CREATE TABLE IF NOT EXISTS toots (
-                    sortid INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
-                    id VARCHAR NOT NULL,
-                    cw VARCHAR,
-                    userid VARCHAR NOT NULL,
-                    uri VARCHAR NOT NULL,
-                    content VARCHAR NOT NULL
-                )
-            """)
-            await cur.execute("""
-                CREATE TABLE IF NOT EXISTS cursors (
-                    userid VARCHAR PRIMARY KEY,
-                    next_page VARCHAR NOT NULL
-                )
-            """)
-            await cur.execute("""
-                CREATE TRIGGER IF NOT EXISTS dedup
-                AFTER INSERT ON toots
-                FOR EACH ROW BEGIN
-                    DELETE FROM toots
-                    WHERE rowid NOT IN (
-                        SELECT MIN(sortid)
-                        FROM toots GROUP BY uri
-                    );
-                END
-            """)
-            await conn.commit()
-        yield conn
 
-async def main():
-    args = functions.parse_args(description='Log in and download posts.')
-    cfg = functions.load_config(args.cfg)
-
-    async with (
-        Pleroma(api_base_url=cfg['site'], access_token=cfg['access_token']) as client,
-        get_db() as db, db.cursor() as cur,
-        http_session_factory() as http,
-    ):
-        try:
-            following = await client.following()
-        except aiohttp.ClientResponseError as exc:
-            if exc.status == HTTPStatus.FORBIDDEN:
-                print(f'The provided access token in {args.cfg} is invalid.', file=sys.stderr)
-                sys.exit(1)
-
+class PostFetcher:
+    def __init__(self, *, config):
+        self.config = config
+
+    async def __aenter__(self):
+        stack = contextlib.AsyncExitStack()
+        self._fedi = await stack.enter_async_context(
+            Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
+        )
+        self._http = await stack.enter_async_context(
+            aiohttp.ClientSession(
+                headers={
+                    'User-Agent': USER_AGENT,
+                    'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
+                },
+                trust_env=True,
+                raise_for_status=True,
+            ),
+        )
+        self._db = await stack.enter_async_context(aiosqlite.connect(self.config.get('db_path', 'posts.db')))
+        self._db.row_factory = aiosqlite.Row
+        self._ctx_stack = stack
+        return self
+
+    async def __aexit__(self, *excinfo):
+        return await self._ctx_stack.__aexit__(*excinfo)
+
+    async def fetch_all(self):
+        await self._fedi.verify_credentials()
+        self._completed_accounts = {}
         async with anyio.create_task_group() as tg:
-            for acc in following:
-                tg.start_soon(fetch_posts, cfg, http, cur, acc)
-
-        print('Done!')
-
-        await db.commit()
-        await db.execute('VACUUM') # compact db
-        await db.commit()
+            for acc in await self._fedi.following():
+                tg.start_soon(self._do_account, acc)
+
+    async def _do_account(self, acc):
+        async with anyio.create_task_group() as tg:
+            self._completed_accounts[acc['fqn']] = done_ev = anyio.Event()
+            tx, rx = anyio.create_memory_object_stream()
+            async with rx, tx:
+                tg.start_soon(self._process_pages, rx, acc)
+                tg.start_soon(self._fetch_account, tx, acc)
+                await done_ev.wait()
+                # processing is complete, so halt fetching.
+                # processing may complete before fetching if we get caught up on new posts.
+                tg.cancel_scope.cancel()
+
+    async def _process_pages(self, stream, account):
+        done_ev = self._completed_accounts[account['fqn']]
-async def fetch_posts(cfg, http, cur, acc):
-    next_page = await (await cur.execute('SELECT next_page FROM cursors WHERE userid = ?', (acc['id'],))).fetchone()
-    direction = 'next'
-    if next_page is not None:
-        next_page ,= next_page
-        direction = 'prev'
-    print('Downloading posts for user @' + acc['acct'])
-    page = await fetch_first_page(cfg, http, acc, next_page)
-    if 'next' not in page and 'prev' not in page:
-        # there's only one page of results, don't bother doing anything special
-        pass
-    else:
-        # this is for when we're all done. it points to the activities created *after* we started fetching.
-        next_page = page['prev']
-    print('Downloading and saving posts', end='', flush=True)
-    done = False
-    while not done and len(page['orderedItems']) > 0:
         try:
-            async with anyio.create_task_group() as tg:
-                for obj in page['orderedItems']:
-                    tg.start_soon(process_object, cur, acc, obj)
-        except DoneWithAccount:
-            done = True
-            continue
-        except anyio.ExceptionGroup as eg:
-            for exc in eg.exceptions:
-                if isinstance(exc, DoneWithAccount):
-                    done = True
-                    continue
+            async for activity in stream:
+                try:
+                    await self._insert_activity(activity)
+                except aiosqlite.IntegrityError as exc:
+                    # LOL sqlite error handling is so bad
+                    if exc.args[0].startswith('UNIQUE constraint failed: '):
+                        # this means we've encountered an item we already have saved
+                        done_ev.set()
+                        break
+                    raise
+        finally:
+            print('COMMIT')
+            await self._db.commit()
+
+    async def _insert_activity(self, activity):
+        if activity['type'] != 'Create':
+            # this isn't a post but something else (like, boost, reaction, etc)
+            return
+        obj = activity['object']
+        content = extract_post_content(obj['content'])
+        await self._db.execute(
+            """
+            INSERT INTO posts (post_id, summary, content, published_at)
+            VALUES (?, ?, ?, ?)
+            """,
+            (
+                obj['id'],
+                obj['summary'],
+                content,
+                pendulum.parse(obj['published']).astimezone(UTC).timestamp(),
+            ),
+        )
 
-        # get the next/previous page
+    @shield
+    async def _fetch_account(self, tx, account):
+        done_ev = self._completed_accounts[account['fqn']]
         try:
-            async with http.get(page[direction], timeout=15) as resp:
-                page = await resp.json()
-        except asyncio.TimeoutError:
-            print('HTTP timeout, site did not respond within 15 seconds', file=sys.stderr)
-        except KeyError:
-            print("Couldn't get next page - we've probably got all the posts", file=sys.stderr)
-        except KeyboardInterrupt:
-            done = True
-            break
-        except aiohttp.ClientResponseError as exc:
-            if exc.status == HTTPStatus.TOO_MANY_REQUESTS:
-                print("We're rate limited. Skipping to next account.")
-                done = True
-                break
-            raise
-        except Exception:
+            outbox = await self.fetch_outbox(account['fqn'])
+        except Exception as exc:
             import traceback
-            print('An error occurred while trying to obtain more posts:', file=sys.stderr)
-            traceback.print_exc()
-
-        print('.', end='', flush=True)
-    else:
-        # the while loop ran without breaking
-        await cur.execute('REPLACE INTO cursors (userid, next_page) VALUES (?, ?)', (acc['id'], next_page))
-        await cur.connection.commit()
-
-    print(' Done!')
+            traceback.print_exception(type(exc), exc, exc.__traceback__)
+            return
+
+        print(f'Fetching posts for {account["acct"]}...')
+
+        next_page_url = outbox['first']
+        while True:
+            print(f'Fetching {next_page_url}... ', end='', flush=True)
+            async with self._http.get(next_page_url) as resp: page = await resp.json()
+            print('done.')
 
-async def finger(cfg, http, acc):
-    instance = PATTERNS['handle'].search(acc['acct'])
-    if instance is None:
-        instance = PATTERNS['base_url'].search(cfg['site'])[1]
-    else:
-        instance = instance[1]
-
-    # 1. download host-meta to find webfinger URL
-    async with http.get('https://{}/.well-known/host-meta'.format(instance), timeout=10) as resp:
-        host_meta = await resp.text()
-
-    # 2. use webfinger to find user's info page
-    webfinger_url = PATTERNS['webfinger_template_url'].search(host_meta).group(1)
-    webfinger_url = webfinger_url.format(uri='{}@{}'.format(acc['username'], instance))
-
-    async with http.get(webfinger_url, headers={'Accept': 'application/json'}, timeout=10) as resp:
-        profile = await resp.json()
-
-    for link in profile['links']:
-        if link['rel'] == 'self':
-            # this is a link formatted like 'https://instan.ce/users/username', which is what we need
-            return link['href']
-
-    print("Couldn't find a valid ActivityPub outbox URL.", file=sys.stderr)
-    sys.exit(1)
-
-class DoneWithAccount(Exception): pass
-
-async def process_object(cur, acc, obj):
-    if obj['type'] != 'Create':
-        # this isn't a toot/post/status/whatever, it's a boost or a follow or some other activitypub thing. ignore
-        return
-
-    # its a toost baby
-    content = obj['object']['content']
-    toot = extract_toot(content)
-    try:
-        await cur.execute('SELECT COUNT(*) FROM toots WHERE uri = ?', (obj['object']['id'],))
-        existing = await cur.fetchone()
-        if existing is not None and existing[0]:
-            # we've caught up to the notices we've already downloaded, so we can stop now
-            # you might be wondering, 'lynne, what if the instance ratelimits you after 40 posts, and they've made 60 since main.py was last run? wouldn't the bot miss 20 posts and never be able to see them?' to which i reply, 'i know but i don't know how to fix it'
-            raise DoneWithAccount
-        await insert_toot(cur, acc, obj, toot)
-    except sqlite3.Error:
-        pass # ignore any toots that don't successfully go into the DB
-
-async def fetch_first_page(cfg, http, acc, next_page):
-    # download a page of the outbox
-    if not next_page:
-        print('Fingering UwU...')
-        # find the user's activitypub outbox
-        outbox_url = await finger(cfg, http, acc) + '/outbox?page=true'
-    else:
-        outbox_url = next_page
-
-    async with http.get(outbox_url, timeout=15) as resp:
-        return await resp.json()
+            for activity in page['orderedItems']:
+                try:
+                    await tx.send(activity)
+                except anyio.BrokenResourceError:
+                    # already closed means we're already done
+                    return
+
+            # show progress
+            #print('.', end='', flush=True)
+
+            if not (next_page_url := page.get('next')):
+                #done_ev.set()
+                break
+
+        done_ev.set()
+
+    async def fetch_outbox(self, handle):
+        """finger handle, a fully-qualified ActivityPub actor name, returning their outbox URL"""
+        # it's fucking incredible how overengineered ActivityPub is btw
+        print('Fingering ', handle, '...', sep='')
+
+        username, at, instance = handle.lstrip('@').partition('@')
+        assert at == '@'
+
+        # i was planning on doing /.well-known/host-meta to find the webfinger URL, but
+        # 1) honk does not support host-meta
+        # 2) WebFinger is always located at the same location anyway
+        profile_url = await self._finger_actor(username, instance)
+
+        try:
+            async with self._http.get(profile_url) as resp: profile = await resp.json()
+        except aiohttp.ContentTypeError:
+            # we didn't get JSON, so just guess the outbox URL
+            outbox_url = profile_url + '/outbox'
+        else:
+            outbox_url = profile['outbox']
+
+        async with self._http.get(outbox_url) as resp: outbox = await resp.json()
+        assert outbox['type'] == 'OrderedCollection'
+        return outbox
+
+    async def _finger_actor(self, username, instance):
+        # despite HTTP being a direct violation of the WebFinger spec,
+        # assume e.g. Tor instances do not support HTTPS-over-onion
+        finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
+        async with self._http.get(finger_url) as resp: finger_result = await resp.json()
+        return self._parse_webfinger_result(username, instance, finger_result)
 
-def extract_toot(toot):
-    toot = functions.extract_toot(toot)
-    toot = toot.replace('@', '@\u200B') # put a zws between @ and username to avoid mentioning
-    return(toot)
-
-async def insert_toot(cursor, acc, obj, content):
-    post_id = PATTERNS['post_id'].search(obj['object']['id']).group(0)
-    await cursor.execute('REPLACE INTO toots (id, cw, userid, uri, content) VALUES (?, ?, ?, ?, ?)', (
-        post_id,
-        obj['object']['summary'] or None,
-        acc['id'],
-        obj['object']['id'],
-        content,
-    ))
+    def _parse_webfinger_result(self, username, instance, finger_result):
+        """given webfinger data, return profile URL for handle"""
+        def check_content_type(type, ct): return ct == type or ct.startswith(type + ';')
+        check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)
+
+        try:
+            # note: the server might decide to return multiple links,
+            # so we need to decide how to prefer one.
+            # i'd put "and URL(template).host == instance" here,
+            # but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
+            # additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
+            # for example.
+            return next(
+                link['href']
+                for link in finger_result['links']
+                if link['rel'] == 'self' and check_ap(link['type'])
+            )
+        except StopIteration:
+            # this should never happen either
+            raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
+
+async def amain():
+    import json5 as json
+    with open('config.json' if len(sys.argv) < 2 else sys.argv[1]) as f: config = json.load(f)
+    async with PostFetcher(config=config) as fetcher: await fetcher.fetch_all()
+
+def main():
+    anyio.run(amain)
+
 if __name__ == '__main__':
-    anyio.run(main)
+    main()
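One thing the new code relies on that this diff doesn't show: _process_pages treats an aiosqlite.IntegrityError whose message starts with 'UNIQUE constraint failed: ' as having caught up to already-saved posts. That only works if the posts table declares a UNIQUE constraint covering post_id; that schema lives elsewhere in the repo. A hypothetical minimal demonstration of the check, with an assumed schema:

import anyio
import aiosqlite

async def demo():
    async with aiosqlite.connect(':memory:') as db:
        # assumed schema, for illustration only; the real table is created elsewhere
        await db.execute('CREATE TABLE posts (post_id TEXT UNIQUE, content TEXT)')
        await db.execute('INSERT INTO posts VALUES (?, ?)', ('post1', 'hello'))
        try:
            # re-inserting the same post_id trips the UNIQUE constraint
            await db.execute('INSERT INTO posts VALUES (?, ?)', ('post1', 'hello'))
        except aiosqlite.IntegrityError as exc:
            # the same string check _process_pages performs
            assert exc.args[0].startswith('UNIQUE constraint failed: ')
            print('caught up')

anyio.run(demo)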