Documentation
¶
Overview ¶
Package riversqlite provides a River driver implementation for SQLite. It's also tested against libSQL (a SQLite fork), and that should continue to work as long they keep to their commitment in maintaining API compatibility.
This driver is currently in early testing. It's exercised reasonably thoroughly in the test suite, but has minimal real world use as of yet.
River makes extensive use of internal operations that might run in parallel, which doesn't naturally play well with SQLite, which only allows one operation at a time, returning errors like "database is locked (5) (SQLITE_BUSY)" in case another tries to access it. A good workaround to avoid errors is to set the maximum pool size to one connection like `dbPool.SetMaxOpenConns(1)`.
A known deficiency in this driver compared to Postgres is that due to limitations in sqlc, it performs operations like completion and `InsertMany` one row at a time instead of in batches. This means that it's slower than the Postgres driver, especially when benchmarking.
Index ¶
- type Driver
- func (d *Driver) ArgPlaceholder() string
- func (d *Driver) DatabaseName() string
- func (d *Driver) GetExecutor() riverdriver.Executor
- func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener
- func (d *Driver) GetMigrationDefaultLines() []string
- func (d *Driver) GetMigrationFS(line string) fs.FS
- func (d *Driver) GetMigrationLines() []string
- func (d *Driver) GetMigrationTruncateTables(line string, version int) []string
- func (d *Driver) PoolIsSet() bool
- func (d *Driver) PoolSet(dbPool any) error
- func (d *Driver) SQLFragmentColumnIn(column string, values any) (string, any, error)
- func (d *Driver) SupportsListenNotify() bool
- func (d *Driver) SupportsListener() bool
- func (d *Driver) TimePrecision() time.Duration
- func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx
- func (d *Driver) UnwrapTx(execTx riverdriver.ExecutorTx) *sql.Tx
- type Executor
- func (e *Executor) Begin(ctx context.Context) (riverdriver.ExecutorTx, error)
- func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnExistsParams) (bool, error)
- func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error
- func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.IndexDropIfExistsParams) error
- func (e *Executor) IndexExists(ctx context.Context, params *riverdriver.IndexExistsParams) (bool, error)
- func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error
- func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
- func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error)
- func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver.JobCountByAllStatesParams) (map[rivertype.JobState]int, error)
- func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error)
- func (e *Executor) JobCountByState(ctx context.Context, params *riverdriver.JobCountByStateParams) (int, error)
- func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteParams) (*rivertype.JobRow, error)
- func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error)
- func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDeleteManyParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error)
- func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGetByIDManyParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error)
- func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params *riverdriver.JobInsertFastManyParams) (int, error)
- func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobInsertFullParams) (*rivertype.JobRow, error)
- func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.JobInsertFullManyParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobKindList(ctx context.Context, params *riverdriver.JobKindListParams) ([]string, error)
- func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error)
- func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error)
- func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error)
- func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
- func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error)
- func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error)
- func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)
- func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)
- func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error)
- func (e *Executor) LeaderGetElectedLeader(ctx context.Context, params *riverdriver.LeaderGetElectedLeaderParams) (*riverdriver.Leader, error)
- func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderInsertParams) (*riverdriver.Leader, error)
- func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error)
- func (e *Executor) MigrationDeleteAssumingMainMany(ctx context.Context, params *riverdriver.MigrationDeleteAssumingMainManyParams) ([]*riverdriver.Migration, error)
- func (e *Executor) MigrationDeleteByLineAndVersionMany(ctx context.Context, ...) ([]*riverdriver.Migration, error)
- func (e *Executor) MigrationGetAllAssumingMain(ctx context.Context, params *riverdriver.MigrationGetAllAssumingMainParams) ([]*riverdriver.Migration, error)
- func (e *Executor) MigrationGetByLine(ctx context.Context, params *riverdriver.MigrationGetByLineParams) ([]*riverdriver.Migration, error)
- func (e *Executor) MigrationInsertMany(ctx context.Context, params *riverdriver.MigrationInsertManyParams) ([]*riverdriver.Migration, error)
- func (e *Executor) MigrationInsertManyAssumingMain(ctx context.Context, params *riverdriver.MigrationInsertManyAssumingMainParams) ([]*riverdriver.Migration, error)
- func (e *Executor) NotifyMany(ctx context.Context, params *riverdriver.NotifyManyParams) error
- func (e *Executor) PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)
- func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverdriver.Row
- func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverdriver.QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error)
- func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error)
- func (e *Executor) QueueGet(ctx context.Context, params *riverdriver.QueueGetParams) (*rivertype.Queue, error)
- func (e *Executor) QueueList(ctx context.Context, params *riverdriver.QueueListParams) ([]*rivertype.Queue, error)
- func (e *Executor) QueueNameList(ctx context.Context, params *riverdriver.QueueNameListParams) ([]string, error)
- func (e *Executor) QueuePause(ctx context.Context, params *riverdriver.QueuePauseParams) error
- func (e *Executor) QueueResume(ctx context.Context, params *riverdriver.QueueResumeParams) error
- func (e *Executor) QueueUpdate(ctx context.Context, params *riverdriver.QueueUpdateParams) (*rivertype.Queue, error)
- func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error
- func (e *Executor) SchemaDrop(ctx context.Context, params *riverdriver.SchemaDropParams) error
- func (e *Executor) SchemaGetExpired(ctx context.Context, params *riverdriver.SchemaGetExpiredParams) ([]string, error)
- func (e *Executor) TableExists(ctx context.Context, params *riverdriver.TableExistsParams) (bool, error)
- func (e *Executor) TableTruncate(ctx context.Context, params *riverdriver.TableTruncateParams) error
- type ExecutorSubTx
- type ExecutorTx
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Driver ¶
type Driver struct {
// contains filtered or unexported fields
}
Driver is an implementation of riverdriver.Driver for database/sql.
func New ¶
New returns a new SQLite driver for use with River. It also works with libSQL (a SQLite fork).
It takes an sql.DB to use for use with River. The pool should already be configured to use the schema specified in the client's Schema field. The pool must not be closed while associated River objects are running.
func (*Driver) ArgPlaceholder ¶
func (*Driver) DatabaseName ¶
func (*Driver) GetExecutor ¶
func (d *Driver) GetExecutor() riverdriver.Executor
func (*Driver) GetListener ¶
func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener
func (*Driver) GetMigrationDefaultLines ¶
func (*Driver) GetMigrationLines ¶
func (*Driver) GetMigrationTruncateTables ¶
func (*Driver) SQLFragmentColumnIn ¶
func (*Driver) SupportsListenNotify ¶
func (*Driver) SupportsListener ¶
func (*Driver) TimePrecision ¶
func (*Driver) UnwrapExecutor ¶
func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx
func (*Driver) UnwrapTx ¶
func (d *Driver) UnwrapTx(execTx riverdriver.ExecutorTx) *sql.Tx
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func (*Executor) Begin ¶
func (e *Executor) Begin(ctx context.Context) (riverdriver.ExecutorTx, error)
func (*Executor) ColumnExists ¶
func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnExistsParams) (bool, error)
func (*Executor) IndexDropIfExists ¶
func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.IndexDropIfExistsParams) error
func (*Executor) IndexExists ¶
func (e *Executor) IndexExists(ctx context.Context, params *riverdriver.IndexExistsParams) (bool, error)
func (*Executor) IndexReindex ¶
func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error
func (*Executor) IndexesExist ¶ added in v0.24.0
func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
func (*Executor) JobCancel ¶
func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error)
func (*Executor) JobCountByAllStates ¶ added in v0.24.0
func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver.JobCountByAllStatesParams) (map[rivertype.JobState]int, error)
func (*Executor) JobCountByQueueAndState ¶ added in v0.24.0
func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error)
func (*Executor) JobCountByState ¶
func (e *Executor) JobCountByState(ctx context.Context, params *riverdriver.JobCountByStateParams) (int, error)
func (*Executor) JobDelete ¶
func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteParams) (*rivertype.JobRow, error)
func (*Executor) JobDeleteBefore ¶
func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error)
func (*Executor) JobDeleteMany ¶ added in v0.24.0
func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDeleteManyParams) ([]*rivertype.JobRow, error)
func (*Executor) JobGetAvailable ¶
func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
func (*Executor) JobGetByID ¶
func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error)
func (*Executor) JobGetByIDMany ¶
func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGetByIDManyParams) ([]*rivertype.JobRow, error)
func (*Executor) JobGetByKindMany ¶
func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error)
func (*Executor) JobGetStuck ¶
func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error)
func (*Executor) JobInsertFastMany ¶
func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error)
func (*Executor) JobInsertFastManyNoReturning ¶
func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params *riverdriver.JobInsertFastManyParams) (int, error)
func (*Executor) JobInsertFull ¶
func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobInsertFullParams) (*rivertype.JobRow, error)
func (*Executor) JobInsertFullMany ¶
func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.JobInsertFullManyParams) ([]*rivertype.JobRow, error)
func (*Executor) JobKindList ¶ added in v0.24.0
func (e *Executor) JobKindList(ctx context.Context, params *riverdriver.JobKindListParams) ([]string, error)
func (*Executor) JobList ¶
func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error)
func (*Executor) JobRescueMany ¶
func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error)
func (*Executor) JobRetry ¶
func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error)
func (*Executor) JobSchedule ¶
func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error)
func (*Executor) JobSetStateIfRunningMany ¶
func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
func (*Executor) JobUpdate ¶
func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error)
func (*Executor) JobUpdateFull ¶ added in v0.29.0
func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error)
func (*Executor) LeaderAttemptElect ¶
func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)
func (*Executor) LeaderAttemptReelect ¶
func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)
func (*Executor) LeaderDeleteExpired ¶
func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error)
func (*Executor) LeaderGetElectedLeader ¶
func (e *Executor) LeaderGetElectedLeader(ctx context.Context, params *riverdriver.LeaderGetElectedLeaderParams) (*riverdriver.Leader, error)
func (*Executor) LeaderInsert ¶
func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderInsertParams) (*riverdriver.Leader, error)
func (*Executor) LeaderResign ¶
func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error)
func (*Executor) MigrationDeleteAssumingMainMany ¶
func (e *Executor) MigrationDeleteAssumingMainMany(ctx context.Context, params *riverdriver.MigrationDeleteAssumingMainManyParams) ([]*riverdriver.Migration, error)
func (*Executor) MigrationDeleteByLineAndVersionMany ¶
func (e *Executor) MigrationDeleteByLineAndVersionMany(ctx context.Context, params *riverdriver.MigrationDeleteByLineAndVersionManyParams) ([]*riverdriver.Migration, error)
func (*Executor) MigrationGetAllAssumingMain ¶
func (e *Executor) MigrationGetAllAssumingMain(ctx context.Context, params *riverdriver.MigrationGetAllAssumingMainParams) ([]*riverdriver.Migration, error)
func (*Executor) MigrationGetByLine ¶
func (e *Executor) MigrationGetByLine(ctx context.Context, params *riverdriver.MigrationGetByLineParams) ([]*riverdriver.Migration, error)
func (*Executor) MigrationInsertMany ¶
func (e *Executor) MigrationInsertMany(ctx context.Context, params *riverdriver.MigrationInsertManyParams) ([]*riverdriver.Migration, error)
func (*Executor) MigrationInsertManyAssumingMain ¶
func (e *Executor) MigrationInsertManyAssumingMain(ctx context.Context, params *riverdriver.MigrationInsertManyAssumingMainParams) ([]*riverdriver.Migration, error)
func (*Executor) NotifyMany ¶
func (e *Executor) NotifyMany(ctx context.Context, params *riverdriver.NotifyManyParams) error
func (*Executor) PGAdvisoryXactLock ¶
func (*Executor) QueueCreateOrSetUpdatedAt ¶
func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverdriver.QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error)
func (*Executor) QueueDeleteExpired ¶
func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error)
func (*Executor) QueueGet ¶
func (e *Executor) QueueGet(ctx context.Context, params *riverdriver.QueueGetParams) (*rivertype.Queue, error)
func (*Executor) QueueList ¶
func (e *Executor) QueueList(ctx context.Context, params *riverdriver.QueueListParams) ([]*rivertype.Queue, error)
func (*Executor) QueueNameList ¶ added in v0.24.0
func (e *Executor) QueueNameList(ctx context.Context, params *riverdriver.QueueNameListParams) ([]string, error)
func (*Executor) QueuePause ¶
func (e *Executor) QueuePause(ctx context.Context, params *riverdriver.QueuePauseParams) error
func (*Executor) QueueResume ¶
func (e *Executor) QueueResume(ctx context.Context, params *riverdriver.QueueResumeParams) error
func (*Executor) QueueUpdate ¶
func (e *Executor) QueueUpdate(ctx context.Context, params *riverdriver.QueueUpdateParams) (*rivertype.Queue, error)
func (*Executor) SchemaCreate ¶
func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error
func (*Executor) SchemaDrop ¶
func (e *Executor) SchemaDrop(ctx context.Context, params *riverdriver.SchemaDropParams) error
func (*Executor) SchemaGetExpired ¶
func (e *Executor) SchemaGetExpired(ctx context.Context, params *riverdriver.SchemaGetExpiredParams) ([]string, error)
func (*Executor) TableExists ¶
func (e *Executor) TableExists(ctx context.Context, params *riverdriver.TableExistsParams) (bool, error)
func (*Executor) TableTruncate ¶
func (e *Executor) TableTruncate(ctx context.Context, params *riverdriver.TableTruncateParams) error
type ExecutorSubTx ¶
type ExecutorSubTx struct {
Executor
// contains filtered or unexported fields
}
func (*ExecutorSubTx) Begin ¶
func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error)
type ExecutorTx ¶
type ExecutorTx struct {
Executor
// contains filtered or unexported fields
}
func (*ExecutorTx) Begin ¶
func (t *ExecutorTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, error)