Documentation ¶
Index ¶
- Constants
- Variables
- type AutoSQLTransform
- type Condition
- func HasAtLeastNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasAtLeastNRowsCondition(n int) (Condition, error)
- func HasAtMostNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasAtMostNRowsCondition(n int) (Condition, error)
- func HasExactlyNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasExactlyNRowsCondition(n int) (Condition, error)
- func HasNoDuplicates(col string) (Condition, error)
- func HasNoNullValues(col string) (Condition, error)
- func NewSQLCondition(sql string) (Condition, error)
- type ConsoleDestination
- type ConsoleLogger
- type Coordinator
- type DefaultInserter
- func (d *DefaultInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
- func (d *DefaultInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
- func (d *DefaultInserter) New() SQLInserter
- func (d *DefaultInserter) PreCommit() error
- func (d *DefaultInserter) Statement() string
- type Destination
- type DestinationHook
- type DevNull
- type Event
- type ExcelDestination
- type ExcelRange
- type ExcelRangePoint
- type ExcelSource
- type GenericLogger
- type GraphNode
- type HTTPSource
- type LiteralSource
- type LiteralSourceFormat
- type LogLevel
- type Logger
- type MSSQLInserter
- type MandrillDestination
- type MandrillPrincipal
- type Message
- type Middleware
- type NamedSliceSource
- type ParameterTable
- type ParameterTableDestination
- type Passthrough
- type PostgresInserter
- type SQLDestination
- type SQLInserter
- type SQLSource
- type Sequenceable
- type SequenceableTransform
- type Sequencer
- type SlackOpts
- type SliceDestination
- type SliceSource
- type Source
- type SourceHook
- type Stopper
- type Stream
- type TransactionManager
- type Transform
- type TransformHook
Constants ¶
const (
DefaultBufferSize = 100
DestinationWildcard = ""
)
const ConsoleDestinationName = "CONSOLE"
const DefaultExcelDateFormat = time.RFC3339
const DefaultRowsPerBatch = 500
const InsertQuery = `INSERT INTO %s (%s) VALUES (%s)`
const (
ParameterTableName = "PARAMETERS"
)
const TxManagerMaxRetries = 32
Variables ¶
var (
ErrExcelTooManyWildcards = errors.New("the Excel source/destination range can have at most one wildcard")
ErrExcelCannotIncludeColumns = errors.New("the Excel source range cannot be dynamic in X if it includes columns")
ErrColumnsNotSpecified = errors.New("the Excel range should either include columns or they should be specified in the COLUMNS option")
)
var ErrEOS = errors.New("end of stream")
var ErrInterrupted = errors.New("The execution was interrupted by a context cancellation")
var ErrTransactionManagerFinished = errors.New("transaction manager is in a committed or rolled back state and can no longer provide new transactions")
var Inserters = map[string]SQLInserter{"mssql": &MSSQLInserter{}, "postgres": &PostgresInserter{}}
var LiteralSourceFormats = map[string]LiteralSourceFormat{
"JSON_ARRAY": JSONArray,
"JSON_OBJECTS": JSONObjects,
"CSV": CSVWithoutHeader,
}
var SQLDriverManager sqlDriverManager
SQLDriverManager is a singleton that ensures there is only one DB object per connection, rather than one per source or destination.
Functions ¶
This section is empty.
Types ¶
type AutoSQLTransform ¶
type AutoSQLTransform struct {
Name string
Table string `aql: "STAGING_TABLE, optional"`
StagingSQLConnString string `aql: "STAGING_CONNECTION_STRING, optional"`
Query string
ParameterTable *ParameterTable
ParameterNames []string
// contains filtered or unexported fields
}
AutoSQLTransform is a transform that drains the source, loads the rows into an in-memory SQLite database (not GLOBAL: it doesn't share the cache), and then runs a SQL query against that database, returning the result as rows.
Essentially, this makes it a combination of a SQL destination and a SQL source, where the two are automatically wired up to work together.
A current limitation is that the source dataset must fit entirely in memory. If it does not, it will be necessary to use, e.g., a GLOBAL destination and to configure SET IN_MEMORY = 'OFF';
func (*AutoSQLTransform) Open ¶
func (a *AutoSQLTransform) Open(source Stream, dest Stream, l Logger, st Stopper)
func (*AutoSQLTransform) SetName ¶
func (a *AutoSQLTransform) SetName(name string)
type Condition ¶
Condition is a func that returns true if the message passes the test and false otherwise.
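func HasAtMostNRowsCondition ¶
func HasAtMostNRowsCondition(n int) (Condition, error)
func HasNoDuplicates ¶
func HasNoDuplicates(col string) (Condition, error)
func HasNoNullValues ¶
func HasNoNullValues(col string) (Condition, error)
func NewSQLCondition ¶
func NewSQLCondition(sql string) (Condition, error)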
type ConsoleDestination ¶
type ConsoleDestination struct {
Name string
FormatAsJSON bool
Writer io.Writer
// contains filtered or unexported fields
}
func (*ConsoleDestination) Open ¶
func (cd *ConsoleDestination) Open(s Stream, l Logger, st Stopper)
func (*ConsoleDestination) Ping ¶
func (cd *ConsoleDestination) Ping() error
type ConsoleLogger ¶
type ConsoleLogger struct {
MinLevel LogLevel
// contains filtered or unexported fields
}
func NewConsoleLogger ¶
func NewConsoleLogger(minLevel LogLevel) *ConsoleLogger
func (*ConsoleLogger) Chan ¶
func (cl *ConsoleLogger) Chan() chan<- Event
func (*ConsoleLogger) Error ¶
func (cl *ConsoleLogger) Error() error
func (*ConsoleLogger) Wait ¶
func (cl *ConsoleLogger) Wait()
type Coordinator ¶
type Coordinator interface {
RegisterHooks(...interface{}) //arguments should be SourceHook, TransformHook or DestinationHook
AddSource(name string, alias string, s Source) error
AddDestination(name string, alias string, d Destination) error
AddTest(node string, name string, desc string, c Condition) error
AddTransform(name string, alias string, t Transform) error
AddConstraint(before, after string) error
Connect(from string, to string) error
UseContext(ctx context.Context)
Compile() error
Execute() error
Stop()
}
func NewCoordinator ¶
func NewCoordinator(logger Logger, txManager TransactionManager) Coordinator
type DefaultInserter ¶
type DefaultInserter struct {
// contains filtered or unexported fields
}
func (*DefaultInserter) Initialize ¶
func (d *DefaultInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
func (*DefaultInserter) InsertBatch ¶
func (d *DefaultInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*DefaultInserter) New ¶
func (d *DefaultInserter) New() SQLInserter
func (*DefaultInserter) PreCommit ¶
func (d *DefaultInserter) PreCommit() error
func (*DefaultInserter) Statement ¶
func (d *DefaultInserter) Statement() string
type Destination ¶
type Destination interface {
//Ping checks that the destination is available. It is used to verify
//the destination at runtime.
Ping() error
//Open gives the destination a stream to start pulling from and an error stream
Open(Stream, Logger, Stopper)
}
func NewParameterTableDestination ¶
func NewParameterTableDestination(p *ParameterTable, cols []string) Destination
type DestinationHook ¶
type DestinationHook func(string, Destination) (Destination, error)
DestinationHook takes the destination name and interface and does something to it, possibly returning an error. If it returns a non-nil Destination, this will overwrite the existing Destination.
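The "non-nil result overwrites the existing Destination" rule can be sketched as follows. This is an illustration only: the Destination interface is reduced to Ping so the example stays self-contained, and applyHooks, okDest, loggingDest, withPingLog, keep and rejectUnnamed are hypothetical names that do not come from the package. How the coordinator actually invokes registered hooks is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Destination is a local stand-in that keeps only Ping from the documented interface.
type Destination interface {
	Ping() error
}

// DestinationHook mirrors the documented signature.
type DestinationHook func(string, Destination) (Destination, error)

// okDest is a hypothetical destination that is always reachable.
type okDest struct{}

func (okDest) Ping() error { return nil }

// loggingDest is a hypothetical wrapper that logs every Ping.
type loggingDest struct {
	name  string
	inner Destination
}

func (l loggingDest) Ping() error {
	fmt.Printf("pinging %s\n", l.name)
	return l.inner.Ping()
}

// withPingLog returns a replacement, so the original destination is overwritten.
func withPingLog(name string, d Destination) (Destination, error) {
	return loggingDest{name: name, inner: d}, nil
}

// keep returns nil, so the existing destination is left in place.
func keep(name string, d Destination) (Destination, error) {
	return nil, nil
}

// rejectUnnamed returns an error for destinations without a name.
func rejectUnnamed(name string, d Destination) (Destination, error) {
	if name == "" {
		return nil, errors.New("destination has no name")
	}
	return nil, nil
}

// applyHooks is an assumed driver loop: the first error aborts, and a
// non-nil result replaces the current destination.
func applyHooks(name string, d Destination, hooks ...DestinationHook) (Destination, error) {
	for _, h := range hooks {
		replaced, err := h(name, d)
		if err != nil {
			return nil, err
		}
		if replaced != nil {
			d = replaced
		}
	}
	return d, nil
}

func main() {
	d, err := applyHooks("orders", okDest{}, rejectUnnamed, withPingLog, keep)
	if err != nil {
		fmt.Println("hook failed:", err)
		return
	}
	fmt.Println(d.Ping())
}
```

The same shape would apply to SourceHook and TransformHook, assuming they follow the same convention.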
type ExcelDestination ¶
type ExcelDestination struct {
Name string
Filename string `aql:"FILE"`
Overwrite bool `aql:"OVERWRITE, optional"`
Template string `aql:"TEMPLATE, optional"`
Sheet string `aql:"SHEET"`
Range ExcelRange
Alias string
Transpose bool `aql:"TRANSPOSE, optional"`
Cols []string `aql:"COLUMNS, optional"`
// contains filtered or unexported fields
}
func (*ExcelDestination) Ping ¶
func (ed *ExcelDestination) Ping() error
type ExcelRange ¶
type ExcelRange struct {
X1 int
Y1 int
X2 ExcelRangePoint
Y2 ExcelRangePoint
}
type ExcelRangePoint ¶
type ExcelSource ¶
type ExcelSource struct {
Name string
Filename string `aql:"FILE"`
Sheet string `aql:"SHEET"`
Range ExcelRange
RangeIncludesColumns bool
Dateformat string
Cols []string `aql:"COLUMNS, optional"`
// contains filtered or unexported fields
}
func (*ExcelSource) Columns ¶
func (s *ExcelSource) Columns() []string
func (*ExcelSource) Ping ¶
func (s *ExcelSource) Ping() error
func (*ExcelSource) SetName ¶
func (s *ExcelSource) SetName(name string)
type GenericLogger ¶
type GenericLogger struct {
MinLevel LogLevel
Writer io.Writer
// contains filtered or unexported fields
}
func NewGenericLogger ¶
func NewGenericLogger(minLevel LogLevel, writer io.Writer) *GenericLogger
func (*GenericLogger) Chan ¶
func (gl *GenericLogger) Chan() chan<- Event
func (*GenericLogger) Error ¶
func (gl *GenericLogger) Error() error
func (*GenericLogger) Wait ¶
func (gl *GenericLogger) Wait()
type HTTPSource ¶
type HTTPSource struct {
Name string
URL string `aql:"URL"` //URL of request
Headers map[string]string //Headers to add to request, optional
JSONPath string `aql:"JSON_PATH, optional"` //Path to object containing array of rows, optional
NoColumnNames bool //If response has array of primitive types rather than objects with column names, eg. ["bob",2] instead of {"name": "bob", "age": 2}
ColumnNames []string `aql:"COLUMNS, optional"` //if NoColumnNames is true, this should be provided
PaginationLimitName string `aql:"PAGINATION_LIMIT_PARAMETER, optional"` //query parameter for pagination limit (optional)
PaginationOffsetName string `aql:"PAGINATION_OFFSET_PARAMETER, optional"` //query parameter for pagination offset (optional)
PageSize int `aql:"PAGE_SIZE, optional"` //size of page for pagination
// contains filtered or unexported fields
}
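As a rough illustration of how the PaginationLimitName, PaginationOffsetName and PageSize options could combine, the sketch below builds one request URL per page. The helper pageURL, the example URL and the parameter names "limit" and "offset" are assumptions made for this example. The documentation does not state how HTTPSource computes offsets or decides when to stop, so row-based offsets are assumed here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// pageURL is a hypothetical helper. It adds a limit and an offset query
// parameter to base, assuming the offset counts rows rather than pages.
func pageURL(base, limitName, offsetName string, pageSize, page int) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query() // keeps any query parameters already present in base
	q.Set(limitName, strconv.Itoa(pageSize))
	q.Set(offsetName, strconv.Itoa(page*pageSize))
	u.RawQuery = q.Encode() // Encode sorts parameters by key
	return u.String(), nil
}

func main() {
	for page := 0; page < 3; page++ {
		s, err := pageURL("https://example.com/rows?sort=id", "limit", "offset", 50, page)
		if err != nil {
			fmt.Println("bad URL:", err)
			return
		}
		fmt.Println(s)
	}
}
```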
func (*HTTPSource) Ping ¶
func (h *HTTPSource) Ping() error
func (*HTTPSource) SetName ¶
func (h *HTTPSource) SetName(name string)
type LiteralSource ¶
type LiteralSource struct {
Name string
Content string
Columns []string
Format LiteralSourceFormat
// contains filtered or unexported fields
}
func (*LiteralSource) Ping ¶
func (ls *LiteralSource) Ping() error
func (*LiteralSource) SetName ¶
func (ls *LiteralSource) SetName(name string)
type LiteralSourceFormat ¶
type LiteralSourceFormat int
const (
//JSONArray is a flat array eg. [[2,3],[3,4]]
JSONArray LiteralSourceFormat = iota
//JSONObjects is an array of objects, eg. [{"a": 1, "b": 2}, {"a": 4, "b": 5}]
JSONObjects
//CSVWithoutHeader is a CSV string without headers, eg. 1, 2\n4, 5. Only string
//types are supported - other types will not be inferred, so eg. the above example
//will map to strings ["1", "2"], ["4", "5"].
CSVWithoutHeader
)
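To make the difference between the three formats concrete, the sketch below parses the sample inputs from the comments using only the standard library. It is not the package's parser: parseCSVRows, parseJSONArrayRows and parseJSONObjectRows are hypothetical helpers, and trimming the space after each comma is an assumption chosen so that the CSV sample yields "1", "2" as described.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"strings"
)

// parseCSVRows reads header-less CSV. Every field stays a string;
// no numeric or boolean types are inferred.
func parseCSVRows(content string) ([][]string, error) {
	r := csv.NewReader(strings.NewReader(content))
	r.TrimLeadingSpace = true // assumption: spaces after commas are not significant
	return r.ReadAll()
}

// parseJSONArrayRows decodes rows given as arrays, e.g. [[2,3],[3,4]].
// Column names have to be supplied separately in this format.
func parseJSONArrayRows(content string) ([][]interface{}, error) {
	var rows [][]interface{}
	err := json.Unmarshal([]byte(content), &rows)
	return rows, err
}

// parseJSONObjectRows decodes rows given as objects keyed by column name.
func parseJSONObjectRows(content string) ([]map[string]interface{}, error) {
	var rows []map[string]interface{}
	err := json.Unmarshal([]byte(content), &rows)
	return rows, err
}

func main() {
	csvRows, err := parseCSVRows("1, 2\n4, 5")
	fmt.Printf("%q %v\n", csvRows, err)

	arrRows, err := parseJSONArrayRows(`[[2,3],[3,4]]`)
	fmt.Println(arrRows, err)

	objRows, err := parseJSONObjectRows(`[{"a": 1, "b": 2}, {"a": 4, "b": 5}]`)
	fmt.Println(objRows, err)
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is interface{}, whereas the CSV values remain strings.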
type Logger ¶
type Logger interface {
// Chan returns a chan that can be used to log events
Chan() chan<- Event
// Error returns the latest error that has been logged. The logger must keep track of this.
Error() error
// Wait should block until the logger is done processing messages in its chan. The sender should close the chan before calling this or it will deadlock
Wait()
}
func SlackWrapper ¶
SlackWrapper intercepts messages to a logger and forwards any at or above the given minimum log level to a Slack incoming webhook.
type MSSQLInserter ¶
type MSSQLInserter struct {
// contains filtered or unexported fields
}
func (*MSSQLInserter) Initialize ¶
func (*MSSQLInserter) InsertBatch ¶
func (m *MSSQLInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*MSSQLInserter) New ¶
func (m *MSSQLInserter) New() SQLInserter
func (*MSSQLInserter) PreCommit ¶
func (m *MSSQLInserter) PreCommit() error
type MandrillDestination ¶
type MandrillDestination struct {
Name string
APIKey string `aql:"API_KEY"`
Sender *MandrillPrincipal
Recipients []MandrillPrincipal
SplitByRow bool `aql:"SPLIT, optional"`
Template string `aql:"TEMPLATE"`
Subject string `aql:"SUBJECT, optional"`
// contains filtered or unexported fields
}
func (*MandrillDestination) Open ¶
func (d *MandrillDestination) Open(s Stream, l Logger, st Stopper)
func (*MandrillDestination) Ping ¶
func (d *MandrillDestination) Ping() error
type MandrillPrincipal ¶
func ParseEmailRecipients ¶
func ParseEmailRecipients(s string) ([]MandrillPrincipal, error)
type Middleware ¶
Middleware is a func that transforms a stream.
type NamedSliceSource ¶
type NamedSliceSource struct {
// contains filtered or unexported fields
}
func (*NamedSliceSource) Open ¶
func (ns *NamedSliceSource) Open(dest Stream, logger Logger, stop Stopper)
func (*NamedSliceSource) Ping ¶
func (ns *NamedSliceSource) Ping() error
func (*NamedSliceSource) SetName ¶
func (ns *NamedSliceSource) SetName(name string)
type ParameterTable ¶
func NewParameterTable ¶
func NewParameterTable() *ParameterTable
func (*ParameterTable) Declare ¶
func (p *ParameterTable) Declare(name string) error
func (*ParameterTable) Get ¶
func (p *ParameterTable) Get(name string) (interface{}, bool)
func (*ParameterTable) Set ¶
func (p *ParameterTable) Set(name string, value interface{}) error
type ParameterTableDestination ¶
type ParameterTableDestination struct {
// contains filtered or unexported fields
}
func (*ParameterTableDestination) Open ¶
func (p *ParameterTableDestination) Open(s Stream, l Logger, st Stopper)
func (*ParameterTableDestination) Ping ¶
func (p *ParameterTableDestination) Ping() error
type Passthrough ¶
func (*Passthrough) Open ¶
func (p *Passthrough) Open(source Stream, dest Stream, logger Logger, stop Stopper)
func (*Passthrough) SetName ¶
func (p *Passthrough) SetName(name string)
type PostgresInserter ¶
type PostgresInserter struct {
// contains filtered or unexported fields
}
func (*PostgresInserter) Initialize ¶
func (*PostgresInserter) InsertBatch ¶
func (m *PostgresInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*PostgresInserter) New ¶
func (m *PostgresInserter) New() SQLInserter
func (*PostgresInserter) PreCommit ¶
func (m *PostgresInserter) PreCommit() error
type SQLDestination ¶
type SQLDestination struct {
Name string
Driver string
ConnectionString string
Table string `aql:"TABLE"`
Tx *sql.Tx
RowsPerBatch int `aql:"ROWS_PER_BATCH,optional"`
DropNulls bool `aql:"DROP_NULLS,optional"`
TxUseFunc func() (*sql.Tx, error)
TxReleaseFunc func()
Alias string
// contains filtered or unexported fields
}
func (*SQLDestination) Columns ¶
func (sq *SQLDestination) Columns() []string
func (*SQLDestination) Ping ¶
func (sq *SQLDestination) Ping() error
type SQLInserter ¶
type SQLInserter interface {
New() SQLInserter
//Initialize with connection details and database.
Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
//Insert a single batch
InsertBatch(tx *sql.Tx, msgs []Message) error
//Hook that is called before the transaction manager/etc commits/rollbacks the transaction
PreCommit() error
}
SQLInserter inserts rows into a SQL database. It contains driver-specific optimisations:
- MS SQL Server: uses bulk copy
It does not perform any transaction management.
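For drivers without a special bulk path, a generic inserter presumably formats the InsertQuery constant into a concrete statement. The sketch below shows one way that could work. It is an assumption, not the package's implementation: buildInsert is a hypothetical helper, and the "?" placeholder style is chosen only for illustration (drivers differ, e.g. PostgreSQL uses $1, $2, ...).

```go
package main

import (
	"fmt"
	"strings"
)

// InsertQuery is copied from the package constants.
const InsertQuery = `INSERT INTO %s (%s) VALUES (%s)`

// buildInsert is a hypothetical helper that fills the template with a table
// name, a column list and one "?" placeholder per column.
// It does not quote or escape identifiers.
func buildInsert(table string, cols []string) string {
	placeholders := make([]string, len(cols))
	for i := range cols {
		placeholders[i] = "?"
	}
	return fmt.Sprintf(InsertQuery, table, strings.Join(cols, ", "), strings.Join(placeholders, ", "))
}

func main() {
	fmt.Println(buildInsert("users", []string{"id", "name"}))
	// prints: INSERT INTO users (id, name) VALUES (?, ?)
}
```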
type SQLSource ¶
type Sequenceable ¶
type Sequenceable interface {
Sequence(seq []string)
}
type SequenceableTransform ¶
type SequenceableTransform interface {
Transform
Sequenceable
}
type Sequencer ¶
Sequencer is a synchronization utility to ensure that a collection of named tasks run in a given sequence even if they are started in parallel.
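The idea can be sketched with a mutex and a condition variable. The Sequencer's actual API is not shown in this documentation, so the sequencer type, its Wait and Done methods, and runInSequence below are hypothetical; the sketch also assumes every task name is unique and appears in the sequence.

```go
package main

import (
	"fmt"
	"sync"
)

// sequencer is a hypothetical illustration, not the package's Sequencer.
type sequencer struct {
	mu    sync.Mutex
	cond  *sync.Cond
	order []string
	next  int // index of the task whose turn it is
}

func newSequencer(order []string) *sequencer {
	s := &sequencer{order: order}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Wait blocks until it is the named task's turn.
func (s *sequencer) Wait(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.next < len(s.order) && s.order[s.next] != name {
		s.cond.Wait()
	}
}

// Done marks the named task as finished and wakes the waiting tasks.
func (s *sequencer) Done(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.next < len(s.order) && s.order[s.next] == name {
		s.next++
		s.cond.Broadcast()
	}
}

// runInSequence starts every task in its own goroutine, deliberately in
// reverse order, and returns the order in which the tasks actually ran.
func runInSequence(order []string) []string {
	s := newSequencer(order)
	var ran []string
	var wg sync.WaitGroup
	for i := len(order) - 1; i >= 0; i-- {
		name := order[i]
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Wait(name)
			ran = append(ran, name) // only the task whose turn it is gets here
			s.Done(name)
		}()
	}
	wg.Wait()
	return ran
}

func main() {
	fmt.Println(runInSequence([]string{"extract", "transform", "load"}))
}
```

Even though the goroutines are started in reverse, each one blocks in Wait until the tasks before it have called Done, so the work runs in the configured order.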
func NewSequencer ¶
type SliceDestination ¶
func (*SliceDestination) Open ¶
func (sd *SliceDestination) Open(s Stream, logger Logger, stop Stopper)
func (*SliceDestination) Ping ¶
func (sd *SliceDestination) Ping() error
func (*SliceDestination) Results ¶
func (sd *SliceDestination) Results() [][]interface{}
type SliceSource ¶
type SliceSource struct {
// contains filtered or unexported fields
}
func (*SliceSource) Ping ¶
func (s *SliceSource) Ping() error
func (*SliceSource) SetName ¶
func (s *SliceSource) SetName(name string)
type Source ¶
type Source interface {
//SetName sets the name (or alias) of the source for outgoing messages
SetName(name string)
//Ping attempts to connect to the source without creating a stream.
//This is used to check that the source is valid at run-time.
Ping() error
//Get connects to the source and returns a stream of data.
Open(Stream, Logger, Stopper)
}
Source represents a data input into the system, e.g. a database query.
func NewNamedSliceSource ¶
func NewSliceSource ¶
type SourceHook ¶
SourceHook takes the source name and interface and does something to it, possibly returning an error. If it returns a non-nil Source, this will overwrite the existing Source.
type Stopper ¶
type Stopper interface {
//Stopped checks if the stopper is stopped
Stopped() bool
//Stops. This is irreversible.
Stop()
}
Stopper is used as a condition variable to halt the execution of the program. It is safe for concurrent use by multiple goroutines.
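A minimal sketch of a type satisfying this interface, and of a worker loop that honours it, might look like the following. The flagStopper type and the countUntilStopped helper are hypothetical; the value returned by NewStopper may be implemented quite differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stopper mirrors the documented interface.
type Stopper interface {
	Stopped() bool
	Stop()
}

// flagStopper is a hypothetical implementation backed by an atomic flag,
// which makes both methods safe to call from multiple goroutines.
type flagStopper struct {
	flag int32
}

func (s *flagStopper) Stopped() bool { return atomic.LoadInt32(&s.flag) == 1 }

// Stop is irreversible: nothing ever resets the flag.
func (s *flagStopper) Stop() { atomic.StoreInt32(&s.flag, 1) }

// countUntilStopped is a hypothetical worker loop. It checks the stopper
// before each item and requests a stop once it has processed stopAt.
func countUntilStopped(items []int, st Stopper, stopAt int) int {
	processed := 0
	for _, it := range items {
		if st.Stopped() {
			break
		}
		processed++
		if it == stopAt {
			st.Stop()
		}
	}
	return processed
}

func main() {
	st := &flagStopper{}
	fmt.Println(countUntilStopped([]int{1, 2, 3, 4, 5}, st, 3))

	// Calling Stop concurrently and repeatedly is harmless.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			st.Stop()
		}()
	}
	wg.Wait()
	fmt.Println(st.Stopped())
}
```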
func NewStopper ¶
func NewStopper() Stopper
type Stream ¶
type Stream interface {
//Columns returns a slice of column names
Columns() []string
//SetColumns sets the destination columns. destination can be a wildcard.
SetColumns(destination string, cols []string) error
//Chan is the channel for the stream. It will be closed by the sender when the stream is at an end.
Chan(destination string) chan Message
}
Stream represents a stream of data, such as a database result set.
func NewSequencedStream ¶
type TransactionManager ¶
type TransactionManager interface {
// Register makes the connection known to the connection manager. It does
// NOT begin a new transaction.
Register(aql.Connection) error
// Use will begin a new transaction (if none exists) or re-use the existing
// transaction, locking it so that no one may concurrently use it.
Tx(connection string) (*sql.Tx, error)
// Release the transaction so that it may be used by others. It panics if the
// connection has not been registered.
Release(connection string)
// Commit commits ALL transactions. It is an error to call Use() or Register()
// after Commit().
Commit() error
// Rollback rolls back ALL transactions. It is an error to call Use() or Register()
// after Commit().
Rollback() error
}
TransactionManager provides a single transaction per connection, to be used by all components that read from or write to the connection. All transactions are then either committed or rolled back together. It is a two-phase commit (2PC) transaction manager. For now, it is only supported for connections that use sql.Tx.
func NewTransactionManager ¶
func NewTransactionManager(l Logger) TransactionManager
type Transform ¶
type Transform interface {
//SetName sets the alias of the transform for outgoing messages
SetName(name string)
//Open gives the transform a stream to start pulling from
Open(source Stream, dest Stream, logger Logger, stop Stopper)
}
Transform is a component that is neither a source nor a sink. It is configured with one or more sources, and one or more sinks.
Source Files ¶
- auto_sql_transform.go
- condition.go
- console_dest.go
- coordinator.go
- destination.go
- excel_dest.go
- excel_source.go
- http_source.go
- literal_source.go
- logger.go
- mandrill_destination.go
- middleware.go
- multiplexer.go
- parameters.go
- sequencer.go
- slack.go
- source.go
- sql_dest.go
- sql_driver_manager.go
- sql_inserter.go
- sql_source.go
- stopper.go
- stream.go
- transaction_manager.go
- transform.go
- util.go