Documentation ¶
Overview ¶
Package ingest provides an HTTP server that accepts completed LLM conversation turns for storage in the Merkle DAG. This enables "sidecar mode" where an external gateway (e.g., Envoy AI Gateway) handles upstream LLM traffic and tapes only stores, embeds, and publishes the data.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrEnvelope means the POST body could not be decoded as a TurnPayload.
    // Returned as 400 Bad Request.
    ErrEnvelope = errors.New("invalid envelope")

    // ErrUnprocessable covers validation / parse failures inside a well-formed
    // envelope: unknown provider, unparseable provider-specific request /
    // response body, etc. Returned as 422 Unprocessable Entity.
    ErrUnprocessable = errors.New("unprocessable turn")

    // ErrDownstream covers failures that originate below the handler: worker
    // pool saturation, DAG write errors, storage unavailability. Returned as
    // 502 Bad Gateway.
    ErrDownstream = errors.New("downstream failure")
)
Ingest error classes. Each maps to a distinct HTTP status so operators can tell malformed envelopes from unknown providers from downstream outages without tailing logs.
Functions ¶
This section is empty.
Types ¶
type BatchPayload ¶
type BatchPayload struct {
    Turns []TurnPayload `json:"turns"`
}
BatchPayload is the ingest request body for multiple conversation turns.
type BatchResult ¶
type BatchResult struct {
    Accepted int      `json:"accepted"`
    Rejected int      `json:"rejected"`
    Errors   []string `json:"errors,omitempty"`
}
BatchResult reports the outcome of a batch ingest.
type Config ¶
type Config struct {
    // ListenAddr is the address to listen on (e.g., ":8082")
    ListenAddr string

    // VectorDriver is an optional vector store for storing embeddings.
    // If nil, vector storage is disabled.
    VectorDriver vector.Driver

    // Embedder is an optional embedder for generating embeddings.
    // Required if VectorDriver is set.
    Embedder embeddings.Embedder

    // Publisher is an optional event publisher for new DAG nodes.
    // If nil, publishing is disabled.
    Publisher publisher.Publisher

    // Project is the git repository or project name to tag on stored nodes.
    Project string
}
Config is the ingest server configuration.
type Metrics ¶ added in v0.5.2
type Metrics struct {
    // contains filtered or unexported fields
}
Metrics enumerates the Prometheus counters and histograms emitted by the ingest server. Metric names are fixed so dashboards and alerts reference stable identifiers.
func NewMetrics ¶ added in v0.5.2
func NewMetrics() *Metrics
NewMetrics builds a fresh registry and registers the ingest metric set on it. Each Server owns its own registry so tests don't leak counters across suite runs (the default prometheus registry is global state).
func (*Metrics) Handler ¶ added in v0.5.2
Handler returns an http.Handler that serves the Prometheus scrape endpoint backed by this Metrics' registry.
func (*Metrics) ObserveDAGLatency ¶ added in v0.5.2
ObserveDAGLatency records how long it took to enqueue a turn into the worker pool. Enqueue is nominally O(1), so a slow enqueue hints at queue saturation; the latency is graphed as a cheap proxy for back-pressure.
func (*Metrics) ObserveWrite ¶ added in v0.5.2
ObserveWrite increments the writes counter for a given provider/result. A zero-length provider label becomes "unknown" so scrapes don't drop rows.
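The label normalization described above can be sketched with an in-memory counter. Everything here is illustrative: `providerLabel`, `writeKey`, and `writes` are hypothetical names, the map stands in for a Prometheus counter vector, and the result value `"ok"` is a placeholder rather than one of the package's Result values.

```go
package main

import "fmt"

// writeKey identifies one series of the stand-in writes counter.
type writeKey struct {
	provider string
	result   string
}

// providerLabel is a hypothetical helper that replaces an empty provider
// with "unknown" so the series always carries a non-empty label.
func providerLabel(provider string) string {
	if provider == "" {
		return "unknown"
	}
	return provider
}

// writes is an in-memory stand-in for a labeled counter.
type writes map[writeKey]int

// observe increments the series for the normalized provider and result.
func (w writes) observe(provider, result string) {
	w[writeKey{providerLabel(provider), result}]++
}

func main() {
	w := writes{}
	w.observe("openai", "ok")
	w.observe("", "ok") // counted under provider "unknown"
	fmt.Println(w[writeKey{"unknown", "ok"}]) // prints 1
}
```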
func (*Metrics) Registry ¶ added in v0.5.2
func (m *Metrics) Registry() *prometheus.Registry
Registry exposes the backing *prometheus.Registry so callers can mount a scrape handler or assert on the metric state in tests.
func (*Metrics) SetQueueDepth ¶ added in v0.5.2
SetQueueDepth updates the worker queue depth gauge.
type NodeListResponse ¶ added in v0.5.2
type NodeListResponse struct {
    Count int           `json:"count"`
    Nodes []NodeSummary `json:"nodes"`
}
NodeListResponse wraps a slice of NodeSummary in a count+items envelope so the shape is consistent with the rest of the tapes query surface.
type NodeSummary ¶ added in v0.5.2
type NodeSummary struct {
    Hash      string `json:"hash"`
    Role      string `json:"role"`
    Provider  string `json:"provider"`
    Model     string `json:"model"`
    AgentName string `json:"agent_name,omitempty"`
}
NodeSummary is the per-item shape returned by GET /v1/ingest/nodes. It is intentionally minimal: operators and canaries only need to confirm that a row landed for a given agent; rich querying is served by the tapes-api.
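A canary-style check against this response shape might look like the sketch below. The two structs are local mirrors of the documented types, `hasAgent` is a hypothetical helper, and the sample body uses made-up values; no HTTP call is made.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented response types.
type NodeSummary struct {
	Hash      string `json:"hash"`
	Role      string `json:"role"`
	Provider  string `json:"provider"`
	Model     string `json:"model"`
	AgentName string `json:"agent_name,omitempty"`
}

type NodeListResponse struct {
	Count int           `json:"count"`
	Nodes []NodeSummary `json:"nodes"`
}

// hasAgent is a hypothetical helper: it decodes a node-list response body
// and reports whether any node is tagged with the given agent name.
func hasAgent(body []byte, agent string) (bool, error) {
	var resp NodeListResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return false, fmt.Errorf("decode node list: %w", err)
	}
	for _, n := range resp.Nodes {
		if n.AgentName == agent {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Sample body with placeholder values, not real server output.
	body := []byte(`{"count":1,"nodes":[{"hash":"abc123","role":"assistant",` +
		`"provider":"openai","model":"example-model","agent_name":"canary"}]}`)
	ok, err := hasAgent(body, "canary")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ok) // prints true
}
```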
type Result ¶ added in v0.5.2
type Result string
Result enumerates the status-label values emitted on the writes counter. Keeping the enumeration closed protects dashboards against label typos.
type Server ¶
type Server struct {
    // contains filtered or unexported fields
}
Server is an HTTP server that accepts completed LLM conversation turns for async storage in the Merkle DAG.
func (*Server) Close ¶
Close gracefully shuts down the server and waits for the worker pool to drain.
func (*Server) Metrics ¶ added in v0.5.2
Metrics exposes the ingest metrics so tests and health checks can scrape the registry programmatically.
type TurnPayload ¶
type TurnPayload struct {
    // Provider type: "openai", "anthropic", "ollama"
    Provider string `json:"provider"`

    // AgentName optionally tags the turn (same as X-Tapes-Agent-Name header)
    AgentName string `json:"agent_name,omitempty"`

    // RawRequest is the original request body sent to the LLM provider.
    RawRequest json.RawMessage `json:"request"`

    // Response is the already reduced, provider-agnostic response for the turn.
    Response llm.ChatResponse `json:"response"`
}
TurnPayload is the ingest request body for a single completed conversation turn. It carries the raw provider request plus an already-reduced response. Capture adapters such as tapes-extproc own protocol-specific stream reduction; ingest owns request parsing, validation, and durable storage.