codec

package
v0.0.17
Published: Aug 6, 2025 License: MIT Imports: 20 Imported by: 0

README

Message Queue Codec

This package provides a codec for serializing, transmitting, and deserializing messages in a distributed messaging system.

Features

  • Message Validation: Comprehensive validation of message format and content
  • Efficient Serialization: Pluggable serialization with JSON as default
  • Compression: Optional payload compression for large messages
  • Encryption: Optional message encryption for sensitive data
  • Large Message Support: Automatic fragmentation and reassembly of large messages
  • Connection Health: Heartbeat mechanism for connection monitoring
  • Performance Optimized: Buffer pooling, efficient memory usage
  • Robust Error Handling: Detailed error types and error wrapping
  • Timeout Management: Context-aware deadline handling
  • Observability: Built-in statistics tracking

Usage

Basic Message Sending/Receiving

// Create a message
msg, err := codec.NewMessage(consts.CmdPublish, payload, "my-queue", headers)
if err != nil {
    log.Fatalf("Failed to create message: %v", err)
}

// Send the message
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Name the instance c so it does not shadow the codec package
c := codec.NewCodec(codec.DefaultConfig())
if err := c.SendMessage(ctx, conn, msg); err != nil {
    log.Fatalf("Failed to send message: %v", err)
}

// Receive a message
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

receivedMsg, err := c.ReadMessage(ctx, conn)
if err != nil {
    log.Fatalf("Failed to receive message: %v", err)
}

Custom Serialization

// Set a custom marshaller/unmarshaller
codec.SetMarshaller(func(v any) ([]byte, error) {
    // Custom serialization logic
    return someCustomSerializer.Marshal(v)
})

codec.SetUnmarshaller(func(data []byte, v any) error {
    // Custom deserialization logic
    return someCustomSerializer.Unmarshal(data, v)
})
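
As an illustration of what a custom marshaller/unmarshaller pair could look like, the sketch below uses encoding/gob from the standard library. The MarshallerFunc and UnmarshallerFunc types are local copies of the documented signatures so the example compiles on its own; the event type and the roundTrip helper are hypothetical and not part of the package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Local copies of the adapter signatures documented by the package.
type MarshallerFunc func(v any) ([]byte, error)
type UnmarshallerFunc func(data []byte, v any) error

// gobMarshal encodes v with encoding/gob.
func gobMarshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gobUnmarshal decodes data into v, which must be a pointer.
func gobUnmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// event is a hypothetical payload type used only for this sketch.
type event struct {
	Name  string
	Count int
}

// roundTrip pushes an event through both adapters and returns the copy.
func roundTrip(in event) (event, error) {
	var m MarshallerFunc = gobMarshal
	var u UnmarshallerFunc = gobUnmarshal

	data, err := m(in)
	if err != nil {
		return event{}, err
	}
	var out event
	err = u(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(event{Name: "signup", Count: 3})
	fmt.Println(out, err)
}
```

With the real package, functions shaped like gobMarshal and gobUnmarshal would presumably be passed to codec.SetMarshaller and codec.SetUnmarshaller; both sides of a connection need to agree on the format.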

Enabling Compression

// Enable compression globally
codec.EnableCompression(true)

// Or configure it per codec instance
config := codec.DefaultConfig()
config.EnableCompression = true
c := codec.NewCodec(config) // c avoids shadowing the codec package
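
The README does not say which compression algorithm the package uses. Purely to illustrate what payload compression buys for repetitive data, here is a standalone sketch with compress/gzip; the choice of gzip and the helper names compress and decompress are assumptions made for this example.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips data into a new byte slice.
func compress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed bytes and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compress.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	payload := bytes.Repeat([]byte("queue-message "), 1000)

	packed, err := compress(payload)
	if err != nil {
		panic(err)
	}
	restored, err := decompress(packed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("original=%d compressed=%d intact=%t\n",
		len(payload), len(packed), bytes.Equal(payload, restored))
}
```

Small or already-compressed payloads can grow when compressed, which is one plausible reason the feature is optional.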

Enabling Encryption

// Generate a secure key (rand is crypto/rand)
key := make([]byte, 32) // 256-bit key
if _, err := rand.Read(key); err != nil {
    log.Fatalf("Failed to generate key: %v", err)
}

// Enable encryption globally; EnableEncryption returns an error
if err := codec.EnableEncryption(true, key); err != nil {
    log.Fatalf("Failed to enable encryption: %v", err)
}

// Or configure it per serialization manager
config := codec.DefaultSerializationConfig()
config.EnableEncryption = true
config.EncryptionKey = key
config.PreferredCipher = "chacha20poly1305" // or "aes-gcm"
sm := codec.NewSerializationManager(config)
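
To show what the "aes-gcm" option implies for a 32-byte key, the following standalone sketch seals and opens a payload with AES-256-GCM from the standard library. It is not the package's implementation: the helper names newGCM, seal, and open, and the nonce-prefixed layout, are assumptions made for illustration.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newGCM builds an AES-GCM AEAD; a 32-byte key selects AES-256.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal encrypts plaintext and prepends the random nonce to the result.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open splits off the nonce, then decrypts and authenticates the rest.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	sealed, err := seal(key, []byte("sensitive payload"))
	if err != nil {
		panic(err)
	}
	plain, err := open(key, sealed)
	fmt.Println(string(plain), err)
}
```

Because GCM is authenticated, open returns an error if the ciphertext was modified or the key is wrong, rather than returning corrupted plaintext.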

Connection Health Monitoring

// Create a heartbeat manager. The instance is named c because naming it
// codec would shadow the package and break codec.NewHeartbeatManager.
c := codec.NewCodec(codec.DefaultConfig())
hm := codec.NewHeartbeatManager(c, conn)

// Configure heartbeat
hm.SetInterval(15 * time.Second)
hm.SetTimeout(45 * time.Second)
hm.SetOnFailure(func(err error) {
    log.Printf("Heartbeat failure: %v", err)
    // Take action like closing connection
})

// Start heartbeat monitoring
hm.Start()
defer hm.Stop()
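
The interval and timeout above relate as follows: with a 15-second interval and a 45-second timeout, a peer can miss about three heartbeats before it is treated as failed. The sketch below works through that arithmetic. How HeartbeatManager actually evaluates staleness is not documented here, so isStale and missedBeats are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// Values taken from the README example above.
const (
	heartbeatInterval = 15 * time.Second
	heartbeatTimeout  = 45 * time.Second
)

// isStale reports whether more than timeout has passed since the last
// heartbeat was recorded.
func isStale(elapsed, timeout time.Duration) bool {
	return elapsed > timeout
}

// missedBeats returns how many whole intervals fit into elapsed.
func missedBeats(elapsed, interval time.Duration) int {
	if interval <= 0 || elapsed <= 0 {
		return 0
	}
	return int(elapsed / interval)
}

func main() {
	for _, elapsed := range []time.Duration{10 * time.Second, 45 * time.Second, 46 * time.Second} {
		fmt.Printf("elapsed=%v missed=%d stale=%t\n",
			elapsed,
			missedBeats(elapsed, heartbeatInterval),
			isStale(elapsed, heartbeatTimeout))
	}
}
```

Keeping the timeout at a multiple of the interval leaves room for a delayed or dropped heartbeat before the failure callback fires.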

Configuration

The codec behavior can be customized through the Config struct:

config := &codec.Config{
    MaxMessageSize:    32 * 1024 * 1024, // 32MB max message size
    MaxHeaderSize:     64 * 1024,       // 64KB max header size
    MaxQueueLength:    128,             // Max queue name length
    ReadTimeout:       15 * time.Second,
    WriteTimeout:      10 * time.Second,
    EnableCompression: true,
    BufferPoolSize:    2000,
}

c := codec.NewCodec(config) // c avoids shadowing the codec package

Error Handling

The codec provides detailed error types for different failure scenarios:

  • ErrMessageTooLarge: Message exceeds maximum size
  • ErrInvalidMessage: Invalid message format
  • ErrInvalidQueue: Invalid queue name
  • ErrInvalidCommand: Invalid command
  • ErrConnectionClosed: Connection closed
  • ErrTimeout: Operation timeout
  • ErrProtocolMismatch: Protocol version mismatch
  • ErrFragmentationRequired: Message requires fragmentation
  • ErrInvalidFragment: Invalid message fragment
  • ErrFragmentTimeout: Timed out waiting for fragments
  • ErrFragmentMissing: Missing fragments in sequence

Error handling example:

if err := codec.SendMessage(ctx, conn, msg); err != nil {
    if errors.Is(err, codec.ErrMessageTooLarge) {
        // Handle message size error
    } else if errors.Is(err, codec.ErrTimeout) {
        // Handle timeout error
    } else {
        // Handle other errors
    }
}
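
errors.Is matters here because the feature list mentions error wrapping: a wrapped sentinel no longer compares equal with ==. The standalone sketch below demonstrates this with a local stand-in for codec.ErrMessageTooLarge; the send and isTooLarge helpers are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errMessageTooLarge is a local stand-in for codec.ErrMessageTooLarge.
var errMessageTooLarge = errors.New("message exceeds maximum size")

// send is a hypothetical sender that wraps the sentinel with context.
func send(size, limit int) error {
	if size > limit {
		return fmt.Errorf("send %d bytes (limit %d): %w", size, limit, errMessageTooLarge)
	}
	return nil
}

// isTooLarge walks the wrap chain looking for the sentinel.
func isTooLarge(err error) bool {
	return errors.Is(err, errMessageTooLarge)
}

func main() {
	err := send(10, 5)
	fmt.Println(err)
	fmt.Println("errors.Is:", isTooLarge(err))
	fmt.Println("==:", err == errMessageTooLarge)
}
```

The direct comparison fails on the wrapped error while errors.Is succeeds, which is why the example above uses errors.Is rather than ==.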

Testing

The codec package includes testing utilities for validation and performance testing:

ts := codec.NewCodecTestSuite()

// Test basic message sending/receiving
msg, err := codec.NewMessage(consts.CmdPublish, []byte("test"), "test-queue", nil)
if err != nil {
    log.Fatalf("Failed to create message: %v", err)
}
if err := ts.SendReceiveTest(msg); err != nil {
    log.Fatalf("Test failed: %v", err)
}

// Test fragmentation/reassembly
largePayload := make([]byte, 20*1024*1024) // 20MB payload
if _, err := rand.Read(largePayload); err != nil { // Fill with random data
    log.Fatalf("Failed to generate payload: %v", err)
}
if err := ts.FragmentationTest(largePayload); err != nil {
    log.Fatalf("Fragmentation test failed: %v", err)
}

Documentation

Constants

const (
	ProtocolVersion        = uint8(1)
	MaxMessageSize         = 64 * 1024 * 1024 // 64MB default limit
	MaxHeaderSize          = 1024 * 1024      // 1MB header limit
	MaxQueueLength         = 255              // Max queue name length
	FragmentationThreshold = 16 * 1024 * 1024 // Messages larger than 16MB will be fragmented
	FragmentSize           = 8 * 1024 * 1024  // 8MB fragment size
	MaxFragments           = 256              // Maximum fragments per message
)

Protocol version (kept for backward compatibility) together with the default size and fragmentation limits
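
As a worked example of how these limits interact, the sketch below computes how many fragments a payload would need. It assumes ceiling division by FragmentSize for payloads above FragmentationThreshold; the package's actual splitting logic is not shown in this documentation, and fragmentCount is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented constants.
const (
	fragmentationThreshold = 16 * 1024 * 1024
	fragmentSize           = 8 * 1024 * 1024
	maxFragments           = 256
)

// fragmentCount returns the number of fragments needed for n bytes.
// Payloads at or below the threshold are sent whole (one message).
func fragmentCount(n int) (int, error) {
	if n <= fragmentationThreshold {
		return 1, nil
	}
	count := (n + fragmentSize - 1) / fragmentSize // ceiling division
	if count > maxFragments {
		return 0, errors.New("payload needs more fragments than allowed")
	}
	return count, nil
}

func main() {
	for _, mb := range []int{16, 20, 64} {
		count, err := fragmentCount(mb * 1024 * 1024)
		fmt.Printf("%d MB -> %d fragment(s), err=%v\n", mb, count, err)
	}
}
```

Under these assumptions the 20MB payload from the Testing section splits into three fragments, and a message at the 64MB default limit needs eight, well below MaxFragments.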

Variables

var (
	ErrMessageTooLarge       = errors.New("message exceeds maximum size")
	ErrInvalidMessage        = errors.New("invalid message format")
	ErrInvalidQueue          = errors.New("invalid queue name")
	ErrInvalidCommand        = errors.New("invalid command")
	ErrConnectionClosed      = errors.New("connection closed")
	ErrTimeout               = errors.New("operation timeout")
	ErrProtocolMismatch      = errors.New("protocol version mismatch")
	ErrFragmentationRequired = errors.New("message requires fragmentation")
	ErrInvalidFragment       = errors.New("invalid message fragment")
	ErrFragmentTimeout       = errors.New("timed out waiting for fragments")
	ErrFragmentMissing       = errors.New("missing fragments in sequence")
)

Error definitions

var (
	ErrSerializationFailed   = errors.New("serialization failed")
	ErrDeserializationFailed = errors.New("deserialization failed")
	ErrCompressionFailed     = errors.New("compression failed")
	ErrDecompressionFailed   = errors.New("decompression failed")
	ErrEncryptionFailed      = errors.New("encryption failed")
	ErrDecryptionFailed      = errors.New("decryption failed")
	ErrInvalidKey            = errors.New("invalid encryption key")
)

Error definitions for serialization

Functions

func EnableCompression added in v0.0.17

func EnableCompression(enable bool)

func EnableEncryption added in v0.0.17

func EnableEncryption(enable bool, key []byte) error

func Marshal

func Marshal(v any) ([]byte, error)

func SendMessage

func SendMessage(ctx context.Context, conn net.Conn, msg *Message) error

SendMessage is a package-level function kept for backward compatibility

func SetMarshaller

func SetMarshaller(marshaller MarshallerFunc)

Global functions for backward compatibility

func SetUnmarshaller

func SetUnmarshaller(unmarshaller UnmarshallerFunc)

func Unmarshal

func Unmarshal(data []byte, v any) error

Types

type Codec added in v0.0.17

type Codec struct {
	// contains filtered or unexported fields
}

Codec handles message encoding/decoding with configuration

func NewCodec added in v0.0.17

func NewCodec(config *Config) *Codec

NewCodec creates a new codec with configuration

func (*Codec) GetStats added in v0.0.17

func (c *Codec) GetStats() Stats

GetStats returns codec statistics

func (*Codec) ReadMessage added in v0.0.17

func (c *Codec) ReadMessage(ctx context.Context, conn net.Conn) (*Message, error)

ReadMessage reads a message without applying any timeouts; it is intended for persistent broker-consumer connections

func (*Codec) ResetStats added in v0.0.17

func (c *Codec) ResetStats()

ResetStats resets codec statistics

func (*Codec) SendMessage added in v0.0.17

func (c *Codec) SendMessage(ctx context.Context, conn net.Conn, msg *Message) error

SendMessage sends a message with proper error handling and timeouts

type CodecTestSuite added in v0.0.17

type CodecTestSuite struct {
	Codec  *Codec
	Config *Config
}

CodecTestSuite provides utilities for testing the codec

func NewCodecTestSuite added in v0.0.17

func NewCodecTestSuite() *CodecTestSuite

NewCodecTestSuite creates a new codec test suite

func (*CodecTestSuite) FragmentationTest added in v0.0.17

func (ts *CodecTestSuite) FragmentationTest(payload []byte) error

FragmentationTest tests the fragmentation and reassembly of large messages

func (*CodecTestSuite) SendReceiveTest added in v0.0.17

func (ts *CodecTestSuite) SendReceiveTest(msg *Message) error

SendReceiveTest tests sending and receiving a message

type Config added in v0.0.17

type Config struct {
	MaxMessageSize    uint32
	MaxHeaderSize     uint32
	MaxQueueLength    uint8
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	EnableCompression bool
	BufferPoolSize    int
}

Config holds codec configuration

func DefaultConfig added in v0.0.17

func DefaultConfig() *Config

DefaultConfig returns the default configuration, which sets no timeouts so that it suits persistent connections

type ContentType added in v0.0.17

type ContentType string

ContentType represents the content type of serialized data

const (
	ContentTypeJSON    ContentType = "application/json"
	ContentTypeMsgPack ContentType = "application/msgpack"
	ContentTypeCBOR    ContentType = "application/cbor"
)

type FragmentManager added in v0.0.17

type FragmentManager struct {
	// contains filtered or unexported fields
}

FragmentManager handles message fragmentation and reassembly

func NewFragmentManager added in v0.0.17

func NewFragmentManager(codec *Codec, config *Config) *FragmentManager

NewFragmentManager creates a new fragment manager

func (*FragmentManager) Stop added in v0.0.17

func (fm *FragmentManager) Stop()

Stop stops the fragment manager

type HeartbeatManager added in v0.0.17

type HeartbeatManager struct {
	// contains filtered or unexported fields
}

HeartbeatManager manages heartbeat messages for connection health monitoring

func NewHeartbeatManager added in v0.0.17

func NewHeartbeatManager(codec *Codec, conn net.Conn) *HeartbeatManager

NewHeartbeatManager creates a new heartbeat manager

func (*HeartbeatManager) GetStats added in v0.0.17

func (hm *HeartbeatManager) GetStats() map[string]uint64

GetStats returns heartbeat statistics

func (*HeartbeatManager) IsRunning added in v0.0.17

func (hm *HeartbeatManager) IsRunning() bool

IsRunning returns whether the heartbeat manager is running

func (*HeartbeatManager) RecordHeartbeat added in v0.0.17

func (hm *HeartbeatManager) RecordHeartbeat()

RecordHeartbeat records a received heartbeat

func (*HeartbeatManager) SetInterval added in v0.0.17

func (hm *HeartbeatManager) SetInterval(interval time.Duration)

SetInterval sets the heartbeat interval

func (*HeartbeatManager) SetOnFailure added in v0.0.17

func (hm *HeartbeatManager) SetOnFailure(fn func(error))

SetOnFailure sets the callback function for heartbeat failures

func (*HeartbeatManager) SetTimeout added in v0.0.17

func (hm *HeartbeatManager) SetTimeout(timeout time.Duration)

SetTimeout sets the heartbeat timeout

func (*HeartbeatManager) Start added in v0.0.17

func (hm *HeartbeatManager) Start()

Start starts the heartbeat monitoring

func (*HeartbeatManager) Stop added in v0.0.17

func (hm *HeartbeatManager) Stop()

Stop stops the heartbeat monitoring

type Marshaller added in v0.0.17

type Marshaller interface {
	Marshal(v any) ([]byte, error)
	ContentType() ContentType
}

Marshaller interface for pluggable serialization

type MarshallerFunc

type MarshallerFunc func(v any) ([]byte, error)

MarshallerFunc adapter

func (MarshallerFunc) ContentType added in v0.0.17

func (f MarshallerFunc) ContentType() ContentType

func (MarshallerFunc) Marshal

func (f MarshallerFunc) Marshal(v any) ([]byte, error)

type Message

type Message struct {
	Headers    map[string]string `msgpack:"h" json:"headers"`
	Queue      string            `msgpack:"q" json:"queue"`
	Payload    []byte            `msgpack:"p" json:"payload"`
	Command    consts.CMD        `msgpack:"c" json:"command"`
	Version    uint8             `msgpack:"v" json:"version"`
	Timestamp  int64             `msgpack:"t" json:"timestamp"`
	ID         string            `msgpack:"i" json:"id,omitempty"`
	Flags      MessageFlag       `msgpack:"f" json:"flags"`
	Type       MessageType       `msgpack:"mt" json:"messageType"`
	FragmentID uint32            `msgpack:"fid" json:"fragmentId,omitempty"`
	Fragments  uint16            `msgpack:"fs" json:"fragments,omitempty"`
	Sequence   uint16            `msgpack:"seq" json:"sequence,omitempty"`
}

Message represents a protocol message with validation
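
To show how the json struct tags shape the wire format, the sketch below marshals a reduced copy of the struct with encoding/json. It assumes the default JSON path uses encoding/json, which this documentation does not confirm; the message type keeps only four of the fields, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a reduced local copy of Message with the same json tags.
type message struct {
	Headers map[string]string `json:"headers"`
	Queue   string            `json:"queue"`
	Payload []byte            `json:"payload"`
	ID      string            `json:"id,omitempty"`
}

// encode marshals m and returns the JSON text.
func encode(m message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(message{Queue: "my-queue", Payload: []byte("hi")})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints {"headers":null,"queue":"my-queue","payload":"aGk="}
}
```

Two details follow from encoding/json itself: a []byte payload is emitted as a base64 string, and the empty ID disappears because of omitempty, while the nil Headers map is still written as null.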

func Deserialize

func Deserialize(data []byte) (*Message, error)

Deserialize converts bytes to message with validation

func NewMessage

func NewMessage(cmd consts.CMD, payload []byte, queue string, headers map[string]string) (*Message, error)

NewMessage creates a validated message

func ReadMessage

func ReadMessage(ctx context.Context, conn net.Conn) (*Message, error)

func (*Message) Serialize

func (m *Message) Serialize() ([]byte, error)

Serialize converts message to bytes with validation

func (*Message) Validate added in v0.0.17

func (m *Message) Validate(config *Config) error

Validate performs message validation

type MessageFlag added in v0.0.17

type MessageFlag uint16

MessageFlag represents various flags that can be set on messages

const (
	FlagNone       MessageFlag = 0
	FlagFragmented MessageFlag = 1 << iota
	FlagCompressed
	FlagEncrypted
	FlagHighPriority
	FlagRedelivered
	FlagNoAck
)
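
One detail of this declaration is easy to miss: iota is already 1 on the FlagFragmented line, so the flags start at 2 and bit 0 is unused. The sketch below reproduces the const block locally and prints the resulting values; the has helper is hypothetical and only illustrates ordinary bitmask checks.

```go
package main

import "fmt"

// Local copy of the documented MessageFlag declaration.
type MessageFlag uint16

const (
	FlagNone       MessageFlag = 0
	FlagFragmented MessageFlag = 1 << iota // iota == 1 here, so this is 2
	FlagCompressed
	FlagEncrypted
	FlagHighPriority
	FlagRedelivered
	FlagNoAck
)

// has reports whether every bit of want is set in f.
func has(f, want MessageFlag) bool {
	return f&want == want
}

func main() {
	fmt.Println(FlagFragmented, FlagCompressed, FlagEncrypted,
		FlagHighPriority, FlagRedelivered, FlagNoAck)
	// prints 2 4 8 16 32 64

	f := FlagCompressed | FlagEncrypted
	fmt.Println(has(f, FlagCompressed), has(f, FlagFragmented))
	// prints true false
}
```

Code that exchanges these flags with another implementation should use the numeric values shown here rather than assuming the first flag is 1.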

type MessageType added in v0.0.17

type MessageType uint8

MessageType indicates the type of message being sent

const (
	MessageTypeStandard MessageType = iota
	MessageTypeFragment
	MessageTypeHeartbeat
	MessageTypeAck
	MessageTypeError
)

type MockConn added in v0.0.17

type MockConn struct {
	ReadBuffer  *bytes.Buffer
	WriteBuffer *bytes.Buffer
	ReadDelay   time.Duration
	WriteDelay  time.Duration
	IsClosed    bool
	ReadErr     error
	WriteErr    error
	// contains filtered or unexported fields
}

MockConn implements net.Conn for testing

func NewMockConn added in v0.0.17

func NewMockConn() *MockConn

NewMockConn creates a new mock connection

func (*MockConn) Close added in v0.0.17

func (m *MockConn) Close() error

Close implements the net.Conn Close method

func (*MockConn) LocalAddr added in v0.0.17

func (m *MockConn) LocalAddr() net.Addr

LocalAddr implements the net.Conn LocalAddr method

func (*MockConn) Read added in v0.0.17

func (m *MockConn) Read(b []byte) (n int, err error)

Read implements the net.Conn Read method

func (*MockConn) RemoteAddr added in v0.0.17

func (m *MockConn) RemoteAddr() net.Addr

RemoteAddr implements the net.Conn RemoteAddr method

func (*MockConn) SetDeadline added in v0.0.17

func (m *MockConn) SetDeadline(t time.Time) error

SetDeadline implements the net.Conn SetDeadline method

func (*MockConn) SetReadDeadline added in v0.0.17

func (m *MockConn) SetReadDeadline(t time.Time) error

SetReadDeadline implements the net.Conn SetReadDeadline method

func (*MockConn) SetWriteDeadline added in v0.0.17

func (m *MockConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline implements the net.Conn SetWriteDeadline method

func (*MockConn) Write added in v0.0.17

func (m *MockConn) Write(b []byte) (n int, err error)

Write implements the net.Conn Write method

type SerializationConfig added in v0.0.17

type SerializationConfig struct {
	EnableCompression   bool
	CompressionLevel    int
	MaxCompressionRatio float64
	EnableEncryption    bool
	EncryptionKey       []byte
	PreferredCipher     string // "chacha20poly1305" or "aes-gcm"
}

SerializationConfig holds serialization configuration

func DefaultSerializationConfig added in v0.0.17

func DefaultSerializationConfig() *SerializationConfig

DefaultSerializationConfig returns default configuration

type SerializationManager added in v0.0.17

type SerializationManager struct {
	// contains filtered or unexported fields
}

SerializationManager manages serialization with configuration

func NewSerializationManager added in v0.0.17

func NewSerializationManager(config *SerializationConfig) *SerializationManager

NewSerializationManager creates a new serialization manager

func (*SerializationManager) Marshal added in v0.0.17

func (sm *SerializationManager) Marshal(v any) ([]byte, error)

Marshal serializes data with optional compression and encryption

func (*SerializationManager) SetEncryptionKey added in v0.0.17

func (sm *SerializationManager) SetEncryptionKey(key []byte) error

SetEncryptionKey sets the encryption key

func (*SerializationManager) SetMarshaller added in v0.0.17

func (sm *SerializationManager) SetMarshaller(marshaller Marshaller)

SetMarshaller sets custom marshaller

func (*SerializationManager) SetUnmarshaller added in v0.0.17

func (sm *SerializationManager) SetUnmarshaller(unmarshaller Unmarshaller)

SetUnmarshaller sets custom unmarshaller

func (*SerializationManager) Unmarshal added in v0.0.17

func (sm *SerializationManager) Unmarshal(data []byte, v any) error

Unmarshal deserializes data with optional decompression and decryption

type Stats added in v0.0.17

type Stats struct {
	MessagesSent     uint64
	MessagesReceived uint64
	BytesSent        uint64
	BytesReceived    uint64
	Errors           uint64
	// contains filtered or unexported fields
}

Stats tracks codec statistics

type Unmarshaller added in v0.0.17

type Unmarshaller interface {
	Unmarshal(data []byte, v any) error
	ContentType() ContentType
}

Unmarshaller interface for pluggable deserialization

type UnmarshallerFunc

type UnmarshallerFunc func(data []byte, v any) error

UnmarshallerFunc adapter

func (UnmarshallerFunc) ContentType added in v0.0.17

func (f UnmarshallerFunc) ContentType() ContentType

func (UnmarshallerFunc) Unmarshal

func (f UnmarshallerFunc) Unmarshal(data []byte, v any) error
