Skip to content

核心概念

观测层级

SDK 把 Agent 的一次执行抽象为如下层级:

AgentObserver
└── AgentRun              → Langfuse trace / root observation
    ├── Generation        → Langfuse generation(LLM 调用)
    │   └── reasoning     → generation 下的子 observation
    ├── ToolSpan          → Langfuse span,名称为 tool:<toolName>
    ├── named Generation  → 辅助 LLM 调用,如 title / summary
    └── Score             → Langfuse /api/public/scores

AgentObserver

AgentObserver 是 SDK 的主入口,负责管理 run 生命周期和 session alias。

  • 一个 AgentObserver 可同时维护多个 run
  • observer.link(childId, rootId) 可把子 session 归并到父 run
  • observer.gateFlushGate 实例,用于等待异步上报完成
ts
const observer = new AgentObserver({ transport })
const run = observer.startRun({ id, name, input })
await observer.endRun(id, output)
await observer.gate.waitAll()

AgentRun

AgentRun 对应一次完整的 Agent 执行,映射到 Langfuse 的一条 trace。

关键约束:

  • 一个 run 只有一个主 generation;再次调用 startGeneration() 会自动结束旧 generation
  • named generation 独立维护,不影响主 generation
  • endRun() 会自动关闭未结束的 generation,并把未结束的 tool span 标记为泄漏错误

Generation

Generation 对应一次 LLM 调用,映射到 Langfuse 的 generation observation。

ts
run.startGeneration({
  model: "openai/gpt-4.1",
  input: [{ role: "user", content: "..." }],
})

// 流式输出
run.appendText("...")

// 推理过程(可选)
run.appendReasoning("thinking...")

// 结束调用
run.endGeneration({
  usage: { input: 100, output: 20, total: 120 },
  cost: 0.01,
})

ToolSpan

ToolSpan 对应一次工具执行,映射到 Langfuse 的 span,名称格式为 tool:<toolName>

ts
const tool = run.startTool({
  id: "call-1",      // 必须唯一且与 endTool 一致
  name: "search",
  input: { q: "..." },
})

// 成功
tool.end({ output: { result: "ok" } })

// 失败
tool.end({ error: "timeout" })

必须配对

startTool()tool.end() 必须配对调用,无论工具执行是否成功。 如果 run 结束时 tool span 仍未关闭,SDK 会将其标记为: span leaked: run ended before tool completed

Score

Score 是附加在 run 上的质量评分,映射到 Langfuse 的 score。

ts
// 单个 score
run.score({
  name: "agent_outcome",
  value: "completed",
  dataType: "CATEGORICAL",  // CATEGORICAL | NUMERIC | BOOLEAN
  comment: "正常完成",       // 可选
})

// 多个 score
run.score([
  { name: "session_duration_ms", value: 1200, dataType: "NUMERIC" },
  { name: "tool_error", value: false, dataType: "BOOLEAN" },
])

INFO

Score 是异步 fire-and-forget,Promise 会注册到 FlushGate

FlushGate

FlushGate 解决"主链路已 idle,但 trace/score 还在异步上报"的问题。

ts
// observer.gate 即 FlushGate 实例
await observer.gate.waitAll()
  • 每个任务最多等待 8000ms(默认)
  • waitAll() 等待所有 pending promise settle 或超时
  • promise 错误会被吞掉,不影响业务退出

不能省略

进程退出前必须调用 observer.gate.waitAll(),否则低流量下容易出现 trace 丢失。

Transport

Transport 是 SDK 的上报接口,支持替换。

内置实现:LangfuseDirectTransport

  • 支持直连 Langfuse(publicKey + secretKey)
  • 支持业务代理(动态 authHeaders 回调)
  • enabled()false 时,startRun() 返回 undefined

ObservationContext

ObservationContext 是按 session id 缓存的上下文 Map,用于延迟注入 userId / tags / metadata:

ts
// 在 startRun 之前预先设置上下文
observer.context.set(sessionId, {
  userId: "user-1",
  tags: ["agentType:workflow_agent"],
  metadata: { org_uuid: "org-1" },
})

// startRun 会自动合并 context 和显式参数
// 显式参数优先;metadata 则 context 铺底、显式覆盖
const run = observer.startRun({ id: sessionId, name: "workflow_agent" })

Session Alias(子 session)

当一个根 run 下有多个子 session 时,通过 link() 把子 session 归并到父 run:

ts
observer.link(childSessionId, rootSessionId)
// 之后 observer.getRun(childSessionId) 返回根 run

MIT License