Documentation
¶
Index ¶
- func InitializeSqliteDatabase(ctx context.Context, sqliteCfg config.SqliteConfig, ...) (eventstore.EventsSaver, eventstore.EventsRetriever, eventstore.LockProvider, ...)
- type BoundaryPools
- type SqliteAdminDB
- func (a *SqliteAdminDB) CreateBoundaryIndex(ctx context.Context, boundary, name string, ...) (err error)
- func (a *SqliteAdminDB) DeleteUser(id string) error
- func (a *SqliteAdminDB) DropBoundaryIndex(ctx context.Context, boundary, name string) (err error)
- func (a *SqliteAdminDB) GetEventsCount(boundary string) (int, error)
- func (a *SqliteAdminDB) GetProjectorLastPosition(projectorName string) (*eventstore.Position, error)
- func (a *SqliteAdminDB) GetUserById(id string) (eventstore.User, error)
- func (a *SqliteAdminDB) GetUserByUsername(username string) (eventstore.User, error)
- func (a *SqliteAdminDB) GetUsersCount() (uint32, error)
- func (a *SqliteAdminDB) ListAdminUsers() ([]*eventstore.User, error)
- func (a *SqliteAdminDB) SaveEventCount(count int, boundary string) error
- func (a *SqliteAdminDB) SaveUsersCount(count uint32) error
- func (a *SqliteAdminDB) UpdateProjectorPosition(name string, position *eventstore.Position) error
- func (a *SqliteAdminDB) UpsertUser(user eventstore.User) error
- type SqliteEventNotifier
- type SqliteEventPublishing
- type SqliteGetEvents
- type SqliteSaveEvents
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitializeSqliteDatabase ¶ added in v0.2.7
func InitializeSqliteDatabase( ctx context.Context, sqliteCfg config.SqliteConfig, adminCfg config.AdminConfig, boundaries []string, js jetstream.JetStream, logger logging.Logger, ) (eventstore.EventsSaver, eventstore.EventsRetriever, eventstore.LockProvider, common.DB, eventstore.EventPublishingTracker, func(string) eventstore.EventSignal, error)
InitializeSqliteDatabase opens per-boundary SQLite pools and constructs the five backend interfaces. Boundary identity is the file: {dir}/{boundary}.db.
Types ¶
type BoundaryPools ¶ added in v0.2.7
type BoundaryPools struct {
Boundary string
Write *sqlitex.Pool
Read *sqlitex.Pool
// contains filtered or unexported fields
}
BoundaryPools holds the write and read connection pools backing one boundary's SQLite file.
Write pool is sized to 1: SQLite serializes writers anyway via its file lock, so a single in-flight writer queues callers in Go and avoids SQLITE_BUSY churn. Read pool is sized to runtime.NumCPU() — under WAL, readers run concurrently with the single writer.
func OpenBoundaryPools ¶ added in v0.2.7
func OpenBoundaryPools(ctx context.Context, dir, boundary, adminBoundary string) (*BoundaryPools, error)
OpenBoundaryPools opens write+read pools for one boundary at {dir}/{boundary}.db. Migrations are applied on the first connection drawn from the write pool. adminBoundary controls whether admin-only tables (users, users_count) are created.
func OpenBoundaryPoolsWithConfig ¶ added in v0.2.16
func OpenBoundaryPoolsWithConfig(ctx context.Context, sqliteCfg config.SqliteConfig, boundary, adminBoundary string) (*BoundaryPools, error)
func (*BoundaryPools) Close ¶ added in v0.2.7
func (b *BoundaryPools) Close() error
type SqliteAdminDB ¶ added in v0.2.7
type SqliteAdminDB struct {
// contains filtered or unexported fields
}
func NewSqliteAdminDB ¶ added in v0.2.7
func NewSqliteAdminDB(pools map[string]*BoundaryPools, adminBoundary string, logger logging.Logger) *SqliteAdminDB
func (*SqliteAdminDB) CreateBoundaryIndex ¶ added in v0.2.7
func (a *SqliteAdminDB) CreateBoundaryIndex( ctx context.Context, boundary, name string, fields []eventstore.BoundaryIndexField, conditions []eventstore.BoundaryIndexCondition, combinator string, ) (err error)
CreateBoundaryIndex builds a partial expression index over orisun_es_event.data JSON keys. Index name = "{name}_idx" inside the boundary's database.
func (*SqliteAdminDB) DeleteUser ¶ added in v0.2.7
func (a *SqliteAdminDB) DeleteUser(id string) error
func (*SqliteAdminDB) DropBoundaryIndex ¶ added in v0.2.7
func (a *SqliteAdminDB) DropBoundaryIndex(ctx context.Context, boundary, name string) (err error)
func (*SqliteAdminDB) GetEventsCount ¶ added in v0.2.7
func (a *SqliteAdminDB) GetEventsCount(boundary string) (int, error)
func (*SqliteAdminDB) GetProjectorLastPosition ¶ added in v0.2.7
func (a *SqliteAdminDB) GetProjectorLastPosition(projectorName string) (*eventstore.Position, error)
func (*SqliteAdminDB) GetUserById ¶ added in v0.2.7
func (a *SqliteAdminDB) GetUserById(id string) (eventstore.User, error)
func (*SqliteAdminDB) GetUserByUsername ¶ added in v0.2.7
func (a *SqliteAdminDB) GetUserByUsername(username string) (eventstore.User, error)
func (*SqliteAdminDB) GetUsersCount ¶ added in v0.2.7
func (a *SqliteAdminDB) GetUsersCount() (uint32, error)
func (*SqliteAdminDB) ListAdminUsers ¶ added in v0.2.7
func (a *SqliteAdminDB) ListAdminUsers() ([]*eventstore.User, error)
func (*SqliteAdminDB) SaveEventCount ¶ added in v0.2.7
func (a *SqliteAdminDB) SaveEventCount(count int, boundary string) error
func (*SqliteAdminDB) SaveUsersCount ¶ added in v0.2.7
func (a *SqliteAdminDB) SaveUsersCount(count uint32) error
func (*SqliteAdminDB) UpdateProjectorPosition ¶ added in v0.2.7
func (a *SqliteAdminDB) UpdateProjectorPosition(name string, position *eventstore.Position) error
func (*SqliteAdminDB) UpsertUser ¶ added in v0.2.7
func (a *SqliteAdminDB) UpsertUser(user eventstore.User) error
type SqliteEventNotifier ¶ added in v0.2.16
type SqliteEventNotifier struct {
// contains filtered or unexported fields
}
func NewSqliteEventNotifier ¶ added in v0.2.16
func NewSqliteEventNotifier(interval time.Duration) *SqliteEventNotifier
func (*SqliteEventNotifier) Notify ¶ added in v0.2.16
func (n *SqliteEventNotifier) Notify(boundary string)
func (*SqliteEventNotifier) Signal ¶ added in v0.2.16
func (n *SqliteEventNotifier) Signal(boundary string) eventstore.EventSignal
type SqliteEventPublishing ¶ added in v0.2.7
type SqliteEventPublishing struct {
// contains filtered or unexported fields
}
func NewSqliteEventPublishing ¶ added in v0.2.7
func NewSqliteEventPublishing(pools map[string]*BoundaryPools, logger logging.Logger) *SqliteEventPublishing
func (*SqliteEventPublishing) GetLastPublishedEventPosition ¶ added in v0.2.7
func (p *SqliteEventPublishing) GetLastPublishedEventPosition(ctx context.Context, boundary string) (eventstore.Position, error)
func (*SqliteEventPublishing) InsertLastPublishedEvent ¶ added in v0.2.7
type SqliteGetEvents ¶ added in v0.2.7
type SqliteGetEvents struct {
// contains filtered or unexported fields
}
func NewSqliteGetEvents ¶ added in v0.2.7
func NewSqliteGetEvents(pools map[string]*BoundaryPools, logger logging.Logger) *SqliteGetEvents
func (*SqliteGetEvents) Get ¶ added in v0.2.7
func (s *SqliteGetEvents) Get(ctx context.Context, req *eventstore.GetEventsRequest) (*eventstore.GetEventsResponse, error)
func (*SqliteGetEvents) GetLatestByCriteria ¶ added in v0.4.0
func (s *SqliteGetEvents) GetLatestByCriteria(ctx context.Context, req *eventstore.GetLatestByCriteriaRequest) (*eventstore.GetLatestByCriteriaResponse, error)
GetLatestByCriteria returns the latest event per criterion plus the max observed position, all from ONE read snapshot: the per-criterion lookups run inside an explicit deferred read transaction, so under WAL every statement sees the same database state. Independent client reads cannot substitute — an event committing between them can hide below the observed max position.
type SqliteSaveEvents ¶ added in v0.2.7
type SqliteSaveEvents struct {
// contains filtered or unexported fields
}
func NewSqliteSaveEvents ¶ added in v0.2.7
func NewSqliteSaveEvents(pools map[string]*BoundaryPools, logger logging.Logger) *SqliteSaveEvents
func (*SqliteSaveEvents) Save ¶ added in v0.2.7
func (s *SqliteSaveEvents) Save( ctx context.Context, events []eventstore.EventWithMapTags, boundary string, expectedPosition *eventstore.Position, streamConsistencyCondition *eventstore.Query, ) (transactionID string, globalID int64, err error)