commit 1d8b4fd8c9328befd42fdb577b63c40e792e1ab7
Author: io
Date:   Fri May 20 01:28:46 2022 +0000

    initial commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..01fe8e4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.toml
+!*.example.toml
+*.timestamp
diff --git a/config.example.toml b/config.example.toml
new file mode 100644
index 0000000..e1efc95
--- /dev/null
+++ b/config.example.toml
@@ -0,0 +1,4 @@
+site = "https://botsin.space"
+access_token = ""
+timestamp_path = "last_post.timestamp"
+account = "example_1@example.com"
diff --git a/mirror_bot.py b/mirror_bot.py
new file mode 100755
index 0000000..3f1c247
--- /dev/null
+++ b/mirror_bot.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: AGPL-3.0-only
+
+import io
+import sys
+import anyio
+import aiohttp
+import pleroma
+import argparse
+import platform
+import pendulum
+import aiosqlite
+import contextlib
+import qtoml as toml
+from utils import suppress
+from pleroma import Pleroma
+from functools import partial
+
+USER_AGENT = (
+	'mirror-bot; '
+	f'{aiohttp.__version__}; '
+	f'{platform.python_implementation()}/{platform.python_version()}'
+)
+
+UTC = pendulum.timezone('UTC')
+JSON_CONTENT_TYPE = 'application/json'
+ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
+
+MIGRATION_VERSION = 1
+
+class PostMirror:
+	def __init__(self, *, config):
+		self.config = config
+
+	async def __aenter__(self):
+		stack = contextlib.AsyncExitStack()
+		self._fedi = await stack.enter_async_context(
+			Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
+		)
+		self._http = await stack.enter_async_context(
+			aiohttp.ClientSession(
+				headers={
+					'User-Agent': USER_AGENT,
+					'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
+				},
+				trust_env=True,
+				raise_for_status=True,
+			),
+		)
+		self._ctx_stack = stack
+		return self
+
+	async def __aexit__(self, *excinfo):
+		return await self._ctx_stack.__aexit__(*excinfo)
+
+	async def mirror_posts(self):
+		outbox = await self.fetch_outbox(self.config['account'])
+		async with self._http.get(outbox['first']) as resp: page = await resp.json()
+		last_post = page['orderedItems'][0]['object']
+
+		try:
+			with open(self.config['timestamp_path']) as f:
+				last_mirrored_ts = pendulum.from_timestamp(float(f.read()))
+		except FileNotFoundError:
+			last_mirrored_ts = pendulum.from_timestamp(0.0)
+
+		last_post_ts = pendulum.parse(last_post['published'])
+
+		if last_post_ts < last_mirrored_ts:
+			print('Nothing to do')
+			return
+
+		# mirror the post and all its attachments
+		attachments = [None] * len(last_post['attachment'])
+		async with anyio.create_task_group() as tg:
+			for i, attachment in enumerate(last_post['attachment']):
+				tg.start_soon(self._mirror_attachment, i, attachments, attachment)
+		assert None not in attachments
+		await self._fedi.post(
+			last_post['source'],
+			cw=last_post['summary'],
+			visibility='unlisted',
+			media_ids=attachments,
+		)
+
+		with open(self.config['timestamp_path'], 'w') as f:
+			f.write(str(pendulum.now('UTC').timestamp()))
+
+	async def _mirror_attachment(self, i, out_attachments, attachment):
+		async with self._http.get(attachment['url']) as resp:
+			data = await resp.read()
+		out_attachments[i] = (await self._fedi.post_media(
+			io.BytesIO(data),
+			attachment['mediaType'],
+			filename=attachment['name'],
+			# TODO support descriptions
+		))['id']
+
+	async def fetch_outbox(self, handle):
+		"""
+		finger handle, a fully-qualified ActivityPub actor name,
+		returning their outbox info
+		"""
+		# it's fucking incredible how overengineered ActivityPub is btw
+		print('Fingering ', handle, '...', sep='')
+
+		username, at, instance = handle.lstrip('@').partition('@')
+		assert at == '@'
+
+		# i was planning on doing /.well-known/host-meta to find the webfinger URL, but
+		# 1) honk does not support host-meta
+		# 2) WebFinger is always located at the same location anyway
+
+		profile_url = await self._finger_actor(username, instance)
+
+		try:
+			async with self._http.get(profile_url) as resp: profile = await resp.json()
+		except aiohttp.ContentTypeError:
+			# we didn't get JSON, so just guess the outbox URL
+			outbox_url = profile_url + '/outbox'
+		else:
+			outbox_url = profile['outbox']
+
+		async with self._http.get(outbox_url) as resp: outbox = await resp.json()
+		assert outbox['type'] == 'OrderedCollection'
+		return outbox
+
+	async def _finger_actor(self, username, instance):
+		# using plain HTTP is a direct violation of the WebFinger spec, but e.g. Tor-only
+		# instances do not support HTTPS over onion services
+		finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
+		async with self._http.get(finger_url) as resp: finger_result = await resp.json()
+		return (profile_url := self._parse_webfinger_result(username, instance, finger_result))
+
+	def _parse_webfinger_result(self, username, instance, finger_result):
+		"""given webfinger data, return profile URL for handle"""
+		def check_content_type(type, ct): return ct == type or ct.startswith(type+';')
+		check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)
+
+		try:
+			# note: the server might decide to return multiple links,
+			# so we need to decide how to prefer one.
+			# i'd put "and yarl.URL(template).host == instance" here,
+			# but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
+			# Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
+			# for example.
+			return (profile_url := next(
+				link['href']
+				for link in finger_result['links']
+				if link['rel'] == 'self' and check_ap(link['type'])
+			))
+		except StopIteration:
+			# this should never happen
+			raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
+
+async def amain():
+	parser = argparse.ArgumentParser(description='Mirror posts from another fediverse account')
+	parser.add_argument(
+		'-c', '--cfg', dest='cfg', default='config.toml', nargs='?',
+		help='Specify a custom location for the config file.'
+	)
+	args = parser.parse_args()
+	with open(args.cfg) as f:
+		config = toml.load(f)
+	async with PostMirror(config=config) as pm: await pm.mirror_posts()
+
+def main():
+	try:
+		anyio.run(amain)
+	except KeyboardInterrupt:
+		sys.exit(1)
+
+if __name__ == '__main__':
+	main()
diff --git a/pleroma.py b/pleroma.py
new file mode 100644
index 0000000..dae25ef
--- /dev/null
+++ b/pleroma.py
@@ -0,0 +1,183 @@
+# SPDX-License-Identifier: AGPL-3.0-only
+
+import sys
+import yarl
+import json
+import hashlib
+import aiohttp
+from multidict import MultiDict
+from http import HTTPStatus
+
+def http_session_factory(headers={}):
+	py_version = '.'.join(map(str, sys.version_info))
+	user_agent = (
+		'pleroma-ebooks (https://lab.freak.university/KayFaraday/pleroma-ebooks); '
+		f'aiohttp/{aiohttp.__version__}; '
+		f'python/{py_version}'
+	)
+	return aiohttp.ClientSession(
+		headers={'User-Agent': user_agent, **headers},
+	)
+
+class BadRequest(Exception):
+	pass
+
+class LoginFailed(Exception):
+	pass
+
+class Pleroma:
+	def __init__(self, *, api_base_url, access_token):
+		self.api_base_url = api_base_url.rstrip('/')
+		self.access_token = access_token.strip()
+		self._session = http_session_factory({'Authorization': 'Bearer ' + self.access_token})
+		self._logged_in_id = None
+
+	async def __aenter__(self):
+		self._session = await self._session.__aenter__()
+		return self
+
+	async def __aexit__(self, *excinfo):
+		return await self._session.__aexit__(*excinfo)
+
+	async def request(self, method, path, **kwargs):
+		# blocklist of some horrible instances
+		if hashlib.sha256(
+			yarl.URL(self.api_base_url).host.encode()
+			+ bytes.fromhex('d590e3c48d599db6776e89dfc8ebaf53c8cd84866a76305049d8d8c5d4126ce1')
+		).hexdigest() in {
+			'56704d4d95b882e81c8e7765e9079be0afc4e353925ba9add8fd65976f52db83',
+			'1932431fa41a0baaccce7815115b01e40e0237035bb155713712075b887f5a19',
+			'a42191105a9f3514a1d5131969c07a95e06d0fdf0058f18e478823bf299881c9',
+		}:
+			raise RuntimeError('stop being a chud')
+
+		async with self._session.request(method, self.api_base_url + path, **kwargs) as resp:
+			if resp.status == HTTPStatus.BAD_REQUEST:
+				raise BadRequest((await resp.json())['error'])
+			#resp.raise_for_status()
+			return await resp.json()
+
+	async def verify_credentials(self):
+		return await self.request('GET', '/api/v1/accounts/verify_credentials')
+
+	me = verify_credentials
+
+	async def _get_logged_in_id(self):
+		if self._logged_in_id is not None:
+			return self._logged_in_id
+
+		me = await self.me()
+
+		try:
+			self._logged_in_id = me['id']
+		except KeyError:
+			raise LoginFailed(me)
+
+		return self._logged_in_id
+
+	async def following(self, account_id=None):
+		account_id = account_id or await self._get_logged_in_id()
+		return await self.request('GET', f'/api/v1/accounts/{account_id}/following')
+
+	@staticmethod
+	def _unpack_id(obj):
+		if isinstance(obj, dict) and 'id' in obj:
+			return obj['id']
+		return obj
+
+	async def status_context(self, id):
+		id = self._unpack_id(id)
+		return await self.request('GET', f'/api/v1/statuses/{id}/context')
+
+	async def post(self, content, *, in_reply_to_id=None, cw=None, visibility=None, media_ids=()):
+		if visibility not in {None, 'private', 'public', 'unlisted', 'direct'}:
+			raise ValueError('invalid visibility', visibility)
+
+		data = MultiDict(status=content)
+		if in_reply_to_id := self._unpack_id(in_reply_to_id):
+			data['in_reply_to_id'] = in_reply_to_id
+		if visibility is not None:
+			data['visibility'] = visibility
+		# normally, this would be a check against None.
+		# however, apparently Pleroma serializes posts without CWs as posts with an empty string
+		# as a CW, so per the robustness principle we'll accept that too.
+		if cw:
+			data['spoiler_text'] = cw
+
+		for media_id in media_ids:
+			data.add('media_ids[]', media_id)
+
+		return await self.request('POST', '/api/v1/statuses', data=data)
+
+	async def post_media(self, fp, mime_type, filename=None, description=None, focus=None):
+		data = aiohttp.FormData()
+		data.add_field('file', fp, content_type=mime_type, filename=filename)
+		if description is not None:
+			data.add_field('description', description)
+		if focus is not None:
+			data.add_field('focus', ','.join(map(str, focus)))
+		return await self.request('POST', '/api/v2/media', data=data)
+
+	async def reply(self, to_status, content, *, cw=None):
+		user_id = await self._get_logged_in_id()
+
+		mentioned_accounts = {}
+		mentioned_accounts[to_status['account']['id']] = to_status['account']['acct']
+		for account in to_status['mentions']:
+			if account['id'] != user_id and account['id'] not in mentioned_accounts:
+				mentioned_accounts[account['id']] = account['acct']
+
+		content = ''.join('@' + x + ' ' for x in mentioned_accounts.values()) + content
+
+		visibility = 'unlisted' if to_status['visibility'] == 'public' else to_status['visibility']
+		if not cw and 'spoiler_text' in to_status and to_status['spoiler_text']:
+			cw = 're: ' + to_status['spoiler_text']
+
+		return await self.post(content, in_reply_to_id=to_status['id'], cw=cw, visibility=visibility)
+
+	async def favorite(self, id):
+		id = self._unpack_id(id)
+		return await self.request('POST', f'/api/v1/statuses/{id}/favourite')
+
+	async def unfavorite(self, id):
+		id = self._unpack_id(id)
+		return await self.request('POST', f'/api/v1/statuses/{id}/unfavourite')
+
+	async def react(self, id, reaction):
+		id = self._unpack_id(id)
+		return await self.request('PUT', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
+
+	async def remove_reaction(self, id, reaction):
+		id = self._unpack_id(id)
+		return await self.request('DELETE', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
+
+	async def pin(self, id):
+		id = self._unpack_id(id)
+		return await self.request('POST', f'/api/v1/statuses/{id}/pin')
+
+	async def unpin(self, id):
+		id = self._unpack_id(id)
+		return await self.request('POST', f'/api/v1/statuses/{id}/unpin')
+
+	async def stream(self, stream_name, *, target_event_type=None):
+		async with self._session.ws_connect(
+			self.api_base_url + f'/api/v1/streaming?stream={stream_name}&access_token={self.access_token}'
+		) as ws:
+			async for msg in ws:
+				if msg.type == aiohttp.WSMsgType.TEXT:
+					event = msg.json()
+					# the only event type that doesn't define `payload` is `filters_changed`
+					if event['event'] == 'filters_changed':
+						yield event
+					elif target_event_type is None or event['event'] == target_event_type:
+						# don't ask me why the payload is also JSON encoded smh
+						yield json.loads(event['payload'])
+
+	async def stream_notifications(self):
+		async for notif in self.stream('user:notification', target_event_type='notification'):
+			yield notif
+
+	async def stream_mentions(self):
+		async for notif in self.stream_notifications():
+			if notif['type'] == 'mention':
+				yield notif
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b1649bc
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+aiohttp ~= 3.0
+qtoml ~= 0.3.1
+anyio ~= 3.0
+aiosqlite ~= 0.17.0
+pendulum ~= 2.0
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..7d08eae
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,21 @@
+# SPDX-License-Identifier: AGPL-3.0-only
+
+import anyio
+import contextlib
+from functools import wraps
+
+def as_corofunc(f):
+	@wraps(f)
+	async def wrapped(*args, **kwargs):
+		# can't decide if i want an `anyio.sleep(0)` here.
+		return f(*args, **kwargs)
+	return wrapped
+
+def as_async_cm(cls):
+	@wraps(cls, updated=()) # cls.__dict__ doesn't support .update()
+	class wrapped(cls, contextlib.AbstractAsyncContextManager):
+		__aenter__ = as_corofunc(cls.__enter__)
+		__aexit__ = as_corofunc(cls.__exit__)
+	return wrapped
+
+suppress = as_async_cm(contextlib.suppress)
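
The pieces above compose as follows: mirror_bot.py drives PostMirror once per invocation (e.g. from cron), while pleroma.py's Pleroma client can also be used on its own. A minimal standalone sketch, not part of the commit, assuming a config.toml shaped like config.example.toml (the status text and print output here are illustrative only):

# standalone_post.py -- minimal sketch of using the Pleroma client directly
import anyio
import qtoml as toml
from pleroma import Pleroma

async def amain():
	# read the same config file the mirror bot uses
	with open('config.toml') as f:
		config = toml.load(f)
	async with Pleroma(api_base_url=config['site'], access_token=config['access_token']) as fedi:
		me = await fedi.verify_credentials()
		print('logged in as', me['acct'])
		# post an unlisted status from the bot account
		await fedi.post('test post from the mirror bot account', visibility='unlisted')

if __name__ == '__main__':
	anyio.run(amain)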