Documentation
¶
Overview ¶
Package tap 提供对 LLM 响应流的拦截和解析能力,零缓冲地统计 token 用量。
Index ¶
- func BuildAnthropicSSE(inputTokens, outputTokens int, textChunks []string) string
- func ParseNonStreaming(body []byte) (inputTokens, outputTokens int, err error)
- type AnthropicSSEParser
- type OnCompleteFunc
- type TeeResponseWriter
- func (tw *TeeResponseWriter) Flush()
- func (tw *TeeResponseWriter) RecordNonStreaming(body []byte, statusCode int, durationMs int64)
- func (tw *TeeResponseWriter) StatusCode() int
- func (tw *TeeResponseWriter) UpdateModel(model string)
- func (tw *TeeResponseWriter) Write(p []byte) (int, error)
- func (tw *TeeResponseWriter) WriteHeader(statusCode int)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildAnthropicSSE ¶
BuildAnthropicSSE 构建一个标准的 Anthropic SSE 响应序列(测试用)。
func ParseNonStreaming ¶
ParseNonStreaming 解析 Anthropic 普通(非 streaming)JSON 响应体, 返回 inputTokens 和 outputTokens。 body 应为完整的响应 JSON,如:
{"id":"msg_1","type":"message","usage":{"input_tokens":100,"output_tokens":50},...}
Types ¶
type AnthropicSSEParser ¶
type AnthropicSSEParser struct {
// contains filtered or unexported fields
}
AnthropicSSEParser 是一个流式 SSE 解析器,从任意大小的字节块中提取 Anthropic 流式响应的 token 用量,在 message_stop 事件触发回调。
设计原则:
- 线程不安全(由调用方保证单线程 Feed)
- 不分配大 buffer,每次 Feed 只在内部维护行缓冲
- 回调仅触发一次(message_stop)
func NewAnthropicSSEParser ¶
func NewAnthropicSSEParser(cb OnCompleteFunc) *AnthropicSSEParser
NewAnthropicSSEParser 创建解析器,注册完成回调。 cb 在解析到 message_stop 事件后调用一次。
func (*AnthropicSSEParser) Feed ¶
func (p *AnthropicSSEParser) Feed(chunk []byte)
Feed 向解析器输入一段字节(可以是任意大小的 chunk,包括跨行的片段)。 调用者不应在 Feed 之后修改 p。
func (*AnthropicSSEParser) InputTokens ¶
func (p *AnthropicSSEParser) InputTokens() int
InputTokens 返回已解析的输入 token 数量。
func (*AnthropicSSEParser) OutputTokens ¶
func (p *AnthropicSSEParser) OutputTokens() int
OutputTokens 返回已解析的输出 token 数量。
type OnCompleteFunc ¶
type OnCompleteFunc func(inputTokens, outputTokens int)
OnCompleteFunc 是 token 统计完成后的回调函数类型。
type TeeResponseWriter ¶
type TeeResponseWriter struct {
http.ResponseWriter // 原始 writer(透传 Header() 等方法)
// contains filtered or unexported fields
}
TeeResponseWriter 包装 http.ResponseWriter,在不缓冲的情况下同时:
- 将字节原样转发给原始 Writer(客户端)
- 将字节 Feed 给 AnthropicSSEParser 解析 token 用量
当 streaming 响应结束(检测到 message_stop)时,异步调用 UsageWriter.Record()。 对于非 streaming 响应,调用方需手动调用 RecordNonStreaming()。
func NewTeeResponseWriter ¶
func NewTeeResponseWriter( w http.ResponseWriter, logger *zap.Logger, usageWriter *db.UsageWriter, record db.UsageRecord, ) *TeeResponseWriter
NewTeeResponseWriter 创建 TeeResponseWriter。 record 应预填充 RequestID、UserID、Model、UpstreamURL、SourceNode 等字段; InputTokens/OutputTokens/StatusCode/DurationMs 将由 Tee 在流结束时填写。
func (*TeeResponseWriter) Flush ¶
func (tw *TeeResponseWriter) Flush()
Flush 透传 Flush 调用(SSE 流必须立即刷新)。 实现 http.Flusher 接口。
func (*TeeResponseWriter) RecordNonStreaming ¶
func (tw *TeeResponseWriter) RecordNonStreaming(body []byte, statusCode int, durationMs int64)
RecordNonStreaming 用于非 streaming 响应:解析完整 body,记录 usage。 调用方(sproxy.go 的 ModifyResponse 钩子)在读取完 body 后调用此方法。
func (*TeeResponseWriter) StatusCode ¶
func (tw *TeeResponseWriter) StatusCode() int
StatusCode 返回记录的 HTTP 状态码(WriteHeader 调用后有效)。
func (*TeeResponseWriter) UpdateModel ¶
func (tw *TeeResponseWriter) UpdateModel(model string)
UpdateModel 补充 usage record 中的模型字段(在 Director 之后获取 body 时调用)。
func (*TeeResponseWriter) Write ¶
func (tw *TeeResponseWriter) Write(p []byte) (int, error)
Write 同时写入原始 writer 并 Feed 给 SSE 解析器。 实现 http.ResponseWriter.Write。
func (*TeeResponseWriter) WriteHeader ¶
func (tw *TeeResponseWriter) WriteHeader(statusCode int)
WriteHeader 记录状态码并透传。