accept

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2026 License: AGPL-3.0 Imports: 23 Imported by: 0

Documentation

Overview

Package accept contains the TCP accept layer for the registry server: connection handling, TLS configuration, rate limiting, log sampling, and panic recovery. It is intentionally free of domain logic — all message dispatch is delegated to the Dispatcher interface provided at construction.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateSelfSignedCert

func GenerateSelfSignedCert() (tls.Certificate, error)

GenerateSelfSignedCert creates an in-memory self-signed TLS certificate.

func RecoveredPanicCount

func RecoveredPanicCount() uint64

RecoveredPanicCount returns the total number of panics swallowed since start.

func SanitizeListenAddr

func SanitizeListenAddr(remoteAddr, clientAddr string) string

SanitizeListenAddr uses the TCP source IP from remoteAddr but accepts the port from the client-provided address. Prevents clients from registering arbitrary IPs while allowing them to specify their actual listening port.

Types

type Acceptor

type Acceptor struct {
	// contains filtered or unexported fields
}

Acceptor manages the TCP accept loop with TLS, rate limiting, log sampling, and per-connection panic recovery. All domain logic is delegated to the Dispatcher supplied at construction.

func NewAcceptor

func NewAcceptor(maxConns int64, d Dispatcher) *Acceptor

NewAcceptor creates an Acceptor with the given connection limit and dispatcher. A rate limiter (100 req/s per IP, 50 000-bucket cap) and log sampler (every 1000th suppressed log) are created automatically.

func (*Acceptor) Accept

func (a *Acceptor) Accept(done <-chan struct{}) error

Accept runs the accept loop. For each accepted connection it checks the connection limit and spawns handleConn in a goroutine. The loop returns nil when done is closed.

The caller is responsible for closing the listener (e.g. via Close()) when done is closed, which unblocks the Accept() call on the listener.

func (*Acceptor) ConnCount

func (a *Acceptor) ConnCount() int64

ConnCount returns the number of active connections.

func (*Acceptor) Listen

func (a *Acceptor) Listen(addr string) error

Listen binds the listener on addr (with TLS if configured).

func (*Acceptor) Listener

func (a *Acceptor) Listener() net.Listener

Listener returns the underlying net.Listener (nil before Listen).

func (*Acceptor) LogSamplerCleanup

func (a *Acceptor) LogSamplerCleanup()

LogSamplerCleanup resets the log sampler counters. Call from a maintenance loop.

func (*Acceptor) RateLimitWhitelistSize

func (a *Acceptor) RateLimitWhitelistSize() int

RateLimitWhitelistSize returns the number of installed whitelist rules.

func (*Acceptor) RateLimiterCleanup

func (a *Acceptor) RateLimiterCleanup()

RateLimiterCleanup evicts idle per-IP buckets. Call from a maintenance loop.

Without periodic invocation the only eviction path is the in-line one in Allow() — and it runs only when an unknown IP arrives AND the bucket map is already at maxBuckets. Once the cap is reached, legitimate IPs arriving as the map fills get rejected until an active bucket either stops sending (so its lastFill goes stale) and a new IP collides with the full map at the right moment to trigger eviction. Wiring this into the same 10-second reapLoop that handles stale nodes/beacons keeps the bucket count proportional to recently-active IPs rather than to peak-historic-active IPs.

func (*Acceptor) SetMaxConnections

func (a *Acceptor) SetMaxConnections(max int64)

SetMaxConnections overrides the active connection limit (for testing).

func (*Acceptor) SetRateLimitWhitelist

func (a *Acceptor) SetRateLimitWhitelist(entries []WhitelistEntry) error

SetRateLimitWhitelist installs an elevated-rate whitelist on the Acceptor's rate limiter. See RateLimiter.SetWhitelist for semantics. Operator-facing entry point — fail-closed on parse errors.

func (*Acceptor) SetTLS

func (a *Acceptor) SetTLS(certFile, keyFile string) error

SetTLS configures TLS. If certFile is empty a self-signed certificate is generated automatically.

func (*Acceptor) ShouldLog

func (a *Acceptor) ShouldLog(key string) (bool, int64)

ShouldLog delegates to the internal log sampler. Returns true if this occurrence of key should be logged, plus the suppressed count since the last logged occurrence. Used by sibling files (e.g. wal_replay.go) that need access to log sampling without importing the sampler directly.

func (*Acceptor) TLSConfig

func (a *Acceptor) TLSConfig() *tls.Config

TLSConfig returns the active TLS config (nil = plaintext).

type Dispatcher

type Dispatcher interface {
	// HandleMessage dispatches a decoded JSON message and returns the response map.
	HandleMessage(msg map[string]interface{}, remoteAddr string) (map[string]interface{}, error)

	// HandleSubscribeReplication takes over the conn for replication streaming.
	HandleSubscribeReplication(conn net.Conn)

	// HandleBinaryHeartbeat processes a native binary heartbeat frame.
	HandleBinaryHeartbeat(conn net.Conn, payload []byte)

	// HandleBinaryLookup processes a native binary lookup frame.
	HandleBinaryLookup(conn net.Conn, payload []byte, host string)

	// HandleBinaryResolve processes a native binary resolve frame.
	HandleBinaryResolve(conn net.Conn, payload []byte, host string)

	// ReplicationToken returns the current replication auth token.
	// An empty string means replication is disabled.
	ReplicationToken() string

	// AddRequest increments the server-level request counter.
	AddRequest()
}

Dispatcher is the callback interface that the Acceptor uses to hand off decoded messages and native binary frames to the domain layer (Server). Implementing this interface on *Server keeps the accept layer free of any import cycle with the server package.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter tracks per-IP registration attempts using a token bucket.

Whitelist (optional): a list of CIDR ranges, each paired with an elevated per-window rate. When an IP falls into a whitelist CIDR, its bucket is created (or refilled toward) the elevated rate instead of the default `rate`. Whitelisted IPs also bypass the maxBuckets cap — since IP whitelisting is an operator-set trust signal and TCP source addresses cannot be spoofed, it is safe to always grant them a bucket slot. The whitelist is consulted only on bucket creation; existing buckets retain whatever per-bucket rate they were created with until they go idle and Cleanup() evicts them, after which the next request will re-evaluate the whitelist.

func NewRateLimiter

func NewRateLimiter(rate int, window time.Duration, maxBuckets int) *RateLimiter

NewRateLimiter creates a rate limiter allowing rate requests per window per IP. maxBuckets caps the number of tracked IPs to prevent memory exhaustion. Pass 0 for unlimited (not recommended in production).

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow(ip string) bool

Allow checks if a request from the given IP is allowed.

func (*RateLimiter) BucketCount

func (rl *RateLimiter) BucketCount() int

BucketCount returns the number of tracked IPs (for testing).

func (*RateLimiter) Cleanup

func (rl *RateLimiter) Cleanup()

Cleanup removes stale buckets. Call periodically.

func (*RateLimiter) HasBucket

func (rl *RateLimiter) HasBucket(ip string) bool

HasBucket returns whether a given IP has an active bucket (for testing).

func (*RateLimiter) SetClock

func (rl *RateLimiter) SetClock(fn func() time.Time)

SetClock overrides the time source (for testing).

func (*RateLimiter) SetWhitelist

func (rl *RateLimiter) SetWhitelist(entries []WhitelistEntry) error

SetWhitelist installs (or replaces) the elevated-rate whitelist. Each entry's CIDR is parsed; on any parse error or non-positive rate the call returns the error and leaves the existing whitelist unchanged (fail-closed so a typo in operator config can't silently drop the elevations operators relied on).

Bare IP literals (e.g. "1.2.3.4" with no slash) are auto-promoted to /32 (IPv4) or /128 (IPv6) for convenience. CIDR matching applies to both families.

Entries take effect on next bucket creation. Existing buckets keep the per-bucket rate they were created with; the periodic Cleanup reaper evicts idle buckets, after which the next request from that IP picks up the current whitelist.

func (*RateLimiter) WhitelistSize

func (rl *RateLimiter) WhitelistSize() int

WhitelistSize returns the number of installed whitelist rules (for testing and operator-facing introspection).

type WhitelistEntry

type WhitelistEntry struct {
	CIDR string
	Rate int
}

WhitelistEntry pairs a CIDR with its elevated per-window rate. Operator- supplied via RateLimiter.SetWhitelist / Acceptor.SetRateLimitWhitelist. Empty CIDR or non-positive Rate is rejected by SetWhitelist.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL