Documentation
¶
Index ¶
- Constants
- type AckEvent
- type BatchEvent
- type BroadcastMessage
- type Connection
- type ErrorEvent
- type Event
- func NewBatchEvent(threadID string, messages []MessageEvent, hasMore bool) (*Event, error)
- func NewErrorEvent(code, message string) (*Event, error)
- func NewEvent(eventType EventType, data interface{}) (*Event, error)
- func NewInboxMessageEvent(msg *InboxMessageEvent) (*Event, error)
- func NewMessageEvent(msg *MessageEvent) (*Event, error)
- func NewPingEvent() (*Event, error)
- func NewPongEvent() (*Event, error)
- func NewTaskStreamEvent(stream *TaskStreamEvent) (*Event, error)
- func NewTelemetryEvent(telem *TelemetryEvent) (*Event, error)
- func NewThreadStateEvent(threadID, status string, lastSeq int, updatedAt time.Time) (*Event, error)
- func ParseEvent(data []byte) (*Event, error)
- type EventType
- type InboxMessageEvent
- type MessageEvent
- type Server
- func (s *Server) BroadcastInboxMessage(msg *messaging.InboxMessage)
- func (s *Server) BroadcastMessage(threadID string, msg *messaging.Message)
- func (s *Server) BroadcastTaskEvent(stream *TaskStreamEvent)
- func (s *Server) BroadcastTelemetry(telem *TelemetryEvent)
- func (s *Server) GetConnectionCount() int
- func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)
- func (s *Server) Run()
- func (s *Server) SetToken(token string)
- type SubscribeEvent
- type Subscription
- type TaskStreamEvent
- type TaskStreamEventType
- type TelemetryEvent
- type ThreadStateEvent
Constants ¶
const ( // Message batching MaxBatchSize = 50 // Throttling MaxMessagesPerSecond = 100 // Backpressure threshold MaxLagMessages = 1000 // WebSocket configuration WriteWait = 10 * time.Second PongWait = 60 * time.Second PingPeriod = (PongWait * 9) / 10 MaxMessageSize = 10 * 1024 // 10KB // Database polling for agent messages (agents write directly to DB) PollInterval = 500 * time.Millisecond )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckEvent ¶
type AckEvent struct {
ThreadID string `json:"thread_id"`
AckSeq int `json:"ack_seq"` // Last message_seq processed
}
AckEvent - Client acknowledges messages up to a sequence number
type BatchEvent ¶
type BatchEvent struct {
ThreadID string `json:"thread_id"`
Messages []MessageEvent `json:"messages"`
HasMore bool `json:"has_more"` // More messages available after this batch
}
BatchEvent - Server sends multiple messages in a batch
type BroadcastMessage ¶
BroadcastMessage represents a message to be broadcast to subscribers
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents a WebSocket client connection
type ErrorEvent ¶
ErrorEvent - Server sends an error to client
type Event ¶
type Event struct {
Type EventType `json:"type"`
Timestamp int64 `json:"timestamp"` // Unix milliseconds
Data json.RawMessage `json:"data,omitempty"`
}
Event represents a WebSocket message envelope
func NewBatchEvent ¶
func NewBatchEvent(threadID string, messages []MessageEvent, hasMore bool) (*Event, error)
NewBatchEvent creates a batch event
func NewErrorEvent ¶
NewErrorEvent creates an error event
func NewInboxMessageEvent ¶
func NewInboxMessageEvent(msg *InboxMessageEvent) (*Event, error)
NewInboxMessageEvent creates an inbox message event
func NewMessageEvent ¶
func NewMessageEvent(msg *MessageEvent) (*Event, error)
NewMessageEvent creates a message event from a message
func NewTaskStreamEvent ¶
func NewTaskStreamEvent(stream *TaskStreamEvent) (*Event, error)
NewTaskStreamEvent creates a task stream event
func NewTelemetryEvent ¶
func NewTelemetryEvent(telem *TelemetryEvent) (*Event, error)
NewTelemetryEvent creates a telemetry event
func NewThreadStateEvent ¶
NewThreadStateEvent creates a thread state event
func ParseEvent ¶
ParseEvent parses an event from JSON bytes
type EventType ¶
type EventType string
EventType represents the type of WebSocket event
const ( EventTypeSubscribe EventType = "subscribe" EventTypeAck EventType = "ack" EventTypeMessage EventType = "message" EventTypeBatch EventType = "batch" EventTypeError EventType = "error" EventTypePing EventType = "ping" EventTypePong EventType = "pong" EventTypeThreadState EventType = "thread_state" EventTypeTelemetry EventType = "telemetry" // Process telemetry updates EventTypeInboxMessage EventType = "inbox_message" // Async inbox messages EventTypeTaskStream EventType = "task_stream" // Task execution streaming events )
type InboxMessageEvent ¶
type InboxMessageEvent struct {
ID string `json:"id"`
MessageID string `json:"message_id"`
CorrelationID string `json:"correlation_id,omitempty"`
FromAgent string `json:"from_agent"`
ToInbox string `json:"to_inbox"`
MessageType string `json:"message_type"`
Title string `json:"title"`
Payload string `json:"payload,omitempty"`
Status string `json:"status"`
CreatedAt int64 `json:"created_at"`
}
InboxMessageEvent - Server sends async inbox message updates
type MessageEvent ¶
type MessageEvent struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"`
MessageSeq int `json:"message_seq"`
CreatedAt int64 `json:"created_at"`
FromType string `json:"from_type"`
FromID string `json:"from_id"`
ToType string `json:"to_type"`
ToID string `json:"to_id"`
Kind string `json:"kind"`
Subject string `json:"subject,omitempty"`
Content string `json:"content"`
MetadataJSON string `json:"metadata_json,omitempty"`
}
MessageEvent - Server sends a single message to client
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server manages WebSocket connections and message broadcasting
func NewServer ¶
func NewServer(store messaging.MessageStore) *Server
NewServer creates a new WebSocket server
func (*Server) BroadcastInboxMessage ¶
func (s *Server) BroadcastInboxMessage(msg *messaging.InboxMessage)
BroadcastInboxMessage broadcasts an inbox message to ALL connected clients This is used for real-time notification of new async messages
func (*Server) BroadcastMessage ¶
BroadcastMessage broadcasts a message to all subscribers of a thread
func (*Server) BroadcastTaskEvent ¶
func (s *Server) BroadcastTaskEvent(stream *TaskStreamEvent)
BroadcastTaskEvent broadcasts a task streaming event to ALL connected clients This is used for real-time task execution monitoring
func (*Server) BroadcastTelemetry ¶
func (s *Server) BroadcastTelemetry(telem *TelemetryEvent)
BroadcastTelemetry broadcasts a telemetry update to ALL connected clients This is used for real-time process monitoring updates
func (*Server) GetConnectionCount ¶
GetConnectionCount returns the number of active connections
func (*Server) HandleWebSocket ¶
func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket handles WebSocket upgrade requests
type SubscribeEvent ¶
type SubscribeEvent struct {
ThreadID string `json:"thread_id"`
FromSeq int `json:"from_seq"` // Resume from this sequence (0 = from beginning)
}
SubscribeEvent - Client subscribes to a thread
func ParseSubscribe ¶
func ParseSubscribe(event *Event) (*SubscribeEvent, error)
ParseSubscribe parses a subscribe event
type Subscription ¶
Subscription tracks a client's subscription to a thread
type TaskStreamEvent ¶
type TaskStreamEvent struct {
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
StreamType TaskStreamEventType `json:"stream_type"`
TurnNum int `json:"turn_num,omitempty"`
Text string `json:"text,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ToolInput string `json:"tool_input,omitempty"`
ToolOutput string `json:"tool_output,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
Status string `json:"status,omitempty"` // running, completed, failed
TokensIn int `json:"tokens_in,omitempty"`
TokensOut int `json:"tokens_out,omitempty"`
Cost float64 `json:"cost,omitempty"`
DurationSec int `json:"duration_sec,omitempty"`
// Context fields for task identification (added for dashboard event queue)
Workspace string `json:"workspace,omitempty"` // Working directory path
Directive string `json:"directive,omitempty"` // Initial user prompt (truncated)
DirectiveFull string `json:"directive_full,omitempty"` // Full directive (for detail views)
AgentID string `json:"agent_id,omitempty"` // Agent identifier (e.g., "design-doc-creator")
SourceType string `json:"source_type,omitempty"` // Event source: coordinator, eval, github, direct
}
TaskStreamEvent - Server sends task execution streaming events
type TaskStreamEventType ¶
type TaskStreamEventType string
TaskStreamEventType represents the type of task streaming event
const ( TaskStreamTurnStart TaskStreamEventType = "turn_start" TaskStreamText TaskStreamEventType = "text" TaskStreamToolUse TaskStreamEventType = "tool_use" TaskStreamToolResult TaskStreamEventType = "tool_result" TaskStreamTurnEnd TaskStreamEventType = "turn_end" TaskStreamError TaskStreamEventType = "error" TaskStreamStatus TaskStreamEventType = "status" // Task status changes )
type TelemetryEvent ¶
type TelemetryEvent struct {
InstanceID string `json:"instance_id"`
PID int `json:"pid"`
Turns int `json:"turns"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
Cost float64 `json:"cost"`
Status string `json:"status"` // running, completed, error
DurationSec int `json:"duration_sec"`
}
TelemetryEvent - Server sends process telemetry updates