Documentation
¶
Index ¶
- Variables
- func New(path string, opts ...Options) *qwrBuilder
- func NewSQL(reader, writer *sql.DB, opts ...Options) *qwrBuilder
- func ReleaseQueryBuilder(qb *QueryBuilder)
- type BatchCollector
- type BatchJob
- type BatchResult
- type ErrorCategory
- type ErrorQueue
- func (eq *ErrorQueue) Clear()
- func (eq *ErrorQueue) Close()
- func (eq *ErrorQueue) Count() int
- func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)
- func (eq *ErrorQueue) GetAll() []JobError
- func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError
- func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error
- func (eq *ErrorQueue) Remove(jobID int64) bool
- func (eq *ErrorQueue) Store(jobErr JobError)
- type ErrorQueueStats
- type Job
- type JobError
- func (je JobError) Age() time.Duration
- func (je *JobError) CalculateNextRetry(baseDelay time.Duration)
- func (je JobError) CreateRetryQuery() Query
- func (je JobError) ID() int64
- func (je JobError) SQL() (string, []any)
- func (je JobError) ShouldRetry(maxRetries int) bool
- func (je JobError) String() string
- type JobResult
- type JobType
- type Manager
- func (m *Manager) Backup(dest string, method backup.Method) error
- func (m *Manager) Batch(job Job) error
- func (m *Manager) ClearErrors()
- func (m *Manager) Close() error
- func (m *Manager) Database() string
- func (m *Manager) GetCacheMetrics() map[string]StmtCacheStats
- func (m *Manager) GetDetailedCacheMetrics() map[string]interface{}
- func (m *Manager) GetErrorByID(jobID int64) (JobError, bool)
- func (m *Manager) GetErrorQueueStats() ErrorQueueStats
- func (m *Manager) GetErrors() []JobError
- func (m *Manager) GetJobStatus(jobID int64) (string, error)
- func (m *Manager) GetMetrics() MetricsSnapshot
- func (m *Manager) GetReaderProfile() *profile.Profile
- func (m *Manager) GetWriterProfile() *profile.Profile
- func (m *Manager) Query(sql string, args ...any) *QueryBuilder
- func (m *Manager) RemoveError(jobID int64) bool
- func (m *Manager) ResetCacheDetailedMetrics()
- func (m *Manager) ResetCaches()
- func (m *Manager) ResetMetrics()
- func (m *Manager) RetryJob(jobID int64) error
- func (m *Manager) RunCheckpoint(mode checkpoint.Mode) error
- func (m *Manager) RunIncrementalVacuum(pages int) error
- func (m *Manager) RunVacuum() error
- func (m *Manager) SetSecureDelete(enabled bool) error
- func (m *Manager) Transaction(capacity ...int) *Transaction
- func (m *Manager) WaitForIdle(ctx context.Context) error
- type Metrics
- type MetricsSnapshot
- type Options
- type QWRError
- type Query
- type QueryBuilder
- func (qb *QueryBuilder) Async() (int64, error)
- func (qb *QueryBuilder) Batch() (int64, error)
- func (qb *QueryBuilder) Execute() (*QueryResult, error)
- func (qb *QueryBuilder) GenID() *QueryBuilder
- func (qb *QueryBuilder) Prepared() *QueryBuilder
- func (qb *QueryBuilder) Read() (*sql.Rows, error)
- func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error
- func (qb *QueryBuilder) ReadRow() (*sql.Row, error)
- func (qb *QueryBuilder) Release()
- func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder
- func (qb *QueryBuilder) Write() (*QueryResult, error)
- type QueryResult
- type ResultType
- type RetryStrategy
- type SlowQuery
- type StmtCache
- type StmtCacheMetrics
- type StmtCacheStats
- type Transaction
- func (t *Transaction) Add(sql string, args ...any) *Transaction
- func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction
- func (t *Transaction) Exec() (*TransactionResult, error)
- func (t *Transaction) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult
- func (t *Transaction) ID() int64
- func (t *Transaction) WithContext(ctx context.Context) *Transaction
- func (t *Transaction) Write() (*TransactionResult, error)
- type TransactionResult
- type WriteSerialiser
- func (wp *WriteSerialiser) SetAsyncErrorHandler(handler func(Query, error, time.Duration))
- func (wp *WriteSerialiser) Start(ctx context.Context)
- func (wp *WriteSerialiser) Stop() error
- func (wp *WriteSerialiser) SubmitNoWait(ctx context.Context, job Job) (int64, error)
- func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)
- func (wp *WriteSerialiser) SubmitWait(ctx context.Context, job Job) (JobResult, error)
- func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrReaderDisabled = errors.New("reader is disabled") ErrWriterDisabled = errors.New("writer is disabled") ErrResultNotFound = errors.New("result not found") ErrInvalidResult = errors.New("invalid result type") ErrQueryTooLarge = errors.New("query exceeds maximum allowed size") ErrStatementCacheFull = errors.New("statement cache is full") ErrHashCollision = errors.New("statement hash collision") ErrErrorQueueDisabled = errors.New("error queue is disabled") ErrJobNotFound = errors.New("job not found in error queue") ErrWorkerNotRunning = errors.New("worker pool is not running") ErrQueueTimeout = errors.New("timeout waiting for queue to accept submission") ErrRetrySubmissionFailed = errors.New("failed to resubmit job for retry") ErrInvalidQuery = errors.New("query is invalid") ErrNilPreparedStatement = errors.New("internal error: prepared statement is nil before execution") ErrNilPreparedStatementCache = errors.New("internal error: global prepared statement cache returned nil statement without error") ErrFailedToPrepareStatement = errors.New("failed to prepare statement") ErrPreparedCacheRequired = errors.New("prepared statement cache is required when using prepared queries") ErrCacheClosed = errors.New("statement cache is closed") ErrBatchContainsNonQuery = errors.New("batch jobs can only contain Query jobs, not Transaction or nested Batch jobs") ErrConnectionUnhealthy = errors.New("database connection is unhealthy") ErrBackupDestinationExists = errors.New("backup destination already exists") ErrBackupDriverUnsupported = errors.New("driver does not support backup API") ErrBackupFailed = errors.New("backup failed") ErrBackupInit = errors.New("failed to initialize backup") ErrBackupStep = errors.New("backup step failed") ErrBackupConnection = errors.New("failed to get connection for backup") ErrBackupInvalidMethod = errors.New("unknown backup method") )
Error constants
var DefaultOptions = Options{ WorkerQueueDepth: 1000, EnableReader: true, EnableWriter: true, BatchSize: 200, BatchTimeout: 1 * time.Second, InlineInserts: false, UseContexts: false, StmtCacheMaxSize: 1000, StmtCacheSampleRate: 100, StmtCacheSlowThreshold: 10 * time.Millisecond, ErrorQueueMaxSize: 1000, EnableAutoRetry: false, RetryInterval: 30 * time.Second, MaxRetries: 3, BaseRetryDelay: 30 * time.Second, JobTimeout: 30 * time.Second, TransactionTimeout: 30 * time.Second, RetrySubmitTimeout: 5 * time.Second, QueueSubmitTimeout: 5 * time.Minute, EnableMetrics: true, }
DefaultOptions provides a common starting point for configuration.
Functions ¶
func New ¶
New creates a new qwr Manager instance builder
Options are immutable after construction. They can only be set here during manager creation and cannot be modified at runtime. If no options are provided, DefaultOptions will be used.
To change options, you must stop the application, create a new manager with different options, and restart.
func NewSQL ¶
NewSQL creates a new qwr Manager instance builder using user-provided database connections. This allows you to bring your own SQLite driver (e.g., mattn/go-sqlite3 instead of modernc.org/sqlite).
Parameters:
- reader: Database connection for read operations (pass nil to disable reader)
- writer: Database connection for write operations (pass nil to disable writer)
- opts: Optional configuration options (variadic). If not provided, DefaultOptions will be used.
Important notes:
- Passing nil for reader/writer automatically disables that connection (sets EnableReader/EnableWriter to false)
- The Manager takes full ownership of the provided database connections
- Calling Manager.Close() will close these database connections
- You should not use these connections directly after passing them to NewSQL()
- Profiles will be applied to your connections (including SQLite PRAGMAs)
- If you provide a non-SQLite database, PRAGMA errors are your responsibility
- Use WithErrorLogPath() to enable persistent error logging to disk
Example with mattn/go-sqlite3:
import _ "github.com/mattn/go-sqlite3"
readerDB, _ := sql.Open("sqlite3", "mydb.db")
writerDB, _ := sql.Open("sqlite3", "mydb.db")
opts := qwr.Options{ErrorLogPath: "errors.db"} // Optional error logging
manager, err := qwr.NewSQL(readerDB, writerDB, opts).
Reader(profile.ReadBalanced()).
Writer(profile.WriteBalanced()).
Open()
func ReleaseQueryBuilder ¶
func ReleaseQueryBuilder(qb *QueryBuilder)
ReleaseQueryBuilder returns a QueryBuilder to the pool after cleaning it
Types ¶
type BatchCollector ¶
type BatchCollector struct {
// contains filtered or unexported fields
}
BatchCollector manages automatic batching of database jobs for asynchronous execution.
func NewBatchCollector ¶
func NewBatchCollector(ctx context.Context, ws *WriteSerialiser, options Options, dbPath string) *BatchCollector
NewBatchCollector creates a new batch collector with pre-allocated capacity and context.
func (*BatchCollector) Add ¶
func (bc *BatchCollector) Add(job Job)
Add adds a job to the current batch for eventual execution using the collector's context
func (*BatchCollector) Close ¶
func (bc *BatchCollector) Close()
Close flushes any pending batch and stops the timer
type BatchJob ¶
type BatchJob struct {
Queries []Job
// contains filtered or unexported fields
}
BatchJob represents a collection of database jobs to be executed as a batch
func (BatchJob) ExecuteWithContext ¶
ExecuteWithContext runs each job in the batch within a single transaction
type BatchResult ¶
type BatchResult struct {
Results []JobResult
// contains filtered or unexported fields
}
BatchResult represents the outcome of a batch execution
func (*BatchResult) Duration ¶
func (r *BatchResult) Duration() time.Duration
GetDuration returns how long the batch took to execute
func (*BatchResult) Error ¶
func (r *BatchResult) Error() error
GetError returns any error that occurred during execution
func (*BatchResult) ID ¶
func (r *BatchResult) ID() int64
GetID returns the ID of the batch that produced this result
type ErrorCategory ¶
type ErrorCategory int
ErrorCategory provides granular error classification for better handling
const ( // ErrorCategoryConnection indicates connection-related errors ErrorCategoryConnection ErrorCategory = iota // ErrorCategoryLock indicates database locking/concurrency errors ErrorCategoryLock // ErrorCategoryConstraint indicates constraint violation errors ErrorCategoryConstraint // ErrorCategorySchema indicates schema-related errors ErrorCategorySchema // ErrorCategoryResource indicates resource exhaustion errors ErrorCategoryResource // ErrorCategoryTimeout indicates timeout/deadline errors ErrorCategoryTimeout // ErrorCategoryPermission indicates access control errors ErrorCategoryPermission // ErrorCategoryInternal indicates internal QWR errors ErrorCategoryInternal // ErrorCategoryUnknown indicates unclassified errors ErrorCategoryUnknown )
func (ErrorCategory) String ¶
func (ec ErrorCategory) String() string
String returns the string representation of ErrorCategory
type ErrorQueue ¶
type ErrorQueue struct {
// contains filtered or unexported fields
}
ErrorQueue maintains a simple registry of failed async operations
func NewErrorQueue ¶
func NewErrorQueue(ws *WriteSerialiser, metrics *Metrics, opts Options, dbPath string) *ErrorQueue
NewErrorQueue creates a new error queue
func (*ErrorQueue) Count ¶
func (eq *ErrorQueue) Count() int
Count returns the number of errors in the queue
func (*ErrorQueue) Get ¶
func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)
Get retrieves a specific error by job ID
func (*ErrorQueue) GetAll ¶
func (eq *ErrorQueue) GetAll() []JobError
GetAll returns all errors in the queue in chronological order
func (*ErrorQueue) GetReadyForRetry ¶
func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError
GetReadyForRetry returns errors that are ready for retry
func (*ErrorQueue) PersistError ¶
func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error
PersistError logs an error to the database
func (*ErrorQueue) Remove ¶
func (eq *ErrorQueue) Remove(jobID int64) bool
Remove removes an error from the queue
func (*ErrorQueue) Store ¶
func (eq *ErrorQueue) Store(jobErr JobError)
Store adds or overwrites an error in the queue, or immediately persists non-retriable errors
type ErrorQueueStats ¶
type ErrorQueueStats struct {
CurrentSize int
TotalErrors int64
RetriedErrors int64
DroppedErrors int64
PendingRetries int
}
ErrorQueueStats provides basic error queue metrics
type Job ¶
type Job struct {
Type JobType
Query Query
Transaction Transaction
BatchJob BatchJob
}
Job represents a database job that can be executed
func NewTransactionJob ¶
func NewTransactionJob(t Transaction) Job
NewTransactionJob creates a Job from a Transaction
func (Job) ExecuteWithContext ¶
ExecuteWithContext runs the job against the database with context
type JobError ¶
type JobError struct {
Query Query // The query that failed
// contains filtered or unexported fields
}
JobError represents an error that occurred during async job execution
func (*JobError) CalculateNextRetry ¶
CalculateNextRetry calculates when this job should be retried next Uses exponential backoff with jitter to prevent thundering herd
func (JobError) CreateRetryQuery ¶
CreateRetryQuery creates a new query for retry with incremented retry count
func (JobError) ShouldRetry ¶
ShouldRetry determines if this error should be retried based on error type and retry count
type JobResult ¶
type JobResult struct {
Type ResultType
QueryResult QueryResult
TransactionResult TransactionResult
BatchResult BatchResult
}
JobResult represents the outcome of a job execution
func NewBatchResult ¶
func NewBatchResult(br BatchResult) JobResult
NewBatchResult creates a JobResult from a BatchResult
func NewQueryResult ¶
func NewQueryResult(qr QueryResult) JobResult
NewQueryResult creates a JobResult from a QueryResult
func NewTransactionResult ¶
func NewTransactionResult(tr TransactionResult) JobResult
NewTransactionResult creates a JobResult from a TransactionResult
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles serialised database operations
func (*Manager) Backup ¶ added in v0.1.5
Backup creates a backup of the database to the specified destination path.
Available methods:
- backup.Default: Uses backup API if available, falls back to Vacuum
- backup.API: Uses SQLite's online backup API (less locking)
- backup.Vacuum: Uses VACUUM INTO (creates optimized copy)
The destination file must not already exist.
func (*Manager) ClearErrors ¶
func (m *Manager) ClearErrors()
ClearErrors removes all errors from the queue
func (*Manager) GetCacheMetrics ¶
func (m *Manager) GetCacheMetrics() map[string]StmtCacheStats
GetCacheMetrics returns the current statement cache metrics
func (*Manager) GetDetailedCacheMetrics ¶
GetDetailedCacheMetrics returns enhanced cache metrics with detailed information
func (*Manager) GetErrorByID ¶
GetErrorByID retrieves a specific error from the queue
func (*Manager) GetErrorQueueStats ¶
func (m *Manager) GetErrorQueueStats() ErrorQueueStats
GetErrorQueueStats returns basic error queue statistics
func (*Manager) GetJobStatus ¶
GetJobStatus checks if a job failed by looking in the error queue
func (*Manager) GetMetrics ¶
func (m *Manager) GetMetrics() MetricsSnapshot
GetMetrics returns the current write performance metrics
func (*Manager) GetReaderProfile ¶
GetReaderProfile returns the current reader profile
func (*Manager) GetWriterProfile ¶
GetWriterProfile returns the current writer profile
func (*Manager) Query ¶
func (m *Manager) Query(sql string, args ...any) *QueryBuilder
Query creates a new query with the given SQL and arguments
func (*Manager) RemoveError ¶
RemoveError removes a specific error from the queue
func (*Manager) ResetCacheDetailedMetrics ¶
func (m *Manager) ResetCacheDetailedMetrics()
ResetCacheDetailedMetrics resets only the detailed cache metrics
func (*Manager) ResetCaches ¶
func (m *Manager) ResetCaches()
ResetCaches clears all cached prepared statements, freeing memory. The cache remains usable - new statements will be prepared on demand.
func (*Manager) ResetMetrics ¶
func (m *Manager) ResetMetrics()
ResetMetrics resets the write performance metrics
func (*Manager) RunCheckpoint ¶ added in v0.1.5
func (m *Manager) RunCheckpoint(mode checkpoint.Mode) error
RunCheckpoint triggers a WAL checkpoint
func (*Manager) RunIncrementalVacuum ¶
RunIncrementalVacuum performs incremental vacuum if enabled
func (*Manager) SetSecureDelete ¶
SetSecureDelete enables or disables secure_delete
func (*Manager) Transaction ¶
func (m *Manager) Transaction(capacity ...int) *Transaction
Transaction creates a new transaction
type Metrics ¶
type Metrics struct {
// Enhanced cache metrics with detailed tracking
ReaderStmtMetrics StmtCacheMetrics
WriterStmtMetrics StmtCacheMetrics
// contains filtered or unexported fields
}
Metrics tracks performance of the qwr system
func NewMetrics ¶
func NewMetrics() *Metrics
func (*Metrics) Snapshot ¶
func (m *Metrics) Snapshot() MetricsSnapshot
type MetricsSnapshot ¶
type MetricsSnapshot struct {
JobsProcessed int64
JobsFailed int64
QueriesProcessed int64
TransactionsProcessed int64
BatchesProcessed int64
DirectWritesProcessed int64
DirectWritesFailed int64
CurrentQueueLen int32
ProcessingRate float64 // jobs/second
ErrorRate float64 // percentage
AvgQueueWaitTime time.Duration
AvgProcessingTime time.Duration
AvgDirectWriteTime time.Duration
Uptime time.Duration
// Error queue stats
TotalErrors int64
RetriedErrors int64
DroppedErrors int64
// Cache stats (non-atomic snapshots)
ReaderStmtStats StmtCacheStats
WriterStmtStats StmtCacheStats
}
MetricsSnapshot provides a point-in-time view
type Options ¶
type Options struct {
// WorkerQueueDepth sets the buffer size for the write queue.
// Higher values allow more queries to be buffered but use more memory.
// Default: 50000. Typical range: 1000-100000 depending on throughput needs.
WorkerQueueDepth int
// EnableReader determines if the reader database connection pool is created and used.
// Disable for write-only applications to save resources.
// Default: true
EnableReader bool
// EnableWriter determines if the writer database connection (and worker) is created and used.
// Disable for read-only applications to save resources.
// Default: true
EnableWriter bool
// BatchSize is the number of queries to collect before automatically executing the batch.
// Larger batches improve throughput but increase latency and memory usage.
// Default: 200. Typical range: 50-1000 depending on query size and latency requirements.
BatchSize int
// BatchTimeout is the maximum time to wait before executing a partial batch.
// Ensures timely execution even when BatchSize isn't reached.
// Default: 1 second. Typical range: 100ms-5s depending on latency requirements.
BatchTimeout time.Duration
// InlineInserts enables experimental combining of similar INSERT statements.
// Combines multiple INSERT INTO table VALUES (...) into single multi-value INSERT.
// Improves performance for bulk inserts but requires identical SQL structure.
//
// EXPERIMENTAL: This feature uses simple string parsing which may produce
// incorrect results for complex queries (e.g., INSERT...SELECT, or VALUES
// containing parentheses in string literals). Use only with simple INSERT
// statements where you control the SQL structure.
//
// Default: false (disabled)
InlineInserts bool
// UseContexts determines whether to use context-based methods by default.
// When false, operations use non-context methods unless WithContext() is called.
// Default: false (contexts are opt-in for better performance)
UseContexts bool
// StmtCacheMaxSize is the maximum number of prepared statements to cache.
// When the cache is full, least recently used statements are evicted.
// Default: 1000. Set higher for applications with many unique queries.
StmtCacheMaxSize int
// StmtCacheSampleRate controls detailed metrics sampling (1 in N operations).
// Higher values reduce metrics overhead but provide less detailed monitoring.
// Default: 100 (sample every 100th operation). Set to 1 for full sampling.
StmtCacheSampleRate int64
// StmtCacheSlowThreshold is the threshold for recording slow query preparation.
// Preparations taking longer than this are logged for performance analysis.
// Default: 10ms. Typical range: 1ms-100ms depending on performance requirements.
StmtCacheSlowThreshold time.Duration
// ErrorQueueMaxSize is the maximum number of errors to retain in memory.
// When full, oldest errors are persisted to disk and removed from memory.
// Default: 1000. Higher values provide more error history but use more memory.
ErrorQueueMaxSize int
// EnableAutoRetry determines whether to automatically retry failed async operations.
// When enabled, retriable errors (database locks, timeouts) are retried automatically.
// Default: false (manual retry control)
EnableAutoRetry bool
// RetryInterval is how often to check for jobs ready to retry.
// Shorter intervals provide faster retry response but use more CPU.
// Default: 30 seconds. Typical range: 10s-5m depending on urgency needs.
RetryInterval time.Duration
// MaxRetries is the maximum number of retry attempts for a failed job.
// After exceeding this limit, jobs are marked as permanently failed.
// Default: 3. Typical range: 1-10 depending on reliability requirements.
MaxRetries int
// BaseRetryDelay is the base delay for exponential backoff retry logic.
// Actual delay is BaseRetryDelay * 2^(attempt-1) with jitter.
// Default: 30 seconds. Longer delays reduce database pressure during issues.
BaseRetryDelay time.Duration
// JobTimeout is the default timeout for individual job execution.
// Applied to queries, prepared statement execution, and other single operations.
// Default: 30 seconds. Adjust based on expected query complexity and database performance.
JobTimeout time.Duration
// TransactionTimeout is the default timeout for transaction execution.
// Applied to multi-statement transactions including BEGIN, queries, and COMMIT.
// Default: 30 seconds. Should be longer than JobTimeout for complex transactions.
TransactionTimeout time.Duration
// RetrySubmitTimeout is the timeout for submitting retry jobs to the worker queue.
// Prevents retry logic from hanging when the queue is full or worker is stopped.
// Default: 5 seconds. Should be shorter than RetryInterval.
RetrySubmitTimeout time.Duration
// QueueSubmitTimeout is the timeout for context-free submissions to wait for queue space.
// Only applies to SubmitWaitNoContext/SubmitNoWaitNoContext methods.
// Prevents deadlock when queue is full by failing after this timeout.
// Default: 5 minutes. Should be long enough to allow queue to drain during high load.
QueueSubmitTimeout time.Duration
// EnableMetrics determines whether to collect performance and operational metrics.
// When disabled, all metrics collection is skipped for better performance.
// Default: true (metrics collection enabled)
EnableMetrics bool
// UsePreparedStatements makes all queries use prepared statements by default.
// Individual queries can still override this with the Prepared() method.
// Prepared statements are cached and reduce parsing overhead for repeated queries.
// Default: false
UsePreparedStatements bool
// ErrorLogPath is the path for persistent error logging database.
// If empty, persistent error logging is disabled (in-memory error queue still works).
// Default: "" (persistent logging disabled)
ErrorLogPath string
}
Options holds configuration for the qwr manager's internal behavior.
IMPORTANT: Options are immutable after manager startup. They can only be set during manager construction via New() and cannot be modified at runtime. To change options, you must stop the application, modify the options, and restart.
func (*Options) SetDefaults ¶
func (o *Options) SetDefaults()
SetDefaults applies default values if not set.
type QWRError ¶
type QWRError struct {
// Original error from the underlying operation
Original error
// Category of the error for granular handling
Category ErrorCategory
// RetryStrategy for this specific error
Strategy RetryStrategy
// Context provides additional information about the error
Context map[string]any
// Timestamp when the error occurred
Timestamp time.Time
// Operation that caused the error (query, transaction, etc.)
Operation string
}
QWRError provides structured error information with enhanced classification
func ClassifyError ¶
ClassifyError provides enhanced error classification with detailed categorization
func NewQWRError ¶
func NewQWRError(original error, category ErrorCategory, strategy RetryStrategy, operation string) *QWRError
NewQWRError creates a new structured QWR error
func (*QWRError) IsRetriable ¶
IsRetriable returns true if the error should be retried
type Query ¶
Query represents a database query operation
func (Query) ExecuteWithContext ¶
ExecuteWithContext runs the query against the database with context
type QueryBuilder ¶
type QueryBuilder struct {
// contains filtered or unexported fields
}
QueryBuilder provides a fluent API for building and executing queries
func GetQueryBuilder ¶
func GetQueryBuilder() *QueryBuilder
GetQueryBuilder gets a pre-allocated QueryBuilder object from pool
func (*QueryBuilder) Async ¶
func (qb *QueryBuilder) Async() (int64, error)
Async submits the query to the worker pool for background execution. Returns immediately with a job ID that can be used to check for errors later. Failed async queries are automatically added to the error queue for inspection or retry.
func (*QueryBuilder) Batch ¶
func (qb *QueryBuilder) Batch() (int64, error)
Batch adds the query to a batch for deferred execution
IMPORTANT: Batch operations use the manager's internal context, NOT the context set via WithContext(). This means:
- Query-level contexts (from WithContext()) are ignored for batch operations
- All queries in a batch share the same manager-level context
- Timeouts and cancellation apply to the entire batch, not individual queries
If you need query-specific context control, use Execute() or Async() instead.
func (*QueryBuilder) Execute ¶
func (qb *QueryBuilder) Execute() (*QueryResult, error)
Execute submits the query to the worker pool and waits for completion. Provides queued execution with immediate error feedback. Query will be serialised with other operations but the caller will block until completion.
func (*QueryBuilder) GenID ¶
func (qb *QueryBuilder) GenID() *QueryBuilder
GenID generates a unique ID for this query
func (*QueryBuilder) Prepared ¶
func (qb *QueryBuilder) Prepared() *QueryBuilder
Prepared marks the query to use a prepared statement for execution. Prepared statements are cached and reused, reducing parsing overhead for repeated queries. Most beneficial for queries executed multiple times with different parameters.
func (*QueryBuilder) Read ¶
func (qb *QueryBuilder) Read() (*sql.Rows, error)
Read executes a read operation on the reader connection pool and returns multiple rows. Uses concurrent reader connections for better read performance. Remember to call rows.Close() when finished to prevent connection leaks.
func (*QueryBuilder) ReadClose ¶ added in v0.1.4
func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error
ReadClose executes a read operation and automatically closes the rows when done. The provided function receives the rows and should iterate/scan them as needed. Rows are automatically closed after the function returns, preventing resource leaks.
This is a convenience method that eliminates the need to manually defer rows.Close(), making it safer and cleaner for typical read operations.
Example:
var users []User
err := mgr.Query("SELECT id, name FROM users").ReadClose(func(rows *sql.Rows) error {
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name); err != nil {
return err
}
users = append(users, u)
}
return nil
})
Returns any error from the query execution, the callback function, or row iteration.
func (*QueryBuilder) ReadRow ¶
func (qb *QueryBuilder) ReadRow() (*sql.Row, error)
ReadRow executes a read operation on the reader connection pool and returns a single row. Convenient for queries expected to return exactly one row. Use row.Scan() to extract values. sql.ErrNoRows is returned when no rows match the query.
func (*QueryBuilder) Release ¶
func (qb *QueryBuilder) Release()
Release manually returns the QueryBuilder to the object pool for reuse. Only call this if you don't execute the query (Write/Async/Execute/Read/ReadRow). All execution methods automatically release the QueryBuilder when complete.
func (*QueryBuilder) WithContext ¶
func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder
WithContext adds a context to the query and enables context usage for this specific query. The context will be used for timeouts, cancellation, and deadlines during query execution. Note: Batch operations ignore query-level contexts and use the manager's internal context.
func (*QueryBuilder) Write ¶
func (qb *QueryBuilder) Write() (*QueryResult, error)
Write executes the query directly on the writer connection, bypassing the worker queue. This provides immediate execution and error feedback but may block the caller. Returns a QueryResult containing SQL result, error, duration, and query ID.
type QueryResult ¶
QueryResult represents the outcome of a query execution
func (*QueryResult) Duration ¶
func (r *QueryResult) Duration() time.Duration
GetDuration returns how long the query took to execute
func (*QueryResult) Error ¶
func (r *QueryResult) Error() error
GetError returns any error that occurred during execution
func (*QueryResult) ID ¶
func (r *QueryResult) ID() int64
GetID returns the ID of the query that produced this result
type ResultType ¶
type ResultType int
const ( ResultTypeQuery ResultType = iota ResultTypeTransaction ResultTypeBatch )
type RetryStrategy ¶
type RetryStrategy int
RetryStrategy defines how errors should be retried
const ( // RetryStrategyNone indicates no retry should be attempted RetryStrategyNone RetryStrategy = iota // RetryStrategyImmediate indicates immediate retry with no delay RetryStrategyImmediate // RetryStrategyExponential indicates exponential backoff retry RetryStrategyExponential // RetryStrategyLinear indicates linear backoff retry RetryStrategyLinear )
func (RetryStrategy) String ¶
func (rs RetryStrategy) String() string
String returns the string representation of RetryStrategy
type SlowQuery ¶
type SlowQuery struct {
Query string // The SQL query that was slow to prepare
Duration time.Duration // How long the preparation took
When time.Time // When the slow preparation occurred
}
SlowQuery represents a slow statement preparation for metrics tracking
type StmtCache ¶
type StmtCache struct {
// contains filtered or unexported fields
}
StmtCache is a high-performance prepared statement cache using ristretto for optimal concurrent access with LRU eviction. Stores SQL prepared statements keyed by their query string.
func NewStmtCache ¶
func NewStmtCache(metrics *StmtCacheMetrics, options Options) (*StmtCache, error)
NewStmtCache creates a new prepared statement cache with LRU eviction. maxSize controls the maximum number of statements to cache (0 = default 1000).
func (*StmtCache) Clear ¶ added in v0.1.6
func (c *StmtCache) Clear()
Clear closes all cached statements and clears the cache. The cache remains usable after Clear() - new statements will be prepared on demand.
type StmtCacheMetrics ¶
type StmtCacheMetrics struct {
Size atomic.Int32 // Current cache size
MaxSize int64 // Maximum cache size
Hits atomic.Int64 // Cache hits
Misses atomic.Int64 // Cache misses
Evictions atomic.Int64 // Number of evictions
Collisions atomic.Int64 // Hash collisions (legacy, may not be used)
// contains filtered or unexported fields
}
StmtCacheMetrics holds enhanced atomic counters for thread-safe cache metrics
func NewStmtCacheMetrics ¶
func NewStmtCacheMetrics() *StmtCacheMetrics
NewStmtCacheMetrics creates initialized statement cache metrics
func (*StmtCacheMetrics) Stats ¶
func (s *StmtCacheMetrics) Stats() StmtCacheStats
Stats method converts atomic metrics to snapshot stats
type StmtCacheStats ¶
type StmtCacheStats struct {
Size int
MaxSize int64
Hits int64
Misses int64
Evictions int64
Collisions int64
HitRatio float64
// Enhanced stats
AvgPrepTimeMs float64
PrepErrors int64
SlowQueriesCount int
UptimeSeconds float64
TopQueries map[string]int64
RecentSlowQueries []SlowQuery
}
StmtCacheStats provides a snapshot of cache statistics (non-atomic)
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction represents multiple SQL statements to execute in a transaction
func (*Transaction) Add ¶
func (t *Transaction) Add(sql string, args ...any) *Transaction
Add adds a query to the transaction
func (*Transaction) AddPrepared ¶
func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction
AddPrepared adds a prepared query to the transaction
func (*Transaction) Exec ¶
func (t *Transaction) Exec() (*TransactionResult, error)
Exec runs the transaction through the worker pool
func (*Transaction) ExecuteWithContext ¶
ExecuteWithContext implements JobID interface
func (*Transaction) WithContext ¶
func (t *Transaction) WithContext(ctx context.Context) *Transaction
WithContext adds context to the transaction
func (*Transaction) Write ¶
func (t *Transaction) Write() (*TransactionResult, error)
Write executes the transaction directly
type TransactionResult ¶
type TransactionResult struct {
Results []*QueryResult
// contains filtered or unexported fields
}
TransactionResult represents the outcome of a transaction execution
func (*TransactionResult) Duration ¶
func (r *TransactionResult) Duration() time.Duration
GetDuration returns how long the transaction took to execute
func (*TransactionResult) Error ¶
func (r *TransactionResult) Error() error
GetError returns any error that occurred during execution
func (*TransactionResult) ID ¶
func (r *TransactionResult) ID() int64
GetID returns the ID of the transaction that produced this result
type WriteSerialiser ¶
type WriteSerialiser struct {
// contains filtered or unexported fields
}
WriteSerialiser manages a single worker that processes database jobs
func NewWorkerPool ¶
func NewWorkerPool(db *sql.DB, queueDepth int, metrics *Metrics, writeCache *StmtCache, options Options) *WriteSerialiser
NewWorkerPool creates a new worker pool for database jobs
func (*WriteSerialiser) SetAsyncErrorHandler ¶
func (wp *WriteSerialiser) SetAsyncErrorHandler(handler func(Query, error, time.Duration))
SetAsyncErrorHandler sets the callback for handling async errors
func (*WriteSerialiser) Start ¶
func (wp *WriteSerialiser) Start(ctx context.Context)
Start begins the worker processing loop
func (*WriteSerialiser) Stop ¶
func (wp *WriteSerialiser) Stop() error
Stop shuts down the worker pool
func (*WriteSerialiser) SubmitNoWait ¶
SubmitNoWait submits a job to the queue without waiting
func (*WriteSerialiser) SubmitNoWaitNoContext ¶
func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)
SubmitNoWaitNoContext submits a job without using any context
func (*WriteSerialiser) SubmitWait ¶
SubmitWait submits a job to the queue and waits for its result
func (*WriteSerialiser) SubmitWaitNoContext ¶
func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)
SubmitWaitNoContext submits a job without using any context
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package backup defines backup methods for SQLite databases.
|
Package backup defines backup methods for SQLite databases. |
|
Package checkpoint defines WAL checkpoint modes for SQLite.
|
Package checkpoint defines WAL checkpoint modes for SQLite. |
|
Package profiles provides pre-configured database profiles for different workload types.
|
Package profiles provides pre-configured database profiles for different workload types. |