#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-only

import sys
import anyio
import aiohttp
import pendulum
import operator
import aiosqlite
import contextlib
from yarl import URL
from pleroma import Pleroma
from bs4 import BeautifulSoup
from functools import partial
from typing import Iterable, NewType

from utils import shield, HandleRateLimits, suppress, http_session_factory
from third_party.utils import extract_post_content

UTC = pendulum.timezone('UTC')
JSON_CONTENT_TYPE = 'application/json'
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'

MIGRATION_VERSION = 1

class PostFetcher:
	"""Fetches all posts from every followed account and saves them to a SQLite database.

	Use as an async context manager: entering sets up the fedi client, HTTP session,
	and database; exiting tears them all down.
	"""

	def __init__(self, *, config):
		self.config = config
		# profile URLs whose fetch/processing failed; reported to the user at exit
		self.erroneous_accounts = []

	async def __aenter__(self):
		# one exit stack owns every resource so __aexit__ can close them all at once
		stack = contextlib.AsyncExitStack()
		self._fedi = await stack.enter_async_context(
			Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
		)
		self._http = await stack.enter_async_context(
			http_session_factory(
				headers={
					'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
				},
				trust_env=True,
				raise_for_status=True,
			),
		)
		self._rl_handler = HandleRateLimits(self._http)
		self._db = await stack.enter_async_context(aiosqlite.connect(self.config['db_path']))
		await self._maybe_run_migrations()
		await self._db.commit()
		self._db.row_factory = aiosqlite.Row
		self._ctx_stack = stack
		return self

	async def _maybe_run_migrations(self):
		"""Apply the schema unless the migrations table already records a version."""
		# OperationalError means the migrations table doesn't exist yet,
		# in which case suppress() skips the early return and we fall through
		# to _run_migrations — NOTE(review): the call relies on that suppression;
		# confirm OperationalError can't mask other failures here.
		async with self._db.cursor() as cur, suppress(aiosqlite.OperationalError):
			if await (await cur.execute('SELECT migration_version FROM migrations')).fetchone():
				return
			await self._run_migrations()

	async def _run_migrations(self):
		# TODO proper migrations, not just "has the schema ever been run" migrations
		async with await (anyio.Path(__file__).parent/'schema.sql').open() as f:
			schema = await f.read()

		async with self._db.cursor() as cur:
			await cur.executescript(schema)
			await cur.execute('INSERT INTO migrations (migration_version) VALUES (?)', (MIGRATION_VERSION,))

	async def __aexit__(self, *excinfo):
		return await self._ctx_stack.__aexit__(*excinfo)

	async def fetch_all(self):
		"""fetch all posts from every followed account, concurrently"""
		await self._fedi.verify_credentials()
		self._completed_accounts = {}
		async with anyio.create_task_group() as tg:
			# XXX it's assumed that no more than one API page of people are being followed at one time
			for account in await self._fedi.following():
				profile_url = account['url']
				tg.start_soon(self._do_account, profile_url)

	async def _do_account(self, profile_url: str):
		"""Run a fetcher and a processor task for one account, linked by a memory stream."""
		async with anyio.create_task_group() as tg:
			self._completed_accounts[profile_url] = done_ev = anyio.Event()
			tx, rx = anyio.create_memory_object_stream()
			async with rx, tx:
				tg.start_soon(self._process_pages, rx, profile_url)
				tg.start_soon(self._fetch_account, tx, profile_url)
				await done_ev.wait()
				# processing is complete, so halt fetching.
				# processing may complete before fetching if we get caught up on new posts.
				tg.cancel_scope.cancel()

	async def _process_pages(self, stream, account):
		"""Consume activities from *stream* and insert them into the DB until caught up."""
		done_ev = self._completed_accounts[account]
		try:
			async for activity in stream:
				try:
					await self._insert_activity(activity)
				except aiosqlite.IntegrityError as exc:
					# LOL sqlite error handling is so bad
					if exc.args[0].startswith('UNIQUE constraint failed: '):
						# this means we've encountered an item we already have saved
						# TODO we need to ignore this if we don't actually have all the posts.
						# For example, if a prior fetch was interrupted, we'll have k pages of the most recent posts,
						# but no more. But since we still have the most recent page saved, it'll *look* like
						# we've saved everything, since we stop as soon as we encounter a post we already have.
						# To fix this we can check against totalItems in the user's outbox.
						break

					self.erroneous_accounts.append(account)
					raise
		finally:
			# commit whatever we got, even on error/cancellation, and unblock _do_account
			print('Saving posts from', account, 'to the DB')
			await self._db.commit()
			done_ev.set()

	async def _insert_activity(self, activity):
		"""Insert a single 'Create' activity's post into the posts table."""
		if activity['type'] != 'Create':
			# this isn't a post but something else (like, boost, reaction, etc)
			return

		obj = activity['object']

		await self._db.execute(
			"""
			INSERT INTO posts (post_id, summary, content, published_at)
			VALUES (?, ?, ?, ?)
			""",
			(
				obj['id'],
				# Pleroma returns an empty string here for posts without a CW,
				# which is semantically incorrect IMO
				obj['summary'] or None,
				extract_post_content(obj['content']),
				# store as a UTC timestamp; reuse the module-level UTC constant
				pendulum.parse(obj['published']).astimezone(UTC).timestamp(),
			),
		)

	# TODO figure out why i put shield here lol
	@shield
	async def _fetch_account(self, tx, profile_url):
		"""Walk an account's outbox page by page, sending each activity down *tx*."""
		done_ev = self._completed_accounts[profile_url]

		try:
			outbox = await self.fetch_outbox(profile_url)
		except Exception as exc:
			# a failed outbox fetch shouldn't kill the other accounts' tasks:
			# report it, mark the account done and erroneous, and bail
			import traceback
			traceback.print_exception(type(exc), exc, exc.__traceback__)
			done_ev.set()
			self.erroneous_accounts.append(profile_url)
			return

		print(f'Fetching posts for {profile_url}...')

		next_page_url = outbox['first']
		while True:
			print(f'Fetching {next_page_url}... ')
			async with self._rl_handler.request('GET', next_page_url) as resp:
				page = await resp.json()

			for activity in page['orderedItems']:
				try:
					await tx.send(activity)
				except anyio.BrokenResourceError:
					# already closed means we're already done
					return

			# show progress
			#print('.', end='', flush=True)

			if not (next_page_url := page.get('next')):
				#done_ev.set()
				break

		done_ev.set()

	async def fetch_outbox(self, profile_url):
		"""fetch the first page of the outbox for the given ActivityPub profile URL"""
		try:
			async with self._http.get(profile_url) as resp:
				profile = await resp.json()
		except aiohttp.ContentTypeError:
			# we didn't get JSON, so just guess the outbox URL
			outbox_url = profile_url + '/outbox'
		else:
			outbox_url = profile['outbox']

		async with self._http.get(outbox_url) as resp:
			outbox = await resp.json()

		assert outbox['type'] == 'OrderedCollection'
		return outbox

	async def _finger_actor(self, username, instance):
		"""Resolve a handle to a profile URL via WebFinger."""
		# despite HTTP being a direct violation of the WebFinger spec, assume e.g. Tor instances do not support
		# HTTPS-over-onion
		finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
		async with self._http.get(finger_url) as resp:
			finger_result = await resp.json()

		return (profile_url := self._parse_webfinger_result(username, instance, finger_result))

	def _parse_webfinger_result(self, username, instance, finger_result):
		"""given webfinger data, return profile URL for handle

		Raises RuntimeError if no 'self' link with an ActivityPub content type is found.
		"""
		def check_content_type(type, ct): return ct == type or ct.startswith(type+';')
		check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)

		try:
			# note: the server might decide to return multiple links
			# so we need to decide how to prefer one.
			# i'd put "and yarl.URL(template).host == instance" here,
			# but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
			# Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
			# for example.
			return (profile_url := next(
				link['href']
				for link in finger_result['links']
				if link['rel'] == 'self' and check_ap(link['type'])
			))
		except StopIteration:
			# this should never happen either
			raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')

async def amain():
	"""Entry point: load config, fetch all posts, and exit non-zero on account errors."""
	import json5 as json
	import third_party.utils as utils
	args = utils.arg_parser_factory(description='Fetch posts from all followed accounts').parse_args()
	config = utils.load_config(args.cfg)
	async with PostFetcher(config=config) as fetcher:
		await fetcher.fetch_all()

	if (accs := fetcher.erroneous_accounts):
		print(
			'Exiting unsuccessfully due to previous errors in these accounts:',
			', '.join(accs),
			file=sys.stderr,
		)
		sys.exit(1)

def main():
	try:
		anyio.run(amain)
	except KeyboardInterrupt:
		# suppress the lengthy traceback
		sys.exit(1)

if __name__ == '__main__':
	main()