pgmq

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

README

PGMQ Package

基于 PostgreSQL pgmq 扩展的队列封装,提供泛型消息结构、标准 PGMQ 命名以及带重试与 DLQ 的消费能力。

✨ 特性

  • 泛型消息Message[T] 自动完成 JSON 编解码。
  • 标准 APISend / SendBatch / Read / Pop / Archive / Delete / Drop
  • 自动校验:初始化时检测 pgmq 扩展,可自动创建队列。
  • 并发消费者:支持协程池与渐进式重试、DLQ 自动转移。
  • 可观测性:可插拔 Metrics 接口。

✅ 依赖与初始化要求

pgmq 扩展
  • 必须预先安装扩展:
    CREATE EXTENSION IF NOT EXISTS pgmq;
    
    或使用内置方法:
    if err := pgmq.CreateExtension(ctx, adapter); err != nil {
        log.Fatal(err)
    }
    

🚀 SDK 风格快速开始

package main

import (
    "context"
    "log"

    "github.com/tsopia/go-kit/database"
    "github.com/tsopia/go-kit/pgmq"
)

func main() {
    db, err := database.New(&database.Config{
        Driver:   "postgres",
        Host:     "127.0.0.1",
        Port:     5432,
        Username: "postgres",
        Password: "password",
        Database: "app",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    adapter, err := pgmq.NewAdapter(context.Background(), db)
    if err != nil {
        log.Fatal(err)
    }

    _, err = pgmq.Configure(adapter)
    if err != nil {
        log.Fatal(err)
    }

    id, err := pgmq.SendMsg(context.Background(), "orders", map[string]string{"order_id": "123"})
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("sent: %d", id)
}

🔌 单一连接串接入(推荐)

adapter, err := pgmq.NewAdapter(ctx, "postgres://user:pass@localhost:5432/app")
if err != nil {
    log.Fatal(err)
}

_, err = pgmq.Configure(adapter)
if err != nil {
    log.Fatal(err)
}

queue, err := pgmq.NewQueue[map[string]string](ctx, adapter, "orders")
if err != nil {
    log.Fatal(err)
}

🧩 消费者示例

consumer, err := queue.StartConsumer(context.Background(), func(ctx context.Context, msg *pgmq.Message[map[string]string]) error {
    // 处理逻辑
    return nil
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Stop(context.Background())

⚙️ 配置说明

queue, err := pgmq.NewQueue[map[string]string](ctx, adapter, "orders",
    pgmq.WithCheckExtension(true),
    pgmq.WithEnsureQueue(true),
    pgmq.WithReadQuantity(1),
    pgmq.WithVisibilityTimeout(30*time.Second),
    pgmq.WithRetryConfig(pgmq.RetryConfig{
        MaxRetries:    5,
        InitialDelay:  2 * time.Second,
        MaxDelay:      5 * time.Minute,
        BackoffFactor: 2,
        Jitter:        true,
    }),
    pgmq.WithConsumerConfig(pgmq.ConsumerConfig{
        VisibilityTimeout: 30 * time.Second,
        PollInterval:      200 * time.Millisecond,
        MaxConcurrency:    4,
    }),
)

🧰 SDK 快捷方法示例

_, err := pgmq.Configure(adapter,
    pgmq.WithEnsureQueue(true),
)
if err != nil {
    log.Fatal(err)
}

id, err := pgmq.SendMsg(ctx, "orders", map[string]string{"order_id": "1"})
if err != nil {
    log.Fatal(err)
}
log.Printf("sent: %d", id)

📦 批量发送示例

ids, err := queue.SendBatch(ctx, []map[string]string{
    {"order_id": "1"},
    {"order_id": "2"},
}, 0)
if err != nil {
    log.Fatal(err)
}
log.Printf("sent batch: %v", ids)

📌 目录结构

  • config.go:默认值、校验与重试/消费配置
  • options.go:Option 注入
  • queue.go:Queue 管理与核心 API
  • client.go:SDK Client 与全局实例管理
  • helpers.go:SDK 快捷方法
  • consumer.go:Consumer 生命周期管理
  • types.go:基础类型与消息结构
  • adapter_db.go:DB 适配
  • errors.go:错误定义

🔒 API 稳定性约定

  • ConfigureGetClient 以及 SendMsg/ReadMsg/... 等 SDK 快捷函数属于公开 API,优先保证签名和语义稳定。
  • 内部可通过提取公共流程减少重复代码,但不改变缺省 Client 解析和错误返回行为。

Documentation

Index

Constants

View Source
const (
	DefaultSchema        = "pgmq"
	ExtensionName        = "pgmq"
	DefaultDLQSuffix     = "_dlq"
	DefaultVisibility    = 30 * time.Second
	DefaultReadQuantity  = 1
	DefaultMaxRetries    = 5
	DefaultRetryDelay    = 2 * time.Second
	DefaultRetryMaxDelay = 5 * time.Minute
	DefaultBackoffFactor = 2.0
	DefaultPollInterval  = 200 * time.Millisecond
	DefaultConcurrency   = 4
)
View Source
const (
	StatusSuccess = "success"
	StatusRetry   = "retry"
	StatusDLQ     = "dlq"
)

Variables

View Source
var (
	ErrMissingDB        = errors.New("db 不能为空")
	ErrMissingClient    = errors.New("client 未初始化")
	ErrNoRows           = errors.New("pgmq: no rows in result set")
	ErrMissingQueue     = errors.New("queue 名称不能为空")
	ErrInvalidQueue     = errors.New("queue 名称不合法")
	ErrInvalidDelay     = errors.New("延迟时间不能为负")
	ErrInvalidQuantity  = errors.New("读取数量必须大于0")
	ErrInvalidConfig    = errors.New("配置无效")
	ErrExtensionMissing = errors.New("pgmq 扩展不存在")
	ErrDecodeMessage    = errors.New("消息解码失败")
)

Functions

func ArchiveMsg

func ArchiveMsg(ctx context.Context, queue string, messageID int64, c ...*Client) error

ArchiveMsg 归档消息

func CreateExtension

func CreateExtension(ctx context.Context, db DB) error

CreateExtension 创建 pgmq 扩展

func DeleteMsg

func DeleteMsg(ctx context.Context, queue string, messageID int64, c ...*Client) error

DeleteMsg 删除消息

func SendBatchMsg

func SendBatchMsg[T any](ctx context.Context, queue string, payloads []T, c ...*Client) ([]int64, error)

SendBatchMsg 批量发送消息

func SendBatchMsgWithDelay

func SendBatchMsgWithDelay[T any](ctx context.Context, queue string, payloads []T, delay time.Duration, c ...*Client) ([]int64, error)

SendBatchMsgWithDelay 批量发送消息(延迟秒)

func SendMsg

func SendMsg[T any](ctx context.Context, queue string, payload T, c ...*Client) (int64, error)

SendMsg 发送消息

func SendMsgWithDelay

func SendMsgWithDelay[T any](ctx context.Context, queue string, payload T, delay time.Duration, c ...*Client) (int64, error)

SendMsgWithDelay 发送消息(延迟秒)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client SDK 风格客户端

func Configure

func Configure(db DB, opts ...Option) (*Client, error)

Configure 初始化/替换默认 Client

func GetClient

func GetClient() *Client

GetClient 获取默认 Client

func NewClient

func NewClient(db DB, opts ...Option) (*Client, error)

NewClient 创建 Client

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

Consumer manages the worker lifecycle.

func (*Consumer[T]) Stop

func (c *Consumer[T]) Stop(ctx context.Context) error

Stop stops polling new messages and waits for in-flight handlers.

func (*Consumer[T]) Wait

func (c *Consumer[T]) Wait(ctx context.Context) error

Wait blocks until the consumer stops or returns an error.

type ConsumerConfig

type ConsumerConfig struct {
	VisibilityTimeout time.Duration
	PollInterval      time.Duration
	MaxConcurrency    int
}

ConsumerConfig 消费者配置

type ConsumerOption

type ConsumerOption func(*ConsumerConfig)

ConsumerOption overrides consumer behavior.

func WithConsumerMaxConcurrency

func WithConsumerMaxConcurrency(n int) ConsumerOption

WithConsumerMaxConcurrency sets max concurrent handlers.

func WithConsumerPollInterval

func WithConsumerPollInterval(interval time.Duration) ConsumerOption

WithConsumerPollInterval sets the polling interval when no messages are found.

func WithConsumerVisibilityTimeout

func WithConsumerVisibilityTimeout(vt time.Duration) ConsumerOption

WithConsumerVisibilityTimeout overrides visibility timeout for consumer reads.

type DB

type DB interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

DB 适配最小数据库能力

func NewAdapter

func NewAdapter(ctx context.Context, source any) (DB, error)

NewAdapter 统一适配不同 DB 输入

func NewDatabaseAdapter

func NewDatabaseAdapter(db database.DB) (DB, error)

NewDatabaseAdapter 适配 go-kit/database.DB

func NewPgxAdapter

func NewPgxAdapter(ctx context.Context, connString string) (DB, error)

NewPgxAdapter 使用 pgx 驱动创建 database/sql 连接池

type HandlerFunc

type HandlerFunc[T any] func(context.Context, *Message[T]) error

HandlerFunc processes a message.

type Message

type Message[T any] struct {
	MsgID      int64
	ReadCount  int64
	EnqueuedAt time.Time
	VT         time.Time
	Raw        json.RawMessage
	Headers    json.RawMessage
	Body       T
}

Message PGMQ 消息

func PopMsg

func PopMsg[T any](ctx context.Context, queue string, c ...*Client) (*Message[T], error)

PopMsg 读取并删除消息

func ReadMsg

func ReadMsg[T any](ctx context.Context, queue string, opts ReadOptions, c ...*Client) ([]Message[T], error)

ReadMsg 读取消息

func SetVisibilityTimeoutMsg

func SetVisibilityTimeoutMsg[T any](ctx context.Context, queue string, messageID int64, delay time.Duration, c ...*Client) (*Message[T], error)

SetVisibilityTimeoutMsg 设置消息可见性超时

type Metrics

type Metrics interface {
	IncProcessCount(queue string, status string)
	ObserveLatency(queue string, duration time.Duration)
}

Metrics 可插拔指标接口

type Option

type Option func(*QueueConfig, *QueueOptions)

Option 配置 Queue

func WithCheckExtension

func WithCheckExtension(enabled bool) Option

WithCheckExtension 是否检查扩展

func WithConsumerConfig

func WithConsumerConfig(consumer ConsumerConfig) Option

WithConsumerConfig 设置消费者配置

func WithDLQSuffix

func WithDLQSuffix(suffix string) Option

WithDLQSuffix 设置死信队列后缀

func WithEnsureQueue

func WithEnsureQueue(enabled bool) Option

WithEnsureQueue 是否自动创建队列

func WithLogger

func WithLogger(logger SimpleLogger) Option

WithLogger 注入日志器

func WithMetrics

func WithMetrics(metrics Metrics) Option

WithMetrics 注入指标实现

func WithReadQuantity

func WithReadQuantity(quantity int) Option

WithReadQuantity 设置默认读取数量

func WithRetryConfig

func WithRetryConfig(retry RetryConfig) Option

WithRetryConfig 设置重试配置

func WithSchema

func WithSchema(schema string) Option

WithSchema 设置 schema

func WithVisibilityTimeout

func WithVisibilityTimeout(timeout time.Duration) Option

WithVisibilityTimeout 设置默认可见性超时

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue 队列封装

func NewQueue

func NewQueue[T any](ctx context.Context, db DB, name string, opts ...Option) (*Queue[T], error)

NewQueue 创建队列

func NewQueueWithClient

func NewQueueWithClient[T any](ctx context.Context, name string, c ...*Client) (*Queue[T], error)

NewQueueWithClient 使用 Client 创建队列实例

func (*Queue[T]) Archive

func (q *Queue[T]) Archive(ctx context.Context, messageID int64) error

Archive 归档消息

func (*Queue[T]) ArchiveBatch

func (q *Queue[T]) ArchiveBatch(ctx context.Context, messageIDs []int64) ([]int64, error)

ArchiveBatch 批量归档消息

func (*Queue[T]) Consume

func (q *Queue[T]) Consume(ctx context.Context, handler HandlerFunc[T]) error

Consume starts a consumer and blocks until it stops.

func (*Queue[T]) Delete

func (q *Queue[T]) Delete(ctx context.Context, messageID int64) error

Delete 删除消息

func (*Queue[T]) DeleteBatch

func (q *Queue[T]) DeleteBatch(ctx context.Context, messageIDs []int64) ([]int64, error)

DeleteBatch 批量删除消息

func (*Queue[T]) Drop

func (q *Queue[T]) Drop(ctx context.Context) error

Drop 删除队列

func (*Queue[T]) Name

func (q *Queue[T]) Name() string

Name 队列名称

func (*Queue[T]) Pop

func (q *Queue[T]) Pop(ctx context.Context) (*Message[T], error)

Pop 读取并删除消息

func (*Queue[T]) Read

func (q *Queue[T]) Read(ctx context.Context, opts ReadOptions) ([]Message[T], error)

Read 读取消息

func (*Queue[T]) Send

func (q *Queue[T]) Send(ctx context.Context, payload T) (int64, error)

Send 发送消息

func (*Queue[T]) SendBatch

func (q *Queue[T]) SendBatch(ctx context.Context, payloads []T) ([]int64, error)

SendBatch 批量发送消息

func (*Queue[T]) SendBatchRaw

func (q *Queue[T]) SendBatchRaw(ctx context.Context, queue string, payloads []json.RawMessage) ([]int64, error)

SendBatchRaw 批量发送 JSON 消息

func (*Queue[T]) SendBatchRawWithDelay

func (q *Queue[T]) SendBatchRawWithDelay(ctx context.Context, queue string, payloads []json.RawMessage, delay time.Duration) ([]int64, error)

SendBatchRawWithDelay 批量发送 JSON 消息(延迟秒)

func (*Queue[T]) SendBatchRawWithDelayTimestamp

func (q *Queue[T]) SendBatchRawWithDelayTimestamp(ctx context.Context, queue string, payloads []json.RawMessage, delay time.Time) ([]int64, error)

SendBatchRawWithDelayTimestamp 批量发送 JSON 消息(指定时间)

func (*Queue[T]) SendBatchWithDelay

func (q *Queue[T]) SendBatchWithDelay(ctx context.Context, payloads []T, delay time.Duration) ([]int64, error)

SendBatchWithDelay 批量发送消息(延迟秒)

func (*Queue[T]) SendBatchWithDelayTimestamp

func (q *Queue[T]) SendBatchWithDelayTimestamp(ctx context.Context, payloads []T, delay time.Time) ([]int64, error)

SendBatchWithDelayTimestamp 批量发送消息(指定时间)

func (*Queue[T]) SendRaw

func (q *Queue[T]) SendRaw(ctx context.Context, queue string, payload json.RawMessage) (int64, error)

SendRaw 直接发送 JSON 消息

func (*Queue[T]) SendRawWithDelay

func (q *Queue[T]) SendRawWithDelay(ctx context.Context, queue string, payload json.RawMessage, delay time.Duration) (int64, error)

SendRawWithDelay 直接发送 JSON 消息(延迟秒)

func (*Queue[T]) SendRawWithDelayTimestamp

func (q *Queue[T]) SendRawWithDelayTimestamp(ctx context.Context, queue string, payload json.RawMessage, delay time.Time) (int64, error)

SendRawWithDelayTimestamp 直接发送 JSON 消息(指定时间)

func (*Queue[T]) SendWithDelay

func (q *Queue[T]) SendWithDelay(ctx context.Context, payload T, delay time.Duration) (int64, error)

SendWithDelay 发送消息(延迟秒)

func (*Queue[T]) SendWithDelayTimestamp

func (q *Queue[T]) SendWithDelayTimestamp(ctx context.Context, payload T, delay time.Time) (int64, error)

SendWithDelayTimestamp 发送消息(指定时间)

func (*Queue[T]) SetVisibilityTimeout

func (q *Queue[T]) SetVisibilityTimeout(ctx context.Context, messageID int64, delay time.Duration) (*Message[T], error)

SetVisibilityTimeout 设置消息可见性超时

func (*Queue[T]) StartConsumer

func (q *Queue[T]) StartConsumer(ctx context.Context, handler HandlerFunc[T], opts ...ConsumerOption) (*Consumer[T], error)

StartConsumer starts polling the queue with concurrency control.

type QueueConfig

type QueueConfig struct {
	Schema         string
	CheckExtension bool
	EnsureQueue    bool
	DLQSuffix      string
	Visibility     time.Duration
	ReadQuantity   int
	Retry          RetryConfig
	Consumer       ConsumerConfig
}

QueueConfig 队列配置

func (*QueueConfig) SetDefaults

func (c *QueueConfig) SetDefaults()

SetDefaults 设置默认值

func (*QueueConfig) Validate

func (c *QueueConfig) Validate() error

Validate 校验配置

type QueueOptions

type QueueOptions struct {
	// contains filtered or unexported fields
}

QueueOptions 额外注入项

type ReadOptions

type ReadOptions struct {
	VisibilityTimeout time.Duration
	Quantity          int
}

ReadOptions 读取配置

type RetryConfig

type RetryConfig struct {
	MaxRetries    int
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
	Jitter        bool
}

RetryConfig 重试配置

type SQLDBAdapter

type SQLDBAdapter struct {
	// contains filtered or unexported fields
}

SQLDBAdapter 适配 *sql.DB

func NewSQLDBAdapter

func NewSQLDBAdapter(db *sql.DB) (*SQLDBAdapter, error)

NewSQLDBAdapter 创建 SQLDBAdapter

func (*SQLDBAdapter) ExecContext

func (a *SQLDBAdapter) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

func (*SQLDBAdapter) QueryContext

func (a *SQLDBAdapter) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

func (*SQLDBAdapter) QueryRowContext

func (a *SQLDBAdapter) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

type SimpleLogger

type SimpleLogger interface {
	Info(msg string, fields ...interface{})
	Warn(msg string, fields ...interface{})
	Error(msg string, fields ...interface{})
}

SimpleLogger 基础日志接口

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL