utils

package
Version: v0.0.12
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// OidToTypeName maps PostgreSQL type OIDs (the pgtype *OID constants) to
// their type names. Entries are grouped by type family; array types reuse
// the element type name with a "[]" suffix.
var OidToTypeName = map[uint32]string{
	// Boolean and binary.
	pgtype.BoolOID:  "bool",
	pgtype.ByteaOID: "bytea",

	// Integers, floats and arbitrary-precision numbers.
	pgtype.Int2OID:    "int2",
	pgtype.Int4OID:    "int4",
	pgtype.Int8OID:    "int8",
	pgtype.Float4OID:  "float4",
	pgtype.Float8OID:  "float8",
	pgtype.NumericOID: "numeric",

	// Character and bit strings.
	pgtype.TextOID:    "text",
	pgtype.BPCharOID:  "bpchar",
	pgtype.VarcharOID: "varchar",
	pgtype.BitOID:     "bit",
	pgtype.VarbitOID:  "varbit",

	// Dates, times and intervals.
	pgtype.DateOID:        "date",
	pgtype.TimeOID:        "time",
	pgtype.TimestampOID:   "timestamp",
	pgtype.TimestamptzOID: "timestamptz",
	pgtype.IntervalOID:    "interval",

	// Identifiers and JSON documents.
	pgtype.UUIDOID:  "uuid",
	pgtype.JSONOID:  "json",
	pgtype.JSONBOID: "jsonb",

	// Array types.
	pgtype.BoolArrayOID:        "bool[]",
	pgtype.ByteaArrayOID:       "bytea[]",
	pgtype.Int2ArrayOID:        "int2[]",
	pgtype.Int4ArrayOID:        "int4[]",
	pgtype.Int8ArrayOID:        "int8[]",
	pgtype.Float4ArrayOID:      "float4[]",
	pgtype.Float8ArrayOID:      "float8[]",
	pgtype.NumericArrayOID:     "numeric[]",
	pgtype.TextArrayOID:        "text[]",
	pgtype.DateArrayOID:        "date[]",
	pgtype.TimestampArrayOID:   "timestamp[]",
	pgtype.TimestamptzArrayOID: "timestamptz[]",
	pgtype.UUIDArrayOID:        "uuid[]",
	pgtype.JSONBArrayOID:       "jsonb[]",
}

OidToTypeName maps PostgreSQL OIDs to their corresponding type names

Functions

func ConvertToPgCompatibleOutput

func ConvertToPgCompatibleOutput(value interface{}, oid uint32) ([]byte, error)

ConvertToPgCompatibleOutput converts a Go value to its PostgreSQL output format.

func DecodeArray

func DecodeArray(data []byte, dataType uint32) (interface{}, error)

DecodeArray decodes a PostgreSQL array into a slice of the appropriate type

func DecodeTextArray

func DecodeTextArray(data []byte) ([]string, error)

DecodeTextArray decodes a PostgreSQL text array into a []string

func DecodeValue

func DecodeValue(data []byte, dataType uint32) (interface{}, error)

DecodeValue decodes a byte slice into a Go value based on the PostgreSQL data type

func EncodeArray

func EncodeArray(value interface{}) ([]byte, error)

EncodeArray encodes a slice of values into a PostgreSQL array format.

func EncodeCDCMessage

func EncodeCDCMessage(m CDCMessage) ([]byte, error)

EncodeCDCMessage encodes a CDCMessage into a byte slice

func EncodeValue

func EncodeValue(value interface{}, dataType uint32) ([]byte, error)

EncodeValue encodes a Go value into a byte slice based on the PostgreSQL data type

func OIDToString

func OIDToString(oid uint32) string

OIDToString converts a PostgreSQL OID to its string representation

func ParseTimestamp

func ParseTimestamp(value string) (time.Time, error)

ParseTimestamp attempts to parse a timestamp string using multiple layouts

func StringToOID

func StringToOID(typeName string) uint32

StringToOID converts a type name to its PostgreSQL OID

func ToBool

func ToBool(v interface{}) (bool, bool)

ToBool converts various types to bool

func ToFloat64

func ToFloat64(v interface{}) (float64, bool)

ToFloat64 converts an interface{} to float64

func ToInt64

func ToInt64(v interface{}) (int64, bool)

ToInt64 converts an interface{} to int64

func WithRetry

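func WithRetry(ctx context.Context, cfg RetryConfig, operation func() error) error

WithRetry has no description in the upstream documentation. Judging only by its signature, it presumably invokes operation and retries it according to cfg (MaxAttempts, InitialWait, MaxWait) while honoring ctx; confirm the exact backoff and cancellation behavior against the source.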

Types

type CDCMessage

// CDCMessage bundles everything known about one captured change: the
// operation kind, the relation it refers to, the column descriptors and
// tuple data from pglogrepl, plus replication metadata.
type CDCMessage struct {
	// Type is the kind of operation (see the OperationType constants).
	Type           OperationType
	// Schema and Table presumably name the relation the change applies to.
	Schema         string
	Table          string
	// Columns holds the pglogrepl column descriptors for the relation.
	Columns        []*pglogrepl.RelationMessageColumn
	// NewTuple and OldTuple are the pglogrepl tuple payloads; GetColumnValue
	// can read from the old tuple for DELETE/UPDATE. Both are pointers, so
	// presumably either may be nil depending on the operation (verify).
	NewTuple       *pglogrepl.TupleData
	OldTuple       *pglogrepl.TupleData
	// ReplicationKey describes the key (PK, unique or full) used for replication.
	ReplicationKey ReplicationKey
	// LSN is presumably the PostgreSQL log sequence number in string form (TODO confirm).
	LSN            string
	// EmittedAt is a timestamp for the message; which moment it records is not shown here.
	EmittedAt      time.Time
	// ToastedColumns flags columns as TOASTed; presumably keyed by column
	// name, matching IsColumnToasted(columnName).
	ToastedColumns map[string]bool
}

CDCMessage represents a full message for Change Data Capture

func DecodeCDCMessage

func DecodeCDCMessage(data []byte) (*CDCMessage, error)

DecodeCDCMessage decodes a byte slice into a CDCMessage

func (*CDCMessage) GetColumnIndex

func (m *CDCMessage) GetColumnIndex(columnName string) int

func (*CDCMessage) GetColumnValue

func (m *CDCMessage) GetColumnValue(columnName string, useOldValues bool) (interface{}, error)

GetColumnValue gets a column value, optionally using old values for DELETE/UPDATE

func (*CDCMessage) IsColumnToasted

func (m *CDCMessage) IsColumnToasted(columnName string) bool

IsColumnToasted checks if a column was TOASTed

func (CDCMessage) MarshalBinary

func (m CDCMessage) MarshalBinary() ([]byte, error)

MarshalBinary implements the encoding.BinaryMarshaler interface

func (*CDCMessage) SetColumnValue

func (m *CDCMessage) SetColumnValue(columnName string, value interface{}) error

SetColumnValue sets the value of a column, respecting its type

func (*CDCMessage) UnmarshalBinary

func (m *CDCMessage) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface

type LogEvent added in v0.0.12

// LogEvent is a chainable builder for a single structured log entry.
// Every field method returns a LogEvent so calls can be strung together;
// Msg and Msgf return nothing and therefore terminate the chain.
type LogEvent interface {
	// String-valued fields.
	Str(key string, val string) LogEvent
	Strs(key string, vals []string) LogEvent

	// Integer-valued fields.
	Int(key string, val int) LogEvent
	Int64(key string, val int64) LogEvent
	Uint8(key string, val uint8) LogEvent
	Uint32(key string, val uint32) LogEvent

	// Fields taking an arbitrary value.
	Any(key string, val interface{}) LogEvent
	Interface(key string, val interface{}) LogEvent
	Type(key string, val interface{}) LogEvent

	// Err attaches an error to the entry.
	Err(err error) LogEvent

	// Terminal calls: a plain message, or a format string with arguments.
	Msg(msg string)
	Msgf(format string, v ...interface{})
}

type Logger added in v0.0.12

// Logger is the logging abstraction exposed by this package. Each method
// starts a new LogEvent that the caller finishes with Msg or Msgf.
// *ZerologLogger (built by NewZerologLogger) provides all five methods.
type Logger interface {
	// Debug, Info, Warn and Error each begin an event; presumably at the
	// severity their name suggests (the level mapping is not shown here).
	Debug() LogEvent
	Info() LogEvent
	Warn() LogEvent
	Error() LogEvent
	// Err begins an event from the given error.
	Err(err error) LogEvent
}

func NewZerologLogger added in v0.0.12

func NewZerologLogger(logger zerolog.Logger) Logger

type OperationType

// OperationType is a string-backed identifier for the kind of operation a
// CDCMessage carries (its Type field).
type OperationType string

OperationType represents the type of database operation

// The OperationType values declared by this package. Each constant's value
// is the upper-case keyword shown in its string literal.
const (
	// OperationInsert identifies an INSERT operation.
	OperationInsert OperationType = "INSERT"
	// OperationUpdate identifies an UPDATE operation.
	OperationUpdate OperationType = "UPDATE"
	// OperationDelete identifies a DELETE operation.
	OperationDelete OperationType = "DELETE"
	// OperationDDL identifies a DDL (schema-changing) operation.
	OperationDDL    OperationType = "DDL"
)

type ReplicationKey

// ReplicationKey pairs a key kind with the columns that make it up.
type ReplicationKey struct {
	// Type is the kind of key (see the ReplicationKeyType constants).
	Type    ReplicationKeyType
	// Columns lists column names; presumably the columns forming the key
	// (how this is populated for the FULL kind is not shown here).
	Columns []string
}

ReplicationKey represents a key used for replication (either PK or unique constraint)

func (*ReplicationKey) IsValid

func (rk *ReplicationKey) IsValid() bool

IsValid checks if the replication key is properly configured

type ReplicationKeyType

// ReplicationKeyType is a string-backed identifier for the kind of key held
// in ReplicationKey.Type.
type ReplicationKeyType string

ReplicationKeyType represents the type of replication key

// The ReplicationKeyType values declared by this package.
const (
	// ReplicationKeyPK marks a key backed by the table's primary key.
	ReplicationKeyPK     ReplicationKeyType = "PRIMARY KEY"
	// ReplicationKeyUnique marks a key backed by a unique constraint.
	ReplicationKeyUnique ReplicationKeyType = "UNIQUE"
	ReplicationKeyFull   ReplicationKeyType = "FULL" // Replica identity full
)

type RetryConfig

// RetryConfig is the configuration value accepted by WithRetry.
// NOTE(review): field semantics below are inferred from names only; the
// retry loop itself is not visible here. Confirm against WithRetry.
type RetryConfig struct {
	// MaxAttempts is presumably the upper bound on how many times the operation is tried.
	MaxAttempts int
	// InitialWait is presumably the delay before the first retry.
	InitialWait time.Duration
	// MaxWait is presumably the cap on the delay between retries.
	MaxWait     time.Duration
}

type ZerologLogEvent added in v0.0.12

// ZerologLogEvent is a concrete event type whose pointer receiver methods
// (Any, Err, Int, Int64, Interface, Msg, Msgf, Str, Strs, Type, Uint32,
// Uint8) cover the full LogEvent method set. Presumably it wraps a zerolog
// event; its fields are unexported and not shown.
type ZerologLogEvent struct {
	// contains filtered or unexported fields
}

func (*ZerologLogEvent) Any added in v0.0.12

func (e *ZerologLogEvent) Any(key string, val interface{}) LogEvent

func (*ZerologLogEvent) Err added in v0.0.12

func (e *ZerologLogEvent) Err(err error) LogEvent

func (*ZerologLogEvent) Int added in v0.0.12

func (e *ZerologLogEvent) Int(key string, val int) LogEvent

func (*ZerologLogEvent) Int64 added in v0.0.12

func (e *ZerologLogEvent) Int64(key string, val int64) LogEvent

func (*ZerologLogEvent) Interface added in v0.0.12

func (e *ZerologLogEvent) Interface(key string, val interface{}) LogEvent

func (*ZerologLogEvent) Msg added in v0.0.12

func (e *ZerologLogEvent) Msg(msg string)

func (*ZerologLogEvent) Msgf added in v0.0.12

func (e *ZerologLogEvent) Msgf(format string, v ...interface{})

func (*ZerologLogEvent) Str added in v0.0.12

func (e *ZerologLogEvent) Str(key, val string) LogEvent

func (*ZerologLogEvent) Strs added in v0.0.12

func (e *ZerologLogEvent) Strs(key string, vals []string) LogEvent

func (*ZerologLogEvent) Type added in v0.0.12

func (e *ZerologLogEvent) Type(key string, val interface{}) LogEvent

func (*ZerologLogEvent) Uint32 added in v0.0.12

func (e *ZerologLogEvent) Uint32(key string, val uint32) LogEvent

func (*ZerologLogEvent) Uint8 added in v0.0.12

func (e *ZerologLogEvent) Uint8(key string, val uint8) LogEvent

type ZerologLogger added in v0.0.12

// ZerologLogger is a concrete logger type whose pointer receiver methods
// (Debug, Err, Error, Info, Warn) cover the full Logger method set.
// Presumably it wraps the zerolog.Logger passed to NewZerologLogger; its
// fields are unexported and not shown.
type ZerologLogger struct {
	// contains filtered or unexported fields
}

func (*ZerologLogger) Debug added in v0.0.12

func (z *ZerologLogger) Debug() LogEvent

func (*ZerologLogger) Err added in v0.0.12

func (z *ZerologLogger) Err(err error) LogEvent

func (*ZerologLogger) Error added in v0.0.12

func (z *ZerologLogger) Error() LogEvent

func (*ZerologLogger) Info added in v0.0.12

func (z *ZerologLogger) Info() LogEvent

func (*ZerologLogger) Warn added in v0.0.12

func (z *ZerologLogger) Warn() LogEvent

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL