ha

package module
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: Apache-2.0 Imports: 33 Imported by: 6

README

go-ha

Go database/sql base driver providing high availability for SQLite databases.

Features

  • High availability support for SQLite databases.
  • Replication: Synchronize data across nodes using NATS.
  • Customize the replication strategy
  • Leaderless clusters: Read/Write from/to any node. Last-writer wins by default, but you can customize conflict resolutions by implementing ChangeSetInterceptor.
  • Leader-based cluster: Write operations are redirected to the leader to prevent conflicts entirely.
  • Embedded or External NATS: Choose between an embedded NATS server or an external one for replication.
  • Easy to integrate with existing Go projects.

Drivers

Options

DSN Param Environment Variable Description Default
asyncPublisher HA_ASYNC_PUBLISHER Enables asynchronous publishing of replication events. false
asyncPublisherOutboxDir HA_ASYNC_PUBLISHER_OUTBOX_DIR Directory to store outbox files for asynchronous publishing.
autoStart HA_AUTO_START Automatically starts the subscriber and snapshotter when the node is initialized. true
replicationID HA_REPLICATION_ID Replication ID [database filename]
deliverPolicy HA_DELIVER_POLICY Specifies the delivery policy for replication events. Options include all, last, etc. all
disableSubscriber HA_DISABLE_SUBSCRIBER Disables the subscriber for replication. false
disablePublisher HA_DISABLE_PUBLISHER Disables the publisher for replication. false
disableDBSnapshotter HA_DISABLE_SNAPSHOTTER Disables the database snapshotter used for initial synchronization. false
disableDDLSync HA_DISABLE_DDL_SYNC Disables the synchronization of DDL (Data Definition Language) changes across nodes. false
grpcPort HA_GRPC_PORT TCP port for the gRPC server
grpcTimeout HA_GRPC_TIMEOUT Timeout for the gRPC operations 5s
grpcToken HA_GRPC_TOKEN Token to protect gRPC server
leaderProvider HA_LEADER_PROVIDER Defines the strategy for determining a leader node in the cluster. This is useful for redirecting HTTP requests. Examples include dynamic:http://host:port or static:http://host:port.
name HA_NAME Name of the node in the cluster.
natsConfigFile HA_NATS_CONFIG_NAME Path to the configuration file for the embedded NATS server. Overrides others NATS configurations.
natsName HA_NATS_NAME Sets the name of the embedded NATS server.
natsPort HA_NATS_PORT Configures the port for the embedded NATS server. 4222
natsStoreDir HA_NATS_STORE_DIR Directory to store data for the embedded NATS server.
publisherTimeout HA_PUBLISHER_TIMEOUT Timeout duration for publishing replication events. 15s
replicationStream HA_REPLICATION_STREAM Name of the replication stream used for synchronizing data.
replicationURL HA_REPLICATION_URL URL used for connecting to the replication stream.
replicas HA_REPLICAS Number of replicas to maintain for high availability. 1
rowIdentify HA_ROW_IDENTIDY Row identify strategy: pk, rowid or full pk
snapshotInterval HA_SNAPSHOT_INTERVAL Interval for taking database snapshots. 1m
streamMaxAge HA_STREAM_MAX_AGE Maximum age of messages in the replication stream before they are removed.

Projects using go-ha

  • HA: Highly available leaderless SQLite cluster with HTTP and PostgreSQL Wire Protocol
  • PocketBase HA: Highly available leaderless PocketBase cluster
  • sqlc-http: Generate net/http go server from SQL

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	TypeExplain            = "EXPLAIN"
	TypeSelect             = "SELECT"
	TypeInsert             = "INSERT"
	TypeUpdate             = "UPDATE"
	TypeDelete             = "DELETE"
	TypeCreateTable        = "CREATE TABLE"
	TypeCreateIndex        = "CREATE INDEX"
	TypeCreateView         = "CREATE VIEW"
	TypeCreateTrigger      = "CREATE TRIGGER"
	TypeCreateVirtualTable = "CREATE VIRTUAL TABLE"
	TypeAlterTable         = "ALTER TABLE"
	TypeVacuum             = "VACUUM"
	TypeDrop               = "DROP"
	TypeAnalyze            = "ANALYZE"
	TypeBegin              = "BEGIN"
	TypeCommit             = "COMMIT"
	TypeRollback           = "ROLLBACK"
	TypeSavepoint          = "SAVEPOINT"
	TypeRelease            = "RELEASE"
	TypePragma             = "PRAGMA"
	TypeOther              = "OTHER"
)
View Source
const DefaultStream = "ha_replication"
View Source
const TXCookieName = "_txseq"

Variables

View Source
var (
	ErrInvalidSQL = fmt.Errorf("invalid SQL")
)
View Source
var ErrSnapshotterNotConfigured = errors.New("snapshotter not configured")

Functions

func ConnectHandler added in v0.8.0

func ConnectHandler(opts ...connect.HandlerOption) (path string, handler http.Handler)

func CrossShardQuery added in v0.5.8

func CrossShardQuery(ctx context.Context, stmt *Statement, args []driver.NamedValue, queryRouter *regexp.Regexp, driverConn driverConnFunc) (driver.Rows, error)

func LatestSnapshot added in v0.0.4

func LatestSnapshot(ctx context.Context, dsn string, options ...Option) (sequence uint64, reader io.ReadCloser, err error)

func ListDSN added in v0.1.3

func ListDSN() []string

func ListReplicationIDs added in v0.4.5

func ListReplicationIDs() []string

func Shutdown added in v0.0.16

func Shutdown()

Types

type AsyncNATSPublisher added in v0.0.15

type AsyncNATSPublisher struct {
	*NATSPublisher
	// contains filtered or unexported fields
}

func NewAsyncNATSPublisher added in v0.0.15

func NewAsyncNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig, db *sql.DB) (*AsyncNATSPublisher, error)

func (*AsyncNATSPublisher) Close added in v0.0.15

func (p *AsyncNATSPublisher) Close() error

func (*AsyncNATSPublisher) Publish added in v0.0.15

func (p *AsyncNATSPublisher) Publish(cs *ChangeSet) error

func (*AsyncNATSPublisher) Sequence added in v0.0.20

func (p *AsyncNATSPublisher) Sequence() uint64

type BackupFn added in v0.0.10

type BackupFn func(context.Context, *sql.DB, io.Writer) error

type CDCPublisher

type CDCPublisher interface {
	Publish(data []DebeziumData) error
}

type Change

type Change struct {
	Database  string   `json:"database,omitempty"`
	Table     string   `json:"table,omitempty"`
	Columns   []string `json:"columns,omitempty"`
	PKColumns []string `json:"pk_columns,omitempty"`
	Operation string   `json:"operation"` // "INSERT", "UPDATE", "DELETE", "SQL", "CUSTOM"
	OldRowID  int64    `json:"old_rowid,omitempty"`
	NewRowID  int64    `json:"new_rowid,omitempty"`
	OldValues []any    `json:"old_values,omitempty"`
	NewValues []any    `json:"new_values,omitempty"`
	Command   string   `json:"command,omitempty"`
	Args      []any    `json:"args,omitempty"`
	TsNs      int64    `json:"ts_ns,omitempty"`
}

func (Change) PKColumnsNames added in v0.2.0

func (c Change) PKColumnsNames() []string

func (Change) PKNewValues added in v0.2.1

func (c Change) PKNewValues() []any

func (Change) PKOldValues added in v0.2.0

func (c Change) PKOldValues() []any

type ChangeSet

type ChangeSet struct {
	Node      string   `json:"node"`
	ProcessID int64    `json:"process_id"`
	Filename  string   `json:"filename"`
	Changes   []Change `json:"changes"`
	Timestamp int64    `json:"timestamp_ns"`
	Subject   string   `json:"-"`
	StreamSeq uint64   `json:"-"`
	// contains filtered or unexported fields
}

func NewChangeSet

func NewChangeSet(node string, replicationID string) *ChangeSet

func (*ChangeSet) AddChange

func (cs *ChangeSet) AddChange(change Change)

func (*ChangeSet) Apply

func (cs *ChangeSet) Apply(db *sql.DB) (err error)

func (*ChangeSet) Clear

func (cs *ChangeSet) Clear()

func (*ChangeSet) DebeziumData added in v0.3.0

func (cs *ChangeSet) DebeziumData() []DebeziumData

func (*ChangeSet) Send

func (cs *ChangeSet) Send(pub Publisher) error

func (*ChangeSet) SetApplyStrategy added in v0.0.21

func (cs *ChangeSet) SetApplyStrategy(fn applyStrategyFn)

func (*ChangeSet) SetConnProvider added in v0.0.10

func (cs *ChangeSet) SetConnProvider(connProvider ConnHooksProvider)

func (*ChangeSet) SetInterceptor added in v0.0.6

func (cs *ChangeSet) SetInterceptor(interceptor ChangeSetInterceptor)

type ChangeSetInterceptor added in v0.0.6

type ChangeSetInterceptor interface {
	BeforeApply(*ChangeSet, *sql.Conn) (skip bool, err error)
	AfterApply(*ChangeSet, *sql.Conn, error) error
}

type ChangeSetSerializer added in v0.0.6

type ChangeSetSerializer func(*ChangeSet) ([]byte, error)

type ConnHooksConfig added in v0.4.5

type ConnHooksConfig struct {
	NodeName             string
	ReplicationID        string
	DisableDDLSync       bool
	Publisher            Publisher
	CDC                  CDCPublisher
	TxSeqTrackerProvider TxSeqTrackerProvider
	Leader               LeaderProvider
	GrpcTimeout          time.Duration
	GrpcToken            string
	QueryRouter          *regexp.Regexp
}

type ConnHooksFactory added in v0.0.10

type ConnHooksFactory func(cfg ConnHooksConfig) ConnHooksProvider

type ConnHooksProvider added in v0.0.10

type ConnHooksProvider interface {
	RegisterHooks(driver.Conn) (driver.Conn, error)
	DisableHooks(*sql.Conn) error
	EnableHooks(*sql.Conn) error
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func LookupConnector added in v0.1.0

func LookupConnector(dsn string) (*Connector, bool)

func LookupConnectorByReplicationID added in v0.4.5

func LookupConnectorByReplicationID(id string) (*Connector, bool)

func NewConnector

func NewConnector(dsn string, drv driver.Driver, connHooksFactory ConnHooksFactory, backupFn BackupFn, options ...Option) (*Connector, error)

func (*Connector) Backup added in v0.8.0

func (c *Connector) Backup(ctx context.Context, writer io.Writer) error

func (*Connector) CDCPublisher added in v0.3.0

func (c *Connector) CDCPublisher() CDCPublisher

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

func (*Connector) ConsistentReader added in v0.1.0

func (c *Connector) ConsistentReader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ConsistentReaderFunc added in v0.1.0

func (c *Connector) ConsistentReaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) DB added in v0.4.5

func (c *Connector) DB() *sql.DB

func (*Connector) DeliveredInfo

func (c *Connector) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

func (*Connector) ForwardToLeader added in v0.1.0

func (c *Connector) ForwardToLeader(timeout time.Duration, methods ...string) func(http.Handler) http.Handler

func (*Connector) ForwardToLeaderFunc added in v0.1.0

func (c *Connector) ForwardToLeaderFunc(h http.HandlerFunc, timeout time.Duration, methods ...string) http.HandlerFunc

func (*Connector) LatestSeq

func (c *Connector) LatestSeq() uint64

func (*Connector) LatestSnapshot

func (c *Connector) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*Connector) LeaderProvider added in v0.1.1

func (c *Connector) LeaderProvider() LeaderProvider

func (*Connector) NodeName added in v0.0.10

func (c *Connector) NodeName() string

func (*Connector) PubSeq added in v0.4.5

func (c *Connector) PubSeq() uint64

func (*Connector) Publisher added in v0.0.10

func (c *Connector) Publisher() Publisher

func (*Connector) RemoveConsumer

func (c *Connector) RemoveConsumer(ctx context.Context, name string) error

func (*Connector) ResponseWriter added in v0.1.3

func (c *Connector) ResponseWriter(w http.ResponseWriter) http.ResponseWriter

func (*Connector) Snapshotter added in v0.0.13

func (c *Connector) Snapshotter() DBSnapshotter

func (*Connector) Start added in v0.1.1

func (c *Connector) Start(db *sql.DB) error

func (*Connector) Subscriber added in v0.0.13

func (c *Connector) Subscriber() Subscriber

func (*Connector) TakeSnapshot

func (c *Connector) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type DBSnapshotter added in v0.0.6

type DBSnapshotter interface {
	SetDB(*sql.DB)
	Start()
	TakeSnapshot(ctx context.Context) (sequence uint64, err error)
	LatestSnapshot(ctx context.Context) (sequence uint64, reader io.ReadCloser, err error)
}

type DebeziumData added in v0.3.0

type DebeziumData struct {
	Schema      any                 `json:"schema"`
	Payload     DebeziumPayload     `json:"payload"`
	Transaction DebeziumTransaction `json:"transaction"`
}

type DebeziumPayload added in v0.3.0

type DebeziumPayload struct {
	Before map[string]any `json:"before"`
	After  map[string]any `json:"after"`
	Source DebeziumSource `json:"source"`
	Op     string         `json:"op"`
	TsNs   int64          `json:"ts_ns"`
}

type DebeziumSource added in v0.3.0

type DebeziumSource struct {
	Version   string `json:"version"`
	Connector string `json:"connector"`
	Name      string `json:"name"`
	ServerID  int64  `json:"server_id"`
	TsNs      int64  `json:"ts_ns"`
	DB        string `json:"db"`
	Table     string `json:"table"`
}

type DebeziumTransaction added in v0.3.0

type DebeziumTransaction struct {
	ID string `json:"id"`
}

type DynamicLeader added in v0.1.1

type DynamicLeader struct {
	// contains filtered or unexported fields
}

func (*DynamicLeader) IsLeader added in v0.1.1

func (d *DynamicLeader) IsLeader() bool

func (*DynamicLeader) Ready added in v0.1.1

func (d *DynamicLeader) Ready() chan struct{}

func (*DynamicLeader) RedirectTarget added in v0.1.1

func (d *DynamicLeader) RedirectTarget() string

type EmbeddedNatsConfig

type EmbeddedNatsConfig struct {
	Name       string
	Port       int
	StoreDir   string
	User       string
	Pass       string
	File       string
	EnableLogs bool
}

type JSONPublisher added in v0.0.6

type JSONPublisher struct {
	// contains filtered or unexported fields
}

func NewJSONPublisher added in v0.0.6

func NewJSONPublisher(w io.Writer) *JSONPublisher

func (*JSONPublisher) Publish added in v0.0.6

func (p *JSONPublisher) Publish(cs *ChangeSet) error

func (*JSONPublisher) Sequence added in v0.0.20

func (p *JSONPublisher) Sequence() uint64

type LeaderProvider added in v0.1.0

type LeaderProvider interface {
	IsLeader() bool
	Ready() chan struct{}
	RedirectTarget() string
}

type NATSPublisher added in v0.0.6

type NATSPublisher struct {
	// contains filtered or unexported fields
}

func NewNATSPublisher added in v0.0.6

func NewNATSPublisher(nc *nats.Conn, subject string, timeout time.Duration, streamConfig *jetstream.StreamConfig) (*NATSPublisher, error)

func (*NATSPublisher) Publish added in v0.0.6

func (p *NATSPublisher) Publish(cs *ChangeSet) error

func (*NATSPublisher) Sequence added in v0.0.20

func (p *NATSPublisher) Sequence() uint64

type NATSSnapshotter added in v0.0.6

type NATSSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNATSSnapshotter added in v0.0.6

func NewNATSSnapshotter(ctx context.Context, nc *nats.Conn, replicas int, stream string, db *sql.DB, backupFn BackupFn, interval time.Duration, sequenceProvider SequenceProvider, objectName string) (*NATSSnapshotter, error)

func (*NATSSnapshotter) LatestSnapshot added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NATSSnapshotter) LatestSnapshotSequence added in v0.0.6

func (s *NATSSnapshotter) LatestSnapshotSequence(ctx context.Context) (uint64, error)

func (*NATSSnapshotter) SetDB added in v0.0.13

func (s *NATSSnapshotter) SetDB(db *sql.DB)

func (*NATSSnapshotter) Start added in v0.0.13

func (s *NATSSnapshotter) Start()

func (*NATSSnapshotter) TakeSnapshot added in v0.0.6

func (s *NATSSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NATSSubscriber added in v0.0.6

type NATSSubscriber struct {
	// contains filtered or unexported fields
}

func NewNATSSubscriber added in v0.0.6

func NewNATSSubscriber(cfg NATSSubscriberConfig) (*NATSSubscriber, error)

func (*NATSSubscriber) DeliveredInfo added in v0.0.6

func (s *NATSSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NATSSubscriber) LatestSeq added in v0.0.6

func (s *NATSSubscriber) LatestSeq() uint64

func (*NATSSubscriber) RemoveConsumer added in v0.0.6

func (s *NATSSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NATSSubscriber) SetDB added in v0.0.13

func (s *NATSSubscriber) SetDB(db *sql.DB)

func (*NATSSubscriber) Start added in v0.0.6

func (s *NATSSubscriber) Start() error

type NATSSubscriberConfig added in v0.0.21

type NATSSubscriberConfig struct {
	Node         string
	Durable      string
	NatsConn     *nats.Conn
	Stream       string
	Subject      string
	Policy       string
	DB           *sql.DB
	ConnProvider ConnHooksProvider
	Interceptor  ChangeSetInterceptor
	RowIdentify  RowIdentify
}

type NoopPublisher added in v0.0.6

type NoopPublisher struct{}

func NewNoopPublisher added in v0.0.6

func NewNoopPublisher() *NoopPublisher

func (*NoopPublisher) Publish added in v0.0.6

func (p *NoopPublisher) Publish(cs *ChangeSet) error

func (*NoopPublisher) Sequence added in v0.0.20

func (p *NoopPublisher) Sequence() uint64

type NoopSnapshotter added in v0.0.6

type NoopSnapshotter struct {
	// contains filtered or unexported fields
}

func NewNoopSnapshotter added in v0.0.6

func NewNoopSnapshotter() *NoopSnapshotter

func (*NoopSnapshotter) LatestSnapshot added in v0.0.6

func (s *NoopSnapshotter) LatestSnapshot(ctx context.Context) (uint64, io.ReadCloser, error)

func (*NoopSnapshotter) SetDB added in v0.0.13

func (s *NoopSnapshotter) SetDB(_ *sql.DB)

func (*NoopSnapshotter) Start added in v0.0.13

func (s *NoopSnapshotter) Start()

func (*NoopSnapshotter) TakeSnapshot added in v0.0.6

func (s *NoopSnapshotter) TakeSnapshot(ctx context.Context) (sequence uint64, err error)

type NoopSubscriber added in v0.0.6

type NoopSubscriber struct{}

func NewNoopSubscriber added in v0.0.6

func NewNoopSubscriber() *NoopSubscriber

func (*NoopSubscriber) DeliveredInfo added in v0.0.6

func (*NoopSubscriber) DeliveredInfo(ctx context.Context, name string) (any, error)

func (*NoopSubscriber) LatestSeq added in v0.0.6

func (*NoopSubscriber) LatestSeq() uint64

func (*NoopSubscriber) RemoveConsumer added in v0.0.6

func (*NoopSubscriber) RemoveConsumer(ctx context.Context, name string) error

func (*NoopSubscriber) SetDB added in v0.0.13

func (*NoopSubscriber) SetDB(db *sql.DB)

func (*NoopSubscriber) Start added in v0.0.6

func (*NoopSubscriber) Start() error

type Option

type Option func(*Connector)

func NameToOptions added in v0.0.10

func NameToOptions(name string) (string, []Option, error)

func WithAsyncPublisher added in v0.0.15

func WithAsyncPublisher() Option

func WithAsyncPublisherOutboxDir added in v0.0.15

func WithAsyncPublisherOutboxDir(dir string) Option

func WithAutoStart added in v0.1.0

func WithAutoStart(enabled bool) Option

func WithCDCPublisher

func WithCDCPublisher(p CDCPublisher) Option

func WithChangeSetInterceptor added in v0.0.6

func WithChangeSetInterceptor(interceptor ChangeSetInterceptor) Option

func WithClusterSize added in v0.1.1

func WithClusterSize(size int) Option

func WithDBSnapshotter added in v0.0.6

func WithDBSnapshotter(snap DBSnapshotter) Option

func WithDeliverPolicy

func WithDeliverPolicy(deliverPolicy string) Option

func WithDisableDDLSync added in v0.0.4

func WithDisableDDLSync() Option

func WithEmbeddedNatsConfig

func WithEmbeddedNatsConfig(cfg *EmbeddedNatsConfig) Option

func WithExtensions

func WithExtensions(extensions ...string) Option

func WithGrpcPort added in v0.4.5

func WithGrpcPort(port int) Option

func WithGrpcTimeout added in v0.4.7

func WithGrpcTimeout(timeout time.Duration) Option

func WithGrpcToken added in v0.7.0

func WithGrpcToken(token string) Option

func WithLeaderElectionLocalTarget added in v0.1.1

func WithLeaderElectionLocalTarget(localEndpoint string) Option

func WithLeaderProvider added in v0.1.1

func WithLeaderProvider(p LeaderProvider) Option

func WithName

func WithName(name string) Option

func WithNatsOptions

func WithNatsOptions(options ...nats.Option) Option

func WithPublisherTimeout

func WithPublisherTimeout(timeout time.Duration) Option

func WithQueryRouter added in v0.5.5

func WithQueryRouter(re *regexp.Regexp) Option

func WithReplicas

func WithReplicas(replicas int) Option

func WithReplicationID added in v0.3.0

func WithReplicationID(id string) Option

func WithReplicationPublisher added in v0.3.0

func WithReplicationPublisher(pub Publisher) Option

func WithReplicationStream added in v0.0.9

func WithReplicationStream(stream string) Option

func WithReplicationSubscriber added in v0.3.0

func WithReplicationSubscriber(sub Subscriber) Option

func WithReplicationURL

func WithReplicationURL(url string) Option

func WithRowIdentify added in v0.0.21

func WithRowIdentify(i RowIdentify) Option

func WithSnapshotInterval

func WithSnapshotInterval(interval time.Duration) Option

func WithStreamMaxAge

func WithStreamMaxAge(maxAge time.Duration) Option

func WithWaitFor added in v0.0.4

func WithWaitFor(ch chan struct{}) Option

type Publisher added in v0.3.0

type Publisher interface {
	Publish(cs *ChangeSet) error
	Sequence() uint64
}

type RowIdentify added in v0.0.21

type RowIdentify string
const (
	PK    RowIdentify = "pk"
	Rowid RowIdentify = "rowid"
	Full  RowIdentify = "full"
)

type SequenceProvider

type SequenceProvider interface {
	LatestSeq() uint64
}

type Statement

type Statement struct {
	// contains filtered or unexported fields
}

func Parse

func Parse(ctx context.Context, sql string) ([]*Statement, error)

func ParseStatement added in v0.0.4

func ParseStatement(ctx context.Context, source string) (*Statement, error)

func UnverifiedStatement added in v0.0.12

func UnverifiedStatement(source string, hasDistinct bool,
	hasReturning bool, typ string, parameters []string, columns []string,
	ddl bool, hasIfExists bool, hasModifier bool,
	modifiesDatabase bool, aggregateFunctions map[int]*sql.Call) *Statement

func (*Statement) AggregateFunctions added in v0.6.5

func (s *Statement) AggregateFunctions() map[int]*sql.Call

func (*Statement) Begin

func (s *Statement) Begin() bool

func (*Statement) Columns

func (s *Statement) Columns() []string

func (*Statement) Commit

func (s *Statement) Commit() bool

func (*Statement) DDL

func (s *Statement) DDL() bool

func (*Statement) HasDistinct

func (s *Statement) HasDistinct() bool

func (*Statement) HasReturning

func (s *Statement) HasReturning() bool

func (*Statement) IsCreateTable

func (s *Statement) IsCreateTable() bool

func (*Statement) IsDelete

func (s *Statement) IsDelete() bool

func (*Statement) IsExplain

func (s *Statement) IsExplain() bool

func (*Statement) IsInsert

func (s *Statement) IsInsert() bool

func (*Statement) IsSelect

func (s *Statement) IsSelect() bool

func (*Statement) IsUpdate

func (s *Statement) IsUpdate() bool

func (*Statement) Limit added in v0.5.9

func (s *Statement) Limit() int

func (*Statement) ModifiesDatabase added in v0.0.12

func (s *Statement) ModifiesDatabase() bool

func (*Statement) OrderBy added in v0.5.6

func (s *Statement) OrderBy() []string

func (*Statement) Parameters

func (s *Statement) Parameters() []string

func (*Statement) RewriteQueryToAggregate added in v0.6.5

func (s *Statement) RewriteQueryToAggregate() (query string, aggregateFunctions map[int]*sql.Call, newColumns map[int]int, err error)

func (*Statement) Rollback

func (s *Statement) Rollback() bool

func (*Statement) Source

func (s *Statement) Source() string

func (*Statement) SourceWithIfExists added in v0.0.10

func (s *Statement) SourceWithIfExists() string

func (*Statement) Type

func (s *Statement) Type() string

func (*Statement) Visit added in v0.6.0

func (s *Statement) Visit(n sql.Node) (w sql.Visitor, node sql.Node, err error)

func (*Statement) VisitEnd added in v0.6.0

func (s *Statement) VisitEnd(n sql.Node) (sql.Node, error)

type StaticLeader added in v0.1.0

type StaticLeader struct {
	Target string
}

func (*StaticLeader) IsLeader added in v0.1.0

func (s *StaticLeader) IsLeader() bool

func (*StaticLeader) Ready added in v0.1.1

func (s *StaticLeader) Ready() chan struct{}

func (*StaticLeader) RedirectTarget added in v0.1.0

func (s *StaticLeader) RedirectTarget() string

type Subscriber added in v0.3.0

type Subscriber interface {
	TxSeqTracker
	SetDB(*sql.DB)
	Start() error
	RemoveConsumer(ctx context.Context, name string) error
	DeliveredInfo(ctx context.Context, name string) (any, error)
}

type TxSeqTracker added in v0.4.8

type TxSeqTracker interface {
	LatestSeq() uint64
}

type TxSeqTrackerProvider added in v0.4.8

type TxSeqTrackerProvider func() TxSeqTracker

type WriterPublisher added in v0.0.6

type WriterPublisher struct {
	// contains filtered or unexported fields
}

func NewWriterPublisher added in v0.0.6

func NewWriterPublisher(w io.Writer, serializer ChangeSetSerializer) *WriterPublisher

func (*WriterPublisher) Publish added in v0.0.6

func (p *WriterPublisher) Publish(cs *ChangeSet) error

func (*WriterPublisher) Sequence added in v0.0.20

func (p *WriterPublisher) Sequence() uint64

Directories

Path Synopsis
api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL