core

package
v1.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2025 License: PostgreSQL Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TempOffset = 1000000
)

Variables

This section is empty.

Functions

func CheckDiffFileFormat

func CheckDiffFileFormat(filePath string, task *TableDiffTask) error

func RepsetDiff

func RepsetDiff(task *RepsetDiffCmd) (err error)

Types

type CompareRangesResult

type CompareRangesResult struct {
	Diffs     map[string]map[string][]map[string]any
	TotalDiff int
	Err       error
}

type CompareRangesWorkItem

type CompareRangesWorkItem struct {
	Node1  map[string]any
	Node2  map[string]any
	Ranges [][2][]any
}

type HashResult

type HashResult struct {
	// contains filtered or unexported fields
}

type HashTask

type HashTask struct {
	// contains filtered or unexported fields
}

type LeafHashResult

type LeafHashResult struct {
	BlockID int64
	Hash    []byte
	Err     error
}

type MerkleTreeTask

type MerkleTreeTask struct {
	types.Task
	types.DerivedFields

	QualifiedTableName string
	DBName             string
	Nodes              string

	Analyse           bool
	Rebalance         bool
	RecreateObjects   bool // TBD
	BlockSize         int
	MaxCpuRatio       float64
	BatchSize         int
	Output            string
	QuietMode         bool
	RangesFile        string
	WriteRanges       bool
	OverrideBlockSize bool
	Mode              string
	NoCDC             bool
	SkipDBUpdate      bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	DiffResult types.DiffOutput

	StartTime time.Time

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewMerkleTreeTask

func NewMerkleTreeTask() *MerkleTreeTask

func (*MerkleTreeTask) BuildMtree

func (m *MerkleTreeTask) BuildMtree() (err error)

func (*MerkleTreeTask) CompareRanges

func (m *MerkleTreeTask) CompareRanges(workItems []CompareRangesWorkItem)

func (*MerkleTreeTask) DiffMtree

func (m *MerkleTreeTask) DiffMtree() (err error)

func (*MerkleTreeTask) GetClusterName

func (m *MerkleTreeTask) GetClusterName() string

func (*MerkleTreeTask) GetClusterNodes

func (m *MerkleTreeTask) GetClusterNodes() []map[string]any

func (*MerkleTreeTask) GetDBName

func (m *MerkleTreeTask) GetDBName() string

func (*MerkleTreeTask) GetNode

func (m *MerkleTreeTask) GetNode(nodeName string) (map[string]interface{}, error)

func (*MerkleTreeTask) GetNodeList

func (m *MerkleTreeTask) GetNodeList() []string

func (*MerkleTreeTask) GetNodes

func (m *MerkleTreeTask) GetNodes() string

func (*MerkleTreeTask) MtreeInit

func (m *MerkleTreeTask) MtreeInit() (err error)

func (*MerkleTreeTask) MtreeTeardown

func (m *MerkleTreeTask) MtreeTeardown() (err error)

func (*MerkleTreeTask) MtreeTeardownTable

func (m *MerkleTreeTask) MtreeTeardownTable() (err error)

func (*MerkleTreeTask) RunChecks

func (m *MerkleTreeTask) RunChecks(skipValidation bool) error

func (*MerkleTreeTask) SetClusterNodes

func (m *MerkleTreeTask) SetClusterNodes(cn []map[string]any)

func (*MerkleTreeTask) SetDBName

func (m *MerkleTreeTask) SetDBName(name string)

func (*MerkleTreeTask) SetDatabase

func (m *MerkleTreeTask) SetDatabase(db types.Database)

func (*MerkleTreeTask) SetNodeList

func (m *MerkleTreeTask) SetNodeList(nl []string)

func (*MerkleTreeTask) UpdateMtree

func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error)

func (*MerkleTreeTask) Validate

func (m *MerkleTreeTask) Validate() error

type NodeComparisonReport

type NodeComparisonReport struct {
	Status string              `json:"status"`
	Diffs  map[string]NodeDiff `json:"diffs,omitempty"`
}

type NodeDiff

type NodeDiff struct {
	MissingObjects SchemaObjects `json:"missing_objects"`
	ExtraObjects   SchemaObjects `json:"extra_objects"`
}

type NodeSchemaReport

type NodeSchemaReport struct {
	NodeName string        `json:"node_name"`
	Objects  SchemaObjects `json:"objects"`
}

type Range

type Range struct {
	Start any
	End   any
}

type RangeResults

type RangeResults map[string]HashResult

type RecursiveDiffTask

type RecursiveDiffTask struct {
	Node1Name                 string
	Node2Name                 string
	CurrentRange              Range
	CurrentEstimatedBlockSize int
}

type RepairReport

type RepairReport struct {
	OperationType           string         `json:"operation_type"`
	Mode                    string         `json:"mode"`
	Timestamp               string         `json:"time_stamp"`
	SuppliedArgs            map[string]any `json:"supplied_args"`
	DatabaseCredentialsUsed types.Database `json:"database_credentials_used"`
	Changes                 map[string]any `json:"changes"`
	RunTimeSeconds          float64        `json:"run_time,omitempty"`
}

type RepsetDiffCmd

type RepsetDiffCmd struct {
	types.Task

	ClusterName string
	DBName      string
	RepsetName  string
	Nodes       string
	Quiet       bool
	SkipTables  string
	SkipFile    string

	ConnectionPool    *pgxpool.Pool
	ConcurrencyFactor int
	BlockSize         int
	CompareUnitSize   int
	Output            string
	TableFilter       string
	OverrideBlockSize bool
	Ctx               context.Context

	SkipDBUpdate  bool
	TaskStore     *taskstore.Store
	TaskStorePath string
	// contains filtered or unexported fields
}

func NewRepsetDiffTask added in v1.3.5

func NewRepsetDiffTask() *RepsetDiffCmd

func (*RepsetDiffCmd) CloneForSchedule added in v1.3.5

func (task *RepsetDiffCmd) CloneForSchedule(ctx context.Context) *RepsetDiffCmd

func (*RepsetDiffCmd) GetClusterName

func (c *RepsetDiffCmd) GetClusterName() string

func (*RepsetDiffCmd) GetClusterNodes

func (c *RepsetDiffCmd) GetClusterNodes() []map[string]any

func (*RepsetDiffCmd) GetDBName

func (c *RepsetDiffCmd) GetDBName() string

func (*RepsetDiffCmd) GetNodeList

func (c *RepsetDiffCmd) GetNodeList() []string

func (*RepsetDiffCmd) GetNodes

func (c *RepsetDiffCmd) GetNodes() string

func (*RepsetDiffCmd) RunChecks

func (c *RepsetDiffCmd) RunChecks(skipValidation bool) error

func (*RepsetDiffCmd) SetClusterNodes

func (c *RepsetDiffCmd) SetClusterNodes(cn []map[string]any)

func (*RepsetDiffCmd) SetDBName

func (c *RepsetDiffCmd) SetDBName(name string)

func (*RepsetDiffCmd) SetDatabase

func (c *RepsetDiffCmd) SetDatabase(db types.Database)

func (*RepsetDiffCmd) SetNodeList

func (c *RepsetDiffCmd) SetNodeList(nodes []string)

func (*RepsetDiffCmd) Validate

func (c *RepsetDiffCmd) Validate() error

type SchemaDiffCmd

type SchemaDiffCmd struct {
	types.Task

	ClusterName string
	DBName      string
	SchemaName  string
	Nodes       string
	Quiet       bool
	SkipTables  string
	SkipFile    string
	DDLOnly     bool

	ConnectionPool    *pgxpool.Pool
	ConcurrencyFactor int
	BlockSize         int
	CompareUnitSize   int
	Output            string
	TableFilter       string
	OverrideBlockSize bool
	Ctx               context.Context

	SkipDBUpdate  bool
	TaskStore     *taskstore.Store
	TaskStorePath string
	// contains filtered or unexported fields
}

func NewSchemaDiffTask added in v1.3.5

func NewSchemaDiffTask() *SchemaDiffCmd

func (*SchemaDiffCmd) CloneForSchedule added in v1.3.5

func (task *SchemaDiffCmd) CloneForSchedule(ctx context.Context) *SchemaDiffCmd

func (*SchemaDiffCmd) GetClusterName

func (c *SchemaDiffCmd) GetClusterName() string

func (*SchemaDiffCmd) GetClusterNodes

func (c *SchemaDiffCmd) GetClusterNodes() []map[string]any

func (*SchemaDiffCmd) GetDBName

func (c *SchemaDiffCmd) GetDBName() string

func (*SchemaDiffCmd) GetNodeList

func (c *SchemaDiffCmd) GetNodeList() []string

func (*SchemaDiffCmd) GetNodes

func (c *SchemaDiffCmd) GetNodes() string

func (*SchemaDiffCmd) RunChecks

func (c *SchemaDiffCmd) RunChecks(skipValidation bool) error

func (*SchemaDiffCmd) SchemaTableDiff

func (task *SchemaDiffCmd) SchemaTableDiff() (err error)

func (*SchemaDiffCmd) SetClusterNodes

func (c *SchemaDiffCmd) SetClusterNodes(cn []map[string]any)

func (*SchemaDiffCmd) SetDBName

func (c *SchemaDiffCmd) SetDBName(name string)

func (*SchemaDiffCmd) SetDatabase

func (c *SchemaDiffCmd) SetDatabase(db types.Database)

func (*SchemaDiffCmd) SetNodeList

func (c *SchemaDiffCmd) SetNodeList(nodes []string)

func (*SchemaDiffCmd) Validate

func (c *SchemaDiffCmd) Validate() error

type SchemaObjects

type SchemaObjects struct {
	Tables    []string `json:"tables"`
	Views     []string `json:"views"`
	Functions []string `json:"functions"`
	Indices   []string `json:"indices"`
}

func (SchemaObjects) IsEmpty

func (so SchemaObjects) IsEmpty() bool

type SpockDiffTask

type SpockDiffTask struct {
	types.Task
	types.DerivedFields
	DBName string
	Nodes  string
	Output string

	ClientRole string
	Pools      map[string]*pgxpool.Pool

	DiffResult   *types.SpockDiffOutput
	DiffFilePath string
	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Ctx context.Context
}

SpockDiffTask defines the task for comparing spock metadata across nodes.

func NewSpockDiffTask

func NewSpockDiffTask() *SpockDiffTask

func (*SpockDiffTask) ExecuteTask

func (t *SpockDiffTask) ExecuteTask() (err error)

func (*SpockDiffTask) GetClusterName

func (t *SpockDiffTask) GetClusterName() string

Implement ClusterConfigProvider interface for SpockDiffTask

func (*SpockDiffTask) GetClusterNodes

func (t *SpockDiffTask) GetClusterNodes() []map[string]any

func (*SpockDiffTask) GetDBName

func (t *SpockDiffTask) GetDBName() string

func (*SpockDiffTask) GetNodeList

func (t *SpockDiffTask) GetNodeList() []string

func (*SpockDiffTask) GetNodes

func (t *SpockDiffTask) GetNodes() string

func (*SpockDiffTask) RunChecks

func (t *SpockDiffTask) RunChecks(skipValidation bool) error

func (*SpockDiffTask) SetClusterNodes

func (t *SpockDiffTask) SetClusterNodes(cn []map[string]any)

func (*SpockDiffTask) SetDBName

func (t *SpockDiffTask) SetDBName(name string)

func (*SpockDiffTask) SetDatabase

func (t *SpockDiffTask) SetDatabase(db types.Database)

func (*SpockDiffTask) SetNodeList

func (t *SpockDiffTask) SetNodeList(nl []string)

func (*SpockDiffTask) Validate

func (t *SpockDiffTask) Validate() error

type SpockNodeConfig

type SpockNodeConfig struct {
	NodeName      string                    `json:"node_name"`
	Subscriptions []types.SpockSubscription `json:"subscriptions"`
	RepSetInfo    []types.SpockRepSetInfo   `json:"rep_set_info"`
	Hints         []string                  `json:"hints"`
}

SpockNodeConfig aggregates all spock configuration for a single node.

type TableDiffTask

type TableDiffTask struct {
	types.Task
	types.DerivedFields
	QualifiedTableName string
	DBName             string
	Nodes              string

	BlockSize         int
	ConcurrencyFactor int
	Output            string
	TableFilter       string
	QuietMode         bool

	Mode              string
	OverrideBlockSize bool

	DiffFilePath string

	InvokeMethod string
	ClientRole   string

	DiffSummary map[string]string

	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Pools map[string]*pgxpool.Pool

	CompareUnitSize int

	DiffResult types.DiffOutput

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewTableDiffTask

func NewTableDiffTask() *TableDiffTask

func (*TableDiffTask) AddPrimaryKeyToDiffSummary

func (t *TableDiffTask) AddPrimaryKeyToDiffSummary()

func (*TableDiffTask) CheckColumnSize

func (t *TableDiffTask) CheckColumnSize() error

func (*TableDiffTask) CloneForSchedule added in v1.3.5

func (t *TableDiffTask) CloneForSchedule(ctx context.Context) *TableDiffTask

func (*TableDiffTask) ExecuteRerunTask

func (t *TableDiffTask) ExecuteRerunTask() error

func (*TableDiffTask) ExecuteTask

func (t *TableDiffTask) ExecuteTask() (err error)

func (*TableDiffTask) GetClusterName

func (t *TableDiffTask) GetClusterName() string

Implement ClusterConfigProvider interface for TableDiffTask

func (*TableDiffTask) GetClusterNodes

func (t *TableDiffTask) GetClusterNodes() []map[string]any

func (*TableDiffTask) GetDBName

func (t *TableDiffTask) GetDBName() string

func (*TableDiffTask) GetNodeList

func (t *TableDiffTask) GetNodeList() []string

func (*TableDiffTask) GetNodes

func (t *TableDiffTask) GetNodes() string

func (*TableDiffTask) RunChecks

func (t *TableDiffTask) RunChecks(skipValidation bool) error

func (*TableDiffTask) SetClusterNodes

func (t *TableDiffTask) SetClusterNodes(cn []map[string]any)

func (*TableDiffTask) SetDBName

func (t *TableDiffTask) SetDBName(name string)

func (*TableDiffTask) SetDatabase

func (t *TableDiffTask) SetDatabase(db types.Database)

func (*TableDiffTask) SetNodeList

func (t *TableDiffTask) SetNodeList(nl []string)

func (*TableDiffTask) Validate

func (t *TableDiffTask) Validate() error

type TableRepairTask

type TableRepairTask struct {
	types.Task
	types.DerivedFields

	QualifiedTableName string
	DBName             string
	Nodes              string

	DiffFilePath  string
	SourceOfTruth string

	QuietMode      bool
	DryRun         bool // TBD
	InsertOnly     bool
	UpsertOnly     bool
	FireTriggers   bool
	GenerateReport bool // TBD
	FixNulls       bool // TBD
	Bidirectional  bool // TBD

	InvokeMethod string // TBD
	ClientRole   string // TBD

	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Pools map[string]*pgxpool.Pool

	RawDiffs types.DiffOutput

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewTableRepairTask

func NewTableRepairTask() *TableRepairTask

func (*TableRepairTask) GetClusterName

func (tr *TableRepairTask) GetClusterName() string

Defining these getters and setters to satisfy ClusterConfigProvider interface

func (*TableRepairTask) GetClusterNodes

func (tr *TableRepairTask) GetClusterNodes() []map[string]any

func (*TableRepairTask) GetDBName

func (tr *TableRepairTask) GetDBName() string

func (*TableRepairTask) GetNodeList

func (tr *TableRepairTask) GetNodeList() []string

func (*TableRepairTask) GetNodes

func (tr *TableRepairTask) GetNodes() string

func (*TableRepairTask) Run

func (t *TableRepairTask) Run(skipValidation bool) (err error)

func (*TableRepairTask) SetClusterNodes

func (tr *TableRepairTask) SetClusterNodes(cn []map[string]any)

func (*TableRepairTask) SetDBName

func (tr *TableRepairTask) SetDBName(name string)

func (*TableRepairTask) SetDatabase

func (tr *TableRepairTask) SetDatabase(db types.Database)

func (*TableRepairTask) SetNodeList

func (tr *TableRepairTask) SetNodeList(nl []string)

func (*TableRepairTask) ValidateAndPrepare

func (t *TableRepairTask) ValidateAndPrepare() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL