Documentation ¶
Overview ¶
Package watcher's Gmail adapter delivers normalized Events from a Gmail account via the Gmail Pub/Sub watch + streaming pull flow:
- users.Watch() registers a push target (Google Cloud Pub/Sub topic) that Gmail publishes change notifications to. The watch expires after ~7 days.
- Subscription.Receive subscribes to that topic via the cloud.google.com/go/pubsub client and blocks until ctx is cancelled. Google's client handles lease extension, flow control, and reconnects natively.
- Each envelope is shaped {emailAddress, historyId}. The handler calls users.history.list(startHistoryId=persistedWatchHistoryID) to fetch the new message IDs, then users.messages.get(format=metadata) to fetch From/Subject/snippet/internalDate, normalizes those to an Event, and sends it on the channel.
- On success the Pub/Sub message is Acked. On 404 from history.list (stale historyId past Gmail's retention window) the handler falls back to the envelope's historyId, logs a warning, and Acks — dropping some events is acceptable per CONTEXT.md D-18. Malformed envelopes are Acked (not Nacked) so they cannot redeliver forever.
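The envelope handling described in the list above can be sketched as follows. This is a minimal illustration, not the adapter's actual code: the `envelope` type and the `decodeEnvelope` helper are hypothetical names, and `json.Number` is used on the assumption that `historyId` may arrive either as a JSON number or as a quoted string.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope mirrors the {emailAddress, historyId} shape described above.
// The type name and field layout are assumptions made for illustration.
type envelope struct {
	EmailAddress string      `json:"emailAddress"`
	HistoryID    json.Number `json:"historyId"`
}

// decodeEnvelope parses a Pub/Sub message payload. A non-nil error means the
// envelope is malformed; per the overview, the caller would Ack such a message
// (not Nack it) so that it cannot redeliver forever.
func decodeEnvelope(data []byte) (envelope, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return envelope{}, fmt.Errorf("decode envelope: %w", err)
	}
	if env.EmailAddress == "" || env.HistoryID == "" {
		return envelope{}, errors.New("envelope missing emailAddress or historyId")
	}
	return env, nil
}

func main() {
	payload := []byte(`{"emailAddress":"user@example.com","historyId":12345}`)
	env, err := decodeEnvelope(payload)
	if err != nil {
		fmt.Println("malformed envelope, would Ack and drop:", err)
		return
	}
	fmt.Println(env.EmailAddress, env.HistoryID)
}
```

Validating both fields up front keeps the later history.list call from running with an empty cursor.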
Plan 17-02 implements Setup, Listen (with a STUB renewalLoop that only waits on ctx.Done), Teardown, HealthCheck (minimal), processHistory, registerWatch, normalizeGmailMessage, the label filter, and the persistingTokenSource wrapper. The full renewalLoop body and expanded HealthCheck coverage land in Plan 17-03.
Index ¶
- Constants
- func AppendClientEntry(path, sender string, entry ClientEntry) error
- func AppendEventLog(name, entry string) error
- func BuildPrompt(event Event, clientsList map[string]ClientEntry, resultPath string) (string, error)
- func LayoutDir() (string, error)
- func LoadClientsJSON(path string) (map[string]ClientEntry, error)
- func MigrateLegacyWatchersDir() error
- func SaveState(name string, s *WatcherState) error
- func ScaffoldWatcherLayout() error
- func WatcherDir(name string) (string, error)
- type AdapterConfig
- type AgentDeckLaunchSpawner
- type Alert
- type ClientEntry
- type Clock
- type Config
- type Engine
- type EngineConfig
- type Event
- type GitHubAdapter
- type GmailAdapter
- type HealthBridge
- type HealthSample
- type HealthState
- type HealthStatus
- type HealthTracker
- type Notifier
- type NtfyAdapter
- type RouteResult
- type Router
- type SlackAdapter
- type TriageRequest
- type TriageSpawner
- type WatcherAdapter
- type WatcherState
- type WebhookAdapter
Constants ¶
const (
TriggerSilenceDetected = "silence_detected"
TriggerErrorThresholdExceeded = "error_threshold_exceeded"
TriggerAdapterTeardownUnexpected = "adapter_teardown_unexpected"
)
Valid Trigger values.
const (
TriageMaxPerHour = 5
TriageWindow = 60 * time.Minute
TriageQueueCap = 16
TriageReqChCap = 16
TriageReaperPoll = 5 * time.Second
TriageTimeout = 10 * time.Minute
)
Triage rate-limiting and queue constants (INTEL-03, D-10/10a/10b).
Variables ¶
This section is empty.
Functions ¶
func AppendClientEntry ¶
func AppendClientEntry(path, sender string, entry ClientEntry) error
AppendClientEntry atomically appends a sender→entry mapping to clients.json at the given path. It is safe to call concurrently from multiple goroutines.
Behavior:
- Wildcard senders ("*@domain") are rejected immediately — these must only be added by explicit human action (D-14, T-18-02).
- If the sender is already present, returns nil without modifying the file (idempotent, D-11 step 3).
- If clients.json does not exist, it is created with mode 0o600.
- The parent directory is created with mode 0o700 if it does not exist.
- The write is atomic via write-to-temp + os.Rename (POSIX, D-11, T-18-01).
- A stale <path>.tmp from a prior crash is silently overwritten by os.WriteFile (truncates) and then renamed away (T-18-04).
func AppendEventLog ¶ added in v1.6.0
func AppendEventLog(name, entry string) error
AppendEventLog appends one Markdown line to <name>/task-log.md. Expected entry format: "## <RFC3339-ts> - <event_type>: <summary>" (no trailing newline — added here). Atomicity: O_APPEND on a single Write is POSIX-atomic for sizes <= PIPE_BUF (4096 on Linux, 512 on older BSD). The engine writerLoop is single-goroutine per engine instance so per-watcher serialization is guaranteed today; entries are truncated to <512 bytes by the caller (engine.go) so atomicity holds even if writerLoop becomes multi-goroutine.
func BuildPrompt ¶
func BuildPrompt(event Event, clientsList map[string]ClientEntry, resultPath string) (string, error)
BuildPrompt renders the triage prompt template with the given event, clients list, and absolute result path. The event body is truncated to maxBodyRunes if longer.
func LayoutDir ¶ added in v1.6.0
func LayoutDir() (string, error)
LayoutDir returns the root watcher dir (~/.agent-deck/watcher). Delegates to session.WatcherDir for single-source-of-truth.
func LoadClientsJSON ¶
func LoadClientsJSON(path string) (map[string]ClientEntry, error)
LoadClientsJSON reads a clients.json file from disk and returns the parsed entries. Returns an error if the file does not exist or contains invalid JSON.
Threat T-13-01: entries with empty conductor fields are logged as warnings at engine startup; the router itself does not reject them to preserve forward compatibility.
func MigrateLegacyWatchersDir ¶ added in v1.6.0
func MigrateLegacyWatchersDir() error
MigrateLegacyWatchersDir performs the one-shot ~/.agent-deck/watchers -> ~/.agent-deck/watcher rename and creates a relative compatibility symlink watchers -> watcher. Single-shot; subsequent calls no-op. SECURITY T-21-SL: uses os.Lstat (not os.Stat) and refuses if watcher/ is a symlink targeting outside the deck root.
func SaveState ¶ added in v1.6.0
func SaveState(name string, s *WatcherState) error
SaveState writes <name>/state.json atomically using the write-temp-then-rename pattern, copied from SaveWatcherMeta in internal/session.
func ScaffoldWatcherLayout ¶ added in v1.6.0
func ScaffoldWatcherLayout() error
ScaffoldWatcherLayout creates the singular watcher/ dir and writes default shared files if absent. Uses O_CREATE|O_EXCL for TOCTOU-safe idempotent writes.
func WatcherDir ¶ added in v1.6.0
func WatcherDir(name string) (string, error)
WatcherDir returns ~/.agent-deck/watcher/<name> after validating name. T-21-PI mitigation.
Types ¶
type AdapterConfig ¶
type AdapterConfig struct {
// Type is the adapter type: "webhook", "ntfy", "github", "slack", "gmail"
Type string
// Name is the watcher name (used for logging and health tracking)
Name string
// Settings holds adapter-specific key-value pairs from the watcher.toml [source] section.
Settings map[string]string
}
AdapterConfig holds the configuration passed to a WatcherAdapter during Setup.
type AgentDeckLaunchSpawner ¶
type AgentDeckLaunchSpawner struct {
BinaryPath string
}
AgentDeckLaunchSpawner invokes `agent-deck launch` to spawn a triage Claude session. BinaryPath is resolved via session.FindAgentDeck() at engine startup (D-01/D-03).
func (AgentDeckLaunchSpawner) Spawn ¶
func (s AgentDeckLaunchSpawner) Spawn(ctx context.Context, req TriageRequest) (string, error)
Spawn creates the triage directory, builds the prompt, and exec's agent-deck launch. Returns an error without spawning if the binary does not exist (T-18-13).
type ClientEntry ¶
type ClientEntry struct {
// Conductor is the conductor name that handles events from this client
Conductor string `json:"conductor"`
// Group is the TUI group path to place sessions under (e.g., "client-a/inbox")
Group string `json:"group"`
// Name is a human-readable label for the client
Name string `json:"name"`
}
ClientEntry holds the routing configuration for a single client. Entries in clients.json are keyed by either exact email ("user@example.com") or wildcard domain ("*@example.com").
type Clock ¶
type Clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
NewTicker(d time.Duration) *time.Ticker
}
Clock abstracts time for rate-limiter and reaper determinism (D-26).
type Config ¶ added in v1.6.0
type Config struct {
Enabled bool
Channels []string
DebounceWindow time.Duration // defaults to 15*time.Minute when zero
}
Config controls HealthBridge behavior. Zero-value-safe.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine orchestrates the watcher event pipeline: adapter goroutines produce Events, a single-writer goroutine serializes DB writes via a buffered channel, dedup is handled by INSERT OR IGNORE, and the router determines event routing.
Lifecycle: NewEngine -> RegisterAdapter (1..N) -> Start -> Stop.
func NewEngine ¶
func NewEngine(cfg EngineConfig) *Engine
NewEngine creates a new Engine with the provided configuration. Call RegisterAdapter to add adapters, then Start to begin processing.
func (*Engine) EventCh ¶
EventCh returns a read-only channel of routed events for TUI consumption (D-20).
func (*Engine) HealthCh ¶
func (e *Engine) HealthCh() <-chan HealthState
HealthCh returns a read-only channel of health state updates for TUI consumption (D-20).
func (*Engine) PumpTriageQueue ¶
func (e *Engine) PumpTriageQueue()
PumpTriageQueue attempts to drain the triage queue by re-trying queued requests through the rate limiter. Called by the eviction ticker and by tests.
func (*Engine) RegisterAdapter ¶
func (e *Engine) RegisterAdapter(watcherID string, adapter WatcherAdapter, config AdapterConfig, maxSilenceMinutes int)
RegisterAdapter adds an adapter to the engine. Must be called before Start. watcherID is the statedb watcher ID used for SaveWatcherEvent persistence. maxSilenceMinutes is the threshold for silence detection in the health tracker.
func (*Engine) Start ¶
Start begins the event pipeline. For each registered adapter, it calls Setup, then launches an adapter goroutine. It also starts the single-writer goroutine and optionally the health check loop.
func (*Engine) Stop ¶
func (e *Engine) Stop()
Stop cancels all adapter contexts, calls Teardown on each adapter, waits for all goroutines to exit, and closes the exported channels (EventCh, HealthCh) so consumers receive (_, false) instead of blocking forever (V1.9 T5, critical-hunt #9). Safe to call multiple times — the close happens at most once via stopOnce.
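The close-at-most-once behavior described for Stop can be sketched with sync.Once. The `pipeline` type and its fields are hypothetical stand-ins for the engine; only the pattern is the point.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline is a stand-in for the engine's exported channel plus its stop guard.
type pipeline struct {
	events   chan string
	stopOnce sync.Once
}

func newPipeline() *pipeline {
	return &pipeline{events: make(chan string, 4)}
}

// Stop closes the channel exactly once. A second call is a no-op instead of
// panicking with "close of closed channel".
func (p *pipeline) Stop() {
	p.stopOnce.Do(func() {
		close(p.events)
	})
}

func main() {
	p := newPipeline()
	p.events <- "buffered event"
	p.Stop()
	p.Stop() // safe: the close already happened

	// Consumers drain buffered values, then observe ok == false.
	for ev := range p.events {
		fmt.Println("got:", ev)
	}
	_, ok := <-p.events
	fmt.Println("channel open:", ok)
}
```

A consumer that ranges over the channel exits its loop on close, which is how a TUI goroutine would avoid blocking forever after shutdown.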
type EngineConfig ¶
type EngineConfig struct {
// DB is the state database for event persistence and dedup.
DB *statedb.StateDB
// Router routes events to conductors based on sender.
Router *Router
// MaxEventsPerWatcher limits the number of stored events per watcher (pruning threshold).
MaxEventsPerWatcher int
// HealthCheckInterval controls how often adapter health checks run.
// Set to 0 to disable the health check loop (useful in tests).
HealthCheckInterval time.Duration
// Logger is the structured logger. Defaults to logging.ForComponent(logging.CompWatcher).
Logger *slog.Logger
// TriageSpawner launches triage sessions for unrouted events (INTEL-01, D-25).
// Defaults to AgentDeckLaunchSpawner when nil.
TriageSpawner TriageSpawner
// Clock abstracts time for the rate limiter and reaper (D-26).
// Defaults to realClock{} when nil.
Clock Clock
// TriageDir is the directory where triage session results are written.
// Defaults to $HOME/.agent-deck/triage/ when empty.
TriageDir string
// ClientsPath is the path to clients.json for triage hot-reload.
// Defaults to $HOME/.agent-deck/watcher/clients.json when empty.
ClientsPath string
// Profile is the agent-deck profile flag passed to spawned triage sessions.
// Defaults to AGENTDECK_PROFILE env var, then "default".
Profile string
}
EngineConfig holds the configuration for the watcher Engine.
type Event ¶
type Event struct {
// Source is the watcher adapter type (e.g., "webhook", "ntfy", "github", "slack", "gmail")
Source string `json:"source"`
// Sender is the normalized email or identifier of the event originator
Sender string `json:"sender"`
// Subject is a short human-readable summary of the event
Subject string `json:"subject"`
// Body is the full payload text of the event
Body string `json:"body"`
// Timestamp is the time the event occurred (from the source, not ingestion time)
Timestamp time.Time `json:"timestamp"`
// RawPayload holds the adapter-specific raw data for debugging and audit
RawPayload json.RawMessage `json:"raw_payload,omitempty"`
// CustomDedupKey overrides the computed SHA-256 DedupKey when non-empty.
// Used by adapters that need deterministic keys (e.g., Slack: "slack-{CHANNEL}-{TS}").
CustomDedupKey string `json:"custom_dedup_key,omitempty"`
// ParentDedupKey holds the dedup key of the parent event for thread replies.
// When non-empty, the engine looks up the parent's session_id for thread routing.
ParentDedupKey string `json:"parent_dedup_key,omitempty"`
// ThreadSessionID is populated by the engine's writerLoop when a thread reply
// is routed to an existing session. Empty means spawn a new session.
ThreadSessionID string `json:"thread_session_id,omitempty"`
// RoutedTo is populated by the engine's writerLoop with the conductor name
// from Router.Match(Sender), or "triage" / "" when no rule matches.
// Consumed by the TUI to deliver events into the conductor's tmux pane.
RoutedTo string `json:"routed_to,omitempty"`
}
Event is a normalized event from any watcher adapter. All fields use json tags for persistence and wire format compatibility.
func (Event) DedupKey ¶
DedupKey returns a deterministic hex-encoded SHA-256 hash of the event's source, sender, subject, and timestamp. The pipe delimiter prevents field-boundary collisions (e.g., sender="a|b" vs sender="a", subject="|b"). Identical events from the same source at the same time produce the same key.
type GitHubAdapter ¶
type GitHubAdapter struct {
// contains filtered or unexported fields
}
GitHubAdapter implements WatcherAdapter by running an HTTP server that accepts POST requests on /github and verifies X-Hub-Signature-256 HMAC-SHA256 signatures before normalizing GitHub webhook events to Event structs. The server binds to 127.0.0.1 by default (configurable via Settings["bind"]) to avoid accidental public exposure.
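The signature check described above follows GitHub's documented scheme: the X-Hub-Signature-256 header carries "sha256=" followed by the hex HMAC-SHA256 of the raw request body. A minimal sketch, with `verifySignature` and `sign` as hypothetical helper names:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// sign computes the header value for a body. Included so the example is
// self-contained; a real sender is GitHub itself.
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// verifySignature reports whether header is a valid signature of body.
// hmac.Equal compares in constant time to avoid leaking timing information.
func verifySignature(secret, body []byte, header string) bool {
	const prefix = "sha256="
	if !strings.HasPrefix(header, prefix) {
		return false
	}
	got, err := hex.DecodeString(strings.TrimPrefix(header, prefix))
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	secret := []byte("example-secret")
	body := []byte(`{"action":"opened"}`)
	header := sign(secret, body)
	fmt.Println(verifySignature(secret, body, header))
	fmt.Println(verifySignature(secret, []byte(`{"action":"closed"}`), header))
}
```

The check must run against the raw bytes as received; re-encoding the parsed JSON would change the digest.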
func (*GitHubAdapter) Addr ¶
func (a *GitHubAdapter) Addr() string
Addr returns the current listen address (thread-safe). Useful for tests when port 0 is used to get a random available port.
func (*GitHubAdapter) HealthCheck ¶
func (a *GitHubAdapter) HealthCheck() error
HealthCheck verifies the HTTP listener is accepting TCP connections.
func (*GitHubAdapter) Listen ¶
func (a *GitHubAdapter) Listen(ctx context.Context, events chan<- Event) error
Listen starts the HTTP server and blocks until the context is cancelled. On context cancellation, the server is gracefully shut down with a 5-second timeout.
func (*GitHubAdapter) Setup ¶
func (a *GitHubAdapter) Setup(_ context.Context, config AdapterConfig) error
Setup initializes the adapter with the HTTP server configuration but does NOT start the server. The server is started in Listen.
Settings:
- "secret": required webhook secret for HMAC-SHA256 verification
- "port": TCP port to listen on (default "18461")
- "bind": Bind address (default "127.0.0.1" per T-14-04)
func (*GitHubAdapter) Teardown ¶
func (a *GitHubAdapter) Teardown() error
Teardown is a no-op because the server is shut down in Listen via context cancellation.
type GmailAdapter ¶
type GmailAdapter struct {
// contains filtered or unexported fields
}
GmailAdapter implements WatcherAdapter against the Gmail Pub/Sub watch API. All fields are private; construction uses NewGmailAdapter().
func NewGmailAdapter ¶
func NewGmailAdapter() *GmailAdapter
NewGmailAdapter constructs a GmailAdapter with default time functions. Tests inject fakes by assigning a.nowFunc / a.afterFunc directly.
func (*GmailAdapter) HealthCheck ¶
func (a *GmailAdapter) HealthCheck() error
HealthCheck reports the adapter's ability to reach Gmail + Pub/Sub. Plan 17-02 covers the minimal path: returns a cached lastHealthErr (set by renewalLoop in Plan 17-03), exercises the token source, checks the Pub/Sub subscription, and flags a lapsed watch expiry. Plan 17-03 adds explicit unit coverage per branch.
func (*GmailAdapter) Listen ¶
func (a *GmailAdapter) Listen(ctx context.Context, events chan<- Event) error
Listen blocks on Subscription.Receive until ctx is cancelled. Each Pub/Sub envelope is decoded, used to drive a users.history.list + users.messages.get fan-out, and Acked on success (or on a 404 stale-history fallback). Transient Gmail errors result in Nack so Pub/Sub redelivers.
A sync.WaitGroup ensures the renewal goroutine is joined before Listen returns — even though the renewalLoop body is a STUB in Plan 17-02 (it only waits on ctx.Done), the WaitGroup is in place so Plan 17-03 can drop in the real body without changing the lifecycle contract. This prevents Pitfall 3 (renewal goroutine outliving Listen and surfacing as a goleak false positive).
func (*GmailAdapter) Setup ¶
func (a *GmailAdapter) Setup(ctx context.Context, config AdapterConfig) error
Setup loads OAuth credentials + token, builds gmail.Service and pubsub.Client (both authenticating with the same user token via persistingTokenSource), verifies the subscription exists, loads existing WatcherMeta, and conditionally calls users.Watch() if no watch is present or the existing watch expires in under 2 hours (D-11 threshold).
Settings keys:
- topic (required): projects/{projectID}/topics/{topic}
- subscription (required): projects/{projectID}/subscriptions/{sub}
- credentials_path: defaults to <watcher_dir>/credentials.json
- token_path: defaults to <watcher_dir>/token.json
- labels: optional comma-separated Gmail label filter
- account: optional informational Gmail address
func (*GmailAdapter) Teardown ¶
func (a *GmailAdapter) Teardown() error
Teardown closes the pubsub client so gRPC connections are released (required for goleak). It does NOT call users.Stop() per D-34 — stopping the watch would burn 50 quota units on every restart and Google treats users.Watch as idempotent on the next Setup.
type HealthBridge ¶ added in v1.6.0
type HealthBridge struct {
// contains filtered or unexported fields
}
HealthBridge subscribes to an engine health signal and fans alerts to a Notifier with per-(watcher x trigger) debounce. Resilient to notifier errors and panics.
func NewHealthBridge ¶ added in v1.6.0
func NewHealthBridge(cfg Config, notifier Notifier, source <-chan HealthState) *HealthBridge
NewHealthBridge constructs a bridge. Does not start it. Call Run with a context to begin consuming from source.
func (*HealthBridge) NotifyTeardown ¶ added in v1.6.0
func (b *HealthBridge) NotifyTeardown(watcherName string, unexpected bool)
NotifyTeardown is called when an adapter teardown is unexpected. Always passes through the same debounce path as regular health alerts.
type HealthSample ¶ added in v1.6.0
HealthSample is a single health snapshot stored in WatcherState.HealthWindow.
type HealthState ¶
type HealthState struct {
// WatcherName is the name of the watcher this state belongs to
WatcherName string
// Status is the overall health status
Status HealthStatus
// EventsPerHour is the rolling event rate over the last 60 minutes
EventsPerHour float64
// LastEventTime is the time of the most recently received event
LastEventTime time.Time
// ConsecutiveErrors is the number of consecutive HealthCheck errors since the last event
ConsecutiveErrors int
// Message is a human-readable explanation for Warning or Error states
Message string
}
HealthState is a snapshot of watcher health for display in the TUI and health alerts. It is emitted by HealthTracker.Check() and consumed by the engine in Phase 16.
type HealthStatus ¶
type HealthStatus string
HealthStatus is the overall health state of a watcher.
const (
// HealthStatusHealthy indicates the watcher is functioning normally.
HealthStatusHealthy HealthStatus = "healthy"
// HealthStatusWarning indicates a potential issue (e.g., silence or repeated errors).
HealthStatusWarning HealthStatus = "warning"
// HealthStatusError indicates the watcher is failing and requires attention.
HealthStatusError HealthStatus = "error"
)
type HealthTracker ¶
type HealthTracker struct {
// contains filtered or unexported fields
}
HealthTracker is a passive (no goroutine) health monitor for a single watcher. It is called from the engine's health check loop via Check(), and updated via RecordEvent() and RecordError() as events flow through the engine.
Thread-safe via mu; all public methods acquire appropriate locks.
func NewHealthTracker ¶
func NewHealthTracker(watcherName string, maxSilenceMinutes int) *HealthTracker
NewHealthTracker creates a HealthTracker for the named watcher. maxSilenceMinutes should come from WatcherSettings.GetMaxSilenceMinutes().
func (*HealthTracker) Check ¶
func (h *HealthTracker) Check() HealthState
Check evaluates the current health state and returns a HealthState snapshot. Per D-17, the status is computed as follows:
- Error: adapter unhealthy OR consecutiveErrors >= 10
- Warning: consecutiveErrors >= 3 OR silence beyond maxSilenceMinutes
- Healthy: otherwise
The health check is passive (no I/O); the caller (engine) drives the check loop.
func (*HealthTracker) EventsInLastHour ¶
func (h *HealthTracker) EventsInLastHour() int
EventsInLastHour returns the number of events received within the past hour. This uses a lazy read approach; old entries are pruned on RecordEvent.
func (*HealthTracker) RecordError ¶
func (h *HealthTracker) RecordError()
RecordError increments the consecutive error count. Call this when an adapter HealthCheck() returns an error.
func (*HealthTracker) RecordEvent ¶
func (h *HealthTracker) RecordEvent()
RecordEvent records a successfully received event. Resets consecutiveErrors, updates lastEventTime, and prunes old timestamps from the sliding window (T-13-03: prevents unbounded memory growth).
func (*HealthTracker) SetAdapterHealth ¶
func (h *HealthTracker) SetAdapterHealth(healthy bool)
SetAdapterHealth records the adapter's current health status. Pass false when HealthCheck() fails; pass true when it recovers.
func (*HealthTracker) SetLastEventTimeForTest ¶
func (h *HealthTracker) SetLastEventTimeForTest(t time.Time)
SetLastEventTimeForTest allows tests to set lastEventTime directly for deterministic silence detection testing. Only intended for use in tests.
type Notifier ¶ added in v1.6.0
Notifier is implemented by concrete alert sinks (conductor Telegram/Slack/Discord bridge). The HealthBridge calls Notify once per triggered alert after debounce.
type NtfyAdapter ¶
type NtfyAdapter struct {
// contains filtered or unexported fields
}
NtfyAdapter implements WatcherAdapter by subscribing to an ntfy topic via NDJSON streaming (one JSON object per line). It auto-reconnects with exponential backoff on disconnection and resumes from the last received message ID.
func (*NtfyAdapter) HealthCheck ¶
func (a *NtfyAdapter) HealthCheck() error
HealthCheck verifies the ntfy server is reachable by sending an HTTP HEAD request with a 5-second timeout (per D-10).
func (*NtfyAdapter) Listen ¶
func (a *NtfyAdapter) Listen(ctx context.Context, events chan<- Event) error
Listen connects to the ntfy NDJSON stream and emits normalized Events on the provided channel. On disconnection, it reconnects with exponential backoff (initial 2s, 2x factor, 30s cap) per D-08. Listen only returns when the context is cancelled.
func (*NtfyAdapter) Setup ¶
func (a *NtfyAdapter) Setup(_ context.Context, config AdapterConfig) error
Setup initializes the adapter with the ntfy server URL and topic. The topic is required; the server defaults to "https://ntfy.sh".
Settings:
- "topic": required ntfy topic name
- "server": ntfy server URL (default "https://ntfy.sh")
func (*NtfyAdapter) Teardown ¶
func (a *NtfyAdapter) Teardown() error
Teardown is a no-op. The streaming HTTP connection is closed by context cancellation in Listen.
type RouteResult ¶
type RouteResult struct {
// Conductor is the conductor name to route the event to
Conductor string
// Group is the TUI group path
Group string
// Name is the client's display name
Name string
// MatchType is either "exact" (email match) or "wildcard" (domain match)
MatchType string
}
RouteResult is the result of a Router.Match call.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router provides config-driven routing of events to conductors. It loads rules from clients.json and matches incoming senders using exact email lookup first, then wildcard domain fallback.
Router is safe for concurrent use. Match takes a read lock so multiple callers can proceed in parallel. Reload takes a write lock to atomically replace the internal routing tables (D-12/D-15/D-16).
func LoadFromWatcherDir ¶
LoadFromWatcherDir loads clients.json from the standard watcher directory (~/.agent-deck/watcher/clients.json) and returns a ready-to-use Router.
func NewRouter ¶
func NewRouter(clients map[string]ClientEntry) *Router
NewRouter builds a Router from a flat map of client entries. Keys starting with "*@" are treated as wildcard domain rules; all other keys are treated as exact email rules.
func (*Router) Match ¶
func (r *Router) Match(sender string) *RouteResult
Match returns the routing result for a given sender address. Exact email match takes priority over wildcard domain match (D-08). Returns nil if the sender does not match any rule (unrouted).
Safe to call concurrently with Reload (takes RLock for the duration, D-15).
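The exact-then-wildcard lookup under a read lock can be sketched as below. The `router` type here is a simplified stand-in that maps senders straight to conductor names; the real Router returns a RouteResult, and whether it normalizes case is not stated, so this sketch compares strings as given.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type router struct {
	mu       sync.RWMutex
	exact    map[string]string // "user@example.com" -> conductor
	wildcard map[string]string // "example.com" -> conductor
}

// newRouter splits keys into exact and wildcard tables; "*@" marks a wildcard.
func newRouter(clients map[string]string) *router {
	r := &router{exact: map[string]string{}, wildcard: map[string]string{}}
	for key, conductor := range clients {
		if strings.HasPrefix(key, "*@") {
			r.wildcard[strings.TrimPrefix(key, "*@")] = conductor
		} else {
			r.exact[key] = conductor
		}
	}
	return r
}

// match tries the exact table first, then the sender's domain.
// ok is false when no rule applies (the event is unrouted).
func (r *router) match(sender string) (conductor, matchType string, ok bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if c, found := r.exact[sender]; found {
		return c, "exact", true
	}
	if at := strings.LastIndex(sender, "@"); at >= 0 {
		if c, found := r.wildcard[sender[at+1:]]; found {
			return c, "wildcard", true
		}
	}
	return "", "", false
}

func main() {
	r := newRouter(map[string]string{
		"vip@example.com": "vip-conductor",
		"*@example.com":   "general-conductor",
	})
	fmt.Println(r.match("vip@example.com"))
	fmt.Println(r.match("someone@example.com"))
	fmt.Println(r.match("stranger@elsewhere.org"))
}
```

Holding the read lock for the whole lookup means a concurrent reload cannot swap the tables between the exact and wildcard checks.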
func (*Router) Reload ¶
func (r *Router) Reload(newClients map[string]ClientEntry)
Reload atomically replaces the router's routing tables with newClients. Safe to call concurrently with Match. Builds fresh exact and wildcard maps before acquiring the write lock, then swaps both under a single lock (D-12/D-16). A nil input is treated as an empty map (never assigns nil to internal fields, D-15/Pitfall 5).
type SlackAdapter ¶
type SlackAdapter struct {
// contains filtered or unexported fields
}
SlackAdapter subscribes to an ntfy.sh topic that receives bridged Slack events from a Cloudflare Worker. It parses both v1 (plain text) and v2 (structured JSON) payloads, normalizes to Event with deterministic Slack-specific dedup keys, and detects thread replies for session routing.
The adapter duplicates the ntfy NDJSON streaming core (independent, not embedding NtfyAdapter) per design decision D-02.
func (*SlackAdapter) HealthCheck ¶
func (a *SlackAdapter) HealthCheck() error
HealthCheck verifies the ntfy server is reachable by sending an HTTP HEAD request with a 5-second timeout.
func (*SlackAdapter) Listen ¶
func (a *SlackAdapter) Listen(ctx context.Context, events chan<- Event) error
Listen connects to the ntfy NDJSON stream and emits normalized Events on the provided channel. On disconnection, it reconnects with exponential backoff (initial 2s, 2x factor, 30s cap). Listen only returns when the context is cancelled.
func (*SlackAdapter) Setup ¶
func (a *SlackAdapter) Setup(_ context.Context, config AdapterConfig) error
Setup initializes the adapter with the ntfy server URL and topic. The topic is required; the server defaults to "https://ntfy.sh".
Settings:
- "topic": required ntfy topic name for Slack bridge
- "server": ntfy server URL (default "https://ntfy.sh")
func (*SlackAdapter) Teardown ¶
func (a *SlackAdapter) Teardown() error
Teardown is a no-op. The streaming HTTP connection is closed by context cancellation in Listen.
type TriageRequest ¶
type TriageRequest struct {
Event Event
WatcherID string
Profile string
Tracker *HealthTracker
ResultPath string // absolute path: <TriageDir>/<DedupKey()>/result.json
TriageDir string // absolute: <EngineConfig.TriageDir>/<DedupKey()>/
SpawnedAt time.Time
}
TriageRequest is the work item the writerLoop hands to triageLoop.
type TriageSpawner ¶
type TriageSpawner interface {
Spawn(ctx context.Context, req TriageRequest) (sessionID string, err error)
}
TriageSpawner launches a triage session for an unrouted event (D-25).
type WatcherAdapter ¶
type WatcherAdapter interface {
// Setup initializes the adapter with the provided config. Called once before Listen.
Setup(ctx context.Context, config AdapterConfig) error
// Listen blocks, producing normalized events on the provided channel until ctx is cancelled.
Listen(ctx context.Context, events chan<- Event) error
// Teardown cleans up any resources allocated during Setup.
Teardown() error
// HealthCheck returns a non-nil error if the adapter cannot currently reach its source.
HealthCheck() error
}
WatcherAdapter is the interface that all event source adapters must implement. Adapters normalize raw events from external sources into Event structs.
type WatcherState ¶ added in v1.6.0
type WatcherState struct {
LastEventTS time.Time `json:"last_event_ts"`
ErrorCount int `json:"error_count"`
AdapterHealthy bool `json:"adapter_healthy"`
HealthWindow []HealthSample `json:"health_window"` // cap at 64 samples
DedupCursor string `json:"dedup_cursor"`
}
WatcherState is the persisted per-watcher state snapshot written by the writerLoop after every successful event insert. Hot-reload safe: no in-process cache.
func LoadState ¶ added in v1.6.0
func LoadState(name string) (*WatcherState, error)
LoadState reads <name>/state.json. Returns (nil, nil) if the file does not exist (fresh watcher). Always hits disk — no in-process cache, so external edits are visible on next call (hot-reload safe).
type WebhookAdapter ¶
type WebhookAdapter struct {
// contains filtered or unexported fields
}
WebhookAdapter implements WatcherAdapter by running an HTTP server that accepts POST requests on /webhook and normalizes them to Event structs. The server binds to 127.0.0.1 by default (configurable via Settings["bind"]) to avoid accidental public exposure.
func (*WebhookAdapter) HealthCheck ¶
func (a *WebhookAdapter) HealthCheck() error
HealthCheck verifies the HTTP listener is accepting TCP connections (per D-06).
func (*WebhookAdapter) Listen ¶
func (a *WebhookAdapter) Listen(ctx context.Context, events chan<- Event) error
Listen starts the HTTP server and blocks until the context is cancelled. On context cancellation, the server is gracefully shut down with a 5-second timeout.
func (*WebhookAdapter) Setup ¶
func (a *WebhookAdapter) Setup(_ context.Context, config AdapterConfig) error
Setup initializes the adapter with the HTTP server configuration but does NOT start the server. The server is started in Listen.
Settings:
- "port": TCP port to listen on (default "18460")
- "bind": Bind address (default "127.0.0.1" per T-14-04)
func (*WebhookAdapter) Teardown ¶
func (a *WebhookAdapter) Teardown() error
Teardown is a no-op because the server is shut down in Listen via context cancellation (per D-04).