controller

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 9, 2026 License: GPL-3.0 Imports: 21 Imported by: 0

Documentation

Overview

Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.

Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:

  • AgentHealthController: Marks stale agents as offline, cleans up expired leases
  • JobRecoveryController: Recovers stuck jobs and re-queues them
  • QueuePriorityController: Recalculates queue priorities for fair scheduling
  • TokenCleanupController: Cleans up expired bootstrap tokens
  • AuditRetentionController: Manages audit log retention

Design principles:

  • Each controller is independent and can fail without affecting others
  • Controllers are idempotent: running one multiple times has the same effect as running it once
  • Controllers use optimistic locking to handle concurrent modifications
  • All state changes are logged for debugging and monitoring

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHealthController

type AgentHealthController struct {
	// contains filtered or unexported fields
}

AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.

func NewAgentHealthController

func NewAgentHealthController(
	agentRepo agent.Repository,
	config *AgentHealthControllerConfig,
) *AgentHealthController

NewAgentHealthController creates a new AgentHealthController.

func (*AgentHealthController) Interval

func (c *AgentHealthController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AgentHealthController) Name

func (c *AgentHealthController) Name() string

Name returns the controller name.

func (*AgentHealthController) Reconcile

func (c *AgentHealthController) Reconcile(ctx context.Context) (int, error)

Reconcile checks agent health and marks stale agents as offline. It uses the MarkStaleAgentsOffline method, which also updates the last_offline_at timestamp.

type AgentHealthControllerConfig

type AgentHealthControllerConfig struct {
	// Interval is how often to run the health check.
	// Default: 30 seconds.
	Interval time.Duration

	// StaleTimeout is how long since last heartbeat before marking an agent as offline.
	// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
	StaleTimeout time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

AgentHealthControllerConfig configures the AgentHealthController.

type ApprovalExpirationController added in v0.1.2

type ApprovalExpirationController struct {
	// contains filtered or unexported fields
}

ApprovalExpirationController auto-reopens findings whose risk acceptances have expired. It scans for approved approvals with an expires_at in the past, marks them as expired, and transitions the associated findings back to "confirmed" status.

func NewApprovalExpirationController added in v0.1.2

func NewApprovalExpirationController(
	approvalRepo vulnerability.ApprovalRepository,
	findingRepo vulnerability.FindingRepository,
	config *ApprovalExpirationControllerConfig,
) *ApprovalExpirationController

NewApprovalExpirationController creates a new ApprovalExpirationController.

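func (*ApprovalExpirationController) Interval added in v0.1.2

func (c *ApprovalExpirationController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ApprovalExpirationController) Name added in v0.1.2

func (c *ApprovalExpirationController) Name() string

Name returns the controller name.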

func (*ApprovalExpirationController) Reconcile added in v0.1.2

func (c *ApprovalExpirationController) Reconcile(ctx context.Context) (int, error)

Reconcile finds expired approved approvals and reopens associated findings.

type ApprovalExpirationControllerConfig added in v0.1.2

type ApprovalExpirationControllerConfig struct {
	// Interval is how often to run the expiration check.
	// Default: 1 hour.
	Interval time.Duration

	// BatchSize is the maximum number of expired approvals to process per reconciliation.
	// Default: 100.
	BatchSize int

	// Logger for logging.
	Logger *logger.Logger
}

ApprovalExpirationControllerConfig configures the ApprovalExpirationController.

type AuditRetentionController

type AuditRetentionController struct {
	// contains filtered or unexported fields
}

AuditRetentionController manages audit log retention. This is a compliance-critical controller that:

  1. Deletes audit logs older than the retention period
  2. Logs deletion activities for meta-audit purposes

The retention period should be configured based on compliance requirements:

  • GDPR: Typically 2-7 years depending on data type
  • SOC 2: At least 1 year
  • PCI DSS: At least 1 year
  • HIPAA: 6 years

IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.

func NewAuditRetentionController

func NewAuditRetentionController(
	auditRepo admin.AuditLogRepository,
	config *AuditRetentionControllerConfig,
) *AuditRetentionController

NewAuditRetentionController creates a new AuditRetentionController.

func (*AuditRetentionController) Interval

func (c *AuditRetentionController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AuditRetentionController) Name

func (c *AuditRetentionController) Name() string

Name returns the controller name.

func (*AuditRetentionController) Reconcile

func (c *AuditRetentionController) Reconcile(ctx context.Context) (int, error)

Reconcile deletes audit logs older than the retention period.

type AuditRetentionControllerConfig

type AuditRetentionControllerConfig struct {
	// Interval is how often to run the retention check.
	// Default: 24 hours (once a day).
	Interval time.Duration

	// RetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	RetentionDays int

	// BatchSize is the maximum number of logs to delete in one batch.
	// This prevents long-running transactions.
	// Default: 10000.
	BatchSize int

	// DryRun if true, only counts logs that would be deleted without actually deleting.
	// Useful for testing retention policies.
	// Default: false.
	DryRun bool

	// Logger for logging.
	Logger *logger.Logger
}

AuditRetentionControllerConfig configures the AuditRetentionController.

type Controller

type Controller interface {
	// Name returns the unique name of this controller.
	Name() string

	// Interval returns how often this controller should run.
	Interval() time.Duration

	// Reconcile performs the reconciliation logic.
	// It should be idempotent - running multiple times should have the same effect.
	// Returns the number of items processed and any error encountered.
	Reconcile(ctx context.Context) (int, error)
}

Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.

type DataExpirationController added in v0.1.2

type DataExpirationController struct {
	// contains filtered or unexported fields
}

DataExpirationController handles periodic expiration of stale data:

  • Suppression rules past their expires_at
  • Scope exclusions past their expires_at
  • Audit logs older than the retention period

Without this controller, expired suppression rules and scope exclusions remain in 'active'/'approved' status indefinitely, and audit logs accumulate without bounds.

func NewDataExpirationController added in v0.1.2

func NewDataExpirationController(
	suppressionRepo suppression.Repository,
	exclusionRepo scope.ExclusionRepository,
	auditRepo audit.Repository,
	config *DataExpirationControllerConfig,
) *DataExpirationController

NewDataExpirationController creates a new DataExpirationController.

func (*DataExpirationController) Interval added in v0.1.2

func (c *DataExpirationController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*DataExpirationController) Name added in v0.1.2

func (c *DataExpirationController) Name() string

Name returns the controller name.

func (*DataExpirationController) Reconcile added in v0.1.2

func (c *DataExpirationController) Reconcile(ctx context.Context) (int, error)

Reconcile expires stale suppression rules, scope exclusions, and old audit logs.

type DataExpirationControllerConfig added in v0.1.2

type DataExpirationControllerConfig struct {
	// Interval is how often to run the expiration check.
	// Default: 1 hour.
	Interval time.Duration

	// AuditRetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	AuditRetentionDays int

	// Logger for logging.
	Logger *logger.Logger
}

DataExpirationControllerConfig configures the DataExpirationController.

type GroupSyncController added in v0.1.2

type GroupSyncController struct {
	// contains filtered or unexported fields
}

GroupSyncController periodically synchronizes groups from external providers. It implements the Controller interface and is managed by the controller Manager.

This controller:

  1. Iterates over all active tenants
  2. For each tenant, triggers SyncAll to synchronize all configured providers
  3. Logs sync results for monitoring

func NewGroupSyncController added in v0.1.2

func NewGroupSyncController(
	syncService *app.GroupSyncService,
	tenantRepo tenant.Repository,
	config *GroupSyncControllerConfig,
) *GroupSyncController

NewGroupSyncController creates a new GroupSyncController.

func (*GroupSyncController) Interval added in v0.1.2

func (c *GroupSyncController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*GroupSyncController) Name added in v0.1.2

func (c *GroupSyncController) Name() string

Name returns the controller name.

func (*GroupSyncController) Reconcile added in v0.1.2

func (c *GroupSyncController) Reconcile(ctx context.Context) (int, error)

Reconcile performs the periodic group sync across all active tenants. Returns the number of tenants synced and any error encountered.

func (*GroupSyncController) TriggerSync added in v0.1.2

func (c *GroupSyncController) TriggerSync(ctx context.Context, tenantID shared.ID) error

TriggerSync manually triggers a sync for a specific tenant. This is used by the HTTP handler for on-demand sync requests.

type GroupSyncControllerConfig added in v0.1.2

type GroupSyncControllerConfig struct {
	// Interval is how often to run the sync.
	// Default: 1 hour.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

GroupSyncControllerConfig configures the GroupSyncController.

type JobRecoveryController

type JobRecoveryController struct {
	// contains filtered or unexported fields
}

JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.

The controller performs three main tasks:

  1. Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
  2. Expire old jobs: Mark jobs as expired if they've been in queue too long
  3. Clean up: Mark orphaned jobs as failed if they exceed retry limit

func NewJobRecoveryController

func NewJobRecoveryController(
	commandRepo command.Repository,
	config *JobRecoveryControllerConfig,
) *JobRecoveryController

NewJobRecoveryController creates a new JobRecoveryController.

func (*JobRecoveryController) Interval

func (c *JobRecoveryController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*JobRecoveryController) Name

func (c *JobRecoveryController) Name() string

Name returns the controller name.

func (*JobRecoveryController) Reconcile

func (c *JobRecoveryController) Reconcile(ctx context.Context) (int, error)

Reconcile recovers stuck jobs and expires old ones.

type JobRecoveryControllerConfig

type JobRecoveryControllerConfig struct {
	// Interval is how often to run the job recovery check.
	// Default: 60 seconds.
	Interval time.Duration

	// StuckThresholdMinutes is how long a job can be in acknowledged/running state
	// without progress before being considered stuck.
	// Default: 30 minutes.
	StuckThresholdMinutes int

	// TenantStuckThresholdMinutes is how long a tenant command can be assigned
	// to an agent without being picked up before being reassigned.
	// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
	TenantStuckThresholdMinutes int

	// MaxRetries is the maximum number of retry attempts for a job.
	// After this many retries, the job will be marked as failed.
	// Default: 3.
	MaxRetries int

	// MaxQueueMinutes is how long a job can wait in the queue before expiring.
	// Default: 60 minutes.
	MaxQueueMinutes int

	// Logger for logging.
	Logger *logger.Logger
}

JobRecoveryControllerConfig configures the JobRecoveryController.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages multiple controllers, running them in parallel goroutines.

func NewManager

func NewManager(cfg *ManagerConfig) *Manager

NewManager creates a new controller manager.

func (*Manager) ControllerCount

func (m *Manager) ControllerCount() int

ControllerCount returns the number of registered controllers.

func (*Manager) ControllerNames

func (m *Manager) ControllerNames() []string

ControllerNames returns the names of all registered controllers.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning checks if the manager is running.

func (*Manager) Register

func (m *Manager) Register(c Controller)

Register adds a controller to the manager.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts all registered controllers.

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops all controllers gracefully.

type ManagerConfig

type ManagerConfig struct {
	// Metrics collector (optional)
	Metrics Metrics

	// Logger (required)
	Logger *logger.Logger
}

ManagerConfig configures the controller manager.

type Metrics

type Metrics interface {
	// RecordReconcile records a reconciliation run.
	RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

	// SetControllerRunning sets whether a controller is running.
	SetControllerRunning(controller string, running bool)

	// IncrementReconcileErrors increments the error counter.
	IncrementReconcileErrors(controller string)

	// SetLastReconcileTime sets the last reconcile timestamp.
	SetLastReconcileTime(controller string, t time.Time)
}

Metrics defines the interface for controller metrics collection.

type NoopMetrics

type NoopMetrics struct{}

NoopMetrics is a no-op implementation of Metrics for testing.

func (*NoopMetrics) IncrementReconcileErrors

func (m *NoopMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors does nothing.

func (*NoopMetrics) RecordReconcile

func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile does nothing.

func (*NoopMetrics) SetControllerRunning

func (m *NoopMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning does nothing.

func (*NoopMetrics) SetLastReconcileTime

func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime does nothing.

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements the Metrics interface using Prometheus.

func NewPrometheusMetrics

func NewPrometheusMetrics(namespace string) *PrometheusMetrics

NewPrometheusMetrics creates a new PrometheusMetrics.

func (*PrometheusMetrics) IncrementReconcileErrors

func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors increments the error counter.

func (*PrometheusMetrics) RecordReconcile

func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile records a reconciliation run.

func (*PrometheusMetrics) SetControllerRunning

func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning sets whether a controller is running.

func (*PrometheusMetrics) SetLastReconcileTime

func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime sets the last reconcile timestamp.

type QueuePriorityController

type QueuePriorityController struct {
	// contains filtered or unexported fields
}

QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:

  • Tenant's plan tier (higher tiers get base priority boost)
  • Job age (older jobs get priority bonus to prevent starvation)
  • Tenant's current queue depth (tenants with fewer jobs get slight boost)

The priority calculation is done in the database for efficiency:

	new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)

This is a soft-priority system - higher priority jobs are processed first, but no tenant can completely starve others.

func NewQueuePriorityController

func NewQueuePriorityController(
	commandRepo command.Repository,
	config *QueuePriorityControllerConfig,
) *QueuePriorityController

NewQueuePriorityController creates a new QueuePriorityController.

func (*QueuePriorityController) Interval

func (c *QueuePriorityController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*QueuePriorityController) Name

func (c *QueuePriorityController) Name() string

Name returns the controller name.

func (*QueuePriorityController) Reconcile

func (c *QueuePriorityController) Reconcile(ctx context.Context) (int, error)

Reconcile recalculates queue priorities for all pending platform jobs.

type QueuePriorityControllerConfig

type QueuePriorityControllerConfig struct {
	// Interval is how often to recalculate queue priorities.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

QueuePriorityControllerConfig configures the QueuePriorityController.

type RetryDispatcher added in v0.1.5

type RetryDispatcher interface {
	RetryScanRun(ctx context.Context, tenantID, scanID shared.ID, retryAttempt int) error
}

RetryDispatcher is the dependency the ScanRetryController uses to actually re-trigger a failed scan run. The scan service implements this.

type RoleSyncController added in v0.1.2

type RoleSyncController struct {
	// contains filtered or unexported fields
}

RoleSyncController ensures tenant_members and user_roles stay in sync.

The RBAC permission system reads from user_roles, while tenant_members stores the canonical membership. A trigger (sync_tenant_member_to_user_roles) handles new INSERT/UPDATE, but entries can become desynced after:

  • Database restore from backup
  • Migration rollback and re-apply
  • Manual data manipulation
  • Trigger being temporarily disabled

This controller detects missing user_roles entries and backfills them, preventing "Access Denied" errors for affected users.
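Detecting the missing entries amounts to a set difference between the two tables. The real controller works on *sql.DB; the sketch below models the same idea in memory, with a hypothetical membership type and missingRoles helper.

```go
package main

// membership identifies one user within one tenant (hypothetical key type).
type membership struct {
	TenantID string
	UserID   string
}

// missingRoles returns the tenant_members entries that have no matching
// user_roles entry and therefore need to be backfilled.
func missingRoles(members []membership, roles map[membership]bool) []membership {
	var missing []membership
	for _, m := range members {
		if !roles[m] {
			missing = append(missing, m)
		}
	}
	return missing
}
```

Once the backfill has run, the difference is empty and a further reconciliation changes nothing.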

func NewRoleSyncController added in v0.1.2

func NewRoleSyncController(
	db *sql.DB,
	config *RoleSyncControllerConfig,
) *RoleSyncController

NewRoleSyncController creates a new RoleSyncController.

func (*RoleSyncController) Interval added in v0.1.2

func (c *RoleSyncController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*RoleSyncController) Name added in v0.1.2

func (c *RoleSyncController) Name() string

Name returns the controller name.

func (*RoleSyncController) Reconcile added in v0.1.2

func (c *RoleSyncController) Reconcile(ctx context.Context) (int, error)

Reconcile detects and fixes missing user_roles entries.

type RoleSyncControllerConfig added in v0.1.2

type RoleSyncControllerConfig struct {
	// Interval is how often to run the sync check.
	// Default: 1 hour.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

RoleSyncControllerConfig configures the RoleSyncController.

type ScanRetryController added in v0.1.5

type ScanRetryController struct {
	// contains filtered or unexported fields
}

ScanRetryController periodically checks for failed pipeline runs that are eligible for automatic retry (based on the parent scan's max_retries and retry_backoff_seconds, with exponential backoff) and dispatches retries through the RetryDispatcher.

func NewScanRetryController added in v0.1.5

func NewScanRetryController(
	runRepo pipeline.RunRepository,
	dispatcher RetryDispatcher,
	config *ScanRetryControllerConfig,
) *ScanRetryController

NewScanRetryController creates a new ScanRetryController.

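func (*ScanRetryController) Interval added in v0.1.5

func (c *ScanRetryController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ScanRetryController) Name added in v0.1.5

func (c *ScanRetryController) Name() string

Name returns the controller name.

func (*ScanRetryController) Reconcile added in v0.1.5

func (c *ScanRetryController) Reconcile(ctx context.Context) (int, error)

Reconcile dispatches retries for failed pipeline runs that are eligible for automatic retry.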

type ScanRetryControllerConfig added in v0.1.5

type ScanRetryControllerConfig struct {
	// Interval is how often to check for retry candidates.
	// Default: 60 seconds.
	Interval time.Duration

	// BatchSize is the maximum candidates processed per cycle.
	// Default: 100.
	BatchSize int

	// Logger for logging.
	Logger *logger.Logger
}

ScanRetryControllerConfig configures the ScanRetryController.

type ScanTimeoutController added in v0.1.5

type ScanTimeoutController struct {
	// contains filtered or unexported fields
}

ScanTimeoutController periodically marks pipeline_runs as timed out when they exceed their scan's configured timeout_seconds.

This complements JobRecoveryController (which marks stuck commands) by enforcing per-scan timeouts. A scan can specify its own timeout_seconds (default 1h, max 24h); runs that exceed it are forcibly marked as timed out with an appropriate error message.

func NewScanTimeoutController added in v0.1.5

func NewScanTimeoutController(
	runRepo pipeline.RunRepository,
	config *ScanTimeoutControllerConfig,
) *ScanTimeoutController

NewScanTimeoutController creates a new ScanTimeoutController.

func (*ScanTimeoutController) Interval added in v0.1.5

func (c *ScanTimeoutController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ScanTimeoutController) Name added in v0.1.5

func (c *ScanTimeoutController) Name() string

Name returns the controller name.

func (*ScanTimeoutController) Reconcile added in v0.1.5

func (c *ScanTimeoutController) Reconcile(ctx context.Context) (int, error)

Reconcile marks expired runs as timed out.

type ScanTimeoutControllerConfig added in v0.1.5

type ScanTimeoutControllerConfig struct {
	// Interval is how often to check for timed-out runs.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

ScanTimeoutControllerConfig configures the ScanTimeoutController.

type ScopeReconciliationController added in v0.1.2

type ScopeReconciliationController struct {
	// contains filtered or unexported fields
}

ScopeReconciliationController periodically reconciles scope rule assignments as a safety net for the real-time event-driven hooks.

This controller:

  1. Lists all tenants with active scope rules
  2. For each tenant, lists all groups with active scope rules
  3. Reconciles each group by re-evaluating matching assets

Design: This is a background safety net (K8s-style eventual consistency). The primary path is real-time hooks in AssetService and AssetGroupService.

func NewScopeReconciliationController added in v0.1.2

func NewScopeReconciliationController(
	acRepo accesscontrol.Repository,
	reconciler scopeGroupReconciler,
	config *ScopeReconciliationControllerConfig,
) *ScopeReconciliationController

NewScopeReconciliationController creates a new ScopeReconciliationController.

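func (*ScopeReconciliationController) Interval added in v0.1.2

func (c *ScopeReconciliationController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*ScopeReconciliationController) Name added in v0.1.2

func (c *ScopeReconciliationController) Name() string

Name returns the controller name.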

func (*ScopeReconciliationController) Reconcile added in v0.1.2

func (c *ScopeReconciliationController) Reconcile(ctx context.Context) (int, error)

Reconcile performs periodic scope rule reconciliation across all tenants. Returns the number of groups reconciled and any error encountered.

type ScopeReconciliationControllerConfig added in v0.1.2

type ScopeReconciliationControllerConfig struct {
	// Interval is how often to run the reconciliation.
	// Default: 30 minutes.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

ScopeReconciliationControllerConfig configures the ScopeReconciliationController.
