Documentation
¶
Overview ¶
Package tasks implements a durable task execution plugin for Base.
It provides persistent task queues with DAG-based workflow scheduling, priority-based work-stealing, retry support, and timeout management, all backed by Base's embedded SQLite.
The plugin uses two internal collections:
- _tasks: individual work items with state machine lifecycle
- _workflows: ordered task DAGs (pipeline or fan-out)
Index ¶
- Constants
- Variables
- func CanTransition(from, to TaskState) bool
- func MustRegister(app core.App, config Config)
- func Register(app core.App, config Config) error
- func SetTaskExecutor(fn func(ctx context.Context, task *Task) (*Task, error))
- type Config
- type DurableConfig
- type DurableExecuteFunc
- type DurableStore
- func (ds *DurableStore) CancelTask(ctx context.Context, taskID string) error
- func (ds *DurableStore) Close()
- func (ds *DurableStore) GetTaskStatus(ctx context.Context, taskID string) (TaskState, string, error)
- func (ds *DurableStore) IsConnected() bool
- func (ds *DurableStore) SignalTask(ctx context.Context, taskID, signalName string, data interface{}) error
- func (ds *DurableStore) SubmitTask(ctx context.Context, task *Task) error
- func (ds *DurableStore) SubmitWorkflow(ctx context.Context, wf *Workflow, tasks []*Task, parallel bool) error
- type ExecuteFunc
- type Store
- func (s *Store) AdvanceWorkflows() error
- func (s *Store) AgentHasActiveTask(agentID string) (bool, error)
- func (s *Store) CancelTask(taskID string) error
- func (s *Store) CheckTimeouts() error
- func (s *Store) ClaimTask(taskID, agentID string) error
- func (s *Store) CompleteTask(taskID string, output map[string]any) error
- func (s *Store) CreateTask(task *Task) error
- func (s *Store) CreateWorkflow(wf *Workflow) error
- func (s *Store) FailTask(taskID string, errMsg string) error
- func (s *Store) GetNextPendingTask(spaceID, agentID string) (*Task, error)
- func (s *Store) GetTask(id string) (*Task, error)
- func (s *Store) GetWorkflow(id string) (*Workflow, error)
- func (s *Store) ListTasks(filters TaskFilters) ([]*Task, error)
- func (s *Store) ListWorkflows(spaceID string) ([]*Workflow, error)
- func (s *Store) StartTask(taskID string) error
- func (s *Store) UpdateProgress(taskID string, progress int) error
- func (s *Store) UpdateTask(task *Task) error
- func (s *Store) UpdateWorkflowTasks(wf *Workflow) error
- type Task
- type TaskFilters
- type TaskPriority
- type TaskState
- type Worker
- type Workflow
Constants ¶
const (
	// StoreKey is the app.Store() key where the task store is registered.
	StoreKey = "tasks.store"
	// DurableKey is the app.Store() key where the durable store is registered.
	DurableKey = "tasks.durable"
)
const (
	// TasksCollection is the internal collection for tasks.
	TasksCollection = "_tasks"
	// WorkflowsCollection is the internal collection for workflows.
	WorkflowsCollection = "_workflows"
)
Variables ¶
Functions ¶
func CanTransition ¶
CanTransition reports whether moving from one state to another is allowed.
func MustRegister ¶
MustRegister registers the tasks plugin and panics on failure.
Example — local only (SQLite scheduler):
tasks.MustRegister(app, tasks.Config{})
Example — with durable execution (tasks.hanzo.ai):
tasks.MustRegister(app, tasks.Config{
	Durable: tasks.DurableConfig{
		Enabled:   true,
		Address:   "tasks.hanzo.ai:7233",
		Namespace: "org-acme", // multi-tenant: org ID
	},
})
Example — with env vars:
TASKS_ENABLED=true TASKS_NAMESPACE=org-acme ./myapp serve
Types ¶
type Config ¶
type Config struct {
// OnExecute is called to run a task locally. If nil, tasks must be
// completed externally via the API (work-stealing pattern).
OnExecute ExecuteFunc
// OnDurableExecute is called by Temporal activity workers. If nil,
// durable tasks complete with an acknowledgment message.
OnDurableExecute DurableExecuteFunc
// Durable configures Temporal-backed durable execution.
// When enabled, newly created tasks are also submitted as Temporal workflows
// for crash-safe execution. SQLite remains the authoritative state.
Durable DurableConfig
// PollInterval controls scheduler tick frequency. Default 2s.
PollInterval time.Duration
// MaxConcurrent limits concurrent auto-executions. Default 10.
MaxConcurrent int
}
Config defines the tasks plugin configuration.
type DurableConfig ¶
type DurableConfig struct {
// Enabled activates durable execution. Default false (SQLite-only mode).
Enabled bool `json:"enabled" yaml:"enabled"`
// Address is the Temporal frontend address. Default "tasks.hanzo.ai:7233".
Address string `json:"address" yaml:"address"`
// Namespace is the Temporal namespace. For multi-tenant, use org ID.
// Default "default".
Namespace string `json:"namespace" yaml:"namespace"`
// DefaultQueue is the task queue name. Default "default".
DefaultQueue string `json:"default_queue" yaml:"default_queue"`
// RunWorker starts an embedded Temporal worker in this process.
// If false, tasks are submitted but executed by external workers.
// Default true when enabled.
RunWorker bool `json:"run_worker" yaml:"run_worker"`
}
DurableConfig holds connection settings for durable task execution. When enabled, tasks are submitted as Temporal workflows for crash-safe execution. It supports both local Temporal (localhost:7233) and the cloud service (tasks.hanzo.ai).
func DefaultDurableConfig ¶
func DefaultDurableConfig() DurableConfig
DefaultDurableConfig returns production defaults with env var overrides.
Env vars:
- TASKS_ENABLED (or HANZO_TASKS_ENABLED): "true" to enable
- TASKS_ADDRESS (or HANZO_TASKS_ADDRESS): Temporal frontend address
- TASKS_NAMESPACE (or HANZO_TASKS_NAMESPACE): Temporal namespace (org ID for multi-tenant)
- TASKS_QUEUE: default task queue name
- TASKS_WORKER: "false" to disable embedded worker
type DurableExecuteFunc ¶
DurableExecuteFunc is called by Temporal activities to execute a task.
type DurableStore ¶
DurableStore implements durable task execution via Temporal. It provides submit, cancel, signal, and status operations for workflows that survive process restarts.
func GetDurable ¶
func GetDurable(app core.App) *DurableStore
GetDurable retrieves the registered durable store from the app, or nil.
func NewDurableStore ¶
func NewDurableStore(addr, namespace string) (*DurableStore, error)
NewDurableStore connects to the Temporal service.
func (*DurableStore) CancelTask ¶
func (ds *DurableStore) CancelTask(ctx context.Context, taskID string) error
CancelTask cancels a running workflow.
func (*DurableStore) Close ¶
func (ds *DurableStore) Close()
Close shuts down the client connection.
func (*DurableStore) GetTaskStatus ¶
func (ds *DurableStore) GetTaskStatus(ctx context.Context, taskID string) (TaskState, string, error)
GetTaskStatus queries a running workflow for its current state.
func (*DurableStore) IsConnected ¶
func (ds *DurableStore) IsConnected() bool
IsConnected reports whether the durable store has an active connection.
func (*DurableStore) SignalTask ¶
func (ds *DurableStore) SignalTask(ctx context.Context, taskID, signalName string, data interface{}) error
SignalTask sends a signal to a running workflow.
func (*DurableStore) SubmitTask ¶
func (ds *DurableStore) SubmitTask(ctx context.Context, task *Task) error
SubmitTask starts a durable workflow execution for a task. The task queue defaults to task.SpaceID (org-as-namespace for multi-tenant).
func (*DurableStore) SubmitWorkflow ¶
func (ds *DurableStore) SubmitWorkflow(ctx context.Context, wf *Workflow, tasks []*Task, parallel bool) error
SubmitWorkflow starts a pipeline or fan-out workflow.
type ExecuteFunc ¶
ExecuteFunc is called by the local scheduler to execute a claimed task. Return the output map on success, or an error on failure.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store provides SQLite-backed task persistence using Base collections.
func (*Store) AdvanceWorkflows ¶
AdvanceWorkflows checks non-terminal workflows and updates their state based on constituent task states.
func (*Store) AgentHasActiveTask ¶
AgentHasActiveTask reports whether the agent has a claimed or running task.
func (*Store) CancelTask ¶
CancelTask transitions any non-terminal task to cancelled.
func (*Store) CheckTimeouts ¶
CheckTimeouts fails or retries tasks that have exceeded their timeout.
func (*Store) CompleteTask ¶
CompleteTask transitions a running task to completed with output.
func (*Store) CreateTask ¶
CreateTask persists a new task. ID is auto-generated if empty.
func (*Store) CreateWorkflow ¶
CreateWorkflow persists a new workflow.
func (*Store) FailTask ¶
FailTask transitions a running task to failed. If retries remain, it re-queues the task as pending.
func (*Store) GetNextPendingTask ¶
GetNextPendingTask finds and atomically claims the highest-priority pending task in the given space whose dependencies are all completed.
func (*Store) GetWorkflow ¶
GetWorkflow retrieves a workflow by ID.
func (*Store) ListTasks ¶
func (s *Store) ListTasks(filters TaskFilters) ([]*Task, error)
ListTasks returns filtered tasks, sorted by priority (descending) and then by creation time (ascending).
func (*Store) ListWorkflows ¶
ListWorkflows returns workflows for a space.
func (*Store) UpdateProgress ¶
UpdateProgress sets progress (0-100) on a running task.
func (*Store) UpdateTask ¶
UpdateTask patches mutable fields on an existing task.
func (*Store) UpdateWorkflowTasks ¶
UpdateWorkflowTasks updates the task ID list on an existing workflow.
type Task ¶
type Task struct {
ID string `json:"id"`
SpaceID string `json:"space_id"`
Title string `json:"title"`
Description string `json:"description,omitempty"`
State TaskState `json:"state"`
Priority TaskPriority `json:"priority"`
AssignedTo string `json:"assigned_to,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
WorkflowID string `json:"workflow_id,omitempty"`
ParentTaskID string `json:"parent_task_id,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
Labels []string `json:"labels,omitempty"`
Input map[string]any `json:"input,omitempty"`
Output map[string]any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Progress int `json:"progress"`
MaxRetries int `json:"max_retries"`
RetryCount int `json:"retry_count"`
Timeout time.Duration `json:"timeout,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Task is a durable work item.
func AgentTaskWorkflow ¶
AgentTaskWorkflow is the primary durable workflow for executing a single task. It survives process crashes and restarts via Temporal.
type TaskFilters ¶
type TaskFilters struct {
SpaceID string `json:"space_id,omitempty"`
State *TaskState `json:"state,omitempty"`
AssignedTo *string `json:"assigned_to,omitempty"`
Priority *TaskPriority `json:"priority,omitempty"`
Labels []string `json:"labels,omitempty"`
WorkflowID *string `json:"workflow_id,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
TaskFilters controls listing and searching.
type TaskPriority ¶
type TaskPriority int
TaskPriority controls scheduling order. Higher values are scheduled first.
const (
	PriorityLow    TaskPriority = 0
	PriorityNormal TaskPriority = 1
	PriorityHigh   TaskPriority = 2
	PriorityUrgent TaskPriority = 3
)
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker runs an embedded Temporal worker that polls a task queue and executes workflows and activities in-process.
type Workflow ¶
type Workflow struct {
ID string `json:"id"`
SpaceID string `json:"space_id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
State TaskState `json:"state"`
Tasks []string `json:"tasks"`
CreatedBy string `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Workflow chains tasks into a DAG.
func FanOutWorkflow ¶
FanOutWorkflow runs tasks in parallel and waits for all to complete.