replication

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Exported default values.
//
//   - DefaultGroupCount: default group count, 10.
//   - DefaultRetryMaxAttempts: default maximum number of retry attempts, 10.
//
// NOTE(review): this listing does not show where either value is consumed
// (what is grouped, which operation is retried) — confirm against the callers.
const (
	DefaultGroupCount       = 10
	DefaultRetryMaxAttempts = 10
)

Variables

View Source
// IgnoreMessageError is a package-level sentinel error created with errors.New.
//
// NOTE(review): judging by its message, it presumably signals that the current
// message should be skipped while replication carries on — confirm where it is
// returned and compared. Go convention would name a sentinel ErrIgnoreMessage;
// the name is kept because renaming it would break existing callers.
var IgnoreMessageError = errors.New("ignore this message and continue replication")

Functions

func Initialize

func Initialize(replications []*Config) error

func SendToHttpDestination

func SendToHttpDestination(spec *DestinationSpec, change *Change) error

func SendToSqsDestination

func SendToSqsDestination(spec *DestinationSpec, change *Change) error

func StartReplication

func StartReplication(c *Config) error

Types

type Action

// Action names the kind of change recorded in a Change (its "kind" JSON key).
//
// It is declared as an alias of string rather than a distinct type, so Action
// and string values can be used interchangeably without conversion.
type Action = string

// The Action values declared by this package.
const (
	ActionDelete Action = "delete"
	ActionInsert Action = "insert"
	ActionUpdate Action = "update"
)

type Change

// Change is one decoded change record.
//
// JSON keys, as declared by the struct tags:
//
//   - Action ("kind"): one of the Action values.
//   - Schema ("schema") and Table ("table"): strings locating the changed table.
//   - ColumnNames ("columnnames"), ColumnTypes ("columntypes") and
//     ColumnValues ("columnvalues"): column data for the change.
//   - OldKeys ("oldkeys"): optional; omitted from JSON output when nil.
//
// NOTE(review): the three Column* slices are presumably parallel (same length,
// same order), and the key names look like the wal2json format-version-1
// change object — confirm both against the producer of this JSON.
type Change struct {
	Action       Action   `json:"kind"`
	Schema       string   `json:"schema"`
	Table        string   `json:"table"`
	ColumnNames  []string `json:"columnnames"`
	ColumnTypes  []string `json:"columntypes"`
	ColumnValues []any    `json:"columnvalues"`
	OldKeys      *OldKeys `json:"oldkeys,omitempty"`
}

type Config

// Config describes one replication, as accepted by Initialize and
// StartReplication.
//
//   - LogLevel: a zapcore.Level. Its `cpln` tag declares "default:info" and
//     "mapper:ZapLogLevelMapper"; it carries no json tag.
//   - Host, Port, User, Database, Password: strings tagged `json:"-"`, so
//     encoding/json never reads or writes them.
//   - Slot: JSON key "slot".
//   - WalAcknowledgementFrequency: a time.Duration whose `cpln` tag declares
//     "default:30s"; also excluded from JSON.
//   - Destinations: JSON key "destinations".
//
// NOTE(review): the code that interprets the `cpln` tag is not visible here.
// The `json:"-"` fields are presumably database connection settings supplied
// outside the JSON document, and Slot presumably names a replication slot —
// confirm against the loader.
type Config struct {
	LogLevel                    zapcore.Level   `cpln:"default:info;mapper:ZapLogLevelMapper"`
	Host                        string          `json:"-"`
	Port                        string          `json:"-"`
	User                        string          `json:"-"`
	Database                    string          `json:"-"`
	Password                    string          `json:"-"`
	Slot                        string          `json:"slot"`
	WalAcknowledgementFrequency time.Duration   `cpln:"default:30s" json:"-"`
	Destinations                DestinationList `json:"destinations"`
}

func (Config) Tables

func (c Config) Tables() []string

type DestinationKind

// DestinationKind is the string-typed discriminator stored in
// DestinationSpec.Kind. Unlike Action it is a defined type, not an alias, so
// plain string variables need an explicit conversion.
type DestinationKind string

// The DestinationKind values declared by this package.
//
// NOTE(review): these presumably pair with SendToSqsDestination and
// SendToHttpDestination — the dispatch code is not visible here.
const (
	DestinationKindSQS  DestinationKind = "sqs"
	DestinationKindHTTP DestinationKind = "http"
)

type DestinationList

// DestinationList is a slice of *DestinationSpec. It is the type of
// Config.Destinations.
type DestinationList []*DestinationSpec

func (DestinationList) HandleChange

func (l DestinationList) HandleChange(change *Change) error

func (DestinationList) Tables

func (l DestinationList) Tables() []string

type DestinationSpec

// DestinationSpec describes a single destination: the embedded Map plus a
// Kind and a free-form Parameters map (JSON keys "kind" and "parameters").
//
// The embedded Map carries the tag `json:",inline"`. encoding/json already
// flattens the exported fields of an embedded struct whose tag gives no name,
// and it defines no `inline` option, so under encoding/json the option has no
// effect.
// NOTE(review): confirm which decoder consumes this tag, and which Parameters
// keys each Kind expects — neither is visible here.
type DestinationSpec struct {
	Map        `json:",inline"`
	Kind       DestinationKind `json:"kind"`
	Parameters map[string]any  `json:"parameters"`
}

func (*DestinationSpec) HandleChange

func (s *DestinationSpec) HandleChange(change *Change) error

type Map

// Map holds the mapping settings embedded in every DestinationSpec.
//
// JSON keys, as declared by the struct tags: "name", "table", "fieldMap",
// "fields" and "actions".
//
// NOTE(review): the Match and Message method bodies are not visible here.
// Presumably Table and Actions select which changes apply, while Fields and
// FieldMap choose and rename the columns copied into the outgoing message —
// confirm against those methods.
type Map struct {
	Name     string            `json:"name"`
	Table    string            `json:"table"`
	FieldMap map[string]string `json:"fieldMap"`
	Fields   []string          `json:"fields"`
	Actions  []Action          `json:"actions"`
}

func (*Map) Match

func (m *Map) Match(c *Change) bool

func (*Map) Message

func (m *Map) Message(c *Change) *Message

type Mapping

// Mapping wraps a slice of Map values under the JSON key "maps".
type Mapping struct {
	Maps []Map `json:"maps"`
}

type Message

// Message is the value returned by Map.Message.
//
// Id, Created and Delivered are plain strings (JSON keys "id", "created",
// "delivered"). Payload is a free-form map under the JSON key "payload"; its
// gorm tag declares the column type jsonb.
//
// NOTE(review): the format of Created and Delivered (presumably timestamps)
// and how Id is generated are not visible here — confirm against Map.Message.
type Message struct {
	Id        string         `json:"id"`
	Created   string         `json:"created"`
	Delivered string         `json:"delivered"`
	Payload   map[string]any `json:"payload" gorm:"type:jsonb"`
}

type OldKeys

// OldKeys is the optional "oldkeys" object attached to a Change.
//
// JSON keys, as declared by the struct tags: "keynames", "keytypes" and
// "keyvalues".
//
// NOTE(review): the three slices are presumably parallel (one entry per key
// column, same order) and describe the row's key before the change — confirm
// against the producer of this JSON.
type OldKeys struct {
	KeyNames  []string `json:"keynames"`
	KeyTypes  []string `json:"keytypes"`
	KeyValues []any    `json:"keyvalues"`
}

type WalData

// WalData is the top-level JSON envelope: a list of Change pointers stored
// under the singular key "change".
//
// NOTE(review): presumably one WalData corresponds to one decoded WAL message
// (the shape matches wal2json format-version-1 output) — confirm against the
// decoder.
type WalData struct {
	Changes []*Change `json:"change"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL