Documentation
¶
Index ¶
- Constants
- Variables
- func NewExtension(opts ...ConfigOption) forge.Extension
- func NewExtensionWithConfig(config Config) forge.Extension
- type ClientStats
- type Config
- type ConfigOption
- func WithAutoReconnect(enable bool) ConfigOption
- func WithBroker(broker string) ConfigOption
- func WithCleanSession(clean bool) ConfigOption
- func WithClientID(clientID string) ConfigOption
- func WithConfig(config Config) ConfigOption
- func WithCredentials(username, password string) ConfigOption
- func WithKeepAlive(duration time.Duration) ConfigOption
- func WithMetrics(enable bool) ConfigOption
- func WithQoS(qos byte) ConfigOption
- func WithRequireConfig(require bool) ConfigOption
- func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
- func WithTracing(enable bool) ConfigOption
- func WithWill(topic, payload string, qos byte, retained bool) ConfigOption
- type ConnectHandler
- type ConnectionLostHandler
- type Extension
- type MQTT
- type MQTTService
- func (s *MQTTService) Client() MQTT
- func (s *MQTTService) Connect(ctx context.Context) error
- func (s *MQTTService) Disconnect(ctx context.Context) error
- func (s *MQTTService) Health(ctx context.Context) error
- func (s *MQTTService) IsConnected() bool
- func (s *MQTTService) Name() string
- func (s *MQTTService) Ping(ctx context.Context) error
- func (s *MQTTService) Publish(ctx context.Context, topic string, payload []byte, opts ...PublishOption) error
- func (s *MQTTService) Start(ctx context.Context) error
- func (s *MQTTService) Stop(ctx context.Context) error
- func (s *MQTTService) Subscribe(ctx context.Context, topic string, handler MessageHandler, ...) error
- func (s *MQTTService) Unsubscribe(ctx context.Context, topic string) error
- type MessageHandler
- type ReconnectingHandler
- type SubscriptionInfo
Constants ¶
const (
// ServiceKey is the DI key for the MQTT service.
ServiceKey = "mqtt"
)
DI container keys for MQTT extension services.
Variables ¶
var ( // ErrNotConnected is returned when operation requires connection ErrNotConnected = errors.New("mqtt: not connected") // ErrAlreadyConnected is returned when already connected ErrAlreadyConnected = errors.New("mqtt: already connected") // ErrConnectionFailed is returned when connection fails ErrConnectionFailed = errors.New("mqtt: connection failed") // ErrPublishFailed is returned when publish fails ErrPublishFailed = errors.New("mqtt: publish failed") // ErrSubscribeFailed is returned when subscription fails ErrSubscribeFailed = errors.New("mqtt: subscribe failed") // ErrUnsubscribeFailed is returned when unsubscribe fails ErrUnsubscribeFailed = errors.New("mqtt: unsubscribe failed") // ErrInvalidQoS is returned when QoS value is invalid ErrInvalidQoS = errors.New("mqtt: invalid QoS value") // ErrInvalidTopic is returned when topic is invalid ErrInvalidTopic = errors.New("mqtt: invalid topic") // ErrTimeout is returned when operation times out ErrTimeout = errors.New("mqtt: operation timeout") )
Functions ¶
func NewExtension ¶
func NewExtension(opts ...ConfigOption) forge.Extension
NewExtension creates a new MQTT extension
func NewExtensionWithConfig ¶
NewExtensionWithConfig creates a new MQTT extension with a complete config
Types ¶
type ClientStats ¶
type ClientStats struct {
Connected bool
ConnectTime time.Time
LastMessageTime time.Time
MessagesReceived int64
MessagesSent int64
Subscriptions int
Reconnects int64
}
ClientStats contains client statistics
type Config ¶
type Config struct {
// Connection settings
Broker string `json:"broker" yaml:"broker" mapstructure:"broker"`
ClientID string `json:"client_id" yaml:"client_id" mapstructure:"client_id"`
Username string `json:"username,omitempty" yaml:"username,omitempty" mapstructure:"username"`
Password string `json:"password,omitempty" yaml:"password,omitempty" mapstructure:"password"`
CleanSession bool `json:"clean_session" yaml:"clean_session" mapstructure:"clean_session"`
ConnectTimeout time.Duration `json:"connect_timeout" yaml:"connect_timeout" mapstructure:"connect_timeout"`
KeepAlive time.Duration `json:"keep_alive" yaml:"keep_alive" mapstructure:"keep_alive"`
PingTimeout time.Duration `json:"ping_timeout" yaml:"ping_timeout" mapstructure:"ping_timeout"`
MaxReconnectDelay time.Duration `json:"max_reconnect_delay" yaml:"max_reconnect_delay" mapstructure:"max_reconnect_delay"`
// TLS/SSL
EnableTLS bool `json:"enable_tls" yaml:"enable_tls" mapstructure:"enable_tls"`
TLSCertFile string `json:"tls_cert_file,omitempty" yaml:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"`
TLSKeyFile string `json:"tls_key_file,omitempty" yaml:"tls_key_file,omitempty" mapstructure:"tls_key_file"`
TLSCAFile string `json:"tls_ca_file,omitempty" yaml:"tls_ca_file,omitempty" mapstructure:"tls_ca_file"`
TLSSkipVerify bool `json:"tls_skip_verify" yaml:"tls_skip_verify" mapstructure:"tls_skip_verify"`
// QoS settings
DefaultQoS byte `json:"default_qos" yaml:"default_qos" mapstructure:"default_qos"`
// Retry and reliability
AutoReconnect bool `json:"auto_reconnect" yaml:"auto_reconnect" mapstructure:"auto_reconnect"`
ResumeSubs bool `json:"resume_subs" yaml:"resume_subs" mapstructure:"resume_subs"`
MaxReconnectAttempts int `json:"max_reconnect_attempts" yaml:"max_reconnect_attempts" mapstructure:"max_reconnect_attempts"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout" mapstructure:"write_timeout"`
// Message handling
MessageChannelDepth uint `json:"message_channel_depth" yaml:"message_channel_depth" mapstructure:"message_channel_depth"`
OrderMatters bool `json:"order_matters" yaml:"order_matters" mapstructure:"order_matters"`
MessageStore string `json:"message_store" yaml:"message_store" mapstructure:"message_store"` // "memory", "file"
StoreDirectory string `json:"store_directory,omitempty" yaml:"store_directory,omitempty" mapstructure:"store_directory"`
// Last Will and Testament
WillEnabled bool `json:"will_enabled" yaml:"will_enabled" mapstructure:"will_enabled"`
WillTopic string `json:"will_topic,omitempty" yaml:"will_topic,omitempty" mapstructure:"will_topic"`
WillPayload string `json:"will_payload,omitempty" yaml:"will_payload,omitempty" mapstructure:"will_payload"`
WillQoS byte `json:"will_qos" yaml:"will_qos" mapstructure:"will_qos"`
WillRetained bool `json:"will_retained" yaml:"will_retained" mapstructure:"will_retained"`
// Observability
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics" mapstructure:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing" mapstructure:"enable_tracing"`
EnableLogging bool `json:"enable_logging" yaml:"enable_logging" mapstructure:"enable_logging"`
// Config loading flags
RequireConfig bool `json:"-" yaml:"-" mapstructure:"-"`
}
Config contains configuration for the MQTT extension
type ConfigOption ¶
type ConfigOption func(*Config)
ConfigOption is a functional option for Config
func WithAutoReconnect ¶
func WithAutoReconnect(enable bool) ConfigOption
func WithBroker ¶
func WithBroker(broker string) ConfigOption
func WithCleanSession ¶
func WithCleanSession(clean bool) ConfigOption
func WithClientID ¶
func WithClientID(clientID string) ConfigOption
func WithConfig ¶
func WithConfig(config Config) ConfigOption
func WithCredentials ¶
func WithCredentials(username, password string) ConfigOption
func WithKeepAlive ¶
func WithKeepAlive(duration time.Duration) ConfigOption
func WithMetrics ¶
func WithMetrics(enable bool) ConfigOption
func WithQoS ¶
func WithQoS(qos byte) ConfigOption
func WithRequireConfig ¶
func WithRequireConfig(require bool) ConfigOption
func WithTLS ¶
func WithTLS(certFile, keyFile, caFile string, skipVerify bool) ConfigOption
func WithTracing ¶
func WithTracing(enable bool) ConfigOption
type ConnectHandler ¶
type ConnectHandler = mqttclient.OnConnectHandler
ConnectHandler is called when connection is established
type ConnectionLostHandler ¶
type ConnectionLostHandler = mqttclient.ConnectionLostHandler
ConnectionLostHandler is called when connection is lost
type Extension ¶
type Extension struct {
*forge.BaseExtension
// contains filtered or unexported fields
}
Extension implements forge.Extension for MQTT functionality. The extension is now a lightweight facade that loads config and registers services.
func (*Extension) Health ¶
Health checks the extension health. Service health is managed by Vessel through MQTTService.Health().
type MQTT ¶
type MQTT interface {
// Connection management
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
IsConnected() bool
Reconnect() error
// Publishing
Publish(topic string, qos byte, retained bool, payload interface{}) error
PublishAsync(topic string, qos byte, retained bool, payload interface{}) error
// Subscribing
Subscribe(topic string, qos byte, handler MessageHandler) error
SubscribeMultiple(filters map[string]byte, handler MessageHandler) error
Unsubscribe(topics ...string) error
// Message handling
AddRoute(topic string, handler MessageHandler)
SetDefaultHandler(handler MessageHandler)
SetOnConnectHandler(handler ConnectHandler)
SetConnectionLostHandler(handler ConnectionLostHandler)
SetReconnectingHandler(handler ReconnectingHandler)
// Client info
GetClient() mqttclient.Client
GetStats() ClientStats
GetSubscriptions() []SubscriptionInfo
// Health
Ping(ctx context.Context) error
}
MQTT represents a unified MQTT client interface
type MQTTService ¶
type MQTTService struct {
// contains filtered or unexported fields
}
MQTTService wraps an MQTT client and provides lifecycle management. It implements vessel's di.Service interface so Vessel can manage its lifecycle.
func NewMQTTService ¶
func NewMQTTService(config Config, logger forge.Logger, metrics forge.Metrics) (*MQTTService, error)
NewMQTTService creates a new MQTT service with the given configuration. This is the constructor that will be registered with the DI container.
func (*MQTTService) Client ¶
func (s *MQTTService) Client() MQTT
Client returns the underlying MQTT client.
func (*MQTTService) Disconnect ¶
func (s *MQTTService) Disconnect(ctx context.Context) error
func (*MQTTService) Health ¶
func (s *MQTTService) Health(ctx context.Context) error
Health checks if the MQTT service is healthy.
func (*MQTTService) IsConnected ¶
func (s *MQTTService) IsConnected() bool
func (*MQTTService) Name ¶
func (s *MQTTService) Name() string
Name returns the service name for Vessel's lifecycle management.
func (*MQTTService) Start ¶
func (s *MQTTService) Start(ctx context.Context) error
Start starts the MQTT service by connecting to broker. This is called automatically by Vessel during container.Start().
func (*MQTTService) Stop ¶
func (s *MQTTService) Stop(ctx context.Context) error
Stop stops the MQTT service by disconnecting from broker. This is called automatically by Vessel during container.Stop().
func (*MQTTService) Subscribe ¶
func (s *MQTTService) Subscribe(ctx context.Context, topic string, handler MessageHandler, opts ...SubscribeOption) error
func (*MQTTService) Unsubscribe ¶
func (s *MQTTService) Unsubscribe(ctx context.Context, topic string) error
type MessageHandler ¶
type MessageHandler = mqttclient.MessageHandler
MessageHandler processes incoming MQTT messages This is an alias to the paho MQTT MessageHandler for compatibility
type ReconnectingHandler ¶
type ReconnectingHandler = mqttclient.ReconnectHandler
ReconnectingHandler is called when client is reconnecting
type SubscriptionInfo ¶
SubscriptionInfo contains subscription metadata