ingest

package
v0.12.2 Latest
Published: Apr 21, 2026 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler wires the OTLP ingest HTTP endpoints to the Writer.

func NewHandler

func NewHandler(w *Writer, log *slog.Logger) *Handler

func (*Handler) Mount

func (h *Handler) Mount(mux *http.ServeMux)

Mount registers POST /v1/{traces,logs,metrics} on the given mux.
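As a rough illustration of what registering these three routes can look like, the sketch below uses only net/http. The `mount`, `newMux`, and `status` functions are hypothetical stand-ins, not this package's code, and the method check inside the handler is an assumption about how POST-only routing might be enforced.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// mount is a hypothetical stand-in for (*Handler).Mount: it registers
// one route per OTLP signal and rejects anything that is not a POST.
func mount(mux *http.ServeMux) {
	for _, signal := range []string{"traces", "logs", "metrics"} {
		mux.HandleFunc("/v1/"+signal, func(w http.ResponseWriter, r *http.Request) {
			if r.Method != http.MethodPost {
				w.Header().Set("Allow", http.MethodPost)
				http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
				return
			}
			w.WriteHeader(http.StatusOK)
		})
	}
}

// newMux returns a fresh mux with the three ingest routes mounted.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mount(mux)
	return mux
}

// status sends one request through the mux and returns the response code.
func status(mux *http.ServeMux, method, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code
}

func main() {
	mux := newMux()
	fmt.Println(status(mux, http.MethodPost, "/v1/logs"))  // 200
	fmt.Println(status(mux, http.MethodGet, "/v1/logs"))   // 405
	fmt.Println(status(mux, http.MethodPost, "/v1/other")) // 404
}
```

Because Mount takes a caller-supplied mux, the same mux can also carry unrelated routes such as a UI or health check.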

func (*Handler) WithTee added in v0.10.0

func (h *Handler) WithTee(t *Tee) *Handler

WithTee attaches a log-mirror sink. Pass nil to keep tee disabled. Chainable so the caller can write `NewHandler(...).WithTee(tee)`.

type Tee added in v0.10.0

type Tee struct {
	// contains filtered or unexported fields
}

Tee mirrors incoming log Events to an io.Writer in a human-readable format so a user can `tail -f` / `less +F` them from a shell alongside the Waggle UI. It is strictly a passthrough view of the ingest stream; the Store is still authoritative, and Tee never blocks or errors out the ingest path — write failures are logged once and the rest of the batch continues.

Tee is not used for spans or metrics. Only log records (signal_type='log') flow through it; the handler calls it from handleLogs only.

func NewTee added in v0.10.0

func NewTee(cfg TeeConfig) (*Tee, error)

NewTee opens the configured sink. When cfg.Path is empty, NewTee returns (nil, nil). A nil *Tee is a no-op, and callers should guard against a nil *Tee.

func (*Tee) Close added in v0.10.0

func (t *Tee) Close() error

Close flushes any buffered output and, for file-backed tees, closes the underlying file. Idempotent. Safe to call even if NewTee returned a nil *Tee.

func (*Tee) WriteBatch added in v0.10.0

func (t *Tee) WriteBatch(b store.Batch)

WriteBatch renders each log Event in the batch that passes the service + severity filter to the sink, then flushes the buffer. Safe to call from multiple goroutines.
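A minimal sketch of a goroutine-safe, flush-per-batch writer follows, assuming a mutex around a bufio.Writer. The `sink` type and the `mirror` helper are hypothetical; the real Tee may be structured differently.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"sync"
)

// sink is a hypothetical stand-in for the Tee's buffered output.
type sink struct {
	mu sync.Mutex
	bw *bufio.Writer
}

// writeBatch writes every line of one batch and then flushes, holding
// the lock throughout so that batches never interleave.
func (s *sink) writeBatch(lines []string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, l := range lines {
		fmt.Fprintln(s.bw, l) // write errors are ignored in this sketch
	}
	s.bw.Flush()
}

// mirror writes n two-line batches concurrently and returns how many
// lines reached the underlying buffer.
func mirror(n int) int {
	var buf bytes.Buffer
	s := &sink{bw: bufio.NewWriter(&buf)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.writeBatch([]string{
				fmt.Sprintf("batch %d line a", i),
				fmt.Sprintf("batch %d line b", i),
			})
		}(i)
	}
	wg.Wait()
	return bytes.Count(buf.Bytes(), []byte("\n"))
}

func main() {
	fmt.Println(mirror(8)) // 16
}
```

Flushing once per batch rather than once per line keeps `tail -f` output timely without a system call for every record.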

type TeeConfig added in v0.10.0

type TeeConfig struct {
	Path     string   // file path, "-" for stdout, empty for disabled
	Services []string // allow-list of service.name; empty = all
	MinSev   int32    // SeverityNumber floor (inclusive); 0 = no floor
	Format   string   // "console" | "logfmt" | "json"
	Color    string   // "auto" (default) | "always" | "never"
}

TeeConfig controls log-mirror output. The zero value disables the tee.
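The Services and MinSev comments imply a filter along the following lines. This is a sketch under stated assumptions: `teeFilter`, `event`, and `pass` are hypothetical names, and the severity values follow the OTLP SeverityNumber ranges (9 = INFO, 13 = WARN, 17 = ERROR).

```go
package main

import "fmt"

// teeFilter mirrors the Services and MinSev fields of TeeConfig.
type teeFilter struct {
	services []string // allow-list; empty means every service passes
	minSev   int32    // inclusive floor; 0 means no floor
}

// event is a hypothetical stand-in for a log Event.
type event struct {
	service  string
	severity int32
}

// pass reports whether e clears both the severity floor and the
// service allow-list.
func (f teeFilter) pass(e event) bool {
	if f.minSev > 0 && e.severity < f.minSev {
		return false
	}
	if len(f.services) == 0 {
		return true
	}
	for _, s := range f.services {
		if s == e.service {
			return true
		}
	}
	return false
}

func main() {
	f := teeFilter{services: []string{"checkout"}, minSev: 13}
	fmt.Println(f.pass(event{service: "checkout", severity: 17})) // true
	fmt.Println(f.pass(event{service: "checkout", severity: 13})) // true: floor is inclusive
	fmt.Println(f.pass(event{service: "checkout", severity: 9}))  // false: below floor
	fmt.Println(f.pass(event{service: "cart", severity: 17}))     // false: not allow-listed
}
```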

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer drains Batches from a buffered channel and writes them to the Store inside a single transaction each. One Writer should exist per process.

func NewWriter

func NewWriter(s store.Store, log *slog.Logger, cfg WriterConfig) *Writer

func (*Writer) DroppedCount

func (w *Writer) DroppedCount() int64

DroppedCount returns the number of batches rejected due to a full buffer.

func (*Writer) Enqueue

func (w *Writer) Enqueue(b store.Batch) bool

Enqueue attempts to send a batch. Returns false if the buffer is full; the caller should respond with 503 + Retry-After so the OTLP sender retries.
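The non-blocking send and the 503 response can be sketched as below. The `queue` and `batch` types, the `post` helper, and the Retry-After value of one second are all assumptions made for illustration; only the overall contract (false on a full buffer, 503 + Retry-After to the sender) comes from the documentation above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// batch is a hypothetical stand-in for store.Batch.
type batch struct{ events int }

// queue is a hypothetical stand-in for the Writer's buffered channel.
type queue struct{ ch chan batch }

// enqueue performs a non-blocking send: it reports false instead of
// waiting when the buffer is full.
func (q *queue) enqueue(b batch) bool {
	select {
	case q.ch <- b:
		return true
	default:
		return false
	}
}

// handleLogs accepts a batch or asks the sender to retry later.
func (q *queue) handleLogs(w http.ResponseWriter, r *http.Request) {
	if !q.enqueue(batch{events: 1}) {
		w.Header().Set("Retry-After", "1") // assumed delay, in seconds
		http.Error(w, "ingest buffer full", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// post sends one request through the handler and returns the status code.
func post(q *queue) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/v1/logs", strings.NewReader("{}"))
	q.handleLogs(rec, req)
	return rec.Code
}

func main() {
	q := &queue{ch: make(chan batch, 1)} // room for exactly one batch
	fmt.Println(post(q))                 // 200: the buffer had room
	fmt.Println(post(q))                 // 503: full, and nothing drains it here
}
```

Rejecting instead of blocking keeps slow storage from tying up HTTP connections; the sender's retry logic absorbs the backpressure.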

func (*Writer) Start

func (w *Writer) Start(ctx context.Context)

Start launches the drain goroutine. Call Stop to cleanly shut it down.

func (*Writer) Stop

func (w *Writer) Stop(ctx context.Context) error

Stop closes the channel, drains outstanding batches, and waits for the drain goroutine to finish.
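The Start/Stop lifecycle follows a familiar Go pattern: a goroutine ranges over the channel until it is closed, and the stopper waits on a done signal. The sketch below assumes that the context passed to Stop bounds the wait, which the signature suggests but the documentation does not state; `writer` and `drainAll` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// writer is a hypothetical stand-in for Writer; ints stand in for batches.
type writer struct {
	ch      chan int
	done    chan struct{}
	written int // total drained; read only after done is closed
}

func newWriter(size int) *writer {
	return &writer{ch: make(chan int, size), done: make(chan struct{})}
}

// start launches the drain goroutine, which exits once ch is closed
// and every buffered item has been consumed.
func (w *writer) start() {
	go func() {
		defer close(w.done)
		for b := range w.ch {
			w.written += b
		}
	}()
}

// stop closes the channel and waits for the drain goroutine, giving up
// if ctx expires first (an assumption about what the context is for).
func (w *writer) stop(ctx context.Context) error {
	close(w.ch)
	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drainAll enqueues every batch, stops the writer, and returns the total.
func drainAll(batches []int) (int, error) {
	w := newWriter(len(batches))
	w.start()
	for _, b := range batches {
		w.ch <- b
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := w.stop(ctx); err != nil {
		return 0, err
	}
	return w.written, nil
}

func main() {
	n, err := drainAll([]int{1, 2, 3})
	fmt.Println(n, err) // 6 <nil>
}
```

In this pattern nothing may be enqueued after stop is called, since a send on a closed channel panics.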

type WriterConfig

type WriterConfig struct {
	BufferSize  int           // how many batches are buffered before Enqueue starts returning false
	FlushEvents int           // commit when accumulated events reach this
	FlushEvery  time.Duration // or when this time has elapsed since first item
}
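Read together, the FlushEvents and FlushEvery comments describe a commit that fires on whichever threshold is reached first. A minimal sketch of that decision, with a hypothetical `flushPolicy` type and `due` method:

```go
package main

import (
	"fmt"
	"time"
)

// flushPolicy mirrors the FlushEvents and FlushEvery fields of WriterConfig.
type flushPolicy struct {
	flushEvents int
	flushEvery  time.Duration
}

// due reports whether pending events should be committed now.
// age is the time elapsed since the oldest uncommitted item arrived.
func (p flushPolicy) due(pending int, age time.Duration) bool {
	if pending == 0 {
		return false // nothing to commit
	}
	return pending >= p.flushEvents || age >= p.flushEvery
}

func main() {
	p := flushPolicy{flushEvents: 100, flushEvery: 500 * time.Millisecond}
	fmt.Println(p.due(100, 0))                  // true: event threshold reached
	fmt.Println(p.due(3, time.Second))          // true: oldest item waited too long
	fmt.Println(p.due(3, 10*time.Millisecond))  // false: neither threshold reached
}
```

A larger FlushEvents favors throughput by amortizing commits, while a shorter FlushEvery bounds how stale the Store can be under light traffic.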
