Documentation
¶
Index ¶
Constants ¶
const ( BackpressureProxyType = "backpressure" BackpressureUpdateCadence = time.Minute MonitorQueryTimeout = 10 * time.Second )
Variables ¶
var ( ErrJitterDelayRequired = errors.New("delay must be non-empty when jitter is enabled") ErrBackpressureQueryRequired = errors.New("must provide at least one backpressure query when backpressure is enabled") ErrCongestionWindowMinBelowOne = errors.New("backpressure min window < 1") ErrCongestionWindowMaxBelowMin = errors.New("backpressure max window <= min window") ErrRegistryRequired = errors.New("prometheus registry is required when observer is enabled") ErrBackpressureBackoff = BlockErr(BackpressureProxyType, "congestion window closed, backoff from backpressure") )
Functions ¶
Types ¶
type Backpressure ¶
type Backpressure struct {
// contains filtered or unexported fields
}
Backpressure uses Additive Increase Multiplicative Decrease which is a congestion control algorithm to back off of expensive queries and is modeled after TCP's https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease. Backpressure signals are derived from PromQL metric signals and the system will never let less than a minimum number of queries through at one time. How does it work? 1. Start a background thread to keep backpressure metrics updated 2. On each request, set the "window" for how many concurrent requests are allowed 3. if we are within bounds, allow the request 4. if backpressure is not spiking, widen the window by one (additive) 5. if backpressure signals fire, cut the window by half (multiplicative)
func NewBackpressure ¶
func NewBackpressure(querier ProxyClient, minWindow, maxWindow int, queries []string, monitorURL string) *Backpressure
func (*Backpressure) Init ¶
func (bp *Backpressure) Init(ctx context.Context)
func (*Backpressure) ServeHTTP ¶
func (bp *Backpressure) ServeHTTP(w http.ResponseWriter, r *http.Request) error
type BackpressureConfig ¶
type BackpressureConfig struct {
EnableBackpressure bool
BackpressureMonitoringURL string
BackpressureQueries []string
CongestionWindowMin int
CongestionWindowMax int
}
func (BackpressureConfig) Validate ¶
func (c BackpressureConfig) Validate() error
type Config ¶
type Config struct {
BackpressureConfig
EnableJitter bool
JitterDelay time.Duration
EnableObserver bool
ObserverRegistry *prometheus.Registry
}
type Entry ¶
type Entry struct {
// contains filtered or unexported fields
}
func NewFromConfig ¶
func NewFromConfig(cfg Config, next http.HandlerFunc) (*Entry, error)
NewFromConfig reads the middleware config to inject related proxies. Proxies are wrapped from last to first (when enabled) to
1. Wrap *http.Request into the ProxyClient interface
2. Collect metrics on the internal proxies
3. Wait for some jitter to spread requests
4. Apply backpressure using signals from a Prometheus/Thanos server
5. Unwrap into the next http.HandlerFunc (or a passthrough http.ReverseProxy)
type Jitterer ¶
type Jitterer struct {
// contains filtered or unexported fields
}
Jitterer sleeps for a random amount of jitter before passing the request through.
func NewJitterer ¶
func NewJitterer(querier ProxyClient, delay time.Duration) *Jitterer
type Mocker ¶
type Mocker struct {
ServeHTTPFunc func(w http.ResponseWriter, r *http.Request)
}
Mocker simply mocks the main http.HandlerFunc methods for unit testing
type Observer ¶
type Observer struct {
// contains filtered or unexported fields
}
Observer emits metrics such as error rate and how often queriers are blocking requests. Each querier that blocks requests should tag their errors with a querier type to filter metrics.
func NewObserver ¶
func NewObserver(querier ProxyClient, reg *prometheus.Registry) *Observer
type ProxyClient ¶
type RequestBlockedError ¶
func (*RequestBlockedError) Error ¶
func (e *RequestBlockedError) Error() string