mirror-bot/utils.py

66 lines
1.6 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-only
import anyio
import aiohttp
import platform
import itertools
import contextlib
from datetime import datetime, timezone
def http_session_factory(headers=None, **kwargs):
    """Create an ``aiohttp.ClientSession`` with a mirror-bot ``User-Agent``.

    Args:
        headers: Optional mapping of extra default headers. A ``'User-Agent'``
            key in this mapping replaces the generated one. ``None`` (the
            default) means no extra headers.
        **kwargs: Forwarded unchanged to ``aiohttp.ClientSession``.

    Returns:
        The newly constructed ``aiohttp.ClientSession``.
    """
    user_agent = (
        'mirror-bot (https://lab.freak.university/KayFaraday/mirror-bot); '
        f'aiohttp/{aiohttp.__version__}; '
        f'{platform.python_implementation()}/{platform.python_version()}'
    )
    # Use None rather than a dict literal as the default so that no mutable
    # object is shared between calls.
    extra_headers = {} if headers is None else headers
    return aiohttp.ClientSession(
        headers={'User-Agent': user_agent, **extra_headers},
        **kwargs,
    )
async def sleep_until(dt):
    """Suspend the calling task until the wall-clock time *dt*.

    Args:
        dt: A timezone-aware ``datetime``. It is subtracted from the current
            UTC time, so a naive ``datetime`` raises ``TypeError``.

    If *dt* is already in the past, a zero-length sleep is requested instead
    of a negative one.
    """
    remaining = (dt - datetime.now(timezone.utc)).total_seconds()
    # A deadline that has already passed yields a negative delay. Negative
    # delays are not portable across anyio backends (trio's sleep rejects
    # them), so clamp to zero.
    await anyio.sleep(max(remaining, 0))
class HandleRateLimits:
    """Front an HTTP client so that its requests are rate-limit aware.

    The wrapped object only needs a ``request`` method; it is stored as
    ``self.http`` and handed to ``_RateLimitContextManager`` per request.
    """

    def __init__(self, http):
        # Underlying client; kept as a public attribute.
        self.http = http

    def request(self, *args, **kwargs):
        """Return an async context manager for a single request.

        The positional and keyword arguments are captured as a tuple and a
        dict and passed, together with the wrapped client, to
        ``_RateLimitContextManager``.
        """
        manager = _RateLimitContextManager(self.http, args, kwargs)
        return manager
class _RateLimitContextManager(contextlib.AbstractAsyncContextManager):
    """Async context manager that retries a request once the rate limit resets.

    On entry the request is issued through ``http.request(*args, **kwargs)``.
    If the response's ``X-RateLimit-Remaining`` header is ``'0'`` or ``'1'``,
    the manager waits until the time given by ``X-RateLimit-Reset`` (parsed
    with ``datetime.fromisoformat``), releases that response and issues the
    request again. Otherwise the response is returned to the ``async with``
    body.

    NOTE(review): the whole request is re-sent after the wait, so a
    non-idempotent request would be performed more than once — confirm that
    this is intended.
    """

    def __init__(self, http, args, kwargs):
        # Client object providing ``request``; args/kwargs are replayed on
        # every attempt.
        self.http = http
        self.args = args
        self.kwargs = kwargs

    async def __aenter__(self):
        """Issue the request and return the first non-rate-limited response."""
        self._request_cm = self.http.request(*self.args, **self.kwargs)
        return await self._do_enter()

    async def _do_enter(self):
        """Enter ``self._request_cm``, retrying while the limit is exhausted.

        Raises:
            KeyError: the response is rate limited but carries no
                ``X-RateLimit-Reset`` header.
            ValueError: the reset header is not parseable by
                ``datetime.fromisoformat``.
        """
        # A loop instead of recursion keeps the stack flat across retries.
        while True:
            resp = await self._request_cm.__aenter__()
            if resp.headers.get('X-RateLimit-Remaining') not in {'0', '1'}:
                return resp
            try:
                reset_at = datetime.fromisoformat(resp.headers['X-RateLimit-Reset'])
                await sleep_until(reset_at)
            except BaseException as exc:
                # The outer __aexit__ is never called when __aenter__ raises,
                # so the inner request must be released here or it leaks.
                await self._request_cm.__aexit__(type(exc), exc, exc.__traceback__)
                raise
            await self._request_cm.__aexit__(None, None, None)
            self._request_cm = self.http.request(*self.args, **self.kwargs)

    async def __aexit__(self, *excinfo):
        """Delegate exit to the request context manager currently held."""
        return await self._request_cm.__aexit__(*excinfo)
def loading_spinner():
    """Return an endless iterator of spinner frames.

    Each yielded string is a backspace (``'\\b'``) followed by one glyph, so
    writing successive frames to a terminal overwrites the previous glyph in
    place.

    Returns:
        An ``itertools.cycle`` over the ten prefixed frames.
    """
    # NOTE(review): every frame was an empty string in the reviewed source,
    # which makes the spinner emit bare backspaces. The conventional ten-frame
    # braille spinner is used here — confirm these are the intended glyphs.
    glyphs = '⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏'
    return itertools.cycle('\b' + glyph for glyph in glyphs)