Skip to main content
This release is 3 versions behind 1.3.3 — the latest version of @fedify/fedify. Jump to latest

Built and signed on GitHub Actions

An ActivityPub/fediverse server framework

This package works with Node.js, Deno, Bun
This package works with Node.js
This package works with Deno
This package works with Bun
JSR Score
100%
Published
2 months ago (1.3.0)
Package root>federation>middleware.ts
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698
import { verifyObject } from "../mod.ts"; import { getLogger, withContext } from "jsr:@logtape/logtape@^0.8.0"; import { context, propagation, type Span, SpanKind, SpanStatusCode, trace, type Tracer, type TracerProvider, } from "npm:@opentelemetry/api@^1.9.0"; import { ATTR_HTTP_REQUEST_HEADER, ATTR_HTTP_REQUEST_METHOD, ATTR_HTTP_RESPONSE_HEADER, ATTR_HTTP_RESPONSE_STATUS_CODE, ATTR_URL_FULL, } from "npm:@opentelemetry/semantic-conventions@^1.27.0"; import metadata from "../deno.json" with { type: "json" }; import { handleNodeInfo, handleNodeInfoJrd } from "../nodeinfo/handler.ts"; import { type AuthenticatedDocumentLoaderFactory, type DocumentLoader, getAuthenticatedDocumentLoader, getDocumentLoader, type GetUserAgentOptions, kvCache, } from "../runtime/docloader.ts"; import { verifyRequest } from "../sig/http.ts"; import { exportJwk, importJwk, validateCryptoKey } from "../sig/key.ts"; import { hasSignature, signJsonLd } from "../sig/ld.ts"; import { getKeyOwner } from "../sig/owner.ts"; import { signObject } from "../sig/proof.ts"; import type { Actor, Recipient } from "../vocab/actor.ts"; import { lookupObject, type LookupObjectOptions, traverseCollection, type TraverseCollectionOptions, } from "../vocab/lookup.ts"; import { getTypeId } from "../vocab/type.ts"; import { Activity, type Collection, CryptographicKey, type Hashtag, type Like, type Link, Multikey, type Object, } from "../vocab/vocab.ts"; import { handleWebFinger } from "../webfinger/handler.ts"; import type { ActorDispatcher, ActorHandleMapper, ActorKeyPairsDispatcher, AuthorizePredicate, CollectionCounter, CollectionCursor, CollectionDispatcher, InboxErrorHandler, InboxListener, NodeInfoDispatcher, ObjectAuthorizePredicate, ObjectDispatcher, OutboxErrorHandler, SharedInboxKeyDispatcher, } from "./callback.ts"; import { buildCollectionSynchronizationHeader } from "./collection.ts"; import type { ActorKeyPair, Context, ForwardActivityOptions, InboxContext, ParseUriResult, RequestContext, RouteActivityOptions, SendActivityOptions, } from "./context.ts"; import type { ActorCallbackSetters, CollectionCallbackSetters, Federation, FederationFetchOptions, FederationStartQueueOptions, InboxListenerSetters, ObjectCallbackSetters, } from "./federation.ts"; import { type CollectionCallbacks, handleActor, handleCollection, handleInbox, handleObject, } from "./handler.ts"; import { InboxListenerSet, routeActivity } from "./inbox.ts"; import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage, Message, OutboxMessage, SenderKeyJwkPair, } from "./queue.ts"; import { createExponentialBackoffPolicy, type RetryPolicy } from "./retry.ts"; import { Router, RouterError } from "./router.ts"; import { extractInboxes, sendActivity, type SenderKeyPair } from "./send.ts"; /** * Options for {@link createFederation} function. * @since 0.10.0 */ export interface CreateFederationOptions { /** * The key-value store used for caching, outbox queues, and inbox idempotence. */ kv: KvStore; /** * Prefixes for namespacing keys in the Deno KV store. By default, all keys * are prefixed with `["_fedify"]`. */ kvPrefixes?: Partial<FederationKvPrefixes>; /** * The message queue for sending and receiving activities. If not provided, * activities will not be queued and will be processed immediately. * * If a `MessageQueue` is provided, both the `inbox` and `outbox` queues * will be set to the same queue. * * If a `FederationQueueOptions` object is provided, you can set the queues * separately (since Fedify 1.3.0). */ queue?: FederationQueueOptions | MessageQueue; /** * Whether to start the task queue manually or automatically. * * If `true`, the task queue will not start automatically and you need to * manually start it by calling the {@link Federation.startQueue} method. * * If `false`, the task queue will start automatically as soon as * the first task is enqueued. * * By default, the queue starts automatically. * * @since 0.12.0 */ manuallyStartQueue?: boolean; /** * A custom JSON-LD document loader. By default, this uses the built-in * cache-backed loader that fetches remote documents over HTTP(S). */ documentLoader?: DocumentLoader; /** * A custom JSON-LD context loader. By default, this uses the same loader * as the document loader. */ contextLoader?: DocumentLoader; /** * A factory function that creates an authenticated document loader for a * given identity. This is used for fetching documents that require * authentication. */ authenticatedDocumentLoaderFactory?: AuthenticatedDocumentLoaderFactory; /** * Whether to allow fetching private network addresses in the document loader. * * If turned on, {@link CreateFederationOptions.documentLoader}, * {@link CreateFederationOptions.contextLoader}, and * {@link CreateFederationOptions.authenticatedDocumentLoaderFactory} * cannot be configured. * * Mostly useful for testing purposes. *Do not use in production.* * * Turned off by default. */ allowPrivateAddress?: boolean; /** * Options for making `User-Agent` strings for HTTP requests. * If a string is provided, it is used as the `User-Agent` header. * If an object is provided, it is passed to the {@link getUserAgent} * function. */ userAgent?: GetUserAgentOptions | string; /** * A callback that handles errors during outbox processing. Note that this * callback can be called multiple times for the same activity, because * the delivery is retried according to the backoff schedule until it * succeeds or reaches the maximum retry count. * * If any errors are thrown in this callback, they are ignored. */ onOutboxError?: OutboxErrorHandler; /** * The time window for verifying HTTP Signatures of incoming requests. If the * request is older or newer than this window, it is rejected. Or if it is * `false`, the request's timestamp is not checked at all. * * By default, the window is an hour. */ signatureTimeWindow?: Temporal.Duration | Temporal.DurationLike | false; /** * Whether to skip HTTP Signatures verification for incoming activities. * This is useful for testing purposes, but should not be used in production. * * By default, this is `false` (i.e., signatures are verified). * @since 0.13.0 */ skipSignatureVerification?: boolean; /** * The retry policy for sending activities to recipients' inboxes. * By default, this uses an exponential backoff strategy with a maximum of * 10 attempts and a maximum delay of 12 hours. * @since 0.12.0 */ outboxRetryPolicy?: RetryPolicy; /** * The retry policy for processing incoming activities. By default, this * uses an exponential backoff strategy with a maximum of 10 attempts and a * maximum delay of 12 hours. * @since 0.12.0 */ inboxRetryPolicy?: RetryPolicy; /** * Whether the router should be insensitive to trailing slashes in the URL * paths. For example, if this option is `true`, `/foo` and `/foo/` are * treated as the same path. Turned off by default. * @since 0.12.0 */ trailingSlashInsensitive?: boolean; /** * The OpenTelemetry tracer provider for tracing operations. If not provided, * the default global tracer provider is used. * @since 1.3.0 */ tracerProvider?: TracerProvider; } /** * Configures the task queues for sending and receiving activities. * @since 1.3.0 */ export interface FederationQueueOptions { /** * The message queue for incoming activities. If not provided, incoming * activities will not be queued and will be processed immediately. */ inbox?: MessageQueue; /** * The message queue for outgoing activities. If not provided, outgoing * activities will not be queued and will be sent immediately. */ outbox?: MessageQueue; } /** * Prefixes for namespacing keys in the Deno KV store. */ export interface FederationKvPrefixes { /** * The key prefix used for storing whether activities have already been * processed or not. `["_fedify", "activityIdempotence"]` by default. */ activityIdempotence: KvKey; /** * The key prefix used for storing remote JSON-LD documents. * `["_fedify", "remoteDocument"]` by default. */ remoteDocument: KvKey; /** * The key prefix used for caching public keys. * `["_fedify", "publicKey"]` by default. * @since 0.12.0 */ publicKey: KvKey; } /** * Create a new {@link Federation} instance. * @param parameters Parameters for initializing the instance. * @returns A new {@link Federation} instance. * @since 0.10.0 */ export function createFederation<TContextData>( options: CreateFederationOptions, ): Federation<TContextData> { return new FederationImpl<TContextData>(options); } export class FederationImpl<TContextData> implements Federation<TContextData> { kv: KvStore; kvPrefixes: FederationKvPrefixes; inboxQueue?: MessageQueue; outboxQueue?: MessageQueue; inboxQueueStarted: boolean; outboxQueueStarted: boolean; manuallyStartQueue: boolean; router: Router; nodeInfoDispatcher?: NodeInfoDispatcher<TContextData>; actorCallbacks?: ActorCallbacks<TContextData>; objectCallbacks: Record<string, ObjectCallbacks<TContextData, string>>; objectTypeIds: Record< string, // deno-lint-ignore no-explicit-any (new (...args: any[]) => Object) & { typeId: URL } >; inboxPath?: string; inboxCallbacks?: CollectionCallbacks< Activity, RequestContext<TContextData>, TContextData, void >; outboxCallbacks?: CollectionCallbacks< Activity, RequestContext<TContextData>, TContextData, void >; followingCallbacks?: CollectionCallbacks< Actor | URL, RequestContext<TContextData>, TContextData, void >; followersCallbacks?: CollectionCallbacks< Recipient, Context<TContextData>, TContextData, URL >; likedCallbacks?: CollectionCallbacks< Like, RequestContext<TContextData>, TContextData, void >; featuredCallbacks?: CollectionCallbacks< Object, RequestContext<TContextData>, TContextData, void >; featuredTagsCallbacks?: CollectionCallbacks< Hashtag, RequestContext<TContextData>, TContextData, void >; inboxListeners?: InboxListenerSet<TContextData>; inboxErrorHandler?: InboxErrorHandler<TContextData>; sharedInboxKeyDispatcher?: SharedInboxKeyDispatcher<TContextData>; documentLoader: DocumentLoader; contextLoader: DocumentLoader; authenticatedDocumentLoaderFactory: AuthenticatedDocumentLoaderFactory; userAgent?: GetUserAgentOptions | string; onOutboxError?: OutboxErrorHandler; signatureTimeWindow: Temporal.Duration | Temporal.DurationLike | false; skipSignatureVerification: boolean; outboxRetryPolicy: RetryPolicy; inboxRetryPolicy: RetryPolicy; tracerProvider: TracerProvider; constructor(options: CreateFederationOptions) { this.kv = options.kv; this.kvPrefixes = { ...({ activityIdempotence: ["_fedify", "activityIdempotence"], remoteDocument: ["_fedify", "remoteDocument"], publicKey: ["_fedify", "publicKey"], } satisfies FederationKvPrefixes), ...(options.kvPrefixes ?? {}), }; if (options.queue == null) { this.inboxQueue = undefined; this.outboxQueue = undefined; } else if ("enqueue" in options.queue && "listen" in options.queue) { this.inboxQueue = options.queue; this.outboxQueue = options.queue; } else { this.inboxQueue = options.queue.inbox; this.outboxQueue = options.queue.outbox; } this.inboxQueueStarted = false; this.outboxQueueStarted = false; this.manuallyStartQueue = options.manuallyStartQueue ?? false; this.router = new Router({ trailingSlashInsensitive: options.trailingSlashInsensitive, }); this.router.add("/.well-known/webfinger", "webfinger"); this.router.add("/.well-known/nodeinfo", "nodeInfoJrd"); this.objectCallbacks = {}; this.objectTypeIds = {}; if (options.allowPrivateAddress || options.userAgent != null) { if (options.documentLoader != null) { throw new TypeError( "Cannot set documentLoader with allowPrivateAddress or " + "userAgent options.", ); } else if (options.contextLoader != null) { throw new TypeError( "Cannot set contextLoader with allowPrivateAddress or " + "userAgent options.", ); } else if (options.authenticatedDocumentLoaderFactory != null) { throw new TypeError( "Cannot set authenticatedDocumentLoaderFactory with " + "allowPrivateAddress or userAgent options.", ); } } const { allowPrivateAddress, userAgent } = options; this.documentLoader = options.documentLoader ?? kvCache({ loader: getDocumentLoader({ allowPrivateAddress, userAgent }), kv: options.kv, prefix: this.kvPrefixes.remoteDocument, }); this.contextLoader = options.contextLoader ?? this.documentLoader; this.authenticatedDocumentLoaderFactory = options.authenticatedDocumentLoaderFactory ?? ((identity) => getAuthenticatedDocumentLoader(identity, { allowPrivateAddress, userAgent, })); this.userAgent = userAgent; this.onOutboxError = options.onOutboxError; this.signatureTimeWindow = options.signatureTimeWindow ?? { hours: 1 }; this.skipSignatureVerification = options.skipSignatureVerification ?? false; this.outboxRetryPolicy = options.outboxRetryPolicy ?? createExponentialBackoffPolicy(); this.inboxRetryPolicy = options.inboxRetryPolicy ?? createExponentialBackoffPolicy(); this.tracerProvider = options.tracerProvider ?? trace.getTracerProvider(); } #getTracer() { return this.tracerProvider.getTracer(metadata.name, metadata.version); } async #startQueue( ctxData: TContextData, signal?: AbortSignal, queue?: keyof FederationQueueOptions, ): Promise<void> { if (this.inboxQueue == null && this.outboxQueue == null) return; const logger = getLogger(["fedify", "federation", "queue"]); const promises: Promise<void>[] = []; if ( this.inboxQueue != null && (queue == null || queue === "inbox") && !this.inboxQueueStarted ) { logger.debug("Starting an inbox task worker."); this.inboxQueueStarted = true; promises.push( this.inboxQueue.listen( (msg) => this.#listenQueue(ctxData, msg), { signal }, ), ); } if ( this.outboxQueue != null && this.outboxQueue !== this.inboxQueue && (queue == null || queue === "outbox") && !this.outboxQueueStarted ) { logger.debug("Starting an outbox task worker."); this.outboxQueueStarted = true; promises.push( this.outboxQueue.listen( (msg) => this.#listenQueue(ctxData, msg), { signal }, ), ); } await Promise.all(promises); } #listenQueue(ctxData: TContextData, message: Message): Promise<void> { const tracer = this.#getTracer(); const extractedContext = propagation.extract( context.active(), message.traceContext, ); return withContext({ messageId: message.id }, async () => { if (message.type === "outbox") { await tracer.startActiveSpan( "activitypub.outbox", { kind: SpanKind.CONSUMER, attributes: { "activitypub.activity.type": message.activityType, "activitypub.activity.retries": message.attempt, }, }, extractedContext, async (span) => { if (message.activityId != null) { span.setAttribute("activitypub.activity.id", message.activityId); } try { await this.#listenOutboxMessage(ctxData, message, span); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { span.end(); } }, ); } else if (message.type === "inbox") { await tracer.startActiveSpan( "activitypub.inbox", { kind: SpanKind.CONSUMER, attributes: { "activitypub.shared_inbox": message.identifier == null, }, }, extractedContext, async (span) => { try { await this.#listenInboxMessage(ctxData, message, span); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { span.end(); } }, ); } }); } async #listenOutboxMessage( _: TContextData, message: OutboxMessage, span: Span, ): Promise<void> { const logger = getLogger(["fedify", "federation", "outbox"]); const logData = { keyIds: message.keys.map((pair) => pair.keyId), inbox: message.inbox, activity: message.activity, activityId: message.activityId, attempt: message.attempt, headers: message.headers, }; const keys: SenderKeyPair[] = []; let rsaKeyPair: SenderKeyPair | null = null; for (const { keyId, privateKey } of message.keys) { const pair: SenderKeyPair = { keyId: new URL(keyId), privateKey: await importJwk(privateKey, "private"), }; if ( rsaKeyPair == null && pair.privateKey.algorithm.name === "RSASSA-PKCS1-v1_5" ) { rsaKeyPair = pair; } keys.push(pair); } try { await sendActivity({ keys, activity: message.activity, activityId: message.activityId, activityType: message.activityType, inbox: new URL(message.inbox), sharedInbox: message.sharedInbox, headers: new Headers(message.headers), tracerProvider: this.tracerProvider, }); } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); const activity = await Activity.fromJsonLd(message.activity, { contextLoader: this.contextLoader, documentLoader: rsaKeyPair == null ? this.documentLoader : this.authenticatedDocumentLoaderFactory(rsaKeyPair), tracerProvider: this.tracerProvider, }); try { this.onOutboxError?.(error as Error, activity); } catch (error) { logger.error( "An unexpected error occurred in onError handler:\n{error}", { ...logData, error }, ); } const delay = this.outboxRetryPolicy({ elapsedTime: Temporal.Instant.from(message.started).until( Temporal.Now.instant(), ), attempts: message.attempt, }); if (delay != null) { logger.error( "Failed to send activity {activityId} to {inbox} (attempt " + "#{attempt}); retry...:\n{error}", { ...logData, error }, ); await this.outboxQueue?.enqueue( { ...message, attempt: message.attempt + 1, } satisfies OutboxMessage, { delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 ? Temporal.Duration.from({ seconds: 0 }) : delay, }, ); } else { logger.error( "Failed to send activity {activityId} to {inbox} after {attempt} " + "attempts; giving up:\n{error}", { ...logData, error }, ); } return; } logger.info( "Successfully sent activity {activityId} to {inbox}.", { ...logData }, ); } async #listenInboxMessage( ctxData: TContextData, message: InboxMessage, span: Span, ): Promise<void> { const logger = getLogger(["fedify", "federation", "inbox"]); const baseUrl = new URL(message.baseUrl); let context = this.#createContext(baseUrl, ctxData); if (message.identifier != null) { context = this.#createContext(baseUrl, ctxData, { documentLoader: await context.getDocumentLoader({ identifier: message.identifier, }), }); } else if (this.sharedInboxKeyDispatcher != null) { const identity = await this.sharedInboxKeyDispatcher(context); if (identity != null) { context = this.#createContext(baseUrl, ctxData, { documentLoader: "identifier" in identity || "username" in identity || "handle" in identity ? await context.getDocumentLoader(identity) : context.getDocumentLoader(identity), }); } } const activity = await Activity.fromJsonLd(message.activity, context); span.setAttribute("activitypub.activity.type", getTypeId(activity).href); if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } const cacheKey = activity.id == null ? null : [ ...this.kvPrefixes.activityIdempotence, activity.id.href, ] satisfies KvKey; if (cacheKey != null) { const cached = await this.kv.get(cacheKey); if (cached === true) { logger.debug("Activity {activityId} has already been processed.", { activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, }); return; } } await this.#getTracer().startActiveSpan( "activitypub.dispatch_inbox_listener", { kind: SpanKind.INTERNAL }, async (span) => { const dispatched = this.inboxListeners?.dispatchWithClass(activity); if (dispatched == null) { logger.error( "Unsupported activity type:\n{activity}", { activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, trial: message.attempt, }, ); span.setStatus({ code: SpanStatusCode.ERROR, message: `Unsupported activity type: ${getTypeId(activity).href}`, }); span.end(); return; } const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { await listener( context.toInboxContext( message.identifier, message.activity, activity.id?.href, getTypeId(activity).href, ), activity, ); } catch (error) { try { await this.inboxErrorHandler?.(context, error as Error); } catch (error) { logger.error( "An unexpected error occurred in inbox error handler:\n{error}", { error, trial: message.attempt, activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, }, ); } const delay = this.inboxRetryPolicy({ elapsedTime: Temporal.Instant.from(message.started).until( Temporal.Now.instant(), ), attempts: message.attempt, }); if (delay != null) { logger.error( "Failed to process the incoming activity {activityId} (attempt " + "#{attempt}); retry...:\n{error}", { error, attempt: message.attempt, activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, }, ); await this.inboxQueue?.enqueue( { ...message, attempt: message.attempt + 1, } satisfies InboxMessage, { delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 ? Temporal.Duration.from({ seconds: 0 }) : delay, }, ); } else { logger.error( "Failed to process the incoming activity {activityId} after " + "{trial} attempts; giving up:\n{error}", { error, activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, }, ); } span.setStatus({ code: SpanStatusCode.ERROR, message: String(error), }); span.end(); return; } if (cacheKey != null) { await this.kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }), }); } logger.info( "Activity {activityId} has been processed.", { activityId: activity.id?.href, activity: message.activity, recipient: message.identifier, }, ); span.end(); }, ); } startQueue( contextData: TContextData, options: FederationStartQueueOptions = {}, ): Promise<void> { return this.#startQueue(contextData, options.signal, options.queue); } createContext(baseUrl: URL, contextData: TContextData): Context<TContextData>; createContext( request: Request, contextData: TContextData, ): RequestContext<TContextData>; createContext( urlOrRequest: Request | URL, contextData: TContextData, ): Context<TContextData> { return urlOrRequest instanceof Request ? this.#createContext(urlOrRequest, contextData) : this.#createContext(urlOrRequest, contextData); } #createContext( baseUrl: URL, contextData: TContextData, opts?: { documentLoader?: DocumentLoader }, ): ContextImpl<TContextData>; #createContext( request: Request, contextData: TContextData, opts?: { documentLoader?: DocumentLoader; invokedFromActorDispatcher?: { identifier: string }; invokedFromObjectDispatcher?: { // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => Object) & { typeId: URL }; values: Record<string, string>; }; }, ): RequestContextImpl<TContextData>; #createContext( urlOrRequest: Request | URL, contextData: TContextData, opts: { documentLoader?: DocumentLoader; invokedFromActorDispatcher?: { identifier: string }; invokedFromObjectDispatcher?: { // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => Object) & { typeId: URL }; values: Record<string, string>; }; } = {}, ): ContextImpl<TContextData> | RequestContextImpl<TContextData> { const request = urlOrRequest instanceof Request ? urlOrRequest : null; const url = urlOrRequest instanceof URL ? new URL(urlOrRequest) : new URL(urlOrRequest.url); if (request == null) { url.pathname = "/"; url.hash = ""; url.search = ""; } const ctxOptions: ContextOptions<TContextData> = { url, federation: this, data: contextData, documentLoader: opts.documentLoader ?? this.documentLoader, }; if (request == null) return new ContextImpl(ctxOptions); return new RequestContextImpl({ ...ctxOptions, request, invokedFromActorDispatcher: opts.invokedFromActorDispatcher, invokedFromObjectDispatcher: opts.invokedFromObjectDispatcher, }); } setNodeInfoDispatcher( path: string, dispatcher: NodeInfoDispatcher<TContextData>, ) { if (this.router.has("nodeInfo")) { throw new RouterError("NodeInfo dispatcher already set."); } const variables = this.router.add(path, "nodeInfo"); if (variables.size !== 0) { throw new RouterError( "Path for NodeInfo dispatcher must have no variables.", ); } this.nodeInfoDispatcher = dispatcher; } setActorDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: ActorDispatcher<TContextData>, ): ActorCallbackSetters<TContextData> { if (this.router.has("actor")) { throw new RouterError("Actor dispatcher already set."); } const variables = this.router.add(path, "actor"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for actor dispatcher must have one variable: {identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "actor"]).warn( "The {{handle}} variable in the actor dispatcher path is deprecated. " + "Use {{identifier}} instead.", ); } const callbacks: ActorCallbacks<TContextData> = { dispatcher: async (context, identifier) => { const actor = await this.#getTracer().startActiveSpan( "activitypub.dispatch_actor", { kind: SpanKind.SERVER, attributes: { "fedify.actor.identifier": identifier }, }, async (span) => { try { const actor = await dispatcher(context, identifier); span.setAttribute( "activitypub.actor.id", (actor?.id ?? context.getActorUri(identifier)).href, ); if (actor == null) { span.setStatus({ code: SpanStatusCode.ERROR }); } else { span.setAttribute( "activitypub.actor.type", getTypeId(actor).href, ); } return actor; } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error), }); throw error; } finally { span.end(); } }, ); if (actor == null) return null; const logger = getLogger(["fedify", "federation", "actor"]); if (actor.id == null) { logger.warn( "Actor dispatcher returned an actor without an id property. " + "Set the property with Context.getActorUri(identifier).", ); } else if (actor.id.href != context.getActorUri(identifier).href) { logger.warn( "Actor dispatcher returned an actor with an id property that " + "does not match the actor URI. Set the property with " + "Context.getActorUri(identifier).", ); } if ( this.followingCallbacks != null && this.followingCallbacks.dispatcher != null ) { if (actor.followingId == null) { logger.warn( "You configured a following collection dispatcher, but the " + "actor does not have a following property. Set the property " + "with Context.getFollowingUri(identifier).", ); } else if ( actor.followingId.href != context.getFollowingUri(identifier).href ) { logger.warn( "You configured a following collection dispatcher, but the " + "actor's following property does not match the following " + "collection URI. Set the property with " + "Context.getFollowingUri(identifier).", ); } } if ( this.followersCallbacks != null && this.followersCallbacks.dispatcher != null ) { if (actor.followersId == null) { logger.warn( "You configured a followers collection dispatcher, but the " + "actor does not have a followers property. Set the property " + "with Context.getFollowersUri(identifier).", ); } else if ( actor.followersId.href != context.getFollowersUri(identifier).href ) { logger.warn( "You configured a followers collection dispatcher, but the " + "actor's followers property does not match the followers " + "collection URI. Set the property with " + "Context.getFollowersUri(identifier).", ); } } if ( this.outboxCallbacks != null && this.outboxCallbacks.dispatcher != null ) { if (actor?.outboxId == null) { logger.warn( "You configured an outbox collection dispatcher, but the " + "actor does not have an outbox property. Set the property " + "with Context.getOutboxUri(identifier).", ); } else if ( actor.outboxId.href != context.getOutboxUri(identifier).href ) { logger.warn( "You configured an outbox collection dispatcher, but the " + "actor's outbox property does not match the outbox collection " + "URI. Set the property with Context.getOutboxUri(identifier).", ); } } if ( this.likedCallbacks != null && this.likedCallbacks.dispatcher != null ) { if (actor?.likedId == null) { logger.warn( "You configured a liked collection dispatcher, but the " + "actor does not have a liked property. Set the property " + "with Context.getLikedUri(identifier).", ); } else if ( actor.likedId.href != context.getLikedUri(identifier).href ) { logger.warn( "You configured a liked collection dispatcher, but the " + "actor's liked property does not match the liked collection " + "URI. Set the property with Context.getLikedUri(identifier).", ); } } if ( this.featuredCallbacks != null && this.featuredCallbacks.dispatcher != null ) { if (actor?.featuredId == null) { logger.warn( "You configured a featured collection dispatcher, but the " + "actor does not have a featured property. Set the property " + "with Context.getFeaturedUri(identifier).", ); } else if ( actor.featuredId.href != context.getFeaturedUri(identifier).href ) { logger.warn( "You configured a featured collection dispatcher, but the " + "actor's featured property does not match the featured collection " + "URI. Set the property with Context.getFeaturedUri(identifier).", ); } } if ( this.featuredTagsCallbacks != null && this.featuredTagsCallbacks.dispatcher != null ) { if (actor?.featuredTagsId == null) { logger.warn( "You configured a featured tags collection dispatcher, but the " + "actor does not have a featuredTags property. Set the property " + "with Context.getFeaturedTagsUri(identifier).", ); } else if ( actor.featuredTagsId.href != context.getFeaturedTagsUri(identifier).href ) { logger.warn( "You configured a featured tags collection dispatcher, but the " + "actor's featuredTags property does not match the featured tags " + "collection URI. Set the property with " + "Context.getFeaturedTagsUri(identifier).", ); } } if (this.router.has("inbox")) { if (actor.inboxId == null) { logger.warn( "You configured inbox listeners, but the actor does not " + "have an inbox property. Set the property with " + "Context.getInboxUri(identifier).", ); } else if ( actor.inboxId.href != context.getInboxUri(identifier).href ) { logger.warn( "You configured inbox listeners, but the actor's inbox " + "property does not match the inbox URI. Set the property " + "with Context.getInboxUri(identifier).", ); } if (actor.endpoints == null || actor.endpoints.sharedInbox == null) { logger.warn( "You configured inbox listeners, but the actor does not have " + "a endpoints.sharedInbox property. Set the property with " + "Context.getInboxUri().", ); } else if ( actor.endpoints.sharedInbox.href != context.getInboxUri().href ) { logger.warn( "You configured inbox listeners, but the actor's " + "endpoints.sharedInbox property does not match the shared inbox " + "URI. Set the property with Context.getInboxUri().", ); } } if (callbacks.keyPairsDispatcher != null) { if (actor.publicKeyId == null) { logger.warn( "You configured a key pairs dispatcher, but the actor does " + "not have a publicKey property. Set the property with " + "Context.getActorKeyPairs(identifier).", ); } if (actor.assertionMethodId == null) { logger.warn( "You configured a key pairs dispatcher, but the actor does " + "not have an assertionMethod property. Set the property " + "with Context.getActorKeyPairs(identifier).", ); } } return actor; }, }; this.actorCallbacks = callbacks; const setters: ActorCallbackSetters<TContextData> = { setKeyPairsDispatcher: ( dispatcher: ActorKeyPairsDispatcher<TContextData>, ) => { callbacks.keyPairsDispatcher = (ctx, identifier) => this.#getTracer().startActiveSpan( "activitypub.dispatch_actor_key_pairs", { kind: SpanKind.SERVER, attributes: { "activitypub.actor.id": ctx.getActorUri(identifier).href, "fedify.actor.identifier": identifier, }, }, async (span) => { try { return await dispatcher(ctx, identifier); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { span.end(); } }, ); return setters; }, mapHandle(mapper: ActorHandleMapper<TContextData>) { callbacks.handleMapper = mapper; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}{${TParam}}${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: `${string}{${TParam}}${string}`, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam>; setObjectDispatcher<TObject extends Object, TParam extends string>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, path: string, dispatcher: ObjectDispatcher<TContextData, TObject, TParam>, ): ObjectCallbackSetters<TContextData, TObject, TParam> { const routeName = `object:${cls.typeId.href}`; if (this.router.has(routeName)) { throw new RouterError(`Object dispatcher for ${cls.name} already set.`); } const variables = this.router.add(path, routeName); if (variables.size < 1) { throw new RouterError( "Path for object dispatcher must have at least one variable.", ); } const callbacks: ObjectCallbacks<TContextData, TParam> = { dispatcher: (ctx, values) => { const tracer = this.#getTracer(); return tracer.startActiveSpan( "activitypub.dispatch_object", { kind: SpanKind.SERVER, attributes: { "fedify.object.type": cls.typeId.href, ...globalThis.Object.fromEntries( globalThis.Object.entries(values).map(([k, v]) => [ `fedify.object.values.${k}`, v, ]), ), }, }, async (span) => { try { const object = await dispatcher(ctx, values); span.setAttribute( "activitypub.object.id", (object?.id ?? ctx.getObjectUri(cls, values)).href, ); if (object == null) { span.setStatus({ code: SpanStatusCode.ERROR }); } else { span.setAttribute( "activitypub.object.type", getTypeId(object).href, ); } return object; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e), }); throw e; } finally { span.end(); } }, ); }, parameters: variables as unknown as Set<TParam>, }; this.objectCallbacks[cls.typeId.href] = callbacks; this.objectTypeIds[cls.typeId.href] = cls; const setters: ObjectCallbackSetters<TContextData, TObject, TParam> = { authorize(predicate: ObjectAuthorizePredicate<TContextData, TParam>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setInboxDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Activity, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.inboxCallbacks != null) { throw new RouterError("Inbox dispatcher already set."); } if (this.router.has("inbox")) { if (this.inboxPath !== path) { throw new RouterError( "Inbox dispatcher path must match inbox listener path.", ); } } else { const variables = this.router.add(path, "inbox"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for inbox dispatcher must have one variable: {identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "inbox"]).warn( "The {{handle}} variable in the inbox dispatcher path is deprecated. " + "Use {{identifier}} instead.", ); } this.inboxPath = path; } const callbacks: CollectionCallbacks< Activity, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.inboxCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setOutboxDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Activity, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.router.has("outbox")) { throw new RouterError("Outbox dispatcher already set."); } const variables = this.router.add(path, "outbox"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for outbox dispatcher must have one variable: {identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "outbox"]).warn( "The {{handle}} variable in the outbox dispatcher path is deprecated. " + "Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Activity, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.outboxCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setFollowingDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Actor | URL, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.router.has("following")) { throw new RouterError("Following collection dispatcher already set."); } const variables = this.router.add(path, "following"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for following collection dispatcher must have one variable: " + "{identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "collection"]).warn( "The {{handle}} variable in the following collection dispatcher path " + "is deprecated. Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Actor | URL, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.followingCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setFollowersDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Recipient, Context<TContextData>, TContextData, URL >, ): CollectionCallbackSetters<Context<TContextData>, TContextData, URL> { if (this.router.has("followers")) { throw new RouterError("Followers collection dispatcher already set."); } const variables = this.router.add(path, "followers"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for followers collection dispatcher must have one variable: " + "{identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "collection"]).warn( "The {{handle}} variable in the followers collection dispatcher path " + "is deprecated. Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Recipient, Context<TContextData>, TContextData, URL > = { dispatcher }; this.followersCallbacks = callbacks; const setters: CollectionCallbackSetters< Context<TContextData>, TContextData, URL > = { setCounter(counter: CollectionCounter<TContextData, URL>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor<Context<TContextData>, TContextData, URL>, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor<Context<TContextData>, TContextData, URL>, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setLikedDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Like, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.router.has("liked")) { throw new RouterError("Liked collection dispatcher already set."); } const variables = this.router.add(path, "liked"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for liked collection dispatcher must have one variable: " + "{identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "collection"]).warn( "The {{handle}} variable in the liked collection dispatcher path " + "is deprecated. Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Like, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.likedCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setFeaturedDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Object, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.router.has("featured")) { throw new RouterError("Featured collection dispatcher already set."); } const variables = this.router.add(path, "featured"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for featured collection dispatcher must have one variable: " + "{identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "collection"]).warn( "The {{handle}} variable in the featured collection dispatcher path " + "is deprecated. Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Object, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.featuredCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setFeaturedTagsDispatcher( path: `${string}{identifier}${string}` | `${string}{handle}${string}`, dispatcher: CollectionDispatcher< Hashtag, RequestContext<TContextData>, TContextData, void >, ): CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > { if (this.router.has("featuredTags")) { throw new RouterError("Featured tags collection dispatcher already set."); } const variables = this.router.add(path, "featuredTags"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for featured tags collection dispatcher must have one " + "variable: {identifier}", ); } if (variables.has("handle")) { getLogger(["fedify", "federation", "collection"]).warn( "The {{handle}} variable in the featured tags collection dispatcher " + "path is deprecated. Use {{identifier}} instead.", ); } const callbacks: CollectionCallbacks< Hashtag, RequestContext<TContextData>, TContextData, void > = { dispatcher }; this.featuredTagsCallbacks = callbacks; const setters: CollectionCallbackSetters< RequestContext<TContextData>, TContextData, void > = { setCounter(counter: CollectionCounter<TContextData, void>) { callbacks.counter = counter; return setters; }, setFirstCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.firstCursor = cursor; return setters; }, setLastCursor( cursor: CollectionCursor< RequestContext<TContextData>, TContextData, void >, ) { callbacks.lastCursor = cursor; return setters; }, authorize(predicate: AuthorizePredicate<TContextData>) { callbacks.authorizePredicate = predicate; return setters; }, }; return setters; } setInboxListeners( inboxPath: `${string}{identifier}${string}` | `${string}{handle}${string}`, sharedInboxPath?: string, ): InboxListenerSetters<TContextData> { if (this.inboxListeners != null) { throw new RouterError("Inbox listeners already set."); } if (this.router.has("inbox")) { if (this.inboxPath !== inboxPath) { throw new RouterError( "Inbox listener path must match inbox dispatcher path.", ); } } else { const variables = this.router.add(inboxPath, "inbox"); if ( variables.size !== 1 || !(variables.has("identifier") || variables.has("handle")) ) { throw new RouterError( "Path for inbox must have one variable: {identifier}", ); } this.inboxPath = inboxPath; if (variables.has("handle")) { getLogger(["fedify", "federation", "inbox"]).warn( "The {{handle}} variable in the inbox path is deprecated. " + "Use {{identifier}} instead.", ); } } if (sharedInboxPath != null) { const siVars = this.router.add(sharedInboxPath, "sharedInbox"); if (siVars.size !== 0) { throw new RouterError( "Path for shared inbox must have no variables.", ); } } const listeners = this.inboxListeners = new InboxListenerSet(); const setters: InboxListenerSetters<TContextData> = { on<TActivity extends Activity>( // deno-lint-ignore no-explicit-any type: new (...args: any[]) => TActivity, listener: InboxListener<TContextData, TActivity>, ): InboxListenerSetters<TContextData> { listeners.add(type, listener as InboxListener<TContextData, Activity>); return setters; }, onError: ( handler: InboxErrorHandler<TContextData>, ): InboxListenerSetters<TContextData> => { this.inboxErrorHandler = handler; return setters; }, setSharedKeyDispatcher: ( dispatcher: SharedInboxKeyDispatcher<TContextData>, ): InboxListenerSetters<TContextData> => { this.sharedInboxKeyDispatcher = dispatcher; return setters; }, }; return setters; } async sendActivity( keys: SenderKeyPair[], recipients: Recipient | Recipient[], activity: Activity, options: SendActivityInternalOptions<TContextData>, span?: Span, ): Promise<void> { const logger = getLogger(["fedify", "federation", "outbox"]); const { preferSharedInbox, immediate, excludeBaseUris, collectionSync, contextData, } = options; if (keys.length < 1) { throw new TypeError("The sender's keys must not be empty."); } for (const { privateKey } of keys) { validateCryptoKey(privateKey, "private"); } if (activity.actorId == null) { logger.error( "Activity {activityId} to send does not have an actor.", { activity, activityId: activity?.id?.href }, ); throw new TypeError( "The activity to send must have at least one actor property.", ); } if (activity.id == null) { const id = new URL(`urn:uuid:${crypto.randomUUID()}`); activity = activity.clone({ id }); span?.setAttribute("activitypub.activity.id", id.href); } const inboxes = extractInboxes({ recipients: Array.isArray(recipients) ? recipients : [recipients], preferSharedInbox, excludeBaseUris, }); logger.debug("Sending activity {activityId} to inboxes:\n{inboxes}", { inboxes: globalThis.Object.keys(inboxes), activityId: activity.id?.href, activity, }); if (activity.id == null) { throw new TypeError("The activity to send must have an id."); } if (activity.actorId == null) { throw new TypeError( "The activity to send must have at least one actor property.", ); } else if (keys.length < 1) { throw new TypeError("The keys must not be empty."); } const activityId = activity.id.href; let proofCreated = false; let rsaKey: { keyId: URL; privateKey: CryptoKey } | null = null; for (const { keyId, privateKey } of keys) { validateCryptoKey(privateKey, "private"); if (rsaKey == null && privateKey.algorithm.name === "RSASSA-PKCS1-v1_5") { rsaKey = { keyId, privateKey }; continue; } if (privateKey.algorithm.name === "Ed25519") { activity = await signObject(activity, privateKey, keyId, { contextLoader: this.contextLoader, tracerProvider: this.tracerProvider, }); proofCreated = true; } } let jsonLd = await activity.toJsonLd({ format: "compact", contextLoader: this.contextLoader, }); if (rsaKey == null) { logger.warn( "No supported key found to create a Linked Data signature for " + "the activity {activityId}. The activity will be sent without " + "a Linked Data signature. In order to create a Linked Data " + "signature, at least one RSASSA-PKCS1-v1_5 key must be provided.", { activityId, keys: keys.map((pair) => ({ keyId: pair.keyId.href, privateKey: pair.privateKey, })), }, ); } else { jsonLd = await signJsonLd(jsonLd, rsaKey.privateKey, rsaKey.keyId, { contextLoader: this.contextLoader, tracerProvider: this.tracerProvider, }); } if (!proofCreated) { logger.warn( "No supported key found to create a proof for the activity {activityId}. " + "The activity will be sent without a proof. " + "In order to create a proof, at least one Ed25519 key must be provided.", { activityId, keys: keys.map((pair) => ({ keyId: pair.keyId.href, privateKey: pair.privateKey, })), }, ); } if (immediate || this.outboxQueue == null) { if (immediate) { logger.debug( "Sending activity immediately without queue since immediate option " + "is set.", { activityId: activity.id!.href, activity: jsonLd }, ); } else { logger.debug( "Sending activity immediately without queue since queue is not set.", { activityId: activity.id!.href, activity: jsonLd }, ); } const promises: Promise<void>[] = []; for (const inbox in inboxes) { promises.push( sendActivity({ keys, activity: jsonLd, activityId: activity.id?.href, activityType: getTypeId(activity).href, inbox: new URL(inbox), sharedInbox: inboxes[inbox].sharedInbox, headers: collectionSync == null ? undefined : new Headers({ "Collection-Synchronization": await buildCollectionSynchronizationHeader( collectionSync, inboxes[inbox].actorIds, ), }), tracerProvider: this.tracerProvider, }), ); } await Promise.all(promises); return; } logger.debug( "Enqueuing activity {activityId} to send later.", { activityId: activity.id!.href, activity: jsonLd }, ); const keyJwkPairs: SenderKeyJwkPair[] = []; for (const { keyId, privateKey } of keys) { const privateKeyJwk = await exportJwk(privateKey); keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk }); } if (!this.manuallyStartQueue) this.#startQueue(contextData); const carrier: Record<string, string> = {}; propagation.inject(context.active(), carrier); const promises: Promise<void>[] = []; for (const inbox in inboxes) { const message: OutboxMessage = { type: "outbox", id: crypto.randomUUID(), keys: keyJwkPairs, activity: jsonLd, activityId: activity.id?.href, activityType: getTypeId(activity).href, inbox, sharedInbox: inboxes[inbox].sharedInbox, started: new Date().toISOString(), attempt: 0, headers: collectionSync == null ? {} : { "Collection-Synchronization": await buildCollectionSynchronizationHeader( collectionSync, inboxes[inbox].actorIds, ), }, traceContext: carrier, }; promises.push(this.outboxQueue.enqueue(message)); } const results = await Promise.allSettled(promises); const errors = results .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { logger.error( "Failed to enqueue activity {activityId} to send later: {errors}", { activityId: activity.id!.href, errors }, ); if (errors.length > 1) { throw new AggregateError( errors, `Failed to enqueue activity ${activityId} to send later.`, ); } throw errors[0]; } } fetch( request: Request, options: FederationFetchOptions<TContextData>, ): Promise<Response> { const requestId = getRequestId(request); return withContext({ requestId }, async () => { const tracer = this.#getTracer(); return await tracer.startActiveSpan( request.method, { kind: SpanKind.SERVER, attributes: { [ATTR_HTTP_REQUEST_METHOD]: request.method, [ATTR_URL_FULL]: request.url, }, }, async (span) => { const logger = getLogger(["fedify", "federation", "http"]); if (span.isRecording()) { for (const [k, v] of request.headers) { span.setAttribute(ATTR_HTTP_REQUEST_HEADER(k), [v]); } } let response: Response; try { response = await this.#fetch(request, { ...options, span, tracer }); } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: `${error}`, }); span.end(); logger.error( "An error occurred while serving request {method} {url}: {error}", { method: request.method, url: request.url, error }, ); throw error; } if (span.isRecording()) { span.setAttribute(ATTR_HTTP_RESPONSE_STATUS_CODE, response.status); for (const [k, v] of response.headers) { span.setAttribute(ATTR_HTTP_RESPONSE_HEADER(k), [v]); } span.setStatus({ code: response.status >= 500 ? SpanStatusCode.ERROR : SpanStatusCode.UNSET, message: response.statusText, }); } span.end(); const url = new URL(request.url); const logTpl = "{method} {path}: {status}"; const values = { method: request.method, path: `${url.pathname}${url.search}`, url: request.url, status: response.status, }; if (response.status >= 500) logger.error(logTpl, values); else if (response.status >= 400) logger.warn(logTpl, values); else logger.info(logTpl, values); return response; }, ); }); } async #fetch( request: Request, { onNotFound, onNotAcceptable, onUnauthorized, contextData, span, tracer, }: FederationFetchOptions<TContextData> & { span: Span; tracer: Tracer }, ): Promise<Response> { onNotFound ??= notFound; onNotAcceptable ??= notAcceptable; onUnauthorized ??= unauthorized; const url = new URL(request.url); const route = this.router.route(url.pathname); if (route == null) return await onNotFound(request); span.updateName(`${request.method} ${route.template}`); let context = this.#createContext(request, contextData); const routeName = route.name.replace(/:.*$/, ""); switch (routeName) { case "webfinger": return await handleWebFinger(request, { context, actorDispatcher: this.actorCallbacks?.dispatcher, actorHandleMapper: this.actorCallbacks?.handleMapper, onNotFound, tracer, }); case "nodeInfoJrd": return await handleNodeInfoJrd(request, context); case "nodeInfo": return await handleNodeInfo(request, { context, nodeInfoDispatcher: this.nodeInfoDispatcher!, }); case "actor": context = this.#createContext(request, contextData, { invokedFromActorDispatcher: { identifier: route.values.identifier ?? route.values.handle, }, }); return await handleActor(request, { identifier: route.values.identifier ?? route.values.handle, context, actorDispatcher: this.actorCallbacks?.dispatcher, authorizePredicate: this.actorCallbacks?.authorizePredicate, onUnauthorized, onNotFound, onNotAcceptable, }); case "object": { const typeId = route.name.replace(/^object:/, ""); const callbacks = this.objectCallbacks[typeId]; const cls = this.objectTypeIds[typeId]; context = this.#createContext(request, contextData, { invokedFromObjectDispatcher: { cls, values: route.values }, }); return await handleObject(request, { values: route.values, context, objectDispatcher: callbacks?.dispatcher, authorizePredicate: callbacks?.authorizePredicate, onUnauthorized, onNotFound, onNotAcceptable, }); } case "outbox": return await handleCollection(request, { name: "outbox", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getOutboxUri.bind(context), context, collectionCallbacks: this.outboxCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); case "inbox": if (request.method !== "POST") { return await handleCollection(request, { name: "inbox", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getInboxUri.bind(context), context, collectionCallbacks: this.inboxCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); } context = this.#createContext(request, contextData, { documentLoader: await context.getDocumentLoader({ identifier: route.values.identifier ?? route.values.handle, }), }); // falls through case "sharedInbox": if (routeName !== "inbox" && this.sharedInboxKeyDispatcher != null) { const identity = await this.sharedInboxKeyDispatcher(context); if (identity != null) { context = this.#createContext(request, contextData, { documentLoader: "identifier" in identity || "username" in identity || "handle" in identity ? await context.getDocumentLoader(identity) : context.getDocumentLoader(identity), }); } } if (!this.manuallyStartQueue) this.#startQueue(contextData); return await handleInbox(request, { recipient: route.values.identifier ?? route.values.handle ?? null, context, inboxContextFactory: context.toInboxContext.bind(context), kv: this.kv, kvPrefixes: this.kvPrefixes, queue: this.inboxQueue, actorDispatcher: this.actorCallbacks?.dispatcher, inboxListeners: this.inboxListeners, inboxErrorHandler: this.inboxErrorHandler, onNotFound, signatureTimeWindow: this.signatureTimeWindow, skipSignatureVerification: this.skipSignatureVerification, tracerProvider: this.tracerProvider, }); case "following": return await handleCollection(request, { name: "following", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getFollowingUri.bind(context), context, collectionCallbacks: this.followingCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); case "followers": { let baseUrl = url.searchParams.get("base-url"); if (baseUrl != null) { const u = new URL(baseUrl); baseUrl = `${u.origin}/`; } return await handleCollection(request, { name: "followers", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getFollowersUri.bind(context), context, filter: baseUrl != null ? new URL(baseUrl) : undefined, filterPredicate: baseUrl != null ? ((i) => (i instanceof URL ? i.href : i.id?.href ?? "").startsWith( baseUrl!, )) : undefined, collectionCallbacks: this.followersCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); } case "liked": return await handleCollection(request, { name: "liked", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getLikedUri.bind(context), context, collectionCallbacks: this.likedCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); case "featured": return await handleCollection(request, { name: "featured", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getFeaturedUri.bind(context), context, collectionCallbacks: this.featuredCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); case "featuredTags": return await handleCollection(request, { name: "featured tags", identifier: route.values.identifier ?? route.values.handle, uriGetter: context.getFeaturedTagsUri.bind(context), context, collectionCallbacks: this.featuredTagsCallbacks, tracerProvider: this.tracerProvider, onUnauthorized, onNotFound, onNotAcceptable, }); default: { const response = onNotFound(request); return response instanceof Promise ? await response : response; } } } } interface ContextOptions<TContextData> { url: URL; federation: FederationImpl<TContextData>; data: TContextData; documentLoader: DocumentLoader; invokedFromActorKeyPairsDispatcher?: { identifier: string }; } export class ContextImpl<TContextData> implements Context<TContextData> { readonly url: URL; readonly federation: FederationImpl<TContextData>; readonly data: TContextData; readonly documentLoader: DocumentLoader; readonly invokedFromActorKeyPairsDispatcher?: { identifier: string }; constructor( { url, federation, data, documentLoader, invokedFromActorKeyPairsDispatcher, }: ContextOptions<TContextData>, ) { this.url = url; this.federation = federation; this.data = data; this.documentLoader = documentLoader; this.invokedFromActorKeyPairsDispatcher = invokedFromActorKeyPairsDispatcher; } toInboxContext( recipient: string | null, activity: unknown, activityId: string | undefined, activityType: string, ): InboxContextImpl<TContextData> { return new InboxContextImpl(recipient, activity, activityId, activityType, { url: this.url, federation: this.federation, data: this.data, documentLoader: this.documentLoader, invokedFromActorKeyPairsDispatcher: this.invokedFromActorKeyPairsDispatcher, }); } get hostname(): string { return this.url.hostname; } get host(): string { return this.url.host; } get origin(): string { return this.url.origin; } get contextLoader(): DocumentLoader { return this.federation.contextLoader; } get tracerProvider(): TracerProvider { return this.federation.tracerProvider; } getNodeInfoUri(): URL { const path = this.federation.router.build("nodeInfo", {}); if (path == null) { throw new RouterError("No NodeInfo dispatcher registered."); } return new URL(path, this.url); } getActorUri(identifier: string): URL { const path = this.federation.router.build( "actor", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No actor dispatcher registered."); } return new URL(path, this.url); } getObjectUri<TObject extends Object>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, values: Record<string, string>, ): URL { const callbacks = this.federation.objectCallbacks[cls.typeId.href]; if (callbacks == null) { throw new RouterError("No object dispatcher registered."); } for (const param of callbacks.parameters) { if (!(param in values)) { throw new TypeError(`Missing parameter: ${param}`); } } const path = this.federation.router.build( `object:${cls.typeId.href}`, values, ); if (path == null) { throw new RouterError("No object dispatcher registered."); } return new URL(path, this.url); } getOutboxUri(identifier: string): URL { const path = this.federation.router.build( "outbox", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No outbox dispatcher registered."); } return new URL(path, this.url); } getInboxUri(): URL; getInboxUri(identifier: string): URL; getInboxUri(identifier?: string): URL { if (identifier == null) { const path = this.federation.router.build("sharedInbox", {}); if (path == null) { throw new RouterError("No shared inbox path registered."); } return new URL(path, this.url); } const path = this.federation.router.build( "inbox", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No inbox path registered."); } return new URL(path, this.url); } getFollowingUri(identifier: string): URL { const path = this.federation.router.build( "following", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No following collection path registered."); } return new URL(path, this.url); } getFollowersUri(identifier: string): URL { const path = this.federation.router.build( "followers", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No followers collection path registered."); } return new URL(path, this.url); } getLikedUri(identifier: string): URL { const path = this.federation.router.build( "liked", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No liked collection path registered."); } return new URL(path, this.url); } getFeaturedUri(identifier: string): URL { const path = this.federation.router.build( "featured", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No featured collection path registered."); } return new URL(path, this.url); } getFeaturedTagsUri(identifier: string): URL { const path = this.federation.router.build( "featuredTags", { identifier, handle: identifier }, ); if (path == null) { throw new RouterError("No featured tags collection path registered."); } return new URL(path, this.url); } parseUri(uri: URL | null): ParseUriResult | null { if (uri == null) return null; if (uri.origin !== this.url.origin) return null; const route = this.federation.router.route(uri.pathname); const logger = getLogger(["fedify", "federation"]); if (route == null) return null; else if (route.name === "sharedInbox") { return { type: "inbox", identifier: undefined, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return undefined; }, }; } const identifier = "identifier" in route.values ? route.values.identifier : route.values.handle; if (route.name === "actor") { return { type: "actor", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name.startsWith("object:")) { const typeId = route.name.replace(/^object:/, ""); return { type: "object", class: this.federation.objectTypeIds[typeId], typeId: new URL(typeId), values: route.values, }; } else if (route.name === "inbox") { return { type: "inbox", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "outbox") { return { type: "outbox", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "following") { return { type: "following", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "followers") { return { type: "followers", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "liked") { return { type: "liked", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "featured") { return { type: "featured", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } else if (route.name === "featuredTags") { return { type: "featuredTags", identifier, get handle() { logger.warn( "The ParseUriResult.handle property is deprecated; " + "use ParseUriResult.identifier instead.", ); return identifier; }, }; } return null; } async getActorKeyPairs(identifier: string): Promise<ActorKeyPair[]> { const logger = getLogger(["fedify", "federation", "actor"]); if (this.invokedFromActorKeyPairsDispatcher != null) { logger.warn( "Context.getActorKeyPairs({getActorKeyPairsIdentifier}) method is " + "invoked from the actor key pairs dispatcher " + "({actorKeyPairsDispatcherIdentifier}); this may cause " + "an infinite loop.", { getActorKeyPairsIdentifier: identifier, actorKeyPairsDispatcherIdentifier: this.invokedFromActorKeyPairsDispatcher.identifier, }, ); } let keyPairs: (CryptoKeyPair & { keyId: URL })[]; try { keyPairs = await this.getKeyPairsFromIdentifier(identifier); } catch (_) { logger.warn("No actor key pairs dispatcher registered."); return []; } const owner = this.getActorUri(identifier); const result = []; for (const keyPair of keyPairs) { const newPair: ActorKeyPair = { ...keyPair, cryptographicKey: new CryptographicKey({ id: keyPair.keyId, owner, publicKey: keyPair.publicKey, }), multikey: new Multikey({ id: keyPair.keyId, controller: owner, publicKey: keyPair.publicKey, }), }; result.push(newPair); } return result; } protected async getKeyPairsFromIdentifier( identifier: string, ): Promise<(CryptoKeyPair & { keyId: URL })[]> { const logger = getLogger(["fedify", "federation", "actor"]); if (this.federation.actorCallbacks?.keyPairsDispatcher == null) { throw new Error("No actor key pairs dispatcher registered."); } const path = this.federation.router.build( "actor", { identifier, handle: identifier }, ); if (path == null) { logger.warn("No actor dispatcher registered."); return []; } const actorUri = new URL(path, this.url); const keyPairs = await this.federation.actorCallbacks?.keyPairsDispatcher( new ContextImpl({ ...this, invokedFromActorKeyPairsDispatcher: { identifier }, }), identifier, ); if (keyPairs.length < 1) { logger.warn("No key pairs found for actor {identifier}.", { identifier }); } let i = 0; const result = []; for (const keyPair of keyPairs) { result.push({ ...keyPair, keyId: new URL( // For backwards compatibility, the first key is always the #main-key: i == 0 ? `#main-key` : `#key-${i + 1}`, actorUri, ), }); i++; } return result; } protected async getRsaKeyPairFromIdentifier( identifier: string, ): Promise<CryptoKeyPair & { keyId: URL } | null> { const keyPairs = await this.getKeyPairsFromIdentifier(identifier); for (const keyPair of keyPairs) { const { privateKey } = keyPair; if ( privateKey.algorithm.name === "RSASSA-PKCS1-v1_5" && (privateKey.algorithm as unknown as { hash: { name: string } }).hash .name === "SHA-256" ) { return keyPair; } } getLogger(["fedify", "federation", "actor"]).warn( "No RSA-PKCS#1-v1.5 SHA-256 key found for actor {identifier}.", { identifier }, ); return null; } getDocumentLoader( identity: | { identifier: string } | { username: string } | { handle: string }, ): Promise<DocumentLoader>; getDocumentLoader(identity: SenderKeyPair): DocumentLoader; getDocumentLoader( identity: | SenderKeyPair | { identifier: string } | { username: string } | { handle: string }, ): DocumentLoader | Promise<DocumentLoader> { if ( "identifier" in identity || "username" in identity || "handle" in identity ) { let identifierPromise: Promise<string | null>; if ("username" in identity || "handle" in identity) { let username: string; if ("username" in identity) { username = identity.username; } else { username = identity.handle; getLogger(["fedify", "runtime", "docloader"]).warn( 'The "handle" property is deprecated; use "identifier" or ' + '"username" instead.', { identity }, ); } const mapper = this.federation.actorCallbacks?.handleMapper; if (mapper == null) { identifierPromise = Promise.resolve(username); } else { const identifier = mapper(this, username); identifierPromise = identifier instanceof Promise ? identifier : Promise.resolve(identifier); } } else { identifierPromise = Promise.resolve(identity.identifier); } return identifierPromise.then((identifier) => { if (identifier == null) return this.documentLoader; const keyPair = this.getRsaKeyPairFromIdentifier(identifier); return keyPair.then((pair) => pair == null ? this.documentLoader : this.federation.authenticatedDocumentLoaderFactory(pair) ); }); } return this.federation.authenticatedDocumentLoaderFactory(identity); } lookupObject( identifier: string | URL, options: LookupObjectOptions = {}, ): Promise<Object | null> { return lookupObject(identifier, { documentLoader: options.documentLoader ?? this.documentLoader, contextLoader: options.contextLoader ?? this.contextLoader, userAgent: options.userAgent ?? this.federation.userAgent, tracerProvider: options.tracerProvider ?? this.tracerProvider, }); } traverseCollection( collection: Collection, options: TraverseCollectionOptions = {}, ): AsyncIterable<Object | Link> { return traverseCollection(collection, { documentLoader: options.documentLoader ?? this.documentLoader, contextLoader: options.contextLoader ?? this.contextLoader, }); } sendActivity( sender: | SenderKeyPair | SenderKeyPair[] | { identifier: string } | { username: string } | { handle: string }, recipients: Recipient | Recipient[] | "followers", activity: Activity, options: SendActivityOptions = {}, ): Promise<void> { const tracer = this.tracerProvider.getTracer( metadata.name, metadata.version, ); return tracer.startActiveSpan( "activitypub.outbox", { kind: this.federation.outboxQueue == null || options.immediate ? SpanKind.CLIENT : SpanKind.PRODUCER, attributes: { "activitypub.activity.type": getTypeId(activity).href, "activitypub.activity.to": activity.toIds.map((to) => to.href), "activitypub.activity.cc": activity.toIds.map((cc) => cc.href), "activitypub.activity.bto": activity.btoIds.map((bto) => bto.href), "activitypub.activity.bcc": activity.toIds.map((bcc) => bcc.href), }, }, async (span) => { try { if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } await this.sendActivityInternal( sender, recipients, activity, options, span, ); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } protected async sendActivityInternal( sender: | SenderKeyPair | SenderKeyPair[] | { identifier: string } | { username: string } | { handle: string }, recipients: Recipient | Recipient[] | "followers", activity: Activity, options: SendActivityOptions = {}, span: Span, ): Promise<void> { let keys: SenderKeyPair[]; let identifier: string | null = null; if ("identifier" in sender || "username" in sender || "handle" in sender) { if ("identifier" in sender) { identifier = sender.identifier; } else { let username: string; if ("username" in sender) { username = sender.username; } else { username = sender.handle; getLogger(["fedify", "federation", "outbox"]).warn( 'The "handle" property for the sender parameter is deprecated; ' + 'use "identifier" or "username" instead.', { sender }, ); } if (this.federation.actorCallbacks?.handleMapper == null) { identifier = username; } else { const mapped = await this.federation.actorCallbacks.handleMapper( this, username, ); if (mapped == null) { throw new Error( `No actor found for the given username ${ JSON.stringify(username) }.`, ); } identifier = mapped; } } span.setAttribute("fedify.actor.identifier", identifier); keys = await this.getKeyPairsFromIdentifier(identifier); if (keys.length < 1) { throw new Error( `No key pair found for actor ${JSON.stringify(identifier)}.`, ); } } else if (Array.isArray(sender)) { if (sender.length < 1) { throw new Error("The sender's key pairs are empty."); } keys = sender; } else { keys = [sender]; } const opts: SendActivityInternalOptions<TContextData> = { contextData: this.data, ...options, }; let expandedRecipients: Recipient[]; if (Array.isArray(recipients)) { expandedRecipients = recipients; } else if (recipients === "followers") { if (identifier == null) { throw new Error( 'If recipients is "followers", ' + "sender must be an actor identifier or username.", ); } expandedRecipients = []; for await ( const recipient of this.getFollowers(identifier) ) { expandedRecipients.push(recipient); } const collectionId = this.federation.router.build( "followers", { identifier, handle: identifier }, ); opts.collectionSync = collectionId == null ? undefined : new URL(collectionId, this.url).href; } else { expandedRecipients = [recipients]; } span.setAttribute("activitypub.inboxes", expandedRecipients.length); return await this.federation.sendActivity( keys, expandedRecipients, activity, opts, span, ); } async *getFollowers(identifier: string): AsyncIterable<Recipient> { if (this.federation.followersCallbacks == null) { throw new Error("No followers collection dispatcher registered."); } const result = await this.federation.followersCallbacks.dispatcher( this, identifier, null, ); if (result != null) { for (const recipient of result.items) yield recipient; return; } if (this.federation.followersCallbacks.firstCursor == null) { throw new Error( "No first cursor dispatcher registered for followers collection.", ); } let cursor = await this.federation.followersCallbacks.firstCursor( this, identifier, ); while (cursor != null) { const result = await this.federation.followersCallbacks.dispatcher( this, identifier, cursor, ); if (result == null) break; for (const recipient of result.items) yield recipient; cursor = result.nextCursor ?? null; } } routeActivity( recipient: string | null, activity: Activity, options: RouteActivityOptions = {}, ): Promise<boolean> { const tracerProvider = this.tracerProvider ?? this.tracerProvider; const tracer = tracerProvider.getTracer(metadata.name, metadata.version); return tracer.startActiveSpan( "activitypub.inbox", { kind: this.federation.inboxQueue == null || options.immediate ? SpanKind.INTERNAL : SpanKind.PRODUCER, attributes: { "activitypub.activity.type": getTypeId(activity).href, }, }, async (span) => { if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } if (activity.toIds.length > 0) { span.setAttribute( "activitypub.activity.to", activity.toIds.map((to) => to.href), ); } if (activity.ccIds.length > 0) { span.setAttribute( "activitypub.activity.cc", activity.ccIds.map((cc) => cc.href), ); } if (activity.btoIds.length > 0) { span.setAttribute( "activitypub.activity.bto", activity.btoIds.map((bto) => bto.href), ); } if (activity.bccIds.length > 0) { span.setAttribute( "activitypub.activity.bcc", activity.bccIds.map((bcc) => bcc.href), ); } try { const ok = await this.routeActivityInternal( recipient, activity, options, span, ); if (ok) { span.setAttribute("activitypub.shared_inbox", recipient == null); if (recipient != null) { span.setAttribute("fedify.inbox.recipient", recipient); } } else { span.setStatus({ code: SpanStatusCode.ERROR }); } return ok; } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } protected async routeActivityInternal( recipient: string | null, activity: Activity, options: RouteActivityOptions = {}, span: Span, ): Promise<boolean> { const logger = getLogger(["fedify", "federation", "inbox"]); const contextLoader = options.contextLoader ?? this.contextLoader; const json = await activity.toJsonLd({ contextLoader }); const keyCache = new KvKeyCache( this.federation.kv, this.federation.kvPrefixes.publicKey, this, ); const verified = await verifyObject( Activity, json, { contextLoader, documentLoader: options.documentLoader ?? this.documentLoader, tracerProvider: options.tracerProvider ?? this.tracerProvider, keyCache, }, ); if (verified == null) { logger.debug( "Object Integrity Proofs are not verified.", { recipient, activity: json }, ); if (activity.id == null) { logger.debug( "Activity is missing an ID; unable to fetch.", { recipient, activity: json }, ); return false; } const fetched = await this.lookupObject(activity.id, options); if (fetched == null) { logger.debug( "Failed to fetch the remote activity object {activityId}.", { recipient, activity: json, activityId: activity.id.href }, ); return false; } else if (!(fetched instanceof Activity)) { logger.debug( "Fetched object is not an Activity.", { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, ); return false; } else if (fetched.id?.href !== activity.id.href) { logger.debug( "Fetched activity object has a different ID; failed to verify.", { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, ); return false; } else if (fetched.actorIds.length < 1) { logger.debug( "Fetched activity object is missing an actor; unable to verify.", { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, ); return false; } const activityId = fetched.id; if ( !fetched.actorIds.every((actor) => actor.origin === activityId.origin) ) { logger.debug( "Fetched activity object has actors from different origins; " + "unable to verify.", { recipient, activity: await fetched.toJsonLd({ contextLoader }) }, ); return false; } logger.debug( "Successfully fetched the remote activity object {activityId}; " + "ignore the original activity and use the fetched one, which is trustworthy.", ); activity = fetched; } else { logger.debug( "Object Integrity Proofs are verified.", { recipient, activity: json }, ); } const routeResult = await routeActivity({ context: this, json, activity, recipient, inboxListeners: this.federation.inboxListeners, inboxContextFactory: this.toInboxContext.bind(this), inboxErrorHandler: this.federation.inboxErrorHandler, kv: this.federation.kv, kvPrefixes: this.federation.kvPrefixes, queue: this.federation.inboxQueue, span, tracerProvider: options.tracerProvider ?? this.tracerProvider, }); return routeResult === "alreadyProcessed" || routeResult === "enqueued" || routeResult === "unsupportedActivity" || routeResult === "success"; } } interface RequestContextOptions<TContextData> extends ContextOptions<TContextData> { request: Request; invokedFromActorDispatcher?: { identifier: string }; invokedFromObjectDispatcher?: { // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => Object) & { typeId: URL }; values: Record<string, string>; }; } class RequestContextImpl<TContextData> extends ContextImpl<TContextData> implements RequestContext<TContextData> { readonly #invokedFromActorDispatcher?: { identifier: string }; readonly #invokedFromObjectDispatcher?: { // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => Object) & { typeId: URL }; values: Record<string, string>; }; readonly request: Request; override readonly url: URL; constructor(options: RequestContextOptions<TContextData>) { super(options); this.#invokedFromActorDispatcher = options.invokedFromActorDispatcher; this.#invokedFromObjectDispatcher = options.invokedFromObjectDispatcher; this.request = options.request; this.url = options.url; } async getActor(identifier: string): Promise<Actor | null> { if ( this.federation.actorCallbacks == null || this.federation.actorCallbacks.dispatcher == null ) { throw new Error("No actor dispatcher registered."); } if (this.#invokedFromActorDispatcher != null) { getLogger(["fedify", "federation", "actor"]).warn( "RequestContext.getActor({getActorIdentifier}) is invoked from " + "the actor dispatcher ({actorDispatcherIdentifier}); " + "this may cause an infinite loop.", { getActorIdentifier: identifier, actorDispatcherIdentifier: this.#invokedFromActorDispatcher.identifier, }, ); } return await this.federation.actorCallbacks.dispatcher( new RequestContextImpl({ ...this, invokedFromActorDispatcher: { identifier }, }), identifier, ); } async getObject<TObject extends Object>( // deno-lint-ignore no-explicit-any cls: (new (...args: any[]) => TObject) & { typeId: URL }, values: Record<string, string>, ): Promise<TObject | null> { const callbacks = this.federation.objectCallbacks[cls.typeId.href]; if (callbacks == null) { throw new Error("No object dispatcher registered."); } for (const param of callbacks.parameters) { if (!(param in values)) { throw new TypeError(`Missing parameter: ${param}`); } } if (this.#invokedFromObjectDispatcher != null) { getLogger(["fedify", "federation"]).warn( "RequestContext.getObject({getObjectClass}, " + "{getObjectValues}) is invoked from the object dispatcher " + "({actorDispatcherClass}, {actorDispatcherValues}); " + "this may cause an infinite loop.", { getObjectClass: cls.name, getObjectValues: values, actorDispatcherClass: this.#invokedFromObjectDispatcher.cls.name, actorDispatcherValues: this.#invokedFromObjectDispatcher.values, }, ); } return await callbacks.dispatcher( new RequestContextImpl({ ...this, invokedFromObjectDispatcher: { cls, values }, }), values, // deno-lint-ignore no-explicit-any ) as any; } #signedKey: CryptographicKey | null | undefined = undefined; async getSignedKey(): Promise<CryptographicKey | null> { if (this.#signedKey !== undefined) return this.#signedKey; return this.#signedKey = await verifyRequest(this.request, { ...this, timeWindow: this.federation.signatureTimeWindow, tracerProvider: this.tracerProvider, }); } #signedKeyOwner: Actor | null | undefined = undefined; async getSignedKeyOwner(): Promise<Actor | null> { if (this.#signedKeyOwner !== undefined) return this.#signedKeyOwner; const key = await this.getSignedKey(); if (key == null) return this.#signedKeyOwner = null; return this.#signedKeyOwner = await getKeyOwner(key, this); } } export class InboxContextImpl<TContextData> extends ContextImpl<TContextData> implements InboxContext<TContextData> { readonly recipient: string | null; readonly activity: unknown; readonly activityId?: string; readonly activityType: string; constructor( recipient: string | null, activity: unknown, activityId: string | undefined, activityType: string, options: ContextOptions<TContextData>, ) { super(options); this.recipient = recipient; this.activity = activity; this.activityId = activityId; this.activityType = activityType; } forwardActivity( forwarder: | SenderKeyPair | SenderKeyPair[] | { identifier: string } | { username: string } | { handle: string }, recipients: Recipient | Recipient[], options?: ForwardActivityOptions, ): Promise<void>; forwardActivity( forwarder: | { identifier: string } | { username: string } | { handle: string }, recipients: "followers", options?: ForwardActivityOptions, ): Promise<void>; forwardActivity( forwarder: | SenderKeyPair | SenderKeyPair[] | { identifier: string } | { username: string } | { handle: string }, recipients: Recipient | Recipient[] | "followers", options?: ForwardActivityOptions, ): Promise<void> { const tracer = this.tracerProvider.getTracer( metadata.name, metadata.version, ); return tracer.startActiveSpan( "activitypub.outbox", { kind: this.federation.outboxQueue == null || options?.immediate ? SpanKind.CLIENT : SpanKind.PRODUCER, attributes: { "activitypub.activity.type": this.activityType }, }, async (span) => { try { if (this.activityId != null) { span.setAttribute("activitypub.activity.id", this.activityId); } await this.forwardActivityInternal(forwarder, recipients, options); } catch (e) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) }); throw e; } finally { span.end(); } }, ); } private async forwardActivityInternal( forwarder: | SenderKeyPair | SenderKeyPair[] | { identifier: string } | { username: string } | { handle: string }, recipients: Recipient | Recipient[] | "followers", options?: ForwardActivityOptions, ): Promise<void> { const logger = getLogger(["fedify", "federation", "inbox"]); let keys: SenderKeyPair[]; let identifier: string | null = null; if ( "identifier" in forwarder || "username" in forwarder || "handle" in forwarder ) { if ("identifier" in forwarder) { identifier = forwarder.identifier; } else { let username: string; if ("username" in forwarder) { username = forwarder.username; } else { username = forwarder.handle; logger.warn( 'The "handle" property for the forwarder parameter is deprecated; ' + 'use "identifier" or "username" instead.', { forwarder }, ); } if (this.federation.actorCallbacks?.handleMapper == null) { identifier = username; } else { const mapped = await this.federation.actorCallbacks.handleMapper( this, username, ); if (mapped == null) { throw new Error( `No actor found for the given username ${ JSON.stringify(username) }.`, ); } identifier = mapped; } } keys = await this.getKeyPairsFromIdentifier(identifier); if (keys.length < 1) { throw new Error( `No key pair found for actor ${JSON.stringify(identifier)}.`, ); } } else if (Array.isArray(forwarder)) { if (forwarder.length < 1) { throw new Error("The forwarder's key pairs are empty."); } keys = forwarder; } else { keys = [forwarder]; } if (!hasSignature(this.activity)) { let hasProof: boolean; try { const activity = await Activity.fromJsonLd(this.activity, this); hasProof = await activity.getProof() != null; } catch { hasProof = false; } if (!hasProof) { if (options?.skipIfUnsigned) return; logger.warn( "The received activity {activityId} is not signed; even if it is " + "forwarded to other servers as is, it may not be accepted by " + "them due to the lack of a signature/proof.", ); } } if (recipients === "followers") { if (identifier == null) { throw new Error( 'If recipients is "followers", ' + "forwarder must be an actor identifier or username.", ); } const followers: Recipient[] = []; for await (const recipient of this.getFollowers(identifier)) { followers.push(recipient); } recipients = followers; } const inboxes = extractInboxes({ recipients: Array.isArray(recipients) ? recipients : [recipients], preferSharedInbox: options?.preferSharedInbox, excludeBaseUris: options?.excludeBaseUris, }); logger.debug("Forwarding activity {activityId} to inboxes:\n{inboxes}", { inboxes: globalThis.Object.keys(inboxes), activityId: this.activityId, activity: this.activity, }); if (options?.immediate || this.federation.outboxQueue == null) { if (options?.immediate) { logger.debug( "Forwarding activity immediately without queue since immediate " + "option is set.", ); } else { logger.debug( "Forwarding activity immediately without queue since queue is not " + "set.", ); } const promises: Promise<void>[] = []; for (const inbox in inboxes) { promises.push( sendActivity({ keys, activity: this.activity, activityId: this.activityId, activityType: this.activityType, inbox: new URL(inbox), sharedInbox: inboxes[inbox].sharedInbox, tracerProvider: this.tracerProvider, }), ); } await Promise.all(promises); return; } logger.debug( "Enqueuing activity {activityId} to forward later.", { activityId: this.activityId, activity: this.activity }, ); const keyJwkPairs: SenderKeyJwkPair[] = []; for (const { keyId, privateKey } of keys) { const privateKeyJwk = await exportJwk(privateKey); keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk }); } const carrier: Record<string, string> = {}; propagation.inject(context.active(), carrier); const promises: Promise<void>[] = []; for (const inbox in inboxes) { const message: OutboxMessage = { type: "outbox", id: crypto.randomUUID(), keys: keyJwkPairs, activity: this.activity, activityId: this.activityId, activityType: this.activityType, inbox, sharedInbox: inboxes[inbox].sharedInbox, started: new Date().toISOString(), attempt: 0, headers: {}, traceContext: carrier, }; promises.push(this.federation.outboxQueue.enqueue(message)); } const results = await Promise.allSettled(promises); const errors: unknown[] = results .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { logger.error( "Failed to enqueue activity {activityId} to forward later:\n{errors}", { activityId: this.activityId, errors }, ); if (errors.length > 1) { throw new AggregateError( errors, `Failed to enqueue activity ${this.activityId} to forward later.`, ); } throw errors[0]; } } } interface ActorCallbacks<TContextData> { dispatcher?: ActorDispatcher<TContextData>; keyPairsDispatcher?: ActorKeyPairsDispatcher<TContextData>; handleMapper?: ActorHandleMapper<TContextData>; authorizePredicate?: AuthorizePredicate<TContextData>; } interface ObjectCallbacks<TContextData, TParam extends string> { dispatcher: ObjectDispatcher<TContextData, Object, string>; parameters: Set<TParam>; authorizePredicate?: ObjectAuthorizePredicate<TContextData, TParam>; } interface SendActivityInternalOptions<TContextData> extends SendActivityOptions { collectionSync?: string; contextData: TContextData; } function notFound(_request: Request): Response { return new Response("Not Found", { status: 404 }); } function notAcceptable(_request: Request): Response { return new Response("Not Acceptable", { status: 406, headers: { Vary: "Accept, Signature", }, }); } function unauthorized(_request: Request): Response { return new Response("Unauthorized", { status: 401, headers: { Vary: "Accept, Signature", }, }); } /** * Generates or extracts a unique identifier for a request. * * This function first attempts to extract an existing request ID from standard * tracing headers. If none exists, it generates a new one. The ID format is: * * - If from headers, uses the existing ID. * - If generated, uses format `req_` followed by a base36 timestamp and * 6 random chars. * * @param request The incoming HTTP request. * @returns A string identifier unique to this request. */ function getRequestId(request: Request): string { // First try to get existing trace ID from standard headers: const traceId = request.headers.get("X-Request-Id") || request.headers.get("X-Correlation-Id") || request.headers.get("Traceparent")?.split("-")[1]; if (traceId != null) return traceId; // Generate new ID if none exists: // - Use timestamp for rough chronological ordering // - Add random suffix for uniqueness within same millisecond // - Prefix to distinguish from potential existing IDs const timestamp = Date.now().toString(36); const random = Math.random().toString(36).slice(2, 8); return `req_${timestamp}${random}`; }