Documentation
¶
Overview ¶
Package messaging provides Redpanda/Kafka producer and consumer implementations.
Producer: Async publishing with error channels for low-latency event ingestion Consumer: Batch processing with smart batching (500 events OR 20ms timeout)
Key features:
- At-least-once delivery guarantees
- Automatic retry with exponential backoff
- Dead letter queue for failed events
- Thread-safe metrics with RWMutex
Index ¶
- type BatchEventHandler
- type Consumer
- type ConsumerConfig
- type ConsumerMetrics
- type DLQWriter
- type EventEnvelope
- type EventHandler
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Flush(ctx context.Context) error
- func (p *Producer) Publish(ctx context.Context, envelope *EventEnvelope) error
- func (p *Producer) PublishAsync(ctx context.Context, envelope *EventEnvelope, errChan chan<- error)
- func (p *Producer) PublishAsyncBatch(ctx context.Context, envelopes []*EventEnvelope, errChan chan<- error)
- func (p *Producer) PublishBatch(ctx context.Context, envelopes []*EventEnvelope) error
- type RecordBatch
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchEventHandler ¶
type BatchEventHandler func(ctx context.Context, envelopes []*EventEnvelope) error
BatchEventHandler processes a batch of events
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles consuming events from Redpanda
func NewBatchConsumer ¶
func NewBatchConsumer(brokers []string, topic string, groupID string, handler BatchEventHandler, config *ConsumerConfig, dlqWriter DLQWriter) (*Consumer, error)
NewBatchConsumer creates a new consumer with custom batch handler and configuration
func NewConsumer ¶
func NewConsumer(brokers []string, topic string, groupID string, handler EventHandler, dlqWriter DLQWriter) (*Consumer, error)
NewConsumer creates a new Redpanda consumer with default config
func (*Consumer) GetMetrics ¶
func (c *Consumer) GetMetrics() ConsumerMetrics
GetMetrics returns current consumer metrics
type ConsumerConfig ¶
type ConsumerConfig struct {
BatchSize int // Maximum batch size (default: 100)
BatchTimeout time.Duration // Maximum time to wait for batch (default: 1s)
MaxRetries int // Maximum retry attempts (default: 3)
RetryBackoffMin time.Duration // Minimum retry backoff (default: 1s)
RetryBackoffMax time.Duration // Maximum retry backoff (default: 30s)
Workers int // Number of concurrent workers (default: 4)
}
ConsumerConfig holds consumer configuration
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() *ConsumerConfig
DefaultConsumerConfig returns the default consumer configuration
type ConsumerMetrics ¶
type ConsumerMetrics struct {
EventsProcessed int64
EventsFailed int64
EventsDLQ int64
DBWriteErrors int64 // Track DB write failures
UnmarshalErrors int64 // Track unmarshal failures separately
BatchesProcessed int64
BatchesFailed int64 // Track batch failures
TotalLatencyMs int64
LastProcessedTime time.Time
}
ConsumerMetrics tracks consumer performance
type DLQWriter ¶
type DLQWriter interface {
WriteBatch(ctx context.Context, envelopes []*EventEnvelope, err error) error
}
DLQWriter interface for writing failed events
type EventEnvelope ¶
type EventEnvelope struct {
SchemaVersion string `json:"schema_version"`
EventType string `json:"event_type"`
TenantID string `json:"tenant_id"`
EventID string `json:"event_id"`
Timestamp time.Time `json:"timestamp"`
Payload json.RawMessage `json:"payload"` // Raw JSON for efficient marshaling
}
EventEnvelope wraps an event for Redpanda
type EventHandler ¶
type EventHandler func(ctx context.Context, envelope *EventEnvelope) error
EventHandler processes a single consumed event
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer handles publishing events to Redpanda
func NewProducer ¶
NewProducer creates a new Redpanda producer
func (*Producer) Publish ¶
func (p *Producer) Publish(ctx context.Context, envelope *EventEnvelope) error
Publish sends an event to Redpanda synchronously
func (*Producer) PublishAsync ¶
func (p *Producer) PublishAsync(ctx context.Context, envelope *EventEnvelope, errChan chan<- error)
PublishAsync sends an event to Redpanda asynchronously Returns immediately, errors are sent to the provided error channel
func (*Producer) PublishAsyncBatch ¶
func (p *Producer) PublishAsyncBatch(ctx context.Context, envelopes []*EventEnvelope, errChan chan<- error)
PublishAsyncBatch sends multiple events to Redpanda asynchronously Returns immediately, errors are sent to the provided error channel
func (*Producer) PublishBatch ¶
func (p *Producer) PublishBatch(ctx context.Context, envelopes []*EventEnvelope) error
PublishBatch sends multiple events to Redpanda synchronously Waits for Kafka broker acknowledgment before returning
type RecordBatch ¶
type RecordBatch struct {
Records []*kgo.Record
Envelopes []*EventEnvelope
}
RecordBatch groups records for batch processing