storage

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2025 License: MIT Imports: 12 Imported by: 2

Documentation

Index

Constants

View Source
// PrefixKey is the exported string constant "__host".
//
// NOTE(review): presumably the key under which a prefix value is stored or
// looked up — confirm against the call sites, which are not shown here.
const PrefixKey = "__host"

Variables

This section is empty.

Functions

func GetRedisClient

func GetRedisClient() redis.UniversalClient

GetRedisClient returns the Redis client.

func SetRedisClient

func SetRedisClient(c redis.UniversalClient)

SetRedisClient sets the Redis client.

Types

type AdapterCache

// AdapterCache is the cache adapter contract. It embeds
// redis.UniversalClient, so every method of that interface is part of
// AdapterCache, and adds identification, initialization and tag-removal
// methods on top.
type AdapterCache interface {
	redis.UniversalClient

	// String returns a string form of the adapter.
	String() string
	// Name returns the adapter's name.
	Name() string

	// Initialize prepares the adapter using the given *gorm.DB and reports
	// any failure.
	// NOTE(review): what the database handle is used for is not visible
	// here — confirm against the implementations.
	Initialize(db *gorm.DB) error

	// RemoveFromTag performs a removal keyed by tag.
	// NOTE(review): presumably drops the cache entries associated with tag —
	// confirm against the implementations.
	RemoveFromTag(ctx context.Context, tag string) error
}

type AdapterLocker

// AdapterLocker is the lock adapter contract, expressed in terms of the
// redislock package's Options and Lock types.
type AdapterLocker interface {
	// Lock tries to obtain a lock for key with the given ttl and options,
	// returning the obtained *redislock.Lock or an error.
	// NOTE(review): whether a nil options value is accepted depends on the
	// implementation — confirm.
	Lock(ctx context.Context, key string, ttl time.Duration, options *redislock.Options) (lock *redislock.Lock, err error)

	// String returns a string form of the adapter.
	String() string
}

type AdapterQueue

// AdapterQueue is the queue adapter contract. Both Append and Register are
// configured through functional Option values.
type AdapterQueue interface {
	// String returns a string form of the adapter.
	String() string

	// Register takes a set of options and returns nothing.
	// NOTE(review): presumably registers a consumer (e.g. topic plus
	// ConsumerFunc) — confirm against the implementations.
	Register(opts ...Option)
	// Append takes a set of options and reports an error.
	// NOTE(review): presumably enqueues the message carried by the options —
	// confirm against the implementations.
	Append(opts ...Option) error

	// Run starts the adapter with the given context.
	// NOTE(review): whether Run blocks until ctx is done is not visible
	// here — confirm.
	Run(ctx context.Context)
	// Shutdown stops the adapter.
	Shutdown()
}

type ConsumerFunc

// ConsumerFunc is a callback that receives a Messager and returns an error.
// NOTE(review): how a non-nil error is treated (retry, requeue, drop) is
// decided by the queue implementation, which is not shown here.
type ConsumerFunc func(msg Messager) error

type Messager

// Messager is the message contract: paired getters and setters for an ID,
// a stream name, a values map, a prefix, an error count and a context.
type Messager interface {
	// ID accessors.
	GetID() string
	SetID(id string)

	// Stream accessors.
	GetStream() string
	SetStream(stream string)

	// Values accessors.
	GetValues() map[string]any
	SetValues(values map[string]any)

	// Prefix accessors.
	GetPrefix() string
	SetPrefix(prefix string)

	// Error-count accessors.
	// NOTE(review): presumably counts failed handling attempts — confirm.
	GetErrorCount() int
	SetErrorCount(count int)

	// Context accessors.
	GetContext() context.Context
	SetContext(ctx context.Context)
}

type NSQOptions

// NSQOptions is the NSQ configuration block. Its yaml/json struct tags give
// the keys used when decoding it from configuration, and GetNSQOptions
// returns a *nsq.Config for it.
//
// NOTE(review): the opt/min/max/default tags look like they mirror the tags
// on go-nsq's nsq.Config. Whether the min/max/default values are actually
// applied depends on GetNSQOptions, which is not shown — confirm.
type NSQOptions struct {
	// DialTimeout is the dial timeout (opt name "dial_timeout").
	DialTimeout time.Duration `opt:"dial_timeout" default:"1s" yaml:"dialTimeout" json:"dialTimeout"`

	// LookupdAddr carries no opt name (opt:"-").
	// NOTE(review): presumably the nsqlookupd address consumers connect to — confirm.
	LookupdAddr string `opt:"-" json:"lookupdAddr" yaml:"lookupdAddr"`

	// AdminAddr carries no opt name (opt:"-").
	// NOTE(review): presumably the nsqadmin address — confirm.
	AdminAddr string `opt:"-" json:"adminAddr" yaml:"adminAddr"`

	// Deadlines for network reads and writes.
	ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s" yaml:"readTimeout" json:"readTimeout"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s" yaml:"writeTimeout" json:"writeTimeout"`

	// Addresses is a list of address strings.
	// NOTE(review): the previous comment described this as "the local address
	// to use when dialing an nsqd", which does not fit a []string field;
	// presumably these are the nsqd addresses to connect to — confirm against
	// GetNSQOptions and the queue adapter.
	Addresses []string `opt:"addresses" yaml:"addresses" json:"addresses"`

	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd poll loop. This helps evenly distribute requests even if multiple consumers
	// restart at the same time.
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts.
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s" yaml:"lookupdPollInterval" json:"lookupdPollInterval"`
	LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3" yaml:"lookupdPollJitter" json:"lookupdPollJitter"`

	// Maximum duration when REQueueing (for doubling of deferred requeue),
	// and the default requeue delay.
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m" yaml:"maxRequeueDelay" json:"maxRequeueDelay"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s" yaml:"defaultRequeueDelay" json:"defaultRequeueDelay"`

	// Maximum amount of time to back off when processing fails; 0 means no backoff.
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m" yaml:"maxBackoffDuration" json:"maxBackoffDuration"`
	// Unit of time for calculating consumer backoff.
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s" yaml:"backoffMultiplier" json:"backoffMultiplier"`

	// Maximum number of times this consumer will attempt to process a message before giving up.
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5" yaml:"maxAttempts" json:"maxAttempts"`

	// Duration to wait for a message from an nsqd when in a state where RDY
	// counts are re-distributed (e.g. max_in_flight < num_producers).
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s" yaml:"lowRdyIdleTimeout" json:"lowRdyIdleTimeout"`
	// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout.
	LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s" yaml:"lowRdyTimeout" json:"lowRdyTimeout"`
	// Duration between redistributing max-in-flight to connections.
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s" yaml:"rdyRedistributeInterval" json:"rdyRedistributeInterval"`

	// Identifiers sent to nsqd representing this client.
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>").
	ClientID  string `opt:"client_id" yaml:"clientID" json:"clientID"` // (defaults: short hostname)
	Hostname  string `opt:"hostname" yaml:"hostname" json:"hostname"`
	UserAgent string `opt:"user_agent" yaml:"userAgent" json:"userAgent"`

	// Duration of time between heartbeats. This must be less than ReadTimeout.
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s" yaml:"heartbeatInterval" json:"heartbeatInterval"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+).
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99" yaml:"sampleRate" json:"sampleRate"`

	// Tls points at the package's TLS settings (cert, key, ca). It has no opt tag.
	Tls *TLS `yaml:"tls" json:"tls"`

	// Compression settings.
	Deflate      bool `opt:"deflate" yaml:"deflate" json:"deflate"`
	DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6" yaml:"deflateLevel" json:"deflateLevel"`
	Snappy       bool `opt:"snappy" yaml:"snappy" json:"snappy"`

	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection.
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384" yaml:"outputBufferSize" json:"outputBufferSize"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms" yaml:"outputBufferTimeout" json:"outputBufferTimeout"`

	// Maximum number of messages to allow in flight (concurrency knob).
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1" yaml:"maxInFlight" json:"maxInFlight"`

	// The server-side message timeout for messages delivered to this client.
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0" yaml:"msgTimeout" json:"msgTimeout"`

	// Secret for nsqd authentication (requires nsqd 0.2.29+).
	AuthSecret string `opt:"auth_secret" yaml:"authSecret" json:"authSecret"`
}

func (NSQOptions) GetNSQOptions

func (e NSQOptions) GetNSQOptions() (*nsq.Config, error)

type Option

// Option is a functional option: a function that receives the *Options
// being built and may modify it. The package's With* constructors return
// values of this type.
type Option func(opts *Options)

func WithConsumerFunc

func WithConsumerFunc(f ConsumerFunc) Option

func WithDelay added in v0.6.0

func WithDelay(delay time.Duration) Option

func WithGroupID

func WithGroupID(groupID string) Option

func WithKafkaConfig

func WithKafkaConfig(c *sarama.Config) Option

func WithMessage

func WithMessage(message Messager) Option

func WithPartition

func WithPartition(partition int) Option

func WithStrategy

func WithStrategy(f sarama.BalanceStrategy) Option

func WithTopic

func WithTopic(topic string) Option

type Options

// Options is the settings struct that Option functions operate on; SetOptions
// and DefaultOptions both return a *Options.
//
// NOTE(review): the per-field notes below pair each field with the With*
// constructor of matching name and type; the constructor bodies are not
// shown, so confirm the pairing.
type Options struct {
	// Topic is a topic name (see WithTopic).
	Topic                       string
	// GroupID is a group identifier (see WithGroupID).
	GroupID                     string
	// F is the consumer callback (see WithConsumerFunc).
	F                           ConsumerFunc
	// Message is the message value (see WithMessage).
	Message                     Messager
	// Partition is a partition number (see WithPartition).
	Partition                   int
	// PartitionAssignmentStrategy is a sarama balance strategy (see WithStrategy).
	PartitionAssignmentStrategy sarama.BalanceStrategy
	// KafkaConfig is a sarama configuration (see WithKafkaConfig).
	KafkaConfig                 *sarama.Config
	// Delay is a duration (see WithDelay).
	// NOTE(review): presumably a delivery delay — confirm.
	Delay                       time.Duration
}

func DefaultOptions

func DefaultOptions() *Options

func SetOptions

func SetOptions(opts ...Option) *Options

type RedisConnectOptions

// RedisConnectOptions is the Redis connection configuration block, keyed by
// its yaml tags. GetRedisOptions returns a *redis.UniversalOptions for it.
//
// NOTE(review): only Addr and TLS declare json tags. When this struct is
// handled by encoding/json, every other field uses its Go field name as the
// key (e.g. "Addrs", not "addrs") — confirm that is intended.
//
// NOTE(review): most field names match fields of go-redis's
// redis.UniversalOptions and are presumably copied across by
// GetRedisOptions, which is not shown — confirm.
type RedisConnectOptions struct {
	// Addr is a single address, kept for compatibility with the previous
	// configuration format.
	//
	// Deprecated: Use Addrs instead.
	Addr             string        `yaml:"addr" json:"addr"`
	// Addrs is the list of addresses that replaces Addr.
	Addrs            []string      `yaml:"addrs"`
	ClientName       string        `yaml:"clientName"`
	DB               int           `yaml:"db"`
	Username         string        `yaml:"username"`
	Password         string        `yaml:"password"`
	SentinelUsername string        `yaml:"sentinelUsername"`
	SentinelPassword string        `yaml:"sentinelPassword"`
	MasterName       string        `yaml:"masterName"`
	Protocol         int           `yaml:"protocol"`
	MaxRetries       int           `yaml:"maxRetries"`
	MinRetryBackoff  time.Duration `yaml:"minRetryBackoff"`
	MaxRetryBackoff  time.Duration `yaml:"maxRetryBackoff"`
	DialTimeout      time.Duration `yaml:"dialTimeout"`
	ReadTimeout      time.Duration `yaml:"readTimeout"`
	WriteTimeout     time.Duration `yaml:"writeTimeout"`
	// ContextTimeout is read from the yaml key "contextTimeoutEnabled"; the
	// key differs from the field name.
	ContextTimeout   bool          `yaml:"contextTimeoutEnabled"`
	PoolFIFO         bool          `yaml:"poolFIFO"`
	PoolSize         int           `yaml:"poolSize"`
	PoolTimeout      time.Duration `yaml:"poolTimeout"`
	MinIdleConns     int           `yaml:"minIdleConns"`
	MaxIdleConns     int           `yaml:"maxIdleConns"`
	MaxActiveConns   int           `yaml:"maxActiveConns"`
	ConnMaxIdleTime  time.Duration `yaml:"connMaxIdleTime"`
	ConnMaxLifetime  time.Duration `yaml:"connMaxLifetime"`
	// TLS points at the package's TLS settings (cert, key, ca).
	TLS              *TLS          `yaml:"tls" json:"tls"`
	MaxRedirects     int           `yaml:"maxRedirects"`
	ReadOnly         bool          `yaml:"readOnly"`
	RouteByLatency   bool          `yaml:"routeByLatency"`
	RouteRandomly    bool          `yaml:"routeRandomly"`
	DisableIdentity  bool          `yaml:"disableIdentity"`
	IdentitySuffix   string        `yaml:"identitySuffix"`
	UnstableResp3    bool          `yaml:"unstableResp3"`
}

func (*RedisConnectOptions) GetRedisOptions

func (e *RedisConnectOptions) GetRedisOptions() (opt *redis.UniversalOptions, err error)

type TLS

// TLS groups three string settings read from the cert, key and ca
// configuration keys. It is referenced by NSQOptions.Tls and
// RedisConnectOptions.TLS.
//
// NOTE(review): whether these strings are file paths or inline PEM data is
// decided by the code that consumes them, which is not shown — confirm.
type TLS struct {
	// Cert is the certificate setting.
	Cert string `yaml:"cert" json:"cert"`
	// Key is the private-key setting.
	Key  string `yaml:"key" json:"key"`
	// Ca is the certificate-authority setting.
	Ca   string `yaml:"ca" json:"ca"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL