Documentation
¶
Index ¶
- Variables
- type Config
- type Consumer
- type ConsumerConfig
- type Engine
- func (e *Engine) AddTopic(name []byte) *Topic
- func (e *Engine) BindMessageHandler(m proto.Message, handler HookHandler, texts ...string) error
- func (e *Engine) EventHandler() EventHandler
- func (e *Engine) GetTopic(name []byte) *Topic
- func (e *Engine) GetTopicOffset(name []byte) uint64
- func (e *Engine) Logger() logger.Iface
- func (e *Engine) NeedToken() bool
- func (e *Engine) ProducerSendInterval() time.Duration
- func (e *Engine) Publisher(msg *proto.PMessage) uint64
- func (e *Engine) QueryConsumer(addr string) (*Consumer, bool)
- func (e *Engine) QueryProducer(addr string) (*Producer, bool)
- func (e *Engine) RangeConsumer(fn func(c *Consumer) bool)
- func (e *Engine) RangeProducer(fn func(p *Producer) bool)
- func (e *Engine) RangeTopic(fn func(topic *Topic) bool)
- func (e *Engine) RemoveConsumer(addr string)
- func (e *Engine) RemoveProducer(addr string)
- func (e *Engine) ReplaceTransfer(transfer Transfer) *Engine
- func (e *Engine) Serve() error
- func (e *Engine) SetTopicHistoryBufferSize(size int) *Engine
- func (e *Engine) Stop()
- type EventHandler
- type HistoryRecord
- type Hook
- type HookHandler
- type Producer
- type ProducerConfig
- type TCPTransfer
- func (t *TCPTransfer) Handler(r *tcp.Remote) error
- func (t *TCPTransfer) OnAccepted(r *tcp.Remote) error
- func (t *TCPTransfer) OnClosed(r *tcp.Remote) error
- func (t *TCPTransfer) Serve() error
- func (t *TCPTransfer) SetHost(host string)
- func (t *TCPTransfer) SetLogger(logger logger.Iface)
- func (t *TCPTransfer) SetMaxOpenConn(num int)
- func (t *TCPTransfer) SetOnClosedHandler(fn func(addr string))
- func (t *TCPTransfer) SetOnConnectedHandler(fn func(r *tcp.Remote))
- func (t *TCPTransfer) SetOnFrameParseErrorHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
- func (t *TCPTransfer) SetOnReceivedHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
- func (t *TCPTransfer) SetPort(port string)
- func (t *TCPTransfer) Stop()
- type Topic
- type Transfer
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Host string `json:"host"`
Port string `json:"port"`
MaxOpenConn int `json:"max_open_conn"` // 允许的最大连接数, 即 生产者+消费者最多有 MaxOpenConn 个
BufferSize int `json:"buffer_size"` // 生产者消息历史记录最大数量
Logger logger.Iface `json:"-"`
Crypto proto.Crypto `json:"-"` // 加密器
Token string `json:"-"` // 注册认证密钥
EventHandler EventHandler // 事件触发器
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct {
Addr string `json:"addr"`
Conf *ConsumerConfig `json:"conf"`
Conn *tcp.Remote `json:"-"`
// contains filtered or unexported fields
}
type ConsumerConfig ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) BindMessageHandler ¶
BindMessageHandler 绑定一个自实现的协议处理器,
参数m为实现了 proto.Message 接口的协议, 参数handler则为收到此协议后的同步处理函数, 如果需要在处理完成之后向客户端返回消息,则直接就地修改frame参数, 并返回 true 和 nil, 除此之外,则不会向客户端返回任何消息 HookHandler 的第一个参数为接收到的消息帧,需自行解码, 第二个参数为当前的客户端连接, 此方法需返回(是否返回数据,处理是否正确)两个参数. 参数texts则为协议m的摘要名称
func (*Engine) EventHandler ¶
func (e *Engine) EventHandler() EventHandler
func (*Engine) GetTopicOffset ¶
GetTopicOffset 查询指定topic当前的消息偏移量
func (*Engine) ProducerSendInterval ¶
ProducerSendInterval 允许生产者发送数据间隔
func (*Engine) QueryConsumer ¶
QueryConsumer 查询消费者记录, 若未注册则返回nil
func (*Engine) QueryProducer ¶
QueryProducer 查询生产者记录, 若未注册则返回nil
func (*Engine) RangeConsumer ¶
RangeConsumer if false returned, for-loop will stop
func (*Engine) RangeProducer ¶
RangeProducer if false returned, for-loop will stop
func (*Engine) RangeTopic ¶
RangeTopic if false returned, for-loop will stop
func (*Engine) ReplaceTransfer ¶
ReplaceTransfer 替换传输层实现
func (*Engine) SetTopicHistoryBufferSize ¶
SetTopicHistoryBufferSize 设置topic历史数据缓存大小, 对于修改前已经创建的topic不受影响
@param size int 历史数据缓存大小,[1, 10000)
type EventHandler ¶
type EventHandler interface {
// OnFrameParseError 当来自客户端消息帧解析出错时触发的事件
OnFrameParseError(frame *proto.TransferFrame, r *tcp.Remote)
// OnConsumerRegister 当消费者注册成功时触发的事件
OnConsumerRegister(addr string)
// OnProducerRegister 当生产者注册成功时触发的事件
OnProducerRegister(addr string)
// OnConsumerClosed 当消费者关闭连接时触发的事件
OnConsumerClosed(addr string)
// OnProducerClosed 当生产者关闭连接时触发的事件
OnProducerClosed(addr string)
// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote) (bool, error)
// OnCMConsumed 当一个消费者被消费成功(成功发送给全部消费者)后时触发的事件
OnCMConsumed(record *HistoryRecord)
}
EventHandler 事件触发器
type HistoryRecord ¶
type Hook ¶
type Hook struct {
Type proto.MessageType
Handler HookHandler
IsAsync bool
}
type HookHandler ¶
type Producer ¶
type Producer struct {
Addr string `json:"addr"`
Conf *ProducerConfig `json:"conf"`
Conn *tcp.Remote `json:"-"`
// contains filtered or unexported fields
}
func (*Producer) NeedConfirm ¶
type ProducerConfig ¶
type TCPTransfer ¶
type TCPTransfer struct {
// contains filtered or unexported fields
}
TCPTransfer TCP传输层实现
func (*TCPTransfer) OnAccepted ¶
func (t *TCPTransfer) OnAccepted(r *tcp.Remote) error
func (*TCPTransfer) OnClosed ¶
func (t *TCPTransfer) OnClosed(r *tcp.Remote) error
OnClosed 连接关闭, 删除此连接的消费者记录或生产者记录
func (*TCPTransfer) SetHost ¶
func (t *TCPTransfer) SetHost(host string)
func (*TCPTransfer) SetLogger ¶
func (t *TCPTransfer) SetLogger(logger logger.Iface)
func (*TCPTransfer) SetMaxOpenConn ¶
func (t *TCPTransfer) SetMaxOpenConn(num int)
func (*TCPTransfer) SetOnClosedHandler ¶
func (t *TCPTransfer) SetOnClosedHandler(fn func(addr string))
SetOnClosedHandler 设置当客户端断开连接时的事件
func (*TCPTransfer) SetOnConnectedHandler ¶
func (t *TCPTransfer) SetOnConnectedHandler(fn func(r *tcp.Remote))
SetOnConnectedHandler 设置当客户端连接成功时的事件
func (*TCPTransfer) SetOnFrameParseErrorHandler ¶
func (t *TCPTransfer) SetOnFrameParseErrorHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
SetOnFrameParseErrorHandler 设置当客户端数据帧解析出错时的事件
func (*TCPTransfer) SetOnReceivedHandler ¶
func (t *TCPTransfer) SetOnReceivedHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
SetOnReceivedHandler 设置当收到客户端数据帧时的事件
func (*TCPTransfer) SetPort ¶
func (t *TCPTransfer) SetPort(port string)
func (*TCPTransfer) Stop ¶
func (t *TCPTransfer) Stop()
type Topic ¶
type Topic struct {
Name []byte `json:"name"` // 唯一标识
HistorySize int `json:"history_size"` // 生产者消息缓冲区大小
Offset uint64 `json:"offset"` // 当前数据偏移量,仅用于模糊显示
// contains filtered or unexported fields
}
func NewTopic ¶
func NewTopic(name []byte, bufferSize, historySize int, onConsumed func(record *HistoryRecord)) *Topic
func (*Topic) IterConsumer ¶
IterConsumer 逐个迭代现有消费者
type Transfer ¶
type Transfer interface {
SetHost(host string)
SetPort(port string) // 设置绑定端口
SetMaxOpenConn(num int) // 设置最大连接数量
SetLogger(logger logger.Iface) // logger
// SetOnConnectedHandler 设置当客户端连接成功时的事件
SetOnConnectedHandler(fn func(r *tcp.Remote))
// SetOnClosedHandler 设置当客户端断开连接时的事件
SetOnClosedHandler(fn func(addr string))
// SetOnReceivedHandler 设置当收到客户端数据帧时的事件
SetOnReceivedHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
// SetOnFrameParseErrorHandler 设置当客户端数据帧解析出错时的事件
SetOnFrameParseErrorHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
Serve() error // 阻塞式启动TCP服务
Stop()
}
Transfer Engine 传输层实现