processing

package
v1.2.0
Published: Apr 6, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

const (
	// LockDurationSeconds defines how long a lock is valid before it expires.
	// If a pod crashes without releasing, another pod can take over after 30s.
	LockDurationSeconds = 30
)

Variables

var GatewayRetryBackoff = retry.DefaultBackoff

GatewayRetryBackoff is the retry configuration for Gateway status updates. DD-GATEWAY-011: optimized for Gateway latency requirements (P95 < 50ms).

Functions

func IsTerminalPhase

func IsTerminalPhase(phase remediationv1alpha1.RemediationPhase) bool

IsTerminalPhase checks if a RemediationRequest phase is terminal. Terminal phases allow new RR creation for the same signal fingerprint.

DD-GATEWAY-011 v1.3: Terminal phase classification DD-GATEWAY-009: Cancelled state handling (allows retry)

TERMINAL (allow new RR creation):

  • Completed: Remediation succeeded
  • Failed: Remediation failed (including after Blocked→Failed transition)
  • TimedOut: Remediation timed out
  • Skipped: Remediation was not needed (per BR-ORCH-032)
  • Cancelled: Remediation was manually cancelled (per DD-GATEWAY-009, allows retry)

NON-TERMINAL (deduplicate → update status):

  • Pending, Processing, Analyzing, AwaitingApproval, Executing
  • Blocked: RO holds for cooldown, Gateway updates dedup status (prevents RR flood)
  • Verifying: EA assessment in progress (#280), Gateway deduplicates

WHITELIST approach (safer than blacklist):

  • Only explicitly terminal phases allow new RR
  • ALL other phases (including Blocked, Verifying, and unknown future phases) are non-terminal

Phase values per api/remediation/v1alpha1/remediationrequest_types.go:

  • Terminal: Completed, Failed, TimedOut, Skipped, Cancelled
  • Non-Terminal: Pending, Processing, Analyzing, AwaitingApproval, Executing, Verifying, Blocked

🏛️ Compliance: BR-COMMON-001 (Phase Format), Viceversa Pattern (Cross-Service Consumption)

Types

type CRDCreationError

type CRDCreationError struct {
	*OperationError
	CRDName    string // RemediationRequest name
	SignalType string // Signal type (alert/event)
	SignalName string // Alert name (if applicable)
}

CRDCreationError is a specialized error for CRD creation failures. Extends OperationError with CRD-specific fields.

func NewCRDCreationError

func NewCRDCreationError(fingerprint, namespace, crdName, signalType, alertName string, attempts int, startTime time.Time, err error) *CRDCreationError

NewCRDCreationError creates a CRD creation error with full context. Automatically sets operation to "create_remediation_request" and phase to "crd_creation".

func (*CRDCreationError) Error

func (e *CRDCreationError) Error() string

Error extends OperationError.Error() with CRD-specific fields.

type CRDCreator

type CRDCreator struct {
	// contains filtered or unexported fields
}

CRDCreator converts NormalizedSignal to RemediationRequest CRD

This component is responsible for:

  1. Generating unique CRD names (rr-{fingerprint[:12]}-{uuid[:8]})
  2. Populating CRD spec from signal data
  3. Adding required labels for querying/filtering
  4. Creating CRD in Kubernetes via API
  5. Recording metrics (success/failure)

CRD naming (DD-AUDIT-CORRELATION-002):

  • Format: rr-{fingerprint[:12]}-{uuid[:8]}
  • Example: rr-b157a3a9e42f-1c2b5576
  • Reason: Human-readable fingerprint prefix + UUID suffix for zero collision risk

func NewCRDCreator

func NewCRDCreator(k8sClient k8s.ClientInterface, logger logr.Logger, metricsInstance *metrics.Metrics, retryConfig *config.RetrySettings, retryObserver RetryObserver, controllerNamespace string) *CRDCreator

NewCRDCreator creates a new CRD creator.

  • BR-GATEWAY-111: Accepts retry configuration for K8s API retry logic
  • BR-GATEWAY-058: Accepts RetryObserver for per-attempt audit compliance
  • DD-005: Uses logr.Logger for unified logging

func NewCRDCreatorWithClock

func NewCRDCreatorWithClock(k8sClient k8s.ClientInterface, logger logr.Logger, metricsInstance *metrics.Metrics, retryConfig *config.RetrySettings, retryObserver RetryObserver, controllerNamespace string, clock Clock) *CRDCreator

NewCRDCreatorWithClock creates a new CRD creator with a custom clock. This variant enables testing with MockClock for time-dependent behavior.

TDD GREEN: Accepts ClientInterface to support circuit breaker (BR-GATEWAY-093)

func (*CRDCreator) CreateRemediationRequest

func (c *CRDCreator) CreateRemediationRequest(
	ctx context.Context,
	signal *types.NormalizedSignal,
) (*remediationv1alpha1.RemediationRequest, error)

CreateRemediationRequest creates a RemediationRequest CRD from a signal

This method:

  1. Generates CRD name from fingerprint
  2. Populates metadata (labels, annotations)
  3. Populates spec (signal data, timestamps)
  4. Creates CRD in Kubernetes
  5. Records success/failure metrics
  6. Logs creation event

CRD structure:

apiVersion: remediation.kubernaut.ai/v1alpha1
kind: RemediationRequest
metadata:
  name: rr-<fingerprint[:12]>-<uuid[:8]>
  namespace: <controller-namespace>
spec:
  signalFingerprint: <fingerprint>
  signalName: HighMemoryUsage
  severity: critical
  environment: prod
  priority: P0
  signalType: prometheus-alert
  targetType: kubernetes
  firingTime: <timestamp>
  receivedTime: <timestamp>
  signalLabels: {alertname: ..., namespace: ..., pod: ...}
  signalAnnotations: {summary: ..., description: ...}
  originalPayload: <base64-encoded-json>

Parameters:

  • ctx: Context for cancellation and timeout
  • signal: Normalized signal from adapter

Note: Environment and Priority classification were removed from Gateway (2025-12-06). These are now owned by the Signal Processing service per DD-CATEGORIZATION-001.

Returns:

  • *RemediationRequest: Created CRD with populated fields
  • error: Kubernetes API errors or validation errors

type Clock

type Clock interface {
	// Now returns the current time
	Now() time.Time
}

Clock provides time-related operations for dependency injection

This interface enables:

  • Fast, deterministic tests (no time.Sleep() needed)
  • Time-based behavior testing without wall-clock dependency
  • Better test isolation and reliability

Usage:

  • Production: Use RealClock for actual time
  • Tests: Use MockClock with controllable time

type DeduplicationError

type DeduplicationError struct {
	*OperationError
	DedupeStatus string // Deduplication status (new/duplicate/unknown)
}

DeduplicationError is a specialized error for deduplication failures. Extends OperationError with deduplication-specific fields.

func NewDeduplicationError

func NewDeduplicationError(fingerprint, namespace, dedupeStatus string, attempts int, startTime time.Time, err error) *DeduplicationError

NewDeduplicationError creates a deduplication error with full context. Automatically sets operation to "check_deduplication" and phase to "deduplication".

func (*DeduplicationError) Error

func (e *DeduplicationError) Error() string

Error extends OperationError.Error() with deduplication-specific fields.

type DeduplicationMetadata

type DeduplicationMetadata struct {
	// Fingerprint is the SHA256 hash of the alert
	Fingerprint string `json:"fingerprint,omitempty"`

	// Count is the number of times this alert has been received (including current)
	// Example: First duplicate has count=2 (original + 1 duplicate)
	Count int `json:"count"`

	// RemediationRequestRef is the name of the RemediationRequest CRD
	// Used in HTTP 202 response to inform caller which CRD was updated
	RemediationRequestRef string `json:"remediationRequestRef,omitempty"`

	// FirstOccurrence is when the alert first appeared (ISO 8601 timestamp)
	// Example: "2025-10-09T10:00:00Z"
	FirstOccurrence string `json:"firstOccurrence,omitempty"`

	// LastOccurrence is the most recent occurrence (ISO 8601 timestamp)
	// Updated every time a duplicate is detected
	// Example: "2025-10-09T10:04:30Z"
	LastOccurrence string `json:"lastOccurrence,omitempty"`
}

DeduplicationMetadata contains deduplication tracking information for HTTP responses DD-GATEWAY-011: This data now comes from RR status.deduplication instead of Redis

type DistributedLockManager

type DistributedLockManager struct {
	// contains filtered or unexported fields
}

DistributedLockManager manages K8s Lease-based distributed locks for multi-replica Gateway safety.

  • Business Requirement: BR-GATEWAY-190 (Multi-Replica Deduplication Safety)
  • Design Decision: ADR-052 (K8s Lease-Based Distributed Locking Pattern)
  • API Client Choice: Uses apiReader (non-cached client) for immediate consistency

Purpose: Prevents duplicate RemediationRequest CRD creation when multiple Gateway pods process the same signal concurrently (fixes GW-DEDUP-002 race condition).

Pattern: Uses Kubernetes coordination.k8s.io/v1 Lease resources for distributed locking. This ensures only 1 pod can acquire a lock for a given fingerprint at a time.

WHY apiReader (Non-Cached Client)?

  • ✅ Immediate Consistency: No cache sync delay (5-50ms race window eliminated)
  • ✅ Correctness: Distributed locking requires guaranteed freshness
  • ✅ Production-Ready: K8s leader-election uses direct API calls for Lease operations

API Server Impact: Acceptable at production scale

  • Normal load (1 signal/sec): 3 API req/sec (negligible)
  • Peak load (8 signals/sec): 24 API req/sec (low)
  • Design target (1000 signals/sec): 3000 API req/sec (30-60% of K8s API capacity)

Lock Lifecycle:

  1. AcquireLock(fingerprint) - Creates or claims Lease
  2. Process signal (create CRD)
  3. ReleaseLock(fingerprint) - Deletes Lease

Fault Tolerance: Expired leases (from crashed pods) can be taken over after 30s.

func NewDistributedLockManager

func NewDistributedLockManager(k8sClient client.Client, namespace, holderID string) *DistributedLockManager

NewDistributedLockManager creates a new distributed lock manager.

Parameters:

  • k8sClient: K8s client for Lease operations (MUST be apiReader/non-cached for immediate consistency)
  • namespace: Namespace where Leases will be created (typically "kubernaut-system")
  • holderID: Unique identifier for this pod (typically POD_NAME from env)

IMPORTANT: Pass apiReader (non-cached client) to avoid race conditions. Cached clients have 5-50ms sync delay, allowing duplicate lock acquisitions.

Example (Production):

lockManager := NewDistributedLockManager(apiReader, "kubernaut-system", "gateway-pod-1")

Example (Testing):

lockManager := NewDistributedLockManager(k8sClient, "default", "test-pod")

func (*DistributedLockManager) AcquireLock

func (m *DistributedLockManager) AcquireLock(ctx context.Context, fingerprint string) (bool, error)

AcquireLock attempts to acquire a distributed lock for the given fingerprint.

Returns:

  • (true, nil) if lock was acquired
  • (false, nil) if lock is held by another pod (contention - not an error)
  • (false, err) if K8s API error occurred

Lock Acquisition Flow:

  1. Try to Get existing Lease
  2. If not found, Create new Lease (acquire lock)
  3. If found and held by us, return true (idempotent)
  4. If found and held by another pod, check if expired
  5. If expired, Update Lease to take over
  6. If not expired, return false (contention)

Example:

acquired, err := lockManager.AcquireLock(ctx, signal.Fingerprint)
if err != nil {
    return fmt.Errorf("lock acquisition failed: %w", err)
}
if !acquired {
    // Lock held by another pod - retry or return
    return nil
}
defer lockManager.ReleaseLock(ctx, signal.Fingerprint)

func (*DistributedLockManager) ReleaseLock

func (m *DistributedLockManager) ReleaseLock(ctx context.Context, fingerprint string) error

ReleaseLock releases the distributed lock by deleting the Lease.

This method is idempotent - it's safe to call multiple times (e.g., in defer). If the Lease doesn't exist (already deleted), no error is returned.

Returns:

  • nil on success
  • error if K8s API error occurred (excluding NotFound)

Example:

defer func() {
    if err := lockManager.ReleaseLock(ctx, signal.Fingerprint); err != nil {
        logger.Error(err, "Failed to release lock")
    }
}()

type MockClock

type MockClock struct {
	// contains filtered or unexported fields
}

MockClock provides controllable time for testing

Usage in tests:

clock := NewMockClock(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC))
crdCreator := NewCRDCreatorWithClock(..., clock)

// First call
rr1, _ := crdCreator.CreateRemediationRequest(ctx, signal)

// Advance time for uniqueness testing
clock.Advance(1 * time.Second)

// Second call (different timestamp)
rr2, _ := crdCreator.CreateRemediationRequest(ctx, signal)

func NewMockClock

func NewMockClock(initialTime time.Time) *MockClock

NewMockClock creates a new MockClock with the specified initial time

func (*MockClock) Advance

func (c *MockClock) Advance(d time.Duration)

Advance moves the mock clock forward by the specified duration

This enables fast testing of time-dependent behavior without sleep:

clock.Advance(1 * time.Second)  // Instant, no actual wait

func (*MockClock) Now

func (c *MockClock) Now() time.Time

Now returns the current mock time

func (*MockClock) Set

func (c *MockClock) Set(t time.Time)

Set sets the mock clock to a specific time

type OperationError

type OperationError struct {
	Operation     string        // Operation name (e.g., "create_remediation_request")
	Phase         string        // Processing phase (e.g., "deduplication", "crd_creation")
	Fingerprint   string        // Signal fingerprint (correlation ID)
	Namespace     string        // Target namespace
	Attempts      int           // Number of retry attempts
	Duration      time.Duration // Total operation duration
	StartTime     time.Time     // Operation start time
	CorrelationID string        // Request correlation ID (RR name)
	Underlying    error         // Wrapped underlying error
}

OperationError provides rich context for processing errors with timing, correlation, and retry information. GAP-10: Enhanced Error Wrapping

This structured error type helps operators quickly diagnose issues by providing:

  • Operation: Operation name (e.g., "create_remediation_request")
  • Phase: Processing phase (e.g., "deduplication", "crd_creation")
  • Fingerprint: Signal fingerprint (serves as correlation ID)
  • Namespace: Target namespace
  • Attempts: Number of retry attempts
  • Duration: Total operation duration
  • StartTime: Operation start time
  • CorrelationID: Request correlation ID (typically RR name)
  • Underlying: Wrapped underlying error

Example:

err := NewOperationError(
    "create_remediation_request",
    "crd_creation",
    "abc123",
    "default",
    "rr-pod-crash-abc123",
    3,
    startTime,
    kubernetesErr,
)

func NewOperationError

func NewOperationError(operation, phase, fingerprint, namespace, correlationID string, attempts int, startTime time.Time, err error) *OperationError

NewOperationError creates a new operation error with automatic duration calculation. Duration is calculated as time.Since(startTime).

func (*OperationError) Error

func (e *OperationError) Error() string

Error implements the error interface with rich, actionable error messages. Format:

{operation} failed: phase={phase}, fingerprint={fingerprint}, namespace={namespace},
attempts={attempts}, duration={duration}, correlation={correlationID}: {underlying}

func (*OperationError) Unwrap

func (e *OperationError) Unwrap() error

Unwrap returns the underlying error for error chain unwrapping. This enables errors.Is() and errors.As() to work with OperationError.

type PhaseBasedDeduplicationChecker

type PhaseBasedDeduplicationChecker struct {
	// contains filtered or unexported fields
}

PhaseBasedDeduplicationChecker checks for existing in-progress RRs by fingerprint

func NewPhaseBasedDeduplicationChecker

func NewPhaseBasedDeduplicationChecker(k8sClient client.Reader, cooldownPeriod time.Duration) *PhaseBasedDeduplicationChecker

NewPhaseBasedDeduplicationChecker creates a new phase-based checker. DD-GATEWAY-011: Accepts client.Reader to allow apiReader (cache-bypassed) for race-free deduplication. #280: the Verifying phase replaces the post-completion cooldown; the cooldownPeriod parameter is retained for backward compatibility but ignored.

func (*PhaseBasedDeduplicationChecker) ShouldDeduplicate

func (c *PhaseBasedDeduplicationChecker) ShouldDeduplicate(ctx context.Context, namespace, fingerprint string) (bool, *remediationv1alpha1.RemediationRequest, error)

ShouldDeduplicate checks if a signal should be deduplicated based on existing RR phase

DD-GATEWAY-011 v1.3: Phase-Based Deduplication Decision

This method:

  1. Lists RRs matching the fingerprint via field selector (BR-GATEWAY-185 v1.1)
  2. Checks if any RR is in a non-terminal phase (including Blocked)
  3. Returns true (deduplicate) if an active RR exists
  4. Returns false (allow new RR) if no active RR exists

v1.3 SIMPLIFICATION:

  • Gateway does NOT count consecutive failures
  • Gateway does NOT create Blocked RRs
  • Gateway simply checks: "Is there an active RR?" → update dedup, else create new

BR-GATEWAY-185 v1.1: Use the spec.signalFingerprint field selector instead of labels

  • Labels are mutable and truncated to 63 chars (data loss risk)
  • spec.signalFingerprint is immutable and supports the full 64-char SHA256

Business Requirements:

  • BR-GATEWAY-181: Move deduplication tracking to status
  • BR-GATEWAY-185: Field selector for fingerprint lookup (v1.1)
  • BR-ORCH-042: Consecutive failure blocking (RO responsibility, NOT Gateway)

Parameters:

  • ctx: Context for cancellation and timeout
  • namespace: Namespace to search in
  • fingerprint: Signal fingerprint to match (full 64-char SHA256)

Returns:

  • bool: true if should deduplicate (in-progress RR exists)
  • *RemediationRequest: existing in-progress RR (nil if none)
  • error: K8s API errors

type RealClock

type RealClock struct{}

RealClock provides actual system time for production use

func NewRealClock

func NewRealClock() *RealClock

NewRealClock creates a new RealClock instance

func (*RealClock) Now

func (c *RealClock) Now() time.Time

Now returns the current system time

type RetryError

type RetryError struct {
	Attempt     int    // Final attempt number (1-based)
	MaxAttempts int    // Maximum retry attempts configured
	OriginalErr error  // Original underlying error
	ErrorType   string // Error type classification (e.g., "transient", "timeout")
	IsRetryable bool   // Whether error was retryable
}

RetryError wraps errors from retry operations with retry context. This is used by createCRDWithRetry to provide rich context about retry attempts.

func (*RetryError) Error

func (e *RetryError) Error() string

Error implements the error interface for RetryError.

func (*RetryError) Unwrap

func (e *RetryError) Unwrap() error

Unwrap returns the underlying error for error chain unwrapping.

type RetryObserver

type RetryObserver interface {
	OnRetryAttempt(ctx context.Context, signal *types.NormalizedSignal, attempt int, err error)
}

RetryObserver is notified on each failed CRD creation retry attempt. BR-GATEWAY-058: Every retry attempt MUST be observed for audit compliance. BR-GATEWAY-113: Decouples retry observation from CRD creation logic.

type StatusUpdater

type StatusUpdater struct {
	// contains filtered or unexported fields
}

StatusUpdater handles status updates to RemediationRequest CRDs

func NewStatusUpdater

func NewStatusUpdater(k8sClient client.Client, apiReader client.Reader) *StatusUpdater

NewStatusUpdater creates a new status updater. apiReader bypasses the controller-runtime cache for optimistic-locking refetches (DD-STATUS-001).

func (*StatusUpdater) UpdateDeduplicationStatus

func (u *StatusUpdater) UpdateDeduplicationStatus(ctx context.Context, rr *remediationv1alpha1.RemediationRequest) error

UpdateDeduplicationStatus updates the status.deduplication fields for a duplicate signal

DD-GATEWAY-011: Status-Based Deduplication

This method:

  1. Refetches the RR to get the latest resourceVersion (using apiReader for a cache-bypassed read)
  2. Updates ONLY status.deduplication (Gateway-owned)
  3. Uses Status().Update() to update the status subresource
  4. Retries on conflict (optimistic concurrency)

DD-STATUS-001: Uses apiReader to bypass controller-runtime cache This prevents "not found" errors when Gateway reads immediately after CRD creation, as the cached client may not have synced yet. Direct API server reads are always fresh.

Business Requirements:

  • BR-GATEWAY-181: Move deduplication tracking from spec to status
  • BR-GATEWAY-183: Implement optimistic concurrency for status updates

Parameters:

  • ctx: Context for cancellation and timeout
  • rr: RemediationRequest to update

Returns:

  • error: K8s API errors (not found, timeout, etc.)
