multidb

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2020 License: BSD-3-Clause Imports: 9 Imported by: 7

README

codecov GoDoc Go Report Card

Multidb

Package multidb provides a sql.DB multiplexer for parallel queries using Go routines. It is meant as a top-level library which connects to a number of database Nodes. Nodes' health conditions are monitored by inspecting returning errors. After a (settable) threshold or errors has passed, a Node is disconnected and considered unavailable for subsequent requests. Failed nodes can be reconnected automatically.

Multidb automatically polls which of the connected Nodes is a master. If the master fails, multidb will try to find a new master, which might be found after promotion took place on a slave or the old master gets reconnected. Actual management of master and slaves (such as promotion) is considered outside the scope of this package.

The Node and MultiNode types aim to be interface compatible with sql.DB and sql.Tx. More specifically, multidb fully implements SQLBoiler's boil.Executor and boil.ContextExecutor interface types. This makes it an excellent fit for SQLBoiler's auto-generated models. (And perhaps other ORMs?)

Maturity

Beta stage:

  • The intended design is fully implemented and unit tested with the race detector.
  • We are using this library in other projects and it is being tested for production

Dependencies

This package has been developed against Go 1.13, with module support and might not work properly on older Go versions. The core package is slim and only depends on the standard Go libraries. Packages in drivers/ usually depend on their SQL driver counterpart. Unit tests pull in some additional packages like go-sqlmock and sqlboiler.

Installation

go get -u github.com/moapis/multidb

Full test suite can be run with:

go test ./...

Documentation and examples

Godoc.org

Copyright (c) 2019, Mohlmann Solutions SRL. All rights reserved. Use of this source code is governed by a License that can be found in the LICENSE file.

Documentation

Overview

Package multidb provides a sql.DB multiplexer for parallel queries using Go routines. It is meant as a top-level library which connects to a number of database Nodes. Nodes' health conditions are monitored by inspecting returning errors. After a (settable) threshold or errors has passed, a Node is disconnected and considered unavailable for subsequent requests. Failed nodes can be reconnected automatically.

Multidb automatically polls which of the connected Nodes is a master. If the master fails, multidb will try to find a new master, which might be found after promotion took place on a slave or the old master gets reconnected. Actual management of master and slaves (such as promotion) is considered outside the scope of this package.

The Node and MultiNode types aim to be interface compatible with sql.DB and sql.Tx. More specifically, multidb fully implements SQLBoiler's boil.Executor and boil.ContextExecutor interface types. This makes it an excellent fit for SQLBoiler. (And perhaps other ORMs?)

Example
c := Config{
	DBConf: postgresql.Config{
		Nodes: []postgresql.Node{
			{
				Host: "db1.example.com",
				Port: 5432,
			},
			{
				Host: "db2.example.com",
				Port: 5432,
			},
			{
				Host: "db3.example.com",
				Port: 5432,
			},
		},
		Params: postgresql.Params{
			DBname:          "multidb",
			User:            "postgres",
			Password:        "",
			SSLmode:         postgresql.SSLDisable,
			Connect_timeout: 30,
		},
	},
	StatsLen:      100,
	MaxFails:      10,
	ReconnectWait: 10 * time.Second,
}
// Connect to all specified DB Hosts
mdb, err := c.Open()
if err != nil {
	log.Fatal(err)
}
defer mdb.Close()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

// Open a TX for insertion on the master.
// Master assertion is done in the background on first access.
tx, err := mdb.MasterTx(ctx, nil)
if err != nil {
	log.Fatal(err)
}
defer tx.Rollback()

// Do stuff inside the transaction
if _, err = tx.ExecContext(ctx, "CREATE TABLE content ( id INTEGER PRIMARY KEY );"); err != nil {
	log.Fatal(err)
}
if _, err = tx.ExecContext(ctx, "INSERT INTO content (id) VALUES ($1);", 999); err != nil {
	log.Fatal(err)
}
if _, err = tx.ExecContext(ctx, "INSERT INTO content (id) VALUES ($1);", 101); err != nil {
	log.Fatal(err)
}
if err = tx.Commit(); err != nil {
	log.Fatal(err)
}

// Acquire 3 nodes for select operation
mn, err := mdb.MultiNode(3)
if err != nil {
	log.Fatal(err)
}
rows, err := mn.QueryContext(ctx, "SELECT id FROM content WHERE id = $1", 999)
if err != nil {
	log.Fatal(err)
}
for rows.Next() {
	var i int
	if err = rows.Scan(&i); err != nil {
		log.Fatal(err)
	}
	fmt.Println(i)
}

// Acquire the master node without Tx
master, err := mdb.Master(ctx)
if err != nil {
	log.Fatal(err)
}

// Exec without context
if _, err = master.Exec("DROP TABLE content"); err != nil {
	log.Fatal(err)
}

Index

Examples

Constants

View Source
const (
	// ErrNoNodes is returned when there are no connected nodes available for the requested operation
	ErrNoNodes = "No available nodes"
	// ErrNoMaster is returned when no master is available
	ErrNoMaster = "No available master, cause: %w"
	// ErrSuccesReq is returned when higher than 1.0
	ErrSuccesReq = "SuccesReq > 1"
)
View Source
const (
	// ErrAlreadyOpen is returned when Opening on an already open Node
	ErrAlreadyOpen = "Node already open"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	DBConf drivers.Configurator `json:"dbconf,omitempty"`

	// Amount of past connections to consider when establishing the failure rate.
	StatsLen int `json:"statslen,omitempty"`
	// Amount of allowed counted failures, after which the DB connector will be closed.
	// Note that Go's SQL connectors are actually connection pools.
	// Individual connections are already reset upon connection errors by the sql library.
	// This library closes the complete pool for a single node.
	// 0 disconnects on the first error. (Probably not what you want)
	// A value >= StatsLen means 100% failure rate allowed.
	// Negative values disables autoclosing statistics / counting.
	MaxFails int `json:"maxfails"`
	// Time to wait before attempting to reconnect failed nodes.
	// Attempts will be done indefinitely.
	// Set to 0 to disable reconnects.
	ReconnectWait time.Duration `json:"reconnectwait"`
}

Config configures multiple databas servers

func (Config) Open

func (c Config) Open() (*MultiDB, error)

Open all the configured DB hosts. Poll Node.ConnErr() to inspect for connection failures. Only returns an error if amount of configured nodes == 0.

If ReconnectWait is set, failing Nodes will enter into a reconnection sequence and may become available after some time.

type MultiDB

type MultiDB struct {
	// contains filtered or unexported fields
}

MultiDB holds the multiple DB objects, capable of Writing and Reading.

func (*MultiDB) All

func (mdb *MultiDB) All() []*Node

All returns all Nodes, regardless of their state.

func (*MultiDB) Close

func (mdb *MultiDB) Close() error

Close the DB connectors on all nodes.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

func (*MultiDB) Master

func (mdb *MultiDB) Master(ctx context.Context) (*Node, error)

Master node getter

func (*MultiDB) MasterTx

func (mdb *MultiDB) MasterTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)

MasterTx returns the master node with an opened transaction

func (*MultiDB) MultiNode

func (mdb *MultiDB) MultiNode(max int) (MultiNode, error)

MultiNode returns available *Nodes. Nodes are sorted by the division of InUse/MaxOpenConnections. Up to `max` amount of nodes will be in the returned object. If `max` is set to 0, all available nodes are returned. An error is returned in case no nodes are available.

The nodes may be master or slaves and should only be used for read operations.

func (*MultiDB) MultiTx added in v0.1.1

func (mdb *MultiDB) MultiTx(ctx context.Context, opts *sql.TxOptions, max int) (*MultiTx, error)

MultiTx returns a MultiNode with an open transaction

func (*MultiDB) Node

func (mdb *MultiDB) Node() (*Node, error)

Node returns any ready Mode with the lowest value after division of InUse/MaxOpenConnections.

The returned node may be master or slave and should only be used for read operations.

func (*MultiDB) NodeTx

func (mdb *MultiDB) NodeTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)

NodeTx returns any node with an opened transaction. The transaction is created in ReadOnly mode.

type MultiError

type MultiError struct {
	Errors []error
}

MultiError is a collection of errors which can arise from parallel query execution.

func (MultiError) Error

func (me MultiError) Error() string

type MultiNode

type MultiNode []*Node

MultiNode holds a slice of Nodes. All methods on this type run their sql.DB variant in one Go routine per Node.

func (MultiNode) Begin

func (mn MultiNode) Begin() (*MultiTx, error)

Begin runs BeginTx with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included for consistency.

func (MultiNode) BeginTx

func (mn MultiNode) BeginTx(ctx context.Context, opts *sql.TxOptions) (*MultiTx, error)

BeginTx runs sql.DB.BeginTx on the Nodes in separate Go routines. The transactions are created in ReadOnly mode. It waits for all the calls to return or the context to expire. If you have enough nodes available, you might want to set short timeout values on the context to fail fast on non-responding database hosts.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method can return both a valid Tx and an error value, in case any (but not all) node calls fails. Tx will carry fewer amount of entries than requested. This breaks the common `if err != nil` convention, but we want to leave the descission whetter to proceed or not, up to the caller.

func (MultiNode) Exec

func (mn MultiNode) Exec(query string, args ...interface{}) (sql.Result, error)

Exec runs ExecContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (MultiNode) ExecContext

func (mn MultiNode) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

ExecContext runs sql.DB.ExecContext on the Nodes in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

It does not make much sense to run this method against multiple Nodes, as they are usually slaves. This method is primarily included to implement boil.ContextExecutor.

func (MultiNode) Query

func (mn MultiNode) Query(query string, args ...interface{}) (*sql.Rows, error)

Query runs QueryContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (MultiNode) QueryContext

func (mn MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

QueryContext runs sql.DB.QueryContext on the Nodes in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Implements boil.ContextExecutor.

func (MultiNode) QueryRow

func (mn MultiNode) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow runs QueryRowContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (MultiNode) QueryRowContext

func (mn MultiNode) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext runs sql.DB.QueryRowContext on the Nodes in separate Go routines. The first result is returned immediately, regardless if that result has an error.

Errors in sql.DB.QueryRow are deferred until scan and therefore opaque to this package. If you have a choice, stick with a regular QueryContext. This method is primarily included to implement boil.Executor.

type MultiTx

type MultiTx struct {
	// contains filtered or unexported fields
}

MultiTx holds a slice of open transactions to multiple nodes. All methods on this type run their sql.Tx variant in one Go routine per Node.

func (*MultiTx) Commit

func (m *MultiTx) Commit() error

Commit runs sql.Tx.Commit on the transactions in separate Go routines. It waits for all the calls to return.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method returns an error even if some commits where executed successfully. It is up to the caller to decide what to do with those errors. Typically MultiTx calls should only be run against a set if slave databases. herefore it does not make much sense to Commit. If however, you did run this against multiple hosts and some of them failed, you'll now have to deal with an inconsistent dataset.

This method is primarily included to implement boil.Transactor

func (*MultiTx) Exec

func (m *MultiTx) Exec(query string, args ...interface{}) (sql.Result, error)

Exec runs ExecContext with context.Background(). It is highly recommended to stick with the contexted variant in paralell executions. This method is primarily included to implement boil.Executor.

func (*MultiTx) ExecContext

func (m *MultiTx) ExecContext(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error)

ExecContext runs sql.Tx.ExecContext on the transactions in separate Go routines. The first non-error result is returned immediately and errors from the other transactions will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

It does not make much sense to run this method against multiple Nodes, as they are ussualy slaves. This method is primarily included to implement boil.ContextExecutor.

func (*MultiTx) Query

func (m *MultiTx) Query(query string, args ...interface{}) (*sql.Rows, error)

Query runs QueryContext with context.Background(). It is highly recommended to stick with the contexted variant in parallel executions. This method is primarily included to implement boil.Executor.

func (*MultiTx) QueryContext

func (m *MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error)

QueryContext runs sql.Tx.QueryContext on the tranactions in separate Go routines. The first non-error result is returned immediately and errors from the other Nodes will be ignored.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Implements boil.ContextExecutor.

func (*MultiTx) QueryRow

func (m *MultiTx) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow wrapper around sql.DB.QueryRow. Implements boil.Executor. Since errors are deferred until row.Scan, this package cannot monitor such errors.

func (*MultiTx) QueryRowContext

func (m *MultiTx) QueryRowContext(ctx context.Context, query string, args ...interface{}) (row *sql.Row)

QueryRowContext runs sql.Tx.QueryRowContext on the tranactions in separate Go routines. The first result is returned immediately, regardless if that result has an error.

Errors in sql.Tx.QueryRow are deferred until scan and therefore opaque to this package. If you have a choice, stick with a regular QueryContext. This method is primarily included to implement boil.Executor.

func (*MultiTx) Rollback

func (m *MultiTx) Rollback() error

Rollback runs sql.Tx.Rollback on the transactions in separate Go routines. It waits for all the calls to return.

If all nodes respond with the same error, that exact error is returned as-is. If there is a variety of errors, they will be embedded in a MultiError return.

Note: this method returns an error even if some rollbacks where executed successfully. It is up to the caller to decide what to do with those errors. Typically MultiTx calls should only be run against a set of slave databases. In such cases Rollback is only used in a defer to tell the hosts that we are done and errors can safely be ignored.

Implements boil.Transactor

type Node

type Node struct {
	drivers.Configurator

	// DB holds the raw *sql.DB for direct access.
	// Errors produced by calling DB directly are not monitored by this package.
	DB *sql.DB
	// contains filtered or unexported fields
}

Node represents a database server connection

func (*Node) Begin

func (n *Node) Begin() (*Tx, error)

Begin opens a new *sql.Tx inside a Tx. Does NOT implement boil.Beginner, as it requires a *sql.Tx.

func (*Node) BeginTx

func (n *Node) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)

BeginTx opens a new *sql.Tx inside a Tx.

func (*Node) CheckErr

func (n *Node) CheckErr(err error) error

CheckErr updates the statistics. If the error is nil or whitelisted, success is recorded. Any other case constitutes an error and failure is recorded. If a the configured failure treshhold is reached, this node will we disconnected.

This method is already called by each database call method and need to be used in most cases. It is exported for use in extending libraries which need use struct embedding and want to overload Node methods, while still keeping statistics up-to-date.

func (*Node) Close

func (n *Node) Close() error

Close the current node and make it unavailable

func (*Node) ConnErr

func (n *Node) ConnErr() error

ConnErr returns the last encountered connection error for Node

func (*Node) Exec

func (n *Node) Exec(query string, args ...interface{}) (sql.Result, error)

Exec wrapper around sql.DB.Exec. Implements boil.Executor

func (*Node) ExecContext

func (n *Node) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

ExecContext wrapper around sql.DB.Exec. Implements boil.ContextExecutor

func (*Node) InUse

func (n *Node) InUse() int

InUse get the InUse counter from db.Stats. Returns -1 in case db is unavailable.

func (*Node) Open

func (n *Node) Open() error

Open calls sql.Open() with the configured driverName and dataSourceName. Open should only be used after the Node was (auto-)closed and reconnection is disabled.

func (*Node) Query

func (n *Node) Query(query string, args ...interface{}) (*sql.Rows, error)

Query wrapper around sql.DB.Query. Implements boil.Executor

func (*Node) QueryContext

func (n *Node) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

QueryContext wrapper around sql.DB.Query. Implements boil.ContextExecutor

func (*Node) QueryRow

func (n *Node) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow wrapper around sql.DB.QueryRow. Implements boil.Executor Since errors are deferred until row.Scan, this package cannot monitor such errors.

func (*Node) QueryRowContext

func (n *Node) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext wrapper around sql.DB.QueryRow. Implements boil.ContextExecutor Since errors are deferred until row.Scan, this package cannot monitor such errors.

func (*Node) Reconnecting

func (n *Node) Reconnecting() bool

Reconnecting returns true while reconnecting is in progress

type Tx

type Tx struct {
	*Node
	// contains filtered or unexported fields
}

Tx is a transaction on a node

func (*Tx) Commit

func (x *Tx) Commit() error

Commit is a wrapper around sql.Tx.Commit. Implements boil.Transactor and boil.ContextTransactor

func (*Tx) Exec

func (x *Tx) Exec(query string, args ...interface{}) (sql.Result, error)

Exec is a wrapper around sql.Tx.Exec. Implements boil.Executor

func (*Tx) ExecContext

func (x *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

ExecContext is a wrapper around sql.Tx.Exec. Implements boil.ContextExecutor

func (*Tx) Query

func (x *Tx) Query(query string, args ...interface{}) (*sql.Rows, error)

Query is a wrapper around sql.Tx.Query. Implements boil.Executor

func (*Tx) QueryContext

func (x *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

QueryContext is a wrapper around sql.Tx.Query. Implements boil.ContextExecutor

func (*Tx) QueryRow

func (x *Tx) QueryRow(query string, args ...interface{}) *sql.Row

QueryRow is a wrapper around sql.Tx.QueryRow. Implements boil.Executor Since errors are defered untill row.Scan, this package cannot monitor such errors.

func (*Tx) QueryRowContext

func (x *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row

QueryRowContext is a wrapper around sql.Tx.QueryRow. Implements boil.ContextExecutor Since errors are defered untill row.Scan, this package cannot monitor such errors.

func (*Tx) Rollback

func (x *Tx) Rollback() error

Rollback is a wrapper around sql.Tx.Rollback. Implements boil.Transactor and boil.ContextTransactor

Notes

Bugs

Directories

Path Synopsis
integration
sqlboiler module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL