schemalog

package
v0.9.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Identifiers exported by the schemalog package.
// NOTE(review): presumably the Postgres schema ("pgstream") and table
// ("schema_log") in which the schema log is stored — confirm against the
// store implementation.
const (
	SchemaName = "pgstream"
	TableName  = "schema_log"
)

Variables

View Source
// ErrNoRows is a sentinel error carrying the message "no rows".
// NOTE(review): presumably returned when no matching log entry exists —
// confirm against the Store implementations. Compare with errors.Is.
var ErrNoRows = errors.New("no rows")

Functions

This section is empty.

Types

type Column

// Column describes a table column: its name, data type, optional default
// value, nullability, generated/identity attributes and uniqueness, plus a
// pgstream identifier. The struct tags define the JSON keys; note that
// DataType is encoded under "type" and DefaultValue under "default".
// DefaultValue, GeneratedKind and Identity carry omitempty, so they are left
// out of the JSON when nil/empty.
type Column struct {
	Name          string  `json:"name"`
	DataType      string  `json:"type"`
	DefaultValue  *string `json:"default,omitempty"`
	Nullable      bool    `json:"nullable"`
	Generated     bool    `json:"generated"`
	GeneratedKind string  `json:"generated_kind,omitempty"`
	Identity      string  `json:"identity,omitempty"`
	Unique        bool    `json:"unique"`
	// Metadata is NOT typed here because we don't fully control the content that is sent from the publisher.
	Metadata   *string `json:"metadata"`
	PgstreamID string  `json:"pgstream_id"`
}

func (*Column) GetSequenceName added in v0.9.3

func (c *Column) GetSequenceName() string

func (*Column) HasSequence added in v0.9.3

func (c *Column) HasSequence() bool

func (*Column) IsEqual

func (c *Column) IsEqual(other *Column) bool

func (*Column) IsGenerated added in v0.9.0

func (c *Column) IsGenerated() bool

func (*Column) IsSerial added in v0.9.3

func (c *Column) IsSerial() bool

type ColumnDiff added in v0.4.0

// ColumnDiff records per-attribute changes for a single column. The column is
// identified by ColumnName and ColumnPgstreamID; every *Change field carries
// the old and new value of one attribute (name, type, uniqueness,
// nullability, default, generated flag, identity).
// NOTE(review): a nil *Change field presumably means that attribute did not
// change — confirm against ComputeSchemaDiff.
type ColumnDiff struct {
	ColumnName       string
	ColumnPgstreamID string
	NameChange       *ValueChange[string]
	TypeChange       *ValueChange[string]
	UniqueChange     *ValueChange[bool]
	NullChange       *ValueChange[bool]
	DefaultChange    *ValueChange[*string]
	GeneratedChange  *ValueChange[bool]
	IdentityChange   *ValueChange[string]
}

func (*ColumnDiff) IsEmpty added in v0.4.0

func (cd *ColumnDiff) IsEmpty() bool

type Constraint added in v0.9.0

// Constraint describes a table constraint by name, type and definition. All
// three fields are always present in the JSON encoding (no omitempty).
// NOTE(review): Definition presumably holds the SQL definition text — confirm.
type Constraint struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Definition string `json:"definition"`
}

func (*Constraint) IsEqual added in v0.9.0

func (c *Constraint) IsEqual(other *Constraint) bool

type Diff added in v0.4.0

// Diff groups schema-level differences: the tables and materialized views
// that were removed, added or changed. Removed/added items are carried as
// full Table / MaterializedView values, changed items as their respective
// diff types.
type Diff struct {
	TablesRemoved            []Table
	TablesAdded              []Table
	TablesChanged            []TableDiff
	MaterializedViewsRemoved []MaterializedView
	MaterializedViewsAdded   []MaterializedView
	MaterializedViewsChanged []MaterializedViewsDiff
}

func ComputeSchemaDiff added in v0.4.0

func ComputeSchemaDiff(old, new *LogEntry) *Diff

func (*Diff) IsEmpty added in v0.4.0

func (d *Diff) IsEmpty() bool

type ForeignKey added in v0.9.0

// ForeignKey describes a table foreign key by name and definition.
// NOTE(review): Definition presumably holds the SQL definition text — confirm.
type ForeignKey struct {
	Name       string `json:"name"`
	Definition string `json:"definition"`
}

func (*ForeignKey) IsEqual added in v0.9.0

func (fk *ForeignKey) IsEqual(other *ForeignKey) bool

type Index added in v0.9.0

// Index describes an index: its name, its columns, whether it is unique and
// its definition. It is used by both Table and MaterializedView.
// NOTE(review): Columns presumably holds column names — confirm.
type Index struct {
	Name       string   `json:"name"`
	Columns    []string `json:"columns"`
	Unique     bool     `json:"unique"`
	Definition string   `json:"definition"`
}

func (*Index) IsEqual added in v0.9.0

func (i *Index) IsEqual(other *Index) bool

type LogEntry

// LogEntry is one versioned record of a schema: an ID, a version number, the
// schema name, a creation timestamp, the Schema itself and an acknowledgement
// flag. The type also declares a custom UnmarshalJSON, so decoding is not
// driven by the struct tags alone.
type LogEntry struct {
	ID         xid.ID                   `json:"id"`
	Version    int64                    `json:"version"`
	SchemaName string                   `json:"schema_name"`
	CreatedAt  SchemaCreatedAtTimestamp `json:"created_at"`
	Schema     Schema                   `json:"schema"`
	// Acked indicates if the schema has been processed and acknowledged by
	// pgstream after being updated in the source database
	Acked bool `json:"acked"`
}

LogEntry contains the information relating to a schema log entry.

func (*LogEntry) After

func (m *LogEntry) After(other *LogEntry) bool

func (*LogEntry) GetTableByName

func (m *LogEntry) GetTableByName(tableName string) (Table, bool)

func (*LogEntry) IsEmpty

func (m *LogEntry) IsEmpty() bool

func (*LogEntry) IsEqual

func (m *LogEntry) IsEqual(other *LogEntry) bool

func (*LogEntry) IsMaterializedView added in v0.9.2

func (m *LogEntry) IsMaterializedView(tableName string) bool

func (*LogEntry) UnmarshalJSON

func (m *LogEntry) UnmarshalJSON(b []byte) error

type MaterializedView added in v0.9.2

// MaterializedView describes a materialized view by oid, name and definition,
// together with its indexes. Indexes carries omitempty and is left out of the
// JSON encoding when empty.
type MaterializedView struct {
	Oid        string  `json:"oid"`
	Name       string  `json:"name"`
	Definition string  `json:"definition"`
	Indexes    []Index `json:"indexes,omitempty"`
}

func (*MaterializedView) IsEqual added in v0.9.2

func (mv *MaterializedView) IsEqual(other *MaterializedView) bool

type MaterializedViewsDiff added in v0.9.2

// MaterializedViewsDiff records the changes for one materialized view,
// identified by MaterializedViewName: an optional name change and the indexes
// added, removed or changed.
// NOTE(review): IndexesChanged is a []string, presumably index names —
// confirm against ComputeSchemaDiff.
type MaterializedViewsDiff struct {
	MaterializedViewName string
	NameChange           *ValueChange[string]
	IndexesAdded         []Index
	IndexesRemoved       []Index
	IndexesChanged       []string
}

func (*MaterializedViewsDiff) IsEmpty added in v0.9.2

func (mv *MaterializedViewsDiff) IsEmpty() bool

type Schema

// Schema holds the tables and materialized views of a schema, plus a flag
// marking the schema as dropped. The type also declares a custom MarshalJSON,
// so the struct tags alone may not fully describe the encoded output.
type Schema struct {
	Tables            []Table            `json:"tables"`
	MaterializedViews []MaterializedView `json:"materialized_views,omitempty"`
	// Dropped will be true if the schema has been deleted
	Dropped bool `json:"dropped,omitempty"`
}

func (*Schema) IsEqual

func (s *Schema) IsEqual(other *Schema) bool

func (*Schema) MarshalJSON

func (s *Schema) MarshalJSON() ([]byte, error)

func (*Schema) TableNames added in v0.8.4

func (s *Schema) TableNames() []string

type SchemaCreatedAtTimestamp

// SchemaCreatedAtTimestamp embeds time.Time, so time.Time's methods are
// promoted onto it unless shadowed by a method declared on this type (it
// declares its own MarshalJSON and UnmarshalJSON, as well as Scan and
// TimestampValue).
type SchemaCreatedAtTimestamp struct {
	time.Time
}

SchemaCreatedAtTimestamp is a wrapper around time.Time that allows us to parse to and from the PG timestamp format.

func NewSchemaCreatedAtTimestamp

func NewSchemaCreatedAtTimestamp(t time.Time) SchemaCreatedAtTimestamp

func (SchemaCreatedAtTimestamp) MarshalJSON

func (s SchemaCreatedAtTimestamp) MarshalJSON() ([]byte, error)

func (*SchemaCreatedAtTimestamp) Scan

func (s *SchemaCreatedAtTimestamp) Scan(src interface{}) error

func (SchemaCreatedAtTimestamp) TimestampValue

func (s SchemaCreatedAtTimestamp) TimestampValue() (pgtype.Timestamp, error)

func (*SchemaCreatedAtTimestamp) UnmarshalJSON

func (s *SchemaCreatedAtTimestamp) UnmarshalJSON(b []byte) error

type Store

// Store defines the operations available on a schema log store. StoreCache
// exposes the same method set and wraps another Store.
type Store interface {
	// Insert takes a schema name and returns a log entry or an error.
	// NOTE(review): presumably records a new log entry for the schema —
	// confirm against the implementations.
	Insert(ctx context.Context, schemaName string) (*LogEntry, error)
	// FetchLast returns a log entry for the given schema name.
	// NOTE(review): presumably the most recent one, restricted to
	// acknowledged entries when ackedOnly is true — confirm.
	FetchLast(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)
	// Fetch returns the log entry for the given schema name and version.
	// NOTE(review): version is an int here while LogEntry.Version is an int64.
	Fetch(ctx context.Context, schemaName string, version int) (*LogEntry, error)
	// Ack takes a log entry and returns an error.
	// NOTE(review): presumably marks the entry as acknowledged (see
	// LogEntry.Acked) — confirm.
	Ack(ctx context.Context, le *LogEntry) error
	// Close returns an error.
	// NOTE(review): presumably releases the store's resources — confirm.
	Close() error
}

type StoreCache

// StoreCache is built from a Store via NewStoreCache and exposes the same
// method set as the Store interface. Its fields are unexported and not shown
// in this listing.
type StoreCache struct {
	// contains filtered or unexported fields
}

StoreCache is a wrapper around a schemalog Store that provides an in-memory caching mechanism to reduce the number of calls to the database. It is not safe for concurrent use.

func NewStoreCache

func NewStoreCache(store Store) *StoreCache

func (*StoreCache) Ack

func (s *StoreCache) Ack(ctx context.Context, entry *LogEntry) error

func (*StoreCache) Close

func (s *StoreCache) Close() error

func (*StoreCache) Fetch

func (s *StoreCache) Fetch(ctx context.Context, schemaName string, version int) (*LogEntry, error)

func (*StoreCache) FetchLast added in v0.4.0

func (s *StoreCache) FetchLast(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)

func (*StoreCache) Insert added in v0.3.0

func (s *StoreCache) Insert(ctx context.Context, schemaName string) (*LogEntry, error)

type Table

// Table describes a table: its oid and name, its columns, the primary key
// columns, and its indexes, constraints and foreign keys. None of the JSON
// tags use omitempty, so every key is always present in the encoding.
// NOTE(review): PrimaryKeyColumns presumably holds column names — confirm.
type Table struct {
	Oid               string       `json:"oid"`
	Name              string       `json:"name"`
	Columns           []Column     `json:"columns"`
	PrimaryKeyColumns []string     `json:"primary_key_columns"`
	Indexes           []Index      `json:"indexes"`
	Constraints       []Constraint `json:"constraints"`
	ForeignKeys       []ForeignKey `json:"foreign_keys"`
	// PgstreamID is a unique identifier of the table generated by pgstream
	PgstreamID string `json:"pgstream_id"`
}

func (*Table) GetColumnByName

func (t *Table) GetColumnByName(name string) (Column, bool)

GetColumnByName returns the table column with the given name and a boolean indicating whether it was found.

func (*Table) GetFirstUniqueNotNullColumn

func (t *Table) GetFirstUniqueNotNullColumn() *Column

GetFirstUniqueNotNullColumn will return the first unique not null column in the table. It will sort the columns by pgstream ID, and return the first one matching the not null/unique constraints. It uses the pgstream id instead of the name since the id doesn't change.

func (*Table) IsEqual

func (t *Table) IsEqual(other *Table) bool

type TableDiff added in v0.4.0

// TableDiff records the changes for one table, identified by TableName and
// TablePgstreamID: optional name and primary key changes, and the columns,
// indexes, constraints and foreign keys that were added, removed or changed.
// Constraints and foreign keys only have Added/Removed lists; there is no
// "changed" list for them.
// NOTE(review): IndexesChanged is a []string, presumably index names, and nil
// *Change fields presumably mean "unchanged" — confirm against
// ComputeSchemaDiff.
type TableDiff struct {
	TableName             string
	TablePgstreamID       string
	TableNameChange       *ValueChange[string]
	TablePrimaryKeyChange *ValueChange[[]string]
	ColumnsAdded          []Column
	ColumnsRemoved        []Column
	ColumnsChanged        []ColumnDiff
	IndexesAdded          []Index
	IndexesRemoved        []Index
	IndexesChanged        []string
	ConstraintsAdded      []Constraint
	ConstraintsRemoved    []Constraint
	ForeignKeysAdded      []ForeignKey
	ForeignKeysRemoved    []ForeignKey
}

func (*TableDiff) IsEmpty added in v0.4.0

func (td *TableDiff) IsEmpty() bool

type ValueChange added in v0.4.0

// ValueChange pairs two values of the same type T: the one before a change
// (Old) and the one after it (New).
type ValueChange[T any] struct {
	Old T
	New T
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL