import json
from typing import Any, Iterable, List, Optional, Set

from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.responses import PlainTextResponse
import httpx
import mimeparse
from mimeparse import parse_mime_type
from starlette.exceptions import HTTPException as StarletteHTTPException
from uuid_extensions import uuid7str

app = FastAPI()

SUPPORTED_CONTENT_TYPES = [
    'application/activity+json',
    'application/ld+json',
    'text/turtle',
]

THE_ACTOR_NAMESPACE = 'http://localhost:8000/'

# Profile URI that marks an application/ld+json payload as ActivityStreams 2.0.
AS2_PROFILE = 'https://www.w3.org/ns/activitystreams'

# AS2 addressing properties whose values name the delivery targets.
ADDRESSING_PROPERTIES = ('to', 'cc', 'audience', 'bto', 'bcc')


# Override JSON error responses to plain text instead
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(
    request: Request,
    exc: StarletteHTTPException,
) -> PlainTextResponse:
    """
    Render an HTTPException as a plain-text response, keeping its status
    code and headers.
    """
    return PlainTextResponse(
        content=str(exc.detail),
        status_code=exc.status_code,
        headers=exc.headers,
    )


async def get_inboxes_from_resources(resources: Iterable[str]) -> Set[str]:
    """
    Discover an ldp:inbox for every resource passed in.

    TODO: not implemented yet; this currently returns None.
    """
    pass


async def get_inbox_from_resource(resource: str) -> Optional[str]:
    """
    Discover an ldp:inbox for a resource.

    Discovery is unfinished: the resource is requested (HEAD when the IRI
    has no fragment, GET otherwise) but no inbox is extracted from the
    response yet, so the result is always None. HTTP and URL errors are
    treated as "no inbox found" rather than raised.
    """
    try:
        async with httpx.AsyncClient() as client:
            if '#' not in resource:
                response = await client.head(resource)
                links = response.links
                # TODO: pick the ldp:inbox relation out of the Link headers
                print(links)
                return None
            # TODO: parse the response body to find the inbox of the
            # fragment-identified resource
            await client.get(resource)
    except (httpx.HTTPError, httpx.InvalidURL):
        # Best effort: an unreachable or malformed target has no inbox.
        # Deliberately not a bare except, so task cancellation and
        # programming errors still propagate.
        return None
    return None


def _target_ids(value: Any) -> List[str]:
    """
    Normalize one AS2 addressing value into a flat list of IRI strings.

    The value may be absent, a single IRI, an embedded object carrying an
    'id', or a list mixing both forms. Anything without a string
    identifier is skipped.
    """
    if value is None:
        return []
    items = value if isinstance(value, list) else [value]
    ids = []
    for item in items:
        if isinstance(item, str):
            ids.append(item)
        elif isinstance(item, dict) and isinstance(item.get('id'), str):
            ids.append(item['id'])
    return ids


def _json_error_excerpt(error: json.JSONDecodeError) -> str:
    """
    Build a two-line excerpt pointing at the position of a JSON parse error.

    The first line shows up to ten characters on either side of the
    offending column; the second line puts a caret under that column.
    """
    # error.doc is the text the parser actually saw, so lineno/colno are
    # always valid indexes into it.
    line = error.doc.split('\n')[error.lineno - 1]
    before = line[max(0, error.colno - 11):error.colno]
    after = line[error.colno:error.colno + 10]
    pointer = '.' * (len(before) - 1) + '^'
    return before + after + '\n' + pointer


@app.post("/outbox")
async def post_outbox(request: Request) -> Response:
    """
    Accept a notification payload and deliver to ActivityPub targets, per
    the S2S "Federated Server" conformance profile.

    Responds with 201 Created and a Location header on success.

    Raises:
        HTTPException: 400 when the Content-Type header or the JSON body
            cannot be parsed, 415 when the media type is not supported,
            501 when the payload is not ActivityStreams 2.0.
    """
    # Validate Content-Type. A missing header is parsed as an empty string
    # so that it is reported as a 400 instead of crashing the parser.
    try:
        mime_maintype, mime_subtype, mime_params = parse_mime_type(
            request.headers.get('Content-Type', ''),
        )
    except mimeparse.MimeTypeParseException as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                'HTTP Error 400: Bad Request.\n'
                'Could not parse the Content-Type of the request.\n\n'
                f'{e}'
            ),
        ) from e

    # Media types are case-insensitive.
    mime_type = f'{mime_maintype}/{mime_subtype}'.lower()
    if mime_type not in SUPPORTED_CONTENT_TYPES:
        supported_listing = '\n'.join(SUPPORTED_CONTENT_TYPES)
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            headers={
                'Accept-Post': ', '.join(SUPPORTED_CONTENT_TYPES)
            },
            detail=(
                'HTTP Error 415: Unsupported Media Type\n'
                f'This endpoint does not accept requests of the type {mime_type}\n\n'
                'The supported types are:\n\n'
                f'{supported_listing}'
            ),
        )

    # Handle notification payload. The JSON-LD profile parameter is a
    # space-separated list of URIs and may arrive quoted.
    profiles = mime_params.get('profile', '').strip('"').split()
    is_activitystreams = (
        mime_type == 'application/activity+json'
        or (mime_type == 'application/ld+json' and AS2_PROFILE in profiles)
    )

    if not is_activitystreams:
        # We should use full IRIs and treat this as RDF.
        # TODO: do this
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail=(
                'HTTP Error 501: Not Implemented\n'
                'I still need to add support for non-AS2 payloads...'
            ),
        )

    # We can use shorthand terms because it is guaranteed to be compacted
    # against AS2 context at least.
    body = await request.body()
    try:
        activity = json.loads(body)
    except json.JSONDecodeError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                'HTTP Error 400: Bad Request.\n'
                'Content-Type was detected as JSON-based, '
                'but request body could not be parsed as JSON.\n\n'
                f'{e}\n'
                f'{_json_error_excerpt(e)}'
            ),
        ) from e
    except UnicodeDecodeError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                'HTTP Error 400: Bad Request.\n'
                'Content-Type was detected as JSON-based, '
                'but request body could not be decoded as text.\n\n'
                f'{e}'
            ),
        ) from e

    if not isinstance(activity, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                'HTTP Error 400: Bad Request.\n'
                'The request body must be a single JSON object.'
            ),
        )

    # Get delivery targets, deduplicated across all addressing properties
    # TODO: Expand collections into actors
    delivery_targets = {
        target
        for prop in ADDRESSING_PROPERTIES
        for target in _target_ids(activity.get(prop))
    }

    # Find their inboxes
    # TODO: deliver the activity to the discovered inboxes
    inboxes = await get_inboxes_from_resources(delivery_targets)

    # Prepare Location header for response
    # TODO: persist activity to storage
    path = f'activities/{uuid7str()}'
    location = THE_ACTOR_NAMESPACE + path
    headers = {
        'Location': location
    }

    # Respond with 201 Created + Location header
    return PlainTextResponse(
        content=f'Created activity at {location}',
        status_code=status.HTTP_201_CREATED,
        headers=headers,
    )


@app.post("/inbox")
async def post_inbox(request: Request):
    """
    Receive a notification posted to the inbox.

    TODO: not implemented yet; the request is ignored.
    """
    pass