qwr

Version: v0.1.7
Published: Jan 16, 2026 License: MIT Imports: 19 Imported by: 0

README

qwr - Query Write Reader

A Go library for SQLite that provides serialised writes and concurrent reads with optional context support. qwr uses a worker pool pattern with a single worker to queue writes to SQLite sequentially. Reads go through a configurable pool of reader connections.
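The single-worker idea can be sketched with nothing but a channel and one goroutine. This is not qwr's implementation: serialWriter, writeJob and their methods are hypothetical names used only to illustrate how funnelling every write through one worker guarantees that writes never overlap.

```go
package main

import (
	"fmt"
	"sync"
)

// writeJob is a hypothetical unit of work; done receives the outcome.
type writeJob struct {
	apply func() error
	done  chan error
}

// serialWriter funnels every write through one goroutine, so writes never overlap.
type serialWriter struct {
	jobs chan writeJob
	wg   sync.WaitGroup
}

func newSerialWriter(depth int) *serialWriter {
	w := &serialWriter{jobs: make(chan writeJob, depth)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for job := range w.jobs { // single worker: strictly one job at a time
			job.done <- job.apply()
		}
	}()
	return w
}

// Execute queues the write and blocks until the worker has run it.
func (w *serialWriter) Execute(apply func() error) error {
	job := writeJob{apply: apply, done: make(chan error, 1)}
	w.jobs <- job
	return <-job.done
}

// Close stops accepting work and waits for the worker to drain the queue.
func (w *serialWriter) Close() {
	close(w.jobs)
	w.wg.Wait()
}

func main() {
	w := newSerialWriter(16)
	counter := 0 // only touched by the worker goroutine, so no mutex is needed

	var callers sync.WaitGroup
	for i := 0; i < 100; i++ {
		callers.Add(1)
		go func() {
			defer callers.Done()
			_ = w.Execute(func() error { counter++; return nil })
		}()
	}
	callers.Wait()
	w.Close()
	fmt.Println("writes applied:", counter)
}
```

Readers need no such funnel because SQLite in WAL mode allows many concurrent readers alongside one writer, which is why a separate reader pool pairs naturally with this pattern.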

Quick Start

package main

import (
    "log"
    
    "github.com/caelisco/qwr"
    "github.com/caelisco/qwr/profile"
)

func main() {
    // Create manager with default options
    manager, err := qwr.New("test.db").
        Reader(profile.ReadBalanced()).
        Writer(profile.WriteBalanced()).
        Open()
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Direct write, bypassing the queue
    if _, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write(); err != nil {
        log.Fatal(err)
    }

    // Synchronous write
    if _, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Jane").Execute(); err != nil {
        log.Fatal(err)
    }

    // Asynchronous write
    jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Async()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("queued async job %d", jobID)

    // Read data
    rows, err := manager.Query("SELECT * FROM users").Read()
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
}
Write Operations

Direct Write: Bypasses the worker queue for immediate execution.

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write()

Synchronous Write: Uses the worker pool to serialise the write, blocking until the write is complete.

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Jane").Execute()

Async Write: Asynchronous writes are non-blocking and are guarded by an error queue. When automatic retry is enabled (EnableAutoRetry), the error queue retries failed jobs with exponential backoff plus jitter. Certain errors, such as constraint violations, are not recoverable and are never retried.

jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Async()

Note: Async operations that fail are automatically added to the error queue. Since there's no immediate error return, you must check the error queue to detect failures:

if jobErr, found := manager.GetErrorByID(jobID); found {
    log.Printf("Async job %d failed: %v", jobID, jobErr)
}

Batch Write: Batching creates a slice of queries for deferred processing. Writing occurs either at a timed interval, or once the queue depth reaches a pre-determined threshold. An experimental feature inlines common queries in the batch to optimise the write process.

// These will be automatically batched together
manager.Query("INSERT INTO users (name) VALUES (?)", "Charlie").Batch()
manager.Query("INSERT INTO users (name) VALUES (?)", "Diana").Batch()

Transactions: Multi-statement atomic operations.

tx := manager.Transaction().
    Add("INSERT INTO users (name) VALUES (?)", "Eve").
    Add("UPDATE users SET active = ? WHERE name = ?", true, "Eve")

result, err := tx.Write() // or tx.Exec() for async
Read Operations

Read operations use the reader connection pool and can be executed concurrently:

Multiple Rows:

rows, err := manager.Query("SELECT * FROM users WHERE active = ?", true).Read()
if err != nil {
    log.Fatal(err)
}
defer rows.Close() // Must manually close

for rows.Next() {
    var user User
    if err := rows.Scan(&user.ID, &user.Name); err != nil {
        log.Fatal(err)
    }
    // process user...
}

Multiple Rows with Automatic Cleanup:

var users []User
err := manager.Query("SELECT * FROM users WHERE active = ?", true).ReadClose(func(rows *sql.Rows) error {
    for rows.Next() {
        var user User
        if err := rows.Scan(&user.ID, &user.Name); err != nil {
            return err
        }
        users = append(users, user)
    }
    return nil
})
// rows.Close() called automatically

Single Row:

row, err := manager.Query("SELECT name FROM users WHERE id = ?", 1).ReadRow()
if err != nil {
    log.Fatal(err)
}

var name string
if err := row.Scan(&name); err != nil {
    log.Fatal(err)
}
Prepared Statements

Prepared statements are cached in a sync.Map automatically when enabled. This reduces preparation overhead for repeated queries:

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").
    Prepared().
    Write()
Context Support (Optional)

Contexts can be used for timeouts and cancellation but are not required. The library functions normally without any context usage:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Per-query context
rows, err := manager.Query("SELECT * FROM users").
    WithContext(ctx).
    Read()

// Manager-level context (affects all operations)
manager, err := qwr.New("test.db").
    WithContext(ctx).
    Open()

Database Profiles

Database profiles configure connection pools and SQLite PRAGMA settings for different use cases. qwr includes several pre-configured profiles:

Read Profiles
  • profile.ReadLight() - Low resource usage (5 connections, 30MB cache)
  • profile.ReadBalanced() - General purpose (10 connections, 75MB cache)
  • profile.ReadHeavy() - High concurrency (25 connections, 150MB cache)
Write Profiles
  • profile.WriteLight() - Basic performance (50MB cache, 4KB pages)
  • profile.WriteBalanced() - Most applications (100MB cache, 8KB pages)
  • profile.WriteHeavy() - High volume (200MB cache, 8KB pages)
Custom Profiles
customProfile := profile.New().
    WithMaxOpenConns(15).
    WithCacheSize(-102400). // 100MB
    WithJournalMode(profile.JournalWal).
    WithSynchronous(profile.SyncNormal).
    WithPageSize(8192)

manager, err := qwr.New("test.db").
    Reader(customProfile).
    Writer(profile.WriteBalanced()).
    Open()
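The -102400 passed to WithCacheSize follows SQLite's cache_size convention: a negative value is a memory budget in KiB, while a positive value is a page count. A small sketch of that arithmetic, using hypothetical helper names that are not part of qwr:

```go
package main

import "fmt"

// cacheSizeKiB converts a cache budget in MiB into the negative KiB value
// that SQLite's cache_size PRAGMA interprets as a memory size.
func cacheSizeKiB(mib int) int {
	return -(mib * 1024)
}

// approxPages estimates how many pages fit in that budget for a page size in bytes.
func approxPages(mib, pageSize int) int {
	return mib * 1024 * 1024 / pageSize
}

func main() {
	fmt.Println(cacheSizeKiB(100))      // -102400
	fmt.Println(approxPages(100, 8192)) // 12800
}
```

With 8KB pages, the 100MB budget above therefore corresponds to roughly 12,800 cached pages per connection.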

Logging

qwr uses slog.Default() for all internal logging. Configure your application's default logger using slog.SetDefault() to control qwr's log output, format, and level.
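Because the library logs through slog.Default(), log control happens entirely in the application. A minimal sketch using only the standard library follows; configureLogging and infoEnabled are hypothetical helpers, and the choice of a JSON handler on stderr is an assumption for illustration.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
)

// configureLogging installs a JSON handler as the process-wide default logger.
// verbose selects Debug level; otherwise only warnings and errors are emitted.
func configureLogging(verbose bool) {
	level := slog.LevelWarn
	if verbose {
		level = slog.LevelDebug
	}
	handler := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level})
	slog.SetDefault(slog.New(handler))
}

// infoEnabled reports whether the default logger would emit Info records.
func infoEnabled() bool {
	return slog.Default().Enabled(context.Background(), slog.LevelInfo)
}

func main() {
	configureLogging(false)
	fmt.Println(infoEnabled()) // false

	configureLogging(true)
	fmt.Println(infoEnabled()) // true
}
```

Calling slog.SetDefault before opening the manager means any logging done during start-up already uses the chosen handler.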

Configuration Options

The Options struct controls various aspects of qwr's behaviour. All options have sensible defaults:

options := qwr.Options{
    // Worker configuration
    WorkerQueueDepth:  1000,          // Queue buffer size
    EnableReader:      true,          // Enable read operations
    EnableWriter:      true,          // Enable write operations
    
    // Batching
    BatchSize:         200,           // Queries per batch
    BatchTimeout:      1*time.Second, // Max batch wait time
    InlineInserts:     false,         // Experimental: combine INSERT statements
    
    // Context behaviour  
    UseContexts:       false,         // Default context usage
    
    // Statement caching
    UsePreparedStatements: false,     // Use prepared statements for all queries by default
    StmtCacheSampleRate: 100,         // Metrics sampling rate (1 in N operations)
    StmtCacheSlowThreshold: 10*time.Millisecond, // Threshold for slow query logging

    // Error handling
    ErrorQueueMaxSize: 1000,          // Max errors in memory
    ErrorLogPath:      "",            // Set via WithErrorDB(); an empty string disables persistent error logging
    EnableAutoRetry:   false,         // Automatic retry
    MaxRetries:        3,             // Max retry attempts
    BaseRetryDelay:    30*time.Second,// Initial retry delay
    RetryInterval:     30*time.Second,// Retry check frequency
    
    // Timeouts
    JobTimeout:         30*time.Second, // Individual job timeout
    TransactionTimeout: 30*time.Second, // Transaction timeout
    RetrySubmitTimeout: 5*time.Second,  // Retry submission timeout
    QueueSubmitTimeout: 5*time.Minute,  // Timeout for context-free queue submissions

    // Monitoring
    EnableMetrics:      true,            // Enable metrics collection
}

Error Handling & Retry

For asynchronous processing of queries, qwr provides error classifications to optimise retry strategies.

Enhanced Error Classification

qwr assigns each error to one of the following categories, which determines its retry strategy:

  • Connection Errors: File I/O issues, permission problems → Linear backoff retry
  • Lock Errors: Database busy, locked → Exponential backoff retry
  • Constraint Violations: Unique key, foreign key, NOT NULL → No retry (permanent failure)
  • Schema Errors: Missing tables/columns, syntax errors → No retry (permanent failure)
  • Resource Errors: Disk full, out of memory → Linear backoff retry
  • Timeout Errors: Context timeouts, deadlines → Linear backoff retry
  • Permission Errors: Access denied, read-only → No retry (permanent failure)
  • Internal Errors: qwr-specific errors → No retry
  • Unknown Errors: Unclassified → No retry (safe default)
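The table above can be read as a lookup from category to retry strategy. The sketch below mirrors that mapping with hypothetical local types. The delay formulas (base times attempt for linear, base doubled per attempt for exponential) are assumptions, not qwr's actual calculation.

```go
package main

import (
	"fmt"
	"time"
)

type category int

const (
	catConnection category = iota
	catLock
	catConstraint
	catSchema
	catResource
	catTimeout
	catPermission
	catInternal
	catUnknown
)

type strategy int

const (
	noRetry strategy = iota
	linearBackoff
	exponentialBackoff
)

// strategyFor mirrors the list above; anything unlisted falls through to noRetry.
func strategyFor(c category) strategy {
	switch c {
	case catConnection, catResource, catTimeout:
		return linearBackoff
	case catLock:
		return exponentialBackoff
	default:
		return noRetry
	}
}

// delayFor returns the wait before the given attempt (1-based) and whether to retry at all.
func delayFor(c category, attempt int, base time.Duration) (time.Duration, bool) {
	if attempt < 1 {
		attempt = 1
	}
	switch strategyFor(c) {
	case linearBackoff:
		return base * time.Duration(attempt), true
	case exponentialBackoff:
		return base << (attempt - 1), true
	default:
		return 0, false
	}
}

func main() {
	for _, c := range []category{catLock, catConnection, catConstraint} {
		d, ok := delayFor(c, 3, time.Second)
		fmt.Println(d, ok)
	}
}
```

Defaulting unlisted categories to no retry matches the "safe default" noted for unknown errors: replaying a write whose failure is not understood risks repeating a permanent failure.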
Error Logging

Errors from async operations can be persisted to a separate SQLite database. This is disabled by default and must be explicitly enabled using WithErrorDB():

manager, err := qwr.New("test.db").
    WithErrorDB("errors.db").  // Enable persistent error logging
    Open()

When enabled, errors are logged with full context:

  • SQL statement and parameters (CBOR encoded)
  • Error type and message
  • Retry attempts and timestamps
  • Failure reason

If WithErrorDB() is not called, error logging is disabled. Using :memory: is rejected to prevent unbounded memory growth.

Retry Configuration
options := qwr.Options{
    EnableAutoRetry:   true,
    MaxRetries:        3,
    BaseRetryDelay:    30 * time.Second,
    RetryInterval:     30 * time.Second,
}

manager, err := qwr.New("test.db", options).Open()
Error Queue Management
// Get all errors (named jobErrors so the errors package is not shadowed below)
jobErrors := manager.GetErrors()
fmt.Printf("%d errors in queue\n", len(jobErrors))

// Get specific error with enhanced information
if jobErr, found := manager.GetErrorByID(jobID); found {
    fmt.Printf("Job %d failed: %v\n", jobID, jobErr)

    // Inspect the failure through JobError's exported methods
    sqlText, args := jobErr.SQL()
    fmt.Printf("Statement: %s %v\n", sqlText, args)
    fmt.Printf("Age: %s\n", jobErr.Age())
    fmt.Printf("Eligible for retry: %t\n", jobErr.ShouldRetry(3))
}

// Manually retry a failed job
if err := manager.RetryJob(jobID); err != nil {
    switch {
    case errors.Is(err, qwr.ErrJobNotFound):
        fmt.Printf("Job %d not found in error queue\n", jobID)
    case errors.Is(err, qwr.ErrRetrySubmissionFailed):
        fmt.Printf("Failed to resubmit job %d\n", jobID)
    default:
        fmt.Printf("Retry error: %v\n", err)
    }
}

// Clear error queue
manager.ClearErrors()
Automatic Batching

The batch collector groups multiple operations together and executes them in a single transaction. This reduces the number of database round trips:

// Configure batching
options := qwr.Options{
    BatchSize:    200,                // Execute after 200 queries
    BatchTimeout: 1 * time.Second,    // Or after 1 second
    InlineInserts: true,              // Experimental: opt-in for simple INSERTs
}
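The "flush after N queries or after a timeout" rule can be sketched as follows. This is an illustration under stated assumptions rather than qwr's BatchCollector: the batcher type, its fields and the string-based queries are hypothetical, and the flush callback stands in for executing the batch in one transaction.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a hypothetical collector: it flushes when size items are pending
// or when timeout has elapsed since the first pending item.
type batcher struct {
	mu      sync.Mutex
	pending []string
	size    int
	timeout time.Duration
	timer   *time.Timer
	flush   func(batch []string) // runs under the lock; must not call Add
}

func newBatcher(size int, timeout time.Duration, flush func([]string)) *batcher {
	return &batcher{size: size, timeout: timeout, flush: flush}
}

// Add appends a query and flushes immediately once the size threshold is hit.
func (b *batcher) Add(query string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, query)
	if len(b.pending) == 1 {
		// First item of a new batch: arm the timeout.
		b.timer = time.AfterFunc(b.timeout, b.Flush)
	}
	if len(b.pending) >= b.size {
		b.flushLocked()
	}
}

// Flush sends whatever is pending, regardless of size.
func (b *batcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

func (b *batcher) flushLocked() {
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil
	b.flush(batch)
}

func main() {
	var mu sync.Mutex
	var sizes []int
	b := newBatcher(3, 50*time.Millisecond, func(batch []string) {
		mu.Lock()
		sizes = append(sizes, len(batch))
		mu.Unlock()
	})
	for i := 0; i < 4; i++ {
		b.Add(fmt.Sprintf("INSERT %d", i))
	}
	time.Sleep(200 * time.Millisecond)
	mu.Lock()
	fmt.Println(sizes) // one flush triggered by size, one by the timeout
	mu.Unlock()
}
```

The timer is armed on the first item of each batch, so the timeout bounds how long any single query can wait before it is written.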
Statement Caching

Prepared statements are stored in memory to avoid re-parsing SQL. The cache uses sync.Map and grows unbounded - there are no size limits enforced.

Inline INSERT Optimisation (Experimental)

With inline INSERT optimisation, statements with identical SQL are combined into a single multi-value INSERT:

// These individual statements:
INSERT INTO users (name) VALUES ('Alice')
INSERT INTO users (name) VALUES ('Bob')
INSERT INTO users (name) VALUES ('Charlie')

// Becomes:
INSERT INTO users (name) VALUES ('Alice'), ('Bob'), ('Charlie')
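One way to picture the rewrite with parameterised statements is to repeat the placeholder group once per statement and concatenate the arguments. The sketch below assumes ASCII SQL text that ends in a single VALUES group; the insert type and combineInserts are hypothetical and do not reflect how qwr parses statements.

```go
package main

import (
	"fmt"
	"strings"
)

// insert is a hypothetical queued statement: SQL text with ? placeholders plus its arguments.
type insert struct {
	sql  string
	args []any
}

// combineInserts merges statements that share identical SQL of the form
// "INSERT ... VALUES (...)" into one multi-row INSERT. It reports false when
// the statements differ or the text does not end in a parenthesised VALUES group.
func combineInserts(stmts []insert) (insert, bool) {
	if len(stmts) == 0 {
		return insert{}, false
	}
	first := stmts[0].sql
	idx := strings.LastIndex(strings.ToUpper(first), "VALUES") // assumes ASCII SQL
	if idx < 0 {
		return insert{}, false
	}
	head := first[:idx+len("VALUES")]
	group := strings.TrimSpace(first[idx+len("VALUES"):])
	if !strings.HasPrefix(group, "(") || !strings.HasSuffix(group, ")") {
		return insert{}, false
	}
	groups := make([]string, 0, len(stmts))
	var args []any
	for _, s := range stmts {
		if s.sql != first {
			return insert{}, false // only identical SQL can be inlined
		}
		groups = append(groups, group)
		args = append(args, s.args...)
	}
	return insert{sql: head + " " + strings.Join(groups, ", "), args: args}, true
}

func main() {
	const q = "INSERT INTO users (name) VALUES (?)"
	combined, ok := combineInserts([]insert{
		{q, []any{"Alice"}},
		{q, []any{"Bob"}},
		{q, []any{"Charlie"}},
	})
	fmt.Println(ok)
	fmt.Println(combined.sql)
	fmt.Println(combined.args...)
}
```

SQLite caps the number of bound parameters per statement, so a real implementation would also need to split very large batches into several multi-row statements.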

Monitoring & Metrics

qwr provides basic performance monitoring and operational visibility. Metrics collection can be disabled for performance.

Metrics Configuration
options := qwr.Options{
    EnableMetrics: true,  // Enable/disable metrics collection (default: true)
}
Performance Metrics
metrics := manager.GetMetrics()
fmt.Printf("Jobs processed: %d\n", metrics.JobsProcessed)
fmt.Printf("Processing rate: %.2f jobs/sec\n", metrics.ProcessingRate)
fmt.Printf("Error rate: %.2f%%\n", metrics.ErrorRate*100)
fmt.Printf("Queue length: %d\n", metrics.CurrentQueueLen)
Cache Metrics
cacheMetrics := manager.GetCacheMetrics()
for name, stats := range cacheMetrics {
    fmt.Printf("%s cache hit ratio: %.2f%%\n", name, stats.HitRatio*100)
}
Error Queue Stats
errorStats := manager.GetErrorQueueStats()
fmt.Printf("Pending retries: %d\n", errorStats.PendingRetries)
Wait for Queue to Drain
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := manager.WaitForIdle(ctx); err != nil {
    log.Printf("Timeout waiting for queue to drain: %v", err)
}
Manual Cache Management
// Clear statement cache (frees memory, cache rebuilds on demand)
manager.ResetCaches()

// Reset metrics only
manager.ResetMetrics()
Database Maintenance
// Full vacuum (rebuilds entire database)
err := manager.RunVacuum()

// Incremental vacuum (reclaims some space)
err = manager.RunIncrementalVacuum(1000) // 1000 pages

// Manual WAL checkpoint
err = manager.RunCheckpoint(checkpoint.Passive)

// Online backup (Default tries API first, falls back to Vacuum)
err = manager.Backup("/path/to/backup.db", backup.Default)

// Backup using SQLite backup API (less locking, better for large DBs)
err = manager.Backup("/path/to/backup.db", backup.API)

// Backup using VACUUM INTO (creates optimized/defragmented copy)
err = manager.Backup("/path/to/backup.db", backup.Vacuum)
Automatic WAL Checkpoint on Close

Configure automatic WAL checkpointing when the manager closes:

import "github.com/caelisco/qwr/checkpoint"

manager, err := qwr.New("test.db").
    Checkpoint(checkpoint.Truncate).
    Open()

Available checkpoint modes:

  • checkpoint.None - No checkpoint on close (default)
  • checkpoint.Passive - Non-blocking, best-effort checkpoint
  • checkpoint.Full - Wait for writers, checkpoint all frames
  • checkpoint.Restart - Full + restart WAL from beginning
  • checkpoint.Truncate - Restart + truncate WAL to zero bytes

Caveats & Design Decisions

Unbounded Statement Cache

The prepared statement cache uses sync.Map and grows without bounds - there is no size limit or eviction policy.

This is an intentional design decision to maximise performance. Most applications use a finite set of SQL queries, so the cache naturally reaches a stable size.

Applications that generate dynamic SQL with varying literals (e.g., embedding timestamps or IDs directly in SQL strings instead of using parameters) will cause unbounded memory growth.

  • Always use parameterised queries with ? placeholders
  • Avoid building SQL strings with dynamic values embedded
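The difference between the two styles is easy to demonstrate with a stand-in cache keyed by SQL text. The stmtCache type below is hypothetical and stores placeholders rather than real prepared statements, but the growth behaviour is the same for any cache keyed this way.

```go
package main

import (
	"fmt"
	"sync"
)

// stmtCache is a stand-in for a prepared statement cache keyed by SQL text.
type stmtCache struct {
	entries sync.Map
}

// prepare records sql in the cache on first use and is a no-op afterwards.
// The stored value is a placeholder; a real cache would hold a *sql.Stmt.
func (c *stmtCache) prepare(sql string) {
	c.entries.LoadOrStore(sql, struct{}{})
}

// size counts the distinct SQL strings currently cached.
func (c *stmtCache) size() int {
	n := 0
	c.entries.Range(func(_, _ any) bool { n++; return true })
	return n
}

func main() {
	var parameterised, dynamic stmtCache
	for id := 1; id <= 1000; id++ {
		// Same text every time: one cache entry, value passed separately.
		parameterised.prepare("SELECT name FROM users WHERE id = ?")
		// New text every time: one cache entry per distinct id.
		dynamic.prepare(fmt.Sprintf("SELECT name FROM users WHERE id = %d", id))
	}
	fmt.Println(parameterised.size(), dynamic.size()) // 1 1000
}
```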
Error Queue Overflow Behaviour

When the error queue exceeds ErrorQueueMaxSize, the oldest errors are persisted to the error log database and then removed from the in-memory queue.

  1. In-memory queue fills to ErrorQueueMaxSize (default: 1,000 errors)
  2. New errors trigger overflow handling
  3. Oldest errors are written to the SQLite3 error log database
  4. Oldest errors are removed from memory to make space
  5. Error information is preserved on disk for later analysis

If the error log database write fails (e.g., disk full, permissions), the error data is permanently lost.
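The overflow sequence, including the data-loss case, can be sketched with a bounded slice and a persist callback. The boundedQueue type and its fields are hypothetical; the real queue stores JobError values and writes to the error log database.

```go
package main

import (
	"errors"
	"fmt"
)

// errDiskFull simulates a failing error log write.
var errDiskFull = errors.New("disk full")

// boundedQueue keeps at most max entries in memory; on overflow the oldest
// entry is handed to persist and then dropped, whether or not persist succeeded.
type boundedQueue struct {
	max     int
	items   []string
	persist func(string) error
	lost    int // entries dropped after a failed persist
}

// store makes room by evicting the oldest entries, then appends item.
func (q *boundedQueue) store(item string) {
	for len(q.items) > 0 && len(q.items) >= q.max {
		oldest := q.items[0]
		q.items = q.items[1:]
		if err := q.persist(oldest); err != nil {
			q.lost++ // nothing else holds this entry: it is gone
		}
	}
	q.items = append(q.items, item)
}

func main() {
	var disk []string
	q := &boundedQueue{max: 2, persist: func(s string) error {
		disk = append(disk, s)
		return nil
	}}
	for _, e := range []string{"job-1", "job-2", "job-3"} {
		q.store(e)
	}
	fmt.Println(q.items, disk) // [job-2 job-3] [job-1]

	q.persist = func(string) error { return errDiskFull }
	q.store("job-4")
	fmt.Println(q.items, q.lost) // [job-3 job-4] 1
}
```

Monitoring DroppedErrors in the error queue stats is a reasonable way to notice this situation, assuming that counter tracks evicted entries.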

Using Custom SQLite Drivers

qwr uses modernc.org/sqlite by default, but you can bring your own SQLite driver using the NewSQL() constructor:

package main

import (
    "database/sql"
    "log"

    "github.com/caelisco/qwr"
    "github.com/caelisco/qwr/profile"
    _ "github.com/mattn/go-sqlite3" // Your preferred SQLite driver
)

func main() {
    // Open your own database connections
    readerDB, err := sql.Open("sqlite3", "test.db")
    if err != nil {
        log.Fatal(err)
    }

    writerDB, err := sql.Open("sqlite3", "test.db")
    if err != nil {
        log.Fatal(err)
    }

    // Pass connections to qwr and enable error logging
    manager, err := qwr.NewSQL(readerDB, writerDB).
        WithErrorDB("errors.db").  // Optional: enable persistent error logging
        Reader(profile.ReadBalanced()).
        Writer(profile.WriteBalanced()).
        Open()
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close() // This will also close readerDB and writerDB

    // Use manager normally...
}

The Manager takes full ownership of your database connections. Call Manager.Close() to close both reader and writer connections. Use WithErrorDB() to enable persistent error logging for async operations.

Profiles can still be applied to your connections (a profile is just a set of connection pool settings and SQLite PRAGMAs).

qwr was inspired by numerous articles that describe using SQLite3 in production systems.

Consider SQLite

Author: Wesley Aptekar-Cassels URL: https://blog.wesleyac.com/posts/consider-sqlite

Scaling SQLite to 4M QPS on a Single Server

Author: Expensify Engineering URL: https://use.expensify.com/blog/scaling-sqlite-to-4m-qps-on-a-single-server

Your Database is Your Prison — Here's How Expensify Broke Free

Author: First Round Review URL: https://review.firstround.com/your-database-is-your-prison-heres-how-expensify-broke-free/

How (and Why) to Run SQLite in Production

Author: Stephen Margheim URL: https://fractaledmind.github.io/2023/12/23/rubyconftw/

Gotchas with SQLite in Production

Author: Anže Pečar URL: https://blog.pecar.me/sqlite-prod

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrReaderDisabled            = errors.New("reader is disabled")
	ErrWriterDisabled            = errors.New("writer is disabled")
	ErrResultNotFound            = errors.New("result not found")
	ErrInvalidResult             = errors.New("invalid result type")
	ErrQueryTooLarge             = errors.New("query exceeds maximum allowed size")
	ErrStatementCacheFull        = errors.New("statement cache is full")
	ErrHashCollision             = errors.New("statement hash collision")
	ErrErrorQueueDisabled        = errors.New("error queue is disabled")
	ErrJobNotFound               = errors.New("job not found in error queue")
	ErrWorkerNotRunning          = errors.New("worker pool is not running")
	ErrQueueTimeout              = errors.New("timeout waiting for queue to accept submission")
	ErrRetrySubmissionFailed     = errors.New("failed to resubmit job for retry")
	ErrInvalidQuery              = errors.New("query is invalid")
	ErrNilPreparedStatement      = errors.New("internal error: prepared statement is nil before execution")
	ErrNilPreparedStatementCache = errors.New("internal error: global prepared statement cache returned nil statement without error")
	ErrFailedToPrepareStatement  = errors.New("failed to prepare statement")
	ErrPreparedCacheRequired     = errors.New("prepared statement cache is required when using prepared queries")
	ErrCacheClosed               = errors.New("statement cache is closed")
	ErrBatchContainsNonQuery     = errors.New("batch jobs can only contain Query jobs, not Transaction or nested Batch jobs")
	ErrConnectionUnhealthy       = errors.New("database connection is unhealthy")
	ErrBackupDestinationExists   = errors.New("backup destination already exists")
	ErrBackupDriverUnsupported   = errors.New("driver does not support backup API")
	ErrBackupFailed              = errors.New("backup failed")
	ErrBackupInit                = errors.New("failed to initialize backup")
	ErrBackupStep                = errors.New("backup step failed")
	ErrBackupConnection          = errors.New("failed to get connection for backup")
	ErrBackupInvalidMethod       = errors.New("unknown backup method")
)

Error constants

var DefaultOptions = Options{
	WorkerQueueDepth:       1000,
	EnableReader:           true,
	EnableWriter:           true,
	BatchSize:              200,
	BatchTimeout:           1 * time.Second,
	InlineInserts:          false,
	UseContexts:            false,
	StmtCacheMaxSize:       1000,
	StmtCacheSampleRate:    100,
	StmtCacheSlowThreshold: 10 * time.Millisecond,
	ErrorQueueMaxSize:      1000,
	EnableAutoRetry:        false,
	RetryInterval:          30 * time.Second,
	MaxRetries:             3,
	BaseRetryDelay:         30 * time.Second,
	JobTimeout:             30 * time.Second,
	TransactionTimeout:     30 * time.Second,
	RetrySubmitTimeout:     5 * time.Second,
	QueueSubmitTimeout:     5 * time.Minute,
	EnableMetrics:          true,
}

DefaultOptions provides a common starting point for configuration.

Functions

func New

func New(path string, opts ...Options) *qwrBuilder

New creates a new qwr Manager instance builder

Options are immutable after construction. They can only be set here during manager creation and cannot be modified at runtime. If no options are provided, DefaultOptions will be used.

To change options, you must stop the application, create a new manager with different options, and restart.

func NewSQL

func NewSQL(reader, writer *sql.DB, opts ...Options) *qwrBuilder

NewSQL creates a new qwr Manager instance builder using user-provided database connections. This allows you to bring your own SQLite driver (e.g., mattn/go-sqlite3 instead of modernc.org/sqlite).

Parameters:

  • reader: Database connection for read operations (pass nil to disable reader)
  • writer: Database connection for write operations (pass nil to disable writer)
  • opts: Optional configuration options (variadic). If not provided, DefaultOptions will be used.

Important notes:

  • Passing nil for reader/writer automatically disables that connection (sets EnableReader/EnableWriter to false)
  • The Manager takes full ownership of the provided database connections
  • Calling Manager.Close() will close these database connections
  • You should not use these connections directly after passing them to NewSQL()
  • Profiles will be applied to your connections (including SQLite PRAGMAs)
  • If you provide a non-SQLite database, PRAGMA errors are your responsibility
  • Use WithErrorDB() to enable persistent error logging to disk

Example with mattn/go-sqlite3:

import _ "github.com/mattn/go-sqlite3"

readerDB, _ := sql.Open("sqlite3", "mydb.db")
writerDB, _ := sql.Open("sqlite3", "mydb.db")
opts := qwr.Options{ErrorLogPath: "errors.db"} // Optional error logging
manager, err := qwr.NewSQL(readerDB, writerDB, opts).
    Reader(profile.ReadBalanced()).
    Writer(profile.WriteBalanced()).
    Open()

func ReleaseQueryBuilder

func ReleaseQueryBuilder(qb *QueryBuilder)

ReleaseQueryBuilder returns a QueryBuilder to the pool after cleaning it

Types

type BatchCollector

type BatchCollector struct {
	// contains filtered or unexported fields
}

BatchCollector manages automatic batching of database jobs for asynchronous execution.

func NewBatchCollector

func NewBatchCollector(ctx context.Context, ws *WriteSerialiser, options Options, dbPath string) *BatchCollector

NewBatchCollector creates a new batch collector with pre-allocated capacity and context.

func (*BatchCollector) Add

func (bc *BatchCollector) Add(job Job)

Add adds a job to the current batch for eventual execution using the collector's context

func (*BatchCollector) Close

func (bc *BatchCollector) Close()

Close flushes any pending batch and stops the timer

type BatchJob

type BatchJob struct {
	Queries []Job
	// contains filtered or unexported fields
}

BatchJob represents a collection of database jobs to be executed as a batch

func (BatchJob) ExecuteWithContext

func (b BatchJob) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs each job in the batch within a single transaction

func (BatchJob) ID

func (b BatchJob) ID() int64

ID returns the unique identifier for this batch

type BatchResult

type BatchResult struct {
	Results []JobResult
	// contains filtered or unexported fields
}

BatchResult represents the outcome of a batch execution

func (*BatchResult) Duration

func (r *BatchResult) Duration() time.Duration

Duration returns how long the batch took to execute

func (*BatchResult) Error

func (r *BatchResult) Error() error

Error returns any error that occurred during execution

func (*BatchResult) ID

func (r *BatchResult) ID() int64

ID returns the ID of the batch that produced this result

type ErrorCategory

type ErrorCategory int

ErrorCategory provides granular error classification for better handling

const (
	// ErrorCategoryConnection indicates connection-related errors
	ErrorCategoryConnection ErrorCategory = iota
	// ErrorCategoryLock indicates database locking/concurrency errors
	ErrorCategoryLock
	// ErrorCategoryConstraint indicates constraint violation errors
	ErrorCategoryConstraint
	// ErrorCategorySchema indicates schema-related errors
	ErrorCategorySchema
	// ErrorCategoryResource indicates resource exhaustion errors
	ErrorCategoryResource
	// ErrorCategoryTimeout indicates timeout/deadline errors
	ErrorCategoryTimeout
	// ErrorCategoryPermission indicates access control errors
	ErrorCategoryPermission
	// ErrorCategoryInternal indicates internal QWR errors
	ErrorCategoryInternal
	// ErrorCategoryUnknown indicates unclassified errors
	ErrorCategoryUnknown
)

func (ErrorCategory) String

func (ec ErrorCategory) String() string

String returns the string representation of ErrorCategory

type ErrorQueue

type ErrorQueue struct {
	// contains filtered or unexported fields
}

ErrorQueue maintains a simple registry of failed async operations

func NewErrorQueue

func NewErrorQueue(ws *WriteSerialiser, metrics *Metrics, opts Options, dbPath string) *ErrorQueue

NewErrorQueue creates a new error queue

func (*ErrorQueue) Clear

func (eq *ErrorQueue) Clear()

Clear removes all errors from the queue

func (*ErrorQueue) Close

func (eq *ErrorQueue) Close()

Close shuts down the error queue

func (*ErrorQueue) Count

func (eq *ErrorQueue) Count() int

Count returns the number of errors in the queue

func (*ErrorQueue) Get

func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)

Get retrieves a specific error by job ID

func (*ErrorQueue) GetAll

func (eq *ErrorQueue) GetAll() []JobError

GetAll returns all errors in the queue in chronological order

func (*ErrorQueue) GetReadyForRetry

func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError

GetReadyForRetry returns errors that are ready for retry

func (*ErrorQueue) PersistError

func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error

PersistError logs an error to the database

func (*ErrorQueue) Remove

func (eq *ErrorQueue) Remove(jobID int64) bool

Remove removes an error from the queue

func (*ErrorQueue) Store

func (eq *ErrorQueue) Store(jobErr JobError)

Store adds or overwrites an error in the queue, or immediately persists non-retriable errors

type ErrorQueueStats

type ErrorQueueStats struct {
	CurrentSize    int
	TotalErrors    int64
	RetriedErrors  int64
	DroppedErrors  int64
	PendingRetries int
}

ErrorQueueStats provides basic error queue metrics

type Job

type Job struct {
	Type        JobType
	Query       Query
	Transaction Transaction
	BatchJob    BatchJob
}

Job represents a database job that can be executed

func NewBatchJobJob

func NewBatchJobJob(b BatchJob) Job

NewBatchJobJob creates a Job from a BatchJob

func NewQueryJob

func NewQueryJob(q Query) Job

NewQueryJob creates a Job from a Query

func NewTransactionJob

func NewTransactionJob(t Transaction) Job

NewTransactionJob creates a Job from a Transaction

func (Job) ExecuteWithContext

func (j Job) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs the job against the database with context

func (Job) ID

func (j Job) ID() int64

ID returns the unique identifier for this job

type JobError

type JobError struct {
	Query Query // The query that failed
	// contains filtered or unexported fields
}

JobError represents an error that occurred during async job execution

func (JobError) Age

func (je JobError) Age() time.Duration

Age returns how long ago the error occurred

func (*JobError) CalculateNextRetry

func (je *JobError) CalculateNextRetry(baseDelay time.Duration)

CalculateNextRetry calculates when this job should be retried next. It uses exponential backoff with jitter to prevent a thundering herd.
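Exponential backoff with jitter can be sketched as below. This is not the body of CalculateNextRetry: nextRetryDelay, newRand, the upper cap and the 25% jitter fraction are all assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newRand returns a seeded random source so runs are reproducible.
func newRand(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// nextRetryDelay doubles base for every previous attempt, caps the result at max,
// and then adds up to 25% random jitter. The cap and the jitter fraction are
// assumptions for this sketch.
func nextRetryDelay(base, max time.Duration, retries int, rnd *rand.Rand) time.Duration {
	delay := base
	for i := 0; i < retries && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		delay = max
	}
	// Jitter spreads out jobs that failed at the same moment.
	jitter := time.Duration(rnd.Int63n(int64(delay)/4 + 1))
	return delay + jitter
}

func main() {
	rnd := newRand(1)
	for retries := 0; retries < 5; retries++ {
		d := nextRetryDelay(30*time.Second, 10*time.Minute, retries, rnd)
		fmt.Printf("retry %d after %s\n", retries+1, d.Round(time.Second))
	}
}
```

Without jitter, every job that failed on the same lock contention would retry at the same instant and collide again, which is the thundering herd the method's description refers to.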

func (JobError) CreateRetryQuery

func (je JobError) CreateRetryQuery() Query

CreateRetryQuery creates a new query for retry with incremented retry count

func (JobError) ID

func (je JobError) ID() int64

func (JobError) SQL

func (je JobError) SQL() (string, []any)

SQL returns the SQL statement and arguments

func (JobError) ShouldRetry

func (je JobError) ShouldRetry(maxRetries int) bool

ShouldRetry determines if this error should be retried based on error type and retry count

func (JobError) String

func (je JobError) String() string

String returns a string representation of the error

type JobResult

type JobResult struct {
	Type              ResultType
	QueryResult       QueryResult
	TransactionResult TransactionResult
	BatchResult       BatchResult
}

JobResult represents the outcome of a job execution

func NewBatchResult

func NewBatchResult(br BatchResult) JobResult

NewBatchResult creates a JobResult from a BatchResult

func NewQueryResult

func NewQueryResult(qr QueryResult) JobResult

NewQueryResult creates a JobResult from a QueryResult

func NewTransactionResult

func NewTransactionResult(tr TransactionResult) JobResult

NewTransactionResult creates a JobResult from a TransactionResult

func (JobResult) Duration

func (jr JobResult) Duration() time.Duration

Duration returns how long the job took to execute

func (JobResult) Error

func (jr JobResult) Error() error

Error returns any error that occurred during execution

func (JobResult) ID

func (jr JobResult) ID() int64

ID returns the ID of the job that produced this result

type JobType

type JobType int
const (
	JobTypeQuery JobType = iota
	JobTypeTransaction
	JobTypeBatch
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles serialised database operations

func (*Manager) Backup added in v0.1.5

func (m *Manager) Backup(dest string, method backup.Method) error

Backup creates a backup of the database to the specified destination path.

Available methods:

  • backup.Default: Uses backup API if available, falls back to Vacuum
  • backup.API: Uses SQLite's online backup API (less locking)
  • backup.Vacuum: Uses VACUUM INTO (creates optimized copy)

The destination file must not already exist.

func (*Manager) Batch

func (m *Manager) Batch(job Job) error

Batch adds a job to be executed as part of a batch

func (*Manager) ClearErrors

func (m *Manager) ClearErrors()

ClearErrors removes all errors from the queue

func (*Manager) Close

func (m *Manager) Close() error

Close closes all database connections and stops the worker pool

func (*Manager) Database

func (m *Manager) Database() string

Database extracts the database filename for logging context

func (*Manager) GetCacheMetrics

func (m *Manager) GetCacheMetrics() map[string]StmtCacheStats

GetCacheMetrics returns the current statement cache metrics

func (*Manager) GetDetailedCacheMetrics

func (m *Manager) GetDetailedCacheMetrics() map[string]interface{}

GetDetailedCacheMetrics returns enhanced cache metrics with detailed information

func (*Manager) GetErrorByID

func (m *Manager) GetErrorByID(jobID int64) (JobError, bool)

GetErrorByID retrieves a specific error from the queue

func (*Manager) GetErrorQueueStats

func (m *Manager) GetErrorQueueStats() ErrorQueueStats

GetErrorQueueStats returns basic error queue statistics

func (*Manager) GetErrors

func (m *Manager) GetErrors() []JobError

GetErrors returns all errors in the error queue

func (*Manager) GetJobStatus

func (m *Manager) GetJobStatus(jobID int64) (string, error)

GetJobStatus checks if a job failed by looking in the error queue

func (*Manager) GetMetrics

func (m *Manager) GetMetrics() MetricsSnapshot

GetMetrics returns the current write performance metrics

func (*Manager) GetReaderProfile

func (m *Manager) GetReaderProfile() *profile.Profile

GetReaderProfile returns the current reader profile

func (*Manager) GetWriterProfile

func (m *Manager) GetWriterProfile() *profile.Profile

GetWriterProfile returns the current writer profile

func (*Manager) Query

func (m *Manager) Query(sql string, args ...any) *QueryBuilder

Query creates a new query with the given SQL and arguments

func (*Manager) RemoveError

func (m *Manager) RemoveError(jobID int64) bool

RemoveError removes a specific error from the queue

func (*Manager) ResetCacheDetailedMetrics

func (m *Manager) ResetCacheDetailedMetrics()

ResetCacheDetailedMetrics resets only the detailed cache metrics

func (*Manager) ResetCaches

func (m *Manager) ResetCaches()

ResetCaches clears all cached prepared statements, freeing memory. The cache remains usable - new statements will be prepared on demand.

func (*Manager) ResetMetrics

func (m *Manager) ResetMetrics()

ResetMetrics resets the write performance metrics

func (*Manager) RetryJob

func (m *Manager) RetryJob(jobID int64) error

RetryJob manually retries a failed job by its ID

func (*Manager) RunCheckpoint added in v0.1.5

func (m *Manager) RunCheckpoint(mode checkpoint.Mode) error

RunCheckpoint triggers a WAL checkpoint

func (*Manager) RunIncrementalVacuum

func (m *Manager) RunIncrementalVacuum(pages int) error

RunIncrementalVacuum performs incremental vacuum if enabled

func (*Manager) RunVacuum

func (m *Manager) RunVacuum() error

RunVacuum performs a VACUUM operation (full database rebuild)

func (*Manager) SetSecureDelete

func (m *Manager) SetSecureDelete(enabled bool) error

SetSecureDelete enables or disables secure_delete

func (*Manager) Transaction

func (m *Manager) Transaction(capacity ...int) *Transaction

Transaction creates a new transaction

func (*Manager) WaitForIdle

func (m *Manager) WaitForIdle(ctx context.Context) error

WaitForIdle waits until all operations are processed

type Metrics

type Metrics struct {

	// Enhanced cache metrics with detailed tracking
	ReaderStmtMetrics StmtCacheMetrics
	WriterStmtMetrics StmtCacheMetrics
	// contains filtered or unexported fields
}

Metrics tracks performance of the qwr system

func NewMetrics

func NewMetrics() *Metrics

func (*Metrics) Reset

func (m *Metrics) Reset()

func (*Metrics) Snapshot

func (m *Metrics) Snapshot() MetricsSnapshot

type MetricsSnapshot

type MetricsSnapshot struct {
	JobsProcessed         int64
	JobsFailed            int64
	QueriesProcessed      int64
	TransactionsProcessed int64
	BatchesProcessed      int64
	DirectWritesProcessed int64
	DirectWritesFailed    int64
	CurrentQueueLen       int32
	ProcessingRate        float64 // jobs/second
	ErrorRate             float64 // percentage
	AvgQueueWaitTime      time.Duration
	AvgProcessingTime     time.Duration
	AvgDirectWriteTime    time.Duration
	Uptime                time.Duration

	// Error queue stats
	TotalErrors   int64
	RetriedErrors int64
	DroppedErrors int64

	// Cache stats (non-atomic snapshots)
	ReaderStmtStats StmtCacheStats
	WriterStmtStats StmtCacheStats
}

MetricsSnapshot provides a point-in-time view

type Options

type Options struct {
	// WorkerQueueDepth sets the buffer size for the write queue.
	// Higher values allow more queries to be buffered but use more memory.
	// Default: 50000. Typical range: 1000-100000 depending on throughput needs.
	WorkerQueueDepth int

	// EnableReader determines if the reader database connection pool is created and used.
	// Disable for write-only applications to save resources.
	// Default: true
	EnableReader bool

	// EnableWriter determines if the writer database connection (and worker) is created and used.
	// Disable for read-only applications to save resources.
	// Default: true
	EnableWriter bool

	// BatchSize is the number of queries to collect before automatically executing the batch.
	// Larger batches improve throughput but increase latency and memory usage.
	// Default: 200. Typical range: 50-1000 depending on query size and latency requirements.
	BatchSize int

	// BatchTimeout is the maximum time to wait before executing a partial batch.
	// Ensures timely execution even when BatchSize isn't reached.
	// Default: 1 second. Typical range: 100ms-5s depending on latency requirements.
	BatchTimeout time.Duration

	// InlineInserts enables experimental combining of similar INSERT statements.
	// Combines multiple INSERT INTO table VALUES (...) into single multi-value INSERT.
	// Improves performance for bulk inserts but requires identical SQL structure.
	//
	// EXPERIMENTAL: This feature uses simple string parsing which may produce
	// incorrect results for complex queries (e.g., INSERT...SELECT, or VALUES
	// containing parentheses in string literals). Use only with simple INSERT
	// statements where you control the SQL structure.
	//
	// Default: false (disabled)
	InlineInserts bool

	// UseContexts determines whether to use context-based methods by default.
	// When false, operations use non-context methods unless WithContext() is called.
	// Default: false (contexts are opt-in for better performance)
	UseContexts bool

	// StmtCacheMaxSize is the maximum number of prepared statements to cache.
	// When the cache is full, least recently used statements are evicted.
	// Default: 1000. Set higher for applications with many unique queries.
	StmtCacheMaxSize int

	// StmtCacheSampleRate controls detailed metrics sampling (1 in N operations).
	// Higher values reduce metrics overhead but provide less detailed monitoring.
	// Default: 100 (sample every 100th operation). Set to 1 for full sampling.
	StmtCacheSampleRate int64

	// StmtCacheSlowThreshold is the threshold for recording slow query preparation.
	// Preparations taking longer than this are logged for performance analysis.
	// Default: 10ms. Typical range: 1ms-100ms depending on performance requirements.
	StmtCacheSlowThreshold time.Duration

	// ErrorQueueMaxSize is the maximum number of errors to retain in memory.
	// When full, oldest errors are persisted to disk and removed from memory.
	// Default: 1000. Higher values provide more error history but use more memory.
	ErrorQueueMaxSize int

	// EnableAutoRetry determines whether to automatically retry failed async operations.
	// When enabled, retriable errors (database locks, timeouts) are retried automatically.
	// Default: false (manual retry control)
	EnableAutoRetry bool

	// RetryInterval is how often to check for jobs ready to retry.
	// Shorter intervals provide faster retry response but use more CPU.
	// Default: 30 seconds. Typical range: 10s-5m depending on urgency needs.
	RetryInterval time.Duration

	// MaxRetries is the maximum number of retry attempts for a failed job.
	// After exceeding this limit, jobs are marked as permanently failed.
	// Default: 3. Typical range: 1-10 depending on reliability requirements.
	MaxRetries int

	// BaseRetryDelay is the base delay for exponential backoff retry logic.
	// Actual delay is BaseRetryDelay * 2^(attempt-1) with jitter.
	// Default: 30 seconds. Longer delays reduce database pressure during issues.
	BaseRetryDelay time.Duration

	// JobTimeout is the default timeout for individual job execution.
	// Applied to queries, prepared statement execution, and other single operations.
	// Default: 30 seconds. Adjust based on expected query complexity and database performance.
	JobTimeout time.Duration

	// TransactionTimeout is the default timeout for transaction execution.
	// Applied to multi-statement transactions including BEGIN, queries, and COMMIT.
	// Default: 30 seconds. Should be longer than JobTimeout for complex transactions.
	TransactionTimeout time.Duration

	// RetrySubmitTimeout is the timeout for submitting retry jobs to the worker queue.
	// Prevents retry logic from hanging when the queue is full or worker is stopped.
	// Default: 5 seconds. Should be shorter than RetryInterval.
	RetrySubmitTimeout time.Duration

	// QueueSubmitTimeout is the timeout for context-free submissions to wait for queue space.
	// Only applies to SubmitWaitNoContext/SubmitNoWaitNoContext methods.
	// Prevents deadlock when queue is full by failing after this timeout.
	// Default: 5 minutes. Should be long enough to allow queue to drain during high load.
	QueueSubmitTimeout time.Duration

	// EnableMetrics determines whether to collect performance and operational metrics.
	// When disabled, all metrics collection is skipped for better performance.
	// Default: true (metrics collection enabled)
	EnableMetrics bool

	// UsePreparedStatements makes all queries use prepared statements by default.
	// Individual queries can still override this with the Prepared() method.
	// Prepared statements are cached and reduce parsing overhead for repeated queries.
	// Default: false
	UsePreparedStatements bool

	// ErrorLogPath is the path for persistent error logging database.
	// If empty, persistent error logging is disabled (in-memory error queue still works).
	// Default: "" (persistent logging disabled)
	ErrorLogPath string
}

Options holds configuration for the qwr manager's internal behavior.

IMPORTANT: Options are immutable after manager startup. They can only be set during manager construction via New() and cannot be modified at runtime. To change options, you must stop the application, modify the options, and restart.
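The BaseRetryDelay comment gives the retry delay as BaseRetryDelay * 2^(attempt-1) with jitter. The sketch below works through that formula using only the standard library. The names backoffDelay and withJitter, the upper limit, and the jitter shape (a random addition of up to a fraction of the delay) are assumptions made for illustration; they are not taken from the qwr source.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns base * 2^(attempt-1), capped at limit.
// Attempts below 1 are treated as attempt 1. The cap is an assumption
// added here so that repeated doubling cannot overflow.
func backoffDelay(base time.Duration, attempt int, limit time.Duration) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d <= 0 || d > limit { // overflowed or went past the cap
			return limit
		}
	}
	if d > limit {
		return limit
	}
	return d
}

// withJitter adds a random amount in [0, d*fraction) to d.
// This is one common jitter shape, not necessarily the one qwr uses.
func withJitter(d time.Duration, fraction float64, rng *rand.Rand) time.Duration {
	if d <= 0 || fraction <= 0 {
		return d
	}
	return d + time.Duration(rng.Float64()*fraction*float64(d))
}

func main() {
	rng := rand.New(rand.NewSource(1))
	base := 30 * time.Second // documented default for BaseRetryDelay
	for attempt := 1; attempt <= 3; attempt++ {
		d := backoffDelay(base, attempt, 10*time.Minute)
		fmt.Println(attempt, d, withJitter(d, 0.1, rng))
	}
}
```

With the documented defaults (BaseRetryDelay of 30 seconds, MaxRetries of 3), the un-jittered delays under this formula are 30s, 1m0s and 2m0s.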

func (*Options) SetDefaults

func (o *Options) SetDefaults()

SetDefaults applies default values if not set.
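A minimal sketch of the "apply defaults if not set" pattern, assuming that "not set" means the field still holds its zero value. The sketchOptions type is a hypothetical subset of Options; only the default values (50000, 200, 1 second, 3) come from the field comments above.

```go
package main

import (
	"fmt"
	"time"
)

// sketchOptions is a hypothetical subset of Options, used only for illustration.
type sketchOptions struct {
	WorkerQueueDepth int
	BatchSize        int
	BatchTimeout     time.Duration
	MaxRetries       int
}

// setDefaults fills zero-valued fields with the documented defaults
// and leaves explicitly set fields untouched.
func (o *sketchOptions) setDefaults() {
	if o.WorkerQueueDepth == 0 {
		o.WorkerQueueDepth = 50000
	}
	if o.BatchSize == 0 {
		o.BatchSize = 200
	}
	if o.BatchTimeout == 0 {
		o.BatchTimeout = time.Second
	}
	if o.MaxRetries == 0 {
		o.MaxRetries = 3
	}
}

func main() {
	o := sketchOptions{BatchSize: 50} // only BatchSize is set by the caller
	o.setDefaults()
	fmt.Printf("%+v\n", o)
}
```

A zero-value check cannot tell an unset field from one deliberately set to zero, and it does not work for booleans whose default is true (such as EnableReader), so the library presumably handles those cases differently.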

func (*Options) Validate

func (o *Options) Validate() error

Validate validates and sets defaults for all options

type QWRError

type QWRError struct {
	// Original error from the underlying operation
	Original error
	// Category of the error for granular handling
	Category ErrorCategory
	// RetryStrategy for this specific error
	Strategy RetryStrategy
	// Context provides additional information about the error
	Context map[string]any
	// Timestamp when the error occurred
	Timestamp time.Time
	// Operation that caused the error (query, transaction, etc.)
	Operation string
}

QWRError provides structured error information with enhanced classification

func ClassifyError

func ClassifyError(err error, operation string) *QWRError

ClassifyError provides enhanced error classification with detailed categorization

func NewQWRError

func NewQWRError(original error, category ErrorCategory, strategy RetryStrategy, operation string) *QWRError

NewQWRError creates a new structured QWR error

func (*QWRError) Error

func (qe *QWRError) Error() string

Error implements the error interface

func (*QWRError) IsRetriable

func (qe *QWRError) IsRetriable() bool

IsRetriable returns true if the error should be retried

func (*QWRError) Unwrap

func (qe *QWRError) Unwrap() error

Unwrap allows error unwrapping for errors.Is and errors.As
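Because the type implements Unwrap, callers can use the standard errors.Is and errors.As helpers to look through it. The sketch below shows the mechanism with a hypothetical stand-in type, classifiedError, so that it runs without the library; its fields and the errBusy sentinel are illustrative and do not mirror QWRError exactly.

```go
package main

import (
	"errors"
	"fmt"
)

// classifiedError is a hypothetical stand-in for a structured error type
// that wraps an original error and carries extra classification data.
type classifiedError struct {
	Original  error
	Operation string
	Retriable bool
}

func (e *classifiedError) Error() string { return e.Operation + ": " + e.Original.Error() }

// Unwrap exposes the original error to errors.Is and errors.As.
func (e *classifiedError) Unwrap() error { return e.Original }

// errBusy is an illustrative sentinel error.
var errBusy = errors.New("database is locked")

// shouldRetry reports whether err, or any error it wraps,
// is a classifiedError marked as retriable.
func shouldRetry(err error) bool {
	var ce *classifiedError
	return errors.As(err, &ce) && ce.Retriable
}

// isBusy reports whether errBusy appears anywhere in err's chain.
func isBusy(err error) bool { return errors.Is(err, errBusy) }

func main() {
	inner := &classifiedError{Original: errBusy, Operation: "query", Retriable: true}
	err := fmt.Errorf("async write failed: %w", inner)
	fmt.Println(shouldRetry(err), isBusy(err)) // prints "true true"
}
```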

func (*QWRError) WithContext

func (qe *QWRError) WithContext(key string, value any) *QWRError

WithContext adds context information to the error

type Query

type Query struct {
	SQL  string
	Args []any
	// contains filtered or unexported fields
}

Query represents a database query operation

func (Query) ExecuteWithContext

func (q Query) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs the query against the database with context

func (Query) ID

func (q Query) ID() int64

ID returns the unique identifier for this query

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder provides a fluent API for building and executing queries

func GetQueryBuilder

func GetQueryBuilder() *QueryBuilder

GetQueryBuilder gets a pre-allocated QueryBuilder object from pool

func (*QueryBuilder) Async

func (qb *QueryBuilder) Async() (int64, error)

Async submits the query to the worker pool for background execution. Returns immediately with a job ID that can be used to check for errors later. Failed async queries are automatically added to the error queue for inspection or retry.

func (*QueryBuilder) Batch

func (qb *QueryBuilder) Batch() (int64, error)

Batch adds the query to a batch for deferred execution

IMPORTANT: Batch operations use the manager's internal context, NOT the context set via WithContext(). This means:

  • Query-level contexts (from WithContext()) are ignored for batch operations
  • All queries in a batch share the same manager-level context
  • Timeouts and cancellation apply to the entire batch, not individual queries

If you need query-specific context control, use Execute() or Async() instead.
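The Options comments describe a batch as executing once BatchSize queries have been collected or once BatchTimeout has elapsed, whichever comes first. The following is a minimal standard-library sketch of that size-or-timeout rule; it is not qwr's implementation. The names collect and runBatches are hypothetical, and the choice to start the timeout at the first item of each batch is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// collect groups items from in into batches. A batch is flushed when it
// holds size items, when timeout has elapsed since its first item arrived,
// or when in is closed.
func collect(in <-chan string, size int, timeout time.Duration, flush func([]string)) {
	var batch []string
	var deadline <-chan time.Time // nil (blocks forever) while the batch is empty

	emit := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = nil // start a fresh slice so flush may keep the old one
		}
		deadline = nil
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // flush the partial batch on shutdown
				return
			}
			if len(batch) == 0 {
				deadline = time.After(timeout)
			}
			batch = append(batch, item)
			if len(batch) >= size {
				emit()
			}
		case <-deadline:
			emit()
		}
	}
}

// runBatches feeds items through collect and returns the batches produced.
func runBatches(items []string, size int, timeout time.Duration) [][]string {
	in := make(chan string)
	out := make(chan [][]string)
	go func() {
		var batches [][]string
		collect(in, size, timeout, func(b []string) { batches = append(batches, b) })
		out <- batches
	}()
	for _, it := range items {
		in <- it
	}
	close(in)
	return <-out
}

func main() {
	for _, b := range runBatches([]string{"a", "b", "c", "d", "e"}, 2, time.Minute) {
		fmt.Println(len(b), b)
	}
}
```

Because every item in a batch is flushed together, a per-item context has no natural place in this design, which is consistent with the note above that batch operations use a single manager-level context.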

func (*QueryBuilder) Execute

func (qb *QueryBuilder) Execute() (*QueryResult, error)

Execute submits the query to the worker pool and waits for completion. This gives queued execution with immediate error feedback: the query is serialised with other operations, and the caller blocks until it completes.

func (*QueryBuilder) GenID

func (qb *QueryBuilder) GenID() *QueryBuilder

GenID generates a unique ID for this query

func (*QueryBuilder) Prepared

func (qb *QueryBuilder) Prepared() *QueryBuilder

Prepared marks the query to use a prepared statement for execution. Prepared statements are cached and reused, reducing parsing overhead for repeated queries. Most beneficial for queries executed multiple times with different parameters.

func (*QueryBuilder) Read

func (qb *QueryBuilder) Read() (*sql.Rows, error)

Read executes a read operation on the reader connection pool and returns multiple rows. Uses concurrent reader connections for better read performance. Remember to call rows.Close() when finished to prevent connection leaks.

func (*QueryBuilder) ReadClose added in v0.1.4

func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error

ReadClose executes a read operation and automatically closes the rows when done. The provided function receives the rows and should iterate/scan them as needed. Rows are automatically closed after the function returns, preventing resource leaks.

This is a convenience method that eliminates the need to manually defer rows.Close(), making it safer and cleaner for typical read operations.

Example:

var users []User
err := mgr.Query("SELECT id, name FROM users").ReadClose(func(rows *sql.Rows) error {
    for rows.Next() {
        var u User
        if err := rows.Scan(&u.ID, &u.Name); err != nil {
            return err
        }
        users = append(users, u)
    }
    return nil
})

Returns any error from the query execution, the callback function, or row iteration.

func (*QueryBuilder) ReadRow

func (qb *QueryBuilder) ReadRow() (*sql.Row, error)

ReadRow executes a read operation on the reader connection pool and returns a single row. Convenient for queries expected to return exactly one row. Use row.Scan() to extract values. sql.ErrNoRows is returned when no rows match the query.

func (*QueryBuilder) Release

func (qb *QueryBuilder) Release()

Release manually returns the QueryBuilder to the object pool for reuse. Only call this if you don't execute the query (Write/Async/Execute/Read/ReadRow). All execution methods automatically release the QueryBuilder when complete.

func (*QueryBuilder) WithContext

func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder

WithContext adds a context to the query and enables context usage for this specific query. The context will be used for timeouts, cancellation, and deadlines during query execution. Note: Batch operations ignore query-level contexts and use the manager's internal context.

func (*QueryBuilder) Write

func (qb *QueryBuilder) Write() (*QueryResult, error)

Write executes the query directly on the writer connection, bypassing the worker queue. This provides immediate execution and error feedback but may block the caller. Returns a QueryResult containing SQL result, error, duration, and query ID.

type QueryResult

type QueryResult struct {
	SQLResult sql.Result
	// contains filtered or unexported fields
}

QueryResult represents the outcome of a query execution

func (*QueryResult) Duration

func (r *QueryResult) Duration() time.Duration

Duration returns how long the query took to execute

func (*QueryResult) Error

func (r *QueryResult) Error() error

Error returns any error that occurred during execution

func (*QueryResult) ID

func (r *QueryResult) ID() int64

ID returns the ID of the query that produced this result

type ResultType

type ResultType int
const (
	ResultTypeQuery ResultType = iota
	ResultTypeTransaction
	ResultTypeBatch
)

type RetryStrategy

type RetryStrategy int

RetryStrategy defines how errors should be retried

const (
	// RetryStrategyNone indicates no retry should be attempted
	RetryStrategyNone RetryStrategy = iota
	// RetryStrategyImmediate indicates immediate retry with no delay
	RetryStrategyImmediate
	// RetryStrategyExponential indicates exponential backoff retry
	RetryStrategyExponential
	// RetryStrategyLinear indicates linear backoff retry
	RetryStrategyLinear
)

func (RetryStrategy) String

func (rs RetryStrategy) String() string

String returns the string representation of RetryStrategy

type SlowQuery

type SlowQuery struct {
	Query    string        // The SQL query that was slow to prepare
	Duration time.Duration // How long the preparation took
	When     time.Time     // When the slow preparation occurred
}

SlowQuery represents a slow statement preparation for metrics tracking

type StmtCache

type StmtCache struct {
	// contains filtered or unexported fields
}

StmtCache is a high-performance prepared statement cache using ristretto for optimal concurrent access with LRU eviction. Stores SQL prepared statements keyed by their query string.

func NewStmtCache

func NewStmtCache(metrics *StmtCacheMetrics, options Options) (*StmtCache, error)

NewStmtCache creates a new prepared statement cache with LRU eviction. The maximum number of statements to cache is taken from options.StmtCacheMaxSize (0 = default 1000).

func (*StmtCache) Clear added in v0.1.6

func (c *StmtCache) Clear()

Clear closes all cached statements and clears the cache. The cache remains usable after Clear() - new statements will be prepared on demand.

func (*StmtCache) Close

func (c *StmtCache) Close()

Close gracefully shuts down the cache. After Close(), the cache cannot be used - Get() will return ErrCacheClosed.

func (*StmtCache) Get

func (c *StmtCache) Get(db *sql.DB, query string) (*sql.Stmt, error)

Get retrieves a prepared statement from cache or prepares it if not found
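The get-or-prepare behaviour with LRU eviction can be sketched with the standard library alone. This is an illustration of the technique, not the library's code: the documentation states that the real cache is built on ristretto and stores *sql.Stmt values, whereas the hypothetical lruCache below stores strings, is not safe for concurrent use, and takes the prepare step as a callback.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is one cached item: the query text and its prepared form.
type entry struct{ key, value string }

// lruCache is a small, non-concurrent LRU cache keyed by query string.
type lruCache struct {
	max   int
	order *list.List // front = most recently used
	items map[string]*list.Element
}

func newLRU(max int) *lruCache {
	return &lruCache{max: max, order: list.New(), items: make(map[string]*list.Element)}
}

// get returns the cached value for key. On a miss it calls prepare,
// stores the result, and evicts the least recently used entry if the
// cache has grown past max.
func (c *lruCache) get(key string, prepare func(string) string) (value string, hit bool) {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		return el.Value.(*entry).value, true
	}
	v := prepare(key)
	c.items[key] = c.order.PushFront(&entry{key: key, value: v})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	return v, false
}

func main() {
	prepare := func(q string) string { return "stmt(" + q + ")" }
	c := newLRU(2)
	for _, q := range []string{"A", "B", "A", "C", "B"} {
		_, hit := c.get(q, prepare)
		fmt.Println(q, hit)
	}
}
```

In the run above, the second lookup of A is a hit, inserting C evicts B (the least recently used entry), and the final lookup of B is therefore a miss. A cache of real prepared statements would also need to close each statement when it is evicted.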

type StmtCacheMetrics

type StmtCacheMetrics struct {
	Size       atomic.Int32 // Current cache size
	MaxSize    int64        // Maximum cache size
	Hits       atomic.Int64 // Cache hits
	Misses     atomic.Int64 // Cache misses
	Evictions  atomic.Int64 // Number of evictions
	Collisions atomic.Int64 // Hash collisions (legacy, may not be used)
	// contains filtered or unexported fields
}

StmtCacheMetrics holds enhanced atomic counters for thread-safe cache metrics

func NewStmtCacheMetrics

func NewStmtCacheMetrics() *StmtCacheMetrics

NewStmtCacheMetrics creates initialized statement cache metrics

func (*StmtCacheMetrics) Stats

func (s *StmtCacheMetrics) Stats() StmtCacheStats

Stats converts the atomic metrics into a point-in-time StmtCacheStats snapshot

type StmtCacheStats

type StmtCacheStats struct {
	Size       int
	MaxSize    int64
	Hits       int64
	Misses     int64
	Evictions  int64
	Collisions int64
	HitRatio   float64

	// Enhanced stats
	AvgPrepTimeMs     float64
	PrepErrors        int64
	SlowQueriesCount  int
	UptimeSeconds     float64
	TopQueries        map[string]int64
	RecentSlowQueries []SlowQuery
}

StmtCacheStats provides a snapshot of cache statistics (non-atomic)
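The split between atomic counters (StmtCacheMetrics) and a plain snapshot struct (StmtCacheStats) can be sketched as follows. The counters and stats types are hypothetical reductions of the real ones, and expressing HitRatio as hits / (hits + misses), a fraction between 0 and 1, is an assumption; the library may report it differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters holds live values that many goroutines may update concurrently.
type counters struct {
	hits   atomic.Int64
	misses atomic.Int64
}

// stats is a plain copy that is safe to pass around, log, or serialise.
type stats struct {
	Hits     int64
	Misses   int64
	HitRatio float64
}

// snapshot loads each counter once and derives the hit ratio.
// The counters are read one after another, so the result is a
// near-consistent view rather than an atomic one.
func (c *counters) snapshot() stats {
	h, m := c.hits.Load(), c.misses.Load()
	s := stats{Hits: h, Misses: m}
	if total := h + m; total > 0 {
		s.HitRatio = float64(h) / float64(total)
	}
	return s
}

func main() {
	var c counters
	c.hits.Add(3)
	c.misses.Add(1)
	fmt.Printf("%+v\n", c.snapshot()) // prints "{Hits:3 Misses:1 HitRatio:0.75}"
}
```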

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction represents multiple SQL statements to execute in a transaction

func (*Transaction) Add

func (t *Transaction) Add(sql string, args ...any) *Transaction

Add adds a query to the transaction

func (*Transaction) AddPrepared

func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction

AddPrepared adds a prepared query to the transaction

func (*Transaction) Exec

func (t *Transaction) Exec() (*TransactionResult, error)

Exec runs the transaction through the worker pool

func (*Transaction) ExecuteWithContext

func (t *Transaction) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext implements the Job interface

func (*Transaction) ID

func (t *Transaction) ID() int64

ID returns the transaction ID

func (*Transaction) WithContext

func (t *Transaction) WithContext(ctx context.Context) *Transaction

WithContext adds context to the transaction

func (*Transaction) Write

func (t *Transaction) Write() (*TransactionResult, error)

Write executes the transaction directly

type TransactionResult

type TransactionResult struct {
	Results []*QueryResult
	// contains filtered or unexported fields
}

TransactionResult represents the outcome of a transaction execution

func (*TransactionResult) Duration

func (r *TransactionResult) Duration() time.Duration

Duration returns how long the transaction took to execute

func (*TransactionResult) Error

func (r *TransactionResult) Error() error

Error returns any error that occurred during execution

func (*TransactionResult) ID

func (r *TransactionResult) ID() int64

ID returns the ID of the transaction that produced this result

type WriteSerialiser

type WriteSerialiser struct {
	// contains filtered or unexported fields
}

WriteSerialiser manages a single worker that processes database jobs
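The single-worker idea can be sketched with a buffered channel and one consuming goroutine: every job passes through the same queue, so no two jobs run at once. The types and method names below (serialiser, submitWait, submitNoWait, stop) are hypothetical and only loosely mirror the API listed here; contexts, timeouts, metrics and error queues are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a unit of work plus an optional channel for its result.
type job struct {
	run  func() error
	done chan error // nil for fire-and-forget jobs
}

// serialiser runs queued jobs one at a time on a single goroutine.
type serialiser struct {
	queue chan job
	wg    sync.WaitGroup
}

func newSerialiser(depth int) *serialiser {
	s := &serialiser{queue: make(chan job, depth)}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for j := range s.queue { // single consumer: jobs never overlap
			err := j.run()
			if j.done != nil {
				j.done <- err
			}
		}
	}()
	return s
}

// submitWait enqueues fn and blocks until the worker has run it.
func (s *serialiser) submitWait(fn func() error) error {
	done := make(chan error, 1)
	s.queue <- job{run: fn, done: done}
	return <-done
}

// submitNoWait enqueues fn and returns as soon as it is queued.
// It still blocks if the queue is full.
func (s *serialiser) submitNoWait(fn func() error) {
	s.queue <- job{run: fn}
}

// stop closes the queue and waits for the worker to drain it.
// No further jobs may be submitted after stop.
func (s *serialiser) stop() {
	close(s.queue)
	s.wg.Wait()
}

func main() {
	s := newSerialiser(16)
	var order []int // only touched by the worker until stop returns
	for i := 0; i < 3; i++ {
		i := i
		s.submitNoWait(func() error { order = append(order, i); return nil })
	}
	_ = s.submitWait(func() error { order = append(order, 3); return nil })
	s.stop()
	fmt.Println(order) // prints "[0 1 2 3]"
}
```

Since the worker is the only goroutine that runs jobs, the jobs themselves need no locking around shared state such as the order slice; the queue provides the ordering.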

func NewWorkerPool

func NewWorkerPool(db *sql.DB, queueDepth int, metrics *Metrics, writeCache *StmtCache, options Options) *WriteSerialiser

NewWorkerPool creates a new worker pool for database jobs

func (*WriteSerialiser) SetAsyncErrorHandler

func (wp *WriteSerialiser) SetAsyncErrorHandler(handler func(Query, error, time.Duration))

SetAsyncErrorHandler sets the callback for handling async errors

func (*WriteSerialiser) Start

func (wp *WriteSerialiser) Start(ctx context.Context)

Start begins the worker processing loop

func (*WriteSerialiser) Stop

func (wp *WriteSerialiser) Stop() error

Stop shuts down the worker pool

func (*WriteSerialiser) SubmitNoWait

func (wp *WriteSerialiser) SubmitNoWait(ctx context.Context, job Job) (int64, error)

SubmitNoWait submits a job to the queue without waiting

func (*WriteSerialiser) SubmitNoWaitNoContext

func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)

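SubmitNoWaitNoContext submits a job to the queue without waiting for its result and without using a context. Per Options.QueueSubmitTimeout, the submission fails if no queue space becomes available within that timeout.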

func (*WriteSerialiser) SubmitWait

func (wp *WriteSerialiser) SubmitWait(ctx context.Context, job Job) (JobResult, error)

SubmitWait submits a job to the queue and waits for its result

func (*WriteSerialiser) SubmitWaitNoContext

func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)

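SubmitWaitNoContext submits a job to the queue and waits for its result without using a context. Per Options.QueueSubmitTimeout, the submission fails if no queue space becomes available within that timeout.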

Directories

Path Synopsis
Package backup defines backup methods for SQLite databases.
Package checkpoint defines WAL checkpoint modes for SQLite.
Package profiles provides pre-configured database profiles for different workload types.
