Documentation
¶
Index ¶
- Constants
- Variables
- func ArrayCastElements[T any](arr []any) []T
- func ArrayMinus[T comparable](first, second []T) []T
- func ArraysHaveOverlap[T comparable](first, second []T) bool
- func AtomicInt64Max(a *atomic.Int64, v int64)
- func CreateTlsConfig(minVersion uint16, rootCAs *string, host string, tlsHost string, ...) (*tls.Config, error)
- func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error)
- func DivCeil[T constraints.Integer](x, y T) T
- func GetCustomDataTypes(ctx context.Context, conn *pgx.Conn) (map[uint32]CustomDataType, error)
- func IsSQLStateError(err error, sqlStates ...string) bool
- func IsValidReplicationName(s string) bool
- func JoinHostPort[I constraints.Integer](host string, port I) string
- func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T
- func LogError(logger log.Logger, err error) error
- func NewSearchAttributes(mirrorName string) temporal.SearchAttributes
- func NewSlogHandler(handler slog.Handler) slog.Handler
- func NewSlogHandlerOptions() *slog.HandlerOptions
- func ParsePgArrayStringToStringSlice(data string, delim byte) []string
- func ParsePgArrayToStringSlice(data []byte, delim byte) []string
- func Ptr[T any](x T) *T
- func RandomString(n int) string
- func RegisterExtensions(ctx context.Context, conn *pgx.Conn, version uint32) error
- func ReplaceIllegalCharactersWithUnderscores(s string) string
- func RollbackTx(tx pgx.Tx, logger log.Logger)
- func SkipSendingToIncidentIo(errTags []string) bool
- func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable
- func UnsafeFastReadOnlyBytesToString(s []byte) string
- func UnsafeFastStringToReadOnlyBytes(s string) []byte
- func Val[T any](p *T) T
- func WrapError(s string, err error) error
- type AdjustedPartitions
- type BoundSelector
- type CatalogPool
- func (p CatalogPool) Begin(ctx context.Context) (pgx.Tx, error)
- func (p CatalogPool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (p CatalogPool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
- func (p CatalogPool) Ping(ctx context.Context) error
- func (p CatalogPool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
- func (p CatalogPool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
- type CatalogRow
- type CatalogRows
- func (rows CatalogRows) Close()
- func (rows CatalogRows) CommandTag() pgconn.CommandTag
- func (rows CatalogRows) Conn() *pgx.Conn
- func (rows CatalogRows) Err() error
- func (rows CatalogRows) FieldDescriptions() []pgconn.FieldDescription
- func (rows CatalogRows) Next() bool
- func (rows CatalogRows) RawValues() [][]byte
- func (rows CatalogRows) Scan(dest ...any) error
- func (rows CatalogRows) Values() ([]any, error)
- type CatalogTx
- func (tx CatalogTx) Begin(ctx context.Context) (pgx.Tx, error)
- func (tx CatalogTx) Commit(ctx context.Context) error
- func (tx CatalogTx) Conn() *pgx.Conn
- func (tx CatalogTx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, ...) (int64, error)
- func (tx CatalogTx) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
- func (tx CatalogTx) LargeObjects() pgx.LargeObjects
- func (tx CatalogTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)
- func (tx CatalogTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
- func (tx CatalogTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
- func (tx CatalogTx) Rollback(ctx context.Context) error
- func (tx CatalogTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
- type ContextKey
- type CustomDataType
- type ErrType
- type PGVersion
- type PeerDBEncKey
- type PeerDBEncKeys
- type QRepWarnings
- type SlogHandler
- type TaskQueueID
- type WatchWriter
Constants ¶
View Source
const (
	MoneyOID        uint32 = 790
	TxidSnapshotOID uint32 = 2970
	TsvectorOID     uint32 = 3614
	TsqueryOID      uint32 = 3615
)
View Source
const (
	InternalVersion_First uint32 = iota
	// Postgres: vector types ("vector", "halfvec", "sparsevec") replicated as float arrays instead of string
	InternalVersion_PgVectorAsFloatArray
	// MongoDB: rename `_full_document` column to `doc`
	InternalVersion_MongoDBFullDocumentColumnToDoc
	// All: setting json_type_escape_dots_in_keys = true when inserting JSON column to ClickHouse (only impacts MongoDB today)
	InternalVersion_JsonEscapeDotsInKeys
	// MongoDB: `_id` column values stored as-is without redundant quotes
	InternalVersion_MongoDBIdWithoutRedundantQuotes
	TotalNumberOfInternalVersions
	InternalVersion_Latest = TotalNumberOfInternalVersions - 1
)
View Source
const (
	QRepFetchSize   = 128 * 1024
	QRepChannelSize = 1024
)
View Source
const (
POSTGRES_TABLE_OID_MIGRATION = "POSTGRES_TABLE_OID_MIGRATION"
)
Variables ¶
View Source
var (
	ErrSlotAlreadyExists = temporal.NewNonRetryableApplicationError("slot already exists", exceptions.ApplicationErrorTypeIrrecoverableExistingSlot.String(), nil)
	ErrTableDoesNotExist = temporal.NewNonRetryableApplicationError("table does not exist", exceptions.ApplicationErrorTypeIrrecoverableMissingTables.String(), nil)
)
View Source
var (
	LuaTime    = glua64.UserDataType[time.Time]{Name: "peerdb_time"}
	LuaUuid    = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"}
	LuaBigInt  = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"}
	LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_decimal"}
)
View Source
var MirrorNameSearchAttribute = temporal.NewSearchAttributeKeyString("MirrorName")
View Source
var Year0000 = time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)
Functions ¶
func ArrayCastElements ¶
func ArraysHaveOverlap ¶
func ArraysHaveOverlap[T comparable](first, second []T) bool
func AtomicInt64Max ¶
TODO remove after https://github.com/golang/go/issues/63999
func CreateTlsConfig ¶
func DecodePKCS8PrivateKey ¶
func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error)
func DivCeil ¶
func DivCeil[T constraints.Integer](x, y T) T
func GetCustomDataTypes ¶
func IsSQLStateError ¶
func IsValidReplicationName ¶
func JoinHostPort ¶
func JoinHostPort[I constraints.Integer](host string, port I) string
func LTableToSlice ¶
func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T
func NewSearchAttributes ¶
func NewSearchAttributes(mirrorName string) temporal.SearchAttributes
func NewSlogHandlerOptions ¶
func NewSlogHandlerOptions() *slog.HandlerOptions
func ParsePgArrayStringToStringSlice ¶
See array_in in the PostgreSQL source.
func RandomString ¶
func RegisterExtensions ¶
func SkipSendingToIncidentIo ¶
func SliceToLTable ¶
func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable
Types ¶
type AdjustedPartitions ¶
AdjustedPartitions represents the adjusted partitioning parameters
func AdjustNumPartitions ¶
func AdjustNumPartitions(totalRows int64, desiredRowsPerPartition int64) AdjustedPartitions
AdjustNumPartitions takes the total number of rows and the desired number of rows per partition, and returns the adjusted number of partitions and rows per partition so that the partition count does not exceed 1000. It does so by increasing the rows-per-partition by a power-of-10 multiplier when necessary.
type BoundSelector ¶
type BoundSelector struct {
// contains filtered or unexported fields
}
func NewBoundSelector ¶
func NewBoundSelector(ctx workflow.Context, selectorName string, limit int) *BoundSelector
func (*BoundSelector) SpawnChild ¶
type CatalogPool ¶
func (CatalogPool) Exec ¶
func (p CatalogPool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
type CatalogRow ¶
func (CatalogRow) Scan ¶
func (row CatalogRow) Scan(dest ...any) error
type CatalogRows ¶
func (CatalogRows) Close ¶
func (rows CatalogRows) Close()
func (CatalogRows) CommandTag ¶
func (rows CatalogRows) CommandTag() pgconn.CommandTag
func (CatalogRows) Conn ¶
func (rows CatalogRows) Conn() *pgx.Conn
func (CatalogRows) Err ¶
func (rows CatalogRows) Err() error
func (CatalogRows) FieldDescriptions ¶
func (rows CatalogRows) FieldDescriptions() []pgconn.FieldDescription
func (CatalogRows) Next ¶
func (rows CatalogRows) Next() bool
func (CatalogRows) RawValues ¶
func (rows CatalogRows) RawValues() [][]byte
func (CatalogRows) Scan ¶
func (rows CatalogRows) Scan(dest ...any) error
func (CatalogRows) Values ¶
func (rows CatalogRows) Values() ([]any, error)
type CatalogTx ¶
func (CatalogTx) CopyFrom ¶
func (tx CatalogTx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
func (CatalogTx) LargeObjects ¶
func (tx CatalogTx) LargeObjects() pgx.LargeObjects
type ContextKey ¶
type ContextKey string
const (
	FlowNameKey      ContextKey = "flowName"
	PartitionIDKey   ContextKey = "partitionId"
	DeploymentUIDKey ContextKey = "deploymentUid"
	RequestIdKey     ContextKey = "x-peerdb-request-id"
)
func (ContextKey) String ¶
func (c ContextKey) String() string
type CustomDataType ¶
type PeerDBEncKey ¶
PeerDBEncKey is a key for encrypting and decrypting data.
type PeerDBEncKeys ¶
type PeerDBEncKeys []PeerDBEncKey
func (PeerDBEncKeys) Get ¶
func (e PeerDBEncKeys) Get(id string) (PeerDBEncKey, error)
type QRepWarnings ¶
type QRepWarnings []error
type SlogHandler ¶
type TaskQueueID ¶
type TaskQueueID string
const (
	// Task Queues
	PeerFlowTaskQueue        TaskQueueID = "peer-flow-task-queue"
	SnapshotFlowTaskQueue    TaskQueueID = "snapshot-flow-task-queue"
	MaintenanceFlowTaskQueue TaskQueueID = "maintenance-flow-task-queue"
	// Queries
	CDCFlowStateQuery  = "q-cdc-flow-state"
	QRepFlowStateQuery = "q-qrep-flow-state"
)
type WatchWriter ¶
type WatchWriter struct {
// contains filtered or unexported fields
}
func NewWatchWriter ¶
func NewWatchWriter(w io.Writer, size *atomic.Int64) *WatchWriter
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c
|
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c |
Click to show internal directories.
Click to hide internal directories.