Compare commits


19 Commits

Author SHA1 Message Date
io 290bb33538 use external pleroma.py 2023-01-11 06:30:47 +00:00
lucdev d932256125 Rate limiter: fix datetime parser (fixes #5) 2023-01-11 06:28:57 +00:00
Joel Beckmeyer e2a18f8888 fix some pleroma errors with async and 500 errors (#4)
* fix some pleroma errors with async and 500 errors

* add better recovery/handling of HTTP 500

* remove unnecessary else
2023-01-11 06:28:54 +00:00
Kay Faraday f7cc00e443 fetch_posts.py: remove lots of dead code 2022-12-28 23:07:02 +00:00
Kay Faraday f58d85635a fetch_posts.py: support Wordpress ActivityPub plugin 2022-12-28 23:06:23 +00:00
Kay Faraday 0ca511e848 simplify gen.py logic a bit 2022-08-12 00:34:13 +00:00
Kay Faraday c7c2c6adcf gen.py: disable posting tracebacks
These were kind of doxy for some people and usually weren't helpful anyway.
2022-08-12 00:34:13 +00:00
Kay Faraday b6b96a8661 Merge pull request 'Reuse client and use client factory' (#3) from PeachyDelight/pleroma-ebooks:trunk into trunk
Reviewed-on: #3
2022-06-19 18:14:17 +00:00
PeachyDelight 970903c3c0 Reuse client and use client factory 2022-06-19 04:04:14 +02:00
Kay Faraday 5e50bccd53 deduplicate user-agent setting code 2022-06-19 02:02:32 +00:00
Kay Faraday bc5e8af32b Merge pull request 'add textsynth' (#2) from PeachyDelight/pleroma-ebooks:trunk into trunk
Reviewed-on: #2
2022-06-19 01:50:52 +00:00
PeachyDelight 879eb32b2d add textsynth 2022-06-19 03:32:34 +02:00
Kay Faraday b73526a895 simplify account fetch by removing the WebFinger step
Now we fetch the user's profile URL from our home server, skipping the need to WebFinger them
and also supporting instances that don't support http:// connections.
2022-06-15 19:53:43 +00:00
Kay Faraday 61e05de199 Fix limit_length 2022-05-24 04:45:55 +00:00
Kay Faraday ce6fcbd8b5 gen.py: update contact in error message 2022-02-05 21:42:38 +00:00
Kay Faraday e747e3c120 update User-Agent 2022-01-08 04:28:04 +00:00
Kay Faraday 94a8b53f28 strip access tokens for ease of use 2022-01-08 04:27:09 +00:00
Kay Faraday f709bed0e9 update defaults 2022-01-08 04:26:02 +00:00
Kay Faraday f6a17a25f7 fix for the prev commit 2022-01-01 22:49:40 +00:00
10 changed files with 198 additions and 304 deletions

config.defaults.json

@@ -1,7 +1,7 @@
 {
-	"site": "https://botsin.space",
+	"site": "https://freak.university",
 	"cw": null,
-	"learn_from_cw": false,
+	"learn_from_cw": true,
 	"ignored_cws": [],
 	"mention_handling": 1,
 	"max_thread_length": 15,
@@ -9,9 +9,11 @@
 	"limit_length": false,
 	"length_lower_limit": 5,
 	"length_upper_limit": 50,
-	"overlap_ratio_enabled": false,
+	"overlap_ratio_enabled": true,
 	"overlap_ratio": 0.7,
 	"generation_mode": "markov",
 	"access_token": "",
-	"db_path": "posts.db"
-}
+	"db_path": "",
+	"textsynth_token": null,
+	"textsynth_engine_id": "gptneox_20B"
+}
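
For reference, a sketch of a user config that opts into the new TextSynth mode, assuming the loader merges user values over these defaults and maps generation_mode onto the TextGenerationMode enum below; the token and paths are placeholders:

	{
		"site": "https://freak.university",
		"access_token": "...",
		"db_path": "posts.db",
		"generation_mode": "textsynth",
		"textsynth_token": "YOUR_TEXTSYNTH_API_TOKEN",
		"textsynth_engine_id": "gptneox_20B"
	}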

fetch_posts.py

@@ -4,25 +4,18 @@
 import sys
 import anyio
 import aiohttp
-import platform
 import pendulum
 import operator
 import aiosqlite
 import contextlib
 from yarl import URL
-from pleroma import Pleroma
+from pleroma import Pleroma, HandleRateLimits
 from bs4 import BeautifulSoup
 from functools import partial
 from typing import Iterable, NewType
-from utils import shield, HandleRateLimits, suppress
+from utils import shield, suppress, http_session_factory
 from third_party.utils import extract_post_content
-USER_AGENT = (
-	'pleroma-ebooks; '
-	f'{aiohttp.__version__}; '
-	f'{platform.python_implementation()}/{platform.python_version()}'
-)
 UTC = pendulum.timezone('UTC')
 JSON_CONTENT_TYPE = 'application/json'
 ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
@@ -40,11 +33,8 @@ class PostFetcher:
 			Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
 		)
 		self._http = await stack.enter_async_context(
-			aiohttp.ClientSession(
-				headers={
-					'User-Agent': USER_AGENT,
-					'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
-				},
+			http_session_factory(
+				headers={'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE])},
 				trust_env=True,
 				raise_for_status=True,
 			),
@@ -75,32 +65,24 @@
 	async def __aexit__(self, *excinfo):
 		return await self._ctx_stack.__aexit__(*excinfo)
-	# username@instance
-	AccountHandle = NewType('AccountHandle', str)
 	async def fetch_all(self):
 		"""fetch all following accounts, or an iterable of accounts if provided"""
 		await self._fedi.verify_credentials()
 		self._completed_accounts = {}
 		async with anyio.create_task_group() as tg:
-			for fqn in map(self.fqn, await self._fedi.following()):
-				tg.start_soon(self._do_account, fqn)
+			# XXX it's assumed that no more than one API page of people are being followed at one time
+			for account in await self._fedi.following():
+				profile_url = account['url']
+				tg.start_soon(self._do_account, profile_url)
-	def fqn(self, acc: dict):
-		try:
-			return acc['fqn']
-		except KeyError:
-			fqn = acc['acct']
-			if '@' in fqn: return fqn
-			return fqn + '@' + URL(self.config['site']).host
-	async def _do_account(self, acc: AccountHandle):
+	async def _do_account(self, profile_url: str):
 		async with anyio.create_task_group() as tg:
-			self._completed_accounts[acc] = done_ev = anyio.Event()
+			self._completed_accounts[profile_url] = done_ev = anyio.Event()
 			tx, rx = anyio.create_memory_object_stream()
 			async with rx, tx:
-				tg.start_soon(self._process_pages, rx, acc)
-				tg.start_soon(self._fetch_account, tx, acc)
+				tg.start_soon(self._process_pages, rx, profile_url)
+				tg.start_soon(self._fetch_account, tx, profile_url)
 			await done_ev.wait()
 			# processing is complete, so halt fetching.
 			# processing may complete before fetching if we get caught up on new posts.
@@ -154,19 +136,19 @@
 	# TODO figure out why i put shield here lol
 	@shield
-	async def _fetch_account(self, tx, account: AccountHandle):
-		done_ev = self._completed_accounts[account]
+	async def _fetch_account(self, tx, profile_url):
+		done_ev = self._completed_accounts[profile_url]
 		try:
-			outbox = await self.fetch_outbox(account)
+			outbox = await self.fetch_outbox(profile_url)
 		except Exception as exc:
 			import traceback
 			traceback.print_exception(type(exc), exc, exc.__traceback__)
 			done_ev.set()
-			self.erroneous_accounts.append(account)
+			self.erroneous_accounts.append(profile_url)
 			return
-		print(f'Fetching posts for {account}...')
+		print(f'Fetching posts for {profile_url}...')
 		next_page_url = outbox['first']
 		while True:
@@ -189,19 +171,8 @@
 		done_ev.set()
-	async def fetch_outbox(self, handle):
-		"""finger handle, a fully-qualified ActivityPub actor name, returning their outbox URL"""
-		# it's fucking incredible how overengineered ActivityPub is btw
-		print('Fingering ', handle, '...', sep='')
-		username, at, instance = handle.lstrip('@').partition('@')
-		assert at == '@'
-		# i was planning on doing /.well-known/host-meta to find the webfinger URL, but
-		# 1) honk does not support host-meta
-		# 2) WebFinger is always located at the same location anyway
-		profile_url = await self._finger_actor(username, instance)
+	async def fetch_outbox(self, profile_url):
+		"""fetch the first page of the outbox for the given ActivityPub profile URL"""
 		try:
 			async with self._http.get(profile_url) as resp: profile = await resp.json()
@@ -212,39 +183,10 @@
 		outbox_url = profile['outbox']
 		async with self._http.get(outbox_url) as resp: outbox = await resp.json()
-		assert outbox['type'] == 'OrderedCollection'
+		assert outbox['type'] in {'OrderedCollection', 'OrderedCollectionPage'}
 		return outbox
-	async def _finger_actor(self, username, instance):
-		# despite HTTP being a direct violation of the WebFinger spec, assume e.g. Tor instances do not support
-		# HTTPS-over-onion
-		finger_url = f'http://{instance}/.well-known/webfinger?resource=acct:{username}@{instance}'
-		async with self._http.get(finger_url) as resp: finger_result = await resp.json()
-		return (profile_url := self._parse_webfinger_result(username, instance, finger_result))
-	def _parse_webfinger_result(self, username, instance, finger_result):
-		"""given webfinger data, return profile URL for handle"""
-		def check_content_type(type, ct): return ct == type or ct.startswith(type+';')
-		check_ap = partial(check_content_type, ACTIVITYPUB_CONTENT_TYPE)
-		try:
-			# note: the server might decide to return multiple links
-			# so we need to decide how to prefer one.
-			# i'd put "and yarl.URL(template).host == instance" here,
-			# but some instances have no subdomain for the handle yet use a subdomain for the canonical URL.
-			# Additionally, an instance could theoretically serve profile pages over I2P and the clearnet,
-			# for example.
-			return (profile_url := next(
-				link['href']
-				for link in finger_result['links']
-				if link['rel'] == 'self' and check_ap(link['type'])
-			))
-		except StopIteration:
-			# this should never happen either
-			raise RuntimeError(f'fatal: while fingering {username}@{instance}, failed to find a profile URL')
 async def amain():
 	import json5 as json
 	import third_party.utils as utils
 	args = utils.arg_parser_factory(description='Fetch posts from all followed accounts').parse_args()
 	config = utils.load_config(args.cfg)
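
The net effect on fetching is easiest to see outside the class: the account dict returned by the home server's following endpoint already carries the profile URL, so the outbox is reachable in two plain GETs with no WebFinger round-trip. A minimal standalone sketch under that assumption (names and URL hypothetical):

	import anyio
	import aiohttp

	ACCEPT = 'application/json, application/activity+json'

	async def outbox_first_page(profile_url):
		async with aiohttp.ClientSession(headers={'Accept': ACCEPT}, raise_for_status=True) as http:
			async with http.get(profile_url) as resp:
				# some servers reply with application/activity+json, so skip aiohttp's content-type check
				profile = await resp.json(content_type=None)
			async with http.get(profile['outbox']) as resp:
				outbox = await resp.json(content_type=None)
		# Wordpress's ActivityPub plugin serves an OrderedCollectionPage directly
		assert outbox['type'] in {'OrderedCollection', 'OrderedCollectionPage'}
		return outbox

	# profile_url would come from account['url'] in the /following response:
	# anyio.run(outbox_first_page, 'https://example.social/users/alice')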

gen.py

@@ -30,25 +30,17 @@ async def main():
 	toot = toot.replace("@", "@\u200b")  # sanitize mentions
 	toot = utils.remove_mentions(cfg, toot)
-	if not args.simulate:
-		async with Pleroma(api_base_url=cfg['site'], access_token=cfg['access_token']) as pl:
-			try:
-				await pl.post(toot, visibility='unlisted', cw=cfg['cw'])
-			except Exception:
-				import traceback
-				toot = (
-					'An error occurred while submitting the generated post. '
-					'Contact io@csdisaster.club for assistance. Full traceback:\n\n'
-					+ traceback.format_exc()
-				)
-				await pl.post(toot, visibility='unlisted', cw='Error!')
-				raise
 	try:
 		print(toot)
 	except UnicodeEncodeError:
 		print(toot.encode("ascii", "ignore"))  # encode as ASCII, dropping any non-ASCII characters
+	if args.simulate:
+		return
+	async with Pleroma(api_base_url=cfg['site'], access_token=cfg['access_token']) as pl:
+		await pl.post(toot, visibility='unlisted', cw=cfg['cw'])
 if __name__ == '__main__':
	import anyio
	anyio.run(main)

generators/markov.py

@@ -2,6 +2,7 @@
 import sqlite3
 import markovify
+from random import randint
 def make_sentence(cfg):
 	class nlt_fixed(markovify.NewlineText):  # modified version of NewlineText that never rejects sentences

generators/textsynth.py (new file)

@@ -0,0 +1,123 @@
+# SPDX-License-Identifier: MPL-2.0
+import sqlite3
+from random import expovariate
+import typing
+import aiohttp
+from utils import http_session_factory
+
+async def make_sentence(cfg):
+	# set default
+	if not cfg["textsynth_engine_id"]:
+		cfg["textsynth_engine_id"] = "gptneox_20B"
+
+	if not cfg["textsynth_token"]:
+		raise ValueError("textsynth_token is not set, create an account at textsynth.com")
+
+	db = sqlite3.connect(cfg["db_path"])
+	db.text_factory = str
+	c = db.cursor()
+	if cfg['learn_from_cw']:
+		ignored_cws_query_params = "(" + ",".join("?" * len(cfg["ignored_cws"])) + ")"
+		toots = c.execute(
+			f"""
+			SELECT content
+			FROM posts
+			WHERE
+				summary IS NULL
+				OR summary NOT IN {ignored_cws_query_params}
+			ORDER BY POST_ID DESC
+			""",
+			cfg["ignored_cws"],
+		).fetchall()
+	else:
+		toots = c.execute(
+			"""
+			SELECT content
+			FROM posts
+			WHERE summary IS NULL
+			ORDER BY POST_ID DESC
+			LIMIT 10000
+			""",
+		).fetchall()
+
+	if not toots:
+		raise ValueError("Database is empty! Try running main.py.")
+
+	# get first element of tuple
+	toots: list[str] = [toot[0] for toot in toots]
+
+	# replace stuff
+	toots = [toot.translate(str.maketrans({
+		ord('\n'): "\\n",
+	})) for toot in toots]
+
+	new_toots = []
+
+	def sum_of_chars(list_of_strings: list[str]) -> int:
+		return sum(len(string) + 1 for string in list_of_strings)  # +1 for \n
+
+	while sum_of_chars(new_toots) < 8192:
+		index = expovariate(1 / 10) % len(toots)  # more likely to pick newer posts but older ones are also sometimes picked
+		# round index to nearest int
+		index = int(index)
+		# remove toot from list
+		toot = toots.pop(index)
+		# add toot to new list
+		new_toots.append(toot)
+
+	toots = new_toots
+
+	# concatenate toots
+	toots = "\n".join(toots)
+
+	# truncate to last 8192 characters
+	toots = toots[-8192:]
+
+	# raise ValueError("toots: " + toots)
+
+	# POST https://api.textsynth.com/v1/engines/{engine_id}/completions
+	# using aiohttp
+	post = None
+	async with http_session_factory() as session:
+		while post is None:
+			async with session.post(
+				"https://api.textsynth.com/v1/engines/{}/completions".format(cfg["textsynth_engine_id"]),
+				headers={
+					"Authorization": "Bearer {}".format(cfg["textsynth_token"]),
+				},
+				json={
+					"prompt": toots + "\n",
+					"stop": "\n",
+					"max_tokens": 200
+				},
+			) as resp:
+				if resp.status != 200:
+					raise ValueError("TextSynth API returned status code {}".format(resp.status))
+				data = await resp.json()
+				if not data["text"]:
+					# raise ValueError("TextSynth API returned empty text")
+					# just generate a new sentence
+					continue
+				post: str = data["text"]
+				# check whether the post only consists of mentions
+				# split by words
+				words = post.split()
+				# check if all words are mentions
+				if all(word.startswith("@") for word in words):
+					# generate a new sentence
+					post = None
+					continue
+	db.close()
+
+	# replace stuff in post
+	post = post.replace("\\n", "\n")
+
+	# and post it
+	return post
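
Note that this make_sentence is a coroutine, unlike the Markov generator's synchronous one; the dispatch change in third_party/utils.py below awaits it directly. A minimal driver sketch, with placeholder config values and an existing posts database assumed:

	import anyio
	from generators.textsynth import make_sentence

	cfg = {
		"db_path": "posts.db",            # placeholder; must point at a fetched posts database
		"learn_from_cw": False,
		"ignored_cws": [],
		"textsynth_token": "YOUR_TOKEN",  # placeholder
		"textsynth_engine_id": "gptneox_20B",
	}
	print(anyio.run(make_sentence, cfg))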

pleroma.py (deleted)

@@ -1,168 +0,0 @@
-# SPDX-License-Identifier: AGPL-3.0-only
-import sys
-import yarl
-import json
-import hashlib
-import aiohttp
-from http import HTTPStatus
-
-def http_session_factory(headers={}):
-	py_version = '.'.join(map(str, sys.version_info))
-	user_agent = (
-		'pleroma-ebooks (https://github.com/ioistired/pleroma-ebooks); '
-		f'aiohttp/{aiohttp.__version__}; '
-		f'python/{py_version}'
-	)
-	return aiohttp.ClientSession(
-		headers={'User-Agent': user_agent, **headers},
-	)
-
-class BadRequest(Exception):
-	pass
-
-class LoginFailed(Exception):
-	pass
-
-class Pleroma:
-	def __init__(self, *, api_base_url, access_token):
-		self.api_base_url = api_base_url.rstrip('/')
-		self.access_token = access_token
-		self._session = http_session_factory({'Authorization': 'Bearer ' + access_token})
-		self._logged_in_id = None
-
-	async def __aenter__(self):
-		self._session = await self._session.__aenter__()
-		return self
-
-	async def __aexit__(self, *excinfo):
-		return await self._session.__aexit__(*excinfo)
-
-	async def request(self, method, path, **kwargs):
-		# blocklist of some horrible instances
-		if hashlib.sha256(
-			yarl.URL(self.api_base_url).host.encode()
-			+ bytes.fromhex('d590e3c48d599db6776e89dfc8ebaf53c8cd84866a76305049d8d8c5d4126ce1')
-		).hexdigest() in {
-			'56704d4d95b882e81c8e7765e9079be0afc4e353925ba9add8fd65976f52db83',
-			'1932431fa41a0baaccce7815115b01e40e0237035bb155713712075b887f5a19',
-			'a42191105a9f3514a1d5131969c07a95e06d0fdf0058f18e478823bf299881c9',
-		}:
-			raise RuntimeError('stop being a chud')
-
-		async with self._session.request(method, self.api_base_url + path, **kwargs) as resp:
-			if resp.status == HTTPStatus.BAD_REQUEST:
-				raise BadRequest((await resp.json())['error'])
-			#resp.raise_for_status()
-			return await resp.json()
-
-	async def verify_credentials(self):
-		return await self.request('GET', '/api/v1/accounts/verify_credentials')
-
-	me = verify_credentials
-
-	async def _get_logged_in_id(self):
-		if self._logged_in_id is not None:
-			return self._logged_in_id
-		me = await self.me()
-		try:
-			self._logged_in_id = me['id']
-		except KeyError:
-			raise LoginFailed(me)
-		return self._logged_in_id
-
-	async def following(self, account_id=None):
-		account_id = account_id or await self._get_logged_in_id()
-		return await self.request('GET', f'/api/v1/accounts/{account_id}/following')
-
-	@staticmethod
-	def _unpack_id(obj):
-		if isinstance(obj, dict) and 'id' in obj:
-			return obj['id']
-		return obj
-
-	async def status_context(self, id):
-		id = self._unpack_id(id)
-		return await self.request('GET', f'/api/v1/statuses/{id}/context')
-
-	async def post(self, content, *, in_reply_to_id=None, cw=None, visibility=None):
-		if visibility not in {None, 'private', 'public', 'unlisted', 'direct'}:
-			raise ValueError('invalid visibility', visibility)
-		data = dict(status=content)
-		if in_reply_to_id := self._unpack_id(in_reply_to_id):
-			data['in_reply_to_id'] = in_reply_to_id
-		if visibility is not None:
-			data['visibility'] = visibility
-		# normally, this would be a check against None.
-		# however, apparently Pleroma serializes posts without CWs as posts with an empty string
-		# as a CW, so per the robustness principle we'll accept that too.
-		if cw:
-			data['spoiler_text'] = cw
-		return await self.request('POST', '/api/v1/statuses', data=data)
-
-	async def reply(self, to_status, content, *, cw=None):
-		user_id = await self._get_logged_in_id()
-
-		mentioned_accounts = {}
-		mentioned_accounts[to_status['account']['id']] = to_status['account']['acct']
-		for account in to_status['mentions']:
-			if account['id'] != user_id and account['id'] not in mentioned_accounts:
-				mentioned_accounts[account['id']] = account['acct']
-
-		content = ''.join('@' + x + ' ' for x in mentioned_accounts.values()) + content
-
-		visibility = 'unlisted' if to_status['visibility'] == 'public' else to_status['visibility']
-		if not cw and 'spoiler_text' in to_status and to_status['spoiler_text']:
-			cw = 're: ' + to_status['spoiler_text']
-
-		return await self.post(content, in_reply_to_id=to_status['id'], cw=cw, visibility=visibility)
-
-	async def favorite(self, id):
-		id = self._unpack_id(id)
-		return await self.request('POST', f'/api/v1/statuses/{id}/favourite')
-
-	async def unfavorite(self, id):
-		id = self._unpack_id(id)
-		return await self.request('POST', f'/api/v1/statuses/{id}/unfavourite')
-
-	async def react(self, id, reaction):
-		id = self._unpack_id(id)
-		return await self.request('PUT', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
-
-	async def remove_reaction(self, id, reaction):
-		id = self._unpack_id(id)
-		return await self.request('DELETE', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
-
-	async def pin(self, id):
-		id = self._unpack_id(id)
-		return await self.request('POST', f'/api/v1/statuses/{id}/pin')
-
-	async def unpin(self, id):
-		id = self._unpack_id(id)
-		return await self.request('POST', f'/api/v1/statuses/{id}/unpin')
-
-	async def stream(self, stream_name, *, target_event_type=None):
-		async with self._session.ws_connect(
-			self.api_base_url + f'/api/v1/streaming?stream={stream_name}&access_token={self.access_token}'
-		) as ws:
-			async for msg in ws:
-				if msg.type == aiohttp.WSMsgType.TEXT:
-					event = msg.json()
-					# the only event type that doesn't define `payload` is `filters_changed`
-					if event['event'] == 'filters_changed':
-						yield event
-					elif target_event_type is None or event['event'] == target_event_type:
-						# don't ask me why the payload is also JSON encoded smh
-						yield json.loads(event['payload'])
-
-	async def stream_notifications(self):
-		async for notif in self.stream('user:notification', target_event_type='notification'):
-			yield notif
-
-	async def stream_mentions(self):
-		async for notif in self.stream_notifications():
-			if notif['type'] == 'mention':
-				yield notif
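
The deletion pairs with the "use external pleroma.py" commit: requirements.txt below pins the standalone pleroma.py package, and callers keep the same import. A sketch of the client as the rest of this codebase uses it, assuming the external package preserves the interface shown here (instance URL and token are placeholders):

	import anyio
	from pleroma import Pleroma  # now resolved from the external pleroma.py package

	async def main():
		async with Pleroma(api_base_url='https://example.social', access_token='...') as pl:
			await pl.verify_credentials()
			await pl.post('hello, fediverse', visibility='unlisted')

	anyio.run(main)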

reply.py

@@ -22,10 +22,22 @@ class ReplyBot:
 		async for notification in self.pleroma.stream_mentions():
 			await self.process_notification(notification)
-	async def process_notification(self, notification):
+	async def process_notification(self, notification, retry_count=0):
 		acct = "@" + notification['account']['acct']  # get the account's @
 		post_id = notification['status']['id']
-		context = await self.pleroma.status_context(post_id)
+		# catch HTTP 500 and backoff on requests
+		retry_count = retry_count + 1
+		try:
+			context = await self.pleroma.status_context(post_id)
+		except pleroma.BadResponse as exc:
+			if retry_count < 3:
+				await anyio.sleep(2**retry_count)
+				await self.process_notification(notification, retry_count)
+			else:
+				# failed too many times in a row, logging
+				print(f"Received HTTP 500 {retry_count} times in a row, aborting reply attempt.")
+			return
 		# check if we've already been participating in this thread
 		if self.check_thread_length(context):
@@ -69,12 +81,12 @@
 			await self.pleroma.react(post_id, '')
 	async def reply(self, notification):
-		toot = utils.make_toot(self.cfg)  # generate a toot
+		toot = await utils.make_post(self.cfg)  # generate a toot
 		await self.pleroma.reply(notification['status'], toot, cw=self.cfg['cw'])
 	@staticmethod
 	def extract_toot(toot):
-		text = utils.extract_toot(toot)
+		text = utils.extract_post_content(toot)
 		text = re.sub(r"^@\S+\s", r"", text)  # remove the initial mention
 		text = text.lower()  # treat text as lowercase for easier keyword matching (if this bot uses it)
 		return text
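
The retry path above sleeps 2**retry_count seconds, i.e. 2 s after the first failure and 4 s after the second, then aborts on the third. The same pattern as a standalone sketch (the fetch callable is hypothetical):

	import anyio

	async def with_backoff(fetch, max_tries=3):
		# exponential backoff: sleep 2 s, then 4 s, ...; re-raise once out of tries
		for attempt in range(1, max_tries + 1):
			try:
				return await fetch()
			except Exception:
				if attempt >= max_tries:
					raise
				await anyio.sleep(2 ** attempt)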

requirements.txt

@@ -1,4 +1,5 @@
 beautifulsoup4 ~= 4.9
+pleroma.py ~= 0.0.1
 aiohttp ~= 3.0
 json5 ~= 0.9.5
 anyio ~= 3.0

third_party/utils.py (vendored)

@@ -7,18 +7,19 @@
 import html
 import enum
 import json
 import shutil
+import inspect
 import sqlite3
 import argparse
 import itertools
 import json5 as json
 import multiprocessing
 import anyio.to_process
 from random import randint
 from bs4 import BeautifulSoup
 TextGenerationMode = enum.Enum('TextGenerationMode', """
 	markov
 	gpt_2
+	textsynth
 """.split())
 def arg_parser_factory(*, description):
@@ -68,8 +69,15 @@ async def make_post(cfg, *, mode=TextGenerationMode.markov):
 		from generators.markov import make_sentence
 	elif mode is TextGenerationMode.gpt_2:
 		from generators.gpt_2 import make_sentence
+	elif mode is TextGenerationMode.textsynth:
+		from generators.textsynth import make_sentence
-	return await anyio.to_process.run_sync(make_sentence, cfg)
+	# return await anyio.to_process.run_sync(make_sentence, cfg)
+	# check if inspect.iscoroutinefunction(object)
+	if inspect.iscoroutinefunction(make_sentence):
+		return await make_sentence(cfg)
+	else:
+		return await anyio.to_process.run_sync(make_sentence, cfg)
 def extract_post_content(text):
 	soup = BeautifulSoup(text, "html.parser")
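
With that dispatch, callers no longer care whether a generator is sync or async: a synchronous make_sentence still runs in a worker process, while a coroutine is awaited in place. Usage sketch (the caller is hypothetical):

	import anyio
	from third_party.utils import make_post, TextGenerationMode

	async def demo(cfg):
		return await make_post(cfg, mode=TextGenerationMode.textsynth)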

utils.py

@@ -1,9 +1,22 @@
 # SPDX-License-Identifier: AGPL-3.0-only
 import sys
 import anyio
+import aiohttp
+import platform
 import contextlib
 from functools import wraps
 from datetime import datetime, timezone
+def http_session_factory(headers={}, **kwargs):
+	user_agent = (
+		'pleroma-ebooks (https://lab.freak.university/KayFaraday/pleroma-ebooks); '
+		f'aiohttp/{aiohttp.__version__}; '
+		f'{platform.python_implementation()}/{platform.python_version()}'
+	)
+	return aiohttp.ClientSession(
+		headers={'User-Agent': user_agent, **headers},
+		**kwargs,
+	)
 def as_corofunc(f):
 	@wraps(f)
@@ -34,35 +47,3 @@ def removeprefix(s, prefix):
 	except AttributeError:
 		# compatibility for pre-3.9
 		return s[len(prefix):] if s.startswith(prefix) else s
-async def sleep_until(dt):
-	await anyio.sleep((dt - datetime.now(timezone.utc)).total_seconds())
-class HandleRateLimits:
-	def __init__(self, http):
-		self.http = http
-	def request(self, *args, **kwargs):
-		return _RateLimitContextManager(self.http, args, kwargs)
-class _RateLimitContextManager(contextlib.AbstractAsyncContextManager):
-	def __init__(self, http, args, kwargs):
-		self.http = http
-		self.args = args
-		self.kwargs = kwargs
-	async def __aenter__(self):
-		self._request_cm = self.http.request(*self.args, **self.kwargs)
-		return await self._do_enter()
-	async def _do_enter(self):
-		resp = await self._request_cm.__aenter__()
-		if resp.headers.get('X-RateLimit-Remaining') not in {'0', '1'}:
-			return resp
-		await sleep_until(datetime.fromisoformat(resp.headers['X-RateLimit-Reset']))
-		await self._request_cm.__aexit__(*(None,)*3)
-		return await self.__aenter__()
-	async def __aexit__(self, *excinfo):
-		return await self._request_cm.__aexit__(*excinfo)
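
This factory replaces the hand-rolled User-Agent blocks that fetch_posts.py and the old pleroma.py each carried; extra headers are merged in and any other keyword arguments pass straight through to aiohttp.ClientSession. A minimal usage sketch (URL hypothetical):

	import anyio
	from utils import http_session_factory

	async def demo():
		async with http_session_factory(headers={'Accept': 'application/json'}, raise_for_status=True) as http:
			async with http.get('https://example.social/api/v1/instance') as resp:
				return await resp.json()

	anyio.run(demo)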