Documentation
Index ¶
- Constants
- Variables
- func BuildLabelSelector(requirements []SelectorRequirement) (string, error)
- func NewRestConfig(endpoint, caCert, token string) (*rest.Config, error)
- func TestConnection(ctx context.Context, endpoint, caCert, token string) error
- func ValidateIntegration(name, endpoint, caCert, bearerToken string) error
- type ConnectionValidationError
- type CreateIntegrationRequest
- type HealthMonitor
- func (h *HealthMonitor) GetIntegrationID() string
- func (h *HealthMonitor) GetReconnectCount() int64
- func (h *HealthMonitor) GetStatus() (IntegrationStatus, time.Time)
- func (h *HealthMonitor) OnSuccessfulSync()
- func (h *HealthMonitor) Start(ctx context.Context)
- func (h *HealthMonitor) Stop()
- func (h *HealthMonitor) WatchErrorHandler(r *cache.Reflector, err error)
- type Integration
- type IntegrationStatus
- type K8sMetricsCollector
- type PodInfo
- type PodPreviewResponse
- type PodWatcher
- type ReconcileEvent
- type ReconcileOp
- type Reconciler
- func (r *Reconciler) EnqueueAdd(networkName, ip string)
- func (r *Reconciler) EnqueueRemoveWithGrace(networkName, ip string)
- func (r *Reconciler) GetBPFUpdateCounts() (success map[string]int64, failure map[string]int64)
- func (r *Reconciler) GetPodCount(networkName string) int
- func (r *Reconciler) GetPodIPs(networkName string) []string
- func (r *Reconciler) GetQueueOverflows() int64
- func (r *Reconciler) SetLogger(logger logr.Logger)
- func (r *Reconciler) Start(ctx context.Context)
- func (r *Reconciler) Stop()
- type ReconcilerOption
- type SelectorRequirement
- type Storage
- func (s *Storage) Create(ctx context.Context, req *CreateIntegrationRequest) (*Integration, error)
- func (s *Storage) Delete(ctx context.Context, id string) error
- func (s *Storage) DeletePodMetadata(networkName string)
- func (s *Storage) Get(ctx context.Context, id string) (*Integration, error)
- func (s *Storage) GetLastSync(networkName string) time.Time
- func (s *Storage) GetPodCount(networkName string) int
- func (s *Storage) List(ctx context.Context) ([]*Integration, error)
- func (s *Storage) Update(ctx context.Context, id string, req *UpdateIntegrationRequest) (*Integration, error)
- func (s *Storage) UpdatePodCount(networkName string, count int)
- type TestConnectionRequest
- type TestConnectionResponse
- type UpdateIntegrationRequest
- type ValidationError
- type ValidationPhase
- type WatcherManager
- func (m *WatcherManager) GetPodIPsForNetwork(networkName string) []string
- func (m *WatcherManager) OnCredentialUpdate(ctx context.Context, integrationID string) error
- func (m *WatcherManager) OnNetworkCreated(network ruleset.Network) error
- func (m *WatcherManager) OnNetworkDeleted(network ruleset.Network) error
- func (m *WatcherManager) OnNetworkUpdated(oldNetwork, newNetwork ruleset.Network) error
- func (m *WatcherManager) SetLogger(logger logr.Logger)
- func (m *WatcherManager) SetReconciler(r *Reconciler)
- func (m *WatcherManager) Start(ctx context.Context, networks []ruleset.Network) error
- func (m *WatcherManager) Stop()
Constants ¶
const (
	// DefaultGracePeriod is the default delay before removing pod IPs from BPF maps.
	// Allows connections to drain during pod restarts (30 seconds per CONTEXT.md).
	// Can be overridden with the WithGracePeriod option for testing.
	DefaultGracePeriod = 30 * time.Second
)
Variables ¶
var (
	// PodsTrackedDesc tracks the number of Kubernetes pods currently tracked in BPF maps
	PodsTrackedDesc = prometheus.NewDesc(
		"neuwerk_k8s_pods_tracked",
		"Number of Kubernetes pods currently tracked in BPF maps",
		[]string{"integration_id", "namespace", "network_name"}, nil,
	)
	// ReconnectionsTotalDesc counts watch reconnection events
	ReconnectionsTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_watch_reconnections_total",
		"Total number of Kubernetes watch reconnection events",
		[]string{"integration_id"}, nil,
	)
	// ConnectionStatusDesc indicates connection health (0=disconnected/pending, 1=degraded, 2=connected)
	ConnectionStatusDesc = prometheus.NewDesc(
		"neuwerk_k8s_connection_status",
		"Kubernetes connection status (0=disconnected/pending, 1=degraded, 2=connected)",
		[]string{"integration_id"}, nil,
	)
	// LastSyncTimestampDesc records Unix timestamp of last successful cache sync
	LastSyncTimestampDesc = prometheus.NewDesc(
		"neuwerk_k8s_last_sync_timestamp_seconds",
		"Unix timestamp of last successful cache sync",
		[]string{"integration_id"}, nil,
	)
	// BPFUpdatesTotalDesc counts BPF map updates from pod events
	BPFUpdatesTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_bpf_updates_total",
		"Total number of BPF map updates from Kubernetes pod events",
		[]string{"integration_id", "network_name", "result"}, nil,
	)
	// QueueDepthDesc tracks reconciler workqueue depth
	QueueDepthDesc = prometheus.NewDesc(
		"neuwerk_k8s_queue_depth",
		"Number of items in the reconciler workqueue",
		nil, nil,
	)
	// QueueOverflowsTotalDesc counts events dropped due to queue overflow
	QueueOverflowsTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_queue_overflows_total",
		"Total number of events dropped due to queue overflow",
		nil, nil,
	)
)
Metric descriptors for Kubernetes integration monitoring (Phase 51)
Functions ¶
func BuildLabelSelector ¶ added in v1.6.1
func BuildLabelSelector(requirements []SelectorRequirement) (string, error)
BuildLabelSelector converts structured UI requirements into a Kubernetes label selector string. It validates operators and requirements, returning an error if any requirement is invalid.
Supported operators:
- "=" or "==" : Equality (key must equal value)
- "!=" : Inequality (key must not equal value)
- "in" : Set membership (key must be in values list)
- "notin" : Set exclusion (key must not be in values list)
- "exists" : Existence (key must exist, values must be empty)
- "!exists" : Non-existence (key must not exist, values must be empty)
Empty requirements return an empty string (matches all pods). Multiple requirements are combined with AND logic (comma-separated).
Example:
requirements := []SelectorRequirement{
{Key: "app", Operator: "=", Values: []string{"backend"}},
{Key: "env", Operator: "in", Values: []string{"prod", "staging"}},
}
selector, err := BuildLabelSelector(requirements)
// Returns: "app=backend,env in (prod,staging)"
func NewRestConfig ¶
func NewRestConfig(endpoint, caCert, token string) (*rest.Config, error)
NewRestConfig creates a REST config from integration credentials with rate limiting. It builds an in-memory kubeconfig without file I/O and configures 50 QPS / 100 burst to prevent throttling in multi-cluster setups.
func TestConnection ¶
func TestConnection(ctx context.Context, endpoint, caCert, token string) error
TestConnection validates K8s API server reachability by attempting to list namespaces. It creates a REST config, establishes a clientset, and performs a minimal API call.
Deprecated: Use ValidateConnection for phase-labeled error detection.
func ValidateIntegration ¶
func ValidateIntegration(name, endpoint, caCert, bearerToken string) error
ValidateIntegration validates integration credentials before storage. It enforces HTTPS endpoints, PEM certificate format, and X.509 validity.
Types ¶
type ConnectionValidationError ¶
type ConnectionValidationError struct {
Phase ValidationPhase
Message string
}
ConnectionValidationError represents a connection validation failure with phase context
func ValidateConnection ¶
func ValidateConnection(ctx context.Context, endpoint, caCert, token string) *ConnectionValidationError
ValidateConnection performs sequential validation: certificate → reachability → authentication. It returns a ConnectionValidationError with a phase label on failure, or nil on success. It uses discovery.ServerVersion() for a lightweight server check (no RBAC permissions required).
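The phase ordering can be sketched with stdlib pieces. This is illustrative only: validateSketch is a hypothetical name, and it covers just the first two phases (the real function also performs the authentication phase via the discovery client):

```go
package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"net"
	"net/url"
	"time"
)

type ValidationPhase string

const (
	PhaseCertificate  ValidationPhase = "certificate"
	PhaseReachability ValidationPhase = "reachability"
)

type ConnectionValidationError struct {
	Phase   ValidationPhase
	Message string
}

func (e *ConnectionValidationError) Error() string {
	return fmt.Sprintf("%s: %s", e.Phase, e.Message)
}

// validateSketch checks the CA certificate first, then reachability, so the
// returned phase tells the caller which stage failed.
func validateSketch(endpoint, caCert string) *ConnectionValidationError {
	block, _ := pem.Decode([]byte(caCert))
	if block == nil {
		return &ConnectionValidationError{PhaseCertificate, "CA cert is not valid PEM"}
	}
	if _, err := x509.ParseCertificate(block.Bytes); err != nil {
		return &ConnectionValidationError{PhaseCertificate, err.Error()}
	}
	u, err := url.Parse(endpoint)
	if err != nil {
		return &ConnectionValidationError{PhaseReachability, err.Error()}
	}
	conn, err := net.DialTimeout("tcp", u.Host, 2*time.Second)
	if err != nil {
		return &ConnectionValidationError{PhaseReachability, err.Error()}
	}
	conn.Close()
	return nil
}

func main() {
	if err := validateSketch("https://cluster.example.com:6443", "not a certificate"); err != nil {
		fmt.Println(err.Phase) // certificate: fails before any network I/O
	}
}
```

Because the certificate check runs first, a malformed CA certificate is reported as a certificate failure even when the endpoint is also unreachable.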
func (*ConnectionValidationError) Error ¶
func (e *ConnectionValidationError) Error() string
type CreateIntegrationRequest ¶
type CreateIntegrationRequest struct {
// Name is the human-readable name for this integration
Name string `json:"name"`
// Endpoint is the HTTPS URL of the Kubernetes API server
Endpoint string `json:"endpoint"`
// CACert is the CA certificate in PEM format
CACert string `json:"ca_cert"`
// BearerToken is the service account token
BearerToken string `json:"bearer_token"`
}
CreateIntegrationRequest represents the payload for creating a new integration
type HealthMonitor ¶ added in v1.6.1
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor tracks connection health for a Kubernetes integration. It implements a three-state machine (Connected → Degraded → Disconnected), following 48-RESEARCH.md Patterns 2 and 3 for WatchErrorHandler and state transitions.
func NewHealthMonitor ¶ added in v1.6.1
func NewHealthMonitor(integrationID string, logger logr.Logger, storage *Storage, clientset kubernetes.Interface) *HealthMonitor
NewHealthMonitor creates a new health monitor for a Kubernetes integration. Status starts as StatusPending until the first successful sync.
storage: the Storage instance for persisting health status to NATS KV (Phase 48)
clientset: the Kubernetes client for active health checks (can be nil to disable)
func (*HealthMonitor) GetIntegrationID ¶ added in v1.6.1
func (h *HealthMonitor) GetIntegrationID() string
GetIntegrationID returns the integration ID for metrics labeling (Phase 51)
func (*HealthMonitor) GetReconnectCount ¶ added in v1.6.1
func (h *HealthMonitor) GetReconnectCount() int64
GetReconnectCount returns total reconnection events for metrics (Phase 51)
func (*HealthMonitor) GetStatus ¶ added in v1.6.1
func (h *HealthMonitor) GetStatus() (IntegrationStatus, time.Time)
GetStatus returns the current status and last successful sync time (thread-safe). It uses a read lock to allow concurrent access from API handlers.
func (*HealthMonitor) OnSuccessfulSync ¶ added in v1.6.1
func (h *HealthMonitor) OnSuccessfulSync()
OnSuccessfulSync is called when the informer cache syncs successfully. It resets the error counter and transitions to the Connected state (silent recovery).
func (*HealthMonitor) Start ¶ added in v1.6.1
func (h *HealthMonitor) Start(ctx context.Context)
Start begins periodic health checks for API connectivity and state transitions. It runs two tickers:
- a 1-second ticker for active API server health checks (detects disconnection)
- a 30-second ticker for the Degraded → Disconnected transition after 5 minutes
func (*HealthMonitor) Stop ¶ added in v1.6.1
func (h *HealthMonitor) Stop()
Stop halts the health monitor and prevents goroutine leaks
func (*HealthMonitor) WatchErrorHandler ¶ added in v1.6.1
func (h *HealthMonitor) WatchErrorHandler(r *cache.Reflector, err error)
WatchErrorHandler is called by client-go when the watch connection drops. It implements the cache.WatchErrorHandler interface for SetWatchErrorHandler, following 48-RESEARCH.md Pattern 2: track consecutive failures for state transitions.
type Integration ¶
type Integration struct {
// ID is the unique identifier (UUID) for this integration
ID string `json:"id"`
// Name is the human-readable name for this integration
Name string `json:"name"`
// Endpoint is the HTTPS URL of the Kubernetes API server (e.g., https://cluster.example.com:6443)
Endpoint string `json:"endpoint"`
// CACert is the CA certificate in PEM format for verifying the API server's TLS certificate
CACert string `json:"ca_cert"`
// BearerToken is the service account token for authenticating with the API server
// SECURITY: This field is write-only - never returned in API responses (handled in Plan 02)
BearerToken string `json:"bearer_token,omitempty"`
// Status represents the current connection state (from HealthMonitor, Phase 48)
Status IntegrationStatus `json:"status"`
// CreatedAt is the timestamp when this integration was created
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is the timestamp when this integration was last modified
UpdatedAt time.Time `json:"updated_at"`
// LastError stores the most recent validation error message (empty if healthy)
// Only the message is stored, not the timestamp (per CONTEXT.md decision)
LastError string `json:"last_error,omitempty"`
// LastErrorPhase identifies which validation phase failed: "reachability", "certificate", "authentication"
// Empty string indicates no error (integration is healthy)
LastErrorPhase string `json:"last_error_phase,omitempty"`
// LastSync is the timestamp of the last successful cache sync (Phase 48)
// Updated by HealthMonitor.OnSuccessfulSync via persistStatus
LastSync time.Time `json:"last_sync,omitempty"`
}
Integration represents a Kubernetes cluster integration configuration
type IntegrationStatus ¶
type IntegrationStatus string
IntegrationStatus represents the current connection state of an integration
const (
	// StatusPending indicates integration is created but not yet verified
	StatusPending IntegrationStatus = "Pending"
	// StatusConnected indicates integration is reachable and healthy
	StatusConnected IntegrationStatus = "Connected"
	// StatusDegraded indicates integration has intermittent connectivity issues
	StatusDegraded IntegrationStatus = "Degraded"
	// StatusDisconnected indicates integration is unreachable
	StatusDisconnected IntegrationStatus = "Disconnected"
)
type K8sMetricsCollector ¶ added in v1.6.1
type K8sMetricsCollector struct {
// contains filtered or unexported fields
}
K8sMetricsCollector implements prometheus.Collector for Kubernetes integration metrics. It follows the custom collector pattern from pkg/metrics/metrics.go, computing metrics on-demand during scrape from in-memory state (WatcherManager and Reconciler).
func NewK8sMetricsCollector ¶ added in v1.6.1
func NewK8sMetricsCollector(manager *WatcherManager, storage *Storage) *K8sMetricsCollector
NewK8sMetricsCollector creates a new Kubernetes metrics collector. Parameters:
- manager: WatcherManager for accessing watchers and reconciler state
- storage: Storage for accessing integration metadata
func (*K8sMetricsCollector) Collect ¶ added in v1.6.1
func (c *K8sMetricsCollector) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector interface. Computes metrics on-demand during Prometheus scrape from current state.
func (*K8sMetricsCollector) Describe ¶ added in v1.6.1
func (c *K8sMetricsCollector) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector interface. Uses DescribeByCollect helper (neuwerk pattern) to generate descriptions from Collect.
type PodInfo ¶ added in v1.6.1
type PodInfo struct {
// Name is the pod name
Name string `json:"name"`
// IP is the pod IP address
IP string `json:"ip"`
}
PodInfo represents basic information about a pod
type PodPreviewResponse ¶ added in v1.6.1
type PodPreviewResponse struct {
// Pods is the list of matching pods (filtered to show only pods with IPs)
Pods []PodInfo `json:"pods"`
// TotalCount is the total number of pods matching the selector (before truncation)
TotalCount int `json:"total_count"`
// Truncated indicates if the result was truncated (showing sample, not all pods)
Truncated bool `json:"truncated"`
}
PodPreviewResponse represents the response for the pod preview endpoint. Used by GET /api/v1/integrations/kubernetes/{id}/pods.
type PodWatcher ¶ added in v1.6.1
type PodWatcher struct {
// contains filtered or unexported fields
}
PodWatcher watches pods in a Kubernetes cluster and extracts pod IPs in real time. It uses a SharedInformerFactory for efficient caching and namespace/label scoping, following RESEARCH.md Pattern 1: Complete Pod Watcher Setup.
func NewPodWatcher ¶ added in v1.6.1
func NewPodWatcher(clientset *kubernetes.Clientset, namespace, labelSelector, integrationID string, storage *Storage) *PodWatcher
NewPodWatcher creates a new PodWatcher for the given namespace and label selector. It accepts a clientset from Phase 45 (created via NewRestConfig + kubernetes.NewForConfig).
namespace: the Kubernetes namespace to watch (single namespace per watcher)
labelSelector: label selector string (e.g., "app=backend,env in (prod,staging)")
integrationID: the ID of the integration this watcher belongs to (for health monitoring, Phase 48)
storage: the Storage instance for persisting health status (Phase 48)
func (*PodWatcher) SetLogger ¶ added in v1.6.1
func (w *PodWatcher) SetLogger(logger logr.Logger)
SetLogger configures the logger for this watcher
func (*PodWatcher) SetOnPodIPAdded ¶ added in v1.6.1
func (w *PodWatcher) SetOnPodIPAdded(fn func(ip string))
SetOnPodIPAdded sets the callback invoked when a pod IP is added. The callback is invoked for each IP when a pod is created or gets a new IP.
func (*PodWatcher) SetOnPodIPRemoved ¶ added in v1.6.1
func (w *PodWatcher) SetOnPodIPRemoved(fn func(ip string))
SetOnPodIPRemoved sets the callback invoked when a pod IP is removed. The callback is invoked for each IP when a pod is deleted or loses an IP.
func (*PodWatcher) Start ¶ added in v1.6.1
func (w *PodWatcher) Start(ctx context.Context) error
Start initializes the SharedInformerFactory and begins watching pods. This method blocks until the initial cache sync completes, and returns an error if the sync fails.
The implementation sequence follows RESEARCH.md patterns to avoid pitfalls:
1. Create the factory with namespace and label selector options (Pattern 1)
2. Get the pod informer
3. Register event handlers (Pattern 3)
4. CRITICAL: start the factory BEFORE WaitForCacheSync (Pitfall 3)
5. Wait for cache sync (prevents incomplete firewall rules)
func (*PodWatcher) Stop ¶ added in v1.6.1
func (w *PodWatcher) Stop()
Stop gracefully shuts down the watcher and prevents goroutine leaks (Pitfall 1)
type ReconcileEvent ¶ added in v1.6.1
type ReconcileEvent struct {
// NetworkName is the network this event applies to
NetworkName string
// Operation is the type of operation (add or remove)
Operation ReconcileOp
// IP is the pod IP address to add or remove
IP string
}
ReconcileEvent represents a BPF map update operation from pod events
type ReconcileOp ¶ added in v1.6.1
type ReconcileOp string
ReconcileOp represents the type of reconciliation operation
const (
	// OpAddIP adds a pod IP to the BPF map
	OpAddIP ReconcileOp = "add"
	// OpRemoveIP removes a pod IP from the BPF map
	OpRemoveIP ReconcileOp = "remove"
)
type Reconciler ¶ added in v1.6.1
type Reconciler struct {
// contains filtered or unexported fields
}
Reconciler manages asynchronous BPF map updates from pod events using workqueue pattern. It decouples pod watcher event handlers from BPF operations to prevent blocking, provides rate limiting and exponential backoff for failed updates, and implements grace periods for connection draining during pod restarts.
func NewReconciler ¶ added in v1.6.1
func NewReconciler(bpfCollection *bpf.Collection, ruleProvider ruleset.RuleProvider, k8sStore *Storage, opts ...ReconcilerOption) *Reconciler
NewReconciler creates a new Reconciler with workqueue initialization. It uses DefaultTypedControllerRateLimiter for exponential backoff and bounded queue depth. Parameters:
- bpfCollection: BPF maps for policy updates
- ruleProvider: Current ruleset for network lookup
- k8sStore: Storage for pod count tracking
- opts: Optional configuration (e.g., WithGracePeriod for testing)
func (*Reconciler) EnqueueAdd ¶ added in v1.6.1
func (r *Reconciler) EnqueueAdd(networkName, ip string)
EnqueueAdd enqueues a pod IP addition event for processing. It uses non-blocking send to prevent blocking pod watcher event handlers. If the buffer is full, the event is dropped and logged (overflow handling).
func (*Reconciler) EnqueueRemoveWithGrace ¶ added in v1.6.1
func (r *Reconciler) EnqueueRemoveWithGrace(networkName, ip string)
EnqueueRemoveWithGrace enqueues a pod IP removal event with a grace period. The grace period (configurable via WithGracePeriod, default 30s) allows connections to drain during pod restarts. If a grace period already exists for this IP, it is canceled and replaced with a new one (handles rapid pod restarts).
IMPORTANT: The IP is immediately removed from podIPs so the main controller's reconciliation won't re-add it to the BPF map. The actual BPF map removal happens after the grace period expires.
func (*Reconciler) GetBPFUpdateCounts ¶ added in v1.6.1
func (r *Reconciler) GetBPFUpdateCounts() (success map[string]int64, failure map[string]int64)
GetBPFUpdateCounts returns success/failure counts per network for metrics (Phase 51)
func (*Reconciler) GetPodCount ¶ added in v1.6.1
func (r *Reconciler) GetPodCount(networkName string) int
GetPodCount returns the number of pod IPs tracked for a network (Phase 51)
func (*Reconciler) GetPodIPs ¶ added in v1.6.1
func (r *Reconciler) GetPodIPs(networkName string) []string
GetPodIPs returns all pod IPs tracked for a network. This is used by the main controller during reconciliation to preserve K8s pod IPs.
func (*Reconciler) GetQueueOverflows ¶ added in v1.6.1
func (r *Reconciler) GetQueueOverflows() int64
GetQueueOverflows returns total queue overflow events for metrics (Phase 51)
func (*Reconciler) SetLogger ¶ added in v1.6.1
func (r *Reconciler) SetLogger(logger logr.Logger)
SetLogger sets the structured logger for the reconciler
func (*Reconciler) Start ¶ added in v1.6.1
func (r *Reconciler) Start(ctx context.Context)
Start launches worker goroutines to process events from the workqueue. It also starts the drainBuffer goroutine to move events from the bounded channel to the workqueue.
func (*Reconciler) Stop ¶ added in v1.6.1
func (r *Reconciler) Stop()
Stop shuts down the reconciler gracefully. It closes the enqueue buffer, shuts down the workqueue, and cancels all active grace period timers.
type ReconcilerOption ¶ added in v1.6.1
type ReconcilerOption func(*Reconciler)
ReconcilerOption is a function that configures a Reconciler.
func WithGracePeriod ¶ added in v1.6.1
func WithGracePeriod(d time.Duration) ReconcilerOption
WithGracePeriod sets a custom grace period duration for IP removal. This is useful for testing where shorter grace periods speed up test execution.
type SelectorRequirement ¶ added in v1.6.1
type SelectorRequirement struct {
Key string // Label key (e.g., "app", "env", "tier")
Operator string // Operator: "=", "!=", "in", "notin", "exists", "!exists"
Values []string // Values for the requirement (empty for exists/!exists operators)
}
SelectorRequirement represents a single label selector requirement from the UI. It provides a structured way to build Kubernetes label selectors without raw string manipulation.
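The documented operator semantics can be sketched as a pure-Go builder. This is a simplified stand-in for BuildLabelSelector (the real function also validates label keys):

```go
package main

import (
	"fmt"
	"strings"
)

type SelectorRequirement struct {
	Key      string
	Operator string
	Values   []string
}

// buildSelector joins requirements with commas (AND logic) using the
// documented operator set; empty input yields "" (matches all pods).
func buildSelector(reqs []SelectorRequirement) (string, error) {
	parts := make([]string, 0, len(reqs))
	for _, r := range reqs {
		switch r.Operator {
		case "=", "==", "!=":
			if len(r.Values) != 1 {
				return "", fmt.Errorf("%s: operator %q needs exactly one value", r.Key, r.Operator)
			}
			parts = append(parts, r.Key+r.Operator+r.Values[0])
		case "in", "notin":
			if len(r.Values) == 0 {
				return "", fmt.Errorf("%s: operator %q needs values", r.Key, r.Operator)
			}
			parts = append(parts, fmt.Sprintf("%s %s (%s)", r.Key, r.Operator, strings.Join(r.Values, ",")))
		case "exists":
			parts = append(parts, r.Key)
		case "!exists":
			parts = append(parts, "!"+r.Key)
		default:
			return "", fmt.Errorf("%s: unsupported operator %q", r.Key, r.Operator)
		}
	}
	return strings.Join(parts, ","), nil
}

func main() {
	s, _ := buildSelector([]SelectorRequirement{
		{Key: "app", Operator: "=", Values: []string{"backend"}},
		{Key: "env", Operator: "in", Values: []string{"prod", "staging"}},
	})
	fmt.Println(s) // app=backend,env in (prod,staging)
}
```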
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage provides CRUD operations for Kubernetes integration credentials using NATS KV
func NewStorage ¶
NewStorage creates a new Storage instance with NATS KV persistence. It creates or updates the "neuwerk-integrations" bucket with FileStorage and Replicas=1.
func (*Storage) Create ¶
func (s *Storage) Create(ctx context.Context, req *CreateIntegrationRequest) (*Integration, error)
Create stores a new integration in NATS KV. It generates a new UUID for the integration and checks for key conflicts.
func (*Storage) Delete ¶
func (s *Storage) Delete(ctx context.Context, id string) error
Delete removes an integration from NATS KV. It verifies the integration exists before deleting.
func (*Storage) DeletePodMetadata ¶ added in v1.6.1
func (s *Storage) DeletePodMetadata(networkName string)
DeletePodMetadata removes pod count and last sync metadata for a network. Called when a K8s-managed network is deleted.
func (*Storage) GetLastSync ¶ added in v1.6.1
func (s *Storage) GetLastSync(networkName string) time.Time
GetLastSync returns the last sync timestamp for a network. Returns the zero time if the network hasn't been synced yet.
func (*Storage) GetPodCount ¶ added in v1.6.1
func (s *Storage) GetPodCount(networkName string) int
GetPodCount returns the current pod count for a network. Returns zero if the network has no pods or hasn't been synced yet.
func (*Storage) List ¶
func (s *Storage) List(ctx context.Context) ([]*Integration, error)
List retrieves all integrations with the kubernetes: prefix from NATS KV. It skips entries that fail to unmarshal (partial list return for resilience).
func (*Storage) Update ¶
func (s *Storage) Update(ctx context.Context, id string, req *UpdateIntegrationRequest) (*Integration, error)
Update modifies an existing integration in NATS KV. It verifies the integration exists before updating.
func (*Storage) UpdatePodCount ¶ added in v1.6.1
func (s *Storage) UpdatePodCount(networkName string, count int)
UpdatePodCount updates the pod count for a network and sets the last sync timestamp. Called by the reconcile method after a successful BPF update.
type TestConnectionRequest ¶ added in v1.6.1
type TestConnectionRequest struct {
// Endpoint is the HTTPS URL of the Kubernetes API server (e.g., https://cluster.example.com:6443)
Endpoint string `json:"endpoint"`
// CACert is the CA certificate in PEM format for verifying the API server's TLS certificate
CACert string `json:"ca_cert"`
// BearerToken is the service account token for authenticating with the API server
BearerToken string `json:"bearer_token"`
}
TestConnectionRequest represents the payload for testing Kubernetes API connectivity. Used by POST /api/v1/integrations/kubernetes/test.
type TestConnectionResponse ¶ added in v1.6.1
type TestConnectionResponse struct {
// Status is either "success" or "error"
Status string `json:"status"`
// ServerVersion is the K8s server version (populated on success, from discovery API)
ServerVersion string `json:"server_version,omitempty"`
// Phase identifies which validation phase failed: "reachability", "certificate", "authentication" (populated on error)
Phase string `json:"phase,omitempty"`
// Message provides detailed error message (populated on error)
Message string `json:"message,omitempty"`
}
TestConnectionResponse represents the response from testing Kubernetes API connectivity
type UpdateIntegrationRequest ¶
type UpdateIntegrationRequest struct {
// Name is the human-readable name for this integration (optional - omitted to preserve existing)
Name string `json:"name,omitempty"`
// Endpoint is the HTTPS URL of the Kubernetes API server (optional - omitted to preserve existing)
Endpoint string `json:"endpoint,omitempty"`
// CACert is the CA certificate in PEM format (optional - omitted to preserve existing)
CACert string `json:"ca_cert,omitempty"`
// BearerToken is the service account token (optional - nil pointer preserves existing)
BearerToken *string `json:"bearer_token,omitempty"`
// Status is the connection status (optional - set by handlers after validation or HealthMonitor)
Status *IntegrationStatus `json:"status,omitempty"`
// LastError is the last validation error (optional - set by handlers after validation)
LastError *string `json:"last_error,omitempty"`
// LastErrorPhase is the validation phase that failed (optional - set by handlers after validation)
LastErrorPhase *string `json:"last_error_phase,omitempty"`
// LastSync is the last successful sync timestamp (optional - set by HealthMonitor, Phase 48)
LastSync *time.Time `json:"last_sync,omitempty"`
}
UpdateIntegrationRequest represents the payload for updating an existing integration
type ValidationError ¶
ValidationError represents a validation error with field context. It follows the pattern from pkg/api/validation.go.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type ValidationPhase ¶
type ValidationPhase string
ValidationPhase identifies which stage of connection validation failed
const (
	PhaseReachability   ValidationPhase = "reachability"
	PhaseCertificate    ValidationPhase = "certificate"
	PhaseAuthentication ValidationPhase = "authentication"
)
type WatcherManager ¶ added in v1.6.1
type WatcherManager struct {
// contains filtered or unexported fields
}
WatcherManager coordinates PodWatcher lifecycles across multiple networks. It manages one PodWatcher per network with IntegrationID, starting watchers when networks are created, stopping them when deleted, and restarting when K8s fields change.
Thread-safe for concurrent network CRUD operations.
func NewWatcherManager ¶ added in v1.6.1
func NewWatcherManager(integrationStore *Storage, ruleProvider ruleset.RuleProvider) *WatcherManager
NewWatcherManager creates a new WatcherManager for coordinating PodWatchers. integrationStore is used to lookup Integration credentials by ID when starting watchers. ruleProvider is used to find all networks using an integration (for OnCredentialUpdate, Phase 48).
func (*WatcherManager) GetPodIPsForNetwork ¶ added in v1.6.1
func (m *WatcherManager) GetPodIPsForNetwork(networkName string) []string
GetPodIPsForNetwork returns the pod IPs tracked by the K8s reconciler for a network. This is used by the main controller during BPF map reconciliation to preserve K8s pod IPs. Returns nil if the reconciler is not set or the network has no tracked pods.
func (*WatcherManager) OnCredentialUpdate ¶ added in v1.6.1
func (m *WatcherManager) OnCredentialUpdate(ctx context.Context, integrationID string) error
OnCredentialUpdate restarts watchers affected by integration credential changes. It stops and recreates watchers to achieve:
1. Fresh resync (clear cache, list+watch from scratch)
2. Reset exponential backoff (new Reflector with no retry history)
Per CONTEXT.md: "Credential update resets backoff and triggers immediate reconnect"
This method is called from the API UpdateIntegration handler when credentials change. Watcher restart failures are logged but don't fail the API call, since the credentials were already updated successfully in storage.
func (*WatcherManager) OnNetworkCreated ¶ added in v1.6.1
func (m *WatcherManager) OnNetworkCreated(network ruleset.Network) error
OnNetworkCreated starts a new PodWatcher for a network when it's created. If the network has no IntegrationID, it's a static CIDR network and no watcher is needed.
This method is called by APIRuleProvider after successful network creation.
func (*WatcherManager) OnNetworkDeleted ¶ added in v1.6.1
func (m *WatcherManager) OnNetworkDeleted(network ruleset.Network) error
OnNetworkDeleted stops and removes the watcher for a deleted network. If the network has no watcher (static CIDR), this is a no-op.
This method is called by APIRuleProvider after successful network deletion.
func (*WatcherManager) OnNetworkUpdated ¶ added in v1.6.1
func (m *WatcherManager) OnNetworkUpdated(oldNetwork, newNetwork ruleset.Network) error
OnNetworkUpdated handles network updates by detecting K8s field changes. If IntegrationID, Namespace, or LabelSelector changed, the watcher is restarted. If only non-K8s fields changed (e.g., CIDR, policies), no action is needed.
This method is called by APIRuleProvider after successful network update.
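The documented restart condition amounts to comparing three fields. A hedged sketch, with a stand-in Network type carrying only the fields the check cares about (the real ruleset.Network has more, and the helper name is hypothetical):

```go
package main

import "fmt"

// Network is a reduced stand-in for ruleset.Network.
type Network struct {
	IntegrationID string
	Namespace     string
	LabelSelector string
}

// k8sFieldsChanged mirrors the documented rule: only changes to
// IntegrationID, Namespace, or LabelSelector require a watcher restart;
// other fields (CIDR, policies) leave the watcher untouched.
func k8sFieldsChanged(oldNet, newNet Network) bool {
	return oldNet.IntegrationID != newNet.IntegrationID ||
		oldNet.Namespace != newNet.Namespace ||
		oldNet.LabelSelector != newNet.LabelSelector
}

func main() {
	a := Network{IntegrationID: "abc", Namespace: "prod", LabelSelector: "app=backend"}
	b := a
	b.LabelSelector = "app=frontend"
	fmt.Println(k8sFieldsChanged(a, a)) // false: no K8s field changed, no restart
	fmt.Println(k8sFieldsChanged(a, b)) // true: selector changed, restart watcher
}
```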
func (*WatcherManager) SetLogger ¶ added in v1.6.1
func (m *WatcherManager) SetLogger(logger logr.Logger)
SetLogger configures the logger for this manager
func (*WatcherManager) SetReconciler ¶ added in v1.6.1
func (m *WatcherManager) SetReconciler(r *Reconciler)
SetReconciler configures the reconciler for BPF map updates. This must be called after NewWatcherManager() but before Start() to wire pod watcher callbacks to the reconciler.
func (*WatcherManager) Start ¶ added in v1.6.1
func (m *WatcherManager) Start(ctx context.Context, networks []ruleset.Network) error
Start initializes the WatcherManager and starts watchers for existing networks. It creates a cancellable context for all watchers and starts a watcher for each network that has IntegrationID set.
Returns error if any watcher fails to start.
func (*WatcherManager) Stop ¶ added in v1.6.1
func (m *WatcherManager) Stop()
Stop shuts down all watchers and cancels the shared context. This method should be called during Controller shutdown.