package handler


Documentation

Constants

This section is empty.

Variables

var EvalFlag = func(evalContext models.EvalContext) *models.EvalResult {
	flag := LookupFlag(evalContext)
	return EvalFlagWithContext(flag, evalContext)
}
var EvalFlagWithContext = func(flag *entity.Flag, evalContext models.EvalContext) *models.EvalResult {
	flagID := util.SafeUint(evalContext.FlagID)
	flagKey := util.SafeString(evalContext.FlagKey)

	if flag == nil {
		emptyFlag := &entity.Flag{Model: gorm.Model{ID: flagID}, Key: flagKey}
		return BlankResult(emptyFlag, evalContext, fmt.Sprintf("flagID %v not found or deleted", flagID))
	}

	if !flag.Enabled {
		return BlankResult(flag, evalContext, fmt.Sprintf("flagID %v is not enabled", flag.ID))
	}

	if len(flag.Segments) == 0 {
		return BlankResult(flag, evalContext, fmt.Sprintf("flagID %v has no segments", flag.ID))
	}

	if evalContext.EntityID == "" {
		evalContext.EntityID = fmt.Sprintf("randomly_generated_%d", rand.Int31())
	}

	if flag.EntityType != "" {
		evalContext.EntityType = flag.EntityType
	}

	logs := []*models.SegmentDebugLog{}
	var vID int64
	var sID int64

	for _, segment := range flag.Segments {
		sID = int64(segment.ID)
		variantID, log, evalNextSegment := evalSegment(flag.ID, evalContext, segment)
		if config.Config.EvalDebugEnabled && evalContext.EnableDebug {
			logs = append(logs, log)
		}
		if variantID != nil {
			vID = int64(*variantID)
		}
		if !evalNextSegment {
			break
		}
	}
	evalResult := BlankResult(flag, evalContext, "")
	evalResult.EvalDebugLog.SegmentDebugLogs = logs
	evalResult.SegmentID = sID
	evalResult.VariantID = vID
	v := flag.FlagEvaluation.VariantsMap[util.SafeUint(vID)]
	if v != nil {
		evalResult.VariantAttachment = v.Attachment
		evalResult.VariantKey = v.Key
	}

	logEvalResult(evalResult, flag.DataRecordsEnabled)
	evalResult.DataRecordsEnabled = flag.DataRecordsEnabled
	return evalResult
}
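
EvalFlag looks the flag up in the eval cache and delegates to EvalFlagWithContext. A minimal sketch of calling it directly (the import paths assume the openflagr/flagr module layout; the flag key and entity values are illustrative):

package main

import (
	"fmt"

	"github.com/openflagr/flagr/pkg/handler"
	"github.com/openflagr/flagr/swagger_gen/models"
)

func main() {
	// Start the eval cache polling so LookupFlag can resolve the flag.
	handler.GetEvalCache().Start()

	result := handler.EvalFlag(models.EvalContext{
		FlagKey:    "color_scheme", // hypothetical flag key
		EntityID:   "user_1234",    // stable ID drives consistent bucketing
		EntityType: "user",
	})
	fmt.Println(result.VariantKey, result.VariantID, result.SegmentID)
}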
var EvalFlagsByTags = func(evalContext models.EvalContext) []*models.EvalResult {
	cache := GetEvalCache()
	fs := cache.GetByTags(evalContext.FlagTags, evalContext.FlagTagsOperator)
	results := []*models.EvalResult{}
	for _, f := range fs {
		results = append(results, EvalFlagWithContext(f, evalContext))
	}
	return results
}
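
EvalFlagsByTags evaluates every cached flag matching the given tags in one call. A short sketch reusing the imports above; the tag names are illustrative, and the "ANY"/"ALL" operator values are an assumption from typical Flagr usage (the field is a *string, per GetByTags below):

func evalByTags() {
	op := "ANY" // assumed value; "ALL" would require every tag to match
	results := handler.EvalFlagsByTags(models.EvalContext{
		EntityID:         "user_1234",
		FlagTags:         []string{"checkout", "payments"},
		FlagTagsOperator: &op,
	})
	for _, r := range results {
		fmt.Println(r.VariantKey)
	}
}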
var GetEvalCache = func() *EvalCache {
	singletonEvalCacheOnce.Do(func() {
		ec := &EvalCache{
			cache:           &cacheContainer{},
			refreshTimeout:  config.Config.EvalCacheRefreshTimeout,
			refreshInterval: config.Config.EvalCacheRefreshInterval,
		}
		singletonEvalCache = ec
	})
	return singletonEvalCache
}

GetEvalCache gets the EvalCache

var LookupFlag = func(evalContext models.EvalContext) *entity.Flag {
	cache := GetEvalCache()
	flagID := util.SafeUint(evalContext.FlagID)
	flagKey := util.SafeString(evalContext.FlagKey)
	f := cache.GetByFlagKeyOrID(flagID)
	if f == nil {
		f = cache.GetByFlagKeyOrID(flagKey)
	}
	return f
}
var NewKafkaRecorder = func() DataRecorder {
	if config.Config.RecorderKafkaVerbose {
		sarama.Logger = logrus.StandardLogger()
	}

	cfg := sarama.NewConfig()

	tlscfg := createTLSConfiguration(
		config.Config.RecorderKafkaCertFile,
		config.Config.RecorderKafkaKeyFile,
		config.Config.RecorderKafkaCAFile,
		config.Config.RecorderKafkaVerifySSL,
		config.Config.RecorderKafkaSimpleSSL,
	)
	if tlscfg != nil {
		cfg.Net.TLS.Enable = true
		cfg.Net.TLS.Config = tlscfg
	}

	if config.Config.RecorderKafkaSASLUsername != "" && config.Config.RecorderKafkaSASLPassword != "" {
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.User = config.Config.RecorderKafkaSASLUsername
		cfg.Net.SASL.Password = config.Config.RecorderKafkaSASLPassword
	}

	cfg.Net.MaxOpenRequests = config.Config.RecorderKafkaMaxOpenReqs

	cfg.Producer.Compression = sarama.CompressionCodec(config.Config.RecorderKafkaCompressionCodec)
	cfg.Producer.RequiredAcks = sarama.RequiredAcks(config.Config.RecorderKafkaRequiredAcks)
	cfg.Producer.Idempotent = config.Config.RecorderKafkaIdempotent
	cfg.Producer.Retry.Max = config.Config.RecorderKafkaRetryMax
	cfg.Producer.Flush.Frequency = config.Config.RecorderKafkaFlushFrequency
	cfg.Version = mustParseKafkaVersion(config.Config.RecorderKafkaVersion)

	if cfg.Producer.Idempotent {
		cfg.Producer.RequiredAcks = sarama.WaitForAll
		cfg.Net.MaxOpenRequests = 1
		if !cfg.Version.IsAtLeast(sarama.V0_11_0_0) {
			cfg.Version = sarama.V0_11_0_0
		}
		logrus.Info("Idempotent producer enabled: set RequiredAcks=WaitForAll, MaxOpenRequests=1, Version>=0.11.0.0")
	}

	brokerList := strings.Split(config.Config.RecorderKafkaBrokers, ",")
	producer, err := saramaNewAsyncProducer(brokerList, cfg)
	if err != nil {
		logrus.WithField("kafka_error", err).Fatal("Failed to start Sarama producer:")
	}

	var encryptor dataRecordEncryptor
	if config.Config.RecorderKafkaEncrypted && config.Config.RecorderKafkaEncryptionKey != "" {
		encryptor = newSimpleboxEncryptor(config.Config.RecorderKafkaEncryptionKey)
	}

	bufSize := config.Config.RecorderKafkaBufferSize
	if bufSize < 1 {
		bufSize = 10000
		logrus.Warn("RecorderKafkaBufferSize < 1, using default 10000")
	}

	workerCount := config.Config.RecorderKafkaWorkerCount
	if workerCount < 1 {
		workerCount = 4
		logrus.Warn("RecorderKafkaWorkerCount < 1, using default 4")
	}

	recorder := &kafkaRecorder{
		topic:               config.Config.RecorderKafkaTopic,
		partitionKeyEnabled: config.Config.RecorderKafkaPartitionKeyEnabled,
		producer:            producer,
		recordCh:            make(chan models.EvalResult, bufSize),
		options: DataRecordFrameOptions{
			Encrypted:       config.Config.RecorderKafkaEncrypted,
			Encryptor:       encryptor,
			FrameOutputMode: config.Config.RecorderFrameOutputMode,
		},
	}

	recorder.errWg.Add(1)
	go func() {
		defer recorder.errWg.Done()
		for err := range producer.Errors() {
			logrus.WithField("kafka_error", err).Error("failed to write access log entry")
			if config.Global.Prometheus.RecorderErrors != nil {
				config.Global.Prometheus.RecorderErrors.Inc()
			}
		}
	}()

	if config.Config.PrometheusEnabled {
		promauto.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "flagr_recorder_buffer_usage",
			Help: "Current number of records in the async buffer",
		}, func() float64 {
			return float64(len(recorder.recordCh))
		})
	}

	for i := 0; i < workerCount; i++ {
		recorder.startWorker()
	}

	return recorder
}

NewKafkaRecorder creates a new Kafka recorder
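
All Kafka settings (brokers, topic, TLS, SASL, compression, and so on) are read from config.Config, so callers only construct the recorder and feed it results. A lifecycle sketch, assuming the broker configuration is already in place and reusing the imports from the first example:

func recordToKafka() {
	// Construction connects to the configured brokers, starts the
	// error-draining goroutine, and spins up the worker pool.
	recorder := handler.NewKafkaRecorder()
	defer recorder.Close() // drain and stop the async producer

	result := handler.EvalFlag(models.EvalContext{FlagKey: "color_scheme", EntityID: "user_1234"})

	// AsyncRecord enqueues on the buffered channel and returns without
	// blocking the evaluation request path.
	recorder.AsyncRecord(*result)
}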

var NewKinesisRecorder = func() DataRecorder {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		logrus.WithField("kinesis_error", err).Fatal("error creating aws session")
	}

	client := kinesis.NewFromConfig(cfg)

	p := newKinesisProducer(&producer.Config{
		StreamName:          flagrConfig.Config.RecorderKinesisStreamName,
		Client:              client,
		BacklogCount:        flagrConfig.Config.RecorderKinesisBacklogCount,
		MaxConnections:      flagrConfig.Config.RecorderKinesisMaxConnections,
		FlushInterval:       flagrConfig.Config.RecorderKinesisFlushInterval,
		BatchSize:           flagrConfig.Config.RecorderKinesisBatchSize,
		BatchCount:          flagrConfig.Config.RecorderKinesisBatchCount,
		AggregateBatchCount: flagrConfig.Config.RecorderKinesisAggregateBatchCount,
		AggregateBatchSize:  flagrConfig.Config.RecorderKinesisAggregateBatchSize,
		Verbose:             flagrConfig.Config.RecorderKinesisVerbose,
		Logger:              &kplogrus.Logger{Logger: logrus.StandardLogger()},
	})

	p.Start()

	go func() {
		for err := range p.NotifyFailures() {
			logrus.WithField("kinesis_error", err).Error("error pushing to kinesis")
		}
	}()

	return &kinesisRecorder{
		producer: p,
		options: DataRecordFrameOptions{
			Encrypted:       false,
			FrameOutputMode: flagrConfig.Config.RecorderFrameOutputMode,
		},
	}
}

NewKinesisRecorder creates a new Kinesis recorder

var NewPubsubRecorder = func() DataRecorder {
	client, err := pubsubClient()
	if err != nil {
		logrus.WithField("pubsub_error", err).Fatal("error getting pubsub client")
	}

	return &pubsubRecorder{
		producer:  client,
		publisher: client.Publisher(config.Config.RecorderPubsubTopicName),
		options: DataRecordFrameOptions{
			Encrypted:       false,
			FrameOutputMode: config.Config.RecorderFrameOutputMode,
		},
	}
}

NewPubsubRecorder creates a new Pubsub recorder

Functions

func BlankResult

func BlankResult(f *entity.Flag, evalContext models.EvalContext, msg string) *models.EvalResult

BlankResult creates a blank result

func CloseDataRecorder

func CloseDataRecorder() error

CloseDataRecorder closes the singleton data recorder if it was initialized

func ErrorMessage

func ErrorMessage(s string, data ...any) *models.Error

ErrorMessage generates error messages

func LoadSimpleBooleanFlagTemplate

func LoadSimpleBooleanFlagTemplate(flag *entity.Flag, tx *gorm.DB) error

LoadSimpleBooleanFlagTemplate loads the simple boolean flag template into a new flag. It creates a single segment, variant ('on'), and distribution.
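
A sketch of seeding a freshly created flag with this template inside one transaction, so a failed template load also rolls back the flag row. It assumes the gorm.io/gorm v2 API and the openflagr/flagr entity package; the flag key is hypothetical:

import (
	"github.com/openflagr/flagr/pkg/entity"
	"github.com/openflagr/flagr/pkg/handler"
	"gorm.io/gorm"
)

func seedBooleanFlag(db *gorm.DB) error {
	return db.Transaction(func(tx *gorm.DB) error {
		f := &entity.Flag{Key: "new_feature"} // hypothetical flag key
		if err := tx.Create(f).Error; err != nil {
			return err
		}
		// Creates the single segment, "on" variant, and distribution.
		return handler.LoadSimpleBooleanFlagTemplate(f, tx)
	})
}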

func Setup

func Setup(api *operations.FlagrAPI)

Setup initializes all the handler functions

func WrapWithOFREP

func WrapWithOFREP(next http.Handler) http.Handler

WrapWithOFREP wraps the given handler with OFREP route interception.
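
A sketch of mounting the OFREP (OpenFeature Remote Evaluation Protocol) interception in front of an existing handler; the port and inner route are illustrative:

package main

import (
	"net/http"

	"github.com/openflagr/flagr/pkg/handler"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// OFREP routes are intercepted first; all other requests fall
	// through to the wrapped handler.
	http.ListenAndServe(":18000", handler.WrapWithOFREP(mux))
}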

Types

type CRUD

type CRUD interface {
	// Flags
	FindFlags(flag.FindFlagsParams) middleware.Responder
	CreateFlag(flag.CreateFlagParams) middleware.Responder
	GetFlag(flag.GetFlagParams) middleware.Responder
	PutFlag(flag.PutFlagParams) middleware.Responder
	DeleteFlag(flag.DeleteFlagParams) middleware.Responder
	RestoreFlag(flag.RestoreFlagParams) middleware.Responder
	SetFlagEnabledState(flag.SetFlagEnabledParams) middleware.Responder
	GetFlagSnapshots(params flag.GetFlagSnapshotsParams) middleware.Responder
	GetFlagEntityTypes(params flag.GetFlagEntityTypesParams) middleware.Responder

	// Tags
	CreateTag(tag.CreateTagParams) middleware.Responder
	DeleteTag(tag.DeleteTagParams) middleware.Responder
	FindTags(tag.FindTagsParams) middleware.Responder
	FindAllTags(params tag.FindAllTagsParams) middleware.Responder

	// Segments
	CreateSegment(segment.CreateSegmentParams) middleware.Responder
	FindSegments(segment.FindSegmentsParams) middleware.Responder
	PutSegment(segment.PutSegmentParams) middleware.Responder
	DeleteSegment(segment.DeleteSegmentParams) middleware.Responder
	PutSegmentsReorder(segment.PutSegmentsReorderParams) middleware.Responder

	// Constraints
	CreateConstraint(constraint.CreateConstraintParams) middleware.Responder
	FindConstraints(constraint.FindConstraintsParams) middleware.Responder
	PutConstraint(params constraint.PutConstraintParams) middleware.Responder
	DeleteConstraint(params constraint.DeleteConstraintParams) middleware.Responder

	// Distributions
	FindDistributions(distribution.FindDistributionsParams) middleware.Responder
	PutDistributions(distribution.PutDistributionsParams) middleware.Responder

	// Variants
	CreateVariant(variant.CreateVariantParams) middleware.Responder
	FindVariants(variant.FindVariantsParams) middleware.Responder
	PutVariant(variant.PutVariantParams) middleware.Responder
	DeleteVariant(variant.DeleteVariantParams) middleware.Responder
}

CRUD is the CRUD interface

func NewCRUD

func NewCRUD() CRUD

NewCRUD creates a new CRUD instance

type DataRecordFrame

type DataRecordFrame struct {
	// contains filtered or unexported fields
}

DataRecordFrame represents the structure we json.Marshal and feed into data recorders

func (*DataRecordFrame) GetPartitionKey

func (drf *DataRecordFrame) GetPartitionKey() string

GetPartitionKey gets the partition key from entityID

func (*DataRecordFrame) MarshalJSON

func (drf *DataRecordFrame) MarshalJSON() ([]byte, error)

MarshalJSON defines the behavior of MarshalJSON for DataRecordFrame

func (*DataRecordFrame) Output

func (drf *DataRecordFrame) Output() ([]byte, error)

Output sets the payload using its input and returns the JSON-marshaled bytes
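
A sketch of building a frame by hand, which can be useful for a custom sink; it assumes an initialized recorder and the handler and models imports shown earlier:

func frameBytes(result models.EvalResult) ([]byte, error) {
	frame := handler.GetDataRecorder().NewDataRecordFrame(result)

	// The partition key comes from the evaluation's entityID, keeping
	// one entity's records on a single partition.
	_ = frame.GetPartitionKey()

	// Output marshals the frame per FrameOutputMode, applying the
	// configured encryptor if one is set.
	return frame.Output()
}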

type DataRecordFrameOptions

type DataRecordFrameOptions struct {
	Encrypted       bool
	Encryptor       dataRecordEncryptor
	FrameOutputMode string
}

DataRecordFrameOptions represents the options we can set to create a DataRecordFrame

type DataRecorder

type DataRecorder interface {
	AsyncRecord(models.EvalResult)
	NewDataRecordFrame(models.EvalResult) DataRecordFrame
	Close() error
}

DataRecorder can record and produce the evaluation result

func GetDataRecorder

func GetDataRecorder() DataRecorder

GetDataRecorder gets the data recorder
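
GetDataRecorder returns a process-wide singleton (presumably chosen by the recorder configuration, given the Kafka, Kinesis, and Pubsub constructors above). A sketch of pairing it with CloseDataRecorder during graceful shutdown:

func shutdownRecorder() {
	// Close exactly once at shutdown; CloseDataRecorder is a no-op if
	// the singleton recorder was never initialized.
	if err := handler.CloseDataRecorder(); err != nil {
		logrus.WithError(err).Error("failed to close data recorder")
	}
}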

type Error

type Error struct {
	StatusCode int
	Message    string
	Values     []any
}

Error is the handler error

func NewError

func NewError(statusCode int, msg string, values ...any) *Error

NewError creates Error

func (*Error) Error

func (e *Error) Error() string
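
A small sketch of constructing and reading one of these errors; that Error() interpolates Values into Message is an assumption from the struct shape:

func notFound(flagID any) {
	e := handler.NewError(http.StatusNotFound, "flag %v not found", flagID)
	fmt.Println(e.StatusCode) // 404, drives the HTTP response code
	fmt.Println(e.Error())    // presumably renders "flag <id> not found"
}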

type Eval

Eval is the Eval interface

func NewEval

func NewEval() Eval

NewEval creates a new Eval instance

type EvalCache

type EvalCache struct {
	// contains filtered or unexported fields
}

EvalCache is the in-memory cache just for evaluation

func GenFixtureEvalCache

func GenFixtureEvalCache() *EvalCache

GenFixtureEvalCache generates a fixture

func GenFixtureEvalCacheWithFlags

func GenFixtureEvalCacheWithFlags(flags []entity.Flag) *EvalCache

func GenFixtureEvalCacheWithNFlags

func GenFixtureEvalCacheWithNFlags(n int) *EvalCache

GenFixtureEvalCacheWithNFlags generates an EvalCache with n realistic flags. Each flag has segments with constraints, multiple variants, and tags.

func (*EvalCache) GetAllEnabledFlags

func (ec *EvalCache) GetAllEnabledFlags() []*entity.Flag

GetAllEnabledFlags returns all enabled flags from the cache.

func (*EvalCache) GetByFlagKeyOrID

func (ec *EvalCache) GetByFlagKeyOrID(keyOrID any) *entity.Flag

GetByFlagKeyOrID gets the flag by Key or ID

func (*EvalCache) GetByTags

func (ec *EvalCache) GetByTags(tags []string, operator *string) []*entity.Flag

func (*EvalCache) GetETag

func (ec *EvalCache) GetETag() string

GetETag returns an ETag string based on the unix millisecond timestamp of the last cache reload. Using a timestamp ensures the ETag survives application restarts without false 304 responses.
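
A sketch of the conditional-request pattern this ETag enables; the route handler is illustrative:

func exportFlags(w http.ResponseWriter, r *http.Request) {
	etag := handler.GetEvalCache().GetETag()
	if r.Header.Get("If-None-Match") == etag {
		// The cache has not reloaded since the client's copy was made.
		w.WriteHeader(http.StatusNotModified)
		return
	}
	w.Header().Set("ETag", etag)
	// ... write the flags payload ...
}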

func (*EvalCache) Start

func (ec *EvalCache) Start()

Start starts the polling of EvalCache

type EvalCacheJSON

type EvalCacheJSON struct {
	Flags []entity.Flag
}

EvalCacheJSON is the JSON serialization format of EvalCache's flags
