queue

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package queue defines the pluggable interfaces for task distribution and real-time token streaming used by both the operator and the agent runtime.

There are two independent extension points:

  • TaskQueue — work submission, polling, ack/nack, and result collection.
  • StreamChannel — real-time token chunk delivery for SSE streaming.

Both follow the same registration pattern: importing a backend package (e.g. queue/redis) via a blank import registers it via init().

Index

Constants

View Source
const StreamDone = "__EOF__"

StreamDone is the sentinel chunk value that signals end-of-stream.

Variables

This section is empty.

Functions

func Detect

func Detect(url string) string

Detect infers a backend name from a connection URL scheme. Falls back to "redis" for unrecognised schemes.

func QueueBackends

func QueueBackends() []string

QueueBackends returns the names of all registered TaskQueue backends, sorted.

func RegisterQueue

func RegisterQueue(name string, f QueueFactory)

RegisterQueue makes a TaskQueue backend available under the given name. It is typically called from an init() function in the backend package.

func RegisterStream

func RegisterStream(name string, f StreamFactory)

RegisterStream makes a StreamChannel backend available under the given name. It is typically called from an init() function in the backend package.

func StreamBackends

func StreamBackends() []string

StreamBackends returns the names of all registered StreamChannel backends, sorted.

Types

type QueueFactory

type QueueFactory func(url string, maxRetries int) (TaskQueue, error)

QueueFactory constructs a TaskQueue from a connection URL and max retry count.

type StreamChannel

type StreamChannel interface {
	// Publish appends a token chunk to the named channel.
	// Returns an error if the backend write fails — callers should log and continue.
	Publish(key, chunk string) error

	// Done signals end-of-stream and schedules cleanup on the named channel.
	// Returns an error if the backend write fails.
	Done(key string) error

	// Read blocks briefly for the next token chunk on the named channel.
	// Returns ("", nil) when no chunk is available yet, (StreamDone, nil) at end-of-stream.
	Read(ctx context.Context, key string) (string, error)
}

StreamChannel is the pluggable interface for real-time token delivery. It is intentionally separate from TaskQueue so the two can use different backends (e.g. Kafka for tasks, Redis for streaming).

func NewStream

func NewStream(url string) (StreamChannel, error)

NewStream creates a StreamChannel by inferring the backend from the URL scheme.

type StreamFactory

type StreamFactory func(url string) (StreamChannel, error)

StreamFactory constructs a StreamChannel from a connection URL.

type Task

type Task struct {
	ID     string
	Prompt string
	Meta   map[string]string // arbitrary key-value pairs (e.g. stream_key, attempt)
}

Task is a unit of work delivered from the queue to an agent.

type TaskQueue

type TaskQueue interface {
	// Submit enqueues a task. meta carries optional key-value metadata
	// (e.g. "stream_key" for SSE streaming). Returns the assigned task ID.
	Submit(ctx context.Context, prompt string, meta map[string]string) (string, error)

	// Poll blocks briefly for the next available task.
	// Returns (nil, nil) when the queue is empty so callers can check ctx.Done().
	Poll(ctx context.Context) (*Task, error)

	// Ack marks a task as successfully completed and stores the result with token usage.
	// The full Task is required so implementations can store the result under the original
	// task ID when the task was retried (preserving the ID the flow controller tracks).
	// Returns an error if the backend operation fails — callers should log and handle it.
	Ack(task Task, result string, usage TokenUsage) error

	// Nack marks a task as failed. Implementations handle retry and dead-letter logic.
	// Returns an error if the backend operation fails — callers should log and handle it.
	Nack(task Task, reason string) error

	// Results returns completed task results for the given set of task IDs.
	// Used by the operator to check whether running flow steps have finished.
	Results(ctx context.Context, taskIDs []string) ([]TaskResult, error)

	// Cancel abandons the given tasks. For each task ID the implementation should:
	//   - XDEL the stream entry (prevents unpolled tasks from being picked up)
	//   - XACK the entry (removes it from the PEL if already polled)
	//   - Store a "run cancelled" error result so collectResults doesn't hang
	// Best-effort: errors are logged but do not fail the caller.
	Cancel(ctx context.Context, taskIDs []string) error

	// Close releases any resources held by the implementation.
	Close()
}

TaskQueue is the pluggable interface for work distribution. Implementations register themselves via RegisterQueue and are selected by URL scheme via NewQueue.

func NewQueue

func NewQueue(url string, maxRetries int) (TaskQueue, error)

NewQueue creates a TaskQueue by inferring the backend from the URL scheme.

type TaskResult

type TaskResult struct {
	TaskID string
	Output string
	Error  string // non-empty when the task reached the dead-letter queue
	Usage  TokenUsage
}

TaskResult holds the outcome of a completed task as reported back to the operator.

type TokenUsage

type TokenUsage struct {
	InputTokens  int64
	OutputTokens int64
}

TokenUsage records LLM token consumption for a completed task.

Directories

Path Synopsis
Package redis registers Redis Streams-backed implementations of TaskQueue and StreamChannel.
Package redis registers Redis Streams-backed implementations of TaskQueue and StreamChannel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL