Documentation
¶
Overview ¶
Package worker is the Hanzo Tasks worker runtime. A Worker owns a pool of long-poll goroutines that claim workflow and activity tasks from the Tasks frontend over luxfi/zap, dispatches each task to the registered user function, and ships the result back.
Layering:
pkg/sdk/client — Dial, Client, Transport, WorkerTransport pkg/sdk/worker — THIS PACKAGE pkg/sdk/workflow — Context / Future / ExecuteActivity / Sleep / ... pkg/sdk/activity — GetInfo / GetLogger / RecordHeartbeat pkg/sdk/temporal — *Error / RetryPolicy / failure serde
Zero go.temporal.io/* imports. Zero google.golang.org/grpc imports. Transport is luxfi/zap; logging is github.com/luxfi/log.
Determinism (Phase 1)
This worker re-runs the registered workflow function from its start on every workflow-task dispatch. The function must be pure with respect to its inputs: same arguments => same sequence of workflow primitives. Activities are expected to be idempotent, which is the same contract Temporal imposes. Phase 2 will land event-sourced replay (a history log and replay decider) without changing this package's public surface.
Poll concurrency ¶
Options.MaxConcurrentWorkflowTaskPollers goroutines long-poll OpcodePollWorkflowTask. Options.MaxConcurrentActivityExecutionSize goroutines long-poll OpcodePollActivityTask. Both default to reasonable production numbers (see defaultOptions). Each poller blocks on one round trip to the frontend; when a task is returned the same goroutine dispatches it (no hand-off), then re-polls. Back-pressure is the transport's: if the frontend returns a nil task (idle), the poller re-issues immediately.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InterruptCh ¶
func InterruptCh() <-chan any
InterruptCh returns a process-wide channel that closes on the first SIGINT or SIGTERM. Pass it to Worker.Run to trigger a graceful shutdown on Ctrl-C or `kill`. Repeated calls return the same channel.
The channel is `chan any` (not `chan os.Signal`) so it satisfies the Worker.Run signature (`<-chan any`) without leaking os.Signal into the caller's type surface. The value sent on interrupt is the os.Signal that caused it; callers typically ignore it.
Types ¶
type Options ¶
type Options struct {
// MaxConcurrentActivityExecutionSize is both the activity poller
// count and the soft cap on concurrent activity executions. Each
// poller thread executes activities synchronously on itself, so
// the two numbers are the same knob. 0 → defaultActivityPollers.
MaxConcurrentActivityExecutionSize int
// MaxConcurrentWorkflowTaskPollers is the workflow-task poller
// count. 0 → defaultWorkflowPollers.
MaxConcurrentWorkflowTaskPollers int
// MaxConcurrentWorkflowTaskExecutionSize is the concurrency cap
// on workflow-task execution. Distinct from the poller count; a
// worker can poll more aggressively than it executes. 0 =
// unlimited (bounded only by MaxConcurrentWorkflowTaskPollers).
MaxConcurrentWorkflowTaskExecutionSize int
// MaxConcurrentLocalActivityExecutionSize caps concurrent local
// activity executions. Phase-1 local activities are dispatched
// via the remote path; this knob is honoured in the common
// semaphore around ExecuteActivity. 0 = unlimited.
MaxConcurrentLocalActivityExecutionSize int
// WorkerActivitiesPerSecond caps this worker's activity dispatch
// rate. Per-worker; does not coordinate across replicas. 0 =
// unlimited.
WorkerActivitiesPerSecond float64
// WorkerLocalActivitiesPerSecond caps this worker's local
// activity dispatch rate. 0 = unlimited.
WorkerLocalActivitiesPerSecond float64
// TaskQueueActivitiesPerSecond is the shared-across-workers rate
// intended as a task-queue global. Phase-1 enforces it per-worker
// — true global coordination requires a server-side limiter and
// arrives with the native serde milestone. 0 = unlimited.
TaskQueueActivitiesPerSecond float64
// EnableSessionWorker registers a default session tracker that
// no-ops but satisfies the upstream API. Required by callers
// that enable activity sessions; harmless otherwise.
EnableSessionWorker bool
// Identity is sent to the server on every poll so the frontend
// attributes tasks to this worker. Empty → "<hostname>@<pid>".
Identity string
// Logger overrides the Worker's logger. Nil → luxlog.Noop().
Logger luxlog.Logger
}
Options configures a Worker. Field shape matches go.temporal.io/sdk/worker.Options so caller code migrating from upstream compiles unchanged. Any zero value means "unlimited / default"; the Options struct is never validated for positivity.
type RegisterActivityOptions ¶
type RegisterActivityOptions struct {
// Name overrides the registry key.
Name string
}
RegisterActivityOptions customises an activity registration.
type RegisterWorkflowOptions ¶
type RegisterWorkflowOptions struct {
// Name overrides the registry key. Empty uses the reflected
// Go function name (see funcName).
Name string
// DisableAlreadyRegisteredCheck lets a caller replace a prior
// registration without panicking — useful for hot-reload in
// development. Default false: double-registration is a bug.
DisableAlreadyRegisteredCheck bool
}
RegisterWorkflowOptions customises a workflow registration.
type Worker ¶
type Worker interface {
// RegisterWorkflow adds a workflow function to the registry under
// its reflected Go name. Must be called before Start.
RegisterWorkflow(w any)
// RegisterWorkflowWithOptions adds a workflow function under an
// explicit name. Must be called before Start.
RegisterWorkflowWithOptions(w any, opts RegisterWorkflowOptions)
// RegisterActivity adds an activity function under its reflected
// Go name. Must be called before Start.
RegisterActivity(a any)
// RegisterActivityWithOptions adds an activity function under an
// explicit name. Must be called before Start.
RegisterActivityWithOptions(a any, opts RegisterActivityOptions)
// Start begins polling. Non-blocking: returns as soon as the
// poller goroutines are launched. Safe to call exactly once.
Start() error
// Run starts the worker and blocks until interruptCh closes or
// Stop is called. Interruption triggers a graceful shutdown.
Run(interruptCh <-chan any) error
// Stop terminates all pollers and waits for in-flight tasks to
// finish. Idempotent.
Stop()
}
Worker is the Hanzo Tasks worker runtime. Methods are safe for concurrent use except as noted.