Documentation
¶
Index ¶
- type CompressedSerializer
- type ConnectionPool
- func (cp *ConnectionPool) Available() int
- func (cp *ConnectionPool) Cleanup() int
- func (cp *ConnectionPool) Close(peerID string) error
- func (cp *ConnectionPool) CloseAll()
- func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (any, error)) (*PooledConnection, error)
- func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection
- func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)
- func (cp *ConnectionPool) GetStatistics() PoolStatistics
- func (cp *ConnectionPool) MarkHealthy(peerID string)
- func (cp *ConnectionPool) MarkUnhealthy(peerID string)
- func (cp *ConnectionPool) MonitorHealth(ctx context.Context)
- func (cp *ConnectionPool) Release(peerID string)
- func (cp *ConnectionPool) Size() int
- type GRPCTransport
- func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error
- func (gt *GRPCTransport) GetAddress() string
- func (gt *GRPCTransport) Receive() <-chan internal.Message
- func (gt *GRPCTransport) RemovePeer(nodeID string) error
- func (gt *GRPCTransport) Send(ctx context.Context, target string, message any) error
- func (gt *GRPCTransport) Start(ctx context.Context) error
- func (gt *GRPCTransport) Stop(ctx context.Context) error
- type GRPCTransportConfig
- type GobSerializer
- type HTTPTransport
- func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error
- func (ht *HTTPTransport) GetAddress() string
- func (ht *HTTPTransport) Receive() <-chan internal.Message
- func (ht *HTTPTransport) RemovePeer(nodeID string) error
- func (ht *HTTPTransport) Send(ctx context.Context, target string, message any) error
- func (ht *HTTPTransport) Start(ctx context.Context) error
- func (ht *HTTPTransport) Stop(ctx context.Context) error
- type HTTPTransportConfig
- type JSONSerializer
- type LocalTransport
- func (t *LocalTransport) AddPeer(nodeID, address string, port int) error
- func (t *LocalTransport) Connect(peer *LocalTransport)
- func (t *LocalTransport) Disconnect(peerID string)
- func (t *LocalTransport) DisconnectAll()
- func (t *LocalTransport) GetAddress() string
- func (t *LocalTransport) GetPeers() []string
- func (t *LocalTransport) Receive() <-chan internal.Message
- func (t *LocalTransport) RemovePeer(nodeID string) error
- func (t *LocalTransport) Send(ctx context.Context, target string, message any) error
- func (t *LocalTransport) SetLatency(latency time.Duration)
- func (t *LocalTransport) Start(ctx context.Context) error
- func (t *LocalTransport) Stop(ctx context.Context) error
- type LocalTransportConfig
- type MessageCodec
- type MessageEnvelope
- type MetricsCollector
- type MiddlewareChain
- type PoolConfig
- type PoolStatistics
- type PooledConnection
- type ProtobufSerializer
- type Serializer
- type SerializerType
- type TCPTransport
- func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error
- func (tt *TCPTransport) GetAddress() string
- func (tt *TCPTransport) Receive() <-chan internal.Message
- func (tt *TCPTransport) RemovePeer(nodeID string) error
- func (tt *TCPTransport) Send(ctx context.Context, target string, message any) error
- func (tt *TCPTransport) Start(ctx context.Context) error
- func (tt *TCPTransport) Stop(ctx context.Context) error
- type TCPTransportConfig
- type TLSConfig
- type TLSManager
- func (tm *TLSManager) GetCertificateInfo() map[string]any
- func (tm *TLSManager) GetTLSConfig() *tls.Config
- func (tm *TLSManager) IsEnabled() bool
- func (tm *TLSManager) IsMTLSEnabled() bool
- func (tm *TLSManager) ReloadCertificates() error
- func (tm *TLSManager) ShouldReload() bool
- func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
- type TransportHandler
- type TransportMiddleware
- func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware
- func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware
- func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware
- func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware
- func LoggingMiddleware(logger forge.Logger) TransportMiddleware
- func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware
- func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware
- func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware
- func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompressedSerializer ¶
type CompressedSerializer struct {
// contains filtered or unexported fields
}
CompressedSerializer wraps a serializer with compression.
func NewCompressedSerializer ¶
func NewCompressedSerializer(serializer Serializer) *CompressedSerializer
NewCompressedSerializer creates a compressed serializer.
func (*CompressedSerializer) Deserialize ¶
func (cs *CompressedSerializer) Deserialize(data []byte, msg any) error
Deserialize decompresses and deserializes.
func (*CompressedSerializer) Serialize ¶
func (cs *CompressedSerializer) Serialize(msg any) ([]byte, error)
Serialize compresses and serializes.
func (*CompressedSerializer) Type ¶
func (cs *CompressedSerializer) Type() SerializerType
Type returns the underlying serializer type.
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages connection pooling for transports.
func NewConnectionPool ¶
func NewConnectionPool(nodeID string, config PoolConfig, logger forge.Logger) *ConnectionPool
NewConnectionPool creates a new connection pool.
func (*ConnectionPool) Available ¶
func (cp *ConnectionPool) Available() int
Available returns the number of available connection slots.
func (*ConnectionPool) Cleanup ¶
func (cp *ConnectionPool) Cleanup() int
Cleanup removes expired connections.
func (*ConnectionPool) Close ¶
func (cp *ConnectionPool) Close(peerID string) error
Close closes a connection.
func (*ConnectionPool) CloseAll ¶
func (cp *ConnectionPool) CloseAll()
CloseAll closes all connections.
func (*ConnectionPool) Get ¶
func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (any, error)) (*PooledConnection, error)
Get gets or creates a connection for a peer.
func (*ConnectionPool) GetAllConnections ¶
func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection
GetAllConnections returns all connections.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)
GetConnection returns a connection for a peer.
func (*ConnectionPool) GetStatistics ¶
func (cp *ConnectionPool) GetStatistics() PoolStatistics
GetStatistics returns pool statistics.
func (*ConnectionPool) MarkHealthy ¶
func (cp *ConnectionPool) MarkHealthy(peerID string)
MarkHealthy marks a connection as healthy.
func (*ConnectionPool) MarkUnhealthy ¶
func (cp *ConnectionPool) MarkUnhealthy(peerID string)
MarkUnhealthy marks a connection as unhealthy.
func (*ConnectionPool) MonitorHealth ¶
func (cp *ConnectionPool) MonitorHealth(ctx context.Context)
MonitorHealth monitors connection health.
func (*ConnectionPool) Release ¶
func (cp *ConnectionPool) Release(peerID string)
Release releases a connection back to the pool.
func (*ConnectionPool) Size ¶
func (cp *ConnectionPool) Size() int
Size returns the current pool size.
type GRPCTransport ¶
type GRPCTransport struct {
// contains filtered or unexported fields
}
GRPCTransport implements gRPC-based network transport.
func NewGRPCTransport ¶
func NewGRPCTransport(config GRPCTransportConfig, logger forge.Logger) *GRPCTransport
NewGRPCTransport creates a new gRPC transport.
func (*GRPCTransport) AddPeer ¶
func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport.
func (*GRPCTransport) GetAddress ¶
func (gt *GRPCTransport) GetAddress() string
GetAddress returns the local address.
func (*GRPCTransport) Receive ¶
func (gt *GRPCTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages.
func (*GRPCTransport) RemovePeer ¶
func (gt *GRPCTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport.
type GRPCTransportConfig ¶
type GRPCTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
MaxConnections int
ConnectionTimeout time.Duration
RequestTimeout time.Duration
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
MaxConcurrentStreams uint32
TLSConfig *tls.Config
}
GRPCTransportConfig contains gRPC transport configuration.
type GobSerializer ¶
type GobSerializer struct{}
GobSerializer implements gob-based serialization.
func NewGobSerializer ¶
func NewGobSerializer() *GobSerializer
NewGobSerializer creates a new gob serializer.
func (*GobSerializer) Deserialize ¶
func (gs *GobSerializer) Deserialize(data []byte, msg any) error
Deserialize deserializes a message using gob.
func (*GobSerializer) Serialize ¶
func (gs *GobSerializer) Serialize(msg any) ([]byte, error)
Serialize serializes a message using gob.
func (*GobSerializer) Type ¶
func (gs *GobSerializer) Type() SerializerType
Type returns the serializer type.
type HTTPTransport ¶
type HTTPTransport struct {
// contains filtered or unexported fields
}
HTTPTransport implements HTTP-based network transport.
func NewHTTPTransport ¶
func NewHTTPTransport(config HTTPTransportConfig, logger forge.Logger) *HTTPTransport
NewHTTPTransport creates a new HTTP transport.
func (*HTTPTransport) AddPeer ¶
func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport.
func (*HTTPTransport) GetAddress ¶
func (ht *HTTPTransport) GetAddress() string
GetAddress returns the local address.
func (*HTTPTransport) Receive ¶
func (ht *HTTPTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages.
func (*HTTPTransport) RemovePeer ¶
func (ht *HTTPTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport.
type HTTPTransportConfig ¶
type HTTPTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
RequestTimeout time.Duration
IdleConnTimeout time.Duration
MaxIdleConns int
MaxIdleConnsPerHost int
}
HTTPTransportConfig contains HTTP transport configuration.
type JSONSerializer ¶
type JSONSerializer struct{}
JSONSerializer implements JSON-based serialization.
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSON serializer.
func (*JSONSerializer) Deserialize ¶
func (js *JSONSerializer) Deserialize(data []byte, msg any) error
Deserialize deserializes a message using JSON.
func (*JSONSerializer) Serialize ¶
func (js *JSONSerializer) Serialize(msg any) ([]byte, error)
Serialize serializes a message using JSON.
func (*JSONSerializer) Type ¶
func (js *JSONSerializer) Type() SerializerType
Type returns the serializer type.
type LocalTransport ¶
type LocalTransport struct {
// contains filtered or unexported fields
}
LocalTransport implements an in-memory transport for testing.
func NewLocalTransport ¶
func NewLocalTransport(config LocalTransportConfig, logger forge.Logger) *LocalTransport
NewLocalTransport creates a new local transport.
func (*LocalTransport) AddPeer ¶
func (t *LocalTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport.
func (*LocalTransport) Connect ¶
func (t *LocalTransport) Connect(peer *LocalTransport)
Connect connects this transport to another local transport This is used for testing to establish bidirectional communication.
func (*LocalTransport) Disconnect ¶
func (t *LocalTransport) Disconnect(peerID string)
Disconnect disconnects this transport from a peer.
func (*LocalTransport) DisconnectAll ¶
func (t *LocalTransport) DisconnectAll()
DisconnectAll disconnects from all peers.
func (*LocalTransport) GetAddress ¶
func (t *LocalTransport) GetAddress() string
GetAddress returns the local address.
func (*LocalTransport) GetPeers ¶
func (t *LocalTransport) GetPeers() []string
GetPeers returns a list of connected peer IDs.
func (*LocalTransport) Receive ¶
func (t *LocalTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages.
func (*LocalTransport) RemovePeer ¶
func (t *LocalTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport.
func (*LocalTransport) SetLatency ¶
func (t *LocalTransport) SetLatency(latency time.Duration)
SetLatency sets artificial latency for testing.
type LocalTransportConfig ¶
LocalTransportConfig contains configuration for local transport.
type MessageCodec ¶
type MessageCodec struct {
// contains filtered or unexported fields
}
MessageCodec provides encoding/decoding with envelope.
func NewMessageCodec ¶
func NewMessageCodec(serializerType SerializerType) *MessageCodec
NewMessageCodec creates a new message codec.
func (*MessageCodec) Decode ¶
func (mc *MessageCodec) Decode(data []byte) (*internal.Message, error)
Decode decodes an envelope into a message.
func (*MessageCodec) Encode ¶
func (mc *MessageCodec) Encode(msg internal.Message) ([]byte, error)
Encode encodes a message into an envelope.
func (*MessageCodec) GetSerializer ¶
func (mc *MessageCodec) GetSerializer() Serializer
GetSerializer returns the underlying serializer.
type MessageEnvelope ¶
type MessageEnvelope struct {
Type internal.MessageType
From string
To string
Data []byte
Timestamp int64
SerializerType SerializerType
}
MessageEnvelope wraps a message with metadata.
type MetricsCollector ¶
type MetricsCollector interface {
RecordMessage(msg *internal.Message, duration time.Duration, success bool)
}
MetricsCollector collects transport metrics.
type MiddlewareChain ¶
type MiddlewareChain struct {
// contains filtered or unexported fields
}
MiddlewareChain chains multiple middleware.
func NewMiddlewareChain ¶
func NewMiddlewareChain(logger forge.Logger) *MiddlewareChain
NewMiddlewareChain creates a new middleware chain.
func (*MiddlewareChain) Execute ¶
func (mc *MiddlewareChain) Execute(handler TransportHandler) TransportHandler
Execute executes the middleware chain.
func (*MiddlewareChain) Use ¶
func (mc *MiddlewareChain) Use(middleware TransportMiddleware)
Use adds middleware to the chain.
type PoolConfig ¶
type PoolConfig struct {
MaxConnections int
MaxIdleTime time.Duration
MaxConnectionAge time.Duration
HealthCheckInterval time.Duration
ConnectTimeout time.Duration
}
PoolConfig contains pool configuration.
type PoolStatistics ¶
type PoolStatistics struct {
TotalConnections int64
ActiveConnections int64
IdleConnections int64
CreatedConnections int64
ClosedConnections int64
FailedConnections int64
TotalUses int64
}
PoolStatistics contains pool statistics.
type PooledConnection ¶
type PooledConnection struct {
PeerID string
Connection any
Created time.Time
LastUsed time.Time
UseCount int64
InUse bool
Healthy bool
// contains filtered or unexported fields
}
PooledConnection represents a pooled connection.
type ProtobufSerializer ¶
type ProtobufSerializer struct{}
ProtobufSerializer implements Protocol Buffers serialization.
func NewProtobufSerializer ¶
func NewProtobufSerializer() *ProtobufSerializer
NewProtobufSerializer creates a new protobuf serializer.
func (*ProtobufSerializer) Deserialize ¶
func (ps *ProtobufSerializer) Deserialize(data []byte, msg any) error
Deserialize deserializes a message using protobuf.
func (*ProtobufSerializer) Serialize ¶
func (ps *ProtobufSerializer) Serialize(msg any) ([]byte, error)
Serialize serializes a message using protobuf.
func (*ProtobufSerializer) Type ¶
func (ps *ProtobufSerializer) Type() SerializerType
Type returns the serializer type.
type Serializer ¶
type Serializer interface {
Serialize(msg any) ([]byte, error)
Deserialize(data []byte, msg any) error
Type() SerializerType
}
Serializer defines the interface for message serialization.
type SerializerType ¶
type SerializerType string
SerializerType represents the type of serializer.
const ( // SerializerGob uses Go's gob encoding. SerializerGob SerializerType = "gob" // SerializerJSON uses JSON encoding. SerializerJSON SerializerType = "json" // SerializerProtobuf uses Protocol Buffers. SerializerProtobuf SerializerType = "protobuf" )
type TCPTransport ¶
type TCPTransport struct {
// contains filtered or unexported fields
}
TCPTransport implements TCP-based network transport.
func NewTCPTransport ¶
func NewTCPTransport(config TCPTransportConfig, logger forge.Logger) *TCPTransport
NewTCPTransport creates a new TCP transport.
func (*TCPTransport) AddPeer ¶
func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport.
func (*TCPTransport) GetAddress ¶
func (tt *TCPTransport) GetAddress() string
GetAddress returns the local address.
func (*TCPTransport) Receive ¶
func (tt *TCPTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages.
func (*TCPTransport) RemovePeer ¶
func (tt *TCPTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport.
type TCPTransportConfig ¶
type TCPTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
ConnectionTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
KeepAlive time.Duration
MaxRetries int
}
TCPTransportConfig contains TCP transport configuration.
type TLSConfig ¶
type TLSConfig struct {
// Enable TLS
Enabled bool
// Certificate and key paths
CertFile string
KeyFile string
CAFile string
// mTLS configuration
RequireClientCert bool
ClientCAFile string
// TLS version
MinVersion uint16
MaxVersion uint16
// Cipher suites
CipherSuites []uint16
// Server name for SNI
ServerName string
// Skip verification (for testing only)
InsecureSkipVerify bool
}
TLSConfig contains TLS configuration for secure transport.
func DefaultTLSConfig ¶
func DefaultTLSConfig() TLSConfig
DefaultTLSConfig returns a secure default TLS configuration.
func MTLSConfig ¶
MTLSConfig returns a default mTLS configuration.
type TLSManager ¶
type TLSManager struct {
// contains filtered or unexported fields
}
TLSManager manages TLS configuration and certificate loading.
func NewTLSManager ¶
func NewTLSManager(config TLSConfig, logger forge.Logger) (*TLSManager, error)
NewTLSManager creates a new TLS manager.
func (*TLSManager) GetCertificateInfo ¶
func (tm *TLSManager) GetCertificateInfo() map[string]any
GetCertificateInfo returns information about the loaded certificate.
func (*TLSManager) GetTLSConfig ¶
func (tm *TLSManager) GetTLSConfig() *tls.Config
GetTLSConfig returns the TLS configuration.
func (*TLSManager) IsEnabled ¶
func (tm *TLSManager) IsEnabled() bool
IsEnabled returns whether TLS is enabled.
func (*TLSManager) IsMTLSEnabled ¶
func (tm *TLSManager) IsMTLSEnabled() bool
IsMTLSEnabled returns whether mTLS is enabled.
func (*TLSManager) ReloadCertificates ¶
func (tm *TLSManager) ReloadCertificates() error
ReloadCertificates reloads certificates from disk.
func (*TLSManager) ShouldReload ¶
func (tm *TLSManager) ShouldReload() bool
ShouldReload checks if certificates should be reloaded.
func (*TLSManager) VerifyPeerCertificate ¶
func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
VerifyPeerCertificate verifies a peer certificate.
type TransportHandler ¶
TransportHandler handles transport operations.
type TransportMiddleware ¶
type TransportMiddleware func(next TransportHandler) TransportHandler
TransportMiddleware represents transport-level middleware.
func AuthenticationMiddleware ¶
func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware
AuthenticationMiddleware creates authentication middleware.
func CircuitBreakerMiddleware ¶
func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware
CircuitBreakerMiddleware creates circuit breaker middleware.
func CompressionMiddleware ¶
func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware
CompressionMiddleware creates compression middleware.
func DeduplicationMiddleware ¶
func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware
DeduplicationMiddleware creates message deduplication middleware.
func LoggingMiddleware ¶
func LoggingMiddleware(logger forge.Logger) TransportMiddleware
LoggingMiddleware creates logging middleware.
func MetricsMiddleware ¶
func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware
MetricsMiddleware creates metrics collection middleware.
func RateLimitMiddleware ¶
func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware
RateLimitMiddleware creates rate limiting middleware.
func RetryMiddleware ¶
func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware
RetryMiddleware creates retry middleware.
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware
TimeoutMiddleware creates timeout middleware.