streamx

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package streamx provides generic stream combinator primitives built on Go iterators.

Stream[T] is a typed event stream defined as iter.Seq2[T, error]. Combinators such as Merge, Race, FanIn, and Drain compose streams with context-aware cancellation and clean goroutine lifecycle.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Drain

func Drain[T any](stream Stream[T]) ([]T, error)

Drain consumes a stream completely, returning all events as a slice. Iteration stops on the first error; collected events up to that point are returned along with the error.

func FanIn

func FanIn[T any](ctx context.Context, streams map[string]Stream[T]) (map[string][]T, error)

FanIn collects ALL events from N named streams, returning them grouped by source name. It waits for all streams to complete. The first error from any stream causes all others to be cancelled and the error is returned.

func IsEligible

func IsEligible(t *agent.Tool) bool

IsEligible checks if a tool can be executed in parallel. A tool is eligible when both Capability.ReadOnly and Capability.ConcurrencySafe are true.

Types

type AgentStreamFanIn

type AgentStreamFanIn struct {
	// contains filtered or unexported fields
}

AgentStreamFanIn merges multiple child agent output streams into a single tagged stream, emitting progress events for child lifecycle via ProgressBus.

func NewAgentStreamFanIn

func NewAgentStreamFanIn(parent string, bus *ProgressBus) *AgentStreamFanIn

NewAgentStreamFanIn creates a fan-in merger for child agent streams. If bus is nil, progress emission is skipped.

func (*AgentStreamFanIn) AddChild

func (f *AgentStreamFanIn) AddChild(childID string, stream Stream[string])

AddChild registers a child agent's output stream.

func (*AgentStreamFanIn) MergedStream

func (f *AgentStreamFanIn) MergedStream(ctx context.Context) Stream[Tag[string]]

MergedStream returns a single stream of tagged events from all children. It emits ProgressStarted for each child when merging begins and ProgressCompleted/Failed per child as they finish.

type ParallelReadOnlyExecutor

type ParallelReadOnlyExecutor struct {
	// contains filtered or unexported fields
}

ParallelReadOnlyExecutor executes eligible read-only, concurrency-safe tools concurrently.

func NewParallelReadOnlyExecutor

func NewParallelReadOnlyExecutor(maxConcurrency int) *ParallelReadOnlyExecutor

NewParallelReadOnlyExecutor creates a new executor with the given concurrency limit. If maxConcurrency is less than 1, it defaults to 1.

func (*ParallelReadOnlyExecutor) ExecuteParallel

func (e *ParallelReadOnlyExecutor) ExecuteParallel(ctx context.Context, invocations []ToolInvocation) []ToolResult

ExecuteParallel runs eligible tools concurrently, respecting the configured concurrency limit. Non-eligible tools are rejected with an error in the result without being executed. Results are returned in the same order as invocations. Context cancellation stops pending invocations.

type ProgressBus

type ProgressBus struct {
	// contains filtered or unexported fields
}

ProgressBus provides pub/sub for progress events.

func NewProgressBus

func NewProgressBus() *ProgressBus

NewProgressBus creates a new ProgressBus.

func (*ProgressBus) Emit

func (b *ProgressBus) Emit(event ProgressEvent)

Emit publishes a progress event to all matching subscribers. Non-blocking: if a subscriber's buffer is full, the event is dropped.

func (*ProgressBus) Subscribe

func (b *ProgressBus) Subscribe(filter string) (<-chan ProgressEvent, func())

Subscribe returns a channel that receives events matching the filter prefix. Call the returned cancel func to unsubscribe and close the channel.

func (*ProgressBus) SubscribeAll

func (b *ProgressBus) SubscribeAll() (<-chan ProgressEvent, func())

SubscribeAll returns a channel receiving all events.

type ProgressEvent

type ProgressEvent struct {
	Source   string // e.g. "tool:web_search", "agent:operator", "bg:task-123"
	Type     ProgressType
	Message  string         // human-readable progress text
	Progress float64        // 0.0 to 1.0, or -1 if not applicable
	Metadata map[string]any // optional additional data
}

ProgressEvent represents a progress update from any source.

type ProgressType

type ProgressType string

ProgressType classifies progress events.

const (
	ProgressStarted   ProgressType = "started"
	ProgressUpdate    ProgressType = "update"
	ProgressCompleted ProgressType = "completed"
	ProgressFailed    ProgressType = "failed"
)

type Stream

type Stream[T any] iter.Seq2[T, error]

Stream is a typed event stream built on Go iterators. Each iteration yields either a valid event T or a non-nil error.

func Merge

func Merge[T any](ctx context.Context, streams map[string]Stream[T]) Stream[Tag[T]]

Merge takes N named streams and yields tagged events as they arrive from any stream. One goroutine is launched per stream; all send to a shared channel. Context cancellation stops all goroutines cleanly.

func Race

func Race[T any](ctx context.Context, streams map[string]Stream[T]) Stream[Tag[T]]

Race takes N named streams and yields all events from the first stream to produce a value. Once one stream wins, the others are cancelled via context and the winner is drained completely.

Loser goroutines exit when their stream returns or when the parent context is cancelled. Callers should use a cancellable context if streams may block.

type Tag

type Tag[T any] struct {
	Source string
	Event  T
}

Tag wraps an event with its source identifier.

type ToolInvocation

type ToolInvocation struct {
	Tool   *agent.Tool
	Params map[string]any
}

ToolInvocation represents a single tool call request.

type ToolResult

type ToolResult struct {
	ToolName string
	Result   any
	Error    error
	Duration time.Duration
}

ToolResult represents the outcome of a tool invocation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL