Identity fanout (#196)
This commit is contained in:
parent
bbe60202e7
commit
ed83726247
|
@ -262,6 +262,8 @@ class Migration(migrations.Migration):
|
||||||
("post_deleted", "Post Deleted"),
|
("post_deleted", "Post Deleted"),
|
||||||
("interaction", "Interaction"),
|
("interaction", "Interaction"),
|
||||||
("undo_interaction", "Undo Interaction"),
|
("undo_interaction", "Undo Interaction"),
|
||||||
|
("identity_edited", "Identity Edited"),
|
||||||
|
("identity_deleted", "Identity Deleted"),
|
||||||
],
|
],
|
||||||
max_length=100,
|
max_length=100,
|
||||||
),
|
),
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
# Generated by Django 4.1.4 on 2022-12-18 00:20
|
||||||
|
|
||||||
|
import django.db.models.deletion
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
("users", "0005_report"),
|
||||||
|
("activities", "0005_post_type_timeline_urls"),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AddField(
|
||||||
|
model_name="fanout",
|
||||||
|
name="subject_identity",
|
||||||
|
field=models.ForeignKey(
|
||||||
|
blank=True,
|
||||||
|
null=True,
|
||||||
|
on_delete=django.db.models.deletion.CASCADE,
|
||||||
|
related_name="subject_fan_outs",
|
||||||
|
to="users.identity",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
]
|
|
@ -159,6 +159,36 @@ class FanOutStates(StateGraph):
|
||||||
except httpx.RequestError:
|
except httpx.RequestError:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Handle sending identity edited to remote
|
||||||
|
case (FanOut.Types.identity_edited, False):
|
||||||
|
identity = await fan_out.subject_identity.afetch_full()
|
||||||
|
try:
|
||||||
|
await identity.signed_request(
|
||||||
|
method="post",
|
||||||
|
uri=fan_out.identity.inbox_uri,
|
||||||
|
body=canonicalise(fan_out.subject_identity.to_update_ap()),
|
||||||
|
)
|
||||||
|
except httpx.RequestError:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Handle sending identity deleted to remote
|
||||||
|
case (FanOut.Types.identity_deleted, False):
|
||||||
|
identity = await fan_out.subject_identity.afetch_full()
|
||||||
|
try:
|
||||||
|
await identity.signed_request(
|
||||||
|
method="post",
|
||||||
|
uri=fan_out.identity.inbox_uri,
|
||||||
|
body=canonicalise(fan_out.subject_identity.to_delete_ap()),
|
||||||
|
)
|
||||||
|
except httpx.RequestError:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Sending identity edited/deleted to local is a no-op
|
||||||
|
case (FanOut.Types.identity_edited, True):
|
||||||
|
pass
|
||||||
|
case (FanOut.Types.identity_deleted, True):
|
||||||
|
pass
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Cannot fan out with type {fan_out.type} local={fan_out.identity.local}"
|
f"Cannot fan out with type {fan_out.type} local={fan_out.identity.local}"
|
||||||
|
@ -178,6 +208,8 @@ class FanOut(StatorModel):
|
||||||
post_deleted = "post_deleted"
|
post_deleted = "post_deleted"
|
||||||
interaction = "interaction"
|
interaction = "interaction"
|
||||||
undo_interaction = "undo_interaction"
|
undo_interaction = "undo_interaction"
|
||||||
|
identity_edited = "identity_edited"
|
||||||
|
identity_deleted = "identity_deleted"
|
||||||
|
|
||||||
state = StateField(FanOutStates)
|
state = StateField(FanOutStates)
|
||||||
|
|
||||||
|
@ -206,6 +238,13 @@ class FanOut(StatorModel):
|
||||||
null=True,
|
null=True,
|
||||||
related_name="fan_outs",
|
related_name="fan_outs",
|
||||||
)
|
)
|
||||||
|
subject_identity = models.ForeignKey(
|
||||||
|
"users.Identity",
|
||||||
|
on_delete=models.CASCADE,
|
||||||
|
blank=True,
|
||||||
|
null=True,
|
||||||
|
related_name="subject_fan_outs",
|
||||||
|
)
|
||||||
|
|
||||||
created = models.DateTimeField(auto_now_add=True)
|
created = models.DateTimeField(auto_now_add=True)
|
||||||
updated = models.DateTimeField(auto_now=True)
|
updated = models.DateTimeField(auto_now=True)
|
||||||
|
@ -221,6 +260,8 @@ class FanOut(StatorModel):
|
||||||
"identity",
|
"identity",
|
||||||
"subject_post",
|
"subject_post",
|
||||||
"subject_post_interaction",
|
"subject_post_interaction",
|
||||||
|
"subject_identity",
|
||||||
|
"subject_identity__domain",
|
||||||
)
|
)
|
||||||
.prefetch_related(
|
.prefetch_related(
|
||||||
"subject_post__emojis",
|
"subject_post__emojis",
|
||||||
|
|
|
@ -231,7 +231,13 @@ class Migration(migrations.Migration):
|
||||||
(
|
(
|
||||||
"state",
|
"state",
|
||||||
stator.models.StateField(
|
stator.models.StateField(
|
||||||
choices=[("outdated", "outdated"), ("updated", "updated")],
|
choices=[
|
||||||
|
("outdated", "outdated"),
|
||||||
|
("updated", "updated"),
|
||||||
|
("edited", "edited"),
|
||||||
|
("deleted", "deleted"),
|
||||||
|
("deleted_fanned_out", "deleted_fanned_out"),
|
||||||
|
],
|
||||||
default="outdated",
|
default="outdated",
|
||||||
graph=users.models.identity.IdentityStates,
|
graph=users.models.identity.IdentityStates,
|
||||||
max_length=100,
|
max_length=100,
|
||||||
|
|
|
@ -30,19 +30,76 @@ from users.models.system_actor import SystemActor
|
||||||
|
|
||||||
class IdentityStates(StateGraph):
|
class IdentityStates(StateGraph):
|
||||||
"""
|
"""
|
||||||
There are only two states in a cycle.
|
|
||||||
Identities sit in "updated" for up to system.identity_max_age, and then
|
Identities sit in "updated" for up to system.identity_max_age, and then
|
||||||
go back to "outdated" for refetching.
|
go back to "outdated" for refetching.
|
||||||
|
|
||||||
|
When a local identity is "edited" or "deleted", it will fanout the change to
|
||||||
|
all followers and transition to "updated"
|
||||||
"""
|
"""
|
||||||
|
|
||||||
outdated = State(try_interval=3600, force_initial=True)
|
outdated = State(try_interval=3600, force_initial=True)
|
||||||
updated = State(try_interval=86400 * 7, attempt_immediately=False)
|
updated = State(try_interval=86400 * 7, attempt_immediately=False)
|
||||||
|
|
||||||
|
edited = State(try_interval=300, attempt_immediately=True)
|
||||||
|
deleted = State(try_interval=300, attempt_immediately=True)
|
||||||
|
deleted_fanned_out = State(externally_progressed=True)
|
||||||
|
|
||||||
|
deleted.transitions_to(deleted_fanned_out)
|
||||||
|
|
||||||
|
edited.transitions_to(updated)
|
||||||
|
updated.transitions_to(edited)
|
||||||
|
edited.transitions_to(deleted)
|
||||||
|
|
||||||
outdated.transitions_to(updated)
|
outdated.transitions_to(updated)
|
||||||
updated.transitions_to(outdated)
|
updated.transitions_to(outdated)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def targets_fan_out(cls, identity: "Identity", type_: str) -> None:
|
||||||
|
from activities.models import FanOut
|
||||||
|
from users.models import Follow
|
||||||
|
|
||||||
|
# Fan out to each target
|
||||||
|
shared_inboxes = set()
|
||||||
|
async for follower in Follow.objects.select_related("source", "target").filter(
|
||||||
|
target=identity
|
||||||
|
):
|
||||||
|
# Dedupe shared_inbox_uri
|
||||||
|
shared_uri = follower.source.shared_inbox_uri
|
||||||
|
if shared_uri and shared_uri in shared_inboxes:
|
||||||
|
continue
|
||||||
|
|
||||||
|
await FanOut.objects.acreate(
|
||||||
|
identity=follower.source,
|
||||||
|
type=type_,
|
||||||
|
subject_identity=identity,
|
||||||
|
)
|
||||||
|
shared_inboxes.add(shared_uri)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def handle_edited(cls, instance: "Identity"):
|
||||||
|
from activities.models import FanOut
|
||||||
|
|
||||||
|
if not instance.local:
|
||||||
|
return cls.updated
|
||||||
|
|
||||||
|
identity = await instance.afetch_full()
|
||||||
|
await cls.targets_fan_out(identity, FanOut.Types.identity_edited)
|
||||||
|
return cls.updated
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def handle_deleted(cls, instance: "Identity"):
|
||||||
|
from activities.models import FanOut
|
||||||
|
|
||||||
|
if not instance.local:
|
||||||
|
return cls.updated
|
||||||
|
|
||||||
|
identity = await instance.afetch_full()
|
||||||
|
await cls.targets_fan_out(identity, FanOut.Types.identity_deleted)
|
||||||
|
return cls.deleted_fanned_out
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def handle_outdated(cls, identity: "Identity"):
|
async def handle_outdated(cls, identity: "Identity"):
|
||||||
|
|
||||||
# Local identities never need fetching
|
# Local identities never need fetching
|
||||||
if identity.local:
|
if identity.local:
|
||||||
return cls.updated
|
return cls.updated
|
||||||
|
@ -56,6 +113,22 @@ class IdentityStates(StateGraph):
|
||||||
return cls.outdated
|
return cls.outdated
|
||||||
|
|
||||||
|
|
||||||
|
class IdentityQuerySet(models.QuerySet):
|
||||||
|
def not_deleted(self):
|
||||||
|
query = self.exclude(
|
||||||
|
state__in=[IdentityStates.deleted, IdentityStates.deleted_fanned_out]
|
||||||
|
)
|
||||||
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
class IdentityManager(models.Manager):
|
||||||
|
def get_queryset(self):
|
||||||
|
return IdentityQuerySet(self.model, using=self._db)
|
||||||
|
|
||||||
|
def not_deleted(self):
|
||||||
|
return self.get_queryset().not_deleted()
|
||||||
|
|
||||||
|
|
||||||
class Identity(StatorModel):
|
class Identity(StatorModel):
|
||||||
"""
|
"""
|
||||||
Represents both local and remote Fediverse identities (actors)
|
Represents both local and remote Fediverse identities (actors)
|
||||||
|
@ -135,6 +208,8 @@ class Identity(StatorModel):
|
||||||
fetched = models.DateTimeField(null=True, blank=True)
|
fetched = models.DateTimeField(null=True, blank=True)
|
||||||
deleted = models.DateTimeField(null=True, blank=True)
|
deleted = models.DateTimeField(null=True, blank=True)
|
||||||
|
|
||||||
|
objects = IdentityManager()
|
||||||
|
|
||||||
### Model attributes ###
|
### Model attributes ###
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
|
@ -313,6 +388,14 @@ class Identity(StatorModel):
|
||||||
def limited(self) -> bool:
|
def limited(self) -> bool:
|
||||||
return self.restriction == self.Restriction.limited
|
return self.restriction == self.Restriction.limited
|
||||||
|
|
||||||
|
### Async helpers ###
|
||||||
|
|
||||||
|
async def afetch_full(self):
|
||||||
|
"""
|
||||||
|
Returns a version of the object with all relations pre-loaded
|
||||||
|
"""
|
||||||
|
return await Identity.objects.select_related("domain").aget(pk=self.pk)
|
||||||
|
|
||||||
### ActivityPub (outbound) ###
|
### ActivityPub (outbound) ###
|
||||||
|
|
||||||
def to_ap(self):
|
def to_ap(self):
|
||||||
|
@ -363,6 +446,30 @@ class Identity(StatorModel):
|
||||||
"type": "Mention",
|
"type": "Mention",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def to_update_ap(self):
|
||||||
|
"""
|
||||||
|
Returns the AP JSON to update this object
|
||||||
|
"""
|
||||||
|
object = self.to_ap()
|
||||||
|
return {
|
||||||
|
"type": "Update",
|
||||||
|
"id": self.actor_uri + "#update",
|
||||||
|
"actor": self.actor_uri,
|
||||||
|
"object": object,
|
||||||
|
}
|
||||||
|
|
||||||
|
def to_delete_ap(self):
|
||||||
|
"""
|
||||||
|
Returns the AP JSON to delete this object
|
||||||
|
"""
|
||||||
|
object = self.to_ap()
|
||||||
|
return {
|
||||||
|
"type": "Delete",
|
||||||
|
"id": self.actor_uri + "#delete",
|
||||||
|
"actor": self.actor_uri,
|
||||||
|
"object": object,
|
||||||
|
}
|
||||||
|
|
||||||
### ActivityPub (inbound) ###
|
### ActivityPub (inbound) ###
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -7,6 +7,7 @@ from django.views.generic import FormView
|
||||||
from core.files import resize_image
|
from core.files import resize_image
|
||||||
from core.models.config import Config
|
from core.models.config import Config
|
||||||
from users.decorators import identity_required
|
from users.decorators import identity_required
|
||||||
|
from users.models import IdentityStates
|
||||||
|
|
||||||
|
|
||||||
@method_decorator(identity_required, name="dispatch")
|
@method_decorator(identity_required, name="dispatch")
|
||||||
|
@ -76,6 +77,8 @@ class ProfilePage(FormView):
|
||||||
resize_image(image, size=(1500, 500)),
|
resize_image(image, size=(1500, 500)),
|
||||||
)
|
)
|
||||||
identity.save()
|
identity.save()
|
||||||
|
identity.transition_perform(IdentityStates.edited)
|
||||||
|
|
||||||
# Save profile-specific identity Config
|
# Save profile-specific identity Config
|
||||||
Config.set_identity(
|
Config.set_identity(
|
||||||
identity, "visible_follows", form.cleaned_data["visible_follows"]
|
identity, "visible_follows", form.cleaned_data["visible_follows"]
|
||||||
|
|
Loading…
Reference in New Issue