rqlite

package
v0.72.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyMigrations

func ApplyMigrations(ctx context.Context, db *sql.DB, dir string, logger *zap.Logger) error

ApplyMigrations scans a directory for *.sql files, orders them by numeric prefix, and applies any that are not yet recorded in schema_migrations(version).

func ApplyMigrationsDirs

func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger *zap.Logger) error

ApplyMigrationsDirs applies migrations from multiple directories. - Gathers *.sql files from each dir - Parses numeric prefix as the version - Errors if the same version appears in more than one dir (to avoid ambiguity) - Sorts globally by version and applies those not yet in schema_migrations

Types

type Client

type Client interface {
	// Query runs an arbitrary SELECT and scans rows into dest (pointer to slice of structs or []map[string]any).
	Query(ctx context.Context, dest any, query string, args ...any) error
	// Exec runs a write statement (INSERT/UPDATE/DELETE).
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

	// FindBy/FindOneBy provide simple map-based criteria filtering.
	FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
	FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error

	// Save inserts or updates an entity (single-PK).
	Save(ctx context.Context, entity any) error
	// Remove deletes by PK (single-PK).
	Remove(ctx context.Context, entity any) error

	// Repositories (generic layer). Optional but convenient if you use Go generics.
	Repository(table string) any

	// Fluent query builder for advanced querying.
	CreateQueryBuilder(table string) *QueryBuilder

	// Tx executes a function within a transaction.
	Tx(ctx context.Context, fn func(tx Tx) error) error
}

Client is the high-level ORM-like API.

func NewClient

func NewClient(db *sql.DB) Client

NewClient wires the ORM client to a *sql.DB (from your RQLiteAdapter).

func NewClientFromAdapter

func NewClientFromAdapter(adapter *RQLiteAdapter) Client

NewClientFromAdapter is convenient if you already created the adapter.

type ClusterDiscoveryService added in v0.53.2

type ClusterDiscoveryService struct {
	// contains filtered or unexported fields
}

ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management

func NewClusterDiscoveryService added in v0.53.2

func NewClusterDiscoveryService(
	h host.Host,
	discoveryMgr *discovery.Manager,
	rqliteManager *RQLiteManager,
	nodeID string,
	nodeType string,
	raftAddress string,
	httpAddress string,
	dataDir string,
	logger *zap.Logger,
) *ClusterDiscoveryService

NewClusterDiscoveryService creates a new cluster discovery service

func (*ClusterDiscoveryService) FindJoinTargets added in v0.53.2

func (c *ClusterDiscoveryService) FindJoinTargets() []string

FindJoinTargets discovers join targets via LibP2P

func (*ClusterDiscoveryService) ForceWritePeersJSON added in v0.69.13

func (c *ClusterDiscoveryService) ForceWritePeersJSON() error

ForceWritePeersJSON forces writing peers.json regardless of membership changes This is useful after clearing raft state when we need to recreate peers.json

func (*ClusterDiscoveryService) GetActivePeers added in v0.53.2

func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata

GetActivePeers returns a list of active peers (not including self)

func (*ClusterDiscoveryService) GetAllPeers added in v0.53.2

GetAllPeers returns a list of all known peers (including self)

func (*ClusterDiscoveryService) GetMetrics added in v0.53.2

func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics

GetMetrics returns current cluster metrics

func (*ClusterDiscoveryService) GetNodeWithHighestLogIndex added in v0.53.2

func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata

GetNodeWithHighestLogIndex returns the node with the highest Raft log index

func (*ClusterDiscoveryService) HasRecentPeersJSON added in v0.53.2

func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool

HasRecentPeersJSON checks if peers.json was recently updated

func (*ClusterDiscoveryService) Start added in v0.53.2

Start begins the cluster discovery service

func (*ClusterDiscoveryService) Stop added in v0.53.2

func (c *ClusterDiscoveryService) Stop()

Stop stops the cluster discovery service

func (*ClusterDiscoveryService) StoreRemotePeerMetadata added in v0.53.2

func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error

StoreRemotePeerMetadata stores metadata received from a remote peer

func (*ClusterDiscoveryService) TriggerPeerExchange added in v0.53.2

func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error

TriggerPeerExchange actively exchanges peer information with connected peers This populates the peerstore with RQLite metadata from other nodes

func (*ClusterDiscoveryService) TriggerSync added in v0.53.2

func (c *ClusterDiscoveryService) TriggerSync()

TriggerSync manually triggers a cluster membership sync

func (*ClusterDiscoveryService) UpdateOwnMetadata added in v0.53.2

func (c *ClusterDiscoveryService) UpdateOwnMetadata()

UpdateOwnMetadata updates our own RQLite metadata in the peerstore

func (*ClusterDiscoveryService) WaitForDiscoverySettling added in v0.53.2

func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context)

WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup)

type ClusterMetrics added in v0.53.2

type ClusterMetrics struct {
	ClusterSize       int
	ActiveNodes       int
	InactiveNodes     int
	RemovedNodes      int
	LastUpdate        time.Time
	DiscoveryStatus   string
	CurrentLeader     string
	AveragePeerHealth float64
}

ClusterMetrics contains cluster-wide metrics

type FindOption

type FindOption func(q *QueryBuilder)

FindOption customizes Find queries.

func WithGroupBy

func WithGroupBy(cols ...string) FindOption

func WithJoin

func WithJoin(kind, table, on string) FindOption

func WithLimit

func WithLimit(n int) FindOption

func WithOffset

func WithOffset(n int) FindOption

func WithOrderBy

func WithOrderBy(exprs ...string) FindOption

func WithSelect

func WithSelect(cols ...string) FindOption

type HTTPGateway

type HTTPGateway struct {
	// Client is the ORM-like rqlite client to execute operations against.
	Client Client
	// BasePath is the prefix for all routes, e.g. "/v1/db".
	// If empty, defaults to "/v1/db". A trailing slash is trimmed.
	BasePath string

	// Optional: Request timeout. If > 0, handlers will use a context with this timeout.
	Timeout time.Duration
}

HTTPGateway exposes the ORM Client as a set of HTTP handlers.

func NewHTTPGateway

func NewHTTPGateway(c Client, base string) *HTTPGateway

NewHTTPGateway constructs a new HTTPGateway with sensible defaults.

func (*HTTPGateway) RegisterRoutes

func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all handlers onto the provided mux under BasePath.

type PeerHealth added in v0.53.2

type PeerHealth struct {
	LastSeen       time.Time
	LastSuccessful time.Time
	FailureCount   int
	Status         string // "active", "degraded", "inactive"
}

PeerHealth tracks the health status of a peer

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder implements a fluent SELECT builder with joins, where, etc.

func (*QueryBuilder) Alias

func (qb *QueryBuilder) Alias(a string) *QueryBuilder

func (*QueryBuilder) AndWhere

func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder

func (*QueryBuilder) Build

func (qb *QueryBuilder) Build() (string, []any)

Build returns the SQL string and args for a SELECT.

func (*QueryBuilder) GetMany

func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error

GetMany executes the built query and scans into dest (pointer to slice).

func (*QueryBuilder) GetOne

func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error

GetOne executes the built query and scans into dest (pointer to struct or map) with LIMIT 1.

func (*QueryBuilder) GroupBy

func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder

func (*QueryBuilder) InnerJoin

func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder

func (*QueryBuilder) Join

func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder

func (*QueryBuilder) LeftJoin

func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder

func (*QueryBuilder) Limit

func (qb *QueryBuilder) Limit(n int) *QueryBuilder

func (*QueryBuilder) Offset

func (qb *QueryBuilder) Offset(n int) *QueryBuilder

func (*QueryBuilder) OrWhere

func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder

func (*QueryBuilder) OrderBy

func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder

func (*QueryBuilder) Select

func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder

func (*QueryBuilder) Where

func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder

type RQLiteAdapter

type RQLiteAdapter struct {
	// contains filtered or unexported fields
}

RQLiteAdapter adapts RQLite to the sql.DB interface

func NewRQLiteAdapter

func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error)

NewRQLiteAdapter creates a new adapter that provides sql.DB interface for RQLite

func (*RQLiteAdapter) Close

func (a *RQLiteAdapter) Close() error

Close closes the adapter connections

func (*RQLiteAdapter) GetManager

func (a *RQLiteAdapter) GetManager() *RQLiteManager

GetManager returns the underlying RQLite manager for advanced operations

func (*RQLiteAdapter) GetSQLDB

func (a *RQLiteAdapter) GetSQLDB() *sql.DB

GetSQLDB returns the sql.DB interface for compatibility with existing storage service

type RQLiteManager

type RQLiteManager struct {
	// contains filtered or unexported fields
}

RQLiteManager manages an RQLite node instance

func NewRQLiteManager

func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.DiscoveryConfig, dataDir string, logger *zap.Logger) *RQLiteManager

NewRQLiteManager creates a new RQLite manager

func (*RQLiteManager) ApplyMigrations

func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error

ApplyMigrationsFromManager is a convenience helper bound to RQLiteManager.

func (*RQLiteManager) ApplyMigrationsDirs

func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error

ApplyMigrationsDirs is the multi-dir variant on RQLiteManager.

func (*RQLiteManager) GetConnection

func (r *RQLiteManager) GetConnection() *gorqlite.Connection

GetConnection returns the RQLite connection GetConnection returns the RQLite connection

func (*RQLiteManager) SetDiscoveryService added in v0.53.2

func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService)

SetDiscoveryService sets the cluster discovery service for this RQLite manager

func (*RQLiteManager) SetNodeType added in v0.72.0

func (r *RQLiteManager) SetNodeType(nodeType string)

SetNodeType sets the node type for this RQLite manager

func (*RQLiteManager) Start

func (r *RQLiteManager) Start(ctx context.Context) error

Start starts the RQLite node

func (*RQLiteManager) Stop

func (r *RQLiteManager) Stop() error

Stop stops the RQLite node

func (*RQLiteManager) UpdateAdvertisedAddresses added in v0.72.0

func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string)

UpdateAdvertisedAddresses overrides the discovery advertised addresses when cluster discovery infers a better host than what was provided via configuration (e.g. replacing localhost).

type RQLiteNode added in v0.53.2

type RQLiteNode struct {
	ID        string `json:"id"`
	Address   string `json:"address"`
	Leader    bool   `json:"leader"`
	Voter     bool   `json:"voter"`
	Reachable bool   `json:"reachable"`
}

RQLiteNode represents a node in the RQLite cluster

type RQLiteNodes added in v0.53.2

type RQLiteNodes []RQLiteNode

RQLiteNodes represents the response from RQLite's /nodes endpoint

type RQLiteStatus added in v0.53.2

type RQLiteStatus struct {
	Store struct {
		Raft struct {
			AppliedIndex      uint64 `json:"applied_index"`
			CommitIndex       uint64 `json:"commit_index"`
			LastLogIndex      uint64 `json:"last_log_index"`
			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
			State             string `json:"state"`
			LeaderID          string `json:"leader_id"`
			LeaderAddr        string `json:"leader_addr"`
			Term              uint64 `json:"term"`
			NumPeers          int    `json:"num_peers"`
			Voter             bool   `json:"voter"`
		} `json:"raft"`
		DBConf struct {
			DSN    string `json:"dsn"`
			Memory bool   `json:"memory"`
		} `json:"db_conf"`
	} `json:"store"`
	Runtime struct {
		GOARCH       string `json:"GOARCH"`
		GOOS         string `json:"GOOS"`
		GOMAXPROCS   int    `json:"GOMAXPROCS"`
		NumCPU       int    `json:"num_cpu"`
		NumGoroutine int    `json:"num_goroutine"`
		Version      string `json:"version"`
	} `json:"runtime"`
	HTTP struct {
		Addr string `json:"addr"`
		Auth string `json:"auth"`
	} `json:"http"`
	Node struct {
		Uptime    string `json:"uptime"`
		StartTime string `json:"start_time"`
	} `json:"node"`
}

RQLiteStatus represents the response from RQLite's /status endpoint

type Repository

type Repository[T any] interface {
	Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error
	FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error
	Save(ctx context.Context, entity *T) error
	Remove(ctx context.Context, entity *T) error

	// Builder helpers
	Q() *QueryBuilder
}

Repository provides typed entity operations for a table.

type TableNamer

type TableNamer interface {
	TableName() string
}

TableNamer lets a struct provide its table name.

type Tx

type Tx interface {
	Query(ctx context.Context, dest any, query string, args ...any) error
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
	CreateQueryBuilder(table string) *QueryBuilder

	// Optional: scoped Save/Remove inside tx
	Save(ctx context.Context, entity any) error
	Remove(ctx context.Context, entity any) error
}

Tx mirrors Client but executes within a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL