Documentation
¶
Index ¶
- Constants
- Variables
- func ArrayCastElements[T any](arr []any) []T
- func ArrayMinus[T comparable](first, second []T) []T
- func ArraysHaveOverlap[T comparable](first, second []T) bool
- func AtomicInt64Max(a *atomic.Int64, v int64)
- func BigIntDivCeil(x, y *big.Int) *big.Int
- func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error)
- func DivCeil[T constraints.Integer](x, y T) T
- func GetCustomDataTypes(ctx context.Context, conn *pgx.Conn) (map[uint32]CustomDataType, error)
- func IsSQLStateError(err error, sqlStates ...string) bool
- func IsSQLStateErrorSubstring(err error, sqlState string, substring string) bool
- func IsValidReplicationName(s string) bool
- func JoinHostPort[I constraints.Integer](host string, port I) string
- func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T
- func LogError(logger log.Logger, err error) error
- func NewSearchAttributes(mirrorName string) temporal.SearchAttributes
- func NewSlogHandler(handler slog.Handler) slog.Handler
- func NewSlogHandlerOptions() *slog.HandlerOptions
- func ParsePgArrayStringToStringSlice(data string, delim byte) []string
- func ParsePgArrayToStringSlice(data []byte, delim byte) []string
- func RandomString(n int) string
- func RegisterExtensions(ctx context.Context, conn *pgx.Conn, version uint32) error
- func ReplaceIllegalCharactersWithUnderscores(s string) string
- func RollbackTx(tx pgx.Tx, logger log.Logger)
- func SkipSendingToIncidentIo(errTags []string) bool
- func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable
- func UnsafeFastReadOnlyBytesToString(s []byte) string
- func UnsafeFastStringToReadOnlyBytes(s string) []byte
- func Val[T any](p *T) T
- func WrapError(s string, err error) error
- type AdjustedPartitions
- type BoundSelector
- type CatalogPool
- func (p CatalogPool) Begin(ctx context.Context) (pgx.Tx, error)
- func (p CatalogPool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (p CatalogPool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
- func (p CatalogPool) Ping(ctx context.Context) error
- func (p CatalogPool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
- func (p CatalogPool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
- type CatalogRow
- type CatalogRows
- func (rows CatalogRows) Close()
- func (rows CatalogRows) CommandTag() pgconn.CommandTag
- func (rows CatalogRows) Conn() *pgx.Conn
- func (rows CatalogRows) Err() error
- func (rows CatalogRows) FieldDescriptions() []pgconn.FieldDescription
- func (rows CatalogRows) Next() bool
- func (rows CatalogRows) RawValues() [][]byte
- func (rows CatalogRows) Scan(dest ...any) error
- func (rows CatalogRows) Values() ([]any, error)
- type CatalogTx
- func (tx CatalogTx) Begin(ctx context.Context) (pgx.Tx, error)
- func (tx CatalogTx) Commit(ctx context.Context) error
- func (tx CatalogTx) Conn() *pgx.Conn
- func (tx CatalogTx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, ...) (int64, error)
- func (tx CatalogTx) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
- func (tx CatalogTx) LargeObjects() pgx.LargeObjects
- func (tx CatalogTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)
- func (tx CatalogTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
- func (tx CatalogTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
- func (tx CatalogTx) Rollback(ctx context.Context) error
- func (tx CatalogTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
- type ContextKey
- type CustomDataType
- type ErrType
- type PGVersion
- type PeerDBEncKey
- type PeerDBEncKeys
- type QRepWarnings
- type SlogHandler
- type TaskQueueID
Constants ¶
const ( InternalVersion_First uint32 = iota // Postgres: vector types ("vector", "halfvec", "sparsevec") replicated as float arrays instead of string InternalVersion_PgVectorAsFloatArray // MongoDB: rename `_full_document` column to `doc` InternalVersion_MongoDBFullDocumentColumnToDoc // All: setting json_type_escape_dots_in_keys = true when inserting JSON column to ClickHouse (only impacts MongoDB today) InternalVersion_JsonEscapeDotsInKeys // MongoDB: `_id` column values stored as-is without redundant quotes InternalVersion_MongoDBIdWithoutRedundantQuotes // MySQL: convert enums to integers for older versions without binlog row metadata support InternalVersion_MySQL5ConvertEnumsToInts TotalNumberOfInternalVersions InternalVersion_Latest = TotalNumberOfInternalVersions - 1 )
const ( QRepFetchSize = 128 * 1024 QRepChannelSize = 1024 )
const (
Flag_ClickHouseTime64Enabled = "clickhouse_time64_enabled"
)
Flag constants for flow config Flags mapping
const (
POSTGRES_TABLE_OID_MIGRATION = "POSTGRES_TABLE_OID_MIGRATION"
)
Variables ¶
var ( ErrSlotAlreadyExists = temporal.NewNonRetryableApplicationError("slot already exists", exceptions.ApplicationErrorTypeIrrecoverableExistingSlot.String(), nil) ErrTableDoesNotExist = temporal.NewNonRetryableApplicationError("table does not exist", exceptions.ApplicationErrorTypeIrrecoverableMissingTables.String(), nil) )
var ( LuaTime = glua64.UserDataType[time.Time]{Name: "peerdb_time"} LuaUuid = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"} LuaBigInt = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"} LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_decimal"} )
var MirrorNameSearchAttribute = temporal.NewSearchAttributeKeyString("MirrorName")
var Year0000 = time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)
Functions ¶
func ArrayCastElements ¶
func ArraysHaveOverlap ¶
func ArraysHaveOverlap[T comparable](first, second []T) bool
func AtomicInt64Max ¶
TODO remove after https://github.com/golang/go/issues/63999
func BigIntDivCeil ¶
BigIntDivCeil returns ceil(x / y) for big.Int values.
func DecodePKCS8PrivateKey ¶
func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error)
func DivCeil ¶
func DivCeil[T constraints.Integer](x, y T) T
func GetCustomDataTypes ¶
func IsSQLStateError ¶
func IsValidReplicationName ¶
func JoinHostPort ¶
func JoinHostPort[I constraints.Integer](host string, port I) string
func LTableToSlice ¶
func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T
func NewSearchAttributes ¶
func NewSearchAttributes(mirrorName string) temporal.SearchAttributes
func NewSlogHandlerOptions ¶
func NewSlogHandlerOptions() *slog.HandlerOptions
func ParsePgArrayStringToStringSlice ¶
see array_in from postgres
func RandomString ¶
func RegisterExtensions ¶
func SkipSendingToIncidentIo ¶
func SliceToLTable ¶
func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable
Types ¶
type AdjustedPartitions ¶
AdjustedPartitions represents the adjusted partitioning parameters
func AdjustNumPartitions ¶
func AdjustNumPartitions(totalRows int64, desiredRowsPerPartition int64) AdjustedPartitions
AdjustNumPartitions takes the total number of rows and the desired number of rows per partition, and returns the adjusted number of partitions and rows per partition so that the partition count does not exceed 1000. When adjustment is needed, resulting partition count will be in [901, 1000] range for 9K+ rows and [501, 1000] range for 1-9K rows.
type BoundSelector ¶
type BoundSelector struct {
// contains filtered or unexported fields
}
func NewBoundSelector ¶
func NewBoundSelector(ctx workflow.Context, selectorName string, limit int) *BoundSelector
func (*BoundSelector) SpawnChild ¶
type CatalogPool ¶
func (CatalogPool) Exec ¶
func (p CatalogPool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
type CatalogRow ¶
func (CatalogRow) Scan ¶
func (row CatalogRow) Scan(dest ...any) error
type CatalogRows ¶
func (CatalogRows) Close ¶
func (rows CatalogRows) Close()
func (CatalogRows) CommandTag ¶
func (rows CatalogRows) CommandTag() pgconn.CommandTag
func (CatalogRows) Conn ¶
func (rows CatalogRows) Conn() *pgx.Conn
func (CatalogRows) Err ¶
func (rows CatalogRows) Err() error
func (CatalogRows) FieldDescriptions ¶
func (rows CatalogRows) FieldDescriptions() []pgconn.FieldDescription
func (CatalogRows) Next ¶
func (rows CatalogRows) Next() bool
func (CatalogRows) RawValues ¶
func (rows CatalogRows) RawValues() [][]byte
func (CatalogRows) Scan ¶
func (rows CatalogRows) Scan(dest ...any) error
func (CatalogRows) Values ¶
func (rows CatalogRows) Values() ([]any, error)
type CatalogTx ¶
func (CatalogTx) CopyFrom ¶
func (tx CatalogTx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
func (CatalogTx) LargeObjects ¶
func (tx CatalogTx) LargeObjects() pgx.LargeObjects
type ContextKey ¶
type ContextKey string
const ( FlowNameKey ContextKey = "flowName" PartitionIDKey ContextKey = "partitionId" DeploymentUIDKey ContextKey = "deploymentUid" RequestIdKey ContextKey = "x-peerdb-request-id" )
func (ContextKey) String ¶
func (c ContextKey) String() string
type CustomDataType ¶
type CustomDataType = pgutils.CustomDataType
CustomDataType is an alias for the canonical type in flow/pkg/postgres.
type PeerDBEncKey ¶
PeerDBEncKey is a key for encrypting and decrypting data.
type PeerDBEncKeys ¶
type PeerDBEncKeys []PeerDBEncKey
func (PeerDBEncKeys) Get ¶
func (e PeerDBEncKeys) Get(id string) (PeerDBEncKey, error)
type QRepWarnings ¶
type QRepWarnings []error
type SlogHandler ¶
type TaskQueueID ¶
type TaskQueueID string
const ( // Task Queues PeerFlowTaskQueue TaskQueueID = "peer-flow-task-queue" SnapshotFlowTaskQueue TaskQueueID = "snapshot-flow-task-queue" MaintenanceFlowTaskQueue TaskQueueID = "maintenance-flow-task-queue" // Queries CDCFlowStateQuery = "q-cdc-flow-state" QRepFlowStateQuery = "q-qrep-flow-state" )
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c
|
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c |