163 lines
3.7 KiB
Python
163 lines
3.7 KiB
Python
from fastapi import APIRouter, Response
|
|
from fastapi.responses import PlainTextResponse, JSONResponse
|
|
from fastapi.params import Query
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from webfinger.io import get_document
|
|
|
|
|
|
# Router under which the WebFinger endpoint is registered.
router = APIRouter(prefix="/.well-known/webfinger")
|
|
|
|
|
|
## String literals
|
|
|
|
# Media type of a JSON Resource Descriptor (JRD), the WebFinger response format.
MEDIA_TYPE = "application/jrd+json"

# Example resource document shown in the OpenAPI docs.
# Kept as a raw string so the backslash-escaped quotes inside the "type"
# value reach the JSON text unchanged and remain valid JSON escapes.
# NOTE(review): the "subject" value is illustrative -- confirm it matches the
# identifier this document is actually served under.
EXAMPLE_RESOURCE = r'''
{
    "subject": "acct:a@trwnh.com",
    "aliases": ["https://trwnh.com/actors/7057bc10-db1c-4ebe-9e00-22cf04be4e5e", "https://trwnh.com/~a"],
    "links": [
        {
            "rel": "self",
            "type": "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"",
            "href": "https://trwnh.com/actors/7057bc10-db1c-4ebe-9e00-22cf04be4e5e"
        },
        {
            "rel": "https://webfinger.net/rel/profile-page/",
            "type": "text/html",
            "href": "https://trwnh.com/~a"
        }
    ]
}
'''

# Plain-text hint used as the body of the 400 response when no resource is given.
NO_RESOURCE_HINT = "Query ?resource={resource} to obtain information about that resource."
|
|
|
|
|
|
## Pydantic models for OpenAPI schema
|
|
|
|
class Link(BaseModel):
    """One link relation object from the ``links`` array of a resource document.

    Only ``rel`` is required; every other member may be omitted.
    """

    # Link relation type (a URI or a registered relation name).
    rel: str
    # Media type of the link target.
    type: str | None = None
    # URI of the link target.
    href: str | None = None
    # RFC 7033 defines "titles" as an object mapping a language tag (or
    # "und") to a human-readable title, not as a single string.
    titles: dict[str, str] | None = None
    # RFC 7033 defines link "properties" as an object mapping a URI to a
    # string or null value, not as a single string.
    properties: dict[str, str | None] | None = None

    class Config:
        # The example is a real object so it is a valid instance of this
        # schema, rather than a JSON-encoded string.
        schema_extra = {
            "example": {
                "rel": "https://webfinger.net/rel/profile-page/",
                "type": "text/html",
                "href": "https://trwnh.com/~a",
            }
        }
|
|
|
|
class Resource(BaseModel):
    """Resource document (JRD) describing a subject, used for the OpenAPI schema.

    ``subject`` is always present; the remaining members are optional.
    """

    # The URI that was queried and that this document describes.
    subject: str
    # Other URIs that identify the same subject.
    aliases: list[str] | None = None
    # Subject-level properties: a URI mapped to a string or null value.
    # Declared here because the lookup endpoint emits this member.
    properties: dict[str, str | None] | None = None
    # Link relation objects associated with the subject.
    links: list[Link] | None = None

    class Config:
        schema_extra = {
            "example": EXAMPLE_RESOURCE
        }
|
|
|
|
class WebfingerResponse(Response):
    """Response subclass whose media type is fixed to ``MEDIA_TYPE``."""

    media_type = MEDIA_TYPE
|
|
|
|
|
|
## Example responses for OpenAPI schema
|
|
|
|
def _example_content(media_type, example):
    """Build a ``content`` mapping holding one example for one media type."""
    return {media_type: {"example": example}}


# Documented responses for the lookup route, keyed by HTTP status code.
RESPONSES = {
    400: {
        "description": "Bad Request. No resource was provided.",
        "content": _example_content("text/plain", NO_RESOURCE_HINT),
    },
    404: {
        "description": "Not Found. Could not find a document for the provided resource.",
        "content": _example_content("text/plain", "{resource} not found"),
    },
    200: {
        "description": "The resource you requested was found, and has the returned resource document.",
        "model": Resource,
        "content": _example_content(MEDIA_TYPE, EXAMPLE_RESOURCE),
    },
}
|
|
|
|
|
|
## The lookup method as described in RFC7033
|
|
|
|
@router.get(
    "",
    summary="Lookup a Webfinger resource",
    description="Query the Webfinger service for a resource URI and obtain its associated resource document",
    tags=["RFC7033"],
    response_model=Resource,
    response_class=WebfingerResponse,
    responses={**RESPONSES},
)
async def lookup(
    resource: str | None = Query(
        None,
        description="The subject whose document you are looking up. If not provided, you will get a 400 Bad Request.",
    ),
    rel: list[str] | None = Query(
        None,
        description="If provided, filter the links array for this rel-value only. This parameter may be included multiple times.",
    ),
):
    """
    Respond to a WebFinger query.

    Args:
        resource: The subject URI to look up. A missing or empty value
            yields a 400 plain-text hint.
        rel: Optional link relation types. When given, only links whose
            ``rel`` is one of these values are returned.

    Returns:
        A plain-text 400 response when no resource is given, a plain-text
        404 response when ``get_document`` returns ``None``, otherwise a
        JSON response with media type ``MEDIA_TYPE`` containing ``subject``
        plus whichever of ``aliases``, ``properties`` and ``links`` are
        non-empty.
    """
    # If no resource is given, then show a basic hint.
    if not resource:
        return PlainTextResponse(content=NO_RESOURCE_HINT, status_code=400)

    # Otherwise, try to read the resource document.
    # NOTE(review): assumes get_document returns a dict-like object or None.
    data = get_document(resource)

    # A missing document is reported as not found.
    if data is None:
        return PlainTextResponse(content=f"{resource} not found", status_code=404)

    # Copy the values out of the document. ``or`` also covers a member that
    # is present but stored as null, which would otherwise fail on iteration.
    aliases = list(data.get("aliases") or [])
    links = list(data.get("links") or [])
    properties = data.get("properties") or {}

    # If optional rel is provided, keep only links with one of those rel-types.
    if rel:
        links = [link for link in links if link.get("rel", "") in rel]

    # Construct the JSON response, dropping members that are empty
    # (empty list, empty dict or null) so they are omitted entirely.
    content = {
        "subject": resource,
        "aliases": aliases,
        "properties": properties,
        "links": links,
    }
    content = {key: value for key, value in content.items() if value}
    return JSONResponse(content=content, media_type=MEDIA_TYPE)