Documentation ¶
Overview ¶
Package modality provides the bus and pipeline abstractions for multi-modal communication in CogOS.
A modality bus routes typed messages (text, voice, structured data) between producers and consumers through a Gate/Decoder/Encoder pipeline. Each sensory modality (text, voice, vision, spatial) implements the Module interface.
The package is organized in layers:
- types.go: Core interfaces (Module, Gate, Decoder, Encoder) and data types
- bus.go: Bus orchestration and module lifecycle
- wire.go: D2 wire protocol for subprocess communication (JSON-lines)
- channel.go: Channel capability declarations and session binding
- events.go: Event type constants and data structures
- salience.go: Attentional field scoring with exponential decay
- supervisor.go: Process supervision with health monitoring and restart
- text.go: Reference Module implementation (identity passthrough)
Index ¶
- Constants
- func RequireFields(eventType string, pairs ...string) error
- type Bus
- func (b *Bus) Act(intent *CognitiveIntent) (*EncodedOutput, error)
- func (b *Bus) HUD() map[string]any
- func (b *Bus) Health() map[ModalityType]ModuleStatus
- func (b *Bus) LogEvent(ev BusEvent)
- func (b *Bus) Order() []ModalityType
- func (b *Bus) Perceive(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
- func (b *Bus) Register(module Module) error
- func (b *Bus) Start(ctx context.Context) error
- func (b *Bus) Stop(ctx context.Context) error
- type BusEvent
- type ChannelAdapter
- type ChannelConnection
- type ChannelDescriptor
- type ChannelRegistry
- func (r *ChannelRegistry) BindToSession(channelID, sessionID string) error
- func (r *ChannelRegistry) ChannelsForSession(sessionID string) []*ChannelDescriptor
- func (r *ChannelRegistry) Get(channelID string) (*ChannelDescriptor, bool)
- func (r *ChannelRegistry) Register(desc *ChannelDescriptor) error
- func (r *ChannelRegistry) Snapshot() map[string]*ChannelDescriptor
- func (r *ChannelRegistry) SupportsModality(sessionID string, modality ModalityType) []*ChannelDescriptor
- func (r *ChannelRegistry) UnbindFromSession(channelID, sessionID string) error
- func (r *ChannelRegistry) Unregister(channelID string) error
- type CognitiveEvent
- type CognitiveIntent
- type Decoder
- type EncodedOutput
- type Encoder
- type ErrorData
- type EventListener
- type FieldRequiredError
- type Gate
- type GateData
- type GateResult
- type InputData
- type ManagedModule
- type ModalityType
- type Module
- type ModuleState
- type ModuleStatus
- type OutputData
- type ProcessSupervisor
- func (s *ProcessSupervisor) Conn(name string) *SubprocessConn
- func (s *ProcessSupervisor) ModuleStatus(name string) (*ModuleState, error)
- func (s *ProcessSupervisor) Register(cfg *SupervisorConfig) error
- func (s *ProcessSupervisor) Restart(ctx context.Context, name string) error
- func (s *ProcessSupervisor) Start(ctx context.Context, name string) error
- func (s *ProcessSupervisor) StatusAll() map[string]*ModuleState
- func (s *ProcessSupervisor) Stop(_ context.Context, name string) error
- type Salience
- type SalienceEntry
- type StateChangeData
- type SubprocessConn
- func (c *SubprocessConn) Close() error
- func (c *SubprocessConn) HealthCheck(timeout time.Duration) (*WireMessage, error)
- func (c *SubprocessConn) Module() string
- func (c *SubprocessConn) NextRequestID() string
- func (c *SubprocessConn) PID() int
- func (c *SubprocessConn) Receive() (*WireMessage, error)
- func (c *SubprocessConn) Request(module, operation string, data map[string]any) (*WireMessage, error)
- func (c *SubprocessConn) Send(msg *WireMessage) error
- func (c *SubprocessConn) SendCommand(command string) error
- type SupervisorConfig
- type TextModule
- func (m *TextModule) Decoder() Decoder
- func (m *TextModule) Encoder() Encoder
- func (m *TextModule) Gate() Gate
- func (m *TextModule) Health() ModuleStatus
- func (m *TextModule) Start(_ context.Context) error
- func (m *TextModule) State() *ModuleState
- func (m *TextModule) Stop(_ context.Context) error
- func (m *TextModule) Type() ModalityType
- type TransformData
- type WireMessage
Constants ¶
const (
EventInput = "modality.input"
EventOutput = "modality.output"
EventTransform = "modality.transform"
EventGate = "modality.gate"
EventStateChange = "modality.state_change"
EventError = "modality.error"
)
Modality event type constants for the CogOS ledger.
const DecayThreshold = 0.01
DecayThreshold is the minimum score; entries below this are pruned.
const DefaultDecayHalfLife = 5 * time.Minute
DefaultDecayHalfLife is the default half-life for exponential decay.
const MaxWireLineSize = 1024 * 1024
MaxWireLineSize is the scanner buffer limit (1 MB) for base64 audio chunks.
Functions ¶
func RequireFields ¶
func RequireFields(eventType string, pairs ...string) error
RequireFields returns an error naming the first empty field, or nil.
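One plausible reading of the signature is that pairs alternates field names and values. The sketch below follows that reading; it is an assumption, as are the lowercase requireFields name and the EventType and Field members of the error type, none of which are confirmed by the documentation.

```go
package main

import "fmt"

// FieldRequiredError here is a local stand-in. The EventType and Field
// members are assumptions made for this sketch.
type FieldRequiredError struct {
	EventType string
	Field     string
}

func (e *FieldRequiredError) Error() string {
	return fmt.Sprintf("%s: field %q is required", e.EventType, e.Field)
}

// requireFields (hypothetical) treats pairs as name, value, name, value...
// and returns an error for the first name whose value is empty.
func requireFields(eventType string, pairs ...string) error {
	for i := 0; i+1 < len(pairs); i += 2 {
		if pairs[i+1] == "" {
			return &FieldRequiredError{EventType: eventType, Field: pairs[i]}
		}
	}
	return nil
}

func main() {
	err := requireFields("modality.input",
		"modality", "voice",
		"channel", "", // empty value: this field is reported
	)
	fmt.Println(err)
}
```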
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus orchestrates modality modules. It is the sensorimotor boundary between the agent and its environment.
func (*Bus) Act ¶
func (b *Bus) Act(intent *CognitiveIntent) (*EncodedOutput, error)
Act encodes a cognitive intent into raw output via the target modality's encoder.
func (*Bus) Health ¶
func (b *Bus) Health() map[ModalityType]ModuleStatus
Health returns the current status of every registered module.
func (*Bus) Order ¶
func (b *Bus) Order() []ModalityType
Order returns the module registration order (for HUD formatting).
func (*Bus) Perceive ¶
func (b *Bus) Perceive(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
Perceive runs raw input through gate (if present) -> decode -> cognitive event. Returns (nil, nil) when the gate rejects input (rejection is not an error).
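The control flow described above can be sketched as follows. The types are trimmed-down local copies of the package's interfaces, and perceive, nonEmptyGate, and identityDecoder are hypothetical names used only for illustration. Treating a nil GateResult as a rejection is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins with only the fields this sketch needs.
type ModalityType string

type GateResult struct{ Allowed bool }

type CognitiveEvent struct {
	Modality ModalityType
	Channel  string
	Content  string
}

type Gate interface {
	Check(raw []byte, modality ModalityType) (*GateResult, error)
}

type Decoder interface {
	Decode(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
}

// perceive (hypothetical) runs the optional gate, then the decoder.
// A gate rejection yields (nil, nil): no event and no error.
func perceive(g Gate, d Decoder, raw []byte, m ModalityType, channel string) (*CognitiveEvent, error) {
	if g != nil {
		res, err := g.Check(raw, m)
		if err != nil {
			return nil, fmt.Errorf("gate: %w", err)
		}
		if res == nil || !res.Allowed {
			return nil, nil // rejected, which is not an error
		}
	}
	if d == nil {
		return nil, errors.New("no decoder for modality " + string(m))
	}
	return d.Decode(raw, m, channel)
}

// nonEmptyGate rejects empty input (illustrative only).
type nonEmptyGate struct{}

func (nonEmptyGate) Check(raw []byte, _ ModalityType) (*GateResult, error) {
	return &GateResult{Allowed: len(raw) > 0}, nil
}

// identityDecoder copies the raw bytes into the event content.
type identityDecoder struct{}

func (identityDecoder) Decode(raw []byte, m ModalityType, ch string) (*CognitiveEvent, error) {
	return &CognitiveEvent{Modality: m, Channel: ch, Content: string(raw)}, nil
}

func main() {
	ev, err := perceive(nonEmptyGate{}, identityDecoder{}, []byte("hello"), "text", "cli")
	fmt.Println(ev.Content, err)

	ev, err = perceive(nonEmptyGate{}, identityDecoder{}, nil, "text", "cli")
	fmt.Println(ev == nil, err == nil)
}
```

Because a rejection returns a nil event with a nil error, callers need to check the event for nil before using it, not only the error.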
func (*Bus) Register ¶
func (b *Bus) Register(module Module) error
Register adds a modality module. It returns an error if the modality is already registered.
type BusEvent ¶
type BusEvent struct {
Type string `json:"type"`
Modality string `json:"modality"`
Channel string `json:"channel"`
Timestamp time.Time `json:"timestamp"`
Data map[string]any `json:"data,omitempty"`
}
BusEvent is an internal log entry for the HUD.
type ChannelAdapter ¶
type ChannelAdapter interface {
// Connect joins a session, declaring supported modalities.
Connect(sessionID string, descriptor *ChannelDescriptor) error
// Disconnect leaves a session.
Disconnect(sessionID string) error
// HandleIncoming sends raw input from the channel to the kernel.
HandleIncoming(channelID string, modality ModalityType, raw []byte) error
// Deliver sends output from the kernel to the channel.
Deliver(channelID string, output *EncodedOutput) error
}
ChannelAdapter is the interface a channel transport must implement.
type ChannelConnection ¶
type ChannelConnection struct {
ID string `json:"id"`
Modalities []ModalityType `json:"modalities"`
}
ChannelConnection tracks an active channel (placeholder for A5).
type ChannelDescriptor ¶
type ChannelDescriptor struct {
ID string `json:"id"` // "discord-text", "claude-code", etc.
Transport string `json:"transport"` // "openclaw-gateway", "mcp", "http", "stdio"
Input []ModalityType `json:"input"` // modalities this channel can receive
Output []ModalityType `json:"output"` // modalities this channel can deliver
SessionKey string `json:"session_key"` // pattern for session binding, e.g. "discord:{guild}:{channel}:{user}"
Metadata map[string]any `json:"metadata,omitempty"` // transport-specific data (guild ID, thread ID, etc.)
}
ChannelDescriptor declares a channel's identity and capabilities.
func (*ChannelDescriptor) SupportsOutput ¶
func (d *ChannelDescriptor) SupportsOutput(m ModalityType) bool
SupportsOutput reports whether this channel can deliver the given modality.
type ChannelRegistry ¶
type ChannelRegistry struct {
// contains filtered or unexported fields
}
ChannelRegistry manages active channel connections. All methods are safe for concurrent use.
func NewChannelRegistry ¶
func NewChannelRegistry() *ChannelRegistry
NewChannelRegistry creates an empty registry.
func (*ChannelRegistry) BindToSession ¶
func (r *ChannelRegistry) BindToSession(channelID, sessionID string) error
BindToSession associates a channel with a session.
func (*ChannelRegistry) ChannelsForSession ¶
func (r *ChannelRegistry) ChannelsForSession(sessionID string) []*ChannelDescriptor
ChannelsForSession returns all channel descriptors bound to a session.
func (*ChannelRegistry) Get ¶
func (r *ChannelRegistry) Get(channelID string) (*ChannelDescriptor, bool)
Get returns a channel descriptor by ID.
func (*ChannelRegistry) Register ¶
func (r *ChannelRegistry) Register(desc *ChannelDescriptor) error
Register adds a channel descriptor. Returns an error if a channel with the same ID is already registered.
func (*ChannelRegistry) Snapshot ¶
func (r *ChannelRegistry) Snapshot() map[string]*ChannelDescriptor
Snapshot returns a copy of all registered channels, keyed by ID. Intended for the agent HUD and diagnostic views.
func (*ChannelRegistry) SupportsModality ¶
func (r *ChannelRegistry) SupportsModality(sessionID string, modality ModalityType) []*ChannelDescriptor
SupportsModality returns channels in a session that support a given modality for output. Used by the kernel to fan out responses.
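The fan-out filter can be illustrated with a short sketch. It assumes the session's bound descriptors are already available as a slice; outputsFor is a hypothetical helper, the SupportsOutput body is a guess at the obvious implementation, and the "discord-voice" channel ID is invented for the example.

```go
package main

import "fmt"

type ModalityType string

// ChannelDescriptor is a local stand-in with only the fields used here.
type ChannelDescriptor struct {
	ID     string
	Output []ModalityType
}

// SupportsOutput reports whether the channel lists m among its outputs.
func (d *ChannelDescriptor) SupportsOutput(m ModalityType) bool {
	for _, out := range d.Output {
		if out == m {
			return true
		}
	}
	return false
}

// outputsFor (hypothetical) keeps the descriptors that can deliver m.
func outputsFor(bound []*ChannelDescriptor, m ModalityType) []*ChannelDescriptor {
	var out []*ChannelDescriptor
	for _, d := range bound {
		if d != nil && d.SupportsOutput(m) {
			out = append(out, d)
		}
	}
	return out
}

func main() {
	bound := []*ChannelDescriptor{
		{ID: "discord-text", Output: []ModalityType{"text"}},
		{ID: "discord-voice", Output: []ModalityType{"text", "voice"}},
	}
	// Only channels that can deliver voice receive a spoken response.
	for _, d := range outputsFor(bound, "voice") {
		fmt.Println(d.ID)
	}
}
```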
func (*ChannelRegistry) UnbindFromSession ¶
func (r *ChannelRegistry) UnbindFromSession(channelID, sessionID string) error
UnbindFromSession removes a channel from a session.
func (*ChannelRegistry) Unregister ¶
func (r *ChannelRegistry) Unregister(channelID string) error
Unregister removes a channel and unbinds it from all sessions.
type CognitiveEvent ¶
type CognitiveEvent struct {
Modality ModalityType `json:"modality"`
Channel string `json:"channel"`
Content string `json:"content"`
Confidence float64 `json:"confidence,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
CognitiveEvent represents a decoded perception -- raw signal transformed into meaning. This is what the agent "sees" after a decoder processes raw input (e.g., transcribed text from audio, a caption from an image).
type CognitiveIntent ¶
type CognitiveIntent struct {
Modality ModalityType `json:"modality"`
Channel string `json:"channel"`
Content string `json:"content"`
Params map[string]any `json:"params,omitempty"`
}
CognitiveIntent represents a desire to act -- meaning to be encoded into raw signal. This is what the agent "says" before an encoder transforms it into output (e.g., text into synthesized speech).
type Decoder ¶
type Decoder interface {
Decode(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
}
Decoder transforms raw signal into cognitive events. This is the inbound half of the sensorimotor boundary (e.g., STT for voice, OCR for vision).
type EncodedOutput ¶
type EncodedOutput struct {
Modality ModalityType `json:"modality"`
Data []byte `json:"data,omitempty"`
MimeType string `json:"mime_type,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
EncodedOutput represents the result of encoding an intent into raw signal, ready for channel delivery (e.g., WAV audio bytes, PNG image).
type Encoder ¶
type Encoder interface {
Encode(intent *CognitiveIntent) (*EncodedOutput, error)
}
Encoder transforms cognitive intents into raw signal. This is the outbound half of the sensorimotor boundary (e.g., TTS for voice, image generation for vision).
type ErrorData ¶
type ErrorData struct {
Modality string `json:"modality"`
Module string `json:"module"`
Error string `json:"error"`
ErrorType string `json:"error_type,omitempty"`
Recoverable bool `json:"recoverable,omitempty"`
}
ErrorData is the Data payload for a modality.error event.
type EventListener ¶
EventListener receives modality events for real-time processing.
type FieldRequiredError ¶
FieldRequiredError is returned when a required event field is empty.
func (*FieldRequiredError) Error ¶
func (e *FieldRequiredError) Error() string
type Gate ¶
type Gate interface {
Check(raw []byte, modality ModalityType) (*GateResult, error)
}
Gate filters incoming signals -- the perception threshold. A gate decides whether raw input contains signal worth decoding (e.g., VAD for voice, change detection for vision).
type GateData ¶
type GateData struct {
Modality string `json:"modality"`
Channel string `json:"channel"`
Decision string `json:"decision"`
Confidence float64 `json:"confidence"`
SpeechRatio float64 `json:"speech_ratio,omitempty"`
DurationMs int `json:"duration_ms,omitempty"`
Gate string `json:"gate,omitempty"`
}
GateData is the Data payload for a modality.gate event.
type GateResult ¶
type GateResult struct {
Allowed bool `json:"allowed"`
Confidence float64 `json:"confidence"`
Reason string `json:"reason,omitempty"`
}
GateResult represents the decision of an input gate -- whether raw input should pass through to the decoder for processing.
type InputData ¶
type InputData struct {
Modality string `json:"modality"`
Channel string `json:"channel"`
Transcript string `json:"transcript"`
GateConfidence float64 `json:"gate_confidence,omitempty"`
SpeechRatio float64 `json:"speech_ratio,omitempty"`
LatencyMs int `json:"latency_ms,omitempty"`
Engine string `json:"engine,omitempty"`
}
InputData is the Data payload for a modality.input event.
type ManagedModule ¶
type ManagedModule struct {
Name, Command string
Args []string
Conn *SubprocessConn
Status ModuleStatus
PID int
StartedAt time.Time
Restarts int
MaxRestarts int
LastError string
// contains filtered or unexported fields
}
ManagedModule tracks a supervised subprocess.
type ModalityType ¶
type ModalityType string
ModalityType identifies the sensory modality a module handles.
const (
Text ModalityType = "text"
Voice ModalityType = "voice"
Vision ModalityType = "vision"
Spatial ModalityType = "spatial"
)
type Module ¶
type Module interface {
// Type returns the modality this module handles.
Type() ModalityType
// Gate returns the input gate, or nil for passthrough modalities
// that accept all input (e.g., text).
Gate() Gate
// Decoder returns the signal-to-event decoder.
Decoder() Decoder
// Encoder returns the intent-to-signal encoder.
Encoder() Encoder
// State returns current operational state for health monitoring
// and the agent's HUD.
State() *ModuleState
// Start initializes the module. For subprocess-backed modules,
// this spawns the Python child process and waits for readiness.
Start(ctx context.Context) error
// Stop gracefully shuts down the module, sending shutdown commands
// to any child processes and waiting for clean exit.
Stop(ctx context.Context) error
// Health returns the current operational status of the module.
Health() ModuleStatus
}
Module is the full module contract. Each sensory modality (text, voice, vision, spatial) implements this interface to participate in the modality bus.
type ModuleState ¶
type ModuleState struct {
Status ModuleStatus `json:"status"`
Modality ModalityType `json:"modality"`
PID int `json:"pid,omitempty"`
Uptime time.Duration `json:"uptime,omitempty"`
LastError string `json:"last_error,omitempty"`
Metrics map[string]any `json:"metrics,omitempty"`
}
ModuleState represents the current operational state of a modality module, used for health monitoring and the agent's HUD.
type ModuleStatus ¶
type ModuleStatus string
ModuleStatus represents the operational status of a modality module.
const (
StatusStarting ModuleStatus = "starting"
StatusHealthy ModuleStatus = "healthy"
StatusDegraded ModuleStatus = "degraded"
StatusStopped ModuleStatus = "stopped"
StatusCrashed ModuleStatus = "crashed"
)
type OutputData ¶
type OutputData struct {
Modality string `json:"modality"`
Channel string `json:"channel"`
Text string `json:"text"`
Engine string `json:"engine,omitempty"`
Voice string `json:"voice,omitempty"`
RTF float64 `json:"rtf,omitempty"`
DurationSec float64 `json:"duration_sec,omitempty"`
LatencyMs int `json:"latency_ms,omitempty"`
}
OutputData is the Data payload for a modality.output event.
type ProcessSupervisor ¶
type ProcessSupervisor struct {
// contains filtered or unexported fields
}
ProcessSupervisor manages subprocess lifecycles with health monitoring.
func NewProcessSupervisor ¶
func NewProcessSupervisor(rootDir string) *ProcessSupervisor
NewProcessSupervisor creates a new supervisor rooted at rootDir.
func (*ProcessSupervisor) Conn ¶
func (s *ProcessSupervisor) Conn(name string) *SubprocessConn
Conn returns the SubprocessConn for a named module, or nil.
func (*ProcessSupervisor) ModuleStatus ¶
func (s *ProcessSupervisor) ModuleStatus(name string) (*ModuleState, error)
ModuleStatus returns the current state of a named module.
func (*ProcessSupervisor) Register ¶
func (s *ProcessSupervisor) Register(cfg *SupervisorConfig) error
Register registers a subprocess configuration.
func (*ProcessSupervisor) Restart ¶
func (s *ProcessSupervisor) Restart(ctx context.Context, name string) error
Restart stops and restarts a managed subprocess.
func (*ProcessSupervisor) Start ¶
func (s *ProcessSupervisor) Start(ctx context.Context, name string) error
Start spawns a subprocess, waits for its ready signal, and starts health monitoring.
func (*ProcessSupervisor) StatusAll ¶
func (s *ProcessSupervisor) StatusAll() map[string]*ModuleState
StatusAll returns the state of all managed modules.
type Salience ¶
type Salience struct {
// contains filtered or unexported fields
}
Salience extends the attentional field with modality event scoring.
func NewSalience ¶
func NewSalience() *Salience
NewSalience creates a new modality salience tracker.
func (*Salience) Decay ¶
Decay applies exponential decay to all entries and prunes those below DecayThreshold:
score *= 0.5 ^ (elapsed / halfLife)
func (*Salience) OnEvent ¶
OnEvent implements EventListener for the EventRouter. It extracts modality and channel from data, builds a key, and boosts the score.
func (*Salience) TopN ¶
func (ms *Salience) TopN(n int) []*SalienceEntry
TopN returns the N highest-scoring entries, sorted descending.
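A minimal sketch of the selection, assuming the entries are available as a slice: topN is a hypothetical free function, and copying before sorting (so the caller's slice order is untouched) is a design choice of the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// SalienceEntry is a local stand-in with only the fields used here.
type SalienceEntry struct {
	Key   string
	Score float64
}

// topN (hypothetical) returns up to n entries with the highest scores,
// sorted descending. The input slice is not reordered.
func topN(entries []*SalienceEntry, n int) []*SalienceEntry {
	if n <= 0 {
		return nil
	}
	out := make([]*SalienceEntry, len(entries))
	copy(out, entries)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].Score > out[j].Score
	})
	if n < len(out) {
		out = out[:n]
	}
	return out
}

func main() {
	entries := []*SalienceEntry{
		{Key: "text:cli", Score: 0.2},
		{Key: "voice:cli", Score: 0.9},
		{Key: "vision:cam", Score: 0.5},
	}
	for _, e := range topN(entries, 2) {
		fmt.Println(e.Key, e.Score)
	}
}
```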
type SalienceEntry ¶
type SalienceEntry struct {
Key string `json:"key"`
Score float64 `json:"score"`
LastEvent time.Time `json:"last_event"`
EventType string `json:"event_type"`
Count int `json:"count"`
}
SalienceEntry tracks a single salience score with exponential decay.
type StateChangeData ¶
type StateChangeData struct {
Modality string `json:"modality"`
Module string `json:"module"`
FromState string `json:"from_state"`
ToState string `json:"to_state"`
PID int `json:"pid,omitempty"`
}
StateChangeData is the Data payload for a modality.state_change event.
type SubprocessConn ¶
type SubprocessConn struct {
// contains filtered or unexported fields
}
SubprocessConn manages a single subprocess connected via stdin/stdout JSON-lines.
func NewSubprocessConn ¶
func NewSubprocessConn(module, command, stderrLogPath string, args ...string) (*SubprocessConn, error)
NewSubprocessConn spawns a child process and wires stdin/stdout pipes. Stderr is appended to stderrLogPath (pass "" to discard).
func (*SubprocessConn) Close ¶
func (c *SubprocessConn) Close() error
Close gracefully shuts down the subprocess. Escalation sequence: shutdown command -> 5s wait -> SIGTERM -> 2s wait -> SIGKILL.
func (*SubprocessConn) HealthCheck ¶
func (c *SubprocessConn) HealthCheck(timeout time.Duration) (*WireMessage, error)
HealthCheck sends a health command and waits for the response within timeout.
func (*SubprocessConn) Module ¶
func (c *SubprocessConn) Module() string
Module returns the module name (e.g. "tts", "vad", "stt").
func (*SubprocessConn) NextRequestID ¶
func (c *SubprocessConn) NextRequestID() string
NextRequestID generates a unique request ID for this connection.
func (*SubprocessConn) PID ¶
func (c *SubprocessConn) PID() int
PID returns the OS process ID of the subprocess.
func (*SubprocessConn) Receive ¶
func (c *SubprocessConn) Receive() (*WireMessage, error)
Receive reads one JSON line from subprocess stdout and decodes it.
func (*SubprocessConn) Request ¶
func (c *SubprocessConn) Request(module, operation string, data map[string]any) (*WireMessage, error)
Request sends a request and blocks until the matching response arrives. Intermediate events are skipped. Error-type responses become Go errors.
func (*SubprocessConn) Send ¶
func (c *SubprocessConn) Send(msg *WireMessage) error
Send writes a single WireMessage as a JSON line to subprocess stdin.
func (*SubprocessConn) SendCommand ¶
func (c *SubprocessConn) SendCommand(command string) error
SendCommand sends a command message (e.g. "shutdown", "config").
type SupervisorConfig ¶
type SupervisorConfig struct {
Name string
Command string
Args []string
MaxRestarts int // default 5
HealthInterval time.Duration // default 10s
StderrLogDir string // default ".cog/run/modality/{name}"
}
SupervisorConfig configures a managed subprocess.
type TextModule ¶
type TextModule struct {
// contains filtered or unexported fields
}
TextModule implements Module for text — pure passthrough. No subprocess, no transformation. Text is the identity modality.
func (*TextModule) Decoder ¶
func (m *TextModule) Decoder() Decoder
Decoder returns the text decoder.
func (*TextModule) Encoder ¶
func (m *TextModule) Encoder() Encoder
Encoder returns the text encoder.
func (*TextModule) Gate ¶
func (m *TextModule) Gate() Gate
Gate returns nil — text has no input gate, all input passes through.
func (*TextModule) Health ¶
func (m *TextModule) Health() ModuleStatus
Health returns the current status.
func (*TextModule) Start ¶
func (m *TextModule) Start(_ context.Context) error
Start sets status to healthy (no subprocess needed).
func (*TextModule) State ¶
func (m *TextModule) State() *ModuleState
State returns the current module state.
type TransformData ¶
type TransformData struct {
FromModality string `json:"from_modality"`
ToModality string `json:"to_modality"`
Step string `json:"step"`
Engine string `json:"engine,omitempty"`
LatencyMs int `json:"latency_ms,omitempty"`
InputBytes int `json:"input_bytes,omitempty"`
OutputChars int `json:"output_chars,omitempty"`
}
TransformData is the Data payload for a modality.transform event.
type WireMessage ¶
type WireMessage struct {
ID string `json:"id"`
Type string `json:"type"` // "request", "response", "error", "command", "event"
Ts string `json:"ts"`
// Request
Module string `json:"module,omitempty"`
Operation string `json:"op,omitempty"`
Data map[string]any `json:"data,omitempty"`
// Response
Result map[string]any `json:"result,omitempty"`
// Streaming (reserved)
Chunk int `json:"chunk,omitempty"`
Done bool `json:"done,omitempty"`
// Command
Command string `json:"command,omitempty"`
// Event
Event string `json:"event,omitempty"`
Status string `json:"status,omitempty"`
// Error
Error string `json:"error,omitempty"`
ErrorType string `json:"error_type,omitempty"`
Recoverable bool `json:"recoverable,omitempty"`
}
WireMessage is the JSON-lines envelope for all subprocess communication. Fields are a flat union keyed by Type.
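Because every optional field carries omitempty, a message serializes to only the fields relevant to its Type. The sketch below encodes a request as one JSON line; encodeLine is a hypothetical helper, the "synthesize" operation name is invented for the example, and the RFC 3339 timestamp format for Ts is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WireMessage is copied from the documented struct.
type WireMessage struct {
	ID          string         `json:"id"`
	Type        string         `json:"type"`
	Ts          string         `json:"ts"`
	Module      string         `json:"module,omitempty"`
	Operation   string         `json:"op,omitempty"`
	Data        map[string]any `json:"data,omitempty"`
	Result      map[string]any `json:"result,omitempty"`
	Chunk       int            `json:"chunk,omitempty"`
	Done        bool           `json:"done,omitempty"`
	Command     string         `json:"command,omitempty"`
	Event       string         `json:"event,omitempty"`
	Status      string         `json:"status,omitempty"`
	Error       string         `json:"error,omitempty"`
	ErrorType   string         `json:"error_type,omitempty"`
	Recoverable bool           `json:"recoverable,omitempty"`
}

// encodeLine (hypothetical) marshals msg and appends the newline that
// terminates a JSON-lines record.
func encodeLine(msg *WireMessage) ([]byte, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	return append(b, '\n'), nil
}

func main() {
	line, err := encodeLine(&WireMessage{
		ID:        "req-1",
		Type:      "request",
		Ts:        "2024-01-01T00:00:00Z", // format is an assumption
		Module:    "tts",
		Operation: "synthesize", // hypothetical operation name
		Data:      map[string]any{"text": "hi"},
	})
	if err != nil {
		panic(err)
	}
	// Response, command, event, and error fields are all omitted.
	fmt.Print(string(line))
}
```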