Documentation
¶
Index ¶
- func ApplyMigrations(ctx context.Context, db *sql.DB, dir string, logger *zap.Logger) error
- func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger *zap.Logger) error
- type Client
- type ClusterDiscoveryService
- func (c *ClusterDiscoveryService) FindJoinTargets() []string
- func (c *ClusterDiscoveryService) ForceWritePeersJSON() error
- func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata
- func (c *ClusterDiscoveryService) GetAllPeers() []*discovery.RQLiteNodeMetadata
- func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics
- func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata
- func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool
- func (c *ClusterDiscoveryService) Start(ctx context.Context) error
- func (c *ClusterDiscoveryService) Stop()
- func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error
- func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error
- func (c *ClusterDiscoveryService) TriggerSync()
- func (c *ClusterDiscoveryService) UpdateOwnMetadata()
- func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context)
- type ClusterMetrics
- type FindOption
- type HTTPGateway
- type PeerHealth
- type QueryBuilder
- func (qb *QueryBuilder) Alias(a string) *QueryBuilder
- func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder
- func (qb *QueryBuilder) Build() (string, []any)
- func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error
- func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error
- func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder
- func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder
- func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder
- func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder
- func (qb *QueryBuilder) Limit(n int) *QueryBuilder
- func (qb *QueryBuilder) Offset(n int) *QueryBuilder
- func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder
- func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder
- func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder
- func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder
- type RQLiteAdapter
- type RQLiteManager
- func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error
- func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error
- func (r *RQLiteManager) GetConnection() *gorqlite.Connection
- func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService)
- func (r *RQLiteManager) SetNodeType(nodeType string)
- func (r *RQLiteManager) Start(ctx context.Context) error
- func (r *RQLiteManager) Stop() error
- func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string)
- type RQLiteNode
- type RQLiteNodes
- type RQLiteStatus
- type Repository
- type TableNamer
- type Tx
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyMigrations ¶
ApplyMigrations scans a directory for *.sql files, orders them by numeric prefix, and applies any that are not yet recorded in schema_migrations(version).
func ApplyMigrationsDirs ¶
ApplyMigrationsDirs applies migrations from multiple directories:
- Gathers *.sql files from each dir.
- Parses the numeric prefix as the version.
- Errors if the same version appears in more than one dir (to avoid ambiguity).
- Sorts globally by version and applies those not yet in schema_migrations.
Types ¶
type Client ¶
type Client interface {
// Query runs an arbitrary SELECT and scans rows into dest (pointer to slice of structs or []map[string]any).
Query(ctx context.Context, dest any, query string, args ...any) error
// Exec runs a write statement (INSERT/UPDATE/DELETE).
Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
// FindBy/FindOneBy provide simple map-based criteria filtering.
FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
// Save inserts or updates an entity (single-PK).
Save(ctx context.Context, entity any) error
// Remove deletes by PK (single-PK).
Remove(ctx context.Context, entity any) error
// Repositories (generic layer). Optional but convenient if you use Go generics.
Repository(table string) any
// Fluent query builder for advanced querying.
CreateQueryBuilder(table string) *QueryBuilder
// Tx executes a function within a transaction.
Tx(ctx context.Context, fn func(tx Tx) error) error
}
Client is the high-level ORM-like API.
func NewClientFromAdapter ¶
func NewClientFromAdapter(adapter *RQLiteAdapter) Client
NewClientFromAdapter is convenient if you already created the adapter.
type ClusterDiscoveryService ¶ added in v0.53.2
type ClusterDiscoveryService struct {
// contains filtered or unexported fields
}
ClusterDiscoveryService bridges LibP2P discovery with RQLite cluster management
func NewClusterDiscoveryService ¶ added in v0.53.2
func NewClusterDiscoveryService(
h host.Host,
discoveryMgr *discovery.Manager,
rqliteManager *RQLiteManager,
nodeID string,
nodeType string,
raftAddress string,
httpAddress string,
dataDir string,
logger *zap.Logger,
) *ClusterDiscoveryService
NewClusterDiscoveryService creates a new cluster discovery service
func (*ClusterDiscoveryService) FindJoinTargets ¶ added in v0.53.2
func (c *ClusterDiscoveryService) FindJoinTargets() []string
FindJoinTargets discovers join targets via LibP2P
func (*ClusterDiscoveryService) ForceWritePeersJSON ¶ added in v0.69.13
func (c *ClusterDiscoveryService) ForceWritePeersJSON() error
ForceWritePeersJSON forces writing peers.json regardless of membership changes. This is useful after clearing raft state when we need to recreate peers.json.
func (*ClusterDiscoveryService) GetActivePeers ¶ added in v0.53.2
func (c *ClusterDiscoveryService) GetActivePeers() []*discovery.RQLiteNodeMetadata
GetActivePeers returns a list of active peers (not including self)
func (*ClusterDiscoveryService) GetAllPeers ¶ added in v0.53.2
func (c *ClusterDiscoveryService) GetAllPeers() []*discovery.RQLiteNodeMetadata
GetAllPeers returns a list of all known peers (including self)
func (*ClusterDiscoveryService) GetMetrics ¶ added in v0.53.2
func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics
GetMetrics returns current cluster metrics
func (*ClusterDiscoveryService) GetNodeWithHighestLogIndex ¶ added in v0.53.2
func (c *ClusterDiscoveryService) GetNodeWithHighestLogIndex() *discovery.RQLiteNodeMetadata
GetNodeWithHighestLogIndex returns the node with the highest Raft log index
func (*ClusterDiscoveryService) HasRecentPeersJSON ¶ added in v0.53.2
func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool
HasRecentPeersJSON checks if peers.json was recently updated
func (*ClusterDiscoveryService) Start ¶ added in v0.53.2
func (c *ClusterDiscoveryService) Start(ctx context.Context) error
Start begins the cluster discovery service
func (*ClusterDiscoveryService) Stop ¶ added in v0.53.2
func (c *ClusterDiscoveryService) Stop()
Stop stops the cluster discovery service
func (*ClusterDiscoveryService) StoreRemotePeerMetadata ¶ added in v0.53.2
func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metadata *discovery.RQLiteNodeMetadata) error
StoreRemotePeerMetadata stores metadata received from a remote peer
func (*ClusterDiscoveryService) TriggerPeerExchange ¶ added in v0.53.2
func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error
TriggerPeerExchange actively exchanges peer information with connected peers. This populates the peerstore with RQLite metadata from other nodes.
func (*ClusterDiscoveryService) TriggerSync ¶ added in v0.53.2
func (c *ClusterDiscoveryService) TriggerSync()
TriggerSync manually triggers a cluster membership sync
func (*ClusterDiscoveryService) UpdateOwnMetadata ¶ added in v0.53.2
func (c *ClusterDiscoveryService) UpdateOwnMetadata()
UpdateOwnMetadata updates our own RQLite metadata in the peerstore
func (*ClusterDiscoveryService) WaitForDiscoverySettling ¶ added in v0.53.2
func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context)
WaitForDiscoverySettling waits for LibP2P discovery to settle (used on concurrent startup)
type ClusterMetrics ¶ added in v0.53.2
type ClusterMetrics struct {
ClusterSize int
ActiveNodes int
InactiveNodes int
RemovedNodes int
LastUpdate time.Time
DiscoveryStatus string
CurrentLeader string
AveragePeerHealth float64
}
ClusterMetrics contains cluster-wide metrics
type FindOption ¶
type FindOption func(q *QueryBuilder)
FindOption customizes Find queries.
func WithGroupBy ¶
func WithGroupBy(cols ...string) FindOption
func WithJoin ¶
func WithJoin(kind, table, on string) FindOption
func WithLimit ¶
func WithLimit(n int) FindOption
func WithOffset ¶
func WithOffset(n int) FindOption
func WithOrderBy ¶
func WithOrderBy(exprs ...string) FindOption
func WithSelect ¶
func WithSelect(cols ...string) FindOption
type HTTPGateway ¶
type HTTPGateway struct {
// Client is the ORM-like rqlite client to execute operations against.
Client Client
// BasePath is the prefix for all routes, e.g. "/v1/db".
// If empty, defaults to "/v1/db". A trailing slash is trimmed.
BasePath string
// Optional: Request timeout. If > 0, handlers will use a context with this timeout.
Timeout time.Duration
}
HTTPGateway exposes the ORM Client as a set of HTTP handlers.
func NewHTTPGateway ¶
func NewHTTPGateway(c Client, base string) *HTTPGateway
NewHTTPGateway constructs a new HTTPGateway with sensible defaults.
func (*HTTPGateway) RegisterRoutes ¶
func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers all handlers onto the provided mux under BasePath.
type PeerHealth ¶ added in v0.53.2
type PeerHealth struct {
LastSeen time.Time
LastSuccessful time.Time
FailureCount int
Status string // "active", "degraded", "inactive"
}
PeerHealth tracks the health status of a peer
type QueryBuilder ¶
type QueryBuilder struct {
// contains filtered or unexported fields
}
QueryBuilder implements a fluent SELECT builder with joins, where, etc.
func (*QueryBuilder) Alias ¶
func (qb *QueryBuilder) Alias(a string) *QueryBuilder
func (*QueryBuilder) AndWhere ¶
func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder
func (*QueryBuilder) Build ¶
func (qb *QueryBuilder) Build() (string, []any)
Build returns the SQL string and args for a SELECT.
func (*QueryBuilder) GetMany ¶
func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error
GetMany executes the built query and scans into dest (pointer to slice).
func (*QueryBuilder) GetOne ¶
func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error
GetOne executes the built query and scans into dest (pointer to struct or map) with LIMIT 1.
func (*QueryBuilder) GroupBy ¶
func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder
func (*QueryBuilder) InnerJoin ¶
func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder
func (*QueryBuilder) Join ¶
func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder
func (*QueryBuilder) LeftJoin ¶
func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder
func (*QueryBuilder) Limit ¶
func (qb *QueryBuilder) Limit(n int) *QueryBuilder
func (*QueryBuilder) Offset ¶
func (qb *QueryBuilder) Offset(n int) *QueryBuilder
func (*QueryBuilder) OrWhere ¶
func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder
func (*QueryBuilder) OrderBy ¶
func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder
func (*QueryBuilder) Select ¶
func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder
func (*QueryBuilder) Where ¶
func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder
type RQLiteAdapter ¶
type RQLiteAdapter struct {
// contains filtered or unexported fields
}
RQLiteAdapter adapts RQLite to the sql.DB interface
func NewRQLiteAdapter ¶
func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error)
NewRQLiteAdapter creates a new adapter that provides sql.DB interface for RQLite
func (*RQLiteAdapter) Close ¶
func (a *RQLiteAdapter) Close() error
Close closes the adapter connections
func (*RQLiteAdapter) GetManager ¶
func (a *RQLiteAdapter) GetManager() *RQLiteManager
GetManager returns the underlying RQLite manager for advanced operations
func (*RQLiteAdapter) GetSQLDB ¶
func (a *RQLiteAdapter) GetSQLDB() *sql.DB
GetSQLDB returns the sql.DB interface for compatibility with existing storage service
type RQLiteManager ¶
type RQLiteManager struct {
// contains filtered or unexported fields
}
RQLiteManager manages an RQLite node instance
func NewRQLiteManager ¶
func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.DiscoveryConfig, dataDir string, logger *zap.Logger) *RQLiteManager
NewRQLiteManager creates a new RQLite manager
func (*RQLiteManager) ApplyMigrations ¶
func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error
ApplyMigrations is a convenience helper bound to RQLiteManager.
func (*RQLiteManager) ApplyMigrationsDirs ¶
func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error
ApplyMigrationsDirs is the multi-dir variant on RQLiteManager.
func (*RQLiteManager) GetConnection ¶
func (r *RQLiteManager) GetConnection() *gorqlite.Connection
GetConnection returns the RQLite connection.
func (*RQLiteManager) SetDiscoveryService ¶ added in v0.53.2
func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService)
SetDiscoveryService sets the cluster discovery service for this RQLite manager
func (*RQLiteManager) SetNodeType ¶ added in v0.72.0
func (r *RQLiteManager) SetNodeType(nodeType string)
SetNodeType sets the node type for this RQLite manager
func (*RQLiteManager) Start ¶
func (r *RQLiteManager) Start(ctx context.Context) error
Start starts the RQLite node
func (*RQLiteManager) UpdateAdvertisedAddresses ¶ added in v0.72.0
func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string)
UpdateAdvertisedAddresses overrides the discovery advertised addresses when cluster discovery infers a better host than what was provided via configuration (e.g. replacing localhost).
type RQLiteNode ¶ added in v0.53.2
type RQLiteNode struct {
ID string `json:"id"`
Address string `json:"address"`
Leader bool `json:"leader"`
Voter bool `json:"voter"`
Reachable bool `json:"reachable"`
}
RQLiteNode represents a node in the RQLite cluster
type RQLiteNodes ¶ added in v0.53.2
type RQLiteNodes []RQLiteNode
RQLiteNodes represents the response from RQLite's /nodes endpoint
type RQLiteStatus ¶ added in v0.53.2
type RQLiteStatus struct {
Store struct {
Raft struct {
AppliedIndex uint64 `json:"applied_index"`
CommitIndex uint64 `json:"commit_index"`
LastLogIndex uint64 `json:"last_log_index"`
LastSnapshotIndex uint64 `json:"last_snapshot_index"`
State string `json:"state"`
LeaderID string `json:"leader_id"`
LeaderAddr string `json:"leader_addr"`
Term uint64 `json:"term"`
NumPeers int `json:"num_peers"`
Voter bool `json:"voter"`
} `json:"raft"`
DBConf struct {
DSN string `json:"dsn"`
Memory bool `json:"memory"`
} `json:"db_conf"`
} `json:"store"`
Runtime struct {
GOARCH string `json:"GOARCH"`
GOOS string `json:"GOOS"`
GOMAXPROCS int `json:"GOMAXPROCS"`
NumCPU int `json:"num_cpu"`
NumGoroutine int `json:"num_goroutine"`
Version string `json:"version"`
} `json:"runtime"`
HTTP struct {
Addr string `json:"addr"`
Auth string `json:"auth"`
} `json:"http"`
Node struct {
Uptime string `json:"uptime"`
StartTime string `json:"start_time"`
} `json:"node"`
}
RQLiteStatus represents the response from RQLite's /status endpoint
type Repository ¶
type Repository[T any] interface {
Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error
FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error
Save(ctx context.Context, entity *T) error
Remove(ctx context.Context, entity *T) error
// Builder helpers
Q() *QueryBuilder
}
Repository provides typed entity operations for a table.
type TableNamer ¶
type TableNamer interface {
TableName() string
}
TableNamer lets a struct provide its table name.
type Tx ¶
type Tx interface {
Query(ctx context.Context, dest any, query string, args ...any) error
Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
CreateQueryBuilder(table string) *QueryBuilder
// Optional: scoped Save/Remove inside tx
Save(ctx context.Context, entity any) error
Remove(ctx context.Context, entity any) error
}
Tx mirrors Client but executes within a transaction.