Documentation
¶
Index ¶
- Constants
- Variables
- type CHandler
- type Config
- type Consumer
- func (c *Consumer) Handler(r *tcp.Remote) error
- func (c *Consumer) HandlerFunc() ConsumerHandler
- func (c *Consumer) IsConnected() bool
- func (c *Consumer) IsRegistered() bool
- func (c *Consumer) JSONMarshal(v any) ([]byte, error)
- func (c *Consumer) JSONUnmarshal(data []byte, v any) error
- func (c *Consumer) Logger() logger.Iface
- func (c *Consumer) OnAccepted(r *tcp.Remote) error
- func (c *Consumer) OnClosed(r *tcp.Remote) error
- func (c *Consumer) ReRegister(r transfer.Conn) error
- func (c *Consumer) Start() error
- func (c *Consumer) StatusOK() bool
- func (c *Consumer) Stop()
- type ConsumerHandler
- type ConsumerMessage
- type Link
- type PHandler
- type Producer
- func (p *Producer) Beautify(data []byte) string
- func (p *Producer) CanPublisher() bool
- func (p *Producer) Done() <-chan struct{}
- func (p *Producer) Handler(r *tcp.Remote) error
- func (p *Producer) IsConnected() bool
- func (p *Producer) IsRegistered() bool
- func (p *Producer) JSONMarshal(v any) ([]byte, error)
- func (p *Producer) JSONUnmarshal(data []byte, v any) error
- func (p *Producer) Logger() logger.Iface
- func (p *Producer) NewRecord() *proto.ProducerMessage
- func (p *Producer) OnAccepted(r *tcp.Remote) error
- func (p *Producer) OnClosed(_ *tcp.Remote) error
- func (p *Producer) Publisher(msg *proto.ProducerMessage) error
- func (p *Producer) PutRecord(msg *proto.ProducerMessage)
- func (p *Producer) ReRegister(r transfer.Conn) error
- func (p *Producer) Send(fn func(record *proto.ProducerMessage) error) error
- func (p *Producer) Start() error
- func (p *Producer) Stop()
- type ProducerHandler
- type ProducerMessage
- type Queue
- type TCPLink
Constants ¶
View Source
const ( AllConfirm = proto.AllConfirm NoConfirm = proto.NoConfirm LeaderConfirm = proto.LeaderConfirm )
View Source
const (
DefaultProducerSendInterval = 500 * time.Millisecond
)
Variables ¶
View Source
var ErrConsumerHandlerIsNil = errors.New("consumer handler is nil")
View Source
var ErrConsumerUnconnected = errors.New("consumer unconnected")
View Source
var ErrConsumerUnregistered = errors.New("consumer unregistered")
View Source
var ErrProducerUnconnected = errors.New("producer unconnected")
View Source
var ErrProducerUnregistered = errors.New("producer unregistered")
View Source
var ErrTopicEmpty = errors.New("topic is empty")
View Source
var NewQueue = proto.NewQueue
Functions ¶
This section is empty.
Types ¶
type CHandler ¶ added in v0.3.3
type CHandler struct{}
func (*CHandler) OnConnected ¶ added in v0.3.3
func (c *CHandler) OnConnected()
func (*CHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (c *CHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)
func (*CHandler) OnRegisterFailed ¶ added in v0.3.3
func (c *CHandler) OnRegisterFailed()
type Config ¶
type Config struct {
Host string `json:"host"`
Port string `json:"port"`
Ack proto.AckType `json:"ack"`
Link string `json:"link" description:"tcp/udp"`
Ctx context.Context `json:"-"` // 作用于生产者的父context,默认为 context.Background()
Logger logger.Iface `json:"-"`
Token string `json:"-"`
}
Config 生产者和消费者配置参数
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer 消费者
func NewAsyncConsumer ¶
func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewAsyncConsumer 创建异步消费者
func NewConsumer ¶
func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewConsumer 创建一个消费者,需要手动Start
func (*Consumer) HandlerFunc ¶
func (c *Consumer) HandlerFunc() ConsumerHandler
HandlerFunc 获取注册的消息处理方法
func (*Consumer) JSONUnmarshal ¶
JSONUnmarshal 反序列化方法
func (*Consumer) OnAccepted ¶
OnAccepted 当TCP连接成功时会自行发送注册消息
type ConsumerHandler ¶
type ConsumerHandler interface {
Topics() []string
Handler(record *proto.ConsumerMessage) // (异步执行)
OnConnected() // (同步执行)当连接成功时,发出的信号, 此事件必须在执行完成之后才会进行后续的处理,因此需自行控制
OnClosed() // (同步执行)当连接中断时,发出的信号, 此事件必须在执行完成之后才会进行重连操作(若有)
OnRegisterFailed() // (同步执行)当注册失败触发的事件
// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
OnNotImplementMessageType(frame *proto.TransferFrame, c transfer.Conn)
}
type ConsumerMessage ¶
type ConsumerMessage = proto.ConsumerMessage
type PHandler ¶ added in v0.3.3
type PHandler struct {
// contains filtered or unexported fields
}
PHandler 默认实现
func (PHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (h PHandler) OnNotImplementMessageType(frame *proto.TransferFrame, r transfer.Conn)
func (PHandler) OnRegisterExpire ¶ added in v0.3.3
func (h PHandler) OnRegisterExpire()
func (PHandler) OnRegisterFailed ¶ added in v0.3.3
func (h PHandler) OnRegisterFailed(status proto.MessageResponseStatus)
func (PHandler) OnRegistered ¶ added in v0.3.3
func (h PHandler) OnRegistered()
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer 生产者, 通过 Send 发送的消息并非会立即投递给服务端 而是会按照服务器下发的配置定时批量发送消息,通常为500ms
func NewAsyncProducer ¶
func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)
NewAsyncProducer 创建异步生产者,无需再手动启动
func NewProducer ¶
func NewProducer(conf Config, handlers ...ProducerHandler) *Producer
NewProducer 创建异步生产者,需手动启动
func (*Producer) CanPublisher ¶ added in v0.3.1
CanPublisher 是否可以向服务器发送消息
func (*Producer) JSONMarshal ¶
JSONMarshal 序列化方法
func (*Producer) NewRecord ¶
func (p *Producer) NewRecord() *proto.ProducerMessage
NewRecord 从池中初始化一个新的消息记录
func (*Producer) Publisher ¶
func (p *Producer) Publisher(msg *proto.ProducerMessage) error
Publisher 发送消息
func (*Producer) PutRecord ¶
func (p *Producer) PutRecord(msg *proto.ProducerMessage)
PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用
func (*Producer) ReRegister ¶
ReRegister 服务器令客户端重新发起注册流程
type ProducerHandler ¶
type ProducerHandler interface {
OnClose() // 阻塞调用
OnRegistered() // 阻塞调用
OnRegisterFailed(status proto.MessageResponseStatus) // 阻塞调用, 当注册失败触发的事件
OnRegisterExpire() // 阻塞调用
// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
OnNotImplementMessageType(frame *proto.TransferFrame, r transfer.Conn)
}
type ProducerMessage ¶
type ProducerMessage = proto.ProducerMessage
type TCPLink ¶ added in v0.3.3
type TCPLink struct {
Host string `json:"host"`
Port string `json:"port"`
LinkType proto.LinkType `json:"link_type"`
// contains filtered or unexported fields
}
func (*TCPLink) SetTCPHandler ¶ added in v0.3.3
func (l *TCPLink) SetTCPHandler(handler tcp.HandlerFunc)
func (*TCPLink) SetUDPHandler ¶ added in v0.3.3
func (l *TCPLink) SetUDPHandler(_ func())
Click to show internal directories.
Click to hide internal directories.