source

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Jan 27, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNoRows is returned when there are no rows to read.
	// * This can happen if the database is closed early, if there are no
	// rows in the result set, or if there are no results left to return.
	ErrNoRows = cerrors.Errorf("no more rows")
	// ErrSnapshotInterrupt is returned when Teardown or another signal cancels
	// an in-progress snapshot.
	ErrSnapshotInterrupt = cerrors.Errorf("interrupted snapshot")
)
var (
	// ErrNotFound is returned when no record is found at a given Position.
	ErrNotFound = cerrors.New("record not found")
	// ErrNoTable is returned when no table is configured for reading.
	ErrNoTable = cerrors.New("must provide a table name")
	// ErrInvalidURL is returned when the DB can't be connected to with the
	// provided URL.
	ErrInvalidURL = cerrors.New("incorrect url")
)
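
Callers usually branch on these sentinel values. The sketch below shows one way to do that with errors.Is. It uses the standard library errors package as a stand-in for cerrors, which is an assumption, and classify and wrapped are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors. The real values are created
// with cerrors; using the standard errors package here is an assumption.
var (
	ErrNoRows            = errors.New("no more rows")
	ErrSnapshotInterrupt = errors.New("interrupted snapshot")
)

// classify is a hypothetical helper that maps an error to a short label.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNoRows):
		return "exhausted"
	case errors.Is(err, ErrSnapshotInterrupt):
		return "interrupted"
	default:
		return "other"
	}
}

// wrapped returns ErrNoRows wrapped with extra context. errors.Is still
// matches the sentinel because %w keeps it in the error chain.
func wrapped() error {
	return fmt.Errorf("reading table: %w", ErrNoRows)
}
```

Comparing with errors.Is rather than == keeps the check working when an intermediate layer adds context to the error.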

Functions

This section is empty.

Types

type CDCIterator

type CDCIterator struct {
	// contains filtered or unexported fields
}

CDCIterator listens for events from the Postgres write-ahead log (WAL) and pushes them into its buffer. It iterates through that buffer so that callers have a controlled way to get one record at a time from the CDC buffer without exposing a loop to the main Read.

func NewIterator

func NewIterator(rch chan record.Record) *CDCIterator

NewIterator creates an iterator from a channel and sets its position to 0.

func (*CDCIterator) HasNext

func (i *CDCIterator) HasNext() bool

HasNext returns true if there is an item in the buffer.

func (*CDCIterator) Next

func (i *CDCIterator) Next() (record.Record, error)

Next returns the next record in the buffer. This is a blocking operation: call it only after HasNext has returned true, otherwise it blocks until a record is pushed into the buffer.

func (*CDCIterator) Push

func (i *CDCIterator) Push(r record.Record)

Push appends a Record to the buffer.

func (*CDCIterator) Teardown

func (i *CDCIterator) Teardown() error

Teardown is a no-op that returns nil, since the buffer requires no cleanup.
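
The HasNext-then-Next discipline described for CDCIterator can be sketched with a buffered channel. Everything below is hypothetical: Record stands in for record.Record, and chanIterator is an illustration of the pattern, not the package's unexported implementation. It also assumes a single consumer, since the check in HasNext can go stale if another goroutine reads from the buffer between HasNext and Next.

```go
package main

// Record is a hypothetical stand-in for record.Record.
type Record struct {
	Key string
}

// chanIterator is a hypothetical buffer-backed iterator.
type chanIterator struct {
	buf chan Record
}

// newChanIterator creates an iterator whose buffer holds up to size records.
func newChanIterator(size int) *chanIterator {
	return &chanIterator{buf: make(chan Record, size)}
}

// Push appends a record to the buffer. It blocks while the buffer is full.
func (i *chanIterator) Push(r Record) { i.buf <- r }

// HasNext reports whether at least one record is buffered.
func (i *chanIterator) HasNext() bool { return len(i.buf) > 0 }

// Next returns the next buffered record and blocks if the buffer is empty.
func (i *chanIterator) Next() (Record, error) { return <-i.buf, nil }

// Teardown is a no-op because the buffer needs no cleanup.
func (i *chanIterator) Teardown() error { return nil }

// drain reads every buffered record. Guarding each Next with HasNext means
// the loop never blocks on an empty buffer.
func drain(it *chanIterator) []Record {
	var out []Record
	for it.HasNext() {
		r, err := it.Next()
		if err != nil {
			break
		}
		out = append(out, r)
	}
	return out
}
```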

type Iterator

type Iterator interface {
	HasNext() bool
	Next() (record.Record, error)
	Teardown() error
}

Iterator defines an iterator interface that all Iterators must fulfill.

type Snapshotter

type Snapshotter struct {
	// contains filtered or unexported fields
}

Snapshotter implements the Iterator interface for capturing an initial table snapshot.

func NewSnapshotter

func NewSnapshotter(ctx context.Context,
	db *sql.DB,
	table string,
	columns []string,
	key string,
) (*Snapshotter, error)

NewSnapshotter returns a Snapshotter that is an Iterator.
* NewSnapshotter attempts to load the SQL rows into the Snapshotter and immediately begins returning them to subsequent Read calls.
* It acquires a read-only transaction lock before reading the table.
* If Teardown is called while a snapshot is in progress, it returns an ErrSnapshotInterrupt error.

func (*Snapshotter) HasNext

func (s *Snapshotter) HasNext() bool

HasNext returns whether s.rows has another row.
* It must be called before Snapshotter.Next, or Next will fail.
* It increments the internal position if another row exists.
* If HasNext is called and no rows are available, it marks the snapshot as complete and returns false.

func (*Snapshotter) Next

func (s *Snapshotter) Next() (record.Record, error)

Next returns the next record in the buffer.
* If the snapshot is complete, it returns an ErrNoRows error.
* If s.rows is unset, it returns an ErrNoRows error.
* If Next is called after HasNext has returned false, it returns an ErrNoRows error.

func (*Snapshotter) Teardown

func (s *Snapshotter) Teardown() error

Teardown cleans up the database snapshotter by committing and closing the connection to sql.Rows.
* If the snapshot is not yet complete, it returns an ErrSnapshotInterrupt error.
* Teardown must be called by the caller; it is not called automatically when the snapshot completes.
* Teardown handles all of its manual cleanup first, then calls cancel to stop any unhandled contexts that it has received.
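
The HasNext, Next, and Teardown contract documented for Snapshotter can be modeled in memory. The sketch below is an illustration only: fakeSnapshot and readAll are hypothetical names, the rows are plain strings instead of database rows, and the standard errors package stands in for cerrors. Returning ErrNoRows when Next is called without a preceding HasNext is a modeling choice here; the documentation only says that the call fails.

```go
package main

import "errors"

// Stand-ins for the package's sentinel errors (assumption: plain errors).
var (
	ErrNoRows            = errors.New("no more rows")
	ErrSnapshotInterrupt = errors.New("interrupted snapshot")
)

// fakeSnapshot is a hypothetical in-memory model of the documented contract.
type fakeSnapshot struct {
	rows     []string
	pos      int  // number of rows HasNext has advanced past
	ready    bool // HasNext found a row that Next has not returned yet
	complete bool // HasNext has reported that no rows remain
}

// HasNext advances the internal position if another row exists. When no
// rows remain it marks the snapshot complete and returns false.
func (s *fakeSnapshot) HasNext() bool {
	if s.pos < len(s.rows) {
		s.pos++
		s.ready = true
		return true
	}
	s.ready = false
	s.complete = true
	return false
}

// Next returns the row selected by the last HasNext call, or ErrNoRows if
// the snapshot is complete or HasNext was not called first.
func (s *fakeSnapshot) Next() (string, error) {
	if s.complete || !s.ready {
		return "", ErrNoRows
	}
	s.ready = false
	return s.rows[s.pos-1], nil
}

// Teardown reports ErrSnapshotInterrupt if the snapshot did not finish.
func (s *fakeSnapshot) Teardown() error {
	if !s.complete {
		return ErrSnapshotInterrupt
	}
	return nil
}

// readAll drains the snapshot and then tears it down, as a caller must.
func readAll(s *fakeSnapshot) ([]string, error) {
	var out []string
	for s.HasNext() {
		row, err := s.Next()
		if err != nil {
			return out, err
		}
		out = append(out, row)
	}
	return out, s.Teardown()
}
```

In this model, calling Teardown before the loop has seen HasNext return false yields ErrSnapshotInterrupt, which mirrors the interrupted-snapshot case described above.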

type Source

type Source struct {

	// embed Mutex so we can lock our Source.
	sync.Mutex
	// contains filtered or unexported fields
}

Source holds a connection to the database.
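
The struct definition embeds sync.Mutex, which promotes Lock and Unlock onto the containing type. The sketch below illustrates that pattern with a hypothetical guarded struct and counter field; the real Source's fields are unexported and are not reproduced here.

```go
package main

import "sync"

// guarded is a hypothetical struct that embeds sync.Mutex the same way the
// Source definition does. Lock and Unlock are promoted to guarded itself.
type guarded struct {
	sync.Mutex
	reads int
}

// recordRead increments the counter while holding the embedded mutex.
func (g *guarded) recordRead() int {
	g.Lock()
	defer g.Unlock()
	g.reads++
	return g.reads
}

// hammer calls recordRead from n goroutines and returns the final count.
func hammer(g *guarded, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.recordRead()
		}()
	}
	wg.Wait()
	g.Lock()
	defer g.Unlock()
	return g.reads
}
```

One consequence of embedding is that Lock and Unlock become part of the type's exported method set, so code outside the package can lock the struct too. A named unexported field avoids that if it is not wanted.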

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, pos record.Position) error

func (*Source) Open

func (s *Source) Open(ctx context.Context, cfg plugins.Config) error

Open attempts to open a database connection to Postgres. We use the `with` pattern here to mutate the Source struct at each point.

func (*Source) Read

func (s *Source) Read(ctx context.Context, pos record.Position) (record.Record, error)

Read attempts to increment the Position and then queries for the row at that Position.
* It builds the payload from the rows and assigns a timestamp, key, metadata, payload, and position to the record.
* Read takes the _current_ position of the connector and returns the row at the _next_ position.
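
The current-position-in, next-row-out behavior of Read can be sketched as follows. This is a model under several assumptions that the documentation does not confirm: Position is treated as a byte slice holding a decimal row number, an empty Position means that nothing has been read yet, and a map stands in for the table. The names nextPosition and read are hypothetical.

```go
package main

import (
	"errors"
	"strconv"
)

// Position is a hypothetical stand-in for record.Position. Encoding it as a
// decimal row number is an assumption made for this sketch.
type Position []byte

// ErrNotFound stands in for the package's sentinel error.
var ErrNotFound = errors.New("record not found")

// nextPosition parses the current position and returns the one after it.
// An empty position is assumed to mean that nothing has been read yet.
func nextPosition(pos Position) (int64, error) {
	if len(pos) == 0 {
		return 1, nil
	}
	cur, err := strconv.ParseInt(string(pos), 10, 64)
	if err != nil {
		return 0, err
	}
	return cur + 1, nil
}

// read takes the current position and returns the row at the next position
// together with that new position, or ErrNotFound if no such row exists.
func read(table map[int64]string, pos Position) (string, Position, error) {
	n, err := nextPosition(pos)
	if err != nil {
		return "", nil, err
	}
	row, ok := table[n]
	if !ok {
		return "", nil, ErrNotFound
	}
	return row, Position(strconv.FormatInt(n, 10)), nil
}
```

A caller would feed each returned position back into the following call, so the position it stores always identifies the last row it received.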

func (*Source) Teardown

func (s *Source) Teardown() error

Teardown hits the killswitch and waits for the database connection and CDC subscriptions to close.

func (*Source) Validate

func (s *Source) Validate(cfg plugins.Config) error

Validate opens a connection to the DB to check that it succeeds, then calls Teardown to drop the DB connection and clean up.
