sqlite

package
v0.5.1 Latest
Warning: This package is not in the latest version of its module.
Published: Dec 3, 2025 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Overview

Package sqlite provides a SQLite-based transport for protoflow.

Constants

const (
	// DefaultPollInterval is the default interval for polling new messages.
	DefaultPollInterval = 100 * time.Millisecond
	// DefaultMaxRetries is the default number of retries before moving to DLQ.
	DefaultMaxRetries = 3
)
const TransportName = "sqlite"

TransportName is the name used to register this transport.

Variables

This section is empty.

Functions

func Build

Build creates a new SQLite transport.

func Capabilities

func Capabilities() transport.Capabilities

Capabilities returns the capabilities of this transport.

func Register added in v0.5.0

func Register()

Register registers the SQLite transport with the default registry. This should be called from an init() function in an importing package, or explicitly before using the transport.

Types

type Config

type Config struct {
	// FilePath is the path to the SQLite database file.
	// Use ":memory:" for an in-memory database (useful for testing).
	FilePath string
	// PollInterval is the interval for polling new messages.
	PollInterval time.Duration
	// MaxRetries is the number of times to retry a message before giving up.
	MaxRetries int
}

Config holds SQLite-specific configuration.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements both Publisher and Subscriber interfaces for SQLite.

func New

func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)

New creates a new SQLite-based transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport and releases resources.

func (*Transport) GetCapabilities

func (t *Transport) GetCapabilities() transport.Capabilities

GetCapabilities returns the capabilities of this transport instance.

func (*Transport) GetDB

func (t *Transport) GetDB() *sql.DB

GetDB returns the underlying database connection for advanced use cases.

func (*Transport) GetDLQCount

func (t *Transport) GetDLQCount(topic string) (int64, error)

GetDLQCount returns the number of messages in the dead letter queue for a topic.

func (*Transport) GetPendingCount

func (t *Transport) GetPendingCount(topic string) (int64, error)

GetPendingCount returns the number of pending messages for a topic.

func (*Transport) ListDLQMessages

func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)

ListDLQMessages returns messages from the dead letter queue with pagination.
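
The limit and offset parameters suggest the usual paging loop: request a page, advance the offset by the page size, and stop when a short page comes back. The sketch below illustrates that loop under stated assumptions. `DLQMessage` is a local stand-in for `transport.DLQMessage` (its fields are not shown in this reference), and `listAllDLQ` and `fakeDLQ` are hypothetical names. To use it with the real transport, swap the stand-in type for `transport.DLQMessage`.

```go
package main

import (
	"errors"
	"fmt"
)

// DLQMessage is a stand-in for transport.DLQMessage.
type DLQMessage struct{ ID int64 }

// dlqLister matches the shape of the documented ListDLQMessages method.
type dlqLister interface {
	ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error)
}

// listAllDLQ walks the dead letter queue page by page and returns
// every message. A page shorter than pageSize marks the end.
func listAllDLQ(l dlqLister, topic string, pageSize int) ([]DLQMessage, error) {
	if pageSize <= 0 {
		return nil, errors.New("pageSize must be positive")
	}
	var all []DLQMessage
	for offset := 0; ; offset += pageSize {
		page, err := l.ListDLQMessages(topic, pageSize, offset)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}

// fakeDLQ is an in-memory stand-in that serves slices of a fixed list.
type fakeDLQ struct{ msgs []DLQMessage }

func (f fakeDLQ) ListDLQMessages(_ string, limit, offset int) ([]DLQMessage, error) {
	if offset >= len(f.msgs) {
		return nil, nil
	}
	end := offset + limit
	if end > len(f.msgs) {
		end = len(f.msgs)
	}
	return f.msgs[offset:end], nil
}

func main() {
	dlq := fakeDLQ{msgs: []DLQMessage{{1}, {2}, {3}, {4}, {5}}}
	all, err := listAllDLQ(dlq, "orders", 2)
	fmt.Println(len(all), err) // 5 <nil>
}
```

Offset paging can skip or repeat entries if the queue changes between calls, so this loop is best suited to inspection rather than exact accounting.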

func (*Transport) Publish

func (t *Transport) Publish(topic string, messages ...*message.Message) error

Publish publishes one or more messages to the specified topic.

func (*Transport) PurgeDLQ

func (t *Transport) PurgeDLQ(topic string) (int64, error)

PurgeDLQ removes all messages from the dead letter queue for a topic.

func (*Transport) ReplayAllDLQ

func (t *Transport) ReplayAllDLQ(topic string) (int64, error)

ReplayAllDLQ moves all messages from DLQ back to the main queue for a topic.
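
A recovery job might combine GetDLQCount and ReplayAllDLQ: check whether anything is waiting, and replay only if so. The sketch below does this through an interface matching the two documented signatures. `replayIfAny` and `fakeDLQStore` are hypothetical names, and reading the `int64` returned by ReplayAllDLQ as the number of messages moved is an assumption, since this reference does not say what the value means.

```go
package main

import "fmt"

// dlqReplayer is the subset of the documented Transport methods used here.
type dlqReplayer interface {
	GetDLQCount(topic string) (int64, error)
	ReplayAllDLQ(topic string) (int64, error)
}

// replayIfAny replays the dead letter queue for a topic only when it
// is non-empty and returns the value reported by ReplayAllDLQ.
func replayIfAny(r dlqReplayer, topic string) (int64, error) {
	n, err := r.GetDLQCount(topic)
	if err != nil {
		return 0, fmt.Errorf("count DLQ for %q: %w", topic, err)
	}
	if n == 0 {
		return 0, nil
	}
	moved, err := r.ReplayAllDLQ(topic)
	if err != nil {
		return 0, fmt.Errorf("replay DLQ for %q: %w", topic, err)
	}
	return moved, nil
}

// fakeDLQStore is an in-memory stand-in that tracks a count per topic.
type fakeDLQStore struct{ counts map[string]int64 }

func (f *fakeDLQStore) GetDLQCount(topic string) (int64, error) {
	return f.counts[topic], nil
}

func (f *fakeDLQStore) ReplayAllDLQ(topic string) (int64, error) {
	n := f.counts[topic]
	f.counts[topic] = 0
	return n, nil
}

func main() {
	store := &fakeDLQStore{counts: map[string]int64{"orders": 4}}
	moved, err := replayIfAny(store, "orders")
	fmt.Println(moved, err) // 4 <nil>
}
```

Replaying without first fixing the cause of the failures will likely send the same messages back to the DLQ once their retries are exhausted.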

func (*Transport) ReplayDLQMessage

func (t *Transport) ReplayDLQMessage(dlqID int64) error

ReplayDLQMessage moves a message from DLQ back to the main queue.

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from the specified topic.
