events

package
v0.0.0-...-6ba41bd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Console     = oapi.TelemetryEventCategory("console")
	Network     = oapi.TelemetryEventCategory("network")
	Page        = oapi.TelemetryEventCategory("page")
	Interaction = oapi.TelemetryEventCategory("interaction")
	Api         = oapi.TelemetryEventCategory("api")
	System      = oapi.TelemetryEventCategory("system")
)

Variables

AllCategories is the canonical list of all configurable event categories. System events are always captured regardless of telemetry config.

Functions

This section is empty.

Types

type Envelope

type Envelope struct {
	Seq   uint64 `json:"seq"`
	Event Event  `json:"event"`
}

Envelope wraps an Event with pipeline-assigned metadata.

type Event

type Event struct {
	Ts        int64                       `json:"ts"` // Unix microseconds (µs since epoch)
	Type      string                      `json:"type"`
	Category  oapi.TelemetryEventCategory `json:"category"`
	Source    oapi.BrowserEventSource     `json:"source"`
	Data      json.RawMessage             `json:"data,omitempty"`
	Truncated bool                        `json:"truncated,omitempty"`
}

Event is the portable event schema. It contains only producer-emitted content; pipeline metadata (seq) lives on the Envelope.

type EventStream

type EventStream struct {
	// contains filtered or unexported fields
}

EventStream is the process-lifetime event bus. It owns the ring buffer and sequence counter, which outlive individual capture sessions.

func NewEventStream

func NewEventStream(cfg EventStreamConfig) (*EventStream, error)

func (*EventStream) NewReader

func (es *EventStream) NewReader(afterSeq uint64) *Reader

NewReader returns a Reader positioned after afterSeq. Pass 0 to start from the oldest buffered event.

func (*EventStream) Publish

func (es *EventStream) Publish(env Envelope) Envelope

Publish assigns a monotonically increasing seq to env, truncates oversized payloads, and pushes it to the ring buffer.

func (*EventStream) Seq

func (es *EventStream) Seq() uint64

Seq returns the sequence number of the last published event.

type EventStreamConfig

type EventStreamConfig struct {
	// RingCapacity is the number of envelopes the ring buffer holds.
	RingCapacity int
}

type ReadResult

type ReadResult struct {
	Envelope *Envelope
	Dropped  uint64
}

ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is set: Envelope is non-nil for a normal read, Dropped is non-zero when the reader fell behind and events were lost.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader tracks an independent read position in a ringBuffer.

func (*Reader) Read

func (r *Reader) Read(ctx context.Context) (ReadResult, error)

Read blocks until the next envelope is available or ctx is cancelled.

func (*Reader) TryRead

func (r *Reader) TryRead() (ReadResult, bool)

TryRead returns the next available result without blocking. Returns (result, true) if data is available, (ReadResult{}, false) if the reader has caught up to the latest published seq.

type S2Config

type S2Config struct {
	// BatcherLinger is how long the batcher waits before flushing (default: 100ms).
	BatcherLinger time.Duration
	// BatcherMaxRecords is the max records per batch (default: 50).
	BatcherMaxRecords int
}

type S2StorageWriter

type S2StorageWriter struct {
	// contains filtered or unexported fields
}

S2StorageWriter reads from an EventStream and forwards each event to S2. Construct with NewS2StorageWriter, call Start to begin, Stop to drain and shut down.

func NewS2StorageWriter

func NewS2StorageWriter(es *EventStream, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) *S2StorageWriter

func (*S2StorageWriter) Start

func (w *S2StorageWriter) Start(ctx context.Context) error

Start opens the S2 append session and begins reading from the event stream. ctx governs the Run loop — cancel it (e.g. on SIGTERM) to stop reading. The session itself outlives ctx and is torn down by Stop after flushing.

func (*S2StorageWriter) Stop

func (w *S2StorageWriter) Stop(ctx context.Context) error

Stop waits for the Run goroutine to exit, drains any remaining ring events, then closes the S2 producer. ctx bounds the total shutdown time.

type Storage

type Storage interface {
	Append(ctx context.Context, env Envelope) error
	Close(ctx context.Context) error
}

type StorageWriter

type StorageWriter struct {
	// contains filtered or unexported fields
}

StorageWriter reads from the ring buffer and forwards each envelope to Storage. Single-use and not thread-safe: call Run once, then after it returns call Drain followed by Close. Reads start from the oldest available event in the ring, not the current tail. Delivery is at-least-once; consumers should dedupe by env.Seq.

func NewStorageWriter

func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter

NewStorageWriter creates a writer that reads from es starting at seq 0.

func (*StorageWriter) Close

func (w *StorageWriter) Close(ctx context.Context) error

Close drains in-flight writes and releases backend resources.

func (*StorageWriter) Drain

func (w *StorageWriter) Drain(ctx context.Context) error

Drain reads any events still in the ring non-blockingly until caught up or ctx expires. Call after all publishers have stopped and Run has returned to ensure no events are silently skipped on shutdown.

func (*StorageWriter) Run

func (w *StorageWriter) Run(ctx context.Context) error

Run reads from the ring buffer and appends each envelope to storage until ctx is cancelled. Returns the context error on clean shutdown. Must be called at most once; returns an error on a second call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL