proxymw

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: MIT Imports: 12 Imported by: 1

Documentation

Index

Constants

View Source
const (
	BackpressureProxyType = "ai_md"

	BackpressureUpdateCadence = time.Minute

	MonitorQueryTimeout = 10 * time.Second
)

Variables

View Source
var (
	ErrJitterDelayRequired         = errors.New("delay must be non-empty when jitter is enabled")
	ErrBackpressureQueryRequired   = errors.New("must provide at least one backpressure query when backpressure is enabled")
	ErrCongestionWindowMinBelowOne = errors.New("backpressure min window < 1")
	ErrCongestionWindowMaxBelowMin = errors.New("backpressure max window <= min window")
	ErrRegistryRequired            = errors.New("prometheus registry is required when observer is enabled")

	ErrBackpressureBackoff = BlockErr(BackpressureProxyType, "congestion window closed, backoff from backpressure")
)

Functions

func BlockErr

func BlockErr(t string, format string, a ...any) error

Types

type Backpressure

type Backpressure struct {
	// contains filtered or unexported fields
}

Backpressure uses Additive Increase Multiplicative Decrease which is a congestion control algorithm to back off of expensive queries and is modeled after TCP's https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease. Backpressure signals are derived from PromQL metric signals and the system will never let less than a minimum number of queries through at one time. How does it work? 1. Start a background thread to keep backpressure metrics updated 2. On each request, set the "window" for how many concurrent requests are allowed 3. if we are within bounds, allow the request 4. if backpressure is not spiking, widen the window by one (additive) 5. if backpressure signals fire, cut the window by half (multiplicative)

func NewBackpressure

func NewBackpressure(querier ProxyClient, minWindow, maxWindow int, queries []string, monitorURL string) *Backpressure

func (*Backpressure) Init

func (bp *Backpressure) Init(ctx context.Context)

func (*Backpressure) ServeHTTP

func (bp *Backpressure) ServeHTTP(w http.ResponseWriter, r *http.Request) error

type BackpressureConfig

type BackpressureConfig struct {
	EnableBackpressure        bool
	BackpressureMonitoringURL string
	BackpressureQueries       []string
	CongestionWindowMin       int
	CongestionWindowMax       int
}

func (BackpressureConfig) Validate

func (c BackpressureConfig) Validate() error

type Config

type Config struct {
	BackpressureConfig

	EnableJitter bool
	JitterDelay  time.Duration

	EnableObserver   bool
	ObserverRegistry *prometheus.Registry
}

func (Config) Validate

func (c Config) Validate() error

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

func NewFromConfig

func NewFromConfig(cfg Config, next http.HandlerFunc) (*Entry, error)

NewFromConfig reads the middleware config to inject related proxies. Proxies are wrapped from last to first (when enabled) to

1. Wrap *http.Request into the ProxyClient interface

2. Collect metrics on the internal proxies

3. Wait for some jitter to spread requests

4. Apply backpressure using signals from a Prometheus/Thanos server

5. Unwrap into the next http.HandlerFunc (or a passthrough http.ReverseProxy)

func (*Entry) Init

func (e *Entry) Init(ctx context.Context)

func (*Entry) Proxy

func (e *Entry) Proxy() http.Handler

type Exit

type Exit struct {
	// contains filtered or unexported fields
}

func (*Exit) Init

func (e *Exit) Init(_ context.Context)

func (*Exit) ServeHTTP

func (e *Exit) ServeHTTP(w http.ResponseWriter, r *http.Request) error

type Jitterer

type Jitterer struct {
	// contains filtered or unexported fields
}

Jitterer sleeps for a random amount of jitter before passing the request through.

func NewJitterer

func NewJitterer(querier ProxyClient, delay time.Duration) *Jitterer

func (*Jitterer) Init

func (j *Jitterer) Init(ctx context.Context)

func (*Jitterer) ServeHTTP

func (j *Jitterer) ServeHTTP(w http.ResponseWriter, r *http.Request) error

type Mocker

type Mocker struct {
	ServeHTTPFunc func(w http.ResponseWriter, r *http.Request)
}

Mocker simply mocks the main http.HandlerFunc methods for unit testing

func (*Mocker) ServeHTTP

func (m *Mocker) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer emits metrics such as error rate and how often queriers are blocking requests. Each querier that blocks requests should tag their errors with a querier type to filter metrics.

func NewObserver

func NewObserver(querier ProxyClient, reg *prometheus.Registry) *Observer

func (*Observer) Init

func (o *Observer) Init(ctx context.Context)

func (*Observer) ServeHTTP

func (o *Observer) ServeHTTP(w http.ResponseWriter, r *http.Request) error

type ProxyClient

type ProxyClient interface {
	Init(context.Context)
	ServeHTTP(http.ResponseWriter, *http.Request) error
}

type RequestBlockedError

type RequestBlockedError struct {
	Err  error
	Type string
}

func (*RequestBlockedError) Error

func (e *RequestBlockedError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL