Documentation
¶
Index ¶
- func CleanupCommittedOffsets(ctx context.Context, store CleanupStore, topic, consumerGroup string, ...)
- type BoxerConsumer
- type BoxerManager
- type BoxerStore
- type BundleProcessor
- type CleanupStore
- type CommonConsumer
- func NewCommonConsumer[M messages.CompactionMessage, K messages.CompactionKeyInterface](ctx context.Context, factory FlyConsumerFactory, cfg *config.Config, ...) (*CommonConsumer[M, K], error)
- func NewCommonConsumerWithComponents[M messages.CompactionMessage, K messages.CompactionKeyInterface](ctx context.Context, consumer fly.Consumer, ...) *CommonConsumer[M, K]
- type CommonConsumerConfig
- type ConfigMismatchError
- type ConsumerFactory
- type DateintBin
- type DateintBinManager
- type DateintBinResult
- type DefaultConsumerFactory
- func (f *DefaultConsumerFactory) CreateLogCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateLogIngestConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateMetricCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateMetricIngestConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateMetricRollupConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateTraceCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
- func (f *DefaultConsumerFactory) CreateTraceIngestConsumer(ctx context.Context) (BoxerConsumer, error)
- type FlyConsumerFactory
- type GroupValidationError
- type KafkaCommitData
- type LogCompactionBoxerConsumer
- type LogCompactionBoxerProcessor
- func (p *LogCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64
- func (p *LogCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.LogCompactionKey], ...) error
- func (p *LogCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool
- type LogCompactionConsumer
- type LogCompactionProcessor
- func (p *LogCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64
- func (p *LogCompactionProcessor) ProcessBundle(ctx context.Context, key messages.LogCompactionKey, ...) error
- func (p *LogCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *LogCompactionProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool
- type LogCompactionStore
- type LogDateintBinResult
- type LogFileMetadata
- type LogIngestBoxerConsumer
- type LogIngestBoxerProcessor
- func (b *LogIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (b *LogIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], ...) error
- func (b *LogIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type LogIngestConsumer
- type LogIngestDuckDBResult
- type LogIngestProcessor
- func (p *LogIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (p *LogIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, ...) error
- func (p *LogIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *LogIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type LogIngestStore
- type LogIngestionResult
- type LogTranslatingReader
- type LogTranslator
- type MessageGatherer
- type MetricCompactionBoxerConsumer
- type MetricCompactionBoxerProcessor
- func (b *MetricCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64
- func (b *MetricCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.CompactionKey], ...) error
- func (b *MetricCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.MetricCompactionMessage) bool
- type MetricCompactionConsumer
- type MetricCompactionProcessor
- func (p *MetricCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64
- func (p *MetricCompactionProcessor) ProcessBundle(ctx context.Context, key messages.CompactionKey, ...) error
- func (p *MetricCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *MetricCompactionProcessor) ShouldEmitImmediately(msg *messages.MetricCompactionMessage) bool
- type MetricCompactionStore
- type MetricFileMetadata
- type MetricIngestBoxerConsumer
- type MetricIngestBoxerProcessor
- func (b *MetricIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (b *MetricIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], ...) error
- func (b *MetricIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type MetricIngestConsumer
- type MetricIngestDuckDBResult
- type MetricIngestProcessor
- func (p *MetricIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (p *MetricIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, ...) error
- func (p *MetricIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *MetricIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type MetricIngestStore
- type MetricRollupBoxerConsumer
- type MetricRollupBoxerProcessor
- func (b *MetricRollupBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64
- func (b *MetricRollupBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.RollupKey], ...) error
- func (b *MetricRollupBoxerProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool
- type MetricRollupConsumer
- type MetricRollupProcessor
- func (r *MetricRollupProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64
- func (r *MetricRollupProcessor) ProcessBundle(ctx context.Context, bundle *messages.MetricRollupBundle, partition int32, ...) error
- func (r *MetricRollupProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (r *MetricRollupProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool
- type MetricRollupStore
- type OffsetTrackerStore
- type ParquetLogTranslatingReader
- type QueueWorkerConsumer
- type TraceCompactionBoxerConsumer
- type TraceCompactionBoxerProcessor
- func (b *TraceCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64
- func (b *TraceCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.TraceCompactionKey], ...) error
- func (b *TraceCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool
- type TraceCompactionConsumer
- type TraceCompactionProcessor
- func (p *TraceCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64
- func (p *TraceCompactionProcessor) ProcessBundle(ctx context.Context, key messages.TraceCompactionKey, ...) error
- func (p *TraceCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *TraceCompactionProcessor) ProcessWork(ctx context.Context, tmpDir string, storageClient cloudstorage.Client, ...) error
- func (p *TraceCompactionProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool
- type TraceCompactionStore
- type TraceDateintBin
- type TraceDateintBinManager
- type TraceDateintBinResult
- type TraceFileMetadata
- type TraceIDTimestampSortKey
- type TraceIDTimestampSortKeyProvider
- type TraceIngestBoxerConsumer
- type TraceIngestBoxerProcessor
- func (b *TraceIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (b *TraceIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], ...) error
- func (b *TraceIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type TraceIngestConsumer
- type TraceIngestDuckDBResult
- type TraceIngestProcessor
- func (p *TraceIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
- func (p *TraceIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, ...) error
- func (p *TraceIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
- func (p *TraceIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
- type TraceIngestStore
- type TraceTranslator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CleanupCommittedOffsets ¶ added in v1.3.6
func CleanupCommittedOffsets(ctx context.Context, store CleanupStore, topic, consumerGroup string, messages []fly.ConsumedMessage)
CleanupCommittedOffsets removes old offset tracking records after successful Kafka commit. This is a helper function for consumers that don't use CommonConsumer.
Types ¶
type BoxerConsumer ¶ added in v1.4.1
BoxerConsumer represents a generic boxer consumer interface
type BoxerManager ¶ added in v1.4.1
type BoxerManager struct {
// contains filtered or unexported fields
}
BoxerManager coordinates multiple boxer consumers running concurrently
func NewBoxerManager ¶ added in v1.4.1
func NewBoxerManager(ctx context.Context, cfg *config.Config, store BoxerStore, tasks []string) (*BoxerManager, error)
NewBoxerManager creates a new BoxerManager with the specified tasks
func NewBoxerManagerWithFactory ¶ added in v1.4.1
func NewBoxerManagerWithFactory(ctx context.Context, factory ConsumerFactory, tasks []string) (*BoxerManager, error)
NewBoxerManagerWithFactory creates a new BoxerManager with a custom factory (for testing)
func (*BoxerManager) Close ¶ added in v1.4.1
func (m *BoxerManager) Close() error
Close stops all consumers
type BoxerStore ¶ added in v1.3.5
type BoxerStore interface {
OffsetTrackerStore
GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
WorkQueueAdd(ctx context.Context, arg lrdb.WorkQueueAddParams) (lrdb.WorkQueue, error)
WorkQueueDepth(ctx context.Context, taskName string) (int64, error)
WorkQueueCleanup(ctx context.Context, heartbeatTimeout time.Duration) error
}
BoxerStore defines the interface required by all boxer processors. Boxers only bundle messages and need estimation methods for optimal batching. They do not perform actual processing operations (compaction, etc.).
type BundleProcessor ¶ added in v1.6.0
type BundleProcessor interface {
ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
}
BundleProcessor defines the interface for processing a bundle from the work queue
type CleanupStore ¶ added in v1.3.6
type CleanupStore interface {
CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
}
CleanupStore defines the interface for stores that support offset cleanup
type CommonConsumer ¶ added in v1.3.5
type CommonConsumer[M messages.CompactionMessage, K messages.CompactionKeyInterface] struct { // contains filtered or unexported fields }
CommonConsumer is a generic consumer for any message processing operations
func NewCommonConsumer ¶ added in v1.3.5
func NewCommonConsumer[M messages.CompactionMessage, K messages.CompactionKeyInterface]( ctx context.Context, factory FlyConsumerFactory, cfg *config.Config, consumerConfig CommonConsumerConfig, store offsetStore, processor processor[M, K], ) (*CommonConsumer[M, K], error)
NewCommonConsumer creates a new generic common consumer
func NewCommonConsumerWithComponents ¶ added in v1.3.5
func NewCommonConsumerWithComponents[M messages.CompactionMessage, K messages.CompactionKeyInterface]( ctx context.Context, consumer fly.Consumer, consumerConfig CommonConsumerConfig, store offsetStore, processor processor[M, K], ) *CommonConsumer[M, K]
NewCommonConsumerWithComponents creates a consumer with provided components (for testability)
func (*CommonConsumer[M, K]) Close ¶ added in v1.3.5
func (c *CommonConsumer[M, K]) Close() error
Close stops the consumer and cleans up resources
type CommonConsumerConfig ¶ added in v1.3.5
type CommonConsumerConfig struct {
ConsumerName string
Topic string
ConsumerGroup string
FlushInterval time.Duration
StaleAge time.Duration
MaxAge time.Duration
}
CommonConsumerConfig holds configuration for a common consumer
type ConfigMismatchError ¶ added in v1.3.4
type ConfigMismatchError struct {
Field string // "topic" or "consumer_group"
Expected string
Got string
}
ConfigMismatchError represents an error when message metadata doesn't match gatherer configuration
func (*ConfigMismatchError) Error ¶ added in v1.3.4
func (e *ConfigMismatchError) Error() string
type ConsumerFactory ¶ added in v1.4.1
type ConsumerFactory interface {
CreateLogIngestConsumer(ctx context.Context) (BoxerConsumer, error)
CreateMetricIngestConsumer(ctx context.Context) (BoxerConsumer, error)
CreateTraceIngestConsumer(ctx context.Context) (BoxerConsumer, error)
CreateLogCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
CreateMetricCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
CreateTraceCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
CreateMetricRollupConsumer(ctx context.Context) (BoxerConsumer, error)
}
ConsumerFactory creates boxer consumers for different task types
type DateintBin ¶ added in v1.3.4
type DateintBin struct {
Dateint int32 // The dateint for this bin
Writer parquetwriter.ParquetWriter
Results []parquetwriter.Result // Results after writer is closed (can be multiple files)
}
DateintBin represents a file group containing logs for a specific dateint
type DateintBinManager ¶ added in v1.3.4
type DateintBinManager struct {
// contains filtered or unexported fields
}
DateintBinManager manages multiple file groups, one per dateint
type DateintBinResult ¶ added in v1.9.0
type DateintBinResult struct {
Dateint int32
OutputFile string // Local parquet file path
RecordCount int64
FileSize int64
Metadata *MetricFileMetadata
}
DateintBinResult contains the result for a single dateint partition
type DefaultConsumerFactory ¶ added in v1.4.1
type DefaultConsumerFactory struct {
// contains filtered or unexported fields
}
DefaultConsumerFactory implements ConsumerFactory using existing consumer constructors
func NewDefaultConsumerFactory ¶ added in v1.4.1
func NewDefaultConsumerFactory(cfg *config.Config, boxerStore BoxerStore) (*DefaultConsumerFactory, error)
NewDefaultConsumerFactory creates a new DefaultConsumerFactory
func (*DefaultConsumerFactory) CreateLogCompactionConsumer ¶ added in v1.4.1
func (f *DefaultConsumerFactory) CreateLogCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateLogIngestConsumer ¶ added in v1.4.5
func (f *DefaultConsumerFactory) CreateLogIngestConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateMetricCompactionConsumer ¶ added in v1.4.1
func (f *DefaultConsumerFactory) CreateMetricCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateMetricIngestConsumer ¶ added in v1.4.5
func (f *DefaultConsumerFactory) CreateMetricIngestConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateMetricRollupConsumer ¶ added in v1.4.1
func (f *DefaultConsumerFactory) CreateMetricRollupConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateTraceCompactionConsumer ¶ added in v1.4.1
func (f *DefaultConsumerFactory) CreateTraceCompactionConsumer(ctx context.Context) (BoxerConsumer, error)
func (*DefaultConsumerFactory) CreateTraceIngestConsumer ¶ added in v1.4.5
func (f *DefaultConsumerFactory) CreateTraceIngestConsumer(ctx context.Context) (BoxerConsumer, error)
type FlyConsumerFactory ¶ added in v1.3.5
type FlyConsumerFactory interface {
CreateConsumer(topic, consumerGroup string) (fly.Consumer, error)
}
FlyConsumerFactory defines the interface for creating Kafka consumers (for testability)
type GroupValidationError ¶ added in v1.3.4
type GroupValidationError struct {
Field string // field that failed validation
Expected interface{} // expected value
Got interface{} // actual value
Message string // description of the validation failure
}
GroupValidationError represents an error when message group validation fails
func (*GroupValidationError) Error ¶ added in v1.3.4
func (e *GroupValidationError) Error() string
type KafkaCommitData ¶ added in v1.3.4
type KafkaCommitData struct {
Topic string
ConsumerGroup string
Offsets map[int32]int64 // partition -> highest offset
}
KafkaCommitData represents Kafka offset data to be committed atomically with SQL operations. This is now only used internally by metadataTracker for safe commit offset calculations.
type LogCompactionBoxerConsumer ¶ added in v1.3.5
type LogCompactionBoxerConsumer struct {
*CommonConsumer[*messages.LogCompactionMessage, messages.LogCompactionKey]
}
LogCompactionBoxerConsumer handles log compaction using the generic framework
func NewLogCompactionBoxerConsumer ¶ added in v1.3.5
func NewLogCompactionBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*LogCompactionBoxerConsumer, error)
NewLogCompactionBoxerConsumer creates a new log compaction boxer consumer using the generic framework
type LogCompactionBoxerProcessor ¶ added in v1.3.5
type LogCompactionBoxerProcessor struct {
// contains filtered or unexported fields
}
LogCompactionBoxerProcessor implements boxing for log compaction messages. Note: This processor only bundles messages for compaction and only requires GetLogEstimate() from the store. The actual compaction operations (GetLogSeg, CompactLogSegments, etc.) are performed by the compaction processor itself.
func (*LogCompactionBoxerProcessor) GetTargetRecordCount ¶ added in v1.3.5
func (p *LogCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64
GetTargetRecordCount returns the target record count for a grouping key
func (*LogCompactionBoxerProcessor) Process ¶ added in v1.3.5
func (p *LogCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.LogCompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the processor interface for log compaction boxing
func (*LogCompactionBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *LogCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool
ShouldEmitImmediately returns false - log compaction always uses normal grouping.
type LogCompactionConsumer ¶ added in v1.3.4
type LogCompactionConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
LogCompactionConsumer handles log compaction bundles from the work queue
func NewLogCompactionConsumer ¶ added in v1.3.4
func NewLogCompactionConsumer( ctx context.Context, cfg *config.Config, store LogCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*LogCompactionConsumer, error)
NewLogCompactionConsumer creates a new log compaction consumer that processes bundles from the work queue
type LogCompactionProcessor ¶ added in v1.3.4
type LogCompactionProcessor struct {
// contains filtered or unexported fields
}
LogCompactionProcessor implements compaction processing for logs
func NewLogCompactionProcessor ¶ added in v1.3.5
func NewLogCompactionProcessor( store LogCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, cfg *config.Config, ) *LogCompactionProcessor
NewLogCompactionProcessor creates a new log compaction processor
func (*LogCompactionProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *LogCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.LogCompactionKey) int64
GetTargetRecordCount returns the target record count for a grouping key
func (*LogCompactionProcessor) ProcessBundle ¶ added in v1.3.5
func (p *LogCompactionProcessor) ProcessBundle(ctx context.Context, key messages.LogCompactionKey, msgs []*messages.LogCompactionMessage, partition int32, offset int64) error
ProcessBundle processes a compaction bundle directly (simplified interface)
func (*LogCompactionProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *LogCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue processes a log compaction bundle from the work queue
func (*LogCompactionProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *LogCompactionProcessor) ShouldEmitImmediately(msg *messages.LogCompactionMessage) bool
ShouldEmitImmediately returns false - log compaction always uses normal grouping.
type LogCompactionStore ¶ added in v1.3.4
type LogCompactionStore interface {
workqueue.DB
GetLogSeg(ctx context.Context, params lrdb.GetLogSegParams) (lrdb.LogSeg, error)
CompactLogSegments(ctx context.Context, params lrdb.CompactLogSegsParams) error
MarkLogSegsCompactedByKeys(ctx context.Context, params lrdb.MarkLogSegsCompactedByKeysParams) error
GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
}
LogCompactionStore defines database operations needed for log compaction
type LogDateintBinResult ¶ added in v1.9.1
type LogDateintBinResult struct {
Dateint int32
OutputFile string // Local parquet file path
RecordCount int64
FileSize int64
Metadata *LogFileMetadata
}
LogDateintBinResult contains the result for a single dateint partition
type LogFileMetadata ¶ added in v1.9.1
type LogFileMetadata struct {
StartTs int64 // Inclusive start timestamp in ms
EndTs int64 // Exclusive end timestamp in ms (lastTs + 1)
Hour int16 // Hour of day for the first timestamp
Fingerprints []int64 // Unique log fingerprints for semantic grouping
StreamIds []string // Unique stream identifier values
StreamIdField string // Field used for stream identification
LabelNameMap []byte // JSON map of label columns
}
LogFileMetadata contains metadata extracted from the log parquet file
type LogIngestBoxerConsumer ¶ added in v1.4.5
type LogIngestBoxerConsumer struct {
*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}
LogIngestBoxerConsumer handles log ingestion bundling using CommonConsumer
func NewLogIngestBoxerConsumer ¶ added in v1.4.5
func NewLogIngestBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*LogIngestBoxerConsumer, error)
NewLogIngestBoxerConsumer creates a new log ingestion boxer consumer using the common consumer framework
type LogIngestBoxerProcessor ¶ added in v1.4.5
type LogIngestBoxerProcessor struct {
// contains filtered or unexported fields
}
LogIngestBoxerProcessor implements the Processor interface for boxing log ingestion bundles
func (*LogIngestBoxerProcessor) GetTargetRecordCount ¶ added in v1.4.5
func (b *LogIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit for log ingestion batching
func (*LogIngestBoxerProcessor) Process ¶ added in v1.4.5
func (b *LogIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the ingestion processing topic
func (*LogIngestBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *LogIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns true for parquet files to prevent memory issues when processing large pre-aggregated files that shouldn't be batched together.
type LogIngestConsumer ¶ added in v1.3.4
type LogIngestConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
LogIngestConsumer handles log ingest bundles from the work queue
func NewLogIngestConsumer ¶ added in v1.3.4
func NewLogIngestConsumer( ctx context.Context, cfg *config.Config, kafkaFactory *fly.Factory, store LogIngestStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*LogIngestConsumer, error)
NewLogIngestConsumer creates a new log ingest consumer that processes bundles from the work queue
type LogIngestDuckDBResult ¶ added in v1.9.1
type LogIngestDuckDBResult struct {
DateintBins map[int32]*LogDateintBinResult
TotalRows int64
// FailedPartitions tracks dateint keys that failed processing with their errors.
FailedPartitions map[int64]error
}
LogIngestDuckDBResult contains the results of DuckDB-based log ingestion
type LogIngestProcessor ¶ added in v1.3.4
type LogIngestProcessor struct {
// contains filtered or unexported fields
}
LogIngestProcessor implements the Processor interface for raw log ingestion
func (*LogIngestProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *LogIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit (5MB) for accumulation
func (*LogIngestProcessor) ProcessBundle ¶ added in v1.4.5
func (p *LogIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error
ProcessBundle implements the Processor interface and performs raw log ingestion
func (*LogIngestProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *LogIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration
func (*LogIngestProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *LogIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns false - log ingest processing always uses normal grouping.
type LogIngestStore ¶ added in v1.3.4
type LogIngestStore interface {
workqueue.DB
InsertLogSegmentsBatch(ctx context.Context, segments []lrdb.InsertLogSegmentParams) error
GetLogEstimate(ctx context.Context, orgID uuid.UUID) int64
}
LogIngestStore defines database operations needed for log ingestion
type LogIngestionResult ¶ added in v1.5.0
type LogIngestionResult struct {
OutputFiles []parquetwriter.Result // Parquet files generated
TotalRecords int64 // Total number of log records processed
TotalInputBytes int64 // Total size of input files
TotalOutputBytes int64 // Total size of output files
SegmentParams []lrdb.InsertLogSegmentParams
}
LogIngestionResult contains the results of processing log files
func ProcessLogFiles ¶ added in v1.5.0
func ProcessLogFiles( ctx context.Context, filePaths []string, orgID string, bucket string, outputDir string, rpfEstimate int64, fingerprintManager *fingerprint.TenantManager, backendType parquetwriter.BackendType, streamField string, ) (*LogIngestionResult, error)
ProcessLogFiles performs end-to-end log ingestion on a set of files. This function encapsulates the complete ingestion pipeline and can be called from both the production processor and benchmarks.
Parameters:
- ctx: Context for cancellation and logging
- filePaths: List of file paths to process (can be local files)
- orgID: Organization ID for metadata
- bucket: Bucket name for metadata
- outputDir: Directory to write output Parquet files
- rpfEstimate: Rows-per-file estimate for Parquet writer sizing
- fingerprintManager: Optional fingerprint manager (can be nil for benchmarks)
- backendType: Parquet backend type (empty string defaults to go-parquet)
- streamField: Field to use for stream identification (empty = use default priority)
Returns:
- LogIngestionResult with statistics and output file information
- error if any step fails
type LogTranslatingReader ¶ added in v1.5.0
type LogTranslatingReader struct {
// contains filtered or unexported fields
}
LogTranslatingReader wraps a reader and applies log-specific transformations. Unlike ParquetLogTranslatingReader, this is for non-parquet formats (proto, json, csv).
func NewLogTranslatingReader ¶ added in v1.5.0
func NewLogTranslatingReader(wrapped filereader.Reader, orgID, bucket, objectID string, fingerprintTenantManager *fingerprint.TenantManager) *LogTranslatingReader
NewLogTranslatingReader creates a new reader that applies log-specific transformations.
func (*LogTranslatingReader) Close ¶ added in v1.5.0
func (r *LogTranslatingReader) Close() error
Close closes the wrapped reader
func (*LogTranslatingReader) GetSchema ¶ added in v1.5.0
func (r *LogTranslatingReader) GetSchema() *filereader.ReaderSchema
GetSchema returns the schema with log-specific columns added.
func (*LogTranslatingReader) Next ¶ added in v1.5.0
func (r *LogTranslatingReader) Next(ctx context.Context) (*filereader.Batch, error)
Next returns the next batch of transformed log data.
func (*LogTranslatingReader) TotalRowsReturned ¶ added in v1.5.0
func (r *LogTranslatingReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows successfully returned
type LogTranslator ¶ added in v1.3.4
type LogTranslator struct {
// contains filtered or unexported fields
}
LogTranslator adds resource metadata to log rows
func NewLogTranslator ¶ added in v1.3.4
func NewLogTranslator(orgID, bucket, objectID string, fingerprintTenantManager *fingerprint.TenantManager) *LogTranslator
NewLogTranslator creates a new LogTranslator with the specified metadata
func (*LogTranslator) TranslateRow ¶ added in v1.3.4
TranslateRow adds resource fields to each row
type MessageGatherer ¶ added in v1.3.5
type MessageGatherer[M messages.CompactionMessage, K messages.CompactionKeyInterface] interface { // contains filtered or unexported methods }
MessageGatherer defines the interface for processing messages and idle groups
type MetricCompactionBoxerConsumer ¶ added in v1.3.5
type MetricCompactionBoxerConsumer struct {
*CommonConsumer[*messages.MetricCompactionMessage, messages.CompactionKey]
}
MetricCompactionBoxerConsumer handles metric compaction bundling using CommonConsumer
func NewMetricCompactionBoxerConsumer ¶ added in v1.3.5
func NewMetricCompactionBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*MetricCompactionBoxerConsumer, error)
NewMetricCompactionBoxerConsumer creates a new metric compaction boxer consumer using the common consumer framework
type MetricCompactionBoxerProcessor ¶ added in v1.3.5
type MetricCompactionBoxerProcessor struct {
// contains filtered or unexported fields
}
MetricCompactionBoxerProcessor implements the Processor interface for boxing metric compaction bundles
func (*MetricCompactionBoxerProcessor) GetTargetRecordCount ¶ added in v1.3.5
func (b *MetricCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64
GetTargetRecordCount returns the estimated record count for the frequency
func (*MetricCompactionBoxerProcessor) Process ¶ added in v1.3.5
func (b *MetricCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.CompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the compaction topic
func (*MetricCompactionBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *MetricCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.MetricCompactionMessage) bool
ShouldEmitImmediately returns false - metric compaction always uses normal grouping.
type MetricCompactionConsumer ¶ added in v1.3.4
type MetricCompactionConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
MetricCompactionConsumer handles metric compaction bundles from the work queue
func NewMetricCompactionConsumer ¶ added in v1.3.4
func NewMetricCompactionConsumer( ctx context.Context, cfg *config.Config, store MetricCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*MetricCompactionConsumer, error)
NewMetricCompactionConsumer creates a new metric compaction consumer that processes bundles from the work queue
type MetricCompactionProcessor ¶ added in v1.3.4
type MetricCompactionProcessor struct {
// contains filtered or unexported fields
}
MetricCompactionProcessor implements compaction processing for metrics
func NewMetricCompactionProcessor ¶ added in v1.3.5
func NewMetricCompactionProcessor( store MetricCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, cfg *config.Config, ) *MetricCompactionProcessor
NewMetricCompactionProcessor creates a new metric compaction processor
func (*MetricCompactionProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *MetricCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.CompactionKey) int64
GetTargetRecordCount returns the target record count for a grouping key
func (*MetricCompactionProcessor) ProcessBundle ¶ added in v1.3.5
func (p *MetricCompactionProcessor) ProcessBundle(ctx context.Context, key messages.CompactionKey, msgs []*messages.MetricCompactionMessage, partition int32, offset int64) error
ProcessBundle processes a compaction bundle directly (simplified interface)
func (*MetricCompactionProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *MetricCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue processes a metric compaction bundle from the work queue
func (*MetricCompactionProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *MetricCompactionProcessor) ShouldEmitImmediately(msg *messages.MetricCompactionMessage) bool
ShouldEmitImmediately returns false - metric compaction always uses normal grouping.
type MetricCompactionStore ¶ added in v1.3.4
type MetricCompactionStore interface {
workqueue.DB
GetMetricSeg(ctx context.Context, params lrdb.GetMetricSegParams) (lrdb.MetricSeg, error)
CompactMetricSegments(ctx context.Context, params lrdb.CompactMetricSegsParams) error
MarkMetricSegsCompactedByKeys(ctx context.Context, params lrdb.MarkMetricSegsCompactedByKeysParams) error
GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}
MetricCompactionStore defines database operations needed for metric compaction
type MetricFileMetadata ¶ added in v1.9.0
type MetricFileMetadata struct {
StartTs int64 // Inclusive start timestamp in ms
EndTs int64 // Exclusive end timestamp in ms (lastTs + 1)
Hour int16 // Hour of day for the first timestamp
Fingerprints []int64 // Fingerprints for indexing
MetricNames []string // Unique metric names, sorted
MetricTypes []int16 // Metric types parallel to MetricNames
LabelNameMap []byte // JSON map of label columns
}
MetricFileMetadata contains metadata extracted from the parquet file
type MetricIngestBoxerConsumer ¶ added in v1.4.5
type MetricIngestBoxerConsumer struct {
*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}
MetricIngestBoxerConsumer handles metric ingestion bundling using CommonConsumer
func NewMetricIngestBoxerConsumer ¶ added in v1.4.5
func NewMetricIngestBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*MetricIngestBoxerConsumer, error)
NewMetricIngestBoxerConsumer creates a new metric ingestion boxer consumer using the common consumer framework
type MetricIngestBoxerProcessor ¶ added in v1.4.5
type MetricIngestBoxerProcessor struct {
// contains filtered or unexported fields
}
MetricIngestBoxerProcessor implements the Processor interface for boxing metric ingestion bundles
func (*MetricIngestBoxerProcessor) GetTargetRecordCount ¶ added in v1.4.5
func (b *MetricIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit for metric ingestion batching
func (*MetricIngestBoxerProcessor) Process ¶ added in v1.4.5
func (b *MetricIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the ingestion processing topic
func (*MetricIngestBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *MetricIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns false - metric ingest always uses normal grouping.
type MetricIngestConsumer ¶ added in v1.3.4
type MetricIngestConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
MetricIngestConsumer handles metric ingest bundles from the work queue
func NewMetricIngestConsumer ¶ added in v1.3.4
func NewMetricIngestConsumer( ctx context.Context, cfg *config.Config, kafkaFactory *fly.Factory, store MetricIngestStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*MetricIngestConsumer, error)
NewMetricIngestConsumer creates a new metric ingest consumer that processes bundles from the work queue
type MetricIngestDuckDBResult ¶ added in v1.9.0
type MetricIngestDuckDBResult struct {
DateintBins map[int32]*DateintBinResult
TotalRows int64
// FailedPartitions tracks dateint keys that failed processing with their errors.
// Processing uses best-effort strategy: failures are recorded but don't stop
// processing of other partitions. Callers should check this to detect partial results.
FailedPartitions map[int64]error
}
MetricIngestDuckDBResult contains the results of DuckDB-based metric ingestion
type MetricIngestProcessor ¶ added in v1.3.4
type MetricIngestProcessor struct {
// contains filtered or unexported fields
}
MetricIngestProcessor implements the Processor interface for raw metric ingestion
func (*MetricIngestProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *MetricIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit (20MB) for accumulation
func (*MetricIngestProcessor) ProcessBundle ¶ added in v1.4.5
func (p *MetricIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error
ProcessBundle implements the Processor interface and performs raw metric ingestion
func (*MetricIngestProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *MetricIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration
func (*MetricIngestProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *MetricIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns false - metric ingest always uses normal grouping.
type MetricIngestStore ¶ added in v1.3.4
type MetricIngestStore interface {
workqueue.DB
InsertMetricSegmentsBatch(ctx context.Context, segments []lrdb.InsertMetricSegmentParams) error
GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}
MetricIngestStore defines database operations needed for metric ingestion
type MetricRollupBoxerConsumer ¶ added in v1.3.5
type MetricRollupBoxerConsumer struct {
*CommonConsumer[*messages.MetricRollupMessage, messages.RollupKey]
}
MetricRollupBoxerConsumer handles metric rollup bundling using CommonConsumer
func NewMetricBoxerConsumer ¶ added in v1.3.5
func NewMetricBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*MetricRollupBoxerConsumer, error)
NewMetricBoxerConsumer creates a new metric rollup boxer consumer using the common consumer framework
type MetricRollupBoxerProcessor ¶ added in v1.3.5
type MetricRollupBoxerProcessor struct {
// contains filtered or unexported fields
}
MetricRollupBoxerProcessor implements the Processor interface for boxing metric rollup bundles
func (*MetricRollupBoxerProcessor) GetTargetRecordCount ¶ added in v1.3.5
func (b *MetricRollupBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64
GetTargetRecordCount returns the estimated record count for the target frequency
func (*MetricRollupBoxerProcessor) Process ¶ added in v1.3.5
func (b *MetricRollupBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.RollupKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the rollup topic
func (*MetricRollupBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *MetricRollupBoxerProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool
ShouldEmitImmediately returns false - metric rollup always uses normal grouping.
type MetricRollupConsumer ¶ added in v1.3.4
type MetricRollupConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
MetricRollupConsumer handles metric rollup bundles from the work queue
func NewMetricRollupConsumer ¶ added in v1.3.4
func NewMetricRollupConsumer( ctx context.Context, cfg *config.Config, kafkaFactory *fly.Factory, store MetricRollupStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*MetricRollupConsumer, error)
NewMetricRollupConsumer creates a new metric rollup consumer that processes bundles from the work queue
type MetricRollupProcessor ¶ added in v1.3.4
type MetricRollupProcessor struct {
// contains filtered or unexported fields
}
MetricRollupProcessor implements the Processor interface for metric rollups
func (*MetricRollupProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (r *MetricRollupProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.RollupKey) int64
GetTargetRecordCount returns the target record count for a rollup grouping key
func (*MetricRollupProcessor) ProcessBundle ¶ added in v1.3.5
func (r *MetricRollupProcessor) ProcessBundle(ctx context.Context, bundle *messages.MetricRollupBundle, partition int32, offset int64) error
ProcessBundle processes a MetricRollupBundle directly (simplified interface)
func (*MetricRollupProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (r *MetricRollupProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration
func (*MetricRollupProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (r *MetricRollupProcessor) ShouldEmitImmediately(msg *messages.MetricRollupMessage) bool
ShouldEmitImmediately returns false - metric rollup always uses normal grouping.
type MetricRollupStore ¶ added in v1.3.5
type MetricRollupStore interface {
workqueue.DB
GetMetricSeg(ctx context.Context, params lrdb.GetMetricSegParams) (lrdb.MetricSeg, error)
RollupMetricSegments(ctx context.Context, sourceParams lrdb.RollupSourceParams, targetParams lrdb.RollupTargetParams, sourceSegmentIDs []int64, newRecords []lrdb.RollupNewRecord) error
KafkaOffsetsAfter(ctx context.Context, params lrdb.KafkaOffsetsAfterParams) ([]int64, error)
CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
InsertKafkaOffsets(ctx context.Context, params lrdb.InsertKafkaOffsetsParams) error
GetMetricEstimate(ctx context.Context, orgID uuid.UUID, frequencyMs int32) int64
}
MetricRollupStore defines database operations needed for rollups
type OffsetTrackerStore ¶ added in v1.3.6
type OffsetTrackerStore interface {
KafkaOffsetsAfter(ctx context.Context, params lrdb.KafkaOffsetsAfterParams) ([]int64, error)
CleanupKafkaOffsets(ctx context.Context, params lrdb.CleanupKafkaOffsetsParams) (int64, error)
InsertKafkaOffsets(ctx context.Context, params lrdb.InsertKafkaOffsetsParams) error
}
OffsetTrackerStore defines the interface for sync mode offset tracking
type ParquetLogTranslatingReader ¶ added in v1.5.0
type ParquetLogTranslatingReader struct {
// contains filtered or unexported fields
}
ParquetLogTranslatingReader wraps a reader and translates parquet logs to CardinalHQ format. It properly transforms the schema to reflect row transformations (resource_ prefixing, etc.)
func NewParquetLogTranslatingReader ¶ added in v1.5.0
func NewParquetLogTranslatingReader(wrapped filereader.Reader, orgID, bucket, objectID string) *ParquetLogTranslatingReader
NewParquetLogTranslatingReader creates a reader that translates parquet logs
func (*ParquetLogTranslatingReader) Close ¶ added in v1.5.0
func (r *ParquetLogTranslatingReader) Close() error
Close closes the wrapped reader
func (*ParquetLogTranslatingReader) GetSchema ¶ added in v1.5.0
func (r *ParquetLogTranslatingReader) GetSchema() *filereader.ReaderSchema
GetSchema returns the transformed schema
func (*ParquetLogTranslatingReader) Next ¶ added in v1.5.0
func (r *ParquetLogTranslatingReader) Next(ctx context.Context) (*filereader.Batch, error)
Next reads and translates the next batch of rows
func (*ParquetLogTranslatingReader) TotalRowsReturned ¶ added in v1.5.0
func (r *ParquetLogTranslatingReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows successfully returned
type QueueWorkerConsumer ¶ added in v1.6.0
type QueueWorkerConsumer struct {
// contains filtered or unexported fields
}
QueueWorkerConsumer is a consumer that pulls work from the PostgreSQL work queue
func NewQueueWorkerConsumer ¶ added in v1.6.0
func NewQueueWorkerConsumer( manager *workqueue.Manager, processor BundleProcessor, taskName string, concurrency int, ) *QueueWorkerConsumer
NewQueueWorkerConsumer creates a new queue-based worker consumer. Concurrency defaults to runtime.NumCPU() if not specified or if 0 is passed.
func (*QueueWorkerConsumer) Close ¶ added in v1.6.0
func (c *QueueWorkerConsumer) Close() error
Close closes the consumer
type TraceCompactionBoxerConsumer ¶ added in v1.3.5
type TraceCompactionBoxerConsumer struct {
*CommonConsumer[*messages.TraceCompactionMessage, messages.TraceCompactionKey]
}
TraceCompactionBoxerConsumer handles trace compaction bundling using CommonConsumer
func NewTraceCompactionBoxerConsumer ¶ added in v1.3.5
func NewTraceCompactionBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*TraceCompactionBoxerConsumer, error)
NewTraceCompactionBoxerConsumer creates a new trace compaction boxer consumer using the common consumer framework
type TraceCompactionBoxerProcessor ¶ added in v1.3.5
type TraceCompactionBoxerProcessor struct {
// contains filtered or unexported fields
}
TraceCompactionBoxerProcessor implements the Processor interface for boxing trace compaction bundles
func (*TraceCompactionBoxerProcessor) GetTargetRecordCount ¶ added in v1.3.5
func (b *TraceCompactionBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64
GetTargetRecordCount returns the estimated record count for traces
func (*TraceCompactionBoxerProcessor) Process ¶ added in v1.3.5
func (b *TraceCompactionBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.TraceCompactionKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the compaction topic
func (*TraceCompactionBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *TraceCompactionBoxerProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool
ShouldEmitImmediately returns false - trace compaction always uses normal grouping.
type TraceCompactionConsumer ¶ added in v1.3.4
type TraceCompactionConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
TraceCompactionConsumer handles trace compaction bundles from the work queue
func NewTraceCompactionConsumer ¶ added in v1.3.4
func NewTraceCompactionConsumer( ctx context.Context, cfg *config.Config, store TraceCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*TraceCompactionConsumer, error)
NewTraceCompactionConsumer creates a new trace compaction consumer that processes bundles from the work queue
type TraceCompactionProcessor ¶ added in v1.3.4
type TraceCompactionProcessor struct {
// contains filtered or unexported fields
}
TraceCompactionProcessor implements compaction processing for traces
func NewTraceCompactionProcessor ¶ added in v1.3.5
func NewTraceCompactionProcessor( store TraceCompactionStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, cfg *config.Config, ) *TraceCompactionProcessor
NewTraceCompactionProcessor creates a new trace compaction processor
func (*TraceCompactionProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *TraceCompactionProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.TraceCompactionKey) int64
GetTargetRecordCount returns the target record count for a grouping key
func (*TraceCompactionProcessor) ProcessBundle ¶ added in v1.3.5
func (p *TraceCompactionProcessor) ProcessBundle(ctx context.Context, key messages.TraceCompactionKey, msgs []*messages.TraceCompactionMessage, partition int32, offset int64) error
ProcessBundle processes a bundle of trace compaction messages directly
func (*TraceCompactionProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *TraceCompactionProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue processes a trace compaction bundle from the work queue
func (*TraceCompactionProcessor) ProcessWork ¶ added in v1.3.5
func (p *TraceCompactionProcessor) ProcessWork( ctx context.Context, tmpDir string, storageClient cloudstorage.Client, storageProfile storageprofile.StorageProfile, key messages.TraceCompactionKey, msgs []*messages.TraceCompactionMessage, partition int32, offset int64, ) error
ProcessWork handles the trace-specific work logic
func (*TraceCompactionProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *TraceCompactionProcessor) ShouldEmitImmediately(msg *messages.TraceCompactionMessage) bool
ShouldEmitImmediately returns false - trace compaction always uses normal grouping.
type TraceCompactionStore ¶ added in v1.3.4
type TraceCompactionStore interface {
workqueue.DB
GetTraceSeg(ctx context.Context, params lrdb.GetTraceSegParams) (lrdb.TraceSeg, error)
CompactTraceSegments(ctx context.Context, params lrdb.CompactTraceSegsParams) error
MarkTraceSegsCompactedByKeys(ctx context.Context, params lrdb.MarkTraceSegsCompactedByKeysParams) error
GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
}
TraceCompactionStore defines database operations needed for trace compaction
type TraceDateintBin ¶ added in v1.3.4
type TraceDateintBin struct {
Dateint int32 // The dateint for this bin
Writer parquetwriter.ParquetWriter
Results []parquetwriter.Result // Results after writer is closed (can be multiple files)
}
TraceDateintBin represents a file group containing traces for a specific dateint
type TraceDateintBinManager ¶ added in v1.3.4
type TraceDateintBinManager struct {
// contains filtered or unexported fields
}
TraceDateintBinManager manages multiple file groups, one per dateint
type TraceDateintBinResult ¶ added in v1.9.1
type TraceDateintBinResult struct {
Dateint int32
OutputFile string // Local parquet file path
RecordCount int64
FileSize int64
Metadata *TraceFileMetadata
}
TraceDateintBinResult contains the result for a single dateint partition
type TraceFileMetadata ¶ added in v1.9.1
type TraceFileMetadata struct {
StartTs int64 // Inclusive start timestamp in ms
EndTs int64 // Exclusive end timestamp in ms (lastTs + 1)
Hour int16 // Hour of day for the first timestamp
Fingerprints []int64 // Span fingerprints for indexing
LabelNameMap []byte // JSON map of label columns
}
TraceFileMetadata contains metadata extracted from the trace parquet file
type TraceIDTimestampSortKey ¶ added in v1.3.4
TraceIDTimestampSortKey represents a sort key based on trace_id first, then timestamp
func (*TraceIDTimestampSortKey) Compare ¶ added in v1.3.4
func (k *TraceIDTimestampSortKey) Compare(other filereader.SortKey) int
Compare implements filereader.SortKey interface for TraceIDTimestampSortKey
func (*TraceIDTimestampSortKey) Release ¶ added in v1.3.4
func (k *TraceIDTimestampSortKey) Release()
Release returns the TraceIDTimestampSortKey to the pool for reuse
type TraceIDTimestampSortKeyProvider ¶ added in v1.3.4
type TraceIDTimestampSortKeyProvider struct{}
TraceIDTimestampSortKeyProvider creates TraceIDTimestampSortKey instances from rows
func (*TraceIDTimestampSortKeyProvider) MakeKey ¶ added in v1.3.4
func (p *TraceIDTimestampSortKeyProvider) MakeKey(row pipeline.Row) filereader.SortKey
MakeKey implements filereader.SortKeyProvider interface for trace ID + timestamp sorting
type TraceIngestBoxerConsumer ¶ added in v1.4.5
type TraceIngestBoxerConsumer struct {
*CommonConsumer[*messages.ObjStoreNotificationMessage, messages.IngestKey]
}
TraceIngestBoxerConsumer handles trace ingestion bundling using CommonConsumer
func NewTraceIngestBoxerConsumer ¶ added in v1.4.5
func NewTraceIngestBoxerConsumer( ctx context.Context, cfg *config.Config, store BoxerStore, factory *fly.Factory, ) (*TraceIngestBoxerConsumer, error)
NewTraceIngestBoxerConsumer creates a new trace ingestion boxer consumer using the common consumer framework
type TraceIngestBoxerProcessor ¶ added in v1.4.5
type TraceIngestBoxerProcessor struct {
// contains filtered or unexported fields
}
TraceIngestBoxerProcessor implements the Processor interface for boxing trace ingestion bundles
func (*TraceIngestBoxerProcessor) GetTargetRecordCount ¶ added in v1.4.5
func (b *TraceIngestBoxerProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit for trace ingestion batching
func (*TraceIngestBoxerProcessor) Process ¶ added in v1.4.5
func (b *TraceIngestBoxerProcessor) Process(ctx context.Context, group *accumulationGroup[messages.IngestKey], kafkaOffsets []lrdb.KafkaOffsetInfo) error
Process implements the Processor interface and sends the bundle to the ingestion processing topic
func (*TraceIngestBoxerProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (b *TraceIngestBoxerProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns false - trace ingest always uses normal grouping.
type TraceIngestConsumer ¶ added in v1.3.4
type TraceIngestConsumer struct {
*QueueWorkerConsumer
// contains filtered or unexported fields
}
TraceIngestConsumer handles trace ingest bundles from the work queue
func NewTraceIngestConsumer ¶ added in v1.3.4
func NewTraceIngestConsumer( ctx context.Context, cfg *config.Config, kafkaFactory *fly.Factory, store TraceIngestStore, storageProvider storageprofile.StorageProfileProvider, cmgr cloudstorage.ClientProvider, ) (*TraceIngestConsumer, error)
NewTraceIngestConsumer creates a new trace ingest consumer that processes bundles from the work queue
type TraceIngestDuckDBResult ¶ added in v1.9.1
type TraceIngestDuckDBResult struct {
DateintBins map[int32]*TraceDateintBinResult
TotalRows int64
// FailedPartitions tracks dateint keys that failed processing with their errors.
FailedPartitions map[int64]error
}
TraceIngestDuckDBResult contains the results of DuckDB-based trace ingestion
type TraceIngestProcessor ¶ added in v1.3.4
type TraceIngestProcessor struct {
// contains filtered or unexported fields
}
TraceIngestProcessor implements the Processor interface for raw trace ingestion
func (*TraceIngestProcessor) GetTargetRecordCount ¶ added in v1.3.4
func (p *TraceIngestProcessor) GetTargetRecordCount(ctx context.Context, groupingKey messages.IngestKey) int64
GetTargetRecordCount returns the target file size limit (5MB) for accumulation
func (*TraceIngestProcessor) ProcessBundle ¶ added in v1.4.5
func (p *TraceIngestProcessor) ProcessBundle(ctx context.Context, key messages.IngestKey, msgs []*messages.ObjStoreNotificationMessage, partition int32, offset int64) error
ProcessBundle implements the Processor interface and performs raw trace ingestion
func (*TraceIngestProcessor) ProcessBundleFromQueue ¶ added in v1.6.0
func (p *TraceIngestProcessor) ProcessBundleFromQueue(ctx context.Context, workItem workqueue.Workable) error
ProcessBundleFromQueue implements the BundleProcessor interface for work queue integration
func (*TraceIngestProcessor) ShouldEmitImmediately ¶ added in v1.7.0
func (p *TraceIngestProcessor) ShouldEmitImmediately(msg *messages.ObjStoreNotificationMessage) bool
ShouldEmitImmediately returns false - trace ingest always uses normal grouping.
type TraceIngestStore ¶ added in v1.3.4
type TraceIngestStore interface {
workqueue.DB
InsertTraceSegmentsBatch(ctx context.Context, segments []lrdb.InsertTraceSegmentParams) error
GetTraceEstimate(ctx context.Context, orgID uuid.UUID) int64
}
TraceIngestStore defines database operations needed for trace ingestion
type TraceTranslator ¶ added in v1.3.4
type TraceTranslator struct {
// contains filtered or unexported fields
}
TraceTranslator adds resource metadata to trace rows
func NewTraceTranslator ¶ added in v1.3.4
func NewTraceTranslator(orgID, bucket, objectID string) *TraceTranslator
NewTraceTranslator creates a new TraceTranslator with the specified metadata
func (*TraceTranslator) TranslateRow ¶ added in v1.3.4
TranslateRow adds resource fields to each row
Source Files
¶
- boxer_manager.go
- cleanup_helper.go
- common_consumer.go
- errors.go
- gatherer.go
- hunter.go
- interfaces.go
- log_compaction_boxer_consumer.go
- log_compaction_boxer_processor.go
- log_compaction_consumer.go
- log_compaction_processor.go
- log_ingest_boxer_consumer.go
- log_ingest_boxer_processor.go
- log_translating_reader.go
- log_translator.go
- logs_ingest_consumer.go
- logs_ingest_duckdb.go
- logs_ingest_processor.go
- metadata_tracker.go
- metric_compaction_boxer_consumer.go
- metric_compaction_boxer_processor.go
- metric_compaction_consumer.go
- metric_compaction_processor.go
- metric_ingest_boxer_consumer.go
- metric_ingest_boxer_processor.go
- metric_ingest_consumer.go
- metric_ingest_duckdb.go
- metric_ingest_processor.go
- metric_rollup_boxer_consumer.go
- metric_rollup_boxer_processor.go
- metric_rollup_consumer.go
- metric_rollup_processor.go
- offset_tracker.go
- parquet_log_translator.go
- queue_worker_consumer.go
- telemetry.go
- test_helpers.go
- trace_compaction_boxer_consumer.go
- trace_compaction_boxer_processor.go
- trace_compaction_consumer.go
- trace_compaction_processor.go
- trace_ingest_boxer_consumer.go
- trace_ingest_boxer_processor.go
- trace_ingest_consumer.go
- trace_ingest_duckdb.go
- trace_ingest_processor.go
- writer_logs.go
- writer_logs_duckdb.go
- writer_metrics_duckdb.go
- writer_traces_duckdb.go