schemalog

package
v0.7.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Fixed identifiers exported by this package. Judging by their names they
// are the database schema ("pgstream") and table ("schema_log") in which the
// schema log is kept — NOTE(review): confirm against the store implementation.
const (
	SchemaName = "pgstream"
	TableName  = "schema_log"
)

Variables

View Source
// ErrNoRows is a sentinel error carrying the message "no rows". Presumably
// it is what Store lookups return when no matching log entry exists —
// NOTE(review): confirm against the Store implementations. Being a sentinel,
// it should be matched with errors.Is rather than by comparing messages.
var ErrNoRows = errors.New("no rows")

Functions

This section is empty.

Types

type Column

// Column describes a single column of a Table. The struct tags give the JSON
// key of every field: the column name, its data type (key "type"), an
// optional default value (a nil pointer is omitted from the JSON because of
// omitempty), the nullable/generated/unique flags, an untyped metadata
// string and the column's pgstream ID.
type Column struct {
	Name         string  `json:"name"`
	DataType     string  `json:"type"`
	DefaultValue *string `json:"default,omitempty"`
	Nullable     bool    `json:"nullable"`
	Generated    bool    `json:"generated"`
	Unique       bool    `json:"unique"`
	// Metadata is NOT typed here because we don't fully control the content that is sent from the publisher.
	Metadata   *string `json:"metadata"`
	PgstreamID string  `json:"pgstream_id"`
}

func (*Column) IsEqual

func (c *Column) IsEqual(other *Column) bool

type ColumnDiff added in v0.4.0

// ColumnDiff records the differences found for one column, which is
// identified by ColumnName and ColumnPgstreamID. Each *Change field points to
// a ValueChange holding an Old and a New value for one attribute (name, type,
// uniqueness, nullability, default value). Presumably a nil pointer means
// that attribute did not change — NOTE(review): confirm against
// ComputeSchemaDiff and IsEmpty.
type ColumnDiff struct {
	ColumnName       string
	ColumnPgstreamID string
	NameChange       *ValueChange[string]
	TypeChange       *ValueChange[string]
	UniqueChange     *ValueChange[bool]
	NullChange       *ValueChange[bool]
	DefaultChange    *ValueChange[*string]
}

func (*ColumnDiff) IsEmpty added in v0.4.0

func (cd *ColumnDiff) IsEmpty() bool

type Diff added in v0.4.0

// Diff groups the differences between two schema log entries into three
// lists: the tables that were removed, the tables that were added, and a
// TableDiff for each table that changed. It is the type returned by
// ComputeSchemaDiff.
type Diff struct {
	TablesRemoved []Table
	TablesAdded   []Table
	TablesChanged []TableDiff
}

func ComputeSchemaDiff added in v0.4.0

func ComputeSchemaDiff(old, new *LogEntry) *Diff

func (*Diff) IsEmpty added in v0.4.0

func (d *Diff) IsEmpty() bool

type LogEntry

// LogEntry contains the information relating to a schema log entry: its ID,
// version number, the name of the schema it belongs to, its creation
// timestamp, the schema contents and an acknowledgement flag. The struct
// tags give the JSON key used for each field.
type LogEntry struct {
	ID         xid.ID                   `json:"id"`
	Version    int64                    `json:"version"`
	SchemaName string                   `json:"schema_name"`
	CreatedAt  SchemaCreatedAtTimestamp `json:"created_at"`
	Schema     Schema                   `json:"schema"`
	// Acked indicates if the schema has been processed and acknowledged by
	// pgstream after being updated in the source database
	Acked bool `json:"acked"`
}

LogEntry contains the information relating to a schema log entry.

func (*LogEntry) After

func (m *LogEntry) After(other *LogEntry) bool

func (*LogEntry) GetTableByName

func (m *LogEntry) GetTableByName(tableName string) (Table, bool)

func (*LogEntry) IsEmpty

func (m *LogEntry) IsEmpty() bool

func (*LogEntry) IsEqual

func (m *LogEntry) IsEqual(other *LogEntry) bool

func (*LogEntry) UnmarshalJSON

func (m *LogEntry) UnmarshalJSON(b []byte) error

type Schema

// Schema holds the tables that make up a database schema, together with a
// flag marking the schema as dropped. The struct tags map Tables to the JSON
// key "tables" and Dropped to "dropped".
type Schema struct {
	Tables []Table `json:"tables"`
	// Dropped will be true if the schema has been deleted
	Dropped bool `json:"dropped,omitempty"`
}

func (*Schema) IsEqual

func (s *Schema) IsEqual(other *Schema) bool

func (*Schema) MarshalJSON

func (s *Schema) MarshalJSON() ([]byte, error)

type SchemaCreatedAtTimestamp

// SchemaCreatedAtTimestamp is a wrapper around time.Time that allows us to
// parse to and from the PG timestamp format. time.Time is embedded, so its
// methods are available directly on the wrapper.
type SchemaCreatedAtTimestamp struct {
	time.Time
}

SchemaCreatedAtTimestamp is a wrapper around time.Time that allows us to parse to and from the PG timestamp format.

func NewSchemaCreatedAtTimestamp

func NewSchemaCreatedAtTimestamp(t time.Time) SchemaCreatedAtTimestamp

func (SchemaCreatedAtTimestamp) MarshalJSON

func (s SchemaCreatedAtTimestamp) MarshalJSON() ([]byte, error)

func (*SchemaCreatedAtTimestamp) Scan

func (s *SchemaCreatedAtTimestamp) Scan(src interface{}) error

func (SchemaCreatedAtTimestamp) TimestampValue

func (s SchemaCreatedAtTimestamp) TimestampValue() (pgtype.Timestamp, error)

func (*SchemaCreatedAtTimestamp) UnmarshalJSON

func (s *SchemaCreatedAtTimestamp) UnmarshalJSON(b []byte) error

type Store

// Store is the set of operations a schema log store has to provide. The
// summaries below are read off the method names and signatures only —
// NOTE(review): confirm the exact semantics against the implementations.
//   - Insert takes a schema name and returns a LogEntry for it.
//   - FetchLast returns a LogEntry for the schema name; ackedOnly presumably
//     restricts the lookup to acknowledged entries.
//   - Fetch returns the LogEntry for the schema name at the given version.
//   - Ack takes a LogEntry, presumably to mark it as acknowledged.
//   - Close returns an error; presumably it releases the store's resources.
type Store interface {
	Insert(ctx context.Context, schemaName string) (*LogEntry, error)
	FetchLast(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)
	Fetch(ctx context.Context, schemaName string, version int) (*LogEntry, error)
	Ack(ctx context.Context, le *LogEntry) error
	Close() error
}

type StoreCache

// StoreCache is a wrapper around a schemalog Store that provides an
// in-memory caching mechanism to reduce the number of calls to the database.
// It is not concurrency safe.
type StoreCache struct {
	// contains filtered or unexported fields
}

StoreCache is a wrapper around a schemalog Store that provides an in-memory caching mechanism to reduce the number of calls to the database. It is not concurrency safe.

func NewStoreCache

func NewStoreCache(store Store) *StoreCache

func (*StoreCache) Ack

func (s *StoreCache) Ack(ctx context.Context, entry *LogEntry) error

func (*StoreCache) Close

func (s *StoreCache) Close() error

func (*StoreCache) Fetch

func (s *StoreCache) Fetch(ctx context.Context, schemaName string, version int) (*LogEntry, error)

func (*StoreCache) FetchLast added in v0.4.0

func (s *StoreCache) FetchLast(ctx context.Context, schemaName string, ackedOnly bool) (*LogEntry, error)

func (*StoreCache) Insert added in v0.3.0

func (s *StoreCache) Insert(ctx context.Context, schemaName string) (*LogEntry, error)

type Table

// Table describes one table of a Schema: its Oid (held as a string), its
// name, its columns, the names of its primary key columns and its pgstream
// ID. The struct tags give the JSON key used for each field.
type Table struct {
	Oid               string   `json:"oid"`
	Name              string   `json:"name"`
	Columns           []Column `json:"columns"`
	PrimaryKeyColumns []string `json:"primary_key_columns"`
	// PgstreamID is a unique identifier of the table generated by pgstream
	PgstreamID string `json:"pgstream_id"`
}

func (*Table) GetColumnByName

func (t *Table) GetColumnByName(name string) (Column, bool)

GetColumnByName returns the table column with the given name, along with a boolean indicating whether it was found.

func (*Table) GetFirstUniqueNotNullColumn

func (t *Table) GetFirstUniqueNotNullColumn() *Column

GetFirstUniqueNotNullColumn will return the first unique not null column in the table. It will sort the columns by pgstream ID, and return the first one matching the not null/unique constraints. It uses the pgstream id instead of the name since the id doesn't change.

func (*Table) IsEqual

func (t *Table) IsEqual(other *Table) bool

type TableDiff added in v0.4.0

// TableDiff records the differences found for one table, which is identified
// by TableName and TablePgstreamID. TableNameChange and TablePrimaryKeyChange
// point to a ValueChange holding the Old and New value of that attribute;
// presumably a nil pointer means no change — NOTE(review): confirm against
// ComputeSchemaDiff and IsEmpty. Column-level differences are split into
// columns added, columns removed and a ColumnDiff per changed column.
type TableDiff struct {
	TableName             string
	TablePgstreamID       string
	TableNameChange       *ValueChange[string]
	TablePrimaryKeyChange *ValueChange[[]string]
	ColumnsAdded          []Column
	ColumnsRemoved        []Column
	ColumnsChanged        []ColumnDiff
}

func (*TableDiff) IsEmpty added in v0.4.0

func (td *TableDiff) IsEmpty() bool

type ValueChange added in v0.4.0

// ValueChange pairs two values of the same type T. It is the payload of the
// *Change fields on ColumnDiff and TableDiff.
type ValueChange[T any] struct {
	// Old is, going by its name, the value before the change.
	Old T
	// New is, going by its name, the value after the change.
	New T
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL