Documentation ¶
Overview ¶
Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.
Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:

- AgentHealthController: Marks stale agents as offline, cleans up expired leases
- JobRecoveryController: Recovers stuck jobs and re-queues them
- QueuePriorityController: Recalculates queue priorities for fair scheduling
- TokenCleanupController: Cleans up expired bootstrap tokens
- AuditRetentionController: Manages audit log retention
Design principles:

- Each controller is independent and can fail without affecting others
- Controllers are idempotent - running multiple times has the same effect
- Controllers use optimistic locking to handle concurrent modifications
- All state changes are logged for debugging and monitoring
Index ¶
- type AgentHealthController
- type AgentHealthControllerConfig
- type ApprovalExpirationController
- type ApprovalExpirationControllerConfig
- type AuditChainVerifyController
- type AuditChainVerifyControllerConfig
- type AuditRetentionController
- type AuditRetentionControllerConfig
- type ControlChangePublisher
- type ControlTestCadenceConfig
- type ControlTestCadenceController
- type ControlTestSchedulerConfig
- type ControlTestSchedulerController
- type ControlTestSink
- type Controller
- type DataExpirationController
- type DataExpirationControllerConfig
- type ExpiredControl
- type GroupSyncController
- type GroupSyncControllerConfig
- type JobRecoveryController
- type JobRecoveryControllerConfig
- type Manager
- type ManagerConfig
- type Metrics
- type NoopMetrics
- func (m *NoopMetrics) IncrementReconcileErrors(controller string)
- func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
- func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
- type OrderConfig
- type OwnerResolutionController
- type PriorityAuditRetentionConfig
- type PriorityAuditRetentionController
- type PriorityAuditRetentionStore
- type PriorityReclassifyConfig
- type PriorityReclassifyController
- type PrometheusMetrics
- func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
- func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
- func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
- type QueueItem
- type QueuePriorityController
- type QueuePriorityControllerConfig
- type Reclassifier
- type ReclassifyQueue
- type ReclassifyReasonKind
- type ReclassifyRequest
- type RetryDispatcher
- type RiskSnapshotController
- type RoleSyncController
- type RoleSyncControllerConfig
- type SLABreachEvent
- type SLABreachPublisher
- type SLAEscalationController
- type ScanRetryController
- type ScanRetryControllerConfig
- type ScanTimeoutController
- type ScanTimeoutControllerConfig
- type ScopeReconciliationController
- type ScopeReconciliationControllerConfig
- type ThreatIntelRefreshController
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthController ¶
type AgentHealthController struct {
// contains filtered or unexported fields
}
AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.
func NewAgentHealthController ¶
func NewAgentHealthController(
    agentRepo agent.Repository,
    config *AgentHealthControllerConfig,
) *AgentHealthController
NewAgentHealthController creates a new AgentHealthController.
func (*AgentHealthController) Interval ¶
func (c *AgentHealthController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AgentHealthController) Name ¶
func (c *AgentHealthController) Name() string
Name returns the controller name.
type AgentHealthControllerConfig ¶
type AgentHealthControllerConfig struct {
// Interval is how often to run the health check.
// Default: 30 seconds.
Interval time.Duration
// StaleTimeout is how long since last heartbeat before marking an agent as offline.
// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
StaleTimeout time.Duration
// Logger for logging.
Logger *logger.Logger
}
AgentHealthControllerConfig configures the AgentHealthController.
type ApprovalExpirationController ¶ added in v0.1.2
type ApprovalExpirationController struct {
// contains filtered or unexported fields
}
ApprovalExpirationController auto-reopens findings whose risk acceptances have expired. It scans for approved approvals with an expires_at in the past, marks them as expired, and transitions the associated findings back to "confirmed" status.
func NewApprovalExpirationController ¶ added in v0.1.2
func NewApprovalExpirationController(
    approvalRepo vulnerability.ApprovalRepository,
    findingRepo vulnerability.FindingRepository,
    config *ApprovalExpirationControllerConfig,
) *ApprovalExpirationController
NewApprovalExpirationController creates a new ApprovalExpirationController.
func (*ApprovalExpirationController) Interval ¶ added in v0.1.2
func (c *ApprovalExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ApprovalExpirationController) Name ¶ added in v0.1.2
func (c *ApprovalExpirationController) Name() string
Name returns the controller name.
type ApprovalExpirationControllerConfig ¶ added in v0.1.2
type ApprovalExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// BatchSize is the maximum number of expired approvals to process per reconciliation.
// Default: 100.
BatchSize int
// Logger for logging.
Logger *logger.Logger
}
ApprovalExpirationControllerConfig configures the ApprovalExpirationController.
type AuditChainVerifyController ¶ added in v0.2.0
type AuditChainVerifyController struct {
// contains filtered or unexported fields
}
AuditChainVerifyController periodically walks audit_log_chain for every active tenant and surfaces breaks. The audit hash chain (migration 000154) is tamper-evident — hashes of prior rows feed into each new row — but the guarantee is only useful if someone actually runs VerifyChain. An admin endpoint exists (GET /api/v1/audit-logs/verify, audit_handler.go) but that is a pull path; a malicious insider who deletes rows at 02:00 UTC can sit undetected until someone hits that endpoint.
This controller closes the gap by running VerifyChain on a timer. On any break it:
- Logs at ERROR level with the tenant, chain_position, and reason so the SIEM can alert on the keyword "audit chain break".
- Exposes a metric via the Manager's metrics sink so alerting systems can threshold on sustained drift.
The FK on audit_log_chain.audit_log_id → audit_logs.id is ON DELETE RESTRICT (see 000154_audit_hash_chain.up.sql), so DB-level deletion is already blocked. This controller handles the out-of-band tamper cases: direct UPDATE of audit_logs fields, TRUNCATE, restore-from-different-backup, etc.
func NewAuditChainVerifyController ¶ added in v0.2.0
func NewAuditChainVerifyController(
    auditSvc *audit.AuditService,
    tenantRepo tenantdom.Repository,
    cfg *AuditChainVerifyControllerConfig,
) *AuditChainVerifyController
NewAuditChainVerifyController wires the controller. tenantRepo must expose ListActiveTenantIDs (verified at Reconcile time via a type assertion — we don't want a new domain method just to thread this through).
func (*AuditChainVerifyController) Interval ¶ added in v0.2.0
func (c *AuditChainVerifyController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditChainVerifyController) Name ¶ added in v0.2.0
func (c *AuditChainVerifyController) Name() string
Name returns the controller name.
func (*AuditChainVerifyController) Reconcile ¶ added in v0.2.0
func (c *AuditChainVerifyController) Reconcile(ctx context.Context) (int, error)
Reconcile walks every active tenant's chain once and returns the total number of tenants processed. Breaks are surfaced via the logger; the return int is the "work unit" counter the controller Manager uses for rate / health telemetry, not the break count.
type AuditChainVerifyControllerConfig ¶ added in v0.2.0
type AuditChainVerifyControllerConfig struct {
// Interval is how often to walk every active tenant's audit chain.
// Default: 1 hour. A malicious insider has at most (Interval + the
// duration of this run) before detection, so tune down if your
// compliance regime demands tighter MTTD for tamper events.
Interval time.Duration
// PerTenantLimit is the maximum chain entries walked per tenant per
// run. Matches the AuditService cap (10_000). Memory-bound; raise
// only after benchmarking.
PerTenantLimit int
Logger *logger.Logger
}
AuditChainVerifyControllerConfig configures AuditChainVerifyController.
type AuditRetentionController ¶
type AuditRetentionController struct {
// contains filtered or unexported fields
}
AuditRetentionController manages audit log retention. This is a compliance-critical controller that:

1. Deletes audit logs older than the retention period
2. Logs deletion activities for meta-audit purposes

The retention period should be configured based on compliance requirements:

- GDPR: Typically 2-7 years depending on data type
- SOC 2: At least 1 year
- PCI DSS: At least 1 year
- HIPAA: 6 years
IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.
func NewAuditRetentionController ¶
func NewAuditRetentionController(
    auditRepo admin.AuditLogRepository,
    config *AuditRetentionControllerConfig,
) *AuditRetentionController
NewAuditRetentionController creates a new AuditRetentionController.
func (*AuditRetentionController) Interval ¶
func (c *AuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditRetentionController) Name ¶
func (c *AuditRetentionController) Name() string
Name returns the controller name.
type AuditRetentionControllerConfig ¶
type AuditRetentionControllerConfig struct {
// Interval is how often to run the retention check.
// Default: 24 hours (once a day).
Interval time.Duration
// RetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
RetentionDays int
// BatchSize is the maximum number of logs to delete in one batch.
// This prevents long-running transactions.
// Default: 10000.
BatchSize int
// DryRun if true, only counts logs that would be deleted without actually deleting.
// Useful for testing retention policies.
// Default: false.
DryRun bool
// Logger for logging.
Logger *logger.Logger
}
AuditRetentionControllerConfig configures the AuditRetentionController.
type ControlChangePublisher ¶ added in v0.2.0
type ControlChangePublisher struct {
// contains filtered or unexported fields
}
ControlChangePublisher enqueues reclassify requests scoped to the assets a changed control protects.
func NewControlChangePublisher ¶ added in v0.2.0
func NewControlChangePublisher(q ReclassifyQueue, log *logger.Logger) *ControlChangePublisher
NewControlChangePublisher wires the queue.
func (*ControlChangePublisher) PublishChange ¶ added in v0.2.0
func (p *ControlChangePublisher) PublishChange(
    ctx context.Context,
    tenantID shared.ID,
    assetIDs []shared.ID,
    reason string,
)
PublishChange enqueues a reclassify request. Errors are logged but NOT returned — a failed enqueue must not roll back the control write that triggered it.
type ControlTestCadenceConfig ¶ added in v0.2.0
type ControlTestCadenceConfig struct {
Interval time.Duration // default 1h
Grace time.Duration // default 7 days
Publisher *ControlChangePublisher
Logger *logger.Logger
}
ControlTestCadenceConfig tunes the controller.
type ControlTestCadenceController ¶ added in v0.2.0
type ControlTestCadenceController struct {
// contains filtered or unexported fields
}
ControlTestCadenceController implements Name/Interval/Reconcile.
func NewControlTestCadenceController ¶ added in v0.2.0
func NewControlTestCadenceController(store ControlTestSink, cfg *ControlTestCadenceConfig) *ControlTestCadenceController
NewControlTestCadenceController wires deps with safe defaults.
func (*ControlTestCadenceController) Interval ¶ added in v0.2.0
func (c *ControlTestCadenceController) Interval() time.Duration
Interval returns the tick interval.
func (*ControlTestCadenceController) Name ¶ added in v0.2.0
func (c *ControlTestCadenceController) Name() string
Name returns the controller name.
type ControlTestSchedulerConfig ¶ added in v0.1.7
type ControlTestSchedulerConfig struct {
// Interval is how often the scheduler runs (default: 24 hours).
Interval time.Duration
// StaleDays is the number of days without a test before a control test
// is considered overdue and reset to "untested" (default: 30).
StaleDays int
// BatchSize is the maximum number of overdue tests to process per cycle (default: 500).
BatchSize int
// Logger is passed by the controller manager.
Logger *logger.Logger
}
ControlTestSchedulerConfig configures the controller.
type ControlTestSchedulerController ¶ added in v0.1.7
type ControlTestSchedulerController struct {
// contains filtered or unexported fields
}
ControlTestSchedulerController automatically marks control tests as overdue when they have not been run within the configured stale window.
Design:
- Runs every 24 hours (daily sweep).
- Any control test not tested for >StaleDays days is reset to "untested" so it surfaces in the Detection Coverage dashboard.
- Never blocks other operations — failures are logged and skipped.
func NewControlTestSchedulerController ¶ added in v0.1.7
func NewControlTestSchedulerController(
    repo simulation.ControlTestRepository,
    cfg *ControlTestSchedulerConfig,
) *ControlTestSchedulerController
NewControlTestSchedulerController creates a new controller.
func (*ControlTestSchedulerController) Interval ¶ added in v0.1.7
func (c *ControlTestSchedulerController) Interval() time.Duration
Interval implements Controller.
func (*ControlTestSchedulerController) Name ¶ added in v0.1.7
func (c *ControlTestSchedulerController) Name() string
Name implements Controller.
type ControlTestSink ¶ added in v0.2.0
type ControlTestSink interface {
// MarkOverdue flags controls whose last_tested_at + cadence <
// now AND status = 'active'. Returns the count marked.
MarkOverdue(ctx context.Context, now time.Time) (int64, error)
// ExpireWithGrace flips controls that have been overdue past
// the grace period to status='expired' and returns the tenant
// + asset pairs that need reclassification.
ExpireWithGrace(ctx context.Context, now time.Time, grace time.Duration) ([]ExpiredControl, error)
}
ControlTestSink is the narrow store surface the controller needs.
type Controller ¶
type Controller interface {
// Name returns the unique name of this controller.
Name() string
// Interval returns how often this controller should run.
Interval() time.Duration
// Reconcile performs the reconciliation logic.
// It should be idempotent - running multiple times should have the same effect.
// Returns the number of items processed and any error encountered.
Reconcile(ctx context.Context) (int, error)
}
Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.
type DataExpirationController ¶ added in v0.1.2
type DataExpirationController struct {
// contains filtered or unexported fields
}
DataExpirationController handles periodic expiration of stale data:
- Suppression rules past their expires_at
- Scope exclusions past their expires_at
- Audit logs older than the retention period
Without this controller, expired suppression rules and scope exclusions remain in 'active'/'approved' status indefinitely, and audit logs accumulate without bounds.
func NewDataExpirationController ¶ added in v0.1.2
func NewDataExpirationController(
    suppressionRepo suppression.Repository,
    exclusionRepo scope.ExclusionRepository,
    auditRepo audit.Repository,
    config *DataExpirationControllerConfig,
) *DataExpirationController
NewDataExpirationController creates a new DataExpirationController.
func (*DataExpirationController) Interval ¶ added in v0.1.2
func (c *DataExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*DataExpirationController) Name ¶ added in v0.1.2
func (c *DataExpirationController) Name() string
Name returns the controller name.
type DataExpirationControllerConfig ¶ added in v0.1.2
type DataExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// AuditRetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
AuditRetentionDays int
// Logger for logging.
Logger *logger.Logger
}
DataExpirationControllerConfig configures the DataExpirationController.
type ExpiredControl ¶ added in v0.2.0
ExpiredControl describes one control that just expired. Used to drive the downstream reclassify sweep.
type GroupSyncController ¶ added in v0.1.2
type GroupSyncController struct {
// contains filtered or unexported fields
}
GroupSyncController periodically synchronizes groups from external providers. It implements the Controller interface and is managed by the controller Manager.
This controller:

1. Iterates over all active tenants
2. For each tenant, triggers SyncAll to synchronize all configured providers
3. Logs sync results for monitoring
NOT WIRED: this controller is not registered in cmd/server/workers.go because app.GroupSyncService is also not constructed in services.go — the periodic sync side of the group-provider feature is not shipped. On-demand sync works through the HTTP handler (group_handler.go). A future PR that turns on periodic SCIM/LDAP sync needs to:
- Construct app.NewGroupSyncService(repos.Group, log) in services.go
- Register this controller in workers.go
func NewGroupSyncController ¶ added in v0.1.2
func NewGroupSyncController(
    syncService *app.GroupSyncService,
    tenantRepo tenant.Repository,
    config *GroupSyncControllerConfig,
) *GroupSyncController
NewGroupSyncController creates a new GroupSyncController.
func (*GroupSyncController) Interval ¶ added in v0.1.2
func (c *GroupSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*GroupSyncController) Name ¶ added in v0.1.2
func (c *GroupSyncController) Name() string
Name returns the controller name.
func (*GroupSyncController) Reconcile ¶ added in v0.1.2
func (c *GroupSyncController) Reconcile(ctx context.Context) (int, error)
Reconcile performs the periodic group sync across all active tenants. Returns the number of tenants synced and any error encountered.
func (*GroupSyncController) TriggerSync ¶ added in v0.1.2
TriggerSync manually triggers a sync for a specific tenant. This is used by the HTTP handler for on-demand sync requests.
type GroupSyncControllerConfig ¶ added in v0.1.2
type GroupSyncControllerConfig struct {
// Interval is how often to run the sync.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
GroupSyncControllerConfig configures the GroupSyncController.
type JobRecoveryController ¶
type JobRecoveryController struct {
// contains filtered or unexported fields
}
JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.
The controller performs three main tasks:
- Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
- Expire old jobs: Mark jobs as expired if they've been in queue too long
- Clean up: Mark orphaned jobs as failed if they exceed retry limit
func NewJobRecoveryController ¶
func NewJobRecoveryController(
    commandRepo command.Repository,
    config *JobRecoveryControllerConfig,
) *JobRecoveryController
NewJobRecoveryController creates a new JobRecoveryController.
func (*JobRecoveryController) Interval ¶
func (c *JobRecoveryController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*JobRecoveryController) Name ¶
func (c *JobRecoveryController) Name() string
Name returns the controller name.
type JobRecoveryControllerConfig ¶
type JobRecoveryControllerConfig struct {
// Interval is how often to run the job recovery check.
// Default: 60 seconds.
Interval time.Duration
// StuckThresholdMinutes is how long a job can be in acknowledged/running state
// without progress before being considered stuck.
// Default: 30 minutes.
StuckThresholdMinutes int
// TenantStuckThresholdMinutes is how long a tenant command can be assigned
// to an agent without being picked up before being reassigned.
// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
TenantStuckThresholdMinutes int
// MaxRetries is the maximum number of retry attempts for a job.
// After this many retries, the job will be marked as failed.
// Default: 3.
MaxRetries int
// MaxQueueMinutes is how long a job can wait in the queue before expiring.
// Default: 60 minutes.
MaxQueueMinutes int
// Logger for logging.
Logger *logger.Logger
}
JobRecoveryControllerConfig configures the JobRecoveryController.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages multiple controllers, running them in parallel goroutines.
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new controller manager.
func (*Manager) ControllerCount ¶
ControllerCount returns the number of registered controllers.
func (*Manager) ControllerNames ¶
ControllerNames returns the names of all registered controllers.
func (*Manager) Register ¶
func (m *Manager) Register(c Controller)
Register adds a controller to the manager.
type ManagerConfig ¶
type ManagerConfig struct {
// Metrics collector (optional)
Metrics Metrics
// Logger (required)
Logger *logger.Logger
}
ManagerConfig configures the controller manager.
type Metrics ¶
type Metrics interface {
// RecordReconcile records a reconciliation run.
RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
// SetControllerRunning sets whether a controller is running.
SetControllerRunning(controller string, running bool)
// IncrementReconcileErrors increments the error counter.
IncrementReconcileErrors(controller string)
// SetLastReconcileTime sets the last reconcile timestamp.
SetLastReconcileTime(controller string, t time.Time)
}
Metrics defines the interface for controller metrics collection.
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics is a no-op implementation of Metrics for testing.
func (*NoopMetrics) IncrementReconcileErrors ¶
func (m *NoopMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors does nothing.
func (*NoopMetrics) RecordReconcile ¶
func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile does nothing.
func (*NoopMetrics) SetControllerRunning ¶
func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning does nothing.
func (*NoopMetrics) SetLastReconcileTime ¶
func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime does nothing.
type OrderConfig ¶ added in v0.2.0
type OrderConfig struct {
// MaxAgeBonus is the age after which an item jumps one
// priority band. Zero disables age bonus.
MaxAgeBonus time.Duration
// Now is injectable for tests.
Now func() time.Time
}
OrderConfig tunes the scheduler.
type OwnerResolutionController ¶ added in v0.2.0
type OwnerResolutionController struct {
// contains filtered or unexported fields
}
OwnerResolutionController periodically resolves asset ownership by matching owner_ref (email/username text) to actual user accounts.
When a scanner ingests an asset, it may set owner_ref (e.g., "alice@example.com") but cannot know the internal user UUID. This controller finds those assets and populates owner_id from the users table.
Runs every 30 minutes.
func NewOwnerResolutionController ¶ added in v0.2.0
func NewOwnerResolutionController(db *sql.DB, log *logger.Logger) *OwnerResolutionController
NewOwnerResolutionController creates a new controller.
func (*OwnerResolutionController) Interval ¶ added in v0.2.0
func (c *OwnerResolutionController) Interval() time.Duration
Interval returns 30 minutes.
func (*OwnerResolutionController) Name ¶ added in v0.2.0
func (c *OwnerResolutionController) Name() string
Name returns the controller name.
type PriorityAuditRetentionConfig ¶ added in v0.2.0
type PriorityAuditRetentionConfig struct {
// Interval between runs (default 24h).
Interval time.Duration
// RetentionDays — rows older than this are deleted (default 180).
RetentionDays int
// DryRun skips deletion and only reports count.
DryRun bool
// Logger for logging.
Logger *logger.Logger
}
PriorityAuditRetentionConfig configures the controller.
type PriorityAuditRetentionController ¶ added in v0.2.0
type PriorityAuditRetentionController struct {
// contains filtered or unexported fields
}
PriorityAuditRetentionController implements the background controller contract (Name / Interval / Reconcile).
func NewPriorityAuditRetentionController ¶ added in v0.2.0
func NewPriorityAuditRetentionController(
    repo PriorityAuditRetentionStore,
    config *PriorityAuditRetentionConfig,
) *PriorityAuditRetentionController
NewPriorityAuditRetentionController constructs the controller with sensible defaults for any zero-valued config fields.
func (*PriorityAuditRetentionController) Interval ¶ added in v0.2.0
func (c *PriorityAuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*PriorityAuditRetentionController) Name ¶ added in v0.2.0
func (c *PriorityAuditRetentionController) Name() string
Name returns the controller name.
type PriorityAuditRetentionStore ¶ added in v0.2.0
type PriorityAuditRetentionStore interface {
CountOlderThan(ctx context.Context, before time.Time) (int64, error)
DeleteOlderThan(ctx context.Context, before time.Time) (int64, error)
}
PriorityAuditRetentionStore is the subset of PriorityAuditRepository that this controller needs. Defined locally so we don't depend on the app layer.
type PriorityReclassifyConfig ¶ added in v0.2.0
type PriorityReclassifyConfig struct {
// Interval between sweep ticks. Default 5m.
Interval time.Duration
// BatchSize is the max number of requests drained per tick.
// Default 64.
BatchSize int
// Logger (optional; defaults to no-op).
Logger *logger.Logger
}
PriorityReclassifyConfig configures the sweep controller.
type PriorityReclassifyController ¶ added in v0.2.0
type PriorityReclassifyController struct {
// contains filtered or unexported fields
}
PriorityReclassifyController drains the queue and dispatches each request to the Reclassifier.
func NewPriorityReclassifyController ¶ added in v0.2.0
func NewPriorityReclassifyController(
    queue ReclassifyQueue,
    reclassifier Reclassifier,
    cfg *PriorityReclassifyConfig,
) *PriorityReclassifyController
NewPriorityReclassifyController wires the controller.
func (*PriorityReclassifyController) Interval ¶ added in v0.2.0
func (c *PriorityReclassifyController) Interval() time.Duration
Interval returns the sweep interval.
func (*PriorityReclassifyController) Name ¶ added in v0.2.0
func (c *PriorityReclassifyController) Name() string
Name returns the controller name.
func (*PriorityReclassifyController) Reconcile ¶ added in v0.2.0
func (c *PriorityReclassifyController) Reconcile(ctx context.Context) (int, error)
Reconcile drains up to BatchSize requests from the queue and applies each. Returns the total number of findings re-examined across all drained requests (for metrics).
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements the Metrics interface using Prometheus.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(namespace string) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics.
func (*PrometheusMetrics) IncrementReconcileErrors ¶
func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors increments the error counter.
func (*PrometheusMetrics) RecordReconcile ¶
func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile records a reconciliation run.
func (*PrometheusMetrics) SetControllerRunning ¶
func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning sets whether a controller is running.
func (*PrometheusMetrics) SetLastReconcileTime ¶
func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime sets the last reconcile timestamp.
type QueueItem ¶ added in v0.2.0
type QueueItem struct {
ID string
TenantID string
PriorityClass string // "P0" | "P1" | "P2" | "P3" | ""
EnqueuedAt time.Time
}
QueueItem is the minimal shape the scheduler orders. A production queue wraps this with a payload pointer; for ordering we only need the priority, tenant, and enqueue time.
func OrderBatch ¶ added in v0.2.0
func OrderBatch(items []QueueItem, cfg OrderConfig) []QueueItem
OrderBatch sorts items for dispatch. The contract:
- Lower effectiveWeight dispatches first.
- Within the same weight, tenants are rotated (round-robin) to prevent one tenant from monopolising the band.
- Within the same (weight, tenant), older items dispatch first.
OrderBatch returns a NEW slice; the input is not mutated.
type QueuePriorityController ¶
type QueuePriorityController struct {
// contains filtered or unexported fields
}
QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:

- Tenant's plan tier (higher tiers get base priority boost)
- Job age (older jobs get priority bonus to prevent starvation)
- Tenant's current queue depth (tenants with fewer jobs get slight boost)
The priority calculation is done in the database for efficiency:

    new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
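A worked instance of the formula, with illustrative numbers (the actual plan weights and bonus rate live in the database, not in this package):

```go
package main

import "fmt"

// Worked example of the documented formula:
//   new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
// All numbers below are illustrative.
func main() {
	planBase := 100.0     // hypothetical base priority for a plan tier
	ageBonusPerMin := 0.5 // hypothetical bonus per minute of waiting
	waitMinutes := 42.0   // job has waited 42 minutes

	newPriority := planBase + waitMinutes*ageBonusPerMin
	fmt.Println(newPriority) // 121
}
```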
This is a soft-priority system - higher priority jobs are processed first, but no tenant can completely starve others.
func NewQueuePriorityController ¶
func NewQueuePriorityController(
    commandRepo command.Repository,
    config *QueuePriorityControllerConfig,
) *QueuePriorityController
NewQueuePriorityController creates a new QueuePriorityController.
func (*QueuePriorityController) Interval ¶
func (c *QueuePriorityController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*QueuePriorityController) Name ¶
func (c *QueuePriorityController) Name() string
Name returns the controller name.
type QueuePriorityControllerConfig ¶
type QueuePriorityControllerConfig struct {
// Interval is how often to recalculate queue priorities.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
QueuePriorityControllerConfig configures the QueuePriorityController.
type Reclassifier ¶ added in v0.2.0
type Reclassifier interface {
// ReclassifyForRequest processes a single request. Returns the
// number of findings re-examined (not necessarily changed — a
// sweep that re-confirms the same class is still "work done").
ReclassifyForRequest(ctx context.Context, req ReclassifyRequest) (int, error)
}
Reclassifier applies one request: it loads the matching open findings and invokes the PriorityClassificationService on them. Kept as a narrow interface so the controller has no knowledge of the app-layer service or its repo dependencies.
type ReclassifyQueue ¶ added in v0.2.0
type ReclassifyQueue interface {
// Enqueue adds a reclassify request. MUST be safe for concurrent use.
Enqueue(ctx context.Context, req ReclassifyRequest) error
// DequeueBatch pops up to `max` requests. An empty slice (no error)
// means "nothing to do this tick". Implementations should return
// quickly (non-blocking).
DequeueBatch(ctx context.Context, max int) ([]ReclassifyRequest, error)
}
ReclassifyQueue is the minimal contract for the in/out queue. Implementations may be Redis lists, Postgres advisory-locked rows, or an in-memory channel for tests.
type ReclassifyReasonKind ¶ added in v0.2.0
type ReclassifyReasonKind string
ReclassifyReasonKind enumerates why a sweep was enqueued.
const (
	ReasonEPSSRefresh   ReclassifyReasonKind = "epss_refresh"
	ReasonKEVRefresh    ReclassifyReasonKind = "kev_refresh"
	ReasonRuleChanged   ReclassifyReasonKind = "rule_changed"
	ReasonControlChange ReclassifyReasonKind = "control_change"
	ReasonAssetChange   ReclassifyReasonKind = "asset_change"
	ReasonManual        ReclassifyReasonKind = "manual"
)
type ReclassifyRequest ¶ added in v0.2.0
type ReclassifyRequest struct {
TenantID shared.ID
Reason ReclassifyReasonKind
CVEIDs []string
AssetIDs []shared.ID
RuleID *shared.ID
EnqueueAt time.Time
}
ReclassifyRequest describes one unit of sweep work.
Scope dimensions (all optional; nil/empty = broadest match within tenant):
- CVEIDs — only findings matching these CVEs (typical EPSS/KEV path).
- AssetIDs — only findings on these assets (typical control/asset path).
- RuleID — evaluated by the running service against each finding.
Batching is the caller's responsibility — a giant EPSS refresh should split into per-tenant requests; the controller treats each request atomically.
type RetryDispatcher ¶ added in v0.1.5
type RetryDispatcher interface {
RetryScanRun(ctx context.Context, tenantID, scanID shared.ID, retryAttempt int) error
}
RetryDispatcher is the dependency the ScanRetryController uses to actually re-trigger a failed scan run. The scan service implements this.
type RiskSnapshotController ¶ added in v0.2.0
type RiskSnapshotController struct {
// contains filtered or unexported fields
}
RiskSnapshotController computes daily risk snapshots for all tenants. Runs every 6 hours, but only inserts one row per tenant per day (UPSERT).
RFC-005 Gap 4: Risk Trend / Outcome Metrics.
func NewRiskSnapshotController ¶ added in v0.2.0
func NewRiskSnapshotController(db *sql.DB, log *logger.Logger) *RiskSnapshotController
NewRiskSnapshotController creates a new controller.
func (*RiskSnapshotController) Interval ¶ added in v0.2.0
func (c *RiskSnapshotController) Interval() time.Duration
Interval returns 6 hours (computes at most once per day per tenant via UPSERT).
func (*RiskSnapshotController) Name ¶ added in v0.2.0
func (c *RiskSnapshotController) Name() string
Name returns the controller name.
type RoleSyncController ¶ added in v0.1.2
type RoleSyncController struct {
// contains filtered or unexported fields
}
RoleSyncController ensures tenant_members and user_roles stay in sync.
The RBAC permission system reads from user_roles, while tenant_members stores the canonical membership. A trigger (sync_tenant_member_to_user_roles) handles new INSERT/UPDATE operations, but entries can become desynced after:
- Database restore from backup
- Migration rollback and re-apply
- Manual data manipulation
- Trigger being temporarily disabled
This controller detects missing user_roles entries and backfills them, preventing "Access Denied" errors for affected users.
func NewRoleSyncController ¶ added in v0.1.2
func NewRoleSyncController(db *sql.DB, config *RoleSyncControllerConfig) *RoleSyncController
NewRoleSyncController creates a new RoleSyncController.
func (*RoleSyncController) Interval ¶ added in v0.1.2
func (c *RoleSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*RoleSyncController) Name ¶ added in v0.1.2
func (c *RoleSyncController) Name() string
Name returns the controller name.
type RoleSyncControllerConfig ¶ added in v0.1.2
type RoleSyncControllerConfig struct {
// Interval is how often to run the sync check.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
RoleSyncControllerConfig configures the RoleSyncController.
type SLABreachEvent ¶ added in v0.2.0
type SLABreachEvent struct {
TenantID shared.ID
FindingID shared.ID
SLADeadline time.Time
OverdueDuration time.Duration
At time.Time
}
SLABreachEvent is emitted once per finding on the transition into the `breached` SLA state. Downstream consumers (notification outbox, Jira commenter, PagerDuty router) subscribe via the publisher.
B4: closes the feedback edge where SLA status was previously computed but never acted on. Dedup via the SLA breach transition itself — a second controller run won't re-emit because rows already in `breached` state don't match the UPDATE WHERE clause.
type SLABreachPublisher ¶ added in v0.2.0
type SLABreachPublisher interface {
Publish(ctx context.Context, event SLABreachEvent) error
}
SLABreachPublisher delivers breach events to downstream consumers. Optional — nil publisher means "log only" (legacy behaviour).
type SLAEscalationController ¶ added in v0.2.0
type SLAEscalationController struct {
// contains filtered or unexported fields
}
SLAEscalationController periodically checks for overdue findings and updates their sla_status to 'breached'. Runs every 15 minutes.
Note: This operates across all tenants intentionally — it's a system-level background job that marks overdue findings within their own rows. Each finding's tenant_id remains unchanged.
RFC-005 Gap 7 + B4: Automated SLA Escalation with publisher.
func NewSLAEscalationController ¶ added in v0.2.0
func NewSLAEscalationController(db *sql.DB, log *logger.Logger) *SLAEscalationController
NewSLAEscalationController creates a new SLA escalation controller.
func (*SLAEscalationController) Interval ¶ added in v0.2.0
func (c *SLAEscalationController) Interval() time.Duration
Interval returns 15 minutes.
func (*SLAEscalationController) Name ¶ added in v0.2.0
func (c *SLAEscalationController) Name() string
Name returns the controller name.
func (*SLAEscalationController) Reconcile ¶ added in v0.2.0
func (c *SLAEscalationController) Reconcile(ctx context.Context) (int, error)
Reconcile checks for overdue findings and marks them as breached.
B4: For every row newly-transitioned into `breached`, a SLABreachEvent is emitted via the publisher. Dedup is structural — the WHERE clause excludes rows already in `breached`, so a second run won't re-emit.
func (*SLAEscalationController) SetBreachPublisher ¶ added in v0.2.0
func (c *SLAEscalationController) SetBreachPublisher(p SLABreachPublisher)
SetBreachPublisher wires the breach-event publisher. Safe after construction; nil disables publishing.
type ScanRetryController ¶ added in v0.1.5
type ScanRetryController struct {
// contains filtered or unexported fields
}
ScanRetryController periodically checks for failed pipeline runs that are eligible for automatic retry (based on the parent scan's max_retries + retry_backoff_seconds with exponential backoff) and dispatches retries through the RetryDispatcher.
func NewScanRetryController ¶ added in v0.1.5
func NewScanRetryController(runRepo pipeline.RunRepository, dispatcher RetryDispatcher, config *ScanRetryControllerConfig) *ScanRetryController
NewScanRetryController creates a new ScanRetryController.
func (*ScanRetryController) Interval ¶ added in v0.1.5
func (c *ScanRetryController) Interval() time.Duration
func (*ScanRetryController) Name ¶ added in v0.1.5
func (c *ScanRetryController) Name() string
type ScanRetryControllerConfig ¶ added in v0.1.5
type ScanRetryControllerConfig struct {
// Interval is how often to check for retry candidates.
// Default: 60 seconds.
Interval time.Duration
// BatchSize is the maximum candidates processed per cycle.
// Default: 100.
BatchSize int
Logger *logger.Logger
}
ScanRetryControllerConfig configures the ScanRetryController.
type ScanTimeoutController ¶ added in v0.1.5
type ScanTimeoutController struct {
// contains filtered or unexported fields
}
ScanTimeoutController periodically marks pipeline_runs as timed out when they exceed their scan's configured timeout_seconds.
This complements JobRecoveryController (which marks stuck commands) by enforcing per-scan timeouts. A scan can specify its own timeout_seconds (default 1h, max 24h), and runs that exceed that are forcefully marked as timeout with an appropriate error message.
func NewScanTimeoutController ¶ added in v0.1.5
func NewScanTimeoutController(runRepo pipeline.RunRepository, config *ScanTimeoutControllerConfig) *ScanTimeoutController
NewScanTimeoutController creates a new ScanTimeoutController.
func (*ScanTimeoutController) Interval ¶ added in v0.1.5
func (c *ScanTimeoutController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScanTimeoutController) Name ¶ added in v0.1.5
func (c *ScanTimeoutController) Name() string
Name returns the controller name.
type ScanTimeoutControllerConfig ¶ added in v0.1.5
type ScanTimeoutControllerConfig struct {
// Interval is how often to check for timed-out runs.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScanTimeoutControllerConfig configures the ScanTimeoutController.
type ScopeReconciliationController ¶ added in v0.1.2
type ScopeReconciliationController struct {
// contains filtered or unexported fields
}
ScopeReconciliationController periodically reconciles scope rule assignments as a safety net for the real-time event-driven hooks.
This controller:
1. Lists all tenants with active scope rules
2. For each tenant, lists all groups with active scope rules
3. Reconciles each group by re-evaluating matching assets
Design: This is a background safety net (K8s-style eventual consistency). The primary path is real-time hooks in AssetService and AssetGroupService.
func NewScopeReconciliationController ¶ added in v0.1.2
func NewScopeReconciliationController(acRepo accesscontrol.Repository, reconciler scopeGroupReconciler, config *ScopeReconciliationControllerConfig) *ScopeReconciliationController
NewScopeReconciliationController creates a new ScopeReconciliationController.
func (*ScopeReconciliationController) Interval ¶ added in v0.1.2
func (c *ScopeReconciliationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScopeReconciliationController) Name ¶ added in v0.1.2
func (c *ScopeReconciliationController) Name() string
Name returns the controller name.
type ScopeReconciliationControllerConfig ¶ added in v0.1.2
type ScopeReconciliationControllerConfig struct {
// Interval is how often to run the reconciliation.
// Default: 30 minutes.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScopeReconciliationControllerConfig configures the ScopeReconciliationController.
type ThreatIntelRefreshController ¶ added in v0.1.7
type ThreatIntelRefreshController struct {
// contains filtered or unexported fields
}
ThreatIntelRefreshController periodically refreshes EPSS scores and the KEV catalog. Runs every 24 hours. Fetches the latest data from FIRST.org (EPSS) and CISA (KEV), then persists it to the database via ThreatIntelService.SyncAll(). After sync, it auto-escalates findings whose CVEs appear in the KEV catalog.
func NewThreatIntelRefreshController ¶ added in v0.1.7
func NewThreatIntelRefreshController(service *threat.IntelService, escalator threat.KEVEscalator, log *logger.Logger) *ThreatIntelRefreshController
NewThreatIntelRefreshController creates a new controller.
func (*ThreatIntelRefreshController) Interval ¶ added in v0.1.7
func (c *ThreatIntelRefreshController) Interval() time.Duration
Interval returns 24 hours — daily refresh.
func (*ThreatIntelRefreshController) Name ¶ added in v0.1.7
func (c *ThreatIntelRefreshController) Name() string
Name returns the controller name.
Source Files
¶
- agent_health.go
- approval_expiration.go
- audit_chain_verify.go
- audit_retention.go
- control_change_publisher.go
- control_test_cadence.go
- control_test_scheduler.go
- controller.go
- data_expiration.go
- group_sync.go
- job_recovery.go
- metrics.go
- owner_resolution.go
- priority_audit_retention.go
- priority_queue_order.go
- priority_reclassify.go
- queue_priority.go
- risk_snapshot.go
- role_sync.go
- scan_retry.go
- scan_timeout.go
- scope_reconciliation.go
- sla_escalation.go
- threat_intel_refresh.go