dblocker

package module
v0.0.0-...-b199f32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2024 License: MIT Imports: 10 Imported by: 0

README

dblocker

godoc

Golang database locker. A simple library to lock a shared database session for each "user" or "id" behind what is effectively a RWMutex.

Works with sqlite, postgres, and mysql by default. Other databases can be easily added by using a custom connectDBFunc.

The ReadGetDB and RWGetDB functions return a shared database/sql database. The ReadGetDBx and RWGetDBx functions return a shared sqlx databse. sqlx is a library which provides a set of extensions on go's standard database/sql library.

Why?

Allows:

  • simple access to sqlite without worrying about crashes due to concurrently reads and writes from multiple databse sessions.
  • a simple mechanism to ensure that only one "user" or "id" accesses the database at any time, as if access for that "user" or "id" is locked behind a RWMutex.
  • database access such as multiple cuncurrent database select requests without also requiring the use of pgbouncer for postgres or similar session access caching tools.
  • multiple sql commands (and other go code) to be run for a "user" or "id", while not worrying about concurrent access for that "user" or "id", and without needing to run all of the database commands in one database transaction.

If you use a custom connectDBFunc, you can also implement simple database sharding based on the "user" or "id" that you provide.

Example

import (
    "fmt"
    "strconv"

    "github.com/calmdocs/dblocker"
    "github.com/google/uuid"
)

type File struct {
	ID      int64   `db:"id"`
    	UserID  int64   `db:"user_id"`
	Name    string  `db:"name"`
}

func main() {
    debug := false
    driverName := sqlite3
    dataSourceName := "/path/to/sql.db"
    userID := "123"
    oldFileName := "file 27"
    newFileName := "newFile.txt"

    ctx, cancel := context.WithCancel(context.Backgound())
    defer cancel()

    // Create a dbStore
    // with a default unlockTimeout for waiting for access to the database of 2 minutes, and
    // with a default statemenTimeout for database sessions of 4 minutes (where the database supports statement timeouts).
    dbStore, err := dblocker.New(ctx, driverName, dataSourceName, debug)
    if err != nil {
        panic(err)
    }

    // Create database table using RWGetDBx and insert 50 rows.
    err = createTableAndInsertRows(ctx, dbStore, userID)
    if err != nil {
        panic(err)
    }

    // Allow user 123 to get a list of files from the database using ReadGetDBx.
    // Concurrent read access to the database for user 123 is permitted.
    files, err := getFiles(ctx, dbStore, userID)
    if err != nil {
        panic(err)
    }
    fmt.Println(files)
    
    // Allow user 123 to update a database entry using RWGetDBx.
    // No concurrent access to the database for that user is permitted.
    err = updateFileName(ctx, userID, oldFileName, newFileName)
    if err != nil {
        panic(err)
    }

    // Run 20 concurrent goroutines
    // Using new database connections instead of dblocker here would break
    // sqlite due to concurrent reads and writes and
    // postgres due to too many concurrent database connections
    for i := 1; i <= 20; i++ {
        go func() {
            files, err := getFiles(ctx, dbStore, userID)
            if err != nil {
                panic(err)
            }
            fmt.Println(files)

            err = updateFileName(ctx, userID, oldFileName, newFileName)
            if err != nil {
                panic(err)
            }
        }
    }
}

func createTableAndInsertRows(ctx context.Context, dbStore *dblocker.Store, userID string) (err error) {

    // Get exclusive access the shared database session
    cancelDB, db, err := dbStore.RWGetDBx(userID, ctx, "create database")
    if err != nil {
        return err
    }
    defer cancelDB()

    // Create table if it does not exist
    _, err = db.ExecContext(ctx, "CREATE TABLE IF NOT EXISTS files (id TEXT, user_id TEXT, file_id TEXT, name TEXT);")
    if err != nil {
        return err
    }

    // Insert 50 rows into the database
    for i := 1; i <= 50; i++ {

	// Create new fid using uuidv7
	newUUID, err := uuid.NewV7()
	if err != nil {
		return err
	}
	// Insert
        _, err = db.ExecContext(
            ctx,
            db.Rebind("insert into files(id, user_id, name) values(?, ?, ?)"),
            newUUID.String(),
            userID,
            fmt.Sprintf("file %d", i),
        )
    }

    return err
}

func getFiles(ctx context.Context, dbStore *dblocker.Store, userID string) (files []File, err error) {

    // Multiple ReadGetDBx calls can access the shared database session concurrently
    cancelDB, db, err := dbStore.ReadGetDBx(userID, ctx, "get files")
    if err != nil {
        return err
    }
    defer cancelDB()

    err = db.SelectContext(
        ctx,
        &files,
        db.Rebind("SELECT * from files WHERE user_id = ?"),
        userID,
    )
    if err != nil {
        return nil, err
    }
    return files, nil
}

func updateFileName(ctx context.Context, userID string, oldFileName string, newFileName string) (err error) {
    
    // Get exclusive access the shared database session
    cancelDB, db, err := dbStore.RWGetDBx(userID, ctx, "update file name")
    if err != nil {
        return err
    }
    defer cancelDB()

    _, err = db.ExecContext(
        ctx,
        db.Rebind("update files set name = ? WHERE user_id = ? AND name = ?"),
        newFileName,
        userID,
        oldFileName,
    )
    return err
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConnectDBFunc

func DefaultConnectDBFunc(ctx context.Context, id interface{}, driverName, dataSourceName string, statementTimeout *time.Duration) (db *sqlx.DB, err error)

DefaultConnectDBFunc is the default function used to connecct to the database This default function has an unused id variable. This function could be customised, for example, to send requests to different database shards based on the provided id.

Types

type Group

type Group struct {
	DB *sqlx.DB
	// contains filtered or unexported fields
}

Group is a group storing the shared database for an id

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request is a database access request

type Store

type Store struct {
	sync.Mutex

	Ctx context.Context

	DriverName       string
	DataSourceName   string
	UnlockTimeout    *time.Duration
	StatementTimeout *time.Duration
	// contains filtered or unexported fields
}

Store is the dblocker store

func New

func New(
	ctx context.Context,
	driverName string,
	dataSourceName string,
	debug bool,
) (s *Store, err error)

New creates a new dblocker Store using the default connectDBFunc; with a default unlockTimeout for waiting for access to the database of 2 minutes, and with a default statemenTimeout for database sessions of 4 minutes (where the database supports statement timeouts)

func NewWithConnectDBFuncAndTimeouts

func NewWithConnectDBFuncAndTimeouts(
	ctx context.Context,
	connectDBFunc func(ctx context.Context, id interface{}, driverName, dataSourceName string, statementTimeout *time.Duration) (db *sqlx.DB, err error),
	driverName string,
	dataSourceName string,
	unlockTimeout *time.Duration,
	statementTimeout *time.Duration,
	debug bool,
) (s *Store, err error)

NewWithConnectDBFuncAndTimeouts creates a new dblocker Store with a custom connectDBFunc (which can be used for database types not in the DefaultConnectDBFunc (i.e. sqlite, postgres, and mysql) and/or to shard requests by id for example); with an unlockTimeout for waiting for access to the database; and with a statemenTimeout for database sessions (returns an error if not nil and the database does not support statement timeouts).

func NewWithUnlockAndStatementTimeouts

func NewWithUnlockAndStatementTimeouts(
	ctx context.Context,
	driverName string,
	dataSourceName string,
	unlockTimeout *time.Duration,
	statementTimeout *time.Duration,
	debug bool,
) (s *Store, err error)

NewWithUnlockAndStatementTimeouts creates a new dblocker Store using the default connectDBFunc; with an unlockTimeout for waiting for access to the database; and with a statemenTimeout for database sessions (returns an error if not nil and the database does not support statement timeouts).

func (*Store) RWGetDB

func (s *Store) RWGetDB(id interface{}, ctx context.Context, tag string) (cancel context.CancelFunc, db *sql.DB, err error)

RWGetDB returns a shared copy of a database session (*sql.DB) for the specified id. RWGetDB acts like Lock() for a RWMutex for the specified id. All other RWGetDB, RWGetDBWithTimeout, and ReadDB function calls will wait for access to the database for the specified id until the returned cancel() function is called.

func (*Store) RWGetDBWithTimeout

func (s *Store) RWGetDBWithTimeout(id interface{}, ctx context.Context, tag string, statementTimeout *time.Duration) (cancel context.CancelFunc, db *sql.DB, err error)

RWGetDBWithTimeout returns a new database session (*sql.DB) for the specified id with a custom session timeout. RWGetDBWithTimeout acts like Lock() for a RWMutex for the specified id. All other RWGetDB, RWGetDBWithTimeout, and ReadDB function calls will wait for access to the database for the specified id until the returned cancel() function is called.

func (*Store) RWGetDBx

func (s *Store) RWGetDBx(id interface{}, ctx context.Context, tag string) (cancel context.CancelFunc, db *sqlx.DB, err error)

RWGetDB returns a shared copy of a database session (*sqlx.DB) for the specified id. github.com/jmoiron/sqlx is a library which provides a set of extensions on go's standard database/sql library. RWGetDB acts like Lock() for a RWMutex for the specified id. All other RWGetDB, RWGetDBWithTimeout, and ReadDB function calls will wait for access to the database for the specified id until the returned cancel() function is called.

func (*Store) RWGetDBxWithTimeout

func (s *Store) RWGetDBxWithTimeout(id interface{}, ctx context.Context, tag string, statementTimeout *time.Duration) (cancel context.CancelFunc, db *sqlx.DB, err error)

RWGetDBWithTimeout returns a new database session (*sqlx.DB) for the specified id with a custom session timeout. github.com/jmoiron/sqlx is a library which provides a set of extensions on go's standard database/sql library. RWGetDBWithTimeout acts like Lock() for a RWMutex for the specified id. All other RWGetDB, RWGetDBWithTimeout, and ReadDB function calls will wait for access to the database for the specified id until the returned cancel() function is called.

func (*Store) ReadGetDB

func (s *Store) ReadGetDB(id interface{}, ctx context.Context, tag string) (cancel context.CancelFunc, db *sql.DB, err error)

ReadDB returns a shared copy of a database session (*sql.DB) for the specified id. ReadDB acts like RLock() for a RWMutex for the specified id. Multiple ReadDB function calls can access the shared database at the same time. All RWGetDB and RWGetDBWithTimeout function calls will wait for access to the database for the specified id until the returned cancel() function is called.

func (*Store) ReadGetDBx

func (s *Store) ReadGetDBx(id interface{}, ctx context.Context, tag string) (cancel context.CancelFunc, db *sqlx.DB, err error)

ReadDB returns a shared copy of a database session (*sqlx.DB) for the specified id. github.com/jmoiron/sqlx is a library which provides a set of extensions on go's standard database/sql library. ReadDB acts like RLock() for a RWMutex for the specified id. Multiple ReadDB function calls can access the shared database at the same time. All RWGetDB and RWGetDBWithTimeout function calls will wait for access to the database for the specified id until the returned cancel() function is called.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL