This release is 23 versions behind 1.4.9 — the latest version of @fedify/fedify. Jump to latest
Built and signed on GitHub ActionsBuilt and signed on GitHub Actions
Built and signed on GitHub Actions
An ActivityPub/fediverse server framework
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759import { getLogger } from "jsr:@logtape/logtape@^0.8.0"; import type { Span, TracerProvider } from "npm:@opentelemetry/api@^1.9.0"; import { SpanKind, SpanStatusCode, trace } from "npm:@opentelemetry/api@^1.9.0"; import { accepts } from "jsr:/@std/http@^1.0.6/negotiation"; import metadata from "../deno.json" with { type: "json" }; import type { DocumentLoader } from "../runtime/docloader.ts"; import { verifyRequest } from "../sig/http.ts"; import { detachSignature, verifyJsonLd } from "../sig/ld.ts"; import { doesActorOwnKey } from "../sig/owner.ts"; import { verifyObject } from "../sig/proof.ts"; import type { Recipient } from "../vocab/actor.ts"; import { getTypeId } from "../vocab/type.ts"; import { Activity, type CryptographicKey, Link, Object, OrderedCollection, OrderedCollectionPage, } from "../vocab/vocab.ts"; import type { ActorDispatcher, AuthorizePredicate, CollectionCounter, CollectionCursor, CollectionDispatcher, InboxErrorHandler, ObjectAuthorizePredicate, ObjectDispatcher, } from "./callback.ts"; import type { Context, InboxContext, RequestContext } from "./context.ts"; import { type InboxListenerSet, routeActivity } from "./inbox.ts"; import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; export function acceptsJsonLd(request: Request): boolean { const types = accepts(request); if (types == null) return true; if (types[0] === "text/html" || types[0] === "application/xhtml+xml") { return false; } return types.includes("application/activity+json") || types.includes("application/ld+json") || types.includes("application/json"); } export interface ActorHandlerParameters<TContextData> { identifier: string; context: RequestContext<TContextData>; actorDispatcher?: ActorDispatcher<TContextData>; authorizePredicate?: AuthorizePredicate<TContextData>; onUnauthorized(request: Request): Response | Promise<Response>; onNotFound(request: Request): Response | Promise<Response>; onNotAcceptable(request: Request): Response | Promise<Response>; } export async function handleActor<TContextData>( request: Request, { identifier, context, actorDispatcher, authorizePredicate, onNotFound, onNotAcceptable, onUnauthorized, }: ActorHandlerParameters<TContextData>, ): Promise<Response> { const logger = getLogger(["fedify", "federation", "actor"]); if (actorDispatcher == null) { logger.debug("Actor dispatcher is not set.", { identifier }); return await onNotFound(request); } const actor = await actorDispatcher(context, identifier); if (actor == null) { logger.debug("Actor {identifier} not found.", { identifier }); return await onNotFound(request); } if (!acceptsJsonLd(request)) return await onNotAcceptable(request); if (authorizePredicate != null) { const key = await context.getSignedKey(); const keyOwner = await context.getSignedKeyOwner(); if (!await authorizePredicate(context, identifier, key, keyOwner)) { return await onUnauthorized(request); } } const jsonLd = await actor.toJsonLd(context); return new Response(JSON.stringify(jsonLd), { headers: { "Content-Type": "application/activity+json", Vary: "Accept", }, }); } export interface ObjectHandlerParameters<TContextData> { values: Record<string, string>; context: RequestContext<TContextData>; objectDispatcher?: ObjectDispatcher<TContextData, Object, string>; authorizePredicate?: ObjectAuthorizePredicate<TContextData, string>; onUnauthorized(request: Request): Response | Promise<Response>; onNotFound(request: Request): Response | Promise<Response>; onNotAcceptable(request: Request): Response | Promise<Response>; } export async function handleObject<TContextData>( request: Request, { values, context, objectDispatcher, authorizePredicate, onNotFound, onNotAcceptable, onUnauthorized, }: ObjectHandlerParameters<TContextData>, ): Promise<Response> { if (objectDispatcher == null) return await onNotFound(request); const object = await objectDispatcher(context, values); if (object == null) return await onNotFound(request); if (!acceptsJsonLd(request)) return await onNotAcceptable(request); if (authorizePredicate != null) { const key = await context.getSignedKey(); const keyOwner = await context.getSignedKeyOwner(); if (!await authorizePredicate(context, values, key, keyOwner)) { return await onUnauthorized(request); } } const jsonLd = await object.toJsonLd(context); return new Response(JSON.stringify(jsonLd), { headers: { "Content-Type": "application/activity+json", Vary: "Accept", }, }); } /** * Callbacks for handling a collection. */ export interface CollectionCallbacks< TItem, TContext extends Context<TContextData>, TContextData, TFilter, > { /** * A callback that dispatches a collection. */ dispatcher: CollectionDispatcher<TItem, TContext, TContextData, TFilter>; /** * A callback that counts the number of items in a collection. */ counter?: CollectionCounter<TContextData, TFilter>; /** * A callback that returns the first cursor for a collection. */ firstCursor?: CollectionCursor<TContext, TContextData, TFilter>; /** * A callback that returns the last cursor for a collection. */ lastCursor?: CollectionCursor<TContext, TContextData, TFilter>; /** * A callback that determines if a request is authorized to access the collection. */ authorizePredicate?: AuthorizePredicate<TContextData>; } export interface CollectionHandlerParameters< TItem, TContext extends RequestContext<TContextData>, TContextData, TFilter, > { name: string; identifier: string; uriGetter: (handle: string) => URL; filter?: TFilter; filterPredicate?: (item: TItem) => boolean; context: TContext; collectionCallbacks?: CollectionCallbacks< TItem, TContext, TContextData, TFilter >; tracerProvider?: TracerProvider; onUnauthorized(request: Request): Response | Promise<Response>; onNotFound(request: Request): Response | Promise<Response>; onNotAcceptable(request: Request): Response | Promise<Response>; } export async function handleCollection< TItem extends URL | Object | Link | Recipient, TContext extends RequestContext<TContextData>, TContextData, TFilter, >( request: Request, { name, identifier, uriGetter, filter, filterPredicate, context, collectionCallbacks, tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>, ): Promise<Response> { const spanName = name.trim().replace(/\s+/g, "_"); tracerProvider = tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); const url = new URL(request.url); const cursor = url.searchParams.get("cursor"); if (collectionCallbacks == null) return await onNotFound(request); let collection: OrderedCollection | OrderedCollectionPage; const baseUri = uriGetter(identifier); if (cursor == null) { const firstCursor = await collectionCallbacks.firstCursor?.( context, identifier, ); const totalItems = await collectionCallbacks.counter?.(context, identifier); if (firstCursor == null) { const itemsOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection ${spanName}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": baseUri.href, "activitypub.collection.type": OrderedCollection.typeId.href, }, }, async (span) => { if (totalItems != null) { span.setAttribute( "activitypub.collection.total_items", Number(totalItems), ); } try { const page = await collectionCallbacks.dispatcher( context, identifier, null, filter, ); if (page == null) { span.setStatus({ code: SpanStatusCode.ERROR }); return await onNotFound(request); } const { items } = page; span.setAttribute("fedify.collection.items", items.length); return items; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (itemsOrResponse instanceof Response) return itemsOrResponse; collection = new OrderedCollection({ id: baseUri, totalItems: totalItems == null ? null : Number(totalItems), items: filterCollectionItems(itemsOrResponse, name, filterPredicate), }); } else { const lastCursor = await collectionCallbacks.lastCursor?.( context, identifier, ); const first = new URL(context.url); first.searchParams.set("cursor", firstCursor); let last = null; if (lastCursor != null) { last = new URL(context.url); last.searchParams.set("cursor", lastCursor); } collection = new OrderedCollection({ id: baseUri, totalItems: Number(totalItems), first, last, }); } } else { const uri = new URL(baseUri); uri.searchParams.set("cursor", cursor); const pageOrResponse = await tracer.startActiveSpan( `activitypub.dispatch_collection_page ${name}`, { kind: SpanKind.SERVER, attributes: { "activitypub.collection.id": uri.href, "activitypub.collection.type": OrderedCollectionPage.typeId.href, "fedify.collection.cursor": cursor, }, }, async (span) => { try { const page = await collectionCallbacks.dispatcher( context, identifier, cursor, filter, ); if (page == null) { span.setStatus({ code: SpanStatusCode.ERROR }); return await onNotFound(request); } span.setAttribute("fedify.collection.items", page.items.length); return page; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); if (pageOrResponse instanceof Response) return pageOrResponse; const { items, prevCursor, nextCursor } = pageOrResponse; let prev = null; if (prevCursor != null) { prev = new URL(context.url); prev.searchParams.set("cursor", prevCursor); } let next = null; if (nextCursor != null) { next = new URL(context.url); next.searchParams.set("cursor", nextCursor); } const partOf = new URL(context.url); partOf.searchParams.delete("cursor"); collection = new OrderedCollectionPage({ id: uri, prev, next, items: filterCollectionItems(items, name, filterPredicate), partOf, }); } if (!acceptsJsonLd(request)) return await onNotAcceptable(request); if (collectionCallbacks.authorizePredicate != null) { const key = await context.getSignedKey(); const keyOwner = await context.getSignedKeyOwner(); if ( !await collectionCallbacks.authorizePredicate( context, identifier, key, keyOwner, ) ) { return await onUnauthorized(request); } } const jsonLd = await collection.toJsonLd(context); return new Response(JSON.stringify(jsonLd), { headers: { "Content-Type": "application/activity+json", Vary: "Accept", }, }); } function filterCollectionItems<TItem extends Object | Link | Recipient | URL>( items: TItem[], collectionName: string, filterPredicate?: (item: TItem) => boolean, ): (Object | Link | URL)[] { const result: (Object | Link | URL)[] = []; let logged = false; for (const item of items) { let mappedItem: Object | Link | URL; if (item instanceof Object || item instanceof Link || item instanceof URL) { mappedItem = item; } else if (item.id == null) continue; else mappedItem = item.id; if (filterPredicate != null && !filterPredicate(item)) { if (!logged) { getLogger(["fedify", "federation", "collection"]).warn( `The ${collectionName} collection apparently does not implement ` + "filtering. This may result in a large response payload. " + "Please consider implementing filtering for the collection.", ); logged = true; } continue; } result.push(mappedItem); } return result; } export interface InboxHandlerParameters<TContextData> { recipient: string | null; context: RequestContext<TContextData>; inboxContextFactory( recipient: string | null, activity: unknown, activityId: string | undefined, activityType: string, ): InboxContext<TContextData>; kv: KvStore; kvPrefixes: { activityIdempotence: KvKey; publicKey: KvKey; }; queue?: MessageQueue; actorDispatcher?: ActorDispatcher<TContextData>; inboxListeners?: InboxListenerSet<TContextData>; inboxErrorHandler?: InboxErrorHandler<TContextData>; onNotFound(request: Request): Response | Promise<Response>; signatureTimeWindow: Temporal.Duration | Temporal.DurationLike | false; skipSignatureVerification: boolean; tracerProvider?: TracerProvider; } export async function handleInbox<TContextData>( request: Request, options: InboxHandlerParameters<TContextData>, ): Promise<Response> { const tracerProvider = options.tracerProvider ?? trace.getTracerProvider(); const tracer = tracerProvider.getTracer(metadata.name, metadata.version); return await tracer.startActiveSpan( "activitypub.inbox", { kind: options.queue == null ? SpanKind.SERVER : SpanKind.PRODUCER, attributes: { "activitypub.shared_inbox": options.recipient == null }, }, async (span) => { if (options.recipient != null) { span.setAttribute("fedify.inbox.recipient", options.recipient); } try { return await handleInboxInternal(request, options, span); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } async function handleInboxInternal<TContextData>( request: Request, { recipient, context: ctx, inboxContextFactory, kv, kvPrefixes, queue, actorDispatcher, inboxListeners, inboxErrorHandler, onNotFound, signatureTimeWindow, skipSignatureVerification, tracerProvider, }: InboxHandlerParameters<TContextData>, span: Span, ): Promise<Response> { const logger = getLogger(["fedify", "federation", "inbox"]); if (actorDispatcher == null) { logger.error("Actor dispatcher is not set.", { recipient }); span.setStatus({ code: SpanStatusCode.ERROR, message: "Actor dispatcher is not set.", }); return await onNotFound(request); } else if (recipient != null) { const actor = await actorDispatcher(ctx, recipient); if (actor == null) { logger.error("Actor {recipient} not found.", { recipient }); span.setStatus({ code: SpanStatusCode.ERROR, message: `Actor ${recipient} not found.`, }); return await onNotFound(request); } } if (request.bodyUsed) { logger.error("Request body has already been read.", { recipient }); span.setStatus({ code: SpanStatusCode.ERROR, message: "Request body has already been read.", }); return new Response("Internal server error.", { status: 500, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } else if (request.body?.locked) { logger.error("Request body is locked.", { recipient }); span.setStatus({ code: SpanStatusCode.ERROR, message: "Request body is locked.", }); return new Response("Internal server error.", { status: 500, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } let json: unknown; try { json = await request.clone().json(); } catch (error) { logger.error("Failed to parse JSON:\n{error}", { recipient, error }); try { await inboxErrorHandler?.(ctx, error as Error); } catch (error) { logger.error( "An unexpected error occurred in inbox error handler:\n{error}", { error, activity: json, recipient }, ); } span.setStatus({ code: SpanStatusCode.ERROR, message: `Failed to parse JSON:\n${error}`, }); return new Response("Invalid JSON.", { status: 400, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } const keyCache = new KvKeyCache(kv, kvPrefixes.publicKey, ctx); const ldSigVerified = await verifyJsonLd(json, { contextLoader: ctx.contextLoader, documentLoader: ctx.documentLoader, keyCache, tracerProvider, }); const jsonWithoutSig = detachSignature(json); let activity: Activity | null = null; if (ldSigVerified) { logger.debug("Linked Data Signatures are verified.", { recipient, json }); activity = await Activity.fromJsonLd(jsonWithoutSig, ctx); } else { logger.debug( "Linked Data Signatures are not verified.", { recipient, json }, ); try { activity = await verifyObject(Activity, jsonWithoutSig, { contextLoader: ctx.contextLoader, documentLoader: ctx.documentLoader, keyCache, tracerProvider, }); } catch (error) { logger.error("Failed to parse activity:\n{error}", { recipient, activity: json, error, }); try { await inboxErrorHandler?.(ctx, error as Error); } catch (error) { logger.error( "An unexpected error occurred in inbox error handler:\n{error}", { error, activity: json, recipient }, ); } span.setStatus({ code: SpanStatusCode.ERROR, message: `Failed to parse activity:\n${error}`, }); return new Response("Invalid activity.", { status: 400, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } if (activity == null) { logger.debug( "Object Integrity Proofs are not verified.", { recipient, activity: json }, ); } else { logger.debug( "Object Integrity Proofs are verified.", { recipient, activity: json }, ); } } let httpSigKey: CryptographicKey | null = null; if (activity == null) { if (!skipSignatureVerification) { const key = await verifyRequest(request, { contextLoader: ctx.contextLoader, documentLoader: ctx.documentLoader, timeWindow: signatureTimeWindow, keyCache, tracerProvider, }); if (key == null) { logger.error( "Failed to verify the request's HTTP Signatures.", { recipient }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: `Failed to verify the request's HTTP Signatures.`, }); const response = new Response( "Failed to verify the request signature.", { status: 401, headers: { "Content-Type": "text/plain; charset=utf-8" }, }, ); return response; } else { logger.debug("HTTP Signatures are verified.", { recipient }); } httpSigKey = key; } activity = await Activity.fromJsonLd(jsonWithoutSig, ctx); } if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } span.setAttribute("activitypub.activity.type", getTypeId(activity).href); const routeResult = await routeActivity({ context: ctx, json, activity, recipient, inboxListeners, inboxContextFactory, inboxErrorHandler, kv, kvPrefixes, queue, span, tracerProvider, }); if ( httpSigKey != null && !await doesActorOwnKey(activity, httpSigKey, ctx) ) { logger.error( "The signer ({keyId}) and the actor ({actorId}) do not match.", { activity: json, recipient, keyId: httpSigKey.id?.href, actorId: activity.actorId?.href, }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: `The signer (${httpSigKey.id?.href}) and ` + `the actor (${activity.actorId?.href}) do not match.`, }); return new Response("The signer and the actor do not match.", { status: 401, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } if (routeResult === "alreadyProcessed") { return new Response( `Activity <${activity.id}> has already been processed.`, { status: 202, headers: { "Content-Type": "text/plain; charset=utf-8" }, }, ); } else if (routeResult === "missingActor") { return new Response("Missing actor.", { status: 400, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } else if (routeResult === "enqueued") { return new Response("Activity is enqueued.", { status: 202, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } else if (routeResult === "unsupportedActivity") { return new Response("", { status: 202, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } else if (routeResult === "error") { return new Response("Internal server error.", { status: 500, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } else { return new Response("", { status: 202, headers: { "Content-Type": "text/plain; charset=utf-8" }, }); } } /** * Options for the {@link respondWithObject} and * {@link respondWithObjectIfAcceptable} functions. * @since 0.3.0 */ export interface RespondWithObjectOptions { /** * The document loader to use for compacting JSON-LD. * @since 0.8.0 */ contextLoader: DocumentLoader; } /** * Responds with the given object in JSON-LD format. * * @param object The object to respond with. * @param options Options. * @since 0.3.0 */ export async function respondWithObject( object: Object, options?: RespondWithObjectOptions, ): Promise<Response> { const jsonLd = await object.toJsonLd(options); return new Response(JSON.stringify(jsonLd), { headers: { "Content-Type": "application/activity+json", }, }); } /** * Responds with the given object in JSON-LD format if the request accepts * JSON-LD. * * @param object The object to respond with. * @param request The request to check for JSON-LD acceptability. * @param options Options. * @since 0.3.0 */ export async function respondWithObjectIfAcceptable( object: Object, request: Request, options?: RespondWithObjectOptions, ): Promise<Response | null> { if (!acceptsJsonLd(request)) return null; const response = await respondWithObject(object, options); response.headers.set("Vary", "Accept"); return response; }