events

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package events implements the in-memory pub/sub hub and SSE handler that fan tool-call, quota, and lifecycle events out to the UI.

All routing is in-process: publishers (tailers, hook handlers, attention engine, quota ingest) call Hub.Publish; subscribers (SSE handlers, the attention engine) call Hub.Subscribe. The hub never blocks publishers — slow consumers drop events and increment a per-sub counter.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Handler

func Handler(h *Hub, filter string) http.HandlerFunc

Handler returns an http.HandlerFunc that streams hub events as an SSE response. filter == "" streams the global feed; otherwise only events whose Session matches filter are delivered.

Honours the Last-Event-ID request header for ring replay. Sends a keepalive comment every sseKeepalive so corporate proxies don't time the stream out. Exits when the client disconnects.

Types

type Event

type Event struct {
	ID      string          `json:"id"`
	Type    string          `json:"type"`
	Session string          `json:"session,omitempty"`
	Payload json.RawMessage `json:"payload"`
	// contains filtered or unexported fields
}

Event is a single message routed through the hub. ID is assigned by the hub at Publish time if empty: "<unix-nano>-<seq>" where seq is a per-second monotonic counter so bursts within the same nanosecond remain unique and ordered.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the in-process pub/sub fan-out plus per-session ring buffer.

func NewHub

func NewHub(ringSize int) *Hub

NewHub returns a hub with the given per-ring capacity. ringSize <= 0 uses the default (500).

func (*Hub) Publish

func (h *Hub) Publish(e Event)

Publish appends e to the appropriate rings and fans out to subscribers. Never blocks: a full subscriber channel causes the event to be dropped for that sub (drop counter incremented; WARN logged at most once/min).

func (*Hub) Snapshot

func (h *Hub) Snapshot(filter string) []Event

Snapshot returns a chronological copy of the ring for filter (empty string = global ring). Caller owns the returned slice. Used by REST seed endpoints (/api/feed, /api/sessions/{name}/feed) so first paint renders historical events immediately — hub.Subscribe returns no replay on empty Last-Event-ID, and we don't want fresh browser tabs to stare at an empty list until the next publish.

func (*Hub) Stats

func (h *Hub) Stats() Stats

Stats returns a point-in-time snapshot of hub counters and ring sizes.

func (*Hub) Subscribe

func (h *Hub) Subscribe(filter, since string) (*Sub, []Event)

Subscribe registers a new subscriber and returns it together with the replay slice from the appropriate ring.

filter == "" subscribes to the global stream (every event); a non-empty filter restricts delivery to events whose Session matches.

since is a Last-Event-ID value: the replay slice contains buffered events strictly after that ID, in chronological order. since == "" returns an empty slice (start live). If since predates the ring's oldest entry the entire ring snapshot is returned and Lost reports true so the caller can emit a "lost" marker to the SSE client.

type Stats

type Stats struct {
	Published   uint64         `json:"published"`
	Dropped     uint64         `json:"dropped"`
	Subscribers int            `json:"subscribers"`
	RingSizes   map[string]int `json:"ring_sizes"`
}

Stats describes hub state for /health.

type Sub

type Sub struct {
	// contains filtered or unexported fields
}

Sub is a hub subscription. Consumers read events off Events(); the hub drops events for this sub when its channel is full.

func (*Sub) Close

func (s *Sub) Close()

Close removes the sub from the hub and closes its channel. Idempotent.

func (*Sub) Dropped

func (s *Sub) Dropped() uint64

Dropped returns the number of events that were dropped for this sub because its channel was full when the publisher tried to enqueue.

func (*Sub) Events

func (s *Sub) Events() <-chan Event

Events returns the receive channel. The channel is closed when Close is called or the hub is shut down.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL