Documentation
¶
Index ¶
- Constants
- Variables
- func BuildQuery(logger log.Logger, query string, flowJobName string) (string, error)
- func CTIDBlockPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
- func ComputeNumPartitions(ctx context.Context, pp PartitionParams, numRowsPerPartition int64) (int64, error)
- func CustomTypeToQKind(typeData shared.CustomDataType, version uint32) types.QValueKind
- func GetDefaultPublicationName(jobName string) string
- func GetDefaultSlotName(jobName string) string
- func MinMaxRangePartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
- func NTileBucketPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
- func NewPgCopyPipe() (PgCopyReader, PgCopyWriter)
- func NewPostgresConnFromConfig(ctx context.Context, connConfig *pgx.ConnConfig, tlsHost string, ...) (*pgx.Conn, error)
- func ParseConfig(connectionString string, pgConfig *protos.PostgresConfig) (*pgx.ConnConfig, error)
- func PostgresOIDToQValueKind(recvOID uint32, customTypeMapping map[uint32]shared.CustomDataType, ...) (types.QValueKind, error)
- func PullCdcRecords[Items model.Items](ctx context.Context, p *PostgresCDCSource, ...) error
- type NullableLSN
- type PartitionParams
- type PartitioningFunc
- type PgCopyReader
- type PgCopyShared
- type PgCopyWriter
- type PostgresCDCConfig
- type PostgresCDCSource
- type PostgresConnector
- func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error
- func (c *PostgresConnector) CheckPublicationCreationPermissions(ctx context.Context, srcTableNames []string) error
- func (c *PostgresConnector) CheckReplicationConnectivity(ctx context.Context, env map[string]string) error
- func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error
- func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []*common.QualifiedTable, ...) error
- func (c *PostgresConnector) CleanupSetupNormalizedTables(ctx context.Context, tx any)
- func (c *PostgresConnector) Close() error
- func (c *PostgresConnector) Conn() *pgx.Conn
- func (c *PostgresConnector) ConnectionActive(ctx context.Context) error
- func (c *PostgresConnector) CreatePublication(ctx context.Context, srcTableNames []string, publication string) error
- func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
- func (c *PostgresConnector) CreateReplConn(ctx context.Context, env map[string]string) (*pgx.Conn, error)
- func (c *PostgresConnector) EnsurePullability(ctx context.Context, req *protos.EnsurePullabilityBatchInput) (*protos.EnsurePullabilityBatchOutput, error)
- func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error
- func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context, _ string, env map[string]string) (*protos.ExportTxSnapshotOutput, any, error)
- func (c *PostgresConnector) FinishExport(tx any) error
- func (c *PostgresConnector) FinishSetupNormalizedTables(ctx context.Context, tx any) error
- func (c *PostgresConnector) GetAllTables(ctx context.Context) (*protos.AllTablesResponse, error)
- func (c *PostgresConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error)
- func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
- func (c *PostgresConnector) GetDefaultPartitionKeyForTables(ctx context.Context, input *protos.GetDefaultPartitionKeyForTablesInput) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
- func (c *PostgresConnector) GetLastNormalizeBatchID(ctx context.Context, jobName string) (int64, error)
- func (c *PostgresConnector) GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)
- func (c *PostgresConnector) GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error)
- func (c *PostgresConnector) GetMaxValue(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) (any, error)
- func (c *PostgresConnector) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)
- func (c *PostgresConnector) GetSchemaNameOfColumnTypeByOID(ctx context.Context, typeOIDs []uint32) (map[uint32]string, error)
- func (c *PostgresConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasResponse, error)
- func (c *PostgresConnector) GetSelectedColumns(ctx context.Context, sourceTable *common.QualifiedTable, ...) ([]string, error)
- func (c *PostgresConnector) GetSlotInfo(ctx context.Context, slotName string, peerdbManagedOnly bool, ...) ([]*protos.SlotInfo, error)
- func (c *PostgresConnector) GetTableSchema(ctx context.Context, env map[string]string, version uint32, ...) (map[string]*protos.TableSchema, error)
- func (c *PostgresConnector) GetTableSizeEstimatedBytes(ctx context.Context, tableIdentifier string) (int64, error)
- func (c *PostgresConnector) GetTablesFromPublication(ctx context.Context, publicationName string, ...) ([]*common.QualifiedTable, error)
- func (c *PostgresConnector) GetTablesInSchema(ctx context.Context, schema string, cdcEnabled bool) (*protos.SchemaTablesResponse, error)
- func (c *PostgresConnector) GetVersion(ctx context.Context) (string, error)
- func (c *PostgresConnector) HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool shared.CatalogPool, ...) error
- func (c *PostgresConnector) IsInRecovery(ctx context.Context) (bool, error)
- func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error)
- func (c *PostgresConnector) MajorVersion(ctx context.Context) (shared.PGVersion, error)
- func (c *PostgresConnector) MaybeStartReplication(ctx context.Context, slotName string, publicationName string, lastOffset int64, ...) error
- func (c *PostgresConnector) NeedsSetupMetadataTables(ctx context.Context) (bool, error)
- func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error)
- func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32, flowJobName string, ...) (*QRepQueryExecutor, error)
- func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32, snapshot string, ...) (*QRepQueryExecutor, error)
- func (c *PostgresConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error)
- func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error
- func (c *PostgresConnector) PullPg(ctx context.Context, catalogPool shared.CatalogPool, ...) error
- func (c *PostgresConnector) PullPgQRepRecords(ctx context.Context, _otelManager *otel_metrics.OtelManager, ...) (int64, int64, error)
- func (c *PostgresConnector) PullQRepRecords(ctx context.Context, _otelManager *otel_metrics.OtelManager, ...) (int64, int64, error)
- func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool shared.CatalogPool, ...) error
- func (c *PostgresConnector) PullXminPgRecordStream(ctx context.Context, config *protos.QRepConfig, _dstType protos.DBType, ...) (int64, int64, int64, error)
- func (c *PostgresConnector) PullXminRecordStream(ctx context.Context, config *protos.QRepConfig, dstType protos.DBType, ...) (int64, int64, int64, error)
- func (c *PostgresConnector) RemoveTableEntriesFromRawTable(ctx context.Context, req *protos.RemoveTablesFromRawTableInput) error
- func (c *PostgresConnector) RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error
- func (c *PostgresConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput, ...) (*protos.RenameTablesOutput, error)
- func (c *PostgresConnector) ReplPing(ctx context.Context) error
- func (c *PostgresConnector) ReplayTableSchemaDeltas(ctx context.Context, _ map[string]string, flowJobName string, ...) error
- func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error
- func (c *PostgresConnector) SetupMetadataTables(ctx context.Context) error
- func (c *PostgresConnector) SetupNormalizedTable(ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, ...) (bool, error)
- func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
- func (c *PostgresConnector) SetupReplConn(ctx context.Context, env map[string]string) error
- func (c *PostgresConnector) SetupReplication(ctx context.Context, req *protos.SetupReplicationInput) (model.SetupReplicationResult, error)
- func (c *PostgresConnector) StartSetupNormalizedTables(ctx context.Context) (any, error)
- func (c *PostgresConnector) StatActivity(ctx context.Context, req *protos.PostgresPeerActivityInfoRequest) (*protos.PeerStatResponse, error)
- func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) error
- func (c *PostgresConnector) SyncPg(ctx context.Context, req *model.SyncRecordsRequest[model.PgItems]) (*model.SyncResponse, error)
- func (c *PostgresConnector) SyncPgQRepRecords(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (c *PostgresConnector) SyncQRepRecords(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error)
- func (c *PostgresConnector) UpdateReplStateLastOffset(_ context.Context, lastOffset model.CdcCheckpoint) error
- func (c *PostgresConnector) ValidateCheck(ctx context.Context) error
- func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
- type QRepPullSink
- type QRepQueryExecutor
- func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(ctx context.Context, query string, args ...any) (*model.QRecordBatch, error)
- func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(ctx context.Context, stream *model.QRecordStream, dstType protos.DBType, ...) (int64, int64, error)
- func (qe *QRepQueryExecutor) ExecuteQuery(ctx context.Context, query string, args ...any) (pgx.Rows, error)
- func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(ctx context.Context, sink QRepPullSink, query string, args ...any) (int64, int64, error)
- func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(ctx context.Context, sink QRepPullSink, query string, args ...any) (int64, int64, int64, error)
- type QRepSyncSink
- type RecordStreamSink
- func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error)
- func (stream RecordStreamSink) ExecuteQueryWithTx(ctx context.Context, qe *QRepQueryExecutor, tx pgx.Tx, query string, ...) (int64, int64, error)
- func (stream RecordStreamSink) GetColumnNames() ([]string, error)
- type RelaxedNumberExtension
- type ReplState
- type ReplicaIdentityType
- type SlotCheckResult
Constants ¶
const DefaultSlotPrefix = "peerflow_slot_"
Variables ¶
var ErrMismatchingRangeType = errors.New("mismatching range type")
Functions ¶
func BuildQuery ¶
func CTIDBlockPartitioningFunc ¶
func CTIDBlockPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
CTIDBlockPartitioningFunc is a table partition strategy where partitions are created by dividing table blocks uniformly. Note that partition boundaries (block ranges) are uniform, but actual row distribution may be skewed due to table bloat, deleted tuples, or uneven data distribution across blocks.
func ComputeNumPartitions ¶
func ComputeNumPartitions(ctx context.Context, pp PartitionParams, numRowsPerPartition int64) (int64, error)
ComputeNumPartitions computes the number of partitions given desired number of rows per partition, with automatic adjustment to respect the maximum partition limit. TODO: use estimated row count instead to speed up query execution on large tables
func CustomTypeToQKind ¶
func CustomTypeToQKind(typeData shared.CustomDataType, version uint32) types.QValueKind
func GetDefaultSlotName ¶
func MinMaxRangePartitioningFunc ¶
func MinMaxRangePartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
MinMaxRangePartitioningFunc is a table partition strategy where partitions are created by uniformly splitting the min/max value range. Note that partition boundaries are uniform, but actual row distribution may be skewed due to non-uniform data distribution, gaps in the value range, or deleted rows.
func NTileBucketPartitioningFunc ¶
func NTileBucketPartitioningFunc(ctx context.Context, pp PartitionParams) ([]*protos.QRepPartition, error)
NTileBucketPartitioningFunc is a table partition implementation that divides rows into approximately equal-sized partitions based on row count. It uses the NTILE window function to assign rows to buckets and ensures more balanced row distribution across partitions.
func NewPgCopyPipe ¶
func NewPgCopyPipe() (PgCopyReader, PgCopyWriter)
func ParseConfig ¶
func ParseConfig(connectionString string, pgConfig *protos.PostgresConfig) (*pgx.ConnConfig, error)
func PostgresOIDToQValueKind ¶
func PostgresOIDToQValueKind( recvOID uint32, customTypeMapping map[uint32]shared.CustomDataType, typeMap *pgtype.Map, version uint32, ) (types.QValueKind, error)
func PullCdcRecords ¶
func PullCdcRecords[Items model.Items]( ctx context.Context, p *PostgresCDCSource, req *model.PullRecordsRequest[Items], processor replProcessor[Items], replLock *sync.Mutex, ) error
PullCdcRecords pulls records from req's cdc stream
Types ¶
type NullableLSN ¶
type PartitionParams ¶
type PartitionParams struct {
// contains filtered or unexported fields
}
type PartitioningFunc ¶
type PartitioningFunc func(context.Context, PartitionParams) ([]*protos.QRepPartition, error)
type PgCopyReader ¶
type PgCopyReader struct {
*io.PipeReader
// contains filtered or unexported fields
}
func (PgCopyReader) CopyInto ¶
func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error)
func (PgCopyReader) GetColumnNames ¶
func (p PgCopyReader) GetColumnNames() ([]string, error)
type PgCopyShared ¶
type PgCopyShared struct {
// contains filtered or unexported fields
}
type PgCopyWriter ¶
type PgCopyWriter struct {
*io.PipeWriter
// contains filtered or unexported fields
}
func (PgCopyWriter) Close ¶
func (p PgCopyWriter) Close(err error)
func (PgCopyWriter) ExecuteQueryWithTx ¶
func (PgCopyWriter) SetSchema ¶
func (p PgCopyWriter) SetSchema(schema []string)
type PostgresCDCConfig ¶
type PostgresCDCConfig struct {
CatalogPool shared.CatalogPool
OtelManager *otel_metrics.OtelManager
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
TableNameSchemaMapping map[string]*protos.TableSchema
RelationMessageMapping model.RelationMessageMapping
FlowJobName string
Slot string
Publication string
HandleInheritanceForNonPartitionedTables bool
SourceSchemaAsDestinationColumn bool
OriginMetaAsDestinationColumn bool
InternalVersion uint32
}
type PostgresCDCSource ¶
type PostgresCDCSource struct {
*PostgresConnector
// contains filtered or unexported fields
}
type PostgresConnector ¶
type PostgresConnector struct {
Config *protos.PostgresConfig
// contains filtered or unexported fields
}
func NewPostgresConnector ¶
func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error)
func (*PostgresConnector) AddTablesToPublication ¶
func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error
func (*PostgresConnector) CheckPublicationCreationPermissions ¶
func (c *PostgresConnector) CheckPublicationCreationPermissions(ctx context.Context, srcTableNames []string) error
func (*PostgresConnector) CheckReplicationConnectivity ¶
func (*PostgresConnector) CheckReplicationPermissions ¶
func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error
func (*PostgresConnector) CheckSourceTables ¶
func (c *PostgresConnector) CheckSourceTables( ctx context.Context, tableNames []*common.QualifiedTable, tableMappings []*protos.TableMapping, pubName string, noCDC bool, ) error
func (*PostgresConnector) CleanupSetupNormalizedTables ¶
func (c *PostgresConnector) CleanupSetupNormalizedTables(ctx context.Context, tx any)
func (*PostgresConnector) Close ¶
func (c *PostgresConnector) Close() error
Close closes all connections.
func (*PostgresConnector) Conn ¶
func (c *PostgresConnector) Conn() *pgx.Conn
func (*PostgresConnector) ConnectionActive ¶
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error
ConnectionActive returns nil if the connection is active.
func (*PostgresConnector) CreatePublication ¶
func (*PostgresConnector) CreateRawTable ¶
func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
CreateRawTable creates a raw table, implementing the Connector interface.
func (*PostgresConnector) CreateReplConn ¶
func (*PostgresConnector) EnsurePullability ¶
func (c *PostgresConnector) EnsurePullability( ctx context.Context, req *protos.EnsurePullabilityBatchInput, ) (*protos.EnsurePullabilityBatchOutput, error)
EnsurePullability ensures that a table is pullable, implementing the Connector interface.
func (*PostgresConnector) ExecuteCommand ¶
func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error
func (*PostgresConnector) ExportTxSnapshot ¶
func (c *PostgresConnector) ExportTxSnapshot( ctx context.Context, _ string, env map[string]string, ) (*protos.ExportTxSnapshotOutput, any, error)
func (*PostgresConnector) FinishExport ¶
func (c *PostgresConnector) FinishExport(tx any) error
func (*PostgresConnector) FinishSetupNormalizedTables ¶
func (c *PostgresConnector) FinishSetupNormalizedTables(ctx context.Context, tx any) error
func (*PostgresConnector) GetAllTables ¶
func (c *PostgresConnector) GetAllTables(ctx context.Context) (*protos.AllTablesResponse, error)
func (*PostgresConnector) GetColumns ¶
func (c *PostgresConnector) GetColumns(ctx context.Context, version uint32, schema string, table string) (*protos.TableColumnsResponse, error)
func (*PostgresConnector) GetDatabaseVariant ¶
func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
func (*PostgresConnector) GetDefaultPartitionKeyForTables ¶
func (c *PostgresConnector) GetDefaultPartitionKeyForTables( ctx context.Context, input *protos.GetDefaultPartitionKeyForTablesInput, ) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
func (*PostgresConnector) GetLastNormalizeBatchID ¶
func (*PostgresConnector) GetLastOffset ¶
func (c *PostgresConnector) GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)
GetLastOffset returns the last synced offset for a job.
func (*PostgresConnector) GetLastSyncBatchID ¶
func (*PostgresConnector) GetMaxValue ¶
func (c *PostgresConnector) GetMaxValue( ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ) (any, error)
func (*PostgresConnector) GetQRepPartitions ¶
func (c *PostgresConnector) GetQRepPartitions( ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ) ([]*protos.QRepPartition, error)
func (*PostgresConnector) GetSchemaNameOfColumnTypeByOID ¶
func (c *PostgresConnector) GetSchemaNameOfColumnTypeByOID(ctx context.Context, typeOIDs []uint32) (map[uint32]string, error)
GetSchemaNameOfColumnTypeByOID returns a map of type OID to schema name for the given OIDs. If the type is in the "pg_catalog" schema, it returns an empty string for that OID.
func (*PostgresConnector) GetSchemas ¶
func (c *PostgresConnector) GetSchemas(ctx context.Context) (*protos.PeerSchemasResponse, error)
func (*PostgresConnector) GetSelectedColumns ¶
func (c *PostgresConnector) GetSelectedColumns( ctx context.Context, sourceTable *common.QualifiedTable, excludedColumns []string, ) ([]string, error)
func (*PostgresConnector) GetSlotInfo ¶
func (c *PostgresConnector) GetSlotInfo( ctx context.Context, slotName string, peerdbManagedOnly bool, customSlotNames []string, ) ([]*protos.SlotInfo, error)
GetSlotInfo gets the information about the replication slot size and LSNs. If slotName is non-empty, only that slot is returned. If slotName is empty and peerdbManagedOnly is false, all slots in the database are returned. If slotName is empty and peerdbManagedOnly is true, only slots with the peerflow_slot_ prefix (plus any explicitly listed customSlotNames) are returned.
func (*PostgresConnector) GetTableSchema ¶
func (c *PostgresConnector) GetTableSchema( ctx context.Context, env map[string]string, version uint32, system protos.TypeSystem, tableMapping []*protos.TableMapping, ) (map[string]*protos.TableSchema, error)
func (*PostgresConnector) GetTableSizeEstimatedBytes ¶
func (*PostgresConnector) GetTablesFromPublication ¶
func (c *PostgresConnector) GetTablesFromPublication( ctx context.Context, publicationName string, excludedTables []*common.QualifiedTable, ) ([]*common.QualifiedTable, error)
func (*PostgresConnector) GetTablesInSchema ¶
func (c *PostgresConnector) GetTablesInSchema( ctx context.Context, schema string, cdcEnabled bool, ) (*protos.SchemaTablesResponse, error)
func (*PostgresConnector) GetVersion ¶
func (c *PostgresConnector) GetVersion(ctx context.Context) (string, error)
func (*PostgresConnector) HandleSlotInfo ¶
func (c *PostgresConnector) HandleSlotInfo( ctx context.Context, alerter *alerting.Alerter, catalogPool shared.CatalogPool, alertKeys *alerting.AlertKeys, slotMetricGauges otel_metrics.SlotMetricGauges, ) error
func (*PostgresConnector) IsInRecovery ¶
func (c *PostgresConnector) IsInRecovery(ctx context.Context) (bool, error)
func (*PostgresConnector) IsQRepPartitionSynced ¶
func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput, ) (bool, error)
IsQRepPartitionSynced checks whether a specific partition is synced
func (*PostgresConnector) MajorVersion ¶
func (*PostgresConnector) MaybeStartReplication ¶
func (*PostgresConnector) NeedsSetupMetadataTables ¶
func (c *PostgresConnector) NeedsSetupMetadataTables(ctx context.Context) (bool, error)
NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
func (*PostgresConnector) NewPostgresCDCSource ¶
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error)
Create a new PostgresCDCSource
func (*PostgresConnector) NewQRepQueryExecutor ¶
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32, flowJobName string, partitionID string, ) (*QRepQueryExecutor, error)
func (*PostgresConnector) NewQRepQueryExecutorSnapshot ¶
func (*PostgresConnector) NormalizeRecords ¶
func (c *PostgresConnector) NormalizeRecords( ctx context.Context, req *model.NormalizeRecordsRequest, ) (model.NormalizeResponse, error)
func (*PostgresConnector) PullFlowCleanup ¶
func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error
func (*PostgresConnector) PullPg ¶
func (c *PostgresConnector) PullPg( ctx context.Context, catalogPool shared.CatalogPool, otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.PgItems], ) error
func (*PostgresConnector) PullPgQRepRecords ¶
func (c *PostgresConnector) PullPgQRepRecords( ctx context.Context, _otelManager *otel_metrics.OtelManager, config *protos.QRepConfig, _dstType protos.DBType, partition *protos.QRepPartition, stream PgCopyWriter, ) (int64, int64, error)
func (*PostgresConnector) PullQRepRecords ¶
func (c *PostgresConnector) PullQRepRecords( ctx context.Context, _otelManager *otel_metrics.OtelManager, config *protos.QRepConfig, dstType protos.DBType, partition *protos.QRepPartition, stream *model.QRecordStream, ) (int64, int64, error)
func (*PostgresConnector) PullRecords ¶
func (c *PostgresConnector) PullRecords( ctx context.Context, catalogPool shared.CatalogPool, otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.RecordItems], ) error
func (*PostgresConnector) PullXminPgRecordStream ¶
func (c *PostgresConnector) PullXminPgRecordStream( ctx context.Context, config *protos.QRepConfig, _dstType protos.DBType, partition *protos.QRepPartition, pipe PgCopyWriter, ) (int64, int64, int64, error)
func (*PostgresConnector) PullXminRecordStream ¶
func (c *PostgresConnector) PullXminRecordStream( ctx context.Context, config *protos.QRepConfig, dstType protos.DBType, partition *protos.QRepPartition, stream *model.QRecordStream, ) (int64, int64, int64, error)
func (*PostgresConnector) RemoveTableEntriesFromRawTable ¶
func (c *PostgresConnector) RemoveTableEntriesFromRawTable( ctx context.Context, req *protos.RemoveTablesFromRawTableInput, ) error
func (*PostgresConnector) RemoveTablesFromPublication ¶
func (c *PostgresConnector) RemoveTablesFromPublication(ctx context.Context, req *protos.RemoveTablesFromPublicationInput) error
func (*PostgresConnector) RenameTables ¶
func (c *PostgresConnector) RenameTables( ctx context.Context, req *protos.RenameTablesInput, tableNameSchemaMapping map[string]*protos.TableSchema, ) (*protos.RenameTablesOutput, error)
func (*PostgresConnector) ReplPing ¶
func (c *PostgresConnector) ReplPing(ctx context.Context) error
To keep connection alive between sync batches. By default postgres drops connection after 1 minute of inactivity.
func (*PostgresConnector) ReplayTableSchemaDeltas ¶
func (c *PostgresConnector) ReplayTableSchemaDeltas( ctx context.Context, _ map[string]string, flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ []string, ) error
replayTableSchemaDeltaCore changes a destination table to match the schema at source This could involve adding or dropping multiple columns.
func (*PostgresConnector) SetLastOffset ¶
func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error
SetLastOffset updates the last synced offset for a job.
func (*PostgresConnector) SetupMetadataTables ¶
func (c *PostgresConnector) SetupMetadataTables(ctx context.Context) error
SetupMetadataTables sets up the metadata tables.
func (*PostgresConnector) SetupNormalizedTable ¶
func (c *PostgresConnector) SetupNormalizedTable( ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, tableIdentifier string, tableSchema *protos.TableSchema, ) (bool, error)
func (*PostgresConnector) SetupQRepMetadataTables ¶
func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
SetupQRepMetadataTables function for postgres connector
func (*PostgresConnector) SetupReplConn ¶
func (*PostgresConnector) SetupReplication ¶
func (c *PostgresConnector) SetupReplication( ctx context.Context, req *protos.SetupReplicationInput, ) (model.SetupReplicationResult, error)
SetupReplication sets up replication for the source connector
func (*PostgresConnector) StartSetupNormalizedTables ¶
func (c *PostgresConnector) StartSetupNormalizedTables(ctx context.Context) (any, error)
func (*PostgresConnector) StatActivity ¶
func (c *PostgresConnector) StatActivity( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerStatResponse, error)
func (*PostgresConnector) SyncFlowCleanup ¶
func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) error
func (*PostgresConnector) SyncPg ¶
func (c *PostgresConnector) SyncPg(ctx context.Context, req *model.SyncRecordsRequest[model.PgItems]) (*model.SyncResponse, error)
func (*PostgresConnector) SyncPgQRepRecords ¶
func (c *PostgresConnector) SyncPgQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, pipe PgCopyReader, ) (int64, shared.QRepWarnings, error)
func (*PostgresConnector) SyncQRepRecords ¶
func (c *PostgresConnector) SyncQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QRecordStream, ) (int64, shared.QRepWarnings, error)
func (*PostgresConnector) SyncRecords ¶
func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error)
func (*PostgresConnector) UpdateReplStateLastOffset ¶
func (c *PostgresConnector) UpdateReplStateLastOffset(_ context.Context, lastOffset model.CdcCheckpoint) error
func (*PostgresConnector) ValidateCheck ¶
func (c *PostgresConnector) ValidateCheck(ctx context.Context) error
func (*PostgresConnector) ValidateMirrorSource ¶
func (c *PostgresConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
type QRepPullSink ¶
type QRepQueryExecutor ¶
type QRepQueryExecutor struct {
*PostgresConnector
// contains filtered or unexported fields
}
func (*QRepQueryExecutor) ExecuteAndProcessQuery ¶
func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( ctx context.Context, query string, args ...any, ) (*model.QRecordBatch, error)
func (*QRepQueryExecutor) ExecuteAndProcessQueryStream ¶
func (*QRepQueryExecutor) ExecuteQuery ¶
func (*QRepQueryExecutor) ExecuteQueryIntoSink ¶
func (qe *QRepQueryExecutor) ExecuteQueryIntoSink( ctx context.Context, sink QRepPullSink, query string, args ...any, ) (int64, int64, error)
func (*QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin ¶
func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin( ctx context.Context, sink QRepPullSink, query string, args ...any, ) (int64, int64, int64, error)
type QRepSyncSink ¶
type RecordStreamSink ¶
type RecordStreamSink struct {
*model.QRecordStream
DestinationType protos.DBType
}
func (RecordStreamSink) CopyInto ¶
func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error)
func (RecordStreamSink) ExecuteQueryWithTx ¶
func (RecordStreamSink) GetColumnNames ¶
func (stream RecordStreamSink) GetColumnNames() ([]string, error)
type RelaxedNumberExtension ¶
type RelaxedNumberExtension struct {
jsoniter.DummyExtension
}
func (*RelaxedNumberExtension) CreateDecoder ¶
func (extension *RelaxedNumberExtension) CreateDecoder(typ reflect2.Type) jsoniter.ValDecoder
type ReplicaIdentityType ¶
type ReplicaIdentityType rune
const ( ReplicaIdentityDefault ReplicaIdentityType = 'd' ReplicaIdentityFull ReplicaIdentityType = 'f' ReplicaIdentityIndex ReplicaIdentityType = 'i' ReplicaIdentityNothing ReplicaIdentityType = 'n' )