mirror-bot/mirror_bot.py

205 lines
6.2 KiB
Python
Raw Normal View History

2022-05-19 18:28:46 -07:00
#!/usr/bin/env python
# SPDX-License-Identifier: AGPL-3.0-only
import io
import sys
import anyio
import cursor
2022-05-19 18:28:46 -07:00
import aiohttp
import pleroma
import argparse
import pendulum
import aiosqlite
import contextlib
import qtoml as toml
from pleroma import Pleroma
from functools import partial
2022-07-02 16:35:16 -07:00
from utils import loading_spinner, http_session_factory, HandleRateLimits
2022-05-19 18:28:46 -07:00
# Media types this module works with. Both are offered in the Accept header of the HTTP session.
JSON_CONTENT_TYPE = 'application/json'
# Also the type a WebFinger `self` link must carry to be accepted as the actor's profile URL.
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
class PostMirror:
    """
    mirror the posts of one fediverse account onto another.

    use as an async context manager: entering opens the Pleroma API client and the HTTP session,
    leaving closes both again.

    `config` is a mapping read by key: `site` and `access_token` (the account posted to),
    `account` (the handle mirrored from) and `timestamp_path` (the checkpoint file).
    """

    def __init__(self, *, config):
        self.config = config

    async def __aenter__(self):
        # Build everything inside `async with` so that a failure halfway through setup closes
        # whatever was already opened. pop_all() transfers ownership to us once setup succeeded.
        async with contextlib.AsyncExitStack() as stack:
            self._fedi = await stack.enter_async_context(
                Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
            )
            self._http = await stack.enter_async_context(
                http_session_factory(
                    headers={
                        'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
                    },
                    trust_env=True,
                    raise_for_status=True,
                ),
            )
            self._rl_handler = HandleRateLimits(self._http)
            self._ctx_stack = stack.pop_all()
        return self

    async def __aexit__(self, *excinfo):
        return await self._ctx_stack.__aexit__(*excinfo)

    @staticmethod
    @contextlib.contextmanager
    def _hidden_cursor():
        """hide the terminal cursor for the duration of the block, restoring it even on error"""
        cursor.hide()
        try:
            yield
        finally:
            cursor.show()

    def _read_last_mirrored_at(self):
        """return the checkpoint stored at `timestamp_path`, or the epoch if there is no such file"""
        try:
            with open(self.config['timestamp_path']) as f:
                return pendulum.from_timestamp(float(f.read()))
        except FileNotFoundError:
            return pendulum.from_timestamp(0.0)

    async def _fetch_new_posts(self, first_page_url, last_mirrored_at, spinner):
        """
        walk the outbox pages starting at first_page_url and collect every post
        published after last_mirrored_at, in the order the outbox lists them.
        """
        posts = []
        page_url = first_page_url
        while page_url is not None:
            async with self._rl_handler.request('GET', page_url) as resp:
                page = await resp.json()
            print(next(spinner), end='', flush=True)
            # a page without a `next` link is the last one
            page_url = page.get('next')

            for activity in page['orderedItems']:
                if activity['type'] != 'Create':
                    # it's not a post, but a boost or like or something
                    continue
                post = activity['object']
                # `<=` because the checkpoint is the publication time of a post that was already mirrored.
                # everything after this point in the outbox is treated as older, so stop paging entirely.
                if pendulum.parse(post['published']) <= last_mirrored_at:
                    return posts
                posts.append(post)
        return posts

    async def mirror_posts(self):
        """
        mirror every post published since the stored checkpoint, oldest first,
        then advance the checkpoint.
        """
        spinner = loading_spinner()

        outbox = await self.fetch_outbox(self.config['account'])
        last_mirrored_at = self._read_last_mirrored_at()

        print('Fetching posts to mirror...', end=' ')
        with self._hidden_cursor():
            posts = await self._fetch_new_posts(outbox['first'], last_mirrored_at, spinner)
            print()

        if not posts:
            print('Nothing to do')
            return

        print('Mirroring posts...', end=' ')
        with self._hidden_cursor():
            for post in reversed(posts):  # oldest to newest
                # we use for ... await instead of a task group in order to ensure order is preserved
                # TODO mirror all attachments (from all posts) in parallel
                await self._mirror_post(post)
                print(next(spinner), end='', flush=True)
            print()

        # Checkpoint on the newest mirrored post rather than on the wall clock: a post published
        # while this run was still in progress is older than "now" and would never be picked up again.
        newest_published_at = max(pendulum.parse(post['published']) for post in posts)
        with open(self.config['timestamp_path'], 'w') as f:
            f.write(str(newest_published_at.timestamp()))

    @staticmethod
    def _source_text(post):
        """
        return the source text of a post with a zero-width space inserted after every '@'
        (presumably so that handles in the mirrored text do not turn into mentions).
        """
        source = post['source']
        # ActivityPub specifies `source` as an object holding `content` and `mediaType`;
        # the original code expected a bare string, so accept both forms.
        if isinstance(source, dict):
            source = source['content']
        return source.replace('@', '@\u200b')

    async def _mirror_post(self, post):
        """re-upload the attachments of a post, then publish its text as an unlisted status"""
        # tolerate posts that carry no `attachment` key at all
        remote_attachments = post.get('attachment') or []
        attachments = [None] * len(remote_attachments)
        async with anyio.create_task_group() as tg:
            for i, attachment in enumerate(remote_attachments):
                tg.start_soon(self._mirror_attachment, i, attachments, attachment)
        # each task fills in its own slot, so every slot should be set by now
        assert None not in attachments
        await self._fedi.post(
            self._source_text(post),
            cw=post['summary'],
            visibility='unlisted',
            media_ids=attachments,
        )

    async def _mirror_attachment(self, i, out_attachments, attachment):
        """download one attachment, upload it, and store the new media id in out_attachments[i]"""
        async with self._http.get(attachment['url']) as resp:
            data = await resp.read()
        out_attachments[i] = (await self._fedi.post_media(
            io.BytesIO(data),
            attachment['mediaType'],
            filename=attachment['name'],
            # TODO support descriptions
        ))['id']

    async def fetch_outbox(self, handle):
        """
        finger handle, a fully-qualified ActivityPub actor name,
        returning their outbox info

        raises ValueError if handle has no @instance part,
        and RuntimeError if the outbox is not an OrderedCollection.
        """
        # it's fucking incredible how overengineered ActivityPub is btw
        print('Fingering ', handle, '...', sep='')

        username, at, instance = handle.lstrip('@').partition('@')
        # explicit raise instead of assert, which is stripped under `python -O`
        if at != '@':
            raise ValueError(f'{handle!r} is not a fully-qualified handle (expected user@instance)')

        # i was planning on doing /.well-known/host-meta to find the webfinger URL, but
        # 1) honk does not support host-meta
        # 2) WebFinger is always located at the same location anyway
        profile_url = await self._finger_actor(username, instance)

        try:
            async with self._http.get(profile_url) as resp:
                profile = await resp.json()
        except aiohttp.ContentTypeError:
            # we didn't get JSON, so just guess the outbox URL
            outbox_url = profile_url + '/outbox'
        else:
            outbox_url = profile['outbox']

        async with self._http.get(outbox_url) as resp:
            outbox = await resp.json()
        if outbox.get('type') != 'OrderedCollection':
            raise RuntimeError(
                f'fatal: expected an OrderedCollection at {outbox_url}, got {outbox.get("type")!r}'
            )
        return outbox

    async def _finger_actor(self, username, instance):
        """query the WebFinger endpoint of instance and return the profile URL of username"""
        # despite HTTP being a direct violation of the WebFinger spec, assume e.g. Tor instances do not support
        # HTTPS-over-onion
        finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
        async with self._http.get(finger_url) as resp:
            finger_result = await resp.json()
        return self._parse_webfinger_result(username, instance, finger_result)

    def _parse_webfinger_result(self, username, instance, finger_result):
        """
        given webfinger data, return profile URL for handle

        raises RuntimeError if no `self` link with the ActivityPub content type is present.
        """
        def is_activitypub(content_type):
            # accept the bare media type as well as one followed by parameters
            return (
                content_type == ACTIVITYPUB_CONTENT_TYPE
                or content_type.startswith(ACTIVITYPUB_CONTENT_TYPE + ';')
            )

        # note: the server might decide to return multiple links
        # so we need to decide how to prefer one; we take the first match.
        # i'd put "and yarl.URL(template).host == instance" here,
        # but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
        # Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
        # for example.
        for link in finger_result['links']:
            # links are not required to carry a `type` (or even a `rel`), so skip those instead of crashing
            if link.get('rel') == 'self' and is_activitypub(link.get('type') or ''):
                return link['href']

        # this should never happen either
        raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
async def amain():
    """
    parse the command line, load the TOML config file it names,
    and mirror posts using that config.
    """
    parser = argparse.ArgumentParser(description='Mirror posts from another fediverse account')
    parser.add_argument(
        # with nargs='?' a bare `-c` stores `const`, which defaults to None and would make open() fail
        # with a TypeError; give it the same value as the default instead
        '-c', '--cfg', dest='cfg', default='config.toml', nargs='?', const='config.toml',
        help='Specify a custom location for the config file.'
    )
    args = parser.parse_args()
    with open(args.cfg) as f:
        config = toml.load(f)
    async with PostMirror(config=config) as pm:
        await pm.mirror_posts()
def main():
    """run amain() to completion; exit with status 1 if interrupted from the keyboard"""
    try:
        anyio.run(amain)
    except KeyboardInterrupt:
        sys.exit(1)
    finally:
        # the cursor is hidden while progress spinners are shown; restore it however we leave,
        # not only on Ctrl-C, so a crash does not leave the terminal without a cursor
        cursor.show()


if __name__ == '__main__':
    main()