Documentation
Overview ¶
Package workflow is the workflow-code-facing API of the Hanzo Tasks SDK. User workflow functions import this package and call its helpers (ExecuteActivity, GetSignalChannel, NewSelector, …) to express durable orchestration.
Layering ¶
The package is DELIBERATELY THIN. All runtime behaviour — deterministic time, activity dispatch, signal queues, selector scheduling — is delegated to a CoroutineEnv supplied by the worker (pkg/sdk/worker). Users never see CoroutineEnv; they see Context, Future, Channel, Selector. Those types carry a CoroutineEnv handle under the hood.
This separation lets pkg/sdk/workflow ship the public surface that base/commerce/ta call into today — with only the package path changing in their imports — while the coroutine scheduler and event-sourced replay engine live in pkg/sdk/worker and can evolve independently.
Zero upstream imports. Zero go.temporal.io/*. Zero google.golang.org/grpc. Logging via github.com/luxfi/log.
Determinism (Phase 1)
Phase 1 of the worker runs a workflow as a single Go goroutine driving CoroutineEnv synchronously. There is NO event-sourced replay yet. Crashes mid-workflow restart the workflow from the beginning with the same input. This is safe as long as activities are idempotent — which is the same contract Temporal imposes — but you do lose in-flight-activity progress on crash. Phase 2 will add the history log and true replay.
Inside a workflow function you MUST NOT call time.Now(), rand.Read, or spawn goroutines. Use workflow.Now(ctx), workflow.NewTimer, and (Phase 2) workflow.Go instead. Code whose behaviour depends on map iteration order or on wall-clock time is also forbidden, because both can differ from one run to the next. CI will add a `tasks-vet` linter for these rules in a follow-up.
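The map-iteration rule is the easiest one to break by accident. A minimal sketch of the usual workaround, using only the standard library: collect the keys, sort them, and range over the sorted slice. The helper name sortedKeys and the sample data are illustrative and not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the keys of m in ascending order so that callers can
// visit the map in a stable order on every run.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	quotas := map[string]int{"charlie": 3, "alpha": 1, "bravo": 2}
	// Ranging over quotas directly would visit the keys in an unspecified
	// order; ranging over the sorted key slice does not.
	for _, k := range sortedKeys(quotas) {
		fmt.Println(k, quotas[k])
	}
}
```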
Index ¶
- func ActivityName(v any) string
- func EncodePayload(v any) ([]byte, error)
- func GetLogger(ctx Context) log.Logger
- func NewChildWorkflowFuture() (ChildWorkflowFuture, Settleable, Settleable)
- func Now(ctx Context) time.Time
- func SelectFanIn(cases []SelectCase, doneCh <-chan struct{}) int
- func Sleep(ctx Context, d time.Duration) error
- func WithCancel(parent Context) (Context, CancelFunc)
- type ActivityOptions
- type CancelFunc
- type Channel
- type ChildWorkflowFuture
- type Context
- type CoroutineEnv
- type Future
- type Info
- type LocalActivityOptions
- type ReceiveChannel
- type SelectCase
- type Selector
- type Settleable
- type StubEnv
- func (e *StubEnv) AdvanceClock(d time.Duration)
- func (e *StubEnv) Cancel()
- func (e *StubEnv) CurrentScope() any
- func (e *StubEnv) ExecuteActivity(opts ActivityOptions, activity any, args []any) Future
- func (e *StubEnv) ExecuteChildWorkflow(childWorkflow any, args []any) ChildWorkflowFuture
- func (e *StubEnv) GetSignalChannel(name string) ReceiveChannel
- func (e *StubEnv) Logger() log.Logger
- func (e *StubEnv) NewCancelScope() (any, CancelFunc)
- func (e *StubEnv) NewChannel(name string, buffered int) Channel
- func (e *StubEnv) NewTimer(d time.Duration) Future
- func (e *StubEnv) Now() time.Time
- func (e *StubEnv) OnActivity(activity any) *stubActivityReg
- func (e *StubEnv) OnChildWorkflow(childWorkflow any) *stubChildWorkflowReg
- func (e *StubEnv) ScopeDone(scope any) <-chan struct{}
- func (e *StubEnv) ScopeErr(scope any) error
- func (e *StubEnv) Select(cases []SelectCase) int
- func (e *StubEnv) SetAutoAdvance(d time.Duration)
- func (e *StubEnv) SetInfo(i Info)
- func (e *StubEnv) SetLogger(l log.Logger)
- func (e *StubEnv) SignalAsync(name string, val any)
- func (e *StubEnv) Sleep(d time.Duration) error
- func (e *StubEnv) WorkflowInfo() Info
- type WorkflowExecution
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ActivityName ¶
ActivityName returns the registered wire name for an activity value. String activities are returned as-is; function values are reflected to the short Go identifier (pkg.Fn → Fn). Nil yields "nil". Mirrors the upstream Temporal default.
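The naming rule can be approximated with the standard library alone. The sketch below is not the package's implementation: shortName and DoWork are hypothetical names, and the fallback for values that are neither strings nor functions is an assumption the text above does not cover.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// DoWork is a hypothetical activity used only for this illustration.
func DoWork() {}

// shortName approximates the rule described above: strings pass through,
// nil yields "nil", and function values map to their unqualified identifier.
func shortName(v any) string {
	if v == nil {
		return "nil"
	}
	if s, ok := v.(string); ok {
		return s
	}
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Func {
		// ASSUMPTION: the documented rule does not say what happens here.
		return fmt.Sprintf("%T", v)
	}
	if rv.IsNil() {
		return "nil"
	}
	// FuncForPC reports the qualified name, e.g. "main.DoWork".
	full := runtime.FuncForPC(rv.Pointer()).Name()
	if i := strings.LastIndex(full, "."); i >= 0 {
		return full[i+1:]
	}
	return full
}

func main() {
	fmt.Println(shortName("ChargeCard"), shortName(DoWork), shortName(nil))
}
```

Closures and method values carry compiler-generated suffixes in their runtime names, so a sketch like this is only reliable for top-level functions.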
func EncodePayload ¶
EncodePayload serialises a value to the wire form a Future carries. Phase 1 uses JSON; Phase 2 will swap this for the canonical ZAP codec. The helper is exposed so the worker / StubEnv produce the same shape user code later decodes.
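For the Phase 1 JSON form, the round trip can be pictured as below. This is a sketch under the assumption that the wire form is plain encoding/json output; encodePayload, decodePayload, and Result are stand-in names, and the decode side in particular is not an API this page documents.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Result is a hypothetical activity result type.
type Result struct {
	OK bool `json:"ok"`
}

// encodePayload mirrors the Phase 1 behaviour described above: plain JSON.
func encodePayload(v any) ([]byte, error) {
	return json.Marshal(v)
}

// decodePayload is the assumed inverse: a nil target or an empty payload
// is treated as "nothing to decode".
func decodePayload(data []byte, valPtr any) error {
	if valPtr == nil || len(data) == 0 {
		return nil
	}
	return json.Unmarshal(data, valPtr)
}

func main() {
	data, err := encodePayload(Result{OK: true})
	if err != nil {
		panic(err)
	}
	var out Result
	if err := decodePayload(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(data), out.OK)
}
```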
func GetLogger ¶
GetLogger returns a logger bound to the current workflow. The worker wires it to suppress duplicate lines during replay.
func NewChildWorkflowFuture ¶
func NewChildWorkflowFuture() (ChildWorkflowFuture, Settleable, Settleable)
NewChildWorkflowFuture constructs an unsettled ChildWorkflowFuture. The worker owns both halves; user code consumes the returned Future.
func Now ¶
Now returns the deterministic workflow clock. time.Now() is NOT deterministic and MUST NOT be called inside a workflow function; use this helper instead. Activities, by contrast, run off-ctx and can use time.Now freely.
func SelectFanIn ¶
func SelectFanIn(cases []SelectCase, doneCh <-chan struct{}) int
SelectFanIn is the shared edge-triggered Select implementation. Exported for the worker env to reuse without duplication. User code should not call it directly — use workflow.NewSelector.
It blocks until exactly one case in cases is ready, or until doneCh closes, and returns the ready case index (or -1 on cancellation / empty input).
Implementation: each case parks on a readiness source —
- Future cases: the Future.ReadyCh() (closed on Settle).
- chanImpl cases: a single-buffer chan struct{} registered via RegisterReadyWaker; the channel fires once when the chan becomes readable (buffered value, parked unbuffered sender, or closed).
A small goroutine per case translates that signal into an index send on wakeCh. The main routine receives once from wakeCh and returns. On wake we re-scan for the earliest-ready case to preserve deterministic ordering across unrelated simultaneous ready events.
No polling. No time.Ticker. No 1 ms spin.
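The fan-in shape described above can be sketched with plain Go channels. This is a simplified model, not the package's code: every case is reduced to a "ready" channel that is closed when the case becomes ready, fanIn is a hypothetical name, and the re-scan resolves ties to the lowest index, which is one possible reading of "earliest-ready".

```go
package main

import "fmt"

// fanIn blocks until one of the ready channels is closed or done closes.
// It returns the lowest index whose channel is closed, or -1 on
// cancellation or empty input.
func fanIn(ready []<-chan struct{}, done <-chan struct{}) int {
	if len(ready) == 0 {
		return -1
	}
	wake := make(chan int, len(ready)) // buffered so helpers never block on send
	stop := make(chan struct{})
	defer close(stop) // releases helpers that are still parked when we return

	for i, ch := range ready {
		go func(i int, ch <-chan struct{}) {
			select {
			case <-ch:
				wake <- i
			case <-stop:
			}
		}(i, ch)
	}

	select {
	case woke := <-wake:
		// Re-scan so that simultaneously ready cases resolve to the same
		// index regardless of which helper goroutine ran first.
		for i, ch := range ready {
			select {
			case <-ch:
				return i
			default:
			}
		}
		return woke
	case <-done:
		return -1
	}
}

func main() {
	a, b, c := make(chan struct{}), make(chan struct{}), make(chan struct{})
	close(c)
	close(b)
	fmt.Println(fanIn([]<-chan struct{}{a, b, c}, nil))
}
```

The main routine performs a single blocking select, so no timer or polling loop is involved; the helper goroutines exit either after sending their index or when stop is closed.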
func Sleep ¶
Sleep blocks the workflow coroutine for d of workflow time. Returns temporal.Canceled if ctx is canceled before d elapses.
Sleep (together with its Future-returning sibling NewTimer) is the only supported way to delay execution inside a workflow; time.Sleep inside a workflow is both wrong (non-deterministic) and a scheduler bug (it blocks the coroutine thread).
func WithCancel ¶
func WithCancel(parent Context) (Context, CancelFunc)
WithCancel derives a child Context whose cancel function, when invoked, cancels operations scoped to the child (timers created under it, activities scheduled under it, Selector.Select running under it). Canceling the child does not cancel the parent.
Mirrors go.temporal.io/sdk/workflow.WithCancel semantics so migration is a pure path swap.
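The scoping rule (cancellation flows from parent to child, never the other way) can be modelled in a few lines. The sketch below uses hypothetical names (scope, newRoot, withCancel, isDone) and a watcher goroutine for propagation; the worker's real cancel scopes may be wired differently.

```go
package main

import (
	"fmt"
	"sync"
)

// scope is a toy cancel scope: done is closed exactly once on cancel.
type scope struct {
	done chan struct{}
	once sync.Once
}

// cancel is idempotent thanks to sync.Once.
func (s *scope) cancel() { s.once.Do(func() { close(s.done) }) }

func newRoot() *scope { return &scope{done: make(chan struct{})} }

// withCancel derives a child scope. Canceling the parent cancels the child;
// canceling the child leaves the parent untouched.
func withCancel(parent *scope) (*scope, func()) {
	child := &scope{done: make(chan struct{})}
	go func() {
		select {
		case <-parent.done:
			child.cancel()
		case <-child.done: // child canceled first; stop watching the parent
		}
	}()
	return child, child.cancel
}

// isDone reports whether s has been canceled, without blocking.
func isDone(s *scope) bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

func main() {
	root := newRoot()
	child, cancel := withCancel(root)
	cancel()
	cancel() // a second call is a no-op
	fmt.Println(isDone(child), isDone(root))
}
```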
Types ¶
type ActivityOptions ¶
type ActivityOptions struct {
	// TaskQueue routes the activity to a specific worker pool.
	// Empty means "the workflow's own task queue" (the common case).
	TaskQueue string

	// ScheduleToStartTimeout caps the wait between enqueue and
	// worker pick-up. 0 = unlimited.
	ScheduleToStartTimeout time.Duration

	// StartToCloseTimeout caps a single activity run. Exceeding it
	// forces a retry per RetryPolicy. 0 = unlimited (NOT
	// recommended; configure something reasonable).
	StartToCloseTimeout time.Duration

	// ScheduleToCloseTimeout is the end-to-end budget including
	// retries. 0 means "derive from StartToCloseTimeout *
	// MaximumAttempts".
	ScheduleToCloseTimeout time.Duration

	// HeartbeatTimeout is the max interval between heartbeats for
	// a long-running activity. Exceeding it aborts the attempt
	// and triggers retry. 0 = no heartbeat required.
	HeartbeatTimeout time.Duration

	// RetryPolicy controls attempt count and backoff. nil = the
	// SDK default (unlimited attempts, exponential backoff starting
	// at 1s, capped at 100s). Pass &temporal.RetryPolicy{MaximumAttempts: 1}
	// to disable retries entirely.
	RetryPolicy *temporal.RetryPolicy

	// WaitForCancellation, when true, makes cancellation wait for
	// the activity to acknowledge before returning. Default false:
	// cancellation unblocks the caller immediately and the
	// activity is expected to observe ctx.Done and exit.
	WaitForCancellation bool
}
ActivityOptions configures a single activity invocation. Attached to a Context via WithActivityOptions; every ExecuteActivity call below reads the attached options at dispatch time.
Shape matches go.temporal.io/sdk/workflow.ActivityOptions so existing base/commerce call sites compile unchanged after the import swap.
type CancelFunc ¶
type CancelFunc func()
CancelFunc cancels a context or cancel scope. It is idempotent.
type Channel ¶
type Channel interface {
	ReceiveChannel

	// Send enqueues a value. On an unbuffered channel Send blocks
	// the coroutine until a receiver pairs with it; on a buffered
	// channel it blocks only when the buffer is full.
	Send(ctx Context, val any) error

	// Close marks the channel closed. Further Sends panic;
	// Receives drain any buffered values then report ok=false.
	// Idempotent.
	Close()
}
Channel is the read/write view of a workflow channel. All user-created channels satisfy Channel; only workflow code can Send on them.
Sealed: see ReceiveChannel. Channel cannot be implemented outside this package.
func NewBufferedChannel ¶
NewBufferedChannel returns a buffered workflow channel with the given capacity. Send blocks only when the buffer is full.
func NewChannel ¶
NewChannel returns an unbuffered workflow channel. Send blocks until a Receive pairs with it.
func NewChannelFromEnv ¶
NewChannelFromEnv mints a Channel without needing a Context. The worker env uses this so it can satisfy its CoroutineEnv.NewChannel contract without a chicken-and-egg dependency on a Context that carries the env.
func NewNamedChannel ¶
NewNamedChannel returns a buffered channel with a caller-supplied debug name. Names are used for replay identity and log output.
func NewSignalChannel ¶ added in v1.36.1
NewSignalChannel mints a named, buffered channel suitable for worker-side signal fan-in. The worker package uses this so it can return a sealed ReceiveChannel to user code without re-implementing the interface (which would require re-exposing the sealed method).
Buffer sizes less than 1 are clamped to 1 so the dispatch-time pre-population never drops a signal.
type ChildWorkflowFuture ¶
type ChildWorkflowFuture interface {
	Future

	// GetChildWorkflowExecution returns a Future that settles with
	// the child's WorkflowExecution ({WorkflowID, RunID}). It is
	// settled by the worker as soon as the frontend acknowledges
	// the schedule, typically well before the child's result
	// future settles.
	GetChildWorkflowExecution() Future
}
ChildWorkflowFuture is the handle returned by ExecuteChildWorkflow. It embeds Future (whose Get / IsReady / ReadyCh observe the child's final result) and adds GetChildWorkflowExecution, a secondary Future that settles with the child's {WorkflowID, RunID} as soon as the server mints the run.
Matches go.temporal.io/sdk/workflow.ChildWorkflowFuture shape-for-shape so caller code migrating from the upstream package compiles unchanged after the import swap.
func ExecuteChildWorkflow ¶
func ExecuteChildWorkflow(ctx Context, childWorkflow any, args ...any) ChildWorkflowFuture
ExecuteChildWorkflow is the workflow-code entry point. It delegates to CoroutineEnv.ExecuteChildWorkflow, which in workerEnv forwards the request over ZAP. A misconfigured env (nil / StubEnv with no child registered) surfaces a typed error on result.Get.
type Context ¶
type Context interface {
	// Deadline returns the time after which the workflow will be
	// canceled, or zero time + false if no deadline is set.
	Deadline() (time.Time, bool)

	// Done returns a channel that is closed when the workflow (or
	// the containing cancel scope) is canceled. Mirrors
	// context.Context.Done.
	Done() <-chan struct{}

	// Err reports why Done was closed. Returns nil while still
	// running, a *temporal.Error once canceled / timed out.
	Err() error

	// Value returns the value associated with key k, or nil.
	Value(k any) any
	// contains filtered or unexported methods
}
Context is the workflow-scoped counterpart to the stdlib context.Context. It carries the CoroutineEnv, a cancel scope, a deadline, per-call ActivityOptions, and user-supplied Values.
Like context.Context, Context is immutable: helpers (WithCancel, WithActivityOptions, WithValue) return a derived Context with one field replaced. The parent is unchanged.
Inside a workflow function treat Context exactly as you would treat context.Context in a regular request handler — but remember:
- ctx.Done() closes when the workflow is canceled (not when the goroutine ends).
- ctx.Err() returns temporal.Canceled once canceled, not context.Canceled.
- ctx.Value(k) carries workflow-side Values, not stdlib context values — and may not cross the activity boundary.
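The "derived Context with one field replaced" rule can be illustrated with a small parent-linked structure. Everything in this sketch (wfContext, options, withOptions, withValue) is hypothetical and far simpler than the real Context; it only shows why the parent stays unchanged when a child is derived.

```go
package main

import "fmt"

// options is a stand-in for ActivityOptions.
type options struct{ TaskQueue string }

// wfContext is a toy immutable context: each derivation allocates a new
// node that points at its parent and overrides at most one thing.
type wfContext struct {
	parent   *wfContext
	opts     *options
	key, val any
}

func withOptions(parent *wfContext, o options) *wfContext {
	return &wfContext{parent: parent, opts: &o}
}

func withValue(parent *wfContext, k, v any) *wfContext {
	return &wfContext{parent: parent, key: k, val: v}
}

// value walks up the chain and returns the nearest value stored under k.
func (c *wfContext) value(k any) any {
	for cur := c; cur != nil; cur = cur.parent {
		if cur.key != nil && cur.key == k {
			return cur.val
		}
	}
	return nil
}

// activityOptions returns the nearest attached options, or the zero value.
func (c *wfContext) activityOptions() options {
	for cur := c; cur != nil; cur = cur.parent {
		if cur.opts != nil {
			return *cur.opts
		}
	}
	return options{}
}

func main() {
	root := &wfContext{}
	a := withOptions(root, options{TaskQueue: "billing"})
	b := withValue(a, "tenant", "acme")
	fmt.Println(root.activityOptions().TaskQueue == "", b.activityOptions().TaskQueue, a.value("tenant"), b.value("tenant"))
}
```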
func NewContextFromEnv ¶
func NewContextFromEnv(e CoroutineEnv) Context
NewContextFromEnv constructs the root workflow.Context for a workflow execution. pkg/sdk/worker calls this once per run, passing the CoroutineEnv it owns; it then invokes the registered workflow function with the returned Context.
This is the ONLY way a Context enters the system. User code never calls NewContextFromEnv directly.
func WithActivityOptions ¶
func WithActivityOptions(parent Context, opts ActivityOptions) Context
WithActivityOptions returns a Context carrying opts. The ctx returned is what ExecuteActivity(ctx, …) consumes when it dispatches the activity.
func WithLocalActivityOptions ¶
func WithLocalActivityOptions(parent Context, opts LocalActivityOptions) Context
WithLocalActivityOptions returns a Context whose ExecuteLocalActivity consumes opts. Independent of WithActivityOptions — the two key spaces do not collide.
type CoroutineEnv ¶
type CoroutineEnv interface {
	// Now returns the deterministic workflow clock. It advances
	// only at commit points (timer fire, activity result); it does
	// not track wall-clock time.
	Now() time.Time

	// Logger returns a logger bound to this workflow execution.
	// Writes during replay are suppressed by the worker so logs
	// reflect only the real (non-replayed) progress of the run.
	Logger() log.Logger

	// Sleep blocks the coroutine for d of workflow time, unless the
	// workflow is canceled first. Returns temporal.Canceled if
	// canceled before d elapses.
	Sleep(d time.Duration) error

	// NewTimer schedules a durable timer and returns a Future that
	// settles with nil after d of workflow time, or with
	// temporal.Canceled if the timer is canceled.
	NewTimer(d time.Duration) Future

	// ExecuteActivity submits an activity for execution. The
	// Future settles with the activity's (result, error). The
	// activity handle is opaque to this package: it is resolved
	// against the worker's registered name by the worker itself.
	ExecuteActivity(opts ActivityOptions, activity any, args []any) Future

	// GetSignalChannel returns (a handle to) the named signal
	// channel. Repeated calls with the same name return handles
	// that observe the same queue: signals are keyed by name, not
	// by handle.
	GetSignalChannel(name string) ReceiveChannel

	// NewChannel creates a user channel. buffered=0 makes it
	// unbuffered; >0 is the capacity. name is used only for
	// debugging and replay identity and may be empty.
	NewChannel(name string, buffered int) Channel

	// Select waits for exactly one of the supplied cases to fire
	// and returns its index. The caller fires the matching user
	// callback; this keeps the env purely mechanical.
	//
	// Cases carry either a Future or a ReceiveChannel. The env's
	// implementation is free to reorder "ready" cases according to
	// its own determinism rules, but it must be deterministic
	// across replays.
	Select(cases []SelectCase) int

	// NewCancelScope derives a child cancel scope whose cancel
	// function, when invoked, cancels only workflow primitives
	// created under that scope (timers, activities, selects).
	// Returns a handle opaque to callers and the cancel function.
	NewCancelScope() (scope any, cancel CancelFunc)

	// ExecuteChildWorkflow starts a child workflow and returns a
	// ChildWorkflowFuture. The returned future's Get settles with
	// the child's result; its GetChildWorkflowExecution() returns a
	// Future that settles as soon as the server assigns a runID.
	//
	// Phase-1: the worker forwards the request to the frontend via
	// OpcodeStartChildWorkflow and emits a scheduleChildWorkflow
	// command (kind=3) so history-based replay can recover the
	// linkage. A StubEnv used in tests may fake this path.
	ExecuteChildWorkflow(childWorkflow any, args []any) ChildWorkflowFuture

	// CurrentScope returns the env's current cancel scope, which
	// Context uses to route Done()/Err() calls.
	CurrentScope() any

	// ScopeDone returns a done channel for the supplied scope. It
	// is closed when the scope is canceled.
	ScopeDone(scope any) <-chan struct{}

	// ScopeErr returns the cancellation error for the supplied
	// scope, or nil if not canceled.
	ScopeErr(scope any) error

	// WorkflowInfo returns a snapshot of stable metadata about the
	// running workflow (ID, RunID, TaskQueue, etc.). Phase 1
	// returns zero values for fields the worker hasn't wired yet.
	WorkflowInfo() Info
}
CoroutineEnv is the package-private seam between this public surface and the worker runtime. pkg/sdk/worker supplies a concrete implementation at workflow-execution start and passes it to NewContextFromEnv; user code never sees this type.
Every method runs on the workflow's single coroutine. A method that "blocks" (AwaitFuture, AwaitReceive, Sleep, Select) parks the coroutine until the event fires — during replay the same decisions must happen in the same order, which is the worker's problem, not this package's.
The interface is intentionally small: every primitive callers use is expressible as a combination of a few env calls. Adding a new public helper means composing existing env methods, not extending this interface — unless the helper needs a genuinely new runtime capability (e.g. workflow.Go in phase 2).
type Future ¶
type Future interface {
	// Get blocks the workflow coroutine until the future settles,
	// then decodes the result into valPtr. valPtr may be nil, in
	// which case the result payload is dropped. Returns the
	// settlement error, if any.
	Get(ctx Context, valPtr any) error

	// IsReady reports whether the future has settled. Useful for
	// non-blocking peeks inside a Selector callback; most code
	// calls Get directly.
	IsReady() bool

	// ReadyCh returns a channel that is closed once the Future has
	// settled. It is used by the Selector fan-in so a blocking Select
	// can park on a single receive instead of polling. Stable across
	// calls: the same channel is returned every time.
	ReadyCh() <-chan struct{}
}
Future is the workflow-side handle for an asynchronous result — the completion of an activity, a timer, or a user-settled promise.
Future is exactly the Temporal shape so callers migrating from go.temporal.io/sdk/workflow change only the import.
	f := workflow.ExecuteActivity(ctx, DoWork, req)
	var resp Response
	if err := f.Get(ctx, &resp); err != nil { ... }
func ExecuteActivity ¶
ExecuteActivity dispatches an activity for execution and returns the Future that settles with its (result, error). activity may be a function value or a registered name (string). args are the activity's input parameters; they will be JSON-encoded for the wire.
The activity's attached ActivityOptions are read from ctx. If no options were attached, the SDK default is used (unlimited retry, no timeouts — configure something).
Callers MUST NOT spawn goroutines that call ExecuteActivity; in Phase 1 the workflow is a single coroutine and parallel dispatch is done by calling ExecuteActivity N times in a loop and then Get-ing the Futures (or selecting on them).
func ExecuteLocalActivity ¶
ExecuteLocalActivity dispatches an activity via the same wire as ExecuteActivity. Phase 1: local activities execute via the remote path. Same wire, same guarantees. True in-process fast path arrives with the event-sourced replay engine in Phase 2.
The options attached to ctx via WithLocalActivityOptions are translated to the ActivityOptions consumed by the remote path so the StartToCloseTimeout and RetryPolicy flow through unchanged.
func NewTimer ¶
NewTimer schedules a durable timer that fires after d of workflow time and returns the Future that represents its completion. The Future's Get settles with nil error when the timer fires, or with temporal.Canceled if the ctx is canceled before d elapses.
Mirrors go.temporal.io/sdk/workflow.NewTimer.
func NewWallClockTimer ¶
NewWallClockTimer is the shared helper used by both StubEnv and workerEnv to produce a real, cancellable Future driven by a time.Timer. Exported so the worker package (a sibling) can reuse the same implementation without reaching into package internals.
- d <= 0 settles the Future with (nil, nil) immediately.
- Otherwise, a per-timer goroutine parks on the timer's channel alongside the supplied doneCh. If doneCh closes first, the Future settles with errFn() — typically the scope's cancel error.
Cancellation semantics: a closed doneCh always wins over a fired timer in the rare race where both are observable simultaneously.
type Info ¶
type Info struct {
	// WorkflowID is the caller-supplied business identifier.
	WorkflowID string

	// RunID is the engine-generated per-execution identifier.
	RunID string

	// WorkflowType is the registered name (typically the Go
	// function name) of the workflow being executed.
	WorkflowType string

	// TaskQueue is the queue the workflow is running on.
	TaskQueue string

	// Namespace groups workflow executions. Defaults to "default".
	Namespace string

	// Attempt is the 1-based retry attempt for this run.
	Attempt int32
}
Info is a stable snapshot of workflow identity. It mirrors the shape Temporal's WorkflowInfo carries so handlers that log it don't change at the call site after migration.
type LocalActivityOptions ¶
type LocalActivityOptions struct {
	// ScheduleToCloseTimeout is the end-to-end budget including
	// retries. 0 = derive from StartToCloseTimeout * MaximumAttempts.
	ScheduleToCloseTimeout time.Duration

	// StartToCloseTimeout caps a single attempt. 0 = unlimited.
	StartToCloseTimeout time.Duration

	// RetryPolicy overrides the default retry policy. Nil = SDK
	// default (see temporal.DefaultRetryPolicy).
	RetryPolicy *temporal.RetryPolicy
}
LocalActivityOptions configures a single local-activity invocation. Mirrors go.temporal.io/sdk/workflow.LocalActivityOptions.
Phase-1 note: local activities execute via the remote path. Same wire, same guarantees. True in-process fast path arrives with the event-sourced replay engine in Phase 2. The options struct is preserved so caller code compiles unchanged and so the retry / timeout fields are propagated to the remote-path activity.
type ReceiveChannel ¶
type ReceiveChannel interface {
	// Receive blocks the workflow coroutine until a value is
	// available, decodes it into valPtr, and reports whether the
	// channel is still open via the ok return. If the channel is
	// closed and drained, ok is false and valPtr is untouched.
	Receive(ctx Context, valPtr any) (ok bool)

	// ReceiveAsync is the non-blocking form: it returns false
	// immediately if no value is buffered. Useful inside a
	// Selector callback where you only want to consume if ready.
	ReceiveAsync(valPtr any) (ok bool)

	// Name returns the channel's debug identifier. Empty for
	// unnamed user channels. Signal channels carry the signal name.
	Name() string
	// contains filtered or unexported methods
}
ReceiveChannel is the read-only view of a workflow channel. Both signal channels (GetSignalChannel) and user channels (NewChannel, NewBufferedChannel) satisfy it.
Mirrors go.temporal.io/sdk/workflow.ReceiveChannel exactly.
Sealed: the interface cannot be implemented outside this package. All ReceiveChannel values must originate from one of the package's constructors (NewChannel, NewBufferedChannel, NewNamedChannel, GetSignalChannel). Forks cannot ship divergent behind-the-scenes channel semantics that the selector / coroutine scheduler would quietly accept; any such type would fail to satisfy the sealed method.
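Sealing an interface in Go is usually done with an unexported method. The sketch below shows the general pattern, not this package's actual declarations: Receiver, chanImpl, newNamed, and the method name sealed are all hypothetical. Because unexported method names are scoped to the declaring package, a type declared elsewhere cannot define the sealed method itself.

```go
package main

import "fmt"

// Receiver is sealed: the unexported sealed method can only be defined by
// types declared in this package.
type Receiver interface {
	Name() string
	sealed()
}

// chanImpl is the package's own implementation.
type chanImpl struct{ name string }

func (c *chanImpl) Name() string { return c.name }
func (c *chanImpl) sealed()      {}

// newNamed is the constructor through which callers obtain a Receiver.
func newNamed(name string) Receiver { return &chanImpl{name: name} }

func main() {
	r := newNamed("claim")
	fmt.Println(r.Name())
}
```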
func GetSignalChannel ¶
func GetSignalChannel(ctx Context, name string) ReceiveChannel
GetSignalChannel returns the ReceiveChannel for the named signal. Two calls with the same name return handles that observe the same queue — signals are keyed by name, not handle identity. This matches Temporal's semantics and is what base/plugins/tasks relies on for claim/complete/fail/cancel fan-in.
Signals are durable: the worker persists them and re-delivers them on replay. Sending a signal is the client's job (pkg/sdk/client); this helper only exposes the workflow-side read.
type SelectCase ¶
type SelectCase struct {
	Future  Future
	Channel ReceiveChannel
}
SelectCase is the internal representation of a case submitted to Selector.Select. Exactly one of Future / Channel is non-nil.
type Selector ¶
type Selector interface {
	AddFuture(f Future, fn func(f Future)) Selector
	AddReceive(ch ReceiveChannel, fn func(ch ReceiveChannel, more bool)) Selector
	AddDefault(fn func()) Selector
	Select(ctx Context)
}
Selector is the multi-way choice primitive. Build one with NewSelector(ctx), attach cases with AddFuture / AddReceive / AddDefault, then block on Select(ctx). Exactly one case fires per Select call.
Mirrors go.temporal.io/sdk/workflow.Selector exactly.
func NewSelector ¶
NewSelector returns a Selector that blocks on Select until one of the attached cases fires. The ctx is captured only for the env handle; Select takes its own ctx so callers can pair a Selector with a cancel-scoped ctx for one iteration of a loop.
type Settleable ¶
type Settleable interface {
	Future

	// Settle delivers a final (value, err). Subsequent Settle calls
	// are no-ops. value may be nil (e.g. for timers).
	Settle(value []byte, err error)
}
Settleable is the env-facing side of a Future. The worker owns a Settleable handle for each Future it hands back and completes it when the activity/timer finishes. This package exposes it so the worker and the in-memory StubEnv can both drive futures without reaching into unexported state.
func NewFuture ¶
func NewFuture() Settleable
NewFuture constructs a settleable Future. It is intended for the worker / StubEnv to call; ordinary workflow code receives Futures from ExecuteActivity / NewTimer / a Selector callback.
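A settle-once future with a stable ready channel is a small amount of code. The sketch below is a simplified model with hypothetical names (future, newFuture, errBoom); unlike the real Future, its Get takes no Context and therefore cannot observe cancellation. It assumes JSON payloads, as described for Phase 1.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

// errBoom is a sample error used to show that a second Settle is ignored.
var errBoom = errors.New("boom")

// future is a toy settleable future: the first Settle wins and closes ready.
type future struct {
	once  sync.Once
	ready chan struct{}
	value []byte
	err   error
}

func newFuture() *future { return &future{ready: make(chan struct{})} }

// Settle records the result once; later calls are no-ops.
func (f *future) Settle(value []byte, err error) {
	f.once.Do(func() {
		f.value, f.err = value, err
		close(f.ready) // closing publishes value and err to every waiter
	})
}

// ReadyCh returns the same channel on every call.
func (f *future) ReadyCh() <-chan struct{} { return f.ready }

// IsReady reports whether the future has settled, without blocking.
func (f *future) IsReady() bool {
	select {
	case <-f.ready:
		return true
	default:
		return false
	}
}

// Get blocks until settled, then decodes the payload into valPtr if both
// are present.
func (f *future) Get(valPtr any) error {
	<-f.ready
	if f.err != nil {
		return f.err
	}
	if valPtr == nil || len(f.value) == 0 {
		return nil
	}
	return json.Unmarshal(f.value, valPtr)
}

func main() {
	f := newFuture()
	f.Settle([]byte(`{"ok":true}`), nil)
	f.Settle(nil, errBoom) // ignored: the future is already settled

	var out struct {
		OK bool `json:"ok"`
	}
	fmt.Println(f.Get(&out), out.OK)
}
```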
type StubEnv ¶
type StubEnv struct {
	// contains filtered or unexported fields
}
StubEnv is a single-goroutine, in-memory CoroutineEnv implementation sufficient for unit-testing workflow code without standing up a real worker. It IS NOT a substitute for the worker runtime; it lacks event-sourced replay, durable timers, and cross-process activity dispatch.
StubEnv is test-only. Production workflow dispatch uses workerEnv (pkg/sdk/worker/env.go). Do NOT depend on StubEnv from non-test code — its API surface is intentionally unstable and may add / remove deterministic-testing hooks over time.
Typical use:
	env := workflow.NewStubEnv()
	env.OnActivity("DoWork").Return(&Result{OK: true}, nil)
	env.SignalAsync("claim", map[string]string{"agent_id": "alice"})
	ctx := workflow.NewContextFromEnv(env)
	out, err := MyWorkflow(ctx, input)
The stub processes activities synchronously: ExecuteActivity looks up the registered response by matching function name / string and settles the Future on the same goroutine before returning it.
Timers in the stub run against a real wall clock via workflow.Timer so that tests observe the same scheduling semantics as production (a Future that settles after d, cancelable via scope). Tests that want deterministic-time assertions use short durations (tens of milliseconds) or AdvanceClock in combination with the Now() helpers — AdvanceClock no longer force-fires timers.
func NewStubEnv ¶
func NewStubEnv() *StubEnv
NewStubEnv returns an in-memory CoroutineEnv for tests. Clock starts at a stable epoch and does not advance unless the caller sets AutoAdvance or calls AdvanceClock.
func (*StubEnv) AdvanceClock ¶
AdvanceClock manually advances workflow Now() by d. It does NOT fire pending timers: timers are wall-clock-based so their firing is driven by the real OS scheduler. Tests that want to observe timer firing should either use small durations or run them on a separate goroutine.
func (*StubEnv) Cancel ¶
func (e *StubEnv) Cancel()
Cancel cancels the root scope. Equivalent to the workflow's top-level cancellation firing. All active Sleeps, Timers, and Selects return temporal.Canceled.
func (*StubEnv) CurrentScope ¶
func (*StubEnv) ExecuteActivity ¶
func (e *StubEnv) ExecuteActivity(opts ActivityOptions, activity any, args []any) Future
func (*StubEnv) ExecuteChildWorkflow ¶
func (e *StubEnv) ExecuteChildWorkflow(childWorkflow any, args []any) ChildWorkflowFuture
ExecuteChildWorkflow implements CoroutineEnv.
func (*StubEnv) GetSignalChannel ¶
func (e *StubEnv) GetSignalChannel(name string) ReceiveChannel
func (*StubEnv) NewCancelScope ¶
func (e *StubEnv) NewCancelScope() (any, CancelFunc)
func (*StubEnv) NewTimer ¶
NewTimer schedules a wall-clock timer using the real workflow.Timer helper. The returned Future settles with (nil, nil) after d, or with temporal.Canceled if the root scope cancels first.
func (*StubEnv) OnActivity ¶
OnActivity queues a response for the named activity. Pass a function value (matched by its function name) or a literal string. It returns a registration handle (*stubActivityReg) whose Return method records the response, so OnActivity(...).Return(...) can be written as a single chained call.
Example:
	env.OnActivity("DoWork").Return(&Result{OK: true}, nil)
	env.OnActivity(DoWork).Return(&Result{OK: false}, errors.New("boom"))
func (*StubEnv) OnChildWorkflow ¶
OnChildWorkflow queues a response for a named child workflow. The next ExecuteChildWorkflow invocation with a matching type consumes the queued entry (FIFO). When the queue is empty the stub settles the future with (nil, nil) — same fallback shape as ExecuteActivity.
func (*StubEnv) Select ¶
func (e *StubEnv) Select(cases []SelectCase) int
Select blocks until exactly one case is ready and returns its index. It uses the Selector fan-in (ReadyCh / RegisterReadyWaker) instead of a 1 ms polling spin: each case parks on the underlying Future's ReadyCh or the channel's waker, and the scope's done chan cancels the whole wait.
func (*StubEnv) SetAutoAdvance ¶
SetAutoAdvance sets a per-Now increment. Setting to 1*time.Second means every Now() call jumps one second, which is enough to unblock simple Sleep-based tests without an explicit advance.
func (*StubEnv) SetInfo ¶
SetInfo overrides the Info returned by WorkflowInfo. Useful for tests that assert on identity fields.
func (*StubEnv) SignalAsync ¶
SignalAsync enqueues a signal value for the named channel. The channel is created on demand if no workflow code has asked for it yet.
func (*StubEnv) WorkflowInfo ¶
type WorkflowExecution ¶
type WorkflowExecution struct {
	WorkflowID string `json:"workflow_id"`
	RunID      string `json:"run_id,omitempty"`
}
WorkflowExecution is the child-side identifier pair. Declared here (rather than in client/) so the workflow package never imports the client package — keeping the dependency graph a strict line:
	workflow → temporal
	worker   → {workflow, client, temporal, activity}
	client   → {temporal, converter}