sdk

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Confirm constants re-exported from the proto package so that callers can
// refer to them through this package.
// NOTE(review): presumably these are the proto.AckType values accepted by
// Config.Ack — confirm against the proto package.
const (
	AllConfirm    = proto.AllConfirm
	NoConfirm     = proto.NoConfirm
	LeaderConfirm = proto.LeaderConfirm
)
View Source
const (
	// DefaultProducerSendInterval is a duration of 500 milliseconds.
	// NOTE(review): presumably the default interval between the producer's
	// batched sends — confirm where it is applied.
	DefaultProducerSendInterval = 500 * time.Millisecond
)

Variables

View Source
// ErrConsumerHandlerIsNil is the sentinel error with the message
// "consumer handler is nil".
// NOTE(review): presumably returned when a consumer is created without a
// handler — confirm against the constructors.
var ErrConsumerHandlerIsNil = errors.New("consumer handler is nil")
View Source
// ErrTopicEmpty is the sentinel error with the message "topic is empty".
// NOTE(review): presumably returned when a message or handler supplies no
// topic — confirm against the call sites.
var ErrTopicEmpty = errors.New("topic is empty")
View Source
// NewQueue re-exports proto.NewQueue so it can be called through this package.
var NewQueue = proto.NewQueue

Functions

This section is empty.

Types

type Config

// Config holds the configuration parameters for producers and consumers.
//
// Ctx and Logger are tagged `json:"-"`, so they are left out of JSON
// encoding and decoding; Host, Port and Ack are serialized under the keys
// "host", "port" and "ack".
type Config struct {
	Host   string          `json:"host"`
	Port   string          `json:"port"`
	Ack    proto.AckType   `json:"ack"`
	Ctx    context.Context `json:"-"` // parent context applied to the producer; defaults to context.Background()
	Logger logger.Iface    `json:"-"`
}

Config 生产者和消费者配置参数

func (*Config) Clean

func (c *Config) Clean() *Config

type Consumer

// Consumer is the message consumer client.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 消费者

func NewAsyncConsumer

func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewAsyncConsumer 创建异步消费者

func NewConsumer

func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewConsumer 创建一个消费者,需要手动Start

func (*Consumer) Handler

func (c *Consumer) Handler(r *tcp.Remote) error

func (*Consumer) HandlerFunc

func (c *Consumer) HandlerFunc() ConsumerHandler

HandlerFunc 获取注册的消息处理方法

func (*Consumer) IsConnected

func (c *Consumer) IsConnected() bool

IsConnected 与服务器是否连接成功

func (*Consumer) IsRegistered

func (c *Consumer) IsRegistered() bool

IsRegistered 消费者是否注册成功

func (*Consumer) JSONMarshal

func (c *Consumer) JSONMarshal(v any) ([]byte, error)

func (*Consumer) JSONUnmarshal

func (c *Consumer) JSONUnmarshal(data []byte, v any) error

JSONUnmarshal 反序列化方法

func (*Consumer) Logger

func (c *Consumer) Logger() logger.Iface

func (*Consumer) OnAccepted

func (c *Consumer) OnAccepted(r *tcp.Remote) error

OnAccepted 当TCP连接成功时会自行发送注册消息

func (*Consumer) OnClosed

func (c *Consumer) OnClosed(r *tcp.Remote) error

func (*Consumer) ReRegister

func (c *Consumer) ReRegister(r *tcp.Remote) error

func (*Consumer) Start

func (c *Consumer) Start() error

Start 异步启动

func (*Consumer) StatusOK

func (c *Consumer) StatusOK() bool

StatusOK 连接状态是否正常

type ConsumerHandler

// ConsumerHandler is the set of callbacks a consumer implementation must
// provide: the topics it handles, a per-message handler, and connection
// lifecycle hooks.
type ConsumerHandler interface {
	Topics() []string
	Handler(record *proto.ConsumerMessage)
	OnConnected() // (runs synchronously) signal emitted when the connection succeeds; later processing only continues after this returns, so the implementer must control how long it blocks
	OnClosed()    // (runs synchronously) signal emitted when the connection is interrupted; reconnection (if any) only happens after this returns
	// OnNotImplementMessageType is the event triggered when an unimplemented message frame is received.
	OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote)
}

type ConsumerHandlerFunc

// ConsumerHandlerFunc is an empty struct that declares the OnClosed,
// OnConnected and OnNotImplementMessageType methods.
// NOTE(review): presumably meant to be embedded so a handler gets default
// implementations of those ConsumerHandler hooks — confirm.
type ConsumerHandlerFunc struct{}

func (*ConsumerHandlerFunc) OnClosed

func (c *ConsumerHandlerFunc) OnClosed()

func (*ConsumerHandlerFunc) OnConnected

func (c *ConsumerHandlerFunc) OnConnected()

func (*ConsumerHandlerFunc) OnNotImplementMessageType

func (c *ConsumerHandlerFunc) OnNotImplementMessageType(frame *proto.TransferFrame, r *tcp.Remote)

type ConsumerMessage

// ConsumerMessage is an alias for proto.ConsumerMessage.
type ConsumerMessage = proto.ConsumerMessage
// Link carries the host, port and link kind of a connection; the three
// exported fields are serialized to JSON as "host", "port" and "kind".
// NOTE(review): presumably Kind is what IsConsumer and IsProducer inspect —
// confirm against their implementations.
type Link struct {
	Host string         `json:"host"`
	Port string         `json:"port"`
	Kind proto.LinkType `json:"kind"`
	// contains filtered or unexported fields
}

func (*Link) Connect

func (l *Link) Connect() error

Connect 阻塞式连接

func (*Link) IsConsumer

func (l *Link) IsConsumer() bool

func (*Link) IsProducer

func (l *Link) IsProducer() bool

type Producer

// Producer is the message producer client. Messages passed to Send are not
// delivered to the server immediately; they are sent in batches on a timer
// according to configuration issued by the server, typically every 500ms.
type Producer struct {
	// contains filtered or unexported fields
}

Producer 生产者。通过 Send 发送的消息不会立即投递给服务端,而是按照服务器下发的配置定时批量发送,间隔通常为 500ms。

func NewAsyncProducer

func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)

NewAsyncProducer 创建异步生产者,无需再手动启动

func NewProducer

func NewProducer(conf Config, handlers ...ProducerHandler) *Producer

NewProducer 创建异步生产者,需手动启动

func (*Producer) Beautify

func (p *Producer) Beautify(data []byte) string

Beautify 格式化显示字节流

func (*Producer) Done

func (p *Producer) Done() <-chan struct{}

func (*Producer) Handler

func (p *Producer) Handler(r *tcp.Remote) error

func (*Producer) IsConnected

func (p *Producer) IsConnected() bool

func (*Producer) IsRegistered

func (p *Producer) IsRegistered() bool

func (*Producer) JSONMarshal

func (p *Producer) JSONMarshal(v any) ([]byte, error)

JSONMarshal 序列化方法

func (*Producer) JSONUnmarshal

func (p *Producer) JSONUnmarshal(data []byte, v any) error

func (*Producer) Logger

func (p *Producer) Logger() logger.Iface

func (*Producer) NewRecord

func (p *Producer) NewRecord() *proto.ProducerMessage

NewRecord 从池中初始化一个新的消息记录

func (*Producer) OnAccepted

func (p *Producer) OnAccepted(r *tcp.Remote) error

func (*Producer) OnClosed

func (p *Producer) OnClosed(_ *tcp.Remote) error

func (*Producer) Publisher

func (p *Producer) Publisher(msg *proto.ProducerMessage) error

Publisher 发送消息

func (*Producer) PutRecord

func (p *Producer) PutRecord(msg *proto.ProducerMessage)

PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用

func (*Producer) ReRegister

func (p *Producer) ReRegister(r *tcp.Remote) error

ReRegister 服务器令客户端重新发起注册流程

func (*Producer) Send

func (p *Producer) Send(fn func(record *proto.ProducerMessage) error) error

Send 发送一条消息

func (*Producer) Start

func (p *Producer) Start() error

func (*Producer) Stop

func (p *Producer) Stop()

type ProducerHandler

// ProducerHandler is the set of lifecycle callbacks a producer can be given.
// Each callback is marked as a blocking call.
type ProducerHandler interface {
	OnRegistered()     // blocking call
	OnClose()          // blocking call
	OnRegisterExpire() // blocking call
}

type ProducerMessage

// ProducerMessage is an alias for proto.ProducerMessage.
type ProducerMessage = proto.ProducerMessage

type Queue

// Queue is an alias for proto.Queue.
type Queue = proto.Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL