storage

package
v0.2.1
Warning

This package is not in the latest version of its module.

Published: Oct 1, 2025 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package storage provides the durable data structures for tinySQL.

What: An in-memory multi-tenant catalog of tables with column metadata, rows, and basic typing. It includes snapshot cloning for MVCC-light, GOB-based checkpoints, and an append-only Write-Ahead Log (WAL) for crash recovery and durability.

How: Tables store rows as [][]any for compactness; a lower-cased column index accelerates name lookups. Save/Load serialize the catalog to a file, writing JSON for JSON columns. The WAL logs whole-table changes and drops; recovery replays committed records and truncates partial tails.

Why: Favor a simple, explicit model over complex page managers: it keeps the code understandable, testable, and sufficient for embedded/edge use cases.

Constants

This section is empty.

Variables

var (
	ErrTxNotActive          = fmt.Errorf("transaction is not active")
	ErrSerializationFailure = fmt.Errorf("could not serialize access due to concurrent update")
	ErrRowNotFound          = fmt.Errorf("row not found")
)

Errors
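
Sentinel errors like these are usually matched with errors.Is so that wrapped errors still compare equal. The sketch below is a self-contained illustration, not the package's code: errRowNotFound is a local stand-in for ErrRowNotFound, and findRow and isNotFound are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errRowNotFound is a local stand-in for storage.ErrRowNotFound (assumption).
var errRowNotFound = errors.New("row not found")

// findRow is a hypothetical lookup that wraps the sentinel with context.
func findRow(rows map[int64][]any, id int64) ([]any, error) {
	row, ok := rows[id]
	if !ok {
		return nil, fmt.Errorf("lookup id=%d: %w", id, errRowNotFound)
	}
	return row, nil
}

// isNotFound reports whether err is, or wraps, the sentinel.
func isNotFound(err error) bool {
	return errors.Is(err, errRowNotFound)
}

func main() {
	rows := map[int64][]any{1: {"alice", 30}}
	_, err := findRow(rows, 2)
	fmt.Println(isNotFound(err), err)
}
```

Comparing with == would miss the wrapped error here, which is why errors.Is is the safer default.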

Functions

func EstimateColumnSize

func EstimateColumnSize(typ ColType) int64

EstimateColumnSize estimates the average size of a column type.

func EstimateTableSize

func EstimateTableSize(t *Table) int64

EstimateTableSize estimates memory usage of a table in bytes.

func EstimateValueSize

func EstimateValueSize(val any) int64

EstimateValueSize estimates the memory size of a value.

func FanIn

func FanIn(ctx context.Context, channels ...<-chan interface{}) <-chan interface{}

FanIn combines multiple channels into one.

func FanOut

func FanOut(ctx context.Context, input <-chan interface{}, workers int) []<-chan interface{}

FanOut distributes work from one channel to multiple channels.
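
To make the fan-out/fan-in pattern concrete, here is a minimal stdlib-only sketch with the same signatures as FanIn and FanOut. The lower-case fanOut, fanIn, and sumThrough are hypothetical re-implementations written for illustration; the package's actual behavior (buffering, distribution order) may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanOut starts `workers` goroutines that compete for items on input;
// each forwards what it receives on its own output channel.
func fanOut(ctx context.Context, input <-chan interface{}, workers int) []<-chan interface{} {
	outs := make([]<-chan interface{}, 0, workers)
	for i := 0; i < workers; i++ {
		out := make(chan interface{})
		outs = append(outs, out)
		go func() {
			defer close(out)
			for v := range input {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	return outs
}

// fanIn merges all channels into one and closes it once every source is drained.
func fanIn(ctx context.Context, channels ...<-chan interface{}) <-chan interface{} {
	merged := make(chan interface{})
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(ch <-chan interface{}) {
			defer wg.Done()
			for v := range ch {
				select {
				case merged <- v:
				case <-ctx.Done():
					return
				}
			}
		}(ch)
	}
	go func() { wg.Wait(); close(merged) }()
	return merged
}

// sumThrough pushes 1..n through fanOut and fanIn and returns the total.
func sumThrough(n, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	input := make(chan interface{})
	go func() {
		defer close(input)
		for i := 1; i <= n; i++ {
			input <- i
		}
	}()
	total := 0
	for v := range fanIn(ctx, fanOut(ctx, input, workers)...) {
		total += v.(int)
	}
	return total
}

func main() {
	fmt.Println(sumThrough(100, 4)) // 5050
}
```

Arrival order on the merged channel is not deterministic, so the example checks a sum rather than a sequence.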

func SaveToBytes

func SaveToBytes(db *DB) ([]byte, error)

SaveToBytes serializes the database snapshot to a byte slice.

func SaveToFile

func SaveToFile(db *DB, filename string) error

SaveToFile writes a snapshot of the database to a file. If the filename ends with .gz, the snapshot is gzip-compressed to reduce size.

func SaveToWriter

func SaveToWriter(db *DB, w io.Writer) error

SaveToWriter writes a snapshot of the database to an arbitrary writer. It does not attach or alter WAL configuration.
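
The suffix-based compression described for SaveToFile can be sketched as follows. This is an illustration under stated assumptions: snapshot is a hypothetical, simplified stand-in for the catalog (string cells only, to avoid GOB interface registration), and encode, decode, and roundTrip are not part of the package.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/gob"
	"fmt"
	"io"
	"strings"
)

// snapshot is a hypothetical, simplified stand-in for the catalog.
type snapshot struct {
	Tables map[string][][]string
}

// encode serializes s with GOB and gzip-compresses it when name ends in ".gz".
func encode(s snapshot, name string) ([]byte, error) {
	var buf bytes.Buffer
	var w io.Writer = &buf
	var zw *gzip.Writer
	if strings.HasSuffix(name, ".gz") {
		zw = gzip.NewWriter(&buf)
		w = zw
	}
	if err := gob.NewEncoder(w).Encode(s); err != nil {
		return nil, err
	}
	if zw != nil {
		// Close writes the gzip footer; without it the stream is truncated.
		if err := zw.Close(); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// decode reverses encode, choosing gzip by the same suffix rule.
func decode(b []byte, name string) (snapshot, error) {
	var r io.Reader = bytes.NewReader(b)
	if strings.HasSuffix(name, ".gz") {
		zr, err := gzip.NewReader(r)
		if err != nil {
			return snapshot{}, err
		}
		defer zr.Close()
		r = zr
	}
	var s snapshot
	err := gob.NewDecoder(r).Decode(&s)
	return s, err
}

// roundTrip encodes and decodes a small snapshot and reports whether it survived.
func roundTrip(name string) bool {
	in := snapshot{Tables: map[string][][]string{"users": {{"1", "alice"}}}}
	b, err := encode(in, name)
	if err != nil {
		return false
	}
	out, err := decode(b, name)
	return err == nil && len(out.Tables["users"]) == 1 && out.Tables["users"][0][1] == "alice"
}

func main() {
	fmt.Println(roundTrip("db.gob"), roundTrip("db.gob.gz"))
}
```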

Types

type AdvancedWAL

type AdvancedWAL struct {
	// contains filtered or unexported fields
}

AdvancedWAL manages row-level write-ahead logging with full ACID guarantees.

func OpenAdvancedWAL

func OpenAdvancedWAL(config AdvancedWALConfig) (*AdvancedWAL, error)

OpenAdvancedWAL creates or opens a WAL with full ACID semantics.

func (*AdvancedWAL) Checkpoint

func (w *AdvancedWAL) Checkpoint(db *DB) error

Checkpoint creates a consistent snapshot and truncates the WAL.

func (*AdvancedWAL) Close

func (w *AdvancedWAL) Close() error

Close flushes and closes the WAL.

func (*AdvancedWAL) GetCommittedLSN

func (w *AdvancedWAL) GetCommittedLSN() LSN

GetCommittedLSN returns the LSN of the last committed transaction.

func (*AdvancedWAL) GetFlushedLSN

func (w *AdvancedWAL) GetFlushedLSN() LSN

GetFlushedLSN returns the LSN of the last flushed record.

func (*AdvancedWAL) GetNextLSN

func (w *AdvancedWAL) GetNextLSN() LSN

GetNextLSN returns the next LSN to be assigned.

func (*AdvancedWAL) LogAbort

func (w *AdvancedWAL) LogAbort(txID TxID) (LSN, error)

LogAbort logs a transaction abort.

func (*AdvancedWAL) LogBegin

func (w *AdvancedWAL) LogBegin(txID TxID) (LSN, error)

LogBegin logs the start of a transaction.

func (*AdvancedWAL) LogCommit

func (w *AdvancedWAL) LogCommit(txID TxID) (LSN, error)

LogCommit logs a transaction commit.

func (*AdvancedWAL) LogDelete

func (w *AdvancedWAL) LogDelete(txID TxID, tenant, table string, rowID int64, before []any, cols []Column) (LSN, error)

LogDelete logs a row deletion.

func (*AdvancedWAL) LogInsert

func (w *AdvancedWAL) LogInsert(txID TxID, tenant, table string, rowID int64, data []any, cols []Column) (LSN, error)

LogInsert logs a row insertion.

func (*AdvancedWAL) LogUpdate

func (w *AdvancedWAL) LogUpdate(txID TxID, tenant, table string, rowID int64, before, after []any, cols []Column) (LSN, error)

LogUpdate logs a row update with before/after images.

func (*AdvancedWAL) Recover

func (w *AdvancedWAL) Recover(db *DB) (int, error)

Recover replays the WAL to restore database state after a crash.
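
A common way to implement this kind of recovery is a two-pass replay: first find which transactions committed, then redo only their operations in log order. The sketch below assumes that approach; record, opType, and replay are hypothetical simplifications, and the package's actual recovery logic is not shown on this page.

```go
package main

import "fmt"

type opType int

const (
	opBegin opType = iota + 1
	opInsert
	opDelete
	opCommit
	opAbort
)

// record is a hypothetical, reduced form of a WAL record.
type record struct {
	TxID  uint64
	Op    opType
	RowID int64
	After []any
}

// replay applies the operations of committed transactions in log order
// and returns how many row operations were applied.
func replay(log []record, rows map[int64][]any) int {
	// Pass 1: collect the IDs of transactions that reached a commit record.
	committed := map[uint64]bool{}
	for _, r := range log {
		if r.Op == opCommit {
			committed[r.TxID] = true
		}
	}
	// Pass 2: redo only operations that belong to committed transactions.
	applied := 0
	for _, r := range log {
		if !committed[r.TxID] {
			continue
		}
		switch r.Op {
		case opInsert:
			rows[r.RowID] = r.After
			applied++
		case opDelete:
			delete(rows, r.RowID)
			applied++
		}
	}
	return applied
}

func main() {
	log := []record{
		{TxID: 1, Op: opBegin},
		{TxID: 1, Op: opInsert, RowID: 1, After: []any{"alice"}},
		{TxID: 2, Op: opBegin},
		{TxID: 2, Op: opInsert, RowID: 2, After: []any{"bob"}}, // never commits
		{TxID: 1, Op: opCommit},
	}
	rows := map[int64][]any{}
	fmt.Println(replay(log, rows), len(rows)) // 1 1
}
```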

func (*AdvancedWAL) ShouldCheckpoint

func (w *AdvancedWAL) ShouldCheckpoint() bool

ShouldCheckpoint checks if a checkpoint is needed.

type AdvancedWALConfig

type AdvancedWALConfig struct {
	Path               string
	CheckpointPath     string
	CheckpointEvery    uint64        // Checkpoint after N records
	CheckpointInterval time.Duration // Checkpoint after duration
	Compress           bool
	BufferSize         int // Buffer size for writing
}

AdvancedWALConfig configures the advanced WAL.

type BatchHandler

type BatchHandler func(items []interface{}) error

BatchHandler processes a batch of items.

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor batches operations for efficiency.

func NewBatchProcessor

func NewBatchProcessor(maxSize int, interval time.Duration, handler BatchHandler) *BatchProcessor

NewBatchProcessor creates a new batch processor.

func (*BatchProcessor) Add

func (bp *BatchProcessor) Add(item interface{}) error

Add adds an item to the batch queue.

func (*BatchProcessor) Run

func (bp *BatchProcessor) Run(ctx context.Context, wg *sync.WaitGroup)

Run starts the batch processor.
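
The parameters of NewBatchProcessor (a maximum size and an interval) suggest a loop that flushes when the batch is full or when a timer fires. The following is a sketch of that general pattern, not the package's implementation; runBatches and batchSizes are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runBatches groups items from in and calls handle when the batch reaches
// maxSize, when interval elapses, or when in is closed or ctx is cancelled.
func runBatches(ctx context.Context, in <-chan interface{}, maxSize int,
	interval time.Duration, handle func([]interface{}) error) error {
	if maxSize < 1 {
		maxSize = 1
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]interface{}, 0, maxSize)
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		err := handle(batch)
		batch = make([]interface{}, 0, maxSize) // fresh slice: handler may keep the old one
		return err
	}
	for {
		select {
		case item, ok := <-in:
			if !ok {
				return flush() // input closed: flush the remainder and stop
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			if err := flush(); err != nil {
				return err
			}
		case <-ctx.Done():
			return flush()
		}
	}
}

// batchSizes feeds n items through runBatches and returns each batch's size.
func batchSizes(n, maxSize int) []int {
	in := make(chan interface{}, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	var sizes []int
	// A long interval keeps this demo driven by size and channel close only.
	_ = runBatches(context.Background(), in, maxSize, time.Hour, func(items []interface{}) error {
		sizes = append(sizes, len(items))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(batchSizes(7, 3)) // [3 3 1]
}
```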

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool manages in-memory tables with configurable eviction.

func NewBufferPool

func NewBufferPool(policy *MemoryPolicy) *BufferPool

NewBufferPool creates a buffer pool with the given policy.

func (*BufferPool) Get

func (bp *BufferPool) Get(tenant, name string) (*Table, bool)

Get retrieves a table from the buffer pool.

func (*BufferPool) GetMemoryLimit

func (bp *BufferPool) GetMemoryLimit() int64

GetMemoryLimit returns the configured memory limit.

func (*BufferPool) GetMemoryUsage

func (bp *BufferPool) GetMemoryUsage() int64

GetMemoryUsage returns current memory usage in bytes.

func (*BufferPool) GetStats

func (bp *BufferPool) GetStats() CacheStats

GetStats returns a copy of current statistics.

func (*BufferPool) Put

func (bp *BufferPool) Put(tenant, name string, table *Table) error

Put adds or updates a table in the buffer pool.

func (*BufferPool) Remove

func (bp *BufferPool) Remove(tenant, name string)

Remove removes a table from the buffer pool.

type CacheStats

type CacheStats struct {
	MemoryUsed        int64
	MemoryLimit       int64
	MemoryUtilization float64

	CacheHits   int64
	CacheMisses int64
	HitRate     float64

	EvictionCount int64
	EvictionSize  int64

	TablesInMemory int
	TablesOnDisk   int
	// contains filtered or unexported fields
}

CacheStats tracks buffer pool performance metrics.

type CacheStrategy

type CacheStrategy int

CacheStrategy defines the eviction policy for the buffer pool.

const (
	StrategyNone CacheStrategy = iota // No eviction (full in-memory)
	StrategyLRU                       // Least Recently Used
	StrategyLFU                       // Least Frequently Used (future)
	StrategyARC                       // Adaptive Replacement Cache (future)
)

func (CacheStrategy) String

func (s CacheStrategy) String() string

type CachedTable

type CachedTable struct {
	Table       *Table
	Size        int64
	LoadedAt    time.Time
	LastAccess  time.Time
	AccessCount int64
	Pinned      bool

	// Disk location (for lazy loading - future)
	OnDisk     bool
	DiskOffset int64
	// contains filtered or unexported fields
}

CachedTable wraps a table with caching metadata.

type ColType

type ColType int

ColType enumerates supported column data types.

const (
	// Integer types
	IntType ColType = iota
	Int8Type
	Int16Type
	Int32Type
	Int64Type
	UintType
	Uint8Type
	Uint16Type
	Uint32Type
	Uint64Type

	// Floating point types
	Float32Type
	Float64Type
	FloatType // alias for Float64Type

	// String and character types
	StringType
	TextType // alias for StringType
	RuneType
	ByteType

	// Boolean type
	BoolType

	// Time types
	TimeType
	DateType
	DateTimeType
	TimestampType
	DurationType

	// Complex types
	JsonType
	JsonbType
	MapType
	SliceType
	ArrayType

	// Advanced types
	Complex64Type
	Complex128Type
	ComplexType // alias for Complex128Type
	PointerType
	InterfaceType
)

func (ColType) String

func (t ColType) String() string

type Column

type Column struct {
	Name         string
	Type         ColType
	Constraint   ConstraintType
	ForeignKey   *ForeignKeyRef // Only used if Constraint == ForeignKey
	PointerTable string         // Target table for POINTER type
}

Column holds column schema information in a table.

type ConcurrencyConfig

type ConcurrencyConfig struct {
	// Worker pool sizes
	ReadWorkers  int
	WriteWorkers int

	// Channel buffer sizes
	ReadQueueSize  int
	WriteQueueSize int

	// Timeouts
	WorkerTimeout time.Duration
	QueueTimeout  time.Duration

	// Batch settings
	BatchSize     int
	BatchInterval time.Duration
}

ConcurrencyConfig configures the concurrency system.

func DefaultConcurrencyConfig

func DefaultConcurrencyConfig() ConcurrencyConfig

DefaultConcurrencyConfig returns sensible defaults based on CPU count.

type ConcurrencyManager

type ConcurrencyManager struct {
	// contains filtered or unexported fields
}

ConcurrencyManager orchestrates concurrent operations.

func NewConcurrencyManager

func NewConcurrencyManager(config ConcurrencyConfig) *ConcurrencyManager

NewConcurrencyManager creates a new concurrency manager.

func (*ConcurrencyManager) Shutdown

func (cm *ConcurrencyManager) Shutdown(timeout time.Duration) error

Shutdown gracefully shuts down the concurrency manager.

func (*ConcurrencyManager) Stats

func (cm *ConcurrencyManager) Stats() *ConcurrencyStats

Stats returns current concurrency statistics.

func (*ConcurrencyManager) SubmitRead

func (cm *ConcurrencyManager) SubmitRead(ctx context.Context, data interface{}) <-chan WorkResult

SubmitRead submits a read request (non-blocking).

func (*ConcurrencyManager) SubmitWrite

func (cm *ConcurrencyManager) SubmitWrite(ctx context.Context, data interface{}) <-chan WorkResult

SubmitWrite submits a write request (non-blocking).

type ConcurrencyStats

type ConcurrencyStats struct {
	TotalRequests   atomic.Uint64
	CompletedReads  atomic.Uint64
	CompletedWrites atomic.Uint64
	FailedRequests  atomic.Uint64
	QueuedReads     atomic.Int64
	QueuedWrites    atomic.Int64
	ActiveWorkers   atomic.Int64
}

ConcurrencyStats tracks concurrency metrics.

type ConstraintType

type ConstraintType int

ConstraintType enumerates supported column constraints.

const (
	NoConstraint ConstraintType = iota
	PrimaryKey
	ForeignKey
	Unique
)

func (ConstraintType) String

func (c ConstraintType) String() string

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is an in-memory, multi-tenant database catalog with full MVCC support.

func LoadFromBytes

func LoadFromBytes(b []byte) (*DB, error)

LoadFromBytes loads a database from a byte slice.

func LoadFromFile

func LoadFromFile(filename string) (*DB, error)

LoadFromFile loads a database snapshot from a file. It auto-detects gzip compression based on the .gz suffix and attaches a WAL if a path is given.

func LoadFromReader

func LoadFromReader(r io.Reader) (*DB, error)

LoadFromReader loads a database snapshot from an arbitrary reader. The returned DB has no WAL attached.

func NewDB

func NewDB() *DB

NewDB creates a new empty database catalog with MVCC support.

func (*DB) AdvancedWAL

func (db *DB) AdvancedWAL() *AdvancedWAL

AdvancedWAL returns the configured advanced WAL manager (may be nil).

func (*DB) AttachAdvancedWAL

func (db *DB) AttachAdvancedWAL(wal *AdvancedWAL)

AttachAdvancedWAL attaches an advanced WAL to the database.

func (*DB) DeepClone

func (db *DB) DeepClone() *DB

DeepClone creates a full copy of the database (MVCC-light snapshot). Note: This is not copy-on-write; it creates a full copy (simple but O(n)).
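
The O(n) cost mentioned above comes from copying every row. A minimal sketch of that copy for the [][]any row layout follows; cloneRows is a hypothetical helper, and it copies each row's cells but not values nested inside a cell (for example a map stored in a column), which is an assumption of this sketch rather than a statement about DeepClone.

```go
package main

import "fmt"

// cloneRows copies the outer slice and every row slice, so that writes to
// the copy's cells do not affect the original. Values stored inside a cell
// (maps, slices, pointers) are still shared.
func cloneRows(rows [][]any) [][]any {
	out := make([][]any, len(rows))
	for i, r := range rows {
		out[i] = append([]any(nil), r...)
	}
	return out
}

func main() {
	orig := [][]any{{1, "alice"}, {2, "bob"}}
	cp := cloneRows(orig)
	cp[0][1] = "changed"
	fmt.Println(orig[0][1], cp[0][1]) // alice changed
}
```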

func (*DB) Drop

func (db *DB) Drop(tn, name string) error

Drop removes a table from the tenant.

func (*DB) Get

func (db *DB) Get(tn, name string) (*Table, error)

Get returns a table by name for the given tenant.

func (*DB) ListTables

func (db *DB) ListTables(tn string) []*Table

ListTables returns the tables in a tenant sorted by name.

func (*DB) MVCC

func (db *DB) MVCC() *MVCCManager

MVCC returns the MVCC manager.

func (*DB) Put

func (db *DB) Put(tn string, t *Table) error

Put adds a new table to the tenant; returns error if it already exists.

func (*DB) WAL

func (db *DB) WAL() *WALManager

WAL returns the configured WAL manager (may be nil).

type ForeignKeyRef

type ForeignKeyRef struct {
	Table  string
	Column string
}

ForeignKeyRef describes a foreign key reference target.

type IsolationLevel

type IsolationLevel uint8

IsolationLevel defines transaction isolation semantics.

const (
	ReadCommitted IsolationLevel = iota
	RepeatableRead
	SnapshotIsolation
	Serializable
)

type LRUNode

type LRUNode struct {
	// contains filtered or unexported fields
}

LRUNode is a node in the LRU doubly-linked list.

type LRUQueue

type LRUQueue struct {
	// contains filtered or unexported fields
}

LRUQueue implements least-recently-used eviction.

func NewLRUQueue

func NewLRUQueue() *LRUQueue

NewLRUQueue creates an empty LRU queue.

func (*LRUQueue) Access

func (lru *LRUQueue) Access(key string, table *CachedTable)

Access moves a node to the front (most recent).

func (*LRUQueue) Add

func (lru *LRUQueue) Add(key string, table *CachedTable)

Add adds a node to the LRU queue (most recent).

func (*LRUQueue) Remove

func (lru *LRUQueue) Remove(key string)

Remove removes a node from the queue.

func (*LRUQueue) RemoveLRU

func (lru *LRUQueue) RemoveLRU() (string, *CachedTable)

RemoveLRU removes and returns the least recently used node.
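
For readers unfamiliar with the structure, an LRU queue is typically a doubly-linked list plus a map from key to list node. The sketch below uses container/list from the standard library and stores only keys; lruQueue, touch, and removeLRU are hypothetical names that merge the Add and Access roles into one method for brevity.

```go
package main

import (
	"container/list"
	"fmt"
)

type lruQueue struct {
	order *list.List               // front = most recently used
	nodes map[string]*list.Element // key -> element in order
}

func newLRUQueue() *lruQueue {
	return &lruQueue{order: list.New(), nodes: map[string]*list.Element{}}
}

// touch inserts key as most recent, or moves it to the front if present.
func (q *lruQueue) touch(key string) {
	if el, ok := q.nodes[key]; ok {
		q.order.MoveToFront(el)
		return
	}
	q.nodes[key] = q.order.PushFront(key)
}

// removeLRU pops the least recently used key; ok is false when the queue is empty.
func (q *lruQueue) removeLRU() (key string, ok bool) {
	el := q.order.Back()
	if el == nil {
		return "", false
	}
	key = q.order.Remove(el).(string)
	delete(q.nodes, key)
	return key, true
}

func main() {
	q := newLRUQueue()
	for _, k := range []string{"a", "b", "c"} {
		q.touch(k)
	}
	q.touch("a") // "a" becomes most recent, so "b" is now the oldest
	k, _ := q.removeLRU()
	fmt.Println(k) // b
}
```

Both operations are O(1), which is the usual reason to pair the list with a map.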

type LSN

type LSN uint64

LSN (Log Sequence Number) provides total ordering of log records.

type MVCCManager

type MVCCManager struct {
	// contains filtered or unexported fields
}

MVCCManager coordinates transaction IDs, timestamps, and visibility.

func NewMVCCManager

func NewMVCCManager() *MVCCManager

NewMVCCManager creates a new MVCC coordinator.

func (*MVCCManager) AbortTx

func (m *MVCCManager) AbortTx(tx *TxContext)

AbortTx marks a transaction as aborted.

func (*MVCCManager) BeginTx

func (m *MVCCManager) BeginTx(level IsolationLevel) *TxContext

BeginTx starts a new transaction and returns its context.

func (*MVCCManager) CommitTx

func (m *MVCCManager) CommitTx(tx *TxContext) (Timestamp, error)

CommitTx marks a transaction as committed and records its commit timestamp.

func (*MVCCManager) GCWatermark

func (m *MVCCManager) GCWatermark() Timestamp

GCWatermark returns the timestamp before which row versions can be garbage collected.

func (*MVCCManager) IsVisible

func (m *MVCCManager) IsVisible(tx *TxContext, rv *RowVersion) bool

IsVisible determines if a row version is visible to a transaction.

type MVCCTable

type MVCCTable struct {
	*Table
	// contains filtered or unexported fields
}

MVCCTable extends Table with version chains.

func NewMVCCTable

func NewMVCCTable(name string, cols []Column, isTemp bool) *MVCCTable

NewMVCCTable creates a table with MVCC support.

func (*MVCCTable) DeleteVersion

func (t *MVCCTable) DeleteVersion(tx *TxContext, rowID int64) error

DeleteVersion marks a row version as deleted.

func (*MVCCTable) GarbageCollect

func (t *MVCCTable) GarbageCollect(watermark Timestamp) int

GarbageCollect removes old row versions that are no longer visible.

func (*MVCCTable) GetVisibleVersion

func (t *MVCCTable) GetVisibleVersion(mvcc *MVCCManager, tx *TxContext, rowID int64) *RowVersion

GetVisibleVersion returns the visible version of a row for the given transaction.

func (*MVCCTable) InsertVersion

func (t *MVCCTable) InsertVersion(tx *TxContext, data []any) int64

InsertVersion adds a new row version.

func (*MVCCTable) UpdateVersion

func (t *MVCCTable) UpdateVersion(tx *TxContext, rowID int64, newData []any) error

UpdateVersion creates a new version for an update.

type MemoryPolicy

type MemoryPolicy struct {
	// Maximum memory usage in bytes (0 = unlimited)
	MaxMemoryBytes int64

	// Cache eviction strategy
	Strategy CacheStrategy

	// Start evicting when memory usage exceeds this ratio (0.0-1.0)
	EvictionThreshold float64

	// Tables that should always stay in memory
	PinnedTables []string

	// Tables that should never be cached
	IgnoreTables []string

	// Enable eviction (if false, OOM when limit reached)
	EnableEviction bool

	// Number of tables to evict in one batch
	EvictionBatchSize int

	// Track access patterns for better eviction decisions
	TrackAccessPatterns bool

	// Time window for access tracking
	AccessWindow time.Duration
}

MemoryPolicy defines memory management configuration.

func DefaultMemoryPolicy

func DefaultMemoryPolicy() *MemoryPolicy

DefaultMemoryPolicy returns a sensible default configuration.

func LimitedMemoryPolicy

func LimitedMemoryPolicy(maxMB int64) *MemoryPolicy

LimitedMemoryPolicy returns a policy with memory limits.
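
As a worked example of how MaxMemoryBytes and EvictionThreshold might interact, the sketch below treats the threshold as a fraction of the limit. This reading follows the field comments, but the exact comparison the buffer pool performs is an assumption; policy and shouldEvict are hypothetical names.

```go
package main

import "fmt"

// policy mirrors three MemoryPolicy fields for illustration only.
type policy struct {
	MaxMemoryBytes    int64   // 0 = unlimited
	EvictionThreshold float64 // fraction of the limit, 0.0-1.0
	EnableEviction    bool
}

// shouldEvict reports whether usage has crossed threshold * limit.
func shouldEvict(p policy, used int64) bool {
	if !p.EnableEviction || p.MaxMemoryBytes <= 0 {
		return false
	}
	return float64(used) > float64(p.MaxMemoryBytes)*p.EvictionThreshold
}

func main() {
	const mb = 1 << 20
	p := policy{MaxMemoryBytes: 100 * mb, EvictionThreshold: 0.8, EnableEviction: true}
	fmt.Println(shouldEvict(p, 79*mb), shouldEvict(p, 81*mb)) // false true
}
```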

type ParallelIterator

type ParallelIterator struct {
	// contains filtered or unexported fields
}

ParallelIterator provides concurrent iteration over data.

func NewParallelIterator

func NewParallelIterator(items []interface{}, workers int) *ParallelIterator

NewParallelIterator creates a parallel iterator.

func (*ParallelIterator) ForEach

func (pi *ParallelIterator) ForEach(fn func(item interface{}) error) error

ForEach processes items in parallel.

func (*ParallelIterator) Map

func (pi *ParallelIterator) Map(fn func(item interface{}) (interface{}, error)) ([]interface{}, error)

Map applies a function to all items in parallel and returns results.

func (*ParallelIterator) WithContext

func (pi *ParallelIterator) WithContext(ctx context.Context) *ParallelIterator

WithContext sets the context for the iterator.
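
A parallel Map over a slice is often written with a fixed set of workers pulling indexes from a channel, so results keep their input order. The sketch below shows that technique with the standard library only; parallelMap and squares are hypothetical, and whether the package preserves order or stops at the first error is not stated on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap applies fn to every item using `workers` goroutines.
// Results are written by index, so output order matches input order.
func parallelMap(items []interface{}, workers int,
	fn func(interface{}) (interface{}, error)) ([]interface{}, error) {
	if workers < 1 {
		workers = 1
	}
	results := make([]interface{}, len(items))
	errs := make([]error, len(items))
	idx := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idx {
				// Each index is handled by exactly one goroutine, so no lock is needed.
				results[i], errs[i] = fn(items[i])
			}
		}()
	}
	for i := range items {
		idx <- i
	}
	close(idx)
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err // report the first error in input order
		}
	}
	return results, nil
}

// squares maps 1..n to their squares using parallelMap.
func squares(n, workers int) ([]interface{}, error) {
	items := make([]interface{}, n)
	for i := range items {
		items[i] = i + 1
	}
	return parallelMap(items, workers, func(v interface{}) (interface{}, error) {
		x := v.(int)
		return x * x, nil
	})
}

func main() {
	out, err := squares(5, 3)
	fmt.Println(out, err) // [1 4 9 16 25] <nil>
}
```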

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline implements a concurrent pipeline pattern.

func NewPipeline

func NewPipeline(ctx context.Context, stages ...PipelineStage) *Pipeline

NewPipeline creates a new pipeline.

func (*Pipeline) Execute

func (p *Pipeline) Execute(input []interface{}) <-chan interface{}

Execute runs the pipeline.

type PipelineStage

type PipelineStage func(ctx context.Context, input <-chan interface{}) <-chan interface{}

PipelineStage represents a stage in the pipeline.
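
Because a stage is just a function from one channel to another, stages compose by feeding each output into the next input. The sketch below illustrates this with a locally defined stage type of the same shape as PipelineStage; mapStage, run, and demoPipeline are hypothetical helpers, not package functions.

```go
package main

import (
	"context"
	"fmt"
)

// stage has the same shape as PipelineStage.
type stage func(ctx context.Context, in <-chan interface{}) <-chan interface{}

// mapStage builds a stage that applies fn to every item it receives.
func mapStage(fn func(interface{}) interface{}) stage {
	return func(ctx context.Context, in <-chan interface{}) <-chan interface{} {
		out := make(chan interface{})
		go func() {
			defer close(out)
			for v := range in {
				select {
				case out <- fn(v):
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// run feeds input through the stages in order and collects the final output.
func run(ctx context.Context, input []interface{}, stages ...stage) []interface{} {
	src := make(chan interface{})
	go func() {
		defer close(src)
		for _, v := range input {
			select {
			case src <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	var ch <-chan interface{} = src
	for _, s := range stages {
		ch = s(ctx, ch) // chain: each stage consumes the previous stage's output
	}
	var out []interface{}
	for v := range ch {
		out = append(out, v)
	}
	return out
}

// demoPipeline doubles each input and then adds one.
func demoPipeline() []interface{} {
	double := mapStage(func(v interface{}) interface{} { return v.(int) * 2 })
	addOne := mapStage(func(v interface{}) interface{} { return v.(int) + 1 })
	return run(context.Background(), []interface{}{1, 2, 3}, double, addOne)
}

func main() {
	fmt.Println(demoPipeline()) // [3 5 7]
}
```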

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter limits the rate of operations.

func NewRateLimiter

func NewRateLimiter(opsPerSecond int) *RateLimiter

NewRateLimiter creates a new rate limiter.

func (*RateLimiter) Stop

func (rl *RateLimiter) Stop()

Stop stops the rate limiter.

func (*RateLimiter) Wait

func (rl *RateLimiter) Wait(ctx context.Context) error

Wait blocks until a token is available.
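
One simple way to get this behavior is a ticker that releases one token per interval. The sketch below assumes that design; limiter, newLimiter, and the helper functions are hypothetical, and the package may use a different algorithm (for example a token bucket with bursts).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// limiter releases one token per tick.
type limiter struct {
	ticker *time.Ticker
}

func newLimiter(opsPerSecond int) *limiter {
	if opsPerSecond < 1 {
		opsPerSecond = 1
	}
	interval := time.Second / time.Duration(opsPerSecond)
	if interval <= 0 {
		interval = time.Nanosecond // NewTicker panics on non-positive durations
	}
	return &limiter{ticker: time.NewTicker(interval)}
}

// wait blocks until the next tick or until ctx is done.
func (l *limiter) wait(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // already cancelled: do not consume a token
	}
	select {
	case <-l.ticker.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l *limiter) stop() { l.ticker.Stop() }

// waitOnce takes a single token at the given rate.
func waitOnce(opsPerSecond int) error {
	l := newLimiter(opsPerSecond)
	defer l.stop()
	return l.wait(context.Background())
}

// waitCancelled calls wait with an already-cancelled context.
func waitCancelled() error {
	l := newLimiter(1)
	defer l.stop()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return l.wait(ctx)
}

func main() {
	fmt.Println(waitOnce(1000), waitCancelled())
}
```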

type RowVersion

type RowVersion struct {
	// Transaction that created this version
	XMin TxID

	// Transaction that deleted/updated this version (0 if still valid)
	XMax TxID

	// Creation timestamp
	CreatedAt Timestamp

	// Deletion/update timestamp (0 if still valid)
	DeletedAt Timestamp

	// Actual row data
	Data []any

	// Pointer to next version (for version chain)
	NextVersion *RowVersion
}

RowVersion contains MVCC metadata for a single row version.
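
To show how the CreatedAt, DeletedAt, and NextVersion fields can drive visibility, here is a simplified snapshot rule: a version is visible when it was created at or before the snapshot and not yet deleted at that point. This is an assumed rule for committed versions only; it ignores XMin/XMax and in-progress transactions, and version, visible, and visibleAt are hypothetical names.

```go
package main

import "fmt"

// version is a reduced form of RowVersion using logical timestamps only.
type version struct {
	createdAt uint64
	deletedAt uint64 // 0 means still valid
	data      []any
	next      *version // older version in the chain
}

// visible applies the assumed snapshot rule to a single version.
func visible(v *version, snapshot uint64) bool {
	if v.createdAt > snapshot {
		return false // created after the snapshot was taken
	}
	return v.deletedAt == 0 || v.deletedAt > snapshot
}

// visibleAt walks the chain from newest to oldest and returns the first
// version visible at snapshot, or nil if none is.
func visibleAt(head *version, snapshot uint64) *version {
	for v := head; v != nil; v = v.next {
		if visible(v, snapshot) {
			return v
		}
	}
	return nil
}

func main() {
	v1 := &version{createdAt: 10, deletedAt: 20, data: []any{"old"}}
	v2 := &version{createdAt: 20, data: []any{"new"}, next: v1}
	fmt.Println(visibleAt(v2, 15).data, visibleAt(v2, 25).data) // [old] [new]
}
```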

type Table

type Table struct {
	Name   string
	Cols   []Column
	Rows   [][]any
	IsTemp bool

	Version int
	// contains filtered or unexported fields
}

Table stores rows along with column metadata and indexes.

func NewTable

func NewTable(name string, cols []Column, isTemp bool) *Table

NewTable creates a new Table with case-insensitive column lookup indices.

func (*Table) ColIndex

func (t *Table) ColIndex(name string) (int, error)

ColIndex returns the zero-based index of the named column.

type Timestamp

type Timestamp uint64

Timestamp represents a logical timestamp for MVCC visibility.

type TxContext

type TxContext struct {
	ID             TxID
	StartTime      Timestamp
	Status         TxStatus
	ReadSnapshot   Timestamp                      // Snapshot timestamp for reads
	WriteSet       map[string]map[int64]bool      // table -> row IDs modified
	ReadSet        map[string]map[int64]Timestamp // table -> row IDs read with version
	IsolationLevel IsolationLevel
	// contains filtered or unexported fields
}

TxContext holds the state of an active transaction.

func (*TxContext) RecordRead

func (tx *TxContext) RecordRead(table string, rowID int64, version Timestamp)

RecordRead tracks a read operation for conflict detection.

func (*TxContext) RecordWrite

func (tx *TxContext) RecordWrite(table string, rowID int64)

RecordWrite tracks a write operation.

type TxID

type TxID uint64

TxID represents a unique transaction identifier.

type TxStatus

type TxStatus uint8

TxStatus represents the current state of a transaction.

const (
	TxStatusInProgress TxStatus = iota
	TxStatusCommitted
	TxStatusAborted
)

type WALChange

type WALChange struct {
	Tenant string
	Name   string
	Table  *Table
	Drop   bool
}

WALChange describes a persistent change that will be written to the WAL.

func CollectWALChanges

func CollectWALChanges(prev, next *DB) []WALChange

CollectWALChanges computes the delta between two MVCC snapshots.
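
Conceptually, such a delta contains tables that are new or changed in next, plus drops for tables that exist only in prev. The sketch below detects changes by comparing a version counter, which is an assumption (Table does expose a Version field, but the comparison the package uses is not documented here); tbl, change, and collectChanges are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// tbl is a reduced table carrying only a version counter.
type tbl struct{ Version int }

// change is a reduced WALChange: a table name and whether it was dropped.
type change struct {
	Name string
	Drop bool
}

// collectChanges returns new or modified tables in next, plus drops for
// tables present only in prev, sorted by name for stable output.
func collectChanges(prev, next map[string]tbl) []change {
	var out []change
	for name, t := range next {
		if p, ok := prev[name]; !ok || p.Version != t.Version {
			out = append(out, change{Name: name})
		}
	}
	for name := range prev {
		if _, ok := next[name]; !ok {
			out = append(out, change{Name: name, Drop: true})
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	prev := map[string]tbl{"a": {1}, "b": {1}, "c": {1}}
	next := map[string]tbl{"a": {1}, "b": {2}, "d": {1}}
	fmt.Println(collectChanges(prev, next)) // [{b false} {c true} {d false}]
}
```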

type WALConfig

type WALConfig struct {
	Path               string
	CheckpointEvery    uint64
	CheckpointInterval time.Duration
}

WALConfig configures WAL and checkpoint behavior.

type WALManager

type WALManager struct {
	// contains filtered or unexported fields
}

WALManager encapsulates WAL append, recovery, and checkpoints.

func OpenWAL

func OpenWAL(db *DB, cfg WALConfig) (*WALManager, error)

OpenWAL ensures a WAL exists, replays committed records, and returns a ready-to-use manager. It attaches no WAL when Path is empty.

func (*WALManager) Checkpoint

func (w *WALManager) Checkpoint(db *DB) error

Checkpoint writes a DB snapshot and resets the WAL file.

func (*WALManager) Close

func (w *WALManager) Close() error

Close flushes, syncs, and closes the WAL resources.

func (*WALManager) LogTransaction

func (w *WALManager) LogTransaction(changes []WALChange) (bool, error)

LogTransaction appends all changes atomically to the WAL. It returns true when a checkpoint is recommended.

type WALOperationType

type WALOperationType uint8

WALOperationType defines the type of WAL operation.

const (
	WALOpBegin WALOperationType = iota + 1
	WALOpInsert
	WALOpUpdate
	WALOpDelete
	WALOpCommit
	WALOpAbort
	WALOpCheckpoint
)

func (WALOperationType) String

func (t WALOperationType) String() string

type WALRecord

type WALRecord struct {
	// Log Sequence Number - globally unique, monotonically increasing
	LSN LSN

	// Transaction ID
	TxID TxID

	// Operation type
	OpType WALOperationType

	// Tenant and table
	Tenant string
	Table  string

	// Row ID (for row-level operations)
	RowID int64

	// UNDO image (before state) - for rollback
	BeforeImage []any

	// REDO image (after state) - for recovery
	AfterImage []any

	// Column information (for schema tracking)
	Columns []Column

	// Timestamp
	Timestamp time.Time

	// Checksum for corruption detection
	Checksum uint32
}

WALRecord represents a single log entry with before/after images.
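
The Checksum field lets recovery reject corrupted or torn records. The page does not say which algorithm is used; the sketch below assumes CRC32 (IEEE) over the encoded payload, with the checksum appended as four little-endian bytes. seal and verify are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// seal returns payload followed by its little-endian CRC32 (IEEE).
func seal(payload []byte) []byte {
	frame := make([]byte, len(payload)+4)
	copy(frame, payload)
	binary.LittleEndian.PutUint32(frame[len(payload):], crc32.ChecksumIEEE(payload))
	return frame
}

// verify splits a frame and reports whether the stored checksum matches.
func verify(frame []byte) ([]byte, bool) {
	if len(frame) < 4 {
		return nil, false // too short to hold a checksum: treat as a torn tail
	}
	payload := frame[:len(frame)-4]
	want := binary.LittleEndian.Uint32(frame[len(frame)-4:])
	return payload, crc32.ChecksumIEEE(payload) == want
}

func main() {
	frame := seal([]byte("insert row 1"))
	_, ok := verify(frame)
	frame[0] ^= 0xFF // simulate corruption
	_, okAfter := verify(frame)
	fmt.Println(ok, okAfter) // true false
}
```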

type WALTxState

type WALTxState struct {
	TxID       TxID
	StartLSN   LSN
	Operations []LSN
	Status     TxStatus
}

WALTxState tracks the state of a transaction in the WAL.

type WorkHandler

type WorkHandler func(ctx context.Context, req WorkRequest) WorkResult

WorkHandler processes work requests.

type WorkRequest

type WorkRequest struct {
	ID      uint64
	Context context.Context
	Type    WorkType
	Data    interface{}
	Result  chan WorkResult
}

WorkRequest represents a unit of work to be processed.

type WorkResult

type WorkResult struct {
	ID    uint64
	Data  interface{}
	Error error
}

WorkResult contains the result of a work request.

type WorkType

type WorkType uint8

WorkType defines the type of operation.

const (
	WorkTypeRead WorkType = iota
	WorkTypeWrite
	WorkTypeDelete
	WorkTypeScan
	WorkTypeBatch
)

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of worker goroutines.

func NewWorkerPool

func NewWorkerPool(name string, size int, workQueue chan WorkRequest, handler WorkHandler, ctx context.Context, wg *sync.WaitGroup) *WorkerPool

NewWorkerPool creates a new worker pool.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start launches all worker goroutines.
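
The worker-pool pattern behind these types can be sketched with a shared queue, a handler, and a per-request result channel. The code below is an illustration with hypothetical names (request, result, startWorkers, demoPool); it mirrors the shape of WorkRequest and WorkResult but is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type result struct {
	ID   uint64
	Data interface{}
	Err  error
}

type request struct {
	ID     uint64
	Data   interface{}
	Result chan result // buffered so a worker never blocks on delivery
}

// startWorkers launches size goroutines that serve queue until it is
// closed or ctx is cancelled.
func startWorkers(ctx context.Context, wg *sync.WaitGroup, size int,
	queue <-chan request, handle func(context.Context, request) result) {
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case req, ok := <-queue:
					if !ok {
						return
					}
					req.Result <- handle(ctx, req)
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

// demoPool doubles the numbers 1..n on a pool and returns the sum of results.
// It expects workers >= 1.
func demoPool(n, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queue := make(chan request, n)
	reqs := make([]request, n)
	for i := range reqs {
		reqs[i] = request{ID: uint64(i), Data: i + 1, Result: make(chan result, 1)}
		queue <- reqs[i]
	}
	close(queue)
	var wg sync.WaitGroup
	startWorkers(ctx, &wg, workers, queue, func(_ context.Context, r request) result {
		return result{ID: r.ID, Data: r.Data.(int) * 2}
	})
	wg.Wait()
	total := 0
	for _, r := range reqs {
		total += (<-r.Result).Data.(int)
	}
	return total
}

func main() {
	fmt.Println(demoPool(5, 2)) // 30
}
```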
