Documentation
¶
Index ¶
- Variables
- func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error)
- func DecodeArray(data []byte, dataType uint32) (interface{}, error)
- func DecodeTextArray(data []byte) ([]string, error)
- func DecodeValue(data []byte, dataType uint32) (interface{}, error)
- func EncodeArray(value interface{}) ([]byte, error)
- func EncodeCDCMessage(m CDCMessage) ([]byte, error)
- func EncodeValue(value interface{}, dataType uint32) ([]byte, error)
- func OIDToString(oid uint32) string
- func ParseTimestamp(value string) (time.Time, error)
- func StringToOID(typeName string) uint32
- func ToBool(v interface{}) (bool, bool)
- func ToFloat64(v interface{}) (float64, bool)
- func ToInt64(v interface{}) (int64, bool)
- func WithRetry(ctx context.Context, cfg RetryConfig, operation func() error) error
- type CDCMessage
- func (m *CDCMessage) GetColumnIndex(columnName string) int
- func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)
- func (m *CDCMessage) IsColumnToasted(columnName string) bool
- func (m CDCMessage) MarshalBinary() ([]byte, error)
- func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error
- func (m *CDCMessage) UnmarshalBinary(data []byte) error
- type OperationType
- type ReplicationKey
- type ReplicationKeyType
- type RetryConfig
Constants ¶
This section is empty.
Variables ¶
var OidToTypeName = map[uint32]string{ pgtype.BoolOID: "bool", pgtype.ByteaOID: "bytea", pgtype.Int8OID: "int8", pgtype.Int2OID: "int2", pgtype.Int4OID: "int4", pgtype.TextOID: "text", pgtype.JSONOID: "json", pgtype.Float4OID: "float4", pgtype.Float8OID: "float8", pgtype.BoolArrayOID: "bool[]", pgtype.Int2ArrayOID: "int2[]", pgtype.Int4ArrayOID: "int4[]", pgtype.TextArrayOID: "text[]", pgtype.ByteaArrayOID: "bytea[]", pgtype.Int8ArrayOID: "int8[]", pgtype.Float4ArrayOID: "float4[]", pgtype.Float8ArrayOID: "float8[]", pgtype.BPCharOID: "bpchar", pgtype.VarcharOID: "varchar", pgtype.DateOID: "date", pgtype.TimeOID: "time", pgtype.TimestampOID: "timestamp", pgtype.TimestampArrayOID: "timestamp[]", pgtype.DateArrayOID: "date[]", pgtype.TimestamptzOID: "timestamptz", pgtype.TimestamptzArrayOID: "timestamptz[]", pgtype.IntervalOID: "interval", pgtype.NumericArrayOID: "numeric[]", pgtype.BitOID: "bit", pgtype.VarbitOID: "varbit", pgtype.NumericOID: "numeric", pgtype.UUIDOID: "uuid", pgtype.UUIDArrayOID: "uuid[]", pgtype.JSONBOID: "jsonb", pgtype.JSONBArrayOID: "jsonb[]", }
OidToTypeName maps PostgreSQL OIDs to their corresponding type names
Functions ¶
func ConvertToPgCompatibleOutput ¶
ConvertToPgCompatibleOutput converts a Go value to its PostgreSQL output format.
func DecodeArray ¶
DecodeArray decodes a PostgreSQL array into a slice of the appropriate type
func DecodeTextArray ¶
DecodeTextArray decodes a PostgreSQL text array into a []string
func DecodeValue ¶
DecodeValue decodes a byte slice into a Go value based on the PostgreSQL data type
func EncodeArray ¶
EncodeArray encodes a slice of values into a PostgreSQL array format.
func EncodeCDCMessage ¶
func EncodeCDCMessage(m CDCMessage) ([]byte, error)
EncodeCDCMessage encodes a CDCMessage into a byte slice
func EncodeValue ¶
EncodeValue encodes a Go value into a byte slice based on the PostgreSQL data type
func OIDToString ¶
OIDToString converts a PostgreSQL OID to its string representation
func ParseTimestamp ¶
ParseTimestamp attempts to parse a timestamp string using multiple layouts
func StringToOID ¶
StringToOID converts a type name to its PostgreSQL OID
Types ¶
type CDCMessage ¶
type CDCMessage struct {
Type OperationType
Schema string
Table string
Columns []*pglogrepl.RelationMessageColumn
NewTuple *pglogrepl.TupleData
OldTuple *pglogrepl.TupleData
ReplicationKey ReplicationKey
LSN string
EmittedAt time.Time
ToastedColumns map[string]bool
}
CDCMessage represents a full message for Change Data Capture
func DecodeCDCMessage ¶
func DecodeCDCMessage(data []byte) (*CDCMessage, error)
DecodeCDCMessage decodes a byte slice into a CDCMessage
func (*CDCMessage) GetColumnIndex ¶
func (m *CDCMessage) GetColumnIndex(columnName string) int
func (*CDCMessage) GetColumnValue ¶
func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)
GetColumnValue gets a column value, optionally using old values for DELETE/UPDATE
func (*CDCMessage) IsColumnToasted ¶
func (m *CDCMessage) IsColumnToasted(columnName string) bool
IsColumnToasted checks if a column was TOASTed
func (CDCMessage) MarshalBinary ¶
func (m CDCMessage) MarshalBinary() ([]byte, error)
MarshalBinary implements the encoding.BinaryMarshaler interface
func (*CDCMessage) SetColumnValue ¶
func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error
SetColumnValue sets the value of a column, respecting its type
func (*CDCMessage) UnmarshalBinary ¶
func (m *CDCMessage) UnmarshalBinary(data []byte) error
UnmarshalBinary implements the encoding.BinaryUnmarshaler interface
type OperationType ¶
type OperationType string
OperationType represents the type of database operation
const ( OperationInsert OperationType = "INSERT" OperationUpdate OperationType = "UPDATE" OperationDelete OperationType = "DELETE" OperationDDL OperationType = "DDL" )
type ReplicationKey ¶
type ReplicationKey struct {
Type ReplicationKeyType
Columns []string
}
ReplicationKey represents a key used for replication (either PK or unique constraint)
func (*ReplicationKey) IsValid ¶
func (rk *ReplicationKey) IsValid() bool
IsValid checks if the replication key is properly configured
type ReplicationKeyType ¶
type ReplicationKeyType string
ReplicationKeyType represents the type of replication key
const ( ReplicationKeyPK ReplicationKeyType = "PRIMARY KEY" ReplicationKeyUnique ReplicationKeyType = "UNIQUE" ReplicationKeyFull ReplicationKeyType = "FULL" // Replica identity full )