engine

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrConsumerNotRegister = errors.New("consumer not register")
	ErrProducerNotRegister = errors.New("producer not register")
	ErrPMNotFound          = errors.New("ProducerMessage not found in frame")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Host         string       `json:"host"`
	Port         string       `json:"port"`
	MaxOpenConn  int          `json:"max_open_conn"` // 允许的最大连接数, 即 生产者+消费者最多有 MaxOpenConn 个
	BufferSize   int          `json:"buffer_size"`   // 生产者消息历史记录最大数量
	Logger       logger.Iface `json:"-"`
	Crypto       proto.Crypto `json:"-"` // 加密器
	Token        string       `json:"-"` // 注册认证密钥
	EventHandler EventHandler // 事件触发器
	// contains filtered or unexported fields
}

func (*Config) Clean

func (c *Config) Clean() *Config

type Consumer

type Consumer struct {
	Addr string          `json:"addr"`
	Conf *ConsumerConfig `json:"conf"`
	Conn *tcp.Remote     `json:"-"`
	// contains filtered or unexported fields
}

func (*Consumer) NeedConfirm

func (c *Consumer) NeedConfirm() bool

NeedConfirm 是否需要返回确认消息给客户端

func (*Consumer) Send

func (c *Consumer) Send(msg proto.Message) error

Send 向消费者客户端推送消息, 此操作是线程安全的

type ConsumerConfig

type ConsumerConfig struct {
	Topics []string      `json:"topics"`
	Ack    proto.AckType `json:"ack"`
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func New

func New(cs ...Config) *Engine

New 创建一个新的服务器

func (*Engine) AddTopic

func (e *Engine) AddTopic(name []byte) *Topic

AddTopic 添加一个新的topic,如果topic以存在则跳过

func (*Engine) BindMessageHandler

func (e *Engine) BindMessageHandler(m proto.Message, handler HookHandler, texts ...string) error

BindMessageHandler 绑定一个自实现的协议处理器,

参数m为实现了 proto.Message 接口的协议,

参数handler则为收到此协议后的同步处理函数, 如果需要在处理完成之后向客户端返回消息,则直接就地修改frame参数,
	并返回 true 和 nil, 除此之外,则不会向客户端返回任何消息
	HookHandler 的第一个参数为接收到的消息帧,需自行解码, 第二个参数为当前的客户端连接,
	此方法需返回(是否返回数据,处理是否正确)两个参数.

参数texts则为协议m的摘要名称

func (*Engine) EventHandler

func (e *Engine) EventHandler() EventHandler

func (*Engine) GetTopic

func (e *Engine) GetTopic(name []byte) *Topic

GetTopic 获取topic,并在不存在时自动新建一个topic

func (*Engine) GetTopicOffset

func (e *Engine) GetTopicOffset(name []byte) uint64

GetTopicOffset 查询指定topic当前的消息偏移量

func (*Engine) Logger

func (e *Engine) Logger() logger.Iface

func (*Engine) NeedToken added in v0.3.1

func (e *Engine) NeedToken() bool

NeedToken 是否需要密钥认证

func (*Engine) ProducerSendInterval

func (e *Engine) ProducerSendInterval() time.Duration

ProducerSendInterval 允许生产者发送数据间隔

func (*Engine) Publisher

func (e *Engine) Publisher(msg *proto.PMessage) uint64

Publisher 发布消息,并返回此消息在当前topic中的偏移量

func (*Engine) QueryConsumer

func (e *Engine) QueryConsumer(addr string) (*Consumer, bool)

QueryConsumer 查询消费者记录, 若未注册则返回nil

func (*Engine) QueryProducer

func (e *Engine) QueryProducer(addr string) (*Producer, bool)

QueryProducer 查询生产者记录, 若未注册则返回nil

func (*Engine) RangeConsumer

func (e *Engine) RangeConsumer(fn func(c *Consumer) bool)

RangeConsumer if false returned, for-loop will stop

func (*Engine) RangeProducer

func (e *Engine) RangeProducer(fn func(p *Producer) bool)

RangeProducer if false returned, for-loop will stop

func (*Engine) RangeTopic

func (e *Engine) RangeTopic(fn func(topic *Topic) bool)

RangeTopic if false returned, for-loop will stop

func (*Engine) RemoveConsumer

func (e *Engine) RemoveConsumer(addr string)

RemoveConsumer 删除一个消费者

func (*Engine) RemoveProducer

func (e *Engine) RemoveProducer(addr string)

RemoveProducer 删除一个生产者

func (*Engine) ReplaceTransfer

func (e *Engine) ReplaceTransfer(transfer Transfer) *Engine

ReplaceTransfer 替换传输层实现

func (*Engine) Serve added in v0.3.1

func (e *Engine) Serve() error

Serve 阻塞运行

func (*Engine) SetTopicHistoryBufferSize

func (e *Engine) SetTopicHistoryBufferSize(size int) *Engine

SetTopicHistoryBufferSize 设置topic历史数据缓存大小, 对于修改前已经创建的topic不受影响

@param size	int 历史数据缓存大小,[1, 10000)

func (*Engine) Stop added in v0.3.1

func (e *Engine) Stop()

type EventHandler

type EventHandler interface {
	// OnFrameParseError 当来自客户端消息帧解析出错时触发的事件
	OnFrameParseError(frame *proto.TransferFrame, r *tcp.Remote)
	// OnConsumerRegister 当消费者注册成功时触发的事件
	OnConsumerRegister(addr string)
	// OnProducerRegister 当生产者注册成功时触发的事件
	OnProducerRegister(addr string)
	// OnConsumerClosed 当消费者关闭连接时触发的事件
	OnConsumerClosed(addr string)
	// OnProducerClosed 当生产者关闭连接时触发的事件
	OnProducerClosed(addr string)
	// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
	OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote) (bool, error)
	// OnCMConsumed 当一个消费者被消费成功(成功发送给全部消费者)后时触发的事件
	OnCMConsumed(record *HistoryRecord)
}

EventHandler 事件触发器

type HistoryRecord

type HistoryRecord struct {
	Topic       []byte            // 历史记录所属的topic
	Offset      []byte            // 历史记录所属的偏移量
	Time        int64             // 历史记录创建时间戳,而非CM被创建的事件戳
	MessageType proto.MessageType // CM协议类型,以此来反序列化
	Stream      []byte            // CM序列化字节流
	// contains filtered or unexported fields
}

type Hook

type Hook struct {
	Type    proto.MessageType
	Handler HookHandler
	IsAsync bool
}

type HookHandler

type HookHandler func(frame *proto.TransferFrame, r *tcp.Remote) (bool, error)

type Producer

type Producer struct {
	Addr string          `json:"addr"`
	Conf *ProducerConfig `json:"conf"`
	Conn *tcp.Remote     `json:"-"`
	// contains filtered or unexported fields
}

func (*Producer) NeedConfirm

func (p *Producer) NeedConfirm() bool

type ProducerConfig

type ProducerConfig struct {
	Ack proto.AckType `json:"ack"`
	// 定时器间隔,单位ms,仅生产者有效,生产者需要按照此间隔发送帧消息
	TickerInterval time.Duration `json:"ticker_duration"`
}

type TCPTransfer

type TCPTransfer struct {
	// contains filtered or unexported fields
}

TCPTransfer TCP传输层实现

func (*TCPTransfer) Handler

func (t *TCPTransfer) Handler(r *tcp.Remote) error

func (*TCPTransfer) OnAccepted

func (t *TCPTransfer) OnAccepted(r *tcp.Remote) error

func (*TCPTransfer) OnClosed

func (t *TCPTransfer) OnClosed(r *tcp.Remote) error

OnClosed 连接关闭, 删除此连接的消费者记录或生产者记录

func (*TCPTransfer) Serve

func (t *TCPTransfer) Serve() error

Serve 阻塞式启动TCP服务

func (*TCPTransfer) SetHost

func (t *TCPTransfer) SetHost(host string)

func (*TCPTransfer) SetLogger

func (t *TCPTransfer) SetLogger(logger logger.Iface)

func (*TCPTransfer) SetMaxOpenConn

func (t *TCPTransfer) SetMaxOpenConn(num int)

func (*TCPTransfer) SetOnClosedHandler

func (t *TCPTransfer) SetOnClosedHandler(fn func(addr string))

SetOnClosedHandler 设置当客户端断开连接时的事件

func (*TCPTransfer) SetOnConnectedHandler

func (t *TCPTransfer) SetOnConnectedHandler(fn func(r *tcp.Remote))

SetOnConnectedHandler 设置当客户端连接成功时的事件

func (*TCPTransfer) SetOnFrameParseErrorHandler

func (t *TCPTransfer) SetOnFrameParseErrorHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))

SetOnFrameParseErrorHandler 设置当客户端数据帧解析出错时的事件

func (*TCPTransfer) SetOnReceivedHandler

func (t *TCPTransfer) SetOnReceivedHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))

SetOnReceivedHandler 设置当收到客户端数据帧时的事件

func (*TCPTransfer) SetPort

func (t *TCPTransfer) SetPort(port string)

func (*TCPTransfer) Stop

func (t *TCPTransfer) Stop()

type Topic

type Topic struct {
	Name        []byte `json:"name"`         // 唯一标识
	HistorySize int    `json:"history_size"` // 生产者消息缓冲区大小
	Offset      uint64 `json:"offset"`       // 当前数据偏移量,仅用于模糊显示
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name []byte, bufferSize, historySize int, onConsumed func(record *HistoryRecord)) *Topic

func (*Topic) AddConsumer

func (t *Topic) AddConsumer(con *Consumer)

AddConsumer 添加一个消费者

func (*Topic) IterConsumer

func (t *Topic) IterConsumer(fn func(c *Consumer))

IterConsumer 逐个迭代现有消费者

func (*Topic) Publisher

func (t *Topic) Publisher(pm *proto.PMessage) uint64

Publisher 发布消费者消息,此处会将来自生产者的消息转换成消费者消息

func (*Topic) RemoveConsumer

func (t *Topic) RemoveConsumer(addr string)

RemoveConsumer 移除一个消费者

type Transfer

type Transfer interface {
	SetHost(host string)
	SetPort(port string)           // 设置绑定端口
	SetMaxOpenConn(num int)        // 设置最大连接数量
	SetLogger(logger logger.Iface) // logger
	// SetOnConnectedHandler 设置当客户端连接成功时的事件
	SetOnConnectedHandler(fn func(r *tcp.Remote))
	// SetOnClosedHandler 设置当客户端断开连接时的事件
	SetOnClosedHandler(fn func(addr string))
	// SetOnReceivedHandler 设置当收到客户端数据帧时的事件
	SetOnReceivedHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
	// SetOnFrameParseErrorHandler 设置当客户端数据帧解析出错时的事件
	SetOnFrameParseErrorHandler(fn func(frame *proto.TransferFrame, r *tcp.Remote))
	Serve() error // 阻塞式启动TCP服务
	Stop()
}

Transfer Engine 传输层实现

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL