synchronizer

package
v8.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Ready represents ready
	Ready = iota
	// StreamingReady ready
	StreamingReady
	// Error represents some error in SSE streaming
	Error
)
View Source
const (
	Streaming = iota
	Polling
)

Operation mode constants

Variables

This section is empty.

Functions

This section is empty.

Types

type Local

type Local struct {
	// contains filtered or unexported fields
}

Local implements the Synchronizer interface for localhost mode

func (*Local) LocalKill

func (s *Local) LocalKill(splitName string, defaultTreatment string, changeNumber int64)

LocalKill local kill without any logic for localhost mode

func (*Local) RefreshRates

func (s *Local) RefreshRates() (time.Duration, time.Duration)

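RefreshRates returns the split and segment refresh durations required by the Synchronizer interface; for Local the returned values are arbitrary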

func (*Local) StartPeriodicDataRecording

func (s *Local) StartPeriodicDataRecording()

StartPeriodicDataRecording starts periodic recorders tasks

func (*Local) StartPeriodicFetching

func (s *Local) StartPeriodicFetching()

StartPeriodicFetching starts periodic fetchers tasks

func (*Local) StopPeriodicDataRecording

func (s *Local) StopPeriodicDataRecording()

StopPeriodicDataRecording stops periodic recorders tasks

func (*Local) StopPeriodicFetching

func (s *Local) StopPeriodicFetching()

StopPeriodicFetching stops periodic fetchers tasks

func (*Local) SyncAll

func (s *Local) SyncAll() error

SyncAll syncs splits and segments

func (*Local) SynchronizeFeatureFlags

func (s *Local) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) error

SynchronizeFeatureFlags no logic attached for localhost mode

func (*Local) SynchronizeLargeSegment

func (s *Local) SynchronizeLargeSegment(name string, till *int64) error

SynchronizeLargeSegment syncs large segment

func (*Local) SynchronizeLargeSegmentUpdate

func (s *Local) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) error

SynchronizeLargeSegmentUpdate syncs a large segment from the given update

func (*Local) SynchronizeSegment

func (s *Local) SynchronizeSegment(name string, till *int64) error

SynchronizeSegment syncs segment

type LocalConfig

type LocalConfig struct {
	SplitPeriod      int
	SegmentPeriod    int
	SegmentWorkers   int
	QueueSize        int
	SegmentDirectory string
	RefreshEnabled   bool
	FlagSets         []string
	FfRulesAccepted  []string
	RbRulesAccepted  []string
}

type Manager

type Manager interface {
	Start()
	Stop()
	IsRunning() bool
	StartBGSync(status chan int, shouldRetry bool, onReady func()) error
}

Manager interface

func NewSynchronizerManagerRedis

func NewSynchronizerManagerRedis(synchronizer Synchronizer, logger logging.LoggerInterface) Manager

NewSynchronizerManagerRedis creates new sync manager for redis

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

ManagerImpl struct

func NewSynchronizerManager

func NewSynchronizerManager(
	synchronizer Synchronizer,
	logger logging.LoggerInterface,
	config conf.AdvancedConfig,
	authClient service.AuthClient,
	ffStorage storage.SplitStorage,
	managerStatus chan int,
	runtimeTelemetry storage.TelemetryRuntimeProducer,
	metadata dtos.Metadata,
	clientKey *string,
	hcMonitor hc.MonitorProducerInterface,
) (*ManagerImpl, error)

NewSynchronizerManager creates new sync manager

func (*ManagerImpl) IsRunning

func (s *ManagerImpl) IsRunning() bool

IsRunning returns true if the manager is in Streaming or Polling mode

func (*ManagerImpl) Start

func (s *ManagerImpl) Start()

Start starts synchronization through Split

func (*ManagerImpl) StartBGSync

func (m *ManagerImpl) StartBGSync(status chan int, shouldRetry bool, onReady func()) error

func (*ManagerImpl) Stop

func (s *ManagerImpl) Stop()

Stop stops synchronization through Split

type ManagerRedisImpl

type ManagerRedisImpl struct {
	// contains filtered or unexported fields
}

ManagerRedisImpl struct

func (*ManagerRedisImpl) IsRunning

func (m *ManagerRedisImpl) IsRunning() bool

func (*ManagerRedisImpl) Start

func (m *ManagerRedisImpl) Start()

func (*ManagerRedisImpl) StartBGSync

func (m *ManagerRedisImpl) StartBGSync(status chan int, shouldRetry bool, onReady func()) error

func (*ManagerRedisImpl) Stop

func (m *ManagerRedisImpl) Stop()

type SplitTasks

type SplitTasks struct {
	SplitSyncTask            *asynctask.AsyncTask
	SegmentSyncTask          *asynctask.AsyncTask
	LargeSegmentSyncTask     *asynctask.AsyncTask
	TelemetrySyncTask        tasks.Task
	ImpressionSyncTask       tasks.Task
	EventSyncTask            tasks.Task
	ImpressionsCountSyncTask tasks.Task
	UniqueKeysTask           tasks.Task
	CleanFilterTask          tasks.Task
	ImpsCountConsumerTask    tasks.Task
}

SplitTasks struct for tasks

type Synchronizer

type Synchronizer interface {
	SyncAll() error
	SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) error
	LocalKill(splitName string, defaultTreatment string, changeNumber int64)
	SynchronizeSegment(segmentName string, till *int64) error
	StartPeriodicFetching()
	StopPeriodicFetching()
	StartPeriodicDataRecording()
	StopPeriodicDataRecording()
	RefreshRates() (splits time.Duration, segments time.Duration)
	SynchronizeLargeSegment(name string, till *int64) error
	SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) error
}

Synchronizer interface for syncing data to and from splits servers

func NewLocal

func NewLocal(cfg *LocalConfig, splitAPI *api.SplitAPI, splitStorage storage.SplitStorage, segmentStorage storage.SegmentStorage, largeSegmentStorage storage.LargeSegmentsStorage, ruleBasedStorage storage.RuleBasedSegmentsStorage, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer, hcMonitor application.MonitorProducerInterface) Synchronizer

NewLocal creates new Local

func NewSynchronizer

func NewSynchronizer(
	confAdvanced conf.AdvancedConfig,
	splitTasks SplitTasks,
	workers Workers,
	logger logging.LoggerInterface,
	inMememoryFullQueue chan string,
) Synchronizer

NewSynchronizer creates new SynchronizerImpl

type SynchronizerImpl

type SynchronizerImpl struct {
	// contains filtered or unexported fields
}

SynchronizerImpl implements Synchronizer

func (*SynchronizerImpl) LocalKill

func (s *SynchronizerImpl) LocalKill(splitName string, defaultTreatment string, changeNumber int64)

LocalKill locally kills a split

func (*SynchronizerImpl) RefreshRates

func (s *SynchronizerImpl) RefreshRates() (splits time.Duration, segments time.Duration)

RefreshRates returns the refresh rates of the splits & segment tasks

func (*SynchronizerImpl) StartPeriodicDataRecording

func (s *SynchronizerImpl) StartPeriodicDataRecording()

StartPeriodicDataRecording starts periodic recorders tasks

func (*SynchronizerImpl) StartPeriodicFetching

func (s *SynchronizerImpl) StartPeriodicFetching()

StartPeriodicFetching starts periodic fetchers tasks

func (*SynchronizerImpl) StopPeriodicDataRecording

func (s *SynchronizerImpl) StopPeriodicDataRecording()

StopPeriodicDataRecording stops periodic recorders tasks

func (*SynchronizerImpl) StopPeriodicFetching

func (s *SynchronizerImpl) StopPeriodicFetching()

StopPeriodicFetching stops periodic fetchers tasks

func (*SynchronizerImpl) SyncAll

func (s *SynchronizerImpl) SyncAll() error

SyncAll syncs splits and segments

func (*SynchronizerImpl) SynchronizeFeatureFlags

func (s *SynchronizerImpl) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) error

SynchronizeFeatureFlags syncs featureFlags

func (*SynchronizerImpl) SynchronizeLargeSegment

func (s *SynchronizerImpl) SynchronizeLargeSegment(name string, till *int64) error

SynchronizeLargeSegment syncs large segment

func (*SynchronizerImpl) SynchronizeLargeSegmentUpdate

func (s *SynchronizerImpl) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) error

func (*SynchronizerImpl) SynchronizeSegment

func (s *SynchronizerImpl) SynchronizeSegment(name string, till *int64) error

SynchronizeSegment syncs segment

type Workers

type Workers struct {
	SplitUpdater             split.Updater
	SegmentUpdater           segment.Updater
	LargeSegmentUpdater      largesegment.Updater
	TelemetryRecorder        telemetry.TelemetrySynchronizer
	ImpressionRecorder       impression.ImpressionRecorder
	EventRecorder            event.EventRecorder
	ImpressionsCountRecorder impressionscount.ImpressionsCountRecorder
}

Workers struct for workers

Directories

Path Synopsis
worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL