utils

package
v0.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package utils provides common utilities and data structures for pg_flo.

Package utils provides common utilities and data structures for pg_flo.

Package utils provides common utilities and data structures for pg_flo.

Index

Constants

This section is empty.

Variables

View Source
var GlobalPostgreSQLTypeConverter = NewPostgreSQLTypeConverter()

GlobalPostgreSQLTypeConverter is a global instance for easy access

View Source
var OidToTypeName = map[uint32]string{
	pgtype.BoolOID:             "bool",
	pgtype.ByteaOID:            "bytea",
	pgtype.Int8OID:             "int8",
	pgtype.Int2OID:             "int2",
	pgtype.Int4OID:             "int4",
	pgtype.TextOID:             "text",
	pgtype.JSONOID:             "json",
	pgtype.Float4OID:           "float4",
	pgtype.Float8OID:           "float8",
	pgtype.BoolArrayOID:        "bool[]",
	pgtype.Int2ArrayOID:        "int2[]",
	pgtype.Int4ArrayOID:        "int4[]",
	pgtype.TextArrayOID:        "text[]",
	pgtype.ByteaArrayOID:       "bytea[]",
	pgtype.Int8ArrayOID:        "int8[]",
	pgtype.Float4ArrayOID:      "float4[]",
	pgtype.Float8ArrayOID:      "float8[]",
	pgtype.BPCharOID:           "bpchar",
	pgtype.VarcharOID:          "varchar",
	pgtype.DateOID:             "date",
	pgtype.TimeOID:             "time",
	pgtype.TimestampOID:        "timestamp",
	pgtype.TimestampArrayOID:   "timestamp[]",
	pgtype.DateArrayOID:        "date[]",
	pgtype.TimestamptzOID:      "timestamptz",
	pgtype.TimestamptzArrayOID: "timestamptz[]",
	pgtype.IntervalOID:         "interval",
	pgtype.NumericArrayOID:     "numeric[]",
	pgtype.BitOID:              "bit",
	pgtype.VarbitOID:           "varbit",
	pgtype.NumericOID:          "numeric",
	pgtype.UUIDOID:             "uuid",
	pgtype.UUIDArrayOID:        "uuid[]",
	pgtype.JSONBOID:            "jsonb",
	pgtype.JSONBArrayOID:       "jsonb[]",
}

OidToTypeName maps PostgreSQL OIDs to their corresponding type names

Functions

func OIDToString

func OIDToString(oid uint32) string

OIDToString converts a PostgreSQL OID to its string representation

func ParseTimestamp

func ParseTimestamp(value string) (time.Time, error)

ParseTimestamp attempts to parse a timestamp string using multiple layouts

func StringToOID

func StringToOID(typeName string) uint32

StringToOID converts a type name to its PostgreSQL OID

func ToBool

func ToBool(v interface{}) (bool, bool)

ToBool converts various types to bool

func ToFloat64

func ToFloat64(v interface{}) (float64, bool)

ToFloat64 converts an interface{} to float64

func ToInt64

func ToInt64(v interface{}) (int64, bool)

ToInt64 converts an interface{} to int64

func WithRetry

func WithRetry(ctx context.Context, cfg RetryConfig, operation func() error) error

WithRetry executes an operation with retry logic based on the provided configuration

Types

type CDCMessage

type CDCMessage struct {
	Type           OperationType
	Schema         string
	Table          string
	Columns        []*pglogrepl.RelationMessageColumn
	NewTuple       *pglogrepl.TupleData // For WAL messages
	OldTuple       *pglogrepl.TupleData // For WAL messages
	CopyData       [][]byte             // For COPY messages
	ReplicationKey ReplicationKey
	LSN            string
	EmittedAt      time.Time
	ToastedColumns map[string]bool
}

CDCMessage represents a full message for Change Data Capture

func (*CDCMessage) GetColumnIndex

func (m *CDCMessage) GetColumnIndex(columnName string) int

GetColumnIndex returns the index of a column by name, or -1 if not found

func (*CDCMessage) GetColumnValue

func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)

GetColumnValue gets a column value, optionally using old values for DELETE/UPDATE

func (*CDCMessage) IsColumnToasted

func (m *CDCMessage) IsColumnToasted(columnName string) bool

IsColumnToasted checks if a column was TOASTed

func (*CDCMessage) RemoveColumn added in v0.0.14

func (m *CDCMessage) RemoveColumn(columnName string) error

RemoveColumn removes a column from the message

func (*CDCMessage) SetColumnValue

func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error

SetColumnValue sets the value of a column (only used by transform rules)

type ColumnNotFoundError added in v0.0.15

type ColumnNotFoundError struct {
	ColumnName string
}

ColumnNotFoundError is returned when a requested column is not found in the CDC message

func (ColumnNotFoundError) Error added in v0.0.15

func (e ColumnNotFoundError) Error() string

type LogEvent added in v0.0.12

type LogEvent interface {
	Str(key, val string) LogEvent
	Int(key string, val int) LogEvent
	Int64(key string, val int64) LogEvent
	Uint8(key string, val uint8) LogEvent
	Uint32(key string, val uint32) LogEvent
	Interface(key string, val interface{}) LogEvent
	Err(err error) LogEvent
	Strs(key string, vals []string) LogEvent
	Any(key string, val interface{}) LogEvent
	Type(key string, val interface{}) LogEvent
	Msg(msg string)
	Msgf(format string, v ...interface{})
}

LogEvent defines the interface for individual log events

type Logger added in v0.0.12

type Logger interface {
	Debug() LogEvent
	Info() LogEvent
	Warn() LogEvent
	Error() LogEvent
	Err(err error) LogEvent
}

Logger defines the interface for logging operations

func NewZerologLogger added in v0.0.12

func NewZerologLogger(logger zerolog.Logger) Logger

NewZerologLogger creates a new ZerologLogger instance

type OperationType

type OperationType string

OperationType represents the type of database operation

const (
	// OperationInsert represents an INSERT database operation
	OperationInsert OperationType = "INSERT"
	// OperationUpdate represents an UPDATE database operation
	OperationUpdate OperationType = "UPDATE"
	// OperationDelete represents a DELETE database operation
	OperationDelete OperationType = "DELETE"
	// OperationDDL represents a DDL (Data Definition Language) database operation
	OperationDDL OperationType = "DDL"
)

type PostgreSQLTypeConverter added in v0.0.15

type PostgreSQLTypeConverter struct {
	// contains filtered or unexported fields
}

PostgreSQLTypeConverter handles PostgreSQL data type conversions using pgx

func NewPostgreSQLTypeConverter added in v0.0.15

func NewPostgreSQLTypeConverter() *PostgreSQLTypeConverter

NewPostgreSQLTypeConverter creates a new PostgreSQLTypeConverter

func (*PostgreSQLTypeConverter) DecodePostgreSQLValue added in v0.0.15

func (c *PostgreSQLTypeConverter) DecodePostgreSQLValue(data []byte, dataTypeOID uint32, format int16) (interface{}, error)

DecodePostgreSQLValue decodes PostgreSQL binary data using pgx natively

func (*PostgreSQLTypeConverter) EncodePostgreSQLValue added in v0.0.15

func (c *PostgreSQLTypeConverter) EncodePostgreSQLValue(value interface{}, dataTypeOID uint32) ([]byte, error)

EncodePostgreSQLValue encodes a Go value to PostgreSQL text format using pgx

type ReplicationKey

type ReplicationKey struct {
	Type    ReplicationKeyType
	Columns []string
}

ReplicationKey represents a key used for replication (either PK or unique constraint)

func (*ReplicationKey) IsValid

func (rk *ReplicationKey) IsValid() bool

IsValid checks if the replication key is properly configured

type ReplicationKeyType

type ReplicationKeyType string

ReplicationKeyType represents the type of replication key

const (
	// ReplicationKeyPK represents a primary key replication identifier
	ReplicationKeyPK ReplicationKeyType = "PRIMARY KEY"
	// ReplicationKeyUnique represents a unique constraint replication identifier
	ReplicationKeyUnique ReplicationKeyType = "UNIQUE"
	// ReplicationKeyFull represents a full table replication identifier (replica identity full)
	ReplicationKeyFull ReplicationKeyType = "FULL"
)

type RetryConfig

type RetryConfig struct {
	MaxAttempts int
	InitialWait time.Duration
	MaxWait     time.Duration
}

RetryConfig defines configuration for retry operations

type ZerologLogEvent added in v0.0.12

type ZerologLogEvent struct {
	// contains filtered or unexported fields
}

ZerologLogEvent implements the LogEvent interface using zerolog

func (*ZerologLogEvent) Any added in v0.0.12

func (e *ZerologLogEvent) Any(key string, val interface{}) LogEvent

Any adds an arbitrary field to the log event

func (*ZerologLogEvent) Err added in v0.0.12

func (e *ZerologLogEvent) Err(err error) LogEvent

Err adds an error field to the log event

func (*ZerologLogEvent) Int added in v0.0.12

func (e *ZerologLogEvent) Int(key string, val int) LogEvent

Int adds an integer field to the log event

func (*ZerologLogEvent) Int64 added in v0.0.12

func (e *ZerologLogEvent) Int64(key string, val int64) LogEvent

Int64 adds a 64-bit integer field to the log event

func (*ZerologLogEvent) Interface added in v0.0.12

func (e *ZerologLogEvent) Interface(key string, val interface{}) LogEvent

Interface adds an interface field to the log event

func (*ZerologLogEvent) Msg added in v0.0.12

func (e *ZerologLogEvent) Msg(msg string)

Msg sets the message for the log event and emits it

func (*ZerologLogEvent) Msgf added in v0.0.12

func (e *ZerologLogEvent) Msgf(format string, v ...interface{})

Msgf sets a formatted message for the log event and emits it

func (*ZerologLogEvent) Str added in v0.0.12

func (e *ZerologLogEvent) Str(key, val string) LogEvent

Str adds a string field to the log event

func (*ZerologLogEvent) Strs added in v0.0.12

func (e *ZerologLogEvent) Strs(key string, vals []string) LogEvent

Strs adds a string slice field to the log event

func (*ZerologLogEvent) Type added in v0.0.12

func (e *ZerologLogEvent) Type(key string, val interface{}) LogEvent

Type adds a type field to the log event

func (*ZerologLogEvent) Uint32 added in v0.0.12

func (e *ZerologLogEvent) Uint32(key string, val uint32) LogEvent

Uint32 adds a 32-bit unsigned integer field to the log event

func (*ZerologLogEvent) Uint8 added in v0.0.12

func (e *ZerologLogEvent) Uint8(key string, val uint8) LogEvent

Uint8 adds an 8-bit unsigned integer field to the log event

type ZerologLogger added in v0.0.12

type ZerologLogger struct {
	// contains filtered or unexported fields
}

ZerologLogger implements the Logger interface using zerolog

func (*ZerologLogger) Debug added in v0.0.12

func (z *ZerologLogger) Debug() LogEvent

Debug creates a debug-level log event

func (*ZerologLogger) Err added in v0.0.12

func (z *ZerologLogger) Err(err error) LogEvent

Err creates an error-level log event with the given error

func (*ZerologLogger) Error added in v0.0.12

func (z *ZerologLogger) Error() LogEvent

func (*ZerologLogger) Info added in v0.0.12

func (z *ZerologLogger) Info() LogEvent

Info creates an info-level log event

func (*ZerologLogger) Warn added in v0.0.12

func (z *ZerologLogger) Warn() LogEvent

Warn creates a warning-level log event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL