Documentation
¶
Index ¶
- Constants
- func ContainsString(slice []string, s string) bool
- func DefaultAppName() string
- func GetAppName(ctx context.Context) string
- func GetCurrentTime(ctx context.Context) time.Time
- func GetLogger(ctx context.Context) *logrus.Entry
- func GetRandFloat64(ctx context.Context) float64
- func GetRandIntn(ctx context.Context, n int) int
- func GetRandomBytes(ctx context.Context, n int) []byte
- func HTTPAddAdminRoute(ctx context.Context, r *mux.Router)
- func HTTPAddHealthRoute(ctx context.Context, r *mux.Router)
- func HTTPAddMetricsRoute(_ context.Context, r *mux.Router)
- func HTTPAddVersionRoute(ctx context.Context, r *mux.Router)
- func HTTPAdminHandler(ctx context.Context) http.Handler
- func HTTPHealthHandler(_ context.Context) http.Handler
- func HTTPVersionHandler(_ context.Context) http.Handler
- func NewDefaultContext(parent context.Context) context.Context
- func NewHTTPMiddleware(ctx context.Context, opts ...HTTPMiddlewareOption) []mux.MiddlewareFunc
- func RemoveString(slice []string, s string) (result []string)
- func SetDefaultLogLevel(l logrus.Level)
- func SetLevel(entry *logrus.Entry, level string) error
- func WithAppName(parent context.Context, appName string) context.Context
- func WithClock(parent context.Context, clockFunc func() time.Time) context.Context
- func WithDefaultLogger(parent context.Context) context.Context
- func WithLogger(parent context.Context, entry *logrus.Entry) context.Context
- func WithRandomSeed(parent context.Context, seed int64) context.Context
- func WithSignalHandler(parent context.Context) context.Context
- func WithTestingLogger(parent context.Context) (context.Context, *test.Hook)
- type Event
- type EventStreamMerger
- type HTTPCodeError
- type HTTPMiddlewareOption
- func WithHandlerTimeout(t time.Duration) HTTPMiddlewareOption
- func WithPrometheusRegisterer(registerer prometheus.Registerer) HTTPMiddlewareOption
- func WithRequestBodyLimit(limit int64) HTTPMiddlewareOption
- func WithRequestMetrics(serverName string) HTTPMiddlewareOption
- func WithTelemetry(serverName string, telemetryAttributes ...attribute.KeyValue) HTTPMiddlewareOption
- type HTTPService
- func (s *HTTPService) AddAdminRoute(ctx context.Context)
- func (s *HTTPService) AddHealthRoute(ctx context.Context)
- func (s *HTTPService) AddMetricsRoute(ctx context.Context)
- func (s *HTTPService) AddVersionRoute(ctx context.Context)
- func (s *HTTPService) Start(ctx context.Context) (net.Listener, error)
- type KubeClients
- type KubeClientsStream
- type KubeConfigurator
- type ObjectUpdate
- type PathKubeConfigurator
- func (c *PathKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config
- func (c *PathKubeConfigurator) WithClientRateLimitingDisabled() *PathKubeConfigurator
- func (c *PathKubeConfigurator) WithConfigModifier(configModifier func(*rest.Config)) *PathKubeConfigurator
- func (c *PathKubeConfigurator) WithPath(p string) *PathKubeConfigurator
- type Queue
- type Rand
- type RecurrentEventSource
- type RecurrentEventTicker
- type TickerStream
- func (s *TickerStream) Start(ctx context.Context) <-chan *Event
- func (s *TickerStream) WithBufferSize(bufferSize int) *TickerStream
- func (s *TickerStream) WithImmediate(immediate bool) *TickerStream
- func (s *TickerStream) WithInterval(interval time.Duration) *TickerStream
- func (s *TickerStream) WithKind(kind string) *TickerStream
- func (s *TickerStream) WithRandomOffset(randomOffset bool) *TickerStream
- func (s *TickerStream) WithZeroEventTimestamp(zeroEventTimestamp bool) *TickerStream
- type TokenFetcher
- type TokenKubeConfigurator
- func (c *TokenKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config
- func (c *TokenKubeConfigurator) WithFetcher(f TokenFetcher) *TokenKubeConfigurator
- func (c *TokenKubeConfigurator) WithInterval(i time.Duration) *TokenKubeConfigurator
- func (c *TokenKubeConfigurator) WithMasterURL(url string) *TokenKubeConfigurator
- func (c *TokenKubeConfigurator) WithTimeout(t time.Duration) *TokenKubeConfigurator
Constants ¶
const (
ObjectNameField = "metadata.name"
)
Variables ¶
This section is empty.
Functions ¶
func ContainsString ¶
Helper functions to check for and remove a string from a slice of strings.
func DefaultAppName ¶
func DefaultAppName() string
func GetAppName ¶
func GetRandFloat64 ¶
func HTTPAdminHandler ¶
HTTPAdminHandler provides debugging utilities. It should usually listen on 127.0.0.1 rather than being exposed publicly (unless authenticated and authorized). Currently the admin handler allows updating the logging level while the application is running, e.g.:
func NewDefaultContext ¶
NewDefaultContext provides a default context for applications.
func NewHTTPMiddleware ¶
func NewHTTPMiddleware(ctx context.Context, opts ...HTTPMiddlewareOption) []mux.MiddlewareFunc
NewHTTPMiddleware builds an HTTP middleware that:
- Adds a contextual request logger with {method,path,ip} into request.Context
- Collects <latencyMetricsName>_http_duration_seconds metrics with {path,method} dimensions, if latencyMetricsName is specified
- Wraps handler with http.TimeoutHandler with `handlerTimeout`.
- Wraps request.Body with MaxBytesReader with `requestBodyLimit`.
- Instruments OpenTelemetry by adding a span to the request context that tracks the response and its return codes
func RemoveString ¶
func SetDefaultLogLevel ¶
Types ¶
type Event ¶
func NewUpdateEvent ¶
type EventStreamMerger ¶
type EventStreamMerger struct {
// contains filtered or unexported fields
}
func NewEventStreamMerger ¶
func NewEventStreamMerger() *EventStreamMerger
func (*EventStreamMerger) Merge ¶
func (s *EventStreamMerger) Merge(ctx context.Context, eventChs ...<-chan *Event) <-chan *Event
func (*EventStreamMerger) WithBufferSize ¶
func (s *EventStreamMerger) WithBufferSize(bufferSize int) *EventStreamMerger
type HTTPCodeError ¶
type HTTPCodeError int
HTTPCodeError wraps an unexpected HTTP status code in an error.
func (HTTPCodeError) Error ¶
func (e HTTPCodeError) Error() string
type HTTPMiddlewareOption ¶
type HTTPMiddlewareOption func(*httpMiddleware)
func WithHandlerTimeout ¶
func WithHandlerTimeout(t time.Duration) HTTPMiddlewareOption
WithHandlerTimeout allows one to modify the handler timeout. Default is 5 seconds.
func WithPrometheusRegisterer ¶
func WithPrometheusRegisterer(registerer prometheus.Registerer) HTTPMiddlewareOption
WithPrometheusRegisterer sets a custom prometheus registerer for metrics. If not set, prometheus.DefaultRegisterer is used.
func WithRequestBodyLimit ¶
func WithRequestBodyLimit(limit int64) HTTPMiddlewareOption
WithRequestBodyLimit modifies the request body limit. Default is 1MB.
func WithRequestMetrics ¶
func WithRequestMetrics(serverName string) HTTPMiddlewareOption
WithRequestMetrics enables service specific metrics regarding request duration and count. By default these metrics are disabled.
func WithTelemetry ¶
func WithTelemetry(serverName string, telemetryAttributes ...attribute.KeyValue) HTTPMiddlewareOption
WithTelemetry enables telemetry for HTTP requests and provides telemetryAttributes for all HTTP request spans.
type HTTPService ¶
type HTTPService struct {
*mux.Router
// Addr could be a TCP address like:
//
// - 0.0.0.0:8000 (any interface, port 8000)
// - 127.0.0.1:8000 (loop-back interface only, port 8000)
// - 0.0.0.0:0 (any interface, any free port)
//
// Addr could also be prefixed with `unix://` (e.g.,
// `unix:///tmp/test.sock`), which will make HTTPService listen to a
// local unix domain socket instead of a network interface.
Addr string
// TLSCertFile and TLSKeyFile are passed to ServeTLS() to serve HTTPS.
// By default, HTTPService serves plain HTTP.
TLSCertFile, TLSKeyFile string
// SocketPermission is only applicable when addr is a unix socket
// path, e.g., 'unix:///tmp/test.sock'
SocketPermission os.FileMode
// ReadTimeout, WriteTimeout and IdleTimeout are timeout
// configurations passed to go http.Server.
ReadTimeout, WriteTimeout, IdleTimeout time.Duration
// ShutDownGracePeriod is the grace period for
// `http.Server:Shutdown`
ShutDownGracePeriod time.Duration
}
HTTPService augments mux.Router with:
- Reasonable defaults
- Helper methods for adding common routes (/healthz, /version, /metrics, /admin ...).
- Handling of both TCP addresses (e.g., :8000) and unix socket addresses (e.g., unix:///tmp/test.sock)
- A non-blocking Start(ctx) method that handles graceful shutdown when receiving ctx.Done().
func NewHTTPService ¶
func NewHTTPService(addr string) *HTTPService
func (*HTTPService) AddAdminRoute ¶
func (s *HTTPService) AddAdminRoute(ctx context.Context)
func (*HTTPService) AddHealthRoute ¶
func (s *HTTPService) AddHealthRoute(ctx context.Context)
func (*HTTPService) AddMetricsRoute ¶
func (s *HTTPService) AddMetricsRoute(ctx context.Context)
func (*HTTPService) AddVersionRoute ¶
func (s *HTTPService) AddVersionRoute(ctx context.Context)
type KubeClients ¶
KubeClients aggregates kubeconfig and generated K8s clients, such as the native K8s client and the EGX CRD client. It is immutable after creation.
func NewKubeClientsFromPath ¶
func NewKubeClientsFromPath(ctx context.Context, path string) (*KubeClients, error)
type KubeClientsStream ¶
type KubeClientsStream struct {
// contains filtered or unexported fields
}
func NewKubeClientsStream ¶
func NewKubeClientsStream() *KubeClientsStream
func (*KubeClientsStream) Start ¶
func (s *KubeClientsStream) Start(ctx context.Context) <-chan *KubeClients
func (*KubeClientsStream) WithConfigCh ¶
func (s *KubeClientsStream) WithConfigCh(ch <-chan *rest.Config) *KubeClientsStream
func (*KubeClientsStream) WithK8sNewForConfig ¶
func (s *KubeClientsStream) WithK8sNewForConfig(f func(c *rest.Config) (k8sclient.Interface, error)) *KubeClientsStream
type KubeConfigurator ¶
KubeConfigurator is an interface that sends (updated) kubeconfig over a channel for configuring K8s clients.
type ObjectUpdate ¶
type ObjectUpdate struct {
NewObj interface{}
OldObj interface{}
}
type PathKubeConfigurator ¶
type PathKubeConfigurator struct {
// contains filtered or unexported fields
}
PathKubeConfigurator implements KubeConfigurator by building rest.Config from a given kubeconfig file path. Currently it only sends the kubeconfig once during init. It can be extended to watch for kubeconfig file content changes if that is useful.
func NewPathKubeConfigurator ¶
func NewPathKubeConfigurator() *PathKubeConfigurator
func (*PathKubeConfigurator) Start ¶
func (c *PathKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config
func (*PathKubeConfigurator) WithClientRateLimitingDisabled ¶
func (c *PathKubeConfigurator) WithClientRateLimitingDisabled() *PathKubeConfigurator
func (*PathKubeConfigurator) WithConfigModifier ¶
func (c *PathKubeConfigurator) WithConfigModifier(configModifier func(*rest.Config)) *PathKubeConfigurator
func (*PathKubeConfigurator) WithPath ¶
func (c *PathKubeConfigurator) WithPath(p string) *PathKubeConfigurator
type Queue ¶
type Queue chan *Event
Queue is a simple Go channel used to process events asynchronously. More advanced queues could be explored later.
type Rand ¶
Rand wraps *rand.Rand and provides thread-safe access to a subset of methods used in EGX.
func (*Rand) RandomBytes ¶
type RecurrentEventSource ¶
type RecurrentEventTicker ¶
type RecurrentEventTicker struct {
Sources []RecurrentEventSource
C Queue
}
func (*RecurrentEventTicker) Start ¶
func (t *RecurrentEventTicker) Start(stopCh <-chan struct{})
TODO: find better tickers such that some randomness could be added into the sleep interval.
func (*RecurrentEventTicker) Stop ¶
func (t *RecurrentEventTicker) Stop()
type TickerStream ¶
type TickerStream struct {
// contains filtered or unexported fields
}
func NewTickerStream ¶
func NewTickerStream() *TickerStream
func (*TickerStream) WithBufferSize ¶
func (s *TickerStream) WithBufferSize(bufferSize int) *TickerStream
func (*TickerStream) WithImmediate ¶
func (s *TickerStream) WithImmediate(immediate bool) *TickerStream
func (*TickerStream) WithInterval ¶
func (s *TickerStream) WithInterval(interval time.Duration) *TickerStream
func (*TickerStream) WithKind ¶
func (s *TickerStream) WithKind(kind string) *TickerStream
func (*TickerStream) WithRandomOffset ¶
func (s *TickerStream) WithRandomOffset(randomOffset bool) *TickerStream
func (*TickerStream) WithZeroEventTimestamp ¶
func (s *TickerStream) WithZeroEventTimestamp(zeroEventTimestamp bool) *TickerStream
type TokenFetcher ¶
type TokenKubeConfigurator ¶
TokenKubeConfigurator implements KubeConfigurator by building rest.Config using the bearer token retrieved from a tokenFetcher. It checks for token updates at the given `Interval`, and only sends a new kubeconfig down the channel if the cached token has changed.
func NewTokenKubeConfigurator ¶
func NewTokenKubeConfigurator() *TokenKubeConfigurator
func (*TokenKubeConfigurator) Start ¶
func (c *TokenKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config
func (*TokenKubeConfigurator) WithFetcher ¶
func (c *TokenKubeConfigurator) WithFetcher(f TokenFetcher) *TokenKubeConfigurator
func (*TokenKubeConfigurator) WithInterval ¶
func (c *TokenKubeConfigurator) WithInterval(i time.Duration) *TokenKubeConfigurator
func (*TokenKubeConfigurator) WithMasterURL ¶
func (c *TokenKubeConfigurator) WithMasterURL(url string) *TokenKubeConfigurator
func (*TokenKubeConfigurator) WithTimeout ¶
func (c *TokenKubeConfigurator) WithTimeout(t time.Duration) *TokenKubeConfigurator