Documentation
Overview ¶
Package queue defines the pluggable interfaces for task distribution and real-time token streaming used by both the operator and the agent runtime.
There are two independent extension points:
- TaskQueue — work submission, polling, ack/nack, and result collection.
- StreamChannel — real-time token chunk delivery for SSE streaming.
Both follow the same registration pattern: blank-importing a backend package (e.g. queue/redis) registers it through that package's init() function.
Index ¶
- Constants
- func Detect(url string) string
- func QueueBackends() []string
- func RegisterQueue(name string, f QueueFactory)
- func RegisterStream(name string, f StreamFactory)
- func StreamBackends() []string
- type QueueFactory
- type StreamChannel
- type StreamFactory
- type Task
- type TaskQueue
- type TaskResult
- type TokenUsage
Constants ¶
const StreamDone = "__EOF__"
StreamDone is the sentinel chunk value that signals end-of-stream.
Variables ¶
This section is empty.
Functions ¶
func Detect ¶
func Detect(url string) string
Detect infers a backend name from a connection URL scheme. It falls back to "redis" for unrecognised schemes.
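A minimal sketch of the scheme inference, assuming a recognised-scheme set of "redis" and "kafka" (the real set is not listed on this page and may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// Detect infers a backend name from a connection URL scheme,
// falling back to "redis" for unrecognised schemes.
// The recognised schemes below are an illustrative assumption.
func Detect(url string) string {
	scheme, _, ok := strings.Cut(url, "://")
	if !ok {
		return "redis" // no scheme at all: use the default backend
	}
	switch scheme {
	case "redis", "rediss":
		return "redis"
	case "kafka":
		return "kafka"
	default:
		return "redis" // unrecognised scheme: fall back
	}
}

func main() {
	fmt.Println(Detect("kafka://broker:9092"))  // kafka
	fmt.Println(Detect("postgres://db:5432/x")) // redis (fallback)
}
```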
func QueueBackends ¶
func QueueBackends() []string
QueueBackends returns the names of all registered TaskQueue backends, sorted.
func RegisterQueue ¶
func RegisterQueue(name string, f QueueFactory)
RegisterQueue makes a TaskQueue backend available under the given name. It is typically called from an init() function in the backend package.
func RegisterStream ¶
func RegisterStream(name string, f StreamFactory)
RegisterStream makes a StreamChannel backend available under the given name. It is typically called from an init() function in the backend package.
func StreamBackends ¶
func StreamBackends() []string
StreamBackends returns the names of all registered StreamChannel backends, sorted.
Types ¶
type QueueFactory ¶
QueueFactory constructs a TaskQueue from a connection URL and max retry count.
type StreamChannel ¶
type StreamChannel interface {
// Publish appends a token chunk to the named channel.
// Returns an error if the backend write fails — callers should log and continue.
Publish(key, chunk string) error
// Done signals end-of-stream and schedules cleanup on the named channel.
// Returns an error if the backend write fails.
Done(key string) error
// Read blocks briefly for the next token chunk on the named channel.
// Returns ("", nil) when no chunk is available yet, (StreamDone, nil) at end-of-stream.
Read(ctx context.Context, key string) (string, error)
}
StreamChannel is the pluggable interface for real-time token delivery. It is intentionally separate from TaskQueue so the two can use different backends (e.g. Kafka for tasks, Redis for streaming).
func NewStream ¶
func NewStream(url string) (StreamChannel, error)
NewStream creates a StreamChannel by inferring the backend from the URL scheme.
type StreamFactory ¶
type StreamFactory func(url string) (StreamChannel, error)
StreamFactory constructs a StreamChannel from a connection URL.
type Task ¶
type Task struct {
ID string
Prompt string
Meta map[string]string // arbitrary key-value pairs (e.g. stream_key, attempt)
}
Task is a unit of work delivered from the queue to an agent.
type TaskQueue ¶
type TaskQueue interface {
// Submit enqueues a task. meta carries optional key-value metadata
// (e.g. "stream_key" for SSE streaming). Returns the assigned task ID.
Submit(ctx context.Context, prompt string, meta map[string]string) (string, error)
// Poll blocks briefly for the next available task.
// Returns (nil, nil) when the queue is empty so callers can check ctx.Done().
Poll(ctx context.Context) (*Task, error)
// Ack marks a task as successfully completed and stores the result with token usage.
// The full Task is required so implementations can store the result under the original
// task ID when the task was retried (preserving the ID the flow controller tracks).
// Returns an error if the backend operation fails — callers should log and handle it.
Ack(task Task, result string, usage TokenUsage) error
// Nack marks a task as failed. Implementations handle retry and dead-letter logic.
// Returns an error if the backend operation fails — callers should log and handle it.
Nack(task Task, reason string) error
// Results returns completed task results for the given set of task IDs.
// Used by the operator to check whether running flow steps have finished.
Results(ctx context.Context, taskIDs []string) ([]TaskResult, error)
// Cancel abandons the given tasks. For each task ID the implementation should:
// - XDEL the stream entry (prevents unpolled tasks from being picked up)
// - XACK the entry (removes it from the PEL if already polled)
// - Store a "run cancelled" error result so collectResults doesn't hang
// Best-effort: errors are logged but do not fail the caller.
Cancel(ctx context.Context, taskIDs []string) error
// Close releases any resources held by the implementation.
Close()
}
TaskQueue is the pluggable interface for work distribution. Implementations register themselves via RegisterQueue and are selected by URL scheme via NewQueue.
type TaskResult ¶
type TaskResult struct {
TaskID string
Output string
Error string // non-empty when the task reached the dead-letter queue
Usage TokenUsage
}
TaskResult holds the outcome of a completed task as reported back to the operator.
type TokenUsage ¶
TokenUsage records LLM token consumption for a completed task.