observability

package
v0.33.11 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 26, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseOTLPHeaders added in v0.33.0

func ParseOTLPHeaders(raw string) map[string]string

ParseOTLPHeaders converts a comma-separated `key=value` list (matching the OTEL_EXPORTER_OTLP_HEADERS env var format) into a map. Whitespace around pairs and tokens is trimmed; empty pairs, pairs without `=`, and entries with an empty key are skipped.
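
The parsing rules above can be illustrated with a small stand-alone sketch. `parseHeaders` is a hypothetical stand-in written from the description, not the package's implementation; how the real function treats a value that itself contains `=`, or whether it returns a nil or an empty map for empty input, is not documented here and is an assumption in this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// parseHeaders is a hypothetical stand-in that follows the documented rules.
// It is NOT the package's ParseOTLPHeaders implementation.
func parseHeaders(raw string) map[string]string {
	out := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue // empty pair
		}
		// Assumption: split on the first "=" so values may contain "=".
		key, value, found := strings.Cut(pair, "=")
		if !found {
			continue // pair without "="
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue // empty key
		}
		out[key] = strings.TrimSpace(value)
	}
	return out
}

func main() {
	headers := parseHeaders(" Authorization = Bearer abc , ,novalue,=orphan, x-team=crawl ")
	fmt.Println(len(headers), headers["Authorization"], headers["x-team"])
}
```

Of the five comma-separated pieces in the sample input, only `Authorization` and `x-team` survive: the blank piece, `novalue` (no `=`) and `=orphan` (empty key) are skipped.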

func RecordBrokerAutoclaim added in v0.33.0

func RecordBrokerAutoclaim(ctx context.Context, jobID, result string, count int)

RecordBrokerAutoclaim increments the autoclaim outcomes counter. result values: "reclaimed", "dead_letter".

func RecordBrokerCounterPELSkew added in v0.33.1

func RecordBrokerCounterPELSkew(ctx context.Context, jobID string, skew float64)

RecordBrokerCounterPELSkew records the absolute difference between the Redis HASH counter for a job and the authoritative XPENDING count observed during reconciliation. A persistently non-zero skew indicates that the counter has leaked (the drift fix shipped in fix-broker-counter-drift). jobID is retained for call-site stability but omitted from labels to keep cardinality bounded.
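
As a rough illustration of the value a caller might pass as `skew`, the sketch below computes the absolute difference between two counts. `pelSkew` and its parameter names are hypothetical; how the reconciler actually reads the HASH counter and the XPENDING count is not shown in this documentation.

```go
package main

import (
	"fmt"
	"math"
)

// pelSkew returns the absolute difference between a job's counter value and
// the pending count reported by XPENDING. Both inputs are hypothetical values
// that a reconciler would already have read from Redis.
func pelSkew(hashCounter, xpendingCount int64) float64 {
	return math.Abs(float64(hashCounter - xpendingCount))
}

func main() {
	fmt.Println(pelSkew(12, 9)) // counter is ahead of the PEL
	fmt.Println(pelSkew(9, 12)) // counter is behind the PEL
	fmt.Println(pelSkew(7, 7))  // in sync
}
```

Because the result is an absolute value, the metric shows how far the counter has drifted but not in which direction.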

func RecordBrokerCounterSyncSkew added in v0.33.0

func RecordBrokerCounterSyncSkew(ctx context.Context, jobID string, skew float64)

RecordBrokerCounterSyncSkew records the absolute difference between the Redis running counter and Postgres running_tasks at sync time. jobID is retained for call-site stability but omitted from labels to keep cardinality bounded: the histogram captures the distribution across all jobs, which is the signal dashboards care about.

func RecordBrokerDispatch added in v0.33.0

func RecordBrokerDispatch(ctx context.Context, jobID, outcome string)

RecordBrokerDispatch increments the dispatch outcomes counter. outcome values: "ok", "err", "capacity", "paced". jobID is retained for call-site stability but not emitted as a label.

func RecordBrokerMessageAge added in v0.33.0

func RecordBrokerMessageAge(ctx context.Context, jobID string, ageMs float64)

RecordBrokerMessageAge records how long a stream message sat pending before a consumer received it. Call once per parsed XREADGROUP message.

func RecordBrokerOutbox added in v0.33.0

func RecordBrokerOutbox(ctx context.Context, backlog int64, oldestAgeSeconds float64)

RecordBrokerOutbox emits the outbox backlog and oldest-entry age gauges.

func RecordBrokerOutboxSweep added in v0.33.1

func RecordBrokerOutboxSweep(ctx context.Context, outcome string, count int)

RecordBrokerOutboxSweep increments the outbox sweep outcomes counter.

outcome values (mutually exclusive, per-row):

  • "dispatched": row was ZADDed to Redis and DELETEd from task_outbox
  • "retried": ScheduleBatch failed for this entry (or at pipeline level) and attempts/run_at were bumped
  • "dead_lettered": attempts exceeded the cap and the row was moved to task_outbox_dead

func RecordBrokerPELWithoutConsumer added in v0.33.1

func RecordBrokerPELWithoutConsumer(ctx context.Context, count int64)

RecordBrokerPELWithoutConsumer emits the number of jobs with a non-zero stream PEL that are NOT in the worker's active-job set. In a healthy system this is always zero — a non-zero reading means dispatch/consume have diverged and those jobs' tasks are stalled.
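
The count described above amounts to a set difference: jobs whose PEL is non-zero, minus jobs in the active set. A minimal sketch follows, assuming the caller already holds a per-job pending count and the worker's active-job set; `pelWithoutConsumer` and both map shapes are hypothetical.

```go
package main

import "fmt"

// pelWithoutConsumer counts jobs that have pending stream entries but are not
// in the worker's active-job set. Both inputs are hypothetical shapes.
func pelWithoutConsumer(pelByJob map[string]int64, active map[string]struct{}) int64 {
	var stalled int64
	for jobID, pending := range pelByJob {
		if pending == 0 {
			continue // nothing pending, nothing to stall
		}
		if _, ok := active[jobID]; !ok {
			stalled++
		}
	}
	return stalled
}

func main() {
	pel := map[string]int64{"job-a": 4, "job-b": 0, "job-c": 2}
	active := map[string]struct{}{"job-a": {}}
	// job-c has pending entries but no consumer, so the gauge would read 1.
	fmt.Println(pelWithoutConsumer(pel, active))
}
```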

func RecordBrokerPacerDelay added in v0.33.0

func RecordBrokerPacerDelay(ctx context.Context, domain string, delayMs float64)

RecordBrokerPacerDelay records the effective per-domain pacing delay observed at TryAcquire time. domain is retained for API stability but not emitted as a label.

func RecordBrokerPacerPushback added in v0.33.0

func RecordBrokerPacerPushback(ctx context.Context, domain, reason string)

RecordBrokerPacerPushback increments the pushback counter. reason values: "gate" (domain-gate NX hold), "rate_limited" (release feedback). domain is retained for call-site stability but not emitted as a label (per-domain cardinality is unbounded at launch scale).

func RecordBrokerRedisPing added in v0.33.0

func RecordBrokerRedisPing(ctx context.Context, duration time.Duration, ok bool)

RecordBrokerRedisPing emits the periodic Redis PING RTT.

func RecordBrokerRedisPool added in v0.33.0

func RecordBrokerRedisPool(ctx context.Context, snap RedisPoolSnapshot)

RecordBrokerRedisPool emits the Redis client pool gauges.

func RecordBrokerStreamStats added in v0.33.0

func RecordBrokerStreamStats(ctx context.Context, s BrokerStreamStats)

RecordBrokerStreamStats emits Tier 1 broker depth gauges aggregated across all active jobs in the probe tick. Per-job drill-down is intentionally unavailable — use traces or logs (which carry job_id) for that.

func RecordCrawlerPhase added in v0.32.6

func RecordCrawlerPhase(ctx context.Context, metrics CrawlerPhaseMetrics)

RecordCrawlerPhase emits duration and count metrics for a crawler phase.

func RecordDBPoolRejection

func RecordDBPoolRejection(ctx context.Context)

RecordDBPoolRejection increments the pool rejection counter when requests are rejected before acquiring a connection.

func RecordDBPoolStats

func RecordDBPoolStats(ctx context.Context, snapshot DBPoolSnapshot)

RecordDBPoolStats records database pool utilisation metrics.

func RecordDBPressureAdjustment added in v0.32.0

func RecordDBPressureAdjustment(ctx context.Context, direction string)

RecordDBPressureAdjustment increments the adjustment counter. direction must be "up" or "down".

func RecordDBPressureStats added in v0.32.0

func RecordDBPressureStats(ctx context.Context, emaMs float64, limit int32)

RecordDBPressureStats records the pressure controller's current EMA and concurrency limit. Call this alongside RecordDBPoolStats for a complete pool+pressure snapshot in Grafana.

func RecordFDStats added in v0.31.4

func RecordFDStats(ctx context.Context, current, limit int, pressure float64)

RecordFDStats records file descriptor usage metrics.

func RecordHTMLPersistBodyBytes added in v0.33.9

func RecordHTMLPersistBodyBytes(ctx context.Context, bytes int64)

RecordHTMLPersistBodyBytes records the compressed payload size of a single upload. Catches large-page surprises and helps right-size the in-flight memory budget.

func RecordHTMLPersistQueueDepth added in v0.33.9

func RecordHTMLPersistQueueDepth(ctx context.Context, depth int)

RecordHTMLPersistQueueDepth emits the current persister channel depth. Called periodically by the persister itself; useful for tuning the HTML_PERSIST_QUEUE / HTML_PERSIST_WORKERS ratio.

func RecordHTMLPersistUpload added in v0.33.9

func RecordHTMLPersistUpload(ctx context.Context, outcome string)

RecordHTMLPersistUpload increments the persister outcome counter. outcome values: "ok", "err", "skipped".

func RecordHTMLPersistUploadDuration added in v0.33.9

func RecordHTMLPersistUploadDuration(ctx context.Context, duration time.Duration)

RecordHTMLPersistUploadDuration records how long a single PUT to R2 took. Caller passes the wall-clock duration for one object.

func RecordJobConcurrencySnapshot

func RecordJobConcurrencySnapshot(ctx context.Context, jobID string, runningTasks int64, concurrencyLimit int64, unlimited bool)

RecordJobConcurrencySnapshot captures the running task count and concurrency limit for a job. jobID is retained in the signature for call-site stability but omitted from metric labels — per-job cardinality is unbounded at launch scale and these gauges are intended for worker-wide snapshots.

func RecordJobInfoCacheHit

func RecordJobInfoCacheHit(ctx context.Context, jobID string)

Note: the jobID argument is retained for API stability and future span/trace correlation, but is no longer attached as a metric label. See RecordWorkerTask.

func RecordJobInfoCacheInvalidation

func RecordJobInfoCacheInvalidation(ctx context.Context, jobID, reason string)

func RecordJobInfoCacheMiss

func RecordJobInfoCacheMiss(ctx context.Context, jobID string)

func RecordJobInfoCacheSize

func RecordJobInfoCacheSize(ctx context.Context, size int)

func RecordLighthouseRun added in v0.33.10

func RecordLighthouseRun(ctx context.Context, jobID, outcome string)

RecordLighthouseRun increments the consumer-side run outcome counter. outcome values: "succeeded", "failed", "skipped_quota", "shed". "shed" tracks audits the runner deferred via the soft memory-shed circuit breaker (left in 'running', message redelivered later); a rising shed rate is the signal that the analysis fleet is memory-saturated and needs scaling up.

func RecordLighthouseRunDuration added in v0.33.10

func RecordLighthouseRunDuration(ctx context.Context, jobID, outcome string, durationMs float64)

RecordLighthouseRunDuration records the wall-clock time of one audit. outcome is included so the histogram separates the cost of successful runs from the cost of failures (failures often cluster at timeout).

func RecordLighthouseRunRetry added in v0.33.10

func RecordLighthouseRunRetry(ctx context.Context, jobID, reason string)

RecordLighthouseRunRetry increments the transient-retry counter. reason is a short tag identifying the recognised stderr substring that triggered the retry (e.g. "target_crashed", "protocol_error"); keep cardinality low — these are the named patterns from the runner's transientStderrSubstrings list, not free-form error text.

func RecordLighthouseScheduled added in v0.33.10

func RecordLighthouseScheduled(ctx context.Context, jobID, band string, count int)

RecordLighthouseScheduled increments the scheduler enqueue counter. band values: "fastest", "slowest", "reconcile". jobID is retained for call-site stability but not emitted as a label (per-job cardinality).

func RecordSemaphoreWait added in v0.32.0

func RecordSemaphoreWait(ctx context.Context, waitMs float64)

RecordSemaphoreWait records the time spent waiting to acquire a DB queue semaphore slot.

func RecordTaskClaimAttempt

func RecordTaskClaimAttempt(ctx context.Context, jobID string, latency time.Duration, status string)

RecordTaskClaimAttempt records the latency of claiming a task from the queue. jobID is retained for signature stability but not emitted as a label.

func RecordTaskWaiting

func RecordTaskWaiting(ctx context.Context, jobID string, reason string, count int)

RecordTaskWaiting records when tasks move into the waiting queue along with the reason.

func RecordWorkerConcurrency

func RecordWorkerConcurrency(ctx context.Context, workerID int, delta int64, capacity int64)

RecordWorkerConcurrency records the change in concurrent tasks for a worker.

  • delta: +1 when starting a task, -1 when completing
  • capacity: the worker's concurrency limit (only recorded once per worker on startup)
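
The +1/-1 convention is easiest to keep balanced when both calls sit in one wrapper. The sketch below uses a local counter in place of the real metric so it runs on its own; `recordDelta`, `inFlight` and `runTask` are hypothetical names, and in real code the two `recordDelta` calls would be `RecordWorkerConcurrency` calls.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// inFlight stands in for the concurrency metric; recordDelta is a hypothetical
// substitute for RecordWorkerConcurrency so the sketch runs without the package.
var inFlight atomic.Int64

func recordDelta(delta int64) { inFlight.Add(delta) }

// runTask brackets the task body with +1 and -1. The deferred call keeps the
// pair balanced even if the body panics.
func runTask(task func()) {
	recordDelta(+1)
	defer recordDelta(-1)
	task()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			runTask(func() {})
		}()
	}
	wg.Wait()
	// Every +1 was matched by a -1, so nothing is left in flight.
	fmt.Println(inFlight.Load())
}
```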

func RecordWorkerTask

func RecordWorkerTask(ctx context.Context, metrics WorkerTaskMetrics)

RecordWorkerTask emits worker task metrics when instrumentation is initialised.

Note: job.id is intentionally dropped from metric labels to keep Prometheus active-series cardinality bounded. Use spans (which still carry job.id on the worker.process_task span attributes) to pivot per-job when needed.

func RecordWorkerTaskFailure

func RecordWorkerTaskFailure(ctx context.Context, jobID string, reason string)

RecordWorkerTaskFailure records a permanently failed task.

func RecordWorkerTaskOutcome added in v0.32.6

func RecordWorkerTaskOutcome(ctx context.Context, metrics WorkerTaskOutcomeMetrics)

RecordWorkerTaskOutcome emits task processing duration grouped by outcome.

func RecordWorkerTaskRetry

func RecordWorkerTaskRetry(ctx context.Context, jobID string, reason string)

RecordWorkerTaskRetry records a retry attempt for a task.

func StartWorkerTaskSpan

func StartWorkerTaskSpan(ctx context.Context, info WorkerTaskSpanInfo) (context.Context, trace.Span)

StartWorkerTaskSpan starts a span for an individual worker task.

func WrapHandler

func WrapHandler(handler http.Handler, prov *Providers) http.Handler

WrapHandler applies OpenTelemetry instrumentation to an http.Handler when the providers are active.

Types

type BrokerStreamStats added in v0.33.0

type BrokerStreamStats struct {
	StreamLength   int64
	ScheduledDepth int64
	Pending        int64
}

BrokerStreamStats captures worker-wide broker depth, aggregated across all active jobs. Previously emitted per-job with a job.id label; the label was removed to keep Mimir series cardinality bounded as job counts grow. The original dashboard queries used sum(...) across the per-job series, so emitting the pre-aggregated total preserves the dashboard semantics.
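
The pre-aggregation described above can be pictured as a sum over per-job readings taken during one probe tick. In the sketch below, `streamStats` mirrors the documented struct fields and `aggregate` is a hypothetical helper; how the probe actually collects per-job values is not shown in this documentation.

```go
package main

import "fmt"

// streamStats mirrors the fields of BrokerStreamStats so the sketch is
// self-contained.
type streamStats struct {
	StreamLength   int64
	ScheduledDepth int64
	Pending        int64
}

// aggregate sums per-job readings into one worker-wide total, which matches
// what a sum(...) over per-job series would have produced.
func aggregate(perJob map[string]streamStats) streamStats {
	var total streamStats
	for _, s := range perJob {
		total.StreamLength += s.StreamLength
		total.ScheduledDepth += s.ScheduledDepth
		total.Pending += s.Pending
	}
	return total
}

func main() {
	total := aggregate(map[string]streamStats{
		"job-a": {StreamLength: 10, ScheduledDepth: 3, Pending: 2},
		"job-b": {StreamLength: 5, ScheduledDepth: 0, Pending: 1},
	})
	// The total would then be passed to RecordBrokerStreamStats.
	fmt.Println(total.StreamLength, total.ScheduledDepth, total.Pending)
}
```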

type Config

type Config struct {
	Enabled        bool
	ServiceName    string
	Environment    string
	OTLPEndpoint   string
	OTLPHeaders    map[string]string
	OTLPInsecure   bool
	MetricsAddress string
}

Config controls observability initialisation.

type CrawlerPhaseMetrics added in v0.32.6

type CrawlerPhaseMetrics struct {
	Phase    string
	Outcome  string
	Duration time.Duration
}

type DBPoolSnapshot

type DBPoolSnapshot struct {
	InUse        int
	Idle         int
	WaitCount    int64
	WaitDuration time.Duration
	MaxOpen      int
	Reserved     int
	Usage        float64
}

DBPoolSnapshot describes a database connection pool state.

type Providers

type Providers struct {
	TracerProvider *sdktrace.TracerProvider
	MeterProvider  *sdkmetric.MeterProvider
	Propagator     propagation.TextMapPropagator
	MetricsHandler http.Handler
	Shutdown       func(ctx context.Context) error
	Config         Config
}

Providers exposes configured telemetry providers.

func Init

func Init(ctx context.Context, cfg Config) (*Providers, error)

Init configures tracing and metrics exporters. When cfg.Enabled is false the function is a no-op.

type RedisPoolSnapshot added in v0.33.0

type RedisPoolSnapshot struct {
	InUse int64
	Idle  int64
	Waits int64
}

RedisPoolSnapshot mirrors the subset of *redis.PoolStats we care about.

type WorkerTaskMetrics

type WorkerTaskMetrics struct {
	JobID         string
	Status        string
	Duration      time.Duration
	QueueWait     time.Duration
	TotalDuration time.Duration
}

WorkerTaskMetrics describes a processed task for metric recording.

type WorkerTaskOutcomeMetrics added in v0.32.6

type WorkerTaskOutcomeMetrics struct {
	JobID    string
	Outcome  string
	Reason   string
	Duration time.Duration
}

type WorkerTaskSpanInfo

type WorkerTaskSpanInfo struct {
	JobID     string
	TaskID    string
	Domain    string
	Path      string
	FindLinks bool
}

WorkerTaskSpanInfo describes the attributes used when starting a worker task span.
