Documentation
¶
Overview ¶
Package accept contains the TCP accept layer for the registry server: connection handling, TLS configuration, rate limiting, log sampling, and panic recovery. It is intentionally free of domain logic — all message dispatch is delegated to the Dispatcher interface provided at construction.
Index ¶
- func GenerateSelfSignedCert() (tls.Certificate, error)
- func RecoveredPanicCount() uint64
- func SanitizeListenAddr(remoteAddr, clientAddr string) string
- type Acceptor
- func (a *Acceptor) Accept(done <-chan struct{}) error
- func (a *Acceptor) ConnCount() int64
- func (a *Acceptor) Listen(addr string) error
- func (a *Acceptor) Listener() net.Listener
- func (a *Acceptor) LogSamplerCleanup()
- func (a *Acceptor) RateLimitWhitelistSize() int
- func (a *Acceptor) RateLimiterCleanup()
- func (a *Acceptor) SetMaxConnections(max int64)
- func (a *Acceptor) SetRateLimitWhitelist(entries []WhitelistEntry) error
- func (a *Acceptor) SetTLS(certFile, keyFile string) error
- func (a *Acceptor) ShouldLog(key string) (bool, int64)
- func (a *Acceptor) TLSConfig() *tls.Config
- type Dispatcher
- type RateLimiter
- func (rl *RateLimiter) Allow(ip string) bool
- func (rl *RateLimiter) BucketCount() int
- func (rl *RateLimiter) Cleanup()
- func (rl *RateLimiter) HasBucket(ip string) bool
- func (rl *RateLimiter) SetClock(fn func() time.Time)
- func (rl *RateLimiter) SetWhitelist(entries []WhitelistEntry) error
- func (rl *RateLimiter) WhitelistSize() int
- type WhitelistEntry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateSelfSignedCert ¶
func GenerateSelfSignedCert() (tls.Certificate, error)
GenerateSelfSignedCert creates an in-memory self-signed TLS certificate.
func RecoveredPanicCount ¶
func RecoveredPanicCount() uint64
RecoveredPanicCount returns the total number of panics swallowed since start.
func SanitizeListenAddr ¶
SanitizeListenAddr uses the TCP source IP from remoteAddr but accepts the port from the client-provided address. Prevents clients from registering arbitrary IPs while allowing them to specify their actual listening port.
Types ¶
type Acceptor ¶
type Acceptor struct {
// contains filtered or unexported fields
}
Acceptor manages the TCP accept loop with TLS, rate limiting, log sampling, and per-connection panic recovery. All domain logic is delegated to the Dispatcher supplied at construction.
func NewAcceptor ¶
func NewAcceptor(maxConns int64, d Dispatcher) *Acceptor
NewAcceptor creates an Acceptor with the given connection limit and dispatcher. A rate limiter (100 req/s per IP, 50 000-bucket cap) and log sampler (every 1000th suppressed log) are created automatically.
func (*Acceptor) Accept ¶
Accept runs the accept loop. For each accepted connection it checks the connection limit and spawns handleConn in a goroutine. The loop returns nil when done is closed.
The caller is responsible for closing the listener (e.g. via Close()) when done is closed, which unblocks the Accept() call on the listener.
func (*Acceptor) LogSamplerCleanup ¶
func (a *Acceptor) LogSamplerCleanup()
LogSamplerCleanup resets the log sampler counters. Call from a maintenance loop.
func (*Acceptor) RateLimitWhitelistSize ¶
RateLimitWhitelistSize returns the number of installed whitelist rules.
func (*Acceptor) RateLimiterCleanup ¶
func (a *Acceptor) RateLimiterCleanup()
RateLimiterCleanup evicts idle per-IP buckets. Call from a maintenance loop.
Without periodic invocation the only eviction path is the in-line one in Allow() — and it runs only when an unknown IP arrives AND the bucket map is already at maxBuckets. Once the cap is reached, legitimate IPs arriving as the map fills get rejected until an active bucket either stops sending (so its lastFill goes stale) and a new IP collides with the full map at the right moment to trigger eviction. Wiring this into the same 10-second reapLoop that handles stale nodes/beacons keeps the bucket count proportional to recently-active IPs rather than to peak-historic-active IPs.
func (*Acceptor) SetMaxConnections ¶
SetMaxConnections overrides the active connection limit (for testing).
func (*Acceptor) SetRateLimitWhitelist ¶
func (a *Acceptor) SetRateLimitWhitelist(entries []WhitelistEntry) error
SetRateLimitWhitelist installs an elevated-rate whitelist on the Acceptor's rate limiter. See RateLimiter.SetWhitelist for semantics. Operator-facing entry point — fail-closed on parse errors.
func (*Acceptor) SetTLS ¶
SetTLS configures TLS. If certFile is empty a self-signed certificate is generated automatically.
func (*Acceptor) ShouldLog ¶
ShouldLog delegates to the internal log sampler. Returns true if this occurrence of key should be logged, plus the suppressed count since the last logged occurrence. Used by sibling files (e.g. wal_replay.go) that need access to log sampling without importing the sampler directly.
type Dispatcher ¶
type Dispatcher interface {
// HandleMessage dispatches a decoded JSON message and returns the response map.
HandleMessage(msg map[string]interface{}, remoteAddr string) (map[string]interface{}, error)
// HandleSubscribeReplication takes over the conn for replication streaming.
HandleSubscribeReplication(conn net.Conn)
// HandleBinaryHeartbeat processes a native binary heartbeat frame.
HandleBinaryHeartbeat(conn net.Conn, payload []byte)
// HandleBinaryLookup processes a native binary lookup frame.
HandleBinaryLookup(conn net.Conn, payload []byte, host string)
// HandleBinaryResolve processes a native binary resolve frame.
HandleBinaryResolve(conn net.Conn, payload []byte, host string)
// ReplicationToken returns the current replication auth token.
// An empty string means replication is disabled.
ReplicationToken() string
// AddRequest increments the server-level request counter.
AddRequest()
}
Dispatcher is the callback interface that the Acceptor uses to hand off decoded messages and native binary frames to the domain layer (Server). Implementing this interface on *Server keeps the accept layer free of any import cycle with the server package.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter tracks per-IP registration attempts using a token bucket.
Whitelist (optional): a list of CIDR ranges, each paired with an elevated per-window rate. When an IP falls into a whitelist CIDR, its bucket is created (or refilled toward) the elevated rate instead of the default `rate`. Whitelisted IPs also bypass the maxBuckets cap — since IP whitelisting is an operator-set trust signal and TCP source addresses cannot be spoofed, it is safe to always grant them a bucket slot. The whitelist is consulted only on bucket creation; existing buckets retain whatever per-bucket rate they were created with until they go idle and Cleanup() evicts them, after which the next request will re-evaluate the whitelist.
func NewRateLimiter ¶
func NewRateLimiter(rate int, window time.Duration, maxBuckets int) *RateLimiter
NewRateLimiter creates a rate limiter allowing rate requests per window per IP. maxBuckets caps the number of tracked IPs to prevent memory exhaustion. Pass 0 for unlimited (not recommended in production).
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow(ip string) bool
Allow checks if a request from the given IP is allowed.
func (*RateLimiter) BucketCount ¶
func (rl *RateLimiter) BucketCount() int
BucketCount returns the number of tracked IPs (for testing).
func (*RateLimiter) Cleanup ¶
func (rl *RateLimiter) Cleanup()
Cleanup removes stale buckets. Call periodically.
func (*RateLimiter) HasBucket ¶
func (rl *RateLimiter) HasBucket(ip string) bool
HasBucket returns whether a given IP has an active bucket (for testing).
func (*RateLimiter) SetClock ¶
func (rl *RateLimiter) SetClock(fn func() time.Time)
SetClock overrides the time source (for testing).
func (*RateLimiter) SetWhitelist ¶
func (rl *RateLimiter) SetWhitelist(entries []WhitelistEntry) error
SetWhitelist installs (or replaces) the elevated-rate whitelist. Each entry's CIDR is parsed; on any parse error or non-positive rate the call returns the error and leaves the existing whitelist unchanged (fail-closed so a typo in operator config can't silently drop the elevations operators relied on).
Bare IP literals (e.g. "1.2.3.4" with no slash) are auto-promoted to /32 (IPv4) or /128 (IPv6) for convenience. CIDR matching applies to both families.
Entries take effect on next bucket creation. Existing buckets keep the per-bucket rate they were created with; the periodic Cleanup reaper evicts idle buckets, after which the next request from that IP picks up the current whitelist.
func (*RateLimiter) WhitelistSize ¶
func (rl *RateLimiter) WhitelistSize() int
WhitelistSize returns the number of installed whitelist rules (for testing and operator-facing introspection).
type WhitelistEntry ¶
WhitelistEntry pairs a CIDR with its elevated per-window rate. Operator- supplied via RateLimiter.SetWhitelist / Acceptor.SetRateLimitWhitelist. Empty CIDR or non-positive Rate is rejected by SetWhitelist.