engine

package
v0.19.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanIncompleteSeal

func CleanIncompleteSeal(segDir string) error

CleanIncompleteSeal removes partial chunk/index files from a failed seal.

func DecodeID

func DecodeID(id int64) (segmentHour int64, chunkNum, row int)

DecodeID extracts segment hour, chunk number, and row offset from a composite ID.

func EncodeID

func EncodeID(segmentHour int64, chunkNum, row int) int64

EncodeID builds a composite int64 ID from segment hour, chunk number, and row offset.

func IDForPosition

func IDForPosition(segmentHour int64, entryIndex int) int64

IDForPosition computes the composite ID for a given position in the active WAL.

func IsSealComplete

func IsSealComplete(segDir string) bool

IsSealComplete checks if a segment directory has a .seal_complete marker.

func SegmentDirName

func SegmentDirName(h int64) string

SegmentDirName returns the directory name for a segment hour (e.g., "2026-04-04T12").

func SegmentHourFromTime

func SegmentHourFromTime(t time.Time) int64

SegmentHourFromTime returns the segment hour (hours since Unix epoch) for a given time.

func TimeFromSegmentHour

func TimeFromSegmentHour(h int64) time.Time

TimeFromSegmentHour converts a segment hour back to a time.Time (the start of that hour).

Types

type ChunkMeta

type ChunkMeta struct {
	Index      int      `json:"index"`
	EntryCount int      `json:"entry_count"`
	IDRange    [2]int64 `json:"id_range"`
}

ChunkMeta holds info about a single chunk within a segment.

type HBucket

type HBucket struct {
	Total  int `json:"total"`
	Errors int `json:"errors"`
}

HBucket holds histogram counts for a single minute.

type HistogramBucket

type HistogramBucket struct {
	Timestamp time.Time
	Total     int
	Errors    int
}

HistogramBucket represents a time-bucketed count.

type LoadedSegment

type LoadedSegment struct {
	Hour    int64
	DirName string
	DirPath string
	Meta    *SegmentMeta
}

LoadedSegment is a sealed segment loaded into memory (metadata only).

type MetaCounts

type MetaCounts struct {
	ByLevel   map[string]int `json:"by_level"`
	ByService map[string]int `json:"by_service"`
}

MetaCounts holds pre-computed aggregation counts.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a fixed-size circular buffer of log entries for tail support. Writers call Push; tail subscribers call Snapshot + Subscribe.

func NewRingBuffer

func NewRingBuffer() *RingBuffer

NewRingBuffer creates a new ring buffer.

func (*RingBuffer) Push

func (rb *RingBuffer) Push(entries []chunk.Entry)

Push adds entries to the ring buffer and notifies subscribers.

func (*RingBuffer) Snapshot

func (rb *RingBuffer) Snapshot() []chunk.Entry

Snapshot returns the most recent entries in the ring (up to ringSize). Entries are returned oldest-first.

func (*RingBuffer) Subscribe

func (rb *RingBuffer) Subscribe() (<-chan []chunk.Entry, func())

Subscribe returns a channel that receives new entry batches, and an unsubscribe function. The channel has a buffer of 16 batches.

type SearchParams

type SearchParams struct {
	Query            string     // full-text search on message
	Service          string     // exact match
	Level            string     // exact match
	Env              string     // exact match
	TraceID          string     // exact match
	RequestID        string     // exact match
	UserID           string     // exact match
	TenantID         string     // exact match
	EventType        string     // exact match
	ErrorClass       string     // exact match
	ErrorFingerprint string     // exact match
	Method           string     // exact match
	Path             string     // substring match
	MinDurationMs    int        // minimum duration_ms
	NPlusOneOnly     bool       // only n_plus_one entries
	Start            *time.Time // event time range start
	End              *time.Time // event time range end
	Limit            int
	Offset           int
	SortAsc          bool // true = oldest first
}

SearchParams defines filters for log search.

type SearchResult

type SearchResult struct {
	Entries []chunk.Entry
	Total   int // total matching (may be > len(Entries) if limited)
}

SearchResult holds search results.

type SegmentManager

type SegmentManager struct {
	// contains filtered or unexported fields
}

SegmentManager manages the lifecycle of sealed segments and the active WAL.

func NewSegmentManager

func NewSegmentManager(dataDir string) (*SegmentManager, error)

NewSegmentManager scans the data directory and loads all sealed segments.

func (*SegmentManager) AllSegments

func (sm *SegmentManager) AllSegments() []*LoadedSegment

AllSegments returns all loaded segments (for operations that need all data).

func (*SegmentManager) FindSegmentByID

func (sm *SegmentManager) FindSegmentByID(id int64) *LoadedSegment

FindSegmentByID locates which segment contains a given composite ID.

func (*SegmentManager) Prune

func (sm *SegmentManager) Prune(retention time.Duration) (int, error)

Prune deletes segments older than the given duration. Returns the number of segments deleted.

func (*SegmentManager) Register

func (sm *SegmentManager) Register(hour int64, meta *SegmentMeta)

Register adds a newly sealed segment to the manager.

func (*SegmentManager) SegmentCount

func (sm *SegmentManager) SegmentCount() int

SegmentCount returns the number of sealed segments.

func (*SegmentManager) SegmentsInRange

func (sm *SegmentManager) SegmentsInRange(start, end time.Time) []*LoadedSegment

SegmentsInRange returns sealed segments that may contain data for the given time range. It includes a ±1 hour buffer to allow for clock skew (received_at vs ts).

type SegmentMeta

type SegmentMeta struct {
	Segment    string              `json:"segment"`
	Chunks     []ChunkMeta         `json:"chunks"`
	EntryCount int                 `json:"entry_count"`
	IDRange    [2]int64            `json:"id_range"`
	TimeRange  [2]string           `json:"time_range"`
	Counts     MetaCounts          `json:"counts"`
	Histogram  map[string]HBucket  `json:"histogram"`
	Dicts      map[string][]string `json:"dictionaries"`
}

SegmentMeta holds pre-computed statistics for a sealed segment.

func Seal

func Seal(walPath string, segmentHour int64) (*SegmentMeta, error)

Seal reads a WAL file and writes columnar chunks, an inverted index, and meta.json. This is the 2-pass streaming seal process.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the main log store engine. It implements the core query operations on top of the WAL writer, segment manager, and ingest pipeline.

func NewStore

func NewStore(dataDir string, samplingRules []ingest.SamplingRule, piiConfig ingest.PIIConfig) (*Store, error)

NewStore creates and initializes the segmented log store.

func (*Store) Close

func (s *Store) Close() error

Close shuts down the store gracefully.

func (*Store) CountByLevel

func (s *Store) CountByLevel(start, end time.Time, service, environment string) (map[string]int, error)

CountByLevel returns log counts grouped by level for the given time range.

When environment is set, the WAL path filters by env. Sealed segments use pre-computed counts that aren't broken down by env yet — for segments, the env parameter is ignored, so multi-env deployments may see slight over-counts on historical data. Single-env deployments (the common case) are unaffected since all segment rows share the same env.

func (*Store) CountByService

func (s *Store) CountByService(start, end time.Time, environment string) (map[string]int, error)

CountByService returns log counts grouped by service for the given time range. See CountByLevel for notes on segment-level env filtering.

func (*Store) DistinctValues

func (s *Store) DistinctValues(column string, start, end time.Time) ([]string, error)

DistinctValues returns unique values for a given column name.

func (*Store) GetBody

func (s *Store) GetBody(id int64) (json.RawMessage, error)

GetBody retrieves just the body blob for an entry.

func (*Store) GetByID

func (s *Store) GetByID(id int64) (*chunk.Entry, error)

GetByID retrieves a single entry by composite ID.

func (*Store) Histogram

func (s *Store) Histogram(start, end time.Time, interval time.Duration) ([]HistogramBucket, error)

Histogram returns per-interval counts for the given time range.

func (*Store) Ingest

func (s *Store) Ingest(entries []chunk.Entry) ([]chunk.Entry, error)

Ingest processes and stores a batch of log entries. Runs the full pipeline: sampling → PII scrub → error extraction → log expansion → WAL append.

func (*Store) Prune

func (s *Store) Prune(retention time.Duration) (int, error)

Prune deletes segments older than the given retention duration.

func (*Store) SealCurrentHour

func (s *Store) SealCurrentHour() error

SealCurrentHour triggers sealing of the current hour's WAL.

func (*Store) Search

func (s *Store) Search(params SearchParams) (*SearchResult, error)

Search finds log entries matching the given parameters.

func (*Store) Tail

func (s *Store) Tail() (snapshot []chunk.Entry, ch <-chan []chunk.Entry, unsubscribe func())

Tail subscribes to live log entries.

type WALWriter

type WALWriter struct {
	// contains filtered or unexported fields
}

WALWriter handles appending entries to the active WAL file. Thread-safe: multiple goroutines can call Append concurrently.

func NewWALWriter

func NewWALWriter(dataDir string, ring *RingBuffer) (*WALWriter, error)

NewWALWriter creates a new WAL writer. dataDir is the base directory (e.g., "data/logs"). It opens or creates the WAL file for the current hour.

func (*WALWriter) Append

func (w *WALWriter) Append(entries []chunk.Entry) ([]chunk.Entry, error)

Append writes a batch of entries to the WAL, assigns IDs, and notifies tail subscribers. Returns the assigned entries (with IDs populated).

func (*WALWriter) Close

func (w *WALWriter) Close() error

Close closes the WAL file.

func (*WALWriter) EntryCount

func (w *WALWriter) EntryCount() int

EntryCount returns the number of entries in the current WAL.

func (*WALWriter) Rotate

func (w *WALWriter) Rotate() (sealingPath string, sealHour int64, entryCount int, err error)

Rotate forces a WAL rotation (used by seal trigger). Returns the path to the old WAL file (now named sealing_T*.wal), the segment hour it belongs to, and an entry count.

func (*WALWriter) SegmentHour

func (w *WALWriter) SegmentHour() int64

SegmentHour returns the current segment hour.

func (*WALWriter) ValidBytes

func (w *WALWriter) ValidBytes() int64

ValidBytes returns the number of safely fsynced bytes (safe for readers).

func (*WALWriter) WALPath

func (w *WALWriter) WALPath() string

WALPath returns the path to the current active WAL file.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL