worker

package module
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 2, 2026 License: MPL-2.0 Imports: 26 Imported by: 0

README

go-worker

go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager with a worker pool and a priority queue.

Breaking changes (January 2026)

  • Stop() removed. Use StopGraceful(ctx) or StopNow().
  • Local result streaming uses SubscribeResults(buffer); GetResults() is now a compatibility shim and the legacy local StreamResults() is removed (gRPC StreamResults remains).
  • RegisterTasks now returns an error.
  • Task.Execute replaces Fn in examples.
  • NewGRPCServer requires a handler map.
  • Rate limiting is deterministic: burst is min(maxWorkers, maxTasks) and ExecuteTask uses the shared limiter.
  • gRPC durable tasks use RegisterDurableTasks and the new DurableTask message.
  • When a durable backend is configured, use RegisterDurableTask(s) instead of RegisterTask(s).
  • DurableBackend now requires Extend (lease renewal support for custom backends).

Features

  • Task prioritization: tasks are scheduled by priority.
  • Concurrent execution: tasks run in a worker pool with strict rate limiting.
  • Middleware: wrap the TaskManager for logging/metrics, etc.
  • Results: fan-out subscriptions via SubscribeResults.
  • Cancellation: cancel tasks before or during execution.
  • Retries: exponential backoff with capped delays.
  • Durability: optional Redis-backed durable task queue (at-least-once, lease-based).
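
The retry behaviour listed above (exponential backoff with capped delays) can be sketched as follows. This is a minimal illustration, not go-worker's implementation: the doubling factor, the zero-based attempt counter, and the names backoffDelay, baseDelay, and maxDelay are all assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative values only; they are not go-worker defaults.
const (
	baseDelay = 1 * time.Second
	maxDelay  = 30 * time.Second
)

// backoffDelay returns base * 2^attempt, capped at limit.
// attempt is zero-based, so attempt 0 waits for base.
func backoffDelay(base time.Duration, attempt int, limit time.Duration) time.Duration {
	if base <= 0 || limit <= 0 {
		return 0
	}
	if base >= limit {
		return limit
	}
	delay := base
	for i := 0; i < attempt; i++ {
		// Doubling would exceed the cap, so stop here (this also avoids overflow).
		if delay > limit/2 {
			return limit
		}
		delay *= 2
	}
	return delay
}

func main() {
	for attempt := 0; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(baseDelay, attempt, maxDelay))
	}
}
```

The cap keeps a long retry chain from producing unbounded waits, which is the point of "capped delays".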

Architecture

flowchart LR
    Client[Client code] -->|register tasks| TaskManager
    TaskManager --> Queue[Priority Queue]
    Queue -->|dispatch| Worker1[Worker]
    Queue -->|dispatch| WorkerN[Worker]
    Worker1 --> Results[Result Broadcaster]
    WorkerN --> Results
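
The dispatch path in the diagram (queue, workers, results) can be approximated with plain channels. The sketch below is a simplified stand-in that assumes a FIFO channel instead of the priority queue and a single results channel instead of the broadcaster; runPool is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts a fixed number of workers that pull jobs from a shared
// channel and publish each job's return value on a results channel.
func runPool(workers int, jobs []func() int) []int {
	if workers < 1 {
		workers = 1 // at least one worker is needed to drain the queue
	}

	jobCh := make(chan func() int)
	resCh := make(chan int, len(jobs)) // buffered so workers never block on publish

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobCh {
				resCh <- job()
			}
		}()
	}

	for _, job := range jobs {
		jobCh <- job
	}
	close(jobCh) // no more work: workers exit their range loops
	wg.Wait()
	close(resCh)

	out := make([]int, 0, len(jobs))
	for r := range resCh {
		out = append(out, r)
	}
	return out
}

func main() {
	jobs := []func() int{
		func() int { return 1 },
		func() int { return 2 },
		func() int { return 3 },
	}
	// Result order depends on scheduling, so only the count is printed.
	fmt.Println("results received:", len(runPool(2, jobs)))
}
```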

gRPC Service

go-worker exposes its functionality over gRPC through the WorkerService. The service allows clients to register tasks, stream results, cancel running tasks and query their status.

Handlers and Payloads

The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.

Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function. For durable tasks, use RegisterDurableTasks with the DurableTask message (the payload is still an Any).

handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            _ = p // use the payload fields here
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)
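
To make the Make/Fn flow concrete, here is a self-contained sketch of name-based dispatch. It is not the server's code: JSON stands in for the protobuf Any payload so the example needs only the standard library, and handlerSpec, dispatch, and run are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// handlerSpec mirrors the Make/Fn shape described above, with `any`
// standing in for protoreflect.ProtoMessage.
type handlerSpec struct {
	Make func() any
	Fn   func(ctx context.Context, payload any) (any, error)
}

type createUserPayload struct {
	Username string `json:"username"`
}

var errUnknownHandler = errors.New("unknown handler")

// dispatch looks up the handler by name, decodes the raw payload into the
// value returned by Make, and passes the decoded value to Fn.
func dispatch(ctx context.Context, handlers map[string]handlerSpec, name string, raw []byte) (any, error) {
	spec, ok := handlers[name]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errUnknownHandler, name)
	}
	payload := spec.Make()
	if err := json.Unmarshal(raw, payload); err != nil {
		return nil, fmt.Errorf("decode payload for %q: %w", name, err)
	}
	return spec.Fn(ctx, payload)
}

func demoHandlers() map[string]handlerSpec {
	return map[string]handlerSpec{
		"create_user": {
			Make: func() any { return &createUserPayload{} },
			Fn: func(_ context.Context, payload any) (any, error) {
				p, ok := payload.(*createUserPayload)
				if !ok {
					return nil, errors.New("payload type mismatch")
				}
				return "created " + p.Username, nil
			},
		},
	}
}

// run is a small helper so callers do not need to build a context.
func run(name, rawJSON string) (any, error) {
	return dispatch(context.Background(), demoHandlers(), name, []byte(rawJSON))
}

func main() {
	out, err := run("create_user", `{"username":"newuser"}`)
	fmt.Println(out, err)
}
```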

For production, configure TLS credentials and interceptors (logging/auth) on the gRPC server; see __examples/grpc for a complete setup. For a Redis-backed durable gRPC example, see __examples/grpc_durable.

Queue selection for gRPC tasks is done via metadata (worker.MetadataQueueKey, worker.MetadataWeightKey):

  • queue: named queue (empty means default)
  • weight: integer weight (as string)
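
A sketch of how the two metadata keys might be read on the receiving side. The key names and defaults match the constants documented for this package ("queue", "weight", "default", 1); falling back to the default weight on a malformed or non-positive value is an assumption, and queueFromMetadata is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

const (
	metadataQueueKey  = "queue"   // same value as worker.MetadataQueueKey
	metadataWeightKey = "weight"  // same value as worker.MetadataWeightKey
	defaultQueueName  = "default" // same value as worker.DefaultQueueName
	defaultTaskWeight = 1         // same value as worker.DefaultTaskWeight
)

// queueFromMetadata reads the queue name and weight from task metadata.
// An empty queue means the default queue. A missing, malformed, or
// non-positive weight falls back to the default (assumption of this sketch).
func queueFromMetadata(md map[string]string) (string, int) {
	queue := md[metadataQueueKey]
	if queue == "" {
		queue = defaultQueueName
	}
	weight, err := strconv.Atoi(md[metadataWeightKey])
	if err != nil || weight <= 0 {
		weight = defaultTaskWeight
	}
	return queue, weight
}

func main() {
	queue, weight := queueFromMetadata(map[string]string{"queue": "email", "weight": "2"})
	fmt.Println(queue, weight)
}
```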

Security defaults to follow in production:

  • Use TLS (prefer mTLS) for gRPC; the durable gRPC example uses insecure credentials for local demos only.
  • Scrub payloads and auth metadata from logs; log task IDs or correlation IDs instead of PII.
  • Implement auth via WithGRPCAuth and redact/validate tokens inside interceptors.

Typed handler registry (optional)

For compile-time payload checks in handlers, use the typed registry. It removes the need for payload type assertions inside your handler functions.

registry := worker.NewTypedHandlerRegistry()
_ = worker.AddTypedHandler(registry, "create_user", worker.TypedHandlerSpec[*workerpb.CreateUserPayload]{
    Make: func() *workerpb.CreateUserPayload { return &workerpb.CreateUserPayload{} },
    Fn: func(ctx context.Context, payload *workerpb.CreateUserPayload) (any, error) {
        return &workerpb.CreateUserResponse{UserId: "1234"}, nil
    },
})

srv := worker.NewGRPCServer(tm, registry.Handlers())
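
The idea behind a typed registry can be shown with a small generic adapter: the type assertion is written once, inside the adapter, instead of in every handler. This is a sketch of the technique only; typedFn and errPayloadTypeMismatch are hypothetical names, and `any` stands in for the protobuf message interface.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errPayloadTypeMismatch = errors.New("handler payload type mismatch")

// typedFn adapts a handler that takes a concrete payload type T into an
// untyped handler. The only type assertion lives here.
func typedFn[T any](fn func(ctx context.Context, payload T) (any, error)) func(context.Context, any) (any, error) {
	return func(ctx context.Context, payload any) (any, error) {
		p, ok := payload.(T)
		if !ok {
			return nil, fmt.Errorf("%w: got %T", errPayloadTypeMismatch, payload)
		}
		return fn(ctx, p)
	}
}

type createUser struct{ Username string }

// callCreateUser builds the adapted handler and invokes it with payload.
func callCreateUser(payload any) (any, error) {
	h := typedFn(func(_ context.Context, p *createUser) (any, error) {
		return "created " + p.Username, nil
	})
	return h(context.Background(), payload)
}

func main() {
	fmt.Println(callCreateUser(&createUser{Username: "newuser"}))
	fmt.Println(callCreateUser("not a payload"))
}
```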

Durable gRPC client (example)

Use RegisterDurableTasks for persisted tasks (the payload is still an Any). The results stream is shared with non-durable tasks.

payload, err := anypb.New(&workerpb.SendEmailPayload{
    To:      "ops@example.com",
    Subject: "Hello durable gRPC",
    Body:    "Persisted task",
})
if err != nil {
    log.Fatal(err)
}

resp, err := client.RegisterDurableTasks(ctx, &workerpb.RegisterDurableTasksRequest{
    Tasks: []*workerpb.DurableTask{
        {
            Name:           "send_email",
            Payload:        payload,
            IdempotencyKey: "durable:send_email:ops@example.com",
        },
    },
})
if err != nil {
    log.Fatal(err)
}
log.Printf("registered durable tasks: %v", resp)

Authorization hook

You can enforce authentication/authorization at the gRPC boundary with WithGRPCAuth. Return a gRPC status error to control the response code (e.g., Unauthenticated or PermissionDenied).

auth := func(ctx context.Context, method string, _ any) error {
    md, _ := metadata.FromIncomingContext(ctx)
    values := md.Get("authorization")
    if len(values) == 0 {
        return status.Error(codes.Unauthenticated, "missing token")
    }

    token := strings.TrimSpace(strings.TrimPrefix(values[0], "Bearer "))
    if token != "expected-token" {
        return status.Error(codes.Unauthenticated, "missing or invalid token")
    }

    return nil
}

srv := worker.NewGRPCServer(tm, handlers, worker.WithGRPCAuth(auth))
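
For real tokens, a plain != comparison can leak timing information. The following sketch shows one way to parse the header and compare in constant time with the standard library. bearerToken and tokenMatches are hypothetical helpers, and unlike the snippet above this version rejects headers that lack the "Bearer " prefix.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// bearerToken extracts the token from an "authorization" header value.
// It reports false when the "Bearer " prefix or the token itself is missing.
func bearerToken(header string) (string, bool) {
	const prefix = "Bearer "
	if !strings.HasPrefix(header, prefix) {
		return "", false
	}
	token := strings.TrimSpace(strings.TrimPrefix(header, prefix))
	return token, token != ""
}

// tokenMatches compares two tokens without short-circuiting on the first
// differing byte. ConstantTimeCompare still returns early when the lengths
// differ, so the token length is not hidden.
func tokenMatches(got, want string) bool {
	return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1
}

func main() {
	token, ok := bearerToken("Bearer expected-token")
	fmt.Println(ok && tokenMatches(token, "expected-token"))
}
```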

Note on deadlines: When the client uses a stream context with a deadline, exceeding the deadline will terminate the stream but does not cancel the tasks running on the server. To properly handle cancellation, use separate contexts for task execution or cancel tasks explicitly.

API Example (gRPC)

tm := worker.NewTaskManagerWithDefaults(context.Background())
handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            _ = p // use the payload fields here
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)

gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...

client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
    Username: "newuser",
    Email:    "newuser@example.com",
})
if err != nil {
    log.Fatal(err)
}

_, _ = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
    Tasks: []*workerpb.Task{
        {
            Name:           "create_user",
            Payload:        payload,
            CorrelationId:  uuid.NewString(),
            IdempotencyKey: "create_user:newuser@example.com",
            Metadata:       map[string]string{"source": "api_example", "role": "admin"},
        },
    },
})

// cancel by id
_, _ = client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"})

// get task information
res, err := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
if err != nil {
    log.Fatal(err)
}
fmt.Println(res.Status)

API Usage Examples

Quick Start

tm := worker.NewTaskManager(context.Background(), 2, 10, 5, 30*time.Second, time.Second, 3)

task := &worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "hello", nil },
}

// Subscribe before registering so the subscriber is in place when the result is published.
results, cancel := tm.SubscribeResults(1)

if err := tm.RegisterTask(context.Background(), task); err != nil {
    log.Fatal(err)
}

res := <-results
cancel()

fmt.Println(res.Result)

Result backpressure

By default, full subscriber buffers drop new results. You can change the policy:

tm.SetResultsDropPolicy(worker.DropOldest)

GetResults() remains as a compatibility shim and returns a channel with a default buffer. Prefer SubscribeResults(buffer) so you can control buffering and explicitly unsubscribe.
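
The difference between the two policies can be illustrated with a buffered channel. This is a sketch of the general technique, not the library's broadcaster: dropNewest is a hypothetical name for the default behaviour described above, and the code assumes a single publishing goroutine per channel.

```go
package main

import "fmt"

type dropPolicy int

const (
	dropNewest dropPolicy = iota // discard the incoming result when the buffer is full
	dropOldest                   // evict the oldest buffered result to make room
)

// publish tries to deliver v without blocking and reports whether v was buffered.
func publish(ch chan int, v int, policy dropPolicy) bool {
	select {
	case ch <- v:
		return true
	default:
	}
	if policy == dropNewest {
		return false
	}
	// Buffer is full: drop the oldest entry, then retry once.
	select {
	case <-ch:
	default:
	}
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// drain empties the channel without blocking and returns what was buffered.
func drain(ch chan int) []int {
	var out []int
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	for _, policy := range []dropPolicy{dropNewest, dropOldest} {
		ch := make(chan int, 2)
		for _, v := range []int{1, 2, 3} {
			publish(ch, v, policy)
		}
		fmt.Println(drain(ch))
	}
}
```

With a buffer of two and three published values, the first policy keeps the two earliest values and the second keeps the two latest.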

Initialization

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:

  • ctx is the base context for the task manager (used for shutdown and derived task contexts)
  • maxWorkers is the number of workers to start. If <= 0, it will default to the number of available CPUs
  • maxTasks is the maximum number of queued tasks, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second. When rate limiting is enabled, the limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling. If <= 0, rate limiting is disabled
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)

tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 1*time.Second, 3)
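
Two of the rules above are plain arithmetic and can be checked in isolation. The helpers below are hypothetical and only restate the documented behaviour: a non-positive maxWorkers falls back to the CPU count, and the limiter burst is min(maxWorkers, maxTasks).

```go
package main

import (
	"fmt"
	"runtime"
)

// effectiveWorkers applies the documented default: a non-positive
// maxWorkers falls back to the number of available CPUs.
func effectiveWorkers(maxWorkers int) int {
	if maxWorkers <= 0 {
		return runtime.NumCPU()
	}
	return maxWorkers
}

// limiterBurst returns min(maxWorkers, maxTasks), the documented burst size.
func limiterBurst(maxWorkers, maxTasks int) int {
	if maxWorkers < maxTasks {
		return maxWorkers
	}
	return maxTasks
}

func main() {
	workers := effectiveWorkers(4)
	fmt.Println("workers:", workers, "burst:", limiterBurst(workers, 10))
}
```

For the call shown above (4 workers, 10 queued tasks), the burst would be 4.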

Durable backend (Redis)

Durable tasks use a separate DurableTask type and a handler registry keyed by name. The default encoding is protobuf via ProtoDurableCodec. When a durable backend is enabled, RegisterTask/RegisterTasks are disabled in favor of RegisterDurableTask(s). See __examples/durable_redis for a runnable example.

client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

backend, err := worker.NewRedisDurableBackend(client)
if err != nil {
    log.Fatal(err)
}

handlers := map[string]worker.DurableHandlerSpec{
    "send_email": {
        Make: func() proto.Message { return &workerpb.SendEmailRequest{} },
        Fn: func(ctx context.Context, payload proto.Message) (any, error) {
            req := payload.(*workerpb.SendEmailRequest)
            _ = req // process the request here
            return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
        },
    },
}

tm := worker.NewTaskManagerWithOptions(
    context.Background(),
    worker.WithDurableBackend(backend),
    worker.WithDurableHandlers(handlers),
)

err = tm.RegisterDurableTask(context.Background(), worker.DurableTask{
    Handler: "send_email",
    Message: &workerpb.SendEmailRequest{To: "ops@example.com"},
    Retries: 5,
    Queue:   "email",
    Weight:  2,
})
if err != nil {
    log.Fatal(err)
}

Or use the typed durable registry for compile-time checks:

durableRegistry := worker.NewTypedDurableRegistry()
_ = worker.AddTypedDurableHandler(durableRegistry, "send_email", worker.TypedDurableHandlerSpec[*workerpb.SendEmailRequest]{
    Make: func() *workerpb.SendEmailRequest { return &workerpb.SendEmailRequest{} },
    Fn: func(ctx context.Context, payload *workerpb.SendEmailRequest) (any, error) {
        // process request
        return &workerpb.SendEmailResponse{MessageId: "msg-1"}, nil
    },
})

tm := worker.NewTaskManagerWithOptions(
    context.Background(),
    worker.WithDurableBackend(backend),
    worker.WithDurableHandlers(durableRegistry.Handlers()),
)

Defaults: lease is 30s, poll interval is 200ms, Redis dequeue batch is 50, and lease renewal is disabled (configurable via options). Queue weights for durable tasks can be configured with WithRedisDurableQueueWeights, and the default queue via WithRedisDurableDefaultQueue.

Operational notes (durable Redis):

  • Key hashing: Redis Lua scripts touch multiple keys; for clustered Redis, all keys must share the same hash slot. The backend auto-wraps the prefix in {} to enforce this (e.g., {go-worker}:ready).
  • DLQ: Failed tasks are pushed to a dead-letter list ({prefix}:dead).
  • DLQ replay: Use the workerctl durable dlq replay command or the __examples/durable_dlq_replay utility (dry-run by default; use --apply/-apply to replay).
  • Multi-node workers: Multiple workers can safely dequeue from the same backend. Lease timeouts handle worker crashes, but tune WithDurableLease for your workload.
  • Lease renewal: enable WithDurableLeaseRenewalInterval for long-running tasks to extend leases while a task executes.
  • Visibility: Ready/processing queues live in per-queue sorted sets: {prefix}:ready:<queue> and {prefix}:processing:<queue>. Known queues are tracked in {prefix}:queues.
  • Inspect utility: workerctl durable inspect (or __examples/durable_queue_inspect) prints ready/processing/dead counts; use --show-ids --queue=<name> (or -show-ids -queue=<name>) to display IDs.
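
The key layout described in the notes above can be reproduced with a few string helpers, which is handy when inspecting Redis by hand. This is a sketch based only on the key names listed here; the helper names are hypothetical, and leaving an already wrapped prefix unchanged is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// hashTag wraps prefix in {} so that every derived key maps to the same
// Redis Cluster hash slot. An already wrapped prefix is returned as-is
// (assumption of this sketch).
func hashTag(prefix string) string {
	if strings.HasPrefix(prefix, "{") && strings.HasSuffix(prefix, "}") {
		return prefix
	}
	return "{" + prefix + "}"
}

// readyKey is the per-queue sorted set of tasks waiting to run.
func readyKey(prefix, queue string) string { return hashTag(prefix) + ":ready:" + queue }

// processingKey is the per-queue sorted set of leased tasks.
func processingKey(prefix, queue string) string { return hashTag(prefix) + ":processing:" + queue }

// deadKey is the dead-letter list.
func deadKey(prefix string) string { return hashTag(prefix) + ":dead" }

// queuesKey tracks the known queue names.
func queuesKey(prefix string) string { return hashTag(prefix) + ":queues" }

func main() {
	fmt.Println(readyKey("go-worker", "default"))
	fmt.Println(processingKey("go-worker", "default"))
	fmt.Println(deadKey("go-worker"))
	fmt.Println(queuesKey("go-worker"))
}
```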

CLI tooling (workerctl)

Build the CLI:

go build -o workerctl ./cmd/workerctl

Inspect queues:

./workerctl durable inspect --redis-addr localhost:6380 --redis-password supersecret --redis-prefix go-worker --queue default --show-ids --peek 10

List queues:

./workerctl durable queues --with-counts

Requeue specific tasks by ID:

./workerctl durable retry --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply

Requeue tasks from a source set (DLQ/ready/processing):

./workerctl durable retry --source dlq --limit 100 --apply
./workerctl durable retry --source ready --from-queue default --limit 50 --apply

Requeue a queue directly (shortcut):

./workerctl durable requeue --queue default --limit 50 --apply

Fetch a task by ID:

./workerctl durable get --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12

Delete a task (and optionally its hash):

./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --apply
./workerctl durable delete --id 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12 --delete-hash --apply

Show stats as JSON, or watch them at an interval:

./workerctl durable stats --json
./workerctl durable stats --watch 2s

Pause/resume durable dequeue:

./workerctl durable pause --apply
./workerctl durable resume --apply

Purge queues (use with care):

./workerctl durable purge --ready --processing --queue default --apply

Dump task metadata (JSON lines, no payloads):

./workerctl durable dump --queue default --ready --limit 100 > dump.jsonl

Export/import queue snapshots (JSONL):

./workerctl durable snapshot export --out snapshot.jsonl --ready --processing --dlq
./workerctl durable snapshot import --in snapshot.jsonl --apply

Replay DLQ items (dry-run by default):

./workerctl durable dlq replay --batch 100 --apply

Use --tls (and --tls-insecure if needed) for secure Redis connections.

Generate shell completion:

./workerctl completion zsh > "${fpath[1]}/_workerctl"

Multi-node coordination (durable Redis)

Durable processing is at-least-once. When multiple nodes consume from the same Redis backend:

  • Lease sizing: set WithDurableLease longer than your worst-case task duration (plus buffer). If a task exceeds its lease, it can be requeued and run again on another node.
  • Lease renewal (optional): set WithDurableLeaseRenewalInterval (less than the lease duration) to extend leases while tasks run.
  • Idempotency: enforce idempotency at the task level (idempotency key + handler-side dedupe) because duplicates are possible on retries and lease expiry.
  • Throughput control: worker count and polling interval are per node. If you need a global rate limit across nodes, enforce it externally or in the handler.
  • Clock skew: Redis uses server time for scores; keep node clocks in sync to avoid uneven dequeue/lease timing.
  • Isolation: use distinct prefixes per environment/region/tenant to avoid cross-talk.

Checklist:

  • Set WithDurableLease above p99 task duration (plus buffer).
  • Enable WithDurableLeaseRenewalInterval for tasks that can exceed the lease duration.
  • Keep task handlers idempotent; always use idempotency keys for external side effects.
  • Tune WithDurablePollInterval based on desired responsiveness vs. Redis load.
  • Scale WithMaxWorkers per node based on CPU and downstream throughput.

Inspect utility example:

go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker
go run __examples/durable_queue_inspect/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -queue=default -show-ids -peek=5

Sample output:

queue=default ready=3 processing=1
ready=3 processing=1 dead=0
ready IDs: 8c0f8b2d-0a4d-4a3b-9ad7-2d2a5b7f5d12, 9b18d5f2-3b7f-4d7a-9dd1-1bb1a3a56c55

DLQ replay example (dry-run by default):

go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100
go run __examples/durable_dlq_replay/main.go -redis-addr=localhost:6380 -redis-password=supersecret -redis-prefix=go-worker -batch=100 -apply

Optional retention can be configured to prevent unbounded task registry growth:

tm.SetRetentionPolicy(worker.RetentionPolicy{
    TTL:        24 * time.Hour,
    MaxEntries: 100000,
})

Retention applies only to terminal tasks (completed/failed/cancelled/etc). Running or queued tasks are never evicted. Cleanup is best-effort: it runs on task completion and periodically when TTL > 0. If CleanupInterval is unset, the default interval is clamp(TTL/2, 1s, 1m). If MaxEntries is lower than the number of active tasks, the registry may exceed the limit until tasks finish.
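
The default interval formula, clamp(TTL/2, 1s, 1m), works out as follows. The helper names are hypothetical; only the formula comes from the text above.

```go
package main

import (
	"fmt"
	"time"
)

// seconds converts a whole number of seconds to a time.Duration.
func seconds(n int) time.Duration {
	return time.Duration(n) * time.Second
}

// defaultCleanupInterval computes clamp(ttl/2, 1s, 1m).
func defaultCleanupInterval(ttl time.Duration) time.Duration {
	interval := ttl / 2
	if interval < time.Second {
		return time.Second
	}
	if interval > time.Minute {
		return time.Minute
	}
	return interval
}

func main() {
	for _, ttl := range []time.Duration{seconds(1), seconds(10), 24 * time.Hour} {
		fmt.Printf("TTL %v -> cleanup every %v\n", ttl, defaultCleanupInterval(ttl))
	}
}
```

With the 24-hour TTL from the example above, the interval is clamped to one minute.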

Task lifecycle hooks can be configured for structured logging or tracing:

tm.SetHooks(worker.TaskHooks{
    OnQueued: func(task *worker.Task) {
        // log enqueue
    },
    OnStart: func(task *worker.Task) {
        // log start
    },
    OnFinish: func(task *worker.Task, status worker.TaskStatus, _ any, err error) {
        // log completion
        _ = err
        _ = status
    },
})

Tracing hooks can be configured with a tracer implementation:

tm.SetTracer(myTracer)

See __examples/tracing for a minimal logger-based tracer. See __examples/otel_tracing for OpenTelemetry tracing with a stdout exporter.

OpenTelemetry metrics

To export metrics with OpenTelemetry, configure a meter provider and pass it to the task manager:

exporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint())
if err != nil {
    log.Fatal(err)
}

reader := sdkmetric.NewPeriodicReader(exporter)
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
defer func() {
    _ = mp.Shutdown(context.Background())
}()

if err := tm.SetMeterProvider(mp); err != nil {
    log.Fatal(err)
}

See __examples/otel_metrics for a complete runnable example. See __examples/otel_metrics_otlp for an OTLP/HTTP exporter example.

Emitted metrics:

  • tasks_scheduled_total
  • tasks_running
  • tasks_completed_total
  • tasks_failed_total
  • tasks_cancelled_total
  • tasks_retried_total
  • results_dropped_total
  • queue_depth
  • task_latency_seconds

Registering Tasks

Register tasks by calling the TaskManager's RegisterTasks() method with one or more tasks. It returns an error if registration fails.

id := uuid.New()

task := &worker.Task{
    ID:          id,
    Name:        "Some task",
    Description: "Here goes the description of the task",
    Priority:    10,
    Queue:       "critical",
    Weight:      2,
    Ctx:         context.Background(),
    Execute: func(ctx context.Context, _ ...any) (any, error) {
        time.Sleep(time.Second)
        return fmt.Sprintf("task %s executed", id), nil
    },
    Retries:    3,
    RetryDelay: 2 * time.Second,
}

task2 := &worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Queue:    "default",
    Weight:   1,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "Hello, World!", nil },
}

if err := tm.RegisterTasks(context.Background(), task, task2); err != nil {
    log.Fatal(err)
}

Queues and weights:

  • Queue groups tasks for scheduling. Empty means default.
  • Weight is a per-task scheduling hint within a queue (higher weight runs earlier among equal priorities).
  • Queue weights control inter-queue share via WithQueueWeights; change the default queue via WithDefaultQueue.
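
The ordering within a single queue can be sketched with container/heap. Only the weight rule (higher weight first among equal priorities) comes from the text above. The direction of Priority is an assumption here: this sketch treats a lower number as more urgent. item, taskHeap, and drainOrder are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	name     string
	priority int
	weight   int
}

// taskHeap orders items by priority, then by weight.
// Assumption: a lower priority number is dequeued first.
type taskHeap []item

func (h taskHeap) Len() int { return len(h) }

func (h taskHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority < h[j].priority
	}
	// Equal priorities: the higher weight runs earlier.
	return h[i].weight > h[j].weight
}

func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *taskHeap) Push(x any) { *h = append(*h, x.(item)) }

func (h *taskHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// drainOrder pushes all items and returns their names in dequeue order.
func drainOrder(items []item) []string {
	h := &taskHeap{}
	for _, it := range items {
		heap.Push(h, it)
	}
	out := make([]string, 0, len(items))
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(item).name)
	}
	return out
}

func main() {
	fmt.Println(drainOrder([]item{
		{name: "report", priority: 10, weight: 1},
		{name: "invoice", priority: 10, weight: 2},
		{name: "alert", priority: 1, weight: 1},
	}))
}
```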

For gRPC, set metadata["queue"] and metadata["weight"] (string) on Task/DurableTask.

Scheduling Tasks

Schedule tasks for later execution with RunAt, RegisterTaskAt, or RegisterTaskAfter.

task, _ := worker.NewTask(context.Background(), func(ctx context.Context, _ ...any) (any, error) {
    return "delayed", nil
})

_ = tm.RegisterTaskAt(context.Background(), task, time.Now().Add(30*time.Second))
// or
_ = tm.RegisterTaskAfter(context.Background(), task, 30*time.Second)

Durable tasks can also be delayed by setting RunAt before RegisterDurableTask.

Stopping the Task Manager

Use StopGraceful to stop accepting new tasks and wait for completion, or StopNow to cancel tasks immediately.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_ = tm.StopGraceful(ctx)
// or
// tm.StopNow()

Results

Subscribe to results with a dedicated channel per subscriber.

results, cancel := tm.SubscribeResults(10)

ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()

_ = tm.Wait(ctx)
cancel()

for res := range results {
    fmt.Println(res)
}

Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

_ = tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()

Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

srv := worker.RegisterMiddleware[worker.Service](tm,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, logger)
    },
)
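
The generic `func(T) T` middleware shape can be tried out without the library. The sketch below is not RegisterMiddleware itself: applyMiddleware is a hypothetical stand-in, and the wrapping order (the last middleware ends up outermost) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// middleware matches the generic shape used by worker.Middleware: func(T) T.
type middleware[T any] func(T) T

// applyMiddleware wraps svc with each middleware in order, so the last
// middleware in the list ends up outermost (assumption of this sketch).
func applyMiddleware[T any](svc T, mws ...middleware[T]) T {
	for _, mw := range mws {
		svc = mw(svc)
	}
	return svc
}

type greeter interface {
	Greet(name string) string
}

type plain struct{}

func (plain) Greet(name string) string { return "hello " + name }

// upper upper-cases whatever the wrapped greeter returns.
type upper struct{ next greeter }

func (u upper) Greet(name string) string { return strings.ToUpper(u.next.Greet(name)) }

// exclaim appends "!" to whatever the wrapped greeter returns.
type exclaim struct{ next greeter }

func (e exclaim) Greet(name string) string { return e.next.Greet(name) + "!" }

// wrapped builds plain -> upper -> exclaim.
func wrapped() greeter {
	return applyMiddleware[greeter](plain{},
		func(next greeter) greeter { return upper{next: next} },
		func(next greeter) greeter { return exclaim{next: next} },
	)
}

func main() {
	fmt.Println(wrapped().Greet("bob"))
}
```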

Example

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 3*time.Second, 30*time.Second, 3)

    var srv worker.Service = worker.RegisterMiddleware[worker.Service](tm,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    task := &worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Ctx:      context.Background(),
        Execute: func(ctx context.Context, _ ...any) (any, error) {
            return 2 + 5, nil
        },
    }

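    // Subscribe before registering so the subscriber is in place when the result is published.
    results, cancel := srv.SubscribeResults(10)

    _ = srv.RegisterTasks(context.Background(), task)

    ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancelWait()
    _ = srv.Wait(ctx)

    // Unsubscribe before ranging (as in the Results section) so the loop can end.
    cancel()

    for res := range results {
        fmt.Println(res)
    }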
}

Versioning

This project follows Semantic Versioning.

Contribution Guidelines

We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.

Feature Requests

To propose new ideas, open an issue using the Feature request template.

Newcomer-Friendly Issues

Issues labeled good first issue or help wanted are ideal starting points for new contributors.

Release Notes

See CHANGELOG for the history of released versions.

License

This project is licensed under the MPL-2.0 license - see the LICENSE file for details.

Documentation

Constants

const (
	// MetadataQueueKey is the metadata key for the queue name.
	MetadataQueueKey = "queue"
	// MetadataWeightKey is the metadata key for the task weight.
	MetadataWeightKey = "weight"
)

const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)

TaskStatus values.

const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultQueueName is the default queue name when none is provided.
	DefaultQueueName = "default"
	// DefaultQueueWeight is the default scheduling weight for a queue.
	DefaultQueueWeight = 1
	// DefaultTaskWeight is the default scheduling weight for a task.
	DefaultTaskWeight = 1
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)

Variables

var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
	// ErrDurableLeaseNotFound is returned when a durable lease cannot be renewed.
	ErrDurableLeaseNotFound = ewrap.New("durable lease not found")
)

Errors returned by the TaskManager.

var (
	// ErrHandlerNameRequired indicates a missing handler name during registration.
	ErrHandlerNameRequired = ewrap.New("handler name is required")
	// ErrHandlerSpecInvalid indicates an invalid handler spec during registration.
	ErrHandlerSpecInvalid = ewrap.New("handler spec is invalid")
	// ErrHandlerAlreadyRegistered indicates a duplicate handler name during registration.
	ErrHandlerAlreadyRegistered = ewrap.New("handler already registered")
	// ErrHandlerPayloadTypeMismatch indicates a payload type mismatch during handler invocation.
	ErrHandlerPayloadTypeMismatch = ewrap.New("handler payload type mismatch")
	// ErrHandlerRegistryNil indicates a nil handler registry during registration.
	ErrHandlerRegistryNil = ewrap.New("handler registry is nil")
)

Functions

func AddTypedDurableHandler added in v0.1.9

func AddTypedDurableHandler[T proto.Message](r *TypedDurableRegistry, name string, spec TypedDurableHandlerSpec[T]) error

AddTypedDurableHandler registers a typed durable handler spec into the registry.

func AddTypedHandler added in v0.1.9

func AddTypedHandler[T protoreflect.ProtoMessage](r *TypedHandlerRegistry, name string, spec TypedHandlerSpec[T]) error

AddTypedHandler registers a typed handler spec into the registry.

func RegisterMiddleware

func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T

RegisterMiddleware registers middlewares to the provided service.

Types

type DurableBackend added in v0.1.8

type DurableBackend interface {
	Enqueue(ctx context.Context, task DurableTask) error
	Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
	Ack(ctx context.Context, lease DurableTaskLease) error
	Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
	Fail(ctx context.Context, lease DurableTaskLease, err error) error
	Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
}

DurableBackend provides persistence and leasing for durable tasks.

type DurableCodec added in v0.1.8

type DurableCodec interface {
	Marshal(msg proto.Message) ([]byte, error)
	Unmarshal(data []byte, msg proto.Message) error
}

DurableCodec marshals and unmarshals durable task payloads.

type DurableHandlerSpec added in v0.1.8

type DurableHandlerSpec struct {
	Make func() proto.Message
	Fn   func(ctx context.Context, payload proto.Message) (any, error)
}

DurableHandlerSpec describes a durable task handler.

func TypedDurableHandler added in v0.1.9

func TypedDurableHandler[T proto.Message](spec TypedDurableHandlerSpec[T]) DurableHandlerSpec

TypedDurableHandler converts a typed spec into the untyped DurableHandlerSpec.

type DurableTask added in v0.1.8

type DurableTask struct {
	ID         uuid.UUID
	Handler    string
	Message    proto.Message
	Payload    []byte
	Priority   int
	RunAt      time.Time
	Queue      string
	Weight     int
	Retries    int
	RetryDelay time.Duration
	Metadata   map[string]string
}

DurableTask represents a task that can be persisted and rehydrated.

type DurableTaskLease added in v0.1.8

type DurableTaskLease struct {
	Task       DurableTask
	LeaseID    string
	Attempts   int
	MaxRetries int
}

DurableTaskLease represents a leased durable task.

type GRPCAuthFunc added in v0.1.6

type GRPCAuthFunc func(ctx context.Context, method string, req any) error

GRPCAuthFunc authorizes a gRPC request before handling it. Return a gRPC status error to control the response code.

type GRPCServer added in v0.1.1

type GRPCServer struct {
	// contains filtered or unexported fields
}

GRPCServer implements the generated WorkerServiceServer interface.

func NewGRPCServer added in v0.1.1

func NewGRPCServer(svc Service, handlers map[string]HandlerSpec, opts ...GRPCServerOption) *GRPCServer

NewGRPCServer creates a new gRPC server backed by the provided Service.

func (*GRPCServer) CancelTask added in v0.1.1

CancelTask cancels an active task by its ID.

func (*GRPCServer) GetTask added in v0.1.1

GetTask returns information about a task by its ID.

func (*GRPCServer) RegisterDurableTasks added in v0.1.8

RegisterDurableTasks registers one or more durable tasks with the underlying service.

func (*GRPCServer) RegisterTasks added in v0.1.1

RegisterTasks registers one or more tasks with the underlying service.

func (*GRPCServer) StreamResults added in v0.1.1

StreamResults streams task results back to the client.

type GRPCServerOption added in v0.1.6

type GRPCServerOption func(*GRPCServer)

GRPCServerOption configures the gRPC server.

func WithGRPCAuth added in v0.1.6

func WithGRPCAuth(auth GRPCAuthFunc) GRPCServerOption

WithGRPCAuth installs an authorization hook for gRPC requests.

type HandlerSpec added in v0.1.1

type HandlerSpec struct {
	// Make returns a zero value of the payload message to unmarshal into.
	Make func() protoreflect.ProtoMessage
	// Fn does the work. Your Task.Execute will call this.
	Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}

HandlerSpec describes a single handler for a gRPC method.

func TypedHandler added in v0.1.9

func TypedHandler[T protoreflect.ProtoMessage](spec TypedHandlerSpec[T]) HandlerSpec

TypedHandler converts a typed spec into the untyped HandlerSpec.

type MetricsSnapshot added in v0.1.0

type MetricsSnapshot struct {
	Scheduled        int64
	Running          int64
	Completed        int64
	Failed           int64
	Cancelled        int64
	Retried          int64
	ResultsDropped   int64
	QueueDepth       int
	TaskLatencyCount int64
	TaskLatencyTotal time.Duration
	TaskLatencyMax   time.Duration
}

MetricsSnapshot represents a snapshot of task metrics.

type Middleware

type Middleware[T any] func(T) T

Middleware describes a generic middleware.
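Because Middleware is just a function from T to T, several of them can be composed around a base value. The sketch below redeclares the type locally and uses a hypothetical `Greeter` interface in place of the package's Service; `chain`, `shout`, and `politely` are illustrative helpers, not part of the package.

```go
package main

import "fmt"

// Middleware matches the generic type documented above.
type Middleware[T any] func(T) T

// Greeter is a hypothetical service interface used only for this sketch.
type Greeter interface {
	Greet(name string) string
}

type plainGreeter struct{}

func (plainGreeter) Greet(name string) string { return "hello " + name }

// suffixGreeter decorates another Greeter by appending a suffix to its output.
type suffixGreeter struct {
	next   Greeter
	suffix string
}

func (s suffixGreeter) Greet(name string) string { return s.next.Greet(name) + s.suffix }

// shout and politely are two example middlewares.
func shout(next Greeter) Greeter    { return suffixGreeter{next: next, suffix: "!"} }
func politely(next Greeter) Greeter { return suffixGreeter{next: next, suffix: ", please"} }

// chain applies the middlewares so that the first one listed ends up outermost.
func chain[T any](base T, mws ...Middleware[T]) T {
	for i := len(mws) - 1; i >= 0; i-- {
		base = mws[i](base)
	}
	return base
}

func main() {
	g := chain[Greeter](plainGreeter{}, shout, politely)
	// politely wraps the base first, then shout wraps the result.
	fmt.Println(g.Greet("gopher"))
}
```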

type OTelMetricsOption added in v0.1.6

type OTelMetricsOption func(*otelMetricsConfig)

OTelMetricsOption configures OpenTelemetry metrics.

func WithOTelMeterName added in v0.1.6

func WithOTelMeterName(name string) OTelMetricsOption

WithOTelMeterName overrides the default OTel meter name.

func WithOTelMeterVersion added in v0.1.6

func WithOTelMeterVersion(version string) OTelMetricsOption

WithOTelMeterVersion sets the instrumentation version reported by the meter.

type ProtoDurableCodec added in v0.1.8

type ProtoDurableCodec struct{}

ProtoDurableCodec uses protobuf for serialization.

func (ProtoDurableCodec) Marshal added in v0.1.8

func (ProtoDurableCodec) Marshal(msg proto.Message) ([]byte, error)

Marshal marshals a protobuf message to bytes.

func (ProtoDurableCodec) Unmarshal added in v0.1.8

func (ProtoDurableCodec) Unmarshal(data []byte, msg proto.Message) error

Unmarshal unmarshals bytes into a protobuf message.

type QueueConfigurableBackend added in v0.1.9

type QueueConfigurableBackend interface {
	ConfigureQueues(defaultQueue string, weights map[string]int)
}

QueueConfigurableBackend allows TaskManager to propagate queue configuration to backends.

type RedisDurableBackend added in v0.1.8

type RedisDurableBackend struct {
	// contains filtered or unexported fields
}

RedisDurableBackend implements a durable task backend using Redis.

func NewRedisDurableBackend added in v0.1.8

func NewRedisDurableBackend(client rueidis.Client, opts ...RedisDurableOption) (*RedisDurableBackend, error)

NewRedisDurableBackend creates a new RedisDurableBackend with the given Redis client and options.

func (*RedisDurableBackend) Ack added in v0.1.8

Ack acknowledges the successful processing of a durable task.

func (*RedisDurableBackend) ConfigureQueues added in v0.1.9

func (b *RedisDurableBackend) ConfigureQueues(defaultQueue string, weights map[string]int)

ConfigureQueues applies default queue and weights for durable scheduling.

func (*RedisDurableBackend) Dequeue added in v0.1.8

func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)

Dequeue retrieves a batch of durable tasks from the Redis backend with a lease.

func (*RedisDurableBackend) Enqueue added in v0.1.8

func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error

Enqueue adds a new durable task to the Redis backend.

func (*RedisDurableBackend) Extend added in v0.1.8

func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error

Extend renews the processing lease for a durable task.

func (*RedisDurableBackend) Fail added in v0.1.8

Fail marks a durable task as failed and moves it to the dead letter queue.

func (*RedisDurableBackend) Nack added in v0.1.8

Nack negatively acknowledges a durable task, making it available for reprocessing after a delay.

type RedisDurableOption added in v0.1.8

type RedisDurableOption func(*RedisDurableBackend)

RedisDurableOption defines an option for configuring RedisDurableBackend.

func WithRedisDurableBatchSize added in v0.1.8

func WithRedisDurableBatchSize(size int) RedisDurableOption

WithRedisDurableBatchSize sets the batch size for dequeuing tasks.

func WithRedisDurableDefaultQueue added in v0.1.9

func WithRedisDurableDefaultQueue(name string) RedisDurableOption

WithRedisDurableDefaultQueue sets the default queue name for durable tasks.

func WithRedisDurablePrefix added in v0.1.8

func WithRedisDurablePrefix(prefix string) RedisDurableOption

WithRedisDurablePrefix sets the key prefix for Redis durable backend.

func WithRedisDurableQueueWeights added in v0.1.9

func WithRedisDurableQueueWeights(weights map[string]int) RedisDurableOption

WithRedisDurableQueueWeights sets queue weights for durable task scheduling.

type Result added in v0.0.4

type Result struct {
	Task   *Task // the task that produced the result
	Result any   // the result of the task
	Error  error // the error returned by the task
}

Result is a task result.

func (*Result) String added in v0.0.4

func (r *Result) String() string

String returns a string representation of the result.

type ResultDropPolicy added in v0.1.6

type ResultDropPolicy uint8

ResultDropPolicy defines how to handle full subscriber buffers.

const (
	// DropNewest drops the new result when the subscriber buffer is full.
	DropNewest ResultDropPolicy = iota
	// DropOldest drops the oldest buffered result to make room for the new one.
	DropOldest
)
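The difference between the two policies is easiest to see on a small buffered channel. The following is a minimal single-producer sketch using `int` values in place of Result; `offer`, `drain`, and `fill` are hypothetical helpers, and the package's own broadcaster may be implemented differently.

```go
package main

import "fmt"

type ResultDropPolicy uint8

const (
	DropNewest ResultDropPolicy = iota
	DropOldest
)

// offer tries to deliver v to a buffered subscriber channel without blocking.
// It reports whether a value (either v or an older one) was dropped.
func offer(ch chan int, v int, policy ResultDropPolicy) (dropped bool) {
	select {
	case ch <- v:
		return false
	default:
	}
	if policy == DropNewest {
		return true // buffer full: discard v
	}
	// DropOldest: evict one buffered value, then retry the send once.
	select {
	case <-ch:
	default:
	}
	select {
	case ch <- v:
	default:
	}
	return true
}

// drain empties the buffer and returns its contents in arrival order.
func drain(ch chan int) []int {
	var out []int
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

// fill offers 1..n to a channel of the given capacity and returns what a
// slow subscriber would eventually read.
func fill(policy ResultDropPolicy, capacity, n int) []int {
	ch := make(chan int, capacity)
	for v := 1; v <= n; v++ {
		offer(ch, v, policy)
	}
	return drain(ch)
}

func main() {
	fmt.Println(fill(DropNewest, 2, 4)) // [1 2]
	fmt.Println(fill(DropOldest, 2, 4)) // [3 4]
}
```

DropNewest favors ordering continuity for slow subscribers, while DropOldest favors freshness.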

type RetentionPolicy added in v0.1.4

type RetentionPolicy struct {
	TTL             time.Duration
	MaxEntries      int
	CleanupInterval time.Duration
}

RetentionPolicy controls how completed tasks are kept in the registry.

type Service

type Service interface {
	// contains filtered or unexported methods
}

Service is an interface for a task manager.

type Task

type Task struct {
	ID          uuid.UUID          `json:"id"`          // ID is the id of the task
	Name        string             `json:"name"`        // Name is the name of the task
	Description string             `json:"description"` // Description is the description of the task
	Priority    int                `json:"priority"`    // Priority is the priority of the task
	RunAt       time.Time          `json:"run_at"`      // RunAt is the earliest time the task should execute
	Queue       string             `json:"queue"`       // Queue is the queue name for scheduling
	Weight      int                `json:"weight"`      // Weight influences scheduling share within a queue
	Execute     TaskFunc           `json:"-"`           // Execute is the function that will be executed by the task
	Ctx         context.Context    `json:"-"`           // Ctx is the context of the task
	CancelFunc  context.CancelFunc `json:"-"`           // CancelFunc is the cancel function of the task

	Retries    int           `json:"retries"`     // Retries is the maximum number of retries for failed tasks
	RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
	// contains filtered or unexported fields
}

Task represents a function that can be executed by the task manager.

Note: access Task state via methods to avoid data races.

func NewTask added in v0.0.7

func NewTask(ctx context.Context, fn TaskFunc) (*Task, error)

NewTask creates a new task with the provided function and context.

func (*Task) CancelledAt added in v0.1.4

func (task *Task) CancelledAt() time.Time

CancelledAt returns the cancellation time if set.

func (*Task) CancelledChan added in v0.0.4

func (task *Task) CancelledChan() <-chan struct{}

CancelledChan returns a channel which gets closed when the task is cancelled.

func (*Task) CompletedAt added in v0.1.4

func (task *Task) CompletedAt() time.Time

CompletedAt returns the completion time if set.

func (*Task) Error added in v0.0.2

func (task *Task) Error() error

Error returns the task error if available.

func (*Task) IsValid added in v0.0.2

func (task *Task) IsValid() (err error)

IsValid returns an error if the task is invalid.

func (*Task) Result added in v0.0.8

func (task *Task) Result() any

Result returns the task result if available.

func (*Task) ShouldSchedule added in v0.0.5

func (task *Task) ShouldSchedule() error

ShouldSchedule returns an error if the task should not be scheduled.

func (*Task) StartedAt added in v0.1.4

func (task *Task) StartedAt() time.Time

StartedAt returns the start time if set.

func (*Task) Status added in v0.0.4

func (task *Task) Status() TaskStatus

Status returns the current task status.

type TaskFunc added in v0.0.2

type TaskFunc func(ctx context.Context, args ...any) (any, error)

TaskFunc is the signature of the function that a Task executes.

type TaskHooks added in v0.1.5

type TaskHooks struct {
	OnQueued func(task *Task)
	OnStart  func(task *Task)
	OnFinish func(task *Task, status TaskStatus, result any, err error)
	OnRetry  func(task *Task, delay time.Duration, attempt int)
}

TaskHooks defines optional callbacks for task lifecycle events.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks.

func NewTaskManager

func NewTaskManager(
	ctx context.Context,
	maxWorkers, maxTasks int,
	tasksPerSecond float64,
	timeout, retryDelay time.Duration,
	maxRetries int,
) *TaskManager

NewTaskManager creates a new task manager.

  • ctx is the context for the task manager
  • maxWorkers is the number of workers to start, if <=0, the number of CPUs will be used
  • maxTasks is the maximum number of tasks that can be queued at once, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting. The limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling.
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
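The burst rule from the parameter list above can be worked through numerically. The sketch below assumes a standard token bucket that starts full, which this page does not confirm; `limiterBurst` and `maxStarts` are hypothetical helpers and not part of the package.

```go
package main

import "fmt"

// limiterBurst mirrors the documented rule: burst = min(maxWorkers, maxTasks).
func limiterBurst(maxWorkers, maxTasks int) int {
	if maxWorkers < maxTasks {
		return maxWorkers
	}
	return maxTasks
}

// maxStarts gives the upper bound on task starts within the first `seconds`
// seconds for a token bucket that starts full (assumption). limited is false
// when tasksPerSecond <= 0, which the documentation says disables rate limiting.
func maxStarts(maxWorkers, maxTasks int, tasksPerSecond, seconds float64) (n int, limited bool) {
	if tasksPerSecond <= 0 {
		return 0, false
	}
	return limiterBurst(maxWorkers, maxTasks) + int(tasksPerSecond*seconds), true
}

func main() {
	// 4 workers, queue of 10, 5 tasks/s: burst is 4, so at most 4 + 5*2 = 14
	// tasks can start in the first two seconds.
	fmt.Println(maxStarts(4, 10, 5, 2))
	// A non-positive rate means no limit applies.
	fmt.Println(maxStarts(4, 10, 0, 2))
}
```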

func NewTaskManagerWithDefaults added in v0.0.5

func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager

NewTaskManagerWithDefaults creates a new task manager with default values.

  • maxWorkers: runtime.NumCPU()
  • maxTasks: 10
  • tasksPerSecond: 5
  • timeout: 5 minutes
  • retryDelay: 1 second
  • maxRetries: 3

func NewTaskManagerWithOptions added in v0.1.8

func NewTaskManagerWithOptions(ctx context.Context, opts ...TaskManagerOption) *TaskManager

NewTaskManagerWithOptions creates a new task manager using functional options.

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks.

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID) error

CancelTask cancels a task by its ID.

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)

ExecuteTask executes a task given its ID and returns the result.

func (*TaskManager) GetActiveTasks added in v0.0.4

func (tm *TaskManager) GetActiveTasks() int

GetActiveTasks returns the number of running tasks.

func (*TaskManager) GetMetrics added in v0.1.0

func (tm *TaskManager) GetMetrics() MetricsSnapshot

GetMetrics returns a snapshot of current metrics.

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() <-chan Result

GetResults returns a results channel (compatibility shim for legacy API). Use SubscribeResults for explicit unsubscription and buffer control.

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)

GetTask gets a task by its ID.

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []*Task

GetTasks gets all tasks.

func (*TaskManager) IsEmpty added in v0.0.7

func (tm *TaskManager) IsEmpty() bool

IsEmpty checks if the task scheduler queue is empty.

func (*TaskManager) RegisterDurableTask added in v0.1.8

func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error

RegisterDurableTask registers a durable task into the configured backend.

func (*TaskManager) RegisterDurableTaskAfter added in v0.2.0

func (tm *TaskManager) RegisterDurableTaskAfter(ctx context.Context, task DurableTask, delay time.Duration) error

RegisterDurableTaskAfter registers a durable task to execute after the provided delay.

func (*TaskManager) RegisterDurableTaskAt added in v0.2.0

func (tm *TaskManager) RegisterDurableTaskAt(ctx context.Context, task DurableTask, runAt time.Time) error

RegisterDurableTaskAt registers a durable task to execute at or after the provided time.

func (*TaskManager) RegisterDurableTasks added in v0.1.8

func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error

RegisterDurableTasks registers multiple durable tasks.

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error

RegisterTask registers a new task to the task manager.

func (*TaskManager) RegisterTaskAfter added in v0.2.0

func (tm *TaskManager) RegisterTaskAfter(ctx context.Context, task *Task, delay time.Duration) error

RegisterTaskAfter registers a new task to execute after the provided delay.

func (*TaskManager) RegisterTaskAt added in v0.2.0

func (tm *TaskManager) RegisterTaskAt(ctx context.Context, task *Task, runAt time.Time) error

RegisterTaskAt registers a new task to execute at or after the provided time.

func (*TaskManager) RegisterTasks added in v0.0.4

func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error

RegisterTasks registers multiple tasks to the task manager at once.

func (*TaskManager) SetHooks added in v0.1.5

func (tm *TaskManager) SetHooks(hooks TaskHooks)

SetHooks configures callbacks for task lifecycle events.

func (*TaskManager) SetMaxWorkers added in v0.1.0

func (tm *TaskManager) SetMaxWorkers(maxWorkers int)

SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.

func (*TaskManager) SetMeterProvider added in v0.1.6

func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error

SetMeterProvider enables OpenTelemetry metrics collection. Passing nil disables it.

func (*TaskManager) SetResultsDropPolicy added in v0.1.6

func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)

SetResultsDropPolicy configures how full subscriber buffers are handled.

func (*TaskManager) SetRetentionPolicy added in v0.1.4

func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)

SetRetentionPolicy configures task registry retention.

func (*TaskManager) SetTracer added in v0.1.6

func (tm *TaskManager) SetTracer(tracer TaskTracer)

SetTracer configures the task tracer.

func (*TaskManager) StartWorkers added in v0.0.4

func (tm *TaskManager) StartWorkers(ctx context.Context)

StartWorkers starts the task manager's workers and scheduler (idempotent).

func (*TaskManager) StopGraceful added in v0.1.4

func (tm *TaskManager) StopGraceful(ctx context.Context) error

StopGraceful stops accepting new tasks and waits for completion before stopping workers.

func (*TaskManager) StopNow added in v0.1.4

func (tm *TaskManager) StopNow()

StopNow cancels running tasks and stops workers immediately.

func (*TaskManager) SubscribeResults added in v0.1.4

func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())

SubscribeResults returns a results channel and an unsubscribe function.
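The channel-plus-unsubscribe shape of this method is a common fan-out pattern. The sketch below shows one way such a hub could work, using strings in place of Result; `broadcaster` and its methods are hypothetical and are not the package's implementation. In this sketch a full subscriber buffer drops the new value.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical fan-out hub used only for illustration.
type broadcaster struct {
	mu   sync.Mutex
	subs map[int]chan string
	next int
}

func newBroadcaster() *broadcaster {
	return &broadcaster{subs: make(map[int]chan string)}
}

// subscribe returns a buffered channel and an idempotent unsubscribe function
// that removes the subscriber and closes its channel.
func (b *broadcaster) subscribe(buffer int) (<-chan string, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.next
	b.next++
	ch := make(chan string, buffer)
	b.subs[id] = ch
	var once sync.Once
	return ch, func() {
		once.Do(func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			delete(b.subs, id)
			close(ch)
		})
	}
}

// publish delivers v to every current subscriber without blocking.
func (b *broadcaster) publish(v string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- v:
		default: // subscriber buffer full: drop v for this subscriber
		}
	}
}

// demo publishes two results; the second subscriber leaves after the first.
func demo() (first, second []string) {
	b := newBroadcaster()
	a, stopA := b.subscribe(4)
	c, stopC := b.subscribe(4)
	b.publish("r1")
	stopC()
	b.publish("r2")
	stopA()
	for v := range a {
		first = append(first, v)
	}
	for v := range c {
		second = append(second, v)
	}
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

Closing the channel inside the unsubscribe function lets consumers use a plain `for range` loop and exit cleanly.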

func (*TaskManager) Wait added in v0.0.4

func (tm *TaskManager) Wait(ctx context.Context) error

Wait waits for all tasks to complete or context cancellation.

type TaskManagerOption added in v0.1.8

type TaskManagerOption func(*taskManagerConfig)

TaskManagerOption configures a TaskManager.
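TaskManagerOption follows the functional options pattern. Since `taskManagerConfig` is unexported, the sketch below uses a hypothetical `config` struct and lowercase option helpers to show how such options are typically applied. The default values are taken from the constructor documentation above; the rule that later options override earlier ones is an assumption.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// config is a hypothetical stand-in for the unexported taskManagerConfig.
type config struct {
	maxWorkers int
	maxTasks   int
	timeout    time.Duration
}

// Option has the same shape as TaskManagerOption, but targets the stand-in config.
type Option func(*config)

func withMaxWorkers(n int) Option        { return func(c *config) { c.maxWorkers = n } }
func withMaxTasks(n int) Option          { return func(c *config) { c.maxTasks = n } }
func withTimeout(d time.Duration) Option { return func(c *config) { c.timeout = d } }

// newConfig starts from defaults and applies the options in order, so a later
// option overrides an earlier one that sets the same field (assumption).
func newConfig(opts ...Option) config {
	cfg := config{
		maxWorkers: runtime.NumCPU(),
		maxTasks:   10,
		timeout:    5 * time.Minute,
	}
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

func main() {
	cfg := newConfig(withMaxWorkers(8), withTimeout(30*time.Second))
	fmt.Println(cfg.maxWorkers, cfg.maxTasks, cfg.timeout)
}
```

Fields that no option touches keep their defaults, which is what makes this style convenient for constructors with many parameters.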

func WithDefaultQueue added in v0.1.9

func WithDefaultQueue(name string) TaskManagerOption

WithDefaultQueue sets the default queue name for tasks without a queue.

func WithDurableBackend added in v0.1.8

func WithDurableBackend(backend DurableBackend) TaskManagerOption

WithDurableBackend sets the durable backend for the TaskManager.

func WithDurableBatchSize added in v0.1.8

func WithDurableBatchSize(size int) TaskManagerOption

WithDurableBatchSize sets the durable task batch size.

func WithDurableCodec added in v0.1.8

func WithDurableCodec(codec DurableCodec) TaskManagerOption

WithDurableCodec sets the durable codec for the TaskManager.

func WithDurableHandlers added in v0.1.8

func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption

WithDurableHandlers sets the durable handlers for the TaskManager.

func WithDurableLease added in v0.1.8

func WithDurableLease(lease time.Duration) TaskManagerOption

WithDurableLease sets the durable task lease duration.

func WithDurableLeaseRenewalInterval added in v0.1.8

func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption

WithDurableLeaseRenewalInterval sets the interval for renewing durable task leases. Set to <= 0 to disable renewal.

func WithDurablePollInterval added in v0.1.8

func WithDurablePollInterval(interval time.Duration) TaskManagerOption

WithDurablePollInterval sets the durable task polling interval.

func WithMaxRetries added in v0.1.8

func WithMaxRetries(n int) TaskManagerOption

WithMaxRetries sets the maximum number of retries for a task.

func WithMaxTasks added in v0.1.8

func WithMaxTasks(n int) TaskManagerOption

WithMaxTasks sets the maximum number of tasks in the queue.

func WithMaxWorkers added in v0.1.8

func WithMaxWorkers(n int) TaskManagerOption

WithMaxWorkers sets the maximum number of workers.

func WithQueueWeights added in v0.1.9

func WithQueueWeights(weights map[string]int) TaskManagerOption

WithQueueWeights sets the queue weight map for named queues.

func WithRetryDelay added in v0.1.8

func WithRetryDelay(delay time.Duration) TaskManagerOption

WithRetryDelay sets the delay between task retries.
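The README describes retries as using exponential backoff with capped delays, but the exact formula is not given on this page. The sketch below assumes the delay doubles on each attempt starting from the configured retry delay; `backoff` and the `maxDelay` cap are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base * 2^attempt, capped at maxDelay. attempt is zero-based.
// The doubling rule and the cap value are assumptions for illustration.
func backoff(base, maxDelay time.Duration, attempt int) time.Duration {
	if base <= 0 {
		return 0
	}
	d := base
	for i := 0; i < attempt; i++ {
		if d > maxDelay/2 {
			return maxDelay // doubling would exceed the cap (also avoids overflow)
		}
		d *= 2
	}
	if d > maxDelay {
		return maxDelay
	}
	return d
}

func main() {
	// With a 1s base and a 10s cap the delays grow 1s, 2s, 4s, 8s, then stay at 10s.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoff(time.Second, 10*time.Second, attempt))
	}
}
```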

func WithTasksPerSecond added in v0.1.8

func WithTasksPerSecond(n float64) TaskManagerOption

WithTasksPerSecond sets the maximum number of tasks to start per second.

func WithTimeout added in v0.1.8

func WithTimeout(timeout time.Duration) TaskManagerOption

WithTimeout sets the task execution timeout.

type TaskSpan added in v0.1.6

type TaskSpan interface {
	End(err error)
}

TaskSpan represents an in-flight task span.

type TaskStatus added in v0.0.4

type TaskStatus uint8

TaskStatus is a value used to represent the task status.

func (TaskStatus) String added in v0.0.4

func (ts TaskStatus) String() string

String returns the string representation of the task status.

type TaskTracer added in v0.1.6

type TaskTracer interface {
	Start(ctx context.Context, task *Task) (context.Context, TaskSpan)
}

TaskTracer provides tracing spans around task execution.

type TypedDurableHandlerSpec added in v0.1.9

type TypedDurableHandlerSpec[T proto.Message] struct {
	Make func() T
	Fn   func(ctx context.Context, payload T) (any, error)
}

TypedDurableHandlerSpec provides a compile-time checked handler signature for durable tasks.

type TypedDurableRegistry added in v0.1.9

type TypedDurableRegistry struct {
	// contains filtered or unexported fields
}

TypedDurableRegistry collects typed durable handlers and exposes the untyped map.

func NewTypedDurableRegistry added in v0.1.9

func NewTypedDurableRegistry() *TypedDurableRegistry

NewTypedDurableRegistry constructs a registry for typed durable handlers.

func (*TypedDurableRegistry) Add added in v0.1.9

Add registers an untyped durable handler spec into the registry.

func (*TypedDurableRegistry) Handlers added in v0.1.9

Handlers returns a defensive copy of the registered durable handler map.

type TypedHandlerRegistry added in v0.1.9

type TypedHandlerRegistry struct {
	// contains filtered or unexported fields
}

TypedHandlerRegistry collects typed gRPC handlers and exposes the untyped map.

func NewTypedHandlerRegistry added in v0.1.9

func NewTypedHandlerRegistry() *TypedHandlerRegistry

NewTypedHandlerRegistry constructs a registry for typed gRPC handlers.

func (*TypedHandlerRegistry) Add added in v0.1.9

func (r *TypedHandlerRegistry) Add(name string, spec HandlerSpec) error

Add registers an untyped handler spec into the registry.

func (*TypedHandlerRegistry) Handlers added in v0.1.9

func (r *TypedHandlerRegistry) Handlers() map[string]HandlerSpec

Handlers returns a defensive copy of the registered handler map.

type TypedHandlerSpec added in v0.1.9

type TypedHandlerSpec[T protoreflect.ProtoMessage] struct {
	Make func() T
	Fn   func(ctx context.Context, payload T) (any, error)
}

TypedHandlerSpec provides a compile-time checked handler signature for gRPC tasks.

Directories

Path              Synopsis
__examples
  durable_redis   command
  grpc            command
  grpc_durable    command
  manual          command
  middleware      command
  multi           command
  otel_metrics    command
  otel_tracing    command
  tracing         command
cmd
  workerctl       command
pkg