db

package
v0.0.0-...-4b578d8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

Database Package - db

This package provides database access for the github.com/AxelTahmid/tinker application. It manages PostgreSQL connections, transaction handling, error mapping, and background job processing.

Architecture

The package follows a layered design:

┌────────────────────────────────────────────────────────────┐
│                       High-level DB Interface              │
└───────────┬─────────────────┬───────────────┬──────────────┘
            │                 │               │
┌───────────▼────┐  ┌─────────▼────┐  ┌───────▼─────┐
│   RootStore    │  │  RiverStore  │  │ QueueClient │
└───────┬────────┘  └──────┬───────┘  └──────┬──────┘
        │                  │                 │
┌───────▼──────────────────▼─────────────────▼───────────────┐
│                       PostgreSQL Pools                     │
└────────────────────────────────────────────────────────────┘

Key Components

Interfaces
  • DB: Top-level interface providing access to all database functionality
  • RootStore: Admin-level database operations bypassing tenant isolation
  • RiverStore: Access to the River job queue database
  • QueueClient: Interface for background job processing
Implementation Types
  • PostgresDB: Main implementation of the DB interface
  • PostgresRootStore: Implementation of the RootStore interface
  • PostgresRiverStore: Implementation of the RiverStore interface
  • RiverQueue: Implementation of the QueueClient interface using River

Usage Examples

Creating a New Database Connection
import (
    "context"
    "log/slog"

    "github.com/AxelTahmid/tinker/config"
    "github.com/AxelTahmid/tinker/internal/db"
)

// InitDatabase opens the database via db.New and then starts the job queue.
// It returns the ready-to-use db.DB, or the first error encountered.
func InitDatabase(ctx context.Context, conf *config.Config, logger *slog.Logger) (db.DB, error) {
    conn, err := db.New(ctx, conf.Database, logger)
    if err != nil {
        return nil, err
    }

    // Start the job queue. If it cannot start, close the connections that
    // New opened before reporting the failure.
    err = conn.StartQueue(ctx, conf.Server)
    if err != nil {
        conn.Close()
        return nil, err
    }

    return conn, nil
}
Using Transactions
// CreateUser creates a user inside a single transaction obtained from
// store.WithTransaction. Query failures are converted with db.WrapDBError.
func CreateUser(ctx context.Context, store db.Store, user User) error {
    // Everything in this callback runs against tx.
    insertUser := func(ctx context.Context, tx pgx.Tx) error {
        // Bind the generated queries to the transaction.
        q := sqlc.New(tx)

        // Create user
        err := q.CreateUser(ctx, /* parameters */)
        if err != nil {
            return db.WrapDBError(ctx, err)
        }

        // Create related records
        // ...

        return nil
    }

    return store.WithTransaction(ctx, insertUser)
}
Adding Background Jobs
// ScheduleHelcimJob enqueues a Helcim customer job with the create action for
// the given customer and returns any error from the insert.
//
// The DB handle is named database rather than db: a parameter called db
// shadows the db package inside the function body, so db.HelcimCustomerArgs
// and db.ActionCustomerCreate would no longer resolve to the package.
func ScheduleHelcimJob(ctx context.Context, database db.DB, customer helcim.Customer) error {
    args := db.HelcimCustomerArgs{
        APIToken: "token",
        Action:   db.ActionCustomerCreate,
        Customer: customer,
    }

    // QueueClient exposes only Insert and InsertTx, so the job goes through
    // the generic Insert.
    // NOTE(review): assumes db.HelcimCustomerArgs satisfies river.JobArgs — confirm.
    _, err := database.Queue().Insert(ctx, args, nil)
    return err
}

Error Handling

The package provides domain-specific errors and helper functions:

// GetUser loads the user with the given id. A failed query is passed through
// db.WrapDBError, so callers can classify it with helpers such as
// db.IsNotFoundError.
func GetUser(ctx context.Context, queries *sqlc.Queries, id int) (*User, error) {
    record, err := queries.GetUser(ctx, id)
    if err != nil {
        // Convert the raw database error into a domain-specific error.
        return nil, db.WrapDBError(ctx, err)
    }

    return record, nil
}

// In the calling code (database is the db.DB returned by db.New).
// Queries is a Store method, so it is reached through the root store.
user, err := GetUser(ctx, database.RootStore().Queries(), 123)
if db.IsNotFoundError(err) {
    // Handle not found case
} else if err != nil {
    // Handle other errors
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotInitialized reports that the database connection has not been initialized yet.
	ErrNotInitialized = errors.New("database connection not yet initialized")
	// ErrNoTenantID reports that the context carries no tenant ID.
	ErrNoTenantID     = errors.New("no tenant ID in context")
)

Common errors that can be checked with errors.Is.

View Source
var (
	// ErrNotFound reports that the requested resource does not exist.
	ErrNotFound            = errors.New("resource not found")
	// ErrAlreadyExists reports that the resource already exists.
	ErrAlreadyExists       = errors.New("resource already exists")
	// ErrForeignKeyViolation reports an invalid reference to another resource.
	ErrForeignKeyViolation = errors.New("invalid reference to another resource")
	// ErrDatabaseError is the general database error.
	ErrDatabaseError       = errors.New("database error")
)

Domain-specific sentinel errors that clients can check with errors.Is.

View Source
var (
	// ErrQueueNotInitialized reports that the job queue has not been initialized.
	ErrQueueNotInitialized = errors.New("job queue not initialized")
	// ErrUnknownAction reports an action type that is not recognized.
	ErrUnknownAction       = errors.New("unknown action type")
)

Common queue-related errors.

Functions

func IsAlreadyExistsError

func IsAlreadyExistsError(err error) bool

IsAlreadyExistsError checks if the error is an "already exists" error.

func IsDatabaseError

func IsDatabaseError(err error) bool

IsDatabaseError checks if the error is a general database error.

func IsForeignKeyViolationError

func IsForeignKeyViolationError(err error) bool

IsForeignKeyViolationError checks if the error is a foreign key violation.

func IsNotFoundError

func IsNotFoundError(err error) bool

IsNotFoundError checks if the error is a "not found" error.

func WrapDBError

func WrapDBError(ctx context.Context, err error) error

WrapDBError converts database errors into domain-specific errors.

Types

type DB

type DB interface {
	// RootStore returns the root store.
	RootStore() RootStore
	// RiverStore returns the River queue store.
	RiverStore() RiverStore
	// Queue returns the queue client.
	Queue() QueueClient
	// Ping verifies that all database connections are working.
	Ping(ctx context.Context) error
	// Close closes all database connections.
	Close()
	// StartQueue initializes and starts the job queue using serverConf.
	StartQueue(ctx context.Context, serverConf *config.Server) error
	// StopQueue stops the job queue.
	StopQueue(ctx context.Context) error
}

DB provides access to all database stores.

func New

func New(ctx context.Context, conf *config.Database, logger *slog.Logger) (DB, error)

New creates a new DB instance.

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func NewDBLogger

func NewDBLogger(logger *slog.Logger) *Logger

func (*Logger) Log

func (l *Logger) Log(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{})

type Pool

// NOTE(review): the method set looks like a subset of pgxpool.Pool (NewQueue
// takes a *pgxpool.Pool) — confirm that is the intended implementation.
type Pool interface {
	// BeginTx starts a transaction with the given transaction options.
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
	// Exec runs a SQL statement with the given arguments and returns its command tag.
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	// Ping verifies the database connection is working.
	Ping(ctx context.Context) error
	// Close closes the pool.
	Close()
}

Pool represents a database connection pool with basic operations.

type PostgresDB

type PostgresDB struct {
	// contains filtered or unexported fields
}

PostgresDB implements the DB interface for PostgreSQL.

func (*PostgresDB) Close

func (db *PostgresDB) Close()

Close closes all database connections.

func (*PostgresDB) Ping

func (db *PostgresDB) Ping(ctx context.Context) error

Ping verifies all database connections are working.

func (*PostgresDB) Queue

func (db *PostgresDB) Queue() QueueClient

Queue returns the queue client.

func (*PostgresDB) RiverStore

func (db *PostgresDB) RiverStore() RiverStore

RiverStore returns the river queue store.

func (*PostgresDB) RootStore

func (db *PostgresDB) RootStore() RootStore

RootStore returns the root store.

func (*PostgresDB) StartQueue

func (db *PostgresDB) StartQueue(ctx context.Context, serverConf *config.Server) error

StartQueue initializes and starts the job queue.

func (*PostgresDB) StopQueue

func (db *PostgresDB) StopQueue(ctx context.Context) error

StopQueue stops the job queue.

type PostgresRiverStore

type PostgresRiverStore struct {
	// contains filtered or unexported fields
}

PostgresRiverStore implements the RiverStore interface.

func (*PostgresRiverStore) Close

func (s *PostgresRiverStore) Close()

Close closes the database connection.

func (*PostgresRiverStore) Ping

func (s *PostgresRiverStore) Ping(ctx context.Context) error

Ping verifies the database connection is working.

func (*PostgresRiverStore) Pool

func (s *PostgresRiverStore) Pool() Pool

Pool returns the underlying connection pool.

type PostgresRootStore

type PostgresRootStore struct {
	// Embedding PostgresStore promotes its methods (Queries, Pool,
	// WithTransaction, Ping, Close).
	PostgresStore
}

PostgresRootStore implements the RootStore interface.

type PostgresStore

type PostgresStore struct {
	// contains filtered or unexported fields
}

PostgresStore implements the Store interface with PostgreSQL.

func (*PostgresStore) Close

func (s *PostgresStore) Close()

Close closes the database connection.

func (*PostgresStore) Ping

func (s *PostgresStore) Ping(ctx context.Context) error

Ping verifies the database connection is working.

func (*PostgresStore) Pool

func (s *PostgresStore) Pool() Pool

Pool returns the underlying connection pool.

func (*PostgresStore) Queries

func (s *PostgresStore) Queries() *sqlc.Queries

Queries returns the SQLC queries.

func (*PostgresStore) WithTransaction

func (s *PostgresStore) WithTransaction(ctx context.Context, fn TransactionFunc) error

WithTransaction executes the given function within a transaction.

type QueueClient

type QueueClient interface {
	// Job management.

	// Insert adds a job to the queue.
	Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
	// InsertTx adds a job to the queue using the supplied transaction tx.
	InsertTx(
		ctx context.Context,
		tx pgx.Tx,
		args river.JobArgs,
		opts *river.InsertOpts,
	) (*rivertype.JobInsertResult, error)

	// Queue management.

	// Start starts the queue processor.
	Start(ctx context.Context) error
	// Stop stops the queue processor.
	Stop(ctx context.Context) error
}

QueueClient provides an interface for job queue operations.

type RiverQueue

type RiverQueue struct {
	// contains filtered or unexported fields
}

RiverQueue implements the QueueClient interface using River.

func NewQueue

func NewQueue(ctx context.Context, pool *pgxpool.Pool, conf *config.Server, logger *slog.Logger) (*RiverQueue, error)

NewQueue creates a new queue client.

func (*RiverQueue) Insert

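func (q *RiverQueue) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)

Insert adds a job to the queue.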

func (*RiverQueue) InsertTx

func (q *RiverQueue) InsertTx(
	ctx context.Context,
	tx pgx.Tx,
	args river.JobArgs,
	opts *river.InsertOpts,
) (*rivertype.JobInsertResult, error)

InsertTx adds a job to the queue as part of the given transaction.

func (*RiverQueue) Start

func (q *RiverQueue) Start(ctx context.Context) error

Start starts the queue processor.

func (*RiverQueue) Stop

func (q *RiverQueue) Stop(ctx context.Context) error

Stop stops the queue processor.

type RiverStore

type RiverStore interface {
	// Pool returns the underlying connection pool.
	Pool() Pool
	// Ping verifies the database connection is working.
	Ping(ctx context.Context) error
	// Close closes the database connection.
	Close()
}

RiverStore provides access to the River job queue database.

type RootStore

type RootStore interface {
	// Store is embedded, so RootStore has exactly the Store method set:
	// Queries, Pool, WithTransaction, Ping and Close.
	Store
}

RootStore provides root access to the database (bypassing tenant isolation).

type Store

type Store interface {
	// Queries returns the SQLC queries.
	Queries() *sqlc.Queries
	// Pool returns the underlying connection pool.
	Pool() Pool
	// WithTransaction executes fn within a transaction.
	WithTransaction(ctx context.Context, fn TransactionFunc) error
	// Ping verifies the database connection is working.
	Ping(ctx context.Context) error
	// Close closes the database connection.
	Close()
}

Store provides access to database operations with transaction support.

type TransactionFunc

// Values of this type are passed to Store.WithTransaction; tx is the
// transaction the function should run its statements on.
type TransactionFunc func(ctx context.Context, tx pgx.Tx) error

TransactionFunc is a function that executes within a transaction.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL