modality

package module
v0.0.0-...-dddc2fe Latest
Warning

This package is not in the latest version of its module.

Published: May 4, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package modality provides the bus and pipeline abstractions for multi-modal communication in CogOS.

A modality bus routes typed messages (text, voice, structured data) between producers and consumers through a Gate/Decoder/Encoder pipeline. Each sensory modality (text, voice, vision, spatial) implements the Module interface.

The package is organized in layers:

  • types.go: Core interfaces (Module, Gate, Decoder, Encoder) and data types
  • bus.go: Bus orchestration and module lifecycle
  • wire.go: D2 wire protocol for subprocess communication (JSON-lines)
  • channel.go: Channel capability declarations and session binding
  • events.go: Event type constants and data structures
  • salience.go: Attentional field scoring with exponential decay
  • supervisor.go: Process supervision with health monitoring and restart
  • text.go: Reference Module implementation (identity passthrough)

Index

Constants

const (
	EventInput       = "modality.input"
	EventOutput      = "modality.output"
	EventTransform   = "modality.transform"
	EventGate        = "modality.gate"
	EventStateChange = "modality.state_change"
	EventError       = "modality.error"
)

Modality event type constants for the CogOS ledger.

const DecayThreshold = 0.01

DecayThreshold is the minimum score; entries below this are pruned.

const DefaultDecayHalfLife = 5 * time.Minute

DefaultDecayHalfLife is the default half-life for exponential decay.

const MaxWireLineSize = 1024 * 1024

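MaxWireLineSize is the maximum length of a single wire line (1 MiB) accepted by the stdout scanner. It is sized so that a JSON line carrying a base64-encoded audio chunk fits.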

Variables

This section is empty.

Functions

func RequireFields

func RequireFields(eventType string, pairs ...string) error

RequireFields returns a *FieldRequiredError naming the first empty field, or nil if every field is set.
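The variadic pairs argument is not documented further. One plausible reading, treated here purely as an assumption, is that it alternates field names and values. The sketch below reimplements that reading locally to show how a caller might validate an event payload before logging it. requireFields and the error message format are hypothetical.

```go
package main

import "fmt"

// FieldRequiredError mirrors the package type of the same name.
type FieldRequiredError struct {
	EventType string
	Field     string
}

// Error uses a hypothetical message format; the real one may differ.
func (e *FieldRequiredError) Error() string {
	return fmt.Sprintf("%s: field %q is required", e.EventType, e.Field)
}

// requireFields assumes pairs alternate name, value, name, value, ...
// It returns an error for the first pair whose value is empty.
// A trailing name without a value is ignored.
func requireFields(eventType string, pairs ...string) error {
	for i := 0; i+1 < len(pairs); i += 2 {
		if pairs[i+1] == "" {
			return &FieldRequiredError{EventType: eventType, Field: pairs[i]}
		}
	}
	return nil
}

func main() {
	err := requireFields("modality.input",
		"modality", "voice",
		"channel", "", // first empty value: reported
		"transcript", "",
	)
	fmt.Println(err) // modality.input: field "channel" is required
}
```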

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus orchestrates modality modules. It is the sensorimotor boundary between the agent and its environment.

func NewBus

func NewBus() *Bus

NewBus creates a bus with default configuration.

func (*Bus) Act

func (b *Bus) Act(intent *CognitiveIntent) (*EncodedOutput, error)

Act encodes a cognitive intent into raw output via the target modality's encoder.

func (*Bus) HUD

func (b *Bus) HUD() map[string]any

HUD returns current bus state for the agent's context assembly.

func (*Bus) Health

func (b *Bus) Health() map[ModalityType]ModuleStatus

Health returns the current status of every registered module.

func (*Bus) LogEvent

func (b *Bus) LogEvent(ev BusEvent)

LogEvent appends a BusEvent, trimming to maxEvents.
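The maxEvents field is unexported and its value is not documented. The sketch below shows the append-and-trim pattern under the assumption that the oldest entries are dropped first. eventLog and add are hypothetical names, and maxEvents = 3 is chosen only for the demo.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type BusEvent struct {
	Type      string
	Modality  string
	Channel   string
	Timestamp time.Time
}

// eventLog is a hypothetical stand-in for the Bus's internal event log.
type eventLog struct {
	mu        sync.Mutex
	events    []BusEvent
	maxEvents int
}

// add appends ev and keeps only the newest maxEvents entries.
func (l *eventLog) add(ev BusEvent) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, ev)
	if over := len(l.events) - l.maxEvents; over > 0 {
		// Copy into a fresh slice so dropped entries can be garbage-collected.
		l.events = append([]BusEvent(nil), l.events[over:]...)
	}
}

func main() {
	el := &eventLog{maxEvents: 3}
	for _, t := range []string{"modality.input", "modality.gate", "modality.transform", "modality.output"} {
		el.add(BusEvent{Type: t, Modality: "voice", Timestamp: time.Now()})
	}
	for _, ev := range el.events {
		fmt.Println(ev.Type)
	}
	// modality.gate
	// modality.transform
	// modality.output
}
```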

func (*Bus) Order

func (b *Bus) Order() []ModalityType

Order returns the module registration order (for HUD formatting).

func (*Bus) Perceive

func (b *Bus) Perceive(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)

Perceive runs raw input through the module's gate (if present), then its decoder, and returns the resulting CognitiveEvent. It returns (nil, nil) when the gate rejects the input; a rejection is not an error.
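The control flow can be illustrated with local copies of the Gate and Decoder interfaces. This sketch follows the documented contract (the gate is optional, and a rejection yields (nil, nil)) but passes the gate and decoder directly instead of looking up a registered module. It is not the package's code; perceive, rejectEmpty, and identityDecoder are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

type ModalityType string

type GateResult struct {
	Allowed    bool
	Confidence float64
	Reason     string
}

type CognitiveEvent struct {
	Modality  ModalityType
	Channel   string
	Content   string
	Timestamp time.Time
}

type Gate interface {
	Check(raw []byte, modality ModalityType) (*GateResult, error)
}

type Decoder interface {
	Decode(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
}

// perceive runs raw input through an optional gate, then the decoder.
// A gate rejection yields (nil, nil): no event, but no error either.
func perceive(g Gate, d Decoder, raw []byte, m ModalityType, channel string) (*CognitiveEvent, error) {
	if g != nil {
		res, err := g.Check(raw, m)
		if err != nil {
			return nil, fmt.Errorf("gate: %w", err)
		}
		if !res.Allowed {
			return nil, nil
		}
	}
	return d.Decode(raw, m, channel)
}

// rejectEmpty is a toy gate that drops whitespace-only input.
type rejectEmpty struct{}

func (rejectEmpty) Check(raw []byte, _ ModalityType) (*GateResult, error) {
	if len(bytes.TrimSpace(raw)) == 0 {
		return &GateResult{Allowed: false, Reason: "empty"}, nil
	}
	return &GateResult{Allowed: true, Confidence: 1}, nil
}

// identityDecoder turns bytes into a text event unchanged.
type identityDecoder struct{}

func (identityDecoder) Decode(raw []byte, m ModalityType, ch string) (*CognitiveEvent, error) {
	return &CognitiveEvent{Modality: m, Channel: ch, Content: string(raw), Timestamp: time.Now()}, nil
}

func main() {
	ev, err := perceive(rejectEmpty{}, identityDecoder{}, []byte("hello"), "text", "cli")
	fmt.Println(ev.Content, err) // hello <nil>

	ev, err = perceive(rejectEmpty{}, identityDecoder{}, []byte("   "), "text", "cli")
	fmt.Println(ev == nil, err == nil) // true true
}
```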

func (*Bus) Register

func (b *Bus) Register(module Module) error

Register adds a modality module. It returns an error if a module for the same modality is already registered.

func (*Bus) Start

func (b *Bus) Start(ctx context.Context) error

Start starts all registered modules in registration order.

func (*Bus) Stop

func (b *Bus) Stop(ctx context.Context) error

Stop stops all registered modules in reverse registration order.

type BusEvent

type BusEvent struct {
	Type      string         `json:"type"`
	Modality  string         `json:"modality"`
	Channel   string         `json:"channel"`
	Timestamp time.Time      `json:"timestamp"`
	Data      map[string]any `json:"data,omitempty"`
}

BusEvent is an internal log entry for the HUD.

type ChannelAdapter

type ChannelAdapter interface {
	// Connect joins a session, declaring supported modalities.
	Connect(sessionID string, descriptor *ChannelDescriptor) error

	// Disconnect leaves a session.
	Disconnect(sessionID string) error

	// HandleIncoming sends raw input from the channel to the kernel.
	HandleIncoming(channelID string, modality ModalityType, raw []byte) error

	// Deliver sends output from the kernel to the channel.
	Deliver(channelID string, output *EncodedOutput) error
}

ChannelAdapter is the interface a channel transport must implement.

type ChannelConnection

type ChannelConnection struct {
	ID         string         `json:"id"`
	Modalities []ModalityType `json:"modalities"`
}

ChannelConnection tracks an active channel (placeholder for A5).

type ChannelDescriptor

type ChannelDescriptor struct {
	ID         string         `json:"id"`                 // "discord-text", "claude-code", etc.
	Transport  string         `json:"transport"`          // "openclaw-gateway", "mcp", "http", "stdio"
	Input      []ModalityType `json:"input"`              // modalities this channel can receive
	Output     []ModalityType `json:"output"`             // modalities this channel can deliver
	SessionKey string         `json:"session_key"`        // pattern for session binding, e.g. "discord:{guild}:{channel}:{user}"
	Metadata   map[string]any `json:"metadata,omitempty"` // transport-specific data (guild ID, thread ID, etc.)
}

ChannelDescriptor declares a channel's identity and capabilities.
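The SessionKey field holds a pattern with {name} placeholders, such as "discord:{guild}:{channel}:{user}". The documentation does not say how the package fills these in. The sketch below assumes the values come from a string map (for example, extracted from Metadata) and leaves unknown placeholders in place so that gaps stay visible. expandSessionKey is hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

var placeholder = regexp.MustCompile(`\{([a-z_]+)\}`)

// expandSessionKey replaces each {name} in pattern with vars[name].
// Placeholders without a value are kept verbatim.
func expandSessionKey(pattern string, vars map[string]string) string {
	return placeholder.ReplaceAllStringFunc(pattern, func(m string) string {
		name := m[1 : len(m)-1] // strip the braces
		if v, ok := vars[name]; ok {
			return v
		}
		return m
	})
}

func main() {
	key := expandSessionKey("discord:{guild}:{channel}:{user}", map[string]string{
		"guild": "g123", "channel": "c456", "user": "u789",
	})
	fmt.Println(key) // discord:g123:c456:u789
}
```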

func (*ChannelDescriptor) SupportsOutput

func (d *ChannelDescriptor) SupportsOutput(m ModalityType) bool

SupportsOutput reports whether this channel can deliver the given modality.

type ChannelRegistry

type ChannelRegistry struct {
	// contains filtered or unexported fields
}

ChannelRegistry manages active channel connections. All methods are safe for concurrent use.

func NewChannelRegistry

func NewChannelRegistry() *ChannelRegistry

NewChannelRegistry creates an empty registry.

func (*ChannelRegistry) BindToSession

func (r *ChannelRegistry) BindToSession(channelID, sessionID string) error

BindToSession associates a channel with a session.

func (*ChannelRegistry) ChannelsForSession

func (r *ChannelRegistry) ChannelsForSession(sessionID string) []*ChannelDescriptor

ChannelsForSession returns all channel descriptors bound to a session.

func (*ChannelRegistry) Get

func (r *ChannelRegistry) Get(channelID string) (*ChannelDescriptor, bool)

Get returns a channel descriptor by ID.

func (*ChannelRegistry) Register

func (r *ChannelRegistry) Register(desc *ChannelDescriptor) error

Register adds a channel descriptor. Returns an error if a channel with the same ID is already registered.

func (*ChannelRegistry) Snapshot

func (r *ChannelRegistry) Snapshot() map[string]*ChannelDescriptor

Snapshot returns a copy of all registered channels, keyed by ID. Intended for the agent HUD and diagnostic views.

func (*ChannelRegistry) SupportsModality

func (r *ChannelRegistry) SupportsModality(sessionID string, modality ModalityType) []*ChannelDescriptor

SupportsModality returns channels in a session that support a given modality for output. Used by the kernel to fan out responses.

func (*ChannelRegistry) UnbindFromSession

func (r *ChannelRegistry) UnbindFromSession(channelID, sessionID string) error

UnbindFromSession removes a channel from a session.

func (*ChannelRegistry) Unregister

func (r *ChannelRegistry) Unregister(channelID string) error

Unregister removes a channel and unbinds it from all sessions.
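The registry's session bookkeeping can be sketched with two maps guarded by a read/write mutex. This is an illustrative reimplementation under assumptions: the registry type, its session-index layout, the error messages, and the rule that binding an unknown channel fails are all hypothetical. The sketch shows why Unregister must also remove the channel from every session, and how fan-out filters on each descriptor's Output list.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type ModalityType string

// ChannelDescriptor keeps only the fields this sketch needs.
type ChannelDescriptor struct {
	ID     string
	Output []ModalityType
}

// SupportsOutput reports whether m appears in the Output list.
func (d *ChannelDescriptor) SupportsOutput(m ModalityType) bool {
	for _, o := range d.Output {
		if o == m {
			return true
		}
	}
	return false
}

// registry is a hypothetical stand-in for ChannelRegistry.
type registry struct {
	mu       sync.RWMutex
	channels map[string]*ChannelDescriptor
	sessions map[string]map[string]bool // sessionID -> set of channel IDs
}

func newRegistry() *registry {
	return &registry{channels: map[string]*ChannelDescriptor{}, sessions: map[string]map[string]bool{}}
}

func (r *registry) register(d *ChannelDescriptor) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.channels[d.ID]; dup {
		return fmt.Errorf("channel %q already registered", d.ID)
	}
	r.channels[d.ID] = d
	return nil
}

// bind rejects unknown channels (an assumption of this sketch).
func (r *registry) bind(channelID, sessionID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.channels[channelID]; !ok {
		return fmt.Errorf("unknown channel %q", channelID)
	}
	if r.sessions[sessionID] == nil {
		r.sessions[sessionID] = map[string]bool{}
	}
	r.sessions[sessionID][channelID] = true
	return nil
}

// unregister drops the channel and removes it from every session,
// so later fan-out cannot reach a stale descriptor.
func (r *registry) unregister(channelID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.channels, channelID)
	for _, set := range r.sessions {
		delete(set, channelID)
	}
}

// supportsModality lists the session's channels that can output m,
// sorted by ID so fan-out order is deterministic.
func (r *registry) supportsModality(sessionID string, m ModalityType) []*ChannelDescriptor {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var out []*ChannelDescriptor
	for id := range r.sessions[sessionID] {
		if d := r.channels[id]; d != nil && d.SupportsOutput(m) {
			out = append(out, d)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

func main() {
	r := newRegistry()
	_ = r.register(&ChannelDescriptor{ID: "discord-text", Output: []ModalityType{"text"}})
	_ = r.register(&ChannelDescriptor{ID: "discord-voice", Output: []ModalityType{"text", "voice"}})
	_ = r.bind("discord-text", "s1")
	_ = r.bind("discord-voice", "s1")
	fmt.Println(len(r.supportsModality("s1", "text")), len(r.supportsModality("s1", "voice"))) // 2 1
	r.unregister("discord-voice")
	fmt.Println(len(r.supportsModality("s1", "voice"))) // 0
}
```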

type CognitiveEvent

type CognitiveEvent struct {
	Modality   ModalityType   `json:"modality"`
	Channel    string         `json:"channel"`
	Content    string         `json:"content"`
	Confidence float64        `json:"confidence,omitempty"`
	Metadata   map[string]any `json:"metadata,omitempty"`
	Timestamp  time.Time      `json:"timestamp"`
}

CognitiveEvent represents a decoded perception -- raw signal transformed into meaning. This is what the agent "sees" after a decoder processes raw input (e.g., transcribed text from audio, a caption from an image).

type CognitiveIntent

type CognitiveIntent struct {
	Modality ModalityType   `json:"modality"`
	Channel  string         `json:"channel"`
	Content  string         `json:"content"`
	Params   map[string]any `json:"params,omitempty"`
}

CognitiveIntent represents a desire to act -- meaning to be encoded into raw signal. This is what the agent "says" before an encoder transforms it into output (e.g., text into synthesized speech).

type Decoder

type Decoder interface {
	Decode(raw []byte, modality ModalityType, channel string) (*CognitiveEvent, error)
}

Decoder transforms raw signal into cognitive events. This is the inbound half of the sensorimotor boundary (e.g., STT for voice, OCR for vision).

type EncodedOutput

type EncodedOutput struct {
	Modality ModalityType   `json:"modality"`
	Data     []byte         `json:"data,omitempty"`
	MimeType string         `json:"mime_type,omitempty"`
	Duration time.Duration  `json:"duration,omitempty"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

EncodedOutput represents the result of encoding an intent into raw signal, ready for channel delivery (e.g., WAV audio bytes, PNG image).

type Encoder

type Encoder interface {
	Encode(intent *CognitiveIntent) (*EncodedOutput, error)
}

Encoder transforms cognitive intents into raw signal. This is the outbound half of the sensorimotor boundary (e.g., TTS for voice, image generation for vision).

type ErrorData

type ErrorData struct {
	Modality    string `json:"modality"`
	Module      string `json:"module"`
	Error       string `json:"error"`
	ErrorType   string `json:"error_type,omitempty"`
	Recoverable bool   `json:"recoverable,omitempty"`
}

ErrorData is the Data payload for a modality.error event.

type EventListener

type EventListener func(eventType string, data map[string]any)

EventListener receives modality events for real-time processing.

type FieldRequiredError

type FieldRequiredError struct {
	EventType string
	Field     string
}

FieldRequiredError is returned when a required event field is empty.

func (*FieldRequiredError) Error

func (e *FieldRequiredError) Error() string

type Gate

type Gate interface {
	Check(raw []byte, modality ModalityType) (*GateResult, error)
}

Gate filters incoming signals -- the perception threshold. A gate decides whether raw input contains signal worth decoding (e.g., VAD for voice, change detection for vision).

type GateData

type GateData struct {
	Modality    string  `json:"modality"`
	Channel     string  `json:"channel"`
	Decision    string  `json:"decision"`
	Confidence  float64 `json:"confidence"`
	SpeechRatio float64 `json:"speech_ratio,omitempty"`
	DurationMs  int     `json:"duration_ms,omitempty"`
	Gate        string  `json:"gate,omitempty"`
}

GateData is the Data payload for a modality.gate event.

type GateResult

type GateResult struct {
	Allowed    bool    `json:"allowed"`
	Confidence float64 `json:"confidence"`
	Reason     string  `json:"reason,omitempty"`
}

GateResult represents the decision of an input gate -- whether raw input should pass through to the decoder for processing.
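To illustrate the Gate contract, a toy energy gate for audio might look like the sketch below. It is a crude RMS threshold, not the VAD the package refers to. energyGate, the assumed input format (signed 16-bit little-endian mono PCM), the threshold, and the confidence formula are all hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

type ModalityType string

type GateResult struct {
	Allowed    bool
	Confidence float64
	Reason     string
}

// energyGate admits audio whose RMS amplitude reaches Threshold.
// raw is assumed to be signed 16-bit little-endian mono PCM.
type energyGate struct {
	Threshold float64 // RMS on the int16 scale, e.g. 500
}

func (g energyGate) Check(raw []byte, _ ModalityType) (*GateResult, error) {
	n := len(raw) / 2
	if n == 0 {
		return &GateResult{Allowed: false, Reason: "no samples"}, nil
	}
	var sum float64
	for i := 0; i < n; i++ {
		s := float64(int16(binary.LittleEndian.Uint16(raw[2*i:])))
		sum += s * s
	}
	rms := math.Sqrt(sum / float64(n))
	conf := math.Min(1, rms/(2*g.Threshold)) // reaches 1.0 at twice the threshold
	if rms < g.Threshold {
		return &GateResult{Allowed: false, Confidence: conf, Reason: "below energy threshold"}, nil
	}
	return &GateResult{Allowed: true, Confidence: conf}, nil
}

// pcm builds n samples of a constant int16 value (helper for the demo).
func pcm(v int16, n int) []byte {
	b := make([]byte, 2*n)
	for i := 0; i < n; i++ {
		binary.LittleEndian.PutUint16(b[2*i:], uint16(v))
	}
	return b
}

func main() {
	g := energyGate{Threshold: 500}
	quiet, _ := g.Check(pcm(10, 160), "voice")
	loud, _ := g.Check(pcm(4000, 160), "voice")
	fmt.Println(quiet.Allowed, loud.Allowed) // false true
}
```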

type InputData

type InputData struct {
	Modality       string  `json:"modality"`
	Channel        string  `json:"channel"`
	Transcript     string  `json:"transcript"`
	GateConfidence float64 `json:"gate_confidence,omitempty"`
	SpeechRatio    float64 `json:"speech_ratio,omitempty"`
	LatencyMs      int     `json:"latency_ms,omitempty"`
	Engine         string  `json:"engine,omitempty"`
}

InputData is the Data payload for a modality.input event.

type ManagedModule

type ManagedModule struct {
	Name, Command string
	Args          []string
	Conn          *SubprocessConn
	Status        ModuleStatus
	PID           int
	StartedAt     time.Time
	Restarts      int
	MaxRestarts   int
	LastError     string
	// contains filtered or unexported fields
}

ManagedModule tracks a supervised subprocess.

type ModalityType

type ModalityType string

ModalityType identifies the sensory modality a module handles.

const (
	Text    ModalityType = "text"
	Voice   ModalityType = "voice"
	Vision  ModalityType = "vision"
	Spatial ModalityType = "spatial"
)

type Module

type Module interface {
	// Type returns the modality this module handles.
	Type() ModalityType

	// Gate returns the input gate, or nil for passthrough modalities
	// that accept all input (e.g., text).
	Gate() Gate

	// Decoder returns the signal-to-event decoder.
	Decoder() Decoder

	// Encoder returns the intent-to-signal encoder.
	Encoder() Encoder

	// State returns current operational state for health monitoring
	// and the agent's HUD.
	State() *ModuleState

	// Start initializes the module. For subprocess-backed modules,
	// this spawns the Python child process and waits for readiness.
	Start(ctx context.Context) error

	// Stop gracefully shuts down the module, sending shutdown commands
	// to any child processes and waiting for clean exit.
	Stop(ctx context.Context) error

	// Health returns the current operational status of the module.
	Health() ModuleStatus
}

Module is the full module contract. Each sensory modality (text, voice, vision, spatial) implements this interface to participate in the modality bus.

type ModuleState

type ModuleState struct {
	Status    ModuleStatus   `json:"status"`
	Modality  ModalityType   `json:"modality"`
	PID       int            `json:"pid,omitempty"`
	Uptime    time.Duration  `json:"uptime,omitempty"`
	LastError string         `json:"last_error,omitempty"`
	Metrics   map[string]any `json:"metrics,omitempty"`
}

ModuleState represents the current operational state of a modality module, used for health monitoring and the agent's HUD.

type ModuleStatus

type ModuleStatus string

ModuleStatus represents the operational status of a modality module.

const (
	StatusStarting ModuleStatus = "starting"
	StatusHealthy  ModuleStatus = "healthy"
	StatusDegraded ModuleStatus = "degraded"
	StatusStopped  ModuleStatus = "stopped"
	StatusCrashed  ModuleStatus = "crashed"
)

type OutputData

type OutputData struct {
	Modality    string  `json:"modality"`
	Channel     string  `json:"channel"`
	Text        string  `json:"text"`
	Engine      string  `json:"engine,omitempty"`
	Voice       string  `json:"voice,omitempty"`
	RTF         float64 `json:"rtf,omitempty"`
	DurationSec float64 `json:"duration_sec,omitempty"`
	LatencyMs   int     `json:"latency_ms,omitempty"`
}

OutputData is the Data payload for a modality.output event.

type ProcessSupervisor

type ProcessSupervisor struct {
	// contains filtered or unexported fields
}

ProcessSupervisor manages subprocess lifecycles with health monitoring.

func NewProcessSupervisor

func NewProcessSupervisor(rootDir string) *ProcessSupervisor

NewProcessSupervisor creates a new supervisor rooted at rootDir.

func (*ProcessSupervisor) Conn

func (s *ProcessSupervisor) Conn(name string) *SubprocessConn

Conn returns the SubprocessConn for a named module, or nil.

func (*ProcessSupervisor) ModuleStatus

func (s *ProcessSupervisor) ModuleStatus(name string) (*ModuleState, error)

ModuleStatus returns the current state of a named module.

func (*ProcessSupervisor) Register

func (s *ProcessSupervisor) Register(cfg *SupervisorConfig) error

Register registers a subprocess configuration.

func (*ProcessSupervisor) Restart

func (s *ProcessSupervisor) Restart(ctx context.Context, name string) error

Restart stops and restarts a managed subprocess.

func (*ProcessSupervisor) Start

func (s *ProcessSupervisor) Start(ctx context.Context, name string) error

Start spawns a subprocess, waits for its ready signal, and starts health monitoring.

func (*ProcessSupervisor) StatusAll

func (s *ProcessSupervisor) StatusAll() map[string]*ModuleState

StatusAll returns the state of all managed modules.

func (*ProcessSupervisor) Stop

func (s *ProcessSupervisor) Stop(_ context.Context, name string) error

Stop gracefully stops a managed subprocess.

type Salience

type Salience struct {
	// contains filtered or unexported fields
}

Salience extends the attentional field with modality event scoring.

func NewSalience

func NewSalience() *Salience

NewSalience creates a new modality salience tracker.

func (*Salience) Decay

func (ms *Salience) Decay(now time.Time, halfLife time.Duration)

Decay applies exponential decay to all entries, multiplying each score by 0.5^(elapsed / halfLife), and prunes entries whose score falls below DecayThreshold.

func (*Salience) OnEvent

func (ms *Salience) OnEvent(eventType string, data map[string]any)

OnEvent implements EventListener for the EventRouter. It extracts modality and channel from data, builds a key, and boosts the score.

func (*Salience) Score

func (ms *Salience) Score(key string) float64

Score returns the current score for a single key, or 0 if absent.

func (*Salience) Snapshot

func (ms *Salience) Snapshot() map[string]float64

Snapshot returns all scores as a flat map (for HUD integration).

func (*Salience) TopN

func (ms *Salience) TopN(n int) []*SalienceEntry

TopN returns the N highest-scoring entries, sorted descending.
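OnEvent is documented to build a key from the event's modality and channel and to boost that key's score. Neither the key format nor the boost size is documented. The sketch below assumes a "modality:channel" key and a boost of 1.0 per event (both hypothetical) and shows how TopN can rank copies of the entries. The salience type and its methods are illustrative stand-ins.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

type SalienceEntry struct {
	Key       string
	Score     float64
	LastEvent time.Time
	EventType string
	Count     int
}

type salience struct {
	mu      sync.Mutex
	entries map[string]*SalienceEntry
}

func newSalience() *salience { return &salience{entries: map[string]*SalienceEntry{}} }

// onEvent matches the EventListener signature: func(string, map[string]any).
func (s *salience) onEvent(eventType string, data map[string]any) {
	modality, _ := data["modality"].(string)
	channel, _ := data["channel"].(string)
	key := modality + ":" + channel // assumed key format
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.entries[key]
	if e == nil {
		e = &SalienceEntry{Key: key}
		s.entries[key] = e
	}
	e.Score += 1.0 // assumed boost per event
	e.Count++
	e.EventType = eventType
	e.LastEvent = time.Now()
}

// topN returns copies of the n highest-scoring entries, highest first.
func (s *salience) topN(n int) []*SalienceEntry {
	s.mu.Lock()
	defer s.mu.Unlock()
	all := make([]*SalienceEntry, 0, len(s.entries))
	for _, e := range s.entries {
		c := *e // copy so callers cannot mutate internal state
		all = append(all, &c)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].Score > all[j].Score })
	if n >= 0 && n < len(all) {
		all = all[:n]
	}
	return all
}

func main() {
	s := newSalience()
	s.onEvent("modality.input", map[string]any{"modality": "voice", "channel": "discord"})
	s.onEvent("modality.input", map[string]any{"modality": "voice", "channel": "discord"})
	s.onEvent("modality.output", map[string]any{"modality": "text", "channel": "cli"})
	for _, e := range s.topN(2) {
		fmt.Println(e.Key, e.Score, e.Count)
	}
	// voice:discord 2 2
	// text:cli 1 1
}
```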

type SalienceEntry

type SalienceEntry struct {
	Key       string    `json:"key"`
	Score     float64   `json:"score"`
	LastEvent time.Time `json:"last_event"`
	EventType string    `json:"event_type"`
	Count     int       `json:"count"`
}

SalienceEntry tracks a single salience score with exponential decay.

type StateChangeData

type StateChangeData struct {
	Modality  string `json:"modality"`
	Module    string `json:"module"`
	FromState string `json:"from_state"`
	ToState   string `json:"to_state"`
	PID       int    `json:"pid,omitempty"`
}

StateChangeData is the Data payload for a modality.state_change event.

type SubprocessConn

type SubprocessConn struct {
	// contains filtered or unexported fields
}

SubprocessConn manages a single subprocess connected via stdin/stdout JSON-lines.

func NewSubprocessConn

func NewSubprocessConn(module, command, stderrLogPath string, args ...string) (*SubprocessConn, error)

NewSubprocessConn spawns a child process and wires stdin/stdout pipes. Stderr is appended to stderrLogPath (pass "" to discard).

func (*SubprocessConn) Close

func (c *SubprocessConn) Close() error

Close gracefully shuts down the subprocess, escalating as needed: it sends the shutdown command and waits up to 5s, then sends SIGTERM and waits up to 2s, then sends SIGKILL.

func (*SubprocessConn) HealthCheck

func (c *SubprocessConn) HealthCheck(timeout time.Duration) (*WireMessage, error)

HealthCheck sends a health command and waits for the response within timeout.

func (*SubprocessConn) Module

func (c *SubprocessConn) Module() string

Module returns the module name (e.g. "tts", "vad", "stt").

func (*SubprocessConn) NextRequestID

func (c *SubprocessConn) NextRequestID() string

NextRequestID generates a unique request ID for this connection.

func (*SubprocessConn) PID

func (c *SubprocessConn) PID() int

PID returns the OS process ID of the subprocess.

func (*SubprocessConn) Receive

func (c *SubprocessConn) Receive() (*WireMessage, error)

Receive reads one JSON line from subprocess stdout and decodes it.

func (*SubprocessConn) Request

func (c *SubprocessConn) Request(module, operation string, data map[string]any) (*WireMessage, error)

Request sends a request and blocks until the matching response arrives. Intermediate events are skipped. Error-type responses become Go errors.

func (*SubprocessConn) Send

func (c *SubprocessConn) Send(msg *WireMessage) error

Send writes a single WireMessage as a JSON line to subprocess stdin.

func (*SubprocessConn) SendCommand

func (c *SubprocessConn) SendCommand(command string) error

SendCommand sends a command message (e.g. "shutdown", "config").

type SupervisorConfig

type SupervisorConfig struct {
	Name           string
	Command        string
	Args           []string
	MaxRestarts    int           // default 5
	HealthInterval time.Duration // default 10s
	StderrLogDir   string        // default ".cog/run/modality/{name}"
}

SupervisorConfig configures a managed subprocess.

type TextModule

type TextModule struct {
	// contains filtered or unexported fields
}

TextModule implements Module for text — pure passthrough. No subprocess, no transformation. Text is the identity modality.

func NewTextModule

func NewTextModule() *TextModule

NewTextModule creates a text modality module.

func (*TextModule) Decoder

func (m *TextModule) Decoder() Decoder

Decoder returns the text decoder.

func (*TextModule) Encoder

func (m *TextModule) Encoder() Encoder

Encoder returns the text encoder.

func (*TextModule) Gate

func (m *TextModule) Gate() Gate

Gate returns nil — text has no input gate, all input passes through.

func (*TextModule) Health

func (m *TextModule) Health() ModuleStatus

Health returns the current status.

func (*TextModule) Start

func (m *TextModule) Start(_ context.Context) error

Start sets status to healthy (no subprocess needed).

func (*TextModule) State

func (m *TextModule) State() *ModuleState

State returns the current module state.

func (*TextModule) Stop

func (m *TextModule) Stop(_ context.Context) error

Stop sets status to stopped.

func (*TextModule) Type

func (m *TextModule) Type() ModalityType

Type returns Text.
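Because text is the identity modality, its decoder and encoder reduce to byte/string conversions. The sketch below shows a round trip using local copies of the types. The MIME type "text/plain; charset=utf-8" and the decoder's confidence of 1 are assumptions, since the documentation specifies neither. textDecoder and textEncoder are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

type ModalityType string

const Text ModalityType = "text"

type CognitiveEvent struct {
	Modality   ModalityType
	Channel    string
	Content    string
	Confidence float64
	Timestamp  time.Time
}

type CognitiveIntent struct {
	Modality ModalityType
	Channel  string
	Content  string
}

type EncodedOutput struct {
	Modality ModalityType
	Data     []byte
	MimeType string
}

type textDecoder struct{}

// Decode copies the bytes into the event's Content unchanged.
func (textDecoder) Decode(raw []byte, m ModalityType, ch string) (*CognitiveEvent, error) {
	return &CognitiveEvent{Modality: m, Channel: ch, Content: string(raw), Confidence: 1, Timestamp: time.Now()}, nil
}

type textEncoder struct{}

// Encode copies the intent's Content into Data unchanged.
func (textEncoder) Encode(in *CognitiveIntent) (*EncodedOutput, error) {
	return &EncodedOutput{Modality: in.Modality, Data: []byte(in.Content), MimeType: "text/plain; charset=utf-8"}, nil
}

func main() {
	ev, _ := textDecoder{}.Decode([]byte("ping"), Text, "cli")
	out, _ := textEncoder{}.Encode(&CognitiveIntent{Modality: Text, Channel: ev.Channel, Content: ev.Content})
	fmt.Println(string(out.Data), out.MimeType) // ping text/plain; charset=utf-8
}
```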

type TransformData

type TransformData struct {
	FromModality string `json:"from_modality"`
	ToModality   string `json:"to_modality"`
	Step         string `json:"step"`
	Engine       string `json:"engine,omitempty"`
	LatencyMs    int    `json:"latency_ms,omitempty"`
	InputBytes   int    `json:"input_bytes,omitempty"`
	OutputChars  int    `json:"output_chars,omitempty"`
}

TransformData is the Data payload for a modality.transform event.

type WireMessage

type WireMessage struct {
	ID   string `json:"id"`
	Type string `json:"type"` // "request", "response", "error", "command", "event"
	Ts   string `json:"ts"`
	// Request
	Module    string         `json:"module,omitempty"`
	Operation string         `json:"op,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
	// Response
	Result map[string]any `json:"result,omitempty"`
	// Streaming (reserved)
	Chunk int  `json:"chunk,omitempty"`
	Done  bool `json:"done,omitempty"`
	// Command
	Command string `json:"command,omitempty"`
	// Event
	Event  string `json:"event,omitempty"`
	Status string `json:"status,omitempty"`
	// Error
	Error       string `json:"error,omitempty"`
	ErrorType   string `json:"error_type,omitempty"`
	Recoverable bool   `json:"recoverable,omitempty"`
}

WireMessage is the JSON-lines envelope for all subprocess communication. Fields are a flat union keyed by Type.
