stream

package
Version: v0.9.3
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 | License: Apache-2.0 | Imports: 49 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Destroy added in v0.8.0

func Destroy(ctx context.Context, pgURL, replicationSlotName string) error

Destroy removes the pgstream state from the provided postgres database and also removes the replication slot.

func Init

func Init(ctx context.Context, pgURL, replicationSlotName string) error

Init initialises the pgstream state in the provided postgres database and creates the relevant replication slot if it doesn't already exist.

func Run

func Run(ctx context.Context, logger loglib.Logger, config *Config, init bool, instrumentation *otel.Instrumentation) error

Run runs the configured pgstream processes. This call is blocking.

func Snapshot added in v0.5.0

func Snapshot(ctx context.Context, logger loglib.Logger, config *Config, instrumentation *otel.Instrumentation) error

Types

type Config

type Config struct {
	Listener  ListenerConfig
	Processor ProcessorConfig
}

func (*Config) IsValid

func (c *Config) IsValid() error

func (*Config) PostgresReplicationSlot added in v0.5.0

func (c *Config) PostgresReplicationSlot() string

func (*Config) RequiredTables added in v0.6.4

func (c *Config) RequiredTables() []string

func (*Config) SourcePostgresURL added in v0.5.0

func (c *Config) SourcePostgresURL() string

type ConfigStatus added in v0.5.0

type ConfigStatus struct {
	Valid  bool
	Errors []string
}

func (*ConfigStatus) PrettyPrint added in v0.5.0

func (cs *ConfigStatus) PrettyPrint() string

type InitStatus added in v0.5.0

type InitStatus struct {
	PgstreamSchema  *SchemaStatus
	Migration       *MigrationStatus
	ReplicationSlot *ReplicationSlotStatus
}

func (*InitStatus) GetErrors added in v0.5.0

func (is *InitStatus) GetErrors() []string

GetErrors aggregates all errors from the initialisation status.

func (*InitStatus) PrettyPrint added in v0.5.0

func (is *InitStatus) PrettyPrint() string

type KafkaListenerConfig

type KafkaListenerConfig struct {
	Reader       kafka.ReaderConfig
	Checkpointer kafkacheckpoint.Config
}

type KafkaProcessorConfig

type KafkaProcessorConfig struct {
	Writer *kafkaprocessor.Config
}

type ListenerConfig

type ListenerConfig struct {
	Postgres *PostgresListenerConfig
	Kafka    *KafkaListenerConfig
}

func (*ListenerConfig) IsValid added in v0.5.0

func (c *ListenerConfig) IsValid() error

type MigrationStatus added in v0.5.0

type MigrationStatus struct {
	Version uint
	Dirty   bool
	Errors  []string
}

type PostgresListenerConfig

type PostgresListenerConfig struct {
	URL         string
	Replication pgreplication.Config
	RetryPolicy backoff.Config
	Snapshot    *snapshotbuilder.SnapshotListenerConfig
}

type PostgresProcessorConfig added in v0.4.0

type PostgresProcessorConfig struct {
	BatchWriter postgres.Config
}

type ProcessorConfig

type ProcessorConfig struct {
	Kafka       *KafkaProcessorConfig
	Search      *SearchProcessorConfig
	Webhook     *WebhookProcessorConfig
	Postgres    *PostgresProcessorConfig
	Injector    *injector.Config
	Transformer *transformer.Config
	Filter      *filter.Config
}

func (*ProcessorConfig) IsValid added in v0.5.0

func (c *ProcessorConfig) IsValid() error

type ReplicationSlotStatus added in v0.5.0

type ReplicationSlotStatus struct {
	Name     string
	Plugin   string
	Database string
	Errors   []string
}

type SchemaStatus added in v0.5.0

type SchemaStatus struct {
	SchemaExists         bool
	SchemaLogTableExists bool
	Errors               []string
}

type SearchProcessorConfig

type SearchProcessorConfig struct {
	Indexer search.IndexerConfig
	Store   store.Config
	Retrier search.StoreRetryConfig
}

type SourceStatus added in v0.5.0

type SourceStatus struct {
	Reachable bool
	Errors    []string
}

func (*SourceStatus) PrettyPrint added in v0.5.0

func (ss *SourceStatus) PrettyPrint() string

type Status added in v0.5.0

type Status struct {
	Init                *InitStatus
	Config              *ConfigStatus
	TransformationRules *TransformationRulesStatus
	Source              *SourceStatus
}

func (*Status) GetErrors added in v0.5.0

func (s *Status) GetErrors() StatusErrors

func (*Status) PrettyPrint added in v0.5.0

func (s *Status) PrettyPrint() string

type StatusChecker added in v0.5.0

type StatusChecker struct {
	// contains filtered or unexported fields
}

StatusChecker is responsible for validating the status of the pgstream setup in a PostgreSQL database. It performs checks on the source database connection, initialisation status (schema, migrations, replication slot), and transformation rules. It provides detailed status information, including errors, to help diagnose issues with the pgstream configuration and setup.

func NewStatusChecker added in v0.5.0

func NewStatusChecker() *StatusChecker

func (*StatusChecker) Status added in v0.5.0

func (s *StatusChecker) Status(ctx context.Context, config *Config) (*Status, error)

Status retrieves the overall status of the pgstream setup, including the source database connection status, initialisation status (schema, migrations, replication slot), and transformation rules validation status. It returns a detailed status report that includes any errors encountered during the checks, helping to diagnose issues with the pgstream configuration and setup.

type StatusErrors added in v0.5.0

type StatusErrors map[string][]string

func (StatusErrors) Keys added in v0.5.0

func (se StatusErrors) Keys() []string

type TransformationRulesStatus added in v0.5.0

type TransformationRulesStatus struct {
	Valid  bool
	Errors []string
}

func (*TransformationRulesStatus) PrettyPrint added in v0.5.0

func (trs *TransformationRulesStatus) PrettyPrint() string

type WebhookProcessorConfig

type WebhookProcessorConfig struct {
	Notifier           notifier.Config
	SubscriptionServer server.Config
	SubscriptionStore  WebhookSubscriptionStoreConfig
}

type WebhookSubscriptionStoreConfig

type WebhookSubscriptionStoreConfig struct {
	URL                  string
	CacheEnabled         bool
	CacheRefreshInterval time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL