ha

package module
v0.11.7 Latest
Published: May 2, 2026 License: Apache-2.0 Imports: 33 Imported by: 6

README

go-ha

A Go database/sql base driver providing high availability for SQLite databases through NATS-based replication.

Features

  • High Availability: Ensure your SQLite databases remain accessible and consistent across multiple nodes.
  • Replication: Synchronize data across nodes using NATS messaging.
  • Flexible Conflict Resolution: Customize replication strategies and handle conflicts with ChangeSetInterceptor.
  • Cluster Modes:
    • Leaderless: Read/write from any node with last-writer-wins resolution.
    • Leader-based: Redirect writes to a leader to prevent conflicts.
  • NATS Integration: Choose between embedded or external NATS servers.
  • Seamless Integration: Drop-in replacement for existing Go database/sql usage.
  • gRPC Support: Expose database operations via gRPC for remote access to the SQLite database.
  • Snapshotting: Automatic database snapshots for efficient synchronization.
  • Cross-database Queries: Transparently execute cross-shard queries using SQL hints /*+ db=DSN */.
  • Transaction Undo: Revert already committed transactions.

Architecture

go-ha enables high availability for SQLite by replicating changes across multiple nodes using NATS as the messaging backbone. Each node maintains a local SQLite database and publishes changes to a NATS stream. Other nodes subscribe to this stream and apply the changes, ensuring data consistency.
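
The flow described above can be sketched in plain Go without NATS or SQLite. This is only an illustration under stated assumptions: the `change`, `node`, and `stream` types are hypothetical stand-ins (a map for the local database, a slice for the NATS stream), and conflicts are resolved last-writer-wins by timestamp, as in leaderless mode. It is not how go-ha is implemented internally.

```go
package main

import "fmt"

// change is a hypothetical, simplified replication event: one key/value
// write stamped with the writer's clock.
type change struct {
	Node  string
	Key   string
	Value string
	TsNs  int64
}

// node holds a map standing in for a local SQLite database.
type node struct {
	name string
	data map[string]change
}

func newNode(name string) *node {
	return &node{name: name, data: make(map[string]change)}
}

// apply keeps the incoming change only if it is strictly newer than the
// stored one (last-writer-wins). Ties keep the existing value.
func (n *node) apply(c change) bool {
	cur, ok := n.data[c.Key]
	if ok && cur.TsNs >= c.TsNs {
		return false
	}
	n.data[c.Key] = c
	return true
}

// stream is an in-memory stand-in for the NATS stream: an ordered log
// that every subscriber replays.
type stream struct {
	log []change
}

func (s *stream) publish(c change) { s.log = append(s.log, c) }

// replay applies every logged change to the given node.
func (s *stream) replay(n *node) {
	for _, c := range s.log {
		n.apply(c)
	}
}

func main() {
	s := &stream{}
	a, b := newNode("a"), newNode("b")

	// Each node writes locally, then publishes the change.
	w1 := change{Node: "a", Key: "user:1", Value: "Alice", TsNs: 100}
	a.apply(w1)
	s.publish(w1)
	w2 := change{Node: "b", Key: "user:1", Value: "Alicia", TsNs: 200}
	b.apply(w2)
	s.publish(w2)

	// Both nodes replay the shared stream and converge on the newest write.
	s.replay(a)
	s.replay(b)
	fmt.Println(a.data["user:1"].Value, b.data["user:1"].Value)
}
```

Replaying the same stream on every node is what makes the nodes converge: the order of delivery does not matter as long as each node applies the same comparison rule.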

Comparison with SQLite Session Extension

go-ha provides a modern alternative to SQLite's session extension with enhanced capabilities:

Feature | go-ha | Session Extension
DDL Replication
DML Replication
Multi-node Cluster
NATS Integration
Conflict Resolution | ✓ (Customizable) | Limited
gRPC Support
Cross-database Queries

go-ha is designed for distributed systems requiring schema synchronization across nodes, while the session extension is suited for simpler peer-to-peer synchronization.

Installation

go get github.com/litesql/go-ha

Quick Start

Basic Usage
Instance 1
package main

import (
	"database/sql"
	"log/slog"

	"github.com/litesql/go-ha"
	sqlite3ha "github.com/litesql/go-sqlite3-ha"
)

func main() {
	slog.SetLogLoggerLevel(slog.LevelDebug)

	// Open a connector with HA enabled: an embedded NATS server on port 4222
	// and a gRPC server on port 5000.
	c, err := sqlite3ha.NewConnector("file:my.db?_journal=WAL&_timeout=5000",
		ha.WithName("instance1"),
		ha.WithReplicationID("example"),
		ha.WithGrpcPort(5000),
		ha.WithEmbeddedNatsConfig(&ha.EmbeddedNatsConfig{
			Port: 4222,
		}))
	if err != nil {
		panic(err)
	}
	defer c.Close()

	db := sql.OpenDB(c)
	defer db.Close()

	// Use like any other database/sql driver
	_, err = db.Exec("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
	if err != nil {
		panic(err)
	}

	// Insert data
	_, err = db.Exec("INSERT INTO users (name) VALUES (?)", "Alice")
	if err != nil {
		panic(err)
	}
}
Instance 2 (leader-based)
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"

	// Imported for its side effect of registering the "sqlite3-ha" driver.
	_ "github.com/litesql/go-sqlite3-ha"
)

func main() {
	slog.SetLogLoggerLevel(slog.LevelDebug)

	// All HA options are passed as DSN parameters. Writes are redirected over
	// gRPC to the static leader (instance 1) at localhost:5000.
	db, err := sql.Open("sqlite3-ha", "file:my2.db?_journal=WAL&_timeout=5000&replicationURL=nats://localhost:4222&replicationID=example&name=instance2&grpcInsecure=true&leaderProvider=static:localhost:5000")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	_, err = db.Exec("INSERT INTO users(name) values('grpc leader redirect')")
	if err != nil {
		panic(err)
	}

	// Read back the most recently inserted row.
	var name string
	err = db.QueryRowContext(context.Background(), "SELECT name FROM users ORDER BY rowid desc LIMIT 1").Scan(&name)
	if err != nil {
		panic(err)
	}

	fmt.Println("User:", name)
}
Cross-Database Queries

Execute queries across multiple databases using SQL hints:

// Query data from all loaded databases from the driver
rows, err := db.Query("/*+ db=.* */ SELECT * FROM users WHERE id = ?", 1)
if err != nil {
    panic(err)
}
defer rows.Close()
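
To make the hint syntax concrete, here is a small sketch of how a leading `/*+ db=PATTERN */` hint could be split from a query and matched against a list of DSNs. The helpers `splitHint` and `matchDSNs` are hypothetical and not part of go-ha, and treating the pattern as an unanchored regular expression over the DSN text is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// hintRe matches a leading hint of the form /*+ db=PATTERN */.
var hintRe = regexp.MustCompile(`^\s*/\*\+\s*db=(\S+)\s*\*/\s*`)

// splitHint returns the db pattern and the remaining SQL.
// ok is false when the query carries no leading hint.
func splitHint(query string) (pattern, rest string, ok bool) {
	m := hintRe.FindStringSubmatch(query)
	if m == nil {
		return "", query, false
	}
	return m[1], strings.TrimSpace(query[len(m[0]):]), true
}

// matchDSNs returns the DSNs matched by the pattern (assumed here to be
// an unanchored regular expression).
func matchDSNs(pattern string, dsns []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("invalid db pattern %q: %w", pattern, err)
	}
	var out []string
	for _, dsn := range dsns {
		if re.MatchString(dsn) {
			out = append(out, dsn)
		}
	}
	return out, nil
}

func main() {
	pattern, rest, ok := splitHint("/*+ db=.* */ SELECT * FROM users WHERE id = ?")
	fmt.Println(pattern, ok)
	fmt.Println(rest)

	targets, err := matchDSNs(pattern, []string{"file:my.db", "file:my2.db"})
	if err != nil {
		panic(err)
	}
	fmt.Println(targets)
}
```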

Drivers

go-ha provides several driver implementations:

Configuration Options

DSN Parameter | Description | Default
asyncPublisher | Enables asynchronous publishing of replication events. | false
asyncPublisherOutboxDir | Directory to store outbox files for asynchronous publishing. |
autoStart | Automatically starts subscriber and snapshotter on initialization. | true
replicationID | Unique ID for this replication instance. | [database filename]
deliverPolicy | Delivery policy for replication events (all, last, etc.). | all
disableSubscriber | Disables the replication subscriber. | false
disablePublisher | Disables the replication publisher. | false
disableDBSnapshotter | Disables database snapshotter for initial sync. | false
disableDDLSync | Disables synchronization of DDL changes. | false
grpcInsecure | Use insecure gRPC connections. | false
grpcPort | TCP port for gRPC server. |
grpcTimeout | Timeout for gRPC operations. | 5s
grpcToken | Authentication token for gRPC server. |
leaderProvider | Leader election strategy (e.g., dynamic:local-host:port, static:remote-host:port). |
name | Node name in the cluster. |
natsConfigFile | Path to NATS server config file. |
natsName | Name for embedded NATS server. |
natsPort | Port for embedded NATS server. | 4222
natsStoreDir | Data directory for embedded NATS server. |
publisherTimeout | Timeout for publishing replication events. | 15s
replicationStream | Name of the NATS stream for replication. |
replicationURL | NATS server URL for replication. |
replicas | Number of replicas for high availability. | 1
rowIdentify | Row identification strategy: pk, rowid, or full. | pk
snapshotInterval | Interval between database snapshots. | 1m
streamMaxAge | Maximum age of messages in replication stream. |
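
Long DSNs like the one in the Instance 2 example are easier to maintain when assembled from a map. The sketch below uses a hypothetical helper, `buildDSN`, that is not part of go-ha. It writes values verbatim, as the README examples do, and does not escape values that contain `&` or `=`.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildDSN appends params to a base DSN as key=value pairs in sorted key
// order, so the result is deterministic. Values are written verbatim.
func buildDSN(base string, params map[string]string) string {
	if len(params) == 0 {
		return base
	}
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+params[k])
	}

	// Continue an existing query string if the base already has one.
	sep := "?"
	if strings.Contains(base, "?") {
		sep = "&"
	}
	return base + sep + strings.Join(pairs, "&")
}

func main() {
	dsn := buildDSN("file:my2.db?_journal=WAL", map[string]string{
		"name":           "instance2",
		"replicationID":  "example",
		"replicationURL": "nats://localhost:4222",
		"leaderProvider": "static:localhost:5000",
		"grpcInsecure":   "true",
	})
	fmt.Println(dsn)
}
```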

Performance Considerations

  • Use embedded NATS for single-machine deployments to reduce latency.
  • Configure snapshotInterval based on your write frequency.
  • For high-throughput scenarios, consider leader-based clusters to avoid conflicts.
  • Monitor NATS stream size and adjust streamMaxAge to prevent unbounded growth.

Troubleshooting

Common Issues
  • Replication not working: Ensure NATS server is running and accessible via replicationURL.
  • Conflicts in leaderless mode: Implement a custom ChangeSetInterceptor for complex conflict resolution.
  • Slow synchronization: Check snapshotter configuration and network latency.
  • gRPC connection errors: Verify grpcPort and grpcToken settings.
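
For the first item, a quick TCP probe can rule out basic connectivity problems before digging into replication settings. This is a minimal sketch with hypothetical helpers (`natsHostPort`, `checkReachable`). It only checks that the port accepts connections, not that a healthy NATS server is answering, and it assumes port 4222 (the `natsPort` default listed above) when the URL omits one.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"time"
)

// natsHostPort extracts host:port from a replicationURL such as
// nats://localhost:4222, defaulting to port 4222 when none is given.
func natsHostPort(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	if u.Hostname() == "" {
		return "", fmt.Errorf("no host in %q", rawURL)
	}
	port := u.Port()
	if port == "" {
		port = "4222"
	}
	return net.JoinHostPort(u.Hostname(), port), nil
}

// checkReachable opens and immediately closes a TCP connection to the
// address derived from rawURL.
func checkReachable(rawURL string, timeout time.Duration) error {
	addr, err := natsHostPort(rawURL)
	if err != nil {
		return err
	}
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return fmt.Errorf("cannot reach %s: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	if err := checkReachable("nats://localhost:4222", 2*time.Second); err != nil {
		fmt.Println("replication endpoint unreachable:", err)
		return
	}
	fmt.Println("replication endpoint reachable")
}
```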

Projects Using go-ha

  • HA: Highly available leaderless SQLite cluster with HTTP and PostgreSQL Wire Protocol
  • PocketBase HA: Highly available leaderless PocketBase cluster
  • sqlc-http: Generate net/http Go server from SQL

Contributing

We welcome contributions! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests if applicable
  5. Submit a pull request

For major changes, please open an issue first to discuss the proposed changes.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypePragma             = "PRAGMA"
	TypeOther              = "OTHER"
)
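
As a rough illustration of what these type labels denote, the sketch below classifies a statement by its leading keywords. It is a deliberately naive, hypothetical `classify` helper: the package exposes `Parse` and `ParseStatement` for real parsing, and this prefix check does not handle comments, `WITH` clauses, or variants such as `CREATE UNIQUE INDEX`.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixes lists the statement types to test against the normalized SQL text.
var prefixes = []string{
	"CREATE VIRTUAL TABLE", "CREATE TABLE", "CREATE INDEX", "CREATE VIEW",
	"CREATE TRIGGER", "ALTER TABLE", "EXPLAIN", "SELECT", "INSERT", "UPDATE",
	"DELETE", "VACUUM", "DROP", "ANALYZE", "BEGIN", "COMMIT", "ROLLBACK",
	"SAVEPOINT", "RELEASE", "PRAGMA",
}

// isWordByte reports whether c can continue an upper-cased identifier.
func isWordByte(c byte) bool {
	return c == '_' || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
}

// classify returns the type label whose keywords start the statement, or
// "OTHER" when none match.
func classify(query string) string {
	// Collapse whitespace and upper-case so "create   table" matches.
	norm := strings.ToUpper(strings.Join(strings.Fields(query), " "))
	for _, p := range prefixes {
		if !strings.HasPrefix(norm, p) {
			continue
		}
		// Require a keyword boundary so "SELECTED" is not a SELECT.
		if len(norm) == len(p) || !isWordByte(norm[len(p)]) {
			return p
		}
	}
	return "OTHER"
}

func main() {
	for _, q := range []string{
		"select * from users",
		"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY)",
		"pragma journal_mode",
		"WITH t AS (SELECT 1) SELECT * FROM t",
	} {
		fmt.Printf("%-12s <- %s\n", classify(q), q)
	}
}
```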
const DefaultStream = "ha_replication"
const TXCookieName = "_txseq"

Variables

var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
var ErrSnapshotterNotConfigured = errors.New("snapshotter not configured")

Functions

func ConnectHandler added in v0.8.0

func ConnectHandler(opts ...connect.HandlerOption) (path string, handler http.Handler)

func ContextLocalDB added in v0.11.4

func ContextLocalDB(ctx context.Context, useLocalDB bool) context.Context

func CrossShardQuery added in v0.5.8

func CrossShardQuery(ctx context.Context, stmt *Statement, args []driver.NamedValue, queryRouter *regexp.Regexp, driverConn driverConnFunc) (driver.Rows, error)

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

func ListDSN added in v0.1.3

func ListDSN() []string

func ListReplicationIDs added in v0.4.5

func ListReplicationIDs() []string

func LocalDB added in v0.11.4

func LocalDB(ctx context.Context) bool

func Shutdown added in v0.0.16

func Shutdown()

Types

type AsyncNATSPublisher added in v0.0.15

type AsyncNATSPublisher struct {
	*NATSPublisher
	// contains filtered or unexported fields
}

func NewAsyncNATSPublisher added in v0.0.15

func NewAsyncNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig, db *sql.DB) (*AsyncNATSPublisher, error)

func (*AsyncNATSPublisher) Close added in v0.0.15

func (p *AsyncNATSPublisher) Close() error

func (*AsyncNATSPublisher) Publish added in v0.0.15

func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error

func (*AsyncNATSPublisher) Sequence added in v0.0.20

func (p *AsyncNATSPublisher) Sequence() uint64

type BackupFn added in v0.0.10

type BackupFn func(context.Context, *sql.DB, io.Writer) error

type CDCPublisher

type CDCPublisher interface {
	Publish(data []DebeziumData) error
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	PKColumns []string `json:"pk_columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	Command   string   `json:"command,omitempty"`
	Args      []any    `json:"args,omitempty"`
	TsNs      int64    `json:"ts_ns,omitempty"`
}

func (Change) PKColumnsNames added in v0.2.0

func (c Change) PKColumnsNames() []string

func (Change) PKNewValues added in v0.2.1

func (c Change) PKNewValues() []any

func (Change) PKOldValues added in v0.2.0

func (c Change) PKOldValues() []any

func (Change) Reverse added in v0.10.0

func (c Change) Reverse() Change
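
One way to picture what reversing a change means: an INSERT becomes a DELETE of the inserted row, a DELETE becomes an INSERT of the old row, and an UPDATE swaps old and new values. The sketch below implements that idea on a hypothetical `change` type that copies a few documented field names. It is an assumption about the semantics, not the package's actual `Reverse` implementation.

```go
package main

import "fmt"

// change copies a subset of the documented Change fields.
type change struct {
	Table     string
	Operation string // "INSERT", "UPDATE", "DELETE", ...
	OldRowID  int64
	NewRowID  int64
	OldValues []any
	NewValues []any
}

// reverse returns the change that would undo c. Operations other than
// INSERT, UPDATE and DELETE are returned unchanged in this sketch.
func (c change) reverse() change {
	r := c
	switch c.Operation {
	case "INSERT":
		r.Operation = "DELETE"
	case "DELETE":
		r.Operation = "INSERT"
	case "UPDATE":
		// Stays an UPDATE; only the direction flips.
	default:
		return c
	}
	r.OldRowID, r.NewRowID = c.NewRowID, c.OldRowID
	r.OldValues, r.NewValues = c.NewValues, c.OldValues
	return r
}

func main() {
	ins := change{Table: "users", Operation: "INSERT", NewRowID: 1, NewValues: []any{1, "Alice"}}
	undo := ins.reverse()
	fmt.Println(undo.Operation, undo.OldRowID, undo.OldValues)
}
```

Under this model, reversing twice yields the original change, which is a useful property to test.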

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	Subject   string   `json:"-"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, replicationID string) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) (err error)

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) DebeziumData added in v0.3.0

func (cs *ChangeSet) DebeziumData() []DebeziumData

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub Publisher) error

func (*ChangeSet) SetConnProvider added in v0.0.10

func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)

func (*ChangeSet) SetInterceptor added in v0.0.6

func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)

func (*ChangeSet) SetStrategy added in v0.10.5

func (cs *ChangeSet) SetStrategy(t sqlStrategy)

type ChangeSetInterceptor added in v0.0.6

type ChangeSetInterceptor interface {
	BeforeApply(*ChangeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*ChangeSet, *sql.Conn, error) error
}
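
A custom interceptor only has to satisfy the two methods above. The sketch below defines local `changeSet` and `interceptor` types that mirror the documented method set so that it compiles on its own. The `nodeFilter` policy (skip change sets from blocked nodes, count failed applies) is a hypothetical example, and returning the apply error unchanged from `AfterApply` is an assumption about how the result is used.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// changeSet mirrors two fields of the documented ChangeSet.
type changeSet struct {
	Node      string
	Timestamp int64
}

// interceptor mirrors the documented ChangeSetInterceptor method set,
// using the local changeSet type.
type interceptor interface {
	BeforeApply(*changeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*changeSet, *sql.Conn, error) error
}

// nodeFilter skips change sets from blocked nodes and counts failures.
type nodeFilter struct {
	blocked  map[string]bool
	failures int
}

// Compile-time check that nodeFilter satisfies the interface.
var _ interceptor = (*nodeFilter)(nil)

func (f *nodeFilter) BeforeApply(cs *changeSet, _ *sql.Conn) (bool, error) {
	if cs == nil {
		return false, errors.New("nil change set")
	}
	return f.blocked[cs.Node], nil
}

func (f *nodeFilter) AfterApply(_ *changeSet, _ *sql.Conn, applyErr error) error {
	if applyErr != nil {
		f.failures++
	}
	return applyErr
}

func main() {
	f := &nodeFilter{blocked: map[string]bool{"untrusted": true}}
	for _, n := range []string{"instance1", "untrusted"} {
		skip, err := f.BeforeApply(&changeSet{Node: n}, nil)
		fmt.Println(n, "skip:", skip, "err:", err)
	}
}
```

With the real package, an implementation of `ChangeSetInterceptor` would be registered through `WithChangeSetInterceptor`, which appears in the index below.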

type ChangeSetSerializer added in v0.0.6

type ChangeSetSerializer func(*ChangeSet) ([]byte, error)
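
A serializer of this shape can be as small as a call to `json.Marshal`. The sketch below uses local `change` and `changeSet` types that copy a subset of the documented fields and JSON tags. The names `serializer` and `jsonSerializer` are hypothetical, chosen so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// change copies a subset of the documented Change fields and tags.
type change struct {
	Table     string `json:"table,omitempty"`
	Operation string `json:"operation"`
	NewValues []any  `json:"new_values,omitempty"`
}

// changeSet copies a subset of the documented ChangeSet fields and tags.
type changeSet struct {
	Node      string   `json:"node"`
	Changes   []change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
}

// serializer has the same shape as the documented ChangeSetSerializer,
// using the local changeSet type.
type serializer func(*changeSet) ([]byte, error)

// jsonSerializer encodes the change set as compact JSON.
func jsonSerializer(cs *changeSet) ([]byte, error) {
	return json.Marshal(cs)
}

func main() {
	var s serializer = jsonSerializer
	out, err := s(&changeSet{
		Node:      "instance1",
		Timestamp: 5,
		Changes: []change{
			{Table: "users", Operation: "INSERT", NewValues: []any{1, "Alice"}},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```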

type ConnHooksConfig added in v0.4.5

type ConnHooksConfig struct {
	NodeName             string
	ReplicationID        string
	DisableDDLSync       bool
	Publisher            Publisher
	CDC                  CDCPublisher
	TxSeqTrackerProvider TxSeqTrackerProvider
	Leader               LeaderProvider
	GrpcTimeout          time.Duration
	GrpcToken            string
	GrpcInsecure         bool
	QueryRouter          *regexp.Regexp
}

type ConnHooksFactory added in v0.0.10

type ConnHooksFactory func(cfg ConnHooksConfig) ConnHooksProvider

type ConnHooksProvider added in v0.0.10

type ConnHooksProvider interface {
	RegisterHooks(driver.Conn, *Connector) (driver.Conn, error)
	DisableHooks(*sql.Conn) error
	EnableHooks(*sql.Conn) error
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func LookupConnector added in v0.1.0

func LookupConnector(dsn string) (*Connector, bool)

func LookupConnectorByReplicationID added in v0.4.5

func LookupConnectorByReplicationID(id string) (*Connector, bool)

func NewConnector

func NewConnector(dsn string, drv driver.Driver, connHooksFactory ConnHooksFactory, backupFn BackupFn, options ...Option) (*Connector, error)

func (*Connector) Backup added in v0.8.0

func (c *Connector) Backup(ctx context.Context, writer io.Writer) error

func (*Connector) CDCPublisher added in v0.3.0

func (c *Connector) CDCPublisher() CDCPublisher

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) ConsistentReader added in v0.1.0

func (c *Connector) ConsistentReader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ConsistentReaderFunc added in v0.1.0

func (c *Connector) ConsistentReaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) DB added in v0.4.5

func (c *Connector) DB() *sql.DB

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) ForwardToLeader added in v0.1.0

func (c *Connector) ForwardToLeader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ForwardToLeaderFunc added in v0.1.0

func (c *Connector) ForwardToLeaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) HistoryBySeq added in v0.10.5

func (c *Connector) HistoryBySeq(ctx context.Context, startSeq uint64) ([]haconnect.HistoryItem, error)

func (*Connector) HistoryByTime added in v0.10.5

func (c *Connector) HistoryByTime(ctx context.Context, duration time.Duration) ([]haconnect.HistoryItem, error)

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) LeaderProvider added in v0.1.1

func (c *Connector) LeaderProvider() LeaderProvider

func (*Connector) NodeName added in v0.0.10

func (c *Connector) NodeName() string

func (*Connector) ProxiedDB added in v0.11.4

func (c *Connector) ProxiedDB() *sql.DB

func (*Connector) PubSeq added in v0.4.5

func (c *Connector) PubSeq() uint64

func (*Connector) Publisher added in v0.0.10

func (c *Connector) Publisher() Publisher

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) ResponseWriter added in v0.1.3

func (c *Connector) ResponseWriter(w http.ResponseWriter) http.ResponseWriter

func (*Connector) SetProxiedDB added in v0.11.4

func (c *Connector) SetProxiedDB(db *sql.DB)

func (*Connector) Snapshotter added in v0.0.13

func (c *Connector) Snapshotter() DBSnapshotter

func (*Connector) Start added in v0.1.1

func (c *Connector) Start(db *sql.DB) error

func (*Connector) Subscriber added in v0.0.13

func (c *Connector) Subscriber() Subscriber

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

func (*Connector) UndoBySeq added in v0.10.5

func (c *Connector) UndoBySeq(ctx context.Context, startSeq uint64, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error

func (*Connector) UndoByTime added in v0.10.3

func (c *Connector) UndoByTime(ctx context.Context, duration time.Duration, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error

type DBPublisher added in v0.10.7

type DBPublisher struct {
	// contains filtered or unexported fields
}

func NewDBPublisher added in v0.10.7

func NewDBPublisher(db *sql.DB, maxAge time.Duration) (*DBPublisher, error)

func (*DBPublisher) Close added in v0.10.8

func (p *DBPublisher) Close() error

func (*DBPublisher) Publish added in v0.10.7

func (p *DBPublisher) Publish(cs *ChangeSet) error

func (*DBPublisher) Sequence added in v0.10.7

func (p *DBPublisher) Sequence() uint64

type DBSnapshotter added in v0.0.6

type DBSnapshotter interface {
	DB() *sql.DB
	SetDB(*sql.DB)
	Start()
	TakeSnapshot(ctx context.Context) (sequence uint64, err error)
	LatestSnapshot(ctx context.Context) (sequence uint64, reader io.ReadCloser, err error)
}

type DBSubscriber added in v0.10.7

type DBSubscriber struct {
	// contains filtered or unexported fields
}

func NewDBSubscriber added in v0.10.7

func NewDBSubscriber(cfg DBSubscriberConfig) (*DBSubscriber, error)

func (*DBSubscriber) DB added in v0.11.0

func (s *DBSubscriber) DB() *sql.DB

func (*DBSubscriber) DeliveredInfo added in v0.10.7

func (s *DBSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*DBSubscriber) HistoryBySeq added in v0.10.7

func (s *DBSubscriber) HistoryBySeq(ctx context.Context, startSeq uint64) ([]haconnect.HistoryItem, error)

func (*DBSubscriber) HistoryByTime added in v0.10.7

func (s *DBSubscriber) HistoryByTime(ctx context.Context, duration time.Duration) ([]haconnect.HistoryItem, error)

func (*DBSubscriber) LatestSeq added in v0.10.7

func (s *DBSubscriber) LatestSeq() uint64

func (*DBSubscriber) RemoveConsumer added in v0.10.7

func (s *DBSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*DBSubscriber) SetDB added in v0.10.7

func (s *DBSubscriber) SetDB(db *sql.DB)

func (*DBSubscriber) Start added in v0.10.7

func (s *DBSubscriber) Start() error

func (*DBSubscriber) UndoBySeq added in v0.10.7

func (s *DBSubscriber) UndoBySeq(ctx context.Context, startSeq uint64, filter haconnect.UndoFilter, filterEntities map[string][]int64) error

func (*DBSubscriber) UndoByTime added in v0.10.7

func (s *DBSubscriber) UndoByTime(ctx context.Context, duration time.Duration, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error

type DBSubscriberConfig added in v0.10.7

type DBSubscriberConfig struct {
	HistoryDB    *sql.DB
	DB           *sql.DB
	ConnProvider ConnHooksProvider
	Interceptor  ChangeSetInterceptor
	RowIdentify  RowIdentify
}

type DebeziumData added in v0.3.0

type DebeziumData struct {
	Schema      any                 `json:"schema"`
	Payload     DebeziumPayload     `json:"payload"`
	Transaction DebeziumTransaction `json:"transaction"`
}

type DebeziumPayload added in v0.3.0

type DebeziumPayload struct {
	Before map[string]any `json:"before"`
	After  map[string]any `json:"after"`
	Source DebeziumSource `json:"source"`
	Op     string         `json:"op"`
	TsNs   int64          `json:"ts_ns"`
}

type DebeziumSource added in v0.3.0

type DebeziumSource struct {
	Version   string `json:"version"`
	Connector string `json:"connector"`
	Name      string `json:"name"`
	ServerID  int64  `json:"server_id"`
	TsNs      int64  `json:"ts_ns"`
	DB        string `json:"db"`
	Table     string `json:"table"`
}

type DebeziumTransaction added in v0.3.0

type DebeziumTransaction struct {
	ID string `json:"id"`
}

type DynamicLeader added in v0.1.1

type DynamicLeader struct {
	// contains filtered or unexported fields
}

func (*DynamicLeader) IsLeader added in v0.1.1

func (d *DynamicLeader) IsLeader() bool

func (*DynamicLeader) Ready added in v0.1.1

func (d *DynamicLeader) Ready() chan struct{}

func (*DynamicLeader) RedirectTarget added in v0.1.1

func (d *DynamicLeader) RedirectTarget() string

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name           string
	Port           int
	StoreDir       string
	User           string
	Pass           string
	File           string
	WebSocketPort  int
	WebSocketNoTLS bool
	EnableLogs     bool
}

type JSONPublisher added in v0.0.6

type JSONPublisher struct {
	// contains filtered or unexported fields
}

func NewJSONPublisher added in v0.0.6

func NewJSONPublisher(w io.Writer) *JSONPublisher

func (*JSONPublisher) Publish added in v0.0.6

func (p *JSONPublisher) Publish(cs *ChangeSet) error

func (*JSONPublisher) Sequence added in v0.0.20

func (p *JSONPublisher) Sequence() uint64

type LeaderProvider added in v0.1.0

type LeaderProvider interface {
	IsLeader() bool
	Ready() chan struct{}
	RedirectTarget() string
}
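
To show how a caller might consume this interface, the sketch below defines a local copy of the method set and a hypothetical `fixedLeader` that compares its own address with a fixed target. It assumes that a closed `Ready` channel means the leader is known. The package's own `StaticLeader` and `DynamicLeader` may behave differently.

```go
package main

import "fmt"

// leaderProvider copies the documented LeaderProvider method set.
type leaderProvider interface {
	IsLeader() bool
	Ready() chan struct{}
	RedirectTarget() string
}

// fixedLeader is a hypothetical provider for a topology that never changes.
type fixedLeader struct {
	self   string
	target string
	ready  chan struct{}
}

func newFixedLeader(self, target string) *fixedLeader {
	ready := make(chan struct{})
	close(ready) // the leader is known up front, so signal readiness now
	return &fixedLeader{self: self, target: target, ready: ready}
}

func (f *fixedLeader) IsLeader() bool         { return f.self == f.target }
func (f *fixedLeader) Ready() chan struct{}   { return f.ready }
func (f *fixedLeader) RedirectTarget() string { return f.target }

// routeWrite waits until the provider is ready, then decides where a
// write should go.
func routeWrite(p leaderProvider) string {
	<-p.Ready()
	if p.IsLeader() {
		return "apply locally"
	}
	return "redirect to " + p.RedirectTarget()
}

func main() {
	fmt.Println(routeWrite(newFixedLeader("localhost:5000", "localhost:5000")))
	fmt.Println(routeWrite(newFixedLeader("localhost:5001", "localhost:5000")))
}
```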

type NATSPublisher added in v0.0.6

type NATSPublisher struct {
	// contains filtered or unexported fields
}

func NewNATSPublisher added in v0.0.6

func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)

func (*NATSPublisher) Publish added in v0.0.6

func (p *NATSPublisher) Publish(cs *ChangeSet) error

func (*NATSPublisher) Sequence added in v0.0.20

func (p *NATSPublisher) Sequence() uint64

type NATSSnapshotter added in v0.0.6

type NATSSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNATSSnapshotter added in v0.0.6

func NewNATSSnapshotter(ctx context.Context, nc *nats.Conn, replicas int, stream string, db *sql.DB, backupFn BackupFn, interval time.Duration, sequenceProvider SequenceProvider, objectName string) (*NATSSnapshotter, error)

func (*NATSSnapshotter) DB added in v0.11.0

func (s *NATSSnapshotter) DB() *sql.DB

func (*NATSSnapshotter) LatestSnapshot added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NATSSnapshotter) LatestSnapshotSequence added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)

func (*NATSSnapshotter) SetDB added in v0.0.13

func (s *NATSSnapshotter) SetDB(db *sql.DB)

func (*NATSSnapshotter) Start added in v0.0.13

func (s *NATSSnapshotter) Start()

func (*NATSSnapshotter) TakeSnapshot added in v0.0.6

func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NATSSubscriber added in v0.0.6

type NATSSubscriber struct {
	// contains filtered or unexported fields
}

func NewNATSSubscriber added in v0.0.6

func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)

func (*NATSSubscriber) DB added in v0.11.0

func (s *NATSSubscriber) DB() *sql.DB

func (*NATSSubscriber) DeliveredInfo added in v0.0.6

func (s *NATSSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NATSSubscriber) HistoryBySeq added in v0.10.5

func (s *NATSSubscriber) HistoryBySeq(ctx context.Context, startSeq uint64) ([]haconnect.HistoryItem, error)

func (*NATSSubscriber) HistoryByTime added in v0.10.5

func (s *NATSSubscriber) HistoryByTime(ctx context.Context, duration time.Duration) ([]haconnect.HistoryItem, error)

func (*NATSSubscriber) LatestSeq added in v0.0.6

func (s *NATSSubscriber) LatestSeq() uint64

func (*NATSSubscriber) RemoveConsumer added in v0.0.6

func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NATSSubscriber) SetDB added in v0.0.13

func (s *NATSSubscriber) SetDB(db *sql.DB)

func (*NATSSubscriber) Start added in v0.0.6

func (s *NATSSubscriber) Start() error

func (*NATSSubscriber) UndoBySeq added in v0.10.5

func (s *NATSSubscriber) UndoBySeq(ctx context.Context, startSeq uint64, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error

func (*NATSSubscriber) UndoByTime added in v0.10.3

func (s *NATSSubscriber) UndoByTime(ctx context.Context, duration time.Duration, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error

type NATSSubscriberConfig added in v0.0.21

type NATSSubscriberConfig struct {
	Node         string
	Durable      string
	NatsConn     *nats.Conn
	Stream       string
	Subject      string
	Policy       string
	DB           *sql.DB
	ConnProvider ConnHooksProvider
	Interceptor  ChangeSetInterceptor
	RowIdentify  RowIdentify
}

type NoopPublisher added in v0.0.6

type NoopPublisher struct{}

func NewNoopPublisher added in v0.0.6

func NewNoopPublisher() *NoopPublisher

func (*NoopPublisher) Publish added in v0.0.6

func (p *NoopPublisher) Publish(cs *ChangeSet) error

func (*NoopPublisher) Sequence added in v0.0.20

func (p *NoopPublisher) Sequence() uint64

type NoopSnapshotter added in v0.0.6

type NoopSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNoopSnapshotter added in v0.0.6

func NewNoopSnapshotter() *NoopSnapshotter

func (*NoopSnapshotter) DB added in v0.11.0

func (s *NoopSnapshotter) DB() *sql.DB

func (*NoopSnapshotter) LatestSnapshot added in v0.0.6

func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NoopSnapshotter) SetDB added in v0.0.13

func (s *NoopSnapshotter) SetDB(_ *sql.DB)

func (*NoopSnapshotter) Start added in v0.0.13

func (s *NoopSnapshotter) Start()

func (*NoopSnapshotter) TakeSnapshot added in v0.0.6

func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NoopSubscriber added in v0.0.6

type NoopSubscriber struct{}

func NewNoopSubscriber added in v0.0.6

func NewNoopSubscriber() *NoopSubscriber

func (*NoopSubscriber) DB added in v0.11.0

func (s *NoopSubscriber) DB() *sql.DB

func (*NoopSubscriber) DeliveredInfo added in v0.0.6

func (*NoopSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NoopSubscriber) HistoryBySeq added in v0.10.5

func (*NoopSubscriber) HistoryBySeq(ctx context.Context, startSeq uint64) ([]haconnect.HistoryItem, error)

func (*NoopSubscriber) HistoryByTime added in v0.10.5

func (*NoopSubscriber) HistoryByTime(ctx context.Context, duration time.Duration) ([]haconnect.HistoryItem, error)

func (*NoopSubscriber) LatestSeq added in v0.0.6

func (s *NoopSubscriber) LatestSeq() uint64

func (*NoopSubscriber) RemoveConsumer added in v0.0.6

func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NoopSubscriber) SetDB added in v0.0.13

func (s *NoopSubscriber) SetDB(db *sql.DB)

func (*NoopSubscriber) Start added in v0.0.6

func (s *NoopSubscriber) Start() error

func (*NoopSubscriber) UndoBySeq added in v0.10.5

func (*NoopSubscriber) UndoBySeq(ctx context.Context, startSeq uint64, filter haconnect.UndoFilter, filterEntities map[string][]int64) error

func (*NoopSubscriber) UndoByTime added in v0.10.3

func (*NoopSubscriber) UndoByTime(ctx context.Context, duration time.Duration, filter haconnect.UndoFilter, filterEntities map[string][]int64) error

type Option

type Option func(*Connector)

func NameToOptions added in v0.0.10

func NameToOptions(name string) (string, []Option, error)

func WithAsyncPublisher added in v0.0.15

func WithAsyncPublisher() Option

func WithAsyncPublisherOutboxDir added in v0.0.15

func WithAsyncPublisherOutboxDir(dir string) Option

func WithAutoStart added in v0.1.0

func WithAutoStart(enabled bool) Option

func WithCDCPublisher

func WithCDCPublisher(p CDCPublisher) Option

func WithChangeSetInterceptor added in v0.0.6

func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option

func WithClusterSize added in v0.1.1

func WithClusterSize(size int) Option

func WithDBSnapshotter added in v0.0.6

func WithDBSnapshotter(snap DBSnapshotter) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithGrpcInsecure added in v0.9.2

func WithGrpcInsecure(insecure bool) Option

func WithGrpcPort added in v0.4.5

func WithGrpcPort(port int) Option

func WithGrpcTimeout added in v0.4.7

func WithGrpcTimeout(timeout time.Duration) Option

func WithGrpcToken added in v0.7.0

func WithGrpcToken(token string) Option

func WithLeaderElectionLocalTarget added in v0.1.1

func WithLeaderElectionLocalTarget(localEndpoint string) Option

func WithLeaderProvider added in v0.1.1

func WithLeaderProvider(p LeaderProvider) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithProxiedDB added in v0.11.4

func WithProxiedDB(db *sql.DB) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithQueryRouter added in v0.5.5

func WithQueryRouter(re *regexp.Regexp) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationID added in v0.3.0

func WithReplicationID(id string) Option

func WithReplicationPublisher added in v0.3.0

func WithReplicationPublisher(pub Publisher) Option

func WithReplicationStream added in v0.0.9

func WithReplicationStream(stream string) Option

func WithReplicationSubscriber added in v0.3.0

func WithReplicationSubscriber(sub Subscriber) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithRowIdentify added in v0.0.21

func WithRowIdentify(i RowIdentify) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type Publisher added in v0.3.0

type Publisher interface {
	Publish(cs *ChangeSet) error
	Sequence() uint64
}
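
A custom publisher needs only these two methods, which makes an in-memory implementation handy for tests. The sketch below uses a local `changeSet` type and a hypothetical `memoryPublisher`. Treating `Sequence` as the number of change sets published so far is an assumption, since the documentation does not state what the value represents.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// changeSet is a minimal stand-in for the documented ChangeSet.
type changeSet struct {
	Node string
}

// publisher copies the documented Publisher method set, using the local
// changeSet type.
type publisher interface {
	Publish(cs *changeSet) error
	Sequence() uint64
}

// memoryPublisher records change sets in memory and is safe for
// concurrent use.
type memoryPublisher struct {
	mu   sync.Mutex
	sets []*changeSet
	seq  uint64
}

// Compile-time check that memoryPublisher satisfies the interface.
var _ publisher = (*memoryPublisher)(nil)

func (p *memoryPublisher) Publish(cs *changeSet) error {
	if cs == nil {
		return errors.New("nil change set")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sets = append(p.sets, cs)
	p.seq++
	return nil
}

func (p *memoryPublisher) Sequence() uint64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.seq
}

func main() {
	p := &memoryPublisher{}
	for _, n := range []string{"instance1", "instance2"} {
		if err := p.Publish(&changeSet{Node: n}); err != nil {
			panic(err)
		}
	}
	fmt.Println("published:", p.Sequence())
}
```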

type RowIdentify added in v0.0.21

type RowIdentify string
const (
	PK    RowIdentify = "pk"
	Rowid RowIdentify = "rowid"
	Full  RowIdentify = "full"
)
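
When the strategy comes from user input such as a flag or config file, it helps to validate it before building options. The sketch below uses a hypothetical `parseRowIdentify` helper over a local copy of the type and constants. Accepting mixed case and falling back to `pk` for an empty string (the default listed in the configuration table) are choices made for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// rowIdentify copies the documented RowIdentify type and its values.
type rowIdentify string

const (
	pk    rowIdentify = "pk"
	rowid rowIdentify = "rowid"
	full  rowIdentify = "full"
)

// parseRowIdentify validates s and returns the matching strategy.
// An empty string yields the default, pk.
func parseRowIdentify(s string) (rowIdentify, error) {
	switch v := rowIdentify(strings.ToLower(strings.TrimSpace(s))); v {
	case "":
		return pk, nil
	case pk, rowid, full:
		return v, nil
	default:
		return "", fmt.Errorf("unknown rowIdentify %q (want pk, rowid, or full)", s)
	}
}

func main() {
	for _, in := range []string{"", "RowID", "full", "uuid"} {
		v, err := parseRowIdentify(in)
		fmt.Printf("%q -> %q, err=%v\n", in, v, err)
	}
}
```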

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, source string) (*Statement, error)

func UnverifiedStatement added in v0.0.12

func UnverifiedStatement(source string, hasDistinct bool,
	hasReturning bool, typ string, parameters []string, columns []string,
	ddl bool, hasIfExists bool, hasModifier bool,
	modifiesDatabase bool, aggregateFunctions map[int]*sql.Call) *Statement

func (*Statement) AggregateFunctions added in v0.6.5

func (s *Statement) AggregateFunctions() map[int]*sql.Call

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) Limit added in v0.5.9

func (s *Statement) Limit() int

func (*Statement) ModifiesDatabase added in v0.0.12

func (s *Statement) ModifiesDatabase() bool

func (*Statement) OrderBy added in v0.5.6

func (s *Statement) OrderBy() []string

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) RewriteQueryToAggregate added in v0.6.5

func (s *Statement) RewriteQueryToAggregate() (query string, aggregateFunctions map[int]*sql.Call, newColumns map[int]int, err error)

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) SourceWithIfExists added in v0.0.10

func (s *Statement) SourceWithIfExists() string

func (*Statement) Type

func (s *Statement) Type() string

func (*Statement) Visit added in v0.6.0

func (s *Statement) Visit(n sql.Node) (w sql.Visitor, node sql.Node, err error)

func (*Statement) VisitEnd added in v0.6.0

func (s *Statement) VisitEnd(n sql.Node) (sql.Node, error)

type StaticLeader added in v0.1.0

type StaticLeader struct {
	Target string
}

func (*StaticLeader) IsLeader added in v0.1.0

func (s *StaticLeader) IsLeader() bool

func (*StaticLeader) Ready added in v0.1.1

func (s *StaticLeader) Ready() chan struct{}

func (*StaticLeader) RedirectTarget added in v0.1.0

func (s *StaticLeader) RedirectTarget() string

type Subscriber added in v0.3.0

type Subscriber interface {
	TxSeqTracker
	DB() *sql.DB
	SetDB(*sql.DB)
	Start() error
	RemoveConsumer(ctx context.Context, name string) error
	DeliveredInfo(ctx context.Context, name string) (any, error)
	HistoryBySeq(ctx context.Context, startSeq uint64) ([]haconnect.HistoryItem, error)
	HistoryByTime(ctx context.Context, duration time.Duration) ([]haconnect.HistoryItem, error)
	UndoBySeq(ctx context.Context, startSeq uint64, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error
	UndoByTime(ctx context.Context, duration time.Duration, filterType haconnect.UndoFilter, filterEntities map[string][]int64) error
}

type TxSeqTracker added in v0.4.8

type TxSeqTracker interface {
	LatestSeq() uint64
}

type TxSeqTrackerProvider added in v0.4.8

type TxSeqTrackerProvider func() TxSeqTracker

type WriterPublisher added in v0.0.6

type WriterPublisher struct {
	// contains filtered or unexported fields
}

func NewWriterPublisher added in v0.0.6

func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher

func (*WriterPublisher) Publish added in v0.0.6

func (p *WriterPublisher) Publish(cs *ChangeSet) error

func (*WriterPublisher) Sequence added in v0.0.20

func (p *WriterPublisher) Sequence() uint64

Directories

Path Synopsis
api
