tap

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package tap 提供对 LLM 响应流的拦截和解析能力,零缓冲地统计 token 用量。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildAnthropicSSE

func BuildAnthropicSSE(inputTokens, outputTokens int, textChunks []string) string

BuildAnthropicSSE 构建一个标准的 Anthropic SSE 响应序列(测试用)。

func ParseNonStreaming

func ParseNonStreaming(body []byte) (inputTokens, outputTokens int, err error)

ParseNonStreaming 解析 Anthropic 普通(非 streaming)JSON 响应体, 返回 inputTokens 和 outputTokens。 body 应为完整的响应 JSON,如:

{"id":"msg_1","type":"message","usage":{"input_tokens":100,"output_tokens":50},...}

Types

type AnthropicSSEParser

type AnthropicSSEParser struct {
	// contains filtered or unexported fields
}

AnthropicSSEParser 是一个流式 SSE 解析器,从任意大小的字节块中提取 Anthropic 流式响应的 token 用量,在 message_stop 事件触发回调。

设计原则:

  • 线程不安全(由调用方保证单线程 Feed)
  • 不分配大 buffer,每次 Feed 只在内部维护行缓冲
  • 回调仅触发一次(message_stop)

func NewAnthropicSSEParser

func NewAnthropicSSEParser(cb OnCompleteFunc) *AnthropicSSEParser

NewAnthropicSSEParser 创建解析器,注册完成回调。 cb 在解析到 message_stop 事件后调用一次。

func (*AnthropicSSEParser) Feed

func (p *AnthropicSSEParser) Feed(chunk []byte)

Feed 向解析器输入一段字节(可以是任意大小的 chunk,包括跨行的片段)。 调用者不应在 Feed 之后修改 p。

func (*AnthropicSSEParser) InputTokens

func (p *AnthropicSSEParser) InputTokens() int

InputTokens 返回已解析的输入 token 数量。

func (*AnthropicSSEParser) OutputTokens

func (p *AnthropicSSEParser) OutputTokens() int

OutputTokens 返回已解析的输出 token 数量。

type OnCompleteFunc

type OnCompleteFunc func(inputTokens, outputTokens int)

OnCompleteFunc 是 token 统计完成后的回调函数类型。

type TeeResponseWriter

type TeeResponseWriter struct {
	http.ResponseWriter // 原始 writer(透传 Header() 等方法)
	// contains filtered or unexported fields
}

TeeResponseWriter 包装 http.ResponseWriter,在不缓冲的情况下同时:

  1. 将字节原样转发给原始 Writer(客户端)
  2. 将字节 Feed 给 AnthropicSSEParser 解析 token 用量

当 streaming 响应结束(检测到 message_stop)时,异步调用 UsageWriter.Record()。 对于非 streaming 响应,调用方需手动调用 RecordNonStreaming()。

func NewTeeResponseWriter

func NewTeeResponseWriter(
	w http.ResponseWriter,
	logger *zap.Logger,
	usageWriter *db.UsageWriter,
	record db.UsageRecord,
) *TeeResponseWriter

NewTeeResponseWriter 创建 TeeResponseWriter。 record 应预填充 RequestID、UserID、Model、UpstreamURL、SourceNode 等字段; InputTokens/OutputTokens/StatusCode/DurationMs 将由 Tee 在流结束时填写。

func (*TeeResponseWriter) Flush

func (tw *TeeResponseWriter) Flush()

Flush 透传 Flush 调用(SSE 流必须立即刷新)。 实现 http.Flusher 接口。

func (*TeeResponseWriter) RecordNonStreaming

func (tw *TeeResponseWriter) RecordNonStreaming(body []byte, statusCode int, durationMs int64)

RecordNonStreaming 用于非 streaming 响应:解析完整 body,记录 usage。 调用方(sproxy.go 的 ModifyResponse 钩子)在读取完 body 后调用此方法。

func (*TeeResponseWriter) StatusCode

func (tw *TeeResponseWriter) StatusCode() int

StatusCode 返回记录的 HTTP 状态码(WriteHeader 调用后有效)。

func (*TeeResponseWriter) UpdateModel

func (tw *TeeResponseWriter) UpdateModel(model string)

UpdateModel 补充 usage record 中的模型字段(在 Director 之后获取 body 时调用)。

func (*TeeResponseWriter) Write

func (tw *TeeResponseWriter) Write(p []byte) (int, error)

Write 同时写入原始 writer 并 Feed 给 SSE 解析器。 实现 http.ResponseWriter.Write。

func (*TeeResponseWriter) WriteHeader

func (tw *TeeResponseWriter) WriteHeader(statusCode int)

WriteHeader 记录状态码并透传。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL