connpostgres

package
v0.0.0-...-c6ec721 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2026 License: AGPL-3.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultSlotPrefix = "peerflow_slot_"

Variables

View Source
var ErrMismatchingRangeType = errors.New("mismatching range type")

Functions

func BuildQuery

func BuildQuery(logger log.Logger, query string, flowJobName string) (string, error)

func CTIDBlockPartitioningFunc

func CTIDBlockPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)

CTIDBlockPartitioningFunc is a table partition strategy where partitions are created by dividing table blocks uniformly. Note that partition boundaries (block ranges) are uniform, but actual row distribution may be skewed due to table bloat, deleted tuples, or uneven data distribution across blocks.

func ComputeNumPartitions

func ComputeNumPartitions(ctx context.Context, pp PartitionParams, numRowsPerPartition int64) (int64, error)

ComputeNumPartitions computes the number of partitions given the desired number of rows per partition, with automatic adjustment to respect the maximum partition limit. TODO: use an estimated row count instead to speed up query execution on large tables.

func CustomTypeToQKind

func CustomTypeToQKind(typeData shared.CustomDataType, version uint32) types.QValueKind

func GetDefaultPublicationName

func GetDefaultPublicationName(jobName string) string

func GetDefaultSlotName

func GetDefaultSlotName(jobName string) string

func MinMaxRangePartitioningFunc

func MinMaxRangePartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)

MinMaxRangePartitioningFunc is a table partition strategy where partitions are created by uniformly splitting the min/max value range. Note that partition boundaries are uniform, but actual row distribution may be skewed due to non-uniform data distribution, gaps in the value range, or deleted rows.

func NTileBucketPartitioningFunc

func NTileBucketPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)

NTileBucketPartitioningFunc is a table partition implementation that divides rows into approximately equal-sized partitions based on row count. It uses the NTILE window function to assign rows to buckets and ensures more balanced row distribution across partitions.

func NewPgCopyPipe

func NewPgCopyPipe() (PgCopyReader, PgCopyWriter)

func NewPostgresConnFromConfig

func NewPostgresConnFromConfig(
	ctx context.Context,
	connConfig *pgx.ConnConfig,
	tlsHost string,
	rdsAuth *utils.RDSAuth,
	tunnel *utils.SSHTunnel,
) (*pgx.Conn, error)

func ParseConfig

func ParseConfig(connectionString string, pgConfig *protos.PostgresConfig) (*pgx.ConnConfig, error)

func PostgresOIDToQValueKind

func PostgresOIDToQValueKind(
	recvOID uint32,
	customTypeMapping map[uint32]shared.CustomDataType,
	typeMap *pgtype.Map,
	version uint32,
) (types.QValueKind, error)

func PullCdcRecords

func PullCdcRecords[Items model.Items](
	ctx context.Context,
	p *PostgresCDCSource,
	req *model.PullRecordsRequest[Items],
	processor replProcessor[Items],
	replLock *sync.Mutex,
) error

PullCdcRecords pulls records from req's CDC stream.

Types

type NullableLSN

type NullableLSN struct {
	pglogrepl.LSN
	Null bool
}

type PartitionParams

type PartitionParams struct {
	// contains filtered or unexported fields
}

type PartitioningFunc

type PartitioningFunc func(context.Context, PartitionParams) ([]*protos.QRepPartition, error)

type PgCopyReader

type PgCopyReader struct {
	*io.PipeReader
	// contains filtered or unexported fields
}

func (PgCopyReader) CopyInto

func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error)

func (PgCopyReader) GetColumnNames

func (p PgCopyReader) GetColumnNames() ([]string, error)

type PgCopyShared

type PgCopyShared struct {
	// contains filtered or unexported fields
}

type PgCopyWriter

type PgCopyWriter struct {
	*io.PipeWriter
	// contains filtered or unexported fields
}

func (PgCopyWriter) Close

func (p PgCopyWriter) Close(err error)

func (PgCopyWriter) ExecuteQueryWithTx

func (p PgCopyWriter) ExecuteQueryWithTx(
	ctx context.Context,
	qe *QRepQueryExecutor,
	tx pgx.Tx,
	query string,
	args ...any,
) (int64, int64, error)

func (PgCopyWriter) SetSchema

func (p PgCopyWriter) SetSchema(schema []string)

type PostgresCDCConfig

type PostgresCDCConfig struct {
	CatalogPool                              shared.CatalogPool
	OtelManager                              *otel_metrics.OtelManager
	SrcTableIDNameMapping                    map[uint32]string
	TableNameMapping                         map[string]model.NameAndExclude
	TableNameSchemaMapping                   map[string]*protos.TableSchema
	RelationMessageMapping                   model.RelationMessageMapping
	FlowJobName                              string
	Slot                                     string
	Publication                              string
	HandleInheritanceForNonPartitionedTables bool
	SourceSchemaAsDestinationColumn          bool
	OriginMetaAsDestinationColumn            bool
	InternalVersion                          uint32
}

type PostgresCDCSource

type PostgresCDCSource struct {
	*PostgresConnector
	// contains filtered or unexported fields
}

type PostgresConnector

type PostgresConnector struct {
	Config *protos.PostgresConfig
	// contains filtered or unexported fields
}

func NewPostgresConnector

func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error)

func (*PostgresConnector) AddTablesToPublication

func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error

func (*PostgresConnector) CheckPublicationCreationPermissions

func (c *PostgresConnector) CheckPublicationCreationPermissions(ctx context.Context, srcTableNames []string) error

func (*PostgresConnector) CheckReplicationConnectivity

func (c *PostgresConnector) CheckReplicationConnectivity(ctx context.Context, env map[string]string) error

func (*PostgresConnector) CheckReplicationPermissions

func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error

func (*PostgresConnector) CheckSourceTables

func (c *PostgresConnector) CheckSourceTables(
	ctx context.Context,
	tableNames []*common.QualifiedTable,
	tableMappings []*protos.TableMapping,
	pubName string,
	noCDC bool,
) error

func (*PostgresConnector) CleanupSetupNormalizedTables

func (c *PostgresConnector) CleanupSetupNormalizedTables(ctx context.Context, tx any)

func (*PostgresConnector) Close

func (c *PostgresConnector) Close() error

Close closes all connections.

func (*PostgresConnector) Conn

func (c *PostgresConnector) Conn() *pgx.Conn

func (*PostgresConnector) ConnectionActive

func (c *PostgresConnector) ConnectionActive(ctx context.Context) error

ConnectionActive returns nil if the connection is active.

func (*PostgresConnector) CreatePublication

func (c *PostgresConnector) CreatePublication(
	ctx context.Context,
	srcTableNames []string,
	publication string,
) error

func (*PostgresConnector) CreateRawTable

CreateRawTable creates a raw table, implementing the Connector interface.

func (*PostgresConnector) CreateReplConn

func (c *PostgresConnector) CreateReplConn(ctx context.Context, env map[string]string) (*pgx.Conn, error)

func (*PostgresConnector) EnsurePullability

EnsurePullability ensures that a table is pullable, implementing the Connector interface.

func (*PostgresConnector) ExecuteCommand

func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error

func (*PostgresConnector) ExportTxSnapshot

func (c *PostgresConnector) ExportTxSnapshot(
	ctx context.Context,
	_ string,
	env map[string]string,
) (*protos.ExportTxSnapshotOutput, any, error)

func (*PostgresConnector) FinishExport

func (c *PostgresConnector) FinishExport(tx any) error

func (*PostgresConnector) FinishSetupNormalizedTables

func (c *PostgresConnector) FinishSetupNormalizedTables(ctx context.Context, tx any) error

func (*PostgresConnector) GetAllTables

func (*PostgresConnector) GetColumns

func (c *PostgresConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error)

func (*PostgresConnector) GetDatabaseVariant

func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)

func (*PostgresConnector) GetLastNormalizeBatchID

func (c *PostgresConnector) GetLastNormalizeBatchID(ctx context.Context, jobName string) (int64, error)

func (*PostgresConnector) GetLastOffset

func (c *PostgresConnector) GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)

GetLastOffset returns the last synced offset for a job.

func (*PostgresConnector) GetLastSyncBatchID

func (c *PostgresConnector) GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error)

func (*PostgresConnector) GetMaxValue

func (c *PostgresConnector) GetMaxValue(
	ctx context.Context,
	config *protos.QRepConfig,
	last *protos.QRepPartition,
) (any, error)

func (*PostgresConnector) GetQRepPartitions

func (c *PostgresConnector) GetQRepPartitions(
	ctx context.Context,
	config *protos.QRepConfig,
	last *protos.QRepPartition,
) ([]*protos.QRepPartition, error)

func (*PostgresConnector) GetSchemaNameOfColumnTypeByOID

func (c *PostgresConnector) GetSchemaNameOfColumnTypeByOID(ctx context.Context, typeOIDs []uint32) (map[uint32]string, error)

GetSchemaNameOfColumnTypeByOID returns a map of type OID to schema name for the given OIDs. If the type is in the "pg_catalog" schema, it returns an empty string for that OID.

func (*PostgresConnector) GetSchemas

func (*PostgresConnector) GetSelectedColumns

func (c *PostgresConnector) GetSelectedColumns(
	ctx context.Context,
	sourceTable *common.QualifiedTable,
	excludedColumns []string,
) ([]string, error)

func (*PostgresConnector) GetSlotInfo

func (c *PostgresConnector) GetSlotInfo(
	ctx context.Context,
	slotName string,
	peerdbManagedOnly bool,
	customSlotNames []string,
) ([]*protos.SlotInfo, error)

GetSlotInfo gets the information about the replication slot size and LSNs. If slotName is non-empty, only that slot is returned. If slotName is empty and peerdbManagedOnly is false, all slots in the database are returned. If slotName is empty and peerdbManagedOnly is true, only slots with the peerflow_slot_ prefix (plus any explicitly listed customSlotNames) are returned.

func (*PostgresConnector) GetTableSchema

func (c *PostgresConnector) GetTableSchema(
	ctx context.Context,
	env map[string]string,
	version uint32,
	system protos.TypeSystem,
	tableMapping []*protos.TableMapping,
) (map[string]*protos.TableSchema, error)

func (*PostgresConnector) GetTableSizeEstimatedBytes

func (c *PostgresConnector) GetTableSizeEstimatedBytes(ctx context.Context, tableIdentifier string) (int64, error)

func (*PostgresConnector) GetTablesFromPublication

func (c *PostgresConnector) GetTablesFromPublication(
	ctx context.Context,
	publicationName string,
	excludedTables []*common.QualifiedTable,
) ([]*common.QualifiedTable, error)

func (*PostgresConnector) GetTablesInSchema

func (c *PostgresConnector) GetTablesInSchema(
	ctx context.Context, schema string, cdcEnabled bool,
) (*protos.SchemaTablesResponse, error)

func (*PostgresConnector) GetVersion

func (c *PostgresConnector) GetVersion(ctx context.Context) (string, error)

func (*PostgresConnector) HandleSlotInfo

func (c *PostgresConnector) HandleSlotInfo(
	ctx context.Context,
	alerter *alerting.Alerter,
	catalogPool shared.CatalogPool,
	alertKeys *alerting.AlertKeys,
	slotMetricGauges otel_metrics.SlotMetricGauges,
) error

func (*PostgresConnector) IsInRecovery

func (c *PostgresConnector) IsInRecovery(ctx context.Context) (bool, error)

func (*PostgresConnector) IsQRepPartitionSynced

func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,
	req *protos.IsQRepPartitionSyncedInput,
) (bool, error)

IsQRepPartitionSynced checks whether a specific partition is synced.

func (*PostgresConnector) MajorVersion

func (c *PostgresConnector) MajorVersion(ctx context.Context) (shared.PGVersion, error)

func (*PostgresConnector) MaybeStartReplication

func (c *PostgresConnector) MaybeStartReplication(
	ctx context.Context,
	slotName string,
	publicationName string,
	lastOffset int64,
	pgVersion shared.PGVersion,
) error

func (*PostgresConnector) NeedsSetupMetadataTables

func (c *PostgresConnector) NeedsSetupMetadataTables(ctx context.Context) (bool, error)

NeedsSetupMetadataTables returns true if the metadata tables need to be set up.

func (*PostgresConnector) NewPostgresCDCSource

func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error)

NewPostgresCDCSource creates a new PostgresCDCSource.

func (*PostgresConnector) NewQRepQueryExecutor

func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32,
	flowJobName string, partitionID string,
) (*QRepQueryExecutor, error)

func (*PostgresConnector) NewQRepQueryExecutorSnapshot

func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32,
	snapshot string, flowJobName string, partitionID string,
) (*QRepQueryExecutor, error)

func (*PostgresConnector) NormalizeRecords

func (*PostgresConnector) PullFlowCleanup

func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error

func (*PostgresConnector) PullPg

func (*PostgresConnector) PullPgQRepRecords

func (c *PostgresConnector) PullPgQRepRecords(
	ctx context.Context,
	_otelManager *otel_metrics.OtelManager,
	config *protos.QRepConfig,
	_dstType protos.DBType,
	partition *protos.QRepPartition,
	stream PgCopyWriter,
) (int64, int64, error)

func (*PostgresConnector) PullQRepRecords

func (c *PostgresConnector) PullQRepRecords(
	ctx context.Context,
	_otelManager *otel_metrics.OtelManager,
	config *protos.QRepConfig,
	dstType protos.DBType,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int64, int64, error)

func (*PostgresConnector) PullRecords

func (c *PostgresConnector) PullRecords(
	ctx context.Context,
	catalogPool shared.CatalogPool,
	otelManager *otel_metrics.OtelManager,
	req *model.PullRecordsRequest[model.RecordItems],
) error

func (*PostgresConnector) PullXminPgRecordStream

func (c *PostgresConnector) PullXminPgRecordStream(
	ctx context.Context,
	config *protos.QRepConfig,
	_dstType protos.DBType,
	partition *protos.QRepPartition,
	pipe PgCopyWriter,
) (int64, int64, int64, error)

func (*PostgresConnector) PullXminRecordStream

func (c *PostgresConnector) PullXminRecordStream(
	ctx context.Context,
	config *protos.QRepConfig,
	dstType protos.DBType,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int64, int64, int64, error)

func (*PostgresConnector) RemoveTableEntriesFromRawTable

func (c *PostgresConnector) RemoveTableEntriesFromRawTable(
	ctx context.Context,
	req *protos.RemoveTablesFromRawTableInput,
) error

func (*PostgresConnector) RemoveTablesFromPublication

func (c *PostgresConnector) RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error

func (*PostgresConnector) RenameTables

func (c *PostgresConnector) RenameTables(
	ctx context.Context,
	req *protos.RenameTablesInput,
	tableNameSchemaMapping map[string]*protos.TableSchema,
) (*protos.RenameTablesOutput, error)

func (*PostgresConnector) ReplPing

func (c *PostgresConnector) ReplPing(ctx context.Context) error

ReplPing keeps the connection alive between sync batches. By default, Postgres drops the connection after 1 minute of inactivity.

func (*PostgresConnector) ReplayTableSchemaDeltas

func (c *PostgresConnector) ReplayTableSchemaDeltas(
	ctx context.Context,
	_ map[string]string,
	flowJobName string,
	_ []*protos.TableMapping,
	schemaDeltas []*protos.TableSchemaDelta,
	_ []string,
) error

ReplayTableSchemaDeltas changes a destination table to match the schema at the source. This could involve adding or dropping multiple columns.

func (*PostgresConnector) SetLastOffset

func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error

SetLastOffset updates the last synced offset for a job.

func (*PostgresConnector) SetupMetadataTables

func (c *PostgresConnector) SetupMetadataTables(ctx context.Context) error

SetupMetadataTables sets up the metadata tables.

func (*PostgresConnector) SetupNormalizedTable

func (c *PostgresConnector) SetupNormalizedTable(
	ctx context.Context,
	tx any,
	config *protos.SetupNormalizedTableBatchInput,
	tableIdentifier string,
	tableSchema *protos.TableSchema,
) (bool, error)

func (*PostgresConnector) SetupQRepMetadataTables

func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error

SetupQRepMetadataTables sets up the QRep metadata tables for the Postgres connector.

func (*PostgresConnector) SetupReplConn

func (c *PostgresConnector) SetupReplConn(ctx context.Context, env map[string]string) error

func (*PostgresConnector) SetupReplication

SetupReplication sets up replication for the source connector.

func (*PostgresConnector) StartSetupNormalizedTables

func (c *PostgresConnector) StartSetupNormalizedTables(ctx context.Context) (any, error)

func (*PostgresConnector) StatActivity

func (*PostgresConnector) SyncFlowCleanup

func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) error

func (*PostgresConnector) SyncPg

func (*PostgresConnector) SyncPgQRepRecords

func (c *PostgresConnector) SyncPgQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	pipe PgCopyReader,
) (int64, shared.QRepWarnings, error)

func (*PostgresConnector) SyncQRepRecords

func (c *PostgresConnector) SyncQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int64, shared.QRepWarnings, error)

func (*PostgresConnector) SyncRecords

func (*PostgresConnector) UpdateReplStateLastOffset

func (c *PostgresConnector) UpdateReplStateLastOffset(_ context.Context, lastOffset model.CdcCheckpoint) error

func (*PostgresConnector) ValidateCheck

func (c *PostgresConnector) ValidateCheck(ctx context.Context) error

func (*PostgresConnector) ValidateMirrorSource

func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error

type QRepPullSink

type QRepPullSink interface {
	ExecuteQueryWithTx(context.Context, *QRepQueryExecutor, pgx.Tx, string, ...any) (int64, int64, error)
}

type QRepQueryExecutor

type QRepQueryExecutor struct {
	*PostgresConnector
	// contains filtered or unexported fields
}

func (*QRepQueryExecutor) ExecuteAndProcessQuery

func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
	ctx context.Context,
	query string,
	args ...any,
) (*model.QRecordBatch, error)

func (*QRepQueryExecutor) ExecuteAndProcessQueryStream

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
	ctx context.Context,
	stream *model.QRecordStream,
	dstType protos.DBType,
	query string,
	args ...any,
) (int64, int64, error)

func (*QRepQueryExecutor) ExecuteQuery

func (qe *QRepQueryExecutor) ExecuteQuery(ctx context.Context, query string, args ...any) (pgx.Rows, error)

func (*QRepQueryExecutor) ExecuteQueryIntoSink

func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
	ctx context.Context,
	sink QRepPullSink,
	query string,
	args ...any,
) (int64, int64, error)

func (*QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin

func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
	ctx context.Context,
	sink QRepPullSink,
	query string,
	args ...any,
) (int64, int64, int64, error)

type QRepSyncSink

type QRepSyncSink interface {
	GetColumnNames() ([]string, error)
	CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
}

type RecordStreamSink

type RecordStreamSink struct {
	*model.QRecordStream
	DestinationType protos.DBType
}

func (RecordStreamSink) CopyInto

func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error)

func (RecordStreamSink) ExecuteQueryWithTx

func (stream RecordStreamSink) ExecuteQueryWithTx(
	ctx context.Context,
	qe *QRepQueryExecutor,
	tx pgx.Tx,
	query string,
	args ...any,
) (int64, int64, error)

func (RecordStreamSink) GetColumnNames

func (stream RecordStreamSink) GetColumnNames() ([]string, error)

type RelaxedNumberExtension

type RelaxedNumberExtension struct {
	jsoniter.DummyExtension
}

func (*RelaxedNumberExtension) CreateDecoder

func (extension *RelaxedNumberExtension) CreateDecoder(typ reflect2.Type) jsoniter.ValDecoder

type ReplState

type ReplState struct {
	Slot        string
	Publication string
	Offset      int64
	LastOffset  atomic.Int64
}

type ReplicaIdentityType

type ReplicaIdentityType rune
const (
	ReplicaIdentityDefault ReplicaIdentityType = 'd'
	ReplicaIdentityFull    ReplicaIdentityType = 'f'
	ReplicaIdentityIndex   ReplicaIdentityType = 'i'
	ReplicaIdentityNothing ReplicaIdentityType = 'n'
)

type SlotCheckResult

type SlotCheckResult struct {
	SlotExists        bool
	PublicationExists bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL