workflow

package
v1.42.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package workflow is the workflow-code-facing API of the Hanzo Tasks SDK. User workflow functions import this package and call its helpers (ExecuteActivity, GetSignalChannel, NewSelector, …) to express durable orchestration.

Layering

The package is DELIBERATELY THIN. All runtime behaviour — deterministic time, activity dispatch, signal queues, selector scheduling — is delegated to a CoroutineEnv supplied by the worker (pkg/sdk/worker). Users never see CoroutineEnv; they see Context, Future, Channel, Selector. Those types carry a CoroutineEnv handle under the hood.

This separation lets pkg/sdk/workflow ship the public surface that base/commerce/ta call into today — with only the package path changing in their imports — while the coroutine scheduler and event-sourced replay engine live in pkg/sdk/worker and can evolve independently.

Zero upstream imports. Zero go.temporal.io/*. Zero google.golang.org/grpc. Logging via github.com/luxfi/log.

Determinism (Phase 1)

Phase 1 of the worker runs a workflow as a single Go goroutine driving CoroutineEnv synchronously. There is NO event-sourced replay yet. Crashes mid-workflow restart the workflow from the beginning with the same input. This is safe as long as activities are idempotent — which is the same contract Temporal imposes — but you do lose in-flight-activity progress on crash. Phase 2 will add the history log and true replay.

Inside a workflow function you MUST NOT call time.Now(), rand.Read, or spawn goroutines. Use workflow.Now(ctx), workflow.NewTimer, and (phase 2) workflow.Go. Branching on map iteration order or on wall-clock timing is also forbidden. CI will add a `tasks-vet` linter for these in a follow-up.
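
One way to stay within the map-iteration rule is to sort the keys before ranging over them. The sketch below is plain Go with no SDK calls; sortedKeys and the sample data are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the keys of m in ascending order so that a workflow
// visits map entries in the same order on every run. Hypothetical helper,
// not part of the workflow package.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	quotas := map[string]int{"eu": 3, "us": 5, "ap": 1}
	// Ranging over quotas directly would visit regions in random order.
	for _, region := range sortedKeys(quotas) {
		fmt.Println(region, quotas[region])
	}
}
```

Any decision that depends on the visit order (which activity to schedule first, for example) then becomes reproducible across restarts.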

Functions

func ActivityName

func ActivityName(v any) string

ActivityName returns the registered wire name for an activity value. String activities are returned as-is; function values are reflected to the short Go identifier (pkg.Fn → Fn). Nil yields "nil". Mirrors the upstream Temporal default.
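
The naming rule can be sketched with the standard reflect and runtime packages. This is an illustration of the described behaviour, not the package's implementation: shortActivityName and SendEmail are hypothetical names, and the fallback for values that are neither strings nor functions is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// shortActivityName is a hypothetical stand-in for ActivityName.
func shortActivityName(v any) string {
	if v == nil {
		return "nil"
	}
	if s, ok := v.(string); ok {
		return s // string activities pass through unchanged
	}
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Func {
		return fmt.Sprintf("%T", v) // assumption: the source does not cover this case
	}
	if rv.IsNil() {
		return "nil"
	}
	fn := runtime.FuncForPC(rv.Pointer())
	if fn == nil {
		return "nil"
	}
	full := fn.Name() // e.g. "main.SendEmail"
	if i := strings.LastIndex(full, "."); i >= 0 {
		full = full[i+1:]
	}
	// Method values carry a "-fm" suffix in their runtime name.
	return strings.TrimSuffix(full, "-fm")
}

// SendEmail is a placeholder activity used only for the demonstration.
func SendEmail() {}

func main() {
	fmt.Println(shortActivityName("charge-card")) // charge-card
	fmt.Println(shortActivityName(SendEmail))     // SendEmail
	fmt.Println(shortActivityName(nil))           // nil
}
```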

func EncodePayload

func EncodePayload(v any) ([]byte, error)

EncodePayload serialises a value to the wire form a Future carries. Phase 1 uses JSON; Phase 2 will swap this for the canonical ZAP codec. The helper is exposed so the worker / StubEnv produce the same shape user code later decodes.
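
Since Phase 1 payloads are JSON, the round trip can be approximated with encoding/json alone. The sketch below assumes the helper is a thin wrapper over json.Marshal; encodePayload, decodePayload, and chargeRequest are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chargeRequest is a hypothetical activity input.
type chargeRequest struct {
	OrderID string `json:"order_id"`
	Cents   int    `json:"cents"`
}

// encodePayload mirrors the Phase 1 behaviour described above (JSON),
// under the assumption that no extra envelope is added.
func encodePayload(v any) ([]byte, error) {
	return json.Marshal(v)
}

// decodePayload is the matching decode step a Future.Get would perform.
func decodePayload(b []byte, valPtr any) error {
	return json.Unmarshal(b, valPtr)
}

func main() {
	b, err := encodePayload(chargeRequest{OrderID: "A1", Cents: 500})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"order_id":"A1","cents":500}

	var back chargeRequest
	if err := decodePayload(b, &back); err != nil {
		panic(err)
	}
	fmt.Println(back.OrderID, back.Cents) // A1 500
}
```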

func GetLogger

func GetLogger(ctx Context) log.Logger

GetLogger returns a logger bound to the current workflow. The worker wires it to suppress duplicate lines during replay.

func NewChildWorkflowFuture

func NewChildWorkflowFuture() (ChildWorkflowFuture, Settleable, Settleable)

NewChildWorkflowFuture constructs an unsettled ChildWorkflowFuture. The worker owns both halves; user code consumes the returned Future.

func Now

func Now(ctx Context) time.Time

Now returns the deterministic workflow clock. time.Now() is NOT deterministic and MUST NOT be called inside a workflow function; use this helper instead. Activities, by contrast, run off-ctx and can use time.Now freely.

func SelectFanIn

func SelectFanIn(cases []SelectCase, doneCh <-chan struct{}) int

SelectFanIn is the shared edge-triggered Select implementation. Exported for the worker env to reuse without duplication. User code should not call it directly — use workflow.NewSelector.

It blocks until at least one case in cases is ready, or until doneCh closes, and returns the index of the ready case (or -1 on cancellation / empty input).

Implementation: each case parks on a readiness source —

  • Future cases: the Future.ReadyCh() (closed on Settle).
  • chanImpl cases: a single-buffer chan struct{} registered via RegisterReadyWaker; the channel fires once when the chan becomes readable (buffered value, parked unbuffered sender, or closed).

A small goroutine per case translates that signal into an index send on wakeCh. The main routine receives once from wakeCh and returns. On wake we re-scan for the earliest-ready case to preserve deterministic ordering across unrelated simultaneous ready events.

No polling. No time.Ticker. No 1 ms spin.
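
The fan-in described above can be sketched with plain channels. This is a simplified model, not the package's code: fanIn is a hypothetical name, every case is reduced to a readiness channel that is closed when the case becomes ready, and "earliest-ready" is assumed to mean the lowest case index.

```go
package main

import "fmt"

// fanIn blocks until one readiness channel is closed or doneCh closes.
// It returns the lowest ready index, or -1 on cancellation / empty input.
func fanIn(ready []<-chan struct{}, doneCh <-chan struct{}) int {
	if len(ready) == 0 {
		return -1
	}
	wakeCh := make(chan int, len(ready)) // buffered so late wakers never block
	stop := make(chan struct{})
	defer close(stop) // releases the per-case goroutines on return

	for i, ch := range ready {
		go func(i int, ch <-chan struct{}) {
			select {
			case <-ch:
				wakeCh <- i
			case <-stop:
			}
		}(i, ch)
	}

	// Park on a single receive: no polling, no ticker.
	select {
	case <-wakeCh:
	case <-doneCh:
		return -1
	}

	// Re-scan so simultaneous wakeups resolve to the same case every time.
	for i, ch := range ready {
		select {
		case <-ch:
			return i
		default:
		}
	}
	return -1
}

func main() {
	a, b, c := make(chan struct{}), make(chan struct{}), make(chan struct{})
	close(b)
	fmt.Println(fanIn([]<-chan struct{}{a, b, c}, nil)) // 1
}
```

The re-scan is what makes the result independent of which waker goroutine happened to run first.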

func Sleep

func Sleep(ctx Context, d time.Duration) error

Sleep blocks the workflow coroutine for d of workflow time. Returns temporal.Canceled if ctx is canceled before d elapses.

Sleep is the ONE-AND-ONLY way to delay execution inside a workflow; time.Sleep inside a workflow is both wrong (non-deterministic) and a scheduler bug (it blocks the coroutine thread).

func UpsertSearchAttributes added in v1.40.0

func UpsertSearchAttributes(ctx Context, attrs map[string]any) error

UpsertSearchAttributes upserts the supplied attribute map into the running workflow's visibility record. Mirrors go.temporal.io/sdk/workflow.UpsertSearchAttributes — keys are user-defined search attribute names registered on the namespace; values are encoded by the SQL visibility store.

This is the ONE workflow-side primitive callers in service/worker consume for search-attribute propagation; the typed and namespace-scoped variants from upstream are not used here and are not added.

func WithCancel

func WithCancel(parent Context) (Context, CancelFunc)

WithCancel derives a child Context whose cancel function, when invoked, cancels operations scoped to the child (timers created under it, activities scheduled under it, Selector.Select running under it). Canceling the child does not cancel the parent.

Mirrors go.temporal.io/sdk/workflow.WithCancel semantics so migration is a pure path swap.

Types

type ActivityOptions

type ActivityOptions struct {
	// TaskQueue routes the activity to a specific worker pool.
	// Empty means "the workflow's own task queue" (the common case).
	TaskQueue string

	// ScheduleToStartTimeout caps the wait between enqueue and
	// worker pick-up. 0 = unlimited.
	ScheduleToStartTimeout time.Duration

	// StartToCloseTimeout caps a single activity run. Exceeding it
	// forces a retry per RetryPolicy. 0 = unlimited (NOT
	// recommended; configure something reasonable).
	StartToCloseTimeout time.Duration

	// ScheduleToCloseTimeout is the end-to-end budget including
	// retries. 0 means "derive from StartToCloseTimeout *
	// MaximumAttempts".
	ScheduleToCloseTimeout time.Duration

	// HeartbeatTimeout is the max interval between heartbeats for
	// a long-running activity. Exceeding it aborts the attempt
	// and triggers retry. 0 = no heartbeat required.
	HeartbeatTimeout time.Duration

	// RetryPolicy controls attempt count and backoff. nil = the
	// SDK default (unlimited attempts, exponential backoff starting
	// at 1s, capped at 100s). Pass &temporal.RetryPolicy{MaximumAttempts: 1}
	// to disable retries entirely.
	RetryPolicy *temporal.RetryPolicy

	// WaitForCancellation, when true, makes cancellation wait for
	// the activity to acknowledge before returning. Default false:
	// cancellation unblocks the caller immediately and the
	// activity is expected to observe ctx.Done and exit.
	WaitForCancellation bool
}

ActivityOptions configures a single activity invocation. Attached to a Context via WithActivityOptions; every ExecuteActivity call below reads the attached options at dispatch time.

Shape matches go.temporal.io/sdk/workflow.ActivityOptions so existing base/commerce call sites compile unchanged after the import swap.

type CancelFunc

type CancelFunc func()

CancelFunc cancels a context or cancel scope. It is idempotent.

type Channel

type Channel interface {
	ReceiveChannel

	// Send enqueues a value. On an unbuffered channel Send blocks
	// the coroutine until a receiver pairs with it; on a buffered
	// channel it blocks only when the buffer is full.
	Send(ctx Context, val any) error

	// Close marks the channel closed. Further Sends panic;
	// Receives drain any buffered values then report ok=false.
	// Idempotent.
	Close()
}

Channel is the read/write view of a workflow channel. All user-created channels satisfy Channel; only workflow code can Send on them.

Sealed: see ReceiveChannel. Channel cannot be implemented outside this package.

func NewBufferedChannel

func NewBufferedChannel(ctx Context, size int) Channel

NewBufferedChannel returns a buffered workflow channel with the given capacity. Send blocks only when the buffer is full.

func NewChannel

func NewChannel(ctx Context) Channel

NewChannel returns an unbuffered workflow channel. Send blocks until a Receive pairs with it.

func NewChannelFromEnv

func NewChannelFromEnv(name string, size int) Channel

NewChannelFromEnv mints a Channel without needing a Context. The worker env uses this so it can satisfy its CoroutineEnv.NewChannel contract without a chicken-and-egg dependency on a Context that carries the env.

func NewNamedChannel

func NewNamedChannel(ctx Context, name string, size int) Channel

NewNamedChannel returns a buffered channel with a caller-supplied debug name. Names are used for replay identity and log output.

func NewSignalChannel added in v1.36.1

func NewSignalChannel(name string, buffer int) Channel

NewSignalChannel mints a named, buffered channel suitable for worker-side signal fan-in. The worker package uses this so it can return a sealed ReceiveChannel to user code without re-implementing the interface (which would require re-exposing the sealed method).

buffer sizes less than 1 are clamped to 1 so the dispatch-time pre-population never drops a signal.

type ChildWorkflowFuture

type ChildWorkflowFuture interface {
	Future

	// GetChildWorkflowExecution returns a Future that settles with
	// the child's WorkflowExecution ({WorkflowID, RunID}). It is
	// settled by the worker as soon as the frontend acknowledges
	// the schedule — typically well before the child's result
	// future settles.
	GetChildWorkflowExecution() Future
}

ChildWorkflowFuture is the handle returned by ExecuteChildWorkflow. It embeds Future (whose Get / IsReady / ReadyCh observe the child's final result) and adds GetChildWorkflowExecution, a secondary Future that settles with the child's {WorkflowID, RunID} as soon as the server mints the run.

Matches go.temporal.io/sdk/workflow.ChildWorkflowFuture shape-for-shape so caller code migrating from the upstream package compiles unchanged after the import swap.

func ExecuteChildWorkflow

func ExecuteChildWorkflow(ctx Context, childWorkflow any, args ...any) ChildWorkflowFuture

ExecuteChildWorkflow is the workflow-code entry point. It delegates to CoroutineEnv.ExecuteChildWorkflow, which in workerEnv forwards the request over ZAP. A misconfigured env (nil / StubEnv with no child registered) surfaces a typed error on result.Get.

type Context

type Context interface {
	// Deadline returns the time after which the workflow will be
	// canceled, or zero time + false if no deadline is set.
	Deadline() (time.Time, bool)

	// Done returns a channel that is closed when the workflow (or
	// the containing cancel scope) is canceled. Mirrors
	// context.Context.Done.
	Done() <-chan struct{}

	// Err reports why Done was closed. Returns nil while still
	// running, a *temporal.Error once canceled / timed out.
	Err() error

	// Value returns the value associated with key k, or nil.
	Value(k any) any
	// contains filtered or unexported methods
}

Context is the workflow-scoped counterpart to the stdlib context.Context. It carries the CoroutineEnv, a cancel scope, a deadline, per-call ActivityOptions, and user-supplied Values.

Like context.Context, Context is immutable: helpers (WithCancel, WithActivityOptions, WithValue) return a derived Context with one field replaced. The parent is unchanged.

Inside a workflow function treat Context exactly as you would treat context.Context in a regular request handler — but remember:

  • ctx.Done() closes when the workflow is canceled (not when the goroutine ends).
  • ctx.Err() returns temporal.Canceled once canceled, not context.Canceled.
  • ctx.Value(k) carries workflow-side Values, not stdlib context values — and may not cross the activity boundary.

func NewContextFromEnv

func NewContextFromEnv(e CoroutineEnv) Context

NewContextFromEnv constructs the root workflow.Context for a workflow execution. pkg/sdk/worker calls this once per run, passing the CoroutineEnv it owns; it then invokes the registered workflow function with the returned Context.

This is the ONLY way a Context enters the system. User code never calls NewContextFromEnv directly.

func WithActivityOptions

func WithActivityOptions(parent Context, opts ActivityOptions) Context

WithActivityOptions returns a Context carrying opts. The ctx returned is what ExecuteActivity(ctx, …) consumes when it dispatches the activity.

func WithLocalActivityOptions

func WithLocalActivityOptions(parent Context, opts LocalActivityOptions) Context

WithLocalActivityOptions returns a Context whose ExecuteLocalActivity consumes opts. Independent of WithActivityOptions — the two key spaces do not collide.

func WithValue

func WithValue(parent Context, key, val any) Context

WithValue returns a Context carrying key → val. Follows the same rules as context.WithValue: use a custom key type to avoid collisions, and don't abuse values for parameters.
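
The immutability rule (a derived Context with one field replaced, parent unchanged) can be pictured as a linked chain of nodes. The sketch below is a hypothetical model in plain Go; wfContext, withValue, and tenantKey are illustrative names, and the real Context carries more than values.

```go
package main

import "fmt"

// wfContext is a hypothetical, value-only model of a derived Context.
type wfContext struct {
	parent   *wfContext
	key, val any
}

// withValue never mutates parent; it returns a new node pointing at it.
func withValue(parent *wfContext, key, val any) *wfContext {
	return &wfContext{parent: parent, key: key, val: val}
}

// Value walks from the most recently derived node towards the root.
func (c *wfContext) Value(k any) any {
	for cur := c; cur != nil; cur = cur.parent {
		if cur.key == k {
			return cur.val
		}
	}
	return nil
}

// tenantKey is a custom key type, which avoids collisions between packages.
type tenantKey struct{}

func main() {
	root := &wfContext{}
	child := withValue(root, tenantKey{}, "acme")
	fmt.Println(root.Value(tenantKey{}))  // <nil>
	fmt.Println(child.Value(tenantKey{})) // acme
}
```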

type CoroutineEnv

type CoroutineEnv interface {
	// Now returns the deterministic workflow clock. It advances
	// only at commit points (timer fire, activity result); it does
	// not track wall-clock time.
	Now() time.Time

	// Logger returns a logger bound to this workflow execution.
	// Writes during replay are suppressed by the worker so logs
	// reflect only the real (non-replayed) progress of the run.
	Logger() log.Logger

	// Sleep blocks the coroutine for d of workflow time, unless the
	// workflow is canceled first. Returns temporal.Canceled if
	// canceled before d elapses.
	Sleep(d time.Duration) error

	// NewTimer schedules a durable timer and returns a Future that
	// settles with nil after d of workflow time, or with
	// temporal.Canceled if the timer is canceled.
	NewTimer(d time.Duration) Future

	// ExecuteActivity submits an activity for execution. The
	// Future settles with the activity's (result, error). The
	// activity handle is opaque to this package: it is resolved
	// against the worker's registered name by the worker itself.
	ExecuteActivity(opts ActivityOptions, activity any, args []any) Future

	// GetSignalChannel returns (a handle to) the named signal
	// channel. Repeated calls with the same name return handles
	// that observe the same queue — signals are keyed by name, not
	// by handle.
	GetSignalChannel(name string) ReceiveChannel

	// NewChannel creates a user channel. buffered=0 makes it
	// unbuffered; >0 is the capacity. name is used only for
	// debugging and replay identity and may be empty.
	NewChannel(name string, buffered int) Channel

	// Select waits for exactly one of the supplied cases to fire
	// and returns its index. The caller fires the matching user
	// callback; this keeps the env purely mechanical.
	//
	// Cases carry either a Future or a ReceiveChannel. The env's
	// implementation is free to reorder "ready" cases according to
	// its own determinism rules, but it must be deterministic
	// across replays.
	Select(cases []SelectCase) int

	// NewCancelScope derives a child cancel scope whose cancel
	// function, when invoked, cancels only workflow primitives
	// created under that scope (timers, activities, selects).
	// Returns a handle opaque to callers and the cancel function.
	NewCancelScope() (scope any, cancel CancelFunc)

	// ExecuteChildWorkflow starts a child workflow and returns a
	// ChildWorkflowFuture. The returned future's Get settles with
	// the child's result; its GetChildWorkflowExecution() returns a
	// Future that settles as soon as the server assigns a runID.
	//
	// Phase-1: the worker forwards the request to the frontend via
	// OpcodeStartChildWorkflow and emits a scheduleChildWorkflow
	// command (kind=3) so history-based replay can recover the
	// linkage. A StubEnv used in tests may fake this path.
	ExecuteChildWorkflow(childWorkflow any, args []any) ChildWorkflowFuture

	// CurrentScope returns the env's current cancel scope, which
	// Context uses to route Done()/Err() calls.
	CurrentScope() any

	// ScopeDone returns a done channel for the supplied scope. It
	// is closed when the scope is canceled.
	ScopeDone(scope any) <-chan struct{}

	// ScopeErr returns the cancellation error for the supplied
	// scope, or nil if not canceled.
	ScopeErr(scope any) error

	// WorkflowInfo returns a snapshot of stable metadata about the
	// running workflow (ID, RunID, TaskQueue, etc.). Phase 1
	// returns zero values for fields the worker hasn't wired yet.
	WorkflowInfo() Info

	// GetVersion returns the recorded version for changeID, clamped to
	// [minSupported, maxSupported]. The first call records
	// maxSupported; subsequent calls (and replays) return the recorded
	// value. See workflow.GetVersion for the user-facing contract.
	GetVersion(changeID string, minSupported, maxSupported Version) Version

	// SideEffect runs fn exactly once per workflow execution and
	// records its result into history. ctx is the user Context for
	// pass-through to fn. The returned bytes are the JSON-encoded
	// result; replays return the recorded bytes without re-running fn.
	SideEffect(fn func(ctx Context) any, ctx Context) ([]byte, error)

	// MutableSideEffect runs fn whenever the workflow re-evaluates
	// changeID. The recorded value is replaced when eq reports the new
	// value differs from the recorded one; otherwise the recorded
	// value is preserved across the call.
	MutableSideEffect(changeID string, fn func(ctx Context) any, eq func(a, b any) bool, ctx Context) ([]byte, error)

	// MetricsHandler returns the workflow-scoped metrics handler, or
	// nil when no provider is wired. workflow.GetMetricsHandler
	// upgrades a nil to a noop so user code never sees nil.
	MetricsHandler() MetricsHandler

	// UpsertSearchAttributes upserts attrs onto the workflow's
	// visibility record. Phase-1 forwards to the frontend; Phase-2
	// will record the upsert into history alongside other commands.
	UpsertSearchAttributes(attrs map[string]any) error
}

CoroutineEnv is the internal seam between this public surface and the worker runtime. It is exported only so that pkg/sdk/worker can supply a concrete implementation at workflow-execution start and pass it to NewContextFromEnv; user workflow code never handles this type directly.

Every method runs on the workflow's single coroutine. A method that "blocks" (AwaitFuture, AwaitReceive, Sleep, Select) parks the coroutine until the event fires — during replay the same decisions must happen in the same order, which is the worker's problem, not this package's.

The interface is intentionally small: every primitive callers use is expressible as a combination of a few env calls. Adding a new public helper means composing existing env methods, not extending this interface — unless the helper needs a genuinely new runtime capability (e.g. workflow.Go in phase 2).

type Future

type Future interface {
	// Get blocks the workflow coroutine until the future settles,
	// then decodes the result into valPtr. valPtr may be nil, in
	// which case the result payload is dropped. Returns the
	// settlement error, if any.
	Get(ctx Context, valPtr any) error

	// IsReady reports whether the future has settled. Useful for
	// non-blocking peeks inside a Selector callback; most code
	// calls Get directly.
	IsReady() bool

	// ReadyCh returns a channel that is closed once the Future has
	// settled. It is used by the Selector fan-in so a blocking Select
	// can park on a single receive instead of polling. Stable across
	// calls: the same channel is returned every time.
	ReadyCh() <-chan struct{}
}

Future is the workflow-side handle for an asynchronous result — the completion of an activity, a timer, or a user-settled promise.

Future is exactly the Temporal shape so callers migrating from go.temporal.io/sdk/workflow change only the import.

f := workflow.ExecuteActivity(ctx, DoWork, req)
var resp Response
if err := f.Get(ctx, &resp); err != nil { ... }

func ExecuteActivity

func ExecuteActivity(ctx Context, activity any, args ...any) Future

ExecuteActivity dispatches an activity for execution and returns the Future that settles with its (result, error). activity may be a function value or a registered name (string). args are the activity's input parameters; they will be JSON-encoded for the wire.

The ActivityOptions attached to ctx are read at dispatch time. If no options were attached, the SDK default applies: unlimited retries and no timeouts. In practice, set at least a StartToCloseTimeout.

Callers MUST NOT spawn goroutines that call ExecuteActivity; in Phase 1 the workflow is a single coroutine and parallel dispatch is done by calling ExecuteActivity N times in a loop and then Get-ing the Futures (or selecting on them).
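
The dispatch-then-collect pattern can be illustrated without the SDK. In the sketch below, future, executeActivity, square, and fanOut are hypothetical stand-ins; the stand-in settles each future synchronously, which is a simplification and not a claim about how the worker behaves.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// future is a hypothetical, already-settled stand-in for workflow.Future.
type future struct {
	payload []byte
	err     error
}

// Get decodes the settled payload into valPtr and returns the settlement error.
func (f future) Get(valPtr any) error {
	if f.err != nil || valPtr == nil {
		return f.err
	}
	return json.Unmarshal(f.payload, valPtr)
}

// executeActivity stands in for workflow.ExecuteActivity.
func executeActivity(activity func(int) (int, error), arg int) future {
	res, err := activity(arg)
	if err != nil {
		return future{err: err}
	}
	b, err := json.Marshal(res)
	return future{payload: b, err: err}
}

// square is a placeholder activity.
func square(n int) (int, error) { return n * n, nil }

// fanOut dispatches every activity first, then collects results in order.
// No goroutines are spawned by the workflow code itself.
func fanOut(inputs []int) ([]int, error) {
	futures := make([]future, 0, len(inputs))
	for _, in := range inputs {
		futures = append(futures, executeActivity(square, in))
	}
	results := make([]int, len(futures))
	for i, f := range futures {
		if err := f.Get(&results[i]); err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	fmt.Println(fanOut([]int{1, 2, 3})) // [1 4 9] <nil>
}
```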

func ExecuteLocalActivity

func ExecuteLocalActivity(ctx Context, activity any, args ...any) Future

ExecuteLocalActivity dispatches an activity via the same wire as ExecuteActivity. Phase 1: local activities execute via the remote path. Same wire, same guarantees. True in-process fast path arrives with the event-sourced replay engine in Phase 2.

The options attached to ctx via WithLocalActivityOptions are translated to the ActivityOptions consumed by the remote path so the StartToCloseTimeout and RetryPolicy flow through unchanged.

func MutableSideEffect added in v1.40.0

func MutableSideEffect(ctx Context, changeID string, fn func(ctx Context) any, eq func(a, b any) bool) Future

MutableSideEffect runs fn whenever the workflow re-evaluates this changeID. It returns the previously-recorded value when eq reports the new value is equal to it; otherwise it records the new value.

Use MutableSideEffect for values that may evolve safely between replays (configuration tweakables, feature flags) where determinism only requires "same value seen by all dependents on the same step," not "same value across the whole run."
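
The record-or-keep rule can be sketched as a small in-memory recorder. This model ignores history and replay entirely; recorder and mutableSideEffect are hypothetical names, and the fn signature is simplified (no Context argument).

```go
package main

import "fmt"

// recorder is a hypothetical in-memory stand-in for the recorded values.
type recorder struct {
	recorded map[string]any
}

func newRecorder() *recorder {
	return &recorder{recorded: map[string]any{}}
}

// mutableSideEffect runs fn, keeps the old value when eq says nothing
// changed, and otherwise records and returns the fresh value.
func (r *recorder) mutableSideEffect(changeID string, fn func() any, eq func(a, b any) bool) any {
	fresh := fn()
	if old, ok := r.recorded[changeID]; ok && eq(old, fresh) {
		return old
	}
	r.recorded[changeID] = fresh
	return fresh
}

func main() {
	r := newRecorder()
	limit := 10
	read := func() any { return limit }
	same := func(a, b any) bool { return a == b }

	fmt.Println(r.mutableSideEffect("rate-limit", read, same)) // 10
	limit = 25
	fmt.Println(r.mutableSideEffect("rate-limit", read, same)) // 25
}
```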

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future

NewTimer schedules a durable timer that fires after d of workflow time and returns the Future that represents its completion. The Future's Get settles with nil error when the timer fires, or with temporal.Canceled if the ctx is canceled before d elapses.

Mirrors go.temporal.io/sdk/workflow.NewTimer.

func NewWallClockTimer

func NewWallClockTimer(d time.Duration, doneCh <-chan struct{}, errFn func() error) Future

NewWallClockTimer is the shared helper used by both StubEnv and workerEnv to produce a real, cancellable Future driven by a time.Timer. Exported so the worker package (a sibling) can reuse the same implementation without reaching into package internals.

  • d <= 0 settles the Future with (nil, nil) immediately.
  • Otherwise, a per-timer goroutine parks on the timer's channel alongside the supplied doneCh. If doneCh closes first, the Future settles with errFn() — typically the scope's cancel error.

Cancellation semantics: a closed doneCh always wins over a fired timer in the rare race where both are observable simultaneously.
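
A minimal sketch of these semantics, using only time.Timer and channels, is shown below. wallClockTimer, errCanceled, and canceledErr are hypothetical names, and the result is delivered on a channel rather than a Future to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errCanceled is a placeholder for the scope's cancel error.
var errCanceled = errors.New("canceled")

func canceledErr() error { return errCanceled }

// wallClockTimer delivers nil when d elapses, or errFn() if doneCh closes first.
func wallClockTimer(d time.Duration, doneCh <-chan struct{}, errFn func() error) <-chan error {
	out := make(chan error, 1) // buffered: the goroutine never blocks on send
	if d <= 0 {
		out <- nil // settle immediately
		return out
	}
	go func() {
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-doneCh:
			out <- errFn()
		case <-t.C:
			// A closed doneCh wins if both are observable at once.
			select {
			case <-doneCh:
				out <- errFn()
			default:
				out <- nil
			}
		}
	}()
	return out
}

func main() {
	fmt.Println(<-wallClockTimer(0, nil, canceledErr)) // <nil>

	done := make(chan struct{})
	close(done)
	fmt.Println(<-wallClockTimer(time.Hour, done, canceledErr)) // canceled
}
```

The second, nested select is what implements the "done wins" tie-break: Go's select picks randomly among ready cases, so the timer branch re-checks doneCh before settling.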

func SideEffect added in v1.40.0

func SideEffect(ctx Context, fn func(ctx Context) any) Future

SideEffect runs fn exactly once per workflow execution and records its result into history. Replays return the recorded value without re-running fn. The returned Future settles synchronously.

Use SideEffect for nondeterministic computations whose result must be stable across replays (UUID generation, time.Now, etc.). Activities remain the right choice for I/O.

type Info

type Info struct {
	// WorkflowID is the caller-supplied business identifier.
	WorkflowID string

	// RunID is the engine-generated per-execution identifier.
	RunID string

	// WorkflowType is the registered name (typically the Go
	// function name) of the workflow being executed.
	WorkflowType string

	// TaskQueue is the queue the workflow is running on.
	TaskQueue string

	// Namespace groups workflow executions. Defaults to "default".
	Namespace string

	// Attempt is the 1-based retry attempt for this run.
	Attempt int32
}

Info is a stable snapshot of workflow identity. It mirrors the shape Temporal's WorkflowInfo carries so handlers that log it don't change at the call site after migration.

func GetInfo

func GetInfo(ctx Context) Info

GetInfo returns a snapshot of stable identity fields for the running workflow (ID, RunID, TaskQueue, …). Safe to log.

type LocalActivityOptions

type LocalActivityOptions struct {
	// ScheduleToCloseTimeout is the end-to-end budget including
	// retries. 0 = derive from StartToCloseTimeout * MaximumAttempts.
	ScheduleToCloseTimeout time.Duration

	// StartToCloseTimeout caps a single attempt. 0 = unlimited.
	StartToCloseTimeout time.Duration

	// RetryPolicy overrides the default retry policy. Nil = SDK
	// default (see temporal.DefaultRetryPolicy).
	RetryPolicy *temporal.RetryPolicy
}

LocalActivityOptions configures a single local-activity invocation. Mirrors go.temporal.io/sdk/workflow.LocalActivityOptions.

Phase-1 note: local activities execute via the remote path. Same wire, same guarantees. True in-process fast path arrives with the event-sourced replay engine in Phase 2. The options struct is preserved so caller code compiles unchanged and so the retry / timeout fields are propagated to the remote-path activity.

type MetricsCounter added in v1.40.0

type MetricsCounter interface {
	Inc(delta int64)
}

MetricsCounter is the additive metric handle.

type MetricsGauge added in v1.40.0

type MetricsGauge interface {
	Update(value float64)
}

MetricsGauge is the sampled metric handle.

type MetricsHandler added in v1.40.0

type MetricsHandler interface {
	// WithTags returns a derived handler that emits all metrics with
	// the supplied tags merged in. Tag values must be stable strings;
	// dynamic, per-call values still belong on individual metric calls.
	WithTags(tags map[string]string) MetricsHandler

	// Counter returns an additive metric. Counters never go down.
	Counter(name string) MetricsCounter

	// Gauge returns a sampled metric. The last value wins per tag set.
	Gauge(name string) MetricsGauge

	// Timer returns a timing metric. Record duration with Record.
	Timer(name string) MetricsTimer
}

MetricsHandler is the workflow-scoped metrics surface. It mirrors the shape system workers (scheduler, deletenamespace, workerdeployment) consume from upstream so call sites migrate by import-path swap only.

All methods are safe to call from inside a workflow function. A noop implementation is returned when no provider is wired so workflow code never has to nil-check.

func GetMetricsHandler added in v1.40.0

func GetMetricsHandler(ctx Context) MetricsHandler

GetMetricsHandler returns the workflow-scoped metrics handler. When no provider is wired (tests, stubs) the returned handler is a noop; callers can chain WithTags / Counter / Inc unconditionally.

type MetricsTimer added in v1.40.0

type MetricsTimer interface {
	Record(d time.Duration)
}

MetricsTimer is the timing metric handle.

type ReceiveChannel

type ReceiveChannel interface {
	// Receive blocks the workflow coroutine until a value is
	// available, decodes it into valPtr, and reports whether the
	// channel is still open via the ok return. If the channel is
	// closed and drained, ok is false and valPtr is untouched.
	Receive(ctx Context, valPtr any) (ok bool)

	// ReceiveAsync is the non-blocking form: it returns false
	// immediately if no value is buffered. Useful inside a
	// Selector callback where you only want to consume if ready.
	ReceiveAsync(valPtr any) (ok bool)

	// Name returns the channel's debug identifier. Empty for
	// unnamed user channels. Signal channels carry the signal name.
	Name() string
	// contains filtered or unexported methods
}

ReceiveChannel is the read-only view of a workflow channel. Both signal channels (GetSignalChannel) and user channels (NewChannel, NewBufferedChannel) satisfy it.

Mirrors go.temporal.io/sdk/workflow.ReceiveChannel exactly.

Sealed: the interface cannot be implemented outside this package. All ReceiveChannel values must originate from one of the package's constructors (NewChannel, NewBufferedChannel, NewNamedChannel, GetSignalChannel). Forks cannot ship divergent behind-the-scenes channel semantics that the selector / coroutine scheduler would quietly accept; any such type would fail to satisfy the sealed method.

func GetSignalChannel

func GetSignalChannel(ctx Context, name string) ReceiveChannel

GetSignalChannel returns the ReceiveChannel for the named signal. Two calls with the same name return handles that observe the same queue — signals are keyed by name, not handle identity. This matches Temporal's semantics and is what base/plugins/tasks relies on for claim/complete/fail/cancel fan-in.

Signals are durable: the worker persists them and re-delivers them on replay. Sending a signal is the client's job (pkg/sdk/client); this helper only exposes the workflow-side read.

type SelectCase

type SelectCase struct {
	Future  Future
	Channel ReceiveChannel
}

SelectCase is the internal representation of a case submitted to Selector.Select. Exactly one of Future / Channel is non-nil.

type Selector

type Selector interface {
	AddFuture(f Future, fn func(f Future)) Selector
	AddReceive(ch ReceiveChannel, fn func(ch ReceiveChannel, more bool)) Selector
	AddDefault(fn func()) Selector
	Select(ctx Context)
}

Selector is the multi-way choice primitive. Build one with NewSelector(ctx), attach cases with AddFuture / AddReceive / AddDefault, then block on Select(ctx). Exactly one case fires per Select call.

Mirrors go.temporal.io/sdk/workflow.Selector exactly.

func NewSelector

func NewSelector(ctx Context) Selector

NewSelector returns a Selector that blocks on Select until one of the attached cases fires. The ctx is captured only for the env handle; Select takes its own ctx so callers can pair a Selector with a cancel-scoped ctx for one iteration of a loop.

type Settleable

type Settleable interface {
	Future
	// Settle delivers a final (value, err). Subsequent Settle calls
	// are no-ops. value may be nil (e.g. for timers).
	Settle(value []byte, err error)
}

Settleable is the env-facing side of a Future. The worker owns a Settleable handle for each Future it hands back and completes it when the activity/timer finishes. This package exposes it so the worker and the in-memory StubEnv can both drive futures without reaching into unexported state.
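
The contract (first Settle wins, ReadyCh closes once, Get decodes the payload) can be sketched with sync.Once. The type below is a hypothetical model: settleable and newFuture are illustrative names, Get omits the Context argument, and JSON decoding is assumed from the Phase 1 notes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// settleable is a hypothetical model of the Settleable contract.
type settleable struct {
	once  sync.Once
	ready chan struct{}
	value []byte
	err   error
}

func newFuture() *settleable {
	return &settleable{ready: make(chan struct{})}
}

// Settle delivers the final (value, err); later calls are no-ops.
func (f *settleable) Settle(value []byte, err error) {
	f.once.Do(func() {
		f.value, f.err = value, err
		close(f.ready)
	})
}

// ReadyCh returns the same channel on every call; it is closed on Settle.
func (f *settleable) ReadyCh() <-chan struct{} { return f.ready }

// IsReady is a non-blocking peek at ReadyCh.
func (f *settleable) IsReady() bool {
	select {
	case <-f.ready:
		return true
	default:
		return false
	}
}

// Get blocks until settled, then decodes the payload into valPtr.
func (f *settleable) Get(valPtr any) error {
	<-f.ready
	if f.err != nil || valPtr == nil || f.value == nil {
		return f.err
	}
	return json.Unmarshal(f.value, valPtr)
}

func main() {
	f := newFuture()
	f.Settle([]byte(`42`), nil)
	f.Settle([]byte(`7`), nil) // ignored: already settled

	var n int
	fmt.Println(f.Get(&n), n) // <nil> 42
}
```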

func NewFuture

func NewFuture() Settleable

NewFuture constructs a settleable Future. It is intended for the worker / StubEnv to call; ordinary workflow code receives Futures from ExecuteActivity / NewTimer / a Selector callback.

type StubEnv

type StubEnv struct {
	// contains filtered or unexported fields
}

StubEnv is a single-goroutine, in-memory CoroutineEnv implementation sufficient for unit-testing workflow code without standing up a real worker. It IS NOT a substitute for the worker runtime; it lacks event-sourced replay, durable timers, and cross-process activity dispatch.

StubEnv is test-only. Production workflow dispatch uses workerEnv (pkg/sdk/worker/env.go). Do NOT depend on StubEnv from non-test code — its API surface is intentionally unstable and may add / remove deterministic-testing hooks over time.

Typical use:

env := workflow.NewStubEnv()
env.OnActivity("DoWork").Return(&Result{OK: true}, nil)
env.SignalAsync("claim", map[string]string{"agent_id": "alice"})
ctx := workflow.NewContextFromEnv(env)
out, err := MyWorkflow(ctx, input)

The stub processes activities synchronously: ExecuteActivity looks up the registered response by matching function name / string and settles the Future on the same goroutine before returning it.

Timers in the stub run against a real wall clock via workflow.Timer so that tests observe the same scheduling semantics as production (a Future that settles after d, cancelable via scope). Tests that want deterministic-time assertions use short durations (tens of milliseconds) or AdvanceClock in combination with the Now() helpers — AdvanceClock no longer force-fires timers.

func NewStubEnv

func NewStubEnv() *StubEnv

NewStubEnv returns an in-memory CoroutineEnv for tests. Clock starts at a stable epoch and does not advance unless the caller sets AutoAdvance or calls AdvanceClock.

func (*StubEnv) AdvanceClock

func (e *StubEnv) AdvanceClock(d time.Duration)

AdvanceClock manually advances workflow Now() by d. It does NOT fire pending timers: timers are wall-clock-based so their firing is driven by the real OS scheduler. Tests that want to observe timer firing should either use small durations or run them on a separate goroutine.

func (*StubEnv) Cancel

func (e *StubEnv) Cancel()

Cancel cancels the root scope. Equivalent to the workflow's top-level cancellation firing. All active Sleeps, Timers, and Selects return temporal.Canceled.

func (*StubEnv) CurrentScope

func (e *StubEnv) CurrentScope() any

func (*StubEnv) ExecuteActivity

func (e *StubEnv) ExecuteActivity(opts ActivityOptions, activity any, args []any) Future

func (*StubEnv) ExecuteChildWorkflow

func (e *StubEnv) ExecuteChildWorkflow(childWorkflow any, args []any) ChildWorkflowFuture

ExecuteChildWorkflow implements CoroutineEnv.

func (*StubEnv) GetSignalChannel

func (e *StubEnv) GetSignalChannel(name string) ReceiveChannel

func (*StubEnv) GetVersion added in v1.40.0

func (e *StubEnv) GetVersion(changeID string, minSupported, maxSupported Version) Version

GetVersion records maxSupported on the first call for a changeID and, on later calls, returns the recorded value clamped to [minSupported, maxSupported].

func (*StubEnv) Logger

func (e *StubEnv) Logger() log.Logger

func (*StubEnv) MetricsHandler added in v1.40.0

func (e *StubEnv) MetricsHandler() MetricsHandler

MetricsHandler returns the installed handler or nil.

func (*StubEnv) MutableSideEffect added in v1.40.0

func (e *StubEnv) MutableSideEffect(changeID string, fn func(ctx Context) any, eq func(a, b any) bool, ctx Context) ([]byte, error)

MutableSideEffect runs fn and records its result when eq reports that the value has changed.

func (*StubEnv) NewCancelScope

func (e *StubEnv) NewCancelScope() (any, CancelFunc)

func (*StubEnv) NewChannel

func (e *StubEnv) NewChannel(name string, buffered int) Channel

func (*StubEnv) NewTimer

func (e *StubEnv) NewTimer(d time.Duration) Future

NewTimer schedules a wall-clock timer using the real workflow.Timer helper. The returned Future settles with (nil, nil) after d, or with temporal.Canceled if the root scope cancels first.

func (*StubEnv) Now

func (e *StubEnv) Now() time.Time

func (*StubEnv) OnActivity

func (e *StubEnv) OnActivity(activity any) *stubActivityReg

OnActivity queues a response for the named activity. Pass a function value (reflect.TypeOf(fn).Name() is matched) or a literal string. Returns the env so multiple OnActivity.Return calls chain.

Example:

env.OnActivity("DoWork").Return(&Result{OK: true}, nil)
env.OnActivity(DoWork).Return(&Result{OK: false}, errors.New("boom"))

func (*StubEnv) OnChildWorkflow

func (e *StubEnv) OnChildWorkflow(childWorkflow any) *stubChildWorkflowReg

OnChildWorkflow queues a response for a named child workflow. The next ExecuteChildWorkflow invocation with a matching type consumes the queued entry (FIFO). When the queue is empty the stub settles the future with (nil, nil) — same fallback shape as ExecuteActivity.
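The FIFO consumption and the (nil, nil) fallback described above can be modelled with a small, self-contained sketch. The types and methods here (`responseQueue`, `push`, `pop`) are hypothetical and exist only for illustration; the stub's real bookkeeping is unexported and may differ.

```go
package main

import "fmt"

// queuedResponse is one canned result for a named workflow or activity.
type queuedResponse struct {
	value any
	err   error
}

// responseQueue is a hypothetical per-name FIFO of canned responses.
type responseQueue struct {
	byName map[string][]queuedResponse
}

func newResponseQueue() *responseQueue {
	return &responseQueue{byName: make(map[string][]queuedResponse)}
}

// push appends a response to the back of the queue for name.
func (q *responseQueue) push(name string, value any, err error) {
	q.byName[name] = append(q.byName[name], queuedResponse{value: value, err: err})
}

// pop consumes the oldest response for name. When nothing is queued it
// falls back to (nil, nil), mirroring the fallback described in the text.
func (q *responseQueue) pop(name string) (any, error) {
	pending := q.byName[name]
	if len(pending) == 0 {
		return nil, nil
	}
	q.byName[name] = pending[1:]
	return pending[0].value, pending[0].err
}

func main() {
	q := newResponseQueue()
	q.push("Child", "first", nil)
	q.push("Child", "second", nil)
	fmt.Println(q.pop("Child")) // prints "first <nil>"
	fmt.Println(q.pop("Child")) // prints "second <nil>"
	fmt.Println(q.pop("Child")) // prints "<nil> <nil>" (queue empty)
}
```

One consequence worth noting for tests: because an empty queue yields (nil, nil) rather than an error, a missing registration will not fail loudly on its own.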

func (*StubEnv) ScopeDone

func (e *StubEnv) ScopeDone(scope any) <-chan struct{}

func (*StubEnv) ScopeErr

func (e *StubEnv) ScopeErr(scope any) error

func (*StubEnv) Select

func (e *StubEnv) Select(cases []SelectCase) int

Select blocks until exactly one case is ready and returns its index. It uses the Selector fan-in (ReadyCh / RegisterReadyWaker) instead of a 1 ms polling spin: each case parks on the underlying Future's ReadyCh or the channel's waker, and the scope's done channel cancels the whole wait.
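As a rough illustration of waiting on several readiness channels plus a cancellation channel without polling, here is a self-contained sketch built on the standard library's reflect.Select. It is a stand-in, not the mechanism this package uses: `selectReady` is a hypothetical helper, and returning -1 on cancellation is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// selectReady blocks until one of the ready channels becomes receivable
// (for example, because it was closed) and returns its index. If done
// fires first it returns -1. No polling loop is involved: the goroutine
// parks inside reflect.Select until a case is ready.
func selectReady(done <-chan struct{}, ready []<-chan struct{}) int {
	cases := make([]reflect.SelectCase, 0, len(ready)+1)
	for _, ch := range ready {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	// The cancellation channel is always the last case.
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(done),
	})
	chosen, _, _ := reflect.Select(cases)
	if chosen == len(ready) {
		return -1 // cancelled before any case became ready
	}
	return chosen
}

func main() {
	first := make(chan struct{})
	second := make(chan struct{})
	done := make(chan struct{})
	close(second) // only the second case is ready
	fmt.Println(selectReady(done, []<-chan struct{}{first, second})) // prints 1
}
```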

func (*StubEnv) SetAutoAdvance

func (e *StubEnv) SetAutoAdvance(d time.Duration)

SetAutoAdvance sets a per-Now increment. Setting it to 1*time.Second means every Now() call jumps one second, which is enough to unblock simple Sleep-based tests without an explicit advance.
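The interplay between a manually advanced clock and a per-Now increment can be sketched with a small, self-contained model. `sketchClock` is hypothetical, and two details are assumptions: the Unix epoch as the starting point, and the increment being applied before Now returns.

```go
package main

import (
	"fmt"
	"time"
)

// sketchClock is a hypothetical virtual clock; it is not StubEnv.
type sketchClock struct {
	now  time.Time
	step time.Duration // added on every Now call; zero means frozen
}

// newSketchClock starts at the Unix epoch (an assumption; the text only
// says the real stub starts at "a stable epoch").
func newSketchClock() *sketchClock {
	return &sketchClock{now: time.Unix(0, 0).UTC()}
}

// SetAutoAdvance sets the per-Now increment.
func (c *sketchClock) SetAutoAdvance(d time.Duration) { c.step = d }

// AdvanceClock moves the virtual time forward by d. It fires nothing:
// in this model, as in the text, timers are not tied to the virtual clock.
func (c *sketchClock) AdvanceClock(d time.Duration) { c.now = c.now.Add(d) }

// Now applies the auto-advance step (if any) and returns the virtual time.
func (c *sketchClock) Now() time.Time {
	c.now = c.now.Add(c.step)
	return c.now
}

func main() {
	c := newSketchClock()
	start := c.Now() // frozen clock: returns the epoch
	c.AdvanceClock(5 * time.Second)
	c.SetAutoAdvance(time.Second)
	fmt.Println(c.Now().Sub(start)) // prints 6s
}
```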

func (*StubEnv) SetInfo

func (e *StubEnv) SetInfo(i Info)

SetInfo overrides the WorkflowInfo surfaced by GetInfo. Useful for tests that assert on identity fields.

func (*StubEnv) SetLogger

func (e *StubEnv) SetLogger(l log.Logger)

SetLogger replaces the stub's logger. Defaults to log.Noop().

func (*StubEnv) SetMetricsHandler added in v1.40.0

func (e *StubEnv) SetMetricsHandler(h MetricsHandler)

SetMetricsHandler installs a workflow-scoped metrics handler the stub returns from MetricsHandler(). Pass nil to reset to the noop fallback.

func (*StubEnv) SideEffect added in v1.40.0

func (e *StubEnv) SideEffect(fn func(ctx Context) any, ctx Context) ([]byte, error)

SideEffect runs fn once per call ordinal and records its bytes.

func (*StubEnv) SignalAsync

func (e *StubEnv) SignalAsync(name string, val any)

SignalAsync enqueues a signal value for the named channel. The channel is created on demand if no workflow code has asked for it yet.

func (*StubEnv) Sleep

func (e *StubEnv) Sleep(d time.Duration) error

func (*StubEnv) UpsertSearchAttributes added in v1.40.0

func (e *StubEnv) UpsertSearchAttributes(attrs map[string]any) error

UpsertSearchAttributes records the upsert for tests to assert against.

func (*StubEnv) UpsertedSearchAttributes added in v1.40.0

func (e *StubEnv) UpsertedSearchAttributes() []map[string]any

UpsertedSearchAttributes returns a copy of the attributes the workflow upserted via UpsertSearchAttributes. Tests assert against it after running the workflow.

func (*StubEnv) WorkflowInfo

func (e *StubEnv) WorkflowInfo() Info

type Version added in v1.40.0

type Version int

Version is a deterministic-replay marker. GetVersion returns a stable integer for a given changeID across replays, allowing workflow authors to evolve workflow code without breaking in-flight executions.

Mirrors go.temporal.io/sdk/workflow.Version semantics so call sites migrate by changing the import path only.

const DefaultVersion Version = -1

DefaultVersion is the version returned by GetVersion the first time a changeID is encountered before any new version is introduced. New versions start at 0 and increase from there.

func GetVersion added in v1.40.0

func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

GetVersion returns the version recorded for changeID in this workflow run, clamped to [minSupported, maxSupported]. The first call records maxSupported; all later calls (and replays) return the recorded value.

Workflow authors gate behavioural changes on the returned version:

v := workflow.GetVersion(ctx, "use-new-activity", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
    workflow.ExecuteActivity(ctx, oldActivity).Get(ctx, nil)
} else {
    workflow.ExecuteActivity(ctx, newActivity).Get(ctx, nil)
}

Phase 1 (no event-sourced replay yet): the recorded value is held in the per-run env. A crash restarts the workflow from the top, at which point the workflow re-records maxSupported. This is the documented Phase 1 contract: a rerun with the same input takes the same path because the same maxSupported is presented at the same call site.
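The record-then-clamp rule can be modelled in a few lines. This is a self-contained sketch under stated assumptions, not the package's code: `versionRecorder`, `getVersion`, and the lowercase `version` type are hypothetical names standing in for the per-run env.

```go
package main

import "fmt"

type version int

const defaultVersion version = -1

// versionRecorder is a hypothetical per-run store keyed by changeID.
type versionRecorder struct {
	recorded map[string]version
}

func newVersionRecorder() *versionRecorder {
	return &versionRecorder{recorded: make(map[string]version)}
}

// getVersion records maxSupported the first time a changeID is seen.
// Later calls return the recorded value clamped to
// [minSupported, maxSupported], following the rule stated in the text.
func (r *versionRecorder) getVersion(changeID string, minSupported, maxSupported version) version {
	v, ok := r.recorded[changeID]
	if !ok {
		r.recorded[changeID] = maxSupported
		return maxSupported
	}
	if v < minSupported {
		return minSupported
	}
	if v > maxSupported {
		return maxSupported
	}
	return v
}

func main() {
	r := newVersionRecorder()
	fmt.Println(r.getVersion("use-new-activity", defaultVersion, 1)) // prints 1 (first call records max)
	fmt.Println(r.getVersion("use-new-activity", defaultVersion, 2)) // prints 1 (recorded value wins)
	fmt.Println(r.getVersion("use-new-activity", 2, 3))              // prints 2 (clamped up to min)
}
```

Because the store in this model lives only in memory, discarding it and starting over records maxSupported again, which matches the Phase 1 restart behaviour described above.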

type WorkflowExecution

type WorkflowExecution struct {
	WorkflowID string `json:"workflow_id"`
	RunID      string `json:"run_id,omitempty"`
}

WorkflowExecution is the child-side identifier pair. Declared here (rather than in client/) so the workflow package never imports the client package — keeping the dependency graph a strict line:

workflow → temporal
worker → {workflow, client, temporal, activity}
client → {temporal, converter}
