Documentation ¶
Overview ¶
Package filereader provides generic file reading for structured data formats with composable readers, merge-sort capabilities, and pluggable data transformation.
The filereader package provides streaming row-by-row access to various telemetry file formats. All readers implement a common interface and can be composed together for complex data processing patterns like merge-sort operations across multiple files. Data transformation is handled by separate translator components for maximum flexibility.
Core Interfaces ¶
All file format readers return raw data without transformation:
	type Row map[string]any

	type Batch struct {
		Rows []Row
	}

	type Reader interface {
		Next(ctx context.Context) (*Batch, error) // Returns next batch, or io.EOF when exhausted
		Close() error
		TotalRowsReturned() int64 // Total rows successfully returned so far
	}

	type RowTranslator interface {
		TranslateRow(ctx context.Context, row *Row) error // Transforms row in-place
	}
Use translators for data processing and TranslatingReader for composition.
Format Readers ¶
All format readers return raw, untransformed data from files:
- ParquetRawReader: Generic Parquet files using parquet-go/parquet-go (requires io.ReaderAt)
- JSONLinesReader: Streams JSON objects line-by-line from any io.ReadCloser
- IngestProtoLogsReader: Raw OTEL log records from protobuf
- IngestProtoMetricsReader: Raw OTEL metric data points from protobuf (ingestion only)
- ProtoTracesReader: Raw OTEL span data from protobuf
Example usage:
	// For compressed JSON, pass in a gzip reader:
	gzReader, err := gzip.NewReader(file)
	if err != nil {
		return err
	}
	reader, err := NewJSONLinesReader(gzReader, 1000)
	if err != nil {
		return err
	}
	defer reader.Close()

	for {
		batch, err := reader.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return err
		}
		if batch != nil {
			// Process raw row data in batch.Rows.
			for _, row := range batch.Rows {
				_ = row // process each row
			}
		}
	}
Data Translation ¶
Use translators to transform raw data:
	// Create a simple translator that adds tags
	translator := NewTagsTranslator(map[string]string{
		"source": "myapp",
		"env":    "prod",
	})

	// Wrap any reader with translation
	translatingReader, err := NewTranslatingReader(rawReader, translator, 1000)
	if err != nil {
		return err
	}

	// Chain multiple translators
	chain := NewChainTranslator(
		NewTagsTranslator(someTags),
		customTranslator, // Implement your own RowTranslator
	)
	reader, err := NewTranslatingReader(rawReader, chain, 1000)
	if err != nil {
		return err
	}
Built-in translators:
- NoopTranslator: Pass-through (no transformation)
- TagsTranslator: Adds static tags to rows
- ChainTranslator: Applies multiple translators in sequence
Implement custom translators by satisfying the RowTranslator interface.
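For instance, a minimal custom translator might drop a sensitive field from each row; the type and field name below are illustrative sketches, not part of the package:

	// RedactingTranslator is a hypothetical translator that removes one
	// field from every row it sees. It satisfies the RowTranslator interface.
	type RedactingTranslator struct {
		field string // name of the field to drop (illustrative)
	}

	func (t *RedactingTranslator) TranslateRow(ctx context.Context, row *Row) error {
		delete(*row, t.field) // Row is a map, so the edit is in-place
		return nil
	}

	// It composes like any other translator:
	//   chain := NewChainTranslator(NewTagsTranslator(tags), &RedactingTranslator{field: "password"})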
Composite Readers ¶
Sorting Readers ¶
Choose the appropriate sorting reader based on dataset size and memory constraints:
MemorySortingReader - For smaller datasets (high memory usage, no disk I/O):
	reader, err := NewMemorySortingReader(rawReader, &NonPooledMetricSortKeyProvider{}, 1000)
DiskSortingReader - For larger datasets (moderate memory usage, 2x disk I/O):
	reader, err := NewDiskSortingReader(rawReader, &TimestampSortKeyProvider{}, 1000)
MergesortReader - For merging multiple already-sorted sources (low memory, streaming):
	keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
	reader, err := NewMergesortReader(ctx, []Reader{r1, r2, r3}, keyProvider, 1000)
SequentialReader - Sequential processing (no sorting):
	reader, err := NewSequentialReader([]Reader{r1, r2, r3}, 1000)
Usage Patterns ¶
Time-ordered merge sort across multiple files:
	// Error handling for reader construction omitted for brevity.
	r1, _ := NewParquetRawReader(file1, size1, 1000)
	r2, _ := NewParquetRawReader(file2, size2, 1000)
	r3, _ := NewJSONLinesReader(file3, 1000)
	readers := []Reader{r1, r2, r3}

	keyProvider := NewTimeOrderedSortKeyProvider("_cardinalhq.timestamp")
	ordered, err := NewMergesortReader(ctx, readers, keyProvider, 1000)
	if err != nil {
		return err
	}
	defer ordered.Close()

	for {
		batch, err := ordered.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return err
		}
		if batch != nil {
			// Rows arrive in timestamp order across all files.
			for _, row := range batch.Rows {
				_ = row // process each row
			}
		}
	}
Composable reader trees:
	// Process multiple file groups in timestamp order,
	// then combine groups sequentially.
	keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
	group1, _ := NewMergesortReader(ctx, readers1, keyProvider, 1000)
	group2, _ := NewMergesortReader(ctx, readers2, keyProvider, 1000)
	final, _ := NewSequentialReader([]Reader{group1, group2}, 1000)
Memory Management & Batch Ownership ¶
The filereader package implements efficient memory management through batch ownership:
Batch Ownership: Readers own the returned Batch and its Row maps. Callers must not retain references to a batch beyond the next Next() call.
Memory Safety: Use pipeline.CopyBatch() if you need to retain batch data:

	for {
		batch, err := reader.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return err
		}
		if batch != nil {
			// Use data immediately, or copy if retention is needed:
			safeBatch := pipeline.CopyBatch(batch) // for retention
			_ = safeBatch
			// Process batch.Rows directly for immediate use.
		}
	}

Data Safety: Readers maintain clean batch state and handle EOF correctly. Batches must not be accessed after the next Next() call.
Error Handling: Next() returns a nil batch on error; check the error before accessing the batch.
Resource Management ¶
- All readers must be closed via Close()
- Parquet readers use random access (io.ReaderAt) - no buffering
- Streaming readers (JSON, Proto) process incrementally
- Composite readers automatically close child readers
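Because composite readers close their children, a single deferred Close on the outermost reader is usually sufficient; a minimal sketch (construction of r1, r2, and keyProvider elided):

	merged, err := NewMergesortReader(ctx, []Reader{r1, r2}, keyProvider, 1000)
	if err != nil {
		return err
	}
	defer merged.Close() // also closes r1 and r2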
Package filereader provides a generic interface for reading rows from various file formats. Callers construct readers directly and compose them as needed for their specific use cases.
Index ¶
- Variables
- type AggregatingMetricsReader
- type ArrowRawReader
- type Batch
- type CSVLogTranslator
- type CSVReader
- type ChainTranslator
- type CookedLogTranslatingReader
- type CookedMetricTranslatingReader
- type CookedTraceTranslatingReader
- type DiskSortingReader
- type HistogramNoCountsError
- type IngestProtoLogsReader
- type IngestProtoMetricsReader
- type JSONLinesReader
- type MemorySortingReader
- type MergesortReader
- type MetricFilteringReader
- type MetricSortKey
- type MetricSortKeyProvider
- type NonPooledMetricSortKey
- type NonPooledMetricSortKeyProvider
- type NoopTranslator
- type OTELLogsProvider
- type OTELMetricsProvider
- type ParquetRawReader
- type ProtoBinLogTranslator
- type ProtoTracesReader
- type Reader
- func NewCookedLogParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
- func NewCookedMetricParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
- func NewCookedTraceParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
- func ReaderForFile(filename string, signalType SignalType, orgId string, ...) (Reader, error)
- func ReaderForFileWithOptions(filename string, opts ReaderOptions) (Reader, error)
- func ReaderForMetricAggregation(filename, orgId string, aggregationPeriodMs int64) (Reader, error)
- func WrapReaderForAggregation(reader Reader, opts ReaderOptions) (Reader, error)
- type ReaderOptions
- type Row
- type RowIndex
- type RowTranslator
- type SequentialReader
- func (sr *SequentialReader) Close() error
- func (sr *SequentialReader) CurrentReaderIndex() int
- func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)
- func (sr *SequentialReader) RemainingReaderCount() int
- func (sr *SequentialReader) TotalReaderCount() int
- func (sr *SequentialReader) TotalRowsReturned() int64
- type SignalType
- type SortKey
- type SortKeyProvider
- type TagsTranslator
- type TimeOrderedSortKey
- type TimeOrderedSortKeyProvider
- type TimestampSortKey
- type TimestampSortKeyProvider
- type TranslatingReader
Constants ¶
This section is empty.
Variables ¶
var ErrHistogramNoCounts = HistogramNoCountsError{}
ErrHistogramNoCounts is a sentinel error for histogram datapoints with no bucket counts.
Functions ¶
This section is empty.
Types ¶
type AggregatingMetricsReader ¶ added in v1.3.3
type AggregatingMetricsReader struct {
// contains filtered or unexported fields
}
AggregatingMetricsReader wraps a sorted Reader to perform streaming aggregation of metrics. It aggregates rows with the same [metric_name, tid, truncated_timestamp] key. The underlying reader must return rows in sorted order by this key.
func NewAggregatingMetricsReader ¶ added in v1.3.3
func NewAggregatingMetricsReader(reader Reader, aggregationPeriodMs int64, batchSize int) (*AggregatingMetricsReader, error)
NewAggregatingMetricsReader creates a new AggregatingMetricsReader that aggregates metrics with the same [metric_name, tid, truncated_timestamp] key.
	aggregationPeriodMs: period in milliseconds for timestamp truncation (e.g., 10000 for 10s)
	reader: underlying reader that returns rows in sorted order by [metric_name, tid, timestamp]
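A minimal sketch, assuming sortedReader already returns rows sorted by [metric_name, tid, timestamp]:

	// Aggregate into 10-second buckets.
	agg, err := NewAggregatingMetricsReader(sortedReader, 10000, 1000)
	if err != nil {
		return err
	}
	defer agg.Close()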
func (*AggregatingMetricsReader) Close ¶ added in v1.3.3
func (ar *AggregatingMetricsReader) Close() error
Close closes the reader and the underlying reader.
func (*AggregatingMetricsReader) GetOTELMetrics ¶ added in v1.3.3
func (ar *AggregatingMetricsReader) GetOTELMetrics() (any, error)
GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.
func (*AggregatingMetricsReader) Next ¶ added in v1.3.3
func (ar *AggregatingMetricsReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of aggregated rows.
func (*AggregatingMetricsReader) TotalRowsReturned ¶ added in v1.3.3
func (ar *AggregatingMetricsReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of aggregated rows returned via Next().
type ArrowRawReader ¶ added in v1.4.1
type ArrowRawReader struct {
// contains filtered or unexported fields
}
ArrowRawReader reads parquet files using Apache Arrow and returns raw rows. This reader provides raw parquet data without any opinionated transformations. It handles NULL-type columns gracefully, unlike the parquet-go library.
func NewArrowRawReader ¶ added in v1.4.1
func NewArrowRawReader(ctx context.Context, reader parquet.ReaderAtSeeker, batchSize int) (*ArrowRawReader, error)
NewArrowRawReader creates an ArrowRawReader for the given parquet.ReaderAtSeeker.
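A usage sketch, assuming f is a parquet.ReaderAtSeeker such as an *os.File:

	r, err := NewArrowRawReader(ctx, f, 1000)
	if err != nil {
		return err
	}
	defer r.Close()

	// Inspect the parquet schema before iterating batches.
	schema, err := r.GetSchema()
	if err != nil {
		return err
	}
	_ = schema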
func (*ArrowRawReader) Close ¶ added in v1.4.1
func (r *ArrowRawReader) Close() error
Close releases resources associated with the reader.
func (*ArrowRawReader) GetSchema ¶ added in v1.4.1
func (r *ArrowRawReader) GetSchema() (*arrow.Schema, error)
GetSchema returns the schema of the parquet file.
func (*ArrowRawReader) Next ¶ added in v1.4.1
func (r *ArrowRawReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the parquet file.
func (*ArrowRawReader) TotalRowsReturned ¶ added in v1.4.1
func (r *ArrowRawReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows successfully returned.
type Batch ¶ added in v1.3.3
Batch represents a collection of rows with clear ownership semantics. The batch is owned by the reader that returns it.
type CSVLogTranslator ¶ added in v1.4.3
type CSVLogTranslator struct {
// contains filtered or unexported fields
}
CSVLogTranslator handles translation for CSV log files
func NewCSVLogTranslator ¶ added in v1.4.3
func NewCSVLogTranslator(opts ReaderOptions) *CSVLogTranslator
NewCSVLogTranslator creates a new CSV log translator
func (*CSVLogTranslator) TranslateRow ¶ added in v1.4.3
func (t *CSVLogTranslator) TranslateRow(ctx context.Context, row *Row) error
TranslateRow handles CSV-specific field translation for logs
type CSVReader ¶ added in v1.4.3
type CSVReader struct {
// contains filtered or unexported fields
}
CSVReader reads rows from a CSV stream using pipeline semantics.
func NewCSVReader ¶ added in v1.4.3
func NewCSVReader(reader io.ReadCloser, batchSize int) (*CSVReader, error)
NewCSVReader creates a new CSVReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called.
func (*CSVReader) TotalRowsReturned ¶ added in v1.4.3
func (r *CSVReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type ChainTranslator ¶
type ChainTranslator struct {
// contains filtered or unexported fields
}
ChainTranslator applies multiple translators in sequence.
func NewChainTranslator ¶
func NewChainTranslator(translators ...RowTranslator) *ChainTranslator
NewChainTranslator creates a translator that applies multiple translators in order.
func (*ChainTranslator) TranslateRow ¶
func (ct *ChainTranslator) TranslateRow(ctx context.Context, row *Row) error
TranslateRow applies all translators in sequence to the row.
type CookedLogTranslatingReader ¶ added in v1.3.3
type CookedLogTranslatingReader struct {
// contains filtered or unexported fields
}
CookedLogTranslatingReader wraps another reader and applies log-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.
func NewCookedLogTranslatingReader ¶ added in v1.3.3
func NewCookedLogTranslatingReader(wrapped Reader) *CookedLogTranslatingReader
NewCookedLogTranslatingReader creates a new reader that applies log-specific transformations.
func (*CookedLogTranslatingReader) Close ¶ added in v1.3.3
func (r *CookedLogTranslatingReader) Close() error
Close closes the reader and its wrapped reader.
func (*CookedLogTranslatingReader) Next ¶ added in v1.3.3
func (r *CookedLogTranslatingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of transformed log data.
func (*CookedLogTranslatingReader) TotalRowsReturned ¶ added in v1.3.3
func (r *CookedLogTranslatingReader) TotalRowsReturned() int64
TotalRowsReturned delegates to the wrapped reader.
type CookedMetricTranslatingReader ¶ added in v1.3.3
type CookedMetricTranslatingReader struct {
// contains filtered or unexported fields
}
CookedMetricTranslatingReader wraps another reader and applies metric-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.
func NewCookedMetricTranslatingReader ¶ added in v1.3.3
func NewCookedMetricTranslatingReader(wrapped Reader) *CookedMetricTranslatingReader
NewCookedMetricTranslatingReader creates a new reader that applies metric-specific transformations.
func (*CookedMetricTranslatingReader) Close ¶ added in v1.3.3
func (r *CookedMetricTranslatingReader) Close() error
Close closes the reader and its wrapped reader.
func (*CookedMetricTranslatingReader) Next ¶ added in v1.3.3
func (r *CookedMetricTranslatingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of transformed metric data.
func (*CookedMetricTranslatingReader) TotalRowsReturned ¶ added in v1.3.3
func (r *CookedMetricTranslatingReader) TotalRowsReturned() int64
TotalRowsReturned delegates to the wrapped reader.
type CookedTraceTranslatingReader ¶ added in v1.3.5
type CookedTraceTranslatingReader struct {
// contains filtered or unexported fields
}
CookedTraceTranslatingReader wraps another reader and applies trace-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.
func NewCookedTraceTranslatingReader ¶ added in v1.3.5
func NewCookedTraceTranslatingReader(wrapped Reader) *CookedTraceTranslatingReader
NewCookedTraceTranslatingReader creates a new reader that applies trace-specific transformations.
func (*CookedTraceTranslatingReader) Close ¶ added in v1.3.5
func (r *CookedTraceTranslatingReader) Close() error
Close closes the reader and its wrapped reader.
func (*CookedTraceTranslatingReader) Next ¶ added in v1.3.5
func (r *CookedTraceTranslatingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of transformed trace data.
func (*CookedTraceTranslatingReader) TotalRowsReturned ¶ added in v1.3.5
func (r *CookedTraceTranslatingReader) TotalRowsReturned() int64
TotalRowsReturned delegates to the wrapped reader.
type DiskSortingReader ¶ added in v1.3.3
type DiskSortingReader struct {
// contains filtered or unexported fields
}
Memory Impact: LOW-MODERATE - only the extracted sort keys and file offsets are kept in memory. Much more memory-efficient than MemorySortingReader for large datasets.
Disk I/O: 2x data size - each row is written once to a temp binary file, then read once during output.
Stability: Records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.
func NewDiskSortingReader ¶ added in v1.3.3
func NewDiskSortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*DiskSortingReader, error)
NewDiskSortingReader creates a reader that uses disk-based sorting with custom binary encoding.
Use this for large datasets that may not fit in memory. The temp file is automatically cleaned up when the reader is closed. Custom binary encoding provides efficient storage and serialization for the temporary data with no reflection overhead.
The keyProvider creates sort keys from rows to minimize memory usage during sorting.
func (*DiskSortingReader) Close ¶ added in v1.3.3
func (r *DiskSortingReader) Close() error
Close closes the reader and cleans up temp file.
func (*DiskSortingReader) GetOTELMetrics ¶ added in v1.3.3
func (r *DiskSortingReader) GetOTELMetrics() (any, error)
GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.
func (*DiskSortingReader) Next ¶ added in v1.3.3
func (r *DiskSortingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of sorted rows by reading from the temp file in index order.
func (*DiskSortingReader) TotalRowsReturned ¶ added in v1.3.3
func (r *DiskSortingReader) TotalRowsReturned() int64
TotalRowsReturned returns the number of rows that have been returned via Next().
type HistogramNoCountsError ¶ added in v1.3.3
type HistogramNoCountsError struct{}
HistogramNoCountsError represents an error when a histogram datapoint has no bucket counts. This is a recoverable error that indicates the datapoint should be dropped.
func (HistogramNoCountsError) Error ¶ added in v1.3.3
func (e HistogramNoCountsError) Error() string
type IngestProtoLogsReader ¶ added in v1.3.3
type IngestProtoLogsReader struct {
// contains filtered or unexported fields
}
IngestProtoLogsReader reads rows from OpenTelemetry protobuf logs format.
Implements OTELLogsProvider interface.
func NewIngestProtoLogsReader ¶ added in v1.3.3
func NewIngestProtoLogsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoLogsReader, error)
NewIngestProtoLogsReader creates a new IngestProtoLogsReader for the given io.Reader.
func (*IngestProtoLogsReader) Close ¶ added in v1.3.3
func (r *IngestProtoLogsReader) Close() error
Close closes the reader and releases resources.
func (*IngestProtoLogsReader) GetOTELLogs ¶ added in v1.3.3
func (r *IngestProtoLogsReader) GetOTELLogs() (*plog.Logs, error)
GetOTELLogs returns the underlying parsed OTEL logs structure. This allows access to the original log body and metadata not available in the row format.
func (*IngestProtoLogsReader) Next ¶ added in v1.3.3
func (r *IngestProtoLogsReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the OTEL logs.
func (*IngestProtoLogsReader) TotalRowsReturned ¶ added in v1.3.3
func (r *IngestProtoLogsReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type IngestProtoMetricsReader ¶ added in v1.3.3
type IngestProtoMetricsReader struct {
// contains filtered or unexported fields
}
IngestProtoMetricsReader reads rows from OpenTelemetry protobuf metrics format for ingestion. This reader is specifically designed for metric ingestion and should not be used for compaction or rollup operations.
Implements OTELMetricsProvider interface.
func NewIngestProtoMetricsReader ¶ added in v1.3.3
func NewIngestProtoMetricsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoMetricsReader, error)
NewIngestProtoMetricsReader creates a new IngestProtoMetricsReader for the given io.Reader.
func NewIngestProtoMetricsReaderFromMetrics ¶ added in v1.3.3
func NewIngestProtoMetricsReaderFromMetrics(metrics *pmetric.Metrics, opts ReaderOptions) (*IngestProtoMetricsReader, error)
NewIngestProtoMetricsReaderFromMetrics creates a new IngestProtoMetricsReader from pre-parsed OTEL metrics. This is useful when you need to access the raw OTEL structure for processing (e.g., exemplars) while also reading rows from the same data.
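A sketch of that dual-access pattern, assuming metrics was parsed earlier and the options shown are illustrative:

	reader, err := NewIngestProtoMetricsReaderFromMetrics(metrics, ReaderOptions{BatchSize: 1000})
	if err != nil {
		return err
	}
	defer reader.Close()

	// The original structure stays reachable for exemplar processing.
	otel, err := reader.GetOTELMetrics()
	if err != nil {
		return err
	}
	_ = otel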
func (*IngestProtoMetricsReader) Close ¶ added in v1.3.3
func (r *IngestProtoMetricsReader) Close() error
Close closes the reader and releases resources.
func (*IngestProtoMetricsReader) GetOTELMetrics ¶ added in v1.3.3
func (r *IngestProtoMetricsReader) GetOTELMetrics() (*pmetric.Metrics, error)
GetOTELMetrics implements the OTELMetricsProvider interface. Returns the underlying pmetric.Metrics structure for exemplar processing.
func (*IngestProtoMetricsReader) Next ¶ added in v1.3.3
func (r *IngestProtoMetricsReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the OTEL metrics.
func (*IngestProtoMetricsReader) TotalRowsReturned ¶ added in v1.3.3
func (r *IngestProtoMetricsReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type JSONLinesReader ¶
type JSONLinesReader struct {
// contains filtered or unexported fields
}
JSONLinesReader reads rows from a JSON lines stream using pipeline semantics.
func NewJSONLinesReader ¶
func NewJSONLinesReader(reader io.ReadCloser, batchSize int) (*JSONLinesReader, error)
NewJSONLinesReader creates a new JSONLinesReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called.
func (*JSONLinesReader) Close ¶
func (r *JSONLinesReader) Close() error
Close closes the reader and the underlying io.ReadCloser.
func (*JSONLinesReader) Next ¶ added in v1.3.3
func (r *JSONLinesReader) Next(ctx context.Context) (*Batch, error)
func (*JSONLinesReader) TotalRowsReturned ¶ added in v1.3.3
func (r *JSONLinesReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type MemorySortingReader ¶ added in v1.3.3
type MemorySortingReader struct {
// contains filtered or unexported fields
}
MemorySortingReader reads all rows from an underlying reader, then sorts them using a custom sort function and returns them in order. This is useful when you need sorted output with flexible sorting criteria.
Memory Impact: HIGH - all rows are loaded into memory at once.
Disk I/O: None (pure in-memory operation).
Stability: Records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.
func NewMemorySortingReader ¶ added in v1.3.3
func NewMemorySortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*MemorySortingReader, error)
NewMemorySortingReader creates a reader that buffers all rows, sorts them using the provided key provider, then returns them in order.
Use this for smaller datasets that fit comfortably in memory. For large datasets, consider DiskSortingReader to avoid OOM issues.
func (*MemorySortingReader) Close ¶ added in v1.3.3
func (r *MemorySortingReader) Close() error
Close closes the reader and underlying reader.
func (*MemorySortingReader) GetOTELMetrics ¶ added in v1.3.3
func (r *MemorySortingReader) GetOTELMetrics() (any, error)
GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.
func (*MemorySortingReader) Next ¶ added in v1.3.3
func (r *MemorySortingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of sorted rows from the buffer.
func (*MemorySortingReader) TotalRowsReturned ¶ added in v1.3.3
func (r *MemorySortingReader) TotalRowsReturned() int64
TotalRowsReturned returns the number of rows that have been returned via Next().
type MergesortReader ¶ added in v1.3.3
type MergesortReader struct {
// contains filtered or unexported fields
}
MergesortReader implements merge-sort style reading across multiple pre-sorted readers. It assumes each individual reader returns rows in sorted order according to the provided SortKeyProvider.
func NewMergesortReader ¶ added in v1.3.3
func NewMergesortReader(ctx context.Context, readers []Reader, keyProvider SortKeyProvider, batchSize int) (*MergesortReader, error)
NewMergesortReader creates a new MergesortReader that merges rows from multiple readers in sorted order using the new algorithm with active reader management.
func (*MergesortReader) ActiveReaderCount ¶ added in v1.3.3
func (or *MergesortReader) ActiveReaderCount() int
ActiveReaderCount returns the number of readers that still have data available.
func (*MergesortReader) Close ¶ added in v1.3.3
func (or *MergesortReader) Close() error
Close closes all underlying readers and releases resources.
func (*MergesortReader) Next ¶ added in v1.3.3
func (or *MergesortReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows in sorted order across all readers.
func (*MergesortReader) TotalRowsReturned ¶ added in v1.3.3
func (or *MergesortReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows returned by this reader.
type MetricFilteringReader ¶ added in v1.3.6
type MetricFilteringReader struct {
// contains filtered or unexported fields
}
MetricFilteringReader wraps another reader and filters rows by metric name. It assumes the underlying reader provides data sorted by metric name. Once it sees the target metric, it returns those rows until the metric changes.
func NewMetricFilteringReader ¶ added in v1.3.6
func NewMetricFilteringReader(source Reader, metricName string) *MetricFilteringReader
NewMetricFilteringReader creates a reader that filters for a specific metric name. It starts a goroutine to read ahead from the source.
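A minimal sketch, assuming sortedByName yields rows sorted by metric name (the metric name below is illustrative):

	filtered := NewMetricFilteringReader(sortedByName, "http.server.duration")
	defer filtered.Close()
	// filtered.Next(ctx) now returns only rows for that metric.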
func (*MetricFilteringReader) Close ¶ added in v1.3.6
func (r *MetricFilteringReader) Close() error
Close releases resources
func (*MetricFilteringReader) Next ¶ added in v1.3.6
func (r *MetricFilteringReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of filtered rows
func (*MetricFilteringReader) TotalRowsReturned ¶ added in v1.3.6
func (r *MetricFilteringReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows returned
type MetricSortKey ¶ added in v1.3.3
type MetricSortKey struct {
	Name      string
	Tid       int64
	Timestamp int64
	NameOk    bool
	TidOk     bool
	TsOk      bool
}
MetricSortKey represents the sort key for metrics: [name, tid, timestamp]
func MakeMetricSortKey ¶ added in v1.3.3
func MakeMetricSortKey(row Row) *MetricSortKey
MakeMetricSortKey creates a pooled MetricSortKey from a row
func (*MetricSortKey) Compare ¶ added in v1.3.3
func (k *MetricSortKey) Compare(other SortKey) int
Compare implements SortKey interface for MetricSortKey
func (*MetricSortKey) Release ¶ added in v1.3.3
func (k *MetricSortKey) Release()
Release returns the MetricSortKey to the pool for reuse
type MetricSortKeyProvider ¶ added in v1.3.3
type MetricSortKeyProvider struct{}
MetricSortKeyProvider creates MetricSortKey instances from rows
func (*MetricSortKeyProvider) MakeKey ¶ added in v1.3.3
func (p *MetricSortKeyProvider) MakeKey(row Row) SortKey
MakeKey implements SortKeyProvider interface for metrics
type NonPooledMetricSortKey ¶ added in v1.3.5
type NonPooledMetricSortKey struct {
	Name      string
	Tid       int64
	Timestamp int64
	NameOk    bool
	TidOk     bool
	TsOk      bool
}
NonPooledMetricSortKey is like MetricSortKey but does not use object pools. This is safer for long-lived keys that are retained during sorting operations.
func (*NonPooledMetricSortKey) Compare ¶ added in v1.3.5
func (k *NonPooledMetricSortKey) Compare(other SortKey) int
Compare implements SortKey interface for NonPooledMetricSortKey
func (*NonPooledMetricSortKey) Release ¶ added in v1.3.5
func (k *NonPooledMetricSortKey) Release()
Release is a no-op for non-pooled keys
type NonPooledMetricSortKeyProvider ¶ added in v1.3.5
type NonPooledMetricSortKeyProvider struct{}
NonPooledMetricSortKeyProvider creates NonPooledMetricSortKey instances. This is safer for long-lived keys that are retained during sorting operations.
func (*NonPooledMetricSortKeyProvider) MakeKey ¶ added in v1.3.5
func (p *NonPooledMetricSortKeyProvider) MakeKey(row Row) SortKey
MakeKey implements SortKeyProvider interface for metrics without using pools
type NoopTranslator ¶
type NoopTranslator struct{}
NoopTranslator returns rows unchanged for high performance. It is the simplest and most efficient translator implementation.
func NewNoopTranslator ¶
func NewNoopTranslator() *NoopTranslator
NewNoopTranslator creates a translator that passes rows through unchanged.
func (*NoopTranslator) TranslateRow ¶
func (nt *NoopTranslator) TranslateRow(ctx context.Context, row *Row) error
TranslateRow does nothing for maximum performance.
type OTELLogsProvider ¶ added in v1.3.3
type OTELLogsProvider interface {
	// GetOTELLogs returns the underlying parsed OTEL logs structure.
	// This allows access to the original log body and metadata not
	// available in the row format.
	GetOTELLogs() (*plog.Logs, error)
}
OTELLogsProvider provides access to the underlying OpenTelemetry logs structure. This is used when the original OTEL structure is needed for processing (e.g., exemplars).
type OTELMetricsProvider ¶ added in v1.3.3
type OTELMetricsProvider interface {
	// GetOTELMetrics returns the underlying parsed OTEL metrics structure.
	// This allows access to exemplars and other metadata not available
	// in the row format.
	GetOTELMetrics() (*pmetric.Metrics, error)
}
OTELMetricsProvider provides access to the underlying OpenTelemetry metrics structure. This is used when the original OTEL structure is needed for processing (e.g., exemplars).
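Readers expose this via a type assertion; a sketch:

	if provider, ok := reader.(OTELMetricsProvider); ok {
		metrics, err := provider.GetOTELMetrics()
		if err != nil {
			return err
		}
		_ = metrics // e.g., walk exemplars
	}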
type ParquetRawReader ¶ added in v1.3.3
type ParquetRawReader struct {
// contains filtered or unexported fields
}
ParquetRawReader reads rows from a generic Parquet stream. This reader provides raw parquet data without any opinionated transformations. Use wrapper readers like CookedMetricTranslatingReader for domain-specific logic.
func NewParquetRawReader ¶ added in v1.3.3
func NewParquetRawReader(reader io.ReaderAt, size int64, batchSize int) (*ParquetRawReader, error)
NewParquetRawReader creates a new ParquetRawReader for the given io.ReaderAt.
func (*ParquetRawReader) Close ¶ added in v1.3.3
func (r *ParquetRawReader) Close() error
Close closes the reader and releases resources.
func (*ParquetRawReader) Next ¶ added in v1.3.3
func (r *ParquetRawReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the parquet file.
func (*ParquetRawReader) TotalRowsReturned ¶ added in v1.3.3
func (r *ParquetRawReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type ProtoBinLogTranslator ¶ added in v1.3.3
type ProtoBinLogTranslator struct {
// contains filtered or unexported fields
}
ProtoBinLogTranslator handles translation for protobuf binary log files
func NewProtoBinLogTranslator ¶ added in v1.3.3
func NewProtoBinLogTranslator(opts ReaderOptions) *ProtoBinLogTranslator
NewProtoBinLogTranslator creates a new protobuf log translator
func (*ProtoBinLogTranslator) TranslateRow ¶ added in v1.3.3
func (t *ProtoBinLogTranslator) TranslateRow(ctx context.Context, row *Row) error
TranslateRow handles protobuf-specific field translation
type ProtoTracesReader ¶
type ProtoTracesReader struct {
// contains filtered or unexported fields
}
ProtoTracesReader reads rows from OpenTelemetry protobuf traces format. Returns raw OTEL trace data without signal-specific transformations.
func NewIngestProtoTracesReader ¶ added in v1.4.1
func NewIngestProtoTracesReader(reader io.Reader, opts ReaderOptions) (*ProtoTracesReader, error)
NewIngestProtoTracesReader creates a new ProtoTracesReader for ingestion with exemplar processing.
func NewProtoTracesReader ¶
func NewProtoTracesReader(reader io.Reader, batchSize int) (*ProtoTracesReader, error)
NewProtoTracesReader creates a new ProtoTracesReader for the given io.Reader. The caller is responsible for closing the underlying reader.
func (*ProtoTracesReader) Close ¶
func (r *ProtoTracesReader) Close() error
Close closes the reader and releases resources.
func (*ProtoTracesReader) Next ¶ added in v1.3.3
func (r *ProtoTracesReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the OTEL traces.
func (*ProtoTracesReader) TotalRowsReturned ¶ added in v1.3.3
func (r *ProtoTracesReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type Reader ¶
type Reader interface {
	// Next returns the next batch of rows, or io.EOF when exhausted.
	// The returned batch is owned by the reader and must not be retained
	// beyond the next Next() call. Use pipeline.CopyBatch() if you need to retain.
	// The context can be used for cancellation, deadlines, and updating metrics.
	Next(ctx context.Context) (*Batch, error)

	// Close releases any resources held by the reader.
	Close() error

	// TotalRowsReturned returns the total number of rows that have been successfully
	// returned via Next() calls from this reader so far.
	TotalRowsReturned() int64
}
Reader is the core interface for reading rows from any file format using pipeline semantics. This eliminates memory ownership issues by establishing clear ownership: batches are owned by the reader and must not be retained beyond the next Next() call.
func NewCookedLogParquetReader ¶ added in v1.3.3
func NewCookedLogParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
NewCookedLogParquetReader creates a reader for cooked log parquet files. It wraps a ParquetRawReader with CookedLogTranslatingReader to apply log-specific filtering and transformations.
func NewCookedMetricParquetReader ¶ added in v1.3.3
func NewCookedMetricParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
NewCookedMetricParquetReader creates a reader for cooked metric parquet files. It wraps a ParquetRawReader with CookedMetricTranslatingReader to apply metric-specific filtering and transformations.
func NewCookedTraceParquetReader ¶ added in v1.3.5
func NewCookedTraceParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)
NewCookedTraceParquetReader creates a reader for cooked trace parquet files. It wraps a ParquetRawReader with CookedTraceTranslatingReader to apply trace-specific filtering and transformations.
func ReaderForFile ¶ added in v1.3.3
func ReaderForFile(filename string, signalType SignalType, orgId string, exemplarProcessor *exemplars.Processor) (Reader, error)
ReaderForFile creates a Reader for the given file based on its extension and signal type. This is a convenience function that uses default options.
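A convenience-path sketch; the filename and org ID are illustrative, and passing nil for the exemplar processor is assumed to be acceptable when no exemplar processing is needed:

	reader, err := ReaderForFile("data/metrics.binpb.gz", SignalTypeMetrics, "org-123", nil)
	if err != nil {
		return err
	}
	defer reader.Close()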
func ReaderForFileWithOptions ¶ added in v1.3.3
func ReaderForFileWithOptions(filename string, opts ReaderOptions) (Reader, error)
ReaderForFileWithOptions creates a Reader for the given file with the provided options. Supported file formats:
- .parquet: Creates a ParquetRawReader (works for all signal types)
- .json.gz: Creates a JSONLinesReader with gzip decompression (works for all signal types)
- .json: Creates a JSONLinesReader (works for all signal types)
- .csv: Creates a CSVReader (works for all signal types)
- .csv.gz: Creates a CSVReader with gzip decompression (works for all signal types)
- .binpb: Creates a signal-specific proto reader (NewIngestProtoLogsReader, NewIngestProtoMetricsReader, or NewProtoTracesReader)
- .binpb.gz: Creates a signal-specific proto reader with gzip decompression
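A sketch of options-driven construction for a gzipped JSON log file (field values are illustrative):

	opts := ReaderOptions{
		SignalType: SignalTypeLogs,
		BatchSize:  1000,
		OrgID:      "org-123",
	}
	reader, err := ReaderForFileWithOptions("logs/2024-01-01.json.gz", opts)
	if err != nil {
		return err
	}
	defer reader.Close()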
func ReaderForMetricAggregation ¶ added in v1.3.3
func ReaderForMetricAggregation(filename, orgId string, aggregationPeriodMs int64) (Reader, error)
ReaderForMetricAggregation creates a Reader for metrics with aggregation enabled.
func WrapReaderForAggregation ¶ added in v1.3.3
func WrapReaderForAggregation(reader Reader, opts ReaderOptions) (Reader, error)
WrapReaderForAggregation wraps a reader with aggregation if enabled.
type ReaderOptions ¶ added in v1.3.3
type ReaderOptions struct {
	SignalType SignalType
	BatchSize  int // Batch size for readers (default: 1000)

	// Translation options for protobuf logs and metrics
	OrgID             string
	Bucket            string
	ObjectID          string
	ExemplarProcessor *exemplars.Processor

	// Aggregation options for metrics
	EnableAggregation   bool  // Enable streaming aggregation
	AggregationPeriodMs int64 // Aggregation period in milliseconds (e.g., 10000 for 10s)
}
ReaderOptions provides options for creating readers.
type RowIndex ¶ added in v1.3.3
type RowIndex struct {
	SortKey    SortKey // Extracted sort key for sorting
	FileOffset int64
	ByteLength int32
}
RowIndex represents a lightweight pointer to a binary-encoded row in the temp file. It stores only the extracted sort key plus file location info.
type RowTranslator ¶
type RowTranslator interface {
	// TranslateRow transforms a row in-place by modifying the provided row pointer.
	TranslateRow(ctx context.Context, row *Row) error
}
RowTranslator transforms rows from one format to another.
type SequentialReader ¶ added in v1.3.3
type SequentialReader struct {
// contains filtered or unexported fields
}
SequentialReader reads from multiple readers sequentially in the order provided. It reads all rows from the first reader, then all rows from the second reader, etc. This is useful when you want to concatenate multiple files without any ordering requirements.
func NewSequentialReader ¶ added in v1.3.3
func NewSequentialReader(readers []Reader, batchSize int) (*SequentialReader, error)
NewSequentialReader creates a new SequentialReader that reads from the provided readers sequentially. Readers will be closed when the SequentialReader is closed.
func (*SequentialReader) Close ¶ added in v1.3.3
func (sr *SequentialReader) Close() error
Close closes all underlying readers and releases resources.
func (*SequentialReader) CurrentReaderIndex ¶ added in v1.3.3
func (sr *SequentialReader) CurrentReaderIndex() int
CurrentReaderIndex returns the index of the reader currently being read from. Returns -1 if all readers are exhausted or the reader is closed.
func (*SequentialReader) Next ¶ added in v1.3.3
func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)
func (*SequentialReader) RemainingReaderCount ¶ added in v1.3.3
func (sr *SequentialReader) RemainingReaderCount() int
RemainingReaderCount returns the number of readers that haven't been fully processed yet.
func (*SequentialReader) TotalReaderCount ¶ added in v1.3.3
func (sr *SequentialReader) TotalReaderCount() int
TotalReaderCount returns the total number of readers in this SequentialReader.
func (*SequentialReader) TotalRowsReturned ¶ added in v1.3.3
func (sr *SequentialReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next() from all readers.
type SignalType ¶ added in v1.3.3
type SignalType int
SignalType represents the type of telemetry signal being processed.
const (
	// SignalTypeLogs represents log data
	SignalTypeLogs SignalType = iota
	// SignalTypeMetrics represents metric data
	SignalTypeMetrics
	// SignalTypeTraces represents trace data
	SignalTypeTraces
)
func (SignalType) String ¶ added in v1.3.3
func (s SignalType) String() string
String returns the string representation of the signal type.
type SortKey ¶ added in v1.3.3
type SortKey interface {
	// Compare returns:
	//   -1 if this key should come before other
	//    0 if this key equals other
	//    1 if this key should come after other
	Compare(other SortKey) int

	// Release returns the key to its pool for reuse
	Release()
}
SortKey represents a key that can be compared for sorting
type SortKeyProvider ¶ added in v1.3.3
type SortKeyProvider interface {
	// MakeKey creates a sort key from a row
	MakeKey(row Row) SortKey
}
SortKeyProvider creates sort keys from rows
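A sketch of a custom key and provider pair, assuming rows carry an int64 "severity" field (illustrative):

	// severityKey orders rows by a numeric severity value.
	type severityKey struct{ v int64 }

	func (k *severityKey) Compare(other SortKey) int {
		o := other.(*severityKey)
		switch {
		case k.v < o.v:
			return -1
		case k.v > o.v:
			return 1
		default:
			return 0
		}
	}

	// Release is a no-op because this key is not pooled.
	func (k *severityKey) Release() {}

	type severityKeyProvider struct{}

	func (p *severityKeyProvider) MakeKey(row Row) SortKey {
		v, _ := row["severity"].(int64) // rows missing the field sort first as zero
		return &severityKey{v: v}
	}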
func GetCurrentMetricSortKeyProvider ¶ added in v1.3.5
func GetCurrentMetricSortKeyProvider() SortKeyProvider
GetCurrentMetricSortKeyProvider returns the provider for creating sort keys for the current metric sort version. This is the single source of truth for metric sorting.
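A sketch combining it with a sorting reader:

	provider := GetCurrentMetricSortKeyProvider()
	sorted, err := NewDiskSortingReader(rawReader, provider, 1000)
	if err != nil {
		return err
	}
	defer sorted.Close()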
type TagsTranslator ¶
type TagsTranslator struct {
// contains filtered or unexported fields
}
TagsTranslator adds static tags to every row.
func NewTagsTranslator ¶
func NewTagsTranslator(tags map[string]string) *TagsTranslator
NewTagsTranslator creates a translator that adds the given tags to each row.
func (*TagsTranslator) TranslateRow ¶
func (tt *TagsTranslator) TranslateRow(ctx context.Context, row *Row) error
TranslateRow adds tags to the row in-place.
type TimeOrderedSortKey ¶ added in v1.3.3
type TimeOrderedSortKey struct {
// contains filtered or unexported fields
}
TimeOrderedSortKey represents a sort key for timestamp-based ordering
func (*TimeOrderedSortKey) Compare ¶ added in v1.3.3
func (k *TimeOrderedSortKey) Compare(other SortKey) int
func (*TimeOrderedSortKey) Release ¶ added in v1.3.3
func (k *TimeOrderedSortKey) Release()
type TimeOrderedSortKeyProvider ¶ added in v1.3.3
type TimeOrderedSortKeyProvider struct {
// contains filtered or unexported fields
}
TimeOrderedSortKeyProvider creates sort keys based on a single timestamp field
func NewTimeOrderedSortKeyProvider ¶ added in v1.3.3
func NewTimeOrderedSortKeyProvider(fieldName string) *TimeOrderedSortKeyProvider
NewTimeOrderedSortKeyProvider creates a provider that sorts by a timestamp field
func (*TimeOrderedSortKeyProvider) MakeKey ¶ added in v1.3.3
func (p *TimeOrderedSortKeyProvider) MakeKey(row Row) SortKey
type TimestampSortKey ¶ added in v1.3.3
TimestampSortKey represents a timestamp-only sort key
func MakeTimestampSortKey ¶ added in v1.3.3
func MakeTimestampSortKey(row Row) *TimestampSortKey
MakeTimestampSortKey creates a pooled TimestampSortKey from a row
func (*TimestampSortKey) Compare ¶ added in v1.3.3
func (k *TimestampSortKey) Compare(other SortKey) int
Compare implements SortKey interface for TimestampSortKey
func (*TimestampSortKey) Release ¶ added in v1.3.3
func (k *TimestampSortKey) Release()
Release returns the TimestampSortKey to the pool for reuse
type TimestampSortKeyProvider ¶ added in v1.3.3
type TimestampSortKeyProvider struct{}
TimestampSortKeyProvider creates TimestampSortKey instances from rows
func (*TimestampSortKeyProvider) MakeKey ¶ added in v1.3.3
func (p *TimestampSortKeyProvider) MakeKey(row Row) SortKey
MakeKey implements SortKeyProvider interface for timestamps
type TranslatingReader ¶
type TranslatingReader struct {
// contains filtered or unexported fields
}
TranslatingReader wraps another Reader and applies row transformations. This enables composition where any Reader can be enhanced with signal-specific translation logic without coupling file parsing to data transformation.
func NewTranslatingReader ¶
func NewTranslatingReader(reader Reader, translator RowTranslator, batchSize int) (*TranslatingReader, error)
NewTranslatingReader creates a new TranslatingReader that applies the given translator to each row returned by the underlying reader.
The TranslatingReader takes ownership of the underlying reader and will close it when Close() is called.
func (*TranslatingReader) Close ¶
func (tr *TranslatingReader) Close() error
Close closes the underlying reader and releases resources.
func (*TranslatingReader) Next ¶ added in v1.3.3
func (tr *TranslatingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of translated rows from the underlying reader.
func (*TranslatingReader) TotalRowsReturned ¶ added in v1.3.3
func (tr *TranslatingReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next() after translation by this reader.
Source Files
¶
- aggregating_metrics_reader.go
- arrow_raw_reader.go
- cooked_log_translating_reader.go
- cooked_metric_translating_reader.go
- cooked_parquet_reader_factory.go
- cooked_trace_translating_reader.go
- csv_log_translator.go
- csv_reader.go
- disk_sorting_reader.go
- doc.go
- errors.go
- ingest_proto_logs.go
- ingest_proto_metrics.go
- jsonlines.go
- memory_sorting.go
- mergesort_reader.go
- metric_filtering_reader.go
- metric_sortkey_v2.go
- otel_attributes.go
- parquet_raw_reader.go
- proto_traces.go
- protobuf_log_translator.go
- reader.go
- reader_factory.go
- sequential_reader.go
- signal_type.go
- sort_metric.go
- sort_timestamp.go
- sorting.go
- telemetry.go
- translating.go
- translators.go