scheduler

package
v1.4.1-prerelease21
Published: Jun 4, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Constants

const (
	WorkflowTypeName = "cadence-scheduler"
	TaskListName     = "cadence-scheduler"

	SignalNamePause    = "scheduler-pause"
	SignalNameUnpause  = "scheduler-unpause"
	SignalNameUpdate   = "scheduler-update"
	SignalNameBackfill = "scheduler-backfill"
	SignalNameDelete   = "scheduler-delete"

	QueryTypeDescribe = "scheduler-describe"

	// Metric name strings emitted via tally.Scope (workflow.GetMetricsScope).
	SchedulerSignalReceivedCountPerDomain = "scheduler_signal_received_count_per_domain"
	SchedulerMissedFiredCountPerDomain    = "scheduler_missed_fired_count_per_domain"
	SchedulerMissedSkippedCountPerDomain  = "scheduler_missed_skipped_count_per_domain"
	SchedulerBackfillFiredCountPerDomain  = "scheduler_backfill_fired_count_per_domain"
	// SchedulerBackfillRejectedCountPerDomain counts backfill signals dropped by
	// the workflow after the RPC has already returned success. Tagged with the
	// rejection reason (invalid_range, queue_full).
	SchedulerBackfillRejectedCountPerDomain = "scheduler_backfill_rejected_count_per_domain"
	SchedulerContinueAsNewCountPerDomain    = "scheduler_continue_as_new_count_per_domain"
	// SchedulerBufferOverflowCountPerDomain measures fires dropped because the
	// BUFFER overlap policy queue is full. Tagged with the drop reason so
	// operators can distinguish drops driven by the user's buffer_limit
	// (reason=user_limit) from drops driven by the server-side ceiling that
	// protects ContinueAsNew payload size (reason=system_limit).
	SchedulerBufferOverflowCountPerDomain = "scheduler_buffer_overflow_count_per_domain"

	// Tag key strings for scheduler workflow metrics.
	SignalTypeTag    = "signal_type"
	CatchUpPolicyTag = "catch_up_policy"
	ReasonTag        = "reason"

	// ContinueAsNew reason tag values for the scheduler_continue_as_new_count_per_domain metric.
	ContinueAsNewReasonMissedRun    = "missed_run"
	ContinueAsNewReasonBackfill     = "back_fill"
	ContinueAsNewReasonBufferDrain  = "buffer_drain"
	ContinueAsNewReasonSignal       = "signal"
	ContinueAsNewReasonIterationCap = "iteration_cap"

	// Buffer overflow reason tag values for the scheduler_buffer_overflow_count_per_domain metric.
	// Distinguishes drops driven by the user's buffer_limit from drops driven by
	// the server-side cap that protects ContinueAsNew payload size.
	BufferOverflowReasonUserLimit   = "user_limit"
	BufferOverflowReasonSystemLimit = "system_limit"

	// Reason tag values for scheduler_backfill_rejected_count_per_domain.
	BackfillRejectedReasonInvalidRange = "invalid_range"
	BackfillRejectedReasonQueueFull    = "queue_full"

	// MaxBufferedFiresSystemLimit caps the BUFFER overlap policy queue regardless
	// of buffer_limit (including buffer_limit=0 meaning unlimited). It bounds the
	// ContinueAsNew payload size: each BufferedFire is ~50 bytes JSON, so 1000
	// entries stays well within the workflow input size limit.
	MaxBufferedFiresSystemLimit = 1000

	// MaxConcurrencyLimitSystemLimit caps ConcurrencyLimit for the bounded CONCURRENT
	// overlap policy regardless of the user-configured value. It bounds the
	// RunningWorkflows slice carried in ContinueAsNew payload: each RunningWorkflowInfo
	// is ~110 bytes JSON, so 1000 entries adds ~107KB — well within the 2MB hard limit
	// and leaving headroom for the rest of the workflow state. Exceeding the 2MB limit
	// causes Cadence to fail the workflow entirely with no graceful degradation.
	MaxConcurrencyLimitSystemLimit = 1000

	// Search attribute keys set on target workflows started by the scheduler.
	// The string values are defined in common/definition to make them part of
	// the default indexed keys for all visibility backends.
	SearchAttrScheduleID   = definition.CadenceScheduleID
	SearchAttrScheduleTime = definition.CadenceScheduleTime
	SearchAttrIsBackfill   = definition.CadenceScheduleIsBackfill
	SearchAttrBackfillID   = definition.CadenceScheduleBackfillID

	// Search attribute keys set on the scheduler workflow itself for ListSchedules.
	// CadenceScheduleState is a Keyword SA holding the current lifecycle state
	// ("active" or "paused"). Modeled as a string rather than a boolean so it can
	// be extended to additional states (e.g. "expired") without introducing new
	// search attributes. "Deleted" is not a value because a deleted schedule's
	// workflow is closed and filtered by workflow status instead.
	SearchAttrScheduleState = definition.CadenceScheduleState
	// CadenceScheduleCron holds the current cron expression so ListSchedules
	// can display it without querying each scheduler workflow. Refreshed on
	// workflow start (including after ContinueAsNew triggered by UpdateSchedule).
	SearchAttrScheduleCron = definition.CadenceScheduleCron
	// CadenceScheduleWorkflowType holds the target workflow type name that the
	// schedule starts on each fire. Same refresh semantics as the cron SA.
	SearchAttrScheduleWorkflowType = definition.CadenceScheduleWorkflowType

	ScheduleStateActive = "active"
	ScheduleStatePaused = "paused"
)
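
The comments on SchedulerBufferOverflowCountPerDomain and MaxBufferedFiresSystemLimit describe two ceilings on the BUFFER queue: the user's buffer_limit and the server-side cap. The following is a minimal sketch of how a drop might be classified, not the package's implementation. The helper overflowReason is hypothetical; only the constant values are taken from the listing above.

```go
package main

import "fmt"

// Values copied from the constant block above.
const (
	MaxBufferedFiresSystemLimit     = 1000
	BufferOverflowReasonUserLimit   = "user_limit"
	BufferOverflowReasonSystemLimit = "system_limit"
)

// overflowReason is a hypothetical helper. It reports whether one more fire
// fits into a queue that already holds `queued` entries and, if it does not,
// which reason tag the drop would carry. userLimit == 0 means "unlimited"
// from the user's point of view, so only the system ceiling applies.
func overflowReason(queued, userLimit int) (ok bool, reason string) {
	if userLimit > 0 && queued >= userLimit {
		return false, BufferOverflowReasonUserLimit
	}
	if queued >= MaxBufferedFiresSystemLimit {
		return false, BufferOverflowReasonSystemLimit
	}
	return true, ""
}

func main() {
	cases := []struct{ queued, userLimit int }{
		{3, 5}, {5, 5}, {999, 0}, {1000, 0}, {1000, 5000},
	}
	for _, c := range cases {
		ok, reason := overflowReason(c.queued, c.userLimit)
		fmt.Printf("queued=%d buffer_limit=%d ok=%t reason=%q\n",
			c.queued, c.userLimit, ok, reason)
	}
}
```

Checking the user limit first is an assumption; it means a buffer_limit equal to the system cap is reported as user_limit.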

Variables

This section is empty.

Functions

func SchedulerWorkflow

func SchedulerWorkflow(ctx workflow.Context, input SchedulerWorkflowInput) error

SchedulerWorkflow is a long-running workflow that manages a single schedule. It computes the next fire time from the cron expression, waits via a timer, and dispatches the configured action. Signals control pause/unpause, update, backfill, and deletion.

The main loop follows a state-machine pattern: all inputs (signals and timer) uniformly mutate state, and then a single decision point inspects the resulting state to determine what to do next. ContinueAsNew is triggered on any state-changing signal (pause, unpause, update) so the new execution's input is always the authoritative source of truth.
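
The state-machine pattern described above can be illustrated without the Cadence SDK. The sketch below is a simplified model, not the workflow's code: the event names, the reduced state struct, and the priority order inside decide are all assumptions made for illustration.

```go
package main

import "fmt"

// event is a hypothetical stand-in for the inputs the loop reacts to
// (signals and the fire timer).
type event string

const (
	evTimer   event = "timer"
	evPause   event = "pause"
	evUnpause event = "unpause"
	evDelete  event = "delete"
)

// state is a reduced, hypothetical version of the workflow state.
type state struct {
	paused       bool
	deleted      bool
	fireDue      bool
	stateChanged bool // set by signals that should trigger ContinueAsNew
}

// apply mutates state only; it never decides what happens next.
func apply(s *state, e event) {
	switch e {
	case evTimer:
		s.fireDue = true
	case evPause:
		s.paused, s.stateChanged = true, true
	case evUnpause:
		s.paused, s.stateChanged = false, true
	case evDelete:
		s.deleted = true
	}
}

// decide is the single decision point that inspects the resulting state.
// The priority order used here is an assumption.
func decide(s state) string {
	switch {
	case s.deleted:
		return "complete"
	case s.stateChanged:
		return "continue-as-new"
	case s.fireDue && !s.paused:
		return "fire"
	default:
		return "wait"
	}
}

func main() {
	batches := [][]event{{evTimer}, {evTimer, evPause}, {evPause, evDelete}, {}}
	for _, batch := range batches {
		var s state
		for _, e := range batch {
			apply(&s, e)
		}
		fmt.Println(batch, "->", decide(s))
	}
}
```

Keeping mutation and decision separate means a batch of inputs that arrive together is handled the same way regardless of which channel delivered them.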

Types

type BackfillRequest

type BackfillRequest struct {
	StartTime     time.Time                   `json:"startTime"`
	EndTime       time.Time                   `json:"endTime"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	BackfillID    string                      `json:"backfillId,omitempty"`
}

BackfillRequest is a queued backfill that persists across ContinueAsNew.

type BackfillSignal

type BackfillSignal struct {
	StartTime     time.Time                   `json:"startTime"`
	EndTime       time.Time                   `json:"endTime"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	BackfillID    string                      `json:"backfillId,omitempty"`
}

BackfillSignal is the payload sent with a backfill signal.

type BootstrapParams

type BootstrapParams struct {
	ServiceClient      workflowserviceclient.Interface
	FrontendClient     frontend.Client
	MetricsClient      metrics.Client
	Logger             log.Logger
	DomainCache        cache.DomainCache
	MembershipResolver membership.Resolver
	HostInfo           membership.HostInfo
	// RefreshInterval returns how often the manager should re-scan the
	// domain cache to reconcile per-domain workers. Re-evaluated on every
	// tick so live dynamic-config changes take effect on the next iteration.
	// Nil falls back to a sensible default.
	RefreshInterval dynamicproperties.DurationPropertyFn
}

BootstrapParams contains the parameters needed to create a scheduler worker manager.

type BufferedFire

type BufferedFire struct {
	ScheduledTime time.Time                   `json:"scheduledTime"`
	TriggerSource TriggerSource               `json:"triggerSource"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy,omitempty"`
	// BackfillID is set when TriggerSource is backfill so BUFFER drains stamp the same SA.
	BackfillID string `json:"backfillId,omitempty"`
}

BufferedFire is a schedule fire queued for sequential execution by the BUFFER overlap policy. ScheduledTime and TriggerSource are preserved so the deferred start uses the same WorkflowID and RequestID it would have used at fire time. OverlapPolicy is the overlap policy in effect for this fire (schedule default or a backfill override). Zero (INVALID) means inherit input.Policies.OverlapPolicy for compatibility with older persisted workflow state.
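
The compatibility rule for a zero OverlapPolicy can be sketched as follows. The overlapPolicy type is a stand-in for types.ScheduleOverlapPolicy; only the fact that zero means INVALID comes from the text, and the other enum values are hypothetical.

```go
package main

import "fmt"

// overlapPolicy stands in for types.ScheduleOverlapPolicy. Zero is INVALID
// as stated in the text; the remaining values are hypothetical.
type overlapPolicy int32

const (
	policyInvalid overlapPolicy = iota
	policyBuffer
	policySkipNew
)

// bufferedFire keeps only the field relevant to this example.
type bufferedFire struct {
	OverlapPolicy overlapPolicy
}

// effectivePolicy resolves the policy for a queued fire. A zero value, as
// found in state persisted before the field existed, inherits the schedule
// default.
func effectivePolicy(f bufferedFire, scheduleDefault overlapPolicy) overlapPolicy {
	if f.OverlapPolicy == policyInvalid {
		return scheduleDefault
	}
	return f.OverlapPolicy
}

func main() {
	queue := []bufferedFire{
		{},                             // older persisted entry, no policy recorded
		{OverlapPolicy: policySkipNew}, // backfill override
	}
	// Drain in FIFO order, resolving the policy for each entry.
	for i, f := range queue {
		fmt.Printf("fire %d uses policy %d\n", i, effectivePolicy(f, policyBuffer))
	}
}
```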

type PauseSignal

type PauseSignal struct {
	Reason   string `json:"reason,omitempty"`
	PausedBy string `json:"pausedBy,omitempty"`
}

PauseSignal is the payload sent with a pause signal.

type ProcessFireRequest

type ProcessFireRequest struct {
	Domain              string                      `json:"domain"`
	ScheduleID          string                      `json:"scheduleId"`
	Action              types.StartWorkflowAction   `json:"action"`
	ScheduledTime       time.Time                   `json:"scheduledTime"`
	TriggerSource       TriggerSource               `json:"triggerSource"`
	OverlapPolicy       types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	LastStartedWorkflow *RunningWorkflowInfo        `json:"lastStartedWorkflow,omitempty"`
	// ConcurrencyLimit mirrors SchedulePolicies.ConcurrencyLimit:
	//   nil      = unset (use server default; effectively unlimited)
	//   *int32(0) = explicitly unlimited
	//   *int32(N) = capped at N concurrent runs
	ConcurrencyLimit *int32 `json:"concurrencyLimit,omitempty"`
	// RunningWorkflows is the current in-flight set from workflow state; used
	// only when OverlapPolicy==CONCURRENT and ConcurrencyLimit > 0.
	RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
	// BackfillID is non-empty only for fires driven by a schedule backfill (matches RPC BackfillID).
	BackfillID string `json:"backfillId,omitempty"`
}

ProcessFireRequest is the input to processScheduleFireActivity. It contains everything the activity needs to resolve the overlap policy and start the target workflow. All side effects (describe, cancel, terminate, start) happen inside this single activity so the workflow history stays stable when the overlap logic evolves.
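
The three ConcurrencyLimit cases documented on the field, combined with MaxConcurrencyLimitSystemLimit from the constants, can be sketched like this. The helpers effectiveLimit and canStart are hypothetical, and treating negative values as unlimited is an assumption.

```go
package main

import "fmt"

// Value copied from the constant block.
const MaxConcurrencyLimitSystemLimit = 1000

// effectiveLimit resolves the documented cases:
//
//	nil -> unset, effectively unlimited
//	0   -> explicitly unlimited
//	N   -> capped at N, and never above the system limit
//
// Negative values are treated as unlimited here, which is an assumption.
func effectiveLimit(limit *int32) (n int32, bounded bool) {
	if limit == nil || *limit <= 0 {
		return 0, false
	}
	if *limit > MaxConcurrencyLimitSystemLimit {
		return MaxConcurrencyLimitSystemLimit, true
	}
	return *limit, true
}

// canStart reports whether another target workflow may start while
// `running` workflows are still in flight.
func canStart(limit *int32, running int) bool {
	n, bounded := effectiveLimit(limit)
	return !bounded || running < int(n)
}

func main() {
	three := int32(3)
	zero := int32(0)
	fmt.Println("nil limit, 50 running:", canStart(nil, 50))
	fmt.Println("limit 0, 50 running:  ", canStart(&zero, 50))
	fmt.Println("limit 3, 2 running:   ", canStart(&three, 2))
	fmt.Println("limit 3, 3 running:   ", canStart(&three, 3))
}
```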

type ProcessFireResult

type ProcessFireResult struct {
	StartedWorkflow *RunningWorkflowInfo `json:"startedWorkflow,omitempty"`
	TotalDelta      int64                `json:"totalDelta"`
	SkippedDelta    int64                `json:"skippedDelta"`
	// Buffered is true when the BUFFER overlap policy deferred this fire
	// because the previous target workflow was still running. The workflow
	// appends the fire to state.BufferedFires and retries draining on the
	// next loop iteration.
	Buffered bool `json:"buffered,omitempty"`
	// ActiveWorkflows is the updated in-flight set for bounded CONCURRENT; the workflow
	// replaces state.RunningWorkflows with it after each fire. Nil for all other policies.
	ActiveWorkflows []RunningWorkflowInfo `json:"activeWorkflows,omitempty"`
}

ProcessFireResult is the output of processScheduleFireActivity. The workflow applies these counters and tracking info to its state after the activity returns.
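
How the workflow might fold a result into its state can be sketched from the field comments. This is an illustrative model rather than the package's code: the types are trimmed copies, the buffered queue is simplified to a slice of strings, and replacing RunningWorkflows only when ActiveWorkflows is non-nil is an assumption.

```go
package main

import "fmt"

type runningWorkflowInfo struct {
	WorkflowID string
	RunID      string
}

// fireResult mirrors the fields of ProcessFireResult.
type fireResult struct {
	StartedWorkflow *runningWorkflowInfo
	TotalDelta      int64
	SkippedDelta    int64
	Buffered        bool
	ActiveWorkflows []runningWorkflowInfo
}

// loopState is a trimmed, hypothetical version of SchedulerWorkflowState.
type loopState struct {
	TotalRuns           int64
	SkippedRuns         int64
	BufferedFires       []string // scheduled times, simplified for the example
	LastStartedWorkflow *runningWorkflowInfo
	RunningWorkflows    []runningWorkflowInfo
}

// applyResult applies counters and tracking info after the activity returns.
func applyResult(s *loopState, scheduledTime string, r fireResult) {
	s.TotalRuns += r.TotalDelta
	s.SkippedRuns += r.SkippedDelta
	if r.Buffered {
		// Deferred fire: queue it so a later loop iteration can drain it.
		s.BufferedFires = append(s.BufferedFires, scheduledTime)
	}
	if r.StartedWorkflow != nil {
		s.LastStartedWorkflow = r.StartedWorkflow
	}
	if r.ActiveWorkflows != nil {
		// Bounded CONCURRENT: the activity returns the pruned in-flight set.
		s.RunningWorkflows = r.ActiveWorkflows
	}
}

func main() {
	var s loopState
	started := &runningWorkflowInfo{WorkflowID: "wf-1", RunID: "run-1"}
	applyResult(&s, "2026-06-04T00:00:00Z", fireResult{StartedWorkflow: started, TotalDelta: 1})
	applyResult(&s, "2026-06-04T01:00:00Z", fireResult{Buffered: true})
	fmt.Printf("total=%d skipped=%d buffered=%v last=%s\n",
		s.TotalRuns, s.SkippedRuns, s.BufferedFires, s.LastStartedWorkflow.WorkflowID)
}
```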

type RunningWorkflowInfo

type RunningWorkflowInfo struct {
	WorkflowID string `json:"workflowId"`
	RunID      string `json:"runId"`
}

RunningWorkflowInfo identifies a target workflow started by the scheduler, used for overlap policy checks.
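
The constants section sizes MaxConcurrencyLimitSystemLimit from the JSON footprint of this struct. A rough way to check that kind of estimate is to marshal a sample list and compare it with the 2MB limit mentioned there. The IDs below are placeholders, so the measured size only indicates the order of magnitude; real workflow IDs may be longer or shorter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RunningWorkflowInfo is copied from the declaration above.
type RunningWorkflowInfo struct {
	WorkflowID string `json:"workflowId"`
	RunID      string `json:"runId"`
}

// payloadBytes returns the JSON size of a list holding n copies of sample.
func payloadBytes(n int, sample RunningWorkflowInfo) (int, error) {
	list := make([]RunningWorkflowInfo, n)
	for i := range list {
		list[i] = sample
	}
	b, err := json.Marshal(list)
	return len(b), err
}

func main() {
	const hardLimit = 2 << 20 // 2MB, the limit cited in the constants section

	sample := RunningWorkflowInfo{
		WorkflowID: "nightly-report-2026-06-04T00:00:00Z",  // hypothetical ID
		RunID:      "123e4567-e89b-12d3-a456-426614174000", // UUID-shaped placeholder
	}
	size, err := payloadBytes(1000, sample)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Printf("1000 entries: %d bytes, %.1f%% of the limit\n",
		size, 100*float64(size)/float64(hardLimit))
}
```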

type ScheduleDescription

type ScheduleDescription struct {
	ScheduleID       string                  `json:"scheduleId"`
	Domain           string                  `json:"domain"`
	Spec             types.ScheduleSpec      `json:"spec"`
	Action           types.ScheduleAction    `json:"action"`
	Policies         types.SchedulePolicies  `json:"policies"`
	Paused           bool                    `json:"paused"`
	PauseReason      string                  `json:"pauseReason,omitempty"`
	PausedBy         string                  `json:"pausedBy,omitempty"`
	LastRunTime      time.Time               `json:"lastRunTime,omitempty"`
	NextRunTime      time.Time               `json:"nextRunTime,omitempty"`
	TotalRuns        int64                   `json:"totalRuns"`
	MissedRuns       int64                   `json:"missedRuns"`
	SkippedRuns      int64                   `json:"skippedRuns"`
	Memo             *types.Memo             `json:"memo,omitempty"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
}

ScheduleDescription is the query result returned by the describe query handler. It provides a snapshot of the schedule's current configuration and runtime state.

type SchedulerWorkflowInput

type SchedulerWorkflowInput struct {
	Domain           string                  `json:"domain"`
	ScheduleID       string                  `json:"scheduleId"`
	Spec             types.ScheduleSpec      `json:"spec"`
	Action           types.ScheduleAction    `json:"action"`
	Policies         types.SchedulePolicies  `json:"policies"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
	Memo             *types.Memo             `json:"memo,omitempty"`
	State            SchedulerWorkflowState  `json:"state"`
}

SchedulerWorkflowInput is the input to the scheduler workflow. It carries the schedule definition and any prior state (for ContinueAsNew).

type SchedulerWorkflowState

type SchedulerWorkflowState struct {
	Paused            bool              `json:"paused"`
	PauseReason       string            `json:"pauseReason,omitempty"`
	PausedBy          string            `json:"pausedBy,omitempty"`
	Deleted           bool              `json:"-"`                           // transient flag, not persisted across ContinueAsNew
	LastRunTime       time.Time         `json:"lastRunTime,omitempty"`       // most recent scheduled run time the workflow has processed
	LastProcessedTime time.Time         `json:"lastProcessedTime,omitempty"` // catch-up watermark: latest missed fire we've processed (fired or skipped)
	NextRunTime       time.Time         `json:"nextRunTime,omitempty"`
	TotalRuns         int64             `json:"totalRuns"`
	MissedRuns        int64             `json:"missedRuns"`
	SkippedRuns       int64             `json:"skippedRuns"`
	Iterations        int               `json:"iterations"`
	PendingBackfills  []BackfillRequest `json:"pendingBackfills,omitempty"`
	// BufferedFires holds fires queued for sequential execution under the BUFFER
	// overlap policy. Fires are appended when the previous target workflow is
	// still running at fire time and drained in FIFO order on subsequent
	// opportunities (timer wakeups, signal wakeups). Persisted across
	// ContinueAsNew so buffered work isn't lost on workflow recycling.
	BufferedFires []BufferedFire `json:"bufferedFires,omitempty"`
	// LastStartedWorkflow tracks the most recently started target workflow so
	// the overlap policy can check whether it is still running before starting
	// the next one. Nil when no workflow has been started yet.
	LastStartedWorkflow *RunningWorkflowInfo `json:"lastStartedWorkflow,omitempty"`
	// RunningWorkflows holds in-flight target workflows under bounded CONCURRENT
	// (ConcurrencyLimit > 0); completed entries are pruned by the activity on each fire.
	RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
}

SchedulerWorkflowState is the mutable runtime state that survives ContinueAsNew.
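
The `json:"-"` tag on Deleted is what keeps that flag from surviving ContinueAsNew, assuming the state is carried as JSON as the struct tags suggest. The round trip below uses a trimmed copy of the struct to show the effect; roundTrip is a helper written for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trimmedState keeps a few fields of SchedulerWorkflowState with their tags.
type trimmedState struct {
	Paused     bool  `json:"paused"`
	Deleted    bool  `json:"-"` // transient: never written to JSON
	TotalRuns  int64 `json:"totalRuns"`
	Iterations int   `json:"iterations"`
}

// roundTrip encodes and decodes the state, imitating what a new execution
// would receive as its input.
func roundTrip(in trimmedState) (trimmedState, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return trimmedState{}, err
	}
	var out trimmedState
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	in := trimmedState{Paused: true, Deleted: true, TotalRuns: 7, Iterations: 3}
	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("before: %+v\nafter:  %+v\n", in, out)
}
```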

type TriggerSource

type TriggerSource string

TriggerSource identifies what caused a schedule fire. It is used to differentiate RequestIDs so that, for example, a backfill for the same timestamp as a normal fire does not collide in server-side deduplication.

const (
	TriggerSourceSchedule TriggerSource = "schedule"
	TriggerSourceBackfill TriggerSource = "backfill"
)

type UnpauseSignal

type UnpauseSignal struct {
	Reason        string                      `json:"reason,omitempty"`
	CatchUpPolicy types.ScheduleCatchUpPolicy `json:"catchUpPolicy,omitempty"`
}

UnpauseSignal is the payload sent with an unpause signal.

type UpdateSignal

type UpdateSignal struct {
	Spec             *types.ScheduleSpec     `json:"spec,omitempty"`
	Action           *types.ScheduleAction   `json:"action,omitempty"`
	Policies         *types.SchedulePolicies `json:"policies,omitempty"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
}

UpdateSignal is the payload sent with an update signal.
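
Every field of UpdateSignal is an optional pointer, which suggests partial updates where a nil field leaves the existing value untouched. That reading is an assumption, and the sketch below uses simplified stand-in types rather than the real types.ScheduleSpec and types.SchedulePolicies.

```go
package main

import "fmt"

// Simplified stand-ins for the schedule definition types.
type spec struct{ Cron string }
type policies struct{ BufferLimit int }

// scheduleDef holds the parts of the workflow input an update may touch.
type scheduleDef struct {
	Spec     spec
	Policies policies
}

// updateSignal mirrors the optional-pointer shape of UpdateSignal.
type updateSignal struct {
	Spec     *spec
	Policies *policies
}

// applyUpdate returns a copy of d with every non-nil field of u applied.
// Nil fields are assumed to mean "keep the current value".
func applyUpdate(d scheduleDef, u updateSignal) scheduleDef {
	if u.Spec != nil {
		d.Spec = *u.Spec
	}
	if u.Policies != nil {
		d.Policies = *u.Policies
	}
	return d
}

func main() {
	current := scheduleDef{Spec: spec{Cron: "0 * * * *"}, Policies: policies{BufferLimit: 5}}
	updated := applyUpdate(current, updateSignal{Spec: &spec{Cron: "*/15 * * * *"}})
	fmt.Printf("before: %+v\nafter:  %+v\n", current, updated)
}
```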

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

WorkerManager manages per-domain scheduler workers. It periodically scans the domain cache and uses the membership hashring to determine which domains this host should cover. For each such domain it starts a Cadence SDK worker polling the scheduler task list. Each domain is covered by workerRedundancyFactor hosts simultaneously so that a single host failure does not cause a scheduling gap.
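
The idea of several hosts covering each domain can be illustrated with rendezvous (highest-random-weight) hashing. This is a swapped-in technique chosen because it fits in a few lines; it is not the membership hashring the package uses, and the names owners, covers, and score are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// score gives each (host, domain) pair a stable pseudo-random weight.
func score(host, domain string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(host))
	h.Write([]byte{0}) // separator so "ab"+"c" differs from "a"+"bc"
	h.Write([]byte(domain))
	return h.Sum64()
}

// owners returns the `redundancy` hosts with the highest score for domain.
// Every host computes the same answer from the same membership list.
func owners(hosts []string, domain string, redundancy int) []string {
	ranked := append([]string(nil), hosts...)
	sort.Slice(ranked, func(i, j int) bool {
		si, sj := score(ranked[i], domain), score(ranked[j], domain)
		if si != sj {
			return si > sj
		}
		return ranked[i] < ranked[j] // deterministic tie-break
	})
	if redundancy < 0 {
		redundancy = 0
	}
	if redundancy > len(ranked) {
		redundancy = len(ranked)
	}
	return ranked[:redundancy]
}

// covers reports whether self should run a worker for domain.
func covers(self string, hosts []string, domain string, redundancy int) bool {
	for _, h := range owners(hosts, domain, redundancy) {
		if h == self {
			return true
		}
	}
	return false
}

func main() {
	hosts := []string{"host-a", "host-b", "host-c", "host-d"}
	for _, d := range []string{"payments", "reports"} {
		fmt.Println(d, "->", owners(hosts, d, 2))
	}
}
```

With a redundancy of 2, losing one owner of a domain still leaves a second host polling the task list until membership is rescanned.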

func NewWorkerManager

NewWorkerManager creates a new per-domain scheduler worker manager.

func (*WorkerManager) Start

func (m *WorkerManager) Start()

Start begins the background loop that manages per-domain workers.

func (*WorkerManager) Stop

func (m *WorkerManager) Stop()

Stop signals the background loop to stop and waits for it to finish. It then stops all active workers.
