proxymw

package
v0.1.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 15 Imported by: 1

Documentation

Overview

Package proxymw holds interfaces and configuration to safeguard backend services from dynamic load

Index

Constants

View Source
const (
	BackpressureProxyType     = "backpressure"
	BackpressureUpdateCadence = 30 * time.Second
	MonitorQueryTimeout       = 15 * time.Second
	DefaultThrottleCurve      = 4.0
	InstantQueryEndpoint      = "/api/v1/query"
)
View Source
const (
	// https://sre.google/sre-book/handling-overload/
	CriticalityCriticalPlus = "CRITICAL_PLUS"
	CriticalityCritical     = "CRITICAL"
	// CriticalityDefault is used when the client does not set the X-Request-Criticality header.
	CriticalityDefault = CriticalityCritical
)
View Source
const (
	NoJitter time.Duration = 0
)

Variables

View Source
var (
	ErrJitterDelayRequired       = errors.New("delay must be non-empty when jitter is enabled")
	ErrBackpressureQueryRequired = errors.New(
		"must provide at least one backpressure query when backpressure is enabled",
	)
	ErrCongestionWindowMinBelowOne = errors.New("backpressure min window < 1")
	ErrCongestionWindowMaxBelowMin = errors.New("backpressure max window <= min window")
	ErrNegativeThrottleCurve       = errors.New("throttle curve cannot be negative")
	ErrNegativeQueryThresholds     = errors.New("backpressure query thresholds cannot be negative")
	ErrEmergencyBelowWarnThreshold = errors.New("emergency threshold must be > warn threshold")
	ErrExtraQueryQuotes            = errors.New("backpressure PromQL cannot be wrapped in quotes")

	ErrBackpressureBackoff = BlockErr(
		BackpressureProxyType,
		"congestion window closed, backoff from backpressure",
	)

	ErrNilRequest        = errors.New("nil *http.Request")
	ErrNilResponseWriter = errors.New("nil http.ResponseWriter")
	ErrNilResponse       = errors.New("nil *http.Response")
)

Functions

func BlockErr

func BlockErr(t string, format string, a ...any) error

func ParseHeaderKey added in v0.1.15

func ParseHeaderKey(rr Request, key HeaderKey) string

Types

type APIErrorResponse added in v0.0.3

type APIErrorResponse struct {
	Status    string `json:"status"`
	ErrorType string `json:"errorType"`
	Error     string `json:"error"`
}

APIErrorResponse represents the standard error response format

type Backpressure

type Backpressure struct {
	// contains filtered or unexported fields
}

Backpressure uses Additive Increase Multiplicative Decrease which is a congestion control algorithm to back off of expensive queries and is modeled after TCP's https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease. Backpressure signals are derived from PromQL metric signals and the system will never let less than a minimum number of queries through at one time. How does it work? 1. Start a background thread to keep backpressure metrics updated 2. On each request, set the "window" for how many concurrent requests are allowed 3. If we are within bounds, allow the request 4. If backpressure is not spiking, widen the window by one (additive) 5. if backpressure signals fire, cut the window in proportion to signal strength (multiplicative)

func NewBackpressure

func NewBackpressure(
	client ProxyClient,
	minWindow int,
	maxWindow int,
	queries []BackpressureQuery,
	monitorURL string,
) *Backpressure

func (*Backpressure) Init

func (bp *Backpressure) Init(ctx context.Context)

func (*Backpressure) Next added in v0.1.0

func (bp *Backpressure) Next(rr Request) error

type BackpressureConfig

type BackpressureConfig struct {
	EnableBackpressure        bool                `yaml:"enable_backpressure"`
	BackpressureMonitoringURL string              `yaml:"backpressure_monitoring_url"`
	BackpressureQueries       []BackpressureQuery `yaml:"backpressure_queries"`
	CongestionWindowMin       int                 `yaml:"congestion_window_min"`
	CongestionWindowMax       int                 `yaml:"congestion_window_max"`
}

func (BackpressureConfig) Validate

func (c BackpressureConfig) Validate() error

type BackpressureQuery added in v0.1.0

type BackpressureQuery struct {
	// Name is an optional human readable field used to emit tagged metrics.
	// When unset, operational metrics are omitted.
	// When set, read warn_threshold as proxymw_bp_warn_threshold{query_name="<name>"}
	Name string `yaml:"name,omitempty"`
	// Query is the PromQL to monitor system load or usage
	Query string `yaml:"query"`
	// WarningThreshold is the load value at which throttling begins (e.g., 80% capacity)
	WarningThreshold float64 `yaml:"warning_threshold"`
	// EmergencyThreshold is the load value at which the max num of requests are blocked (e.g., 100% capacity). Still lets through CongestionWindowMin
	EmergencyThreshold float64 `yaml:"emergency_threshold"`
	// ThrottlingCurve is a constant controlling the aggressiveness of throttling (e.g., default 4.0 for steep growth)
	ThrottlingCurve float64 `yaml:"throttling_curve"`
}

func (BackpressureQuery) Validate added in v0.1.0

func (q BackpressureQuery) Validate() error

type Config

type Config struct {
	BackpressureConfig `yaml:"backpressure_config"`
	EnableJitter       bool          `yaml:"enable_jitter"`
	JitterDelay        time.Duration `yaml:"jitter_delay"`
	EnableObserver     bool          `yaml:"enable_observer"`
	ClientTimeout      time.Duration `yaml:"client_timeout"`
	EnableCriticality  bool          `yaml:"enable_criticality"`
}

Config holds all middleware configuration options

func (Config) Validate

func (c Config) Validate() error

Validate ensures all enabled features have proper configuration

type HeaderKey added in v0.1.15

type HeaderKey string
const (
	HeaderCriticality HeaderKey = "X-Request-Criticality"
	HeaderCanWait     HeaderKey = "X-Can-Wait"
)

type Jitterer

type Jitterer struct {
	// contains filtered or unexported fields
}

Jitterer sleeps for a random amount of jitter before passing the request through. When EnableCriticality is set

1. CRITICAL_PLUS requests do not get jittered

2. Use max(X-Can-Wait, default) jitter if header is set

func NewJitterer

func NewJitterer(client ProxyClient, delay time.Duration, criticality bool) *Jitterer

func (*Jitterer) Init

func (j *Jitterer) Init(ctx context.Context)

func (*Jitterer) Next added in v0.1.0

func (j *Jitterer) Next(rr Request) error

type Mocker

type Mocker struct {
	ServeHTTPFunc func(w http.ResponseWriter, r *http.Request)
	RoundTripFunc func(r *http.Request) (*http.Response, error)
	InitFunc      func(context.Context)
	NextFunc      func(Request) error
	RequestFunc   func() *http.Request
}

Mocker simply mocks the main interfaces for unit testing

func (*Mocker) Init added in v0.1.0

func (m *Mocker) Init(ctx context.Context)

func (*Mocker) Next added in v0.1.0

func (m *Mocker) Next(rr Request) error

func (*Mocker) Request added in v0.1.11

func (m *Mocker) Request() *http.Request

func (*Mocker) RoundTrip added in v0.1.0

func (m *Mocker) RoundTrip(r *http.Request) (*http.Response, error)

func (*Mocker) ServeHTTP

func (m *Mocker) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer emits metrics such as error rate and how often proxies are blocking requests. Each client that blocks requests should tag their errors with a client type to filter metrics.

func NewObserver

func NewObserver(client ProxyClient) *Observer

func (*Observer) Init

func (o *Observer) Init(ctx context.Context)

func (*Observer) Next added in v0.1.0

func (o *Observer) Next(rr Request) error

type PrometheusResponse added in v0.0.3

type PrometheusResponse struct {
	Data struct {
		Result model.Vector `json:"result"`
	} `json:"data"`
}

type ProxyClient

type ProxyClient interface {
	Init(context.Context)
	Next(Request) error
}

ProxyClient defines the interface for middleware components

func NewFromConfig

func NewFromConfig(cfg Config, client ProxyClient) (ProxyClient, error)

type Request added in v0.1.0

type Request interface {
	Request() *http.Request
}

type RequestBlockedError

type RequestBlockedError struct {
	Err  error
	Type string
}

func (*RequestBlockedError) Error

func (e *RequestBlockedError) Error() string

type RequestResponseWrapper added in v0.1.0

type RequestResponseWrapper struct {
	// contains filtered or unexported fields
}

func (*RequestResponseWrapper) Request added in v0.1.0

func (c *RequestResponseWrapper) Request() *http.Request

func (*RequestResponseWrapper) Response added in v0.1.0

func (c *RequestResponseWrapper) Response() *http.Response

func (*RequestResponseWrapper) ResponseWriter added in v0.1.0

func (c *RequestResponseWrapper) ResponseWriter() http.ResponseWriter

func (*RequestResponseWrapper) SetResponse added in v0.1.0

func (c *RequestResponseWrapper) SetResponse(res *http.Response)

type Response added in v0.1.0

type Response interface {
	Response() *http.Response
	SetResponse(*http.Response)
}

type ResponseWriter added in v0.1.0

type ResponseWriter interface {
	ResponseWriter() http.ResponseWriter
}

type RoundTripperEntry added in v0.1.0

type RoundTripperEntry struct {
	// contains filtered or unexported fields
}

func NewRoundTripperFromConfig added in v0.1.0

func NewRoundTripperFromConfig(cfg Config, rt http.RoundTripper) (*RoundTripperEntry, error)

func (*RoundTripperEntry) Init added in v0.1.0

func (rte *RoundTripperEntry) Init(ctx context.Context)

func (*RoundTripperEntry) RoundTrip added in v0.1.0

func (rte *RoundTripperEntry) RoundTrip(req *http.Request) (*http.Response, error)

type RoundTripperExit added in v0.1.0

type RoundTripperExit struct {
	// contains filtered or unexported fields
}

RoundTripperExit represents the final handler in the middleware chain for http.RoundTripper

func (*RoundTripperExit) Init added in v0.1.0

func (rte *RoundTripperExit) Init(_ context.Context)

func (*RoundTripperExit) Next added in v0.1.0

func (rte *RoundTripperExit) Next(r Request) error

type ServeEntry added in v0.1.0

type ServeEntry struct {
	// contains filtered or unexported fields
}

ServeEntry represents the entry point of the middleware chain

func NewServeFromConfig added in v0.1.0

func NewServeFromConfig(cfg Config, next http.HandlerFunc) (*ServeEntry, error)

NewServeFromConfig constructs a middleware chain based on configuration. The middleware chain is constructed in the following order: 1. Request wrapping (Entry) 2. Metrics collection (Observer) 3. Request spreading (Jitter) 4. Adaptive rate limiting (Backpressure) 6. Final handler (Exit)

func (*ServeEntry) Init added in v0.1.0

func (se *ServeEntry) Init(ctx context.Context)

Init initializes the middleware chain

func (*ServeEntry) Proxy added in v0.1.0

func (se *ServeEntry) Proxy() http.Handler

Proxy returns an http.Handler that processes requests through the middleware chain

type ServeExit added in v0.1.0

type ServeExit struct {
	// contains filtered or unexported fields
}

ServeExit represents the final handler in the middleware chain for http.HandlerFunc

func (*ServeExit) Init added in v0.1.0

func (se *ServeExit) Init(_ context.Context)

func (*ServeExit) Next added in v0.1.0

func (se *ServeExit) Next(rr Request) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL