Documentation
Index ¶
- Constants
- Variables
- func IsTerminalPhase(phase remediationv1alpha1.RemediationPhase) bool
- type CRDCreationError
- type CRDCreator
- type Clock
- type DeduplicationError
- type DeduplicationMetadata
- type DistributedLockManager
- type MockClock
- type OperationError
- type PhaseBasedDeduplicationChecker
- type RealClock
- type RetryError
- type RetryObserver
- type StatusUpdater
Constants ¶
const (
	// LockDurationSeconds defines how long a lock is valid before it expires.
	// If a pod crashes without releasing, another pod can take over after 30s.
	LockDurationSeconds = 30
)
Variables ¶
var GatewayRetryBackoff = retry.DefaultBackoff
GatewayRetryBackoff is the retry configuration for Gateway status updates. DD-GATEWAY-011: Optimized for Gateway latency requirements (P95 <50ms)
Functions ¶
func IsTerminalPhase ¶
func IsTerminalPhase(phase remediationv1alpha1.RemediationPhase) bool
IsTerminalPhase checks if a RemediationRequest phase is terminal. Terminal phases allow new RR creation for the same signal fingerprint.
DD-GATEWAY-011 v1.3: Terminal phase classification
DD-GATEWAY-009: Cancelled state handling (allows retry)

TERMINAL (allow new RR creation):
- Completed: Remediation succeeded
- Failed: Remediation failed (including after Blocked→Failed transition)
- TimedOut: Remediation timed out
- Skipped: Remediation was not needed (per BR-ORCH-032)
- Cancelled: Remediation was manually cancelled (per DD-GATEWAY-009, allows retry)

NON-TERMINAL (deduplicate → update status):
- Pending, Processing, Analyzing, AwaitingApproval, Executing, Verifying
- Blocked: RO holds for cooldown, Gateway updates dedup status (prevents RR flood)
- Verifying: EA assessment in progress (#280), Gateway deduplicates

WHITELIST approach (safer than blacklist):
- Only explicitly terminal phases allow new RR
- ALL other phases (including Blocked, Verifying, and unknown future phases) are non-terminal

Phase values per api/remediation/v1alpha1/remediationrequest_types.go:
- Terminal: Completed, Failed, TimedOut, Skipped, Cancelled
- Non-Terminal: Pending, Processing, Analyzing, AwaitingApproval, Executing, Verifying, Blocked
🏛️ Compliance: BR-COMMON-001 (Phase Format), Viceversa Pattern (Cross-Service Consumption)
Types ¶
type CRDCreationError ¶
type CRDCreationError struct {
*OperationError
CRDName string // RemediationRequest name
SignalType string // Signal type (alert/event)
SignalName string // Alert name (if applicable)
}
CRDCreationError is a specialized error for CRD creation failures. Extends OperationError with CRD-specific fields.
func NewCRDCreationError ¶
func NewCRDCreationError(fingerprint, namespace, crdName, signalType, alertName string, attempts int, startTime time.Time, err error) *CRDCreationError
NewCRDCreationError creates a CRD creation error with full context. Automatically sets operation to "create_remediation_request" and phase to "crd_creation".
func (*CRDCreationError) Error ¶
func (e *CRDCreationError) Error() string
Error extends OperationError.Error() with CRD-specific fields.
type CRDCreator ¶
type CRDCreator struct {
// contains filtered or unexported fields
}
CRDCreator converts NormalizedSignal to RemediationRequest CRD
This component is responsible for:
1. Generating unique CRD names (rr-{fingerprint[:12]}-{uuid[:8]})
2. Populating CRD spec from signal data
3. Adding required labels for querying/filtering
4. Creating CRD in Kubernetes via API
5. Recording metrics (success/failure)

CRD naming (DD-AUDIT-CORRELATION-002):
- Format: rr-{fingerprint[:12]}-{uuid[:8]}
- Example: rr-b157a3a9e42f-1c2b5576
- Reason: Human-readable fingerprint prefix + UUID suffix for zero collision risk
func NewCRDCreator ¶
func NewCRDCreator(k8sClient k8s.ClientInterface, logger logr.Logger, metricsInstance *metrics.Metrics, retryConfig *config.RetrySettings, retryObserver RetryObserver, controllerNamespace string) *CRDCreator
NewCRDCreator creates a new CRD creator.
BR-GATEWAY-111: Accepts retry configuration for K8s API retry logic.
BR-GATEWAY-058: Accepts RetryObserver for per-attempt audit compliance.
DD-005: Uses logr.Logger for unified logging.
func NewCRDCreatorWithClock ¶
func NewCRDCreatorWithClock(k8sClient k8s.ClientInterface, logger logr.Logger, metricsInstance *metrics.Metrics, retryConfig *config.RetrySettings, retryObserver RetryObserver, controllerNamespace string, clock Clock) *CRDCreator
NewCRDCreatorWithClock creates a new CRD creator with a custom clock This variant enables testing with MockClock for time-dependent behavior
TDD GREEN: Accepts ClientInterface to support circuit breaker (BR-GATEWAY-093)
func (*CRDCreator) CreateRemediationRequest ¶
func (c *CRDCreator) CreateRemediationRequest(ctx context.Context, signal *types.NormalizedSignal) (*remediationv1alpha1.RemediationRequest, error)
CreateRemediationRequest creates a RemediationRequest CRD from a signal
This method:
1. Generates CRD name from fingerprint
2. Populates metadata (labels, annotations)
3. Populates spec (signal data, deduplication info, timestamps)
4. Creates CRD in Kubernetes
5. Records success/failure metrics
6. Logs creation event
CRD structure:
	apiVersion: remediation.kubernaut.ai/v1alpha1
	kind: RemediationRequest
	metadata:
	  name: rr-<fingerprint[:12]>-<uuid[:8]>
	  namespace: <controller-namespace>
	spec:
	  signalFingerprint: <fingerprint>
	  signalName: HighMemoryUsage
	  severity: critical
	  environment: prod
	  priority: P0
	  signalType: prometheus-alert
	  targetType: kubernetes
	  firingTime: <timestamp>
	  receivedTime: <timestamp>
	  signalLabels: {alertname: ..., namespace: ..., pod: ...}
	  signalAnnotations: {summary: ..., description: ...}
	  originalPayload: <base64-encoded-json>
	  deduplication:
	    firstSeen: <timestamp>
	    lastSeen: <timestamp>
	    occurrenceCount: 1
Parameters:
- ctx: Context for cancellation and timeout
- signal: Normalized signal from adapter
Note: Environment and Priority classification removed from Gateway (2025-12-06) These are now owned by Signal Processing service per DD-CATEGORIZATION-001.
Returns:
- *RemediationRequest: Created CRD with populated fields
- error: Kubernetes API errors or validation errors
type Clock ¶
Clock provides time-related operations for dependency injection
This interface enables:
- Fast, deterministic tests (no time.Sleep() needed)
- Time-based behavior testing without wall-clock dependency
- Better test isolation and reliability

Usage:
- Production: Use RealClock for actual time
- Tests: Use MockClock with controllable time
type DeduplicationError ¶
type DeduplicationError struct {
*OperationError
DedupeStatus string // Deduplication status (new/duplicate/unknown)
}
DeduplicationError is a specialized error for deduplication failures. Extends OperationError with deduplication-specific fields.
func NewDeduplicationError ¶
func NewDeduplicationError(fingerprint, namespace, dedupeStatus string, attempts int, startTime time.Time, err error) *DeduplicationError
NewDeduplicationError creates a deduplication error with full context. Automatically sets operation to "check_deduplication" and phase to "deduplication".
func (*DeduplicationError) Error ¶
func (e *DeduplicationError) Error() string
Error extends OperationError.Error() with deduplication-specific fields.
type DeduplicationMetadata ¶
type DeduplicationMetadata struct {
// Fingerprint is the SHA256 hash of the alert
Fingerprint string `json:"fingerprint,omitempty"`
// Count is the number of times this alert has been received (including current)
// Example: First duplicate has count=2 (original + 1 duplicate)
Count int `json:"count"`
// RemediationRequestRef is the name of the RemediationRequest CRD
// Used in HTTP 202 response to inform caller which CRD was updated
RemediationRequestRef string `json:"remediationRequestRef,omitempty"`
// FirstOccurrence is when the alert first appeared (ISO 8601 timestamp)
// Example: "2025-10-09T10:00:00Z"
FirstOccurrence string `json:"firstOccurrence,omitempty"`
// LastOccurrence is the most recent occurrence (ISO 8601 timestamp)
// Updated every time a duplicate is detected
// Example: "2025-10-09T10:04:30Z"
LastOccurrence string `json:"lastOccurrence,omitempty"`
}
DeduplicationMetadata contains deduplication tracking information for HTTP responses DD-GATEWAY-011: This data now comes from RR status.deduplication instead of Redis
type DistributedLockManager ¶
type DistributedLockManager struct {
// contains filtered or unexported fields
}
DistributedLockManager manages K8s Lease-based distributed locks for multi-replica Gateway safety.
Business Requirement: BR-GATEWAY-190 (Multi-Replica Deduplication Safety)
Design Decision: ADR-052 (K8s Lease-Based Distributed Locking Pattern)
API Client Choice: Uses apiReader (non-cached client) for immediate consistency
Purpose: Prevents duplicate RemediationRequest CRD creation when multiple Gateway pods process the same signal concurrently (fixes GW-DEDUP-002 race condition).
Pattern: Uses Kubernetes coordination.k8s.io/v1 Lease resources for distributed locking. This ensures only 1 pod can acquire a lock for a given fingerprint at a time.
WHY apiReader (Non-Cached Client)?
- ✅ Immediate Consistency: No cache sync delay (5-50ms race window eliminated)
- ✅ Correctness: Distributed locking requires guaranteed freshness
- ✅ Production-Ready: K8s leader-election uses direct API calls for Lease operations

API Server Impact: Acceptable at production scale
- Normal load (1 signal/sec): 3 API req/sec (negligible)
- Peak load (8 signals/sec): 24 API req/sec (low)
- Design target (1000 signals/sec): 3000 API req/sec (30-60% of K8s API capacity)
Lock Lifecycle:
- AcquireLock(fingerprint) - Creates or claims Lease
- Process signal (create CRD)
- ReleaseLock(fingerprint) - Deletes Lease
Fault Tolerance: Expired leases (from crashed pods) can be taken over after 30s.
func NewDistributedLockManager ¶
func NewDistributedLockManager(k8sClient client.Client, namespace, holderID string) *DistributedLockManager
NewDistributedLockManager creates a new distributed lock manager.
Parameters:
- k8sClient: K8s client for Lease operations (MUST be apiReader/non-cached for immediate consistency)
- namespace: Namespace where Leases will be created (typically "kubernaut-system")
- holderID: Unique identifier for this pod (typically POD_NAME from env)
IMPORTANT: Pass apiReader (non-cached client) to avoid race conditions. Cached clients have 5-50ms sync delay, allowing duplicate lock acquisitions.
Example (Production):
lockManager := NewDistributedLockManager(apiReader, "kubernaut-system", "gateway-pod-1")
Example (Testing):
lockManager := NewDistributedLockManager(k8sClient, "default", "test-pod")
func (*DistributedLockManager) AcquireLock ¶
func (m *DistributedLockManager) AcquireLock(ctx context.Context, fingerprint string) (bool, error)
AcquireLock attempts to acquire a distributed lock for the given fingerprint.
Returns:
- (true, nil) if lock was acquired
- (false, nil) if lock is held by another pod (contention - not an error)
- (false, err) if K8s API error occurred
Lock Acquisition Flow:
- Try to Get existing Lease
- If not found, Create new Lease (acquire lock)
- If found and held by us, return true (idempotent)
- If found and held by another pod, check if expired
- If expired, Update Lease to take over
- If not expired, return false (contention)
Example:
acquired, err := lockManager.AcquireLock(ctx, signal.Fingerprint)
if err != nil {
return fmt.Errorf("lock acquisition failed: %w", err)
}
if !acquired {
// Lock held by another pod - retry or return
return nil
}
defer lockManager.ReleaseLock(ctx, signal.Fingerprint)
func (*DistributedLockManager) ReleaseLock ¶
func (m *DistributedLockManager) ReleaseLock(ctx context.Context, fingerprint string) error
ReleaseLock releases the distributed lock by deleting the Lease.
This method is idempotent - it's safe to call multiple times (e.g., in defer). If the Lease doesn't exist (already deleted), no error is returned.
Returns:
- nil on success
- error if K8s API error occurred (excluding NotFound)
Example:
defer func() {
if err := lockManager.ReleaseLock(ctx, signal.Fingerprint); err != nil {
logger.Error(err, "Failed to release lock")
}
}()
type MockClock ¶
type MockClock struct {
// contains filtered or unexported fields
}
MockClock provides controllable time for testing
Usage in tests:
clock := NewMockClock(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC))
crdCreator := NewCRDCreatorWithClock(..., clock)

// First call
rr1, _ := crdCreator.CreateRemediationRequest(ctx, signal)

// Advance time for uniqueness testing
clock.Advance(1 * time.Second)

// Second call (different timestamp)
rr2, _ := crdCreator.CreateRemediationRequest(ctx, signal)
func NewMockClock ¶
func NewMockClock(initial time.Time) *MockClock
NewMockClock creates a new MockClock with the specified initial time.
type OperationError ¶
type OperationError struct {
Operation string // Operation name (e.g., "create_remediation_request")
Phase string // Processing phase (e.g., "deduplication", "crd_creation")
Fingerprint string // Signal fingerprint (correlation ID)
Namespace string // Target namespace
Attempts int // Number of retry attempts
Duration time.Duration // Total operation duration
StartTime time.Time // Operation start time
CorrelationID string // Request correlation ID (RR name)
Underlying error // Wrapped underlying error
}
OperationError provides rich context for processing errors with timing, correlation, and retry information. GAP-10: Enhanced Error Wrapping
This structured error type helps operators quickly diagnose issues by providing:
- Operation: Operation name (e.g., "create_remediation_request")
- Phase: Processing phase (e.g., "deduplication", "crd_creation")
- Fingerprint: Signal fingerprint (serves as correlation ID)
- Namespace: Target namespace
- Attempts: Number of retry attempts
- Duration: Total operation duration
- StartTime: Operation start time
- CorrelationID: Request correlation ID (typically RR name)
- Underlying: Wrapped underlying error
Example:
err := NewOperationError(
"create_remediation_request",
"crd_creation",
"abc123",
"default",
"rr-pod-crash-abc123",
3,
startTime,
kubernetesErr,
)
func NewOperationError ¶
func NewOperationError(operation, phase, fingerprint, namespace, correlationID string, attempts int, startTime time.Time, err error) *OperationError
NewOperationError creates a new operation error with automatic duration calculation. Duration is calculated as time.Since(startTime).
func (*OperationError) Error ¶
func (e *OperationError) Error() string
Error implements the error interface with rich, actionable error messages. Format:
{operation} failed: phase={phase}, fingerprint={fingerprint}, namespace={namespace},
attempts={attempts}, duration={duration}, correlation={correlationID}: {underlying}
func (*OperationError) Unwrap ¶
func (e *OperationError) Unwrap() error
Unwrap returns the underlying error for error chain unwrapping. This enables errors.Is() and errors.As() to work with OperationError.
type PhaseBasedDeduplicationChecker ¶
type PhaseBasedDeduplicationChecker struct {
// contains filtered or unexported fields
}
PhaseBasedDeduplicationChecker checks for existing in-progress RRs by fingerprint
func NewPhaseBasedDeduplicationChecker ¶
func NewPhaseBasedDeduplicationChecker(k8sClient client.Reader, cooldownPeriod time.Duration) *PhaseBasedDeduplicationChecker
NewPhaseBasedDeduplicationChecker creates a new phase-based checker.
DD-GATEWAY-011: Accepts client.Reader so callers can pass apiReader (cache-bypassed) for race-free deduplication.
#280: The Verifying phase replaces the post-completion cooldown; the cooldownPeriod parameter is retained for backward compatibility but ignored.
func (*PhaseBasedDeduplicationChecker) ShouldDeduplicate ¶
func (c *PhaseBasedDeduplicationChecker) ShouldDeduplicate(ctx context.Context, namespace, fingerprint string) (bool, *remediationv1alpha1.RemediationRequest, error)
ShouldDeduplicate checks if a signal should be deduplicated based on existing RR phase
DD-GATEWAY-011 v1.3: Phase-Based Deduplication Decision

This method:
1. Lists RRs matching the fingerprint via field selector (BR-GATEWAY-185 v1.1)
2. Checks if any RR is in a non-terminal phase (including Blocked)
3. Returns true (deduplicate) if active RR exists
4. Returns false (allow new RR) if no active RR exists

v1.3 SIMPLIFICATION:
- Gateway does NOT count consecutive failures
- Gateway does NOT create Blocked RRs
- Gateway simply checks: "Is there an active RR?" → update dedup, else create new

BR-GATEWAY-185 v1.1: Use spec.signalFingerprint field selector instead of labels
- Labels are mutable and truncated to 63 chars (data loss risk)
- spec.signalFingerprint is immutable and supports full 64-char SHA256

Business Requirements:
- BR-GATEWAY-181: Move deduplication tracking to status
- BR-GATEWAY-185: Field selector for fingerprint lookup (v1.1)
- BR-ORCH-042: Consecutive failure blocking (RO responsibility, NOT Gateway)

Parameters:
- ctx: Context for cancellation and timeout
- namespace: Namespace to search in
- fingerprint: Signal fingerprint to match (full 64-char SHA256)

Returns:
- bool: true if should deduplicate (in-progress RR exists)
- *RemediationRequest: existing in-progress RR (nil if none)
- error: K8s API errors
type RetryError ¶
type RetryError struct {
Attempt int // Final attempt number (1-based)
MaxAttempts int // Maximum retry attempts configured
OriginalErr error // Original underlying error
ErrorType string // Error type classification (e.g., "transient", "timeout")
IsRetryable bool // Whether error was retryable
}
RetryError wraps errors from retry operations with retry context. This is used by createCRDWithRetry to provide rich context about retry attempts.
func (*RetryError) Error ¶
func (e *RetryError) Error() string
Error implements the error interface for RetryError.
func (*RetryError) Unwrap ¶
func (e *RetryError) Unwrap() error
Unwrap returns the underlying error for error chain unwrapping.
type RetryObserver ¶
type RetryObserver interface {
OnRetryAttempt(ctx context.Context, signal *types.NormalizedSignal, attempt int, err error)
}
RetryObserver is notified on each failed CRD creation retry attempt. BR-GATEWAY-058: Every retry attempt MUST be observed for audit compliance. BR-GATEWAY-113: Decouples retry observation from CRD creation logic.
type StatusUpdater ¶
type StatusUpdater struct {
// contains filtered or unexported fields
}
StatusUpdater handles status updates to RemediationRequest CRDs
func NewStatusUpdater ¶
func NewStatusUpdater(k8sClient client.Client, apiReader client.Reader) *StatusUpdater
NewStatusUpdater creates a new status updater.
apiReader bypasses the controller-runtime cache for optimistic-locking refetches (DD-STATUS-001).
func (*StatusUpdater) UpdateDeduplicationStatus ¶
func (u *StatusUpdater) UpdateDeduplicationStatus(ctx context.Context, rr *remediationv1alpha1.RemediationRequest) error
UpdateDeduplicationStatus updates the status.deduplication fields for a duplicate signal
DD-GATEWAY-011: Status-Based Deduplication

This method:
1. Refetches RR to get latest resourceVersion (using apiReader for cache-bypassed read)
2. Updates ONLY status.deduplication (Gateway-owned)
3. Uses Status().Update() to update status subresource
4. Retries on conflict (optimistic concurrency)

DD-STATUS-001: Uses apiReader to bypass controller-runtime cache This prevents "not found" errors when Gateway reads immediately after CRD creation, as the cached client may not have synced yet. Direct API server reads are always fresh.

Business Requirements:
- BR-GATEWAY-181: Move deduplication tracking from spec to status
- BR-GATEWAY-183: Implement optimistic concurrency for status updates

Parameters:
- ctx: Context for cancellation and timeout
- rr: RemediationRequest to update

Returns:
- error: K8s API errors (not found, timeout, etc.)