tunnel

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package tunnel provides the core tunneling functionality for Wormhole.

The tunnel package implements a multiplexed connection protocol that allows multiple virtual streams to be established over a single TCP connection. This is similar to the yamux protocol but with a custom binary frame format.

Frame Protocol

The frame protocol uses a simple binary format:

+----------+----------+------------+----------+------------------+
| Version  |   Type   |  StreamID  |  Length  |     Payload      |
|  1 byte  |  1 byte  |  4 bytes   |  4 bytes |    N bytes       |
+----------+----------+------------+----------+------------------+

Frame types:

  • DATA (0x01): Regular data frame
  • WINDOW_UPDATE (0x02): Flow control window update
  • PING (0x03): Keep-alive ping
  • CLOSE (0x04): Stream close notification
  • HANDSHAKE (0x05): Initial handshake frame
  • ERROR (0x06): Error notification

Usage

Server side:

mux, err := tunnel.Server(conn, config)
if err != nil {
    return err
}
for {
    stream, err := mux.AcceptStream()
    if err != nil {
        return err
    }
    go handleStream(stream)
}

Client side:

mux, err := tunnel.Client(conn, config)
if err != nil {
    return err
}
stream, err := mux.OpenStream(ctx)
if err != nil {
    return err
}
// Use stream as io.ReadWriteCloser

Index

Constants

View Source
const (
	// FrameVersion is the current protocol version.
	FrameVersion uint8 = 1

	// FrameHeaderSize is the size of the frame header in bytes:
	// [Version:1][Type:1][StreamID:4][Length:4] = 10 bytes.
	FrameHeaderSize = 10

	// MaxFramePayloadSize is the maximum payload size (16MB).
	MaxFramePayloadSize = 16 * 1024 * 1024

	// DefaultFramePayloadSize is the default payload size (32KB).
	DefaultFramePayloadSize = 32 * 1024
)

Frame protocol constants.

Variables

View Source
var (
	ErrInvalidVersion     = errors.New("tunnel: invalid frame version")
	ErrInvalidFrameType   = errors.New("tunnel: invalid frame type")
	ErrPayloadTooLarge    = errors.New("tunnel: payload too large")
	ErrFrameTooShort      = errors.New("tunnel: frame too short")
	ErrInvalidStreamID    = errors.New("tunnel: invalid stream ID")
	ErrUnexpectedEOF      = errors.New("tunnel: unexpected EOF")
	ErrChecksumMismatch   = errors.New("tunnel: checksum mismatch")
	ErrConnectionClosed   = errors.New("tunnel: connection closed")
	ErrStreamClosed       = errors.New("tunnel: stream closed")
	ErrStreamNotFound     = errors.New("tunnel: stream not found")
	ErrStreamAlreadyExist = errors.New("tunnel: stream already exists")
	ErrTimeout            = errors.New("tunnel: operation timeout")
	ErrMuxClosed          = errors.New("tunnel: mux closed")
)

Frame errors.

View Source
var DefaultCodec = NewFrameCodec()

DefaultCodec is the default frame codec.

Functions

func EncodeFrame

func EncodeFrame(w io.Writer, f *Frame) error

EncodeFrame is a convenience function that uses the default codec.

func ParseError

func ParseError(f *Frame) (code uint32, message string, err error)

ParseError extracts the error code and message from an error frame.

func ParsePing

func ParsePing(f *Frame) (uint32, error)

ParsePing extracts the ping ID from a ping frame.

func ParsePong

func ParsePong(f *Frame) (uint32, error)

ParsePong extracts the ping ID from a pong frame.

func ParseWindowUpdate

func ParseWindowUpdate(f *Frame) (uint32, error)

ParseWindowUpdate extracts the increment from a window update frame.

Types

type DialFunc

type DialFunc func(ctx context.Context) (net.Conn, error)

DialFunc is a function that creates a new connection.

type Frame

type Frame struct {
	// Version is the protocol version (always 1 for now).
	Version uint8

	// Type indicates the frame type.
	Type FrameType

	// StreamID identifies the stream this frame belongs to.
	// StreamID 0 is reserved for connection-level frames.
	StreamID uint32

	// Payload contains the frame data.
	// The length is stored in the header but derived from len(Payload).
	Payload []byte
}

Frame represents a single protocol frame.

Frame format:

+----------+----------+------------+----------+------------------+
| Version  |   Type   |  StreamID  |  Length  |     Payload      |
|  1 byte  |  1 byte  |  4 bytes   |  4 bytes |    N bytes       |
+----------+----------+------------+----------+------------------+

func DecodeFrame

func DecodeFrame(r io.Reader) (*Frame, error)

DecodeFrame is a convenience function that uses the default codec.

func NewCloseFrame

func NewCloseFrame(streamID uint32) *Frame

NewCloseFrame creates a new close frame.

func NewDataFrame

func NewDataFrame(streamID uint32, payload []byte) *Frame

NewDataFrame creates a new data frame.

func NewErrorFrame

func NewErrorFrame(streamID uint32, code uint32, message string) *Frame

NewErrorFrame creates a new error frame.

func NewFrame

func NewFrame(frameType FrameType, streamID uint32, payload []byte) *Frame

NewFrame creates a new frame with the given parameters.

func NewHandshakeFrame

func NewHandshakeFrame(payload []byte) *Frame

NewHandshakeFrame creates a new handshake frame.

func NewPingFrame

func NewPingFrame(pingID uint32) *Frame

NewPingFrame creates a new ping frame.

func NewPongFrame

func NewPongFrame(pingID uint32) *Frame

NewPongFrame creates a new pong (ping response) frame.

func NewWindowUpdateFrame

func NewWindowUpdateFrame(streamID uint32, increment uint32) *Frame

NewWindowUpdateFrame creates a new window update frame.

func (*Frame) Clone

func (f *Frame) Clone() *Frame

Clone creates a deep copy of the frame.

func (*Frame) IsConnectionLevel

func (f *Frame) IsConnectionLevel() bool

IsConnectionLevel returns true if this is a connection-level frame.

func (*Frame) Length

func (f *Frame) Length() uint32

Length returns the payload length.

func (*Frame) String

func (f *Frame) String() string

String returns a string representation of the frame.

func (*Frame) TotalSize

func (f *Frame) TotalSize() int

TotalSize returns the total frame size including header.

func (*Frame) Validate

func (f *Frame) Validate() error

Validate checks if the frame is valid.

type FrameCodec

type FrameCodec struct {
	// contains filtered or unexported fields
}

FrameCodec handles encoding and decoding of frames. It is safe for concurrent use.

func NewFrameCodec

func NewFrameCodec(opts ...FrameCodecOption) *FrameCodec

NewFrameCodec creates a new frame codec with the given options.

func (*FrameCodec) Decode

func (c *FrameCodec) Decode(r io.Reader) (*Frame, error)

Decode reads a frame from the reader.

func (*FrameCodec) Encode

func (c *FrameCodec) Encode(w io.Writer, f *Frame) error

Encode writes the frame to the writer.

type FrameCodecOption

type FrameCodecOption func(*FrameCodec)

FrameCodecOption is a function that configures a FrameCodec.

func WithMaxPayloadSize

func WithMaxPayloadSize(size uint32) FrameCodecOption

WithMaxPayloadSize sets the maximum payload size.

type FrameType

type FrameType uint8

FrameType represents the type of a frame.

const (
	// FrameData carries stream data.
	FrameData FrameType = 0x01

	// FrameWindowUpdate updates the flow control window.
	FrameWindowUpdate FrameType = 0x02

	// FramePing is used for keep-alive.
	FramePing FrameType = 0x03

	// FramePong is the response to a ping frame.
	FramePong FrameType = 0x04

	// FrameClose signals stream closure.
	FrameClose FrameType = 0x05

	// FrameHandshake is used for initial handshake.
	FrameHandshake FrameType = 0x06

	// FrameError signals an error condition.
	FrameError FrameType = 0x07
)

Frame types.

func (FrameType) IsValid

func (t FrameType) IsValid() bool

IsValid returns whether the frame type is valid.

func (FrameType) String

func (t FrameType) String() string

String returns the string representation of the frame type.

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux is a multiplexer that manages multiple streams over a single connection.

func Client

func Client(conn net.Conn, config MuxConfig) (*Mux, error)

Client creates a new client-side multiplexer.

func Server

func Server(conn net.Conn, config MuxConfig) (*Mux, error)

Server creates a new server-side multiplexer.

func (*Mux) AcceptStream

func (m *Mux) AcceptStream() (*Stream, error)

AcceptStream waits for and returns the next incoming stream.

func (*Mux) AcceptStreamContext

func (m *Mux) AcceptStreamContext(ctx context.Context) (*Stream, error)

AcceptStreamContext waits for the next incoming stream with context.

func (*Mux) Close

func (m *Mux) Close() error

Close closes the multiplexer and all streams.

func (*Mux) IsClosed

func (m *Mux) IsClosed() bool

IsClosed returns whether the multiplexer is closed.

func (*Mux) LocalAddr

func (m *Mux) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Mux) NumStreams

func (m *Mux) NumStreams() int

NumStreams returns the number of active streams.

func (*Mux) OpenStream

func (m *Mux) OpenStream() (*Stream, error)

OpenStream opens a new stream.

func (*Mux) OpenStreamContext

func (m *Mux) OpenStreamContext(ctx context.Context) (*Stream, error)

OpenStreamContext opens a new stream with context.

func (*Mux) RemoteAddr

func (m *Mux) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

type MuxConfig

type MuxConfig struct {
	// AcceptBacklog is the maximum number of pending streams.
	AcceptBacklog int

	// StreamConfig is the configuration for new streams.
	StreamConfig StreamConfig

	// KeepAliveInterval is the interval between keep-alive pings.
	// Set to 0 to disable keep-alive.
	KeepAliveInterval time.Duration

	// KeepAliveTimeout is the timeout for keep-alive pings.
	KeepAliveTimeout time.Duration

	// MaxFrameSize is the maximum frame size.
	MaxFrameSize uint32

	// EnableFlowControl enables flow control.
	EnableFlowControl bool
}

MuxConfig contains configuration for a multiplexer.

func DefaultMuxConfig

func DefaultMuxConfig() MuxConfig

DefaultMuxConfig returns the default multiplexer configuration.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a pool of multiplexed connections.

func NewPool

func NewPool(config PoolConfig, dialFunc DialFunc, isClient bool) *Pool

NewPool creates a new connection pool.

func (*Pool) Close

func (p *Pool) Close() error

Close closes the pool and all connections.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context) (*Mux, error)

Get returns a connection from the pool or creates a new one.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns whether the pool is closed.

func (*Pool) Len

func (p *Pool) Len() int

Len returns the number of connections in the pool.

func (*Pool) OpenStream

func (p *Pool) OpenStream(ctx context.Context) (*Stream, *Mux, error)

OpenStream gets a connection and opens a stream.

func (*Pool) Put

func (p *Pool) Put(mux *Mux)

Put returns a connection to the pool.

func (*Pool) Stats

func (p *Pool) Stats() PoolStats

Stats returns the current pool statistics.

func (*Pool) Warmup

func (p *Pool) Warmup(ctx context.Context) error

Warmup pre-creates connections up to MinSize.

type PoolConfig

type PoolConfig struct {
	// MaxSize is the maximum number of connections in the pool.
	MaxSize int

	// MinSize is the minimum number of connections to maintain.
	MinSize int

	// MaxIdleTime is the maximum time a connection can be idle before being closed.
	MaxIdleTime time.Duration

	// HealthCheckInterval is the interval between health checks.
	HealthCheckInterval time.Duration

	// ConnectTimeout is the timeout for establishing new connections.
	ConnectTimeout time.Duration

	// MuxConfig is the configuration for multiplexers.
	MuxConfig MuxConfig
}

PoolConfig contains configuration for a connection pool.

func DefaultPoolConfig

func DefaultPoolConfig() PoolConfig

DefaultPoolConfig returns the default pool configuration.

type PoolStats

type PoolStats struct {
	// TotalConnections is the total number of connections created.
	TotalConnections int64

	// ActiveConnections is the current number of active connections.
	ActiveConnections int64

	// IdleConnections is the current number of idle connections.
	IdleConnections int64

	// TotalStreams is the total number of streams created.
	TotalStreams int64

	// FailedConnections is the number of failed connection attempts.
	FailedConnections int64

	// HealthCheckFailures is the number of health check failures.
	HealthCheckFailures int64
}

PoolStats contains pool statistics.

type RemoteError

type RemoteError struct {
	Code    uint32
	Message string
}

RemoteError represents an error received from the remote end.

func (*RemoteError) Error

func (e *RemoteError) Error() string

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a virtual stream within a multiplexed connection. It implements io.ReadWriteCloser and provides flow control.

func (*Stream) Close

func (s *Stream) Close() error

Close closes the stream.

func (*Stream) Done

func (s *Stream) Done() <-chan struct{}

Done returns a channel that is closed when the stream is closed.

func (*Stream) ID

func (s *Stream) ID() uint32

ID returns the stream ID.

func (*Stream) IsClosed

func (s *Stream) IsClosed() bool

IsClosed returns whether the stream is closed.

func (*Stream) Read

func (s *Stream) Read(p []byte) (int, error)

Read reads data from the stream. It blocks until data is available, the stream is closed, or the deadline expires.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls.

func (*Stream) Write

func (s *Stream) Write(p []byte) (int, error)

Write writes data to the stream. It blocks until all data is written, the stream is closed, or the deadline expires.

type StreamConfig

type StreamConfig struct {
	// WindowSize is the initial flow control window size.
	WindowSize uint32

	// MaxWindowSize is the maximum window size.
	MaxWindowSize uint32

	// ReadBufferSize is the size of the read buffer.
	ReadBufferSize int
}

StreamConfig contains configuration for a stream.

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

DefaultStreamConfig returns the default stream configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL