Documentation ¶
Index ¶
- type ClickHouseStorage
- type ClickHouseStorageConfig
- type InsertEventsQuery
- type InsertInvalidQuery
- type MeterStore
- type NamespaceStore
- type Offset
- type OffsetStore
- type PartitionOffsets
- type ProcessingControl
- type ProcessingError
- type Sink
- type SinkBuffer
- type SinkConfig
- type SinkMessage
- type Storage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClickHouseStorage ¶
type ClickHouseStorage struct {
// contains filtered or unexported fields
}
func NewClickhouseStorage ¶
func NewClickhouseStorage(config ClickHouseStorageConfig) *ClickHouseStorage
func (*ClickHouseStorage) BatchInsert ¶
func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []SinkMessage) error
func (*ClickHouseStorage) BatchInsertInvalid ¶
func (c *ClickHouseStorage) BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error
type ClickHouseStorageConfig ¶
type ClickHouseStorageConfig struct {
ClickHouse clickhouse.Conn
Database string
}
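The following is a minimal sketch of wiring the ClickHouse-backed storage, assuming this package is imported as sink and the connection comes from the clickhouse-go/v2 driver; the address, database name, and messages slice are placeholders.

conn, err := clickhouse.Open(&clickhouse.Options{
    Addr: []string{"localhost:9000"}, // placeholder address
})
if err != nil {
    log.Fatal(err)
}

storage := sink.NewClickhouseStorage(sink.ClickHouseStorageConfig{
    ClickHouse: conn,
    Database:   "openmeter", // placeholder database name
})

// Persist a batch of parsed messages; invalid ones go through BatchInsertInvalid.
var messages []sink.SinkMessage
if err := storage.BatchInsert(context.Background(), messages); err != nil {
    log.Fatal(err)
}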
type InsertEventsQuery ¶
type InsertEventsQuery struct {
Database string
Messages []SinkMessage
}
func (InsertEventsQuery) ToSQL ¶
func (q InsertEventsQuery) ToSQL() (string, []interface{}, error)
type InsertInvalidQuery ¶
type InsertInvalidQuery struct {
Database string
Messages []SinkMessage
}
func (InsertInvalidQuery) ToSQL ¶
func (q InsertInvalidQuery) ToSQL() (string, []interface{}, error)
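As a hedged sketch (assuming this package is imported as sink), a query value can be rendered to a SQL statement plus arguments and then executed; the database name, messages, ctx, and conn are placeholders owned by the caller.

query := sink.InsertEventsQuery{
    Database: "openmeter", // placeholder database name
    Messages: messages,    // []sink.SinkMessage collected from Kafka
}

sql, args, err := query.ToSQL()
if err != nil {
    log.Fatal(err)
}

// One way to execute the rendered statement with a clickhouse-go/v2 connection.
if err := conn.Exec(ctx, sql, args...); err != nil {
    log.Fatal(err)
}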
type MeterStore ¶
type NamespaceStore ¶
type NamespaceStore struct {
// contains filtered or unexported fields
}
func NewNamespaceStore ¶
func NewNamespaceStore() *NamespaceStore
func (*NamespaceStore) AddMeter ¶
func (n *NamespaceStore) AddMeter(meter models.Meter)
func (*NamespaceStore) ValidateEvent ¶
func (a *NamespaceStore) ValidateEvent(ctx context.Context, event serializer.CloudEventsKafkaPayload, namespace string) error
ValidateEvent validates a single event by matching it against the corresponding meter, if any.
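A minimal sketch of registering a meter and validating a parsed event, assuming this package is imported as sink; the meter value, event payload, ctx, and namespace name are placeholders supplied by the caller.

store := sink.NewNamespaceStore()
store.AddMeter(m) // m is a models.Meter describing the expected event type

// Validate a parsed CloudEvents payload against the registered meters.
if err := store.ValidateEvent(ctx, payload, "default"); err != nil {
    // A non-nil error means the event did not pass meter validation.
    log.Println(err)
}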
type OffsetStore ¶
type OffsetStore struct {
// contains filtered or unexported fields
}
OffsetStore helps determine the next offset to commit.
func NewOffsetStore ¶
func NewOffsetStore() *OffsetStore
func (*OffsetStore) Add ¶
func (o *OffsetStore) Add(topicPartition kafka.TopicPartition)
func (*OffsetStore) Get ¶
func (o *OffsetStore) Get() []kafka.TopicPartition
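A minimal sketch of tracking processed offsets, assuming this package is imported as sink and that msg and consumer come from the confluent-kafka-go bindings; committing the returned offsets is an illustration of how the store could be used.

offsets := sink.NewOffsetStore()

// Record the topic/partition/offset of each processed message.
offsets.Add(msg.TopicPartition) // msg is a processed *kafka.Message

// Get returns the offsets to commit back to Kafka.
if _, err := consumer.CommitOffsets(offsets.Get()); err != nil {
    log.Println(err)
}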
type PartitionOffsets ¶
type PartitionOffsets struct {
// contains filtered or unexported fields
}
type ProcessingControl ¶
type ProcessingControl int32
const (
    DROP       ProcessingControl = 0
    DEADLETTER ProcessingControl = 1
)
type ProcessingError ¶
type ProcessingError struct {
Message string
ProcessingControl ProcessingControl
}
func NewProcessingError ¶
func NewProcessingError(msg string, control ProcessingControl) *ProcessingError
func (*ProcessingError) Error ¶
func (e *ProcessingError) Error() string
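A minimal sketch, assuming this package is imported as sink: a processing error pairs a message with a control decision, and errors.As is one way a caller could recover that decision from a plain error value.

err := sink.NewProcessingError("event does not match any meter", sink.DEADLETTER)

var pErr *sink.ProcessingError
if errors.As(err, &pErr) && pErr.ProcessingControl == sink.DEADLETTER {
    // Route the original Kafka message to a dead-letter destination.
}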
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
func NewSink ¶
func NewSink(config SinkConfig) (*Sink, error)
func (*Sink) ParseMessage ¶
func (s *Sink) ParseMessage(e *kafka.Message) (string, *serializer.CloudEventsKafkaPayload, error)
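A minimal sketch, assuming s is a *sink.Sink and e is a *kafka.Message received from the consumer; wrapping the result in a SinkMessage is an illustration, not a requirement of the API.

namespace, payload, err := s.ParseMessage(e)
if err != nil {
    // The error may carry a ProcessingControl (see ProcessingError above).
    log.Println(err)
    return
}

msg := sink.SinkMessage{
    Namespace:    namespace,
    KafkaMessage: e,
    Serialized:   payload,
}
_ = msg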
type SinkBuffer ¶
type SinkBuffer struct {
// contains filtered or unexported fields
}
func NewSinkBuffer ¶
func NewSinkBuffer() *SinkBuffer
func (*SinkBuffer) Add ¶
func (b *SinkBuffer) Add(message SinkMessage)
func (*SinkBuffer) Dequeue ¶
func (b *SinkBuffer) Dequeue() []SinkMessage
func (*SinkBuffer) Size ¶
func (b *SinkBuffer) Size() int
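A minimal sketch of the buffer-and-flush pattern, assuming this package is imported as sink; message, minCommitCount, storage, and ctx are placeholders for values owned by the caller.

buffer := sink.NewSinkBuffer()
buffer.Add(message) // message is a sink.SinkMessage built from a parsed Kafka message

// Drain the buffer once it is large enough and hand the batch to storage.
if buffer.Size() >= minCommitCount {
    batch := buffer.Dequeue()
    if err := storage.BatchInsert(ctx, batch); err != nil {
        log.Println(err)
    }
}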
type SinkConfig ¶
type SinkConfig struct {
Logger *slog.Logger
Tracer trace.Tracer
MetricMeter metric.Meter
MeterRepository meter.Repository
Storage Storage
Deduplicator dedupe.Deduplicator
Consumer *kafka.Consumer
// MinCommitCount is the minimum number of messages to collect before flushing the buffer.
// Whichever is reached first, MinCommitCount or MaxCommitWait, triggers a flush.
MinCommitCount int
// MaxCommitWait is the maximum time to wait before flushing the buffer
MaxCommitWait time.Duration
// NamespaceRefetch is the interval at which existing namespaces and meters are refetched.
// This information is used to configure which topics the consumer subscribes to and
// which meter configurations are used in event validation.
NamespaceRefetch time.Duration
// OnFlushSuccess is an optional lifecycle hook
OnFlushSuccess func(string, int64)
}
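A minimal sketch of constructing a Sink, assuming this package is imported as sink and that the consumer, meter repository, deduplicator, and storage have already been created; the numeric thresholds are placeholder values, not recommendations.

s, err := sink.NewSink(sink.SinkConfig{
    Logger:           slog.Default(),
    Tracer:           otel.Tracer("sink"),
    MetricMeter:      otel.Meter("sink"),
    MeterRepository:  meterRepository,
    Storage:          storage,
    Deduplicator:     deduplicator,
    Consumer:         consumer,
    MinCommitCount:   500,              // placeholder threshold
    MaxCommitWait:    30 * time.Second, // placeholder deadline
    NamespaceRefetch: 15 * time.Second, // placeholder interval
    OnFlushSuccess: func(_ string, _ int64) {
        // Optional hook invoked after a successful flush.
    },
})
if err != nil {
    log.Fatal(err)
}
_ = s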
type SinkMessage ¶
type SinkMessage struct {
Namespace string
KafkaMessage *kafka.Message
Serialized *serializer.CloudEventsKafkaPayload
Error *ProcessingError
}
type Storage ¶
type Storage interface {
BatchInsert(ctx context.Context, messages []SinkMessage) error
BatchInsertInvalid(ctx context.Context, messages []SinkMessage) error
}
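The interface makes the sink's persistence layer swappable. As a hedged sketch, an in-memory implementation (hypothetical, useful as a test stand-in for ClickHouse) only needs the two batch methods, assuming this package is imported as sink:

type memoryStorage struct {
    valid   []sink.SinkMessage
    invalid []sink.SinkMessage
}

func (m *memoryStorage) BatchInsert(ctx context.Context, messages []sink.SinkMessage) error {
    m.valid = append(m.valid, messages...)
    return nil
}

func (m *memoryStorage) BatchInsertInvalid(ctx context.Context, messages []sink.SinkMessage) error {
    m.invalid = append(m.invalid, messages...)
    return nil
}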