messagequeue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2026 License: Apache-2.0 Imports: 14 Imported by: 1

README

framework-messagequeue

Go framework package for messagequeue.

Installation

go get github.com/go-anyway/framework-messagequeue@v1.0.0

Usage

See documentation for usage examples.

License

Apache License 2.0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	Producer Producer
	Consumer Consumer
	Type     Type
}

Client 统一的消息队列客户端

func NewClient

func NewClient(opts *Options) (*Client, error)

NewClient 根据配置创建统一的消息队列客户端

func NewFromConfig

func NewFromConfig(cfg *Config) (*Client, error)

NewFromConfig 从配置创建客户端

func (*Client) Close

func (c *Client) Close() error

Close 关闭客户端(关闭生产者和消费者)

func (*Client) HealthCheck

func (c *Client) HealthCheck() error

HealthCheck 检查消息队列服务端连接是否正常

type Config

type Config struct {
	Type     string          `yaml:"type" env:"MESSAGE_QUEUE_TYPE" default:"none"`
	Kafka    *KafkaConfig    `yaml:"kafka,omitempty"`
	RabbitMQ *RabbitMQConfig `yaml:"rabbitmq,omitempty"`
}

Config 配置结构体(用于从配置文件创建) 对应 message_queue.yaml 文件结构

func (*Config) ToOptions

func (c *Config) ToOptions() (*Options, error)

ToOptions 将 Config 转换为 Options

func (*Config) Validate

func (c *Config) Validate() error

Validate 验证配置

type Consumer

type Consumer interface {
	// Subscribe 订阅主题并处理消息(自动处理 trace)
	// ctx 用于控制订阅的生命周期
	// topic 是主题名称
	// handler 是消息处理函数,ctx 已包含 trace context,body 是消息体
	Subscribe(ctx context.Context, topic string, handler MessageHandler) error
	// Close 关闭消费者
	Close() error
}

Consumer 统一的消费者接口

type KafkaConfig

type KafkaConfig struct {
	Enabled         bool     `yaml:"enabled" env:"KAFKA_ENABLED" required:"true"`
	Brokers         []string `yaml:"brokers" env:"KAFKA_BROKERS" required:"true"`
	ClientID        string   `yaml:"client_id" env:"KAFKA_CLIENT_ID" default:"ai-api-market"`
	Version         string   `yaml:"version" env:"KAFKA_VERSION" default:"2.8.0"`
	EnableSASL      bool     `yaml:"enable_sasl" env:"KAFKA_ENABLE_SASL" default:"false"`
	SASLMechanism   string   `yaml:"sasl_mechanism" env:"KAFKA_SASL_MECHANISM" default:"PLAIN"`
	SASLUsername    string   `yaml:"sasl_username" env:"KAFKA_SASL_USERNAME"`
	SASLPassword    string   `yaml:"sasl_password" env:"KAFKA_SASL_PASSWORD"`
	EnableTLS       bool     `yaml:"enable_tls" env:"KAFKA_ENABLE_TLS" default:"false"`
	MaxOpenRequests int      `yaml:"max_open_requests" env:"KAFKA_MAX_OPEN_REQUESTS" default:"10"`
	DialTimeout     string   `yaml:"dial_timeout" env:"KAFKA_DIAL_TIMEOUT" default:"30s"`
	ReadTimeout     string   `yaml:"read_timeout" env:"KAFKA_READ_TIMEOUT" default:"30s"`
	WriteTimeout    string   `yaml:"write_timeout" env:"KAFKA_WRITE_TIMEOUT" default:"30s"`
	EnableTrace     bool     `yaml:"enable_trace" env:"KAFKA_ENABLE_TRACE" default:"true"`
}

KafkaConfig Kafka 配置

type MessageHandler

type MessageHandler func(ctx context.Context, body []byte) error

MessageHandler 消息处理函数类型 ctx 已包含从消息中提取的 trace context,可以直接使用 body 是消息体

type Options

type Options struct {
	// 通用选项
	Type        Type
	EnableTrace bool

	// Kafka 选项
	KafkaBrokers         []string
	KafkaClientID        string
	KafkaVersion         string
	KafkaEnableSASL      bool
	KafkaSASLMechanism   string
	KafkaSASLUsername    string
	KafkaSASLPassword    string
	KafkaEnableTLS       bool
	KafkaMaxOpenRequests int
	KafkaDialTimeout     time.Duration
	KafkaReadTimeout     time.Duration
	KafkaWriteTimeout    time.Duration

	// RabbitMQ 选项
	RabbitMQURL            string
	RabbitMQHost           string
	RabbitMQPort           int
	RabbitMQUsername       string
	RabbitMQPassword       string
	RabbitMQVHost          string
	RabbitMQClientID       string
	RabbitMQEnableTLS      bool
	RabbitMQDialTimeout    time.Duration
	RabbitMQReadTimeout    time.Duration
	RabbitMQWriteTimeout   time.Duration
	RabbitMQPrefetchCount  int
	RabbitMQPrefetchSize   int
	RabbitMQReconnectDelay time.Duration
}

Options 统一的消息队列配置选项

func (*Options) Validate

func (o *Options) Validate() error

Validate 验证配置选项

type Producer

type Producer interface {
	// Send 发送消息(自动处理 trace)
	Send(ctx context.Context, topic string, body []byte, opts ...SendOption) error
	// SendString 发送字符串消息
	SendString(ctx context.Context, topic string, body string, opts ...SendOption) error
	// Close 关闭生产者
	Close() error
}

Producer 统一的生产者接口

type RabbitMQConfig

type RabbitMQConfig struct {
	Enabled        bool   `yaml:"enabled" env:"RABBITMQ_ENABLED" required:"true"`
	URL            string `yaml:"url" env:"RABBITMQ_URL"`
	Host           string `yaml:"host" env:"RABBITMQ_HOST" default:"localhost"`
	Port           int    `yaml:"port" env:"RABBITMQ_PORT" default:"5672"`
	Username       string `yaml:"username" env:"RABBITMQ_USERNAME" default:"guest"`
	Password       string `yaml:"password" env:"RABBITMQ_PASSWORD" default:"guest"`
	VHost          string `yaml:"vhost" env:"RABBITMQ_VHOST" default:"/"`
	ClientID       string `yaml:"client_id" env:"RABBITMQ_CLIENT_ID" default:"ai-api-market"`
	EnableTLS      bool   `yaml:"enable_tls" env:"RABBITMQ_ENABLE_TLS" default:"false"`
	DialTimeout    string `yaml:"dial_timeout" env:"RABBITMQ_DIAL_TIMEOUT" default:"30s"`
	ReadTimeout    string `yaml:"read_timeout" env:"RABBITMQ_READ_TIMEOUT" default:"30s"`
	WriteTimeout   string `yaml:"write_timeout" env:"RABBITMQ_WRITE_TIMEOUT" default:"30s"`
	PrefetchCount  int    `yaml:"prefetch_count" env:"RABBITMQ_PREFETCH_COUNT" default:"10"`
	PrefetchSize   int    `yaml:"prefetch_size" env:"RABBITMQ_PREFETCH_SIZE" default:"0"`
	ReconnectDelay string `yaml:"reconnect_delay" env:"RABBITMQ_RECONNECT_DELAY" default:"5s"`
	EnableTrace    bool   `yaml:"enable_trace" env:"RABBITMQ_ENABLE_TRACE" default:"true"`
}

RabbitMQConfig RabbitMQ 配置

type SendOption

type SendOption func(*SendOptions)

SendOption 发送选项函数类型

func WithHeader

func WithHeader(key, value string) SendOption

WithHeader 设置消息头

func WithHeaders

func WithHeaders(headers map[string]string) SendOption

WithHeaders 批量设置消息头

func WithKey

func WithKey(key string) SendOption

WithKey 设置消息键

type SendOptions

type SendOptions struct {
	Key     string
	Headers map[string]string
}

SendOptions 发送选项配置

type Type

type Type string

Type 消息队列类型

const (
	// TypeKafka Kafka 消息队列
	TypeKafka Type = "kafka"
	// TypeRabbitMQ RabbitMQ 消息队列
	TypeRabbitMQ Type = "rabbitmq"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL