storage

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: MIT Imports: 4 Imported by: 0

README

Governance Storage

This package provides pluggable storage backends for the governance library. You can choose between in-memory storage (default) or persistent database storage (MySQL, PostgreSQL, MongoDB).

Storage Interface

All storage implementations conform to the RegistryStore interface:

type RegistryStore interface {
    // Service operations
    SaveService(ctx context.Context, service *models.ServiceInfo) error
    GetService(ctx context.Context, key string) (*models.ServiceInfo, error)
    GetServicesByName(ctx context.Context, serviceName string) ([]*models.ServiceInfo, error)
    GetAllServices(ctx context.Context) ([]*models.ServiceInfo, error)
    DeleteService(ctx context.Context, key string) error
    UpdateHealthStatus(ctx context.Context, key string, status models.ServiceStatus, timestamp time.Time) error

    // Subscription operations
    AddSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error
    RemoveSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error
    RemoveAllSubscriptions(ctx context.Context, subscriberKey string) error
    GetSubscribers(ctx context.Context, serviceGroup string) ([]string, error)
    GetSubscriberServices(ctx context.Context, serviceGroup string) ([]*models.ServiceInfo, error)

    // Lifecycle operations
    Close() error
    Ping(ctx context.Context) error
}

Available Storage Backends

1. In-Memory Storage (Default)

Fast, lock-free in-memory storage. Data is lost when the manager stops.

import (
    "github.com/chronnie/governance/manager"
    "github.com/chronnie/governance/models"
)

// In-memory storage is used by default
mgr := manager.NewManager(config)
2. MySQL Storage

Persistent storage using MySQL database.

Setup:

CREATE DATABASE governance;

Usage:

import (
    "github.com/chronnie/governance/manager"
    "github.com/chronnie/governance/storage/mysql"
)

mysqlConfig := mysql.Config{
    Host:     "localhost",
    Port:     3306,
    Database: "governance",
    Username: "root",
    Password: "password",
    MaxOpenConns:    25,
    MaxIdleConns:    5,
    ConnMaxLifetime: 5 * time.Minute,
}

store, err := mysql.NewMySQLStore(mysqlConfig)
if err != nil {
    log.Fatal(err)
}

mgr := manager.NewManagerWithStorage(config, store)

Tables Created:

  • services - Stores service registration data
  • subscriptions - Stores pub/sub relationships
3. PostgreSQL Storage

Persistent storage using PostgreSQL database.

Setup:

CREATE DATABASE governance;

Usage:

import (
    "github.com/chronnie/governance/manager"
    "github.com/chronnie/governance/storage/postgres"
)

postgresConfig := postgres.Config{
    Host:     "localhost",
    Port:     5432,
    Database: "governance",
    Username: "postgres",
    Password: "password",
    SSLMode:  "disable", // or "require", "verify-ca", "verify-full"
    MaxOpenConns:    25,
    MaxIdleConns:    5,
    ConnMaxLifetime: 5 * time.Minute,
}

store, err := postgres.NewPostgreSQLStore(postgresConfig)
if err != nil {
    log.Fatal(err)
}

mgr := manager.NewManagerWithStorage(config, store)

Tables Created:

  • services - Stores service registration data (with JSONB for flexible data)
  • subscriptions - Stores pub/sub relationships
4. MongoDB Storage

Persistent storage using MongoDB.

Setup:

MongoDB will automatically create the database and collections.

Usage:

import (
    "github.com/chronnie/governance/manager"
    "github.com/chronnie/governance/storage/mongodb"
)

mongoConfig := mongodb.Config{
    URI:            "mongodb://localhost:27017",
    Database:       "governance",
    ConnectTimeout: 10 * time.Second,
    MaxPoolSize: 100,
    MinPoolSize: 10,
}

store, err := mongodb.NewMongoDBStore(mongoConfig)
if err != nil {
    log.Fatal(err)
}

mgr := manager.NewManagerWithStorage(config, store)

Collections Created:

  • services - Stores service registration data
  • subscriptions - Stores pub/sub relationships

Indexes:

  • services.service_name - For fast service group queries
  • services.status - For health status filtering
  • subscriptions.service_group - For subscriber lookups
  • subscriptions.subscriber_key + service_group - Unique constraint

Querying Service Pods

The manager provides convenient methods to query pods by service group:

// Get all pods for a specific service
pods := mgr.GetServicePods("user-service")
for _, pod := range pods {
    fmt.Printf("Pod: %s, IP: %s, Status: %s\n",
        pod.PodName, pod.Providers[0].IP, pod.Status)
}

// Get all services and their pods
allServicePods := mgr.GetAllServicePods()
for serviceName, pods := range allServicePods {
    fmt.Printf("Service: %s has %d pods\n", serviceName, len(pods))
}

Storage Performance Considerations

In-Memory
  • Pros: Fastest performance, no network latency
  • Cons: Data lost on restart, single-node only
  • Use Case: Development, testing, ephemeral environments
MySQL/PostgreSQL
  • Pros: ACID transactions, strong consistency, mature ecosystem
  • Cons: Requires schema management, slightly slower than NoSQL
  • Use Case: Production systems requiring strong consistency
MongoDB
  • Pros: Flexible schema, horizontal scaling, fast writes
  • Cons: Eventual consistency in clustered setups
  • Use Case: High-throughput systems, multi-datacenter deployments

Connection Pool Settings

All database backends support connection pooling:

config := mysql.Config{
    MaxOpenConns:    25,    // Maximum number of open connections
    MaxIdleConns:    5,     // Maximum number of idle connections
    ConnMaxLifetime: 5 * time.Minute, // Connection lifetime
}

Recommended settings:

  • Development: MaxOpenConns=10, MaxIdleConns=2
  • Production (small): MaxOpenConns=25, MaxIdleConns=5
  • Production (large): MaxOpenConns=100, MaxIdleConns=10

Error Handling

Storage operations may fail. The governance library handles errors gracefully:

  • Failed writes log errors but don't crash the manager
  • Failed reads return empty results
  • Connection errors are logged during manager stop

For production systems, monitor storage health using the Ping() method:

ctx := context.Background()
if err := store.Ping(ctx); err != nil {
    log.Printf("Storage unhealthy: %v", err)
}

Examples

See the examples/ directory for complete working examples:

  • examples/mysql_example/ - MySQL storage example
  • examples/postgresql_example/ - PostgreSQL storage example
  • examples/mongodb_example/ - MongoDB storage example
  • examples/query_pods_example/ - Querying pods by service group

Custom Storage Implementation

You can implement your own storage backend by implementing the RegistryStore interface:

type MyCustomStore struct {
    // Your storage implementation
}

func (s *MyCustomStore) SaveService(ctx context.Context, service *models.ServiceInfo) error {
    // Implementation
}

// Implement all other interface methods...

// Use with manager
store := &MyCustomStore{}
mgr := manager.NewManagerWithStorage(config, store)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DatabaseStore

type DatabaseStore interface {

	// SaveService stores or updates a service entry in the database
	SaveService(ctx context.Context, service *models.ServiceInfo) error

	// GetService retrieves a single service by its composite key (serviceName:podName)
	GetService(ctx context.Context, key string) (*models.ServiceInfo, error)

	// GetAllServices retrieves all registered services from database
	// Used during reconciliation to sync with cache
	GetAllServices(ctx context.Context) ([]*models.ServiceInfo, error)

	// DeleteService removes a service entry by its composite key
	DeleteService(ctx context.Context, key string) error

	// UpdateHealthStatus updates the health status and last check timestamp
	UpdateHealthStatus(ctx context.Context, key string, status models.ServiceStatus, timestamp time.Time) error

	// SaveSubscriptions saves all subscriptions for a service
	// This replaces all existing subscriptions for the given subscriber
	SaveSubscriptions(ctx context.Context, subscriberKey string, subscriptions []string) error

	// GetSubscriptions retrieves all service groups that a subscriber is subscribed to
	GetSubscriptions(ctx context.Context, subscriberKey string) ([]string, error)

	// GetAllSubscriptions retrieves all subscription relationships from database
	// Used during reconciliation to sync with cache
	// Returns map[subscriberKey][]serviceGroups
	GetAllSubscriptions(ctx context.Context) (map[string][]string, error)

	// DeleteSubscriptions removes all subscriptions for a subscriber
	DeleteSubscriptions(ctx context.Context, subscriberKey string) error

	// Close closes the database connection and cleans up resources
	Close() error

	// Ping checks if the database is accessible
	Ping(ctx context.Context) error
}

DatabaseStore defines the interface for database persistence layer. This is simpler than RegistryStore - it's only for persistence, not for runtime queries. The in-memory cache handles all runtime operations.

type DualStore

type DualStore struct {
	// contains filtered or unexported fields
}

DualStore combines in-memory cache with optional database persistence. All reads/writes go to memory for performance. Database writes happen asynchronously (fire-and-forget).

func NewDualStore

func NewDualStore(db DatabaseStore) *DualStore

NewDualStore creates a new dual-layer storage. If db is nil, only in-memory cache is used (no persistence).

func (*DualStore) AddSubscription

func (d *DualStore) AddSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error

AddSubscription adds to cache immediately, then persists to database asynchronously

func (*DualStore) Close

func (d *DualStore) Close() error

Close closes the database connection (cache doesn't need closing)

func (*DualStore) DeleteService

func (d *DualStore) DeleteService(ctx context.Context, key string) error

DeleteService deletes from cache immediately, then from database asynchronously

func (*DualStore) GetAllServices

func (d *DualStore) GetAllServices(ctx context.Context) ([]*models.ServiceInfo, error)

GetAllServices retrieves from cache (fast)

func (*DualStore) GetDatabase

func (d *DualStore) GetDatabase() DatabaseStore

GetDatabase returns the underlying database store (may be nil)

func (*DualStore) GetService

func (d *DualStore) GetService(ctx context.Context, key string) (*models.ServiceInfo, error)

GetService retrieves from cache (fast)

func (*DualStore) GetServicesByName

func (d *DualStore) GetServicesByName(ctx context.Context, serviceName string) ([]*models.ServiceInfo, error)

GetServicesByName retrieves from cache (fast)

func (*DualStore) GetSubscriberServices

func (d *DualStore) GetSubscriberServices(ctx context.Context, serviceGroup string) ([]*models.ServiceInfo, error)

GetSubscriberServices retrieves from cache (fast)

func (*DualStore) GetSubscribers

func (d *DualStore) GetSubscribers(ctx context.Context, serviceGroup string) ([]string, error)

GetSubscribers retrieves from cache (fast)

func (*DualStore) Ping

func (d *DualStore) Ping(ctx context.Context) error

Ping checks database health (cache is always healthy)

func (*DualStore) RemoveAllSubscriptions

func (d *DualStore) RemoveAllSubscriptions(ctx context.Context, subscriberKey string) error

RemoveAllSubscriptions removes from cache immediately, then from database asynchronously

func (*DualStore) RemoveSubscription

func (d *DualStore) RemoveSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error

RemoveSubscription removes from cache immediately, then from database asynchronously

func (*DualStore) SaveService

func (d *DualStore) SaveService(ctx context.Context, service *models.ServiceInfo) error

SaveService stores to cache immediately, then persists to database asynchronously

func (*DualStore) SyncFromDatabase

func (d *DualStore) SyncFromDatabase(ctx context.Context) (servicesSynced int, subscriptionsSynced int, err error)

SyncFromDatabase loads all data from database into cache. This is called during reconciliation to ensure cache and database are in sync. Returns the number of services and subscriptions synced.

func (*DualStore) SyncToDatabase

func (d *DualStore) SyncToDatabase(ctx context.Context) error

SyncToDatabase writes all cache data to database. This is useful for initial database population or full sync.

func (*DualStore) UpdateHealthStatus

func (d *DualStore) UpdateHealthStatus(ctx context.Context, key string, status models.ServiceStatus, timestamp time.Time) error

UpdateHealthStatus updates cache immediately, then database asynchronously

type RegistryStore

type RegistryStore interface {

	// SaveService stores or updates a service entry
	SaveService(ctx context.Context, service *models.ServiceInfo) error

	// GetService retrieves a single service by its composite key (serviceName:podName)
	GetService(ctx context.Context, key string) (*models.ServiceInfo, error)

	// GetServicesByName retrieves all pods for a given service name
	GetServicesByName(ctx context.Context, serviceName string) ([]*models.ServiceInfo, error)

	// GetAllServices retrieves all registered services across all service groups
	GetAllServices(ctx context.Context) ([]*models.ServiceInfo, error)

	// DeleteService removes a service entry by its composite key
	DeleteService(ctx context.Context, key string) error

	// UpdateHealthStatus updates the health status and last check timestamp for a service
	UpdateHealthStatus(ctx context.Context, key string, status models.ServiceStatus, timestamp time.Time) error

	// AddSubscription adds a subscriber to a service group
	// subscriberKey is the composite key (serviceName:podName) of the subscriber
	// serviceGroup is the name of the service being subscribed to
	AddSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error

	// RemoveSubscription removes a subscriber from a service group
	RemoveSubscription(ctx context.Context, subscriberKey string, serviceGroup string) error

	// RemoveAllSubscriptions removes all subscriptions for a given subscriber
	RemoveAllSubscriptions(ctx context.Context, subscriberKey string) error

	// GetSubscribers returns all subscriber keys for a given service group
	GetSubscribers(ctx context.Context, serviceGroup string) ([]string, error)

	// GetSubscriberServices returns full ServiceInfo objects for all subscribers of a service group
	GetSubscriberServices(ctx context.Context, serviceGroup string) ([]*models.ServiceInfo, error)

	// Close closes the storage connection and cleans up resources
	Close() error

	// Ping checks if the storage backend is accessible
	Ping(ctx context.Context) error
}

RegistryStore defines the interface for persisting service registry data. Implementations can use different storage backends (memory, MySQL, PostgreSQL, MongoDB, etc.)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL