watcher

package
v1.9.30
Published: May 22, 2026 License: MIT Imports: 36 Imported by: 0

Documentation

Overview

Package watcher's Gmail adapter delivers normalized Events from a Gmail account via the Gmail Pub/Sub watch + streaming pull flow:

  1. users.Watch() registers a push target (Google Cloud Pub/Sub topic) that Gmail publishes change notifications to. The watch expires after ~7 days.
  2. Subscription.Receive subscribes to that topic via the cloud.google.com/go/pubsub client and blocks until ctx is cancelled. Google's client handles lease extension, flow control, and reconnects natively.
  3. Each envelope is shaped {emailAddress, historyId}. The handler calls users.history.list(startHistoryId=persistedWatchHistoryID) to fetch the new message IDs, then users.messages.get(format=metadata) to fetch From/Subject/snippet/internalDate, normalizes those to Event, and sends on the channel.
  4. On success the Pub/Sub message is Acked. On 404 from history.list (stale historyId past Gmail's retention window) the handler falls back to the envelope's historyId, logs a warning, and Acks — dropping some events is acceptable per CONTEXT.md D-18. Malformed envelopes are Acked (not Nacked) so they cannot redeliver forever.
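
The acknowledgement rules in step 4 can be summarized as a small decision function. The following is a minimal sketch, not the adapter's code: envelope, decide, errStaleHistory, and errTransient are hypothetical names, the sentinel errors stand in for the real Gmail API responses, and historyId is assumed to arrive as a JSON number.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope mirrors the {emailAddress, historyId} notification shape.
type envelope struct {
	EmailAddress string      `json:"emailAddress"`
	HistoryID    json.Number `json:"historyId"`
}

// Hypothetical sentinels: a 404 from history.list and any retryable failure.
var (
	errStaleHistory = errors.New("history id past retention window")
	errTransient    = errors.New("transient Gmail error")
)

// decide returns "ack" or "nack" for one Pub/Sub message.
// historyErr is the outcome of the history.list call (not shown here).
func decide(payload []byte, historyErr error) string {
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil || env.EmailAddress == "" {
		return "ack" // malformed: redelivery could never succeed
	}
	switch {
	case historyErr == nil:
		return "ack" // events were fetched and sent
	case errors.Is(historyErr, errStaleHistory):
		return "ack" // fall back to the envelope's historyId and accept the gap
	default:
		return "nack" // let Pub/Sub redeliver
	}
}

func main() {
	good := []byte(`{"emailAddress":"user@example.com","historyId":12345}`)
	fmt.Println(decide(good, nil))
	fmt.Println(decide(good, errStaleHistory))
	fmt.Println(decide(good, errTransient))
	fmt.Println(decide([]byte("not json"), nil))
}
```

Only the transient case is Nacked, so a poison message cannot loop through the subscription indefinitely.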

Plan 17-02 implements Setup, Listen (with a STUB renewalLoop that only waits on ctx.Done), Teardown, HealthCheck (minimal), processHistory, registerWatch, normalizeGmailMessage, the label filter, and the persistingTokenSource wrapper. The full renewalLoop body and expanded HealthCheck coverage land in Plan 17-03.

Index

Constants

const (
	TriggerSilenceDetected           = "silence_detected"
	TriggerErrorThresholdExceeded    = "error_threshold_exceeded"
	TriggerAdapterTeardownUnexpected = "adapter_teardown_unexpected"
)

Valid Trigger values.

const (
	TriageMaxPerHour = 5
	TriageWindow     = 60 * time.Minute
	TriageQueueCap   = 16
	TriageReqChCap   = 16
	TriageReaperPoll = 5 * time.Second
	TriageTimeout    = 10 * time.Minute
)

Triage rate-limiting and queue constants (INTEL-03, D-10/10a/10b).
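
To illustrate how TriageMaxPerHour and TriageWindow could interact, here is a minimal sliding-window limiter. It is a sketch under assumptions: windowLimiter, Allow, and the at helper are hypothetical, and the engine's actual limiter (including its queueing via TriageQueueCap) may differ.

```go
package main

import (
	"fmt"
	"time"
)

const (
	TriageMaxPerHour = 5
	TriageWindow     = 60 * time.Minute
)

// windowLimiter is a hypothetical sliding-window rate limiter.
type windowLimiter struct {
	spawns []time.Time // admitted spawn times, oldest first
}

// Allow reports whether a spawn at time now fits in the window and,
// if so, records it. Rejected calls are not recorded.
func (l *windowLimiter) Allow(now time.Time) bool {
	cutoff := now.Add(-TriageWindow)
	i := 0
	for i < len(l.spawns) && !l.spawns[i].After(cutoff) {
		i++ // drop entries that fell out of the window
	}
	l.spawns = l.spawns[i:]
	if len(l.spawns) >= TriageMaxPerHour {
		return false
	}
	l.spawns = append(l.spawns, now)
	return true
}

// at returns a fixed base time plus the given number of minutes.
func at(minutes int) time.Time {
	base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(minutes) * time.Minute)
}

func main() {
	var l windowLimiter
	for i := 0; i < 6; i++ {
		fmt.Println(i, l.Allow(at(i))) // the sixth call is rejected
	}
	fmt.Println(61, l.Allow(at(61))) // earliest spawns have expired
}
```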

Variables

This section is empty.

Functions

func AppendClientEntry

func AppendClientEntry(path, sender string, entry ClientEntry) error

AppendClientEntry atomically appends a sender→entry mapping to clients.json at the given path. It is safe to call concurrently from multiple goroutines.

Behavior:

  • Wildcard senders ("*@domain") are rejected immediately — these must only be added by explicit human action (D-14, T-18-02).
  • If the sender is already present, returns nil without modifying the file (idempotent, D-11 step 3).
  • If clients.json does not exist, it is created with mode 0o600.
  • The parent directory is created with mode 0o700 if it does not exist.
  • The write is atomic via write-to-temp + os.Rename (POSIX, D-11, T-18-01).
  • A stale <path>.tmp from a prior crash is silently overwritten by os.WriteFile (truncates) and then renamed away (T-18-04).
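
The write-to-temp plus rename step listed above can be sketched as follows. This is an illustration only: writeJSONAtomic and roundTrip are hypothetical helpers, the value type is simplified to map[string]string, and the real function additionally performs the wildcard and idempotency checks.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeJSONAtomic writes v to path via <path>.tmp followed by a rename.
func writeJSONAtomic(path string, v any) error {
	if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
		return err
	}
	data, err := json.MarshalIndent(v, "", "  ")
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	// WriteFile truncates, so a stale tmp file from a crash is overwritten.
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	// Rename replaces the destination in one step on POSIX filesystems.
	return os.Rename(tmp, path)
}

// roundTrip writes one entry into a temp dir and reads it back.
func roundTrip() (string, error) {
	dir, err := os.MkdirTemp("", "clients")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "watcher", "clients.json")
	entries := map[string]string{"user@example.com": "conductor-a"}
	if err := writeJSONAtomic(path, entries); err != nil {
		return "", err
	}
	raw, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	var got map[string]string
	if err := json.Unmarshal(raw, &got); err != nil {
		return "", err
	}
	return got["user@example.com"], nil
}

func main() {
	fmt.Println(roundTrip())
}
```

Readers see either the old file or the new one, never a partially written file, because the destination name only changes at the rename.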

func AppendEventLog added in v1.6.0

func AppendEventLog(name, entry string) error

AppendEventLog appends one Markdown line to <name>/task-log.md. Expected entry format: "## <RFC3339-ts> - <event_type>: <summary>" (no trailing newline — added here). Atomicity: O_APPEND on a single Write is POSIX-atomic for sizes <= PIPE_BUF (4096 on Linux, 512 on older BSD). The engine writerLoop is single-goroutine per engine instance so per-watcher serialization is guaranteed today; entries are truncated to <512 bytes by the caller (engine.go) so atomicity holds even if writerLoop becomes multi-goroutine.
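
A minimal sketch of the append pattern described above: open with O_APPEND and issue exactly one Write per entry. appendLine and appendTwo are hypothetical helpers, and the 0o600 file mode is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// appendLine appends entry plus a trailing newline using a single Write call.
func appendLine(path, entry string) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return err
	}
	_, werr := f.Write([]byte(entry + "\n")) // one Write per log entry
	cerr := f.Close()
	if werr != nil {
		return werr
	}
	return cerr
}

// appendTwo appends two entries to a temp file and returns its content.
func appendTwo() (string, error) {
	dir, err := os.MkdirTemp("", "tasklog")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "task-log.md")
	for _, e := range []string{"## t1 - event: first", "## t2 - event: second"} {
		if err := appendLine(path, e); err != nil {
			return "", err
		}
	}
	raw, err := os.ReadFile(path)
	return string(raw), err
}

func main() {
	fmt.Print(appendTwo())
}
```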

func BuildPrompt

func BuildPrompt(event Event, clientsList map[string]ClientEntry, resultPath string) (string, error)

BuildPrompt renders the triage prompt template with the given event, clients list, and absolute result path. The event body is truncated to maxBodyRunes if longer.

func LayoutDir added in v1.6.0

func LayoutDir() (string, error)

LayoutDir returns the root watcher dir (~/.agent-deck/watcher). Delegates to session.WatcherDir for single-source-of-truth.

func LoadClientsJSON

func LoadClientsJSON(path string) (map[string]ClientEntry, error)

LoadClientsJSON reads a clients.json file from disk and returns the parsed entries. Returns an error if the file does not exist or contains invalid JSON.

Threat T-13-01: entries with empty conductor fields are logged as warnings at engine startup; the router itself does not reject them to preserve forward compatibility.

func MigrateLegacyWatchersDir added in v1.6.0

func MigrateLegacyWatchersDir() error

MigrateLegacyWatchersDir performs the one-shot ~/.agent-deck/watchers -> ~/.agent-deck/watcher rename and creates a relative compatibility symlink watchers -> watcher. Single-shot; subsequent calls no-op. SECURITY T-21-SL: uses os.Lstat (not os.Stat) and refuses if watcher/ is a symlink targeting outside the deck root.

func SaveState added in v1.6.0

func SaveState(name string, s *WatcherState) error

SaveState writes <name>/state.json atomically using the write-temp-then-rename pattern, copied from SaveWatcherMeta in internal/session.

func ScaffoldWatcherLayout added in v1.6.0

func ScaffoldWatcherLayout() error

ScaffoldWatcherLayout creates the singular watcher/ dir and writes default shared files if absent. Uses O_CREATE|O_EXCL for TOCTOU-safe idempotent writes.
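
The O_CREATE|O_EXCL idiom mentioned above lets the kernel decide whether the file already exists, so there is no window between a check and a write. A sketch, with writeIfAbsent and demoScaffold as hypothetical helpers and 0o600 as an assumed mode:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// writeIfAbsent creates path with data only if it does not already exist.
// It reports whether the file was created.
func writeIfAbsent(path string, data []byte) (bool, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
	if errors.Is(err, os.ErrExist) {
		return false, nil // already scaffolded: leave user edits alone
	}
	if err != nil {
		return false, err
	}
	_, werr := f.Write(data)
	cerr := f.Close()
	if werr != nil {
		return false, werr
	}
	return true, cerr
}

// demoScaffold calls writeIfAbsent twice on the same path.
func demoScaffold() (bool, bool, string, error) {
	dir, err := os.MkdirTemp("", "scaffold")
	if err != nil {
		return false, false, "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "clients.json")
	first, err := writeIfAbsent(path, []byte("{}"))
	if err != nil {
		return false, false, "", err
	}
	second, err := writeIfAbsent(path, []byte("overwritten"))
	if err != nil {
		return false, false, "", err
	}
	raw, err := os.ReadFile(path)
	return first, second, string(raw), err
}

func main() {
	fmt.Println(demoScaffold())
}
```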

func WatcherDir added in v1.6.0

func WatcherDir(name string) (string, error)

WatcherDir returns ~/.agent-deck/watcher/<name> after validating name. T-21-PI mitigation.

Types

type AdapterConfig

type AdapterConfig struct {
	// Type is the adapter type: "webhook", "ntfy", "github", "slack", "gmail"
	Type string

	// Name is the watcher name (used for logging and health tracking)
	Name string

	// Settings holds adapter-specific key-value pairs from the watcher.toml [source] section.
	Settings map[string]string
}

AdapterConfig holds the configuration passed to a WatcherAdapter during Setup.

type AgentDeckLaunchSpawner

type AgentDeckLaunchSpawner struct {
	BinaryPath string
}

AgentDeckLaunchSpawner invokes `agent-deck launch` to spawn a triage Claude session. BinaryPath is resolved via session.FindAgentDeck() at engine startup (D-01/D-03).

func (AgentDeckLaunchSpawner) Spawn

Spawn creates the triage directory, builds the prompt, and execs agent-deck launch. It returns an error without spawning if the binary does not exist (T-18-13).

type Alert added in v1.6.0

type Alert struct {
	WatcherName string
	Trigger     string
	Message     string
	Timestamp   time.Time
}

Alert is a single outbound alert emitted by the HealthBridge.

type ClientEntry

type ClientEntry struct {
	// Conductor is the conductor name that handles events from this client
	Conductor string `json:"conductor"`

	// Group is the TUI group path to place sessions under (e.g., "client-a/inbox")
	Group string `json:"group"`

	// Name is a human-readable label for the client
	Name string `json:"name"`
}

ClientEntry holds the routing configuration for a single client. Entries in clients.json are keyed by either exact email ("user@example.com") or wildcard domain ("*@example.com").

type Clock

type Clock interface {
	Now() time.Time
	After(d time.Duration) <-chan time.Time
	NewTicker(d time.Duration) *time.Ticker
}

Clock abstracts time for rate-limiter and reaper determinism (D-26).
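
As an illustration of why the interface helps with determinism, here is a sketch of two implementations. The interface is copied from above; the bodies of realClock and fakeClock are assumptions (the package's own realClock is unexported and not shown in this documentation).

```go
package main

import (
	"fmt"
	"time"
)

type Clock interface {
	Now() time.Time
	After(d time.Duration) <-chan time.Time
	NewTicker(d time.Duration) *time.Ticker
}

// realClock delegates to the time package.
type realClock struct{}

func (realClock) Now() time.Time                         { return time.Now() }
func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }
func (realClock) NewTicker(d time.Duration) *time.Ticker { return time.NewTicker(d) }

// fakeClock is a hypothetical test double: Now is fixed until Advance is
// called, and After fires immediately instead of sleeping.
type fakeClock struct{ now time.Time }

func (f *fakeClock) Now() time.Time { return f.now }
func (f *fakeClock) After(time.Duration) <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- f.now
	return ch
}
func (f *fakeClock) NewTicker(d time.Duration) *time.Ticker { return time.NewTicker(d) }
func (f *fakeClock) Advance(d time.Duration)                { f.now = f.now.Add(d) }

// Compile-time checks that both types satisfy Clock.
var (
	_ Clock = realClock{}
	_ Clock = (*fakeClock)(nil)
)

// demoFakeClock advances a fake clock by 90 minutes without sleeping and
// checks that After(time.Hour) is ready right away.
func demoFakeClock() (int, bool) {
	fc := &fakeClock{}
	start := fc.Now()
	fc.Advance(90 * time.Minute)
	fired := false
	select {
	case <-fc.After(time.Hour):
		fired = true
	default:
	}
	return int(fc.Now().Sub(start) / time.Minute), fired
}

func main() {
	fmt.Println(demoFakeClock())
}
```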

type Config added in v1.6.0

type Config struct {
	Enabled        bool
	Channels       []string
	DebounceWindow time.Duration // defaults to 15*time.Minute when zero
}

Config controls HealthBridge behavior. Zero-value-safe.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates the watcher event pipeline: adapter goroutines produce Events, a single-writer goroutine serializes DB writes via a buffered channel, dedup is handled by INSERT OR IGNORE, and the router determines event routing.

Lifecycle: NewEngine -> RegisterAdapter (1..N) -> Start -> Stop.

func NewEngine

func NewEngine(cfg EngineConfig) *Engine

NewEngine creates a new Engine with the provided configuration. Call RegisterAdapter to add adapters, then Start to begin processing.

func (*Engine) EventCh

func (e *Engine) EventCh() <-chan Event

EventCh returns a read-only channel of routed events for TUI consumption (D-20).

func (*Engine) HealthCh

func (e *Engine) HealthCh() <-chan HealthState

HealthCh returns a read-only channel of health state updates for TUI consumption (D-20).

func (*Engine) PumpTriageQueue

func (e *Engine) PumpTriageQueue()

PumpTriageQueue attempts to drain the triage queue by re-trying queued requests through the rate limiter. Called by the eviction ticker and by tests.

func (*Engine) RegisterAdapter

func (e *Engine) RegisterAdapter(watcherID string, adapter WatcherAdapter, config AdapterConfig, maxSilenceMinutes int)

RegisterAdapter adds an adapter to the engine. Must be called before Start. watcherID is the statedb watcher ID used for SaveWatcherEvent persistence. maxSilenceMinutes is the threshold for silence detection in the health tracker.

func (*Engine) Start

func (e *Engine) Start() error

Start begins the event pipeline. For each registered adapter, it calls Setup, then launches an adapter goroutine. It also starts the single-writer goroutine and optionally the health check loop.

func (*Engine) Stop

func (e *Engine) Stop()

Stop cancels all adapter contexts, calls Teardown on each adapter, waits for all goroutines to exit, and closes the exported channels (EventCh, HealthCh) so consumers receive (_, false) instead of blocking forever (V1.9 T5, critical-hunt #9). Safe to call multiple times — the close happens at most once via stopOnce.
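
The close-at-most-once behavior can be sketched with sync.Once. The pipeline type below is a hypothetical stand-in for Engine, reduced to a single channel.

```go
package main

import (
	"fmt"
	"sync"
)

// pipeline is a hypothetical stand-in for Engine with one exported channel.
type pipeline struct {
	events   chan int
	stopOnce sync.Once
}

// Stop closes the channel exactly once, so repeated calls do not panic
// with "close of closed channel".
func (p *pipeline) Stop() {
	p.stopOnce.Do(func() { close(p.events) })
}

func main() {
	p := &pipeline{events: make(chan int)}
	p.Stop()
	p.Stop() // safe: the close already happened
	_, ok := <-p.events
	fmt.Println(ok) // false: consumers observe the close instead of blocking
}
```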

type EngineConfig

type EngineConfig struct {
	// DB is the state database for event persistence and dedup.
	DB *statedb.StateDB

	// Router routes events to conductors based on sender.
	Router *Router

	// MaxEventsPerWatcher limits the number of stored events per watcher (pruning threshold).
	MaxEventsPerWatcher int

	// HealthCheckInterval controls how often adapter health checks run.
	// Set to 0 to disable the health check loop (useful in tests).
	HealthCheckInterval time.Duration

	// Logger is the structured logger. Defaults to logging.ForComponent(logging.CompWatcher).
	Logger *slog.Logger

	// TriageSpawner launches triage sessions for unrouted events (INTEL-01, D-25).
	// Defaults to AgentDeckLaunchSpawner when nil.
	TriageSpawner TriageSpawner

	// Clock abstracts time for the rate limiter and reaper (D-26).
	// Defaults to realClock{} when nil.
	Clock Clock

	// TriageDir is the directory where triage session results are written.
	// Defaults to $HOME/.agent-deck/triage/ when empty.
	TriageDir string

	// ClientsPath is the path to clients.json for triage hot-reload.
	// Defaults to $HOME/.agent-deck/watcher/clients.json when empty.
	ClientsPath string

	// Profile is the agent-deck profile flag passed to spawned triage sessions.
	// Defaults to AGENTDECK_PROFILE env var, then "default".
	Profile string
}

EngineConfig holds the configuration for the watcher Engine.

type Event

type Event struct {
	// Source is the watcher adapter type (e.g., "webhook", "ntfy", "github", "slack", "gmail")
	Source string `json:"source"`

	// Sender is the normalized email or identifier of the event originator
	Sender string `json:"sender"`

	// Subject is a short human-readable summary of the event
	Subject string `json:"subject"`

	// Body is the full payload text of the event
	Body string `json:"body"`

	// Timestamp is the time the event occurred (from the source, not ingestion time)
	Timestamp time.Time `json:"timestamp"`

	// RawPayload holds the adapter-specific raw data for debugging and audit
	RawPayload json.RawMessage `json:"raw_payload,omitempty"`

	// CustomDedupKey overrides the computed SHA-256 DedupKey when non-empty.
	// Used by adapters that need deterministic keys (e.g., Slack: "slack-{CHANNEL}-{TS}").
	CustomDedupKey string `json:"custom_dedup_key,omitempty"`

	// ParentDedupKey holds the dedup key of the parent event for thread replies.
	// When non-empty, the engine looks up the parent's session_id for thread routing.
	ParentDedupKey string `json:"parent_dedup_key,omitempty"`

	// ThreadSessionID is populated by the engine's writerLoop when a thread reply
	// is routed to an existing session. Empty means spawn a new session.
	ThreadSessionID string `json:"thread_session_id,omitempty"`

	// RoutedTo is populated by the engine's writerLoop with the conductor name
	// from Router.Match(Sender), or "triage" / "" when no rule matches.
	// Consumed by the TUI to deliver events into the conductor's tmux pane.
	RoutedTo string `json:"routed_to,omitempty"`
}

Event is a normalized event from any watcher adapter. All fields use json tags for persistence and wire format compatibility.

func (Event) DedupKey

func (e Event) DedupKey() string

DedupKey returns a deterministic hex-encoded SHA-256 hash of the event's source, sender, subject, and timestamp. The pipe delimiter prevents field-boundary collisions (e.g., sender="a|b" vs sender="a", subject="|b"). Identical events from the same source at the same time produce the same key.
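
A sketch of a key built this way, using the example senders from the description. The field order and the timestamp encoding (RFC 3339 with nanoseconds, in UTC) are assumptions, and event and dedupKey are hypothetical names; the package's actual encoding may differ.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
	"time"
)

// event holds only the fields that feed the key.
type event struct {
	Source, Sender, Subject string
	Timestamp               time.Time
}

// dedupKey hashes the pipe-joined fields and returns 64 hex characters.
func dedupKey(e event) string {
	joined := strings.Join([]string{
		e.Source,
		e.Sender,
		e.Subject,
		e.Timestamp.UTC().Format(time.RFC3339Nano), // assumed encoding
	}, "|")
	sum := sha256.Sum256([]byte(joined))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := event{Source: "gmail", Sender: "a|b"}
	b := event{Source: "gmail", Sender: "a", Subject: "|b"}
	fmt.Println(dedupKey(a) == dedupKey(a)) // deterministic
	fmt.Println(dedupKey(a) == dedupKey(b)) // distinct for the example pair
}
```

Without the delimiter both example events would concatenate to the same string. Note that a delimiter alone cannot separate every pair of inputs that themselves contain it (for instance sender "a|b", subject "c" versus sender "a", subject "b|c"), which is one reason an adapter might set CustomDedupKey.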

type GitHubAdapter

type GitHubAdapter struct {
	// contains filtered or unexported fields
}

GitHubAdapter implements WatcherAdapter by running an HTTP server that accepts POST requests on /github and verifies X-Hub-Signature-256 HMAC-SHA256 signatures before normalizing GitHub webhook events to Event structs. The server binds to 127.0.0.1 by default (configurable via Settings["bind"]) to avoid accidental public exposure.
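
For readers unfamiliar with X-Hub-Signature-256, the check amounts to recomputing an HMAC-SHA256 over the raw request body and comparing it in constant time with the hex value after the "sha256=" prefix. The sketch below shows that general technique; validSignature and sign are hypothetical helpers, not the adapter's functions.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// validSignature reports whether header matches HMAC-SHA256(secret, body).
func validSignature(secret, body []byte, header string) bool {
	const prefix = "sha256="
	if !strings.HasPrefix(header, prefix) {
		return false
	}
	got, err := hex.DecodeString(strings.TrimPrefix(header, prefix))
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil)) // constant-time comparison
}

// sign produces the header value a sender would attach (for the demo).
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

func main() {
	secret, body := []byte("s3cret"), []byte(`{"action":"opened"}`)
	header := sign(secret, body)
	fmt.Println(validSignature(secret, body, header))
	fmt.Println(validSignature([]byte("wrong"), body, header))
}
```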

func (*GitHubAdapter) Addr

func (a *GitHubAdapter) Addr() string

Addr returns the current listen address (thread-safe). Useful for tests when port 0 is used to get a random available port.

func (*GitHubAdapter) HealthCheck

func (a *GitHubAdapter) HealthCheck() error

HealthCheck verifies the HTTP listener is accepting TCP connections.

func (*GitHubAdapter) Listen

func (a *GitHubAdapter) Listen(ctx context.Context, events chan<- Event) error

Listen starts the HTTP server and blocks until the context is cancelled. On context cancellation, the server is gracefully shut down with a 5-second timeout.

func (*GitHubAdapter) Setup

func (a *GitHubAdapter) Setup(_ context.Context, config AdapterConfig) error

Setup initializes the adapter with the HTTP server configuration but does NOT start the server. The server is started in Listen.

Settings:

  • "secret": required webhook secret for HMAC-SHA256 verification
  • "port": TCP port to listen on (default "18461")
  • "bind": Bind address (default "127.0.0.1" per T-14-04)

func (*GitHubAdapter) Teardown

func (a *GitHubAdapter) Teardown() error

Teardown is a no-op because the server is shut down in Listen via context cancellation.

type GmailAdapter

type GmailAdapter struct {
	// contains filtered or unexported fields
}

GmailAdapter implements WatcherAdapter against the Gmail Pub/Sub watch API. All fields are private; construction uses NewGmailAdapter().

func NewGmailAdapter

func NewGmailAdapter() *GmailAdapter

NewGmailAdapter constructs a GmailAdapter with default time functions. Tests inject fakes by assigning a.nowFunc / a.afterFunc directly.

func (*GmailAdapter) HealthCheck

func (a *GmailAdapter) HealthCheck() error

HealthCheck reports the adapter's ability to reach Gmail + Pub/Sub. Plan 17-02 covers the minimal path: returns a cached lastHealthErr (set by renewalLoop in Plan 17-03), exercises the token source, checks the Pub/Sub subscription, and flags a lapsed watch expiry. Plan 17-03 adds explicit unit coverage per branch.

func (*GmailAdapter) Listen

func (a *GmailAdapter) Listen(ctx context.Context, events chan<- Event) error

Listen blocks on Subscription.Receive until ctx is cancelled. Each Pub/Sub envelope is decoded, used to drive a users.history.list + users.messages.get fan-out, and Acked on success (or on a 404 stale-history fallback). Transient Gmail errors result in Nack so Pub/Sub redelivers.

A sync.WaitGroup ensures the renewal goroutine is joined before Listen returns — even though the renewalLoop body is a STUB in Plan 17-02 (it only waits on ctx.Done), the WaitGroup is in place so Plan 17-03 can drop in the real body without changing the lifecycle contract. This prevents Pitfall 3 (renewal goroutine outliving Listen and surfacing as a goleak false positive).

func (*GmailAdapter) Setup

func (a *GmailAdapter) Setup(ctx context.Context, config AdapterConfig) error

Setup loads OAuth credentials + token, builds gmail.Service and pubsub.Client (both authenticating with the same user token via persistingTokenSource), verifies the subscription exists, loads existing WatcherMeta, and conditionally calls users.Watch() if no watch is present or the existing watch expires in under 2 hours (D-11 threshold).

Settings keys:

  • topic (required): projects/{projectID}/topics/{topic}
  • subscription (required): projects/{projectID}/subscriptions/{sub}
  • credentials_path: defaults to <watcher_dir>/credentials.json
  • token_path: defaults to <watcher_dir>/token.json
  • labels: optional comma-separated Gmail label filter
  • account: optional informational Gmail address

func (*GmailAdapter) Teardown

func (a *GmailAdapter) Teardown() error

Teardown closes the pubsub client so gRPC connections are released (required for goleak). It does NOT call users.Stop() per D-34 — stopping the watch would burn 50 quota units on every restart and Google treats users.Watch as idempotent on the next Setup.

type HealthBridge added in v1.6.0

type HealthBridge struct {
	// contains filtered or unexported fields
}

HealthBridge subscribes to an engine health signal and fans alerts to a Notifier with per-(watcher x trigger) debounce. Resilient to notifier errors and panics.
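
Per-(watcher x trigger) debounce can be pictured as a map from that pair to the time of the last alert sent. The following is a sketch under assumptions: debouncer, ShouldSend, and atMinute are hypothetical, and the 15-minute window is taken from the documented Config.DebounceWindow default.

```go
package main

import (
	"fmt"
	"time"
)

// debouncer suppresses repeat alerts for the same watcher and trigger.
type debouncer struct {
	window time.Duration
	last   map[string]time.Time
}

func newDebouncer() *debouncer {
	return &debouncer{window: 15 * time.Minute, last: map[string]time.Time{}}
}

// ShouldSend reports whether an alert may go out at time now.
// Suppressed alerts do not extend the window.
func (d *debouncer) ShouldSend(watcher, trigger string, now time.Time) bool {
	key := watcher + "\x00" + trigger
	if t, ok := d.last[key]; ok && now.Sub(t) < d.window {
		return false
	}
	d.last[key] = now
	return true
}

// atMinute returns a fixed base time plus n minutes.
func atMinute(n int) time.Time {
	return time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(n) * time.Minute)
}

func main() {
	d := newDebouncer()
	fmt.Println(d.ShouldSend("inbox", "silence_detected", atMinute(0)))
	fmt.Println(d.ShouldSend("inbox", "silence_detected", atMinute(5)))
	fmt.Println(d.ShouldSend("inbox", "error_threshold_exceeded", atMinute(5)))
	fmt.Println(d.ShouldSend("inbox", "silence_detected", atMinute(20)))
}
```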

func NewHealthBridge added in v1.6.0

func NewHealthBridge(cfg Config, notifier Notifier, source <-chan HealthState) *HealthBridge

NewHealthBridge constructs a bridge. Does not start it. Call Run with a context to begin consuming from source.

func (*HealthBridge) NotifyTeardown added in v1.6.0

func (b *HealthBridge) NotifyTeardown(watcherName string, unexpected bool)

NotifyTeardown is called when an adapter teardown is unexpected. Always passes through the same debounce path as regular health alerts.

func (*HealthBridge) Run added in v1.6.0

func (b *HealthBridge) Run(ctx context.Context) error

Run blocks until ctx is done or source is closed. Honors cfg.Enabled. Never returns a non-nil error (notifier failures are logged; ctx cancel returns nil).

type HealthSample added in v1.6.0

type HealthSample struct {
	TS time.Time `json:"ts"`
	OK bool      `json:"ok"`
}

HealthSample is a single health snapshot stored in WatcherState.HealthWindow.

type HealthState

type HealthState struct {
	// WatcherName is the name of the watcher this state belongs to
	WatcherName string

	// Status is the overall health status
	Status HealthStatus

	// EventsPerHour is the rolling event rate over the last 60 minutes
	EventsPerHour float64

	// LastEventTime is the time of the most recently received event
	LastEventTime time.Time

	// ConsecutiveErrors is the number of consecutive HealthCheck errors since the last event
	ConsecutiveErrors int

	// Message is a human-readable explanation for Warning or Error states
	Message string
}

HealthState is a snapshot of watcher health for display in the TUI and health alerts. It is emitted by HealthTracker.Check() and consumed by the engine in Phase 16.

type HealthStatus

type HealthStatus string

HealthStatus is the overall health state of a watcher.

const (
	// HealthStatusHealthy indicates the watcher is functioning normally.
	HealthStatusHealthy HealthStatus = "healthy"

	// HealthStatusWarning indicates a potential issue (e.g., silence or repeated errors).
	HealthStatusWarning HealthStatus = "warning"

	// HealthStatusError indicates the watcher is failing and requires attention.
	HealthStatusError HealthStatus = "error"
)

type HealthTracker

type HealthTracker struct {
	// contains filtered or unexported fields
}

HealthTracker is a passive (no goroutine) health monitor for a single watcher. It is called from the engine's health check loop via Check(), and updated via RecordEvent() and RecordError() as events flow through the engine.

Thread-safe via mu; all public methods acquire appropriate locks.

func NewHealthTracker

func NewHealthTracker(watcherName string, maxSilenceMinutes int) *HealthTracker

NewHealthTracker creates a HealthTracker for the named watcher. maxSilenceMinutes should come from WatcherSettings.GetMaxSilenceMinutes().

func (*HealthTracker) Check

func (h *HealthTracker) Check() HealthState

Check evaluates the current health state and returns a HealthState snapshot. Per D-17, the status is computed as follows:

  • Error: adapter unhealthy OR consecutiveErrors >= 10
  • Warning: consecutiveErrors >= 3 OR silence beyond maxSilenceMinutes
  • Healthy: otherwise

The health check is passive (no I/O); the caller (engine) drives the check loop.
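
The precedence in the list above (Error before Warning before Healthy) can be written out directly. This is a sketch with a hypothetical status function; the silence test is reduced to a boolean input.

```go
package main

import "fmt"

// status applies the documented thresholds in priority order.
func status(adapterHealthy bool, consecutiveErrors int, silent bool) string {
	switch {
	case !adapterHealthy || consecutiveErrors >= 10:
		return "error"
	case consecutiveErrors >= 3 || silent:
		return "warning"
	default:
		return "healthy"
	}
}

func main() {
	fmt.Println(status(true, 0, false))  // healthy
	fmt.Println(status(true, 3, false))  // warning
	fmt.Println(status(true, 0, true))   // warning
	fmt.Println(status(true, 10, false)) // error
	fmt.Println(status(false, 0, false)) // error
}
```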

func (*HealthTracker) EventsInLastHour

func (h *HealthTracker) EventsInLastHour() int

EventsInLastHour returns the number of events received within the past hour. This uses a lazy read approach; old entries are pruned on RecordEvent.

func (*HealthTracker) RecordError

func (h *HealthTracker) RecordError()

RecordError increments the consecutive error count. Call this when an adapter HealthCheck() returns an error.

func (*HealthTracker) RecordEvent

func (h *HealthTracker) RecordEvent()

RecordEvent records a successfully received event. Resets consecutiveErrors, updates lastEventTime, and prunes old timestamps from the sliding window (T-13-03: prevents unbounded memory growth).

func (*HealthTracker) SetAdapterHealth

func (h *HealthTracker) SetAdapterHealth(healthy bool)

SetAdapterHealth records the adapter's current health status. Pass false when HealthCheck() fails; pass true when it recovers.

func (*HealthTracker) SetLastEventTimeForTest

func (h *HealthTracker) SetLastEventTimeForTest(t time.Time)

SetLastEventTimeForTest allows tests to set lastEventTime directly for deterministic silence detection testing. Only intended for use in tests.

type Notifier added in v1.6.0

type Notifier interface {
	Notify(ctx context.Context, alert Alert) error
}

Notifier is implemented by concrete alert sinks (conductor Telegram/Slack/Discord bridge). The HealthBridge calls Notify once per triggered alert after debounce.

type NtfyAdapter

type NtfyAdapter struct {
	// contains filtered or unexported fields
}

NtfyAdapter implements WatcherAdapter by subscribing to an ntfy topic via NDJSON streaming (one JSON object per line). It auto-reconnects with exponential backoff on disconnection and resumes from the last received message ID.

func (*NtfyAdapter) HealthCheck

func (a *NtfyAdapter) HealthCheck() error

HealthCheck verifies the ntfy server is reachable by sending an HTTP HEAD request with a 5-second timeout (per D-10).

func (*NtfyAdapter) Listen

func (a *NtfyAdapter) Listen(ctx context.Context, events chan<- Event) error

Listen connects to the ntfy NDJSON stream and emits normalized Events on the provided channel. On disconnection, it reconnects with exponential backoff (initial 2s, 2x factor, 30s cap) per D-08. Listen only returns when the context is cancelled.
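
The backoff schedule described above (initial 2s, doubling, capped at 30s) produces delays of 2s, 4s, 8s, 16s, 30s, 30s, and so on. A sketch, with nextBackoff and backoffSeconds as hypothetical helpers:

```go
package main

import (
	"fmt"
	"time"
)

const (
	initialBackoff = 2 * time.Second
	maxBackoff     = 30 * time.Second
)

// nextBackoff returns the delay to use after a delay of cur.
// A zero cur means this is the first reconnect attempt.
func nextBackoff(cur time.Duration) time.Duration {
	if cur <= 0 {
		return initialBackoff
	}
	next := cur * 2
	if next > maxBackoff {
		return maxBackoff
	}
	return next
}

// backoffSeconds returns the first n delays in whole seconds.
func backoffSeconds(n int) []int {
	out := make([]int, 0, n)
	var d time.Duration
	for i := 0; i < n; i++ {
		d = nextBackoff(d)
		out = append(out, int(d/time.Second))
	}
	return out
}

func main() {
	fmt.Println(backoffSeconds(6)) // [2 4 8 16 30 30]
}
```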

func (*NtfyAdapter) Setup

func (a *NtfyAdapter) Setup(_ context.Context, config AdapterConfig) error

Setup initializes the adapter with the ntfy server URL and topic. The topic is required; the server defaults to "https://ntfy.sh".

Settings:

  • "topic": required ntfy topic name
  • "server": ntfy server URL (default "https://ntfy.sh")

func (*NtfyAdapter) Teardown

func (a *NtfyAdapter) Teardown() error

Teardown is a no-op. The streaming HTTP connection is closed by context cancellation in Listen.

type RouteResult

type RouteResult struct {
	// Conductor is the conductor name to route the event to
	Conductor string

	// Group is the TUI group path
	Group string

	// Name is the client's display name
	Name string

	// MatchType is either "exact" (email match) or "wildcard" (domain match)
	MatchType string
}

RouteResult is the result of a Router.Match call.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router provides config-driven routing of events to conductors. It loads rules from clients.json and matches incoming senders using exact email lookup first, then wildcard domain fallback.

Router is safe for concurrent use. Match takes a read lock so multiple callers can proceed in parallel. Reload takes a write lock to atomically replace the internal routing tables (D-12/D-15/D-16).

func LoadFromWatcherDir

func LoadFromWatcherDir() (*Router, error)

LoadFromWatcherDir loads clients.json from the standard watcher directory (~/.agent-deck/watcher/clients.json) and returns a ready-to-use Router.

func NewRouter

func NewRouter(clients map[string]ClientEntry) *Router

NewRouter builds a Router from a flat map of client entries. Keys starting with "*@" are treated as wildcard domain rules; all other keys are treated as exact email rules.

func (*Router) Match

func (r *Router) Match(sender string) *RouteResult

Match returns the routing result for a given sender address. Exact email match takes priority over wildcard domain match (D-08). Returns nil if the sender does not match any rule (unrouted).

Safe to call concurrently with Reload (takes RLock for the duration, D-15).

func (*Router) Reload

func (r *Router) Reload(newClients map[string]ClientEntry)

Reload atomically replaces the router's routing tables with newClients. Safe to call concurrently with Match. Builds fresh exact and wildcard maps before acquiring the write lock, then swaps both under a single lock (D-12/D-16). A nil input is treated as an empty map (never assigns nil to internal fields, D-15/Pitfall 5).
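
The Match and Reload contracts can be sketched together: exact lookup first, wildcard domain second, and a reload that builds new maps before taking the write lock. This is a simplified illustration; router, split, match, and reload are hypothetical, values are reduced to a conductor name, and no sender normalization is attempted.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type router struct {
	mu       sync.RWMutex
	exact    map[string]string // email -> conductor
	wildcard map[string]string // domain -> conductor
}

// split separates "*@domain" keys from exact keys. A nil input yields
// two empty, non-nil maps.
func split(clients map[string]string) (map[string]string, map[string]string) {
	exact, wildcard := map[string]string{}, map[string]string{}
	for k, v := range clients {
		if strings.HasPrefix(k, "*@") {
			wildcard[strings.TrimPrefix(k, "*@")] = v
		} else {
			exact[k] = v
		}
	}
	return exact, wildcard
}

func newRouter(clients map[string]string) *router {
	r := &router{}
	r.reload(clients)
	return r
}

// reload builds the new tables outside the lock, then swaps both at once.
func (r *router) reload(clients map[string]string) {
	exact, wildcard := split(clients)
	r.mu.Lock()
	r.exact, r.wildcard = exact, wildcard
	r.mu.Unlock()
}

// match tries the exact table first, then the sender's domain.
func (r *router) match(sender string) (conductor, matchType string, ok bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if c, found := r.exact[sender]; found {
		return c, "exact", true
	}
	if i := strings.LastIndex(sender, "@"); i >= 0 {
		if c, found := r.wildcard[sender[i+1:]]; found {
			return c, "wildcard", true
		}
	}
	return "", "", false
}

func main() {
	r := newRouter(map[string]string{
		"boss@example.com": "vip",
		"*@example.com":    "general",
	})
	fmt.Println(r.match("boss@example.com"))
	fmt.Println(r.match("dev@example.com"))
	fmt.Println(r.match("x@other.org"))
}
```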

type SlackAdapter

type SlackAdapter struct {
	// contains filtered or unexported fields
}

SlackAdapter subscribes to an ntfy.sh topic that receives bridged Slack events from a Cloudflare Worker. It parses both v1 (plain text) and v2 (structured JSON) payloads, normalizes to Event with deterministic Slack-specific dedup keys, and detects thread replies for session routing.

The adapter duplicates the ntfy NDJSON streaming core (independent, not embedding NtfyAdapter) per design decision D-02.

func (*SlackAdapter) HealthCheck

func (a *SlackAdapter) HealthCheck() error

HealthCheck verifies the ntfy server is reachable by sending an HTTP HEAD request with a 5-second timeout.

func (*SlackAdapter) Listen

func (a *SlackAdapter) Listen(ctx context.Context, events chan<- Event) error

Listen connects to the ntfy NDJSON stream and emits normalized Events on the provided channel. On disconnection, it reconnects with exponential backoff (initial 2s, 2x factor, 30s cap). Listen only returns when the context is cancelled.

func (*SlackAdapter) Setup

func (a *SlackAdapter) Setup(_ context.Context, config AdapterConfig) error

Setup initializes the adapter with the ntfy server URL and topic. The topic is required; the server defaults to "https://ntfy.sh".

Settings:

  • "topic": required ntfy topic name for Slack bridge
  • "server": ntfy server URL (default "https://ntfy.sh")

func (*SlackAdapter) Teardown

func (a *SlackAdapter) Teardown() error

Teardown is a no-op. The streaming HTTP connection is closed by context cancellation in Listen.

type TriageRequest

type TriageRequest struct {
	Event      Event
	WatcherID  string
	Profile    string
	Tracker    *HealthTracker
	ResultPath string // absolute path: <TriageDir>/<DedupKey()>/result.json
	TriageDir  string // absolute: <EngineConfig.TriageDir>/<DedupKey()>/
	SpawnedAt  time.Time
}

TriageRequest is the work item the writerLoop hands to triageLoop.

type TriageSpawner

type TriageSpawner interface {
	Spawn(ctx context.Context, req TriageRequest) (sessionID string, err error)
}

TriageSpawner launches a triage session for an unrouted event (D-25).

type WatcherAdapter

type WatcherAdapter interface {
	// Setup initializes the adapter with the provided config. Called once before Listen.
	Setup(ctx context.Context, config AdapterConfig) error

	// Listen blocks, producing normalized events on the provided channel until ctx is cancelled.
	Listen(ctx context.Context, events chan<- Event) error

	// Teardown cleans up any resources allocated during Setup.
	Teardown() error

	// HealthCheck returns a non-nil error if the adapter cannot currently reach its source.
	HealthCheck() error
}

WatcherAdapter is the interface that all event source adapters must implement. Adapters normalize raw events from external sources into Event structs.

type WatcherState added in v1.6.0

type WatcherState struct {
	LastEventTS    time.Time      `json:"last_event_ts"`
	ErrorCount     int            `json:"error_count"`
	AdapterHealthy bool           `json:"adapter_healthy"`
	HealthWindow   []HealthSample `json:"health_window"` // cap at 64 samples
	DedupCursor    string         `json:"dedup_cursor"`
}

WatcherState is the persisted per-watcher state snapshot written by the writerLoop after every successful event insert. Hot-reload safe: no in-process cache.

func LoadState added in v1.6.0

func LoadState(name string) (*WatcherState, error)

LoadState reads <name>/state.json. Returns (nil, nil) if the file does not exist (fresh watcher). Always hits disk — no in-process cache, so external edits are visible on next call (hot-reload safe).
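
The (nil, nil) convention for a missing file can be sketched as below. state, loadState, and demoLoad are hypothetical and reduced to one field; callers must check for a nil pointer before dereferencing.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

type state struct {
	ErrorCount int `json:"error_count"`
}

// loadState reads path from disk on every call. A missing file is not an
// error: it means the watcher has no saved state yet.
func loadState(path string) (*state, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	var s state
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

// demoLoad loads before and after the file exists and returns
// (missing-was-nil, error count read back, error).
func demoLoad() (bool, int, error) {
	dir, err := os.MkdirTemp("", "state")
	if err != nil {
		return false, 0, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "state.json")
	before, err := loadState(path)
	if err != nil {
		return false, 0, err
	}
	if err := os.WriteFile(path, []byte(`{"error_count":2}`), 0o600); err != nil {
		return false, 0, err
	}
	after, err := loadState(path)
	if err != nil || after == nil {
		return false, 0, err
	}
	return before == nil, after.ErrorCount, nil
}

func main() {
	fmt.Println(demoLoad())
}
```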

type WebhookAdapter

type WebhookAdapter struct {
	// contains filtered or unexported fields
}

WebhookAdapter implements WatcherAdapter by running an HTTP server that accepts POST requests on /webhook and normalizes them to Event structs. The server binds to 127.0.0.1 by default (configurable via Settings["bind"]) to avoid accidental public exposure.

func (*WebhookAdapter) HealthCheck

func (a *WebhookAdapter) HealthCheck() error

HealthCheck verifies the HTTP listener is accepting TCP connections (per D-06).

func (*WebhookAdapter) Listen

func (a *WebhookAdapter) Listen(ctx context.Context, events chan<- Event) error

Listen starts the HTTP server and blocks until the context is cancelled. On context cancellation, the server is gracefully shut down with a 5-second timeout.

func (*WebhookAdapter) Setup

func (a *WebhookAdapter) Setup(_ context.Context, config AdapterConfig) error

Setup initializes the adapter with the HTTP server configuration but does NOT start the server. The server is started in Listen.

Settings:

  • "port": TCP port to listen on (default "18460")
  • "bind": Bind address (default "127.0.0.1" per T-14-04)

func (*WebhookAdapter) Teardown

func (a *WebhookAdapter) Teardown() error

Teardown is a no-op because the server is shut down in Listen via context cancellation (per D-04).
