Documentation ¶
Index ¶
- Variables
- type Batch
- type HTTPHandler
  - func NewHTTPHandler(traces *TraceServer, logs *LogsServer, metrics *MetricsServer) *HTTPHandler
  - func (h *HTTPHandler) RegisterRoutes(mux *http.ServeMux)
  - func (h *HTTPHandler) SetMaxBodyBytes(n int64)
  - func (h *HTTPHandler) SetMaxDecompressedBytes(n int64)
  - func (h *HTTPHandler) SetThrottleCallback(fn func(signal string))
- type LogsServer
  - func NewLogsServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *LogsServer
  - func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error)
  - func (s *LogsServer) SetLogCallback(cb func(storage.Log))
  - func (s *LogsServer) SetPipeline(p *Pipeline)
- type MetricsServer
  - func NewMetricsServer(repo *storage.Repository, metrics *telemetry.Metrics, aggregator *tsdb.Aggregator, cfg *config.Config) *MetricsServer
  - func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error)
  - func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric))
- type Pipeline
  - func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg PipelineConfig) *Pipeline
  - func (p *Pipeline) SetPerTenantCap
  - func (p *Pipeline) Start
  - func (p *Pipeline) Stats() PipelineStats
  - func (p *Pipeline) Stop()
  - func (p *Pipeline) Submit
  - func (p *Pipeline) TenantDropped
- type PipelineConfig
  - func DefaultPipelineConfig() PipelineConfig
- type PipelineStats
- type Sampler
  - func NewSampler
  - func (*Sampler) ShouldSample
- type SignalType
- type TraceServer
  - func NewTraceServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *TraceServer
  - func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error)
  - func (s *TraceServer) SetLogCallback(cb func(storage.Log))
  - func (s *TraceServer) SetPipeline(p *Pipeline)
  - func (s *TraceServer) SetSampler(sm *Sampler)
  - func (s *TraceServer) SetSpanCallback(cb func(storage.Span))
Constants ¶
This section is empty.
Variables ¶
var ErrQueueFull = errors.New("ingest pipeline at capacity")
ErrQueueFull is returned by Submit when the queue is at hard capacity (100% full). Callers should map this to gRPC RESOURCE_EXHAUSTED or HTTP 429 with a Retry-After hint so OTLP clients back off cleanly.
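For the HTTP path, the mapping might look like the following self-contained sketch. The sentinel is redeclared locally so the snippet runs on its own, and statusFor plus the Retry-After value of "1" are hypothetical, not part of this package:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-in for the package's sentinel, so the sketch is self-contained.
var ErrQueueFull = errors.New("ingest pipeline at capacity")

// statusFor translates a Submit error into an HTTP status code and an
// optional Retry-After hint, per the recommendation above. Hypothetical helper.
func statusFor(err error) (code int, retryAfter string) {
	if errors.Is(err, ErrQueueFull) {
		return http.StatusTooManyRequests, "1" // seconds; tune for your deployment
	}
	if err != nil {
		return http.StatusInternalServerError, ""
	}
	return http.StatusOK, ""
}

func main() {
	code, ra := statusFor(ErrQueueFull)
	fmt.Println(code, ra) // 429 1
}
```

Using errors.Is keeps the mapping correct even if the pipeline later wraps the sentinel with context.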
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
Type SignalType
Tenant string
Traces []storage.Trace
Spans []storage.Span
Logs []storage.Log
// Priority flags. Errors and slow traces are protected from soft
// backpressure drops — they may still be rejected at hard capacity.
HasError bool
HasSlow bool
// Optional per-record callbacks invoked after a successful DB write.
// In production these feed GraphRAG ingestion. Nil callbacks are
// skipped silently.
SpanCallback func(storage.Span)
LogCallback func(storage.Log)
// contains filtered or unexported fields
}
Batch is the unit of work flowing through the async ingest Pipeline. One Batch corresponds to the persistable output of a single OTLP Export() call. Trace insertion ordering (Traces → Spans → Logs) is honored by the worker that processes the batch — packaging the three slices together preserves the FK invariant the synchronous path already enforces.
type HTTPHandler ¶
type HTTPHandler struct {
// contains filtered or unexported fields
}
HTTPHandler provides HTTP OTLP endpoints that delegate to the existing gRPC Export() methods.
func NewHTTPHandler ¶
func NewHTTPHandler(traces *TraceServer, logs *LogsServer, metrics *MetricsServer) *HTTPHandler
NewHTTPHandler creates an HTTP OTLP handler wrapping the existing gRPC servers.
func (*HTTPHandler) RegisterRoutes ¶
func (h *HTTPHandler) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers the HTTP OTLP endpoints on the given mux.
func (*HTTPHandler) SetMaxBodyBytes ¶
func (h *HTTPHandler) SetMaxBodyBytes(n int64)
SetMaxBodyBytes configures the maximum pre-decompression request body size.
func (*HTTPHandler) SetMaxDecompressedBytes ¶
func (h *HTTPHandler) SetMaxDecompressedBytes(n int64)
SetMaxDecompressedBytes configures the maximum post-gzip body size. Rejecting larger inputs defends against compression bombs from authed clients.
func (*HTTPHandler) SetThrottleCallback ¶
func (h *HTTPHandler) SetThrottleCallback(fn func(signal string))
SetThrottleCallback wires a per-signal counter that increments every time a 429 is returned because the async ingest pipeline is at capacity. Used by main.go to feed `otelcontext_http_otlp_throttled_total{signal=…}`.
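The callback main.go registers might resemble this toy per-signal counter; throttleCounters and its field names are illustrative, not what main.go actually defines:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// throttleCounters models a per-signal 429 counter of the kind a process
// might register via SetThrottleCallback to feed a Prometheus counter vec.
type throttleCounters struct {
	traces, logs, metrics atomic.Int64
}

// callback matches the func(signal string) shape SetThrottleCallback expects.
func (t *throttleCounters) callback(signal string) {
	switch signal {
	case "traces":
		t.traces.Add(1)
	case "logs":
		t.logs.Add(1)
	case "metrics":
		t.metrics.Add(1)
	}
}

func main() {
	var c throttleCounters
	c.callback("traces")
	c.callback("traces")
	c.callback("logs")
	fmt.Println(c.traces.Load(), c.logs.Load(), c.metrics.Load()) // 2 1 0
}
```

Atomic counters keep the callback safe to invoke from concurrent request handlers.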
type LogsServer ¶
type LogsServer struct {
collogspb.UnimplementedLogsServiceServer
// contains filtered or unexported fields
}
func NewLogsServer ¶
func NewLogsServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *LogsServer
func (*LogsServer) Export ¶
func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error)
Export handles incoming OTLP log data.
func (*LogsServer) SetLogCallback ¶
func (s *LogsServer) SetLogCallback(cb func(storage.Log))
SetLogCallback sets the function to call when a new log is received.
func (*LogsServer) SetPipeline ¶
func (s *LogsServer) SetPipeline(p *Pipeline)
SetPipeline enables the async ingest pipeline for log export. Same semantics as TraceServer.SetPipeline.
type MetricsServer ¶
type MetricsServer struct {
colmetricspb.UnimplementedMetricsServiceServer
// contains filtered or unexported fields
}
func NewMetricsServer ¶
func NewMetricsServer(repo *storage.Repository, metrics *telemetry.Metrics, aggregator *tsdb.Aggregator, cfg *config.Config) *MetricsServer
func (*MetricsServer) Export ¶
func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error)
Export handles incoming OTLP metrics data.
func (*MetricsServer) SetMetricCallback ¶
func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric))
SetMetricCallback sets the function to call when a new metric point is received.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline decouples OTLP Export() from synchronous DB writes. It holds a bounded buffered channel of Batches, a worker pool that drains the channel into the Repository, and Prometheus instruments that surface queue depth, drop counts, and rejection counts.
Lifecycle:
    p := NewPipeline(repo, metrics, cfg)
    p.Start(ctx)
    defer p.Stop() // drains in-flight before returning
    p.Submit(batch)
func NewPipeline ¶
func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg PipelineConfig) *Pipeline
NewPipeline constructs a Pipeline with the given config, falling back to DefaultPipelineConfig() values for non-positive fields. The Pipeline does NOT start workers — call Start(ctx) when ready.
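The fallback behavior can be sketched as below. PipelineConfig is mirrored locally so the snippet runs on its own, and the default values (1024/4/0.8) are placeholders, not the package's real defaults:

```go
package main

import "fmt"

// PipelineConfig mirrors the documented struct for a self-contained sketch.
type PipelineConfig struct {
	Capacity      int
	Workers       int
	SoftThreshold float64
}

// defaultPipelineConfig stands in for DefaultPipelineConfig(); the values
// here are illustrative only.
func defaultPipelineConfig() PipelineConfig {
	return PipelineConfig{Capacity: 1024, Workers: 4, SoftThreshold: 0.8}
}

// normalize shows the documented rule: non-positive fields fall back to defaults.
func normalize(cfg PipelineConfig) PipelineConfig {
	def := defaultPipelineConfig()
	if cfg.Capacity <= 0 {
		cfg.Capacity = def.Capacity
	}
	if cfg.Workers <= 0 {
		cfg.Workers = def.Workers
	}
	if cfg.SoftThreshold <= 0 {
		cfg.SoftThreshold = def.SoftThreshold
	}
	return cfg
}

func main() {
	// Only Workers is set; the other fields take defaults.
	fmt.Printf("%+v\n", normalize(PipelineConfig{Workers: 8})) // {Capacity:1024 Workers:8 SoftThreshold:0.8}
}
```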
func (*Pipeline) SetPerTenantCap ¶
SetPerTenantCap configures the maximum number of in-flight batches one tenant may hold (queued plus currently being processed). 0 disables the cap. Once a tenant hits the cap, further healthy submissions from that tenant are dropped at Submit() time with reason "tenant_backpressure". Priority batches (errors/slow traces) bypass the cap.
Sized as a fraction of Capacity, e.g. Capacity/4 keeps any single tenant to 25% of queue capacity. Operators tune via INGEST_PIPELINE_PER_TENANT_CAP. Startup-only — call before Start().
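The admission rule can be modeled with a per-tenant counter. This is a simplified stand-in for the pipeline's internal accounting; tenantGate, admit, and done are hypothetical names:

```go
package main

import "fmt"

// tenantGate is a toy model of the documented per-tenant cap: healthy
// submissions beyond the limit are rejected, priority ones bypass it.
type tenantGate struct {
	limit    int
	inFlight map[string]int
}

func newTenantGate(limit int) *tenantGate {
	return &tenantGate{limit: limit, inFlight: map[string]int{}}
}

// admit reports whether a batch from tenant may enter the queue.
func (g *tenantGate) admit(tenant string, priority bool) bool {
	if g.limit > 0 && !priority && g.inFlight[tenant] >= g.limit {
		return false // reason "tenant_backpressure" in the real pipeline
	}
	g.inFlight[tenant]++
	return true
}

// done releases a slot once the batch has been processed.
func (g *tenantGate) done(tenant string) { g.inFlight[tenant]-- }

func main() {
	g := newTenantGate(2)
	fmt.Println(g.admit("acme", false)) // true
	fmt.Println(g.admit("acme", false)) // true
	fmt.Println(g.admit("acme", false)) // false: at cap
	fmt.Println(g.admit("acme", true))  // true: priority bypasses the cap
}
```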
func (*Pipeline) Start ¶
Start spawns the worker pool. Only the first call takes effect; subsequent calls are no-ops, and tests rely on this for reset semantics.
func (*Pipeline) Stats ¶
func (p *Pipeline) Stats() PipelineStats
Stats returns snapshot counters for tests and for telemetry that doesn't already use Prometheus instruments. The values are best-effort and not synchronized across atomics — sufficient for diagnostics.
func (*Pipeline) Stop ¶
func (p *Pipeline) Stop()
Stop signals workers to exit and blocks until in-flight batches have been drained from the channel. Idempotent.
func (*Pipeline) Submit ¶
Submit enqueues a batch for asynchronous persistence. Returns nil when the batch is accepted (or silently dropped under soft backpressure) and ErrQueueFull when the queue is at hard capacity. Nil batches are no-ops.
Soft backpressure: when fullness >= SoftThreshold, healthy batches (Priority()==false) are dropped at the door and Submit returns nil so the OTLP client sees a successful Export. Errors and slow traces always continue to the channel.
Hard backpressure: when the channel send fails (buffer at 100%), Submit returns ErrQueueFull regardless of priority. The caller should translate this into a backpressure signal so the client retries with exponential backoff rather than tighter loops.
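The two regimes can be illustrated with a plain buffered channel. This is a simplified model of the documented semantics, not the package's implementation; submit and errQueueFull here are local stand-ins:

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("ingest pipeline at capacity")

// submit models Submit: above softThreshold, healthy batches are silently
// dropped (nil error, batch discarded); when the buffer is at 100%,
// every batch is rejected with errQueueFull.
func submit(q chan int, softThreshold float64, batch int, priority bool) error {
	if !priority && float64(len(q)) >= softThreshold*float64(cap(q)) {
		return nil // soft backpressure: drop healthy batch, report success
	}
	select {
	case q <- batch:
		return nil
	default:
		return errQueueFull // hard backpressure: buffer full
	}
}

func main() {
	q := make(chan int, 4)
	for i := 0; i < 3; i++ {
		_ = submit(q, 0.75, i, false) // fills to the soft threshold
	}
	fmt.Println(submit(q, 0.75, 9, false), len(q))        // <nil> 3: healthy batch dropped at the door
	fmt.Println(submit(q, 0.75, 9, true), len(q))         // <nil> 4: priority batch still admitted
	fmt.Println(submit(q, 0.75, 9, true) == errQueueFull) // true: hard capacity rejects everything
}
```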
func (*Pipeline) TenantDropped ¶
TenantDropped reports the cumulative number of healthy submissions rejected because the submitting tenant was at the per-tenant cap. Distinct from RejectedFull (queue at hard capacity) and DroppedHealthy (soft-backpressure across the whole queue).
type PipelineConfig ¶
type PipelineConfig struct {
Capacity int // total queue depth across all signal types
Workers int // worker goroutines draining the queue
SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0)
}
PipelineConfig holds the tunables for a Pipeline.
func DefaultPipelineConfig ¶
func DefaultPipelineConfig() PipelineConfig
DefaultPipelineConfig returns production-sized defaults.
type PipelineStats ¶
type PipelineStats struct {
Enqueued int64
Processed int64
DroppedHealthy int64
RejectedFull int64
ProcessFailures int64
QueueDepth int
Capacity int
}
PipelineStats is a snapshot of pipeline counters.
type Sampler ¶
type Sampler struct {
// contains filtered or unexported fields
}
Sampler decides whether a trace/span should be ingested. Always keeps: error traces, slow traces (duration > latencyThresholdMs), new services. Samples healthy traces at the configured rate using a per-service token bucket.
func NewSampler ¶
NewSampler creates a Sampler with the given parameters.
func (*Sampler) ShouldSample ¶
ShouldSample returns true if the trace should be ingested. isError: whether the trace/span has error status. durationMs: trace duration in milliseconds. serviceName: originating service.
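The decision order reads naturally as a chain of early returns. The sketch below models it with a per-service counter in place of the real token bucket; toySampler and all thresholds are illustrative, not this package's implementation:

```go
package main

import "fmt"

// toySampler illustrates the documented decision order. The token bucket is
// reduced to a simple per-service counter with no refill.
type toySampler struct {
	latencyThresholdMs float64
	tokens             map[string]int // per-service "buckets" (simplified)
}

func (s *toySampler) shouldSample(isError bool, durationMs float64, serviceName string) bool {
	if isError {
		return true // always keep error traces
	}
	if durationMs > s.latencyThresholdMs {
		return true // always keep slow traces
	}
	t, seen := s.tokens[serviceName]
	if !seen {
		s.tokens[serviceName] = 0 // first sight of a service: always keep
		return true
	}
	if t > 0 {
		s.tokens[serviceName]-- // healthy trace admitted by the bucket
		return true
	}
	return false // healthy trace dropped by sampling
}

func main() {
	s := &toySampler{latencyThresholdMs: 500, tokens: map[string]int{"api": 1}}
	fmt.Println(s.shouldSample(true, 10, "api"))   // true: error
	fmt.Println(s.shouldSample(false, 900, "api")) // true: slow
	fmt.Println(s.shouldSample(false, 10, "api"))  // true: token available
	fmt.Println(s.shouldSample(false, 10, "api"))  // false: bucket empty
	fmt.Println(s.shouldSample(false, 10, "new"))  // true: new service
}
```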
type SignalType ¶
type SignalType uint8
SignalType identifies the OTLP signal a Batch carries. The label is exported on pipeline metrics so operators can attribute drops.
const (
	SignalTraces SignalType = iota
	SignalLogs
)
type TraceServer ¶
type TraceServer struct {
coltracepb.UnimplementedTraceServiceServer
// contains filtered or unexported fields
}
func NewTraceServer ¶
func NewTraceServer(repo *storage.Repository, metrics *telemetry.Metrics, cfg *config.Config) *TraceServer
func (*TraceServer) Export ¶
func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error)
Export handles incoming OTLP trace data.
func (*TraceServer) SetLogCallback ¶
func (s *TraceServer) SetLogCallback(cb func(storage.Log))
SetLogCallback sets the function to call when a new log is synthesized from a trace.
func (*TraceServer) SetPipeline ¶
func (s *TraceServer) SetPipeline(p *Pipeline)
SetPipeline enables the async ingest pipeline. When set, Export() returns to the caller as soon as the parsed batch is enqueued (or rejected), and persistence runs on the pipeline's worker pool. Pass nil to revert to the synchronous DB-write path.
func (*TraceServer) SetSampler ¶
func (s *TraceServer) SetSampler(sm *Sampler)
SetSampler enables adaptive trace sampling. Pass nil to disable.
func (*TraceServer) SetSpanCallback ¶
func (s *TraceServer) SetSpanCallback(cb func(storage.Span))
SetSpanCallback sets the function to call when spans are persisted.