Documentation
¶
Overview ¶
Package tasks provides the Hanzo Tasks client for Go applications.
Drop-in replacement for Base's Tasks() / Cron():
// Before (Base cron):
e.App.Tasks().Add("settlement", "*/30 * * * * *", func() { ... })
// After (Hanzo Tasks):
tasks.Default().Add("settlement", "30s", func() { ... })
Add() accepts both Go duration strings ("30s", "5m", "1h", "24h") and standard 5-field cron expressions ("0 3 * * *", "0 0 5 1,4,7,10 *", "*/5 * * * *"). Anything that parses as a Go duration is treated as an interval; anything else is treated as a cron expression.
If TASKS_URL is set, schedules run as durable Hanzo Tasks workflows (retries, dead letter, audit trail). If not, runs locally via goroutine timer (dev mode, same behaviour as cron but no persistence).
If zapAddr is set, ZAP binary transport is preferred over HTTP for submitting tasks (lower latency, same semantics). HTTP is fallback.
Package tasks provides the Hanzo Tasks client for Go applications.
Two methods, two use cases:
client := tasks.New(os.Getenv("TASKS_URL"), os.Getenv("TASKS_ZAP"), nil)
client.Add("settlement.process", "30s", fn) // recurring schedule (duration)
client.Add("audit.archive", "0 3 * * *", fn) // recurring schedule (cron)
client.Now("webhook.deliver", payload) // fire once immediately
Transport priority: ZAP (binary, low-latency) > HTTP > local goroutine. When TASKS_ZAP is set, tasks submit over ZAP binary protocol. When TASKS_URL is set, tasks submit over HTTP as fallback. When neither is set, tasks run locally via goroutine timers (dev mode).
Integration with Hanzo Base:
app.Tasks().Add("cleanup", "1h", fn)
app.Tasks().Now("email.send", payload)
Index ¶
- Constants
- func SetDefault(c *Client)
- type BatchOperation
- type Client
- type Deployment
- type DeploymentBuild
- type EmbedConfig
- type Embedded
- type Event
- type ExecutionRef
- type Handler
- type Identity
- type Namespace
- type NamespaceCfg
- type NamespaceInfo
- type NexusEndpoint
- type Schedule
- type ScheduleAction
- type ScheduleInfo
- type ScheduleSpec
- type ScheduleState
- type TypeRef
- type WorkflowExecution
Constants ¶
const ( OpcodeTaskSubmit uint16 = 0x0050 // one-shot task OpcodeTaskSchedule uint16 = 0x0051 // recurring schedule )
ZAP opcodes for task submission.
Variables ¶
This section is empty.
Functions ¶
func SetDefault ¶ added in v1.9.4
func SetDefault(c *Client)
SetDefault installs the process-wide task client. main() should call this once during boot. Subsequent callers use Default() to dispatch tasks.
Types ¶
type BatchOperation ¶ added in v1.38.0
type BatchOperation struct {
BatchId string `json:"batchId"`
Namespace string `json:"namespace"`
Operation string `json:"operation"` // BATCH_OPERATION_TYPE_TERMINATE|CANCEL|SIGNAL|RESET
Reason string `json:"reason"`
Query string `json:"query"` // visibility query — "WorkflowType='X'"
State string `json:"state"` // BATCH_OPERATION_STATE_RUNNING|COMPLETED|FAILED
StartTime string `json:"startTime"`
CloseTime string `json:"closeTime,omitempty"`
TotalCount int64 `json:"totalOperationCount"`
DoneCount int64 `json:"completeOperationCount"`
}
BatchOperation — bulk terminate/cancel/signal across many executions.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages both one-shot tasks and recurring schedules.
func Default ¶ added in v1.9.4
func Default() *Client
Default returns the process-wide client. If SetDefault was never called, lazily creates a client from TASKS_URL / TASKS_ZAP env vars so callers that register schedules before main() finished wiring still work. main() may replace this with a handler-bound client via SetDefault.
func New ¶
New creates a Client. If both tasksURL and zapAddr are empty, everything runs locally. If zapAddr is set, ZAP transport is preferred. HTTP is fallback.
func (*Client) Add ¶
Add registers a recurring task.
spec is either a Go duration ("30s", "5m", "1h") or a standard 5-field cron expression ("0 3 * * *", "*/5 * * * *", "0 14 8 * *"). The fn runs on that cadence. If TASKS_URL is set, creates a durable Hanzo Tasks schedule so retries, dead-letter and audit are handled server-side. Otherwise runs locally.
tasks.Default().Add("settlement.process", "30s", func() { ... })
tasks.Default().Add("audit.archive", "0 3 * * *", func() { ... })
type Deployment ¶ added in v1.38.0
type Deployment struct {
SeriesName string `json:"seriesName"`
Namespace string `json:"namespace"`
BuildIDs []DeploymentBuild `json:"buildIds"`
DefaultBuildId string `json:"defaultBuildId"`
CreateTime string `json:"createTime"`
}
Deployment — a worker version series. Workers register a buildId; the engine routes workflow tasks based on default + ramping rules.
type DeploymentBuild ¶ added in v1.38.0
type EmbedConfig ¶ added in v1.33.1
type EmbedConfig struct {
DataDir string // "" → "./tasks-data" (reserved; memdb today)
ZAPPort int // 0 → 9999
Namespace string // "" → "default"
Logger *slog.Logger // nil → slog.Default()
}
EmbedConfig configures the in-process Tasks server.
type Embedded ¶ added in v1.33.1
type Embedded struct {
// contains filtered or unexported fields
}
Embedded is the handle to a running in-process Tasks server.
func Embed ¶ added in v1.33.1
func Embed(ctx context.Context, cfg EmbedConfig) (*Embedded, error)
Embed starts the Tasks server. Stop before exit.
func (*Embedded) EventsHandler ¶ added in v1.40.0
EventsHandler returns the SSE realtime stream of engine events.
func (*Embedded) HTTPHandler ¶ added in v1.37.2
HTTPHandler returns the browser-only JSON shim. Mirrors zapHandlers.
func (*Embedded) MCPHandler ¶ added in v1.40.0
MCPHandler returns the JSON-RPC 2.0 MCP endpoint.
type Event ¶ added in v1.40.0
type Event struct {
Kind string `json:"kind"` // workflow.started|workflow.canceled|workflow.terminated|workflow.signaled|schedule.created|schedule.paused|schedule.resumed|schedule.deleted|namespace.registered|batch.started
Namespace string `json:"namespace,omitempty"`
WorkflowID string `json:"workflow_id,omitempty"`
RunID string `json:"run_id,omitempty"`
ScheduleID string `json:"schedule_id,omitempty"`
BatchID string `json:"batch_id,omitempty"`
At string `json:"at"`
Data any `json:"data,omitempty"`
}
type ExecutionRef ¶ added in v1.38.0
type Identity ¶ added in v1.38.0
type Identity struct {
Email string `json:"email"`
Namespace string `json:"namespace"`
Role string `json:"role"` // owner|admin|developer|viewer
GrantTime string `json:"grantTime"`
}
Identity — who is allowed to operate on this namespace. Sourced from IAM (X-User-Email) when wired, manually managed today.
type Namespace ¶ added in v1.38.0
type Namespace struct {
NamespaceInfo NamespaceInfo `json:"namespaceInfo"`
Config NamespaceCfg `json:"config"`
IsActive bool `json:"isActive"`
}
type NamespaceCfg ¶ added in v1.38.0
type NamespaceInfo ¶ added in v1.38.0
type NamespaceInfo struct {
Name string `json:"name"`
State string `json:"state"` // NAMESPACE_STATE_REGISTERED|DEPRECATED|DELETED
Description string `json:"description,omitempty"`
OwnerEmail string `json:"ownerEmail,omitempty"`
Region string `json:"region,omitempty"` // e.g. "do-sfo3", informational
CreateTime string `json:"createTime,omitempty"`
}
type NexusEndpoint ¶ added in v1.38.0
type NexusEndpoint struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Description string `json:"description,omitempty"`
Target string `json:"target"` // ns2://other-namespace/handler
CreateTime string `json:"createTime"`
}
NexusEndpoint — cross-namespace operation bridge.
type Schedule ¶ added in v1.38.0
type Schedule struct {
ScheduleId string `json:"scheduleId"`
Namespace string `json:"namespace"`
Spec ScheduleSpec `json:"spec"`
Action ScheduleAction `json:"action"`
State ScheduleState `json:"state"`
Info ScheduleInfo `json:"info"`
}
type ScheduleAction ¶ added in v1.38.0
type ScheduleInfo ¶ added in v1.38.0
type ScheduleSpec ¶ added in v1.38.0
type ScheduleState ¶ added in v1.38.0
type WorkflowExecution ¶ added in v1.38.0
type WorkflowExecution struct {
Execution ExecutionRef `json:"execution"`
Type TypeRef `json:"type"`
StartTime string `json:"startTime,omitempty"`
CloseTime string `json:"closeTime,omitempty"`
Status string `json:"status"` // WORKFLOW_EXECUTION_STATUS_*
TaskQueue string `json:"taskQueue,omitempty"`
HistoryLen int64 `json:"historyLength,omitempty"`
Memo any `json:"memo,omitempty"`
SearchAttrs map[string]any `json:"searchAttrs,omitempty"`
Input any `json:"input,omitempty"`
Result any `json:"result,omitempty"`
}