transport

package
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 | License: MIT | Imports: 21 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompressedSerializer

type CompressedSerializer struct {
	// contains filtered or unexported fields
}

CompressedSerializer wraps a serializer with compression

func NewCompressedSerializer

func NewCompressedSerializer(serializer Serializer) *CompressedSerializer

NewCompressedSerializer creates a compressed serializer

func (*CompressedSerializer) Deserialize

func (cs *CompressedSerializer) Deserialize(data []byte, msg interface{}) error

Deserialize decompresses and deserializes

func (*CompressedSerializer) Serialize

func (cs *CompressedSerializer) Serialize(msg interface{}) ([]byte, error)

Serialize compresses and serializes

func (*CompressedSerializer) Type

func (cs *CompressedSerializer) Type() SerializerType

Type returns the underlying serializer type

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages connection pooling for transports

func NewConnectionPool

func NewConnectionPool(nodeID string, config PoolConfig, logger forge.Logger) *ConnectionPool

NewConnectionPool creates a new connection pool

func (*ConnectionPool) Available

func (cp *ConnectionPool) Available() int

Available returns the number of available connection slots

func (*ConnectionPool) Cleanup

func (cp *ConnectionPool) Cleanup() int

Cleanup removes expired connections

func (*ConnectionPool) Close

func (cp *ConnectionPool) Close(peerID string) error

Close closes a connection

func (*ConnectionPool) CloseAll

func (cp *ConnectionPool) CloseAll()

CloseAll closes all connections

func (*ConnectionPool) Get

func (cp *ConnectionPool) Get(ctx context.Context, peerID string, creator func() (interface{}, error)) (*PooledConnection, error)

Get gets or creates a connection for a peer

func (*ConnectionPool) GetAllConnections

func (cp *ConnectionPool) GetAllConnections() map[string]*PooledConnection

GetAllConnections returns all connections

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection(peerID string) (*PooledConnection, bool)

GetConnection returns a connection for a peer

func (*ConnectionPool) GetStatistics

func (cp *ConnectionPool) GetStatistics() PoolStatistics

GetStatistics returns pool statistics

func (*ConnectionPool) MarkHealthy

func (cp *ConnectionPool) MarkHealthy(peerID string)

MarkHealthy marks a connection as healthy

func (*ConnectionPool) MarkUnhealthy

func (cp *ConnectionPool) MarkUnhealthy(peerID string)

MarkUnhealthy marks a connection as unhealthy

func (*ConnectionPool) MonitorHealth

func (cp *ConnectionPool) MonitorHealth(ctx context.Context)

MonitorHealth monitors connection health

func (*ConnectionPool) Release

func (cp *ConnectionPool) Release(peerID string)

Release releases a connection back to the pool

func (*ConnectionPool) Size

func (cp *ConnectionPool) Size() int

Size returns the current pool size

type GRPCTransport

type GRPCTransport struct {
	// contains filtered or unexported fields
}

GRPCTransport implements gRPC-based network transport

func NewGRPCTransport

func NewGRPCTransport(config GRPCTransportConfig, logger forge.Logger) *GRPCTransport

NewGRPCTransport creates a new gRPC transport

func (*GRPCTransport) AddPeer

func (gt *GRPCTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport

func (*GRPCTransport) GetAddress

func (gt *GRPCTransport) GetAddress() string

GetAddress returns the local address

func (*GRPCTransport) Receive

func (gt *GRPCTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages

func (*GRPCTransport) RemovePeer

func (gt *GRPCTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport

func (*GRPCTransport) Send

func (gt *GRPCTransport) Send(ctx context.Context, target string, message interface{}) error

Send sends a message to a peer

func (*GRPCTransport) Start

func (gt *GRPCTransport) Start(ctx context.Context) error

Start starts the gRPC transport

func (*GRPCTransport) Stop

func (gt *GRPCTransport) Stop(ctx context.Context) error

Stop stops the gRPC transport

type GRPCTransportConfig

type GRPCTransportConfig struct {
	NodeID               string
	Address              string
	Port                 int
	BufferSize           int
	MaxConnections       int
	ConnectionTimeout    time.Duration
	RequestTimeout       time.Duration
	KeepAliveTime        time.Duration
	KeepAliveTimeout     time.Duration
	MaxConcurrentStreams uint32
	TLSConfig            *tls.Config
}

GRPCTransportConfig contains gRPC transport configuration

type GobSerializer

type GobSerializer struct{}

GobSerializer implements gob-based serialization

func NewGobSerializer

func NewGobSerializer() *GobSerializer

NewGobSerializer creates a new gob serializer

func (*GobSerializer) Deserialize

func (gs *GobSerializer) Deserialize(data []byte, msg interface{}) error

Deserialize deserializes a message using gob

func (*GobSerializer) Serialize

func (gs *GobSerializer) Serialize(msg interface{}) ([]byte, error)

Serialize serializes a message using gob

func (*GobSerializer) Type

func (gs *GobSerializer) Type() SerializerType

Type returns the serializer type

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport implements HTTP-based network transport

func NewHTTPTransport

func NewHTTPTransport(config HTTPTransportConfig, logger forge.Logger) *HTTPTransport

NewHTTPTransport creates a new HTTP transport

func (*HTTPTransport) AddPeer

func (ht *HTTPTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport

func (*HTTPTransport) GetAddress

func (ht *HTTPTransport) GetAddress() string

GetAddress returns the local address

func (*HTTPTransport) Receive

func (ht *HTTPTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages

func (*HTTPTransport) RemovePeer

func (ht *HTTPTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport

func (*HTTPTransport) Send

func (ht *HTTPTransport) Send(ctx context.Context, target string, message interface{}) error

Send sends a message to a peer

func (*HTTPTransport) Start

func (ht *HTTPTransport) Start(ctx context.Context) error

Start starts the HTTP transport

func (*HTTPTransport) Stop

func (ht *HTTPTransport) Stop(ctx context.Context) error

Stop stops the HTTP transport

type HTTPTransportConfig

type HTTPTransportConfig struct {
	NodeID              string
	Address             string
	Port                int
	BufferSize          int
	RequestTimeout      time.Duration
	IdleConnTimeout     time.Duration
	MaxIdleConns        int
	MaxIdleConnsPerHost int
}

HTTPTransportConfig contains HTTP transport configuration

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer implements JSON-based serialization

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSON serializer

func (*JSONSerializer) Deserialize

func (js *JSONSerializer) Deserialize(data []byte, msg interface{}) error

Deserialize deserializes a message using JSON

func (*JSONSerializer) Serialize

func (js *JSONSerializer) Serialize(msg interface{}) ([]byte, error)

Serialize serializes a message using JSON

func (*JSONSerializer) Type

func (js *JSONSerializer) Type() SerializerType

Type returns the serializer type

type LocalTransport

type LocalTransport struct {
	// contains filtered or unexported fields
}

LocalTransport implements an in-memory transport for testing

func NewLocalTransport

func NewLocalTransport(config LocalTransportConfig, logger forge.Logger) *LocalTransport

NewLocalTransport creates a new local transport

func (*LocalTransport) AddPeer

func (t *LocalTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport

func (*LocalTransport) Connect

func (t *LocalTransport) Connect(peer *LocalTransport)

Connect connects this transport to another local transport. This is used for testing to establish bidirectional communication.

func (*LocalTransport) Disconnect

func (t *LocalTransport) Disconnect(peerID string)

Disconnect disconnects this transport from a peer

func (*LocalTransport) DisconnectAll

func (t *LocalTransport) DisconnectAll()

DisconnectAll disconnects from all peers

func (*LocalTransport) GetAddress

func (t *LocalTransport) GetAddress() string

GetAddress returns the local address

func (*LocalTransport) GetPeers

func (t *LocalTransport) GetPeers() []string

GetPeers returns a list of connected peer IDs

func (*LocalTransport) Receive

func (t *LocalTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages

func (*LocalTransport) RemovePeer

func (t *LocalTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport

func (*LocalTransport) Send

func (t *LocalTransport) Send(ctx context.Context, target string, message interface{}) error

Send sends a message to a peer

func (*LocalTransport) SetLatency

func (t *LocalTransport) SetLatency(latency time.Duration)

SetLatency sets artificial latency for testing

func (*LocalTransport) Start

func (t *LocalTransport) Start(ctx context.Context) error

Start starts the transport

func (*LocalTransport) Stop

func (t *LocalTransport) Stop(ctx context.Context) error

Stop stops the transport

type LocalTransportConfig

type LocalTransportConfig struct {
	NodeID     string
	Address    string
	BufferSize int
}

LocalTransportConfig contains configuration for local transport

type MessageCodec

type MessageCodec struct {
	// contains filtered or unexported fields
}

MessageCodec provides encoding/decoding with envelope

func NewMessageCodec

func NewMessageCodec(serializerType SerializerType) *MessageCodec

NewMessageCodec creates a new message codec

func (*MessageCodec) Decode

func (mc *MessageCodec) Decode(data []byte) (*internal.Message, error)

Decode decodes an envelope into a message

func (*MessageCodec) Encode

func (mc *MessageCodec) Encode(msg internal.Message) ([]byte, error)

Encode encodes a message into an envelope

func (*MessageCodec) GetSerializer

func (mc *MessageCodec) GetSerializer() Serializer

GetSerializer returns the underlying serializer

type MessageEnvelope

type MessageEnvelope struct {
	Type           internal.MessageType
	From           string
	To             string
	Data           []byte
	Timestamp      int64
	SerializerType SerializerType
}

MessageEnvelope wraps a message with metadata

type MetricsCollector

type MetricsCollector interface {
	RecordMessage(msg *internal.Message, duration time.Duration, success bool)
}

MetricsCollector collects transport metrics

type MiddlewareChain

type MiddlewareChain struct {
	// contains filtered or unexported fields
}

MiddlewareChain chains multiple middleware

func NewMiddlewareChain

func NewMiddlewareChain(logger forge.Logger) *MiddlewareChain

NewMiddlewareChain creates a new middleware chain

func (*MiddlewareChain) Execute

func (mc *MiddlewareChain) Execute(handler TransportHandler) TransportHandler

Execute executes the middleware chain

func (*MiddlewareChain) Use

func (mc *MiddlewareChain) Use(middleware TransportMiddleware)

Use adds middleware to the chain

type PoolConfig

type PoolConfig struct {
	MaxConnections      int
	MaxIdleTime         time.Duration
	MaxConnectionAge    time.Duration
	HealthCheckInterval time.Duration
	ConnectTimeout      time.Duration
}

PoolConfig contains pool configuration

type PoolStatistics

type PoolStatistics struct {
	TotalConnections   int64
	ActiveConnections  int64
	IdleConnections    int64
	CreatedConnections int64
	ClosedConnections  int64
	FailedConnections  int64
	TotalUses          int64
}

PoolStatistics contains pool statistics

type PooledConnection

type PooledConnection struct {
	PeerID     string
	Connection interface{}
	Created    time.Time
	LastUsed   time.Time
	UseCount   int64
	InUse      bool
	Healthy    bool
	// contains filtered or unexported fields
}

PooledConnection represents a pooled connection

type ProtobufSerializer

type ProtobufSerializer struct{}

ProtobufSerializer implements Protocol Buffers serialization

func NewProtobufSerializer

func NewProtobufSerializer() *ProtobufSerializer

NewProtobufSerializer creates a new protobuf serializer

func (*ProtobufSerializer) Deserialize

func (ps *ProtobufSerializer) Deserialize(data []byte, msg interface{}) error

Deserialize deserializes a message using protobuf

func (*ProtobufSerializer) Serialize

func (ps *ProtobufSerializer) Serialize(msg interface{}) ([]byte, error)

Serialize serializes a message using protobuf

func (*ProtobufSerializer) Type

func (ps *ProtobufSerializer) Type() SerializerType

Type returns the serializer type

type Serializer

type Serializer interface {
	Serialize(msg interface{}) ([]byte, error)
	Deserialize(data []byte, msg interface{}) error
	Type() SerializerType
}

Serializer defines the interface for message serialization

type SerializerType

type SerializerType string

SerializerType represents the type of serializer

const (
	// SerializerGob uses Go's gob encoding
	SerializerGob SerializerType = "gob"
	// SerializerJSON uses JSON encoding
	SerializerJSON SerializerType = "json"
	// SerializerProtobuf uses Protocol Buffers
	SerializerProtobuf SerializerType = "protobuf"
)

type TCPTransport

type TCPTransport struct {
	// contains filtered or unexported fields
}

TCPTransport implements TCP-based network transport

func NewTCPTransport

func NewTCPTransport(config TCPTransportConfig, logger forge.Logger) *TCPTransport

NewTCPTransport creates a new TCP transport

func (*TCPTransport) AddPeer

func (tt *TCPTransport) AddPeer(nodeID, address string, port int) error

AddPeer adds a peer to the transport

func (*TCPTransport) GetAddress

func (tt *TCPTransport) GetAddress() string

GetAddress returns the local address

func (*TCPTransport) Receive

func (tt *TCPTransport) Receive() <-chan internal.Message

Receive returns a channel for receiving messages

func (*TCPTransport) RemovePeer

func (tt *TCPTransport) RemovePeer(nodeID string) error

RemovePeer removes a peer from the transport

func (*TCPTransport) Send

func (tt *TCPTransport) Send(ctx context.Context, target string, message interface{}) error

Send sends a message to a peer

func (*TCPTransport) Start

func (tt *TCPTransport) Start(ctx context.Context) error

Start starts the TCP transport

func (*TCPTransport) Stop

func (tt *TCPTransport) Stop(ctx context.Context) error

Stop stops the TCP transport

type TCPTransportConfig

type TCPTransportConfig struct {
	NodeID            string
	Address           string
	Port              int
	BufferSize        int
	ConnectionTimeout time.Duration
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	KeepAlive         time.Duration
	MaxRetries        int
}

TCPTransportConfig contains TCP transport configuration

type TLSConfig

type TLSConfig struct {
	// Enable TLS
	Enabled bool

	// Certificate and key paths
	CertFile string
	KeyFile  string
	CAFile   string

	// mTLS configuration
	RequireClientCert bool
	ClientCAFile      string

	// TLS version
	MinVersion uint16
	MaxVersion uint16

	// Cipher suites
	CipherSuites []uint16

	// Server name for SNI
	ServerName string

	// Skip verification (for testing only)
	InsecureSkipVerify bool
}

TLSConfig contains TLS configuration for secure transport

func DefaultTLSConfig

func DefaultTLSConfig() TLSConfig

DefaultTLSConfig returns a secure default TLS configuration

func MTLSConfig

func MTLSConfig(certFile, keyFile, caFile string) TLSConfig

MTLSConfig returns a default mTLS configuration

type TLSManager

type TLSManager struct {
	// contains filtered or unexported fields
}

TLSManager manages TLS configuration and certificate loading

func NewTLSManager

func NewTLSManager(config TLSConfig, logger forge.Logger) (*TLSManager, error)

NewTLSManager creates a new TLS manager

func (*TLSManager) GetCertificateInfo

func (tm *TLSManager) GetCertificateInfo() map[string]interface{}

GetCertificateInfo returns information about the loaded certificate

func (*TLSManager) GetTLSConfig

func (tm *TLSManager) GetTLSConfig() *tls.Config

GetTLSConfig returns the TLS configuration

func (*TLSManager) IsEnabled

func (tm *TLSManager) IsEnabled() bool

IsEnabled returns whether TLS is enabled

func (*TLSManager) IsMTLSEnabled

func (tm *TLSManager) IsMTLSEnabled() bool

IsMTLSEnabled returns whether mTLS is enabled

func (*TLSManager) ReloadCertificates

func (tm *TLSManager) ReloadCertificates() error

ReloadCertificates reloads certificates from disk

func (*TLSManager) ShouldReload

func (tm *TLSManager) ShouldReload() bool

ShouldReload checks if certificates should be reloaded

func (*TLSManager) VerifyPeerCertificate

func (tm *TLSManager) VerifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error

VerifyPeerCertificate verifies a peer certificate

type TransportHandler

type TransportHandler func(ctx context.Context, msg *internal.Message) error

TransportHandler handles transport operations

type TransportMiddleware

type TransportMiddleware func(next TransportHandler) TransportHandler

TransportMiddleware represents transport-level middleware

func AuthenticationMiddleware

func AuthenticationMiddleware(verifyFunc func(string) bool, logger forge.Logger) TransportMiddleware

AuthenticationMiddleware creates authentication middleware

func CircuitBreakerMiddleware

func CircuitBreakerMiddleware(threshold int, resetTimeout time.Duration, logger forge.Logger) TransportMiddleware

CircuitBreakerMiddleware creates circuit breaker middleware

func CompressionMiddleware

func CompressionMiddleware(minSize int, logger forge.Logger) TransportMiddleware

CompressionMiddleware creates compression middleware

func DeduplicationMiddleware

func DeduplicationMiddleware(window time.Duration, logger forge.Logger) TransportMiddleware

DeduplicationMiddleware creates message deduplication middleware

func LoggingMiddleware

func LoggingMiddleware(logger forge.Logger) TransportMiddleware

LoggingMiddleware creates logging middleware

func MetricsMiddleware

func MetricsMiddleware(collector MetricsCollector, logger forge.Logger) TransportMiddleware

MetricsMiddleware creates metrics collection middleware

func RateLimitMiddleware

func RateLimitMiddleware(maxRate int, window time.Duration, logger forge.Logger) TransportMiddleware

RateLimitMiddleware creates rate limiting middleware

func RetryMiddleware

func RetryMiddleware(maxRetries int, backoff time.Duration, logger forge.Logger) TransportMiddleware

RetryMiddleware creates retry middleware

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration, logger forge.Logger) TransportMiddleware

TimeoutMiddleware creates timeout middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL