Documentation
¶
Overview ¶
Package sqlite provides a SQLite-based transport for protoflow.
Index ¶
- Constants
- func Build(ctx context.Context, cfg transport.Config, logger watermill.LoggerAdapter) (transport.Transport, error)
- func Capabilities() transport.Capabilities
- func Register()
- type Config
- type Transport
- func (t *Transport) Close() error
- func (t *Transport) GetCapabilities() transport.Capabilities
- func (t *Transport) GetDB() *sql.DB
- func (t *Transport) GetDLQCount(topic string) (int64, error)
- func (t *Transport) GetPendingCount(topic string) (int64, error)
- func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)
- func (t *Transport) Publish(topic string, messages ...*message.Message) error
- func (t *Transport) PurgeDLQ(topic string) (int64, error)
- func (t *Transport) ReplayAllDLQ(topic string) (int64, error)
- func (t *Transport) ReplayDLQMessage(dlqID int64) error
- func (t *Transport) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
Constants ¶
const ( // DefaultPollInterval is the default interval for polling new messages. DefaultPollInterval = 100 * time.Millisecond // DefaultMaxRetries is the default number of retries before moving to DLQ. DefaultMaxRetries = 3 )
const TransportName = "sqlite"
TransportName is the name used to register this transport.
Variables ¶
This section is empty.
Functions ¶
func Build ¶
func Build(ctx context.Context, cfg transport.Config, logger watermill.LoggerAdapter) (transport.Transport, error)
Build creates a new SQLite transport.
func Capabilities ¶
func Capabilities() transport.Capabilities
Capabilities returns the capabilities of this transport.
Types ¶
type Config ¶
type Config struct {
// FilePath is the path to the SQLite database file.
// Use ":memory:" for an in-memory database (useful for testing).
FilePath string
// PollInterval is the interval for polling new messages.
PollInterval time.Duration
// MaxRetries is the number of times to retry a message before giving up.
MaxRetries int
}
Config holds SQLite-specific configuration.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport implements both Publisher and Subscriber interfaces for SQLite.
func New ¶
func New(cfg Config, logger watermill.LoggerAdapter) (*Transport, error)
New creates a new SQLite-based transport.
func (*Transport) GetCapabilities ¶
func (t *Transport) GetCapabilities() transport.Capabilities
GetCapabilities returns the capabilities of this transport instance.
func (*Transport) GetDLQCount ¶
GetDLQCount returns the number of messages in the dead letter queue for a topic.
func (*Transport) GetPendingCount ¶
GetPendingCount returns the number of pending messages for a topic.
func (*Transport) ListDLQMessages ¶
func (t *Transport) ListDLQMessages(topic string, limit, offset int) ([]transport.DLQMessage, error)
ListDLQMessages returns messages from the dead letter queue with pagination.
func (*Transport) ReplayAllDLQ ¶
ReplayAllDLQ moves all messages from DLQ back to the main queue for a topic.
func (*Transport) ReplayDLQMessage ¶
ReplayDLQMessage moves a message from DLQ back to the main queue.