mq

package
v0.3.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const AnotherAuthKey = "Authorization"
View Source
const AuthHeaderKey = "User-Auth"
View Source
const CacheAuthPrefix = "Token:"

Variables

View Source
var FastApiExcludePaths = []string{
	"/docs",
	"/redoc",
	"/favicon.ico",
	"/openapi.json",
	"/swagger-ui.css",
	"/swagger-ui-bundle.js",
}

Functions

func AuthInterceptor added in v0.3.11

func AuthInterceptor(c *fiber.Ctx) error

AuthInterceptor 登陆拦截器

func CreateEdge added in v0.3.11

func CreateEdge(conf *Config) *fastapi.Wrapper

func ErrorFormatter added in v0.3.11

func ErrorFormatter(c *fastapi.Context, err error) (int, any)

func ErrorLog added in v0.3.11

func ErrorLog(c *fastapi.Context)

func NewAuthInterceptor added in v0.3.11

func NewAuthInterceptor(excludePaths []string, itp func(c *fiber.Ctx) error) func(c *fiber.Ctx) error

NewAuthInterceptor 请求认证拦截器,验证请求是否需要认证,如果需要认证,则执行拦截器,否则继续执行 @param excludePaths []string 排除的路径,如果请求路径匹配这些路径,则不执行拦截器 @param itp func(c *fiber.Ctx) error 拦截器函数 @return fiber.Handler

Types

type Config

type Config struct {
	AppName           string         `json:"app_name"`
	Version           string         `json:"version"`
	EdgeHttpHost      string         `json:"edge_http_host"`     // http api 接口服务
	EdgeHttpPort      string         `json:"edge_http_port"`     //
	EdgeEnabled       bool           `json:"edge_enabled"`       // 是否开启基于Http的消息publisher功能
	Debug             bool           `json:"debug"`              // 调试模式开关
	SwaggerDisabled   bool           `json:"swagger_disabled"`   // 禁用调试文档
	StatisticDisabled bool           `json:"statistic_disabled"` // 禁用统计功能
	Broker            *engine.Config `json:"broker"`             //
	// contains filtered or unexported fields
}

func DefaultConf

func DefaultConf() Config

type ConsumerStatistic added in v0.3.7

type ConsumerStatistic struct {
	Addr   string   `json:"addr" description:"连接地址"`
	Topics []string `json:"topics" description:"订阅的主题名列表"`
}

func (*ConsumerStatistic) SchemaDesc added in v0.3.7

func (m *ConsumerStatistic) SchemaDesc() string

type CoreEventHandler added in v0.3.2

type CoreEventHandler struct {
	engine.DefaultEventHandler
}

func (CoreEventHandler) OnConsumerClosed added in v0.3.2

func (e CoreEventHandler) OnConsumerClosed(_ string)

func (CoreEventHandler) OnConsumerRegister added in v0.3.2

func (e CoreEventHandler) OnConsumerRegister(_ string)

func (CoreEventHandler) OnProducerClosed added in v0.3.2

func (e CoreEventHandler) OnProducerClosed(_ string)

func (CoreEventHandler) OnProducerRegister added in v0.3.2

func (e CoreEventHandler) OnProducerRegister(_ string)

type EdgeRouter added in v0.3.7

type EdgeRouter struct {
	fastapi.BaseGroupRouter
}

func (*EdgeRouter) Description added in v0.3.11

func (r *EdgeRouter) Description() map[string]string

func (*EdgeRouter) Path added in v0.3.11

func (r *EdgeRouter) Path() map[string]string

func (*EdgeRouter) PathSchema added in v0.3.11

func (r *EdgeRouter) PathSchema() pathschema.RoutePathSchema

func (*EdgeRouter) PostProduct added in v0.3.11

func (r *EdgeRouter) PostProduct(c *fastapi.Context, form *ProducerForm) (*ProductResponse, error)

PostProduct 发送一个生产者消息

func (*EdgeRouter) PostProductAsync added in v0.3.11

func (r *EdgeRouter) PostProductAsync(c *fastapi.Context, form *ProducerForm) (*ProductResponse, error)

PostProductAsync 异步发送一个生产者消息

func (*EdgeRouter) Prefix added in v0.3.11

func (r *EdgeRouter) Prefix() string

func (*EdgeRouter) Summary added in v0.3.11

func (r *EdgeRouter) Summary() map[string]string

type ErrorResponse added in v0.3.11

type ErrorResponse struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

type ExchangeReq added in v0.3.11

type ExchangeReq struct {
	From string `json:"from" validate:"required,gte=1" description:"源TOPIC"`
	To   string `json:"to" validate:"required,gte=1" description:"目标TOPIC"`
}

func (*ExchangeReq) SchemaDesc added in v0.3.11

func (m *ExchangeReq) SchemaDesc() string

func (*ExchangeReq) String added in v0.3.11

func (m *ExchangeReq) String() string

type ExchangeResp added in v0.3.11

type ExchangeResp struct {
	OperationResult
}

func (*ExchangeResp) SchemaDesc added in v0.3.11

func (m *ExchangeResp) SchemaDesc() string

func (*ExchangeResp) String added in v0.3.11

func (m *ExchangeResp) String() string

type ExchangeRouter added in v0.3.11

type ExchangeRouter struct {
	fastapi.BaseGroupRouter
}

func (*ExchangeRouter) DeleteDelRoute added in v0.3.11

func (r *ExchangeRouter) DeleteDelRoute(c *fastapi.Context, form *ExchangeReq) (*OperationResult, error)

func (*ExchangeRouter) DeleteDelTopic added in v0.3.11

func (r *ExchangeRouter) DeleteDelTopic(c *fastapi.Context, form *TopicDeleteReq) (*OperationResult, error)

DeleteDelTopic 删除主题

func (*ExchangeRouter) Description added in v0.3.11

func (r *ExchangeRouter) Description() map[string]string

func (*ExchangeRouter) GetShowRoute added in v0.3.11

func (r *ExchangeRouter) GetShowRoute(c *fastapi.Context, form *ExchangeShowReq) ([]*ExchangeShowResp, error)

func (*ExchangeRouter) PathSchema added in v0.3.11

func (r *ExchangeRouter) PathSchema() pathschema.RoutePathSchema

func (*ExchangeRouter) PostAddRoute added in v0.3.11

func (r *ExchangeRouter) PostAddRoute(c *fastapi.Context, form *ExchangeReq) (*OperationResult, error)

func (*ExchangeRouter) Prefix added in v0.3.11

func (r *ExchangeRouter) Prefix() string

func (*ExchangeRouter) Summary added in v0.3.11

func (r *ExchangeRouter) Summary() map[string]string

type ExchangeShowReq added in v0.3.11

type ExchangeShowReq struct {
	Via string `query:"via" json:"via" description:"源TOPIC,空则查询所有"`
}

type ExchangeShowResp added in v0.3.11

type ExchangeShowResp struct {
	From string `json:"from" validate:"required,gte=1" description:"源TOPIC"`
	To   string `json:"to" validate:"required,gte=1" description:"目标TOPIC"`
}

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func New

func New(cs ...Config) *MQ

func (*MQ) AddExchange added in v0.3.11

func (m *MQ) AddExchange(from, to string) error

AddExchange 添加一个topic间的数据交换

func (*MQ) Config added in v0.3.3

func (m *MQ) Config() any

func (*MQ) Ctx added in v0.3.1

func (m *MQ) Ctx() context.Context

Ctx 获取根context

func (*MQ) DelExchange added in v0.3.11

func (m *MQ) DelExchange(from, to string) error

DelExchange 删除一个topic间的数据交换

func (*MQ) Logger added in v0.3.1

func (m *MQ) Logger() logger.Iface

func (*MQ) Serve added in v0.3.1

func (m *MQ) Serve()

Serve 阻塞启动

func (*MQ) SetCrypto added in v0.3.4

func (m *MQ) SetCrypto(crypto proto.Crypto) *MQ

SetCrypto 设置加解密器

func (*MQ) SetCryptoPlan added in v0.3.4

func (m *MQ) SetCryptoPlan(option string, key ...string) *MQ

SetCryptoPlan 修改加密方案

func (*MQ) SetLogger added in v0.3.1

func (m *MQ) SetLogger(logger logger.Iface) *MQ

func (*MQ) Stat added in v0.3.7

func (m *MQ) Stat() *engine.Statistic

Stat 获取统计信息类

func (*MQ) Stop

func (m *MQ) Stop()

type OperationResult added in v0.3.11

type OperationResult struct {
	Result string `json:"result" validate:"required,oneof=ok fail" description:"删除结果"`
	Err    string `json:"err,omitempty" description:"错误原因"`
}

func Result added in v0.3.11

func Result(err error) *OperationResult

type ProducerForm added in v0.3.7

type ProducerForm struct {
	Token string `json:"token,omitempty" description:"认证密钥"`
	Topic string `json:"topic" description:"消息主题"`
	Key   string `json:"key" description:"消息键"`
	Value string `json:"value" description:"base64编码后的消息体"`
}

func (*ProducerForm) IsEncrypt added in v0.3.7

func (m *ProducerForm) IsEncrypt() bool

func (*ProducerForm) SchemaDesc added in v0.3.7

func (m *ProducerForm) SchemaDesc() string

func (*ProducerForm) String added in v0.3.7

func (m *ProducerForm) String() string

type ProductResponse added in v0.3.7

type ProductResponse struct {
	Status       string `` /* 126-byte string literal not displayed */
	Offset       uint64 `json:"offset" description:"消息偏移量"`
	ResponseTime int64  `json:"response_time" description:"服务端返回消息时的时间戳"`
	Message      string `json:"message" description:"额外的消息描述"`
}

func (*ProductResponse) SchemaDesc added in v0.3.7

func (m *ProductResponse) SchemaDesc() string

func (*ProductResponse) String added in v0.3.7

func (m *ProductResponse) String() string

type StatRouter added in v0.3.7

type StatRouter struct {
	fastapi.BaseGroupRouter
}

func (*StatRouter) Description added in v0.3.11

func (r *StatRouter) Description() map[string]string

func (*StatRouter) GetConsumers added in v0.3.11

func (r *StatRouter) GetConsumers(c *fastapi.Context) ([]*ConsumerStatistic, error)

GetConsumers 获取Broker内的消费者连接

func (*StatRouter) GetProducers added in v0.3.11

func (r *StatRouter) GetProducers(c *fastapi.Context) ([]string, error)

GetProducers 获取Broker内的生产者连接

func (*StatRouter) GetTopic added in v0.3.11

func (r *StatRouter) GetTopic(c *fastapi.Context) ([]string, error)

GetTopic 获取Broker内的topic名称

func (*StatRouter) GetTopicConsumers added in v0.3.11

func (r *StatRouter) GetTopicConsumers(c *fastapi.Context) ([]*TopicConsumerStatistic, error)

GetTopicConsumers 获取主题内部的消费者连接

func (*StatRouter) GetTopicOffset added in v0.3.11

func (r *StatRouter) GetTopicOffset(c *fastapi.Context) ([]*TopicOffsetStatistic, error)

GetTopicOffset 获取Broker内的topic名称及其最新的消息计数

func (*StatRouter) GetTopicRecord added in v0.3.11

func (r *StatRouter) GetTopicRecord(c *fastapi.Context) ([]*TopicRecordStatistic, error)

GetTopicRecord 获取主题内部的最新消息记录

func (*StatRouter) PathSchema added in v0.3.11

func (r *StatRouter) PathSchema() pathschema.RoutePathSchema

func (*StatRouter) Prefix added in v0.3.11

func (r *StatRouter) Prefix() string

func (*StatRouter) Summary added in v0.3.11

func (r *StatRouter) Summary() map[string]string

type TopicConsumerStatistic added in v0.3.7

type TopicConsumerStatistic struct {
	Topic     string   `json:"topic" description:"名称"`
	Consumers []string `json:"consumers" description:"消费者连接"`
}

type TopicDeleteReq added in v0.3.11

type TopicDeleteReq struct {
	Topic string `json:"topic" validate:"required,gte=1" description:"主题名称"`
	Force bool   `` /* 159-byte string literal not displayed */
}

type TopicOffsetStatistic added in v0.3.7

type TopicOffsetStatistic struct {
	Topic  string `json:"topic" description:"名称"`
	Offset uint64 `json:"offset" description:"最新的消息偏移量"`
}

func (*TopicOffsetStatistic) SchemaDesc added in v0.3.7

func (m *TopicOffsetStatistic) SchemaDesc() string

type TopicRecordStatistic added in v0.3.7

type TopicRecordStatistic struct {
	Topic       string `json:"topic" description:"名称"`
	Key         string `json:"key"`
	Value       string `json:"value" description:"base64编码后的消息体明文"`
	Offset      uint64 `json:"offset" description:"消息偏移量"`
	ProductTime int64  `json:"product_time" description:"消息接收时间戳"`
}

type User added in v0.3.11

type User struct {
	Email    string `json:"email,omitempty" validate:"required" description:"邮箱地址"`
	Password string `json:"password,omitempty" validate:"required" description:"密码"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL