Documentation ¶
Overview ¶
Package worker is the v1.3 background job runner. The daemon spawns a Pool at startup; the pool polls scans WHERE status = 'queued' (SQLite) or LISTENs on a Postgres channel for new rows, transitions each picked-up row through running → completed/failed, and invokes a Runner for the actual work.
In phase 8 the Runner is a stub: it logs "would scan providers X" and marks the row completed with zero findings. The real scan-engine integration ships with v1.4 phase 9 (scan-now SSE flow), where the studio loads the operator's compliancekit.yaml, constructs the Engine, and streams progress. The interface here is designed so that swapping in a real Runner is a one-line constructor change with no schema or API impact.
LISTEN/NOTIFY for Postgres lets multiple daemon replicas share a single queue without polling. SQLite falls back to a 500ms polling loop, which is fine for the single-process case that the default SQLite setup serves.
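The row lifecycle described above (queued → running → completed/failed) can be sketched as a single polling pass over an in-memory table. This is only an illustration: the row type, pollOnce, and demoPoll are hypothetical names, and the real pool reads the scans table rather than a slice. Mapping a Runner error to "failed" is an assumption based on the transition list above.

```go
package main

import (
	"errors"
	"fmt"
)

// row is a hypothetical in-memory stand-in for one record in the scans table.
type row struct {
	ID     string
	Status string
}

// pollOnce performs one polling pass: every queued row is marked running,
// handed to run, and then marked completed or failed depending on the error.
// It returns how many rows were claimed. In a real pool a ticker would call
// something like this once per PollEvery interval.
func pollOnce(rows []row, run func(id string) error) int {
	claimed := 0
	for i := range rows {
		if rows[i].Status != "queued" {
			continue // already picked up or finished
		}
		rows[i].Status = "running"
		claimed++
		if err := run(rows[i].ID); err != nil {
			rows[i].Status = "failed"
		} else {
			rows[i].Status = "completed"
		}
	}
	return claimed
}

// demoPoll runs one pass over three rows; the runner fails only for row "c".
func demoPoll() []row {
	rows := []row{
		{ID: "a", Status: "queued"},
		{ID: "b", Status: "completed"},
		{ID: "c", Status: "queued"},
	}
	pollOnce(rows, func(id string) error {
		if id == "c" {
			return errors.New("simulated failure")
		}
		return nil
	})
	return rows
}

func main() {
	for _, r := range demoPoll() {
		fmt.Println(r.ID, r.Status)
	}
}
```

Row "b" is skipped because it is not queued, which is what lets repeated polling passes stay idempotent.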
Types ¶
type Config ¶
type Config struct {
// Concurrency caps how many jobs run in parallel. 0 → 2.
Concurrency int
// MaxConcurrency is the upper bound the v1.11 phase 8 autoscale
// loop can grow to when queue depth sustains above the threshold.
// 0 → Concurrency + 4 (room for one burst of 4 extra workers).
MaxConcurrency int
// PollEvery is how often the SQLite path scans the scans table
// for new queued rows. Ignored on Postgres (LISTEN/NOTIFY is
// event-driven). 0 → 500ms.
PollEvery time.Duration
// Runner is the work-doer. Defaults to StubRunner if nil.
Runner Runner
// Log destination. Defaults to slog.Default().
Log *slog.Logger
// Events is the v1.6 SSE event-bus producer. nil-safe: when nil
// the worker still runs but no events fan out to the daemon's
// /api/v1/events subscribers.
Events *events.Producer
// DepthObserver is the v1.11 phase 8 queue-depth metric sink.
// nil-safe: when unset, the autoscale loop still runs but the
// gauge isn't exported.
DepthObserver DepthObserver
}
Config holds the runtime knobs the pool takes. Zero values resolve to sensible defaults via Default.
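The zero-value rules in the field comments above can be sketched as follows. The Config here mirrors only the three numeric knobs, and withDefaults is a hypothetical helper written for this example; the package's own Default may be implemented differently.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors only the numeric knobs documented above; the real
// worker.Config also carries Runner, Log, Events, and DepthObserver.
type Config struct {
	Concurrency    int
	MaxConcurrency int
	PollEvery      time.Duration
}

// withDefaults is a hypothetical helper that applies the documented
// zero-value rules: 0 → 2, 0 → Concurrency + 4, and 0 → 500ms.
func withDefaults(c Config) Config {
	if c.Concurrency == 0 {
		c.Concurrency = 2
	}
	if c.MaxConcurrency == 0 {
		// Resolved after Concurrency so the bound tracks the effective value.
		c.MaxConcurrency = c.Concurrency + 4
	}
	if c.PollEvery == 0 {
		c.PollEvery = 500 * time.Millisecond
	}
	return c
}

func main() {
	c := withDefaults(Config{})
	fmt.Println(c.Concurrency, c.MaxConcurrency, c.PollEvery) // prints "2 6 500ms"
}
```

Explicitly set fields pass through untouched, so Config{Concurrency: 8} resolves to a MaxConcurrency of 12 under these rules.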
type DepthObserver ¶ added in v1.11.0
type DepthObserver interface {
ObserveQueueDepth(d int)
}
DepthObserver is the metric sink. The daemon's metricsRegistry implements this with a Prometheus gauge; tests pass an in-memory recorder.
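An in-memory recorder of the kind tests might pass could look like the sketch below. depthRecorder and its Samples method are hypothetical names chosen for this example; only the DepthObserver interface is taken from the declaration above.

```go
package main

import (
	"fmt"
	"sync"
)

// DepthObserver is copied from the declaration above.
type DepthObserver interface {
	ObserveQueueDepth(d int)
}

// depthRecorder is a hypothetical in-memory sink. The mutex makes it safe
// to call from several goroutines at once.
type depthRecorder struct {
	mu      sync.Mutex
	samples []int
}

// ObserveQueueDepth appends the observed depth to the recorded samples.
func (r *depthRecorder) ObserveQueueDepth(d int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.samples = append(r.samples, d)
}

// Samples returns a copy of everything recorded so far.
func (r *depthRecorder) Samples() []int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int(nil), r.samples...)
}

func main() {
	rec := &depthRecorder{}
	var obs DepthObserver = rec // compile-time check that the interface is satisfied
	for _, d := range []int{0, 3, 7} {
		obs.ObserveQueueDepth(d)
	}
	fmt.Println(rec.Samples()) // prints "[0 3 7]"
}
```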
type Job ¶
type Job struct {
ScanID string
ProvidersScanned []string
FrameworksScanned []string
TriggeredByUser string
TriggeredByToken string
}
Job is one row picked off the queue. The pool hands it to the Runner; the runner decides what to do with the providers / frameworks list (in phase 8 the stub just logs).
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is the running background worker pool. Construct via New; start with Start (returns immediately after spawning goroutines); stop via the parent context's cancellation.
func (*Pool) Start ¶
Start spawns the worker goroutines and returns immediately. Workers stop when ctx is canceled; call Stop() to wait for them to exit (it waits on the pool's internal WaitGroup).
func (*Pool) Stop ¶
func (p *Pool) Stop()
Stop waits for every spawned goroutine to exit. Call after the parent context is canceled.
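The start, cancel, then Stop sequence can be sketched with a miniature pool. miniPool is a hypothetical stand-in, not worker.Pool: its Start signature (a context plus a worker count) is an assumption made for this example, and its workers only wait for cancellation instead of claiming jobs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// miniPool is a hypothetical stand-in for worker.Pool that keeps only the
// lifecycle pieces: a WaitGroup and a counter of workers that have exited.
type miniPool struct {
	wg      sync.WaitGroup
	stopped atomic.Int32
}

// Start spawns n goroutines and returns immediately.
func (p *miniPool) Start(ctx context.Context, n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			<-ctx.Done() // a real worker would claim and run jobs until this fires
			p.stopped.Add(1)
		}()
	}
}

// Stop blocks until every goroutine spawned by Start has returned.
func (p *miniPool) Stop() { p.wg.Wait() }

// runLifecycle starts n workers, cancels the parent context, waits for the
// workers via Stop, and reports how many exited.
func runLifecycle(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	p := &miniPool{}
	p.Start(ctx, n)
	cancel() // cancel first; Stop would otherwise block forever
	p.Stop()
	return int(p.stopped.Load())
}

func main() {
	fmt.Println("workers exited:", runLifecycle(3))
}
```

The ordering matters in this sketch: calling Stop before canceling the context would deadlock, because no worker has a reason to return.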
func (*Pool) WithLeader ¶ added in v1.15.0
WithLeader installs the HA leader-election gate. When set, the drain loop short-circuits when leader() returns false — only the leader claims rows from the shared scans table. Nil leaves the pool in single-leader (always-true) mode, the v1.14-and-earlier behavior.
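A minimal sketch of the gate, assuming the leader check is a func() bool as the text implies. gatedDrainer, its queue field, and the chaining return value are hypothetical; the real WithLeader signature is not shown here.

```go
package main

import "fmt"

// gatedDrainer is a hypothetical stand-in for the pool's drain loop.
type gatedDrainer struct {
	leader func() bool // nil means always-leader, as described above
	queue  []string    // scan IDs waiting to be claimed
}

// WithLeader installs the gate and returns the receiver for chaining
// (the return value is an assumption of this sketch).
func (g *gatedDrainer) WithLeader(fn func() bool) *gatedDrainer {
	g.leader = fn
	return g
}

// drain claims every queued ID when this replica is the leader and
// short-circuits, leaving the queue untouched, when it is not.
func (g *gatedDrainer) drain() []string {
	if g.leader != nil && !g.leader() {
		return nil
	}
	claimed := g.queue
	g.queue = nil
	return claimed
}

func main() {
	g := (&gatedDrainer{queue: []string{"scan-1", "scan-2"}}).
		WithLeader(func() bool { return false })
	fmt.Println("follower claimed:", len(g.drain()))

	g.WithLeader(func() bool { return true })
	fmt.Println("leader claimed:", len(g.drain()))
}
```

Because the follower leaves rows untouched, a replica that later wins the election picks them up on its next drain.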
type RealRunner ¶ added in v1.5.1
type RealRunner struct {
// contains filtered or unexported fields
}
RealRunner wires the daemon's queue to the real compliancekit engine. Constructed via NewRealRunner; pass to worker.Default() or worker.Config.Runner.
func NewRealRunner ¶ added in v1.5.1
func NewRealRunner(st *store.Store) *RealRunner
NewRealRunner returns a Runner that builds collectors from the daemon's providers table + filters checks by checks_state + invokes engine.Run + persists findings.
func (*RealRunner) Run ¶ added in v1.5.1
func (r *RealRunner) Run(ctx context.Context, j Job) error
Run satisfies worker.Runner.
func (*RealRunner) WithEvents ¶ added in v1.6.0
func (r *RealRunner) WithEvents(p *events.Producer) *RealRunner
WithEvents installs the v1.6 SSE Producer so per-finding + per-scan progress events fan out to /api/v1/events subscribers. Returns the receiver for chaining.
func (*RealRunner) WithOpenLineage ¶ added in v1.17.0
func (r *RealRunner) WithOpenLineage(em *warehouse.OpenLineageEmitter) *RealRunner
WithOpenLineage installs the v1.17 phase 5 OpenLineage emitter so every scan publishes a START + COMPLETE event to the configured Marquez/DataHub receiver. Returns the receiver for chaining. Nil/unset means no events fire (the daemon still works; lineage just stays unrecorded).
type Runner ¶
Runner is the work-doer. Real implementations call compliancekit's Engine; the phase-8 default just stubs.
var StubRunner Runner = RunnerFunc(func(_ context.Context, j Job) error {
	slog.Default().Info("worker: stub scan run",
		"scan_id", j.ScanID,
		"providers", j.ProvidersScanned,
	)
	time.Sleep(50 * time.Millisecond)
	return nil
})
StubRunner is the phase-8 default: log, sleep 50ms (per the time.Sleep call in its definition), and mark completed. Wired into Pool.New() so the daemon comes up runnable without any extra dependency; v1.4 phase 9 swaps this for a real scan-engine Runner.
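The RunnerFunc adapter used by StubRunner follows the same pattern as http.HandlerFunc in the standard library. The sketch below assumes Runner is a single-method interface matching the RealRunner.Run signature and that RunnerFunc simply calls itself; neither definition is shown on this page. The names okRunner, failRunner, and runDemo are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is trimmed to the two fields this sketch uses.
type Job struct {
	ScanID           string
	ProvidersScanned []string
}

// Runner is an assumed shape, inferred from RealRunner.Run's signature.
type Runner interface {
	Run(ctx context.Context, j Job) error
}

// RunnerFunc adapts a plain function to Runner (assumed definition).
type RunnerFunc func(ctx context.Context, j Job) error

// Run calls f(ctx, j).
func (f RunnerFunc) Run(ctx context.Context, j Job) error { return f(ctx, j) }

// okRunner and failRunner are hypothetical runners for illustration.
var okRunner Runner = RunnerFunc(func(_ context.Context, _ Job) error { return nil })

var failRunner Runner = RunnerFunc(func(_ context.Context, j Job) error {
	return errors.New("scan " + j.ScanID + ": provider unreachable")
})

// runDemo invokes r with a background context and returns its error.
func runDemo(r Runner, scanID string) error {
	return r.Run(context.Background(), Job{ScanID: scanID})
}

func main() {
	fmt.Println(runDemo(okRunner, "scan-1"))
	fmt.Println(runDemo(failRunner, "scan-2"))
}
```

With an adapter like this, a test can supply a one-off closure as the Runner without declaring a named type.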