Documentation ¶
Overview ¶
Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.
Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:
- AgentHealthController: Marks stale agents as offline, cleans up expired leases
- JobRecoveryController: Recovers stuck jobs and re-queues them
- QueuePriorityController: Recalculates queue priorities for fair scheduling
- TokenCleanupController: Cleans up expired bootstrap tokens
- AuditRetentionController: Manages audit log retention
Design principles:
- Each controller is independent and can fail without affecting others
- Controllers are idempotent - running multiple times has the same effect
- Controllers use optimistic locking to handle concurrent modifications
- All state changes are logged for debugging and monitoring
Index ¶
- type AgentHealthController
- type AgentHealthControllerConfig
- type ApprovalExpirationController
- type ApprovalExpirationControllerConfig
- type AssetLifecycleController
- type AssetLifecycleControllerConfig
- type AuditChainVerifyController
- type AuditChainVerifyControllerConfig
- type AuditRetentionController
- type AuditRetentionControllerConfig
- type ControlChangePublisher
- type ControlTestCadenceConfig
- type ControlTestCadenceController
- type ControlTestSchedulerConfig
- type ControlTestSchedulerController
- type ControlTestSink
- type Controller
- type DataExpirationController
- type DataExpirationControllerConfig
- type ExpiredControl
- type GroupSyncController
- type GroupSyncControllerConfig
- type JobRecoveryController
- type JobRecoveryControllerConfig
- type Manager
- type ManagerConfig
- type Metrics
- type NoopMetrics
- func (m *NoopMetrics) IncrementReconcileErrors(controller string)
- func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
- func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
- type OrderConfig
- type OwnerResolutionController
- type PriorityAuditRetentionConfig
- type PriorityAuditRetentionController
- type PriorityAuditRetentionStore
- type PriorityReclassifyConfig
- type PriorityReclassifyController
- type PrometheusMetrics
- func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
- func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
- func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
- type QueueItem
- type QueuePriorityController
- type QueuePriorityControllerConfig
- type Reclassifier
- type ReclassifyQueue
- type ReclassifyReasonKind
- type ReclassifyRequest
- type RetryDispatcher
- type RiskSnapshotController
- type RoleSyncController
- type RoleSyncControllerConfig
- type SLABreachEvent
- type SLABreachPublisher
- type SLAEscalationController
- type ScanRetryController
- type ScanRetryControllerConfig
- type ScanTimeoutController
- type ScanTimeoutControllerConfig
- type ScopeReconciliationController
- type ScopeReconciliationControllerConfig
- type ThreatIntelRefreshController
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthController ¶
type AgentHealthController struct {
// contains filtered or unexported fields
}
AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.
func NewAgentHealthController ¶
func NewAgentHealthController(
	agentRepo agent.Repository,
	config *AgentHealthControllerConfig,
) *AgentHealthController
NewAgentHealthController creates a new AgentHealthController.
func (*AgentHealthController) Interval ¶
func (c *AgentHealthController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AgentHealthController) Name ¶
func (c *AgentHealthController) Name() string
Name returns the controller name.
type AgentHealthControllerConfig ¶
type AgentHealthControllerConfig struct {
// Interval is how often to run the health check.
// Default: 30 seconds.
Interval time.Duration
// StaleTimeout is how long since last heartbeat before marking an agent as offline.
// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
StaleTimeout time.Duration
// Logger for logging.
Logger *logger.Logger
}
AgentHealthControllerConfig configures the AgentHealthController.
type ApprovalExpirationController ¶ added in v0.1.2
type ApprovalExpirationController struct {
// contains filtered or unexported fields
}
ApprovalExpirationController auto-reopens findings whose risk acceptances have expired. It scans for approved approvals with an expires_at in the past, marks them as expired, and transitions the associated findings back to "confirmed" status.
func NewApprovalExpirationController ¶ added in v0.1.2
func NewApprovalExpirationController(
	approvalRepo vulnerability.ApprovalRepository,
	findingRepo vulnerability.FindingRepository,
	config *ApprovalExpirationControllerConfig,
) *ApprovalExpirationController
NewApprovalExpirationController creates a new ApprovalExpirationController.
func (*ApprovalExpirationController) Interval ¶ added in v0.1.2
func (c *ApprovalExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ApprovalExpirationController) Name ¶ added in v0.1.2
func (c *ApprovalExpirationController) Name() string
Name returns the controller name.
type ApprovalExpirationControllerConfig ¶ added in v0.1.2
type ApprovalExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// BatchSize is the maximum number of expired approvals to process per reconciliation.
// Default: 100.
BatchSize int
// Logger for logging.
Logger *logger.Logger
}
ApprovalExpirationControllerConfig configures the ApprovalExpirationController.
type AssetLifecycleController ¶ added in v0.2.1
type AssetLifecycleController struct {
// contains filtered or unexported fields
}
AssetLifecycleController fans the lifecycle worker out across tenants. Each tenant is handled serially to keep the implementation predictable — the worker's SQL is cheap (indexed scan) and a tenant with millions of assets is still sub-second. If we ever need parallelism we can add a tunable here without changing the controller's external interface.
func NewAssetLifecycleController ¶ added in v0.2.1
func NewAssetLifecycleController(
	worker *asset.AssetLifecycleWorker,
	tenantRepo tenant.Repository,
	config *AssetLifecycleControllerConfig,
) *AssetLifecycleController
NewAssetLifecycleController constructs the controller.
func (*AssetLifecycleController) Interval ¶ added in v0.2.1
func (c *AssetLifecycleController) Interval() time.Duration
Interval implements controller.Controller.
func (*AssetLifecycleController) Name ¶ added in v0.2.1
func (c *AssetLifecycleController) Name() string
Name implements controller.Controller.
func (*AssetLifecycleController) Reconcile ¶ added in v0.2.1
func (c *AssetLifecycleController) Reconcile(ctx context.Context) (int, error)
Reconcile iterates every tenant and invokes the worker against those that have opted in. Returns the total number of assets transitioned across all tenants so the reconciler metrics line up with actual work done.
Errors on individual tenants are logged but not returned — one broken tenant should not halt the pass for every other tenant. Unrecoverable failures (e.g. tenant repo lookup) do return an error so the controller runner can retry on the next tick.
type AssetLifecycleControllerConfig ¶ added in v0.2.1
type AssetLifecycleControllerConfig struct {
// Interval is how often to run. Default: 24h. Lifecycle is
// inherently slow-moving (asset thresholds are measured in days)
// so sub-daily cron frequency would only add DB churn without
// changing outcomes.
Interval time.Duration
// Logger for structured log output. Defaults to NewNop when nil.
Logger *logger.Logger
}
AssetLifecycleControllerConfig configures the daily stale-detection pass. The controller itself holds no per-tenant state; it drives the worker against every tenant on the cron cadence.
type AuditChainVerifyController ¶ added in v0.2.0
type AuditChainVerifyController struct {
// contains filtered or unexported fields
}
AuditChainVerifyController periodically walks audit_log_chain for every active tenant and surfaces breaks. The audit hash chain (migration 000154) is tamper-evident — hashes of prior rows feed into each new row — but the guarantee is only useful if someone actually runs VerifyChain. An admin endpoint exists (GET /api/v1/audit-logs/verify, audit_handler.go) but that is a pull path; a malicious insider who deletes rows at 02:00 UTC can sit undetected until someone hits that endpoint.
This controller closes the gap by running VerifyChain on a timer. On any break it:
- Logs at ERROR level with the tenant, chain_position, and reason so the SIEM can alert on the keyword "audit chain break".
- Exposes a metric via the Manager's metrics sink so alerting systems can threshold on sustained drift.
The FK on audit_log_chain.audit_log_id → audit_logs.id is ON DELETE RESTRICT (see 000154_audit_hash_chain.up.sql), so DB- level deletion is already blocked. This controller handles the out-of-band tamper cases: direct UPDATE of audit_logs fields, TRUNCATE, restore-from-different-backup, etc.
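The tamper-evidence property rests on each row's hash covering the previous row's hash. A self-contained sketch of that walk (the field names, string concatenation, and SHA-256 choice are assumptions for illustration, not the actual schema of migration 000154 or the real VerifyChain):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// chainEntry is a simplified stand-in for an audit_log_chain row.
type chainEntry struct {
	Payload  string // canonicalized audit log content
	PrevHash string
	Hash     string
}

// hashEntry chains the previous hash into the new one, so editing any
// earlier row invalidates every hash after it.
func hashEntry(prevHash, payload string) string {
	sum := sha256.Sum256([]byte(prevHash + payload))
	return hex.EncodeToString(sum[:])
}

// verifyChain returns the position of the first break, or -1 if intact.
func verifyChain(entries []chainEntry) int {
	prev := ""
	for i, e := range entries {
		if e.PrevHash != prev || e.Hash != hashEntry(prev, e.Payload) {
			return i
		}
		prev = e.Hash
	}
	return -1
}

func main() {
	// Build a 3-entry chain, then tamper with the middle payload.
	var entries []chainEntry
	prev := ""
	for _, p := range []string{"login", "delete-asset", "logout"} {
		h := hashEntry(prev, p)
		entries = append(entries, chainEntry{Payload: p, PrevHash: prev, Hash: h})
		prev = h
	}
	fmt.Println(verifyChain(entries)) // -1: intact
	entries[1].Payload = "nothing-to-see"
	fmt.Println(verifyChain(entries)) // 1: first break at the edited row
}
```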
func NewAuditChainVerifyController ¶ added in v0.2.0
func NewAuditChainVerifyController(
	auditSvc *audit.AuditService,
	tenantRepo tenantdom.Repository,
	cfg *AuditChainVerifyControllerConfig,
) *AuditChainVerifyController
NewAuditChainVerifyController wires the controller. tenantRepo must expose ListActiveTenantIDs (verified at Reconcile time via a type assertion — we don't want a new domain method just to thread this through).
func (*AuditChainVerifyController) Interval ¶ added in v0.2.0
func (c *AuditChainVerifyController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditChainVerifyController) Name ¶ added in v0.2.0
func (c *AuditChainVerifyController) Name() string
Name returns the controller name.
func (*AuditChainVerifyController) Reconcile ¶ added in v0.2.0
func (c *AuditChainVerifyController) Reconcile(ctx context.Context) (int, error)
Reconcile walks every active tenant's chain once and returns the total number of tenants processed. Breaks are surfaced via the logger; the return int is the "work unit" counter the controller Manager uses for rate / health telemetry, not the break count.
type AuditChainVerifyControllerConfig ¶ added in v0.2.0
type AuditChainVerifyControllerConfig struct {
// Interval is how often to walk every active tenant's audit chain.
// Default: 1 hour. A malicious insider has at most (Interval + the
// duration of this run) before detection, so tune down if your
// compliance regime demands tighter MTTD for tamper events.
Interval time.Duration
// PerTenantLimit is the maximum chain entries walked per tenant per
// run. Matches the AuditService cap (10_000). Memory-bound; raise
// only after benchmarking.
PerTenantLimit int
Logger *logger.Logger
}
AuditChainVerifyControllerConfig configures AuditChainVerifyController.
type AuditRetentionController ¶
type AuditRetentionController struct {
// contains filtered or unexported fields
}
AuditRetentionController manages audit log retention. This is a compliance-critical controller that:
1. Deletes audit logs older than the retention period
2. Logs deletion activities for meta-audit purposes
The retention period should be configured based on compliance requirements:
- GDPR: Typically 2-7 years depending on data type
- SOC 2: At least 1 year
- PCI DSS: At least 1 year
- HIPAA: 6 years
IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.
func NewAuditRetentionController ¶
func NewAuditRetentionController(
	auditRepo admin.AuditLogRepository,
	config *AuditRetentionControllerConfig,
) *AuditRetentionController
NewAuditRetentionController creates a new AuditRetentionController.
func (*AuditRetentionController) Interval ¶
func (c *AuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditRetentionController) Name ¶
func (c *AuditRetentionController) Name() string
Name returns the controller name.
type AuditRetentionControllerConfig ¶
type AuditRetentionControllerConfig struct {
// Interval is how often to run the retention check.
// Default: 24 hours (once a day).
Interval time.Duration
// RetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
RetentionDays int
// BatchSize is the maximum number of logs to delete in one batch.
// This prevents long-running transactions.
// Default: 10000.
BatchSize int
// DryRun if true, only counts logs that would be deleted without actually deleting.
// Useful for testing retention policies.
// Default: false.
DryRun bool
// Logger for logging.
Logger *logger.Logger
}
AuditRetentionControllerConfig configures the AuditRetentionController.
type ControlChangePublisher ¶ added in v0.2.0
type ControlChangePublisher struct {
// contains filtered or unexported fields
}
ControlChangePublisher enqueues reclassify requests scoped to the assets a changed control protects.
func NewControlChangePublisher ¶ added in v0.2.0
func NewControlChangePublisher(q ReclassifyQueue, log *logger.Logger) *ControlChangePublisher
NewControlChangePublisher wires the queue.
func (*ControlChangePublisher) PublishChange ¶ added in v0.2.0
func (p *ControlChangePublisher) PublishChange(
	ctx context.Context,
	tenantID shared.ID,
	assetIDs []shared.ID,
	reason string,
)
PublishChange enqueues a reclassify request. Errors are logged but NOT returned — a failed enqueue must not roll back the control write that triggered it.
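The "log but never return" contract keeps the publish off the control write's critical path. A sketch using a bounded channel as the queue (the channel-based queue is an assumption for illustration; the real ReclassifyQueue interface may differ):

```go
package main

import "fmt"

// publish attempts a non-blocking enqueue. A full queue is reported
// but never propagated, so the caller's write path cannot fail
// because of it. Illustrative only.
func publish(q chan string, req string) {
	select {
	case q <- req:
	default:
		// Logged, NOT returned: dropping a reclassify hint is
		// recoverable (a later sweep catches up); rolling back the
		// control write that triggered it is not acceptable.
		fmt.Printf("reclassify queue full, dropped %q\n", req)
	}
}

func main() {
	q := make(chan string, 2)
	publish(q, "tenant-a/asset-1")
	publish(q, "tenant-a/asset-2")
	publish(q, "tenant-a/asset-3") // queue full: dropped, not an error
	fmt.Println(len(q))            // 2
}
```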
type ControlTestCadenceConfig ¶ added in v0.2.0
type ControlTestCadenceConfig struct {
Interval time.Duration // default 1h
Grace time.Duration // default 7 days
Publisher *ControlChangePublisher
Logger *logger.Logger
}
ControlTestCadenceConfig tunes the controller.
type ControlTestCadenceController ¶ added in v0.2.0
type ControlTestCadenceController struct {
// contains filtered or unexported fields
}
ControlTestCadenceController implements Name/Interval/Reconcile.
func NewControlTestCadenceController ¶ added in v0.2.0
func NewControlTestCadenceController(store ControlTestSink, cfg *ControlTestCadenceConfig) *ControlTestCadenceController
NewControlTestCadenceController wires deps with safe defaults.
func (*ControlTestCadenceController) Interval ¶ added in v0.2.0
func (c *ControlTestCadenceController) Interval() time.Duration
Interval returns the tick interval.
func (*ControlTestCadenceController) Name ¶ added in v0.2.0
func (c *ControlTestCadenceController) Name() string
Name returns the controller name.
type ControlTestSchedulerConfig ¶ added in v0.1.7
type ControlTestSchedulerConfig struct {
// Interval is how often the scheduler runs (default: 24 hours).
Interval time.Duration
// StaleDays is the number of days without a test before a control test
// is considered overdue and reset to "untested" (default: 30).
StaleDays int
// BatchSize is the maximum number of overdue tests to process per cycle (default: 500).
BatchSize int
// Logger is passed by the controller manager.
Logger *logger.Logger
}
ControlTestSchedulerConfig configures the controller.
type ControlTestSchedulerController ¶ added in v0.1.7
type ControlTestSchedulerController struct {
// contains filtered or unexported fields
}
ControlTestSchedulerController automatically marks control tests as overdue when they have not been run within the configured stale window.
Design:
- Runs every 24 hours (daily sweep).
- Any control test not tested for >StaleDays days is reset to "untested" so it surfaces in the Detection Coverage dashboard.
- Never blocks other operations — failures are logged and skipped.
func NewControlTestSchedulerController ¶ added in v0.1.7
func NewControlTestSchedulerController(
	repo simulation.ControlTestRepository,
	cfg *ControlTestSchedulerConfig,
) *ControlTestSchedulerController
NewControlTestSchedulerController creates a new controller.
func (*ControlTestSchedulerController) Interval ¶ added in v0.1.7
func (c *ControlTestSchedulerController) Interval() time.Duration
Interval implements Controller.
func (*ControlTestSchedulerController) Name ¶ added in v0.1.7
func (c *ControlTestSchedulerController) Name() string
Name implements Controller.
type ControlTestSink ¶ added in v0.2.0
type ControlTestSink interface {
// MarkOverdue flags controls whose last_tested_at + cadence <
// now AND status = 'active'. Returns the count marked.
MarkOverdue(ctx context.Context, now time.Time) (int64, error)
// ExpireWithGrace flips controls that have been overdue past
// the grace period to status='expired' and returns the tenant
// + asset pairs that need reclassification.
ExpireWithGrace(ctx context.Context, now time.Time, grace time.Duration) ([]ExpiredControl, error)
}
ControlTestSink is the narrow store surface the controller needs.
type Controller ¶
type Controller interface {
// Name returns the unique name of this controller.
Name() string
// Interval returns how often this controller should run.
Interval() time.Duration
// Reconcile performs the reconciliation logic.
// It should be idempotent - running multiple times should have the same effect.
// Returns the number of items processed and any error encountered.
Reconcile(ctx context.Context) (int, error)
}
Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.
type DataExpirationController ¶ added in v0.1.2
type DataExpirationController struct {
// contains filtered or unexported fields
}
DataExpirationController handles periodic expiration of stale data:
- Suppression rules past their expires_at
- Scope exclusions past their expires_at
- Audit logs older than the retention period
Without this controller, expired suppression rules and scope exclusions remain in 'active'/'approved' status indefinitely, and audit logs accumulate without bounds.
func NewDataExpirationController ¶ added in v0.1.2
func NewDataExpirationController(
	suppressionRepo suppression.Repository,
	exclusionRepo scope.ExclusionRepository,
	auditRepo audit.Repository,
	config *DataExpirationControllerConfig,
) *DataExpirationController
NewDataExpirationController creates a new DataExpirationController.
func (*DataExpirationController) Interval ¶ added in v0.1.2
func (c *DataExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*DataExpirationController) Name ¶ added in v0.1.2
func (c *DataExpirationController) Name() string
Name returns the controller name.
type DataExpirationControllerConfig ¶ added in v0.1.2
type DataExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// AuditRetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
AuditRetentionDays int
// Logger for logging.
Logger *logger.Logger
}
DataExpirationControllerConfig configures the DataExpirationController.
type ExpiredControl ¶ added in v0.2.0
ExpiredControl describes one control that just expired. Used to drive the downstream reclassify sweep.
type GroupSyncController ¶ added in v0.1.2
type GroupSyncController struct {
// contains filtered or unexported fields
}
GroupSyncController periodically synchronizes groups from external providers. It implements the Controller interface and is managed by the controller Manager.
This controller:
1. Iterates over all active tenants
2. For each tenant, triggers SyncAll to synchronize all configured providers
3. Logs sync results for monitoring
NOT WIRED: this controller is not registered in cmd/server/workers.go because app.GroupSyncService is also not constructed in services.go — the periodic sync side of the group-provider feature is not shipped. On-demand sync works through the HTTP handler (group_handler.go). A future PR that turns on periodic SCIM/LDAP sync needs to:
- Construct app.NewGroupSyncService(repos.Group, log) in services.go
- Register this controller in workers.go
func NewGroupSyncController ¶ added in v0.1.2
func NewGroupSyncController(
	syncService *app.GroupSyncService,
	tenantRepo tenant.Repository,
	config *GroupSyncControllerConfig,
) *GroupSyncController
NewGroupSyncController creates a new GroupSyncController.
func (*GroupSyncController) Interval ¶ added in v0.1.2
func (c *GroupSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*GroupSyncController) Name ¶ added in v0.1.2
func (c *GroupSyncController) Name() string
Name returns the controller name.
func (*GroupSyncController) Reconcile ¶ added in v0.1.2
func (c *GroupSyncController) Reconcile(ctx context.Context) (int, error)
Reconcile performs the periodic group sync across all active tenants. Returns the number of tenants synced and any error encountered.
func (*GroupSyncController) TriggerSync ¶ added in v0.1.2
TriggerSync manually triggers a sync for a specific tenant. This is used by the HTTP handler for on-demand sync requests.
type GroupSyncControllerConfig ¶ added in v0.1.2
type GroupSyncControllerConfig struct {
// Interval is how often to run the sync.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
GroupSyncControllerConfig configures the GroupSyncController.
type JobRecoveryController ¶
type JobRecoveryController struct {
// contains filtered or unexported fields
}
JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.
The controller performs three main tasks:
- Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
- Expire old jobs: Mark jobs as expired if they've been in queue too long
- Clean up: Mark orphaned jobs as failed if they exceed retry limit
func NewJobRecoveryController ¶
func NewJobRecoveryController(
	commandRepo command.Repository,
	config *JobRecoveryControllerConfig,
) *JobRecoveryController
NewJobRecoveryController creates a new JobRecoveryController.
func (*JobRecoveryController) Interval ¶
func (c *JobRecoveryController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*JobRecoveryController) Name ¶
func (c *JobRecoveryController) Name() string
Name returns the controller name.
type JobRecoveryControllerConfig ¶
type JobRecoveryControllerConfig struct {
// Interval is how often to run the job recovery check.
// Default: 60 seconds.
Interval time.Duration
// StuckThresholdMinutes is how long a job can be in acknowledged/running state
// without progress before being considered stuck.
// Default: 30 minutes.
StuckThresholdMinutes int
// TenantStuckThresholdMinutes is how long a tenant command can be assigned
// to an agent without being picked up before being reassigned.
// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
TenantStuckThresholdMinutes int
// MaxRetries is the maximum number of retry attempts for a job.
// After this many retries, the job will be marked as failed.
// Default: 3.
MaxRetries int
// MaxQueueMinutes is how long a job can wait in the queue before expiring.
// Default: 60 minutes.
MaxQueueMinutes int
// Logger for logging.
Logger *logger.Logger
}
JobRecoveryControllerConfig configures the JobRecoveryController.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages multiple controllers, running them in parallel goroutines.
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new controller manager.
func (*Manager) ControllerCount ¶
ControllerCount returns the number of registered controllers.
func (*Manager) ControllerNames ¶
ControllerNames returns the names of all registered controllers.
func (*Manager) Register ¶
func (m *Manager) Register(c Controller)
Register adds a controller to the manager.
type ManagerConfig ¶
type ManagerConfig struct {
// Metrics collector (optional)
Metrics Metrics
// Logger (required)
Logger *logger.Logger
}
ManagerConfig configures the controller manager.
type Metrics ¶
type Metrics interface {
// RecordReconcile records a reconciliation run.
RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
// SetControllerRunning sets whether a controller is running.
SetControllerRunning(controller string, running bool)
// IncrementReconcileErrors increments the error counter.
IncrementReconcileErrors(controller string)
// SetLastReconcileTime sets the last reconcile timestamp.
SetLastReconcileTime(controller string, t time.Time)
}
Metrics defines the interface for controller metrics collection.
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics is a no-op implementation of Metrics for testing.
func (*NoopMetrics) IncrementReconcileErrors ¶
func (m *NoopMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors does nothing.
func (*NoopMetrics) RecordReconcile ¶
func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile does nothing.
func (*NoopMetrics) SetControllerRunning ¶
func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning does nothing.
func (*NoopMetrics) SetLastReconcileTime ¶
func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime does nothing.
type OrderConfig ¶ added in v0.2.0
type OrderConfig struct {
// MaxAgeBonus is the age after which an item jumps one
// priority band. Zero disables age bonus.
MaxAgeBonus time.Duration
// Now is injectable for tests.
Now func() time.Time
}
OrderConfig tunes the scheduler.
type OwnerResolutionController ¶ added in v0.2.0
type OwnerResolutionController struct {
// contains filtered or unexported fields
}
OwnerResolutionController periodically resolves asset ownership by matching owner_ref (email/username text) to actual user accounts.
When a scanner ingests an asset, it may set owner_ref (e.g., "alice@example.com") but cannot know the internal user UUID. This controller finds those assets and populates owner_id from the users table.
Runs every 30 minutes.
func NewOwnerResolutionController ¶ added in v0.2.0
func NewOwnerResolutionController(db *sql.DB, log *logger.Logger) *OwnerResolutionController
NewOwnerResolutionController creates a new controller.
func (*OwnerResolutionController) Interval ¶ added in v0.2.0
func (c *OwnerResolutionController) Interval() time.Duration
Interval returns 30 minutes.
func (*OwnerResolutionController) Name ¶ added in v0.2.0
func (c *OwnerResolutionController) Name() string
Name returns the controller name.
type PriorityAuditRetentionConfig ¶ added in v0.2.0
type PriorityAuditRetentionConfig struct {
// Interval between runs (default 24h).
Interval time.Duration
// RetentionDays — rows older than this are deleted (default 180).
RetentionDays int
// DryRun skips deletion and only reports count.
DryRun bool
// Logger for logging.
Logger *logger.Logger
}
PriorityAuditRetentionConfig configures the controller.
type PriorityAuditRetentionController ¶ added in v0.2.0
type PriorityAuditRetentionController struct {
// contains filtered or unexported fields
}
PriorityAuditRetentionController implements the background controller contract (Name / Interval / Reconcile).
func NewPriorityAuditRetentionController ¶ added in v0.2.0
func NewPriorityAuditRetentionController(
	repo PriorityAuditRetentionStore,
	config *PriorityAuditRetentionConfig,
) *PriorityAuditRetentionController
NewPriorityAuditRetentionController constructs the controller with sensible defaults for any zero-valued config fields.
func (*PriorityAuditRetentionController) Interval ¶ added in v0.2.0
func (c *PriorityAuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*PriorityAuditRetentionController) Name ¶ added in v0.2.0
func (c *PriorityAuditRetentionController) Name() string
Name returns the controller name.
type PriorityAuditRetentionStore ¶ added in v0.2.0
type PriorityAuditRetentionStore interface {
CountOlderThan(ctx context.Context, before time.Time) (int64, error)
DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
}
PriorityAuditRetentionStore is the subset of PriorityAuditRepository that this controller needs. Defined locally so we don't depend on the app layer.
type PriorityReclassifyConfig ¶ added in v0.2.0
type PriorityReclassifyConfig struct {
// Interval between sweep ticks. Default 5m.
Interval time.Duration
// BatchSize is the max number of requests drained per tick.
// Default 64.
BatchSize int
// Logger (optional; defaults to no-op).
Logger *logger.Logger
}
PriorityReclassifyConfig configures the sweep controller.
type PriorityReclassifyController ¶ added in v0.2.0
type PriorityReclassifyController struct {
// contains filtered or unexported fields
}
PriorityReclassifyController drains the queue and dispatches each request to the Reclassifier.
func NewPriorityReclassifyController ¶ added in v0.2.0
func NewPriorityReclassifyController(
	queue ReclassifyQueue,
	reclassifier Reclassifier,
	cfg *PriorityReclassifyConfig,
) *PriorityReclassifyController
NewPriorityReclassifyController wires the controller.
func (*PriorityReclassifyController) Interval ¶ added in v0.2.0
func (c *PriorityReclassifyController) Interval() time.Duration
Interval returns the sweep interval.
func (*PriorityReclassifyController) Name ¶ added in v0.2.0
func (c *PriorityReclassifyController) Name() string
Name returns the controller name.
func (*PriorityReclassifyController) Reconcile ¶ added in v0.2.0
func (c *PriorityReclassifyController) Reconcile(ctx context.Context) (int, error)
Reconcile drains up to BatchSize requests from the queue and applies each. Returns the total number of findings re-examined across all drained requests (for metrics).
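The bounded per-tick drain described above can be sketched with a channel standing in for the queue. Both the channel and the apply function are assumptions for illustration, not the real ReclassifyQueue or Reclassifier types:

```go
package main

import "fmt"

// drainTick pops at most batchSize requests and applies each,
// returning the total findings re-examined (the controller's
// work-unit metric). Illustrative only.
func drainTick(q chan string, batchSize int, apply func(string) int) int {
	total := 0
	for i := 0; i < batchSize; i++ {
		select {
		case req := <-q:
			total += apply(req)
		default:
			return total // queue empty before the cap: stop early
		}
	}
	return total
}

func main() {
	q := make(chan string, 8)
	for _, r := range []string{"req-a", "req-b", "req-c"} {
		q <- r
	}
	// BatchSize of 2: one request is left for the next tick, bounding
	// the work done per reconcile.
	fmt.Println(drainTick(q, 2, func(string) int { return 5 })) // 10
	fmt.Println(len(q))                                         // 1
}
```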
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements the Metrics interface using Prometheus.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(namespace string) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics.
func (*PrometheusMetrics) IncrementReconcileErrors ¶
func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors increments the error counter.
func (*PrometheusMetrics) RecordReconcile ¶
func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile records a reconciliation run.
func (*PrometheusMetrics) SetControllerRunning ¶
func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning sets whether a controller is running.
func (*PrometheusMetrics) SetLastReconcileTime ¶
func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime sets the last reconcile timestamp.
type QueueItem ¶ added in v0.2.0
type QueueItem struct {
ID string
TenantID string
PriorityClass string // "P0" | "P1" | "P2" | "P3" | ""
EnqueuedAt time.Time
}
QueueItem is the minimal shape the scheduler orders. A production queue wraps this with a payload pointer; for ordering we only need the priority, tenant, and enqueue time.
func OrderBatch ¶ added in v0.2.0
func OrderBatch(items []QueueItem, cfg OrderConfig) []QueueItem
OrderBatch sorts items for dispatch. The contract:
- Lower effectiveWeight dispatches first.
- Within the same weight, tenants are rotated (round-robin) to prevent one tenant from monopolising the band.
- Within the same (weight, tenant), older items dispatch first.
OrderBatch returns a NEW slice; the input is not mutated.
type QueuePriorityController ¶
type QueuePriorityController struct {
// contains filtered or unexported fields
}
QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:
- Tenant's plan tier (higher tiers get a base priority boost)
- Job age (older jobs get a priority bonus to prevent starvation)
- Tenant's current queue depth (tenants with fewer jobs get a slight boost)
The priority calculation is done in the database for efficiency: new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
This is a soft-priority system - higher priority jobs are processed first, but no tenant can completely starve others.
func NewQueuePriorityController ¶
func NewQueuePriorityController(
	commandRepo command.Repository,
	config *QueuePriorityControllerConfig,
) *QueuePriorityController
NewQueuePriorityController creates a new QueuePriorityController.
func (*QueuePriorityController) Interval ¶
func (c *QueuePriorityController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*QueuePriorityController) Name ¶
func (c *QueuePriorityController) Name() string
Name returns the controller name.
type QueuePriorityControllerConfig ¶
type QueuePriorityControllerConfig struct {
// Interval is how often to recalculate queue priorities.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
QueuePriorityControllerConfig configures the QueuePriorityController.
type Reclassifier ¶ added in v0.2.0
type Reclassifier interface {
// ReclassifyForRequest processes a single request. Returns the
// number of findings re-examined (not necessarily changed — a
// sweep that re-confirms the same class is still "work done").
ReclassifyForRequest(ctx context.Context, req ReclassifyRequest) (int, error)
}
Reclassifier applies one request: it loads the matching open findings and invokes the PriorityClassificationService on them. Kept as a narrow interface so the controller has no knowledge of the app-layer service or its repo dependencies.
type ReclassifyQueue ¶ added in v0.2.0
type ReclassifyQueue interface {
// Enqueue adds a reclassify request. MUST be safe for concurrent use.
Enqueue(ctx context.Context, req ReclassifyRequest) error
// DequeueBatch pops up to `max` requests. An empty slice (no error)
// means "nothing to do this tick". Implementations should return
// quickly (non-blocking).
DequeueBatch(ctx context.Context, max int) ([]ReclassifyRequest, error)
}
ReclassifyQueue is the minimal contract for the in/out queue. Implementations may be Redis lists, Postgres advisory-locked rows, or an in-memory channel for tests.
type ReclassifyReasonKind ¶ added in v0.2.0
type ReclassifyReasonKind string
ReclassifyReasonKind enumerates why a sweep was enqueued.
const (
	ReasonEPSSRefresh   ReclassifyReasonKind = "epss_refresh"
	ReasonKEVRefresh    ReclassifyReasonKind = "kev_refresh"
	ReasonRuleChanged   ReclassifyReasonKind = "rule_changed"
	ReasonControlChange ReclassifyReasonKind = "control_change"
	ReasonAssetChange   ReclassifyReasonKind = "asset_change"
	ReasonManual        ReclassifyReasonKind = "manual"
)
type ReclassifyRequest ¶ added in v0.2.0
type ReclassifyRequest struct {
TenantID shared.ID
Reason ReclassifyReasonKind
CVEIDs []string
AssetIDs []shared.ID
RuleID *shared.ID
EnqueueAt time.Time
}
ReclassifyRequest describes one unit of sweep work.
Scope dimensions (all optional; nil/empty = broadest match within tenant):
- CVEIDs — only findings matching these CVEs (typical EPSS/KEV path).
- AssetIDs — only findings on these assets (typical control/asset path).
- RuleID — evaluated by the running service against each finding.
Batching is the caller's responsibility — a giant EPSS refresh should be split into per-tenant requests; the controller treats each request atomically.
type RetryDispatcher ¶ added in v0.1.5
type RetryDispatcher interface {
RetryScanRun(ctx context.Context, tenantID, scanID shared.ID, retryAttempt int) error
}
RetryDispatcher is the dependency the ScanRetryController uses to actually re-trigger a failed scan run. The scan service implements this.
type RiskSnapshotController ¶ added in v0.2.0
type RiskSnapshotController struct {
// contains filtered or unexported fields
}
RiskSnapshotController computes daily risk snapshots for all tenants. Runs every 6 hours, but only inserts one row per tenant per day (UPSERT).
RFC-005 Gap 4: Risk Trend / Outcome Metrics.
func NewRiskSnapshotController ¶ added in v0.2.0
func NewRiskSnapshotController(db *sql.DB, log *logger.Logger) *RiskSnapshotController
NewRiskSnapshotController creates a new controller.
func (*RiskSnapshotController) Interval ¶ added in v0.2.0
func (c *RiskSnapshotController) Interval() time.Duration
Interval returns 6 hours (computes at most once per day per tenant via UPSERT).
func (*RiskSnapshotController) Name ¶ added in v0.2.0
func (c *RiskSnapshotController) Name() string
Name returns the controller name.
type RoleSyncController ¶ added in v0.1.2
type RoleSyncController struct {
// contains filtered or unexported fields
}
RoleSyncController ensures tenant_members and user_roles stay in sync.
The RBAC permission system reads from user_roles, while tenant_members stores the canonical membership. A trigger (sync_tenant_member_to_user_roles) handles new INSERT/UPDATE, but entries can become desynced after:
- Database restore from backup
- Migration rollback and re-apply
- Manual data manipulation
- Trigger being temporarily disabled
This controller detects missing user_roles entries and backfills them, preventing "Access Denied" errors for affected users.
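The detect-and-backfill step is essentially an anti-join between the two tables. The sketch below does the diff in memory with assumed row shapes; the real controller would express this as a single SQL query against tenant_members and user_roles.

```go
package main

// Member models the (user, tenant, role) triple held by both
// tenant_members and user_roles. The field set is an assumption
// for this sketch.
type Member struct {
	UserID   string
	TenantID string
	Role     string
}

// missingRoles returns the tenant_members rows with no matching
// user_roles entry — the set the controller would backfill to
// prevent "Access Denied" errors for those users.
func missingRoles(members, roles []Member) []Member {
	have := make(map[Member]bool, len(roles))
	for _, r := range roles {
		have[r] = true
	}
	var missing []Member
	for _, m := range members {
		if !have[m] {
			missing = append(missing, m)
		}
	}
	return missing
}

func main() {
	members := []Member{
		{UserID: "u1", TenantID: "t1", Role: "admin"},
		{UserID: "u2", TenantID: "t1", Role: "viewer"},
	}
	roles := []Member{{UserID: "u1", TenantID: "t1", Role: "admin"}}
	if got := missingRoles(members, roles); len(got) != 1 || got[0].UserID != "u2" {
		panic("expected u2 to need backfill")
	}
}
```

Because the result is derived purely from the difference between the two sets, running the backfill twice is idempotent, matching the package's design principles.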
func NewRoleSyncController ¶ added in v0.1.2
func NewRoleSyncController(
	db *sql.DB,
	config *RoleSyncControllerConfig,
) *RoleSyncController
NewRoleSyncController creates a new RoleSyncController.
func (*RoleSyncController) Interval ¶ added in v0.1.2
func (c *RoleSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*RoleSyncController) Name ¶ added in v0.1.2
func (c *RoleSyncController) Name() string
Name returns the controller name.
type RoleSyncControllerConfig ¶ added in v0.1.2
type RoleSyncControllerConfig struct {
// Interval is how often to run the sync check.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
RoleSyncControllerConfig configures the RoleSyncController.
type SLABreachEvent ¶ added in v0.2.0
type SLABreachEvent struct {
TenantID shared.ID
FindingID shared.ID
SLADeadline time.Time
OverdueDuration time.Duration
At time.Time
}
SLABreachEvent is emitted once per finding on the transition into the `breached` SLA state. Downstream consumers (notification outbox, Jira commenter, PagerDuty router) subscribe via the publisher.
B4: closes the feedback edge where SLA status was previously computed but never acted on. Dedup via the SLA breach transition itself — a second controller run won't re-emit because rows already in `breached` state don't match the UPDATE WHERE clause.
type SLABreachPublisher ¶ added in v0.2.0
type SLABreachPublisher interface {
Publish(ctx context.Context, event SLABreachEvent) error
}
SLABreachPublisher delivers breach events to downstream consumers. Optional — nil publisher means "log only" (legacy behaviour).
type SLAEscalationController ¶ added in v0.2.0
type SLAEscalationController struct {
// contains filtered or unexported fields
}
SLAEscalationController periodically checks for overdue findings and updates their sla_status to 'breached'. Runs every 15 minutes.
Note: This operates across all tenants intentionally — it's a system-level background job that marks overdue findings within their own rows. Each finding's tenant_id remains unchanged.
RFC-005 Gap 7 + B4: Automated SLA Escalation with publisher.
func NewSLAEscalationController ¶ added in v0.2.0
func NewSLAEscalationController(db *sql.DB, log *logger.Logger) *SLAEscalationController
NewSLAEscalationController creates a new SLA escalation controller.
func (*SLAEscalationController) Interval ¶ added in v0.2.0
func (c *SLAEscalationController) Interval() time.Duration
Interval returns 15 minutes.
func (*SLAEscalationController) Name ¶ added in v0.2.0
func (c *SLAEscalationController) Name() string
Name returns the controller name.
func (*SLAEscalationController) Reconcile ¶ added in v0.2.0
func (c *SLAEscalationController) Reconcile(ctx context.Context) (int, error)
Reconcile checks for overdue findings and marks them as breached.
B4: For every row newly-transitioned into `breached`, a SLABreachEvent is emitted via the publisher. Dedup is structural — the WHERE clause excludes rows already in `breached`, so a second run won't re-emit.
func (*SLAEscalationController) SetBreachPublisher ¶ added in v0.2.0
func (c *SLAEscalationController) SetBreachPublisher(p SLABreachPublisher)
SetBreachPublisher wires the breach-event publisher. Safe after construction; nil disables publishing.
type ScanRetryController ¶ added in v0.1.5
type ScanRetryController struct {
// contains filtered or unexported fields
}
ScanRetryController periodically checks for failed pipeline runs that are eligible for automatic retry (based on the parent scan's max_retries + retry_backoff_seconds with exponential backoff) and dispatches retries through the RetryDispatcher.
func NewScanRetryController ¶ added in v0.1.5
func NewScanRetryController(
	runRepo pipeline.RunRepository,
	dispatcher RetryDispatcher,
	config *ScanRetryControllerConfig,
) *ScanRetryController
NewScanRetryController creates a new ScanRetryController.
func (*ScanRetryController) Interval ¶ added in v0.1.5
func (c *ScanRetryController) Interval() time.Duration
func (*ScanRetryController) Name ¶ added in v0.1.5
func (c *ScanRetryController) Name() string
type ScanRetryControllerConfig ¶ added in v0.1.5
type ScanRetryControllerConfig struct {
// Interval is how often to check for retry candidates.
// Default: 60 seconds.
Interval time.Duration
// BatchSize is the maximum candidates processed per cycle.
// Default: 100.
BatchSize int
Logger *logger.Logger
}
ScanRetryControllerConfig configures the ScanRetryController.
type ScanTimeoutController ¶ added in v0.1.5
type ScanTimeoutController struct {
// contains filtered or unexported fields
}
ScanTimeoutController periodically marks pipeline_runs as timed out when they exceed their scan's configured timeout_seconds.
This complements JobRecoveryController (which marks stuck commands) by enforcing per-scan timeouts. A scan can specify its own timeout_seconds (default 1h, max 24h); runs that exceed it are forcibly marked as timed out with an appropriate error message.
func NewScanTimeoutController ¶ added in v0.1.5
func NewScanTimeoutController(
	runRepo pipeline.RunRepository,
	config *ScanTimeoutControllerConfig,
) *ScanTimeoutController
NewScanTimeoutController creates a new ScanTimeoutController.
func (*ScanTimeoutController) Interval ¶ added in v0.1.5
func (c *ScanTimeoutController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScanTimeoutController) Name ¶ added in v0.1.5
func (c *ScanTimeoutController) Name() string
Name returns the controller name.
type ScanTimeoutControllerConfig ¶ added in v0.1.5
type ScanTimeoutControllerConfig struct {
// Interval is how often to check for timed-out runs.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScanTimeoutControllerConfig configures the ScanTimeoutController.
type ScopeReconciliationController ¶ added in v0.1.2
type ScopeReconciliationController struct {
// contains filtered or unexported fields
}
ScopeReconciliationController periodically reconciles scope rule assignments as a safety net for the real-time event-driven hooks.
This controller:
1. Lists all tenants with active scope rules
2. For each tenant, lists all groups with active scope rules
3. Reconciles each group by re-evaluating matching assets
Design: This is a background safety net (K8s-style eventual consistency). The primary path is real-time hooks in AssetService and AssetGroupService.
func NewScopeReconciliationController ¶ added in v0.1.2
func NewScopeReconciliationController(
	acRepo accesscontrol.Repository,
	reconciler scopeGroupReconciler,
	config *ScopeReconciliationControllerConfig,
) *ScopeReconciliationController
NewScopeReconciliationController creates a new ScopeReconciliationController.
func (*ScopeReconciliationController) Interval ¶ added in v0.1.2
func (c *ScopeReconciliationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScopeReconciliationController) Name ¶ added in v0.1.2
func (c *ScopeReconciliationController) Name() string
Name returns the controller name.
type ScopeReconciliationControllerConfig ¶ added in v0.1.2
type ScopeReconciliationControllerConfig struct {
// Interval is how often to run the reconciliation.
// Default: 30 minutes.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScopeReconciliationControllerConfig configures the ScopeReconciliationController.
type ThreatIntelRefreshController ¶ added in v0.1.7
type ThreatIntelRefreshController struct {
// contains filtered or unexported fields
}
ThreatIntelRefreshController periodically refreshes EPSS scores and KEV catalog. Runs every 24 hours. Fetches latest data from FIRST.org (EPSS) and CISA (KEV), then persists to database via ThreatIntelService.SyncAll(). After sync, auto-escalates findings whose CVEs appear in the KEV catalog.
func NewThreatIntelRefreshController ¶ added in v0.1.7
func NewThreatIntelRefreshController(service *threat.IntelService, escalator threat.KEVEscalator, log *logger.Logger) *ThreatIntelRefreshController
NewThreatIntelRefreshController creates a new controller.
func (*ThreatIntelRefreshController) Interval ¶ added in v0.1.7
func (c *ThreatIntelRefreshController) Interval() time.Duration
Interval returns 24 hours — daily refresh.
func (*ThreatIntelRefreshController) Name ¶ added in v0.1.7
func (c *ThreatIntelRefreshController) Name() string
Name returns the controller name.
Source Files
¶
- agent_health.go
- approval_expiration.go
- asset_lifecycle.go
- audit_chain_verify.go
- audit_retention.go
- control_change_publisher.go
- control_test_cadence.go
- control_test_scheduler.go
- controller.go
- data_expiration.go
- group_sync.go
- job_recovery.go
- metrics.go
- owner_resolution.go
- priority_audit_retention.go
- priority_queue_order.go
- priority_reclassify.go
- queue_priority.go
- risk_snapshot.go
- role_sync.go
- scan_retry.go
- scan_timeout.go
- scope_reconciliation.go
- sla_escalation.go
- threat_intel_refresh.go