serverless, aws, ai, mongodb

Lassen Sie einen KI-Agenten meine verwandten Beiträge auswählen

2026-06-11
This post cover image
aws serverless lambda ai bedrock mongodb

Diese Datei wurde automatisch von KI ubersetzt, es konnen Fehler auftreten

Ich erweiteere meine Blog-Pipeline ständig mit kleinen Automatisierungen. Ich korrekturlese meine Beiträge mit Amazon Nova, und ich übersetze sie automatisch, und jeder Schritt läuft serverlos und ereignisgesteuert. Diesmal habe ich mich mit etwas beschäftigt, über das ich schon lange nachdenke.

Ich schreibe einen Beitrag und jemand findet ihn, liest ihn bis zum Ende und geht dann. Dieser letzte Teil ist es, über den ich nachgedacht habe. Ein Leser, der gerade einen Beitrag über Lambda beendet hat, ist wahrscheinlich daran interessiert, meine anderen Serverless-Beiträge zu lesen. Ich habe bis jetzt nichts mit diesem Moment gemacht, kein "Lesen Sie als Nächstes". Nichts. Die Lektüre endete genau an dem Punkt, an dem sie hätte weitergehen sollen.

Also habe ich eine Pipeline gebaut, die genau das repariert, einen KI-Agenten, der jeden neuen Beitrag liest, alle meine Blogbeiträge nach Bedeutung und nicht nach Stichwörtern durchsucht, dann drei verwandte Beiträge auswählt und mir für jede Wahl einen ein-Satz-Gründ aufschreibt. Die Einrichtung ist alles serverlos und alles ereignisgesteuert, was haben Sie erwartet? Ich habe Lambda Durable Functions, Strands Agents, Bedrock AgentCore Gateway und MongoDB Atlas Vector Search in einer Pipeline kombiniert.

Der gesamte Code für diesen Beitrag findet sich wie gewohnt auf der Serverless-Handbuch-Website

Warum nicht einfach Tags verwenden?

Mein Blog ist statisches HTML und ein einfacher Weg wäre es, Tags zu verwenden, wie so viele andere statische Seiten. Ich habe einen Test gemacht und es war einfach nicht gut genug. Ich verwende Tags wie serverless, die ziemlich breit gefächert sind, aber ich habe auch einige engere Tags wie lambda. Aber nur weil ein Beitrag mit serverless oder lambda getaggt ist, heißt das nicht, dass er möglicherweise nichts mit anderen Beiträgen gemeinsam hat. Tags beantworten "Mit welchen Wörtern habe ich dies beschriftet?", nicht "Worum geht es in diesem Beitrag eigentlich?"

Der nächste Schritt wäre die Verwendung von Inhaltsähnlichkeit. Hier kann ich Vektor-Embeddings für jeden Beitrag erstellen und die drei nächsten Nachbarn zurückgeben, erledigt! Nun ja, das ist das Rückgrat der Pipeline, aber eine rohe Top-drei-Ähnlichkeitsabfrage liefert oft nahezu duplizierte Beiträge, die genau dasselbe Thema abdecken. Ein reines Nächste-Nachbarn-Matching kann mir sagen, dass zwei Beiträge ähnlich sind, aber es sagt mir nicht, ob ein Leser, der einen beendet hat, etwas aus dem anderen mitnehmen würde.

Also habe ich das Design erweitert, und der Kern von allem ist, dass Vektorsuche Daten abruft und der KI-Agent entscheidet, was am besten passt. Ich beginne damit, die Anzahl der Beiträge mithilfe von Vektorsuche einzugrenzen, dann liest ein KI-Agent, der die Rolle meines Redakteurs spielt, die Beiträge und wählt die drei aus, von denen er denkt, dass der Leser sie am meisten genießen würde. Er erstellt eine ein-Satz-Begründung, warum er diesen Beitrag ausgewählt hat. Auf diese Weise kann ich den KI-Agenten im Laufe der Zeit überprüfen und verfeinern.

Architekturüberblick

Wenn ein Beitrag veröffentlicht wird, postet die neue Blog-Pipeline ein PostPublished-Ereignis an einen EventBridge-Bus.

{
  "Source": "....",
  "DetailType": "PostPublished",
  "Detail": {
    "slug": "lambda-durable-functions-101",
    "language": "en",
    "branch": "main",
    "commit_sha": "f8a2c1d3..."
  }
}

Dies wird nun eine Lambda Durable Function aufrufen, die den gesamten Ablauf koordiniert. Da die Koordination größtenteils aus Codeschritten besteht, habe ich mich entschieden, Durable Functions anstelle von Step Functions auszuprobieren, die in der Vergangenheit mein üblicher Ansatz waren. Mit Durable Functions bekomme ich ein weiteres Werkzeug in meine Werkzeugkiste.

Bild zeigt den Architekturüberblick, eine durable Lambda, die Bedrock, AgentCore Gateway, MongoDB Atlas, DSQL und GitHub orchestriert

Die Pipeline sieht so aus: Sie holt das Beitrag-Markdown von GitHub, ruft Bedrock auf, um ein Embedding mit Amazon Titan Text Embeddings V2 zu erstellen, und fügt es in eine MongoDB Atlas-Sammlung ein. Als Nächstes wird der KI-Agent aufgerufen, um die drei verwandten Beiträge auszuwählen, beginnend mit einer Vektorsuche in MongoDB Atlas mithilfe eines auf AgentCore Gateway gehosteten MCP-Tools. Dann werden die verwandten Beitragreferenzen zu DSQL hinzugefügt, was Teil meines selbstgebauten CMS-Systems ist, und schließlich wird das Frontmatter des Beitrags in GitHub aktualisiert und ein PR geöffnet.

Ich musste auch sicherstellen, dass Embeddings für alle aktuellen Beiträge erstellt wurden; andernfalls würde es für neue Beiträge nicht funktionieren. Also habe ich eine kleine einmalige Backfill-Einrichtung erstellt.

Bild zeigt den Architekturüberblick für das Backfilling

Das sind viele bewegliche Teile für eine Funktion. Ich werde darauf zurückkommen, warum das funktioniert.

Von Markdown zu Embeddings

Die gesamte Pipeline beruht auf einer Transformation, die den Blogbeitrag in Embeddings umwandelt, damit wir programmgesteuert ähnliche Beiträge finden können.

Ein Embedding-Modell ordnet Text einem Vektor fester Länge von Gleitkommazahlen zu. In meinem Fall 1024 Gleitkommazahlen von Bedrocks Titan Text Embeddings v2. Das Modell wurde so trainiert, dass Texte über dieselben Konzepte in diesem 1024-dimensionalen Raum nahe beieinander liegen, selbst wenn sie kein gemeinsames Vokabular teilen. Mein Durable Functions-Beitrag und mein Step Functions-Beitrag landen als Nachbarn, weil beide um die Orchestrierung von Workflows auf AWS gehen.

Bild zeigt Blogbeiträge als Punkte in einer 2D-semantischen Karte mit Themengruppen und den drei ausgewählten Beiträgen hervorgehoben

Was in das Embedding eingeht, ist sehr wichtig, genau wie das Modell. Ich embedde nicht das gesamte rohe Markdown; ich führe zuerst einen kleinen Normalisierungsschritt aus, der das YAML-Frontmatter entfernt, jedes Codebeispiel entfernt und Bilder weglässt, und erstelle schließlich eine neue Struktur, die an das Modell übergeben werden kann.

TITEL: Lambda Durable Functions 101
TAGS: aws, lambda, serverless
ZUSAMMENFASSUNG: Ein praktischer Leitfaden zum Aufbau eines dauerhaften Workflows auf AWS Lambda.
KÖRPER: Der bereinigte Beitrag

Warum entferne ich also den gesamten Code? Nun, Codeblöcke sind einfach "Rauschen". Zwei Beiträge zum gleichen Thema können völlig unterschiedliche Codebeispiele haben, wenn ich also den Code einbeziehe, würden diese Beiträge im Vektorraum jetzt weiter voneinander entfernt landen. Code sagt nichts über Ähnlichkeit aus, also nehme ich nur den tatsächlichen Bloginhalt auf.

Der Aufruf zu Bedrock, um das Embedding zu erstellen, ist ziemlich klein.

_MODEL_ID = "amazon.titan-embed-text-v2:0"

def embed_text(text: str, dimensions: int = 1024) -> list[float]:
    response = _bedrock_client().invoke_model(
        modelId=_MODEL_ID,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "inputText": text,
            "dimensions": dimensions,
            "normalize": True,
        }),
    )
    payload = json.loads(response["body"].read())
    return [float(x) for x in payload["embedding"]]

Eine Flagge, die wir genauer betrachten müssen, ist normalize: True. Stellen Sie sich jedes Embedding als einen Pfeil vor, der irgendwo im Raum zeigt. Beiträge zum gleichen Thema zeigen ungefähr in die gleiche Richtung. Beim Vergleich zweier Beiträge ist die Richtung das, was zählt, nicht wie lang die Pfeile sind. Das Standardrezept dafür ist die Kosinusähnlichkeit: Multiplizieren Sie die beiden Vektoren und teilen Sie dann durch die Länge jedes Pfeils. Diese Division ist nur da, um die Längen aufzuheben. Mit normalize: True schneidet Titan jeden Vektor auf eine Länge von genau 1, bevor er ihn zurückgibt. Und durch 1 zu teilen ändert nichts. Die Vergleichsoperation reduziert sich also nur auf den Multiplikationsschritt, ein einfaches Skalarprodukt. Gleiches Ergebnis, weniger Mathematik, und das summiert sich, wenn jeder neue Beitrag mit dem gesamten Rückkatalog verglichen wird.

Der Embed-Eingabe ist vor diesem Aufruf auf 30.000 Zeichen begrenzt, sodass sehr lange Beiträge trunkiert werden, anstatt zu scheitern (Titan v2 akzeptiert etwa 8K Tokens).

Gleichzeitig wird ein SHA-256-Inhalts-Hash erstellt und zusammen mit dem Vektor gespeichert, damit ich überprüfen kann, ob ältere Beiträge geändert wurden und aktualisiert werden müssen.

new_hash = compute_hash(markdown)  # sha256 über den normalisierten Körper
existing = context.step(
    lambda _: find_by_id(slug, language),
    name="find_by_id",
)
if not force_recompute and existing and existing.get("content_hash") == new_hash:
    return {"skipped": True, "reason": "content_hash_unchanged"}

Speichern der Vektoren in MongoDB Atlas

Die erstellten Vektoren müssen irgendwo gespeichert werden, wo sie ordnungsgemäß abgefragt werden können. Ich habe einige verschiedene Optionen wie OpenSearch serverless und Aurora mit pgvector untersucht. Aber am Ende habe ich mich entschieden, dies mit MongoDB Atlas zu betreiben, was für mich die beste Wahl zu sein schien. Und da ich einen M0-Cluster unter der MongoDB-Free-Tier laufen lassen kann, war das ein zusätzlicher Bonus.

MongoDB Atlas Vector Search läuft HNSW im Hintergrund, dem gleichen approximativen Nächste-Nachbarn-Algorithmus, der den meisten Produktions-Vektor-Datenbanken zugrunde liegt. Die Abfrage ist ein $vectorSearch-Aggregationsschritt, was bedeutet, dass der Vektorindex und die Dokumente in einem System leben. Eine Abfrage gibt Nachbarn mit ihren Titeln, Zusammenfassungen und Tags zurück. Genau die Daten, die der Agent benötigt, keine zweite Suche, keine separate Vektor-Datenbank, die synchron gehalten werden muss.

pipeline = [
    {"$vectorSearch": {
        "index": "posts_vector_idx",
        "path": "embedding",
        "queryVector": embedding,           # 1024 dims, von Titan
        "numCandidates": max(100, k * 10),
        "limit": k,
        "filter": {"language": language},   # WÄHREND der Traversierung ausgewertet
    }},
    {"$match": {"slug": {"$nin": exclude_slugs}}},
    {"$project": {
        "_id": 0,
        "slug": 1, "language": 1, "title": 1,
        "summary": 1, "tags": 1, "category": 1,
        "score": {"$meta": "vectorSearchScore"},
    }},
]
return list(_collection().aggregate(pipeline))

Diese filter-Zeile ist für mich und meine Blogs sehr wichtig. Meine Beiträge existieren in mehreren Sprachen, und der nächste Nachbar eines Beitrags ist zuverlässig seine eigene Übersetzung. Atlas lässt mich language als Filterfeld im Vektorindex deklarieren, sodass die HNSW-Traversierung nur Vektoren derselben Sprache berücksichtigt.

Für die Authentifizierung verwende ich AWS IAM, keine Passwörter irgendwo. MongoDB Atlas akzeptiert AWS IAM-Prinzipien als Datenbankbenutzer über den MONGODB-AWS-Mechanismus von pymongo. Meine Lambda-Funktionen übernehmen eine dedizierte IAM-Rolle über STS, eine Rolle, die genau einmal mit Atlas verbunden wurde, und die temporären Berechtigungen fließen direkt in die MongoDB-Verbindung.

def _assume_atlas_role() -> dict:
    sts = boto3.client("sts")
    response = sts.assume_role(
        RoleArn=os.environ["ATLAS_ROLE_ARN"],
        RoleSessionName="related-posts-atlas",
        DurationSeconds=3600,
    )
    return response["Credentials"]


def _client() -> MongoClient:
    creds = _assume_atlas_role()
    return MongoClient(
        _connection_config()["srvUri"],   # ...authMechanism=MONGODB-AWS
        username=creds["AccessKeyId"],
        password=creds["SecretAccessKey"],
        authMechanismProperties={"AWS_SESSION_TOKEN": creds["SessionToken"]},
    )

Warum verwende ich eine dedizierte Rolle anstelle der eigenen Ausführungsrolle von Lambda? Das hat im Grunde zwei Gründe: Erstens möchte ich die Einrichtung von MongoDB Atlas in einem Ablauf handhaben können. Ich möchte nicht einen Teil von Atlas einrichten, dann Lambda-Funktionen bereitstellen müssen, die Role ARNs abrufen und Atlas aktualisieren. Indem ich dies trenne, kann ich auch eine Rolle erstellen, die ich gegenüber Atlas autorisiere, und dann können mehrere Lambda-Funktionen dieselbe Rolle übernehmen, und ich kann diesen Zugriff ordnungsgemäß mit IAM-Berechtigungen steuern.

Wenn Sie tiefer in die passwortfreie Authentifizierung gegenüber Atlas einsteigen möchten, habe ich kürzlich über Outbound Identity Federation mit MongoDB geschrieben, was eine andere Technik ist als die, die ich hier verwende.

KI-Inhaltsredakteur mit Tools

Hier wird der KI-Teil schärfer. Der Agent ist nicht "rufen Sie eine LLM mit einem Prompt". Es ist eine Schleife. Claude Sonnet 4.6 auf Bedrock wird mit einem System-Prompt aufgerufen, um verwandte Beiträge zu finden, zwei Tools, und er entscheidet, was er aufruft, was er liest und wann er fertig ist.

Bild zeigt die Agent-Tool-Schleife zwischen Strands, AgentCore Gateway, den Tool-Lambda-Funktionen und Atlas

Die beiden Tools sind einfache Lambda-Funktionen, die als MCP-Tools im AgentCore Gateway behandelt werden. vector_search gibt die Top-k-Kandidaten mit Titeln, Zusammenfassungen, Tags und Ähnlichkeits-Scores zurück. read_post_excerpt gibt die ersten 1000 Zeichen des Körpers eines Beitrags zurück, und der Agent ruft ihn nur auf, wenn zwei Kandidaten aus ihren Zusammenfassungen austauschbar aussehen und er das Unentschieden durch tatsächliches Lesen lösen möchte.

Ein bewusstes Design-Detail in vector_search ist, dass der Agent den Slug des Beitrags übergeben, nie ein Embedding. Die Tool-Lambda löst den gespeicherten Vektor selbst aus Atlas auf, sodass das Modell kein fehlerhaftes 1024-Float-Array in die Index-Grenze halluzinieren kann.

Wie erwähnt, werden Tools von Bedrock AgentCore Gateway vorgelagert. Das Gateway macht meine Lambda-Funktionen zu einem Tool-Katalog, den jeder MCP-bewusste Agent entdecken und aufrufen kann, behandelt das Protokoll, damit die Lambdas protokollfrei bleiben, und authentifiziert eingehende Aufrufe mit einfachem IAM SigV4, kein OAuth-Server, der ausgeführt werden muss. Hätte ich das Gateway überspringen und die Tools als Inline-Python-Funktionen übergeben können? Sicher. Aber der nächste Agent, den ich baue (und es wird einen geben), kann denselben Tool-Katalog wiederverwenden, ohne dass ich Code herumkopieren muss.

Das Gateway und seine Ziele sind einfaches CloudFormation. Jedes Ziel ordnet eine Lambda einem Tool-Namen plus einem Eingabeschema zu, und dieses Schema ist das, was das Modell sieht, wenn es entscheidet, wie es das Tool aufruft.

RelatedPostsGateway:
  Type: AWS::BedrockAgentCore::Gateway
  Properties:
    Name: !Sub ${Application}-related-posts-gw
    ProtocolType: MCP
    RoleArn: !GetAtt GatewayExecutionRole.Arn
    AuthorizerType: AWS_IAM

VectorSearchTarget:
  Type: AWS::BedrockAgentCore::GatewayTarget
  Properties:
    GatewayIdentifier: !Ref RelatedPostsGateway
    Name: vector-search-target
    CredentialProviderConfigurations:
      - CredentialProviderType: GATEWAY_IAM_ROLE
    TargetConfiguration:
      Mcp:
        Lambda:
          LambdaArn: !GetAtt VectorSearchToolFunction.Arn
          ToolSchema:
            InlinePayload:
              - Name: vector_search
                Description: >
                  Vektorsuche über den Blog-Beitrags-Korpus. Löst das gespeicherte Embedding des Quellbeitrags
                  aus Atlas durch source_slug auf und gibt bis zu k Kandidaten nach semantischer Ähnlichkeit zurück.
                InputSchema:
                  Type: object
                  Properties:
                    source_slug:
                      Type: string
                    language:
                      Type: string
                    k:
                      Type: integer
                    exclude_slugs:
                      Type: array
                      Items:
                        Type: string
                  Required:
                    - source_slug

Und die Tool-Lambda weiß nichts von MCP. Das Gateway übergibt die Tool-Eingabe als Ereignis, und der Rückgabewert wird zum Tool-Ausgabe.

def handler(event: dict, context) -> dict:
    source_slug = event["source_slug"]
    language = event.get("language") or "en"

    embedding = find_embedding(source_slug, language)
    candidates = vector_search_neighbors(
        embedding=embedding,
        k=int(event.get("k", 20)),
        language=language,
        exclude_slugs=event.get("exclude_slugs"),
    )
    return {"candidates": candidates, "count": len(candidates)}

Strands Agents ist das Open-Source-Agenten-Framework von AWS und steuert die agentische Schleife, sodass ich das nicht selbst implementieren muss.

from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client
from strands import Agent
from strands.models import BedrockModel
from strands.tools.mcp.mcp_client import MCPClient

mcp_client = MCPClient(
    lambda: aws_iam_streamablehttp_client(
        endpoint=os.environ["AGENTCORE_GATEWAY_URL"],
        aws_service="bedrock-agentcore",
    )
)
model = BedrockModel(
    model_id="eu.anthropic.claude-sonnet-4-6",  # EU-Cross-Region-Inference-Profil
    temperature=0.0,
    streaming=False,
)

with mcp_client:
    tools = mcp_client.list_tools_sync()
    agent = Agent(
        model=model,
        tools=tools,
        system_prompt=_SYSTEM_PROMPT,
        callback_handler=_BoundedToolCallHandler(max_calls=8),
    )
    result = agent(
        f"source_slug: {source_slug}\n"
        f"source_title: {source_title}\n"
        f"source_summary: {source_summary}\n"
        f"language: {language}\n\n"
        "Wählen Sie 3 verwandte Beiträge aus."
    )

Das Verhalten des Modells wird fast vollständig durch den System-Prompt geformt. Persona, Anti-Pattern-Anleitung und ein strenger Vertrag.

Sie sind der Redakteur für jimmydqv.com, ein technischer Blog über AWS, Serverless und KI.
Ihre Aufgabe ist es, genau 3 verwandte Beiträge auszuwählen, die ein Leser nach
dem aktuellen Beitrag am meisten schätzen würde.
Bevorzugen Sie thematische Tiefe gegenüber oberflächlicher Stichwortüberlappung.
Jede Auswahl benötigt eine ein-Satz-Begründung, die die spezifische Verbindung nennt.

Arbeitsablauf:
1. Rufen Sie vector_search mit source_slug=, language=, k=20,
   exclude_slugs=[] auf. Das Tool löst das Embedding aus Atlas intern auf,
   versuchen Sie NICHT, ein Embedding selbst zu konstruieren.
2. Wenn sich zwei Kandidaten austauschbar anfühlen, rufen Sie optional read_post_excerpt für einen auf,
   um das Unentschieden zu lösen.
3. Geben Sie STRENG dieses JSON-Format und nichts anderes zurück:
   {"picks": [{"slug": "...", "rationale": "..."}, ... genau 3 Elemente ...]}
parsed = json.loads(_strip_fences(raw))
picks_raw = parsed.get("picks", [])
if len(picks_raw) != 3:
    raise ValueError(f"Agent muss genau 3 Picks zurückgeben, erhalten {len(picks_raw)}")

Also, warum habe ich mich für das etwas teurere Sonnet-Modell und nicht für das schlankere Haiku oder sogar Nova 2 entschieden? Ich habe mehrere verschiedene Modelle getestet. Haiku und Nova 2 Light sind beide schneller und günstiger, haben aber nicht zuverlässig drei großartige Picks zurückgegeben; manchmal bekam ich zwei, manchmal vier, manchmal wirklich schlechte Picks. Sonnet produzierte die Picks am konsistentesten, und ein Aufruf dauert zwei bis drei Sekunden.

Ein typischer Ablauf: ein vector_search, gelegentlich ein Auszug lesen, dann drei Picks mit Begründungen wie "Beide gehen durch die Agent-Tool-Schleife auf Bedrock; dieser Beitrag konzentriert sich auf die dauerhafte Orchestrierung darum herum."

Orchestrierung mit Lambda Durable Functions

Die Pipeline dauert 30 bis 90 Sekunden, macht etwa sechs LLM-Aufrufe, spricht mit vier externen Systemen. Die klassische Antwort ist Step Functions, und ich habe Step Functions viele Male zur Orchestrierung verwendet. Diesmal habe ich Lambda Durable Functions stattdessen verwendet, und für eine Agenten-Arbeitslast würde ich dieselbe Entscheidung wieder treffen.

Das Modell ist einfach. Der gesamte Workflow ist gewöhnliches Python in einem Handler, und jeder Nebeneffekt ist in context.step(...) eingebettet. Das Ergebnis jedes Schritts wird checkpointed. Wenn die Aufrufausführung abstürzt, Bedrock throttelt, Zeitüberschreitung, alles, eine frische Ausführung führt den Handler von oben neu aus, aber abgeschlossene Schritte wiederholen sich von ihren Checkpoints statt neu auszuführen.

Bild zeigt zwei Aufrufzeitlinien, bei denen der zweite Aufruf checkpointed Schritte kostenlos wiedergibt und am fehlgeschlagenen Schritt fortfährt

Eine reguläre Lambda in eine durable Funktion zu verwandeln, ist eine SAM-Eigenschaft. Der Orchestrierer wird wie folgt deklariert.

RelatedPostsOrchestrator:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: lambda/orchestrator
    Handler: handler.handler
    Runtime: python3.13
    Timeout: 900
    MemorySize: 1024
    AutoPublishAlias: live
    DurableConfig:
      ExecutionTimeout: 3600
      RetentionPeriodInDays: 7

Der Handler selbst liest sich wie eine Liste von Schritten.

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    slug = event["slug"]
    language = event.get("language", "en")

    # run_id ist nicht deterministisch; berechnen Sie es innerhalb eines Schritts
    # damit Wiederholungen denselben Wert wiederverwenden.
    run_id = context.step(lambda _: str(uuid.uuid4()), name="generate_run_id")

    file_path = context.step(
        lambda _: lookup_post_file_path(slug, language),
        name="lookup_file_path",
    )
    fetched = context.step(
        lambda _: invoke_fetch_source(file_path, branch, git_ref),
        name="fetch_source",
    )
    # ... content-hash-Kurzschaltung ...
    embedding = context.step(
        lambda _: embed_text(embed_input),
        name="embed_post",
    )
    context.step(lambda _: upsert_post(doc), name="upsert_post")

    primary_picks = context.run_in_child_context(
        lambda child_ctx: child_ctx.step(
            lambda _: _picks_for_one_post(slug, doc["title"], doc["summary"], language),
            name="agent_pick_primary",
        ),
        name="primary_agent_picks",
    )
    if len(primary_picks) != 3:
        raise ExecutionError(
            f"Agent hat {len(primary_picks)} Picks zurückgegeben, es werden genau 3 benötigt"
        )
    # ... Fan-out, persistieren, PR ...

Beachten Sie das run_id. Selbst ein uuid.uuid4() muss innerhalb eines Schritts leben, da eine Wiederholung sonst eine andere ID erzeugen und von der Checkpoint-Trajektorie abweichen würde. Der Agentenaufruf läuft in einem untergeordneten Kontext, da er mehrere MCP-Tool-Aufrufe im Hintergrund orchestriert, und der untergeordnete Kontext hält den Schrittsbaum übersichtlich.

Was das Wiederholungsmodell hier bringt, ist konkret: Wenn ein Agentenaufruf gedrosselt wird, holt die Wiederholung nicht von GitHub neu, embeddet nicht neu oder fügt nicht neu ein. Die teure frühere Arbeit wird kostenlos wiederholt. Und die Agentenlogik selbst, Verzweigung, JSON-Parsing, "Nochmal für fünf Nachbarn ausführen", ist genau die Art von Code, die in einer JSON-State-Maschine gegen Sie kämpft und in Python natürlich gelesen wird. Fan-out sind ein paar Zeilen.

batch = context.parallel(
    [_make_backlink_fn(n) for n in backlink_targets],
    name="backlink_fan_out",
    config=ParallelConfig(max_concurrency=5),
)
# Ein einzelner fehlgeschlagener Ast sollte die Pipeline nicht scheitern lassen; behalten Sie, was erfolgreich war.
for item in batch.succeeded():
    result = item.result
    if result and len(result.get("picks") or []) == 3:
        backlink_results.append(result)

Die verwandten Beiträge-Links rückwärts

Dies ist ein interessanter Teil der Lösung und der Pipeline.

Die Verwandten-Beiträge für einen neuen Beitrag zu finden, ist der einfache Teil und sehr geradlinig. Der schwierigere Teil ist, was, wenn ich einen neuen Beitrag hinzufüge, der jetzt eine bessere Wahl als verwandter Beitrag für einen alten Beitrag ist? Das würde bedeuten, dass ich ältere Beiträge aktualisieren müsste, wenn ein neuer hinzugefügt wird, und ihn rückwärts verlinke. Denn wenn ich das nicht tue, bleibt ein vor einem Jahr geschriebener Beitrag für immer "eingefroren".

Bild zeigt den neuen Beitrag in der Mitte mit fünf Nachbarsbeiträgen, die parallel neu ausgewertet werden, jeder mit seinem eigenen Ergebnis

Also bittet der Orchestrierer nach dem Einbetten des neuen Beitrags Atlas um seine zehn nächsten Nachbarn und führt den Agenten für die Top fünf neu aus, jeder als unabhängigen, parallelen dauerhaften Schritt. Jeder Nachbarsredakteur läuft die volle Frage von Grund auf neu: Angesichts des Blogs, wie er jetzt existiert, was sind die besten drei? Manchmal verdrängt der neue Beitrag eine alte Auswahl. Manchmal ändert sich nichts.

Kein nächtlicher Batch-Job, kein Cron. Die Arbeit passiert genau dann, wenn sich der Blogbeitrag ändert, begrenzt auf die Nachbarschaft, die sich geändert hat.

Die Auswahl landen

Die Ausgabe des Agenten muss an zwei Stellen landen, weil der Blog eine statische 11ty-Website ist.

Amazon DSQL ist die Quelle der Wahrheit. Eine Zeile pro Auswahl, geschrieben als DELETE-then-INSERT in einer einzigen Transaktion, sodass Wiederholungen idempotent sind und ein fehlgeschlagener Schreibvorgang die alten Auswahlmöglichkeiten intakt lässt. Ich verwende dasselbe DSQL-Setup wie in meiner AI Bartender-Serie, IAM-Auth und alles.

CREATE TABLE cms_content.related_posts (
    source_slug      TEXT NOT NULL,
    language         TEXT NOT NULL,
    position         SMALLINT NOT NULL,
    related_slug     TEXT NOT NULL,
    rationale        TEXT NOT NULL,
    similarity_score DOUBLE PRECISION,
    run_id           UUID NOT NULL,
    generated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (source_slug, language, position),
    CHECK (position BETWEEN 1 AND 3)
);

Der Schreibvorgang läuft auf einer einzigen Verbindung, und das DELETE wird zusammen mit einem fehlgeschlagenen INSERT zurückgerollt, sodass die Tabelle nie halbgeschrieben ist.

conn = connector.get_connection(token, user)
try:
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM cms_content.related_posts "
            "WHERE source_slug = %(s)s AND language = %(l)s",
            {"s": source_slug, "l": language},
        )
        for position, pick in enumerate(picks, start=1):
            cur.execute(
                "INSERT INTO cms_content.related_posts "
                "(source_slug, language, position, related_slug, "
                " rationale, similarity_score, run_id) "
                "VALUES (%(src)s, %(lang)s, %(pos)s, %(rel)s, "
                "        %(rat)s, %(sim)s, %(run)s)",
                {...},
            )
    conn.commit()
except Exception:
    conn.rollback()
    raise

Mein CMS-Dashboard liest diese Tabelle und zeigt die Auswahlmöglichkeiten mit Begründungen neben jedem Beitrag an. Der letzte Schritt des Orchestrierers gibt ein abgeschlossenes Ereignis aus, das über EventBridge zu AppSync Events fließt, sodass sich das Dashboard in Echtzeit aktualisiert, wenn ein Lauf endet. Nirgendwo Polling.

Ein GitHub PR macht es auf dem Blog sichtbar. Die Pipeline injiziert einen related_posts:-Block in das Frontmatter der Beiträge, bis zu sechs Dateien in einem PR (der neue Beitrag plus die aktualisierten Nachbarn). Der nächste Site-Build rendert den Abschnitt.

related_posts:
  - slug: "step-functions-vs-durable-functions"
    rationale: "Direkter Vergleich des hier verwendeten Orchestrierungsansatzes gegen die SFN-Alternative."

Warum ein PR anstatt direkt in main zu committen? So machen alle meine verschiedenen Verbesserungs-Pipelines es, es gibt mir als Mensch eine letzte Prüfung, bevor es live geht.

Endergebnis

Am Ende fügt die Pipeline die verwandten Beiträge in das Frontmatter hinzu, und der 11ty-Build-Prozess nimmt sie auf, sucht das Titelbild und die Beschreibung und injiziert einen Abschnitt in den tatsächlichen gebauten Beitrag.

Bild zeigt den neuen verwandten Abschnitt in einem Beitrag

Letzte Worte

Ich machte mich daran, Leser länger lesen zu lassen, und endete mit etwas, das ich sehr interessant finde, einem Muster. Vektorsuche erledigt das Abrufen, billig, schnell, mathematisch ehrlich über Ähnlichkeit. Ein Agent macht die Entscheidung, langsamer, aber in der Lage, zwei Kandidaten zu lesen und zu entscheiden, welchen ein Mensch tatsächlich als Nächstes möchte, und zu sagen, warum. Durable Execution umhüllt das Ganze; ein Workflow mit sechs LLM-Aufrufen kann so lässig wie eine einzelne Funktion behandelt werden. Und MongoDB Atlas macht still das, was jede KI-Pipeline braucht: Es hält die Vektoren, die Metadaten und die Filterung an einem abfragbaren Ort, kostenlos, mit IAM-Identität statt Passwörtern.

Schauen Sie sich meine anderen Beiträge auf jimmydqv.com an und folgen Sie mir auf LinkedIn und X für mehr Serverless-Inhalte.

Der gesamte Code für diesen Beitrag findet sich wie gewohnt auf der Serverless-Handbuch-Website

Jetzt loslegen!

If this saved you an afternoon, you can buy me a coffee.