Documentation
¶
Index ¶
- Constants
- Variables
- func BuildTopicPath(pkg, service, method, sessionID, direction string) string
- func CopyBytes(b []byte) []byte
- func CopyString(s string) string
- func NewBridgeError(op, message string, err error) error
- func NewConnectionFailedError(sessionID string, err error) error
- func NewInvalidSessionError(sessionID string) error
- func NewInvalidStateError(sessionID string) error
- func NewLargeMessage(size int) *bytes.Buffer
- func NewSessionActiveError(sessionID string) error
- func NewSessionNotFoundError(sessionID string) error
- func NewSessionSuspendedError(sessionID string) error
- func NewUnauthorizedError(sessionID string) error
- func ParseTopicPath(topic string) (pkg, service, method, sessionID, direction string, err error)
- func ReleaseLargeMessage(buf *bytes.Buffer)
- func SetMaxFragmentSize(size int)
- func UnsafeBytes(s string) []byte
- func UnsafeString(b []byte) string
- type BridgeConfig
- type BridgeError
- type BridgeHook
- type BridgeHookBase
- func (h *BridgeHookBase) ID() string
- func (h *BridgeHookBase) Init(config any) error
- func (h *BridgeHookBase) OnMessageReceived(msg []byte) []byte
- func (h *BridgeHookBase) OnSessionCreated(session *SessionInfo) error
- func (h *BridgeHookBase) OnSessionDisconnected(session *SessionInfo) error
- func (h *BridgeHookBase) OnSessionResumed(session *SessionInfo) error
- func (h *BridgeHookBase) OnSessionSuspended(session *SessionInfo) error
- func (h *BridgeHookBase) Provides(b byte) bool
- func (h *BridgeHookBase) SetOpts(l *zap.Logger, opts *HookOptions)
- func (h *BridgeHookBase) Stop() error
- func (h *BridgeHookBase) StoredSessions() ([]SessionInfo, error)
- type BridgeHooks
- func (h *BridgeHooks) Add(hook BridgeHook, config any) error
- func (h *BridgeHooks) GetAll() []BridgeHook
- func (h *BridgeHooks) Len() int64
- func (h *BridgeHooks) OnMessageReceived(msg []byte) []byte
- func (h *BridgeHooks) OnSessionCreated(session *SessionInfo) error
- func (h *BridgeHooks) OnSessionDisconnected(session *SessionInfo) error
- func (h *BridgeHooks) OnSessionResumed(session *SessionInfo) error
- func (h *BridgeHooks) OnSessionSuspended(session *SessionInfo) error
- func (h *BridgeHooks) Provides(b ...byte) bool
- func (h *BridgeHooks) Stop()
- func (h *BridgeHooks) StoredSessions() ([]SessionInfo, error)
- type BridgeOption
- func WithCleanUpInterval(interval time.Duration) BridgeOption
- func WithLogger(logger *zap.Logger) BridgeOption
- func WithMQTTClient(client mqtt.Client) BridgeOption
- func WithMaxConnections(max int) BridgeOption
- func WithQoS(qos byte) BridgeOption
- func WithRateBurst(burst int) BridgeOption
- func WithRateLimit(ops float64) BridgeOption
- func WithRootTopic(topic string) BridgeOption
- type BridgeSessionState
- type Frame
- type FrameOption
- type Header
- type HookLoadConfig
- type HookOptions
- type ISessionStore
- type MQTTAddr
- type MQTTBridge
- type MQTTNetBridge
- func (b *MQTTNetBridge) Accept() (net.Conn, error)
- func (b *MQTTNetBridge) AddHook(hook BridgeHook, config any) error
- func (b *MQTTNetBridge) Addr() net.Addr
- func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
- func (b *MQTTNetBridge) CleanupStaleSessions(timeout time.Duration)
- func (b *MQTTNetBridge) Close() error
- func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string, opts ...SessionOption) (net.Conn, error)
- func (b *MQTTNetBridge) DisconnectSession(sessionID string) error
- func (b *MQTTNetBridge) HandleLifecycleMessage(payload []byte, topic string)
- func (b *MQTTNetBridge) ListenOnUnixSocket(path string, addr string) error
- func (b *MQTTNetBridge) ResumeSession(ctx context.Context, targetBridgeID, sessionID string) (net.Conn, error)
- func (b *MQTTNetBridge) Scheme() string
- func (b *MQTTNetBridge) SuspendSession(sessionID string) error
- func (b *MQTTNetBridge) WriteOnUnixSocket(path string, addr string) (net.Conn, error)
- type MQTTNetBridgeConn
- func (c *MQTTNetBridgeConn) Close() error
- func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
- func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
- func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
- func (c *MQTTNetBridgeConn) SessionID() string
- func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) Write(b []byte) (n int, err error)
- type MessageType
- type Session
- type SessionConfig
- type SessionError
- type SessionEvent
- type SessionEventHandler
- type SessionInfo
- type SessionManager
- func (sm *SessionManager) AddSession(sessionID string, session *SessionInfo)
- func (sm *SessionManager) CleanupStaleSessions(defaultTimeout time.Duration)
- func (sm *SessionManager) Close()
- func (sm *SessionManager) CreateSession(sessionID, clientID string, timeout time.Duration) *SessionInfo
- func (sm *SessionManager) DisconnectSession(sessionID string) error
- func (sm *SessionManager) GetAllSessions() map[string]*SessionInfo
- func (sm *SessionManager) GetSession(sessionID string) (*SessionInfo, bool)
- func (sm *SessionManager) HandleConnectionEstablished(sessionID string, conn *MQTTNetBridgeConn, clientID string, ...) error
- func (sm *SessionManager) HandleDisconnect(clientID, sessionID string) error
- func (sm *SessionManager) HandleLifecycleMessage(payload []byte, topic string)
- func (sm *SessionManager) HandleSessionError(sessionID string, errorType string) error
- func (sm *SessionManager) RemoveSession(sessionID string)
- func (sm *SessionManager) ResumeSession(sessionID string) error
- func (sm *SessionManager) SuspendSession(sessionID string, clientID string) error
- func (sm *SessionManager) UpdateStore(store ISessionStore) error
- type SessionOption
- type SessionState
- type StreamContext
- type StreamState
Constants ¶
const ( SetOptions byte = iota OnMessageReceived OnSessionCreated OnSessionResumed OnSessionSuspended OnSessionDisconnected StoredSessions )
const (
	MinHeaderSize          = 16        // Fixed header size without StreamID
	MaxStreamIDSize        = 256       // Maximum length for StreamID
	MinSequenceNum         = 1         // Minimum sequence number
	MaxSequenceNum  uint64 = 1<<64 - 1 // Maximum sequence number (uint64 max)
)
Variables ¶
var (
	ErrSessionActive    = &SessionError{BridgeError: BridgeError{Message: "session is already active"}}
	ErrSessionNotFound  = &SessionError{BridgeError: BridgeError{Message: "session not found"}}
	ErrInvalidSession   = &SessionError{BridgeError: BridgeError{Message: "invalid session"}}
	ErrSessionSuspended = &SessionError{BridgeError: BridgeError{Message: "session is suspended"}}
	ErrConnectionFailed = &SessionError{BridgeError: BridgeError{Message: "failed to create connection"}}

	// Additional error types
	ErrGeneric        = &SessionError{BridgeError: BridgeError{Message: "error"}}
	ErrSessionClosed  = &SessionError{BridgeError: BridgeError{Message: "session is closed"}}
	ErrInvalidState   = &SessionError{BridgeError: BridgeError{Message: "invalid session state"}}
	ErrSessionExpired = &SessionError{BridgeError: BridgeError{Message: "session has expired"}}
	ErrMaxSessions    = &SessionError{BridgeError: BridgeError{Message: "maximum number of sessions reached"}}
)
Specific error types for different scenarios
var (
	MaxFragmentSize = 10 * 1024 // 10KB per fragment
	HeaderSize      = 16        // Fixed header size
)
var (
	// ErrInvalidConfigType indicates that the config value received was of a different type than expected.
	ErrInvalidConfigType = errors.New("invalid config type provided")
)
Functions ¶
func BuildTopicPath ¶
BuildTopicPath creates an MQTT topic path for a given service and method
func CopyString ¶ added in v0.1.3
CopyString copies a string to make it immutable
func NewBridgeError ¶ added in v0.1.4
NewBridgeError is a helper function that creates a generic bridge error
func NewConnectionFailedError ¶ added in v0.1.4
func NewInvalidSessionError ¶ added in v0.1.4
func NewInvalidStateError ¶ added in v0.1.4
NewInvalidStateError creates a new error for invalid session state
func NewLargeMessage ¶
NewLargeMessage is a helper that creates a buffer for a large message of the given size
func NewSessionActiveError ¶ added in v0.1.4
Error constructors for common operations
func NewSessionNotFoundError ¶ added in v0.1.4
func NewSessionSuspendedError ¶ added in v0.1.4
func NewUnauthorizedError ¶ added in v0.1.4
func ParseTopicPath ¶
ParseTopicPath extracts components from an MQTT topic path
func ReleaseLargeMessage ¶
ReleaseLargeMessage is a helper that returns a large message buffer to the pool
func SetMaxFragmentSize ¶
func SetMaxFragmentSize(size int)
func UnsafeBytes ¶ added in v0.1.3
UnsafeBytes returns the contents of s as a byte slice without allocation.
func UnsafeString ¶ added in v0.1.3
UnsafeString returns the contents of b as a string without allocation.
Types ¶
type BridgeConfig ¶ added in v0.1.3
type BridgeConfig struct {
// contains filtered or unexported fields
}
BridgeConfig holds bridge-specific configuration
type BridgeError ¶
type BridgeError struct {
Op string // Operation that failed
Message string // Human-readable error message
Err error // Underlying error if any
}
BridgeError represents a base error type for bridge-related errors
func (*BridgeError) Error ¶
func (e *BridgeError) Error() string
func (*BridgeError) Is ¶ added in v0.1.4
func (e *BridgeError) Is(target error) bool
Is implements the interface for errors.Is functionality
func (*BridgeError) Unwrap ¶
func (e *BridgeError) Unwrap() error
type BridgeHook ¶ added in v0.1.3
type BridgeHook interface {
// ID returns the unique identifier for this hook
ID() string
// Init initializes the hook with the provided configuration
Init(config any) error
// Stop gracefully stops the hook
Stop() error
// Provides indicates whether this hook provides the specified functionality
Provides(b byte) bool
// SetOpts is called by the server to propagate internal values
SetOpts(l *zap.Logger, o *HookOptions)
// OnMessageReceived processes incoming messages
OnMessageReceived(msg []byte) []byte
// OnSessionCreated is called when a new session is created
OnSessionCreated(session *SessionInfo) error
// OnSessionResumed is called when a session is resumed
OnSessionResumed(session *SessionInfo) error
// OnSessionSuspended is called when a session is suspended
OnSessionSuspended(session *SessionInfo) error
// OnSessionDisconnected is called when a session is disconnected
OnSessionDisconnected(session *SessionInfo) error
// StoredSessions returns stored sessions
StoredSessions() ([]SessionInfo, error)
}
BridgeHook defines the interface for bridge hooks
type BridgeHookBase ¶ added in v0.1.4
type BridgeHookBase struct {
BridgeHook
Log *zap.Logger
Opts *HookOptions
}
BridgeHookBase provides a set of default methods for each hook
func (*BridgeHookBase) ID ¶ added in v0.1.4
func (h *BridgeHookBase) ID() string
ID returns the ID of the hook
func (*BridgeHookBase) Init ¶ added in v0.1.4
func (h *BridgeHookBase) Init(config any) error
Init initializes the hook
func (*BridgeHookBase) OnMessageReceived ¶ added in v0.1.4
func (h *BridgeHookBase) OnMessageReceived(msg []byte) []byte
OnMessageReceived processes incoming messages
func (*BridgeHookBase) OnSessionCreated ¶ added in v0.1.4
func (h *BridgeHookBase) OnSessionCreated(session *SessionInfo) error
OnSessionCreated is called when a new session is created
func (*BridgeHookBase) OnSessionDisconnected ¶ added in v0.1.4
func (h *BridgeHookBase) OnSessionDisconnected(session *SessionInfo) error
OnSessionDisconnected is called when a session is disconnected
func (*BridgeHookBase) OnSessionResumed ¶ added in v0.1.4
func (h *BridgeHookBase) OnSessionResumed(session *SessionInfo) error
OnSessionResumed is called when a session is resumed
func (*BridgeHookBase) OnSessionSuspended ¶ added in v0.1.4
func (h *BridgeHookBase) OnSessionSuspended(session *SessionInfo) error
OnSessionSuspended is called when a session is suspended
func (*BridgeHookBase) Provides ¶ added in v0.1.4
func (h *BridgeHookBase) Provides(b byte) bool
Provides indicates which methods a hook provides
func (*BridgeHookBase) SetOpts ¶ added in v0.1.4
func (h *BridgeHookBase) SetOpts(l *zap.Logger, opts *HookOptions)
SetOpts sets the options for the hook
func (*BridgeHookBase) Stop ¶ added in v0.1.4
func (h *BridgeHookBase) Stop() error
Stop stops the hook
func (*BridgeHookBase) StoredSessions ¶ added in v0.1.4
func (h *BridgeHookBase) StoredSessions() ([]SessionInfo, error)
StoredSessions returns stored sessions
type BridgeHooks ¶ added in v0.1.3
BridgeHooks manages a collection of hooks
func (*BridgeHooks) Add ¶ added in v0.1.3
func (h *BridgeHooks) Add(hook BridgeHook, config any) error
Add adds a new hook to the collection
func (*BridgeHooks) GetAll ¶ added in v0.1.3
func (h *BridgeHooks) GetAll() []BridgeHook
GetAll returns a slice of all the hooks.
func (*BridgeHooks) Len ¶ added in v0.1.3
func (h *BridgeHooks) Len() int64
Len returns the number of hooks added.
func (*BridgeHooks) OnMessageReceived ¶ added in v0.1.3
func (h *BridgeHooks) OnMessageReceived(msg []byte) []byte
OnMessageReceived processes a message through all hooks
func (*BridgeHooks) OnSessionCreated ¶ added in v0.1.4
func (h *BridgeHooks) OnSessionCreated(session *SessionInfo) error
OnSessionCreated calls the OnSessionCreated hook for all hooks that provide it
func (*BridgeHooks) OnSessionDisconnected ¶ added in v0.1.4
func (h *BridgeHooks) OnSessionDisconnected(session *SessionInfo) error
OnSessionDisconnected calls the OnSessionDisconnected hook for all hooks that provide it
func (*BridgeHooks) OnSessionResumed ¶ added in v0.1.4
func (h *BridgeHooks) OnSessionResumed(session *SessionInfo) error
OnSessionResumed calls the OnSessionResumed hook for all hooks that provide it
func (*BridgeHooks) OnSessionSuspended ¶ added in v0.1.4
func (h *BridgeHooks) OnSessionSuspended(session *SessionInfo) error
OnSessionSuspended calls the OnSessionSuspended hook for all hooks that provide it
func (*BridgeHooks) Provides ¶ added in v0.1.3
func (h *BridgeHooks) Provides(b ...byte) bool
Provides returns true if any one hook provides any of the requested hook methods.
func (*BridgeHooks) StoredSessions ¶ added in v0.1.4
func (h *BridgeHooks) StoredSessions() ([]SessionInfo, error)
StoredSessions calls the StoredSessions hook for all hooks that provide it
type BridgeOption ¶ added in v0.1.3
type BridgeOption func(*BridgeConfig)
BridgeOption configures bridge behavior
func WithCleanUpInterval ¶ added in v0.1.4
func WithCleanUpInterval(interval time.Duration) BridgeOption
func WithLogger ¶
func WithLogger(logger *zap.Logger) BridgeOption
WithLogger sets the logger for the bridge
func WithMQTTClient ¶
func WithMQTTClient(client mqtt.Client) BridgeOption
WithMQTTClient sets the MQTT client for the bridge
func WithMaxConnections ¶ added in v0.1.4
func WithMaxConnections(max int) BridgeOption
WithMaxConnections sets the maximum number of concurrent connections
func WithQoS ¶ added in v0.1.3
func WithQoS(qos byte) BridgeOption
WithQoS sets the MQTT QoS level for the bridge
func WithRateBurst ¶ added in v0.1.4
func WithRateBurst(burst int) BridgeOption
WithRateBurst sets the burst size for rate limiting
func WithRateLimit ¶ added in v0.1.4
func WithRateLimit(ops float64) BridgeOption
WithRateLimit sets the rate limit for operations
func WithRootTopic ¶ added in v0.1.3
func WithRootTopic(topic string) BridgeOption
WithRootTopic sets the root topic for the bridge
type BridgeSessionState ¶ added in v0.1.3
type BridgeSessionState int
BridgeSessionState represents the current state of a bridge session
const ( BridgeSessionStateActive BridgeSessionState = iota BridgeSessionStateSuspended BridgeSessionStateClosed )
func (BridgeSessionState) String ¶ added in v0.1.3
func (s BridgeSessionState) String() string
String returns the string representation of BridgeSessionState
type Frame ¶
Frame represents a complete protocol frame including header and payload
func FrameMessage ¶
func FrameMessage(data []byte, seqNum uint64, msgType MessageType) []Frame
FrameMessage splits a large message into frames
func NewFrame ¶
func NewFrame(msgType MessageType, seqNum uint64, data []byte, opts ...FrameOption) (*Frame, error)
NewFrame creates a new Frame with validated parameters
func UnmarshalFrame ¶
UnmarshalFrame converts a byte slice into a Frame
type FrameOption ¶
FrameOption defines options for frame creation
func WithFragmentation ¶
func WithFragmentation(fragmentID uint16, total uint16, seq uint16, isLast bool) FrameOption
WithFragmentation sets fragmentation parameters for a frame
func WithStreamID ¶
func WithStreamID(streamID string) FrameOption
WithStreamID sets the StreamID for a frame
type HookLoadConfig ¶ added in v0.1.4
type HookLoadConfig struct {
Hook BridgeHook
Config any
}
HookLoadConfig contains the hook and configuration as loaded from a configuration (usually a file).
type HookOptions ¶ added in v0.1.4
type HookOptions struct {
}
HookOptions contains values which are inherited from the server on initialisation.
type ISessionStore ¶ added in v0.1.4
type ISessionStore interface {
GetStoredSessions() (map[string]*SessionInfo, error)
}
ISessionStore defines the interface for session storage providers
type MQTTAddr ¶
type MQTTAddr struct {
// contains filtered or unexported fields
}
MQTTAddr implements net.Addr for MQTT connections
type MQTTBridge ¶
type MQTTBridge struct {
// contains filtered or unexported fields
}
func NewMQTTBridge ¶
func (*MQTTBridge) GetServiceInfo ¶
func (b *MQTTBridge) GetServiceInfo() map[string]grpc.ServiceInfo
GetServiceInfo implements grpc.ServiceInfoProvider
func (*MQTTBridge) RegisterService ¶
func (b *MQTTBridge) RegisterService(desc *grpc.ServiceDesc, impl interface{})
RegisterService implements grpc.ServiceRegistrar
type MQTTNetBridge ¶
type MQTTNetBridge struct {
// contains filtered or unexported fields
}
MQTTNetBridge implements net.Listener over MQTT
func NewMQTTNetBridge ¶
func NewMQTTNetBridge(mqttClient mqtt.Client, bridgeID string, opts ...BridgeOption) *MQTTNetBridge
NewMQTTNetBridge creates a new bridge that listens on a specific bridgeID
func (*MQTTNetBridge) Accept ¶
func (b *MQTTNetBridge) Accept() (net.Conn, error)
Accept implements net.Listener.Accept
func (*MQTTNetBridge) AddHook ¶ added in v0.1.3
func (b *MQTTNetBridge) AddHook(hook BridgeHook, config any) error
AddHook adds a new hook to the bridge
func (*MQTTNetBridge) Addr ¶
func (b *MQTTNetBridge) Addr() net.Addr
Addr implements net.Listener.Addr
func (*MQTTNetBridge) Build ¶ added in v0.1.1
func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
func (*MQTTNetBridge) CleanupStaleSessions ¶ added in v0.1.3
func (b *MQTTNetBridge) CleanupStaleSessions(timeout time.Duration)
CleanupStaleSessions removes sessions that have been suspended longer than the timeout
func (*MQTTNetBridge) Close ¶
func (b *MQTTNetBridge) Close() error
Close implements net.Listener.Close
func (*MQTTNetBridge) Dial ¶
func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string, opts ...SessionOption) (net.Conn, error)
Dial creates a new connection to a specific bridge
func (*MQTTNetBridge) DisconnectSession ¶ added in v0.1.4
func (b *MQTTNetBridge) DisconnectSession(sessionID string) error
DisconnectSession disconnects an active session and cleans it up
func (*MQTTNetBridge) HandleLifecycleMessage ¶ added in v0.1.4
func (b *MQTTNetBridge) HandleLifecycleMessage(payload []byte, topic string)
HandleLifecycleMessage processes lifecycle messages for sessions
func (*MQTTNetBridge) ListenOnUnixSocket ¶ added in v0.1.3
func (b *MQTTNetBridge) ListenOnUnixSocket(path string, addr string) error
ListenOnUnixSocket listens for connections on a Unix socket and proxies them to the bridge
func (*MQTTNetBridge) ResumeSession ¶ added in v0.1.3
func (b *MQTTNetBridge) ResumeSession(ctx context.Context, targetBridgeID, sessionID string) (net.Conn, error)
ResumeSession attempts to resume a suspended session
func (*MQTTNetBridge) Scheme ¶ added in v0.1.1
func (b *MQTTNetBridge) Scheme() string
func (*MQTTNetBridge) SuspendSession ¶ added in v0.1.3
func (b *MQTTNetBridge) SuspendSession(sessionID string) error
SuspendSession suspends an active session for later resumption
func (*MQTTNetBridge) WriteOnUnixSocket ¶ added in v0.1.3
type MQTTNetBridgeConn ¶
type MQTTNetBridgeConn struct {
// contains filtered or unexported fields
}
MQTTNetBridgeConn implements net.Conn over MQTT
func (*MQTTNetBridgeConn) Close ¶
func (c *MQTTNetBridgeConn) Close() error
func (*MQTTNetBridgeConn) LocalAddr ¶
func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
func (*MQTTNetBridgeConn) Read ¶
func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
Read implements net.Conn.Read
func (*MQTTNetBridgeConn) RemoteAddr ¶
func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
func (*MQTTNetBridgeConn) SessionID ¶ added in v0.1.4
func (c *MQTTNetBridgeConn) SessionID() string
func (*MQTTNetBridgeConn) SetDeadline ¶
func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetReadDeadline ¶
func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetWriteDeadline ¶
func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
type MessageType ¶
type MessageType uint8
const ( MessageTypeData MessageType = iota MessageTypeStreamInit MessageTypeStreamEnd MessageTypeHeader MessageTypeError )
type Session ¶
type Session struct {
ID string
ServiceName string
LastActive time.Time
State SessionState
// contains filtered or unexported fields
}
Session represents an active MQTT-GRPC bridge session
func NewSession ¶
func NewSession(id, serviceName string, eventHandler SessionEventHandler) *Session
NewSession creates a new session with proper initialization
func (*Session) AddStream ¶
func (s *Session) AddStream(methodName string) (*StreamContext, error)
AddStream adds a new stream to the session
func (*Session) CloseStream ¶
CloseStream gracefully closes a stream
func (*Session) UpdateActivity ¶
UpdateActivity updates the last active timestamp for the session and stream
type SessionConfig ¶ added in v0.1.3
type SessionConfig struct {
SessionID string
State BridgeSessionState
Timeout time.Duration
DialTimeout time.Duration
}
SessionConfig holds session-specific configuration
type SessionError ¶ added in v0.1.4
type SessionError struct {
BridgeError
SessionID string
}
SessionError represents session-specific errors
func (*SessionError) Error ¶ added in v0.1.4
func (e *SessionError) Error() string
func (*SessionError) Is ¶ added in v0.1.4
func (e *SessionError) Is(target error) bool
Is implements the interface for errors.Is functionality
type SessionEvent ¶
type SessionEvent int
SessionEvent represents different session lifecycle events
const ( SessionEventCreated SessionEvent = iota SessionEventActivated SessionEventStreamStarted SessionEventStreamEnded SessionEventClosing SessionEventClosed SessionEventTimeout )
type SessionEventHandler ¶
type SessionEventHandler func(session *Session, event SessionEvent)
SessionEventHandler is called when session state changes occur
type SessionInfo ¶ added in v0.1.3
type SessionInfo struct {
ID string
ClientID string // Current/last client that owns this session
State BridgeSessionState
LastActive time.Time
LastSuspended time.Time
Connection *MQTTNetBridgeConn
Metadata map[string]string
Timeout time.Duration
}
SessionInfo tracks session state and metadata
type SessionManager ¶ added in v0.1.4
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager handles session lifecycle and state management
func NewSessionManager ¶ added in v0.1.4
func NewSessionManager(bridge *MQTTNetBridge, logger *zap.Logger, cleanUpInterval time.Duration) *SessionManager
NewSessionManager creates a new session manager
func (*SessionManager) AddSession ¶ added in v0.1.4
func (sm *SessionManager) AddSession(sessionID string, session *SessionInfo)
AddSession adds a new session to the manager
func (*SessionManager) CleanupStaleSessions ¶ added in v0.1.4
func (sm *SessionManager) CleanupStaleSessions(defaultTimeout time.Duration)
CleanupStaleSessions removes sessions that have been suspended longer than the timeout
func (*SessionManager) Close ¶ added in v0.1.4
func (sm *SessionManager) Close()
Close stops the session manager and cleans up resources
func (*SessionManager) CreateSession ¶ added in v0.1.4
func (sm *SessionManager) CreateSession(sessionID, clientID string, timeout time.Duration) *SessionInfo
CreateSession creates a new session
func (*SessionManager) DisconnectSession ¶ added in v0.1.4
func (sm *SessionManager) DisconnectSession(sessionID string) error
DisconnectSession disconnects and cleans up a session
func (*SessionManager) GetAllSessions ¶ added in v0.1.4
func (sm *SessionManager) GetAllSessions() map[string]*SessionInfo
GetAllSessions returns all sessions
func (*SessionManager) GetSession ¶ added in v0.1.4
func (sm *SessionManager) GetSession(sessionID string) (*SessionInfo, bool)
GetSession retrieves a session by ID
func (*SessionManager) HandleConnectionEstablished ¶ added in v0.1.4
func (sm *SessionManager) HandleConnectionEstablished(sessionID string, conn *MQTTNetBridgeConn, clientID string, timeout time.Duration) error
HandleConnectionEstablished handles a new or resumed connection
func (*SessionManager) HandleDisconnect ¶ added in v0.1.4
func (sm *SessionManager) HandleDisconnect(clientID, sessionID string) error
HandleDisconnect processes a disconnect message for a session
func (*SessionManager) HandleLifecycleMessage ¶ added in v0.1.4
func (sm *SessionManager) HandleLifecycleMessage(payload []byte, topic string)
HandleLifecycleMessage processes lifecycle messages for sessions
func (*SessionManager) HandleSessionError ¶ added in v0.1.4
func (sm *SessionManager) HandleSessionError(sessionID string, errorType string) error
HandleSessionError processes session error messages
func (*SessionManager) RemoveSession ¶ added in v0.1.4
func (sm *SessionManager) RemoveSession(sessionID string)
RemoveSession removes a session from the manager
func (*SessionManager) ResumeSession ¶ added in v0.1.4
func (sm *SessionManager) ResumeSession(sessionID string) error
ResumeSession attempts to resume a suspended session
func (*SessionManager) SuspendSession ¶ added in v0.1.4
func (sm *SessionManager) SuspendSession(sessionID string, clientID string) error
SuspendSession suspends an active session
func (*SessionManager) UpdateStore ¶ added in v0.1.4
func (sm *SessionManager) UpdateStore(store ISessionStore) error
UpdateStore updates the session store and loads sessions from it
type SessionOption ¶ added in v0.1.3
type SessionOption func(*SessionConfig)
SessionOption configures session behavior
func WithDialTimeout ¶ added in v0.1.4
func WithDialTimeout(timeout time.Duration) SessionOption
func WithSessionID ¶ added in v0.1.3
func WithSessionID(sessionID string) SessionOption
WithSessionID sets a specific session ID for connection
func WithSessionState ¶ added in v0.1.3
func WithSessionState(state BridgeSessionState) SessionOption
WithSessionState sets the initial session state
func WithSessionTimeout ¶ added in v0.1.4
func WithSessionTimeout(timeout time.Duration) SessionOption
type SessionState ¶
type SessionState int
const ( SessionStateActive SessionState = iota SessionStateClosing SessionStateClosed )
type StreamContext ¶
type StreamContext struct {
StreamID string // Usually method name + unique identifier
Method string
Created time.Time
LastActive time.Time
State StreamState
Cancel context.CancelFunc // For cancelling individual streams
Context context.Context
Extra interface{} // For storing the serverStream
}
StreamContext holds the context for an active stream
type StreamState ¶
type StreamState int
const ( StreamStateActive StreamState = iota StreamStateClosing StreamStateClosed )
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| grpc_bridge/client (command) | |
| grpc_bridge/server (command) | |
| net_bridge/client (command) | |
| net_bridge/server (command) | |
| pty/client (command) | |
| pty/server (command) | |
| hooks | |