tasks

package
v1.43.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package tasks provides the Hanzo Tasks client for Go applications.

Drop-in replacement for Base's Tasks() / Cron():

// Before (Base cron):
e.App.Tasks().Add("settlement", "*/30 * * * * *", func() { ... })

// After (Hanzo Tasks):
tasks.Default().Add("settlement", "30s", func() { ... })

Add() accepts both Go duration strings ("30s", "5m", "1h", "24h") and standard 5-field cron expressions ("0 3 * * *", "0 0 5 1,4,7,10 *", "*/5 * * * *"). Anything that parses as a Go duration is treated as an interval; anything else is treated as a cron expression.
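
The parsing rule above can be sketched with the standard library alone. This is an illustrative sketch, not the package's implementation: specKind is a hypothetical helper, and both the five-field count check and the rejection of non-positive durations are assumptions.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// specKind reports how a schedule spec would be read under the documented
// rule: "interval" if it parses as a Go duration, otherwise "cron".
// The returned duration is only meaningful for intervals.
func specKind(spec string) (kind string, every time.Duration, err error) {
	if d, perr := time.ParseDuration(spec); perr == nil {
		// Assumption: a zero or negative interval is rejected.
		if d <= 0 {
			return "", 0, fmt.Errorf("interval must be positive: %q", spec)
		}
		return "interval", d, nil
	}
	// Assumption: a cron spec is accepted only if it has exactly five
	// whitespace-separated fields. Field contents are not validated here.
	if len(strings.Fields(spec)) != 5 {
		return "", 0, fmt.Errorf("not a duration or 5-field cron expression: %q", spec)
	}
	return "cron", 0, nil
}

func main() {
	for _, s := range []string{"30s", "24h", "0 3 * * *", "*/5 * * * *", "every day"} {
		kind, every, err := specKind(s)
		fmt.Println(s, "|", kind, "|", every, "|", err)
	}
}
```

Trying the duration parse first keeps the rule unambiguous, because no 5-field cron expression is a valid Go duration string.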

If TASKS_URL is set, schedules run as durable Hanzo Tasks workflows (retries, dead letter, audit trail). If it is not set, schedules run locally on a goroutine timer (dev mode: same behaviour as cron, but no persistence).
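
For readers unfamiliar with the local fallback, a goroutine timer can be sketched as follows. The names runEvery and countTicks are hypothetical and the package's own scheduler may differ; the sketch only shows why nothing survives a process restart.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn once per interval on its own goroutine until the returned
// stop function is called. Nothing is persisted: if the process exits, the
// schedule is gone.
func runEvery(every time.Duration, fn func()) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		t := time.NewTicker(every)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				fn()
			}
		}
	}()
	return func() {
		cancel()
		<-done // wait for the goroutine to exit
	}
}

// countTicks runs a 5ms schedule until fn has fired n times, then stops it
// and returns the number of ticks it received.
func countTicks(n int) int {
	ticks := make(chan struct{}, n)
	stop := runEvery(5*time.Millisecond, func() {
		select {
		case ticks <- struct{}{}:
		default: // drop the tick if the buffer is full
		}
	})
	got := 0
	for got < n {
		<-ticks
		got++
	}
	stop()
	return got
}

func main() {
	fmt.Println("ticks received:", countTicks(3))
}
```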

If zapAddr is set, the ZAP binary transport is preferred over HTTP for submitting tasks (lower latency, same semantics). HTTP is the fallback.

Two methods, two use cases:

client := tasks.New(os.Getenv("TASKS_URL"), os.Getenv("TASKS_ZAP"), nil)
client.Add("settlement.process", "30s", fn)   // recurring schedule (duration)
client.Add("audit.archive", "0 3 * * *", fn)  // recurring schedule (cron)
client.Now("webhook.deliver", payload)         // fire once immediately

Transport priority: ZAP (binary, low-latency) > HTTP > local goroutine. When TASKS_ZAP is set, tasks are submitted over the ZAP binary protocol. When TASKS_URL is set, HTTP serves as the fallback. When neither is set, tasks run locally via goroutine timers (dev mode).
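
The priority order can be written as a small decision function. This is a sketch only: pickTransport is a hypothetical helper, and it does not model falling back to HTTP when a ZAP connection fails at runtime.

```go
package main

import (
	"fmt"
	"os"
)

// pickTransport mirrors the documented priority: ZAP > HTTP > local.
// The returned labels are illustrative, not identifiers from the package.
func pickTransport(tasksURL, zapAddr string) string {
	switch {
	case zapAddr != "":
		return "zap"
	case tasksURL != "":
		return "http"
	default:
		return "local"
	}
}

func main() {
	// Reads the same environment variables the overview mentions.
	fmt.Println(pickTransport(os.Getenv("TASKS_URL"), os.Getenv("TASKS_ZAP")))
}
```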

Integration with Hanzo Base:

app.Tasks().Add("cleanup", "1h", fn)
app.Tasks().Now("email.send", payload)

Constants

const (
	OpcodeTaskSubmit   uint16 = 0x0050 // one-shot task
	OpcodeTaskSchedule uint16 = 0x0051 // recurring schedule
)

ZAP opcodes for task submission.

const (
	// Worker → server (Call). Existing 0x00A2..0x00A5 declared in
	// pkg/sdk/client/transport.go remain authoritative.
	OpcodeSubscribeWorkflowTasks uint16 = 0x00A0
	OpcodeSubscribeActivityTasks uint16 = 0x00A1
	OpcodeUnsubscribeTasks       uint16 = 0x00A6

	// Server → worker (Send).
	OpcodeDeliverWorkflowTask   uint16 = 0x00B0
	OpcodeDeliverActivityTask   uint16 = 0x00B1
	OpcodeDeliverActivityResult uint16 = 0x00B2
	OpcodeDeliverCancelRequest  uint16 = 0x00B3
	OpcodeDeliverQuery          uint16 = 0x00B4

	// Worker → server (Call). Query response.
	OpcodeRespondQuery uint16 = 0x00C4
)

Variables

var ErrNoWorkersSubscribed = fmt.Errorf("no workers subscribed to task queue")

ErrNoWorkersSubscribed is returned by QueryWorkflow when no worker is subscribed to the workflow's task queue. Callers must surface this as a 503-class condition; there is no engine-side fallback.
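
One way a caller might surface the condition as a 503 is sketched below. The variable errNoWorkersSubscribed is a local stand-in for tasks.ErrNoWorkersSubscribed so the example compiles on its own, and statusFor is a hypothetical helper. Whether QueryWorkflow wraps the error is not stated here, so the sketch uses errors.Is, which handles both cases.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// errNoWorkersSubscribed is a local stand-in for tasks.ErrNoWorkersSubscribed.
var errNoWorkersSubscribed = errors.New("no workers subscribed to task queue")

// statusFor maps a query error to an HTTP status code. Only the 503 mapping
// comes from the documentation; the other two cases are illustrative.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errNoWorkersSubscribed):
		return http.StatusServiceUnavailable
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("query workflow: %w", errNoWorkersSubscribed)
	fmt.Println(statusFor(nil), statusFor(wrapped), statusFor(errors.New("boom")))
}
```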

Functions

func SetDefault added in v1.9.4

func SetDefault(c *Client)

SetDefault installs the process-wide task client. main() should call this once during boot. Subsequent callers use Default() to dispatch tasks.

Types

type BatchOperation added in v1.38.0

type BatchOperation struct {
	BatchId    string `json:"batchId"`
	Namespace  string `json:"namespace"`
	Operation  string `json:"operation"` // BATCH_OPERATION_TYPE_TERMINATE|CANCEL|SIGNAL|RESET
	Reason     string `json:"reason"`
	Query      string `json:"query"` // visibility query — "WorkflowType='X'"
	State      string `json:"state"` // BATCH_OPERATION_STATE_RUNNING|COMPLETED|FAILED
	StartTime  string `json:"startTime"`
	CloseTime  string `json:"closeTime,omitempty"`
	TotalCount int64  `json:"totalOperationCount"`
	DoneCount  int64  `json:"completeOperationCount"`
}

BatchOperation — bulk terminate/cancel/signal across many executions.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages both one-shot tasks and recurring schedules.

func Default added in v1.9.4

func Default() *Client

Default returns the process-wide client. If SetDefault was never called, it lazily creates a client from the TASKS_URL / TASKS_ZAP env vars, so callers that register schedules before main() has finished wiring still work. main() may replace this with a handler-bound client via SetDefault.
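
The lazy-default pattern described here can be sketched with a mutex-guarded package variable. Everything below is a stand-in: client, setDefault, getDefault and the example URL are hypothetical, and the real package may synchronise differently.

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// client is a hypothetical stand-in for tasks.Client.
type client struct{ tasksURL, zapAddr string }

var (
	mu  sync.Mutex
	def *client
)

// setDefault installs the process-wide client, replacing any lazily created one.
func setDefault(c *client) {
	mu.Lock()
	defer mu.Unlock()
	def = c
}

// getDefault returns the process-wide client, creating one from the
// environment on first use if setDefault has not been called yet.
func getDefault() *client {
	mu.Lock()
	defer mu.Unlock()
	if def == nil {
		def = &client{tasksURL: os.Getenv("TASKS_URL"), zapAddr: os.Getenv("TASKS_ZAP")}
	}
	return def
}

func main() {
	early := getDefault() // e.g. called from an init() before main wires things up
	setDefault(&client{tasksURL: "http://tasks.example.invalid"}) // hypothetical URL
	fmt.Println("replaced:", early != getDefault())
}
```

The text above does not say whether schedules registered on the lazily created client carry over once SetDefault replaces it, so the sketch leaves that question open.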

func New

func New(tasksURL, zapAddr string, handler Handler) *Client

New creates a Client. If both tasksURL and zapAddr are empty, everything runs locally. If zapAddr is set, ZAP transport is preferred and HTTP is the fallback.

func (*Client) Add

func (c *Client) Add(name, spec string, fn func()) error

Add registers a recurring task.

spec is either a Go duration ("30s", "5m", "1h") or a standard 5-field cron expression ("0 3 * * *", "*/5 * * * *", "0 14 8 * *"). fn runs on that cadence. If TASKS_URL is set, Add creates a durable Hanzo Tasks schedule, so retries, dead-lettering, and audit are handled server-side. Otherwise fn runs locally.

tasks.Default().Add("settlement.process", "30s", func() { ... })
tasks.Default().Add("audit.archive", "0 3 * * *", func() { ... })

func (*Client) Now

func (c *Client) Now(taskType string, payload map[string]any) error

Now submits a one-shot task for durable execution. It prefers ZAP transport when zapAddr is configured and falls back to HTTP.

func (*Client) Stop

func (c *Client) Stop()

Stop cancels all local schedules and closes the ZAP connection. Call it on shutdown.

type Deployment added in v1.38.0

type Deployment struct {
	Name           string              `json:"name"`
	Namespace      string              `json:"namespace"`
	Description    string              `json:"description,omitempty"`
	OwnerEmail     string              `json:"ownerEmail,omitempty"`
	DefaultCompute string              `json:"defaultCompute,omitempty"`
	Versions       []DeploymentVersion `json:"versions"`
	DefaultBuildId string              `json:"defaultBuildId"`
	CreateTime     string              `json:"createTime"`
	UpdateTime     string              `json:"updateTime,omitempty"`
}

Deployment — a worker version series. Workers register a buildId; the engine routes workflow tasks based on default + ramping rules.

type DeploymentPatch added in v1.43.0

type DeploymentPatch struct {
	Description    *string `json:"description,omitempty"`
	OwnerEmail     *string `json:"ownerEmail,omitempty"`
	DefaultCompute *string `json:"defaultCompute,omitempty"`
}

DeploymentPatch carries the optional fields of an UpdateDeployment. Empty strings are ignored; non-nil maps fully replace existing data.

type DeploymentVersion added in v1.43.0

type DeploymentVersion struct {
	BuildId     string            `json:"buildId"`
	State       string            `json:"state"` // DEPLOYMENT_STATE_DRAFT|CURRENT|RAMPING|RETIRED
	Description string            `json:"description,omitempty"`
	Compute     string            `json:"compute,omitempty"`
	Image       string            `json:"image,omitempty"`
	Env         map[string]string `json:"env,omitempty"`
	CreateTime  string            `json:"createTime"`
	UpdateTime  string            `json:"updateTime,omitempty"`
}

type DeploymentVersionPatch added in v1.43.0

type DeploymentVersionPatch struct {
	Description *string           `json:"description,omitempty"`
	Compute     *string           `json:"compute,omitempty"`
	Image       *string           `json:"image,omitempty"`
	Env         map[string]string `json:"env,omitempty"`
}

DeploymentVersionPatch is metadata-only — buildId and CreateTime are immutable.

type EmbedConfig added in v1.33.1

type EmbedConfig struct {
	DataDir   string       // "" → "./tasks-data" (reserved; memdb today)
	ZAPPort   int          // 0 → 9999
	Namespace string       // "" → "default"
	Logger    *slog.Logger // nil → slog.Default()
	// JWTValidator validates the auth_token field on every ZAP request.
	// nil = no ZAP-side validation (dev / embedded). When non-nil and
	// RequireIdentity=true, every ZAP request must carry an auth_token
	// that validates against IAM; per-request engine is WithOrg-scoped
	// to claims.Owner. This mirrors the HTTP middleware trust boundary.
	JWTValidator    *auth.Validator
	RequireIdentity bool
	// Replicator wires consensus-replication for every shard. nil →
	// LocalReplicator (single-node passthrough). cmd/tasksd builds a
	// QuasarReplicator when --replicator=quasar is set.
	Replicator replication.Replicator
	// Router selects the (org, ns, taskQueue) leader. nil → solo
	// router that returns the local node for every key.
	Router routing.Router
	// NodeID is this process's stable identifier. "" → "tasks-embed".
	NodeID string
}

EmbedConfig configures the in-process Tasks server.
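
The zero-value defaults listed in the field comments can be summarised in code. This is a sketch under assumptions: embedConfig copies only the four fields whose defaults are documented as plain values, and withDefaults is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// embedConfig copies only the EmbedConfig fields whose zero-value defaults
// are documented as plain values in the struct comments.
type embedConfig struct {
	DataDir   string
	ZAPPort   int
	Namespace string
	NodeID    string
}

// withDefaults returns cfg with the documented defaults filled in.
func withDefaults(cfg embedConfig) embedConfig {
	if cfg.DataDir == "" {
		cfg.DataDir = "./tasks-data"
	}
	if cfg.ZAPPort == 0 {
		cfg.ZAPPort = 9999
	}
	if cfg.Namespace == "" {
		cfg.Namespace = "default"
	}
	if cfg.NodeID == "" {
		cfg.NodeID = "tasks-embed"
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withDefaults(embedConfig{}))
	fmt.Printf("%+v\n", withDefaults(embedConfig{ZAPPort: 7000, Namespace: "billing"}))
}
```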

type Embedded added in v1.33.1

type Embedded struct {
	// contains filtered or unexported fields
}

Embedded is the handle to a running in-process Tasks server.

func Embed added in v1.33.1

func Embed(ctx context.Context, cfg EmbedConfig) (*Embedded, error)

Embed starts the Tasks server. Call Stop before exit.

func (*Embedded) ClusterHandler added in v1.43.0

func (e *Embedded) ClusterHandler() http.Handler

ClusterHandler exposes /v1/tasks/cluster, /v1/tasks/cluster/health, /v1/tasks/namespaces/{ns}/migrate. Probe routes are unauthenticated; migrate accepts the same X-Org-Id-scoped identity as everything else.

func (*Embedded) EventsHandler added in v1.40.0

func (e *Embedded) EventsHandler() http.Handler

EventsHandler returns the SSE realtime stream of engine events.

func (*Embedded) HTTPHandler added in v1.37.2

func (e *Embedded) HTTPHandler() http.Handler

HTTPHandler returns the browser-only JSON shim. Mirrors zapHandlers. Per-request engine is scoped to the X-Org-Id minted by pkg/auth from the validated IAM JWT (Authorization: Bearer). Client-supplied identity headers are stripped before the handler runs. Empty org → legacy unscoped store (embedded/dev path only).
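
The trust boundary described here (strip client-supplied identity, then set X-Org-Id from a validated token) can be illustrated with a plain net/http middleware. This is a sketch, not the package's middleware: validate, withIdentity and orgSeen are hypothetical, the token table stands in for IAM JWT validation, and rejecting an invalid token with 401 is an assumption.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// validate is a hypothetical stand-in for IAM JWT validation.
func validate(token string) (org string, ok bool) {
	orgs := map[string]string{"token-acme": "acme"}
	org, ok = orgs[token]
	return org, ok
}

// withIdentity strips client-supplied identity headers, then sets X-Org-Id
// from the validated bearer token, if one was sent.
func withIdentity(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Never trust identity headers sent by the client.
		r.Header.Del("X-Org-Id")
		r.Header.Del("X-User-Email")

		if authz := r.Header.Get("Authorization"); authz != "" {
			org, ok := validate(strings.TrimPrefix(authz, "Bearer "))
			if !ok {
				// Assumption: an invalid token is rejected outright.
				http.Error(w, "invalid token", http.StatusUnauthorized)
				return
			}
			r.Header.Set("X-Org-Id", org)
		}
		next.ServeHTTP(w, r) // empty org means the unscoped dev path
	})
}

// orgSeen sends one request through withIdentity and returns the status code
// plus the X-Org-Id value the wrapped handler observed.
func orgSeen(authorization, spoofedOrg string) (int, string) {
	seen := ""
	h := withIdentity(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen = r.Header.Get("X-Org-Id")
	}))
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	if authorization != "" {
		req.Header.Set("Authorization", authorization)
	}
	if spoofedOrg != "" {
		req.Header.Set("X-Org-Id", spoofedOrg)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, seen
}

func main() {
	fmt.Println(orgSeen("Bearer token-acme", "evil-corp"))
	fmt.Println(orgSeen("", "evil-corp"))
	fmt.Println(orgSeen("Bearer forged", ""))
}
```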

func (*Embedded) MCPHandler added in v1.40.0

func (e *Embedded) MCPHandler() http.Handler

MCPHandler returns the JSON-RPC 2.0 MCP endpoint.

func (*Embedded) RegisterWorker added in v1.43.0

func (e *Embedded) RegisterWorker(w Worker)

RegisterWorker upserts a worker into the in-memory registry. Workers self-register on first poll/heartbeat. Real wiring will move to the dispatcher Subscribe path; this is the public surface the UI reads.

func (*Embedded) Stop added in v1.33.1

func (e *Embedded) Stop(ctx context.Context) error

Stop shuts the server down. Idempotent.

func (*Embedded) ZAPPort added in v1.33.1

func (e *Embedded) ZAPPort() int

ZAPPort returns the bound ZAP port.

type Event added in v1.40.0

type Event struct {
	Kind       string `json:"kind"`             // workflow.started|workflow.canceled|workflow.terminated|workflow.signaled|schedule.created|schedule.paused|schedule.resumed|schedule.deleted|namespace.registered|batch.started
	OrgID      string `json:"org_id,omitempty"` // tenant scope; "" = unscoped (embedded/dev)
	Namespace  string `json:"namespace,omitempty"`
	WorkflowID string `json:"workflow_id,omitempty"`
	RunID      string `json:"run_id,omitempty"`
	ScheduleID string `json:"schedule_id,omitempty"`
	BatchID    string `json:"batch_id,omitempty"`
	At         string `json:"at"`
	Data       any    `json:"data,omitempty"`
}

type ExecutionRef added in v1.38.0

type ExecutionRef struct {
	WorkflowId string `json:"workflowId"`
	RunId      string `json:"runId"`
}

type Handler

type Handler func(taskType string, payload map[string]any)

Handler processes a one-shot task (webhook delivery, settlement, etc.)

type HistoryEvent added in v1.43.0

type HistoryEvent struct {
	EventId    int64          `json:"eventId"`
	EventTime  string         `json:"eventTime"`
	EventType  string         `json:"eventType"`
	Attributes map[string]any `json:"attributes,omitempty"`
}

HistoryEvent — a single durable record in a workflow execution's history. Modeled on Temporal's HistoryEvent but JSON-native and owned by Hanzo. EventId is monotonic per (namespace, workflowId, runId); EventType matches the WORKFLOW_EXECUTION_* / WORKFLOW_TASK_* / ACTIVITY_TASK_* / TIMER_* family.
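
A consumer that relies on the EventId ordering might verify it before replaying a history. The sketch below reads "monotonic" as strictly increasing, which is an assumption. historyEvent is a trimmed local copy of the type, firstOutOfOrder is a hypothetical helper, and the event type strings are illustrative members of the families named above.

```go
package main

import "fmt"

// historyEvent is a trimmed local copy of HistoryEvent for illustration.
type historyEvent struct {
	EventId   int64
	EventType string
}

// firstOutOfOrder returns the index of the first event whose EventId is not
// strictly greater than its predecessor's, or -1 if the slice is in order.
func firstOutOfOrder(events []historyEvent) int {
	for i := 1; i < len(events); i++ {
		if events[i].EventId <= events[i-1].EventId {
			return i
		}
	}
	return -1
}

func main() {
	history := []historyEvent{
		{EventId: 1, EventType: "WORKFLOW_EXECUTION_STARTED"},
		{EventId: 2, EventType: "WORKFLOW_TASK_SCHEDULED"},
		{EventId: 2, EventType: "WORKFLOW_TASK_STARTED"}, // duplicate id
	}
	fmt.Println("first out-of-order index:", firstOutOfOrder(history))
}
```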

type Identity added in v1.38.0

type Identity struct {
	Email     string `json:"email"`
	Namespace string `json:"namespace"`
	Role      string `json:"role"` // owner|admin|developer|viewer
	GrantTime string `json:"grantTime"`
}

Identity — who is allowed to operate on this namespace. Sourced from IAM (X-User-Email) when wired, manually managed today.

type Namespace added in v1.38.0

type Namespace struct {
	NamespaceInfo NamespaceInfo `json:"namespaceInfo"`
	Config        NamespaceCfg  `json:"config"`
	IsActive      bool          `json:"isActive"`
}

type NamespaceCfg added in v1.38.0

type NamespaceCfg struct {
	WorkflowExecutionRetentionTtl string            `json:"workflowExecutionRetentionTtl"` // "720h"
	APSLimit                      int               `json:"apsLimit"`                      // actions per second
	HistoryArchivalState          string            `json:"historyArchivalState,omitempty"`
	HistoryArchivalUri            string            `json:"historyArchivalUri,omitempty"`
	VisibilityArchivalState       string            `json:"visibilityArchivalState,omitempty"`
	VisibilityArchivalUri         string            `json:"visibilityArchivalUri,omitempty"`
	CustomData                    map[string]string `json:"customData,omitempty"`
}

type NamespaceInfo added in v1.38.0

type NamespaceInfo struct {
	Name        string `json:"name"`
	State       string `json:"state"` // NAMESPACE_STATE_REGISTERED|DEPRECATED|DELETED
	Description string `json:"description,omitempty"`
	OwnerEmail  string `json:"ownerEmail,omitempty"`
	Region      string `json:"region,omitempty"` // e.g. "do-sfo3", informational
	CreateTime  string `json:"createTime,omitempty"`
}

type NamespaceMetadataPatch added in v1.43.0

type NamespaceMetadataPatch struct {
	Description             *string           `json:"description,omitempty"`
	OwnerEmail              *string           `json:"ownerEmail,omitempty"`
	Retention               *string           `json:"retention,omitempty"`
	HistoryArchivalState    *string           `json:"historyArchivalState,omitempty"`
	HistoryArchivalUri      *string           `json:"historyArchivalUri,omitempty"`
	VisibilityArchivalState *string           `json:"visibilityArchivalState,omitempty"`
	VisibilityArchivalUri   *string           `json:"visibilityArchivalUri,omitempty"`
	CustomData              map[string]string `json:"customData,omitempty"`
}

NamespaceMetadataPatch carries the fields a metadata update may set. Empty string fields are ignored; non-nil maps fully replace.
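
The patch semantics can be made concrete with a small merge function. This is a sketch under assumptions: namespaceMeta and metaPatch are trimmed stand-ins for the real types, applyPatch and strPtr are hypothetical helpers, and treating both a nil pointer and a pointer to "" as "leave unchanged" is one reading of the sentence above.

```go
package main

import "fmt"

// namespaceMeta and metaPatch are trimmed stand-ins for the real types.
type namespaceMeta struct {
	Description string
	OwnerEmail  string
	CustomData  map[string]string
}

type metaPatch struct {
	Description *string
	OwnerEmail  *string
	CustomData  map[string]string
}

// applyPatch returns cur with the patch applied: nil or empty strings are
// ignored, and a non-nil map replaces the existing map instead of merging.
func applyPatch(cur namespaceMeta, p metaPatch) namespaceMeta {
	if p.Description != nil && *p.Description != "" {
		cur.Description = *p.Description
	}
	if p.OwnerEmail != nil && *p.OwnerEmail != "" {
		cur.OwnerEmail = *p.OwnerEmail
	}
	if p.CustomData != nil {
		// Copy so the result does not alias the caller's map.
		cur.CustomData = make(map[string]string, len(p.CustomData))
		for k, v := range p.CustomData {
			cur.CustomData[k] = v
		}
	}
	return cur
}

func strPtr(s string) *string { return &s }

func main() {
	cur := namespaceMeta{
		Description: "old",
		OwnerEmail:  "ops@example.com",
		CustomData:  map[string]string{"team": "payments", "tier": "gold"},
	}
	out := applyPatch(cur, metaPatch{
		Description: strPtr("new"),
		OwnerEmail:  strPtr(""), // ignored
		CustomData:  map[string]string{"team": "risk"},
	})
	fmt.Printf("%+v\n", out)
}
```

Because the map is replaced rather than merged, a caller that wants to keep existing keys has to send them again in the patch.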

type NexusEndpoint added in v1.38.0

type NexusEndpoint struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace"`
	Description string `json:"description,omitempty"`
	Target      string `json:"target"` // ns2://other-namespace/handler
	CreateTime  string `json:"createTime"`
}

NexusEndpoint — cross-namespace operation bridge.

type RetryPolicy added in v1.43.0

type RetryPolicy struct {
	InitialInterval        string   `json:"initialInterval,omitempty"`
	BackoffCoefficient     float64  `json:"backoffCoefficient,omitempty"`
	MaximumInterval        string   `json:"maximumInterval,omitempty"`
	MaximumAttempts        int      `json:"maximumAttempts,omitempty"`
	NonRetryableErrorTypes []string `json:"nonRetryableErrorTypes,omitempty"`
}

RetryPolicy — schedule retry knobs for an activity.
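
How these fields combine is not spelled out here. The sketch below assumes the common exponential-backoff convention: the wait before retry n is InitialInterval times BackoffCoefficient to the power n-1, capped at MaximumInterval. It also assumes the interval strings are Go durations and that MaximumAttempts counts the first try. retryPolicy is a trimmed local copy and delayBefore is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// retryPolicy is a trimmed local copy of RetryPolicy for illustration.
type retryPolicy struct {
	InitialInterval    string
	BackoffCoefficient float64
	MaximumInterval    string
	MaximumAttempts    int
}

// delayBefore returns the wait before retry number `retry` (1 = first retry)
// under the assumed exponential-backoff convention.
func delayBefore(p retryPolicy, retry int) (time.Duration, error) {
	initial, err := time.ParseDuration(p.InitialInterval)
	if err != nil {
		return 0, fmt.Errorf("initialInterval: %w", err)
	}
	if retry < 1 {
		retry = 1
	}
	coeff := p.BackoffCoefficient
	if coeff < 1 {
		coeff = 1 // assumption: an unset coefficient means no growth
	}
	wait := float64(initial) * math.Pow(coeff, float64(retry-1))
	if p.MaximumInterval != "" {
		ceiling, err := time.ParseDuration(p.MaximumInterval)
		if err != nil {
			return 0, fmt.Errorf("maximumInterval: %w", err)
		}
		if wait > float64(ceiling) {
			return ceiling, nil
		}
	}
	return time.Duration(wait), nil
}

func main() {
	p := retryPolicy{InitialInterval: "1s", BackoffCoefficient: 2, MaximumInterval: "10s", MaximumAttempts: 6}
	// Assumption: MaximumAttempts includes the first try, so there are
	// MaximumAttempts-1 retries.
	for retry := 1; retry < p.MaximumAttempts; retry++ {
		d, err := delayBefore(p, retry)
		fmt.Println("retry", retry, "wait", d, err)
	}
}
```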

type Schedule added in v1.38.0

type Schedule struct {
	ScheduleId string         `json:"scheduleId"`
	Namespace  string         `json:"namespace"`
	Spec       ScheduleSpec   `json:"spec"`
	Action     ScheduleAction `json:"action"`
	State      ScheduleState  `json:"state"`
	Info       ScheduleInfo   `json:"info"`
}

type ScheduleAction added in v1.38.0

type ScheduleAction struct {
	WorkflowType TypeRef `json:"workflowType"`
	TaskQueue    string  `json:"taskQueue"`
	Input        any     `json:"input,omitempty"`
}

type ScheduleInfo added in v1.38.0

type ScheduleInfo struct {
	CreateTime     string `json:"createTime"`
	UpdateTime     string `json:"updateTime,omitempty"`
	ActionCount    int64  `json:"actionCount"`
	NextActionTime string `json:"nextActionTime,omitempty"`
}

type ScheduleSpec added in v1.38.0

type ScheduleSpec struct {
	CronString []string `json:"cronString,omitempty"`
	Interval   []struct {
		Interval string `json:"interval"`
		Phase    string `json:"phase,omitempty"`
	} `json:"interval,omitempty"`
}
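
The JSON shape implied by the struct tags can be checked with a round trip through encoding/json. In this sketch scheduleSpec is a local copy of the type above and roundTrip is a hypothetical helper. Reading the interval values as Go duration strings is an assumption based on the Add() examples.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scheduleSpec is a local copy of ScheduleSpec with identical JSON tags.
type scheduleSpec struct {
	CronString []string `json:"cronString,omitempty"`
	Interval   []struct {
		Interval string `json:"interval"`
		Phase    string `json:"phase,omitempty"`
	} `json:"interval,omitempty"`
}

// roundTrip decodes raw into a scheduleSpec and encodes it again, which
// drops unknown keys and omits empty fields.
func roundTrip(raw string) (string, error) {
	var spec scheduleSpec
	if err := json.Unmarshal([]byte(raw), &spec); err != nil {
		return "", err
	}
	out, err := json.Marshal(spec)
	return string(out), err
}

func main() {
	for _, raw := range []string{
		`{"interval":[{"interval":"30s"}]}`,
		`{"cronString":["0 3 * * *"],"unknown":1}`,
	} {
		out, err := roundTrip(raw)
		fmt.Println(out, err)
	}
}
```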

type ScheduleState added in v1.38.0

type ScheduleState struct {
	Paused bool   `json:"paused"`
	Note   string `json:"note,omitempty"`
}

type SearchAttribute added in v1.43.0

type SearchAttribute struct {
	Name string `json:"name"`
	Type string `json:"type"` // Keyword|Text|Int|Double|Bool|Datetime|KeywordList
}

SearchAttribute is a typed search attribute registered to a namespace.

type StandaloneActivity added in v1.43.0

type StandaloneActivity struct {
	Execution              ExecutionRef `json:"execution"`
	Type                   TypeRef      `json:"type"`
	TaskQueue              string       `json:"taskQueue,omitempty"`
	Status                 string       `json:"status"` // ACTIVITY_TASK_STATE_*
	StartTime              string       `json:"startTime,omitempty"`
	CloseTime              string       `json:"closeTime,omitempty"`
	RetryPolicy            *RetryPolicy `json:"retryPolicy,omitempty"`
	Input                  any          `json:"input,omitempty"`
	Result                 any          `json:"result,omitempty"`
	FailureCause           string       `json:"failureCause,omitempty"`
	Identity               string       `json:"identity,omitempty"`
	Attempt                int          `json:"attempt"`
	MaximumAttempts        int          `json:"maximumAttempts,omitempty"`
	ScheduleToCloseTimeout string       `json:"scheduleToCloseTimeout,omitempty"`
	ScheduleToStartTimeout string       `json:"scheduleToStartTimeout,omitempty"`
	StartToCloseTimeout    string       `json:"startToCloseTimeout,omitempty"`
	HeartbeatTimeout       string       `json:"heartbeatTimeout,omitempty"`
	LastHeartbeatTime      string       `json:"lastHeartbeatTime,omitempty"`
	HistoryLength          int64        `json:"historyLength,omitempty"`
}

StandaloneActivity — a first-class activity record keyed by (ns, activityId, runId), independent of any workflow. Workers schedule, heartbeat, and complete activities that the engine tracks without an enclosing workflow execution.

type TypeRef added in v1.38.0

type TypeRef struct {
	Name string `json:"name"`
}

type VersionValidationResult added in v1.43.0

type VersionValidationResult struct {
	NetworkOk         bool     `json:"networkOk"`
	WorkerRegistered  bool     `json:"workerRegistered"`
	HeartbeatReceived bool     `json:"heartbeatReceived"`
	LatencyMs         int      `json:"latencyMs"`
	Errors            []string `json:"errors"`
}

VersionValidationResult is the synthetic snapshot returned by ValidateVersion. Real validation requires the worker SDK runtime to answer; for now we return a deterministic body keyed off whether the version row exists.

type Worker added in v1.43.0

type Worker struct {
	Identity      string `json:"identity"`
	Namespace     string `json:"namespace"`
	TaskQueue     string `json:"taskQueue,omitempty"`
	SDKName       string `json:"sdkName,omitempty"`
	SDKVersion    string `json:"sdkVersion,omitempty"`
	LastHeartbeat string `json:"lastHeartbeat,omitempty"`
	FirstSeen     string `json:"firstSeen,omitempty"`
}

Worker is the registered worker as exposed to the UI.

type WorkflowExecution added in v1.38.0

type WorkflowExecution struct {
	Execution    ExecutionRef          `json:"execution"`
	Type         TypeRef               `json:"type"`
	StartTime    string                `json:"startTime,omitempty"`
	CloseTime    string                `json:"closeTime,omitempty"`
	Status       string                `json:"status"` // WORKFLOW_EXECUTION_STATUS_*
	TaskQueue    string                `json:"taskQueue,omitempty"`
	HistoryLen   int64                 `json:"historyLength,omitempty"`
	Memo         any                   `json:"memo,omitempty"`
	SearchAttrs  map[string]any        `json:"searchAttrs,omitempty"`
	Input        any                   `json:"input,omitempty"`
	Result       any                   `json:"result,omitempty"`
	UserMetadata *WorkflowUserMetadata `json:"userMetadata,omitempty"`
}

type WorkflowUserMetadata added in v1.43.0

type WorkflowUserMetadata struct {
	Summary   string `json:"summary,omitempty"`
	Details   string `json:"details,omitempty"`
	UpdatedBy string `json:"updatedBy,omitempty"`
	UpdatedAt string `json:"updatedAt,omitempty"`
}

WorkflowUserMetadata is operator-supplied summary/details for a workflow.

Directories

Path Synopsis
Package migration moves a single (org, namespace) shard between nodes.
Package replication is the consensus-replication boundary for the Tasks shard manager.
Package routing implements sticky leader selection for (org, namespace, taskQueue) triples.
Package store implements per-(org, namespace) SQLite shards with optional consensus replication.
