Documentation
Index ¶
- Constants
- func BuildInsertStmtWithout(db *DB, into interface{}, withoutColumns ...string) string
- func CantPerformQuery(err error, q string) error
- func InsertObtainID(ctx context.Context, conn TxOrDB, stmt string, arg any) (int64, error)
- func MysqlSplitStatements(statements string) []string
- func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T]
- func TableName(t interface{}) string
- type ColumnMap
- type Config
- type DB
- func (db *DB) BatchSizeByPlaceholders(n int) int
- func (db *DB) BuildColumns(subject interface{}) []string
- func (db *DB) BuildDeleteStmt(from interface{}) string
- func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int)
- func (db *DB) BuildInsertStmt(into interface{}) (string, int)
- func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string
- func (db *DB) BuildUpdateStmt(update interface{}) (string, int)
- func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int)
- func (db *DB) BuildWhere(subject interface{}) (string, int)
- func (db *DB) BulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, ...) error
- func (db *DB) CreateIgnoreStreamed(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error
- func (db *DB) CreateStreamed(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error
- func (db *DB) Delete(ctx context.Context, entityType Entity, ids []interface{}, ...) error
- func (db *DB) DeleteStreamed(ctx context.Context, entityType Entity, ids <-chan interface{}, ...) error
- func (db *DB) ExecTx(ctx context.Context, fn func(context.Context, *sqlx.Tx) error) error
- func (db *DB) GetAddr() string
- func (db *DB) GetDefaultRetrySettings() retry.Settings
- func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted
- func (db *DB) HasTable(ctx context.Context, table string) (bool, error)
- func (db *DB) Log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper
- func (db *DB) MarshalLogObject(encoder zapcore.ObjectEncoder) error
- func (db *DB) NamedBulkExec(ctx context.Context, query string, count int, sem *semaphore.Weighted, ...) error
- func (db *DB) NamedBulkExecTx(ctx context.Context, query string, count int, sem *semaphore.Weighted, ...) error
- func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
- func (db *DB) UpsertStreamed(ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity]) error
- func (db *DB) YieldAll(ctx context.Context, factoryFunc EntityFactoryFunc, query string, ...) (<-chan Entity, <-chan error)
- type Entity
- type EntityFactoryFunc
- type Fingerprinter
- type ID
- type IDer
- type MysqlFuncLogger
- type OnInitConnFunc
- type OnSuccess
- type Options
- type PgsqlOnConflictConstrainter
- type RetryConnector
- type RetryConnectorCallbacks
- type Scoper
- type TableNamer
- type TxOrDB
- type Upserter
Constants ¶
const (
	MySQL      string = "mysql"
	PostgreSQL string = "postgres"
)
Driver names, as the drivers register themselves automatically in the database/sql package.
Variables ¶
This section is empty.
Functions ¶
func BuildInsertStmtWithout ¶ added in v0.7.0
BuildInsertStmtWithout builds an insert stmt without the provided columns.
func CantPerformQuery ¶
CantPerformQuery wraps the given error with the specified query that cannot be executed.
func InsertObtainID ¶ added in v0.7.0
InsertObtainID executes the given query and fetches the last inserted ID.
Using this method for database tables that don't define an auto-incrementing ID, or none at all, will not work. The only supported column that can be retrieved with this method is id.
This function expects TxOrDB as an executor of the provided query, and is usually a *sqlx.Tx or *DB instance.
Returns the retrieved ID on success and error on any database inserting/retrieving failure.
func MysqlSplitStatements ¶ added in v0.6.0
MysqlSplitStatements takes a string containing multiple SQL statements and splits them into individual statements with limited support for the DELIMITER keyword like implemented by the mysql command line client.
The main purpose of this function is to allow importing a schema file containing stored functions from Go. Such files have to specify an alternative delimiter internally if the function has semicolons in its body, otherwise the mysql command line client splits the CREATE FUNCTION statement somewhere in the middle. This delimiter handling is not supported by the MySQL server, so when trying to import such a schema file using a different method than the mysql command line client, the delimiter handling has to be reimplemented. This is what this function does.
To avoid an overly complex implementation, this function has some limitations on its input:
- Specifying a delimiter using a quoted string is NOT supported.
- Statements are only split if the delimiter appears at the end of a line. This is done in order to avoid accidentally splitting in the middle of string literals and comments.
- The function does not attempt to handle comments in any way, so there must not be a delimiter at the end of a line within a comment.
- The delimiter command is only recognized at the beginning of the file or immediately following a delimiter at the end of a previous line. There must not be a comment in between; empty lines are fine.
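The rules above can be illustrated with a simplified sketch. The function `splitStatements` is hypothetical and is not the library's implementation; it only assumes a `DELIMITER <token>` command on its own line and splits where the active delimiter ends a line.

```go
package main

import (
	"fmt"
	"strings"
)

// splitStatements is a hypothetical, simplified sketch of DELIMITER-aware
// splitting. It is not the implementation of MysqlSplitStatements.
func splitStatements(input string) []string {
	delimiter := ";"
	var result []string
	var current []string // lines of the statement collected so far

	for _, line := range strings.Split(input, "\n") {
		trimmed := strings.TrimSpace(line)

		// Between statements: skip empty lines and honor DELIMITER commands.
		if len(current) == 0 {
			if trimmed == "" {
				continue
			}
			fields := strings.Fields(trimmed)
			if len(fields) == 2 && strings.EqualFold(fields[0], "DELIMITER") {
				delimiter = fields[1]
				continue
			}
		}

		// A statement only ends if the delimiter is the last thing on the line.
		if strings.HasSuffix(trimmed, delimiter) {
			lastLine := strings.TrimRight(line, " \t\r")
			current = append(current, strings.TrimSuffix(lastLine, delimiter))
			if stmt := strings.TrimSpace(strings.Join(current, "\n")); stmt != "" {
				result = append(result, stmt)
			}
			current = nil
			continue
		}

		current = append(current, line)
	}

	// Keep a trailing statement that lacks a final delimiter.
	if stmt := strings.TrimSpace(strings.Join(current, "\n")); stmt != "" {
		result = append(result, stmt)
	}
	return result
}

func main() {
	schema := "DELIMITER //\n" +
		"CREATE FUNCTION f() RETURNS INT\n" +
		"BEGIN\n" +
		"  RETURN 1;\n" +
		"END//\n" +
		"DELIMITER ;\n" +
		"SELECT 1;\n"

	for i, stmt := range splitStatements(schema) {
		fmt.Printf("statement %d:\n%s\n\n", i+1, stmt)
	}
}
```

In this sketch the semicolon inside the function body does not cause a split, because `//` is the active delimiter at that point.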
func SplitOnDupId ¶
func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T]
SplitOnDupId returns a state machine which tracks the inputs' IDs. Once an already seen input arrives, it demands splitting.
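A minimal sketch of such a state machine follows. The names `splitPolicy` and `newSplitOnDupID` are hypothetical, plain strings stand in for IDer values, and resetting the seen set after a split is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// splitPolicy reports whether a new chunk must be started before the given ID
// is added. It is a hypothetical stand-in for com.BulkChunkSplitPolicy.
type splitPolicy func(id string) bool

// newSplitOnDupID returns a policy that remembers the IDs it has seen and
// demands a split as soon as one of them shows up again.
func newSplitOnDupID() splitPolicy {
	seen := make(map[string]struct{})

	return func(id string) bool {
		if _, dup := seen[id]; dup {
			// Assumption: after a split, the new chunk starts with only this ID.
			seen = map[string]struct{}{id: {}}
			return true
		}

		seen[id] = struct{}{}
		return false
	}
}

func main() {
	policy := newSplitOnDupID()
	for _, id := range []string{"a", "b", "a", "c", "b"} {
		fmt.Println(id, policy(id))
	}
	// Prints:
	// a false
	// b false
	// a true
	// c false
	// b false
}
```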
Types ¶
type ColumnMap ¶
type ColumnMap interface {
	// Columns returns database column names for a struct's exported fields in a cached manner.
	// Thus, the returned slice MUST NOT be modified directly.
	// By default, all exported struct fields are mapped to database column names using snake case notation.
	// The - (hyphen) directive for the db tag can be used to exclude certain fields.
	Columns(any) []string
}
ColumnMap provides a cached mapping of a struct's exported fields to their database column names.
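The mapping described above can be sketched with the standard library. Everything in this example is hypothetical (`columns`, `snakeCase`, the `host` type); the snake case conversion is deliberately simple, only the `-` tag directive is handled, and the argument is assumed to be a struct or a pointer to one.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"sync"
	"unicode"
)

// cache maps a reflect.Type to its column names, so each struct type is
// inspected only once. Callers must not modify the returned slice.
var cache sync.Map

// snakeCase is a simplified conversion: it inserts an underscore before an
// upper-case rune that follows a non-upper-case rune.
func snakeCase(s string) string {
	var b strings.Builder
	runes := []rune(s)
	for i, r := range runes {
		if unicode.IsUpper(r) && i > 0 && !unicode.IsUpper(runes[i-1]) {
			b.WriteByte('_')
		}
		b.WriteRune(unicode.ToLower(r))
	}
	return b.String()
}

// columns returns the column names for the exported fields of v,
// skipping fields tagged with db:"-".
func columns(v any) []string {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	if cached, ok := cache.Load(t); ok {
		return cached.([]string)
	}

	var cols []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() || f.Tag.Get("db") == "-" {
			continue
		}
		cols = append(cols, snakeCase(f.Name))
	}

	cache.Store(t, cols)
	return cols
}

// host is a hypothetical entity used for illustration only.
type host struct {
	ID          int
	DisplayName string
	Password    string `db:"-"`
	notes       string
}

func main() {
	fmt.Println(columns(host{})) // [id display_name]
}
```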
func NewColumnMap ¶
NewColumnMap returns a new ColumnMap.
type Config ¶
type Config struct {
	Type     string `yaml:"type" env:"TYPE" default:"mysql"`
	Host     string `yaml:"host" env:"HOST"`
	Port     int    `yaml:"port" env:"PORT"`
	Database string `yaml:"database" env:"DATABASE"`
	User     string `yaml:"user" env:"USER"`
	Password string `yaml:"password" env:"PASSWORD,unset"`

	TlsOptions config.TLS `yaml:",inline"`
	Options    Options    `yaml:"options" envPrefix:"OPTIONS_"`
}
Config defines database client configuration.
type DB ¶
DB is a wrapper around sqlx.DB with bulk execution, statement building, streaming and logging capabilities.
func NewDbFromConfig ¶
func NewDbFromConfig(c *Config, logger *logging.Logger, connectorCallbacks RetryConnectorCallbacks) (*DB, error)
NewDbFromConfig returns a new DB from Config.
func (*DB) BatchSizeByPlaceholders ¶
BatchSizeByPlaceholders returns how often the specified number of placeholders fits into Options.MaxPlaceholdersPerStatement, but at least 1.
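The arithmetic is an integer division with a lower bound of 1. A minimal sketch, using a hypothetical free function `batchSize` instead of the method on DB; the guard for non-positive n is an assumption of this sketch, not documented behavior:

```go
package main

import "fmt"

// batchSize returns how many rows with n placeholders each fit into a
// statement limited to maxPlaceholders placeholders, but at least 1.
func batchSize(maxPlaceholders, n int) int {
	if n <= 0 {
		// Assumption of this sketch: avoid dividing by zero.
		return 1
	}
	if s := maxPlaceholders / n; s > 1 {
		return s
	}
	return 1
}

func main() {
	fmt.Println(batchSize(8192, 10))    // 819
	fmt.Println(batchSize(8192, 10000)) // 1
}
```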
func (*DB) BuildColumns ¶ added in v0.2.0
BuildColumns returns all columns of the given struct.
func (*DB) BuildDeleteStmt ¶
BuildDeleteStmt returns a DELETE statement for the given struct.
func (*DB) BuildInsertIgnoreStmt ¶
BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for which the database ignores rows that have already been inserted.
func (*DB) BuildInsertStmt ¶
BuildInsertStmt returns an INSERT INTO statement for the given struct.
func (*DB) BuildSelectStmt ¶
BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct and the column list from the specified columns struct.
func (*DB) BuildUpdateStmt ¶
BuildUpdateStmt returns an UPDATE statement for the given struct.
func (*DB) BuildUpsertStmt ¶
BuildUpsertStmt returns an upsert statement for the given struct.
func (*DB) BuildWhere ¶
BuildWhere returns a WHERE clause with named placeholder conditions built from the specified struct combined with the AND operator.
func (*DB) BulkExec ¶
func (db *DB) BulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error
BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. Takes in up to the number of arguments specified in count from the arg stream, derives and expands a query and executes it with this set of arguments until the arg stream has been processed. The derived queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Arguments for which the query ran successfully will be passed to onSuccess.
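The chunk-and-execute flow can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: `bulkExec` and its parameters are hypothetical, a buffered channel replaces `semaphore.Weighted`, the `exec` callback stands in for running the expanded query, and context handling is left out. Unlike a real implementation might, the sketch keeps processing after a failed chunk and only reports the first error.

```go
package main

import (
	"fmt"
	"sync"
)

// bulkExec reads up to count arguments at a time from args and hands each
// chunk to exec in its own goroutine. At most maxConcurrent chunks run at once.
// Arguments of successful chunks are passed to onSuccess.
func bulkExec(
	args <-chan any, count, maxConcurrent int,
	exec func(chunk []any) error, onSuccess func(any),
) error {
	sem := make(chan struct{}, maxConcurrent) // counting semaphore
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards firstErr and serializes onSuccess calls
		firstErr error
	)

	run := func(chunk []any) {
		sem <- struct{}{} // acquire a slot with a weight of 1
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			err := exec(chunk)

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			for _, a := range chunk {
				onSuccess(a)
			}
		}()
	}

	chunk := make([]any, 0, count)
	for a := range args {
		chunk = append(chunk, a)
		if len(chunk) == count {
			run(chunk)
			chunk = make([]any, 0, count)
		}
	}
	if len(chunk) > 0 {
		run(chunk) // remaining arguments that did not fill a whole chunk
	}

	wg.Wait()
	return firstErr
}

func main() {
	args := make(chan any)
	go func() {
		defer close(args)
		for i := 1; i <= 5; i++ {
			args <- i
		}
	}()

	err := bulkExec(args, 2, 1,
		func(chunk []any) error { fmt.Println("executing chunk:", chunk); return nil },
		func(a any) { fmt.Println("succeeded:", a) },
	)
	fmt.Println("error:", err)
}
```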
func (*DB) CreateIgnoreStreamed ¶
func (db *DB) CreateIgnoreStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error
CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.
func (*DB) CreateStreamed ¶
func (db *DB) CreateStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error
CreateStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.
func (*DB) Delete ¶
func (db *DB) Delete(
	ctx context.Context, entityType Entity, ids []interface{}, onSuccess ...OnSuccess[any],
) error
Delete creates a channel from the specified ids and bulk deletes them by passing the channel along with the entityType to DeleteStreamed. IDs for which the query ran successfully will be passed to onSuccess.
func (*DB) DeleteStreamed ¶
func (db *DB) DeleteStreamed(
	ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
) error
DeleteStreamed bulk deletes the specified ids via BulkExec. The delete statement is created using BuildDeleteStmt with the passed entityType. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. IDs for which the query ran successfully will be passed to onSuccess.
func (*DB) ExecTx ¶ added in v0.4.0
ExecTx executes the provided function within a database transaction.
Starts a new transaction, executes the provided function, and commits the transaction if the function succeeds. If the function returns an error, the transaction is rolled back.
Returns an error if starting the transaction, executing the function, or committing the transaction fails.
Note that committing the transaction may not honor the context provided. For some database drivers, once a COMMIT query is started, it will block until the database responds. Therefore, for time-critical scenarios, it is recommended to add a select wrapper against the context.
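One way to read the recommended select wrapper is sketched below. The helper `runWithContext` is hypothetical; in real use, the closure passed to it would call ExecTx, while this example substitutes a sleep so that it runs without a database. Returning early does not cancel the underlying work: a commit that is already in flight may still succeed or fail afterwards, and the caller no longer learns its outcome.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithContext runs fn in its own goroutine and returns as soon as either
// fn finishes or ctx is done, whichever happens first.
func runWithContext(ctx context.Context, fn func() error) error {
	done := make(chan error, 1) // buffered, so the goroutine never blocks on send
	go func() { done <- fn() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// fn keeps running in the background; its result is discarded.
		return ctx.Err()
	}
}

// demo simulates a blocking call that takes workMs milliseconds under a
// context that expires after timeoutMs milliseconds.
func demo(timeoutMs, workMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	return runWithContext(ctx, func() error {
		// Stand-in for a call such as db.ExecTx(ctx, ...).
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		return nil
	})
}

func main() {
	fmt.Println(demo(500, 1))  // <nil>
	fmt.Println(demo(20, 500)) // context deadline exceeded
}
```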
func (*DB) GetAddr ¶
GetAddr returns a URI-like database connection string.
It has the following syntax:
type[+tls]://user@host[:port]/database
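The syntax above can be read as the following string assembly. The function `formatAddr` and its parameters are hypothetical; the sketch treats a port of 0 as "no port" and does not handle special cases such as IPv6 literals or socket paths.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatAddr builds a string of the form type[+tls]://user@host[:port]/database.
func formatAddr(typ string, tls bool, user, host string, port int, database string) string {
	var b strings.Builder

	b.WriteString(typ)
	if tls {
		b.WriteString("+tls")
	}
	b.WriteString("://")
	b.WriteString(user)
	b.WriteString("@")
	b.WriteString(host)
	if port != 0 { // assumption: 0 means the port is not set
		b.WriteString(":")
		b.WriteString(strconv.Itoa(port))
	}
	b.WriteString("/")
	b.WriteString(database)

	return b.String()
}

func main() {
	fmt.Println(formatAddr("mysql", false, "icingadb", "localhost", 3306, "icingadb"))
	// mysql://icingadb@localhost:3306/icingadb

	fmt.Println(formatAddr("postgres", true, "icinga", "db.example.com", 0, "notifications"))
	// postgres+tls://icinga@db.example.com/notifications
}
```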
func (*DB) GetDefaultRetrySettings ¶
func (*DB) GetSemaphoreForTable ¶
func (*DB) HasTable ¶ added in v0.6.2
HasTable checks whether a table is present in the database.
The first return value indicates whether a table of the given name exists. The second return value contains any error that occurred during the check. If the error is not nil, the first return value is always false.
func (*DB) MarshalLogObject ¶ added in v0.4.0
func (db *DB) MarshalLogObject(encoder zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler, adding the database address DB.GetAddr to each log message.
func (*DB) NamedBulkExec ¶
func (db *DB) NamedBulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity,
	splitPolicyFactory com.BulkChunkSplitPolicyFactory[Entity], onSuccess ...OnSuccess[Entity],
) error
NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely in the format INSERT ... VALUES. Takes in up to the number of entities specified in count from the arg stream, derives and executes a new query with the VALUES clause expanded to this set of arguments, until the arg stream has been processed. The queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Entities for which the query ran successfully will be passed to onSuccess.
func (*DB) NamedBulkExecTx ¶
func (db *DB) NamedBulkExecTx(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity,
) error
NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. Takes in up to the number of entities specified in count from the arg stream and executes a new transaction that runs a new query for each entity in this set of arguments, until the arg stream has been processed.
The transactions are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem.
Note that committing the transaction may not honor the context provided, as described further in DB.ExecTx.
func (*DB) UpdateStreamed ¶
UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. The update statement is created using BuildUpdateStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxRowsPerTransaction and concurrency is controlled via Options.MaxConnectionsPerTable.
func (*DB) UpsertStreamed ¶
func (db *DB) UpsertStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error
UpsertStreamed bulk upserts the specified entities via NamedBulkExec. The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.
func (*DB) YieldAll ¶
func (db *DB) YieldAll(ctx context.Context, factoryFunc EntityFactoryFunc, query string, scope interface{}) (<-chan Entity, <-chan error)
YieldAll executes the query with the supplied scope, scans each resulting row into an entity returned by the factory function, and streams them into a returned channel.
type Entity ¶
type Entity interface {
	Fingerprinter
	IDer
}
Entity is implemented by each type that works with the database package.
type EntityFactoryFunc ¶
type EntityFactoryFunc func() Entity
EntityFactoryFunc knows how to create an Entity.
type Fingerprinter ¶
type Fingerprinter interface {
	// Fingerprint returns the value that uniquely identifies the entity.
	Fingerprint() Fingerprinter
}
Fingerprinter is implemented by every entity that uniquely identifies itself.
type ID ¶
type ID interface {
	// String returns the string representation form of the ID.
	// The String method is used to use the ID in functions
	// where it needs to be compared or hashed.
	String() string
}
ID is a unique identifier of an entity.
type MysqlFuncLogger ¶
type MysqlFuncLogger func(v ...interface{})
MysqlFuncLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger.
func (MysqlFuncLogger) Print ¶
func (log MysqlFuncLogger) Print(v ...interface{})
Print implements the mysql.Logger interface.
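The adapter follows the same pattern as `http.HandlerFunc`: a function type gains a method so that it satisfies an interface. The sketch below reproduces the pattern with hypothetical local names (`printer`, `funcLogger`) so that it runs without the MySQL driver; `printer` mirrors the single-method shape of `mysql.Logger`.

```go
package main

import "fmt"

// printer mirrors the shape of the mysql.Logger interface.
type printer interface {
	Print(v ...interface{})
}

// funcLogger adapts an ordinary function to the printer interface,
// in the same way MysqlFuncLogger does for mysql.Logger.
type funcLogger func(v ...interface{})

// Print calls the wrapped function with the given values.
func (log funcLogger) Print(v ...interface{}) {
	log(v...)
}

func main() {
	var p printer = funcLogger(func(v ...interface{}) {
		fmt.Println(v...)
	})

	p.Print("connection", "lost") // connection lost
}
```

With the real types, a value such as `MysqlFuncLogger(someFunc)` would presumably be handed to `mysql.SetLogger`, where `someFunc` is any function with the signature `func(v ...interface{})`.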
type OnInitConnFunc ¶
OnInitConnFunc can be used to execute arbitrary actions after Connect(). It is called after a new connection has been successfully initiated using the connector's Connect method.
type OnSuccess ¶
OnSuccess is a callback for successful (bulk) DML operations.
func OnSuccessSendTo ¶
type Options ¶
type Options struct {
	// Maximum number of open connections to the database.
	MaxConnections int `yaml:"max_connections" env:"MAX_CONNECTIONS" default:"16"`

	// Maximum number of connections per table,
	// regardless of what the connection is actually doing,
	// e.g. INSERT, UPDATE, DELETE.
	MaxConnectionsPerTable int `yaml:"max_connections_per_table" env:"MAX_CONNECTIONS_PER_TABLE" default:"8"`

	// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
	// INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders,
	// but this increases the execution time of queries and thus reduces the number of queries
	// that can be executed in parallel in a given time.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" env:"MAX_PLACEHOLDERS_PER_STATEMENT" default:"8192"`

	// MaxRowsPerTransaction defines the maximum number of rows per transaction.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" env:"MAX_ROWS_PER_TRANSACTION" default:"8192"`

	// WsrepSyncWait enforces Galera cluster nodes to perform strict cluster-wide causality checks
	// before executing specific SQL queries determined by the number you provided.
	// Please refer to the below link for a detailed description.
	// https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster
	WsrepSyncWait int `yaml:"wsrep_sync_wait" env:"WSREP_SYNC_WAIT" default:"7"`
}
Options define user configurable database options.
type PgsqlOnConflictConstrainter ¶
type PgsqlOnConflictConstrainter interface {
	// PgsqlOnConflictConstraint returns the primary or unique key constraint name of the PostgreSQL table.
	PgsqlOnConflictConstraint() string
}
PgsqlOnConflictConstrainter implements the PgsqlOnConflictConstraint method, which returns the primary or unique key constraint name of the PostgreSQL table.
type RetryConnector ¶
RetryConnector wraps driver.Connector with retry logic.
The first connection attempt will be retried for retry.DefaultTimeout. After a prior successful connection, reconnection attempts are made indefinitely.
func NewConnector ¶
func NewConnector(c driver.Connector, logger *logging.Logger, callbacks RetryConnectorCallbacks) *RetryConnector
NewConnector creates a fully initialized RetryConnector from the given args.
func (*RetryConnector) Driver ¶
func (c *RetryConnector) Driver() driver.Driver
Driver implements part of the driver.Connector interface.
type RetryConnectorCallbacks ¶
type RetryConnectorCallbacks struct {
	OnInitConn       OnInitConnFunc
	OnRetryableError retry.OnRetryableErrorFunc
	OnSuccess        retry.OnSuccessFunc
}
RetryConnectorCallbacks specifies callbacks that are executed upon certain events.
type Scoper ¶
type Scoper interface {
Scope() any
}
Scoper implements the Scope method, which returns a struct specifying the WHERE conditions that entities must satisfy in order to be SELECTed.
type TableNamer ¶
type TableNamer interface {
TableName() string // TableName tells the table.
}
TableNamer implements the TableName method, which returns the table of the object.