riversqlite

package module
v0.29.0
This package is not in the latest version of its module.
Published: Dec 22, 2025 License: MPL-2.0 Imports: 31 Imported by: 3

Documentation

Overview

Package riversqlite provides a River driver implementation for SQLite. It's also tested against libSQL (a SQLite fork), and that should continue to work as long as they keep to their commitment to maintaining API compatibility.

This driver is currently in early testing. It's exercised reasonably thoroughly in the test suite, but has seen minimal real-world use so far.

River makes extensive use of internal operations that may run in parallel. This doesn't naturally play well with SQLite, which allows only one operation at a time and returns errors like "database is locked (5) (SQLITE_BUSY)" when another operation tries to access the database. A good workaround to avoid these errors is to set the maximum pool size to one connection with `dbPool.SetMaxOpenConns(1)`.
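As a minimal sketch of the workaround above, the helper below opens a database/sql pool and caps it at one open connection. The helper name `openSingleConnPool`, the driver name "sqlite", and the DSN are assumptions for illustration only: the driver name depends on which SQLite driver package the program registers, and none is imported here.

```go
package main

import (
	"database/sql"
	"fmt"
)

// openSingleConnPool opens a database/sql pool and limits it to one open
// connection. driverName and dsn are placeholders that depend on the SQLite
// driver package the program has registered.
func openSingleConnPool(driverName, dsn string) (*sql.DB, error) {
	dbPool, err := sql.Open(driverName, dsn)
	if err != nil {
		return nil, fmt.Errorf("opening pool: %w", err)
	}

	// With a single connection, callers wait their turn inside the pool
	// instead of contending for SQLite's lock.
	dbPool.SetMaxOpenConns(1)

	return dbPool, nil
}

func main() {
	// "sqlite" is an assumed driver name. Without a blank import of a driver
	// that registers it, sql.Open reports an unknown-driver error.
	dbPool, err := openSingleConnPool("sqlite", "file:river.sqlite3")
	if err != nil {
		fmt.Println("could not open pool:", err)
		return
	}
	defer dbPool.Close()

	fmt.Println("max open connections:", dbPool.Stats().MaxOpenConnections)
}
```

The resulting pool would then be the one passed to this package's New.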

A known deficiency of this driver compared to Postgres is that, due to limitations in sqlc, it performs operations like job completion and `InsertMany` one row at a time instead of in batches. This makes it slower than the Postgres driver, and the difference is most visible in benchmarks.
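To make the cost of row-at-a-time operation concrete, the sketch below counts statement executions under each approach. It is a simplified model, not River's implementation: the function names and the batch size of 1,000 are hypothetical and are not taken from River.

```go
package main

import "fmt"

// perRowStatements models the behaviour described above: one statement
// execution for every row.
func perRowStatements(numRows int) int {
	if numRows < 0 {
		return 0
	}
	return numRows
}

// batchedStatements models a driver that packs up to batchSize rows into each
// statement. batchSize is a hypothetical value, not one taken from River.
func batchedStatements(numRows, batchSize int) int {
	if numRows <= 0 || batchSize <= 0 {
		return 0
	}
	return (numRows + batchSize - 1) / batchSize // ceiling division
}

func main() {
	const numRows, batchSize = 10_000, 1_000

	fmt.Println("one row at a time:", perRowStatements(numRows))
	fmt.Println("batched:", batchedStatements(numRows, batchSize))
}
```

The statement count grows linearly with the number of rows in the per-row model, which is why the gap tends to show up most clearly in high-volume benchmarks.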

Index


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

Driver is an implementation of riverdriver.Driver for database/sql.

func New

func New(dbPool *sql.DB) *Driver

New returns a new SQLite driver for use with River. It also works with libSQL (a SQLite fork).

It takes an sql.DB for use with River. The pool should already be configured to use the schema specified in the client's Schema field. The pool must not be closed while associated River objects are running.
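One way to respect the rule that the pool must outlive the River objects using it is to rely on Go's deferred-call ordering. The sketch below uses hypothetical stand-in types (`stopper`, `closer`, `recorder`) rather than the real River client or sql.DB, purely to show the order in which shutdown would happen.

```go
package main

import "fmt"

// stopper and closer are hypothetical stand-ins: stopper for a River client
// that uses the pool, closer for the *sql.DB pool itself.
type stopper interface{ Stop() }
type closer interface{ Close() error }

// recorder appends its label to a shared log when stopped or closed, so the
// shutdown order can be inspected afterwards.
type recorder struct {
	label string
	log   *[]string
}

func (r recorder) Stop() { *r.log = append(*r.log, r.label) }

func (r recorder) Close() error {
	*r.log = append(*r.log, r.label)
	return nil
}

// run registers the pool's Close first and the client's Stop second. Deferred
// calls run last-in, first-out, so the client stops before the pool closes.
func run(pool closer, client stopper) {
	defer pool.Close()
	defer client.Stop()

	// Work that uses the client would go here.
}

// shutdownOrder runs the stand-ins and returns the recorded order of events.
func shutdownOrder() []string {
	var log []string
	run(
		recorder{label: "pool closed", log: &log},
		recorder{label: "client stopped", log: &log},
	)
	return log
}

func main() {
	fmt.Println(shutdownOrder())
}
```

In a real program the same pattern would apply with the actual pool and client in place of the stand-ins, keeping the pool's Close as the outermost deferred call.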

func (*Driver) ArgPlaceholder

func (d *Driver) ArgPlaceholder() string

func (*Driver) DatabaseName

func (d *Driver) DatabaseName() string

func (*Driver) GetExecutor

func (d *Driver) GetExecutor() riverdriver.Executor

func (*Driver) GetListener

func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener

func (*Driver) GetMigrationDefaultLines

func (d *Driver) GetMigrationDefaultLines() []string

func (*Driver) GetMigrationFS

func (d *Driver) GetMigrationFS(line string) fs.FS

func (*Driver) GetMigrationLines

func (d *Driver) GetMigrationLines() []string

func (*Driver) GetMigrationTruncateTables

func (d *Driver) GetMigrationTruncateTables(line string, version int) []string

func (*Driver) PoolIsSet

func (d *Driver) PoolIsSet() bool

func (*Driver) PoolSet

func (d *Driver) PoolSet(dbPool any) error

func (*Driver) SQLFragmentColumnIn

func (d *Driver) SQLFragmentColumnIn(column string, values any) (string, any, error)

func (*Driver) SupportsListenNotify

func (d *Driver) SupportsListenNotify() bool

func (*Driver) SupportsListener

func (d *Driver) SupportsListener() bool

func (*Driver) TimePrecision

func (d *Driver) TimePrecision() time.Duration

func (*Driver) UnwrapExecutor

func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx

func (*Driver) UnwrapTx

func (d *Driver) UnwrapTx(execTx riverdriver.ExecutorTx) *sql.Tx

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func (*Executor) Begin

func (*Executor) ColumnExists

func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnExistsParams) (bool, error)

func (*Executor) Exec

func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error

func (*Executor) IndexDropIfExists

func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.IndexDropIfExistsParams) error

func (*Executor) IndexExists

func (e *Executor) IndexExists(ctx context.Context, params *riverdriver.IndexExistsParams) (bool, error)

func (*Executor) IndexReindex

func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error

func (*Executor) IndexesExist added in v0.24.0

func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)

func (*Executor) JobCancel

func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error)

func (*Executor) JobCountByAllStates added in v0.24.0

func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver.JobCountByAllStatesParams) (map[rivertype.JobState]int, error)

func (*Executor) JobCountByQueueAndState added in v0.24.0

func (*Executor) JobCountByState

func (e *Executor) JobCountByState(ctx context.Context, params *riverdriver.JobCountByStateParams) (int, error)

func (*Executor) JobDelete

func (e *Executor) JobDelete(ctx context.Context, params *riverdriver.JobDeleteParams) (*rivertype.JobRow, error)

func (*Executor) JobDeleteBefore

func (e *Executor) JobDeleteBefore(ctx context.Context, params *riverdriver.JobDeleteBeforeParams) (int, error)

func (*Executor) JobDeleteMany added in v0.24.0

func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDeleteManyParams) ([]*rivertype.JobRow, error)

func (*Executor) JobGetAvailable

func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)

func (*Executor) JobGetByID

func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error)

func (*Executor) JobGetByIDMany

func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGetByIDManyParams) ([]*rivertype.JobRow, error)

func (*Executor) JobGetByKindMany

func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error)

func (*Executor) JobGetStuck

func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error)

func (*Executor) JobInsertFastMany

func (*Executor) JobInsertFastManyNoReturning

func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params *riverdriver.JobInsertFastManyParams) (int, error)

func (*Executor) JobInsertFull

func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobInsertFullParams) (*rivertype.JobRow, error)

func (*Executor) JobInsertFullMany

func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.JobInsertFullManyParams) ([]*rivertype.JobRow, error)

func (*Executor) JobKindList added in v0.24.0

func (e *Executor) JobKindList(ctx context.Context, params *riverdriver.JobKindListParams) ([]string, error)

func (*Executor) JobList

func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error)

func (*Executor) JobRescueMany

func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error)

func (*Executor) JobRetry

func (e *Executor) JobRetry(ctx context.Context, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error)

func (*Executor) JobSchedule

func (*Executor) JobSetStateIfRunningMany

func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)

func (*Executor) JobUpdate

func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error)

func (*Executor) JobUpdateFull added in v0.29.0

func (e *Executor) JobUpdateFull(ctx context.Context, params *riverdriver.JobUpdateFullParams) (*rivertype.JobRow, error)

func (*Executor) LeaderAttemptElect

func (e *Executor) LeaderAttemptElect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)

func (*Executor) LeaderAttemptReelect

func (e *Executor) LeaderAttemptReelect(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error)

func (*Executor) LeaderDeleteExpired

func (e *Executor) LeaderDeleteExpired(ctx context.Context, params *riverdriver.LeaderDeleteExpiredParams) (int, error)

func (*Executor) LeaderGetElectedLeader

func (e *Executor) LeaderGetElectedLeader(ctx context.Context, params *riverdriver.LeaderGetElectedLeaderParams) (*riverdriver.Leader, error)

func (*Executor) LeaderInsert

func (e *Executor) LeaderInsert(ctx context.Context, params *riverdriver.LeaderInsertParams) (*riverdriver.Leader, error)

func (*Executor) LeaderResign

func (e *Executor) LeaderResign(ctx context.Context, params *riverdriver.LeaderResignParams) (bool, error)

func (*Executor) MigrationDeleteAssumingMainMany

func (e *Executor) MigrationDeleteAssumingMainMany(ctx context.Context, params *riverdriver.MigrationDeleteAssumingMainManyParams) ([]*riverdriver.Migration, error)

func (*Executor) MigrationDeleteByLineAndVersionMany

func (e *Executor) MigrationDeleteByLineAndVersionMany(ctx context.Context, params *riverdriver.MigrationDeleteByLineAndVersionManyParams) ([]*riverdriver.Migration, error)

func (*Executor) MigrationGetAllAssumingMain

func (e *Executor) MigrationGetAllAssumingMain(ctx context.Context, params *riverdriver.MigrationGetAllAssumingMainParams) ([]*riverdriver.Migration, error)

func (*Executor) MigrationGetByLine

func (e *Executor) MigrationGetByLine(ctx context.Context, params *riverdriver.MigrationGetByLineParams) ([]*riverdriver.Migration, error)

func (*Executor) MigrationInsertMany

func (e *Executor) MigrationInsertMany(ctx context.Context, params *riverdriver.MigrationInsertManyParams) ([]*riverdriver.Migration, error)

func (*Executor) MigrationInsertManyAssumingMain

func (e *Executor) MigrationInsertManyAssumingMain(ctx context.Context, params *riverdriver.MigrationInsertManyAssumingMainParams) ([]*riverdriver.Migration, error)

func (*Executor) NotifyMany

func (e *Executor) NotifyMany(ctx context.Context, params *riverdriver.NotifyManyParams) error

func (*Executor) PGAdvisoryXactLock

func (e *Executor) PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)

func (*Executor) QueryRow

func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverdriver.Row

func (*Executor) QueueCreateOrSetUpdatedAt

func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverdriver.QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error)

func (*Executor) QueueDeleteExpired

func (e *Executor) QueueDeleteExpired(ctx context.Context, params *riverdriver.QueueDeleteExpiredParams) ([]string, error)

func (*Executor) QueueGet

func (e *Executor) QueueGet(ctx context.Context, params *riverdriver.QueueGetParams) (*rivertype.Queue, error)

func (*Executor) QueueList

func (e *Executor) QueueList(ctx context.Context, params *riverdriver.QueueListParams) ([]*rivertype.Queue, error)

func (*Executor) QueueNameList added in v0.24.0

func (e *Executor) QueueNameList(ctx context.Context, params *riverdriver.QueueNameListParams) ([]string, error)

func (*Executor) QueuePause

func (e *Executor) QueuePause(ctx context.Context, params *riverdriver.QueuePauseParams) error

func (*Executor) QueueResume

func (e *Executor) QueueResume(ctx context.Context, params *riverdriver.QueueResumeParams) error

func (*Executor) QueueUpdate

func (e *Executor) QueueUpdate(ctx context.Context, params *riverdriver.QueueUpdateParams) (*rivertype.Queue, error)

func (*Executor) SchemaCreate

func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error

func (*Executor) SchemaDrop

func (e *Executor) SchemaDrop(ctx context.Context, params *riverdriver.SchemaDropParams) error

func (*Executor) SchemaGetExpired

func (e *Executor) SchemaGetExpired(ctx context.Context, params *riverdriver.SchemaGetExpiredParams) ([]string, error)

func (*Executor) TableExists

func (e *Executor) TableExists(ctx context.Context, params *riverdriver.TableExistsParams) (bool, error)

func (*Executor) TableTruncate

func (e *Executor) TableTruncate(ctx context.Context, params *riverdriver.TableTruncateParams) error

type ExecutorSubTx

type ExecutorSubTx struct {
	Executor
	// contains filtered or unexported fields
}

func (*ExecutorSubTx) Begin

func (*ExecutorSubTx) Commit

func (t *ExecutorSubTx) Commit(ctx context.Context) error

func (*ExecutorSubTx) Rollback

func (t *ExecutorSubTx) Rollback(ctx context.Context) error

type ExecutorTx

type ExecutorTx struct {
	Executor
	// contains filtered or unexported fields
}

func (*ExecutorTx) Begin

func (*ExecutorTx) Commit

func (t *ExecutorTx) Commit(ctx context.Context) error

func (*ExecutorTx) Rollback

func (t *ExecutorTx) Rollback(ctx context.Context) error

Directories

Path Synopsis
internal
