vizpipe

package
v0.1.2
Published: Apr 10, 2026 | License: AGPL-3.0 | Imports: 4 | Imported by: 0

Documentation

Overview

Package vizpipe implements the live pipeline visualization event channel for the BubbleFish Nexus dashboard. Events are sent via a lossy buffered channel and broadcast to SSE clients.

INVARIANT: The visualization channel NEVER blocks hot paths. It is lossy by design: if the channel is full, events are dropped and a metric is incremented.

Reference: Tech Spec Section 13.2, Phase R-21.

Constants

This section is empty.

Variables

This section is empty.

Functions

func MarshalSSE

func MarshalSSE(e Event) ([]byte, error)

MarshalSSE encodes an Event as an SSE data line.
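
The exact bytes MarshalSSE produces are not shown on this page. As a rough sketch only, assuming the payload is the JSON encoding of the value and that a single SSE "data:" field terminated by a blank line is intended, an encoder could look like the following. The names marshalSSE and sample are hypothetical and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sample is a hypothetical stand-in for vizpipe.Event with two fields.
type sample struct {
	RequestID string `json:"request_id"`
	Op        string `json:"op"`
}

// marshalSSE is an assumed encoding, not the package's implementation:
// "data: " + compact JSON + a blank line that ends the SSE event.
func marshalSSE(v any) ([]byte, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	out := make([]byte, 0, len(payload)+8)
	out = append(out, "data: "...)
	out = append(out, payload...)
	out = append(out, '\n', '\n')
	return out, nil
}

func main() {
	b, err := marshalSSE(sample{RequestID: "r1", Op: "QUERY"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", b)
}
```

json.Marshal emits compact output with no raw newlines, so the payload fits on one data line.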

Types

type DropMetric

type DropMetric interface {
	Inc()
}

DropMetric is the interface for the dropped events counter.
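
Any type with an Inc method satisfies this interface. One possible implementation, assuming a plain in-process counter is enough (a real deployment might wrap a metrics library instead), is an atomic counter. The atomicDrops type and its Value method are hypothetical; the interface is redeclared here only to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// DropMetric mirrors the interface documented above.
type DropMetric interface {
	Inc()
}

// atomicDrops is a hypothetical counter that is safe for concurrent use.
type atomicDrops struct {
	n atomic.Int64
}

// Inc adds one to the counter.
func (a *atomicDrops) Inc() { a.n.Add(1) }

// Value returns the current count.
func (a *atomicDrops) Value() int64 { return a.n.Load() }

func main() {
	drops := &atomicDrops{}
	var m DropMetric = drops

	// Increment from many goroutines to show the counter is race-free.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Inc()
		}()
	}
	wg.Wait()
	fmt.Println(drops.Value()) // prints 100
}
```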

type Event

type Event struct {
	Timestamp   time.Time   `json:"ts"`
	RequestID   string      `json:"request_id"`
	Source      string      `json:"source"`
	Op          string      `json:"op"` // "WRITE" or "QUERY"
	Subject     string      `json:"subject"`
	ActorType   string      `json:"actor_type"` // "user", "agent", "system"
	Status      string      `json:"status"`     // "ALLOWED", "FILTERED", "DENIED"
	Labels      []string    `json:"labels"`
	ResultCount int         `json:"result_count"`
	TotalMs     float64     `json:"total_ms"`
	Stages      []StageInfo `json:"stages"` // nil for WRITE
}

Event represents a pipeline visualization event matching the dashboard contract shape exactly. One event per request. Reference: dashboard-contract.md GET /api/viz/events.

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

Pipe is the lossy visualization event pipe. Call Emit from the query path; it does not block. SSE clients call Subscribe to receive events.

func New

func New(capacity int, metric DropMetric, logger *slog.Logger) *Pipe

New creates a Pipe with a lossy buffered channel of the given capacity.

func (*Pipe) Emit

func (p *Pipe) Emit(e Event)

Emit sends an event to the visualization channel. It NEVER blocks — if the channel is full, the event is dropped and the metric is incremented.

INVARIANT: Called on the query hot path. Must be non-blocking.
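
The usual Go idiom for a send that never blocks is a select with a default branch. The sketch below shows that idiom under the assumption that Emit works along these lines; the package source is not shown here, and tryEmit and counter are hypothetical names.

```go
package main

import "fmt"

// counter is a hypothetical stand-in for a DropMetric implementation.
type counter struct{ n int }

func (c *counter) Inc() { c.n++ }

// tryEmit attempts a non-blocking send. If the buffer is full it drops
// the value, increments the counter, and returns false immediately.
func tryEmit(ch chan<- string, v string, drops *counter) bool {
	select {
	case ch <- v:
		return true
	default:
		drops.Inc()
		return false
	}
}

func main() {
	ch := make(chan string, 2) // capacity 2, nobody is receiving
	drops := &counter{}
	for i := 0; i < 5; i++ {
		tryEmit(ch, fmt.Sprintf("event-%d", i), drops)
	}
	fmt.Println(len(ch), drops.n) // prints "2 3"
}
```

The cost on the hot path is one channel operation at most; the caller never waits for a slow consumer.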

func (*Pipe) Start

func (p *Pipe) Start()

Start launches the dispatcher goroutine that fans out events to SSE clients.

func (*Pipe) Stop

func (p *Pipe) Stop()

Stop shuts down the dispatcher. It is safe to call multiple times; shutdown is guarded by a sync.Once.
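
A common way to make a shutdown method idempotent with sync.Once is to close a done channel inside Once.Do. The sketch below assumes that pattern; the dispatcher type and its done field are hypothetical and do not reflect the unexported fields of Pipe.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical type with a done channel that signals
// its goroutine to exit.
type dispatcher struct {
	done chan struct{}
	once sync.Once
}

func newDispatcher() *dispatcher {
	return &dispatcher{done: make(chan struct{})}
}

// Stop closes done exactly once. Later calls are no-ops, which avoids
// the panic that closing an already closed channel would cause.
func (d *dispatcher) Stop() {
	d.once.Do(func() { close(d.done) })
}

func main() {
	d := newDispatcher()
	d.Stop()
	d.Stop() // safe: the close runs only on the first call

	select {
	case <-d.done:
		fmt.Println("stopped")
	default:
		fmt.Println("still running")
	}
}
```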

func (*Pipe) Subscribe

func (p *Pipe) Subscribe() (<-chan Event, func())

Subscribe registers a new SSE client and returns a channel to receive events and an unsubscribe function. The channel has a small buffer; slow clients will miss events (lossy).
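
The subscribe and fan-out behavior described above can be sketched with a mutex-protected set of per-client buffered channels. This is an illustration under assumptions, not the package's code: hub, broadcast, the string payload, and the explicit buffer-size parameter are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical fan-out registry of subscriber channels.
type hub struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func newHub() *hub {
	return &hub{subs: make(map[chan string]struct{})}
}

// Subscribe registers a buffered channel and returns it together with
// an idempotent unsubscribe function.
func (h *hub) Subscribe(buf int) (<-chan string, func()) {
	ch := make(chan string, buf)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()

	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, ch)
			h.mu.Unlock()
			close(ch) // safe: broadcast only sends to registered channels
		})
	}
	return ch, unsubscribe
}

// broadcast offers v to every subscriber without blocking; a subscriber
// whose buffer is full misses the event.
func (h *hub) broadcast(v string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- v:
		default:
		}
	}
}

func main() {
	h := newHub()
	ch, unsubscribe := h.Subscribe(1)
	h.broadcast("a")
	h.broadcast("b") // dropped for this client: its buffer is full
	fmt.Println(<-ch) // prints "a"
	unsubscribe()
}
```

Dropping per client keeps one slow SSE connection from delaying delivery to the others.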

type StageInfo

type StageInfo struct {
	Stage int     `json:"stage"`
	Name  string  `json:"name"`
	Ms    float64 `json:"ms"`
	Hit   bool    `json:"hit"`
}

StageInfo describes a single pipeline stage within a request. Reference: dashboard-contract.md viz/events stages[] element.
