Documentation
¶
Index ¶
- Constants
- Variables
- func ConnectToInstance(ctx context.Context, opts *ConnectionOptions) (*pgx.Conn, error)
- func DatabaseStateModifiable(state DatabaseState) bool
- func GetPrimaryInstanceID(ctx context.Context, patroniClient *patroni.Client, timeout time.Duration) (string, error)
- func InstanceIDFor(hostID, databaseID, nodeName string) string
- func InstanceResourceIdentifier(instanceID string) resource.Identifier
- func LagTrackerCommitTSIdentifier(originNode, receiverNode string) resource.Identifier
- func NodeResourceIdentifier(nodeName string) resource.Identifier
- func Provide(i *do.Injector)
- func RegisterResourceTypes(registry *resource.Registry)
- func ReplicationSlotAdvanceFromCTSResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
- func ReplicationSlotCreateResourceIdentifier(databaseName, providerNode, subscriberNode string) resource.Identifier
- func SubscriptionResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
- func SwitchoverResourceIdentifier(nodeName string) resource.Identifier
- func SyncEventResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
- func ValidateChangedSpec(current, updated *Spec) error
- func WaitForPatroniRunning(ctx context.Context, patroniClient *patroni.Client, timeout time.Duration) error
- func WaitForSyncEventResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
- type BackupConfig
- type BackupSchedule
- type BackupScheduleType
- type ConnectionInfo
- type ConnectionOptions
- type Database
- type DatabaseState
- type DatabaseStore
- func (s *DatabaseStore) Create(item *StoredDatabase) storage.PutOp[*StoredDatabase]
- func (s *DatabaseStore) Delete(item *StoredDatabase) storage.DeleteValueOp[*StoredDatabase]
- func (s *DatabaseStore) ExistsByKey(databaseID string) storage.ExistsOp
- func (s *DatabaseStore) GetAll() storage.GetMultipleOp[*StoredDatabase]
- func (s *DatabaseStore) GetByKey(databaseID string) storage.GetOp[*StoredDatabase]
- func (s *DatabaseStore) GetByKeys(databaseIDs ...string) storage.GetMultipleOp[*StoredDatabase]
- func (s *DatabaseStore) Key(databaseID string) string
- func (s *DatabaseStore) Prefix() string
- func (s *DatabaseStore) Update(item *StoredDatabase) storage.PutOp[*StoredDatabase]
- type Extension
- type ExtraNetworkSpec
- type ExtraVolumesSpec
- type Instance
- type InstanceResource
- func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, dbName string) (*pgx.Conn, error)
- func (r *InstanceResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *InstanceResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *InstanceResource) Dependencies() []resource.Identifier
- func (r *InstanceResource) DiffIgnore() []string
- func (r *InstanceResource) Executor() resource.Executor
- func (r *InstanceResource) Identifier() resource.Identifier
- func (r *InstanceResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *InstanceResource) ResourceVersion() string
- func (r *InstanceResource) Update(ctx context.Context, rc *resource.Context) error
- func (r *InstanceResource) Validate() error
- type InstanceResources
- type InstanceSpec
- type InstanceState
- type InstanceStatus
- type InstanceStatusStore
- func (s *InstanceStatusStore) DatabasePrefix(databaseID string) string
- func (s *InstanceStatusStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp
- func (s *InstanceStatusStore) GetAll() storage.GetMultipleOp[*StoredInstanceStatus]
- func (s *InstanceStatusStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstanceStatus]
- func (s *InstanceStatusStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstanceStatus]
- func (s *InstanceStatusStore) Key(databaseID, instanceID string) string
- func (s *InstanceStatusStore) Prefix() string
- func (s *InstanceStatusStore) Put(item *StoredInstanceStatus) storage.PutOp[*StoredInstanceStatus]
- type InstanceStore
- func (s *InstanceStore) DatabasePrefix(databaseID string) string
- func (s *InstanceStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp
- func (s *InstanceStore) GetAll() storage.GetMultipleOp[*StoredInstance]
- func (s *InstanceStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstance]
- func (s *InstanceStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstance]
- func (s *InstanceStore) Key(databaseID, instanceID string) string
- func (s *InstanceStore) Prefix() string
- func (s *InstanceStore) Put(item *StoredInstance) storage.PutOp[*StoredInstance]
- type InstanceUpdateOptions
- type LagTrackerCommitTimestampResource
- func (r *LagTrackerCommitTimestampResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *LagTrackerCommitTimestampResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *LagTrackerCommitTimestampResource) Dependencies() []resource.Identifier
- func (r *LagTrackerCommitTimestampResource) DiffIgnore() []string
- func (r *LagTrackerCommitTimestampResource) Executor() resource.Executor
- func (r *LagTrackerCommitTimestampResource) Identifier() resource.Identifier
- func (r *LagTrackerCommitTimestampResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *LagTrackerCommitTimestampResource) ResourceVersion() string
- func (r *LagTrackerCommitTimestampResource) Update(ctx context.Context, rc *resource.Context) error
- type Node
- type NodeInstances
- type NodeResource
- func (n *NodeResource) Create(ctx context.Context, rc *resource.Context) error
- func (n *NodeResource) Delete(ctx context.Context, rc *resource.Context) error
- func (n *NodeResource) Dependencies() []resource.Identifier
- func (n *NodeResource) DiffIgnore() []string
- func (n *NodeResource) Executor() resource.Executor
- func (n *NodeResource) Identifier() resource.Identifier
- func (n *NodeResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (n *NodeResource) ResourceVersion() string
- func (n *NodeResource) Update(ctx context.Context, rc *resource.Context) error
- type Orchestrator
- type OrchestratorOpts
- type ReplicationSlotAdvanceFromCTSResource
- func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotAdvanceFromCTSResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotAdvanceFromCTSResource) Dependencies() []resource.Identifier
- func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string
- func (r *ReplicationSlotAdvanceFromCTSResource) Executor() resource.Executor
- func (r *ReplicationSlotAdvanceFromCTSResource) Identifier() resource.Identifier
- func (r *ReplicationSlotAdvanceFromCTSResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotAdvanceFromCTSResource) ResourceVersion() string
- func (r *ReplicationSlotAdvanceFromCTSResource) Update(ctx context.Context, rc *resource.Context) error
- type ReplicationSlotCreateResource
- func (r *ReplicationSlotCreateResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotCreateResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotCreateResource) Dependencies() []resource.Identifier
- func (r *ReplicationSlotCreateResource) DiffIgnore() []string
- func (r *ReplicationSlotCreateResource) Executor() resource.Executor
- func (r *ReplicationSlotCreateResource) Identifier() resource.Identifier
- func (r *ReplicationSlotCreateResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *ReplicationSlotCreateResource) ResourceVersion() string
- func (r *ReplicationSlotCreateResource) Update(ctx context.Context, rc *resource.Context) error
- type RestoreConfig
- type Service
- func (s *Service) CreateDatabase(ctx context.Context, spec *Spec) (*Database, error)
- func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error
- func (s *Service) DeleteInstance(ctx context.Context, databaseID, instanceID string) error
- func (s *Service) GetAllInstances(ctx context.Context) ([]*Instance, error)
- func (s *Service) GetDatabase(ctx context.Context, databaseID string) (*Database, error)
- func (s *Service) GetDatabases(ctx context.Context) ([]*Database, error)
- func (s *Service) GetInstance(ctx context.Context, databaseID, instanceID string) (*Instance, error)
- func (s *Service) GetInstances(ctx context.Context, databaseID string) ([]*Instance, error)
- func (s *Service) GetStoredInstanceState(ctx context.Context, databaseID, instanceID string) (InstanceState, error)
- func (s *Service) InstanceCountForHost(ctx context.Context, hostID string) (int, error)
- func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error
- func (s *Service) UpdateDatabase(ctx context.Context, state DatabaseState, spec *Spec) (*Database, error)
- func (s *Service) UpdateDatabaseState(ctx context.Context, databaseID string, from, to DatabaseState) error
- func (s *Service) UpdateInstance(ctx context.Context, opts *InstanceUpdateOptions) error
- func (s *Service) UpdateInstanceStatus(ctx context.Context, databaseID string, instanceID string, ...) error
- type Spec
- func (s *Spec) Clone() *Spec
- func (s *Spec) DefaultOptionalFieldsFrom(other *Spec)
- func (s *Spec) Node(name string) (*Node, error)
- func (s *Spec) NodeInstances() ([]*NodeInstances, error)
- func (s *Spec) NodeNames() []string
- func (s *Spec) NormalizeBackupConfig()
- func (s *Spec) RemoveBackupConfigFrom(nodes ...string)
- func (s *Spec) ValidateNodeNames(names ...string) error
- type SpecStore
- func (s *SpecStore) Create(item *StoredSpec) storage.PutOp[*StoredSpec]
- func (s *SpecStore) Delete(item *StoredSpec) storage.DeleteValueOp[*StoredSpec]
- func (s *SpecStore) ExistsByKey(databaseID string) storage.ExistsOp
- func (s *SpecStore) GetAll() storage.GetMultipleOp[*StoredSpec]
- func (s *SpecStore) GetByKey(databaseID string) storage.GetOp[*StoredSpec]
- func (s *SpecStore) GetByKeys(databaseIDs ...string) storage.GetMultipleOp[*StoredSpec]
- func (s *SpecStore) Key(databaseID string) string
- func (s *SpecStore) Prefix() string
- func (s *SpecStore) Update(item *StoredSpec) storage.PutOp[*StoredSpec]
- type Store
- type StoredDatabase
- type StoredInstance
- type StoredInstanceStatus
- type StoredSpec
- type SubscriptionResource
- func (s *SubscriptionResource) AddDependentResource(dep resource.Identifier)
- func (s *SubscriptionResource) Create(ctx context.Context, rc *resource.Context) error
- func (s *SubscriptionResource) Delete(ctx context.Context, rc *resource.Context) error
- func (s *SubscriptionResource) Dependencies() []resource.Identifier
- func (s *SubscriptionResource) DiffIgnore() []string
- func (s *SubscriptionResource) Executor() resource.Executor
- func (s *SubscriptionResource) Identifier() resource.Identifier
- func (s *SubscriptionResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (s *SubscriptionResource) ResourceVersion() string
- func (s *SubscriptionResource) Update(ctx context.Context, rc *resource.Context) error
- type SubscriptionStatus
- type SwarmOpts
- type SwitchoverResource
- func (s *SwitchoverResource) Create(ctx context.Context, rc *resource.Context) error
- func (s *SwitchoverResource) Delete(ctx context.Context, rc *resource.Context) error
- func (s *SwitchoverResource) Dependencies() []resource.Identifier
- func (s *SwitchoverResource) DiffIgnore() []string
- func (s *SwitchoverResource) Executor() resource.Executor
- func (s *SwitchoverResource) Identifier() resource.Identifier
- func (s *SwitchoverResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (s *SwitchoverResource) ResourceVersion() string
- func (s *SwitchoverResource) Update(ctx context.Context, rc *resource.Context) error
- type SyncEventResource
- func (r *SyncEventResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *SyncEventResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *SyncEventResource) Dependencies() []resource.Identifier
- func (r *SyncEventResource) DiffIgnore() []string
- func (r *SyncEventResource) Executor() resource.Executor
- func (r *SyncEventResource) Identifier() resource.Identifier
- func (r *SyncEventResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *SyncEventResource) ResourceVersion() string
- func (r *SyncEventResource) Update(ctx context.Context, rc *resource.Context) error
- type User
- type ValidationResult
- type WaitForSyncEventResource
- func (r *WaitForSyncEventResource) Create(ctx context.Context, rc *resource.Context) error
- func (r *WaitForSyncEventResource) Delete(ctx context.Context, rc *resource.Context) error
- func (r *WaitForSyncEventResource) Dependencies() []resource.Identifier
- func (r *WaitForSyncEventResource) DiffIgnore() []string
- func (r *WaitForSyncEventResource) Executor() resource.Executor
- func (r *WaitForSyncEventResource) Identifier() resource.Identifier
- func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Context) error
- func (r *WaitForSyncEventResource) ResourceVersion() string
- func (r *WaitForSyncEventResource) Update(ctx context.Context, rc *resource.Context) error
Constants ¶
const ResourceTypeInstance resource.Type = "database.instance"
const ResourceTypeLagTrackerCommitTS resource.Type = "database.lag_tracker_commit_ts"
const ResourceTypeNode resource.Type = "database.node"
const ResourceTypeReplicationSlotAdvanceFromCTS resource.Type = "database.replication_slot_advance_from_cts"
const ResourceTypeReplicationSlotCreate resource.Type = "database.replication_slot_create"
const ResourceTypeSubscription resource.Type = "database.subscription"
const ResourceTypeSwitchover resource.Type = "database.switchover"
const ResourceTypeSyncEvent resource.Type = "database.sync_event"
const ResourceTypeWaitForSyncEvent resource.Type = "database.wait_for_sync_event"
Variables ¶
var ( ErrDatabaseAlreadyExists = errors.New("database already exists") ErrDatabaseNotFound = errors.New("database not found") ErrDatabaseNotModifiable = errors.New("database not modifiable") ErrInstanceNotFound = errors.New("instance not found") ErrInstanceStopped = errors.New("instance stopped") ErrInvalidDatabaseUpdate = errors.New("invalid database update") ErrInvalidSourceNode = errors.New("invalid source node") )
var ErrNodeNotInDBSpec = errors.New("node not in db spec")
Functions ¶
func ConnectToInstance ¶
func DatabaseStateModifiable ¶
func DatabaseStateModifiable(state DatabaseState) bool
func GetPrimaryInstanceID ¶
func InstanceIDFor ¶
func InstanceResourceIdentifier ¶
func InstanceResourceIdentifier(instanceID string) resource.Identifier
func LagTrackerCommitTSIdentifier ¶
func LagTrackerCommitTSIdentifier(originNode, receiverNode string) resource.Identifier
func NodeResourceIdentifier ¶
func NodeResourceIdentifier(nodeName string) resource.Identifier
func RegisterResourceTypes ¶
func ReplicationSlotAdvanceFromCTSResourceIdentifier ¶
func ReplicationSlotAdvanceFromCTSResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
ReplicationSlotAdvanceFromCTSResourceIdentifier creates a stable identifier for this resource.
func ReplicationSlotCreateResourceIdentifier ¶
func ReplicationSlotCreateResourceIdentifier(databaseName, providerNode, subscriberNode string) resource.Identifier
func SubscriptionResourceIdentifier ¶
func SubscriptionResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
func SwitchoverResourceIdentifier ¶
func SwitchoverResourceIdentifier(nodeName string) resource.Identifier
func SyncEventResourceIdentifier ¶
func SyncEventResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
func ValidateChangedSpec ¶
func WaitForPatroniRunning ¶
func WaitForSyncEventResourceIdentifier ¶
func WaitForSyncEventResourceIdentifier(providerNode, subscriberNode string) resource.Identifier
Types ¶
type BackupConfig ¶
type BackupConfig struct {
Repositories []*pgbackrest.Repository `json:"repositories"`
Schedules []*BackupSchedule `json:"schedules"`
}
func (*BackupConfig) Clone ¶
func (b *BackupConfig) Clone() *BackupConfig
func (*BackupConfig) DefaultOptionalFieldsFrom ¶
func (b *BackupConfig) DefaultOptionalFieldsFrom(other *BackupConfig)
DefaultOptionalFieldsFrom will default this config's optional fields to the values from the given config.
type BackupSchedule ¶
type BackupSchedule struct {
ID string `json:"id"`
Type BackupScheduleType `json:"type"`
CronExpression string `json:"cron_expression"`
}
func (*BackupSchedule) Clone ¶
func (b *BackupSchedule) Clone() *BackupSchedule
type BackupScheduleType ¶
type BackupScheduleType string
const ( BackupScheduleTypeFull BackupScheduleType = "full" BackupScheduleTypeIncremental BackupScheduleType = "incr" )
type ConnectionInfo ¶
type ConnectionInfo struct {
AdminHost string
AdminPort int
PeerHost string
PeerPort int
PeerSSLCert string
PeerSSLKey string
PeerSSLRootCert string
PatroniPort int
ClientHost string
ClientIPv4Address string
ClientPort int
InstanceHostname string
}
func (*ConnectionInfo) PatroniURL ¶
func (c *ConnectionInfo) PatroniURL() *url.URL
type DatabaseState ¶
type DatabaseState string
const ( DatabaseStateCreating DatabaseState = "creating" DatabaseStateModifying DatabaseState = "modifying" DatabaseStateAvailable DatabaseState = "available" DatabaseStateDeleting DatabaseState = "deleting" DatabaseStateDegraded DatabaseState = "degraded" DatabaseStateFailed DatabaseState = "failed" DatabaseStateBackingUp DatabaseState = "backing_up" DatabaseStateRestoring DatabaseState = "restoring" DatabaseStateStopped DatabaseState = "stopped" DatabaseStateUnknown DatabaseState = "unknown" )
type DatabaseStore ¶
type DatabaseStore struct {
// contains filtered or unexported fields
}
func NewDatabaseStore ¶
func NewDatabaseStore(client *clientv3.Client, root string) *DatabaseStore
func (*DatabaseStore) Create ¶
func (s *DatabaseStore) Create(item *StoredDatabase) storage.PutOp[*StoredDatabase]
func (*DatabaseStore) Delete ¶
func (s *DatabaseStore) Delete(item *StoredDatabase) storage.DeleteValueOp[*StoredDatabase]
func (*DatabaseStore) ExistsByKey ¶
func (s *DatabaseStore) ExistsByKey(databaseID string) storage.ExistsOp
func (*DatabaseStore) GetAll ¶
func (s *DatabaseStore) GetAll() storage.GetMultipleOp[*StoredDatabase]
func (*DatabaseStore) GetByKey ¶
func (s *DatabaseStore) GetByKey(databaseID string) storage.GetOp[*StoredDatabase]
func (*DatabaseStore) GetByKeys ¶
func (s *DatabaseStore) GetByKeys(databaseIDs ...string) storage.GetMultipleOp[*StoredDatabase]
func (*DatabaseStore) Key ¶
func (s *DatabaseStore) Key(databaseID string) string
func (*DatabaseStore) Prefix ¶
func (s *DatabaseStore) Prefix() string
func (*DatabaseStore) Update ¶
func (s *DatabaseStore) Update(item *StoredDatabase) storage.PutOp[*StoredDatabase]
type ExtraNetworkSpec ¶
type ExtraVolumesSpec ¶
type Instance ¶
type Instance struct {
InstanceID string `json:"instance_id"`
DatabaseID string `json:"database_id"`
HostID string `json:"host_id"`
NodeName string `json:"node_name"`
State InstanceState `json:"state"`
Status *InstanceStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Error string `json:"error,omitempty"`
}
type InstanceResource ¶
type InstanceResource struct {
Spec *InstanceSpec `json:"spec"`
InstanceHostname string `json:"instance_hostname"`
PrimaryInstanceID string `json:"primary_instance_id"`
OrchestratorDependencies []resource.Identifier `json:"dependencies"`
ConnectionInfo *ConnectionInfo `json:"connection_info"`
}
func GetAllInstances ¶
func GetPrimaryInstance ¶
func (*InstanceResource) Connection ¶
func (*InstanceResource) Dependencies ¶
func (r *InstanceResource) Dependencies() []resource.Identifier
func (*InstanceResource) DiffIgnore ¶
func (r *InstanceResource) DiffIgnore() []string
func (*InstanceResource) Executor ¶
func (r *InstanceResource) Executor() resource.Executor
func (*InstanceResource) Identifier ¶
func (r *InstanceResource) Identifier() resource.Identifier
func (*InstanceResource) ResourceVersion ¶
func (r *InstanceResource) ResourceVersion() string
func (*InstanceResource) Validate ¶
func (r *InstanceResource) Validate() error
type InstanceResources ¶
type InstanceResources struct {
Instance *InstanceResource
Resources []*resource.ResourceData
}
func NewInstanceResources ¶
func NewInstanceResources(instance *InstanceResource, resources []resource.Resource) (*InstanceResources, error)
func (*InstanceResources) DatabaseID ¶
func (r *InstanceResources) DatabaseID() string
func (*InstanceResources) DatabaseName ¶
func (r *InstanceResources) DatabaseName() string
func (*InstanceResources) HostID ¶
func (r *InstanceResources) HostID() string
func (*InstanceResources) InstanceID ¶
func (r *InstanceResources) InstanceID() string
func (*InstanceResources) NodeName ¶
func (r *InstanceResources) NodeName() string
type InstanceSpec ¶
type InstanceSpec struct {
InstanceID string `json:"instance_id"`
TenantID *string `json:"tenant_id,omitempty"`
DatabaseID string `json:"database_id"`
HostID string `json:"host_id"`
DatabaseName string `json:"database_name"`
NodeName string `json:"node_name"`
NodeOrdinal int `json:"node_ordinal"`
PgEdgeVersion *host.PgEdgeVersion `json:"pg_edge_version"`
Port *int `json:"port"`
CPUs float64 `json:"cpus"`
MemoryBytes uint64 `json:"memory"`
DatabaseUsers []*User `json:"database_users"`
BackupConfig *BackupConfig `json:"backup_config"`
RestoreConfig *RestoreConfig `json:"restore_config"`
PostgreSQLConf map[string]any `json:"postgresql_conf"`
ClusterSize int `json:"cluster_size"`
OrchestratorOpts *OrchestratorOpts `json:"orchestrator_opts,omitempty"`
}
func (*InstanceSpec) Clone ¶
func (s *InstanceSpec) Clone() *InstanceSpec
type InstanceState ¶
type InstanceState string
const ( InstanceStateCreating InstanceState = "creating" InstanceStateModifying InstanceState = "modifying" InstanceStateBackingUp InstanceState = "backing_up" InstanceStateAvailable InstanceState = "available" InstanceStateDegraded InstanceState = "degraded" InstanceStateFailed InstanceState = "failed" InstanceStateStopped InstanceState = "stopped" InstanceStateUnknown InstanceState = "unknown" )
type InstanceStatus ¶
type InstanceStatus struct {
PostgresVersion *string `json:"postgres_version,omitempty"`
SpockVersion *string `json:"spock_version,omitempty"`
Hostname *string `json:"hostname,omitempty"`
IPv4Address *string `json:"ipv4_address,omitempty"`
Port *int `json:"port,omitempty"`
PatroniState *patroni.State `json:"patroni_state,omitempty"`
Role *patroni.InstanceRole `json:"role,omitempty"`
ReadOnly *string `json:"read_only,omitempty"`
PendingRestart *bool `json:"pending_restart,omitempty"`
PatroniPaused *bool `json:"patroni_paused,omitempty"`
StatusUpdatedAt *time.Time `json:"status_updated_at,omitempty"`
Stopped *bool `json:"stopped,omitempty"`
Subscriptions []SubscriptionStatus `json:"subscriptions,omitempty"`
Error *string `json:"error,omitempty"`
}
func (*InstanceStatus) IsPrimary ¶
func (s *InstanceStatus) IsPrimary() bool
type InstanceStatusStore ¶
type InstanceStatusStore struct {
// contains filtered or unexported fields
}
func NewInstanceStatusStore ¶
func NewInstanceStatusStore(client *clientv3.Client, root string) *InstanceStatusStore
func (*InstanceStatusStore) DatabasePrefix ¶
func (s *InstanceStatusStore) DatabasePrefix(databaseID string) string
func (*InstanceStatusStore) DeleteByKey ¶
func (s *InstanceStatusStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp
func (*InstanceStatusStore) GetAll ¶
func (s *InstanceStatusStore) GetAll() storage.GetMultipleOp[*StoredInstanceStatus]
func (*InstanceStatusStore) GetByDatabaseID ¶
func (s *InstanceStatusStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstanceStatus]
func (*InstanceStatusStore) GetByKey ¶
func (s *InstanceStatusStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstanceStatus]
func (*InstanceStatusStore) Key ¶
func (s *InstanceStatusStore) Key(databaseID, instanceID string) string
func (*InstanceStatusStore) Prefix ¶
func (s *InstanceStatusStore) Prefix() string
func (*InstanceStatusStore) Put ¶
func (s *InstanceStatusStore) Put(item *StoredInstanceStatus) storage.PutOp[*StoredInstanceStatus]
type InstanceStore ¶
type InstanceStore struct {
// contains filtered or unexported fields
}
func NewInstanceStore ¶
func NewInstanceStore(client *clientv3.Client, root string) *InstanceStore
func (*InstanceStore) DatabasePrefix ¶
func (s *InstanceStore) DatabasePrefix(databaseID string) string
func (*InstanceStore) DeleteByKey ¶
func (s *InstanceStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp
func (*InstanceStore) GetAll ¶
func (s *InstanceStore) GetAll() storage.GetMultipleOp[*StoredInstance]
func (*InstanceStore) GetByDatabaseID ¶
func (s *InstanceStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstance]
func (*InstanceStore) GetByKey ¶
func (s *InstanceStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstance]
func (*InstanceStore) Key ¶
func (s *InstanceStore) Key(databaseID, instanceID string) string
func (*InstanceStore) Prefix ¶
func (s *InstanceStore) Prefix() string
func (*InstanceStore) Put ¶
func (s *InstanceStore) Put(item *StoredInstance) storage.PutOp[*StoredInstance]
type InstanceUpdateOptions ¶
type LagTrackerCommitTimestampResource ¶
type LagTrackerCommitTimestampResource struct {
// Planner fields
OriginNode string `json:"origin_node"`
ReceiverNode string `json:"receiver_node"`
// Dependency wiring
ExtraDependencies []resource.Identifier `json:"dependent_resources,omitempty"`
// Output (filled at Refresh/Create time)
CommitTimestamp *time.Time `json:"commit_timestamp,omitempty"`
}
func NewLagTrackerCommitTimestampResource ¶
func NewLagTrackerCommitTimestampResource(originNode, receiverNode string) *LagTrackerCommitTimestampResource
func (*LagTrackerCommitTimestampResource) Dependencies ¶
func (r *LagTrackerCommitTimestampResource) Dependencies() []resource.Identifier
func (*LagTrackerCommitTimestampResource) DiffIgnore ¶
func (r *LagTrackerCommitTimestampResource) DiffIgnore() []string
func (*LagTrackerCommitTimestampResource) Executor ¶
func (r *LagTrackerCommitTimestampResource) Executor() resource.Executor
func (*LagTrackerCommitTimestampResource) Identifier ¶
func (r *LagTrackerCommitTimestampResource) Identifier() resource.Identifier
func (*LagTrackerCommitTimestampResource) ResourceVersion ¶
func (r *LagTrackerCommitTimestampResource) ResourceVersion() string
type Node ¶
type Node struct {
Name string `json:"name"`
HostIDs []string `json:"host_ids"`
PostgresVersion string `json:"postgres_version"`
Port *int `json:"port"`
CPUs float64 `json:"cpus"`
MemoryBytes uint64 `json:"memory"`
PostgreSQLConf map[string]any `json:"postgresql_conf"`
BackupConfig *BackupConfig `json:"backup_config"`
RestoreConfig *RestoreConfig `json:"restore_config"`
OrchestratorOpts *OrchestratorOpts `json:"orchestrator_opts,omitempty"`
SourceNode string `json:"source_node,omitempty"`
}
func (*Node) DefaultOptionalFieldsFrom ¶
DefaultOptionalFieldsFrom will default this node's optional fields to the values from the given node.
type NodeInstances ¶
type NodeInstances struct {
NodeName string `json:"node_name"`
SourceNode string `json:"source_node"`
Instances []*InstanceSpec `json:"instances"`
RestoreConfig *RestoreConfig `json:"restore_config"`
}
func (*NodeInstances) InstanceIDs ¶
func (n *NodeInstances) InstanceIDs() []string
type NodeResource ¶
type NodeResource struct {
Name string `json:"name"`
InstanceIDs []string `json:"instance_ids"`
PrimaryInstanceID string `json:"primary_instance_id"`
}
func (*NodeResource) Dependencies ¶
func (n *NodeResource) Dependencies() []resource.Identifier
func (*NodeResource) DiffIgnore ¶
func (n *NodeResource) DiffIgnore() []string
func (*NodeResource) Executor ¶
func (n *NodeResource) Executor() resource.Executor
func (*NodeResource) Identifier ¶
func (n *NodeResource) Identifier() resource.Identifier
func (*NodeResource) ResourceVersion ¶
func (n *NodeResource) ResourceVersion() string
type Orchestrator ¶
type Orchestrator interface {
GenerateInstanceResources(spec *InstanceSpec) (*InstanceResources, error)
GenerateInstanceRestoreResources(spec *InstanceSpec, taskID uuid.UUID) (*InstanceResources, error)
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*ConnectionInfo, error)
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID string, options *pgbackrest.BackupOptions) error
ValidateInstanceSpecs(ctx context.Context, specs []*InstanceSpec) ([]*ValidationResult, error)
StopInstance(ctx context.Context, instanceID string) error
StartInstance(ctx context.Context, instanceID string) error
}
type OrchestratorOpts ¶
type OrchestratorOpts struct {
Swarm *SwarmOpts `json:"docker,omitempty"`
}
func (*OrchestratorOpts) Clone ¶
func (o *OrchestratorOpts) Clone() *OrchestratorOpts
type ReplicationSlotAdvanceFromCTSResource ¶
type ReplicationSlotAdvanceFromCTSResource struct {
ProviderNode string `json:"provider_node"` // slot lives here
SubscriberNode string `json:"subscriber_node"` // target/receiver node
}
ReplicationSlotAdvanceFromCTSResource advances the replication slot on the provider to the LSN derived from the commit timestamp captured in lag_tracker.
func (*ReplicationSlotAdvanceFromCTSResource) Dependencies ¶
func (r *ReplicationSlotAdvanceFromCTSResource) Dependencies() []resource.Identifier
func (*ReplicationSlotAdvanceFromCTSResource) DiffIgnore ¶
func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string
No diff-ignore fields needed; this always executes idempotently when asked.
func (*ReplicationSlotAdvanceFromCTSResource) Executor ¶
func (r *ReplicationSlotAdvanceFromCTSResource) Executor() resource.Executor
Execute on the provider node (the slot exists there).
func (*ReplicationSlotAdvanceFromCTSResource) Identifier ¶
func (r *ReplicationSlotAdvanceFromCTSResource) Identifier() resource.Identifier
func (*ReplicationSlotAdvanceFromCTSResource) ResourceVersion ¶
func (r *ReplicationSlotAdvanceFromCTSResource) ResourceVersion() string
type ReplicationSlotCreateResource ¶
type ReplicationSlotCreateResource struct {
DatabaseName string `json:"database_name"`
ProviderNode string `json:"provider_node"`
SubscriberNode string `json:"subscriber_node"`
}
func (*ReplicationSlotCreateResource) Dependencies ¶
func (r *ReplicationSlotCreateResource) Dependencies() []resource.Identifier
func (*ReplicationSlotCreateResource) DiffIgnore ¶
func (r *ReplicationSlotCreateResource) DiffIgnore() []string
func (*ReplicationSlotCreateResource) Executor ¶
func (r *ReplicationSlotCreateResource) Executor() resource.Executor
func (*ReplicationSlotCreateResource) Identifier ¶
func (r *ReplicationSlotCreateResource) Identifier() resource.Identifier
func (*ReplicationSlotCreateResource) ResourceVersion ¶
func (r *ReplicationSlotCreateResource) ResourceVersion() string
type RestoreConfig ¶
type RestoreConfig struct {
SourceDatabaseID string `json:"source_database_id"`
SourceNodeName string `json:"source_node_name"`
SourceDatabaseName string `json:"source_database_name"`
Repository *pgbackrest.Repository `json:"repository"`
RestoreOptions map[string]string `json:"restore_options"`
}
func (*RestoreConfig) Clone ¶
func (r *RestoreConfig) Clone() *RestoreConfig
func (*RestoreConfig) DefaultOptionalFieldsFrom ¶
func (r *RestoreConfig) DefaultOptionalFieldsFrom(other *RestoreConfig)
DefaultOptionalFieldsFrom will default this config's optional fields to the values from the given config.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func NewService(orchestrator Orchestrator, store *Store, hostSvc *host.Service) *Service
func (*Service) CreateDatabase ¶
func (*Service) DeleteDatabase ¶
func (*Service) DeleteInstance ¶
func (*Service) GetAllInstances ¶
func (*Service) GetDatabase ¶
func (*Service) GetDatabases ¶
func (*Service) GetInstance ¶
func (*Service) GetInstances ¶
func (*Service) GetStoredInstanceState ¶
func (*Service) InstanceCountForHost ¶
func (*Service) PopulateSpecDefaults ¶
func (*Service) UpdateDatabase ¶
func (*Service) UpdateDatabaseState ¶
func (*Service) UpdateInstance ¶
func (s *Service) UpdateInstance(ctx context.Context, opts *InstanceUpdateOptions) error
func (*Service) UpdateInstanceStatus ¶
type Spec ¶
type Spec struct {
DatabaseID string `json:"database_id"`
TenantID *string `json:"tenant_id,omitempty"`
DatabaseName string `json:"database_name"`
PostgresVersion string `json:"postgres_version"`
SpockVersion string `json:"spock_version"`
Port *int `json:"port"`
CPUs float64 `json:"cpus"`
MemoryBytes uint64 `json:"memory"`
Nodes []*Node `json:"nodes"`
DatabaseUsers []*User `json:"database_users"`
BackupConfig *BackupConfig `json:"backup_config"`
RestoreConfig *RestoreConfig `json:"restore_config"`
PostgreSQLConf map[string]any `json:"postgresql_conf"`
OrchestratorOpts *OrchestratorOpts `json:"orchestrator_opts,omitempty"`
}
func (*Spec) DefaultOptionalFieldsFrom ¶
DefaultOptionalFieldsFrom will default this spec's optional fields to the values from the given spec.
func (*Spec) NodeInstances ¶
func (s *Spec) NodeInstances() ([]*NodeInstances, error)
func (*Spec) NormalizeBackupConfig ¶
func (s *Spec) NormalizeBackupConfig()
NormalizeBackupConfig normalizes the backup config so that its defined per-node rather than at the database level. This is useful as a preliminary step if we need to modify the backup configs on the user's behalf.
func (*Spec) RemoveBackupConfigFrom ¶
RemoveBackupConfigFrom removes backup configuration from the given nodes. It normalizes the backup configuration first to ensure that only the given nodes are affected.
func (*Spec) ValidateNodeNames ¶
type SpecStore ¶
type SpecStore struct {
// contains filtered or unexported fields
}
func (*SpecStore) Create ¶
func (s *SpecStore) Create(item *StoredSpec) storage.PutOp[*StoredSpec]
func (*SpecStore) Delete ¶
func (s *SpecStore) Delete(item *StoredSpec) storage.DeleteValueOp[*StoredSpec]
func (*SpecStore) GetAll ¶
func (s *SpecStore) GetAll() storage.GetMultipleOp[*StoredSpec]
func (*SpecStore) GetByKey ¶
func (s *SpecStore) GetByKey(databaseID string) storage.GetOp[*StoredSpec]
func (*SpecStore) GetByKeys ¶
func (s *SpecStore) GetByKeys(databaseIDs ...string) storage.GetMultipleOp[*StoredSpec]
func (*SpecStore) Update ¶
func (s *SpecStore) Update(item *StoredSpec) storage.PutOp[*StoredSpec]
type Store ¶
type Store struct {
Spec *SpecStore
Database *DatabaseStore
Instance *InstanceStore
InstanceStatus *InstanceStatusStore
// contains filtered or unexported fields
}
type StoredDatabase ¶
type StoredDatabase struct {
storage.StoredValue
DatabaseID string `json:"database_id"`
TenantID *string `json:"tenant_id,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
State DatabaseState `json:"state"`
}
type StoredInstance ¶
type StoredInstance struct {
storage.StoredValue
InstanceID string `json:"instance_id"`
DatabaseID string `json:"database_id"`
HostID string `json:"host_id"`
NodeName string `json:"node_name"`
State InstanceState `json:"state"`
CreatedAt time.Time `json:"created_at"`
UpdateAt time.Time `json:"updated_at"`
Error string `json:"error,omitempty"`
}
func NewStoredInstance ¶
func NewStoredInstance(opts *InstanceUpdateOptions) *StoredInstance
func (*StoredInstance) Update ¶
func (i *StoredInstance) Update(opts *InstanceUpdateOptions)
type StoredInstanceStatus ¶
type StoredInstanceStatus struct {
storage.StoredValue
DatabaseID string `json:"database_id"`
InstanceID string `json:"instance_id"`
Status *InstanceStatus `json:"status"`
}
type StoredSpec ¶
type StoredSpec struct {
storage.StoredValue
*Spec
}
type SubscriptionResource ¶
type SubscriptionResource struct {
SubscriberNode string `json:"subscriber_node"`
ProviderNode string `json:"provider_node"`
Disabled bool `json:"disabled"`
SyncStructure bool `json:"sync_structure"`
SyncData bool `json:"sync_data"`
ExtraDependencies []resource.Identifier `json:"dependent_subscriptions"`
NeedsUpdate bool `json:"needs_update"`
}
func (*SubscriptionResource) AddDependentResource ¶
func (s *SubscriptionResource) AddDependentResource(dep resource.Identifier)
func (*SubscriptionResource) Dependencies ¶
func (s *SubscriptionResource) Dependencies() []resource.Identifier
func (*SubscriptionResource) DiffIgnore ¶
func (s *SubscriptionResource) DiffIgnore() []string
func (*SubscriptionResource) Executor ¶
func (s *SubscriptionResource) Executor() resource.Executor
func (*SubscriptionResource) Identifier ¶
func (s *SubscriptionResource) Identifier() resource.Identifier
func (*SubscriptionResource) ResourceVersion ¶
func (s *SubscriptionResource) ResourceVersion() string
type SubscriptionStatus ¶
type SwarmOpts ¶
type SwarmOpts struct {
ExtraVolumes []ExtraVolumesSpec `json:"extra_volumes,omitempty"`
ExtraNetworks []ExtraNetworkSpec `json:"extra_networks,omitempty"`
ExtraLabels map[string]string `json:"extra_labels,omitempty"` // optional, used for custom labels on the swarm service
}
type SwitchoverResource ¶
type SwitchoverResource struct {
HostID string `json:"host_id"`
InstanceID string `json:"instance_id"`
TargetRole patroni.InstanceRole `json:"target_role"`
}
func (*SwitchoverResource) Dependencies ¶
func (s *SwitchoverResource) Dependencies() []resource.Identifier
func (*SwitchoverResource) DiffIgnore ¶
func (s *SwitchoverResource) DiffIgnore() []string
func (*SwitchoverResource) Executor ¶
func (s *SwitchoverResource) Executor() resource.Executor
func (*SwitchoverResource) Identifier ¶
func (s *SwitchoverResource) Identifier() resource.Identifier
func (*SwitchoverResource) ResourceVersion ¶
func (s *SwitchoverResource) ResourceVersion() string
type SyncEventResource ¶
type SyncEventResource struct {
ProviderNode string `json:"provider_node"`
SubscriberNode string `json:"subscriber_node"`
SyncEventLsn string `json:"sync_event_lsn"`
ExtraDependencies []resource.Identifier `json:"extra_dependencies"`
}
func (*SyncEventResource) Dependencies ¶
func (r *SyncEventResource) Dependencies() []resource.Identifier
func (*SyncEventResource) DiffIgnore ¶
func (r *SyncEventResource) DiffIgnore() []string
func (*SyncEventResource) Executor ¶
func (r *SyncEventResource) Executor() resource.Executor
func (*SyncEventResource) Identifier ¶
func (r *SyncEventResource) Identifier() resource.Identifier
func (*SyncEventResource) Refresh ¶
Confirm synchronization by sending sync_event from provider and waiting for it on subscriber
func (*SyncEventResource) ResourceVersion ¶
func (r *SyncEventResource) ResourceVersion() string
type User ¶
type User struct {
Username string `json:"username"`
Password string `json:"password"`
DBOwner bool `json:"db_owner,omitempty"`
Attributes []string `json:"attributes,omitempty"`
Roles []string `json:"roles,omitempty"`
}
func (*User) DefaultOptionalFieldsFrom ¶
DefaultOptionalFieldsFrom will default this user's optional fields to the values from the given user.
type ValidationResult ¶
type WaitForSyncEventResource ¶
type WaitForSyncEventResource struct {
SubscriberNode string `json:"subscriber_node"`
ProviderNode string `json:"provider_node"`
}
func (*WaitForSyncEventResource) Dependencies ¶
func (r *WaitForSyncEventResource) Dependencies() []resource.Identifier
func (*WaitForSyncEventResource) DiffIgnore ¶
func (r *WaitForSyncEventResource) DiffIgnore() []string
func (*WaitForSyncEventResource) Executor ¶
func (r *WaitForSyncEventResource) Executor() resource.Executor
func (*WaitForSyncEventResource) Identifier ¶
func (r *WaitForSyncEventResource) Identifier() resource.Identifier
func (*WaitForSyncEventResource) Refresh ¶
Confirm synchronization by sending sync_event from provider and waiting for it on subscriber
func (*WaitForSyncEventResource) ResourceVersion ¶
func (r *WaitForSyncEventResource) ResourceVersion() string
Source Files
¶
- connection.go
- database.go
- database_store.go
- instance.go
- instance_resource.go
- instance_status_store.go
- instance_store.go
- lag_tracker_commit_ts_resource.go
- node_resource.go
- orchestrator.go
- provide.go
- replication_slot_advance_from_cts_resource.go
- replication_slot_create_resource.go
- resources.go
- service.go
- spec.go
- spec_store.go
- status.go
- store.go
- subscription_resource.go
- switchover_resource.go
- sync_event_resource.go
- utils.go
- wait_for_sync_event_resource.go