Documentation
¶
Index ¶
- func RunDbScripts(db *sql.DB, boundary string, schema string, isAdminSchema bool, ...) error
- type PGNotifyListener
- type PostgresAdminDB
- func (db *PostgresAdminDB) CreateBoundaryIndex(ctx context.Context, boundary, name string, ...) error
- func (p *PostgresAdminDB) DeleteUser(id string) error
- func (db *PostgresAdminDB) DropBoundaryIndex(ctx context.Context, boundary, name string) error
- func (s *PostgresAdminDB) GetEventsCount(boundary string) (int, error)
- func (s *PostgresAdminDB) GetProjectorLastPosition(projectorName string) (*eventstore.Position, error)
- func (s *PostgresAdminDB) GetUserById(id string) (orisun.User, error)
- func (s *PostgresAdminDB) GetUserByUsername(username string) (orisun.User, error)
- func (s *PostgresAdminDB) GetUsersCount() (uint32, error)
- func (s *PostgresAdminDB) ListAdminUsers() ([]*orisun.User, error)
- func (s *PostgresAdminDB) SaveEventCount(event_count int, boundary string) error
- func (s *PostgresAdminDB) SaveUsersCount(users_count uint32) error
- func (p *PostgresAdminDB) UpdateProjectorPosition(name string, position *eventstore.Position) error
- func (s *PostgresAdminDB) UpsertUser(user orisun.User) error
- type PostgresEventPublishing
- func (s *PostgresEventPublishing) GetLastPublishedEventPosition(ctx context.Context, boundary string) (orisun.Position, error)
- func (s *PostgresEventPublishing) InsertLastPublishedEvent(ctx context.Context, boundaryOfInterest string, transactionId int64, ...) error
- func (s *PostgresEventPublishing) Schema(boundary string) (string, error)
- type PostgresGetEvents
- func (s *PostgresGetEvents) Get(ctx context.Context, req *eventstore.GetEventsRequest) (*eventstore.GetEventsResponse, error)
- func (s *PostgresGetEvents) GetLatestByCriteria(ctx context.Context, req *eventstore.GetLatestByCriteriaRequest) (*eventstore.GetLatestByCriteriaResponse, error)
- func (s *PostgresGetEvents) Schema(boundary string) (string, error)
- type PostgresSaveEvents
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunDbScripts ¶
func RunDbScripts(db *sql.DB, boundary string, schema string, isAdminSchema bool, ctx context.Context) error
RunDbScripts initializes database tables for a specific boundary. It validates the boundary name, creates the schema if needed, and calls the PostgreSQL initialization functions to create boundary-prefixed tables.
Types ¶
type PGNotifyListener ¶ added in v0.2.6
type PGNotifyListener struct {
// contains filtered or unexported fields
}
func InitializePostgresDatabase ¶
func InitializePostgresDatabase( ctx context.Context, postgresDBConfig config.PostgresDBConfig, adminConfig config.AdminConfig, js jetstream.JetStream, logger logging.Logger, ) (eventstore.EventsSaver, eventstore.EventsRetriever, eventstore.LockProvider, common.DB, eventstore.EventPublishingTracker, *PGNotifyListener)
func NewPGNotifyListener ¶ added in v0.2.6
func NewPGNotifyListener( ctx context.Context, connStr string, boundarySchemaMapping map[string]config.BoundaryToPostgresSchemaMapping, logger logging.Logger, ) (*PGNotifyListener, error)
func (*PGNotifyListener) Close ¶ added in v0.2.6
func (l *PGNotifyListener) Close(ctx context.Context)
Close waits for the Start goroutine to stop (it closes the connection on exit). The caller must cancel the context passed to Start to trigger shutdown; Close itself returns once Start has finished or ctx is done.
func (*PGNotifyListener) Signal ¶ added in v0.2.6
func (l *PGNotifyListener) Signal(boundary string, catchupInterval time.Duration) orisun.EventSignal
func (*PGNotifyListener) Start ¶ added in v0.2.6
func (l *PGNotifyListener) Start(ctx context.Context)
Start owns l.conn exclusively for the lifetime of the goroutine: it is the only place the connection is read, swapped (via reconnect), or closed. No other goroutine may touch l.conn — pgx.Conn is not safe for concurrent use.
type PostgresAdminDB ¶
type PostgresAdminDB struct {
// contains filtered or unexported fields
}
func NewPostgresAdminDB ¶
func NewPostgresAdminDB(db *sql.DB, logger logging.Logger, schema string, boundary string, boundarySchemaMappings map[string]config.BoundaryToPostgresSchemaMapping) *PostgresAdminDB
func (*PostgresAdminDB) CreateBoundaryIndex ¶ added in v0.2.0
func (db *PostgresAdminDB) CreateBoundaryIndex( ctx context.Context, boundary, name string, fields []eventstore.BoundaryIndexField, conditions []eventstore.BoundaryIndexCondition, combinator string, ) error
func (*PostgresAdminDB) DeleteUser ¶
func (p *PostgresAdminDB) DeleteUser(id string) error
func (*PostgresAdminDB) DropBoundaryIndex ¶ added in v0.2.0
func (db *PostgresAdminDB) DropBoundaryIndex( ctx context.Context, boundary, name string, ) error
func (*PostgresAdminDB) GetEventsCount ¶
func (s *PostgresAdminDB) GetEventsCount(boundary string) (int, error)
func (*PostgresAdminDB) GetProjectorLastPosition ¶
func (s *PostgresAdminDB) GetProjectorLastPosition(projectorName string) (*eventstore.Position, error)
func (*PostgresAdminDB) GetUserById ¶
func (s *PostgresAdminDB) GetUserById(id string) (orisun.User, error)
func (*PostgresAdminDB) GetUserByUsername ¶
func (s *PostgresAdminDB) GetUserByUsername(username string) (orisun.User, error)
func (*PostgresAdminDB) GetUsersCount ¶
func (s *PostgresAdminDB) GetUsersCount() (uint32, error)
func (*PostgresAdminDB) ListAdminUsers ¶
func (s *PostgresAdminDB) ListAdminUsers() ([]*orisun.User, error)
func (*PostgresAdminDB) SaveEventCount ¶
func (s *PostgresAdminDB) SaveEventCount(event_count int, boundary string) error
func (*PostgresAdminDB) SaveUsersCount ¶
func (s *PostgresAdminDB) SaveUsersCount(users_count uint32) error
func (*PostgresAdminDB) UpdateProjectorPosition ¶
func (p *PostgresAdminDB) UpdateProjectorPosition(name string, position *eventstore.Position) error
func (*PostgresAdminDB) UpsertUser ¶
func (s *PostgresAdminDB) UpsertUser(user orisun.User) error
type PostgresEventPublishing ¶
type PostgresEventPublishing struct {
// contains filtered or unexported fields
}
func NewPostgresEventPublishing ¶
func NewPostgresEventPublishing(db *sql.DB, logger logging.Logger, boundarySchemaMappings map[string]config.BoundaryToPostgresSchemaMapping) *PostgresEventPublishing
func (*PostgresEventPublishing) GetLastPublishedEventPosition ¶
func (*PostgresEventPublishing) InsertLastPublishedEvent ¶
type PostgresGetEvents ¶
type PostgresGetEvents struct {
// contains filtered or unexported fields
}
func NewPostgresGetEvents ¶
func NewPostgresGetEvents(db *sql.DB, logger logging.Logger, boundarySchemaMappings map[string]config.BoundaryToPostgresSchemaMapping) *PostgresGetEvents
func (*PostgresGetEvents) Get ¶
func (s *PostgresGetEvents) Get(ctx context.Context, req *eventstore.GetEventsRequest) (*eventstore.GetEventsResponse, error)
func (*PostgresGetEvents) GetLatestByCriteria ¶ added in v0.4.0
func (s *PostgresGetEvents) GetLatestByCriteria(ctx context.Context, req *eventstore.GetLatestByCriteriaRequest) (*eventstore.GetLatestByCriteriaResponse, error)
GetLatestByCriteria returns the latest event per criterion plus the max observed position. The per-criterion lookups run inside one SQL statement (get_latest_by_criteria_v1 builds a UNION ALL of LIMIT-1 subqueries), so the whole context comes from one snapshot — assembling it from independent queries would let an event commit in between with a position below the observed maximum, invisible to a scalar expected-position check.
type PostgresSaveEvents ¶
type PostgresSaveEvents struct {
// contains filtered or unexported fields
}
func NewPostgresSaveEvents ¶
func NewPostgresSaveEvents( ctx context.Context, db *sql.DB, logger logging.Logger, boundarySchemaMappings map[string]config.BoundaryToPostgresSchemaMapping) *PostgresSaveEvents
func (*PostgresSaveEvents) Save ¶
func (s *PostgresSaveEvents) Save( ctx context.Context, events []eventstore.EventWithMapTags, boundary string, expectedPosition *eventstore.Position, streamConsistencyCondition *eventstore.Query) (transactionID string, globalID int64, err error)