dashboard

package
v0.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package dashboard provides a Horizon / Bull-Board-style operational view over the parent queue package: live throughput, per-queue counters, and a failed-job list with retry / forget actions.

It is deliberately decoupled from any concrete Queue driver. Two seams connect it to a running system:

  • Stats — a concurrency-safe counter set the worker feeds via the OnPushed / OnProcessed / OnFailed / OnRetried hooks. The parent queue.Worker only exposes an OnFailed callback, so the remaining events are wired by the caller at dispatch / handler boundaries rather than discovered through an interface the Worker does not implement.
  • FailedJobs — a pluggable store of dead-lettered jobs the dashboard can list, retry, and forget. An in-memory reference implementation ships here (MemoryFailedJobs); a sqlqueue-backed store can be supplied by satisfying the same interface.

Wiring sketch:

st := dashboard.NewStats()
failed := dashboard.NewMemoryFailedJobs(func(ctx context.Context, j queue.Job) error {
    return q.Push(ctx, j) // re-enqueue on retry
})

worker.OnFailed(func(j queue.Job, err error) {
    st.OnFailed("default")
    failed.Record(j, "default", err)
})

mux.Handle("/queue/", http.StripPrefix("/queue",
    dashboard.New(st, failed).Handler()))

Everything is stdlib only (net/http, html/template, sync, time, encoding/json).

Example

Example wires the in-memory Inspector with a couple of dead-lettered jobs, mounts the dashboard Handler, and queries both the HTML overview and the JSON stats endpoint over httptest.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"

	"github.com/devituz/lagodev/queue"
	"github.com/devituz/lagodev/queue/dashboard"
)

// Example wires the in-memory Inspector with a couple of dead-lettered jobs,
// mounts the dashboard Handler, and queries both the HTML overview and the
// JSON stats endpoint over httptest.
func main() {
	// Counters the worker feeds, plus the dead-letter store.
	stats := dashboard.NewStats()
	failed := dashboard.NewMemoryFailedJobs(nil) // forget-only; no requeue needed here

	// Simulate two jobs on the "emails" queue that exhausted their retries.
	for _, j := range []queue.Job{
		{ID: "job-1", Name: "SendWelcomeEmail", Attempts: 3},
		{ID: "job-2", Name: "SendWelcomeEmail", Attempts: 3},
	} {
		stats.OnPushed("emails")
		stats.OnFailed("emails")
		failed.Record(j, "emails", errors.New("smtp: connection refused"))
	}

	srv := httptest.NewServer(dashboard.Handler(dashboard.NewInspector(stats, failed)))
	defer srv.Close()

	// 1) HTML overview must render and mention the queue.
	overview, err := http.Get(srv.URL + "/queues/")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(overview.Body)
	overview.Body.Close()
	fmt.Println("overview status:", overview.StatusCode)
	fmt.Println("overview html:", containsAll(string(body), "Queue Overview", "emails"))

	// 2) JSON stats snapshot — read the failed count deterministically.
	resp, err := http.Get(srv.URL + "/queues/api/stats")
	if err != nil {
		panic(err)
	}
	fmt.Println("stats content-type:", resp.Header.Get("Content-Type"))

	var sv dashboard.StatsView
	if err := json.NewDecoder(resp.Body).Decode(&sv); err != nil {
		panic(err)
	}
	resp.Body.Close()

	fmt.Println("queues:", len(sv.Queues))
	fmt.Println("queue name:", sv.Queues[0].Queue)
	fmt.Println("queue failed:", sv.Queues[0].Failed)
	fmt.Println("totals failed:", sv.Totals.Failed)

	// 3) The Inspector lists both dead-lettered jobs.
	rows, _ := dashboard.NewInspector(stats, failed).Failed(1)
	fmt.Println("failed rows:", len(rows))

}

// containsAll reports whether s contains every substring in subs.
func containsAll(s string, subs ...string) bool {
	for _, sub := range subs {
		if !strings.Contains(s, sub) {
			return false
		}
	}
	return true
}
Output:
overview status: 200
overview html: true
stats content-type: application/json; charset=utf-8
queues: 1
queue name: emails
queue failed: 2
totals failed: 2
failed rows: 2

Index

Examples

Constants

View Source
const PageSize = 50

PageSize is the number of jobs returned per page by the Inspector-backed failed / pending listings.

Variables

View Source
var ErrNotFound = errors.New("dashboard: failed job not found")

ErrNotFound is returned by FailedJobs.Retry / Forget when the given id does not match a stored failed job.

Functions

func Handler

func Handler(insp Inspector) http.Handler

Handler returns an http.Handler exposing the full dashboard over insp. Mount it under a prefix with http.StripPrefix; routes are absolute under that prefix:

GET  /queues/                       overview (stats + throughput)
GET  /queues/failed                 failed list with retry/forget/flush
POST /queues/failed/{id}/retry      re-enqueue + drop  (csrf)
POST /queues/failed/{id}/forget     drop               (csrf)
POST /queues/failed/flush           drop all           (csrf)
GET  /queues/api/stats              JSON stats snapshot

All POST routes are CSRF-guarded via the double-submit cookie issued on the rendered pages. The handler is safe for concurrent use.

Types

type Dashboard

type Dashboard struct {
	// contains filtered or unexported fields
}

Dashboard serves the operational view. Construct it with New, mount its Handler under a path prefix (typically with http.StripPrefix), and feed the Stats and FailedJobs from your worker wiring.

Routes (relative to the mount point):

GET  /            overview: per-queue stats + throughput
GET  /failed      failed-job list with retry / forget buttons
POST /failed/retry   form: id, csrf  — re-enqueues and removes a job
POST /failed/forget  form: id, csrf  — drops a job
GET  /api/stats   JSON snapshot of per-queue stats + totals
GET  /api/failed  JSON list of failed jobs

The Handler is safe for concurrent use.

func New

func New(stats *Stats, failed FailedJobs) *Dashboard

New builds a Dashboard over the given collector and failed-job store. Either may be nil — a nil Stats renders an empty overview and a nil FailedJobs renders an empty failed list (and rejects POST actions).

func (*Dashboard) Handler

func (d *Dashboard) Handler() http.Handler

Handler returns the http.Handler exposing every dashboard route. Mount it under a prefix:

mux.Handle("/queue/", http.StripPrefix("/queue", dash.Handler()))

type FailedJob

type FailedJob struct {
	ID       string    `json:"id"`        // store-local identity (defaults to Job.ID)
	Queue    string    `json:"queue"`     // logical queue label
	Job      queue.Job `json:"job"`       // original job, replayed on retry
	Err      string    `json:"error"`     // last handler error message
	FailedAt time.Time `json:"failed_at"` // when it was dead-lettered
}

FailedJob is a dead-lettered job plus the metadata the dashboard needs to display and act on it. The embedded queue.Job is the original payload that Retry re-enqueues verbatim.

type FailedJobs

type FailedJobs interface {
	// List returns every stored failed job. Ordering is implementation-
	// defined; the reference store returns newest-first.
	List(ctx context.Context) ([]FailedJob, error)
	// Retry re-enqueues the job with the given id and removes it from the
	// store. Returns ErrNotFound if no such job exists.
	Retry(ctx context.Context, id string) error
	// Forget permanently drops the job with the given id without
	// re-enqueueing. Returns ErrNotFound if no such job exists.
	Forget(ctx context.Context, id string) error
}

FailedJobs is a pluggable store of dead-lettered jobs. The dashboard reads it for the failed-jobs view and mutates it through the retry / forget actions. Implementations must be safe for concurrent use.

MemoryFailedJobs is the reference implementation; a sqlqueue-backed store can be supplied by satisfying this interface (e.g. selecting from a `failed_jobs` table and re-inserting into the jobs table on Retry).

type Inspector

type Inspector interface {
	// Stats returns the per-queue counters plus aggregate totals.
	Stats() (StatsView, error)
	// Pending lists jobs waiting on the named queue, paginated (1-based).
	Pending(queue string, page int) ([]JobView, error)
	// Failed lists dead-lettered jobs across all queues, paginated (1-based).
	Failed(page int) ([]JobView, error)
	// Retry re-enqueues the failed job with the given id and drops the record.
	Retry(id string) error
	// Forget drops the failed job with the given id without re-enqueueing.
	Forget(id string) error
	// FlushFailed drops every failed job.
	FlushFailed() error
}

Inspector is the read/act seam the Handler renders. It is satisfied by an in-memory adapter (NewInspector), a sqlqueue-backed implementation, or a test fake. All methods must be safe for concurrent use.

func NewInspector

func NewInspector(stats *Stats, failed *MemoryFailedJobs) Inspector

NewInspector builds an Inspector over an in-process Stats collector and MemoryFailedJobs store. Either may be nil: a nil Stats yields empty counters and a nil store yields an empty failed list (and rejects the mutating actions).

type JobView

type JobView struct {
	ID       string    `json:"id"`
	Queue    string    `json:"queue"`
	Name     string    `json:"name"`
	Err      string    `json:"error,omitempty"`
	Attempts int       `json:"attempts"`
	FailedAt time.Time `json:"failed_at,omitempty"`
}

JobView is a flat, render-ready row describing one job for the dashboard. It is intentionally decoupled from queue.Job so an Inspector backed by a database can project only the columns it needs. Every string field is auto-escaped by html/template at render time.

type MemoryFailedJobs

type MemoryFailedJobs struct {
	// contains filtered or unexported fields
}

MemoryFailedJobs is an in-process FailedJobs store. It is the reference implementation used by tests and single-process apps. Safe for concurrent use.

func NewMemoryFailedJobs

func NewMemoryFailedJobs(requeue RequeueFunc) *MemoryFailedJobs

NewMemoryFailedJobs returns an empty store. requeue is invoked by Retry to push the job back onto its queue; pass nil for a forget-only store.

func (*MemoryFailedJobs) Forget

func (m *MemoryFailedJobs) Forget(_ context.Context, id string) error

Forget drops the record without re-enqueueing.

func (*MemoryFailedJobs) Len

func (m *MemoryFailedJobs) Len() int

Len reports the number of stored failed jobs (primarily for tests).

func (*MemoryFailedJobs) List

List returns stored failed jobs newest-first.

func (*MemoryFailedJobs) Record

func (m *MemoryFailedJobs) Record(j queue.Job, queueName string, err error)

Record stores a dead-lettered job. Call it from queue.Worker.OnFailed. queueName is the logical queue label; errMsg is the last handler error. The store id defaults to the job's ID.

func (*MemoryFailedJobs) Retry

func (m *MemoryFailedJobs) Retry(ctx context.Context, id string) error

Retry re-enqueues the job via the configured RequeueFunc and removes it from the store. If requeue fails the record is kept so the action can be retried, and the error is returned.

func (*MemoryFailedJobs) WithStats

func (m *MemoryFailedJobs) WithStats(s *Stats) *MemoryFailedJobs

WithStats wires a Stats so successful Retry calls bump OnRetried for the job's queue. Returns the receiver for chaining.

type QueueStat

type QueueStat struct {
	Queue      string  `json:"queue"`
	Pushed     uint64  `json:"pushed"`
	Processed  uint64  `json:"processed"`
	Failed     uint64  `json:"failed"`
	Retried    uint64  `json:"retried"`
	Pending    int64   `json:"pending"`    // Pushed-Processed-Failed, floored at 0
	Throughput float64 `json:"throughput"` // processed jobs/sec over the window
}

QueueStat is an immutable snapshot of one logical queue's counters, produced by Stats.Snapshot. Throughput is jobs processed per second averaged over the rolling window.

type RequeueFunc

type RequeueFunc func(ctx context.Context, j queue.Job) error

RequeueFunc re-enqueues a job during Retry. It typically wraps a Queue.Push. A nil RequeueFunc makes Retry forget-only (it still removes the record but performs no enqueue), which is occasionally useful in read-only dashboards.

type Sampler

type Sampler struct {
	// contains filtered or unexported fields
}

Sampler periodically reads an Inspector's processed counts and derives a jobs/min rate per queue over a sliding window. It runs a single background goroutine that must be stopped with Stop to avoid a leak.

func NewSampler

func NewSampler(insp Inspector, interval, window time.Duration) *Sampler

NewSampler builds a Sampler reading insp every interval and reporting a jobs/min rate averaged over window. Call Start to launch the goroutine and Stop to shut it down. interval and window default to 5s / 1m when <= 0.

func (*Sampler) Rate

func (s *Sampler) Rate(queue string) float64

Rate returns the most recent jobs/min reading for queue, or 0 if none has been recorded.

func (*Sampler) Sample

func (s *Sampler) Sample()

Sample forces an immediate reading. It is exported so tests can drive the sampler deterministically without waiting on the ticker.

func (*Sampler) Start

func (s *Sampler) Start() *Sampler

Start launches the sampling goroutine. It returns the receiver for chaining. Start is idempotent: a second call is a no-op.

func (*Sampler) Stop

func (s *Sampler) Stop()

Stop signals the sampling goroutine to exit and blocks until it has. It is safe to call multiple times and safe to call when Start was never invoked (it then returns promptly instead of blocking on a goroutine that will never close done).

func (*Sampler) Window

func (s *Sampler) Window(queue string) []ThroughputSample

Window returns a copy of the retained samples for queue (oldest first).

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats is a concurrency-safe collector of per-queue counters plus a rolling throughput window. The zero value is not usable; call NewStats.

A "queue" here is any caller-chosen label (queue.Job carries no queue name of its own), letting a single Stats track several logical queues fed by distinct workers.

func NewStats

func NewStats() *Stats

NewStats returns an empty, ready-to-use collector.

func (*Stats) OnFailed

func (s *Stats) OnFailed(queue string)

OnFailed records one job that exhausted its retries and was dead- lettered. Wire it from queue.Worker.OnFailed.

func (*Stats) OnProcessed

func (s *Stats) OnProcessed(queue string)

OnProcessed records one successfully handled job and advances the throughput window. Call it from the worker after a handler returns nil.

func (*Stats) OnPushed

func (s *Stats) OnPushed(queue string)

OnPushed records that one job was enqueued onto queue. Call it from the dispatch path (e.g. wrapping queue.Dispatch).

func (*Stats) OnRetried

func (s *Stats) OnRetried(queue string)

OnRetried records one job that failed an attempt and was re-queued (i.e. a Nack with attempts remaining), or one re-enqueued from the failed list. It is informational and does not feed throughput.

func (*Stats) Snapshot

func (s *Stats) Snapshot() []QueueStat

Snapshot returns a stable, sorted-by-queue copy of every counter set. Safe to call concurrently with the On* hooks.

func (*Stats) Totals

func (s *Stats) Totals() QueueStat

Totals collapses every queue into a single aggregate row (Queue == ""). Throughput is the sum of per-queue throughputs.

type StatsView

type StatsView struct {
	Queues []QueueStat `json:"queues"`
	Totals QueueStat   `json:"totals"`
}

StatsView is the snapshot returned by Inspector.Stats: per-queue rows plus an aggregate total. It mirrors the JSON shape of the /queues/api/stats endpoint.

type ThroughputSample

type ThroughputSample struct {
	At    time.Time `json:"at"`
	Queue string    `json:"queue"`
	Rate  float64   `json:"rate"` // jobs processed per minute over the trailing window
}

ThroughputSample is one reading of the jobs/min rate for a queue, taken at a fixed cadence by Sampler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL