worker

package
v1.19.1
Published: May 29, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package worker is the v1.3 background job runner. The daemon spawns a Pool at startup; the pool polls scans WHERE status = 'queued' (SQLite) or LISTENs on a Postgres channel for new rows, transitions each picked-up row through running → completed/failed, and invokes a Runner for the actual work.

In phase 8 the Runner is a stub: it logs "would scan providers X" and marks the row completed with zero findings. The real scan-engine integration ships with v1.4 phase 9 (scan-now SSE flow), where the studio loads the operator's compliancekit.yaml + constructs the Engine + streams progress. The interface here is designed so swapping in a real Runner is a one-line constructor change with no schema or API impact.

LISTEN/NOTIFY for Postgres lets multiple daemon replicas share a single queue without polling. SQLite falls back to a 500ms polling loop, which is fine for the single-process case that the default SQLite deployment serves.
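The SQLite polling path and the queued → running → completed/failed transitions described above can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: the in-memory queue type, claim, finish, drainOnce, poll, and failOn are all hypothetical names standing in for the real scans table and store calls.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// queue is a hypothetical in-memory stand-in for the scans table.
type queue struct {
	mu     sync.Mutex
	status map[string]string // scan ID -> status
}

// claim moves one queued row to running and returns its ID, or "" if none is queued.
func (q *queue) claim() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for id, s := range q.status {
		if s == "queued" {
			q.status[id] = "running"
			return id
		}
	}
	return ""
}

// finish records the terminal state of a row based on the runner's error.
func (q *queue) finish(id string, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if err != nil {
		q.status[id] = "failed"
		return
	}
	q.status[id] = "completed"
}

// drainOnce claims and runs every currently queued row.
func drainOnce(q *queue, run func(id string) error) {
	for id := q.claim(); id != ""; id = q.claim() {
		q.finish(id, run(id))
	}
}

// poll drains the queue on every tick until ctx is canceled.
func poll(ctx context.Context, q *queue, every time.Duration, run func(id string) error) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			drainOnce(q, run)
		}
	}
}

// failOn returns a runner that fails for one scan ID and succeeds otherwise.
func failOn(bad string) func(string) error {
	return func(id string) error {
		if id == bad {
			return errors.New("scan failed")
		}
		return nil
	}
}

func main() {
	q := &queue{status: map[string]string{"a": "queued", "b": "queued"}}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	poll(ctx, q, 5*time.Millisecond, failOn("b")) // returns once ctx expires
	fmt.Println("a:", q.status["a"], "b:", q.status["b"])
}
```

The Postgres path would replace the ticker with a wait on a notification channel; that part is left out here because the source does not describe it in detail.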

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Concurrency caps how many jobs run in parallel. 0 → 2.
	Concurrency int

	// MaxConcurrency is the upper bound the v1.11 phase 8 autoscale
	// loop can grow to when queue depth sustains above the threshold.
	// 0 → Concurrency + 4 (room for one burst of 4 extra workers).
	MaxConcurrency int

	// PollEvery is how often the SQLite path scans the scans table
	// for new queued rows. Ignored on Postgres (LISTEN/NOTIFY is
	// event-driven). 0 → 500ms.
	PollEvery time.Duration

	// Runner is the work-doer. Defaults to StubRunner if nil.
	Runner Runner

	// Log destination. Defaults to slog.Default().
	Log *slog.Logger

	// Events is the v1.6 SSE event-bus producer. nil-safe: when nil
	// the worker still runs but no events fan out to the daemon's
	// /api/v1/events subscribers.
	Events *events.Producer

	// DepthObserver is the v1.11 phase 8 queue-depth metric sink.
	// nil-safe: when unset, the autoscale loop still runs but the
	// gauge isn't exported.
	DepthObserver DepthObserver
}

Config holds the runtime knobs the pool takes. Zero values resolve to sensible defaults via Default.

func Default

func Default() Config

Default returns the recommended baseline Config.
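The zero-value fallbacks documented on the Config fields (Concurrency 0 → 2, MaxConcurrency 0 → Concurrency + 4, PollEvery 0 → 500ms) can be illustrated with a small sketch. The config type and withDefaults helper below are hypothetical local stand-ins, not the package's code, and the sketch assumes MaxConcurrency is derived from the already-resolved Concurrency.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors only the numeric knobs of worker.Config (local stand-in).
type config struct {
	Concurrency    int
	MaxConcurrency int
	PollEvery      time.Duration
}

// withDefaults applies the documented zero-value fallbacks.
// Assumption: MaxConcurrency is computed after Concurrency has been resolved.
func withDefaults(c config) config {
	if c.Concurrency == 0 {
		c.Concurrency = 2
	}
	if c.MaxConcurrency == 0 {
		c.MaxConcurrency = c.Concurrency + 4
	}
	if c.PollEvery == 0 {
		c.PollEvery = 500 * time.Millisecond
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", withDefaults(config{}))
	fmt.Printf("%+v\n", withDefaults(config{Concurrency: 8}))
}
```

Under that assumption, an empty config resolves to 2 workers with a ceiling of 6, and setting only Concurrency to 8 yields a ceiling of 12.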

type DepthObserver added in v1.11.0

type DepthObserver interface {
	ObserveQueueDepth(d int)
}

DepthObserver is the metric sink. The daemon's metricsRegistry implements this with a Prometheus gauge; tests pass an in-memory recorder.
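An in-memory recorder of the kind tests might pass could look like the sketch below. The DepthObserver interface is copied from the documentation; the recorder type, its snapshot method, and the nil-safe observe helper are hypothetical and only illustrate the documented nil-safe behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// DepthObserver is copied from the package documentation.
type DepthObserver interface {
	ObserveQueueDepth(d int)
}

// recorder is a hypothetical in-memory sink that keeps every observed depth.
type recorder struct {
	mu     sync.Mutex
	depths []int
}

// ObserveQueueDepth appends the sample; safe for concurrent callers.
func (r *recorder) ObserveQueueDepth(d int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.depths = append(r.depths, d)
}

// snapshot returns a copy of the recorded samples.
func (r *recorder) snapshot() []int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int(nil), r.depths...)
}

// observe skips the call when no observer is configured.
func observe(o DepthObserver, d int) {
	if o != nil {
		o.ObserveQueueDepth(d)
	}
}

func main() {
	rec := &recorder{}
	for _, d := range []int{0, 3, 7} {
		observe(rec, d)
	}
	observe(nil, 42) // no observer configured: nothing is recorded
	fmt.Println(rec.snapshot())
}
```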

type Job

type Job struct {
	ScanID            string
	ProvidersScanned  []string
	FrameworksScanned []string
	TriggeredByUser   string
	TriggeredByToken  string
}

Job is one row picked off the queue. The pool hands it to the Runner; the runner decides what to do with the providers / frameworks list (in phase 8 the stub just logs).

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is the running background worker pool. Construct via New; start with Start (returns immediately after spawning goroutines); stop via the parent context's cancellation.

func New

func New(st *store.Store, cfg Config) *Pool

New constructs the pool. Call Start to begin processing jobs.

func (*Pool) Start

func (p *Pool) Start(ctx context.Context)

Start spawns the worker goroutines and returns immediately. Workers stop when ctx is canceled; call Stop afterwards to wait for them to exit (it blocks on the pool's internal WaitGroup).

func (*Pool) Stop

func (p *Pool) Stop()

Stop waits for every spawned goroutine to exit. Call after the parent context is canceled.
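The shutdown order implied by Start and Stop (cancel the parent context first, then wait) can be sketched with a miniature pool. The pool type below is a hypothetical stand-in built on sync.WaitGroup, not the real worker.Pool, and runAndStop is a helper introduced only for this example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical miniature of the documented lifecycle.
type pool struct {
	wg      sync.WaitGroup
	workers int
	exited  atomic.Int32
}

// Start spawns the workers and returns immediately.
func (p *pool) Start(ctx context.Context) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			<-ctx.Done() // a real worker would also process jobs here
			p.exited.Add(1)
		}()
	}
}

// Stop blocks until every spawned goroutine has returned.
func (p *pool) Stop() { p.wg.Wait() }

// runAndStop runs one full lifecycle and reports how many workers exited.
func runAndStop(n int) int32 {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{workers: n}
	p.Start(ctx) // returns immediately
	cancel()     // 1. signal shutdown through the parent context
	p.Stop()     // 2. block until every goroutine has returned
	return p.exited.Load()
}

func main() {
	fmt.Println("workers exited:", runAndStop(2))
}
```

Calling Stop before the context is canceled would block indefinitely in this sketch, which is why the documentation asks for cancellation first.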

func (*Pool) WithLeader added in v1.15.0

func (p *Pool) WithLeader(leader func() bool) *Pool

WithLeader installs the HA leader-election gate. When set, the drain loop short-circuits when leader() returns false — only the leader claims rows from the shared scans table. Nil leaves the pool in single-leader (always-true) mode, the v1.14-and-earlier behavior.
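A rough sketch of such a leader gate follows. The gate type and mayClaim method are hypothetical; only the WithLeader signature shape and the nil-means-always-leader rule come from the documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gate is a hypothetical stand-in for the part of the pool that holds the leader check.
type gate struct {
	leader func() bool
}

// WithLeader installs the check and returns the receiver for chaining.
func (g *gate) WithLeader(leader func() bool) *gate {
	g.leader = leader
	return g
}

// mayClaim reports whether this replica may claim rows.
// A nil check means single-leader mode: always true.
func (g *gate) mayClaim() bool {
	return g.leader == nil || g.leader()
}

func main() {
	var elected atomic.Bool // flipped by whatever election mechanism is in use
	g := (&gate{}).WithLeader(elected.Load)

	fmt.Println(g.mayClaim()) // false: this replica has not been elected
	elected.Store(true)
	fmt.Println(g.mayClaim()) // true: the drain loop may claim rows
}
```

Because the function is evaluated on every check, leadership can change at runtime without rebuilding the pool.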

type RealRunner added in v1.5.1

type RealRunner struct {
	// contains filtered or unexported fields
}

RealRunner wires the daemon's queue to the real compliancekit engine. Construct it with NewRealRunner and assign it to worker.Config.Runner, for example on the Config returned by worker.Default().

func NewRealRunner added in v1.5.1

func NewRealRunner(st *store.Store) *RealRunner

NewRealRunner returns a Runner that builds collectors from the daemon's providers table + filters checks by checks_state + invokes engine.Run + persists findings.

func (*RealRunner) Run added in v1.5.1

func (r *RealRunner) Run(ctx context.Context, j Job) error

Run satisfies worker.Runner.

func (*RealRunner) WithEvents added in v1.6.0

func (r *RealRunner) WithEvents(p *events.Producer) *RealRunner

WithEvents installs the v1.6 SSE Producer so per-finding + per-scan progress events fan out to /api/v1/events subscribers. Returns the receiver for chaining.

func (*RealRunner) WithOpenLineage added in v1.17.0

func (r *RealRunner) WithOpenLineage(em *warehouse.OpenLineageEmitter) *RealRunner

WithOpenLineage installs the v1.17 phase 5 OpenLineage emitter so every scan publishes a START + COMPLETE event to the configured Marquez/DataHub receiver. Returns the receiver for chaining. Nil/unset means no events fire (the daemon still works; lineage just stays unrecorded).

type Runner

type Runner interface {
	Run(ctx context.Context, j Job) error
}

Runner is the work-doer. Real implementations call compliancekit's Engine; the phase-8 default, StubRunner, only logs and returns.

var StubRunner Runner = RunnerFunc(func(_ context.Context, j Job) error {
	slog.Default().Info("worker: stub scan run",
		"scan_id", j.ScanID,
		"providers", j.ProvidersScanned,
	)

	time.Sleep(50 * time.Millisecond)
	return nil
})

StubRunner is the phase-8 default: it logs the scan, sleeps 50ms, and returns nil so the row is marked completed. New wires it in when Config.Runner is nil, so the daemon comes up runnable without any extra dependency; v1.4 phase 9 swaps it for a real scan-engine Runner.

type RunnerFunc

type RunnerFunc func(ctx context.Context, j Job) error

RunnerFunc adapts a function to the Runner interface.

func (RunnerFunc) Run

func (f RunnerFunc) Run(ctx context.Context, j Job) error
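RunnerFunc follows the same adapter pattern as http.HandlerFunc, so a plain function can serve as a Runner. The sketch below redeclares Runner, RunnerFunc, and a trimmed Job locally so it compiles on its own; the body of Run is assumed to simply call f, and rejectEmpty and runJob are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is trimmed to the two fields this example needs.
type Job struct {
	ScanID           string
	ProvidersScanned []string
}

// Runner and RunnerFunc are redeclared from the package documentation.
type Runner interface {
	Run(ctx context.Context, j Job) error
}

type RunnerFunc func(ctx context.Context, j Job) error

// Run calls f itself (assumed body; the documentation shows only the signature).
func (f RunnerFunc) Run(ctx context.Context, j Job) error { return f(ctx, j) }

// rejectEmpty is a hypothetical runner that fails jobs with no providers.
var rejectEmpty Runner = RunnerFunc(func(_ context.Context, j Job) error {
	if len(j.ProvidersScanned) == 0 {
		return errors.New("no providers to scan")
	}
	return nil
})

// runJob invokes a Runner the way a pool worker might.
func runJob(r Runner, j Job) error {
	return r.Run(context.Background(), j)
}

func main() {
	fmt.Println(runJob(rejectEmpty, Job{ScanID: "s1"}))
	fmt.Println(runJob(rejectEmpty, Job{ScanID: "s2", ProvidersScanned: []string{"aws"}}))
}
```

A function wrapped this way can be assigned to Config.Runner in place of StubRunner or RealRunner.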
