Documentation ¶
Index ¶
- Constants
- Variables
- func Backup(ctx context.Context, db *sql.DB, w io.Writer) error
- func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)
- type CDCPublisher
- type CDCSubscriber
- type Change
- type ChangeSet
- type ChangeSetInterceptor
- type ChangeSetSerializer
- type Conn
- type ConnectHookFn
- type Connector
- func (c *Connector) Close()
- func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)
- func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)
- func (c *Connector) Driver() driver.Driver
- func (c *Connector) LatestSeq() uint64
- func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
- func (c *Connector) RemoveConsumer(ctx context.Context, name string) error
- func (c *Connector) TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)
- type DBSnapshotter
- type Driver
- type EmbeddedNatsConfig
- type JSONPublisher
- type NATSPublisher
- type NATSSnapshotter
- type NATSSubscriber
- type NoopPublisher
- type NoopSnapshotter
- type NoopSubscriber
- type Option
- func WithCDCPublisher(pub CDCPublisher) Option
- func WithCDCSubscriber(sub CDCSubscriber) Option
- func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
- func WithConnectHook(fn ConnectHookFn) Option
- func WithDBSnapshotter(snap DBSnapshotter) Option
- func WithDeliverPolicy(deliverPolicy string) Option
- func WithDisableDDLSync() Option
- func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
- func WithExtensions(extensions ...string) Option
- func WithName(name string) Option
- func WithNatsOptions(options ...nats.Option) Option
- func WithPublisherTimeout(timeout time.Duration) Option
- func WithReplicas(replicas int) Option
- func WithReplicationStream(stream string) Option
- func WithReplicationURL(url string) Option
- func WithSnapshotInterval(interval time.Duration) Option
- func WithStreamMaxAge(maxAge time.Duration) Option
- func WithWaitFor(ch chan struct{}) Option
- type SequenceProvider
- type Statement
- func (s *Statement) Begin() bool
- func (s *Statement) Columns() []string
- func (s *Statement) Commit() bool
- func (s *Statement) DDL() bool
- func (s *Statement) HasDistinct() bool
- func (s *Statement) HasReturning() bool
- func (s *Statement) IsCreateTable() bool
- func (s *Statement) IsDelete() bool
- func (s *Statement) IsExplain() bool
- func (s *Statement) IsInsert() bool
- func (s *Statement) IsSelect() bool
- func (s *Statement) IsUpdate() bool
- func (s *Statement) Parameters() []string
- func (s *Statement) Rollback() bool
- func (s *Statement) Source() string
- func (s *Statement) Type() string
- type WriterPublisher
Constants ¶
View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"
Variables ¶
View Source
var (
ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrNatsNotConfigured = errors.New("NATS not configured")
Functions ¶
func LatestSnapshot ¶ added in v0.0.4
Types ¶
type CDCPublisher ¶
type CDCSubscriber ¶ added in v0.0.6
type Change ¶
type Change struct {
Database string `json:"database,omitempty"`
Table string `json:"table,omitempty"`
Columns []string `json:"columns,omitempty"`
Operation string `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
OldRowID int64 `json:"old_rowid,omitempty"`
NewRowID int64 `json:"new_rowid,omitempty"`
OldValues []any `json:"old_values,omitempty"`
NewValues []any `json:"new_values,omitempty"`
Command string `json:"command,omitempty"`
Args []any `json:"args,omitempty"`
}
type ChangeSet ¶
type ChangeSet struct {
Node string `json:"node"`
ProcessID int64 `json:"process_id"`
Filename string `json:"filename"`
Changes []Change `json:"changes"`
Timestamp int64 `json:"timestamp_ns"`
StreamSeq uint64 `json:"-"`
// contains filtered or unexported fields
}
func NewChangeSet ¶
func NewChangeSet(node string, filename string, publisher CDCPublisher) *ChangeSet
func (*ChangeSet) Send ¶
func (cs *ChangeSet) Send(pub CDCPublisher) error
func (*ChangeSet) SetInterceptor ¶ added in v0.0.6
func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)
type ChangeSetInterceptor ¶ added in v0.0.6
type ChangeSetSerializer ¶ added in v0.0.6
type Conn ¶
type Conn struct {
*sqlite3.SQLiteConn
// contains filtered or unexported fields
}
type ConnectHookFn ¶ added in v0.0.2
type ConnectHookFn func(conn *sqlite3.SQLiteConn) error
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
func (*Connector) DeliveredInfo ¶
func (*Connector) LatestSnapshot ¶
func (*Connector) RemoveConsumer ¶
type DBSnapshotter ¶ added in v0.0.6
type Driver ¶ added in v0.0.2
type Driver struct {
Extensions []string
ConnectHook ConnectHookFn
Options []Option
}
type EmbeddedNatsConfig ¶
type JSONPublisher ¶ added in v0.0.6
type JSONPublisher struct {
// contains filtered or unexported fields
}
func NewJSONPublisher ¶ added in v0.0.6
func NewJSONPublisher(w io.Writer) *JSONPublisher
func (*JSONPublisher) Publish ¶ added in v0.0.6
func (p *JSONPublisher) Publish(cs *ChangeSet) error
type NATSPublisher ¶ added in v0.0.6
type NATSPublisher struct {
// contains filtered or unexported fields
}
func NewNATSPublisher ¶ added in v0.0.6
func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)
func (*NATSPublisher) Publish ¶ added in v0.0.6
func (p *NATSPublisher) Publish(cs *ChangeSet) error
type NATSSnapshotter ¶ added in v0.0.6
type NATSSnapshotter struct {
// contains filtered or unexported fields
}
func NewNATSSnapshotter ¶ added in v0.0.6
func (*NATSSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NATSSnapshotter) LatestSnapshotSequence ¶ added in v0.0.6
func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)
func (*NATSSnapshotter) TakeSnapshot ¶ added in v0.0.6
type NATSSubscriber ¶ added in v0.0.6
type NATSSubscriber struct {
// contains filtered or unexported fields
}
func NewNATSSubscriber ¶ added in v0.0.6
func (*NATSSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NATSSubscriber) LatestSeq ¶ added in v0.0.6
func (s *NATSSubscriber) LatestSeq() uint64
func (*NATSSubscriber) RemoveConsumer ¶ added in v0.0.6
func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NATSSubscriber) Start ¶ added in v0.0.6
func (s *NATSSubscriber) Start() error
type NoopPublisher ¶ added in v0.0.6
type NoopPublisher struct{}
func NewNoopPublisher ¶ added in v0.0.6
func NewNoopPublisher() *NoopPublisher
func (*NoopPublisher) Publish ¶ added in v0.0.6
func (p *NoopPublisher) Publish(cs *ChangeSet) error
type NoopSnapshotter ¶ added in v0.0.6
type NoopSnapshotter struct {
// contains filtered or unexported fields
}
func NewNoopSnapshotter ¶ added in v0.0.6
func NewNoopSnapshotter() *NoopSnapshotter
func (*NoopSnapshotter) LatestSnapshot ¶ added in v0.0.6
func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)
func (*NoopSnapshotter) TakeSnapshot ¶ added in v0.0.6
type NoopSubscriber ¶ added in v0.0.6
type NoopSubscriber struct{}
func NewNoopSubscriber ¶ added in v0.0.6
func NewNoopSubscriber() *NoopSubscriber
func (*NoopSubscriber) DeliveredInfo ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq ¶ added in v0.0.6
func (*NoopSubscriber) LatestSeq() uint64
func (*NoopSubscriber) RemoveConsumer ¶ added in v0.0.6
func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error
func (*NoopSubscriber) Start ¶ added in v0.0.6
func (*NoopSubscriber) Start() error
type Option ¶
type Option func(*Connector)
func WithCDCPublisher ¶
func WithCDCPublisher(pub CDCPublisher) Option
func WithCDCSubscriber ¶ added in v0.0.6
func WithCDCSubscriber(sub CDCSubscriber) Option
func WithChangeSetInterceptor ¶ added in v0.0.6
func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option
func WithConnectHook ¶ added in v0.0.2
func WithConnectHook(fn ConnectHookFn) Option
func WithDBSnapshotter ¶ added in v0.0.6
func WithDBSnapshotter(snap DBSnapshotter) Option
func WithDeliverPolicy ¶
func WithDisableDDLSync ¶ added in v0.0.4
func WithDisableDDLSync() Option
func WithEmbeddedNatsConfig ¶
func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option
func WithExtensions ¶
func WithNatsOptions ¶
func WithPublisherTimeout ¶
func WithReplicas ¶
func WithReplicationStream ¶ added in v0.0.9
func WithReplicationURL ¶
func WithSnapshotInterval ¶
func WithStreamMaxAge ¶
func WithWaitFor ¶ added in v0.0.4
func WithWaitFor(ch chan struct{}) Option
type SequenceProvider ¶
type SequenceProvider interface {
LatestSeq() uint64
}
type Statement ¶
type Statement struct {
// contains filtered or unexported fields
}
func ParseStatement ¶ added in v0.0.4
func (*Statement) HasDistinct ¶
func (*Statement) HasReturning ¶
func (*Statement) IsCreateTable ¶
func (*Statement) Parameters ¶
type WriterPublisher ¶ added in v0.0.6
type WriterPublisher struct {
// contains filtered or unexported fields
}
func NewWriterPublisher ¶ added in v0.0.6
func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher
func (*WriterPublisher) Publish ¶ added in v0.0.6
func (p *WriterPublisher) Publish(cs *ChangeSet) error
Source Files ¶
Click to show internal directories.
Click to hide internal directories.