Documentation
Index ¶
- func ParseOTLPHeaders(raw string) map[string]string
- func RecordBrokerAutoclaim(ctx context.Context, jobID, result string, count int)
- func RecordBrokerCounterPELSkew(ctx context.Context, jobID string, skew float64)
- func RecordBrokerCounterSyncSkew(ctx context.Context, jobID string, skew float64)
- func RecordBrokerDispatch(ctx context.Context, jobID, outcome string)
- func RecordBrokerMessageAge(ctx context.Context, jobID string, ageMs float64)
- func RecordBrokerOutbox(ctx context.Context, backlog int64, oldestAgeSeconds float64)
- func RecordBrokerOutboxSweep(ctx context.Context, outcome string, count int)
- func RecordBrokerPELWithoutConsumer(ctx context.Context, count int64)
- func RecordBrokerPacerDelay(ctx context.Context, domain string, delayMs float64)
- func RecordBrokerPacerPushback(ctx context.Context, domain, reason string)
- func RecordBrokerRedisPing(ctx context.Context, duration time.Duration, ok bool)
- func RecordBrokerRedisPool(ctx context.Context, snap RedisPoolSnapshot)
- func RecordBrokerStreamStats(ctx context.Context, s BrokerStreamStats)
- func RecordCrawlerPhase(ctx context.Context, metrics CrawlerPhaseMetrics)
- func RecordDBPoolRejection(ctx context.Context)
- func RecordDBPoolStats(ctx context.Context, snapshot DBPoolSnapshot)
- func RecordDBPressureAdjustment(ctx context.Context, direction string)
- func RecordDBPressureStats(ctx context.Context, emaMs float64, limit int32)
- func RecordFDStats(ctx context.Context, current, limit int, pressure float64)
- func RecordHTMLPersistBodyBytes(ctx context.Context, bytes int64)
- func RecordHTMLPersistQueueDepth(ctx context.Context, depth int)
- func RecordHTMLPersistUpload(ctx context.Context, outcome string)
- func RecordHTMLPersistUploadDuration(ctx context.Context, duration time.Duration)
- func RecordJobConcurrencySnapshot(ctx context.Context, jobID string, runningTasks int64, concurrencyLimit int64, ...)
- func RecordJobInfoCacheHit(ctx context.Context, jobID string)
- func RecordJobInfoCacheInvalidation(ctx context.Context, jobID, reason string)
- func RecordJobInfoCacheMiss(ctx context.Context, jobID string)
- func RecordJobInfoCacheSize(ctx context.Context, size int)
- func RecordLighthouseRun(ctx context.Context, jobID, outcome string)
- func RecordLighthouseRunDuration(ctx context.Context, jobID, outcome string, durationMs float64)
- func RecordLighthouseRunRetry(ctx context.Context, jobID, reason string)
- func RecordLighthouseScheduled(ctx context.Context, jobID, band string, count int)
- func RecordSemaphoreWait(ctx context.Context, waitMs float64)
- func RecordTaskClaimAttempt(ctx context.Context, jobID string, latency time.Duration, status string)
- func RecordTaskWaiting(ctx context.Context, jobID string, reason string, count int)
- func RecordWorkerConcurrency(ctx context.Context, workerID int, delta int64, capacity int64)
- func RecordWorkerTask(ctx context.Context, metrics WorkerTaskMetrics)
- func RecordWorkerTaskFailure(ctx context.Context, jobID string, reason string)
- func RecordWorkerTaskOutcome(ctx context.Context, metrics WorkerTaskOutcomeMetrics)
- func RecordWorkerTaskRetry(ctx context.Context, jobID string, reason string)
- func StartWorkerTaskSpan(ctx context.Context, info WorkerTaskSpanInfo) (context.Context, trace.Span)
- func WrapHandler(handler http.Handler, prov *Providers) http.Handler
- type BrokerStreamStats
- type Config
- type CrawlerPhaseMetrics
- type DBPoolSnapshot
- type Providers
- type RedisPoolSnapshot
- type WorkerTaskMetrics
- type WorkerTaskOutcomeMetrics
- type WorkerTaskSpanInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParseOTLPHeaders ¶ added in v0.33.0
ParseOTLPHeaders converts a comma-separated `key=value` list (matching the OTEL_EXPORTER_OTLP_HEADERS env var format) into a map. Whitespace around pairs and tokens is trimmed; empty pairs, pairs without `=`, and entries with an empty key are skipped.
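The parsing rules above can be sketched as follows. This is a minimal illustration, not the package's actual implementation: `parseHeaders` is a hypothetical stand-in, and splitting each pair on the first `=` only (so values may themselves contain `=`) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseHeaders is a hypothetical stand-in that follows the documented rules:
// trim whitespace, skip empty pairs, pairs without "=", and empty keys.
func parseHeaders(raw string) map[string]string {
	headers := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue // empty pair, e.g. from ",,"
		}
		// Assumption: split on the first "=" so values may contain "=".
		key, value, found := strings.Cut(pair, "=")
		if !found {
			continue // pair without "="
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue // entry with an empty key
		}
		headers[key] = strings.TrimSpace(value)
	}
	return headers
}

func main() {
	got := parseHeaders(" Authorization=Basic abc123 , x-scope = tenant-1 ,, novalue, =orphan")
	fmt.Println(len(got), got["Authorization"], got["x-scope"]) // 2 Basic abc123 tenant-1
}
```

Malformed entries are dropped silently rather than reported, which matches the "skipped" wording above.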
func RecordBrokerAutoclaim ¶ added in v0.33.0
RecordBrokerAutoclaim increments the autoclaim outcomes counter. result values: "reclaimed", "dead_letter".
func RecordBrokerCounterPELSkew ¶ added in v0.33.1
RecordBrokerCounterPELSkew records the absolute difference between the Redis HASH counter for a job and the authoritative XPENDING count observed during reconciliation. A persistent non-zero skew indicates that the counter has leaked (the drift fix shipped in fix-broker-counter-drift). jobID is retained for call-site stability but omitted from labels to keep cardinality bounded.
func RecordBrokerCounterSyncSkew ¶ added in v0.33.0
RecordBrokerCounterSyncSkew records the absolute difference between the Redis running counter and Postgres running_tasks at sync time. jobID is retained for call-site stability but omitted from labels to keep cardinality bounded: the histogram captures the distribution across all jobs, which is the signal dashboards care about.
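Both skew recorders take an absolute difference between a fast-path counter and an authoritative count. A caller might compute that value roughly as below; `counterSkew` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"math"
)

// counterSkew (hypothetical) returns the absolute difference between a
// fast-path counter and the authoritative count it is meant to track.
func counterSkew(counter, authoritative int64) float64 {
	return math.Abs(float64(counter - authoritative))
}

func main() {
	fmt.Println(counterSkew(7, 5)) // counter ran ahead: 2
	fmt.Println(counterSkew(3, 5)) // counter fell behind: 2
	fmt.Println(counterSkew(4, 4)) // in sync: 0
}
```

Because the value is absolute, the histogram shows the size of the drift but not its direction.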
func RecordBrokerDispatch ¶ added in v0.33.0
RecordBrokerDispatch increments the dispatch outcomes counter. outcome values: "ok", "err", "capacity", "paced". jobID is retained for call-site stability but not emitted as a label.
func RecordBrokerMessageAge ¶ added in v0.33.0
RecordBrokerMessageAge records how long a stream message sat pending before a consumer received it. Call once per parsed XREADGROUP message.
func RecordBrokerOutbox ¶ added in v0.33.0
RecordBrokerOutbox emits the outbox backlog and oldest-entry age gauges.
func RecordBrokerOutboxSweep ¶ added in v0.33.1
RecordBrokerOutboxSweep increments the outbox sweep outcomes counter.
outcome values (mutually exclusive, per-row):
- "dispatched": row was ZADDed to Redis and DELETEd from task_outbox
- "retried": ScheduleBatch failed for this entry (or at pipeline level) and attempts/run_at were bumped
- "dead_lettered": attempts exceeded the cap and the row was moved to task_outbox_dead
func RecordBrokerPELWithoutConsumer ¶ added in v0.33.1
RecordBrokerPELWithoutConsumer emits the number of jobs with a non-zero stream PEL that are NOT in the worker's active-job set. In a healthy system this is always zero — a non-zero reading means dispatch/consume have diverged and those jobs' tasks are stalled.
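The count described above is a set difference: jobs with pending entries minus jobs the worker is consuming. A minimal sketch, with hypothetical names and map-based inputs assumed for illustration:

```go
package main

import "fmt"

// pelWithoutConsumer (hypothetical) counts jobs that have a non-zero pending
// entries list but are missing from the worker's active-job set.
func pelWithoutConsumer(pending map[string]int64, active map[string]struct{}) int64 {
	var stalled int64
	for jobID, count := range pending {
		if count == 0 {
			continue // nothing pending, so nothing can be stalled
		}
		if _, ok := active[jobID]; !ok {
			stalled++
		}
	}
	return stalled
}

func main() {
	pending := map[string]int64{"job-a": 3, "job-b": 0, "job-c": 1}
	active := map[string]struct{}{"job-a": {}}
	fmt.Println(pelWithoutConsumer(pending, active)) // 1 (only job-c is stalled)
}
```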
func RecordBrokerPacerDelay ¶ added in v0.33.0
RecordBrokerPacerDelay records the effective per-domain pacing delay observed at TryAcquire time. domain is retained for API stability but not emitted as a label.
func RecordBrokerPacerPushback ¶ added in v0.33.0
RecordBrokerPacerPushback increments the pushback counter. reason values: "gate" (domain-gate NX hold), "rate_limited" (release feedback). domain is retained for call-site stability but not emitted as a label (per-domain cardinality is unbounded at launch scale).
func RecordBrokerRedisPing ¶ added in v0.33.0
RecordBrokerRedisPing emits the periodic Redis PING RTT.
func RecordBrokerRedisPool ¶ added in v0.33.0
func RecordBrokerRedisPool(ctx context.Context, snap RedisPoolSnapshot)
RecordBrokerRedisPool emits the Redis client pool gauges.
func RecordBrokerStreamStats ¶ added in v0.33.0
func RecordBrokerStreamStats(ctx context.Context, s BrokerStreamStats)
RecordBrokerStreamStats emits Tier 1 broker depth gauges aggregated across all active jobs in the probe tick. Per-job drill-down is intentionally unavailable — use traces or logs (which carry job_id) for that.
func RecordCrawlerPhase ¶ added in v0.32.6
func RecordCrawlerPhase(ctx context.Context, metrics CrawlerPhaseMetrics)
RecordCrawlerPhase emits duration and count metrics for a crawler phase.
func RecordDBPoolRejection ¶
RecordDBPoolRejection increments the pool rejection counter when requests are rejected before acquiring a connection.
func RecordDBPoolStats ¶
func RecordDBPoolStats(ctx context.Context, snapshot DBPoolSnapshot)
RecordDBPoolStats records database pool utilisation metrics.
func RecordDBPressureAdjustment ¶ added in v0.32.0
RecordDBPressureAdjustment increments the adjustment counter. direction must be "up" or "down".
func RecordDBPressureStats ¶ added in v0.32.0
RecordDBPressureStats records the pressure controller's current EMA and concurrency limit. Call this alongside RecordDBPoolStats for a complete pool+pressure snapshot in Grafana.
func RecordFDStats ¶ added in v0.31.4
RecordFDStats records file descriptor usage metrics.
func RecordHTMLPersistBodyBytes ¶ added in v0.33.9
RecordHTMLPersistBodyBytes records the compressed payload size of a single upload. Catches large-page surprises and helps right-size the in-flight memory budget.
func RecordHTMLPersistQueueDepth ¶ added in v0.33.9
RecordHTMLPersistQueueDepth emits the current persister channel depth. Called periodically by the persister itself; useful for tuning the HTML_PERSIST_QUEUE / HTML_PERSIST_WORKERS ratio.
func RecordHTMLPersistUpload ¶ added in v0.33.9
RecordHTMLPersistUpload increments the persister outcome counter. outcome values: "ok", "err", "skipped".
func RecordHTMLPersistUploadDuration ¶ added in v0.33.9
RecordHTMLPersistUploadDuration records how long a single PUT to R2 took. The caller passes the wall-clock duration for one object.
func RecordJobConcurrencySnapshot ¶
func RecordJobConcurrencySnapshot(ctx context.Context, jobID string, runningTasks int64, concurrencyLimit int64, unlimited bool)
RecordJobConcurrencySnapshot captures the running task count and concurrency limit for a job. jobID is retained in the signature for call-site stability but omitted from metric labels — per-job cardinality is unbounded at launch scale and these gauges are intended for worker-wide snapshots.
func RecordJobInfoCacheHit ¶
Note: the jobID argument is retained for API stability and future span/trace correlation, but is no longer attached as a metric label. See RecordWorkerTask.
func RecordJobInfoCacheMiss ¶
func RecordJobInfoCacheSize ¶
func RecordLighthouseRun ¶ added in v0.33.10
RecordLighthouseRun increments the consumer-side run outcome counter. outcome values: "succeeded", "failed", "skipped_quota", "shed". "shed" counts audits that the runner deferred via the soft memory-shed circuit breaker (the audit is left in 'running' and the message is redelivered later); a rising shed rate signals that the analysis fleet is memory-saturated and needs scaling up.
func RecordLighthouseRunDuration ¶ added in v0.33.10
RecordLighthouseRunDuration records the wall-clock time of one audit. outcome is included so the histogram separates the cost of successful runs from the cost of failures (failures often cluster at timeout).
func RecordLighthouseRunRetry ¶ added in v0.33.10
RecordLighthouseRunRetry increments the transient-retry counter. reason is a short tag identifying the recognised stderr substring that triggered the retry (e.g. "target_crashed", "protocol_error"); keep cardinality low — these are the named patterns from the runner's transientStderrSubstrings list, not free-form error text.
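Mapping stderr to a fixed set of tags keeps the label space small. The sketch below shows the idea; `retryReason` and the substrings in the table are illustrative assumptions, since the contents of the runner's transientStderrSubstrings list are not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// transientPattern pairs a stderr substring with the low-cardinality tag
// reported for it. The substrings below are illustrative guesses only.
type transientPattern struct {
	substring string
	reason    string
}

var transientPatterns = []transientPattern{
	{"Target crashed", "target_crashed"},
	{"Protocol error", "protocol_error"},
}

// retryReason (hypothetical) returns the tag for the first matching pattern,
// or false when stderr matches no known transient failure.
func retryReason(stderr string) (string, bool) {
	for _, p := range transientPatterns {
		if strings.Contains(stderr, p.substring) {
			return p.reason, true
		}
	}
	return "", false
}

func main() {
	reason, ok := retryReason("Error: Protocol error (Page.navigate): Session closed")
	fmt.Println(reason, ok) // protocol_error true
}
```

Only the tag, never the raw stderr text, would be passed as reason.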
func RecordLighthouseScheduled ¶ added in v0.33.10
RecordLighthouseScheduled increments the scheduler enqueue counter. band values: "fastest", "slowest", "reconcile". jobID is retained for call-site stability but not emitted as a label (per-job cardinality).
func RecordSemaphoreWait ¶ added in v0.32.0
RecordSemaphoreWait records the time spent waiting to acquire a DB queue semaphore slot.
func RecordTaskClaimAttempt ¶
func RecordTaskClaimAttempt(ctx context.Context, jobID string, latency time.Duration, status string)
RecordTaskClaimAttempt records the latency of claiming a task from the queue. jobID is retained for signature stability but not emitted as a label.
func RecordTaskWaiting ¶
RecordTaskWaiting records when tasks move into the waiting queue along with the reason.
func RecordWorkerConcurrency ¶
RecordWorkerConcurrency records the change in concurrent tasks for a worker.
- delta: +1 when starting a task, -1 when completing
- capacity: the worker's concurrency limit (only recorded once per worker on startup)
func RecordWorkerTask ¶
func RecordWorkerTask(ctx context.Context, metrics WorkerTaskMetrics)
RecordWorkerTask emits worker task metrics when instrumentation is initialised.
Note: job.id is intentionally dropped from metric labels to keep Prometheus active-series cardinality bounded. Use spans (which still carry job.id on the worker.process_task span attributes) to pivot per-job when needed.
func RecordWorkerTaskFailure ¶
RecordWorkerTaskFailure records a permanently failed task.
func RecordWorkerTaskOutcome ¶ added in v0.32.6
func RecordWorkerTaskOutcome(ctx context.Context, metrics WorkerTaskOutcomeMetrics)
RecordWorkerTaskOutcome emits task processing duration grouped by outcome.
func RecordWorkerTaskRetry ¶
RecordWorkerTaskRetry records a retry attempt for a task.
func StartWorkerTaskSpan ¶
func StartWorkerTaskSpan(ctx context.Context, info WorkerTaskSpanInfo) (context.Context, trace.Span)
StartWorkerTaskSpan starts a span for an individual worker task.
Types ¶
type BrokerStreamStats ¶ added in v0.33.0
BrokerStreamStats captures worker-wide broker depth, aggregated across all active jobs. Previously emitted per-job with a job.id label; the label was removed to keep Mimir series cardinality bounded as job counts grow. The original dashboard queries used sum(...) across the per-job series, so emitting the pre-aggregated total preserves the dashboard semantics.
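The pre-aggregation described above amounts to summing per-job values before emitting them. A minimal sketch follows; `jobDepth`, its fields, and `aggregate` are hypothetical, since the fields of BrokerStreamStats are not listed here.

```go
package main

import "fmt"

// jobDepth is a hypothetical per-job reading; the field names are assumptions.
type jobDepth struct {
	StreamLen int64
	Pending   int64
}

// aggregate sums per-job readings into one worker-wide total, mirroring what
// a dashboard's sum(...) over per-job series would have produced.
func aggregate(perJob map[string]jobDepth) jobDepth {
	var total jobDepth
	for _, d := range perJob {
		total.StreamLen += d.StreamLen
		total.Pending += d.Pending
	}
	return total
}

func main() {
	total := aggregate(map[string]jobDepth{
		"job-a": {StreamLen: 10, Pending: 2},
		"job-b": {StreamLen: 5, Pending: 1},
	})
	fmt.Println(total.StreamLen, total.Pending) // 15 3
}
```

Emitting one total instead of one series per job keeps the series count constant as the number of jobs grows.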
type Config ¶
type Config struct {
Enabled bool
ServiceName string
Environment string
OTLPEndpoint string
OTLPHeaders map[string]string
OTLPInsecure bool
MetricsAddress string
}
Config controls observability initialisation.
type CrawlerPhaseMetrics ¶ added in v0.32.6
type DBPoolSnapshot ¶
type DBPoolSnapshot struct {
InUse int
Idle int
WaitCount int64
WaitDuration time.Duration
MaxOpen int
Reserved int
Usage float64
}
DBPoolSnapshot describes a database connection pool state.
type Providers ¶
type Providers struct {
TracerProvider *sdktrace.TracerProvider
MeterProvider *sdkmetric.MeterProvider
Propagator propagation.TextMapPropagator
MetricsHandler http.Handler
Shutdown func(ctx context.Context) error
Config Config
}
Providers exposes configured telemetry providers.
type RedisPoolSnapshot ¶ added in v0.33.0
RedisPoolSnapshot mirrors the subset of *redis.PoolStats we care about.
type WorkerTaskMetrics ¶
type WorkerTaskMetrics struct {
JobID string
Status string
Duration time.Duration
QueueWait time.Duration
TotalDuration time.Duration
}
WorkerTaskMetrics describes a processed task for metric recording.