Documentation
¶
Index ¶
- Variables
- type ChainArgs
- type Config
- type Consumer
- type ConsumerConfig
- type DefaultEventHandler
- func (e DefaultEventHandler) OnCMConsumed(_ *HistoryRecord)
- func (e DefaultEventHandler) OnConsumerClosed(_ string)
- func (e DefaultEventHandler) OnConsumerHeartbeatTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnConsumerRegister(_ string)
- func (e DefaultEventHandler) OnConsumerRegisterTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnFrameParseError(_ *proto.TransferFrame, _ transfer.Conn)
- func (e DefaultEventHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) (bool, error)
- func (e DefaultEventHandler) OnProducerClosed(_ string)
- func (e DefaultEventHandler) OnProducerHeartbeatTimeout(event TimeoutEvent)
- func (e DefaultEventHandler) OnProducerRegister(_ string)
- func (e DefaultEventHandler) OnProducerRegisterTimeout(event TimeoutEvent)
- type Engine
- func (e *Engine) AddTopic(name []byte) *Topic
- func (e *Engine) BindMessageHandler(m proto.Message, handler HookHandler, texts ...string) error
- func (e *Engine) Ctx() context.Context
- func (e *Engine) EventHandler() EventHandler
- func (e *Engine) GetTopic(name []byte) *Topic
- func (e *Engine) GetTopicOffset(name []byte) uint64
- func (e *Engine) HeartbeatInterval() float64
- func (e *Engine) IsTokenCorrect(token string) bool
- func (e *Engine) Logger() logger.Iface
- func (e *Engine) NeedToken() bool
- func (e *Engine) ProducerSendInterval() time.Duration
- func (e *Engine) Publisher(msg *proto.PMessage) uint64
- func (e *Engine) QueryConsumer(addr string) (*Consumer, bool)
- func (e *Engine) QueryProducer(addr string) (*Producer, bool)
- func (e *Engine) RangeConsumer(fn func(c *Consumer) bool)
- func (e *Engine) RangeProducer(fn func(p *Producer) bool)
- func (e *Engine) RangeTopic(fn func(topic *Topic) bool)
- func (e *Engine) RemoveConsumer(addr string)
- func (e *Engine) RemoveProducer(addr string)
- func (e *Engine) ReplaceTransfer(transfer transfer.Transfer) *Engine
- func (e *Engine) Serve() error
- func (e *Engine) SetEventHandler(handler EventHandler) *Engine
- func (e *Engine) SetTopicHistoryBufferSize(size int) *Engine
- func (e *Engine) Stop()
- type EventHandler
- type FlowHandler
- type HistoryRecord
- type Hook
- type HookHandler
- type Monitor
- func (k *Monitor) Do(ctx context.Context) error
- func (k *Monitor) Interval() time.Duration
- func (k *Monitor) OnClientClosed(addr string)
- func (k *Monitor) OnClientConnected(addr string)
- func (k *Monitor) OnClientHeartbeat(addr string)
- func (k *Monitor) OnClientRegistered(addr string, linkType proto.LinkType)
- func (k *Monitor) OnStartup()
- func (k *Monitor) ReadClientTimeInfo(addr string, linkType proto.LinkType) *TimeInfo
- func (k *Monitor) String() string
- type Producer
- type ProducerConfig
- type TimeInfo
- type TimeoutEvent
- type TimeoutEventType
- type Topic
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type ChainArgs ¶ added in v0.3.2
type ChainArgs struct {
// contains filtered or unexported fields
}
type Config ¶
type Config struct {
Host string `json:"host"`
Port string `json:"port"`
MaxOpenConn int `json:"max_open_conn"` // 允许的最大连接数, 即 生产者+消费者最多有 MaxOpenConn 个
BufferSize int `json:"buffer_size"` // 生产者消息历史记录最大数量
HeartbeatTimeout float64 `json:"heartbeat_timeout"`
Logger logger.Iface `json:"-"`
Crypto proto.Crypto `json:"-"` // 加密器
Token string `json:"-"` // 注册认证密钥
EventHandler EventHandler `json:"-"` // 事件触发器
Ctx context.Context `json:"-"`
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct {
Addr string `json:"addr"`
Conf *ConsumerConfig `json:"conf"`
Conn transfer.Conn `json:"-"`
// contains filtered or unexported fields
}
type ConsumerConfig ¶
type DefaultEventHandler ¶ added in v0.3.3
type DefaultEventHandler struct{}
func (DefaultEventHandler) OnCMConsumed ¶ added in v0.3.3
func (e DefaultEventHandler) OnCMConsumed(_ *HistoryRecord)
func (DefaultEventHandler) OnConsumerClosed ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerClosed(_ string)
func (DefaultEventHandler) OnConsumerHeartbeatTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerHeartbeatTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnConsumerRegister ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerRegister(_ string)
func (DefaultEventHandler) OnConsumerRegisterTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnConsumerRegisterTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnFrameParseError ¶ added in v0.3.3
func (e DefaultEventHandler) OnFrameParseError(_ *proto.TransferFrame, _ transfer.Conn)
func (DefaultEventHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (e DefaultEventHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) (bool, error)
func (DefaultEventHandler) OnProducerClosed ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerClosed(_ string)
func (DefaultEventHandler) OnProducerHeartbeatTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerHeartbeatTimeout(event TimeoutEvent)
func (DefaultEventHandler) OnProducerRegister ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerRegister(_ string)
func (DefaultEventHandler) OnProducerRegisterTimeout ¶ added in v0.3.3
func (e DefaultEventHandler) OnProducerRegisterTimeout(event TimeoutEvent)
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) BindMessageHandler ¶
BindMessageHandler 绑定一个自实现的协议处理器,
参数m为实现了 proto.Message 接口的协议, 参数handler则为收到此协议后的同步处理函数, 如果需要在处理完成之后向客户端返回消息,则直接就地修改frame参数, 并返回 true 和 nil, 除此之外,则不会向客户端返回任何消息 HookHandler 的第一个参数为接收到的消息帧,需自行解码, 第二个参数为当前的客户端连接, 此方法需返回(是否返回数据,处理是否正确)两个参数. 参数texts则为协议m的摘要名称
func (*Engine) EventHandler ¶
func (e *Engine) EventHandler() EventHandler
func (*Engine) GetTopicOffset ¶
GetTopicOffset 查询指定topic当前的消息偏移量
func (*Engine) HeartbeatInterval ¶ added in v0.3.3
HeartbeatInterval 心跳周期间隔
func (*Engine) IsTokenCorrect ¶ added in v0.3.2
IsTokenCorrect 判断客户端的token是否正确,若未开启token验证,则始终正确
func (*Engine) ProducerSendInterval ¶
ProducerSendInterval 允许生产者发送数据间隔
func (*Engine) QueryConsumer ¶
QueryConsumer 查询消费者记录, 若未注册则返回nil
func (*Engine) QueryProducer ¶
QueryProducer 查询生产者记录, 若未注册则返回nil
func (*Engine) RangeConsumer ¶
RangeConsumer if false returned, for-loop will stop
func (*Engine) RangeProducer ¶
RangeProducer if false returned, for-loop will stop
func (*Engine) RangeTopic ¶
RangeTopic if false returned, for-loop will stop
func (*Engine) ReplaceTransfer ¶
ReplaceTransfer 替换传输层实现
func (*Engine) SetEventHandler ¶ added in v0.3.2
func (e *Engine) SetEventHandler(handler EventHandler) *Engine
SetEventHandler 设置事件触发器
func (*Engine) SetTopicHistoryBufferSize ¶
SetTopicHistoryBufferSize 设置topic历史数据缓存大小, 对于修改前已经创建的topic不受影响
@param size int 历史数据缓存大小,[1, 10000)
type EventHandler ¶
type EventHandler interface {
// OnFrameParseError 当来自客户端消息帧解析出错时触发的事件(同步调用)
OnFrameParseError(frame *proto.TransferFrame, con transfer.Conn)
// OnConsumerRegister 当消费者注册成功时触发的事件(异步调用)
OnConsumerRegister(addr string)
// OnProducerRegister 当生产者注册成功时触发的事件(异步调用)
OnProducerRegister(addr string)
// OnConsumerClosed 当消费者关闭连接时触发的事件(异步调用)
OnConsumerClosed(addr string)
// OnProducerClosed 当生产者关闭连接时触发的事件(异步调用)
OnProducerClosed(addr string)
// OnConsumerHeartbeatTimeout 当消费者心跳超时触发的事件(异步调用)
OnConsumerHeartbeatTimeout(event TimeoutEvent)
// OnProducerHeartbeatTimeout 当生产者心跳超时时触发的事件(异步调用)
OnProducerHeartbeatTimeout(event TimeoutEvent)
// OnConsumerRegisterTimeout 当消费者连接成功后不注册引发的超时事件(异步调用)
OnConsumerRegisterTimeout(event TimeoutEvent)
// OnProducerRegisterTimeout 当消生产者连接成功后不注册引发的超时事件(异步调用)
OnProducerRegisterTimeout(event TimeoutEvent)
// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件(同步调用)
OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn) (bool, error)
// OnCMConsumed 当一个消费者被消费成功(成功发送给全部消费者)后时触发的事件(同步调用)
OnCMConsumed(record *HistoryRecord)
}
EventHandler 事件触发器
type FlowHandler ¶ added in v0.3.2
type HistoryRecord ¶
type Hook ¶
type Hook struct {
Type proto.MessageType
Handler HookHandler
IsAsync bool
}
type HookHandler ¶
HookHandler 消息处理方法
type Monitor ¶ added in v0.3.3
Monitor 监视器 1. 检测连接成功但不注册的客户端 2. 检测心跳超时的客户端 超时时主动断开连接
func (*Monitor) OnClientClosed ¶ added in v0.3.3
OnClientClosed 连接关闭,清空时间信息
func (*Monitor) OnClientConnected ¶ added in v0.3.3
func (*Monitor) OnClientHeartbeat ¶ added in v0.3.3
func (*Monitor) OnClientRegistered ¶ added in v0.3.3
func (*Monitor) ReadClientTimeInfo ¶ added in v0.3.3
type Producer ¶
type Producer struct {
Addr string `json:"addr"`
Conf *ProducerConfig `json:"conf"`
Conn transfer.Conn `json:"-"`
// contains filtered or unexported fields
}
func (*Producer) NeedConfirm ¶
type ProducerConfig ¶
type TimeInfo ¶ added in v0.3.3
type TimeInfo struct {
Addr string `json:"addr"`
LinkType proto.LinkType `json:"link_type"`
ConnectedAt int64 `json:"connected_at"` // 连接成功时间戳
RegisteredAt int64 `json:"registered_at"` // 注册成功时间戳
HeartbeatAt int64 `json:"heartbeat_at"` // 最近的一个心跳时间戳
}
TimeInfo 关于监视器有关的时间信息
func (*TimeInfo) IsRegistered ¶ added in v0.3.3
type TimeoutEvent ¶ added in v0.3.3
type TimeoutEvent struct {
Addr string `json:"addr,omitempty"`
EventType TimeoutEventType `json:"event_type,omitempty"`
LinkType proto.LinkType `json:"link_type,omitempty"`
TimeoutInterval float64 `json:"timeout_interval,omitempty"`
ConnectedAt int64 `json:"connected_at"`
TimeoutAt int64 `json:"timeout_at"`
}
TimeoutEvent 超时事件
type TimeoutEventType ¶ added in v0.3.3
type TimeoutEventType string
const ( HeartbeatTimeoutEvent TimeoutEventType = "HEARTBEAT_TIMEOUT" RegisterTimeoutEvent TimeoutEventType = "REGISTER_TIMEOUT" )
type Topic ¶
type Topic struct {
Name []byte `json:"name"` // 唯一标识
HistorySize int `json:"history_size"` // 生产者消息缓冲区大小
Offset uint64 `json:"offset"` // 当前数据偏移量,仅用于模糊显示
// contains filtered or unexported fields
}
func NewTopic ¶
func NewTopic(name []byte, bufferSize, historySize int, onConsumed func(record *HistoryRecord)) *Topic
func (*Topic) IterConsumer ¶
IterConsumer 逐个迭代现有消费者