worker

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: MPL-2.0 Imports: 46 Imported by: 0

README

go-worker

Go CodeQL Go Report Card Go Reference

go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager with a worker pool and a priority queue.

Breaking changes (January 2026)

  • Stop() removed. Use StopGraceful(ctx) or StopNow().
  • Local result streaming uses SubscribeResults(buffer); GetResults() is now a compatibility shim and the legacy local StreamResults() is removed (gRPC StreamResults remains).
  • RegisterTasks now returns an error.
  • Task.Execute replaces Fn in examples.
  • NewGRPCServer requires a handler map.
  • Rate limiting is deterministic: burst is min(maxWorkers, maxTasks) and ExecuteTask uses the shared limiter.
  • gRPC durable tasks use RegisterDurableTasks and the new DurableTask message.
  • When a durable backend is configured, use RegisterDurableTask(s) instead of RegisterTask(s).
  • DurableBackend now requires Extend (lease renewal support for custom backends).

Features

  • Task prioritization: tasks are scheduled by priority.
  • Concurrent execution: tasks run in a worker pool with strict rate limiting.
  • Middleware: wrap the TaskManager for logging/metrics, etc.
  • Results: fan-out subscriptions via SubscribeResults.
  • Cancellation: cancel tasks before or during execution.
  • Retries: exponential backoff with capped delays.
  • Durability: optional Redis-backed durable task queue (at-least-once, lease-based).

Admin UI

The admin UI is a Next.js app that connects to the worker admin gateway over HTTP/JSON with mTLS. For local setup and environment variables, see:

  • docs/admin-ui.md
  • PRD-admin-service.md (API contract + gateway design)

Local stack:

./scripts/gen-admin-certs.sh
docker compose -f compose.admin.yaml up --build

Job event history can be persisted across restarts by configuring the worker service file-backed store (WORKER_JOB_EVENT_DIR); see the admin UI docs.

Architecture

flowchart LR
    Client[Client code] -->|register tasks| TaskManager
    TaskManager --> Queue[Priority Queue]
    Queue -->|dispatch| Worker1[Worker]
    Queue -->|dispatch| WorkerN[Worker]
    Worker1 --> Results[Result Broadcaster]
    WorkerN --> Results

gRPC Service

go-worker exposes its functionality over gRPC through the WorkerService. The service allows clients to register tasks, stream results, cancel running tasks and query their status.

Handlers and Payloads

The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.

Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function. For durable tasks, use RegisterDurableTasks with the DurableTask message (the payload is still an Any).

handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)

For production, configure TLS credentials and interceptors (logging/auth) on the gRPC server; see __examples/grpc for a complete setup. For a Redis-backed durable gRPC example, see __examples/grpc_durable.

Queue selection for gRPC tasks is done via metadata (worker.MetadataQueueKey, worker.MetadataWeightKey):

  • queue: named queue (empty means default)
  • weight: integer weight (as string)

Security defaults to follow in production:

  • Use TLS (prefer mTLS) for gRPC; the durable gRPC example uses insecure credentials for local demos only.
  • Scrub payloads and auth metadata from logs; log task IDs or correlation IDs instead of PII.
  • Implement auth via WithGRPCAuth and redact/validate tokens inside interceptors.
Typed handler registry (optional)

For compile-time payload checks in handlers, use the typed registry. It removes the need for payload type assertions inside your handler functions.

registry := worker.NewTypedHandlerRegistry()
_ = worker.AddTypedHandler(registry, "create_user", worker.TypedHandlerSpec[*workerpb.CreateUserPayload]{
    Make: func() *workerpb.CreateUserPayload { return &workerpb.CreateUserPayload{} },
    Fn: func(ctx context.Context, payload *workerpb.CreateUserPayload) (any, error) {
        return &workerpb.CreateUserResponse{UserId: "1234"}, nil
    },
})

srv := worker.NewGRPCServer(tm, registry.Handlers())
Durable gRPC client (example)

Use RegisterDurableTasks for persisted tasks (payload is still Any). Results stream is shared with non-durable tasks.

payload, _ := anypb.New(&workerpb.SendEmailPayload{
    To:      "ops@example.com",
    Subject: "Hello durable gRPC",
    Body:    "Persisted task",
})

resp, err := client.RegisterDurableTasks(ctx, &workerpb.RegisterDurableTasksRequest{
    Tasks: []*workerpb.DurableTask{
        {
            Name:           "send_email",
            Payload:        payload,
            IdempotencyKey: "durable:send_email:ops@example.com",
        },
    },
})
if err != nil {
    log.Fatal(err)
}
Authorization hook

You can enforce authentication/authorization at the gRPC boundary with WithGRPCAuth. Return a gRPC status error to control the response code (e.g., Unauthenticated or PermissionDenied).

auth := func(ctx context.Context, method string, _ any) error {
 md, _ := metadata.FromIncomingContext(ctx)
 values := md.Get("authorization")
 if len(values) == 0 {
  return status.Error(codes.Unauthenticated, "missing token")
 }

 token := strings.TrimSpace(strings.TrimPrefix(values[0], "Bearer "))
 if token != "expected-token" {
  return status.Error(codes.Unauthenticated, "missing or invalid token")
 }

 return nil
}

srv := worker.NewGRPCServer(tm, handlers, worker.WithGRPCAuth(auth))

Note on deadlines: When the client uses a stream context with a deadline, exceeding the deadline will terminate the stream but does not cancel the tasks running on the server. To properly handle cancellation, use separate contexts for task execution or cancel tasks explicitly.

API Example (gRPC)

tm := worker.NewTaskManagerWithDefaults(context.Background())
handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)

gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...

client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
    Username: "newuser",
    Email:    "newuser@example.com",
})
if err != nil {
    log.Fatal(err)
}

_, _ = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
    Tasks: []*workerpb.Task{
        {
            Name:           "create_user",
            Payload:        payload,
            CorrelationId:  uuid.NewString(),
            IdempotencyKey: "create_user:newuser@example.com",
            Metadata:       map[string]string{"source": "api_example", "role": "admin"},
        },
    },
})

// cancel by id
_, _ = client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"})

// get task information
res, _ := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
fmt.Println(res.Status)

API Usage Examples

Quick Start
tm := worker.NewTaskManager(context.Background(), 2, 10, 5, 30*time.Second, time.Second, 3)

task := &worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "hello", nil },
}

if err := tm.RegisterTask(context.Background(), task); err != nil {
    log.Fatal(err)
}

results, cancel := tm.SubscribeResults(1)
res := <-results
cancel()

fmt.Println(res.Result)
Result backpressure

By default, full subscriber buffers drop new results. You can change the policy:

tm.SetResultsDropPolicy(worker.DropOldest)

GetResults() remains as a compatibility shim and returns a channel with a default buffer. Prefer SubscribeResults(buffer) so you can control buffering and explicitly unsubscribe.

Initialization

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:

  • ctx is the base context for the task manager (used for shutdown and derived task contexts)
  • maxWorkers is the number of workers to start. If <= 0, it will default to the number of available CPUs
  • maxTasks is the maximum number of queued tasks, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second. If <= 0, rate limiting is disabled (the limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling)
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 1*time.Second, 3)
Durable backend (Redis)

Durable tasks use a separate DurableTask type and a handler registry keyed by name. The default encoding is protobuf via ProtoDurableCodec. When a durable backend is enabled, RegisterTask/RegisterTasks are disabled in favor of RegisterDurableTask(s). See __examples/durable_redis for a runnable example.

client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

backend, err := worker.NewRedisDurableBackend(client)
if err != nil {
    log.Fatal(err)
}

handlers := map[string]worker.DurableHandlerSpec{
    "send_email": {
        Make: func() proto.Message { return &workerpb.SendEmailRequest{} },
        Fn: func(ctx context.Context, payload proto.Message) (any, error) {
            req := payload.(*workerpb.SendEmailRequest)
            // process request
            return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
        },
    },
}

tm := worker.NewTaskManagerWithOptions(
    context.Background(),
    worker.WithDurableBackend(backend),
    worker.WithDurableHandlers(handlers),
)

err = tm.RegisterDurableTask(context.Background(), worker.DurableTask{
    Handler: "send_email",
    Message: &workerpb.SendEmailRequest{To: "ops@example.com"},
    Retries: 5,
    Queue:   "email",
    Weight:  2,
})
if err != nil {
    log.Fatal(err)
}

Or use the typed durable registry for compile-time checks:

durableRegistry := worker.NewTypedDurableRegistry()
_ = worker.AddTypedDurableHandler(durableRegistry, "send_email", worker.TypedDurableHandlerSpec[*workerpb.SendEmailRequest]{
    Make: func() *workerpb.SendEmailRequest { return &workerpb.SendEmailRequest{} },
    Fn: func(ctx context.Context, payload *workerpb.SendEmailRequest) (any, error) {
        // process request
        return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
    },
})

tm := worker.NewTaskManagerWithOptions(
    context.Background(),
    worker.WithDurableBackend(backend),
    worker.WithDurableHandlers(durableRegistry.Handlers()),
)

Defaults: lease is 30s, poll interval is 200ms, Redis dequeue batch is 50, and lease renewal is disabled (configurable via options). Queue weights for durable tasks can be configured with WithRedisDurableQueueWeights, and the default queue via WithRedisDurableDefaultQueue.

Scheduled jobs (cron)

You can register cron-based jobs that enqueue tasks on a schedule. Both 5-field and 6-field (seconds) cron expressions are supported.

tm := worker.NewTaskManagerWithDefaults(context.Background())

err := tm.RegisterCronTask(context.Background(), "hourly-report", "0 * * * *", func(ctx context.Context) (*worker.Task, error) {
 return worker.NewTask(ctx, func(ctx context.Context, _ ...any) (any, error) {
  // do work
  return "ok", nil
 })
})
if err != nil {
 panic(err)
}

For durable backends, use:

_ = tm.RegisterDurableCronTask(context.Background(), "daily-email", "0 0 * * *", func(ctx context.Context) (worker.DurableTask, error) {
 return worker.DurableTask{
  Handler: "send_email",
  Payload: []byte("..."),
 }, nil
})

Operational notes (durable Redis):

  • Key hashing: Redis Lua scripts touch multiple keys; for clustered Redis, all keys must share the same hash slot. The backend auto-wraps the prefix in {} to enforce this (e.g., {go-worker}:ready).
  • DLQ: Failed tasks are pushed to a dead-letter list ({prefix}:dead).
  • DLQ replay: Use the workerctl durable dlq replay command or the __examples/durable_dlq_replay utility (dry-run by default; use --apply/-apply to replay).
  • Multi-node workers: Multiple workers can safely dequeue from the same backend. Lease timeouts handle worker crashes, but tune WithDurableLease for your workload.
  • Lease renewal: enable WithDurableLeaseRenewalInterval for long-running tasks to extend leases while a task executes.
  • Global coordination: optional global rate limiting (WithRedisDurableGlobalRateLimit) and leader lock (WithRedisDurableLeaderLock) can limit dequeue rate or enforce a single active leader.
  • Visibility: Ready/processing queues live in per-queue sorted sets: {prefix}:ready:<queue> and {prefix}:processing:<queue>. Known queues are tracked in {prefix}:queues.
  • Inspect utility: workerctl durable inspect (or __examples/durable_queue_inspect) prints ready/processing/dead counts; use --show-ids --queue=<name> (or -show-ids -queue=<name>) to display IDs.
CLI tooling (workerctl)

Build the CLI:

go build -o workerctl ./cmd/workerctl

Inspect queues:

./workerctl durable inspect --redis-addr localhost:6380 --redis-password supersecret --redis-prefix go-worker --queue default --show-ids --peek 10

List queues:

./workerctl durable queues --with-counts

Requeue specific tasks by ID:

./workerctl durable retry --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply

Requeue tasks from a source set (DLQ/ready/processing):

./workerctl durable retry --source dlq --limit 100 --apply
./workerctl durable retry --source ready --from-queue default --limit 50 --apply

Requeue a queue directly (shortcut):

./workerctl durable requeue --queue default --limit 50 --apply

Fetch a task by ID:

./workerctl durable get --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12

Enqueue a durable task from JSON/YAML payload:

./workerctl durable enqueue --handler send_email --queue default --payload '{"to":"ops@example.com","subject":"Hello","body":"Hi"}' --apply
./workerctl durable enqueue --handler send_email --payload-file payload.yaml --payload-format yaml --apply
./workerctl durable enqueue --handler send_email --payload-b64 "$(base64 -w0 payload.bin)" --apply

Note: the payload is stored as raw bytes. JSON/YAML are encoded to JSON bytes. Make sure the bytes match your durable codec (default is protobuf).

Delete a task (and optionally its hash):

./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply
./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --delete-hash --apply

Show stats in JSON:

./workerctl durable stats --json
./workerctl durable stats --watch 2s

Pause/resume durable dequeue:

./workerctl durable pause --apply
./workerctl durable resume --apply
./workerctl durable paused

Purge queues (use with care):

./workerctl durable purge --ready --processing --queue default --apply

Dump task metadata (JSON lines, no payloads):

./workerctl durable dump --queue default --ready --limit 100 > dump.jsonl

Export/import queue snapshots (JSONL):

./workerctl durable snapshot export --out snapshot.jsonl --ready --processing --dlq
./workerctl durable snapshot import --in snapshot.jsonl --apply

Replay DLQ items (dry-run by default):

./workerctl durable dlq replay --batch 100 --apply

Use --tls (and --tls-insecure if needed) for secure Redis connections.

Generate shell completion:

./workerctl completion zsh > "${fpath[1]}/_workerctl"
Multi-node coordination (durable Redis)

Durable processing is at-least-once. When multiple nodes consume from the same Redis backend:

  • Lease sizing: set WithDurableLease longer than your worst-case task duration (plus buffer). If a task exceeds its lease, it can be requeued and run again on another node.
  • Lease renewal (optional): set WithDurableLeaseRenewalInterval (less than the lease duration) to extend leases while tasks run.
  • Idempotency: enforce idempotency at the task level (idempotency key + handler-side dedupe) because duplicates are possible on retries and lease expiry.
  • Throughput control: worker count and polling interval are per node. If you need a global rate limit across nodes, enforce it externally or in the handler.
  • Clock skew: Redis uses server time for scores; keep node clocks in sync to avoid uneven dequeue/lease timing.
  • Isolation: use distinct prefixes per environment/region/tenant to avoid cross-talk.

Checklist:

  • Set WithDurableLease above p99 task duration (plus buffer).
  • Enable WithDurableLeaseRenewalInterval for tasks that can exceed the lease duration.
  • Keep task handlers idempotent; always use idempotency keys for external side effects.
  • Tune WithDurablePollInterval based on desired responsiveness vs. Redis load.
  • Scale WithMaxWorkers per node based on CPU and downstream throughput.

Example:

go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker
go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -queue=default -show-ids -peek=5

Sample output:

queue=default ready=3 processing=1
ready=3 processing=1 dead=0
ready IDs: 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12, 9b18d5f2-3b7f-4d7a-9dd1-1bb1a3a56c55

DLQ replay example (dry-run by default):

go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100
go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100 -apply

Optional retention can be configured to prevent unbounded task registry growth:

tm.SetRetentionPolicy(worker.RetentionPolicy{
    TTL:        24 * time.Hour,
    MaxEntries: 100000,
})

Retention applies only to terminal tasks (completed/failed/cancelled/etc). Running or queued tasks are never evicted. Cleanup is best-effort: it runs on task completion and periodically when TTL > 0. If CleanupInterval is unset, the default interval is clamp(TTL/2, 1s, 1m). If MaxEntries is lower than the number of active tasks, the registry may exceed the limit until tasks finish.

Task lifecycle hooks can be configured for structured logging or tracing:

tm.SetHooks(worker.TaskHooks{
    OnQueued: func(task *worker.Task) {
        // log enqueue
    },
    OnStart: func(task *worker.Task) {
        // log start
    },
    OnFinish: func(task *worker.Task, status worker.TaskStatus, _ any, err error) {
        // log completion
        _ = err
        _ = status
    },
})

Tracing hooks can be configured with a tracer implementation:

tm.SetTracer(myTracer)

See __examples/tracing for a minimal logger-based tracer. See __examples/otel_tracing for OpenTelemetry tracing with a stdout exporter.

OpenTelemetry metrics

To export metrics with OpenTelemetry, configure a meter provider and pass it to the task manager:

exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint())
if err != nil {
    log.Fatal(err)
}

reader := sdkmetric.NewPeriodicReader(exporter)
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
defer func() {
    _ = mp.Shutdown(context.Background())
}()

if err := tm.SetMeterProvider(mp); err != nil {
    log.Fatal(err)
}

See __examples/otel_metrics for a complete runnable example. See __examples/otel_metrics_otlp for an OTLP/HTTP exporter example.

Emitted metrics:

  • tasks_scheduled_total
  • tasks_running
  • tasks_completed_total
  • tasks_failed_total
  • tasks_cancelled_total
  • tasks_retried_total
  • results_dropped_total
  • queue_depth
  • task_latency_seconds
Registering Tasks

Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.

id := uuid.New()

task := &worker.Task{
    ID:          id,
    Name:        "Some task",
    Description: "Here goes the description of the task",
    Priority:    10,
    Queue:       "critical",
    Weight:      2,
    Ctx:         context.Background(),
    Execute: func(ctx context.Context, _ ...any) (any, error) {
        time.Sleep(time.Second)
        return fmt.Sprintf("task %s executed", id), nil
    },
    Retries:    3,
    RetryDelay: 2 * time.Second,
}

task2 := &worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Queue:    "default",
    Weight:   1,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "Hello, World!", nil },
}

if err := tm.RegisterTasks(context.Background(), task, task2); err != nil {
    log.Fatal(err)
}

Queues and weights:

  • Queue groups tasks for scheduling. Empty means default.
  • Weight is a per-task scheduling hint within a queue (higher weight runs earlier among equal priorities).
  • Queue weights control inter-queue share via WithQueueWeights; change the default queue via WithDefaultQueue.

For gRPC, set metadata["queue"] and metadata["weight"] (string) on Task/DurableTask.

Scheduling Tasks

Schedule tasks for later execution with RunAt, RegisterTaskAt, or RegisterTaskAfter.

task, _ := worker.NewTask(context.Background(), func(ctx context.Context, _ ...any) (any, error) {
    return "delayed", nil
})

_ = tm.RegisterTaskAt(context.Background(), task, time.Now().Add(30*time.Second))
// or
_ = tm.RegisterTaskAfter(context.Background(), task, 30*time.Second)

Durable tasks can also be delayed by setting RunAt before RegisterDurableTask.

Stopping the Task Manager

Use StopGraceful to stop accepting new tasks and wait for completion, or StopNow to cancel tasks immediately.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_ = tm.StopGraceful(ctx)
// or
// tm.StopNow()
Results

Subscribe to results with a dedicated channel per subscriber.

results, cancel := tm.SubscribeResults(10)

ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()

_ = tm.Wait(ctx)
cancel()

for res := range results {
    fmt.Println(res)
}
Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

_ = tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()
Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

srv := worker.RegisterMiddleware[worker.Service](tm,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, logger)
    },
)
Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 3*time.Second, 30*time.Second, 3)

    var srv worker.Service = worker.RegisterMiddleware[worker.Service](tm,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    task := &worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Ctx:      context.Background(),
        Execute: func(ctx context.Context, _ ...any) (any, error) {
            return 2 + 5, nil
        },
    }

    _ = srv.RegisterTasks(context.Background(), task)

    results, cancel := srv.SubscribeResults(10)
    defer cancel()

    ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancelWait()
    _ = srv.Wait(ctx)

    for res := range results {
        fmt.Println(res)
    }
}

Versioning

This project follows Semantic Versioning.

Contribution Guidelines

We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.

Feature Requests

To propose new ideas, open an issue using the Feature request template.

Newcomer-Friendly Issues

Issues labeled good first issue or help wanted are ideal starting points for new contributors.

Release Notes

See CHANGELOG for the history of released versions.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	// JobSourceGitTag builds a job from a git tag (default).
	JobSourceGitTag = "git_tag"
	// JobSourceTarballURL builds a job from an HTTP(S) tarball URL.
	JobSourceTarballURL = "tarball_url"
	// JobSourceTarballPath builds a job from a local tarball path.
	JobSourceTarballPath = "tarball_path"
)
View Source
const (
	// MetadataQueueKey is the metadata key for the queue name.
	MetadataQueueKey = "queue"
	// MetadataWeightKey is the metadata key for the task weight.
	MetadataWeightKey = "weight"
)
View Source
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)

TaskStatus values.

View Source
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultQueueName is the default queue name when none is provided.
	DefaultQueueName = "default"
	// DefaultQueueWeight is the default scheduling weight for a queue.
	DefaultQueueWeight = 1
	// DefaultTaskWeight is the default scheduling weight for a task.
	DefaultTaskWeight = 1
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)
View Source
const JobHandlerName = "job_runner"

JobHandlerName is the durable handler name for containerized jobs.

Variables

View Source
var (
	// ErrAdminBackendUnavailable indicates the admin backend is not configured.
	ErrAdminBackendUnavailable = ewrap.New("admin backend unavailable")
	// ErrAdminUnsupported indicates the backend does not support the admin operation.
	ErrAdminUnsupported = ewrap.New("admin operation unsupported")
	// ErrAdminQueueNotFound indicates the queue was not found.
	ErrAdminQueueNotFound = ewrap.New("admin queue not found")
	// ErrAdminQueueNameRequired indicates a queue name is required.
	ErrAdminQueueNameRequired = ewrap.New("admin queue name is required")
	// ErrAdminQueueWeightInvalid indicates a queue weight is invalid.
	ErrAdminQueueWeightInvalid = ewrap.New("admin queue weight is invalid")
	// ErrAdminQueuePauseUnsupported indicates pausing queues is not supported.
	ErrAdminQueuePauseUnsupported = ewrap.New("admin queue pause unsupported")
	// ErrAdminDLQFilterTooLarge indicates the DLQ is too large for filtered queries.
	ErrAdminDLQFilterTooLarge = ewrap.New("DLQ too large for filtered query")
	// ErrAdminDLQEntryIDRequired indicates a DLQ id is required.
	ErrAdminDLQEntryIDRequired = ewrap.New("admin DLQ id is required")
	// ErrAdminDLQEntryNotFound indicates the DLQ entry was not found.
	ErrAdminDLQEntryNotFound = ewrap.New("admin DLQ entry not found")
	// ErrAdminReplayIDsRequired indicates replay-by-id requires at least one id.
	ErrAdminReplayIDsRequired = ewrap.New("admin replay ids are required")
	// ErrAdminReplayIDsTooLarge indicates too many ids were provided.
	ErrAdminReplayIDsTooLarge = ewrap.New("admin replay ids limit exceeded")
	// ErrAdminReplayLimitExceeded indicates replay limit exceeds policy.
	ErrAdminReplayLimitExceeded = ewrap.New("admin replay limit exceeds policy")
	// ErrAdminScheduleRunRateLimited indicates schedule run cap was exceeded.
	ErrAdminScheduleRunRateLimited = ewrap.New("admin schedule run rate limit exceeded")
	// ErrAdminApprovalRequired indicates a policy approval token is required.
	ErrAdminApprovalRequired = ewrap.New("admin approval token is required")
	// ErrAdminApprovalInvalid indicates a policy approval token is invalid.
	ErrAdminApprovalInvalid = ewrap.New("admin approval token is invalid")
	// ErrAdminScheduleNameRequired indicates a schedule name is required.
	ErrAdminScheduleNameRequired = ewrap.New("admin schedule name is required")
	// ErrAdminScheduleSpecRequired indicates a schedule spec is required.
	ErrAdminScheduleSpecRequired = ewrap.New("admin schedule spec is required")
	// ErrAdminScheduleNotFound indicates the schedule was not found.
	ErrAdminScheduleNotFound = ewrap.New("admin schedule not found")
	// ErrAdminScheduleFactoryMissing indicates a factory was not registered.
	ErrAdminScheduleFactoryMissing = ewrap.New("admin schedule factory missing")
	// ErrAdminScheduleDurableMismatch indicates durable flag mismatched.
	ErrAdminScheduleDurableMismatch = ewrap.New("admin schedule durable mismatch")
	// ErrAdminJobNameRequired indicates a job name is required.
	ErrAdminJobNameRequired = ewrap.New("admin job name is required")
	// ErrAdminJobRepoRequired indicates a job repo is required.
	ErrAdminJobRepoRequired = ewrap.New("admin job repo is required")
	// ErrAdminJobTagRequired indicates a job tag is required.
	ErrAdminJobTagRequired = ewrap.New("admin job tag is required")
	// ErrAdminJobSourceInvalid indicates the job source is invalid.
	ErrAdminJobSourceInvalid = ewrap.New("admin job source is invalid")
	// ErrAdminJobTarballURLRequired indicates a tarball URL is required.
	ErrAdminJobTarballURLRequired = ewrap.New("admin job tarball url is required")
	// ErrAdminJobTarballPathRequired indicates a tarball path is required.
	ErrAdminJobTarballPathRequired = ewrap.New("admin job tarball path is required")
	// ErrAdminJobTarballSHAInvalid indicates tarball SHA256 is invalid.
	ErrAdminJobTarballSHAInvalid = ewrap.New("admin job tarball sha256 is invalid")
	// ErrAdminJobCommandRequired indicates a job command is required.
	ErrAdminJobCommandRequired = ewrap.New("admin job command is required")
	// ErrAdminJobNotFound indicates the job was not found.
	ErrAdminJobNotFound = ewrap.New("admin job not found")
)
View Source
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
	// ErrDurableLeaseNotFound is returned when a durable lease cannot be renewed.
	ErrDurableLeaseNotFound = ewrap.New("durable lease not found")
)

Errors returned by the TaskManager.

View Source
var (
	// ErrHandlerNameRequired indicates a missing handler name during registration.
	ErrHandlerNameRequired = ewrap.New("handler name is required")
	// ErrHandlerSpecInvalid indicates an invalid handler spec during registration.
	ErrHandlerSpecInvalid = ewrap.New("handler spec is invalid")
	// ErrHandlerAlreadyRegistered indicates a duplicate handler name during registration.
	ErrHandlerAlreadyRegistered = ewrap.New("handler already registered")
	// ErrHandlerPayloadTypeMismatch indicates a payload type mismatch during handler invocation.
	ErrHandlerPayloadTypeMismatch = ewrap.New("handler payload type mismatch")
	// ErrHandlerRegistryNil indicates a nil handler registry during registration.
	ErrHandlerRegistryNil = ewrap.New("handler registry is nil")
)

Functions

func AddTypedDurableHandler added in v0.1.9

func AddTypedDurableHandler[T proto.Message](r *TypedDurableRegistry, name string, spec TypedDurableHandlerSpec[T]) error

AddTypedDurableHandler registers a typed durable handler spec into the registry.

func AddTypedHandler added in v0.1.9

func AddTypedHandler[T protoreflect.ProtoMessage](r *TypedHandlerRegistry, name string, spec TypedHandlerSpec[T]) error

AddTypedHandler registers a typed handler spec into the registry.

func IsTarballSource added in v0.2.2

func IsTarballSource(source string) bool

IsTarballSource returns true when the job source expects a tarball.

func JobPayloadFromSpec added in v0.2.2

func JobPayloadFromSpec(spec AdminJobSpec) (*structpb.Struct, error)

JobPayloadFromSpec converts a job spec into a protobuf Struct payload.

func LoadAdminMTLSConfig added in v0.2.2

func LoadAdminMTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error)

LoadAdminMTLSConfig loads a server TLS config that enforces mTLS.

func NewAdminGatewayServer added in v0.2.2

func NewAdminGatewayServer(cfg AdminGatewayConfig) (*http.Server, error)

NewAdminGatewayServer builds an HTTPS gateway that proxies to AdminService.

func NormalizeJobSource added in v0.2.2

func NormalizeJobSource(source string) string

NormalizeJobSource returns a valid job source, defaulting to git tag.

func RegisterMiddleware

func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T

RegisterMiddleware registers middlewares to the provided service.

Types

type AdminActionCounters added in v0.2.2

type AdminActionCounters struct {
	Pause  int64
	Resume int64
	Replay int64
}

AdminActionCounters tracks admin action counts.

type AdminAuditEvent added in v0.2.2

type AdminAuditEvent struct {
	At          time.Time         `json:"at"`
	Actor       string            `json:"actor"`
	RequestID   string            `json:"request_id"`
	Action      string            `json:"action"`
	Target      string            `json:"target"`
	Status      string            `json:"status"`
	PayloadHash string            `json:"payload_hash"`
	Detail      string            `json:"detail"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

AdminAuditEvent describes an admin mutation audit record.

type AdminAuditEventFilter added in v0.2.2

type AdminAuditEventFilter struct {
	Action string
	Target string
	Limit  int
}

AdminAuditEventFilter filters admin audit records.

type AdminAuditEventPage added in v0.2.2

type AdminAuditEventPage struct {
	Events []AdminAuditEvent
}

AdminAuditEventPage represents admin audit records.

type AdminBackend added in v0.2.2

type AdminBackend interface {
	AdminOverview(ctx context.Context) (AdminOverview, error)
	// contains filtered or unexported methods
}

AdminBackend provides admin data and actions for a backend.

type AdminCoordination added in v0.2.2

type AdminCoordination struct {
	GlobalRateLimit string
	LeaderLock      string
	Lease           string
	Paused          bool
}

AdminCoordination describes coordination state for durable dequeue.

type AdminDLQEntry added in v0.2.2

type AdminDLQEntry struct {
	ID       string
	Queue    string
	Handler  string
	Attempts int
	AgeMs    int64
}

AdminDLQEntry represents a DLQ entry.

type AdminDLQEntryDetail added in v0.2.2

type AdminDLQEntryDetail struct {
	ID          string
	Queue       string
	Handler     string
	Attempts    int
	AgeMs       int64
	FailedAtMs  int64
	UpdatedAtMs int64
	LastError   string
	PayloadSize int64
	Metadata    map[string]string
}

AdminDLQEntryDetail represents a detailed DLQ entry.

type AdminDLQFilter added in v0.2.2

type AdminDLQFilter struct {
	Limit   int
	Offset  int
	Queue   string
	Handler string
	Query   string
}

AdminDLQFilter controls DLQ listing.

type AdminDLQPage added in v0.2.2

type AdminDLQPage struct {
	Entries []AdminDLQEntry
	Total   int64
}

AdminDLQPage represents a page of DLQ entries.

type AdminGatewayConfig added in v0.2.2

type AdminGatewayConfig struct {
	GRPCAddr string
	HTTPAddr string

	// GRPCTLS is optional; when nil, the gateway dials gRPC without TLS.
	GRPCTLS *tls.Config

	// TLSCertFile, TLSKeyFile, and TLSCAFile are required for mTLS.
	TLSCertFile    string
	TLSKeyFile     string
	TLSCAFile      string
	JobTarballDir  string
	AuditExportMax int

	ReadTimeout   time.Duration
	WriteTimeout  time.Duration
	Observability *AdminObservability
}

AdminGatewayConfig configures the admin HTTP gateway.

type AdminGuardrails added in v0.2.2

type AdminGuardrails struct {
	ReplayLimitMax    int
	ReplayIDsMax      int
	ScheduleRunMax    int
	ScheduleRunWindow time.Duration
	RequireApproval   bool
	ApprovalToken     string
}

AdminGuardrails configures safety limits for high-impact admin operations.

type AdminJob added in v0.2.2

type AdminJob struct {
	AdminJobSpec
	CreatedAt time.Time
	UpdatedAt time.Time
}

AdminJob represents a persisted job definition.

type AdminJobEvent added in v0.2.2

type AdminJobEvent struct {
	TaskID       string            `json:"task_id"`
	Name         string            `json:"name"`
	Status       string            `json:"status"`
	Queue        string            `json:"queue"`
	Repo         string            `json:"repo"`
	Tag          string            `json:"tag"`
	Path         string            `json:"path"`
	Dockerfile   string            `json:"dockerfile"`
	Command      string            `json:"command"`
	ScheduleName string            `json:"schedule_name"`
	ScheduleSpec string            `json:"schedule_spec"`
	StartedAt    time.Time         `json:"started_at"`
	FinishedAt   time.Time         `json:"finished_at"`
	DurationMs   int64             `json:"duration_ms"`
	Result       string            `json:"result"`
	Error        string            `json:"error"`
	Metadata     map[string]string `json:"metadata"`
}

AdminJobEvent describes a containerized job execution event.

type AdminJobEventFilter added in v0.2.2

type AdminJobEventFilter struct {
	Name  string
	Limit int
}

AdminJobEventFilter filters job execution events.

type AdminJobEventPage added in v0.2.2

type AdminJobEventPage struct {
	Events []AdminJobEvent
}

AdminJobEventPage represents job execution events.

type AdminJobEventStore added in v0.2.2

type AdminJobEventStore interface {
	Record(ctx context.Context, event AdminJobEvent) error
	List(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
}

AdminJobEventStore persists and lists admin job events.

type AdminJobSpec added in v0.2.2

type AdminJobSpec struct {
	Name        string
	Description string
	Repo        string
	Tag         string
	Source      string
	TarballURL  string
	TarballPath string
	TarballSHA  string
	Path        string
	Dockerfile  string
	Command     []string
	Env         []string
	Queue       string
	Retries     int
	Timeout     time.Duration
}

AdminJobSpec defines a job configuration for containerized execution.

func JobSpecFromPayload added in v0.2.2

func JobSpecFromPayload(payload *structpb.Struct) (AdminJobSpec, error)

JobSpecFromPayload decodes a protobuf Struct payload into a job spec.

type AdminObservability added in v0.2.2

type AdminObservability struct {
	// contains filtered or unexported fields
}

AdminObservability collects lightweight admin service metrics.

func NewAdminObservability added in v0.2.2

func NewAdminObservability() *AdminObservability

NewAdminObservability creates an in-memory admin metrics collector.

func (*AdminObservability) Middleware added in v0.2.2

func (collector *AdminObservability) Middleware(next http.Handler) http.Handler

Middleware records per-route HTTP latency and status metrics.

func (*AdminObservability) RecordGRPC added in v0.2.2

func (collector *AdminObservability) RecordGRPC(method string, code codes.Code, duration time.Duration)

RecordGRPC records gRPC method metrics.

func (*AdminObservability) RecordHTTP added in v0.2.2

func (collector *AdminObservability) RecordHTTP(method, route string, statusCode int, duration time.Duration)

RecordHTTP records HTTP route metrics.

func (*AdminObservability) RecordJobOutcome added in v0.2.2

func (collector *AdminObservability) RecordJobOutcome(statusCode string, duration time.Duration)

RecordJobOutcome records a final job status and duration.

func (*AdminObservability) RecordJobQueued added in v0.2.2

func (collector *AdminObservability) RecordJobQueued()

RecordJobQueued increments running gauge when a job run starts.

func (*AdminObservability) Snapshot added in v0.2.2

func (collector *AdminObservability) Snapshot() AdminObservabilitySnapshot

Snapshot returns a consistent view of collected metrics.

func (*AdminObservability) UnaryServerInterceptor added in v0.2.2

func (collector *AdminObservability) UnaryServerInterceptor() grpc.UnaryServerInterceptor

UnaryServerInterceptor records gRPC latency and status metrics.

type AdminObservabilityJobStat added in v0.2.2

type AdminObservabilityJobStat struct {
	Running     int64     `json:"running"`
	Completed   int64     `json:"completed"`
	Failed      int64     `json:"failed"`
	TotalMs     int64     `json:"totalMs"`
	MaxMs       int64     `json:"maxMs"`
	AvgMs       float64   `json:"avgMs"`
	LastCode    string    `json:"lastCode"`
	LastUpdated time.Time `json:"lastUpdated"`
}

AdminObservabilityJobStat describes aggregate job-runner outcomes.

type AdminObservabilityMethodStat added in v0.2.2

type AdminObservabilityMethodStat struct {
	Calls       int64     `json:"calls"`
	Errors      int64     `json:"errors"`
	TotalMs     int64     `json:"totalMs"`
	MaxMs       int64     `json:"maxMs"`
	AvgMs       float64   `json:"avgMs"`
	LastCode    string    `json:"lastCode"`
	LastUpdated time.Time `json:"lastUpdated"`
}

AdminObservabilityMethodStat describes aggregate metrics for an HTTP/gRPC method.

type AdminObservabilitySnapshot added in v0.2.2

type AdminObservabilitySnapshot struct {
	StartedAt time.Time                               `json:"startedAt"`
	UptimeSec int64                                   `json:"uptimeSec"`
	HTTP      map[string]AdminObservabilityMethodStat `json:"http"`
	GRPC      map[string]AdminObservabilityMethodStat `json:"grpc"`
	Jobs      AdminObservabilityJobStat               `json:"jobs"`
}

AdminObservabilitySnapshot is a serializable view of admin metrics.

type AdminOverview added in v0.2.2

type AdminOverview struct {
	ActiveWorkers int
	QueuedTasks   int64
	Queues        int
	AvgLatencyMs  int64
	P95LatencyMs  int64
	Coordination  AdminCoordination
	Actions       AdminActionCounters
}

AdminOverview describes the admin overview snapshot.

type AdminQueueSummary added in v0.2.2

type AdminQueueSummary struct {
	Name       string
	Ready      int64
	Processing int64
	Dead       int64
	Weight     int
	Paused     bool
}

AdminQueueSummary represents queue counts and weights.

type AdminSchedule added in v0.2.2

type AdminSchedule struct {
	Name    string
	Spec    string
	NextRun time.Time
	LastRun time.Time
	Durable bool
	Paused  bool
}

AdminSchedule represents a cron schedule entry.

type AdminScheduleEvent added in v0.2.2

type AdminScheduleEvent struct {
	TaskID     string
	Name       string
	Spec       string
	Durable    bool
	Status     string
	Queue      string
	StartedAt  time.Time
	FinishedAt time.Time
	DurationMs int64
	Result     string
	Error      string
	Metadata   map[string]string
}

AdminScheduleEvent describes a cron schedule execution event.

type AdminScheduleEventFilter added in v0.2.2

type AdminScheduleEventFilter struct {
	Name  string
	Limit int
}

AdminScheduleEventFilter filters schedule events.

type AdminScheduleEventPage added in v0.2.2

type AdminScheduleEventPage struct {
	Events []AdminScheduleEvent
}

AdminScheduleEventPage represents schedule events.

type AdminScheduleFactory added in v0.2.2

type AdminScheduleFactory struct {
	Name    string
	Durable bool
}

AdminScheduleFactory describes a registered schedule factory.

type AdminScheduleSpec added in v0.2.2

type AdminScheduleSpec struct {
	Name    string
	Spec    string
	Durable bool
}

AdminScheduleSpec defines a schedule request.

type CronDurableFactory added in v0.2.1

type CronDurableFactory func(ctx context.Context) (DurableTask, error)

CronDurableFactory builds a durable task when a cron schedule fires.

type CronTaskFactory added in v0.2.1

type CronTaskFactory func(ctx context.Context) (*Task, error)

CronTaskFactory builds a task when a cron schedule fires.

type DurableBackend added in v0.1.8

type DurableBackend interface {
	Enqueue(ctx context.Context, task DurableTask) error
	Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
	Ack(ctx context.Context, lease DurableTaskLease) error
	Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
	Fail(ctx context.Context, lease DurableTaskLease, err error) error
	Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
}

DurableBackend provides persistence and leasing for durable tasks.

type DurableCodec added in v0.1.8

type DurableCodec interface {
	Marshal(msg proto.Message) ([]byte, error)
	Unmarshal(data []byte, msg proto.Message) error
}

DurableCodec marshals and unmarshals durable task payloads.

type DurableHandlerSpec added in v0.1.8

type DurableHandlerSpec struct {
	Make func() proto.Message
	Fn   func(ctx context.Context, payload proto.Message) (any, error)
}

DurableHandlerSpec describes a durable task handler.

func TypedDurableHandler added in v0.1.9

func TypedDurableHandler[T proto.Message](spec TypedDurableHandlerSpec[T]) DurableHandlerSpec

TypedDurableHandler converts a typed spec into the untyped DurableHandlerSpec.

type DurableTask added in v0.1.8

type DurableTask struct {
	ID         uuid.UUID
	Handler    string
	Message    proto.Message
	Payload    []byte
	Priority   int
	RunAt      time.Time
	Queue      string
	Weight     int
	Retries    int
	RetryDelay time.Duration
	Metadata   map[string]string
}

DurableTask represents a task that can be persisted and rehydrated.

type DurableTaskLease added in v0.1.8

type DurableTaskLease struct {
	Task       DurableTask
	LeaseID    string
	Attempts   int
	MaxRetries int
}

DurableTaskLease represents a leased durable task.

type FileJobEventStore added in v0.2.2

type FileJobEventStore struct {
	// contains filtered or unexported fields
}

FileJobEventStore stores job events as JSON files on disk.

func NewFileJobEventStore added in v0.2.2

func NewFileJobEventStore(dir string, opts ...FileJobEventStoreOption) (*FileJobEventStore, error)

NewFileJobEventStore creates a file-backed job event store.

func (*FileJobEventStore) List added in v0.2.2

List returns recent job events.

func (*FileJobEventStore) Record added in v0.2.2

func (s *FileJobEventStore) Record(ctx context.Context, event AdminJobEvent) error

Record appends a job event record.

type FileJobEventStoreOption added in v0.2.2

type FileJobEventStoreOption func(*FileJobEventStore)

FileJobEventStoreOption configures FileJobEventStore behavior.

func WithJobEventStoreCacheTTL added in v0.2.2

func WithJobEventStoreCacheTTL(ttl time.Duration) FileJobEventStoreOption

WithJobEventStoreCacheTTL configures cache TTL for file-backed job events.

func WithJobEventStoreMaxEntries added in v0.2.2

func WithJobEventStoreMaxEntries(maxEntries int) FileJobEventStoreOption

WithJobEventStoreMaxEntries configures max entries retained per key (0 = unbounded).

type GRPCAuthFunc added in v0.1.6

type GRPCAuthFunc func(ctx context.Context, method string, req any) error

GRPCAuthFunc authorizes a gRPC request before handling it. Return a gRPC status error to control the response code.

type GRPCServer added in v0.1.1

type GRPCServer struct {
	// contains filtered or unexported fields
}

GRPCServer implements the generated WorkerServiceServer interface.

func NewGRPCServer added in v0.1.1

func NewGRPCServer(svc Service, handlers map[string]HandlerSpec, opts ...GRPCServerOption) *GRPCServer

NewGRPCServer creates a new gRPC server backed by the provided Service.

func (*GRPCServer) CancelTask added in v0.1.1

CancelTask cancels an active task by its ID.

func (*GRPCServer) CreateSchedule added in v0.2.2

CreateSchedule registers or updates a cron schedule.

func (*GRPCServer) DeleteJob added in v0.2.2

DeleteJob removes a job definition.

func (*GRPCServer) DeleteSchedule added in v0.2.2

DeleteSchedule removes a cron schedule.

func (*GRPCServer) GetDLQEntry added in v0.2.2

GetDLQEntry returns a detailed DLQ entry by id.

func (*GRPCServer) GetHealth added in v0.2.2

GetHealth returns build and runtime info for the admin service.

func (*GRPCServer) GetJob added in v0.2.2

GetJob returns a job definition by name.

func (*GRPCServer) GetOverview added in v0.2.2

GetOverview returns the admin overview snapshot.

func (*GRPCServer) GetQueue added in v0.2.2

GetQueue returns a queue summary by name.

func (*GRPCServer) GetTask added in v0.1.1

GetTask returns information about a task by its ID.

func (*GRPCServer) ListAuditEvents added in v0.2.2

ListAuditEvents returns recent admin mutation audit records.

func (*GRPCServer) ListDLQ added in v0.2.2

ListDLQ returns DLQ entries.

func (*GRPCServer) ListJobEvents added in v0.2.2

ListJobEvents returns recent job execution events.

func (*GRPCServer) ListJobs added in v0.2.2

ListJobs returns admin job definitions.

func (*GRPCServer) ListQueues added in v0.2.2

ListQueues returns queue summaries.

func (*GRPCServer) ListScheduleEvents added in v0.2.2

ListScheduleEvents returns recent cron schedule execution events.

func (*GRPCServer) ListScheduleFactories added in v0.2.2

ListScheduleFactories returns registered schedule factories.

func (*GRPCServer) ListSchedules added in v0.2.2

ListSchedules returns cron schedule summaries.

func (*GRPCServer) PauseDequeue added in v0.2.2

PauseDequeue pauses durable dequeue.

func (*GRPCServer) PauseQueue added in v0.2.2

PauseQueue pauses or resumes a queue.

func (*GRPCServer) PauseSchedule added in v0.2.2

PauseSchedule pauses or resumes a cron schedule.

func (*GRPCServer) PauseSchedules added in v0.2.2

PauseSchedules pauses or resumes all cron schedules.

func (*GRPCServer) RegisterDurableTasks added in v0.1.8

RegisterDurableTasks registers one or more durable tasks with the underlying service.

func (*GRPCServer) RegisterTasks added in v0.1.1

RegisterTasks registers one or more tasks with the underlying service.

func (*GRPCServer) ReplayDLQ added in v0.2.2

ReplayDLQ replays DLQ entries.

func (*GRPCServer) ReplayDLQByID added in v0.2.2

ReplayDLQByID replays DLQ entries by ID.

func (*GRPCServer) ResetQueueWeight added in v0.2.2

ResetQueueWeight resets a queue weight to default.

func (*GRPCServer) ResumeDequeue added in v0.2.2

ResumeDequeue resumes durable dequeue.

func (*GRPCServer) RunJob added in v0.2.2

RunJob enqueues a job immediately.

func (*GRPCServer) RunSchedule added in v0.2.2

RunSchedule triggers a cron schedule immediately.

func (*GRPCServer) StreamResults added in v0.1.1

StreamResults streams task results back to the client.

func (*GRPCServer) UpdateQueueWeight added in v0.2.2

UpdateQueueWeight updates a queue weight.

func (*GRPCServer) UpsertJob added in v0.2.2

UpsertJob creates or updates a job definition.

type GRPCServerOption added in v0.1.6

type GRPCServerOption func(*GRPCServer)

GRPCServerOption configures the gRPC server.

func WithAdminBackend added in v0.2.2

func WithAdminBackend(backend AdminBackend) GRPCServerOption

WithAdminBackend sets the admin backend for AdminService.

func WithAdminGuardrails added in v0.2.2

func WithAdminGuardrails(guardrails AdminGuardrails) GRPCServerOption

WithAdminGuardrails configures safety limits for admin mutations.

func WithGRPCAuth added in v0.1.6

func WithGRPCAuth(auth GRPCAuthFunc) GRPCServerOption

WithGRPCAuth installs an authorization hook for gRPC requests.

type HandlerSpec added in v0.1.1

type HandlerSpec struct {
	// Make returns a zero value of the payload message to unmarshal into.
	Make func() protoreflect.ProtoMessage
	// Fn does the work. Your Task.Execute will call this.
	Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}

HandlerSpec describes a single handler for a gRPC method.

func TypedHandler added in v0.1.9

func TypedHandler[T protoreflect.ProtoMessage](spec TypedHandlerSpec[T]) HandlerSpec

TypedHandler converts a typed spec into the untyped HandlerSpec.

type MetricsSnapshot added in v0.1.0

type MetricsSnapshot struct {
	Scheduled        int64
	Running          int64
	Completed        int64
	Failed           int64
	Cancelled        int64
	Retried          int64
	ResultsDropped   int64
	QueueDepth       int
	TaskLatencyCount int64
	TaskLatencyTotal time.Duration
	TaskLatencyMax   time.Duration
}

MetricsSnapshot represents a snapshot of task metrics.

type Middleware

type Middleware[T any] func(T) T

Middleware describes a generic middleware.

type OTelMetricsOption added in v0.1.6

type OTelMetricsOption func(*otelMetricsConfig)

OTelMetricsOption configures OpenTelemetry metrics.

func WithOTelMeterName added in v0.1.6

func WithOTelMeterName(name string) OTelMetricsOption

WithOTelMeterName overrides the default OTel meter name.

func WithOTelMeterVersion added in v0.1.6

func WithOTelMeterVersion(version string) OTelMetricsOption

WithOTelMeterVersion sets the instrumentation version reported by the meter.

type ProtoDurableCodec added in v0.1.8

type ProtoDurableCodec struct{}

ProtoDurableCodec uses protobuf for serialization.

func (ProtoDurableCodec) Marshal added in v0.1.8

func (ProtoDurableCodec) Marshal(msg proto.Message) ([]byte, error)

Marshal marshals a protobuf message to bytes.

func (ProtoDurableCodec) Unmarshal added in v0.1.8

func (ProtoDurableCodec) Unmarshal(data []byte, msg proto.Message) error

Unmarshal unmarshals bytes into a protobuf message.

type QueueConfigurableBackend added in v0.1.9

type QueueConfigurableBackend interface {
	ConfigureQueues(defaultQueue string, weights map[string]int)
}

QueueConfigurableBackend allows TaskManager to propagate queue configuration to backends.

type RedisDurableBackend added in v0.1.8

type RedisDurableBackend struct {
	// contains filtered or unexported fields
}

RedisDurableBackend implements a durable task backend using Redis.

func NewRedisDurableBackend added in v0.1.8

func NewRedisDurableBackend(client rueidis.Client, opts ...RedisDurableOption) (*RedisDurableBackend, error)

NewRedisDurableBackend creates a new RedisDurableBackend with the given Redis client and options.

func (*RedisDurableBackend) Ack added in v0.1.8

Ack acknowledges the successful processing of a durable task.

func (*RedisDurableBackend) AdminAuditEvents added in v0.2.2

AdminAuditEvents returns recent admin mutation audit events.

func (*RedisDurableBackend) AdminCreateSchedule added in v0.2.2

func (*RedisDurableBackend) AdminCreateSchedule(ctx context.Context, _ AdminScheduleSpec) (AdminSchedule, error)

AdminCreateSchedule is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminDLQ added in v0.2.2

AdminDLQ returns entries from the dead letter queue.

func (*RedisDurableBackend) AdminDLQEntry added in v0.2.2

func (b *RedisDurableBackend) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)

AdminDLQEntry returns detailed DLQ entry information.

func (*RedisDurableBackend) AdminDeleteJob added in v0.2.2

func (b *RedisDurableBackend) AdminDeleteJob(ctx context.Context, name string) (bool, error)

AdminDeleteJob removes a job definition.

func (*RedisDurableBackend) AdminDeleteSchedule added in v0.2.2

func (*RedisDurableBackend) AdminDeleteSchedule(ctx context.Context, _ string) (bool, error)

AdminDeleteSchedule is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminJob added in v0.2.2

func (b *RedisDurableBackend) AdminJob(ctx context.Context, name string) (AdminJob, error)

AdminJob returns a job definition by name.

func (*RedisDurableBackend) AdminJobEvents added in v0.2.2

AdminJobEvents returns job execution events from Redis.

func (*RedisDurableBackend) AdminJobs added in v0.2.2

func (b *RedisDurableBackend) AdminJobs(ctx context.Context) ([]AdminJob, error)

AdminJobs lists registered job definitions.

func (*RedisDurableBackend) AdminOverview added in v0.2.2

func (b *RedisDurableBackend) AdminOverview(ctx context.Context) (AdminOverview, error)

AdminOverview retrieves an overview of the durable backend status.

func (*RedisDurableBackend) AdminPause added in v0.2.2

func (b *RedisDurableBackend) AdminPause(ctx context.Context) error

AdminPause pauses dequeueing of tasks.

func (*RedisDurableBackend) AdminPauseQueue added in v0.2.2

func (b *RedisDurableBackend) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)

AdminPauseQueue pauses or resumes a specific queue.

func (*RedisDurableBackend) AdminPauseSchedule added in v0.2.2

func (*RedisDurableBackend) AdminPauseSchedule(ctx context.Context, _ string, _ bool) (AdminSchedule, error)

AdminPauseSchedule is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminPauseSchedules added in v0.2.2

func (*RedisDurableBackend) AdminPauseSchedules(ctx context.Context, _ bool) (int, error)

AdminPauseSchedules is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminQueue added in v0.2.2

func (b *RedisDurableBackend) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)

AdminQueue returns a summary for a single queue.

func (*RedisDurableBackend) AdminQueues added in v0.2.2

func (b *RedisDurableBackend) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)

AdminQueues returns summaries for all queues.

func (*RedisDurableBackend) AdminRecordAuditEvent added in v0.2.2

func (b *RedisDurableBackend) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, limit int) error

AdminRecordAuditEvent persists an admin mutation audit event.

func (*RedisDurableBackend) AdminRecordJobEvent added in v0.2.2

func (b *RedisDurableBackend) AdminRecordJobEvent(ctx context.Context, event AdminJobEvent, limit int) error

AdminRecordJobEvent persists a job execution event.

func (*RedisDurableBackend) AdminReplayDLQ added in v0.2.2

func (b *RedisDurableBackend) AdminReplayDLQ(ctx context.Context, limit int) (int, error)

AdminReplayDLQ replays tasks from the dead letter queue back to their respective ready queues.

func (*RedisDurableBackend) AdminReplayDLQByID added in v0.2.2

func (b *RedisDurableBackend) AdminReplayDLQByID(ctx context.Context, ids []string) (int, error)

AdminReplayDLQByID replays specific DLQ entries by ID.

func (*RedisDurableBackend) AdminResetQueueWeight added in v0.2.2

func (b *RedisDurableBackend) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)

AdminResetQueueWeight resets a queue weight to the default.

func (*RedisDurableBackend) AdminResume added in v0.2.2

func (b *RedisDurableBackend) AdminResume(ctx context.Context) error

AdminResume resumes dequeueing of tasks.

func (*RedisDurableBackend) AdminRunSchedule added in v0.2.2

func (*RedisDurableBackend) AdminRunSchedule(ctx context.Context, _ string) (string, error)

AdminRunSchedule is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminScheduleEvents added in v0.2.2

AdminScheduleEvents is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminScheduleFactories added in v0.2.2

func (*RedisDurableBackend) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)

AdminScheduleFactories is not supported by the Redis durable backend.

func (*RedisDurableBackend) AdminSchedules added in v0.2.2

func (*RedisDurableBackend) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)

AdminSchedules returns cron schedule data if supported.

func (*RedisDurableBackend) AdminSetQueueWeight added in v0.2.2

func (b *RedisDurableBackend) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)

AdminSetQueueWeight updates the scheduler weight for a queue.

func (*RedisDurableBackend) AdminUpsertJob added in v0.2.2

func (b *RedisDurableBackend) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)

AdminUpsertJob creates or updates a job definition.

func (*RedisDurableBackend) ConfigureQueues added in v0.1.9

func (b *RedisDurableBackend) ConfigureQueues(defaultQueue string, weights map[string]int)

ConfigureQueues applies default queue and weights for durable scheduling.

func (*RedisDurableBackend) Dequeue added in v0.1.8

func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)

Dequeue retrieves a batch of durable tasks from the Redis backend with a lease.

func (*RedisDurableBackend) Enqueue added in v0.1.8

func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error

Enqueue adds a new durable task to the Redis backend.

func (*RedisDurableBackend) Extend added in v0.1.8

func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error

Extend renews the processing lease for a durable task.

func (*RedisDurableBackend) Fail added in v0.1.8

Fail marks a durable task as failed and moves it to the dead letter queue.

func (*RedisDurableBackend) Nack added in v0.1.8

Nack negatively acknowledges a durable task, making it available for reprocessing after a delay.

type RedisDurableOption added in v0.1.8

type RedisDurableOption func(*RedisDurableBackend)

RedisDurableOption defines an option for configuring RedisDurableBackend.

func WithRedisAdminJobEventStore added in v0.2.2

func WithRedisAdminJobEventStore(store AdminJobEventStore) RedisDurableOption

WithRedisAdminJobEventStore configures a job event store for admin events.

func WithRedisDurableBatchSize added in v0.1.8

func WithRedisDurableBatchSize(size int) RedisDurableOption

WithRedisDurableBatchSize sets the batch size for dequeuing tasks.

func WithRedisDurableDefaultQueue added in v0.1.9

func WithRedisDurableDefaultQueue(name string) RedisDurableOption

WithRedisDurableDefaultQueue sets the default queue name for durable tasks.

func WithRedisDurableGlobalRateLimit added in v0.2.1

func WithRedisDurableGlobalRateLimit(rate float64, burst int) RedisDurableOption

WithRedisDurableGlobalRateLimit enables a Redis-based global rate limit for dequeue. The rate is tokens per second and burst is the max token bucket size.

func WithRedisDurableLeaderLock added in v0.2.1

func WithRedisDurableLeaderLock(lease time.Duration) RedisDurableOption

WithRedisDurableLeaderLock enables a Redis-based leader lock for dequeue coordination.

func WithRedisDurablePrefix added in v0.1.8

func WithRedisDurablePrefix(prefix string) RedisDurableOption

WithRedisDurablePrefix sets the key prefix for Redis durable backend.

func WithRedisDurableQueueWeights added in v0.1.9

func WithRedisDurableQueueWeights(weights map[string]int) RedisDurableOption

WithRedisDurableQueueWeights sets queue weights for durable task scheduling.

type Result added in v0.0.4

type Result struct {
	Task   *Task // the task that produced the result
	Result any   // the result of the task
	Error  error // the error returned by the task
}

Result is a task result.

func (*Result) String added in v0.0.4

func (r *Result) String() string

String returns a string representation of the result.

type ResultDropPolicy added in v0.1.6

type ResultDropPolicy uint8

ResultDropPolicy defines how to handle full subscriber buffers.

const (
	// DropNewest drops the new result when the subscriber buffer is full.
	DropNewest ResultDropPolicy = iota
	// DropOldest drops the oldest buffered result to make room for the new one.
	DropOldest
)

type RetentionPolicy added in v0.1.4

type RetentionPolicy struct {
	TTL             time.Duration
	MaxEntries      int
	CleanupInterval time.Duration
}

RetentionPolicy controls how completed tasks are kept in the registry.

type Service

type Service interface {
	// contains filtered or unexported methods
}

Service is an interface for a task manager.

type Task

type Task struct {
	ID          uuid.UUID          `json:"id"`          // ID is the id of the task
	Name        string             `json:"name"`        // Name is the name of the task
	Description string             `json:"description"` // Description is the description of the task
	Priority    int                `json:"priority"`    // Priority is the priority of the task
	RunAt       time.Time          `json:"run_at"`      // RunAt is the earliest time the task should execute
	Queue       string             `json:"queue"`       // Queue is the queue name for scheduling
	Weight      int                `json:"weight"`      // Weight influences scheduling share within a queue
	Execute     TaskFunc           `json:"-"`           // Execute is the function that will be executed by the task
	Ctx         context.Context    `json:"-"`           // Ctx is the context of the task
	CancelFunc  context.CancelFunc `json:"-"`           // CancelFunc is the cancel function of the task

	Retries    int           `json:"retries"`     // Retries is the maximum number of retries for failed tasks
	RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
	// contains filtered or unexported fields
}

Task represents a function that can be executed by the task manager.

Note: access Task state via methods to avoid data races.

func NewTask added in v0.0.7

func NewTask(ctx context.Context, fn TaskFunc) (*Task, error)

NewTask creates a new task with the provided function and context.

func (*Task) CancelledAt added in v0.1.4

func (task *Task) CancelledAt() time.Time

CancelledAt returns the cancellation time if set.

func (*Task) CancelledChan added in v0.0.4

func (task *Task) CancelledChan() <-chan struct{}

CancelledChan returns a channel which gets closed when the task is cancelled.

func (*Task) CompletedAt added in v0.1.4

func (task *Task) CompletedAt() time.Time

CompletedAt returns the completion time if set.

func (*Task) Error added in v0.0.2

func (task *Task) Error() error

Error returns the task error if available.

func (*Task) IsValid added in v0.0.2

func (task *Task) IsValid() (err error)

IsValid returns an error if the task is invalid.

func (*Task) Result added in v0.0.8

func (task *Task) Result() any

Result returns the task result if available.

func (*Task) ShouldSchedule added in v0.0.5

func (task *Task) ShouldSchedule() error

ShouldSchedule returns an error if the task should not be scheduled.

func (*Task) StartedAt added in v0.1.4

func (task *Task) StartedAt() time.Time

StartedAt returns the start time if set.

func (*Task) Status added in v0.0.4

func (task *Task) Status() TaskStatus

Status returns the current task status.

type TaskFunc added in v0.0.2

type TaskFunc func(ctx context.Context, args ...any) (any, error)

TaskFunc signature of `Task` function.

type TaskHooks added in v0.1.5

type TaskHooks struct {
	OnQueued func(task *Task)
	OnStart  func(task *Task)
	OnFinish func(task *Task, status TaskStatus, result any, err error)
	OnRetry  func(task *Task, delay time.Duration, attempt int)
}

TaskHooks defines optional callbacks for task lifecycle events.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks.

func NewTaskManager

func NewTaskManager(
	ctx context.Context,
	maxWorkers, maxTasks int,
	tasksPerSecond float64,
	timeout, retryDelay time.Duration,
	maxRetries int,
) *TaskManager

NewTaskManager creates a new task manager.

  • ctx is the context for the task manager
  • maxWorkers is the number of workers to start, if <=0, the number of CPUs will be used
  • maxTasks is the maximum number of tasks that can be queued at once, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting The limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling.
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)

func NewTaskManagerWithDefaults added in v0.0.5

func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager

NewTaskManagerWithDefaults creates a new task manager with default values.

  • maxWorkers: runtime.NumCPU()
  • maxTasks: 10
  • tasksPerSecond: 5
  • timeout: 5 minutes
  • retryDelay: 1 second
  • maxRetries: 3

func NewTaskManagerWithOptions added in v0.1.8

func NewTaskManagerWithOptions(ctx context.Context, opts ...TaskManagerOption) *TaskManager

NewTaskManagerWithOptions creates a new task manager using functional options.

func (*TaskManager) AdminAuditEvents added in v0.2.2

func (tm *TaskManager) AdminAuditEvents(
	ctx context.Context,
	filter AdminAuditEventFilter,
) (AdminAuditEventPage, error)

AdminAuditEvents returns recent admin mutation audit records.

func (*TaskManager) AdminCreateSchedule added in v0.2.2

func (tm *TaskManager) AdminCreateSchedule(ctx context.Context, spec AdminScheduleSpec) (AdminSchedule, error)

AdminCreateSchedule creates or updates a cron schedule by name.

func (*TaskManager) AdminDLQ added in v0.2.2

func (tm *TaskManager) AdminDLQ(ctx context.Context, filter AdminDLQFilter) (AdminDLQPage, error)

AdminDLQ lists DLQ entries.

func (*TaskManager) AdminDLQEntry added in v0.2.2

func (tm *TaskManager) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)

AdminDLQEntry returns a detailed DLQ entry by ID.

func (*TaskManager) AdminDeleteJob added in v0.2.2

func (tm *TaskManager) AdminDeleteJob(ctx context.Context, name string) (bool, error)

AdminDeleteJob removes a job definition and its cron factory.

func (*TaskManager) AdminDeleteSchedule added in v0.2.2

func (tm *TaskManager) AdminDeleteSchedule(ctx context.Context, name string) (bool, error)

AdminDeleteSchedule removes a cron schedule by name.

func (*TaskManager) AdminJob added in v0.2.2

func (tm *TaskManager) AdminJob(ctx context.Context, name string) (AdminJob, error)

AdminJob returns a job definition by name.

func (*TaskManager) AdminJobEvents added in v0.2.2

func (tm *TaskManager) AdminJobEvents(
	ctx context.Context,
	filter AdminJobEventFilter,
) (AdminJobEventPage, error)

AdminJobEvents returns recent job execution events.

func (*TaskManager) AdminJobs added in v0.2.2

func (tm *TaskManager) AdminJobs(ctx context.Context) ([]AdminJob, error)

AdminJobs lists registered job definitions.

func (*TaskManager) AdminOverview added in v0.2.2

func (tm *TaskManager) AdminOverview(ctx context.Context) (AdminOverview, error)

AdminOverview returns an admin overview snapshot.

func (*TaskManager) AdminPause added in v0.2.2

func (tm *TaskManager) AdminPause(ctx context.Context) error

AdminPause pauses durable dequeue.

func (*TaskManager) AdminPauseQueue added in v0.2.2

func (tm *TaskManager) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)

AdminPauseQueue pauses or resumes a specific queue.

func (*TaskManager) AdminPauseSchedule added in v0.2.2

func (tm *TaskManager) AdminPauseSchedule(ctx context.Context, name string, paused bool) (AdminSchedule, error)

AdminPauseSchedule pauses or resumes a cron schedule by name.

func (*TaskManager) AdminPauseSchedules added in v0.2.2

func (tm *TaskManager) AdminPauseSchedules(ctx context.Context, paused bool) (int, error)

AdminPauseSchedules pauses or resumes all cron schedules.

func (*TaskManager) AdminQueue added in v0.2.2

func (tm *TaskManager) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)

AdminQueue returns a queue summary by name.

func (*TaskManager) AdminQueues added in v0.2.2

func (tm *TaskManager) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)

AdminQueues lists queue summaries.

func (*TaskManager) AdminRecordAuditEvent added in v0.2.2

func (tm *TaskManager) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, _ int) error

AdminRecordAuditEvent persists an admin mutation audit record.

func (*TaskManager) AdminReplayDLQ added in v0.2.2

func (tm *TaskManager) AdminReplayDLQ(ctx context.Context, limit int) (int, error)

AdminReplayDLQ replays DLQ entries.

func (*TaskManager) AdminReplayDLQByID added in v0.2.2

func (tm *TaskManager) AdminReplayDLQByID(ctx context.Context, ids []string) (int, error)

AdminReplayDLQByID replays specific DLQ entries by ID.

func (*TaskManager) AdminResetQueueWeight added in v0.2.2

func (tm *TaskManager) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)

AdminResetQueueWeight resets a queue weight to default.

func (*TaskManager) AdminResume added in v0.2.2

func (tm *TaskManager) AdminResume(ctx context.Context) error

AdminResume resumes durable dequeue.

func (*TaskManager) AdminRunJob added in v0.2.2

func (tm *TaskManager) AdminRunJob(ctx context.Context, name string) (string, error)

AdminRunJob enqueues a job immediately as a durable task.

func (*TaskManager) AdminRunSchedule added in v0.2.2

func (tm *TaskManager) AdminRunSchedule(ctx context.Context, name string) (string, error)

AdminRunSchedule triggers a cron schedule immediately.

func (*TaskManager) AdminScheduleEvents added in v0.2.2

func (tm *TaskManager) AdminScheduleEvents(
	ctx context.Context,
	filter AdminScheduleEventFilter,
) (AdminScheduleEventPage, error)

AdminScheduleEvents returns recent cron schedule execution events.

func (*TaskManager) AdminScheduleFactories added in v0.2.2

func (tm *TaskManager) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)

AdminScheduleFactories lists registered schedule factories.

func (*TaskManager) AdminSchedules added in v0.2.2

func (tm *TaskManager) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)

AdminSchedules lists cron schedules.

func (*TaskManager) AdminSetQueueWeight added in v0.2.2

func (tm *TaskManager) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)

AdminSetQueueWeight updates queue weight and returns the updated summary.

func (*TaskManager) AdminUpsertJob added in v0.2.2

func (tm *TaskManager) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)

AdminUpsertJob creates or updates a job definition and registers a cron factory.

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks.

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID) error

CancelTask cancels a task by its ID.

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)

ExecuteTask executes a task given its ID and returns the result.

func (*TaskManager) GetActiveTasks added in v0.0.4

func (tm *TaskManager) GetActiveTasks() int

GetActiveTasks returns the number of running tasks.

func (*TaskManager) GetMetrics added in v0.1.0

func (tm *TaskManager) GetMetrics() MetricsSnapshot

GetMetrics returns a snapshot of current metrics.

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() <-chan Result

GetResults returns a results channel (compatibility shim for legacy API). Use SubscribeResults for explicit unsubscription and buffer control.

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)

GetTask gets a task by its ID.

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []*Task

GetTasks gets all tasks.

func (*TaskManager) IsEmpty added in v0.0.7

func (tm *TaskManager) IsEmpty() bool

IsEmpty checks if the task scheduler queue is empty.

func (*TaskManager) RegisterCronTask added in v0.2.1

func (tm *TaskManager) RegisterCronTask(
	ctx context.Context,
	name string,
	spec string,
	factory CronTaskFactory,
) error

RegisterCronTask registers a cron job that enqueues a task on each tick.

func (*TaskManager) RegisterDurableCronTask added in v0.2.1

func (tm *TaskManager) RegisterDurableCronTask(
	ctx context.Context,
	name string,
	spec string,
	factory CronDurableFactory,
) error

RegisterDurableCronTask registers a cron job that enqueues a durable task on each tick.

func (*TaskManager) RegisterDurableTask added in v0.1.8

func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error

RegisterDurableTask registers a durable task into the configured backend.

func (*TaskManager) RegisterDurableTaskAfter added in v0.2.0

func (tm *TaskManager) RegisterDurableTaskAfter(ctx context.Context, task DurableTask, delay time.Duration) error

RegisterDurableTaskAfter registers a durable task to execute after the provided delay.

func (*TaskManager) RegisterDurableTaskAt added in v0.2.0

func (tm *TaskManager) RegisterDurableTaskAt(ctx context.Context, task DurableTask, runAt time.Time) error

RegisterDurableTaskAt registers a durable task to execute at or after the provided time.

func (*TaskManager) RegisterDurableTasks added in v0.1.8

func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error

RegisterDurableTasks registers multiple durable tasks.

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error

RegisterTask registers a new task to the task manager.

func (*TaskManager) RegisterTaskAfter added in v0.2.0

func (tm *TaskManager) RegisterTaskAfter(ctx context.Context, task *Task, delay time.Duration) error

RegisterTaskAfter registers a new task to execute after the provided delay.

func (*TaskManager) RegisterTaskAt added in v0.2.0

func (tm *TaskManager) RegisterTaskAt(ctx context.Context, task *Task, runAt time.Time) error

RegisterTaskAt registers a new task to execute at or after the provided time.

func (*TaskManager) RegisterTasks added in v0.0.4

func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error

RegisterTasks registers multiple tasks to the task manager at once.

func (*TaskManager) SetHooks added in v0.1.5

func (tm *TaskManager) SetHooks(hooks TaskHooks)

SetHooks configures callbacks for task lifecycle events.

func (*TaskManager) SetMaxWorkers added in v0.1.0

func (tm *TaskManager) SetMaxWorkers(maxWorkers int)

SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.

func (*TaskManager) SetMeterProvider added in v0.1.6

func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error

SetMeterProvider enables OpenTelemetry metrics collection. Passing nil disables it.

func (*TaskManager) SetResultsDropPolicy added in v0.1.6

func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)

SetResultsDropPolicy configures how full subscriber buffers are handled.

func (*TaskManager) SetRetentionPolicy added in v0.1.4

func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)

SetRetentionPolicy configures task registry retention.

func (*TaskManager) SetTracer added in v0.1.6

func (tm *TaskManager) SetTracer(tracer TaskTracer)

SetTracer configures the task tracer.

func (*TaskManager) StartWorkers added in v0.0.4

func (tm *TaskManager) StartWorkers(ctx context.Context)

StartWorkers starts the task manager's workers and scheduler (idempotent).

func (*TaskManager) StopGraceful added in v0.1.4

func (tm *TaskManager) StopGraceful(ctx context.Context) error

StopGraceful stops accepting new tasks and waits for completion before stopping workers.

func (*TaskManager) StopNow added in v0.1.4

func (tm *TaskManager) StopNow()

StopNow cancels running tasks and stops workers immediately.

func (*TaskManager) SubscribeResults added in v0.1.4

func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())

SubscribeResults returns a results channel and an unsubscribe function.

func (*TaskManager) SyncJobFactories added in v0.2.2

func (tm *TaskManager) SyncJobFactories(ctx context.Context) error

SyncJobFactories loads persisted jobs and registers factories for them.

func (*TaskManager) UnregisterCronTask added in v0.2.1

func (tm *TaskManager) UnregisterCronTask(name string) bool

UnregisterCronTask removes a cron job by name.

func (*TaskManager) Wait added in v0.0.4

func (tm *TaskManager) Wait(ctx context.Context) error

Wait waits for all tasks to complete or context cancellation.

type TaskManagerOption added in v0.1.8

type TaskManagerOption func(*taskManagerConfig)

TaskManagerOption configures a TaskManager.

func WithAdminAuditEventLimit added in v0.2.2

func WithAdminAuditEventLimit(limit int) TaskManagerOption

WithAdminAuditEventLimit sets the maximum number of in-memory admin audit events.

func WithCronLocation added in v0.2.1

func WithCronLocation(location *time.Location) TaskManagerOption

WithCronLocation sets the time zone for cron schedules.

func WithDefaultQueue added in v0.1.9

func WithDefaultQueue(name string) TaskManagerOption

WithDefaultQueue sets the default queue name for tasks without a queue.

func WithDurableBackend added in v0.1.8

func WithDurableBackend(backend DurableBackend) TaskManagerOption

WithDurableBackend sets the durable backend for the TaskManager.

func WithDurableBatchSize added in v0.1.8

func WithDurableBatchSize(size int) TaskManagerOption

WithDurableBatchSize sets the durable task batch size.

func WithDurableCodec added in v0.1.8

func WithDurableCodec(codec DurableCodec) TaskManagerOption

WithDurableCodec sets the durable codec for the TaskManager.

func WithDurableHandlers added in v0.1.8

func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption

WithDurableHandlers sets the durable handlers for the TaskManager.

func WithDurableLease added in v0.1.8

func WithDurableLease(lease time.Duration) TaskManagerOption

WithDurableLease sets the durable task lease duration.

func WithDurableLeaseRenewalInterval added in v0.1.8

func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption

WithDurableLeaseRenewalInterval sets the interval for renewing durable task leases. Set to <= 0 to disable renewal.

func WithDurablePollInterval added in v0.1.8

func WithDurablePollInterval(interval time.Duration) TaskManagerOption

WithDurablePollInterval sets the durable task polling interval.

func WithMaxRetries added in v0.1.8

func WithMaxRetries(n int) TaskManagerOption

WithMaxRetries sets the maximum number of retries for a task.

func WithMaxTasks added in v0.1.8

func WithMaxTasks(n int) TaskManagerOption

WithMaxTasks sets the maximum number of tasks in the queue.

func WithMaxWorkers added in v0.1.8

func WithMaxWorkers(n int) TaskManagerOption

WithMaxWorkers sets the maximum number of workers.

func WithQueueWeights added in v0.1.9

func WithQueueWeights(weights map[string]int) TaskManagerOption

WithQueueWeights sets the queue weight map for named queues.

func WithRetryDelay added in v0.1.8

func WithRetryDelay(delay time.Duration) TaskManagerOption

WithRetryDelay sets the delay between task retries.

func WithTasksPerSecond added in v0.1.8

func WithTasksPerSecond(n float64) TaskManagerOption

WithTasksPerSecond sets the maximum number of tasks to start per second.

func WithTimeout added in v0.1.8

func WithTimeout(timeout time.Duration) TaskManagerOption

WithTimeout sets the task execution timeout.

type TaskSpan added in v0.1.6

type TaskSpan interface {
	End(err error)
}

TaskSpan represents an in-flight task span.

type TaskStatus added in v0.0.4

type TaskStatus uint8

TaskStatus is a value used to represent the task status.

func (TaskStatus) String added in v0.0.4

func (ts TaskStatus) String() string

String returns the string representation of the task status.

type TaskTracer added in v0.1.6

type TaskTracer interface {
	Start(ctx context.Context, task *Task) (context.Context, TaskSpan)
}

TaskTracer provides tracing spans around task execution.

type TypedDurableHandlerSpec added in v0.1.9

type TypedDurableHandlerSpec[T proto.Message] struct {
	Make func() T
	Fn   func(ctx context.Context, payload T) (any, error)
}

TypedDurableHandlerSpec provides a compile-time checked handler signature for durable tasks.

type TypedDurableRegistry added in v0.1.9

type TypedDurableRegistry struct {
	// contains filtered or unexported fields
}

TypedDurableRegistry collects typed durable handlers and exposes the untyped map.

func NewTypedDurableRegistry added in v0.1.9

func NewTypedDurableRegistry() *TypedDurableRegistry

NewTypedDurableRegistry constructs a registry for typed durable handlers.

func (*TypedDurableRegistry) Add added in v0.1.9

Add registers an untyped durable handler spec into the registry.

func (*TypedDurableRegistry) Handlers added in v0.1.9

Handlers returns a defensive copy of the registered durable handler map.

type TypedHandlerRegistry added in v0.1.9

type TypedHandlerRegistry struct {
	// contains filtered or unexported fields
}

TypedHandlerRegistry collects typed gRPC handlers and exposes the untyped map.

func NewTypedHandlerRegistry added in v0.1.9

func NewTypedHandlerRegistry() *TypedHandlerRegistry

NewTypedHandlerRegistry constructs a registry for typed gRPC handlers.

func (*TypedHandlerRegistry) Add added in v0.1.9

func (r *TypedHandlerRegistry) Add(name string, spec HandlerSpec) error

Add registers an untyped handler spec into the registry.

func (*TypedHandlerRegistry) Handlers added in v0.1.9

func (r *TypedHandlerRegistry) Handlers() map[string]HandlerSpec

Handlers returns a defensive copy of the registered handler map.

type TypedHandlerSpec added in v0.1.9

type TypedHandlerSpec[T protoreflect.ProtoMessage] struct {
	Make func() T
	Fn   func(ctx context.Context, payload T) (any, error)
}

TypedHandlerSpec provides a compile-time checked handler signature for gRPC tasks.

Directories

Path Synopsis
__examples
durable_redis command
grpc command
grpc_durable command
manual command
middleware command
multi command
otel_metrics command
otel_tracing command
tracing command
cmd
worker-admin command
worker-service command
workerctl command
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL