Documentation
      ¶
    
    
  
    
  
    Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetDefaults ¶
Types ¶
type BuffStrategy ¶
// BuffStrategy names a strategy for flushing a buffer.
type BuffStrategy string

// Supported BuffStrategy values.
//
// NOTE(review): how Dynamic and Static differ at runtime is not visible
// here — confirm against the buffer implementation.
const (
	Dynamic BuffStrategy = "DYNAMIC"
	Static  BuffStrategy = "STATIC"
)
type ConfigFileBuffer ¶
// ConfigFileBuffer is the configuration for the buffer. It is stored here to
// prevent circular dependencies.
//
// Each field carries mapstructure and json tags for decoding/encoding and a
// `default` tag holding its default value.
// NOTE(review): the `default` tags are presumably applied by SetDefaults —
// confirm against its implementation.
type ConfigFileBuffer struct {
	// WaitForFlush is the time to wait for the buffer to flush; it is used to
	// apply backpressure on writers. The default tag is "1ms".
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`
	// MaxConcurrent is the maximum number of concurrent flushes. The default
	// tag is "50".
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`
	// FlushPeriodMilliseconds is the number of milliseconds to wait before a
	// flush. The default tag is "10".
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`
	// FlushItemsThreshold is the number of items to hold in memory until
	// flushing to the database. The default tag is "100".
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`
	// SerialBuffer is a flag that determines whether the buffer should be
	// serial or bulk. The default tag is "false".
	// NOTE(review): presumably true selects serial and false selects bulk —
	// confirm against the consumer of this flag.
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`
	// FlushStrategy is the strategy to use for flushing the buffer. The
	// default tag is "DYNAMIC". Unlike the other fields, its json tag has no
	// omitempty option.
	FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"`
}
    ConfigFileBuffer is the configuration for the buffer. It is stored here to prevent circular dependencies.
type FlushResponse ¶
type IngestBuf ¶
func NewIngestBuffer ¶
func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]
NewIngestBuffer creates a new buffer for items of any type T, with outputs of type U.
func (*IngestBuf[T, U]) FireAndWait ¶
func (*IngestBuf[T, U]) FireForget ¶
func (*IngestBuf[T, U]) StartDebugLoop ¶
func (b *IngestBuf[T, U]) StartDebugLoop()
type IngestBufOpts ¶
type IngestBufOpts[T any, U any] struct { Name string `validate:"required"` // MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush MaxCapacity int `validate:"required,gt=0"` FlushPeriod time.Duration `validate:"required,gt=0"` MaxDataSizeInQueue int `validate:"required,gt=0"` OutputFunc func(ctx context.Context, items []T) ([]*U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` MaxConcurrent int `validate:"omitempty,gt=0"` WaitForFlush time.Duration `validate:"omitempty,gt=0"` FlushStrategy BuffStrategy `validate:"required"` }
type TenantBufManagerOpts ¶
type TenantBufManagerOpts[T any, U any] struct { Name string `validate:"required"` OutputFunc func(ctx context.Context, items []T) ([]*U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` V validator.Validator `validate:"required"` Config ConfigFileBuffer `validate:"required"` }
type TenantBufferManager ¶
func NewTenantBufManager ¶
func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)
NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.
func (*TenantBufferManager[T, U]) Cleanup ¶
func (t *TenantBufferManager[T, U]) Cleanup() error
Cleanup cleans up all tenant buffers.
func (*TenantBufferManager[T, U]) FireAndWait ¶
func (t *TenantBufferManager[T, U]) FireAndWait(ctx context.Context, tenantKey string, item T) (*U, error)
func (*TenantBufferManager[T, U]) FireForget ¶
func (t *TenantBufferManager[T, U]) FireForget(tenantKey string, item T) error
 Click to show internal directories. 
   Click to hide internal directories.