transport

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompressedSerializer

type CompressedSerializer struct {
	// contains filtered or unexported fields
}

CompressedSerializer wraps a serializer with compression.

func NewCompressedSerializer

func NewCompressedSerializer(serializer Serializer) *CompressedSerializer

NewCompressedSerializer creates a compressed serializer.

func (*CompressedSerializer) Deserialize

func (cs *CompressedSerializer) Deserialize(data []byte, msg any) error

Deserialize decompresses and deserializes.

func (*CompressedSerializer) Serialize

func (cs *CompressedSerializer) Serialize(msg any) ([]byte, error)

Serialize compresses and serializes.

func (*CompressedSerializer) Type

Type returns the underlying serializer type.

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages connection pooling for transports.

func NewConnectionPool

func NewConnectionPool(nodeID string, config PoolConfig, logger forge.Logger) *ConnectionPool

NewConnectionPool creates a new connection pool.

func (*ConnectionPool) Available

func (cp *ConnectionPool) Available() int

Available returns the number of available connection slots.

func (*ConnectionPool) Cleanup

func (cp *ConnectionPool) Cleanup() int

Cleanup removes expired connections.

func (*ConnectionPool) Close

func (cp *ConnectionPool) Close(peerID string) error

Close closes a connection.

func (*ConnectionPool) CloseAll

func (cp *ConnectionPool) CloseAll()

CloseAll closes all connections.

func (*ConnectionPool) Get

func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (any, error)) (*PooledConnection, error)

Get gets or creates a connection for a peer.

func (*ConnectionPool) GetAllConnections

func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection

GetAllConnections returns all connections.

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)

GetConnection returns a connection for a peer.

func (*ConnectionPool) GetStatistics

func (cp *ConnectionPool) GetStatistics() PoolStatistics

GetStatistics returns pool statistics.

func (*ConnectionPool) MarkHealthy

func (cp *ConnectionPool) MarkHealthy(peerID string)

MarkHealthy marks a connection as healthy.

func (*ConnectionPool) MarkUnhealthy

func (cp *ConnectionPool) MarkUnhealthy(peerID string)

MarkUnhealthy marks a connection as unhealthy.

func (*ConnectionPool) MonitorHealth

func (cp *ConnectionPool) MonitorHealth(ctx context.Context)

MonitorHealth monitors connection health.

func (*ConnectionPool) Release

func (cp *ConnectionPool) Release(peerID string)

Release releases a connection back to the pool.

func (*ConnectionPool) Size

func (cp *ConnectionPool) Size() int

Size returns the current pool size.

type GRPCTransport

type GRPCTransport struct {
	// contains filtered or unexported fields
}

GRPCTransport implements gRPC-based network transport.

func NewGRPCTransport

func NewGRPCTransport(config GRPCTransportConfig, logger forge.Logger) *GRPCTransport

NewGRPCTransport creates a new gRPC transport.

func (*GRPCTransport) AddPeer

func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport.

func (*GRPCTransport) GetAddress

func (gt *GRPCTransport) GetAddress() string

GetAddress returns the local address.

func (*GRPCTransport) Receive

func (gt *GRPCTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages.

func (*GRPCTransport) RemovePeer

func (gt *GRPCTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport.

func (*GRPCTransport) Send

func (gt *GRPCTransport) Send(ctx context.Context, target string, message any) error

Send sends a message to a peer.

func (*GRPCTransport) Start

func (gt *GRPCTransport) Start(ctx context.Context) error

Start starts the gRPC transport.

func (*GRPCTransport) Stop

func (gt *GRPCTransport) Stop(ctx context.Context) error

Stop stops the gRPC transport.

type GRPCTransportConfig

type GRPCTransportConfig struct {
	NodeID               string
	Address              string
	Port                 int
	BufferSize           int
	MaxConnections       int
	ConnectionTimeout    time.Duration
	RequestTimeout       time.Duration
	KeepAliveTime        time.Duration
	KeepAliveTimeout     time.Duration
	MaxConcurrentStreams uint32
	TLSConfig            *tls.Config
}

GRPCTransportConfig contains gRPC transport configuration.

type GobSerializer

type GobSerializer struct{}

GobSerializer implements gob-based serialization.

func NewGobSerializer

func NewGobSerializer() *GobSerializer

NewGobSerializer creates a new gob serializer.

func (*GobSerializer) Deserialize

func (gs *GobSerializer) Deserialize(data []byte, msg any) error

Deserialize deserializes a message using gob.

func (*GobSerializer) Serialize

func (gs *GobSerializer) Serialize(msg any) ([]byte, error)

Serialize serializes a message using gob.

func (*GobSerializer) Type

func (gs *GobSerializer) Type() SerializerType

Type returns the serializer type.

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport implements HTTP-based network transport.

func NewHTTPTransport

func NewHTTPTransport(config HTTPTransportConfig, logger forge.Logger) *HTTPTransport

NewHTTPTransport creates a new HTTP transport.

func (*HTTPTransport) AddPeer

func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport.

func (*HTTPTransport) GetAddress

func (ht *HTTPTransport) GetAddress() string

GetAddress returns the local address.

func (*HTTPTransport) Receive

func (ht *HTTPTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages.

func (*HTTPTransport) RemovePeer

func (ht *HTTPTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport.

func (*HTTPTransport) Send

func (ht *HTTPTransport) Send(ctx context.Context, target string, message any) error

Send sends a message to a peer.

func (*HTTPTransport) Start

func (ht *HTTPTransport) Start(ctx context.Context) error

Start starts the HTTP transport.

func (*HTTPTransport) Stop

func (ht *HTTPTransport) Stop(ctx context.Context) error

Stop stops the HTTP transport.

type HTTPTransportConfig

type HTTPTransportConfig struct {
	NodeID              string
	Address             string
	Port                int
	BufferSize          int
	RequestTimeout      time.Duration
	IdleConnTimeout     time.Duration
	MaxIdleConns        int
	MaxIdleConnsPerHost int
}

HTTPTransportConfig contains HTTP transport configuration.

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer implements JSON-based serialization.

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSON serializer.

func (*JSONSerializer) Deserialize

func (js *JSONSerializer) Deserialize(data []byte, msg any) error

Deserialize deserializes a message using JSON.

func (*JSONSerializer) Serialize

func (js *JSONSerializer) Serialize(msg any) ([]byte, error)

Serialize serializes a message using JSON.

func (*JSONSerializer) Type

func (js *JSONSerializer) Type() SerializerType

Type returns the serializer type.

type LocalTransport

type LocalTransport struct {
	// contains filtered or unexported fields
}

LocalTransport implements an in-memory transport for testing.

func NewLocalTransport

func NewLocalTransport(config LocalTransportConfig, logger forge.Logger) *LocalTransport

NewLocalTransport creates a new local transport.

func (*LocalTransport) AddPeer

func (t *LocalTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport.

func (*LocalTransport) Connect

func (t *LocalTransport) Connect(peer *LocalTransport)

Connect connects this transport to another local transport This is used for testing to establish bidirectional communication.

func (*LocalTransport) Disconnect

func (t *LocalTransport) Disconnect(peerID string)

Disconnect disconnects this transport from a peer.

func (*LocalTransport) DisconnectAll

func (t *LocalTransport) DisconnectAll()

DisconnectAll disconnects from all peers.

func (*LocalTransport) GetAddress

func (t *LocalTransport) GetAddress() string

GetAddress returns the local address.

func (*LocalTransport) GetPeers

func (t *LocalTransport) GetPeers() []string

GetPeers returns a list of connected peer IDs.

func (*LocalTransport) Receive

func (t *LocalTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages.

func (*LocalTransport) RemovePeer

func (t *LocalTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport.

func (*LocalTransport) Send

func (t *LocalTransport) Send(ctx context.Context, target string, message any) error

Send sends a message to a peer.

func (*LocalTransport) SetLatency

func (t *LocalTransport) SetLatency(latency time.Duration)

SetLatency sets artificial latency for testing.

func (*LocalTransport) Start

func (t *LocalTransport) Start(ctx context.Context) error

Start starts the transport.

func (*LocalTransport) Stop

func (t *LocalTransport) Stop(ctx context.Context) error

Stop stops the transport.

type LocalTransportConfig

type LocalTransportConfig struct {
	NodeID     string
	Address    string
	BufferSize int
}

LocalTransportConfig contains configuration for local transport.

type MessageCodec

type MessageCodec struct {
	// contains filtered or unexported fields
}

MessageCodec provides encoding/decoding with envelope.

func NewMessageCodec

func NewMessageCodec(serializerType SerializerType) *MessageCodec

NewMessageCodec creates a new message codec.

func (*MessageCodec) Decode

func (mc *MessageCodec) Decode(data []byte) (*internal.Message, error)

Decode decodes an envelope into a message.

func (*MessageCodec) Encode

func (mc *MessageCodec) Encode(msg internal.Message) ([]byte, error)

Encode encodes a message into an envelope.

func (*MessageCodec) GetSerializer

func (mc *MessageCodec) GetSerializer() Serializer

GetSerializer returns the underlying serializer.

type MessageEnvelope

type MessageEnvelope struct {
	Type           internal.MessageType
	From           string
	To             string
	Data           []byte
	Timestamp      int64
	SerializerType SerializerType
}

MessageEnvelope wraps a message with metadata.

type MetricsCollector

type MetricsCollector interface {
	RecordMessage(msg *internal.Message, duration time.Duration, success bool)
}

MetricsCollector collects transport metrics.

type MiddlewareChain

type MiddlewareChain struct {
	// contains filtered or unexported fields
}

MiddlewareChain chains multiple middleware.

func NewMiddlewareChain

func NewMiddlewareChain(logger forge.Logger) *MiddlewareChain

NewMiddlewareChain creates a new middleware chain.

func (*MiddlewareChain) Execute

func (mc *MiddlewareChain) Execute(handler TransportHandler) TransportHandler

Execute executes the middleware chain.

func (*MiddlewareChain) Use

func (mc *MiddlewareChain) Use(middleware TransportMiddleware)

Use adds middleware to the chain.

type PoolConfig

type PoolConfig struct {
	MaxConnections      int
	MaxIdleTime         time.Duration
	MaxConnectionAge    time.Duration
	HealthCheckInterval time.Duration
	ConnectTimeout      time.Duration
}

PoolConfig contains pool configuration.

type PoolStatistics

type PoolStatistics struct {
	TotalConnections   int64
	ActiveConnections  int64
	IdleConnections    int64
	CreatedConnections int64
	ClosedConnections  int64
	FailedConnections  int64
	TotalUses          int64
}

PoolStatistics contains pool statistics.

type PooledConnection

type PooledConnection struct {
	PeerID     string
	Connection any
	Created    time.Time
	LastUsed   time.Time
	UseCount   int64
	InUse      bool
	Healthy    bool
	// contains filtered or unexported fields
}

PooledConnection represents a pooled connection.

type ProtobufSerializer

type ProtobufSerializer struct{}

ProtobufSerializer implements Protocol Buffers serialization.

func NewProtobufSerializer

func NewProtobufSerializer() *ProtobufSerializer

NewProtobufSerializer creates a new protobuf serializer.

func (*ProtobufSerializer) Deserialize

func (ps *ProtobufSerializer) Deserialize(data []byte, msg any) error

Deserialize deserializes a message using protobuf.

func (*ProtobufSerializer) Serialize

func (ps *ProtobufSerializer) Serialize(msg any) ([]byte, error)

Serialize serializes a message using protobuf.

func (*ProtobufSerializer) Type

Type returns the serializer type.

type Serializer

type Serializer interface {
	Serialize(msg any) ([]byte, error)
	Deserialize(data []byte, msg any) error
	Type() SerializerType
}

Serializer defines the interface for message serialization.

type SerializerType

type SerializerType string

SerializerType represents the type of serializer.

const (
	// SerializerGob uses Go's gob encoding.
	SerializerGob SerializerType = "gob"
	// SerializerJSON uses JSON encoding.
	SerializerJSON SerializerType = "json"
	// SerializerProtobuf uses Protocol Buffers.
	SerializerProtobuf SerializerType = "protobuf"
)

type TCPTransport

type TCPTransport struct {
	// contains filtered or unexported fields
}

TCPTransport implements TCP-based network transport.

func NewTCPTransport

func NewTCPTransport(config TCPTransportConfig, logger forge.Logger) *TCPTransport

NewTCPTransport creates a new TCP transport.

func (*TCPTransport) AddPeer

func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport.

func (*TCPTransport) GetAddress

func (tt *TCPTransport) GetAddress() string

GetAddress returns the local address.

func (*TCPTransport) Receive

func (tt *TCPTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages.

func (*TCPTransport) RemovePeer

func (tt *TCPTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport.

func (*TCPTransport) Send

func (tt *TCPTransport) Send(ctx context.Context, target string, message any) error

Send sends a message to a peer.

func (*TCPTransport) Start

func (tt *TCPTransport) Start(ctx context.Context) error

Start starts the TCP transport.

func (*TCPTransport) Stop

func (tt *TCPTransport) Stop(ctx context.Context) error

Stop stops the TCP transport.

type TCPTransportConfig

type TCPTransportConfig struct {
	NodeID            string
	Address           string
	Port              int
	BufferSize        int
	ConnectionTimeout time.Duration
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	KeepAlive         time.Duration
	MaxRetries        int
}

TCPTransportConfig contains TCP transport configuration.

type TLSConfig

type TLSConfig struct {
	// Enable TLS
	Enabled bool

	// Certificate and key paths
	CertFile string
	KeyFile  string
	CAFile   string

	// mTLS configuration
	RequireClientCert bool
	ClientCAFile      string

	// TLS version
	MinVersion uint16
	MaxVersion uint16

	// Cipher suites
	CipherSuites []uint16

	// Server name for SNI
	ServerName string

	// Skip verification (for testing only)
	InsecureSkipVerify bool
}

TLSConfig contains TLS configuration for secure transport.

func DefaultTLSConfig

func DefaultTLSConfig() TLSConfig

DefaultTLSConfig returns a secure default TLS configuration.

func MTLSConfig

func MTLSConfig(certFile, keyFile, caFile string) TLSConfig

MTLSConfig returns a default mTLS configuration.

type TLSManager

type TLSManager struct {
	// contains filtered or unexported fields
}

TLSManager manages TLS configuration and certificate loading.

func NewTLSManager

func NewTLSManager(config TLSConfig, logger forge.Logger) (*TLSManager, error)

NewTLSManager creates a new TLS manager.

func (*TLSManager) GetCertificateInfo

func (tm *TLSManager) GetCertificateInfo() map[string]any

GetCertificateInfo returns information about the loaded certificate.

func (*TLSManager) GetTLSConfig

func (tm *TLSManager) GetTLSConfig() *tls.Config

GetTLSConfig returns the TLS configuration.

func (*TLSManager) IsEnabled

func (tm *TLSManager) IsEnabled() bool

IsEnabled returns whether TLS is enabled.

func (*TLSManager) IsMTLSEnabled

func (tm *TLSManager) IsMTLSEnabled() bool

IsMTLSEnabled returns whether mTLS is enabled.

func (*TLSManager) ReloadCertificates

func (tm *TLSManager) ReloadCertificates() error

ReloadCertificates reloads certificates from disk.

func (*TLSManager) ShouldReload

func (tm *TLSManager) ShouldReload() bool

ShouldReload checks if certificates should be reloaded.

func (*TLSManager) VerifyPeerCertificate

func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error

VerifyPeerCertificate verifies a peer certificate.

type TransportHandler

type TransportHandler func(ctx context.Context, msg *internal.Message) error

TransportHandler handles transport operations.

type TransportMiddleware

type TransportMiddleware func(next TransportHandler) TransportHandler

TransportMiddleware represents transport-level middleware.

func AuthenticationMiddleware

func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware

AuthenticationMiddleware creates authentication middleware.

func CircuitBreakerMiddleware

func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware

CircuitBreakerMiddleware creates circuit breaker middleware.

func CompressionMiddleware

func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware

CompressionMiddleware creates compression middleware.

func DeduplicationMiddleware

func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware

DeduplicationMiddleware creates message deduplication middleware.

func LoggingMiddleware

func LoggingMiddleware(logger forge.Logger) TransportMiddleware

LoggingMiddleware creates logging middleware.

func MetricsMiddleware

func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware

MetricsMiddleware creates metrics collection middleware.

func RateLimitMiddleware

func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware

RateLimitMiddleware creates rate limiting middleware.

func RetryMiddleware

func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware

RetryMiddleware creates retry middleware.

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware

TimeoutMiddleware creates timeout middleware.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL