metricsprocessing

package
v1.11.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: AGPL-3.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CleanupCommittedOffsets added in v1.3.6

func CleanupCommittedOffsets(ctx context.Context, store CleanupStore, topic, consumerGroup string, messages []fly.ConsumedMessage)

CleanupCommittedOffsets removes old offset tracking records after successful Kafka commit. This is a helper function for consumers that don't use CommonConsumer.

Types

type BoxerConsumer added in v1.4.1

type BoxerConsumer interface {
	Run(ctx context.Context) error
	Close() error
}

BoxerConsumer represents a generic boxer consumer interface

type BoxerManager added in v1.4.1

type BoxerManager struct {
	// contains filtered or unexported fields
}

BoxerManager coordinates multiple boxer consumers running concurrently

func NewBoxerManager added in v1.4.1

func NewBoxerManager(ctx context.Context, cfg *config.Config, store BoxerStore, tasks []string) (*BoxerManager, error)

NewBoxerManager creates a new BoxerManager with the specified tasks

func NewBoxerManagerWithFactory added in v1.4.1

func NewBoxerManagerWithFactory(ctx context.Context, factory ConsumerFactory, tasks []string) (*BoxerManager, error)

NewBoxerManagerWithFactory creates a new BoxerManager with a custom factory (for testing)

func (*BoxerManager) Close added in v1.4.1

func (m *BoxerManager) Close() error

Close stops all consumers

func (*BoxerManager) Run added in v1.4.1

func (m *BoxerManager) Run(ctx context.Context) error

Run starts all consumers concurrently and waits for them to complete

type BoxerStore added in v1.3.5

type BoxerStore interface {
	OffsetTrackerStore
	GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
	GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
	GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
	WorkQueueAdd(ctx context.Context, arg lrdb.WorkQueueAddParams) (lrdb.WorkQueue, error)
	WorkQueueDepth(ctx context.Context, taskName string) (int64, error)
	WorkQueueCleanup(ctx context.Context, heartbeatTimeout time.Duration) error
}

BoxerStore defines the interface required by all boxer processors. Boxers only bundle messages and need estimation methods for optimal batching. They do not perform actual processing operations (compaction, etc.).

type BundleProcessor added in v1.6.0

type BundleProcessor interface {
	ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
}

BundleProcessor defines the interface for processing a bundle from the work queue

type CleanupStore added in v1.3.6

type CleanupStore interface {
	CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
}

CleanupStore defines the interface for stores that support offset cleanup

type CommonConsumer added in v1.3.5

type CommonConsumer[M messages.CompactionMessage, K messages.CompactionKeyInterface] struct {
	// contains filtered or unexported fields
}

CommonConsumer is a generic consumer for any message processing operations

func NewCommonConsumer added in v1.3.5

func NewCommonConsumer[M messages.CompactionMessage, K messages.CompactionKeyInterface](
	ctx context.Context,
	factory FlyConsumerFactory,
	cfg *config.Config,
	consumerConfig CommonConsumerConfig,
	store offsetStore,
	processor processor[M, K],
) (*CommonConsumer[M, K], error)

NewCommonConsumer creates a new generic common consumer

func NewCommonConsumerWithComponents added in v1.3.5

func NewCommonConsumerWithComponents[M messages.CompactionMessage, K messages.CompactionKeyInterface](
	ctx context.Context,
	consumer fly.Consumer,
	consumerConfig CommonConsumerConfig,
	store offsetStore,
	processor processor[M, K],
) *CommonConsumer[M, K]

NewCommonConsumerWithComponents creates a consumer with provided components (for testability)

func (*CommonConsumer[M, K]) Close added in v1.3.5

func (c *CommonConsumer[M, K]) Close() error

Close stops the consumer and cleans up resources

func (*CommonConsumer[M, K]) Run added in v1.3.5

func (c *CommonConsumer[M, K]) Run(ctx context.Context) error

Run starts the Kafka consumer and periodic flushing

type CommonConsumerConfig added in v1.3.5

type CommonConsumerConfig struct {
	ConsumerName  string
	Topic         string
	ConsumerGroup string
	FlushInterval time.Duration
	StaleAge      time.Duration
	MaxAge        time.Duration
}

CommonConsumerConfig holds configuration for a common consumer

type ConfigMismatchError added in v1.3.4

type ConfigMismatchError struct {
	Field    string // "topic" or "consumer_group"
	Expected string
	Got      string
}

ConfigMismatchError represents an error when message metadata doesn't match gatherer configuration

func (*ConfigMismatchError) Error added in v1.3.4

func (e *ConfigMismatchError) Error() string

type ConsumerFactory added in v1.4.1

type ConsumerFactory interface {
	CreateLogIngestConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateMetricIngestConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateTraceIngestConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateLogCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateMetricCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateTraceCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
	CreateMetricRollupConsumer(ctx context.Context) (BoxerConsumer, error)
}

ConsumerFactory creates boxer consumers for different task types

type DateintBin added in v1.3.4

type DateintBin struct {
	Dateint int32 // The dateint for this bin
	Writer  parquetwriter.ParquetWriter
	Results []parquetwriter.Result // Results after writer is closed (can be multiple files)
}

DateintBin represents a file group containing logs for a specific dateint

type DateintBinManager added in v1.3.4

type DateintBinManager struct {
	// contains filtered or unexported fields
}

DateintBinManager manages multiple file groups, one per dateint

type DateintBinResult added in v1.9.0

type DateintBinResult struct {
	Dateint     int32
	OutputFile  string // Local parquet file path
	RecordCount int64
	FileSize    int64
	Metadata    *MetricFileMetadata
}

DateintBinResult contains the result for a single dateint partition

type DefaultConsumerFactory added in v1.4.1

type DefaultConsumerFactory struct {
	// contains filtered or unexported fields
}

DefaultConsumerFactory implements ConsumerFactory using existing consumer constructors

func NewDefaultConsumerFactory added in v1.4.1

func NewDefaultConsumerFactory(cfg *config.Config, boxerStore BoxerStore) (*DefaultConsumerFactory, error)

NewDefaultConsumerFactory creates a new DefaultConsumerFactory

func (*DefaultConsumerFactory) CreateLogCompactionConsumer added in v1.4.1

func (f *DefaultConsumerFactory) CreateLogCompactionConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateLogIngestConsumer added in v1.4.5

func (f *DefaultConsumerFactory) CreateLogIngestConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateMetricCompactionConsumer added in v1.4.1

func (f *DefaultConsumerFactory) CreateMetricCompactionConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateMetricIngestConsumer added in v1.4.5

func (f *DefaultConsumerFactory) CreateMetricIngestConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateMetricRollupConsumer added in v1.4.1

func (f *DefaultConsumerFactory) CreateMetricRollupConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateTraceCompactionConsumer added in v1.4.1

func (f *DefaultConsumerFactory) CreateTraceCompactionConsumer(ctx context.Context) (BoxerConsumer, error)

func (*DefaultConsumerFactory) CreateTraceIngestConsumer added in v1.4.5

func (f *DefaultConsumerFactory) CreateTraceIngestConsumer(ctx context.Context) (BoxerConsumer, error)

type FlyConsumerFactory added in v1.3.5

type FlyConsumerFactory interface {
	CreateConsumer(topic, consumerGroup string) (fly.Consumer, error)
}

FlyConsumerFactory defines the interface for creating Kafka consumers (for testability)

type GroupValidationError added in v1.3.4

type GroupValidationError struct {
	Field    string      // field that failed validation
	Expected interface{} // expected value
	Got      interface{} // actual value
	Message  string      // description of the validation failure
}

GroupValidationError represents an error when message group validation fails

func (*GroupValidationError) Error added in v1.3.4

func (e *GroupValidationError) Error() string

type KafkaCommitData added in v1.3.4

type KafkaCommitData struct {
	Topic         string
	ConsumerGroup string
	Offsets       map[int32]int64 // partition -> highest offset
}

KafkaCommitData represents Kafka offset data to be committed atomically with SQL operations This is now only used internally by metadataTracker for safe commit offset calculations

type LogCompactionBoxerConsumer added in v1.3.5

type LogCompactionBoxerConsumer struct {
	*CommonConsumer[*messages.LogCompactionMessage, messages.LogCompactionKey]
}

LogCompactionBoxerConsumer handles log compaction using the generic framework

func NewLogCompactionBoxerConsumer added in v1.3.5

func NewLogCompactionBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*LogCompactionBoxerConsumer, error)

NewLogCompactionBoxerConsumer creates a new log compaction boxer consumer using the generic framework

type LogCompactionBoxerProcessor added in v1.3.5

type LogCompactionBoxerProcessor struct {
	// contains filtered or unexported fields
}

LogCompactionBoxerProcessor implements boxing for log compaction messages. Note: This processor only bundles messages for compaction and only requires GetLogEstimate() from the store. The actual compaction operations (GetLogSeg, CompactLogSegments, etc.) are performed by the compaction processor itself.

func (*LogCompactionBoxerProcessor) GetTargetRecordCount added in v1.3.5

func (p *LogCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64

GetTargetRecordCount returns the target record count for a grouping key

func (*LogCompactionBoxerProcessor) Process added in v1.3.5

func (p *LogCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.LogCompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the processor interface for log compaction boxing

func (*LogCompactionBoxerProcessor) ShouldEmitImmediately added in v1.7.0

func (p *LogCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool

ShouldEmitImmediately returns false - log compaction always uses normal grouping.

type LogCompactionConsumer added in v1.3.4

type LogCompactionConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

LogCompactionConsumer handles log compaction bundles from the work queue

func NewLogCompactionConsumer added in v1.3.4

func NewLogCompactionConsumer(
	ctx context.Context,
	cfg *config.Config,
	store LogCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*LogCompactionConsumer, error)

NewLogCompactionConsumer creates a new log compaction consumer that processes bundles from the work queue

type LogCompactionProcessor added in v1.3.4

type LogCompactionProcessor struct {
	// contains filtered or unexported fields
}

LogCompactionProcessor implements compaction processing for logs

func NewLogCompactionProcessor added in v1.3.5

func NewLogCompactionProcessor(
	store LogCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
	cfg *config.Config,
) *LogCompactionProcessor

NewLogCompactionProcessor creates a new log compaction processor

func (*LogCompactionProcessor) GetTargetRecordCount added in v1.3.4

func (p *LogCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64

GetTargetRecordCount returns the target record count for a grouping key

func (*LogCompactionProcessor) ProcessBundle added in v1.3.5

func (p *LogCompactionProcessor) ProcessBundle(ctx context.Context, key messages.LogCompactionKey, msgs []*messages.LogCompactionMessage, partition int32, offset int64) error

ProcessBundle processes a compaction bundle directly (simplified interface)

func (*LogCompactionProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *LogCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue processes a log compaction bundle from the work queue

func (*LogCompactionProcessor) ShouldEmitImmediately added in v1.7.0

func (p *LogCompactionProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool

ShouldEmitImmediately returns false - log compaction always uses normal grouping.

type LogCompactionStore added in v1.3.4

type LogCompactionStore interface {
	workqueue.DB
	GetLogSeg(ctx context.Context, params lrdb.GetLogSegParams) (lrdb.LogSeg, error)
	CompactLogSegments(ctx context.Context, params lrdb.CompactLogSegsParams) error
	MarkLogSegsCompactedByKeys(ctx context.Context, params lrdb.MarkLogSegsCompactedByKeysParams) error
	GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
}

LogCompactionStore defines database operations needed for log compaction

type LogDateintBinResult added in v1.9.1

type LogDateintBinResult struct {
	Dateint     int32
	OutputFile  string // Local parquet file path
	RecordCount int64
	FileSize    int64
	Metadata    *LogFileMetadata
}

LogDateintBinResult contains the result for a single dateint partition

type LogFileMetadata added in v1.9.1

type LogFileMetadata struct {
	StartTs       int64    // Inclusive start timestamp in ms
	EndTs         int64    // Exclusive end timestamp in ms (lastTs + 1)
	Hour          int16    // Hour of day for the first timestamp
	Fingerprints  []int64  // Unique log fingerprints for semantic grouping
	StreamIds     []string // Unique stream identifier values
	StreamIdField string   // Field used for stream identification
	LabelNameMap  []byte   // JSON map of label columns
}

LogFileMetadata contains metadata extracted from the log parquet file

type LogIngestBoxerConsumer added in v1.4.5

type LogIngestBoxerConsumer struct {
	*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}

LogIngestBoxerConsumer handles log ingestion bundling using CommonConsumer

func NewLogIngestBoxerConsumer added in v1.4.5

func NewLogIngestBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*LogIngestBoxerConsumer, error)

NewLogIngestBoxerConsumer creates a new log ingestion boxer consumer using the common consumer framework

type LogIngestBoxerProcessor added in v1.4.5

type LogIngestBoxerProcessor struct {
	// contains filtered or unexported fields
}

LogIngestBoxerProcessor implements the Processor interface for boxing log ingestion bundles

func (*LogIngestBoxerProcessor) GetTargetRecordCount added in v1.4.5

func (b *LogIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit for log ingestion batching

func (*LogIngestBoxerProcessor) Process added in v1.4.5

func (b *LogIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the ingestion processing topic

func (*LogIngestBoxerProcessor) ShouldEmitImmediately added in v1.7.0

func (b *LogIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool

ShouldEmitImmediately returns true for parquet files to prevent memory issues when processing large pre-aggregated files that shouldn't be batched together.

type LogIngestConsumer added in v1.3.4

type LogIngestConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

LogIngestConsumer handles log ingest bundles from the work queue

func NewLogIngestConsumer added in v1.3.4

func NewLogIngestConsumer(
	ctx context.Context,
	cfg *config.Config,
	kafkaFactory *fly.Factory,
	store LogIngestStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*LogIngestConsumer, error)

NewLogIngestConsumer creates a new log ingest consumer that processes bundles from the work queue

type LogIngestDuckDBResult added in v1.9.1

type LogIngestDuckDBResult struct {
	DateintBins map[int32]*LogDateintBinResult
	TotalRows   int64
	// FailedPartitions tracks dateint keys that failed processing with their errors.
	FailedPartitions map[int64]error
}

LogIngestDuckDBResult contains the results of DuckDB-based log ingestion

type LogIngestProcessor added in v1.3.4

type LogIngestProcessor struct {
	// contains filtered or unexported fields
}

LogIngestProcessor implements the Processor interface for raw log ingestion

func (*LogIngestProcessor) GetTargetRecordCount added in v1.3.4

func (p *LogIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit (5MB) for accumulation

func (*LogIngestProcessor) ProcessBundle added in v1.4.5

func (p *LogIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error

ProcessBundle implements the Processor interface and performs raw log ingestion

func (*LogIngestProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *LogIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration

func (*LogIngestProcessor) ShouldEmitImmediately added in v1.7.0

func (p *LogIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool

ShouldEmitImmediately returns false - log ingest processing always uses normal grouping.

type LogIngestStore added in v1.3.4

type LogIngestStore interface {
	workqueue.DB
	InsertLogSegmentsBatch(ctx context.Context, segments []lrdb.InsertLogSegmentParams) error
	GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
}

LogIngestStore defines database operations needed for log ingestion

type LogIngestionResult added in v1.5.0

type LogIngestionResult struct {
	OutputFiles      []parquetwriter.Result // Parquet files generated
	TotalRecords     int64                  // Total number of log records processed
	TotalInputBytes  int64                  // Total size of input files
	TotalOutputBytes int64                  // Total size of output files
	SegmentParams    []lrdb.InsertLogSegmentParams
}

LogIngestionResult contains the results of processing log files

func ProcessLogFiles added in v1.5.0

func ProcessLogFiles(
	ctx context.Context,
	filePaths []string,
	orgID string,
	bucket string,
	outputDir string,
	rpfEstimate int64,
	fingerprintManager *fingerprint.TenantManager,
	backendType parquetwriter.BackendType,
	streamField string,
) (*LogIngestionResult, error)

ProcessLogFiles performs end-to-end log ingestion on a set of files. This function encapsulates the complete ingestion pipeline and can be called from both the production processor and benchmarks.

Parameters:

  • ctx: Context for cancellation and logging
  • filePaths: List of file paths to process (can be local files)
  • orgID: Organization ID for metadata
  • bucket: Bucket name for metadata
  • outputDir: Directory to write output Parquet files
  • rpfEstimate: Rows-per-file estimate for Parquet writer sizing
  • fingerprintManager: Optional fingerprint manager (can be nil for benchmarks)
  • backendType: Parquet backend type (empty string defaults to go-parquet)
  • streamField: Field to use for stream identification (empty = use default priority)

Returns:

  • LogIngestionResult with statistics and output file information
  • error if any step fails

type LogTranslatingReader added in v1.5.0

type LogTranslatingReader struct {
	// contains filtered or unexported fields
}

LogTranslatingReader wraps a reader and applies log-specific transformations. Unlike ParquetLogTranslatingReader, this is for non-parquet formats (proto, json, csv).

func NewLogTranslatingReader added in v1.5.0

func NewLogTranslatingReader(wrapped filereader.Reader, orgID, bucket, objectID string, fingerprintTenantManager *fingerprint.TenantManager) *LogTranslatingReader

NewLogTranslatingReader creates a new reader that applies log-specific transformations.

func (*LogTranslatingReader) Close added in v1.5.0

func (r *LogTranslatingReader) Close() error

Close closes the wrapped reader

func (*LogTranslatingReader) GetSchema added in v1.5.0

GetSchema returns the schema with log-specific columns added.

func (*LogTranslatingReader) Next added in v1.5.0

Next returns the next batch of transformed log data.

func (*LogTranslatingReader) TotalRowsReturned added in v1.5.0

func (r *LogTranslatingReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows successfully returned

type LogTranslator added in v1.3.4

type LogTranslator struct {
	// contains filtered or unexported fields
}

LogTranslator adds resource metadata to log rows

func NewLogTranslator added in v1.3.4

func NewLogTranslator(orgID, bucket, objectID string, fingerprintTenantManager *fingerprint.TenantManager) *LogTranslator

NewLogTranslator creates a new LogTranslator with the specified metadata

func (*LogTranslator) TranslateRow added in v1.3.4

func (t *LogTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error

TranslateRow adds resource fields to each row

type MessageGatherer added in v1.3.5

type MessageGatherer[M messages.CompactionMessage, K messages.CompactionKeyInterface] interface {
	// contains filtered or unexported methods
}

MessageGatherer defines the interface for processing messages and idle groups

type MetricCompactionBoxerConsumer added in v1.3.5

type MetricCompactionBoxerConsumer struct {
	*CommonConsumer[*messages.MetricCompactionMessage, messages.CompactionKey]
}

MetricCompactionBoxerConsumer handles metric compaction bundling using CommonConsumer

func NewMetricCompactionBoxerConsumer added in v1.3.5

func NewMetricCompactionBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*MetricCompactionBoxerConsumer, error)

NewMetricCompactionBoxerConsumer creates a new metric compaction boxer consumer using the common consumer framework

type MetricCompactionBoxerProcessor added in v1.3.5

type MetricCompactionBoxerProcessor struct {
	// contains filtered or unexported fields
}

MetricCompactionBoxerProcessor implements the Processor interface for boxing metric compaction bundles

func (*MetricCompactionBoxerProcessor) GetTargetRecordCount added in v1.3.5

func (b *MetricCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64

GetTargetRecordCount returns the estimated record count for the frequency

func (*MetricCompactionBoxerProcessor) Process added in v1.3.5

func (b *MetricCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.CompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the compaction topic

func (*MetricCompactionBoxerProcessor) ShouldEmitImmediately added in v1.7.0

ShouldEmitImmediately returns false - metric compaction always uses normal grouping.

type MetricCompactionConsumer added in v1.3.4

type MetricCompactionConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

MetricCompactionConsumer handles metric compaction bundles from the work queue

func NewMetricCompactionConsumer added in v1.3.4

func NewMetricCompactionConsumer(
	ctx context.Context,
	cfg *config.Config,
	store MetricCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*MetricCompactionConsumer, error)

NewMetricCompactionConsumer creates a new metric compaction consumer that processes bundles from the work queue

type MetricCompactionProcessor added in v1.3.4

type MetricCompactionProcessor struct {
	// contains filtered or unexported fields
}

MetricCompactionProcessor implements compaction processing for metrics

func NewMetricCompactionProcessor added in v1.3.5

func NewMetricCompactionProcessor(
	store MetricCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
	cfg *config.Config,
) *MetricCompactionProcessor

NewMetricCompactionProcessor creates a new metric compaction processor

func (*MetricCompactionProcessor) GetTargetRecordCount added in v1.3.4

func (p *MetricCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64

GetTargetRecordCount returns the target record count for a grouping key

func (*MetricCompactionProcessor) ProcessBundle added in v1.3.5

func (p *MetricCompactionProcessor) ProcessBundle(ctx context.Context, key messages.CompactionKey, msgs []*messages.MetricCompactionMessage, partition int32, offset int64) error

ProcessBundle processes a compaction bundle directly (simplified interface)

func (*MetricCompactionProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *MetricCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue processes a metric compaction bundle from the work queue

func (*MetricCompactionProcessor) ShouldEmitImmediately added in v1.7.0

func (p *MetricCompactionProcessor) ShouldEmitImmediately(msg *messages.MetricCompactionMessage) bool

ShouldEmitImmediately returns false - metric compaction always uses normal grouping.

type MetricCompactionStore added in v1.3.4

type MetricCompactionStore interface {
	workqueue.DB
	GetMetricSeg(ctx context.Context, params lrdb.GetMetricSegParams) (lrdb.MetricSeg, error)
	CompactMetricSegments(ctx context.Context, params lrdb.CompactMetricSegsParams) error
	MarkMetricSegsCompactedByKeys(ctx context.Context, params lrdb.MarkMetricSegsCompactedByKeysParams) error
	GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}

MetricCompactionStore defines database operations needed for metric compaction

type MetricFileMetadata added in v1.9.0

type MetricFileMetadata struct {
	StartTs      int64    // Inclusive start timestamp in ms
	EndTs        int64    // Exclusive end timestamp in ms (lastTs + 1)
	Hour         int16    // Hour of day for the first timestamp
	Fingerprints []int64  // Fingerprints for indexing
	MetricNames  []string // Unique metric names, sorted
	MetricTypes  []int16  // Metric types parallel to MetricNames
	LabelNameMap []byte   // JSON map of label columns
}

MetricFileMetadata contains metadata extracted from the parquet file

type MetricIngestBoxerConsumer added in v1.4.5

type MetricIngestBoxerConsumer struct {
	*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}

MetricIngestBoxerConsumer handles metric ingestion bundling using CommonConsumer

func NewMetricIngestBoxerConsumer added in v1.4.5

func NewMetricIngestBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*MetricIngestBoxerConsumer, error)

NewMetricIngestBoxerConsumer creates a new metric ingestion boxer consumer using the common consumer framework

type MetricIngestBoxerProcessor added in v1.4.5

type MetricIngestBoxerProcessor struct {
	// contains filtered or unexported fields
}

MetricIngestBoxerProcessor implements the Processor interface for boxing metric ingestion bundles

func (*MetricIngestBoxerProcessor) GetTargetRecordCount added in v1.4.5

func (b *MetricIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit for metric ingestion batching

func (*MetricIngestBoxerProcessor) Process added in v1.4.5

func (b *MetricIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the ingestion processing topic

func (*MetricIngestBoxerProcessor) ShouldEmitImmediately added in v1.7.0

ShouldEmitImmediately returns false - metric ingest always uses normal grouping.

type MetricIngestConsumer added in v1.3.4

type MetricIngestConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

MetricIngestConsumer handles metric ingest bundles from the work queue

func NewMetricIngestConsumer added in v1.3.4

func NewMetricIngestConsumer(
	ctx context.Context,
	cfg *config.Config,
	kafkaFactory *fly.Factory,
	store MetricIngestStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*MetricIngestConsumer, error)

NewMetricIngestConsumer creates a new metric ingest consumer that processes bundles from the work queue

type MetricIngestDuckDBResult added in v1.9.0

type MetricIngestDuckDBResult struct {
	DateintBins map[int32]*DateintBinResult
	TotalRows   int64
	// FailedPartitions tracks dateint keys that failed processing with their errors.
	// Processing uses best-effort strategy: failures are recorded but don't stop
	// processing of other partitions. Callers should check this to detect partial results.
	FailedPartitions map[int64]error
}

MetricIngestDuckDBResult contains the results of DuckDB-based metric ingestion

type MetricIngestProcessor added in v1.3.4

type MetricIngestProcessor struct {
	// contains filtered or unexported fields
}

MetricIngestProcessor implements the Processor interface for raw metric ingestion

func (*MetricIngestProcessor) GetTargetRecordCount added in v1.3.4

func (p *MetricIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit (20MB) for accumulation

func (*MetricIngestProcessor) ProcessBundle added in v1.4.5

func (p *MetricIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error

ProcessBundle implements the Processor interface and performs raw metric ingestion

func (*MetricIngestProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *MetricIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration

func (*MetricIngestProcessor) ShouldEmitImmediately added in v1.7.0

func (p *MetricIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool

ShouldEmitImmediately returns false - metric ingest always uses normal grouping.

type MetricIngestStore added in v1.3.4

type MetricIngestStore interface {
	workqueue.DB
	InsertMetricSegmentsBatch(ctx context.Context, segments []lrdb.InsertMetricSegmentParams) error
	GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}

MetricIngestStore defines database operations needed for metric ingestion

type MetricRollupBoxerConsumer added in v1.3.5

type MetricRollupBoxerConsumer struct {
	*CommonConsumer[*messages.MetricRollupMessage, messages.RollupKey]
}

MetricRollupBoxerConsumer handles metric rollup bundling using CommonConsumer

func NewMetricBoxerConsumer added in v1.3.5

func NewMetricBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*MetricRollupBoxerConsumer, error)

NewMetricBoxerConsumer creates a new metric boxer consumer using the common consumer framework

type MetricRollupBoxerProcessor added in v1.3.5

type MetricRollupBoxerProcessor struct {
	// contains filtered or unexported fields
}

MetricRollupBoxerProcessor implements the Processor interface for boxing metric rollup bundles

func (*MetricRollupBoxerProcessor) GetTargetRecordCount added in v1.3.5

func (b *MetricRollupBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64

GetTargetRecordCount returns the estimated record count for the target frequency

func (*MetricRollupBoxerProcessor) Process added in v1.3.5

func (b *MetricRollupBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.RollupKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the rollup topic

func (*MetricRollupBoxerProcessor) ShouldEmitImmediately added in v1.7.0

func (b *MetricRollupBoxerProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool

ShouldEmitImmediately returns false - metric rollup always uses normal grouping.

type MetricRollupConsumer added in v1.3.4

type MetricRollupConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

MetricRollupConsumer handles metric rollup bundles from the work queue

func NewMetricRollupConsumer added in v1.3.4

func NewMetricRollupConsumer(
	ctx context.Context,
	cfg *config.Config,
	kafkaFactory *fly.Factory,
	store MetricRollupStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*MetricRollupConsumer, error)

NewMetricRollupConsumer creates a new metric rollup consumer that processes bundles from the work queue

type MetricRollupProcessor added in v1.3.4

type MetricRollupProcessor struct {
	// contains filtered or unexported fields
}

MetricRollupProcessor implements the Processor interface for metric rollups

func (*MetricRollupProcessor) GetTargetRecordCount added in v1.3.4

func (r *MetricRollupProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64

GetTargetRecordCount returns the target record count for a rollup grouping key

func (*MetricRollupProcessor) ProcessBundle added in v1.3.5

func (r *MetricRollupProcessor) ProcessBundle(ctx context.Context, bundle *messages.MetricRollupBundle, partition int32, offset int64) error

ProcessBundle processes a MetricRollupBundle directly (simplified interface)

func (*MetricRollupProcessor) ProcessBundleFromQueue added in v1.6.0

func (r *MetricRollupProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration

func (*MetricRollupProcessor) ShouldEmitImmediately added in v1.7.0

func (r *MetricRollupProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool

ShouldEmitImmediately returns false - metric rollup always uses normal grouping.

type MetricRollupStore added in v1.3.5

type MetricRollupStore interface {
	workqueue.DB
	GetMetricSeg(ctx context.Context, params lrdb.GetMetricSegParams) (lrdb.MetricSeg, error)
	RollupMetricSegments(ctx context.Context, sourceParams lrdb.RollupSourceParams, targetParams lrdb.RollupTargetParams, sourceSegmentIDs []int64, newRecords []lrdb.RollupNewRecord) error
	KafkaOffsetsAfter(ctx context.Context, params lrdb.KafkaOffsetsAfterParams) ([]int64, error)
	CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
	InsertKafkaOffsets(ctx context.Context, params lrdb.InsertKafkaOffsetsParams) error
	GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}

MetricRollupStore defines database operations needed for rollups

type OffsetTrackerStore added in v1.3.6

type OffsetTrackerStore interface {
	KafkaOffsetsAfter(ctx context.Context, params lrdb.KafkaOffsetsAfterParams) ([]int64, error)
	CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
	InsertKafkaOffsets(ctx context.Context, params lrdb.InsertKafkaOffsetsParams) error
}

OffsetTrackerStore defines the interface for sync-mode offset tracking

type ParquetLogTranslatingReader added in v1.5.0

type ParquetLogTranslatingReader struct {
	// contains filtered or unexported fields
}

ParquetLogTranslatingReader wraps a reader and translates parquet logs to CardinalHQ format. It properly transforms the schema to reflect row transformations (resource_ prefixing, etc.)

func NewParquetLogTranslatingReader added in v1.5.0

func NewParquetLogTranslatingReader(wrapped filereader.Reader, orgID, bucket, objectID string) *ParquetLogTranslatingReader

NewParquetLogTranslatingReader creates a reader that translates parquet logs

func (*ParquetLogTranslatingReader) Close added in v1.5.0

Close closes the wrapped reader

func (*ParquetLogTranslatingReader) GetSchema added in v1.5.0

GetSchema returns the transformed schema

func (*ParquetLogTranslatingReader) Next added in v1.5.0

Next reads and translates the next batch of rows

func (*ParquetLogTranslatingReader) TotalRowsReturned added in v1.5.0

func (r *ParquetLogTranslatingReader) TotalRowsReturned() int64

TotalRowsReturned returns the total number of rows successfully returned

type QueueWorkerConsumer added in v1.6.0

type QueueWorkerConsumer struct {
	// contains filtered or unexported fields
}

QueueWorkerConsumer is a consumer that pulls work from the PostgreSQL work queue

func NewQueueWorkerConsumer added in v1.6.0

func NewQueueWorkerConsumer(
	manager *workqueue.Manager,
	processor BundleProcessor,
	taskName string,
	concurrency int,
) *QueueWorkerConsumer

NewQueueWorkerConsumer creates a new queue-based worker consumer. Concurrency defaults to runtime.NumCPU() when 0 is passed.

func (*QueueWorkerConsumer) Close added in v1.6.0

func (c *QueueWorkerConsumer) Close() error

Close closes the consumer

func (*QueueWorkerConsumer) Run added in v1.6.0

Run starts the consumer, processing work items from the queue. It spawns c.concurrency worker goroutines that each independently request and process work items.

type TraceCompactionBoxerConsumer added in v1.3.5

type TraceCompactionBoxerConsumer struct {
	*CommonConsumer[*messages.TraceCompactionMessage, messages.TraceCompactionKey]
}

TraceCompactionBoxerConsumer handles trace compaction bundling using CommonConsumer

func NewTraceCompactionBoxerConsumer added in v1.3.5

func NewTraceCompactionBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*TraceCompactionBoxerConsumer, error)

NewTraceCompactionBoxerConsumer creates a new trace compaction boxer consumer using the common consumer framework

type TraceCompactionBoxerProcessor added in v1.3.5

type TraceCompactionBoxerProcessor struct {
	// contains filtered or unexported fields
}

TraceCompactionBoxerProcessor implements the Processor interface for boxing trace compaction bundles

func (*TraceCompactionBoxerProcessor) GetTargetRecordCount added in v1.3.5

func (b *TraceCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64

GetTargetRecordCount returns the estimated record count for traces

func (*TraceCompactionBoxerProcessor) Process added in v1.3.5

func (b *TraceCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.TraceCompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the compaction topic

func (*TraceCompactionBoxerProcessor) ShouldEmitImmediately added in v1.7.0

func (b *TraceCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool

ShouldEmitImmediately returns false - trace compaction always uses normal grouping.

type TraceCompactionConsumer added in v1.3.4

type TraceCompactionConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

TraceCompactionConsumer handles trace compaction bundles from the work queue

func NewTraceCompactionConsumer added in v1.3.4

func NewTraceCompactionConsumer(
	ctx context.Context,
	cfg *config.Config,
	store TraceCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*TraceCompactionConsumer, error)

NewTraceCompactionConsumer creates a new trace compaction consumer that processes bundles from the work queue

type TraceCompactionProcessor added in v1.3.4

type TraceCompactionProcessor struct {
	// contains filtered or unexported fields
}

TraceCompactionProcessor implements compaction processing for traces

func NewTraceCompactionProcessor added in v1.3.5

func NewTraceCompactionProcessor(
	store TraceCompactionStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
	cfg *config.Config,
) *TraceCompactionProcessor

NewTraceCompactionProcessor creates a new trace compaction processor

func (*TraceCompactionProcessor) GetTargetRecordCount added in v1.3.4

func (p *TraceCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64

GetTargetRecordCount returns the target record count for a grouping key

func (*TraceCompactionProcessor) ProcessBundle added in v1.3.5

func (p *TraceCompactionProcessor) ProcessBundle(ctx context.Context, key messages.TraceCompactionKey, msgs []*messages.TraceCompactionMessage, partition int32, offset int64) error

ProcessBundle processes a bundle of trace compaction messages directly

func (*TraceCompactionProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *TraceCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue processes a trace compaction bundle from the work queue

func (*TraceCompactionProcessor) ProcessWork added in v1.3.5

func (p *TraceCompactionProcessor) ProcessWork(
	ctx context.Context,
	tmpDir string,
	storageClient cloudstorage.Client,
	storageProfile storageprofile.StorageProfile,
	key messages.TraceCompactionKey,
	msgs []*messages.TraceCompactionMessage,
	partition int32,
	offset int64,
) error

ProcessWork handles the trace-specific work logic

func (*TraceCompactionProcessor) ShouldEmitImmediately added in v1.7.0

func (p *TraceCompactionProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool

ShouldEmitImmediately returns false - trace compaction always uses normal grouping.

type TraceCompactionStore added in v1.3.4

type TraceCompactionStore interface {
	workqueue.DB
	GetTraceSeg(ctx context.Context, params lrdb.GetTraceSegParams) (lrdb.TraceSeg, error)
	CompactTraceSegments(ctx context.Context, params lrdb.CompactTraceSegsParams) error
	MarkTraceSegsCompactedByKeys(ctx context.Context, params lrdb.MarkTraceSegsCompactedByKeysParams) error
	GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
}

TraceCompactionStore defines database operations needed for trace compaction

type TraceDateintBin added in v1.3.4

type TraceDateintBin struct {
	Dateint int32 // The dateint for this bin
	Writer  parquetwriter.ParquetWriter
	Results []parquetwriter.Result // Results after writer is closed (can be multiple files)
}

TraceDateintBin represents a file group containing traces for a specific dateint

type TraceDateintBinManager added in v1.3.4

type TraceDateintBinManager struct {
	// contains filtered or unexported fields
}

TraceDateintBinManager manages multiple file groups, one per dateint

type TraceDateintBinResult added in v1.9.1

type TraceDateintBinResult struct {
	Dateint     int32
	OutputFile  string // Local parquet file path
	RecordCount int64
	FileSize    int64
	Metadata    *TraceFileMetadata
}

TraceDateintBinResult contains the result for a single dateint partition

type TraceFileMetadata added in v1.9.1

type TraceFileMetadata struct {
	StartTs      int64   // Inclusive start timestamp in ms
	EndTs        int64   // Exclusive end timestamp in ms (lastTs + 1)
	Hour         int16   // Hour of day for the first timestamp
	Fingerprints []int64 // Span fingerprints for indexing
	LabelNameMap []byte  // JSON map of label columns
}

TraceFileMetadata contains metadata extracted from the trace parquet file

type TraceIDTimestampSortKey added in v1.3.4

type TraceIDTimestampSortKey struct {
	TraceID   string
	Timestamp int64
	TraceOk   bool
	TsOk      bool
}

TraceIDTimestampSortKey represents a sort key based on trace_id first, then timestamp

func (*TraceIDTimestampSortKey) Compare added in v1.3.4

func (k *TraceIDTimestampSortKey) Compare(other filereader.SortKey) int

Compare implements filereader.SortKey interface for TraceIDTimestampSortKey

func (*TraceIDTimestampSortKey) Release added in v1.3.4

func (k *TraceIDTimestampSortKey) Release()

Release returns the TraceIDTimestampSortKey to the pool for reuse

type TraceIDTimestampSortKeyProvider added in v1.3.4

type TraceIDTimestampSortKeyProvider struct{}

TraceIDTimestampSortKeyProvider creates TraceIDTimestampSortKey instances from rows

func (*TraceIDTimestampSortKeyProvider) MakeKey added in v1.3.4

MakeKey implements filereader.SortKeyProvider interface for trace ID + timestamp sorting

type TraceIngestBoxerConsumer added in v1.4.5

type TraceIngestBoxerConsumer struct {
	*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}

TraceIngestBoxerConsumer handles trace ingestion bundling using CommonConsumer

func NewTraceIngestBoxerConsumer added in v1.4.5

func NewTraceIngestBoxerConsumer(
	ctx context.Context,
	cfg *config.Config,
	store BoxerStore,
	factory *fly.Factory,
) (*TraceIngestBoxerConsumer, error)

NewTraceIngestBoxerConsumer creates a new trace ingestion boxer consumer using the common consumer framework

type TraceIngestBoxerProcessor added in v1.4.5

type TraceIngestBoxerProcessor struct {
	// contains filtered or unexported fields
}

TraceIngestBoxerProcessor implements the Processor interface for boxing trace ingestion bundles

func (*TraceIngestBoxerProcessor) GetTargetRecordCount added in v1.4.5

func (b *TraceIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit for trace ingestion batching

func (*TraceIngestBoxerProcessor) Process added in v1.4.5

func (b *TraceIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error

Process implements the Processor interface and sends the bundle to the ingestion processing topic

func (*TraceIngestBoxerProcessor) ShouldEmitImmediately added in v1.7.0

func (b *TraceIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool

ShouldEmitImmediately returns false - trace ingest always uses normal grouping.

type TraceIngestConsumer added in v1.3.4

type TraceIngestConsumer struct {
	*QueueWorkerConsumer
	// contains filtered or unexported fields
}

TraceIngestConsumer handles trace ingest bundles from the work queue

func NewTraceIngestConsumer added in v1.3.4

func NewTraceIngestConsumer(
	ctx context.Context,
	cfg *config.Config,
	kafkaFactory *fly.Factory,
	store TraceIngestStore,
	storageProvider storageprofile.StorageProfileProvider,
	cmgr cloudstorage.ClientProvider,
) (*TraceIngestConsumer, error)

NewTraceIngestConsumer creates a new trace ingest consumer that processes bundles from the work queue

type TraceIngestDuckDBResult added in v1.9.1

type TraceIngestDuckDBResult struct {
	DateintBins map[int32]*TraceDateintBinResult
	TotalRows   int64
	// FailedPartitions tracks dateint keys that failed processing with their errors.
	FailedPartitions map[int64]error
}

TraceIngestDuckDBResult contains the results of DuckDB-based trace ingestion

type TraceIngestProcessor added in v1.3.4

type TraceIngestProcessor struct {
	// contains filtered or unexported fields
}

TraceIngestProcessor implements the Processor interface for raw trace ingestion

func (*TraceIngestProcessor) GetTargetRecordCount added in v1.3.4

func (p *TraceIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64

GetTargetRecordCount returns the target file size limit (5MB) for accumulation

func (*TraceIngestProcessor) ProcessBundle added in v1.4.5

func (p *TraceIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error

ProcessBundle implements the ProcessBundle pattern for raw trace ingestion

func (*TraceIngestProcessor) ProcessBundleFromQueue added in v1.6.0

func (p *TraceIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error

ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration

func (*TraceIngestProcessor) ShouldEmitImmediately added in v1.7.0

func (p *TraceIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool

ShouldEmitImmediately returns false - trace ingest always uses normal grouping.

type TraceIngestStore added in v1.3.4

type TraceIngestStore interface {
	workqueue.DB
	InsertTraceSegmentsBatch(ctx context.Context, segments []lrdb.InsertTraceSegmentParams) error
	GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
}

TraceIngestStore defines database operations needed for trace ingestion

type TraceTranslator added in v1.3.4

type TraceTranslator struct {
	// contains filtered or unexported fields
}

TraceTranslator adds resource metadata to trace rows

func NewTraceTranslator added in v1.3.4

func NewTraceTranslator(orgID, bucket, objectID string) *TraceTranslator

NewTraceTranslator creates a new TraceTranslator with the specified metadata

func (*TraceTranslator) TranslateRow added in v1.3.4

func (t *TraceTranslator) TranslateRow(_ context.Context, row *pipeline.Row) error

TranslateRow adds resource fields to each row

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL