ingest

package
v0.2.0-beta.3
Published: May 3, 2026 License: MIT Imports: 32 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrQueueFull = errors.New("ingest pipeline at capacity")

ErrQueueFull is returned by Submit when the queue is at hard capacity (100% full). Callers should map this to gRPC RESOURCE_EXHAUSTED or HTTP 429 with a Retry-After hint so OTLP clients back off cleanly.

Functions

func ParseSeverity

func ParseSeverity(level string) int

ParseSeverity is the exported wrapper for parseSeverity. Used by main.go to translate the STORE_MIN_SEVERITY env value into the integer rank the pipeline's second-tier filter expects.
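A plausible sketch of the rank mapping, using the endpoints documented under SetStoreMinSeverity ("DEBUG"=10 .. "FATAL"=50). The intermediate ranks for INFO, WARN, and ERROR are assumptions; the real parseSeverity is unexported:

```go
package main

import (
	"fmt"
	"strings"
)

// parseSeverityRank sketches the rank mapping described in the docs.
// Only the DEBUG=10 and FATAL=50 endpoints are documented; the values
// for INFO, WARN, and ERROR in between are assumed.
func parseSeverityRank(level string) int {
	switch strings.ToUpper(strings.TrimSpace(level)) {
	case "DEBUG":
		return 10
	case "INFO":
		return 20
	case "WARN", "WARNING":
		return 30
	case "ERROR":
		return 40
	case "FATAL":
		return 50
	default:
		return 0 // unknown level: rank 0, i.e. filtering disabled
	}
}

func main() {
	fmt.Println(parseSeverityRank("warn")) // 30
}
```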

Types

type Batch

type Batch struct {
	Type   SignalType
	Tenant string

	Traces []storage.Trace
	Spans  []storage.Span
	Logs   []storage.Log

	// Priority flags. Errors and slow traces are protected from soft
	// backpressure drops — they may still be rejected at hard capacity.
	HasError bool
	HasSlow  bool

	// Optional per-record callbacks invoked after a successful DB write.
	// In production these feed GraphRAG ingestion. Nil callbacks are
	// skipped silently.
	SpanCallback func(storage.Span)
	LogCallback  func(storage.Log)
	// contains filtered or unexported fields
}

Batch is the unit of work flowing through the async ingest Pipeline. One Batch corresponds to the persistable output of a single OTLP Export() call. Trace insertion ordering (Traces → Spans → Logs) is honored by the worker that processes the batch — packaging the three slices together preserves the FK invariant the synchronous path already enforces.

func (*Batch) Priority

func (b *Batch) Priority() bool

Priority reports whether the batch is protected from soft-backpressure drops. Used by Submit() to decide whether to enqueue once queue fullness reaches the soft threshold (90% by default).

type HTTPHandler

type HTTPHandler struct {
	// contains filtered or unexported fields
}

HTTPHandler provides HTTP OTLP endpoints that delegate to the existing gRPC Export() methods.

func NewHTTPHandler

func NewHTTPHandler(traces *TraceServer, logs *LogsServer, metrics *MetricsServer) *HTTPHandler

NewHTTPHandler creates an HTTP OTLP handler wrapping the existing gRPC servers.

func (*HTTPHandler) RegisterRoutes

func (h *HTTPHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers the HTTP OTLP endpoints on the given mux.

func (*HTTPHandler) SetMaxBodyBytes

func (h *HTTPHandler) SetMaxBodyBytes(n int64)

SetMaxBodyBytes configures the maximum pre-decompression request body size.

func (*HTTPHandler) SetMaxDecompressedBytes

func (h *HTTPHandler) SetMaxDecompressedBytes(n int64)

SetMaxDecompressedBytes configures the maximum post-gzip body size. Rejecting larger inputs defends against compression bombs from authed clients.

func (*HTTPHandler) SetThrottleCallback

func (h *HTTPHandler) SetThrottleCallback(fn func(signal string))

SetThrottleCallback wires a per-signal counter that increments every time a 429 is returned because the async ingest pipeline is at capacity. Used by main.go to feed `otelcontext_http_otlp_throttled_total{signal=…}`.

type LogsServer

type LogsServer struct {
	collogspb.UnimplementedLogsServiceServer
	// contains filtered or unexported fields
}

func NewLogsServer

func NewLogsServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *LogsServer

func (*LogsServer) Export

func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error)

Export handles incoming OTLP log data.

func (*LogsServer) SetLogCallback

func (s *LogsServer) SetLogCallback(cb func(storage.Log))

SetLogCallback sets the function to call when a new log is received.

func (*LogsServer) SetPipeline

func (s *LogsServer) SetPipeline(p *Pipeline)

SetPipeline enables the async ingest pipeline for log export. Same semantics as TraceServer.SetPipeline.

type MetricsServer

type MetricsServer struct {
	colmetricspb.UnimplementedMetricsServiceServer
	// contains filtered or unexported fields
}

func NewMetricsServer

func NewMetricsServer(repo *storage.Repository, metrics *telemetry.Metrics, aggregator *tsdb.Aggregator, cfg *config.Config) *MetricsServer

func (*MetricsServer) Export

func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error)

Export handles incoming OTLP metrics data.

func (*MetricsServer) SetMetricCallback

func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric))

SetMetricCallback sets the function to call when a new metric point is received.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline decouples OTLP Export() from synchronous DB writes. It holds a bounded buffered channel of Batches, a worker pool that drains the channel into the Repository, and Prometheus instruments that surface queue depth, drop counts, and rejection counts.

Lifecycle:

p := NewPipeline(repo, metrics, cfg)
p.Start(ctx)
defer p.Stop()       // drains in-flight before returning
p.Submit(batch)

func NewPipeline

func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg PipelineConfig) *Pipeline

NewPipeline constructs a Pipeline with the given config, falling back to DefaultPipelineConfig() values for non-positive fields. The Pipeline does NOT start workers — call Start(ctx) when ready.

func (*Pipeline) SetPerTenantCap

func (p *Pipeline) SetPerTenantCap(n int)

SetPerTenantCap configures the maximum number of in-flight batches a single tenant may hold (queued plus currently being processed). 0 disables the cap. Once a tenant hits the cap, further healthy submissions from that tenant are dropped at Submit() time with reason "tenant_backpressure". Priority batches (errors/slow traces) bypass the cap.

Sized as a fraction of Capacity, e.g. Capacity/4 keeps any single tenant to 25% of queue capacity. Operators tune via INGEST_PIPELINE_PER_TENANT_CAP. Startup-only — call before Start().
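The behavior can be sketched as a per-tenant counter gate. `tenantGate`, its fields, and `admit` are illustrative names; the real state is unexported:

```go
package main

import "fmt"

// tenantGate sketches the per-tenant cap described above: healthy batches
// from a tenant already at the cap are dropped, priority batches bypass it.
// Names and structure are assumptions, not this package's internals.
type tenantGate struct {
	cap      int
	inFlight map[string]int // batches queued or in progress, per tenant
}

func (g *tenantGate) admit(tenant string, priority bool) bool {
	if g.cap <= 0 || priority { // 0 disables the cap; errors/slow traces bypass it
		g.inFlight[tenant]++
		return true
	}
	if g.inFlight[tenant] >= g.cap {
		return false // dropped with reason "tenant_backpressure"
	}
	g.inFlight[tenant]++
	return true
}

func main() {
	capacity := 1024
	g := &tenantGate{cap: capacity / 4, inFlight: map[string]int{}} // 25% of queue
	g.inFlight["noisy"] = capacity / 4                              // tenant at its cap
	fmt.Println(g.admit("noisy", false), g.admit("noisy", true))    // false true
}
```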

func (*Pipeline) SetStoreMinSeverity

func (p *Pipeline) SetStoreMinSeverity(level int)

SetStoreMinSeverity configures the second-tier severity gate applied at persist time. Logs below `level` are dropped from the BatchCreateAll write but still flow through the LogCallback so in-memory consumers (vectordb, GraphRAG Drain mining, anomaly correlation) keep working. 0 disables the second tier — every log surviving IngestMinSeverity at the receiver is also persisted (legacy behavior).

`level` is the integer rank from parseSeverity ("DEBUG"=10 .. "FATAL"=50). Startup-only — call before Start().

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context)

Start spawns the worker pool. It is safe to call more than once: subsequent calls are no-ops, which tests rely on for reset semantics.

func (*Pipeline) Stats

func (p *Pipeline) Stats() PipelineStats

Stats returns snapshot counters for tests and for telemetry that doesn't already use Prometheus instruments. Individual counters are read atomically, but the snapshot as a whole is not consistent — sufficient for diagnostics.

func (*Pipeline) Stop

func (p *Pipeline) Stop()

Stop signals workers to exit and blocks until in-flight batches have been drained from the channel. Idempotent.

func (*Pipeline) Submit

func (p *Pipeline) Submit(b *Batch) error

Submit enqueues a batch for asynchronous persistence. Returns nil when the batch is accepted (or silently dropped under soft backpressure) and ErrQueueFull when the queue is at hard capacity. Nil batches are no-ops.

Soft backpressure: when fullness >= SoftThreshold, healthy batches (Priority()==false) are dropped at the door and Submit returns nil so the OTLP client sees a successful Export. Errors and slow traces always continue to the channel.

Hard backpressure: when the channel send fails (buffer at 100%), Submit returns ErrQueueFull regardless of priority. The caller should translate this into a backpressure signal so the client retries with exponential backoff rather than tighter loops.
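The two tiers reduce to a threshold check plus a non-blocking channel send. This is a self-contained sketch with stand-in types, not the package's actual implementation (which also updates counters and metrics):

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("ingest pipeline at capacity")

// batch is a stand-in for ingest.Batch in this sketch.
type batch struct{ hasError, hasSlow bool }

func (b *batch) priority() bool { return b.hasError || b.hasSlow }

// submit sketches the documented soft/hard backpressure behavior against
// a bounded channel.
func submit(q chan *batch, soft float64, b *batch) error {
	if b == nil {
		return nil // nil batches are no-ops
	}
	// Soft backpressure: drop healthy batches at the door, report success.
	if !b.priority() && float64(len(q)) >= soft*float64(cap(q)) {
		return nil
	}
	// Hard backpressure: non-blocking send; a full buffer rejects everyone.
	select {
	case q <- b:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	q := make(chan *batch, 10)
	for i := 0; i < 9; i++ { // fill to 90%
		q <- &batch{}
	}
	fmt.Println(submit(q, 0.9, &batch{}))               // healthy, soft-dropped: nil
	fmt.Println(submit(q, 0.9, &batch{hasError: true})) // priority, enqueued: nil
	fmt.Println(submit(q, 0.9, &batch{hasError: true})) // queue now full: error
}
```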

func (*Pipeline) TenantDropped

func (p *Pipeline) TenantDropped() int64

TenantDropped reports the cumulative number of healthy submissions rejected because the submitting tenant was at the per-tenant cap. Distinct from RejectedFull (queue at hard capacity) and DroppedHealthy (soft-backpressure across the whole queue).

type PipelineConfig

type PipelineConfig struct {
	Capacity      int     // total queue depth across all signal types
	Workers       int     // worker goroutines draining the queue
	SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0)
}

PipelineConfig holds the tunables for a Pipeline.

func DefaultPipelineConfig

func DefaultPipelineConfig() PipelineConfig

DefaultPipelineConfig returns production-sized defaults.

type PipelineStats

type PipelineStats struct {
	Enqueued        int64
	Processed       int64
	DroppedHealthy  int64
	RejectedFull    int64
	ProcessFailures int64
	StoreFiltered   int64 // logs dropped by STORE_MIN_SEVERITY at persist time
	QueueDepth      int
	Capacity        int
}

PipelineStats is a snapshot of pipeline counters.

type Sampler

type Sampler struct {
	// contains filtered or unexported fields
}

Sampler decides whether a trace/span should be ingested. Always keeps: error traces, slow traces (duration > latencyThresholdMs), new services. Samples healthy traces at the configured rate using a per-service token bucket.
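The decision order above can be sketched as follows. Field names, the refill policy (fractional tokens accrued per healthy trace), and the new-service handling are assumptions; the real implementation is unexported:

```go
package main

import "fmt"

// sampler sketches the always-keep rules plus a per-service token bucket.
// Structure and refill policy are assumptions, not this package's internals.
type sampler struct {
	rate        float64 // tokens accrued per healthy trace, e.g. 0.1 keeps ~10%
	latencyMs   float64 // traces slower than this are always kept
	tokens      map[string]float64
	seenService map[string]bool
}

func newSampler(rate, latencyMs float64) *sampler {
	return &sampler{rate: rate, latencyMs: latencyMs,
		tokens: map[string]float64{}, seenService: map[string]bool{}}
}

func (s *sampler) shouldSample(service string, isError bool, durMs float64) bool {
	if isError || durMs > s.latencyMs { // always keep errors and slow traces
		return true
	}
	if !s.seenService[service] { // always keep the first trace of a new service
		s.seenService[service] = true
		return true
	}
	s.tokens[service] += s.rate // healthy traces draw from the token bucket
	if s.tokens[service] >= 1 {
		s.tokens[service]--
		return true
	}
	return false
}

func main() {
	s := newSampler(0.5, 500)
	kept := 0
	for i := 0; i < 10; i++ {
		if s.shouldSample("checkout", false, 20) {
			kept++
		}
	}
	fmt.Println(kept) // 5: the first trace plus roughly half the rest
}
```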

func NewSampler

func NewSampler(rate float64, alwaysOnErrors bool, latencyThresholdMs float64) *Sampler

NewSampler creates a Sampler with the given parameters.

func (*Sampler) ShouldSample

func (s *Sampler) ShouldSample(serviceName string, isError bool, durationMs float64) bool

ShouldSample reports whether the trace should be ingested. serviceName is the originating service; isError reports whether the trace/span has error status; durationMs is the trace duration in milliseconds.

func (*Sampler) Stats

func (s *Sampler) Stats() (int64, int64)

Stats returns (seen, dropped) counters for metrics.

type SignalType

type SignalType uint8

SignalType identifies the OTLP signal a Batch carries. The label is exported on pipeline metrics so operators can attribute drops.

const (
	SignalTraces SignalType = iota
	SignalLogs
)

type TraceServer

type TraceServer struct {
	coltracepb.UnimplementedTraceServiceServer
	// contains filtered or unexported fields
}

func NewTraceServer

func NewTraceServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *TraceServer

func (*TraceServer) Export

func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error)

Export handles incoming OTLP trace data.

func (*TraceServer) SetLogCallback

func (s *TraceServer) SetLogCallback(cb func(storage.Log))

SetLogCallback sets the function to call when a new log is synthesized from a trace.

func (*TraceServer) SetPipeline

func (s *TraceServer) SetPipeline(p *Pipeline)

SetPipeline enables the async ingest pipeline. When set, Export() returns to the caller as soon as the parsed batch is enqueued (or rejected), and persistence runs on the pipeline's worker pool. Pass nil to revert to the synchronous DB-write path.

func (*TraceServer) SetSampler

func (s *TraceServer) SetSampler(sm *Sampler)

SetSampler enables adaptive trace sampling. Pass nil to disable.

func (*TraceServer) SetSpanCallback

func (s *TraceServer) SetSpanCallback(cb func(storage.Span))

SetSpanCallback sets the function to call when spans are persisted.
