websocket

package
v0.14.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level tuning constants covering batching, throttling,
// backpressure, WebSocket timing/size limits, and database polling.
const (
	// Message batching
	// NOTE(review): presumably the maximum number of messages carried by one
	// BatchEvent; confirm against the code that builds batches.
	MaxBatchSize = 50

	// Throttling
	MaxMessagesPerSecond = 100

	// Backpressure threshold
	MaxLagMessages = 1000

	// WebSocket configuration
	// PingPeriod is derived as 9/10 of PongWait, i.e. 54s with the values
	// declared here, so it is always shorter than PongWait.
	// NOTE(review): MaxMessageSize evaluates to 10240; presumably a byte
	// count, confirm where it is applied.
	WriteWait      = 10 * time.Second
	PongWait       = 60 * time.Second
	PingPeriod     = (PongWait * 9) / 10
	MaxMessageSize = 10 * 1024 // 10KB

	// Database polling for agent messages (agents write directly to DB)
	PollInterval = 500 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AckEvent

// AckEvent is the payload a client sends to acknowledge messages up to a
// sequence number. ParseAck returns a value of this type from an Event.
type AckEvent struct {
	ThreadID string `json:"thread_id"`
	AckSeq   int    `json:"ack_seq"` // Last message_seq processed
}

AckEvent - Client acknowledges messages up to a sequence number

func ParseAck

func ParseAck(event *Event) (*AckEvent, error)

ParseAck parses an ack event

type BatchEvent

// BatchEvent is the payload the server sends to deliver several messages for
// one thread in a single event. NewBatchEvent builds an Event from the same
// three values.
type BatchEvent struct {
	ThreadID string         `json:"thread_id"`
	Messages []MessageEvent `json:"messages"`
	HasMore  bool           `json:"has_more"` // More messages available after this batch
}

BatchEvent - Server sends multiple messages in a batch

type BroadcastMessage

// BroadcastMessage represents a message to be broadcast to subscribers.
// It pairs a thread ID with a message; its fields carry no JSON tags.
// NOTE(review): presumably an internal queue item rather than a wire format;
// confirm where it is produced and consumed.
type BroadcastMessage struct {
	ThreadID string
	Message  *messaging.Message
}

BroadcastMessage represents a message to be broadcast to subscribers

type Connection

// Connection represents a WebSocket client connection.
// It exposes no exported fields in this listing.
type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a WebSocket client connection

type ErrorEvent

// ErrorEvent is the payload the server sends to report an error to a client.
// NewErrorEvent builds an Event from a code and a message.
type ErrorEvent struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

ErrorEvent - Server sends an error to client

type Event

// Event is the envelope for every WebSocket message: an event type, a
// timestamp, and a raw JSON payload.
type Event struct {
	Type      EventType       `json:"type"`
	Timestamp int64           `json:"timestamp"` // Unix milliseconds
	// Data is kept as undecoded JSON; the "data" key is omitted from the
	// encoded output when Data is empty (omitempty).
	Data      json.RawMessage `json:"data,omitempty"`
}

Event represents a WebSocket message envelope

func NewBatchEvent

func NewBatchEvent(threadID string, messages []MessageEvent, hasMore bool) (*Event, error)

NewBatchEvent creates a batch event

func NewErrorEvent

func NewErrorEvent(code, message string) (*Event, error)

NewErrorEvent creates an error event

func NewEvent

func NewEvent(eventType EventType, data interface{}) (*Event, error)

NewEvent creates a new event with timestamp

func NewInboxMessageEvent

func NewInboxMessageEvent(msg *InboxMessageEvent) (*Event, error)

NewInboxMessageEvent creates an inbox message event

func NewMessageEvent

func NewMessageEvent(msg *MessageEvent) (*Event, error)

NewMessageEvent creates a message event from a message

func NewPingEvent

func NewPingEvent() (*Event, error)

NewPingEvent creates a ping event

func NewPongEvent

func NewPongEvent() (*Event, error)

NewPongEvent creates a pong event

func NewTaskStreamEvent

func NewTaskStreamEvent(stream *TaskStreamEvent) (*Event, error)

NewTaskStreamEvent creates a task stream event

func NewTelemetryEvent

func NewTelemetryEvent(telem *TelemetryEvent) (*Event, error)

NewTelemetryEvent creates a telemetry event

func NewThreadStateEvent

func NewThreadStateEvent(threadID, status string, lastSeq int, updatedAt time.Time) (*Event, error)

NewThreadStateEvent creates a thread state event

func ParseEvent

func ParseEvent(data []byte) (*Event, error)

ParseEvent parses an event from JSON bytes

func (*Event) ToJSON

func (e *Event) ToJSON() ([]byte, error)

ToJSON serializes an event to JSON bytes

type EventType

// EventType represents the type of a WebSocket event; it is the string
// carried in Event.Type.
type EventType string

EventType represents the type of WebSocket event

// EventType values carried in Event.Type.
// NOTE(review): per the payload type docs, subscribe and ack appear to
// originate from the client and the remaining payload-bearing events from the
// server; the direction of ping/pong is not shown here, confirm.
const (
	EventTypeSubscribe    EventType = "subscribe"
	EventTypeAck          EventType = "ack"
	EventTypeMessage      EventType = "message"
	EventTypeBatch        EventType = "batch"
	EventTypeError        EventType = "error"
	EventTypePing         EventType = "ping"
	EventTypePong         EventType = "pong"
	EventTypeThreadState  EventType = "thread_state"
	EventTypeTelemetry    EventType = "telemetry"     // Process telemetry updates
	EventTypeInboxMessage EventType = "inbox_message" // Async inbox messages
	EventTypeTaskStream   EventType = "task_stream"   // Task execution streaming events
)

type InboxMessageEvent

// InboxMessageEvent is the payload the server sends for async inbox message
// updates. CorrelationID and Payload are omitted from the JSON when empty.
type InboxMessageEvent struct {
	ID            string `json:"id"`
	MessageID     string `json:"message_id"`
	CorrelationID string `json:"correlation_id,omitempty"`
	FromAgent     string `json:"from_agent"`
	ToInbox       string `json:"to_inbox"`
	MessageType   string `json:"message_type"`
	Title         string `json:"title"`
	Payload       string `json:"payload,omitempty"`
	Status        string `json:"status"`
	// NOTE(review): the time unit is not shown here; Event.Timestamp uses
	// Unix milliseconds, confirm that CreatedAt matches.
	CreatedAt     int64  `json:"created_at"`
}

InboxMessageEvent - Server sends async inbox message updates

type MessageEvent

// MessageEvent is the payload the server sends to deliver a single message to
// a client; BatchEvent carries a slice of these. Subject and MetadataJSON are
// omitted from the JSON when empty.
type MessageEvent struct {
	ID           string `json:"id"`
	ThreadID     string `json:"thread_id"`
	MessageSeq   int    `json:"message_seq"`
	// NOTE(review): the time unit is not shown here; Event.Timestamp uses
	// Unix milliseconds, confirm that CreatedAt matches.
	CreatedAt    int64  `json:"created_at"`
	FromType     string `json:"from_type"`
	FromID       string `json:"from_id"`
	ToType       string `json:"to_type"`
	ToID         string `json:"to_id"`
	Kind         string `json:"kind"`
	Subject      string `json:"subject,omitempty"`
	Content      string `json:"content"`
	// NOTE(review): presumably a JSON document encoded as a string; confirm
	// against the producer.
	MetadataJSON string `json:"metadata_json,omitempty"`
}

MessageEvent - Server sends a single message to client

type Server

// Server manages WebSocket connections and message broadcasting.
// It exposes no exported fields in this listing; NewServer constructs one.
type Server struct {
	// contains filtered or unexported fields
}

Server manages WebSocket connections and message broadcasting

func NewServer

func NewServer(store messaging.MessageStore) *Server

NewServer creates a new WebSocket server

func (*Server) BroadcastInboxMessage

func (s *Server) BroadcastInboxMessage(msg *messaging.InboxMessage)

BroadcastInboxMessage broadcasts an inbox message to ALL connected clients. This is used for real-time notification of new async messages.

func (*Server) BroadcastMessage

func (s *Server) BroadcastMessage(threadID string, msg *messaging.Message)

BroadcastMessage broadcasts a message to all subscribers of a thread

func (*Server) BroadcastTaskEvent

func (s *Server) BroadcastTaskEvent(stream *TaskStreamEvent)

BroadcastTaskEvent broadcasts a task streaming event to ALL connected clients. This is used for real-time task execution monitoring.

func (*Server) BroadcastTelemetry

func (s *Server) BroadcastTelemetry(telem *TelemetryEvent)

BroadcastTelemetry broadcasts a telemetry update to ALL connected clients. This is used for real-time process monitoring updates.

func (*Server) GetConnectionCount

func (s *Server) GetConnectionCount() int

GetConnectionCount returns the number of active connections

func (*Server) HandleWebSocket

func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket handles WebSocket upgrade requests

func (*Server) Run

func (s *Server) Run()

Run starts the WebSocket server event loop

func (*Server) SetToken

func (s *Server) SetToken(token string)

SetToken sets the authentication token for external WebSocket clients.

type SubscribeEvent

// SubscribeEvent is the payload a client sends to subscribe to a thread.
// ParseSubscribe returns a value of this type from an Event.
type SubscribeEvent struct {
	ThreadID string `json:"thread_id"`
	FromSeq  int    `json:"from_seq"` // Resume from this sequence (0 = from beginning)
}

SubscribeEvent - Client subscribes to a thread

func ParseSubscribe

func ParseSubscribe(event *Event) (*SubscribeEvent, error)

ParseSubscribe parses a subscribe event

type Subscription

// Subscription tracks a client's subscription to a thread. Its fields carry
// no JSON tags.
// NOTE(review): FromSeq and LastAckSeq presumably mirror SubscribeEvent.FromSeq
// and AckEvent.AckSeq; confirm where they are assigned.
type Subscription struct {
	ThreadID     string
	FromSeq      int
	LastAckSeq   int
	SubscribedAt time.Time
}

Subscription tracks a client's subscription to a thread

type TaskStreamEvent

// TaskStreamEvent is the payload the server sends for task execution
// streaming events. Only TaskID and StreamType are always present in the
// JSON; every other field is tagged omitempty and is dropped when it holds
// its zero value.
// NOTE(review): which optional fields are populated for each StreamType is
// not shown here; confirm against the producer.
type TaskStreamEvent struct {
	TaskID      string              `json:"task_id"`
	ThreadID    string              `json:"thread_id,omitempty"`
	StreamType  TaskStreamEventType `json:"stream_type"`
	TurnNum     int                 `json:"turn_num,omitempty"`
	Text        string              `json:"text,omitempty"`
	ToolName    string              `json:"tool_name,omitempty"`
	ToolInput   string              `json:"tool_input,omitempty"`
	ToolOutput  string              `json:"tool_output,omitempty"`
	ErrorMsg    string              `json:"error_msg,omitempty"`
	Status      string              `json:"status,omitempty"` // running, completed, failed
	TokensIn    int                 `json:"tokens_in,omitempty"`
	TokensOut   int                 `json:"tokens_out,omitempty"`
	Cost        float64             `json:"cost,omitempty"`
	DurationSec int                 `json:"duration_sec,omitempty"`

	// Context fields for task identification (added for dashboard event queue)
	Workspace     string `json:"workspace,omitempty"`      // Working directory path
	Directive     string `json:"directive,omitempty"`      // Initial user prompt (truncated)
	DirectiveFull string `json:"directive_full,omitempty"` // Full directive (for detail views)
	AgentID       string `json:"agent_id,omitempty"`       // Agent identifier (e.g., "design-doc-creator")
	SourceType    string `json:"source_type,omitempty"`    // Event source: coordinator, eval, github, direct
}

TaskStreamEvent - Server sends task execution streaming events

type TaskStreamEventType

// TaskStreamEventType represents the type of a task streaming event; it is
// the string carried in TaskStreamEvent.StreamType.
type TaskStreamEventType string

TaskStreamEventType represents the type of task streaming event

// TaskStreamEventType values carried in TaskStreamEvent.StreamType.
const (
	TaskStreamTurnStart  TaskStreamEventType = "turn_start"
	TaskStreamText       TaskStreamEventType = "text"
	TaskStreamToolUse    TaskStreamEventType = "tool_use"
	TaskStreamToolResult TaskStreamEventType = "tool_result"
	TaskStreamTurnEnd    TaskStreamEventType = "turn_end"
	TaskStreamError      TaskStreamEventType = "error"
	TaskStreamStatus     TaskStreamEventType = "status" // Task status changes
)

type TelemetryEvent

// TelemetryEvent is the payload the server sends for process telemetry
// updates. No field is tagged omitempty, so every key is always present in
// the JSON, including zero values.
// NOTE(review): the Status values listed here (running, completed, error)
// differ from those listed on TaskStreamEvent.Status (running, completed,
// failed); confirm whether that is intentional.
type TelemetryEvent struct {
	InstanceID  string  `json:"instance_id"`
	PID         int     `json:"pid"`
	Turns       int     `json:"turns"`
	TokensIn    int     `json:"tokens_in"`
	TokensOut   int     `json:"tokens_out"`
	Cost        float64 `json:"cost"`
	Status      string  `json:"status"` // running, completed, error
	DurationSec int     `json:"duration_sec"`
}

TelemetryEvent - Server sends process telemetry updates

type ThreadStateEvent

// ThreadStateEvent is the payload the server sends for thread state updates.
type ThreadStateEvent struct {
	ThreadID  string `json:"thread_id"`
	Status    string `json:"status"`
	LastSeq   int    `json:"last_seq"`
	// NOTE(review): NewThreadStateEvent accepts a time.Time; the unit used
	// when converting it to this int64 is not shown here, confirm.
	UpdatedAt int64  `json:"updated_at"`
}

ThreadStateEvent - Server sends thread state updates

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL