scheduler

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 42 Imported by: 0

Documentation

Overview

Package scheduler is a generated GoMock package.

Index

Constants

View Source
const (

	// Upper bound on how many times starting an individual buffered action should be retried.
	InvokerMaxStartAttempts = 10 // TODO - dial this up/remove it
)

Variables

View Source
var (
	CurrentTweakables = dynamicconfig.NewNamespaceTypedSetting(
		"scheduler.tweakables",
		DefaultTweakables,
		"A set of tweakable parameters for the CHASM scheduler.")

	RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
		"scheduler.retryPolicy.initialInterval",
		time.Second,
		`The initial backoff interval when retrying a failed task.`,
	)

	RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
		"scheduler.retryPolicy.maxInterval",
		time.Minute,
		`The maximum backoff interval when retrying a failed task.`,
	)

	ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
		"scheduler.serviceCallTimeout",
		2*time.Second,
		`The upper bound on how long a service call can take before being timed out.`,
	)

	DefaultTweakables = Tweakables{
		DefaultCatchupWindow:              365 * 24 * time.Hour,
		MinCatchupWindow:                  10 * time.Second,
		MaxBufferSize:                     1000,
		GeneratorBufferReserveSize:        50,
		CanceledTerminatedCountAsFailures: false,
		MaxActionsPerExecution:            5,
		IdleTime:                          7 * 24 * time.Hour,
	}
)
View Source
var (
	ErrConflictTokenMismatch = serviceerror.NewFailedPrecondition("mismatched conflict token")
	ErrClosed                = serviceerror.NewFailedPrecondition("schedule closed")
	ErrTooManyBackfillers    = serviceerror.NewFailedPrecondition("too many concurrent backfillers")
	ErrInvalidQuery          = serviceerror.NewInvalidArgument("missing or invalid query")
	ErrSentinel              = serviceerror.NewNotFound("schedule is a sentinel")
)

Functions

func Register

func Register(
	registry *chasm.Registry,
	library *Library,
) error

Types

type BackfillRequestType

type BackfillRequestType int
const (
	RequestTypeTrigger BackfillRequestType = iota
	RequestTypeBackfill
)

type Backfiller

The Backfiller component is responsible for buffering manually requested actions. Each backfill request has its own Backfiller node.

func (*Backfiller) LifecycleState

func (b *Backfiller) LifecycleState(ctx chasm.Context) chasm.LifecycleState

func (*Backfiller) RequestType

func (b *Backfiller) RequestType() BackfillRequestType

type BackfillerTaskExecutor

type BackfillerTaskExecutor struct {
	// contains filtered or unexported fields
}

func (*BackfillerTaskExecutor) Execute

func (*BackfillerTaskExecutor) Validate

func (b *BackfillerTaskExecutor) Validate(
	ctx chasm.Context,
	backfiller *Backfiller,
	attrs chasm.TaskAttributes,
	_ *schedulerpb.BackfillerTask,
) (bool, error)

type BackfillerTaskExecutorOptions

type BackfillerTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	BaseLogger     log.Logger
	SpecProcessor  SpecProcessor
}

type Config

type Config struct {
	Tweakables         dynamicconfig.TypedPropertyFnWithNamespaceFilter[Tweakables]
	ServiceCallTimeout dynamicconfig.DurationPropertyFn
	RetryPolicy        func() backoff.RetryPolicy
}

Config is the CHASM Scheduler dynamic config, shared among all sub-components.

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection) *Config

type Generator

The Generator component is responsible for buffering actions according to the schedule's specification. Manually requested actions (from an immediate request or backfill) are separately handled in the Backfiller component.

func NewGenerator

func NewGenerator(ctx chasm.MutableContext) *Generator

NewGenerator returns an initialized Generator component, which should be parented under a Scheduler root node.

func (*Generator) Generate

func (g *Generator) Generate(ctx chasm.MutableContext)

Generate immediately kicks off a new GeneratorTask. Used after updating the schedule specification.

func (*Generator) LifecycleState

func (g *Generator) LifecycleState(ctx chasm.Context) chasm.LifecycleState

func (*Generator) UpdateFutureActionTimes

func (g *Generator) UpdateFutureActionTimes(
	ctx chasm.Context,
	specBuilder *scheduler.SpecBuilder,
)

UpdateFutureActionTimes computes and stores the next scheduled action times.

type GeneratorTaskExecutor

type GeneratorTaskExecutor struct {
	SpecProcessor SpecProcessor
	// contains filtered or unexported fields
}

func (*GeneratorTaskExecutor) Execute

func (*GeneratorTaskExecutor) Validate

func (g *GeneratorTaskExecutor) Validate(
	ctx chasm.Context,
	generator *Generator,
	attrs chasm.TaskAttributes,
	_ *schedulerpb.GeneratorTask,
) (bool, error)

type GeneratorTaskExecutorOptions

type GeneratorTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	BaseLogger     log.Logger
	SpecProcessor  SpecProcessor
	SpecBuilder    *scheduler.SpecBuilder
}

type Invoker

The Invoker component is responsible for executing buffered actions.

func NewInvoker

func NewInvoker(ctx chasm.MutableContext) *Invoker

NewInvoker returns an initialized Invoker component, which should be parented under a Scheduler root component.

func (*Invoker) EnqueueBufferedStarts

func (i *Invoker) EnqueueBufferedStarts(ctx chasm.MutableContext, starts []*schedulespb.BufferedStart)

EnqueueBufferedStarts adds new BufferedStarts to the invocation queue, immediately kicking off a processing task.

func (*Invoker) LifecycleState

func (i *Invoker) LifecycleState(ctx chasm.Context) chasm.LifecycleState

type InvokerExecuteTaskExecutor

type InvokerExecuteTaskExecutor struct {
	// contains filtered or unexported fields
}

func (*InvokerExecuteTaskExecutor) Execute

func (*InvokerExecuteTaskExecutor) Validate

type InvokerProcessBufferTaskExecutor

type InvokerProcessBufferTaskExecutor struct {
	// contains filtered or unexported fields
}

func (*InvokerProcessBufferTaskExecutor) Execute

func (*InvokerProcessBufferTaskExecutor) Validate

type InvokerTaskExecutorOptions

type InvokerTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	BaseLogger     log.Logger
	SpecProcessor  SpecProcessor

	HistoryClient resource.HistoryClient

	// FrontendClient is used specifically for StartWorkflow calls, to ensure that
	// the request makes it through metering's interceptor. Because we don't charge for
	// terminates/cancels, we can go directly to history for other service calls.
	FrontendClient workflowservice.WorkflowServiceClient
}

type Library

type Library struct {
	chasm.UnimplementedLibrary

	SchedulerIdleTaskExecutor        *SchedulerIdleTaskExecutor
	GeneratorTaskExecutor            *GeneratorTaskExecutor
	InvokerExecuteTaskExecutor       *InvokerExecuteTaskExecutor
	InvokerProcessBufferTaskExecutor *InvokerProcessBufferTaskExecutor
	BackfillerTaskExecutor           *BackfillerTaskExecutor
	// contains filtered or unexported fields
}

func NewLibrary

func NewLibrary(
	handler *handler,
	SchedulerIdleTaskExecutor *SchedulerIdleTaskExecutor,
	GeneratorTaskExecutor *GeneratorTaskExecutor,
	InvokerExecuteTaskExecutor *InvokerExecuteTaskExecutor,
	InvokerProcessBufferTaskExecutor *InvokerProcessBufferTaskExecutor,
	BackfillerTaskExecutor *BackfillerTaskExecutor,
) *Library

func (*Library) Components

func (l *Library) Components() []*chasm.RegistrableComponent

func (*Library) Name

func (l *Library) Name() string

func (*Library) RegisterServices

func (l *Library) RegisterServices(server *grpc.Server)

func (*Library) Tasks

func (l *Library) Tasks() []*chasm.RegistrableTask

type MockSpecProcessor

type MockSpecProcessor struct {
	// contains filtered or unexported fields
}

MockSpecProcessor is a mock of SpecProcessor interface.

func NewMockSpecProcessor

func NewMockSpecProcessor(ctrl *gomock.Controller) *MockSpecProcessor

NewMockSpecProcessor creates a new mock instance.

func (*MockSpecProcessor) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSpecProcessor) NextTime

func (m *MockSpecProcessor) NextTime(arg0 *Scheduler, after time.Time) (scheduler.GetNextTimeResult, error)

NextTime mocks base method.

func (*MockSpecProcessor) ProcessTimeRange

func (m *MockSpecProcessor) ProcessTimeRange(arg0 *Scheduler, start, end time.Time, overlapPolicy enums.ScheduleOverlapPolicy, workflowID, backfillID string, manual bool, limit *int) (*ProcessedTimeRange, error)

ProcessTimeRange mocks base method.

type MockSpecProcessorMockRecorder

type MockSpecProcessorMockRecorder struct {
	// contains filtered or unexported fields
}

MockSpecProcessorMockRecorder is the mock recorder for MockSpecProcessor.

func (*MockSpecProcessorMockRecorder) NextTime

func (mr *MockSpecProcessorMockRecorder) NextTime(arg0, after any) *gomock.Call

NextTime indicates an expected call of NextTime.

func (*MockSpecProcessorMockRecorder) ProcessTimeRange

func (mr *MockSpecProcessorMockRecorder) ProcessTimeRange(arg0, start, end, overlapPolicy, workflowID, backfillID, manual, limit any) *gomock.Call

ProcessTimeRange indicates an expected call of ProcessTimeRange.

type ProcessedTimeRange

type ProcessedTimeRange struct {
	NextWakeupTime time.Time
	LastActionTime time.Time
	BufferedStarts []*schedulespb.BufferedStart
	// DroppedCount is the number of actions that would have been buffered but
	// were dropped due to the limit being reached. Only populated when a limit
	// is provided.
	DroppedCount int64
}

type Scheduler

type Scheduler struct {
	chasm.UnimplementedComponent

	// Persisted internal state, consisting of state relevant to all components in
	// the scheduler tree.
	*schedulerpb.SchedulerState

	// Last success/failure payloads, stored on this separate data node
	// to minimize write traffic.
	LastCompletionResult chasm.Field[*schedulerpb.LastCompletionResult]

	Generator   chasm.Field[*Generator]
	Invoker     chasm.Field[*Invoker]
	Backfillers chasm.Map[string, *Backfiller] // Backfill ID => *Backfiller

	Visibility chasm.Field[*chasm.Visibility]
	// contains filtered or unexported fields
}

Scheduler is the root component of a CHASM scheduler tree. The rest of the tree will consist of 2 or more sub-components:

  - Generator: buffers actions according to the schedule specification
  - Invoker: executes buffered actions
  - Backfiller: buffers actions according to requested backfills

func CreateScheduler

func CreateScheduler(
	ctx chasm.MutableContext,
	req *schedulerpb.CreateScheduleRequest,
) (*Scheduler, error)

CreateScheduler initializes a new Scheduler for CreateSchedule requests.

func CreateSchedulerFromMigration

func CreateSchedulerFromMigration(
	ctx chasm.MutableContext,
	req *schedulerpb.CreateFromMigrationStateRequest,
) (*Scheduler, error)

CreateSchedulerFromMigration initializes a CHASM scheduler from migrated V1 state. Unlike CreateScheduler, this preserves the conflict token and other state from V1.

The migrated state components (scheduler, generator, invoker, backfillers) are directly initialized from the request, preserving all state including the conflict token for client compatibility.

func NewScheduler

func NewScheduler(
	ctx chasm.MutableContext,
	namespace, namespaceID, scheduleID string,
	input *schedulepb.Schedule,
	patch *schedulepb.SchedulePatch,
) (*Scheduler, error)

NewScheduler returns an initialized CHASM scheduler root component.

func NewSentinel

func NewSentinel(
	ctx chasm.MutableContext,
	namespace, namespaceID, scheduleID string,
) *Scheduler

NewSentinel returns a sentinel CHASM scheduler that exists only to reserve the schedule ID. Sentinels have no sub-components and return NotFound on all API operations.

func (*Scheduler) Delete

Delete marks the Scheduler as closed without an idle timer.

func (*Scheduler) Describe

Describe returns the current state of the Scheduler for DescribeSchedule requests.

func (*Scheduler) HandleNexusCompletion

func (s *Scheduler) HandleNexusCompletion(
	ctx chasm.MutableContext,
	info *persistencespb.ChasmNexusCompletion,
) error

HandleNexusCompletion allows Scheduler to record workflow completions from workflows started by the same scheduler tree's Invoker.

func (*Scheduler) IsSentinel

func (s *Scheduler) IsSentinel() bool

IsSentinel returns true if this is a sentinel scheduler.

func (*Scheduler) LifecycleState

func (s *Scheduler) LifecycleState(ctx chasm.Context) chasm.LifecycleState

LifecycleState implements the chasm.Component interface.

func (*Scheduler) ListInfo

func (s *Scheduler) ListInfo(
	ctx chasm.Context,
) *schedulepb.ScheduleListInfo

ListInfo returns the ScheduleListInfo, used as the visibility memo, and to answer List queries.

func (*Scheduler) ListMatchingTimes

ListMatchingTimes returns the upcoming times that the schedule will trigger within the given time range.

func (*Scheduler) Memo

func (s *Scheduler) Memo(
	ctx chasm.Context,
) proto.Message

Memo returns the scheduler's info block for visibility.

func (*Scheduler) NewImmediateBackfiller

func (s *Scheduler) NewImmediateBackfiller(
	ctx chasm.MutableContext,
	request *schedulepb.TriggerImmediatelyRequest,
) *Backfiller

NewImmediateBackfiller returns an initialized Backfiller component, which should be parented under a Scheduler root node.

func (*Scheduler) NewRangeBackfiller

func (s *Scheduler) NewRangeBackfiller(
	ctx chasm.MutableContext,
	request *schedulepb.BackfillRequest,
) *Backfiller

NewRangeBackfiller returns an initialized Backfiller component, which should be parented under a Scheduler root node.

func (*Scheduler) Patch

Patch applies a patch to the schedule for PatchSchedule requests.

func (*Scheduler) SearchAttributes

func (s *Scheduler) SearchAttributes(chasm.Context) []chasm.SearchAttributeKeyValue

SearchAttributes returns the Temporal-managed key values for visibility.

func (*Scheduler) Terminate

Terminate implements the chasm.RootComponent interface.

func (*Scheduler) Update

Update replaces the schedule with a new one for UpdateSchedule requests.

func (*Scheduler) WorkflowID

func (s *Scheduler) WorkflowID() string

WorkflowID returns the Workflow ID given as part of the request spec. During start generation, nominal time is suffixed to this ID.

type SchedulerIdleTaskExecutor

type SchedulerIdleTaskExecutor struct {
	// contains filtered or unexported fields
}

func (*SchedulerIdleTaskExecutor) Execute

func (*SchedulerIdleTaskExecutor) Validate

func (r *SchedulerIdleTaskExecutor) Validate(
	ctx chasm.Context,
	scheduler *Scheduler,
	taskAttrs chasm.TaskAttributes,
	task *schedulerpb.SchedulerIdleTask,
) (bool, error)

type SchedulerIdleTaskExecutorOptions

type SchedulerIdleTaskExecutorOptions struct {
	fx.In

	Config *Config
}

type SpecProcessor

type SpecProcessor interface {
	// ProcessTimeRange generates buffered actions according to the schedule spec for
	// the given time range.
	//
	// The parameter manual is propagated to the returned BufferedStarts. When the limit
	// is set to a non-nil pointer, it will be decremented for each buffered start, and
	// the function will return early should limit reach 0.
	//
	// If backfillID is set, it will be used to generate request IDs.
	ProcessTimeRange(
		scheduler *Scheduler,
		start, end time.Time,
		overlapPolicy enumspb.ScheduleOverlapPolicy,
		workflowID string,
		backfillID string,
		manual bool,
		limit *int,
	) (*ProcessedTimeRange, error)

	// NextTime provides a peek at the next time in the spec following 'after'.
	NextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error)
}

SpecProcessor is used by the Generator and Backfiller to generate buffered actions according to the schedule spec.

type SpecProcessorImpl

type SpecProcessorImpl struct {
	// contains filtered or unexported fields
}

func NewSpecProcessor

func NewSpecProcessor(
	config *Config,
	metricsHandler metrics.Handler,
	logger log.Logger,
	specBuilder *legacyscheduler.SpecBuilder,
) *SpecProcessorImpl

func (*SpecProcessorImpl) NextTime

func (s *SpecProcessorImpl) NextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error)

NextTime returns the next time result, or an error if the schedule cannot be compiled.

func (*SpecProcessorImpl) ProcessTimeRange

func (s *SpecProcessorImpl) ProcessTimeRange(
	scheduler *Scheduler,
	start, end time.Time,
	overlapPolicy enumspb.ScheduleOverlapPolicy,
	workflowID string,
	backfillID string,
	manual bool,
	limit *int,
) (*ProcessedTimeRange, error)

type Tweakables

type Tweakables struct {
	DefaultCatchupWindow              time.Duration // Default for catchup window
	MinCatchupWindow                  time.Duration // Minimum for catchup window
	MaxBufferSize                     int           // MaxBufferSize limits the number of buffered actions pending execution in total
	GeneratorBufferReserveSize        int           // Minimum number of spaces in `BufferedStarts` reserved for automated actions.
	CanceledTerminatedCountAsFailures bool          // Whether cancelled+terminated count for pause-on-failure
	MaxActionsPerExecution            int           // Limits the number of actions (startWorkflow, terminate/cancel) taken by ExecuteTask in a single iteration
	IdleTime                          time.Duration // How long to keep schedules after they're done
}

Directories

Path Synopsis
gen
schedulerpb/v1
Code generated by protoc-gen-go-helpers.
Code generated by protoc-gen-go-helpers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL