bridge

package module
v0.1.4 (latest)

Note: this package is not in the latest version of its module.
Published: Jan 29, 2025 | License: MIT | Imports: 26 | Imported by: 0

README

MQTT-Bridge

MQTT-Bridge is a library that allows you to bridge protocols over MQTT, with a focus on gRPC. It provides two implementations:

  • Network Bridge: A low-level implementation that allows existing gRPC clients and servers to communicate over MQTT without modification. The network bridge implements the net.Listener and net.Conn interfaces.
  • gRPC Bridge: A higher-level implementation that works directly with MQTT messages while maintaining gRPC-style APIs. The gRPC bridge implements the grpc.ServiceRegistrar and grpc.ServiceInfoProvider interfaces.

In theory, the network bridge should work with anything that uses the net.Listener and net.Conn interfaces, such as HTTP servers and other networking libraries.
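
The claim above rests on the fact that standard-library servers accept any net.Listener. The following is a minimal sketch of that idea using only the standard library: memListener is a hypothetical in-memory listener standing in for the bridge (it is not part of mqtt-bridge), and an unmodified HTTP server is served over it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"sync"
)

// memListener is a hypothetical in-memory net.Listener standing in for a
// custom transport such as the network bridge. It is not part of mqtt-bridge.
type memListener struct {
	conns chan net.Conn
	done  chan struct{}
	once  sync.Once
}

func newMemListener() *memListener {
	return &memListener{conns: make(chan net.Conn), done: make(chan struct{})}
}

// Accept blocks until a client dials in or the listener is closed.
func (l *memListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.done:
		return nil, errors.New("listener closed")
	}
}

// Close is safe to call more than once (http.Serve also closes its listener).
func (l *memListener) Close() error {
	l.once.Do(func() { close(l.done) })
	return nil
}

func (l *memListener) Addr() net.Addr { return memAddr{} }

// Dial hands one end of an in-memory pipe to Accept and returns the other end.
func (l *memListener) Dial(ctx context.Context) (net.Conn, error) {
	client, server := net.Pipe()
	select {
	case l.conns <- server:
		return client, nil
	case <-l.done:
		return nil, errors.New("listener closed")
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

type memAddr struct{}

func (memAddr) Network() string { return "mem" }
func (memAddr) String() string  { return "mem" }

// fetch serves one HTTP request over the in-memory listener and returns the body.
func fetch(msg string) (string, error) {
	l := newMemListener()
	defer l.Close()

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "echo: "+r.URL.Query().Get("msg"))
	})
	go http.Serve(l, handler) // unmodified HTTP server on a custom listener

	client := &http.Client{Transport: &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return l.Dial(ctx) // route every connection through the listener
		},
	}}
	resp, err := client.Get("http://bridge.invalid/?msg=" + msg)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	fmt.Println(fetch("hello"))
}
```

With the real library, the value returned by NewMQTTNetBridge would presumably take the place of memListener, since it is documented as implementing net.Listener.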

Example Overview

This example demonstrates how to use mqtt-bridge to enable gRPC-style communication over MQTT. The project includes two different implementations showing how to bridge gRPC and MQTT communications.

Overview

The example implements a simple Echo service with three types of RPCs:

  • Unary calls (simple request-response)
  • Server streaming (server sends multiple responses)
  • Bidirectional streaming (both client and server can send multiple messages)

Prerequisites

  • Go 1.19 or later
  • An MQTT broker (e.g., Mosquitto) running on localhost:1883
  • Protocol buffer compiler (protoc)

Service Definition

The Echo service is defined in the proto file (lines 6-15).

Implementation Options

1. Network Bridge Implementation

The network bridge provides a low-level network implementation that allows existing gRPC clients and servers to communicate over MQTT without modification.

Server Setup (lines 34-45 of the server example)

Client Setup (lines 37-55 of the client example)

To run:

# Start the server
go run example/net_bridge/server/main.go

# In another terminal, start the client
go run example/net_bridge/client/main.go

2. gRPC Bridge Implementation

The gRPC bridge provides a higher-level abstraction that works directly with MQTT messages while maintaining gRPC-style APIs.

Server Setup (lines 33-39 of the server example)

Client Setup (lines 30-51 of the client example)

To run:

# Start the server
go run example/grpc_bridge/server/main.go

# In another terminal, start the client
go run example/grpc_bridge/client/main.go

Service Implementation

The Echo service implements three types of RPCs:

  1. Unary Call - Simple request-response (lines 23-33)
  2. Server Streaming - Server sends multiple responses (lines 36-55)
  3. Bidirectional Streaming - Both sides can send messages (lines 58-82)

Key Features

  • Seamless conversion between gRPC and MQTT communication
  • Support for all gRPC communication patterns:
    • Unary calls
    • Server streaming
    • Client streaming
    • Bidirectional streaming
  • Automatic message framing and protocol handling
  • Integration with existing gRPC tooling
  • Choice between network-level and message-level implementations

Notes

  • The network bridge implementation is ideal when you want to use existing gRPC code over MQTT
  • The gRPC bridge implementation is better when you want to work directly with MQTT messages while maintaining gRPC-style APIs
  • Both implementations are intended to support the full range of gRPC features
  • Ensure your MQTT broker is properly configured and accessible before running the examples
  • The gRPC bridge implementation has so far been tested only with unary RPCs; streaming RPCs are untested.

License

MIT License

Copyright 2024 Golain Systems Private Limited.

Documentation

Constants

const (
	SetOptions byte = iota
	OnMessageReceived
	OnSessionCreated
	OnSessionResumed
	OnSessionSuspended
	OnSessionDisconnected
	StoredSessions
)

const (
	MinHeaderSize          = 16        // Fixed header size without StreamID
	MaxStreamIDSize        = 256       // Maximum length for StreamID
	MinSequenceNum         = 1         // Minimum sequence number
	MaxSequenceNum  uint64 = 1<<64 - 1 // Maximum sequence number (uint64 max)
)

Variables

var (
	ErrSessionActive    = &SessionError{BridgeError: BridgeError{Message: "session is already active"}}
	ErrSessionNotFound  = &SessionError{BridgeError: BridgeError{Message: "session not found"}}
	ErrInvalidSession   = &SessionError{BridgeError: BridgeError{Message: "invalid session"}}
	ErrSessionSuspended = &SessionError{BridgeError: BridgeError{Message: "session is suspended"}}
	ErrUnauthorized     = &SessionError{BridgeError: BridgeError{Message: "unauthorized operation"}}
	ErrConnectionFailed = &SessionError{BridgeError: BridgeError{Message: "failed to create connection"}}

	// Additional error types
	ErrGeneric        = &SessionError{BridgeError: BridgeError{Message: "error"}}
	ErrSessionClosed  = &SessionError{BridgeError: BridgeError{Message: "session is closed"}}
	ErrInvalidState   = &SessionError{BridgeError: BridgeError{Message: "invalid session state"}}
	ErrSessionExpired = &SessionError{BridgeError: BridgeError{Message: "session has expired"}}
	ErrMaxSessions    = &SessionError{BridgeError: BridgeError{Message: "maximum number of sessions reached"}}
)

Specific error types for different scenarios

var (
	MaxFragmentSize = 10 * 1024 // 10KB per fragment
	HeaderSize      = 16        // Fixed header size
)

var (
	// ErrInvalidConfigType indicates a different Type of config value was expected to what was received.
	ErrInvalidConfigType = errors.New("invalid config type provided")
)

Functions

func BuildTopicPath

func BuildTopicPath(pkg, service, method, sessionID, direction string) string

BuildTopicPath creates an MQTT topic path for a given service and method

func CopyBytes added in v0.1.3

func CopyBytes(b []byte) []byte

CopyBytes copies a slice to make it immutable

func CopyString added in v0.1.3

func CopyString(s string) string

CopyString copies a string to make it immutable

func NewBridgeError added in v0.1.4

func NewBridgeError(op, message string, err error) error

NewBridgeError creates a generic bridge error

func NewConnectionFailedError added in v0.1.4

func NewConnectionFailedError(sessionID string, err error) error

func NewInvalidSessionError added in v0.1.4

func NewInvalidSessionError(sessionID string) error

func NewInvalidStateError added in v0.1.4

func NewInvalidStateError(sessionID string) error

NewInvalidStateError creates a new error for invalid session state

func NewLargeMessage

func NewLargeMessage(size int) *bytes.Buffer

NewLargeMessage is a helper for creating large message buffers

func NewSessionActiveError added in v0.1.4

func NewSessionActiveError(sessionID string) error

Error constructors for common operations

func NewSessionNotFoundError added in v0.1.4

func NewSessionNotFoundError(sessionID string) error

func NewSessionSuspendedError added in v0.1.4

func NewSessionSuspendedError(sessionID string) error

func NewUnauthorizedError added in v0.1.4

func NewUnauthorizedError(sessionID string) error

func ParseTopicPath

func ParseTopicPath(topic string) (pkg, service, method, sessionID, direction string, err error)

ParseTopicPath extracts components from an MQTT topic path
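
The actual topic layout used by BuildTopicPath and ParseTopicPath is not shown on this page. As a rough sketch only, assuming the five components are simply joined with "/" in argument order, a build/parse pair could look like this (buildTopic and parseTopic are hypothetical stand-ins, not the library functions):

```go
package main

import (
	"fmt"
	"strings"
)

// buildTopic joins the components with "/". The layout is an assumption
// made for illustration; the real BuildTopicPath may differ.
func buildTopic(pkg, service, method, sessionID, direction string) string {
	return strings.Join([]string{pkg, service, method, sessionID, direction}, "/")
}

// parseTopic is the inverse of buildTopic and rejects topics that do not
// have exactly five segments.
func parseTopic(topic string) (pkg, service, method, sessionID, direction string, err error) {
	parts := strings.Split(topic, "/")
	if len(parts) != 5 {
		return "", "", "", "", "", fmt.Errorf("topic %q: want 5 segments, got %d", topic, len(parts))
	}
	return parts[0], parts[1], parts[2], parts[3], parts[4], nil
}

func main() {
	topic := buildTopic("echo.v1", "EchoService", "Echo", "sess-1", "request")
	fmt.Println(topic)
	fmt.Println(parseTopic(topic))
}
```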

func ReleaseLargeMessage

func ReleaseLargeMessage(buf *bytes.Buffer)

ReleaseLargeMessage returns a large message buffer to the pool
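
The pairing of NewLargeMessage and ReleaseLargeMessage suggests a buffer pool. A minimal sketch of that pattern with sync.Pool follows; getBuffer and putBuffer are hypothetical names, and the library's own pooling strategy may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool recycles buffers so large messages do not allocate on every call.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// getBuffer returns an empty buffer with room for at least size bytes.
func getBuffer(size int) *bytes.Buffer {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	buf.Grow(size)
	return buf
}

// putBuffer clears the buffer and hands it back to the pool.
// The caller must not use buf after this call.
func putBuffer(buf *bytes.Buffer) {
	buf.Reset()
	bufPool.Put(buf)
}

func main() {
	buf := getBuffer(64 * 1024)
	buf.WriteString("payload")
	fmt.Println(buf.Len(), buf.Cap() >= 64*1024)
	putBuffer(buf)
}
```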

func SetMaxFragmentSize

func SetMaxFragmentSize(size int)

func UnsafeBytes added in v0.1.3

func UnsafeBytes(s string) []byte

UnsafeBytes returns the contents of a string as a byte slice without allocation.

func UnsafeString added in v0.1.3

func UnsafeString(b []byte) string

UnsafeString returns the contents of a byte slice as a string without allocation

Types

type BridgeConfig added in v0.1.3

type BridgeConfig struct {
	// contains filtered or unexported fields
}

BridgeConfig holds bridge-specific configuration

type BridgeError

type BridgeError struct {
	Op      string // Operation that failed
	Message string // Human-readable error message
	Err     error  // Underlying error if any
}

BridgeError represents a base error type for bridge-related errors

func (*BridgeError) Error

func (e *BridgeError) Error() string

func (*BridgeError) Is added in v0.1.4

func (e *BridgeError) Is(target error) bool

Is implements the interface for errors.Is functionality
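
How Is decides that two errors match is not documented here. The sketch below shows one common approach, assuming errors are matched by their Message field; bridgeError is a local stand-in for BridgeError, and the matching rule is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// bridgeError is a local stand-in for the documented BridgeError.
type bridgeError struct {
	Op      string
	Message string
	Err     error
}

func (e *bridgeError) Error() string {
	if e.Err != nil {
		return fmt.Sprintf("%s: %s: %v", e.Op, e.Message, e.Err)
	}
	return fmt.Sprintf("%s: %s", e.Op, e.Message)
}

func (e *bridgeError) Unwrap() error { return e.Err }

// Is matches any *bridgeError carrying the same Message (an assumed rule),
// so a freshly built error can match a package-level sentinel.
func (e *bridgeError) Is(target error) bool {
	t, ok := target.(*bridgeError)
	return ok && t.Message == e.Message
}

var (
	errSessionNotFound = &bridgeError{Message: "session not found"}
	errSessionClosed   = &bridgeError{Message: "session is closed"}
)

// lookup always fails, wrapping a new error value rather than the sentinel.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, &bridgeError{Op: "lookup", Message: "session not found"})
}

func isNotFound(err error) bool { return errors.Is(err, errSessionNotFound) }
func isClosed(err error) bool   { return errors.Is(err, errSessionClosed) }

func main() {
	err := lookup("abc")
	fmt.Println(isNotFound(err), isClosed(err))
}
```

Matching on a field rather than on pointer identity lets constructors attach per-call details, such as a session ID, while callers still compare against a fixed sentinel.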

func (*BridgeError) Unwrap

func (e *BridgeError) Unwrap() error

type BridgeHook added in v0.1.3

type BridgeHook interface {
	// ID returns the unique identifier for this hook
	ID() string

	// Init initializes the hook with the provided configuration
	Init(config any) error

	// Stop gracefully stops the hook
	Stop() error

	// Provides indicates whether this hook provides the specified functionality
	Provides(b byte) bool

	// SetOpts is called by the server to propagate internal values
	SetOpts(l *zap.Logger, o *HookOptions)

	// OnMessageReceived processes incoming messages
	OnMessageReceived(msg []byte) []byte

	// OnSessionCreated is called when a new session is created
	OnSessionCreated(session *SessionInfo) error

	// OnSessionResumed is called when a session is resumed
	OnSessionResumed(session *SessionInfo) error

	// OnSessionSuspended is called when a session is suspended
	OnSessionSuspended(session *SessionInfo) error

	// OnSessionDisconnected is called when a session is disconnected
	OnSessionDisconnected(session *SessionInfo) error

	// StoredSessions returns stored sessions
	StoredSessions() ([]SessionInfo, error)
}

BridgeHook defines the interface for bridge hooks

type BridgeHookBase added in v0.1.4

type BridgeHookBase struct {
	BridgeHook
	Log  *zap.Logger
	Opts *HookOptions
}

BridgeHookBase provides a set of default methods for each hook
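
A base type with default methods lets a concrete hook override only what it needs and advertise that through Provides. The sketch below illustrates the pattern with a reduced, hypothetical interface (hook, hookBase, upperHook and dispatch are illustrative names, not library types).

```go
package main

import (
	"bytes"
	"fmt"
)

// Hook capability identifiers, mirroring the iota-style constants above.
const (
	hookOnMessageReceived byte = iota
	hookOnSessionCreated
)

// hook is a reduced version of the documented BridgeHook interface.
type hook interface {
	ID() string
	Provides(b byte) bool
	OnMessageReceived(msg []byte) []byte
}

// hookBase supplies no-op defaults for every method.
type hookBase struct{}

func (hookBase) ID() string                          { return "base" }
func (hookBase) Provides(byte) bool                  { return false }
func (hookBase) OnMessageReceived(msg []byte) []byte { return msg }

// upperHook embeds hookBase and overrides only what it implements.
type upperHook struct{ hookBase }

func (upperHook) ID() string                          { return "upper" }
func (upperHook) Provides(b byte) bool                { return b == hookOnMessageReceived }
func (upperHook) OnMessageReceived(msg []byte) []byte { return bytes.ToUpper(msg) }

// dispatch runs msg through every hook that declares OnMessageReceived.
func dispatch(hooks []hook, msg []byte) []byte {
	for _, h := range hooks {
		if h.Provides(hookOnMessageReceived) {
			msg = h.OnMessageReceived(msg)
		}
	}
	return msg
}

func main() {
	out := dispatch([]hook{hookBase{}, upperHook{}}, []byte("ping"))
	fmt.Println(string(out))
}
```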

func (*BridgeHookBase) ID added in v0.1.4

func (h *BridgeHookBase) ID() string

ID returns the ID of the hook

func (*BridgeHookBase) Init added in v0.1.4

func (h *BridgeHookBase) Init(config any) error

Init initializes the hook

func (*BridgeHookBase) OnMessageReceived added in v0.1.4

func (h *BridgeHookBase) OnMessageReceived(msg []byte) []byte

OnMessageReceived processes incoming messages

func (*BridgeHookBase) OnSessionCreated added in v0.1.4

func (h *BridgeHookBase) OnSessionCreated(session *SessionInfo) error

OnSessionCreated is called when a new session is created

func (*BridgeHookBase) OnSessionDisconnected added in v0.1.4

func (h *BridgeHookBase) OnSessionDisconnected(session *SessionInfo) error

OnSessionDisconnected is called when a session is disconnected

func (*BridgeHookBase) OnSessionResumed added in v0.1.4

func (h *BridgeHookBase) OnSessionResumed(session *SessionInfo) error

OnSessionResumed is called when a session is resumed

func (*BridgeHookBase) OnSessionSuspended added in v0.1.4

func (h *BridgeHookBase) OnSessionSuspended(session *SessionInfo) error

OnSessionSuspended is called when a session is suspended

func (*BridgeHookBase) Provides added in v0.1.4

func (h *BridgeHookBase) Provides(b byte) bool

Provides indicates which methods a hook provides

func (*BridgeHookBase) SetOpts added in v0.1.4

func (h *BridgeHookBase) SetOpts(l *zap.Logger, opts *HookOptions)

SetOpts sets the options for the hook

func (*BridgeHookBase) Stop added in v0.1.4

func (h *BridgeHookBase) Stop() error

Stop stops the hook

func (*BridgeHookBase) StoredSessions added in v0.1.4

func (h *BridgeHookBase) StoredSessions() ([]SessionInfo, error)

StoredSessions returns stored sessions

type BridgeHooks added in v0.1.3

type BridgeHooks struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BridgeHooks manages a collection of hooks

func (*BridgeHooks) Add added in v0.1.3

func (h *BridgeHooks) Add(hook BridgeHook, config any) error

Add adds a new hook to the collection

func (*BridgeHooks) GetAll added in v0.1.3

func (h *BridgeHooks) GetAll() []BridgeHook

GetAll returns a slice of all the hooks.

func (*BridgeHooks) Len added in v0.1.3

func (h *BridgeHooks) Len() int64

Len returns the number of hooks added.

func (*BridgeHooks) OnMessageReceived added in v0.1.3

func (h *BridgeHooks) OnMessageReceived(msg []byte) []byte

OnMessageReceived processes a message through all hooks

func (*BridgeHooks) OnSessionCreated added in v0.1.4

func (h *BridgeHooks) OnSessionCreated(session *SessionInfo) error

OnSessionCreated calls the OnSessionCreated hook for all hooks that provide it

func (*BridgeHooks) OnSessionDisconnected added in v0.1.4

func (h *BridgeHooks) OnSessionDisconnected(session *SessionInfo) error

OnSessionDisconnected calls the OnSessionDisconnected hook for all hooks that provide it

func (*BridgeHooks) OnSessionResumed added in v0.1.4

func (h *BridgeHooks) OnSessionResumed(session *SessionInfo) error

OnSessionResumed calls the OnSessionResumed hook for all hooks that provide it

func (*BridgeHooks) OnSessionSuspended added in v0.1.4

func (h *BridgeHooks) OnSessionSuspended(session *SessionInfo) error

OnSessionSuspended calls the OnSessionSuspended hook for all hooks that provide it

func (*BridgeHooks) Provides added in v0.1.3

func (h *BridgeHooks) Provides(b ...byte) bool

Provides returns true if any one hook provides any of the requested hook methods.

func (*BridgeHooks) Stop added in v0.1.3

func (h *BridgeHooks) Stop()

Stop stops all hooks

func (*BridgeHooks) StoredSessions added in v0.1.4

func (h *BridgeHooks) StoredSessions() ([]SessionInfo, error)

StoredSessions calls the StoredSessions hook for all hooks that provide it

type BridgeOption added in v0.1.3

type BridgeOption func(*BridgeConfig)

BridgeOption configures bridge behavior

func WithCleanUpInterval added in v0.1.4

func WithCleanUpInterval(interval time.Duration) BridgeOption

func WithLogger

func WithLogger(logger *zap.Logger) BridgeOption

WithLogger sets the logger for the bridge

func WithMQTTClient

func WithMQTTClient(client mqtt.Client) BridgeOption

WithMQTTClient sets the MQTT client for the bridge

func WithMaxConnections added in v0.1.4

func WithMaxConnections(max int) BridgeOption

WithMaxConnections sets the maximum number of concurrent connections

func WithQoS added in v0.1.3

func WithQoS(qos byte) BridgeOption

WithQoS sets the MQTT QoS level for the bridge

func WithRateBurst added in v0.1.4

func WithRateBurst(burst int) BridgeOption

WithRateBurst sets the burst size for rate limiting

func WithRateLimit added in v0.1.4

func WithRateLimit(ops float64) BridgeOption

WithRateLimit sets the rate limit for operations

func WithRootTopic added in v0.1.3

func WithRootTopic(topic string) BridgeOption

WithRootTopic sets the root topic for the bridge
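
BridgeOption follows the functional options pattern. The sketch below shows how such options are typically applied over defaults; bridgeConfig, its fields and the default values are assumptions, since the real BridgeConfig fields are unexported.

```go
package main

import "fmt"

// bridgeConfig is a local stand-in; the real BridgeConfig fields are unexported.
type bridgeConfig struct {
	rootTopic      string
	qos            byte
	maxConnections int
}

// bridgeOption mutates a config, like the documented BridgeOption.
type bridgeOption func(*bridgeConfig)

func withRootTopic(topic string) bridgeOption {
	return func(c *bridgeConfig) { c.rootTopic = topic }
}

func withQoS(qos byte) bridgeOption {
	return func(c *bridgeConfig) { c.qos = qos }
}

func withMaxConnections(max int) bridgeOption {
	return func(c *bridgeConfig) { c.maxConnections = max }
}

// newConfig starts from defaults (assumed values) and applies options in order.
func newConfig(opts ...bridgeOption) *bridgeConfig {
	cfg := &bridgeConfig{rootTopic: "bridge", qos: 1, maxConnections: 100}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withRootTopic("devices"), withQoS(2))
	fmt.Printf("%+v\n", *cfg)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.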

type BridgeSessionState added in v0.1.3

type BridgeSessionState int

BridgeSessionState represents the current state of a bridge session

const (
	BridgeSessionStateActive BridgeSessionState = iota
	BridgeSessionStateSuspended
	BridgeSessionStateClosed
)

func (BridgeSessionState) String added in v0.1.3

func (s BridgeSessionState) String() string

String returns the string representation of BridgeSessionState

type Frame

type Frame struct {
	Header *Header
	Data   []byte
}

Frame represents a complete protocol frame including header and payload

func FrameMessage

func FrameMessage(data []byte, seqNum uint64, msgType MessageType) []Frame

FrameMessage splits a large message into frames
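
The splitting step can be sketched as plain chunking. This example assumes a payload is cut into consecutive chunks of at most MaxFragmentSize (10 KB) bytes; splitPayload is a hypothetical helper and does not build the Frame headers that FrameMessage would attach.

```go
package main

import "fmt"

const maxFragmentSize = 10 * 1024 // mirrors the documented 10KB default

// splitPayload cuts data into consecutive chunks of at most maxSize bytes.
// The chunks share memory with data; copy them if data will be reused.
func splitPayload(data []byte, maxSize int) [][]byte {
	if maxSize <= 0 || len(data) == 0 {
		return nil
	}
	chunks := make([][]byte, 0, (len(data)+maxSize-1)/maxSize)
	for start := 0; start < len(data); start += maxSize {
		end := start + maxSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, data[start:end])
	}
	return chunks
}

func main() {
	chunks := splitPayload(make([]byte, 25*1024), maxFragmentSize)
	for i, c := range chunks {
		last := i == len(chunks)-1
		fmt.Printf("fragment %d/%d: %d bytes, last=%v\n", i+1, len(chunks), len(c), last)
	}
}
```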

func NewFrame

func NewFrame(msgType MessageType, seqNum uint64, data []byte, opts ...FrameOption) (*Frame, error)

NewFrame creates a new Frame with validated parameters

func UnmarshalFrame

func UnmarshalFrame(data []byte) (Frame, error)

UnmarshalFrame converts a byte slice into a Frame

func (*Frame) Marshal

func (f *Frame) Marshal() []byte

Marshal converts a Frame into a single byte slice ready for transmission

type FrameOption

type FrameOption func(*Frame) error

FrameOption defines options for frame creation

func WithFragmentation

func WithFragmentation(fragmentID uint16, total uint16, seq uint16, isLast bool) FrameOption

WithFragmentation sets fragmentation parameters for a frame

func WithStreamID

func WithStreamID(streamID string) FrameOption

WithStreamID sets the StreamID for a frame

type Header

type Header struct {
	Type           MessageType // 1 byte
	SequenceNumber uint64      // 8 bytes
	FragmentID     uint16      // 2 bytes
	FragmentTotal  uint16      // 2 bytes
	FragmentSeq    uint16      // 2 bytes
	IsLastFragment bool        // 1 byte
	StreamID       string      // Variable length - we'll need to adjust marshaling/unmarshaling
}
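
The fixed-size fields add up to 1 + 8 + 2 + 2 + 2 + 1 = 16 bytes, which matches MinHeaderSize. The sketch below encodes and decodes just those fields, assuming declaration order and big-endian byte order; the actual wire layout, and how the variable-length StreamID is appended, are not documented here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const fixedHeaderSize = 16 // 1 + 8 + 2 + 2 + 2 + 1 bytes

// header mirrors the fixed-size fields of the documented Header struct.
type header struct {
	Type           uint8
	SequenceNumber uint64
	FragmentID     uint16
	FragmentTotal  uint16
	FragmentSeq    uint16
	IsLastFragment bool
}

// encode writes the fields in declaration order, big-endian (an assumption).
func (h header) encode() []byte {
	buf := make([]byte, fixedHeaderSize)
	buf[0] = h.Type
	binary.BigEndian.PutUint64(buf[1:9], h.SequenceNumber)
	binary.BigEndian.PutUint16(buf[9:11], h.FragmentID)
	binary.BigEndian.PutUint16(buf[11:13], h.FragmentTotal)
	binary.BigEndian.PutUint16(buf[13:15], h.FragmentSeq)
	if h.IsLastFragment {
		buf[15] = 1
	}
	return buf
}

// decode is the inverse of encode and rejects short buffers.
func decode(buf []byte) (header, error) {
	if len(buf) < fixedHeaderSize {
		return header{}, errors.New("buffer shorter than fixed header")
	}
	return header{
		Type:           buf[0],
		SequenceNumber: binary.BigEndian.Uint64(buf[1:9]),
		FragmentID:     binary.BigEndian.Uint16(buf[9:11]),
		FragmentTotal:  binary.BigEndian.Uint16(buf[11:13]),
		FragmentSeq:    binary.BigEndian.Uint16(buf[13:15]),
		IsLastFragment: buf[15] == 1,
	}, nil
}

func main() {
	h := header{Type: 1, SequenceNumber: 42, FragmentID: 7, FragmentTotal: 3, FragmentSeq: 2, IsLastFragment: true}
	raw := h.encode()
	back, err := decode(raw)
	fmt.Println(len(raw), back == h, err)
}
```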

type HookLoadConfig added in v0.1.4

type HookLoadConfig struct {
	Hook   BridgeHook
	Config any
}

HookLoadConfig contains the hook and configuration as loaded from a configuration (usually file).

type HookOptions added in v0.1.4

type HookOptions struct {
}

HookOptions contains values which are inherited from the server on initialisation.

type ISessionStore added in v0.1.4

type ISessionStore interface {
	GetStoredSessions() (map[string]*SessionInfo, error)
}

ISessionStore defines the interface for session storage providers

type MQTTAddr

type MQTTAddr struct {
	// contains filtered or unexported fields
}

MQTTAddr implements net.Addr for MQTT connections

func (*MQTTAddr) Network

func (a *MQTTAddr) Network() string

func (*MQTTAddr) String

func (a *MQTTAddr) String() string

type MQTTBridge

type MQTTBridge struct {
	// contains filtered or unexported fields
}

func NewMQTTBridge

func NewMQTTBridge(mqttClient mqtt.Client, logger *zap.Logger, timeout time.Duration) *MQTTBridge

func (*MQTTBridge) GetServiceInfo

func (b *MQTTBridge) GetServiceInfo() map[string]grpc.ServiceInfo

GetServiceInfo implements grpc.ServiceInfoProvider

func (*MQTTBridge) RegisterService

func (b *MQTTBridge) RegisterService(desc *grpc.ServiceDesc, impl interface{})

RegisterService implements grpc.ServiceRegistrar

type MQTTNetBridge

type MQTTNetBridge struct {
	// contains filtered or unexported fields
}

MQTTNetBridge implements net.Listener over MQTT

func NewMQTTNetBridge

func NewMQTTNetBridge(mqttClient mqtt.Client, bridgeID string, opts ...BridgeOption) *MQTTNetBridge

NewMQTTNetBridge creates a new bridge that listens on a specific bridgeID

func (*MQTTNetBridge) Accept

func (b *MQTTNetBridge) Accept() (net.Conn, error)

Accept implements net.Listener.Accept

func (*MQTTNetBridge) AddHook added in v0.1.3

func (b *MQTTNetBridge) AddHook(hook BridgeHook, config any) error

AddHook adds a new hook to the bridge

func (*MQTTNetBridge) Addr

func (b *MQTTNetBridge) Addr() net.Addr

Addr implements net.Listener.Addr

func (*MQTTNetBridge) Build added in v0.1.1

func (*MQTTNetBridge) CleanupStaleSessions added in v0.1.3

func (b *MQTTNetBridge) CleanupStaleSessions(timeout time.Duration)

CleanupStaleSessions removes sessions that have been suspended longer than the timeout

func (*MQTTNetBridge) Close

func (b *MQTTNetBridge) Close() error

Close implements net.Listener.Close

func (*MQTTNetBridge) Dial

func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string, opts ...SessionOption) (net.Conn, error)

Dial creates a new connection to a specific bridge

func (*MQTTNetBridge) DisconnectSession added in v0.1.4

func (b *MQTTNetBridge) DisconnectSession(sessionID string) error

DisconnectSession disconnects an active session and cleans it up

func (*MQTTNetBridge) HandleLifecycleMessage added in v0.1.4

func (b *MQTTNetBridge) HandleLifecycleMessage(payload []byte, topic string)

HandleLifecycleMessage processes lifecycle messages for sessions

func (*MQTTNetBridge) ListenOnUnixSocket added in v0.1.3

func (b *MQTTNetBridge) ListenOnUnixSocket(path string, addr string) error

ListenOnUnixSocket listens for connections on a unix socket and proxies them to the bridge

func (*MQTTNetBridge) ResumeSession added in v0.1.3

func (b *MQTTNetBridge) ResumeSession(ctx context.Context, targetBridgeID, sessionID string) (net.Conn, error)

ResumeSession attempts to resume a suspended session

func (*MQTTNetBridge) Scheme added in v0.1.1

func (b *MQTTNetBridge) Scheme() string

func (*MQTTNetBridge) SuspendSession added in v0.1.3

func (b *MQTTNetBridge) SuspendSession(sessionID string) error

SuspendSession suspends an active session for later resumption

func (*MQTTNetBridge) WriteOnUnixSocket added in v0.1.3

func (b *MQTTNetBridge) WriteOnUnixSocket(path string, addr string) (net.Conn, error)

type MQTTNetBridgeConn

type MQTTNetBridgeConn struct {
	// contains filtered or unexported fields
}

MQTTNetBridgeConn implements net.Conn over MQTT

func (*MQTTNetBridgeConn) Close

func (c *MQTTNetBridgeConn) Close() error

func (*MQTTNetBridgeConn) LocalAddr

func (c *MQTTNetBridgeConn) LocalAddr() net.Addr

func (*MQTTNetBridgeConn) Read

func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)

Read implements net.Conn.Read

func (*MQTTNetBridgeConn) RemoteAddr

func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr

func (*MQTTNetBridgeConn) SessionID added in v0.1.4

func (c *MQTTNetBridgeConn) SessionID() string

func (*MQTTNetBridgeConn) SetDeadline

func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error

func (*MQTTNetBridgeConn) SetReadDeadline

func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error

func (*MQTTNetBridgeConn) SetWriteDeadline

func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error

func (*MQTTNetBridgeConn) Write

func (c *MQTTNetBridgeConn) Write(b []byte) (n int, err error)

type MessageType

type MessageType uint8

const (
	MessageTypeData MessageType = iota
	MessageTypeStreamInit
	MessageTypeStreamEnd
	MessageTypeHeader
	MessageTypeError
)

type Session

type Session struct {
	ID          string
	ServiceName string
	LastActive  time.Time
	State       SessionState
	// contains filtered or unexported fields
}

Session represents an active MQTT-gRPC bridge session

func NewSession

func NewSession(id, serviceName string, eventHandler SessionEventHandler) *Session

NewSession creates a new session with proper initialization

func (*Session) AddStream

func (s *Session) AddStream(methodName string) (*StreamContext, error)

AddStream adds a new stream to the session

func (*Session) Close

func (s *Session) Close()

Close closes all streams and the session itself

func (*Session) CloseStream

func (s *Session) CloseStream(streamID string) error

CloseStream gracefully closes a stream

func (*Session) UpdateActivity

func (s *Session) UpdateActivity(streamID string)

UpdateActivity updates the last active timestamp for the session and stream

type SessionConfig added in v0.1.3

type SessionConfig struct {
	SessionID   string
	State       BridgeSessionState
	Timeout     time.Duration
	DialTimeout time.Duration
}

SessionConfig holds session-specific configuration

type SessionError added in v0.1.4

type SessionError struct {
	BridgeError
	SessionID string
}

SessionError represents session-specific errors

func (*SessionError) Error added in v0.1.4

func (e *SessionError) Error() string

func (*SessionError) Is added in v0.1.4

func (e *SessionError) Is(target error) bool

Is implements the interface for errors.Is functionality

type SessionEvent

type SessionEvent int

SessionEvent represents different session lifecycle events

const (
	SessionEventCreated SessionEvent = iota
	SessionEventActivated
	SessionEventStreamStarted
	SessionEventStreamEnded
	SessionEventClosing
	SessionEventClosed
	SessionEventTimeout
)

type SessionEventHandler

type SessionEventHandler func(session *Session, event SessionEvent)

SessionEventHandler is called when session state changes occur

type SessionInfo added in v0.1.3

type SessionInfo struct {
	ID            string
	ClientID      string // Current/last client that owns this session
	State         BridgeSessionState
	LastActive    time.Time
	LastSuspended time.Time
	Connection    *MQTTNetBridgeConn
	Metadata      map[string]string
	Timeout       time.Duration
}

SessionInfo tracks session state and metadata

type SessionManager added in v0.1.4

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager handles session lifecycle and state management

func NewSessionManager added in v0.1.4

func NewSessionManager(bridge *MQTTNetBridge, logger *zap.Logger, cleanUpInterval time.Duration) *SessionManager

NewSessionManager creates a new session manager

func (*SessionManager) AddSession added in v0.1.4

func (sm *SessionManager) AddSession(sessionID string, session *SessionInfo)

AddSession adds a new session to the manager

func (*SessionManager) CleanupStaleSessions added in v0.1.4

func (sm *SessionManager) CleanupStaleSessions(defaultTimeout time.Duration)

CleanupStaleSessions removes sessions that have been suspended longer than the timeout
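
A minimal sketch of the cleanup rule described above, assuming a session's own Timeout takes precedence and defaultTimeout applies when it is zero. The sessionInfo type and the cleanupStale helper are local stand-ins, and the real method may also run hooks or close connections.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// sessionInfo keeps only the fields the cleanup rule needs.
type sessionInfo struct {
	suspended     bool
	lastSuspended time.Time
	timeout       time.Duration // zero means "use the default"
}

// cleanupStale deletes suspended sessions whose suspension has outlived
// their timeout and returns the removed IDs in sorted order.
func cleanupStale(sessions map[string]*sessionInfo, defaultTimeout time.Duration, now time.Time) []string {
	var removed []string
	for id, s := range sessions {
		if !s.suspended {
			continue // active sessions are never stale
		}
		timeout := s.timeout
		if timeout == 0 {
			timeout = defaultTimeout
		}
		if now.Sub(s.lastSuspended) > timeout {
			delete(sessions, id) // deleting during range is safe in Go
			removed = append(removed, id)
		}
	}
	sort.Strings(removed)
	return removed
}

// demoCleanup builds a small fixture and runs one cleanup pass.
func demoCleanup() ([]string, int) {
	now := time.Date(2025, 1, 29, 12, 0, 0, 0, time.UTC)
	sessions := map[string]*sessionInfo{
		"active": {suspended: false},
		"fresh":  {suspended: true, lastSuspended: now.Add(-1 * time.Minute)},
		"old":    {suspended: true, lastSuspended: now.Add(-10 * time.Minute)},
		"custom": {suspended: true, lastSuspended: now.Add(-10 * time.Minute), timeout: time.Hour},
	}
	removed := cleanupStale(sessions, 5*time.Minute, now)
	return removed, len(sessions)
}

func main() {
	fmt.Println(demoCleanup())
}
```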

func (*SessionManager) Close added in v0.1.4

func (sm *SessionManager) Close()

Close stops the session manager and cleans up resources

func (*SessionManager) CreateSession added in v0.1.4

func (sm *SessionManager) CreateSession(sessionID, clientID string, timeout time.Duration) *SessionInfo

CreateSession creates a new session

func (*SessionManager) DisconnectSession added in v0.1.4

func (sm *SessionManager) DisconnectSession(sessionID string) error

DisconnectSession disconnects and cleans up a session

func (*SessionManager) GetAllSessions added in v0.1.4

func (sm *SessionManager) GetAllSessions() map[string]*SessionInfo

GetAllSessions returns all sessions

func (*SessionManager) GetSession added in v0.1.4

func (sm *SessionManager) GetSession(sessionID string) (*SessionInfo, bool)

GetSession retrieves a session by ID

func (*SessionManager) HandleConnectionEstablished added in v0.1.4

func (sm *SessionManager) HandleConnectionEstablished(sessionID string, conn *MQTTNetBridgeConn, clientID string, timeout time.Duration) error

HandleConnectionEstablished handles a new or resumed connection

func (*SessionManager) HandleDisconnect added in v0.1.4

func (sm *SessionManager) HandleDisconnect(clientID, sessionID string) error

HandleDisconnect processes a disconnect message for a session

func (*SessionManager) HandleLifecycleMessage added in v0.1.4

func (sm *SessionManager) HandleLifecycleMessage(payload []byte, topic string)

HandleLifecycleMessage processes lifecycle messages for sessions

func (*SessionManager) HandleSessionError added in v0.1.4

func (sm *SessionManager) HandleSessionError(sessionID string, errorType string) error

HandleSessionError processes session error messages

func (*SessionManager) RemoveSession added in v0.1.4

func (sm *SessionManager) RemoveSession(sessionID string)

RemoveSession removes a session from the manager

func (*SessionManager) ResumeSession added in v0.1.4

func (sm *SessionManager) ResumeSession(sessionID string) error

ResumeSession attempts to resume a suspended session

func (*SessionManager) SuspendSession added in v0.1.4

func (sm *SessionManager) SuspendSession(sessionID string, clientID string) error

SuspendSession suspends an active session

func (*SessionManager) UpdateStore added in v0.1.4

func (sm *SessionManager) UpdateStore(store ISessionStore) error

UpdateStore updates the session store and loads sessions from it

type SessionOption added in v0.1.3

type SessionOption func(*SessionConfig)

SessionOption configures session behavior

func WithDialTimeout added in v0.1.4

func WithDialTimeout(timeout time.Duration) SessionOption

func WithSessionID added in v0.1.3

func WithSessionID(sessionID string) SessionOption

WithSessionID sets a specific session ID for connection

func WithSessionState added in v0.1.3

func WithSessionState(state BridgeSessionState) SessionOption

WithSessionState sets the initial session state

func WithSessionTimeout added in v0.1.4

func WithSessionTimeout(timeout time.Duration) SessionOption

type SessionState

type SessionState int

const (
	SessionStateActive SessionState = iota
	SessionStateClosing
	SessionStateClosed
)

type StreamContext

type StreamContext struct {
	StreamID   string // Usually method name + unique identifier
	Method     string
	Created    time.Time
	LastActive time.Time
	State      StreamState
	Cancel     context.CancelFunc // For cancelling individual streams
	Context    context.Context
	Extra      interface{} // For storing the serverStream
}

StreamContext holds the context for an active stream

type StreamState

type StreamState int

const (
	StreamStateActive StreamState = iota
	StreamStateClosing
	StreamStateClosed
)

Directories

Path Synopsis
pty/client command
pty/server command
hooks
