upstream

package
v0.18.4
Published: Jun 15, 2026 | License: MIT | Imports: 22 | Imported by: 6

Documentation

Overview

Package upstream is parapet's reverse-proxy and load-balancing layer: it turns a pool of backend targets into an http.RoundTripper that the proxy (upstream.New) drives, and layers a stack of reliability primitives on top of plain distribution.

The reliability stack

Every balancer below is a drop-in http.RoundTripper for upstream.New, reads its configuration once before the first request, and tracks state with per-target atomics so the hot path stays lock-free. Two of them (ActiveHealthCheck and HedgingLoadBalancer) WRAP another balancer rather than owning targets directly, so the pieces compose (see "Composition" below).

Distribution (no health logic)
  RoundRobinLoadBalancer          even spread, every target equal
  WeightedRoundRobinLoadBalancer  bias request COUNT by Target.Weight (SWRR)
  LeastConnLoadBalancer           bias by live in-flight CONNECTIONS; honours
                                  Target.MaxConcurrent (the bulkhead cap)

Error-based reliability
  EjectingLoadBalancer            passive outlier ejection on consecutive
                                  failures, backed-off cooldown
  CircuitBreakingLoadBalancer     fail-FAST: reject an open target with no
                                  round-trip; Closed/Open/HalfOpen

Latency-based reliability
  LatencyEjectingLoadBalancer     eject a "gray failure" (200s but slow) on a
                                  decayed-mean TTFB vs the pool median
  HedgingLoadBalancer             speculative retry after HedgeDelay to cut
                                  tail latency (wraps any balancer)

Active probing (wraps any balancer)
  ActiveHealthCheck               out-of-band probes; gates the wrapped
                                  balancer's pick, only ever REMOVES candidates
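
The "SWRR" label on WeightedRoundRobinLoadBalancer presumably stands for smooth weighted round-robin. As a rough illustration of that general technique (not the package's implementation), the sketch below uses a hypothetical target type with a running score; every name in it is an assumption made for the example.

```go
package main

import "fmt"

// target is a hypothetical stand-in for a weighted backend; it is NOT the
// package's Target type.
type target struct {
	name    string
	weight  int
	current int // running score, adjusted on every pick
}

// pickSWRR performs one step of smooth weighted round-robin: every target's
// score grows by its weight, the highest score wins (first one on a tie),
// and the winner is charged the total weight of the pool.
func pickSWRR(ts []*target) *target {
	total := 0
	var best *target
	for _, t := range ts {
		t.current += t.weight
		total += t.weight
		if best == nil || t.current > best.current {
			best = t
		}
	}
	if best != nil {
		best.current -= total
	}
	return best
}

func main() {
	ts := []*target{
		{name: "a", weight: 5},
		{name: "b", weight: 1},
		{name: "c", weight: 1},
	}
	for i := 0; i < 7; i++ {
		fmt.Print(pickSWRR(ts).name, " ")
	}
	fmt.Println() // prints: a a b a c a a
}
```

Over one full cycle of 7 picks the heavy target is chosen 5 times, but its picks are interleaved with the light ones rather than sent as a burst, which is the point of the "smooth" variant.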

Choosing a primitive by failure mode

Pick by the failure you are defending against. An Upstream uses ONE balancer, so for the owning balancer choose the dominant failure mode (then compose the wrappers — active health and hedging — on top):

Failure mode                     Reach for
------------------------------   --------------------------------------------
flaky backend, hard 5xx/errors   EjectingLoadBalancer (keeps routing during a
                                 total outage) — or CircuitBreakingLoadBalancer
                                 if you would rather shed than hammer
dead / brownout origin, want     CircuitBreakingLoadBalancer (fail fast; an
to fail fast and shed            open target costs no connect+timeout)
tail latency (p99) on an         HedgingLoadBalancer (race a duplicate after
otherwise healthy pool           HedgeDelay, take the first answer)
gray failure: 200s but one       LatencyEjectingLoadBalancer (relative-to-pool
host is far slower than peers    median; error ejection / breakers miss it)
overload: a slow backend         Target.MaxConcurrent on LeastConnLoadBalancer
draining the pool                (hard per-target bulkhead cap) + a total-
                                 request-deadline middleware (see "Overload")
cold deploy / readiness /        ActiveHealthCheck (probe out-of-band; route
black-holing a fresh pod         only to answering targets) — wraps any balancer
uneven backend capacity          WeightedRoundRobinLoadBalancer (by request
                                 count) or LeastConnLoadBalancer (by concurrency)

Error ejection (EjectingLoadBalancer / CircuitBreakingLoadBalancer) is driven by the IsFailure hook, which by default counts only transport errors other than a client cancel; set it to also treat 5xx as failures. LatencyEjectingLoadBalancer is latency-ONLY — a transport error is not timed and does not eject — so if hard errors are your dominant mode, use an error balancer instead of (or, via an ActiveHealthCheck gate, alongside) it.

All-down semantics — the load-bearing distinction

When EVERY target is unavailable the primitives DIVERGE, and which one you ran decides whether a correlated outage degrades or hard-fails. This is the single most important property to know before an incident:

Primitive                       When all targets are out
-----------------------------   --------------------------------------------
RoundRobinLoadBalancer          FAIL OPEN — routes best-effort to the next
WeightedRoundRobin (SWRR)       slot (a broken signal must not black-hole a
LeastConnLoadBalancer (health)  healthy pool). Empty pool -> ErrUnavailable.
EjectingLoadBalancer            FAIL OPEN — all ejected -> route anyway, so a
LatencyEjectingLoadBalancer     transient outage / systemic slowdown can't
                                black-hole all traffic (slow-but-up beats 503).

LeastConnLoadBalancer           SHEDS 503 — when every target is at its
  (capacity, MaxConcurrent)     MaxConcurrent cap (the bulkhead contract;
                                independent of health — a probe-dark pool still
                                routes best-effort, a saturated one still sheds).
CircuitBreakingLoadBalancer     SHEDS 503 — every target open or probe-
                                saturated returns ErrUnavailable, deliberately
                                shedding load instead of hammering a dead origin.

So the ejecting balancers and plain distribution route BEST-EFFORT under a total outage (preferring a degraded answer to none); the circuit breaker and the least-conn capacity cap SHED a 503. That is intentional: a latency outlier or a flaky host is still serving (route to it), but a hammered dead origin or a saturated pool is better protected by shedding. ActiveHealthCheck never adds an all-down OVERRIDE — when its gate marks every target down each balancer falls back to its OWN policy above, so a broken probe path cannot turn a fail-open pool into a black hole.

A 503 surfaced this way is upstream.ErrUnavailable, which Upstream's error handler maps to HTTP 503 (any other transport error maps to 502), and which prom.Upstream() counts in upstream_fast_rejects_total before any round-trip.

Overload and the MaxConcurrent latch

Target.MaxConcurrent (LeastConnLoadBalancer only) is a HARD per-target cap on in-flight requests — the bulkhead pattern. It bounds blast radius: a slow backend holds at most this many requests and cannot drain the pool; surplus routes to an under-cap target, and only when EVERY target is full does the balancer shed 503.

A slot is held until the response BODY is closed, not at the response headers, so the cap bounds true end-to-end concurrency including slow body streams. Nothing else reclaims a slot: a backend that sends headers then stalls mid-body keeps its slot until the request context is cancelled. No http.Transport timeout covers that stall (ResponseHeaderTimeout bounds only time-to-headers; IdleConnTimeout reaps only idle pooled conns), and the existing write-header timeout in pkg/timeout disarms once upstream headers are written — so it does NOT cover a mid-body stall either. Without a bound on TOTAL request time, after MaxConcurrent stalled requests the cap becomes a LATCH, not a limiter, and the target sheds all traffic permanently. To defend against that, pair the cap with a total-request-deadline middleware in pkg/timeout (a request-scoped context deadline the transport honors, covering the whole request including the body) so a stalled request is cancelled and its slot reclaimed.

Composition

The primitives COMPOSE; reach for more than one when you face more than one failure mode:

  • ActiveHealthCheck wraps ANY balancer. It owns one probe goroutine per target and publishes a per-target up/down gate that the wrapped balancer consults inside its OWN pick. Active health only ever REMOVES candidates — it never overrides the balancer's own (passive) verdict — so a target must satisfy BOTH the active gate AND the balancer's strategy to be chosen: the two compose by AND. The wrapped balancer keeps its exact strategy over the survivors (a weighted balancer keeps its ratio, the breaker still trips, least-conn still balances). Pass the SAME []*Target to both the balancer and the wrapper so the gate's indices line up.

  • HedgingLoadBalancer wraps ANY balancer. After HedgeDelay it duplicates the in-flight (idempotent, body-less) request via a second call to the wrapped balancer, which self-selects a different target, and returns the first winner. Hedging and the proxy's retry loop layer rather than multiply. If the wrapped balancer has a custom IsFailure, it MUST exclude context.Canceled; otherwise each cancelled losing leg counts against the healthy backend it raced and slowly ejects or trips it.

  • Passive + active gate together by AND: e.g. EjectingLoadBalancer's pick takes a target only when it is both not-ejected (passive) AND gate-up (active).

A sensible production stack therefore reads outside-in as: ActiveHealthCheck -> (Hedging ->) a CircuitBreaking or Ejecting balancer over the target pool. See the runnable examples (ExampleNewActiveHealthCheck and the composition example) for how the pieces wire together.

Observability

Two hooks make the stack observable; both are nil-by-default (zero hot-path cost) and the callee owns its own concurrency. Wire them to pkg/prom and leave pkg/upstream free of any Prometheus dependency:

  • Upstream.OnRoundTrip (a RoundTripFunc) fires once per attempt (each retry included) with the resolved host, status, time-to-headers, and error. Assign prom.Upstream() to it for upstream_requests{host,status}, upstream_request_duration_seconds{host}, and upstream_fast_rejects_total{host} (the all-down 503s shed before any round-trip).

  • A balancer's OnStateChange (a StateChangeFunc) fires once per per-target transition (concurrent threshold crossers collapse to one), after the new state is published. Assign prom.UpstreamState() to it for upstream_breaker_state{host} (gauge: 0 closed / 1 open / 2 half_open), upstream_state_transitions_total{host,from,to,reason} (the authoritative signal — alert on it), and upstream_probe_down_total{host,cause}. The cause label is ActiveHealthCheck's classified probe-down reason — one of timeout, refused, reset, dns, tls, status, error (a bounded closed set) — for telling a bad probe path from a dead backend from a too-tight Timeout mid-incident.

The ejecting balancers report ReasonEject / ReasonRecover; the circuit breaker reports the full Closed/Open/HalfOpen edge set (ReasonTrip, ReasonReopen, ReasonHeal, ReasonProbe, ReasonExpire); ActiveHealthCheck reports ReasonProbeDown (carrying the cause) and ReasonProbeRecover.

Constants

This section is empty.

Variables

var (
	ErrUnavailable = errors.New("upstream: unavailable")
)

Errors

Functions

This section is empty.

Types

type ActiveHealthCheck added in v0.18.0

type ActiveHealthCheck struct {

	// Targets is the set of upstreams to probe; MUST be the same slice the wrapped
	// Balancer was built over (see NewActiveHealthCheck).
	Targets []*Target

	// Balancer is the wrapped strategy. It receives the health gate if it implements
	// the internal gate interface (all package balancers do).
	Balancer http.RoundTripper

	// Path and Method are the probe request's URL path and HTTP method. Default "/"
	// and GET.
	Path   string
	Method string

	// Scheme is the probe URL scheme. It matters ONLY for targets backed by the
	// dynamic multi-scheme Transport, which dispatches on it (use "h2c" or "unix" to
	// match an h2c/unix data path); the dedicated transports (HTTPTransport,
	// HTTPSTransport, H2CTransport, UnixTransport) force their own scheme and ignore
	// it. Default "http". A wrong scheme here probes the wrong protocol and can drive
	// a healthy backend down, so set it (or ProbeTransport) for non-http data paths.
	Scheme string

	// Interval is the time between probes for a target. Default 10s.
	Interval time.Duration

	// Timeout bounds a single probe (a per-probe context deadline). Default 5s.
	Timeout time.Duration

	// HealthyThld is the number of consecutive successful probes that mark a down
	// target up again. Default 1 (recover fast).
	HealthyThld int

	// UnhealthyThld is the number of consecutive failed probes that mark an up target
	// down. Default 3.
	UnhealthyThld int

	// ProbeTransport overrides the transport used for probes; nil reuses each
	// Target.Transport (so the probe exercises the real connection pool/TLS). Set it
	// to isolate probe traffic from the data pool, or for exotic schemes. Note a probe
	// shares the data transport's per-host connection budget: with a very small
	// MaxConn (e.g. 1) an in-flight probe can hold the only connection for up to
	// Timeout, briefly blocking data requests to that origin — set ProbeTransport to
	// avoid the contention.
	ProbeTransport http.RoundTripper

	// StartUnhealthy makes targets begin DOWN (fail-closed cold start): only the first
	// successful probe admits them. Default false — targets begin up, so a
	// misconfigured probe path cannot black-hole a fresh deploy.
	StartUnhealthy bool

	// IsHealthy decides whether a probe result is healthy; nil treats a non-error
	// response with status < 400 as healthy.
	IsHealthy func(resp *http.Response, err error) bool

	// OnStateChange observes this target's active-health gate flipping: ReasonProbeDown
	// (UnhealthyThld consecutive failing probes took an up target down, From StateClosed
	// To StateOpen, carrying a classified ProbeCause) and ReasonProbeRecover (HealthyThld
	// consecutive successes readmitted a down one, From StateOpen To StateClosed,
	// CauseNone). nil disables it at zero cost. It fires synchronously on the target's
	// sole prober goroutine, exactly once per crossing, AFTER the gate bit is published;
	// the callee owns its own concurrency across targets (see prom.UpstreamState). The
	// initial gate (StartUnhealthy or not) is NOT a transition and fires nothing — with
	// StartUnhealthy the first admitting probe is a genuine ReasonProbeRecover. A
	// graceful shutdown / Close never emits a spurious ReasonProbeDown. The ProbeCause
	// label is a bounded closed set.
	OnStateChange StateChangeFunc
	// contains filtered or unexported fields
}

ActiveHealthCheck adds out-of-band health probing to a wrapped balancer. It is a drop-in http.RoundTripper for upstream.New: it owns one probe goroutine per target, and publishes each target's verdict into a shared []atomic.Bool "gate" that the wrapped balancer consults in its existing pick. Active health only ever REMOVES candidates; it never overrides the balancer's own (passive) verdict, so a target must satisfy BOTH the active gate AND the balancer's strategy to be chosen: the two compose by AND. When the gate marks a target down, the balancer routes around it per its own policy; when EVERY target is gated down, the balancer falls back to its own all-down semantics (RoundRobin/Ejecting/LatencyEjecting/LeastConn route best-effort; CircuitBreaking still sheds 503). Active HC never adds an all-down override, so a broken probe path cannot black-hole a whole pool.

Lifecycle: probing auto-starts on the first RoundTrip and (when served by a parapet.Server, via ServerContextKey) stops on graceful shutdown, like pkg/healthz. For a bare RoundTripper, or to fully close the shutdown-race window, call Start(ctx) before serving and Close() after. Close() drains every probe goroutine, so none outlives it.

Configuration fields are read once, before the first probe; set them before serving.
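
The HealthyThld / UnhealthyThld pair describes a small hysteresis state machine. The following sketch models one target's gate with a hypothetical gate type (the real gate is a shared []atomic.Bool driven by a probe goroutine); it only illustrates how consecutive-result thresholds turn noisy probes into a stable up/down verdict.

```go
package main

import "fmt"

// gate is a hypothetical single-target health gate fed by probe results.
type gate struct {
	up            bool
	healthyThld   int // consecutive successes needed to come back up
	unhealthyThld int // consecutive failures needed to go down
	streak        int // consecutive results that contradict the current state
}

// observe records one probe result and reports whether the gate flipped.
func (g *gate) observe(healthy bool) bool {
	if healthy == g.up {
		g.streak = 0 // a confirming result resets the opposing streak
		return false
	}
	g.streak++
	need := g.unhealthyThld
	if healthy {
		need = g.healthyThld
	}
	if g.streak < need {
		return false
	}
	g.up = healthy
	g.streak = 0
	return true
}

func main() {
	// Documented defaults: start up, down after 3 failures, up after 1 success.
	g := &gate{up: true, healthyThld: 1, unhealthyThld: 3}
	for _, ok := range []bool{false, false, true, false, false, false, true} {
		flipped := g.observe(ok)
		fmt.Printf("probe ok=%-5v up=%-5v flipped=%v\n", ok, g.up, flipped)
	}
}
```

In the run above the two early failures do not take the target down, because the success in between resets the streak; only the later run of three does, and a single success then readmits it.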

func NewActiveHealthCheck added in v0.18.0

func NewActiveHealthCheck(targets []*Target, inner http.RoundTripper) *ActiveHealthCheck

NewActiveHealthCheck wraps a balancer with background health probing. inner MUST be a balancer built over the SAME []*Target (same pointers, same order) as targets, so the health gate's indices line up — pass the identical slice to both, e.g. NewActiveHealthCheck(targets, NewEjectingLoadBalancer(targets)). Every balancer in this package implements the gate, so inner skips probe-down targets inside its OWN pick (preserving its strategy); a custom RoundTripper that does not implement it still gets probed, but probing cannot influence its routing.

Example

Add ACTIVE health checking: probe each target out-of-band and route only to those answering, on top of any balancer's own (passive) strategy. Pass the SAME []*Target to both the balancer and the wrapper so the health gate's indices line up. Active HC only removes candidates; the balancer keeps its strategy over the survivors and composes with passive ejection. When served by a parapet.Server it stops on graceful shutdown automatically; call Start(ctx)/Close() for explicit control.

package main

import (
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	targets := []*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
	}
	ahc := upstream.NewActiveHealthCheck(targets, upstream.NewRoundRobinLoadBalancer(targets))
	ahc.Path = "/healthz"
	ahc.Interval = 5 * time.Second
	ahc.UnhealthyThld = 3 // down after 3 consecutive failed probes
	// Observe gate flips (probe-down with a classified cause / probe-recover); wire to
	// prom.UpstreamState in production for upstream_probe_down_total{host,cause}.
	ahc.OnStateChange = func(c upstream.StateChange) {
		_ = c.Reason // ReasonProbeDown or ReasonProbeRecover
		_ = c.Cause  // classified failure cause on a down event, e.g. "timeout"
	}

	s := parapet.New()
	s.Use(upstream.New(ahc)) // ahc is the proxy's transport, like any balancer
}
Example (Compose)

Compose a production reliability stack: ACTIVE health probing wrapping a CIRCUIT BREAKER over the pool. The breaker fails fast (an open target costs no round-trip) and sheds 503 when every target is open; active health probes out-of-band and only REMOVES probe-down targets from the breaker's pick, so the two gate together by AND. Pass the SAME []*Target to both so their indices line up, and wire each layer's OnStateChange to prom.UpstreamState() so trips, recoveries, and probe-downs are observable. See pkg/upstream's package doc for the failure-mode / all-down-semantics guide this example illustrates.

package main

import (
	"net/http"
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	targets := []*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
		{Host: "10.0.0.3:8080", Transport: tr},
	}

	// Inner: a circuit breaker that fails fast and sheds 503 on a correlated outage.
	cb := upstream.NewCircuitBreakingLoadBalancer(targets)
	cb.FailureThreshold = 5
	cb.OpenTimeout = 5 * time.Second
	cb.IsFailure = func(resp *http.Response, err error) bool {
		return err != nil || (resp != nil && resp.StatusCode >= 500)
	}
	cb.OnStateChange = func(c upstream.StateChange) {
		_ = c.From // wire to prom.UpstreamState() in production
		_ = c.To
		_ = c.Reason // ReasonTrip / ReasonHeal / ReasonProbe / ...
	}

	// Outer: active probing that gates the breaker's pick (only removes candidates).
	ahc := upstream.NewActiveHealthCheck(targets, cb)
	ahc.Path = "/healthz"
	ahc.Interval = 5 * time.Second
	ahc.UnhealthyThld = 3
	ahc.OnStateChange = func(c upstream.StateChange) {
		_ = c.Reason // ReasonProbeDown / ReasonProbeRecover
		_ = c.Cause  // classified failure cause on a probe-down (e.g. "timeout")
	}

	u := upstream.New(ahc) // ahc is the proxy's transport, like any balancer
	u.OnRoundTrip = func(r *http.Request, info upstream.RoundTripInfo) {
		_ = info.Host   // resolved target; "" when shed before any pick
		_ = info.Status // wire to prom.Upstream() in production
	}

	s := parapet.New()
	s.Use(u)
}

func (*ActiveHealthCheck) Close added in v0.18.0

func (a *ActiveHealthCheck) Close() error

Close stops probing and drains every probe goroutine; idempotent. After Close, start is a no-op, so a late first-RoundTrip never resurrects the prober.

func (*ActiveHealthCheck) RoundTrip added in v0.18.0

func (a *ActiveHealthCheck) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip starts probing (once), wires graceful shutdown on the lazy path, then defers to the wrapped balancer — the gate already filters its pick, so this never reroutes.

func (*ActiveHealthCheck) Start added in v0.18.0

func (a *ActiveHealthCheck) Start(ctx context.Context)

Start begins probing under ctx (cancel ctx, or call Close, to stop). Use it for a bare RoundTripper or to bound the prober's lifetime explicitly; otherwise the first RoundTrip auto-starts probing. Calling it after Close, or after a lazy start, is a no-op.

type CircuitBreakingLoadBalancer added in v0.18.0

type CircuitBreakingLoadBalancer struct {

	// Targets is the set of upstreams to balance across.
	Targets []*Target

	// FailureThreshold is the number of consecutive failures that trips a CLOSED
	// target to OPEN. Defaults to 5.
	FailureThreshold int

	// SuccessThreshold is the number of consecutive HALF-OPEN probe successes that
	// closes a target. Defaults to 2.
	SuccessThreshold int

	// OpenTimeout is the base cooldown a target stays OPEN before a probe is allowed.
	// It doubles on each repeat trip, capped at MaxOpenTimeout. Defaults to 5s.
	OpenTimeout time.Duration

	// MaxOpenTimeout caps the backed-off OPEN cooldown. Defaults to 1m.
	MaxOpenTimeout time.Duration

	// ProbeTimeout bounds how long a HALF-OPEN probe may hold its slot. If a probe
	// does not complete within it (e.g. a transport with no response timeout hangs),
	// its slot is reclaimed and the target re-opens, so one stuck probe cannot wedge
	// the target half-open forever. Set it above the upstream transport's response
	// timeout so it only fires for genuinely hung probes. Defaults to 2m (above the
	// HTTP transports' 1m default response-header timeout).
	ProbeTimeout time.Duration

	// HalfOpenMaxProbes is the maximum number of concurrent HALF-OPEN probes (a
	// connect+headers admission cap, not an in-flight-body cap). Defaults to 1.
	HalfOpenMaxProbes int32

	// IsFailure decides whether a round-trip result counts as a failure. When nil,
	// any transport error other than a client-canceled request counts. Set it to
	// also treat responses such as 5xx as failures. In HALF-OPEN a non-failure
	// counts as a probe success, so by default a client-canceled probe nudges the
	// breaker toward closing rather than being treated as neutral (a wrongly closed
	// but still-broken target simply re-trips on the next real failure).
	IsFailure func(resp *http.Response, err error) bool

	// OnStateChange observes per-target circuit state transitions (nil disables);
	// see prom.UpstreamState. It is fired synchronously from the goroutine that
	// commits the transition, exactly once per transition, after the new state is
	// published. The callee owns its own concurrency.
	OnStateChange StateChangeFunc
	// contains filtered or unexported fields
}

CircuitBreakingLoadBalancer is a round-robin load balancer that wraps each target in a circuit breaker. Unlike EjectingLoadBalancer (which keeps routing — paying the full connect+timeout — to a degraded target and fails open when all targets are out), this REJECTS a request to an open target without a round-trip: pick skips it and selects a healthy peer in the same call, for every method.

Each target moves through the canonical three states:

  • CLOSED: routes normally; FailureThreshold consecutive failures trip it to OPEN, and a single success clears the failure count.
  • OPEN: skipped during selection (fail fast, no round-trip) for OpenTimeout, which backs off (doubling, capped at MaxOpenTimeout) on each repeat trip.
  • HALF-OPEN: admits up to HalfOpenMaxProbes trial requests; SuccessThreshold consecutive successes close it, one failure re-opens it with a longer backoff.

When EVERY target is open (or probe-saturated) it returns ErrUnavailable (a 503) rather than failing open — deliberately shedding load instead of hammering a dead origin during a correlated outage. Use EjectingLoadBalancer if you want fail-open.

It is a drop-in http.RoundTripper for upstream.New, ignores Target.Weight (it is a plain round-robin balancer), and tracks state with per-target atomics so the hot path is lock-free. A failure is observed at the response headers (a transport error, or via IsFailure), not at body close — so a backend that returns 200 headers then fails mid-body is not seen as a failure (set IsFailure to catch 5xx at the status line). Configuration fields are read once; set them before serving.

func NewCircuitBreakingLoadBalancer added in v0.18.0

func NewCircuitBreakingLoadBalancer(targets []*Target) *CircuitBreakingLoadBalancer

NewCircuitBreakingLoadBalancer creates a round-robin load balancer with a per-target circuit breaker.

Example

Add circuit breaking: a target that fails repeatedly is opened and REJECTED without a round-trip (fail fast), then probed for recovery via a half-open trickle. Unlike ejection, when every target is open it returns 503 rather than hammering a dead origin. Here IsFailure also counts 5xx as failures.

package main

import (
	"net/http"
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewCircuitBreakingLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
	})
	lb.FailureThreshold = 5
	lb.OpenTimeout = 10 * time.Second
	lb.IsFailure = func(resp *http.Response, err error) bool {
		return err != nil || (resp != nil && resp.StatusCode >= 500)
	}

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*CircuitBreakingLoadBalancer) RoundTrip added in v0.18.0

RoundTrip routes the request to a healthy target, skipping open ones, and records the outcome against the chosen target's breaker.

type EjectingLoadBalancer added in v0.18.0

type EjectingLoadBalancer struct {

	// Targets is the set of upstreams to balance across.
	Targets []*Target

	// MaxFails is the number of consecutive failures that ejects a target.
	// Defaults to 3.
	MaxFails int

	// EjectTimeout is the base cooldown a target stays ejected. It doubles on
	// each consecutive ejection, capped at MaxEjectTimeout. Defaults to 30s.
	EjectTimeout time.Duration

	// MaxEjectTimeout caps the ejection cooldown. Defaults to 5m.
	MaxEjectTimeout time.Duration

	// IsFailure decides whether a round-trip result counts as a failure. When
	// nil, any transport error other than a client-canceled request counts.
	// Set it to also treat responses such as 5xx as failures.
	IsFailure func(resp *http.Response, err error) bool

	// OnStateChange observes a target being ejected (ReasonEject) or returned to
	// rotation on a confirmed success (ReasonRecover); nil disables it. It reflects
	// committed eject/recover events, NOT cooldown-expiry rotation membership: a
	// target whose cooldown has expired but has not yet served a successful request
	// still reads StateOpen until that success. Alert on the prom.UpstreamState
	// transitions counter, which is exact. The callee owns its own concurrency.
	OnStateChange StateChangeFunc
	// contains filtered or unexported fields
}

EjectingLoadBalancer is a round-robin load balancer that passively tracks per-target failures and temporarily ejects targets that fail repeatedly.

A target is ejected once it returns MaxFails consecutive failures, and stays ejected for EjectTimeout (doubling on each repeat ejection up to MaxEjectTimeout). Ejected targets are skipped during selection; when their cooldown expires they become selectable again with no background probing. A single successful response clears a target's failure count and backoff.

If every target is ejected the balancer fails open and routes anyway, so a transient outage cannot black-hole all traffic.

Configuration fields are read once, before the first request; set them before serving.

func NewEjectingLoadBalancer added in v0.18.0

func NewEjectingLoadBalancer(targets []*Target) *EjectingLoadBalancer

NewEjectingLoadBalancer creates a round-robin load balancer with passive health checking (outlier ejection).

Example

Add passive health checking: a target that returns repeated failures is ejected from rotation for a backed-off cooldown. Here IsFailure also counts 5xx responses, not just transport errors.

package main

import (
	"net/http"
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewEjectingLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
	})
	lb.MaxFails = 5
	lb.EjectTimeout = 10 * time.Second
	lb.IsFailure = func(resp *http.Response, err error) bool {
		return err != nil || (resp != nil && resp.StatusCode >= 500)
	}

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*EjectingLoadBalancer) RoundTrip added in v0.18.0

func (l *EjectingLoadBalancer) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip sends a request to a healthy upstream server.

type H2CTransport added in v0.9.0

type H2CTransport struct {
	HTTP2Transport *http2.Transport
	HTTPTransport  *http.Transport

	// DialContext, if non-nil, replaces the default net.Dialer used to open
	// TCP connections to the upstream. nil uses an internal net.Dialer with
	// the default dial timeout and keep-alive (today's behavior, unchanged).
	// Use it to observe dial errors, re-resolve endpoints, or wrap the
	// connection. A custom dialer is responsible for honoring its own
	// timeouts; the default dial timeout is ignored when DialContext is set.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

H2CTransport is an http.RoundTripper for h2c (HTTP/2 over cleartext) upstreams.

func (*H2CTransport) RoundTrip added in v0.9.0

func (t *H2CTransport) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type HTTPSTransport added in v0.9.0

type HTTPSTransport struct {
	DialTimeout           time.Duration
	TCPKeepAlive          time.Duration
	DisableKeepAlives     bool
	MaxConn               int
	MaxIdleConns          int
	IdleConnTimeout       time.Duration
	ExpectContinueTimeout time.Duration
	ResponseHeaderTimeout time.Duration
	TLSClientConfig       *tls.Config

	// DialContext, if non-nil, replaces the default net.Dialer used to open
	// TCP connections to the upstream (the TLS handshake is then performed on
	// top of that connection). nil uses an internal net.Dialer built from
	// DialTimeout/TCPKeepAlive (today's behavior, unchanged). Use it to
	// observe dial errors, re-resolve endpoints, or wrap the connection. A
	// custom dialer is responsible for honoring its own timeouts; DialTimeout
	// is ignored when DialContext is set.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

HTTPSTransport is an http.RoundTripper for upstreams reached over TLS (see TLSClientConfig).

func (*HTTPSTransport) RoundTrip added in v0.9.0

func (t *HTTPSTransport) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type HTTPTransport added in v0.9.0

type HTTPTransport struct {
	DialTimeout           time.Duration
	TCPKeepAlive          time.Duration
	DisableKeepAlives     bool
	MaxConn               int
	MaxIdleConns          int
	IdleConnTimeout       time.Duration
	ExpectContinueTimeout time.Duration
	ResponseHeaderTimeout time.Duration

	// DialContext, if non-nil, replaces the default net.Dialer used to open
	// TCP connections to the upstream. nil uses an internal net.Dialer built
	// from DialTimeout/TCPKeepAlive (today's behavior, unchanged). Use it to
	// observe dial errors, re-resolve endpoints, or wrap the connection. A
	// custom dialer is responsible for honoring its own timeouts; DialTimeout
	// is ignored when DialContext is set.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

HTTPTransport is an http.RoundTripper for upstreams reached over plain HTTP.

Example (DialContext)

Wrap the transport's dialer to observe dial failures (e.g. to log them or emit a metric) without changing how the connection is made. DialContext is an optional seam: leaving it nil keeps the default net.Dialer behavior. When it is set, the custom dialer owns its own timeouts — DialTimeout is ignored.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	base := &net.Dialer{Timeout: 2 * time.Second}

	tr := &upstream.HTTPTransport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			conn, err := base.DialContext(ctx, network, addr)
			if err != nil {
				log.Printf("upstream dial failed: addr=%s err=%v", addr, err)
				return nil, err
			}
			return conn, nil
		},
	}

	s := parapet.New()
	s.Use(upstream.SingleHost("10.0.0.1:8080", tr))
}

func (*HTTPTransport) RoundTrip added in v0.9.0

func (t *HTTPTransport) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type HedgingLoadBalancer added in v0.18.0

type HedgingLoadBalancer struct {

	// Next is the wrapped balancer. Each attempt calls Next.RoundTrip, which picks
	// and advances past a target, so a hedge lands on a different one.
	Next http.RoundTripper

	// HedgeDelay is how long to wait for the in-flight request before launching a
	// hedge. <= 0 disables hedging (pure pass-through to Next.RoundTrip).
	HedgeDelay time.Duration

	// MaxHedge is the number of extra speculative attempts beyond the original (each
	// HedgeDelay-spaced). Defaults to 1 (at most one hedge -> 2x fan-out).
	MaxHedge int

	// HedgeOnError launches the next hedge immediately on a losing transport error
	// rather than waiting out HedgeDelay. NewHedgingLoadBalancer enables it; a bare
	// HedgingLoadBalancer{} literal leaves it false (a bool can't be defaulted in init).
	HedgeOnError bool

	// IsHedgeable decides whether a request may be hedged; nil uses the idempotent
	// body-less rule (and never hedges a request already being retried).
	IsHedgeable func(r *http.Request) bool

	// IsWinner decides whether a leg's result wins the race; nil means "a response
	// with no transport error". Set it to, say, only accept non-5xx.
	IsWinner func(resp *http.Response, err error) bool
	// contains filtered or unexported fields
}

HedgingLoadBalancer races an idempotent, body-less request against up to MaxHedge hedges, returns the first response to win, and cancels the losing legs. After HedgeDelay the in-flight request is duplicated via a second call to the wrapped balancer, which self-selects (and advances past) a target, so the hedge naturally lands on a different host. The race happens entirely inside RoundTrip, so the proxy only ever sees the single winner — there is no concurrent write to the client.

It is a drop-in http.RoundTripper for upstream.New and composes with every balancer. Only idempotent body-LESS requests are hedged (GET/HEAD/OPTIONS/TRACE with no body); this is deliberately stricter than the Upstream retry path, which also retries rewindable (GetBody) body-bearing requests. A hedge launches a second concurrent leg, and r.Clone only shallow-copies Body, so two legs would share one reader — hedging a body-bearing request without per-leg GetBody rewind would let a leg send a consumed/empty body. A request already inside the proxy's retry loop is not additionally hedged — retries and hedges layer, never multiply. HedgeDelay <= 0 disables hedging entirely (no timer, no clone, no goroutine).

The wrapped balancer's Next.RoundTrip MUST honor request-context cancellation (every transport in this package does): a hedge cancels the losing legs, so a transport that ignores cancellation would leak a draining goroutine per hedge. For the same reason, if you give the wrapped balancer a custom IsFailure, it MUST exclude context.Canceled (the default does) — otherwise every cancelled losing leg is counted as a failure and slowly ejects/trips the healthy backend it raced.

Configuration fields are read once; set them before serving.

func NewHedgingLoadBalancer added in v0.18.0

func NewHedgingLoadBalancer(next http.RoundTripper) *HedgingLoadBalancer

NewHedgingLoadBalancer wraps a load balancer (or any http.RoundTripper that picks a target per call) with speculative-retry hedging to cut tail latency. HedgeDelay is left 0, so the returned wrapper is a zero-cost pass-through until you set it.

Example

Cut tail latency by hedging: wrap any balancer so a slow idempotent request is duplicated to another target after HedgeDelay, taking the first response. It composes with every balancer (here round-robin). If the wrapped balancer uses a custom IsFailure, exclude context.Canceled so cancelled losing legs don't eject the healthy backend they raced.

package main

import (
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewRoundRobinLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
	})
	h := upstream.NewHedgingLoadBalancer(lb)
	h.HedgeDelay = 30 * time.Millisecond // ~p95; <= 0 disables

	s := parapet.New()
	s.Use(upstream.New(h)) // h is the proxy's transport, like any balancer
}

func (*HedgingLoadBalancer) RoundTrip added in v0.18.0

func (l *HedgingLoadBalancer) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip races the request against hedges and returns the winner.

type LatencyEjectingLoadBalancer added in v0.18.0

type LatencyEjectingLoadBalancer struct {

	// Targets is the set of upstreams to balance across.
	Targets []*Target

	// EjectionFactor ejects a target whose decayed mean TTFB is >= the pool median
	// times this. Must be > 1 (a factor <= 1 would eject the median itself).
	// Defaults to 3.0; below ~2.0 is aggressive (ejects on normal tail jitter).
	EjectionFactor float64

	// MinSamples is the number of measured round-trips a target must accumulate
	// before it is eligible to be ejected or to count toward the pool median. It
	// gates cold-start noise (a first request's TLS handshake / cold pool) and, since
	// a re-probed target's sample count is reset on ejection, also gates re-probes.
	// Defaults to 100.
	MinSamples uint64

	// MinHosts is the minimum number of eligible targets for a valid pool median; a
	// pool with fewer never latency-ejects. Defaults to 3 (a median needs >= ~3 to be
	// robust to the outlier it is meant to expose).
	MinHosts int

	// HalfLife is the wall-clock half-life of the time-decayed latency EWMA, so the
	// effective window is independent of a target's request rate. Defaults to 10s.
	HalfLife time.Duration

	// MinEjectDelta is an additive floor: a target must also exceed the median by at
	// least this to be ejected, which kills the ratio instability of a fast pool (3ms
	// is 3x a 1ms median but operationally irrelevant). Defaults to 50ms.
	MinEjectDelta time.Duration

	// MinEjectLatency is an absolute floor: a target below it is never ejected,
	// however slow it is relative to peers. Defaults to 0 (off).
	MinEjectLatency time.Duration

	// MaxEjectionPercent caps the fraction of the pool that may be latency-ejected at
	// once, so the pool cannot progressively drain. Defaults to 30. It is a near-hard
	// cap: a concurrent burst of decisions can transiently overshoot it by the
	// in-flight concurrency (the counts are lock-free scans), self-correcting as
	// cooldowns expire — the exact anti-oscillation guarantee comes from the median
	// being robust, not from this count.
	MaxEjectionPercent int

	// PanicThreshold: if more than this percent of the pool is ejected, the slowdown
	// is treated as systemic — no further ejections happen and pick routes to all
	// targets (including slow ones). Defaults to 50; init raises it above
	// MaxEjectionPercent so it is never dead code.
	PanicThreshold int

	// EjectTimeout is the base cooldown a target stays ejected, doubling on each
	// repeat ejection up to MaxEjectTimeout. Defaults to 30s.
	EjectTimeout time.Duration

	// MaxEjectTimeout caps the ejection cooldown. Defaults to 5m.
	MaxEjectTimeout time.Duration

	// OnStateChange observes a target being ejected (ReasonEject) or healed back into
	// rotation (ReasonRecover); nil disables it. Like EjectingLoadBalancer, it
	// reflects committed eject/recover events, not cooldown-expiry rotation
	// membership. See prom.UpstreamState. The callee owns its own concurrency.
	OnStateChange StateChangeFunc
	// contains filtered or unexported fields
}

LatencyEjectingLoadBalancer is a round-robin load balancer that ejects a "gray failure" — a target that keeps returning 200s but whose mean time-to-first-byte has crept far above its peers, which error-based ejection and the circuit breaker both miss. A target whose decayed mean TTFB exceeds EjectionFactor x the pool median (and a small absolute slack) is taken out of rotation for a backed-off cooldown and passively re-probed, reusing EjectingLoadBalancer's eject mechanics.

It is LATENCY-ONLY: a transport error is not timed (a ~0ns connection-refused would falsely lower the mean and make a dead host look fast) and does not eject a target here. An Upstream uses one balancer, so if hard errors are your dominant failure mode use EjectingLoadBalancer or the circuit breaker INSTEAD (they do not catch gray failures) — choose by failure mode. When all targets are ejected, or when more than PanicThreshold% are (a systemic slowdown), it FAILS OPEN and routes to all, including the slow ones: a slow-but-up host beats a 503. This is deliberately the opposite of CircuitBreakingLoadBalancer, which sheds load — appropriate because a latency outlier is still serving.

Detection is relative-to-pool, so it self-tunes: a uniform slowdown raises every target and the median together, so no one is an outlier. Two structural limits follow from that: a pool of fewer than MinHosts never latency-ejects (you cannot name an outlier without a baseline), and a slowdown affecting a MAJORITY of the pool is not detected (the slow hosts become the median) — that is a systemic event, handled by failing open, not a per-host outlier. It also tracks the MEAN, so a tail-only regression (healthy median, fat p99) is not caught, and it assumes targets see statistically similar traffic — a target fronting heavier routes can read as a false outlier. Weight is ignored. State is per-target atomics; the hot path is lock-free. Configuration fields are read once; set them before serving.

func NewLatencyEjectingLoadBalancer added in v0.18.0

func NewLatencyEjectingLoadBalancer(targets []*Target) *LatencyEjectingLoadBalancer

NewLatencyEjectingLoadBalancer creates a round-robin load balancer with passive latency-based outlier ejection.

Example

Catch a "gray failure" — a backend still returning 200s but far slower than its peers — that error-based ejection misses. A target whose mean latency exceeds EjectionFactor x the pool median is ejected and re-probed. A uniform slowdown ejects no one (it self-tunes against the pool median).

package main

import (
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewLatencyEjectingLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
		{Host: "10.0.0.3:8080", Transport: tr},
	})
	lb.EjectionFactor = 3 // eject a target 3x slower than the pool median
	lb.HalfLife = 10 * time.Second

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*LatencyEjectingLoadBalancer) RoundTrip added in v0.18.0

func (l *LatencyEjectingLoadBalancer) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip picks a target, times the round-trip, and records the latency.

type LeastConnLoadBalancer added in v0.18.0

type LeastConnLoadBalancer struct {

	// Targets is the set of upstreams to balance across.
	Targets []*Target

	// OnShed observes a shed (this balancer returned ErrUnavailable before any
	// round-trip): ShedEmpty for no targets, ShedSaturated when every gate-up target
	// is at its MaxConcurrent cap, ShedAllDark when the active-HC gate marked the
	// whole pool down. Nil disables it at zero hot-path cost; see prom.UpstreamShed.
	// It fires synchronously on the request goroutine, before ErrUnavailable returns.
	OnShed ShedFunc
	// contains filtered or unexported fields
}

LeastConnLoadBalancer routes each request to the target holding the fewest in-flight requests, weighted by Weight: it minimizes active/Weight, so a target with twice the weight is kept at roughly twice the concurrency. Targets with equal load are served round-robin. It balances by concurrent CONNECTIONS (use WeightedRoundRobinLoadBalancer to balance by request count instead), which adapts to slow backends and long-lived requests that a round-robin count misses.
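The "minimize active/Weight" rule can be sketched without floating point by cross-multiplying. The peer type and pick function below are hypothetical; the sketch breaks ties by slice order rather than round-robin, assumes weights are already normalized to >= 1, and claims a slot with a compare-and-swap so that a per-target cap is never exceeded.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type peer struct {
	host   string
	weight int64 // assumed >= 1
	max    int64 // 0 means unbounded
	active atomic.Int64
}

// pick claims a slot on the under-cap peer with the lowest active/weight
// ratio and returns it, or nil when every peer is at its cap.
func pick(peers []*peer) *peer {
	for {
		var best *peer
		var bestActive int64
		for _, p := range peers {
			a := p.active.Load()
			if p.max > 0 && a >= p.max {
				continue // at its cap: not a candidate
			}
			// a/p.weight < bestActive/best.weight, compared without division.
			if best == nil || a*best.weight < bestActive*p.weight {
				best, bestActive = p, a
			}
		}
		if best == nil {
			return nil // saturated: the caller sheds
		}
		if best.active.CompareAndSwap(bestActive, bestActive+1) {
			return best
		}
		// Another goroutine changed the count first: rescan.
	}
}

func main() {
	a := &peer{host: "10.0.0.1:8080", weight: 1}
	b := &peer{host: "10.0.0.2:8080", weight: 2}
	for i := 0; i < 6; i++ {
		pick([]*peer{a, b}) // slots are held, as if no response had finished
	}
	fmt.Println(a.active.Load(), b.active.Load()) // 2 4
}
```

With six requests held open, the weight-2 peer ends up with twice the concurrency of the weight-1 peer, which is the steady state the paragraph above describes.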

A request stays counted as in-flight until its response body is closed, so it must be driven by something that closes the body — parapet's reverse proxy does. Used as a bare http.RoundTripper, the caller must close every response Body (the standard RoundTripper contract) or the target's active count leaks. With a Target.MaxConcurrent cap set, a leaked body is worse than skewed routing: it permanently burns a hard slot, so after MaxConcurrent leaks the target sheds all traffic (see the timeout warning on Target.MaxConcurrent).

func NewLeastConnLoadBalancer added in v0.18.0

func NewLeastConnLoadBalancer(targets []*Target) *LeastConnLoadBalancer

NewLeastConnLoadBalancer creates a least-connection load balancer. Configuration fields are read once, before the first request; set them before serving.

Example

Route by live concurrency: each request goes to the target holding the fewest in-flight requests (scaled by Weight), which adapts to slow backends better than counting requests.

package main

import (
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	// MaxConcurrent caps each target's in-flight requests (the bulkhead pattern):
	// the cap is hard, surplus routes to an under-cap target, and once every target
	// is full the balancer sheds with 503. A held slot is freed only when the body
	// is closed, so bound TOTAL request time (a request-context deadline the
	// transport honors) to keep a stalled backend from latching the cap — a
	// response-header timeout alone does not (see the Target.MaxConcurrent docs).
	tr := &upstream.HTTPTransport{ResponseHeaderTimeout: 5 * time.Second}
	lb := upstream.NewLeastConnLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr, MaxConcurrent: 100},
		{Host: "10.0.0.2:8080", Transport: tr, MaxConcurrent: 100},
	})

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*LeastConnLoadBalancer) Inflight added in v0.18.0

func (l *LeastConnLoadBalancer) Inflight() []TargetLoad

Inflight returns a live snapshot of every target's current in-flight count and bulkhead cap, for scrape-time metrics (see prom.UpstreamInflight) or tests. It is safe to call concurrently with serving: each Active is a lone atomic load that adds no contention to the claim/dec hot path, and Cap is immutable after init. It is also safe before the first request — it forces init (l.once), so the configured targets are always present (Active 0 until traffic arrives), and a scrape that races the first RoundTrip never sees a nil peers slice. The returned slice is freshly allocated and owned by the caller; call it at scrape cadence, never on the request path.

func (*LeastConnLoadBalancer) RoundTrip added in v0.18.0

func (l *LeastConnLoadBalancer) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip sends a request to the least-loaded target and keeps it counted as in-flight until the response body is closed.

type ProbeCause added in v0.18.0

type ProbeCause uint8

ProbeCause classifies WHY an ActiveHealthCheck probe failed. It is carried on a ReasonProbeDown StateChange (StateChange.Cause) so an operator can tell a bad probe path from a dead backend from a too-tight Timeout mid-incident. It is a CLOSED set — a bounded Prometheus label, never unbounded. The zero value, CauseNone, means "not a probe-down event": every non-ActiveHealthCheck emitter (circuit breaker, ejecting, latency-ejecting) leaves it zero, and a probe RECOVER carries CauseNone too. A transport error matching none of the specific cases collapses into CauseError, so the label set can never grow.

const (
	CauseNone    ProbeCause = iota // not a probe-down event (the zero value)
	CauseTimeout                   // per-probe Timeout deadline fired (context.DeadlineExceeded / net.Error.Timeout)
	CauseRefused                   // connection refused (syscall.ECONNREFUSED): nothing listening
	CauseReset                     // connection reset / closed mid-probe (syscall.ECONNRESET, io.EOF, io.ErrUnexpectedEOF)
	CauseDNS                       // name resolution failed (*net.DNSError)
	CauseTLS                       // TLS handshake / certificate failure (tls.RecordHeaderError, *tls.CertificateVerificationError)
	CauseStatus                    // a response arrived but healthy() rejected it (e.g. status >= 400)
	CauseError                     // any other transport error (catch-all, keeps the set closed)
)

func (ProbeCause) String added in v0.18.0

func (c ProbeCause) String() string

type Reason added in v0.18.0

type Reason uint8

Reason names what drove a transition, for metric labelling.

const (
	ReasonTrip         Reason = iota // closed -> open: failure threshold crossed (circuit breaker)
	ReasonReopen                     // half_open -> open: a probe failed (circuit breaker)
	ReasonHeal                       // half_open -> closed: success threshold met (circuit breaker)
	ReasonProbe                      // open -> half_open: cooldown expired, probing (circuit breaker)
	ReasonExpire                     // half_open -> open: a probe overstayed ProbeTimeout (circuit breaker)
	ReasonEject                      // -> open: an ejecting balancer took a target out of rotation
	ReasonRecover                    // open -> closed: an ejecting balancer returned a target to rotation
	ReasonProbeDown                  // closed -> open: an active health probe failed UnhealthyThld times in a row
	ReasonProbeRecover               // open -> closed: an active health probe succeeded HealthyThld times in a row
)

func (Reason) String added in v0.18.0

func (r Reason) String() string

type RoundRobinLoadBalancer added in v0.9.0

type RoundRobinLoadBalancer struct {
	Targets []*Target
	// contains filtered or unexported fields
}

RoundRobinLoadBalancer spreads requests evenly across Targets in round-robin order, treating every target as equal.

func NewRoundRobinLoadBalancer added in v0.9.0

func NewRoundRobinLoadBalancer(targets []*Target) *RoundRobinLoadBalancer

NewRoundRobinLoadBalancer creates a new round-robin load balancer.

Example

Spread requests across a pool with plain round-robin. Each Target pairs a host:port with the transport used to reach it; upstream.New wraps the balancer (itself an http.RoundTripper) as the proxy's transport.

package main

import (
	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewRoundRobinLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr},
		{Host: "10.0.0.2:8080", Transport: tr},
		{Host: "10.0.0.3:8080", Transport: tr},
	})

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*RoundRobinLoadBalancer) RoundTrip added in v0.9.0

func (l *RoundRobinLoadBalancer) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip sends a request to the next upstream server in round-robin order, skipping any the active-HC gate marks down; if every target is down it falls open to the next slot so traffic is never fully black-holed.
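The skip-then-fail-open behavior can be sketched with an atomic cursor and a health predicate. The rr type and its pick method are hypothetical, and the up callback stands in for the active health-check gate; this illustrates the described behavior, not the package's code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type rr struct {
	hosts []string
	next  atomic.Uint64
}

// pick returns the next host in round-robin order, skipping hosts for which
// up reports false. A nil up means every host is up. If every host is down
// it fails open and returns the slot the cursor landed on.
func (b *rr) pick(up func(i int) bool) (string, bool) {
	n := len(b.hosts)
	if n == 0 {
		return "", false
	}
	start := int((b.next.Add(1) - 1) % uint64(n))
	for k := 0; k < n; k++ {
		i := (start + k) % n
		if up == nil || up(i) {
			return b.hosts[i], true
		}
	}
	return b.hosts[start], true // all down: fail open rather than black-hole
}

func main() {
	b := &rr{hosts: []string{"a", "b", "c"}}
	bDown := func(i int) bool { return i != 1 }
	for i := 0; i < 4; i++ {
		h, _ := b.pick(bDown)
		fmt.Print(h, " ")
	}
	fmt.Println() // a c c a

	allDown := func(int) bool { return false }
	h, ok := b.pick(allDown)
	fmt.Println(h, ok) // b true
}
```

Note that a request whose turn falls on a down host is served by that host's successor, so the successor temporarily carries extra load.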

type RoundTripFunc added in v0.18.0

type RoundTripFunc func(r *http.Request, info RoundTripInfo)

RoundTripFunc observes an upstream round-trip. Assign one to Upstream.OnRoundTrip to make the origin observable — see prom.Upstream for Prometheus metrics. It is invoked once per attempt (including each retry), synchronously on the request goroutine, right after the transport returns and before the response unwinds back through the proxy. The callee owns its own concurrency.

type RoundTripInfo added in v0.18.0

type RoundTripInfo struct {
	// Host is the resolved upstream target the request was sent to (r.URL.Host after
	// load balancing). Unlike the client Host header, it is operator-configured and
	// therefore bounded, so it is safe as a metric label. It may be empty when the
	// load balancer rejected the request before choosing a target (ErrUnavailable).
	Host string

	// Duration is the time to the response headers: the transport round-trip
	// (connect + send request + time-to-first-byte), measured before the body is
	// streamed on to the client. It is set on both success and error.
	Duration time.Duration

	// Status is the upstream response's status code, or 0 when the round-trip failed
	// before any response (Err is then non-nil).
	Status int

	// Err is the transport error — connection refused, timeout, ErrUnavailable when
	// the load balancer had no target, etc. — or nil once a response was received,
	// regardless of that response's status code.
	Err error

	// Attempt is the zero-based retry index: 0 on the first try, 1 on the first
	// retry, and so on (read from the proxy's retry context). Existing observers
	// simply see a new zero-valued field.
	Attempt int
}

RoundTripInfo reports one upstream round-trip — a single attempt to a backend, so a retried request produces one per attempt — to a RoundTripFunc.

type ShedFunc added in v0.18.0

type ShedFunc func(ShedReason)

ShedFunc observes a LeastConnLoadBalancer shed: the balancer returned ErrUnavailable for the given reason before any round-trip. Assign one to LeastConnLoadBalancer.OnShed to make bulkhead saturation observable — see prom.UpstreamShed. It is invoked synchronously on the request goroutine at the shed site, exactly once per shed, before ErrUnavailable is returned, and never on a successful pick or on pick's internal CAS re-scan. Nil disables it at zero hot-path cost. Sheds can be high-frequency under sustained overload, so the callee must be cheap and allocation-free (the prom impl is a single counter Inc). The callee owns its own concurrency.
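A callee that satisfies the "cheap and allocation-free" requirement can be as small as one atomic counter per reason. The sketch below redeclares ShedReason locally as a stand-in so that it compiles on its own; in real use the package's upstream.ShedReason would take its place. The shedCounter type is hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ShedReason is a local stand-in mirroring the documented closed set.
type ShedReason uint8

const (
	ShedEmpty ShedReason = iota
	ShedSaturated
	ShedAllDark
	numShedReasons // sentinel: number of reasons, not a reason itself
)

// shedCounter keeps one counter per reason in a fixed array, so observing a
// shed is a single atomic add with no allocation and no lock.
type shedCounter struct {
	counts [numShedReasons]atomic.Uint64
}

// observe has the shape of a ShedFunc.
func (c *shedCounter) observe(r ShedReason) {
	if int(r) < len(c.counts) { // ignore values outside the closed set
		c.counts[r].Add(1)
	}
}

// load returns the count for a reason inside the closed set.
func (c *shedCounter) load(r ShedReason) uint64 { return c.counts[r].Load() }

func main() {
	var c shedCounter
	c.observe(ShedSaturated)
	c.observe(ShedSaturated)
	c.observe(ShedAllDark)
	fmt.Println(c.load(ShedEmpty), c.load(ShedSaturated), c.load(ShedAllDark)) // 0 2 1
}
```

Because the callback runs on the request goroutine, keeping it to one atomic add means a burst of sheds under overload does not add contention of its own.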

type ShedReason added in v0.18.0

type ShedReason uint8

ShedReason classifies why LeastConnLoadBalancer shed a request before any round-trip (it returned ErrUnavailable, which the proxy maps to 503). It is a CLOSED set — a bounded Prometheus label, never unbounded — reported via ShedFunc. A capacity shed is pool-wide (no single host), so it is named by reason only.

const (
	// ShedEmpty: the pool has no targets at all (len(peers) == 0). A
	// misconfiguration or torn-down pool, not load — distinct from a saturated one.
	ShedEmpty ShedReason = iota
	// ShedSaturated: every gate-up target is at its MaxConcurrent cap. The bulkhead
	// working as designed under overload — the brownout signal.
	ShedSaturated
	// ShedAllDark: the active-HC gate marked every target down and the fail-open
	// re-scan still found nothing admittable. A probe-dark/dead pool, distinct from a
	// merely-saturated healthy one. Unreachable without an active-HC gate installed
	// (a nil gate is "all up", so a no-HC pool sheds as ShedSaturated, never here).
	// The saturated/all_dark split is best-effort under health-state churn: a gate
	// that flips up between the gated scan and the fail-open re-scan can briefly
	// attribute a freshly-saturated shed to all_dark (the shed itself is unaffected).
	ShedAllDark
)

func (ShedReason) String added in v0.18.0

func (r ShedReason) String() string

type State added in v0.18.0

type State uint8

State is a reliability balancer's per-target health state, reported via OnStateChange. The circuit breaker uses all three; EjectingLoadBalancer and LatencyEjectingLoadBalancer use only StateClosed (in rotation) and StateOpen (ejected). The numeric value doubles as the Prometheus gauge value exported by prom.UpstreamState.

const (
	StateClosed   State = iota // routing normally / in rotation
	StateOpen                  // tripped open / ejected — skipped
	StateHalfOpen              // admitting trial probes (circuit breaker only)
)

func (State) String added in v0.18.0

func (s State) String() string

type StateChange added in v0.18.0

type StateChange struct {
	Host   string // resolved upstream target (operator-configured, bounded label)
	From   State
	To     State
	Reason Reason
	// Cause classifies a probe failure on the ActiveHealthCheck down crossing
	// (Reason == ReasonProbeDown). It is CauseNone (the zero value) on every other
	// event, including a probe recover and all circuit-breaker / ejecting transitions.
	Cause ProbeCause
}

StateChange reports one per-target reliability transition to a StateChangeFunc.

type StateChangeFunc added in v0.18.0

type StateChangeFunc func(StateChange)

StateChangeFunc observes a per-target reliability state transition. Assign one to a reliability balancer's OnStateChange to make ejection / circuit-breaker state observable — see prom.UpstreamState. It is invoked synchronously on the goroutine that commits the transition, exactly once per transition (concurrent threshold crossers collapse to one), after the new state is published. The callee owns its own concurrency. Nil disables it at zero hot-path cost.

type Target added in v0.9.0

type Target struct {
	Transport http.RoundTripper
	Host      string

	// Weight biases the weighted balancers toward this target:
	// WeightedRoundRobinLoadBalancer gives it a proportionally larger share of the
	// request COUNT; LeastConnLoadBalancer lets it hold a proportionally larger
	// share of concurrent in-flight requests. Values <= 0 are treated as 1.
	// RoundRobinLoadBalancer, EjectingLoadBalancer, LatencyEjectingLoadBalancer,
	// and CircuitBreakingLoadBalancer ignore this field and weight every target
	// equally.
	Weight int

	// MaxConcurrent caps the in-flight requests LeastConnLoadBalancer routes to this
	// target (the bulkhead pattern), isolating blast radius: a slow backend can hold
	// at most this many in-flight requests and cannot drain the pool. A request
	// counts as in-flight until its response body is closed (not at the headers), so
	// the cap bounds true end-to-end concurrency including slow body streams. Beyond
	// the cap, requests route to another under-cap target; when EVERY target is at
	// its cap the balancer sheds (ErrUnavailable -> 503) rather than overloading a
	// saturated origin. The cap is hard — never exceeded, even under a concurrent
	// burst. Values <= 0 mean unbounded (the default). Only LeastConnLoadBalancer
	// honors it; the other balancers ignore it.
	//
	// WARNING: a slot is held until the response body is closed; nothing else
	// reclaims it. A backend that sends headers then stalls mid-body keeps its slot
	// until the request's context is cancelled (which closes the body). No
	// http.Transport timeout covers that stall: ResponseHeaderTimeout bounds only
	// time-to-headers, and IdleConnTimeout reaps only idle pooled connections, never
	// an in-flight stalled one. To bound a held slot you must cap TOTAL request time
	// so the context is cancelled mid-body — a request-scoped context deadline the
	// transport honors (note pkg/timeout disarms once upstream headers are written,
	// so it does NOT cover a mid-body stall). Without such a total-time bound, after
	// MaxConcurrent stalled requests the target sheds all traffic permanently — the
	// cap becomes a latch, not a limiter.
	MaxConcurrent int
}

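Target describes one upstream backend for a load balancer: the Host to send requests to, the Transport used to reach it, and the optional Weight and MaxConcurrent settings.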

type TargetLoad added in v0.18.0

type TargetLoad struct {
	Host   string // resolved upstream target (operator-configured, bounded label)
	Active int64  // in-flight requests right now
	Cap    int64  // MaxConcurrent bulkhead cap; 0 == unbounded
}

TargetLoad is one target's live bulkhead occupancy, returned by LeastConnLoadBalancer.Inflight for scrape-time observability and tests. Cap is 0 when the target is unbounded (Target.MaxConcurrent <= 0). Active is a point-in-time atomic load taken without locking, so across a returned slice the Active values are each individually exact but not a single frozen pool-wide instant — which is exactly what a per-target saturation gauge wants (each bulkhead is independent).

type Transport added in v0.12.3

type Transport struct {
	DialTimeout           time.Duration
	TCPKeepAlive          time.Duration
	DisableKeepAlives     bool
	MaxConn               int
	MaxIdleConns          int
	IdleConnTimeout       time.Duration
	ExpectContinueTimeout time.Duration
	ResponseHeaderTimeout time.Duration
	DisableCompression    bool
	TLSClientConfig       *tls.Config

	// DialContext, if non-nil, replaces the default net.Dialer used to open
	// TCP connections to the upstream for the http and h2c (plaintext) paths.
	// nil uses an internal net.Dialer built from DialTimeout/TCPKeepAlive
	// (today's behavior, unchanged). Use it to observe dial errors, re-resolve
	// endpoints, or wrap the connection. A custom dialer is responsible for
	// honoring its own timeouts; DialTimeout is ignored when DialContext is
	// set. The unix-socket path is unaffected.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

Transport selects the underlying transport for each request from the request URL's scheme (http, https, h2c, or unix socket).

Example

Pick the transport for the backend's protocol. Transport auto-selects per the request URL scheme (http/https, h2c for cleartext HTTP/2, unix sockets), so one instance can front a mixed pool; the dedicated transports pin a single protocol.

package main

import (
	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	s := parapet.New()
	s.Use(upstream.SingleHost("10.0.0.1:8080", &upstream.Transport{
		MaxIdleConns:       64,
		DisableCompression: true,
	}))
}

func (*Transport) RoundTrip added in v0.12.3

func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type UnixTransport added in v0.9.0

type UnixTransport struct {
	DisableKeepAlives     bool
	MaxIdleConns          int
	IdleConnTimeout       time.Duration
	ExpectContinueTimeout time.Duration
	ResponseHeaderTimeout time.Duration
	// contains filtered or unexported fields
}

UnixTransport is an http.RoundTripper for upstreams reached over a unix socket.

func (*UnixTransport) RoundTrip added in v0.9.0

func (t *UnixTransport) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type Upstream

type Upstream struct {
	Transport   http.RoundTripper
	ErrorLog    *log.Logger
	OnRoundTrip RoundTripFunc // observe each origin round-trip (nil disables); see prom.Upstream

	// RetryPolicy decides whether a request is eligible to be retried after a
	// transport error. nil uses the default canRetry: an idempotent method
	// (GET/HEAD/OPTIONS/TRACE) AND a body that is either absent or rewindable
	// (r.Body is nil/http.NoBody, OR r.GetBody != nil). Set it to widen
	// eligibility — e.g. to retry an idempotent PUT/DELETE — or to narrow it.
	//
	// RETRY AMPLIFICATION — READ BEFORE WIDENING. An eligible request may be sent
	// to upstreams up to Retries+1 times (one initial attempt plus Retries
	// re-attempts). If the same Upstream is also fronted by a HedgingLoadBalancer,
	// each of those attempts can additionally fan out to MaxHedge speculative
	// copies, so the worst-case origin load multiplies (≈ (Retries+1) × (MaxHedge+1)).
	// Size Retries (and MaxHedge) for that ceiling, and only mark a request
	// retryable here when the upstream is genuinely idempotent for it — a retried
	// non-idempotent request can double-apply a side effect (a duplicate POST, a
	// second charge). A body-bearing request is only retried when r.GetBody is set
	// (so each attempt can be rewound to the full body); without GetBody, even an
	// eligible method is not retried.
	RetryPolicy func(r *http.Request) bool

	Host          string // override host
	Path          string // target prefix path
	Retries       int
	BackoffFactor time.Duration
}

Upstream controls the flow of requests to the upstream servers through a load balancer.
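The retry-amplification ceiling described on RetryPolicy can be worked through numerically. The sketch below only evaluates the documented approximation (Retries+1) × (MaxHedge+1); worstCaseAttempts is a hypothetical helper, not part of the package, and the real figure depends on how the hedging balancer is configured.

```go
package main

import "fmt"

// worstCaseAttempts evaluates the documented ceiling on origin requests for a
// single client request: (retries+1) attempts, each of which may fan out to
// (maxHedge+1) copies. Hypothetical helper for illustration only.
func worstCaseAttempts(retries, maxHedge int) int {
	return (retries + 1) * (maxHedge + 1)
}

func main() {
	fmt.Println(worstCaseAttempts(2, 0)) // 3: retries only, no hedging
	fmt.Println(worstCaseAttempts(2, 1)) // 6
	fmt.Println(worstCaseAttempts(3, 2)) // 12
}
```

Even modest settings multiply quickly, which is why the field comment asks for Retries and MaxHedge to be sized together.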

Example

Tune the proxy and its transport: rewrite the upstream Host header, prefix a target path, cap the retry budget, and bound the dial / response timeouts.

package main

import (
	"time"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	m := upstream.SingleHost("10.0.0.1:8080", &upstream.HTTPTransport{
		DialTimeout:           2 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second,
		MaxIdleConns:          64,
	})
	m.Host = "api.internal" // Host header sent to the backend
	m.Path = "/v1"          // prefix joined ahead of the request path
	m.Retries = 2           // idempotent requests only; 0 disables retries
	m.BackoffFactor = 100 * time.Millisecond

	s := parapet.New()
	s.Use(m)
}

Example (OnRoundTrip)

Observe each origin round-trip via OnRoundTrip — invoked once per attempt with the resolved target, status, latency, and error. Wire it to metrics or logging (see prom.Upstream); here it just inspects the info.

package main

import (
	"net/http"

	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	m := upstream.SingleHost("10.0.0.1:8080", &upstream.HTTPTransport{})
	m.OnRoundTrip = func(r *http.Request, info upstream.RoundTripInfo) {
		_ = info.Host     // resolved upstream target
		_ = info.Status   // response status, or 0 on a pre-response failure
		_ = info.Duration // time to response headers
		_ = info.Err      // transport error, or nil once a response arrived
	}

	s := parapet.New()
	s.Use(m)
}

Example (RetryPolicy)

Widen retry eligibility to idempotent PUT and DELETE. The default policy retries only GET/HEAD/OPTIONS/TRACE (and only when a body, if present, is rewindable via GetBody). A custom RetryPolicy lets the operator opt in to methods they know their upstream applies idempotently. It is used in place of the default rather than added to it, so a method left out of the switch (OPTIONS and TRACE here) is no longer retried. A retried request can hit upstreams up to Retries+1 times, so only widen eligibility when the upstream is genuinely idempotent for the method.

package main

import (
	"net/http"

	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	up := upstream.SingleHost("backend:8080", &upstream.HTTPTransport{})
	up.RetryPolicy = func(r *http.Request) bool {
		switch r.Method {
		case http.MethodGet, http.MethodHead, http.MethodPut, http.MethodDelete:
			// A body-bearing PUT is still only safe to retry when it can be rewound;
			// require GetBody so each re-attempt resends the full body.
			return r.Body == nil || r.Body == http.NoBody || r.GetBody != nil
		default:
			return false
		}
	}
	_ = up
}

func New

func New(transport http.RoundTripper) *Upstream

New creates a new Upstream that sends requests through the given transport.

func SingleHost added in v0.2.0

func SingleHost(host string, transport http.RoundTripper) *Upstream

SingleHost creates a new Upstream that proxies every request to a single host.

Example

Proxy every request to one backend. SingleHost is the common case: pick a transport for the wire protocol (HTTPTransport for plain HTTP/1.1 here) and point it at a host:port. The returned *Upstream is a parapet.Middleware.

package main

import (
	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	s := parapet.New()
	s.Use(upstream.SingleHost("10.0.0.1:8080", &upstream.HTTPTransport{}))
}

func (*Upstream) RoundTrip added in v0.7.0

func (m *Upstream) RoundTrip(r *http.Request) (*http.Response, error)

RoundTrip wraps the transport's round-trip.

func (Upstream) ServeHandler

func (m Upstream) ServeHandler(h http.Handler) http.Handler

ServeHandler implements the parapet.Middleware interface.

type WeightedRoundRobinLoadBalancer added in v0.18.0

type WeightedRoundRobinLoadBalancer struct {

	// Targets is the set of upstreams to balance across.
	Targets []*Target
	// contains filtered or unexported fields
}

WeightedRoundRobinLoadBalancer distributes requests across targets in proportion to their Weight, using smooth weighted round-robin (the nginx algorithm): a heavy target's picks are interleaved with the others rather than dealt in a burst, and the long-run share is exactly Weight/sum(Weight). With all weights equal it degenerates to plain index-order round-robin.

It balances by request COUNT (use LeastConnLoadBalancer to balance by concurrent in-flight requests instead).
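
The interleaving described above can be illustrated with a minimal sketch of the textbook smooth weighted round-robin step. It is not the package's implementation: node, pick and sequence are hypothetical names, and the sketch is single-goroutine, whereas a real balancer must be safe under concurrent requests.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a weighted target.
type node struct {
	name    string
	weight  int
	current int // running score, adjusted on every pick
}

// pick performs one smooth weighted round-robin step: raise every node's
// score by its weight, choose the highest score (the earliest node wins a
// tie), then lower the winner's score by the total weight.
func pick(nodes []*node) *node {
	total := 0
	var best *node
	for _, n := range nodes {
		n.current += n.weight
		total += n.weight
		if best == nil || n.current > best.current {
			best = n
		}
	}
	if best != nil {
		best.current -= total
	}
	return best
}

// sequence returns the names chosen over k consecutive picks, naming the
// nodes a, b, c, ... in index order. weights must be non-empty.
func sequence(weights []int, k int) string {
	nodes := make([]*node, len(weights))
	for i, w := range weights {
		nodes[i] = &node{name: string(rune('a' + i)), weight: w}
	}
	out := ""
	for i := 0; i < k; i++ {
		out += pick(nodes).name
	}
	return out
}

func main() {
	fmt.Println(sequence([]int{3, 1}, 8))    // aabaaaba
	fmt.Println(sequence([]int{1, 1, 1}, 6)) // abcabc
}
```

With weights 3 and 1 the heavy node takes three of every four picks without a run of three in a row inside a cycle, and with equal weights the picks fall back to index order, matching the behaviour described for the balancer.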

func NewWeightedRoundRobinLoadBalancer added in v0.18.0

func NewWeightedRoundRobinLoadBalancer(targets []*Target) *WeightedRoundRobinLoadBalancer

NewWeightedRoundRobinLoadBalancer creates a weighted round-robin load balancer using smooth weighted round-robin (SWRR). Each target receives a share of requests proportional to its Weight; targets with equal weight are served in plain round-robin order. Configuration fields are read once, before the first request; set them before serving.

Example

Bias traffic by capacity with weighted round-robin: a Weight of 3 receives three times the request share of a Weight of 1.

package main

import (
	"github.com/moonrhythm/parapet"
	"github.com/moonrhythm/parapet/pkg/upstream"
)

func main() {
	tr := &upstream.HTTPTransport{}
	lb := upstream.NewWeightedRoundRobinLoadBalancer([]*upstream.Target{
		{Host: "10.0.0.1:8080", Transport: tr, Weight: 3}, // larger box
		{Host: "10.0.0.2:8080", Transport: tr, Weight: 1},
	})

	s := parapet.New()
	s.Use(upstream.New(lb))
}

func (*WeightedRoundRobinLoadBalancer) RoundTrip added in v0.18.0

RoundTrip sends a request to the next weighted target.
