live

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package live streams real-time machine events to browser clients.

The Meticulous machine speaks Engine.IO v4 (the transport under socket.io). This package contains:

  • Client: a minimal Engine.IO v4 + socket.io v4 consumer that subscribes to the machine's "status" and "sensors" event streams over WebSocket.
  • Hub: fan-out from one machine client to many connected browsers.

We only need to RECEIVE events; we never emit socket.io packets back, so the implementation is substantially smaller than a general socket.io library.

Hub fans events out from the machine client to many browser websockets.

Recorder captures live shot samples from the Hub and, on shot end, persists them as a shot row and (optionally) kicks off an AI analysis.

The machine's /api/v1/history endpoint is the canonical source of truth but it can lag by many minutes on a busy machine. Subscribing to the live event stream lets us store the shot the instant the user releases the paddle — history sync, when it arrives, upserts by id and overwrites our row with the richer machine-side copy. Analyses are keyed on (shot_id, model) so re-analyzing after the upsert is idempotent.

Index

Constants

View Source
const MinSamplesToSave = 20

MinSamplesToSave is the threshold below which we treat a captured trace as noise rather than a real shot. A flush/flush-to-cup or a tare tap will produce a handful of 'extracting=true' frames; we don't want to save those as shots.

Variables

This section is empty.

Functions

This section is empty.

Types

type AnalysisTrigger

type AnalysisTrigger func(shotID string)

AnalysisTrigger runs an analysis for the given shot id asynchronously. Implemented by a small closure in main.go that wires the analyzer + store together — kept as an interface so the live package doesn't depend on internal/ai or internal/settings.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client consumes events from one machine over Engine.IO v4 WebSocket.

func NewClient

func NewClient(machineURL string, out chan<- Event) *Client

NewClient returns a Client that publishes events to out. The caller owns out.

func (*Client) Run

func (c *Client) Run(ctx context.Context)

Run connects and re-connects with exponential backoff until ctx ends.

func (*Client) State

func (c *Client) State() State

State returns a point-in-time snapshot.

type Event

type Event struct {
	Name string          `json:"name"`           // "status" | "sensors" | "connected" | "disconnected"
	At   time.Time       `json:"at"`             // server-side receive time
	Data json.RawMessage `json:"data,omitempty"` // raw payload, as sent by the machine
}

Event is a single machine event forwarded to browser clients.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub owns the machine Client and multiplexes its events to browser clients.

func NewHub

func NewHub(ctx context.Context, machineURL string) *Hub

NewHub constructs a Hub pointing at the given machine URL and starts the client goroutine. Call Close to tear down.

func (*Hub) ServeWS

func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request)

ServeWS upgrades an HTTP request to a browser WebSocket and streams events.

func (*Hub) State

func (h *Hub) State() State

State returns the current machine-connection state.

func (*Hub) Subscribe

func (h *Hub) Subscribe() (<-chan Event, func())

Subscribe registers an in-process consumer for machine events (e.g. a shot recorder). Browser clients go through ServeWS; this is for Go callers that want a typed channel. The returned cancel func removes the subscription and closes the channel — always call it.

Unlike the internal subscribe() used by ServeWS, this does not replay last-known events: an internal recorder wants a clean forward stream, not a retroactive re-delivery of the previous shot's final frame.

type LiveShot

type LiveShot struct {
	ID          string
	Time        float64 // unix seconds with ms fraction
	Name        string
	ProfileID   string
	ProfileName string
	// Samples is a JSON array matching the /api/v1/history per-shot `data`
	// shape so downstream code (analyzer, UI) reads it without branching.
	Samples json.RawMessage
	// Profile is the loaded profile JSON if the machine provided one as
	// a string on the status event; may be null.
	Profile json.RawMessage
}

LiveShot is the payload SaveLiveShot stores.

type Recorder

type Recorder struct {
	// contains filtered or unexported fields
}

Recorder subscribes to a Hub, buffers samples per-shot, and flushes on shot end. Create with NewRecorder; call Run in a goroutine.

func NewRecorder

func NewRecorder(hub Subscriber, sink ShotSink, analyze AnalysisTrigger) *Recorder

NewRecorder wires a recorder. analyze may be nil.

func (*Recorder) Run

func (r *Recorder) Run(ctx context.Context)

Run subscribes to the hub and processes events until ctx ends.

func (*Recorder) WithShotFinishedHook

func (r *Recorder) WithShotFinishedHook(hook ShotFinishedHook) *Recorder

WithShotFinishedHook attaches a hook that fires after a live shot is saved. Returns the recorder so it can be chained in main.go's wiring.

type ShotFinishedHook

type ShotFinishedHook func(shotID, name string)

ShotFinishedHook fires after a live shot has been successfully saved. Used to trigger push notifications; nil disables the hook. Kept as a plain function so the live package stays unaware of internal/push.

type ShotSink

type ShotSink interface {
	// SaveLiveShot inserts a shot captured from the live stream iff no
	// row exists for id. It must not clobber a row synced from the
	// canonical /api/v1/history endpoint.
	SaveLiveShot(ctx context.Context, shot LiveShot) error
}

ShotSink is what the recorder needs from the shot store: a narrow interface so tests don't have to spin up SQLite.

type State

type State struct {
	Connected   bool      `json:"connected"`
	LastConnect time.Time `json:"last_connect"`
	LastError   string    `json:"last_error,omitempty"`
	MachineURL  string    `json:"machine_url"`
}

State is the client's current observable state.

type Subscriber

type Subscriber interface {
	Subscribe() (<-chan Event, func())
}

Subscriber is the subset of *Hub the recorder uses. Extracted so tests can feed synthetic event streams without constructing a real Hub.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL