store

package
v0.1.0
Published: Mar 9, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

README

Resource Storage Implementations

Overview

This package provides storage backends for indexed Kubernetes resources. The controller uses stores to maintain fast-access collections of watched resources for template rendering. Choose between the two storage strategies based on your resource access patterns and memory constraints.

When to use this package:

  • Building custom resource watchers that need indexed storage
  • Implementing resource caching strategies
  • Optimizing memory usage for large resource collections
  • Creating high-performance resource lookup mechanisms

The package offers two complementary store types:

  • MemoryStore: Complete in-memory storage for fast access and iteration
  • CachedStore: Reference-based storage with on-demand API fetching and TTL caching

Both implement the types.Store interface, allowing transparent switching between storage strategies.
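
As a rough illustration of what transparent switching means in practice, the sketch below writes calling code against a small interface instead of a concrete store. The Store interface shown here is a hypothetical stand-in inferred from the methods both stores share; the real types.Store definition is not shown in this README and may differ. sliceStore is a toy implementation that exists only to make the example runnable.

```go
package main

import "fmt"

// Store is a hypothetical stand-in for types.Store (an assumption, not the real definition).
type Store interface {
	Add(resource any, keys []string) error
	Get(keys ...string) ([]any, error)
}

// sliceStore is a toy implementation used only for this example.
type sliceStore struct {
	keys      [][]string
	resources []any
}

func (s *sliceStore) Add(resource any, keys []string) error {
	s.keys = append(s.keys, keys)
	s.resources = append(s.resources, resource)
	return nil
}

// Get returns every resource whose leading keys equal the provided keys.
func (s *sliceStore) Get(keys ...string) ([]any, error) {
	var out []any
	for i, k := range s.keys {
		if len(keys) > len(k) {
			continue
		}
		match := true
		for j := range keys {
			if k[j] != keys[j] {
				match = false
				break
			}
		}
		if match {
			out = append(out, s.resources[i])
		}
	}
	return out, nil
}

// countInNamespace depends only on the interface, so any Store works.
func countInNamespace(s Store, namespace string) (int, error) {
	resources, err := s.Get(namespace)
	if err != nil {
		return 0, err
	}
	return len(resources), nil
}

func main() {
	s := &sliceStore{}
	_ = s.Add("ingress-a", []string{"default", "a"})
	_ = s.Add("ingress-b", []string{"default", "b"})
	_ = s.Add("ingress-c", []string{"prod", "c"})

	n, _ := countInNamespace(s, "default")
	fmt.Println(n) // prints 2
}
```

Because countInNamespace only sees the interface, swapping in a different implementation would not require changes to the caller.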

Features

  • Multiple Storage Strategies: Choose between full memory or cached on-demand
  • Indexed Lookups: O(1) access using composite keys
  • Non-Unique Keys: Multiple resources can share the same index key
  • Thread-Safe: Concurrent access from multiple goroutines
  • Field Filtering: Integration with indexer for memory optimization
  • TTL Caching: Automatic cache expiration in CachedStore

Quick Start

Memory Store
package main

import (
    "fmt"
    "log"

    "haptic/pkg/k8s/store"
)

func main() {
    // Create memory store with 2 index keys (namespace, name)
    memStore := store.NewMemoryStore(2)

    // Add resource with index keys
    resource := map[string]interface{}{
        "metadata": map[string]interface{}{
            "namespace": "default",
            "name":      "my-ingress",
        },
        "spec": map[string]interface{}{
            "rules": []interface{}{/* ... */},
        },
    }

    keys := []string{"default", "my-ingress"}
    if err := memStore.Add(resource, keys); err != nil {
        log.Fatal(err)
    }

    // Retrieve by keys
    resources, err := memStore.Get("default", "my-ingress")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(resources)) // resources contains [resource]

    // Retrieve all in namespace
    resources, err = memStore.Get("default")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(resources)) // all resources in the "default" namespace
}
Cached Store
package main

import (
    "fmt"
    "log"
    "time"

    "haptic/pkg/k8s/indexer"
    "haptic/pkg/k8s/store"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

func main() {
    // Placeholders: supply a real dynamic client and indexer before running.
    var dynamicClient dynamic.Interface // e.g. from dynamic.NewForConfig(restConfig)
    var myIndexer *indexer.Indexer      // see pkg/k8s/indexer

    cfg := &store.CachedStoreConfig{
        NumKeys:   2,
        CacheTTL:  10 * time.Minute,
        Client:    dynamicClient,
        GVR: schema.GroupVersionResource{
            Group:    "",
            Version:  "v1",
            Resource: "secrets",
        },
        Namespace: "",  // All namespaces
        Indexer:   myIndexer,
    }

    cachedStore, err := store.NewCachedStore(cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Add reference (from watcher)
    keys := []string{"default", "my-secret"}
    if err := cachedStore.Add(nil, keys); err != nil { // Only stores reference, not resource
        log.Fatal(err)
    }

    // Get triggers API fetch and caching
    resources, err := cachedStore.Get("default", "my-secret")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(resources))
    // First call: fetches from API
    // Subsequent calls within TTL: returns cached

    // Clear cache when needed
    cachedStore.ClearCache()
}

Storage Strategy Comparison

Aspect            | MemoryStore                         | CachedStore
------------------|-------------------------------------|----------------------------------------
Storage           | Full resources in memory            | Only references + TTL cache
Lookup Speed      | O(1), instant                       | O(1) on cache hit, API latency on miss
Memory Usage      | ~1 KB per resource (varies by type) | Minimal (refs) + bounded cache
API Calls         | Initial list only                   | Initial list + fetch on cache miss
Best For          | Iterating all resources             | Selective access to resources
Template Usage    | resources.<type> (list all)         | resources.<type>.Fetch(ns, name)
Typical Resources | Ingress, Service, EndpointSlice     | Secret, ConfigMap with large data

MemoryStore

Overview

MemoryStore keeps complete resource objects in memory after field filtering. This provides instant access for template rendering at the cost of higher memory usage.

How It Works
1. Watcher receives resource from Kubernetes API
2. Indexer extracts index keys (e.g., ["default", "my-ingress"])
3. Indexer filters unnecessary fields (managedFields, etc.)
4. MemoryStore stores complete resource at composite key "default/my-ingress"
5. Template accesses resource via Get("default", "my-ingress") → instant return
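
The steps above can be sketched with a flat map keyed by the joined index keys. This is a simplified illustration, not the package's implementation (the type documentation describes nested maps); compositeStore and its methods are hypothetical names, and joining with "/" assumes that key values themselves contain no "/".

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// compositeStore is an illustrative stand-in, not the package's MemoryStore.
type compositeStore struct {
	numKeys int
	items   map[string][]any // composite key -> resources sharing that key
}

func newCompositeStore(numKeys int) *compositeStore {
	return &compositeStore{numKeys: numKeys, items: make(map[string][]any)}
}

// Add requires exactly numKeys keys and appends under the joined key.
func (s *compositeStore) Add(resource any, keys []string) error {
	if len(keys) != s.numKeys {
		return fmt.Errorf("expected %d keys, got %d", s.numKeys, len(keys))
	}
	ck := strings.Join(keys, "/")
	s.items[ck] = append(s.items[ck], resource)
	return nil
}

// Get does a direct map lookup for a full key set and a prefix scan otherwise.
func (s *compositeStore) Get(keys ...string) ([]any, error) {
	if len(keys) > s.numKeys {
		return nil, fmt.Errorf("too many keys: %d > %d", len(keys), s.numKeys)
	}
	if len(keys) == s.numKeys {
		return s.items[strings.Join(keys, "/")], nil
	}
	prefix := ""
	if len(keys) > 0 {
		prefix = strings.Join(keys, "/") + "/"
	}
	var matched []string
	for ck := range s.items {
		if strings.HasPrefix(ck, prefix) {
			matched = append(matched, ck)
		}
	}
	sort.Strings(matched) // deterministic order for partial matches
	var out []any
	for _, ck := range matched {
		out = append(out, s.items[ck]...)
	}
	return out, nil
}

func main() {
	s := newCompositeStore(2)
	_ = s.Add("ingress-a", []string{"default", "a"})
	_ = s.Add("ingress-b", []string{"default", "b"})
	_ = s.Add("ingress-c", []string{"prod", "c"})

	exact, _ := s.Get("default", "a")
	partial, _ := s.Get("default")
	fmt.Println(len(exact), len(partial)) // prints 1 2
}
```

The full-key path is a single map lookup, while the partial-key path has to scan, which matches the cost difference listed under Performance Characteristics.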
API Reference
Creating a MemoryStore
func NewMemoryStore(numKeys int) *MemoryStore

Parameters:

  • numKeys: Number of index keys (must match indexer configuration)

Example:

// For indexing by [namespace, name]
byName := store.NewMemoryStore(2)

// For indexing by [namespace, name, label]
byLabel := store.NewMemoryStore(3)
Adding Resources
func (s *MemoryStore) Add(resource interface{}, keys []string) error

Stores a resource with the given index keys.

Parameters:

  • resource: The resource object (typically *unstructured.Unstructured or map)
  • keys: Index key values extracted from the resource

Returns: Error if key count doesn't match numKeys

Example:

keys := []string{"default", "my-ingress"}
err := store.Add(ingressResource, keys)
Retrieving Resources
func (s *MemoryStore) Get(keys ...string) ([]interface{}, error)

Retrieves resources matching the provided keys. Supports partial key matching.

Parameters:

  • keys: One or more index keys to match

Returns:

  • Slice of matching resources
  • Error if too many keys provided

Examples:

// Get specific resource (all keys)
resources, _ := store.Get("default", "my-ingress")
// Returns: resources with namespace=default AND name=my-ingress

// Get all in namespace (partial keys)
resources, _ = store.Get("default")
// Returns: all resources with namespace=default

// Get all resources (no keys)
resources, _ = store.List()
Updating Resources
func (s *MemoryStore) Update(resource interface{}, keys []string) error

Updates an existing resource. If not found, adds it.

Example:

keys := []string{"default", "my-ingress"}
err := store.Update(updatedResource, keys)
Deleting Resources
func (s *MemoryStore) Delete(keys ...string) error

Removes resources matching the keys.

Example:

// Delete specific resource
err := store.Delete("default", "my-ingress")
Listing All Resources
func (s *MemoryStore) List() ([]interface{}, error)

Returns all resources in the store.

Example:

allResources, _ := store.List()
for _, res := range allResources {
    // Process each resource
}
Clearing Store
func (s *MemoryStore) Clear() error

Removes all resources from the store.

Memory Usage

Approximate memory per resource (after field filtering):

  • Ingress: 1-2 KB
  • Service: 1-5 KB (depends on endpoints)
  • EndpointSlice: 2-5 KB
  • ConfigMap: 1 KB + data size
  • Secret: 1 KB + data size

Example calculation:

1000 Ingress × 1.5 KB = 1.5 MB
500 Services × 2 KB = 1 MB
2000 EndpointSlices × 3 KB = 6 MB
Total: ~8.5 MB for 3500 resources
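
The same estimate can be reproduced for other resource mixes with a few lines of Go. This is only a back-of-the-envelope helper; resourceGroup and estimateKB are illustrative names, and the per-resource sizes are the approximate figures quoted above (using 1 MB = 1000 KB, as the calculation does).

```go
package main

import "fmt"

// resourceGroup pairs a resource count with an assumed average size in KB.
type resourceGroup struct {
	name  string
	count int
	avgKB float64
}

// estimateKB sums count × average size across all groups.
func estimateKB(groups []resourceGroup) float64 {
	total := 0.0
	for _, g := range groups {
		total += float64(g.count) * g.avgKB
	}
	return total
}

func main() {
	groups := []resourceGroup{
		{"Ingress", 1000, 1.5},
		{"Service", 500, 2},
		{"EndpointSlice", 2000, 3},
	}
	totalKB := estimateKB(groups)
	fmt.Printf("%.1f MB\n", totalKB/1000) // prints 8.5 MB
}
```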
When to Use MemoryStore

Use MemoryStore when:

  • You iterate over most/all resources during template rendering
  • Template uses {% for ingress in resources.ingresses %}
  • Fast template rendering is critical
  • Resource count is reasonable (< 10,000 per type)
  • Resources are small to medium size

CachedStore

Overview

CachedStore stores only resource references (namespace + name + index keys) in memory and fetches complete resources from the Kubernetes API on demand. Fetched resources are cached with a TTL to reduce API calls.

How It Works
1. Watcher receives resource from Kubernetes API
2. Indexer extracts index keys (e.g., ["default", "my-secret"])
3. CachedStore stores reference {namespace: "default", name: "my-secret", keys: [...]}
4. Template calls Fetch("default", "my-secret")
5. CachedStore checks TTL cache
   - Cache hit → return cached resource
   - Cache miss → fetch from API, cache with TTL, return
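
The hit/miss branch in step 5 can be sketched as a small TTL cache that takes the fetch function as a parameter. This is an illustrative sketch under stated assumptions, not the package's code: ttlCache, get, and demo are hypothetical names, the fetch callback stands in for the Kubernetes API call, and the clock is injected so that expiry can be exercised without sleeping. The sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

type entry struct {
	value     any
	expiresAt time.Time
}

// ttlCache is an illustrative sketch, not the package's implementation.
type ttlCache struct {
	ttl     time.Duration
	now     func() time.Time // injectable clock so expiry can be tested
	entries map[string]entry
	fetches int // counts simulated API calls
}

func newTTLCache(ttl time.Duration, now func() time.Time) *ttlCache {
	return &ttlCache{ttl: ttl, now: now, entries: make(map[string]entry)}
}

// get returns the cached value while it is fresh; otherwise it calls fetch,
// stores the result with a new expiry, and returns it.
func (c *ttlCache) get(key string, fetch func(string) (any, error)) (any, error) {
	if e, ok := c.entries[key]; ok && c.now().Before(e.expiresAt) {
		return e.value, nil
	}
	v, err := fetch(key)
	if err != nil {
		return nil, err
	}
	c.fetches++
	c.entries[key] = entry{value: v, expiresAt: c.now().Add(c.ttl)}
	return v, nil
}

// demo runs a miss, a hit, and an expired lookup, returning the fetch count.
func demo() int {
	clock := time.Unix(0, 0)
	c := newTTLCache(10*time.Minute, func() time.Time { return clock })
	fetch := func(key string) (any, error) { return "secret:" + key, nil }

	_, _ = c.get("default/my-secret", fetch) // miss: fetches
	_, _ = c.get("default/my-secret", fetch) // hit: served from cache
	clock = clock.Add(11 * time.Minute)
	_, _ = c.get("default/my-secret", fetch) // expired: fetches again
	return c.fetches
}

func main() {
	fmt.Println(demo()) // prints 2
}
```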
API Reference
Creating a CachedStore
func NewCachedStore(cfg *CachedStoreConfig) (*CachedStore, error)

Configuration:

type CachedStoreConfig struct {
    NumKeys      int                         // Number of index keys
    CacheTTL     time.Duration               // Cache entry TTL
    MaxCacheSize int                         // Max LRU cache entries (default: 256)
    Client       dynamic.Interface           // Kubernetes client
    GVR          schema.GroupVersionResource // Resource type
    Namespace    string                      // Namespace filter (empty = all)
    Indexer      *indexer.Indexer            // Field filter
    Logger       *slog.Logger                // Optional logger (slog.Default if nil)
}

Example:

cfg := &store.CachedStoreConfig{
    NumKeys:  2,
    CacheTTL: 10 * time.Minute,
    Client:   dynamicClient,
    GVR: schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "secrets",
    },
    Namespace: "",
    Indexer:   indexer.New([]string{"metadata.namespace", "metadata.name"}, nil),
}

store, err := store.NewCachedStore(cfg)
Adding References
func (s *CachedStore) Add(resource interface{}, keys []string) error

Stores a resource reference. The resource parameter is typically nil since only keys matter.

Example:

// Add reference (from watcher)
keys := []string{"default", "tls-cert"}
err := store.Add(nil, keys)
Fetching Resources
func (s *CachedStore) Get(keys ...string) ([]interface{}, error)

Fetches resources matching keys. Triggers API fetch on cache miss.

Behavior:

  1. Finds matching references by keys
  2. For each reference:
    • Check TTL cache using "namespace/name" key
    • Cache hit: return cached resource
    • Cache miss: fetch from API, cache with TTL
  3. Return all fetched resources

Example:

// Fetch specific secret
resources, err := store.Get("default", "tls-cert")
// First call: API fetch + cache
// Calls within TTL: cached return

// Fetch all secrets in namespace
resources, err = store.Get("default")
// Fetches each matching secret individually
Cache Management
func (s *CachedStore) ClearCache() error

Clears the TTL cache, forcing fresh fetches.

Example:

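// Force fresh fetch on next Get()
store.ClearCache()

func (s *CachedStore) GetCacheStats() (hits, misses int)

Returns cache hit/miss statistics for monitoring.

Note: the generated reference for v0.1.0 (see Documentation) lists Clear, CacheSize, and EvictExpired on CachedStore but not ClearCache or GetCacheStats, so verify these names against the version you import.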

Memory Usage

CachedStore memory usage is bounded:

Base memory = (reference count) × 200 bytes
Cache memory = (cached entries) × (resource size)
Total = Base + min(Cache, MaxCacheSize × AvgResourceSize)

Example:

1000 Secret references × 200 bytes = 200 KB
Cache: 100 entries × 5 KB = 500 KB
Total: ~700 KB (vs 5 MB for MemoryStore)
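
The formula above translates directly into a small helper. This is a sketch for rough sizing only; estimateBytes is an illustrative name, the 200-byte reference overhead is the figure quoted above, and 256 is the documented DefaultMaxCacheSize. The example uses 1 KB = 1000 bytes, as the calculation above does.

```go
package main

import "fmt"

const refBytes = 200 // per-reference overhead quoted above

// estimateBytes applies: base + min(cache, maxCacheSize × avgResourceBytes).
func estimateBytes(refs, cachedEntries, maxCacheSize, avgResourceBytes int) int {
	base := refs * refBytes
	cache := cachedEntries * avgResourceBytes
	if limit := maxCacheSize * avgResourceBytes; cache > limit {
		cache = limit
	}
	return base + cache
}

func main() {
	// 1000 references, 100 cached entries of ~5 KB, cache capped at 256 entries.
	total := estimateBytes(1000, 100, 256, 5000)
	fmt.Printf("%d KB\n", total/1000) // prints 700 KB
}
```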
When to Use CachedStore

Use CachedStore when:

  • Resources are large (Secrets with certificates, ConfigMaps with big data)
  • Template accesses only a few specific resources (not iteration)
  • Memory is constrained
  • Resources change infrequently (TTL helps)
  • Template uses resources.<type>.Fetch(namespace, name)
TTL Configuration

Choose TTL based on access patterns:

  • Short TTL (1-5 min): Frequently changing resources
  • Medium TTL (10-30 min): Moderate change rate
  • Long TTL (1 hour+): Rarely changing resources

Trade-off: Longer TTL = less API load but potentially stale data

Error Handling

Both stores return StoreError for operation failures:

type StoreError struct {
    Operation string   // "add", "get", "update", "delete"
    Keys      []string // Index keys involved
    Cause     error    // Underlying error
}

Example:

// s is a MemoryStore or CachedStore
resources, err := s.Get("default", "missing")
if err != nil {
    var storeErr *store.StoreError
    if errors.As(err, &storeErr) {
        slog.Error("store operation failed",
            "operation", storeErr.Operation,
            "keys", storeErr.Keys,
            "error", storeErr.Cause)
    }
}
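
To show how the wrapping interacts with the standard errors package, here is a self-contained sketch. The StoreError type below copies the field names from the generated type documentation (Operation, Keys, Cause); the bodies of Error and Unwrap, the errNotFound sentinel, and the get and describe helpers are assumptions made for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// StoreError mirrors the documented fields; the method bodies are assumptions.
type StoreError struct {
	Operation string
	Keys      []string
	Cause     error
}

func (e *StoreError) Error() string {
	return fmt.Sprintf("store %s failed for keys %v: %v", e.Operation, e.Keys, e.Cause)
}

// Unwrap exposes the cause so errors.Is and errors.As can see through the wrapper.
func (e *StoreError) Unwrap() error { return e.Cause }

// errNotFound is a hypothetical sentinel used only in this example.
var errNotFound = errors.New("not found")

// get simulates a failing store lookup.
func get(keys ...string) error {
	return &StoreError{Operation: "get", Keys: keys, Cause: errNotFound}
}

// describe reports the failed operation and whether the cause was errNotFound.
func describe(err error) (string, bool) {
	var se *StoreError
	if !errors.As(err, &se) {
		return "", false
	}
	return se.Operation, errors.Is(err, errNotFound)
}

func main() {
	op, notFound := describe(get("default", "missing"))
	fmt.Println(op, notFound) // prints get true
}
```

Because Unwrap returns the cause, callers can match either the wrapper with errors.As or the underlying error with errors.Is.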

Integration with Watcher

Stores are typically created and managed by watchers:

// In pkg/k8s/watcher
config := k8s.WatcherConfig{
    GVR: ingressGVR,
    IndexBy: []string{"metadata.namespace", "metadata.name"},
    StoreType: k8s.StoreTypeMemory,  // or StoreTypeCached
    // ...
}

watcher := watcher.NewWatcher(client, config)
// Watcher creates appropriate store internally

See pkg/k8s/watcher for integration details.

Performance Characteristics

MemoryStore Performance
  • Add/Update: O(1) amortized
  • Get (all keys): O(1)
  • Get (partial keys): O(N) where N = matching resources
  • List: O(N) where N = total resources
  • Delete: O(1)
CachedStore Performance
  • Add/Update: O(1)
  • Get (cache hit): O(1)
  • Get (cache miss): O(1) + API latency (~10-50ms)
  • List: Not recommended (fetches each individually)
  • Delete: O(1)

Thread Safety

Both stores use sync.RWMutex for concurrent access:

// Safe to call from multiple goroutines
go func() {
    store.Add(resource1, keys1)
}()

go func() {
    resources, _ := store.Get("default")
}()

Read operations (Get, List) use read locks, allowing concurrent reads. Write operations (Add, Update, Delete) use write locks, blocking all access.
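
The locking pattern described above can be sketched as follows. safeStore is a hypothetical type that only demonstrates the read-lock/write-lock split; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// safeStore is an illustrative sketch of the locking pattern described above.
type safeStore struct {
	mu    sync.RWMutex
	items map[string]any
}

func newSafeStore() *safeStore {
	return &safeStore{items: make(map[string]any)}
}

// Add takes the write lock, excluding readers and other writers.
func (s *safeStore) Add(key string, resource any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = resource
}

// Size takes the read lock, so many readers can proceed in parallel.
func (s *safeStore) Size() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.items)
}

// fill adds n distinct keys from n goroutines while n other goroutines read.
func fill(s *safeStore, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func(i int) {
			defer wg.Done()
			s.Add(fmt.Sprintf("default/res-%d", i), i)
		}(i)
		go func() {
			defer wg.Done()
			_ = s.Size()
		}()
	}
	wg.Wait()
}

func main() {
	s := newSafeStore()
	fill(s, 100)
	fmt.Println(s.Size()) // prints 100
}
```

Running a sketch like this under the race detector (go run -race) is a quick way to confirm that every map access is covered by a lock.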

Best Practices

Choosing Store Type
# Use MemoryStore for:
watched_resources:
  ingresses:
    store: full  # Iterate in templates
  services:
    store: full  # Frequently accessed
  endpoints:
    store: full  # Small, many accesses

# Use CachedStore for:
  secrets:
    store: on-demand  # Large, selective access
    cache_ttl: 15m
  configmaps:
    store: on-demand  # Potentially large data
    cache_ttl: 10m
Index Key Selection

Choose index keys based on access patterns:

# Common pattern: namespace + name
index_by:
  - metadata.namespace
  - metadata.name

# Advanced: namespace + label
index_by:
  - metadata.namespace
  - metadata.labels['app']
Memory Optimization

For MemoryStore, filter unnecessary fields:

# In indexer configuration
remove_fields:
  - metadata.managedFields
  - metadata.annotations['kubectl.kubernetes.io/last-applied-configuration']

See pkg/k8s/indexer for field filtering details.

Testing

Unit Testing with MemoryStore
func TestMemoryStore(t *testing.T) {
    store := store.NewMemoryStore(2)

    resource := map[string]interface{}{
        "metadata": map[string]interface{}{
            "namespace": "default",
            "name":      "test",
        },
    }

    err := store.Add(resource, []string{"default", "test"})
    require.NoError(t, err)

    resources, err := store.Get("default", "test")
    require.NoError(t, err)
    assert.Len(t, resources, 1)
}
Testing CachedStore
func TestCachedStore(t *testing.T) {
    // GVR for core/v1 Secrets
    secretGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
    fakeClient := fake.NewSimpleDynamicClient(runtime.NewScheme())

    cfg := &store.CachedStoreConfig{
        NumKeys:  2,
        CacheTTL: 1 * time.Minute,
        Client:   fakeClient,
        GVR:      secretGVR,
        Indexer:  indexer.New([]string{"metadata.namespace", "metadata.name"}, nil),
    }

    cachedStore, err := store.NewCachedStore(cfg)
    require.NoError(t, err)

    // Add reference
    err = cachedStore.Add(nil, []string{"default", "my-secret"})
    require.NoError(t, err)

    // Create resource in fake client (the dynamic client takes unstructured objects)
    secret := &unstructured.Unstructured{Object: map[string]interface{}{
        "apiVersion": "v1",
        "kind":       "Secret",
        "metadata": map[string]interface{}{
            "namespace": "default",
            "name":      "my-secret",
        },
    }}
    _, err = fakeClient.Resource(secretGVR).Namespace("default").Create(context.Background(), secret, metav1.CreateOptions{})
    require.NoError(t, err)

    // Fetch should succeed
    resources, err := cachedStore.Get("default", "my-secret")
    require.NoError(t, err)
    assert.Len(t, resources, 1)
}

Documentation

Overview

Package store provides storage implementations for indexed Kubernetes resources.

This package offers two store types:

  • Memory store: Fast in-memory storage with complete resources
  • Cached store: Memory-efficient storage with API-backed retrieval and caching

Constants

const DefaultMaxCacheSize = 256

DefaultMaxCacheSize is the default maximum number of entries in the LRU cache.

Variables

This section is empty.

Functions

This section is empty.

Types

type CachedStore

type CachedStore struct {
	// contains filtered or unexported fields
}

CachedStore stores only resource references in memory and fetches resources from the Kubernetes API on access. Fetched resources are cached with a TTL.

This reduces memory usage for large resources (e.g., Secrets) at the cost of API latency on cache misses.

Supports non-unique index keys by storing multiple resource references per composite key.

Thread-safe for concurrent access.

func NewCachedStore

func NewCachedStore(cfg *CachedStoreConfig) (*CachedStore, error)

NewCachedStore creates a new API-backed store with caching.

func (*CachedStore) Add

func (s *CachedStore) Add(resource any, keys []string) error

Add inserts a new resource into the store.

func (*CachedStore) CacheSize

func (s *CachedStore) CacheSize() int

CacheSize returns the number of cached resources.

func (*CachedStore) Clear

func (s *CachedStore) Clear() error

Clear removes all resources from the store.

func (*CachedStore) Delete

func (s *CachedStore) Delete(keys ...string) error

Delete removes a resource from the store. NOTE: With non-unique index keys, this removes ALL resources matching the provided keys.

func (*CachedStore) EvictExpired

func (s *CachedStore) EvictExpired() int

EvictExpired removes expired cache entries.

func (*CachedStore) Get

func (s *CachedStore) Get(keys ...string) ([]any, error)

Get retrieves all resources matching the provided index keys.

func (*CachedStore) List

func (s *CachedStore) List() ([]any, error)

List returns all resources in the store.

func (*CachedStore) ModCount

func (s *CachedStore) ModCount() (uint64, bool)

ModCount returns the modification counter and whether tracking is supported. The counter is incremented on every mutation (Add, Update, Delete, Clear). This enables external caching layers to detect store changes without polling.
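
One way an external caching layer might use such a counter is sketched below. counterStore and renderCache are hypothetical types written for this example; only the ModCount() (uint64, bool) shape comes from the documentation above.

```go
package main

import "fmt"

// counterStore is a toy store that bumps a counter on every mutation.
type counterStore struct {
	items    []string
	modCount uint64
}

func (s *counterStore) Add(item string) {
	s.items = append(s.items, item)
	s.modCount++
}

// ModCount follows the documented shape: the counter and whether tracking is supported.
func (s *counterStore) ModCount() (uint64, bool) { return s.modCount, true }

// renderCache recomputes its output only when the store's counter has moved.
type renderCache struct {
	store    *counterStore
	lastMod  uint64
	valid    bool
	output   string
	rebuilds int
}

func (c *renderCache) Render() string {
	mod, supported := c.store.ModCount()
	if c.valid && supported && mod == c.lastMod {
		return c.output // store unchanged: reuse the previous result
	}
	c.output = fmt.Sprintf("%d resources", len(c.store.items))
	c.lastMod, c.valid = mod, supported
	c.rebuilds++
	return c.output
}

func main() {
	s := &counterStore{}
	c := &renderCache{store: s}

	s.Add("default/a")
	c.Render()
	c.Render() // no mutation in between: served from cache
	s.Add("default/b")
	out := c.Render()

	fmt.Println(out, c.rebuilds) // prints 2 resources 2
}
```

If tracking were reported as unsupported, this sketch would fall back to rebuilding on every call, which is the safe default.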

func (*CachedStore) Size

func (s *CachedStore) Size() int

Size returns the number of tracked resources in the store.

func (*CachedStore) Update

func (s *CachedStore) Update(resource any, keys []string) error

Update modifies an existing resource or adds it if it doesn't exist.

type CachedStoreConfig

type CachedStoreConfig struct {
	// NumKeys is the number of index keys (must match indexer configuration)
	NumKeys int

	// CacheTTL is the cache entry time-to-live
	CacheTTL time.Duration

	// MaxCacheSize is the maximum number of entries in the LRU cache.
	// When exceeded, the least recently used entry is evicted.
	// Default: 256
	MaxCacheSize int

	// Client is the Kubernetes dynamic client for fetching resources
	Client dynamic.Interface

	// GVR identifies the resource type to fetch
	GVR schema.GroupVersionResource

	// Namespace restricts fetching to a specific namespace (empty = all namespaces)
	Namespace string

	// Indexer processes fetched resources (field filtering)
	Indexer *indexer.Indexer

	// Logger for debug and warning messages (optional, uses slog.Default if nil)
	Logger *slog.Logger
}

CachedStoreConfig configures a CachedStore.
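
The eviction rule described for MaxCacheSize (drop the least recently used entry once the limit is exceeded) can be illustrated with a minimal LRU cache built on container/list. This is a generic sketch of the technique, not the package's cache; lruCache and its methods are hypothetical names, and TTL handling is left out to keep the example short.

```go
package main

import (
	"container/list"
	"fmt"
)

type lruEntry struct {
	key   string
	value any
}

// lruCache is an illustrative sketch, not the package's implementation.
type lruCache struct {
	maxSize int
	order   *list.List               // front = most recently used
	index   map[string]*list.Element // key -> element in order
}

func newLRUCache(maxSize int) *lruCache {
	return &lruCache{maxSize: maxSize, order: list.New(), index: make(map[string]*list.Element)}
}

// Get returns the value and marks the key as most recently used.
func (c *lruCache) Get(key string) (any, bool) {
	el, ok := c.index[key]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(lruEntry).value, true
}

// Put inserts or updates a key and evicts the least recently used entry when over capacity.
func (c *lruCache) Put(key string, value any) {
	if el, ok := c.index[key]; ok {
		el.Value = lruEntry{key, value}
		c.order.MoveToFront(el)
		return
	}
	c.index[key] = c.order.PushFront(lruEntry{key, value})
	if c.order.Len() > c.maxSize {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.index, oldest.Value.(lruEntry).key)
	}
}

func main() {
	c := newLRUCache(2)
	c.Put("default/a", "A")
	c.Put("default/b", "B")
	c.Get("default/a")      // touch a, so b becomes least recently used
	c.Put("default/c", "C") // over capacity: evicts b

	_, hasB := c.Get("default/b")
	_, hasA := c.Get("default/a")
	fmt.Println(hasA, hasB) // prints true false
}
```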

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore stores complete Kubernetes resources in memory using nested maps.

This provides O(1) lookup performance at the cost of higher memory usage. Resources are stored with their full specification after field filtering.

Supports non-unique index keys by storing multiple resources per composite key.

Thread-safe for concurrent access.

Immutability Contract

Resources stored in MemoryStore are pre-converted (floats to ints) at storage time and MUST NOT be mutated by callers. The slices returned by Get() are direct references to internal data structures for performance. Callers MUST NOT:

  • Modify elements of returned slices
  • Append to or reslice returned slices
  • Modify fields within returned resources

Note: List() returns a fresh slice copy for thread safety, but the resource objects within are still references to internal data and must not be mutated.
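
When a caller does need to change a resource, one way to honor this contract is to work on a private deep copy. The helper below is a sketch for resources represented as nested map[string]any and []any values; deepCopy is an illustrative name and not part of this package. For *unstructured.Unstructured values, the type's own DeepCopy method serves the same purpose.

```go
package main

import "fmt"

// deepCopy clones nested map[string]any and []any values; other values are returned as-is.
func deepCopy(v any) any {
	switch t := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, val := range t {
			out[k] = deepCopy(val)
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, val := range t {
			out[i] = deepCopy(val)
		}
		return out
	default:
		return v
	}
}

func main() {
	stored := map[string]any{
		"metadata": map[string]any{"namespace": "default", "name": "my-ingress"},
	}

	// Mutate a private copy instead of the object a store handed out.
	mine := deepCopy(stored).(map[string]any)
	mine["metadata"].(map[string]any)["name"] = "renamed"

	fmt.Println(stored["metadata"].(map[string]any)["name"]) // prints my-ingress
}
```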

func NewMemoryStore

func NewMemoryStore(numKeys int) *MemoryStore

NewMemoryStore creates a new memory-backed store.

Parameters:

  • numKeys: Number of index keys (must match indexer configuration)

func (*MemoryStore) Add

func (s *MemoryStore) Add(resource any, keys []string) error

Add inserts a new resource into the store. If resources with the same index keys already exist, the new resource is appended. The slice is kept sorted by namespace/name for deterministic Get() results.
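
Keeping a slice ordered on insert can be done with a binary search for the insertion point. The sketch below shows the general technique under the assumption that ordering is by namespace and then name; ref, less, and insertSorted are hypothetical names, and the package's actual approach may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// ref identifies a resource; sorting uses namespace first, then name.
type ref struct{ namespace, name string }

func less(a, b ref) bool {
	if a.namespace != b.namespace {
		return a.namespace < b.namespace
	}
	return a.name < b.name
}

// insertSorted places r at its ordered position using binary search.
func insertSorted(refs []ref, r ref) []ref {
	// Find the first element that sorts after r.
	i := sort.Search(len(refs), func(i int) bool { return less(r, refs[i]) })
	refs = append(refs, ref{})
	copy(refs[i+1:], refs[i:])
	refs[i] = r
	return refs
}

func main() {
	var refs []ref
	for _, r := range []ref{{"prod", "b"}, {"default", "z"}, {"default", "a"}} {
		refs = insertSorted(refs, r)
	}
	fmt.Println(refs) // prints [{default a} {default z} {prod b}]
}
```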

func (*MemoryStore) Clear

func (s *MemoryStore) Clear() error

Clear removes all resources from the store.

func (*MemoryStore) Delete

func (s *MemoryStore) Delete(keys ...string) error

Delete removes a resource from the store. NOTE: With non-unique index keys, this method cannot identify which specific resource to delete when multiple resources have the same index keys. It removes ALL resources matching the provided keys. The watcher should call this with the resource's actual namespace+name as the index keys to delete a specific resource.

func (*MemoryStore) Get

func (s *MemoryStore) Get(keys ...string) ([]any, error)

Get retrieves all resources matching the provided index keys.

Returns a direct reference to the internal slice for exact key matches. Callers MUST NOT modify the returned slice or its elements (see Immutability Contract).

For partial key matches, a new slice is constructed from matching entries and sorted for deterministic order.

func (*MemoryStore) List

func (s *MemoryStore) List() ([]any, error)

List returns all resources in the store. Returns a fresh copy of all resources to avoid race conditions.

func (*MemoryStore) ModCount

func (s *MemoryStore) ModCount() (uint64, bool)

ModCount returns the modification counter and whether tracking is supported. The counter is incremented on every mutation (Add, Update, Delete, Clear). This enables external caching layers to detect store changes without polling.

func (*MemoryStore) Size

func (s *MemoryStore) Size() int

Size returns the number of resources in the store.

func (*MemoryStore) Update

func (s *MemoryStore) Update(resource any, keys []string) error

Update modifies an existing resource or adds it if it doesn't exist. For non-unique index keys, it finds the resource by namespace+name and replaces it. The slice is kept sorted by namespace/name for deterministic Get() results.

type StoreError

type StoreError struct {
	Operation string
	Keys      []string
	Cause     error
}

StoreError represents a generic store operation error.

func (*StoreError) Error

func (e *StoreError) Error() string

func (*StoreError) Unwrap

func (e *StoreError) Unwrap() error
