mirror-bot/mirror_bot.py

175 lines
5.5 KiB
Python
Raw Normal View History

2022-05-19 18:28:46 -07:00
#!/usr/bin/env python
# SPDX-License-Identifer: AGPL-3.0-only
import io
import sys
import anyio
import aiohttp
import pleroma
import argparse
import platform
import pendulum
import aiosqlite
import contextlib
import qtoml as toml
from utils import suppress
from pleroma import Pleroma
from functools import partial
USER_AGENT = (
'mirror-bot; '
f'{aiohttp.__version__}; '
f'{platform.python_implementation()}/{platform.python_version()}'
)
UTC = pendulum.timezone('UTC')
JSON_CONTENT_TYPE = 'application/json'
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
MIGRATION_VERSION = 1
class PostMirror:
def __init__(self, *, config):
self.config = config
async def __aenter__(self):
stack = contextlib.AsyncExitStack()
self._fedi = await stack.enter_async_context(
Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
)
self._http = await stack.enter_async_context(
aiohttp.ClientSession(
headers={
'User-Agent': USER_AGENT,
'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
},
trust_env=True,
raise_for_status=True,
),
)
self._ctx_stack = stack
return self
async def __aexit__(self, *excinfo):
return await self._ctx_stack.__aexit__(*excinfo)
async def mirror_posts(self):
outbox = await self.fetch_outbox(self.config['account'])
async with self._http.get(outbox['first']) as resp: page = await resp.json()
last_post = page['orderedItems'][0]['object']
try:
with open(self.config['timestamp_path']) as f:
last_mirrored_ts = pendulum.from_timestamp(float(f.read()))
except FileNotFoundError:
last_mirrored_ts = pendulum.from_timestamp(0.0)
last_post_ts = pendulum.parse(last_post['published'])
if last_post_ts < last_mirrored_ts:
print('Nothing to do')
return
# mirror the post and all its attachments
attachments = [None] * len(last_post['attachment'])
async with anyio.create_task_group() as tg:
for i, attachment in enumerate(last_post['attachment']):
tg.start_soon(self._mirror_attachment, i, attachments, attachment)
assert None not in attachments
await self._fedi.post(
last_post['source'],
cw=last_post['summary'],
visibility='unlisted',
media_ids=attachments,
)
with open(self.config['timestamp_path'], 'w') as f:
f.write(str(pendulum.now('UTC').timestamp()))
async def _mirror_attachment(self, i, out_attachments, attachment):
async with self._http.get(attachment['url']) as resp:
data = await resp.read()
out_attachments[i] = (await self._fedi.post_media(
io.BytesIO(data),
attachment['mediaType'],
filename=attachment['name'],
# TODO support descriptions
))['id']
async def fetch_outbox(self, handle):
"""
finger handle, a fully-qualified ActivityPub actor name,
returning their outbox info
"""
# it's fucking incredible how overengineered ActivityPub is btw
print('Fingering ', handle, '...', sep='')
username, at, instance = handle.lstrip('@').partition('@')
assert at == '@'
# i was planning on doing /.well-known/host-meta to find the webfinger URL, but
# 1) honk does not support host-meta
# 2) WebFinger is always located at the same location anyway
profile_url = await self._finger_actor(username, instance)
try:
async with self._http.get(profile_url) as resp: profile = await resp.json()
except aiohttp.ContentTypeError:
# we didn't get JSON, so just guess the outbox URL
outbox_url = profile_url + '/outbox'
else:
outbox_url = profile['outbox']
async with self._http.get(outbox_url) as resp: outbox = await resp.json()
assert outbox['type'] == 'OrderedCollection'
return outbox
async def _finger_actor(self, username, instance):
# despite HTTP being a direct violation of the WebFinger spec, assume e.g. Tor instances do not support
# HTTPS-over-onion
finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
async with self._http.get(finger_url) as resp: finger_result = await resp.json()
return (profile_url := self._parse_webfinger_result(username, instance, finger_result))
def _parse_webfinger_result(self, username, instance, finger_result):
"""given webfinger data, return profile URL for handle"""
def check_content_type(type, ct): return ct == type or ct.startswith(type+';')
check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)
try:
# note: the server might decide to return multiple links
# so we need to decide how to prefer one.
# i'd put "and yarl.URL(template).host == instance" here,
# but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
# Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
# for example.
return (profile_url := next(
link['href']
for link in finger_result['links']
if link['rel'] == 'self' and check_ap(link['type'])
))
except StopIteration:
# this should never happen either
raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
async def amain():
parser = argparse.ArgumentParser(description='Mirror posts from another fediverse account')
parser.add_argument(
'-c', '--cfg', dest='cfg', default='config.toml', nargs='?',
help='Specify a custom location for the config file.'
)
args = parser.parse_args()
with open(args.cfg) as f:
config = toml.load(f)
async with PostMirror(config=config) as pm: await pm.mirror_posts()
def main():
try:
anyio.run(amain)
except KeyboardInterrupt:
sys.exit(1)
if __name__ == '__main__':
main()