Documentation
¶
Index ¶
- Constants
- Variables
- func Provide(i *do.Injector)
- func RegisterResourceTypes(registry *resource.Registry)
- func ScheduledJobResourceIdentifier(id string) resource.Identifier
- type CreatePgBackRestBackupScheduleInput
- type DefaultWorkflowExecutor
- type Elector
- type LeaderStore
- func (s *LeaderStore) Create(item *StoredLeader) storage.PutOp[*StoredLeader]
- func (s *LeaderStore) GetByKey() storage.GetOp[*StoredLeader]
- func (s *LeaderStore) Key() string
- func (s *LeaderStore) Update(item *StoredLeader) storage.PutOp[*StoredLeader]
- func (s *LeaderStore) Watch() storage.WatchOp[*StoredLeader]
- type ScheduledJobResource
- func (r *ScheduledJobResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *ScheduledJobResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *ScheduledJobResource) Dependencies() []resource.Identifier
- func (r *ScheduledJobResource) DiffIgnore() []string
- func (r *ScheduledJobResource) Executor() resource.Executor
- func (r *ScheduledJobResource) Identifier() resource.Identifier
- func (r *ScheduledJobResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *ScheduledJobResource) ResourceVersion() string
- func (r *ScheduledJobResource) Update(ctx context.Context, rc *resource.Context) error
- type ScheduledJobRunner
- type ScheduledJobStore
- func (s *ScheduledJobStore) Delete(jobID string) storage.DeleteOp
- func (s *ScheduledJobStore) Get(jobID string) storage.GetOp[*StoredScheduledJob]
- func (s *ScheduledJobStore) GetAll() storage.GetMultipleOp[*StoredScheduledJob]
- func (s *ScheduledJobStore) Key(id string) string
- func (s *ScheduledJobStore) Prefix() string
- func (s *ScheduledJobStore) Put(job *StoredScheduledJob) storage.PutOp[*StoredScheduledJob]
- func (s *ScheduledJobStore) WatchJobs() storage.WatchOp[*StoredScheduledJob]
- type Service
- func (s *Service) DeleteJob(ctx context.Context, jobID string) error
- func (s *Service) Error() <-chan error
- func (s *Service) JobExists(jobID string) bool
- func (s *Service) ListScheduledJobs() []string
- func (s *Service) RegisterJob(ctx context.Context, job *StoredScheduledJob) error
- func (s *Service) Shutdown() error
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) UnregisterJob(jobID string)
- type StoredLeader
- type StoredScheduledJob
- type WorkflowExecutor
Constants ¶
View Source
const ( ScheduledJobPrefix = "scheduled_jobs" SchedulerLeaderPrefix = "scheduler_leader" JobStatusPending = "pending" JobStatusRunning = "running" JobStatusCompleted = "completed" JobStatusFailed = "failed" WorkflowCreatePgBackRestBackup = "CreatePgBackRestBackup" )
View Source
const ResourceTypeScheduledJob resource.Type = "scheduler.job"
Variables ¶
View Source
var ErrNonLeader = errors.New("the elector is not leader")
Functions ¶
func RegisterResourceTypes ¶
func ScheduledJobResourceIdentifier ¶
func ScheduledJobResourceIdentifier(id string) resource.Identifier
Types ¶
type CreatePgBackRestBackupScheduleInput ¶
type CreatePgBackRestBackupScheduleInput struct {
DatabaseID string `json:"database_id"`
NodeName string `json:"node_name"`
Type string `json:"type"`
}
Schedule input struct matching scheduled job args
type DefaultWorkflowExecutor ¶
type DefaultWorkflowExecutor struct {
// contains filtered or unexported fields
}
func NewDefaultWorkflowExecutor ¶
func NewDefaultWorkflowExecutor(wf *workflows.Service, dbSvc *database.Service) *DefaultWorkflowExecutor
type LeaderStore ¶
type LeaderStore struct {
// contains filtered or unexported fields
}
func NewLeaderStore ¶
func NewLeaderStore(client *clientv3.Client, root string) *LeaderStore
func (*LeaderStore) Create ¶
func (s *LeaderStore) Create(item *StoredLeader) storage.PutOp[*StoredLeader]
func (*LeaderStore) GetByKey ¶
func (s *LeaderStore) GetByKey() storage.GetOp[*StoredLeader]
func (*LeaderStore) Key ¶
func (s *LeaderStore) Key() string
func (*LeaderStore) Update ¶
func (s *LeaderStore) Update(item *StoredLeader) storage.PutOp[*StoredLeader]
func (*LeaderStore) Watch ¶
func (s *LeaderStore) Watch() storage.WatchOp[*StoredLeader]
type ScheduledJobResource ¶
type ScheduledJobResource struct {
ID string `json:"id"` // Unique job identifier
CronExpr string `json:"cron_expr"` // Cron expression for scheduling
Workflow string `json:"workflow"` // Name of the workflow to execute
Args map[string]interface{} `json:"args"` // Arguments to the workflow
DependsOn []resource.Identifier `json:"depends_on,omitempty"` // Optional resource dependencies
}
func NewScheduledJobResource ¶
func NewScheduledJobResource( id, cronExpr, workflow string, args map[string]interface{}, dependsOn []resource.Identifier, ) *ScheduledJobResource
func (*ScheduledJobResource) Dependencies ¶
func (r *ScheduledJobResource) Dependencies() []resource.Identifier
func (*ScheduledJobResource) DiffIgnore ¶
func (r *ScheduledJobResource) DiffIgnore() []string
func (*ScheduledJobResource) Executor ¶
func (r *ScheduledJobResource) Executor() resource.Executor
func (*ScheduledJobResource) Identifier ¶
func (r *ScheduledJobResource) Identifier() resource.Identifier
func (*ScheduledJobResource) ResourceVersion ¶
func (r *ScheduledJobResource) ResourceVersion() string
type ScheduledJobRunner ¶
type ScheduledJobRunner struct {
Job *StoredScheduledJob
Executor WorkflowExecutor
Logger zerolog.Logger
Store *ScheduledJobStore
}
func NewScheduledJobRunner ¶
func NewScheduledJobRunner( job *StoredScheduledJob, executor WorkflowExecutor, logger zerolog.Logger, store *ScheduledJobStore, ) (*ScheduledJobRunner, error)
func (*ScheduledJobRunner) Run ¶
func (r *ScheduledJobRunner) Run(ctx context.Context)
type ScheduledJobStore ¶
type ScheduledJobStore struct {
// contains filtered or unexported fields
}
func NewScheduledJobStore ¶
func NewScheduledJobStore(client *clientv3.Client, root string) *ScheduledJobStore
func (*ScheduledJobStore) Delete ¶
func (s *ScheduledJobStore) Delete(jobID string) storage.DeleteOp
func (*ScheduledJobStore) Get ¶
func (s *ScheduledJobStore) Get(jobID string) storage.GetOp[*StoredScheduledJob]
func (*ScheduledJobStore) GetAll ¶
func (s *ScheduledJobStore) GetAll() storage.GetMultipleOp[*StoredScheduledJob]
func (*ScheduledJobStore) Key ¶
func (s *ScheduledJobStore) Key(id string) string
func (*ScheduledJobStore) Prefix ¶
func (s *ScheduledJobStore) Prefix() string
func (*ScheduledJobStore) Put ¶
func (s *ScheduledJobStore) Put(job *StoredScheduledJob) storage.PutOp[*StoredScheduledJob]
func (*ScheduledJobStore) WatchJobs ¶
func (s *ScheduledJobStore) WatchJobs() storage.WatchOp[*StoredScheduledJob]
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService( logger zerolog.Logger, store *ScheduledJobStore, executor WorkflowExecutor, etcdClient *clientv3.Client, elector *Elector, ) *Service
NewService initializes a new scheduled job service with a scheduler and job store.
func (*Service) ListScheduledJobs ¶
func (*Service) RegisterJob ¶
func (s *Service) RegisterJob(ctx context.Context, job *StoredScheduledJob) error
func (*Service) UnregisterJob ¶
type StoredLeader ¶
type StoredLeader struct {
storage.StoredValue
HostID string
CreatedAt time.Time
}
type StoredScheduledJob ¶
Click to show internal directories.
Click to hide internal directories.