Documentation ¶
Overview ¶
Package daemon implements the long-lived ppz daemon: IPC server, on-disk state, NATS connection, and HTTP client to ppz-server.
Index ¶
- Constants
- Variables
- func Call(sock, method string, params, result any) error
- func ClassifyHeartbeatStatus(last time.Time, now time.Time, intervalSec int) string
- func CursorKey(accountID, handle, channel string) string
- func IsAlive(pid int) bool
- func PIDFromHome(home string) int
- type Credentials
- type Daemon
- type HeartbeatCache
- type HeartbeatEntry
- type NATSEvent
- type NATSEventRing
- type OnRefreshedFn
- type PatternHit
- type RefreshFn
- type RefreshLoop
- func (r *RefreshLoop) Current() (jwt, seed string)
- func (r *RefreshLoop) JWTExp() int64
- func (r *RefreshLoop) LastRefreshAt() time.Time
- func (r *RefreshLoop) RefreshNowIfDue(ctx context.Context, now time.Time) (bool, error)
- func (r *RefreshLoop) Start(ctx context.Context, jwt, seed string, expUnix int64) error
- func (r *RefreshLoop) Stop()
- type State
- func (s *State) AccountID() string
- func (s *State) AccountName() string
- func (s *State) ClearCurrent(session string) error
- func (s *State) ClearCurrentForHandle(handle string) error
- func (s *State) ClearNamespace(session string) error
- func (s *State) Credentials() (*Credentials, bool)
- func (s *State) Current(session string) string
- func (s *State) CurrentNamespace(session string) string
- func (s *State) ForgetPipe(handle string)
- func (s *State) HandleManifold(handle string) string
- func (s *State) Home() string
- func (s *State) KeyPrefix() string
- func (s *State) KnowsPipe(handle string) bool
- func (s *State) LoadFromDisk() error
- func (s *State) LoginCheck() string
- func (s *State) RememberPipe(handle string)
- func (s *State) RememberSource(handle, manifold string)
- func (s *State) ResetPipes(handles []string)
- func (s *State) ResetSources(handles []string, manifolds map[string]string)
- func (s *State) SetAccountID(id string)
- func (s *State) SetCurrent(session, handle string) error
- func (s *State) SetLogin(creds Credentials, accountID, accountName, keyPrefix string) error
- func (s *State) SetLoginCheck(state string)
- func (s *State) SetNamespace(session, namespace string) error
Constants ¶
const NATSEventSchemaVersion = 1
NATSEventSchemaVersion is the on-disk + on-wire schema version stamped on every NATSEvent. Bumped when fields are renamed or semantics change; new fields with json:",omitempty" do not require a bump. Reader contract is documented in docs/diagnostics.md.
Variables ¶
var ErrCredsRequired = errors.New("credentials required")
ErrUnauthorized is what RefreshFn returns when the bearer was revoked / expired — distinct from transient network failures (which the loop retries). Triggers OnUnauthorized + stop.
Functions ¶
func Call ¶
func Call(sock, method string, params, result any) error
Call sends one request to the daemon over a fresh connection and decodes either result or error.
func ClassifyHeartbeatStatus ¶ added in v0.33.0
func ClassifyHeartbeatStatus(last time.Time, now time.Time, intervalSec int) string
ClassifyHeartbeatStatus is the exported alias of classifyHeartbeatStatus for callers outside the daemon package (currently `ppz who`'s renderer in internal/cli). Keeping the rule in one place ensures the colour boundaries and the filter boundaries can never drift apart.
func PIDFromHome ¶
func PIDFromHome(home string) int
PIDFromHome reads $PPZ_HOME/daemon.pid. Returns 0 if absent.
Types ¶
type Credentials ¶
type Credentials struct {
URL string `json:"url"`
APIKey string `json:"api_key"`
AccountID string `json:"account_id,omitempty"`
AccountName string `json:"account_name,omitempty"`
// Auth V2 Phase 3 — short-lived NATS user credentials.
// Re-fetched periodically by the daemon's refresh goroutine.
NATSUserJWT string `json:"nats_user_jwt,omitempty"`
NATSUserSeed string `json:"nats_user_seed,omitempty"`
}
Credentials are persisted at $PPZ_HOME/credentials (mode 0600). AccountID + AccountName are stored alongside URL+APIKey so a SIGHUP / file-poller reload doesn't drop the resolved org info (the alternative would be re-calling /auth/exchange after every reload).
type Daemon ¶
type Daemon struct {
Home string
Sock string
State *State
HTTP *http.Client
NC *nats.Conn // nil until Login
NATSURL string
Cursors *cursors
// Phase 3.5 — JWT refresh loop. Started on Login (and restored
// in ensureNATS for daemon restarts), holds the latest minted
// (jwt, seed) for the current org, and re-runs /auth/exchange
// at exp-30s. nats.Connect's UserJWT callback reads from
// Refresh.Current() so reconnects pick up fresh credentials.
// Stopped on Logout / replaced on Login.
Refresh *RefreshLoop
// Phase 0 (agent hardening) — short tail of NATS connection-state
// transitions. Surfaced by `ppz status` (latest state) and
// `ppz diagnostics` (full ring). Initialised in New() so the handlers
// registered on the very first nats.Connect have a non-nil ring
// to append to.
NATSEvents *NATSEventRing
// Heartbeats holds the most recent <handle>.heartbeat payload per
// source, populated by handleSend / handleSendBatch on the fly.
// Read by `ppz who`. Memory-only — cleared on daemon restart.
Heartbeats *HeartbeatCache
// Follows tracks live IPC conns that handleRead is streaming
// JetStream events on (Follow: true). Used by swapNC to evict
// stale follows when the NATS connection is replaced — the old
// consumers go silent and the CLI needs to redial against the
// new NC. See follow_registry.go.
Follows *followRegistry
}
Daemon is the singleton process per $PPZ_HOME.
type HeartbeatCache ¶ added in v0.33.0
type HeartbeatCache struct {
// contains filtered or unexported fields
}
HeartbeatCache stores the latest heartbeat per source handle in memory. Lifetime is the daemon process — a daemon restart clears every entry, and the next beat from each agent re-populates it (so `ppz who` may show "offline" for up to one interval after a daemon restart). That's deliberate: persistent cross-restart state would need DB/server-side support, which is intentionally deferred.
func NewHeartbeatCache ¶ added in v0.33.0
func NewHeartbeatCache() *HeartbeatCache
func (*HeartbeatCache) Snapshot ¶ added in v0.33.0
func (c *HeartbeatCache) Snapshot() []HeartbeatEntry
Snapshot returns all entries sorted by handle (ASCII order). Returns an empty slice (never nil) so callers can range freely.
type HeartbeatEntry ¶ added in v0.33.0
type HeartbeatEntry struct {
Handle string
Payload string // verbatim heartbeat JSON, as published by the pty wrapper
ArrivedAt time.Time // wall-clock time the daemon received the publish (not the payload's ts)
}
HeartbeatEntry is one row of the daemon's heartbeat cache — effectively "the most recent heartbeat we forwarded for this source". `ppz who` formats one row per entry.
type NATSEvent ¶ added in v0.26.0
type NATSEvent struct {
V int `json:"v"`
Type string `json:"type"`
At time.Time `json:"at"`
Caller string `json:"caller,omitempty"`
NCID string `json:"nc_id,omitempty"`
JWTExp int64 `json:"jwt_exp,omitempty"`
Reason string `json:"reason,omitempty"`
}
NATSEvent is one entry in the daemon's NATS connection-state log.
Type vocabulary (closed set — extend with care, document in docs/diagnostics.md):
- "connect" — nats.go ConnectHandler fired (initial / reconnect success)
- "disconnect" — nats.go DisconnectErrHandler fired
- "reconnect" — nats.go ReconnectHandler fired
- "closed" — nats.go ClosedHandler fired
- "swap" — daemon code called swapNC (Caller names which fn)
- "warn" — non-fatal failure (e.g. resubscribe error)
- "daemon_start" / "daemon_stop" — lifecycle hooks
Caller distinguishes daemon-initiated transitions from nats.go-initiated ones: "nats.go" for library callbacks, the Go function name otherwise ("ensureNATS", "OnRefreshed-callback", "handleLogin", "watchState"). This is the single most useful field for "who closed this connection?" — see the burst-swap-storm pattern.
NCID is the connection's pointer address ("0x14000123abc"), letting a reader trace which logical NC each event references across a rotation.
JWTExp is the unix-seconds `exp` of the JWT in use at event time. 0 means unknown (lifecycle / warn events, or pre-login). Used by the post-rotation-auth-violation pattern.
type NATSEventRing ¶ added in v0.26.0
type NATSEventRing struct {
// contains filtered or unexported fields
}
NATSEventRing is a fixed-capacity, append-only, drop-oldest ring of NATSEvent records. Used by the daemon to surface a recent tail of connection-state transitions through `ppz diagnostics`.
Thread-safe: every accessor takes mu. The ring lives on Daemon and is initialised in New() — before any nats.Connect call — so the handlers we register can append without nil-checking.
func (*NATSEventRing) Append ¶ added in v0.26.0
func (r *NATSEventRing) Append(ev NATSEvent)
Append records one event. When the ring is full the oldest entry is dropped — diag's value is precisely catching transient events that happened "a few minutes ago", so we keep the tail and lose the head. The full history (beyond ring cap) lives on disk in nats-events.jsonl; see nats_events_persistence.go.
V is stamped to NATSEventSchemaVersion if zero, so callers don't have to remember to set it on every Append.
func (*NATSEventRing) Snapshot ¶ added in v0.26.0
func (r *NATSEventRing) Snapshot() []NATSEvent
Snapshot returns a copy of the current events in chronological order (oldest first). Safe to retain — the returned slice is independent of the ring's internal storage.
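The Append and Snapshot behaviour described above can be sketched with a small circular buffer. This is an illustration under assumptions, not the package's code: `ring`, `event`, `newRing`, and the internal layout are hypothetical, and the event type is trimmed to the fields the sketch needs.

```go
package main

import (
	"fmt"
	"sync"
)

const schemaVersion = 1 // stands in for NATSEventSchemaVersion

// event is a trimmed, hypothetical stand-in for NATSEvent.
type event struct {
	V    int
	Type string
}

// ring is a fixed-capacity, drop-oldest buffer guarded by a mutex.
type ring struct {
	mu    sync.Mutex
	buf   []event
	start int // index of the oldest entry
	n     int // number of live entries
}

func newRing(capacity int) *ring {
	if capacity < 1 {
		capacity = 1
	}
	return &ring{buf: make([]event, capacity)}
}

// Append stamps V when it is zero, then stores the event, overwriting
// the oldest entry once the ring is full.
func (r *ring) Append(ev event) {
	if ev.V == 0 {
		ev.V = schemaVersion
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.n < len(r.buf) {
		r.buf[(r.start+r.n)%len(r.buf)] = ev
		r.n++
		return
	}
	r.buf[r.start] = ev
	r.start = (r.start + 1) % len(r.buf)
}

// Snapshot copies the live entries out, oldest first.
func (r *ring) Snapshot() []event {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]event, 0, r.n)
	for i := 0; i < r.n; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRing(3)
	for _, t := range []string{"connect", "disconnect", "reconnect", "closed"} {
		r.Append(event{Type: t})
	}
	for _, ev := range r.Snapshot() {
		fmt.Println(ev.V, ev.Type) // "connect" has been dropped
	}
}
```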
type OnRefreshedFn ¶ added in v0.36.3
OnRefreshedFn fires after refreshNow successfully swaps in fresh credentials. The daemon hooks this to proactively rebuild its NATS connection within the 60s overlap window: the User JWT `nbf` is set 30s before issuance, so the old and new JWTs are both valid for ~60s around the rotation point. Rebuilding inside that window eliminates the disconnect/reconnect gap that otherwise occurs when the server kicks the live connection at the old JWT's exp, along with the transient E_NATS_UNREACHABLE that a send running inside that gap surfaces to callers.
Runs synchronously on the refresh goroutine. Implementations must not block on anything that could deadlock with the goroutine that called RefreshNowIfDue.
type PatternHit ¶ added in v0.37.0
type PatternHit struct {
Name string `json:"name"`
At time.Time `json:"at"`
Detail string `json:"detail"`
}
PatternHit is one match of one detector against the event window. Name is the stable detector identifier (kebab-case; doubles as the docs anchor). At is the timestamp of the first event in the match — used to sort hits chronologically. Detail is the human-readable one-liner shown in CLI output and the --json pattern array.
type RefreshFn ¶
type RefreshFn func(ctx context.Context, accountID string) (jwt, seed string, expUnix int64, err error)
RefreshFn is the work the refresh loop calls when a JWT is about to expire. Implementations re-run POST /api/v1/auth/exchange and return the new (jwt, seed). accountID lets a multi-org daemon route to the right account.
type RefreshLoop ¶
type RefreshLoop struct {
AccountID string
Refresh RefreshFn
OnRefreshed OnRefreshedFn
// contains filtered or unexported fields
}
RefreshLoop monitors one (org, JWT) pair and refreshes it before expiry. Concurrency: Current() may be called from any goroutine; Start/Stop must be called from the same goroutine.
func (*RefreshLoop) Current ¶
func (r *RefreshLoop) Current() (jwt, seed string)
Current returns the freshest (jwt, seed) — used by nats.UserJWT() callbacks on every NATS (re)connect.
func (*RefreshLoop) JWTExp ¶ added in v0.37.0
func (r *RefreshLoop) JWTExp() int64
JWTExp returns the unix-seconds `exp` claim of the cached JWT, or 0 if no credentials have been cached yet. Used by natsObserveOptions to stamp the JWT exp onto every connection-state event; the post-rotation-auth-violation pattern relies on this to correlate disconnects with rotation timing. Safe for concurrent callers; a nil receiver returns 0, so callers can pass r.JWTExp without nil-checking.
func (*RefreshLoop) LastRefreshAt ¶ added in v0.17.0
func (r *RefreshLoop) LastRefreshAt() time.Time
LastRefreshAt returns when the loop last accepted fresh credentials. Start counts as the first refresh because its credentials came from /auth/exchange immediately before the loop was started.
func (*RefreshLoop) RefreshNowIfDue ¶ added in v0.17.9
func (r *RefreshLoop) RefreshNowIfDue(ctx context.Context, now time.Time) (bool, error)
RefreshNowIfDue refreshes synchronously when the cached credential is already inside its refresh window. It covers machines waking from sleep: the timer goroutine may not have run yet, but the next command must not continue with an expired JWT.
func (*RefreshLoop) Start ¶
func (r *RefreshLoop) Start(ctx context.Context, jwt, seed string, expUnix int64) error
Start begins the refresh goroutine with an initial credential. expUnix is the JWT's `exp` claim in unix seconds.
func (*RefreshLoop) Stop ¶
func (r *RefreshLoop) Stop()
Stop halts the refresh goroutine. Idempotent.
type State ¶
type State struct {
// contains filtered or unexported fields
}
State is the daemon's in-memory mirror of on-disk credentials + current handle. "current" is keyed by session id (tty / $PPZ_SESSION) so each terminal window has its own current source — same scoping as cursors, avoids the "new terminal silently inherits a stale current set hours ago in another window" footgun.
func (*State) AccountName ¶ added in v0.30.0
func (s *State) AccountName() string
func (*State) ClearCurrent ¶
func (s *State) ClearCurrent(session string) error
ClearCurrent drops this session's current. Used by `ppz disconnect`. Idempotent — clearing an already-clear session is a no-op.
func (*State) ClearCurrentForHandle ¶ added in v0.21.0
func (s *State) ClearCurrentForHandle(handle string) error
ClearCurrentForHandle drops every session whose current equals handle. Called after a source destroy so stale per-session pointers don't linger.
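The per-session scoping of "current" can be sketched with a session-to-handle map. This is an in-memory illustration only: the real methods return an error and persist to disk, and `sessionCurrent` with its field layout is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// sessionCurrent is a hypothetical, memory-only model of the
// session -> current-handle mapping.
type sessionCurrent struct {
	mu      sync.Mutex
	current map[string]string
}

func newSessionCurrent() *sessionCurrent {
	return &sessionCurrent{current: map[string]string{}}
}

func (s *sessionCurrent) SetCurrent(session, handle string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current[session] = handle
}

// Current returns "" when the session has no current handle.
func (s *sessionCurrent) Current(session string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.current[session]
}

// ClearCurrent is idempotent: deleting a missing key is a no-op.
func (s *sessionCurrent) ClearCurrent(session string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.current, session)
}

// ClearCurrentForHandle drops every session that points at handle.
func (s *sessionCurrent) ClearCurrentForHandle(handle string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for session, h := range s.current {
		if h == handle {
			delete(s.current, session) // deleting during range is safe in Go
		}
	}
}

func main() {
	s := newSessionCurrent()
	s.SetCurrent("tty1", "build-logs")
	s.SetCurrent("tty2", "build-logs")
	s.SetCurrent("tty3", "metrics")

	s.ClearCurrentForHandle("build-logs") // e.g. after the source is destroyed
	fmt.Printf("%q %q %q\n", s.Current("tty1"), s.Current("tty2"), s.Current("tty3"))
}
```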
func (*State) ClearNamespace ¶ added in v0.31.0
func (s *State) ClearNamespace(session string) error
ClearNamespace drops this session's namespace. Idempotent.
func (*State) Credentials ¶
func (s *State) Credentials() (*Credentials, bool)
func (*State) CurrentNamespace ¶ added in v0.31.0
func (s *State) CurrentNamespace(session string) string
CurrentNamespace returns the per-session namespace, empty when unset. Phase 1.5 (locked decision #18 / #20).
func (*State) ForgetPipe ¶ added in v0.21.0
func (s *State) ForgetPipe(handle string)
ForgetPipe removes a handle from the known-pipes cache. Called after a source is destroyed so the cache doesn't return stale hits.
func (*State) HandleManifold ¶ added in v0.31.1
func (s *State) HandleManifold(handle string) string
HandleManifold returns the cached manifold for a known source handle, or "" (root) if the handle isn't cached. Phase 1.5.1.
func (*State) LoadFromDisk ¶
func (s *State) LoadFromDisk() error
LoadFromDisk reads credentials and current from $PPZ_HOME. Called at startup and on SIGHUP. Missing files mean "not logged in" / "no current".
`current.json` is the session→handle map. The legacy plain-text `current` file (pre-per-session) is migrated into session "default" if both exist.
func (*State) LoginCheck ¶
func (s *State) LoginCheck() string
LoginCheck returns the cached server-validation result. "" means "not observed yet" — callers (e.g. status) should probe.
func (*State) RememberPipe ¶
func (s *State) RememberPipe(handle string)
func (*State) RememberSource ¶ added in v0.31.2
func (s *State) RememberSource(handle, manifold string)
RememberSource is the Phase 1.5.2 superset of RememberPipe — caches both the known-handle bit AND the handle→manifold mapping. Callers that have just minted a source should prefer this so later same-session sends don't trigger a refresh just to learn the source's manifold.
func (*State) ResetPipes ¶
func (s *State) ResetPipes(handles []string)
func (*State) ResetSources ¶ added in v0.31.1
func (s *State) ResetSources(handles []string, manifolds map[string]string)
ResetSources replaces both the known-handles set AND the per-handle manifold cache from a server-supplied source list. Preferred over ResetPipes for callers that have full cliproto.Source rows. Phase 1.5.1.
func (*State) SetAccountID ¶ added in v0.30.0
func (s *State) SetAccountID(id string)
func (*State) SetCurrent ¶
func (s *State) SetCurrent(session, handle string) error
func (*State) SetLogin ¶
func (s *State) SetLogin(creds Credentials, accountID, accountName, keyPrefix string) error
func (*State) SetLoginCheck ¶
func (s *State) SetLoginCheck(state string)
SetLoginCheck records the latest server-validation result. Called from callServer based on response status. Idempotent — same value twice is fine, but transitions (ok→invalid, invalid→ok) are kept.
func (*State) SetNamespace ¶ added in v0.31.0
func (s *State) SetNamespace(session, namespace string) error
SetNamespace stores the per-session namespace. Empty value clears (same semantics as ClearNamespace). Persisted to disk.
Source Files ¶
- ack_emit.go
- cursors.go
- daemon.go
- follow_registry.go
- handlers.go
- handlers_diag.go
- handlers_who.go
- heartbeat_cache.go
- heartbeat_owners.go
- heartbeat_status.go
- heartbeat_subscriber.go
- ipc.go
- lifecycle.go
- list_snapshot.go
- list_watch.go
- nats_event_patterns.go
- nats_events.go
- nats_events_persistence.go
- publish.go
- read.go
- refresh.go
- sender_resolve.go
- state.go
- watcher.go