scheduler

package
v0.6.2 (latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2025 | License: PostgreSQL | Imports: 17 | Imported by: 0

Documentation

Index

Constants

View Source
// String constants exported by the scheduler package.
const (
	// ScheduledJobPrefix and SchedulerLeaderPrefix are named as prefixes.
	// NOTE(review): presumably the storage key prefixes used by
	// ScheduledJobStore and LeaderStore respectively — confirm against the
	// store implementations.
	ScheduledJobPrefix    = "scheduled_jobs"
	SchedulerLeaderPrefix = "scheduler_leader"

	// JobStatus* are the job status strings: "pending", "running",
	// "completed", and "failed".
	JobStatusPending   = "pending"
	JobStatusRunning   = "running"
	JobStatusCompleted = "completed"
	JobStatusFailed    = "failed"

	// WorkflowCreatePgBackRestBackup is the workflow name
	// "CreatePgBackRestBackup".
	// NOTE(review): presumably the value expected in a job's Workflow field
	// and by WorkflowExecutor.Execute's workflowName argument — confirm.
	WorkflowCreatePgBackRestBackup = "CreatePgBackRestBackup"
)
View Source
// ResourceTypeScheduledJob is the resource.Type value ("scheduler.job") for
// scheduled job resources.
// NOTE(review): presumably the type carried by identifiers returned from
// ScheduledJobResourceIdentifier — confirm.
const ResourceTypeScheduledJob resource.Type = "scheduler.job"

Variables

View Source
// ErrNonLeader is a sentinel error with the message
// "the elector is not leader".
// NOTE(review): presumably returned by Elector.IsLeader when this host does
// not hold leadership — confirm against the Elector implementation.
var ErrNonLeader = errors.New("the elector is not leader")

Functions

func Provide

func Provide(i *do.Injector)

func RegisterResourceTypes

func RegisterResourceTypes(registry *resource.Registry)

func ScheduledJobResourceIdentifier

func ScheduledJobResourceIdentifier(id string) resource.Identifier

Types

type CreatePgBackRestBackupScheduleInput

// CreatePgBackRestBackupScheduleInput is the JSON-serializable input for a
// scheduled pgBackRest backup. Its JSON keys are "database_id", "node_name",
// and "type".
type CreatePgBackRestBackupScheduleInput struct {
	// DatabaseID is serialized as "database_id".
	DatabaseID string `json:"database_id"`
	// NodeName is serialized as "node_name".
	NodeName   string `json:"node_name"`
	// Type is serialized as "type".
	// NOTE(review): presumably the pgBackRest backup type — confirm the
	// accepted values against the workflow that consumes this input.
	Type       string `json:"type"`
}

CreatePgBackRestBackupScheduleInput is the schedule input struct; its fields match the args of the scheduled job.

type DefaultWorkflowExecutor

// DefaultWorkflowExecutor is created by NewDefaultWorkflowExecutor from a
// *workflows.Service and a *database.Service. Its Execute method has the same
// signature as WorkflowExecutor.Execute.
type DefaultWorkflowExecutor struct {
	// contains filtered or unexported fields
}

func NewDefaultWorkflowExecutor

func NewDefaultWorkflowExecutor(wf *workflows.Service, dbSvc *database.Service) *DefaultWorkflowExecutor

func (*DefaultWorkflowExecutor) Execute

func (e *DefaultWorkflowExecutor) Execute(ctx context.Context, workflowName string, args map[string]interface{}) error

type Elector

// Elector is created by NewElector from a host ID, a *LeaderStore, a logger,
// and a TTL. It exposes Start, Shutdown, IsLeader, and an Error channel.
// NOTE(review): presumably performs leader election between hosts via the
// LeaderStore, with IsLeader returning ErrNonLeader on non-leaders — confirm.
type Elector struct {
	// contains filtered or unexported fields
}

func NewElector

func NewElector(
	hostID string,
	store *LeaderStore,
	logger zerolog.Logger,
	ttl time.Duration,
) *Elector

func (*Elector) Error

func (e *Elector) Error() <-chan error

func (*Elector) IsLeader

func (e *Elector) IsLeader(_ context.Context) error

func (*Elector) Shutdown

func (e *Elector) Shutdown() error

func (*Elector) Start

func (e *Elector) Start(ctx context.Context) error

type LeaderStore

// LeaderStore is created by NewLeaderStore from a *clientv3.Client and a root
// string. It builds storage operations over *StoredLeader values (Create,
// Update, GetByKey, Watch) and reports its key via Key.
type LeaderStore struct {
	// contains filtered or unexported fields
}

func NewLeaderStore

func NewLeaderStore(client *clientv3.Client, root string) *LeaderStore

func (*LeaderStore) Create

func (s *LeaderStore) Create(item *StoredLeader) storage.PutOp[*StoredLeader]

func (*LeaderStore) GetByKey

func (s *LeaderStore) GetByKey() storage.GetOp[*StoredLeader]

func (*LeaderStore) Key

func (s *LeaderStore) Key() string

func (*LeaderStore) Update

func (s *LeaderStore) Update(item *StoredLeader) storage.PutOp[*StoredLeader]

func (*LeaderStore) Watch

func (s *LeaderStore) Watch() storage.WatchOp[*StoredLeader]

type ScheduledJobResource

// ScheduledJobResource describes a scheduled job as a resource: a unique ID,
// the cron expression that schedules it, the workflow to execute with its
// arguments, and optional dependencies on other resources.
type ScheduledJobResource struct {
	ID        string                 `json:"id"`                   // Unique job identifier
	CronExpr  string                 `json:"cron_expr"`            // Cron expression for scheduling
	Workflow  string                 `json:"workflow"`             // Name of the workflow to execute
	Args      map[string]interface{} `json:"args"`                 // Arguments to the workflow
	DependsOn []resource.Identifier  `json:"depends_on,omitempty"` // Optional resource dependencies
}

func NewScheduledJobResource

func NewScheduledJobResource(
	id, cronExpr, workflow string,
	args map[string]interface{},
	dependsOn []resource.Identifier,
) *ScheduledJobResource

func (*ScheduledJobResource) Create

func (*ScheduledJobResource) Delete

func (*ScheduledJobResource) Dependencies

func (r *ScheduledJobResource) Dependencies() []resource.Identifier

func (*ScheduledJobResource) DiffIgnore

func (r *ScheduledJobResource) DiffIgnore() []string

func (*ScheduledJobResource) Executor

func (r *ScheduledJobResource) Executor() resource.Executor

func (*ScheduledJobResource) Identifier

func (r *ScheduledJobResource) Identifier() resource.Identifier

func (*ScheduledJobResource) Refresh

func (*ScheduledJobResource) ResourceVersion

func (r *ScheduledJobResource) ResourceVersion() string

func (*ScheduledJobResource) Update

type ScheduledJobRunner

// ScheduledJobRunner bundles a stored job with the executor, logger, and job
// store. It is created by NewScheduledJobRunner and has a Run(ctx) method.
// NOTE(review): presumably Run executes Job.Workflow through Executor and
// records the outcome in Store — confirm against the implementation.
type ScheduledJobRunner struct {
	// Job is the stored job definition this runner is bound to.
	Job      *StoredScheduledJob
	// Executor is the WorkflowExecutor supplied at construction.
	Executor WorkflowExecutor
	// Logger is the zerolog logger supplied at construction.
	Logger   zerolog.Logger
	// Store is the ScheduledJobStore supplied at construction.
	Store    *ScheduledJobStore
}

func NewScheduledJobRunner

func NewScheduledJobRunner(
	job *StoredScheduledJob,
	executor WorkflowExecutor,
	logger zerolog.Logger,
	store *ScheduledJobStore,
) (*ScheduledJobRunner, error)

func (*ScheduledJobRunner) Run

func (r *ScheduledJobRunner) Run(ctx context.Context)

type ScheduledJobStore

// ScheduledJobStore is created by NewScheduledJobStore from a
// *clientv3.Client and a root string. It exposes Key and Prefix helpers and
// the Get, GetAll, Put, Delete, and WatchJobs operations.
type ScheduledJobStore struct {
	// contains filtered or unexported fields
}

func NewScheduledJobStore

func NewScheduledJobStore(client *clientv3.Client, root string) *ScheduledJobStore

func (*ScheduledJobStore) Delete

func (s *ScheduledJobStore) Delete(jobID string) storage.DeleteOp

func (*ScheduledJobStore) Get

func (*ScheduledJobStore) GetAll

func (*ScheduledJobStore) Key

func (s *ScheduledJobStore) Key(id string) string

func (*ScheduledJobStore) Prefix

func (s *ScheduledJobStore) Prefix() string

func (*ScheduledJobStore) Put

func (*ScheduledJobStore) WatchJobs

type Service

// Service is the scheduled job service created by NewService from a logger, a
// *ScheduledJobStore, a WorkflowExecutor, an etcd client, and an *Elector. It
// exposes RegisterJob, UnregisterJob, DeleteJob, JobExists,
// ListScheduledJobs, Start, Shutdown, and an Error channel.
type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	logger zerolog.Logger,
	store *ScheduledJobStore,
	executor WorkflowExecutor,
	etcdClient *clientv3.Client,
	elector *Elector,
) *Service

NewService initializes a new scheduled job service with a scheduler and job store.

func (*Service) DeleteJob

func (s *Service) DeleteJob(ctx context.Context, jobID string) error

func (*Service) Error

func (s *Service) Error() <-chan error

func (*Service) JobExists

func (s *Service) JobExists(jobID string) bool

func (*Service) ListScheduledJobs

func (s *Service) ListScheduledJobs() []string

func (*Service) RegisterJob

func (s *Service) RegisterJob(ctx context.Context, job *StoredScheduledJob) error

func (*Service) Shutdown

func (s *Service) Shutdown() error

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

func (*Service) UnregisterJob

func (s *Service) UnregisterJob(jobID string)

type StoredLeader

// StoredLeader is the value type read and written by LeaderStore operations.
// It embeds storage.StoredValue.
type StoredLeader struct {
	storage.StoredValue
	// HostID is a host identifier.
	// NOTE(review): presumably the host currently holding leadership — confirm.
	HostID    string
	// CreatedAt is a timestamp.
	// NOTE(review): presumably when this leader record was created — confirm.
	CreatedAt time.Time
}

type StoredScheduledJob

// StoredScheduledJob is the job definition accepted by Service.RegisterJob
// and NewScheduledJobRunner. It embeds storage.StoredValue and serializes its
// fields under the JSON keys "id", "cron_expr", "workflow", and "args_json".
type StoredScheduledJob struct {
	storage.StoredValue
	// ID is the job identifier, serialized as "id".
	ID       string         `json:"id"`
	// CronExpr is the cron expression, serialized as "cron_expr".
	CronExpr string         `json:"cron_expr"`
	// Workflow is the workflow name, serialized as "workflow".
	Workflow string         `json:"workflow"`
	// ArgsJSON is a map (not a raw JSON string) serialized as "args_json".
	// NOTE(review): presumably the arguments passed to the workflow — confirm.
	ArgsJSON map[string]any `json:"args_json"`
}

type WorkflowExecutor

// WorkflowExecutor is the single-method interface taken by NewService and
// NewScheduledJobRunner. *DefaultWorkflowExecutor has an Execute method with
// this signature.
type WorkflowExecutor interface {
	// Execute takes a context, a workflow name, and an argument map, and
	// returns an error.
	Execute(ctx context.Context, workflowName string, args map[string]interface{}) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL