Documentation ¶
Overview ¶
Package tap provides real-time observability of the SQL executed during Queen migrations. It captures per-statement events (SQL text, args, duration, rows affected, errors) and dispatches them to a Sink for rendering, logging, or assertion in tests.
Tap is scoped to migration execution: it does not proxy or wrap your application's database/sql traffic. For SQL migrations, Queen emits a single event with the full SQL text. For Go-function migrations, users opt in by wrapping the transaction with tap.ObserveTx(ctx, tx) and calling ExecContext/QueryContext through the wrapper.
Index ¶
- func BindSQL(sql string, args []any) string
- func NormalizeSQL(sql string) string
- func OperationOf(sql string) string
- func WithScope(ctx context.Context, sink Sink, version, name string, dir Direction) context.Context
- func WriteJSONL(w io.Writer, events []Event) error
- func WriteMarkdown(w io.Writer, events []Event) error
- type AnalyzerConfig
- type AnalyzerSink
- type ChannelSink
- type Direction
- type Event
- type ExplainMode
- type ExplainResult
- type Filter
- type FuncSink
- type JSONSink
- type Kind
- type MultiSink
- type NPlus1Detector
- type NPlus1Result
- type NopSink
- type ObservedTx
- func (w *ObservedTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (w *ObservedTx) PrepareContext(ctx context.Context, query string) (*StmtWrapper, error)
- func (w *ObservedTx) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (w *ObservedTx) QueryRowContext(ctx context.Context, query string, args ...any) *RowWrapper
- func (w *ObservedTx) Unwrap() *sql.Tx
- type QueryStat
- type RecorderSink
- type RowWrapper
- type Sink
- type StmtWrapper
- func (s *StmtWrapper) Close() error
- func (s *StmtWrapper) ExecContext(ctx context.Context, args ...any) (sql.Result, error)
- func (s *StmtWrapper) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error)
- func (s *StmtWrapper) QueryRowContext(ctx context.Context, args ...any) *RowWrapper
- func (s *StmtWrapper) Unwrap() *sql.Stmt
- type Summary
- type TxWrapper (deprecated)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BindSQL ¶
func BindSQL(sql string, args []any) string
BindSQL replaces positional placeholders in sql with formatted args. It supports PostgreSQL-style $1 placeholders and ? placeholders.
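The substitution described above can be sketched as follows. This is a minimal illustration, not the package's implementation: bindSQL and formatArg are hypothetical names, the literal formatting is an assumption, and the sketch does not skip placeholders that occur inside string literals or comments.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatArg renders one argument as a SQL-looking literal for display only.
// The quoting is for readability, not for building executable SQL.
func formatArg(v any) string {
	switch x := v.(type) {
	case nil:
		return "NULL"
	case string:
		return "'" + strings.ReplaceAll(x, "'", "''") + "'"
	default:
		return fmt.Sprint(x)
	}
}

// bindSQL substitutes $N and ? placeholders with formatted args.
// Hypothetical stand-in for tap.BindSQL.
func bindSQL(query string, args []any) string {
	var b strings.Builder
	next := 0 // index of the next arg consumed by a ? placeholder
	for i := 0; i < len(query); i++ {
		c := query[i]
		switch {
		case c == '?' && next < len(args):
			b.WriteString(formatArg(args[next]))
			next++
		case c == '$':
			// Read the digits that follow the dollar sign.
			j := i + 1
			for j < len(query) && query[j] >= '0' && query[j] <= '9' {
				j++
			}
			n, err := strconv.Atoi(query[i+1 : j])
			if err == nil && n >= 1 && n <= len(args) {
				b.WriteString(formatArg(args[n-1]))
				i = j - 1 // skip the digits just consumed
				continue
			}
			b.WriteByte(c) // not a usable placeholder; keep as-is
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}

func main() {
	fmt.Println(bindSQL("SELECT * FROM users WHERE id = $1 AND name = $2", []any{42, "o'neil"}))
	fmt.Println(bindSQL("DELETE FROM t WHERE a = ? AND b = ?", []any{1, nil}))
}
```

The bound string is meant for display in logs or reports; the original query and args should still be what is sent to the database.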
func NormalizeSQL ¶
func NormalizeSQL(sql string) string
NormalizeSQL collapses whitespace and replaces literal values with placeholders so structurally identical queries can be grouped.
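One way such normalization could work is sketched below. The regular expression and the choice of ? as the replacement marker are assumptions made for illustration; normalizeSQL is a hypothetical stand-in and may differ from what the package does (for example, it ignores comments and dialect-specific literals).

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// literal matches, in order of preference: a single-quoted string
// (with '' as an escaped quote), a $N placeholder, or a bare number.
var literal = regexp.MustCompile(`'(?:[^']|'')*'|\$\d+|\b\d+(?:\.\d+)?\b`)

// normalizeSQL replaces literal values with ? and collapses whitespace.
// Existing $N placeholders are matched only so they can be left untouched.
func normalizeSQL(query string) string {
	q := literal.ReplaceAllStringFunc(query, func(m string) string {
		if strings.HasPrefix(m, "$") {
			return m
		}
		return "?"
	})
	return strings.Join(strings.Fields(q), " ")
}

func main() {
	a := normalizeSQL("SELECT *\n  FROM users WHERE id = 42 AND name = 'bob'")
	b := normalizeSQL("SELECT * FROM users   WHERE id = 7 AND name = 'o''neil'")
	fmt.Println(a)
	fmt.Println(a == b) // both queries share one template
}
```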
func OperationOf ¶
func OperationOf(sql string) string
OperationOf returns the first SQL keyword in lower case.
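As a rough sketch of the behavior described, the first whitespace-delimited token can be lowercased. operationOf is a hypothetical name; whether the real function skips leading comments or parentheses is not stated here, and this sketch does not.

```go
package main

import (
	"fmt"
	"strings"
)

// operationOf returns the first whitespace-delimited token in lower case,
// or an empty string when the query has no tokens.
func operationOf(query string) string {
	fields := strings.Fields(query)
	if len(fields) == 0 {
		return ""
	}
	return strings.ToLower(fields[0])
}

func main() {
	fmt.Println(operationOf("  INSERT INTO t VALUES (1)"))
	fmt.Println(operationOf("Select 1"))
}
```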
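func WithScope ¶
func WithScope(ctx context.Context, sink Sink, version, name string, dir Direction) context.Context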
WithScope returns a context carrying the given sink and migration metadata. Queen calls this internally before invoking a Go-function migration so that tap.ObserveTx(ctx, tx) can find the active sink.
func WriteJSONL ¶
func WriteJSONL(w io.Writer, events []Event) error
WriteJSONL writes events as newline-delimited JSON.
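Newline-delimited JSON output can be produced with encoding/json alone, as in the sketch below. The event type is a trimmed, hypothetical stand-in for Event, and writeJSONL and renderJSONL are illustrative names rather than the package's code.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// event is a trimmed, hypothetical stand-in for tap.Event.
type event struct {
	Kind string `json:"kind"`
	SQL  string `json:"sql,omitempty"`
}

// writeJSONL encodes each event as one JSON object per line.
func writeJSONL(w io.Writer, events []event) error {
	enc := json.NewEncoder(w) // Encode appends a newline after each value
	for _, e := range events {
		if err := enc.Encode(e); err != nil {
			return err
		}
	}
	return nil
}

// renderJSONL is a small helper that collects the output in memory.
func renderJSONL(events []event) (string, error) {
	var buf bytes.Buffer
	err := writeJSONL(&buf, events)
	return buf.String(), err
}

func main() {
	out, err := renderJSONL([]event{{Kind: "start"}, {Kind: "exec", SQL: "SELECT 1"}})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```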
Types ¶
type AnalyzerConfig ¶
type AnalyzerConfig struct {
SlowThreshold time.Duration
NPlus1Threshold int
NPlus1Window time.Duration
NPlus1Cooldown time.Duration
BindArgs bool
}
AnalyzerConfig controls enrichment performed by AnalyzerSink.
func DefaultAnalyzerConfig ¶
func DefaultAnalyzerConfig() AnalyzerConfig
DefaultAnalyzerConfig is tuned for interactive migration runs.
type AnalyzerSink ¶
type AnalyzerSink struct {
// contains filtered or unexported fields
}
AnalyzerSink enriches exec events with operation, normalized SQL, bound SQL, slow-query markers, and N+1 detection before forwarding them.
func NewAnalyzerSink ¶
func NewAnalyzerSink(next Sink, cfg AnalyzerConfig) *AnalyzerSink
NewAnalyzerSink adds SQL summaries before forwarding events to next.
func (*AnalyzerSink) Emit ¶
func (s *AnalyzerSink) Emit(e Event)
type ChannelSink ¶
type ChannelSink struct {
Ch chan Event
// contains filtered or unexported fields
}
ChannelSink is a non-blocking buffered event queue.
func NewChannelSink ¶
func NewChannelSink(buffer int) *ChannelSink
NewChannelSink creates a buffered channel sink.
func (*ChannelSink) Close ¶
func (c *ChannelSink) Close()
Close closes the underlying channel. Call after migrations finish.
func (*ChannelSink) Dropped ¶
func (c *ChannelSink) Dropped() int64
Dropped returns the number of events dropped due to a full buffer.
func (*ChannelSink) Emit ¶
func (c *ChannelSink) Emit(e Event)
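The non-blocking behavior with a drop counter can be sketched with a select/default send and an atomic counter. The types below are hypothetical stand-ins written for illustration (atomic.Int64 requires Go 1.19 or later); the actual ChannelSink may be implemented differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// event is a trimmed, hypothetical stand-in for tap.Event.
type event struct{ SQL string }

// channelSink buffers events and never blocks the emitter.
type channelSink struct {
	ch      chan event
	dropped atomic.Int64
}

func newChannelSink(buffer int) *channelSink {
	return &channelSink{ch: make(chan event, buffer)}
}

// Emit enqueues the event, or counts it as dropped when the buffer is full.
func (c *channelSink) Emit(e event) {
	select {
	case c.ch <- e:
	default:
		c.dropped.Add(1)
	}
}

// Dropped reports how many events were discarded.
func (c *channelSink) Dropped() int64 { return c.dropped.Load() }

func main() {
	s := newChannelSink(2)
	for i := 0; i < 5; i++ {
		s.Emit(event{SQL: fmt.Sprintf("SELECT %d", i)})
	}
	close(s.ch)
	for e := range s.ch {
		fmt.Println(e.SQL)
	}
	fmt.Println("dropped:", s.Dropped())
}
```

With no consumer draining the channel, only the first two events fit in the buffer and the remaining three are counted as dropped. The trade-off is that a slow consumer loses events instead of slowing down the migration.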
type Event ¶
type Event struct {
ID int64 `json:"id,omitempty"`
Kind Kind `json:"kind"`
Version string `json:"version"`
Name string `json:"name"`
Direction Direction `json:"direction"`
Index int `json:"index,omitempty"`
StartedAt time.Time `json:"started_at"`
Duration time.Duration `json:"duration_ns"`
Operation string `json:"operation,omitempty"`
SQL string `json:"sql,omitempty"`
SQLTemplate string `json:"sql_template,omitempty"`
BoundSQL string `json:"bound_sql,omitempty"`
Args []any `json:"args,omitempty"`
RowsAffected int64 `json:"rows_affected,omitempty"`
Error string `json:"error,omitempty"`
Slow bool `json:"slow,omitempty"`
NPlus1 bool `json:"n_plus_1,omitempty"`
NPlus1Count int `json:"n_plus_1_count,omitempty"`
NPlus1Alert bool `json:"n_plus_1_alert,omitempty"`
}
Event is a single tap record.
type ExplainMode ¶
type ExplainMode string
ExplainMode selects between a planner-only EXPLAIN and an execution-backed ANALYZE where the connected database supports it.
const (
ExplainOnly ExplainMode = "EXPLAIN"
ExplainAnalyze ExplainMode = "EXPLAIN ANALYZE"
)
type ExplainResult ¶
type ExplainResult struct {
Mode ExplainMode
Plan string
Duration time.Duration
}
ExplainResult is the textual plan returned by the database.
func RunExplain ¶
func RunExplain(ctx context.Context, db *sql.DB, driverName string, mode ExplainMode, query string, args []any) (*ExplainResult, error)
RunExplain executes EXPLAIN for query. It keeps the API deliberately small: callers pass the driver name Queen already knows, the original query, and captured args.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter matches tap events using terms such as "op:select d>100ms error".
func ParseFilter ¶
ParseFilter turns a compact filter string into matchable tokens.
type JSONSink ¶
type JSONSink struct {
// contains filtered or unexported fields
}
JSONSink writes each event as a JSON line to w. Safe for concurrent use.
func NewJSONSink ¶
NewJSONSink wraps w in a line-delimited JSON sink.
type Kind ¶
type Kind string
Kind classifies a tap event.
const (
// KindStart fires once before a migration runs.
KindStart Kind = "start"
// KindTxBegin fires after a migration transaction begins.
KindTxBegin Kind = "tx_begin"
// KindTxCommit fires after a migration transaction commits.
KindTxCommit Kind = "tx_commit"
// KindTxRollback fires after a migration transaction rolls back.
KindTxRollback Kind = "tx_rollback"
// KindExec fires for each captured SQL statement.
KindExec Kind = "exec"
// KindEnd fires once after a migration finishes (success or failure).
KindEnd Kind = "end"
)
type NPlus1Detector ¶
type NPlus1Detector struct {
// contains filtered or unexported fields
}
NPlus1Detector tracks repeated SELECT templates in a sliding window.
func NewNPlus1Detector ¶
func NewNPlus1Detector(threshold int, window, cooldown time.Duration) *NPlus1Detector
func (*NPlus1Detector) Record ¶
func (d *NPlus1Detector) Record(query string, t time.Time) NPlus1Result
type NPlus1Result ¶
NPlus1Result is the outcome of recording a query template occurrence.
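A sliding-window detector with a threshold and an alert cooldown could look roughly like the sketch below. Everything here is an assumption made for illustration: the result fields, the meaning of the cooldown (suppressing repeated alerts for the same template), and the lack of locking. It is not the package's NPlus1Detector.

```go
package main

import (
	"fmt"
	"time"
)

// result is a hypothetical stand-in for NPlus1Result.
type result struct {
	Detected bool // template count reached the threshold inside the window
	Count    int  // occurrences currently inside the window
	Alert    bool // true only when the cooldown allows a new alert
}

type detector struct {
	threshold int
	window    time.Duration
	cooldown  time.Duration
	seen      map[string][]time.Time
	lastAlert map[string]time.Time
}

func newDetector(threshold int, window, cooldown time.Duration) *detector {
	return &detector{
		threshold: threshold, window: window, cooldown: cooldown,
		seen: map[string][]time.Time{}, lastAlert: map[string]time.Time{},
	}
}

// record notes one occurrence of template at time t. Not safe for concurrent use.
func (d *detector) record(template string, t time.Time) result {
	cutoff := t.Add(-d.window)
	kept := d.seen[template][:0]
	for _, ts := range d.seen[template] {
		if ts.After(cutoff) { // keep only occurrences still inside the window
			kept = append(kept, ts)
		}
	}
	kept = append(kept, t)
	d.seen[template] = kept

	res := result{Count: len(kept), Detected: len(kept) >= d.threshold}
	if res.Detected {
		last, ok := d.lastAlert[template]
		if !ok || t.Sub(last) >= d.cooldown {
			res.Alert = true
			d.lastAlert[template] = t
		}
	}
	return res
}

// simulate records n occurrences of one template, stepMillis apart.
func simulate(d *detector, n, stepMillis int) []result {
	base := time.Unix(0, 0)
	out := make([]result, 0, n)
	for i := 0; i < n; i++ {
		t := base.Add(time.Duration(i*stepMillis) * time.Millisecond)
		out = append(out, d.record("SELECT * FROM items WHERE order_id = ?", t))
	}
	return out
}

func main() {
	for i, r := range simulate(newDetector(3, time.Second, 10*time.Second), 4, 100) {
		fmt.Printf("%d: %+v\n", i+1, r)
	}
}
```

In this run the third occurrence crosses the threshold and raises an alert; the fourth is still flagged as detected but does not alert again because the cooldown has not elapsed.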
type ObservedTx ¶
type ObservedTx struct {
// contains filtered or unexported fields
}
ObservedTx records Exec/Query calls made from a Go-function migration.
func ObserveTx ¶
func ObserveTx(ctx context.Context, tx *sql.Tx) *ObservedTx
ObserveTx returns a transaction wrapper bound to the tap scope on ctx.
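func (*ObservedTx) ExecContext ¶
func (w *ObservedTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)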
func (*ObservedTx) PrepareContext ¶
func (w *ObservedTx) PrepareContext(ctx context.Context, query string) (*StmtWrapper, error)
PrepareContext mirrors *sql.Tx.PrepareContext and returns a tapped statement.
func (*ObservedTx) QueryContext ¶
func (w *ObservedTx) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
QueryContext emits an event for the query with RowsAffected set to zero, because database/sql does not expose a row count for queries.
func (*ObservedTx) QueryRowContext ¶
func (w *ObservedTx) QueryRowContext(ctx context.Context, query string, args ...any) *RowWrapper
QueryRowContext mirrors *sql.Tx.QueryRowContext and taps the call when Scan is invoked, so deferred QueryRow errors are captured accurately.
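The reason for tapping at Scan time is that *sql.Row holds any query error until Scan is called. A minimal sketch of that deferral is shown below; rowWrapper, scanner, fakeRow, and the emit callback are hypothetical names used only for illustration, with fakeRow standing in for a real *sql.Row (which has the same Scan method) so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// scanner is the one method of *sql.Row that the wrapper needs.
type scanner interface {
	Scan(dest ...any) error
}

// rowWrapper delays the tap event until Scan reveals the final error.
type rowWrapper struct {
	row   scanner
	query string
	start time.Time
	emit  func(query string, d time.Duration, err error)
}

// Scan forwards to the wrapped row, then reports the outcome exactly once per call.
func (r *rowWrapper) Scan(dest ...any) error {
	err := r.row.Scan(dest...)
	r.emit(r.query, time.Since(r.start), err)
	return err
}

// fakeRow simulates a row whose error only surfaces on Scan.
type fakeRow struct{ err error }

func (f fakeRow) Scan(dest ...any) error { return f.err }

var errNoRows = errors.New("no rows in result set")

// captureScan runs one wrapped Scan and returns what the caller and the tap saw.
func captureScan(rowErr error) (returned error, emitted []error) {
	w := &rowWrapper{
		row:   fakeRow{err: rowErr},
		query: "SELECT name FROM users WHERE id = $1",
		start: time.Now(),
		emit: func(_ string, _ time.Duration, err error) {
			emitted = append(emitted, err)
		},
	}
	var name string
	returned = w.Scan(&name)
	return returned, emitted
}

func main() {
	returned, emitted := captureScan(errNoRows)
	fmt.Println("returned:", returned)
	fmt.Println("events:", len(emitted), "error:", emitted[0])
}
```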
func (*ObservedTx) Unwrap ¶
func (w *ObservedTx) Unwrap() *sql.Tx
Unwrap returns the underlying *sql.Tx for operations not covered by the wrapper (e.g. Prepare, Stmt).
type QueryStat ¶
type QueryStat struct {
SQL string
SQLTemplate string
Operation string
Count int
Errors int
Slow int
NPlus1 int
TotalDuration time.Duration
AvgDuration time.Duration
MaxDuration time.Duration
}
QueryStat aggregates events by normalized SQL template.
type RecorderSink ¶
type RecorderSink struct {
// contains filtered or unexported fields
}
RecorderSink collects all events in memory. Intended for tests.
func NewRecorderSink ¶
func NewRecorderSink() *RecorderSink
NewRecorderSink returns an empty recorder.
func (*RecorderSink) Events ¶
func (r *RecorderSink) Events() []Event
Events returns a copy of recorded events.
type RowWrapper ¶
type RowWrapper struct {
// contains filtered or unexported fields
}
RowWrapper wraps *sql.Row so Scan can emit the final QueryRow error.
func (*RowWrapper) Scan ¶
func (r *RowWrapper) Scan(dest ...any) error
type Sink ¶
type Sink interface {
Emit(Event)
}
Sink receives tap events. Implementations should be concurrency-safe and quick.
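A custom sink only needs an Emit method. The sketch below shows a mutex-guarded recorder and a simple fan-out, using local stand-in types (event, sink, recorder, multi) so it runs on its own; these are illustrative and are not the package's RecorderSink or MultiSink.

```go
package main

import (
	"fmt"
	"sync"
)

// event and sink are trimmed, hypothetical stand-ins for tap.Event and tap.Sink.
type event struct{ Kind, SQL string }

type sink interface{ Emit(event) }

// recorder stores events in memory and is safe for concurrent use.
type recorder struct {
	mu     sync.Mutex
	events []event
}

func (r *recorder) Emit(e event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

// Events returns a copy so callers cannot mutate the recorder's slice.
func (r *recorder) Events() []event {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]event(nil), r.events...)
}

// multi forwards every event to each sink in order.
type multi []sink

func (m multi) Emit(e event) {
	for _, s := range m {
		s.Emit(e)
	}
}

// emitConcurrently calls Emit from n goroutines and waits for all of them.
func emitConcurrently(s sink, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Emit(event{Kind: "exec", SQL: fmt.Sprintf("SELECT %d", i)})
		}(i)
	}
	wg.Wait()
}

func main() {
	a, b := &recorder{}, &recorder{}
	emitConcurrently(multi{a, b}, 100)
	fmt.Println(len(a.Events()), len(b.Events()))
}
```

Keeping Emit short matters because it runs on the migration's own goroutine in this sketch; slow work such as network I/O is better handed off to a buffered queue.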
func SinkFromContext ¶
SinkFromContext returns the sink registered with WithScope, or nil. Useful when a migration wants to emit custom events.
type StmtWrapper ¶
type StmtWrapper struct {
// contains filtered or unexported fields
}
StmtWrapper wraps *sql.Stmt and emits tap events for statement execution.
func (*StmtWrapper) Close ¶
func (s *StmtWrapper) Close() error
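func (*StmtWrapper) ExecContext ¶
func (s *StmtWrapper) ExecContext(ctx context.Context, args ...any) (sql.Result, error)
func (*StmtWrapper) QueryContext ¶
func (s *StmtWrapper) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error)
func (*StmtWrapper) QueryRowContext ¶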
func (s *StmtWrapper) QueryRowContext(ctx context.Context, args ...any) *RowWrapper
func (*StmtWrapper) Unwrap ¶
func (s *StmtWrapper) Unwrap() *sql.Stmt
type Summary ¶
type Summary struct {
Total int
Execs int
Errors int
Slow int
NPlus1 int
TotalDuration time.Duration
Queries []QueryStat
}
Summary is an aggregate view of recorded tap events.
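Aggregation into a Summary with per-template QueryStat entries might be sketched as below. The types are trimmed stand-ins, and several choices are assumptions made for illustration: only "exec" events are counted toward query stats, and queries are sorted by total duration, descending. The function that builds a Summary in the package is not shown in this section, so summarize is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// event, queryStat, and summary are trimmed, hypothetical stand-ins.
type event struct {
	Kind        string
	SQLTemplate string
	Duration    time.Duration
	Error       string
	Slow        bool
}

type queryStat struct {
	SQLTemplate                             string
	Count, Errors, Slow                     int
	TotalDuration, AvgDuration, MaxDuration time.Duration
}

type summary struct {
	Total, Execs, Errors, Slow int
	TotalDuration              time.Duration
	Queries                    []queryStat
}

// summarize groups exec events by template and computes totals.
func summarize(events []event) summary {
	s := summary{Total: len(events)}
	byTemplate := map[string]*queryStat{}
	for _, e := range events {
		if e.Kind != "exec" {
			continue
		}
		s.Execs++
		s.TotalDuration += e.Duration
		q := byTemplate[e.SQLTemplate]
		if q == nil {
			q = &queryStat{SQLTemplate: e.SQLTemplate}
			byTemplate[e.SQLTemplate] = q
		}
		q.Count++
		q.TotalDuration += e.Duration
		if e.Duration > q.MaxDuration {
			q.MaxDuration = e.Duration
		}
		if e.Error != "" {
			q.Errors++
			s.Errors++
		}
		if e.Slow {
			q.Slow++
			s.Slow++
		}
	}
	for _, q := range byTemplate {
		q.AvgDuration = q.TotalDuration / time.Duration(q.Count)
		s.Queries = append(s.Queries, *q)
	}
	// Most expensive templates first.
	sort.Slice(s.Queries, func(i, j int) bool {
		return s.Queries[i].TotalDuration > s.Queries[j].TotalDuration
	})
	return s
}

// sampleEvents returns a small fixed event stream for demonstration.
func sampleEvents() []event {
	return []event{
		{Kind: "start"},
		{Kind: "exec", SQLTemplate: "SELECT * FROM a WHERE id = ?", Duration: 10 * time.Millisecond},
		{Kind: "exec", SQLTemplate: "SELECT * FROM a WHERE id = ?", Duration: 30 * time.Millisecond, Slow: true},
		{Kind: "exec", SQLTemplate: "DELETE FROM b", Duration: 5 * time.Millisecond, Error: "boom"},
		{Kind: "end"},
	}
}

func main() {
	s := summarize(sampleEvents())
	fmt.Println(s.Total, s.Execs, s.Errors, s.Slow, s.TotalDuration)
	for _, q := range s.Queries {
		fmt.Println(q.SQLTemplate, q.Count, q.AvgDuration, q.MaxDuration)
	}
}
```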
type TxWrapper (deprecated)
type TxWrapper = ObservedTx
TxWrapper is kept as a source-compatible alias for the old wrapper name.
Deprecated: use ObservedTx.