Documentation
¶
Index ¶
- func CleanIncompleteSeal(segDir string) error
- func DecodeID(id int64) (segmentHour int64, chunkNum, row int)
- func EncodeID(segmentHour int64, chunkNum, row int) int64
- func IDForPosition(segmentHour int64, entryIndex int) int64
- func IsSealComplete(segDir string) bool
- func SegmentDirName(h int64) string
- func SegmentHourFromTime(t time.Time) int64
- func TimeFromSegmentHour(h int64) time.Time
- type ChunkMeta
- type HBucket
- type HistogramBucket
- type LoadedSegment
- type MetaCounts
- type RingBuffer
- type SearchParams
- type SearchResult
- type SegmentManager
- func (sm *SegmentManager) AllSegments() []*LoadedSegment
- func (sm *SegmentManager) FindSegmentByID(id int64) *LoadedSegment
- func (sm *SegmentManager) Prune(retention time.Duration) (int, error)
- func (sm *SegmentManager) Register(hour int64, meta *SegmentMeta)
- func (sm *SegmentManager) SegmentCount() int
- func (sm *SegmentManager) SegmentsInRange(start, end time.Time) []*LoadedSegment
- type SegmentMeta
- type Store
- func (s *Store) Close() error
- func (s *Store) CountByLevel(start, end time.Time, service, environment string) (map[string]int, error)
- func (s *Store) CountByService(start, end time.Time, environment string) (map[string]int, error)
- func (s *Store) DistinctValues(column string, start, end time.Time) ([]string, error)
- func (s *Store) GetBody(id int64) (json.RawMessage, error)
- func (s *Store) GetByID(id int64) (*chunk.Entry, error)
- func (s *Store) Histogram(start, end time.Time, interval time.Duration) ([]HistogramBucket, error)
- func (s *Store) Ingest(entries []chunk.Entry) ([]chunk.Entry, error)
- func (s *Store) Prune(retention time.Duration) (int, error)
- func (s *Store) SealCurrentHour() error
- func (s *Store) Search(params SearchParams) (*SearchResult, error)
- func (s *Store) Tail() (snapshot []chunk.Entry, ch <-chan []chunk.Entry, unsubscribe func())
- type WALWriter
- func (w *WALWriter) Append(entries []chunk.Entry) ([]chunk.Entry, error)
- func (w *WALWriter) Close() error
- func (w *WALWriter) EntryCount() int
- func (w *WALWriter) Rotate() (sealingPath string, sealHour int64, entryCount int, err error)
- func (w *WALWriter) SegmentHour() int64
- func (w *WALWriter) ValidBytes() int64
- func (w *WALWriter) WALPath() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CleanIncompleteSeal ¶
CleanIncompleteSeal removes partial chunk/index files from a failed seal.
func EncodeID ¶
EncodeID builds a composite int64 ID from segment hour, chunk number, and row offset.
func IDForPosition ¶
IDForPosition computes the composite ID for a given position in the active WAL.
func IsSealComplete ¶
IsSealComplete checks if a segment directory has a .seal_complete marker.
func SegmentDirName ¶
SegmentDirName returns the directory name for a segment hour (e.g., "2026-04-04T12").
func SegmentHourFromTime ¶
SegmentHourFromTime returns the segment hour (hours since Unix epoch) for a given time.
func TimeFromSegmentHour ¶
TimeFromSegmentHour converts a segment hour back to time (start of that hour).
Types ¶
type ChunkMeta ¶
type ChunkMeta struct {
Index int `json:"index"`
EntryCount int `json:"entry_count"`
IDRange [2]int64 `json:"id_range"`
}
ChunkMeta holds info about a single chunk within a segment.
type HistogramBucket ¶
HistogramBucket represents a time-bucketed count.
type LoadedSegment ¶
type LoadedSegment struct {
Hour int64
DirName string
DirPath string
Meta *SegmentMeta
}
LoadedSegment is a sealed segment loaded into memory (metadata only).
type MetaCounts ¶
type MetaCounts struct {
ByLevel map[string]int `json:"by_level"`
ByService map[string]int `json:"by_service"`
}
MetaCounts holds pre-computed aggregation counts.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer is a fixed-size circular buffer of log entries for tail support. Writers call Push; tail subscribers call Snapshot + Subscribe.
func (*RingBuffer) Push ¶
func (rb *RingBuffer) Push(entries []chunk.Entry)
Push adds entries to the ring buffer and notifies subscribers.
func (*RingBuffer) Snapshot ¶
func (rb *RingBuffer) Snapshot() []chunk.Entry
Snapshot returns the most recent entries in the ring (up to ringSize). Entries are returned oldest-first.
func (*RingBuffer) Subscribe ¶
func (rb *RingBuffer) Subscribe() (<-chan []chunk.Entry, func())
Subscribe returns a channel that receives new entry batches, and an unsubscribe function. The channel has a buffer of 16 batches.
type SearchParams ¶
type SearchParams struct {
Query string // full-text search on message
Service string // exact match
Level string // exact match
Env string // exact match
TraceID string // exact match
RequestID string // exact match
UserID string // exact match
TenantID string // exact match
EventType string // exact match
ErrorClass string // exact match
ErrorFingerprint string // exact match
Method string // exact match
Path string // substring match
MinDurationMs int // minimum duration_ms
NPlusOneOnly bool // only n_plus_one entries
Start *time.Time // event time range start
End *time.Time // event time range end
Limit int
Offset int
SortAsc bool // true = oldest first
}
SearchParams defines filters for log search.
type SearchResult ¶
type SearchResult struct {
Entries []chunk.Entry
Total int // total matching (may be > len(Entries) if limited)
}
SearchResult holds search results.
type SegmentManager ¶
type SegmentManager struct {
// contains filtered or unexported fields
}
SegmentManager manages the lifecycle of sealed segments and the active WAL.
func NewSegmentManager ¶
func NewSegmentManager(dataDir string) (*SegmentManager, error)
NewSegmentManager scans the data directory and loads all sealed segments.
func (*SegmentManager) AllSegments ¶
func (sm *SegmentManager) AllSegments() []*LoadedSegment
AllSegments returns all loaded segments (for operations that need all data).
func (*SegmentManager) FindSegmentByID ¶
func (sm *SegmentManager) FindSegmentByID(id int64) *LoadedSegment
FindSegmentByID locates which segment contains a given composite ID.
func (*SegmentManager) Prune ¶
func (sm *SegmentManager) Prune(retention time.Duration) (int, error)
Prune deletes segments older than the given duration. Returns the number of segments deleted.
func (*SegmentManager) Register ¶
func (sm *SegmentManager) Register(hour int64, meta *SegmentMeta)
Register adds a newly sealed segment to the manager.
func (*SegmentManager) SegmentCount ¶
func (sm *SegmentManager) SegmentCount() int
SegmentCount returns the number of sealed segments.
func (*SegmentManager) SegmentsInRange ¶
func (sm *SegmentManager) SegmentsInRange(start, end time.Time) []*LoadedSegment
SegmentsInRange returns sealed segments that may contain data for the given time range. Includes ±1 hour buffer for clock skew (received_at vs ts).
type SegmentMeta ¶
type SegmentMeta struct {
Segment string `json:"segment"`
Chunks []ChunkMeta `json:"chunks"`
EntryCount int `json:"entry_count"`
IDRange [2]int64 `json:"id_range"`
TimeRange [2]string `json:"time_range"`
Counts MetaCounts `json:"counts"`
Histogram map[string]HBucket `json:"histogram"`
Dicts map[string][]string `json:"dictionaries"`
}
SegmentMeta holds pre-computed statistics for a sealed segment.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is the main log store engine. It implements the core query operations on top of the WAL writer, segment manager, and ingest pipeline.
func NewStore ¶
func NewStore(dataDir string, samplingRules []ingest.SamplingRule, piiConfig ingest.PIIConfig) (*Store, error)
NewStore creates and initializes the segmented log store.
func (*Store) CountByLevel ¶
func (s *Store) CountByLevel(start, end time.Time, service, environment string) (map[string]int, error)
CountByLevel returns log counts grouped by level for the given time range. CountByLevel returns log counts grouped by level for the given time range.
When environment is set, the WAL path filters by env. Sealed segments use pre-computed counts that aren't broken down by env yet — for segments, the env parameter is ignored, so multi-env deployments may see slight over-counts on historical data. Single-env deployments (the common case) are unaffected since all segment rows share the same env.
func (*Store) CountByService ¶
CountByService returns log counts grouped by service for the given time range. See CountByLevel for notes on segment-level env filtering.
func (*Store) DistinctValues ¶
DistinctValues returns unique values for a given column name.
func (*Store) GetBody ¶
func (s *Store) GetBody(id int64) (json.RawMessage, error)
GetBody retrieves just the body blob for an entry.
func (*Store) Ingest ¶
Ingest processes and stores a batch of log entries. Runs the full pipeline: sampling → PII scrub → error extraction → log expansion → WAL append.
func (*Store) SealCurrentHour ¶
SealCurrentHour triggers sealing of the current hour's WAL.
func (*Store) Search ¶
func (s *Store) Search(params SearchParams) (*SearchResult, error)
Search finds log entries matching the given parameters.
type WALWriter ¶
type WALWriter struct {
// contains filtered or unexported fields
}
WALWriter handles appending entries to the active WAL file. Thread-safe: multiple goroutines can call Append concurrently.
func NewWALWriter ¶
func NewWALWriter(dataDir string, ring *RingBuffer) (*WALWriter, error)
NewWALWriter creates a new WAL writer. dataDir is the base directory (e.g., "data/logs"). It opens or creates the WAL file for the current hour.
func (*WALWriter) Append ¶
Append writes a batch of entries to the WAL, assigns IDs, and notifies tail subscribers. Returns the assigned entries (with IDs populated).
func (*WALWriter) EntryCount ¶
EntryCount returns the number of entries in the current WAL.
func (*WALWriter) Rotate ¶
Rotate forces a WAL rotation (used by seal trigger). Returns the path to the old WAL file (now named sealing_T*.wal) and the segment hour it belongs to.
func (*WALWriter) SegmentHour ¶
SegmentHour returns the current segment hour.
func (*WALWriter) ValidBytes ¶
ValidBytes returns the number of safely fsynced bytes (safe for readers).