tasks

package
v0.42.3 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 18, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package tasks implements a durable task execution plugin for Base.

It provides persistent task queues with DAG-based workflow scheduling, priority-based work-stealing, retry support, and timeout management, all backed by Base's embedded SQLite.

The plugin uses two internal collections:

  • _tasks: individual work items with state machine lifecycle
  • _workflows: ordered task DAGs (pipeline or fan-out)


Constants

const (
	// StoreKey is the app.Store() key where the task store is registered.
	StoreKey = "tasks.store"
	// DurableKey is the app.Store() key where the durable store is registered.
	DurableKey = "tasks.durable"
)
const (
	// TasksCollection is the internal collection for tasks.
	TasksCollection = "_tasks"
	// WorkflowsCollection is the internal collection for workflows.
	WorkflowsCollection = "_workflows"
)
const (
	SignalClaim    = "claim"
	SignalComplete = "complete"
	SignalFail     = "fail"
	SignalProgress = "progress"
	SignalUpdate   = "update"
	SignalCancel   = "cancel"
)

Signal names used by handlers to control task workflows.

Variables

var (
	ErrTaskNotFound      = errors.New("task not found")
	ErrWorkflowNotFound  = errors.New("workflow not found")
	ErrInvalidTransition = errors.New("invalid state transition")
	ErrAlreadyClaimed    = errors.New("task already claimed")
)

Functions

func CanTransition

func CanTransition(from, to TaskState) bool

CanTransition reports whether moving from one state to another is allowed.
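The transition table itself is not listed on this page. The following is a minimal sketch of how such a check might look, with a table inferred from the Store method descriptions (ClaimTask, StartTask, CompleteTask, FailTask, CancelTask). The map contents and the lowercase canTransition helper are assumptions, not the package's actual table, and TaskRetrying is left out because its edges are not documented here.

```go
package main

import "fmt"

// TaskState mirrors the documented string type.
type TaskState string

const (
	TaskPending   TaskState = "pending"
	TaskClaimed   TaskState = "claimed"
	TaskRunning   TaskState = "running"
	TaskCompleted TaskState = "completed"
	TaskFailed    TaskState = "failed"
	TaskCancelled TaskState = "cancelled"
)

// allowed is a hypothetical transition table inferred from the Store method
// docs; the real package may permit a different set of edges.
var allowed = map[TaskState][]TaskState{
	TaskPending: {TaskClaimed, TaskRunning, TaskCancelled},
	TaskClaimed: {TaskRunning, TaskCancelled},
	TaskRunning: {TaskCompleted, TaskFailed, TaskPending, TaskCancelled},
}

// canTransition reports whether the hypothetical table permits from -> to.
// States without an entry (completed, failed, cancelled) are terminal here.
func canTransition(from, to TaskState) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(TaskPending, TaskClaimed))
	fmt.Println(canTransition(TaskCompleted, TaskRunning))
}
```

A lookup table keeps the state machine in one place, so adding a state means editing data rather than branching logic.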

func MustRegister

func MustRegister(app core.App, config Config)

MustRegister registers the tasks plugin and panics on failure.

Example — local only (SQLite scheduler):

tasks.MustRegister(app, tasks.Config{})

Example — with durable execution (tasks.hanzo.ai):

tasks.MustRegister(app, tasks.Config{
	Durable: tasks.DurableConfig{
		Enabled:   true,
		Address:   "tasks.hanzo.ai:7233",
		Namespace: "org-acme",  // multi-tenant: org ID
	},
})

Example — with env vars:

TASKS_ENABLED=true TASKS_NAMESPACE=org-acme ./myapp serve

func Register

func Register(app core.App, config Config) error

Register registers the tasks plugin in the provided app instance.

func SetTaskExecutor

func SetTaskExecutor(fn func(ctx context.Context, task *Task) (*Task, error))

SetTaskExecutor registers the function called by Temporal activities. This is typically set during plugin registration via Config.OnDurableExecute.

Types

type Config

type Config struct {
	// OnExecute is called to run a task locally. If nil, tasks must be
	// completed externally via the API (work-stealing pattern).
	OnExecute ExecuteFunc

	// OnDurableExecute is called by Temporal activity workers. If nil,
	// durable tasks complete with an acknowledgment message.
	OnDurableExecute DurableExecuteFunc

	// Durable configures Temporal-backed durable execution.
	// When enabled, task creates are also submitted as Temporal workflows
	// for crash-safe execution. SQLite remains the authoritative state.
	Durable DurableConfig

	// PollInterval controls scheduler tick frequency. Default 2s.
	PollInterval time.Duration

	// MaxConcurrent limits concurrent auto-executions. Default 10.
	MaxConcurrent int
}

Config defines the tasks plugin configuration.

type DurableConfig

type DurableConfig struct {
	// Enabled activates durable execution. Default false (SQLite-only mode).
	Enabled bool `json:"enabled" yaml:"enabled"`

	// Address is the Temporal frontend address. Default "tasks.hanzo.ai:7233".
	Address string `json:"address" yaml:"address"`

	// Namespace is the Temporal namespace. For multi-tenant, use org ID.
	// Default "default".
	Namespace string `json:"namespace" yaml:"namespace"`

	// DefaultQueue is the task queue name. Default "default".
	DefaultQueue string `json:"default_queue" yaml:"default_queue"`

	// RunWorker starts an embedded Temporal worker in this process.
	// If false, tasks are submitted but executed by external workers.
	// Default true when enabled.
	RunWorker bool `json:"run_worker" yaml:"run_worker"`
}

DurableConfig holds connection settings for durable task execution. When enabled, tasks are submitted as Temporal workflows for crash-safe execution. Supports both local Temporal (localhost:7233) and cloud (tasks.hanzo.ai).

func DefaultDurableConfig

func DefaultDurableConfig() DurableConfig

DefaultDurableConfig returns production defaults with env var overrides.

Env vars:

  • TASKS_ENABLED (or HANZO_TASKS_ENABLED): "true" to enable
  • TASKS_ADDRESS (or HANZO_TASKS_ADDRESS): Temporal frontend address
  • TASKS_NAMESPACE (or HANZO_TASKS_NAMESPACE): Temporal namespace (org ID for multi-tenant)
  • TASKS_QUEUE: default task queue name
  • TASKS_WORKER: "false" to disable embedded worker

type DurableExecuteFunc

type DurableExecuteFunc func(ctx context.Context, task *Task) (*Task, error)

DurableExecuteFunc is called by Temporal activities to execute a task.

type DurableStore

type DurableStore struct {
	Client client.Client // default namespace client
	// contains filtered or unexported fields
}

DurableStore implements durable task execution via Temporal. Supports multi-tenant org isolation via per-namespace client connections.

func GetDurable

func GetDurable(app core.App) *DurableStore

GetDurable retrieves the registered durable store from the app, or nil.

func NewDurableStore

func NewDurableStore(addr, namespace string) (*DurableStore, error)

NewDurableStore connects to the Temporal service.

func (*DurableStore) CancelTask

func (ds *DurableStore) CancelTask(ctx context.Context, taskID string, orgID ...string) error

CancelTask cancels a running workflow.

func (*DurableStore) ClientForOrg added in v0.29.6

func (ds *DurableStore) ClientForOrg(orgID string) (client.Client, error)

ClientForOrg returns a Temporal client scoped to the given org namespace. Connections are cached and protected by a mutex. Falls back to the default client if org is empty or matches the default namespace.
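The caching behavior described for ClientForOrg can be sketched without the Temporal SDK. Everything below (conn, connCache, forOrg, the dials counter) is a hypothetical stand-in that only illustrates the mutex-guarded cache and the fallback to the default client; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a Temporal client connection.
type conn struct{ namespace string }

// connCache keeps one connection per org namespace behind a mutex.
type connCache struct {
	mu        sync.Mutex
	defaultNS string
	def       *conn
	byOrg     map[string]*conn
	dials     int // number of connections created, for illustration only
}

func newConnCache(defaultNS string) *connCache {
	return &connCache{
		defaultNS: defaultNS,
		def:       &conn{namespace: defaultNS},
		byOrg:     make(map[string]*conn),
	}
}

// forOrg returns the cached connection for orgID, creating it on first use.
// An empty orgID, or one equal to the default namespace, returns the
// default connection.
func (c *connCache) forOrg(orgID string) *conn {
	if orgID == "" || orgID == c.defaultNS {
		return c.def
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if cn, ok := c.byOrg[orgID]; ok {
		return cn
	}
	cn := &conn{namespace: orgID} // a real implementation would dial here
	c.dials++
	c.byOrg[orgID] = cn
	return cn
}

func main() {
	cache := newConnCache("default")
	a := cache.forOrg("org-acme")
	b := cache.forOrg("org-acme")
	fmt.Println(a == b, cache.forOrg("") == cache.def, cache.dials)
}
```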

func (*DurableStore) Close

func (ds *DurableStore) Close()

Close shuts down all client connections.

func (*DurableStore) GetNextTask added in v0.29.5

func (ds *DurableStore) GetNextTask(ctx context.Context, spaceID, agentID string, orgID ...string) (*Task, error)

GetNextTask finds the next pending task in a space and claims it for the agent.

func (*DurableStore) GetTaskStatus

func (ds *DurableStore) GetTaskStatus(ctx context.Context, taskID string, orgID ...string) (TaskState, string, error)

GetTaskStatus queries a running workflow for its current state. orgID selects the Temporal namespace; an empty string uses the default.

func (*DurableStore) IsConnected

func (ds *DurableStore) IsConnected() bool

IsConnected reports whether the durable store has an active connection.

func (*DurableStore) ListTasks added in v0.29.5

func (ds *DurableStore) ListTasks(ctx context.Context, spaceID string, orgID ...string) ([]*Task, error)

ListTasks returns tasks in a space by querying workflow visibility.

func (*DurableStore) ListWorkflows added in v0.29.5

func (ds *DurableStore) ListWorkflows(ctx context.Context, spaceID string, orgID ...string) ([]*Workflow, error)

ListWorkflows returns workflows in a space by querying visibility.

func (*DurableStore) SignalTask

func (ds *DurableStore) SignalTask(ctx context.Context, taskID, signalName string, data interface{}, orgID ...string) error

SignalTask sends a signal to a running workflow.

func (*DurableStore) SubmitTask

func (ds *DurableStore) SubmitTask(ctx context.Context, task *Task) error

SubmitTask starts a durable workflow execution for a task. Uses task.OrgID as the Temporal namespace for multi-tenant isolation. Falls back to default namespace if OrgID is empty.

func (*DurableStore) SubmitWorkflow

func (ds *DurableStore) SubmitWorkflow(ctx context.Context, wf *Workflow, tasks []*Task, parallel bool) error

SubmitWorkflow starts a pipeline or fan-out workflow.

type ExecuteFunc

type ExecuteFunc func(task *Task) (map[string]any, error)

ExecuteFunc is called by the local scheduler to execute a claimed task. It returns the output map on success, or an error on failure.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides SQLite-backed task persistence using Base collections.

func GetStore

func GetStore(app core.App) *Store

GetStore retrieves the registered task store from the app, or nil.

func NewStore

func NewStore(app core.App) *Store

NewStore creates a new task store.

func (*Store) AdvanceWorkflows

func (s *Store) AdvanceWorkflows() error

AdvanceWorkflows checks non-terminal workflows and updates their state based on constituent task states.
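The rules AdvanceWorkflows uses to derive a workflow's state are not spelled out on this page. The sketch below shows one plausible aggregation; the precedence (failed, then cancelled, then completed, then running) and the workflowState helper are assumptions made for illustration.

```go
package main

import "fmt"

// workflowState derives a workflow state from its task states using
// hypothetical rules: any failed task fails the workflow, any cancelled
// task cancels it, all completed means completed, any progress means
// running, and otherwise the workflow is still pending.
func workflowState(taskStates []string) string {
	var failed, cancelled, completed, active int
	for _, s := range taskStates {
		switch s {
		case "failed":
			failed++
		case "cancelled":
			cancelled++
		case "completed":
			completed++
		case "claimed", "running":
			active++
		}
	}
	switch {
	case failed > 0:
		return "failed"
	case cancelled > 0:
		return "cancelled"
	case len(taskStates) > 0 && completed == len(taskStates):
		return "completed"
	case active > 0 || completed > 0:
		return "running"
	default:
		return "pending"
	}
}

func main() {
	fmt.Println(workflowState([]string{"completed", "running", "pending"}))
	fmt.Println(workflowState([]string{"completed", "completed"}))
}
```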

func (*Store) AgentHasActiveTask

func (s *Store) AgentHasActiveTask(agentID string, orgID ...string) (bool, error)

AgentHasActiveTask reports whether the agent has a claimed or running task. orgID scopes the query to a specific org when provided.

func (*Store) CancelTask

func (s *Store) CancelTask(taskID string, orgID ...string) error

CancelTask transitions any non-terminal task to cancelled. orgID scopes the mutation to a specific org when provided.

func (*Store) CheckTimeouts

func (s *Store) CheckTimeouts() error

CheckTimeouts fails or retries tasks that have exceeded their timeout.

func (*Store) ClaimTask

func (s *Store) ClaimTask(taskID, agentID string, orgID ...string) error

ClaimTask atomically transitions a task from pending to claimed. orgID scopes the mutation to a specific org when provided.

func (*Store) CompleteTask

func (s *Store) CompleteTask(taskID string, output map[string]any, orgID ...string) error

CompleteTask transitions a running task to completed with output. orgID scopes the mutation to a specific org when provided.

func (*Store) CreateTask

func (s *Store) CreateTask(task *Task) error

CreateTask persists a new task. ID is auto-generated if empty.

func (*Store) CreateWorkflow

func (s *Store) CreateWorkflow(wf *Workflow) error

CreateWorkflow persists a new workflow.

func (*Store) FailTask

func (s *Store) FailTask(taskID string, errMsg string, orgID ...string) error

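FailTask transitions a running task to failed. If retries remain, the task is re-queued as pending instead. The decision and the update happen in a single atomic SQL statement with a CASE expression, which avoids time-of-check/time-of-use (TOCTOU) races. orgID scopes the mutation to a specific org when provided.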
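The re-queue decision in FailTask can be illustrated in memory. In the sketch below a mutex stands in for the atomicity that the real method gets from SQLite, so the retry check and the state update cannot interleave with another caller. The memStore type, the failTask helper, and the choice to increment RetryCount on re-queue are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// task holds only the fields this sketch needs.
type task struct {
	State      string
	RetryCount int
	MaxRetries int
	Error      string
}

// memStore is a hypothetical in-memory stand-in for the SQLite-backed Store.
type memStore struct {
	mu    sync.Mutex
	tasks map[string]*task
}

var errInvalidTransition = errors.New("invalid state transition")

// failTask re-queues a running task as pending while retries remain and
// marks it failed otherwise. Check and update happen under one lock.
func (s *memStore) failTask(id, msg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	t, ok := s.tasks[id]
	if !ok || t.State != "running" {
		return errInvalidTransition
	}
	t.Error = msg
	if t.RetryCount < t.MaxRetries {
		t.RetryCount++
		t.State = "pending"
	} else {
		t.State = "failed"
	}
	return nil
}

func main() {
	s := &memStore{tasks: map[string]*task{
		"t1": {State: "running", MaxRetries: 1},
	}}
	_ = s.failTask("t1", "boom")
	fmt.Println(s.tasks["t1"].State) // one retry left, so the task is pending again
	s.tasks["t1"].State = "running"  // simulate a second claim and start
	_ = s.failTask("t1", "boom again")
	fmt.Println(s.tasks["t1"].State) // retries exhausted, so the task is failed
}
```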

func (*Store) GetNextPendingTask

func (s *Store) GetNextPendingTask(spaceID, agentID string, orgID ...string) (*Task, error)

GetNextPendingTask finds and atomically claims the highest-priority pending task in the given space whose dependencies are all completed. orgID scopes the query to a specific org when provided.

func (*Store) GetTask

func (s *Store) GetTask(id string, orgID ...string) (*Task, error)

GetTask retrieves a task by ID. If orgID is provided, verifies the task belongs to that org.

func (*Store) GetWorkflow

func (s *Store) GetWorkflow(id string, orgID ...string) (*Workflow, error)

GetWorkflow retrieves a workflow by ID. If orgID is provided, verifies the workflow belongs to that org.

func (*Store) ListTasks

func (s *Store) ListTasks(filters TaskFilters) ([]*Task, error)

ListTasks returns filtered tasks, sorted by priority DESC then created ASC.

func (*Store) ListWorkflows

func (s *Store) ListWorkflows(spaceID string, orgID ...string) ([]*Workflow, error)

ListWorkflows returns workflows for a space, optionally scoped to an org.

func (*Store) StartTask

func (s *Store) StartTask(taskID string, orgID ...string) error

StartTask transitions a claimed (or pending) task to running. orgID scopes the mutation to a specific org when provided.

func (*Store) UpdateProgress

func (s *Store) UpdateProgress(taskID string, progress int, orgID ...string) error

UpdateProgress sets progress (0-100) on a running task. orgID scopes the mutation to a specific org when provided.

func (*Store) UpdateTask

func (s *Store) UpdateTask(task *Task) error

UpdateTask patches mutable fields on an existing task.

func (*Store) UpdateWorkflowTasks

func (s *Store) UpdateWorkflowTasks(wf *Workflow) error

UpdateWorkflowTasks updates the task ID list on an existing workflow. When the workflow has an OrgID, the update is scoped to that org.

type Task

type Task struct {
	ID           string            `json:"id"`
	OrgID        string            `json:"org_id,omitempty"` // IAM org — maps to Temporal namespace
	SpaceID      string            `json:"space_id"`
	Title        string            `json:"title"`
	Description  string            `json:"description,omitempty"`
	State        TaskState         `json:"state"`
	Priority     TaskPriority      `json:"priority"`
	AssignedTo   string            `json:"assigned_to,omitempty"`
	CreatedBy    string            `json:"created_by,omitempty"`
	WorkflowID   string            `json:"workflow_id,omitempty"`
	ParentTaskID string            `json:"parent_task_id,omitempty"`
	DependsOn    []string          `json:"depends_on,omitempty"`
	Labels       []string          `json:"labels,omitempty"`
	Input        map[string]any    `json:"input,omitempty"`
	Output       map[string]any    `json:"output,omitempty"`
	Error        string            `json:"error,omitempty"`
	Progress     int               `json:"progress"`
	MaxRetries   int               `json:"max_retries"`
	RetryCount   int               `json:"retry_count"`
	Timeout      time.Duration     `json:"timeout,omitempty"`
	CreatedAt    time.Time         `json:"created_at"`
	UpdatedAt    time.Time         `json:"updated_at"`
	StartedAt    *time.Time        `json:"started_at,omitempty"`
	CompletedAt  *time.Time        `json:"completed_at,omitempty"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

Task is a durable work item.

func AgentTaskWorkflow

func AgentTaskWorkflow(ctx workflow.Context, task *Task) (*Task, error)

AgentTaskWorkflow is the primary durable workflow for executing a single task. It supports two modes:

  1. Auto-execute: if OnDurableExecute is configured, runs the activity immediately.
  2. Signal-driven: waits for external signals (claim/complete/fail) from handlers. This is the human-in-the-loop / agent-in-the-loop pattern.
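The signal-driven mode can be pictured as a loop that consumes signals until a terminal one arrives. The sketch below uses a plain Go channel in place of Temporal signal channels; the signal type and runSignalDriven function are hypothetical, the signal names match the Signal constants on this page, and the progress and update signals are ignored for brevity.

```go
package main

import "fmt"

// signal is a hypothetical stand-in for a named workflow signal with a payload.
type signal struct {
	Name string
	Data string
}

// runSignalDriven consumes signals until a terminal one arrives or the
// channel is closed, and returns the resulting state and any payload.
func runSignalDriven(signals <-chan signal) (state, output string) {
	state = "pending"
	for sig := range signals {
		switch sig.Name {
		case "claim":
			if state == "pending" {
				state = "claimed"
			}
		case "complete":
			return "completed", sig.Data
		case "fail":
			return "failed", sig.Data
		case "cancel":
			return "cancelled", ""
		}
	}
	return state, ""
}

func main() {
	signals := make(chan signal, 2)
	signals <- signal{Name: "claim"}
	signals <- signal{Name: "complete", Data: "done"}
	close(signals)
	state, out := runSignalDriven(signals)
	fmt.Println(state, out)
}
```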

func ExecuteTaskActivity

func ExecuteTaskActivity(ctx context.Context, task *Task) (*Task, error)

ExecuteTaskActivity is the activity implementation called by Temporal workflows.

type TaskFilters

type TaskFilters struct {
	OrgID      string        `json:"org_id,omitempty"`
	SpaceID    string        `json:"space_id,omitempty"`
	State      *TaskState    `json:"state,omitempty"`
	AssignedTo *string       `json:"assigned_to,omitempty"`
	Priority   *TaskPriority `json:"priority,omitempty"`
	Labels     []string      `json:"labels,omitempty"`
	WorkflowID *string       `json:"workflow_id,omitempty"`
	Limit      int           `json:"limit,omitempty"`
	Offset     int           `json:"offset,omitempty"`
}

TaskFilters controls listing and searching.

type TaskPriority

type TaskPriority int

TaskPriority controls scheduling order. Higher values are scheduled first.

const (
	PriorityLow    TaskPriority = 0
	PriorityNormal TaskPriority = 1
	PriorityHigh   TaskPriority = 2
	PriorityUrgent TaskPriority = 3
)

type TaskState

type TaskState string

TaskState represents the lifecycle state of a task.

const (
	TaskPending   TaskState = "pending"
	TaskClaimed   TaskState = "claimed"
	TaskRunning   TaskState = "running"
	TaskCompleted TaskState = "completed"
	TaskFailed    TaskState = "failed"
	TaskCancelled TaskState = "cancelled"
	TaskRetrying  TaskState = "retrying"
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker runs an embedded Temporal worker that polls a task queue and executes workflows and activities in-process.

func NewWorker

func NewWorker(c client.Client, queue string) *Worker

NewWorker creates a worker for a specific task queue. For multi-tenant deployments, each org/space can have its own queue.

func (*Worker) Start

func (tw *Worker) Start() error

Start begins polling the task queue.

func (*Worker) Stop

func (tw *Worker) Stop()

Stop gracefully shuts down the worker.

type Workflow

type Workflow struct {
	ID          string            `json:"id"`
	OrgID       string            `json:"org_id,omitempty"` // IAM org — maps to Temporal namespace
	SpaceID     string            `json:"space_id"`
	Name        string            `json:"name"`
	Description string            `json:"description,omitempty"`
	State       TaskState         `json:"state"`
	Tasks       []string          `json:"tasks"`
	CreatedBy   string            `json:"created_by,omitempty"`
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
	CompletedAt *time.Time        `json:"completed_at,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Workflow chains tasks into a DAG.

func FanOutWorkflow

func FanOutWorkflow(ctx workflow.Context, wf *Workflow, tasks []*Task) (*Workflow, error)

FanOutWorkflow runs tasks in parallel and waits for all to complete.

func PipelineWorkflow

func PipelineWorkflow(ctx workflow.Context, wf *Workflow, tasks []*Task) (*Workflow, error)

PipelineWorkflow runs tasks sequentially — each waits for the previous to complete.
