Documentation ¶
Index ¶
- func WithRetry(action func() (bool, error), intialBackoff time.Duration, ...) error
- type BatchMerger
- type BatchMetricPublisher
- type Batcher
- type EventCounter
- type HasPulsarMessageIds
- type IngestionPipeline
- type InstructionConverter
- type MessageUnmarshaller
- type PartitionInfoFetcher
- type PulsarDelayRecorder
- type Sink
- type TopicProcessingDelayMonitor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BatchMerger ¶ added in v0.15.0
type BatchMerger[T utils.ArmadaEvent] func(batch []*utils.EventsWithIds[T]) *utils.EventsWithIds[T]
BatchMerger merges the events within a batch into a single EventsWithIds value, where possible.
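As a rough illustration of the BatchMerger shape, the sketch below concatenates every entry of a batch into one value. The eventsWithIds type and its Events and MessageIds fields are hypothetical stand-ins for utils.EventsWithIds (string IDs replace Pulsar message IDs), and concatMerger is only one possible merge strategy, not necessarily the one used by this package.

```go
package main

import "fmt"

// eventsWithIds is a hypothetical stand-in for utils.EventsWithIds[T];
// the field names and the string ID type are assumptions for this sketch.
type eventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

// batchMerger mirrors the shape of BatchMerger using the stand-in type.
type batchMerger[T any] func(batch []*eventsWithIds[T]) *eventsWithIds[T]

// concatMerger flattens every entry of the batch into a single value,
// preserving order. Nil entries are skipped.
func concatMerger[T any](batch []*eventsWithIds[T]) *eventsWithIds[T] {
	merged := &eventsWithIds[T]{}
	for _, e := range batch {
		if e == nil {
			continue
		}
		merged.Events = append(merged.Events, e.Events...)
		merged.MessageIds = append(merged.MessageIds, e.MessageIds...)
	}
	return merged
}

func main() {
	var merge batchMerger[string] = concatMerger[string]
	out := merge([]*eventsWithIds[string]{
		{Events: []string{"submit"}, MessageIds: []string{"m1"}},
		{Events: []string{"lease", "run"}, MessageIds: []string{"m2"}},
	})
	fmt.Println(len(out.Events), len(out.MessageIds))
}
```

Keeping the message IDs alongside the merged events matters because the IDs are what allow the messages to be acked later in the pipeline.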
type BatchMetricPublisher ¶ added in v0.15.0
type BatchMetricPublisher[T utils.ArmadaEvent] func(metrics *commonmetrics.Metrics, batch *utils.EventsWithIds[T])
BatchMetricPublisher logs a summary of the batching process.
type Batcher ¶
type Batcher[T any] struct {
	// contains filtered or unexported fields
}
Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
func NewBatcher ¶
func (*Batcher[T]) Run ¶
func (b *Batcher[T]) Run(ctx *armadacontext.Context)
type EventCounter ¶ added in v0.15.0
type EventCounter[T utils.ArmadaEvent] func(events *utils.EventsWithIds[T]) int
EventCounter determines the true number of events in a batch, as some utils.ArmadaEvent values can contain nested events.
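To show why a plain length check can undercount, the sketch below counts nested events inside each top-level entry. The eventSequence and eventsWithIds types are hypothetical stand-ins invented for this example; the real utils types may be shaped differently.

```go
package main

import "fmt"

// eventSequence is a hypothetical event type that nests several events.
type eventSequence struct {
	Events []string
}

// eventsWithIds is a hypothetical stand-in for utils.EventsWithIds[T].
type eventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

// countNested returns the total number of nested events, which can be larger
// than the number of top-level sequences in the batch.
func countNested(batch *eventsWithIds[*eventSequence]) int {
	total := 0
	for _, seq := range batch.Events {
		if seq != nil {
			total += len(seq.Events)
		}
	}
	return total
}

func main() {
	b := &eventsWithIds[*eventSequence]{
		Events: []*eventSequence{
			{Events: []string{"submit", "lease"}},
			{Events: []string{"run"}},
		},
	}
	fmt.Println(len(b.Events), countNested(b))
}
```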
type HasPulsarMessageIds ¶
HasPulsarMessageIds should be implemented by structs that can store a batch of Pulsar message IDs. This is needed so that message IDs can be passed down the pipeline and acked at the end.
type IngestionPipeline ¶
type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
	// contains filtered or unexported fields
}
IngestionPipeline is a pipeline that reads messages from Pulsar and inserts them into a sink. The pipeline handles the following automatically:
- Receiving messages from pulsar
- Unmarshalling into eventsWithIds
- Combining messages into batches for efficient processing
- Publishing metrics for each batch
- Converting eventsWithIds to instructions
- Acking processed messages
Callers must supply two structs: an InstructionConverter, which converts eventsWithIds into an object the sink can consume, and a Sink capable of consuming these objects.
func NewIngestionPipeline ¶
func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
	pulsarConfig commonconfig.PulsarConfig,
	pulsarTopic string,
	pulsarSubscriptionName string,
	pulsarBatchSize int,
	pulsarBatchDuration time.Duration,
	pulsarSubscriptionType pulsar.SubscriptionType,
	eventCounter EventCounter[U],
	messageConverter MessageUnmarshaller[U],
	batchMerger BatchMerger[U],
	metricPublisher BatchMetricPublisher[U],
	converter InstructionConverter[T, U],
	sink Sink[T],
	metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U]
NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages
func (*IngestionPipeline[T, U]) Run ¶
func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error
Run runs the ingestion pipeline until the supplied context is shut down.
type InstructionConverter ¶
type InstructionConverter[T HasPulsarMessageIds, U utils.ArmadaEvent] interface { Convert(ctx *armadacontext.Context, msg *utils.EventsWithIds[U]) T }
InstructionConverter should be implemented by structs that can convert a batch of eventsWithIds into an object suitable for passing to the sink.
type MessageUnmarshaller ¶ added in v0.15.0
type MessageUnmarshaller[T utils.ArmadaEvent] func(msg pulsar.ConsumerMessage, metrics *commonmetrics.Metrics) *utils.EventsWithIds[T]
MessageUnmarshaller converts consumed pulsar messages to the intermediate type, utils.EventsWithIds.
type PartitionInfoFetcher ¶ added in v0.20.2
type PulsarDelayRecorder ¶ added in v0.20.2
type Sink ¶
type Sink[T HasPulsarMessageIds] interface {
	// Store should persist msg to the sink. The store is responsible for retrying failed attempts and should only
	// return an error when it is satisfied that the operation cannot be retried.
	Store(ctx *armadacontext.Context, msg T) error
}
Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.
type TopicProcessingDelayMonitor ¶ added in v0.20.2
type TopicProcessingDelayMonitor struct {
// contains filtered or unexported fields
}
func NewTopicProcessingDelayMonitor ¶ added in v0.20.2
func NewTopicProcessingDelayMonitor(
	partitionInfoFetcher PartitionInfoFetcher,
	adminClient pulsarclient.Client,
	topic string,
	subscriptionName string,
	interval time.Duration,
	metrics PulsarDelayRecorder,
) *TopicProcessingDelayMonitor
func (*TopicProcessingDelayMonitor) Initialise ¶ added in v0.20.2
func (t *TopicProcessingDelayMonitor) Initialise(ctx *armadacontext.Context) error
func (*TopicProcessingDelayMonitor) Run ¶ added in v0.20.2
func (t *TopicProcessingDelayMonitor) Run(ctx *armadacontext.Context) error