Documentation
¶
Index ¶
- type CompressedSerializer
- type ConnectionPool
- func (cp *ConnectionPool) Available() int
- func (cp *ConnectionPool) Cleanup() int
- func (cp *ConnectionPool) Close(peerID string) error
- func (cp *ConnectionPool) CloseAll()
- func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (interface{}, error)) (*PooledConnection, error)
- func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection
- func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)
- func (cp *ConnectionPool) GetStatistics() PoolStatistics
- func (cp *ConnectionPool) MarkHealthy(peerID string)
- func (cp *ConnectionPool) MarkUnhealthy(peerID string)
- func (cp *ConnectionPool) MonitorHealth(ctx context.Context)
- func (cp *ConnectionPool) Release(peerID string)
- func (cp *ConnectionPool) Size() int
- type GRPCTransport
- func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error
- func (gt *GRPCTransport) GetAddress() string
- func (gt *GRPCTransport) Receive() <-chan internal.Message
- func (gt *GRPCTransport) RemovePeer(nodeID string) error
- func (gt *GRPCTransport) Send(ctx context.Context, target string, message interface{}) error
- func (gt *GRPCTransport) Start(ctx context.Context) error
- func (gt *GRPCTransport) Stop(ctx context.Context) error
- type GRPCTransportConfig
- type GobSerializer
- type HTTPTransport
- func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error
- func (ht *HTTPTransport) GetAddress() string
- func (ht *HTTPTransport) Receive() <-chan internal.Message
- func (ht *HTTPTransport) RemovePeer(nodeID string) error
- func (ht *HTTPTransport) Send(ctx context.Context, target string, message interface{}) error
- func (ht *HTTPTransport) Start(ctx context.Context) error
- func (ht *HTTPTransport) Stop(ctx context.Context) error
- type HTTPTransportConfig
- type JSONSerializer
- type LocalTransport
- func (t *LocalTransport) AddPeer(nodeID, address string, port int) error
- func (t *LocalTransport) Connect(peer *LocalTransport)
- func (t *LocalTransport) Disconnect(peerID string)
- func (t *LocalTransport) DisconnectAll()
- func (t *LocalTransport) GetAddress() string
- func (t *LocalTransport) GetPeers() []string
- func (t *LocalTransport) Receive() <-chan internal.Message
- func (t *LocalTransport) RemovePeer(nodeID string) error
- func (t *LocalTransport) Send(ctx context.Context, target string, message interface{}) error
- func (t *LocalTransport) SetLatency(latency time.Duration)
- func (t *LocalTransport) Start(ctx context.Context) error
- func (t *LocalTransport) Stop(ctx context.Context) error
- type LocalTransportConfig
- type MessageCodec
- type MessageEnvelope
- type MetricsCollector
- type MiddlewareChain
- type PoolConfig
- type PoolStatistics
- type PooledConnection
- type ProtobufSerializer
- type Serializer
- type SerializerType
- type TCPTransport
- func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error
- func (tt *TCPTransport) GetAddress() string
- func (tt *TCPTransport) Receive() <-chan internal.Message
- func (tt *TCPTransport) RemovePeer(nodeID string) error
- func (tt *TCPTransport) Send(ctx context.Context, target string, message interface{}) error
- func (tt *TCPTransport) Start(ctx context.Context) error
- func (tt *TCPTransport) Stop(ctx context.Context) error
- type TCPTransportConfig
- type TLSConfig
- type TLSManager
- func (tm *TLSManager) GetCertificateInfo() map[string]interface{}
- func (tm *TLSManager) GetTLSConfig() *tls.Config
- func (tm *TLSManager) IsEnabled() bool
- func (tm *TLSManager) IsMTLSEnabled() bool
- func (tm *TLSManager) ReloadCertificates() error
- func (tm *TLSManager) ShouldReload() bool
- func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
- type TransportHandler
- type TransportMiddleware
- func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware
- func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware
- func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware
- func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware
- func LoggingMiddleware(logger forge.Logger) TransportMiddleware
- func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware
- func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware
- func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware
- func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompressedSerializer ¶
type CompressedSerializer struct {
// contains filtered or unexported fields
}
CompressedSerializer wraps a serializer with compression
func NewCompressedSerializer ¶
func NewCompressedSerializer(serializer Serializer) *CompressedSerializer
NewCompressedSerializer creates a compressed serializer
func (*CompressedSerializer) Deserialize ¶
func (cs *CompressedSerializer) Deserialize(data []byte, msg interface{}) error
Deserialize decompresses and deserializes
func (*CompressedSerializer) Serialize ¶
func (cs *CompressedSerializer) Serialize(msg interface{}) ([]byte, error)
Serialize compresses and serializes
func (*CompressedSerializer) Type ¶
func (cs *CompressedSerializer) Type() SerializerType
Type returns the underlying serializer type
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages connection pooling for transports
func NewConnectionPool ¶
func NewConnectionPool(nodeID string, config PoolConfig, logger forge.Logger) *ConnectionPool
NewConnectionPool creates a new connection pool
func (*ConnectionPool) Available ¶
func (cp *ConnectionPool) Available() int
Available returns the number of available connection slots
func (*ConnectionPool) Cleanup ¶
func (cp *ConnectionPool) Cleanup() int
Cleanup removes expired connections
func (*ConnectionPool) Close ¶
func (cp *ConnectionPool) Close(peerID string) error
Close closes a connection
func (*ConnectionPool) CloseAll ¶
func (cp *ConnectionPool) CloseAll()
CloseAll closes all connections
func (*ConnectionPool) Get ¶
func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (interface{}, error)) (*PooledConnection, error)
Get gets or creates a connection for a peer
func (*ConnectionPool) GetAllConnections ¶
func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection
GetAllConnections returns all connections
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)
GetConnection returns a connection for a peer
func (*ConnectionPool) GetStatistics ¶
func (cp *ConnectionPool) GetStatistics() PoolStatistics
GetStatistics returns pool statistics
func (*ConnectionPool) MarkHealthy ¶
func (cp *ConnectionPool) MarkHealthy(peerID string)
MarkHealthy marks a connection as healthy
func (*ConnectionPool) MarkUnhealthy ¶
func (cp *ConnectionPool) MarkUnhealthy(peerID string)
MarkUnhealthy marks a connection as unhealthy
func (*ConnectionPool) MonitorHealth ¶
func (cp *ConnectionPool) MonitorHealth(ctx context.Context)
MonitorHealth monitors connection health
func (*ConnectionPool) Release ¶
func (cp *ConnectionPool) Release(peerID string)
Release releases a connection back to the pool
func (*ConnectionPool) Size ¶
func (cp *ConnectionPool) Size() int
Size returns the current pool size
type GRPCTransport ¶
type GRPCTransport struct {
// contains filtered or unexported fields
}
GRPCTransport implements gRPC-based network transport
func NewGRPCTransport ¶
func NewGRPCTransport(config GRPCTransportConfig, logger forge.Logger) *GRPCTransport
NewGRPCTransport creates a new gRPC transport
func (*GRPCTransport) AddPeer ¶
func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport
func (*GRPCTransport) GetAddress ¶
func (gt *GRPCTransport) GetAddress() string
GetAddress returns the local address
func (*GRPCTransport) Receive ¶
func (gt *GRPCTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages
func (*GRPCTransport) RemovePeer ¶
func (gt *GRPCTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport
func (*GRPCTransport) Send ¶
func (gt *GRPCTransport) Send(ctx context.Context, target string, message interface{}) error
Send sends a message to a peer
type GRPCTransportConfig ¶
type GRPCTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
MaxConnections int
ConnectionTimeout time.Duration
RequestTimeout time.Duration
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
MaxConcurrentStreams uint32
TLSConfig *tls.Config
}
GRPCTransportConfig contains gRPC transport configuration
type GobSerializer ¶
type GobSerializer struct{}
GobSerializer implements gob-based serialization
func NewGobSerializer ¶
func NewGobSerializer() *GobSerializer
NewGobSerializer creates a new gob serializer
func (*GobSerializer) Deserialize ¶
func (gs *GobSerializer) Deserialize(data []byte, msg interface{}) error
Deserialize deserializes a message using gob
func (*GobSerializer) Serialize ¶
func (gs *GobSerializer) Serialize(msg interface{}) ([]byte, error)
Serialize serializes a message using gob
func (*GobSerializer) Type ¶
func (gs *GobSerializer) Type() SerializerType
Type returns the serializer type
type HTTPTransport ¶
type HTTPTransport struct {
// contains filtered or unexported fields
}
HTTPTransport implements HTTP-based network transport
func NewHTTPTransport ¶
func NewHTTPTransport(config HTTPTransportConfig, logger forge.Logger) *HTTPTransport
NewHTTPTransport creates a new HTTP transport
func (*HTTPTransport) AddPeer ¶
func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport
func (*HTTPTransport) GetAddress ¶
func (ht *HTTPTransport) GetAddress() string
GetAddress returns the local address
func (*HTTPTransport) Receive ¶
func (ht *HTTPTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages
func (*HTTPTransport) RemovePeer ¶
func (ht *HTTPTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport
func (*HTTPTransport) Send ¶
func (ht *HTTPTransport) Send(ctx context.Context, target string, message interface{}) error
Send sends a message to a peer
type HTTPTransportConfig ¶
type HTTPTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
RequestTimeout time.Duration
IdleConnTimeout time.Duration
MaxIdleConns int
MaxIdleConnsPerHost int
}
HTTPTransportConfig contains HTTP transport configuration
type JSONSerializer ¶
type JSONSerializer struct{}
JSONSerializer implements JSON-based serialization
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSON serializer
func (*JSONSerializer) Deserialize ¶
func (js *JSONSerializer) Deserialize(data []byte, msg interface{}) error
Deserialize deserializes a message using JSON
func (*JSONSerializer) Serialize ¶
func (js *JSONSerializer) Serialize(msg interface{}) ([]byte, error)
Serialize serializes a message using JSON
func (*JSONSerializer) Type ¶
func (js *JSONSerializer) Type() SerializerType
Type returns the serializer type
type LocalTransport ¶
type LocalTransport struct {
// contains filtered or unexported fields
}
LocalTransport implements an in-memory transport for testing
func NewLocalTransport ¶
func NewLocalTransport(config LocalTransportConfig, logger forge.Logger) *LocalTransport
NewLocalTransport creates a new local transport
func (*LocalTransport) AddPeer ¶
func (t *LocalTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport
func (*LocalTransport) Connect ¶
func (t *LocalTransport) Connect(peer *LocalTransport)
Connect connects this transport to another local transport. This is used for testing to establish bidirectional communication.
func (*LocalTransport) Disconnect ¶
func (t *LocalTransport) Disconnect(peerID string)
Disconnect disconnects this transport from a peer
func (*LocalTransport) DisconnectAll ¶
func (t *LocalTransport) DisconnectAll()
DisconnectAll disconnects from all peers
func (*LocalTransport) GetAddress ¶
func (t *LocalTransport) GetAddress() string
GetAddress returns the local address
func (*LocalTransport) GetPeers ¶
func (t *LocalTransport) GetPeers() []string
GetPeers returns a list of connected peer IDs
func (*LocalTransport) Receive ¶
func (t *LocalTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages
func (*LocalTransport) RemovePeer ¶
func (t *LocalTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport
func (*LocalTransport) Send ¶
func (t *LocalTransport) Send(ctx context.Context, target string, message interface{}) error
Send sends a message to a peer
func (*LocalTransport) SetLatency ¶
func (t *LocalTransport) SetLatency(latency time.Duration)
SetLatency sets artificial latency for testing
type LocalTransportConfig ¶
LocalTransportConfig contains configuration for local transport
type MessageCodec ¶
type MessageCodec struct {
// contains filtered or unexported fields
}
MessageCodec provides encoding/decoding with envelope
func NewMessageCodec ¶
func NewMessageCodec(serializerType SerializerType) *MessageCodec
NewMessageCodec creates a new message codec
func (*MessageCodec) Decode ¶
func (mc *MessageCodec) Decode(data []byte) (*internal.Message, error)
Decode decodes an envelope into a message
func (*MessageCodec) Encode ¶
func (mc *MessageCodec) Encode(msg internal.Message) ([]byte, error)
Encode encodes a message into an envelope
func (*MessageCodec) GetSerializer ¶
func (mc *MessageCodec) GetSerializer() Serializer
GetSerializer returns the underlying serializer
type MessageEnvelope ¶
type MessageEnvelope struct {
Type internal.MessageType
From string
To string
Data []byte
Timestamp int64
SerializerType SerializerType
}
MessageEnvelope wraps a message with metadata
type MetricsCollector ¶
type MetricsCollector interface {
RecordMessage(msg *internal.Message, duration time.Duration, success bool)
}
MetricsCollector collects transport metrics
type MiddlewareChain ¶
type MiddlewareChain struct {
// contains filtered or unexported fields
}
MiddlewareChain chains multiple middleware
func NewMiddlewareChain ¶
func NewMiddlewareChain(logger forge.Logger) *MiddlewareChain
NewMiddlewareChain creates a new middleware chain
func (*MiddlewareChain) Execute ¶
func (mc *MiddlewareChain) Execute(handler TransportHandler) TransportHandler
Execute executes the middleware chain
func (*MiddlewareChain) Use ¶
func (mc *MiddlewareChain) Use(middleware TransportMiddleware)
Use adds middleware to the chain
type PoolConfig ¶
type PoolConfig struct {
MaxConnections int
MaxIdleTime time.Duration
MaxConnectionAge time.Duration
HealthCheckInterval time.Duration
ConnectTimeout time.Duration
}
PoolConfig contains pool configuration
type PoolStatistics ¶
type PoolStatistics struct {
TotalConnections int64
ActiveConnections int64
IdleConnections int64
CreatedConnections int64
ClosedConnections int64
FailedConnections int64
TotalUses int64
}
PoolStatistics contains pool statistics
type PooledConnection ¶
type PooledConnection struct {
PeerID string
Connection interface{}
Created time.Time
LastUsed time.Time
UseCount int64
InUse bool
Healthy bool
// contains filtered or unexported fields
}
PooledConnection represents a pooled connection
type ProtobufSerializer ¶
type ProtobufSerializer struct{}
ProtobufSerializer implements Protocol Buffers serialization
func NewProtobufSerializer ¶
func NewProtobufSerializer() *ProtobufSerializer
NewProtobufSerializer creates a new protobuf serializer
func (*ProtobufSerializer) Deserialize ¶
func (ps *ProtobufSerializer) Deserialize(data []byte, msg interface{}) error
Deserialize deserializes a message using protobuf
func (*ProtobufSerializer) Serialize ¶
func (ps *ProtobufSerializer) Serialize(msg interface{}) ([]byte, error)
Serialize serializes a message using protobuf
func (*ProtobufSerializer) Type ¶
func (ps *ProtobufSerializer) Type() SerializerType
Type returns the serializer type
type Serializer ¶
type Serializer interface {
Serialize(msg interface{}) ([]byte, error)
Deserialize(data []byte, msg interface{}) error
Type() SerializerType
}
Serializer defines the interface for message serialization
type SerializerType ¶
type SerializerType string
SerializerType represents the type of serializer
const (
	// SerializerGob uses Go's gob encoding
	SerializerGob SerializerType = "gob"
	// SerializerJSON uses JSON encoding
	SerializerJSON SerializerType = "json"
	// SerializerProtobuf uses Protocol Buffers
	SerializerProtobuf SerializerType = "protobuf"
)
type TCPTransport ¶
type TCPTransport struct {
// contains filtered or unexported fields
}
TCPTransport implements TCP-based network transport
func NewTCPTransport ¶
func NewTCPTransport(config TCPTransportConfig, logger forge.Logger) *TCPTransport
NewTCPTransport creates a new TCP transport
func (*TCPTransport) AddPeer ¶
func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error
AddPeer adds a peer to the transport
func (*TCPTransport) GetAddress ¶
func (tt *TCPTransport) GetAddress() string
GetAddress returns the local address
func (*TCPTransport) Receive ¶
func (tt *TCPTransport) Receive() <-chan internal.Message
Receive returns a channel for receiving messages
func (*TCPTransport) RemovePeer ¶
func (tt *TCPTransport) RemovePeer(nodeID string) error
RemovePeer removes a peer from the transport
func (*TCPTransport) Send ¶
func (tt *TCPTransport) Send(ctx context.Context, target string, message interface{}) error
Send sends a message to a peer
type TCPTransportConfig ¶
type TCPTransportConfig struct {
NodeID string
Address string
Port int
BufferSize int
ConnectionTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
KeepAlive time.Duration
MaxRetries int
}
TCPTransportConfig contains TCP transport configuration
type TLSConfig ¶
type TLSConfig struct {
// Enable TLS
Enabled bool
// Certificate and key paths
CertFile string
KeyFile string
CAFile string
// mTLS configuration
RequireClientCert bool
ClientCAFile string
// TLS version
MinVersion uint16
MaxVersion uint16
// Cipher suites
CipherSuites []uint16
// Server name for SNI
ServerName string
// Skip verification (for testing only)
InsecureSkipVerify bool
}
TLSConfig contains TLS configuration for secure transport
func DefaultTLSConfig ¶
func DefaultTLSConfig() TLSConfig
DefaultTLSConfig returns a secure default TLS configuration
func MTLSConfig ¶
MTLSConfig returns a default mTLS configuration
type TLSManager ¶
type TLSManager struct {
// contains filtered or unexported fields
}
TLSManager manages TLS configuration and certificate loading
func NewTLSManager ¶
func NewTLSManager(config TLSConfig, logger forge.Logger) (*TLSManager, error)
NewTLSManager creates a new TLS manager
func (*TLSManager) GetCertificateInfo ¶
func (tm *TLSManager) GetCertificateInfo() map[string]interface{}
GetCertificateInfo returns information about the loaded certificate
func (*TLSManager) GetTLSConfig ¶
func (tm *TLSManager) GetTLSConfig() *tls.Config
GetTLSConfig returns the TLS configuration
func (*TLSManager) IsEnabled ¶
func (tm *TLSManager) IsEnabled() bool
IsEnabled returns whether TLS is enabled
func (*TLSManager) IsMTLSEnabled ¶
func (tm *TLSManager) IsMTLSEnabled() bool
IsMTLSEnabled returns whether mTLS is enabled
func (*TLSManager) ReloadCertificates ¶
func (tm *TLSManager) ReloadCertificates() error
ReloadCertificates reloads certificates from disk
func (*TLSManager) ShouldReload ¶
func (tm *TLSManager) ShouldReload() bool
ShouldReload checks if certificates should be reloaded
func (*TLSManager) VerifyPeerCertificate ¶
func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
VerifyPeerCertificate verifies a peer certificate
type TransportHandler ¶
TransportHandler handles transport operations
type TransportMiddleware ¶
type TransportMiddleware func(next TransportHandler) TransportHandler
TransportMiddleware represents transport-level middleware
func AuthenticationMiddleware ¶
func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware
AuthenticationMiddleware creates authentication middleware
func CircuitBreakerMiddleware ¶
func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware
CircuitBreakerMiddleware creates circuit breaker middleware
func CompressionMiddleware ¶
func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware
CompressionMiddleware creates compression middleware
func DeduplicationMiddleware ¶
func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware
DeduplicationMiddleware creates message deduplication middleware
func LoggingMiddleware ¶
func LoggingMiddleware(logger forge.Logger) TransportMiddleware
LoggingMiddleware creates logging middleware
func MetricsMiddleware ¶
func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware
MetricsMiddleware creates metrics collection middleware
func RateLimitMiddleware ¶
func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware
RateLimitMiddleware creates rate limiting middleware
func RetryMiddleware ¶
func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware
RetryMiddleware creates retry middleware
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware
TimeoutMiddleware creates timeout middleware