package filereader

v1.4.3

Warning: This package is not in the latest version of its module.

Published: Sep 25, 2025 License: AGPL-3.0 Imports: 43 Imported by: 0

Documentation

Overview

Package filereader provides generic file reading for structured data formats with composable readers, merge-sort capabilities, and pluggable data transformation.

Overview

The filereader package provides streaming row-by-row access to various telemetry file formats. All readers implement a common interface and can be composed together for complex data processing patterns like merge-sort operations across multiple files. Data transformation is handled by separate translator components for maximum flexibility.

Core Interfaces

All file format readers return raw data without transformation:

type Row map[string]any

type Batch struct {
    Rows []Row
}

type Reader interface {
    Next(ctx context.Context) (*Batch, error)  // Returns next batch or io.EOF when exhausted
    Close() error
    TotalRowsReturned() int64                  // Total rows successfully returned so far
}

type RowTranslator interface {
    TranslateRow(ctx context.Context, row *Row) error  // Transforms row in-place
}

Use translators for data processing and TranslatingReader for composition.

Format Readers

All format readers return raw, untransformed data from files:

  • ParquetRawReader: Generic Parquet files using parquet-go/parquet-go (requires io.ReaderAt)
  • JSONLinesReader: Streams JSON objects line-by-line from any io.ReadCloser
  • IngestProtoLogsReader: Raw OTEL log records from protobuf
  • IngestProtoMetricsReader: Raw OTEL metric data points from protobuf (ingestion only)
  • ProtoTracesReader: Raw OTEL span data from protobuf

Example usage:

// For compressed JSON, pass in a gzip reader:
gzReader, err := gzip.NewReader(file)
if err != nil {
    return err
}

reader, err := NewJSONLinesReader(gzReader, 1000)
if err != nil {
    return err
}
defer reader.Close()

for {
    batch, err := reader.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // process raw row data in batch.Rows
        for _, row := range batch.Rows {
            // process each row
        }
    }
}

Data Translation

Use translators to transform raw data:

// Create a simple translator that adds tags
translator := NewTagsTranslator(map[string]string{
    "source": "myapp",
    "env": "prod",
})

// Wrap any reader with translation
translatingReader := NewTranslatingReader(rawReader, translator)

// Chain multiple translators
chain := NewChainTranslator(
    NewTagsTranslator(someTags),
    customTranslator,  // Implement your own RowTranslator
)
reader := NewTranslatingReader(rawReader, chain)

Built-in translators:

  • NoopTranslator: Pass-through (no transformation)
  • TagsTranslator: Adds static tags to rows
  • ChainTranslator: Applies multiple translators in sequence

Implement custom translators by satisfying the RowTranslator interface.

Composite Readers

Sorting Readers

Choose the appropriate sorting reader based on dataset size and memory constraints:

MemorySortingReader - For smaller datasets (high memory usage, no disk I/O):

reader, err := NewMemorySortingReader(rawReader, &NonPooledMetricSortKeyProvider{}, 1000)

DiskSortingReader - For larger datasets (moderate memory usage, 2x disk I/O):

reader, err := NewDiskSortingReader(rawReader, &TimestampSortKeyProvider{}, 1000)

MergesortReader - For merging multiple already-sorted sources (low memory, streaming):

keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
reader, err := NewMergesortReader(ctx, []Reader{r1, r2, r3}, keyProvider, 1000)

SequentialReader - Sequential processing (no sorting):

reader := NewSequentialReader([]Reader{r1, r2, r3})

Usage Patterns

Time-ordered merge sort across multiple files:

// Error handling omitted for brevity.
r1, _ := NewParquetRawReader(file1, size1, 1000)
r2, _ := NewParquetRawReader(file2, size2, 1000)
r3, _ := NewJSONLinesReader(file3, 1000)
readers := []Reader{r1, r2, r3}

keyProvider := NewTimeOrderedSortKeyProvider("_cardinalhq.timestamp")
ordered, err := NewMergesortReader(ctx, readers, keyProvider, 1000)
if err != nil {
    return err
}
defer ordered.Close()

for {
    batch, err := ordered.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // rows arrive in timestamp order across all files
        for _, row := range batch.Rows {
            // process each row
        }
    }
}

Composable reader trees:

// Process multiple file groups in timestamp order,
// then combine groups sequentially
keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
group1, _ := NewMergesortReader(ctx, readers1, keyProvider, 1000) // error handling omitted
group2, _ := NewMergesortReader(ctx, readers2, keyProvider, 1000)
final := NewSequentialReader([]Reader{group1, group2})

Memory Management & Batch Ownership

The filereader package implements efficient memory management through batch ownership:

Batch Ownership: Readers own the returned Batch and its Row maps. Callers must NOT retain references to batches beyond the next Next() call.

Memory Safety: Use pipeline.CopyBatch() if you need to retain batch data:

for {
    batch, err := reader.Next(ctx)
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        return err
    }
    if batch != nil {
        // Process batch.Rows directly for immediate use, or copy
        // the batch if it must outlive the next Next() call:
        safeBatch := pipeline.CopyBatch(batch)
        _ = safeBatch // the copy is safe to retain
    }
}

Data Safety: Readers maintain clean batch states and handle EOF correctly. Batches must not be accessed after the next Next() call.

Error Handling: Next() returns a nil batch on error; check the error before accessing the batch.

Resource Management

  • All readers must be closed via Close()
  • Parquet readers use random access (io.ReaderAt) - no buffering
  • Streaming readers (JSON, Proto) process incrementally
  • Composite readers automatically close child readers

Package filereader provides a generic interface for reading rows from various file formats. Callers construct readers directly and compose them as needed for their specific use cases.

Index

Constants

This section is empty.

Variables

View Source
var ErrHistogramNoCounts = HistogramNoCountsError{}

ErrHistogramNoCounts is a sentinel error for histogram datapoints with no bucket counts.

Functions

This section is empty.

Types

type AggregatingMetricsReader added in v1.3.3

type AggregatingMetricsReader struct {
	// contains filtered or unexported fields
}

AggregatingMetricsReader wraps a sorted Reader to perform streaming aggregation of metrics. It aggregates rows with the same [metric_name, tid, truncated_timestamp] key. The underlying reader must return rows in sorted order by this key.

func NewAggregatingMetricsReader added in v1.3.3

func NewAggregatingMetricsReader(reader Reader, aggregationPeriodMs int64, batchSize int) (*AggregatingMetricsReader, error)

NewAggregatingMetricsReader creates a new AggregatingMetricsReader that aggregates metrics with the same [metric_name, tid, truncated_timestamp] key.

aggregationPeriodMs: period in milliseconds for timestamp truncation (e.g., 10000 for 10s)

reader: underlying reader that returns rows in sorted order by [metric_name, tid, timestamp]

func (*AggregatingMetricsReader) Close added in v1.3.3

func (ar *AggregatingMetricsReader) Close() error

Close closes the reader and the underlying reader.

func (*AggregatingMetricsReader) GetOTELMetrics added in v1.3.3

func (ar *AggregatingMetricsReader) GetOTELMetrics() (any, error)

GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.

func (*AggregatingMetricsReader) Next added in v1.3.3

Next returns the next batch of aggregated rows.

func (*AggregatingMetricsReader) TotalRowsReturned added in v1.3.3

func (ar *AggregatingMetricsReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of aggregated rows returned via Next().

type ArrowRawReader added in v1.4.1

type ArrowRawReader struct {
	// contains filtered or unexported fields
}

ArrowRawReader reads parquet files using Apache Arrow and returns raw rows. This reader provides raw parquet data without any opinionated transformations. It handles NULL-type columns gracefully, unlike the parquet-go library.

func NewArrowRawReader added in v1.4.1

func NewArrowRawReader(ctx context.Context, reader parquet.ReaderAtSeeker, batchSize int) (*ArrowRawReader, error)

NewArrowRawReader creates an ArrowRawReader for the given parquet.ReaderAtSeeker.

func (*ArrowRawReader) Close added in v1.4.1

func (r *ArrowRawReader) Close() error

Close releases resources associated with the reader.

func (*ArrowRawReader) GetSchema added in v1.4.1

func (r *ArrowRawReader) GetSchema() (*arrow.Schema, error)

GetSchema returns the schema of the parquet file.

func (*ArrowRawReader) Next added in v1.4.1

func (r *ArrowRawReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the parquet file.

func (*ArrowRawReader) TotalRowsReturned added in v1.4.1

func (r *ArrowRawReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows successfully returned.

type Batch added in v1.3.3

type Batch = pipeline.Batch

Batch represents a collection of rows with clear ownership semantics. The batch is owned by the reader that returns it.

type CSVLogTranslator added in v1.4.3

type CSVLogTranslator struct {
	// contains filtered or unexported fields
}

CSVLogTranslator handles translation for CSV log files

func NewCSVLogTranslator added in v1.4.3

func NewCSVLogTranslator(opts ReaderOptions) *CSVLogTranslator

NewCSVLogTranslator creates a new CSV log translator

func (*CSVLogTranslator) TranslateRow added in v1.4.3

func (t *CSVLogTranslator) TranslateRow(ctx context.Context, row *Row) error

TranslateRow handles CSV-specific field translation for logs

type CSVReader added in v1.4.3

type CSVReader struct {
	// contains filtered or unexported fields
}

CSVReader reads rows from a CSV stream using pipeline semantics.

func NewCSVReader added in v1.4.3

func NewCSVReader(reader io.ReadCloser, batchSize int) (*CSVReader, error)

NewCSVReader creates a new CSVReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called.

func (*CSVReader) Close added in v1.4.3

func (r *CSVReader) Close() error

Close closes the reader and the underlying io.ReadCloser.

func (*CSVReader) Next added in v1.4.3

func (r *CSVReader) Next(ctx context.Context) (*Batch, error)

func (*CSVReader) TotalRowsReturned added in v1.4.3

func (r *CSVReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type ChainTranslator

type ChainTranslator struct {
	// contains filtered or unexported fields
}

ChainTranslator applies multiple translators in sequence.

func NewChainTranslator

func NewChainTranslator(translators ...RowTranslator) *ChainTranslator

NewChainTranslator creates a translator that applies multiple translators in order.

func (*ChainTranslator) TranslateRow

func (ct *ChainTranslator) TranslateRow(ctx context.Context, row *Row) error

TranslateRow applies all translators in sequence to the row.

type CookedLogTranslatingReader added in v1.3.3

type CookedLogTranslatingReader struct {
	// contains filtered or unexported fields
}

CookedLogTranslatingReader wraps another reader and applies log-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.

func NewCookedLogTranslatingReader added in v1.3.3

func NewCookedLogTranslatingReader(wrapped Reader) *CookedLogTranslatingReader

NewCookedLogTranslatingReader creates a new reader that applies log-specific transformations.

func (*CookedLogTranslatingReader) Close added in v1.3.3

func (r *CookedLogTranslatingReader) Close() error

Close closes the reader and its wrapped reader.

func (*CookedLogTranslatingReader) Next added in v1.3.3

Next returns the next batch of transformed log data.

func (*CookedLogTranslatingReader) TotalRowsReturned added in v1.3.3

func (r *CookedLogTranslatingReader) TotalRowsReturned() int64

TotalRowsReturned delegates to the wrapped reader.

type CookedMetricTranslatingReader added in v1.3.3

type CookedMetricTranslatingReader struct {
	// contains filtered or unexported fields
}

CookedMetricTranslatingReader wraps another reader and applies metric-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.

func NewCookedMetricTranslatingReader added in v1.3.3

func NewCookedMetricTranslatingReader(wrapped Reader) *CookedMetricTranslatingReader

NewCookedMetricTranslatingReader creates a new reader that applies metric-specific transformations.

func (*CookedMetricTranslatingReader) Close added in v1.3.3

Close closes the reader and its wrapped reader.

func (*CookedMetricTranslatingReader) Next added in v1.3.3

Next returns the next batch of transformed metric data.

func (*CookedMetricTranslatingReader) TotalRowsReturned added in v1.3.3

func (r *CookedMetricTranslatingReader) TotalRowsReturned() int64

TotalRowsReturned delegates to the wrapped reader.

type CookedTraceTranslatingReader added in v1.3.5

type CookedTraceTranslatingReader struct {
	// contains filtered or unexported fields
}

CookedTraceTranslatingReader wraps another reader and applies trace-specific transformations to the data, such as type conversions, validation, and filtering. This reader takes ownership of rows it wants to keep using efficient buffer operations.

func NewCookedTraceTranslatingReader added in v1.3.5

func NewCookedTraceTranslatingReader(wrapped Reader) *CookedTraceTranslatingReader

NewCookedTraceTranslatingReader creates a new reader that applies trace-specific transformations.

func (*CookedTraceTranslatingReader) Close added in v1.3.5

Close closes the reader and its wrapped reader.

func (*CookedTraceTranslatingReader) Next added in v1.3.5

Next returns the next batch of transformed trace data.

func (*CookedTraceTranslatingReader) TotalRowsReturned added in v1.3.5

func (r *CookedTraceTranslatingReader) TotalRowsReturned() int64

TotalRowsReturned delegates to the wrapped reader.

type DiskSortingReader added in v1.3.3

type DiskSortingReader struct {
	// contains filtered or unexported fields
}

DiskSortingReader sorts rows using a temporary file on disk.

Memory Impact: LOW-MODERATE - only extracted sort keys and file offsets are held in memory, making it much more memory-efficient than MemorySortingReader for large datasets.

Disk I/O: 2x data size - each row is written once to a temp binary file, then read once during output.

Stability: records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.

func NewDiskSortingReader added in v1.3.3

func NewDiskSortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*DiskSortingReader, error)

NewDiskSortingReader creates a reader that uses disk-based sorting with custom binary encoding.

Use this for large datasets that may not fit in memory. The temp file is automatically cleaned up when the reader is closed. Custom binary encoding provides efficient storage and serialization for the temporary data with no reflection overhead.

The keyProvider creates sort keys from rows to minimize memory usage during sorting.

func (*DiskSortingReader) Close added in v1.3.3

func (r *DiskSortingReader) Close() error

Close closes the reader and cleans up temp file.

func (*DiskSortingReader) GetOTELMetrics added in v1.3.3

func (r *DiskSortingReader) GetOTELMetrics() (any, error)

GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.

func (*DiskSortingReader) Next added in v1.3.3

func (r *DiskSortingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of sorted rows by reading from the temp file in index order.

func (*DiskSortingReader) TotalRowsReturned added in v1.3.3

func (r *DiskSortingReader) TotalRowsReturned() int64

TotalRowsReturned returns the number of rows that have been returned via Next().

type HistogramNoCountsError added in v1.3.3

type HistogramNoCountsError struct{}

HistogramNoCountsError represents an error when a histogram datapoint has no bucket counts. This is a recoverable error that indicates the datapoint should be dropped.

func (HistogramNoCountsError) Error added in v1.3.3

func (e HistogramNoCountsError) Error() string

type IngestProtoLogsReader added in v1.3.3

type IngestProtoLogsReader struct {
	// contains filtered or unexported fields
}

IngestProtoLogsReader reads rows from OpenTelemetry protobuf logs format.

Implements OTELLogsProvider interface.

func NewIngestProtoLogsReader added in v1.3.3

func NewIngestProtoLogsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoLogsReader, error)

NewIngestProtoLogsReader creates a new IngestProtoLogsReader for the given io.Reader.

func (*IngestProtoLogsReader) Close added in v1.3.3

func (r *IngestProtoLogsReader) Close() error

Close closes the reader and releases resources.

func (*IngestProtoLogsReader) GetOTELLogs added in v1.3.3

func (r *IngestProtoLogsReader) GetOTELLogs() (*plog.Logs, error)

GetOTELLogs returns the underlying parsed OTEL logs structure. This allows access to the original log body and metadata not available in the row format.

func (*IngestProtoLogsReader) Next added in v1.3.3

func (r *IngestProtoLogsReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the OTEL logs.

func (*IngestProtoLogsReader) TotalRowsReturned added in v1.3.3

func (r *IngestProtoLogsReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type IngestProtoMetricsReader added in v1.3.3

type IngestProtoMetricsReader struct {
	// contains filtered or unexported fields
}

IngestProtoMetricsReader reads rows from OpenTelemetry protobuf metrics format for ingestion. This reader is specifically designed for metric ingestion and should not be used for compaction or rollup operations.

Implements OTELMetricsProvider interface.

func NewIngestProtoMetricsReader added in v1.3.3

func NewIngestProtoMetricsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoMetricsReader, error)

NewIngestProtoMetricsReader creates a new IngestProtoMetricsReader for the given io.Reader.

func NewIngestProtoMetricsReaderFromMetrics added in v1.3.3

func NewIngestProtoMetricsReaderFromMetrics(metrics *pmetric.Metrics, opts ReaderOptions) (*IngestProtoMetricsReader, error)

NewIngestProtoMetricsReaderFromMetrics creates a new IngestProtoMetricsReader from pre-parsed OTEL metrics. This is useful when you need to access the raw OTEL structure for processing (e.g., exemplars) while also reading rows from the same data.

func (*IngestProtoMetricsReader) Close added in v1.3.3

func (r *IngestProtoMetricsReader) Close() error

Close closes the reader and releases resources.

func (*IngestProtoMetricsReader) GetOTELMetrics added in v1.3.3

func (r *IngestProtoMetricsReader) GetOTELMetrics() (*pmetric.Metrics, error)

GetOTELMetrics implements the OTELMetricsProvider interface. Returns the underlying pmetric.Metrics structure for exemplar processing.

func (*IngestProtoMetricsReader) Next added in v1.3.3

Next returns the next batch of rows from the OTEL metrics.

func (*IngestProtoMetricsReader) TotalRowsReturned added in v1.3.3

func (r *IngestProtoMetricsReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type JSONLinesReader

type JSONLinesReader struct {
	// contains filtered or unexported fields
}

JSONLinesReader reads rows from a JSON lines stream using pipeline semantics.

func NewJSONLinesReader

func NewJSONLinesReader(reader io.ReadCloser, batchSize int) (*JSONLinesReader, error)

NewJSONLinesReader creates a new JSONLinesReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called.

func (*JSONLinesReader) Close

func (r *JSONLinesReader) Close() error

Close closes the reader and the underlying io.ReadCloser.

func (*JSONLinesReader) Next added in v1.3.3

func (r *JSONLinesReader) Next(ctx context.Context) (*Batch, error)

func (*JSONLinesReader) TotalRowsReturned added in v1.3.3

func (r *JSONLinesReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type MemorySortingReader added in v1.3.3

type MemorySortingReader struct {
	// contains filtered or unexported fields
}

MemorySortingReader reads all rows from an underlying reader, then sorts them using a custom sort function and returns them in order. This is useful when you need sorted output with flexible sorting criteria.

Memory Impact: HIGH - all rows are loaded into memory at once.

Disk I/O: none (pure in-memory operation).

Stability: records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.

func NewMemorySortingReader added in v1.3.3

func NewMemorySortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*MemorySortingReader, error)

NewMemorySortingReader creates a reader that buffers all rows, sorts them using the provided key provider, then returns them in order.

Use this for smaller datasets that fit comfortably in memory. For large datasets, consider DiskSortingReader to avoid OOM issues.

func (*MemorySortingReader) Close added in v1.3.3

func (r *MemorySortingReader) Close() error

Close closes the reader and underlying reader.

func (*MemorySortingReader) GetOTELMetrics added in v1.3.3

func (r *MemorySortingReader) GetOTELMetrics() (any, error)

GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.

func (*MemorySortingReader) Next added in v1.3.3

func (r *MemorySortingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of sorted rows from the buffer.

func (*MemorySortingReader) TotalRowsReturned added in v1.3.3

func (r *MemorySortingReader) TotalRowsReturned() int64

TotalRowsReturned returns the number of rows that have been returned via Next().

type MergesortReader added in v1.3.3

type MergesortReader struct {
	// contains filtered or unexported fields
}

MergesortReader implements merge-sort style reading across multiple pre-sorted readers. It assumes each individual reader returns rows in sorted order according to the provided SortKeyProvider.

func NewMergesortReader added in v1.3.3

func NewMergesortReader(ctx context.Context, readers []Reader, keyProvider SortKeyProvider, batchSize int) (*MergesortReader, error)

NewMergesortReader creates a new MergesortReader that merges rows from multiple readers in sorted order using the new algorithm with active reader management.

func (*MergesortReader) ActiveReaderCount added in v1.3.3

func (or *MergesortReader) ActiveReaderCount() int

ActiveReaderCount returns the number of readers that still have data available.

func (*MergesortReader) Close added in v1.3.3

func (or *MergesortReader) Close() error

Close closes all underlying readers and releases resources.

func (*MergesortReader) Next added in v1.3.3

func (or *MergesortReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows in sorted order across all readers.

func (*MergesortReader) TotalRowsReturned added in v1.3.3

func (or *MergesortReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows returned by this reader.

type MetricFilteringReader added in v1.3.6

type MetricFilteringReader struct {
	// contains filtered or unexported fields
}

MetricFilteringReader wraps another reader and filters rows by metric name. It assumes the underlying reader provides data sorted by metric name. Once it sees the target metric, it returns those rows until the metric changes.

func NewMetricFilteringReader added in v1.3.6

func NewMetricFilteringReader(source Reader, metricName string) *MetricFilteringReader

NewMetricFilteringReader creates a reader that filters for a specific metric name. It starts a goroutine to read ahead from the source.

func (*MetricFilteringReader) Close added in v1.3.6

func (r *MetricFilteringReader) Close() error

Close releases resources

func (*MetricFilteringReader) Next added in v1.3.6

func (r *MetricFilteringReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of filtered rows

func (*MetricFilteringReader) TotalRowsReturned added in v1.3.6

func (r *MetricFilteringReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows returned

type MetricSortKey added in v1.3.3

type MetricSortKey struct {
	Name      string
	Tid       int64
	Timestamp int64
	NameOk    bool
	TidOk     bool
	TsOk      bool
}

MetricSortKey represents the sort key for metrics: [name, tid, timestamp]

func MakeMetricSortKey added in v1.3.3

func MakeMetricSortKey(row Row) *MetricSortKey

MakeMetricSortKey creates a pooled MetricSortKey from a row

func (*MetricSortKey) Compare added in v1.3.3

func (k *MetricSortKey) Compare(other SortKey) int

Compare implements SortKey interface for MetricSortKey

func (*MetricSortKey) Release added in v1.3.3

func (k *MetricSortKey) Release()

Release returns the MetricSortKey to the pool for reuse

type MetricSortKeyProvider added in v1.3.3

type MetricSortKeyProvider struct{}

MetricSortKeyProvider creates MetricSortKey instances from rows

func (*MetricSortKeyProvider) MakeKey added in v1.3.3

func (p *MetricSortKeyProvider) MakeKey(row Row) SortKey

MakeKey implements SortKeyProvider interface for metrics

type NonPooledMetricSortKey added in v1.3.5

type NonPooledMetricSortKey struct {
	Name      string
	Tid       int64
	Timestamp int64
	NameOk    bool
	TidOk     bool
	TsOk      bool
}

NonPooledMetricSortKey is like MetricSortKey but doesn't use object pools. This is safer for long-lived keys that are retained during sorting operations.

func (*NonPooledMetricSortKey) Compare added in v1.3.5

func (k *NonPooledMetricSortKey) Compare(other SortKey) int

Compare implements SortKey interface for NonPooledMetricSortKey

func (*NonPooledMetricSortKey) Release added in v1.3.5

func (k *NonPooledMetricSortKey) Release()

Release is a no-op for non-pooled keys

type NonPooledMetricSortKeyProvider added in v1.3.5

type NonPooledMetricSortKeyProvider struct{}

NonPooledMetricSortKeyProvider creates NonPooledMetricSortKey instances. This is safer for long-lived keys that are retained during sorting operations.

func (*NonPooledMetricSortKeyProvider) MakeKey added in v1.3.5

func (p *NonPooledMetricSortKeyProvider) MakeKey(row Row) SortKey

MakeKey implements SortKeyProvider interface for metrics without using pools

type NoopTranslator

type NoopTranslator struct{}

NoopTranslator returns rows unchanged for high performance. It is a reference example of the most efficient possible translator implementation.

func NewNoopTranslator

func NewNoopTranslator() *NoopTranslator

NewNoopTranslator creates a translator that passes rows through unchanged.

func (*NoopTranslator) TranslateRow

func (nt *NoopTranslator) TranslateRow(ctx context.Context, row *Row) error

TranslateRow does nothing for maximum performance.

type OTELLogsProvider added in v1.3.3

type OTELLogsProvider interface {
	// GetOTELLogs returns the underlying parsed OTEL logs structure.
	// This allows access to the original log body and metadata not available in the row format.
	GetOTELLogs() (*plog.Logs, error)
}

OTELLogsProvider provides access to the underlying OpenTelemetry logs structure. This is used when the original OTEL structure is needed for processing (e.g., exemplars).

type OTELMetricsProvider added in v1.3.3

type OTELMetricsProvider interface {
	// GetOTELMetrics returns the underlying parsed OTEL metrics structure.
	// This allows access to exemplars and other metadata not available in the row format.
	GetOTELMetrics() (*pmetric.Metrics, error)
}

OTELMetricsProvider provides access to the underlying OpenTelemetry metrics structure. This is used when the original OTEL structure is needed for processing (e.g., exemplars).

type ParquetRawReader added in v1.3.3

type ParquetRawReader struct {
	// contains filtered or unexported fields
}

ParquetRawReader reads rows from a generic Parquet stream. This reader provides raw parquet data without any opinionated transformations. Use wrapper readers like CookedMetricTranslatingReader for domain-specific logic.

func NewParquetRawReader added in v1.3.3

func NewParquetRawReader(reader io.ReaderAt, size int64, batchSize int) (*ParquetRawReader, error)

NewParquetRawReader creates a new ParquetRawReader for the given io.ReaderAt.

func (*ParquetRawReader) Close added in v1.3.3

func (r *ParquetRawReader) Close() error

Close closes the reader and releases resources.

func (*ParquetRawReader) Next added in v1.3.3

func (r *ParquetRawReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the parquet file.

func (*ParquetRawReader) TotalRowsReturned added in v1.3.3

func (r *ParquetRawReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type ProtoBinLogTranslator added in v1.3.3

type ProtoBinLogTranslator struct {
	// contains filtered or unexported fields
}

ProtoBinLogTranslator handles translation for protobuf binary log files

func NewProtoBinLogTranslator added in v1.3.3

func NewProtoBinLogTranslator(opts ReaderOptions) *ProtoBinLogTranslator

NewProtoBinLogTranslator creates a new protobuf log translator

func (*ProtoBinLogTranslator) TranslateRow added in v1.3.3

func (t *ProtoBinLogTranslator) TranslateRow(ctx context.Context, row *Row) error

TranslateRow handles protobuf-specific field translation

type ProtoTracesReader

type ProtoTracesReader struct {
	// contains filtered or unexported fields
}

ProtoTracesReader reads rows from OpenTelemetry protobuf traces format. Returns raw OTEL trace data without signal-specific transformations.

func NewIngestProtoTracesReader added in v1.4.1

func NewIngestProtoTracesReader(reader io.Reader, opts ReaderOptions) (*ProtoTracesReader, error)

NewIngestProtoTracesReader creates a new ProtoTracesReader for ingestion with exemplar processing.

func NewProtoTracesReader

func NewProtoTracesReader(reader io.Reader, batchSize int) (*ProtoTracesReader, error)

NewProtoTracesReader creates a new ProtoTracesReader for the given io.Reader. The caller is responsible for closing the underlying reader.

func (*ProtoTracesReader) Close

func (r *ProtoTracesReader) Close() error

Close closes the reader and releases resources.

func (*ProtoTracesReader) Next added in v1.3.3

func (r *ProtoTracesReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of rows from the OTEL traces.

func (*ProtoTracesReader) TotalRowsReturned added in v1.3.3

func (r *ProtoTracesReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next().

type Reader

type Reader interface {
	// Next returns the next batch of rows, or io.EOF when exhausted.
	// The returned batch is owned by the reader and must not be retained
	// beyond the next Next() call. Use pipeline.CopyBatch() if you need to retain.
	// The context can be used for cancellation, deadlines, and updating metrics.
	Next(ctx context.Context) (*Batch, error)

	// Close releases any resources held by the reader.
	Close() error

	// TotalRowsReturned returns the total number of rows that have been successfully
	// returned via Next() calls from this reader so far.
	TotalRowsReturned() int64
}

Reader is the core interface for reading rows from any file format using pipeline semantics. This eliminates memory ownership issues by establishing clear ownership: batches are owned by the reader and must not be retained beyond the next Next() call.

func NewCookedLogParquetReader added in v1.3.3

func NewCookedLogParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)

NewCookedLogParquetReader creates a reader for cooked log parquet files. It wraps a ParquetRawReader with CookedLogTranslatingReader to apply log-specific filtering and transformations.

func NewCookedMetricParquetReader added in v1.3.3

func NewCookedMetricParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)

NewCookedMetricParquetReader creates a reader for cooked metric parquet files. It wraps a ParquetRawReader with CookedMetricTranslatingReader to apply metric-specific filtering and transformations.

func NewCookedTraceParquetReader added in v1.3.5

func NewCookedTraceParquetReader(r io.ReaderAt, size int64, batchSize int) (Reader, error)

NewCookedTraceParquetReader creates a reader for cooked trace parquet files. It wraps a ParquetRawReader with CookedTraceTranslatingReader to apply trace-specific filtering and transformations.

func ReaderForFile added in v1.3.3

func ReaderForFile(filename string, signalType SignalType, orgId string, exemplarProcessor *exemplars.Processor) (Reader, error)

ReaderForFile creates a Reader for the given file based on its extension and signal type. This is a convenience function that uses default options.

func ReaderForFileWithOptions added in v1.3.3

func ReaderForFileWithOptions(filename string, opts ReaderOptions) (Reader, error)

ReaderForFileWithOptions creates a Reader for the given file with the provided options. Supported file formats:

  • .parquet: Creates a ParquetRawReader (works for all signal types)
  • .json.gz: Creates a JSONLinesReader with gzip decompression (works for all signal types)
  • .json: Creates a JSONLinesReader (works for all signal types)
  • .csv: Creates a CSVReader (works for all signal types)
  • .csv.gz: Creates a CSVReader with gzip decompression (works for all signal types)
  • .binpb: Creates a signal-specific proto reader (NewIngestProtoLogsReader, NewIngestProtoMetricsReader, or NewProtoTracesReader)
  • .binpb.gz: Creates a signal-specific proto reader with gzip decompression
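The extension dispatch above can be sketched as a suffix switch. This is an illustration of the mapping, not the package's implementation; pickReader is a hypothetical helper that returns reader names rather than constructing readers:

```go
package main

import (
	"fmt"
	"strings"
)

// pickReader mirrors the extension-to-reader mapping documented for
// ReaderForFileWithOptions. The returned strings are illustrative.
func pickReader(filename string) string {
	switch {
	case strings.HasSuffix(filename, ".parquet"):
		return "ParquetRawReader"
	case strings.HasSuffix(filename, ".json.gz"), strings.HasSuffix(filename, ".json"):
		return "JSONLinesReader"
	case strings.HasSuffix(filename, ".csv.gz"), strings.HasSuffix(filename, ".csv"):
		return "CSVReader"
	case strings.HasSuffix(filename, ".binpb.gz"), strings.HasSuffix(filename, ".binpb"):
		return "signal-specific proto reader"
	}
	return "unsupported"
}

func main() {
	fmt.Println(pickReader("logs.json.gz"))
	fmt.Println(pickReader("metrics.parquet"))
}
```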

func ReaderForMetricAggregation added in v1.3.3

func ReaderForMetricAggregation(filename, orgId string, aggregationPeriodMs int64) (Reader, error)

ReaderForMetricAggregation creates a Reader for metrics with aggregation enabled.

func WrapReaderForAggregation added in v1.3.3

func WrapReaderForAggregation(reader Reader, opts ReaderOptions) (Reader, error)

WrapReaderForAggregation wraps a reader with streaming aggregation if aggregation is enabled in opts.

type ReaderOptions added in v1.3.3

type ReaderOptions struct {
	SignalType SignalType
	BatchSize  int // Batch size for readers (default: 1000)
	// Translation options for protobuf logs and metrics
	OrgID             string
	Bucket            string
	ObjectID          string
	ExemplarProcessor *exemplars.Processor
	// Aggregation options for metrics
	EnableAggregation   bool  // Enable streaming aggregation
	AggregationPeriodMs int64 // Aggregation period in milliseconds (e.g., 10000 for 10s)
}

ReaderOptions provides options for creating readers.

type Row

type Row = pipeline.Row

Row represents a single row of data as a map of column names to values.

type RowIndex added in v1.3.3

type RowIndex struct {
	SortKey    SortKey // Extracted sort key for sorting
	FileOffset int64
	ByteLength int32
}

RowIndex represents a lightweight pointer to a binary-encoded row in the temp file. It stores only the extracted sort key plus file location info.

type RowTranslator

type RowTranslator interface {
	// TranslateRow transforms a row in-place by modifying the provided row pointer.
	TranslateRow(ctx context.Context, row *Row) error
}

RowTranslator transforms rows from one format to another.

type SequentialReader added in v1.3.3

type SequentialReader struct {
	// contains filtered or unexported fields
}

SequentialReader reads from multiple readers sequentially in the order provided. It reads all rows from the first reader, then all rows from the second reader, etc. This is useful when you want to concatenate multiple files without any ordering requirements.

func NewSequentialReader added in v1.3.3

func NewSequentialReader(readers []Reader, batchSize int) (*SequentialReader, error)

NewSequentialReader creates a new SequentialReader that reads from the provided readers sequentially. Readers will be closed when the SequentialReader is closed.

func (*SequentialReader) Close added in v1.3.3

func (sr *SequentialReader) Close() error

Close closes all underlying readers and releases resources.

func (*SequentialReader) CurrentReaderIndex added in v1.3.3

func (sr *SequentialReader) CurrentReaderIndex() int

CurrentReaderIndex returns the index of the reader currently being read from. Returns -1 if all readers are exhausted or the reader is closed.

func (*SequentialReader) Next added in v1.3.3

func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)

func (*SequentialReader) RemainingReaderCount added in v1.3.3

func (sr *SequentialReader) RemainingReaderCount() int

RemainingReaderCount returns the number of readers that haven't been fully processed yet.

func (*SequentialReader) TotalReaderCount added in v1.3.3

func (sr *SequentialReader) TotalReaderCount() int

TotalReaderCount returns the total number of readers in this SequentialReader.

func (*SequentialReader) TotalRowsReturned added in v1.3.3

func (sr *SequentialReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next() from all readers.

type SignalType added in v1.3.3

type SignalType int

SignalType represents the type of telemetry signal being processed.

const (
	// SignalTypeLogs represents log data
	SignalTypeLogs SignalType = iota
	// SignalTypeMetrics represents metric data
	SignalTypeMetrics
	// SignalTypeTraces represents trace data
	SignalTypeTraces
)

func (SignalType) String added in v1.3.3

func (s SignalType) String() string

String returns the string representation of the signal type.

type SortKey added in v1.3.3

type SortKey interface {
	// Compare returns:
	// -1 if this key should come before other
	//  0 if this key equals other
	//  1 if this key should come after other
	Compare(other SortKey) int

	// Release returns the key to its pool for reuse
	Release()
}

SortKey represents a key that can be compared for sorting.

type SortKeyProvider added in v1.3.3

type SortKeyProvider interface {
	// MakeKey creates a sort key from a row
	MakeKey(row Row) SortKey
}

SortKeyProvider creates sort keys from rows.
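A custom SortKey/SortKeyProvider pair can be sketched with local copies of the two interfaces above. The int64Key and fieldProvider types are hypothetical (and skip pooling, so Release is a no-op):

```go
package main

import (
	"fmt"
	"sort"
)

type Row map[string]any

type SortKey interface {
	Compare(other SortKey) int
	Release()
}

type SortKeyProvider interface {
	MakeKey(row Row) SortKey
}

// int64Key is a hypothetical SortKey comparing a single int64 value.
type int64Key struct{ v int64 }

func (k *int64Key) Compare(other SortKey) int {
	o := other.(*int64Key)
	switch {
	case k.v < o.v:
		return -1
	case k.v > o.v:
		return 1
	}
	return 0
}

// Release is a no-op here; the package's keys return to a pool.
func (k *int64Key) Release() {}

// fieldProvider extracts the named int64 field as the sort key.
type fieldProvider struct{ field string }

func (p *fieldProvider) MakeKey(row Row) SortKey {
	v, _ := row[p.field].(int64)
	return &int64Key{v: v}
}

func main() {
	rows := []Row{{"ts": int64(30)}, {"ts": int64(10)}, {"ts": int64(20)}}
	var p SortKeyProvider = &fieldProvider{field: "ts"}
	sort.Slice(rows, func(i, j int) bool {
		ki, kj := p.MakeKey(rows[i]), p.MakeKey(rows[j])
		defer ki.Release()
		defer kj.Release()
		return ki.Compare(kj) < 0
	})
	fmt.Println(rows[0]["ts"], rows[1]["ts"], rows[2]["ts"])
}
```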

func GetCurrentMetricSortKeyProvider added in v1.3.5

func GetCurrentMetricSortKeyProvider() SortKeyProvider

GetCurrentMetricSortKeyProvider returns the provider for creating sort keys for the current metric sort version. This is the single source of truth for metric sorting.

type TagsTranslator

type TagsTranslator struct {
	// contains filtered or unexported fields
}

TagsTranslator adds static tags to every row.

func NewTagsTranslator

func NewTagsTranslator(tags map[string]string) *TagsTranslator

NewTagsTranslator creates a translator that adds the given tags to each row.

func (*TagsTranslator) TranslateRow

func (tt *TagsTranslator) TranslateRow(ctx context.Context, row *Row) error

TranslateRow adds tags to the row in-place.
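The effect is simple map insertion. A sketch of the behavior, not the package's implementation; whether an existing key is overwritten is an assumption here:

```go
package main

import "fmt"

type Row map[string]any

// addTags mimics what a tags translator does: copy each static tag
// into the row, overwriting any existing value for the same key
// (an assumption; the package may resolve conflicts differently).
func addTags(row Row, tags map[string]string) {
	for k, v := range tags {
		row[k] = v
	}
}

func main() {
	row := Row{"msg": "hello"}
	addTags(row, map[string]string{"env": "prod", "region": "us-east-1"})
	fmt.Println(row["env"], row["region"], row["msg"])
}
```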

type TimeOrderedSortKey added in v1.3.3

type TimeOrderedSortKey struct {
	// contains filtered or unexported fields
}

TimeOrderedSortKey represents a sort key for timestamp-based ordering.

func (*TimeOrderedSortKey) Compare added in v1.3.3

func (k *TimeOrderedSortKey) Compare(other SortKey) int

func (*TimeOrderedSortKey) Release added in v1.3.3

func (k *TimeOrderedSortKey) Release()

type TimeOrderedSortKeyProvider added in v1.3.3

type TimeOrderedSortKeyProvider struct {
	// contains filtered or unexported fields
}

TimeOrderedSortKeyProvider creates sort keys based on a single timestamp field.

func NewTimeOrderedSortKeyProvider added in v1.3.3

func NewTimeOrderedSortKeyProvider(fieldName string) *TimeOrderedSortKeyProvider

NewTimeOrderedSortKeyProvider creates a provider that makes sort keys from the given timestamp field.

func (*TimeOrderedSortKeyProvider) MakeKey added in v1.3.3

func (p *TimeOrderedSortKeyProvider) MakeKey(row Row) SortKey

type TimestampSortKey added in v1.3.3

type TimestampSortKey struct {
	Timestamp int64
	TsOk      bool
}

TimestampSortKey represents a timestamp-only sort key.

func MakeTimestampSortKey added in v1.3.3

func MakeTimestampSortKey(row Row) *TimestampSortKey

MakeTimestampSortKey creates a pooled TimestampSortKey from a row.

func (*TimestampSortKey) Compare added in v1.3.3

func (k *TimestampSortKey) Compare(other SortKey) int

Compare implements the SortKey interface for TimestampSortKey.

func (*TimestampSortKey) Release added in v1.3.3

func (k *TimestampSortKey) Release()

Release returns the TimestampSortKey to the pool for reuse.

type TimestampSortKeyProvider added in v1.3.3

type TimestampSortKeyProvider struct{}

TimestampSortKeyProvider creates TimestampSortKey instances from rows.

func (*TimestampSortKeyProvider) MakeKey added in v1.3.3

func (p *TimestampSortKeyProvider) MakeKey(row Row) SortKey

MakeKey implements the SortKeyProvider interface for timestamps.

type TranslatingReader

type TranslatingReader struct {
	// contains filtered or unexported fields
}

TranslatingReader wraps another Reader and applies row transformations. This enables composition where any Reader can be enhanced with signal-specific translation logic without coupling file parsing to data transformation.

func NewTranslatingReader

func NewTranslatingReader(reader Reader, translator RowTranslator, batchSize int) (*TranslatingReader, error)

NewTranslatingReader creates a new TranslatingReader that applies the given translator to each row returned by the underlying reader.

The TranslatingReader takes ownership of the underlying reader and will close it when Close() is called.

func (*TranslatingReader) Close

func (tr *TranslatingReader) Close() error

Close closes the underlying reader and releases resources.

func (*TranslatingReader) Next added in v1.3.3

func (tr *TranslatingReader) Next(ctx context.Context) (*Batch, error)

Next returns the next batch of translated rows from the underlying reader.

func (*TranslatingReader) TotalRowsReturned added in v1.3.3

func (tr *TranslatingReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows that have been successfully returned via Next() after translation by this reader.
