provider

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level default option values.
//
// Both variables are pointers to package-level values, so a write made
// through either pointer is visible to every other holder of that pointer.
var (
	// DefaultPublisherOption is a PublisherOption with PublishDelayMessage
	// set to false.
	// NOTE(review): presumably applied when NewPublisher is called without
	// options — confirm against the implementation.
	DefaultPublisherOption = &PublisherOption{
		PublishDelayMessage: false,
	}

	// DefaultSubscriberOption is a SubscriberOption with
	// SubscribeDelayMessage set to false.
	// NOTE(review): presumably applied when NewSubscriber is called without
	// options — confirm against the implementation.
	DefaultSubscriberOption = &SubscriberOption{
		SubscribeDelayMessage: false,
	}
)

Functions

This section is empty.

Types

type Capability

// Capability describes a capability provider. It is the value returned by
// Provider.GetCapability.
type Capability struct {
	// Category is the capability's Category value.
	Category Category
	// Name is the capability's name.
	// NOTE(review): presumably unique within a category — confirm.
	Name     string
	// Module is the fx.Option attached to this capability.
	// NOTE(review): presumably the fx module that wires the provider into
	// the application — confirm.
	Module   fx.Option
}

Capability 能力提供者

type Category

// Category is the integer enumeration used by Capability.Category.
type Category int
// Category values, numbered sequentially from 0 by iota.
const (
	// CategoryUnknown is the zero value of Category.
	CategoryUnknown Category = iota
	// CategoryConfig has value 1.
	// NOTE(review): presumably identifies Config providers — confirm.
	CategoryConfig
	// CategoryLogger has value 2.
	// NOTE(review): presumably identifies Logger providers — confirm.
	CategoryLogger
	// CategoryDb has value 3.
	// NOTE(review): presumably identifies Database providers — confirm.
	CategoryDb
	// CategoryRedis has value 4.
	// NOTE(review): presumably identifies Redis providers — confirm.
	CategoryRedis
	// CategoryMq has value 5.
	// NOTE(review): presumably identifies MessageQueue providers — confirm.
	CategoryMq
)

type Config

// Config is the configuration Provider.
type Config interface {
	Provider
	// NOTE(review): key is variadic; presumably it selects the sub-section
	// to unmarshal and the whole configuration is used when omitted — confirm.
	Unmarshal(configVar any, key ...string) error // reads configuration into the variable configVar
	Get(key string) any                           // returns the value of a configuration item
}

Config Provider

type Database

// Database is the database Provider. Every accessor returns a DbClient.
type Database interface {
	Provider
	// Default returns the default database.
	Default() DbClient
	// Master returns the master (primary) database.
	Master() DbClient
	// Slave returns a slave (replica) database.
	// NOTE(review): i is presumably the index of the slave — confirm.
	Slave(i int) DbClient
	// Named returns the database with the specified name.
	Named(name string) DbClient
	// Read returns the database client to use for read operations
	// (chosen automatically from the slaves by round-robin).
	Read() DbClient
	// Write returns the database client to use for write operations
	// (returns master or default).
	Write() DbClient
}

Database provider

type DbClient

// DbClient is the database client interface. It embeds DbContextExecutor and
// adds closing, transaction handling and access to the underlying *sql.DB.
type DbClient interface {
	DbContextExecutor

	// Close closes the client.
	// NOTE(review): presumably closes the underlying *sql.DB — confirm.
	Close() error
	// RunInTransaction executes fn inside a transaction and supports nested
	// transactions (implemented via SAVEPOINT).
	// The ctx argument passed to fn carries transaction information, which is
	// used to detect nested transactions.
	RunInTransaction(ctx context.Context, fn func(ctx context.Context) error) error
	// Db returns the underlying *sql.DB for direct access
	Db() *sql.DB
}

DbClient 数据库客户端接口

type DbContextExecutor

// DbContextExecutor can perform SQL queries with a context.
//
// It embeds DbExecutor and adds a context-taking variant of each of its
// methods, plus PrepareContext for building a *sql.Stmt.
type DbContextExecutor interface {
	DbExecutor

	// ExecContext takes a context, a query and its arguments and returns a
	// sql.Result.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// QueryContext takes a context, a query and its arguments and returns
	// *sql.Rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext takes a context, a query and its arguments and returns
	// a *sql.Row; it has no separate error result.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	// PrepareContext takes a context and a string and returns a *sql.Stmt.
	// NOTE(review): the string is presumably the query text — confirm.
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}

DbContextExecutor can perform SQL queries with context

type DbExecutor

// DbExecutor is the context-free SQL surface: execute a statement, run a
// query that yields rows, or run a query that yields a single row.
//
// The method set mirrors the same-named methods of *sql.DB and *sql.Tx, so
// both of those types satisfy it.
type DbExecutor interface {
	// Exec takes a query and its arguments and returns a sql.Result.
	Exec(query string, args ...any) (sql.Result, error)
	// Query takes a query and its arguments and returns *sql.Rows.
	Query(query string, args ...any) (*sql.Rows, error)
	// QueryRow takes a query and its arguments and returns a *sql.Row; it has
	// no separate error result.
	QueryRow(query string, args ...any) *sql.Row
}

type Logger

// Logger is the logging Provider.
//
// NOTE(review): keyvals is presumably an alternating key/value list, as in
// common structured-logging APIs — confirm against the implementation.
type Logger interface {
	Provider

	// GetStdLogger returns a standard-library *log.Logger.
	GetStdLogger() *log.Logger
	// Log takes only keyvals (no message) and is the one logging method that
	// returns an error.
	Log(keyvals ...any) error

	// Per-level methods: each takes a message followed by keyvals and returns
	// nothing.
	// NOTE(review): whether Fatal exits the process or Panic panics is not
	// visible here — confirm.
	Trace(msg string, keyvals ...any)
	Debug(msg string, keyvals ...any)
	Info(msg string, keyvals ...any)
	Warn(msg string, keyvals ...any)
	Error(msg string, keyvals ...any)
	Fatal(msg string, keyvals ...any)
	Panic(msg string, keyvals ...any)
}

Logger provider

type Message

// Message is the basic transfer unit.
// Messages are emitted by Publishers and received by Subscribers.
type Message struct {
	// Payload is the message's payload.
	Payload Payload
	// contains filtered or unexported fields
}

Message is the basic transfer unit. Messages are emitted by Publishers and received by Subscribers.

func NewMessage

func NewMessage(payload Payload) *Message

NewMessage creates a new Message with payload.

func (*Message) Ack

func (m *Message) Ack() bool

Ack sends message's acknowledgement.

Ack is not blocking. Ack is idempotent. False is returned, if Nack is already sent.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns channel which is closed when acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (*Message) Context

func (m *Message) Context() context.Context

Context returns the message's context. To change the context, use SetContext.

The returned context is always non-nil; it defaults to the background context.

func (*Message) Nack

func (m *Message) Nack() bool

Nack sends message's negative acknowledgement.

Nack is not blocking. Nack is idempotent. False is returned, if Ack is already sent.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns channel which is closed when negative acknowledgement is sent.

Usage:

select {
case <-message.Acked():
	// ack received
case <-message.Nacked():
	// nack received
}

func (*Message) SetContext

func (m *Message) SetContext(ctx context.Context)

SetContext sets the provided context on the message.

type MessageQueue

// MessageQueue is the message-queue Provider; it creates named publishers
// and subscribers.
//
// If several subscribers with the same name subscribe to the same topic, only
// one of them receives the message. If subscribers with different names
// subscribe to the same topic, all of them receive the message.
type MessageQueue interface {
	Provider
	// NewPublisher creates a MessageQueuePublisher for name; args is optional.
	// NOTE(review): presumably DefaultPublisherOption applies when args is
	// empty — confirm.
	NewPublisher(name string, args ...*PublisherOption) (MessageQueuePublisher, error)
	// NewSubscriber creates a MessageQueueSubscriber for name; args is optional.
	// NOTE(review): presumably DefaultSubscriberOption applies when args is
	// empty — confirm.
	NewSubscriber(name string, args ...*SubscriberOption) (MessageQueueSubscriber, error)
}

MessageQueue provider。相同name的多个订阅者如果订阅同一个topic,则只有一个订阅者会收到消息;不同name的多个订阅者如果订阅同一个topic,则所有订阅者都会收到消息。

type MessageQueuePublisher

// MessageQueuePublisher publishes raw message payloads to topics.
type MessageQueuePublisher interface {
	// Publish publishes provided messages to given topic.
	//
	// Publish can be synchronous or asynchronous - it depends on the implementation.
	//
	// Most publishers implementations don't support atomic publishing of messages.
	// This means that if publishing one of the messages fails, the next messages will not be published.
	//
	// Publish must be thread safe.
	//
	// Each element of messages is one message payload. delaySeconds is optional.
	// NOTE(review): presumably only the first delaySeconds value is used, as a
	// delivery delay in seconds, and only when the publisher was created with
	// PublishDelayMessage enabled — confirm against the implementation.
	Publish(topic string, messages [][]byte, delaySeconds ...int64) error
	// Close should flush unsent messages, if publisher is async.
	Close() error
}

type MessageQueueSubscriber

// MessageQueueSubscriber delivers messages from a topic over a channel.
type MessageQueueSubscriber interface {
	// Subscribe returns output channel with messages from provided topic.
	// Channel is closed, when Close() was called on the subscriber.
	//
	// To receive the next message, `Ack()` must be called on the received message.
	// If message processing failed and message should be redelivered `Nack()` should be called.
	//
	// When provided ctx is cancelled, subscriber will close the subscription and close output channel.
	// Provided ctx is set to all produced messages.
	// When Nack or Ack is called on the message, context of the message is canceled.
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
	Close() error
}

type Payload

// Payload is the Message's payload, held as a byte slice.
type Payload []byte

Payload is the Message's payload.

type Provider

// Provider is the interface implemented by underlying libraries that provide
// a capability. Config, Database, Logger, MessageQueue and Redis all embed it.
type Provider interface {
	GetCapability() Capability // returns the capability
}

Provider 底层库能力提供者接口

type PublisherOption

// PublisherOption is the option type accepted by MessageQueue.NewPublisher.
type PublisherOption struct {
	// PublishDelayMessage is false in DefaultPublisherOption.
	// NOTE(review): presumably enables publishing delayed messages (see the
	// delaySeconds argument of MessageQueuePublisher.Publish) — confirm.
	PublishDelayMessage bool
}

type Redis

// Redis is the Redis Provider; both accessors return a RedisClient.
type Redis interface {
	Provider
	// My returns a RedisClient and takes no arguments.
	// NOTE(review): presumably the default (service-owned) instance — confirm.
	My() RedisClient
	// By returns the RedisClient selected by the given string.
	// NOTE(review): the argument is presumably a configured instance name —
	// confirm.
	By(string) RedisClient
}

type RedisClient

// RedisClient is the set of Redis operations offered by a Redis provider,
// grouped below by data type. Method names follow the Redis commands they
// are named after.
//
// NOTE(review): the units of the `expire` parameters and of the Ttl result
// are not visible here (presumably seconds) — confirm.
type RedisClient interface {
	// Generic key operations.
	Del(key string) error
	Dels(keys []string) error
	Exists(key string) (bool, error)
	Expire(key string, expire int) error
	Incr(key string) (int64, error)
	IncrBy(key string, number int) (int64, error)
	DecrBy(key string, number int) (int64, error)
	Ttl(key string) (int64, error)
	// Pipeline takes a batch of commands and returns results keyed by int.
	// NOTE(review): the key is presumably the command's index in the batch —
	// confirm.
	Pipeline(commands []*RedisCommand) (map[int]any, error)
	Ping() error

	// String values: writes.
	Set(key string, value any) error
	SetEx(key string, value any, expire int) error

	// String values: reads, raw bytes or converted to a Go type.
	Get(key string) ([]byte, error)
	GetInt(key string) (int, error)
	GetInt64(key string) (int64, error)
	GetFloat64(key string) (float64, error)
	GetString(key string) (string, error)

	// Hashes.
	HGetAll(key string) (map[string]string, error)
	HGet(key string, field any) ([]byte, error)
	HGetInt(key, field string) (int, error)
	HGetInt64(key, field string) (int64, error)
	HGetFloat64(key, field string) (float64, error)
	HGetString(key, field string) (string, error)
	HMGet(key string, fields []string) ([][]byte, error)
	HSet(key string, field, value any) (int, error)
	HMSet(key string, args map[string]any) error
	HDel(key string, field any) (int, error)
	HDels(key string, fields []any) (int, error)
	HLen(key string) (int, error)

	// Sets.
	SIsMember(key string, member any) (bool, error)
	SAdd(key string, members any) error
	SRem(key string, members any) error
	SInter(keys []string) ([]string, error)
	SUnion(keys []string) ([]string, error)
	SDiff(keys []string) ([]string, error)
	SMembers(key string) ([]string, error)

	// Sorted sets.
	ZAdd(key string, score int64, member any) error
	ZCard(key string) (int, error)
	ZRange(key string, min, max int64) ([]string, error)
	ZRemRangeByScore(key string, min, max any) error
	ZRangeByScore(key string, min, max any, withScores bool, list *protobuf.ListParam) ([]string, error)
	ZScore(key string, member any) (int64, error)
	ZInterstore(newKey string, keys ...any) (int64, error)
	ZIncrBy(key string, increment int64, member any) error
	ZRem(destKey string, members ...any) (int64, error)

	// Lists.
	LPush(key string, values ...any) error
	RPush(key string, values ...any) error
	RPop(key string) ([]byte, error)
	LRangeInt64(key string, start, end int64) ([]int64, error)
	LRangeString(key string, start, end int64) ([]string, error)
	LLen(key string) (int64, error)

	// Scripting.
	Eval(scriptContent string, keys, args []any) (any, error)

	// Bloom filters (RedisBloom).
	BfExists(key, item string) (exists bool, err error)
	BfAdd(key, item string) (exists bool, err error)
	BfReserve(key string, errorRate float64, capacity uint64) (err error)
	BfAddMulti(key string, items []any) ([]int64, error)
	BfExistsMulti(key string, items []any) ([]int64, error)
}

type RedisCommand

// RedisCommand is a single command described by its name and argument list.
// RedisClient.Pipeline accepts a slice of pointers to these.
type RedisCommand struct {
	Name string // command name
	Args []any  // command arguments, in order
}

type SubscriberOption

// SubscriberOption is the option type accepted by MessageQueue.NewSubscriber.
type SubscriberOption struct {
	// SubscribeDelayMessage is false in DefaultSubscriberOption.
	// NOTE(review): presumably makes the subscriber consume delayed
	// messages — confirm against the implementation.
	SubscribeDelayMessage bool
}

type TxCtxKey

// TxCtxKey is used to pass transaction state through a context.
// NOTE(review): presumably the key type given to context.WithValue by
// DbClient.RunInTransaction — confirm.
type TxCtxKey struct{}

TxCtxKey 用于在 context 中传递事务状态

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL