Documentation
¶
Overview ¶
Package utils provides common utilities and data structures for pg_flo.
Package utils provides common utilities and data structures for pg_flo.
Package utils provides common utilities and data structures for pg_flo.
Index ¶
- Variables
- func OIDToString(oid uint32) string
- func ParseTimestamp(value string) (time.Time, error)
- func StringToOID(typeName string) uint32
- func ToBool(v interface{}) (bool, bool)
- func ToFloat64(v interface{}) (float64, bool)
- func ToInt64(v interface{}) (int64, bool)
- func WithRetry(ctx context.Context, cfg RetryConfig, operation func() error) error
- type CDCMessage
- func (m *CDCMessage) GetColumnIndex(columnName string) int
- func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)
- func (m *CDCMessage) IsColumnToasted(columnName string) bool
- func (m *CDCMessage) RemoveColumn(columnName string) error
- func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error
- type ColumnNotFoundError
- type LogEvent
- type Logger
- type OperationType
- type PostgreSQLTypeConverter
- type ReplicationKey
- type ReplicationKeyType
- type RetryConfig
- type ZerologLogEvent
- func (e *ZerologLogEvent) Any(key string, val interface{}) LogEvent
- func (e *ZerologLogEvent) Err(err error) LogEvent
- func (e *ZerologLogEvent) Int(key string, val int) LogEvent
- func (e *ZerologLogEvent) Int64(key string, val int64) LogEvent
- func (e *ZerologLogEvent) Interface(key string, val interface{}) LogEvent
- func (e *ZerologLogEvent) Msg(msg string)
- func (e *ZerologLogEvent) Msgf(format string, v ...interface{})
- func (e *ZerologLogEvent) Str(key, val string) LogEvent
- func (e *ZerologLogEvent) Strs(key string, vals []string) LogEvent
- func (e *ZerologLogEvent) Type(key string, val interface{}) LogEvent
- func (e *ZerologLogEvent) Uint32(key string, val uint32) LogEvent
- func (e *ZerologLogEvent) Uint8(key string, val uint8) LogEvent
- type ZerologLogger
Constants ¶
This section is empty.
Variables ¶
var GlobalPostgreSQLTypeConverter = NewPostgreSQLTypeConverter()
GlobalPostgreSQLTypeConverter is a global instance for easy access
var OidToTypeName = map[uint32]string{ pgtype.BoolOID: "bool", pgtype.ByteaOID: "bytea", pgtype.Int8OID: "int8", pgtype.Int2OID: "int2", pgtype.Int4OID: "int4", pgtype.TextOID: "text", pgtype.JSONOID: "json", pgtype.Float4OID: "float4", pgtype.Float8OID: "float8", pgtype.BoolArrayOID: "bool[]", pgtype.Int2ArrayOID: "int2[]", pgtype.Int4ArrayOID: "int4[]", pgtype.TextArrayOID: "text[]", pgtype.ByteaArrayOID: "bytea[]", pgtype.Int8ArrayOID: "int8[]", pgtype.Float4ArrayOID: "float4[]", pgtype.Float8ArrayOID: "float8[]", pgtype.BPCharOID: "bpchar", pgtype.VarcharOID: "varchar", pgtype.DateOID: "date", pgtype.TimeOID: "time", pgtype.TimestampOID: "timestamp", pgtype.TimestampArrayOID: "timestamp[]", pgtype.DateArrayOID: "date[]", pgtype.TimestamptzOID: "timestamptz", pgtype.TimestamptzArrayOID: "timestamptz[]", pgtype.IntervalOID: "interval", pgtype.NumericArrayOID: "numeric[]", pgtype.BitOID: "bit", pgtype.VarbitOID: "varbit", pgtype.NumericOID: "numeric", pgtype.UUIDOID: "uuid", pgtype.UUIDArrayOID: "uuid[]", pgtype.JSONBOID: "jsonb", pgtype.JSONBArrayOID: "jsonb[]", }
OidToTypeName maps PostgreSQL OIDs to their corresponding type names
Functions ¶
func OIDToString ¶
OIDToString converts a PostgreSQL OID to its string representation
func ParseTimestamp ¶
ParseTimestamp attempts to parse a timestamp string using multiple layouts
func StringToOID ¶
StringToOID converts a type name to its PostgreSQL OID
Types ¶
type CDCMessage ¶
type CDCMessage struct {
Type OperationType
Schema string
Table string
Columns []*pglogrepl.RelationMessageColumn
NewTuple *pglogrepl.TupleData // For WAL messages
OldTuple *pglogrepl.TupleData // For WAL messages
CopyData [][]byte // For COPY messages
ReplicationKey ReplicationKey
LSN string
EmittedAt time.Time
ToastedColumns map[string]bool
}
CDCMessage represents a full message for Change Data Capture
func (*CDCMessage) GetColumnIndex ¶
func (m *CDCMessage) GetColumnIndex(columnName string) int
GetColumnIndex returns the index of a column by name, or -1 if not found
func (*CDCMessage) GetColumnValue ¶
func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)
GetColumnValue gets a column value, optionally using old values for DELETE/UPDATE
func (*CDCMessage) IsColumnToasted ¶
func (m *CDCMessage) IsColumnToasted(columnName string) bool
IsColumnToasted checks if a column was TOASTed
func (*CDCMessage) RemoveColumn ¶ added in v0.0.14
func (m *CDCMessage) RemoveColumn(columnName string) error
RemoveColumn removes a column from the message
func (*CDCMessage) SetColumnValue ¶
func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error
SetColumnValue sets the value of a column (only used by transform rules)
type ColumnNotFoundError ¶ added in v0.0.15
type ColumnNotFoundError struct {
ColumnName string
}
ColumnNotFoundError is returned when a requested column is not found in the CDC message
func (ColumnNotFoundError) Error ¶ added in v0.0.15
func (e ColumnNotFoundError) Error() string
type LogEvent ¶ added in v0.0.12
type LogEvent interface {
Str(key, val string) LogEvent
Int(key string, val int) LogEvent
Int64(key string, val int64) LogEvent
Uint8(key string, val uint8) LogEvent
Uint32(key string, val uint32) LogEvent
Interface(key string, val interface{}) LogEvent
Err(err error) LogEvent
Strs(key string, vals []string) LogEvent
Any(key string, val interface{}) LogEvent
Type(key string, val interface{}) LogEvent
Msg(msg string)
Msgf(format string, v ...interface{})
}
LogEvent defines the interface for individual log events
type Logger ¶ added in v0.0.12
type Logger interface {
Debug() LogEvent
Info() LogEvent
Warn() LogEvent
Error() LogEvent
Err(err error) LogEvent
}
Logger defines the interface for logging operations
func NewZerologLogger ¶ added in v0.0.12
NewZerologLogger creates a new ZerologLogger instance
type OperationType ¶
type OperationType string
OperationType represents the type of database operation
const ( // OperationInsert represents an INSERT database operation OperationInsert OperationType = "INSERT" // OperationUpdate represents an UPDATE database operation OperationUpdate OperationType = "UPDATE" // OperationDelete represents a DELETE database operation OperationDelete OperationType = "DELETE" // OperationDDL represents a DDL (Data Definition Language) database operation OperationDDL OperationType = "DDL" )
type PostgreSQLTypeConverter ¶ added in v0.0.15
type PostgreSQLTypeConverter struct {
// contains filtered or unexported fields
}
PostgreSQLTypeConverter handles PostgreSQL data type conversions using pgx
func NewPostgreSQLTypeConverter ¶ added in v0.0.15
func NewPostgreSQLTypeConverter() *PostgreSQLTypeConverter
NewPostgreSQLTypeConverter creates a new PostgreSQLTypeConverter
func (*PostgreSQLTypeConverter) DecodePostgreSQLValue ¶ added in v0.0.15
func (c *PostgreSQLTypeConverter) DecodePostgreSQLValue(data []byte, dataTypeOID uint32, format int16) (interface{}, error)
DecodePostgreSQLValue decodes PostgreSQL binary data using pgx natively
func (*PostgreSQLTypeConverter) EncodePostgreSQLValue ¶ added in v0.0.15
func (c *PostgreSQLTypeConverter) EncodePostgreSQLValue(value interface{}, dataTypeOID uint32) ([]byte, error)
EncodePostgreSQLValue encodes a Go value to PostgreSQL text format using pgx
type ReplicationKey ¶
type ReplicationKey struct {
Type ReplicationKeyType
Columns []string
}
ReplicationKey represents a key used for replication (either PK or unique constraint)
func (*ReplicationKey) IsValid ¶
func (rk *ReplicationKey) IsValid() bool
IsValid checks if the replication key is properly configured
type ReplicationKeyType ¶
type ReplicationKeyType string
ReplicationKeyType represents the type of replication key
const ( // ReplicationKeyPK represents a primary key replication identifier ReplicationKeyPK ReplicationKeyType = "PRIMARY KEY" // ReplicationKeyUnique represents a unique constraint replication identifier ReplicationKeyUnique ReplicationKeyType = "UNIQUE" // ReplicationKeyFull represents a full table replication identifier (replica identity full) ReplicationKeyFull ReplicationKeyType = "FULL" )
type RetryConfig ¶
RetryConfig defines configuration for retry operations
type ZerologLogEvent ¶ added in v0.0.12
type ZerologLogEvent struct {
// contains filtered or unexported fields
}
ZerologLogEvent implements the LogEvent interface using zerolog
func (*ZerologLogEvent) Any ¶ added in v0.0.12
func (e *ZerologLogEvent) Any(key string, val interface{}) LogEvent
Any adds an arbitrary field to the log event
func (*ZerologLogEvent) Err ¶ added in v0.0.12
func (e *ZerologLogEvent) Err(err error) LogEvent
Err adds an error field to the log event
func (*ZerologLogEvent) Int ¶ added in v0.0.12
func (e *ZerologLogEvent) Int(key string, val int) LogEvent
Int adds an integer field to the log event
func (*ZerologLogEvent) Int64 ¶ added in v0.0.12
func (e *ZerologLogEvent) Int64(key string, val int64) LogEvent
Int64 adds a 64-bit integer field to the log event
func (*ZerologLogEvent) Interface ¶ added in v0.0.12
func (e *ZerologLogEvent) Interface(key string, val interface{}) LogEvent
Interface adds an interface field to the log event
func (*ZerologLogEvent) Msg ¶ added in v0.0.12
func (e *ZerologLogEvent) Msg(msg string)
Msg sets the message for the log event and emits it
func (*ZerologLogEvent) Msgf ¶ added in v0.0.12
func (e *ZerologLogEvent) Msgf(format string, v ...interface{})
Msgf sets a formatted message for the log event and emits it
func (*ZerologLogEvent) Str ¶ added in v0.0.12
func (e *ZerologLogEvent) Str(key, val string) LogEvent
Str adds a string field to the log event
func (*ZerologLogEvent) Strs ¶ added in v0.0.12
func (e *ZerologLogEvent) Strs(key string, vals []string) LogEvent
Strs adds a string slice field to the log event
func (*ZerologLogEvent) Type ¶ added in v0.0.12
func (e *ZerologLogEvent) Type(key string, val interface{}) LogEvent
Type adds a type field to the log event
type ZerologLogger ¶ added in v0.0.12
type ZerologLogger struct {
// contains filtered or unexported fields
}
ZerologLogger implements the Logger interface using zerolog
func (*ZerologLogger) Debug ¶ added in v0.0.12
func (z *ZerologLogger) Debug() LogEvent
Debug creates a debug-level log event
func (*ZerologLogger) Err ¶ added in v0.0.12
func (z *ZerologLogger) Err(err error) LogEvent
Err creates an error-level log event with the given error
func (*ZerologLogger) Error ¶ added in v0.0.12
func (z *ZerologLogger) Error() LogEvent
func (*ZerologLogger) Info ¶ added in v0.0.12
func (z *ZerologLogger) Info() LogEvent
Info creates an info-level log event
func (*ZerologLogger) Warn ¶ added in v0.0.12
func (z *ZerologLogger) Warn() LogEvent
Warn creates a warning-level log event