sdk

package
v0.3.1-pre7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Confirmation-mode constants re-exported from the proto package so callers
// can reference them through this package.
// NOTE(review): presumably these are the values accepted by Config.Ack
// (proto.AckType) — confirm against the proto package.
const (
	AllConfirm    = proto.AllConfirm
	NoConfirm     = proto.NoConfirm
	LeaderConfirm = proto.LeaderConfirm
)
View Source
const (
	// DefaultProducerSendInterval is a duration of 500 milliseconds.
	// NOTE(review): presumably the default interval between a producer's
	// batched sends — confirm against the Producer implementation.
	DefaultProducerSendInterval = 500 * time.Millisecond
)

Variables

View Source
var ErrConsumerHandlerIsNil = errors.New("consumer handler is nil")
View Source
var ErrConsumerUnconnected = errors.New("consumer unconnected")
View Source
var ErrConsumerUnregistered = errors.New("consumer unregistered")
View Source
var ErrProducerUnconnected = errors.New("producer unconnected")
View Source
var ErrProducerUnregistered = errors.New("producer unregistered")
View Source
var ErrTopicEmpty = errors.New("topic is empty")
View Source
var NewQueue = proto.NewQueue

Functions

This section is empty.

Types

type CHandler added in v0.3.3

type CHandler struct{}

func (*CHandler) OnClosed added in v0.3.3

func (c *CHandler) OnClosed()

func (*CHandler) OnConnected added in v0.3.3

func (c *CHandler) OnConnected()

func (*CHandler) OnNotImplementMessageType added in v0.3.3

func (c *CHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)

func (*CHandler) OnRegisterFailed added in v0.3.3

func (c *CHandler) OnRegisterFailed()

type Config

// Config holds the configuration parameters for producers and consumers.
// Ctx, Logger and Token are tagged `json:"-"` and are therefore excluded
// from JSON encoding.
type Config struct {
	Host   string          `json:"host"`
	Port   string          `json:"port"`
	Ack    proto.AckType   `json:"ack"`
	Link   string          `json:"link" description:"tcp/udp"`
	Ctx    context.Context `json:"-"` // parent context applied to the producer; defaults to context.Background()
	Logger logger.Iface    `json:"-"`
	Token  string          `json:"-"`
}

Config 生产者和消费者配置参数

func (*Config) Clean

func (c *Config) Clean() *Config

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 消费者

func NewAsyncConsumer

func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewAsyncConsumer 创建异步消费者

func NewConsumer

func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewConsumer 创建一个消费者,需要手动Start

func (*Consumer) Handler

func (c *Consumer) Handler(r *tcp.Remote) error

func (*Consumer) HandlerFunc

func (c *Consumer) HandlerFunc() ConsumerHandler

HandlerFunc 获取注册的消息处理方法

func (*Consumer) IsConnected

func (c *Consumer) IsConnected() bool

IsConnected 与服务器是否连接成功

func (*Consumer) IsRegistered

func (c *Consumer) IsRegistered() bool

IsRegistered 消费者是否注册成功

func (*Consumer) JSONMarshal

func (c *Consumer) JSONMarshal(v any) ([]byte, error)

func (*Consumer) JSONUnmarshal

func (c *Consumer) JSONUnmarshal(data []byte, v any) error

JSONUnmarshal 反序列化方法

func (*Consumer) Logger

func (c *Consumer) Logger() logger.Iface

func (*Consumer) OnAccepted

func (c *Consumer) OnAccepted(r *tcp.Remote) error

OnAccepted 当TCP连接成功时会自行发送注册消息

func (*Consumer) OnClosed

func (c *Consumer) OnClosed(r *tcp.Remote) error

func (*Consumer) ReRegister

func (c *Consumer) ReRegister(r transfer.Conn) error

func (*Consumer) Start

func (c *Consumer) Start() error

Start 异步启动

func (*Consumer) StatusOK

func (c *Consumer) StatusOK() bool

StatusOK 连接状态是否正常

func (*Consumer) Stop added in v0.3.1

func (c *Consumer) Stop()

type ConsumerHandler

// ConsumerHandler is the handler type accepted by NewConsumer and
// NewAsyncConsumer. The per-method notes below state whether each callback
// is run asynchronously or synchronously.
type ConsumerHandler interface {
	Topics() []string
	Handler(record *proto.ConsumerMessage) // (runs asynchronously)
	OnConnected()                          // (runs synchronously) signal emitted when the connection succeeds; subsequent processing only continues after this callback returns, so the implementation must control how long it takes
	OnClosed()                             // (runs synchronously) signal emitted when the connection is interrupted; reconnection (if any) only happens after this callback returns
	OnRegisterFailed()                     // (runs synchronously) event fired when registration fails
	// OnNotImplementMessageType is the event fired when a message frame of an unimplemented type is received.
	OnNotImplementMessageType(frame *proto.TransferFrame, c transfer.Conn)
}

type ConsumerMessage

type ConsumerMessage = proto.ConsumerMessage
// Link is the connection abstraction: connect/close, handler registration
// for TCP and UDP, and a buffered write path (Write then Drain).
type Link interface {
	Connect() error
	Close() error
	SetTCPHandler(handler tcp.HandlerFunc)
	SetUDPHandler(handler func())
	Write(p []byte) (int, error) // appends the contents of the slice (named "buf" in the original comment, i.e. p) to the send buffer and returns the length of the data written
	Drain() error                // sends the buffered data to the remote end (the original comment says "client" — NOTE(review): confirm the intended peer)
}

type PHandler added in v0.3.3

type PHandler struct {
	// contains filtered or unexported fields
}

PHandler 默认实现

func (PHandler) OnClose

func (h PHandler) OnClose()

func (PHandler) OnNotImplementMessageType added in v0.3.3

func (h PHandler) OnNotImplementMessageType(frame *proto.TransferFrame, r transfer.Conn)

func (PHandler) OnRegisterExpire added in v0.3.3

func (h PHandler) OnRegisterExpire()

func (PHandler) OnRegisterFailed added in v0.3.3

func (h PHandler) OnRegisterFailed(status proto.MessageResponseStatus)

func (PHandler) OnRegistered added in v0.3.3

func (h PHandler) OnRegistered()

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 生产者。通过 Send 发送的消息并不会立即投递给服务端,而是按照服务器下发的配置定时批量发送,发送间隔通常为 500ms

func NewAsyncProducer

func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)

NewAsyncProducer 创建异步生产者,无需再手动启动

func NewProducer

func NewProducer(conf Config, handlers ...ProducerHandler) *Producer

NewProducer 创建异步生产者,需手动启动

func (*Producer) Beautify

func (p *Producer) Beautify(data []byte) string

Beautify 格式化显示字节流

func (*Producer) CanPublisher added in v0.3.1

func (p *Producer) CanPublisher() bool

CanPublisher 是否可以向服务器发送消息

func (*Producer) Done

func (p *Producer) Done() <-chan struct{}

func (*Producer) Handler

func (p *Producer) Handler(r *tcp.Remote) error

func (*Producer) IsConnected

func (p *Producer) IsConnected() bool

IsConnected 与服务端是否连接成功

func (*Producer) IsRegistered

func (p *Producer) IsRegistered() bool

IsRegistered 向服务端注册生产者是否成功

func (*Producer) JSONMarshal

func (p *Producer) JSONMarshal(v any) ([]byte, error)

JSONMarshal 序列化方法

func (*Producer) JSONUnmarshal

func (p *Producer) JSONUnmarshal(data []byte, v any) error

func (*Producer) Logger

func (p *Producer) Logger() logger.Iface

func (*Producer) NewRecord

func (p *Producer) NewRecord() *proto.ProducerMessage

NewRecord 从池中初始化一个新的消息记录

func (*Producer) OnAccepted

func (p *Producer) OnAccepted(r *tcp.Remote) error

func (*Producer) OnClosed

func (p *Producer) OnClosed(_ *tcp.Remote) error

func (*Producer) Publisher

func (p *Producer) Publisher(msg *proto.ProducerMessage) error

Publisher 发送消息

func (*Producer) PutRecord

func (p *Producer) PutRecord(msg *proto.ProducerMessage)

PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用

func (*Producer) ReRegister

func (p *Producer) ReRegister(r transfer.Conn) error

ReRegister 服务器令客户端重新发起注册流程

func (*Producer) Send

func (p *Producer) Send(fn func(record *proto.ProducerMessage) error) error

Send 发送一条消息

func (*Producer) Start

func (p *Producer) Start() error

func (*Producer) Stop

func (p *Producer) Stop()

type ProducerHandler

// ProducerHandler is the handler type accepted by NewProducer and
// NewAsyncProducer. Per the notes below, the first four callbacks are
// invoked as blocking calls.
type ProducerHandler interface {
	OnClose()                                            // blocking call
	OnRegistered()                                       // blocking call
	OnRegisterFailed(status proto.MessageResponseStatus) // blocking call; event fired when registration fails
	OnRegisterExpire()                                   // blocking call
	// OnNotImplementMessageType is the event fired when a message frame of an unimplemented type is received.
	OnNotImplementMessageType(frame *proto.TransferFrame, r transfer.Conn)
}

type ProducerMessage

type ProducerMessage = proto.ProducerMessage

type Queue

type Queue = proto.Queue
// TCPLink carries the host, port and link type of a connection.
// Its documented method set (Connect, Close, SetTCPHandler, SetUDPHandler,
// Write, Drain) matches the methods declared by the Link interface.
type TCPLink struct {
	Host     string         `json:"host"`
	Port     string         `json:"port"`
	LinkType proto.LinkType `json:"link_type"`
	// contains filtered or unexported fields
}

func (*TCPLink) Close added in v0.3.3

func (l *TCPLink) Close() error

func (*TCPLink) Connect added in v0.3.3

func (l *TCPLink) Connect() error

Connect 阻塞式连接

func (*TCPLink) Drain added in v0.3.3

func (l *TCPLink) Drain() error

func (*TCPLink) SetTCPHandler added in v0.3.3

func (l *TCPLink) SetTCPHandler(handler tcp.HandlerFunc)

func (*TCPLink) SetUDPHandler added in v0.3.3

func (l *TCPLink) SetUDPHandler(_ func())

func (*TCPLink) Write added in v0.3.3

func (l *TCPLink) Write(p []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL