AgentObserver
SDK 主入口,负责管理 run 生命周期和 session alias。
构造函数
ts
new AgentObserver(config: ObserverConfig)ObserverConfig
ts
type ObserverConfig = {
transport: Transport
}示例:
ts
const observer = new AgentObserver({
transport: new LangfuseDirectTransport({
host: "https://langfuse.example.com",
publicKey: process.env.LANGFUSE_PUBLIC_KEY!,
secretKey: process.env.LANGFUSE_SECRET_KEY!,
environment: "prod",
}),
})属性
observer.gate
类型:FlushGate
用于等待所有异步上报完成。进程退出前必须调用 await observer.gate.waitAll()。
observer.context
类型:ObservationContext
按 session id 缓存上下文(userId / tags / metadata),供 startRun() 合并使用。
方法
startRun(input: RunInput): AgentRun | undefined
开始一次 Agent 运行,返回 AgentRun 实例。
若 transport 已禁用(transport.enabled() 返回 false),返回 undefined。
ts
const run = observer.startRun({
id: "session-1",
name: "my_agent",
input: "用户输入",
userId: "user-123",
tags: ["agentType:my_agent"],
metadata: { org_uuid: "org-456" },
})RunInput
ts
type RunInput = {
id: string // 稳定唯一的 run 标识,对应 Langfuse trace id 来源
name: string // agent 类型或名称
input?: unknown // 用户输入
userId?: string // 用户 ID
tags?: string[] // 标签列表
metadata?: Record<string, unknown> // 业务元数据
}Context 合并规则
startRun() 会把 observer.context.get(id) 和显式 RunInput 合并:
userId、tags:显式传入优先,否则用 contextmetadata:context 铺底,显式 metadata 覆盖
endRun(id: string, output?: unknown): Promise<void>
结束指定 run,触发 transport flush,并将 flush Promise 注册到 FlushGate。
ts
await observer.endRun("session-1", "最终输出")getRun(id: string): AgentRun | undefined
获取正在运行的 run 实例。支持 session alias(通过 link() 注册的子 id)。
ts
const run = observer.getRun("session-1")link(childId: string, rootId: string): void
将子 session 归并到父 run。之后对 getRun(childId) 的访问会返回父 run。
ts
observer.link("child-session-1", "root-session-1")用于多级 Agent 场景:
ts
// 子 session 创建时
observer.link(childSessionId, rootSessionId)
// 之后对子 session 的操作都落到父 run
const run = observer.getRun(childSessionId) // 返回根 runWARNING
一个 child id 只能指向一个 root id。