takahe/core/signatures.py

128 lines
4.2 KiB
Python

import base64
import json
from typing import TYPE_CHECKING, Dict, List, Literal, TypedDict
from urllib.parse import urlparse
import httpx
from cryptography.hazmat.primitives import hashes
from django.http import HttpRequest
from django.utils.http import http_date
# Prevent a circular import
if TYPE_CHECKING:
from users.models import Identity
class HttpSignature:
    """
    Allows for calculation and verification of HTTP signatures
    """

    @classmethod
    def calculate_digest(cls, data, algorithm="sha-256") -> str:
        """
        Calculates the digest header value for a given HTTP body

        ``data`` is passed unchanged to the hash's ``update()``. The
        algorithm name is compared case-insensitively, so both "sha-256" and
        "SHA-256" are accepted. Raises ValueError for any other algorithm.
        """
        if algorithm.lower() == "sha-256":
            digest = hashes.Hash(hashes.SHA256())
            digest.update(data)
            return "SHA-256=" + base64.b64encode(digest.finalize()).decode("ascii")
        raise ValueError(f"Unknown digest algorithm {algorithm}")

    @classmethod
    def headers_from_request(
        cls, request: "HttpRequest", header_names: List[str]
    ) -> str:
        """
        Creates the to-be-signed header payload from a Django request

        Returns one "name: value" line per entry in ``header_names``, in the
        order given, joined with newlines. Raises KeyError if a named header
        is not present in ``request.META``.
        """
        headers = {}
        for header_name in header_names:
            lowered = header_name.lower()
            # Django's META keys are upper-case with hyphens as underscores
            meta_name = lowered.upper().replace("-", "_")
            if lowered == "(request-target)":
                # Use the request's real method; fall back to "post" when
                # the request object does not have one set.
                method = (request.method or "post").lower()
                value = f"{method} {request.path}"
            elif lowered in ("content-type", "content-length"):
                # These two are stored in META without the HTTP_ prefix
                value = request.META[meta_name]
            else:
                value = request.META[f"HTTP_{meta_name}"]
            headers[header_name] = value
        return "\n".join(f"{name.lower()}: {value}" for name, value in headers.items())

    @classmethod
    def parse_signature(cls, signature: str) -> "SignatureDetails":
        """
        Parses the value of a Signature header into its component parts

        The value is a comma-separated list of name="value" pairs; whitespace
        around each pair is ignored and names are matched case-insensitively.
        Raises ValueError if a pair has no "=", and KeyError if any of
        headers, signature, algorithm or keyId is missing.
        """
        bits = {}
        for item in signature.split(","):
            # Split on the first "=" only, as base64 padding also uses "="
            name, value = item.split("=", 1)
            bits[name.strip().lower()] = value.strip().strip('"')
        signature_details: SignatureDetails = {
            "headers": bits["headers"].split(),
            "signature": base64.b64decode(bits["signature"]),
            "algorithm": bits["algorithm"],
            "keyid": bits["keyid"],
        }
        return signature_details

    @classmethod
    def compile_signature(cls, details: "SignatureDetails") -> str:
        """
        Builds the value of a Signature header from its component parts

        Header names are lower-cased and space-separated, and the signature
        bytes are base64-encoded.
        """
        value = f'keyId="{details["keyid"]}",headers="'
        value += " ".join(h.lower() for h in details["headers"])
        value += '",signature="'
        value += base64.b64encode(details["signature"]).decode("ascii")
        value += f'",algorithm="{details["algorithm"]}"'
        return value

    @classmethod
    async def signed_request(
        cls,
        uri: str,
        body: Dict,
        identity: "Identity",
        content_type: str = "application/json",
        method: Literal["post"] = "post",
    ):
        """
        Performs an async request to the given path, with a document, signed
        as an identity.

        The body is serialised as JSON. Returns the httpx response, and
        raises ValueError if the response status code is 400 or above.
        """
        uri_parts = urlparse(uri)
        date_string = http_date()
        body_bytes = json.dumps(body).encode("utf8")
        # HTTP sends an empty path as "/", so that is what must be signed
        request_path = uri_parts.path or "/"
        # Insertion order matters: it fixes both the order of the lines in
        # the signed string and the order of the advertised header list.
        headers = {
            "(request-target)": f"{method} {request_path}",
            "Host": uri_parts.hostname,
            "Date": date_string,
            "Digest": cls.calculate_digest(body_bytes),
            "Content-Type": content_type,
        }
        signed_string = "\n".join(
            f"{name.lower()}: {value}" for name, value in headers.items()
        )
        # The right-hand side is evaluated before the assignment, so the
        # header list does not include "Signature" itself.
        headers["Signature"] = cls.compile_signature(
            {
                "keyid": identity.key_id,
                "headers": list(headers.keys()),
                "signature": identity.sign(signed_string),
                "algorithm": "rsa-sha256",
            }
        )
        # (request-target) is only part of the signed string, not a header
        del headers["(request-target)"]
        async with httpx.AsyncClient() as client:
            print(f"Calling {method} {uri}")
            response = await client.request(
                method,
                uri,
                headers=headers,
                content=body_bytes,
            )
        if response.status_code >= 400:
            raise ValueError(
                f"Request error: {response.status_code} {response.content}"
            )
        return response
class SignatureDetails(TypedDict):
    """
    The component parts of a Signature header value
    """

    # Names of the headers covered by the signature
    headers: List[str]
    # Raw signature bytes (base64-decoded when parsed from a header)
    signature: bytes
    # Name of the signing algorithm, for example "rsa-sha256"
    algorithm: str
    # Value of the header's keyId parameter
    keyid: str