Documentation
¶
Index ¶
- Constants
- Variables
- func EnableCompression(enable bool)
- func EnableEncryption(enable bool, key []byte) error
- func Marshal(v any) ([]byte, error)
- func SendMessage(ctx context.Context, conn net.Conn, msg *Message) error
- func SetMarshaller(marshaller MarshallerFunc)
- func SetUnmarshaller(unmarshaller UnmarshallerFunc)
- func Unmarshal(data []byte, v any) error
- type Codec
- type CodecTestSuite
- type Config
- type ContentType
- type FragmentManager
- type HeartbeatManager
- func (hm *HeartbeatManager) GetStats() map[string]uint64
- func (hm *HeartbeatManager) IsRunning() bool
- func (hm *HeartbeatManager) RecordHeartbeat()
- func (hm *HeartbeatManager) SetInterval(interval time.Duration)
- func (hm *HeartbeatManager) SetOnFailure(fn func(error))
- func (hm *HeartbeatManager) SetTimeout(timeout time.Duration)
- func (hm *HeartbeatManager) Start()
- func (hm *HeartbeatManager) Stop()
- type Marshaller
- type MarshallerFunc
- type Message
- type MessageFlag
- type MessageType
- type MockConn
- func (m *MockConn) Close() error
- func (m *MockConn) LocalAddr() net.Addr
- func (m *MockConn) Read(b []byte) (n int, err error)
- func (m *MockConn) RemoteAddr() net.Addr
- func (m *MockConn) SetDeadline(t time.Time) error
- func (m *MockConn) SetReadDeadline(t time.Time) error
- func (m *MockConn) SetWriteDeadline(t time.Time) error
- func (m *MockConn) Write(b []byte) (n int, err error)
- type SerializationConfig
- type SerializationManager
- func (sm *SerializationManager) Marshal(v any) ([]byte, error)
- func (sm *SerializationManager) SetEncryptionKey(key []byte) error
- func (sm *SerializationManager) SetMarshaller(marshaller Marshaller)
- func (sm *SerializationManager) SetUnmarshaller(unmarshaller Unmarshaller)
- func (sm *SerializationManager) Unmarshal(data []byte, v any) error
- type Stats
- type Unmarshaller
- type UnmarshallerFunc
Constants ¶
const ( ProtocolVersion = uint8(1) MaxMessageSize = 64 * 1024 * 1024 // 64MB default limit MaxHeaderSize = 1024 * 1024 // 1MB header limit MaxQueueLength = 255 // Max queue name length FragmentationThreshold = 16 * 1024 * 1024 // Messages larger than 16MB will be fragmented FragmentSize = 8 * 1024 * 1024 // 8MB fragment size MaxFragments = 256 // Maximum fragments per message )
Protocol version for backward compatibility
Variables ¶
var ( ErrMessageTooLarge = errors.New("message exceeds maximum size") ErrInvalidMessage = errors.New("invalid message format") ErrInvalidQueue = errors.New("invalid queue name") ErrInvalidCommand = errors.New("invalid command") ErrConnectionClosed = errors.New("connection closed") ErrTimeout = errors.New("operation timeout") ErrProtocolMismatch = errors.New("protocol version mismatch") ErrFragmentationRequired = errors.New("message requires fragmentation") ErrInvalidFragment = errors.New("invalid message fragment") ErrFragmentTimeout = errors.New("timed out waiting for fragments") ErrFragmentMissing = errors.New("missing fragments in sequence") )
Error definitions
var ( ErrSerializationFailed = errors.New("serialization failed") ErrDeserializationFailed = errors.New("deserialization failed") ErrCompressionFailed = errors.New("compression failed") ErrDecompressionFailed = errors.New("decompression failed") ErrEncryptionFailed = errors.New("encryption failed") ErrDecryptionFailed = errors.New("decryption failed") ErrInvalidKey = errors.New("invalid encryption key") )
Error definitions for serialization
Functions ¶
func EnableCompression ¶ added in v0.0.17
func EnableCompression(enable bool)
func EnableEncryption ¶ added in v0.0.17
func SendMessage ¶
SendMessage Backward compatibility functions
func SetMarshaller ¶
func SetMarshaller(marshaller MarshallerFunc)
Global functions for backward compatibility
func SetUnmarshaller ¶
func SetUnmarshaller(unmarshaller UnmarshallerFunc)
Types ¶
type Codec ¶ added in v0.0.17
type Codec struct {
// contains filtered or unexported fields
}
Codec handles message encoding/decoding with configuration
func (*Codec) ReadMessage ¶ added in v0.0.17
ReadMessage reads a message WITHOUT any timeouts for persistent broker-consumer connections
func (*Codec) ResetStats ¶ added in v0.0.17
func (c *Codec) ResetStats()
ResetStats resets codec statistics
type CodecTestSuite ¶ added in v0.0.17
CodecTestSuite provides utilities for testing the codec
func NewCodecTestSuite ¶ added in v0.0.17
func NewCodecTestSuite() *CodecTestSuite
NewCodecTestSuite creates a new codec test suite
func (*CodecTestSuite) FragmentationTest ¶ added in v0.0.17
func (ts *CodecTestSuite) FragmentationTest(payload []byte) error
FragmentationTest tests the fragmentation and reassembly of large messages
func (*CodecTestSuite) SendReceiveTest ¶ added in v0.0.17
func (ts *CodecTestSuite) SendReceiveTest(msg *Message) error
SendReceiveTest tests sending and receiving a message
type Config ¶ added in v0.0.17
type Config struct {
MaxMessageSize uint32
MaxHeaderSize uint32
MaxQueueLength uint8
ReadTimeout time.Duration
WriteTimeout time.Duration
EnableCompression bool
BufferPoolSize int
}
Config holds codec configuration
func DefaultConfig ¶ added in v0.0.17
func DefaultConfig() *Config
DefaultConfig returns default configuration with NO timeouts for persistent connections
type ContentType ¶ added in v0.0.17
type ContentType string
ContentType represents the content type of serialized data
const ( ContentTypeJSON ContentType = "application/json" ContentTypeMsgPack ContentType = "application/msgpack" ContentTypeCBOR ContentType = "application/cbor" )
type FragmentManager ¶ added in v0.0.17
type FragmentManager struct {
// contains filtered or unexported fields
}
FragmentManager handles message fragmentation and reassembly
func NewFragmentManager ¶ added in v0.0.17
func NewFragmentManager(codec *Codec, config *Config) *FragmentManager
NewFragmentManager creates a new fragment manager
func (*FragmentManager) Stop ¶ added in v0.0.17
func (fm *FragmentManager) Stop()
Stop stops the fragment manager
type HeartbeatManager ¶ added in v0.0.17
type HeartbeatManager struct {
// contains filtered or unexported fields
}
HeartbeatManager manages heartbeat messages for connection health monitoring
func NewHeartbeatManager ¶ added in v0.0.17
func NewHeartbeatManager(codec *Codec, conn net.Conn) *HeartbeatManager
NewHeartbeatManager creates a new heartbeat manager
func (*HeartbeatManager) GetStats ¶ added in v0.0.17
func (hm *HeartbeatManager) GetStats() map[string]uint64
GetStats returns heartbeat statistics
func (*HeartbeatManager) IsRunning ¶ added in v0.0.17
func (hm *HeartbeatManager) IsRunning() bool
IsRunning returns whether the heartbeat manager is running
func (*HeartbeatManager) RecordHeartbeat ¶ added in v0.0.17
func (hm *HeartbeatManager) RecordHeartbeat()
RecordHeartbeat records a received heartbeat
func (*HeartbeatManager) SetInterval ¶ added in v0.0.17
func (hm *HeartbeatManager) SetInterval(interval time.Duration)
SetInterval sets the heartbeat interval
func (*HeartbeatManager) SetOnFailure ¶ added in v0.0.17
func (hm *HeartbeatManager) SetOnFailure(fn func(error))
SetOnFailure sets the callback function for heartbeat failures
func (*HeartbeatManager) SetTimeout ¶ added in v0.0.17
func (hm *HeartbeatManager) SetTimeout(timeout time.Duration)
SetTimeout sets the heartbeat timeout
func (*HeartbeatManager) Start ¶ added in v0.0.17
func (hm *HeartbeatManager) Start()
Start starts the heartbeat monitoring
func (*HeartbeatManager) Stop ¶ added in v0.0.17
func (hm *HeartbeatManager) Stop()
Stop stops the heartbeat monitoring
type Marshaller ¶ added in v0.0.17
type Marshaller interface {
Marshal(v any) ([]byte, error)
ContentType() ContentType
}
Marshaller interface for pluggable serialization
type MarshallerFunc ¶
MarshallerFunc adapter
func (MarshallerFunc) ContentType ¶ added in v0.0.17
func (f MarshallerFunc) ContentType() ContentType
type Message ¶
type Message struct {
Headers map[string]string `msgpack:"h" json:"headers"`
Queue string `msgpack:"q" json:"queue"`
Payload []byte `msgpack:"p" json:"payload"`
Command consts.CMD `msgpack:"c" json:"command"`
Version uint8 `msgpack:"v" json:"version"`
Timestamp int64 `msgpack:"t" json:"timestamp"`
ID string `msgpack:"i" json:"id,omitempty"`
Flags MessageFlag `msgpack:"f" json:"flags"`
Type MessageType `msgpack:"mt" json:"messageType"`
FragmentID uint32 `msgpack:"fid" json:"fragmentId,omitempty"`
Fragments uint16 `msgpack:"fs" json:"fragments,omitempty"`
Sequence uint16 `msgpack:"seq" json:"sequence,omitempty"`
}
Message represents a protocol message with validation
func Deserialize ¶
Deserialize converts bytes to message with validation
func NewMessage ¶
func NewMessage(cmd consts.CMD, payload []byte, queue string, headers map[string]string) (*Message, error)
NewMessage creates a validated message
type MessageFlag ¶ added in v0.0.17
type MessageFlag uint16
MessageFlag represents various flags that can be set on messages
const ( FlagNone MessageFlag = 0 FlagFragmented MessageFlag = 1 << iota FlagCompressed FlagEncrypted FlagHighPriority FlagRedelivered FlagNoAck )
type MessageType ¶ added in v0.0.17
type MessageType uint8
MessageType indicates the type of message being sent
const ( MessageTypeStandard MessageType = iota MessageTypeFragment MessageTypeHeartbeat MessageTypeAck MessageTypeError )
type MockConn ¶ added in v0.0.17
type MockConn struct {
ReadBuffer *bytes.Buffer
WriteBuffer *bytes.Buffer
ReadDelay time.Duration
WriteDelay time.Duration
IsClosed bool
ReadErr error
WriteErr error
// contains filtered or unexported fields
}
MockConn implements net.Conn for testing
func NewMockConn ¶ added in v0.0.17
func NewMockConn() *MockConn
NewMockConn creates a new mock connection
func (*MockConn) RemoteAddr ¶ added in v0.0.17
RemoteAddr implements the net.Conn RemoteAddr method
func (*MockConn) SetDeadline ¶ added in v0.0.17
SetDeadline implements the net.Conn SetDeadline method
func (*MockConn) SetReadDeadline ¶ added in v0.0.17
SetReadDeadline implements the net.Conn SetReadDeadline method
func (*MockConn) SetWriteDeadline ¶ added in v0.0.17
SetWriteDeadline implements the net.Conn SetWriteDeadline method
type SerializationConfig ¶ added in v0.0.17
type SerializationConfig struct {
EnableCompression bool
CompressionLevel int
MaxCompressionRatio float64
EnableEncryption bool
EncryptionKey []byte
PreferredCipher string // "chacha20poly1305" or "aes-gcm"
}
SerializationConfig holds serialization configuration
func DefaultSerializationConfig ¶ added in v0.0.17
func DefaultSerializationConfig() *SerializationConfig
DefaultSerializationConfig returns default configuration
type SerializationManager ¶ added in v0.0.17
type SerializationManager struct {
// contains filtered or unexported fields
}
SerializationManager manages serialization with configuration
func NewSerializationManager ¶ added in v0.0.17
func NewSerializationManager(config *SerializationConfig) *SerializationManager
NewSerializationManager creates a new serialization manager
func (*SerializationManager) Marshal ¶ added in v0.0.17
func (sm *SerializationManager) Marshal(v any) ([]byte, error)
Marshal serializes data with optional compression and encryption
func (*SerializationManager) SetEncryptionKey ¶ added in v0.0.17
func (sm *SerializationManager) SetEncryptionKey(key []byte) error
SetEncryptionKey sets the encryption key
func (*SerializationManager) SetMarshaller ¶ added in v0.0.17
func (sm *SerializationManager) SetMarshaller(marshaller Marshaller)
SetMarshaller sets custom marshaller
func (*SerializationManager) SetUnmarshaller ¶ added in v0.0.17
func (sm *SerializationManager) SetUnmarshaller(unmarshaller Unmarshaller)
SetUnmarshaller sets custom unmarshaller
type Stats ¶ added in v0.0.17
type Stats struct {
MessagesSent uint64
MessagesReceived uint64
BytesSent uint64
BytesReceived uint64
Errors uint64
// contains filtered or unexported fields
}
Stats tracks codec statistics
type Unmarshaller ¶ added in v0.0.17
type Unmarshaller interface {
Unmarshal(data []byte, v any) error
ContentType() ContentType
}
Unmarshaller interface for pluggable deserialization
type UnmarshallerFunc ¶
UnmarshallerFunc adapter
func (UnmarshallerFunc) ContentType ¶ added in v0.0.17
func (f UnmarshallerFunc) ContentType() ContentType