workflows

package
v0.5.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2025 License: PostgreSQL Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrDuplicateWorkflow is a sentinel error whose message is
// "duplicate workflow already in progress". Callers should compare against it
// with errors.Is rather than matching the message text.
// NOTE(review): the code paths that return it are not visible here —
// presumably the Service methods that start workflows; confirm.
var ErrDuplicateWorkflow = errors.New("duplicate workflow already in progress")

Functions

func Provide

func Provide(i *do.Injector)

Types

type CreatePgBackRestBackupInput

// CreatePgBackRestBackupInput is the input accepted by
// Workflows.CreatePgBackRestBackup. Apart from TaskID, its fields correspond
// one-to-one to the parameters of Service.CreatePgBackRestBackup (databaseID,
// nodeName, backupFromStandby, instances, backupOptions). Every field has an
// explicit snake_case JSON key.
type CreatePgBackRestBackupInput struct {
	DatabaseID        string                    `json:"database_id"`
	TaskID            uuid.UUID                 `json:"task_id"`
	NodeName          string                    `json:"node_name"`
	BackupFromStandby bool                      `json:"backup_from_standby"`
	Instances         []*InstanceHost           `json:"instances"`
	BackupOptions     *pgbackrest.BackupOptions `json:"backup_options"`
}

type CreatePgBackRestBackupOutput

// CreatePgBackRestBackupOutput is the result type of
// Workflows.CreatePgBackRestBackup. It is an empty struct and carries no data.
type CreatePgBackRestBackupOutput struct{}

type DeleteDatabaseInput

// DeleteDatabaseInput is the input accepted by Workflows.DeleteDatabase.
// It carries a database ID and a task ID, serialized as "database_id" and
// "task_id".
type DeleteDatabaseInput struct {
	DatabaseID string    `json:"database_id"`
	TaskID     uuid.UUID `json:"task_id"`
}

type DeleteDatabaseOutput

// DeleteDatabaseOutput is the result type of Workflows.DeleteDatabase.
// It is an empty struct and carries no data.
type DeleteDatabaseOutput struct{}

type FailoverInput

// FailoverInput is the input accepted by both Service.FailoverDatabaseNode and
// Workflows.Failover.
//
// NOTE(review): unlike most input types in this package, no json tags are
// declared on these fields — confirm this is intentional.
// NOTE(review): Instances uses activities.InstanceHost, whereas
// CreatePgBackRestBackupInput uses this package's own InstanceHost — confirm
// the two are meant to differ.
type FailoverInput struct {
	DatabaseID          string
	NodeName            string
	Instances           []*activities.InstanceHost
	CandidateInstanceID string
	SkipValidation      bool
	TaskID              uuid.UUID
}

type FailoverOutput

// FailoverOutput is the result type of Workflows.Failover.
// It is an empty struct and carries no data.
type FailoverOutput struct{}

type InstanceHost

// InstanceHost pairs an instance ID with a host ID. It is the element type of
// CreatePgBackRestBackupInput.Instances and of the instances parameter of
// Service.CreatePgBackRestBackup.
type InstanceHost struct {
	InstanceID string `json:"instance_id"`
	HostID     string `json:"host_id"`
}

type Orchestrator

// Orchestrator is the dependency that NewWorker takes alongside the backend
// and the Workflows value.
type Orchestrator interface {
	// WorkerQueues returns a slice of workflow queues, or an error.
	// NOTE(review): presumably these are the queues the Worker listens on —
	// confirm against NewWorker's implementation.
	WorkerQueues() ([]workflow.Queue, error)
}

type PgBackRestRestoreInput

// PgBackRestRestoreInput is the input accepted by Workflows.PgBackRestRestore.
// Spec, TargetNodes and RestoreConfig correspond to the parameters of
// Service.PgBackRestRestore; TaskID and NodeTaskIDs are additional.
//
// NOTE(review): NodeTaskIDs is serialized under the key "node_tasks_ids"
// (plural "tasks"). PlanRestoreInput uses the same key, so the spelling is at
// least consistent; changing it would alter the serialized form.
// NOTE(review): presumably NodeTaskIDs is keyed by node name — confirm.
type PgBackRestRestoreInput struct {
	TaskID        uuid.UUID               `json:"task_id"`
	Spec          *database.Spec          `json:"spec"`
	TargetNodes   []string                `json:"target_nodes"`
	RestoreConfig *database.RestoreConfig `json:"restore_config"`
	NodeTaskIDs   map[string]uuid.UUID    `json:"node_tasks_ids"`
}

type PgBackRestRestoreOutput

// PgBackRestRestoreOutput is the result type of Workflows.PgBackRestRestore.
// It is an empty struct and carries no data.
type PgBackRestRestoreOutput struct{}

type PlanRestoreInput

// PlanRestoreInput is the input accepted by Workflows.PlanRestore and
// Workflows.ExecutePlanRestore. It shares Spec, RestoreConfig and NodeTaskIDs
// with PgBackRestRestoreInput and adds Current, a *resource.State.
//
// NOTE(review): NodeTaskIDs is serialized under the key "node_tasks_ids"
// (plural "tasks"), matching PgBackRestRestoreInput; changing it would alter
// the serialized form.
type PlanRestoreInput struct {
	Spec          *database.Spec          `json:"spec"`
	Current       *resource.State         `json:"current"`
	RestoreConfig *database.RestoreConfig `json:"restore_config"`
	NodeTaskIDs   map[string]uuid.UUID    `json:"node_tasks_ids"`
}

type PlanRestoreOutput

// PlanRestoreOutput is the result type of Workflows.PlanRestore and the value
// type of the future returned by Workflows.ExecutePlanRestore. It holds a
// slice of resource.Plan values serialized under "plans".
type PlanRestoreOutput struct {
	Plans []resource.Plan `json:"plans"`
}

type PlanUpdateInput

// PlanUpdateInput is the input accepted by Workflows.PlanUpdate and
// Workflows.ExecutePlanUpdate. Options is held by value; Spec and Current are
// pointers.
type PlanUpdateInput struct {
	Options operations.UpdateDatabaseOptions `json:"options"`
	Spec    *database.Spec                   `json:"spec"`
	Current *resource.State                  `json:"current"`
}

type PlanUpdateOutput

// PlanUpdateOutput is the result type of Workflows.PlanUpdate and the value
// type of the future returned by Workflows.ExecutePlanUpdate. It holds a slice
// of resource.Plan values serialized under "plans".
type PlanUpdateOutput struct {
	Plans []resource.Plan `json:"plans"`
}

type RefreshCurrentStateInput

// RefreshCurrentStateInput is the input accepted by
// Workflows.RefreshCurrentState and Workflows.ExecuteRefreshCurrentState.
// It has the same two fields and JSON keys as DeleteDatabaseInput.
type RefreshCurrentStateInput struct {
	DatabaseID string    `json:"database_id"`
	TaskID     uuid.UUID `json:"task_id"`
}

type RefreshCurrentStateOutput

// RefreshCurrentStateOutput is the result type of
// Workflows.RefreshCurrentState and the value type of the future returned by
// Workflows.ExecuteRefreshCurrentState. It holds a *resource.State serialized
// under "state".
type RefreshCurrentStateOutput struct {
	State *resource.State `json:"state"`
}

type RestartInstanceInput

// RestartInstanceInput is the input accepted by both Service.RestartInstance
// and Workflows.RestartInstance. All fields carry explicit snake_case JSON
// keys.
type RestartInstanceInput struct {
	HostID      string    `json:"host_id"`
	DatabaseID  string    `json:"database_id"`
	InstanceID  string    `json:"instance_id"`
	TaskID      uuid.UUID `json:"task_id"`
	ScheduledAt time.Time `json:"scheduled_at"` // Optional, if empty, restart immediately
}

type RestartInstanceOutput

// RestartInstanceOutput is the result type of Workflows.RestartInstance.
// It is an empty struct and carries no data.
type RestartInstanceOutput struct{}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	cfg config.Config,
	client *client.Client,
	taskSvc *task.Service,
	workflows *Workflows,
) *Service

func (*Service) CancelDatabaseTask

func (s *Service) CancelDatabaseTask(ctx context.Context, DatabaseID string, taskID uuid.UUID) (*task.Task, error)

func (*Service) CreateDatabase

func (s *Service) CreateDatabase(ctx context.Context, spec *database.Spec) (*task.Task, error)

func (*Service) CreatePgBackRestBackup

func (s *Service) CreatePgBackRestBackup(
	ctx context.Context,
	databaseID string,
	nodeName string,
	backupFromStandby bool,
	instances []*InstanceHost,
	backupOptions *pgbackrest.BackupOptions,
) (*task.Task, error)

func (*Service) DeleteDatabase

func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) (*task.Task, error)

func (*Service) FailoverDatabaseNode

func (s *Service) FailoverDatabaseNode(ctx context.Context, input *FailoverInput) (*task.Task, error)

func (*Service) PgBackRestRestore

func (s *Service) PgBackRestRestore(
	ctx context.Context,
	spec *database.Spec,
	targetNodes []string,
	restoreConfig *database.RestoreConfig,
) (*task.Task, []*task.Task, error)

func (*Service) RestartInstance

func (s *Service) RestartInstance(ctx context.Context, input *RestartInstanceInput) (*task.Task, error)

func (*Service) StartInstance

func (s *Service) StartInstance(ctx context.Context, input *StartInstanceInput) (*task.Task, error)

func (*Service) StopInstance

func (s *Service) StopInstance(ctx context.Context, input *StopInstanceInput) (*task.Task, error)

func (*Service) SwitchoverDatabaseNode

func (s *Service) SwitchoverDatabaseNode(ctx context.Context, input *SwitchoverInput) (*task.Task, error)

func (*Service) UpdateDatabase

func (s *Service) UpdateDatabase(ctx context.Context, spec *database.Spec, forceUpdate bool) (*task.Task, error)

func (*Service) ValidateSpec

func (s *Service) ValidateSpec(ctx context.Context, input *ValidateSpecInput) (*ValidateSpecOutput, error)

type StartInstanceInput

// StartInstanceInput is the input accepted by both Service.StartInstance and
// Workflows.StartInstance. Its field set is identical to StopInstanceInput.
//
// NOTE(review): no json tags are declared here, unlike RestartInstanceInput —
// confirm this is intentional.
type StartInstanceInput struct {
	DatabaseID string
	InstanceID string
	HostID     string
	Cohort     *host.Cohort
	TaskID     uuid.UUID
}

type StartInstanceOutput

// StartInstanceOutput is the result type of Workflows.StartInstance.
// It is an empty struct and carries no data.
type StartInstanceOutput struct{}

type StopInstanceInput

// StopInstanceInput is the input accepted by both Service.StopInstance and
// Workflows.StopInstance. Its field set is identical to StartInstanceInput.
//
// NOTE(review): no json tags are declared here, unlike RestartInstanceInput —
// confirm this is intentional.
type StopInstanceInput struct {
	DatabaseID string
	InstanceID string
	HostID     string
	Cohort     *host.Cohort
	TaskID     uuid.UUID
}

type StopInstanceOutput

// StopInstanceOutput is the result type of Workflows.StopInstance.
// It is an empty struct and carries no data.
type StopInstanceOutput struct{}

type SwitchoverInput

// SwitchoverInput is the input accepted by both Service.SwitchoverDatabaseNode
// and Workflows.Switchover. It has the same fields as FailoverInput except
// that it carries ScheduledAt (a time.Time) in place of SkipValidation.
//
// NOTE(review): no json tags are declared on these fields — confirm this is
// intentional.
// NOTE(review): ScheduledAt presumably behaves like
// RestartInstanceInput.ScheduledAt (optional; immediate when unset) — confirm.
type SwitchoverInput struct {
	DatabaseID          string
	NodeName            string
	Instances           []*activities.InstanceHost
	CandidateInstanceID string
	ScheduledAt         time.Time
	TaskID              uuid.UUID
}

type SwitchoverOutput

// SwitchoverOutput is the result type of Workflows.Switchover.
// It is an empty struct and carries no data.
type SwitchoverOutput struct{}

type UpdateDatabaseInput

// UpdateDatabaseInput is the input accepted by Workflows.UpdateDatabase.
// Spec and ForceUpdate correspond to the spec and forceUpdate parameters of
// Service.UpdateDatabase; TaskID is additional.
type UpdateDatabaseInput struct {
	TaskID      uuid.UUID      `json:"task_id"`
	Spec        *database.Spec `json:"spec"`
	ForceUpdate bool           `json:"force_update"`
}

type UpdateDatabaseOutput

// UpdateDatabaseOutput is the result type of Workflows.UpdateDatabase.
//
// NOTE(review): the Go field is named Updated but is serialized under the JSON
// key "current". Renaming the key would change the serialized form, so confirm
// with consumers before touching it.
type UpdateDatabaseOutput struct {
	Updated *resource.State `json:"current"`
}

type ValidateSpecInput

// ValidateSpecInput is the input accepted by both Service.ValidateSpec and
// Workflows.ValidateSpec. It carries a database ID and two *database.Spec
// values, Spec and PreviousSpec.
//
// NOTE(review): no json tags are declared here even though ValidateSpecOutput
// has them — confirm this is intentional.
// NOTE(review): PreviousSpec is presumably nil when there is no earlier spec
// to compare against — confirm.
type ValidateSpecInput struct {
	DatabaseID   string
	Spec         *database.Spec
	PreviousSpec *database.Spec
}

type ValidateSpecOutput

// ValidateSpecOutput is the result type of Service.ValidateSpec and
// Workflows.ValidateSpec. It holds a boolean verdict and a list of error
// strings.
type ValidateSpecOutput struct {
	Valid bool `json:"valid"`
	// Errors is dropped from the JSON encoding when empty (omitempty).
	Errors []string `json:"errors,omitempty"`
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(be backend.Backend, workflows *Workflows, orch Orchestrator) (*Worker, error)

func (*Worker) Shutdown

func (w *Worker) Shutdown() error

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

type Workflows

// Workflows is the receiver for this package's workflow methods (the
// *Workflows methods that take a workflow.Context) and for Register, which
// takes a *worker.Worker. A *Workflows is also a dependency of NewService and
// NewWorker.
type Workflows struct {
	Config     config.Config
	Activities *activities.Activities
}

func (*Workflows) CreatePgBackRestBackup

func (w *Workflows) CreatePgBackRestBackup(ctx workflow.Context, input *CreatePgBackRestBackupInput) (*CreatePgBackRestBackupOutput, error)

func (*Workflows) DeleteDatabase

func (w *Workflows) DeleteDatabase(ctx workflow.Context, input *DeleteDatabaseInput) (*DeleteDatabaseOutput, error)

func (*Workflows) ExecutePlanRestore

func (w *Workflows) ExecutePlanRestore(
	ctx workflow.Context,
	input *PlanRestoreInput,
) workflow.Future[*PlanRestoreOutput]

func (*Workflows) ExecutePlanUpdate

func (w *Workflows) ExecutePlanUpdate(
	ctx workflow.Context,
	input *PlanUpdateInput,
) workflow.Future[*PlanUpdateOutput]

func (*Workflows) ExecuteRefreshCurrentState

func (w *Workflows) ExecuteRefreshCurrentState(
	ctx workflow.Context,
	input *RefreshCurrentStateInput,
) workflow.Future[*RefreshCurrentStateOutput]

func (*Workflows) Failover

func (w *Workflows) Failover(ctx workflow.Context, in *FailoverInput) (*FailoverOutput, error)

func (*Workflows) PgBackRestRestore

func (w *Workflows) PgBackRestRestore(ctx workflow.Context, input *PgBackRestRestoreInput) (*PgBackRestRestoreOutput, error)

func (*Workflows) PlanRestore

func (w *Workflows) PlanRestore(ctx workflow.Context, input *PlanRestoreInput) (*PlanRestoreOutput, error)

func (*Workflows) PlanUpdate

func (w *Workflows) PlanUpdate(ctx workflow.Context, input *PlanUpdateInput) (*PlanUpdateOutput, error)

func (*Workflows) RefreshCurrentState

func (w *Workflows) RefreshCurrentState(ctx workflow.Context, input *RefreshCurrentStateInput) (*RefreshCurrentStateOutput, error)

func (*Workflows) Register

func (w *Workflows) Register(work *worker.Worker) error

func (*Workflows) RestartInstance

func (w *Workflows) RestartInstance(ctx workflow.Context, input *RestartInstanceInput) (*RestartInstanceOutput, error)

func (*Workflows) StartInstance

func (w *Workflows) StartInstance(ctx workflow.Context, input *StartInstanceInput) (*StartInstanceOutput, error)

func (*Workflows) StopInstance

func (w *Workflows) StopInstance(ctx workflow.Context, input *StopInstanceInput) (*StopInstanceOutput, error)

func (*Workflows) Switchover

func (w *Workflows) Switchover(ctx workflow.Context, in *SwitchoverInput) (*SwitchoverOutput, error)

func (*Workflows) UpdateDatabase

func (w *Workflows) UpdateDatabase(ctx workflow.Context, input *UpdateDatabaseInput) (*UpdateDatabaseOutput, error)

func (*Workflows) ValidateSpec

func (w *Workflows) ValidateSpec(ctx workflow.Context, input *ValidateSpecInput) (*ValidateSpecOutput, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL