logmq

package
v0.13.1
Published: Feb 20, 2026 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrInvalidLogEntry = errors.New("invalid log entry: both event and attempt are required")

ErrInvalidLogEntry is returned when a LogEntry is missing required fields.

Functions

func NewMessageHandler

func NewMessageHandler(logger *logging.Logger, batchAdder BatchAdder) consumer.MessageHandler

func WithQueue

func WithQueue(queueConfig *mqs.QueueConfig) func(opts *LogMQOption)

Types

type BatchAdder added in v0.13.0

type BatchAdder interface {
	Add(ctx context.Context, msg *mqs.Message) error
}

BatchAdder is the interface for adding messages to a batch processor.

type BatchProcessor added in v0.13.0

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor batches log entries and writes them to the log store.

func NewBatchProcessor added in v0.13.0

func NewBatchProcessor(ctx context.Context, logger *logging.Logger, logStore LogStore, cfg BatchProcessorConfig) (*BatchProcessor, error)

NewBatchProcessor creates a new batch processor for log entries.

func (*BatchProcessor) Add added in v0.13.0

func (bp *BatchProcessor) Add(ctx context.Context, msg *mqs.Message) error

Add adds a message to the batch.

func (*BatchProcessor) Shutdown added in v0.13.0

func (bp *BatchProcessor) Shutdown()

Shutdown gracefully shuts down the batch processor.

type BatchProcessorConfig added in v0.13.0

type BatchProcessorConfig struct {
	ItemCountThreshold int
	DelayThreshold     time.Duration
}

BatchProcessorConfig configures the batch processor.

type LogInfra

type LogInfra interface {
	DeclareInfrastructure(ctx context.Context) error
}

type LogMQ

type LogMQ struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...func(opts *LogMQOption)) *LogMQ

func (*LogMQ) Init

func (q *LogMQ) Init(ctx context.Context) (func(), error)

func (*LogMQ) Publish

func (q *LogMQ) Publish(ctx context.Context, entry models.LogEntry) error

func (*LogMQ) Subscribe

func (q *LogMQ) Subscribe(ctx context.Context) (mqs.Subscription, error)

type LogMQOption

type LogMQOption struct {
	QueueConfig *mqs.QueueConfig
}
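The signatures of New, WithQueue, and LogMQOption follow Go's functional options pattern: each option is a function that mutates an options struct, and the constructor applies them in order. The sketch below reproduces that shape with local stand-in types so it compiles on its own. The queueConfig type and its Name field are hypothetical, and how the real New uses the resulting options is not documented here.

```go
package main

import "fmt"

// queueConfig is a hypothetical stand-in for mqs.QueueConfig;
// the Name field is invented for illustration.
type queueConfig struct{ Name string }

// logMQOption mirrors the documented LogMQOption struct.
type logMQOption struct{ QueueConfig *queueConfig }

// logMQ is a stand-in for LogMQ, whose real fields are unexported.
type logMQ struct{ queueConfig *queueConfig }

// withQueue mirrors the shape of WithQueue: it returns an option that
// records the queue configuration.
func withQueue(qc *queueConfig) func(*logMQOption) {
	return func(o *logMQOption) { o.QueueConfig = qc }
}

// newLogMQ mirrors the shape of New: it applies every option in order to a
// zero-valued options struct, then builds the value from the result.
func newLogMQ(opts ...func(*logMQOption)) *logMQ {
	o := &logMQOption{}
	for _, opt := range opts {
		opt(o)
	}
	return &logMQ{queueConfig: o.QueueConfig}
}

func main() {
	q := newLogMQ(withQueue(&queueConfig{Name: "logs"}))
	fmt.Println(q.queueConfig.Name)
}
```

With the real package, the equivalent call would presumably be logmq.New(logmq.WithQueue(cfg)), where cfg is a *mqs.QueueConfig.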

type LogStore added in v0.13.0

type LogStore interface {
	InsertMany(ctx context.Context, entries []*models.LogEntry) error
}

LogStore defines the interface for persisting log entries. This is a consumer-defined interface containing only what logmq needs.
