Documentation
¶
Overview ¶
Package tunnel provides the core tunneling functionality for Wormhole.
The tunnel package implements a multiplexed connection protocol that allows multiple virtual streams to be established over a single TCP connection. This is similar to the yamux protocol but with a custom binary frame format.
Frame Protocol ¶
The frame protocol uses a simple binary format:
+----------+----------+------------+----------+------------------+
| Version  | Type     | StreamID   | Length   | Payload          |
| 1 byte   | 1 byte   | 4 bytes    | 4 bytes  | N bytes          |
+----------+----------+------------+----------+------------------+
Frame types:
- DATA (0x01): Regular data frame
- WINDOW_UPDATE (0x02): Flow control window update
- PING (0x03): Keep-alive ping
- PONG (0x04): Response to a keep-alive ping
- CLOSE (0x05): Stream close notification
- HANDSHAKE (0x06): Initial handshake frame
- ERROR (0x07): Error notification
Usage ¶
Server side:
mux, err := tunnel.Server(conn, config)
if err != nil {
return err
}
for {
stream, err := mux.AcceptStream()
if err != nil {
return err
}
go handleStream(stream)
}
Client side:
mux, err := tunnel.Client(conn, config)
if err != nil {
return err
}
stream, err := mux.OpenStreamContext(ctx)
if err != nil {
return err
}
// Use stream as io.ReadWriteCloser
Index ¶
- Constants
- Variables
- func EncodeFrame(w io.Writer, f *Frame) error
- func ParseError(f *Frame) (code uint32, message string, err error)
- func ParsePing(f *Frame) (uint32, error)
- func ParsePong(f *Frame) (uint32, error)
- func ParseWindowUpdate(f *Frame) (uint32, error)
- type DialFunc
- type Frame
- func DecodeFrame(r io.Reader) (*Frame, error)
- func NewCloseFrame(streamID uint32) *Frame
- func NewDataFrame(streamID uint32, payload []byte) *Frame
- func NewErrorFrame(streamID uint32, code uint32, message string) *Frame
- func NewFrame(frameType FrameType, streamID uint32, payload []byte) *Frame
- func NewHandshakeFrame(payload []byte) *Frame
- func NewPingFrame(pingID uint32) *Frame
- func NewPongFrame(pingID uint32) *Frame
- func NewWindowUpdateFrame(streamID uint32, increment uint32) *Frame
- type FrameCodec
- type FrameCodecOption
- type FrameType
- type Mux
- func (m *Mux) AcceptStream() (*Stream, error)
- func (m *Mux) AcceptStreamContext(ctx context.Context) (*Stream, error)
- func (m *Mux) Close() error
- func (m *Mux) IsClosed() bool
- func (m *Mux) LocalAddr() net.Addr
- func (m *Mux) NumStreams() int
- func (m *Mux) OpenStream() (*Stream, error)
- func (m *Mux) OpenStreamContext(ctx context.Context) (*Stream, error)
- func (m *Mux) RemoteAddr() net.Addr
- type MuxConfig
- type Pool
- func (p *Pool) Close() error
- func (p *Pool) Get(ctx context.Context) (*Mux, error)
- func (p *Pool) IsClosed() bool
- func (p *Pool) Len() int
- func (p *Pool) OpenStream(ctx context.Context) (*Stream, *Mux, error)
- func (p *Pool) Put(mux *Mux)
- func (p *Pool) Stats() PoolStats
- func (p *Pool) Warmup(ctx context.Context) error
- type PoolConfig
- type PoolStats
- type RemoteError
- type Stream
- func (s *Stream) Close() error
- func (s *Stream) Done() <-chan struct{}
- func (s *Stream) ID() uint32
- func (s *Stream) IsClosed() bool
- func (s *Stream) Read(p []byte) (int, error)
- func (s *Stream) SetDeadline(t time.Time) error
- func (s *Stream) SetReadDeadline(t time.Time) error
- func (s *Stream) SetWriteDeadline(t time.Time) error
- func (s *Stream) Write(p []byte) (int, error)
- type StreamConfig
Constants ¶
const (
// FrameVersion is the current protocol version.
FrameVersion uint8 = 1
// FrameHeaderSize is the size of the frame header in bytes:
// [Version:1][Type:1][StreamID:4][Length:4] = 10 bytes.
FrameHeaderSize = 10
// MaxFramePayloadSize is the maximum payload size (16MB).
MaxFramePayloadSize = 16 * 1024 * 1024
// DefaultFramePayloadSize is the default payload size (32KB).
DefaultFramePayloadSize = 32 * 1024
)
Frame protocol constants.
Variables ¶
var (
ErrInvalidVersion = errors.New("tunnel: invalid frame version")
ErrInvalidFrameType = errors.New("tunnel: invalid frame type")
ErrPayloadTooLarge = errors.New("tunnel: payload too large")
ErrFrameTooShort = errors.New("tunnel: frame too short")
ErrInvalidStreamID = errors.New("tunnel: invalid stream ID")
ErrUnexpectedEOF = errors.New("tunnel: unexpected EOF")
ErrChecksumMismatch = errors.New("tunnel: checksum mismatch")
ErrConnectionClosed = errors.New("tunnel: connection closed")
ErrStreamClosed = errors.New("tunnel: stream closed")
ErrStreamNotFound = errors.New("tunnel: stream not found")
ErrStreamAlreadyExist = errors.New("tunnel: stream already exists")
ErrTimeout = errors.New("tunnel: operation timeout")
ErrMuxClosed = errors.New("tunnel: mux closed")
)
Frame errors.
var DefaultCodec = NewFrameCodec()
DefaultCodec is the default frame codec.
Functions ¶
func EncodeFrame ¶
EncodeFrame is a convenience function that uses the default codec.
func ParseError ¶
ParseError extracts the error code and message from an error frame.
func ParseWindowUpdate ¶
ParseWindowUpdate extracts the increment from a window update frame.
Types ¶
type Frame ¶
type Frame struct {
// Version is the protocol version (always 1 for now).
Version uint8
// Type indicates the frame type.
Type FrameType
// StreamID identifies the stream this frame belongs to.
// StreamID 0 is reserved for connection-level frames.
StreamID uint32
// Payload contains the frame data.
// The length is written to the wire header but is not a struct field; it is derived from len(Payload).
Payload []byte
}
Frame represents a single protocol frame.
Frame format:
+----------+----------+------------+----------+------------------+
| Version  | Type     | StreamID   | Length   | Payload          |
| 1 byte   | 1 byte   | 4 bytes    | 4 bytes  | N bytes          |
+----------+----------+------------+----------+------------------+
func DecodeFrame ¶
DecodeFrame is a convenience function that uses the default codec.
func NewCloseFrame ¶
NewCloseFrame creates a new close frame.
func NewDataFrame ¶
NewDataFrame creates a new data frame.
func NewErrorFrame ¶
NewErrorFrame creates a new error frame.
func NewHandshakeFrame ¶
NewHandshakeFrame creates a new handshake frame.
func NewPongFrame ¶
NewPongFrame creates a new pong (ping response) frame.
func NewWindowUpdateFrame ¶
NewWindowUpdateFrame creates a new window update frame.
func (*Frame) IsConnectionLevel ¶
IsConnectionLevel returns true if this is a connection-level frame.
type FrameCodec ¶
type FrameCodec struct {
// contains filtered or unexported fields
}
FrameCodec handles encoding and decoding of frames. It is safe for concurrent use.
func NewFrameCodec ¶
func NewFrameCodec(opts ...FrameCodecOption) *FrameCodec
NewFrameCodec creates a new frame codec with the given options.
type FrameCodecOption ¶
type FrameCodecOption func(*FrameCodec)
FrameCodecOption is a function that configures a FrameCodec.
func WithMaxPayloadSize ¶
func WithMaxPayloadSize(size uint32) FrameCodecOption
WithMaxPayloadSize sets the maximum payload size.
type FrameType ¶
type FrameType uint8
FrameType represents the type of a frame.
const (
// FrameData carries stream data.
FrameData FrameType = 0x01
// FrameWindowUpdate updates the flow control window.
FrameWindowUpdate FrameType = 0x02
// FramePing is used for keep-alive.
FramePing FrameType = 0x03
// FramePong is the response to a ping frame.
FramePong FrameType = 0x04
// FrameClose signals stream closure.
FrameClose FrameType = 0x05
// FrameHandshake is used for initial handshake.
FrameHandshake FrameType = 0x06
// FrameError signals an error condition.
FrameError FrameType = 0x07
)
Frame types.
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux is a multiplexer that manages multiple streams over a single connection.
func (*Mux) AcceptStream ¶
AcceptStream waits for and returns the next incoming stream.
func (*Mux) AcceptStreamContext ¶
AcceptStreamContext waits for the next incoming stream with context.
func (*Mux) NumStreams ¶
NumStreams returns the number of active streams.
func (*Mux) OpenStreamContext ¶
OpenStreamContext opens a new stream with context.
func (*Mux) RemoteAddr ¶
RemoteAddr returns the remote network address.
type MuxConfig ¶
type MuxConfig struct {
// AcceptBacklog is the maximum number of pending streams.
AcceptBacklog int
// StreamConfig is the configuration for new streams.
StreamConfig StreamConfig
// KeepAliveInterval is the interval between keep-alive pings.
// Set to 0 to disable keep-alive.
KeepAliveInterval time.Duration
// KeepAliveTimeout is the timeout for keep-alive pings.
KeepAliveTimeout time.Duration
// MaxFrameSize is the maximum frame size.
MaxFrameSize uint32
// EnableFlowControl enables flow control.
EnableFlowControl bool
}
MuxConfig contains configuration for a multiplexer.
func DefaultMuxConfig ¶
func DefaultMuxConfig() MuxConfig
DefaultMuxConfig returns the default multiplexer configuration.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a pool of multiplexed connections.
func NewPool ¶
func NewPool(config PoolConfig, dialFunc DialFunc, isClient bool) *Pool
NewPool creates a new connection pool.
func (*Pool) OpenStream ¶
OpenStream gets a connection and opens a stream.
type PoolConfig ¶
type PoolConfig struct {
// MaxSize is the maximum number of connections in the pool.
MaxSize int
// MinSize is the minimum number of connections to maintain.
MinSize int
// MaxIdleTime is the maximum time a connection can be idle before being closed.
MaxIdleTime time.Duration
// HealthCheckInterval is the interval between health checks.
HealthCheckInterval time.Duration
// ConnectTimeout is the timeout for establishing new connections.
ConnectTimeout time.Duration
// MuxConfig is the configuration for multiplexers.
MuxConfig MuxConfig
}
PoolConfig contains configuration for a connection pool.
func DefaultPoolConfig ¶
func DefaultPoolConfig() PoolConfig
DefaultPoolConfig returns the default pool configuration.
type PoolStats ¶
type PoolStats struct {
// TotalConnections is the total number of connections created.
TotalConnections int64
// ActiveConnections is the current number of active connections.
ActiveConnections int64
// IdleConnections is the current number of idle connections.
IdleConnections int64
// TotalStreams is the total number of streams created.
TotalStreams int64
// FailedConnections is the number of failed connection attempts.
FailedConnections int64
// HealthCheckFailures is the number of health check failures.
HealthCheckFailures int64
}
PoolStats contains pool statistics.
type RemoteError ¶
RemoteError represents an error received from the remote end.
func (*RemoteError) Error ¶
func (e *RemoteError) Error() string
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a virtual stream within a multiplexed connection. It implements io.ReadWriteCloser and provides flow control.
func (*Stream) Done ¶
func (s *Stream) Done() <-chan struct{}
Done returns a channel that is closed when the stream is closed.
func (*Stream) Read ¶
Read reads data from the stream. It blocks until data is available, the stream is closed, or the deadline expires.
func (*Stream) SetDeadline ¶
SetDeadline sets the read and write deadlines.
func (*Stream) SetReadDeadline ¶
SetReadDeadline sets the deadline for future Read calls.
func (*Stream) SetWriteDeadline ¶
SetWriteDeadline sets the deadline for future Write calls.
type StreamConfig ¶
type StreamConfig struct {
// WindowSize is the initial flow control window size.
WindowSize uint32
// MaxWindowSize is the maximum window size.
MaxWindowSize uint32
// ReadBufferSize is the size of the read buffer.
ReadBufferSize int
}
StreamConfig contains configuration for a stream.
func DefaultStreamConfig ¶
func DefaultStreamConfig() StreamConfig
DefaultStreamConfig returns the default stream configuration.