Documentation
¶
Index ¶
- func InitTracer(ctx context.Context, serviceName, version string) (shutdown func(context.Context) error, err error)
- type CacheConfig
- type CacheStore
- func (s *CacheStore) CachedFlagCount() int32
- func (s *CacheStore) Close()
- func (s *CacheStore) FlushAll()
- func (s *CacheStore) GetFlagState(flagID string) *CachedFlagState
- func (s *CacheStore) GetKillSet() *KillSet
- func (s *CacheStore) GetOverride(flagID, entityID string) *CachedOverride
- func (s *CacheStore) GetStaleFlagState(flagID string) *CachedFlagState
- func (s *CacheStore) GetStaleOverride(flagID, entityID string) *CachedOverride
- func (s *CacheStore) OverrideCacheSize() int64
- func (s *CacheStore) SetFlagState(state *CachedFlagState)
- func (s *CacheStore) SetKillSet(ks *KillSet)
- func (s *CacheStore) SetOverride(o *CachedOverride)
- func (s *CacheStore) WaitAll()
- type CacheStoreConfig
- type CachedFlagState
- type CachedOverride
- type Config
- type DBFetcher
- func (f *DBFetcher) FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)
- func (f *DBFetcher) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
- func (f *DBFetcher) GetFlagStateProto(ctx context.Context, flagID string) (*pbflagsv1.GetFlagStateResponse, error)
- func (f *DBFetcher) GetKilledFlags(ctx context.Context) (*KillSet, error)
- func (f *DBFetcher) GetKilledFlagsProto(ctx context.Context) (*pbflagsv1.GetKilledFlagsResponse, error)
- func (f *DBFetcher) GetOverridesProto(ctx context.Context, entityID string, flagIDs []string) (*pbflagsv1.GetOverridesResponse, error)
- type Defaults
- type DescriptorWatcher
- type Evaluator
- type Fetcher
- type FlagDef
- type FlagServerClient
- func (c *FlagServerClient) FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)
- func (c *FlagServerClient) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
- func (c *FlagServerClient) GetKilledFlags(ctx context.Context) (*KillSet, error)
- func (c *FlagServerClient) StateServer() StateServer
- type HealthTracker
- type KillFetcher
- type KillPoller
- type KillSet
- type Metrics
- type Registry
- type Service
- func (s *Service) BulkEvaluate(ctx context.Context, req *connect.Request[pbflagsv1.BulkEvaluateRequest]) (*connect.Response[pbflagsv1.BulkEvaluateResponse], error)
- func (s *Service) Evaluate(ctx context.Context, req *connect.Request[pbflagsv1.EvaluateRequest]) (*connect.Response[pbflagsv1.EvaluateResponse], error)
- func (s *Service) GetFlagState(ctx context.Context, req *connect.Request[pbflagsv1.GetFlagStateRequest]) (*connect.Response[pbflagsv1.GetFlagStateResponse], error)
- func (s *Service) GetKilledFlags(ctx context.Context, _ *connect.Request[pbflagsv1.GetKilledFlagsRequest]) (*connect.Response[pbflagsv1.GetKilledFlagsResponse], error)
- func (s *Service) GetOverrides(ctx context.Context, req *connect.Request[pbflagsv1.GetOverridesRequest]) (*connect.Response[pbflagsv1.GetOverridesResponse], error)
- func (s *Service) Health(_ context.Context, _ *connect.Request[pbflagsv1.HealthRequest]) (*connect.Response[pbflagsv1.HealthResponse], error)
- type StateServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitTracer ¶ added in v0.4.0
func InitTracer(ctx context.Context, serviceName, version string) (shutdown func(context.Context) error, err error)
InitTracer sets up the OpenTelemetry TracerProvider.
When OTEL_EXPORTER_OTLP_ENDPOINT is set, spans are exported via gRPC to the configured collector. When unset, a noop provider is registered so all tracing calls are zero-cost no-ops.
Returns a shutdown function that flushes buffered spans.
Types ¶
type CacheConfig ¶
type CacheConfig struct {
KillTTL time.Duration `yaml:"kill_ttl"`
FlagTTL time.Duration `yaml:"flag_ttl"`
OverrideTTL time.Duration `yaml:"override_ttl"`
OverrideMaxSize int64 `yaml:"override_max_entries"`
JitterPercent int `yaml:"jitter_percent"`
FetchTimeout time.Duration `yaml:"fetch_timeout"`
}
CacheConfig controls cache TTLs and sizes.
type CacheStore ¶
type CacheStore struct {
// contains filtered or unexported fields
}
CacheStore provides caching for all evaluator data.
Ristretto caches provide fast lookups with TTL-based freshness. When an entry expires, it is evicted. To satisfy the design requirement that stale data is served indefinitely during server outages, a parallel staleFlagMap and staleOverrideMap retain the last-known values without TTL. These maps are consulted only when the Ristretto cache misses AND the server fetch fails.
func NewCacheStore ¶
func NewCacheStore(cfg CacheStoreConfig) (*CacheStore, error)
NewCacheStore creates a cache store.
func (*CacheStore) CachedFlagCount ¶
func (s *CacheStore) CachedFlagCount() int32
CachedFlagCount returns the approximate number of cached flag states.
func (*CacheStore) FlushAll ¶
func (s *CacheStore) FlushAll()
FlushAll evicts all entries from the hot caches (Ristretto) but preserves the stale fallback maps.
func (*CacheStore) GetFlagState ¶
func (s *CacheStore) GetFlagState(flagID string) *CachedFlagState
GetFlagState returns the cached state for a flag, or nil on miss.
func (*CacheStore) GetKillSet ¶
func (s *CacheStore) GetKillSet() *KillSet
GetKillSet returns the current kill set.
func (*CacheStore) GetOverride ¶
func (s *CacheStore) GetOverride(flagID, entityID string) *CachedOverride
GetOverride returns the cached override, or nil on miss.
func (*CacheStore) GetStaleFlagState ¶
func (s *CacheStore) GetStaleFlagState(flagID string) *CachedFlagState
GetStaleFlagState returns the last-known state from the stale fallback map.
func (*CacheStore) GetStaleOverride ¶
func (s *CacheStore) GetStaleOverride(flagID, entityID string) *CachedOverride
GetStaleOverride returns the last-known override from the stale fallback map.
func (*CacheStore) OverrideCacheSize ¶ added in v0.4.0
func (s *CacheStore) OverrideCacheSize() int64
OverrideCacheSize returns the approximate number of cached overrides.
func (*CacheStore) SetFlagState ¶
func (s *CacheStore) SetFlagState(state *CachedFlagState)
SetFlagState caches a flag state with TTL + jitter.
func (*CacheStore) SetKillSet ¶
func (s *CacheStore) SetKillSet(ks *KillSet)
SetKillSet atomically replaces the kill set.
func (*CacheStore) SetOverride ¶
func (s *CacheStore) SetOverride(o *CachedOverride)
SetOverride caches an override with TTL + jitter.
func (*CacheStore) WaitAll ¶
func (s *CacheStore) WaitAll()
WaitAll blocks until both Ristretto caches have processed all pending writes and evictions.
type CacheStoreConfig ¶
type CacheStoreConfig struct {
FlagTTL time.Duration
OverrideTTL time.Duration
OverrideMaxSize int64
JitterPercent int
}
CacheStoreConfig configures cache sizes and TTLs.
type CachedFlagState ¶
type CachedFlagState struct {
FlagID string
State pbflagsv1.State
Value *pbflagsv1.FlagValue
Archived bool
}
CachedFlagState holds cached state for a single flag.
type CachedOverride ¶
type CachedOverride struct {
FlagID string
EntityID string
State pbflagsv1.State
Value *pbflagsv1.FlagValue
}
CachedOverride holds a cached per-entity override.
type Config ¶
type Config struct {
Descriptors string `yaml:"descriptors"`
Server string `yaml:"server"`
Listen string `yaml:"listen"`
Admin string `yaml:"admin"`
Database string `yaml:"database"`
Cache CacheConfig `yaml:"cache"`
EnvName string `yaml:"env_name"`
EnvColor string `yaml:"env_color"`
}
Config is the evaluator configuration.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with all default values applied.
func LoadConfig ¶
LoadConfig reads configuration from a YAML file, applying defaults for unset fields. Environment variables override file values:
PBFLAGS_DESCRIPTORS → Descriptors PBFLAGS_SERVER → Server PBFLAGS_LISTEN → Listen PBFLAGS_ADMIN → Admin PBFLAGS_DATABASE → Database
type DBFetcher ¶
type DBFetcher struct {
// contains filtered or unexported fields
}
DBFetcher fetches flag state directly from PostgreSQL. It implements Fetcher (for the Evaluator), KillFetcher (for the KillPoller), and StateServer (for the Service to serve state RPCs in root mode).
func NewDBFetcher ¶
func NewDBFetcher(pool *pgxpool.Pool, tracker *HealthTracker, logger *slog.Logger, m *Metrics, tracer trace.Tracer) *DBFetcher
NewDBFetcher creates a fetcher backed by direct database access.
func (*DBFetcher) FetchFlagState ¶
FetchFlagState implements Fetcher.
func (*DBFetcher) FetchOverrides ¶
func (f *DBFetcher) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
FetchOverrides implements Fetcher.
func (*DBFetcher) GetFlagStateProto ¶
func (f *DBFetcher) GetFlagStateProto(ctx context.Context, flagID string) (*pbflagsv1.GetFlagStateResponse, error)
GetFlagStateProto implements StateServer.
func (*DBFetcher) GetKilledFlags ¶
GetKilledFlags implements KillFetcher.
func (*DBFetcher) GetKilledFlagsProto ¶
func (f *DBFetcher) GetKilledFlagsProto(ctx context.Context) (*pbflagsv1.GetKilledFlagsResponse, error)
GetKilledFlagsProto implements StateServer.
func (*DBFetcher) GetOverridesProto ¶
func (f *DBFetcher) GetOverridesProto(ctx context.Context, entityID string, flagIDs []string) (*pbflagsv1.GetOverridesResponse, error)
GetOverridesProto implements StateServer.
type Defaults ¶
type Defaults struct {
// contains filtered or unexported fields
}
Defaults is an immutable map of flag_id → FlagDef, built from descriptors.pb.
func NewDefaults ¶
NewDefaults creates a Defaults from parsed flag definitions.
func (*Defaults) DefaultValue ¶
DefaultValue returns the compiled default FlagValue for a flag, or nil.
type DescriptorWatcher ¶
type DescriptorWatcher struct {
// contains filtered or unexported fields
}
DescriptorWatcher monitors the descriptors file for changes and atomically swaps the defaults registry when a valid new descriptor set is parsed.
func NewDescriptorWatcher ¶
func NewDescriptorWatcher( path string, reg *Registry, pollInterval time.Duration, sighupCh <-chan os.Signal, logger *slog.Logger, ) *DescriptorWatcher
NewDescriptorWatcher creates a descriptor file watcher.
func (*DescriptorWatcher) Run ¶
func (w *DescriptorWatcher) Run(ctx context.Context)
Run starts the watcher. Blocks until ctx is cancelled.
type Evaluator ¶
type Evaluator struct {
// contains filtered or unexported fields
}
Evaluator resolves flag values using the full precedence chain: kill set → per-entity override → global state → stale cache → compiled default. Note: per-entity kills are not supported; overrides can only be ENABLED or DEFAULT.
type Fetcher ¶
type Fetcher interface {
FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)
FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
}
Fetcher fetches flag state from the remote flag server on demand.
type FlagDef ¶
type FlagDef struct {
FlagID string
FeatureID string
FieldNum int32
Name string
FlagType pbflagsv1.FlagType
Layer string // Layer name from FlagOptions.layer (e.g., "user", "entity"). Empty means global.
Default *pbflagsv1.FlagValue
SupportedValues *pbflagspb.SupportedValues
FeatureDisplayName string
FeatureDescription string
FeatureOwner string
}
FlagDef is a flag definition extracted from a proto descriptor.
func ParseDescriptorFile ¶
ParseDescriptorFile reads and parses a FileDescriptorSet from the given path.
func ParseDescriptors ¶
ParseDescriptors extracts flag definitions from a serialized FileDescriptorSet.
func (*FlagDef) IsGlobalLayer ¶
IsGlobalLayer returns true if the flag layer is global (including unspecified).
type FlagServerClient ¶
type FlagServerClient struct {
// contains filtered or unexported fields
}
FlagServerClient talks to the upstream evaluator's FlagEvaluatorService via Connect.
func NewFlagServerClient ¶
func NewFlagServerClient(serverURL string, tracker *HealthTracker, fetchTimeout time.Duration, m *Metrics, opts ...connect.ClientOption) *FlagServerClient
NewFlagServerClient creates a Connect client for the upstream evaluator.
func (*FlagServerClient) FetchFlagState ¶
func (c *FlagServerClient) FetchFlagState(ctx context.Context, flagID string) (*CachedFlagState, error)
FetchFlagState implements Fetcher.
func (*FlagServerClient) FetchOverrides ¶
func (c *FlagServerClient) FetchOverrides(ctx context.Context, entityID string, flagIDs []string) ([]*CachedOverride, error)
FetchOverrides implements Fetcher.
func (*FlagServerClient) GetKilledFlags ¶
func (c *FlagServerClient) GetKilledFlags(ctx context.Context) (*KillSet, error)
GetKilledFlags fetches the current kill set from the server.
func (*FlagServerClient) StateServer ¶
func (c *FlagServerClient) StateServer() StateServer
StateServer returns a StateServer that delegates to the upstream evaluator.
type HealthTracker ¶
type HealthTracker struct {
// contains filtered or unexported fields
}
HealthTracker maintains health state and computes backoff intervals.
func NewHealthTracker ¶
func NewHealthTracker(m *Metrics) *HealthTracker
NewHealthTracker creates a tracker starting in CONNECTING state.
func (*HealthTracker) BackoffMultiplier ¶
func (t *HealthTracker) BackoffMultiplier() int
BackoffMultiplier returns the current backoff multiplier.
0 failures → 1x, 1-2 → 2x, 3-5 → 4x, 6+ → 8x (capped)
func (*HealthTracker) ConsecutiveFailures ¶
func (t *HealthTracker) ConsecutiveFailures() int32
ConsecutiveFailures returns the current failure count.
func (*HealthTracker) RecordFailure ¶
func (t *HealthTracker) RecordFailure()
RecordFailure increments the failure counter and may transition to DEGRADED.
func (*HealthTracker) RecordSuccess ¶
func (t *HealthTracker) RecordSuccess()
RecordSuccess resets failure count and transitions to SERVING.
func (*HealthTracker) SecondsSinceContact ¶
func (t *HealthTracker) SecondsSinceContact() int64
SecondsSinceContact returns seconds since last successful server contact.
func (*HealthTracker) Status ¶
func (t *HealthTracker) Status() pbflagsv1.EvaluatorStatus
Status returns the current evaluator status.
type KillFetcher ¶
KillFetcher fetches the current kill set. Implemented by both FlagServerClient (proxy mode) and DBFetcher (root mode).
type KillPoller ¶
type KillPoller struct {
// contains filtered or unexported fields
}
KillPoller runs a background loop that polls GetKilledFlags at regular intervals. On failure, the last known kill set is preserved indefinitely.
func NewKillPoller ¶
func NewKillPoller( fetcher KillFetcher, cache *CacheStore, tracker *HealthTracker, baseTTL, timeout time.Duration, logger *slog.Logger, m *Metrics, ) *KillPoller
NewKillPoller creates a kill set poller.
func (*KillPoller) Run ¶
func (p *KillPoller) Run(ctx context.Context)
Run starts the polling loop. Blocks until ctx is cancelled.
type KillSet ¶
type KillSet struct {
FlagIDs map[string]struct{}
}
KillSet holds the current set of globally killed flags.
type Metrics ¶ added in v0.4.0
type Metrics struct {
EvaluationsTotal *prometheus.CounterVec
CacheHitsTotal *prometheus.CounterVec
CacheMissesTotal *prometheus.CounterVec
FetchDuration *prometheus.HistogramVec
KillSetSize prometheus.Gauge
ConsecutiveFails prometheus.Gauge
PollerLastSuccess prometheus.Gauge
}
Metrics holds all Prometheus metrics for the evaluator.
func NewMetrics ¶ added in v0.4.0
func NewMetrics(reg prometheus.Registerer) *Metrics
NewMetrics creates and registers all metrics with the given registerer.
func NewNoopMetrics ¶ added in v0.4.0
func NewNoopMetrics() *Metrics
NewNoopMetrics returns metrics that are not registered with any registry. Use in tests where metric values don't matter.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry provides atomic access to the current Defaults snapshot.
func NewRegistry ¶
NewRegistry creates a Registry with the given initial defaults.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service implements the FlagEvaluatorService Connect handler.
func NewService ¶
func NewService(eval *Evaluator, reg *Registry, tracker *HealthTracker, cache *CacheStore, state StateServer) *Service
NewService creates the evaluator Connect service.
func (*Service) BulkEvaluate ¶
func (s *Service) BulkEvaluate(ctx context.Context, req *connect.Request[pbflagsv1.BulkEvaluateRequest]) (*connect.Response[pbflagsv1.BulkEvaluateResponse], error)
BulkEvaluate resolves multiple flags at once.
func (*Service) Evaluate ¶
func (s *Service) Evaluate(ctx context.Context, req *connect.Request[pbflagsv1.EvaluateRequest]) (*connect.Response[pbflagsv1.EvaluateResponse], error)
Evaluate resolves a single flag value.
func (*Service) GetFlagState ¶
func (s *Service) GetFlagState(ctx context.Context, req *connect.Request[pbflagsv1.GetFlagStateRequest]) (*connect.Response[pbflagsv1.GetFlagStateResponse], error)
GetFlagState fetches state for a single flag via the StateServer.
func (*Service) GetKilledFlags ¶
func (s *Service) GetKilledFlags(ctx context.Context, _ *connect.Request[pbflagsv1.GetKilledFlagsRequest]) (*connect.Response[pbflagsv1.GetKilledFlagsResponse], error)
GetKilledFlags fetches the current kill set via the StateServer.
func (*Service) GetOverrides ¶
func (s *Service) GetOverrides(ctx context.Context, req *connect.Request[pbflagsv1.GetOverridesRequest]) (*connect.Response[pbflagsv1.GetOverridesResponse], error)
GetOverrides fetches overrides for an entity via the StateServer.
type StateServer ¶
type StateServer interface {
GetFlagStateProto(ctx context.Context, flagID string) (*pbflagsv1.GetFlagStateResponse, error)
GetKilledFlagsProto(ctx context.Context) (*pbflagsv1.GetKilledFlagsResponse, error)
GetOverridesProto(ctx context.Context, entityID string, flagIDs []string) (*pbflagsv1.GetOverridesResponse, error)
}
StateServer serves flag state RPCs. In root mode this reads from DB (via DBFetcher), in proxy mode it delegates to an upstream evaluator.