core

package
v1.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ObjectNameField = "metadata.name"
)

Variables

This section is empty.

Functions

func ContainsString

func ContainsString(slice []string, s string) bool

Helper functions to check and remove string from a slice of strings.

func DefaultAppName

func DefaultAppName() string

func GetAppName

func GetAppName(ctx context.Context) string

func GetCurrentTime

func GetCurrentTime(ctx context.Context) time.Time

func GetLogger

func GetLogger(ctx context.Context) *logrus.Entry

func GetRandFloat64

func GetRandFloat64(ctx context.Context) float64

func GetRandIntn

func GetRandIntn(ctx context.Context, n int) int

func GetRandomBytes

func GetRandomBytes(ctx context.Context, n int) []byte

func HTTPAddAdminRoute

func HTTPAddAdminRoute(ctx context.Context, r *mux.Router)

func HTTPAddHealthRoute

func HTTPAddHealthRoute(ctx context.Context, r *mux.Router)

func HTTPAddMetricsRoute

func HTTPAddMetricsRoute(_ context.Context, r *mux.Router)

func HTTPAddVersionRoute

func HTTPAddVersionRoute(ctx context.Context, r *mux.Router)

func HTTPAdminHandler

func HTTPAdminHandler(ctx context.Context) http.Handler

HTTPAdminHandler provides debugging utilities. It should usually listen to 127.0.0.1 instead of exposing publicly (unless authNed and authZed). Currently admin handler allows update logging level while application is running, e.g.:

curl http://127.0.0.1:8002/admin?log-level=debug

func HTTPHealthHandler

func HTTPHealthHandler(_ context.Context) http.Handler

func HTTPVersionHandler

func HTTPVersionHandler(_ context.Context) http.Handler

func NewDefaultContext

func NewDefaultContext(parent context.Context) context.Context

NewDefaultContext provides a default context for applications.

func NewHTTPMiddleware

func NewHTTPMiddleware(ctx context.Context, opts ...HTTPMiddlewareOption) []mux.MiddlewareFunc

HTTPMiddleware builds a http middleware that:

  • Adds contexted request logger with {method,path,ip} into request.Context

  • Collects <latencyMetricsName>_http_duration_seconds metrics with {path,method} dimensions, if latencyMetricsName is specified

- Wraps handler with http.TimeoutHandler with `handlerTimeout`.

- Wraps request.Body with MaxBytesReader with `requestBodyLimit`.

  • Instruments OpenTelemetry by adding a span to the request context that tracks the response and its return codes

func RemoveString

func RemoveString(slice []string, s string) (result []string)

func SetDefaultLogLevel

func SetDefaultLogLevel(l logrus.Level)

func SetLevel

func SetLevel(entry *logrus.Entry, level string) error

func WithAppName

func WithAppName(parent context.Context, appName string) context.Context

func WithClock

func WithClock(parent context.Context, clockFunc func() time.Time) context.Context

func WithDefaultLogger

func WithDefaultLogger(parent context.Context) context.Context

func WithLogger

func WithLogger(parent context.Context, entry *logrus.Entry) context.Context

func WithRandomSeed

func WithRandomSeed(parent context.Context, seed int64) context.Context

func WithSignalHandler

func WithSignalHandler(parent context.Context) context.Context

func WithTestingLogger

func WithTestingLogger(parent context.Context) (context.Context, *test.Hook)

Types

type Event

type Event struct {
	Kind          string
	ObjectMetaKey string
	Object        interface{}
}

func NewUpdateEvent

func NewUpdateEvent(kind string, oldObj, newObj interface{}) *Event

func (*Event) String

func (e *Event) String() string

type EventStreamMerger

type EventStreamMerger struct {
	// contains filtered or unexported fields
}

func NewEventStreamMerger

func NewEventStreamMerger() *EventStreamMerger

func (*EventStreamMerger) Merge

func (s *EventStreamMerger) Merge(ctx context.Context, eventChs ...<-chan *Event) <-chan *Event

func (*EventStreamMerger) WithBufferSize

func (s *EventStreamMerger) WithBufferSize(bufferSize int) *EventStreamMerger

type HTTPCodeError

type HTTPCodeError int

HTTPCodeError wraps an unexpected HTTP status code in an error.

func (HTTPCodeError) Error

func (e HTTPCodeError) Error() string

type HTTPMiddlewareOption

type HTTPMiddlewareOption func(*httpMiddleware)

func WithHandlerTimeout

func WithHandlerTimeout(t time.Duration) HTTPMiddlewareOption

WithHandlerTimeout allows one to modify the handler timeout. Default is 5 seconds.

func WithPrometheusRegisterer

func WithPrometheusRegisterer(registerer prometheus.Registerer) HTTPMiddlewareOption

WithPrometheusRegisterer sets a custom prometheus registerer for metrics. If not set, prometheus.DefaultRegisterer is used.

func WithRequestBodyLimit

func WithRequestBodyLimit(limit int64) HTTPMiddlewareOption

WithRequestBodyLimit modifies the request body limit. Default is 1MB.

func WithRequestMetrics

func WithRequestMetrics(serverName string) HTTPMiddlewareOption

WithRequestMetrics enables service specific metrics regarding request duration and count. By default these metrics are disabled.

func WithTelemetry

func WithTelemetry(serverName string, telemetryAttributes ...attribute.KeyValue) HTTPMiddlewareOption

WithTelemetry enables telemetry for http requests, provide telemetryAttributes for all http request spans

type HTTPService

type HTTPService struct {
	*mux.Router

	// Addr could be a TCP address like:
	//
	// - 0.0.0.0:8000 (any interface, port 8000)
	// - 127.0.0.1:8000 (loop-back interface only, port 8000)
	// - 0.0.0.0:0 (any interface, any free port)
	//
	// Addr could also be prefix with `unix://` (e.g.,
	// `unix:///tmp/test.sock`), which will make HTTPService listen to a
	// local unix domain socket instead of a network interface.
	Addr string

	// TLSCertFile and TLSKeyFile are passed to ServeTLS() to serve HTTPS.
	// By default, HTTPService serves plain HTTP.
	TLSCertFile, TLSKeyFile string

	// SocketPermission is only applicable when addr is a unix socket
	// path, e.g., 'unix:///tmp/test.sock'
	SocketPermission os.FileMode

	// ReadTimeout, WriteTimeout and IdleTimeout are timeout
	// configurations passed to go http.Server.
	ReadTimeout, WriteTimeout, IdleTimeout time.Duration

	// ShutDownGracePeriod is the grace period for
	// `http.Server:Shutdown`
	ShutDownGracePeriod time.Duration
}

HTTPService augments mux.Router with:

- Reasonable defaults

  • Helper methods for adding common routes (/healthz, /version, /metrics, /admin ...).

  • Handle both tcp addresses (e.g., :8000) and unix socket addresses (e.g., unix:///tmp/test.sock)

  • A non-blocking Start(ctx) method handles grace shutdown when receiving ctx.Done().

func NewHTTPService

func NewHTTPService(addr string) *HTTPService

func (*HTTPService) AddAdminRoute

func (s *HTTPService) AddAdminRoute(ctx context.Context)

func (*HTTPService) AddHealthRoute

func (s *HTTPService) AddHealthRoute(ctx context.Context)

func (*HTTPService) AddMetricsRoute

func (s *HTTPService) AddMetricsRoute(ctx context.Context)

func (*HTTPService) AddVersionRoute

func (s *HTTPService) AddVersionRoute(ctx context.Context)

func (*HTTPService) Start

func (s *HTTPService) Start(ctx context.Context) (net.Listener, error)

type KubeClients

type KubeClients struct {
	Config *rest.Config
	K8s    k8sclient.Interface
}

KubeClients aggregates kubeconfig and generated K8s clients, such as native K8s client and EGX CRD client client. It is immutable after creation.

func NewKubeClientsFromConfig

func NewKubeClientsFromConfig(ctx context.Context, config *rest.Config) (*KubeClients, error)

func NewKubeClientsFromConfigCh

func NewKubeClientsFromConfigCh(ctx context.Context, configCh <-chan *rest.Config) (*KubeClients, error)

func NewKubeClientsFromPath

func NewKubeClientsFromPath(ctx context.Context, path string) (*KubeClients, error)

type KubeClientsStream

type KubeClientsStream struct {
	// contains filtered or unexported fields
}

func NewKubeClientsStream

func NewKubeClientsStream() *KubeClientsStream

func (*KubeClientsStream) Start

func (s *KubeClientsStream) Start(ctx context.Context) <-chan *KubeClients

func (*KubeClientsStream) WithConfigCh

func (s *KubeClientsStream) WithConfigCh(ch <-chan *rest.Config) *KubeClientsStream

func (*KubeClientsStream) WithK8sNewForConfig

func (s *KubeClientsStream) WithK8sNewForConfig(f func(c *rest.Config) (k8sclient.Interface, error)) *KubeClientsStream

type KubeConfigurator

type KubeConfigurator interface {
	Start(ctx context.Context) <-chan *rest.Config
}

KubeConfigurator is an interface sends (updated) kubeconfig over a channel for configuring K8s clients.

type ObjectUpdate

type ObjectUpdate struct {
	NewObj interface{}
	OldObj interface{}
}

type PathKubeConfigurator

type PathKubeConfigurator struct {
	// contains filtered or unexported fields
}

PathKubeConfigurator implements KubeConfigurator by building rest.Config from given kubeconfig file path. Currently it only send kubeconfig once during init. It can be extended to watch kubeconfig file content changes if it is useful.

func NewPathKubeConfigurator

func NewPathKubeConfigurator() *PathKubeConfigurator

func (*PathKubeConfigurator) Start

func (c *PathKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config

func (*PathKubeConfigurator) WithClientRateLimitingDisabled

func (c *PathKubeConfigurator) WithClientRateLimitingDisabled() *PathKubeConfigurator

func (*PathKubeConfigurator) WithConfigModifier

func (c *PathKubeConfigurator) WithConfigModifier(configModifier func(*rest.Config)) *PathKubeConfigurator

func (*PathKubeConfigurator) WithPath

type Queue

type Queue chan *Event

Queue is a simple go channel used to process events asyncly, could explore more advanced queues later.

type Rand

type Rand struct {
	sync.Mutex
	*rand.Rand
}

Rand wraps *rand.Rand, provides thread-safe access to a subset of methods used in EGX.

func (*Rand) Float64

func (r *Rand) Float64() float64

func (*Rand) Intn

func (r *Rand) Intn(n int) int

func (*Rand) RandomBytes

func (r *Rand) RandomBytes(n int) []byte

type RecurrentEventSource

type RecurrentEventSource struct {
	Kind     string
	Interval time.Duration
	Ticker   *time.Ticker
}

type RecurrentEventTicker

type RecurrentEventTicker struct {
	Sources []RecurrentEventSource
	C       Queue
}

func (*RecurrentEventTicker) Start

func (t *RecurrentEventTicker) Start(stopCh <-chan struct{})

TODO: find better tickers such that some randomness could be added into the sleep interval.

func (*RecurrentEventTicker) Stop

func (t *RecurrentEventTicker) Stop()

type TickerStream

type TickerStream struct {
	// contains filtered or unexported fields
}

func NewTickerStream

func NewTickerStream() *TickerStream

func (*TickerStream) Start

func (s *TickerStream) Start(ctx context.Context) <-chan *Event

func (*TickerStream) WithBufferSize

func (s *TickerStream) WithBufferSize(bufferSize int) *TickerStream

func (*TickerStream) WithImmediate

func (s *TickerStream) WithImmediate(immediate bool) *TickerStream

func (*TickerStream) WithInterval

func (s *TickerStream) WithInterval(interval time.Duration) *TickerStream

func (*TickerStream) WithKind

func (s *TickerStream) WithKind(kind string) *TickerStream

func (*TickerStream) WithRandomOffset

func (s *TickerStream) WithRandomOffset(randomOffset bool) *TickerStream

func (*TickerStream) WithZeroEventTimestamp

func (s *TickerStream) WithZeroEventTimestamp(zeroEventTimestamp bool) *TickerStream

type TokenFetcher

type TokenFetcher interface {
	FetchToken(ctx context.Context) (string, error)
	RefreshClient()
}

type TokenKubeConfigurator

type TokenKubeConfigurator struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TokenKubeConfigurator implements KubeConfigurator by building rest.Config use the bear token retrieved from a tokenFetcher. It checks token update at given `Interval`, and only sends new kubeconfig down to the channel if cached token get's changed.

func NewTokenKubeConfigurator

func NewTokenKubeConfigurator() *TokenKubeConfigurator

func (*TokenKubeConfigurator) Start

func (c *TokenKubeConfigurator) Start(ctx context.Context) <-chan *rest.Config

func (*TokenKubeConfigurator) WithFetcher

func (*TokenKubeConfigurator) WithInterval

func (*TokenKubeConfigurator) WithMasterURL

func (c *TokenKubeConfigurator) WithMasterURL(url string) *TokenKubeConfigurator

func (*TokenKubeConfigurator) WithTimeout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL