shared

package
v0.0.0-...-6f847f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2026 License: AGPL-3.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MoneyOID        uint32 = 790
	TxidSnapshotOID uint32 = 2970
	TsvectorOID     uint32 = 3614
	TsqueryOID      uint32 = 3615
)
View Source
const (
	InternalVersion_First uint32 = iota
	// Postgres: vector types ("vector", "halfvec", "sparsevec") replicated as float arrays instead of string
	InternalVersion_PgVectorAsFloatArray
	// MongoDB: rename `_full_document` column to `doc`
	InternalVersion_MongoDBFullDocumentColumnToDoc
	// All: setting json_type_escape_dots_in_keys = true when inserting JSON column to ClickHouse (only impacts MongoDB today)
	InternalVersion_JsonEscapeDotsInKeys
	// MongoDB: `_id` column values stored as-is without redundant quotes
	InternalVersion_MongoDBIdWithoutRedundantQuotes

	TotalNumberOfInternalVersions
	InternalVersion_Latest = TotalNumberOfInternalVersions - 1
)
View Source
const (
	QRepFetchSize   = 128 * 1024
	QRepChannelSize = 1024
)
View Source
const (
	POSTGRES_TABLE_OID_MIGRATION = "POSTGRES_TABLE_OID_MIGRATION"
)

Variables

View Source
var (
	LuaTime    = glua64.UserDataType[time.Time]{Name: "peerdb_time"}
	LuaUuid    = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"}
	LuaBigInt  = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"}
	LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_decimal"}
)
View Source
var MirrorNameSearchAttribute = temporal.NewSearchAttributeKeyString("MirrorName")
View Source
var Year0000 = time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)

Functions

func ArrayCastElements

func ArrayCastElements[T any](arr []any) []T

func ArrayMinus

func ArrayMinus[T comparable](first, second []T) []T

first - second

func ArraysHaveOverlap

func ArraysHaveOverlap[T comparable](first, second []T) bool

func AtomicInt64Max

func AtomicInt64Max(a *atomic.Int64, v int64)

TODO remove after https://github.com/golang/go/issues/63999

func CreateTlsConfig

func CreateTlsConfig(minVersion uint16, rootCAs *string, host string, tlsHost string, skipCertVerification bool) (*tls.Config, error)

func DecodePKCS8PrivateKey

func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error)

func DivCeil

func DivCeil[T constraints.Integer](x, y T) T

func GetCustomDataTypes

func GetCustomDataTypes(ctx context.Context, conn *pgx.Conn) (map[uint32]CustomDataType, error)

func IsSQLStateError

func IsSQLStateError(err error, sqlStates ...string) bool

func IsValidReplicationName

func IsValidReplicationName(s string) bool

func JoinHostPort

func JoinHostPort[I constraints.Integer](host string, port I) string

func LTableToSlice

func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T

func LogError

func LogError(logger log.Logger, err error) error

func NewSearchAttributes

func NewSearchAttributes(mirrorName string) temporal.SearchAttributes

func NewSlogHandler

func NewSlogHandler(handler slog.Handler) slog.Handler

func NewSlogHandlerOptions

func NewSlogHandlerOptions() *slog.HandlerOptions

func ParsePgArrayStringToStringSlice

func ParsePgArrayStringToStringSlice(data string, delim byte) []string

see array_in from postgres

func ParsePgArrayToStringSlice

func ParsePgArrayToStringSlice(data []byte, delim byte) []string

func Ptr

func Ptr[T any](x T) *T

func RandomString

func RandomString(n int) string

func RegisterExtensions

func RegisterExtensions(ctx context.Context, conn *pgx.Conn, version uint32) error

func ReplaceIllegalCharactersWithUnderscores

func ReplaceIllegalCharactersWithUnderscores(s string) string

func RollbackTx

func RollbackTx(tx pgx.Tx, logger log.Logger)

func SkipSendingToIncidentIo

func SkipSendingToIncidentIo(errTags []string) bool

func SliceToLTable

func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable

func UnsafeFastReadOnlyBytesToString

func UnsafeFastReadOnlyBytesToString(s []byte) string

func UnsafeFastStringToReadOnlyBytes

func UnsafeFastStringToReadOnlyBytes(s string) []byte

func Val

func Val[T any](p *T) T

func WrapError

func WrapError(s string, err error) error

Types

type AdjustedPartitions

type AdjustedPartitions struct {
	AdjustedNumPartitions       int64
	AdjustedNumRowsPerPartition int64
}

AdjustedPartitions represents the adjusted partitioning parameters

func AdjustNumPartitions

func AdjustNumPartitions(totalRows int64, desiredRowsPerPartition int64) AdjustedPartitions

AdjustNumPartitions takes the total number of rows and the desired number of rows per partition, and returns the adjusted number of partitions and rows per partition so that the partition count does not exceed 1000. It does so by increasing the rows-per-partition by a power-of-10 multiplier when necessary.

type BoundSelector

type BoundSelector struct {
	// contains filtered or unexported fields
}

func NewBoundSelector

func NewBoundSelector(ctx workflow.Context, selectorName string, limit int) *BoundSelector

func (*BoundSelector) SpawnChild

func (s *BoundSelector) SpawnChild(ctx workflow.Context, w any, futureCallback func(workflow.Future), args ...any) error

func (*BoundSelector) Wait

func (s *BoundSelector) Wait(ctx workflow.Context) error

type CatalogPool

type CatalogPool struct {
	Pool *pgxpool.Pool
}

func (CatalogPool) Begin

func (p CatalogPool) Begin(ctx context.Context) (pgx.Tx, error)

func (CatalogPool) BeginTx

func (p CatalogPool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)

func (CatalogPool) Exec

func (p CatalogPool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)

func (CatalogPool) Ping

func (p CatalogPool) Ping(ctx context.Context) error

func (CatalogPool) Query

func (p CatalogPool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)

func (CatalogPool) QueryRow

func (p CatalogPool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row

type CatalogRow

type CatalogRow struct {
	Row pgx.Row
}

func (CatalogRow) Scan

func (row CatalogRow) Scan(dest ...any) error

type CatalogRows

type CatalogRows struct {
	Rows pgx.Rows
}

func (CatalogRows) Close

func (rows CatalogRows) Close()

func (CatalogRows) CommandTag

func (rows CatalogRows) CommandTag() pgconn.CommandTag

func (CatalogRows) Conn

func (rows CatalogRows) Conn() *pgx.Conn

func (CatalogRows) Err

func (rows CatalogRows) Err() error

func (CatalogRows) FieldDescriptions

func (rows CatalogRows) FieldDescriptions() []pgconn.FieldDescription

func (CatalogRows) Next

func (rows CatalogRows) Next() bool

func (CatalogRows) RawValues

func (rows CatalogRows) RawValues() [][]byte

func (CatalogRows) Scan

func (rows CatalogRows) Scan(dest ...any) error

func (CatalogRows) Values

func (rows CatalogRows) Values() ([]any, error)

type CatalogTx

type CatalogTx struct {
	Tx pgx.Tx
}

func (CatalogTx) Begin

func (tx CatalogTx) Begin(ctx context.Context) (pgx.Tx, error)

func (CatalogTx) Commit

func (tx CatalogTx) Commit(ctx context.Context) error

func (CatalogTx) Conn

func (tx CatalogTx) Conn() *pgx.Conn

func (CatalogTx) CopyFrom

func (tx CatalogTx) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)

func (CatalogTx) Exec

func (tx CatalogTx) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)

func (CatalogTx) LargeObjects

func (tx CatalogTx) LargeObjects() pgx.LargeObjects

func (CatalogTx) Prepare

func (tx CatalogTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)

func (CatalogTx) Query

func (tx CatalogTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)

func (CatalogTx) QueryRow

func (tx CatalogTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row

func (CatalogTx) Rollback

func (tx CatalogTx) Rollback(ctx context.Context) error

func (CatalogTx) SendBatch

func (tx CatalogTx) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults

type ContextKey

type ContextKey string
const (
	FlowNameKey      ContextKey = "flowName"
	PartitionIDKey   ContextKey = "partitionId"
	DeploymentUIDKey ContextKey = "deploymentUid"
	RequestIdKey     ContextKey = "x-peerdb-request-id"
)

func (ContextKey) String

func (c ContextKey) String() string

type CustomDataType

type CustomDataType struct {
	Name  string
	Type  byte
	Delim byte // non-zero character for arrays
}

type ErrType

type ErrType string
const (
	ErrTypeCanceled ErrType = "err:Canceled"
	ErrTypeClosed   ErrType = "err:Closed"
	ErrTypeNet      ErrType = "err:Net"
	ErrTypeEOF      ErrType = "err:EOF"
)

type PGVersion

type PGVersion int32
const (
	POSTGRES_12 PGVersion = 120000
	POSTGRES_13 PGVersion = 130000
	POSTGRES_14 PGVersion = 140000
	POSTGRES_15 PGVersion = 150000
	POSTGRES_16 PGVersion = 160000
	POSTGRES_17 PGVersion = 170000
)

func GetMajorVersion

func GetMajorVersion(ctx context.Context, conn *pgx.Conn) (PGVersion, error)

type PeerDBEncKey

type PeerDBEncKey struct {
	ID    string `json:"id"`
	Value string `json:"value"`
}

PeerDBEncKey is a key for encrypting and decrypting data.

func (PeerDBEncKey) Decrypt

func (key PeerDBEncKey) Decrypt(ciphertext []byte) ([]byte, error)

Decrypt decrypts the given ciphertext using the PeerDBEncKey.

func (PeerDBEncKey) Encrypt

func (key PeerDBEncKey) Encrypt(plaintext []byte) ([]byte, error)

Encrypt encrypts the given plaintext using the PeerDBEncKey.

type PeerDBEncKeys

type PeerDBEncKeys []PeerDBEncKey

func (PeerDBEncKeys) Get

func (e PeerDBEncKeys) Get(id string) (PeerDBEncKey, error)

type QRepWarnings

type QRepWarnings []error

type SlogHandler

type SlogHandler struct {
	slog.Handler
}

func (SlogHandler) Handle

func (h SlogHandler) Handle(ctx context.Context, record slog.Record) error

type TaskQueueID

type TaskQueueID string
const (
	// Task Queues
	PeerFlowTaskQueue        TaskQueueID = "peer-flow-task-queue"
	SnapshotFlowTaskQueue    TaskQueueID = "snapshot-flow-task-queue"
	MaintenanceFlowTaskQueue TaskQueueID = "maintenance-flow-task-queue"

	// Queries
	CDCFlowStateQuery  = "q-cdc-flow-state"
	QRepFlowStateQuery = "q-qrep-flow-state"
)

type WatchWriter

type WatchWriter struct {
	// contains filtered or unexported fields
}

func NewWatchWriter

func NewWatchWriter(w io.Writer, size *atomic.Int64) *WatchWriter

func (*WatchWriter) Write

func (w *WatchWriter) Write(p []byte) (int, error)

Directories

Path Synopsis
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c
This is in reference to PostgreSQL's hstore: https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL