pipe

package
v0.4.0-alpha
Warning

This package is not in the latest version of its module.

Published: Mar 8, 2026 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package pipe provides the Processor interface and built-in processors for transforming Frame streams in a Pipeline.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline chains Processors into a concurrent processing graph. Each Processor runs in its own goroutine with backpressure-aware channels connecting the stages.

input → [Proc A] → [Proc B] → [Proc C] → output
         goroutine   goroutine   goroutine

Canceling the context tears down the entire pipeline. An error in any stage cancels all other stages.

func New

func New(procs ...Processor) *Pipeline

New creates a Pipeline from the given Processors. Processors are executed in order: the output of each feeds into the input of the next.

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context, in *niro.Stream) *niro.Stream

Run starts the pipeline. It reads Frames from the input Stream and returns a new Stream containing the output of the final Processor.

Each Processor runs in its own goroutine. The pipeline shuts down all stages when the context is canceled or any processor returns an error.

The returned Stream is safe to iterate immediately.

func (*Pipeline) WithBuffer

func (p *Pipeline) WithBuffer(size int) *Pipeline

WithBuffer sets the channel buffer size between pipeline stages. The default is 16.

  • 0: fully synchronous — minimum latency, maximum backpressure
  • 16: good default for streaming
  • 64+: high-throughput batch scenarios
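To make the trade-off concrete, the following sketch uses ordinary Go channels (not the niro types) to count how many values a producer can hand off before it has to wait for a consumer. The helper name `sendsBeforeBlocking` is hypothetical, and it is assumed here that the pipeline's inter-stage channels behave like standard buffered channels.

```go
package main

import "fmt"

// sendsBeforeBlocking reports how many values fit into a channel of the given
// buffer size when nothing is receiving from it.
func sendsBeforeBlocking(size int) int {
	ch := make(chan int, size)
	n := 0
	for {
		select {
		case ch <- n:
			n++
		default:
			// The send would block: the buffer is full (or there is no buffer).
			return n
		}
	}
}

func main() {
	for _, size := range []int{0, 16, 64} {
		fmt.Printf("buffer %d: %d sends before blocking\n", size, sendsBeforeBlocking(size))
	}
	// buffer 0: 0 sends before blocking
	// buffer 16: 16 sends before blocking
	// buffer 64: 64 sends before blocking
}
```

With a buffer of 0 every send waits for the next stage to receive, which is why that setting gives the strongest backpressure.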

type Processor

type Processor interface {
	Process(ctx context.Context, in *niro.Stream, out *niro.Emitter) error
}

Processor transforms Frames from an input Stream to an output Emitter. It is the fundamental unit of composition in a Niro pipeline.

Contracts:

  • Process must not close the Emitter — the Pipeline manages that.
  • Process should return when ctx is canceled or the input stream ends.
  • Errors returned from Process are propagated to the output stream.
  • Process runs in its own goroutine when used in a Pipeline.

func Accumulate

func Accumulate() Processor

Accumulate creates a Processor that collects all text into a single frame emitted at the end of the stream. Useful for converting a token stream into a complete response.
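The behavior can be pictured with the following sketch, which joins a stream of text fragments into one frame. It assumes a hypothetical `Frame` type with a `Text` field and uses a channel in place of niro.Stream; how the real Accumulate treats non-text frames is not specified here.

```go
package main

import (
	"fmt"
	"strings"
)

// Frame is a hypothetical stand-in for niro.Frame.
type Frame struct{ Text string }

// accumulate drains in and returns a single frame holding all text in order.
func accumulate(in <-chan Frame) Frame {
	var sb strings.Builder
	for f := range in {
		sb.WriteString(f.Text)
	}
	return Frame{Text: sb.String()}
}

func main() {
	in := make(chan Frame, 3)
	for _, tok := range []string{"Hel", "lo, ", "world"} {
		in <- Frame{Text: tok}
	}
	close(in)
	fmt.Println(accumulate(in).Text) // Hello, world
}
```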

func Filter

func Filter(fn func(niro.Frame) bool) Processor

Filter creates a Processor that only forwards frames matching the predicate.

func Map

func Map(fn func(niro.Frame) niro.Frame) Processor

Map creates a Processor that transforms each frame through fn.

func PassThrough

func PassThrough() Processor

PassThrough creates a Processor that forwards all frames unchanged.

func Tap

func Tap(fn func(niro.Frame)) Processor

Tap creates a Processor that calls fn for each frame as a side effect without modifying the stream. Useful for logging, metrics, or debugging.
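A simple way to picture this is a stage that observes each frame and then forwards it untouched. The sketch below counts frames and bytes as a side effect; `Frame` and `tap` are hypothetical stand-ins built on plain channels, not the package's own types.

```go
package main

import "fmt"

// Frame is a hypothetical stand-in for niro.Frame.
type Frame struct{ Text string }

// tap calls fn for every frame and forwards the frame unchanged.
func tap(in <-chan Frame, fn func(Frame)) <-chan Frame {
	out := make(chan Frame)
	go func() {
		defer close(out)
		for f := range in {
			fn(f)
			out <- f
		}
	}()
	return out
}

func main() {
	in := make(chan Frame, 3)
	for _, t := range []string{"to", "ken", "s"} {
		in <- Frame{Text: t}
	}
	close(in)

	frames, chars := 0, 0
	out := tap(in, func(f Frame) { frames++; chars += len(f.Text) })

	text := ""
	for f := range out {
		text += f.Text
	}
	// The counters are read only after out is closed, so there is no data race.
	fmt.Println(text, frames, chars) // tokens 3 6
}
```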

func TextOnly

func TextOnly() Processor

TextOnly creates a Processor that only forwards KindText frames.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, in *niro.Stream, out *niro.Emitter) error

ProcessorFunc adapts a plain function to the Processor interface.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(ctx context.Context, in *niro.Stream, out *niro.Emitter) error

Process calls f(ctx, in, out).
