takahe/stator/runner.py

145 lines
5.2 KiB
Python

import asyncio
import datetime
import time
import traceback
import uuid
from django.utils import timezone
from core import exceptions, sentry
from core.models import Config
from stator.models import StatorModel
class StatorRunner:
"""
Runs tasks on models that are looking for state changes.
Designed to run either indefinitely, or just for a few seconds.
"""
def __init__(
self,
models: list[type[StatorModel]],
concurrency: int = 50,
concurrency_per_model: int = 10,
liveness_file: str | None = None,
schedule_interval: int = 30,
lock_expiry: int = 300,
run_for: int = 0,
):
self.models = models
self.runner_id = uuid.uuid4().hex
self.concurrency = concurrency
self.concurrency_per_model = concurrency_per_model
self.liveness_file = liveness_file
self.schedule_interval = schedule_interval
self.lock_expiry = lock_expiry
self.run_for = run_for
async def run(self):
sentry.set_takahe_app("stator")
self.handled = 0
self.started = time.monotonic()
self.last_clean = time.monotonic() - self.schedule_interval
self.tasks = []
# For the first time period, launch tasks
print("Running main task loop")
try:
with sentry.configure_scope() as scope:
while True:
# Do we need to do cleaning?
if (time.monotonic() - self.last_clean) >= self.schedule_interval:
# Refresh the config
Config.system = await Config.aload_system()
print(f"{self.handled} tasks processed so far")
print("Running cleaning and scheduling")
await self.run_scheduling()
# Clear the cleaning breadcrumbs/extra for the main part of the loop
sentry.scope_clear(scope)
self.remove_completed_tasks()
await self.fetch_and_process_tasks()
# Are we in limited run mode?
if (
self.run_for
and (time.monotonic() - self.started) > self.run_for
):
break
# Prevent busylooping
await asyncio.sleep(0.5)
# Clear the Sentry breadcrumbs and extra for next loop
sentry.scope_clear(scope)
except KeyboardInterrupt:
pass
# Wait for tasks to finish
print("Waiting for tasks to complete")
while True:
self.remove_completed_tasks()
if not self.tasks:
break
# Prevent busylooping
await asyncio.sleep(0.1)
print("Complete")
return self.handled
async def run_scheduling(self):
"""
Do any transition cleanup tasks
"""
with sentry.start_transaction(op="task", name="stator.run_scheduling"):
for model in self.models:
asyncio.create_task(model.atransition_clean_locks())
asyncio.create_task(model.atransition_schedule_due())
self.last_clean = time.monotonic()
async def fetch_and_process_tasks(self):
# Calculate space left for tasks
space_remaining = self.concurrency - len(self.tasks)
# Fetch new tasks
for model in self.models:
if space_remaining > 0:
for instance in await model.atransition_get_with_lock(
number=min(space_remaining, self.concurrency_per_model),
lock_expiry=(
timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
),
):
self.tasks.append(
asyncio.create_task(self.run_transition(instance))
)
self.handled += 1
space_remaining -= 1
async def run_transition(self, instance: StatorModel):
"""
Wrapper for atransition_attempt with fallback error handling
"""
task_name = f"stator.run_transition:{instance._meta.label_lower}#{{id}} from {instance.state}"
with sentry.start_transaction(op="task", name=task_name):
sentry.set_context(
"instance",
{
"model": instance._meta.label_lower,
"pk": instance.pk,
"state": instance.state,
"state_age": instance.state_age,
},
)
try:
print(
f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
)
await instance.atransition_attempt()
except BaseException as e:
await exceptions.acapture_exception(e)
traceback.print_exc()
def remove_completed_tasks(self):
"""
Removes all completed asyncio.Tasks from our local in-progress list
"""
self.tasks = [t for t in self.tasks if not t.done()]