evaluator

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitTracer added in v0.4.0

func InitTracer(ctx context.Context, serviceName, version string) (shutdown func(context.Context) error, err error)

InitTracer sets up the OpenTelemetry TracerProvider.

When OTEL_EXPORTER_OTLP_ENDPOINT is set, spans are exported via gRPC to the configured collector. When unset, a noop provider is registered so all tracing calls are zero-cost no-ops.

Returns a shutdown function that flushes buffered spans.

Types

type CacheConfig

type CacheConfig struct {
	KillTTL         time.Duration `yaml:"kill_ttl"`
	FlagTTL         time.Duration `yaml:"flag_ttl"`
	OverrideTTL     time.Duration `yaml:"override_ttl"`
	OverrideMaxSize int64         `yaml:"override_max_entries"`
	JitterPercent   int           `yaml:"jitter_percent"`
	FetchTimeout    time.Duration `yaml:"fetch_timeout"`
}

CacheConfig controls cache TTLs and sizes.

type CacheStore

type CacheStore struct {
	// contains filtered or unexported fields
}

CacheStore provides caching for all evaluator data.

Ristretto caches provide fast lookups with TTL-based freshness. When an entry expires, it is evicted. To satisfy the design requirement that stale data is served indefinitely during server outages, a parallel staleFlagMap and staleOverrideMap retain the last-known values without TTL. These maps are consulted only when the Ristretto cache misses AND the server fetch fails.

func NewCacheStore

func NewCacheStore(cfg CacheStoreConfig) (*CacheStore, error)

NewCacheStore creates a cache store.

func (*CacheStore) CachedFlagCount

func (s *CacheStore) CachedFlagCount() int32

CachedFlagCount returns the approximate number of cached flag states.

func (*CacheStore) Close

func (s *CacheStore) Close()

Close releases cache resources.

func (*CacheStore) FlushAll

func (s *CacheStore) FlushAll()

FlushAll evicts all entries from the hot caches (Ristretto) but preserves the stale fallback maps.

func (*CacheStore) GetFlagState

func (s *CacheStore) GetFlagState(flagID string) *CachedFlagState

GetFlagState returns the cached state for a flag, or nil on miss.

func (*CacheStore) GetKillSet

func (s *CacheStore) GetKillSet() *KillSet

GetKillSet returns the current kill set.

func (*CacheStore) GetOverride

func (s *CacheStore) GetOverride(flagID, entityID string) *CachedOverride

GetOverride returns the cached override, or nil on miss.

func (*CacheStore) GetStaleFlagState

func (s *CacheStore) GetStaleFlagState(flagID string) *CachedFlagState

GetStaleFlagState returns the last-known state from the stale fallback map.

func (*CacheStore) GetStaleOverride

func (s *CacheStore) GetStaleOverride(flagID, entityID string) *CachedOverride

GetStaleOverride returns the last-known override from the stale fallback map.

func (*CacheStore) OverrideCacheSize added in v0.4.0

func (s *CacheStore) OverrideCacheSize() int64

OverrideCacheSize returns the approximate number of cached overrides.

func (*CacheStore) SetFlagState

func (s *CacheStore) SetFlagState(state *CachedFlagState)

SetFlagState caches a flag state with TTL + jitter.

func (*CacheStore) SetKillSet

func (s *CacheStore) SetKillSet(ks *KillSet)

SetKillSet atomically replaces the kill set.

func (*CacheStore) SetOverride

func (s *CacheStore) SetOverride(o *CachedOverride)

SetOverride caches an override with TTL + jitter.

func (*CacheStore) WaitAll

func (s *CacheStore) WaitAll()

WaitAll blocks until both Ristretto caches have processed all pending writes and evictions.

type CacheStoreConfig

type CacheStoreConfig struct {
	FlagTTL         time.Duration
	OverrideTTL     time.Duration
	OverrideMaxSize int64
	JitterPercent   int
}

CacheStoreConfig configures cache sizes and TTLs.

type CachedFlagState

type CachedFlagState struct {
	FlagID   string
	State    pbflagsv1.State
	Value    *pbflagsv1.FlagValue
	Archived bool
}

CachedFlagState holds cached state for a single flag.

type CachedOverride

type CachedOverride struct {
	FlagID   string
	EntityID string
	State    pbflagsv1.State
	Value    *pbflagsv1.FlagValue
}

CachedOverride holds a cached per-entity override.

type Config

type Config struct {
	Descriptors string      `yaml:"descriptors"`
	Server      string      `yaml:"server"`
	Listen      string      `yaml:"listen"`
	Admin       string      `yaml:"admin"`
	Database    string      `yaml:"database"`
	Cache       CacheConfig `yaml:"cache"`
	EnvName     string      `yaml:"env_name"`
	EnvColor    string      `yaml:"env_color"`
}

Config is the evaluator configuration.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with all default values applied.

func LoadConfig

func LoadConfig(path string) (Config, error)

LoadConfig reads configuration from a YAML file, applying defaults for unset fields. Environment variables override file values:

PBFLAGS_DESCRIPTORS → Descriptors
PBFLAGS_SERVER      → Server
PBFLAGS_LISTEN      → Listen
PBFLAGS_ADMIN       → Admin
PBFLAGS_DATABASE    → Database

type DBFetcher

type DBFetcher struct {
	// contains filtered or unexported fields
}

DBFetcher fetches flag state directly from PostgreSQL. It implements Fetcher (for the Evaluator), KillFetcher (for the KillPoller), and StateServer (for the Service to serve state RPCs in root mode).

func NewDBFetcher

func NewDBFetcher(pool *pgxpool.Pool, tracker *HealthTracker, logger *slog.Logger, m *Metrics, tracer trace.Tracer) *DBFetcher

NewDBFetcher creates a fetcher backed by direct database access.

func (*DBFetcher) FetchFlagState

func (f *DBFetcher) FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)

FetchFlagState implements Fetcher.

func (*DBFetcher) FetchOverrides

func (f *DBFetcher) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)

FetchOverrides implements Fetcher.

func (*DBFetcher) GetFlagStateProto

func (f *DBFetcher) GetFlagStateProto(ctx context.Context, flagID string) (*pbflagsv1.GetFlagStateResponse, error)

GetFlagStateProto implements StateServer.

func (*DBFetcher) GetKilledFlags

func (f *DBFetcher) GetKilledFlags(ctx context.Context) (*KillSet, error)

GetKilledFlags implements KillFetcher.

func (*DBFetcher) GetKilledFlagsProto

func (f *DBFetcher) GetKilledFlagsProto(ctx context.Context) (*pbflagsv1.GetKilledFlagsResponse, error)

GetKilledFlagsProto implements StateServer.

func (*DBFetcher) GetOverridesProto

func (f *DBFetcher) GetOverridesProto(ctx context.Context, entityID string, flagIDs []string) (*pbflagsv1.GetOverridesResponse, error)

GetOverridesProto implements StateServer.

type Defaults

type Defaults struct {
	// contains filtered or unexported fields
}

Defaults is an immutable map of flag_id → FlagDef, built from descriptors.pb.

func NewDefaults

func NewDefaults(defs []FlagDef) *Defaults

NewDefaults creates a Defaults from parsed flag definitions.

func (*Defaults) DefaultValue

func (d *Defaults) DefaultValue(flagID string) *pbflagsv1.FlagValue

DefaultValue returns the compiled default FlagValue for a flag, or nil.

func (*Defaults) FlagIDs

func (d *Defaults) FlagIDs() []string

FlagIDs returns all known flag IDs.

func (*Defaults) Get

func (d *Defaults) Get(flagID string) (FlagDef, bool)

Get returns the flag definition and its default value, or false if unknown.

func (*Defaults) Len

func (d *Defaults) Len() int

Len returns the number of flags in the registry.

type DescriptorWatcher

type DescriptorWatcher struct {
	// contains filtered or unexported fields
}

DescriptorWatcher monitors the descriptors file for changes and atomically swaps the defaults registry when a valid new descriptor set is parsed.

func NewDescriptorWatcher

func NewDescriptorWatcher(
	path string,
	reg *Registry,
	pollInterval time.Duration,
	sighupCh <-chan os.Signal,
	logger *slog.Logger,
) *DescriptorWatcher

NewDescriptorWatcher creates a descriptor file watcher.

func (*DescriptorWatcher) Run

func (w *DescriptorWatcher) Run(ctx context.Context)

Run starts the watcher. Blocks until ctx is cancelled.

type Evaluator

type Evaluator struct {
	// contains filtered or unexported fields
}

Evaluator resolves flag values using the full precedence chain: kill set → per-entity override → global state → stale cache → compiled default. Note: per-entity kills are not supported; overrides can only be ENABLED or DEFAULT.

func NewEvaluator

func NewEvaluator(reg *Registry, cache *CacheStore, fetcher Fetcher, logger *slog.Logger, m *Metrics, tracer trace.Tracer) *Evaluator

NewEvaluator creates an Evaluator.

func (*Evaluator) Evaluate

func (e *Evaluator) Evaluate(ctx context.Context, flagID, entityID string) (value *pbflagsv1.FlagValue, source pbflagsv1.EvaluationSource)

Evaluate resolves a single flag for an optional entity.

type Fetcher

type Fetcher interface {
	FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)
	FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
}

Fetcher fetches flag state from the remote flag server on demand.

type FlagDef

type FlagDef struct {
	FlagID    string
	FeatureID string
	FieldNum  int32
	Name      string
	FlagType  pbflagsv1.FlagType
	Layer     string // Layer name from FlagOptions.layer (e.g., "user", "entity"). Empty means global.
	Default   *pbflagsv1.FlagValue

	SupportedValues *pbflagspb.SupportedValues

	FeatureDisplayName string
	FeatureDescription string
	FeatureOwner       string
}

FlagDef is a flag definition extracted from a proto descriptor.

func ParseDescriptorFile

func ParseDescriptorFile(path string) ([]FlagDef, error)

ParseDescriptorFile reads and parses a FileDescriptorSet from the given path.

func ParseDescriptors

func ParseDescriptors(data []byte) ([]FlagDef, error)

ParseDescriptors extracts flag definitions from a serialized FileDescriptorSet.

func (*FlagDef) IsGlobalLayer

func (d *FlagDef) IsGlobalLayer() bool

IsGlobalLayer returns true if the flag layer is global (including unspecified).

type FlagServerClient

type FlagServerClient struct {
	// contains filtered or unexported fields
}

FlagServerClient talks to the upstream evaluator's FlagEvaluatorService via Connect.

func NewFlagServerClient

func NewFlagServerClient(serverURL string, tracker *HealthTracker, fetchTimeout time.Duration, m *Metrics, opts ...connect.ClientOption) *FlagServerClient

NewFlagServerClient creates a Connect client for the upstream evaluator.

func (*FlagServerClient) FetchFlagState

func (c *FlagServerClient) FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)

FetchFlagState implements Fetcher.

func (*FlagServerClient) FetchOverrides

func (c *FlagServerClient) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)

FetchOverrides implements Fetcher.

func (*FlagServerClient) GetKilledFlags

func (c *FlagServerClient) GetKilledFlags(ctx context.Context) (*KillSet, error)

GetKilledFlags fetches the current kill set from the server.

func (*FlagServerClient) StateServer

func (c *FlagServerClient) StateServer() StateServer

StateServer returns a StateServer that delegates to the upstream evaluator.

type HealthTracker

type HealthTracker struct {
	// contains filtered or unexported fields
}

HealthTracker maintains health state and computes backoff intervals.

func NewHealthTracker

func NewHealthTracker(m *Metrics) *HealthTracker

NewHealthTracker creates a tracker starting in CONNECTING state.

func (*HealthTracker) BackoffMultiplier

func (t *HealthTracker) BackoffMultiplier() int

BackoffMultiplier returns the current backoff multiplier.

0 failures → 1x, 1-2 → 2x, 3-5 → 4x, 6+ → 8x (capped)

func (*HealthTracker) ConsecutiveFailures

func (t *HealthTracker) ConsecutiveFailures() int32

ConsecutiveFailures returns the current failure count.

func (*HealthTracker) RecordFailure

func (t *HealthTracker) RecordFailure()

RecordFailure increments the failure counter and may transition to DEGRADED.

func (*HealthTracker) RecordSuccess

func (t *HealthTracker) RecordSuccess()

RecordSuccess resets failure count and transitions to SERVING.

func (*HealthTracker) SecondsSinceContact

func (t *HealthTracker) SecondsSinceContact() int64

SecondsSinceContact returns seconds since last successful server contact.

func (*HealthTracker) Status

Status returns the current evaluator status.

type KillFetcher

type KillFetcher interface {
	GetKilledFlags(ctx context.Context) (*KillSet, error)
}

KillFetcher fetches the current kill set. Implemented by both FlagServerClient (proxy mode) and DBFetcher (root mode).

type KillPoller

type KillPoller struct {
	// contains filtered or unexported fields
}

KillPoller runs a background loop that polls GetKilledFlags at regular intervals. On failure, the last known kill set is preserved indefinitely.

func NewKillPoller

func NewKillPoller(
	fetcher KillFetcher,
	cache *CacheStore,
	tracker *HealthTracker,
	baseTTL, timeout time.Duration,
	logger *slog.Logger,
	m *Metrics,
) *KillPoller

NewKillPoller creates a kill set poller.

func (*KillPoller) Run

func (p *KillPoller) Run(ctx context.Context)

Run starts the polling loop. Blocks until ctx is cancelled.

type KillSet

type KillSet struct {
	FlagIDs map[string]struct{}
}

KillSet holds the current set of globally killed flags.

func (*KillSet) IsKilled

func (ks *KillSet) IsKilled(flagID string) bool

IsKilled checks if a flag is globally killed.

type Metrics added in v0.4.0

type Metrics struct {
	EvaluationsTotal  *prometheus.CounterVec
	CacheHitsTotal    *prometheus.CounterVec
	CacheMissesTotal  *prometheus.CounterVec
	FetchDuration     *prometheus.HistogramVec
	KillSetSize       prometheus.Gauge
	ConsecutiveFails  prometheus.Gauge
	PollerLastSuccess prometheus.Gauge
}

Metrics holds all Prometheus metrics for the evaluator.

func NewMetrics added in v0.4.0

func NewMetrics(reg prometheus.Registerer) *Metrics

NewMetrics creates and registers all metrics with the given registerer.

func NewNoopMetrics added in v0.4.0

func NewNoopMetrics() *Metrics

NewNoopMetrics returns metrics that are not registered with any registry. Use in tests where metric values don't matter.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides atomic access to the current Defaults snapshot.

func NewRegistry

func NewRegistry(initial *Defaults) *Registry

NewRegistry creates a Registry with the given initial defaults.

func (*Registry) Load

func (r *Registry) Load() *Defaults

Load returns the current Defaults snapshot.

func (*Registry) Swap

func (r *Registry) Swap(next *Defaults)

Swap atomically replaces the defaults with a new snapshot.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service implements the FlagEvaluatorService Connect handler.

func NewService

func NewService(eval *Evaluator, reg *Registry, tracker *HealthTracker, cache *CacheStore, state StateServer) *Service

NewService creates the evaluator Connect service.

func (*Service) BulkEvaluate

BulkEvaluate resolves multiple flags at once.

func (*Service) Evaluate

Evaluate resolves a single flag value.

func (*Service) GetFlagState

GetFlagState fetches state for a single flag via the StateServer.

func (*Service) GetKilledFlags

GetKilledFlags fetches the current kill set via the StateServer.

func (*Service) GetOverrides

GetOverrides fetches overrides for an entity via the StateServer.

func (*Service) Health

Health returns the evaluator's current health and degradation status.

type StateServer

type StateServer interface {
	GetFlagStateProto(ctx context.Context, flagID string) (*pbflagsv1.GetFlagStateResponse, error)
	GetKilledFlagsProto(ctx context.Context) (*pbflagsv1.GetKilledFlagsResponse, error)
	GetOverridesProto(ctx context.Context, entityID string, flagIDs []string) (*pbflagsv1.GetOverridesResponse, error)
}

StateServer serves flag state RPCs. In root mode this reads from DB (via DBFetcher), in proxy mode it delegates to an upstream evaluator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL