core

package
v1.4.1 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 | License: PostgreSQL | Imports: 36 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	TempOffset = 1000000
)

Variables

This section is empty.

Functions

func CheckDiffFileFormat

func CheckDiffFileFormat(filePath string, task *TableDiffTask) error

func RepsetDiff

func RepsetDiff(task *RepsetDiffCmd) (err error)

Types

type CompareRangesResult

type CompareRangesResult struct {
	Diffs     map[string]map[string][]map[string]any
	TotalDiff int
	Err       error
}

type CompareRangesWorkItem

type CompareRangesWorkItem struct {
	Node1  map[string]any
	Node2  map[string]any
	Ranges [][2][]any
}

type HashResult

type HashResult struct {
	// contains filtered or unexported fields
}

type HashTask

type HashTask struct {
	// contains filtered or unexported fields
}

type LeafHashResult

type LeafHashResult struct {
	BlockID int64
	Hash    []byte
	Err     error
}

type MerkleTreeTask

type MerkleTreeTask struct {
	types.Task
	types.DerivedFields

	QualifiedTableName string
	DBName             string
	Nodes              string
	ClientRole         string
	InvokeMethod       string

	Analyse           bool
	Rebalance         bool
	RecreateObjects   bool // TBD
	BlockSize         int
	MaxCpuRatio       float64
	BatchSize         int
	Output            string
	QuietMode         bool
	RangesFile        string
	WriteRanges       bool
	OverrideBlockSize bool
	Mode              string
	NoCDC             bool
	SkipDBUpdate      bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	DiffResult types.DiffOutput

	StartTime time.Time

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewMerkleTreeTask

func NewMerkleTreeTask() *MerkleTreeTask

func (*MerkleTreeTask) BuildMtree

func (m *MerkleTreeTask) BuildMtree() (err error)

func (*MerkleTreeTask) CompareRanges

func (m *MerkleTreeTask) CompareRanges(workItems []CompareRangesWorkItem)

func (*MerkleTreeTask) DiffMtree

func (m *MerkleTreeTask) DiffMtree() (err error)

func (*MerkleTreeTask) GetClusterName

func (m *MerkleTreeTask) GetClusterName() string

func (*MerkleTreeTask) GetClusterNodes

func (m *MerkleTreeTask) GetClusterNodes() []map[string]any

func (*MerkleTreeTask) GetDBName

func (m *MerkleTreeTask) GetDBName() string

func (*MerkleTreeTask) GetNode

func (m *MerkleTreeTask) GetNode(nodeName string) (map[string]interface{}, error)

func (*MerkleTreeTask) GetNodeList

func (m *MerkleTreeTask) GetNodeList() []string

func (*MerkleTreeTask) GetNodes

func (m *MerkleTreeTask) GetNodes() string

func (*MerkleTreeTask) MtreeInit

func (m *MerkleTreeTask) MtreeInit() (err error)

func (*MerkleTreeTask) MtreeTeardown

func (m *MerkleTreeTask) MtreeTeardown() (err error)

func (*MerkleTreeTask) MtreeTeardownTable

func (m *MerkleTreeTask) MtreeTeardownTable() (err error)

func (*MerkleTreeTask) RunChecks

func (m *MerkleTreeTask) RunChecks(skipValidation bool) error

func (*MerkleTreeTask) SetClusterNodes

func (m *MerkleTreeTask) SetClusterNodes(cn []map[string]any)

func (*MerkleTreeTask) SetDBName

func (m *MerkleTreeTask) SetDBName(name string)

func (*MerkleTreeTask) SetDatabase

func (m *MerkleTreeTask) SetDatabase(db types.Database)

func (*MerkleTreeTask) SetNodeList

func (m *MerkleTreeTask) SetNodeList(nl []string)

func (*MerkleTreeTask) UpdateMtree

func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error)

func (*MerkleTreeTask) Validate

func (m *MerkleTreeTask) Validate() error

type NodeComparisonReport

type NodeComparisonReport struct {
	Status string              `json:"status"`
	Diffs  map[string]NodeDiff `json:"diffs,omitempty"`
}

type NodeDiff

type NodeDiff struct {
	MissingObjects SchemaObjects `json:"missing_objects"`
	ExtraObjects   SchemaObjects `json:"extra_objects"`
}

type NodeSchemaReport

type NodeSchemaReport struct {
	NodeName string        `json:"node_name"`
	Objects  SchemaObjects `json:"objects"`
}

type Range

type Range struct {
	Start any
	End   any
}

type RangeResults

type RangeResults map[string]HashResult

type RecursiveDiffTask

type RecursiveDiffTask struct {
	Node1Name                 string
	Node2Name                 string
	CurrentRange              Range
	CurrentEstimatedBlockSize int
}

type RepairReport

type RepairReport struct {
	OperationType           string         `json:"operation_type"`
	Mode                    string         `json:"mode"`
	Timestamp               string         `json:"time_stamp"`
	SuppliedArgs            map[string]any `json:"supplied_args"`
	DatabaseCredentialsUsed types.Database `json:"database_credentials_used"`
	Changes                 map[string]any `json:"changes"`
	RunTimeSeconds          float64        `json:"run_time,omitempty"`
}

type RepsetDiffCmd

type RepsetDiffCmd struct {
	types.Task

	ClusterName string
	DBName      string
	RepsetName  string
	Nodes       string
	Quiet       bool
	SkipTables  string
	SkipFile    string

	ConnectionPool    *pgxpool.Pool
	ConcurrencyFactor int
	BlockSize         int
	CompareUnitSize   int
	Output            string
	TableFilter       string
	OverrideBlockSize bool
	Ctx               context.Context

	ClientRole   string
	InvokeMethod string

	SkipDBUpdate  bool
	TaskStore     *taskstore.Store
	TaskStorePath string
	// contains filtered or unexported fields
}

func NewRepsetDiffTask (added in v1.3.5)

func NewRepsetDiffTask() *RepsetDiffCmd

func (*RepsetDiffCmd) CloneForSchedule (added in v1.3.5)

func (task *RepsetDiffCmd) CloneForSchedule(ctx context.Context) *RepsetDiffCmd

func (*RepsetDiffCmd) GetClusterName

func (c *RepsetDiffCmd) GetClusterName() string

func (*RepsetDiffCmd) GetClusterNodes

func (c *RepsetDiffCmd) GetClusterNodes() []map[string]any

func (*RepsetDiffCmd) GetDBName

func (c *RepsetDiffCmd) GetDBName() string

func (*RepsetDiffCmd) GetNodeList

func (c *RepsetDiffCmd) GetNodeList() []string

func (*RepsetDiffCmd) GetNodes

func (c *RepsetDiffCmd) GetNodes() string

func (*RepsetDiffCmd) RunChecks

func (c *RepsetDiffCmd) RunChecks(skipValidation bool) error

func (*RepsetDiffCmd) SetClusterNodes

func (c *RepsetDiffCmd) SetClusterNodes(cn []map[string]any)

func (*RepsetDiffCmd) SetDBName

func (c *RepsetDiffCmd) SetDBName(name string)

func (*RepsetDiffCmd) SetDatabase

func (c *RepsetDiffCmd) SetDatabase(db types.Database)

func (*RepsetDiffCmd) SetNodeList

func (c *RepsetDiffCmd) SetNodeList(nodes []string)

func (*RepsetDiffCmd) Validate

func (c *RepsetDiffCmd) Validate() error

type SchemaDiffCmd

type SchemaDiffCmd struct {
	types.Task

	ClusterName string
	DBName      string
	SchemaName  string
	Nodes       string
	Quiet       bool
	SkipTables  string
	SkipFile    string
	DDLOnly     bool

	ConnectionPool    *pgxpool.Pool
	ConcurrencyFactor int
	BlockSize         int
	CompareUnitSize   int
	Output            string
	TableFilter       string
	OverrideBlockSize bool
	Ctx               context.Context

	SkipDBUpdate  bool
	TaskStore     *taskstore.Store
	TaskStorePath string
	// contains filtered or unexported fields
}

func NewSchemaDiffTask (added in v1.3.5)

func NewSchemaDiffTask() *SchemaDiffCmd

func (*SchemaDiffCmd) CloneForSchedule (added in v1.3.5)

func (task *SchemaDiffCmd) CloneForSchedule(ctx context.Context) *SchemaDiffCmd

func (*SchemaDiffCmd) GetClusterName

func (c *SchemaDiffCmd) GetClusterName() string

func (*SchemaDiffCmd) GetClusterNodes

func (c *SchemaDiffCmd) GetClusterNodes() []map[string]any

func (*SchemaDiffCmd) GetDBName

func (c *SchemaDiffCmd) GetDBName() string

func (*SchemaDiffCmd) GetNodeList

func (c *SchemaDiffCmd) GetNodeList() []string

func (*SchemaDiffCmd) GetNodes

func (c *SchemaDiffCmd) GetNodes() string

func (*SchemaDiffCmd) RunChecks

func (c *SchemaDiffCmd) RunChecks(skipValidation bool) error

func (*SchemaDiffCmd) SchemaTableDiff

func (task *SchemaDiffCmd) SchemaTableDiff() (err error)

func (*SchemaDiffCmd) SetClusterNodes

func (c *SchemaDiffCmd) SetClusterNodes(cn []map[string]any)

func (*SchemaDiffCmd) SetDBName

func (c *SchemaDiffCmd) SetDBName(name string)

func (*SchemaDiffCmd) SetDatabase

func (c *SchemaDiffCmd) SetDatabase(db types.Database)

func (*SchemaDiffCmd) SetNodeList

func (c *SchemaDiffCmd) SetNodeList(nodes []string)

func (*SchemaDiffCmd) Validate

func (c *SchemaDiffCmd) Validate() error

type SchemaObjects

type SchemaObjects struct {
	Tables    []string `json:"tables"`
	Views     []string `json:"views"`
	Functions []string `json:"functions"`
	Indices   []string `json:"indices"`
}

func (SchemaObjects) IsEmpty

func (so SchemaObjects) IsEmpty() bool

type SpockDiffTask

type SpockDiffTask struct {
	types.Task
	types.DerivedFields
	DBName string
	Nodes  string
	Output string

	ClientRole   string
	InvokeMethod string
	Pools        map[string]*pgxpool.Pool

	DiffResult   *types.SpockDiffOutput
	DiffFilePath string
	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Ctx context.Context
}

SpockDiffTask defines the task for comparing spock metadata across nodes.

func NewSpockDiffTask

func NewSpockDiffTask() *SpockDiffTask

func (*SpockDiffTask) ExecuteTask

func (t *SpockDiffTask) ExecuteTask() (err error)

func (*SpockDiffTask) GetClusterName

func (t *SpockDiffTask) GetClusterName() string

GetClusterName is part of SpockDiffTask's implementation of the ClusterConfigProvider interface.

func (*SpockDiffTask) GetClusterNodes

func (t *SpockDiffTask) GetClusterNodes() []map[string]any

func (*SpockDiffTask) GetDBName

func (t *SpockDiffTask) GetDBName() string

func (*SpockDiffTask) GetNodeList

func (t *SpockDiffTask) GetNodeList() []string

func (*SpockDiffTask) GetNodes

func (t *SpockDiffTask) GetNodes() string

func (*SpockDiffTask) RunChecks

func (t *SpockDiffTask) RunChecks(skipValidation bool) error

func (*SpockDiffTask) SetClusterNodes

func (t *SpockDiffTask) SetClusterNodes(cn []map[string]any)

func (*SpockDiffTask) SetDBName

func (t *SpockDiffTask) SetDBName(name string)

func (*SpockDiffTask) SetDatabase

func (t *SpockDiffTask) SetDatabase(db types.Database)

func (*SpockDiffTask) SetNodeList

func (t *SpockDiffTask) SetNodeList(nl []string)

func (*SpockDiffTask) Validate

func (t *SpockDiffTask) Validate() error

type SpockNodeConfig

type SpockNodeConfig struct {
	NodeName      string                    `json:"node_name"`
	Subscriptions []types.SpockSubscription `json:"subscriptions"`
	RepSetInfo    []types.SpockRepSetInfo   `json:"rep_set_info"`
	Hints         []string                  `json:"hints"`
}

SpockNodeConfig aggregates all spock configuration for a single node.

type TableDiffTask

type TableDiffTask struct {
	types.Task
	types.DerivedFields
	QualifiedTableName string
	DBName             string
	Nodes              string

	BaseTable           string
	FilteredViewName    string
	FilteredViewCreated bool

	BlockSize         int
	ConcurrencyFactor int
	Output            string
	TableFilter       string
	QuietMode         bool

	Mode              string
	OverrideBlockSize bool

	DiffFilePath string

	InvokeMethod string
	ClientRole   string

	DiffSummary map[string]string

	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Pools map[string]*pgxpool.Pool

	CompareUnitSize int
	MaxDiffRows     int64

	DiffResult types.DiffOutput

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewTableDiffTask

func NewTableDiffTask() *TableDiffTask

func (*TableDiffTask) AddPrimaryKeyToDiffSummary

func (t *TableDiffTask) AddPrimaryKeyToDiffSummary()

func (*TableDiffTask) CheckColumnSize

func (t *TableDiffTask) CheckColumnSize() error

func (*TableDiffTask) CloneForSchedule (added in v1.3.5)

func (t *TableDiffTask) CloneForSchedule(ctx context.Context) *TableDiffTask

func (*TableDiffTask) ExecuteRerunTask

func (t *TableDiffTask) ExecuteRerunTask() error

func (*TableDiffTask) ExecuteTask

func (t *TableDiffTask) ExecuteTask() (err error)

func (*TableDiffTask) GetClusterName

func (t *TableDiffTask) GetClusterName() string

GetClusterName is part of TableDiffTask's implementation of the ClusterConfigProvider interface.

func (*TableDiffTask) GetClusterNodes

func (t *TableDiffTask) GetClusterNodes() []map[string]any

func (*TableDiffTask) GetDBName

func (t *TableDiffTask) GetDBName() string

func (*TableDiffTask) GetNodeList

func (t *TableDiffTask) GetNodeList() []string

func (*TableDiffTask) GetNodes

func (t *TableDiffTask) GetNodes() string

func (*TableDiffTask) RunChecks

func (t *TableDiffTask) RunChecks(skipValidation bool) (err error)

func (*TableDiffTask) SetClusterNodes

func (t *TableDiffTask) SetClusterNodes(cn []map[string]any)

func (*TableDiffTask) SetDBName

func (t *TableDiffTask) SetDBName(name string)

func (*TableDiffTask) SetDatabase

func (t *TableDiffTask) SetDatabase(db types.Database)

func (*TableDiffTask) SetNodeList

func (t *TableDiffTask) SetNodeList(nl []string)

func (*TableDiffTask) Validate

func (t *TableDiffTask) Validate() error

type TableRepairTask

type TableRepairTask struct {
	types.Task
	types.DerivedFields

	QualifiedTableName string
	DBName             string
	Nodes              string

	DiffFilePath  string
	SourceOfTruth string

	QuietMode      bool
	DryRun         bool
	InsertOnly     bool
	UpsertOnly     bool
	FireTriggers   bool
	GenerateReport bool
	FixNulls       bool // TBD
	Bidirectional  bool

	InvokeMethod string // TBD
	ClientRole   string // TBD

	SkipDBUpdate bool

	TaskStore     *taskstore.Store
	TaskStorePath string

	Pools map[string]*pgxpool.Pool

	RawDiffs types.DiffOutput

	Ctx context.Context
	// contains filtered or unexported fields
}

func NewTableRepairTask

func NewTableRepairTask() *TableRepairTask

func (*TableRepairTask) GetClusterName

func (tr *TableRepairTask) GetClusterName() string

GetClusterName, together with the other getters and setters on TableRepairTask, is defined to satisfy the ClusterConfigProvider interface.

func (*TableRepairTask) GetClusterNodes

func (tr *TableRepairTask) GetClusterNodes() []map[string]any

func (*TableRepairTask) GetDBName

func (tr *TableRepairTask) GetDBName() string

func (*TableRepairTask) GetNodeList

func (tr *TableRepairTask) GetNodeList() []string

func (*TableRepairTask) GetNodes

func (tr *TableRepairTask) GetNodes() string

func (*TableRepairTask) Run

func (t *TableRepairTask) Run(skipValidation bool) (err error)

func (*TableRepairTask) SetClusterNodes

func (tr *TableRepairTask) SetClusterNodes(cn []map[string]any)

func (*TableRepairTask) SetDBName

func (tr *TableRepairTask) SetDBName(name string)

func (*TableRepairTask) SetDatabase

func (tr *TableRepairTask) SetDatabase(db types.Database)

func (*TableRepairTask) SetNodeList

func (tr *TableRepairTask) SetNodeList(nl []string)

func (*TableRepairTask) ValidateAndPrepare

func (t *TableRepairTask) ValidateAndPrepare() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL