task

package
v0.260507.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: MPL-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound         = errors.New("task: not found")
	ErrNotCancellable   = errors.New("task: task is not in a cancellable state")
	ErrHandlerNotFound  = errors.New("task: no handler registered for type")
	ErrManagerStopped   = errors.New("task: manager is not running")
	ErrDuplicateHandler = errors.New("task: handler already registered for type")
)

Functions

func BackoffFor

func BackoffFor(err error, attempt int) time.Duration

BackoffFor returns the delay before the next attempt. Falls back to exponential backoff: 2^attempt * 5s, capped at 10 minutes.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable reports whether err (possibly wrapped) is a RetryableError.

Types

type Controller

type Controller interface {
	// UpdateProgress stores a human-readable progress string on the task.
	UpdateProgress(ctx context.Context, text string) error
	// IsCancelled reports whether the task's context has been cancelled.
	IsCancelled(ctx context.Context) bool
}

Controller is passed to Handler.Run so handlers can report progress and check cancellation without importing the full manager.

type Handler

type Handler interface {
	// Type returns the task type string this handler handles.
	Type() string
	// Run executes the task. Returning a non-nil error causes the task to be
	// failed or retried depending on whether the error is retryable and
	// whether attempts remain. A context.Canceled error marks the task cancelled.
	Run(ctx context.Context, t *Task, ctl Controller) (*TaskResult, error)
}

Handler is implemented by each task type and registered with the Manager.

type ListFilter

type ListFilter struct {
	OwnerType string
	OwnerID   string
	Type      string
	Status    []TaskStatus
	Limit     int
	Offset    int
}

ListFilter restricts the result set of Manager.List.

type Manager

type Manager interface {
	// Register adds a handler for a task type. May be called before Start.
	Register(handler Handler) error

	// Submit creates a new task and persists it. The scheduler picks it up
	// within one poll interval (5s by default). Submit is durable: if the
	// process crashes after Submit returns, the task survives in the DB.
	Submit(ctx context.Context, req SubmitRequest) (*Task, error)

	// Cancel transitions a task to cancelled. For running tasks it signals
	// the handler's context; the handler is responsible for returning promptly.
	Cancel(ctx context.Context, taskID string, reason string) error

	// Get returns the current state of a task. Returns ErrNotFound if absent.
	Get(ctx context.Context, taskID string) (*Task, error)

	// List returns tasks matching the filter, ordered by created_at DESC.
	List(ctx context.Context, filter ListFilter) ([]Task, error)

	// Wait polls until taskID reaches a terminal status or ctx is cancelled.
	// Useful for callers (e.g. HTTP handlers) that need a synchronous result.
	Wait(ctx context.Context, taskID string) (*Task, error)

	// Start runs restart recovery and launches the scheduler goroutine.
	Start(ctx context.Context) error

	// Stop signals the scheduler to stop. Running tasks are not interrupted.
	Stop(ctx context.Context) error
}

Manager is the public interface for the task management system.

func NewManager

func NewManager(store Store, opts ...ManagerOption) Manager

NewManager constructs a Manager backed by the provided Store.

type ManagerOption

type ManagerOption func(*taskManager)

ManagerOption configures a Manager at construction time.

func WithPollInterval

func WithPollInterval(d time.Duration) ManagerOption

WithPollInterval overrides the scheduler polling interval. Useful in tests; default is 5s.

func WithWaitInterval

func WithWaitInterval(d time.Duration) ManagerOption

WithWaitInterval overrides the Wait() polling interval. Useful in tests; default is 50ms.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a thread-safe in-memory implementation of Store. Intended for tests; do not use in production.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore returns an empty MemoryStore.

func (*MemoryStore) Create

func (s *MemoryStore) Create(_ context.Context, t *Task) error

func (*MemoryStore) FindDueTasks

func (s *MemoryStore) FindDueTasks(_ context.Context, now time.Time, limit int) ([]Task, error)

func (*MemoryStore) FindQueuedForKey

func (s *MemoryStore) FindQueuedForKey(_ context.Context, key string) (*Task, error)

func (*MemoryStore) Get

func (s *MemoryStore) Get(_ context.Context, taskID string) (*Task, error)

func (*MemoryStore) List

func (s *MemoryStore) List(_ context.Context, filter ListFilter) ([]Task, error)

func (*MemoryStore) MarkInterruptedOnStartup

func (s *MemoryStore) MarkInterruptedOnStartup(_ context.Context) error

func (*MemoryStore) Update

func (s *MemoryStore) Update(_ context.Context, t *Task) error

func (*MemoryStore) UpdateStatus

func (s *MemoryStore) UpdateStatus(_ context.Context, taskID string, fields map[string]interface{}) error

type RetryableError

type RetryableError struct {
	Cause   error
	Backoff time.Duration // 0 = use default exponential backoff
}

RetryableError wraps an error to signal that it is transient and the task should be rescheduled if attempts remain.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error

type Store

type Store interface {
	// Create persists a new task. The task must have a non-empty ID.
	Create(ctx context.Context, t *Task) error

	// Get retrieves a task by its ID. Returns ErrNotFound if absent.
	Get(ctx context.Context, taskID string) (*Task, error)

	// Update overwrites all fields of an existing task record.
	Update(ctx context.Context, t *Task) error

	// List returns tasks matching filter, ordered by created_at DESC.
	List(ctx context.Context, filter ListFilter) ([]Task, error)

	// MarkInterruptedOnStartup transitions stale in-progress rows atomically:
	//   running → interrupted
	//   queued  → pending (so the scheduler rebuilds order from DB)
	// Called once during Manager.Start before the scheduler loop begins.
	MarkInterruptedOnStartup(ctx context.Context) error

	// FindDueTasks returns up to limit pending tasks whose scheduled_at is
	// NULL or <= now, ordered by created_at ASC (oldest first).
	FindDueTasks(ctx context.Context, now time.Time, limit int) ([]Task, error)

	// FindQueuedForKey returns the oldest queued task for a serialization key,
	// or nil if none exist.
	FindQueuedForKey(ctx context.Context, key string) (*Task, error)

	// UpdateStatus applies a partial update to the named task using the
	// provided column→value map. Only the listed columns are written;
	// other columns are left unchanged. This prevents concurrent goroutines
	// from stomping each other's fields.
	UpdateStatus(ctx context.Context, taskID string, fields map[string]interface{}) error
}

Store is the persistence interface for the task manager. The concrete implementation lives in internal/data/db.

type SubmitRequest

type SubmitRequest struct {
	Type             string
	OwnerType        string
	OwnerID          string
	Source           string
	SerializationKey string
	Payload          json.RawMessage
	MaxAttempts      int        // defaults to 1 if zero or negative
	ScheduledAt      *time.Time // nil = run as soon as possible
}

SubmitRequest describes the parameters for creating a new task.

type Task

type Task struct {
	ID               string
	Type             string
	Status           TaskStatus
	OwnerType        string
	OwnerID          string
	Source           string
	SerializationKey string

	Payload json.RawMessage
	Result  json.RawMessage

	Progress string
	Error    string

	Attempt     int
	MaxAttempts int

	ScheduledAt *time.Time
	StartedAt   *time.Time
	FinishedAt  *time.Time
	CancelledAt *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time

	// Recurrence holds cron spec JSON. Reserved for Phase 4; nil in Phase 1.
	Recurrence json.RawMessage
	// ParentTaskID links a recurring child instance to its recurrence parent.
	ParentTaskID string
}

Task is the domain representation of a unit of persistent work.

type TaskResult

type TaskResult struct {
	Result json.RawMessage
}

TaskResult is returned by Handler.Run on success.

type TaskStatus

type TaskStatus string

TaskStatus represents the lifecycle state of a task.

const (
	StatusPending     TaskStatus = "pending"
	StatusQueued      TaskStatus = "queued"
	StatusRunning     TaskStatus = "running"
	StatusSucceeded   TaskStatus = "succeeded"
	StatusFailed      TaskStatus = "failed"
	StatusCancelled   TaskStatus = "cancelled"
	StatusInterrupted TaskStatus = "interrupted"
)

func (TaskStatus) IsTerminal

func (s TaskStatus) IsTerminal() bool

IsTerminal reports whether the status is a terminal (non-actionable) state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL