Documentation
¶
Index ¶
- func Fibonacci(n int) int
- func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)
- type BuffStrategy
- type BulkEventWriter
- type BulkQueueStepRunOpts
- type BulkSemaphoreReleaser
- type BulkStepRunQueuer
- type ConfigFileBuffer
- type FlushResponse
- type IngestBuf
- type IngestBufOpts
- type SemaphoreReleaseOpts
- type TenantBufManagerOpts
- type TenantBufferManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetDefaults ¶
func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)
Types ¶
type BuffStrategy ¶
type BuffStrategy string
const (
	Dynamic BuffStrategy = "DYNAMIC"
	Static BuffStrategy = "STATIC"
)
type BulkEventWriter ¶ added in v0.50.0
type BulkEventWriter struct {
*TenantBufferManager[*repository.CreateStepRunEventOpts, int]
// contains filtered or unexported fields
}
func NewBulkEventWriter ¶ added in v0.50.0
func NewBulkEventWriter(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkEventWriter, error)
func (*BulkEventWriter) BulkWriteStepRunEvents ¶ added in v0.50.0
func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)
func (*BulkEventWriter) Cleanup ¶ added in v0.50.0
func (w *BulkEventWriter) Cleanup() error
func (*BulkEventWriter) SerialWriteStepRunEvent ¶ added in v0.50.2
func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)
type BulkQueueStepRunOpts ¶ added in v0.50.0
type BulkQueueStepRunOpts struct {
*dbsqlc.GetStepRunForEngineRow
Priority int
IsRetry bool
Input []byte
}
type BulkSemaphoreReleaser ¶ added in v0.50.0
type BulkSemaphoreReleaser struct {
*TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID]
// contains filtered or unexported fields
}
func NewBulkSemaphoreReleaser ¶ added in v0.50.0
func NewBulkSemaphoreReleaser(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkSemaphoreReleaser, error)
func (*BulkSemaphoreReleaser) BulkReleaseSemaphores ¶ added in v0.50.0
func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)
func (*BulkSemaphoreReleaser) Cleanup ¶ added in v0.50.0
func (w *BulkSemaphoreReleaser) Cleanup() error
type BulkStepRunQueuer ¶ added in v0.50.0
type BulkStepRunQueuer struct {
*TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID]
// contains filtered or unexported fields
}
func NewBulkStepRunQueuer ¶ added in v0.50.0
func NewBulkStepRunQueuer(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkStepRunQueuer, error)
func (*BulkStepRunQueuer) BulkQueueStepRuns ¶ added in v0.50.0
func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)
func (*BulkStepRunQueuer) Cleanup ¶ added in v0.50.0
func (w *BulkStepRunQueuer) Cleanup() error
type ConfigFileBuffer ¶
type ConfigFileBuffer struct {
// WaitForFlush is the time to wait for the buffer to flush; it is used for backpressure on writers
WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`
// MaxConcurrent is the maximum number of concurrent flushes
MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`
// FlushPeriodMilliseconds is the number of milliseconds before flush
FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`
// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`
// SerialBuffer is a flag to determine if the buffer should be serial or bulk
SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`
// FlushStrategy is the strategy to use for flushing the buffer
FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"`
}
ConfigFileBuffer is the configuration for the buffer. We store it here to prevent circular dependencies.
type FlushResponse ¶
type IngestBuf ¶
func NewIngestBuffer ¶
func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]
NewIngestBuffer creates a new buffer with generic types T for input items and U for output.
func (*IngestBuf[T, U]) BuffItem ¶ added in v0.50.0
func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)
func (*IngestBuf[T, U]) StartDebugLoop ¶
func (b *IngestBuf[T, U]) StartDebugLoop()
type IngestBufOpts ¶
type IngestBufOpts[T any, U any] struct {
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush
	MaxCapacity int `validate:"required,gt=0"`
	FlushPeriod time.Duration `validate:"required,gt=0"`
	MaxDataSizeInQueue int `validate:"required,gt=0"`
	OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc func(T) int `validate:"required"`
	L *zerolog.Logger `validate:"required"`
	MaxConcurrent int `validate:"omitempty,gt=0"`
	WaitForFlush time.Duration `validate:"omitempty,gt=0"`
	FlushStrategy BuffStrategy `validate:"required"`
}
type SemaphoreReleaseOpts ¶ added in v0.50.0
type TenantBufManagerOpts ¶
type TenantBufManagerOpts[T any, U any] struct {
	Name string `validate:"required"`
	OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc func(T) int `validate:"required"`
	L *zerolog.Logger `validate:"required"`
	V validator.Validator `validate:"required"`
	Config ConfigFileBuffer `validate:"required"`
}
type TenantBufferManager ¶
func NewTenantBufManager ¶
func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)
NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.
func (*TenantBufferManager[T, U]) BuffItem ¶ added in v0.50.0
func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)
func (*TenantBufferManager[T, U]) Cleanup ¶
func (t *TenantBufferManager[T, U]) Cleanup() error
Cleanup cleans up all tenant buffers.
Click to show internal directories.
Click to hide internal directories.