Documentation
Index
- Constants
- func SchedulerWorkflow(ctx workflow.Context, input SchedulerWorkflowInput) error
- type BackfillRequest
- type BackfillSignal
- type BootstrapParams
- type BufferedFire
- type PauseSignal
- type ProcessFireRequest
- type ProcessFireResult
- type RunningWorkflowInfo
- type ScheduleDescription
- type SchedulerWorkflowInput
- type SchedulerWorkflowState
- type TriggerSource
- type UnpauseSignal
- type UpdateSignal
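- type WorkerManager
  - func NewWorkerManager(params *BootstrapParams, enabledFn dynamicproperties.BoolPropertyFnWithDomainFilter) *WorkerManager
  - func (m *WorkerManager) Start()
  - func (m *WorkerManager) Stop()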
Constants
const (
	WorkflowTypeName   = "cadence-scheduler"
	TaskListName       = "cadence-scheduler"
	SignalNamePause    = "scheduler-pause"
	SignalNameUnpause  = "scheduler-unpause"
	SignalNameUpdate   = "scheduler-update"
	SignalNameBackfill = "scheduler-backfill"
	SignalNameDelete   = "scheduler-delete"
	QueryTypeDescribe  = "scheduler-describe"

	// Metric name strings emitted via tally.Scope (workflow.GetMetricsScope).
	SchedulerSignalReceivedCountPerDomain = "scheduler_signal_received_count_per_domain"
	SchedulerMissedFiredCountPerDomain    = "scheduler_missed_fired_count_per_domain"
	SchedulerMissedSkippedCountPerDomain  = "scheduler_missed_skipped_count_per_domain"
	SchedulerBackfillFiredCountPerDomain  = "scheduler_backfill_fired_count_per_domain"
	// SchedulerBackfillRejectedCountPerDomain counts backfill signals dropped by
	// the workflow after the RPC has already returned success. Tagged with the
	// rejection reason (invalid_range, queue_full).
	SchedulerBackfillRejectedCountPerDomain = "scheduler_backfill_rejected_count_per_domain"
	SchedulerContinueAsNewCountPerDomain    = "scheduler_continue_as_new_count_per_domain"
	// SchedulerBufferOverflowCountPerDomain counts fires dropped because the
	// BUFFER overlap policy queue is full. Tagged with the drop reason so
	// operators can distinguish drops driven by the user's buffer_limit
	// (reason=user_limit) from drops driven by the server-side ceiling that
	// protects ContinueAsNew payload size (reason=system_limit).
	SchedulerBufferOverflowCountPerDomain = "scheduler_buffer_overflow_count_per_domain"

	// Tag key strings for scheduler workflow metrics.
	SignalTypeTag    = "signal_type"
	CatchUpPolicyTag = "catch_up_policy"
	ReasonTag        = "reason"

	// ContinueAsNew reason tag values for the scheduler_continue_as_new_count_per_domain metric.
	ContinueAsNewReasonMissedRun    = "missed_run"
	ContinueAsNewReasonBackfill     = "back_fill"
	ContinueAsNewReasonBufferDrain  = "buffer_drain"
	ContinueAsNewReasonSignal       = "signal"
	ContinueAsNewReasonIterationCap = "iteration_cap"

	// Buffer overflow reason tag values for the scheduler_buffer_overflow_count_per_domain metric.
	// Distinguishes drops driven by the user's buffer_limit from drops driven by
	// the server-side cap that protects ContinueAsNew payload size.
	BufferOverflowReasonUserLimit   = "user_limit"
	BufferOverflowReasonSystemLimit = "system_limit"

	// Reason tag values for scheduler_backfill_rejected_count_per_domain.
	BackfillRejectedReasonInvalidRange = "invalid_range"
	BackfillRejectedReasonQueueFull    = "queue_full"

	// MaxBufferedFiresSystemLimit caps the BUFFER overlap policy queue regardless
	// of buffer_limit (including buffer_limit=0, meaning unlimited). It bounds the
	// ContinueAsNew payload size: each BufferedFire is ~50 bytes of JSON, so 1000
	// entries stay well within the workflow input size limit.
	MaxBufferedFiresSystemLimit = 1000

	// MaxConcurrencyLimitSystemLimit caps ConcurrencyLimit for the bounded CONCURRENT
	// overlap policy regardless of the user-configured value. It bounds the
	// RunningWorkflows slice carried in the ContinueAsNew payload: each RunningWorkflowInfo
	// is ~110 bytes of JSON, so 1000 entries add ~107KB, well within the 2MB hard limit
	// and leaving headroom for the rest of the workflow state. Exceeding the 2MB limit
	// causes Cadence to fail the workflow entirely, with no graceful degradation.
	MaxConcurrencyLimitSystemLimit = 1000

	// Search attribute keys set on target workflows started by the scheduler.
	// The string values are defined in common/definition to make them part of
	// the default indexed keys for all visibility backends.
	SearchAttrScheduleID   = definition.CadenceScheduleID
	SearchAttrScheduleTime = definition.CadenceScheduleTime
	SearchAttrIsBackfill   = definition.CadenceScheduleIsBackfill
	SearchAttrBackfillID   = definition.CadenceScheduleBackfillID

	// Search attribute keys set on the scheduler workflow itself for ListSchedules.
	// CadenceScheduleState is a Keyword SA holding the current lifecycle state
	// ("active" or "paused"). Modeled as a string rather than a boolean so it can
	// be extended to additional states (e.g. "expired") without introducing new
	// search attributes. "Deleted" is not a value because a deleted schedule's
	// workflow is closed and filtered by workflow status instead.
	SearchAttrScheduleState = definition.CadenceScheduleState
	// CadenceScheduleCron holds the current cron expression so ListSchedules
	// can display it without querying each scheduler workflow. Refreshed on
	// workflow start (including after ContinueAsNew triggered by UpdateSchedule).
	SearchAttrScheduleCron = definition.CadenceScheduleCron
	// CadenceScheduleWorkflowType holds the target workflow type name that the
	// schedule starts on each fire. Same refresh semantics as the cron SA.
	SearchAttrScheduleWorkflowType = definition.CadenceScheduleWorkflowType

	ScheduleStateActive = "active"
	ScheduleStatePaused = "paused"
)
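The interplay between a user's buffer_limit and MaxBufferedFiresSystemLimit, and how each drop maps to a reason tag, can be sketched as a small pure function. This is an illustration under stated assumptions, not the package's code: enqueueBufferedFire and its signature are hypothetical, as is the rule that user_limit is reported only when the user limit is the tighter of the two caps. The constant value, the reason strings, and "buffer_limit=0 means unlimited" come from the comments above.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	maxBufferedFiresSystemLimit     = 1000
	bufferOverflowReasonUserLimit   = "user_limit"
	bufferOverflowReasonSystemLimit = "system_limit"
)

// enqueueBufferedFire (hypothetical) reports whether one more fire fits in a
// BUFFER queue that currently holds `queued` entries. userLimit == 0 means the
// user set no limit, but the system ceiling still applies.
func enqueueBufferedFire(queued, userLimit int) (ok bool, reason string) {
	// The user limit is blamed only when it is the tighter of the two caps.
	if userLimit > 0 && userLimit <= maxBufferedFiresSystemLimit && queued >= userLimit {
		return false, bufferOverflowReasonUserLimit
	}
	if queued >= maxBufferedFiresSystemLimit {
		return false, bufferOverflowReasonSystemLimit
	}
	return true, ""
}

func main() {
	fmt.Println(enqueueBufferedFire(3, 3))       // user limit of 3 reached
	fmt.Println(enqueueBufferedFire(1000, 0))    // "unlimited", but the system ceiling is reached
	fmt.Println(enqueueBufferedFire(1000, 5000)) // user limit above the ceiling
	fmt.Println(enqueueBufferedFire(2, 0))       // accepted
}
```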
Variables
This section is empty.
Functions
func SchedulerWorkflow
func SchedulerWorkflow(ctx workflow.Context, input SchedulerWorkflowInput) error
SchedulerWorkflow is a long-running workflow that manages a single schedule. It computes the next fire time from the cron expression, waits via a timer, and dispatches the configured action. Signals control pause/unpause, update, backfill, and deletion.
The main loop follows a state-machine pattern: every input (a signal or the timer) only mutates state, and a single decision point then inspects the resulting state to determine what to do next. ContinueAsNew is triggered on any state-changing signal (pause, unpause, or update), so the new execution's input is always the authoritative source of truth.
Types
type BackfillRequest
type BackfillRequest struct {
	StartTime     time.Time                   `json:"startTime"`
	EndTime       time.Time                   `json:"endTime"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	BackfillID    string                      `json:"backfillId,omitempty"`
}
BackfillRequest is a queued backfill that persists across ContinueAsNew.
type BackfillSignal
type BackfillSignal struct {
	StartTime     time.Time                   `json:"startTime"`
	EndTime       time.Time                   `json:"endTime"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	BackfillID    string                      `json:"backfillId,omitempty"`
}
BackfillSignal is the payload sent with a backfill signal.
type BootstrapParams
type BootstrapParams struct {
	ServiceClient      workflowserviceclient.Interface
	FrontendClient     frontend.Client
	MetricsClient      metrics.Client
	Logger             log.Logger
	DomainCache        cache.DomainCache
	MembershipResolver membership.Resolver
	HostInfo           membership.HostInfo
	// RefreshInterval returns how often the manager should re-scan the
	// domain cache to reconcile per-domain workers. Re-evaluated on every
	// tick so live dynamic-config changes take effect on the next iteration.
	// Nil falls back to a sensible default.
	RefreshInterval dynamicproperties.DurationPropertyFn
}
BootstrapParams contains the parameters needed to create a scheduler worker manager.
type BufferedFire
type BufferedFire struct {
	ScheduledTime time.Time                   `json:"scheduledTime"`
	TriggerSource TriggerSource               `json:"triggerSource"`
	OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy,omitempty"`
	// BackfillID is set when TriggerSource is backfill so that fires drained
	// from the BUFFER queue stamp the same backfill ID search attribute.
	BackfillID string `json:"backfillId,omitempty"`
}
BufferedFire is a schedule fire queued for sequential execution by the BUFFER overlap policy. ScheduledTime and TriggerSource are preserved so the deferred start uses the same WorkflowID and RequestID it would have used at fire time. OverlapPolicy is the overlap policy in effect for this fire (the schedule default or a backfill override); the zero value (INVALID) means "inherit input.Policies.OverlapPolicy", which keeps older persisted workflow state compatible.
type PauseSignal
type PauseSignal struct {
	Reason   string `json:"reason,omitempty"`
	PausedBy string `json:"pausedBy,omitempty"`
}
PauseSignal is the payload sent with a pause signal.
type ProcessFireRequest
type ProcessFireRequest struct {
	Domain              string                      `json:"domain"`
	ScheduleID          string                      `json:"scheduleId"`
	Action              types.StartWorkflowAction   `json:"action"`
	ScheduledTime       time.Time                   `json:"scheduledTime"`
	TriggerSource       TriggerSource               `json:"triggerSource"`
	OverlapPolicy       types.ScheduleOverlapPolicy `json:"overlapPolicy"`
	LastStartedWorkflow *RunningWorkflowInfo        `json:"lastStartedWorkflow,omitempty"`
	// ConcurrencyLimit mirrors SchedulePolicies.ConcurrencyLimit:
	//   nil       = unset (use server default; effectively unlimited)
	//   *int32(0) = explicitly unlimited
	//   *int32(N) = capped at N concurrent runs
	ConcurrencyLimit *int32 `json:"concurrencyLimit,omitempty"`
	// RunningWorkflows is the current in-flight set from workflow state; used
	// only when OverlapPolicy==CONCURRENT and ConcurrencyLimit > 0.
	RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
	// BackfillID is non-empty only for fires driven by a schedule backfill (matches RPC BackfillID).
	BackfillID string `json:"backfillId,omitempty"`
}
ProcessFireRequest is the input to processScheduleFireActivity. It contains everything the activity needs to resolve the overlap policy and start the target workflow. All side effects (describe, cancel, terminate, start) happen inside this single activity so the workflow history stays stable when the overlap logic evolves.
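The three-way ConcurrencyLimit encoding, combined with the MaxConcurrencyLimitSystemLimit clamp from the constants, can be resolved as follows. effectiveConcurrencyLimit and canStart are hypothetical names, and treating negative values like zero is an assumption; the sketch only restates the documented nil / 0 / N rules.

```go
package main

import "fmt"

// Copied from MaxConcurrencyLimitSystemLimit above.
const maxConcurrencyLimitSystemLimit = 1000

// effectiveConcurrencyLimit (hypothetical) decodes the ConcurrencyLimit field:
//
//	nil       -> unset, effectively unlimited
//	*int32(0) -> explicitly unlimited
//	*int32(N) -> bounded at N, clamped to the system limit
//
// bounded reports whether RunningWorkflows tracking applies at all.
func effectiveConcurrencyLimit(limit *int32) (n int32, bounded bool) {
	if limit == nil || *limit <= 0 { // negatives treated like 0 (assumption)
		return 0, false
	}
	if *limit > maxConcurrencyLimitSystemLimit {
		return maxConcurrencyLimitSystemLimit, true
	}
	return *limit, true
}

// canStart reports whether another run may start given how many are in flight.
func canStart(limit *int32, running int) bool {
	n, bounded := effectiveConcurrencyLimit(limit)
	return !bounded || running < int(n)
}

func int32Ptr(v int32) *int32 { return &v }

func main() {
	fmt.Println(effectiveConcurrencyLimit(nil))            // 0 false
	fmt.Println(effectiveConcurrencyLimit(int32Ptr(5000))) // 1000 true
	fmt.Println(canStart(int32Ptr(2), 2))                  // false
}
```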
type ProcessFireResult
type ProcessFireResult struct {
	StartedWorkflow *RunningWorkflowInfo `json:"startedWorkflow,omitempty"`
	TotalDelta      int64                `json:"totalDelta"`
	SkippedDelta    int64                `json:"skippedDelta"`
	// Buffered is true when the BUFFER overlap policy deferred this fire
	// because the previous target workflow was still running. The workflow
	// appends the fire to state.BufferedFires and retries draining on the
	// next loop iteration.
	Buffered bool `json:"buffered,omitempty"`
	// ActiveWorkflows is the updated in-flight set for bounded CONCURRENT; the workflow
	// replaces state.RunningWorkflows with it after each fire. Nil for all other policies.
	ActiveWorkflows []RunningWorkflowInfo `json:"activeWorkflows,omitempty"`
}
ProcessFireResult is the output of processScheduleFireActivity. The workflow applies these counters and tracking info to its state after the activity returns.
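How the workflow might fold a ProcessFireResult back into its state is sketched below with trimmed local copies of the types. applyFireResult and its boundedConcurrent flag are hypothetical. Keying the RunningWorkflows replacement on the policy rather than on ActiveWorkflows != nil is a deliberate choice in this sketch: because ActiveWorkflows is tagged omitempty, an empty slice (all runs finished) is dropped from the JSON and decodes as nil, so a nil check would leave stale entries behind.

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed local copies of the documented types.
type runningWorkflowInfo struct {
	WorkflowID string
	RunID      string
}

type processFireResult struct {
	StartedWorkflow *runningWorkflowInfo
	TotalDelta      int64
	SkippedDelta    int64
	Buffered        bool
	ActiveWorkflows []runningWorkflowInfo
}

type bufferedFire struct {
	ScheduledTime time.Time
}

type schedulerState struct {
	TotalRuns           int64
	SkippedRuns         int64
	BufferedFires       []bufferedFire
	LastStartedWorkflow *runningWorkflowInfo
	RunningWorkflows    []runningWorkflowInfo
}

// applyFireResult (hypothetical) folds one activity result into state.
func applyFireResult(s *schedulerState, res processFireResult, fire bufferedFire, boundedConcurrent bool) {
	s.TotalRuns += res.TotalDelta
	s.SkippedRuns += res.SkippedDelta
	if res.Buffered {
		// Deferred by BUFFER; drained on a later loop iteration.
		s.BufferedFires = append(s.BufferedFires, fire)
	}
	if res.StartedWorkflow != nil {
		s.LastStartedWorkflow = res.StartedWorkflow
	}
	if boundedConcurrent {
		// Replace wholesale, even when the decoded slice is nil (all runs done).
		s.RunningWorkflows = res.ActiveWorkflows
	}
}

func main() {
	s := &schedulerState{RunningWorkflows: []runningWorkflowInfo{{WorkflowID: "wf-a", RunID: "r1"}}}
	started := &runningWorkflowInfo{WorkflowID: "wf-b", RunID: "r2"}
	applyFireResult(s, processFireResult{
		StartedWorkflow: started,
		TotalDelta:      1,
		ActiveWorkflows: []runningWorkflowInfo{*started}, // wf-a finished and was pruned
	}, bufferedFire{ScheduledTime: time.Now()}, true)
	fmt.Println(s.TotalRuns, s.LastStartedWorkflow.WorkflowID, len(s.RunningWorkflows)) // 1 wf-b 1
}
```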
type RunningWorkflowInfo
type RunningWorkflowInfo struct {
	WorkflowID string `json:"workflowId"`
	RunID      string `json:"runId"`
}
RunningWorkflowInfo identifies a target workflow started by the scheduler, used for overlap policy checks.
type ScheduleDescription
type ScheduleDescription struct {
	ScheduleID       string                  `json:"scheduleId"`
	Domain           string                  `json:"domain"`
	Spec             types.ScheduleSpec      `json:"spec"`
	Action           types.ScheduleAction    `json:"action"`
	Policies         types.SchedulePolicies  `json:"policies"`
	Paused           bool                    `json:"paused"`
	PauseReason      string                  `json:"pauseReason,omitempty"`
	PausedBy         string                  `json:"pausedBy,omitempty"`
	LastRunTime      time.Time               `json:"lastRunTime,omitempty"`
	NextRunTime      time.Time               `json:"nextRunTime,omitempty"`
	TotalRuns        int64                   `json:"totalRuns"`
	MissedRuns       int64                   `json:"missedRuns"`
	SkippedRuns      int64                   `json:"skippedRuns"`
	Memo             *types.Memo             `json:"memo,omitempty"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
}
ScheduleDescription is the query result returned by the describe query handler. It provides a snapshot of the schedule's current configuration and runtime state.
type SchedulerWorkflowInput
type SchedulerWorkflowInput struct {
	Domain           string                  `json:"domain"`
	ScheduleID       string                  `json:"scheduleId"`
	Spec             types.ScheduleSpec      `json:"spec"`
	Action           types.ScheduleAction    `json:"action"`
	Policies         types.SchedulePolicies  `json:"policies"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
	Memo             *types.Memo             `json:"memo,omitempty"`
	State            SchedulerWorkflowState  `json:"state"`
}
SchedulerWorkflowInput is the input to the scheduler workflow. It carries the schedule definition and any prior state (for ContinueAsNew).
type SchedulerWorkflowState
type SchedulerWorkflowState struct {
	Paused            bool              `json:"paused"`
	PauseReason       string            `json:"pauseReason,omitempty"`
	PausedBy          string            `json:"pausedBy,omitempty"`
	Deleted           bool              `json:"-"`                           // transient flag, not persisted across ContinueAsNew
	LastRunTime       time.Time         `json:"lastRunTime,omitempty"`       // most recent scheduled run time the workflow has processed
	LastProcessedTime time.Time         `json:"lastProcessedTime,omitempty"` // catch-up watermark: latest missed fire we've processed (fired or skipped)
	NextRunTime       time.Time         `json:"nextRunTime,omitempty"`
	TotalRuns         int64             `json:"totalRuns"`
	MissedRuns        int64             `json:"missedRuns"`
	SkippedRuns       int64             `json:"skippedRuns"`
	Iterations        int               `json:"iterations"`
	PendingBackfills  []BackfillRequest `json:"pendingBackfills,omitempty"`
	// BufferedFires holds fires queued for sequential execution under the BUFFER
	// overlap policy. Fires are appended when the previous target workflow is
	// still running at fire time and drained in FIFO order on subsequent
	// opportunities (timer wakeups, signal wakeups). Persisted across
	// ContinueAsNew so buffered work isn't lost on workflow recycling.
	BufferedFires []BufferedFire `json:"bufferedFires,omitempty"`
	// LastStartedWorkflow tracks the most recently started target workflow so
	// the overlap policy can check whether it is still running before starting
	// the next one. Nil when no workflow has been started yet.
	LastStartedWorkflow *RunningWorkflowInfo `json:"lastStartedWorkflow,omitempty"`
	// RunningWorkflows holds in-flight target workflows under bounded CONCURRENT
	// (ConcurrencyLimit > 0); completed entries are pruned by the activity on each fire.
	RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
}
SchedulerWorkflowState is the mutable runtime state that survives ContinueAsNew.
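Assuming the state travels as JSON (which the struct tags imply), the json:"-" tag on Deleted is what keeps it from surviving ContinueAsNew. The round trip below uses a trimmed copy of the struct (workflowState and carryOver are hypothetical names) to show that persisted fields carry over while Deleted resets to false.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workflowState copies three SchedulerWorkflowState fields and their tags.
type workflowState struct {
	Paused    bool  `json:"paused"`
	Deleted   bool  `json:"-"` // transient flag, not persisted across ContinueAsNew
	TotalRuns int64 `json:"totalRuns"`
}

// carryOver (hypothetical) emulates ContinueAsNew: the state is encoded as
// the next execution's input and decoded on the other side.
func carryOver(s workflowState) (workflowState, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return workflowState{}, err
	}
	var next workflowState
	err = json.Unmarshal(b, &next)
	return next, err
}

func main() {
	next, err := carryOver(workflowState{Paused: true, Deleted: true, TotalRuns: 7})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", next) // {Paused:true Deleted:false TotalRuns:7}
}
```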
type TriggerSource
type TriggerSource string
TriggerSource identifies what caused a schedule fire, used to differentiate RequestIDs so that e.g. a backfill for the same timestamp as a normal fire does not collide in server-side deduplication.
const (
	TriggerSourceSchedule TriggerSource = "schedule"
	TriggerSourceBackfill TriggerSource = "backfill"
)
type UnpauseSignal
type UnpauseSignal struct {
	Reason        string                      `json:"reason,omitempty"`
	CatchUpPolicy types.ScheduleCatchUpPolicy `json:"catchUpPolicy,omitempty"`
}
UnpauseSignal is the payload sent with an unpause signal.
type UpdateSignal
type UpdateSignal struct {
	Spec             *types.ScheduleSpec     `json:"spec,omitempty"`
	Action           *types.ScheduleAction   `json:"action,omitempty"`
	Policies         *types.SchedulePolicies `json:"policies,omitempty"`
	SearchAttributes *types.SearchAttributes `json:"searchAttributes,omitempty"`
}
UpdateSignal is the payload sent with an update signal.
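Because every UpdateSignal field is an optional pointer, a natural reading is that nil means "leave this part of the schedule unchanged" and non-nil replaces it. That interpretation is an assumption, as are the trimmed stand-in types below (scheduleSpec, schedulePolicies, scheduleDef, and applyUpdate are hypothetical; the real types.ScheduleSpec and types.SchedulePolicies have more fields).

```go
package main

import "fmt"

// Stand-ins for the types.ScheduleSpec / types.SchedulePolicies payloads.
type scheduleSpec struct{ CronExpression string }
type schedulePolicies struct{ BufferLimit int32 }

type updateSignal struct {
	Spec     *scheduleSpec
	Policies *schedulePolicies
}

type scheduleDef struct {
	Spec     scheduleSpec
	Policies schedulePolicies
}

// applyUpdate assumes nil fields mean "leave unchanged" and non-nil fields
// replace the whole sub-struct.
func applyUpdate(def scheduleDef, u updateSignal) scheduleDef {
	if u.Spec != nil {
		def.Spec = *u.Spec
	}
	if u.Policies != nil {
		def.Policies = *u.Policies
	}
	return def
}

func main() {
	def := scheduleDef{Spec: scheduleSpec{CronExpression: "0 * * * *"}, Policies: schedulePolicies{BufferLimit: 5}}
	updated := applyUpdate(def, updateSignal{Spec: &scheduleSpec{CronExpression: "*/5 * * * *"}})
	fmt.Printf("%+v\n", updated) // {Spec:{CronExpression:*/5 * * * *} Policies:{BufferLimit:5}}
}
```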
type WorkerManager
type WorkerManager struct {
	// contains filtered or unexported fields
}
WorkerManager manages per-domain scheduler workers. It periodically scans the domain cache and uses the membership hashring to determine which domains this host should cover. For each such domain it starts a Cadence SDK worker polling the scheduler task list. Each domain is covered by workerRedundancyFactor hosts simultaneously so that a single host failure does not cause a scheduling gap.
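Choosing workerRedundancyFactor hosts per domain, with every host reaching the same answer independently, can be illustrated with rendezvous (highest-random-weight) hashing. This is a swapped-in technique for illustration only: the actual manager relies on Cadence's membership hashring, and ownersForDomain and shouldRunWorker are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ownersForDomain picks `factor` hosts for a domain by rendezvous hashing.
// A stand-in for the membership hashring, not its actual algorithm.
func ownersForDomain(domain string, hosts []string, factor int) []string {
	type scored struct {
		host  string
		score uint64
	}
	scores := make([]scored, 0, len(hosts))
	for _, h := range hosts {
		f := fnv.New64a()
		f.Write([]byte(domain))
		f.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") hash differently
		f.Write([]byte(h))
		scores = append(scores, scored{host: h, score: f.Sum64()})
	}
	// Highest score wins; ties broken by name so input order never matters.
	sort.Slice(scores, func(i, j int) bool {
		if scores[i].score != scores[j].score {
			return scores[i].score > scores[j].score
		}
		return scores[i].host < scores[j].host
	})
	if factor < 0 {
		factor = 0
	}
	if factor > len(scores) {
		factor = len(scores)
	}
	owners := make([]string, factor)
	for i := range owners {
		owners[i] = scores[i].host
	}
	return owners
}

// shouldRunWorker reports whether self is one of the domain's owners.
func shouldRunWorker(domain, self string, hosts []string, factor int) bool {
	for _, h := range ownersForDomain(domain, hosts, factor) {
		if h == self {
			return true
		}
	}
	return false
}

func main() {
	hosts := []string{"host-a", "host-b", "host-c", "host-d"}
	fmt.Println(ownersForDomain("payments", hosts, 2))
	fmt.Println(shouldRunWorker("payments", "host-a", hosts, 2))
}
```

With rendezvous hashing, removing a host only reassigns the domains that host owned; every other domain keeps its owners, which limits worker churn when membership changes.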
func NewWorkerManager
func NewWorkerManager(params *BootstrapParams, enabledFn dynamicproperties.BoolPropertyFnWithDomainFilter) *WorkerManager
NewWorkerManager creates a new per-domain scheduler worker manager.
func (*WorkerManager) Start
func (m *WorkerManager) Start()
Start begins the background loop that manages per-domain workers.
func (*WorkerManager) Stop
func (m *WorkerManager) Stop()
Stop signals the background loop to stop and waits for it to finish. It then stops all active workers.