sessionpipeline

package v0.7.1

This package is not in the latest version of its module.
Published: May 3, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package sessionpipeline defines the shape convention and runner for streaming session transforms in the ox session-stop pipeline.

Why this package exists now (with one consumer)

Today there is one streaming transform (pkg/tokenopt). Normally we would wait for a second consumer before extracting an interface. We introduced the shape now — deliberately — because:

  1. The planned pipeline has concrete additional stages with known shapes: a REDACT.md-aware LLM redactor (streaming, buffers internally to batch), and eventually a streamified pattern redactor. Pre-baking the Stage interface means the second implementation adopts it on arrival, not as a retrofit across two packages.

  2. The planned pipeline reorders stages: the realistic sequence is raw → pattern-redact → compress → llm-redact → summarize. Hardcoded function-call sequencing would resist that reordering; a slice of Stage makes it a data change.

  3. Stage composition via io.Pipe is cheap enough (~100 lines) that writing it now and feeding the one current transform through it exercises the seam. If the seam is wrong, we find out today with one consumer, not next quarter with three.

What we deliberately did NOT do

The existing in-memory redactor (internal/session.Redactor) is NOT rewritten to streaming. It works, it's fast, and ROI on the rewrite comes only when a second redactor ships. When the LLM redactor lands, build it streaming from day 1 — that's the second data point that justifies retroactively streaming the first. Until then, the redactor keeps its slice-based API and runs before the pipeline (Phase 2), writing the canonical raw.jsonl that the pipeline reads from.

Shape convention

Every streaming session transform should satisfy the Stage interface. Transforms that need cross-entry state (LLM redaction batching entries, cross-session dedup) buffer internally but still present the streaming interface at the boundary. That buffering is their concern, not the pipeline's.

Telemetry

Stage stats types should implement slog.LogValuer. The runner emits a per-stage log via slog.Info with the stage name and stats. A single telemetry event per pipeline run is emitted at the orchestrator level; this package stays out of the telemetry pipeline.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Stage

type Stage interface {
	// Name identifies the stage in logs and errors. Short, no spaces.
	// Conventionally the package name or a qualified variant.
	Name() string

	// Apply runs the transform. Must drain r (or honor ctx cancellation)
	// and flush w before returning. Returns implementation-specific Stats
	// that should implement slog.LogValuer for telemetry integration.
	Apply(ctx context.Context, r io.Reader, w io.Writer) (Stats, error)
}

Stage is a streaming session transform. Implementations read a jsonl stream from r, apply their transform, and write a jsonl stream to w.

Streaming is the canonical shape even for transforms that need cross-entry state: buffer internally, present an io.Reader at the boundary. That keeps this package ignorant of per-transform concerns.

type StageFunc

type StageFunc struct {
	StageName string
	Fn        func(ctx context.Context, r io.Reader, w io.Writer) (Stats, error)
}

StageFunc adapts a plain func into a Stage. Convenient when the transform logic lives as a package-level function.

func (StageFunc) Apply

func (s StageFunc) Apply(ctx context.Context, r io.Reader, w io.Writer) (Stats, error)

func (StageFunc) Name

func (s StageFunc) Name() string

type StageResult

type StageResult struct {
	Name  string
	Stats Stats
	Err   error
}

StageResult records one stage's outcome in a pipeline run.

func Run

func Run(ctx context.Context, src io.Reader, dst io.Writer, stages []Stage) ([]StageResult, error)

Run composes stages in order, feeding each stage's output into the next stage's input via io.Pipe. The first stage reads from src; the last stage writes to dst.

On any stage error, all running goroutines are torn down and the error is returned with per-stage results captured so far. Stages that completed successfully before the failure still appear in results.

Zero stages: src is copied to dst verbatim and nil results are returned. This lets callers treat an empty pipeline as a pass-through without a special case at the call site.

type Stats

type Stats interface {
	slog.LogValuer
}

Stats is what a Stage returns for logging / telemetry. Implementations should produce a compact structured representation via slog.LogValuer.
