wal

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const DDLPrefix = "pgstream.ddl"
View Source
const LogicalMessageAction = "M"
View Source
const ZeroLSN = "0/0"

Variables

View Source
var (
	ErrNotDDLEvent            = fmt.Errorf("not a DDL event")
	ErrInvalidDDLEventContent = errors.New("invalid DDL event content")
)

Functions

This section is empty.

Types

type Column

type Column struct {
	// ID is a pgstream assigned immutable column id. Id does not change when column is renamed.
	ID    string `json:"id"`
	Name  string `json:"name"`
	Type  string `json:"type"`
	Value any    `json:"value"`
}

type ColumnDiff added in v1.0.0

type ColumnDiff struct {
	ColumnName       string
	ColumnPgstreamID string
	NameChange       *ValueChange[string]
	TypeChange       *ValueChange[string]
	UniqueChange     *ValueChange[bool]
	NullChange       *ValueChange[bool]
	DefaultChange    *ValueChange[*string]
	GeneratedChange  *ValueChange[bool]
	IdentityChange   *ValueChange[string]
}

type CommitPosition

type CommitPosition string

CommitPosition represents a position in the input stream

type DDLColumn added in v1.0.0

type DDLColumn struct {
	Attnum    int     `json:"attnum"`
	Name      string  `json:"name"`
	Type      string  `json:"type"`
	Nullable  bool    `json:"nullable"`
	Default   *string `json:"default,omitempty"`
	Generated bool    `json:"generated"`
	Identity  *string `json:"identity,omitempty"`
	Unique    bool    `json:"unique"`
}

DDLColumn represents a column in a DDL event

func (*DDLColumn) GetColumnPgstreamID added in v1.0.0

func (c *DDLColumn) GetColumnPgstreamID(tablePgstreamID string) string

GetColumnPgstreamID returns the pgstream ID for a column based on table pgstream ID and attnum

func (*DDLColumn) GetSequenceName added in v1.0.0

func (c *DDLColumn) GetSequenceName() string

func (*DDLColumn) HasSequence added in v1.0.0

func (c *DDLColumn) HasSequence() bool

func (*DDLColumn) IsGenerated added in v1.0.0

func (c *DDLColumn) IsGenerated() bool

type DDLEvent added in v1.0.0

type DDLEvent struct {
	DDL        string      `json:"ddl"`
	SchemaName string      `json:"schema_name"`
	CommandTag string      `json:"command_tag"`
	Objects    []DDLObject `json:"objects"`
}

DDLEvent represents a parsed DDL logical message from pgstream.ddl prefix

func WalDataToDDLEvent added in v1.0.0

func WalDataToDDLEvent(d *Data) (*DDLEvent, error)

WalDataToDDLEvent parses the wal data content field as a DDL event

func (*DDLEvent) GetMaterializedViewObjects added in v1.0.0

func (e *DDLEvent) GetMaterializedViewObjects() []DDLObject

GetMaterializedViewObjects returns only the materialized view objects from the DDL event

func (*DDLEvent) GetObjectsByType added in v1.0.0

func (e *DDLEvent) GetObjectsByType(objectType string) []DDLObject

func (*DDLEvent) GetTableColumnObjects added in v1.0.0

func (e *DDLEvent) GetTableColumnObjects() []DDLObject

GetTableColumnObjects returns only the table column objects from the DDL event

func (*DDLEvent) GetTableObjectByName added in v1.0.1

func (e *DDLEvent) GetTableObjectByName(schema, table string) *DDLObject

func (*DDLEvent) GetTableObjects added in v1.0.0

func (e *DDLEvent) GetTableObjects() []DDLObject

GetTableObjects returns only the table objects from the DDL event

func (*DDLEvent) IsDropEvent added in v1.0.0

func (e *DDLEvent) IsDropEvent() bool

type DDLObject added in v1.0.0

type DDLObject struct {
	Type              string      `json:"type"`
	Identity          string      `json:"identity"`
	Schema            string      `json:"schema"`
	OID               string      `json:"oid"`
	PgstreamID        string      `json:"pgstream_id,omitempty"`
	Columns           []DDLColumn `json:"columns,omitempty"`
	PrimaryKeyColumns []string    `json:"primary_key_columns,omitempty"`
}

DDLObject represents an object affected by a DDL command

func (*DDLObject) GetColumnByName added in v1.0.0

func (e *DDLObject) GetColumnByName(name string) (*DDLColumn, bool)

func (*DDLObject) GetName added in v1.0.0

func (e *DDLObject) GetName() string

GetName extracts the unqualified object name from the identity. For example:

  • "public.users" returns "users"
  • "public.test_table.username" returns "username"

func (*DDLObject) GetSchema added in v1.0.0

func (e *DDLObject) GetSchema() string

GetSchema extracts the schema name from the identity. For example:

  • "public.users" returns "public"
  • "public.test_table.username" returns "public"

func (*DDLObject) GetTable added in v1.0.0

func (e *DDLObject) GetTable() string

GetTable extracts the table name from the identity. For table objects: "public.users" returns "users" For table column objects: "public.test_table.username" returns "test_table"

type Data

type Data struct {
	Action    string   `json:"action"`    // "I" -- insert, "U" -- update, "D" -- delete, "T" -- truncate, "M" -- logical message
	Timestamp string   `json:"timestamp"` // ISO8601, i.e. 2019-12-29 04:58:34.806671
	LSN       string   `json:"lsn"`
	Schema    string   `json:"schema"`
	Table     string   `json:"table"`
	Columns   []Column `json:"columns"`
	Identity  []Column `json:"identity"`
	XID       int64    `json:"xid,omitempty"`
	// For logical messages (when Action == "M")
	Prefix  string `json:"prefix,omitempty"`
	Content string `json:"content,omitempty"`
	// pgstream specific metadata
	Metadata Metadata `json:"metadata"`
}

Data contains the wal data properties identifying the table operation.

func (*Data) GetTimestamp

func (d *Data) GetTimestamp() (time.Time, error)

func (*Data) IsDDLEvent added in v1.0.0

func (d *Data) IsDDLEvent() bool

IsDDLEvent returns true if the data represents a DDL logical message

func (*Data) IsInsert

func (d *Data) IsInsert() bool

func (*Data) IsUpdate

func (d *Data) IsUpdate() bool

type Event

type Event struct {
	Data           *Data
	CommitPosition CommitPosition
}

Event represents the WAL information. If the data is nil but there's a commit position present, it represents a keep alive event that needs to be checkpointed.

type Metadata

type Metadata struct {
	SchemaID        xid.ID `json:"schema_id"`         // the schema ID the event was stamped with
	TablePgstreamID string `json:"table_pgstream_id"` // the ID of the table to which the event belongs
	// This is the Pgstream ID of the "id" column(s). We track this specifically, as we extract it from the event
	// in order to use as the ID for the record.
	InternalColIDs []string `json:"id_col_pgstream_id"`
}

Metadata is pgstream specific properties to help identify the id/version within the wal event as well as some pgstream unique immutable ids for the schema and the table it relates to.

func (Metadata) IsEmpty

func (m Metadata) IsEmpty() bool

IsEmpty returns true if the pgstream metadata hasn't been populated, false otherwise.

func (Metadata) IsIDColumn

func (m Metadata) IsIDColumn(colID string) bool

IsIDColumn returns true if the column id on input is part of the pgstream identified identity columns.

type SchemaDiff added in v1.0.0

type SchemaDiff struct {
	SchemaName    string
	SchemaDropped bool
	TablesAdded   []DDLObject
	TablesRemoved []DDLObject
	TablesChanged []TableDiff
}

func DDLEventToSchemaDiff added in v1.0.0

func DDLEventToSchemaDiff(ddlEvent *DDLEvent) (*SchemaDiff, error)

func (*SchemaDiff) IsEmpty added in v1.0.0

func (d *SchemaDiff) IsEmpty() bool

type TableDiff added in v1.0.0

type TableDiff struct {
	TableName             string
	TablePgstreamID       string
	TableNameChange       *ValueChange[string]
	TablePrimaryKeyChange *ValueChange[[]string]
	ColumnsAdded          []DDLColumn
	ColumnsRemoved        []DDLColumn
	ColumnsChanged        []ColumnDiff
}

func (*TableDiff) IsEmpty added in v1.0.0

func (td *TableDiff) IsEmpty() bool

type ValueChange added in v1.0.0

type ValueChange[T any] struct {
	Old, New T
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL