Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClickHouseStorage ¶
type ClickHouseStorage struct {
// contains filtered or unexported fields
}
func NewClickhouseStorage ¶
func NewClickhouseStorage(config ClickHouseStorageConfig) (*ClickHouseStorage, error)
func (*ClickHouseStorage) BatchInsert ¶
func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error
BatchInsert inserts multiple messages into ClickHouse.
type ClickHouseStorageConfig ¶
func (ClickHouseStorageConfig) Validate ¶
func (c ClickHouseStorageConfig) Validate() error
type MetersByType ¶
type NamespacedMeterCache ¶
type NamespacedMeterCache struct {
// contains filtered or unexported fields
}
func NewNamespaceStore ¶
func NewNamespaceStore(config NamespacedMeterCacheConfig) (*NamespacedMeterCache, error)
func (*NamespacedMeterCache) GetAffectedMeters ¶
func (n *NamespacedMeterCache) GetAffectedMeters(_ context.Context, m *sinkmodels.SinkMessage) ([]*meter.Meter, error)
GetAffectedMeters gets the list of meters that are affected by an event
type NamespacedMeterCacheConfig ¶
type NamespacedMeterCacheConfig struct {
PeriodicRefetchInterval time.Duration
Logger *slog.Logger
MeterService meter.Service
}
func (NamespacedMeterCacheConfig) Validate ¶
func (c NamespacedMeterCacheConfig) Validate() error
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
func NewSink ¶
func NewSink(config SinkConfig) (*Sink, error)
type SinkBuffer ¶
type SinkBuffer struct {
// contains filtered or unexported fields
}
func NewSinkBuffer ¶
func NewSinkBuffer() *SinkBuffer
func (*SinkBuffer) Add ¶
func (b *SinkBuffer) Add(message sinkmodels.SinkMessage)
func (*SinkBuffer) Dequeue ¶
func (b *SinkBuffer) Dequeue() []sinkmodels.SinkMessage
func (*SinkBuffer) RemoveByPartitions ¶
func (b *SinkBuffer) RemoveByPartitions(partitions []kafka.TopicPartition)
RemoveByPartitions removes messages from the buffer by partitions Useful when partitions are revoked.
func (*SinkBuffer) Size ¶
func (b *SinkBuffer) Size() int
type SinkConfig ¶
type SinkConfig struct {
Logger *slog.Logger
Tracer trace.Tracer
MetricMeter metric.Meter
Storage Storage
Deduplicator dedupe.Deduplicator
Consumer *kafka.Consumer
// MinCommitCount is the minimum number of messages to wait before flushing the buffer.
// Whichever happens earlier MinCommitCount or MaxCommitWait will trigger a flush.
MinCommitCount int
// MaxCommitWait is the maximum time to wait before flushing the buffer
MaxCommitWait time.Duration
// The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
// If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
MaxPollTimeout time.Duration
// NamespaceRefetch is the interval to refetch exsisting namespaces and meters
// this information is used to configure which topics the consumer subscribes and
// the meter configs used in event validation.
NamespaceRefetch time.Duration
// NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription.
// It must be less than NamespaceRefetch interval.
NamespaceRefetchTimeout time.Duration
// NamespaceTopicRegexp defines the regular expression to match/validate topic names the sink-worker needs to subscribe to.
NamespaceTopicRegexp string
// FlushEventHandlers is an optional lifecycle hook, allowing to act on successful batch
// flushes. To prevent blocking the main sink logic this is always called in a go routine.
FlushEventHandler flushhandler.FlushEventHandler
// FlushSuccessTimeout is the timeout for the OnFlushSuccess callback,
// after this period the context of the callback will be canceled.
FlushSuccessTimeout time.Duration
// DrainTimeout is the maximum time to wait before draining the buffer and closing the sink.
DrainTimeout time.Duration
TopicResolver topicresolver.Resolver
// MeterRefetchInterval is the interval to refetch meters from the database
MeterRefetchInterval time.Duration
// MeterService is the service to fetch meters from the database
MeterService meter.Service
}
func (*SinkConfig) Validate ¶
func (s *SinkConfig) Validate() error
type Storage ¶
type Storage interface {
BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error
}
Click to show internal directories.
Click to hide internal directories.