ha

package module
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2025 License: Apache-2.0 Imports: 25 Imported by: 6

README

go-ha

Go database/sql driver based on github.com/mattn/go-sqlite3, providing high availability for SQLite databases.

Features

  • Built on the robust foundation of go-sqlite3.
  • High availability support for SQLite databases.
  • Replication: Synchronize data across nodes using NATS.
  • Customize the replication strategy
  • Leaderless clusters: Read/Write from/to any node. Last-writer wins by default, but you can customize conflict resolutions by implementing ChangeSetInterceptor.
  • Embedded or External NATS: Choose between an embedded NATS server or an external one for replication.
  • Easy to integrate with existing Go projects.

Installation

go get github.com/litesql/go-ha

Usage

package main

import (
    "database/sql"
    _ "github.com/litesql/go-ha"
)

func main() {
    db, err := sql.Open("ha", "file:example.db?_journal=WAL&_timeout=5000&replicationURL=nats://broker:4222&name=node0")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Use db to interact with your database
}
Using Connector
package main

import (
    "database/sql"
    "github.com/litesql/go-ha"
)

func main() {
    c, err := ha.NewConnector("file:example.db?_journal=WAL&_timeout=5000",
		ha.WithName("node1"),
		ha.WithEmbeddedNatsConfig(&ha.EmbeddedNatsConfig{
			Port: 4222,
		}))
	if err != nil {
		panic(err)
	}
	defer c.Close()

	db := sql.OpenDB(c)
	defer db.Close()

    // Use db to interact with your database
}

Configuration

Connector option DSN param Description Default
WithName name Unique node name $HOSTNAME
WithReplicationURL replicationURL NATS connection URL. (nats://localhost:4222)
WithEmbeddedNatsConfig
  • natsPort
  • natsStoreDir
  • natsConfigFile
NATS embedded server config
WithDisableDDLSync disableDDLSync Disable replication of DDL commands
WithPublisherTimeout publisherTimeout Publisher timeout 15s
WithChangeSetInterceptor Customize the replication behaviour
WithExtensions SQLite extensions to load

Projects using go-ha

  • HA: Highly available leaderless SQLite cluster with HTTP and PostgreSQL Wire Protocol
  • PocketBase HA: Highly available leaderless PocketBase cluster

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"

Variables

View Source
var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrNatsNotConfigured = errors.New("NATS not configured")

Functions

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

Types

type BackupFn added in v0.0.10

type BackupFn func(context.Context, *sql.DB, io.Writer) error

type CDCPublisher

type CDCPublisher interface {
	Publish(cs *ChangeSet) error
}

type CDCSubscriber added in v0.0.6

type CDCSubscriber interface {
	Start() error
	LatestSeq() uint64
	RemoveConsumer(ctx context.Context, name string) error
	DeliveredInfo(ctx context.Context, name string) (any, error)
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	Command   string   `json:"command,omitempty"`
	Args      []any    `json:"args,omitempty"`
}

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, filename string) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) (err error)

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub CDCPublisher) error

func (*ChangeSet) SetConnProvider added in v0.0.10

func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)

func (*ChangeSet) SetInterceptor added in v0.0.6

func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)

type ChangeSetInterceptor added in v0.0.6

type ChangeSetInterceptor interface {
	BeforeApply(*ChangeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*ChangeSet, *sql.Conn, error) error
}

type ChangeSetSerializer added in v0.0.6

type ChangeSetSerializer func(*ChangeSet) ([]byte, error)

type ConnHooksFactory added in v0.0.10

type ConnHooksFactory func(nodeName string, filename string, disableDDSSync bool, publisher CDCPublisher) ConnHooksProvider

type ConnHooksProvider added in v0.0.10

type ConnHooksProvider interface {
	RegisterHooks(driver.Conn) (driver.Conn, error)
	DisableHooks(*sql.Conn) error
	EnableHooks(*sql.Conn) error
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector(dsn string, driver driver.Driver, connHooksFactory ConnHooksFactory, backupFn BackupFn, options ...Option) (*Connector, error)

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) NodeName added in v0.0.10

func (c *Connector) NodeName() string

func (*Connector) Publisher added in v0.0.10

func (c *Connector) Publisher() CDCPublisher

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)

type DBSnapshotter added in v0.0.6

type DBSnapshotter interface {
	TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)
	LatestSnapshot(ctx context.Context) (sequence uint64, reader io.ReadCloser, err error)
}

type DriverProvider added in v0.0.10

type DriverProvider interface {
	driver.Driver
	ConnWithoutHooks() (*sql.Conn, error)
	EnableHooks(conn *sql.Conn)
	OnConnect(c driver.Conn) (driver.Conn, error)
}

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name       string
	Port       int
	StoreDir   string
	User       string
	Pass       string
	File       string
	EnableLogs bool
}

type JSONPublisher added in v0.0.6

type JSONPublisher struct {
	// contains filtered or unexported fields
}

func NewJSONPublisher added in v0.0.6

func NewJSONPublisher(w io.Writer) *JSONPublisher

func (*JSONPublisher) Publish added in v0.0.6

func (p *JSONPublisher) Publish(cs *ChangeSet) error

type NATSPublisher added in v0.0.6

type NATSPublisher struct {
	// contains filtered or unexported fields
}

func NewNATSPublisher added in v0.0.6

func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)

func (*NATSPublisher) Publish added in v0.0.6

func (p *NATSPublisher) Publish(cs *ChangeSet) error

type NATSSnapshotter added in v0.0.6

type NATSSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNATSSnapshotter added in v0.0.6

func NewNATSSnapshotter(ctx context.Context, nc *nats.Conn, replicas int, stream string, db *sql.DB, backupFn BackupFn, interval time.Duration, sequenceProvider SequenceProvider, objectName string) (*NATSSnapshotter, error)

func (*NATSSnapshotter) LatestSnapshot added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NATSSnapshotter) LatestSnapshotSequence added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)

func (*NATSSnapshotter) TakeSnapshot added in v0.0.6

func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)

type NATSSubscriber added in v0.0.6

type NATSSubscriber struct {
	// contains filtered or unexported fields
}

func NewNATSSubscriber added in v0.0.6

func NewNATSSubscriber(node string, durable string, nc *nats.Conn, stream, subject string, policy string, db *sql.DB, connProvider ConnHooksProvider, interceptor ChangeSetInterceptor) (*NATSSubscriber, error)

func (*NATSSubscriber) DeliveredInfo added in v0.0.6

func (s *NATSSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NATSSubscriber) LatestSeq added in v0.0.6

func (s *NATSSubscriber) LatestSeq() uint64

func (*NATSSubscriber) RemoveConsumer added in v0.0.6

func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NATSSubscriber) Start added in v0.0.6

func (s *NATSSubscriber) Start() error

type NoopPublisher added in v0.0.6

type NoopPublisher struct{}

func NewNoopPublisher added in v0.0.6

func NewNoopPublisher() *NoopPublisher

func (*NoopPublisher) Publish added in v0.0.6

func (p *NoopPublisher) Publish(cs *ChangeSet) error

type NoopSnapshotter added in v0.0.6

type NoopSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNoopSnapshotter added in v0.0.6

func NewNoopSnapshotter() *NoopSnapshotter

func (*NoopSnapshotter) LatestSnapshot added in v0.0.6

func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NoopSnapshotter) TakeSnapshot added in v0.0.6

func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context, db *sql.DB) (sequence uint64, err error)

type NoopSubscriber added in v0.0.6

type NoopSubscriber struct{}

func NewNoopSubscriber added in v0.0.6

func NewNoopSubscriber() *NoopSubscriber

func (*NoopSubscriber) DeliveredInfo added in v0.0.6

func (*NoopSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NoopSubscriber) LatestSeq added in v0.0.6

func (*NoopSubscriber) LatestSeq() uint64

func (*NoopSubscriber) RemoveConsumer added in v0.0.6

func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NoopSubscriber) Start added in v0.0.6

func (*NoopSubscriber) Start() error

type Option

type Option func(*Connector)

func NameToOptions added in v0.0.10

func NameToOptions(name string) (string, []Option, error)

func WithCDCPublisher

func WithCDCPublisher(pub CDCPublisher) Option

func WithCDCSubscriber added in v0.0.6

func WithCDCSubscriber(sub CDCSubscriber) Option

func WithChangeSetInterceptor added in v0.0.6

func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option

func WithDBSnapshotter added in v0.0.6

func WithDBSnapshotter(snap DBSnapshotter) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationStream added in v0.0.9

func WithReplicationStream(stream string) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, sql string) (*Statement, error)

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) SourceWithIfExists added in v0.0.10

func (s *Statement) SourceWithIfExists() string

func (*Statement) Type

func (s *Statement) Type() string

type WriterPublisher added in v0.0.6

type WriterPublisher struct {
	// contains filtered or unexported fields
}

func NewWriterPublisher added in v0.0.6

func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher

func (*WriterPublisher) Publish added in v0.0.6

func (p *WriterPublisher) Publish(cs *ChangeSet) error

Directories

Path Synopsis
_examples
node1 command
node2 command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL