command

package
v2.0.0-pre2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: Apache-2.0 Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
// Metric names, limiter size bounds and adaptive rate limiter defaults for the command package.
const (
	// Names of the metrics reported by the command rate limiter.
	MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count"
	MetricCommandLimiterWorkTimer          = "command.limiter.work_timer"

	// Default and minimum limiter sizes.
	// NOTE(review): presumably these bound RateLimiterConfig.QueueSize - confirm against NewRateLimiter.
	DefaultLimiterSize = 100
	MinLimiterSize     = 10

	// Default values for the adaptive rate limiter settings.
	// NOTE(review): presumably applied by AdaptiveRateLimiterConfig.SetDefaults - confirm.
	DefaultAdaptiveRateLimiterEnabled       = true
	DefaultAdaptiveRateLimiterMinWindowSize = 5
	DefaultAdaptiveRateLimiterMaxWindowSize = 250
	DefaultAdaptiveRateLimiterTimeout       = 30 * time.Second
)

Variables

This section is empty.

Functions

func LoadAdaptiveRateLimiterConfig

func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[interface{}]interface{}) error

func NewAdaptiveRateLimitTracker

func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimitTracker

func NewAdaptiveRateLimiter

func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimiter

func NewRateLimiter

func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.RateLimiter

func WasRateLimited

func WasRateLimited(err error) bool

Types

type AdaptiveRateLimiterConfig

// AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter.
type AdaptiveRateLimiterConfig struct {
	// Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting
	Enabled bool

	// MaxSize - the maximum window size to allow
	MaxSize uint32

	// MinSize - the smallest window size to allow
	MinSize uint32

	// WorkTimerMetric - the name of the timer metric for timing how long operations take to execute
	WorkTimerMetric string

	// QueueSizeMetric - the name of the gauge metric showing the current number of operations queued
	QueueSizeMetric string

	// WindowSizeMetric - the name of the metric showing the current window size
	WindowSizeMetric string

	// Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to
	//           have failed if it hasn't been marked completed yet, so that work slots aren't lost
	Timeout time.Duration
}

AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter

func (*AdaptiveRateLimiterConfig) SetDefaults

func (self *AdaptiveRateLimiterConfig) SetDefaults()

type Command

// Command instances represent actions to be taken by the fabric controller. They are
// serializable (see Encode), so they can be shipped between controllers for RAFT coordination.
type Command interface {
	// Apply runs the command
	Apply(ctx boltz.MutateContext) error

	// GetChangeContext returns the change context associated with the command
	GetChangeContext() *change.Context

	// Encode returns a serialized representation of the command
	Encode() ([]byte, error)
}

Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller to another for RAFT coordination.

type CreateEntityCommand

// CreateEntityCommand describes the creation of an entity of type T. It is the command type
// passed to EntityCreator.ApplyCreate, and the package lists Apply, Encode and GetChangeContext
// methods on it.
type CreateEntityCommand[T models.Entity] struct {
	// Context is the change context for this command.
	// NOTE(review): presumably what GetChangeContext returns - confirm.
	Context *change.Context
	// Creator is the EntityCreator for entities of type T.
	// NOTE(review): presumably Apply delegates to Creator.ApplyCreate - confirm.
	Creator EntityCreator[T]
	// Entity is the entity to be created.
	Entity T
	// PostCreateHook is an optional callback receiving the mutate context and the entity.
	// NOTE(review): presumably invoked after the entity has been created - confirm when and whether nil is allowed.
	PostCreateHook func(ctx boltz.MutateContext, entity T) error
	// Flags is a uint32 value carried with the command.
	// NOTE(review): the meaning of individual bits is not visible here - confirm.
	Flags uint32
}

func (*CreateEntityCommand[T]) Apply

func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*CreateEntityCommand[T]) Encode

func (self *CreateEntityCommand[T]) Encode() ([]byte, error)

func (*CreateEntityCommand[T]) GetChangeContext

func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context

type Decoder

// Decoder instances know how to decode encoded commands.
type Decoder interface {
	// Decode turns the encoded data for the given command type into a Command.
	Decode(commandType int32, data []byte) (Command, error)
}

Decoder instances know how to decode encoded commands

type DecoderF

// DecoderF is a function version of the Decoder interface; it has the same signature as Decoder.Decode.
type DecoderF func(commandType int32, data []byte) (Command, error)

DecoderF is a function version of the Decoder interface

func (DecoderF) Decode

func (self DecoderF) Decode(commandType int32, data []byte) (Command, error)

type Decoders

// Decoders is a collection of Decoder instances addressed by an int32 id.
type Decoders interface {
	// Register associates the given decoder with the given id.
	Register(id int32, decoder Decoder)
	// RegisterF is the DecoderF (function) counterpart of Register.
	RegisterF(id int32, decoder DecoderF)
	// Decode turns encoded data into a Command.
	// NOTE(review): presumably the command type id is read from data to select the decoder - confirm.
	Decode(data []byte) (Command, error)
	// GetDecoder returns the decoder for the given id.
	// NOTE(review): behavior for an unregistered id is not visible here - confirm.
	GetDecoder(id int32) Decoder
	// Clear presumably removes all registered decoders - TODO confirm.
	Clear()
}

func GetDefaultDecoders

func GetDefaultDecoders() Decoders

func NewDecoders

func NewDecoders() Decoders

type DefaultRateLimiter

// DefaultRateLimiter is a rate limiter with unexported state; the package lists RunRateLimited
// and GetQueueFillPct methods on it.
// NOTE(review): presumably the implementation returned by NewRateLimiter when rate limiting is enabled - confirm.
type DefaultRateLimiter struct {
	// contains filtered or unexported fields
}

func (*DefaultRateLimiter) GetQueueFillPct

func (self *DefaultRateLimiter) GetQueueFillPct() float64

func (*DefaultRateLimiter) RunRateLimited

func (self *DefaultRateLimiter) RunRateLimited(f func() error) error

type DeleteEntityCommand

// DeleteEntityCommand describes the deletion of an entity. It is the command type passed to
// EntityDeleter.ApplyDelete, and the package lists Apply, Encode and GetChangeContext methods on it.
type DeleteEntityCommand struct {
	// Context is the change context for this command.
	// NOTE(review): presumably what GetChangeContext returns - confirm.
	Context *change.Context
	// Deleter is the EntityDeleter for the entity's type.
	// NOTE(review): presumably Apply delegates to Deleter.ApplyDelete - confirm.
	Deleter EntityDeleter
	// Id presumably identifies the entity to delete - TODO confirm.
	Id string
}

func (*DeleteEntityCommand) Apply

func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error

func (*DeleteEntityCommand) Encode

func (self *DeleteEntityCommand) Encode() ([]byte, error)

func (*DeleteEntityCommand) GetChangeContext

func (self *DeleteEntityCommand) GetChangeContext() *change.Context

type Dispatcher

// Dispatcher instances will take a command and either send it to the leader to be applied, or,
// if the current system is the leader, apply it locally.
type Dispatcher interface {
	// Dispatch submits the command to be applied, either by the leader or locally.
	Dispatch(command Command) error
	IsLeaderOrLeaderless() bool
	IsLeaderless() bool
	IsLeader() bool
	GetPeers() map[string]channel.Channel
	GetRateLimiter() rate.RateLimiter
	Bootstrap() error
	// NOTE(review): the meaning of the uint64 result is not visible here - confirm.
	CtrlAddresses() (uint64, []string)
}

Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally

type EntityCreator

// EntityCreator instances can apply a create entity command to create entities of a given type.
// The interface embeds EntityMarshaller[T].
type EntityCreator[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyCreate creates the entity described by the given command
	ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityCreator instances can apply a create entity command to create entities of a given type

type EntityDeleter

// EntityDeleter instances can apply a delete entity command to delete entities of a given type.
type EntityDeleter interface {
	// GetEntityTypeId returns the entity type id
	GetEntityTypeId() string

	// ApplyDelete deletes the entity described by the given command
	ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error
}

EntityDeleter instances can apply a delete entity command to delete entities of a given type

type EntityManager

// EntityManager instances can handle creating, updating and deleting entities of a specific type.
// It is the union of EntityCreator[T], EntityUpdater[T] and EntityDeleter.
type EntityManager[T models.Entity] interface {
	EntityCreator[T]
	EntityUpdater[T]
	EntityDeleter
}

EntityManager instances can handle creating, updating and deleting entities of a specific type

type EntityMarshaller

// EntityMarshaller instances can marshal and unmarshal entities of the type that they manage,
// as well as knowing their entity type.
type EntityMarshaller[T any] interface {
	// GetEntityTypeId returns the entity type id. This is distinct from the Store entity id
	// which may be shared by types, such as service and router. The entity type is unique
	// for each type
	GetEntityTypeId() string

	// Marshall marshals the entity to a byte-encoded format
	Marshall(entity T) ([]byte, error)

	// Unmarshall unmarshals the bytes back into an entity
	Unmarshall(bytes []byte) (T, error)
}

EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type

type EntityUpdater

// EntityUpdater instances can apply an update entity command to update entities of a given type.
// The interface embeds EntityMarshaller[T].
type EntityUpdater[T models.Entity] interface {
	EntityMarshaller[T]

	// ApplyUpdate updates the entity described by the given command
	ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error
}

EntityUpdater instances can apply an update entity command to update entities of a given type

type LocalDispatcher

// LocalDispatcher should be used when running a non-clustered system. The package lists the
// full Dispatcher method set on *LocalDispatcher.
type LocalDispatcher struct {
	// EncodeDecodeCommands is a boolean toggle.
	// NOTE(review): presumably makes Dispatch round-trip commands through Encode and Decode before applying - confirm.
	EncodeDecodeCommands bool
	// Limiter is the rate limiter held by this dispatcher.
	// NOTE(review): presumably the value returned by GetRateLimiter - confirm.
	Limiter rate.RateLimiter
}

LocalDispatcher should be used when running a non-clustered system

func (*LocalDispatcher) Bootstrap

func (self *LocalDispatcher) Bootstrap() error

func (*LocalDispatcher) CtrlAddresses

func (self *LocalDispatcher) CtrlAddresses() (uint64, []string)

func (*LocalDispatcher) Dispatch

func (self *LocalDispatcher) Dispatch(command Command) error

func (*LocalDispatcher) GetPeers

func (self *LocalDispatcher) GetPeers() map[string]channel.Channel

func (*LocalDispatcher) GetRateLimiter

func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter

func (*LocalDispatcher) IsLeader

func (self *LocalDispatcher) IsLeader() bool

func (*LocalDispatcher) IsLeaderOrLeaderless

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool

func (*LocalDispatcher) IsLeaderless

func (self *LocalDispatcher) IsLeaderless() bool

type NoOpAdaptiveRateLimitTracker

// NoOpAdaptiveRateLimitTracker is a stateless (empty struct) type with IsRateLimited,
// RunRateLimited and RunRateLimitedF methods.
// NOTE(review): presumably the tracker created when AdaptiveRateLimiterConfig.Enabled is false - confirm.
type NoOpAdaptiveRateLimitTracker struct{}

func (NoOpAdaptiveRateLimitTracker) IsRateLimited

func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool

func (NoOpAdaptiveRateLimitTracker) RunRateLimited

func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF

func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control rate.RateLimitControl) error) error

type NoOpAdaptiveRateLimiter

// NoOpAdaptiveRateLimiter is a stateless (empty struct) type with a RunRateLimited method.
// NOTE(review): presumably the limiter created when AdaptiveRateLimiterConfig.Enabled is false - confirm.
type NoOpAdaptiveRateLimiter struct{}

func (NoOpAdaptiveRateLimiter) RunRateLimited

func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (rate.RateLimitControl, error)

type NoOpRateLimiter

// NoOpRateLimiter is a stateless (empty struct) type with RunRateLimited and GetQueueFillPct methods.
// NOTE(review): presumably the limiter created when RateLimiterConfig.Enabled is false - confirm.
type NoOpRateLimiter struct{}

func (NoOpRateLimiter) GetQueueFillPct

func (self NoOpRateLimiter) GetQueueFillPct() float64

func (NoOpRateLimiter) RunRateLimited

func (self NoOpRateLimiter) RunRateLimited(f func() error) error

type RateLimiterConfig

// RateLimiterConfig is the configuration type accepted by NewRateLimiter.
type RateLimiterConfig struct {
	// Enabled toggles rate limiting.
	// NOTE(review): presumably a no-op limiter is used when false - confirm.
	Enabled bool
	// QueueSize is the size of the limiter's queue.
	// NOTE(review): relation to DefaultLimiterSize and MinLimiterSize is not visible here - confirm.
	QueueSize uint32
}

type SyncSnapshotCommand

// SyncSnapshotCommand carries a snapshot as raw bytes. The package lists Apply, Encode and
// GetChangeContext methods on it.
type SyncSnapshotCommand struct {
	// TimelineId is a string identifier carried with the snapshot.
	// NOTE(review): its exact semantics are not visible here - confirm.
	TimelineId string
	// Snapshot is the snapshot content as bytes.
	Snapshot []byte
	// SnapshotSink is a callback receiving the command and an index.
	// NOTE(review): presumably called by Apply to persist the snapshot; the meaning of index is not visible here - confirm.
	SnapshotSink func(cmd *SyncSnapshotCommand, index uint64) error
}

func (*SyncSnapshotCommand) Apply

func (self *SyncSnapshotCommand) Apply(ctx boltz.MutateContext) error

func (*SyncSnapshotCommand) Encode

func (self *SyncSnapshotCommand) Encode() ([]byte, error)

func (*SyncSnapshotCommand) GetChangeContext

func (self *SyncSnapshotCommand) GetChangeContext() *change.Context

type UpdateEntityCommand

// UpdateEntityCommand describes an update to an entity of type T. It is the command type
// passed to EntityUpdater.ApplyUpdate, and the package lists Apply, Encode and GetChangeContext
// methods on it.
type UpdateEntityCommand[T models.Entity] struct {
	// Context is the change context for this command.
	// NOTE(review): presumably what GetChangeContext returns - confirm.
	Context *change.Context
	// Updater is the EntityUpdater for entities of type T.
	// NOTE(review): presumably Apply delegates to Updater.ApplyUpdate - confirm.
	Updater EntityUpdater[T]
	// Entity is the entity carrying the updated values.
	Entity T
	// UpdatedFields presumably identifies which fields of Entity should be applied - TODO confirm.
	UpdatedFields fields.UpdatedFields
	// Flags is a uint32 value carried with the command.
	// NOTE(review): the meaning of individual bits is not visible here - confirm.
	Flags uint32
}

func (*UpdateEntityCommand[T]) Apply

func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error

func (*UpdateEntityCommand[T]) Encode

func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)

func (*UpdateEntityCommand[T]) GetChangeContext

func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context

type Validatable

// Validatable instances can be validated. Command instances which implement Validatable will be
// validated before Command.Apply is called.
type Validatable interface {
	// Validate returns an error if the instance is not valid.
	Validate() error
}

Validatable instances can be validated. Command instances which implement Validatable will be validated before Command.Apply is called

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL