mqtt

package
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2025 License: PostgreSQL Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnsupported = NewRpcError("unsupported operation")

ErrUnsupported is returned when an endpoint receives a message on an unrecognized topic.

Functions

func NewRpcError

func NewRpcError(msg string) error

func WithPublishFunc

func WithPublishFunc(ctx context.Context, fn PublishFunc) context.Context

WithPublishFunc adds a PublishFunc to the context.

Types

type BrokerConfig

type BrokerConfig struct {
	URL      string
	ClientID string
	Username string
	Password string
}

type Call

type Call struct {
	Topic     string
	Request   any
	Response  any
	QoS       int
	MaxWait   time.Duration
	Retain    bool
	Unmarshal func(payload []byte, resp any) error
}

func (*Call) Payload

func (c *Call) Payload() ([]byte, error)

type Config

type Config struct {
	Logger          *zerolog.Logger
	Broker          BrokerConfig
	MessageHandlers map[string]MessageHandler
	RequestHandlers map[string]RequestHandler
	HandlerTimeout  time.Duration
	Subscriptions   []string
	AutoSubscribe   bool
}

type Endpoint

type Endpoint interface {
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Call(ctx context.Context, c *Call) error
	Publish(ctx context.Context, msg *Message) error
	Subscribe(ctx context.Context, topic string) error
	Unsubscribe(ctx context.Context, topic string) error
	RegisterMessageHandler(topic string, h MessageHandler)
	RegisterRequestHandler(topic string, h RequestHandler)
	UnregisterHandler(topic string)
}

type HTTPDoer

type HTTPDoer struct {
	// contains filtered or unexported fields
}

HTTPDoer is an HTTP client that communicates over MQTT. It works by reading and writing raw HTTP requests/responses in the MQTT message payloads.

func NewHTTPDoer

func NewHTTPDoer(config HTTPDoerConfig) *HTTPDoer

func (*HTTPDoer) Do

func (d *HTTPDoer) Do(req *http.Request) (*http.Response, error)

type HTTPDoerConfig

type HTTPDoerConfig struct {
	Topic    string
	Endpoint Endpoint
	MaxWait  time.Duration
	QoS      int
	Retain   bool
}

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer is an HTTP server that communicates over MQTT. It works by reading and writing raw HTTP requests/responses in the MQTT message payloads.

func NewHTTPServer

func NewHTTPServer(config HTTPServerConfig) *HTTPServer

func (*HTTPServer) Start

func (s *HTTPServer) Start(ctx context.Context) error

func (*HTTPServer) Stop

func (s *HTTPServer) Stop(ctx context.Context) error

type HTTPServerConfig

type HTTPServerConfig struct {
	Topic          string
	Broker         BrokerConfig
	HandlerTimeout time.Duration
	Handler        http.Handler
	Logger         *zerolog.Logger
}

type MQTTEndpoint

type MQTTEndpoint struct {
	// contains filtered or unexported fields
}

func New

func New(config Config) *MQTTEndpoint

func (*MQTTEndpoint) Call

func (e *MQTTEndpoint) Call(ctx context.Context, c *Call) error

Call sends a command and blocks until a response is received. Requests and responses are automatically marshalled to/from JSON. The response is expected to be a JSON object with either `result` or `error` properties set.

func (*MQTTEndpoint) Connect

func (e *MQTTEndpoint) Connect(ctx context.Context) error

func (*MQTTEndpoint) Disconnect

func (e *MQTTEndpoint) Disconnect(ctx context.Context) error

func (*MQTTEndpoint) Publish

func (e *MQTTEndpoint) Publish(ctx context.Context, msg *Message) error

Publish a message to a specified topic. The payload is automatically marshalled to JSON if it's not already a byte slice.

func (*MQTTEndpoint) RegisterMessageHandler

func (e *MQTTEndpoint) RegisterMessageHandler(topic string, h MessageHandler)

func (*MQTTEndpoint) RegisterRequestHandler

func (e *MQTTEndpoint) RegisterRequestHandler(topic string, h RequestHandler)

func (*MQTTEndpoint) Subscribe

func (e *MQTTEndpoint) Subscribe(ctx context.Context, topic string) error

func (*MQTTEndpoint) UnregisterHandler

func (e *MQTTEndpoint) UnregisterHandler(topic string)

func (*MQTTEndpoint) Unsubscribe

func (e *MQTTEndpoint) Unsubscribe(ctx context.Context, topic string) error

type Message

type Message struct {
	Topic   string
	QoS     int
	Payload []byte
	Retain  bool
	Expiry  time.Duration
}

type MessageHandler

type MessageHandler func(ctx context.Context, msg *Message)

type PublishFunc

type PublishFunc func(ctx context.Context, msg *Message) error

func GetPublishFunc

func GetPublishFunc(ctx context.Context) (PublishFunc, bool)

GetPublishFunc returns the PublishFunc associated with the context, if any.

type RequestHandler

type RequestHandler func(ctx context.Context, msg *Message) (any, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL