This repository has been archived on 2024-07-22. You can view files and clone it, but cannot push or open issues or pull requests.
mastodon-fingerd/fingerd.py

85 lines
2.0 KiB
Python
Executable File

#!/usr/bin/env python
import os
import asyncio
import asyncpg
import aiohttp
import qtoml as toml
from contextvars import ContextVar
# Default TCP port to listen on; main() lets the PORT environment variable
# override it.
FINGER_PORT = 79

# Used by handle_finger() to bound how many bytes are read for one request.
MASTODON_USERNAME_MAX_LENGTH = 30

# Holds the asyncpg connection pool; set in main(), read in handle_finger().
db = ContextVar('db')

# Configuration is loaded once at import time. The code in this file reads
# config['instance_name'] and passes config['db'] as keyword arguments to
# asyncpg.create_pool().
# TOML documents are defined to be UTF-8, so do not rely on the locale's
# default encoding when opening the file.
with open('fingerd.toml', encoding='utf-8') as f:
    config = toml.load(f)
async def handle_finger(reader, writer):
    """Serve a single finger connection.

    Reads one request from ``reader`` and writes the answer to ``writer``:

    * An empty request (``finger @instance.example``) is delegated to
      ``handle_finger_instance``.
    * Otherwise the request is treated as an account handle. A leading
      ``@`` and a trailing ``@<instance_name>`` are stripped; if an ``@``
      remains, the handle names another instance and is rejected.
    * The ``note`` column of the matching ``accounts`` row with a NULL
      ``domain`` is sent back, or an error line when no row matches.

    The writer is always closed before returning, even when flushing the
    response fails.
    """
    try:
        # Upper bound for '@' + username + '@' + instance name.
        # NOTE(review): StreamReader.read(n) may return fewer than n bytes;
        # this relies on the whole request arriving in one chunk.
        data = await reader.read(
            len('@') * 2
            + MASTODON_USERNAME_MAX_LENGTH
            + len(config['instance_name'])
        )

        try:
            handle = data.decode().rstrip()
        except UnicodeDecodeError:
            # Untrusted input: arbitrary bytes (or a multi-byte character
            # cut off by the bounded read) must not abort the handler
            # without telling the client what went wrong.
            writer.write(b'Error: request must be valid UTF-8\n')
            return

        if not handle:
            # `finger @instance.example`
            await handle_finger_instance(reader, writer)
            return

        username = handle.removeprefix('@').removesuffix('@' + config['instance_name'])
        if '@' in username:
            writer.write(b'Error: user must be local to this instance\n')
            return

        resp = await db.get().fetchval(
            'SELECT note FROM accounts WHERE username = $1 AND domain IS NULL',
            username,
        )
        if resp is None:
            writer.write(b'Error: user not found\n')
            return

        writer.write(resp.encode())
        writer.write(b'\n')
    finally:
        try:
            await writer.drain()
        finally:
            # drain() can raise (e.g. the peer reset the connection); the
            # transport must still be closed so it is not leaked.
            writer.close()
            await writer.wait_closed()
async def handle_finger_instance(reader, writer):
    """Write instance-wide usage statistics to ``writer``.

    Fetches ``/nodeinfo/2.0`` from the configured instance over HTTPS using
    the module-global ``http`` session, and reports the total, monthly and
    half-yearly active user counts plus the local post count, one per line,
    with thousands separators. ``reader`` is accepted but not used.
    """
    nodeinfo_url = f'https://{config["instance_name"]}/nodeinfo/2.0'
    async with http.get(nodeinfo_url) as nodeinfo:
        payload = await nodeinfo.json()
        usage = payload['usage']

    users = usage['users']
    report_lines = (
        f'Total users: {users["total"]:,}\n',
        f'Active users (past month): {users["activeMonth"]:,}\n',
        f'Active users (past six months): {users["activeHalfyear"]:,}\n',
        f'Total posts: {usage["localPosts"]:,}\n',
    )
    writer.write(''.join(report_lines).encode())
async def main():
    """Run the finger server until it is cancelled.

    Creates the asyncpg connection pool from ``config['db']`` and stores it
    in the ``db`` context variable, opens the shared aiohttp session (bound
    to the module-global ``http``), and listens on ``0.0.0.0`` at the port
    given by the ``PORT`` environment variable, falling back to
    ``FINGER_PORT``. The pool, the HTTP session and the server are all
    closed when this coroutine exits.
    """
    global http

    # Manage the pool with `async with` so its connections are closed on
    # shutdown instead of being left open.
    async with asyncpg.create_pool(**config['db']) as pool:
        # Set before the server is started, as in the original ordering.
        # NOTE(review): presumably required so the context captured for
        # connection callbacks already contains the pool -- confirm.
        db.set(pool)

        async with (
            # globals used here only because ContextVar wasn't working
            aiohttp.ClientSession() as http,
            await asyncio.start_server(
                handle_finger,
                '0.0.0.0',
                os.getenv('PORT', FINGER_PORT),
            ) as server,
        ):
            addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
            print(f'Serving on {addrs}')
            await server.serve_forever()
# Script entry point: start the event loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())