conduit

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Conduit

type Conduit struct {
	Name  string
	About string
	Input *jsonschema.Schema
	Reply *jsonschema.Schema
	// contains filtered or unexported fields
}

Conduit coordinates the flow: Source → Processor(s) → Sink. The pipeline is reusable across multiple source/sink pairs.

func New

func New(config *Config) *Conduit

New creates a new reusable processing pipeline. The pipeline can be run multiple times with different sources and sinks. Pass nil for config to use defaults (sequential, fail-fast).

func (*Conduit) AddProcessor

func (p *Conduit) AddProcessor(proc iosystem.Processor) *Conduit

AddProcessor adds a processing stage to the pipeline. Processors are applied in the order they are added.

func (*Conduit) Close

func (p *Conduit) Close() error

Close closes all processors. Note: Source and Sink are managed by the caller.

func (*Conduit) Cmd

func (p *Conduit) Cmd(ctx context.Context, json json.RawMessage) (*bytes.Buffer, error)

Cmd executes the pipeline in the context of MCP.

func (*Conduit) Run

func (p *Conduit) Run(ctx context.Context, source iosystem.Source, sink iosystem.Sink) (*Stats, error)

Run executes the pipeline with the given source and sink. The pipeline can be run multiple times with different sources and sinks. Returns statistics and an error if processing failed.

type Config

type Config struct {
	// Concurrency sets the number of parallel processing workers.
	// 0 or 1 means sequential processing.
	Concurrency int

	// ErrorMode determines how errors are handled.
	ErrorMode ErrorMode

	// Progress callback is invoked after each document is processed.
	Progress ProgressFunc

	// Metrics callback is invoked periodically with pipeline statistics.
	Metrics MetricsFunc
}

Config configures pipeline behavior.

type ErrorMode

type ErrorMode int

ErrorMode defines error handling strategies.

const (
	// FailFast stops the pipeline on first error.
	FailFast ErrorMode = iota

	// SkipError continues processing remaining documents after errors.
	SkipError
)

type MetricsFunc

type MetricsFunc func(stats Stats)

MetricsFunc is called periodically with pipeline statistics.

type ProgressFunc

type ProgressFunc func(doc *iosystem.Document, err error)

ProgressFunc is called after processing each document.

type Stats

type Stats struct {
	DocsProcessed int       // Successfully processed documents
	DocsSkipped   int       // Documents skipped due to errors
	BytesRead     int64     // Total bytes read from source
	BytesWritten  int64     // Total bytes written to sink
	Errors        []error   // Collected errors
	Created       time.Time // When processing started
	Stopped       time.Time // When processing ended
}

Stats tracks processing metrics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL