核心概念
观测层级
SDK 把 Agent 的一次执行抽象为如下层级:
AgentObserver
└── AgentRun → Langfuse trace / root observation
├── Generation → Langfuse generation(LLM 调用)
│ └── reasoning → generation 下的子 observation
├── ToolSpan → Langfuse span,名称为 tool:<toolName>
├── named Generation → 辅助 LLM 调用,如 title / summary
└── Score → Langfuse /api/public/scoresAgentObserver
AgentObserver 是 SDK 的主入口,负责管理 run 生命周期和 session alias。
- 一个
AgentObserver可同时维护多个 run observer.link(childId, rootId)可把子 session 归并到父 runobserver.gate是FlushGate实例,用于等待异步上报完成
const observer = new AgentObserver({ transport })
const run = observer.startRun({ id, name, input })
await observer.endRun(id, output)
await observer.gate.waitAll()AgentRun
AgentRun 对应一次完整的 Agent 执行,映射到 Langfuse 的一条 trace。
关键约束:
- 一个 run 只有一个主 generation;再次调用
startGeneration()会自动结束旧 generation - named generation 独立维护,不影响主 generation
endRun()会自动关闭未结束的 generation,并把未结束的 tool span 标记为泄漏错误
Generation
Generation 对应一次 LLM 调用,映射到 Langfuse 的 generation observation。
run.startGeneration({
model: "openai/gpt-4.1",
input: [{ role: "user", content: "..." }],
})
// 流式输出
run.appendText("...")
// 推理过程(可选)
run.appendReasoning("thinking...")
// 结束调用
run.endGeneration({
usage: { input: 100, output: 20, total: 120 },
cost: 0.01,
})ToolSpan
ToolSpan 对应一次工具执行,映射到 Langfuse 的 span,名称格式为 tool:<toolName>。
const tool = run.startTool({
id: "call-1", // 必须唯一且与 endTool 一致
name: "search",
input: { q: "..." },
})
// 成功
tool.end({ output: { result: "ok" } })
// 失败
tool.end({ error: "timeout" })必须配对
startTool() 和 tool.end() 必须配对调用,无论工具执行是否成功。 如果 run 结束时 tool span 仍未关闭,SDK 会将其标记为: span leaked: run ended before tool completed
Score
Score 是附加在 run 上的质量评分,映射到 Langfuse 的 score。
// 单个 score
run.score({
name: "agent_outcome",
value: "completed",
dataType: "CATEGORICAL", // CATEGORICAL | NUMERIC | BOOLEAN
comment: "正常完成", // 可选
})
// 多个 score
run.score([
{ name: "session_duration_ms", value: 1200, dataType: "NUMERIC" },
{ name: "tool_error", value: false, dataType: "BOOLEAN" },
])INFO
Score 是异步 fire-and-forget,Promise 会注册到 FlushGate。
FlushGate
FlushGate 解决"主链路已 idle,但 trace/score 还在异步上报"的问题。
// observer.gate 即 FlushGate 实例
await observer.gate.waitAll()- 每个任务最多等待 8000ms(默认)
waitAll()等待所有 pending promise settle 或超时- promise 错误会被吞掉,不影响业务退出
不能省略
进程退出前必须调用 observer.gate.waitAll(),否则低流量下容易出现 trace 丢失。
Transport
Transport 是 SDK 的上报接口,支持替换。
内置实现:LangfuseDirectTransport
- 支持直连 Langfuse(publicKey + secretKey)
- 支持业务代理(动态 authHeaders 回调)
enabled()为false时,startRun()返回undefined
ObservationContext
ObservationContext 是按 session id 缓存的上下文 Map,用于延迟注入 userId / tags / metadata:
// 在 startRun 之前预先设置上下文
observer.context.set(sessionId, {
userId: "user-1",
tags: ["agentType:workflow_agent"],
metadata: { org_uuid: "org-1" },
})
// startRun 会自动合并 context 和显式参数
// 显式参数优先;metadata 则 context 铺底、显式覆盖
const run = observer.startRun({ id: sessionId, name: "workflow_agent" })Session Alias(子 session)
当一个根 run 下有多个子 session 时,通过 link() 把子 session 归并到父 run:
observer.link(childSessionId, rootSessionId)
// 之后 observer.getRun(childSessionId) 返回根 run