Documentation
¶
Index ¶
- Constants
- Variables
- type Config
- type Consumer
- func (c *Consumer) Handler(r *tcp.Remote) error
- func (c *Consumer) HandlerFunc() ConsumerHandler
- func (c *Consumer) IsConnected() bool
- func (c *Consumer) IsRegistered() bool
- func (c *Consumer) JSONMarshal(v any) ([]byte, error)
- func (c *Consumer) JSONUnmarshal(data []byte, v any) error
- func (c *Consumer) Logger() logger.Iface
- func (c *Consumer) OnAccepted(r *tcp.Remote) error
- func (c *Consumer) OnClosed(r *tcp.Remote) error
- func (c *Consumer) ReRegister(r *tcp.Remote) error
- func (c *Consumer) Start() error
- func (c *Consumer) StatusOK() bool
- type ConsumerHandler
- type ConsumerHandlerFunc
- type ConsumerMessage
- type Link
- type Producer
- func (p *Producer) Beautify(data []byte) string
- func (p *Producer) Done() <-chan struct{}
- func (p *Producer) Handler(r *tcp.Remote) error
- func (p *Producer) IsConnected() bool
- func (p *Producer) IsRegistered() bool
- func (p *Producer) JSONMarshal(v any) ([]byte, error)
- func (p *Producer) JSONUnmarshal(data []byte, v any) error
- func (p *Producer) Logger() logger.Iface
- func (p *Producer) NewRecord() *proto.ProducerMessage
- func (p *Producer) OnAccepted(r *tcp.Remote) error
- func (p *Producer) OnClosed(_ *tcp.Remote) error
- func (p *Producer) Publisher(msg *proto.ProducerMessage) error
- func (p *Producer) PutRecord(msg *proto.ProducerMessage)
- func (p *Producer) ReRegister(r *tcp.Remote) error
- func (p *Producer) Send(fn func(record *proto.ProducerMessage) error) error
- func (p *Producer) Start() error
- func (p *Producer) Stop()
- type ProducerHandler
- type ProducerMessage
- type Queue
Constants ¶
View Source
const ( AllConfirm = proto.AllConfirm NoConfirm = proto.NoConfirm LeaderConfirm = proto.LeaderConfirm )
View Source
const (
DefaultProducerSendInterval = 500 * time.Millisecond
)
Variables ¶
View Source
var ErrConsumerHandlerIsNil = errors.New("consumer handler is nil")
View Source
var ErrTopicEmpty = errors.New("topic is empty")
View Source
var NewQueue = proto.NewQueue
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Host string `json:"host"`
Port string `json:"port"`
Ack proto.AckType `json:"ack"`
Ctx context.Context `json:"-"` // 作用于生产者的父context,默认为 context.Background()
Logger logger.Iface `json:"-"`
}
Config 生产者和消费者配置参数
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer 消费者
func NewAsyncConsumer ¶
func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewAsyncConsumer 创建异步消费者
func NewConsumer ¶
func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewConsumer 创建一个消费者,需要手动Start
func (*Consumer) HandlerFunc ¶
func (c *Consumer) HandlerFunc() ConsumerHandler
HandlerFunc 获取注册的消息处理方法
func (*Consumer) JSONUnmarshal ¶
JSONUnmarshal 反序列化方法
func (*Consumer) OnAccepted ¶
OnAccepted 当TCP连接成功时会自行发送注册消息
type ConsumerHandler ¶
type ConsumerHandler interface {
Topics() []string
Handler(record *proto.ConsumerMessage)
OnConnected() // (同步执行)当连接成功时,发出的信号, 此事件必须在执行完成之后才会进行后续的处理,因此需自行控制
OnClosed() // (同步执行)当连接中断时,发出的信号, 此事件必须在执行完成之后才会进行重连操作(若有)
// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote)
}
type ConsumerHandlerFunc ¶
type ConsumerHandlerFunc struct{}
func (*ConsumerHandlerFunc) OnClosed ¶
func (c *ConsumerHandlerFunc) OnClosed()
func (*ConsumerHandlerFunc) OnConnected ¶
func (c *ConsumerHandlerFunc) OnConnected()
func (*ConsumerHandlerFunc) OnNotImplementMessageType ¶
func (c *ConsumerHandlerFunc) OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote)
type ConsumerMessage ¶
type ConsumerMessage = proto.ConsumerMessage
type Link ¶
type Link struct {
Host string `json:"host"`
Port string `json:"port"`
Kind proto.LinkType `json:"kind"`
// contains filtered or unexported fields
}
func (*Link) IsConsumer ¶
func (*Link) IsProducer ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer 生产者, 通过 Send 发送的消息并非会立即投递给服务端 而是会按照服务器下发的配置定时批量发送消息,通常为500ms
func NewAsyncProducer ¶
func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)
NewAsyncProducer 创建异步生产者,无需再手动启动
func NewProducer ¶
func NewProducer(conf Config, handlers ...ProducerHandler) *Producer
NewProducer 创建异步生产者,需手动启动
func (*Producer) IsConnected ¶
func (*Producer) IsRegistered ¶
func (*Producer) JSONMarshal ¶
JSONMarshal 序列化方法
func (*Producer) NewRecord ¶
func (p *Producer) NewRecord() *proto.ProducerMessage
NewRecord 从池中初始化一个新的消息记录
func (*Producer) Publisher ¶
func (p *Producer) Publisher(msg *proto.ProducerMessage) error
Publisher 发送消息
func (*Producer) PutRecord ¶
func (p *Producer) PutRecord(msg *proto.ProducerMessage)
PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用
func (*Producer) ReRegister ¶
ReRegister 服务器令客户端重新发起注册流程
type ProducerHandler ¶
type ProducerHandler interface {
OnRegistered() // 阻塞调用
OnClose() // 阻塞调用
OnRegisterExpire() // 阻塞调用
}
type ProducerMessage ¶
type ProducerMessage = proto.ProducerMessage
Click to show internal directories.
Click to hide internal directories.