controller

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2026 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Overview

Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.

Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:

  • AgentHealthController: Marks stale agents as offline, cleans up expired leases
  • JobRecoveryController: Recovers stuck jobs and re-queues them
  • QueuePriorityController: Recalculates queue priorities for fair scheduling
  • TokenCleanupController: Cleans up expired bootstrap tokens
  • AuditRetentionController: Manages audit log retention

Design principles:

  • Each controller is independent and can fail without affecting others
  • Controllers are idempotent: running multiple times has the same effect
  • Controllers use optimistic locking to handle concurrent modifications
  • All state changes are logged for debugging and monitoring

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHealthController

type AgentHealthController struct {
	// contains filtered or unexported fields
}

AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.

func NewAgentHealthController

func NewAgentHealthController(
	agentRepo agent.Repository,
	config *AgentHealthControllerConfig,
) *AgentHealthController

NewAgentHealthController creates a new AgentHealthController.

func (*AgentHealthController) Interval

func (c *AgentHealthController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AgentHealthController) Name

func (c *AgentHealthController) Name() string

Name returns the controller name.

func (*AgentHealthController) Reconcile

func (c *AgentHealthController) Reconcile(ctx context.Context) (int, error)

Reconcile checks agent health and marks stale agents as offline. Uses the MarkStaleAgentsOffline method which also updates last_offline_at timestamp.

type AgentHealthControllerConfig

type AgentHealthControllerConfig struct {
	// Interval is how often to run the health check.
	// Default: 30 seconds.
	Interval time.Duration

	// StaleTimeout is how long since last heartbeat before marking an agent as offline.
	// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
	StaleTimeout time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

AgentHealthControllerConfig configures the AgentHealthController.

type ApprovalExpirationController added in v0.1.2

type ApprovalExpirationController struct {
	// contains filtered or unexported fields
}

ApprovalExpirationController auto-reopens findings whose risk acceptances have expired. It scans for approved approvals with an expires_at in the past, marks them as expired, and transitions the associated findings back to "confirmed" status.

func NewApprovalExpirationController added in v0.1.2

func NewApprovalExpirationController(
	approvalRepo vulnerability.ApprovalRepository,
	findingRepo vulnerability.FindingRepository,
	config *ApprovalExpirationControllerConfig,
) *ApprovalExpirationController

NewApprovalExpirationController creates a new ApprovalExpirationController.

func (*ApprovalExpirationController) Interval added in v0.1.2

func (c *ApprovalExpirationController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ApprovalExpirationController) Name added in v0.1.2

func (c *ApprovalExpirationController) Name() string

Name returns the controller name.

func (*ApprovalExpirationController) Reconcile added in v0.1.2

func (c *ApprovalExpirationController) Reconcile(ctx context.Context) (int, error)

Reconcile finds expired approved approvals and reopens associated findings.

type ApprovalExpirationControllerConfig added in v0.1.2

type ApprovalExpirationControllerConfig struct {
	// Interval is how often to run the expiration check.
	// Default: 1 hour.
	Interval time.Duration

	// BatchSize is the maximum number of expired approvals to process per reconciliation.
	// Default: 100.
	BatchSize int

	// Logger for logging.
	Logger *logger.Logger
}

ApprovalExpirationControllerConfig configures the ApprovalExpirationController.

type AuditChainVerifyController added in v0.2.0

type AuditChainVerifyController struct {
	// contains filtered or unexported fields
}

AuditChainVerifyController periodically walks audit_log_chain for every active tenant and surfaces breaks. The audit hash chain (migration 000154) is tamper-evident — hashes of prior rows feed into each new row — but the guarantee is only useful if someone actually runs VerifyChain. An admin endpoint exists (GET /api/v1/audit-logs/verify, audit_handler.go) but that is a pull path; a malicious insider who deletes rows at 02:00 UTC can sit undetected until someone hits that endpoint.

This controller closes the gap by running VerifyChain on a timer. On any break it:

  1. Logs at ERROR level with the tenant, chain_position, and reason so the SIEM can alert on the keyword "audit chain break".
  2. Exposes a metric via the Manager's metrics sink so alerting systems can threshold on sustained drift.

The FK on audit_log_chain.audit_log_id → audit_logs.id is ON DELETE RESTRICT (see 000154_audit_hash_chain.up.sql), so DB-level deletion is already blocked. This controller handles the out-of-band tamper cases: direct UPDATE of audit_logs fields, TRUNCATE, restore-from-different-backup, etc.
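The real verification lives in AuditService.VerifyChain, whose internals are not shown here. As a rough sketch of the tamper-evident property the controller relies on, assume each chain row's hash covers the previous row's hash plus its own payload; a walk then pinpoints the first row where either link fails (all names below are illustrative, not the package's API):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// chainEntry is a hypothetical stand-in for an audit_log_chain row.
type chainEntry struct {
	Payload  string
	PrevHash string
	Hash     string
}

// hashEntry links a row to its predecessor: H(prevHash || payload).
func hashEntry(prevHash, payload string) string {
	sum := sha256.Sum256([]byte(prevHash + payload))
	return hex.EncodeToString(sum[:])
}

// verifyChain walks the chain and returns the position of the first
// break, or -1 if the chain is intact.
func verifyChain(entries []chainEntry) int {
	prev := ""
	for i, e := range entries {
		if e.PrevHash != prev || e.Hash != hashEntry(prev, e.Payload) {
			return i
		}
		prev = e.Hash
	}
	return -1
}

func main() {
	// Build a valid two-entry chain, then tamper with the first payload.
	e1 := chainEntry{Payload: "login alice", PrevHash: "", Hash: hashEntry("", "login alice")}
	e2 := chainEntry{Payload: "delete report", PrevHash: e1.Hash, Hash: hashEntry(e1.Hash, "delete report")}
	fmt.Println(verifyChain([]chainEntry{e1, e2})) // -1: intact

	e1.Payload = "login mallory" // out-of-band UPDATE of an audit row
	fmt.Println(verifyChain([]chainEntry{e1, e2})) // 0: break detected at position 0
}
```

Because e2's PrevHash no longer matches a recomputation over the tampered e1, the break is detected even though e2 itself was never touched; this is why a periodic walk catches UPDATE/TRUNCATE/restore cases the FK cannot.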

func NewAuditChainVerifyController added in v0.2.0

func NewAuditChainVerifyController(
	auditSvc *audit.AuditService,
	tenantRepo tenantdom.Repository,
	cfg *AuditChainVerifyControllerConfig,
) *AuditChainVerifyController

NewAuditChainVerifyController wires the controller. tenantRepo must expose ListActiveTenantIDs (verified at Reconcile time via a type assertion — we don't want a new domain method just to thread this through).

func (*AuditChainVerifyController) Interval added in v0.2.0

func (c *AuditChainVerifyController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AuditChainVerifyController) Name added in v0.2.0

func (c *AuditChainVerifyController) Name() string

Name returns the controller name.

func (*AuditChainVerifyController) Reconcile added in v0.2.0

func (c *AuditChainVerifyController) Reconcile(ctx context.Context) (int, error)

Reconcile walks every active tenant's chain once and returns the total number of tenants processed. Breaks are surfaced via the logger; the return int is the "work unit" counter the controller Manager uses for rate / health telemetry, not the break count.

type AuditChainVerifyControllerConfig added in v0.2.0

type AuditChainVerifyControllerConfig struct {
	// Interval is how often to walk every active tenant's audit chain.
	// Default: 1 hour. A malicious insider has at most (Interval + the
	// duration of this run) before detection, so tune down if your
	// compliance regime demands tighter MTTD for tamper events.
	Interval time.Duration

	// PerTenantLimit is the maximum chain entries walked per tenant per
	// run. Matches the AuditService cap (10_000). Memory-bound; raise
	// only after benchmarking.
	PerTenantLimit int

	Logger *logger.Logger
}

AuditChainVerifyControllerConfig configures AuditChainVerifyController.

type AuditRetentionController

type AuditRetentionController struct {
	// contains filtered or unexported fields
}

AuditRetentionController manages audit log retention. This is a compliance-critical controller that:

  1. Deletes audit logs older than the retention period
  2. Logs deletion activities for meta-audit purposes

The retention period should be configured based on compliance requirements:

  • GDPR: Typically 2-7 years depending on data type
  • SOC 2: At least 1 year
  • PCI DSS: At least 1 year
  • HIPAA: 6 years

IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.

func NewAuditRetentionController

func NewAuditRetentionController(
	auditRepo admin.AuditLogRepository,
	config *AuditRetentionControllerConfig,
) *AuditRetentionController

NewAuditRetentionController creates a new AuditRetentionController.

func (*AuditRetentionController) Interval

func (c *AuditRetentionController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AuditRetentionController) Name

func (c *AuditRetentionController) Name() string

Name returns the controller name.

func (*AuditRetentionController) Reconcile

func (c *AuditRetentionController) Reconcile(ctx context.Context) (int, error)

Reconcile deletes audit logs older than the retention period.

type AuditRetentionControllerConfig

type AuditRetentionControllerConfig struct {
	// Interval is how often to run the retention check.
	// Default: 24 hours (once a day).
	Interval time.Duration

	// RetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	RetentionDays int

	// BatchSize is the maximum number of logs to delete in one batch.
	// This prevents long-running transactions.
	// Default: 10000.
	BatchSize int

	// DryRun if true, only counts logs that would be deleted without actually deleting.
	// Useful for testing retention policies.
	// Default: false.
	DryRun bool

	// Logger for logging.
	Logger *logger.Logger
}

AuditRetentionControllerConfig configures the AuditRetentionController.

type ControlChangePublisher added in v0.2.0

type ControlChangePublisher struct {
	// contains filtered or unexported fields
}

ControlChangePublisher enqueues reclassify requests scoped to the assets a changed control protects.

func NewControlChangePublisher added in v0.2.0

func NewControlChangePublisher(q ReclassifyQueue, log *logger.Logger) *ControlChangePublisher

NewControlChangePublisher wires the queue.

func (*ControlChangePublisher) PublishChange added in v0.2.0

func (p *ControlChangePublisher) PublishChange(
	ctx context.Context,
	tenantID shared.ID,
	assetIDs []shared.ID,
	reason string,
)

PublishChange enqueues a reclassify request. Errors are logged but NOT returned — a failed enqueue must not roll back the control write that triggered it.

type ControlTestCadenceConfig added in v0.2.0

type ControlTestCadenceConfig struct {
	Interval  time.Duration // default 1h
	Grace     time.Duration // default 7 days
	Publisher *ControlChangePublisher
	Logger    *logger.Logger
}

ControlTestCadenceConfig tunes the controller.

type ControlTestCadenceController added in v0.2.0

type ControlTestCadenceController struct {
	// contains filtered or unexported fields
}

ControlTestCadenceController implements Name/Interval/Reconcile.

func NewControlTestCadenceController added in v0.2.0

func NewControlTestCadenceController(store ControlTestSink, cfg *ControlTestCadenceConfig) *ControlTestCadenceController

NewControlTestCadenceController wires deps with safe defaults.

func (*ControlTestCadenceController) Interval added in v0.2.0

func (c *ControlTestCadenceController) Interval() time.Duration

Interval returns the tick interval.

func (*ControlTestCadenceController) Name added in v0.2.0

func (c *ControlTestCadenceController) Name() string

Name returns the controller name.

func (*ControlTestCadenceController) Reconcile added in v0.2.0

func (c *ControlTestCadenceController) Reconcile(ctx context.Context) (int, error)

Reconcile marks overdue controls, expires those that have been overdue past the grace period, and triggers reclassification for every expired control.

type ControlTestSchedulerConfig added in v0.1.7

type ControlTestSchedulerConfig struct {
	// Interval is how often the scheduler runs (default: 24 hours).
	Interval time.Duration

	// StaleDays is the number of days without a test before a control test
	// is considered overdue and reset to "untested" (default: 30).
	StaleDays int

	// BatchSize is the maximum number of overdue tests to process per cycle (default: 500).
	BatchSize int

	// Logger is passed by the controller manager.
	Logger *logger.Logger
}

ControlTestSchedulerConfig configures the controller.

type ControlTestSchedulerController added in v0.1.7

type ControlTestSchedulerController struct {
	// contains filtered or unexported fields
}

ControlTestSchedulerController automatically marks control tests as overdue when they have not been run within the configured stale window.

Design:

  • Runs every 24 hours (daily sweep).
  • Any control test not tested for >StaleDays days is reset to "untested" so it surfaces in the Detection Coverage dashboard.
  • Never blocks other operations — failures are logged and skipped.

func NewControlTestSchedulerController added in v0.1.7

func NewControlTestSchedulerController(
	repo simulation.ControlTestRepository,
	cfg *ControlTestSchedulerConfig,
) *ControlTestSchedulerController

NewControlTestSchedulerController creates a new controller.

func (*ControlTestSchedulerController) Interval added in v0.1.7

func (c *ControlTestSchedulerController) Interval() time.Duration

Interval implements Controller.

func (*ControlTestSchedulerController) Name added in v0.1.7

func (c *ControlTestSchedulerController) Name() string

Name implements Controller.

func (*ControlTestSchedulerController) Reconcile added in v0.1.7

func (c *ControlTestSchedulerController) Reconcile(ctx context.Context) (int, error)

Reconcile finds all overdue control tests and resets their status to "untested". Returns the count of tests marked overdue.

type ControlTestSink added in v0.2.0

type ControlTestSink interface {
	// MarkOverdue flags controls whose last_tested_at + cadence <
	// now AND status = 'active'. Returns the count marked.
	MarkOverdue(ctx context.Context, now time.Time) (int64, error)
	// ExpireWithGrace flips controls that have been overdue past
	// the grace period to status='expired' and returns the tenant
	// + asset pairs that need reclassification.
	ExpireWithGrace(ctx context.Context, now time.Time, grace time.Duration) ([]ExpiredControl, error)
}

ControlTestSink is the narrow store surface the controller needs.

type Controller

type Controller interface {
	// Name returns the unique name of this controller.
	Name() string

	// Interval returns how often this controller should run.
	Interval() time.Duration

	// Reconcile performs the reconciliation logic.
	// It should be idempotent - running multiple times should have the same effect.
	// Returns the number of items processed and any error encountered.
	Reconcile(ctx context.Context) (int, error)
}

Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.

type DataExpirationController added in v0.1.2

type DataExpirationController struct {
	// contains filtered or unexported fields
}

DataExpirationController handles periodic expiration of stale data:

  • Suppression rules past their expires_at
  • Scope exclusions past their expires_at
  • Audit logs older than the retention period

Without this controller, expired suppression rules and scope exclusions remain in 'active'/'approved' status indefinitely, and audit logs accumulate without bounds.

func NewDataExpirationController added in v0.1.2

func NewDataExpirationController(
	suppressionRepo suppression.Repository,
	exclusionRepo scope.ExclusionRepository,
	auditRepo audit.Repository,
	config *DataExpirationControllerConfig,
) *DataExpirationController

NewDataExpirationController creates a new DataExpirationController.

func (*DataExpirationController) Interval added in v0.1.2

func (c *DataExpirationController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*DataExpirationController) Name added in v0.1.2

func (c *DataExpirationController) Name() string

Name returns the controller name.

func (*DataExpirationController) Reconcile added in v0.1.2

func (c *DataExpirationController) Reconcile(ctx context.Context) (int, error)

Reconcile expires stale suppression rules, scope exclusions, and old audit logs.

type DataExpirationControllerConfig added in v0.1.2

type DataExpirationControllerConfig struct {
	// Interval is how often to run the expiration check.
	// Default: 1 hour.
	Interval time.Duration

	// AuditRetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	AuditRetentionDays int

	// Logger for logging.
	Logger *logger.Logger
}

DataExpirationControllerConfig configures the DataExpirationController.

type ExpiredControl added in v0.2.0

type ExpiredControl struct {
	TenantID  shared.ID
	ControlID shared.ID
	AssetIDs  []shared.ID
}

ExpiredControl describes one control that just expired. Used to drive the downstream reclassify sweep.

type GroupSyncController added in v0.1.2

type GroupSyncController struct {
	// contains filtered or unexported fields
}

GroupSyncController periodically synchronizes groups from external providers. It implements the Controller interface and is managed by the controller Manager.

This controller:

  1. Iterates over all active tenants
  2. For each tenant, triggers SyncAll to synchronize all configured providers
  3. Logs sync results for monitoring

NOT WIRED: this controller is not registered in cmd/server/workers.go because app.GroupSyncService is also not constructed in services.go — the periodic sync side of the group-provider feature is not shipped. On-demand sync works through the HTTP handler (group_handler.go). A future PR that turns on periodic SCIM/LDAP sync needs to:

  1. Construct app.NewGroupSyncService(repos.Group, log) in services.go
  2. Register this controller in workers.go

func NewGroupSyncController added in v0.1.2

func NewGroupSyncController(
	syncService *app.GroupSyncService,
	tenantRepo tenant.Repository,
	config *GroupSyncControllerConfig,
) *GroupSyncController

NewGroupSyncController creates a new GroupSyncController.

func (*GroupSyncController) Interval added in v0.1.2

func (c *GroupSyncController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*GroupSyncController) Name added in v0.1.2

func (c *GroupSyncController) Name() string

Name returns the controller name.

func (*GroupSyncController) Reconcile added in v0.1.2

func (c *GroupSyncController) Reconcile(ctx context.Context) (int, error)

Reconcile performs the periodic group sync across all active tenants. Returns the number of tenants synced and any error encountered.

func (*GroupSyncController) TriggerSync added in v0.1.2

func (c *GroupSyncController) TriggerSync(ctx context.Context, tenantID shared.ID) error

TriggerSync manually triggers a sync for a specific tenant. This is used by the HTTP handler for on-demand sync requests.

type GroupSyncControllerConfig added in v0.1.2

type GroupSyncControllerConfig struct {
	// Interval is how often to run the sync.
	// Default: 1 hour.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

GroupSyncControllerConfig configures the GroupSyncController.

type JobRecoveryController

type JobRecoveryController struct {
	// contains filtered or unexported fields
}

JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.

The controller performs three main tasks:

  1. Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
  2. Expire old jobs: Mark jobs as expired if they've been in queue too long
  3. Clean up: Mark orphaned jobs as failed if they exceed retry limit

func NewJobRecoveryController

func NewJobRecoveryController(
	commandRepo command.Repository,
	config *JobRecoveryControllerConfig,
) *JobRecoveryController

NewJobRecoveryController creates a new JobRecoveryController.

func (*JobRecoveryController) Interval

func (c *JobRecoveryController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*JobRecoveryController) Name

func (c *JobRecoveryController) Name() string

Name returns the controller name.

func (*JobRecoveryController) Reconcile

func (c *JobRecoveryController) Reconcile(ctx context.Context) (int, error)

Reconcile recovers stuck jobs and expires old ones.

type JobRecoveryControllerConfig

type JobRecoveryControllerConfig struct {
	// Interval is how often to run the job recovery check.
	// Default: 60 seconds.
	Interval time.Duration

	// StuckThresholdMinutes is how long a job can be in acknowledged/running state
	// without progress before being considered stuck.
	// Default: 30 minutes.
	StuckThresholdMinutes int

	// TenantStuckThresholdMinutes is how long a tenant command can be assigned
	// to an agent without being picked up before being reassigned.
	// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
	TenantStuckThresholdMinutes int

	// MaxRetries is the maximum number of retry attempts for a job.
	// After this many retries, the job will be marked as failed.
	// Default: 3.
	MaxRetries int

	// MaxQueueMinutes is how long a job can wait in the queue before expiring.
	// Default: 60 minutes.
	MaxQueueMinutes int

	// Logger for logging.
	Logger *logger.Logger
}

JobRecoveryControllerConfig configures the JobRecoveryController.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages multiple controllers, running them in parallel goroutines.

func NewManager

func NewManager(cfg *ManagerConfig) *Manager

NewManager creates a new controller manager.

func (*Manager) ControllerCount

func (m *Manager) ControllerCount() int

ControllerCount returns the number of registered controllers.

func (*Manager) ControllerNames

func (m *Manager) ControllerNames() []string

ControllerNames returns the names of all registered controllers.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning checks if the manager is running.

func (*Manager) Register

func (m *Manager) Register(c Controller)

Register adds a controller to the manager.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts all registered controllers.

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops all controllers gracefully.

type ManagerConfig

type ManagerConfig struct {
	// Metrics collector (optional)
	Metrics Metrics

	// Logger (required)
	Logger *logger.Logger
}

ManagerConfig configures the controller manager.

type Metrics

type Metrics interface {
	// RecordReconcile records a reconciliation run.
	RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

	// SetControllerRunning sets whether a controller is running.
	SetControllerRunning(controller string, running bool)

	// IncrementReconcileErrors increments the error counter.
	IncrementReconcileErrors(controller string)

	// SetLastReconcileTime sets the last reconcile timestamp.
	SetLastReconcileTime(controller string, t time.Time)
}

Metrics defines the interface for controller metrics collection.

type NoopMetrics

type NoopMetrics struct{}

NoopMetrics is a no-op implementation of Metrics for testing.

func (*NoopMetrics) IncrementReconcileErrors

func (m *NoopMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors does nothing.

func (*NoopMetrics) RecordReconcile

func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile does nothing.

func (*NoopMetrics) SetControllerRunning

func (m *NoopMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning does nothing.

func (*NoopMetrics) SetLastReconcileTime

func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime does nothing.

type OrderConfig added in v0.2.0

type OrderConfig struct {
	// MaxAgeBonus is the age after which an item jumps one
	// priority band. Zero disables age bonus.
	MaxAgeBonus time.Duration
	// Now is injectable for tests.
	Now func() time.Time
}

OrderConfig tunes the scheduler.

type OwnerResolutionController added in v0.2.0

type OwnerResolutionController struct {
	// contains filtered or unexported fields
}

OwnerResolutionController periodically resolves asset ownership by matching owner_ref (email/username text) to actual user accounts.

When a scanner ingests an asset, it may set owner_ref (e.g., "alice@example.com") but cannot know the internal user UUID. This controller finds those assets and populates owner_id from the users table.

Runs every 30 minutes.

func NewOwnerResolutionController added in v0.2.0

func NewOwnerResolutionController(db *sql.DB, log *logger.Logger) *OwnerResolutionController

NewOwnerResolutionController creates a new controller.

func (*OwnerResolutionController) Interval added in v0.2.0

func (c *OwnerResolutionController) Interval() time.Duration

Interval returns 30 minutes.

func (*OwnerResolutionController) Name added in v0.2.0

func (c *OwnerResolutionController) Name() string

Name returns the controller name.

func (*OwnerResolutionController) Reconcile added in v0.2.0

func (c *OwnerResolutionController) Reconcile(ctx context.Context) (int, error)

Reconcile resolves owner_ref to owner_id for assets missing owner_id. Uses a single UPDATE with JOIN for efficiency.
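The actual statement is not shown in the docs; a plausible shape for a single UPDATE-with-JOIN in Postgres, with table and column names assumed from the doc comment (assets.owner_ref, assets.owner_id, a users table), might look like:

```go
// resolveOwnersSQL is a hypothetical sketch, not the controller's real
// query. It matches owner_ref text against user email or username
// within the same tenant, in one statement.
const resolveOwnersSQL = `
UPDATE assets a
SET owner_id = u.id
FROM users u
WHERE a.tenant_id = u.tenant_id
  AND a.owner_id IS NULL
  AND a.owner_ref <> ''
  AND lower(a.owner_ref) IN (lower(u.email), lower(u.username))`
```

A single set-based UPDATE like this avoids the N+1 round-trips of resolving each asset individually, which is presumably what "for efficiency" refers to.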

type PriorityAuditRetentionConfig added in v0.2.0

type PriorityAuditRetentionConfig struct {
	// Interval between runs (default 24h).
	Interval time.Duration
	// RetentionDays — rows older than this are deleted (default 180).
	RetentionDays int
	// DryRun skips deletion and only reports count.
	DryRun bool
	// Logger for logging.
	Logger *logger.Logger
}

PriorityAuditRetentionConfig configures the controller.

type PriorityAuditRetentionController added in v0.2.0

type PriorityAuditRetentionController struct {
	// contains filtered or unexported fields
}

PriorityAuditRetentionController implements the background controller contract (Name / Interval / Reconcile).

func NewPriorityAuditRetentionController added in v0.2.0

func NewPriorityAuditRetentionController(
	repo PriorityAuditRetentionStore,
	config *PriorityAuditRetentionConfig,
) *PriorityAuditRetentionController

NewPriorityAuditRetentionController constructs the controller with sensible defaults for any zero-valued config fields.

func (*PriorityAuditRetentionController) Interval added in v0.2.0

func (c *PriorityAuditRetentionController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*PriorityAuditRetentionController) Name added in v0.2.0

func (c *PriorityAuditRetentionController) Name() string

Name returns the controller name.

func (*PriorityAuditRetentionController) Reconcile added in v0.2.0

func (c *PriorityAuditRetentionController) Reconcile(ctx context.Context) (int, error)

Reconcile deletes rows older than the configured retention window.

type PriorityAuditRetentionStore added in v0.2.0

type PriorityAuditRetentionStore interface {
	CountOlderThan(ctx context.Context, before time.Time) (int64, error)
	DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
}

PriorityAuditRetentionStore is the subset of PriorityAuditRepository that this controller needs. Defined locally so we don't depend on the app layer.

type PriorityReclassifyConfig added in v0.2.0

type PriorityReclassifyConfig struct {
	// Interval between sweep ticks. Default 5m.
	Interval time.Duration
	// BatchSize is the max number of requests drained per tick.
	// Default 64.
	BatchSize int
	// Logger (optional; defaults to no-op).
	Logger *logger.Logger
}

PriorityReclassifyConfig configures the sweep controller.

type PriorityReclassifyController added in v0.2.0

type PriorityReclassifyController struct {
	// contains filtered or unexported fields
}

PriorityReclassifyController drains the queue and dispatches each request to the Reclassifier.

func NewPriorityReclassifyController added in v0.2.0

func NewPriorityReclassifyController(
	queue ReclassifyQueue,
	reclassifier Reclassifier,
	cfg *PriorityReclassifyConfig,
) *PriorityReclassifyController

NewPriorityReclassifyController wires the controller.

func (*PriorityReclassifyController) Interval added in v0.2.0

func (c *PriorityReclassifyController) Interval() time.Duration

Interval returns the sweep interval.

func (*PriorityReclassifyController) Name added in v0.2.0

func (c *PriorityReclassifyController) Name() string

Name returns the controller name.

func (*PriorityReclassifyController) Reconcile added in v0.2.0

func (c *PriorityReclassifyController) Reconcile(ctx context.Context) (int, error)

Reconcile drains up to BatchSize requests from the queue and applies each. Returns the total number of findings re-examined across all drained requests (for metrics).

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements the Metrics interface using Prometheus.

func NewPrometheusMetrics

func NewPrometheusMetrics(namespace string) *PrometheusMetrics

NewPrometheusMetrics creates a new PrometheusMetrics.

func (*PrometheusMetrics) IncrementReconcileErrors

func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors increments the error counter.

func (*PrometheusMetrics) RecordReconcile

func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile records a reconciliation run.

func (*PrometheusMetrics) SetControllerRunning

func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning sets whether a controller is running.

func (*PrometheusMetrics) SetLastReconcileTime

func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime sets the last reconcile timestamp.

type QueueItem added in v0.2.0

type QueueItem struct {
	ID            string
	TenantID      string
	PriorityClass string // "P0" | "P1" | "P2" | "P3" | ""
	EnqueuedAt    time.Time
}

QueueItem is the minimal shape the scheduler orders. A production queue wraps this with a payload pointer; for ordering we only need the priority, tenant, and enqueue time.

func OrderBatch added in v0.2.0

func OrderBatch(items []QueueItem, cfg OrderConfig) []QueueItem

OrderBatch sorts items for dispatch. The contract:

  • Lower effectiveWeight dispatches first.
  • Within the same weight, tenants are rotated (round-robin) to prevent one tenant from monopolising the band.
  • Within the same (weight, tenant), older items dispatch first.

OrderBatch returns a NEW slice; the input is not mutated.

type QueuePriorityController

type QueuePriorityController struct {
	// contains filtered or unexported fields
}

QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:

  • Tenant's plan tier (higher tiers get base priority boost)
  • Job age (older jobs get priority bonus to prevent starvation)
  • Tenant's current queue depth (tenants with fewer jobs get slight boost)

The priority calculation is done in the database for efficiency:

	new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)

This is a soft-priority system - higher priority jobs are processed first, but no tenant can completely starve others.
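The formula can be worked through with illustrative numbers (the base priorities and per-minute bonus below are made up, not the real plan tiers):

```go
package main

import "fmt"

// newPriority sketches the documented formula:
// new_priority = plan_base_priority + wait_time_minutes * age_bonus_per_minute.
func newPriority(planBase, waitMinutes, ageBonusPerMinute int) int {
	return planBase + waitMinutes*ageBonusPerMinute
}

func main() {
	// A free-tier job (base 100) that has waited 45 minutes with a bonus
	// of 2/minute overtakes an enterprise job (base 150) just enqueued,
	// which is the anti-starvation property of the soft-priority system.
	fmt.Println(newPriority(100, 45, 2)) // 190
	fmt.Println(newPriority(150, 0, 2))  // 150
}
```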

func NewQueuePriorityController

func NewQueuePriorityController(
	commandRepo command.Repository,
	config *QueuePriorityControllerConfig,
) *QueuePriorityController

NewQueuePriorityController creates a new QueuePriorityController.

func (*QueuePriorityController) Interval

func (c *QueuePriorityController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*QueuePriorityController) Name

func (c *QueuePriorityController) Name() string

Name returns the controller name.

func (*QueuePriorityController) Reconcile

func (c *QueuePriorityController) Reconcile(ctx context.Context) (int, error)

Reconcile recalculates queue priorities for all pending platform jobs.

type QueuePriorityControllerConfig

type QueuePriorityControllerConfig struct {
	// Interval is how often to recalculate queue priorities.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

QueuePriorityControllerConfig configures the QueuePriorityController.

type Reclassifier added in v0.2.0

type Reclassifier interface {
	// ReclassifyForRequest processes a single request. Returns the
	// number of findings re-examined (not necessarily changed — a
	// sweep that re-confirms the same class is still "work done").
	ReclassifyForRequest(ctx context.Context, req ReclassifyRequest) (int, error)
}

Reclassifier applies one request: it loads the matching open findings and invokes the PriorityClassificationService on them. Kept as a narrow interface so the controller has no knowledge of the app-layer service or its repo dependencies.

type ReclassifyQueue added in v0.2.0

type ReclassifyQueue interface {
	// Enqueue adds a reclassify request. MUST be safe for concurrent use.
	Enqueue(ctx context.Context, req ReclassifyRequest) error
	// DequeueBatch pops up to `max` requests. An empty slice (no error)
	// means "nothing to do this tick". Implementations should return
	// quickly (non-blocking).
	DequeueBatch(ctx context.Context, max int) ([]ReclassifyRequest, error)
}

ReclassifyQueue is the minimal contract for the in/out queue. Implementations may be Redis lists, Postgres advisory-locked rows, or an in-memory channel for tests.

type ReclassifyReasonKind added in v0.2.0

type ReclassifyReasonKind string

ReclassifyReasonKind enumerates why a sweep was enqueued.

const (
	ReasonEPSSRefresh   ReclassifyReasonKind = "epss_refresh"
	ReasonKEVRefresh    ReclassifyReasonKind = "kev_refresh"
	ReasonRuleChanged   ReclassifyReasonKind = "rule_changed"
	ReasonControlChange ReclassifyReasonKind = "control_change"
	ReasonAssetChange   ReclassifyReasonKind = "asset_change"
	ReasonManual        ReclassifyReasonKind = "manual"
)

type ReclassifyRequest added in v0.2.0

type ReclassifyRequest struct {
	TenantID  shared.ID
	Reason    ReclassifyReasonKind
	CVEIDs    []string
	AssetIDs  []shared.ID
	RuleID    *shared.ID
	EnqueueAt time.Time
}

ReclassifyRequest describes one unit of sweep work.

Scope dimensions (all optional; nil/empty = broadest match within tenant):

  • CVEIDs — only findings matching these CVEs (typical EPSS/KEV path).
  • AssetIDs — only findings on these assets (typical control/asset path).
  • RuleID — evaluated by the running service against each finding.

Batching is the caller's responsibility — a giant EPSS refresh should be split into per-tenant requests; the controller treats each request atomically.

type RetryDispatcher added in v0.1.5

type RetryDispatcher interface {
	RetryScanRun(ctx context.Context, tenantID, scanID shared.ID, retryAttempt int) error
}

RetryDispatcher is the dependency the ScanRetryController uses to actually re-trigger a failed scan run. The scan service implements this.

type RiskSnapshotController added in v0.2.0

type RiskSnapshotController struct {
	// contains filtered or unexported fields
}

RiskSnapshotController computes daily risk snapshots for all tenants. Runs every 6 hours, but only inserts one row per tenant per day (UPSERT).

RFC-005 Gap 4: Risk Trend / Outcome Metrics.

func NewRiskSnapshotController added in v0.2.0

func NewRiskSnapshotController(db *sql.DB, log *logger.Logger) *RiskSnapshotController

NewRiskSnapshotController creates a new controller.

func (*RiskSnapshotController) Interval added in v0.2.0

func (c *RiskSnapshotController) Interval() time.Duration

Interval returns 6 hours (computes at most once per day per tenant via UPSERT).

func (*RiskSnapshotController) Name added in v0.2.0

func (c *RiskSnapshotController) Name() string

Name returns the controller name.

func (*RiskSnapshotController) Reconcile added in v0.2.0

func (c *RiskSnapshotController) Reconcile(ctx context.Context) (int, error)

Reconcile computes and persists risk snapshots for all tenants. Uses pre-aggregated CTEs with GROUP BY tenant_id so the work is done in a constant number of queries regardless of tenant count.

type RoleSyncController added in v0.1.2

type RoleSyncController struct {
	// contains filtered or unexported fields
}

RoleSyncController ensures tenant_members and user_roles stay in sync.

The RBAC permission system reads from user_roles, while tenant_members stores the canonical membership. A trigger (sync_tenant_member_to_user_roles) handles new INSERT/UPDATE, but entries can become desynced after:

  • Database restore from backup
  • Migration rollback and re-apply
  • Manual data manipulation
  • Trigger being temporarily disabled

This controller detects missing user_roles entries and backfills them, preventing "Access Denied" errors for affected users.
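
The detection step amounts to a set difference, sketched here in memory (the real controller does this in SQL; names are illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// missingRoleEntries returns user IDs present in tenant_members but
// absent from user_roles — the rows the controller would backfill.
// The map shapes are a simplification for illustration.
func missingRoleEntries(members map[string]string, roles map[string]bool) []string {
	var missing []string
	for userID := range members {
		if !roles[userID] {
			missing = append(missing, userID)
		}
	}
	sort.Strings(missing) // deterministic order for logging
	return missing
}

func main() {
	members := map[string]string{"u1": "admin", "u2": "viewer", "u3": "viewer"}
	roles := map[string]bool{"u1": true, "u3": true}
	fmt.Println(missingRoleEntries(members, roles)) // u2 needs a backfill
}
```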

func NewRoleSyncController added in v0.1.2

func NewRoleSyncController(
	db *sql.DB,
	config *RoleSyncControllerConfig,
) *RoleSyncController

NewRoleSyncController creates a new RoleSyncController.

func (*RoleSyncController) Interval added in v0.1.2

func (c *RoleSyncController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*RoleSyncController) Name added in v0.1.2

func (c *RoleSyncController) Name() string

Name returns the controller name.

func (*RoleSyncController) Reconcile added in v0.1.2

func (c *RoleSyncController) Reconcile(ctx context.Context) (int, error)

Reconcile detects and fixes missing user_roles entries.

type RoleSyncControllerConfig added in v0.1.2

type RoleSyncControllerConfig struct {
	// Interval is how often to run the sync check.
	// Default: 1 hour.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

RoleSyncControllerConfig configures the RoleSyncController.

type SLABreachEvent added in v0.2.0

type SLABreachEvent struct {
	TenantID        shared.ID
	FindingID       shared.ID
	SLADeadline     time.Time
	OverdueDuration time.Duration
	At              time.Time
}

SLABreachEvent is emitted once per finding on the transition into the `breached` SLA state. Downstream consumers (notification outbox, Jira commenter, PagerDuty router) subscribe via the publisher.

B4: closes the feedback edge where SLA status was previously computed but never acted on. Dedup via the SLA breach transition itself — a second controller run won't re-emit because rows already in `breached` state don't match the UPDATE WHERE clause.

type SLABreachPublisher added in v0.2.0

type SLABreachPublisher interface {
	Publish(ctx context.Context, event SLABreachEvent) error
}

SLABreachPublisher delivers breach events to downstream consumers. Optional — nil publisher means "log only" (legacy behaviour).

type SLAEscalationController added in v0.2.0

type SLAEscalationController struct {
	// contains filtered or unexported fields
}

SLAEscalationController periodically checks for overdue findings and updates their sla_status to 'breached'. Runs every 15 minutes.

Note: This operates across all tenants intentionally — it's a system-level background job that marks overdue findings within their own rows. Each finding's tenant_id remains unchanged.

RFC-005 Gap 7 + B4: Automated SLA Escalation with publisher.

func NewSLAEscalationController added in v0.2.0

func NewSLAEscalationController(db *sql.DB, log *logger.Logger) *SLAEscalationController

NewSLAEscalationController creates a new SLA escalation controller.

func (*SLAEscalationController) Interval added in v0.2.0

func (c *SLAEscalationController) Interval() time.Duration

Interval returns 15 minutes.

func (*SLAEscalationController) Name added in v0.2.0

func (c *SLAEscalationController) Name() string

Name returns the controller name.

func (*SLAEscalationController) Reconcile added in v0.2.0

func (c *SLAEscalationController) Reconcile(ctx context.Context) (int, error)

Reconcile checks for overdue findings and marks them as breached.

B4: For every row newly-transitioned into `breached`, a SLABreachEvent is emitted via the publisher. Dedup is structural — the WHERE clause excludes rows already in `breached`, so a second run won't re-emit.

func (*SLAEscalationController) SetBreachPublisher added in v0.2.0

func (c *SLAEscalationController) SetBreachPublisher(p SLABreachPublisher)

SetBreachPublisher wires the breach-event publisher. Safe after construction; nil disables publishing.

type ScanRetryController added in v0.1.5

type ScanRetryController struct {
	// contains filtered or unexported fields
}

ScanRetryController periodically checks for failed pipeline runs that are eligible for automatic retry (based on the parent scan's max_retries and retry_backoff_seconds, with exponential backoff) and dispatches retries through the RetryDispatcher.

func NewScanRetryController added in v0.1.5

func NewScanRetryController(
	runRepo pipeline.RunRepository,
	dispatcher RetryDispatcher,
	config *ScanRetryControllerConfig,
) *ScanRetryController

NewScanRetryController creates a new ScanRetryController.

func (*ScanRetryController) Interval added in v0.1.5

func (c *ScanRetryController) Interval() time.Duration

func (*ScanRetryController) Name added in v0.1.5

func (c *ScanRetryController) Name() string

func (*ScanRetryController) Reconcile added in v0.1.5

func (c *ScanRetryController) Reconcile(ctx context.Context) (int, error)

type ScanRetryControllerConfig added in v0.1.5

type ScanRetryControllerConfig struct {
	// Interval is how often to check for retry candidates.
	// Default: 60 seconds.
	Interval time.Duration

	// BatchSize is the maximum candidates processed per cycle.
	// Default: 100.
	BatchSize int

	Logger *logger.Logger
}

ScanRetryControllerConfig configures the ScanRetryController.

type ScanTimeoutController added in v0.1.5

type ScanTimeoutController struct {
	// contains filtered or unexported fields
}

ScanTimeoutController periodically marks pipeline_runs as timed out when they exceed their scan's configured timeout_seconds.

This complements JobRecoveryController (which marks stuck commands) by enforcing per-scan timeouts. A scan can specify its own timeout_seconds (default 1h, max 24h), and runs that exceed it are forcibly marked as timed out with an appropriate error message.

func NewScanTimeoutController added in v0.1.5

func NewScanTimeoutController(
	runRepo pipeline.RunRepository,
	config *ScanTimeoutControllerConfig,
) *ScanTimeoutController

NewScanTimeoutController creates a new ScanTimeoutController.

func (*ScanTimeoutController) Interval added in v0.1.5

func (c *ScanTimeoutController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ScanTimeoutController) Name added in v0.1.5

func (c *ScanTimeoutController) Name() string

Name returns the controller name.

func (*ScanTimeoutController) Reconcile added in v0.1.5

func (c *ScanTimeoutController) Reconcile(ctx context.Context) (int, error)

Reconcile marks expired runs as timed out.

type ScanTimeoutControllerConfig added in v0.1.5

type ScanTimeoutControllerConfig struct {
	// Interval is how often to check for timed-out runs.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

ScanTimeoutControllerConfig configures the ScanTimeoutController.

type ScopeReconciliationController added in v0.1.2

type ScopeReconciliationController struct {
	// contains filtered or unexported fields
}

ScopeReconciliationController periodically reconciles scope rule assignments as a safety net for the real-time event-driven hooks.

This controller:

 1. Lists all tenants with active scope rules
 2. For each tenant, lists all groups with active scope rules
 3. Reconciles each group by re-evaluating matching assets

Design: This is a background safety net (K8s-style eventual consistency). The primary path is real-time hooks in AssetService and AssetGroupService.

func NewScopeReconciliationController added in v0.1.2

func NewScopeReconciliationController(
	acRepo accesscontrol.Repository,
	reconciler scopeGroupReconciler,
	config *ScopeReconciliationControllerConfig,
) *ScopeReconciliationController

NewScopeReconciliationController creates a new ScopeReconciliationController.

func (*ScopeReconciliationController) Interval added in v0.1.2

Interval returns the reconciliation interval.

func (*ScopeReconciliationController) Name added in v0.1.2

Name returns the controller name.

func (*ScopeReconciliationController) Reconcile added in v0.1.2

func (c *ScopeReconciliationController) Reconcile(ctx context.Context) (int, error)

Reconcile performs periodic scope rule reconciliation across all tenants. Returns the number of groups reconciled and any error encountered.

type ScopeReconciliationControllerConfig added in v0.1.2

type ScopeReconciliationControllerConfig struct {
	// Interval is how often to run the reconciliation.
	// Default: 30 minutes.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

ScopeReconciliationControllerConfig configures the ScopeReconciliationController.

type ThreatIntelRefreshController added in v0.1.7

type ThreatIntelRefreshController struct {
	// contains filtered or unexported fields
}

ThreatIntelRefreshController periodically refreshes EPSS scores and the KEV catalog. Runs every 24 hours. Fetches the latest data from FIRST.org (EPSS) and CISA (KEV), then persists it to the database via ThreatIntelService.SyncAll(). After sync, auto-escalates findings whose CVEs appear in the KEV catalog.

func NewThreatIntelRefreshController added in v0.1.7

func NewThreatIntelRefreshController(service *threat.IntelService, escalator threat.KEVEscalator, log *logger.Logger) *ThreatIntelRefreshController

NewThreatIntelRefreshController creates a new controller.

func (*ThreatIntelRefreshController) Interval added in v0.1.7

Interval returns 24 hours — daily refresh.

func (*ThreatIntelRefreshController) Name added in v0.1.7

Name returns the controller name.

func (*ThreatIntelRefreshController) Reconcile added in v0.1.7

func (c *ThreatIntelRefreshController) Reconcile(ctx context.Context) (int, error)

Reconcile fetches and persists latest EPSS + KEV data, then auto-escalates findings.
