Documentation
¶
Overview ¶
Package job provides a type-focused abstraction for one registered Temporal workflow. A Definition holds the metadata, lifecycle hooks, and per-job operations for a single workflow: register on a worker, execute by name, attach a schedule, describe runs, control lifecycle.
The package coexists with pkg/temporal's existing managers (WorkflowManager, ScheduleManager). Use a Definition when you have a typed handle to "your" workflow; use the namespace-wide managers when you need to inspect workflows you didn't register yourself.
Typical use:
def, err := job.New("orders-sync", "sync-orders",
job.WithRegister(func(w worker.Worker) { /* ... */ }),
job.WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { /* ... */ }),
job.WithNewInput(func() any { return &OrdersInput{} }),
job.WithSchedule(&job.ScheduleSpec{Interval: time.Hour}),
)
def.Register(worker)
run, _ := def.Execute(ctx, c, &OrdersInput{...})
detail, _ := def.Describe(ctx, c, run.WorkflowID, run.RunID)
Index ¶
- Variables
- func RegisterActivityOnce(w worker.Worker, typeName string, fn any, opts activity.RegisterOptions)
- func RegisterWorkflowOnce(w worker.Worker, typeName string, wf any, opts workflow.RegisterOptions)
- type ActivityEvent
- type ActivityStatus
- type CalendarSpec
- type Definition
- func (d *Definition) ApplySchedule(ctx context.Context, c client.Client) error
- func (d *Definition) Cancel(ctx context.Context, c client.Client, wfID, runID string) error
- func (d *Definition) DeleteSchedule(ctx context.Context, c client.Client) error
- func (d *Definition) Describe(ctx context.Context, c client.Client, wfID, runID string) (RunDetail, error)
- func (d *Definition) DescribeSchedule(ctx context.Context, c client.Client) (ScheduleDetail, error)
- func (d *Definition) Execute(ctx context.Context, c client.Client, input any, opts ...ExecuteOption) (RunHandle, error)
- func (d *Definition) GetRun(c client.Client, wfID, runID string) RunHandle
- func (d *Definition) History(ctx context.Context, c client.Client, wfID, runID string, opts HistoryOpts) (RunHistory, error)
- func (d *Definition) ListRuns(ctx context.Context, c client.Client, opts ListOpts) (RunPage, error)
- func (d *Definition) NewInput() any
- func (d *Definition) PauseSchedule(ctx context.Context, c client.Client, note string) error
- func (d *Definition) Query(ctx context.Context, c client.Client, wfID, runID, queryType string, ...) (any, error)
- func (d *Definition) Register(w worker.Worker)
- func (d *Definition) ResumeSchedule(ctx context.Context, c client.Client, note string) error
- func (d *Definition) Signal(ctx context.Context, c client.Client, wfID, runID, signalName string, ...) error
- func (d *Definition) Stats(ctx context.Context, c client.Client, opts StatsOpts) (DefinitionStats, error)
- func (d *Definition) Terminate(ctx context.Context, c client.Client, wfID, runID, reason string) error
- func (d *Definition) TriggerSchedule(ctx context.Context, c client.Client) error
- type DefinitionStats
- type ExecuteOption
- type HistoryOpts
- type ListOpts
- type Option
- type OverlapPolicy
- type Registry
- func (r *Registry) Add(d *Definition) error
- func (r *Registry) ApplySchedules(ctx context.Context, c client.Client) error
- func (r *Registry) Get(name string) (*Definition, bool)
- func (r *Registry) List() []*Definition
- func (r *Registry) MustGet(name string) *Definition
- func (r *Registry) Names() []string
- func (r *Registry) RegisterAll(w worker.Worker)
- type RunDetail
- type RunHandle
- type RunHistory
- type RunPage
- type RunSummary
- type ScheduleDetail
- type ScheduleListOpts
- type ScheduleRange
- type ScheduleSpec
- type ScheduleSummary
- type StatsOpts
- type Status
- type TaskQueueInfo
- type TimeRange
Constants ¶
This section is empty.
Variables ¶
var (
	// Lookup
	ErrNotFound          = errors.New("job: not found")
	ErrDuplicateName     = errors.New("job: duplicate name")
	ErrInvalidDefinition = errors.New("job: invalid definition")

	// Lifecycle
	ErrAlreadyClosed    = errors.New("job: workflow already closed")
	ErrNoSchedule       = errors.New("job: no schedule configured")
	ErrScheduleNotFound = errors.New("job: schedule not found")

	// Wiring
	ErrNotRegistered = errors.New("job: register not configured")
)
Functions ¶
func RegisterActivityOnce ¶
RegisterActivityOnce registers an activity on a worker idempotently. Activity name comes from opts.Name; pass empty Name only for typed-function activities (rare in this codebase).
func RegisterWorkflowOnce ¶
RegisterWorkflowOnce registers a workflow on a worker, returning silently if the (worker, typeName) pair has already been registered. Used by builder packages to make their RegisterAll-style helpers idempotent.
Types ¶
type ActivityEvent ¶
type ActivityEvent struct {
Name string
Status ActivityStatus
Attempt int32
StartTime time.Time
CloseTime time.Time
Duration time.Duration
Input []byte // raw payload; caller deserializes
Result []byte // raw payload; nil on failure
Error string // empty on success
}
ActivityEvent describes one activity attempt within a workflow run.
type ActivityStatus ¶
type ActivityStatus int
ActivityStatus mirrors Temporal's per-activity outcome.
const (
	ActivityScheduled ActivityStatus = iota
	ActivityStarted
	ActivityCompleted
	ActivityFailed
	ActivityTimedOut
	ActivityCanceled
)
func (ActivityStatus) String ¶
func (s ActivityStatus) String() string
type CalendarSpec ¶
type CalendarSpec struct {
Second []ScheduleRange
Minute []ScheduleRange
Hour []ScheduleRange
DayOfMonth []ScheduleRange
Month []ScheduleRange
Year []ScheduleRange
DayOfWeek []ScheduleRange
Comment string
}
CalendarSpec mirrors Temporal's ScheduleCalendarSpec.
type Definition ¶
type Definition struct {
Name string
TaskQueue string
Description string
Tags []string
Schedule *ScheduleSpec
// contains filtered or unexported fields
}
Definition is a type-focused description of one registered Temporal workflow. All per-job operations hang off the type as methods.
func New ¶
func New(name, taskQueue string, opts ...Option) (*Definition, error)
New constructs a Definition. Validates name, task queue, all closures, and the optional schedule. Returns ErrInvalidDefinition if anything is missing or inconsistent.
func (*Definition) ApplySchedule ¶
ApplySchedule creates or updates the Temporal schedule for this Definition. Schedule ID equals Definition.Name. If a schedule with that ID already exists, it is updated to match the current ScheduleSpec.
func (*Definition) DeleteSchedule ¶
DeleteSchedule removes the schedule from Temporal.
func (*Definition) Describe ¶
func (d *Definition) Describe(ctx context.Context, c client.Client, wfID, runID string) (RunDetail, error)
Describe returns the current state of one workflow run. Pass an empty runID ("") to describe the latest run.
func (*Definition) DescribeSchedule ¶
func (d *Definition) DescribeSchedule(ctx context.Context, c client.Client) (ScheduleDetail, error)
DescribeSchedule returns the current schedule state.
func (*Definition) Execute ¶
func (d *Definition) Execute(ctx context.Context, c client.Client, input any, opts ...ExecuteOption) (RunHandle, error)
Execute starts a workflow run. The workflow ID defaults to "<Name>-<uuid>" unless overridden via WithWorkflowID(...).
func (*Definition) GetRun ¶
func (d *Definition) GetRun(c client.Client, wfID, runID string) RunHandle
GetRun returns a RunHandle for an existing workflow run identified by wfID and runID (runID "" = latest). Useful when reattaching to a run triggered elsewhere.
func (*Definition) History ¶
func (d *Definition) History(ctx context.Context, c client.Client, wfID, runID string, opts HistoryOpts) (RunHistory, error)
History returns the activity-event extraction of one run's history.
func (*Definition) ListRuns ¶
ListRuns lists workflow executions of this Definition, scoped by "WorkflowId STARTS_WITH '<Name>-'".
func (*Definition) NewInput ¶
func (d *Definition) NewInput() any
NewInput returns a fresh typed zero value for this Definition's workflow input. Callers fill it before calling Execute (e.g., via json.Unmarshal).
func (*Definition) PauseSchedule ¶
PauseSchedule pauses an existing schedule.
func (*Definition) Query ¶
func (d *Definition) Query(ctx context.Context, c client.Client, wfID, runID, queryType string, args ...any) (any, error)
Query invokes a synchronous query against a workflow run.
func (*Definition) Register ¶
func (d *Definition) Register(w worker.Worker)
Register wires the workflow and its activities onto a worker. Safe to call concurrently and multiple times — the builder-supplied register closure is expected to use RegisterWorkflowOnce / RegisterActivityOnce for idempotency when the underlying workflow type may be shared across Definitions.
func (*Definition) ResumeSchedule ¶
ResumeSchedule unpauses an existing schedule.
func (*Definition) Signal ¶
func (d *Definition) Signal(ctx context.Context, c client.Client, wfID, runID, signalName string, payload any) error
Signal sends a signal to a workflow run.
func (*Definition) Stats ¶
func (d *Definition) Stats(ctx context.Context, c client.Client, opts StatsOpts) (DefinitionStats, error)
Stats returns running/completed-today/failed-today counts scoped to this Definition's workflow IDs.
func (*Definition) Terminate ¶
func (d *Definition) Terminate(ctx context.Context, c client.Client, wfID, runID, reason string) error
Terminate hard-stops a workflow run.
func (*Definition) TriggerSchedule ¶
TriggerSchedule fires an immediate run of the schedule's action.
type DefinitionStats ¶
DefinitionStats is per-Definition aggregate counters.
type ExecuteOption ¶
type ExecuteOption func(*executeConfig)
ExecuteOption customizes a single Definition.Execute call.
func WithMemo ¶
func WithMemo(m map[string]any) ExecuteOption
WithMemo attaches a memo to the workflow execution.
func WithRetryPolicy ¶
func WithRetryPolicy(p *temporal.RetryPolicy) ExecuteOption
WithRetryPolicy sets the workflow-level retry policy.
func WithTaskTimeout ¶
func WithTaskTimeout(d time.Duration) ExecuteOption
WithTaskTimeout sets WorkflowTaskTimeout.
func WithTimeout ¶
func WithTimeout(d time.Duration) ExecuteOption
WithTimeout sets WorkflowExecutionTimeout.
func WithWorkflowID ¶
func WithWorkflowID(id string) ExecuteOption
WithWorkflowID overrides the default ID of "<Name>-<uuid>".
type HistoryOpts ¶
type HistoryOpts struct {
MaxEvents int // default 500 in the method; 0 = no cap (caller takes responsibility)
}
HistoryOpts configures Definition.History.
type ListOpts ¶
type ListOpts struct {
Status []Status // empty = any
TimeRange *TimeRange // by StartTime
PageSize int // default 100, max 1000
PageToken []byte
}
ListOpts configures Definition.ListRuns.
type Option ¶
type Option func(*Definition)
Option configures a Definition during construction.
func WithDescription ¶
WithDescription attaches a human-readable description.
func WithExecute ¶
func WithExecute(fn func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error)) Option
WithExecute sets the workflow-execution closure. The closure receives a pre-built client.StartWorkflowOptions (ID + TaskQueue + caller overrides) and the typed input value.
func WithNewInput ¶
WithNewInput sets the factory that returns a typed zero value of the workflow input. Callers fill the value before calling Execute.
func WithRegister ¶
WithRegister sets the worker-registration closure.
func WithSchedule ¶
func WithSchedule(spec *ScheduleSpec) Option
WithSchedule attaches an optional schedule specification.
type OverlapPolicy ¶
type OverlapPolicy int
OverlapPolicy controls how the scheduler handles a new trigger when a previous run is still in flight. Values mirror Temporal's ScheduleOverlapPolicy.
const (
	OverlapSkip           OverlapPolicy = iota // default — drop new trigger if previous still running
	OverlapBufferOne                           // queue one trigger; drop further
	OverlapBufferAll                           // queue all triggers
	OverlapCancelOther                         // cancel running, start new
	OverlapTerminateOther                      // terminate running, start new
	OverlapAllowAll                            // start in parallel
)
func (OverlapPolicy) ToSDK ¶
func (p OverlapPolicy) ToSDK() enumspb.ScheduleOverlapPolicy
ToSDK converts the OverlapPolicy to the equivalent Temporal SDK enum value.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps logical job names to Definitions. Construction does not validate seed Definitions twice — they were validated by New already.
func NewRegistry ¶
func NewRegistry(defs ...*Definition) *Registry
NewRegistry creates a Registry, optionally seeded. Duplicates among seeds silently use the later value (validate input upstream if that matters).
func (*Registry) Add ¶
func (r *Registry) Add(d *Definition) error
Add inserts a Definition. Returns ErrDuplicateName on conflict, ErrInvalidDefinition if the Definition is nil or missing required fields.
func (*Registry) ApplySchedules ¶
ApplySchedules creates or updates Temporal schedules for every Definition that has one. Returns the first error encountered (does not roll back).
func (*Registry) Get ¶
func (r *Registry) Get(name string) (*Definition, bool)
Get returns the Definition with the given name and a boolean indicating whether it was found.
func (*Registry) List ¶
func (r *Registry) List() []*Definition
List returns all Definitions, sorted by Name.
func (*Registry) MustGet ¶
func (r *Registry) MustGet(name string) *Definition
MustGet returns the Definition with the given name. Panics with fmt.Errorf("%w: %s", ErrNotFound, name) if absent.
func (*Registry) RegisterAll ¶
RegisterAll registers every Definition on the given worker. Idempotent: underlying workflow/activity types are deduplicated via RegisterWorkflowOnce / RegisterActivityOnce in builder closures.
type RunDetail ¶
type RunDetail struct {
WorkflowID string
RunID string
Type string
TaskQueue string
Status Status
StartTime time.Time
CloseTime *time.Time // nil if still running
ExecutionTime time.Duration
HistoryLength int64
Memo map[string]any
SearchAttributes map[string]any
}
RunDetail is the full description of one workflow run.
type RunHandle ¶
RunHandle is a lightweight handle to one workflow run. Returned by Definition.Execute and Definition.GetRun.
type RunHistory ¶
type RunHistory struct {
WorkflowID string
RunID string
Activities []ActivityEvent
Truncated bool
}
RunHistory is the activity-event extraction of one run's history, bounded by HistoryOpts.MaxEvents.
type RunPage ¶
type RunPage struct {
Runs []RunSummary
NextPageToken []byte
}
RunPage is one page of ListRuns results.
type RunSummary ¶
type RunSummary struct {
WorkflowID string
RunID string
Type string
Status Status
StartTime time.Time
CloseTime *time.Time
TaskQueue string
}
RunSummary is one row in a list of runs.
type ScheduleDetail ¶
type ScheduleDetail struct {
ScheduleSummary
Spec ScheduleSpec
RecentRuns []RunSummary
}
ScheduleDetail is the full schedule description.
type ScheduleListOpts ¶
ScheduleListOpts configures Registry.ListSchedules (future) and individual schedule paging.
type ScheduleRange ¶
ScheduleRange mirrors Temporal's ScheduleRange (Start/End/Step int).
type ScheduleSpec ¶
type ScheduleSpec struct {
Interval time.Duration
Cron string
Calendar []CalendarSpec
Overlap OverlapPolicy
Jitter time.Duration
Paused bool
Note string
CatchupWindow time.Duration
}
ScheduleSpec describes when a Definition's workflow should fire automatically. Exactly one of Interval / Cron / Calendar must be set.
type ScheduleSummary ¶
type ScheduleSummary struct {
ID string
WorkflowType string
Paused bool
NextRunTime *time.Time
LastRunTime *time.Time
Note string
}
ScheduleSummary is a lightweight schedule summary.
type StatsOpts ¶
type StatsOpts struct {
TodayOnly bool // default false — set true for "running + closed today"
Location *time.Location // if nil and TodayOnly: UTC; otherwise this zone's calendar day
}
StatsOpts configures Definition.Stats.
type Status ¶
type Status int
Status represents a workflow execution status, mirrored from Temporal's WorkflowExecutionStatus enum for use without leaking SDK enum types.
func StatusFromSDK ¶
func StatusFromSDK(s enumspb.WorkflowExecutionStatus) Status
StatusFromSDK maps a Temporal SDK WorkflowExecutionStatus to a job.Status.
func (Status) IsTerminal ¶
IsTerminal reports whether the status represents a closed (finished) workflow.
type TaskQueueInfo ¶
TaskQueueInfo describes a task queue's pollers and reachability.