Documentation ¶
Overview ¶
Package streamx provides generic stream combinator primitives built on Go iterators.
Stream[T] is a typed event stream defined as iter.Seq2[T, error]. Combinators such as Merge, Race, FanIn, and Drain compose streams with context-aware cancellation and clean goroutine lifecycle.
Index ¶
- func Drain[T any](stream Stream[T]) ([]T, error)
- func FanIn[T any](ctx context.Context, streams map[string]Stream[T]) (map[string][]T, error)
- func IsEligible(t *agent.Tool) bool
- type AgentStreamFanIn
- type ParallelReadOnlyExecutor
- type ProgressBus
- type ProgressEvent
- type ProgressType
- type Stream
- type Tag
- type ToolInvocation
- type ToolResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Drain ¶
Drain consumes a stream completely, returning all events as a slice. Iteration stops on the first error; collected events up to that point are returned along with the error.
func FanIn ¶
FanIn collects ALL events from N named streams, returning them grouped by source name. It waits for all streams to complete. The first error from any stream causes all others to be cancelled and the error is returned.
func IsEligible ¶
IsEligible checks if a tool can be executed in parallel. A tool is eligible when both Capability.ReadOnly and Capability.ConcurrencySafe are true.
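The rule is a simple conjunction of two flags. The sketch below uses hypothetical `Tool` and `Capability` stand-ins, since the layout of `agent.Tool` is not shown here; only the two field names come from the description above, and the nil check is an assumption.

```go
package main

// Capability and Tool are hypothetical stand-ins for the real agent types.
// Only the ReadOnly and ConcurrencySafe field names come from the docs.
type Capability struct {
	ReadOnly        bool
	ConcurrencySafe bool
}

type Tool struct {
	Name       string
	Capability Capability
}

// isEligibleSketch reports whether a tool may run in parallel: it must be
// both read-only and concurrency-safe. Treating nil as ineligible is an
// assumption made for this example.
func isEligibleSketch(t *Tool) bool {
	return t != nil && t.Capability.ReadOnly && t.Capability.ConcurrencySafe
}
```

A tool that is read-only but not concurrency-safe (or the reverse) is rejected, so both flags must be set before the tool can be scheduled concurrently.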
Types ¶
type AgentStreamFanIn ¶
type AgentStreamFanIn struct {
// contains filtered or unexported fields
}
AgentStreamFanIn merges multiple child agent output streams into a single tagged stream, emitting progress events for child lifecycle via ProgressBus.
func NewAgentStreamFanIn ¶
func NewAgentStreamFanIn(parent string, bus *ProgressBus) *AgentStreamFanIn
NewAgentStreamFanIn creates a fan-in merger for child agent streams. If bus is nil, progress emission is skipped.
func (*AgentStreamFanIn) AddChild ¶
func (f *AgentStreamFanIn) AddChild(childID string, stream Stream[string])
AddChild registers a child agent's output stream.
func (*AgentStreamFanIn) MergedStream ¶
MergedStream returns a single stream of tagged events from all children. It emits ProgressStarted for each child when merging begins and ProgressCompleted/Failed per child as they finish.

type ParallelReadOnlyExecutor ¶
type ParallelReadOnlyExecutor struct {
// contains filtered or unexported fields
}
ParallelReadOnlyExecutor executes eligible read-only, concurrency-safe tools concurrently.
func NewParallelReadOnlyExecutor ¶
func NewParallelReadOnlyExecutor(maxConcurrency int) *ParallelReadOnlyExecutor
NewParallelReadOnlyExecutor creates a new executor with the given concurrency limit. If maxConcurrency is less than 1, it defaults to 1.
func (*ParallelReadOnlyExecutor) ExecuteParallel ¶
func (e *ParallelReadOnlyExecutor) ExecuteParallel(ctx context.Context, invocations []ToolInvocation) []ToolResult
ExecuteParallel runs eligible tools concurrently, respecting the configured concurrency limit. Ineligible tools are not executed; each is rejected with an error in its result. Results are returned in the same order as invocations. Context cancellation stops pending invocations.
type ProgressBus ¶
type ProgressBus struct {
// contains filtered or unexported fields
}
ProgressBus provides pub/sub for progress events.
func (*ProgressBus) Emit ¶
func (b *ProgressBus) Emit(event ProgressEvent)
Emit publishes a progress event to all matching subscribers. Non-blocking: if a subscriber's buffer is full, the event is dropped.
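The drop-on-full behavior is the standard Go idiom of a `select` with a `default` case. The helper below is a hypothetical illustration of that idiom and is not taken from the package.

```go
package main

// tryEmit attempts a send without blocking. It reports whether the event
// was delivered; when the buffer is full the event is dropped.
func tryEmit[T any](ch chan<- T, event T) bool {
	select {
	case ch <- event:
		return true
	default:
		return false
	}
}
```

With a channel of capacity one, a first call succeeds and a second call made before anything is received returns false, so a slow subscriber loses events but never stalls the publisher.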
func (*ProgressBus) Subscribe ¶
func (b *ProgressBus) Subscribe(filter string) (<-chan ProgressEvent, func())
Subscribe returns a channel that receives events matching the filter prefix. Call the returned cancel func to unsubscribe and close the channel.
func (*ProgressBus) SubscribeAll ¶
func (b *ProgressBus) SubscribeAll() (<-chan ProgressEvent, func())
SubscribeAll returns a channel that receives every event regardless of source, together with a cancel func, as Subscribe does.
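To make the subscription model concrete, here is a minimal pub/sub sketch with prefix filtering and an unsubscribe func. Everything in it is an assumption made for illustration: the `busSketch` type, the explicit buffer-size parameter, and the idea that an empty prefix plays the role of SubscribeAll are not confirmed by the documentation.

```go
package main

import (
	"strings"
	"sync"
)

// event is a reduced, hypothetical version of ProgressEvent.
type event struct {
	Source  string
	Message string
}

type subscriber struct {
	prefix string
	ch     chan event
}

// busSketch is a hypothetical stand-in for ProgressBus.
type busSketch struct {
	mu   sync.Mutex
	next int
	subs map[int]subscriber
}

func newBusSketch() *busSketch {
	return &busSketch{subs: make(map[int]subscriber)}
}

// subscribe registers a buffered channel for sources starting with prefix.
// The returned func unsubscribes and closes the channel; it is safe to
// call more than once.
func (b *busSketch) subscribe(prefix string, buffer int) (<-chan event, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	ch := make(chan event, buffer)
	b.subs[id] = subscriber{prefix: prefix, ch: ch}

	var once sync.Once
	cancel := func() {
		once.Do(func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			delete(b.subs, id)
			close(ch) // closed under the lock, so emit never sends on it
		})
	}
	return ch, cancel
}

// emit delivers e to every matching subscriber without blocking.
func (b *busSketch) emit(e event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, s := range b.subs {
		if !strings.HasPrefix(e.Source, s.prefix) {
			continue
		}
		select {
		case s.ch <- e:
		default: // buffer full: drop the event
		}
	}
}
```

Closing the channel while holding the same mutex that `emit` takes is what keeps a late publish from panicking with a send on a closed channel.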
type ProgressEvent ¶
type ProgressEvent struct {
Source string // e.g. "tool:web_search", "agent:operator", "bg:task-123"
Type ProgressType
Message string // human-readable progress text
Progress float64 // 0.0 to 1.0, or -1 if not applicable
Metadata map[string]any // optional additional data
}
ProgressEvent represents a progress update from any source.
type ProgressType ¶
type ProgressType string
ProgressType classifies progress events.
const (
ProgressStarted ProgressType = "started"
ProgressUpdate ProgressType = "update"
ProgressCompleted ProgressType = "completed"
ProgressFailed ProgressType = "failed"
)
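As an illustration of how these values fit together, the sketch below redeclares the documented types locally and adds two helpers. `isTerminal` and `started` are hypothetical and not part of the package; treating completed and failed as the terminal states is an assumption based on how the lifecycle events are described.

```go
package main

// ProgressType and its constants are copied from the documentation above.
type ProgressType string

const (
	ProgressStarted   ProgressType = "started"
	ProgressUpdate    ProgressType = "update"
	ProgressCompleted ProgressType = "completed"
	ProgressFailed    ProgressType = "failed"
)

// ProgressEvent is copied from the documented struct.
type ProgressEvent struct {
	Source   string
	Type     ProgressType
	Message  string
	Progress float64 // 0.0 to 1.0, or -1 if not applicable
	Metadata map[string]any
}

// isTerminal (hypothetical) reports whether no further events are expected
// for a source after an event of type t.
func isTerminal(t ProgressType) bool {
	return t == ProgressCompleted || t == ProgressFailed
}

// started (hypothetical) builds a start event with no measurable progress.
func started(source, message string) ProgressEvent {
	return ProgressEvent{
		Source:   source,
		Type:     ProgressStarted,
		Message:  message,
		Progress: -1,
	}
}
```

A consumer reading from a subscription could use a check like `isTerminal(ev.Type)` to decide when to stop tracking a given source.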
type Stream ¶
Stream is a typed event stream built on Go iterators. Each iteration yields either a valid event T or a non-nil error.
func Merge ¶
Merge takes N named streams and yields tagged events as they arrive from any stream. One goroutine is launched per stream; all send to a shared channel. Context cancellation stops all goroutines cleanly.
func Race ¶
Race takes N named streams and yields all events from the first stream to produce a value. Once one stream wins, the others are cancelled via context and the winner is drained completely.
Loser goroutines exit when their stream returns or when the parent context is cancelled. Callers should use a cancellable context if streams may block.
type ToolInvocation ¶
ToolInvocation represents a single tool call request.