Documentation
¶
Index ¶
- type Config
- type Connector
- type Instance
- func (i *Instance) AcquireRead(ctx context.Context) (*pgxpool.Conn, error)
- func (i *Instance) AcquireReadStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)
- func (i *Instance) AcquireWrite(ctx context.Context) (*pgxpool.Conn, error)
- func (i *Instance) AcquireWriteStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error)
- func (i *Instance) ApplyMigrations(ctx context.Context, apply func(conn *pgxpool.Conn) error) error
- func (i *Instance) Close() error
- func (i *Instance) Ping(ctx context.Context) error
- type Logger
- type Option
- func WithDefaultSlog() Option
- func WithExponentialProcessBackoff(base time.Duration) Option
- func WithFNVPartitioner(partitionCount uint32) Option
- func WithFixedProcessBackoff(d time.Duration) Option
- func WithLeaseHeartbeatInterval(interval time.Duration) Option
- func WithLeaseRange(from, to uint32) Option
- func WithLeaseVNodeCount(count uint32) Option
- func WithLinearProcessBackoff(increment time.Duration) Option
- func WithListenPublishing(enabled bool) Option
- func WithLogger(logger Logger) Option
- func WithNoopLogger() Option
- func WithPartitioner(partitioner func(streamType, streamID string) uint32) Option
- func WithProcessBackoff(fn func(streamType string, retryCount int64) time.Duration) Option
- func WithProcessTimeout(timeout time.Duration) Option
- func WithReconcileInterval(interval time.Duration) Option
- func WithReconcilePublishing(enabled bool) Option
- func WithReconcileTimeout(timeout time.Duration) Option
- func WithSlog(log *slog.Logger) Option
- func WithStartContext(ctx context.Context) Option
- func WithTablePrefix(prefix string) Option
- type Storage
- func (s *Storage) Close() error
- func (s *Storage) GetStreamReferences(ctx context.Context, streamType string, storeStreamID string, limit int64) iter.Seq2[es.StreamReference, error]
- func (s *Storage) Read(ctx context.Context, streamType string, streamID string, eventNumber int64) iter.Seq2[es.Event, error]
- func (s *Storage) Register(streamType string, types ...es.Content) error
- func (s *Storage) StartPublish(ctx context.Context, w es.Writer) error
- func (s *Storage) Write(ctx context.Context, streamType string, events iter.Seq2[es.Event, error]) error
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connector ¶
type Connector interface { // Ping will always be the first call a Storage does to a Connector. Ping(ctx context.Context) error // ApplyMigrations must call apply with a Conn for all instances that must have DDL done. ApplyMigrations(ctx context.Context, apply func(conn *pgxpool.Conn) error) error // Close must free all underlying resources Close() error // AcquireRead supplies a connection used to read AcquireRead(ctx context.Context) (*pgxpool.Conn, error) // AcquireReadStream supplies a connection used to read from the given stream AcquireReadStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error) // AcquireWrite supplies a connection used to write AcquireWrite(ctx context.Context) (*pgxpool.Conn, error) // AcquireWriteStream supplies a connection used to write to the given stream AcquireWriteStream(ctx context.Context, streamType, streamID string) (*pgxpool.Conn, error) // contains filtered or unexported methods }
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
func InstanceFromDSN ¶
func InstanceFromPool ¶
func (*Instance) AcquireRead ¶
func (*Instance) AcquireReadStream ¶
func (*Instance) AcquireWrite ¶
func (*Instance) AcquireWriteStream ¶
func (*Instance) ApplyMigrations ¶
type Option ¶
type Option func(cfg *Config)
func WithDefaultSlog ¶
func WithDefaultSlog() Option
func WithExponentialProcessBackoff ¶
WithExponentialProcessBackoff doubles the wait time with each retry.
func WithFNVPartitioner ¶
func WithFixedProcessBackoff ¶
WithFixedProcessBackoff waits a fixed amount of time between retries.
func WithLeaseRange ¶
WithLeaseRange sets the range of leases used by the consistent hashing mechanism.
func WithLeaseVNodeCount ¶
func WithLinearProcessBackoff ¶
WithLinearProcessBackoff increases the wait time linearly with each retry.
func WithListenPublishing ¶
WithListenPublishing enables publishing by the Postgres LISTEN/NOTIFY mechanism.
func WithLogger ¶
func WithNoopLogger ¶
func WithNoopLogger() Option
func WithPartitioner ¶
func WithProcessBackoff ¶
WithProcessBackoff is used to delay processing of a stream in the outbox after failure
func WithProcessTimeout ¶
WithProcessTimeout sets the timeout for processing a stream in the outbox
func WithReconcileInterval ¶
WithReconcileInterval sets how often the Storage will check for new items in the Outbox. This is purely a fallback mechanism, as there is a life-streaming mechanism that should take all traffic. In some error scenarios this check will ensure everything is published. The default value is reasonable high, so as to not put too much load on the database.
func WithReconcilePublishing ¶
WithReconcilePublishing enables publishing by a periodic reconciler.
func WithReconcileTimeout ¶
WithReconcileTimeout sets the timeout for doing a reconcile check.
func WithStartContext ¶
WithStartContext uses the provided context during initialization.
func WithTablePrefix ¶
WithTablePrefix uses the given prefix for the database tables that this library creates. Table names will have the form "{prefix}_{name}". Example: "es_migrations"
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}