agenticdispatch

package
v1.0.0-alpha.45
Published: Mar 14, 2026 License: MIT Imports: 21 Imported by: 0

README

Agentic Dispatch Component

Message routing between users and agentic loops with command parsing, permissions, and loop tracking.

Overview

The agentic-dispatch component is the central hub for user interaction with the agentic system. It:

  • Parses commands from user messages
  • Checks permissions before executing commands
  • Tracks active loops per user and channel
  • Routes tasks to the agentic-loop component
  • Delivers responses back to users

Configuration

{
  "default_role": "general",
  "default_model": "qwen2.5-coder:32b",
  "auto_continue": true,
  "stream_name": "USER",
  "permissions": {
    "view": ["*"],
    "submit_task": ["*"],
    "cancel_own": true,
    "cancel_any": ["admin"],
    "approve": ["admin", "reviewer"]
  }
}

Configuration Options

Option         Type    Default      Description
default_role   string  "general"    Default role for tasks
default_model  string  ""           Default model for tasks
auto_continue  bool    true         Auto-continue conversations
stream_name    string  "USER"       JetStream stream for user messages
permissions    object  (see above)  Permission configuration

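
The options above can be loaded by starting from the documented defaults and overlaying the JSON. The sketch below is illustrative only: dispatchConfig, permissions, and parseConfig are hypothetical local names that mirror the documented keys, not the package's own Config type or loader.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// permissions mirrors the documented "permissions" object (hypothetical local type).
type permissions struct {
	View       []string `json:"view"`
	SubmitTask []string `json:"submit_task"`
	CancelOwn  bool     `json:"cancel_own"`
	CancelAny  []string `json:"cancel_any"`
	Approve    []string `json:"approve"`
}

// dispatchConfig mirrors a subset of the documented options (hypothetical local type).
type dispatchConfig struct {
	DefaultRole  string      `json:"default_role"`
	AutoContinue bool        `json:"auto_continue"`
	StreamName   string      `json:"stream_name"`
	Permissions  permissions `json:"permissions"`
}

// parseConfig starts from the documented defaults and overlays the JSON,
// so keys missing from raw keep their default values.
func parseConfig(raw []byte) (dispatchConfig, error) {
	cfg := dispatchConfig{DefaultRole: "general", AutoContinue: true, StreamName: "USER"}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return dispatchConfig{}, fmt.Errorf("parse dispatch config: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(`{"default_role":"reviewer","permissions":{"cancel_any":["admin"]}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.DefaultRole, cfg.AutoContinue, cfg.StreamName, cfg.Permissions.CancelAny)
}
```

Seeding the struct with defaults before unmarshalling is a common way to distinguish "key absent" from "key set to the zero value" for string fields; it does not help for a bool that is explicitly set to false, which simply overrides the default.
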
JetStream Integration

All messaging uses JetStream for durability:

  • User messages consumed from USER stream
  • Agent tasks published to AGENT stream
  • Completion events consumed from AGENT stream

Consumer naming: agentic-dispatch-{port-name}

Built-in Commands

Command        Permission  Description
/cancel [id]   cancel_own  Cancel current or specified loop
/status [id]   view        Show loop status
/loops         view        List your active loops
/help          (none)      Show available commands

Custom Command Registration

External packages can register custom commands using the global init() pattern:

package semspec

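import (
    "context"

    "github.com/c360studio/semstreams/agentic"
    agenticdispatch "github.com/c360studio/semstreams/processor/agentic-dispatch"
)

func init() {
    // RegisterCommand returns an error for an empty or duplicate name.
    if err := agenticdispatch.RegisterCommand("spec", &SpecCommand{}); err != nil {
        panic(err)
    }
}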

type SpecCommand struct{}

func (c *SpecCommand) Config() agenticdispatch.CommandConfig {
    return agenticdispatch.CommandConfig{
        Pattern:     `^/spec\s*(.*)$`,
        Permission:  "submit_task",
        RequireLoop: false,
        Help:        "/spec [name] - Run spec-driven development",
    }
}

func (c *SpecCommand) Execute(
    ctx context.Context,
    cmdCtx *agenticdispatch.CommandContext,
    msg agentic.UserMessage,
    args []string,
    loopID string,
) (agentic.UserResponse, error) {
    // Use cmdCtx.NATSClient to publish messages
    // Use cmdCtx.LoopTracker to track loops
    // Use cmdCtx.HasPermission for permission checks
    // Use cmdCtx.Logger for logging

    return agentic.UserResponse{
        ResponseID:  "...",
        ChannelType: msg.ChannelType,
        ChannelID:   msg.ChannelID,
        UserID:      msg.UserID,
        Type:        agentic.ResponseTypeStatus,
        Content:     "Spec workflow started",
    }, nil
}

CommandExecutor Interface

type CommandExecutor interface {
    Execute(ctx context.Context, cmdCtx *CommandContext, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
    Config() CommandConfig
}

CommandContext

The CommandContext provides access to agentic-dispatch services:

type CommandContext struct {
    NATSClient    *natsclient.Client                      // Publish NATS messages
    LoopTracker   *LoopTracker                            // Track active loops
    Logger        *slog.Logger                            // Structured logging
    HasPermission func(userID, permission string) bool    // Check permissions
}

CommandConfig

type CommandConfig struct {
    Pattern     string  // Regex pattern with capture groups for args
    Permission  string  // Required permission (empty = no permission required)
    RequireLoop bool    // Whether command requires an active loop
    Help        string  // Help text shown in /help output
}
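
To make the Pattern field concrete, here is a minimal sketch of how a regex with capture groups can yield command arguments, using only the standard regexp package. extractArgs is a hypothetical helper; how the dispatch component itself turns matches into args is an assumption here, based on the "capture groups for args" comment.

```go
package main

import (
	"fmt"
	"regexp"
)

// extractArgs returns the capture groups of pattern for input, or false
// when the pattern is invalid or the input does not match.
func extractArgs(pattern, input string) ([]string, bool) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, false
	}
	m := re.FindStringSubmatch(input)
	if m == nil {
		return nil, false
	}
	return m[1:], true // m[0] is the whole match; the rest are capture groups
}

func main() {
	args, ok := extractArgs(`^/spec\s*(.*)$`, "/spec add-user-auth")
	fmt.Println(args, ok) // [add-user-auth] true
}
```

One thing to watch with the pattern from the README example: because \s* may match nothing, ^/spec\s*(.*)$ also matches "/specification" and captures "ification". A stricter pattern such as ^/spec(?:\s+(.*))?$ requires whitespace before the argument.
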

NATS Subjects

Subject                       Direction  Description
user.message.{channel}.{id}   Subscribe  User input from channels
user.response.{channel}.{id}  Publish    Responses to users
agent.task.{task_id}          Publish    Task dispatch
agent.signal.{loop_id}        Publish    Signals (cancel, pause)
agent.complete.{loop_id}      Subscribe  Completion events
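
The subject patterns in this table can be built and matched with a few small helpers. The sketch below is illustrative: the function names are hypothetical, and subjectMatches is a simplified reimplementation of NATS wildcard semantics ("*" matches exactly one token, ">" matches one or more trailing tokens), not code from this package or the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// userMessageSubject builds user.message.{channel}.{id}.
func userMessageSubject(channel, id string) string {
	return fmt.Sprintf("user.message.%s.%s", channel, id)
}

// userResponseSubject builds user.response.{channel}.{id}.
func userResponseSubject(channel, id string) string {
	return fmt.Sprintf("user.response.%s.%s", channel, id)
}

// subjectMatches reports whether subject matches a NATS-style pattern.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subj := userMessageSubject("slack", "C123")
	fmt.Println(subj)
	fmt.Println(subjectMatches("user.message.*", subj)) // one token short
	fmt.Println(subjectMatches("user.message.>", subj))
}
```

Because the user subjects carry two tokens after the prefix, a subscription covering every channel and ID needs user.message.*.* or user.message.> rather than a single trailing "*".
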

Metrics

Metric                           Type       Labels        Description
router_messages_received_total   counter    channel_type  Messages received
router_commands_executed_total   counter    command       Commands executed
router_tasks_submitted_total     counter    (none)        Tasks submitted
router_loops_active              gauge      (none)        Currently active loops
router_routing_duration_seconds  histogram  (none)        Message routing latency

Integration Example

// Register commands before creating the agentic-dispatch component.
func init() {
    if err := agenticdispatch.RegisterCommand("mycommand", &MyCommand{}); err != nil {
        panic(err)
    }
}

// Globally registered commands are loaded automatically when the component starts.
comp, err := agenticdispatch.NewComponent(config, deps)
if err != nil {
    return err
}
comp.Start(ctx)

Documentation

Overview

Package agenticdispatch provides message routing between users and agentic loops.

The agentic-dispatch component handles command parsing, permission checking, loop tracking, and message dispatch. It bridges input components (CLI, Slack, Discord, Web) with the agentic processing system.

Architecture

┌─────────────┐                           ┌─────────────────┐
│  CLI/Slack/ │     user.message.*        │                 │
│  Discord/   │ ─────────────────────────▶│ agentic-dispatch│
│  Web Input  │                           │                 │
│             │◀───────────────────────── │  • Commands     │
└─────────────┘     user.response.*       │  • Perms        │
                                          │  • Loops        │
                                          └────────┬────────┘
                                                   │
                                                   │ agent.task.*
                                                   │ agent.signal.*
                                                   ▼
                                          ┌─────────────┐
                                          │ agentic-    │
                                          │ loop        │
                                          └─────────────┘

Command Registration

Commands can be registered in two ways:

1. Global registration via init() - preferred for reusable commands:

package mycommands

import agenticdispatch "github.com/c360studio/semstreams/processor/agentic-dispatch"

func init() {
    if err := agenticdispatch.RegisterCommand("mycommand", &MyCommandExecutor{}); err != nil {
        panic(err)
    }
}

2. Per-component registration - for component-specific commands:

registry := dispatchComponent.CommandRegistry()
registry.Register("local", config, handler)

CommandExecutor Interface

External commands implement the CommandExecutor interface:

type CommandExecutor interface {
    Execute(ctx context.Context, cmdCtx *CommandContext, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
    Config() CommandConfig
}

The CommandContext provides access to dispatch services:

type CommandContext struct {
    NATSClient    *natsclient.Client       // For publishing messages
    LoopTracker   *LoopTracker             // For tracking loops
    Logger        *slog.Logger             // For logging
    HasPermission func(userID, permission string) bool  // For permission checks
}

Built-in Commands

The agentic-dispatch component provides these built-in commands:

  • /cancel [loop_id] - Cancel current or specified loop
  • /status [loop_id] - Show loop status
  • /loops - List active loops
  • /help - Show available commands

Permissions

Commands can require permissions:

  • view - View status, loops, history
  • submit_task - Submit new tasks
  • cancel_own - Cancel own loops
  • cancel_any - Cancel any loop (admin)
  • approve - Approve/reject results
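
As an illustration of how these permissions might be evaluated, the sketch below checks a role against a permission list and applies the cancel_own / cancel_any split. It rests on assumptions not confirmed by the documentation: that list entries are role names, that "*" grants access to every role, and that cancel_own applies only when the requester owns the loop. The function names are hypothetical.

```go
package main

import "fmt"

// allowed reports whether role appears in list. "*" is treated as a
// wildcard that admits every role (an assumption of this sketch).
func allowed(list []string, role string) bool {
	for _, r := range list {
		if r == "*" || r == role {
			return true
		}
	}
	return false
}

// canCancel lets users cancel their own loops when cancelOwn is set,
// and otherwise requires the role to be listed in cancelAny.
func canCancel(cancelOwn bool, cancelAny []string, role, userID, ownerID string) bool {
	if cancelOwn && userID == ownerID {
		return true
	}
	return allowed(cancelAny, role)
}

func main() {
	cancelAny := []string{"admin"}
	fmt.Println(allowed([]string{"*"}, "guest"))                     // wildcard list
	fmt.Println(canCancel(true, cancelAny, "user", "alice", "alice")) // own loop
	fmt.Println(canCancel(true, cancelAny, "user", "alice", "bob"))   // someone else's loop
	fmt.Println(canCancel(true, cancelAny, "admin", "carol", "bob"))  // admin role
}
```
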

NATS Subjects

The agentic-dispatch component uses these subject patterns:

  • user.message.{channel}.{id} - Incoming user messages
  • user.response.{channel}.{id} - Outgoing responses
  • agent.task.{task_id} - Task dispatch to agentic-loop
  • agent.signal.{loop_id} - Signals to agentic-loop
  • agent.complete.{loop_id} - Completion events from agentic-loop

JetStream Integration

All messaging uses JetStream for durability:

  • User messages consumed from USER stream
  • Agent tasks published to AGENT stream
  • Completion events consumed from AGENT stream

Consumer naming follows the pattern: agentic-dispatch-{port-name}

Default stream is USER (not AGENT) since this component bridges user input to the agentic system.

Package agenticdispatch also provides Prometheus metrics for the agentic-dispatch component.

Constants

This section is empty.

Variables

var ErrNATSClientNil = errs.ErrNoConnection

ErrNATSClientNil is returned when NATS client is nil.

Functions

func ClearGlobalCommands

func ClearGlobalCommands()

ClearGlobalCommands removes all globally registered commands. This is intended for testing use only.

func ListRegisteredCommands

func ListRegisteredCommands() map[string]CommandExecutor

ListRegisteredCommands returns a copy of all globally registered commands. The returned map is safe to mutate without affecting the internal registry.

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new router component

func Register

func Register(registry *component.Registry) error

Register registers the router component factory with the registry

func RegisterCommand

func RegisterCommand(name string, executor CommandExecutor) error

RegisterCommand registers a command executor globally via init(). Returns an error if the command name is empty or already registered. Panics if executor is nil (programmer error).
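
The documented behavior of RegisterCommand and ListRegisteredCommands (error on empty or duplicate name, panic on nil executor, copy on read) can be sketched with a mutex-guarded map. This is a stand-alone illustration with hypothetical names (executor, register, list); it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// executor stands in for CommandExecutor in this sketch.
type executor interface{ Help() string }

type helpText string

func (h helpText) Help() string { return string(h) }

var (
	mu       sync.RWMutex
	commands = map[string]executor{}
)

// register returns an error for an empty or duplicate name and panics
// on a nil executor, mirroring the documented contract.
func register(name string, e executor) error {
	if e == nil {
		panic("nil executor")
	}
	if name == "" {
		return errors.New("command name is empty")
	}
	mu.Lock()
	defer mu.Unlock()
	if _, dup := commands[name]; dup {
		return fmt.Errorf("command %q already registered", name)
	}
	commands[name] = e
	return nil
}

// list returns a copy, so callers can mutate the result freely.
func list() map[string]executor {
	mu.RLock()
	defer mu.RUnlock()
	out := make(map[string]executor, len(commands))
	for k, v := range commands {
		out[k] = v
	}
	return out
}

func main() {
	fmt.Println(register("spec", helpText("/spec [name]")))
	fmt.Println(register("spec", helpText("duplicate")))
	cp := list()
	delete(cp, "spec") // only the copy changes
	_, still := list()["spec"]
	fmt.Println(still)
}
```

Since registration returns an error rather than panicking on duplicates, callers that register from init() need to decide what to do with that error; panicking there is one option that surfaces name collisions at startup.
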

Types

type ActivityEvent

type ActivityEvent struct {
	Type      string          `json:"type"` // loop_created, loop_updated, loop_deleted
	LoopID    string          `json:"loop_id"`
	Timestamp time.Time       `json:"timestamp"`
	Data      json.RawMessage `json:"data,omitempty"`
}

ActivityEvent represents a real-time activity event sent via SSE.

type CommandConfig

type CommandConfig struct {
	Pattern     string `json:"pattern"`      // Regex pattern to match
	Permission  string `json:"permission"`   // Required permission
	RequireLoop bool   `json:"require_loop"` // Requires an active loop
	Help        string `json:"help"`         // Help text
}

CommandConfig defines a command's configuration

type CommandContext

type CommandContext struct {
	NATSClient    *natsclient.Client
	LoopTracker   *LoopTracker
	Logger        *slog.Logger
	HasPermission func(userID, permission string) bool
}

CommandContext provides services to command executors

type CommandExecutor

type CommandExecutor interface {
	Execute(ctx context.Context, cmdCtx *CommandContext, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
	Config() CommandConfig
}

CommandExecutor is the interface for command implementations

type CommandHandler

type CommandHandler func(ctx context.Context, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)

CommandHandler is a function that handles a command

type CommandRegistry

type CommandRegistry struct {
	// contains filtered or unexported fields
}

CommandRegistry manages command registration and matching

func NewCommandRegistry

func NewCommandRegistry() *CommandRegistry

NewCommandRegistry creates a new CommandRegistry

func (*CommandRegistry) All

func (r *CommandRegistry) All() map[string]CommandConfig

All returns all registered commands

func (*CommandRegistry) Count

func (r *CommandRegistry) Count() int

Count returns the number of registered commands

func (*CommandRegistry) Get

func (r *CommandRegistry) Get(name string) (*RegisteredCommand, bool)

Get retrieves a registered command by name

func (*CommandRegistry) Match

func (r *CommandRegistry) Match(input string) (string, *RegisteredCommand, []string, bool)

Match finds a command matching the input. It returns the command name, the matched command, the captured groups, and whether a match was found.

func (*CommandRegistry) Register

func (r *CommandRegistry) Register(name string, config CommandConfig, handler CommandHandler) error

Register adds a command to the registry

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the router processor

func (*Component) CommandRegistry

func (c *Component) CommandRegistry() *CommandRegistry

CommandRegistry returns the command registry for external registration

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions

func (*Component) LoopTracker

func (c *Component) LoopTracker() *LoopTracker

LoopTracker returns the loop tracker

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions

func (*Component) RegisterHTTPHandlers

func (c *Component) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)

RegisterHTTPHandlers registers HTTP endpoints for agentic-dispatch. This enables synchronous message processing via HTTP for web clients and E2E tests.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop halts processing with graceful shutdown

type Config

type Config struct {
	DefaultRole          string                `json:"default_role" schema:"type:string,description:Default role for new tasks,default:general,category:basic,required"`
	AutoContinue         bool                  `json:"auto_continue" schema:"type:bool,description:Automatically continue last active loop,default:true,category:basic"` // Continue last loop if exists
	Permissions          PermissionConfig      `json:"permissions" schema:"type:object,description:Permission configuration,category:advanced"`
	StreamName           string                `json:"stream_name" schema:"type:string,description:NATS stream name for user messages,default:USER,category:advanced"`
	ConsumerNameSuffix   string                `` /* 137-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Ports                *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}

Config represents the configuration for the router processor. Model selection is resolved from the unified model registry (component.Dependencies.ModelRegistry).

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration

func (Config) Validate

func (c Config) Validate() error

Validate validates the configuration

type DebugConfig

type DebugConfig struct {
	DefaultRole  string `json:"default_role"`
	DefaultModel string `json:"default_model"` // Resolved from model registry
	AutoContinue bool   `json:"auto_continue"`
	StreamName   string `json:"stream_name"`
}

DebugConfig contains non-sensitive configuration for debugging.

type DebugState

type DebugState struct {
	Started      bool        `json:"started"`
	StartTime    time.Time   `json:"start_time,omitempty"`
	Uptime       string      `json:"uptime,omitempty"`
	LoopCount    int         `json:"loop_count"`
	CommandCount int         `json:"command_count"`
	Loops        []*LoopInfo `json:"loops"`
	Commands     []string    `json:"commands"`
	Config       DebugConfig `json:"config"`
}

DebugState represents the internal state of the component for debugging.

type HTTPMessageRequest

type HTTPMessageRequest struct {
	Content     string            `json:"content"`
	UserID      string            `json:"user_id,omitempty"`
	ChannelType string            `json:"channel_type,omitempty"`
	ChannelID   string            `json:"channel_id,omitempty"`
	ReplyTo     string            `json:"reply_to,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

HTTPMessageRequest represents a message request via HTTP. This is the request format for the POST /message endpoint.
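
A client could build a request for this endpoint as sketched below. The httpMessageRequest type is a local mirror of the struct above; the base URL and the "/dispatch" prefix are made up for illustration, since the real prefix is whatever is passed to RegisterHTTPHandlers.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// httpMessageRequest is a local mirror of the documented request fields.
type httpMessageRequest struct {
	Content     string            `json:"content"`
	UserID      string            `json:"user_id,omitempty"`
	ChannelType string            `json:"channel_type,omitempty"`
	ChannelID   string            `json:"channel_id,omitempty"`
	ReplyTo     string            `json:"reply_to,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

// newMessageRequest builds a POST {baseURL}/message request with a JSON body.
// baseURL (host plus handler prefix) is an assumption of this sketch.
func newMessageRequest(baseURL string, msg httpMessageRequest) (*http.Request, error) {
	body, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	url := strings.TrimRight(baseURL, "/") + "/message"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := newMessageRequest("http://localhost:8080/dispatch", httpMessageRequest{
		Content: "/status",
		UserID:  "alice",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path)
}
```

Fields tagged omitempty are left out of the body when empty, so the request above serializes only content and user_id.
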

type HTTPMessageResponse

type HTTPMessageResponse struct {
	ResponseID string `json:"response_id"`
	Type       string `json:"type"`
	Content    string `json:"content"`
	InReplyTo  string `json:"in_reply_to,omitempty"`
	Error      string `json:"error,omitempty"`
	Timestamp  string `json:"timestamp"`
}

HTTPMessageResponse represents the response from the HTTP message endpoint.

type LoopInfo

type LoopInfo struct {
	LoopID        string    `json:"loop_id"`
	TaskID        string    `json:"task_id"`
	UserID        string    `json:"user_id"`
	ChannelType   string    `json:"channel_type"`
	ChannelID     string    `json:"channel_id"`
	State         string    `json:"state"`
	Iterations    int       `json:"iterations"`
	MaxIterations int       `json:"max_iterations"`
	CreatedAt     time.Time `json:"created_at"`

	// Workflow context (for loops created by workflow commands)
	WorkflowSlug string `json:"workflow_slug,omitempty"` // e.g., "add-user-auth"
	WorkflowStep string `json:"workflow_step,omitempty"` // e.g., "design"

	// Context assembly reference (links to assembled context)
	ContextRequestID string `json:"context_request_id,omitempty"`

	// Completion data (populated when loop completes)
	Outcome     string    `json:"outcome,omitempty"`      // success, failed, cancelled
	Result      string    `json:"result,omitempty"`       // LLM response content
	Error       string    `json:"error,omitempty"`        // Error message on failure
	CompletedAt time.Time `json:"completed_at,omitempty"` // When the loop completed
}

LoopInfo contains information about an active loop

type LoopTracker

type LoopTracker struct {
	// contains filtered or unexported fields
}

LoopTracker tracks active loops per user and channel
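
The idea of tracking loops per user and channel can be sketched with a mutex-guarded map. This is not the package's LoopTracker: the types and method names are hypothetical, an insertion counter stands in for CreatedAt, and the set of terminal state names ("complete", "failed", "cancelled") is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

type loopInfo struct {
	LoopID    string
	UserID    string
	ChannelID string
	State     string
	seq       uint64 // insertion order; stands in for CreatedAt in this sketch
}

type tracker struct {
	mu    sync.RWMutex
	next  uint64
	loops map[string]*loopInfo
}

func newTracker() *tracker { return &tracker{loops: make(map[string]*loopInfo)} }

// track adds or replaces a loop and stamps it with the next sequence number.
func (t *tracker) track(info *loopInfo) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.next++
	info.seq = t.next
	t.loops[info.LoopID] = info
}

// isTerminal uses assumed state names for finished loops.
func isTerminal(state string) bool {
	switch state {
	case "complete", "failed", "cancelled":
		return true
	}
	return false
}

// activeLoop returns the most recently tracked non-terminal loop ID for
// the user and channel, or "" when there is none.
func (t *tracker) activeLoop(userID, channelID string) string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	var best *loopInfo
	for _, l := range t.loops {
		if l.UserID != userID || l.ChannelID != channelID || isTerminal(l.State) {
			continue
		}
		if best == nil || l.seq > best.seq {
			best = l
		}
	}
	if best == nil {
		return ""
	}
	return best.LoopID
}

func main() {
	t := newTracker()
	t.track(&loopInfo{LoopID: "loop-1", UserID: "alice", ChannelID: "cli", State: "running"})
	t.track(&loopInfo{LoopID: "loop-2", UserID: "alice", ChannelID: "cli", State: "running"})
	t.track(&loopInfo{LoopID: "loop-3", UserID: "alice", ChannelID: "cli", State: "cancelled"})
	fmt.Println(t.activeLoop("alice", "cli"))
}
```
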

func NewLoopTracker

func NewLoopTracker() *LoopTracker

NewLoopTracker creates a new LoopTracker

func NewLoopTrackerWithLogger

func NewLoopTrackerWithLogger(logger *slog.Logger) *LoopTracker

NewLoopTrackerWithLogger creates a new LoopTracker with logging.

func (*LoopTracker) Count

func (t *LoopTracker) Count() int

Count returns the number of tracked loops

func (*LoopTracker) Get

func (t *LoopTracker) Get(loopID string) *LoopInfo

Get retrieves loop info by ID

func (*LoopTracker) GetActiveLoop

func (t *LoopTracker) GetActiveLoop(userID, channelID string) string

GetActiveLoop returns the most recent active loop for a user/channel

func (*LoopTracker) GetAllLoops

func (t *LoopTracker) GetAllLoops() []*LoopInfo

GetAllLoops returns all tracked loops

func (*LoopTracker) GetUserLoops

func (t *LoopTracker) GetUserLoops(userID string) []*LoopInfo

GetUserLoops returns all loops for a specific user

func (*LoopTracker) Remove

func (t *LoopTracker) Remove(loopID string)

Remove removes a loop from the tracker

func (*LoopTracker) SendSignal

func (t *LoopTracker) SendSignal(ctx context.Context, nc *natsclient.Client, loopID, signalType, reason string) error

SendSignal publishes a control signal to a loop via NATS.

func (*LoopTracker) SetLogger

func (t *LoopTracker) SetLogger(logger *slog.Logger)

SetLogger sets the logger for the LoopTracker.

func (*LoopTracker) Track

func (t *LoopTracker) Track(info *LoopInfo)

Track adds or updates a loop in the tracker

func (*LoopTracker) UpdateCompletion

func (t *LoopTracker) UpdateCompletion(loopID, outcome, result, errMsg string) error

UpdateCompletion updates a loop with completion data (outcome, result, error). This is called when a loop finishes to populate fields for SSE delivery. It also updates the State field to match the terminal state implied by the outcome.

func (*LoopTracker) UpdateContextRequestID

func (t *LoopTracker) UpdateContextRequestID(loopID, contextRequestID string) bool

UpdateContextRequestID atomically updates the context request ID for a loop. Returns true if the update was applied (loop exists and had no context request ID).

func (*LoopTracker) UpdateIterations

func (t *LoopTracker) UpdateIterations(loopID string, iterations int)

UpdateIterations updates the iteration count of a loop

func (*LoopTracker) UpdateState

func (t *LoopTracker) UpdateState(loopID, state string)

UpdateState updates the state of a loop

func (*LoopTracker) UpdateWorkflowContext

func (t *LoopTracker) UpdateWorkflowContext(loopID, workflowSlug, workflowStep string) bool

UpdateWorkflowContext atomically updates the workflow context for a loop. Returns true if the update was applied (loop exists and had no workflow context).
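
The "apply only if not already set" behavior described for UpdateContextRequestID and UpdateWorkflowContext amounts to a check-and-set under a lock. A minimal sketch, with hypothetical type and method names and the assumption that "no workflow context" means both fields are empty:

```go
package main

import (
	"fmt"
	"sync"
)

type loop struct {
	workflowSlug string
	workflowStep string
}

type tracker struct {
	mu    sync.Mutex
	loops map[string]*loop
}

// setWorkflowOnce sets the workflow context only when the loop exists
// and has none yet. The check and the write happen under one lock, so
// two concurrent callers cannot both succeed.
func (t *tracker) setWorkflowOnce(loopID, slug, step string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	l, ok := t.loops[loopID]
	if !ok || l.workflowSlug != "" || l.workflowStep != "" {
		return false
	}
	l.workflowSlug, l.workflowStep = slug, step
	return true
}

func main() {
	t := &tracker{loops: map[string]*loop{"loop-1": {}}}
	fmt.Println(t.setWorkflowOnce("loop-1", "add-user-auth", "design")) // true
	fmt.Println(t.setWorkflowOnce("loop-1", "other", "plan"))           // false
	fmt.Println(t.setWorkflowOnce("missing", "x", "y"))                 // false
}
```
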

type PermissionConfig

type PermissionConfig struct {
	View       []string `json:"view"`        // Who can view status, loops, history
	SubmitTask []string `json:"submit_task"` // Who can submit new tasks
	CancelOwn  bool     `json:"cancel_own"`  // Users can cancel their own loops
	CancelAny  []string `json:"cancel_any"`  // Who can cancel any loop
	Approve    []string `json:"approve"`     // Who can approve results
}

PermissionConfig defines permission rules for the router

type RegisteredCommand

type RegisteredCommand struct {
	Name    string
	Config  CommandConfig
	Pattern *regexp.Regexp
	Handler CommandHandler
}

RegisteredCommand contains a command's config and handler

type SignalMessage

type SignalMessage struct {
	LoopID    string    `json:"loop_id"`
	Type      string    `json:"type"`   // pause, resume, cancel
	Reason    string    `json:"reason"` // optional reason
	Timestamp time.Time `json:"timestamp"`
}

SignalMessage represents a control signal sent to a loop.

func (*SignalMessage) MarshalJSON

func (s *SignalMessage) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler

func (*SignalMessage) Schema

func (s *SignalMessage) Schema() message.Type

Schema implements message.Payload

func (*SignalMessage) UnmarshalJSON

func (s *SignalMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler

func (*SignalMessage) Validate

func (s *SignalMessage) Validate() error

Validate implements message.Payload

type SignalRequest

type SignalRequest struct {
	Type   string `json:"type"`   // pause, resume, cancel
	Reason string `json:"reason"` // optional reason
}

SignalRequest represents a control signal request for a loop.
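
A caller handling a signal request would typically validate the type and derive the target subject before publishing. The sketch below uses the three signal types named in the field comment (pause, resume, cancel) and the agent.signal.{loop_id} subject pattern; the function names are hypothetical, and whether the package rejects other types in exactly this way is an assumption.

```go
package main

import "fmt"

// validateSignalType accepts only the documented signal types.
func validateSignalType(t string) error {
	switch t {
	case "pause", "resume", "cancel":
		return nil
	}
	return fmt.Errorf("unknown signal type %q", t)
}

// signalSubject builds agent.signal.{loop_id}.
func signalSubject(loopID string) string {
	return "agent.signal." + loopID
}

func main() {
	for _, typ := range []string{"cancel", "restart"} {
		if err := validateSignalType(typ); err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("would publish", typ, "to", signalSubject("loop-42"))
	}
}
```
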

type SignalResponse

type SignalResponse struct {
	LoopID    string `json:"loop_id"`
	Signal    string `json:"signal"`
	Accepted  bool   `json:"accepted"`
	Message   string `json:"message,omitempty"`
	Timestamp string `json:"timestamp"`
}

SignalResponse represents the response to a signal request.
