postgres

package module
v0.0.0-...-14b8f79 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2025 License: MIT Imports: 23 Imported by: 0

README

Event Storage library

Build Status Report Card Go Reference codecov

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

type Connector

type Connector interface {
	// Ping is always the first call a Storage makes on a Connector.
	Ping(ctx context.Context) error

	// ApplyMigrations must call apply with a Conn for every instance on which DDL must be run.
	ApplyMigrations(ctx context.Context, apply func(conn *pgxpool.Conn) error) error

	// Close must free all underlying resources.
	Close() error

	// AcquireRead supplies a connection used to read
	AcquireRead(ctx context.Context) (*pgxpool.Conn, error)

	// AcquireReadStream supplies a connection used to read from the given stream
	AcquireReadStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)

	// AcquireWrite supplies a connection used to write
	AcquireWrite(ctx context.Context) (*pgxpool.Conn, error)

	// AcquireWriteStream supplies a connection used to write to the given stream
	AcquireWriteStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)
	// contains filtered or unexported methods
}

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

func InstanceFromDSN

func InstanceFromDSN(dsn string) *Instance

func InstanceFromPool

func InstanceFromPool(pool *pgxpool.Pool) *Instance

func (*Instance) AcquireRead

func (i *Instance) AcquireRead(ctx context.Context) (*pgxpool.Conn, error)

func (*Instance) AcquireReadStream

func (i *Instance) AcquireReadStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)

func (*Instance) AcquireWrite

func (i *Instance) AcquireWrite(ctx context.Context) (*pgxpool.Conn, error)

func (*Instance) AcquireWriteStream

func (i *Instance) AcquireWriteStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)

func (*Instance) ApplyMigrations

func (i *Instance) ApplyMigrations(ctx context.Context, apply func(conn *pgxpool.Conn) error) error

func (*Instance) Close

func (i *Instance) Close() error

func (*Instance) Ping

func (i *Instance) Ping(ctx context.Context) error

type Logger

type Logger interface {
	InfofCtx(ctx context.Context, template string, args ...any)
	ErrorfCtx(ctx context.Context, template string, args ...any)
}

type Option

type Option func(cfg *Config)

func WithDefaultSlog

func WithDefaultSlog() Option

func WithExponentialProcessBackoff

func WithExponentialProcessBackoff(base time.Duration) Option

WithExponentialProcessBackoff doubles the wait time with each retry.

func WithFNVPartitioner

func WithFNVPartitioner(partitionCount uint32) Option

func WithFixedProcessBackoff

func WithFixedProcessBackoff(d time.Duration) Option

WithFixedProcessBackoff waits a fixed amount of time between retries.

func WithLeaseHeartbeatInterval

func WithLeaseHeartbeatInterval(interval time.Duration) Option

func WithLeaseRange

func WithLeaseRange(from, to uint32) Option

WithLeaseRange sets the range of leases used by the consistent hashing mechanism.

func WithLeaseVNodeCount

func WithLeaseVNodeCount(count uint32) Option

func WithLinearProcessBackoff

func WithLinearProcessBackoff(increment time.Duration) Option

WithLinearProcessBackoff increases the wait time linearly with each retry.

func WithListenPublishing

func WithListenPublishing(enabled bool) Option

WithListenPublishing enables publishing by the Postgres LISTEN/NOTIFY mechanism.

func WithLogger

func WithLogger(logger Logger) Option

func WithNoopLogger

func WithNoopLogger() Option

func WithPartitioner

func WithPartitioner(partitioner func(streamType, streamID string) uint32) Option

func WithProcessBackoff

func WithProcessBackoff(fn func(streamType string, retryCount int64) time.Duration) Option

WithProcessBackoff sets the function used to determine how long processing of a stream in the outbox is delayed after a failure.

func WithProcessTimeout

func WithProcessTimeout(timeout time.Duration) Option

WithProcessTimeout sets the timeout for processing a stream in the outbox.

func WithReconcileInterval

func WithReconcileInterval(interval time.Duration) Option

WithReconcileInterval sets how often the Storage will check for new items in the Outbox. This is purely a fallback mechanism, as there is a live-streaming mechanism that should handle all traffic. In some error scenarios this check ensures that everything is still published. The default value is reasonably high, so as not to put too much load on the database.

func WithReconcilePublishing

func WithReconcilePublishing(enabled bool) Option

WithReconcilePublishing enables publishing by a periodic reconciler.

func WithReconcileTimeout

func WithReconcileTimeout(timeout time.Duration) Option

WithReconcileTimeout sets the timeout for doing a reconcile check.

func WithSlog

func WithSlog(log *slog.Logger) Option

func WithStartContext

func WithStartContext(ctx context.Context) Option

WithStartContext uses the provided context during initialization.

func WithTablePrefix

func WithTablePrefix(prefix string) Option

WithTablePrefix uses the given prefix for the database tables that this library creates. Table names will have the form "{prefix}_{name}". Example: "es_migrations".

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func New

func New(connector Connector, opts ...Option) (*Storage, error)

func (*Storage) Close

func (s *Storage) Close() error

func (*Storage) GetStreamReferences

func (s *Storage) GetStreamReferences(ctx context.Context, streamType string, storeStreamID string, limit int64) iter.Seq2[es.StreamReference, error]

func (*Storage) Read

func (s *Storage) Read(ctx context.Context, streamType string, streamID string, eventNumber int64) iter.Seq2[es.Event, error]

func (*Storage) Register

func (s *Storage) Register(streamType string, types ...es.Content) error

func (*Storage) StartPublish

func (s *Storage) StartPublish(ctx context.Context, w es.Writer) error

func (*Storage) Write

func (s *Storage) Write(ctx context.Context, streamType string, events iter.Seq2[es.Event, error]) error

type Writer

type Writer es.Writer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL