Documentation
¶
Index ¶
- Constants
- Variables
- func BuildTopicPath(pkg, service, method, sessionID, direction string) string
- func CopyBytes(b []byte) []byte
- func CopyString(s string) string
- func NewLargeMessage(size int) *bytes.Buffer
- func ParseTopicPath(topic string) (pkg, service, method, sessionID, direction string, err error)
- func ReleaseLargeMessage(buf *bytes.Buffer)
- func SetMaxFragmentSize(size int)
- func UnsafeBytes(s string) []byte
- func UnsafeString(b []byte) string
- type BridgeConfig
- type BridgeError
- type BridgeHook
- type BridgeHooks
- type BridgeOption
- type BridgeSessionState
- type Frame
- type FrameOption
- type Header
- type MQTTAddr
- type MQTTBridge
- type MQTTNetBridge
- func (b *MQTTNetBridge) Accept() (net.Conn, error)
- func (b *MQTTNetBridge) AddHook(hook BridgeHook, config any) error
- func (b *MQTTNetBridge) Addr() net.Addr
- func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
- func (b *MQTTNetBridge) CleanupStaleSessions(timeout time.Duration)
- func (b *MQTTNetBridge) Close() error
- func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string, opts ...SessionOption) (net.Conn, error)
- func (b *MQTTNetBridge) ListenOnUnixSocket(path string, addr string) error
- func (b *MQTTNetBridge) ResumeSession(ctx context.Context, targetBridgeID, sessionID string) (net.Conn, error)
- func (b *MQTTNetBridge) Scheme() string
- func (b *MQTTNetBridge) SuspendSession(sessionID string) error
- func (b *MQTTNetBridge) WriteOnUnixSocket(path string, addr string) (net.Conn, error)
- type MQTTNetBridgeConn
- func (c *MQTTNetBridgeConn) Close() error
- func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
- func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
- func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
- func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) Write(b []byte) (n int, err error)
- type MessageType
- type Session
- type SessionConfig
- type SessionEvent
- type SessionEventHandler
- type SessionInfo
- type SessionOption
- type SessionState
- type StreamContext
- type StreamState
Constants ¶
const ( MinHeaderSize = 16 // Fixed header size without StreamID MaxStreamIDSize = 256 // Maximum length for StreamID MinSequenceNum = 1 // Minimum sequence number MaxSequenceNum uint64 = 1<<64 - 1 // Maximum sequence number (uint64 max) )
const (
OnMessageReceived byte = iota
)
Variables ¶
var ( MaxFragmentSize = 10 * 1024 // 10KB per fragment HeaderSize = 16 // Fixed header size )
Functions ¶
func BuildTopicPath ¶
BuildTopicPath creates an MQTT topic path for a given service and method
func CopyString ¶ added in v0.1.3
CopyString copies a string to make it immutable
func NewLargeMessage ¶
Add a new helper for creating large messages
func ParseTopicPath ¶
ParseTopicPath extracts components from an MQTT topic path
func ReleaseLargeMessage ¶
Add a helper to return large message buffers to the pool
func SetMaxFragmentSize ¶
func SetMaxFragmentSize(size int)
func UnsafeBytes ¶ added in v0.1.3
UnsafeBytes returns a byte pointer without allocation.
func UnsafeString ¶ added in v0.1.3
UnsafeString returns a string pointer without allocation
Types ¶
type BridgeConfig ¶ added in v0.1.3
type BridgeConfig struct {
// contains filtered or unexported fields
}
BridgeConfig holds bridge-specific configuration
type BridgeError ¶
func (*BridgeError) Error ¶
func (e *BridgeError) Error() string
func (*BridgeError) Unwrap ¶
func (e *BridgeError) Unwrap() error
type BridgeHook ¶ added in v0.1.3
type BridgeHook interface {
OnMessageReceived(msg []byte) []byte
Provides(b byte) bool
Init(config any) error
Stop() error
ID() string
}
BridgeHook provides an interface for handling bridge-related events.
type BridgeHooks ¶ added in v0.1.3
BridgeHooks manages a collection of bridge hooks.
func (*BridgeHooks) Add ¶ added in v0.1.3
func (h *BridgeHooks) Add(hook BridgeHook, config any) error
Add adds and initializes a new hook.
func (*BridgeHooks) GetAll ¶ added in v0.1.3
func (h *BridgeHooks) GetAll() []BridgeHook
GetAll returns a slice of all the hooks.
func (*BridgeHooks) Len ¶ added in v0.1.3
func (h *BridgeHooks) Len() int64
Len returns the number of hooks added.
func (*BridgeHooks) OnMessageReceived ¶ added in v0.1.3
func (h *BridgeHooks) OnMessageReceived(msg []byte) []byte
OnMessageReceived is called when a message is received from a bridge connection.
func (*BridgeHooks) Provides ¶ added in v0.1.3
func (h *BridgeHooks) Provides(b ...byte) bool
Provides returns true if any one hook provides any of the requested hook methods.
func (*BridgeHooks) Stop ¶ added in v0.1.3
func (h *BridgeHooks) Stop()
Stop indicates all attached hooks to gracefully end.
type BridgeOption ¶ added in v0.1.3
type BridgeOption func(*BridgeConfig)
BridgeOption configures bridge behavior
func WithLogger ¶
func WithLogger(logger *zap.Logger) BridgeOption
WithLogger sets the logger for the bridge
func WithMQTTClient ¶
func WithMQTTClient(client mqtt.Client) BridgeOption
WithMQTTClient sets the MQTT client for the bridge
func WithQoS ¶ added in v0.1.3
func WithQoS(qos byte) BridgeOption
WithQoS sets the MQTT QoS level for the bridge
func WithRootTopic ¶ added in v0.1.3
func WithRootTopic(topic string) BridgeOption
WithRootTopic sets the root topic for the bridge
type BridgeSessionState ¶ added in v0.1.3
type BridgeSessionState int
BridgeSessionState represents the current state of a bridge session
const ( BridgeSessionStateActive BridgeSessionState = iota BridgeSessionStateSuspended BridgeSessionStateClosed )
func (BridgeSessionState) String ¶ added in v0.1.3
func (s BridgeSessionState) String() string
String returns the string representation of BridgeSessionState
type Frame ¶
Frame represents a complete protocol frame including header and payload
func FrameMessage ¶
func FrameMessage(data []byte, seqNum uint64, msgType MessageType) []Frame
FrameMessage splits a large message into frames
func NewFrame ¶
func NewFrame(msgType MessageType, seqNum uint64, data []byte, opts ...FrameOption) (*Frame, error)
NewFrame creates a new Frame with validated parameters
func UnmarshalFrame ¶
UnmarshalFrame converts a byte slice into a Frame
type FrameOption ¶
FrameOption defines options for frame creation
func WithFragmentation ¶
func WithFragmentation(fragmentID uint16, total uint16, seq uint16, isLast bool) FrameOption
WithFragmentation sets fragmentation parameters for a frame
func WithStreamID ¶
func WithStreamID(streamID string) FrameOption
WithStreamID sets the StreamID for a frame
type MQTTAddr ¶
type MQTTAddr struct {
// contains filtered or unexported fields
}
MQTTAddr implements net.Addr for MQTT connections
type MQTTBridge ¶
type MQTTBridge struct {
// contains filtered or unexported fields
}
func NewMQTTBridge ¶
func (*MQTTBridge) GetServiceInfo ¶
func (b *MQTTBridge) GetServiceInfo() map[string]grpc.ServiceInfo
GetServiceInfo implements grpc.ServiceInfoProvider
func (*MQTTBridge) RegisterService ¶
func (b *MQTTBridge) RegisterService(desc *grpc.ServiceDesc, impl interface{})
RegisterService implements grpc.ServiceRegistrar
type MQTTNetBridge ¶
type MQTTNetBridge struct {
// contains filtered or unexported fields
}
MQTTNetBridge implements net.Listener over MQTT
func NewMQTTNetBridge ¶
func NewMQTTNetBridge(mqttClient mqtt.Client, bridgeID string, opts ...BridgeOption) *MQTTNetBridge
NewMQTTNetBridge creates a new bridge that listens on a specific bridgeID
func (*MQTTNetBridge) Accept ¶
func (b *MQTTNetBridge) Accept() (net.Conn, error)
Accept implements net.Listener.Accept
func (*MQTTNetBridge) AddHook ¶ added in v0.1.3
func (b *MQTTNetBridge) AddHook(hook BridgeHook, config any) error
AddHook adds a new hook to the bridge
func (*MQTTNetBridge) Addr ¶
func (b *MQTTNetBridge) Addr() net.Addr
Addr implements net.Listener.Addr
func (*MQTTNetBridge) Build ¶ added in v0.1.1
func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
func (*MQTTNetBridge) CleanupStaleSessions ¶ added in v0.1.3
func (b *MQTTNetBridge) CleanupStaleSessions(timeout time.Duration)
CleanupStaleSessions removes sessions that have been suspended for longer than the timeout
func (*MQTTNetBridge) Close ¶
func (b *MQTTNetBridge) Close() error
Close implements net.Listener.Close
func (*MQTTNetBridge) Dial ¶
func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string, opts ...SessionOption) (net.Conn, error)
Dial creates a new connection to a specific bridge
func (*MQTTNetBridge) ListenOnUnixSocket ¶ added in v0.1.3
func (b *MQTTNetBridge) ListenOnUnixSocket(path string, addr string) error
listens for connections on a unix socket and proxies them to the bridge
func (*MQTTNetBridge) ResumeSession ¶ added in v0.1.3
func (b *MQTTNetBridge) ResumeSession(ctx context.Context, targetBridgeID, sessionID string) (net.Conn, error)
ResumeSession attempts to resume a suspended session that may still exist on the server bridge (not cleaned up yet)
func (*MQTTNetBridge) Scheme ¶ added in v0.1.1
func (b *MQTTNetBridge) Scheme() string
func (*MQTTNetBridge) SuspendSession ¶ added in v0.1.3
func (b *MQTTNetBridge) SuspendSession(sessionID string) error
SuspendSession suspends an active session for later resumption
func (*MQTTNetBridge) WriteOnUnixSocket ¶ added in v0.1.3
type MQTTNetBridgeConn ¶
type MQTTNetBridgeConn struct {
// contains filtered or unexported fields
}
MQTTNetBridgeConn implements net.Conn over MQTT
func (*MQTTNetBridgeConn) Close ¶
func (c *MQTTNetBridgeConn) Close() error
func (*MQTTNetBridgeConn) LocalAddr ¶
func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
func (*MQTTNetBridgeConn) Read ¶
func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
Implement net.Conn interface stubs (we'll flesh these out next)
func (*MQTTNetBridgeConn) RemoteAddr ¶
func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
func (*MQTTNetBridgeConn) SetDeadline ¶
func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetReadDeadline ¶
func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetWriteDeadline ¶
func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
type MessageType ¶
type MessageType uint8
const ( MessageTypeData MessageType = iota MessageTypeStreamInit MessageTypeStreamEnd MessageTypeHeader MessageTypeError )
type Session ¶
type Session struct {
ID string
ServiceName string
LastActive time.Time
State SessionState
// contains filtered or unexported fields
}
Session represents an active MQTT-GRPC bridge session
func NewSession ¶
func NewSession(id, serviceName string, eventHandler SessionEventHandler) *Session
NewSession creates a new session with proper initialization
func (*Session) AddStream ¶
func (s *Session) AddStream(methodName string) (*StreamContext, error)
AddStream adds a new stream to the session
func (*Session) CloseStream ¶
CloseStream gracefully closes a stream
func (*Session) UpdateActivity ¶
UpdateActivity updates the last active timestamp for the session and stream
type SessionConfig ¶ added in v0.1.3
type SessionConfig struct {
SessionID string
State BridgeSessionState
}
SessionConfig holds session-specific configuration
type SessionEvent ¶
type SessionEvent int
SessionEvent represents different session lifecycle events
const ( SessionEventCreated SessionEvent = iota SessionEventActivated SessionEventStreamStarted SessionEventStreamEnded SessionEventClosing SessionEventClosed SessionEventTimeout )
type SessionEventHandler ¶
type SessionEventHandler func(session *Session, event SessionEvent)
SessionEventHandler is called when session state changes occur
type SessionInfo ¶ added in v0.1.3
type SessionInfo struct {
ID string
ClientID string // Current/last client that owns this session
State BridgeSessionState
LastActive time.Time
LastSuspended time.Time
Connection *MQTTNetBridgeConn
Metadata map[string]string
}
SessionInfo tracks session state and metadata
type SessionOption ¶ added in v0.1.3
type SessionOption func(*SessionConfig)
SessionOption configures session behavior
func WithSessionID ¶ added in v0.1.3
func WithSessionID(sessionID string) SessionOption
WithSessionID sets a specific session ID for connection
func WithSessionState ¶ added in v0.1.3
func WithSessionState(state BridgeSessionState) SessionOption
WithSessionState sets the initial session state
type SessionState ¶
type SessionState int
const ( SessionStateActive SessionState = iota SessionStateClosing SessionStateClosed )
type StreamContext ¶
type StreamContext struct {
StreamID string // Usually method name + unique identifier
Method string
Created time.Time
LastActive time.Time
State StreamState
Cancel context.CancelFunc // For cancelling individual streams
Context context.Context
Extra interface{} // For storing the serverStream
}
StreamContext holds the context for an active stream
type StreamState ¶
type StreamState int
const ( StreamStateActive StreamState = iota StreamStateClosing StreamStateClosed )
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
grpc_bridge/client
command
|
|
|
grpc_bridge/server
command
|
|
|
net_bridge/client
command
|
|
|
net_bridge/server
command
|
|
|
pty/client
command
|
|
|
pty/server
command
|
|