k8s

package
v1.6.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2026 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// DefaultGracePeriod is the default delay before removing pod IPs from BPF maps.
	// Allows connections to drain during pod restarts (30 seconds per CONTEXT.md).
	// Can be overridden with WithGracePeriod option for testing.
	DefaultGracePeriod = 30 * time.Second
)

Variables

View Source
var (
	// PodsTrackedDesc tracks the number of Kubernetes pods currently tracked in BPF maps
	PodsTrackedDesc = prometheus.NewDesc(
		"neuwerk_k8s_pods_tracked",
		"Number of Kubernetes pods currently tracked in BPF maps",
		[]string{"integration_id", "namespace", "network_name"},
		nil,
	)

	// ReconnectionsTotalDesc counts watch reconnection events
	ReconnectionsTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_watch_reconnections_total",
		"Total number of Kubernetes watch reconnection events",
		[]string{"integration_id"},
		nil,
	)

	// ConnectionStatusDesc indicates connection health (0=disconnected/pending, 1=degraded, 2=connected)
	ConnectionStatusDesc = prometheus.NewDesc(
		"neuwerk_k8s_connection_status",
		"Kubernetes connection status (0=disconnected/pending, 1=degraded, 2=connected)",
		[]string{"integration_id"},
		nil,
	)

	// LastSyncTimestampDesc records Unix timestamp of last successful cache sync
	LastSyncTimestampDesc = prometheus.NewDesc(
		"neuwerk_k8s_last_sync_timestamp_seconds",
		"Unix timestamp of last successful cache sync",
		[]string{"integration_id"},
		nil,
	)

	// BPFUpdatesTotalDesc counts BPF map updates from pod events
	BPFUpdatesTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_bpf_updates_total",
		"Total number of BPF map updates from Kubernetes pod events",
		[]string{"integration_id", "network_name", "result"},
		nil,
	)

	// QueueDepthDesc tracks reconciler workqueue depth
	QueueDepthDesc = prometheus.NewDesc(
		"neuwerk_k8s_queue_depth",
		"Number of items in the reconciler workqueue",
		nil,
		nil,
	)

	// QueueOverflowsTotalDesc counts events dropped due to queue overflow
	QueueOverflowsTotalDesc = prometheus.NewDesc(
		"neuwerk_k8s_queue_overflows_total",
		"Total number of events dropped due to queue overflow",
		nil,
		nil,
	)
)

Metric descriptors for Kubernetes integration monitoring (Phase 51)

Functions

func BuildLabelSelector added in v1.6.1

func BuildLabelSelector(requirements []SelectorRequirement) (string, error)

BuildLabelSelector converts structured UI requirements into a Kubernetes label selector string. It validates operators and requirements, returning an error if any requirement is invalid.

Supported operators:

  • "=" or "==" : Equality (key must equal value)
  • "!=" : Inequality (key must not equal value)
  • "in" : Set membership (key must be in values list)
  • "notin" : Set exclusion (key must not be in values list)
  • "exists" : Existence (key must exist, values must be empty)
  • "!exists" : Non-existence (key must not exist, values must be empty)

Empty requirements return an empty string (matches all pods). Multiple requirements are combined with AND logic (comma-separated).

Example:

requirements := []SelectorRequirement{
    {Key: "app", Operator: "=", Values: []string{"backend"}},
    {Key: "env", Operator: "in", Values: []string{"prod", "staging"}},
}
selector, err := BuildLabelSelector(requirements)
// Returns: "app=backend,env in (prod,staging)"

func NewRestConfig

func NewRestConfig(endpoint, caCert, token string) (*rest.Config, error)

NewRestConfig creates a REST config from integration credentials with rate limiting. It builds an in-memory kubeconfig without file I/O, and configures 50 QPS / 100 burst to prevent throttling in multi-cluster setups.

func TestConnection

func TestConnection(ctx context.Context, endpoint, caCert, token string) error

TestConnection validates K8s API server reachability by attempting to list namespaces. DEPRECATED: Use ValidateConnection() for phase-labeled error detection. It creates a REST config, establishes a clientset, and performs a minimal API call.

func ValidateIntegration

func ValidateIntegration(name, endpoint, caCert, bearerToken string) error

ValidateIntegration validates integration credentials before storage. It enforces HTTPS endpoints, PEM certificate format, and X.509 validity.

Types

type ConnectionValidationError

type ConnectionValidationError struct {
	Phase   ValidationPhase
	Message string
}

ConnectionValidationError represents a connection validation failure with phase context

func ValidateConnection

func ValidateConnection(ctx context.Context, endpoint, caCert, token string) *ConnectionValidationError

ValidateConnection performs sequential validation: certificate → reachability → authentication. It returns a ConnectionValidationError with a phase label on failure, or nil on success. Uses discovery.ServerVersion() for a lightweight server check (no RBAC permissions required).

func (*ConnectionValidationError) Error

func (e *ConnectionValidationError) Error() string

type CreateIntegrationRequest

type CreateIntegrationRequest struct {
	// Name is the human-readable name for this integration
	Name string `json:"name"`
	// Endpoint is the HTTPS URL of the Kubernetes API server
	Endpoint string `json:"endpoint"`
	// CACert is the CA certificate in PEM format
	CACert string `json:"ca_cert"`
	// BearerToken is the service account token
	BearerToken string `json:"bearer_token"`
}

CreateIntegrationRequest represents the payload for creating a new integration

type HealthMonitor added in v1.6.1

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor tracks connection health for a Kubernetes integration. It implements a three-state machine: Connected → Degraded → Disconnected, following 48-RESEARCH.md Patterns 2 & 3 (WatchErrorHandler and state transitions).

func NewHealthMonitor added in v1.6.1

func NewHealthMonitor(integrationID string, logger logr.Logger, storage *Storage, clientset kubernetes.Interface) *HealthMonitor

NewHealthMonitor creates a new health monitor for a Kubernetes integration. Status starts as StatusPending until the first successful sync. storage: the Storage instance for persisting health status to NATS KV (Phase 48). clientset: the Kubernetes client for active health checks (can be nil to disable).

func (*HealthMonitor) GetIntegrationID added in v1.6.1

func (h *HealthMonitor) GetIntegrationID() string

GetIntegrationID returns the integration ID for metrics labeling (Phase 51)

func (*HealthMonitor) GetReconnectCount added in v1.6.1

func (h *HealthMonitor) GetReconnectCount() int64

GetReconnectCount returns total reconnection events for metrics (Phase 51)

func (*HealthMonitor) GetStatus added in v1.6.1

func (h *HealthMonitor) GetStatus() (IntegrationStatus, time.Time)

GetStatus returns the current status and last successful sync time (thread-safe) Uses read lock for concurrent access from API handlers

func (*HealthMonitor) OnSuccessfulSync added in v1.6.1

func (h *HealthMonitor) OnSuccessfulSync()

OnSuccessfulSync is called when informer cache syncs successfully Resets error counter and transitions to Connected state (silent recovery)

func (*HealthMonitor) Start added in v1.6.1

func (h *HealthMonitor) Start(ctx context.Context)

Start begins periodic health checks for API connectivity and state transitions. It runs two tickers: a 1-second ticker for active API server health checks (detects disconnection), and a 30-second ticker for the Degraded → Disconnected transition after 5 minutes.

func (*HealthMonitor) Stop added in v1.6.1

func (h *HealthMonitor) Stop()

Stop halts the health monitor and prevents goroutine leaks

func (*HealthMonitor) WatchErrorHandler added in v1.6.1

func (h *HealthMonitor) WatchErrorHandler(r *cache.Reflector, err error)

WatchErrorHandler is called by client-go when watch connection drops Implements cache.WatchErrorHandler interface for SetWatchErrorHandler Following 48-RESEARCH.md Pattern 2: track consecutive failures for state transitions

type Integration

type Integration struct {
	// ID is the unique identifier (UUID) for this integration
	ID string `json:"id"`
	// Name is the human-readable name for this integration
	Name string `json:"name"`
	// Endpoint is the HTTPS URL of the Kubernetes API server (e.g., https://cluster.example.com:6443)
	Endpoint string `json:"endpoint"`
	// CACert is the CA certificate in PEM format for verifying the API server's TLS certificate
	CACert string `json:"ca_cert"`
	// BearerToken is the service account token for authenticating with the API server
	// SECURITY: This field is write-only - never returned in API responses (handled in Plan 02)
	BearerToken string `json:"bearer_token,omitempty"`
	// Status represents the current connection state (from HealthMonitor, Phase 48)
	Status IntegrationStatus `json:"status"`
	// CreatedAt is the timestamp when this integration was created
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is the timestamp when this integration was last modified
	UpdatedAt time.Time `json:"updated_at"`
	// LastError stores the most recent validation error message (empty if healthy)
	// Only the message is stored, not the timestamp (per CONTEXT.md decision)
	LastError string `json:"last_error,omitempty"`
	// LastErrorPhase identifies which validation phase failed: "reachability", "certificate", "authentication"
	// Empty string indicates no error (integration is healthy)
	LastErrorPhase string `json:"last_error_phase,omitempty"`
	// LastSync is the timestamp of the last successful cache sync (Phase 48)
	// Updated by HealthMonitor.OnSuccessfulSync via persistStatus
	LastSync time.Time `json:"last_sync,omitempty"`
}

Integration represents a Kubernetes cluster integration configuration

type IntegrationStatus

type IntegrationStatus string

IntegrationStatus represents the current connection state of an integration

const (
	// StatusPending indicates integration is created but not yet verified
	StatusPending IntegrationStatus = "Pending"
	// StatusConnected indicates integration is reachable and healthy
	StatusConnected IntegrationStatus = "Connected"
	// StatusDegraded indicates integration has intermittent connectivity issues
	StatusDegraded IntegrationStatus = "Degraded"
	// StatusDisconnected indicates integration is unreachable
	StatusDisconnected IntegrationStatus = "Disconnected"
)

type K8sMetricsCollector added in v1.6.1

type K8sMetricsCollector struct {
	// contains filtered or unexported fields
}

K8sMetricsCollector implements prometheus.Collector for Kubernetes integration metrics. It follows the custom collector pattern from pkg/metrics/metrics.go, computing metrics on-demand during scrape from in-memory state (WatcherManager and Reconciler).

func NewK8sMetricsCollector added in v1.6.1

func NewK8sMetricsCollector(manager *WatcherManager, storage *Storage) *K8sMetricsCollector

NewK8sMetricsCollector creates a new Kubernetes metrics collector. Parameters:

  • manager: WatcherManager for accessing watchers and reconciler state
  • storage: Storage for accessing integration metadata

func (*K8sMetricsCollector) Collect added in v1.6.1

func (c *K8sMetricsCollector) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector interface. Computes metrics on-demand during Prometheus scrape from current state.

func (*K8sMetricsCollector) Describe added in v1.6.1

func (c *K8sMetricsCollector) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector interface. Uses DescribeByCollect helper (neuwerk pattern) to generate descriptions from Collect.

type PodInfo added in v1.6.1

type PodInfo struct {
	// Name is the pod name
	Name string `json:"name"`
	// IP is the pod IP address
	IP string `json:"ip"`
}

PodInfo represents basic information about a pod

type PodPreviewResponse added in v1.6.1

type PodPreviewResponse struct {
	// Pods is the list of matching pods (filtered to show only pods with IPs)
	Pods []PodInfo `json:"pods"`
	// TotalCount is the total number of pods matching the selector (before truncation)
	TotalCount int `json:"total_count"`
	// Truncated indicates if the result was truncated (showing sample, not all pods)
	Truncated bool `json:"truncated"`
}

PodPreviewResponse represents the response for pod preview endpoint Used by GET /api/v1/integrations/kubernetes/{id}/pods

type PodWatcher added in v1.6.1

type PodWatcher struct {
	// contains filtered or unexported fields
}

PodWatcher watches pods in a Kubernetes cluster and extracts pod IPs in real-time It uses SharedInformerFactory for efficient caching and namespace/label scoping Following RESEARCH.md Pattern 1: Complete Pod Watcher Setup

func NewPodWatcher added in v1.6.1

func NewPodWatcher(clientset *kubernetes.Clientset, namespace, labelSelector, integrationID string, storage *Storage) *PodWatcher

NewPodWatcher creates a new PodWatcher for the given namespace and label selector. It accepts a clientset from Phase 45 (created via NewRestConfig + kubernetes.NewForConfig). namespace: the Kubernetes namespace to watch (single namespace per watcher). labelSelector: a label selector string (e.g., "app=backend,env in (prod,staging)"). integrationID: the ID of the integration this watcher belongs to (for health monitoring, Phase 48). storage: the Storage instance for persisting health status (Phase 48).

func (*PodWatcher) SetLogger added in v1.6.1

func (w *PodWatcher) SetLogger(logger logr.Logger)

SetLogger configures the logger for this watcher

func (*PodWatcher) SetOnPodIPAdded added in v1.6.1

func (w *PodWatcher) SetOnPodIPAdded(fn func(ip string))

SetOnPodIPAdded sets the callback invoked when a pod IP is added Callback is invoked for each IP when a pod is created or gets a new IP

func (*PodWatcher) SetOnPodIPRemoved added in v1.6.1

func (w *PodWatcher) SetOnPodIPRemoved(fn func(ip string))

SetOnPodIPRemoved sets the callback invoked when a pod IP is removed Callback is invoked for each IP when a pod is deleted or loses an IP

func (*PodWatcher) Start added in v1.6.1

func (w *PodWatcher) Start(ctx context.Context) error

Start initializes the SharedInformerFactory and begins watching pods This method blocks until the initial cache sync completes Returns error if cache sync fails

Implementation sequence follows RESEARCH.md patterns to avoid pitfalls: 1. Create factory with namespace and label selector options (Pattern 1) 2. Get pod informer 3. Register event handlers (Pattern 3) 4. CRITICAL: Start factory BEFORE WaitForCacheSync (Pitfall 3) 5. Wait for cache sync (prevents incomplete firewall rules)

func (*PodWatcher) Stop added in v1.6.1

func (w *PodWatcher) Stop()

Stop gracefully shuts down the watcher and prevents goroutine leaks (Pitfall 1)

type ReconcileEvent added in v1.6.1

type ReconcileEvent struct {
	// NetworkName is the network this event applies to
	NetworkName string
	// Operation is the type of operation (add or remove)
	Operation ReconcileOp
	// IP is the pod IP address to add or remove
	IP string
}

ReconcileEvent represents a BPF map update operation from pod events

type ReconcileOp added in v1.6.1

type ReconcileOp string

ReconcileOp represents the type of reconciliation operation

const (
	// OpAddIP adds a pod IP to the BPF map
	OpAddIP ReconcileOp = "add"
	// OpRemoveIP removes a pod IP from the BPF map
	OpRemoveIP ReconcileOp = "remove"
)

type Reconciler added in v1.6.1

type Reconciler struct {
	// contains filtered or unexported fields
}

Reconciler manages asynchronous BPF map updates from pod events using workqueue pattern. It decouples pod watcher event handlers from BPF operations to prevent blocking, provides rate limiting and exponential backoff for failed updates, and implements grace periods for connection draining during pod restarts.

func NewReconciler added in v1.6.1

func NewReconciler(bpfCollection *bpf.Collection, ruleProvider ruleset.RuleProvider, k8sStore *Storage, opts ...ReconcilerOption) *Reconciler

NewReconciler creates a new Reconciler with workqueue initialization. It uses DefaultTypedControllerRateLimiter for exponential backoff and bounded queue depth. Parameters:

  • bpfCollection: BPF maps for policy updates
  • ruleProvider: Current ruleset for network lookup
  • k8sStore: Storage for pod count tracking
  • opts: Optional configuration (e.g., WithGracePeriod for testing)

func (*Reconciler) EnqueueAdd added in v1.6.1

func (r *Reconciler) EnqueueAdd(networkName, ip string)

EnqueueAdd enqueues a pod IP addition event for processing. It uses non-blocking send to prevent blocking pod watcher event handlers. If the buffer is full, the event is dropped and logged (overflow handling).

func (*Reconciler) EnqueueRemoveWithGrace added in v1.6.1

func (r *Reconciler) EnqueueRemoveWithGrace(networkName, ip string)

EnqueueRemoveWithGrace enqueues a pod IP removal event with a grace period. The grace period (configurable via WithGracePeriod, default 30s) allows connections to drain during pod restarts. If a grace period already exists for this IP, it is canceled and replaced with a new one (handles rapid pod restarts).

IMPORTANT: The IP is immediately removed from podIPs so the main controller's reconciliation won't re-add it to the BPF map. The actual BPF map removal happens after the grace period expires.

func (*Reconciler) GetBPFUpdateCounts added in v1.6.1

func (r *Reconciler) GetBPFUpdateCounts() (success map[string]int64, failure map[string]int64)

GetBPFUpdateCounts returns success/failure counts per network for metrics (Phase 51)

func (*Reconciler) GetPodCount added in v1.6.1

func (r *Reconciler) GetPodCount(networkName string) int

GetPodCount returns the number of pod IPs tracked for a network (Phase 51)

func (*Reconciler) GetPodIPs added in v1.6.1

func (r *Reconciler) GetPodIPs(networkName string) []string

GetPodIPs returns all pod IPs tracked for a network. This is used by the main controller during reconciliation to preserve K8s pod IPs.

func (*Reconciler) GetQueueOverflows added in v1.6.1

func (r *Reconciler) GetQueueOverflows() int64

GetQueueOverflows returns total queue overflow events for metrics (Phase 51)

func (*Reconciler) SetLogger added in v1.6.1

func (r *Reconciler) SetLogger(logger logr.Logger)

SetLogger sets the structured logger for the reconciler

func (*Reconciler) Start added in v1.6.1

func (r *Reconciler) Start(ctx context.Context)

Start launches worker goroutines to process events from the workqueue. It also starts the drainBuffer goroutine to move events from the bounded channel to the workqueue.

func (*Reconciler) Stop added in v1.6.1

func (r *Reconciler) Stop()

Stop shuts down the reconciler gracefully. It closes the enqueue buffer, shuts down the workqueue, and cancels all active grace period timers.

type ReconcilerOption added in v1.6.1

type ReconcilerOption func(*Reconciler)

ReconcilerOption is a function that configures a Reconciler.

func WithGracePeriod added in v1.6.1

func WithGracePeriod(d time.Duration) ReconcilerOption

WithGracePeriod sets a custom grace period duration for IP removal. This is useful for testing where shorter grace periods speed up test execution.

type SelectorRequirement added in v1.6.1

type SelectorRequirement struct {
	Key      string   // Label key (e.g., "app", "env", "tier")
	Operator string   // Operator: "=", "!=", "in", "notin", "exists", "!exists"
	Values   []string // Values for the requirement (empty for exists/!exists operators)
}

SelectorRequirement represents a single label selector requirement from the UI. It provides a structured way to build Kubernetes label selectors without raw string manipulation.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides CRUD operations for Kubernetes integration credentials using NATS KV

func NewStorage

func NewStorage(js jetstream.JetStream) (*Storage, error)

NewStorage creates a new Storage instance with NATS KV persistence. It creates or updates the "neuwerk-integrations" bucket with FileStorage and Replicas=1.

func (*Storage) Create

Create stores a new integration in NATS KV It generates a new UUID for the integration and checks for key conflicts

func (*Storage) Delete

func (s *Storage) Delete(ctx context.Context, id string) error

Delete removes an integration from NATS KV It verifies the integration exists before deleting

func (*Storage) DeletePodMetadata added in v1.6.1

func (s *Storage) DeletePodMetadata(networkName string)

DeletePodMetadata removes pod count and last sync metadata for a network Called when a K8s-managed network is deleted

func (*Storage) Get

func (s *Storage) Get(ctx context.Context, id string) (*Integration, error)

Get retrieves an integration by ID from NATS KV

func (*Storage) GetLastSync added in v1.6.1

func (s *Storage) GetLastSync(networkName string) time.Time

GetLastSync returns the last sync timestamp for a network Returns zero time if network hasn't been synced yet

func (*Storage) GetPodCount added in v1.6.1

func (s *Storage) GetPodCount(networkName string) int

GetPodCount returns the current pod count for a network Returns zero if network has no pods or hasn't been synced yet

func (*Storage) List

func (s *Storage) List(ctx context.Context) ([]*Integration, error)

List retrieves all integrations with the kubernetes: prefix from NATS KV It skips entries that fail to unmarshal (partial list return for resilience)

func (*Storage) Update

Update modifies an existing integration in NATS KV It verifies the integration exists before updating

func (*Storage) UpdatePodCount added in v1.6.1

func (s *Storage) UpdatePodCount(networkName string, count int)

UpdatePodCount updates the pod count for a network and sets last sync timestamp Called by reconcile method after successful BPF update

type TestConnectionRequest added in v1.6.1

type TestConnectionRequest struct {
	// Endpoint is the HTTPS URL of the Kubernetes API server (e.g., https://cluster.example.com:6443)
	Endpoint string `json:"endpoint"`
	// CACert is the CA certificate in PEM format for verifying the API server's TLS certificate
	CACert string `json:"ca_cert"`
	// BearerToken is the service account token for authenticating with the API server
	BearerToken string `json:"bearer_token"`
}

TestConnectionRequest represents the payload for testing Kubernetes API connectivity Used by POST /api/v1/integrations/kubernetes/test

type TestConnectionResponse added in v1.6.1

type TestConnectionResponse struct {
	// Status is either "success" or "error"
	Status string `json:"status"`
	// ServerVersion is the K8s server version (populated on success, from discovery API)
	ServerVersion string `json:"server_version,omitempty"`
	// Phase identifies which validation phase failed: "reachability", "certificate", "authentication" (populated on error)
	Phase string `json:"phase,omitempty"`
	// Message provides detailed error message (populated on error)
	Message string `json:"message,omitempty"`
}

TestConnectionResponse represents the response from testing Kubernetes API connectivity

type UpdateIntegrationRequest

type UpdateIntegrationRequest struct {
	// Name is the human-readable name for this integration (optional - omitted to preserve existing)
	Name string `json:"name,omitempty"`
	// Endpoint is the HTTPS URL of the Kubernetes API server (optional - omitted to preserve existing)
	Endpoint string `json:"endpoint,omitempty"`
	// CACert is the CA certificate in PEM format (optional - omitted to preserve existing)
	CACert string `json:"ca_cert,omitempty"`
	// BearerToken is the service account token (optional - nil pointer preserves existing)
	BearerToken *string `json:"bearer_token,omitempty"`
	// Status is the connection status (optional - set by handlers after validation or HealthMonitor)
	Status *IntegrationStatus `json:"status,omitempty"`
	// LastError is the last validation error (optional - set by handlers after validation)
	LastError *string `json:"last_error,omitempty"`
	// LastErrorPhase is the validation phase that failed (optional - set by handlers after validation)
	LastErrorPhase *string `json:"last_error_phase,omitempty"`
	// LastSync is the last successful sync timestamp (optional - set by HealthMonitor, Phase 48)
	LastSync *time.Time `json:"last_sync,omitempty"`
}

UpdateIntegrationRequest represents the payload for updating an existing integration

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a validation error with field context Follows the pattern from pkg/api/validation.go

func (*ValidationError) Error

func (e *ValidationError) Error() string

type ValidationPhase

type ValidationPhase string

ValidationPhase identifies which stage of connection validation failed

const (
	PhaseReachability   ValidationPhase = "reachability"
	PhaseCertificate    ValidationPhase = "certificate"
	PhaseAuthentication ValidationPhase = "authentication"
)

type WatcherManager added in v1.6.1

type WatcherManager struct {
	// contains filtered or unexported fields
}

WatcherManager coordinates PodWatcher lifecycles across multiple networks. It manages one PodWatcher per network with IntegrationID, starting watchers when networks are created, stopping them when deleted, and restarting when K8s fields change.

Thread-safe for concurrent network CRUD operations.

func NewWatcherManager added in v1.6.1

func NewWatcherManager(integrationStore *Storage, ruleProvider ruleset.RuleProvider) *WatcherManager

NewWatcherManager creates a new WatcherManager for coordinating PodWatchers. integrationStore is used to lookup Integration credentials by ID when starting watchers. ruleProvider is used to find all networks using an integration (for OnCredentialUpdate, Phase 48).

func (*WatcherManager) GetPodIPsForNetwork added in v1.6.1

func (m *WatcherManager) GetPodIPsForNetwork(networkName string) []string

GetPodIPsForNetwork returns the pod IPs tracked by the K8s reconciler for a network. This is used by the main controller during BPF map reconciliation to preserve K8s pod IPs. Returns nil if the reconciler is not set or the network has no tracked pods.

func (*WatcherManager) OnCredentialUpdate added in v1.6.1

func (m *WatcherManager) OnCredentialUpdate(ctx context.Context, integrationID string) error

OnCredentialUpdate restarts watchers affected by integration credential changes. Stops and recreates watchers to achieve: 1. Fresh resync (clear cache, list+watch from scratch) 2. Reset exponential backoff (new Reflector with no retry history) Per CONTEXT.md: "Credential update resets backoff and triggers immediate reconnect"

This method is called from the API UpdateIntegration handler when credentials change. Watcher restart failures are logged but don't fail the API call - credentials were updated successfully in storage.

func (*WatcherManager) OnNetworkCreated added in v1.6.1

func (m *WatcherManager) OnNetworkCreated(network ruleset.Network) error

OnNetworkCreated starts a new PodWatcher for a network when it's created. If the network has no IntegrationID, it's a static CIDR network and no watcher is needed.

This method is called by APIRuleProvider after successful network creation.

func (*WatcherManager) OnNetworkDeleted added in v1.6.1

func (m *WatcherManager) OnNetworkDeleted(network ruleset.Network) error

OnNetworkDeleted stops and removes the watcher for a deleted network. If the network has no watcher (static CIDR), this is a no-op.

This method is called by APIRuleProvider after successful network deletion.

func (*WatcherManager) OnNetworkUpdated added in v1.6.1

func (m *WatcherManager) OnNetworkUpdated(oldNetwork, newNetwork ruleset.Network) error

OnNetworkUpdated handles network updates by detecting K8s field changes. If IntegrationID, Namespace, or LabelSelector changed, the watcher is restarted. If only non-K8s fields changed (e.g., CIDR, policies), no action is needed.

This method is called by APIRuleProvider after successful network update.

func (*WatcherManager) SetLogger added in v1.6.1

func (m *WatcherManager) SetLogger(logger logr.Logger)

SetLogger configures the logger for this manager

func (*WatcherManager) SetReconciler added in v1.6.1

func (m *WatcherManager) SetReconciler(r *Reconciler)

SetReconciler configures the reconciler for BPF map updates. This must be called after NewWatcherManager() but before Start() to wire pod watcher callbacks to the reconciler.

func (*WatcherManager) Start added in v1.6.1

func (m *WatcherManager) Start(ctx context.Context, networks []ruleset.Network) error

Start initializes the WatcherManager and starts watchers for existing networks. It creates a cancellable context for all watchers and starts a watcher for each network that has IntegrationID set.

Returns error if any watcher fails to start.

func (*WatcherManager) Stop added in v1.6.1

func (m *WatcherManager) Stop()

Stop shuts down all watchers and cancels the shared context. This method should be called during Controller shutdown.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL