serverless, aws, ai, mongodb

Dejando que un Agente de IA Elija Mis Publicaciones Relacionadas

2026-06-11
This post cover image
aws serverless lambda ai bedrock mongodb

Este archivo ha sido traducido automaticamente por IA, pueden ocurrir errores

Sigo extendiendo la pipeline de mi blog con pequeñas automatizaciones. Corrijo mis publicaciones con Amazon Nova, y las traduzco automáticamente, y cada paso se ejecuta serverless y basado en eventos. Esta vez fui tras algo en lo que he estado pensando durante mucho tiempo.

Escribo una publicación y alguien la encuentra, la lee hasta el final y luego se va. Esa última parte es en lo que he estado pensando. Un lector que acaba de terminar una publicación sobre Lambda probablemente está interesado en leer mis otras publicaciones serverless. No hice nada con ese momento, ningún "lee esto a continuación". Nada. La lectura terminó exactamente en el punto donde debería haber continuado.

Así que construí una pipeline que soluciona exactamente eso, un agente de IA que lee cada nueva publicación, busca todas mis publicaciones por significado y no por palabras clave, luego elige tres publicaciones relacionadas y me explica cada elección en una oración. La configuración es toda serverless y toda basada en eventos, ¿qué esperabas? Combiné Lambda Durable Functions, Strands Agents, Bedrock AgentCore Gateway y MongoDB Atlas Vector Search en una sola pipeline.

Todo el código para esta publicación se puede encontrar en serverless handbook como siempre

¿Por qué no usar simplemente etiquetas?

Mi blog es HTML estático y una ruta fácil sería usar etiquetas, como muchos otros sitios estáticos. Hice una prueba y simplemente no era lo suficientemente bueno. Uso etiquetas como serverless que son bastante amplias, pero también tengo algunas etiquetas más estrechas como lambda. Pero solo porque una publicación tenga la etiqueta serverless o lambda no significa que no pueda compartir algo con otras publicaciones. Las etiquetas responden a "¿con qué palabras la etiqueté?", no a "¿de qué trata realmente esta publicación?"

El siguiente paso sería usar la similitud de contenido. Aquí puedo crear embeddings vectoriales para cada publicación y devolver los tres vecinos más cercanos, ¡hecho! Bueno, sí, este es el respaldo de la pipeline, pero un crudo top tres vecinos más cercanos a menudo produce publicaciones casi duplicadas que cubren exactamente el mismo tema. La coincidencia pura de vecinos más cercanos puede decirme que dos publicaciones son similares, pero no me dice si un lector que terminó una obtendría algo de la otra.

Así que extendí el diseño, y el núcleo de todo es que la búsqueda vectorial recupera datos, y el agente de IA decide cuál se relaciona mejor. Comienzo reduciendo el número de publicaciones con búsqueda vectorial, luego un agente de IA, interpretando el papel de mi editor, lee las publicaciones y elige las tres que cree que los lectores disfrutarían más. Crea una justificación de una oración para explicar por qué eligió esa publicación. De esta manera puedo revisar y refinar el agente de IA con el tiempo.

Visión general de la arquitectura

Cuando se publica una publicación, la nueva pipeline del blog envía un evento PostPublished a un bus de EventBridge.

{
  "Source": "....",
  "DetailType": "PostPublished",
  "Detail": {
    "slug": "lambda-durable-functions-101",
    "language": "en",
    "branch": "main",
    "commit_sha": "f8a2c1d3..."
  }
}

Esto ahora invocará una Lambda Durable Function, que coordinará todo el flujo. Como la coordinación es principalmente pasos de código, decidí probar Durable Functions en lugar de Step Functions, que ha sido mi opción normal en el pasado. Con Durable Functions obtengo otra herramienta en mi caja de herramientas.

Imagen mostrando la visión general de la arquitectura, una Lambda durable orquestando Bedrock, AgentCore Gateway, MongoDB Atlas, DSQL y GitHub

La pipeline se ve así: obtiene el markdown de la publicación desde GitHub, llama a Bedrock para crear un embedding usando Amazon Titan Text Embeddings V2 y lo inserta en una colección de MongoDB Atlas. Luego, se llama al Agente de IA para elegir las tres publicaciones relacionadas, comenzando con una búsqueda vectorial en MongoDB Atlas usando una herramienta MCP alojada en AgentCore Gateway. Luego, las referencias de las publicaciones relacionadas se agregan a DSQL, que es parte de mi sistema CMS construido en casa, y finalmente actualiza el frontmatter de la publicación en GitHub y abre un PR.

También necesité asegurarme de que se crearan embeddings para todas las publicaciones actuales; de lo contrario, no funcionaría para nuevas publicaciones. Así que creé una pequeña configuración de relleno único.

Imagen mostrando la visión general de la arquitectura para el relleno

Eso es mucha piezas móviles para una función. Volveré a por qué eso funciona.

Desde Markdown hasta embeddings

Toda la pipeline depende de una transformación, convertir la publicación del blog en embeddings para que programáticamente podamos encontrar publicaciones similares.

Un modelo de embedding mapea texto a un vector de flotantes de longitud fija. En mi caso, 1024 flotantes de Titan Text Embeddings v2 de Bedrock. El modelo se entrenó para que textos sobre los mismos conceptos se ubiquen cerca en ese espacio de 1024 dimensiones, incluso cuando no comparten vocabulario. Mi publicación de Durable Functions y mi publicación de Step Functions terminan como vecinos porque ambas tratan sobre orquestar flujos de trabajo en AWS.

Imagen mostrando publicaciones del blog como puntos en un mapa semántico 2D con clústeres de temas y las tres publicaciones elegidas resaltadas

Lo que se incluye en el embedding importa mucho, al igual que el modelo. No incrusto todo el markdown crudo; primero ejecuto un pequeño paso de normalización que elimina el frontmatter YAML, elimina todos los ejemplos de código y las imágenes, y finalmente creo una nueva estructura que puede alimentarse al modelo.

TITLE: Lambda Durable Functions 101
TAGS: aws, lambda, serverless
SUMMARY: Un recorrido práctico para construir un flujo de trabajo duradero en AWS Lambda.
BODY: El cuerpo limpiado

Entonces, ¿por qué elimino todo el código? Bueno, los bloques de código son solo "ruido". Dos publicaciones sobre el mismo tema pueden tener ejemplos de código completamente diferentes, por lo que si incluyera el código, estas publicaciones ahora terminarían más lejos una de la otra en el espacio vectorial. El código no dice nada sobre la similitud, así que solo incluyo el contenido real del blog.

La llamada a Bedrock para crear el embedding es bastante pequeña.

_MODEL_ID = "amazon.titan-embed-text-v2:0"

def embed_text(text: str, dimensions: int = 1024) -> list[float]:
    response = _bedrock_client().invoke_model(
        modelId=_MODEL_ID,
        contentType="application/json",
        accept="application/json",
        body=json.dumps({
            "inputText": text,
            "dimensions": dimensions,
            "normalize": True,
        }),
    )
    payload = json.loads(response["body"].read())
    return [float(x) for x in payload["embedding"]]

Una bandera que necesitamos ver más de cerca es normalize: True. Piensa en cada embedding como una flecha apuntando a algún lugar en el espacio. Las publicaciones sobre el mismo tema apuntan en aproximadamente la misma dirección. Al comparar dos publicaciones, lo que importa es la dirección, no la longitud de las flechas. La receta estándar para esto es la similitud del coseno: multiplicar los dos vectores juntos, luego dividir por la longitud de cada flecha. Esa división solo está ahí para anular las longitudes. Con normalize: True, Titan recorta cada vector a una longitud de exactamente 1 antes de devolverlo. Y dividir por 1 no cambia nada. Entonces la comparación se reduce a solo el paso de multiplicación, un producto punto simple. El mismo resultado, menos matemáticas, y eso se suma cuando cada nueva publicación se compara con todo el catálogo anterior.

La entrada del embedding está limitada a 30,000 caracteres antes de esta llamada, por lo que las publicaciones muy largas se truncan en lugar de fallar (Titan v2 acepta alrededor de 8K tokens).

Al mismo tiempo se crea y almacena un hash SHA-256 del contenido junto con el vector, con esto puedo verificar si las publicaciones más antiguas han cambiado y necesitan actualizarse.

new_hash = compute_hash(markdown)  # sha256 sobre el cuerpo normalizado
existing = context.step(
    lambda _: find_by_id(slug, language),
    name="find_by_id",
)
if not force_recompute and existing and existing.get("content_hash") == new_hash:
    return {"skipped": True, "reason": "content_hash_unchanged"}

Almacenando los Vectores en MongoDB Atlas

Los vectores creados deben almacenarse en algún lugar donde puedan consultarse correctamente. Exploré algunas opciones diferentes como OpenSearch serverless y Aurora con pgvector. Pero al final decidí ejecutar esto con MongoDB Atlas, que se sintió como la mejor opción para mí. Y como puedo ejecutar un clúster M0 bajo el plan gratuito de MongoDB, eso fue una ventaja adicional.

MongoDB Atlas Vector Search ejecuta HNSW bajo el capó, el mismo algoritmo de vecinos más cercanos aproximado detrás de la mayoría de las bases de datos de vectores en producción. La consulta es una etapa de agregación $vectorSearch, lo que significa que el índice vectorial y los documentos viven en un solo sistema. Una consulta devuelve vecinos con sus títulos, resúmenes y etiquetas. Exactamente los datos que el agente necesita, sin segunda consulta, sin una base de datos de vectores separada para mantener sincronizada.

pipeline = [
    {"$vectorSearch": {
        "index": "posts_vector_idx",
        "path": "embedding",
        "queryVector": embedding,           # 1024 dims, from Titan
        "numCandidates": max(100, k * 10),
        "limit": k,
        "filter": {"language": language},   # evaluated DURING traversal
    }},
    {"$match": {"slug": {"$nin": exclude_slugs}}},
    {"$project": {
        "_id": 0,
        "slug": 1, "language": 1, "title": 1,
        "summary": 1, "tags": 1, "category": 1,
        "score": {"$meta": "vectorSearchScore"},
    }},
]
return list(_collection().aggregate(pipeline))

Esa línea de filter es muy importante para mí y mis blogs. Mis publicaciones existen en múltiples idiomas, y el vecino más cercano de cualquier publicación es confiablemente su propia traducción. Atlas me permite declarar language como un campo de filtro dentro del índice vectorial, por lo que la travesía HNSW solo considera vectores del mismo idioma.

Para la autenticación uso AWS IAM, sin contraseñas en ningún lugar. MongoDB Atlas acepta principios IAM de AWS como usuarios de base de datos a través del mecanismo MONGODB-AWS de pymongo. Mis funciones Lambda asumen un rol IAM dedicado a través de STS, un rol que fue federado con Atlas exactamente una vez, y las credenciales temporales fluyen directamente a la conexión de MongoDB.

def _assume_atlas_role() -> dict:
    sts = boto3.client("sts")
    response = sts.assume_role(
        RoleArn=os.environ["ATLAS_ROLE_ARN"],
        RoleSessionName="related-posts-atlas",
        DurationSeconds=3600,
    )
    return response["Credentials"]


def _client() -> MongoClient:
    creds = _assume_atlas_role()
    return MongoClient(
        _connection_config()["srvUri"],   # ...authMechanism=MONGODB-AWS
        username=creds["AccessKeyId"],
        password=creds["SecretAccessKey"],
        authMechanismProperties={"AWS_SESSION_TOKEN": creds["SessionToken"]},
    )

¿Por qué uso un rol dedicado en lugar del propio rol de ejecución de Lambda? Esto es básicamente por dos razones: en primer lugar, quiero poder manejar la configuración de MongoDB Atlas en un solo flujo. No quiero configurar parte de Atlas, luego necesitar desplegar funciones Lambda, extraer los ARNs del Rol y actualizar Atlas. Al separar esto, también puedo crear un rol que autorizo hacia Atlas, luego varias funciones Lambda pueden asumir el mismo rol y puedo controlar ese acceso correctamente con permisos IAM.

Si quieres profundizar en la autenticación sin contraseñas hacia Atlas, escribí sobre Federación de Identidad Saliente con MongoDB recientemente, que es una técnica diferente a la que uso aquí.

Editor de Contenido IA con herramientas

Aquí es donde la parte de IA se agudiza. El agente no es "llamar a un LLM con una indicación". Es un bucle. Se llama a Claude Sonnet 4.6 en Bedrock con una indicación del sistema para encontrar publicaciones relacionadas, dos herramientas, y él decide qué llamar, qué leer y cuándo terminar.

Imagen mostrando el bucle de herramientas del agente entre Strands, AgentCore Gateway, las funciones Lambda de herramientas y Atlas

Las dos herramientas son funciones Lambda simples manejadas como herramientas MCP en AgentCore Gateway. vector_search devuelve los top-k candidatos con títulos, resúmenes, etiquetas y puntuaciones de similitud. read_post_excerpt devuelve los primeros 1000 caracteres del cuerpo de una publicación, y el agente la llama solo cuando dos candidatos parecen intercambiables por sus resúmenes y quiere romper el empate leyendo realmente.

Un detalle de diseño deliberado en vector_search es que el agente pasa el slug de la publicación, nunca un embedding. La función Lambda de la herramienta resuelve el vector almacenado desde Atlas por sí mismo, por lo que el modelo no puede alucinar un array de 1024 flotantes malformado en el límite del índice.

Como se mencionó, las herramientas están frente a Bedrock AgentCore Gateway. El Gateway convierte mis funciones Lambda en un catálogo de herramientas que cualquier agente consciente de MCP puede descubrir y llamar, maneja el protocolo para que las Lambdas permanezcan libres de protocolo y autentica las llamadas entrantes con IAM SigV4 simple, sin un servidor OAuth para ejecutar. ¿Podría haber omitido el Gateway y pasar las herramientas como funciones Python en línea? Claro. Pero el siguiente agente que construya (y habrá uno) reutilizará el mismo catálogo de herramientas sin que yo tenga que copiar código.

El Gateway y sus objetivos son simplemente CloudFormation. Cada objetivo mapea una Lambda a un nombre de herramienta más un esquema de entrada, y ese esquema es lo que el modelo ve cuando decide cómo llamar a la herramienta.

RelatedPostsGateway:
  Type: AWS::BedrockAgentCore::Gateway
  Properties:
    Name: !Sub ${Application}-related-posts-gw
    ProtocolType: MCP
    RoleArn: !GetAtt GatewayExecutionRole.Arn
    AuthorizerType: AWS_IAM

VectorSearchTarget:
  Type: AWS::BedrockAgentCore::GatewayTarget
  Properties:
    GatewayIdentifier: !Ref RelatedPostsGateway
    Name: vector-search-target
    CredentialProviderConfigurations:
      - CredentialProviderType: GATEWAY_IAM_ROLE
    TargetConfiguration:
      Mcp:
        Lambda:
          LambdaArn: !GetAtt VectorSearchToolFunction.Arn
          ToolSchema:
            InlinePayload:
              - Name: vector_search
                Description: >
                  Búsqueda vectorial sobre el corpus de publicaciones del blog. Resuelve el
                  embedding almacenado de la publicación fuente desde Atlas por source_slug,
                  luego devuelve hasta k candidatos por similitud semántica.
                InputSchema:
                  Type: object
                  Properties:
                    source_slug:
                      Type: string
                    language:
                      Type: string
                    k:
                      Type: integer
                    exclude_slugs:
                      Type: array
                      Items:
                        Type: string
                  Required:
                    - source_slug

Y la función Lambda de la herramienta no sabe nada de MCP. El Gateway pasa la entrada de la herramienta como el evento, y el valor de retorno se convierte en la salida de la herramienta.

def handler(event: dict, context) -> dict:
    source_slug = event["source_slug"]
    language = event.get("language") or "en"

    embedding = find_embedding(source_slug, language)
    candidates = vector_search_neighbors(
        embedding=embedding,
        k=int(event.get("k", 20)),
        language=language,
        exclude_slugs=event.get("exclude_slugs"),
    )
    return {"candidates": candidates, "count": len(candidates)}

Strands Agents es el marco de agentes de código abierto de AWS, impulsa el bucle agente para que no tenga que implementarlo por mi cuenta.

from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client
from strands import Agent
from strands.models import BedrockModel
from strands.tools.mcp.mcp_client import MCPClient

mcp_client = MCPClient(
    lambda: aws_iam_streamablehttp_client(
        endpoint=os.environ["AGENTCORE_GATEWAY_URL"],
        aws_service="bedrock-agentcore",
    )
)
model = BedrockModel(
    model_id="eu.anthropic.claude-sonnet-4-6",  # Perfil de inferencia entre regiones de la UE
    temperature=0.0,
    streaming=False,
)

with mcp_client:
    tools = mcp_client.list_tools_sync()
    agent = Agent(
        model=model,
        tools=tools,
        system_prompt=_SYSTEM_PROMPT,
        callback_handler=_BoundedToolCallHandler(max_calls=8),
    )
    result = agent(
        f"source_slug: {source_slug}\n"
        f"source_title: {source_title}\n"
        f"source_summary: {source_summary}\n"
        f"language: {language}\n\n"
        "Elige 3 publicaciones relacionadas."
    )

El comportamiento del modelo está moldeado casi completamente por la indicación del sistema. Persona, guía contra patrones y un contrato estricto.

Eres el editor de jimmydqv.com, un blog técnico sobre AWS, serverless y IA.
Tu trabajo es elegir exactamente 3 publicaciones relacionadas que un lector valoraría más después
de terminar la publicación actual.
Prefiere la profundidad temática sobre la superposición de palabras clave a nivel de superficie.
Cada elección necesita una justificación de una oración que nombre la conexión específica.

Flujo de trabajo:
1. Llama a vector_search con source_slug=, language=, k=20,
   exclude_slugs=[]. La herramienta resolverá el embedding desde Atlas
   internamente, NO intentes construir un embedding tú mismo.
2. Si dos candidatos se sienten intercambiables, opcionalmente llama a read_post_excerpt en uno
   para romper el empate.
3. Devuelve ESTRICTAMENTE esta forma JSON y nada más:
   {"picks": [{"slug": "...", "rationale": "..."}, ... exactamente 3 elementos ...]}
parsed = json.loads(_strip_fences(raw))
picks_raw = parsed.get("picks", [])
if len(picks_raw) != 3:
    raise ValueError(f"El agente debe devolver exactamente 3 elecciones, obtuvo {len(picks_raw)}")

Entonces, ¿por qué opté por el modelo Sonnet algo más costoso y no por el más delgado Haiku o incluso Nova 2? Probé varios modelos diferentes. Haiku y Nova 2 Light son más rápidos y económicos pero no devolvieron de manera confiable tres buenas elecciones; a veces obtuve dos, a veces cuatro, a veces elecciones realmente malas. Sonnet produjo elecciones de la manera más consistente, y una llamada toma dos o tres segundos.

Una ejecución típica: una vector_search, ocasionalmente una lectura de extracto, luego tres elecciones con justificaciones como "Ambos recorren el bucle de herramientas del agente en Bedrock; esta publicación se centra en la orquestación duradera alrededor de ella."

Orquestando con Lambda Durable Functions

La pipeline tarda de 30 a 90 segundos en ejecutarse, hace alrededor de seis llamadas a LLM, habla con cuatro sistemas externos. La respuesta clásica es Step Functions, y he usado Step Functions para orquestación muchas veces. Esta vez opté por Lambda Durable Functions en su lugar, y para una carga de trabajo de agente haría la misma elección de nuevo.

El modelo es simple. Todo el flujo de trabajo es Python ordinario en un manejador, y cada efecto secundario está envuelto en context.step(...). El resultado de cada paso está comprobado. Si la invocación falla, Bedrock se limita, tiempo de espera, cualquier cosa, una nueva invocación vuelve a ejecutar el manejador desde el principio, pero los pasos completados reproducen desde sus puntos de control en lugar de reejecutarse.

Imagen mostrando dos líneas de tiempo de invocación donde la segunda invocación reproduce pasos comprobados gratis y reanuda en el paso fallido

Convertir una Lambda regular en una función durable es una propiedad de SAM. El orquestador se declara así.

RelatedPostsOrchestrator:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: lambda/orchestrator
    Handler: handler.handler
    Runtime: python3.13
    Timeout: 900
    MemorySize: 1024
    AutoPublishAlias: live
    DurableConfig:
      ExecutionTimeout: 3600
      RetentionPeriodInDays: 7

El manejador en sí se lee como una lista de pasos.

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    slug = event["slug"]
    language = event.get("language", "en")

    # run_id no es determinista; compíralo dentro de un paso
    # para que las reproducciones reutilicen el mismo valor.
    run_id = context.step(lambda _: str(uuid.uuid4()), name="generate_run_id")

    file_path = context.step(
        lambda _: lookup_post_file_path(slug, language),
        name="lookup_file_path",
    )
    fetched = context.step(
        lambda _: invoke_fetch_source(file_path, branch, git_ref),
        name="fetch_source",
    )
    # ... cortocircuito de hash de contenido ...
    embedding = context.step(
        lambda _: embed_text(embed_input),
        name="embed_post",
    )
    context.step(lambda _: upsert_post(doc), name="upsert_post")

    primary_picks = context.run_in_child_context(
        lambda child_ctx: child_ctx.step(
            lambda _: _picks_for_one_post(slug, doc["title"], doc["summary"], language),
            name="agent_pick_primary",
        ),
        name="primary_agent_picks",
    )
    if len(primary_picks) != 3:
        raise ExecutionError(
            f"El agente devolvió {len(primary_picks)} elecciones, se necesitan exactamente 3"
        )
    # ... fan-out, persistir, PR ...

Observa el run_id. Incluso un uuid.uuid4() tiene que vivir dentro de un paso, porque una reproducción de otro modo generaría un id diferente y divergiría de la trayectoria del punto de control. La llamada al agente se ejecuta en un contexto secundario ya que orquesta varias invocaciones de herramientas MCP detrás de escena, y el contexto secundario mantiene el árbol de pasos ordenado.

Lo que el modelo de reproducción trae aquí es concreto: cuando una llamada al agente se limita, la reintento no vuelve a buscar desde GitHub, re-incrustar o re-insertar. El trabajo anterior costoso se reproduce gratis. Y la lógica del agente en sí, ramificación, análisis de JSON, "vuelve a ejecutar para cinco vecinos", es exactamente el tipo de código que lucha en una máquina de estados JSON y lee naturalmente en Python. El fan-out son unas pocas líneas.

batch = context.parallel(
    [_make_backlink_fn(n) for n in backlink_targets],
    name="backlink_fan_out",
    config=ParallelConfig(max_concurrency=5),
)
# Una sola pierna fallida no debería fallar la pipeline; guarda lo que tuvo éxito.
for item in batch.succeeded():
    result = item.result
    if result and len(result.get("picks") or []) == 3:
        backlink_results.append(result)

Los enlaces de publicaciones relacionadas hacia atrás

Esta es una parte interesante de la solución y la pipeline.

Encontrar las publicaciones relacionadas para una nueva publicación es la parte fácil y muy directa. La parte más difícil es, ¿qué pasa si agrego una nueva publicación que ahora es una mejor opción como publicación relacionada para una publicación antigua? Eso significaría que tendría que actualizar las publicaciones más antiguas también cuando se agrega una nueva, vinculándola hacia atrás. Porque si no hago esto, una publicación que escribí hace un año se quedaría "congelada" para siempre.

Imagen mostrando la nueva publicación en el centro con cinco publicaciones vecinas siendo reevaluadas en paralelo, cada una con su propio resultado

Así que después de incrustar la nueva publicación, el orquestador le pide a Atlas sus diez vecinos más cercanos y vuelve a ejecutar el agente para los cinco principales, cada uno como un paso durable independiente y paralelo. La ejecución del editor de cada vecino hace la pregunta completa desde cero: dado el blog como existe ahora, ¿cuáles son las tres mejores? A veces la nueva publicación desplaza una elección anterior. A veces nada cambia.

Sin trabajo por lotes nocturno, sin cron. El trabajo ocurre exactamente cuando la publicación del blog cambia, limitado al vecindario que cambió.

Entregando las elecciones

La salida del agente tiene que terminar en dos lugares, porque el blog es un sitio estático de 11ty.

Amazon DSQL es la fuente de verdad. Una fila por elección, escrita como DELETE-luego-INSERT en una sola transacción, por lo que las reejecuciones son idempotentes y una escritura fallida deja las elecciones antiguas intactas. Uso la misma configuración de DSQL que en mi serie AI Bartender, autenticación IAM y todo.

CREATE TABLE cms_content.related_posts (
    source_slug      TEXT NOT NULL,
    language         TEXT NOT NULL,
    position         SMALLINT NOT NULL,
    related_slug     TEXT NOT NULL,
    rationale        TEXT NOT NULL,
    similarity_score DOUBLE PRECISION,
    run_id           UUID NOT NULL,
    generated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (source_slug, language, position),
    CHECK (position BETWEEN 1 AND 3)
);

La escritura se ejecuta en una sola conexión, y el DELETE se revierte junto con una INSERT fallida, por lo que la tabla nunca está medio escrita.

conn = connector.get_connection(token, user)
try:
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM cms_content.related_posts "
            "WHERE source_slug = %(s)s AND language = %(l)s",
            {"s": source_slug, "l": language},
        )
        for position, pick in enumerate(picks, start=1):
            cur.execute(
                "INSERT INTO cms_content.related_posts "
                "(source_slug, language, position, related_slug, "
                " rationale, similarity_score, run_id) "
                "VALUES (%(src)s, %(lang)s, %(pos)s, %(rel)s, "
                "        %(rat)s, %(sim)s, %(run)s)",
                {...},
            )
    conn.commit()
except Exception:
    conn.rollback()
    raise

Mi panel CMS lee esta tabla y muestra las elecciones, con justificaciones, junto a cada publicación. El último paso del orquestador emite un evento completado que fluye a través de EventBridge a AppSync Events, por lo que el panel se actualiza en tiempo real cuando termina una ejecución. Sin sondeo en ningún lugar.

Un PR de GitHub lo hace visible en el blog. La pipeline inyecta un bloque related_posts: en el frontmatter de las publicaciones, hasta seis archivos en un PR (la nueva publicación más los vecinos actualizados). La próxima compilación del sitio representa la sección.

related_posts:
  - slug: "step-functions-vs-durable-functions"
    rationale: "Comparación directa del enfoque de orquestación utilizado aquí contra la alternativa SFN."

¿Por qué un PR en lugar de comprometer directamente a main? Así es como hacen todas mis diferentes pipelines de mejora, me da como humano una última verificación antes de que salga al aire.

Resultado final

Al final, la pipeline agregará las publicaciones relacionadas en el frontmatter, y el proceso de compilación de 11ty las recogerá, buscará la imagen de portada y la descripción, e injertará una sección en la publicación construida.

Imagen mostrando la nueva sección relacionada en una publicación

Palabras finales

Empecé para mantener a los lectores leyendo, y terminé con algo que encuentro muy interesante, un patrón. La búsqueda vectorial hace la recuperación, barata, rápida, matemáticamente honesta sobre la similitud. Un agente hace el juicio, más lento, pero capaz de leer dos candidatos y decidir cuál uno humano realmente querría a continuación, y decir por qué. La ejecución durable envuelve todo; un flujo de trabajo de seis llamadas a LLM puede ser tratado tan casualmente como una sola función. Y MongoDB Atlas silenciosamente hace lo que toda pipeline de IA necesita: mantiene los vectores, los metadatos y el filtrado en un lugar consultable, gratis, con identidad IAM en lugar de contraseñas.

Echa un vistazo a mis otras publicaciones en jimmydqv.com y sígueme en LinkedIn y X para más contenido serverless.

Todo el código para esta publicación se puede encontrar en serverless handbook como siempre

¡Ahora ve a construir!

If this saved you an afternoon, you can buy me a coffee.