storage

package
v0.0.0-...-ffcbafe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotFound is returned when a configuration is not found
	ErrNotFound = errors.New("configuration not found")

	// ErrPolicyNotFound is returned when a policy configuration is not found
	ErrPolicyNotFound = errors.New("policy configuration not found")

	// ErrConflict is returned when a configuration with the same name/version already exists
	ErrConflict = errors.New("configuration already exists")

	// ErrDatabaseLocked is returned when the database is locked (SQLite specific)
	ErrDatabaseLocked = errors.New("database is locked")

	// ErrDatabaseUnavailable is returned when the database storage is unavailable
	ErrDatabaseUnavailable = errors.New("database storage is unavailable")

	// ErrOperationNotAllowed is returned when an operation is not permitted
	ErrOperationNotAllowed = errors.New("operation not allowed")

	// ErrUnsupportedStorageType is returned when a storage backend type is unknown.
	ErrUnsupportedStorageType = errors.New("unsupported storage type")
)

Common storage errors - implementation agnostic

Functions

func GetCompositeKey

func GetCompositeKey(apiId, keyName string) string

GetCompositeKey generates a composite key for storing/retrieving API keys

func IsConflictError

func IsConflictError(err error) bool

IsConflictError checks if an error is a conflict error This function allows handlers to distinguish between conflict errors and other types of errors for appropriate logging and response handling

func IsDatabaseUnavailableError

func IsDatabaseUnavailableError(err error) bool

func IsNotFoundError

func IsNotFoundError(err error) bool

IsNotFoundError checks if an error is a not found error

func IsOperationNotAllowedError

func IsOperationNotAllowedError(err error) bool

IsOperationNotAllowedError checks if an error is an operation not allowed error

func IsPolicyNotFoundError

func IsPolicyNotFoundError(err error) bool

IsPolicyNotFoundError checks if an error is a policy not found error

func Key

func Key(kind, handle string) string

Key returns the store key for a RuntimeDeployConfig.

func LoadAPIKeysFromDatabase

func LoadAPIKeysFromDatabase(storage Storage, configStore *ConfigStore, apiKeyStore apiKeyStoreWriter) error

LoadAPIKeysFromDatabase loads active API keys from database into both the ConfigStore and APIKeyStore.

func LoadFromDatabase

func LoadFromDatabase(storage Storage, cache *ConfigStore) error

LoadFromDatabase loads all configurations from database into the in-memory cache.

func LoadLLMProviderTemplatesFromDatabase

func LoadLLMProviderTemplatesFromDatabase(storage Storage, cache *ConfigStore) error

LoadLLMProviderTemplatesFromDatabase loads all LLM Provider templates from database into in-memory store.

Types

type APIKeyStore

type APIKeyStore struct {
	// contains filtered or unexported fields
}

APIKeyStore manages API keys in memory with thread-safe operations

func NewAPIKeyStore

func NewAPIKeyStore(logger *slog.Logger) *APIKeyStore

NewAPIKeyStore creates a new API key store

func (*APIKeyStore) Clear

func (s *APIKeyStore) Clear()

Clear resets all in-memory API key state. Call before reloading from the database to ensure deleted keys are removed from the in-memory store.

func (*APIKeyStore) Count

func (s *APIKeyStore) Count() int

Count returns the total number of API keys

func (*APIKeyStore) GetAll

func (s *APIKeyStore) GetAll() []*models.APIKey

GetAll retrieves all API keys

func (*APIKeyStore) GetResourceVersion

func (s *APIKeyStore) GetResourceVersion() int64

GetResourceVersion returns the current resource version

func (*APIKeyStore) IncrementResourceVersion

func (s *APIKeyStore) IncrementResourceVersion() int64

IncrementResourceVersion increments and returns the resource version

func (*APIKeyStore) RemoveByAPI

func (s *APIKeyStore) RemoveByAPI(apiId string) int

RemoveByAPI removes all API keys for a specific API

func (*APIKeyStore) Revoke

func (s *APIKeyStore) Revoke(apiId, apiKeyName string) bool

Revoke marks an API key as revoked by finding it through API ID and key name lookup

func (*APIKeyStore) Store

func (s *APIKeyStore) Store(apiKey *models.APIKey) error

Store adds or updates an API key

type BackendConfig

type BackendConfig struct {
	Type       string
	SQLitePath string
	Postgres   PostgresConnectionConfig
	GatewayID  string
}

BackendConfig contains the minimal storage backend configuration required by NewStorage.

type ConfigStore

type ConfigStore struct {
	TopicManager *TopicManager
	// contains filtered or unexported fields
}

ConfigStore holds all API configurations in memory for fast access

func NewConfigStore

func NewConfigStore() *ConfigStore

NewConfigStore creates a new in-memory config store

func (*ConfigStore) Add

func (cs *ConfigStore) Add(cfg *models.StoredConfig) error

Add stores a new configuration in memory

func (*ConfigStore) AddTemplate

func (cs *ConfigStore) AddTemplate(template *models.StoredLLMProviderTemplate) error

AddTemplate adds a new LLM provider template. ID must be unique and immutable; name must be unique.

func (*ConfigStore) CountActiveAPIKeysByUserAndAPI

func (cs *ConfigStore) CountActiveAPIKeysByUserAndAPI(apiId, userID string) (int, error)

CountActiveAPIKeysByUserAndAPI counts active API keys for a specific user and API

func (*ConfigStore) Delete

func (cs *ConfigStore) Delete(id string) error

Delete removes a configuration from memory

func (*ConfigStore) DeleteTemplate

func (cs *ConfigStore) DeleteTemplate(id string) error

DeleteTemplate removes an LLM provider template from the store by ID

func (*ConfigStore) Get

func (cs *ConfigStore) Get(id string) (*models.StoredConfig, error)

Get retrieves a configuration by ID

func (*ConfigStore) GetAPIKeyByID

func (cs *ConfigStore) GetAPIKeyByID(apiId, id string) (*models.APIKey, error)

GetAPIKeyByID retrieves an API key by its ID

func (*ConfigStore) GetAPIKeyByName

func (cs *ConfigStore) GetAPIKeyByName(apiId, name string) (*models.APIKey, error)

GetAPIKeyByName retrieves an API key by its apiId and name

func (*ConfigStore) GetAPIKeysByAPI

func (cs *ConfigStore) GetAPIKeysByAPI(apiId string) ([]*models.APIKey, error)

GetAPIKeysByAPI retrieves all API keys for a specific API

func (*ConfigStore) GetAll

func (cs *ConfigStore) GetAll() []*models.StoredConfig

GetAll returns all configurations

func (*ConfigStore) GetAllByKind

func (cs *ConfigStore) GetAllByKind(kind string) []*models.StoredConfig

GetAllByKind returns all configurations of a specific kind

func (*ConfigStore) GetAllSensitiveValues

func (cs *ConfigStore) GetAllSensitiveValues() []string

GetAllSensitiveValues aggregates SensitiveValues from all stored configs. Used by the config dump handler to redact resolved secret values from the dump output.

func (*ConfigStore) GetAllTemplates

func (cs *ConfigStore) GetAllTemplates() []*models.StoredLLMProviderTemplate

GetAllTemplates retrieves all LLM provider templates

func (*ConfigStore) GetByKindAndHandle

func (cs *ConfigStore) GetByKindAndHandle(kind string, handle string) (*models.StoredConfig, error)

GetByKindAndHandle returns a configuration of a specific kind, and handle

func (*ConfigStore) GetByKindNameAndVersion

func (cs *ConfigStore) GetByKindNameAndVersion(kind string, name string, version string) (*models.StoredConfig, error)

GetByKindNameAndVersion returns a configuration of a specific kind, name and version

func (*ConfigStore) GetLabelsMap

func (cs *ConfigStore) GetLabelsMap(handle string) (map[string]string, error)

GetLabels retrieves labels for an API

func (*ConfigStore) GetSnapshotVersion

func (cs *ConfigStore) GetSnapshotVersion() int64

GetSnapshotVersion returns the current snapshot version

func (*ConfigStore) GetTemplate

func (cs *ConfigStore) GetTemplate(id string) (*models.StoredLLMProviderTemplate, error)

GetTemplate retrieves an LLM provider template by ID

func (*ConfigStore) GetTemplateByHandle

func (cs *ConfigStore) GetTemplateByHandle(handle string) (*models.StoredLLMProviderTemplate, error)

GetTemplateByHandle retrieves an LLM provider template by handle identifier

func (*ConfigStore) IncrementSnapshotVersion

func (cs *ConfigStore) IncrementSnapshotVersion() int64

IncrementSnapshotVersion atomically increments and returns the next snapshot version

func (*ConfigStore) RemoveAPIKeyByID

func (cs *ConfigStore) RemoveAPIKeyByID(apiId, id string) error

RemoveAPIKeyByID removes an API key from the in-memory cache by its ID

func (*ConfigStore) RemoveAPIKeysByAPI

func (cs *ConfigStore) RemoveAPIKeysByAPI(apiId string) error

RemoveAPIKeysByAPI removes all API keys for a specific API

func (*ConfigStore) SetSnapshotVersion

func (cs *ConfigStore) SetSnapshotVersion(version int64)

SetSnapshotVersion sets the snapshot version (used during startup)

func (*ConfigStore) StoreAPIKey

func (cs *ConfigStore) StoreAPIKey(apiKey *models.APIKey) error

StoreAPIKey stores an API key in the in-memory cache

func (*ConfigStore) Update

func (cs *ConfigStore) Update(cfg *models.StoredConfig) error

Update modifies an existing configuration in memory

func (*ConfigStore) UpdateTemplate

func (cs *ConfigStore) UpdateTemplate(template *models.StoredLLMProviderTemplate) error

UpdateTemplate updates an existing LLM provider template's metadata. ID cannot change; only name can change.

type LazyResource

type LazyResource struct {
	// ID uniquely identifies this resource within its type
	ID string `json:"id" yaml:"id"`

	// ResourceType identifies the type of resource (e.g., "LlmProviderTemplate")
	ResourceType string `json:"resource_type" yaml:"resource_type"`

	// Resource contains the actual resource data as a map
	Resource map[string]interface{} `json:"resource" yaml:"resource"`
}

LazyResource represents a generic lazy resource with ID, Resource_Type, and Actual_Resource

type LazyResourceStore

type LazyResourceStore struct {
	// contains filtered or unexported fields
}

LazyResourceStore manages lazy resources in memory with thread-safe operations

func NewLazyResourceStore

func NewLazyResourceStore(logger *slog.Logger) *LazyResourceStore

NewLazyResourceStore creates a new lazy resource store

func (*LazyResourceStore) Count

func (s *LazyResourceStore) Count() int

Count returns the total number of resources

func (*LazyResourceStore) GetAll

func (s *LazyResourceStore) GetAll() []*LazyResource

GetAll retrieves all resources

func (*LazyResourceStore) GetByIDAndType

func (s *LazyResourceStore) GetByIDAndType(id, resourceType string) (*LazyResource, bool)

GetByIDAndType retrieves a resource by its ID and type (precise lookup)

func (*LazyResourceStore) GetByType

func (s *LazyResourceStore) GetByType(resourceType string) map[string]*LazyResource

GetByType retrieves all resources of a specific type

func (*LazyResourceStore) GetResourceVersion

func (s *LazyResourceStore) GetResourceVersion() int64

GetResourceVersion returns the current resource version

func (*LazyResourceStore) IncrementResourceVersion

func (s *LazyResourceStore) IncrementResourceVersion() int64

IncrementResourceVersion increments and returns the resource version

func (*LazyResourceStore) RemoveByIDAndType

func (s *LazyResourceStore) RemoveByIDAndType(id, resourceType string) bool

RemoveByIDAndType removes a resource by its ID and type (precise removal)

func (*LazyResourceStore) RemoveByType

func (s *LazyResourceStore) RemoveByType(resourceType string) int

RemoveByType removes all resources of a specific type

func (*LazyResourceStore) Store

func (s *LazyResourceStore) Store(resource *LazyResource)

Store adds or updates a lazy resource

type PolicyStore

type PolicyStore struct {
	// contains filtered or unexported fields
}

PolicyStore provides thread-safe in-memory storage for policy configurations

func NewPolicyStore

func NewPolicyStore() *PolicyStore

NewPolicyStore creates a new policy store

func (*PolicyStore) Clear

func (s *PolicyStore) Clear()

Clear removes all policy configurations

func (*PolicyStore) Count

func (s *PolicyStore) Count() int

Count returns the total number of stored policy configurations

func (*PolicyStore) Delete

func (s *PolicyStore) Delete(id string) error

Delete removes a policy configuration by ID

func (*PolicyStore) Get

Get retrieves a policy configuration by ID

func (*PolicyStore) GetAll

func (s *PolicyStore) GetAll() []*models.StoredPolicyConfig

GetAll returns all policy configurations

func (*PolicyStore) GetByCompositeKey

func (s *PolicyStore) GetByCompositeKey(apiName, version, context string) (*models.StoredPolicyConfig, bool)

GetByCompositeKey retrieves a policy configuration by composite key (api_name:version:context)

func (*PolicyStore) GetResourceVersion

func (s *PolicyStore) GetResourceVersion() int64

GetResourceVersion returns the current resource version

func (*PolicyStore) IncrementResourceVersion

func (s *PolicyStore) IncrementResourceVersion() int64

IncrementResourceVersion increments and returns the new resource version

func (*PolicyStore) Set

func (s *PolicyStore) Set(policy *models.StoredPolicyConfig) error

Set stores or updates a policy configuration

type PostgresConnectionConfig

type PostgresConnectionConfig struct {
	DSN             string
	Host            string
	Port            int
	Database        string
	User            string
	Password        string
	SSLMode         string
	ConnectTimeout  time.Duration
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
	ConnMaxIdleTime time.Duration
	ApplicationName string
}

PostgresConnectionConfig holds PostgreSQL-specific connection settings.

type PostgresStorage

type PostgresStorage struct {
	// contains filtered or unexported fields
}

PostgresStorage implements the Storage interface using PostgreSQL.

type RuntimeConfigStore

type RuntimeConfigStore struct {
	// contains filtered or unexported fields
}

RuntimeConfigStore provides thread-safe in-memory storage for RuntimeDeployConfig. Keyed by "kind:handle" (e.g. "RestApi:petstore"), consistent with ConfigStore indexing.

func NewRuntimeConfigStore

func NewRuntimeConfigStore() *RuntimeConfigStore

NewRuntimeConfigStore creates a new RuntimeConfigStore.

func (*RuntimeConfigStore) Delete

func (s *RuntimeConfigStore) Delete(key string) error

Delete removes a RuntimeDeployConfig by key.

func (*RuntimeConfigStore) Get

Get retrieves a RuntimeDeployConfig by key.

func (*RuntimeConfigStore) GetAll

GetAll returns all RuntimeDeployConfigs.

func (*RuntimeConfigStore) GetResourceVersion

func (s *RuntimeConfigStore) GetResourceVersion() int64

GetResourceVersion returns the current resource version.

func (*RuntimeConfigStore) IncrementResourceVersion

func (s *RuntimeConfigStore) IncrementResourceVersion() int64

IncrementResourceVersion increments and returns the new resource version.

func (*RuntimeConfigStore) Set

Set stores or updates a RuntimeDeployConfig.

type SQLiteStorage

type SQLiteStorage struct {
	// contains filtered or unexported fields
}

SQLiteStorage implements the Storage interface using SQLite

type Storage

type Storage interface {
	// SaveConfig persists a new API configuration.
	//
	// Returns an error if a configuration with the same name and version already exists.
	// Implementations should ensure this operation is atomic (all-or-nothing).
	SaveConfig(cfg *models.StoredConfig) error

	// UpdateConfig updates an existing API configuration.
	//
	// Returns an error if the configuration does not exist.
	// Implementations should ensure this operation is atomic and thread-safe.
	UpdateConfig(cfg *models.StoredConfig) error

	// UpsertConfig performs a timestamp-guarded insert-or-update of an API configuration.
	// It inserts the config if it does not exist, or updates it only if the incoming
	// deployed_at timestamp is newer than the existing one. This prevents stale events
	// (from sync or WebSocket) from overwriting newer data.
	//
	// Returns (true, nil) if the row was actually inserted or updated.
	// Returns (false, nil) if the row exists with a newer deployed_at (stale event — no-op).
	// Returns (false, error) on database errors.
	UpsertConfig(cfg *models.StoredConfig) (bool, error)

	// DeleteConfig removes an API configuration by ID.
	//
	// Returns an error if the configuration does not exist.
	// Implementations should ensure related data (if any) is cleaned up.
	DeleteConfig(id string) error

	// GetConfig retrieves an API configuration by ID.
	//
	// Returns an error if the configuration is not found.
	// This is the fastest lookup method (O(1) for most databases).
	GetConfig(id string) (*models.StoredConfig, error)

	// GetConfigByKindAndHandle retrieves an API configuration by kind and handle.
	//
	// Returns an error if the configuration is not found.
	// The handle is the metadata.name from the API YAML configuration.
	// The kind filter prevents cross-kind reads (e.g. fetching a WebSub API when a REST API is expected).
	// This is the recommended lookup method for REST API endpoints.
	GetConfigByKindAndHandle(kind string, handle string) (*models.StoredConfig, error)

	// GetConfigByKindNameAndVersion retrieves an API configuration by kind, display name, and version.
	//
	// Returns an error if the configuration is not found.
	GetConfigByKindNameAndVersion(kind, displayName, version string) (*models.StoredConfig, error)

	// GetAllConfigs retrieves all API configurations.
	//
	// Returns an empty slice if no configurations exist.
	// May be expensive for large datasets; consider pagination in future versions.
	GetAllConfigs() ([]*models.StoredConfig, error)

	// GetAllConfigsByKind retrieves all API configurations of a specific kind.
	//
	// Returns an empty slice if no configurations of the specified kind exist.
	// May be expensive for large datasets; consider pagination in future versions.
	GetAllConfigsByKind(kind string) ([]*models.StoredConfig, error)

	// GetAllConfigsByOrigin retrieves artifact metadata for all configs with the
	// given origin. Only the artifacts table is queried (no resource-table JOINs),
	// so the Configuration field will be nil. This is intended for sync diff
	// computation where only metadata (UUID, Kind, DesiredState, DeployedAt) is needed.
	//
	// Returns an empty slice if no configurations of the specified origin exist.
	GetAllConfigsByOrigin(origin models.Origin) ([]*models.StoredConfig, error)

	// SaveLLMProviderTemplate persists a new LLM provider template.
	//
	// Returns an error if a template with the same name already exists.
	// Implementations should ensure this operation is atomic (all-or-nothing).
	SaveLLMProviderTemplate(template *models.StoredLLMProviderTemplate) error

	// UpdateLLMProviderTemplate updates an existing LLM provider template.
	//
	// Returns an error if the template does not exist.
	// Implementations should ensure this operation is atomic and thread-safe.
	UpdateLLMProviderTemplate(template *models.StoredLLMProviderTemplate) error

	// DeleteLLMProviderTemplate removes an LLM provider template by ID.
	//
	// Returns an error if the template does not exist.
	DeleteLLMProviderTemplate(id string) error

	// GetLLMProviderTemplate retrieves an LLM provider template by ID.
	//
	// Returns an error if the template is not found.
	// This is the fastest lookup method (O(1) for most databases).
	GetLLMProviderTemplate(id string) (*models.StoredLLMProviderTemplate, error)

	// GetAllLLMProviderTemplates retrieves all LLM provider templates.
	//
	// Returns an empty slice if no templates exist.
	// May be expensive for large datasets; consider pagination in future versions.
	GetAllLLMProviderTemplates() ([]*models.StoredLLMProviderTemplate, error)

	// GetLLMProviderTemplateByHandle retrieves an LLM provider template by handle.
	//
	// Returns an error if the template is not found.
	GetLLMProviderTemplateByHandle(handle string) (*models.StoredLLMProviderTemplate, error)

	// SaveAPIKey persists a new API key.
	//
	// Returns an error if an API key with the same key value already exists.
	// Implementations should ensure this operation is atomic (all-or-nothing).
	SaveAPIKey(apiKey *models.APIKey) error

	// UpsertAPIKey inserts or updates an API key identified by (gateway_id, artifact_uuid, name).
	//
	// If a key with the same name already exists for the artifact, it is updated only when the
	// incoming record's updated_at is strictly newer than the stored one — preventing a slow
	// bulk-sync goroutine from overwriting a more recent event-driven write.
	// The existing source and external_ref_id are preserved when the incoming values are absent.
	UpsertAPIKey(apiKey *models.APIKey) error

	// GetAPIKeyByID retrieves an API key by its ID.
	//
	// Returns an error if the API key is not found.
	// This is used for API key validation during authentication.
	GetAPIKeyByID(id string) (*models.APIKey, error)

	// GetAPIKeyByUUID retrieves an API key by its platform UUID.
	//
	// Returns an error if the API key is not found.
	GetAPIKeyByUUID(uuid string) (*models.APIKey, error)

	// GetAPIKeyByKey retrieves an API key by its key value.
	//
	// Returns an error if the API key is not found.
	// This is used for API key validation during authentication.
	GetAPIKeyByKey(key string) (*models.APIKey, error)

	// GetAPIKeysByAPI retrieves all API keys for a specific API.
	//
	// Returns an empty slice if no API keys exist for the API.
	// Used for listing API keys associated with an API.
	GetAPIKeysByAPI(apiId string) ([]*models.APIKey, error)

	// GetAllAPIKeys retrieves all active API keys from the database.
	//
	// Returns an empty slice if no active API keys exist.
	// Used for loading active API keys into memory on startup.
	GetAllAPIKeys() ([]*models.APIKey, error)

	// GetAPIKeysByAPIAndName retrieves an API key by its name within a specific API.
	//
	// Returns an error if the API key is not found.
	// Used for retrieving specific API keys by name.
	GetAPIKeysByAPIAndName(apiId, name string) (*models.APIKey, error)

	// UpdateAPIKey updates an existing API key (e.g., to revoke or expire it).
	//
	// Returns an error if the API key does not exist.
	// Implementations should ensure this operation is atomic and thread-safe.
	UpdateAPIKey(apiKey *models.APIKey) error

	// DeleteAPIKey removes an API key by its key value.
	//
	// Returns an error if the API key does not exist.
	DeleteAPIKey(key string) error

	// RemoveAPIKeysAPI removes all API keys for a specific API.
	//
	// Returns an error if API key removal fails.
	RemoveAPIKeysAPI(apiId string) error

	// RemoveAPIKeyAPIAndName removes an API key by its API apiId and name.
	//
	// Returns an error if the API key does not exist.
	RemoveAPIKeyAPIAndName(apiId, name string) error

	// CountActiveAPIKeysByUserAndAPI counts active API keys for a specific user and API.
	//
	// Returns the count of active API keys and an error if the operation fails.
	CountActiveAPIKeysByUserAndAPI(apiId, userID string) (int, error)

	// ListAPIKeysForArtifactsNotIn returns the minimal key info (uuid + artifact_uuid) for
	// keys whose artifact_uuid is in artifactUUIDs but whose own UUID is not in keyUUIDs.
	// Used to collect identifiers before deletion so callers can publish EventHub events.
	ListAPIKeysForArtifactsNotIn(artifactUUIDs []string, keyUUIDs []string) ([]*models.APIKey, error)

	// DeleteAPIKeysByUUIDs removes API keys by their UUIDs. Used after ListAPIKeysForArtifactsNotIn
	// has already identified the stale keys, avoiding a redundant NOT IN query.
	DeleteAPIKeysByUUIDs(uuids []string) error

	SaveSubscriptionPlan(plan *models.SubscriptionPlan) error
	GetSubscriptionPlanByID(id, gatewayID string) (*models.SubscriptionPlan, error)
	ListSubscriptionPlans(gatewayID string) ([]*models.SubscriptionPlan, error)
	UpdateSubscriptionPlan(plan *models.SubscriptionPlan) error
	DeleteSubscriptionPlan(id, gatewayID string) error

	// DeleteSubscriptionPlansNotIn removes plans for this gateway whose IDs are not in the given set.
	// Used for bulk-sync reconciliation when plans were deleted on the control plane during downtime.
	DeleteSubscriptionPlansNotIn(ids []string) error

	// SaveSubscription persists a new subscription.
	SaveSubscription(sub *models.Subscription) error

	// GetSubscriptionByID retrieves a subscription by ID and gateway.
	GetSubscriptionByID(id, gatewayID string) (*models.Subscription, error)

	// ListSubscriptionsByAPI returns subscriptions for an API with optional filters.
	ListSubscriptionsByAPI(apiID, gatewayID string, applicationID *string, status *string) ([]*models.Subscription, error)

	// ListActiveSubscriptions returns all ACTIVE subscriptions for this gateway in one query.
	// Used for bulk snapshot generation to avoid N+1 per-API lookups.
	ListActiveSubscriptions() ([]*models.Subscription, error)

	// UpdateSubscription updates an existing subscription.
	UpdateSubscription(sub *models.Subscription) error

	// DeleteSubscription removes a subscription by ID and gateway.
	DeleteSubscription(id, gatewayID string) error

	// DeleteSubscriptionsForAPINotIn removes subscriptions for the given API whose IDs are not in the set.
	// Used for bulk-sync reconciliation when subscriptions were deleted on the control plane during downtime.
	DeleteSubscriptionsForAPINotIn(apiID string, ids []string) error
	// ReplaceApplicationAPIKeyMappings atomically replaces all API key mappings for an application.
	//
	// Existing mappings are removed and the supplied mapping set is inserted.
	ReplaceApplicationAPIKeyMappings(application *models.StoredApplication, mappings []*models.ApplicationAPIKeyMapping) error

	// SaveCertificate persists a new certificate.
	//
	// Returns an error if a certificate with the same name already exists.
	// Implementations should ensure this operation is atomic.
	SaveCertificate(cert *models.StoredCertificate) error

	// GetCertificate retrieves a certificate by ID.
	//
	// Returns an error if the certificate is not found.
	GetCertificate(id string) (*models.StoredCertificate, error)

	// GetCertificateByName retrieves a certificate by name.
	//
	// Returns an error if the certificate is not found.
	GetCertificateByName(name string) (*models.StoredCertificate, error)

	// ListCertificates retrieves all certificates ordered by creation time.
	//
	// Returns an empty slice if no certificates exist.
	ListCertificates() ([]*models.StoredCertificate, error)

	// DeleteCertificate removes a certificate by ID.
	//
	// Returns an error if the certificate does not exist.
	DeleteCertificate(id string) error

	// SaveSecret persists a new encrypted secret.
	//
	// Returns an error if a secret with the same handle already exists.
	// Implementations should ensure this operation is atomic.
	SaveSecret(secret *models.Secret) error

	// GetSecrets retrieves metadata for all secrets.
	//
	// Returns non-sensitive metadata (handle, display_name, timestamps) without
	// ciphertext or values. Returns an empty slice if no secrets exist.
	GetSecrets() ([]models.SecretMeta, error)

	// GetSecret retrieves a secret by handle.
	//
	// Returns error if the secret does not exist.
	GetSecret(handle string) (*models.Secret, error)

	// UpdateSecret updates an existing secret.
	//
	// Returns the updated secret (including database-assigned timestamps) or error
	// if the secret does not exist. Implementations should ensure this operation is atomic.
	UpdateSecret(secret *models.Secret) (*models.Secret, error)

	// DeleteSecret permanently removes a secret.
	//
	// Returns error if the secret does not exist.
	DeleteSecret(handle string) error

	// SecretExists checks if a secret with the given handle exists.
	//
	// Returns true if the secret exists, false otherwise.
	SecretExists(handle string) (bool, error)

	// GetDB returns the underlying *sql.DB for direct access.
	// Used by EventHub for event synchronization.
	// Returns nil for non-SQL backends.
	GetDB() *sql.DB

	// Close closes the storage connection and releases resources.
	//
	// Should be called during graceful shutdown.
	// Implementations should ensure all pending writes are flushed.
	Close() error
}

Storage is the interface for persisting API configurations.

Design Philosophy

This interface is intentionally database-agnostic to support multiple storage backends without requiring changes to business logic. All database operations are abstracted through this interface to ensure:

  • Flexibility: Easy migration between database engines (SQLite, PostgreSQL, MySQL)
  • Testability: Simple mocking for unit tests
  • Separation of Concerns: Business logic (API handlers, xDS server) is decoupled from storage implementation

Implementation Guidelines

When implementing this interface for a new database backend:

  • DO: Keep all database-specific code in the implementation file (e.g., sqlite.go, postgres.go)
  • DO: Use transactions for write operations to ensure consistency
  • DO: Handle concurrency safely (multiple goroutines may call methods simultaneously)
  • DON'T: Add database-specific methods to this interface (keep it generic)
  • DON'T: Expose database implementation details through return types

Current Implementations

  • SQLiteStorage: Embedded database for single-instance deployments (see sqlite.go)
  • PostgreSQL: Planned for multi-instance deployments with external database (future)
  • MySQL: Planned for cloud deployments (future)

Migration Strategy

To migrate from one database backend to another:

  1. Implement this interface for the new database (e.g., postgres.go)
  2. Add database type to StorageConfig in pkg/config/config.go
  3. Update storage initialization in cmd/controller/main.go
  4. Export data from old database and import to new database
  5. No changes required in API handlers, xDS server, or business logic

See docs/postgresql-migration.md for detailed migration guide.

func NewStorage

func NewStorage(cfg BackendConfig, logger *slog.Logger) (Storage, error)

NewStorage creates the configured persistent storage backend.

type TopicManager

type TopicManager struct {
	// contains filtered or unexported fields
}

TopicManager manages a thread-safe collection of registered topics per API configuration

func NewTopicManager

func NewTopicManager() *TopicManager

NewTopicManager creates a new TopicManager instance

func (*TopicManager) Add

func (tm *TopicManager) Add(configID, topic string) bool

Add adds a topic for a specific config ID to the manager Returns true if the topic was added, false if it already exists

func (*TopicManager) Clear

func (tm *TopicManager) Clear()

Clear removes all topics from the manager

func (*TopicManager) Contains

func (tm *TopicManager) Contains(configID, topic string) bool

Contains checks if a topic exists for a specific config ID

func (*TopicManager) Count

func (tm *TopicManager) Count() int

Count returns the total number of unique topics across all configs

func (*TopicManager) CountForConfig

func (tm *TopicManager) CountForConfig(configID string) int

CountForConfig returns the number of topics for a specific config ID

func (*TopicManager) GetAll

func (tm *TopicManager) GetAll() map[string]bool

GetAll returns all topics across all configs as a map[topic]bool

func (*TopicManager) GetAllByConfig

func (tm *TopicManager) GetAllByConfig(configID string) []string

GetAllByConfig returns all topics for a specific config ID

func (*TopicManager) GetAllForConfig

func (tm *TopicManager) GetAllForConfig() map[string]map[string]bool

GetAllForConfig returns the full nested map structure (for debugging/inspection)

func (*TopicManager) IsTopicExist

func (tm *TopicManager) IsTopicExist(configID string, topic string) bool

func (*TopicManager) Remove

func (tm *TopicManager) Remove(configID, topic string) bool

Remove removes a topic for a specific config ID from the manager Returns true if the topic was removed, false if it doesn't exist

func (*TopicManager) RemoveAllForConfig

func (tm *TopicManager) RemoveAllForConfig(configID string)

RemoveAllForConfig removes all topics for a specific config ID

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL