Documentation
¶
Index ¶
- Constants
- Variables
- type AuthConfig
- type BrokerConfig
- type BrokerInterface
- type BrokerMode
- type Client
- func (c *Client) AddSubscription(sub *Subscription)
- func (c *Client) Close()
- func (c *Client) GetMetadata(key string) (interface{}, bool)
- func (c *Client) GetSubscription(subID string) (*Subscription, bool)
- func (c *Client) RemoveSubscription(subID string)
- func (c *Client) SetMetadata(key string, value interface{})
- type ClientManager
- type Config
- type EmbeddedBroker
- func (eb *EmbeddedBroker) GetClientManager() *ClientManager
- func (eb *EmbeddedBroker) IsConnected() bool
- func (eb *EmbeddedBroker) Publish(topic string, qos byte, payload []byte) error
- func (eb *EmbeddedBroker) SetHandler(handler *Handler)
- func (eb *EmbeddedBroker) Start(ctx context.Context) error
- func (eb *EmbeddedBroker) Stop(ctx context.Context) error
- func (eb *EmbeddedBroker) Subscribe(topicFilter string, qos byte, callback MessageCallback) error
- func (eb *EmbeddedBroker) Unsubscribe(topicFilter string) error
- type ErrorInfo
- type ExternalBrokerClient
- func (ebc *ExternalBrokerClient) GetClientManager() *ClientManager
- func (ebc *ExternalBrokerClient) IsConnected() bool
- func (ebc *ExternalBrokerClient) Publish(topic string, qos byte, payload []byte) error
- func (ebc *ExternalBrokerClient) SetHandler(handler *Handler)
- func (ebc *ExternalBrokerClient) Start(ctx context.Context) error
- func (ebc *ExternalBrokerClient) Stop(ctx context.Context) error
- func (ebc *ExternalBrokerClient) Subscribe(topicFilter string, qos byte, callback MessageCallback) error
- func (ebc *ExternalBrokerClient) Unsubscribe(topicFilter string) error
- type ExternalBrokerConfig
- type Handler
- func NewHandler(db common.Database, registry common.ModelRegistry, config *Config) (*Handler, error)
- func NewHandlerWithBun(db *bun.DB, opts ...Option) (*Handler, error)
- func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry, opts ...Option) (*Handler, error)
- func NewHandlerWithGORM(db *gorm.DB, opts ...Option) (*Handler, error)
- func (h *Handler) GetDatabase() common.Database
- func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo
- func (h *Handler) Hooks() *HookRegistry
- func (h *Handler) Registry() common.ModelRegistry
- func (h *Handler) Shutdown() error
- func (h *Handler) Start() error
- type HookContext
- type HookFunc
- type HookRegistry
- type HookType
- type Message
- type MessageCallback
- type MessageType
- type NotificationMessage
- type OperationType
- type Option
- type QoSConfig
- type ResponseMessage
- type Subscription
- type SubscriptionManager
- type TimeoutConfig
- type TopicConfig
Constants ¶
const ( // CRUD operation hooks BeforeRead = websocketspec.BeforeRead AfterRead = websocketspec.AfterRead BeforeCreate = websocketspec.BeforeCreate AfterCreate = websocketspec.AfterCreate BeforeUpdate = websocketspec.BeforeUpdate AfterUpdate = websocketspec.AfterUpdate BeforeDelete = websocketspec.BeforeDelete AfterDelete = websocketspec.AfterDelete // Subscription hooks BeforeSubscribe = websocketspec.BeforeSubscribe AfterSubscribe = websocketspec.AfterSubscribe BeforeUnsubscribe = websocketspec.BeforeUnsubscribe AfterUnsubscribe = websocketspec.AfterUnsubscribe // Connection hooks BeforeConnect = websocketspec.BeforeConnect AfterConnect = websocketspec.AfterConnect BeforeDisconnect = websocketspec.BeforeDisconnect AfterDisconnect = websocketspec.AfterDisconnect )
Hook type constants - all 12 lifecycle hooks
const ( MessageTypeRequest = websocketspec.MessageTypeRequest MessageTypeResponse = websocketspec.MessageTypeResponse MessageTypeNotification = websocketspec.MessageTypeNotification MessageTypeSubscription = websocketspec.MessageTypeSubscription MessageTypeError = websocketspec.MessageTypeError MessageTypePing = websocketspec.MessageTypePing MessageTypePong = websocketspec.MessageTypePong )
Message type constants
const ( OperationRead = websocketspec.OperationRead OperationCreate = websocketspec.OperationCreate OperationUpdate = websocketspec.OperationUpdate OperationDelete = websocketspec.OperationDelete OperationSubscribe = websocketspec.OperationSubscribe OperationUnsubscribe = websocketspec.OperationUnsubscribe OperationMeta = websocketspec.OperationMeta )
Operation type constants
Variables ¶
var ( // NewResponseMessage creates a new response message NewResponseMessage = websocketspec.NewResponseMessage // NewErrorResponse creates an error response NewErrorResponse = websocketspec.NewErrorResponse // NewNotificationMessage creates a notification message NewNotificationMessage = websocketspec.NewNotificationMessage // ParseMessage parses a JSON message into a Message struct ParseMessage = websocketspec.ParseMessage )
Helper functions from websocketspec
Functions ¶
This section is empty.
Types ¶
type AuthConfig ¶
type AuthConfig struct {
// ValidateCredentials is called to validate username/password for embedded broker
// Return true if credentials are valid, false otherwise
ValidateCredentials func(username, password string) bool
}
AuthConfig for MQTT-level authentication
type BrokerConfig ¶
type BrokerConfig struct {
// Host to bind to (default: "localhost")
Host string
// Port to listen on (default: 1883)
Port int
// EnableWebSocket enables WebSocket support
EnableWebSocket bool
// WSPort is the WebSocket port (default: 8883)
WSPort int
// MaxConnections limits concurrent client connections
MaxConnections int
// KeepAlive is the client keepalive interval
KeepAlive time.Duration
// EnableAuth enables username/password authentication
EnableAuth bool
}
BrokerConfig configures the embedded Mochi MQTT broker
type BrokerInterface ¶
type BrokerInterface interface {
// Start initializes the broker/client connection
Start(ctx context.Context) error
// Stop gracefully shuts down the broker/client
Stop(ctx context.Context) error
// Publish sends a message to a topic
Publish(topic string, qos byte, payload []byte) error
// Subscribe subscribes to a topic pattern with callback
Subscribe(topicFilter string, qos byte, callback MessageCallback) error
// Unsubscribe removes subscription
Unsubscribe(topicFilter string) error
// IsConnected returns connection status
IsConnected() bool
// GetClientManager returns the client manager
GetClientManager() *ClientManager
// SetHandler sets the handler reference (needed for hooks)
SetHandler(handler *Handler)
}
BrokerInterface abstracts MQTT broker operations
type BrokerMode ¶
type BrokerMode string
BrokerMode specifies how to connect to MQTT
const ( // BrokerModeEmbedded runs Mochi MQTT broker in-process BrokerModeEmbedded BrokerMode = "embedded" // BrokerModeExternal connects to external MQTT broker as client BrokerModeExternal BrokerMode = "external" )
type Client ¶
type Client struct {
// ID is the MQTT client ID (unique per connection)
ID string
// Username from MQTT CONNECT packet
Username string
// ConnectedAt is when the client connected
ConnectedAt time.Time
// contains filtered or unexported fields
}
Client represents an MQTT client connection
func (*Client) AddSubscription ¶
func (c *Client) AddSubscription(sub *Subscription)
AddSubscription adds a subscription to this client
func (*Client) GetMetadata ¶
GetMetadata retrieves metadata for this client
func (*Client) GetSubscription ¶
func (c *Client) GetSubscription(subID string) (*Subscription, bool)
GetSubscription retrieves a subscription by ID
func (*Client) RemoveSubscription ¶
RemoveSubscription removes a subscription from this client
func (*Client) SetMetadata ¶
SetMetadata sets metadata for this client
type ClientManager ¶
type ClientManager struct {
// contains filtered or unexported fields
}
ClientManager manages all MQTT client connections
func NewClientManager ¶
func NewClientManager(ctx context.Context) *ClientManager
NewClientManager creates a new client manager
func (*ClientManager) Count ¶
func (cm *ClientManager) Count() int
Count returns the number of active clients
func (*ClientManager) GetClient ¶
func (cm *ClientManager) GetClient(clientID string) (*Client, bool)
GetClient retrieves a client by ID
func (*ClientManager) Register ¶
func (cm *ClientManager) Register(clientID, username string, handler *Handler) *Client
Register registers a new MQTT client
func (*ClientManager) Shutdown ¶
func (cm *ClientManager) Shutdown()
Shutdown gracefully shuts down the client manager
func (*ClientManager) Unregister ¶
func (cm *ClientManager) Unregister(clientID string)
Unregister removes a client
type Config ¶
type Config struct {
// BrokerMode determines whether to use embedded or external broker
BrokerMode BrokerMode
// Broker configuration for embedded mode
Broker BrokerConfig
// ExternalBroker configuration for external client mode
ExternalBroker ExternalBrokerConfig
// Topics configuration
Topics TopicConfig
// QoS configuration for different message types
QoS QoSConfig
// Auth configuration
Auth AuthConfig
// Timeouts for various operations
Timeouts TimeoutConfig
}
Config holds all mqttspec configuration
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a configuration with sensible defaults
type EmbeddedBroker ¶
type EmbeddedBroker struct {
// contains filtered or unexported fields
}
EmbeddedBroker wraps Mochi MQTT server
func NewEmbeddedBroker ¶
func NewEmbeddedBroker(config BrokerConfig, clientManager *ClientManager) *EmbeddedBroker
NewEmbeddedBroker creates a new embedded broker
func (*EmbeddedBroker) GetClientManager ¶
func (eb *EmbeddedBroker) GetClientManager() *ClientManager
GetClientManager returns the client manager
func (*EmbeddedBroker) IsConnected ¶
func (eb *EmbeddedBroker) IsConnected() bool
IsConnected returns whether the broker is running
func (*EmbeddedBroker) Publish ¶
func (eb *EmbeddedBroker) Publish(topic string, qos byte, payload []byte) error
Publish publishes a message to a topic
func (*EmbeddedBroker) SetHandler ¶
func (eb *EmbeddedBroker) SetHandler(handler *Handler)
SetHandler sets the handler reference
func (*EmbeddedBroker) Start ¶
func (eb *EmbeddedBroker) Start(ctx context.Context) error
Start starts the embedded MQTT broker
func (*EmbeddedBroker) Stop ¶
func (eb *EmbeddedBroker) Stop(ctx context.Context) error
Stop stops the embedded broker
func (*EmbeddedBroker) Subscribe ¶
func (eb *EmbeddedBroker) Subscribe(topicFilter string, qos byte, callback MessageCallback) error
Subscribe subscribes to a topic
func (*EmbeddedBroker) Unsubscribe ¶
func (eb *EmbeddedBroker) Unsubscribe(topicFilter string) error
Unsubscribe unsubscribes from a topic
type ExternalBrokerClient ¶
type ExternalBrokerClient struct {
// contains filtered or unexported fields
}
ExternalBrokerClient wraps Paho MQTT client
func NewExternalBrokerClient ¶
func NewExternalBrokerClient(config ExternalBrokerConfig, clientManager *ClientManager) *ExternalBrokerClient
NewExternalBrokerClient creates a new external broker client
func (*ExternalBrokerClient) GetClientManager ¶
func (ebc *ExternalBrokerClient) GetClientManager() *ClientManager
GetClientManager returns the client manager
func (*ExternalBrokerClient) IsConnected ¶
func (ebc *ExternalBrokerClient) IsConnected() bool
IsConnected returns connection status
func (*ExternalBrokerClient) Publish ¶
func (ebc *ExternalBrokerClient) Publish(topic string, qos byte, payload []byte) error
Publish publishes a message to a topic
func (*ExternalBrokerClient) SetHandler ¶
func (ebc *ExternalBrokerClient) SetHandler(handler *Handler)
SetHandler sets the handler reference
func (*ExternalBrokerClient) Start ¶
func (ebc *ExternalBrokerClient) Start(ctx context.Context) error
Start connects to the external MQTT broker
func (*ExternalBrokerClient) Stop ¶
func (ebc *ExternalBrokerClient) Stop(ctx context.Context) error
Stop disconnects from the external broker
func (*ExternalBrokerClient) Subscribe ¶
func (ebc *ExternalBrokerClient) Subscribe(topicFilter string, qos byte, callback MessageCallback) error
Subscribe subscribes to a topic
func (*ExternalBrokerClient) Unsubscribe ¶
func (ebc *ExternalBrokerClient) Unsubscribe(topicFilter string) error
Unsubscribe unsubscribes from a topic
type ExternalBrokerConfig ¶
type ExternalBrokerConfig struct {
// BrokerURL is the broker address (e.g., tcp://host:port or ssl://host:port)
BrokerURL string
// ClientID is a unique identifier for this handler instance
ClientID string
// Username for MQTT authentication
Username string
// Password for MQTT authentication
Password string
// CleanSession flag (default: true)
CleanSession bool
// KeepAlive interval (default: 60s)
KeepAlive time.Duration
// ConnectTimeout for initial connection (default: 30s)
ConnectTimeout time.Duration
// ReconnectDelay between reconnection attempts (default: 5s)
ReconnectDelay time.Duration
// MaxReconnect attempts (0 = unlimited, default: 0)
MaxReconnect int
// TLSConfig for SSL/TLS connections
TLSConfig *tls.Config
}
ExternalBrokerConfig for connecting as a client to external broker
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles MQTT messages and operations
func NewHandler ¶
func NewHandler(db common.Database, registry common.ModelRegistry, config *Config) (*Handler, error)
NewHandler creates a new MQTT handler
func NewHandlerWithBun ¶
NewHandlerWithBun creates an MQTT handler with Bun database adapter
func NewHandlerWithDatabase ¶
func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry, opts ...Option) (*Handler, error)
NewHandlerWithDatabase creates an MQTT handler with a custom database adapter
func NewHandlerWithGORM ¶
NewHandlerWithGORM creates an MQTT handler with GORM database adapter
func (*Handler) GetDatabase ¶
GetDatabase returns the database adapter
func (*Handler) GetRelationshipInfo ¶
func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo
GetRelationshipInfo is a placeholder for relationship detection
func (*Handler) Registry ¶
func (h *Handler) Registry() common.ModelRegistry
Registry returns the model registry
type HookContext ¶
type HookContext = websocketspec.HookContext
HookContext contains all context for hook execution Note: For MQTT, the Client is stored in Metadata["mqtt_client"]
type HookFunc ¶
type HookFunc = websocketspec.HookFunc
HookFunc is a function that executes during a lifecycle hook
type HookRegistry ¶
type HookRegistry = websocketspec.HookRegistry
HookRegistry manages all registered hooks
func NewHookRegistry ¶
func NewHookRegistry() *HookRegistry
NewHookRegistry creates a new hook registry
type Message ¶
type Message = websocketspec.Message
Message represents an MQTT message (identical to WebSocket message protocol)
type MessageCallback ¶
MessageCallback is called when a message arrives
type MessageType ¶
type MessageType = websocketspec.MessageType
MessageType defines the type of message
type NotificationMessage ¶
type NotificationMessage = websocketspec.NotificationMessage
NotificationMessage is sent to subscribers when data changes
type OperationType ¶
type OperationType = websocketspec.OperationType
OperationType defines the operation to perform
type Option ¶
Option is a functional option for configuring the handler
func WithEmbeddedBroker ¶
func WithEmbeddedBroker(config BrokerConfig) Option
WithEmbeddedBroker configures the handler to use an embedded MQTT broker
func WithExternalBroker ¶
func WithExternalBroker(config ExternalBrokerConfig) Option
WithExternalBroker configures the handler to connect to an external MQTT broker
func WithHooks ¶
func WithHooks(hooks *HookRegistry) Option
WithHooks sets a pre-configured hook registry
func WithTopicPrefix ¶
WithTopicPrefix sets a custom topic prefix (default: "spec")
type QoSConfig ¶
type QoSConfig struct {
// Request messages QoS (default: 1 - at-least-once)
Request byte
// Response messages QoS (default: 1 - at-least-once)
Response byte
// Notification messages QoS (default: 1 - at-least-once)
Notification byte
}
QoSConfig defines quality of service levels for different message types
type ResponseMessage ¶
type ResponseMessage = websocketspec.ResponseMessage
ResponseMessage is sent back to clients after processing requests
type Subscription ¶
type Subscription = websocketspec.Subscription
Subscription represents an active subscription to entity changes The key difference for MQTT: notifications are delivered via MQTT publish to spec/{client_id}/notify/{subscription_id} instead of WebSocket send
type SubscriptionManager ¶
type SubscriptionManager = websocketspec.SubscriptionManager
SubscriptionManager manages all active subscriptions
func NewSubscriptionManager ¶
func NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new subscription manager
type TimeoutConfig ¶
type TimeoutConfig struct {
// Connect timeout for MQTT connection (default: 30s)
Connect time.Duration
// Publish timeout for publishing messages (default: 5s)
Publish time.Duration
// Disconnect timeout for graceful shutdown (default: 10s)
Disconnect time.Duration
}
TimeoutConfig defines timeouts for various operations
type TopicConfig ¶
type TopicConfig struct {
// Prefix for all topics (default: "spec")
// Topics will be: {Prefix}/{client_id}/request|response|notify/{sub_id}
Prefix string
}
TopicConfig defines the MQTT topic structure