messaging

package
v0.1.5 Latest

Warning: This package is not in the latest version of its module.
Published: Dec 24, 2025 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Overview

Package messaging provides Redpanda/Kafka producer and consumer implementations.

Producer: Async publishing with error channels for low-latency event ingestion.

Consumer: Batch processing with smart batching (a batch is flushed at 500 events or after a 20ms timeout, whichever comes first).

Key features:

  • At-least-once delivery guarantees
  • Automatic retry with exponential backoff
  • Dead letter queue for failed events
  • Thread-safe metrics with RWMutex
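
The "500 events OR 20ms timeout" batching rule from the overview can be illustrated with a small, self-contained sketch. This is not the package's implementation: collectBatch and its parameters are hypothetical names, and plain strings stand in for events. It only shows the size-or-timeout technique, with the limits passed in explicitly.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch (hypothetical helper) reads from in until maxSize items are
// gathered or timeout elapses since the call started, whichever comes first.
// It also returns early if in is closed.
func collectBatch(in <-chan string, maxSize int, timeout time.Duration) []string {
	batch := make([]string, 0, maxSize)
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for len(batch) < maxSize {
		select {
		case item, ok := <-in:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch // timeout: flush a partial batch
		}
	}
	return batch // size limit reached
}

func main() {
	in := make(chan string, 8)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("event-%d", i)
	}

	full := collectBatch(in, 3, 20*time.Millisecond)    // size limit hit first
	partial := collectBatch(in, 3, 20*time.Millisecond) // timeout hit first
	fmt.Println(len(full), len(partial))
}
```

A short timeout bounds the latency of a partially filled batch, while the size limit bounds memory and downstream write size under load.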

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchEventHandler

type BatchEventHandler func(ctx context.Context, envelopes []*EventEnvelope) error

BatchEventHandler processes a batch of events

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles consuming events from Redpanda

func NewBatchConsumer

func NewBatchConsumer(brokers []string, topic string, groupID string, handler BatchEventHandler, config *ConsumerConfig, dlqWriter DLQWriter) (*Consumer, error)

NewBatchConsumer creates a new consumer with a custom batch handler and configuration.

func NewConsumer

func NewConsumer(brokers []string, topic string, groupID string, handler EventHandler, dlqWriter DLQWriter) (*Consumer, error)

NewConsumer creates a new Redpanda consumer with the default configuration.

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the consumer

func (*Consumer) GetMetrics

func (c *Consumer) GetMetrics() ConsumerMetrics

GetMetrics returns current consumer metrics

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start begins consuming messages with batch processing

type ConsumerConfig

type ConsumerConfig struct {
	BatchSize       int           // Maximum batch size (default: 100)
	BatchTimeout    time.Duration // Maximum time to wait for batch (default: 1s)
	MaxRetries      int           // Maximum retry attempts (default: 3)
	RetryBackoffMin time.Duration // Minimum retry backoff (default: 1s)
	RetryBackoffMax time.Duration // Maximum retry backoff (default: 30s)
	Workers         int           // Number of concurrent workers (default: 4)
}

ConsumerConfig holds consumer configuration
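
How RetryBackoffMin, RetryBackoffMax, and MaxRetries might interact can be sketched as follows. The documentation only says "exponential backoff"; the doubling factor, the absence of jitter, and the names retryConfig and backoffFor are assumptions made for illustration, not the package's actual schedule.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig mirrors only the retry-related fields of ConsumerConfig.
type retryConfig struct {
	MaxRetries      int
	RetryBackoffMin time.Duration
	RetryBackoffMax time.Duration
}

// backoffFor (hypothetical) returns the delay before retry number attempt
// (0-based), doubling from RetryBackoffMin and capping at RetryBackoffMax.
// The doubling factor is an assumption; the docs do not state it.
func backoffFor(cfg retryConfig, attempt int) time.Duration {
	d := cfg.RetryBackoffMin
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= cfg.RetryBackoffMax {
			return cfg.RetryBackoffMax // cap early, which also avoids overflow
		}
	}
	if d > cfg.RetryBackoffMax {
		return cfg.RetryBackoffMax
	}
	return d
}

func main() {
	// Values taken from the documented defaults.
	cfg := retryConfig{
		MaxRetries:      3,
		RetryBackoffMin: time.Second,
		RetryBackoffMax: 30 * time.Second,
	}
	for attempt := 0; attempt < cfg.MaxRetries; attempt++ {
		fmt.Printf("retry %d after %s\n", attempt+1, backoffFor(cfg, attempt))
	}
}
```

With the documented defaults, this sketch waits 1s, 2s, and 4s across the three retries, so the 30s cap only matters when MaxRetries is raised.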

func DefaultConsumerConfig

func DefaultConsumerConfig() *ConsumerConfig

DefaultConsumerConfig returns the default consumer configuration

type ConsumerMetrics

type ConsumerMetrics struct {
	EventsProcessed   int64
	EventsFailed      int64
	EventsDLQ         int64
	DBWriteErrors     int64 // Track DB write failures
	UnmarshalErrors   int64 // Track unmarshal failures separately
	BatchesProcessed  int64
	BatchesFailed     int64 // Track batch failures
	TotalLatencyMs    int64
	LastProcessedTime time.Time
}

ConsumerMetrics tracks consumer performance
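
The overview mentions thread-safe metrics guarded by an RWMutex. A minimal sketch of that pattern is shown below, using a local struct that mirrors a subset of the ConsumerMetrics fields. The names safeMetrics, recordBatch, snapshot, and avgBatchLatencyMs are hypothetical, and treating TotalLatencyMs as cumulative per-batch latency is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// metrics mirrors a subset of the documented ConsumerMetrics fields.
type metrics struct {
	EventsProcessed   int64
	BatchesProcessed  int64
	TotalLatencyMs    int64
	LastProcessedTime time.Time
}

// safeMetrics (hypothetical) guards metrics with an RWMutex.
type safeMetrics struct {
	mu sync.RWMutex
	m  metrics
}

// recordBatch takes the write lock because it mutates several fields.
func (s *safeMetrics) recordBatch(events int, latency time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m.EventsProcessed += int64(events)
	s.m.BatchesProcessed++
	s.m.TotalLatencyMs += latency.Milliseconds()
	s.m.LastProcessedTime = time.Now()
}

// snapshot returns a copy under the read lock, so callers never observe a
// partially applied update.
func (s *safeMetrics) snapshot() metrics {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.m
}

// avgBatchLatencyMs derives the mean latency per batch from a snapshot,
// assuming TotalLatencyMs accumulates per-batch latency.
func avgBatchLatencyMs(m metrics) float64 {
	if m.BatchesProcessed == 0 {
		return 0
	}
	return float64(m.TotalLatencyMs) / float64(m.BatchesProcessed)
}

func main() {
	var sm safeMetrics
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sm.recordBatch(100, 10*time.Millisecond)
		}()
	}
	wg.Wait()

	snap := sm.snapshot()
	fmt.Println(snap.EventsProcessed, snap.BatchesProcessed, avgBatchLatencyMs(snap))
}
```

Returning the struct by value, as GetMetrics does in its documented signature, gives readers a consistent copy without holding the lock.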

type DLQWriter

type DLQWriter interface {
	WriteBatch(ctx context.Context, envelopes []*EventEnvelope, err error) error
}

DLQWriter is the interface for writing failed events to the dead letter queue.
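
For tests, an in-memory implementation of this interface can be handy. The sketch below redeclares DLQWriter and a trimmed EventEnvelope locally so that it compiles on its own. memoryDLQ, deadLetter, and demo are hypothetical names, and recording the error string next to each envelope is a design choice of the sketch, not documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// EventEnvelope is a trimmed local stand-in for the package type.
type EventEnvelope struct {
	EventID string
}

// DLQWriter mirrors the documented interface.
type DLQWriter interface {
	WriteBatch(ctx context.Context, envelopes []*EventEnvelope, err error) error
}

// deadLetter pairs a failed envelope with the reason it failed.
type deadLetter struct {
	Envelope *EventEnvelope
	Reason   string
}

// memoryDLQ (hypothetical) keeps dead letters in memory for inspection.
type memoryDLQ struct {
	mu      sync.Mutex
	entries []deadLetter
}

// Compile-time check that memoryDLQ satisfies DLQWriter.
var _ DLQWriter = (*memoryDLQ)(nil)

func (d *memoryDLQ) WriteBatch(ctx context.Context, envelopes []*EventEnvelope, err error) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr // respect cancellation before doing any work
	}
	reason := "unknown"
	if err != nil {
		reason = err.Error()
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, env := range envelopes {
		d.entries = append(d.entries, deadLetter{Envelope: env, Reason: reason})
	}
	return nil
}

// snapshot returns a copy of the stored entries.
func (d *memoryDLQ) snapshot() []deadLetter {
	d.mu.Lock()
	defer d.mu.Unlock()
	return append([]deadLetter(nil), d.entries...)
}

// demo writes two failed events and reports what was stored.
func demo() (int, string, error) {
	dlq := &memoryDLQ{}
	failed := []*EventEnvelope{{EventID: "evt-1"}, {EventID: "evt-2"}}
	if err := dlq.WriteBatch(context.Background(), failed, errors.New("db write failed")); err != nil {
		return 0, "", err
	}
	entries := dlq.snapshot()
	return len(entries), entries[0].Reason, nil
}

func main() {
	n, reason, err := demo()
	fmt.Println(n, reason, err)
}
```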

type EventEnvelope

type EventEnvelope struct {
	SchemaVersion string          `json:"schema_version"`
	EventType     string          `json:"event_type"`
	TenantID      string          `json:"tenant_id"`
	EventID       string          `json:"event_id"`
	Timestamp     time.Time       `json:"timestamp"`
	Payload       json.RawMessage `json:"payload"` // Raw JSON for efficient marshaling
}

EventEnvelope wraps an event for Redpanda
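
Because Payload is a json.RawMessage, the envelope can be decoded first and the payload decoded later, once EventType is known. The sketch below copies the documented struct locally and round-trips it through encoding/json. The orderCreated payload type, the wrap helper, and the sample field values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EventEnvelope is a local copy of the documented struct.
type EventEnvelope struct {
	SchemaVersion string          `json:"schema_version"`
	EventType     string          `json:"event_type"`
	TenantID      string          `json:"tenant_id"`
	EventID       string          `json:"event_id"`
	Timestamp     time.Time       `json:"timestamp"`
	Payload       json.RawMessage `json:"payload"`
}

// orderCreated is a hypothetical payload type used only for illustration.
type orderCreated struct {
	OrderID    string `json:"order_id"`
	TotalCents int    `json:"total_cents"`
}

// wrap (hypothetical) marshals payload once and stores the raw bytes.
func wrap(eventType, tenantID, eventID string, ts time.Time, payload any) (*EventEnvelope, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	return &EventEnvelope{
		SchemaVersion: "1", // assumed value; the docs do not define versions
		EventType:     eventType,
		TenantID:      tenantID,
		EventID:       eventID,
		Timestamp:     ts,
		Payload:       raw,
	}, nil
}

// roundTrip encodes an envelope, decodes it, then decodes the payload.
func roundTrip() (eventType, orderID string, err error) {
	ts := time.Date(2025, 12, 24, 0, 0, 0, 0, time.UTC)
	env, err := wrap("order.created", "tenant-1", "evt-1", ts, orderCreated{OrderID: "ord-42", TotalCents: 1999})
	if err != nil {
		return "", "", err
	}
	data, err := json.Marshal(env)
	if err != nil {
		return "", "", err
	}

	var decoded EventEnvelope
	if err := json.Unmarshal(data, &decoded); err != nil {
		return "", "", err
	}
	// The payload stays as raw bytes until we choose a concrete type.
	var p orderCreated
	if err := json.Unmarshal(decoded.Payload, &p); err != nil {
		return "", "", err
	}
	return decoded.EventType, p.OrderID, nil
}

func main() {
	eventType, orderID, err := roundTrip()
	fmt.Println(eventType, orderID, err)
}
```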

type EventHandler

type EventHandler func(ctx context.Context, envelope *EventEnvelope) error

EventHandler processes a single consumed event
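
Since the package exposes both a single-event and a batch handler type, one way to bridge them is an adapter that calls the single-event handler for each envelope in a batch. Whether NewConsumer does something like this internally is not documented. perEvent and demo are hypothetical names, the handler types are redeclared locally, and stopping at the first error is a choice made by this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type EventEnvelope struct {
	EventID string
}

type EventHandler func(ctx context.Context, envelope *EventEnvelope) error

type BatchEventHandler func(ctx context.Context, envelopes []*EventEnvelope) error

// perEvent (hypothetical) adapts a single-event handler to the batch
// signature. It stops at the first failure and names the offending event.
func perEvent(h EventHandler) BatchEventHandler {
	return func(ctx context.Context, envelopes []*EventEnvelope) error {
		for _, env := range envelopes {
			if err := ctx.Err(); err != nil {
				return err // stop promptly on cancellation
			}
			if err := h(ctx, env); err != nil {
				return fmt.Errorf("event %s: %w", env.EventID, err)
			}
		}
		return nil
	}
}

// demo runs a three-event batch whose second event is rejected.
func demo() (handled int, err error) {
	h := func(ctx context.Context, env *EventEnvelope) error {
		if env.EventID == "bad" {
			return errors.New("rejected")
		}
		handled++
		return nil
	}
	batch := []*EventEnvelope{{EventID: "a"}, {EventID: "bad"}, {EventID: "c"}}
	err = perEvent(h)(context.Background(), batch)
	return handled, err
}

func main() {
	handled, err := demo()
	fmt.Println(handled, err)
}
```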

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer handles publishing events to Redpanda

func NewProducer

func NewProducer(brokers []string, topic string) (*Producer, error)

NewProducer creates a new Redpanda producer

func (*Producer) Close

func (p *Producer) Close()

Close closes the producer

func (*Producer) Flush

func (p *Producer) Flush(ctx context.Context) error

Flush waits for all async messages to be sent

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, envelope *EventEnvelope) error

Publish sends an event to Redpanda synchronously

func (*Producer) PublishAsync

func (p *Producer) PublishAsync(ctx context.Context, envelope *EventEnvelope, errChan chan<- error)

PublishAsync sends an event to Redpanda asynchronously. It returns immediately; errors are sent to the provided error channel.
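
A caller of an error-channel API has to decide how to collect errors without blocking the sender. The sketch below shows one such pattern against a fake producer, since the real Producer needs a broker. fakeProducer, publishAll, and demo are hypothetical. The sketch assumes that only non-nil errors are sent, that the producer never closes the channel, and that every send completes before Flush returns. None of this is confirmed by the documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// EventEnvelope is a trimmed local stand-in for the package type.
type EventEnvelope struct {
	EventID string
}

// fakeProducer (hypothetical) imitates the documented method shapes.
type fakeProducer struct {
	wg sync.WaitGroup
}

// PublishAsync returns immediately and reports failures on errChan.
// The fake fails any envelope that has an empty EventID.
func (p *fakeProducer) PublishAsync(ctx context.Context, envelope *EventEnvelope, errChan chan<- error) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		if envelope.EventID == "" {
			errChan <- errors.New("missing event_id")
		}
	}()
}

// Flush waits for all in-flight publishes of the fake to finish.
func (p *fakeProducer) Flush(ctx context.Context) error {
	p.wg.Wait()
	return nil
}

// publishAll publishes every envelope, flushes, then collects the errors.
func publishAll(ctx context.Context, p *fakeProducer, envs []*EventEnvelope) []error {
	// Buffer one slot per envelope so a sender can never block.
	errChan := make(chan error, len(envs))
	for _, env := range envs {
		p.PublishAsync(ctx, env, errChan)
	}
	if err := p.Flush(ctx); err != nil {
		return []error{err}
	}
	// Closing is safe here only because all sends finished before Flush
	// returned, which is an assumption of this sketch.
	close(errChan)

	var errs []error
	for err := range errChan {
		errs = append(errs, err)
	}
	return errs
}

// demo publishes three envelopes, two of which the fake rejects.
func demo() int {
	envs := []*EventEnvelope{{EventID: "evt-1"}, {EventID: ""}, {EventID: ""}}
	return len(publishAll(context.Background(), &fakeProducer{}, envs))
}

func main() {
	fmt.Println("failed publishes:", demo())
}
```

Sizing the channel buffer to the number of in-flight events is the simple option. A long-running service would more likely drain the channel from a dedicated goroutine.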

func (*Producer) PublishAsyncBatch

func (p *Producer) PublishAsyncBatch(ctx context.Context, envelopes []*EventEnvelope, errChan chan<- error)

PublishAsyncBatch sends multiple events to Redpanda asynchronously. It returns immediately; errors are sent to the provided error channel.

func (*Producer) PublishBatch

func (p *Producer) PublishBatch(ctx context.Context, envelopes []*EventEnvelope) error

PublishBatch sends multiple events to Redpanda synchronously. It waits for Kafka broker acknowledgment before returning.

type RecordBatch

type RecordBatch struct {
	Records   []*kgo.Record
	Envelopes []*EventEnvelope
}

RecordBatch groups records for batch processing
