Documentation ¶
Overview ¶
Package events implements the in-memory pub/sub hub and SSE handler that fan tool-call, quota, and lifecycle events out to the UI.
All routing is in-process: publishers (tailers, hook handlers, attention engine, quota ingest) call Hub.Publish; subscribers (SSE handlers, the attention engine) call Hub.Subscribe. The hub never blocks publishers: when a subscriber's channel is full, the hub drops the event for that subscriber and increments a per-sub drop counter.
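The non-blocking delivery described above is commonly written in Go as a select with a default case. The following is a minimal sketch of that pattern, not the package's actual code: the sub type, its fields, and trySend are hypothetical names, and events are reduced to strings.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sub is a hypothetical stand-in for a subscriber: a buffered channel
// plus a counter of events that could not be delivered.
type sub struct {
	ch      chan string
	dropped uint64
}

// Dropped returns how many events were dropped for this subscriber.
func (s *sub) Dropped() uint64 { return atomic.LoadUint64(&s.dropped) }

// trySend delivers ev without ever blocking the caller. It reports false
// and increments the drop counter when the subscriber's buffer is full.
func trySend(s *sub, ev string) bool {
	select {
	case s.ch <- ev:
		return true
	default:
		atomic.AddUint64(&s.dropped, 1)
		return false
	}
}

func main() {
	s := &sub{ch: make(chan string, 2)} // room for two pending events
	for _, ev := range []string{"a", "b", "c"} {
		trySend(s, ev)
	}
	fmt.Println(len(s.ch), s.Dropped()) // 2 1
}
```

The trade-off is that a slow consumer loses events rather than slowing every publisher down, which is why a per-subscriber drop counter is useful for diagnosis.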
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Handler ¶
func Handler(h *Hub, filter string) http.HandlerFunc
Handler returns an http.HandlerFunc that streams hub events as an SSE response. filter == "" streams the global feed; otherwise only events whose Session matches filter are delivered.
Honours the Last-Event-ID request header for ring replay. Sends a keepalive comment every sseKeepalive so corporate proxies don't time the stream out. Exits when the client disconnects.
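For readers unfamiliar with the SSE wire format, the sketch below shows what an event frame and a keepalive comment look like on the wire. It is illustrative only: frame and keepalive are hypothetical helpers, the event type "tool_call" is made up, and whether Handler emits an "event:" field at all is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// frame renders one SSE message. Multi-line data is split into one
// "data:" field per line, as the SSE format requires.
func frame(id, typ, data string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "id: %s\n", id)
	fmt.Fprintf(&b, "event: %s\n", typ)
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line terminates the message
	return b.String()
}

// keepalive renders an SSE comment (a line starting with ':'). Clients
// ignore it, but the bytes keep an otherwise idle connection active.
func keepalive() string {
	return ": keepalive\n\n"
}

func main() {
	fmt.Print(frame("1700000000000000000-0", "tool_call", `{"ok":true}`))
	fmt.Print(keepalive())
}
```

The "id:" field is what a browser's EventSource sends back as Last-Event-ID when it reconnects, which is what makes ring replay possible.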
Types ¶
type Event ¶
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Session string `json:"session,omitempty"`
Payload json.RawMessage `json:"payload"`
// contains filtered or unexported fields
}
Event is a single message routed through the hub. If ID is empty, the hub assigns one at Publish time in the form "<unix-nano>-<seq>", where seq is a per-second monotonic counter, so bursts within the same nanosecond remain unique and ordered.
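One way to generate IDs of this shape is sketched below. This is an assumption-laden illustration rather than the hub's implementation: idGen and next are hypothetical names, the counter is assumed to start at 0 and reset whenever the wall-clock second changes, and the timestamp is passed in so the example is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// idGen is a hypothetical generator for "<unix-nano>-<seq>" IDs.
type idGen struct {
	mu  sync.Mutex
	sec int64  // second that the current counter belongs to
	seq uint64 // next sequence number within that second
}

// next returns an ID for the given timestamp (nanoseconds since the
// Unix epoch). The counter restarts at 0 when the second changes.
func (g *idGen) next(unixNano int64) string {
	g.mu.Lock()
	defer g.mu.Unlock()
	if sec := unixNano / 1000000000; sec != g.sec {
		g.sec, g.seq = sec, 0
	}
	id := fmt.Sprintf("%d-%d", unixNano, g.seq)
	g.seq++
	return id
}

func main() {
	g := &idGen{}
	fmt.Println(g.next(1700000000000000005)) // 1700000000000000005-0
	fmt.Println(g.next(1700000000000000005)) // 1700000000000000005-1
	fmt.Println(g.next(1700000001000000000)) // 1700000001000000000-0
}
```

Two events stamped with the same nanosecond still get distinct IDs because the sequence number differs.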
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is the in-process pub/sub fan-out plus per-session ring buffer.
func NewHub ¶
NewHub returns a hub with the given per-ring capacity. ringSize <= 0 uses the default (500).
func (*Hub) Publish ¶
Publish appends e to the appropriate rings and fans out to subscribers. It never blocks: if a subscriber's channel is full, the event is dropped for that sub, the sub's drop counter is incremented, and a WARN is logged at most once per minute.
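The "at most once per minute" logging can be approximated with a small time-based gate. The sketch below is one possible shape, not the package's code: warnLimiter, perMinute, and at are hypothetical names, and the type is not safe for concurrent use without a lock.

```go
package main

import (
	"fmt"
	"time"
)

// warnLimiter is a hypothetical gate that lets one warning through per
// interval. A real hub would guard it with a mutex.
type warnLimiter struct {
	every time.Duration
	last  time.Time // zero until the first warning is allowed
}

// perMinute returns a limiter that allows one warning per minute.
func perMinute() *warnLimiter { return &warnLimiter{every: time.Minute} }

// allow reports whether a warning may be logged at time now.
func (l *warnLimiter) allow(now time.Time) bool {
	if !l.last.IsZero() && now.Sub(l.last) < l.every {
		return false
	}
	l.last = now
	return true
}

// at builds a timestamp from Unix seconds, to keep the demo deterministic.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	l := perMinute()
	for _, s := range []int64{100, 130, 159, 160, 170} {
		// In a hub, a true result would be followed by the WARN log call.
		fmt.Println(s, l.allow(at(s)))
	}
}
```

Only the calls at 100 and 160 seconds are allowed; drops in between would still be counted, just not logged.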
func (*Hub) Snapshot ¶
Snapshot returns a chronological copy of the ring for filter (empty string = global ring). The caller owns the returned slice. It is used by the REST seed endpoints (/api/feed, /api/sessions/{name}/feed) so that first paint renders historical events immediately: Hub.Subscribe returns no replay when Last-Event-ID is empty, so without the seed a fresh browser tab would show an empty list until the next publish.
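A chronological, caller-owned copy of a fixed-capacity ring can be sketched as follows. The ring type and its methods are hypothetical and store plain strings; the hub's internal layout is not documented here, and the capacity is assumed to be positive.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity circular buffer that overwrites
// its oldest entry once full.
type ring struct {
	buf   []string
	start int // index of the oldest element
	n     int // number of stored elements
}

// newRing returns a ring holding up to capacity entries (capacity > 0).
func newRing(capacity int) *ring { return &ring{buf: make([]string, capacity)} }

// push appends v, evicting the oldest entry when the ring is full.
func (r *ring) push(v string) {
	if r.n < len(r.buf) {
		r.buf[(r.start+r.n)%len(r.buf)] = v
		r.n++
		return
	}
	r.buf[r.start] = v
	r.start = (r.start + 1) % len(r.buf)
}

// snapshot returns the entries oldest-first in a fresh slice, so the
// caller can modify the result without touching the ring.
func (r *ring) snapshot() []string {
	out := make([]string, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRing(3)
	for _, v := range []string{"e1", "e2", "e3", "e4", "e5"} {
		r.push(v)
	}
	fmt.Println(r.snapshot()) // [e3 e4 e5]
}
```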
func (*Hub) Subscribe ¶
Subscribe registers a new subscriber and returns it together with the replay slice from the appropriate ring.
filter == "" subscribes to the global stream (every event); a non-empty filter restricts delivery to events whose Session matches.
since is a Last-Event-ID value: the replay slice contains buffered events strictly after that ID, in chronological order. since == "" returns an empty slice (start live). If since predates the ring's oldest entry, the entire ring snapshot is returned and Lost reports true so the caller can emit a "lost" marker to the SSE client.
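The replay rule can be illustrated with a small function over a slice of IDs. This is a sketch under assumptions: replayAfter is a hypothetical name, the ring is represented as an oldest-first slice of ID strings, and an ID that is not found in the ring is treated as having aged out of it.

```go
package main

import "fmt"

// replayAfter returns the IDs buffered strictly after since, oldest
// first. An empty since means "start live" and yields no replay. If
// since is not in the ring, the whole ring is returned with lost=true.
func replayAfter(ring []string, since string) (replay []string, lost bool) {
	if since == "" {
		return nil, false
	}
	for i, id := range ring {
		if id == since {
			// Copy so the caller never aliases the ring's storage.
			return append([]string(nil), ring[i+1:]...), false
		}
	}
	return append([]string(nil), ring...), true
}

func main() {
	ring := []string{"a", "b", "c", "d"}
	fmt.Println(replayAfter(ring, "b"))   // [c d] false
	fmt.Println(replayAfter(ring, ""))    // [] false
	fmt.Println(replayAfter(ring, "old")) // [a b c d] true
}
```

A caller seeing lost == true would typically tell the client that some history is missing before streaming the replayed events.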
type Stats ¶
type Stats struct {
Published uint64 `json:"published"`
Dropped uint64 `json:"dropped"`
Subscribers int `json:"subscribers"`
RingSizes map[string]int `json:"ring_sizes"`
}
Stats describes hub state for /health.
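To show the JSON shape a /health consumer would see, the sketch below marshals a Stats value. The struct is copied from the definition above; the sample values, the encode helper, and the reading of ring_sizes keys as session names (with "" for the global ring) are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stats mirrors the documented struct so the example is self-contained.
type Stats struct {
	Published   uint64         `json:"published"`
	Dropped     uint64         `json:"dropped"`
	Subscribers int            `json:"subscribers"`
	RingSizes   map[string]int `json:"ring_sizes"`
}

// encode is a hypothetical helper that renders s as compact JSON.
func encode(s Stats) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	s := Stats{
		Published:   10,
		Dropped:     1,
		Subscribers: 2,
		RingSizes:   map[string]int{"": 3, "alpha": 2}, // made-up values
	}
	fmt.Println(encode(s))
	// {"published":10,"dropped":1,"subscribers":2,"ring_sizes":{"":3,"alpha":2}}
}
```

encoding/json sorts map keys, so the output is stable across runs.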
type Sub ¶
type Sub struct {
// contains filtered or unexported fields
}
Sub is a hub subscription. Consumers read events off Events(); the hub drops events for this sub when its channel is full.
func (*Sub) Close ¶
func (s *Sub) Close()
Close removes the sub from the hub and closes its channel. Idempotent.
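Idempotent close is often implemented with sync.Once, since closing a Go channel twice panics. The sketch below shows that pattern only; the sub type here is hypothetical, and the removal of the sub from the hub that the real Close performs is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// sub is a hypothetical subscriber whose channel is closed exactly once.
type sub struct {
	ch   chan string
	once sync.Once
}

// Close closes the channel. Repeated calls are no-ops because sync.Once
// runs the function only on the first call.
func (s *sub) Close() {
	s.once.Do(func() { close(s.ch) })
}

func main() {
	s := &sub{ch: make(chan string, 1)}
	s.Close()
	s.Close() // safe: does not panic
	_, ok := <-s.ch
	fmt.Println(ok) // false: the channel is closed
}
```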