job

package
v2.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package job provides a type-focused abstraction for one registered Temporal workflow. A Definition holds the metadata, lifecycle hooks, and per-job operations for a single workflow: register on a worker, execute by name, attach a schedule, describe runs, control lifecycle.

The package coexists with pkg/temporal's existing managers (WorkflowManager, ScheduleManager). Use a Definition when you have a typed handle to "your" workflow; use the namespace-wide managers when you need to inspect workflows you didn't register yourself.

Typical use:

def, err := job.New("orders-sync", "sync-orders",
    job.WithRegister(func(w worker.Worker) { /* ... */ }),
    job.WithExecute(func(ctx, c, opts, in) (client.WorkflowRun, error) { /* ... */ }),
    job.WithNewInput(func() any { return &OrdersInput{} }),
    job.WithSchedule(&job.ScheduleSpec{Interval: time.Hour}),
)
def.Register(worker)
run, _ := def.Execute(ctx, c, &OrdersInput{...})
detail, _ := def.Describe(ctx, c, run.WorkflowID, run.RunID)

Index

Constants

This section is empty.

Variables

View Source
var (
	// Lookup
	ErrNotFound          = errors.New("job: not found")
	ErrDuplicateName     = errors.New("job: duplicate name")
	ErrInvalidDefinition = errors.New("job: invalid definition")

	// Lifecycle
	ErrAlreadyClosed    = errors.New("job: workflow already closed")
	ErrNoSchedule       = errors.New("job: no schedule configured")
	ErrScheduleNotFound = errors.New("job: schedule not found")

	// Wiring
	ErrNotRegistered = errors.New("job: register not configured")
)

Functions

func RegisterActivityOnce

func RegisterActivityOnce(w worker.Worker, typeName string, fn any, opts activity.RegisterOptions)

RegisterActivityOnce registers an activity on a worker idempotently. Activity name comes from opts.Name; pass empty Name only for typed-function activities (rare in this codebase).

func RegisterWorkflowOnce

func RegisterWorkflowOnce(w worker.Worker, typeName string, wf any, opts workflow.RegisterOptions)

RegisterWorkflowOnce registers a workflow on a worker, returning silently if the (worker, typeName) pair has already been registered. Used by builder packages to make their RegisterAll-style helpers idempotent.

Types

type ActivityEvent

type ActivityEvent struct {
	Name      string
	Status    ActivityStatus
	Attempt   int32
	StartTime time.Time
	CloseTime time.Time
	Duration  time.Duration
	Input     []byte // raw payload; caller deserializes
	Result    []byte // raw payload; nil on failure
	Error     string // empty on success
}

ActivityEvent describes one activity attempt within a workflow run.

type ActivityStatus

type ActivityStatus int

ActivityStatus mirrors Temporal's per-activity outcome.

const (
	ActivityScheduled ActivityStatus = iota
	ActivityStarted
	ActivityCompleted
	ActivityFailed
	ActivityTimedOut
	ActivityCanceled
)

func (ActivityStatus) String

func (s ActivityStatus) String() string

type CalendarSpec

type CalendarSpec struct {
	Second     []ScheduleRange
	Minute     []ScheduleRange
	Hour       []ScheduleRange
	DayOfMonth []ScheduleRange
	Month      []ScheduleRange
	Year       []ScheduleRange
	DayOfWeek  []ScheduleRange
	Comment    string
}

CalendarSpec mirrors Temporal's ScheduleCalendarSpec.

type Definition

type Definition struct {
	Name        string
	TaskQueue   string
	Description string
	Tags        []string
	Schedule    *ScheduleSpec
	// contains filtered or unexported fields
}

Definition is a type-focused description of one registered Temporal workflow. All per-job operations hang off the type as methods.

func New

func New(name, taskQueue string, opts ...Option) (*Definition, error)

New constructs a Definition. Validates name, task queue, all closures, and the optional schedule. Returns ErrInvalidDefinition if anything is missing or inconsistent.

func (*Definition) ApplySchedule

func (d *Definition) ApplySchedule(ctx context.Context, c client.Client) error

ApplySchedule creates or updates the Temporal schedule for this Definition. Schedule ID equals Definition.Name. If a schedule with that ID already exists, it is updated to match the current ScheduleSpec.

func (*Definition) Cancel

func (d *Definition) Cancel(ctx context.Context, c client.Client, wfID, runID string) error

Cancel requests cancellation of a workflow run.

func (*Definition) DeleteSchedule

func (d *Definition) DeleteSchedule(ctx context.Context, c client.Client) error

DeleteSchedule removes the schedule from Temporal.

func (*Definition) Describe

func (d *Definition) Describe(ctx context.Context, c client.Client, wfID, runID string) (RunDetail, error)

Describe returns the current state of one workflow run. runID "" = latest.

func (*Definition) DescribeSchedule

func (d *Definition) DescribeSchedule(ctx context.Context, c client.Client) (ScheduleDetail, error)

DescribeSchedule returns the current schedule state.

func (*Definition) Execute

func (d *Definition) Execute(ctx context.Context, c client.Client, input any, opts ...ExecuteOption) (RunHandle, error)

Execute starts a workflow run. The workflow ID defaults to "<Name>-<uuid>" unless overridden via WithWorkflowID(...).

func (*Definition) GetRun

func (d *Definition) GetRun(c client.Client, wfID, runID string) RunHandle

GetRun returns a RunHandle for an existing workflow run identified by wfID and runID (runID "" = latest). Useful when reattaching to a run triggered elsewhere.

func (*Definition) History

func (d *Definition) History(ctx context.Context, c client.Client, wfID, runID string, opts HistoryOpts) (RunHistory, error)

History returns the activity-event extraction of one run's history.

func (*Definition) ListRuns

func (d *Definition) ListRuns(ctx context.Context, c client.Client, opts ListOpts) (RunPage, error)

ListRuns lists workflow executions of this Definition, scoped by "WorkflowId STARTS_WITH '<Name>-'".

func (*Definition) NewInput

func (d *Definition) NewInput() any

NewInput returns a fresh typed zero value for this Definition's workflow input. Callers fill it before calling Execute (e.g., via json.Unmarshal).

func (*Definition) PauseSchedule

func (d *Definition) PauseSchedule(ctx context.Context, c client.Client, note string) error

PauseSchedule pauses an existing schedule.

func (*Definition) Query

func (d *Definition) Query(ctx context.Context, c client.Client, wfID, runID, queryType string, args ...any) (any, error)

Query invokes a synchronous query against a workflow run.

func (*Definition) Register

func (d *Definition) Register(w worker.Worker)

Register wires the workflow and its activities onto a worker. Safe to call concurrently and multiple times — the builder-supplied register closure is expected to use RegisterWorkflowOnce / RegisterActivityOnce for idempotency when the underlying workflow type may be shared across Definitions.

func (*Definition) ResumeSchedule

func (d *Definition) ResumeSchedule(ctx context.Context, c client.Client, note string) error

ResumeSchedule unpauses an existing schedule.

func (*Definition) Signal

func (d *Definition) Signal(ctx context.Context, c client.Client, wfID, runID, signalName string, payload any) error

Signal sends a signal to a workflow run.

func (*Definition) Stats

Stats returns running/completed-today/failed-today counts scoped to this Definition's workflow IDs.

func (*Definition) Terminate

func (d *Definition) Terminate(ctx context.Context, c client.Client, wfID, runID, reason string) error

Terminate hard-stops a workflow run.

func (*Definition) TriggerSchedule

func (d *Definition) TriggerSchedule(ctx context.Context, c client.Client) error

TriggerSchedule fires an immediate run of the schedule's action.

type DefinitionStats

type DefinitionStats struct {
	Running        int64
	CompletedToday int64
	FailedToday    int64
	AsOf           time.Time
}

DefinitionStats is per-Definition aggregate counters.

type ExecuteOption

type ExecuteOption func(*executeConfig)

ExecuteOption customizes a single Definition.Execute call.

func WithMemo

func WithMemo(m map[string]any) ExecuteOption

WithMemo attaches a memo to the workflow execution.

func WithRetryPolicy

func WithRetryPolicy(p *temporal.RetryPolicy) ExecuteOption

WithRetryPolicy sets the workflow-level retry policy.

func WithTaskTimeout

func WithTaskTimeout(d time.Duration) ExecuteOption

WithTaskTimeout sets WorkflowTaskTimeout.

func WithTimeout

func WithTimeout(d time.Duration) ExecuteOption

WithTimeout sets WorkflowExecutionTimeout.

func WithWorkflowID

func WithWorkflowID(id string) ExecuteOption

WithWorkflowID overrides the default ID of "<Name>-<uuid>".

type HistoryOpts

type HistoryOpts struct {
	MaxEvents int // default 500 in the method; 0 = no cap (caller takes responsibility)
}

HistoryOpts configures Definition.History.

type ListOpts

type ListOpts struct {
	Status    []Status   // empty = any
	TimeRange *TimeRange // by StartTime
	PageSize  int        // default 100, max 1000
	PageToken []byte
}

ListOpts configures Definition.ListRuns.

type Option

type Option func(*Definition)

Option configures a Definition during construction.

func WithDescription

func WithDescription(desc string) Option

WithDescription attaches a human-readable description.

func WithExecute

func WithExecute(fn func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error)) Option

WithExecute sets the workflow-execution closure. The closure receives a pre-built client.StartWorkflowOptions (ID + TaskQueue + caller overrides) and the typed input value.

func WithNewInput

func WithNewInput(fn func() any) Option

WithNewInput sets the factory that returns a typed zero value of the workflow input. Callers fill the value before calling Execute.

func WithRegister

func WithRegister(fn func(worker.Worker)) Option

WithRegister sets the worker-registration closure.

func WithSchedule

func WithSchedule(spec *ScheduleSpec) Option

WithSchedule attaches an optional schedule specification.

func WithTags

func WithTags(tags ...string) Option

WithTags attaches user-defined tags.

type OverlapPolicy

type OverlapPolicy int

OverlapPolicy controls how the scheduler handles a new trigger when a previous run is still in flight. Values mirror Temporal's ScheduleOverlapPolicy.

const (
	OverlapSkip           OverlapPolicy = iota // default — drop new trigger if previous still running
	OverlapBufferOne                           // queue one trigger; drop further
	OverlapBufferAll                           // queue all triggers
	OverlapCancelOther                         // cancel running, start new
	OverlapTerminateOther                      // terminate running, start new
	OverlapAllowAll                            // start in parallel
)

func (OverlapPolicy) ToSDK

ToSDK converts the OverlapPolicy to the equivalent Temporal SDK enum value.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps logical job names to Definitions. Construction does not validate seed Definitions twice — they were validated by New already.

func NewRegistry

func NewRegistry(defs ...*Definition) *Registry

NewRegistry creates a Registry, optionally seeded. Duplicates among seeds silently use the later value (validate input upstream if that matters).

func (*Registry) Add

func (r *Registry) Add(d *Definition) error

Add inserts a Definition. Returns ErrDuplicateName on conflict, ErrInvalidDefinition if the Definition is nil or missing required fields.

func (*Registry) ApplySchedules

func (r *Registry) ApplySchedules(ctx context.Context, c client.Client) error

ApplySchedules creates or updates Temporal schedules for every Definition that has one. Returns the first error encountered (does not roll back).

func (*Registry) Get

func (r *Registry) Get(name string) (*Definition, bool)

Get returns the Definition with the given name and a boolean indicating whether it was found.

func (*Registry) List

func (r *Registry) List() []*Definition

List returns all Definitions, sorted by Name.

func (*Registry) MustGet

func (r *Registry) MustGet(name string) *Definition

MustGet returns the Definition with the given name. Panics with fmt.Errorf("%w: %s", ErrNotFound, name) if absent.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns all registered names, sorted alphabetically.

func (*Registry) RegisterAll

func (r *Registry) RegisterAll(w worker.Worker)

RegisterAll registers every Definition on the given worker. Idempotent: underlying workflow/activity types are deduplicated via RegisterWorkflowOnce / RegisterActivityOnce in builder closures.

type RunDetail

type RunDetail struct {
	WorkflowID       string
	RunID            string
	Type             string
	TaskQueue        string
	Status           Status
	StartTime        time.Time
	CloseTime        *time.Time // nil if still running
	ExecutionTime    time.Duration
	HistoryLength    int64
	Memo             map[string]any
	SearchAttributes map[string]any
}

RunDetail is the full description of one workflow run.

type RunHandle

type RunHandle struct {
	WorkflowID string
	RunID      string
	// contains filtered or unexported fields
}

RunHandle is a lightweight handle to one workflow run. Returned by Definition.Execute and Definition.GetRun.

func (RunHandle) Get

func (h RunHandle) Get(ctx context.Context, valuePtr any) error

Get blocks until the workflow completes and unmarshals its result into valuePtr (must be a non-nil pointer). Returns the workflow's error if it failed. Returns nil if the handle has no underlying run (e.g., constructed from GetRun on an unknown ID).

type RunHistory

type RunHistory struct {
	WorkflowID string
	RunID      string
	Activities []ActivityEvent
	Truncated  bool
}

RunHistory is the activity-event extraction of one run's history, bounded by HistoryOpts.MaxEvents.

type RunPage

type RunPage struct {
	Runs          []RunSummary
	NextPageToken []byte
}

RunPage is one page of ListRuns results.

type RunSummary

type RunSummary struct {
	WorkflowID string
	RunID      string
	Type       string
	Status     Status
	StartTime  time.Time
	CloseTime  *time.Time
	TaskQueue  string
}

RunSummary is one row in a list of runs.

type ScheduleDetail

type ScheduleDetail struct {
	ScheduleSummary
	Spec       ScheduleSpec
	RecentRuns []RunSummary
}

ScheduleDetail is the full schedule description.

type ScheduleListOpts

type ScheduleListOpts struct {
	PageSize  int
	PageToken []byte
}

ScheduleListOpts configures Registry.ListSchedules (future) and individual schedule paging.

type ScheduleRange

type ScheduleRange struct {
	Start int
	End   int
	Step  int
}

ScheduleRange mirrors Temporal's ScheduleRange (Start/End/Step int).

type ScheduleSpec

type ScheduleSpec struct {
	Interval time.Duration
	Cron     string
	Calendar []CalendarSpec

	Overlap       OverlapPolicy
	Jitter        time.Duration
	Paused        bool
	Note          string
	CatchupWindow time.Duration
}

ScheduleSpec describes when a Definition's workflow should fire automatically. Exactly one of Interval / Cron / Calendar must be set.

type ScheduleSummary

type ScheduleSummary struct {
	ID           string
	WorkflowType string
	Paused       bool
	NextRunTime  *time.Time
	LastRunTime  *time.Time
	Note         string
}

ScheduleSummary is a lightweight schedule summary.

type StatsOpts

type StatsOpts struct {
	TodayOnly bool           // default false — set true for "running + closed today"
	Location  *time.Location // if nil and TodayOnly: UTC; otherwise this zone's calendar day
}

StatsOpts configures Definition.Stats.

type Status

type Status int

Status represents a workflow execution status, mirrored from Temporal's WorkflowExecutionStatus enum for use without leaking SDK enum types.

const (
	StatusUnknown Status = iota
	StatusRunning
	StatusCompleted
	StatusFailed
	StatusCanceled
	StatusTerminated
	StatusContinuedAsNew
	StatusTimedOut
)

func StatusFromSDK

func StatusFromSDK(s enumspb.WorkflowExecutionStatus) Status

StatusFromSDK maps a Temporal SDK WorkflowExecutionStatus to a job.Status.

func (Status) IsTerminal

func (s Status) IsTerminal() bool

IsTerminal reports whether the status represents a closed (finished) workflow.

func (Status) String

func (s Status) String() string

String returns the lowercase snake_case name of the status.

type TaskQueueInfo

type TaskQueueInfo struct {
	Name        string
	WorkerCount int
}

TaskQueueInfo describes a task queue's pollers and reachability.

type TimeRange

type TimeRange struct {
	Start time.Time
	End   time.Time
}

TimeRange filters by a start-time inclusive range.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL