Documentation ¶
Overview ¶
Package pipeline provides pipeline construction and execution. It also provides:
- ParallelPipeline for concurrent frame processing.
- PipelineProcessor to use a Pipeline as a Processor node.
- A processor registry for dynamic loading by name from config.
- ServiceSwitcher and LLMSwitcher for runtime service switching.
- SyncParallelPipeline for synchronized parallel frame processing.
- Task and PipelineTask for queue-based pipeline execution.
- TaskObserver, a proxy observer that processes observer events asynchronously.
Index ¶
- func ProcessorsFromConfig(cfg *config.Config) ([]processors.Processor, error)
- func RegisterProcessor(name string, ctor ProcessorConstructor)
- type DownstreamCallback
- type LLMSwitcher
- type OutputFilter
- type ParallelPipeline
- func (pp *ParallelPipeline) Branches() []*Pipeline
- func (pp *ParallelPipeline) Cleanup(ctx context.Context) error
- func (pp *ParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
- func (pp *ParallelPipeline) Push(ctx context.Context, f frames.Frame) error
- func (pp *ParallelPipeline) SetOutputFilter(f OutputFilter)
- func (pp *ParallelPipeline) Setup(ctx context.Context) error
- type Pipeline
- func (p *Pipeline) Add(proc processors.Processor)
- func (p *Pipeline) AddFromConfig(cfg *config.Config) error
- func (p *Pipeline) Cancel(ctx context.Context, reason any) error
- func (p *Pipeline) Cleanup(ctx context.Context)
- func (p *Pipeline) Link(procs ...processors.Processor)
- func (p *Pipeline) Processors() []processors.Processor
- func (p *Pipeline) Push(ctx context.Context, f frames.Frame) error
- func (p *Pipeline) PushUpstream(ctx context.Context, f frames.Frame) error
- func (p *Pipeline) Setup(ctx context.Context) error
- func (p *Pipeline) Start(ctx context.Context, start *frames.StartFrame) error
- type PipelineProcessor
- type PipelineSinkCallback
- type PipelineSource
- type PipelineTask
- func (t *PipelineTask) Cancel(ctx context.Context) error
- func (t *PipelineTask) HasFinished() bool
- func (t *PipelineTask) Name() string
- func (t *PipelineTask) QueueFrame(ctx context.Context, f frames.Frame) error
- func (t *PipelineTask) QueueFrames(ctx context.Context, frames []frames.Frame) error
- func (t *PipelineTask) Run(ctx context.Context, params TaskParams) error
- func (t *PipelineTask) StopWhenDone(ctx context.Context) error
- type ProcessorConstructor
- type Runner
- type ServiceSwitcher
- type ServiceSwitcherStrategy
- type ServiceSwitcherStrategyManual
- type Sink
- type Source
- type SyncParallelPipeline
- func (sp *SyncParallelPipeline) Branches() []*Pipeline
- func (sp *SyncParallelPipeline) Cleanup(ctx context.Context) error
- func (sp *SyncParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
- func (sp *SyncParallelPipeline) Setup(ctx context.Context) error
- type Task
- type TaskObserver
- type TaskParams
- type Transport
- type UpstreamCallback
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ProcessorsFromConfig ¶
func ProcessorsFromConfig(cfg *config.Config) ([]processors.Processor, error)
ProcessorsFromConfig returns a slice of processors for the plugin names in cfg.Plugins. Unknown names return an error. Each constructor receives opts from cfg.PluginOptions[name] (may be nil).
func RegisterProcessor ¶
func RegisterProcessor(name string, ctor ProcessorConstructor)
RegisterProcessor registers a processor constructor by name. Used for dynamic loading from config. The constructor receives opts from cfg.PluginOptions[name]; opts may be nil.
Types ¶
type DownstreamCallback ¶
DownstreamCallback is called when a frame is emitted downstream (e.g. from a sink).
type LLMSwitcher ¶
type LLMSwitcher struct {
*ServiceSwitcher
}
LLMSwitcher is a ServiceSwitcher for LLM services. It uses the same mechanics; use NewServiceSwitcher with LLM processor instances and a strategy. This type exists for API parity with upstream.
func NewLLMSwitcher ¶
func NewLLMSwitcher(services []struct {
Name string
Processor processors.Processor
}, strategy ServiceSwitcherStrategy) (*LLMSwitcher, error)
NewLLMSwitcher builds an LLMSwitcher from named LLM processors and a strategy.
type OutputFilter ¶
OutputFilter, if set, is called before forwarding a frame from a branch. If it returns false, the frame is not forwarded.
type ParallelPipeline ¶
type ParallelPipeline struct {
*processors.BaseProcessor
// contains filtered or unexported fields
}
ParallelPipeline processes frames through multiple sub-pipelines concurrently. Each branch receives the same input. Lifecycle frames (Start, End, Cancel) are synchronized: the frame is pushed to all branches and forwarded once when all have processed it. Non-lifecycle frames are pushed to all branches and forwarded once (deduplicated by frame ID).
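To illustrate the fan-out and deduplication described above, here is a rough sketch under stated assumptions. Frame, Branch, and FanOut are hypothetical stand-ins (the real types live in the frames and processors packages), and the optional keep argument only imitates the role the documentation gives to OutputFilter.

```go
package main

import (
	"fmt"
	"sync"
)

// Frame is a hypothetical stand-in for frames.Frame; only the ID matters here.
type Frame struct {
	ID   uint64
	Text string
}

// Branch is a stand-in for one sub-pipeline: it receives a frame and emits
// zero or more frames.
type Branch func(Frame) []Frame

// FanOut sends f to every branch concurrently, waits for all of them, and
// returns the emitted frames in branch order. A frame is kept only if keep
// (when non-nil) returns true and its ID has not been forwarded already.
func FanOut(f Frame, branches []Branch, keep func(Frame) bool) []Frame {
	results := make([][]Frame, len(branches))
	var wg sync.WaitGroup
	for i, b := range branches {
		wg.Add(1)
		go func(i int, b Branch) {
			defer wg.Done()
			results[i] = b(f) // each goroutine writes its own slot
		}(i, b)
	}
	wg.Wait()

	seen := map[uint64]bool{}
	var out []Frame
	for _, rs := range results {
		for _, r := range rs {
			if keep != nil && !keep(r) {
				continue
			}
			if !seen[r.ID] {
				seen[r.ID] = true
				out = append(out, r)
			}
		}
	}
	return out
}

func main() {
	pass := func(f Frame) []Frame { return []Frame{f} }
	out := FanOut(Frame{ID: 1, Text: "hi"}, []Branch{pass, pass, pass}, nil)
	fmt.Println(len(out)) // 1: three copies of frame 1 collapse into one
}
```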
func NewParallelPipeline ¶
func NewParallelPipeline(branches [][]processors.Processor) (*ParallelPipeline, error)
NewParallelPipeline builds a parallel pipeline from N branch definitions. Each branch is a slice of processors; each branch is wrapped with an internal source and sink so the parallel pipeline can inject input and collect output. At least one branch is required.
func (*ParallelPipeline) Branches ¶
func (pp *ParallelPipeline) Branches() []*Pipeline
Branches returns the internal pipelines (read-only).
func (*ParallelPipeline) Cleanup ¶
func (pp *ParallelPipeline) Cleanup(ctx context.Context) error
Cleanup calls Cleanup on all branch pipelines.
func (*ParallelPipeline) ProcessFrame ¶
func (pp *ParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame implements Processor. Downstream frames are pushed to all branches; lifecycle frames are synchronized and forwarded once.
func (*ParallelPipeline) Push ¶
func (pp *ParallelPipeline) Push(ctx context.Context, f frames.Frame) error
Push sends a frame to all branches (downstream). Used by callers (e.g. ServiceSwitcher) to inject frames.
func (*ParallelPipeline) SetOutputFilter ¶
func (pp *ParallelPipeline) SetOutputFilter(f OutputFilter)
SetOutputFilter sets an optional filter for frames emitted from branches. Used by ServiceSwitcher.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline holds a linear chain of processors and orchestrates the flow of frames through them. It manages the lifecycle of processors (Setup/Cleanup) and provides methods to inject frames. THREAD SAFETY: mu guards processors and startFrame; Push/PushUpstream may be called from multiple goroutines (e.g. runner worker and nested pipelines).
func (*Pipeline) Add ¶
func (p *Pipeline) Add(proc processors.Processor)
Add appends a processor and links it to the previous one.
func (*Pipeline) AddFromConfig ¶
func (p *Pipeline) AddFromConfig(cfg *config.Config) error
AddFromConfig appends processors to the pipeline from cfg.Plugins (by name). Processors must be registered first.
func (*Pipeline) Link ¶
func (p *Pipeline) Link(procs ...processors.Processor)
Link adds multiple processors in order (same as repeated Add).
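The linking behaviour of Add and Link can be sketched with a singly linked chain. This is an illustration only: Node, Chain, and their methods are hypothetical stand-ins with string frames, not the package's Pipeline.

```go
package main

import "fmt"

// Node is a hypothetical stand-in for a processor in a linear chain.
type Node struct {
	Name string
	Log  *[]string // shared log so the visit order can be observed
	next *Node
}

// Process records the visit and hands the frame to the next node, if any.
func (n *Node) Process(frame string) {
	*n.Log = append(*n.Log, n.Name+":"+frame)
	if n.next != nil {
		n.next.Process(frame)
	}
}

// Chain holds nodes in insertion order.
type Chain struct{ nodes []*Node }

// Add appends n and links the previous tail to it.
func (c *Chain) Add(n *Node) {
	if len(c.nodes) > 0 {
		c.nodes[len(c.nodes)-1].next = n
	}
	c.nodes = append(c.nodes, n)
}

// Link is equivalent to calling Add for each node in order.
func (c *Chain) Link(ns ...*Node) {
	for _, n := range ns {
		c.Add(n)
	}
}

// Push injects a frame at the head of the chain.
func (c *Chain) Push(frame string) {
	if len(c.nodes) > 0 {
		c.nodes[0].Process(frame)
	}
}

func main() {
	var log []string
	c := &Chain{}
	c.Link(&Node{Name: "a", Log: &log}, &Node{Name: "b", Log: &log})
	c.Push("f1")
	fmt.Println(log) // [a:f1 b:f1]
}
```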
func (*Pipeline) Processors ¶
func (p *Pipeline) Processors() []processors.Processor
Processors returns a copy of the processor list.
func (*Pipeline) PushUpstream ¶
func (p *Pipeline) PushUpstream(ctx context.Context, f frames.Frame) error
PushUpstream injects a frame into the last processor (upstream). Used when the pipeline is nested (e.g. inside ParallelPipeline).
type PipelineProcessor ¶
type PipelineProcessor struct {
*processors.BaseProcessor
Pipeline *Pipeline
// contains filtered or unexported fields
}
PipelineProcessor wraps a Pipeline so it can be used as a Processor (e.g. inside ParallelPipeline).
func NewPipelineProcessor ¶
func NewPipelineProcessor(name string, pl *Pipeline) *PipelineProcessor
NewPipelineProcessor returns a Processor that delegates to the given pipeline. name is used for Name(); if empty, "Pipeline" is used.
func (*PipelineProcessor) Cleanup ¶
func (pp *PipelineProcessor) Cleanup(ctx context.Context) error
Cleanup calls Cleanup on the wrapped pipeline.
func (*PipelineProcessor) Name ¶
func (pp *PipelineProcessor) Name() string
Name returns the processor name.
func (*PipelineProcessor) ProcessFrame ¶
func (pp *PipelineProcessor) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame forwards downstream frames into the pipeline via Push, upstream frames via PushUpstream.
type PipelineSinkCallback ¶
type PipelineSinkCallback struct {
*processors.BaseProcessor
OnDownstream DownstreamCallback
// contains filtered or unexported fields
}
PipelineSinkCallback is a processor that acts as the exit point of a pipeline; it forwards upstream frames to the previous processor and downstream frames to the provided callback.
func NewPipelineSinkCallback ¶
func NewPipelineSinkCallback(name string, onDownstream DownstreamCallback) *PipelineSinkCallback
NewPipelineSinkCallback returns a sink that forwards downstream frames to the callback.
func (*PipelineSinkCallback) ProcessFrame ¶
func (s *PipelineSinkCallback) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame implements Processor. Upstream goes to prev; downstream goes to OnDownstream.
type PipelineSource ¶
type PipelineSource struct {
*processors.BaseProcessor
OnUpstream UpstreamCallback
// contains filtered or unexported fields
}
PipelineSource is a processor that acts as the entry point of a pipeline; it forwards downstream frames to the next processor and upstream frames to the provided callback.
func NewPipelineSource ¶
func NewPipelineSource(name string, onUpstream UpstreamCallback) *PipelineSource
NewPipelineSource returns a source that forwards upstream frames to the callback.
func (*PipelineSource) ProcessFrame ¶
func (s *PipelineSource) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame implements Processor. Downstream goes to next; upstream goes to OnUpstream.
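The routing rules for PipelineSource and PipelineSinkCallback mirror each other, which a small sketch may make easier to see. Everything below is a hypothetical stand-in: Direction, Stage, Source, and Sink use string frames and plain function callbacks rather than the package's types.

```go
package main

import "fmt"

// Direction is a stand-in for processors.Direction.
type Direction int

const (
	Downstream Direction = iota
	Upstream
)

// Stage is a minimal stand-in for a processor.
type Stage interface {
	Process(frame string, dir Direction)
}

// Source is the entry point: downstream frames go to Next, upstream frames
// leave the pipeline through OnUpstream.
type Source struct {
	Next       Stage
	OnUpstream func(frame string)
}

func (s *Source) Process(frame string, dir Direction) {
	if dir == Upstream {
		if s.OnUpstream != nil {
			s.OnUpstream(frame)
		}
		return
	}
	if s.Next != nil {
		s.Next.Process(frame, dir)
	}
}

// Sink is the exit point: upstream frames go to Prev, downstream frames
// leave the pipeline through OnDownstream.
type Sink struct {
	Prev         Stage
	OnDownstream func(frame string)
}

func (s *Sink) Process(frame string, dir Direction) {
	if dir == Downstream {
		if s.OnDownstream != nil {
			s.OnDownstream(frame)
		}
		return
	}
	if s.Prev != nil {
		s.Prev.Process(frame, dir)
	}
}

func main() {
	var up, down []string
	src := &Source{OnUpstream: func(f string) { up = append(up, f) }}
	snk := &Sink{Prev: src, OnDownstream: func(f string) { down = append(down, f) }}
	src.Next = snk

	src.Process("audio", Downstream) // enters at the source, exits via the sink callback
	snk.Process("error", Upstream)   // enters at the sink, exits via the source callback
	fmt.Println(down, up)            // [audio] [error]
}
```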
type PipelineTask ¶
type PipelineTask struct {
// contains filtered or unexported fields
}
PipelineTask runs a pipeline with a frame queue. Frames are queued and drained into the pipeline. StopWhenDone pushes an EndFrame and stops when the pipeline has drained; Cancel pushes a CancelFrame.
func NewPipelineTask ¶
func NewPipelineTask(name string, pl *Pipeline) *PipelineTask
NewPipelineTask creates a task that runs the given pipeline. The queue is unbuffered by default; QueueFrame blocks until the drain goroutine can receive. Call Run in a goroutine and then QueueFrame from other goroutines.
func (*PipelineTask) Cancel ¶
func (t *PipelineTask) Cancel(ctx context.Context) error
Cancel implements Task. It queues a CancelFrame and cancels the context.
func (*PipelineTask) HasFinished ¶
func (t *PipelineTask) HasFinished() bool
HasFinished implements Task.
func (*PipelineTask) QueueFrame ¶
func (t *PipelineTask) QueueFrame(ctx context.Context, f frames.Frame) error
QueueFrame implements Task. It sends the frame to the queue; blocks if the drain is not reading. After Run has returned, QueueFrame is a no-op.
func (*PipelineTask) QueueFrames ¶
func (t *PipelineTask) QueueFrames(ctx context.Context, frames []frames.Frame) error
QueueFrames implements Task.
func (*PipelineTask) Run ¶
func (t *PipelineTask) Run(ctx context.Context, params TaskParams) error
Run implements Task. It starts a goroutine that drains the queue into the pipeline, then blocks until ctx is done. The pipeline is not started (no StartFrame) by Run; the caller should queue a StartFrame first if needed.
func (*PipelineTask) StopWhenDone ¶
func (t *PipelineTask) StopWhenDone(ctx context.Context) error
StopWhenDone implements Task. It queues an EndFrame so the pipeline drains and then stops.
type ProcessorConstructor ¶
type ProcessorConstructor func(name string, opts json.RawMessage) processors.Processor
ProcessorConstructor builds a processor from a name and optional JSON options (nil = use defaults).
type Runner ¶
type Runner struct {
Pipeline *Pipeline
Transport Transport
StartFrame *frames.StartFrame // optional; if nil, NewStartFrame() is used in Run
// InputQueueCap overrides the default input queue capacity when > 0 (see inputQueueCapDefault).
InputQueueCap int
// contains filtered or unexported fields
}
Runner builds a pipeline from processors and runs it with a transport. THREAD SAFETY: done and mu; Run is the only closer of done; Done() may be called from any goroutine.
func NewRunner ¶
func NewRunner(pl *Pipeline, tr Transport, start *frames.StartFrame) *Runner
NewRunner returns a Runner that will run the given pipeline with the transport. If start is non-nil, it is pushed as the StartFrame when Run starts; otherwise frames.NewStartFrame() is used.
func (*Runner) Done ¶
func (r *Runner) Done() <-chan struct{}
Done returns a channel that is closed when Run returns.
func (*Runner) Run ¶
Run starts the pipeline and transport, feeds input frames into the pipeline, and sends pipeline output to transport. It blocks until ctx is cancelled or a fatal error occurs. Caller can push frames into the pipeline from another goroutine; output is typically sent to Transport.Output() by the last processor (sink).
type ServiceSwitcher ¶
type ServiceSwitcher struct {
*ParallelPipeline
// contains filtered or unexported fields
}
ServiceSwitcher is a ParallelPipeline that routes frames to one active service at a time. Each branch is [Filter(downstream), service, Filter(upstream)]; only the active branch passes frames.
func NewServiceSwitcher ¶
func NewServiceSwitcher(services []struct {
Name string
Processor processors.Processor
}, strategy ServiceSwitcherStrategy) (*ServiceSwitcher, error)
NewServiceSwitcher builds a service switcher from a list of named processors and a strategy.
func (*ServiceSwitcher) ProcessFrame ¶
func (ss *ServiceSwitcher) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame implements Processor. Intercepts ManuallySwitchServiceFrame and requests metadata on switch.
type ServiceSwitcherStrategy ¶
type ServiceSwitcherStrategy interface {
ActiveServiceName() string
// HandleFrame handles a service switcher frame (e.g. ManuallySwitchServiceFrame).
// Returns true if the active service was changed.
HandleFrame(ctx context.Context, f frames.Frame, dir processors.Direction) (switched bool)
}
ServiceSwitcherStrategy decides which service is active and handles switch frames.
type ServiceSwitcherStrategyManual ¶
type ServiceSwitcherStrategyManual struct {
// contains filtered or unexported fields
}
ServiceSwitcherStrategyManual keeps the first service active until a ManuallySwitchServiceFrame requests a switch.
func NewServiceSwitcherStrategyManual ¶
func NewServiceSwitcherStrategyManual(serviceNames []string) *ServiceSwitcherStrategyManual
NewServiceSwitcherStrategyManual returns a strategy that starts with the first service and switches on ManuallySwitchServiceFrame.
func (*ServiceSwitcherStrategyManual) ActiveServiceName ¶
func (s *ServiceSwitcherStrategyManual) ActiveServiceName() string
ActiveServiceName returns the currently active service name.
func (*ServiceSwitcherStrategyManual) HandleFrame ¶
func (s *ServiceSwitcherStrategyManual) HandleFrame(ctx context.Context, f frames.Frame, dir processors.Direction) (switched bool)
HandleFrame implements ServiceSwitcherStrategy. On ManuallySwitchServiceFrame, switches if the name is in the list.
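A manual strategy of this kind can be sketched in a few lines. The code below is illustrative only: SwitchRequest stands in for ManuallySwitchServiceFrame, ManualStrategy for ServiceSwitcherStrategyManual, and HandleFrame drops the ctx and direction parameters. Returning false when the requested service is already active is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// SwitchRequest is a hypothetical stand-in for ManuallySwitchServiceFrame.
type SwitchRequest struct{ Service string }

// ManualStrategy keeps the first service active until a valid switch is requested.
type ManualStrategy struct {
	mu     sync.Mutex
	names  []string
	active string
}

func NewManualStrategy(names []string) *ManualStrategy {
	s := &ManualStrategy{names: append([]string(nil), names...)}
	if len(names) > 0 {
		s.active = names[0]
	}
	return s
}

// ActiveServiceName returns the currently active service name.
func (s *ManualStrategy) ActiveServiceName() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.active
}

// HandleFrame switches when frame is a SwitchRequest naming a known service.
// It reports whether the active service changed.
func (s *ManualStrategy) HandleFrame(frame any) bool {
	req, ok := frame.(SwitchRequest)
	if !ok {
		return false
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, n := range s.names {
		if n == req.Service {
			changed := s.active != n
			s.active = n
			return changed
		}
	}
	return false // unknown names are ignored
}

func main() {
	s := NewManualStrategy([]string{"primary", "fallback"})
	fmt.Println(s.ActiveServiceName())                          // primary
	fmt.Println(s.HandleFrame(SwitchRequest{Service: "fallback"})) // true
	fmt.Println(s.HandleFrame(SwitchRequest{Service: "unknown"}))  // false
	fmt.Println(s.ActiveServiceName())                          // fallback
}
```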
type Sink ¶
type Sink struct {
*processors.BaseProcessor
Out chan<- frames.Frame
// contains filtered or unexported fields
}
Sink is a processor that forwards all frames to a channel (for transport output).
func (*Sink) ProcessFrame ¶
ProcessFrame forwards the frame to Out and does not call next (end of chain).
type Source ¶
type Source struct {
*processors.BaseProcessor
In <-chan frames.Frame
}
Source is a processor that reads frames from a channel and pushes them downstream.
type SyncParallelPipeline ¶
type SyncParallelPipeline struct {
*processors.BaseProcessor
// contains filtered or unexported fields
}
SyncParallelPipeline processes frames through multiple parallel branches and synchronizes their output: after each input frame it pushes a SyncFrame into every branch and proceeds only once all branches have emitted that SyncFrame.
func NewSyncParallelPipeline ¶
func NewSyncParallelPipeline(branches [][]processors.Processor) (*SyncParallelPipeline, error)
NewSyncParallelPipeline builds a synchronous parallel pipeline. Each branch is a slice of processors; each branch is wrapped with a sink that writes to an internal channel. At least one branch is required.
func (*SyncParallelPipeline) Branches ¶
func (sp *SyncParallelPipeline) Branches() []*Pipeline
Branches returns the internal pipelines (read-only).
func (*SyncParallelPipeline) Cleanup ¶
func (sp *SyncParallelPipeline) Cleanup(ctx context.Context) error
Cleanup calls Cleanup on all branch pipelines.
func (*SyncParallelPipeline) ProcessFrame ¶
func (sp *SyncParallelPipeline) ProcessFrame(ctx context.Context, f frames.Frame, dir processors.Direction) error
ProcessFrame implements Processor. It pushes the frame and then a SyncFrame to each branch, then waits until each branch has emitted SyncFrame (reading and collecting intermediate frames), then forwards collected frames with dedupe.
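The marker-based synchronization described above could be sketched as follows. This is one possible reading, not the package's code: Frame, branch, startBranch, and ProcessSync are hypothetical, a Sync flag plays the role of SyncFrame, and branch goroutines are left running at exit for brevity.

```go
package main

import "fmt"

// Frame is a stand-in; Sync marks the end of one input frame's output.
type Frame struct {
	ID   int
	Sync bool
}

// branch is a stand-in sub-pipeline with unbuffered in/out channels.
type branch struct {
	in, out chan Frame
}

// startBranch runs fn on each regular frame and passes sync markers through.
func startBranch(fn func(Frame) []Frame) *branch {
	b := &branch{in: make(chan Frame), out: make(chan Frame)}
	go func() {
		for f := range b.in {
			if f.Sync {
				b.out <- f
				continue
			}
			for _, o := range fn(f) {
				b.out <- o
			}
		}
	}()
	return b
}

// ProcessSync pushes f and then a sync marker into every branch, reads each
// branch's output until its marker arrives, and returns the collected frames
// deduplicated by ID.
func ProcessSync(f Frame, branches []*branch) []Frame {
	for _, b := range branches {
		go func(b *branch) {
			b.in <- f
			b.in <- Frame{Sync: true}
		}(b)
	}
	seen := map[int]bool{}
	var out []Frame
	for _, b := range branches {
		for o := range b.out {
			if o.Sync {
				break // this branch has finished the current frame
			}
			if !seen[o.ID] {
				seen[o.ID] = true
				out = append(out, o)
			}
		}
	}
	return out
}

func main() {
	pass := func(f Frame) []Frame { return []Frame{f} }
	extra := func(f Frame) []Frame { return []Frame{f, {ID: f.ID + 100}} }
	bs := []*branch{startBranch(pass), startBranch(extra)}
	out := ProcessSync(Frame{ID: 1}, bs)
	fmt.Println(len(out)) // 2: frame 1 once, plus frame 101
}
```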
type Task ¶
type Task interface {
// Name returns the task name.
Name() string
// Run runs the task until context is cancelled or the task finishes. It blocks.
Run(ctx context.Context, params TaskParams) error
// QueueFrame queues a single frame for processing.
QueueFrame(ctx context.Context, f frames.Frame) error
// QueueFrames queues multiple frames for processing.
QueueFrames(ctx context.Context, frames []frames.Frame) error
// StopWhenDone schedules the task to stop after processing all queued frames (e.g. pushes EndFrame).
StopWhenDone(ctx context.Context) error
// Cancel stops the task immediately.
Cancel(ctx context.Context) error
// HasFinished returns true when the task has reached a terminal state.
HasFinished() bool
}
Task is the interface for pipeline task execution with queue, lifecycle, and cancellation.
type TaskObserver ¶
type TaskObserver struct {
// contains filtered or unexported fields
}
TaskObserver is a proxy observer that queues OnFrameProcessed events and processes them in a background goroutine so slow observers do not block the pipeline.
func NewTaskObserver ¶
func NewTaskObserver(observersList []observers.Observer, queueSize int) *TaskObserver
NewTaskObserver creates a TaskObserver that forwards events to the given observers asynchronously. Queue size defaults to defaultTaskObserverQueueSize. Start must be called before use.
func (*TaskObserver) AddObserver ¶
func (t *TaskObserver) AddObserver(o observers.Observer)
AddObserver adds an observer to the list. Safe to call before or after Start.
func (*TaskObserver) OnFrameProcessed ¶
func (t *TaskObserver) OnFrameProcessed(processorName string, f frames.Frame, dir processors.Direction)
OnFrameProcessed implements observers.Observer. It queues the event; if the queue is full, the event is dropped.
func (*TaskObserver) Start ¶
func (t *TaskObserver) Start()
Start starts the background goroutine that drains the queue and notifies all observers.
func (*TaskObserver) Stop ¶
func (t *TaskObserver) Stop(ctx context.Context)
Stop closes the queue and waits for the drain goroutine to finish.
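The drop-when-full queueing that TaskObserver describes is commonly built from a buffered channel and a non-blocking send. The sketch below shows that pattern under assumptions: Event, AsyncObserver, and OnEvent are hypothetical stand-ins, and main enqueues before Start only to make the drop deterministic for the demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a stand-in for one OnFrameProcessed notification.
type Event struct{ Processor, Frame string }

// AsyncObserver queues events and notifies handlers on a background goroutine.
type AsyncObserver struct {
	queue    chan Event
	handlers []func(Event)
	wg       sync.WaitGroup
}

func NewAsyncObserver(size int, handlers ...func(Event)) *AsyncObserver {
	return &AsyncObserver{queue: make(chan Event, size), handlers: handlers}
}

// Start launches the goroutine that drains the queue.
func (a *AsyncObserver) Start() {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for ev := range a.queue {
			for _, h := range a.handlers {
				h(ev)
			}
		}
	}()
}

// OnEvent enqueues without blocking; it reports false when the event was dropped.
func (a *AsyncObserver) OnEvent(ev Event) bool {
	select {
	case a.queue <- ev:
		return true
	default:
		return false
	}
}

// Stop closes the queue and waits for the drain goroutine to finish.
// OnEvent must not be called after Stop in this sketch.
func (a *AsyncObserver) Stop() {
	close(a.queue)
	a.wg.Wait()
}

func main() {
	var seen []string
	a := NewAsyncObserver(1, func(ev Event) { seen = append(seen, ev.Frame) })

	// Nothing drains yet, so a queue of size 1 fills after one event.
	fmt.Println(a.OnEvent(Event{"stt", "f1"})) // true
	fmt.Println(a.OnEvent(Event{"stt", "f2"})) // false: dropped

	a.Start()
	a.Stop()
	fmt.Println(seen) // [f1]
}
```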
type TaskParams ¶
type TaskParams struct{}
TaskParams holds optional parameters for running a task (e.g. for future use).
type Transport ¶
type Transport interface {
// Input returns a channel of frames from the client (or nil if not used).
Input() <-chan frames.Frame
// Output sends frames to the client.
Output() chan<- frames.Frame
// Start starts the transport; it blocks until Ready or an error.
Start(ctx context.Context) error
// Close closes the transport.
Close() error
}
Transport is the minimal interface for runner: input frames from transport, output frames to transport.