This repository has been archived on 2024-07-22. You can view files and clone it, but cannot push or open issues or pull requests.
mastodon-block-bot/opter_outer.py

73 lines
1.8 KiB
Python
Raw Normal View History

2023-02-06 00:20:34 -08:00
#!/usr/bin/env python
from pleroma import Pleroma
import qtoml as toml
import asyncpg
import anyio
with open('opter_outer.toml') as f:
config = toml.load(f)
async def main():
db = await asyncpg.connect(**config['db'])
hostname, access_token = await db.fetchrow('SELECT hostname, token FROM blockbot_config')
async with (
Pleroma(
api_base_url='https://' + hostname,
access_token=access_token,
) as pleroma,
anyio.create_task_group() as nursery,
):
async for notif in pleroma.stream_notifications():
2023-09-11 11:22:24 -07:00
handler = None
2023-02-06 00:20:34 -08:00
if 'opt out' in notif['status']['content']:
2023-09-11 11:22:24 -07:00
handler = opt_out
2023-02-06 00:20:34 -08:00
elif 'opt in' in notif['status']['content']:
2023-09-11 11:22:24 -07:00
handler = opt_in
2023-02-06 00:20:34 -08:00
elif 'status' in notif['status']['content']:
2023-09-11 11:22:24 -07:00
handler = status
2023-02-06 00:20:34 -08:00
elif 'help' in notif['status']['content']:
2023-09-11 11:22:24 -07:00
handler = help
if handler:
nursery.start_soon(handler, db, pleroma, notif['status'])
2023-02-06 00:20:34 -08:00
async def opt_out(db, pleroma, status):
await db.execute(
'INSERT INTO blockbot_opt_out (account_id) VALUES ($1)',
int(status['account']['id']),
)
await pleroma.reply(
status,
'You have successfully opted out. Your blocks will no longer be sent to the people you block.',
)
async def opt_in(db, pleroma, status):
await db.execute(
'DELETE FROM blockbot_opt_out WHERE account_id = $1',
int(status['account']['id']),
)
await pleroma.reply(
status,
'You have successfully opted in. Your blocks will now be sent to the people you block.'
)
async def status(db, pleroma, status):
if await db.fetchval(
'SELECT 1 FROM blockbot_opt_out WHERE account_id = $1',
int(status['account']['id']),
):
await pleroma.reply(status, 'You are opted out.')
else:
await pleroma.reply(status, 'You are opted in.')
async def help(db, pleroma, status):
await pleroma.reply(status, 'Available commands: opt in, opt out, status, and help.')
if __name__ == '__main__':
anyio.run(main)