client

package
v1.36.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package client is the Hanzo Tasks workflow client. Native ZAP transport, zero go.temporal.io/* and zero google.golang.org/grpc imports.

The wire is ZAP framed per schema/tasks.zap. Opcodes 0x0060-0x009F are owned by this package; 0x0050-0x005F stay reserved for the legacy one-shot/schedule surface owned by pkg/tasks.

v1 wire note: request and response payloads are transported as a single bytes field holding JSON. Native ZAP serde for every RPC in schema/tasks.zap replaces JSON in a follow-up without changing opcodes.

Index

Constants

View Source
const (
	OpcodePollWorkflowTask             uint16 = 0x00A0
	OpcodePollActivityTask             uint16 = 0x00A1
	OpcodeRespondWorkflowTaskCompleted uint16 = 0x00A2
	OpcodeRespondActivityTaskCompleted uint16 = 0x00A3
	OpcodeRespondActivityTaskFailed    uint16 = 0x00A4
	OpcodeRecordActivityTaskHeartbeat  uint16 = 0x00A5

	// In-workflow activity scheduling (see schema/tasks.zap §
	// "Worker → server: in-workflow activity scheduling"). The
	// workerEnv uses these to dispatch activities without racing
	// against the worker's own pollActivityTask loop.
	OpcodeScheduleActivity   uint16 = 0x006B
	OpcodeWaitActivityResult uint16 = 0x006C
	OpcodeStartChildWorkflow uint16 = 0x006D

	// OpcodeError is the generic error response. Any handler can
	// return this shape regardless of the request opcode.
	OpcodeError uint16 = 0x00FF
)

Opcodes owned by the worker <-> matching surface. Opcodes 0x0070-0x007F were originally reserved here for worker poll/respond RPCs, but the canonical opcode allocation is:

0x0050-0x005F  pkg/tasks one-shot/schedule (legacy)
0x0060-0x006F  client workflow lifecycle (startWorkflow, signal, ...)
0x0070-0x007F  client schedule ops (createSchedule, ...)
0x0080-0x008F  client namespace ops (registerNamespace, ...)
0x0090-0x009F  client health + meta
0x00A0-0x00AF  worker poll / respond (see below)

Worker opcodes moved up to 0x00A0 to avoid collision with the client schedule ops at 0x0070-0x0073. Append-only across releases; never reuse a value.

View Source
const (
	// Poll request fields (both workflow + activity).
	FieldNamespace     = 0
	FieldTaskQueueName = 8
	FieldTaskQueueKind = 16
	FieldIdentity      = 24
	FieldWorkerBuildID = 32

	// WorkflowTask response fields.
	FieldTaskToken        = 0
	FieldWorkflowID       = 8
	FieldRunID            = 16
	FieldWorkflowTypeName = 24
	FieldHistoryBytes     = 32
	FieldNextPageToken    = 40

	// ActivityTask response fields.
	FieldActivityID            = 8
	FieldActivityTypeName      = 16
	FieldInputBytes            = 24
	FieldScheduledTimeMs       = 32
	FieldStartToCloseTimeoutMs = 40
	FieldHeartbeatTimeoutMs    = 48
	FieldActivityWorkflowID    = 56
	FieldActivityRunID         = 64

	// Respond request fields.
	FieldCommandsBytes = 8 // workflow task completed
	FieldResultBytes   = 8 // activity task completed
	FieldFailureBytes  = 8 // activity task failed
	FieldDetailsBytes  = 8 // heartbeat

	// Response fields.
	FieldRespStatus          = 0
	FieldRespError           = 4
	FieldRespCancelRequested = 8
)

Wire field offsets for the poll/respond request and response objects. These stay in one place so the client and the server-side handler decode the same layout. Worker-only — the client envelope uses the fields declared in client.go (envelopeBody / envelopeStatus / ...).

Variables

View Source
var ErrClosed = errors.New("hanzo/tasks/client: closed")

ErrClosed is returned by RPCs issued on a Client after Close.

View Source
var ErrNotImplementedLocally = errors.New("hanzo/tasks/client: RPC requires pkg/sdk/worker history fetch (not yet wired)")

ErrNotImplementedLocally marks RPCs that need a Worker-side history fetch RPC (currently missing from schema/tasks.zap). Callers see this when they ask a WorkflowRun for its result but the server has not exposed the history-fetch opcode yet.

Functions

func IdentityOf

func IdentityOf(c Client) string

IdentityOf returns the identity a Client was dialed with.

func NamespaceOf

func NamespaceOf(c Client) string

NamespaceOf returns the namespace a Client was dialed with. Workers use it as the default namespace for poll RPCs if the caller's worker.Options doesn't override.

Types

type ActivityTask

type ActivityTask struct {
	TaskToken             []byte
	WorkflowID            string
	RunID                 string
	ActivityID            string
	ActivityTypeName      string
	Input                 []byte
	ScheduledTimeMs       int64
	StartToCloseTimeoutMs int64
	HeartbeatTimeoutMs    int64
}

ActivityTask mirrors schema/tasks.zap:ActivityTask. Input is the raw encoded Payloads; the worker's activity dispatcher decodes it into the registered function's arg types.

type CheckHealthRequest

type CheckHealthRequest struct{}

CheckHealthRequest is reserved for future filters; empty in v1.

type CheckHealthResponse

type CheckHealthResponse struct {
	// Service is the opaque backend identifier ("hanzo-tasks").
	Service string `json:"service"`
	// Status is "ok" when the frontend is healthy, anything else on
	// degradation.
	Status string `json:"status"`
}

CheckHealthResponse reports the backend's liveness.

type Client

type Client interface {
	// ExecuteWorkflow starts a workflow (by type name when `workflow`
	// is a string, or by reflected name when it is a Go function).
	// Returns a handle whose Get blocks until the workflow terminates.
	ExecuteWorkflow(
		ctx context.Context,
		opts StartWorkflowOptions,
		workflow any,
		args ...any,
	) (WorkflowRun, error)

	// SignalWorkflow posts a signal to a running execution.
	SignalWorkflow(
		ctx context.Context,
		workflowID, runID, signalName string,
		arg any,
	) error

	// SignalWithStartWorkflow atomically delivers a signal to the
	// workflow identified by workflowID, starting it with the
	// supplied opts+workflow+args if no matching execution is
	// running. Opcode 0x0066.
	SignalWithStartWorkflow(
		ctx context.Context,
		workflowID, signalName string,
		signalArg any,
		opts StartWorkflowOptions,
		workflow any,
		workflowArgs ...any,
	) (WorkflowRun, error)

	// GetWorkflow returns a handle to an already-started workflow.
	// No RPC is issued; subsequent Get / GetWithOptions calls on the
	// handle poll DescribeWorkflow to observe terminal state.
	GetWorkflow(ctx context.Context, workflowID, runID string) WorkflowRun

	// QueryWorkflow runs a registered query on a (possibly running)
	// workflow execution and returns the query's encoded result.
	// Opcode 0x0067. Queries are read-only — they do not mutate
	// workflow history.
	QueryWorkflow(
		ctx context.Context,
		workflowID, runID, queryType string,
		args ...any,
	) (converter.EncodedValue, error)

	// CancelWorkflow asks the server to cancel an execution.
	CancelWorkflow(ctx context.Context, workflowID, runID string) error

	// TerminateWorkflow force-stops an execution with a reason.
	TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error

	// DescribeWorkflow returns the current info record.
	DescribeWorkflow(ctx context.Context, workflowID, runID string) (*WorkflowExecutionInfo, error)

	// ListWorkflows runs a visibility query.
	ListWorkflows(ctx context.Context, query string, pageSize int32, nextPageToken []byte) (*ListWorkflowsResponse, error)

	// RegisterNamespace creates a new namespace on the server.
	RegisterNamespace(ctx context.Context, req *RegisterNamespaceRequest) error
	// DescribeNamespace returns a namespace's info + config.
	DescribeNamespace(ctx context.Context, name string) (*Namespace, error)
	// ListNamespaces pages through registered namespaces.
	ListNamespaces(ctx context.Context, pageSize int32, nextPageToken []byte) (*ListNamespacesResponse, error)

	// CreateSchedule registers a recurring schedule.
	CreateSchedule(ctx context.Context, opts CreateScheduleOptions) error
	// ListSchedules pages through schedules.
	ListSchedules(ctx context.Context, pageSize int32, nextPageToken []byte) (*ListSchedulesResponse, error)
	// DeleteSchedule removes a schedule by ID.
	DeleteSchedule(ctx context.Context, scheduleID string) error
	// PauseSchedule toggles a schedule's paused flag.
	PauseSchedule(ctx context.Context, scheduleID string, paused bool) error

	// Health reports server reachability as two flat strings.
	Health(ctx context.Context) (service, status string, err error)

	// CheckHealth is the structured counterpart to Health, matching
	// the upstream client shape so callers migrating from
	// go.temporal.io/sdk/client compile unchanged. A nil req is
	// treated as an empty request. Backed by the same opcode
	// 0x0090 as Health.
	CheckHealth(ctx context.Context, req *CheckHealthRequest) (*CheckHealthResponse, error)

	// Close releases the underlying ZAP connection.
	Close()
}

Client is the Hanzo Tasks workflow client. The surface mirrors the upstream client.Client one-for-one for the RPCs used by base/commerce/ta.

func Dial

func Dial(opts Options) (Client, error)

Dial connects to a Hanzo Tasks frontend. Callers must Close() when done.

type CreateScheduleOptions

type CreateScheduleOptions struct {
	ID string

	// Spec: use either Cron expressions OR a single Interval.
	CronExpressions []string
	Interval        time.Duration
	StartTime       time.Time
	EndTime         time.Time
	Jitter          time.Duration
	Timezone        string

	// Action: the workflow to start on each tick.
	WorkflowID   string
	WorkflowType string
	TaskQueue    string
	Input        []any

	Paused bool
}

CreateScheduleOptions configures CreateSchedule. Mirrors schema/tasks.zap Schedule / ScheduleSpec / ScheduleAction on the v1 JSON wire.

type ListNamespacesResponse

type ListNamespacesResponse struct {
	Namespaces    []Namespace `json:"namespaces"`
	NextPageToken []byte      `json:"next_page_token,omitempty"`
}

ListNamespacesResponse is returned by ListNamespaces.

type ListSchedulesResponse

type ListSchedulesResponse struct {
	Schedules     []Schedule `json:"schedules"`
	NextPageToken []byte     `json:"next_page_token,omitempty"`
}

ListSchedulesResponse is returned by ListSchedules.

type ListWorkflowsResponse

type ListWorkflowsResponse struct {
	Executions    []WorkflowExecutionInfo `json:"executions"`
	NextPageToken []byte                  `json:"next_page_token,omitempty"`
}

ListWorkflowsResponse is returned by ListWorkflows.

type Namespace

type Namespace struct {
	Info   NamespaceInfo   `json:"info"`
	Config NamespaceConfig `json:"config"`
}

Namespace mirrors schema/tasks.zap Namespace for the v1 JSON wire.

type NamespaceConfig

type NamespaceConfig struct {
	Retention time.Duration `json:"-"`
}

NamespaceConfig mirrors schema/tasks.zap NamespaceConfig. Retention is exposed as a Go Duration on the Go surface and marshalled to/from milliseconds on the wire (see retentionJSON below).

func (NamespaceConfig) MarshalJSON

func (n NamespaceConfig) MarshalJSON() ([]byte, error)

MarshalJSON implements custom encoding for the wire shape.

func (*NamespaceConfig) UnmarshalJSON

func (n *NamespaceConfig) UnmarshalJSON(b []byte) error

UnmarshalJSON implements custom decoding for the wire shape.

type NamespaceInfo

type NamespaceInfo struct {
	Name        string         `json:"name"`
	State       NamespaceState `json:"state"`
	Description string         `json:"description,omitempty"`
	OwnerEmail  string         `json:"owner_email,omitempty"`
	ID          string         `json:"id,omitempty"`
}

NamespaceInfo mirrors schema/tasks.zap NamespaceInfo.

type NamespaceState

type NamespaceState int8

NamespaceState mirrors the Int8 state enum.

const (
	NamespaceStateUnspecified NamespaceState = 0
	NamespaceStateRegistered  NamespaceState = 1
	NamespaceStateDeprecated  NamespaceState = 2
	NamespaceStateDeleted     NamespaceState = 3
)

type Options

type Options struct {
	// HostPort is the "host:port" address of the Hanzo Tasks frontend
	// exposing the ZAP transport (default port 9652).
	HostPort string

	// Namespace scopes every RPC issued by the returned Client. Empty
	// string is rewritten to "default" at Dial time.
	Namespace string

	// Identity is sent as the caller identity on long-poll / worker RPCs.
	// Optional; defaults to "hanzo-tasks-sdk".
	Identity string

	// DialTimeout bounds the initial ZAP connect. Zero means "no bound".
	DialTimeout time.Duration

	// CallTimeout bounds a single request/response round trip. Zero
	// means "no bound per call" — callers are expected to pass their
	// own ctx deadline.
	CallTimeout time.Duration

	// Transport overrides the default ZAP transport. Primarily for
	// testing — leave nil in production.
	Transport Transport
}

Options configures Dial. Fields mirror the upstream Temporal Options surface so callers can swap import paths without changing arguments.

type PollActivityTaskRequest

type PollActivityTaskRequest struct {
	Namespace     string
	TaskQueueName string
	TaskQueueKind int8
	Identity      string
}

PollActivityTaskRequest mirrors schema/tasks.zap:PollActivityTaskRequest.

type PollWorkflowTaskRequest

type PollWorkflowTaskRequest struct {
	Namespace     string
	TaskQueueName string
	TaskQueueKind int8
	Identity      string
	WorkerBuildID string
}

PollWorkflowTaskRequest mirrors schema/tasks.zap:PollWorkflowTaskRequest.

type RecordActivityTaskHeartbeatRequest

type RecordActivityTaskHeartbeatRequest struct {
	TaskToken []byte
	Details   []byte
}

RecordActivityTaskHeartbeatRequest carries opaque details bytes.

type RegisterNamespaceRequest

type RegisterNamespaceRequest struct {
	Name        string        `json:"name"`
	Description string        `json:"description,omitempty"`
	OwnerEmail  string        `json:"owner_email,omitempty"`
	Retention   time.Duration `json:"-"`
}

RegisterNamespaceRequest is the argument to RegisterNamespace. Mirrors schema/tasks.zap: server receives the full Namespace.

type RespondActivityTaskCompletedRequest

type RespondActivityTaskCompletedRequest struct {
	TaskToken []byte
	Result    []byte
}

RespondActivityTaskCompletedRequest carries a serialised result payload.

type RespondActivityTaskFailedRequest

type RespondActivityTaskFailedRequest struct {
	TaskToken []byte
	Failure   []byte
}

RespondActivityTaskFailedRequest carries a serialised failure.

type RespondWorkflowTaskCompletedRequest

type RespondWorkflowTaskCompletedRequest struct {
	TaskToken []byte
	Commands  []byte
}

RespondWorkflowTaskCompletedRequest carries a serialised command list.

type RetryPolicyJSON

type RetryPolicyJSON struct {
	InitialIntervalMs      int64    `json:"initial_interval_ms,omitempty"`
	BackoffCoefficient     float64  `json:"backoff_coefficient,omitempty"`
	MaximumIntervalMs      int64    `json:"maximum_interval_ms,omitempty"`
	MaximumAttempts        int32    `json:"maximum_attempts,omitempty"`
	NonRetryableErrorTypes []string `json:"non_retryable_error_types,omitempty"`
}

RetryPolicyJSON is the Go-side retry-policy shape passed to schedule RPCs. Milliseconds on the wire; zero means "SDK default".

type Schedule

type Schedule struct {
	ID     string         `json:"id"`
	Spec   ScheduleSpec   `json:"spec"`
	Action ScheduleAction `json:"action"`
	Paused bool           `json:"paused,omitempty"`
}

Schedule mirrors schema/tasks.zap Schedule for the v1 wire.

type ScheduleAction

type ScheduleAction struct {
	WorkflowID   string `json:"workflow_id,omitempty"`
	WorkflowType string `json:"workflow_type"`
	TaskQueue    string `json:"task_queue"`
	Input        []any  `json:"input,omitempty"`
}

ScheduleAction mirrors schema/tasks.zap ScheduleAction.

type ScheduleActivityRequest

type ScheduleActivityRequest struct {
	Namespace      string
	WorkflowID     string
	RunID          string
	TaskQueue      string
	ActivityType   string
	Input          []byte
	StartToCloseMs int64
	HeartbeatMs    int64
	RetryPolicy    *RetryPolicyJSON
}

ScheduleActivityRequest mirrors schema/tasks.zap:ScheduleActivityRequest.

type ScheduleActivityResponse

type ScheduleActivityResponse struct {
	ActivityTaskID string
	// TaskToken is the server-minted, HMAC-signed opaque token the
	// worker must present on RespondActivityTaskCompleted/Failed. Empty
	// on error, populated on success.
	TaskToken []byte
}

ScheduleActivityResponse mirrors schema/tasks.zap:ScheduleActivityResponse.

type ScheduleSpec

type ScheduleSpec struct {
	Cron      []string      `json:"-"`
	Interval  time.Duration `json:"-"`
	StartTime time.Time     `json:"-"`
	EndTime   time.Time     `json:"-"`
	Jitter    time.Duration `json:"-"`
	Timezone  string        `json:"-"`
}

ScheduleSpec mirrors schema/tasks.zap ScheduleSpec on the wire. Durations are exposed as Go time.Duration on the Go surface and as milliseconds on the wire.

func (ScheduleSpec) MarshalJSON

func (s ScheduleSpec) MarshalJSON() ([]byte, error)

MarshalJSON implements the ms-on-the-wire contract.

func (*ScheduleSpec) UnmarshalJSON

func (s *ScheduleSpec) UnmarshalJSON(b []byte) error

UnmarshalJSON implements the ms-on-the-wire contract.

type StartChildWorkflowRequest

type StartChildWorkflowRequest struct {
	Namespace    string
	ParentID     string
	ParentRunID  string
	WorkflowID   string
	WorkflowType string
	TaskQueue    string
	Input        []any
	RetryPolicy  *RetryPolicyJSON
	TimeoutsMs   TimeoutsJSON
}

StartChildWorkflowRequest mirrors schema/tasks.zap StartChildWorkflowRequest.

type StartChildWorkflowResponse

type StartChildWorkflowResponse struct {
	RunID string
}

StartChildWorkflowResponse mirrors schema/tasks.zap.

type StartWorkflowOptions

type StartWorkflowOptions struct {
	// ID is the workflow ID. Empty string asks the server to generate
	// one (returned as WorkflowRun.GetID()).
	ID string

	// TaskQueue is required; workers poll this queue for tasks.
	TaskQueue string

	// WorkflowExecutionTimeout is the total allowed wall-clock time
	// across all runs (Continue-As-New chains).
	WorkflowExecutionTimeout time.Duration

	// WorkflowRunTimeout is the allowed wall-clock time for a single
	// run.
	WorkflowRunTimeout time.Duration

	// WorkflowTaskTimeout bounds a single workflow-task attempt on a
	// worker. Default on server is 10s.
	WorkflowTaskTimeout time.Duration

	// RetryPolicy overrides the workflow-level retry policy (distinct
	// from per-activity retry).
	RetryPolicy *temporal.RetryPolicy

	// CronSchedule makes the workflow recurring. Empty string runs once.
	CronSchedule string

	// Memo is opaque metadata attached to the execution.
	Memo map[string]any

	// SearchAttributes are indexed by the visibility store.
	SearchAttributes map[string]any
}

StartWorkflowOptions mirrors the upstream StartWorkflowOptions struct for ExecuteWorkflow. Only the fields currently mapped onto the wire (schema/tasks.zap StartWorkflowRequest + Timeouts + RetryPolicy) are meaningful; unlisted fields are ignored by the v1 wire.

type TimeoutsJSON

type TimeoutsJSON struct {
	WorkflowExecutionMs int64 `json:"workflow_execution_ms,omitempty"`
	WorkflowRunMs       int64 `json:"workflow_run_ms,omitempty"`
	WorkflowTaskMs      int64 `json:"workflow_task_ms,omitempty"`
}

TimeoutsJSON is the timeout triple in ms.

type Transport

type Transport interface {
	// Call issues a single request for opcode and returns the
	// response frame.
	Call(ctx context.Context, opcode uint16, body []byte) (respFrame []byte, err error)

	// Close releases transport resources.
	Close() error
}

Transport is the request/response abstraction used by Client for user-facing RPCs (workflow, schedule, namespace, health) and by WorkerTransport for worker poll/respond RPCs. Production code uses the default transport (luxfi/zap). Tests inject an in-memory fake.

Call is frame-in / frame-out:

  • body is the caller's request payload bytes. For user-facing RPCs the body is a plain JSON document that the client wraps in the ZAP envelope declared by envelopeBody/envelopeStatus/ envelopeError before invoking the transport. For worker RPCs the body is a self-framed ZAP object emitted by the worker transport's encoders.
  • opcode is stamped on the ZAP flags of the outgoing frame by the default transport. A caller that already builds a complete ZAP frame (like the worker) can pass the body bytes verbatim — the default transport wraps them when they are not yet framed and forwards them when they are.
  • The returned []byte is a complete ZAP response frame. Higher levels (clientImpl.roundTrip for user-facing RPCs, the worker transport decoders for worker RPCs) parse the frame with zap.Parse and read the fields they care about.

func TransportOf

func TransportOf(c Client) Transport

TransportOf extracts the underlying Transport from a Client returned by Dial. pkg/sdk/worker calls this to layer a typed WorkerTransport over the generic Transport the Client already owns — workers share the ZAP connection and long-poll channel with the Client so users pay for one node per process.

Callers that injected a custom Transport via Options.Transport receive the same value they passed in.

Returns nil if c was not produced by Dial (future Client implementations outside this package simply opt out of worker sharing).

type WaitActivityResultRequest

type WaitActivityResultRequest struct {
	ActivityTaskID string
	WaitMs         int64
}

WaitActivityResultRequest mirrors schema/tasks.zap:WaitActivityResultRequest.

type WaitActivityResultResponse

type WaitActivityResultResponse struct {
	Ready   bool
	Result  []byte
	Failure []byte
}

WaitActivityResultResponse mirrors schema/tasks.zap:WaitActivityResultResponse.

type WorkerTransport

type WorkerTransport interface {
	// Close tears down the underlying node. Safe to call more than
	// once.
	Close() error

	// PollWorkflowTask issues a long-poll for the next workflow
	// task on the given queue. The ctx carries the long-poll
	// deadline; returning a nil task and nil error signals "idle,
	// try again".
	PollWorkflowTask(ctx context.Context, req PollWorkflowTaskRequest) (*WorkflowTask, error)

	// PollActivityTask is the activity counterpart of
	// PollWorkflowTask. Same idle semantics.
	PollActivityTask(ctx context.Context, req PollActivityTaskRequest) (*ActivityTask, error)

	// RespondWorkflowTaskCompleted uploads the worker's commands
	// (encoded command list) for a workflow task.
	RespondWorkflowTaskCompleted(ctx context.Context, req RespondWorkflowTaskCompletedRequest) error

	// RespondActivityTaskCompleted reports a successful activity
	// result.
	RespondActivityTaskCompleted(ctx context.Context, req RespondActivityTaskCompletedRequest) error

	// RespondActivityTaskFailed reports an activity failure.
	RespondActivityTaskFailed(ctx context.Context, req RespondActivityTaskFailedRequest) error

	// RecordActivityTaskHeartbeat signals liveness for a long-running
	// activity. Returns cancelRequested=true if the server wants
	// the activity to stop.
	RecordActivityTaskHeartbeat(ctx context.Context, req RecordActivityTaskHeartbeatRequest) (bool, error)

	// ScheduleActivity asks the frontend to mint an activity task for
	// the given type + input and return a stable id (schema/tasks.zap
	// opcode 0x006B). The workerEnv in pkg/sdk/worker uses this from
	// inside a workflow to dispatch activities without racing the
	// worker's own pollActivityTask loop.
	ScheduleActivity(ctx context.Context, req ScheduleActivityRequest) (*ScheduleActivityResponse, error)

	// WaitActivityResult long-polls for the result of an activity id
	// previously returned by ScheduleActivity (opcode 0x006C). A
	// response with ready=false means "still pending" — the caller
	// re-issues. The ctx deadline bounds a single round-trip.
	WaitActivityResult(ctx context.Context, req WaitActivityResultRequest) (*WaitActivityResultResponse, error)

	// StartChildWorkflow asks the server to start a workflow whose
	// parent is recorded for linkage (opcode 0x006D). Returns the
	// run id; Phase-1 does not wait for child completion — the
	// caller uses DescribeWorkflow or a follow-up
	// WaitChildWorkflowResult when Phase-2 lands.
	StartChildWorkflow(ctx context.Context, req StartChildWorkflowRequest) (*StartChildWorkflowResponse, error)
}

WorkerTransport is the wire abstraction the Worker package depends on. One method per logical RPC that the worker issues against the Hanzo Tasks frontend. Production implementations are backed by luxfi/zap; tests inject an in-memory fake satisfying this same interface.

Return values are pre-parsed native Go types — callers never own a buffer whose backing bytes belong to the transport layer.

func NewWorkerTransport

func NewWorkerTransport(t Transport) WorkerTransport

NewWorkerTransport returns a typed wrapper over the generic Transport that issues the worker poll / respond RPCs defined in schema/tasks.zap.

The wire layout (object fields) follows the constants declared in transport.go. v1 encodes the body portions as JSON; native ZAP serde replaces JSON in a follow-up without changing opcodes.

type WorkflowExecutionInfo

type WorkflowExecutionInfo struct {
	WorkflowID    string         `json:"workflow_id"`
	RunID         string         `json:"run_id"`
	WorkflowType  string         `json:"workflow_type"`
	StartTime     time.Time      `json:"start_time,omitempty"`
	CloseTime     time.Time      `json:"close_time,omitempty"`
	Status        WorkflowStatus `json:"status"`
	HistoryLength int64          `json:"history_length"`
	TaskQueue     string         `json:"task_queue,omitempty"`
	Memo          map[string]any `json:"memo,omitempty"`
}

WorkflowExecutionInfo mirrors schema/tasks.zap WorkflowExecutionInfo for the v1 JSON wire. Exported so callers of DescribeWorkflow / ListWorkflows can inspect the result without pulling upstream types.

type WorkflowRun

type WorkflowRun interface {
	// Get blocks until the workflow terminates and decodes the
	// result into valuePtr. When the workflow continues-as-new the
	// default behaviour follows the chain to the final run.
	Get(ctx context.Context, valuePtr any) error

	// GetWithOptions is the option-parameterised form of Get. When
	// opts.DisableFollowingRuns is true, Get stops at the current
	// run rather than following a continue-as-new chain to the final
	// run.
	GetWithOptions(ctx context.Context, valuePtr any, opts WorkflowRunGetOptions) error

	GetID() string
	GetRunID() string
}

WorkflowRun is the handle returned by ExecuteWorkflow / SignalWithStartWorkflow / GetWorkflow. Get blocks until the workflow terminates and decodes the result into `valuePtr`.

type WorkflowRunGetOptions

type WorkflowRunGetOptions struct {
	// DisableFollowingRuns, when true, prevents Get from walking
	// the continue-as-new chain. It returns when the current run
	// terminates even if a successor run was started.
	DisableFollowingRuns bool
}

WorkflowRunGetOptions tunes WorkflowRun.GetWithOptions. Mirrors the upstream shape so caller code compiles unchanged after the import swap.

type WorkflowStatus

type WorkflowStatus int8

WorkflowStatus mirrors the `status` Int8 in schema/tasks.zap.

const (
	WorkflowStatusUnspecified    WorkflowStatus = 0
	WorkflowStatusRunning        WorkflowStatus = 1
	WorkflowStatusCompleted      WorkflowStatus = 2
	WorkflowStatusFailed         WorkflowStatus = 3
	WorkflowStatusCanceled       WorkflowStatus = 4
	WorkflowStatusTerminated     WorkflowStatus = 5
	WorkflowStatusContinuedAsNew WorkflowStatus = 6
	WorkflowStatusTimedOut       WorkflowStatus = 7
)

type WorkflowTask

type WorkflowTask struct {
	TaskToken        []byte
	WorkflowID       string
	RunID            string
	WorkflowTypeName string
	History          []byte
	NextPageToken    []byte
}

WorkflowTask mirrors schema/tasks.zap:WorkflowTask.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL