worker

package
v1.42.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package worker is the Hanzo Tasks worker runtime. A Worker owns a pool of long-poll goroutines that claim workflow and activity tasks from the Tasks frontend over luxfi/zap, dispatches each task to the registered user function, and ships the result back.

Layering:

pkg/sdk/client    — Dial, Client, Transport, WorkerTransport
pkg/sdk/worker    — THIS PACKAGE
pkg/sdk/workflow  — Context / Future / ExecuteActivity / Sleep / ...
pkg/sdk/activity  — GetInfo / GetLogger / RecordHeartbeat
pkg/sdk/temporal  — *Error / RetryPolicy / failure serde

Zero go.temporal.io/* imports. Zero google.golang.org/grpc imports. Transport is luxfi/zap; logging is github.com/luxfi/log.

Determinism (Phase 1)

This worker re-runs the registered workflow function from its start on every workflow-task dispatch. The function must be pure with respect to its inputs: same arguments => same sequence of workflow primitives. Activities are expected to be idempotent, which is the same contract Temporal imposes. Phase 2 will land event-sourced replay (a history log and replay decider) without changing this package's public surface.
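The re-execution contract can be illustrated without the real SDK. The sketch below is an assumption-laden toy: `recorder` is a hypothetical stand-in for the workflow context (the actual pkg/sdk/workflow API is not shown on this page), and `orderWorkflow`, `flakyWorkflow`, `trace` and `deterministic` are illustrative names. It runs a function twice from its start and compares the sequence of primitives each run requested.

```go
package main

import (
	"fmt"
	"reflect"
)

// recorder is a hypothetical stand-in for the workflow context. It only
// logs, in order, the primitives a workflow function asks for.
type recorder struct{ steps []string }

func (r *recorder) ExecuteActivity(name string, arg any) {
	r.steps = append(r.steps, fmt.Sprintf("activity:%s(%v)", name, arg))
}

func (r *recorder) Sleep(seconds int) {
	r.steps = append(r.steps, fmt.Sprintf("sleep:%ds", seconds))
}

// orderWorkflow depends only on its argument, so every re-run from the
// start requests the same primitives in the same order.
func orderWorkflow(r *recorder, orderID string) {
	r.ExecuteActivity("Charge", orderID)
	r.Sleep(30)
	r.ExecuteActivity("Ship", orderID)
}

// calls is hidden state that survives between runs.
var calls int

// flakyWorkflow branches on that hidden state, so two runs with the same
// argument can request different primitives. This breaks the contract.
func flakyWorkflow(r *recorder, orderID string) {
	calls++
	if calls%2 == 0 {
		r.Sleep(1)
	}
	r.ExecuteActivity("Charge", orderID)
}

// trace runs wf from its start against a fresh recorder and returns the
// primitives it requested.
func trace(wf func(*recorder, string), input string) []string {
	r := &recorder{}
	wf(r, input)
	return r.steps
}

// deterministic reports whether two fresh runs with the same input
// request identical sequences of primitives.
func deterministic(wf func(*recorder, string), input string) bool {
	return reflect.DeepEqual(trace(wf, input), trace(wf, input))
}
```

Calling `deterministic(orderWorkflow, "order-1")` reports true, while `deterministic(flakyWorkflow, "order-1")` reports false because the second run inserts an extra sleep.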

Poll concurrency

Options.MaxConcurrentWorkflowTaskPollers goroutines long-poll OpcodePollWorkflowTask, and Options.MaxConcurrentActivityExecutionSize goroutines long-poll OpcodePollActivityTask. Both default to reasonable production numbers (see defaultOptions). Each poller blocks on one round trip to the frontend; when a task is returned, the same goroutine dispatches it (no hand-off) and then re-polls. Back-pressure is left to the transport: if the frontend returns a nil task (idle), the poller re-polls immediately.
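The poll-dispatch-repoll loop described above might look roughly like the following. This is a minimal sketch, not the package's source: `task`, `pollFunc`, `runPollers` and `drain` are hypothetical names, and the long-poll round trip is modelled by a plain function rather than a luxfi/zap call.

```go
package main

import (
	"context"
	"sync"
)

// task is a hypothetical placeholder for a claimed workflow or activity task.
type task struct{ id int }

// pollFunc models one long-poll round trip. A nil task with a nil error
// means the frontend had nothing to hand out (idle).
type pollFunc func(ctx context.Context) (*task, error)

// runPollers starts n poller goroutines and blocks until ctx is cancelled
// and every poller has returned.
func runPollers(ctx context.Context, n int, poll pollFunc, handle func(*task)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for ctx.Err() == nil {
				t, err := poll(ctx)
				if err != nil || t == nil {
					// Idle or failed round trip: re-poll at once. Retrying
					// errors without a delay is a simplification made here.
					continue
				}
				handle(t) // dispatched on the polling goroutine, no hand-off
			}
		}()
	}
	wg.Wait()
}

// drain is a usage example: it feeds tasks placeholder tasks through
// pollers goroutines and returns how many were handled.
func drain(tasks, pollers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if tasks == 0 {
		return 0
	}
	queue := make(chan *task, tasks)
	for i := 0; i < tasks; i++ {
		queue <- &task{id: i}
	}
	poll := func(ctx context.Context) (*task, error) {
		select {
		case t := <-queue:
			return t, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	var mu sync.Mutex
	handled := 0
	handle := func(*task) {
		mu.Lock()
		defer mu.Unlock()
		handled++
		if handled == tasks {
			cancel() // all work done: stop the pollers
		}
	}
	runPollers(ctx, pollers, poll, handle)
	return handled
}
```

Because each goroutine both polls and executes, the number of pollers is also the ceiling on concurrent executions in this sketch, which is the coupling the Options comments describe for activities.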

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InterruptCh

func InterruptCh() <-chan any

InterruptCh returns a process-wide channel that closes on the first SIGINT or SIGTERM. Pass it to Worker.Run to trigger a graceful shutdown on Ctrl-C or `kill`. Repeated calls return the same channel.

The channel is `chan any` (not `chan os.Signal`) so it satisfies the Worker.Run signature (`<-chan any`) without leaking os.Signal into the caller's type surface. The value sent on interrupt is the os.Signal that caused it; callers typically ignore it.
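One plausible way to build such a channel uses sync.Once and os/signal. The sketch below is an assumption about the shape of the implementation, not a copy of it; `interruptChannel` and its package-level variables are hypothetical names. It delivers the signal value once and then closes the channel, which is one way to satisfy both "closes on the first signal" and "the value sent is the os.Signal".

```go
package main

import (
	"os"
	"os/signal"
	"sync"
	"syscall"
)

var (
	interruptOnce sync.Once
	interruptCh   chan any
)

// interruptChannel returns the same process-wide channel on every call.
// The signal handler is installed on the first call only.
func interruptChannel() <-chan any {
	interruptOnce.Do(func() {
		interruptCh = make(chan any, 1)
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
		go func() {
			s := <-sigs
			interruptCh <- s   // the first receiver sees the signal value
			close(interruptCh) // every later receive returns immediately
		}()
	})
	return interruptCh
}
```

The buffer of one lets the goroutine send and close without waiting for a receiver, so a signal that arrives before anyone is listening is not lost.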

Types

type Options

type Options struct {
	// MaxConcurrentActivityExecutionSize is both the activity poller
	// count and the soft cap on concurrent activity executions. Each
	// poller goroutine executes the activities it claims synchronously,
	// so the two numbers are the same knob. 0 → defaultActivityPollers.
	MaxConcurrentActivityExecutionSize int

	// MaxConcurrentWorkflowTaskPollers is the workflow-task poller
	// count. 0 → defaultWorkflowPollers.
	MaxConcurrentWorkflowTaskPollers int

	// MaxConcurrentWorkflowTaskExecutionSize is the concurrency cap
	// on workflow-task execution. Distinct from the poller count; a
	// worker can poll more aggressively than it executes. 0 =
	// unlimited (bounded only by MaxConcurrentWorkflowTaskPollers).
	MaxConcurrentWorkflowTaskExecutionSize int

	// MaxConcurrentLocalActivityExecutionSize caps concurrent local
	// activity executions. Phase-1 local activities are dispatched
	// via the remote path; this knob is honoured in the common
	// semaphore around ExecuteActivity. 0 = unlimited.
	MaxConcurrentLocalActivityExecutionSize int

	// WorkerActivitiesPerSecond caps this worker's activity dispatch
	// rate. Per-worker; does not coordinate across replicas. 0 =
	// unlimited.
	WorkerActivitiesPerSecond float64

	// WorkerLocalActivitiesPerSecond caps this worker's local
	// activity dispatch rate. 0 = unlimited.
	WorkerLocalActivitiesPerSecond float64

	// TaskQueueActivitiesPerSecond is intended as a rate limit shared
	// by every worker on the task queue. Phase 1 enforces it per
	// worker only; true global coordination requires a server-side
	// limiter and arrives with the native serde milestone. 0 =
	// unlimited.
	TaskQueueActivitiesPerSecond float64

	// EnableSessionWorker registers a default session tracker that
	// no-ops but satisfies the upstream API. Required by callers
	// that enable activity sessions; harmless otherwise.
	EnableSessionWorker bool

	// Identity is sent to the server on every poll so the frontend
	// attributes tasks to this worker. Empty → "<hostname>@<pid>".
	Identity string

	// Logger overrides the Worker's logger. Nil → luxlog.Noop().
	Logger luxlog.Logger
}

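Options configures a Worker. Field shape matches go.temporal.io/sdk/worker.Options so caller code migrating from upstream compiles unchanged. A zero value selects the field's default (poller counts, Identity, Logger) or means unlimited (concurrency caps and rate limits). The Options struct is never validated for positivity, so negative values are not rejected.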
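The zero-value rules can be pictured as a small normalisation step. The sketch below is illustrative only: `pollerOptions` mirrors three of the fields above, `resolve` is a hypothetical helper, and the two `assumed...` constants are made-up numbers because this page does not state the values of defaultActivityPollers or defaultWorkflowPollers.

```go
package main

import (
	"fmt"
	"os"
)

// Hypothetical defaults. The package's real defaultActivityPollers and
// defaultWorkflowPollers values are not given in the documentation.
const (
	assumedActivityPollers = 4
	assumedWorkflowPollers = 2
)

// pollerOptions mirrors three fields of Options for illustration.
type pollerOptions struct {
	MaxConcurrentActivityExecutionSize int
	MaxConcurrentWorkflowTaskPollers   int
	Identity                           string
}

// resolve fills in defaults for zero values and leaves everything else
// untouched. Negative numbers pass through, matching the statement that
// Options is never validated for positivity.
func resolve(o pollerOptions) pollerOptions {
	if o.MaxConcurrentActivityExecutionSize == 0 {
		o.MaxConcurrentActivityExecutionSize = assumedActivityPollers
	}
	if o.MaxConcurrentWorkflowTaskPollers == 0 {
		o.MaxConcurrentWorkflowTaskPollers = assumedWorkflowPollers
	}
	if o.Identity == "" {
		host, err := os.Hostname()
		if err != nil {
			host = "unknown" // fallback chosen for this sketch
		}
		// "<hostname>@<pid>", as described for the Identity field.
		o.Identity = fmt.Sprintf("%s@%d", host, os.Getpid())
	}
	return o
}
```

Passing `pollerOptions{}` yields the assumed poller counts and a generated identity, while explicitly set fields are returned unchanged.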

type RegisterActivityOptions

type RegisterActivityOptions struct {
	// Name overrides the registry key.
	Name string
}

RegisterActivityOptions customises an activity registration.

type RegisterWorkflowOptions

type RegisterWorkflowOptions struct {
	// Name overrides the registry key. Empty uses the reflected
	// Go function name (see funcName).
	Name string

	// DisableAlreadyRegisteredCheck lets a caller replace a prior
	// registration without panicking — useful for hot-reload in
	// development. Default false: double-registration is a bug.
	DisableAlreadyRegisteredCheck bool
}

RegisterWorkflowOptions customises a workflow registration.
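The naming and duplicate-check rules can be sketched with a plain map. Everything here is hypothetical: `registry`, `registerOptions`, `reflectedName` and `sampleWorkflow` are illustrative names, and `reflectedName` is only a guess at what funcName does (it keeps the part of the runtime function name after the last dot, which suits top-level functions).

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// registerOptions mirrors RegisterWorkflowOptions for illustration.
type registerOptions struct {
	Name                          string
	DisableAlreadyRegisteredCheck bool
}

// registry maps a registration name to the user function.
type registry struct{ fns map[string]any }

// reflectedName returns the Go name of a top-level function without its
// package path, e.g. "main.sampleWorkflow" becomes "sampleWorkflow".
func reflectedName(fn any) string {
	full := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
	if i := strings.LastIndex(full, "."); i >= 0 {
		return full[i+1:]
	}
	return full
}

// register stores fn under opts.Name, or under its reflected name when
// opts.Name is empty. A duplicate name panics unless the check is disabled.
func (r *registry) register(fn any, opts registerOptions) {
	if t := reflect.TypeOf(fn); t == nil || t.Kind() != reflect.Func {
		panic("register: argument is not a function")
	}
	name := opts.Name
	if name == "" {
		name = reflectedName(fn)
	}
	if _, dup := r.fns[name]; dup && !opts.DisableAlreadyRegisteredCheck {
		panic(fmt.Sprintf("workflow %q is already registered", name))
	}
	if r.fns == nil {
		r.fns = make(map[string]any)
	}
	r.fns[name] = fn
}

// sampleWorkflow is a placeholder used to exercise the registry.
func sampleWorkflow() {}
```

Registering `sampleWorkflow` twice with default options panics on the second call, while setting DisableAlreadyRegisteredCheck replaces the earlier entry, which is the hot-reload case mentioned in the field comment.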

type Worker

type Worker interface {
	// RegisterWorkflow adds a workflow function to the registry under
	// its reflected Go name. Must be called before Start.
	RegisterWorkflow(w any)

	// RegisterWorkflowWithOptions adds a workflow function under an
	// explicit name. Must be called before Start.
	RegisterWorkflowWithOptions(w any, opts RegisterWorkflowOptions)

	// RegisterActivity adds an activity function under its reflected
	// Go name. Must be called before Start.
	RegisterActivity(a any)

	// RegisterActivityWithOptions adds an activity function under an
	// explicit name. Must be called before Start.
	RegisterActivityWithOptions(a any, opts RegisterActivityOptions)

	// Start begins polling. Non-blocking: returns as soon as the
	// poller goroutines are launched. Safe to call exactly once.
	Start() error

	// Run starts the worker and blocks until interruptCh closes or
	// Stop is called. Interruption triggers a graceful shutdown.
	Run(interruptCh <-chan any) error

	// Stop terminates all pollers and waits for in-flight tasks to
	// finish. Idempotent.
	Stop()
}

Worker is the Hanzo Tasks worker runtime. Methods are safe for concurrent use except as noted.
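The Start / Run / Stop contract in the interface comments can be sketched with a toy type. `miniWorker` is hypothetical and does no polling; it only shows one way to get a non-blocking Start, a Run that returns on interrupt or Stop, and an idempotent Stop. Guarding Start with sync.Once is a choice made for this sketch; the interface itself only says to call Start once.

```go
package main

import "sync"

// miniWorker is a hypothetical skeleton of the lifecycle, not the
// package's Worker implementation.
type miniWorker struct {
	startOnce sync.Once
	stopOnce  sync.Once
	done      chan struct{} // closed by Stop
	pollers   sync.WaitGroup
}

func newMiniWorker() *miniWorker {
	return &miniWorker{done: make(chan struct{})}
}

// Start launches one placeholder poller and returns immediately.
func (w *miniWorker) Start() error {
	w.startOnce.Do(func() {
		w.pollers.Add(1)
		go func() {
			defer w.pollers.Done()
			<-w.done // a real poller would long-poll until told to stop
		}()
	})
	return nil
}

// Run starts the worker and blocks until interruptCh fires (a value is
// received or the channel is closed) or Stop is called, then shuts down.
func (w *miniWorker) Run(interruptCh <-chan any) error {
	if err := w.Start(); err != nil {
		return err
	}
	select {
	case <-interruptCh:
	case <-w.done:
	}
	w.Stop()
	return nil
}

// Stop signals the pollers and waits for them to return. Calling it
// more than once is harmless.
func (w *miniWorker) Stop() {
	w.stopOnce.Do(func() { close(w.done) })
	w.pollers.Wait()
}
```

In this sketch a nil interruptCh never fires, so Run returns only once Stop is called from another goroutine.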

func New

func New(c client.Client, taskQueue string, options Options) Worker

New returns a Worker attached to c that polls taskQueue.

The Worker shares c's underlying Transport — only one luxfi/zap connection is opened per process even when several Workers run against different task queues.
