connmux

v2.0.0-beta.19
Published: Feb 24, 2026 License: MIT Imports: 12 Imported by: 0

README

connmux

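connmux is a connection multiplexer that routes each connection by its first few bytes, so gRPC, HTTP/1, HTTP/2, and custom TCP protocols can all be served on the same port.

The design is similar to github.com/soheilhy/cmux, but this is an independent implementation written for this repository. A compatibility wrapper, github.com/pubgo/funk/v2/cmux, is also provided to ease migration.

Quick start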

root, _ := net.Listen("tcp", ":8080")

m := connmux.New(root,
	connmux.WithReadTimeout(2*time.Second),
	connmux.WithMaxSniffBytes(1<<20),
)

// Rules are matched in registration order; earlier rules take priority.
grpcL := m.Match(connmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(connmux.HTTP1Fast())
rawL := m.Match(connmux.Any())

go grpcServer.Serve(grpcL)
go httpServer.Serve(httpL)
go serveRaw(rawL)

_ = m.Serve()

API overview

  • New(root net.Listener, opts ...Option) *Mux
  • (*Mux).Match(matchers ...Matcher) net.Listener
  • (*Mux).MatchWithWriters(writers ...MatchWriter) net.Listener
  • (*Mux).Serve() error
  • (*Mux).Close() error
Options
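  • WithReadTimeout(d time.Duration): timeout applied to each Read during sniffing; guards against slowloris-style clients
  • WithMaxSniffBytes(n int): maximum number of bytes buffered during sniffing (per connection)
  • WithConnBacklog(n int): queue length of each child listener (buffers connections awaiting Accept)
  • WithErrorHandler(func(error) bool): error handler; returning true tells Serve to continue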

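Built-in matchers

  • Any(): catch-all matcher
  • Prefix(...[]byte): matches by byte prefix
  • HTTP1Fast(): detects HTTP/1 with a quick method-prefix check (fast, but does not parse the request)
  • HTTP1(): parses the HTTP/1 request (more accurate)
  • HTTP2(): matches the HTTP/2 client preface
  • HTTP2HeaderField(name, value): matches a field in the HTTP/2 HEADERS
  • HTTP2HeaderFieldPrefix(name, valuePrefix): matches the prefix of a field value in the HTTP/2 HEADERS
  • HTTP2HeaderFieldSendSettings(name, value): for use with MatchWithWriters; writes a SETTINGS frame during sniffing when needed, then matches on the header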

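Pitfalls / design constraints

  • Matching happens only once: a connection's destination is decided after Accept, and the same connection cannot switch protocols later.
  • Resource control: matching depends on reading and buffering bytes, so setting both WithReadTimeout and WithMaxSniffBytes is recommended.
  • Java gRPC: some Java gRPC clients wait for the server's SETTINGS frame; for those, use MatchWithWriters(HTTP2HeaderFieldSendSettings(...)).
  • TLS: if you terminate TLS after connmux (or rely on net/http type-asserting net.Conn to detect TLS), the wrapped connection may interfere with some TLS detection. If you need state such as Request.TLS, terminate TLS first and multiplex afterwards.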

Migrating from soheilhy/cmux

If your existing code looks like this:

grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))

you can change it directly to:

grpcL := m.Match(connmux.HTTP2HeaderField("content-type", "application/grpc"))

Or, as a short-term measure, use the compatibility wrapper first:

import "github.com/pubgo/funk/v2/cmux"

The compatibility layer forwards calls to connmux.

Testing

go test ./connmux

Documentation

Overview

Package connmux multiplexes network connections based on their initial bytes.

It lets you serve multiple protocols (gRPC/HTTP/1/HTTP/2/raw TCP, etc.) on the same listening port by sniffing the beginning of each accepted connection and dispatching it to a protocol-specific net.Listener.

This package is conceptually similar to github.com/soheilhy/cmux (Apache-2.0) but is an independent implementation tailored for this repository.

## Basics

root, _ := net.Listen("tcp", ":8080")
m := connmux.New(root,
	connmux.WithReadTimeout(2*time.Second),
	connmux.WithMaxSniffBytes(1<<20),
)

// Match order defines priority.
grpcL := m.Match(connmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(connmux.HTTP1Fast())
other := m.Match(connmux.Any())

go grpcServer.Serve(grpcL)
go httpServer.Serve(httpL)
go serveRaw(other)
_ = m.Serve()

## MatchWithWriters (Java gRPC)

Some clients (notably Java gRPC) may wait for the server SETTINGS frame before sending request headers. In those cases use MatchWithWriters with HTTP2HeaderFieldSendSettings:

grpcL := m.MatchWithWriters(
	connmux.HTTP2HeaderFieldSendSettings("content-type", "application/grpc"),
)

## Notes / limitations

  • The match decision is made when a connection is accepted. A single connection cannot switch protocols later.
  • Matching is based on reading and buffering initial bytes. Use WithMaxSniffBytes to cap memory per connection and WithReadTimeout to avoid slowloris-style hangs during sniffing.
  • If you terminate TLS after connmux, some stdlib components may not detect the underlying *tls.Conn due to type assertions on net.Conn wrappers. If your handler relies on TLS-specific state, prefer terminating TLS before multiplexing.

Variables

var ErrNotMatched = errors.New("connmux: connection not matched")

ErrNotMatched indicates that an accepted connection did not match any rule.

var ErrServerClosed = errors.New("connmux: server closed")

ErrServerClosed is returned by (*Mux).Serve after Close is called.

Types

type MatchWriter

type MatchWriter func(w io.Writer, r io.Reader) bool

MatchWriter is like Matcher but can also write back to the connection during sniffing (e.g. sending HTTP/2 SETTINGS to unblock certain clients).

func HTTP2HeaderFieldSendSettings

func HTTP2HeaderFieldSendSettings(name, value string) MatchWriter

HTTP2HeaderFieldSendSettings is a MatchWriter that writes an initial SETTINGS frame (when appropriate) while sniffing, then matches based on a header field.

This is useful for clients that wait for server SETTINGS before sending request HEADERS (e.g. Java gRPC).
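
To make the "initial SETTINGS frame" concrete, here is a minimal sketch of the smallest frame a server could send: a SETTINGS frame with no payload, which is a 9-byte HTTP/2 frame header with type 0x04 on stream 0. The names emptySettingsFrame and writeEmptySettings are hypothetical and used only for illustration. The frame connmux actually writes, and the conditions under which it writes it, are not shown in this documentation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// frameTypeSettings is the HTTP/2 frame type code for SETTINGS.
const frameTypeSettings = 0x04

// emptySettingsFrame returns a SETTINGS frame that carries no parameters.
// Hypothetical helper for illustration; not part of connmux.
func emptySettingsFrame() []byte {
	return []byte{
		0x00, 0x00, 0x00, // 24-bit payload length: 0
		frameTypeSettings,      // frame type: SETTINGS
		0x00,                   // flags: none (this is not an ACK)
		0x00, 0x00, 0x00, 0x00, // reserved bit + stream identifier 0
	}
}

// writeEmptySettings sends the frame on w, which stands in for the
// io.Writer half of a MatchWriter.
func writeEmptySettings(w io.Writer) error {
	_, err := w.Write(emptySettingsFrame())
	return err
}

func main() {
	var conn bytes.Buffer // stands in for the client connection
	if err := writeEmptySettings(&conn); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Printf("% x\n", conn.Bytes())
}
```

A client that holds back its HEADERS until it has seen server SETTINGS can proceed once these bytes arrive, which is what gives the matcher something to inspect.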

type Matcher

type Matcher func(r io.Reader) bool

Matcher matches a connection by reading from r.

Notes:

  • r is a sniffing reader: reads are buffered and will be replayed to the final protocol server once the connection is dispatched.
  • Matchers passed to the same rule have OR semantics: each one is evaluated against a fresh reader that starts at the beginning of the buffered stream, and the rule matches as soon as one of them returns true.
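
The two notes above can be sketched with the standard library alone. This is an illustration of the buffering and replay idea, not the package's implementation: sniff, startsWith, and sniffString are hypothetical names, and a real multiplexer would also apply read deadlines and a size cap.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Matcher mirrors the signature documented above.
type Matcher func(r io.Reader) bool

// startsWith is a hypothetical matcher used only for this demo.
func startsWith(p string) Matcher {
	return func(r io.Reader) bool {
		b := make([]byte, len(p))
		if _, err := io.ReadFull(r, b); err != nil {
			return false
		}
		return string(b) == p
	}
}

// sniff tries each matcher against the same leading bytes of src and
// returns a reader that replays everything consumed while matching.
func sniff(src io.Reader, matchers ...Matcher) (bool, io.Reader) {
	var buf bytes.Buffer // every byte pulled from src so far
	matched := false
	for _, m := range matchers {
		// Fresh view: replay what is already buffered, then keep reading
		// from src while recording the new bytes in buf.
		snapshot := append([]byte(nil), buf.Bytes()...)
		view := io.MultiReader(bytes.NewReader(snapshot), io.TeeReader(src, &buf))
		if m(view) {
			matched = true
			break
		}
	}
	// The protocol server sees the buffered bytes first, then the rest.
	return matched, io.MultiReader(bytes.NewReader(buf.Bytes()), src)
}

// sniffString runs sniff over a string and returns the replayed stream.
func sniffString(s string, matchers ...Matcher) (bool, string) {
	ok, replay := sniff(strings.NewReader(s), matchers...)
	all, _ := io.ReadAll(replay)
	return ok, string(all)
}

func main() {
	ok, stream := sniffString("GET / HTTP/1.1\r\n", startsWith("POST"), startsWith("GET"))
	fmt.Printf("matched=%v replayed=%q\n", ok, stream)
}
```

The first matcher consumes four bytes and fails, yet the second still sees the stream from its first byte, and the reader handed on afterwards yields the complete request.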

func Any

func Any() Matcher

Any matches any connection.

func HTTP1

func HTTP1() Matcher

HTTP1 matches by parsing an HTTP/1.x request line and headers.

func HTTP1Fast

func HTTP1Fast() Matcher

HTTP1Fast matches common HTTP/1.x methods using a small prefix check. It is faster but less accurate than HTTP1().
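
The accuracy trade-off between the two HTTP/1 matchers can be illustrated with a rough sketch. Both functions below are assumptions written for this example (fastHTTP1, parsedHTTP1, matchString, and the method list are not taken from the package): one checks only for a method followed by a space, the other parses the request with net/http.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Matcher mirrors the signature documented for this package.
type Matcher func(r io.Reader) bool

// fastHTTP1 only checks that the stream starts with a known method and a
// space. The method list here is an assumption, not the package's list.
func fastHTTP1(r io.Reader) bool {
	head := make([]byte, 8) // long enough for "OPTIONS "
	n, _ := io.ReadFull(r, head)
	for _, m := range []string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"} {
		if strings.HasPrefix(string(head[:n]), m+" ") {
			return true
		}
	}
	return false
}

// parsedHTTP1 parses the request line and headers with net/http.
func parsedHTTP1(r io.Reader) bool {
	_, err := http.ReadRequest(bufio.NewReader(r))
	return err == nil
}

// matchString is a small helper that feeds a string to a matcher.
func matchString(m Matcher, s string) bool { return m(strings.NewReader(s)) }

func main() {
	valid := "GET / HTTP/1.1\r\nHost: example\r\n\r\n"
	bogus := "GET nonsense\r\n\r\n" // starts like HTTP, but has no protocol version

	fmt.Println("valid:", matchString(fastHTTP1, valid), matchString(parsedHTTP1, valid))
	fmt.Println("bogus:", matchString(fastHTTP1, bogus), matchString(parsedHTTP1, bogus))
}
```

The prefix check accepts both inputs, while the parsing matcher rejects the malformed request line at the cost of reading and parsing the headers.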

func HTTP2

func HTTP2() Matcher

HTTP2 matches an HTTP/2 connection by verifying the client preface.
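
For reference, the client connection preface is a fixed 24-byte string, so a preface check can be sketched as a plain comparison. The function names below are hypothetical and the code is a standalone illustration rather than the package's source.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Matcher mirrors the signature documented for this package.
type Matcher func(r io.Reader) bool

// clientPreface is the fixed byte sequence every HTTP/2 client sends first.
const clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

// isHTTP2Preface reads exactly len(clientPreface) bytes and compares them.
func isHTTP2Preface(r io.Reader) bool {
	b := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(r, b); err != nil {
		return false // stream ended before a full preface arrived
	}
	return string(b) == clientPreface
}

// matchString is a small helper that feeds a string to a matcher.
func matchString(m Matcher, s string) bool { return m(strings.NewReader(s)) }

func main() {
	fmt.Println(matchString(isHTTP2Preface, clientPreface+"frames follow"))
	fmt.Println(matchString(isHTTP2Preface, "GET / HTTP/1.1\r\nHost: example\r\n\r\n"))
}
```

A preface match only says the peer speaks HTTP/2 with prior knowledge; telling gRPC apart from other HTTP/2 traffic still needs a header-based matcher.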

func HTTP2HeaderField

func HTTP2HeaderField(name, value string) Matcher

HTTP2HeaderField matches an HTTP/2 connection by looking for an exact header field value (case-insensitive name match).

Note: some clients (notably Java gRPC) will not send HEADERS until they receive a SETTINGS frame from the server; for those, use HTTP2HeaderFieldSendSettings with MatchWithWriters.

func HTTP2HeaderFieldPrefix

func HTTP2HeaderFieldPrefix(name, valuePrefix string) Matcher

HTTP2HeaderFieldPrefix matches an HTTP/2 connection by looking for a header field value prefix (case-insensitive name match).

func Prefix

func Prefix(prefixes ...[]byte) Matcher

Prefix matches if the connection starts with any of the provided prefixes.

This matcher only reads as many bytes as the longest prefix.
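
A rough sketch of how such a matcher could be written, assuming it reads up to the length of the longest prefix and compares whatever arrived. prefixMatcher and matchString are hypothetical names; the package's own Prefix may differ in details such as how it handles short reads.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Matcher mirrors the signature documented for this package.
type Matcher func(r io.Reader) bool

// prefixMatcher reports a match when the stream starts with any prefix.
// It never reads more bytes than the longest prefix.
func prefixMatcher(prefixes ...[]byte) Matcher {
	longest := 0
	for _, p := range prefixes {
		if len(p) > longest {
			longest = len(p)
		}
	}
	return func(r io.Reader) bool {
		buf := make([]byte, longest)
		// A short stream is not an error here: compare what was received.
		n, _ := io.ReadFull(r, buf)
		for _, p := range prefixes {
			if bytes.HasPrefix(buf[:n], p) {
				return true
			}
		}
		return false
	}
}

// matchString is a small helper that feeds a string to a matcher.
func matchString(m Matcher, s string) bool { return m(strings.NewReader(s)) }

func main() {
	// SSH banner, or a TLS record header (handshake, major version 3).
	m := prefixMatcher([]byte("SSH-"), []byte{0x16, 0x03})

	fmt.Println(matchString(m, "SSH-2.0-OpenSSH_9.6\r\n"))
	fmt.Println(matchString(m, "\x16\x03\x01"))
	fmt.Println(matchString(m, "GET / HTTP/1.1\r\n"))
}
```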

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux multiplexes a single net.Listener into multiple protocol-specific listeners.

Example
package main

import (
	"fmt"
	"io"
	"net"
	"time"

	"github.com/pubgo/funk/v2/closer"
	"github.com/pubgo/funk/v2/connmux"
)

func main() {
	root, _ := net.Listen("tcp", "127.0.0.1:0")
	defer closer.SafeClose(root)

	m := connmux.New(root, connmux.WithReadTimeout(2*time.Second))
	httpL := m.Match(connmux.HTTP1Fast())
	_ = m.Match(connmux.Any())

	go func() { _ = m.Serve() }()
	defer closer.SafeClose(m)

	c, _ := net.Dial("tcp", root.Addr().String())
	defer closer.SafeClose(c)
	_, _ = c.Write([]byte("GET / HTTP/1.1\r\nHost: example\r\n\r\n"))

	s, _ := httpL.Accept()
	defer closer.SafeClose(s)

	b := make([]byte, 3)
	_, _ = io.ReadFull(s, b)
	fmt.Println(string(b))

}
Output:

GET

func New

func New(l net.Listener, opts ...Option) *Mux

New creates a new connection multiplexer.

func (*Mux) Close

func (m *Mux) Close() error

Close stops Serve and closes the root listener and all matched listeners.

func (*Mux) Match

func (m *Mux) Match(matchers ...Matcher) net.Listener

Match registers a rule and returns a listener that only accepts matching connections. Match order defines priority.
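
Because priority follows registration order, a catch-all rule registered first shadows every rule after it. The following sketch models this with a hypothetical rule type and dispatch function (neither is part of the package) that operate on already-buffered bytes.

```go
package main

import (
	"bytes"
	"fmt"
)

// rule is a hypothetical stand-in for a registered Match rule.
type rule struct {
	name  string
	match func(head []byte) bool
}

func matchAny(head []byte) bool { return true }
func matchGET(head []byte) bool { return bytes.HasPrefix(head, []byte("GET ")) }

var (
	// Specific rule first, catch-all last: the usual ordering.
	specificFirst = []rule{{"http", matchGET}, {"raw", matchAny}}
	// Catch-all first: the http rule can never be reached.
	catchAllFirst = []rule{{"raw", matchAny}, {"http", matchGET}}
)

// dispatch returns the name of the first rule that matches head.
func dispatch(head []byte, rules []rule) (string, bool) {
	for _, r := range rules {
		if r.match(head) {
			return r.name, true
		}
	}
	return "", false
}

func main() {
	head := []byte("GET / HTTP/1.1\r\n")

	name, _ := dispatch(head, specificFirst)
	fmt.Println("specific first:", name)

	name, _ = dispatch(head, catchAllFirst)
	fmt.Println("catch-all first:", name)
}
```

This is why the examples in this documentation register Any() last.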

func (*Mux) MatchWithWriters

func (m *Mux) MatchWithWriters(writers ...MatchWriter) net.Listener

MatchWithWriters registers a rule composed of match-writers. Match order defines priority.

func (*Mux) Serve

func (m *Mux) Serve() error

Serve starts accepting on the root listener and dispatching to matched listeners. It blocks until the root listener errors or Close is called.

type Option

type Option func(*Mux)

Option configures a Mux.

func WithConnBacklog

func WithConnBacklog(n int) Option

WithConnBacklog sets the per-matched-listener connection backlog. Defaults to 128.
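
One way to picture the backlog is a listener whose Accept drains a buffered channel of the configured size. The sketch below is an assumption about the general shape only: queueListener, offer, and fillBacklog are hypothetical, and this documentation does not say what connmux does when a queue is full (this sketch merely reports it).

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// queueListener is a hypothetical net.Listener fed through a buffered channel.
type queueListener struct {
	conns chan net.Conn
	done  chan struct{}
	once  sync.Once
}

var _ net.Listener = (*queueListener)(nil)

func newQueueListener(backlog int) *queueListener {
	return &queueListener{
		conns: make(chan net.Conn, backlog),
		done:  make(chan struct{}),
	}
}

// Accept blocks until a dispatched connection is queued or the listener closes.
func (l *queueListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.done:
		return nil, net.ErrClosed
	}
}

func (l *queueListener) Close() error   { l.once.Do(func() { close(l.done) }); return nil }
func (l *queueListener) Addr() net.Addr { return nil } // no real address in this demo

// offer queues c without blocking and reports whether there was room.
func (l *queueListener) offer(c net.Conn) bool {
	select {
	case l.conns <- c:
		return true
	default:
		return false
	}
}

// fillBacklog offers n in-memory connections and returns how many were queued.
func fillBacklog(backlog, n int) int {
	l := newQueueListener(backlog)
	defer l.Close()
	queued := 0
	for i := 0; i < n; i++ {
		server, client := net.Pipe()
		client.Close()
		if l.offer(server) {
			queued++
		} else {
			server.Close()
		}
	}
	for i := 0; i < queued; i++ { // drain and release what was queued
		if c, err := l.Accept(); err == nil {
			c.Close()
		}
	}
	return queued
}

func main() {
	fmt.Println("queued:", fillBacklog(2, 5))
}
```

With nobody calling Accept, only as many connections as the backlog allows can wait, so a slow protocol server benefits from a larger value.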

func WithErrorHandler

func WithErrorHandler(h func(error) bool) Option

WithErrorHandler sets a handler for accept/match errors. If h returns true, Serve continues; otherwise Serve returns the error.
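
A handler with the documented func(error) bool shape might tolerate unmatched connections and stop on anything else. In this sketch errNotMatched stands in for connmux.ErrNotMatched so that the code runs without the package, errListenerBroken is an invented example error, and whether Serve passes ErrNotMatched to the handler is an assumption based on the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotMatched stands in for connmux.ErrNotMatched in this standalone sketch.
var errNotMatched = errors.New("connmux: connection not matched")

// errListenerBroken is a made-up fatal error used only for the demo.
var errListenerBroken = errors.New("accept: listener broken")

// keepServing has the shape expected by WithErrorHandler: returning true
// asks Serve to continue, returning false makes Serve return the error.
func keepServing(err error) bool {
	// errors.Is also recognizes the sentinel when it has been wrapped.
	return errors.Is(err, errNotMatched)
}

func main() {
	wrapped := fmt.Errorf("conn from 10.0.0.7: %w", errNotMatched)

	fmt.Println(keepServing(wrapped))           // true: skip this connection
	fmt.Println(keepServing(errListenerBroken)) // false: stop serving
}
```

With the real package, the equivalent function would be passed as connmux.WithErrorHandler(keepServing) and would compare against connmux.ErrNotMatched.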

func WithMaxSniffBytes

func WithMaxSniffBytes(n int) Option

WithMaxSniffBytes caps how many bytes can be buffered while matching. Defaults to 1 MiB.
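
The effect of a sniff cap can be sketched as a reader that refuses to hand out more than a fixed number of bytes. cappedReader, errSniffLimit, and canSniff are hypothetical names; the error connmux returns at the limit, and whether it rejects the connection at that point, are not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// errSniffLimit is a made-up error for this sketch.
var errSniffLimit = errors.New("sniff limit exceeded")

// cappedReader lets a matcher read at most `remaining` bytes from src.
type cappedReader struct {
	src       io.Reader
	remaining int
}

func (c *cappedReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if c.remaining <= 0 {
		return 0, errSniffLimit // the matcher asked for more than allowed
	}
	if len(p) > c.remaining {
		p = p[:c.remaining] // never read past the cap
	}
	n, err := c.src.Read(p)
	c.remaining -= n
	return n, err
}

// canSniff reports whether a matcher needing `want` bytes succeeds under `max`.
func canSniff(input string, want, max int) bool {
	r := &cappedReader{src: strings.NewReader(input), remaining: max}
	_, err := io.ReadFull(r, make([]byte, want))
	return err == nil
}

func main() {
	fmt.Println(canSniff("hello world", 5, 8))  // fits within the cap
	fmt.Println(canSniff("hello world", 10, 8)) // needs more than the cap
}
```

Since every sniffed byte must be kept for replay, the cap bounds the memory one undecided connection can hold.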

func WithReadTimeout

func WithReadTimeout(d time.Duration) Option

WithReadTimeout sets a per-read deadline while sniffing. A zero duration disables deadlines (default).
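
A per-read deadline can be sketched as a net.Conn wrapper that resets the read deadline before every Read. deadlineConn and silentPeerTimesOut are hypothetical names for this illustration; the demo uses net.Pipe so that it runs without a network.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// deadlineConn applies a fresh read deadline before each Read.
// A zero timeout leaves the connection without deadlines.
type deadlineConn struct {
	net.Conn
	timeout time.Duration
}

func (c *deadlineConn) Read(p []byte) (int, error) {
	if c.timeout > 0 {
		if err := c.Conn.SetReadDeadline(time.Now().Add(c.timeout)); err != nil {
			return 0, err
		}
	}
	return c.Conn.Read(p)
}

// silentPeerTimesOut reports whether reading from a peer that never writes
// fails with a deadline error after roughly ms milliseconds.
func silentPeerTimesOut(ms int) bool {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close() // the client stays open but sends nothing

	c := &deadlineConn{Conn: server, timeout: time.Duration(ms) * time.Millisecond}
	_, err := c.Read(make([]byte, 1))
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("timed out:", silentPeerTimesOut(50))
}
```

Note that a per-read deadline bounds each Read, not the whole sniffing phase: a client that trickles one byte per interval is still limited only by the sniff byte cap.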
