Documentation
Overview ¶
Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.
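Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles one aspect of the system:
- AgentHealthController: Marks stale agents as offline, cleans up expired leases
- JobRecoveryController: Recovers stuck jobs and re-queues them
- QueuePriorityController: Recalculates queue priorities for fair scheduling
- TokenCleanupController: Cleans up expired bootstrap tokens
- AuditRetentionController: Manages audit log retention
- ApprovalExpirationController: Reopens findings whose risk acceptances have expired
- DataExpirationController: Expires stale suppression rules, scope exclusions, and audit logs
- GroupSyncController: Synchronizes groups from external providers
- RoleSyncController: Backfills user_roles entries that are missing for tenant_members
- ScanRetryController: Retries failed pipeline runs with exponential backoff
- ScanTimeoutController: Marks pipeline runs that exceed their scan's timeout as timed out
- ScopeReconciliationController: Re-evaluates scope rule assignments as a safety net for event-driven hooks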
Design principles:
- Each controller is independent and can fail without affecting others
- Controllers are idempotent: running one multiple times has the same effect as running it once
- Controllers use optimistic locking to handle concurrent modifications
- All state changes are logged for debugging and monitoring
Index ¶
- type AgentHealthController
- type AgentHealthControllerConfig
- type ApprovalExpirationController
- type ApprovalExpirationControllerConfig
- type AuditRetentionController
- type AuditRetentionControllerConfig
- type Controller
- type DataExpirationController
- type DataExpirationControllerConfig
- type GroupSyncController
- type GroupSyncControllerConfig
- type JobRecoveryController
- type JobRecoveryControllerConfig
- type Manager
- type ManagerConfig
- type Metrics
- type NoopMetrics
- func (m *NoopMetrics) IncrementReconcileErrors(controller string)
- func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
- func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
- type PrometheusMetrics
- func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
- func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
- func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
- type QueuePriorityController
- type QueuePriorityControllerConfig
- type RetryDispatcher
- type RoleSyncController
- type RoleSyncControllerConfig
- type ScanRetryController
- type ScanRetryControllerConfig
- type ScanTimeoutController
- type ScanTimeoutControllerConfig
- type ScopeReconciliationController
- type ScopeReconciliationControllerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthController ¶
type AgentHealthController struct {
// contains filtered or unexported fields
}
AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.
func NewAgentHealthController ¶
func NewAgentHealthController(agentRepo agent.Repository, config *AgentHealthControllerConfig) *AgentHealthController
NewAgentHealthController creates a new AgentHealthController.
func (*AgentHealthController) Interval ¶
func (c *AgentHealthController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AgentHealthController) Name ¶
func (c *AgentHealthController) Name() string
Name returns the controller name.
type AgentHealthControllerConfig ¶
type AgentHealthControllerConfig struct {
// Interval is how often to run the health check.
// Default: 30 seconds.
Interval time.Duration
// StaleTimeout is how long since last heartbeat before marking an agent as offline.
// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
StaleTimeout time.Duration
// Logger for logging.
Logger *logger.Logger
}
AgentHealthControllerConfig configures the AgentHealthController.
type ApprovalExpirationController ¶ added in v0.1.2
type ApprovalExpirationController struct {
// contains filtered or unexported fields
}
ApprovalExpirationController automatically reopens findings whose risk acceptances have expired. It scans for approvals in the approved state whose expires_at is in the past, marks them as expired, and transitions the associated findings back to "confirmed" status.
func NewApprovalExpirationController ¶ added in v0.1.2
func NewApprovalExpirationController(approvalRepo vulnerability.ApprovalRepository, findingRepo vulnerability.FindingRepository, config *ApprovalExpirationControllerConfig) *ApprovalExpirationController
NewApprovalExpirationController creates a new ApprovalExpirationController.
func (*ApprovalExpirationController) Interval ¶ added in v0.1.2
func (c *ApprovalExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ApprovalExpirationController) Name ¶ added in v0.1.2
func (c *ApprovalExpirationController) Name() string
Name returns the controller name.
type ApprovalExpirationControllerConfig ¶ added in v0.1.2
type ApprovalExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// BatchSize is the maximum number of expired approvals to process per reconciliation.
// Default: 100.
BatchSize int
// Logger for logging.
Logger *logger.Logger
}
ApprovalExpirationControllerConfig configures the ApprovalExpirationController.
type AuditRetentionController ¶
type AuditRetentionController struct {
// contains filtered or unexported fields
}
AuditRetentionController manages audit log retention. This is a compliance-critical controller that:
1. Deletes audit logs older than the retention period
2. Logs deletion activities for meta-audit purposes
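The retention period should be configured based on compliance requirements:
- GDPR: No fixed period (storage limitation principle); typically 2-7 years depending on data type
- SOC 2: At least 1 year
- PCI DSS: At least 1 year, with the most recent 3 months immediately available
- HIPAA: 6 years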
IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.
func NewAuditRetentionController ¶
func NewAuditRetentionController(auditRepo admin.AuditLogRepository, config *AuditRetentionControllerConfig) *AuditRetentionController
NewAuditRetentionController creates a new AuditRetentionController.
func (*AuditRetentionController) Interval ¶
func (c *AuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditRetentionController) Name ¶
func (c *AuditRetentionController) Name() string
Name returns the controller name.
type AuditRetentionControllerConfig ¶
type AuditRetentionControllerConfig struct {
// Interval is how often to run the retention check.
// Default: 24 hours (once a day).
Interval time.Duration
// RetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
RetentionDays int
// BatchSize is the maximum number of logs to delete in one batch.
// This prevents long-running transactions.
// Default: 10000.
BatchSize int
// DryRun if true, only counts logs that would be deleted without actually deleting.
// Useful for testing retention policies.
// Default: false.
DryRun bool
// Logger for logging.
Logger *logger.Logger
}
AuditRetentionControllerConfig configures the AuditRetentionController.
type Controller ¶
type Controller interface {
// Name returns the unique name of this controller.
Name() string
// Interval returns how often this controller should run.
Interval() time.Duration
// Reconcile performs the reconciliation logic.
// It should be idempotent: running it multiple times should have the same effect as running it once.
// Returns the number of items processed and any error encountered.
Reconcile(ctx context.Context) (int, error)
}
Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.
type DataExpirationController ¶ added in v0.1.2
type DataExpirationController struct {
// contains filtered or unexported fields
}
DataExpirationController handles periodic expiration of stale data:
- Suppression rules past their expires_at
- Scope exclusions past their expires_at
- Audit logs older than the retention period
Without this controller, expired suppression rules and scope exclusions remain in 'active'/'approved' status indefinitely, and audit logs accumulate without bounds.
func NewDataExpirationController ¶ added in v0.1.2
func NewDataExpirationController(suppressionRepo suppression.Repository, exclusionRepo scope.ExclusionRepository, auditRepo audit.Repository, config *DataExpirationControllerConfig) *DataExpirationController
NewDataExpirationController creates a new DataExpirationController.
func (*DataExpirationController) Interval ¶ added in v0.1.2
func (c *DataExpirationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*DataExpirationController) Name ¶ added in v0.1.2
func (c *DataExpirationController) Name() string
Name returns the controller name.
type DataExpirationControllerConfig ¶ added in v0.1.2
type DataExpirationControllerConfig struct {
// Interval is how often to run the expiration check.
// Default: 1 hour.
Interval time.Duration
// AuditRetentionDays is how long to keep audit logs.
// Logs older than this will be deleted.
// Default: 365 days (1 year).
AuditRetentionDays int
// Logger for logging.
Logger *logger.Logger
}
DataExpirationControllerConfig configures the DataExpirationController.
type GroupSyncController ¶ added in v0.1.2
type GroupSyncController struct {
// contains filtered or unexported fields
}
GroupSyncController periodically synchronizes groups from external providers. It implements the Controller interface and is managed by the controller Manager.
This controller:
1. Iterates over all active tenants
2. For each tenant, triggers SyncAll to synchronize all configured providers
3. Logs sync results for monitoring
func NewGroupSyncController ¶ added in v0.1.2
func NewGroupSyncController(syncService *app.GroupSyncService, tenantRepo tenant.Repository, config *GroupSyncControllerConfig) *GroupSyncController
NewGroupSyncController creates a new GroupSyncController.
func (*GroupSyncController) Interval ¶ added in v0.1.2
func (c *GroupSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*GroupSyncController) Name ¶ added in v0.1.2
func (c *GroupSyncController) Name() string
Name returns the controller name.
func (*GroupSyncController) Reconcile ¶ added in v0.1.2
func (c *GroupSyncController) Reconcile(ctx context.Context) (int, error)
Reconcile performs the periodic group sync across all active tenants. Returns the number of tenants synced and any error encountered.
func (*GroupSyncController) TriggerSync ¶ added in v0.1.2
TriggerSync manually triggers a sync for a specific tenant. This is used by the HTTP handler for on-demand sync requests.
type GroupSyncControllerConfig ¶ added in v0.1.2
type GroupSyncControllerConfig struct {
// Interval is how often to run the sync.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
GroupSyncControllerConfig configures the GroupSyncController.
type JobRecoveryController ¶
type JobRecoveryController struct {
// contains filtered or unexported fields
}
JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.
The controller performs three main tasks:
- Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
- Expire old jobs: Mark jobs as expired if they have waited in the queue longer than MaxQueueMinutes
- Clean up: Mark orphaned jobs as failed once they exceed the retry limit (MaxRetries)
func NewJobRecoveryController ¶
func NewJobRecoveryController(commandRepo command.Repository, config *JobRecoveryControllerConfig) *JobRecoveryController
NewJobRecoveryController creates a new JobRecoveryController.
func (*JobRecoveryController) Interval ¶
func (c *JobRecoveryController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*JobRecoveryController) Name ¶
func (c *JobRecoveryController) Name() string
Name returns the controller name.
type JobRecoveryControllerConfig ¶
type JobRecoveryControllerConfig struct {
// Interval is how often to run the job recovery check.
// Default: 60 seconds.
Interval time.Duration
// StuckThresholdMinutes is how long a job can be in acknowledged/running state
// without progress before being considered stuck.
// Default: 30 minutes.
StuckThresholdMinutes int
// TenantStuckThresholdMinutes is how long a tenant command can be assigned
// to an agent without being picked up before being reassigned.
// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
TenantStuckThresholdMinutes int
// MaxRetries is the maximum number of retry attempts for a job.
// After this many retries, the job will be marked as failed.
// Default: 3.
MaxRetries int
// MaxQueueMinutes is how long a job can wait in the queue before expiring.
// Default: 60 minutes.
MaxQueueMinutes int
// Logger for logging.
Logger *logger.Logger
}
JobRecoveryControllerConfig configures the JobRecoveryController.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages multiple controllers, running them in parallel goroutines.
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new controller manager.
func (*Manager) ControllerCount ¶
ControllerCount returns the number of registered controllers.
func (*Manager) ControllerNames ¶
ControllerNames returns the names of all registered controllers.
func (*Manager) Register ¶
func (m *Manager) Register(c Controller)
Register adds a controller to the manager.
type ManagerConfig ¶
type ManagerConfig struct {
// Metrics collector (optional)
Metrics Metrics
// Logger (required)
Logger *logger.Logger
}
ManagerConfig configures the controller manager.
type Metrics ¶
type Metrics interface {
// RecordReconcile records a reconciliation run.
RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
// SetControllerRunning sets whether a controller is running.
SetControllerRunning(controller string, running bool)
// IncrementReconcileErrors increments the error counter.
IncrementReconcileErrors(controller string)
// SetLastReconcileTime sets the last reconcile timestamp.
SetLastReconcileTime(controller string, t time.Time)
}
Metrics defines the interface for controller metrics collection.
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics is a no-op implementation of Metrics for testing.
func (*NoopMetrics) IncrementReconcileErrors ¶
func (m *NoopMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors does nothing.
func (*NoopMetrics) RecordReconcile ¶
func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile does nothing.
func (*NoopMetrics) SetControllerRunning ¶
func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning does nothing.
func (*NoopMetrics) SetLastReconcileTime ¶
func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime does nothing.
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements the Metrics interface using Prometheus.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(namespace string) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics.
func (*PrometheusMetrics) IncrementReconcileErrors ¶
func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors increments the error counter.
func (*PrometheusMetrics) RecordReconcile ¶
func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile records a reconciliation run.
func (*PrometheusMetrics) SetControllerRunning ¶
func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning sets whether a controller is running.
func (*PrometheusMetrics) SetLastReconcileTime ¶
func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime sets the last reconcile timestamp.
type QueuePriorityController ¶
type QueuePriorityController struct {
// contains filtered or unexported fields
}
QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:
- Tenant's plan tier (higher tiers get a base priority boost)
- Job age (older jobs get a priority bonus to prevent starvation)
- Tenant's current queue depth (tenants with fewer jobs get a slight boost)
The priority calculation is done in the database for efficiency:
new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
This is a soft-priority system: higher-priority jobs are processed first, but because waiting jobs keep accruing the age bonus, no tenant can completely starve others.
func NewQueuePriorityController ¶
func NewQueuePriorityController(commandRepo command.Repository, config *QueuePriorityControllerConfig) *QueuePriorityController
NewQueuePriorityController creates a new QueuePriorityController.
func (*QueuePriorityController) Interval ¶
func (c *QueuePriorityController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*QueuePriorityController) Name ¶
func (c *QueuePriorityController) Name() string
Name returns the controller name.
type QueuePriorityControllerConfig ¶
type QueuePriorityControllerConfig struct {
// Interval is how often to recalculate queue priorities.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
QueuePriorityControllerConfig configures the QueuePriorityController.
type RetryDispatcher ¶ added in v0.1.5
type RetryDispatcher interface {
RetryScanRun(ctx context.Context, tenantID, scanID shared.ID, retryAttempt int) error
}
RetryDispatcher is the dependency the ScanRetryController uses to actually re-trigger a failed scan run. The scan service implements this.
type RoleSyncController ¶ added in v0.1.2
type RoleSyncController struct {
// contains filtered or unexported fields
}
RoleSyncController ensures tenant_members and user_roles stay in sync.
The RBAC permission system reads from user_roles, while tenant_members stores the canonical membership. A trigger (sync_tenant_member_to_user_roles) propagates each INSERT/UPDATE on tenant_members to user_roles, but the two tables can drift out of sync after:
- Database restore from backup
- Migration rollback and re-apply
- Manual data manipulation
- Trigger being temporarily disabled
This controller detects missing user_roles entries and backfills them, preventing "Access Denied" errors for affected users.
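Detecting the missing entries amounts to a set difference between tenant_members and user_roles. The sketch below computes it in memory with a hypothetical `membership` key. The real controller works against the database through *sql.DB, where the equivalent would typically be an anti-join; its actual columns and query are not documented here.

```go
package main

import "sort"

// membership is a hypothetical (tenant, user) key.
type membership struct {
	TenantID string
	UserID   string
}

// missingRoles returns the tenant_members entries with no matching
// user_roles entry: the rows a backfill would need to insert. The result
// is sorted so repeated runs are deterministic.
func missingRoles(members, userRoles []membership) []membership {
	have := make(map[membership]struct{}, len(userRoles))
	for _, r := range userRoles {
		have[r] = struct{}{}
	}
	var missing []membership
	for _, m := range members {
		if _, ok := have[m]; !ok {
			missing = append(missing, m)
		}
	}
	sort.Slice(missing, func(i, j int) bool {
		if missing[i].TenantID != missing[j].TenantID {
			return missing[i].TenantID < missing[j].TenantID
		}
		return missing[i].UserID < missing[j].UserID
	})
	return missing
}
```

Once the backfill inserts these rows, the difference is empty, so the next run is a no-op.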
func NewRoleSyncController ¶ added in v0.1.2
func NewRoleSyncController(db *sql.DB, config *RoleSyncControllerConfig) *RoleSyncController
NewRoleSyncController creates a new RoleSyncController.
func (*RoleSyncController) Interval ¶ added in v0.1.2
func (c *RoleSyncController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*RoleSyncController) Name ¶ added in v0.1.2
func (c *RoleSyncController) Name() string
Name returns the controller name.
type RoleSyncControllerConfig ¶ added in v0.1.2
type RoleSyncControllerConfig struct {
// Interval is how often to run the sync check.
// Default: 1 hour.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
RoleSyncControllerConfig configures the RoleSyncController.
type ScanRetryController ¶ added in v0.1.5
type ScanRetryController struct {
// contains filtered or unexported fields
}
ScanRetryController periodically checks for failed pipeline runs that are eligible for automatic retry (based on the parent scan's max_retries and retry_backoff_seconds, with exponential backoff) and dispatches the retries through the RetryDispatcher.
func NewScanRetryController ¶ added in v0.1.5
func NewScanRetryController(runRepo pipeline.RunRepository, dispatcher RetryDispatcher, config *ScanRetryControllerConfig) *ScanRetryController
NewScanRetryController creates a new ScanRetryController.
func (*ScanRetryController) Interval ¶ added in v0.1.5
func (c *ScanRetryController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScanRetryController) Name ¶ added in v0.1.5
func (c *ScanRetryController) Name() string
Name returns the controller name.
type ScanRetryControllerConfig ¶ added in v0.1.5
type ScanRetryControllerConfig struct {
// Interval is how often to check for retry candidates.
// Default: 60 seconds.
Interval time.Duration
// BatchSize is the maximum candidates processed per cycle.
// Default: 100.
BatchSize int
// Logger for logging.
Logger *logger.Logger
}
ScanRetryControllerConfig configures the ScanRetryController.
type ScanTimeoutController ¶ added in v0.1.5
type ScanTimeoutController struct {
// contains filtered or unexported fields
}
ScanTimeoutController periodically marks pipeline_runs as timed out when they exceed their scan's configured timeout_seconds.
This complements JobRecoveryController (which handles stuck commands) by enforcing per-scan timeouts. Each scan can specify its own timeout_seconds (default 1h, max 24h); runs that exceed it are marked as timed out with an error message explaining why.
func NewScanTimeoutController ¶ added in v0.1.5
func NewScanTimeoutController(runRepo pipeline.RunRepository, config *ScanTimeoutControllerConfig) *ScanTimeoutController
NewScanTimeoutController creates a new ScanTimeoutController.
func (*ScanTimeoutController) Interval ¶ added in v0.1.5
func (c *ScanTimeoutController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScanTimeoutController) Name ¶ added in v0.1.5
func (c *ScanTimeoutController) Name() string
Name returns the controller name.
type ScanTimeoutControllerConfig ¶ added in v0.1.5
type ScanTimeoutControllerConfig struct {
// Interval is how often to check for timed-out runs.
// Default: 60 seconds.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScanTimeoutControllerConfig configures the ScanTimeoutController.
type ScopeReconciliationController ¶ added in v0.1.2
type ScopeReconciliationController struct {
// contains filtered or unexported fields
}
ScopeReconciliationController periodically reconciles scope rule assignments as a safety net for the real-time event-driven hooks.
This controller:
1. Lists all tenants with active scope rules
2. For each tenant, lists all groups with active scope rules
3. Reconciles each group by re-evaluating matching assets
Design: This is a background safety net (K8s-style eventual consistency). The primary path is real-time hooks in AssetService and AssetGroupService.
func NewScopeReconciliationController ¶ added in v0.1.2
func NewScopeReconciliationController(acRepo accesscontrol.Repository, reconciler scopeGroupReconciler, config *ScopeReconciliationControllerConfig) *ScopeReconciliationController
NewScopeReconciliationController creates a new ScopeReconciliationController.
func (*ScopeReconciliationController) Interval ¶ added in v0.1.2
func (c *ScopeReconciliationController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*ScopeReconciliationController) Name ¶ added in v0.1.2
func (c *ScopeReconciliationController) Name() string
Name returns the controller name.
type ScopeReconciliationControllerConfig ¶ added in v0.1.2
type ScopeReconciliationControllerConfig struct {
// Interval is how often to run the reconciliation.
// Default: 30 minutes.
Interval time.Duration
// Logger for logging.
Logger *logger.Logger
}
ScopeReconciliationControllerConfig configures the ScopeReconciliationController.