pipeline

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 16, 2026
License: Apache-2.0
Imports: 9
Imported by: 0

README

Pipeline

Package pipeline provides pipeline construction and execution: a linear chain of processors through which frames flow downstream (and optionally upstream), plus a runner that wires transport I/O to the pipeline.

Purpose

  • Pipeline: Holds processors in order; Add/Link build the chain; Push injects frames into the first processor; Setup/Cleanup manage lifecycle.
  • Runner: Connects a Transport (input/output channels) to a Pipeline; starts transport and pipeline, pushes a StartFrame, forwards transport input through a queue into the pipeline, and sends pipeline output (from the sink) to transport. Runs until context is cancelled.
  • Source/Sink: Source reads from a channel and pushes downstream; Sink forwards downstream frames to a channel (used as the pipeline tail for transport output).
  • Task: PipelineTask runs a pipeline with a frame queue; callers queue frames (e.g. StartFrame, audio, EndFrame); StopWhenDone queues EndFrame; Cancel queues CancelFrame and cancels context.
  • Registry: RegisterProcessor and ProcessorsFromConfig build processors by name from config (plugins).
  • ParallelPipeline: Multiple sub-pipelines (branches) receive the same input; lifecycle frames are synchronized across branches; optional OutputFilter for branch output.
  • PipelineProcessor: Wraps a Pipeline as a Processor for use inside another pipeline (e.g. a branch of ParallelPipeline).

Exported symbols

| Symbol | Type | Description |
| --- | --- | --- |
| Pipeline | struct | Linear chain of processors; Add, Link, Processors, Setup, Cleanup, Push, PushUpstream, Start, Cancel, AddFromConfig |
| Runner | struct | Runs a pipeline with a transport; NewRunner, Run, Done |
| Transport | interface | Input() <-chan Frame, Output() chan<- Frame, Start(ctx), Close() |
| Source | struct | Processor that reads from the In channel and pushes downstream; NewSource, Run |
| Sink | struct | Processor that forwards downstream frames to the Out channel; NewSink, ProcessFrame |
| PipelineSource | struct | Entry point; downstream → next, upstream → OnUpstream; NewPipelineSource |
| PipelineSinkCallback | struct | Exit point; downstream → OnDownstream, upstream → prev; NewPipelineSinkCallback |
| DownstreamCallback | type | func(ctx, f Frame) error |
| UpstreamCallback | type | func(ctx, f Frame) error |
| Task | interface | Name, Run, QueueFrame, QueueFrames, StopWhenDone, Cancel, HasFinished |
| TaskParams | struct | Optional parameters for running a task (reserved) |
| PipelineTask | struct | Task that runs a pipeline with a queue; NewPipelineTask; implements Task |
| ProcessorConstructor | type | func(name string, opts json.RawMessage) Processor |
| RegisterProcessor | func | Registers a constructor by name |
| ProcessorsFromConfig | func | Builds processors from cfg.Plugins and cfg.PluginOptions |
| ParallelPipeline | struct | Multi-branch pipeline; NewParallelPipeline, SetOutputFilter; implements Processor |
| OutputFilter | type | func(f Frame) bool; if it returns false, the frame is not forwarded from the branch |
| PipelineProcessor | struct | Wraps a Pipeline as a Processor; NewPipelineProcessor |
| inputQueueCap | const (unexported) | Default buffer size (256) between transport read and pipeline push; Runner.InputQueueCap overrides it when > 0 |

Data flow

flowchart LR
    Transport["Transport\nInput channel"] --> Queue["Queue\ncap 256"]
    Queue --> Worker["Worker goroutine\nPipeline.Push"]
    Worker --> Chain["Processor 1\n→ ... → N"]
    Chain --> Sink["Sink"]
    Sink --> Out["Transport\nOutput channel"]
  • Reader goroutine: Reads from Transport.Input() and sends each frame to a buffered queue (capacity inputQueueCap). This keeps transport reads from blocking on pipeline processing.
  • Worker goroutine: Reads from the queue and calls Pipeline.Push for each frame. Stops on an ErrorFrame (fatal), a CancelFrame, or when the context is done.
  • Sink: Forwards downstream frames to Transport.Output() in a separate goroutine so the pipeline does not block on a slow transport.

Concurrency

  • Pipeline: A sync.Mutex guards Add/Link/Processors and the stored startFrame; each frame is processed sequentially as it moves along the chain.
  • Runner: When Transport.Input() != nil, Run spawns two goroutines: one reads from the transport into a queue, and one drains the queue into Pipeline.Push. Run itself blocks on <-ctx.Done(). Done() returns a channel that is closed when Run returns.
  • PipelineTask: One goroutine in Run drains the queue into the pipeline; QueueFrame blocks if the drain is not reading. After Run returns, queue is closed and QueueFrame is a no-op.
  • Sink: Each downstream frame is sent to Out in a new goroutine to avoid blocking the pipeline on slow transport.
  • ParallelPipeline: Uses mutex for branch synchronization and buffering of lifecycle frames.

Files

| File | Description |
| --- | --- |
| pipeline.go | Pipeline, New, Add, Link, Processors, Setup, Cleanup, Push, PushUpstream, Start, Cancel |
| runner.go | Transport interface, Runner, NewRunner, Run, Done, inputQueueCap |
| source_sink.go | Source, Sink, PipelineSource, PipelineSinkCallback, callbacks |
| task.go | Task, TaskParams, PipelineTask, NewPipelineTask |
| registry.go | ProcessorConstructor, RegisterProcessor, ProcessorsFromConfig, Pipeline.AddFromConfig |
| parallel.go | ParallelPipeline, OutputFilter, NewParallelPipeline, SetOutputFilter |
| pipeline_processor.go | PipelineProcessor, NewPipelineProcessor |
| sync_parallel.go | Internal sync for ParallelPipeline lifecycle frames |
| service_switcher.go | Service switcher (uses ParallelPipeline and output filter) |
| task_observer.go | Task observer helpers |

Documentation

Overview

Package pipeline provides pipeline construction and execution. It also provides:

  • ParallelPipeline for concurrent frame processing.
  • PipelineProcessor to use a Pipeline as a Processor node.
  • A processor registry for dynamic loading by name from config.
  • ServiceSwitcher and LLMSwitcher for runtime service switching.
  • SyncParallelPipeline for synchronized parallel frame processing.
  • Task and PipelineTask for queue-based pipeline execution.
  • TaskObserver, a proxy observer that processes observer events asynchronously.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ProcessorsFromConfig

func ProcessorsFromConfig(cfg *config.Config) ([]processors.Processor, error)

ProcessorsFromConfig returns a slice of processors for the plugin names in cfg.Plugins. Unknown names return an error. Each constructor receives opts from cfg.PluginOptions[name] (may be nil).

func RegisterProcessor

func RegisterProcessor(name string, ctor ProcessorConstructor)

RegisterProcessor registers a processor constructor by name. Used for dynamic loading from config. The constructor receives opts from cfg.PluginOptions[name]; opts may be nil.

Types

type DownstreamCallback

type DownstreamCallback func(ctx context.Context, f frames.Frame) error

DownstreamCallback is called when a frame is emitted downstream (e.g. from a sink).

type LLMSwitcher

type LLMSwitcher struct {
	*ServiceSwitcher
}

LLMSwitcher is a ServiceSwitcher for LLM services. It uses the same mechanics; use NewServiceSwitcher with LLM processor instances and a strategy. This type exists for API parity with upstream.

func NewLLMSwitcher

func NewLLMSwitcher(services []struct {
	Name      string
	Processor processors.Processor
}, strategy ServiceSwitcherStrategy) (*LLMSwitcher, error)

NewLLMSwitcher builds an LLMSwitcher from named LLM processors and a strategy.

type OutputFilter

type OutputFilter func(f frames.Frame) bool

OutputFilter, if set, is called before a frame from a branch is forwarded. If it returns false, the frame is not forwarded.

type ParallelPipeline

type ParallelPipeline struct {
	*processors.BaseProcessor
	// contains filtered or unexported fields
}

ParallelPipeline processes frames through multiple sub-pipelines concurrently. Each branch receives the same input. Lifecycle frames (Start, End, Cancel) are synchronized: the frame is pushed to all branches and forwarded once when all have processed it. Non-lifecycle frames are pushed to all branches and forwarded once (deduplicated by frame ID).
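A minimal sketch of the fan-out and ID-based deduplication, assuming a hypothetical Branch function type in place of real sub-pipelines. It applies the output filter before deduplicating, so a frame rejected from one branch does not suppress the same ID from another. It does not model the synchronization of Start/End/Cancel frames, and its seen map grows without bound, which a real implementation would need to address.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Frame is a hypothetical stand-in carrying the ID used for deduplication.
type Frame struct {
	ID   uint64
	Data string
}

// Branch is a hypothetical branch body: it receives one frame and may emit any number.
type Branch func(f Frame, emit func(Frame))

// OutputFilter mirrors the documented type: returning false drops the frame.
type OutputFilter func(f Frame) bool

// Parallel fans each frame out to every branch and forwards each output ID at most once.
type Parallel struct {
	mu       sync.Mutex
	branches []Branch
	seen     map[uint64]bool
	filter   OutputFilter
	out      func(Frame)
}

func NewParallel(out func(Frame), branches ...Branch) *Parallel {
	return &Parallel{branches: branches, seen: map[uint64]bool{}, out: out}
}

func (p *Parallel) SetOutputFilter(f OutputFilter) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.filter = f
}

// forward filters first, then deduplicates; out runs under the lock, so it is serialized.
func (p *Parallel) forward(f Frame) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.filter != nil && !p.filter(f) {
		return
	}
	if p.seen[f.ID] {
		return
	}
	p.seen[f.ID] = true
	p.out(f)
}

// Push runs every branch on f in its own goroutine and waits for all of them.
func (p *Parallel) Push(f Frame) {
	var wg sync.WaitGroup
	for _, b := range p.branches {
		wg.Add(1)
		go func(b Branch) {
			defer wg.Done()
			b(f, p.forward)
		}(b)
	}
	wg.Wait()
}

func main() {
	var ids []uint64
	p := NewParallel(func(f Frame) { ids = append(ids, f.ID) },
		func(f Frame, emit func(Frame)) { emit(f) },
		func(f Frame, emit func(Frame)) { emit(f); emit(Frame{ID: f.ID + 100, Data: "extra"}) },
	)
	p.Push(Frame{ID: 1, Data: "hello"})
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	fmt.Println(ids) // [1 101]
}
```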

func NewParallelPipeline

func NewParallelPipeline(branches [][]processors.Processor) (*ParallelPipeline, error)

NewParallelPipeline builds a parallel pipeline from N branch definitions. Each branch is a slice of processors; each branch is wrapped with an internal source and sink so the parallel pipeline can inject input and collect output. At least one branch is required.

func (*ParallelPipeline) Branches

func (pp *ParallelPipeline) Branches() []*Pipeline

Branches returns the internal pipelines (read-only).

func (*ParallelPipeline) Cleanup

func (pp *ParallelPipeline) Cleanup(ctx context.Context) error

Cleanup calls Cleanup on all branch pipelines.

func (*ParallelPipeline) ProcessFrame

func (pp *ParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame implements Processor. Downstream frames are pushed to all branches; lifecycle frames are synchronized and forwarded once.

func (*ParallelPipeline) Push

func (pp *ParallelPipeline) Push(ctx context.Context, f frames.Frame) error

Push sends a frame to all branches (downstream). Used by callers (e.g. ServiceSwitcher) to inject frames.

func (*ParallelPipeline) SetOutputFilter

func (pp *ParallelPipeline) SetOutputFilter(f OutputFilter)

SetOutputFilter sets an optional filter for frames emitted from branches. Used by ServiceSwitcher.

func (*ParallelPipeline) Setup

func (pp *ParallelPipeline) Setup(ctx context.Context) error

Setup calls Setup on all branch pipelines.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline holds a linear chain of processors and orchestrates the flow of frames through them. It manages the lifecycle of processors (Setup/Cleanup) and provides methods to inject frames. THREAD SAFETY: mu guards processors and startFrame; Push/PushUpstream may be called from multiple goroutines (e.g. runner worker and nested pipelines).

func New

func New() *Pipeline

New returns a new Pipeline.

func (*Pipeline) Add

func (p *Pipeline) Add(proc processors.Processor)

Add appends a processor and links it to the previous one.

func (*Pipeline) AddFromConfig

func (p *Pipeline) AddFromConfig(cfg *config.Config) error

AddFromConfig appends processors to the pipeline from cfg.Plugins (by name). Processors must be registered first.

func (*Pipeline) Cancel

func (p *Pipeline) Cancel(ctx context.Context, reason any) error

Cancel pushes a CancelFrame into the pipeline.

func (*Pipeline) Cleanup

func (p *Pipeline) Cleanup(ctx context.Context)

Cleanup calls Cleanup on all processors (reverse order).

func (*Pipeline) Link

func (p *Pipeline) Link(procs ...processors.Processor)

Link adds multiple processors in order (same as repeated Add).

func (*Pipeline) Processors

func (p *Pipeline) Processors() []processors.Processor

Processors returns a copy of the processor list.

func (*Pipeline) Push

func (p *Pipeline) Push(ctx context.Context, f frames.Frame) error

Push injects a frame into the first processor (downstream).

func (*Pipeline) PushUpstream

func (p *Pipeline) PushUpstream(ctx context.Context, f frames.Frame) error

PushUpstream injects a frame into the last processor (upstream). Used when the pipeline is nested (e.g. inside ParallelPipeline).

func (*Pipeline) Setup

func (p *Pipeline) Setup(ctx context.Context) error

Setup calls Setup on all processors.

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context, start *frames.StartFrame) error

Start pushes a StartFrame and stores it for reference; call it once, before pushing any other frames.

type PipelineProcessor

type PipelineProcessor struct {
	*processors.BaseProcessor
	Pipeline *Pipeline
	// contains filtered or unexported fields
}

PipelineProcessor wraps a Pipeline so it can be used as a Processor (e.g. inside ParallelPipeline).

func NewPipelineProcessor

func NewPipelineProcessor(name string, pl *Pipeline) *PipelineProcessor

NewPipelineProcessor returns a Processor that delegates to the given pipeline. name is used for Name(); if empty, "Pipeline" is used.

func (*PipelineProcessor) Cleanup

func (pp *PipelineProcessor) Cleanup(ctx context.Context) error

Cleanup calls Cleanup on the wrapped pipeline.

func (*PipelineProcessor) Name

func (pp *PipelineProcessor) Name() string

Name returns the processor name.

func (*PipelineProcessor) ProcessFrame

func (pp *PipelineProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame forwards downstream frames into the pipeline via Push, upstream frames via PushUpstream.

func (*PipelineProcessor) Setup

func (pp *PipelineProcessor) Setup(ctx context.Context) error

Setup calls Setup on the wrapped pipeline.

type PipelineSinkCallback

type PipelineSinkCallback struct {
	*processors.BaseProcessor
	OnDownstream DownstreamCallback
	// contains filtered or unexported fields
}

PipelineSinkCallback is a processor that acts as the exit point of a pipeline; it forwards upstream frames to the previous processor and downstream frames to the provided callback.

func NewPipelineSinkCallback

func NewPipelineSinkCallback(name string, onDownstream DownstreamCallback) *PipelineSinkCallback

NewPipelineSinkCallback returns a sink that forwards downstream frames to the callback.

func (*PipelineSinkCallback) ProcessFrame

ProcessFrame implements Processor. Upstream goes to prev; downstream goes to OnDownstream.

type PipelineSource

type PipelineSource struct {
	*processors.BaseProcessor
	OnUpstream UpstreamCallback
	// contains filtered or unexported fields
}

PipelineSource is a processor that acts as the entry point of a pipeline; it forwards downstream frames to the next processor and upstream frames to the provided callback.

func NewPipelineSource

func NewPipelineSource(name string, onUpstream UpstreamCallback) *PipelineSource

NewPipelineSource returns a source that forwards upstream frames to the callback.

func (*PipelineSource) ProcessFrame

func (s *PipelineSource) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame implements Processor. Downstream goes to next; upstream goes to OnUpstream.

type PipelineTask

type PipelineTask struct {
	// contains filtered or unexported fields
}

PipelineTask runs a pipeline with a frame queue. Frames are queued and drained into the pipeline. StopWhenDone pushes an EndFrame and stops when the pipeline has drained; Cancel pushes a CancelFrame.

func NewPipelineTask

func NewPipelineTask(name string, pl *Pipeline) *PipelineTask

NewPipelineTask creates a task that runs the given pipeline. The queue is unbuffered by default; QueueFrame blocks until the drain goroutine can receive. Call Run in a goroutine and then QueueFrame from other goroutines.

func (*PipelineTask) Cancel

func (t *PipelineTask) Cancel(ctx context.Context) error

Cancel implements Task. It queues a CancelFrame and cancels the context.

func (*PipelineTask) HasFinished

func (t *PipelineTask) HasFinished() bool

HasFinished implements Task.

func (*PipelineTask) Name

func (t *PipelineTask) Name() string

Name implements Task.

func (*PipelineTask) QueueFrame

func (t *PipelineTask) QueueFrame(ctx context.Context, f frames.Frame) error

QueueFrame implements Task. It sends the frame to the queue; blocks if the drain is not reading. After Run has returned, QueueFrame is a no-op.

func (*PipelineTask) QueueFrames

func (t *PipelineTask) QueueFrames(ctx context.Context, frames []frames.Frame) error

QueueFrames implements Task.

func (*PipelineTask) Run

func (t *PipelineTask) Run(ctx context.Context, params TaskParams) error

Run implements Task. It starts a goroutine that drains the queue into the pipeline, then blocks until ctx is done. The pipeline is not started (no StartFrame) by Run; the caller should queue a StartFrame first if needed.

func (*PipelineTask) StopWhenDone

func (t *PipelineTask) StopWhenDone(ctx context.Context) error

StopWhenDone implements Task. It queues an EndFrame so the pipeline drains and then stops.

type ProcessorConstructor

type ProcessorConstructor func(name string, opts json.RawMessage) processors.Processor

ProcessorConstructor builds a processor from a name and optional JSON options (nil = use defaults).

type Runner

type Runner struct {
	Pipeline   *Pipeline
	Transport  Transport
	StartFrame *frames.StartFrame // optional; if nil, NewStartFrame() is used in Run
	// InputQueueCap overrides the default input queue capacity when > 0 (see inputQueueCapDefault).
	InputQueueCap int
	// contains filtered or unexported fields
}

Runner runs a pipeline with a transport. THREAD SAFETY: the unexported fields done and mu handle synchronization; Run is the only closer of done, and Done() may be called from any goroutine.

func NewRunner

func NewRunner(pl *Pipeline, tr Transport, start *frames.StartFrame) *Runner

NewRunner returns a Runner that will run the given pipeline with the transport. If start is non-nil, it is pushed as the StartFrame when Run starts; otherwise frames.NewStartFrame() is used.

func (*Runner) Done

func (r *Runner) Done() <-chan struct{}

Done returns a channel that is closed when Run returns.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run starts the pipeline and transport, feeds input frames into the pipeline, and sends pipeline output to the transport. It blocks until ctx is cancelled or a fatal error occurs. The caller can push frames into the pipeline from another goroutine; output is typically sent to Transport.Output() by the last processor (sink).

type ServiceSwitcher

type ServiceSwitcher struct {
	*ParallelPipeline
	// contains filtered or unexported fields
}

ServiceSwitcher is a ParallelPipeline that routes frames to one active service at a time. Each branch is [Filter(downstream), service, Filter(upstream)]; only the active branch passes frames.

func NewServiceSwitcher

func NewServiceSwitcher(services []struct {
	Name      string
	Processor processors.Processor
}, strategy ServiceSwitcherStrategy) (*ServiceSwitcher, error)

NewServiceSwitcher builds a service switcher from a list of named processors and a strategy.

func (*ServiceSwitcher) ProcessFrame

func (ss *ServiceSwitcher) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame implements Processor. Intercepts ManuallySwitchServiceFrame and requests metadata on switch.

type ServiceSwitcherStrategy

type ServiceSwitcherStrategy interface {
	ActiveServiceName() string
	// HandleFrame handles a service switcher frame (e.g. ManuallySwitchServiceFrame).
	// Returns true if the active service was changed.
	HandleFrame(ctx context.Context, f frames.Frame, dir processors.Direction) (switched bool)
}

ServiceSwitcherStrategy decides which service is active and handles switch frames.

type ServiceSwitcherStrategyManual

type ServiceSwitcherStrategyManual struct {
	// contains filtered or unexported fields
}

ServiceSwitcherStrategyManual keeps the first service active until a ManuallySwitchServiceFrame requests a switch.

func NewServiceSwitcherStrategyManual

func NewServiceSwitcherStrategyManual(serviceNames []string) *ServiceSwitcherStrategyManual

NewServiceSwitcherStrategyManual returns a strategy that starts with the first service and switches on ManuallySwitchServiceFrame.
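A manual strategy can be sketched as follows. The Service field of ManuallySwitchServiceFrame, the service names, and the passes helper are hypothetical, and HandleFrame here drops the ctx and dir parameters of the real ServiceSwitcherStrategy interface. Treating a request for the already-active service as "not switched" follows the interface comment ("Returns true if the active service was changed").

```go
package main

import (
	"fmt"
	"sync"
)

// ManuallySwitchServiceFrame is a hypothetical stand-in; the Service field is assumed.
type ManuallySwitchServiceFrame struct{ Service string }

// ManualStrategy starts with the first service and switches only on request.
type ManualStrategy struct {
	mu     sync.Mutex
	names  []string
	active string
}

func NewManualStrategy(names []string) *ManualStrategy {
	s := &ManualStrategy{names: append([]string(nil), names...)}
	if len(names) > 0 {
		s.active = names[0]
	}
	return s
}

func (s *ManualStrategy) ActiveServiceName() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.active
}

// HandleFrame reports whether the active service changed: unknown names, the
// already-active name, and non-switch frames all return false.
func (s *ManualStrategy) HandleFrame(f any) (switched bool) {
	req, ok := f.(ManuallySwitchServiceFrame)
	if !ok {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, n := range s.names {
		if n == req.Service && n != s.active {
			s.active = n
			return true
		}
	}
	return false
}

// passes is the kind of check an output filter can use: only the active branch passes.
func passes(s *ManualStrategy, branch string) bool { return branch == s.ActiveServiceName() }

func main() {
	s := NewManualStrategy([]string{"primary", "backup"})
	fmt.Println(s.ActiveServiceName(), passes(s, "backup"))                   // primary false
	fmt.Println(s.HandleFrame(ManuallySwitchServiceFrame{Service: "backup"})) // true
	fmt.Println(s.ActiveServiceName(), passes(s, "backup"))                   // backup true
}
```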

func (*ServiceSwitcherStrategyManual) ActiveServiceName

func (s *ServiceSwitcherStrategyManual) ActiveServiceName() string

ActiveServiceName returns the currently active service name.

func (*ServiceSwitcherStrategyManual) HandleFrame

func (s *ServiceSwitcherStrategyManual) HandleFrame(ctx context.Context, f frames.Frame, dir processors.Direction) (switched bool)

HandleFrame implements ServiceSwitcherStrategy. On ManuallySwitchServiceFrame, switches if the name is in the list.

type Sink

type Sink struct {
	*processors.BaseProcessor
	Out chan<- frames.Frame
	// contains filtered or unexported fields
}

Sink is a processor that forwards all frames to a channel (for transport output).

func NewSink

func NewSink(name string, ch chan<- frames.Frame) *Sink

NewSink returns a Sink that writes to ch.

func (*Sink) ProcessFrame

func (s *Sink) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame forwards the frame to Out and does not call next (end of chain).

type Source

type Source struct {
	*processors.BaseProcessor
	In <-chan frames.Frame
}

Source is a processor that reads frames from a channel and pushes them downstream.

func NewSource

func NewSource(name string, ch <-chan frames.Frame) *Source

NewSource returns a Source that reads from ch.

func (*Source) Run

func (s *Source) Run(ctx context.Context)

Run reads from In and pushes each frame to next until the context is done or the channel is closed.

type SyncParallelPipeline

type SyncParallelPipeline struct {
	*processors.BaseProcessor
	// contains filtered or unexported fields
}

SyncParallelPipeline processes frames through multiple parallel branches and synchronizes their output, so processing proceeds only after every branch has handled the current frame. After each frame it pushes a SyncFrame into every branch and waits until each branch has emitted that SyncFrame.

func NewSyncParallelPipeline

func NewSyncParallelPipeline(branches [][]processors.Processor) (*SyncParallelPipeline, error)

NewSyncParallelPipeline builds a synchronous parallel pipeline. Each branch is a slice of processors; each branch is wrapped with a sink that writes to an internal channel. At least one branch is required.

func (*SyncParallelPipeline) Branches

func (sp *SyncParallelPipeline) Branches() []*Pipeline

Branches returns the internal pipelines (read-only).

func (*SyncParallelPipeline) Cleanup

func (sp *SyncParallelPipeline) Cleanup(ctx context.Context) error

Cleanup calls Cleanup on all branch pipelines.

func (*SyncParallelPipeline) ProcessFrame

func (sp *SyncParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error

ProcessFrame implements Processor. It pushes the frame and then a SyncFrame to each branch, then waits until each branch has emitted SyncFrame (reading and collecting intermediate frames), then forwards collected frames with dedupe.
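The SyncFrame barrier can be sketched with one goroutine and a pair of channels per branch. Frame, its Sync flag, branch, and startBranch are hypothetical. A branch echoes the barrier only after emitting the outputs of earlier frames, which is what tells ProcessFrame that the branch is finished. The sketch assumes ProcessFrame is never called concurrently.

```go
package main

import "fmt"

// Frame is a hypothetical stand-in; Sync marks the barrier frame (SyncFrame).
type Frame struct {
	ID   int
	Sync bool
	Data string
}

// branch runs a body in its own goroutine; in and out model its source and sink.
type branch struct {
	in  chan Frame
	out chan Frame
}

// startBranch processes inputs in order and echoes a Sync frame only after
// everything emitted for earlier frames, which makes it a barrier.
func startBranch(body func(f Frame, emit func(Frame))) *branch {
	b := &branch{in: make(chan Frame, 2), out: make(chan Frame, 16)}
	go func() {
		defer close(b.out)
		for f := range b.in {
			if f.Sync {
				b.out <- f
				continue
			}
			body(f, func(o Frame) { b.out <- o })
		}
	}()
	return b
}

type SyncParallel struct{ branches []*branch }

// ProcessFrame pushes f and a barrier into every branch, waits for each branch's
// barrier, then forwards the collected frames deduplicated by ID.
func (sp *SyncParallel) ProcessFrame(f Frame, forward func(Frame)) {
	barrier := Frame{Sync: true}
	for _, b := range sp.branches {
		b.in <- f // in has capacity 2 and is empty here, so these sends do not block
		b.in <- barrier
	}
	var collected []Frame
	for _, b := range sp.branches {
		for o := range b.out {
			if o.Sync {
				break
			}
			collected = append(collected, o)
		}
	}
	seen := map[int]bool{}
	for _, o := range collected {
		if !seen[o.ID] {
			seen[o.ID] = true
			forward(o)
		}
	}
}

// Close stops the branch goroutines.
func (sp *SyncParallel) Close() {
	for _, b := range sp.branches {
		close(b.in)
	}
}

func main() {
	sp := &SyncParallel{branches: []*branch{
		startBranch(func(f Frame, emit func(Frame)) { emit(f) }),
		startBranch(func(f Frame, emit func(Frame)) {
			emit(f)
			emit(Frame{ID: f.ID * 10, Data: "derived"})
		}),
	}}
	defer sp.Close()
	// Prints "1 x" and then "10 derived".
	sp.ProcessFrame(Frame{ID: 1, Data: "x"}, func(f Frame) { fmt.Println(f.ID, f.Data) })
}
```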

func (*SyncParallelPipeline) Setup

func (sp *SyncParallelPipeline) Setup(ctx context.Context) error

Setup calls Setup on all branch pipelines.

type Task

type Task interface {
	// Name returns the task name.
	Name() string
	// Run runs the task until context is cancelled or the task finishes. It blocks.
	Run(ctx context.Context, params TaskParams) error
	// QueueFrame queues a single frame for processing.
	QueueFrame(ctx context.Context, f frames.Frame) error
	// QueueFrames queues multiple frames for processing.
	QueueFrames(ctx context.Context, frames []frames.Frame) error
	// StopWhenDone schedules the task to stop after processing all queued frames (e.g. pushes EndFrame).
	StopWhenDone(ctx context.Context) error
	// Cancel stops the task immediately.
	Cancel(ctx context.Context) error
	// HasFinished returns true when the task has reached a terminal state.
	HasFinished() bool
}

Task is the interface for pipeline task execution with queue, lifecycle, and cancellation.

type TaskObserver

type TaskObserver struct {
	// contains filtered or unexported fields
}

TaskObserver is a proxy observer that queues OnFrameProcessed events and processes them in a background goroutine so slow observers do not block the pipeline.

func NewTaskObserver

func NewTaskObserver(observersList []observers.Observer, queueSize int) *TaskObserver

NewTaskObserver creates a TaskObserver that forwards events to the given observers asynchronously. Queue size defaults to defaultTaskObserverQueueSize. Start must be called before use.

func (*TaskObserver) AddObserver

func (t *TaskObserver) AddObserver(o observers.Observer)

AddObserver adds an observer to the list. Safe to call before or after Start.

func (*TaskObserver) OnFrameProcessed

func (t *TaskObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)

OnFrameProcessed implements observers.Observer. It queues the event; if the queue is full, the event is dropped.
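The drop-when-full behavior can be sketched with a select that has a default case. AsyncObserver, Event, the dropped counter, and the fallback queue size of 64 are hypothetical (the real default is defaultTaskObserverQueueSize, whose value is not given here); atomic.Int64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Event is a hypothetical stand-in for one OnFrameProcessed call.
type Event struct {
	Processor string
	FrameID   int
}

// Observer is a simplified, hypothetical stand-in for observers.Observer.
type Observer interface{ OnFrameProcessed(e Event) }

// AsyncObserver queues events and delivers them from one background goroutine.
type AsyncObserver struct {
	queue     chan Event
	mu        sync.RWMutex
	observers []Observer
	wg        sync.WaitGroup
	dropped   atomic.Int64
}

func NewAsyncObserver(size int, obs ...Observer) *AsyncObserver {
	if size <= 0 {
		size = 64 // hypothetical fallback size
	}
	return &AsyncObserver{queue: make(chan Event, size), observers: obs}
}

func (a *AsyncObserver) AddObserver(o Observer) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.observers = append(a.observers, o)
}

// Start launches the goroutine that drains the queue and notifies every observer.
func (a *AsyncObserver) Start() {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for e := range a.queue {
			a.mu.RLock()
			obs := append([]Observer(nil), a.observers...)
			a.mu.RUnlock()
			for _, o := range obs {
				o.OnFrameProcessed(e)
			}
		}
	}()
}

// OnFrameProcessed never blocks: when the queue is full the event is dropped.
// It must not be called after Stop, because sending on a closed channel panics.
func (a *AsyncObserver) OnFrameProcessed(e Event) {
	select {
	case a.queue <- e:
	default:
		a.dropped.Add(1)
	}
}

// Stop closes the queue and waits until every queued event has been delivered.
func (a *AsyncObserver) Stop() {
	close(a.queue)
	a.wg.Wait()
}

type counter struct{ n int }

func (c *counter) OnFrameProcessed(Event) { c.n++ }

func main() {
	c := &counter{}
	a := NewAsyncObserver(2, c)
	for i := 0; i < 3; i++ { // not started yet, so the third event overflows
		a.OnFrameProcessed(Event{Processor: "tts", FrameID: i})
	}
	a.Start()
	a.Stop()
	fmt.Println(c.n, a.dropped.Load()) // 2 1
}
```

Dropping keeps a slow observer from applying back-pressure to the pipeline, at the cost of losing events under load; the counter makes that loss visible.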

func (*TaskObserver) Start

func (t *TaskObserver) Start()

Start starts the background goroutine that drains the queue and notifies all observers.

func (*TaskObserver) Stop

func (t *TaskObserver) Stop(ctx context.Context)

Stop closes the queue and waits for the drain goroutine to finish.

type TaskParams

type TaskParams struct{}

TaskParams holds optional parameters for running a task; it is currently empty and reserved for future use.

type Transport

type Transport interface {
	// Input returns a channel of frames from the client (or nil if not used).
	Input() <-chan frames.Frame
	// Output sends frames to the client.
	Output() chan<- frames.Frame
	// Start starts the transport; it blocks until the transport is ready or an error occurs.
	Start(ctx context.Context) error
	// Close closes the transport.
	Close() error
}

Transport is the minimal interface the Runner needs: input frames come from the transport, and output frames go to it.

type UpstreamCallback

type UpstreamCallback func(ctx context.Context, f frames.Frame) error

UpstreamCallback is called when a frame is emitted upstream (e.g. from a source).
