LangfuseDirectTransport
内置的 Langfuse transport 实现,支持直连和代理两种鉴权模式。
构造函数
ts
new LangfuseDirectTransport(config: LangfuseDirectConfig)LangfuseDirectConfig
ts
type LangfuseDirectConfig = {
host: string // Langfuse base URL,为空时 transport disabled
publicKey?: string // Langfuse public key(直连模式)
secretKey?: string // Langfuse secret key(直连模式)
authHeaders?: (sessionId: string) => Record<string, string> // 代理模式动态 headers
environment?: string // 环境名,默认 "default"
release?: string // 写入 langfuse.release attribute
version?: string // 写入 langfuse.version attribute
log?: LogFn // SDK 日志回调
fetch?: typeof fetch // 测试或特殊运行时注入 fetch
}鉴权模式
直连模式(publicKey + secretKey)
ts
new LangfuseDirectTransport({
host: "https://langfuse.example.com",
publicKey: "pk-xxx",
secretKey: "sk-xxx",
environment: "prod",
})使用 HTTP Basic 认证:Authorization: Basic base64(publicKey:secretKey)
代理模式(authHeaders 回调)
ts
new LangfuseDirectTransport({
host: "https://power.example.com/langfuse",
environment: "prod",
authHeaders: (sessionId) => ({
Authorization: `Bearer ${resolveToken(sessionId)}`,
}),
})authHeaders 在每次 OTEL export 前调用,确保 session token 过期后能自动刷新。
上报端点
| 类型 | URL |
|---|---|
| trace(OTEL) | ${host}/api/public/otel/v1/traces |
| health check | ${host}/api/public/health |
| score | ${host}/api/public/scores |
实现细节
startRun()首次调用时懒初始化 Langfuse SDK- 初始化后异步调用 health check;失败只打 warn,不阻塞主链路
- trace id 由
sha256(run.id).slice(0, 32)稳定生成 - 使用
LangfuseSpanProcessor+OTLPTraceExporter上报 trace score()直接 HTTP POST/api/public/scores,不走 OTEL exporter
Transport 接口
若需要自定义 transport,实现以下接口:
ts
type Transport = {
enabled(): boolean
environment(): string
startRun(input: RunInput): TransportRun | undefined
score(input: {
traceId: string
scores: ScoreSpec[]
sessionId: string
agent: string
environment: string
}): Promise<void>
flush(): Promise<void>
}约束:
enabled()为false时,startRun()不应返回有效 runenvironment()必须返回当前环境名,如test、prodflush()必须尽量推出 pending trace/score,但失败不能影响主链路
禁用
host 为空字符串时,transport 自动禁用:
ts
new LangfuseDirectTransport({ host: "" })
// transport.enabled() === false
// observer.startRun() 返回 undefined