tap

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package tap provides real-time observability of SQL executed during queen migrations. It captures per-statement events (SQL text, args, duration, rows affected, errors) and dispatches them to a Sink for rendering, logging, or assertion in tests.

Tap is scoped to migration execution — it does not proxy or wrap your application's database/sql traffic. For SQL migrations Queen emits a single event with the full SQL text. For Go-function migrations users opt in by wrapping the transaction with Tx(ctx, tx) and calling ExecContext/QueryContext through the wrapper.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BindSQL

func BindSQL(sql string, args []any) string

BindSQL replaces positional placeholders in sql with formatted args. It supports PostgreSQL-style $1 placeholders and ? placeholders.

func NormalizeSQL

func NormalizeSQL(sql string) string

NormalizeSQL collapses whitespace and replaces literal values with placeholders so structurally identical queries can be grouped.

func OperationOf

func OperationOf(sql string) string

OperationOf returns the first SQL keyword in lower case.

func WithScope

func WithScope(ctx context.Context, sink Sink, version, name string, dir Direction) context.Context

WithScope returns a context carrying the given sink and migration metadata. Queen calls this internally before invoking a Go-function migration so that tap.ObserveTx(ctx, tx) can find the active sink.

func WriteJSONL

func WriteJSONL(w io.Writer, events []Event) error

WriteJSONL writes events as newline-delimited JSON.

func WriteMarkdown

func WriteMarkdown(w io.Writer, events []Event) error

WriteMarkdown writes a compact migration tap report.

Types

type AnalyzerConfig

type AnalyzerConfig struct {
	SlowThreshold   time.Duration
	NPlus1Threshold int
	NPlus1Window    time.Duration
	NPlus1Cooldown  time.Duration
	BindArgs        bool
}

AnalyzerConfig controls enrichment performed by AnalyzerSink.

func DefaultAnalyzerConfig

func DefaultAnalyzerConfig() AnalyzerConfig

DefaultAnalyzerConfig is tuned for interactive migration runs.

type AnalyzerSink

type AnalyzerSink struct {
	// contains filtered or unexported fields
}

AnalyzerSink enriches exec events with operation, normalized SQL, bound SQL, slow-query markers, and N+1 detection before forwarding them.

func NewAnalyzerSink

func NewAnalyzerSink(next Sink, cfg AnalyzerConfig) *AnalyzerSink

NewAnalyzerSink adds SQL summaries before forwarding events to next.

func (*AnalyzerSink) Emit

func (s *AnalyzerSink) Emit(e Event)

type ChannelSink

type ChannelSink struct {
	Ch chan Event
	// contains filtered or unexported fields
}

ChannelSink is a non-blocking buffered event queue.

func NewChannelSink

func NewChannelSink(buffer int) *ChannelSink

NewChannelSink creates a buffered channel sink.

func (*ChannelSink) Close

func (c *ChannelSink) Close()

Close closes the underlying channel. Call after migrations finish.

func (*ChannelSink) Dropped

func (c *ChannelSink) Dropped() int64

Dropped returns the number of events dropped due to a full buffer.

func (*ChannelSink) Emit

func (c *ChannelSink) Emit(e Event)

type Direction

type Direction string

Direction matches queen.DirectionUp / queen.DirectionDown.

const (
	DirectionUp   Direction = "up"
	DirectionDown Direction = "down"
)

type Event

type Event struct {
	ID           int64         `json:"id,omitempty"`
	Kind         Kind          `json:"kind"`
	Version      string        `json:"version"`
	Name         string        `json:"name"`
	Direction    Direction     `json:"direction"`
	Index        int           `json:"index,omitempty"`
	StartedAt    time.Time     `json:"started_at"`
	Duration     time.Duration `json:"duration_ns"`
	Operation    string        `json:"operation,omitempty"`
	SQL          string        `json:"sql,omitempty"`
	SQLTemplate  string        `json:"sql_template,omitempty"`
	BoundSQL     string        `json:"bound_sql,omitempty"`
	Args         []any         `json:"args,omitempty"`
	RowsAffected int64         `json:"rows_affected,omitempty"`
	Error        string        `json:"error,omitempty"`
	Slow         bool          `json:"slow,omitempty"`
	NPlus1       bool          `json:"n_plus_1,omitempty"`
	NPlus1Count  int           `json:"n_plus_1_count,omitempty"`
	NPlus1Alert  bool          `json:"n_plus_1_alert,omitempty"`
}

Event is a single tap record.

type ExplainMode

type ExplainMode string

ExplainMode selects between a planner-only EXPLAIN and an execution-backed ANALYZE where the connected database supports it.

const (
	ExplainOnly    ExplainMode = "EXPLAIN"
	ExplainAnalyze ExplainMode = "EXPLAIN ANALYZE"
)

type ExplainResult

type ExplainResult struct {
	Mode     ExplainMode
	Plan     string
	Duration time.Duration
}

ExplainResult is the textual plan returned by the database.

func RunExplain

func RunExplain(ctx context.Context, db *sql.DB, driverName string, mode ExplainMode, query string, args []any) (*ExplainResult, error)

RunExplain executes EXPLAIN for query. It keeps the API deliberately small: callers pass the driver name Queen already knows, the original query, and captured args.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter matches tap events using terms such as "op:select d>100ms error".

func ParseFilter

func ParseFilter(expr string) (Filter, error)

ParseFilter turns a compact filter string into matchable tokens.

func (Filter) Match

func (f Filter) Match(e Event) bool

type FuncSink

type FuncSink func(Event)

FuncSink adapts a function into a Sink.

func (FuncSink) Emit

func (f FuncSink) Emit(e Event)

type JSONSink

type JSONSink struct {
	// contains filtered or unexported fields
}

JSONSink writes each event as a JSON line to w. Safe for concurrent use.

func NewJSONSink

func NewJSONSink(w io.Writer) *JSONSink

NewJSONSink wraps w in a line-delimited JSON sink.

func (*JSONSink) Emit

func (j *JSONSink) Emit(e Event)

Emit writes one JSON line. Writer errors are ignored by design.

type Kind

type Kind string

Kind classifies a tap event.

const (
	// KindStart fires once before a migration runs.
	KindStart Kind = "start"
	// KindTxBegin fires after a migration transaction begins.
	KindTxBegin Kind = "tx_begin"
	// KindTxCommit fires after a migration transaction commits.
	KindTxCommit Kind = "tx_commit"
	// KindTxRollback fires after a migration transaction rolls back.
	KindTxRollback Kind = "tx_rollback"
	// KindExec fires for each captured SQL statement.
	KindExec Kind = "exec"
	// KindEnd fires once after a migration finishes (success or failure).
	KindEnd Kind = "end"
)

type MultiSink

type MultiSink []Sink

MultiSink fans events out to multiple sinks.

func (MultiSink) Emit

func (m MultiSink) Emit(e Event)

type NPlus1Detector

type NPlus1Detector struct {
	// contains filtered or unexported fields
}

NPlus1Detector tracks repeated SELECT templates in a sliding window.

func NewNPlus1Detector

func NewNPlus1Detector(threshold int, window, cooldown time.Duration) *NPlus1Detector

func (*NPlus1Detector) Record

func (d *NPlus1Detector) Record(query string, t time.Time) NPlus1Result

type NPlus1Result

type NPlus1Result struct {
	Matched bool
	Count   int
	Alert   bool
}

NPlus1Result is the outcome of recording a query template occurrence.

type NopSink

type NopSink struct{}

NopSink discards every event.

func (NopSink) Emit

func (NopSink) Emit(Event)

type ObservedTx

type ObservedTx struct {
	// contains filtered or unexported fields
}

ObservedTx records Exec/Query calls made from a Go-function migration.

func ObserveTx

func ObserveTx(ctx context.Context, tx *sql.Tx) *ObservedTx

ObserveTx returns a transaction wrapper bound to the tap scope on ctx.

func Tx deprecated

func Tx(ctx context.Context, tx *sql.Tx) *ObservedTx

Tx returns an ObservedTx bound to ctx and tx.

Deprecated: use ObserveTx. The migration already runs inside a transaction; this helper only observes SQL calls for tap.

func (*ObservedTx) ExecContext

func (w *ObservedTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*ObservedTx) PrepareContext

func (w *ObservedTx) PrepareContext(ctx context.Context, query string) (*StmtWrapper, error)

PrepareContext mirrors *sql.Tx.PrepareContext and returns a tapped statement.

func (*ObservedTx) QueryContext

func (w *ObservedTx) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

QueryContext emits the query with zero rows affected; database/sql does not expose that count.

func (*ObservedTx) QueryRowContext

func (w *ObservedTx) QueryRowContext(ctx context.Context, query string, args ...any) *RowWrapper

QueryRowContext mirrors *sql.Tx.QueryRowContext and taps the call when Scan is invoked, so deferred QueryRow errors are captured accurately.

func (*ObservedTx) Unwrap

func (w *ObservedTx) Unwrap() *sql.Tx

Unwrap returns the underlying *sql.Tx for operations not covered by the wrapper (e.g. Prepare, Stmt).

type QueryStat

type QueryStat struct {
	SQL           string
	SQLTemplate   string
	Operation     string
	Count         int
	Errors        int
	Slow          int
	NPlus1        int
	TotalDuration time.Duration
	AvgDuration   time.Duration
	MaxDuration   time.Duration
}

QueryStat aggregates events by normalized SQL template.

func TopQueries

func TopQueries(events []Event, sortBy string, limit int) []QueryStat

TopQueries returns up to limit query stats sorted by total, count, avg, or max.

type RecorderSink

type RecorderSink struct {
	// contains filtered or unexported fields
}

RecorderSink collects all events in memory. Intended for tests.

func NewRecorderSink

func NewRecorderSink() *RecorderSink

NewRecorderSink returns an empty recorder.

func (*RecorderSink) Emit

func (r *RecorderSink) Emit(e Event)

Emit appends e.

func (*RecorderSink) Events

func (r *RecorderSink) Events() []Event

Events returns a copy of recorded events.

func (*RecorderSink) Reset

func (r *RecorderSink) Reset()

Reset clears recorded events.

type RowWrapper

type RowWrapper struct {
	// contains filtered or unexported fields
}

RowWrapper wraps *sql.Row so Scan can emit the final QueryRow error.

func (*RowWrapper) Scan

func (r *RowWrapper) Scan(dest ...any) error

type Sink

type Sink interface {
	Emit(Event)
}

Sink receives tap events. Implementations should be concurrency-safe and quick.

func SinkFromContext

func SinkFromContext(ctx context.Context) Sink

SinkFromContext returns the sink registered with WithScope, or nil. Useful when a migration wants to emit custom events.

type StmtWrapper

type StmtWrapper struct {
	// contains filtered or unexported fields
}

StmtWrapper wraps *sql.Stmt and emits tap events for statement execution.

func (*StmtWrapper) Close

func (s *StmtWrapper) Close() error

func (*StmtWrapper) ExecContext

func (s *StmtWrapper) ExecContext(ctx context.Context, args ...any) (sql.Result, error)

func (*StmtWrapper) QueryContext

func (s *StmtWrapper) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error)

func (*StmtWrapper) QueryRowContext

func (s *StmtWrapper) QueryRowContext(ctx context.Context, args ...any) *RowWrapper

func (*StmtWrapper) Unwrap

func (s *StmtWrapper) Unwrap() *sql.Stmt

type Summary

type Summary struct {
	Total         int
	Execs         int
	Errors        int
	Slow          int
	NPlus1        int
	TotalDuration time.Duration
	Queries       []QueryStat
}

Summary is an aggregate view of recorded tap events.

func Summarize

func Summarize(events []Event) Summary

type TxWrapper deprecated

type TxWrapper = ObservedTx

TxWrapper is kept as a source-compatible alias for the old wrapper name.

Deprecated: use ObservedTx.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL