Documentation ¶
Overview ¶
Package riverdriver exposes generic constructs to be implemented by specific drivers that wrap third party database packages, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or other major versions of packages can be supported in future River versions.
River currently only supports Pgx v5, and the interfaces here wrap it with only the thinnest possible layer. Adding support for alternate packages will require the interfaces to change substantially, and therefore they should not be implemented or invoked by user code. Changes to interfaces in this package WILL NOT be considered breaking changes for purposes of River's semantic versioning.
Index ¶
- Constants
- Variables
- func MigrationLineMainTruncateTables(version int) []string
- type ColumnExistsParams
- type Driver
- type Executor
- type ExecutorTx
- type GetListenenerParams
- type IndexDropIfExistsParams
- type IndexExistsParams
- type IndexReindexParams
- type IndexesExistParams
- type JobCancelParams
- type JobCountByAllStatesParams
- type JobCountByQueueAndStateParams
- type JobCountByQueueAndStateResult
- type JobCountByStateParams
- type JobDeleteBeforeParams
- type JobDeleteManyParams
- type JobDeleteParams
- type JobGetAvailableParams
- type JobGetByIDManyParams
- type JobGetByIDParams
- type JobGetByKindManyParams
- type JobGetStuckParams
- type JobInsertFastManyParams
- type JobInsertFastParams
- type JobInsertFastResult
- type JobInsertFullManyParams
- type JobInsertFullParams
- type JobKindListParams
- type JobListParams
- type JobRescueManyParams
- type JobRetryParams
- type JobScheduleParams
- type JobScheduleResult
- type JobSetStateIfRunningManyParams
- type JobSetStateIfRunningParams
- func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateCompleted(id int64, finalizedAt time.Time, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateSnoozed(id int64, scheduledAt time.Time, attempt int, metadataUpdates []byte) *JobSetStateIfRunningParams
- func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, attempt int, metadataUpdates []byte) *JobSetStateIfRunningParams
- type JobUpdateParams
- type Leader
- type LeaderDeleteExpiredParams
- type LeaderElectParams
- type LeaderGetElectedLeaderParams
- type LeaderInsertParams
- type LeaderResignParams
- type Listener
- type Migration
- type MigrationDeleteAssumingMainManyParams
- type MigrationDeleteByLineAndVersionManyParams
- type MigrationGetAllAssumingMainParams
- type MigrationGetByLineParams
- type MigrationInsertManyAssumingMainParams
- type MigrationInsertManyParams
- type Notification
- type NotifyManyParams
- type ProducerKeepAliveParams
- type QueueCreateOrSetUpdatedAtParams
- type QueueDeleteExpiredParams
- type QueueGetParams
- type QueueListParams
- type QueueNameListParams
- type QueuePauseParams
- type QueueResumeParams
- type QueueUpdateParams
- type Row
- type Schema
- type SchemaCreateParams
- type SchemaDropParams
- type SchemaGetExpiredParams
- type TableExistsParams
- type TableTruncateParams
Constants ¶
const AllQueuesString = "*"
const MigrationLineMain = "main"
Variables ¶
var ( ErrClosedPool = errors.New("underlying driver pool is closed") ErrNotImplemented = errors.New("driver does not implement this functionality") )
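Sentinel errors like these are normally matched with `errors.Is`, which also sees through wrapping. The following is a minimal sketch only: the sentinel is redeclared locally so the example compiles on its own, and `fakeDriver` and `poolSetSupported` are hypothetical names that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotImplemented mirrors the sentinel documented above. It is redeclared
// here only so the sketch compiles without importing riverdriver.
var ErrNotImplemented = errors.New("driver does not implement this functionality")

// fakeDriver is a hypothetical stand-in for a driver without PoolSet support.
type fakeDriver struct{}

// PoolSet wraps the sentinel with extra context, as a driver might.
func (fakeDriver) PoolSet(dbPool any) error {
	return fmt.Errorf("fakeDriver.PoolSet: %w", ErrNotImplemented)
}

// poolSetSupported treats ErrNotImplemented as "unsupported" rather than as
// a failure, and passes any other error through unchanged.
func poolSetSupported(err error) (bool, error) {
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, ErrNotImplemented):
		return false, nil
	default:
		return false, err
	}
}

func main() {
	supported, err := poolSetSupported(fakeDriver{}.PoolSet(nil))
	fmt.Println(supported, err) // false <nil>
}
```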
Functions ¶
func MigrationLineMainTruncateTables ¶ added in v0.23.0
MigrationLineMainTruncateTables is a shared helper that produces tables to truncate for the main migration line. It's reused across all drivers.
API is not stable. DO NOT USE.
Types ¶
type ColumnExistsParams ¶ added in v0.21.0
type Driver ¶
type Driver[TTx any] interface {
// ArgPlaceholder is the placeholder character used in query positional
// arguments, so "$" for "$1", "$2", "$3", etc. This is a "$" for Postgres
// and "?" for SQLite.
//
// API is not stable. DO NOT USE.
ArgPlaceholder() string
// DatabaseName is the name of the database that the driver targets like
// "postgres" or "sqlite". This is used for purposes like a cache key prefix
// in riverdbtest so that multiple drivers may share schemas as long as they
// target the same database.
//
// API is not stable. DO NOT USE.
DatabaseName() string
// GetExecutor gets an executor for the driver.
//
// API is not stable. DO NOT USE.
GetExecutor() Executor
// GetListener gets a listener for purposes of receiving notifications.
//
// API is not stable. DO NOT USE.
GetListener(params *GetListenenerParams) Listener
// GetMigrationDefaultLines gets default migration lines that should be
// applied when using this driver. This is mainly used by riverdbtest to
// figure out what migration lines should be available by default for new
// test schemas.
//
// API is not stable. DO NOT USE.
GetMigrationDefaultLines() []string
// GetMigrationFS gets a filesystem containing migrations for the driver.
//
// Each set of migration files is expected to exist within the filesystem as
// `migration/<line>/`. For example:
//
//     migration/main/001_create_river_migration.up.sql
//
// API is not stable. DO NOT USE.
GetMigrationFS(line string) fs.FS
// GetMigrationLines gets supported migration lines from the driver. Most
// drivers will only support a single line: MigrationLineMain.
//
// API is not stable. DO NOT USE.
GetMigrationLines() []string
// GetMigrationTruncateTables gets the tables that should be truncated
// before or after tests for a specific migration line returned by this
// driver. Tables to truncate don't need to consider intermediary states,
// and should return tables for the latest migration version.
//
// API is not stable. DO NOT USE.
GetMigrationTruncateTables(line string, version int) []string
// PoolIsSet returns true if the driver is configured with a database pool.
//
// API is not stable. DO NOT USE.
PoolIsSet() bool
// PoolSet sets a database pool into a driver with a nil pool. This is meant
// only for use in testing, and only in specific circumstances where it's
// needed. The pool in a driver should generally be treated as immutable
// because it's inherited by driver executors, and changing it when active
// executors exist will cause problems.
//
// Most drivers don't implement this function and return ErrNotImplemented.
//
// Drivers should only set a pool if the previous pool was nil (to help root
// out bugs where something unexpected is happening), and panic in case a
// pool is set to a driver twice.
//
// API is not stable. DO NOT USE.
PoolSet(dbPool any) error
// SQLFragmentColumnIn generates an SQL fragment to be included as a
// predicate in a `WHERE` query for the existence of a set of values in a
// column like `id IN (...)`. The actual implementation depends on support
// for specific data types. Postgres uses arrays while SQLite uses a JSON
// fragment with `json_each`.
//
// API is not stable. DO NOT USE.
SQLFragmentColumnIn(column string, values any) (string, any, error)
// SupportsListener gets whether this driver supports a listener. Drivers
// that don't support a listener can only operate in poll-only mode.
//
// API is not stable. DO NOT USE.
SupportsListener() bool
// SupportsListenNotify indicates whether the underlying database supports
// listen/notify. This differs from SupportsListener in that even if a
// driver doesn't support a listener but the database supports the
// underlying listen/notify mechanism, it will still broadcast in case there
// are other clients/drivers on the database that do support a listener. If
// listen/notify can't be supported at all, no broadcast attempt is made.
//
// API is not stable. DO NOT USE.
SupportsListenNotify() bool
// TimePrecision returns the maximum time resolution supported by the
// database. This is used in test assertions when checking round trips on
// timestamps.
//
// API is not stable. DO NOT USE.
TimePrecision() time.Duration
// UnwrapExecutor gets an executor from a driver transaction.
//
// API is not stable. DO NOT USE.
UnwrapExecutor(tx TTx) ExecutorTx
// UnwrapTx gets a driver transaction from an executor. This is currently
// only needed for test transaction helpers.
//
// API is not stable. DO NOT USE.
UnwrapTx(execTx ExecutorTx) TTx
}
Driver provides a database driver for use with river.Client.
Its purpose is to wrap the interface of a third party database package, with the aim being to keep the main River interface decoupled from a specific database package so that other packages or major versions of packages can be supported in future River versions.
River currently only supports Pgx v5, and this interface wraps it with only the thinnest possible layer. Adding support for alternate packages will require it to change substantially, and therefore it should not be implemented or invoked by user code. Changes to this interface WILL NOT be considered breaking changes for purposes of River's semantic versioning.
API is not stable. DO NOT IMPLEMENT.
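To make the `ArgPlaceholder` distinction concrete, the sketch below expands a placeholder character into a positional argument list. It is illustrative only: `placeholders` is a hypothetical helper, and the rule that only "$" is numbered is an assumption drawn from the Postgres and SQLite examples in the interface comment, not a description of how River builds its queries.

```go
package main

import (
	"fmt"
	"strings"
)

// placeholders builds n positional placeholders for the given placeholder
// character. "$" is numbered ($1, $2, ...); anything else is repeated as-is.
func placeholders(argPlaceholder string, n int) string {
	parts := make([]string, n)
	for i := range parts {
		if argPlaceholder == "$" {
			parts[i] = fmt.Sprintf("$%d", i+1)
		} else {
			parts[i] = argPlaceholder
		}
	}
	return strings.Join(parts, ", ")
}

func main() {
	fmt.Println(placeholders("$", 3)) // $1, $2, $3
	fmt.Println(placeholders("?", 3)) // ?, ?, ?
}
```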
type Executor ¶
type Executor interface {
// Begin begins a new subtransaction. ErrSubTxNotSupported may be returned
// if the executor is a transaction and the driver doesn't support
// subtransactions (like riverdriver/riverdatabasesql for database/sql).
Begin(ctx context.Context) (ExecutorTx, error)
// ColumnExists checks whether a column for a particular table exists for
// the schema in the current search schema.
ColumnExists(ctx context.Context, params *ColumnExistsParams) (bool, error)
// Exec executes raw SQL. Used for migrations.
Exec(ctx context.Context, sql string, args ...any) error
// IndexDropIfExists drops a database index if it exists. This abstraction is a
// little leaky right now because Postgres runs this `CONCURRENTLY` and
// that's not possible in SQLite.
//
// API is not stable. DO NOT USE.
IndexDropIfExists(ctx context.Context, params *IndexDropIfExistsParams) error
IndexExists(ctx context.Context, params *IndexExistsParams) (bool, error)
IndexesExist(ctx context.Context, params *IndexesExistParams) (map[string]bool, error)
// IndexReindex reindexes a database index. This abstraction is a little
// leaky right now because Postgres runs this `CONCURRENTLY` and that's not
// possible in SQLite.
//
// API is not stable. DO NOT USE.
IndexReindex(ctx context.Context, params *IndexReindexParams) error
JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error)
JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error)
JobCountByQueueAndState(ctx context.Context, params *JobCountByQueueAndStateParams) ([]*JobCountByQueueAndStateResult, error)
JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error)
JobDelete(ctx context.Context, params *JobDeleteParams) (*rivertype.JobRow, error)
JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error)
JobDeleteMany(ctx context.Context, params *JobDeleteManyParams) ([]*rivertype.JobRow, error)
JobGetAvailable(ctx context.Context, params *JobGetAvailableParams) ([]*rivertype.JobRow, error)
JobGetByID(ctx context.Context, params *JobGetByIDParams) (*rivertype.JobRow, error)
JobGetByIDMany(ctx context.Context, params *JobGetByIDManyParams) ([]*rivertype.JobRow, error)
JobGetByKindMany(ctx context.Context, params *JobGetByKindManyParams) ([]*rivertype.JobRow, error)
JobGetStuck(ctx context.Context, params *JobGetStuckParams) ([]*rivertype.JobRow, error)
JobInsertFastMany(ctx context.Context, params *JobInsertFastManyParams) ([]*JobInsertFastResult, error)
JobInsertFastManyNoReturning(ctx context.Context, params *JobInsertFastManyParams) (int, error)
JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error)
JobInsertFullMany(ctx context.Context, jobs *JobInsertFullManyParams) ([]*rivertype.JobRow, error)
JobKindList(ctx context.Context, params *JobKindListParams) ([]string, error)
JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error)
JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error)
JobRetry(ctx context.Context, params *JobRetryParams) (*rivertype.JobRow, error)
JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error)
JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error)
LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error)
LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error)
LeaderDeleteExpired(ctx context.Context, params *LeaderDeleteExpiredParams) (int, error)
LeaderGetElectedLeader(ctx context.Context, params *LeaderGetElectedLeaderParams) (*Leader, error)
LeaderInsert(ctx context.Context, params *LeaderInsertParams) (*Leader, error)
LeaderResign(ctx context.Context, params *LeaderResignParams) (bool, error)
// MigrationDeleteAssumingMainMany deletes many migrations assuming
// everything is on the main line. This is suitable for use in databases on
// a version before the `line` column exists.
MigrationDeleteAssumingMainMany(ctx context.Context, params *MigrationDeleteAssumingMainManyParams) ([]*Migration, error)
// MigrationDeleteByLineAndVersionMany deletes many migration versions on a
// particular line.
MigrationDeleteByLineAndVersionMany(ctx context.Context, params *MigrationDeleteByLineAndVersionManyParams) ([]*Migration, error)
// MigrationGetAllAssumingMain gets all migrations assuming everything is on
// the main line. This is suitable for use in databases on a version before
// the `line` column exists.
MigrationGetAllAssumingMain(ctx context.Context, params *MigrationGetAllAssumingMainParams) ([]*Migration, error)
// MigrationGetByLine gets all currently applied migrations.
MigrationGetByLine(ctx context.Context, params *MigrationGetByLineParams) ([]*Migration, error)
// MigrationInsertMany inserts many migration versions.
MigrationInsertMany(ctx context.Context, params *MigrationInsertManyParams) ([]*Migration, error)
// MigrationInsertManyAssumingMain inserts many migrations, assuming they're
// on the main line. This operation is necessary for compatibility before
// the `line` column was added to the migrations table.
MigrationInsertManyAssumingMain(ctx context.Context, params *MigrationInsertManyAssumingMainParams) ([]*Migration, error)
NotifyMany(ctx context.Context, params *NotifyManyParams) error
PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)
QueueCreateOrSetUpdatedAt(ctx context.Context, params *QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error)
QueueDeleteExpired(ctx context.Context, params *QueueDeleteExpiredParams) ([]string, error)
QueueGet(ctx context.Context, params *QueueGetParams) (*rivertype.Queue, error)
QueueList(ctx context.Context, params *QueueListParams) ([]*rivertype.Queue, error)
QueueNameList(ctx context.Context, params *QueueNameListParams) ([]string, error)
QueuePause(ctx context.Context, params *QueuePauseParams) error
QueueResume(ctx context.Context, params *QueueResumeParams) error
QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error)
QueryRow(ctx context.Context, sql string, args ...any) Row
SchemaCreate(ctx context.Context, params *SchemaCreateParams) error
SchemaDrop(ctx context.Context, params *SchemaDropParams) error
SchemaGetExpired(ctx context.Context, params *SchemaGetExpiredParams) ([]string, error)
// TableExists checks whether a table exists for the schema in the current
// search schema.
TableExists(ctx context.Context, params *TableExistsParams) (bool, error)
TableTruncate(ctx context.Context, params *TableTruncateParams) error
}
Executor provides River operations against a database. It may be a database pool or transaction.
API is not stable. DO NOT IMPLEMENT.
type ExecutorTx ¶
type ExecutorTx interface {
Executor
// Commit commits the transaction.
//
// API is not stable. DO NOT USE.
Commit(ctx context.Context) error
// Rollback rolls back the transaction.
//
// API is not stable. DO NOT USE.
Rollback(ctx context.Context) error
}
ExecutorTx is an executor which is a transaction. In addition to standard Executor operations, it may be committed or rolled back.
API is not stable. DO NOT IMPLEMENT.
type GetListenenerParams ¶ added in v0.21.0
type GetListenenerParams struct {
Schema string
}
type IndexDropIfExistsParams ¶ added in v0.23.0
type IndexExistsParams ¶ added in v0.23.0
type IndexReindexParams ¶ added in v0.23.0
type IndexesExistParams ¶ added in v0.24.0
type JobCancelParams ¶ added in v0.0.23
type JobCountByAllStatesParams ¶ added in v0.24.0
type JobCountByAllStatesParams struct {
Schema string
}
type JobCountByQueueAndStateParams ¶ added in v0.24.0
type JobCountByQueueAndStateResult ¶ added in v0.24.0
type JobCountByStateParams ¶ added in v0.21.0
type JobDeleteBeforeParams ¶ added in v0.0.23
type JobDeleteManyParams ¶ added in v0.24.0
type JobDeleteParams ¶ added in v0.21.0
type JobGetAvailableParams ¶ added in v0.0.23
type JobGetByIDManyParams ¶ added in v0.21.0
type JobGetByIDParams ¶ added in v0.21.0
type JobGetByKindManyParams ¶ added in v0.21.0
type JobGetStuckParams ¶ added in v0.0.23
type JobInsertFastManyParams ¶ added in v0.21.0
type JobInsertFastManyParams struct {
Jobs []*JobInsertFastParams
Schema string
}
type JobInsertFastParams ¶ added in v0.0.23
type JobInsertFastParams struct {
ID *int64
// Args contains the raw underlying job arguments struct. It has already been
// encoded into EncodedArgs, but the original is kept here to leverage its
// struct tags and interfaces, such as for use in unique key generation.
Args rivertype.JobArgs
CreatedAt *time.Time
EncodedArgs []byte
Kind string
MaxAttempts int
Metadata []byte
Priority int
Queue string
ScheduledAt *time.Time
State rivertype.JobState
Tags []string
UniqueKey []byte
UniqueStates byte
}
type JobInsertFastResult ¶ added in v0.12.0
type JobInsertFullManyParams ¶ added in v0.23.0
type JobInsertFullManyParams struct {
Jobs []*JobInsertFullParams
Schema string
}
type JobInsertFullParams ¶ added in v0.0.23
type JobInsertFullParams struct {
Attempt int
AttemptedAt *time.Time
AttemptedBy []string
CreatedAt *time.Time
EncodedArgs []byte
Errors [][]byte
FinalizedAt *time.Time
Kind string
MaxAttempts int
Metadata []byte
Priority int
Queue string
ScheduledAt *time.Time
Schema string
State rivertype.JobState
Tags []string
UniqueKey []byte
UniqueStates byte
}
type JobKindListParams ¶ added in v0.24.0
type JobListParams ¶ added in v0.19.0
type JobRescueManyParams ¶ added in v0.0.23
type JobRetryParams ¶ added in v0.21.0
type JobScheduleParams ¶ added in v0.0.23
type JobScheduleResult ¶ added in v0.5.0
type JobSetStateIfRunningManyParams ¶ added in v0.12.1
type JobSetStateIfRunningManyParams struct {
ID []int64
Attempt []*int
ErrData [][]byte
FinalizedAt []*time.Time
MetadataDoMerge []bool
MetadataUpdates [][]byte
Now *time.Time
ScheduledAt []*time.Time
Schema string
State []rivertype.JobState
}
JobSetStateIfRunningManyParams are parameters to update the state of currently running jobs. Use one of the constructors below to ensure a correct combination of parameters.
type JobSetStateIfRunningParams ¶ added in v0.0.23
type JobSetStateIfRunningParams struct {
ID int64
Attempt *int
ErrData []byte
FinalizedAt *time.Time
MetadataDoMerge bool
MetadataUpdates []byte
ScheduledAt *time.Time
Schema string // added by completer
State rivertype.JobState
}
JobSetStateIfRunningParams are parameters to update the state of a currently running job. Use one of the constructors below to ensure a correct combination of parameters.
func JobSetStateCancelled ¶ added in v0.0.23
func JobSetStateCompleted ¶ added in v0.0.23
func JobSetStateCompleted(id int64, finalizedAt time.Time, metadataUpdates []byte) *JobSetStateIfRunningParams
func JobSetStateDiscarded ¶ added in v0.0.23
func JobSetStateErrorAvailable ¶ added in v0.0.23
func JobSetStateErrorRetryable ¶ added in v0.0.23
func JobSetStateSnoozed ¶ added in v0.0.23
func JobSetStateSnoozedAvailable ¶ added in v0.0.23
type JobUpdateParams ¶ added in v0.0.23
type JobUpdateParams struct {
ID int64
AttemptDoUpdate bool
Attempt int
AttemptedAtDoUpdate bool
AttemptedAt *time.Time
AttemptedByDoUpdate bool
AttemptedBy []string
ErrorsDoUpdate bool
Errors [][]byte
FinalizedAtDoUpdate bool
FinalizedAt *time.Time
Schema string
StateDoUpdate bool
State rivertype.JobState
// Deprecated and will be removed when advisory lock unique path is removed.
UniqueKeyDoUpdate bool
// Deprecated and will be removed when advisory lock unique path is removed.
UniqueKey []byte
}
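The paired `...DoUpdate` flags suggest an opt-in update pattern, where a field is written only when its flag is set. The sketch below assumes that reading; the page itself does not spell out the semantics. `job`, `updateParams`, and `apply` are hypothetical stand-ins that operate in memory instead of issuing SQL.

```go
package main

import "fmt"

// job is a hypothetical, trimmed-down stand-in for a job row.
type job struct {
	Attempt int
	State   string
}

// updateParams mirrors two of the flag/value pairs in JobUpdateParams.
type updateParams struct {
	AttemptDoUpdate bool
	Attempt         int
	StateDoUpdate   bool
	State           string
}

// apply copies a field only when its DoUpdate flag is set, so a zero value
// can be written deliberately and unflagged fields are left alone.
func apply(j job, p updateParams) job {
	if p.AttemptDoUpdate {
		j.Attempt = p.Attempt
	}
	if p.StateDoUpdate {
		j.State = p.State
	}
	return j
}

func main() {
	j := job{Attempt: 3, State: "running"}

	// Reset Attempt to zero. State has no flag set, so it is preserved even
	// though a value was supplied.
	updated := apply(j, updateParams{AttemptDoUpdate: true, Attempt: 0, State: "ignored"})
	fmt.Printf("%+v\n", updated) // {Attempt:0 State:running}
}
```

The explicit flag is what distinguishes "set this field to its zero value" from "leave this field untouched", which a plain struct field cannot express on its own.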
type LeaderDeleteExpiredParams ¶ added in v0.21.0
type LeaderElectParams ¶ added in v0.0.23
type LeaderGetElectedLeaderParams ¶ added in v0.21.0
type LeaderGetElectedLeaderParams struct {
Schema string
}
type LeaderInsertParams ¶ added in v0.0.23
type LeaderResignParams ¶ added in v0.0.23
type Listener ¶ added in v0.0.23
type Listener interface {
Close(ctx context.Context) error
Connect(ctx context.Context) error
Listen(ctx context.Context, topic string) error
Ping(ctx context.Context) error
Schema() string
SetAfterConnectExec(sql string) // should only ever be used in testing
Unlisten(ctx context.Context, topic string) error
WaitForNotification(ctx context.Context) (*Notification, error)
}
Listener listens for notifications. In Postgres, this is a database connection where `LISTEN` has been run.
API is not stable. DO NOT IMPLEMENT.
type Migration ¶
type Migration struct {
// CreatedAt is when the migration was initially created.
//
// API is not stable. DO NOT USE.
CreatedAt time.Time
// Line is the migration line that the migration belongs to.
//
// API is not stable. DO NOT USE.
Line string
// Version is the version of the migration.
//
// API is not stable. DO NOT USE.
Version int
}
Migration represents a River migration.
API is not stable. DO NOT USE.
type MigrationDeleteAssumingMainManyParams ¶ added in v0.21.0
type MigrationDeleteByLineAndVersionManyParams ¶ added in v0.21.0
type MigrationGetAllAssumingMainParams ¶ added in v0.21.0
type MigrationGetAllAssumingMainParams struct {
Schema string
}
type MigrationGetByLineParams ¶ added in v0.21.0
type MigrationInsertManyAssumingMainParams ¶ added in v0.21.0
type MigrationInsertManyParams ¶ added in v0.21.0
type Notification ¶ added in v0.0.23
type NotifyManyParams ¶ added in v0.5.0
NotifyManyParams are parameters to issue many pubsub notifications all at once for a single topic.
type ProducerKeepAliveParams ¶ added in v0.20.0
type QueueCreateOrSetUpdatedAtParams ¶ added in v0.5.0
type QueueDeleteExpiredParams ¶ added in v0.5.0
type QueueGetParams ¶ added in v0.21.0
type QueueListParams ¶ added in v0.21.0
type QueueNameListParams ¶ added in v0.24.0
type QueuePauseParams ¶ added in v0.21.0
type QueueResumeParams ¶ added in v0.21.0
type QueueUpdateParams ¶ added in v0.20.0
type SchemaCreateParams ¶ added in v0.23.0
type SchemaCreateParams struct {
Schema string
}
type SchemaDropParams ¶ added in v0.23.0
type SchemaDropParams struct {
Schema string
}
type SchemaGetExpiredParams ¶ added in v0.21.0
type TableExistsParams ¶ added in v0.21.0
type TableTruncateParams ¶ added in v0.23.0
Directories ¶
| Path | Synopsis |
|---|---|
| riverdatabasesql (module) | |
| riverdrivertest (module) | |
| riverpgxv5 (module) | |
| riversqlite (module) | |