mirror all posts since last ran (fixes #1)

This commit is contained in:
io 2022-06-28 04:45:22 +00:00
parent da515baf0c
commit 40e3867f82
3 changed files with 72 additions and 21 deletions

View File

@ -4,6 +4,7 @@
import io
import sys
import anyio
import cursor
import aiohttp
import pleroma
import argparse
@ -12,7 +13,7 @@ import pendulum
import aiosqlite
import contextlib
import qtoml as toml
from utils import suppress
from utils import suppress, loading_spinner
from pleroma import Pleroma
from functools import partial
@ -26,8 +27,6 @@ UTC = pendulum.timezone('UTC')
JSON_CONTENT_TYPE = 'application/json'
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
MIGRATION_VERSION = 1
class PostMirror:
def __init__(self, *, config):
self.config = config
@ -54,38 +53,73 @@ class PostMirror:
return await self._ctx_stack.__aexit__(*excinfo)
async def mirror_posts(self):
spinner = loading_spinner()
outbox = await self.fetch_outbox(self.config['account'])
async with self._http.get(outbox['first']) as resp: page = await resp.json()
last_post = page['orderedItems'][0]['object']
try:
with open(self.config['timestamp_path']) as f:
last_mirrored_ts = pendulum.from_timestamp(float(f.read()))
last_mirrored_at = pendulum.from_timestamp(float(f.read()))
except FileNotFoundError:
last_mirrored_ts = pendulum.from_timestamp(0.0)
last_mirrored_at = pendulum.from_timestamp(0.0)
last_post_ts = pendulum.parse(last_post['published'])
page_url = outbox['first']
posts = []
print('Fetching posts to mirror...', end=' ')
cursor.hide()
done = False
while not done:
async with self._http.get(page_url) as resp: page = await resp.json()
try:
page_url = page['next']
except KeyError:
done = True
if last_post_ts < last_mirrored_ts:
print(next(spinner), end='', flush=True)
for item in page['orderedItems']:
post = item['object']
published_at = pendulum.parse(post['published'])
if published_at < last_mirrored_at:
done = True
break
posts.append(post)
print()
cursor.show()
if not posts:
print('Nothing to do')
return
# mirror the post and all its attachments
attachments = [None] * len(last_post['attachment'])
async with anyio.create_task_group() as tg:
for i, attachment in enumerate(last_post['attachment']):
tg.start_soon(self._mirror_attachment, i, attachments, attachment)
assert None not in attachments
await self._fedi.post(
last_post['source'],
cw=last_post['summary'],
visibility='unlisted',
media_ids=attachments,
)
print('Mirroring posts...', end=' ')
cursor.hide()
for post in reversed(posts): # oldest to newest
# we use for ... await instead of a task group in order to ensure order is preserved
# TODO mirror all attachments (from all posts) in parallel
await self._mirror_post(post)
print(next(spinner), end='', flush=True)
print()
cursor.show()
with open(self.config['timestamp_path'], 'w') as f:
f.write(str(pendulum.now('UTC').timestamp()))
async def _mirror_post(self, post):
attachments = [None] * len(post['attachment'])
async with anyio.create_task_group() as tg:
for i, attachment in enumerate(post['attachment']):
tg.start_soon(self._mirror_attachment, i, attachments, attachment)
assert None not in attachments
await self._fedi.post(
post['source'],
cw=post['summary'],
visibility='unlisted',
media_ids=attachments,
)
async def _mirror_attachment(self, i, out_attachments, attachment):
async with self._http.get(attachment['url']) as resp:
data = await resp.read()
@ -168,6 +202,7 @@ def main():
try:
anyio.run(amain)
except KeyboardInterrupt:
cursor.show()
sys.exit(1)
if __name__ == '__main__':

View File

@ -3,3 +3,4 @@ qtoml ~= 0.3.1
anyio ~= 3.0
aiosqlite ~= 0.17.0
pendulum ~= 2.0
cursor ~= 1.3

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
import anyio
import itertools
import contextlib
from functools import wraps
@ -19,3 +20,17 @@ def as_async_cm(cls):
return wrapped
suppress = as_async_cm(contextlib.suppress)
def loading_spinner():
return itertools.cycle('\b' + x for x in [
'',
'',
'',
'',
'',
'',
'',
'',
'',
'',
])