Documentation
¶
Overview ¶
Package agenticdispatch provides message routing between users and agentic loops. It handles command parsing, permission checking, loop tracking, and message dispatch.
Package agenticdispatch provides message routing between users and agentic loops.
The agentic-dispatch component handles command parsing, permission checking, loop tracking, and message dispatch. It bridges input components (CLI, Slack, Discord, Web) with the agentic processing system.
Architecture ¶
┌─────────────┐ ┌─────────────────┐
│ CLI/Slack/ │ user.message.* │ │
│ Discord/ │ ─────────────────────────▶│ agentic-dispatch│
│ Web Input │ │ │
│ │◀───────────────────────── │ • Commands │
└─────────────┘ user.response.* │ • Perms │
│ • Loops │
└────────┬────────┘
│
│ agent.task.*
│ agent.signal.*
▼
┌─────────────┐
│ agentic- │
│ loop │
└─────────────┘
Command Registration ¶
Commands can be registered in two ways:
1. Global registration via init() - preferred for reusable commands:
package mycommands
import agenticdispatch "github.com/c360studio/semstreams/processor/agentic-dispatch"
func init() {
agenticdispatch.RegisterCommand("mycommand", &MyCommandExecutor{})
}
2. Per-component registration - for component-specific commands:
registry := dispatchComponent.CommandRegistry()
registry.Register("local", config, handler)
CommandExecutor Interface ¶
External commands implement the CommandExecutor interface:
type CommandExecutor interface {
Execute(ctx context.Context, cmdCtx *CommandContext, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
Config() CommandConfig
}
The CommandContext provides access to dispatch services:
type CommandContext struct {
NATSClient *natsclient.Client // For publishing messages
LoopTracker *LoopTracker // For tracking loops
Logger *slog.Logger // For logging
HasPermission func(userID, permission string) bool // For permission checks
}
Built-in Commands ¶
The agentic-dispatch component provides these built-in commands:
- /cancel [loop_id] - Cancel current or specified loop
- /status [loop_id] - Show loop status
- /loops - List active loops
- /help - Show available commands
Permissions ¶
Commands can require permissions:
- view - View status, loops, history
- submit_task - Submit new tasks
- cancel_own - Cancel own loops
- cancel_any - Cancel any loop (admin)
- approve - Approve/reject results
NATS Subjects ¶
The agentic-dispatch component uses these subject patterns:
- user.message.{channel}.{id} - Incoming user messages
- user.response.{channel}.{id} - Outgoing responses
- agent.task.{task_id} - Task dispatch to agentic-loop
- agent.signal.{loop_id} - Signals to agentic-loop
- agent.complete.{loop_id} - Completion events from agentic-loop
JetStream Integration ¶
All messaging uses JetStream for durability:
- User messages consumed from USER stream
- Agent tasks published to AGENT stream
- Completion events consumed from AGENT stream
Consumer naming follows the pattern: agentic-dispatch-{port-name}
Default stream is USER (not AGENT) since this component bridges user input to the agentic system.
Package agenticdispatch provides Prometheus metrics for agentic-dispatch component.
Index ¶
- Variables
- func ClearGlobalCommands()
- func ListRegisteredCommands() map[string]CommandExecutor
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- func RegisterCommand(name string, executor CommandExecutor) error
- type ActivityEvent
- type CommandConfig
- type CommandContext
- type CommandExecutor
- type CommandHandler
- type CommandRegistry
- func (r *CommandRegistry) All() map[string]CommandConfig
- func (r *CommandRegistry) Count() int
- func (r *CommandRegistry) Get(name string) (*RegisteredCommand, bool)
- func (r *CommandRegistry) Match(input string) (string, *RegisteredCommand, []string, bool)
- func (r *CommandRegistry) Register(name string, config CommandConfig, handler CommandHandler) error
- type Component
- func (c *Component) CommandRegistry() *CommandRegistry
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) LoopTracker() *LoopTracker
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type DebugConfig
- type DebugState
- type HTTPMessageRequest
- type HTTPMessageResponse
- type LoopInfo
- type LoopTracker
- func (t *LoopTracker) Count() int
- func (t *LoopTracker) Get(loopID string) *LoopInfo
- func (t *LoopTracker) GetActiveLoop(userID, channelID string) string
- func (t *LoopTracker) GetAllLoops() []*LoopInfo
- func (t *LoopTracker) GetUserLoops(userID string) []*LoopInfo
- func (t *LoopTracker) Remove(loopID string)
- func (t *LoopTracker) SendSignal(ctx context.Context, nc *natsclient.Client, loopID, signalType, reason string) error
- func (t *LoopTracker) SetLogger(logger *slog.Logger)
- func (t *LoopTracker) Track(info *LoopInfo)
- func (t *LoopTracker) UpdateCompletion(loopID, outcome, result, errMsg string) error
- func (t *LoopTracker) UpdateContextRequestID(loopID, contextRequestID string) bool
- func (t *LoopTracker) UpdateIterations(loopID string, iterations int)
- func (t *LoopTracker) UpdateState(loopID, state string)
- func (t *LoopTracker) UpdateWorkflowContext(loopID, workflowSlug, workflowStep string) bool
- type PermissionConfig
- type RegisteredCommand
- type SignalMessage
- type SignalRequest
- type SignalResponse
Constants ¶
This section is empty.
Variables ¶
var ErrNATSClientNil = errs.ErrNoConnection
ErrNATSClientNil is returned when NATS client is nil.
Functions ¶
func ClearGlobalCommands ¶
func ClearGlobalCommands()
ClearGlobalCommands removes all globally registered commands. This is intended for testing use only.
func ListRegisteredCommands ¶
func ListRegisteredCommands() map[string]CommandExecutor
ListRegisteredCommands returns a copy of all globally registered commands. The returned map is safe to mutate without affecting the internal registry.
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new router component
func RegisterCommand ¶
func RegisterCommand(name string, executor CommandExecutor) error
RegisterCommand registers a command executor globally via init(). Returns an error if the command name is empty or already registered. Panics if executor is nil (programmer error).
Types ¶
type ActivityEvent ¶
type ActivityEvent struct {
Type string `json:"type"` // loop_created, loop_updated, loop_deleted
LoopID string `json:"loop_id"`
Timestamp time.Time `json:"timestamp"`
Data json.RawMessage `json:"data,omitempty"`
}
ActivityEvent represents a real-time activity event sent via SSE.
type CommandConfig ¶
type CommandConfig struct {
Pattern string `json:"pattern"` // Regex pattern to match
Permission string `json:"permission"` // Required permission
RequireLoop bool `json:"require_loop"` // Requires an active loop
Help string `json:"help"` // Help text
}
CommandConfig defines a command's configuration
type CommandContext ¶
type CommandContext struct {
NATSClient *natsclient.Client
LoopTracker *LoopTracker
Logger *slog.Logger
HasPermission func(userID, permission string) bool
}
CommandContext provides services to command executors
type CommandExecutor ¶
type CommandExecutor interface {
Execute(ctx context.Context, cmdCtx *CommandContext, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
Config() CommandConfig
}
CommandExecutor is the interface for command implementations
type CommandHandler ¶
type CommandHandler func(ctx context.Context, msg agentic.UserMessage, args []string, loopID string) (agentic.UserResponse, error)
CommandHandler is a function that handles a command
type CommandRegistry ¶
type CommandRegistry struct {
// contains filtered or unexported fields
}
CommandRegistry manages command registration and matching
func NewCommandRegistry ¶
func NewCommandRegistry() *CommandRegistry
NewCommandRegistry creates a new CommandRegistry
func (*CommandRegistry) All ¶
func (r *CommandRegistry) All() map[string]CommandConfig
All returns all registered commands
func (*CommandRegistry) Count ¶
func (r *CommandRegistry) Count() int
Count returns the number of registered commands
func (*CommandRegistry) Get ¶
func (r *CommandRegistry) Get(name string) (*RegisteredCommand, bool)
Get retrieves a registered command by name
func (*CommandRegistry) Match ¶
func (r *CommandRegistry) Match(input string) (string, *RegisteredCommand, []string, bool)
Match finds a command matching the input Returns the command name, matched command, captured groups, and whether a match was found
func (*CommandRegistry) Register ¶
func (r *CommandRegistry) Register(name string, config CommandConfig, handler CommandHandler) error
Register adds a command to the registry
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the router processor
func (*Component) CommandRegistry ¶
func (c *Component) CommandRegistry() *CommandRegistry
CommandRegistry returns the command registry for external registration
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
func (*Component) Initialize ¶
Initialize prepares the component
func (*Component) InputPorts ¶
InputPorts returns input port definitions
func (*Component) LoopTracker ¶
func (c *Component) LoopTracker() *LoopTracker
LoopTracker returns the loop tracker
func (*Component) OutputPorts ¶
OutputPorts returns output port definitions
func (*Component) RegisterHTTPHandlers ¶
RegisterHTTPHandlers registers HTTP endpoints for agentic-dispatch. This enables synchronous message processing via HTTP for web clients and E2E tests.
type Config ¶
type Config struct {
DefaultRole string `json:"default_role" schema:"type:string,description:Default role for new tasks,default:general,category:basic,required"`
AutoContinue bool `json:"auto_continue" schema:"type:bool,description:Automatically continue last active loop,default:true,category:basic"` // Continue last loop if exists
Permissions PermissionConfig `json:"permissions" schema:"type:object,description:Permission configuration,category:advanced"`
StreamName string `json:"stream_name" schema:"type:string,description:NATS stream name for user messages,default:USER,category:advanced"`
ConsumerNameSuffix string `` /* 137-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}
Config represents the configuration for the router processor. Model selection is resolved from the unified model registry (component.Dependencies.ModelRegistry).
type DebugConfig ¶
type DebugConfig struct {
DefaultRole string `json:"default_role"`
DefaultModel string `json:"default_model"` // Resolved from model registry
AutoContinue bool `json:"auto_continue"`
StreamName string `json:"stream_name"`
}
DebugConfig contains non-sensitive configuration for debugging.
type DebugState ¶
type DebugState struct {
Started bool `json:"started"`
StartTime time.Time `json:"start_time,omitempty"`
Uptime string `json:"uptime,omitempty"`
LoopCount int `json:"loop_count"`
CommandCount int `json:"command_count"`
Loops []*LoopInfo `json:"loops"`
Commands []string `json:"commands"`
Config DebugConfig `json:"config"`
}
DebugState represents the internal state of the component for debugging.
type HTTPMessageRequest ¶
type HTTPMessageRequest struct {
Content string `json:"content"`
UserID string `json:"user_id,omitempty"`
ChannelType string `json:"channel_type,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
HTTPMessageRequest represents a message request via HTTP. This is the request format for the POST /message endpoint.
type HTTPMessageResponse ¶
type HTTPMessageResponse struct {
ResponseID string `json:"response_id"`
Type string `json:"type"`
Content string `json:"content"`
InReplyTo string `json:"in_reply_to,omitempty"`
Error string `json:"error,omitempty"`
Timestamp string `json:"timestamp"`
}
HTTPMessageResponse represents the response from the HTTP message endpoint.
type LoopInfo ¶
type LoopInfo struct {
LoopID string `json:"loop_id"`
TaskID string `json:"task_id"`
UserID string `json:"user_id"`
ChannelType string `json:"channel_type"`
ChannelID string `json:"channel_id"`
State string `json:"state"`
Iterations int `json:"iterations"`
MaxIterations int `json:"max_iterations"`
CreatedAt time.Time `json:"created_at"`
// Workflow context (for loops created by workflow commands)
WorkflowSlug string `json:"workflow_slug,omitempty"` // e.g., "add-user-auth"
WorkflowStep string `json:"workflow_step,omitempty"` // e.g., "design"
// Context assembly reference (links to assembled context)
ContextRequestID string `json:"context_request_id,omitempty"`
// Completion data (populated when loop completes)
Outcome string `json:"outcome,omitempty"` // success, failed, cancelled
Result string `json:"result,omitempty"` // LLM response content
Error string `json:"error,omitempty"` // Error message on failure
CompletedAt time.Time `json:"completed_at,omitempty"` // When the loop completed
}
LoopInfo contains information about an active loop
type LoopTracker ¶
type LoopTracker struct {
// contains filtered or unexported fields
}
LoopTracker tracks active loops per user and channel
func NewLoopTrackerWithLogger ¶
func NewLoopTrackerWithLogger(logger *slog.Logger) *LoopTracker
NewLoopTrackerWithLogger creates a new LoopTracker with logging.
func (*LoopTracker) Count ¶
func (t *LoopTracker) Count() int
Count returns the number of tracked loops
func (*LoopTracker) Get ¶
func (t *LoopTracker) Get(loopID string) *LoopInfo
Get retrieves loop info by ID
func (*LoopTracker) GetActiveLoop ¶
func (t *LoopTracker) GetActiveLoop(userID, channelID string) string
GetActiveLoop returns the most recent active loop for a user/channel
func (*LoopTracker) GetAllLoops ¶
func (t *LoopTracker) GetAllLoops() []*LoopInfo
GetAllLoops returns all tracked loops
func (*LoopTracker) GetUserLoops ¶
func (t *LoopTracker) GetUserLoops(userID string) []*LoopInfo
GetUserLoops returns all loops for a specific user
func (*LoopTracker) Remove ¶
func (t *LoopTracker) Remove(loopID string)
Remove removes a loop from the tracker
func (*LoopTracker) SendSignal ¶
func (t *LoopTracker) SendSignal(ctx context.Context, nc *natsclient.Client, loopID, signalType, reason string) error
SendSignal publishes a control signal to a loop via NATS.
func (*LoopTracker) SetLogger ¶
func (t *LoopTracker) SetLogger(logger *slog.Logger)
SetLogger sets the logger for the LoopTracker.
func (*LoopTracker) Track ¶
func (t *LoopTracker) Track(info *LoopInfo)
Track adds or updates a loop in the tracker
func (*LoopTracker) UpdateCompletion ¶
func (t *LoopTracker) UpdateCompletion(loopID, outcome, result, errMsg string) error
UpdateCompletion updates a loop with completion data (outcome, result, error). This is called when a loop finishes to populate fields for SSE delivery. It also updates the State field to match the terminal state implied by the outcome.
func (*LoopTracker) UpdateContextRequestID ¶
func (t *LoopTracker) UpdateContextRequestID(loopID, contextRequestID string) bool
UpdateContextRequestID atomically updates the context request ID for a loop. Returns true if the update was applied (loop exists and had no context request ID).
func (*LoopTracker) UpdateIterations ¶
func (t *LoopTracker) UpdateIterations(loopID string, iterations int)
UpdateIterations updates the iteration count of a loop
func (*LoopTracker) UpdateState ¶
func (t *LoopTracker) UpdateState(loopID, state string)
UpdateState updates the state of a loop
func (*LoopTracker) UpdateWorkflowContext ¶
func (t *LoopTracker) UpdateWorkflowContext(loopID, workflowSlug, workflowStep string) bool
UpdateWorkflowContext atomically updates the workflow context for a loop. Returns true if the update was applied (loop exists and had no workflow context).
type PermissionConfig ¶
type PermissionConfig struct {
View []string `json:"view"` // Who can view status, loops, history
SubmitTask []string `json:"submit_task"` // Who can submit new tasks
CancelOwn bool `json:"cancel_own"` // Users can cancel their own loops
CancelAny []string `json:"cancel_any"` // Who can cancel any loop
Approve []string `json:"approve"` // Who can approve results
}
PermissionConfig defines permission rules for the router
type RegisteredCommand ¶
type RegisteredCommand struct {
Name string
Config CommandConfig
Pattern *regexp.Regexp
Handler CommandHandler
}
RegisteredCommand contains a command's config and handler
type SignalMessage ¶
type SignalMessage struct {
LoopID string `json:"loop_id"`
Type string `json:"type"` // pause, resume, cancel
Reason string `json:"reason"` // optional reason
Timestamp time.Time `json:"timestamp"`
}
SignalMessage represents a control signal sent to a loop.
func (*SignalMessage) MarshalJSON ¶
func (s *SignalMessage) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler
func (*SignalMessage) Schema ¶
func (s *SignalMessage) Schema() message.Type
Schema implements message.Payload
func (*SignalMessage) UnmarshalJSON ¶
func (s *SignalMessage) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler
func (*SignalMessage) Validate ¶
func (s *SignalMessage) Validate() error
Validate implements message.Payload
type SignalRequest ¶
type SignalRequest struct {
Type string `json:"type"` // pause, resume, cancel
Reason string `json:"reason"` // optional reason
}
SignalRequest represents a control signal request for a loop.