This repository has been archived on 2024-07-22. You can view files and clone it, but cannot push or open issues or pull requests.
mastodon-block-bot/opter_outer.py

69 lines
1.9 KiB
Python
Raw Normal View History

2023-02-06 00:20:34 -08:00
#!/usr/bin/env python
from pleroma import Pleroma
import qtoml as toml
import asyncpg
import anyio
# Load the bot's settings once at import time; main() reads config['db'].
# TOML documents are UTF-8 by specification, so decode explicitly instead of
# depending on whatever the platform's locale encoding happens to be.
with open('opter_outer.toml', encoding='utf-8') as f:
    config = toml.load(f)
async def main():
    """Stream notifications and dispatch the bot commands found in mentions.

    Reads the hostname and access token from the first ``blockbot_config``
    row, connects to ``https://<hostname>`` with that token, and starts one
    handler task per recognised command.

    Raises:
        RuntimeError: if ``blockbot_config`` contains no row.
    """
    # Checked in this order, so 'opt out' wins over 'opt in', and both win
    # over the single-word commands when a post contains several phrases.
    commands = (
        ('opt out', opt_out),
        ('opt in', opt_in),
        ('status', status),
        ('help', help),
    )
    # Handlers run concurrently and each one queries the database. A single
    # asyncpg connection rejects overlapping operations, so the handlers are
    # given a pool instead: it offers the same execute/fetchval/fetchrow
    # methods and is closed when this block exits.
    async with asyncpg.create_pool(**config['db']) as db:
        row = await db.fetchrow('SELECT hostname, token FROM blockbot_config')
        if row is None:
            raise RuntimeError(
                'blockbot_config has no row; cannot determine hostname and token'
            )
        hostname, access_token = row
        async with (
            Pleroma(
                api_base_url='https://' + hostname,
                access_token=access_token,
            ) as pleroma,
            anyio.create_task_group() as nursery,
        ):
            async for notif in pleroma.stream_notifications():
                post = notif.get('status')
                # Follow-style notifications carry no status, and
                # favourites/boosts carry the bot's *own* post (its help text
                # contains every command phrase), so only mentions count as
                # commands.
                if notif.get('type') != 'mention' or not post:
                    continue
                # Match case-insensitively so that e.g. "Opt out" typed on a
                # phone keyboard is still recognised.
                text = post['content'].lower()
                for phrase, handler in commands:
                    if phrase in text:
                        nursery.start_soon(handler, db, pleroma, post)
                        break
async def opt_out(db, pleroma, status):
    """Record that the author of *status* opted out, then confirm by reply.

    Args:
        db: object with an awaitable ``execute(query, *args)`` method.
        pleroma: client with an awaitable ``reply(status, text)`` method.
        status: the post that carried the command; ``status['account']['id']``
            must be convertible with ``int()``.
    """
    await db.execute(
        # DO NOTHING keeps a repeated "opt out" from raising if account_id
        # carries a unique constraint (schema not visible here -- confirm);
        # without such a constraint the clause has no effect.
        'INSERT INTO blockbot_opt_out (account_id) VALUES ($1) ON CONFLICT DO NOTHING',
        int(status['account']['id']),
    )
    await pleroma.reply(
        status,
        'You have successfully opted out. Your blocks will no longer be sent to the people you block.',
    )
async def opt_in(db, pleroma, status):
    """Remove the author of *status* from the opt-out table, then confirm.

    Args:
        db: object with an awaitable ``execute(query, *args)`` method.
        pleroma: client with an awaitable ``reply(status, text)`` method.
        status: the post that carried the command; ``status['account']['id']``
            must be convertible with ``int()``.
    """
    account_id = int(status['account']['id'])
    # Deleting is harmless when the account has no opt-out row, so the
    # confirmation is sent either way.
    await db.execute(
        'DELETE FROM blockbot_opt_out WHERE account_id = $1',
        account_id,
    )
    await pleroma.reply(
        status,
        'You have successfully opted in. Your blocks will now be sent to the people you block.',
    )
async def status(db, pleroma, status):
    """Reply to *status* saying whether its author is currently opted out.

    Note that inside this function the name ``status`` refers to the post
    argument, not to the function itself.

    Args:
        db: object with an awaitable ``fetchval(query, *args)`` method.
        pleroma: client with an awaitable ``reply(status, text)`` method.
        status: the post that carried the command; ``status['account']['id']``
            must be convertible with ``int()``.
    """
    # Truthy when a row for this account exists in blockbot_opt_out.
    opted_out = await db.fetchval(
        'SELECT 1 FROM blockbot_opt_out WHERE account_id = $1',
        int(status['account']['id']),
    )
    if opted_out:
        await pleroma.reply(status, 'You are opted out.')
    else:
        await pleroma.reply(status, 'You are opted in.')
async def help(db, pleroma, status):
    """Reply to *status* with the list of commands the bot understands.

    *db* is unused; it is accepted so that every command handler can be
    started with the same ``(db, pleroma, status)`` arguments. The function
    name shadows the ``help`` builtin within this module.
    """
    await pleroma.reply(status, 'Available commands: opt in, opt out, status, and help.')
# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    anyio.run(main)