Documentation
¶
Overview ¶
Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultMaxIdleConns is the default maximum number of idle connections // allowed in the database pool. DefaultMaxIdleConns = runtime.GOMAXPROCS(0) // DefaultMaxOpenConns is the default maximum number of open connections // allowed in the database pool. DefaultMaxOpenConns = DefaultMaxIdleConns * 10 // DefaultMaxConnLifetime is the default maximum lifetime of database // connections. DefaultMaxConnLifetime = 10 * time.Minute )
Functions ¶
func CreateSchema ¶
CreateSchema creates the schema elements necessary to use the given database.
It does not return an error if the schema already exists.
Types ¶
type AggregateDriver ¶
type AggregateDriver interface {
// InsertAggregateMetaData inserts meta-data for an aggregate instance.
//
// It returns false if the row already exists.
InsertAggregateMetaData(
ctx context.Context,
tx *sql.Tx,
ak string,
md persistence.AggregateMetaData,
) (bool, error)
// UpdateAggregateMetaData updates meta-data for an aggregate instance.
//
// It returns false if the row does not exist or md.Revision is not current.
UpdateAggregateMetaData(
ctx context.Context,
tx *sql.Tx,
ak string,
md persistence.AggregateMetaData,
) (bool, error)
// SelectAggregateMetaData selects an aggregate instance's meta-data.
SelectAggregateMetaData(
ctx context.Context,
db *sql.DB,
ak, hk, id string,
) (persistence.AggregateMetaData, error)
}
AggregateDriver is the subset of the Driver interface that is concerned with aggregates.
type DSNProvider ¶
type DSNProvider struct {
// DriverName is the driver name to be passed to sql.Open().
DriverName string
// DSN is the data-source name to be passed to sql.Open().
DSN string
// Driver is the Verity SQL driver to use with this database. If it is nil,
// it is chosen automatically from one of the built-in drivers.
Driver Driver
// MaxIdleConnections is the maximum number of idle connections allowed in
// the database pool.
//
// If it is zero, DefaultMaxIdleConns is used.
MaxIdleConns int
// MaxOpenConnections is the maximum number of open connections allowed in
// the database pool.
//
// If it is zero, DefaultMaxOpenConns is used.
MaxOpenConns int
// maxConnLifetime is the maximum lifetime of database connections.
// If it is zero, DefaultMaxConnLifetime is used.
MaxConnLifetime time.Duration
// contains filtered or unexported fields
}
DSNProvider is an implementation of provider.Provider for SQL that opens a a database pool using a DSN.
func (*DSNProvider) Open ¶
func (p *DSNProvider) Open(ctx context.Context, k string) (persistence.DataStore, error)
Open returns a data-store for a specific application.
k is the identity key of the application.
Data stores are opened for exclusive use. If another engine instance has already opened this application's data-store, ErrDataStoreLocked is returned.
type Driver ¶
type Driver interface {
AggregateDriver
EventDriver
OffsetDriver
ProcessDriver
QueueDriver
// IsCompatibleWith returns nil if this driver can be used with db.
IsCompatibleWith(ctx context.Context, db *sql.DB) error
// Begin starts a transaction for use in a peristence.Transaction.
Begin(ctx context.Context, db *sql.DB) (*sql.Tx, error)
// CreateSchema creates any SQL schema elements required by the driver.
CreateSchema(ctx context.Context, db *sql.DB) error
// DropSchema removes any SQL schema elements created by CreateSchema().
DropSchema(ctx context.Context, db *sql.DB) error
}
Driver is used to interface with the underlying SQL database.
type EventDriver ¶
type EventDriver interface {
// UpdateNextOffset increments the next offset by one and returns the new
// value.
UpdateNextOffset(
ctx context.Context,
tx *sql.Tx,
ak string,
) (uint64, error)
// InsertEvent saves an event at a specific offset.
InsertEvent(
ctx context.Context,
tx *sql.Tx,
o uint64,
env *envelopespec.Envelope,
) error
// InsertEventFilter inserts a filter that limits selected events to those
// with a portable name in the given set.
//
// It returns the filter's ID.
InsertEventFilter(
ctx context.Context,
db *sql.DB,
ak string,
f map[string]struct{},
) (int64, error)
// DeleteEventFilter deletes an event filter.
//
// f is the filter ID, as returned by InsertEventFilter().
DeleteEventFilter(
ctx context.Context,
db *sql.DB,
f int64,
) error
// PurgeEventFilters deletes all event filters for the given application.
PurgeEventFilters(
ctx context.Context,
db *sql.DB,
ak string,
) error
// SelectNextEventOffset selects the next "unused" offset.
SelectNextEventOffset(
ctx context.Context,
db *sql.DB,
ak string,
) (uint64, error)
// SelectEventsByType selects events that match the given type filter.
//
// f is a filter ID, as returned by InsertEventFilter(). o is the minimum
// offset to include in the results.
SelectEventsByType(
ctx context.Context,
db *sql.DB,
ak string,
f int64,
o uint64,
) (*sql.Rows, error)
// SelectEventsBySource selects events that were produced by a specific
// handler.
SelectEventsBySource(
ctx context.Context,
db *sql.DB,
ak, hk, id string,
o uint64,
) (*sql.Rows, error)
// SelectOffsetByMessageID selects the offset of the message with the given
// ID. It returns false as a second return value if the message cannot be
// found.
SelectOffsetByMessageID(
ctx context.Context,
db *sql.DB,
id string,
) (uint64, bool, error)
// ScanEvent scans the next event from a row-set returned by
// SelectEventsByType() and SelectEventsBySource().
ScanEvent(
rows *sql.Rows,
ev *persistence.Event,
) error
}
EventDriver is the subset of the Driver interface that is concerned with events.
type OffsetDriver ¶
type OffsetDriver interface {
// LoadOffset loads the last offset associated with the given source
// application key sk. ak is the 'owner' application key.
//
// If there is no offset associated with the given source application key,
// the offset is returned as zero and error as nil.
LoadOffset(
ctx context.Context,
db *sql.DB,
ak, sk string,
) (uint64, error)
// InsertOffset inserts a new offset associated with the given source
// application key sk. ak is the 'owner' application key.
//
// It returns false if the row already exists.
InsertOffset(
ctx context.Context,
tx *sql.Tx,
ak, sk string,
n uint64,
) (bool, error)
// UpdateOffset updates the offset associated with the given source
// application key sk. ak is the 'owner' application key.
//
// It returns false if the row does not exist or c is not the current offset
// associated with the given application key.
UpdateOffset(
ctx context.Context,
tx *sql.Tx,
ak, sk string,
c, n uint64,
) (bool, error)
}
OffsetDriver is the subset of the Driver interface that is concerned with persisting event stream offsets.
type ProcessDriver ¶
type ProcessDriver interface {
// InsertProcessInstance inserts a process instance.
//
// It returns false if the row already exists.
InsertProcessInstance(
ctx context.Context,
tx *sql.Tx,
ak string,
inst persistence.ProcessInstance,
) (bool, error)
// UpdateProcessInstance updates a process instance.
//
// It returns false if the row does not exist or inst.Revision is not
// current.
UpdateProcessInstance(
ctx context.Context,
tx *sql.Tx,
ak string,
inst persistence.ProcessInstance,
) (bool, error)
// DeleteProcessInstance deletes a process instance.
//
// It returns false if the row does not exist or inst.Revision is not
// current.
DeleteProcessInstance(
ctx context.Context,
tx *sql.Tx,
ak string,
inst persistence.ProcessInstance,
) (bool, error)
// SelectProcessInstance selects a process instance's data.
SelectProcessInstance(
ctx context.Context,
db *sql.DB,
ak, hk, id string,
) (persistence.ProcessInstance, error)
}
ProcessDriver is the subset of the Driver interface that is concerned with processess.
type Provider ¶
type Provider struct {
// DB is the SQL database to use.
DB *sql.DB
// Driver is the Verity SQL driver to use with this database. If it is nil,
// it is chosen automatically from one of the built-in drivers.
Driver Driver
// contains filtered or unexported fields
}
Provider is an implementation of provider.Provider for SQL that uses an existing open database pool.
type QueueDriver ¶
type QueueDriver interface {
// InsertQueueMessage inserts a message in the queue.
//
// It returns false if the row already exists.
InsertQueueMessage(
ctx context.Context,
tx *sql.Tx,
ak string,
m persistence.QueueMessage,
) (bool, error)
// UpdateQueueMessage updates meta-data about a message that is already on
// the queue.
//
// It returns false if the row does not exist or m.Revision is not current.
UpdateQueueMessage(
ctx context.Context,
tx *sql.Tx,
ak string,
m persistence.QueueMessage,
) (bool, error)
// DeleteQueueMessage deletes a message from the queue.
//
// It returns false if the row does not exist or m.Revision is not current.
DeleteQueueMessage(
ctx context.Context,
tx *sql.Tx,
ak string,
m persistence.QueueMessage,
) (bool, error)
// DeleteQueueTimeoutMessagesByProcessInstance deletes timeout messages that
// were produced by a specific process instance.
DeleteQueueTimeoutMessagesByProcessInstance(
ctx context.Context,
tx *sql.Tx,
ak string,
inst persistence.ProcessInstance,
) error
// SelectQueueMessages selects up to n messages from the queue.
SelectQueueMessages(
ctx context.Context,
db *sql.DB,
ak string,
n int,
) (*sql.Rows, error)
// ScanQueueMessage scans the next message from a row-set returned by
// SelectQueueMessages().
ScanQueueMessage(
rows *sql.Rows,
m *persistence.QueueMessage,
) error
}
QueueDriver is the subset of the Driver interface that is concerned with the message queue subsystem.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package mysql is a MySQL driver for the SQL persistence provider.
|
Package mysql is a MySQL driver for the SQL persistence provider. |
|
Package postgres is a PostgreSQL driver for the SQL persistence provider.
|
Package postgres is a PostgreSQL driver for the SQL persistence provider. |
|
Package sqlite is an SQlite v3 driver for the SQL persistence provider.
|
Package sqlite is an SQlite v3 driver for the SQL persistence provider. |