Documentation
¶
Index ¶
- Constants
- func NewAdaptiveRateLimitTracker(config AdaptiveRateLimitTrackerConfig, registry metrics.Registry, ...) rate.AdaptiveRateLimitTracker
- func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, ...) rate.AdaptiveRateLimiter
- func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, ...) rate.RateLimiter
- func WasRateLimited(err error) bool
- type AdaptiveRateLimitTrackerConfig
- type AdaptiveRateLimiterConfig
- type Command
- type CreateEntityCommand
- type Decoder
- type DecoderF
- type Decoders
- type DefaultRateLimiter
- type DeleteEntityCommand
- type Dispatcher
- type EntityCreator
- type EntityDeleter
- type EntityManager
- type EntityMarshaller
- type EntityUpdater
- type LocalDispatcher
- func (self *LocalDispatcher) Bootstrap() error
- func (self *LocalDispatcher) CtrlAddresses() (uint64, []string, []*ctrl_pb.CtrlDetail)
- func (self *LocalDispatcher) Dispatch(command Command) error
- func (self *LocalDispatcher) GetPeers() map[string]channel.Channel
- func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter
- func (self *LocalDispatcher) IsLeader() bool
- func (self *LocalDispatcher) IsLeaderOrLeaderless() bool
- func (self *LocalDispatcher) IsLeaderless() bool
- type NoOpAdaptiveRateLimitTracker
- type NoOpAdaptiveRateLimiter
- type NoOpRateLimiter
- type RateLimiterConfig
- type SyncSnapshotCommand
- type UpdateEntityCommand
- type Validatable
Constants ¶
const ( MetricCommandLimiterCurrentQueuedCount = "command.limiter.queued_count" MetricCommandLimiterWorkTimer = "command.limiter.work_timer" DefaultLimiterSize = 100 MinLimiterSize = 10 DefaultAdaptiveRateLimiterEnabled = true DefaultAdaptiveRateLimiterMinWindowSize = 5 DefaultAdaptiveRateLimiterMaxWindowSize = 250 DefaultAdaptiveRateLimiterTimeout = 30 * time.Second DefaultAdaptiveRateLimiterSuccessThreshold = 0.9 DefaultAdaptiveRateLimiterIncreaseFactor = 1.02 DefaultAdaptiveRateLimiterDecreaseFactor = 0.9 DefaultAdaptiveRateLimiterIncreaseCheckInterval = 10 DefaultAdaptiveRateLimiterDecreaseCheckInterval = 10 )
Variables ¶
This section is empty.
Functions ¶
func NewAdaptiveRateLimitTracker ¶
func NewAdaptiveRateLimitTracker(config AdaptiveRateLimitTrackerConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimitTracker
NewAdaptiveRateLimitTracker creates a new adaptive rate limit tracker using the given configuration. If the configuration has Enabled set to false, a NoOpAdaptiveRateLimitTracker will be returned. Unlike the AdaptiveRateLimiter, the tracker does not execute work directly. Instead it tracks outstanding work and adjusts the window size based on the success rate of completed work.
func NewAdaptiveRateLimiter ¶
func NewAdaptiveRateLimiter(config AdaptiveRateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.AdaptiveRateLimiter
NewAdaptiveRateLimiter creates a new adaptive rate limiter using the given configuration. If the configuration has Enabled set to false, a NoOpAdaptiveRateLimiter will be returned
func NewRateLimiter ¶
func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNotify <-chan struct{}) rate.RateLimiter
NewRateLimiter creates a new rate limiter using the given configuration. If the configuration has Enabled set to false, a NoOpRateLimiter will be returned
func WasRateLimited ¶
WasRateLimited returns true if the given error indicates that a request was rejected due to rate limiting
Types ¶
type AdaptiveRateLimitTrackerConfig ¶
type AdaptiveRateLimitTrackerConfig struct {
AdaptiveRateLimiterConfig
// SuccessThreshold - the success rate threshold above which the window size will be increased and
// below which the window size will be decreased
SuccessThreshold float64
// IncreaseFactor - the multiplier applied to the current window size when increasing it
IncreaseFactor float64
// DecreaseFactor - the multiplier applied to the current window size when decreasing it
DecreaseFactor float64
// IncreaseCheckInterval - the number of successes between window size increase checks
IncreaseCheckInterval int
// DecreaseCheckInterval - the number of backoffs between window size decrease checks
DecreaseCheckInterval int
}
AdaptiveRateLimitTrackerConfig contains configuration values used to create a new AdaptiveRateLimitTracker
func (*AdaptiveRateLimitTrackerConfig) Load ¶
func (cfg *AdaptiveRateLimitTrackerConfig) Load(cfgmap map[interface{}]interface{}) error
Load reads the configuration values from the given config map
func (*AdaptiveRateLimitTrackerConfig) SetDefaults ¶
func (self *AdaptiveRateLimitTrackerConfig) SetDefaults()
SetDefaults sets the default values for the AdaptiveRateLimitTrackerConfig
type AdaptiveRateLimiterConfig ¶
type AdaptiveRateLimiterConfig struct {
// Enabled - if false, a no-op rate limiter will be created, which doesn't enforce any rate limiting
Enabled bool
// MaxSize - the maximum window size to allow
MaxSize uint32
// MinSize - the smallest window size to allow
MinSize uint32
// WorkTimerMetric - the name of the timer metric for timing how long operations take to execute
WorkTimerMetric string
// QueueSize - the name of the gauge metric showing the current number of operations queued
QueueSizeMetric string
// WindowSizeMetric - the name of the metric show the current window size
WindowSizeMetric string
// Timeout - only used for AdaptiveRateLimitTracker, sets when a piece of outstanding work will be assumed to
// have failed if it hasn't been marked completed yet, so that work slots aren't lost
Timeout time.Duration
}
AdaptiveRateLimiterConfig contains configuration values used to create a new AdaptiveRateLimiter
func (*AdaptiveRateLimiterConfig) Load ¶
func (cfg *AdaptiveRateLimiterConfig) Load(cfgmap map[interface{}]interface{}) error
Load reads the configuration values from the given config map
func (*AdaptiveRateLimiterConfig) SetDefaults ¶
func (self *AdaptiveRateLimiterConfig) SetDefaults()
SetDefaults sets the default values for the AdaptiveRateLimiterConfig
type Command ¶
type Command interface {
// Apply runs the commands
Apply(ctx boltz.MutateContext) error
// GetChangeContext returns the change context associated with the command
GetChangeContext() *change.Context
// Encode returns a serialized representation of the command
Encode() ([]byte, error)
}
Command instances represent actions to be taken by the fabric controller. They are serializable, so they can be shipped from one controller for RAFT coordination
type CreateEntityCommand ¶
type CreateEntityCommand[T models.Entity] struct { Context *change.Context Creator EntityCreator[T] Entity T PostCreateHook func(ctx boltz.MutateContext, entity T) error Flags uint32 }
func (*CreateEntityCommand[T]) Apply ¶
func (self *CreateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*CreateEntityCommand[T]) Encode ¶
func (self *CreateEntityCommand[T]) Encode() ([]byte, error)
func (*CreateEntityCommand[T]) GetChangeContext ¶
func (self *CreateEntityCommand[T]) GetChangeContext() *change.Context
type Decoders ¶
type Decoders interface {
Register(id int32, decoder Decoder)
RegisterF(id int32, decoder DecoderF)
Decode(data []byte) (Command, error)
GetDecoder(id int32) Decoder
Clear()
}
func GetDefaultDecoders ¶
func GetDefaultDecoders() Decoders
func NewDecoders ¶
func NewDecoders() Decoders
type DefaultRateLimiter ¶
type DefaultRateLimiter struct {
// contains filtered or unexported fields
}
DefaultRateLimiter implements rate.RateLimiter using a fixed-size buffered channel as a work queue
func (*DefaultRateLimiter) GetQueueFillPct ¶
func (self *DefaultRateLimiter) GetQueueFillPct() float64
func (*DefaultRateLimiter) RunRateLimited ¶
func (self *DefaultRateLimiter) RunRateLimited(f func() error) error
type DeleteEntityCommand ¶
type DeleteEntityCommand struct {
Context *change.Context
Deleter EntityDeleter
Id string
}
func (*DeleteEntityCommand) Apply ¶
func (self *DeleteEntityCommand) Apply(ctx boltz.MutateContext) error
func (*DeleteEntityCommand) Encode ¶
func (self *DeleteEntityCommand) Encode() ([]byte, error)
func (*DeleteEntityCommand) GetChangeContext ¶
func (self *DeleteEntityCommand) GetChangeContext() *change.Context
type Dispatcher ¶
type Dispatcher interface {
Dispatch(command Command) error
IsLeaderOrLeaderless() bool
IsLeaderless() bool
IsLeader() bool
GetPeers() map[string]channel.Channel
GetRateLimiter() rate.RateLimiter
Bootstrap() error
CtrlAddresses() (uint64, []string, []*ctrl_pb.CtrlDetail)
}
Dispatcher instances will take a command and either send it to the leader to be applied, or if the current system is the leader, apply it locally
type EntityCreator ¶
type EntityCreator[T models.Entity] interface { EntityMarshaller[T] // ApplyCreate creates the entity described by the given command ApplyCreate(cmd *CreateEntityCommand[T], ctx boltz.MutateContext) error }
EntityCreator instances can apply a create entity command to create entities of a given type
type EntityDeleter ¶
type EntityDeleter interface {
GetEntityTypeId() string
// ApplyDelete deletes the entity described by the given command
ApplyDelete(cmd *DeleteEntityCommand, ctx boltz.MutateContext) error
}
EntityDeleter instances can apply a delete entity command to delete entities of a given type
type EntityManager ¶
type EntityManager[T models.Entity] interface { EntityCreator[T] EntityUpdater[T] EntityDeleter }
EntityManager instances can handle create, update and delete entities of a specific type
type EntityMarshaller ¶
type EntityMarshaller[T any] interface { // GetEntityTypeId returns the entity type id. This is distinct from the Store entity id // which may be shared by types, such as service and router. The entity type is unique // for each type GetEntityTypeId() string // Marshall marshals the entity to a bytes encoded format Marshall(entity T) ([]byte, error) // Unmarshall unmarshalls the bytes back into an entity Unmarshall(bytes []byte) (T, error) }
EntityMarshaller instances can marshal and unmarshal entities of the type that they manage as well as knowing their entity type
type EntityUpdater ¶
type EntityUpdater[T models.Entity] interface { EntityMarshaller[T] // ApplyUpdate updates the entity described by the given command ApplyUpdate(cmd *UpdateEntityCommand[T], ctx boltz.MutateContext) error }
EntityUpdater instances can apply an update entity command to update entities of a given type
type LocalDispatcher ¶
type LocalDispatcher struct {
EncodeDecodeCommands bool
Limiter rate.RateLimiter
}
LocalDispatcher should be used when running a non-clustered system
func (*LocalDispatcher) Bootstrap ¶
func (self *LocalDispatcher) Bootstrap() error
func (*LocalDispatcher) CtrlAddresses ¶
func (self *LocalDispatcher) CtrlAddresses() (uint64, []string, []*ctrl_pb.CtrlDetail)
func (*LocalDispatcher) Dispatch ¶
func (self *LocalDispatcher) Dispatch(command Command) error
func (*LocalDispatcher) GetPeers ¶
func (self *LocalDispatcher) GetPeers() map[string]channel.Channel
func (*LocalDispatcher) GetRateLimiter ¶
func (self *LocalDispatcher) GetRateLimiter() rate.RateLimiter
func (*LocalDispatcher) IsLeader ¶
func (self *LocalDispatcher) IsLeader() bool
func (*LocalDispatcher) IsLeaderOrLeaderless ¶
func (self *LocalDispatcher) IsLeaderOrLeaderless() bool
func (*LocalDispatcher) IsLeaderless ¶
func (self *LocalDispatcher) IsLeaderless() bool
type NoOpAdaptiveRateLimitTracker ¶
type NoOpAdaptiveRateLimitTracker struct{}
NoOpAdaptiveRateLimitTracker is an adaptive rate limit tracker that doesn't enforce any rate limiting
func (NoOpAdaptiveRateLimitTracker) IsRateLimited ¶
func (n NoOpAdaptiveRateLimitTracker) IsRateLimited() bool
func (NoOpAdaptiveRateLimitTracker) RunRateLimited ¶
func (n NoOpAdaptiveRateLimitTracker) RunRateLimited(string) (rate.RateLimitControl, error)
func (NoOpAdaptiveRateLimitTracker) RunRateLimitedF ¶
func (n NoOpAdaptiveRateLimitTracker) RunRateLimitedF(_ string, f func(control rate.RateLimitControl) error) error
type NoOpAdaptiveRateLimiter ¶
type NoOpAdaptiveRateLimiter struct{}
NoOpAdaptiveRateLimiter is an adaptive rate limiter that doesn't enforce any rate limiting
func (NoOpAdaptiveRateLimiter) RunRateLimited ¶
func (self NoOpAdaptiveRateLimiter) RunRateLimited(f func() error) (rate.RateLimitControl, error)
type NoOpRateLimiter ¶
type NoOpRateLimiter struct{}
NoOpRateLimiter is a rate limiter that doesn't enforce any rate limiting
func (NoOpRateLimiter) GetQueueFillPct ¶
func (self NoOpRateLimiter) GetQueueFillPct() float64
func (NoOpRateLimiter) RunRateLimited ¶
func (self NoOpRateLimiter) RunRateLimited(f func() error) error
type RateLimiterConfig ¶
RateLimiterConfig contains configuration values used to create a new DefaultRateLimiter
type SyncSnapshotCommand ¶
type SyncSnapshotCommand struct {
TimelineId string
Snapshot []byte
SnapshotSink func(cmd *SyncSnapshotCommand, index uint64) error
}
func (*SyncSnapshotCommand) Apply ¶
func (self *SyncSnapshotCommand) Apply(ctx boltz.MutateContext) error
func (*SyncSnapshotCommand) Encode ¶
func (self *SyncSnapshotCommand) Encode() ([]byte, error)
func (*SyncSnapshotCommand) GetChangeContext ¶
func (self *SyncSnapshotCommand) GetChangeContext() *change.Context
type UpdateEntityCommand ¶
type UpdateEntityCommand[T models.Entity] struct { Context *change.Context Updater EntityUpdater[T] Entity T UpdatedFields fields.UpdatedFields Flags uint32 }
func (*UpdateEntityCommand[T]) Apply ¶
func (self *UpdateEntityCommand[T]) Apply(ctx boltz.MutateContext) error
func (*UpdateEntityCommand[T]) Encode ¶
func (self *UpdateEntityCommand[T]) Encode() ([]byte, error)
func (*UpdateEntityCommand[T]) GetChangeContext ¶
func (self *UpdateEntityCommand[T]) GetChangeContext() *change.Context
type Validatable ¶
type Validatable interface {
Validate() error
}
Validatable instances can be validated. Command instances which implement Validable will be validated before Command.Apply is called