telemetry

package
v0.21.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Component name constants. Each is an untyped string constant holding a
// lowercase component identifier.
//
// NOTE(review): presumably these are the values carried in Event.Component
// and reported as ComponentSnapshot.Name; confirm against the callers.
const (
	ComponentIngress   = "ingress"
	ComponentTransport = "transport"
	ComponentExecution = "execution"
	ComponentEgress    = "egress"
)
View Source
// Event type constants. Each is an untyped string constant holding a
// snake_case event kind.
//
// NOTE(review): presumably these are the values carried in Event.Type and
// FeedEvent.Type; how each kind is aggregated is not visible here.
const (
	TypeRequestStart = "request_start"
	TypeRequestEnd   = "request_end"
	TypeBatch        = "batch"
	TypeQueueDepth   = "queue_depth"
	TypeError        = "error"
	TypeStall        = "stall"
)

Variables

This section is empty.

Functions

func WithScope

func WithScope(ctx context.Context, metadata map[string]any) context.Context

Types

type Aggregator

// Aggregator is an opaque type created with NewAggregator from a Config.
// Its exported methods are Publish (accepts an Event and returns a bool),
// Snapshot and History (return Snapshot values), Subscribe (returns a
// receive-only Snapshot channel plus a func()), and Close.
//
// *Aggregator satisfies the Publisher interface through Publish.
//
// NOTE(review): goroutine-safety, what the func() returned by Subscribe
// does (presumably unsubscribes), and the meaning of Publish's bool result
// are not visible here; confirm against the implementation.
type Aggregator struct {
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(cfg Config) *Aggregator

func (*Aggregator) Close

func (a *Aggregator) Close()

func (*Aggregator) History

func (a *Aggregator) History() []Snapshot

func (*Aggregator) Publish

func (a *Aggregator) Publish(evt Event) bool

func (*Aggregator) Snapshot

func (a *Aggregator) Snapshot() Snapshot

func (*Aggregator) Subscribe

func (a *Aggregator) Subscribe() (<-chan Snapshot, func())

type ComponentSnapshot

// ComponentSnapshot is the JSON-serializable view of a single component as
// it appears in Snapshot.Components. It carries the component name, status
// and pressure labels, an active count, throughput and error-rate figures,
// a LatencySnapshot, a QueueSnapshot, and a map of WindowSnapshot values
// keyed by string.
//
// All fields are always emitted (no omitempty), so a nil Windows map is
// encoded as JSON null by encoding/json.
//
// NOTE(review): the allowed Status/Pressure values, the range of ErrorRate,
// and the keys used in Windows are not visible here; confirm.
type ComponentSnapshot struct {
	Name           string                    `json:"name"`
	Status         string                    `json:"status"`
	Pressure       string                    `json:"pressure"`
	Active         int64                     `json:"active"`
	RowsPerSecond  float64                   `json:"rows_per_second"`
	BytesPerSecond float64                   `json:"bytes_per_second"`
	ErrorRate      float64                   `json:"error_rate"`
	Latency        LatencySnapshot           `json:"latency"`
	Queue          QueueSnapshot             `json:"queue"`
	Windows        map[string]WindowSnapshot `json:"windows"`
}

type Config

// Config holds the three integer sizing parameters accepted by
// NewAggregator.
//
// NOTE(review): what each size bounds (presumably the event buffer, the
// feed list, and the snapshot history), the units, and how zero or negative
// values are treated are not visible here; confirm against NewAggregator.
type Config struct {
	BufferSize  int
	FeedSize    int
	HistorySize int
}

type EdgeSnapshot

// EdgeSnapshot is the JSON-serializable view of a directed link between two
// named endpoints (From and To) as it appears in Snapshot.Edges, with a
// throughput figure, an imbalance figure, and status and pressure labels.
//
// NOTE(review): the Go field is named ThroughputMBs while its JSON key is
// "throughput_mbps", which conventionally reads as megabits per second.
// PathSnapshot spells a similar quantity MBPerSecond / "mb_per_second".
// Confirm the intended unit before relying on either name; the key is part
// of the wire format and is left unchanged here.
type EdgeSnapshot struct {
	From          string  `json:"from"`
	To            string  `json:"to"`
	ThroughputMBs float64 `json:"throughput_mbps"`
	Imbalance     float64 `json:"imbalance"`
	Status        string  `json:"status"`
	Pressure      string  `json:"pressure"`
}

type Event

// Event is the input record accepted by Publisher.Publish. It carries a
// timestamp, component and type strings, row and byte counts, a latency,
// and optional free-form metadata.
//
// Latency is a time.Duration, so under the default encoding/json behavior
// it is written as an integer number of nanoseconds. Metadata is omitted
// from the JSON output when empty.
//
// NOTE(review): Component and Type presumably take the Component* and
// Type* constants of this package; confirm against the aggregator.
type Event struct {
	Time      time.Time      `json:"time"`
	Component string         `json:"component"`
	Type      string         `json:"type"`
	Rows      int64          `json:"rows"`
	Bytes     int64          `json:"bytes"`
	Latency   time.Duration  `json:"latency"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

func ScopedEvent

func ScopedEvent(ctx context.Context, evt Event) Event

type FeedEvent

// FeedEvent is the JSON-serializable entry type of Snapshot.Feed. Compared
// with Event it adds Path, Message and QueueDepth, has no Metadata field,
// and reports latency as a float64 (LatencyMS) instead of a time.Duration.
//
// NOTE(review): LatencyMS is presumably milliseconds, judging by the name
// and the "latency_ms" key; how Path and Message are derived is not visible
// here.
type FeedEvent struct {
	Time       time.Time `json:"time"`
	Path       string    `json:"path"`
	Component  string    `json:"component"`
	Type       string    `json:"type"`
	Message    string    `json:"message"`
	Rows       int64     `json:"rows"`
	Bytes      int64     `json:"bytes"`
	LatencyMS  float64   `json:"latency_ms"`
	QueueDepth int64     `json:"queue_depth"`
}

type HealthSnapshot

// HealthSnapshot is the JSON-serializable health summary embedded in
// Snapshot.Health: a status label plus a list of reason strings.
//
// Reasons has no omitempty tag, so a nil slice is encoded as JSON null
// rather than [] by encoding/json.
//
// NOTE(review): the set of possible Status values is not visible here.
type HealthSnapshot struct {
	Status  string   `json:"status"`
	Reasons []string `json:"reasons"`
}

type LatencySnapshot

// LatencySnapshot groups three float64 latency figures, serialized as
// "p50_ms", "p95_ms" and "p99_ms". It is embedded in ComponentSnapshot,
// PathSnapshot and WindowSnapshot.
//
// NOTE(review): the names indicate the 50th/95th/99th percentiles in
// milliseconds; the estimation method and sample window are not visible
// here.
type LatencySnapshot struct {
	P50MS float64 `json:"p50_ms"`
	P95MS float64 `json:"p95_ms"`
	P99MS float64 `json:"p99_ms"`
}

type PathSnapshot

// PathSnapshot is the JSON-serializable view of a single named path as it
// appears in Snapshot.Paths: a status label, throughput and error-rate
// figures, a LatencySnapshot, and a Window string.
//
// NOTE(review): what identifies a path, the format of Window (presumably a
// label for the aggregation window), and the allowed Status values are not
// visible here; confirm.
type PathSnapshot struct {
	Name          string          `json:"name"`
	Status        string          `json:"status"`
	RowsPerSecond float64         `json:"rows_per_second"`
	MBPerSecond   float64         `json:"mb_per_second"`
	ErrorRate     float64         `json:"error_rate"`
	Latency       LatencySnapshot `json:"latency"`
	Window        string          `json:"window"`
}

type Publisher

// Publisher is the single-method interface for anything that accepts an
// Event and returns a bool. *Aggregator satisfies it through its
// Publish(evt Event) bool method.
//
// NOTE(review): the meaning of the bool result (presumably accepted versus
// dropped) is not visible here; confirm against Aggregator.Publish.
type Publisher interface {
	Publish(Event) bool
}

type QueueSnapshot

// QueueSnapshot is the JSON-serializable queue state embedded in
// ComponentSnapshot.Queue: current and capacity counts, a utilization
// figure, two maxima serialized as "max_1s" and "max_1m", and a unit label
// that is omitted from the JSON output when empty.
//
// NOTE(review): Utilization is presumably Current/Capacity, and Max1s/Max1m
// presumably the peak depth over the last second and minute; neither is
// established by the code visible here.
type QueueSnapshot struct {
	Current     int64   `json:"current"`
	Capacity    int64   `json:"capacity"`
	Utilization float64 `json:"utilization"`
	Max1s       int64   `json:"max_1s"`
	Max1m       int64   `json:"max_1m"`
	Unit        string  `json:"unit,omitempty"`
}

type Server

// Server is an opaque type created with NewServer from an *Aggregator.
// *Server has a ServeHTTP(http.ResponseWriter, *http.Request) method and
// therefore satisfies net/http's Handler interface.
//
// NOTE(review): the routes and response formats it serves are not visible
// here.
type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(aggregator *Aggregator) *Server

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Snapshot

// Snapshot is the top-level JSON-serializable state returned by
// Aggregator.Snapshot and Aggregator.History and delivered on the channel
// returned by Aggregator.Subscribe. It bundles a timestamp, per-component,
// per-edge and per-path snapshots, a feed of FeedEvent entries, a
// HealthSnapshot, and a dropped-event counter.
//
// Feed is omitted from the JSON output when empty. Components, Edges and
// Paths are always emitted, so nil slices are encoded as JSON null by
// encoding/json.
//
// NOTE(review): what causes DroppedEvents to increase is not visible here.
type Snapshot struct {
	Time          time.Time           `json:"time"`
	Components    []ComponentSnapshot `json:"components"`
	Edges         []EdgeSnapshot      `json:"edges"`
	Paths         []PathSnapshot      `json:"paths"`
	Feed          []FeedEvent         `json:"feed,omitempty"`
	Health        HealthSnapshot      `json:"health"`
	DroppedEvents uint64              `json:"dropped_events"`
}

type WindowSnapshot

// WindowSnapshot is the value type of ComponentSnapshot.Windows. It carries
// throughput and error-rate figures, a LatencySnapshot, and queue depth and
// capacity counts.
//
// NOTE(review): presumably one entry per aggregation window; the window
// lengths and the map keys that select them are not visible here.
type WindowSnapshot struct {
	RowsPerSecond  float64         `json:"rows_per_second"`
	BytesPerSecond float64         `json:"bytes_per_second"`
	ErrorRate      float64         `json:"error_rate"`
	Latency        LatencySnapshot `json:"latency"`
	QueueDepth     int64           `json:"queue_depth"`
	QueueCapacity  int64           `json:"queue_capacity"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL