Documentation
¶
Overview ¶
Package client is the Hanzo Tasks workflow client. Native ZAP transport, zero go.temporal.io/* and zero google.golang.org/grpc imports.
The wire is ZAP framed per schema/tasks.zap. Opcodes 0x0060-0x009F are owned by this package; 0x0050-0x005F stay reserved for the legacy one-shot/schedule surface owned by pkg/tasks.
v1 wire note: request and response payloads are transported as a single bytes field holding JSON. Native ZAP serde for every RPC in schema/tasks.zap replaces JSON in a follow-up without changing opcodes.
Index ¶
- Constants
- Variables
- func IdentityOf(c Client) string
- func NamespaceOf(c Client) string
- type ActivityTask
- type CheckHealthRequest
- type CheckHealthResponse
- type Client
- type CreateScheduleOptions
- type ListNamespacesResponse
- type ListSchedulesResponse
- type ListWorkflowsResponse
- type Namespace
- type NamespaceConfig
- type NamespaceInfo
- type NamespaceState
- type Options
- type PollActivityTaskRequest
- type PollWorkflowTaskRequest
- type RecordActivityTaskHeartbeatRequest
- type RegisterNamespaceRequest
- type RespondActivityTaskCompletedRequest
- type RespondActivityTaskFailedRequest
- type RespondWorkflowTaskCompletedRequest
- type RetryPolicyJSON
- type Schedule
- type ScheduleAction
- type ScheduleActivityRequest
- type ScheduleActivityResponse
- type ScheduleSpec
- type StartChildWorkflowRequest
- type StartChildWorkflowResponse
- type StartWorkflowOptions
- type TimeoutsJSON
- type Transport
- type WaitActivityResultRequest
- type WaitActivityResultResponse
- type WorkerTransport
- type WorkflowExecutionInfo
- type WorkflowRun
- type WorkflowRunGetOptions
- type WorkflowStatus
- type WorkflowTask
Constants ¶
const ( OpcodePollWorkflowTask uint16 = 0x00A0 OpcodePollActivityTask uint16 = 0x00A1 OpcodeRespondWorkflowTaskCompleted uint16 = 0x00A2 OpcodeRespondActivityTaskCompleted uint16 = 0x00A3 OpcodeRespondActivityTaskFailed uint16 = 0x00A4 OpcodeRecordActivityTaskHeartbeat uint16 = 0x00A5 // In-workflow activity scheduling (see schema/tasks.zap § // "Worker → server: in-workflow activity scheduling"). The // workerEnv uses these to dispatch activities without racing // against the worker's own pollActivityTask loop. OpcodeScheduleActivity uint16 = 0x006B OpcodeWaitActivityResult uint16 = 0x006C OpcodeStartChildWorkflow uint16 = 0x006D // OpcodeError is the generic error response. Any handler can // return this shape regardless of the request opcode. OpcodeError uint16 = 0x00FF )
Opcodes owned by the worker <-> matching surface. Opcodes 0x0070-0x007F were originally reserved here for worker poll/respond RPCs, but the canonical opcode allocation is:
0x0050-0x005F pkg/tasks one-shot/schedule (legacy) 0x0060-0x006F client workflow lifecycle (startWorkflow, signal, ...) 0x0070-0x007F client schedule ops (createSchedule, ...) 0x0080-0x008F client namespace ops (registerNamespace, ...) 0x0090-0x009F client health + meta 0x00A0-0x00AF worker poll / respond (see below)
Worker opcodes moved up to 0x00A0 to avoid collision with the client schedule ops at 0x0070-0x0073. Append-only across releases; never reuse a value.
const ( // Poll request fields (both workflow + activity). FieldNamespace = 0 FieldTaskQueueName = 8 FieldTaskQueueKind = 16 FieldIdentity = 24 FieldWorkerBuildID = 32 // WorkflowTask response fields. FieldTaskToken = 0 FieldWorkflowID = 8 FieldRunID = 16 FieldWorkflowTypeName = 24 FieldHistoryBytes = 32 FieldNextPageToken = 40 // ActivityTask response fields. FieldActivityID = 8 FieldActivityTypeName = 16 FieldInputBytes = 24 FieldScheduledTimeMs = 32 FieldStartToCloseTimeoutMs = 40 FieldHeartbeatTimeoutMs = 48 FieldActivityWorkflowID = 56 FieldActivityRunID = 64 // Respond request fields. FieldCommandsBytes = 8 // workflow task completed FieldResultBytes = 8 // activity task completed FieldFailureBytes = 8 // activity task failed FieldDetailsBytes = 8 // heartbeat // Response fields. FieldRespStatus = 0 FieldRespError = 4 FieldRespCancelRequested = 8 )
Wire field offsets for the poll/respond request and response objects. These stay in one place so the client and the server-side handler decode the same layout. Worker-only — the client envelope uses the fields declared in client.go (envelopeBody / envelopeStatus / ...).
Variables ¶
var ErrClosed = errors.New("hanzo/tasks/client: closed")
ErrClosed is returned by RPCs issued on a Client after Close.
var ErrNotImplementedLocally = errors.New("hanzo/tasks/client: RPC requires pkg/sdk/worker history fetch (not yet wired)")
ErrNotImplementedLocally marks RPCs that need a Worker-side history fetch RPC (currently missing from schema/tasks.zap). Callers see this when they ask a WorkflowRun for its result but the server has not exposed the history-fetch opcode yet.
Functions ¶
func IdentityOf ¶
IdentityOf returns the identity a Client was dialed with.
func NamespaceOf ¶
NamespaceOf returns the namespace a Client was dialed with. Workers use it as the default namespace for poll RPCs if the caller's worker.Options doesn't override.
Types ¶
type ActivityTask ¶
type ActivityTask struct {
TaskToken []byte
WorkflowID string
RunID string
ActivityID string
ActivityTypeName string
Input []byte
ScheduledTimeMs int64
StartToCloseTimeoutMs int64
HeartbeatTimeoutMs int64
}
ActivityTask mirrors schema/tasks.zap:ActivityTask. Input is the raw encoded Payloads; the worker's activity dispatcher decodes it into the registered function's arg types.
type CheckHealthRequest ¶
type CheckHealthRequest struct{}
CheckHealthRequest is reserved for future filters; empty in v1.
type CheckHealthResponse ¶
type CheckHealthResponse struct {
// Service is the opaque backend identifier ("hanzo-tasks").
Service string `json:"service"`
// Status is "ok" when the frontend is healthy, anything else on
// degradation.
Status string `json:"status"`
}
CheckHealthResponse reports the backend's liveness.
type Client ¶
type Client interface {
// ExecuteWorkflow starts a workflow (by type name when `workflow`
// is a string, or by reflected name when it is a Go function).
// Returns a handle whose Get blocks until the workflow terminates.
ExecuteWorkflow(
ctx context.Context,
opts StartWorkflowOptions,
workflow any,
args ...any,
) (WorkflowRun, error)
// SignalWorkflow posts a signal to a running execution.
SignalWorkflow(
ctx context.Context,
workflowID, runID, signalName string,
arg any,
) error
// SignalWithStartWorkflow atomically delivers a signal to the
// workflow identified by workflowID, starting it with the
// supplied opts+workflow+args if no matching execution is
// running. Opcode 0x0066.
SignalWithStartWorkflow(
ctx context.Context,
workflowID, signalName string,
signalArg any,
opts StartWorkflowOptions,
workflow any,
workflowArgs ...any,
) (WorkflowRun, error)
// GetWorkflow returns a handle to an already-started workflow.
// No RPC is issued; subsequent Get / GetWithOptions calls on the
// handle poll DescribeWorkflow to observe terminal state.
GetWorkflow(ctx context.Context, workflowID, runID string) WorkflowRun
// QueryWorkflow runs a registered query on a (possibly running)
// workflow execution and returns the query's encoded result.
// Opcode 0x0067. Queries are read-only — they do not mutate
// workflow history.
QueryWorkflow(
ctx context.Context,
workflowID, runID, queryType string,
args ...any,
) (converter.EncodedValue, error)
// CancelWorkflow asks the server to cancel an execution.
CancelWorkflow(ctx context.Context, workflowID, runID string) error
// TerminateWorkflow force-stops an execution with a reason.
TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error
// DescribeWorkflow returns the current info record.
DescribeWorkflow(ctx context.Context, workflowID, runID string) (*WorkflowExecutionInfo, error)
// ListWorkflows runs a visibility query.
ListWorkflows(ctx context.Context, query string, pageSize int32, nextPageToken []byte) (*ListWorkflowsResponse, error)
// RegisterNamespace creates a new namespace on the server.
RegisterNamespace(ctx context.Context, req *RegisterNamespaceRequest) error
// DescribeNamespace returns a namespace's info + config.
DescribeNamespace(ctx context.Context, name string) (*Namespace, error)
// ListNamespaces pages through registered namespaces.
ListNamespaces(ctx context.Context, pageSize int32, nextPageToken []byte) (*ListNamespacesResponse, error)
// CreateSchedule registers a recurring schedule.
CreateSchedule(ctx context.Context, opts CreateScheduleOptions) error
// ListSchedules pages through schedules.
ListSchedules(ctx context.Context, pageSize int32, nextPageToken []byte) (*ListSchedulesResponse, error)
// DeleteSchedule removes a schedule by ID.
DeleteSchedule(ctx context.Context, scheduleID string) error
// PauseSchedule toggles a schedule's paused flag.
PauseSchedule(ctx context.Context, scheduleID string, paused bool) error
// Health reports server reachability as two flat strings.
Health(ctx context.Context) (service, status string, err error)
// CheckHealth is the structured counterpart to Health, matching
// the upstream client shape so callers migrating from
// go.temporal.io/sdk/client compile unchanged. A nil req is
// treated as an empty request. Backed by the same opcode
// 0x0090 as Health.
CheckHealth(ctx context.Context, req *CheckHealthRequest) (*CheckHealthResponse, error)
// Close releases the underlying ZAP connection.
Close()
}
Client is the Hanzo Tasks workflow client. The surface mirrors the upstream client.Client one-for-one for the RPCs used by base/commerce/ta.
type CreateScheduleOptions ¶
type CreateScheduleOptions struct {
ID string
// Spec: use either Cron expressions OR a single Interval.
CronExpressions []string
Interval time.Duration
StartTime time.Time
EndTime time.Time
Jitter time.Duration
Timezone string
// Action: the workflow to start on each tick.
WorkflowID string
WorkflowType string
TaskQueue string
Input []any
Paused bool
}
CreateScheduleOptions configures CreateSchedule. Mirrors schema/tasks.zap Schedule / ScheduleSpec / ScheduleAction on the v1 JSON wire.
type ListNamespacesResponse ¶
type ListNamespacesResponse struct {
Namespaces []Namespace `json:"namespaces"`
NextPageToken []byte `json:"next_page_token,omitempty"`
}
ListNamespacesResponse is returned by ListNamespaces.
type ListSchedulesResponse ¶
type ListSchedulesResponse struct {
Schedules []Schedule `json:"schedules"`
NextPageToken []byte `json:"next_page_token,omitempty"`
}
ListSchedulesResponse is returned by ListSchedules.
type ListWorkflowsResponse ¶
type ListWorkflowsResponse struct {
Executions []WorkflowExecutionInfo `json:"executions"`
NextPageToken []byte `json:"next_page_token,omitempty"`
}
ListWorkflowsResponse is returned by ListWorkflows.
type Namespace ¶
type Namespace struct {
Info NamespaceInfo `json:"info"`
Config NamespaceConfig `json:"config"`
}
Namespace mirrors schema/tasks.zap Namespace for the v1 JSON wire.
type NamespaceConfig ¶
NamespaceConfig mirrors schema/tasks.zap NamespaceConfig. Retention is exposed as a Go Duration on the Go surface and marshalled to/from milliseconds on the wire (see retentionJSON below).
func (NamespaceConfig) MarshalJSON ¶
func (n NamespaceConfig) MarshalJSON() ([]byte, error)
MarshalJSON implements custom encoding for the wire shape.
func (*NamespaceConfig) UnmarshalJSON ¶
func (n *NamespaceConfig) UnmarshalJSON(b []byte) error
UnmarshalJSON implements custom decoding for the wire shape.
type NamespaceInfo ¶
type NamespaceInfo struct {
Name string `json:"name"`
State NamespaceState `json:"state"`
Description string `json:"description,omitempty"`
OwnerEmail string `json:"owner_email,omitempty"`
ID string `json:"id,omitempty"`
}
NamespaceInfo mirrors schema/tasks.zap NamespaceInfo.
type NamespaceState ¶
type NamespaceState int8
NamespaceState mirrors the Int8 state enum.
const ( NamespaceStateUnspecified NamespaceState = 0 NamespaceStateRegistered NamespaceState = 1 NamespaceStateDeprecated NamespaceState = 2 NamespaceStateDeleted NamespaceState = 3 )
type Options ¶
type Options struct {
// HostPort is the "host:port" address of the Hanzo Tasks frontend
// exposing the ZAP transport (default port 9652).
HostPort string
// Namespace scopes every RPC issued by the returned Client. Empty
// string is rewritten to "default" at Dial time.
Namespace string
// Identity is sent as the caller identity on long-poll / worker RPCs.
// Optional; defaults to "hanzo-tasks-sdk".
Identity string
// DialTimeout bounds the initial ZAP connect. Zero means "no bound".
DialTimeout time.Duration
// CallTimeout bounds a single request/response round trip. Zero
// means "no bound per call" — callers are expected to pass their
// own ctx deadline.
CallTimeout time.Duration
// Transport overrides the default ZAP transport. Primarily for
// testing — leave nil in production.
Transport Transport
}
Options configures Dial. Fields mirror the upstream Temporal Options surface so callers can swap import paths without changing arguments.
type PollActivityTaskRequest ¶
type PollActivityTaskRequest struct {
Namespace string
TaskQueueName string
TaskQueueKind int8
Identity string
}
PollActivityTaskRequest mirrors schema/tasks.zap:PollActivityTaskRequest.
type PollWorkflowTaskRequest ¶
type PollWorkflowTaskRequest struct {
Namespace string
TaskQueueName string
TaskQueueKind int8
Identity string
WorkerBuildID string
}
PollWorkflowTaskRequest mirrors schema/tasks.zap:PollWorkflowTaskRequest.
type RecordActivityTaskHeartbeatRequest ¶
RecordActivityTaskHeartbeatRequest carries opaque details bytes.
type RegisterNamespaceRequest ¶
type RegisterNamespaceRequest struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
OwnerEmail string `json:"owner_email,omitempty"`
Retention time.Duration `json:"-"`
}
RegisterNamespaceRequest is the argument to RegisterNamespace. Mirrors schema/tasks.zap: server receives the full Namespace.
type RespondActivityTaskCompletedRequest ¶
RespondActivityTaskCompletedRequest carries a serialised result payload.
type RespondActivityTaskFailedRequest ¶
RespondActivityTaskFailedRequest carries a serialised failure.
type RespondWorkflowTaskCompletedRequest ¶
RespondWorkflowTaskCompletedRequest carries a serialised command list.
type RetryPolicyJSON ¶
type RetryPolicyJSON struct {
InitialIntervalMs int64 `json:"initial_interval_ms,omitempty"`
BackoffCoefficient float64 `json:"backoff_coefficient,omitempty"`
MaximumIntervalMs int64 `json:"maximum_interval_ms,omitempty"`
MaximumAttempts int32 `json:"maximum_attempts,omitempty"`
NonRetryableErrorTypes []string `json:"non_retryable_error_types,omitempty"`
}
RetryPolicyJSON is the Go-side retry-policy shape passed to schedule RPCs. Milliseconds on the wire; zero means "SDK default".
type Schedule ¶
type Schedule struct {
ID string `json:"id"`
Spec ScheduleSpec `json:"spec"`
Action ScheduleAction `json:"action"`
Paused bool `json:"paused,omitempty"`
}
Schedule mirrors schema/tasks.zap Schedule for the v1 wire.
type ScheduleAction ¶
type ScheduleAction struct {
WorkflowID string `json:"workflow_id,omitempty"`
WorkflowType string `json:"workflow_type"`
TaskQueue string `json:"task_queue"`
Input []any `json:"input,omitempty"`
}
ScheduleAction mirrors schema/tasks.zap ScheduleAction.
type ScheduleActivityRequest ¶
type ScheduleActivityRequest struct {
Namespace string
WorkflowID string
RunID string
TaskQueue string
ActivityType string
Input []byte
StartToCloseMs int64
HeartbeatMs int64
RetryPolicy *RetryPolicyJSON
}
ScheduleActivityRequest mirrors schema/tasks.zap:ScheduleActivityRequest.
type ScheduleActivityResponse ¶
type ScheduleActivityResponse struct {
ActivityTaskID string
// TaskToken is the server-minted, HMAC-signed opaque token the
// worker must present on RespondActivityTaskCompleted/Failed. Empty
// on error, populated on success.
TaskToken []byte
}
ScheduleActivityResponse mirrors schema/tasks.zap:ScheduleActivityResponse.
type ScheduleSpec ¶
type ScheduleSpec struct {
Cron []string `json:"-"`
Interval time.Duration `json:"-"`
StartTime time.Time `json:"-"`
EndTime time.Time `json:"-"`
Jitter time.Duration `json:"-"`
Timezone string `json:"-"`
}
ScheduleSpec mirrors schema/tasks.zap ScheduleSpec on the wire. Durations are exposed as Go time.Duration on the Go surface and as milliseconds on the wire.
func (ScheduleSpec) MarshalJSON ¶
func (s ScheduleSpec) MarshalJSON() ([]byte, error)
MarshalJSON implements the ms-on-the-wire contract.
func (*ScheduleSpec) UnmarshalJSON ¶
func (s *ScheduleSpec) UnmarshalJSON(b []byte) error
UnmarshalJSON implements the ms-on-the-wire contract.
type StartChildWorkflowRequest ¶
type StartChildWorkflowRequest struct {
Namespace string
ParentID string
ParentRunID string
WorkflowID string
WorkflowType string
TaskQueue string
Input []any
RetryPolicy *RetryPolicyJSON
TimeoutsMs TimeoutsJSON
}
StartChildWorkflowRequest mirrors schema/tasks.zap StartChildWorkflowRequest.
type StartChildWorkflowResponse ¶
type StartChildWorkflowResponse struct {
RunID string
}
StartChildWorkflowResponse mirrors schema/tasks.zap.
type StartWorkflowOptions ¶
type StartWorkflowOptions struct {
// ID is the workflow ID. Empty string asks the server to generate
// one (returned as WorkflowRun.GetID()).
ID string
// TaskQueue is required; workers poll this queue for tasks.
TaskQueue string
// WorkflowExecutionTimeout is the total allowed wall-clock time
// across all runs (Continue-As-New chains).
WorkflowExecutionTimeout time.Duration
// WorkflowRunTimeout is the allowed wall-clock time for a single
// run.
WorkflowRunTimeout time.Duration
// WorkflowTaskTimeout bounds a single workflow-task attempt on a
// worker. Default on server is 10s.
WorkflowTaskTimeout time.Duration
// RetryPolicy overrides the workflow-level retry policy (distinct
// from per-activity retry).
RetryPolicy *temporal.RetryPolicy
// CronSchedule makes the workflow recurring. Empty string runs once.
CronSchedule string
// Memo is opaque metadata attached to the execution.
Memo map[string]any
// SearchAttributes are indexed by the visibility store.
SearchAttributes map[string]any
}
StartWorkflowOptions mirrors the upstream StartWorkflowOptions struct for ExecuteWorkflow. Only the fields currently mapped onto the wire (schema/tasks.zap StartWorkflowRequest + Timeouts + RetryPolicy) are meaningful; unlisted fields are ignored by the v1 wire.
type TimeoutsJSON ¶
type TimeoutsJSON struct {
WorkflowExecutionMs int64 `json:"workflow_execution_ms,omitempty"`
WorkflowRunMs int64 `json:"workflow_run_ms,omitempty"`
WorkflowTaskMs int64 `json:"workflow_task_ms,omitempty"`
}
TimeoutsJSON is the timeout triple in ms.
type Transport ¶
type Transport interface {
// Call issues a single request for opcode and returns the
// response frame.
Call(ctx context.Context, opcode uint16, body []byte) (respFrame []byte, err error)
// Close releases transport resources.
Close() error
}
Transport is the request/response abstraction used by Client for user-facing RPCs (workflow, schedule, namespace, health) and by WorkerTransport for worker poll/respond RPCs. Production code uses the default transport (luxfi/zap). Tests inject an in-memory fake.
Call is frame-in / frame-out:
- body is the caller's request payload bytes. For user-facing RPCs the body is a plain JSON document that the client wraps in the ZAP envelope declared by envelopeBody/envelopeStatus/ envelopeError before invoking the transport. For worker RPCs the body is a self-framed ZAP object emitted by the worker transport's encoders.
- opcode is stamped on the ZAP flags of the outgoing frame by the default transport. A caller that already builds a complete ZAP frame (like the worker) can pass the body bytes verbatim — the default transport wraps them when they are not yet framed and forwards them when they are.
- The returned []byte is a complete ZAP response frame. Higher levels (clientImpl.roundTrip for user-facing RPCs, the worker transport decoders for worker RPCs) parse the frame with zap.Parse and read the fields they care about.
func TransportOf ¶
TransportOf extracts the underlying Transport from a Client returned by Dial. pkg/sdk/worker calls this to layer a typed WorkerTransport over the generic Transport the Client already owns — workers share the ZAP connection and long-poll channel with the Client so users pay for one node per process.
Callers that injected a custom Transport via Options.Transport receive the same value they passed in.
Returns nil if c was not produced by Dial (future Client implementations outside this package simply opt out of worker sharing).
type WaitActivityResultRequest ¶
WaitActivityResultRequest mirrors schema/tasks.zap:WaitActivityResultRequest.
type WaitActivityResultResponse ¶
WaitActivityResultResponse mirrors schema/tasks.zap:WaitActivityResultResponse.
type WorkerTransport ¶
type WorkerTransport interface {
// Close tears down the underlying node. Safe to call more than
// once.
Close() error
// PollWorkflowTask issues a long-poll for the next workflow
// task on the given queue. The ctx carries the long-poll
// deadline; returning a nil task and nil error signals "idle,
// try again".
PollWorkflowTask(ctx context.Context, req PollWorkflowTaskRequest) (*WorkflowTask, error)
// PollActivityTask is the activity counterpart of
// PollWorkflowTask. Same idle semantics.
PollActivityTask(ctx context.Context, req PollActivityTaskRequest) (*ActivityTask, error)
// RespondWorkflowTaskCompleted uploads the worker's commands
// (encoded command list) for a workflow task.
RespondWorkflowTaskCompleted(ctx context.Context, req RespondWorkflowTaskCompletedRequest) error
// RespondActivityTaskCompleted reports a successful activity
// result.
RespondActivityTaskCompleted(ctx context.Context, req RespondActivityTaskCompletedRequest) error
// RespondActivityTaskFailed reports an activity failure.
RespondActivityTaskFailed(ctx context.Context, req RespondActivityTaskFailedRequest) error
// RecordActivityTaskHeartbeat signals liveness for a long-running
// activity. Returns cancelRequested=true if the server wants
// the activity to stop.
RecordActivityTaskHeartbeat(ctx context.Context, req RecordActivityTaskHeartbeatRequest) (bool, error)
// ScheduleActivity asks the frontend to mint an activity task for
// the given type + input and return a stable id (schema/tasks.zap
// opcode 0x006B). The workerEnv in pkg/sdk/worker uses this from
// inside a workflow to dispatch activities without racing the
// worker's own pollActivityTask loop.
ScheduleActivity(ctx context.Context, req ScheduleActivityRequest) (*ScheduleActivityResponse, error)
// WaitActivityResult long-polls for the result of an activity id
// previously returned by ScheduleActivity (opcode 0x006C). A
// response with ready=false means "still pending" — the caller
// re-issues. The ctx deadline bounds a single round-trip.
WaitActivityResult(ctx context.Context, req WaitActivityResultRequest) (*WaitActivityResultResponse, error)
// StartChildWorkflow asks the server to start a workflow whose
// parent is recorded for linkage (opcode 0x006D). Returns the
// run id; Phase-1 does not wait for child completion — the
// caller uses DescribeWorkflow or a follow-up
// WaitChildWorkflowResult when Phase-2 lands.
StartChildWorkflow(ctx context.Context, req StartChildWorkflowRequest) (*StartChildWorkflowResponse, error)
}
WorkerTransport is the wire abstraction the Worker package depends on. One method per logical RPC that the worker issues against the Hanzo Tasks frontend. Production implementations are backed by luxfi/zap; tests inject an in-memory fake satisfying this same interface.
Return values are pre-parsed native Go types — callers never own a buffer whose backing bytes belong to the transport layer.
func NewWorkerTransport ¶
func NewWorkerTransport(t Transport) WorkerTransport
NewWorkerTransport returns a typed wrapper over the generic Transport that issues the worker poll / respond RPCs defined in schema/tasks.zap.
The wire layout (object fields) follows the constants declared in transport.go. v1 encodes the body portions as JSON; native ZAP serde replaces JSON in a follow-up without changing opcodes.
type WorkflowExecutionInfo ¶
type WorkflowExecutionInfo struct {
WorkflowID string `json:"workflow_id"`
RunID string `json:"run_id"`
WorkflowType string `json:"workflow_type"`
StartTime time.Time `json:"start_time,omitempty"`
CloseTime time.Time `json:"close_time,omitempty"`
Status WorkflowStatus `json:"status"`
HistoryLength int64 `json:"history_length"`
TaskQueue string `json:"task_queue,omitempty"`
Memo map[string]any `json:"memo,omitempty"`
}
WorkflowExecutionInfo mirrors schema/tasks.zap WorkflowExecutionInfo for the v1 JSON wire. Exported so callers of DescribeWorkflow / ListWorkflows can inspect the result without pulling upstream types.
type WorkflowRun ¶
type WorkflowRun interface {
// Get blocks until the workflow terminates and decodes the
// result into valuePtr. When the workflow continues-as-new the
// default behaviour follows the chain to the final run.
Get(ctx context.Context, valuePtr any) error
// GetWithOptions is the option-parameterised form of Get. When
// opts.DisableFollowingRuns is true, Get stops at the current
// run rather than following a continue-as-new chain to the final
// run.
GetWithOptions(ctx context.Context, valuePtr any, opts WorkflowRunGetOptions) error
GetID() string
GetRunID() string
}
WorkflowRun is the handle returned by ExecuteWorkflow / SignalWithStartWorkflow / GetWorkflow. Get blocks until the workflow terminates and decodes the result into `valuePtr`.
type WorkflowRunGetOptions ¶
type WorkflowRunGetOptions struct {
// DisableFollowingRuns, when true, prevents Get from walking
// the continue-as-new chain. It returns when the current run
// terminates even if a successor run was started.
DisableFollowingRuns bool
}
WorkflowRunGetOptions tunes WorkflowRun.GetWithOptions. Mirrors the upstream shape so caller code compiles unchanged after the import swap.
type WorkflowStatus ¶
type WorkflowStatus int8
WorkflowStatus mirrors the `status` Int8 in schema/tasks.zap.
const ( WorkflowStatusUnspecified WorkflowStatus = 0 WorkflowStatusRunning WorkflowStatus = 1 WorkflowStatusCompleted WorkflowStatus = 2 WorkflowStatusFailed WorkflowStatus = 3 WorkflowStatusCanceled WorkflowStatus = 4 WorkflowStatusTerminated WorkflowStatus = 5 WorkflowStatusContinuedAsNew WorkflowStatus = 6 WorkflowStatusTimedOut WorkflowStatus = 7 )