rqlite

package
v0.90.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2026 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors reported by the ORM-like layer for invalid destination or
// entity arguments. Compare against them with errors.Is.
var (
	// ErrNotPointer is returned when dest is not a non-nil pointer, i.e. a
	// non-pointer (or nil) value was passed where a pointer is required.
	ErrNotPointer = errors.New("dest must be a non-nil pointer")

	// ErrNotSlice is returned when dest is not a pointer to a slice.
	ErrNotSlice = errors.New("dest must be pointer to a slice")

	// ErrNotStruct is returned when entity does not point to a struct.
	ErrNotStruct = errors.New("entity must point to a struct")

	// ErrNoPrimaryKey is returned when no primary key field is found on the
	// entity. Per the message, a primary key is a field tagged `db:"...,pk"`
	// or a field named ID.
	ErrNoPrimaryKey = errors.New("no primary key field found (tag db:\"...,pk\" or field named ID)")

	// ErrNoTableName is returned when the table name cannot be resolved.
	// Per the message, fix it by implementing TableNamer on the entity or by
	// using a repository created with an explicit table.
	ErrNoTableName = errors.New("unable to resolve table name; implement TableNamer or set up a repository with explicit table")

	// ErrEntityMustBePointer is returned when entity is not a non-nil pointer to struct.
	ErrEntityMustBePointer = errors.New("entity must be a non-nil pointer to struct")
)

Functions

func ApplyMigrations

func ApplyMigrations(ctx context.Context, db *sql.DB, dir string, logger *zap.Logger) error

ApplyMigrations scans a directory for *.sql files, orders them by numeric prefix, and applies any that are not yet recorded in schema_migrations(version).

func ApplyMigrationsDirs

func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger *zap.Logger) error

ApplyMigrationsDirs applies migrations from multiple directories:

  - Gathers *.sql files from each dir
  - Parses the numeric prefix as the version
  - Errors if the same version appears in more than one dir (to avoid ambiguity)
  - Sorts globally by version and applies those not yet in schema_migrations

Types

type Client

// Client is the high-level ORM-like API.
//
// Obtain one from NewClient or NewClientFromAdapter.
type Client interface {
	// Query runs an arbitrary SELECT and scans rows into dest (pointer to slice of structs or []map[string]any).
	Query(ctx context.Context, dest any, query string, args ...any) error
	// Exec runs a write statement (INSERT/UPDATE/DELETE).
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

	// FindBy/FindOneBy provide simple map-based criteria filtering on table;
	// opts (see FindOption) customize the generated query.
	// NOTE(review): how criteria entries are combined (presumably AND-ed
	// equality checks) is not visible here — confirm against the implementation.
	FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
	FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error

	// Save inserts or updates an entity (single-PK).
	Save(ctx context.Context, entity any) error
	// Remove deletes by PK (single-PK).
	Remove(ctx context.Context, entity any) error

	// Repositories (generic layer). Optional but convenient if you use Go generics.
	// The result is typed any; presumably callers assert it to a
	// Repository[T] — TODO confirm the concrete type returned.
	Repository(table string) any

	// Fluent query builder for advanced querying.
	CreateQueryBuilder(table string) *QueryBuilder

	// Tx executes a function within a transaction. fn receives a Tx scoped
	// to that transaction.
	Tx(ctx context.Context, fn func(tx Tx) error) error
}

Client is the high-level ORM-like API.

func NewClient

func NewClient(db *sql.DB) Client

NewClient wires the ORM client to a *sql.DB (from your RQLiteAdapter).

func NewClientFromAdapter

func NewClientFromAdapter(adapter *RQLiteAdapter) Client

NewClientFromAdapter is convenient if you already created the adapter.

type ClusterDiscoveryService added in v0.53.2

// ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster
// management. Create one with NewClusterDiscoveryService.
type ClusterDiscoveryService struct {
	// contains filtered or unexported fields
}

ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management

func NewClusterDiscoveryService added in v0.53.2

func NewClusterDiscoveryService(
	h host.Host,
	discoveryMgr *discovery.Manager,
	rqliteManager *RQLiteManager,
	nodeID string,
	nodeType string,
	raftAddress string,
	httpAddress string,
	dataDir string,
	logger *zap.Logger,
) *ClusterDiscoveryService

NewClusterDiscoveryService creates a new cluster discovery service

func (*ClusterDiscoveryService) FindJoinTargets added in v0.53.2

func (c *ClusterDiscoveryService) FindJoinTargets() []string

FindJoinTargets discovers join targets via LibP2P

func (*ClusterDiscoveryService) ForceWritePeersJSON added in v0.69.13

func (c *ClusterDiscoveryService) ForceWritePeersJSON() error

ForceWritePeersJSON forces writing peers.json regardless of membership changes

func (*ClusterDiscoveryService) GetActivePeers added in v0.53.2

func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata

GetActivePeers returns a list of active peers (not including self)

func (*ClusterDiscoveryService) GetAllPeers added in v0.53.2

GetAllPeers returns a list of all known peers (including self)

func (*ClusterDiscoveryService) GetMetrics added in v0.53.2

func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics

GetMetrics returns current cluster metrics

func (*ClusterDiscoveryService) GetNodeWithHighestLogIndex added in v0.53.2

func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata

GetNodeWithHighestLogIndex returns the node with the highest Raft log index

func (*ClusterDiscoveryService) HasRecentPeersJSON added in v0.53.2

func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool

HasRecentPeersJSON checks if peers.json was recently updated

func (*ClusterDiscoveryService) Start added in v0.53.2

Start begins the cluster discovery service

func (*ClusterDiscoveryService) Stop added in v0.53.2

func (c *ClusterDiscoveryService) Stop()

Stop stops the cluster discovery service

func (*ClusterDiscoveryService) StoreRemotePeerMetadata added in v0.53.2

func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error

StoreRemotePeerMetadata stores metadata received from a remote peer

func (*ClusterDiscoveryService) TriggerPeerExchange added in v0.53.2

func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error

TriggerPeerExchange actively exchanges peer information with connected peers

func (*ClusterDiscoveryService) TriggerSync added in v0.53.2

func (c *ClusterDiscoveryService) TriggerSync()

TriggerSync manually triggers a cluster membership sync

func (*ClusterDiscoveryService) UpdateOwnMetadata added in v0.53.2

func (c *ClusterDiscoveryService) UpdateOwnMetadata()

UpdateOwnMetadata updates our own RQLite metadata in the peerstore

func (*ClusterDiscoveryService) WaitForDiscoverySettling added in v0.53.2

func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context)

WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup)

type ClusterMetrics added in v0.53.2

// ClusterMetrics contains cluster-wide metrics, as returned by
// ClusterDiscoveryService.GetMetrics.
//
// NOTE(review): the set of DiscoveryStatus values, the format of
// CurrentLeader (node ID vs. address) and the scale of AveragePeerHealth are
// not visible in this declaration — confirm against the code that fills it in.
type ClusterMetrics struct {
	ClusterSize       int
	ActiveNodes       int
	InactiveNodes     int
	RemovedNodes      int
	LastUpdate        time.Time
	DiscoveryStatus   string
	CurrentLeader     string
	AveragePeerHealth float64
}

ClusterMetrics contains cluster-wide metrics

type FindOption

// FindOption customizes Find queries by mutating the QueryBuilder it is
// given. Options are accepted by Client.FindBy/FindOneBy and
// Repository.Find/FindOne; see the With* constructors.
type FindOption func(q *QueryBuilder)

FindOption customizes Find queries.

func WithGroupBy

func WithGroupBy(cols ...string) FindOption

WithGroupBy adds GROUP BY clause to query.

func WithJoin

func WithJoin(kind, table, on string) FindOption

WithJoin adds a JOIN clause to query.

func WithLimit

func WithLimit(n int) FindOption

WithLimit adds LIMIT clause to query.

func WithOffset

func WithOffset(n int) FindOption

WithOffset adds OFFSET clause to query.

func WithOrderBy

func WithOrderBy(exprs ...string) FindOption

WithOrderBy adds ORDER BY clause to query.

func WithSelect

func WithSelect(cols ...string) FindOption

WithSelect specifies columns to select.

type HTTPGateway

// HTTPGateway exposes the ORM Client as a set of HTTP handlers.
// Build one with NewHTTPGateway and attach it with RegisterRoutes.
type HTTPGateway struct {
	// Client is the ORM-like rqlite client to execute operations against.
	Client Client
	// BasePath is the prefix for all routes, e.g. "/v1/db".
	// If empty, defaults to "/v1/db". A trailing slash is trimmed.
	BasePath string

	// Optional: Request timeout. If > 0, handlers will use a context with this timeout.
	Timeout time.Duration
}

HTTPGateway exposes the ORM Client as a set of HTTP handlers.

func NewHTTPGateway

func NewHTTPGateway(c Client, base string) *HTTPGateway

NewHTTPGateway constructs a new HTTPGateway with sensible defaults.

func (*HTTPGateway) RegisterRoutes

func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all handlers onto the provided mux under BasePath.

type PeerHealth added in v0.53.2

// PeerHealth tracks the health status of a peer.
//
// NOTE(review): the thresholds that move Status between its values based on
// FailureCount or the timestamps are not visible here — confirm in the
// discovery service.
type PeerHealth struct {
	LastSeen       time.Time
	LastSuccessful time.Time
	FailureCount   int
	Status         string // "active", "degraded", "inactive"
}

PeerHealth tracks the health status of a peer

type QueryBuilder

// QueryBuilder implements a fluent SELECT builder with joins, where, etc.
// Instances come from Client.CreateQueryBuilder, Tx.CreateQueryBuilder or
// Repository.Q; its chaining methods return *QueryBuilder.
type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder implements a fluent SELECT builder with joins, where, etc.

func (*QueryBuilder) Alias

func (qb *QueryBuilder) Alias(a string) *QueryBuilder

Alias sets an alias for the main table.

func (*QueryBuilder) AndWhere

func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder

AndWhere adds an AND WHERE clause.

func (*QueryBuilder) Build

func (qb *QueryBuilder) Build() (string, []any)

Build returns the SQL string and args for a SELECT.

func (*QueryBuilder) GetMany

func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error

GetMany executes the built query and scans into dest (pointer to slice).

func (*QueryBuilder) GetOne

func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error

GetOne executes the built query with LIMIT 1 and scans the result into dest (pointer to struct or map).

func (*QueryBuilder) GroupBy

func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder

GroupBy adds GROUP BY columns.

func (*QueryBuilder) InnerJoin

func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder

InnerJoin adds an INNER JOIN clause.

func (*QueryBuilder) Join

func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder

Join adds a JOIN clause.

func (*QueryBuilder) LeftJoin

func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder

LeftJoin adds a LEFT JOIN clause.

func (*QueryBuilder) Limit

func (qb *QueryBuilder) Limit(n int) *QueryBuilder

Limit sets the LIMIT clause.

func (*QueryBuilder) Offset

func (qb *QueryBuilder) Offset(n int) *QueryBuilder

Offset sets the OFFSET clause.

func (*QueryBuilder) OrWhere

func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder

OrWhere adds an OR WHERE clause.

func (*QueryBuilder) OrderBy

func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder

OrderBy adds ORDER BY expressions.

func (*QueryBuilder) Select

func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder

Select specifies columns to select.

func (*QueryBuilder) Where

func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder

Where adds a WHERE clause (same as AndWhere).

type RQLiteAdapter

// RQLiteAdapter adapts RQLite to the sql.DB interface.
// Create one with NewRQLiteAdapter; GetSQLDB exposes the *sql.DB.
type RQLiteAdapter struct {
	// contains filtered or unexported fields
}

RQLiteAdapter adapts RQLite to the sql.DB interface

func NewRQLiteAdapter

func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error)

NewRQLiteAdapter creates a new adapter that provides sql.DB interface for RQLite

func (*RQLiteAdapter) Close

func (a *RQLiteAdapter) Close() error

Close closes the adapter connections

func (*RQLiteAdapter) GetManager

func (a *RQLiteAdapter) GetManager() *RQLiteManager

GetManager returns the underlying RQLite manager for advanced operations

func (*RQLiteAdapter) GetSQLDB

func (a *RQLiteAdapter) GetSQLDB() *sql.DB

GetSQLDB returns the sql.DB interface for compatibility with existing storage service

type RQLiteManager

// RQLiteManager manages an RQLite node instance.
// Create one with NewRQLiteManager, then control the node with Start and Stop.
type RQLiteManager struct {
	// contains filtered or unexported fields
}

RQLiteManager manages an RQLite node instance

func NewRQLiteManager

func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.DiscoveryConfig, dataDir string, logger *zap.Logger) *RQLiteManager

NewRQLiteManager creates a new RQLite manager

func (*RQLiteManager) ApplyMigrations

func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error

ApplyMigrations is a convenience helper bound to RQLiteManager.

func (*RQLiteManager) ApplyMigrationsDirs

func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error

ApplyMigrationsDirs is the multi-dir variant on RQLiteManager.

func (*RQLiteManager) GetConnection

func (r *RQLiteManager) GetConnection() *gorqlite.Connection

GetConnection returns the RQLite connection

func (*RQLiteManager) SetDiscoveryService added in v0.53.2

func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService)

SetDiscoveryService sets the cluster discovery service

func (*RQLiteManager) SetNodeType added in v0.72.0

func (r *RQLiteManager) SetNodeType(nodeType string)

SetNodeType sets the node type

func (*RQLiteManager) Start

func (r *RQLiteManager) Start(ctx context.Context) error

Start starts the RQLite node

func (*RQLiteManager) Stop

func (r *RQLiteManager) Stop() error

Stop stops the RQLite node

func (*RQLiteManager) UpdateAdvertisedAddresses added in v0.72.0

func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string)

UpdateAdvertisedAddresses overrides advertised addresses

type RQLiteNode added in v0.53.2

// RQLiteNode represents a node in the RQLite cluster. Fields are decoded
// from the JSON keys named in the struct tags; RQLiteNodes is the slice form.
type RQLiteNode struct {
	ID        string `json:"id"`
	Address   string `json:"address"`
	Leader    bool   `json:"leader"`
	Voter     bool   `json:"voter"`
	Reachable bool   `json:"reachable"`
}

RQLiteNode represents a node in the RQLite cluster

type RQLiteNodes added in v0.53.2

// RQLiteNodes represents the response from RQLite's /nodes endpoint:
// a list of RQLiteNode entries.
type RQLiteNodes []RQLiteNode

RQLiteNodes represents the response from RQLite's /nodes endpoint

type RQLiteStatus added in v0.53.2

// RQLiteStatus represents the response from RQLite's /status endpoint.
// Only the fields listed here are decoded; each maps to the JSON key in its
// struct tag.
type RQLiteStatus struct {
	// Store is the "store" section: Raft state plus database configuration.
	Store struct {
		// Raft is the "raft" subsection: log indexes, role/state, leader
		// identity, term and peer count.
		Raft struct {
			AppliedIndex      uint64 `json:"applied_index"`
			CommitIndex       uint64 `json:"commit_index"`
			LastLogIndex      uint64 `json:"last_log_index"`
			LastSnapshotIndex uint64 `json:"last_snapshot_index"`
			State             string `json:"state"`
			LeaderID          string `json:"leader_id"`
			LeaderAddr        string `json:"leader_addr"`
			Term              uint64 `json:"term"`
			NumPeers          int    `json:"num_peers"`
			Voter             bool   `json:"voter"`
		} `json:"raft"`
		// DBConf is the "db_conf" subsection.
		DBConf struct {
			DSN    string `json:"dsn"`
			Memory bool   `json:"memory"`
		} `json:"db_conf"`
	} `json:"store"`
	// Runtime is the "runtime" section describing the Go runtime of the node.
	// Note the mixed key casing: GOARCH/GOOS/GOMAXPROCS are upper-case keys,
	// the remaining keys are snake_case.
	Runtime struct {
		GOARCH       string `json:"GOARCH"`
		GOOS         string `json:"GOOS"`
		GOMAXPROCS   int    `json:"GOMAXPROCS"`
		NumCPU       int    `json:"num_cpu"`
		NumGoroutine int    `json:"num_goroutine"`
		Version      string `json:"version"`
	} `json:"runtime"`
	// HTTP is the "http" section.
	HTTP struct {
		Addr string `json:"addr"`
		Auth string `json:"auth"`
	} `json:"http"`
	// Node is the "node" section. Uptime and StartTime are kept as raw
	// strings; they are not parsed into time values here.
	Node struct {
		Uptime    string `json:"uptime"`
		StartTime string `json:"start_time"`
	} `json:"node"`
}

RQLiteStatus represents the response from RQLite's /status endpoint

type Repository

// Repository provides typed entity operations for a table, where T is the
// entity type.
type Repository[T any] interface {
	// Find scans the rows matching criteria into *dest; opts customize the query.
	Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error
	// FindOne is the single-entity variant of Find, scanning into *dest.
	// NOTE(review): behavior when no row matches is not visible here — confirm.
	FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error
	// Save persists entity; presumably insert-or-update like Client.Save — confirm.
	Save(ctx context.Context, entity *T) error
	// Remove deletes entity; presumably by primary key like Client.Remove — confirm.
	Remove(ctx context.Context, entity *T) error

	// Builder helpers
	// Q returns a QueryBuilder for composing custom queries.
	Q() *QueryBuilder
}

Repository provides typed entity operations for a table.

type TableNamer

// TableNamer lets a struct provide its table name. Implementing it is one of
// the remedies named in the ErrNoTableName message.
type TableNamer interface {
	// TableName returns the table name for the implementing struct.
	TableName() string
}

TableNamer lets a struct provide its table name.

type Tx

// Tx is the transaction-scoped handle passed to the callback of Client.Tx.
// It carries the Query, Exec, CreateQueryBuilder, Save and Remove methods
// with the same signatures as on Client.
type Tx interface {
	// Query has the same signature as Client.Query.
	Query(ctx context.Context, dest any, query string, args ...any) error
	// Exec has the same signature as Client.Exec.
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
	// CreateQueryBuilder has the same signature as Client.CreateQueryBuilder.
	CreateQueryBuilder(table string) *QueryBuilder

	// Optional: scoped Save/Remove inside tx
	Save(ctx context.Context, entity any) error
	Remove(ctx context.Context, entity any) error
}

Tx mirrors a subset of Client (Query, Exec, CreateQueryBuilder, Save, Remove) but executes within a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL