From b2768e7f2e380af2bbe83e336b5571c3d5833535 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Mon, 15 May 2023 11:33:31 -0600 Subject: [PATCH] Improve stator's performance in larger installs --- stator/models.py | 17 +++++++++++------ stator/runner.py | 4 ++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/stator/models.py b/stator/models.py index ddb8547..aacaddb 100644 --- a/stator/models.py +++ b/stator/models.py @@ -81,6 +81,8 @@ class StatorModel(models.Model): concrete model yourself. """ + SCHEDULE_BATCH_SIZE = 1000 + state: StateField # If this row is up for transition attempts (which it always is on creation!) @@ -141,7 +143,8 @@ class StatorModel(models.Model): ), state=state.name, ) - await cls.objects.filter(q).aupdate(state_ready=True) + select_query = cls.objects.filter(q)[: cls.SCHEDULE_BATCH_SIZE] + await cls.objects.filter(pk__in=select_query).aupdate(state_ready=True) @classmethod async def atransition_delete_due(cls, now=None): @@ -153,12 +156,13 @@ class StatorModel(models.Model): for state in cls.state_graph.states.values(): state = cast(State, state) if state.delete_after: - await cls.objects.filter( + select_query = cls.objects.filter( state=state, state_changed__lte=( now - datetime.timedelta(seconds=state.delete_after) ), - ).adelete() + )[: cls.SCHEDULE_BATCH_SIZE] + await cls.objects.filter(pk__in=select_query).adelete() @classmethod def transition_get_with_lock( @@ -199,9 +203,10 @@ class StatorModel(models.Model): @classmethod async def atransition_clean_locks(cls): - await cls.objects.filter(state_locked_until__lte=timezone.now()).aupdate( - state_locked_until=None - ) + select_query = cls.objects.filter(state_locked_until__lte=timezone.now())[ + : cls.SCHEDULE_BATCH_SIZE + ] + await cls.objects.filter(pk__in=select_query).aupdate(state_locked_until=None) def transition_schedule(self): """ diff --git a/stator/runner.py b/stator/runner.py index 758d09b..278cfca 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -169,9 +169,13 @@ class StatorRunner: print("No tasks handled since last flush.") with sentry.start_transaction(op="task", name="stator.run_scheduling"): for model in self.models: + print(f"Scheduling {model._meta.label_lower}") await self.submit_stats(model) + print(" Cleaning locks") await model.atransition_clean_locks() + print(" Scheduling due items") await model.atransition_schedule_due() + print(" Deleting due items") await model.atransition_delete_due() async def submit_stats(self, model):