Versions in this module

v6
v6.0.3  Aug 21, 2024
v6.0.2  Mar 30, 2021

Changes in this version

+ type ExecFunc func(*sql.Tx, streams.Message) error
    + func (fn ExecFunc) Exec(tx *sql.Tx, msg streams.Message) error
+ type Executor interface
    + Exec func(*sql.Tx, streams.Message) error
+ type Sink struct
    + func NewSink(db *sql.DB, batch int, exec Executor) (*Sink, error)
    + func (p *Sink) Close() error
    + func (p *Sink) Commit(ctx context.Context) error
    + func (p *Sink) Process(msg streams.Message) error
    + func (p *Sink) WithPipe(pipe streams.Pipe)
+ type Transaction interface
    + Begin func(*sql.Tx) error
    + Commit func(*sql.Tx) error

Other modules containing this package

github.com/rafalmnich/streams
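
The index lists ExecFunc as a function type with an Exec method, next to an Executor interface that has the same method. This looks like the usual Go adapter pattern (compare http.HandlerFunc), where a plain function is converted so that it satisfies an interface. The sketch below illustrates that pattern only; it is not the package's source. It is self-contained and does not import the streams module. Message is a hypothetical stand-in for streams.Message, and rejectEmpty is an invented example executor.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for streams.Message. The real type
// lives in the streams module and may have different fields.
type Message struct {
	Key   string
	Value string
}

// Executor mirrors the interface shown in the index, with Message
// substituted for streams.Message.
type Executor interface {
	Exec(tx *sql.Tx, msg Message) error
}

// ExecFunc adapts an ordinary function to the Executor interface.
type ExecFunc func(*sql.Tx, Message) error

// Exec calls the underlying function with the same arguments.
func (fn ExecFunc) Exec(tx *sql.Tx, msg Message) error {
	return fn(tx, msg)
}

// rejectEmpty is a hypothetical executor that fails on empty values.
// A real executor would typically run statements on tx instead.
func rejectEmpty(tx *sql.Tx, msg Message) error {
	if msg.Value == "" {
		return errors.New("empty value")
	}
	return nil
}

func main() {
	// The conversion ExecFunc(rejectEmpty) yields a value that satisfies Executor.
	var exec Executor = ExecFunc(rejectEmpty)

	// A nil transaction is passed only because rejectEmpty never touches it.
	fmt.Println(exec.Exec(nil, Message{Key: "k", Value: "v"}))
	fmt.Println(exec.Exec(nil, Message{Key: "k"}))
}
```

Under the listed signature NewSink(db *sql.DB, batch int, exec Executor), a value built this way would presumably be passed as the exec argument. How Sink batches messages and when it calls Exec is not described in the index, so that part is left out of the sketch.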