Documentation
¶
Index ¶
- Constants
- type Adapter
- type Config
- type ConsumerPerformanceConfig
- type KafkaConfig
- type KafkaConsumerConfig
- type KafkaFlushConfig
- type KafkaProducerConfig
- type KafkaRetryConfig
- type KafkaSASLConfig
- type KafkaTLSConfig
- type MemoryConfig
- type ObjectPoolConfig
- type PerformanceConfig
- type PoolConfig
- type ProducerPerformanceConfig
- type RabbitMQConfig
- func (c *RabbitMQConfig) GetConsumerConfig() ConsumerPerformanceConfig
- func (c *RabbitMQConfig) GetObjectPoolConfig() ObjectPoolConfig
- func (c *RabbitMQConfig) GetProducerConfig() ProducerPerformanceConfig
- func (c *RabbitMQConfig) GetSerializationConfig() SerializationConfig
- func (c *RabbitMQConfig) SetDefaults()
- type RedisConfig
- type RedisMode
- type SerializationConfig
Constants ¶
// DefaultKeyPrefix is the default key prefix, "mq".
const DefaultKeyPrefix = "mq"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Adapter ¶
// Adapter is the message-queue adapter type, stored as a string.
type Adapter string
Adapter 适配器类型
type Config ¶
// Config is the message-queue configuration. It names the adapter to use and
// carries one settings section per backend (Redis, RabbitMQ, Kafka, Memory).
type Config struct {
// Adapter is the message-queue adapter type.
// NOTE(review): the JSON key is "type" while the YAML key is "adapter" —
// confirm the mismatch is intentional.
Adapter Adapter `json:"type" yaml:"adapter"`
// KeyPrefix is the global key prefix.
KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`
// Redis holds the Redis settings.
Redis RedisConfig `json:"redis" yaml:"redis"`
// RabbitMQ holds the RabbitMQ settings.
RabbitMQ RabbitMQConfig `json:"rabbitmq" yaml:"rabbitmq"`
// Kafka holds the Kafka settings.
Kafka KafkaConfig `json:"kafka" yaml:"kafka"`
// Memory holds the in-memory queue settings.
Memory MemoryConfig `json:"memory" yaml:"memory"`
}
Config 消息队列配置
type ConsumerPerformanceConfig ¶
// ConsumerPerformanceConfig holds consumer performance settings.
//
// NOTE(review): the fields had no comments; the notes below are inferred from
// the field names only and should be confirmed against the consumer code.
type ConsumerPerformanceConfig struct {
// WorkerCount is presumably the number of consumer workers.
WorkerCount int `json:"worker_count" yaml:"worker_count"`
// BufferSize is presumably the size of the consumer's message buffer.
BufferSize int `json:"buffer_size" yaml:"buffer_size"`
// BatchSize is presumably the number of messages handled per batch.
BatchSize int `json:"batch_size" yaml:"batch_size"`
// PollTimeout is presumably how long a single poll may wait.
PollTimeout time.Duration `json:"poll_timeout" yaml:"poll_timeout"`
// RetryInterval is presumably the wait between retries.
RetryInterval time.Duration `json:"retry_interval" yaml:"retry_interval"`
// MaxRetries is presumably the retry limit.
MaxRetries int `json:"max_retries" yaml:"max_retries"`
}
ConsumerPerformanceConfig 消费者性能配置
type KafkaConfig ¶
// KafkaConfig is the Kafka configuration: connection identity plus nested
// producer, consumer, SASL and TLS settings.
type KafkaConfig struct {
// Brokers is the list of Kafka broker addresses.
Brokers []string `json:"brokers" yaml:"brokers"`
// GroupID is the consumer group ID.
GroupID string `json:"group_id" yaml:"group_id"`
// ClientID is the client ID.
ClientID string `json:"client_id" yaml:"client_id"`
// Version is the Kafka version.
Version string `json:"version" yaml:"version"`
// Producer holds the producer settings.
Producer KafkaProducerConfig `json:"producer" yaml:"producer"`
// Consumer holds the consumer settings.
Consumer KafkaConsumerConfig `json:"consumer" yaml:"consumer"`
// SASL holds the SASL authentication settings.
SASL KafkaSASLConfig `json:"sasl" yaml:"sasl"`
// TLS holds the TLS settings.
TLS KafkaTLSConfig `json:"tls" yaml:"tls"`
// KeyPrefix is the topic prefix.
KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`
}
KafkaConfig Kafka配置
type KafkaConsumerConfig ¶
// KafkaConsumerConfig is the Kafka consumer configuration.
type KafkaConsumerConfig struct {
// MinBytes is the minimum number of bytes.
MinBytes int `json:"min_bytes" yaml:"min_bytes"`
// MaxBytes is the maximum number of bytes.
MaxBytes int `json:"max_bytes" yaml:"max_bytes"`
// MaxWait is the maximum wait time.
MaxWait time.Duration `json:"max_wait" yaml:"max_wait"`
// CommitInterval is the commit interval.
CommitInterval time.Duration `json:"commit_interval" yaml:"commit_interval"`
// StartOffset is the starting offset.
StartOffset int64 `json:"start_offset" yaml:"start_offset"`
// HeartbeatInterval is the heartbeat interval.
HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
// SessionTimeout is the session timeout.
SessionTimeout time.Duration `json:"session_timeout" yaml:"session_timeout"`
// RebalanceTimeout is the rebalance timeout.
RebalanceTimeout time.Duration `json:"rebalance_timeout" yaml:"rebalance_timeout"`
// RetentionTime is the retention time.
RetentionTime time.Duration `json:"retention_time" yaml:"retention_time"`
}
KafkaConsumerConfig Kafka消费者配置
func DefaultKafkaConsumerConfig ¶
func DefaultKafkaConsumerConfig() KafkaConsumerConfig
DefaultKafkaConsumerConfig 默认Kafka消费者配置
type KafkaFlushConfig ¶
// KafkaFlushConfig is the Kafka flush configuration.
type KafkaFlushConfig struct {
// Frequency is the flush frequency.
Frequency time.Duration `json:"frequency" yaml:"frequency"`
// Messages is the message count.
Messages int `json:"messages" yaml:"messages"`
// Bytes is the byte count.
Bytes int `json:"bytes" yaml:"bytes"`
}
KafkaFlushConfig Kafka刷新配置
type KafkaProducerConfig ¶
// KafkaProducerConfig is the Kafka producer configuration.
type KafkaProducerConfig struct {
// MaxMessageBytes is the maximum message size.
MaxMessageBytes int `json:"max_message_bytes" yaml:"max_message_bytes"`
// RequiredAcks is the acknowledgement level.
RequiredAcks int `json:"required_acks" yaml:"required_acks"`
// Timeout is the timeout duration.
Timeout time.Duration `json:"timeout" yaml:"timeout"`
// Compression is the compression type.
Compression string `json:"compression" yaml:"compression"`
// Flush holds the flush settings.
Flush KafkaFlushConfig `json:"flush" yaml:"flush"`
// Retry holds the retry settings.
Retry KafkaRetryConfig `json:"retry" yaml:"retry"`
// Idempotent reports whether the producer is idempotent.
Idempotent bool `json:"idempotent" yaml:"idempotent"`
// BatchSize is the batch size.
BatchSize int `json:"batch_size" yaml:"batch_size"`
// BatchTimeout is the batch timeout.
BatchTimeout time.Duration `json:"batch_timeout" yaml:"batch_timeout"`
}
KafkaProducerConfig Kafka生产者配置
func DefaultKafkaProducerConfig ¶
func DefaultKafkaProducerConfig() KafkaProducerConfig
DefaultKafkaProducerConfig 默认Kafka生产者配置
type KafkaRetryConfig ¶
// KafkaRetryConfig is the Kafka retry configuration.
type KafkaRetryConfig struct {
// Max is the maximum number of retries.
Max int `json:"max" yaml:"max"`
// Backoff is the backoff duration.
Backoff time.Duration `json:"backoff" yaml:"backoff"`
}
KafkaRetryConfig Kafka重试配置
type KafkaSASLConfig ¶
// KafkaSASLConfig is the Kafka SASL configuration.
type KafkaSASLConfig struct {
// Enable reports whether SASL is enabled.
Enable bool `json:"enable" yaml:"enable"`
// Mechanism is the SASL mechanism.
Mechanism string `json:"mechanism" yaml:"mechanism"`
// Username is the user name.
Username string `json:"username" yaml:"username"`
// Password is the password.
Password string `json:"password" yaml:"password"`
}
KafkaSASLConfig Kafka SASL配置
func DefaultKafkaSASLConfig ¶
func DefaultKafkaSASLConfig() KafkaSASLConfig
DefaultKafkaSASLConfig 默认Kafka SASL配置
type KafkaTLSConfig ¶
// KafkaTLSConfig is the Kafka TLS configuration.
type KafkaTLSConfig struct {
// Enable reports whether TLS is enabled.
Enable bool `json:"enable" yaml:"enable"`
// InsecureSkipVerify reports whether certificate verification is skipped.
InsecureSkipVerify bool `json:"insecure_skip_verify" yaml:"insecure_skip_verify"`
// CertFile is the certificate file path.
CertFile string `json:"cert_file" yaml:"cert_file"`
// KeyFile is the private-key file path.
KeyFile string `json:"key_file" yaml:"key_file"`
// CAFile is the CA certificate file path.
CAFile string `json:"ca_file" yaml:"ca_file"`
}
KafkaTLSConfig Kafka TLS配置
func DefaultKafkaTLSConfig ¶
func DefaultKafkaTLSConfig() KafkaTLSConfig
DefaultKafkaTLSConfig 默认Kafka TLS配置
type MemoryConfig ¶ added in v0.3.3
// MemoryConfig is the in-memory message-queue configuration.
type MemoryConfig struct {
// MaxQueueSize is the maximum queue size per topic; 0 means unlimited.
MaxQueueSize int `json:"max_queue_size" yaml:"max_queue_size"`
// MaxDelayQueueSize is the maximum size of the delay queue; 0 means unlimited.
MaxDelayQueueSize int `json:"max_delay_queue_size" yaml:"max_delay_queue_size"`
// DelayCheckInterval is the interval at which delayed messages are checked.
DelayCheckInterval time.Duration `json:"delay_check_interval" yaml:"delay_check_interval"`
// EnableMetrics reports whether metrics collection is enabled.
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
}
MemoryConfig 内存消息队列配置
func (*MemoryConfig) SetDefaults ¶ added in v0.3.3
func (c *MemoryConfig) SetDefaults()
SetDefaults 设置默认值
type ObjectPoolConfig ¶
// ObjectPoolConfig is the object-pool configuration.
//
// NOTE(review): the fields had no comments; the notes below are inferred from
// the field names only and should be confirmed against the pool implementation.
type ObjectPoolConfig struct {
// Enabled presumably turns the object pool on or off.
Enabled bool `json:"enabled" yaml:"enabled"`
// MaxMessageObjects is presumably the cap on pooled message objects.
MaxMessageObjects int `json:"max_message_objects" yaml:"max_message_objects"`
// MaxBufferObjects is presumably the cap on pooled buffer objects.
MaxBufferObjects int `json:"max_buffer_objects" yaml:"max_buffer_objects"`
}
ObjectPoolConfig 对象池配置
type PerformanceConfig ¶
// PerformanceConfig is the performance configuration, grouping the consumer,
// producer, serialization and object-pool settings.
type PerformanceConfig struct {
// Consumer holds the consumer settings.
Consumer ConsumerPerformanceConfig `json:"consumer" yaml:"consumer"`
// Producer holds the producer settings.
Producer ProducerPerformanceConfig `json:"producer" yaml:"producer"`
// Serialization holds the serialization settings.
Serialization SerializationConfig `json:"serialization" yaml:"serialization"`
// ObjectPool holds the object-pool settings.
ObjectPool ObjectPoolConfig `json:"object_pool" yaml:"object_pool"`
}
PerformanceConfig 性能配置
func DefaultPerformanceConfig ¶
func DefaultPerformanceConfig() PerformanceConfig
DefaultPerformanceConfig 默认性能配置
type PoolConfig ¶
// PoolConfig is the connection-pool configuration.
//
// NOTE(review): the fields had no comments; the notes below are inferred from
// the field names only and should be confirmed against the pool implementation.
type PoolConfig struct {
// MaxIdle is presumably the maximum number of idle connections.
MaxIdle int `json:"max_idle" yaml:"max_idle"`
// MaxActive is presumably the maximum number of active connections.
MaxActive int `json:"max_active" yaml:"max_active"`
// IdleTimeout is presumably how long a connection may stay idle.
IdleTimeout time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
// MaxLifetime is presumably the maximum lifetime of a connection.
MaxLifetime time.Duration `json:"max_lifetime" yaml:"max_lifetime"`
}
PoolConfig 连接池配置
type ProducerPerformanceConfig ¶
// ProducerPerformanceConfig is the producer performance configuration.
//
// NOTE(review): the fields had no comments; the notes below are inferred from
// the field names only and should be confirmed against the producer code.
type ProducerPerformanceConfig struct {
// BatchSize is presumably the number of messages sent per batch.
BatchSize int `json:"batch_size" yaml:"batch_size"`
// FlushInterval is presumably the interval between flushes.
FlushInterval time.Duration `json:"flush_interval" yaml:"flush_interval"`
// Compression presumably toggles compression of produced messages.
Compression bool `json:"compression" yaml:"compression"`
}
ProducerPerformanceConfig 生产者性能配置
type RabbitMQConfig ¶
// RabbitMQConfig is the RabbitMQ configuration: connection details, queue
// flags, pool sizing, reconnect behaviour and nested performance settings.
type RabbitMQConfig struct {
// URL is the RabbitMQ connection URL.
URL string `json:"url" yaml:"url"`
// Host is the RabbitMQ host.
Host string `json:"host" yaml:"host"`
// Port is the RabbitMQ port.
Port int `json:"port" yaml:"port"`
// Username is the user name.
Username string `json:"username" yaml:"username"`
// Password is the password.
Password string `json:"password" yaml:"password"`
// VHost is the virtual host.
VHost string `json:"vhost" yaml:"vhost"`
// ExchangeType is the exchange type.
ExchangeType string `json:"exchange_type" yaml:"exchange_type"`
// QueueDurable reports whether queues are durable.
QueueDurable bool `json:"queue_durable" yaml:"queue_durable"`
// QueueAutoDelete reports whether queues are auto-deleted.
QueueAutoDelete bool `json:"queue_auto_delete" yaml:"queue_auto_delete"`
// QueueExclusive reports whether queues are exclusive.
QueueExclusive bool `json:"queue_exclusive" yaml:"queue_exclusive"`
// QueueNoWait is the queue no-wait flag.
// NOTE(review): the previous comment read "whether the queue waits", which
// is the opposite of what the field name suggests — confirm the polarity.
QueueNoWait bool `json:"queue_no_wait" yaml:"queue_no_wait"`
// QoS is the consumer QoS setting.
QoS int `json:"qos" yaml:"qos"`
// Heartbeat is the heartbeat interval.
Heartbeat time.Duration `json:"heartbeat" yaml:"heartbeat"`
// ConnectionTimeout is the connection timeout.
ConnectionTimeout time.Duration `json:"connection_timeout" yaml:"connection_timeout"`
// ChannelMax is the maximum number of channels.
ChannelMax int `json:"channel_max" yaml:"channel_max"`
// FrameSize is the frame size.
FrameSize int `json:"frame_size" yaml:"frame_size"`
// Connection-pool settings.
PoolSize int `json:"pool_size" yaml:"pool_size"`
MinConnections int `json:"min_connections" yaml:"min_connections"`
MaxConnections int `json:"max_connections" yaml:"max_connections"`
ChannelPoolSize int `json:"channel_pool_size" yaml:"channel_pool_size"`
// Reconnect settings.
MaxRetries int `json:"max_retries" yaml:"max_retries"`
RetryInterval time.Duration `json:"retry_interval" yaml:"retry_interval"`
ReconnectDelay time.Duration `json:"reconnect_delay" yaml:"reconnect_delay"`
// Performance holds the performance settings.
Performance PerformanceConfig `json:"performance" yaml:"performance"`
}
RabbitMQConfig RabbitMQ配置
func DefaultRabbitMQConfig ¶
func DefaultRabbitMQConfig() RabbitMQConfig
DefaultRabbitMQConfig 默认RabbitMQ配置
func (*RabbitMQConfig) GetConsumerConfig ¶
func (c *RabbitMQConfig) GetConsumerConfig() ConsumerPerformanceConfig
GetConsumerConfig 获取消费者配置
func (*RabbitMQConfig) GetObjectPoolConfig ¶
func (c *RabbitMQConfig) GetObjectPoolConfig() ObjectPoolConfig
GetObjectPoolConfig 获取对象池配置
func (*RabbitMQConfig) GetProducerConfig ¶
func (c *RabbitMQConfig) GetProducerConfig() ProducerPerformanceConfig
GetProducerConfig 获取生产者配置
func (*RabbitMQConfig) GetSerializationConfig ¶
func (c *RabbitMQConfig) GetSerializationConfig() SerializationConfig
GetSerializationConfig 获取序列化配置
type RedisConfig ¶
// RedisConfig is the Redis configuration. It covers the connection mode with
// its standalone, cluster and sentinel settings, plus pool, retry/timeout,
// consumer, producer, serialization and object-pool settings as flat fields.
type RedisConfig struct {
// Connection mode.
Mode RedisMode `json:"mode" yaml:"mode"`
// Standalone-mode settings.
Addr string `json:"addr" yaml:"addr"`
Password string `json:"password" yaml:"password"`
DB int `json:"db" yaml:"db"`
// Cluster-mode settings.
Addrs []string `json:"addrs" yaml:"addrs"`
// Sentinel-mode settings.
SentinelAddrs []string `json:"sentinel_addrs" yaml:"sentinel_addrs"`
SentinelPassword string `json:"sentinel_password" yaml:"sentinel_password"`
MasterName string `json:"master_name" yaml:"master_name"`
// Connection-pool settings.
PoolSize int `json:"pool_size" yaml:"pool_size"`
MinIdleConns int `json:"min_idle_conns" yaml:"min_idle_conns"`
MaxConnAge time.Duration `json:"max_conn_age" yaml:"max_conn_age"`
PoolTimeout time.Duration `json:"pool_timeout" yaml:"pool_timeout"`
IdleTimeout time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
IdleCheckFrequency time.Duration `json:"idle_check_frequency" yaml:"idle_check_frequency"`
// Retry and timeout settings.
MaxRetries int `json:"max_retries" yaml:"max_retries"`
MinRetryBackoff time.Duration `json:"min_retry_backoff" yaml:"min_retry_backoff"`
MaxRetryBackoff time.Duration `json:"max_retry_backoff" yaml:"max_retry_backoff"`
DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
// Other settings.
KeyPrefix string `json:"key_prefix" yaml:"key_prefix"`
// Consumer performance settings.
ConsumerWorkerCount int `json:"consumer_worker_count" yaml:"consumer_worker_count"`
ConsumerBufferSize int `json:"consumer_buffer_size" yaml:"consumer_buffer_size"`
ConsumerBatchSize int `json:"consumer_batch_size" yaml:"consumer_batch_size"`
ConsumerPollTimeout time.Duration `json:"consumer_poll_timeout" yaml:"consumer_poll_timeout"`
ConsumerRetryInterval time.Duration `json:"consumer_retry_interval" yaml:"consumer_retry_interval"`
ConsumerMaxRetries int `json:"consumer_max_retries" yaml:"consumer_max_retries"`
// Producer performance settings.
ProducerBatchSize int `json:"producer_batch_size" yaml:"producer_batch_size"`
ProducerFlushInterval time.Duration `json:"producer_flush_interval" yaml:"producer_flush_interval"`
ProducerCompression bool `json:"producer_compression" yaml:"producer_compression"`
// Serialization settings.
SerializationType string `json:"serialization_type" yaml:"serialization_type"`
SerializationCompression bool `json:"serialization_compression" yaml:"serialization_compression"`
// Object-pool settings.
ObjectPoolEnabled bool `json:"object_pool_enabled" yaml:"object_pool_enabled"`
ObjectPoolMaxMessageObjects int `json:"object_pool_max_message_objects" yaml:"object_pool_max_message_objects"`
ObjectPoolMaxBufferObjects int `json:"object_pool_max_buffer_objects" yaml:"object_pool_max_buffer_objects"`
}
RedisConfig Redis配置
func DefaultRedisClusterConfig ¶
func DefaultRedisClusterConfig() RedisConfig
DefaultRedisClusterConfig 默认Redis集群配置
func DefaultRedisSentinelConfig ¶
func DefaultRedisSentinelConfig() RedisConfig
DefaultRedisSentinelConfig 默认Redis哨兵配置
func (*RedisConfig) GetConsumerConfig ¶
func (r *RedisConfig) GetConsumerConfig() ConsumerPerformanceConfig
GetConsumerConfig 获取消费者配置
func (*RedisConfig) GetObjectPoolConfig ¶
func (r *RedisConfig) GetObjectPoolConfig() ObjectPoolConfig
GetObjectPoolConfig 获取对象池配置
func (*RedisConfig) GetProducerConfig ¶
func (r *RedisConfig) GetProducerConfig() ProducerPerformanceConfig
GetProducerConfig 获取生产者配置
func (*RedisConfig) GetSerializationConfig ¶
func (r *RedisConfig) GetSerializationConfig() SerializationConfig
GetSerializationConfig 获取序列化配置
type SerializationConfig ¶
// SerializationConfig is the serialization configuration.
type SerializationConfig struct {
// Type names the serialization format; the inline note lists json and msgpack.
Type string `json:"type" yaml:"type"` // json, msgpack
// Compression presumably toggles compression of serialized payloads —
// confirm against the serializer.
Compression bool `json:"compression" yaml:"compression"`
}
SerializationConfig 序列化配置