Documentation
¶
Overview ¶
Package dashboard provides a Horizon / Bull-Board-style operational view over the parent queue package: live throughput, per-queue counters, and a failed-job list with retry / forget actions.
It is deliberately decoupled from any concrete Queue driver. Two seams connect it to a running system:
- Stats — a concurrency-safe counter set the worker feeds via the OnPushed / OnProcessed / OnFailed / OnRetried hooks. The parent queue.Worker only exposes an OnFailed callback, so the remaining events are wired by the caller at dispatch / handler boundaries rather than discovered through an interface the Worker does not implement.
- FailedJobs — a pluggable store of dead-lettered jobs the dashboard can list, retry, and forget. An in-memory reference implementation ships here (MemoryFailedJobs); a sqlqueue-backed store can be supplied by satisfying the same interface.
Wiring sketch:
st := dashboard.NewStats()
failed := dashboard.NewMemoryFailedJobs(func(ctx context.Context, j queue.Job) error {
return q.Push(ctx, j) // re-enqueue on retry
})
worker.OnFailed(func(j queue.Job, err error) {
st.OnFailed("default")
failed.Record(j, "default", err)
})
mux.Handle("/queue/", http.StripPrefix("/queue",
dashboard.New(st, failed).Handler()))
Everything is stdlib only (net/http, html/template, sync, time, encoding/json).
Example ¶
Example wires the in-memory Inspector with a couple of dead-lettered jobs, mounts the dashboard Handler, and queries both the HTML overview and the JSON stats endpoint over httptest.
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"github.com/devituz/lagodev/queue"
"github.com/devituz/lagodev/queue/dashboard"
)
// Example wires the in-memory Inspector with a couple of dead-lettered jobs,
// mounts the dashboard Handler, and queries both the HTML overview and the
// JSON stats endpoint over httptest.
func main() {
// Counters the worker feeds, plus the dead-letter store.
stats := dashboard.NewStats()
failed := dashboard.NewMemoryFailedJobs(nil) // forget-only; no requeue needed here
// Simulate two jobs on the "emails" queue that exhausted their retries.
for _, j := range []queue.Job{
{ID: "job-1", Name: "SendWelcomeEmail", Attempts: 3},
{ID: "job-2", Name: "SendWelcomeEmail", Attempts: 3},
} {
stats.OnPushed("emails")
stats.OnFailed("emails")
failed.Record(j, "emails", errors.New("smtp: connection refused"))
}
srv := httptest.NewServer(dashboard.Handler(dashboard.NewInspector(stats, failed)))
defer srv.Close()
// 1) HTML overview must render and mention the queue.
overview, err := http.Get(srv.URL + "/queues/")
if err != nil {
panic(err)
}
body, _ := io.ReadAll(overview.Body)
overview.Body.Close()
fmt.Println("overview status:", overview.StatusCode)
fmt.Println("overview html:", containsAll(string(body), "Queue Overview", "emails"))
// 2) JSON stats snapshot — read the failed count deterministically.
resp, err := http.Get(srv.URL + "/queues/api/stats")
if err != nil {
panic(err)
}
fmt.Println("stats content-type:", resp.Header.Get("Content-Type"))
var sv dashboard.StatsView
if err := json.NewDecoder(resp.Body).Decode(&sv); err != nil {
panic(err)
}
resp.Body.Close()
fmt.Println("queues:", len(sv.Queues))
fmt.Println("queue name:", sv.Queues[0].Queue)
fmt.Println("queue failed:", sv.Queues[0].Failed)
fmt.Println("totals failed:", sv.Totals.Failed)
// 3) The Inspector lists both dead-lettered jobs.
rows, _ := dashboard.NewInspector(stats, failed).Failed(1)
fmt.Println("failed rows:", len(rows))
}
// containsAll reports whether s contains every substring in subs.
func containsAll(s string, subs ...string) bool {
for _, sub := range subs {
if !strings.Contains(s, sub) {
return false
}
}
return true
}
Output: overview status: 200 overview html: true stats content-type: application/json; charset=utf-8 queues: 1 queue name: emails queue failed: 2 totals failed: 2 failed rows: 2
Index ¶
- Constants
- Variables
- func Handler(insp Inspector) http.Handler
- type Dashboard
- type FailedJob
- type FailedJobs
- type Inspector
- type JobView
- type MemoryFailedJobs
- func (m *MemoryFailedJobs) Forget(_ context.Context, id string) error
- func (m *MemoryFailedJobs) Len() int
- func (m *MemoryFailedJobs) List(_ context.Context) ([]FailedJob, error)
- func (m *MemoryFailedJobs) Record(j queue.Job, queueName string, err error)
- func (m *MemoryFailedJobs) Retry(ctx context.Context, id string) error
- func (m *MemoryFailedJobs) WithStats(s *Stats) *MemoryFailedJobs
- type QueueStat
- type RequeueFunc
- type Sampler
- type Stats
- type StatsView
- type ThroughputSample
Examples ¶
Constants ¶
const PageSize = 50
PageSize is the number of jobs returned per page by the Inspector-backed failed / pending listings.
Variables ¶
var ErrNotFound = errors.New("dashboard: failed job not found")
ErrNotFound is returned by FailedJobs.Retry / Forget when the given id does not match a stored failed job.
Functions ¶
func Handler ¶
Handler returns an http.Handler exposing the full dashboard over insp. Mount it under a prefix with http.StripPrefix; routes are absolute under that prefix:
GET /queues/ overview (stats + throughput)
GET /queues/failed failed list with retry/forget/flush
POST /queues/failed/{id}/retry re-enqueue + drop (csrf)
POST /queues/failed/{id}/forget drop (csrf)
POST /queues/failed/flush drop all (csrf)
GET /queues/api/stats JSON stats snapshot
All POST routes are CSRF-guarded via the double-submit cookie issued on the rendered pages. The handler is safe for concurrent use.
Types ¶
type Dashboard ¶
type Dashboard struct {
// contains filtered or unexported fields
}
Dashboard serves the operational view. Construct it with New, mount its Handler under a path prefix (typically with http.StripPrefix), and feed the Stats and FailedJobs from your worker wiring.
Routes (relative to the mount point):
GET / overview: per-queue stats + throughput GET /failed failed-job list with retry / forget buttons POST /failed/retry form: id, csrf — re-enqueues and removes a job POST /failed/forget form: id, csrf — drops a job GET /api/stats JSON snapshot of per-queue stats + totals GET /api/failed JSON list of failed jobs
The Handler is safe for concurrent use.
func New ¶
func New(stats *Stats, failed FailedJobs) *Dashboard
New builds a Dashboard over the given collector and failed-job store. Either may be nil — a nil Stats renders an empty overview and a nil FailedJobs renders an empty failed list (and rejects POST actions).
type FailedJob ¶
type FailedJob struct {
ID string `json:"id"` // store-local identity (defaults to Job.ID)
Queue string `json:"queue"` // logical queue label
Job queue.Job `json:"job"` // original job, replayed on retry
Err string `json:"error"` // last handler error message
FailedAt time.Time `json:"failed_at"` // when it was dead-lettered
}
FailedJob is a dead-lettered job plus the metadata the dashboard needs to display and act on it. The embedded queue.Job is the original payload that Retry re-enqueues verbatim.
type FailedJobs ¶
type FailedJobs interface {
// List returns every stored failed job. Ordering is implementation-
// defined; the reference store returns newest-first.
List(ctx context.Context) ([]FailedJob, error)
// Retry re-enqueues the job with the given id and removes it from the
// store. Returns ErrNotFound if no such job exists.
Retry(ctx context.Context, id string) error
// Forget permanently drops the job with the given id without
// re-enqueueing. Returns ErrNotFound if no such job exists.
Forget(ctx context.Context, id string) error
}
FailedJobs is a pluggable store of dead-lettered jobs. The dashboard reads it for the failed-jobs view and mutates it through the retry / forget actions. Implementations must be safe for concurrent use.
MemoryFailedJobs is the reference implementation; a sqlqueue-backed store can be supplied by satisfying this interface (e.g. selecting from a `failed_jobs` table and re-inserting into the jobs table on Retry).
type Inspector ¶
type Inspector interface {
// Stats returns the per-queue counters plus aggregate totals.
Stats() (StatsView, error)
// Pending lists jobs waiting on the named queue, paginated (1-based).
Pending(queue string, page int) ([]JobView, error)
// Failed lists dead-lettered jobs across all queues, paginated (1-based).
Failed(page int) ([]JobView, error)
// Retry re-enqueues the failed job with the given id and drops the record.
Retry(id string) error
// Forget drops the failed job with the given id without re-enqueueing.
Forget(id string) error
// FlushFailed drops every failed job.
FlushFailed() error
}
Inspector is the read/act seam the Handler renders. It is satisfied by an in-memory adapter (NewInspector), a sqlqueue-backed implementation, or a test fake. All methods must be safe for concurrent use.
func NewInspector ¶
func NewInspector(stats *Stats, failed *MemoryFailedJobs) Inspector
NewInspector builds an Inspector over an in-process Stats collector and MemoryFailedJobs store. Either may be nil: a nil Stats yields empty counters and a nil store yields an empty failed list (and rejects the mutating actions).
type JobView ¶
type JobView struct {
ID string `json:"id"`
Queue string `json:"queue"`
Name string `json:"name"`
Err string `json:"error,omitempty"`
Attempts int `json:"attempts"`
FailedAt time.Time `json:"failed_at,omitempty"`
}
JobView is a flat, render-ready row describing one job for the dashboard. It is intentionally decoupled from queue.Job so an Inspector backed by a database can project only the columns it needs. Every string field is auto-escaped by html/template at render time.
type MemoryFailedJobs ¶
type MemoryFailedJobs struct {
// contains filtered or unexported fields
}
MemoryFailedJobs is an in-process FailedJobs store. It is the reference implementation used by tests and single-process apps. Safe for concurrent use.
func NewMemoryFailedJobs ¶
func NewMemoryFailedJobs(requeue RequeueFunc) *MemoryFailedJobs
NewMemoryFailedJobs returns an empty store. requeue is invoked by Retry to push the job back onto its queue; pass nil for a forget-only store.
func (*MemoryFailedJobs) Forget ¶
func (m *MemoryFailedJobs) Forget(_ context.Context, id string) error
Forget drops the record without re-enqueueing.
func (*MemoryFailedJobs) Len ¶
func (m *MemoryFailedJobs) Len() int
Len reports the number of stored failed jobs (primarily for tests).
func (*MemoryFailedJobs) List ¶
func (m *MemoryFailedJobs) List(_ context.Context) ([]FailedJob, error)
List returns stored failed jobs newest-first.
func (*MemoryFailedJobs) Record ¶
func (m *MemoryFailedJobs) Record(j queue.Job, queueName string, err error)
Record stores a dead-lettered job. Call it from queue.Worker.OnFailed. queueName is the logical queue label; errMsg is the last handler error. The store id defaults to the job's ID.
func (*MemoryFailedJobs) Retry ¶
func (m *MemoryFailedJobs) Retry(ctx context.Context, id string) error
Retry re-enqueues the job via the configured RequeueFunc and removes it from the store. If requeue fails the record is kept so the action can be retried, and the error is returned.
func (*MemoryFailedJobs) WithStats ¶
func (m *MemoryFailedJobs) WithStats(s *Stats) *MemoryFailedJobs
WithStats wires a Stats so successful Retry calls bump OnRetried for the job's queue. Returns the receiver for chaining.
type QueueStat ¶
type QueueStat struct {
Queue string `json:"queue"`
Pushed uint64 `json:"pushed"`
Processed uint64 `json:"processed"`
Failed uint64 `json:"failed"`
Retried uint64 `json:"retried"`
Pending int64 `json:"pending"` // Pushed-Processed-Failed, floored at 0
Throughput float64 `json:"throughput"` // processed jobs/sec over the window
}
QueueStat is an immutable snapshot of one logical queue's counters, produced by Stats.Snapshot. Throughput is jobs processed per second averaged over the rolling window.
type RequeueFunc ¶
RequeueFunc re-enqueues a job during Retry. It typically wraps a Queue.Push. A nil RequeueFunc makes Retry forget-only (it still removes the record but performs no enqueue), which is occasionally useful in read-only dashboards.
type Sampler ¶
type Sampler struct {
// contains filtered or unexported fields
}
Sampler periodically reads an Inspector's processed counts and derives a jobs/min rate per queue over a sliding window. It runs a single background goroutine that must be stopped with Stop to avoid a leak.
func NewSampler ¶
NewSampler builds a Sampler reading insp every interval and reporting a jobs/min rate averaged over window. Call Start to launch the goroutine and Stop to shut it down. interval and window default to 5s / 1m when <= 0.
func (*Sampler) Rate ¶
Rate returns the most recent jobs/min reading for queue, or 0 if none has been recorded.
func (*Sampler) Sample ¶
func (s *Sampler) Sample()
Sample forces an immediate reading. It is exported so tests can drive the sampler deterministically without waiting on the ticker.
func (*Sampler) Start ¶
Start launches the sampling goroutine. It returns the receiver for chaining. Start is idempotent: a second call is a no-op.
func (*Sampler) Stop ¶
func (s *Sampler) Stop()
Stop signals the sampling goroutine to exit and blocks until it has. It is safe to call multiple times and safe to call when Start was never invoked (it then returns promptly instead of blocking on a goroutine that will never close done).
func (*Sampler) Window ¶
func (s *Sampler) Window(queue string) []ThroughputSample
Window returns a copy of the retained samples for queue (oldest first).
type Stats ¶
type Stats struct {
// contains filtered or unexported fields
}
Stats is a concurrency-safe collector of per-queue counters plus a rolling throughput window. The zero value is not usable; call NewStats.
A "queue" here is any caller-chosen label (queue.Job carries no queue name of its own), letting a single Stats track several logical queues fed by distinct workers.
func (*Stats) OnFailed ¶
OnFailed records one job that exhausted its retries and was dead- lettered. Wire it from queue.Worker.OnFailed.
func (*Stats) OnProcessed ¶
OnProcessed records one successfully handled job and advances the throughput window. Call it from the worker after a handler returns nil.
func (*Stats) OnPushed ¶
OnPushed records that one job was enqueued onto queue. Call it from the dispatch path (e.g. wrapping queue.Dispatch).
func (*Stats) OnRetried ¶
OnRetried records one job that failed an attempt and was re-queued (i.e. a Nack with attempts remaining), or one re-enqueued from the failed list. It is informational and does not feed throughput.