sqlite

package
v0.4.1 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 26, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package sqlite provides a SQLite-based transport for protoflow.

Constants

const (
	// DefaultPollInterval is the default interval for polling new messages.
	DefaultPollInterval = 100 * time.Millisecond
	// DefaultMaxRetries is the default number of retries before moving to DLQ.
	DefaultMaxRetries = 3
)
const TransportName = "sqlite"

TransportName is the name used to register this transport.

Functions

func Build

Build creates a new SQLite transport.

func Capabilities

func Capabilities() transport.Capabilities

Capabilities returns the capabilities of this transport.

Types

type Config

type Config struct {
	// FilePath is the path to the SQLite database file.
	// Use ":memory:" for an in-memory database (useful for testing).
	FilePath string
	// PollInterval is the interval for polling new messages.
	PollInterval time.Duration
	// MaxRetries is the number of times to retry a message before moving it to the DLQ.
	MaxRetries int
}

Config holds SQLite-specific configuration.
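
As a rough illustration of how the two default constants relate to Config, the sketch below fills unset fields before a transport would be constructed. It uses a local mirror of the documented Config fields and local copies of the default values so that it compiles on its own. Whether New applies these defaults to zero-valued fields itself is not stated on this page, so `withDefaults` is a hypothetical helper, and treating a zero MaxRetries as "unset" is an assumption.

```go
package main

import "time"

// Local copies of the documented defaults, so the sketch compiles without
// importing the sqlite package.
const (
	defaultPollInterval = 100 * time.Millisecond
	defaultMaxRetries   = 3
)

// config mirrors the documented Config fields.
type config struct {
	FilePath     string
	PollInterval time.Duration
	MaxRetries   int
}

// withDefaults is a hypothetical helper: it returns a copy of cfg in which a
// non-positive PollInterval or MaxRetries is replaced by the default value.
// Assumption: zero means "unset" rather than "never poll" or "never retry".
func withDefaults(cfg config) config {
	if cfg.PollInterval <= 0 {
		cfg.PollInterval = defaultPollInterval
	}
	if cfg.MaxRetries <= 0 {
		cfg.MaxRetries = defaultMaxRetries
	}
	return cfg
}
```

For a test setup, `withDefaults(config{FilePath: ":memory:"})` would yield a 100 ms poll interval and 3 retries under these assumptions.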

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements both Publisher and Subscriber interfaces for SQLite.

func New

func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)

New creates a new SQLite-based transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport and releases resources.

func (*Transport) GetCapabilities

func (t *Transport) GetCapabilities() transport.Capabilities

GetCapabilities returns the capabilities of this transport instance.

func (*Transport) GetDB

func (t *Transport) GetDB() *sql.DB

GetDB returns the underlying database connection for advanced use cases.

func (*Transport) GetDLQCount

func (t *Transport) GetDLQCount(topic string) (int64, error)

GetDLQCount returns the number of messages in the dead letter queue for a topic.

func (*Transport) GetPendingCount

func (t *Transport) GetPendingCount(topic string) (int64, error)

GetPendingCount returns the number of pending messages for a topic.

func (*Transport) ListDLQMessages

func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)

ListDLQMessages returns messages from the dead letter queue with pagination.
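
The limit and offset parameters suggest conventional page-by-page iteration. The sketch below collects every page through a generic helper, so it does not depend on the fields of transport.DLQMessage, which are not shown on this page. `fetchAll` is a hypothetical helper. It assumes that a page shorter than the requested limit marks the end of the listing and that the DLQ is not modified while paging.

```go
package main

import "errors"

// fetchAll pages through a limit/offset listing until a short or empty page
// is returned, and collects every item. fetch is expected to behave like
// ListDLQMessages with the topic already bound.
func fetchAll[T any](fetch func(limit, offset int) ([]T, error), pageSize int) ([]T, error) {
	if pageSize <= 0 {
		return nil, errors.New("pageSize must be positive")
	}
	var all []T
	for offset := 0; ; offset += pageSize {
		page, err := fetch(pageSize, offset)
		if err != nil {
			return all, err // return what was collected so far
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil // short page: nothing left to read
		}
	}
}
```

With a *Transport value `t`, the call might look like `fetchAll(func(limit, offset int) ([]transport.DLQMessage, error) { return t.ListDLQMessages("orders", limit, offset) }, 100)`, where the topic name and page size are placeholders.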

func (*Transport) Publish

func (t *Transport) Publish(topic string, messages ...*message.Message) error

Publish publishes one or more messages to the specified topic.

func (*Transport) PurgeDLQ

func (t *Transport) PurgeDLQ(topic string) (int64, error)

PurgeDLQ removes all messages from the dead letter queue for a topic.

func (*Transport) ReplayAllDLQ

func (t *Transport) ReplayAllDLQ(topic string) (int64, error)

ReplayAllDLQ moves all messages from the DLQ back to the main queue for a topic.
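
Replaying a large DLQ in one call can push a sizeable backlog onto the main queue, so a caller may want to check the count first. The sketch below combines GetDLQCount and ReplayAllDLQ behind a small local interface that matches the documented signatures. `replayIfSmall` and `fakeDLQ` are hypothetical names, and the size limit is an illustrative policy, not something this package prescribes.

```go
package main

import "fmt"

// dlqAdmin is the subset of *Transport methods this sketch relies on.
type dlqAdmin interface {
	GetDLQCount(topic string) (int64, error)
	ReplayAllDLQ(topic string) (int64, error)
}

// replayIfSmall replays the whole DLQ for topic only when it holds at most
// limit messages. It returns the number of messages replayed.
func replayIfSmall(q dlqAdmin, topic string, limit int64) (int64, error) {
	n, err := q.GetDLQCount(topic)
	if err != nil {
		return 0, fmt.Errorf("count DLQ for %q: %w", topic, err)
	}
	if n == 0 {
		return 0, nil // nothing to replay
	}
	if n > limit {
		return 0, fmt.Errorf("DLQ for %q holds %d messages, above limit %d", topic, n, limit)
	}
	return q.ReplayAllDLQ(topic)
}

// fakeDLQ is an in-memory stand-in for trying the helper without a database.
type fakeDLQ struct{ dead map[string]int64 }

func (f *fakeDLQ) GetDLQCount(topic string) (int64, error) { return f.dead[topic], nil }

func (f *fakeDLQ) ReplayAllDLQ(topic string) (int64, error) {
	n := f.dead[topic]
	f.dead[topic] = 0
	return n, nil
}
```

Because the count and the replay are two separate calls, messages that reach the DLQ between them would still be replayed; the limit is a guard, not a guarantee.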

func (*Transport) ReplayDLQMessage

func (t *Transport) ReplayDLQMessage(dlqID int64) error

ReplayDLQMessage moves a single message, identified by its DLQ ID, from the DLQ back to the main queue.

func (*Transport) Subscribe

func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from the specified topic.
