ingest

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 19, 2026 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Overview

Package ingest provides an HTTP server that accepts completed LLM conversation turns for storage in the Merkle DAG. This enables "sidecar mode" where an external gateway (e.g., Envoy AI Gateway) handles upstream LLM traffic and tapes only stores, embeds, and publishes the data.


Constants

This section is empty.

Variables

var (
	// ErrEnvelope means the POST body could not be decoded as a TurnPayload.
	// Returned as 400 Bad Request.
	ErrEnvelope = errors.New("invalid envelope")

	// ErrUnprocessable covers validation / parse failures inside a well-formed
	// envelope: unknown provider, unparseable provider-specific request /
	// response body, etc. Returned as 422 Unprocessable Entity.
	ErrUnprocessable = errors.New("unprocessable turn")

	// ErrDownstream covers failures that originate below the handler: worker
	// pool saturation, DAG write errors, storage unavailability. Returned as
	// 502 Bad Gateway.
	ErrDownstream = errors.New("downstream failure")
)

Ingest error classes. Each maps to a distinct HTTP status so operators can tell malformed envelopes from unknown providers from downstream outages without tailing logs.

Functions

This section is empty.

Types

type BatchPayload

type BatchPayload struct {
	Turns []TurnPayload `json:"turns"`
}

BatchPayload is the ingest request body for multiple conversation turns.

type BatchResult

type BatchResult struct {
	Accepted int      `json:"accepted"`
	Rejected int      `json:"rejected"`
	Errors   []string `json:"errors,omitempty"`
}

BatchResult reports the outcome of a batch ingest.

type Config

type Config struct {
	// ListenAddr is the address to listen on (e.g., ":8082")
	ListenAddr string

	// VectorDriver is an optional vector store for storing embeddings.
	// If nil, vector storage is disabled.
	VectorDriver vector.Driver

	// Embedder is an optional embedder for generating embeddings.
	// Required if VectorDriver is set.
	Embedder embeddings.Embedder

	// Publisher is an optional event publisher for new DAG nodes.
	// If nil, publishing is disabled.
	Publisher publisher.Publisher

	// Project is the git repository or project name to tag on stored nodes.
	Project string
}

Config is the ingest server configuration.

type Metrics added in v0.5.2

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics enumerates the Prometheus counters and histograms emitted by the ingest server. Metric names are fixed so dashboards and alerts reference stable identifiers.

func NewMetrics added in v0.5.2

func NewMetrics() *Metrics

NewMetrics builds a fresh registry and registers the ingest metric set on it. Each Server owns its own registry so tests don't leak counters across suite runs (the default Prometheus registry is global state).

func (*Metrics) Handler added in v0.5.2

func (m *Metrics) Handler() http.Handler

Handler returns an http.Handler that serves the Prometheus scrape endpoint backed by this Metrics' registry.

func (*Metrics) ObserveDAGLatency added in v0.5.2

func (m *Metrics) ObserveDAGLatency(provider string, seconds float64)

ObserveDAGLatency records how long it took to enqueue a turn into the worker pool. Enqueue is nominally O(1), but its latency is still graphed as a cheap proxy for back-pressure: a slow enqueue hints at queue saturation.

func (*Metrics) ObserveWrite added in v0.5.2

func (m *Metrics) ObserveWrite(provider string, result Result, bodyBytes int)

ObserveWrite increments the writes counter for a given provider/result. A zero-length provider label becomes "unknown" so scrapes don't drop rows.
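The label normalization can be illustrated without Prometheus by keying a plain map on (provider, result). This is a stdlib stand-in for the real counter vector, not the package's implementation, and it ignores bodyBytes because the docs do not say how that argument is recorded.

```go
package main

import (
	"fmt"
	"sync"
)

type Result string

const (
	ResultAccepted  Result = "accepted"
	ResultRejectEnv Result = "reject_envelope"
)

// writeKey is one label combination on the writes counter.
type writeKey struct {
	Provider string
	Result   Result
}

// writeCounter is a hypothetical stdlib stand-in for the Prometheus counter
// behind ObserveWrite.
type writeCounter struct {
	mu     sync.Mutex
	counts map[writeKey]int
}

func newWriteCounter() *writeCounter {
	return &writeCounter{counts: make(map[writeKey]int)}
}

// observe increments the (provider, result) row, mapping an empty provider
// to "unknown" as ObserveWrite is documented to do.
func (c *writeCounter) observe(provider string, result Result) {
	if provider == "" {
		provider = "unknown"
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[writeKey{provider, result}]++
}

// get returns the current count for one row.
func (c *writeCounter) get(provider string, result Result) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[writeKey{provider, result}]
}

func main() {
	c := newWriteCounter()
	c.observe("openai", ResultAccepted)
	c.observe("", ResultRejectEnv) // e.g. an envelope too broken to name a provider
	fmt.Println(c.get("unknown", ResultRejectEnv)) // 1
}
```

Rejected envelopes are the typical source of an empty provider, so normalizing to "unknown" keeps them visible as a row instead of an empty label value.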

func (*Metrics) Registry added in v0.5.2

func (m *Metrics) Registry() *prometheus.Registry

Registry exposes the backing *prometheus.Registry so callers can mount a scrape handler or assert on the metric state in tests.

func (*Metrics) SetQueueDepth added in v0.5.2

func (m *Metrics) SetQueueDepth(depth int)

SetQueueDepth updates the worker queue depth gauge.

type NodeListResponse added in v0.5.2

type NodeListResponse struct {
	Count int           `json:"count"`
	Nodes []NodeSummary `json:"nodes"`
}

NodeListResponse wraps a slice of NodeSummary in a count+items envelope so the shape is consistent with the rest of the tapes query surface.

type NodeSummary added in v0.5.2

type NodeSummary struct {
	Hash      string `json:"hash"`
	Role      string `json:"role"`
	Provider  string `json:"provider"`
	Model     string `json:"model"`
	AgentName string `json:"agent_name,omitempty"`
}

NodeSummary is the per-item shape returned by GET /v1/ingest/nodes. It is intentionally minimal: operators and canaries only need to confirm that a row landed for a given agent, and rich querying is served by the tapes-api.
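A canary can fetch GET /v1/ingest/nodes, decode the count+items envelope, and look for its agent name. In the sketch, httptest serves a canned NodeListResponse in place of a real ingest server. The filtering happens client-side because this page documents no query parameters for the endpoint, and a 200 status on success is assumed.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Local mirrors of the documented response types.
type NodeSummary struct {
	Hash      string `json:"hash"`
	Role      string `json:"role"`
	Provider  string `json:"provider"`
	Model     string `json:"model"`
	AgentName string `json:"agent_name,omitempty"`
}

type NodeListResponse struct {
	Count int           `json:"count"`
	Nodes []NodeSummary `json:"nodes"`
}

// hasAgentNode reports whether any node listed at baseURL carries agent.
func hasAgentNode(client *http.Client, baseURL, agent string) (bool, error) {
	resp, err := client.Get(baseURL + "/v1/ingest/nodes")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK { // assumed success status
		return false, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	var list NodeListResponse
	if err := json.NewDecoder(resp.Body).Decode(&list); err != nil {
		return false, err
	}
	for _, n := range list.Nodes {
		if n.AgentName == agent {
			return true, nil
		}
	}
	return false, nil
}

// fakeServer returns canned data; the hash and model are made up.
func fakeServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/v1/ingest/nodes", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(NodeListResponse{
			Count: 1,
			Nodes: []NodeSummary{{Hash: "abc123", Role: "assistant",
				Provider: "openai", Model: "example-model", AgentName: "canary"}},
		})
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := fakeServer()
	defer srv.Close()
	ok, err := hasAgentNode(srv.Client(), srv.URL, "canary")
	fmt.Println(ok, err)
}
```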

type Result added in v0.5.2

type Result string

Result enumerates the status-label values emitted on the writes counter. Closed enumeration keeps dashboards safe against label typos.

const (
	ResultAccepted      Result = "accepted"
	ResultRejectEnv     Result = "reject_envelope"
	ResultRejectParse   Result = "reject_parse"
	ResultUnknownProv   Result = "unknown_provider"
	ResultQueueFull     Result = "queue_full"
	ResultDownstreamErr Result = "downstream_error"
)
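Because Result is a named string type with a fixed set of constants, a caller can reject stray label values before they reach a metric. The Valid method below is a hypothetical helper; the package page does not list one.

```go
package main

import "fmt"

type Result string

// The six documented label values.
const (
	ResultAccepted      Result = "accepted"
	ResultRejectEnv     Result = "reject_envelope"
	ResultRejectParse   Result = "reject_parse"
	ResultUnknownProv   Result = "unknown_provider"
	ResultQueueFull     Result = "queue_full"
	ResultDownstreamErr Result = "downstream_error"
)

// Valid reports whether r belongs to the closed enumeration.
func (r Result) Valid() bool {
	switch r {
	case ResultAccepted, ResultRejectEnv, ResultRejectParse,
		ResultUnknownProv, ResultQueueFull, ResultDownstreamErr:
		return true
	}
	return false
}

func main() {
	fmt.Println(ResultQueueFull.Valid(), Result("queue-full").Valid()) // true false
}
```

Code that passes the typed constants never needs the check; it matters only where a Result is built from an arbitrary string.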

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an HTTP server that accepts completed LLM conversation turns for async storage in the Merkle DAG.

func New

func New(config Config, driver storage.Driver, log *slog.Logger) (*Server, error)

New creates a new ingest Server.

func (*Server) Close

func (s *Server) Close() error

Close gracefully shuts down the server and waits for the worker pool to drain.

func (*Server) Metrics added in v0.5.2

func (s *Server) Metrics() *Metrics

Metrics exposes the ingest metrics so tests and health checks can scrape the registry programmatically.

func (*Server) Run

func (s *Server) Run() error

Run starts the ingest server on the configured address.

func (*Server) RunWithListener

func (s *Server) RunWithListener(listener net.Listener) error

RunWithListener starts the ingest server using the provided listener.

type TurnPayload

type TurnPayload struct {
	// Provider type: "openai", "anthropic", "ollama"
	Provider string `json:"provider"`

	// AgentName optionally tags the turn (same as X-Tapes-Agent-Name header)
	AgentName string `json:"agent_name,omitempty"`

	// RawRequest is the original request body sent to the LLM provider.
	RawRequest json.RawMessage `json:"request"`

	// Response is the already reduced, provider-agnostic response for the turn.
	Response llm.ChatResponse `json:"response"`
}

TurnPayload is the ingest request body for a single completed conversation turn. It carries the raw provider request plus an already-reduced response. Capture adapters such as tapes-extproc own protocol-specific stream reduction; ingest owns request parsing, validation, and durable storage.
