Documentation
¶
Index ¶
- Constants
- Variables
- func ArchiveMsg(ctx context.Context, queue string, messageID int64, c ...*Client) error
- func CreateExtension(ctx context.Context, db DB) error
- func DeleteMsg(ctx context.Context, queue string, messageID int64, c ...*Client) error
- func SendBatchMsg[T any](ctx context.Context, queue string, payloads []T, c ...*Client) ([]int64, error)
- func SendBatchMsgWithDelay[T any](ctx context.Context, queue string, payloads []T, delay time.Duration, ...) ([]int64, error)
- func SendMsg[T any](ctx context.Context, queue string, payload T, c ...*Client) (int64, error)
- func SendMsgWithDelay[T any](ctx context.Context, queue string, payload T, delay time.Duration, ...) (int64, error)
- type Client
- type Consumer
- type ConsumerConfig
- type ConsumerOption
- type DB
- type HandlerFunc
- type Message
- func PopMsg[T any](ctx context.Context, queue string, c ...*Client) (*Message[T], error)
- func ReadMsg[T any](ctx context.Context, queue string, opts ReadOptions, c ...*Client) ([]Message[T], error)
- func SetVisibilityTimeoutMsg[T any](ctx context.Context, queue string, messageID int64, delay time.Duration, ...) (*Message[T], error)
- type Metrics
- type Option
- func WithCheckExtension(enabled bool) Option
- func WithConsumerConfig(consumer ConsumerConfig) Option
- func WithDLQSuffix(suffix string) Option
- func WithEnsureQueue(enabled bool) Option
- func WithLogger(logger SimpleLogger) Option
- func WithMetrics(metrics Metrics) Option
- func WithReadQuantity(quantity int) Option
- func WithRetryConfig(retry RetryConfig) Option
- func WithSchema(schema string) Option
- func WithVisibilityTimeout(timeout time.Duration) Option
- type Queue
- func (q *Queue[T]) Archive(ctx context.Context, messageID int64) error
- func (q *Queue[T]) ArchiveBatch(ctx context.Context, messageIDs []int64) ([]int64, error)
- func (q *Queue[T]) Consume(ctx context.Context, handler HandlerFunc[T]) error
- func (q *Queue[T]) Delete(ctx context.Context, messageID int64) error
- func (q *Queue[T]) DeleteBatch(ctx context.Context, messageIDs []int64) ([]int64, error)
- func (q *Queue[T]) Drop(ctx context.Context) error
- func (q *Queue[T]) Name() string
- func (q *Queue[T]) Pop(ctx context.Context) (*Message[T], error)
- func (q *Queue[T]) Read(ctx context.Context, opts ReadOptions) ([]Message[T], error)
- func (q *Queue[T]) Send(ctx context.Context, payload T) (int64, error)
- func (q *Queue[T]) SendBatch(ctx context.Context, payloads []T) ([]int64, error)
- func (q *Queue[T]) SendBatchRaw(ctx context.Context, queue string, payloads []json.RawMessage) ([]int64, error)
- func (q *Queue[T]) SendBatchRawWithDelay(ctx context.Context, queue string, payloads []json.RawMessage, ...) ([]int64, error)
- func (q *Queue[T]) SendBatchRawWithDelayTimestamp(ctx context.Context, queue string, payloads []json.RawMessage, delay time.Time) ([]int64, error)
- func (q *Queue[T]) SendBatchWithDelay(ctx context.Context, payloads []T, delay time.Duration) ([]int64, error)
- func (q *Queue[T]) SendBatchWithDelayTimestamp(ctx context.Context, payloads []T, delay time.Time) ([]int64, error)
- func (q *Queue[T]) SendRaw(ctx context.Context, queue string, payload json.RawMessage) (int64, error)
- func (q *Queue[T]) SendRawWithDelay(ctx context.Context, queue string, payload json.RawMessage, ...) (int64, error)
- func (q *Queue[T]) SendRawWithDelayTimestamp(ctx context.Context, queue string, payload json.RawMessage, delay time.Time) (int64, error)
- func (q *Queue[T]) SendWithDelay(ctx context.Context, payload T, delay time.Duration) (int64, error)
- func (q *Queue[T]) SendWithDelayTimestamp(ctx context.Context, payload T, delay time.Time) (int64, error)
- func (q *Queue[T]) SetVisibilityTimeout(ctx context.Context, messageID int64, delay time.Duration) (*Message[T], error)
- func (q *Queue[T]) StartConsumer(ctx context.Context, handler HandlerFunc[T], opts ...ConsumerOption) (*Consumer[T], error)
- type QueueConfig
- type QueueOptions
- type ReadOptions
- type RetryConfig
- type SQLDBAdapter
- func (a *SQLDBAdapter) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
- func (a *SQLDBAdapter) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
- func (a *SQLDBAdapter) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
- type SimpleLogger
Constants ¶
const ( DefaultSchema = "pgmq" ExtensionName = "pgmq" DefaultDLQSuffix = "_dlq" DefaultVisibility = 30 * time.Second DefaultReadQuantity = 1 DefaultMaxRetries = 5 DefaultRetryDelay = 2 * time.Second DefaultRetryMaxDelay = 5 * time.Minute DefaultBackoffFactor = 2.0 DefaultPollInterval = 200 * time.Millisecond DefaultConcurrency = 4 )
const ( StatusSuccess = "success" StatusRetry = "retry" StatusDLQ = "dlq" )
Variables ¶
var ( ErrMissingDB = errors.New("db 不能为空") ErrMissingClient = errors.New("client 未初始化") ErrNoRows = errors.New("pgmq: no rows in result set") ErrMissingQueue = errors.New("queue 名称不能为空") ErrInvalidQueue = errors.New("queue 名称不合法") ErrInvalidDelay = errors.New("延迟时间不能为负") ErrInvalidQuantity = errors.New("读取数量必须大于0") ErrInvalidConfig = errors.New("配置无效") ErrExtensionMissing = errors.New("pgmq 扩展不存在") ErrDecodeMessage = errors.New("消息解码失败") )
Functions ¶
func ArchiveMsg ¶
ArchiveMsg 归档消息
func CreateExtension ¶
CreateExtension 创建 pgmq 扩展
func SendBatchMsg ¶
func SendBatchMsg[T any](ctx context.Context, queue string, payloads []T, c ...*Client) ([]int64, error)
SendBatchMsg 批量发送消息
func SendBatchMsgWithDelay ¶
func SendBatchMsgWithDelay[T any](ctx context.Context, queue string, payloads []T, delay time.Duration, c ...*Client) ([]int64, error)
SendBatchMsgWithDelay 批量发送消息(延迟秒)
Types ¶
type Consumer ¶
type Consumer[T any] struct { // contains filtered or unexported fields }
Consumer manages the worker lifecycle.
type ConsumerConfig ¶
type ConsumerConfig struct {
VisibilityTimeout time.Duration
PollInterval time.Duration
MaxConcurrency int
}
ConsumerConfig 消费者配置
type ConsumerOption ¶
type ConsumerOption func(*ConsumerConfig)
ConsumerOption overrides consumer behavior.
func WithConsumerMaxConcurrency ¶
func WithConsumerMaxConcurrency(n int) ConsumerOption
WithConsumerMaxConcurrency sets max concurrent handlers.
func WithConsumerPollInterval ¶
func WithConsumerPollInterval(interval time.Duration) ConsumerOption
WithConsumerPollInterval sets the polling interval when no messages are found.
func WithConsumerVisibilityTimeout ¶
func WithConsumerVisibilityTimeout(vt time.Duration) ConsumerOption
WithConsumerVisibilityTimeout overrides visibility timeout for consumer reads.
type DB ¶
type DB interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
DB 适配最小数据库能力
func NewAdapter ¶
NewAdapter 统一适配不同 DB 输入
func NewDatabaseAdapter ¶
NewDatabaseAdapter 适配 go-kit/database.DB
type HandlerFunc ¶
HandlerFunc processes a message.
type Message ¶
type Message[T any] struct { MsgID int64 ReadCount int64 EnqueuedAt time.Time VT time.Time Raw json.RawMessage Headers json.RawMessage Body T }
Message PGMQ 消息
type Metrics ¶
type Metrics interface {
IncProcessCount(queue string, status string)
ObserveLatency(queue string, duration time.Duration)
}
Metrics 可插拔指标接口
type Option ¶
type Option func(*QueueConfig, *QueueOptions)
Option 配置 Queue
func WithConsumerConfig ¶
func WithConsumerConfig(consumer ConsumerConfig) Option
WithConsumerConfig 设置消费者配置
func WithVisibilityTimeout ¶
WithVisibilityTimeout 设置默认可见性超时
type Queue ¶
type Queue[T any] struct { // contains filtered or unexported fields }
Queue 队列封装
func NewQueueWithClient ¶
NewQueueWithClient 使用 Client 创建队列实例
func (*Queue[T]) ArchiveBatch ¶
ArchiveBatch 批量归档消息
func (*Queue[T]) Consume ¶
func (q *Queue[T]) Consume(ctx context.Context, handler HandlerFunc[T]) error
Consume starts a consumer and blocks until it stops.
func (*Queue[T]) DeleteBatch ¶
DeleteBatch 批量删除消息
func (*Queue[T]) SendBatchRaw ¶
func (q *Queue[T]) SendBatchRaw(ctx context.Context, queue string, payloads []json.RawMessage) ([]int64, error)
SendBatchRaw 批量发送 JSON 消息
func (*Queue[T]) SendBatchRawWithDelay ¶
func (q *Queue[T]) SendBatchRawWithDelay(ctx context.Context, queue string, payloads []json.RawMessage, delay time.Duration) ([]int64, error)
SendBatchRawWithDelay 批量发送 JSON 消息(延迟秒)
func (*Queue[T]) SendBatchRawWithDelayTimestamp ¶
func (q *Queue[T]) SendBatchRawWithDelayTimestamp(ctx context.Context, queue string, payloads []json.RawMessage, delay time.Time) ([]int64, error)
SendBatchRawWithDelayTimestamp 批量发送 JSON 消息(指定时间)
func (*Queue[T]) SendBatchWithDelay ¶
func (q *Queue[T]) SendBatchWithDelay(ctx context.Context, payloads []T, delay time.Duration) ([]int64, error)
SendBatchWithDelay 批量发送消息(延迟秒)
func (*Queue[T]) SendBatchWithDelayTimestamp ¶
func (q *Queue[T]) SendBatchWithDelayTimestamp(ctx context.Context, payloads []T, delay time.Time) ([]int64, error)
SendBatchWithDelayTimestamp 批量发送消息(指定时间)
func (*Queue[T]) SendRaw ¶
func (q *Queue[T]) SendRaw(ctx context.Context, queue string, payload json.RawMessage) (int64, error)
SendRaw 直接发送 JSON 消息
func (*Queue[T]) SendRawWithDelay ¶
func (q *Queue[T]) SendRawWithDelay(ctx context.Context, queue string, payload json.RawMessage, delay time.Duration) (int64, error)
SendRawWithDelay 直接发送 JSON 消息(延迟秒)
func (*Queue[T]) SendRawWithDelayTimestamp ¶
func (q *Queue[T]) SendRawWithDelayTimestamp(ctx context.Context, queue string, payload json.RawMessage, delay time.Time) (int64, error)
SendRawWithDelayTimestamp 直接发送 JSON 消息(指定时间)
func (*Queue[T]) SendWithDelay ¶
func (q *Queue[T]) SendWithDelay(ctx context.Context, payload T, delay time.Duration) (int64, error)
SendWithDelay 发送消息(延迟秒)
func (*Queue[T]) SendWithDelayTimestamp ¶
func (q *Queue[T]) SendWithDelayTimestamp(ctx context.Context, payload T, delay time.Time) (int64, error)
SendWithDelayTimestamp 发送消息(指定时间)
func (*Queue[T]) SetVisibilityTimeout ¶
func (q *Queue[T]) SetVisibilityTimeout(ctx context.Context, messageID int64, delay time.Duration) (*Message[T], error)
SetVisibilityTimeout 设置消息可见性超时
func (*Queue[T]) StartConsumer ¶
func (q *Queue[T]) StartConsumer(ctx context.Context, handler HandlerFunc[T], opts ...ConsumerOption) (*Consumer[T], error)
StartConsumer starts polling the queue with concurrency control.
type QueueConfig ¶
type QueueConfig struct {
Schema string
CheckExtension bool
EnsureQueue bool
DLQSuffix string
Visibility time.Duration
ReadQuantity int
Retry RetryConfig
Consumer ConsumerConfig
}
QueueConfig 队列配置
type QueueOptions ¶
type QueueOptions struct {
// contains filtered or unexported fields
}
QueueOptions 额外注入项
type ReadOptions ¶
ReadOptions 读取配置
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
Jitter bool
}
RetryConfig 重试配置
type SQLDBAdapter ¶
type SQLDBAdapter struct {
// contains filtered or unexported fields
}
SQLDBAdapter 适配 *sql.DB
func NewSQLDBAdapter ¶
func NewSQLDBAdapter(db *sql.DB) (*SQLDBAdapter, error)
NewSQLDBAdapter 创建 SQLDBAdapter
func (*SQLDBAdapter) ExecContext ¶
func (*SQLDBAdapter) QueryContext ¶
func (*SQLDBAdapter) QueryRowContext ¶
type SimpleLogger ¶
type SimpleLogger interface {
Info(msg string, fields ...interface{})
Warn(msg string, fields ...interface{})
Error(msg string, fields ...interface{})
}
SimpleLogger 基础日志接口