Documentation
¶
Index ¶
- Variables
- func IsAlreadyExistsError(err error) bool
- func IsDatabaseError(err error) bool
- func IsForeignKeyViolationError(err error) bool
- func IsNotFoundError(err error) bool
- func WrapDBError(ctx context.Context, err error) error
- type DB
- type Logger
- type Pool
- type PostgresDB
- func (db *PostgresDB) Close()
- func (db *PostgresDB) Ping(ctx context.Context) error
- func (db *PostgresDB) Queue() QueueClient
- func (db *PostgresDB) RiverStore() RiverStore
- func (db *PostgresDB) RootStore() RootStore
- func (db *PostgresDB) StartQueue(ctx context.Context, serverConf *config.Server) error
- func (db *PostgresDB) StopQueue(ctx context.Context) error
- type PostgresRiverStore
- type PostgresRootStore
- type PostgresStore
- type QueueClient
- type RiverQueue
- func (q *RiverQueue) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
- func (q *RiverQueue) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
- func (q *RiverQueue) Start(ctx context.Context) error
- func (q *RiverQueue) Stop(ctx context.Context) error
- type RiverStore
- type RootStore
- type Store
- type TransactionFunc
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotInitialized = errors.New("database connection not yet initialized") ErrNoTenantID = errors.New("no tenant ID in context") )
Common errors that can be checked with errors.Is .
var ( ErrNotFound = errors.New("resource not found") ErrAlreadyExists = errors.New("resource already exists") ErrForeignKeyViolation = errors.New("invalid reference to another resource") ErrDatabaseError = errors.New("database error") )
Domain-specific error types that clients can check with errors.Is.
var ( ErrQueueNotInitialized = errors.New("job queue not initialized") ErrUnknownAction = errors.New("unknown action type") )
Common queue-related errors.
Functions ¶
func IsAlreadyExistsError ¶
IsAlreadyExistsError checks if the error is an "already exists" error.
func IsDatabaseError ¶
IsDatabaseError checks if the error is a general database error.
func IsForeignKeyViolationError ¶
IsForeignKeyViolationError checks if the error is a foreign key violation.
func IsNotFoundError ¶
IsNotFoundError checks if the error is a "not found" error.
Types ¶
type DB ¶
type DB interface { RootStore() RootStore RiverStore() RiverStore Queue() QueueClient Ping(ctx context.Context) error Close() StartQueue(ctx context.Context, serverConf *config.Server) error StopQueue(ctx context.Context) error }
DB provides access to all database stores.
type Pool ¶
type Pool interface { BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) Ping(ctx context.Context) error Close() }
Pool represents a database connection pool with basic operations.
type PostgresDB ¶
type PostgresDB struct {
// contains filtered or unexported fields
}
PostgresDB implements the DB interface for PostgreSQL.
func (*PostgresDB) Ping ¶
func (db *PostgresDB) Ping(ctx context.Context) error
Ping verifies all database connections are working.
func (*PostgresDB) Queue ¶
func (db *PostgresDB) Queue() QueueClient
Queue returns the queue client.
func (*PostgresDB) RiverStore ¶
func (db *PostgresDB) RiverStore() RiverStore
RiverStore returns the river queue store.
func (*PostgresDB) RootStore ¶
func (db *PostgresDB) RootStore() RootStore
RootStore returns the root store.
func (*PostgresDB) StartQueue ¶
StartQueue initializes and starts the job queue.
type PostgresRiverStore ¶
type PostgresRiverStore struct {
// contains filtered or unexported fields
}
PostgresRiverStore implements the RiverStore interface.
func (*PostgresRiverStore) Close ¶
func (s *PostgresRiverStore) Close()
Close closes the database connection.
func (*PostgresRiverStore) Ping ¶
func (s *PostgresRiverStore) Ping(ctx context.Context) error
Ping verifies the database connection is working.
func (*PostgresRiverStore) Pool ¶
func (s *PostgresRiverStore) Pool() Pool
Pool returns the underlying connection pool.
type PostgresRootStore ¶
type PostgresRootStore struct {
PostgresStore
}
PostgresRootStore implements the RootStore interface.
type PostgresStore ¶
type PostgresStore struct {
// contains filtered or unexported fields
}
PostgresStore implements the Store interface with PostgreSQL.
func (*PostgresStore) Ping ¶
func (s *PostgresStore) Ping(ctx context.Context) error
Ping verifies the database connection is working.
func (*PostgresStore) Pool ¶
func (s *PostgresStore) Pool() Pool
Pool returns the underlying connection pool.
func (*PostgresStore) Queries ¶
func (s *PostgresStore) Queries() *sqlc.Queries
Queries returns the SQLC queries.
func (*PostgresStore) WithTransaction ¶
func (s *PostgresStore) WithTransaction(ctx context.Context, fn TransactionFunc) error
WithTransaction executes the given function within a transaction.
type QueueClient ¶
type QueueClient interface { // Job management. Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) InsertTx( ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts, ) (*rivertype.JobInsertResult, error) // Queue management. Start(ctx context.Context) error Stop(ctx context.Context) error }
QueueClient provides an interface for job queue operations.
type RiverQueue ¶
type RiverQueue struct {
// contains filtered or unexported fields
}
RiverQueue implements the QueueClient interface using River.
func NewQueue ¶
func NewQueue(ctx context.Context, pool *pgxpool.Pool, conf *config.Server, logger *slog.Logger) (*RiverQueue, error)
NewQueue creates a new queue client.
func (*RiverQueue) Insert ¶
func (q *RiverQueue) Insert( ctx context.Context, args river.JobArgs, opts *river.InsertOpts, ) (*rivertype.JobInsertResult, error)
Insert adds a job to the queue.
func (*RiverQueue) InsertTx ¶
func (q *RiverQueue) InsertTx( ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts, ) (*rivertype.JobInsertResult, error)
InsertTx adds a txn job to the queue.
type RiverStore ¶
RiverStore provides access to the River job queue database.
type RootStore ¶
type RootStore interface { Store }
RootStore provides root access to the database (bypassing tenant isolation).
type Store ¶
type Store interface { Queries() *sqlc.Queries Pool() Pool WithTransaction(ctx context.Context, fn TransactionFunc) error Ping(ctx context.Context) error Close() }
Store provides access to database operations with transaction support.
type TransactionFunc ¶
TransactionFunc is a function that executes within a transaction.