buffer

package
v0.52.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Fibonacci

func Fibonacci(n int) int

func SetDefaults

func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)

Types

type BuffStrategy

// BuffStrategy is the string-valued type of the FlushStrategy setting on
// ConfigFileBuffer and IngestBufOpts.
type BuffStrategy string
// Recognized BuffStrategy values.
// NOTE(review): how flushing differs between the two strategies is not
// visible in this listing — confirm against the implementation.
const (
	// Dynamic is the value declared as the default in ConfigFileBuffer's
	// FlushStrategy struct tag.
	Dynamic BuffStrategy = "DYNAMIC"
	Static  BuffStrategy = "STATIC"
)

type BulkEventWriter added in v0.50.0

// BulkEventWriter embeds a TenantBufferManager whose input items are
// *repository.CreateStepRunEventOpts and whose per-item results are int, so
// the manager's exported methods are promoted onto it.
type BulkEventWriter struct {
	*TenantBufferManager[*repository.CreateStepRunEventOpts, int]
	// contains filtered or unexported fields
}

func NewBulkEventWriter added in v0.50.0

func NewBulkEventWriter(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkEventWriter, error)

func (*BulkEventWriter) BulkWriteStepRunEvents added in v0.50.0

func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

func (*BulkEventWriter) Cleanup added in v0.50.0

func (w *BulkEventWriter) Cleanup() error

func (*BulkEventWriter) SerialWriteStepRunEvent added in v0.50.2

func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

type BulkQueueStepRunOpts added in v0.50.0

// BulkQueueStepRunOpts is the per-item input type accepted by
// BulkStepRunQueuer.BulkQueueStepRuns.
type BulkQueueStepRunOpts struct {
	// Embedded by pointer: the row's fields are promoted onto this struct,
	// and accessing a promoted field panics if the pointer is nil.
	*dbsqlc.GetStepRunForEngineRow

	// NOTE(review): the exact meaning of Priority, IsRetry and Input is not
	// shown in this listing — confirm against the queueing implementation.
	Priority int
	IsRetry  bool
	Input    []byte
}

type BulkSemaphoreReleaser added in v0.50.0

// BulkSemaphoreReleaser embeds a TenantBufferManager whose input items are
// SemaphoreReleaseOpts and whose per-item results are pgtype.UUID, so the
// manager's exported methods are promoted onto it.
type BulkSemaphoreReleaser struct {
	*TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func NewBulkSemaphoreReleaser added in v0.50.0

func NewBulkSemaphoreReleaser(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkSemaphoreReleaser, error)

func (*BulkSemaphoreReleaser) BulkReleaseSemaphores added in v0.50.0

func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)

func (*BulkSemaphoreReleaser) Cleanup added in v0.50.0

func (w *BulkSemaphoreReleaser) Cleanup() error

type BulkStepRunQueuer added in v0.50.0

// BulkStepRunQueuer embeds a TenantBufferManager whose input items are
// BulkQueueStepRunOpts and whose per-item results are pgtype.UUID, so the
// manager's exported methods are promoted onto it.
type BulkStepRunQueuer struct {
	*TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func NewBulkStepRunQueuer added in v0.50.0

func NewBulkStepRunQueuer(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkStepRunQueuer, error)

func (*BulkStepRunQueuer) BulkQueueStepRuns added in v0.50.0

func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)

func (*BulkStepRunQueuer) Cleanup added in v0.50.0

func (w *BulkStepRunQueuer) Cleanup() error

type ConfigFileBuffer

// ConfigFileBuffer holds the buffer settings. Every field carries
// mapstructure and json tags plus a `default` tag stating its default value.
type ConfigFileBuffer struct {

	// WaitForFlush is how long to wait for the buffer to flush; it is used to
	// apply backpressure on writers. Tagged default: 1ms.
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`

	// MaxConcurrent is the maximum number of concurrent flushes.
	// Tagged default: 50.
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`

	// FlushPeriodMilliseconds is the number of milliseconds before a flush.
	// Tagged default: 10.
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`

	// FlushItemsThreshold is the number of items to hold in memory before
	// flushing to the database. Tagged default: 100.
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

	// SerialBuffer is a flag that determines whether the buffer is serial or
	// bulk. Tagged default: false.
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`

	// FlushStrategy is the strategy to use for flushing the buffer.
	// Tagged default: DYNAMIC. Unlike the other fields, its json tag has no
	// omitempty option.
	FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"`
}

ConfigFileBuffer is the configuration for the buffer. It is stored in this package to prevent circular dependencies.

type FlushResponse

// FlushResponse pairs a result of type U with an error. Pointers to it are
// the element type of the channels returned by the BuffItem methods.
type FlushResponse[U any] struct {
	// NOTE(review): presumably Result is only meaningful when Err is nil —
	// confirm against the flush implementation.
	Result U
	Err    error
}

type IngestBuf

// IngestBuf is the buffer type returned by NewIngestBuffer. T is the item
// type accepted by BuffItem and U is the result type carried by the
// FlushResponse values it hands back.
type IngestBuf[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIngestBuffer

func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]

NewIngestBuffer creates a new buffer for any item type T and result type U.

func (*IngestBuf[T, U]) BuffItem added in v0.50.0

func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)

func (*IngestBuf[T, U]) Start

func (b *IngestBuf[T, U]) Start() (func() error, error)

func (*IngestBuf[T, U]) StartDebugLoop

func (b *IngestBuf[T, U]) StartDebugLoop()

type IngestBufOpts

// IngestBufOpts is the options struct passed to NewIngestBuffer. T is the
// buffered item type and U is the per-item result type.
type IngestBufOpts[T any, U any] struct {
	// Name is marked required by its validate tag.
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush.
	//
	// Per the validate tags on the fields below: every field is required
	// except MaxConcurrent and WaitForFlush, which may be omitted, and
	// MaxCapacity, FlushPeriod, MaxDataSizeInQueue, MaxConcurrent and
	// WaitForFlush must be greater than zero when set.
	// OutputFunc takes a context and a slice of T and returns a slice of U
	// and an error; SizeFunc maps a single T to an int.
	// NOTE(review): presumably OutputFunc is invoked on flush and SizeFunc
	// feeds the MaxDataSizeInQueue limit — confirm against the implementation.
	MaxCapacity        int                                               `validate:"required,gt=0"`
	FlushPeriod        time.Duration                                     `validate:"required,gt=0"`
	MaxDataSizeInQueue int                                               `validate:"required,gt=0"`
	OutputFunc         func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc           func(T) int                                       `validate:"required"`
	L                  *zerolog.Logger                                   `validate:"required"`
	MaxConcurrent      int                                               `validate:"omitempty,gt=0"`
	WaitForFlush       time.Duration                                     `validate:"omitempty,gt=0"`
	FlushStrategy      BuffStrategy                                      `validate:"required"`
}

type SemaphoreReleaseOpts added in v0.50.0

// SemaphoreReleaseOpts is the per-item input type accepted by
// BulkSemaphoreReleaser.BulkReleaseSemaphores. It carries two UUIDs, one for
// a step run and one for a tenant.
type SemaphoreReleaseOpts struct {
	StepRunId pgtype.UUID
	TenantId  pgtype.UUID
}

type TenantBufManagerOpts

// TenantBufManagerOpts is the options struct passed to NewTenantBufManager.
// T is the input item type and U is the per-item output type.
type TenantBufManagerOpts[T any, U any] struct {
	// Every field is marked required by its validate tag.
	// OutputFunc takes a context and a slice of T and returns a slice of U
	// and an error; SizeFunc maps a single T to an int.
	// NOTE(review): presumably these options are forwarded to each tenant's
	// underlying buffer — confirm against NewTenantBufManager.
	Name       string                                            `validate:"required"`
	OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc   func(T) int                                       `validate:"required"`
	L          *zerolog.Logger                                   `validate:"required"`
	V          validator.Validator                               `validate:"required"`
	Config     ConfigFileBuffer                                  `validate:"required"`
}

type TenantBufferManager

// TenantBufferManager is the type returned by NewTenantBufManager. Its
// BuffItem method accepts a tenant key together with an item of type T and
// returns a channel of *FlushResponse[U].
type TenantBufferManager[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewTenantBufManager

func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)

NewTenantBufManager creates a new TenantBufferManager with generic type T for input and U for output.

func (*TenantBufferManager[T, U]) BuffItem added in v0.50.0

func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)

func (*TenantBufferManager[T, U]) Cleanup

func (t *TenantBufferManager[T, U]) Cleanup() error

Cleanup cleans up all tenant buffers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL