README
¶
pkg/k8s
Kubernetes resource watching and indexing package for HAPTIC.
Overview
The pkg/k8s package provides a high-level interface for watching Kubernetes resources with:
- Indexed Storage: O(1) lookups using configurable JSONPath expressions
- Field Filtering: Remove unnecessary fields to reduce memory usage
- Initial Sync Tracking: Distinguish bulk loading from real-time changes
- Debounced Callbacks: Batch rapid changes to prevent overwhelming consumers
- Two Store Types: Memory (fast) or Cached (memory-efficient with API-backed fetches)
- Thread-Safe: Safe for concurrent access from multiple goroutines
Features
- Watch any Kubernetes resource type using GroupVersionResource (GVR)
- Filter by namespace and label selectors
- Index resources using JSONPath expressions for fast lookups
- Remove unnecessary fields (e.g.,
metadata.managedFields) before storage - Track initial synchronization with callbacks and status methods
- Debounce rapid resource changes with configurable intervals
- Choose between memory and cached storage strategies
- Thread-safe operations with proper locking
Installation
import (
"haptic/pkg/k8s/client"
"haptic/pkg/k8s/types"
"haptic/pkg/k8s/watcher"
)
Quick Start
Watching Collections (Multiple Resources)
package main
import (
"context"
"fmt"
"time"
"haptic/pkg/k8s/client"
"haptic/pkg/k8s/types"
"haptic/pkg/k8s/watcher"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func main() {
// Create Kubernetes client
k8sClient, err := client.New(client.Config{})
if err != nil {
panic(err)
}
// Configure watcher for multiple ingresses
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
},
IndexBy: []string{"metadata.namespace", "metadata.name"},
IgnoreFields: []string{"metadata.managedFields"},
DebounceInterval: 500 * time.Millisecond,
OnChange: func(store types.Store, stats types.ChangeStats) {
fmt.Printf("Resources changed: +%d -%d ~%d\n",
stats.Created, stats.Deleted, stats.Modified)
},
}
// Create and start watcher
w, err := watcher.New(cfg, k8sClient)
if err != nil {
panic(err)
}
ctx := context.Background()
if err := w.Start(ctx); err != nil {
panic(err)
}
}
Watching Single Resource
package main
import (
"context"
"fmt"
"haptic/pkg/k8s/client"
"haptic/pkg/k8s/types"
"haptic/pkg/k8s/watcher"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func main() {
// Create Kubernetes client
k8sClient, err := client.New(client.Config{})
if err != nil {
panic(err)
}
// Configure watcher for specific ConfigMap
cfg := types.SingleWatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "configmaps",
},
Namespace: "default",
Name: "haproxy-config",
OnChange: func(obj interface{}) error {
cm := obj.(*corev1.ConfigMap)
fmt.Printf("ConfigMap changed: %s\n", cm.Name)
// Parse and validate configuration
return nil
},
}
// Create and start watcher
w, err := watcher.NewSingle(cfg, k8sClient)
if err != nil {
panic(err)
}
ctx := context.Background()
if err := w.Start(ctx); err != nil {
panic(err)
}
}
Choosing Between Watchers
The package provides two types of watchers for different use cases:
Watcher (Collections)
Use watcher.New() when watching multiple resources (collections):
Best for:
- Many ingresses across namespaces
- All services in a cluster
- Multiple endpoint slices
- Any collection where you need O(1) lookup by index keys
Features:
- Indexed storage for fast lookups
- Debouncing to batch rapid changes
- Field filtering to reduce memory
- Aggregated change statistics
Example use cases:
- Watching all Ingress resources to generate HAProxy configuration
- Watching Services and EndpointSlices for load balancing
- Monitoring Pods with specific labels
SingleWatcher (Single Resource)
Use watcher.NewSingle() when watching one specific resource:
Best for:
- Configuration in a specific ConfigMap
- Credentials in a specific Secret
- A particular Deployment or Service you depend on
Features:
- No indexing overhead (only one resource)
- Immediate callbacks (no debouncing)
- Simpler API (resource object in callback)
- Lighter weight implementation
Example use cases:
- Watching ConfigMap containing HAProxy templates
- Watching Secret containing dataplane credentials
- Monitoring controller's own Deployment
Core Concepts
Store Types
The package provides two store implementations:
Memory Store (Default)
Stores complete resources in memory for fastest access.
cfg := types.WatcherConfig{
StoreType: types.StoreTypeMemory, // Default
// ... other config
}
Use when:
- Resources are small to medium sized
- Fast access is critical
- Memory is not constrained
Cached Store
Stores only keys in memory, fetches full resources from Kubernetes API on demand with TTL-based caching.
cfg := types.WatcherConfig{
StoreType: types.StoreTypeCached,
CacheTTL: 2 * time.Minute, // Cache duration
// ... other config
}
Use when:
- Resources are large (e.g., Secrets with certificate data)
- Memory usage must be minimized
- Some API latency on cache misses is acceptable
Indexing with JSONPath
Resources are indexed using JSONPath expressions for O(1) lookups:
// Standard indexing by namespace and name
IndexBy: []string{"metadata.namespace", "metadata.name"}
// Custom indexing by label for O(1) service-to-endpoints lookup
IndexBy: []string{"metadata.labels['kubernetes.io/service-name']"}
JSONPath expressions:
metadata.namespace- Resource namespacemetadata.name- Resource namemetadata.labels['key']- Label value- Any valid JSONPath expression supported by k8s.io/client-go/util/jsonpath
Lookup performance:
- Multiple index keys create composite keys:
{key1}:{key2}:... - Store.Get() performs O(1) lookup using provided keys
- Store.List() returns all resources: O(n)
Field Filtering
Remove unnecessary fields before storage to reduce memory usage:
IgnoreFields: []string{
"metadata.managedFields", // Verbose, rarely needed
"metadata.annotations", // If annotations aren't needed
}
Fields are removed using JSONPath patterns before indexing and storage.
Initial Synchronization Handling
The watcher distinguishes between initial bulk loading of pre-existing resources and real-time changes:
OnSyncComplete Callback
Called once after initial sync completes with the fully populated store:
OnSyncComplete: func(store types.Store, initialCount int) {
fmt.Printf("Initial sync complete: %d resources loaded\n", initialCount)
// Now safe to act on complete data
// Example: render HAProxy config
}
IsInitialSync Flag
ChangeStats includes IsInitialSync to distinguish bulk load from real-time changes:
OnChange: func(store types.Store, stats types.ChangeStats) {
if stats.IsInitialSync {
fmt.Println("Processing pre-existing resources during sync")
} else {
fmt.Println("Processing real-time change")
}
}
CallOnChangeDuringSync
Control whether OnChange is called during initial sync:
// Default: suppress callbacks during sync, only call OnSyncComplete
CallOnChangeDuringSync: false
// Enable: receive callbacks during sync with IsInitialSync=true
CallOnChangeDuringSync: true
WaitForSync()
Block until initial sync completes (useful for staged startup):
w, _ := watcher.New(cfg, k8sClient)
// Start watcher in background
go w.Start(ctx)
// Wait for sync before continuing
count, err := w.WaitForSync(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Watcher synced: %d resources loaded\n", count)
// Now safe to proceed with operations requiring complete data
IsSynced()
Non-blocking sync status check:
if w.IsSynced() {
fmt.Println("Watcher is synced")
} else {
fmt.Println("Still syncing...")
}
Debouncing
Rapid resource changes are batched into a single callback invocation:
DebounceInterval: 500 * time.Millisecond // Default
The callback receives aggregated statistics about all changes within the debounce window.
Usage Patterns
Basic Resource Watching
Watch Ingress resources with standard indexing:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
},
IndexBy: []string{"metadata.namespace", "metadata.name"},
IgnoreFields: []string{"metadata.managedFields"},
StoreType: types.StoreTypeMemory,
DebounceInterval: 500 * time.Millisecond,
OnChange: func(store types.Store, stats types.ChangeStats) {
ingresses, _ := store.List()
fmt.Printf("Total ingresses: %d\n", len(ingresses))
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Namespaced Watch
Watch resources only in the controller's own namespace:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
},
NamespacedWatch: true, // Auto-detect namespace from service account
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "haproxy",
"component": "loadbalancer",
},
},
IndexBy: []string{"metadata.name"},
OnChange: func(store types.Store, stats types.ChangeStats) {
fmt.Printf("HAProxy pods changed: %+v\n", stats)
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Cached Store for Large Resources
Use cached store for Secrets to reduce memory usage:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "secrets",
},
IndexBy: []string{"metadata.namespace", "metadata.name"},
StoreType: types.StoreTypeCached,
CacheTTL: 2 * time.Minute,
IgnoreFields: []string{"metadata.managedFields"},
OnChange: func(store types.Store, stats types.ChangeStats) {
// Secrets fetched from API only when accessed
secrets, _ := store.Get("default")
fmt.Printf("Secrets in default namespace: %d\n", len(secrets))
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Custom Indexing for O(1) Lookups
Index EndpointSlices by service name for fast service-to-endpoints lookup:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
// Index by service name label for O(1) lookup
IndexBy: []string{"metadata.labels['kubernetes.io/service-name']"},
OnChange: func(store types.Store, stats types.ChangeStats) {
// Fast O(1) lookup: get all endpoint slices for a service
endpointSlices, _ := store.Get("nginx-service")
fmt.Printf("Endpoints for nginx-service: %d\n", len(endpointSlices))
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Sync Handling - Wait for Complete Data
Recommended pattern: suppress callbacks during sync, act only when complete:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
},
IndexBy: []string{"metadata.namespace", "metadata.name"},
IgnoreFields: []string{"metadata.managedFields"},
// Suppress OnChange during initial sync (default)
CallOnChangeDuringSync: false,
// Called once after initial sync completes
OnSyncComplete: func(store types.Store, initialCount int) {
fmt.Printf("Initial sync complete: %d ingresses loaded\n", initialCount)
// Now safe to act on complete data
// Example: renderHAProxyConfig(store)
},
// Called only for real-time changes after sync
OnChange: func(store types.Store, stats types.ChangeStats) {
// IsInitialSync is always false here
fmt.Printf("Real-time change: +%d -%d ~%d\n",
stats.Created, stats.Deleted, stats.Modified)
// Re-render config for incremental changes
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Sync Handling - Incremental Processing
Process resources incrementally during sync:
cfg := types.WatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
},
IndexBy: []string{"metadata.namespace", "metadata.name"},
// Enable callbacks during initial sync
CallOnChangeDuringSync: true,
OnChange: func(store types.Store, stats types.ChangeStats) {
if stats.IsInitialSync {
// Processing pre-existing resources during bulk load
fmt.Printf("Initial sync progress: %d services loaded so far\n", stats.Created)
} else {
// Processing real-time changes
fmt.Printf("Service changed: +%d -%d ~%d\n",
stats.Created, stats.Deleted, stats.Modified)
}
// Process resources (both during sync and after)
},
OnSyncComplete: func(store types.Store, initialCount int) {
fmt.Printf("Initial sync complete: %d total services\n", initialCount)
// Mark system as "ready"
},
}
w, _ := watcher.New(cfg, k8sClient)
w.Start(ctx)
Single Resource Watching - ConfigMap
Watch a specific ConfigMap containing configuration:
cfg := types.SingleWatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "configmaps",
},
Namespace: "kube-system",
Name: "haproxy-templates",
OnChange: func(obj interface{}) error {
cm := obj.(*corev1.ConfigMap)
// Parse configuration from ConfigMap data
config, err := parseConfig(cm.Data["haproxy.yaml"])
if err != nil {
return fmt.Errorf("failed to parse config: %w", err)
}
// Validate configuration
if err := validateConfig(config); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
// Apply configuration (render HAProxy config, reload, etc.)
return applyConfig(config)
},
}
w, _ := watcher.NewSingle(cfg, k8sClient)
// Start watcher in background
go w.Start(context.Background())
// Wait for initial sync before proceeding
if err := w.WaitForSync(context.Background()); err != nil {
panic(err)
}
fmt.Println("ConfigMap watcher synced and ready")
Single Resource Watching - Secret
Watch a specific Secret containing credentials:
cfg := types.SingleWatcherConfig{
GVR: schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "secrets",
},
Namespace: "kube-system",
Name: "haproxy-credentials",
OnChange: func(obj interface{}) error {
secret := obj.(*corev1.Secret)
// Extract credentials from Secret data
dataplane_username := string(secret.Data["dataplane_username"])
dataplane_password := string(secret.Data["dataplane_password"])
// Validate credentials are present
if dataplane_username == "" || dataplane_password == "" {
return fmt.Errorf("missing required credentials")
}
// Update HAProxy dataplane API client with new credentials
return updateDataplaneCredentials(dataplane_username, dataplane_password)
},
}
w, _ := watcher.NewSingle(cfg, k8sClient)
w.Start(context.Background())
API Reference
SingleWatcherConfig
Configuration for watching a single named resource:
type SingleWatcherConfig struct {
// GroupVersionResource identifies the Kubernetes resource type to watch
GVR schema.GroupVersionResource
// Namespace containing the resource (required)
Namespace string
// Name of the specific resource to watch (required)
Name string
// OnChange is called when the resource changes (add, update, delete)
OnChange OnResourceChangeCallback
// Context for cancellation (default: context.Background())
Context context.Context
}
SingleWatcher Methods
// NewSingle creates a new single-resource watcher
func NewSingle(cfg SingleWatcherConfig, k8sClient *client.Client) (*SingleWatcher, error)
// Start begins watching the resource (blocks until context is cancelled)
func (w *SingleWatcher) Start(ctx context.Context) error
// Stop stops watching the resource
func (w *SingleWatcher) Stop() error
// WaitForSync blocks until initial synchronization is complete
func (w *SingleWatcher) WaitForSync(ctx context.Context) error
// IsSynced returns true if initial synchronization has completed
func (w *SingleWatcher) IsSynced() bool
SingleWatcher Callbacks
// OnResourceChangeCallback is invoked when a single watched resource changes
// Returns an error if processing fails
type OnResourceChangeCallback func(obj interface{}) error
API Reference (Bulk Watcher)
WatcherConfig
Configuration for resource watching:
type WatcherConfig struct {
// GroupVersionResource identifies the Kubernetes resource type to watch
GVR schema.GroupVersionResource
// Namespace restricts watching to a specific namespace (empty = all namespaces)
Namespace string
// NamespacedWatch restricts to controller's own namespace (auto-detected)
NamespacedWatch bool
// LabelSelector filters resources by labels
LabelSelector *metav1.LabelSelector
// IndexBy specifies JSONPath expressions for index keys
IndexBy []string
// IgnoreFields specifies JSONPath expressions for fields to remove
IgnoreFields []string
// StoreType determines storage implementation (Memory or Cached)
StoreType StoreType
// CacheTTL sets cache duration for StoreTypeCached (default: 2m10s)
CacheTTL time.Duration
// DebounceInterval sets minimum time between callbacks (default: 500ms)
DebounceInterval time.Duration
// OnChange is called when resources change (debounced)
OnChange OnChangeCallback
// OnSyncComplete is called once after initial sync completes
OnSyncComplete OnSyncCompleteCallback
// CallOnChangeDuringSync enables callbacks during initial sync (default: false)
CallOnChangeDuringSync bool
// Context for cancellation (default: context.Background())
Context context.Context
}
Store Interface
Thread-safe storage for indexed resources:
type Store interface {
// Get retrieves all resources matching the provided index keys
Get(keys ...string) ([]interface{}, error)
// List returns all resources in the store
List() ([]interface{}, error)
// Add inserts a new resource with the provided index keys
Add(resource interface{}, keys []string) error
// Update modifies an existing resource
Update(resource interface{}, keys []string) error
// Delete removes a resource using its index keys
Delete(keys ...string) error
// Clear removes all resources from the store
Clear() error
}
Watcher Methods
// New creates a new resource watcher
func New(cfg WatcherConfig, k8sClient *client.Client) (*Watcher, error)
// Start begins watching resources (blocks until context is cancelled)
func (w *Watcher) Start(ctx context.Context) error
// Stop stops watching resources
func (w *Watcher) Stop() error
// Store returns the underlying store for direct access
func (w *Watcher) Store() types.Store
// WaitForSync blocks until initial synchronization is complete
func (w *Watcher) WaitForSync(ctx context.Context) (int, error)
// IsSynced returns true if initial synchronization has completed
func (w *Watcher) IsSynced() bool
// ForceSync forces an immediate callback with current statistics
func (w *Watcher) ForceSync()
Callbacks
// OnChangeCallback is invoked when resources change (debounced)
type OnChangeCallback func(store Store, stats ChangeStats)
// OnSyncCompleteCallback is invoked once after initial sync completes
type OnSyncCompleteCallback func(store Store, initialCount int)
// ChangeStats tracks aggregated resource changes
type ChangeStats struct {
Created int // Number of resources added
Modified int // Number of resources updated
Deleted int // Number of resources removed
IsInitialSync bool // True during initial synchronization
}
Package Structure
The package is organized into focused subpackages:
- types/: Core interfaces and types
StoreinterfaceWatcherConfigconfiguration- Callback types and statistics
- client/: Kubernetes client wrapper
- Wraps kubernetes.Interface and dynamic.Interface
- Auto-detects in-cluster vs out-of-cluster configuration
- indexer/: JSONPath evaluation and field filtering
- Extracts index keys from resources
- Removes unnecessary fields
- Fail-fast validation of JSONPath expressions
- store/: Store implementations
MemoryStore: Fast in-memory storageCachedStore: Memory-efficient API-backed storage
- watcher/: Resource watching orchestration
- Uses SharedInformerFactory
- Debouncing logic
- Sync tracking and callbacks
- leaderelection/: Leader election using Kubernetes Leases
- Pure component (no event dependencies)
- See
pkg/k8s/leaderelection/README.mdfor details
Thread Safety
All store implementations are thread-safe for concurrent access:
- Multiple goroutines can safely call Get(), List(), Add(), Update(), Delete()
- Internal RWMutex ensures proper synchronization
- Callbacks are invoked serially (never concurrently)
Testing
See example_test.go for comprehensive usage examples:
- Basic resource watching
- Namespaced watch
- Cached store usage
- Custom indexing patterns
- Sync handling patterns
Run examples:
go test -v -run Example
Error Handling
The package uses fail-fast validation:
- JSONPath expressions are validated at watcher creation (not runtime)
- Invalid configuration returns error from
watcher.New() - Runtime errors (e.g., store access failures) are logged but don't crash the watcher
Performance Considerations
Memory Usage:
- Memory store: Proportional to resource count and size
- Cached store: Only keys in memory, ~O(n) where n = number of resources
- Field filtering significantly reduces memory usage
Lookup Performance:
- Store.Get() with index keys: O(1)
- Store.List(): O(n) where n = number of resources
- Debouncing reduces callback frequency during rapid changes
API Calls (Cached Store):
- Initial sync: 1 LIST call per resource type
- Real-time updates: Watch events via SharedInformer (no additional calls)
- Cache misses: GET call to Kubernetes API (TTL-based caching reduces frequency)
License
Part of haproxy-template-ingress-controller project.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package client provides a wrapper around the Kubernetes client-go library.
|
Package client provides a wrapper around the Kubernetes client-go library. |
|
Package indexer provides functionality for extracting index keys from Kubernetes resources and filtering fields based on JSONPath expressions.
|
Package indexer provides functionality for extracting index keys from Kubernetes resources and filtering fields based on JSONPath expressions. |
|
Package store provides storage implementations for indexed Kubernetes resources.
|
Package store provides storage implementations for indexed Kubernetes resources. |
|
Package types defines core interfaces and types for the k8s package.
|
Package types defines core interfaces and types for the k8s package. |
|
Package watcher provides Kubernetes resource watching with indexing, field filtering, and debounced callbacks.
|
Package watcher provides Kubernetes resource watching with indexing, field filtering, and debounced callbacks. |