Documentation
¶
Index ¶
- Constants
- func GetRedisClient() redis.UniversalClient
- func SetRedisClient(c redis.UniversalClient)
- type AdapterCache
- type AdapterLocker
- type AdapterQueue
- type ConsumerFunc
- type Messager
- type NSQOptions
- type Option
- func WithConsumerFunc(f ConsumerFunc) Option
- func WithDelay(delay time.Duration) Option
- func WithGroupID(groupID string) Option
- func WithKafkaConfig(c *sarama.Config) Option
- func WithMessage(message Messager) Option
- func WithPartition(partition int) Option
- func WithStrategy(f sarama.BalanceStrategy) Option
- func WithTopic(topic string) Option
- type Options
- type RedisConnectOptions
- type TLS
Constants ¶
View Source
// PrefixKey is the exported string constant "__host".
// NOTE(review): what this prefix is applied to is not visible here — confirm against its callers.
const PrefixKey = "__host"
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AdapterCache ¶
type AdapterLocker ¶
type AdapterQueue ¶
type ConsumerFunc ¶
type NSQOptions ¶
// NSQOptions holds NSQ client settings that can be loaded from YAML or JSON.
// Most fields also carry opt/default/min/max struct tags; GetNSQOptions
// returns an *nsq.Config for these settings.
// NOTE(review): the opt tags presumably name the matching nsq.Config option — confirm against GetNSQOptions.
type NSQOptions struct {
// Timeout for establishing a connection (default 1s).
DialTimeout time.Duration `opt:"dial_timeout" default:"1s" yaml:"dialTimeout" json:"dialTimeout"`
// LookupdAddr and AdminAddr are tagged opt:"-".
// NOTE(review): presumably they are excluded from the opt-tag-driven mapping and handled separately — confirm.
LookupdAddr string `opt:"-" json:"lookupdAddr" yaml:"lookupdAddr"`
AdminAddr string `opt:"-" json:"adminAddr" yaml:"adminAddr"`
// Deadlines for network reads and writes.
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s" yaml:"readTimeout" json:"readTimeout"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s" yaml:"writeTimeout" json:"writeTimeout"`
// Addresses is the local address to use when dialing an nsqd.
// NOTE(review): the field is a []string named Addresses, yet the sentence above
// describes a single local address — confirm whether these are the nsqd
// addresses to connect to and update the comment accordingly.
Addresses []string `opt:"addresses" yaml:"addresses" json:"addresses"`
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd poll loop. This helps evenly distribute requests even if multiple consumers
// restart at the same time.
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s" yaml:"lookupdPollInterval" json:"lookupdPollInterval"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3" yaml:"lookupdPollJitter" json:"lookupdPollJitter"`
// Maximum duration when REQueueing (for doubling of deferred requeue), and the
// default requeue delay.
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m" yaml:"maxRequeueDelay" json:"maxRequeueDelay"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s" yaml:"defaultRequeueDelay" json:"defaultRequeueDelay"`
// Maximum amount of time to back off when processing fails; 0 == no backoff.
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m" yaml:"maxBackoffDuration" json:"maxBackoffDuration"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s" yaml:"backoffMultiplier" json:"backoffMultiplier"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5" yaml:"maxAttempts" json:"maxAttempts"`
// Duration to wait for a message from an nsqd when in a state where RDY
// counts are re-distributed (e.g. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s" yaml:"lowRdyIdleTimeout" json:"lowRdyIdleTimeout"`
// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s" yaml:"lowRdyTimeout" json:"lowRdyTimeout"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s" yaml:"rdyRedistributeInterval" json:"rdyRedistributeInterval"`
// Identifiers sent to nsqd representing this client
ClientID string `opt:"client_id" yaml:"clientID" json:"clientID"` // (defaults: short hostname)
Hostname string `opt:"hostname" yaml:"hostname" json:"hostname"`
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
UserAgent string `opt:"user_agent" yaml:"userAgent" json:"userAgent"`
// Duration of time between heartbeats. This must be less than ReadTimeout
// (defaults here: 30s heartbeat, 60s read timeout).
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s" yaml:"heartbeatInterval" json:"heartbeatInterval"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99" yaml:"sampleRate" json:"sampleRate"`
// Tls points at the package's TLS settings type. Unlike most fields it has no
// opt tag, only yaml/json keys.
Tls *TLS `yaml:"tls" json:"tls"`
// Compression Settings
Deflate bool `opt:"deflate" yaml:"deflate" json:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6" yaml:"deflateLevel" json:"deflateLevel"`
Snappy bool `opt:"snappy" yaml:"snappy" json:"snappy"`
// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384" yaml:"outputBufferSize" json:"outputBufferSize"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms" yaml:"outputBufferTimeout" json:"outputBufferTimeout"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1" yaml:"maxInFlight" json:"maxInFlight"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0" yaml:"msgTimeout" json:"msgTimeout"`
// Secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret" yaml:"authSecret" json:"authSecret"`
}
func (NSQOptions) GetNSQOptions ¶
func (e NSQOptions) GetNSQOptions() (*nsq.Config, error)
type Option ¶
// Option is a function that takes a *Options. The With* constructors in this
// package (WithTopic, WithGroupID, WithConsumerFunc, WithMessage, WithPartition,
// WithStrategy, WithKafkaConfig, WithDelay) each return an Option.
type Option func(*Options)
func WithConsumerFunc ¶
func WithConsumerFunc(f ConsumerFunc) Option
func WithGroupID ¶
func WithKafkaConfig ¶
func WithMessage ¶
func WithPartition ¶
func WithStrategy ¶
func WithStrategy(f sarama.BalanceStrategy) Option
type Options ¶
// Options is the value that an Option function receives. Its fields mirror the
// parameters of the With* constructors in this package.
// NOTE(review): the field-to-constructor pairing noted below is inferred from
// matching names and types; the constructor bodies are not visible here — confirm.
type Options struct {
// Topic name (see WithTopic).
Topic string
// Group identifier (see WithGroupID).
GroupID string
// F is the consumer callback (see WithConsumerFunc).
F ConsumerFunc
// Message is a Messager value (see WithMessage).
Message Messager
// Partition number (see WithPartition).
Partition int
// PartitionAssignmentStrategy is a sarama balance strategy (see WithStrategy).
PartitionAssignmentStrategy sarama.BalanceStrategy
// KafkaConfig is a sarama configuration (see WithKafkaConfig).
KafkaConfig *sarama.Config
// Delay duration (see WithDelay).
Delay time.Duration
}
func DefaultOptions ¶
func DefaultOptions() *Options
func SetOptions ¶
type RedisConnectOptions ¶
// RedisConnectOptions holds Redis connection settings loaded from configuration.
// GetRedisOptions returns a *redis.UniversalOptions for these settings.
//
// NOTE(review): only Addr and TLS carry json tags; every other field has a yaml
// tag only, so encoding/json would use the Go field names for them. Confirm
// whether that asymmetry is intentional.
type RedisConnectOptions struct {
// Addr is kept for compatibility with the previous configuration format.
//
// Deprecated: Use Addrs instead.
Addr string `yaml:"addr" json:"addr"`
Addrs []string `yaml:"addrs"`
ClientName string `yaml:"clientName"`
DB int `yaml:"db"`
Username string `yaml:"username"`
Password string `yaml:"password"`
SentinelUsername string `yaml:"sentinelUsername"`
SentinelPassword string `yaml:"sentinelPassword"`
MasterName string `yaml:"masterName"`
Protocol int `yaml:"protocol"`
MaxRetries int `yaml:"maxRetries"`
MinRetryBackoff time.Duration `yaml:"minRetryBackoff"`
MaxRetryBackoff time.Duration `yaml:"maxRetryBackoff"`
DialTimeout time.Duration `yaml:"dialTimeout"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
// ContextTimeout is read from the YAML key contextTimeoutEnabled; note that the
// key differs from the field name.
ContextTimeout bool `yaml:"contextTimeoutEnabled"`
PoolFIFO bool `yaml:"poolFIFO"`
PoolSize int `yaml:"poolSize"`
PoolTimeout time.Duration `yaml:"poolTimeout"`
MinIdleConns int `yaml:"minIdleConns"`
MaxIdleConns int `yaml:"maxIdleConns"`
MaxActiveConns int `yaml:"maxActiveConns"`
ConnMaxIdleTime time.Duration `yaml:"connMaxIdleTime"`
ConnMaxLifetime time.Duration `yaml:"connMaxLifetime"`
// TLS points at the package's TLS settings type.
TLS *TLS `yaml:"tls" json:"tls"`
MaxRedirects int `yaml:"maxRedirects"`
ReadOnly bool `yaml:"readOnly"`
RouteByLatency bool `yaml:"routeByLatency"`
RouteRandomly bool `yaml:"routeRandomly"`
DisableIdentity bool `yaml:"disableIdentity"`
IdentitySuffix string `yaml:"identitySuffix"`
UnstableResp3 bool `yaml:"unstableResp3"`
}
func (*RedisConnectOptions) GetRedisOptions ¶
func (e *RedisConnectOptions) GetRedisOptions() (opt *redis.UniversalOptions, err error)
Click to show internal directories.
Click to hide internal directories.