Documentation ¶
Index ¶
- Variables
- type CDCIterator
- type Iterator
- type Snapshotter
- type Source
- func (s *Source) Ack(ctx context.Context, pos record.Position) error
- func (s *Source) Open(ctx context.Context, cfg plugins.Config) error
- func (s *Source) Read(ctx context.Context, pos record.Position) (record.Record, error)
- func (s *Source) Teardown() error
- func (s *Source) Validate(cfg plugins.Config) error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoRows is returned when there are no rows to read.
	// * This can happen if the database is closed early, if there are no
	// rows in the result set, or if there are no results left to return.
	ErrNoRows = cerrors.Errorf("no more rows")
	// ErrSnapshotInterrupt is returned when Teardown or another signal
	// cancels an in-progress snapshot.
	ErrSnapshotInterrupt = cerrors.Errorf("interrupted snapshot")
)
var (
	// ErrNotFound is returned when no record is returned at a given Position
	ErrNotFound = cerrors.New("record not found")
	// ErrNoTable is returned when no table is configured for reading
	ErrNoTable = cerrors.New("must provide a table name")
	// ErrInvalidURL is returned when the DB can't be connected to with the
	// provided URL
	ErrInvalidURL = cerrors.New("incorrect url")
)
Functions ¶
This section is empty.
Types ¶
type CDCIterator ¶
type CDCIterator struct {
// contains filtered or unexported fields
}
CDCIterator listens for events from the WAL and pushes them into its buffer. It iterates through that buffer so that Read has a controlled way to take one record at a time from the CDC buffer without running a loop of its own.
func NewIterator ¶
func NewIterator(rch chan record.Record) *CDCIterator
NewIterator creates an iterator from a channel and sets its position to 0.
func (*CDCIterator) HasNext ¶
func (i *CDCIterator) HasNext() bool
HasNext returns true if there is an item in the buffer.
func (*CDCIterator) Next ¶
func (i *CDCIterator) Next() (record.Record, error)
Next returns the next record in the buffer. This is a blocking operation: call it only after HasNext has returned true, otherwise it blocks until a record is pushed into the buffer.
func (*CDCIterator) Push ¶
func (i *CDCIterator) Push(r record.Record)
Push appends a Record to the buffer.
func (*CDCIterator) Teardown ¶
func (i *CDCIterator) Teardown() error
Teardown is a no-op that returns nil, since the buffer requires no cleanup.
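The HasNext/Next/Push contract described above can be sketched with a buffered channel. This is an illustration under stated assumptions, not the package's implementation: `Record` stands in for `record.Record`, and `bufferIterator`, `newBufferIterator`, and `drain` are hypothetical names. It also assumes a single reader, since checking the buffer length and then receiving is only safe when no other goroutine consumes from the same channel.

```go
package main

import "fmt"

// Record is a stand-in for record.Record.
type Record struct {
	Payload string
}

// bufferIterator is a hypothetical iterator backed by a buffered channel.
type bufferIterator struct {
	buf chan Record
}

func newBufferIterator(size int) *bufferIterator {
	return &bufferIterator{buf: make(chan Record, size)}
}

// Push appends a record to the buffer. It blocks if the buffer is full.
func (i *bufferIterator) Push(r Record) { i.buf <- r }

// HasNext reports whether at least one record is currently buffered.
func (i *bufferIterator) HasNext() bool { return len(i.buf) > 0 }

// Next returns the next record, blocking until one is available.
func (i *bufferIterator) Next() (Record, error) { return <-i.buf, nil }

// drain reads every buffered record. Guarding Next with HasNext means the
// loop never blocks on an empty buffer.
func drain(i *bufferIterator) []Record {
	var out []Record
	for i.HasNext() {
		r, err := i.Next()
		if err != nil {
			break
		}
		out = append(out, r)
	}
	return out
}

func main() {
	it := newBufferIterator(4)
	it.Push(Record{Payload: "insert"})
	it.Push(Record{Payload: "update"})
	for _, r := range drain(it) {
		fmt.Println(r.Payload)
	}
}
```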
type Snapshotter ¶
type Snapshotter struct {
// contains filtered or unexported fields
}
Snapshotter implements the Iterator interface for capturing an initial table snapshot.
func NewSnapshotter ¶
func NewSnapshotter(ctx context.Context, db *sql.DB, table string, columns []string, key string) (*Snapshotter, error)
NewSnapshotter returns a Snapshotter that is an Iterator.
* NewSnapshotter attempts to load the SQL rows into the Snapshotter and immediately begins returning them to subsequent Read calls.
* It acquires a read-only transaction lock before reading the table.
* If Teardown is called while a snapshot is in progress, it returns an ErrSnapshotInterrupt error.
func (*Snapshotter) HasNext ¶
func (s *Snapshotter) HasNext() bool
HasNext returns whether s.rows has another row.
* It must be called before Snapshotter#Next, or Next will fail.
* It increments the internal position if another row exists.
* If HasNext is called and no rows are available, it marks the snapshot as complete and returns false.
func (*Snapshotter) Next ¶
func (s *Snapshotter) Next() (record.Record, error)
Next returns the next record in the buffer.
* If the snapshot is complete, it returns an ErrNoRows error.
* If the Snapshotter has no rows property set, it returns an ErrNoRows error.
* If Next is called after HasNext has returned false, it returns an ErrNoRows error.
func (*Snapshotter) Teardown ¶
func (s *Snapshotter) Teardown() error
Teardown cleans up the database snapshotter by committing the transaction and closing the connection to sql.Rows.
* If the snapshot is not complete yet, it returns an ErrSnapshotInterrupt error.
* Teardown must be called by the caller. It is not called automatically when the snapshot completes.
* Teardown handles all of its manual cleanup first, then calls cancel to stop any unhandled contexts that it has received.
type Source ¶
type Source struct {
// inherit from Mutex so we can gain locks on our Source.
sync.Mutex
// contains filtered or unexported fields
}
Source holds a connection to the database.
func (*Source) Open ¶
Open attempts to open a database connection to Postgres. We use the `with` pattern here to mutate the Source struct at each point.
func (*Source) Read ¶
Read attempts to increment the Position and then queries for the row at that Position.
* It builds the payload from the rows and assigns a timestamp, key, metadata, payload, and position to the record.
* Read takes the _current_ position of the connector, and returns the row at the _next_ position.