Documentation
¶
Index ¶
- Variables
- type Capability
- type Category
- type Config
- type Database
- type DbClient
- type DbContextExecutor
- type DbExecutor
- type Logger
- type Message
- type MessageQueue
- type MessageQueuePublisher
- type MessageQueueSubscriber
- type Payload
- type Provider
- type PublisherOption
- type Redis
- type RedisClient
- type RedisCommand
- type SubscriberOption
- type TxCtxKey
Constants ¶
This section is empty.
Variables ¶
var ( DefaultPublisherOption = &PublisherOption{ PublishDelayMessage: false, } DefaultSubscriberOption = &SubscriberOption{ SubscribeDelayMessage: false, } )
Functions ¶
This section is empty.
Types ¶
type Capability ¶
Capability 能力提供者
type Config ¶
type Config interface {
Provider
Unmarshal(configVar any, key ...string) error // 读取配置到变量configVar
Get(key string) any // 获取配置项的值
}
Config Provider
type Database ¶
type Database interface {
Provider
// Default 缺省数据库
Default() DbClient
// Master 主数据库
Master() DbClient
// Slave 从数据库
Slave(i int) DbClient
// Named 指定名称的数据库
Named(name string) DbClient
// Read 返回用于读操作的数据库客户端(自动从 slave 中轮询选择)
Read() DbClient
// Write 返回用于写操作的数据库客户端(返回 master 或 default)
Write() DbClient
}
Database provider
type DbClient ¶
type DbClient interface {
DbContextExecutor
Close() error
// RunInTransaction 在事务中执行函数,支持嵌套事务(通过 SAVEPOINT 实现)
// fn 的参数 ctx 包含事务信息,用于嵌套事务检测
RunInTransaction(ctx context.Context, fn func(ctx context.Context) error) error
// Db returns the underlying *sql.DB for direct access
Db() *sql.DB
}
DbClient 数据库客户端接口
type DbContextExecutor ¶
type DbContextExecutor interface {
DbExecutor
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
PrepareContext(context.Context, string) (*sql.Stmt, error)
}
DbContextExecutor can perform SQL queries with context
type DbExecutor ¶
type Logger ¶
type Logger interface {
Provider
GetStdLogger() *log.Logger
Log(keyvals ...interface{}) error
Trace(msg string, keyvals ...interface{})
Debug(msg string, keyvals ...interface{})
Info(msg string, keyvals ...interface{})
Warn(msg string, keyvals ...interface{})
Error(msg string, keyvals ...interface{})
Fatal(msg string, keyvals ...interface{})
Panic(msg string, keyvals ...interface{})
}
Logger provider
type Message ¶
type Message struct {
// Payload is the message's payload.
Payload Payload
// contains filtered or unexported fields
}
Message is the basic transfer unit. Messages are emitted by Publishers and received by Subscribers.
func NewMessage ¶
NewMessage creates a new Message with payload.
func (*Message) Ack ¶
Ack sends message's acknowledgement.
Ack is not blocking. Ack is idempotent. False is returned, if Nack is already sent.
func (*Message) Acked ¶
func (m *Message) Acked() <-chan struct{}
Acked returns channel which is closed when acknowledgement is sent.
Usage:
select {
case <-message.Acked():
// ack received
case <-message.Nacked():
// nack received
}
func (*Message) Context ¶
Context returns the message's servicectx. To change the servicectx, use SetContext.
The returned servicectx is always non-nil; it defaults to the background servicectx.
func (*Message) Nack ¶
Nack sends message's negative acknowledgement.
Nack is not blocking. Nack is idempotent. False is returned, if Ack is already sent.
func (*Message) Nacked ¶
func (m *Message) Nacked() <-chan struct{}
Nacked returns channel which is closed when negative acknowledgement is sent.
Usage:
select {
case <-message.Acked():
// ack received
case <-message.Nacked():
// nack received
}
func (*Message) SetContext ¶
SetContext sets provided servicectx to the message.
type MessageQueue ¶
type MessageQueue interface {
Provider
NewPublisher(name string, args ...*PublisherOption) (MessageQueuePublisher, error)
NewSubscriber(name string, args ...*SubscriberOption) (MessageQueueSubscriber, error)
}
MessageQueue provider 相同name的多个订阅者如果订阅同一个topic,则只有一个订阅者会收到消息 不同name的多个订阅者果订阅同一个topic,则所有订阅者都会收到消息
type MessageQueuePublisher ¶
type MessageQueuePublisher interface {
// Publish publishes provided messages to given topic.
//
// Publish can be synchronous or asynchronous - it depends on the implementation.
//
// Most publishers implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// Publish must be thread safe.
Publish(topic string, messages [][]byte, delaySeconds ...int64) error
// Close should flush unsent messages, if publisher is async.
Close() error
}
type MessageQueueSubscriber ¶
type MessageQueueSubscriber interface {
// Subscribe returns output channel with messages from provided topic.
// Channel is closed, when Close() was called on the subscriber.
//
// To receive the next message, `Ack()` must be called on the received message.
// If message processing failed and message should be redelivered `Nack()` should be called.
//
// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
// Provided ctx is set to all produced messages.
// When Nack or Ack is called on the message, servicectx of the message is canceled.
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
Close() error
}
type PublisherOption ¶
type PublisherOption struct {
PublishDelayMessage bool
}
type Redis ¶
type Redis interface {
Provider
My() RedisClient
By(string) RedisClient
}
type RedisClient ¶
type RedisClient interface {
// general purpose
Del(key string) error
Dels(keys []string) error
Exists(key string) (bool, error)
Expire(key string, expire int) error
Incr(key string) (int64, error)
IncrBy(key string, number int) (int64, error)
DecrBy(key string, number int) (int64, error)
Ttl(key string) (int64, error)
Pipeline(commands []*RedisCommand) (map[int]any, error)
Ping() error
// Set operations
Set(key string, value interface{}) error
SetEx(key string, value interface{}, expire int) error
// Get operations
Get(key string) ([]byte, error)
GetInt(key string) (int, error)
GetInt64(key string) (int64, error)
GetFloat64(key string) (float64, error)
GetString(key string) (string, error)
// HashMap operations
HGetAll(key string) (map[string]string, error)
HGet(key string, field any) ([]byte, error)
HGetInt(key string, field string) (int, error)
HGetInt64(key string, field string) (int64, error)
HGetFloat64(key string, field string) (float64, error)
HGetString(key string, field string) (string, error)
HMGet(key string, fields []string) ([][]byte, error)
HSet(key string, field interface{}, value interface{}) (int, error)
HMSet(key string, args map[string]interface{}) error
HDel(key string, field interface{}) (int, error)
HDels(key string, fields []interface{}) (int, error)
HLen(key string) (int, error)
// set
SIsMember(key string, member interface{}) (bool, error)
SAdd(key string, members interface{}) error
SRem(key string, members interface{}) error
SInter(keys []string) ([]string, error)
SUnion(keys []string) ([]string, error)
SDiff(keys []string) ([]string, error)
SMembers(key string) ([]string, error)
// zset
ZAdd(key string, score int64, member interface{}) error
ZCard(key string) (int, error)
ZRange(key string, min, max int64) ([]string, error)
ZRemRangeByScore(key string, min, max interface{}) error
ZRangeByScore(key string, min, max interface{}, withScores bool, list *protobuf.ListParam) ([]string, error)
ZScore(key string, member interface{}) (int64, error)
ZInterstore(newKey string, keys ...interface{}) (int64, error)
ZIncrBy(key string, increment int64, member interface{}) error
ZRem(destKey string, members ...interface{}) (int64, error)
// list
LPush(key string, values ...any) error
RPush(key string, values ...any) error
RPop(key string) ([]byte, error)
LRangeInt64(key string, start, end int64) ([]int64, error)
LRangeString(key string, start, end int64) ([]string, error)
LLen(key string) (int64, error)
Eval(scriptContent string, keys []interface{}, args []interface{}) (interface{}, error)
// redis bloom
BfExists(key string, item string) (exists bool, err error)
BfAdd(key string, item string) (exists bool, err error)
BfReserve(key string, errorRate float64, capacity uint64) (err error)
BfAddMulti(key string, items []interface{}) ([]int64, error)
BfExistsMulti(key string, items []interface{}) ([]int64, error)
}
type RedisCommand ¶
type RedisCommand struct {
Name string
Args []interface{}
}
type SubscriberOption ¶
type SubscriberOption struct {
SubscribeDelayMessage bool
}