Documentation
¶
Index ¶
- Variables
- func BackoffFor(err error, attempt int) time.Duration
- func IsRetryable(err error) bool
- type Controller
- type Handler
- type ListFilter
- type Manager
- type ManagerOption
- type MemoryStore
- func (s *MemoryStore) Create(_ context.Context, t *Task) error
- func (s *MemoryStore) FindDueTasks(_ context.Context, now time.Time, limit int) ([]Task, error)
- func (s *MemoryStore) FindQueuedForKey(_ context.Context, key string) (*Task, error)
- func (s *MemoryStore) Get(_ context.Context, taskID string) (*Task, error)
- func (s *MemoryStore) List(_ context.Context, filter ListFilter) ([]Task, error)
- func (s *MemoryStore) MarkInterruptedOnStartup(_ context.Context) error
- func (s *MemoryStore) Update(_ context.Context, t *Task) error
- func (s *MemoryStore) UpdateStatus(_ context.Context, taskID string, fields map[string]interface{}) error
- type RetryableError
- type Store
- type SubmitRequest
- type Task
- type TaskResult
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotFound = errors.New("task: not found") ErrNotCancellable = errors.New("task: task is not in a cancellable state") ErrHandlerNotFound = errors.New("task: no handler registered for type") ErrManagerStopped = errors.New("task: manager is not running") ErrDuplicateHandler = errors.New("task: handler already registered for type") )
Functions ¶
func BackoffFor ¶
BackoffFor returns the delay before the next attempt. Falls back to exponential backoff: 2^attempt * 5s, capped at 10 minutes.
func IsRetryable ¶
IsRetryable reports whether err (possibly wrapped) is a RetryableError.
Types ¶
type Controller ¶
type Controller interface {
// UpdateProgress stores a human-readable progress string on the task.
UpdateProgress(ctx context.Context, text string) error
// IsCancelled reports whether the task's context has been cancelled.
IsCancelled(ctx context.Context) bool
}
Controller is passed to Handler.Run so handlers can report progress and check cancellation without importing the full manager.
type Handler ¶
type Handler interface {
// Type returns the task type string this handler handles.
Type() string
// Run executes the task. Returning a non-nil error causes the task to be
// failed or retried depending on whether the error is retryable and
// whether attempts remain. A context.Canceled error marks the task cancelled.
Run(ctx context.Context, t *Task, ctl Controller) (*TaskResult, error)
}
Handler is implemented by each task type and registered with the Manager.
type ListFilter ¶
type ListFilter struct {
OwnerType string
OwnerID string
Type string
Status []TaskStatus
Limit int
Offset int
}
ListFilter restricts the result set of Manager.List.
type Manager ¶
type Manager interface {
// Register adds a handler for a task type. May be called before Start.
Register(handler Handler) error
// Submit creates a new task and persists it. The scheduler picks it up
// within one poll interval (5s by default). Submit is durable: if the
// process crashes after Submit returns, the task survives in the DB.
Submit(ctx context.Context, req SubmitRequest) (*Task, error)
// Cancel transitions a task to cancelled. For running tasks it signals
// the handler's context; the handler is responsible for returning promptly.
Cancel(ctx context.Context, taskID string, reason string) error
// Get returns the current state of a task. Returns ErrNotFound if absent.
Get(ctx context.Context, taskID string) (*Task, error)
// List returns tasks matching the filter, ordered by created_at DESC.
List(ctx context.Context, filter ListFilter) ([]Task, error)
// Wait polls until taskID reaches a terminal status or ctx is cancelled.
// Useful for callers (e.g. HTTP handlers) that need a synchronous result.
Wait(ctx context.Context, taskID string) (*Task, error)
// Start runs restart recovery and launches the scheduler goroutine.
Start(ctx context.Context) error
// Stop signals the scheduler to stop. Running tasks are not interrupted.
Stop(ctx context.Context) error
}
Manager is the public interface for the task management system.
func NewManager ¶
func NewManager(store Store, opts ...ManagerOption) Manager
NewManager constructs a Manager backed by the provided Store.
type ManagerOption ¶
type ManagerOption func(*taskManager)
ManagerOption configures a Manager at construction time.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) ManagerOption
WithPollInterval overrides the scheduler polling interval. Useful in tests; default is 5s.
func WithWaitInterval ¶
func WithWaitInterval(d time.Duration) ManagerOption
WithWaitInterval overrides the Wait() polling interval. Useful in tests; default is 50ms.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is a thread-safe in-memory implementation of Store. Intended for tests; do not use in production.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore returns an empty MemoryStore.
func (*MemoryStore) FindDueTasks ¶
func (*MemoryStore) FindQueuedForKey ¶
func (*MemoryStore) List ¶
func (s *MemoryStore) List(_ context.Context, filter ListFilter) ([]Task, error)
func (*MemoryStore) MarkInterruptedOnStartup ¶
func (s *MemoryStore) MarkInterruptedOnStartup(_ context.Context) error
func (*MemoryStore) UpdateStatus ¶
type RetryableError ¶
type RetryableError struct {
Cause error
Backoff time.Duration // 0 = use default exponential backoff
}
RetryableError wraps an error to signal that it is transient and the task should be rescheduled if attempts remain.
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (e *RetryableError) Unwrap() error
type Store ¶
type Store interface {
// Create persists a new task. The task must have a non-empty ID.
Create(ctx context.Context, t *Task) error
// Get retrieves a task by its ID. Returns ErrNotFound if absent.
Get(ctx context.Context, taskID string) (*Task, error)
// Update overwrites all fields of an existing task record.
Update(ctx context.Context, t *Task) error
// List returns tasks matching filter, ordered by created_at DESC.
List(ctx context.Context, filter ListFilter) ([]Task, error)
// MarkInterruptedOnStartup transitions stale in-progress rows atomically:
// running → interrupted
// queued → pending (so the scheduler rebuilds order from DB)
// Called once during Manager.Start before the scheduler loop begins.
MarkInterruptedOnStartup(ctx context.Context) error
// FindDueTasks returns up to limit pending tasks whose scheduled_at is
// NULL or <= now, ordered by created_at ASC (oldest first).
FindDueTasks(ctx context.Context, now time.Time, limit int) ([]Task, error)
// FindQueuedForKey returns the oldest queued task for a serialization key,
// or nil if none exist.
FindQueuedForKey(ctx context.Context, key string) (*Task, error)
// UpdateStatus applies a partial update to the named task using the
// provided column→value map. Only the listed columns are written;
// other columns are left unchanged. This prevents concurrent goroutines
// from stomping each other's fields.
UpdateStatus(ctx context.Context, taskID string, fields map[string]interface{}) error
}
Store is the persistence interface for the task manager. The concrete implementation lives in internal/data/db.
type SubmitRequest ¶
type SubmitRequest struct {
Type string
OwnerType string
OwnerID string
Source string
SerializationKey string
Payload json.RawMessage
MaxAttempts int // defaults to 1 if zero or negative
ScheduledAt *time.Time // nil = run as soon as possible
}
SubmitRequest describes the parameters for creating a new task.
type Task ¶
type Task struct {
ID string
Type string
Status TaskStatus
OwnerType string
OwnerID string
Source string
SerializationKey string
Payload json.RawMessage
Result json.RawMessage
Progress string
Error string
Attempt int
MaxAttempts int
ScheduledAt *time.Time
StartedAt *time.Time
FinishedAt *time.Time
CancelledAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
// Recurrence holds cron spec JSON. Reserved for Phase 4; nil in Phase 1.
Recurrence json.RawMessage
// ParentTaskID links a recurring child instance to its recurrence parent.
ParentTaskID string
}
Task is the domain representation of a unit of persistent work.
type TaskResult ¶
type TaskResult struct {
Result json.RawMessage
}
TaskResult is returned by Handler.Run on success.
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the lifecycle state of a task.
const ( StatusPending TaskStatus = "pending" StatusQueued TaskStatus = "queued" StatusRunning TaskStatus = "running" StatusSucceeded TaskStatus = "succeeded" StatusFailed TaskStatus = "failed" StatusCancelled TaskStatus = "cancelled" StatusInterrupted TaskStatus = "interrupted" )
func (TaskStatus) IsTerminal ¶
func (s TaskStatus) IsTerminal() bool
IsTerminal reports whether the status is a terminal (non-actionable) state.