command

package
v2.0.0-pre5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: Apache-2.0 Imports: 25 Imported by: 1

Documentation

Index

Constants

View Source
const (
	MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count"
	MetricCommandLimiterWorkTimer          = "command.limiter.work_timer"

	DefaultLimiterSize = 100
	MinLimiterSize     = 10

	DefaultAdaptiveRateLimiterEnabled       = true
	DefaultAdaptiveRateLimiterMinWindowSize = 5
	DefaultAdaptiveRateLimiterMaxWindowSize = 250
	DefaultAdaptiveRateLimiterTimeout       = 30 * time.Second

	DefaultAdaptiveRateLimiterSuccessThreshold      = 0.9
	DefaultAdaptiveRateLimiterIncreaseFactor        = 1.02
	DefaultAdaptiveRateLimiterDecreaseFactor        = 0.9
	DefaultAdaptiveRateLimiterIncreaseCheckInterval = 10
	DefaultAdaptiveRateLimiterDecreaseCheckInterval = 10
)

Variables

This section is empty.

Functions

func NewAdaptiveRateLimitTracker

func NewAdaptiveRateLimitTracker(config AdaptiveRateLimitTrackerConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimitTracker

NewAdaptiveRateLimitTracker creates a new adaptive rate limit tracker using the given configuration. If the configuration has Enabled set to false, a NoOpAdaptiveRateLimitTracker will be returned. Unlike the AdaptiveRateLimiter, the tracker does not execute work directly. Instead it tracks outstanding work and adjusts the window size based on the success rate of completed work.

func NewAdaptiveRateLimiter

func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimiter

NewAdaptiveRateLimiter creates a new adaptive rate limiter using the given configuration. If the configuration has Enabled set to false, a NoOpAdaptiveRateLimiter will be returned

func NewRateLimiter

func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.RateLimiter

NewRateLimiter creates a new rate limiter using the given configuration. If the configuration has Enabled set to false, a NoOpRateLimiter will be returned

func WasRateLimited

func WasRateLimited(err error) bool

WasRateLimited returns true if the given error indicates that a request was rejected due to rate limiting

Types

type AdaptiveRateLimitTrackerConfig

type AdaptiveRateLimitTrackerConfig struct {
	AdaptiveRateLimiterConfig

	// SuccessThreshold - the success rate threshold above which the window size will be increased and
	//                    below which the window size will be decreased
	SuccessThreshold float64

	// IncreaseFactor - the multiplier applied to the current window size when increasing it
	IncreaseFactor float64

	// DecreaseFactor - the multiplier applied to the current window size when decreasing it
	DecreaseFactor float64

	// IncreaseCheckInterval - the number of successes between window size increase checks
	IncreaseCheckInterval int

	// DecreaseCheckInterval - the number of backoffs between window size decrease checks
	DecreaseCheckInterval int
}

AdaptiveRateLimitTrackerConfig contains configuration values used to create a new AdaptiveRateLimitTracker

func (*AdaptiveRateLimitTrackerConfig) Load

func (cfg *AdaptiveRateLimitTrackerConfig) Load(cfgmap map[interface{}]interface{}) error

Load reads the configuration values from the given config map

func (*AdaptiveRateLimitTrackerConfig) SetDefaults

func (self *AdaptiveRateLimitTrackerConfig) SetDefaults()

SetDefaults sets the default values for the AdaptiveRateLimitTrackerConfig

type AdaptiveRateLimiterConfig

type AdaptiveRateLimiterConfig struct {
	// Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting
	Enabled bool

	// MaxSize - the maximum window size to allow
	MaxSize uint32

	// MinSize - the smallest window size to allow
	MinSize uint32

	// WorkTimerMetric - the name of the timer metric for timing how long operations take to execute
	WorkTimerMetric string

	// QueueSize - the name of the gauge metric showing the current number of operations queued
	QueueSizeMetric string

	// WindowSizeMetric - the name of the metric show the current window size
	WindowSizeMetric string

	// Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to
	//           have failed if it hasn't been marked completed yet, so that work slots aren't lost
	Timeout time.Duration
}

AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter

func (*AdaptiveRateLimiterConfig) Load

func (cfg *AdaptiveRateLimiterConfig) Load(cfgmap map[interface{}]interface{}) error

Load reads the configuration values from the given config map

func (*AdaptiveRateLimiterConfig) SetDefaults

func (self *AdaptiveRateLimiterConfig) SetDefaults()

SetDefaults sets the default values for the AdaptiveRateLimiterConfig

type Command

type Command interface {
	// Apply runs the commands
	Apply(ctx boltz.MutateContext) error

	// GetChangeContext returns the change context associated with the command
	GetChangeContext() *change.Context

	// Encode returns a serialized representation of the command
	Encode() ([]byte, error)
}

Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller for RAFT coordination

type CreateEntityCommand

type CreateEntityCommand[T models.Entity] struct {
	Context        *change.Context
	Creator        EntityCreator[T]
	Entity         T
	PostCreateHook func(ctx boltz.MutateContext, entity T) error
	Flags          uint32
}

func (*CreateEntityCommand[T]) Apply

func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*CreateEntityCommand[T]) Encode

func (self *CreateEntityCommand[T]) Encode() ([]byte, error)

func (*CreateEntityCommand[T]) GetChangeContext

func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context

type Decoder

type Decoder interface {
	Decode(commandType int32, data []byte) (Command, error)
}

Decoder instances know how to decode encoded commands

type DecoderF

type DecoderF func(commandType int32, data []byte) (Command, error)

DecoderF is a function version of the Decoder interface

func (DecoderF) Decode

func (self DecoderF) Decode(commandType int32, data []byte) (Command, error)

type Decoders

type Decoders interface {
	Register(id int32, decoder Decoder)
	RegisterF(id int32, decoder DecoderF)
	Decode(data []byte) (Command, error)
	GetDecoder(id int32) Decoder
	Clear()
}

func GetDefaultDecoders

func GetDefaultDecoders() Decoders

func NewDecoders

func NewDecoders() Decoders

type DefaultRateLimiter

type DefaultRateLimiter struct {
	// contains filtered or unexported fields
}

DefaultRateLimiter implements rate.RateLimiter using a fixed-size buffered channel as a work queue

func (*DefaultRateLimiter) GetQueueFillPct

func (self *DefaultRateLimiter) GetQueueFillPct() float64

func (*DefaultRateLimiter) RunRateLimited

func (self *DefaultRateLimiter) RunRateLimited(f func() error) error

type DeleteEntityCommand

type DeleteEntityCommand struct {
	Context *change.Context
	Deleter EntityDeleter
	Id      string
}

func (*DeleteEntityCommand) Apply

func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error

func (*DeleteEntityCommand) Encode

func (self *DeleteEntityCommand) Encode() ([]byte, error)

func (*DeleteEntityCommand) GetChangeContext

func (self *DeleteEntityCommand) GetChangeContext() *change.Context

type Dispatcher

type Dispatcher interface {
	Dispatch(command Command) error
	IsLeaderOrLeaderless() bool
	IsLeaderless() bool
	IsLeader() bool
	GetPeers() map[string]channel.Channel
	GetRateLimiter() rate.RateLimiter
	Bootstrap() error
	CtrlAddresses() (uint64, []string, []*ctrl_pb.CtrlDetail)
}

Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally

type EntityCreator

type EntityCreator[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyCreate creates the entity described by the given command
	ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityCreator instances can apply a create entity command to create entities of a given type

type EntityDeleter

type EntityDeleter interface {
	GetEntityTypeId() string

	// ApplyDelete deletes the entity described by the given command
	ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error
}

EntityDeleter instances can apply a delete entity command to delete entities of a given type

type EntityManager

type EntityManager[T models.Entity] interface {
	EntityCreator[T]
	EntityUpdater[T]
	EntityDeleter
}

EntityManager instances can handle create, update and delete entities of a specific type

type EntityMarshaller

type EntityMarshaller[T any] interface {
	// GetEntityTypeId returns the entity type id. This is distinct from the Store entity id
	// which may be shared by types, such as service and router. The entity type is unique
	// for each type
	GetEntityTypeId() string

	// Marshall marshals the entity to a bytes encoded format
	Marshall(entity T) ([]byte, error)

	// Unmarshall unmarshalls the bytes back into an entity
	Unmarshall(bytes []byte) (T, error)
}

EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type

type EntityUpdater

type EntityUpdater[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyUpdate updates the entity described by the given command
	ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityUpdater instances can apply an update entity command to update entities of a given type

type LocalDispatcher

type LocalDispatcher struct {
	EncodeDecodeCommands bool
	Limiter              rate.RateLimiter
}

LocalDispatcher should be used when running a non-clustered system

func (*LocalDispatcher) Bootstrap

func (self *LocalDispatcher) Bootstrap() error

func (*LocalDispatcher) CtrlAddresses

func (self *LocalDispatcher) CtrlAddresses() (uint64, []string, []*ctrl_pb.CtrlDetail)

func (*LocalDispatcher) Dispatch

func (self *LocalDispatcher) Dispatch(command Command) error

func (*LocalDispatcher) GetPeers

func (self *LocalDispatcher) GetPeers() map[string]channel.Channel

func (*LocalDispatcher) GetRateLimiter

func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter

func (*LocalDispatcher) IsLeader

func (self *LocalDispatcher) IsLeader() bool

func (*LocalDispatcher) IsLeaderOrLeaderless

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool

func (*LocalDispatcher) IsLeaderless

func (self *LocalDispatcher) IsLeaderless() bool

type NoOpAdaptiveRateLimitTracker

type NoOpAdaptiveRateLimitTracker struct{}

NoOpAdaptiveRateLimitTracker is an adaptive rate limit tracker that doesn't enforce any rate limiting

func (NoOpAdaptiveRateLimitTracker) IsRateLimited

func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool

func (NoOpAdaptiveRateLimitTracker) RunRateLimited

func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF

func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control rate.RateLimitControl) error) error

type NoOpAdaptiveRateLimiter

type NoOpAdaptiveRateLimiter struct{}

NoOpAdaptiveRateLimiter is an adaptive rate limiter that doesn't enforce any rate limiting

func (NoOpAdaptiveRateLimiter) RunRateLimited

func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (rate.RateLimitControl, error)

type NoOpRateLimiter

type NoOpRateLimiter struct{}

NoOpRateLimiter is a rate limiter that doesn't enforce any rate limiting

func (NoOpRateLimiter) GetQueueFillPct

func (self NoOpRateLimiter) GetQueueFillPct() float64

func (NoOpRateLimiter) RunRateLimited

func (self NoOpRateLimiter) RunRateLimited(f func() error) error

type RateLimiterConfig

type RateLimiterConfig struct {
	Enabled   bool
	QueueSize uint32
}

RateLimiterConfig contains configuration values used to create a new DefaultRateLimiter

type SyncSnapshotCommand

type SyncSnapshotCommand struct {
	TimelineId   string
	Snapshot     []byte
	SnapshotSink func(cmd *SyncSnapshotCommand, index uint64) error
}

func (*SyncSnapshotCommand) Apply

func (self *SyncSnapshotCommand) Apply(ctx boltz.MutateContext) error

func (*SyncSnapshotCommand) Encode

func (self *SyncSnapshotCommand) Encode() ([]byte, error)

func (*SyncSnapshotCommand) GetChangeContext

func (self *SyncSnapshotCommand) GetChangeContext() *change.Context

type UpdateEntityCommand

type UpdateEntityCommand[T models.Entity] struct {
	Context       *change.Context
	Updater       EntityUpdater[T]
	Entity        T
	UpdatedFields fields.UpdatedFields
	Flags         uint32
}

func (*UpdateEntityCommand[T]) Apply

func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*UpdateEntityCommand[T]) Encode

func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)

func (*UpdateEntityCommand[T]) GetChangeContext

func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context

type Validatable

type Validatable interface {
	Validate() error
}

Validatable instances can be validated. Command instances which implement Validable will be validated before Command.Apply is called

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL