Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Type string `yaml:"type" env:"MESSAGE_QUEUE_TYPE" default:"none"`
Kafka *KafkaConfig `yaml:"kafka,omitempty"`
RabbitMQ *RabbitMQConfig `yaml:"rabbitmq,omitempty"`
}
Config 配置结构体(用于从配置文件创建) 对应 message_queue.yaml 文件结构
type Consumer ¶
type Consumer interface {
// Subscribe 订阅主题并处理消息(自动处理 trace)
// ctx 用于控制订阅的生命周期
// topic 是主题名称
// handler 是消息处理函数,ctx 已包含 trace context,body 是消息体
Subscribe(ctx context.Context, topic string, handler MessageHandler) error
// Close 关闭消费者
Close() error
}
Consumer 统一的消费者接口
type KafkaConfig ¶
type KafkaConfig struct {
Enabled bool `yaml:"enabled" env:"KAFKA_ENABLED" required:"true"`
Brokers []string `yaml:"brokers" env:"KAFKA_BROKERS" required:"true"`
ClientID string `yaml:"client_id" env:"KAFKA_CLIENT_ID" default:"ai-api-market"`
Version string `yaml:"version" env:"KAFKA_VERSION" default:"2.8.0"`
EnableSASL bool `yaml:"enable_sasl" env:"KAFKA_ENABLE_SASL" default:"false"`
SASLMechanism string `yaml:"sasl_mechanism" env:"KAFKA_SASL_MECHANISM" default:"PLAIN"`
SASLUsername string `yaml:"sasl_username" env:"KAFKA_SASL_USERNAME"`
SASLPassword string `yaml:"sasl_password" env:"KAFKA_SASL_PASSWORD"`
EnableTLS bool `yaml:"enable_tls" env:"KAFKA_ENABLE_TLS" default:"false"`
MaxOpenRequests int `yaml:"max_open_requests" env:"KAFKA_MAX_OPEN_REQUESTS" default:"10"`
DialTimeout string `yaml:"dial_timeout" env:"KAFKA_DIAL_TIMEOUT" default:"30s"`
ReadTimeout string `yaml:"read_timeout" env:"KAFKA_READ_TIMEOUT" default:"30s"`
WriteTimeout string `yaml:"write_timeout" env:"KAFKA_WRITE_TIMEOUT" default:"30s"`
EnableTrace bool `yaml:"enable_trace" env:"KAFKA_ENABLE_TRACE" default:"true"`
}
KafkaConfig Kafka 配置
type MessageHandler ¶
MessageHandler 消息处理函数类型 ctx 已包含从消息中提取的 trace context,可以直接使用 body 是消息体
type Options ¶
type Options struct {
// 通用选项
Type Type
EnableTrace bool
// Kafka 选项
KafkaBrokers []string
KafkaClientID string
KafkaVersion string
KafkaEnableSASL bool
KafkaSASLMechanism string
KafkaSASLUsername string
KafkaSASLPassword string
KafkaEnableTLS bool
KafkaMaxOpenRequests int
KafkaDialTimeout time.Duration
KafkaReadTimeout time.Duration
KafkaWriteTimeout time.Duration
// RabbitMQ 选项
RabbitMQURL string
RabbitMQHost string
RabbitMQPort int
RabbitMQUsername string
RabbitMQPassword string
RabbitMQVHost string
RabbitMQClientID string
RabbitMQEnableTLS bool
RabbitMQDialTimeout time.Duration
RabbitMQReadTimeout time.Duration
RabbitMQWriteTimeout time.Duration
RabbitMQPrefetchCount int
RabbitMQPrefetchSize int
RabbitMQReconnectDelay time.Duration
}
Options 统一的消息队列配置选项
type Producer ¶
type Producer interface {
// Send 发送消息(自动处理 trace)
Send(ctx context.Context, topic string, body []byte, opts ...SendOption) error
// SendString 发送字符串消息
SendString(ctx context.Context, topic string, body string, opts ...SendOption) error
// Close 关闭生产者
Close() error
}
Producer 统一的生产者接口
type RabbitMQConfig ¶
type RabbitMQConfig struct {
Enabled bool `yaml:"enabled" env:"RABBITMQ_ENABLED" required:"true"`
URL string `yaml:"url" env:"RABBITMQ_URL"`
Host string `yaml:"host" env:"RABBITMQ_HOST" default:"localhost"`
Port int `yaml:"port" env:"RABBITMQ_PORT" default:"5672"`
Username string `yaml:"username" env:"RABBITMQ_USERNAME" default:"guest"`
Password string `yaml:"password" env:"RABBITMQ_PASSWORD" default:"guest"`
VHost string `yaml:"vhost" env:"RABBITMQ_VHOST" default:"/"`
ClientID string `yaml:"client_id" env:"RABBITMQ_CLIENT_ID" default:"ai-api-market"`
EnableTLS bool `yaml:"enable_tls" env:"RABBITMQ_ENABLE_TLS" default:"false"`
DialTimeout string `yaml:"dial_timeout" env:"RABBITMQ_DIAL_TIMEOUT" default:"30s"`
ReadTimeout string `yaml:"read_timeout" env:"RABBITMQ_READ_TIMEOUT" default:"30s"`
WriteTimeout string `yaml:"write_timeout" env:"RABBITMQ_WRITE_TIMEOUT" default:"30s"`
PrefetchCount int `yaml:"prefetch_count" env:"RABBITMQ_PREFETCH_COUNT" default:"10"`
PrefetchSize int `yaml:"prefetch_size" env:"RABBITMQ_PREFETCH_SIZE" default:"0"`
ReconnectDelay string `yaml:"reconnect_delay" env:"RABBITMQ_RECONNECT_DELAY" default:"5s"`
EnableTrace bool `yaml:"enable_trace" env:"RABBITMQ_ENABLE_TRACE" default:"true"`
}
RabbitMQConfig RabbitMQ 配置
type SendOptions ¶
SendOptions 发送选项配置
Click to show internal directories.
Click to hide internal directories.