snowplow

package
v2.20.2
Published: Jun 16, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package snowplow provides a lightweight Snowplow event tracking implementation for Go applications.

The package enables asynchronous event collection and batch sending to Snowplow collectors with automatic retry logic and FIFO queue management to prevent event stagnation.

Basic Usage

Create an emitter to handle event sending:

emitter, err := snowplow.NewEmitter("https://collector.example.com/com.snowplowanalytics.snowplow/tp2")
if err != nil {
	log.Fatal(err)
}

Create a tracker to generate events:

tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
	log.Fatal(err)
}
defer tracker.Stop()

Track custom events with or without properties:

// Simple event without properties
tracker.TrackEvent("user_login", nil)

// Event with properties
tracker.TrackEvent("button_clicked", map[string]any{
	"button_id": "submit",
	"page":      "/checkout",
})

// Event with nested properties
tracker.TrackEvent("page_view", map[string]any{
	"title": "Product Page",
	"user": map[string]any{
		"id":   12345,
		"role": "customer",
	},
})

Custom Contexts

To attach typed custom contexts to a custom event, use TrackEventWithContexts. Custom contexts populate the Snowplow `cx` field and let analysts query their fields as first-class warehouse columns instead of parsing them out of the freeform props bag.

The package ships with GitLabStandardContext, a typed payload for the gitlab_standard iglu schema with typed enums (Realm, DeploymentType) and a sealed UserID variant. Service-owned contexts can be defined by implementing the EventContext interface:

// Universal identity / environment fields shared across GitLab services.
standard := snowplow.GitLabStandardContext{
	Environment:    "production",
	OrganizationID: 12345,
	Realm:          snowplow.RealmSaaS,
	InstanceID:     "gitlab-rails-1",
}

// Service-specific context (defined by the caller). Any type satisfying
// the EventContext interface — Schema() string and Data() any — can be
// passed; for example:
//
//   type ArtifactRegistryEventContext struct {
//       Format string `json:"format"`
//       Kind   string `json:"kind"`
//   }
//
//   func (c ArtifactRegistryEventContext) Schema() string {
//       return "iglu:com.gitlab/artifact_registry_event/jsonschema/1-0-0"
//   }
//   func (c ArtifactRegistryEventContext) Data() any { return c }
registryCtx := ArtifactRegistryEventContext{
	Format: "container",
	Kind:   "local",
}

tracker.TrackEventWithContexts(
	"artifact_registry_artifact_pushed",
	nil,
	standard,
	registryCtx,
)

Contexts may optionally implement `Validate() error`. If present, the tracker calls it before queuing the event; a validation failure aborts the event with the returned error. GitLabStandardContext.Validate covers the schema constraints: required Environment, Realm/DeploymentType enums, UserID variant, string maxLengths, and integer ranges (0..2^31-1).

TrackEvent is equivalent to TrackEventWithContexts with zero contexts and remains supported for backward compatibility.
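
As an illustration of the optional Validate hook described above, the following minimal sketch detects the method with a type assertion and stops at the first failure. It is not the tracker's actual code: the validator interface name, the validateAll helper, and the registryContext type are hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// EventContext mirrors the two-method interface described in this package.
type EventContext interface {
	Schema() string
	Data() any
}

// validator is a hypothetical name for the optional Validate hook.
type validator interface {
	Validate() error
}

// registryContext is an illustrative caller-defined context.
type registryContext struct {
	Format string `json:"format"`
}

func (c registryContext) Schema() string {
	return "iglu:com.gitlab/artifact_registry_event/jsonschema/1-0-0"
}

func (c registryContext) Data() any { return c }

// Validate rejects a context with an empty Format.
func (c registryContext) Validate() error {
	if c.Format == "" {
		return errors.New("format is required")
	}
	return nil
}

// validateAll calls Validate on every context that implements it and
// returns the first error, wrapped with the schema URI of the context.
func validateAll(contexts ...EventContext) error {
	for _, c := range contexts {
		v, ok := c.(validator)
		if !ok {
			continue // contexts without Validate are accepted as-is
		}
		if err := v.Validate(); err != nil {
			return fmt.Errorf("context %s: %w", c.Schema(), err)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateAll(registryContext{Format: "container"}))
	fmt.Println(validateAll(registryContext{}))
}
```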

Billing Events

Track billing events for GitLab's billable usage tracking. Billing events use the billable_usage schema (iglu:com.gitlab/billable_usage/jsonschema/1-0-2) and are sent as Snowplow structured events with billing context attached.

// Minimal billing event
tracker.TrackBillingEvent(
	snowplow.BillingEventRequiredInput{
		Category:      "CodeSuggestionsService",
		EventType:     "code_completions",
		Realm:         "SaaS",
		UnitOfMeasure: "tokens",
		Quantity:      150.0,
	},
)

// Billing event with full context
tracker.TrackBillingEventWithOptionalInput(
	snowplow.BillingEventRequiredInput{
		Category:      "CodeSuggestionsService",
		EventType:     "code_completions",
		Realm:         "SaaS",
		UnitOfMeasure: "tokens",
		Quantity:      150.0,
	},
	snowplow.BillingEventOptionalInput{
		InstanceID:      "instance-abc123",
		ProjectID:       12345,
		NamespaceID:     67890,
		RootNamespaceID: 22222,
		OrganizationID:  11111,
		Subject:         "user-456",
		GlobalUserID:    "global-user-789",
		CorrelationID:   "request-correlation-xyz",
		DeploymentType:  "self-managed",
		EntityID:        "entity-abc",
		Metadata: map[string]any{
			"model": "claude-3",
		},
	},
)

Emitter Options

The emitter can be configured using functional options:

emitter, err := snowplow.NewEmitter(
	"https://collector.example.com/com.snowplowanalytics.snowplow/tp2",
	snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
		for _, s := range successes {
			log.Printf("Sent %d events (status: %d)", s.EventCount, s.StatusCode)
		}
		for _, f := range failures {
			if f.Err != nil {
				log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
			} else {
				log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
			}
		}
	}),
)

Available options:

WithCallback: Sets a callback function that is invoked after each send loop iteration with aggregated success and failure results. The callback receives two slices of SendResult: successes (HTTP 2xx/3xx responses) and failures (HTTP 4xx/5xx responses or local errors). Each SendResult contains EventCount (number of events in the batch), StatusCode (HTTP status or -1 for local errors), and Err (non-nil for local errors like network failures or marshal errors).

Metrics and Observability

The emitter automatically collects Prometheus metrics tracking event flow and performance. Metrics are accessible via Emitter.Collector(), which returns a prometheus.Collector that can be registered with any Prometheus registry.

Available metrics include:

  • Event enqueue counts
  • Send success/failure counts
  • Batch delivery duration (histogram)
  • Queue depth and capacity
  • Overflow drop counts

Example:

collector := emitter.Collector()
// Register with Prometheus
prometheus.MustRegister(collector)

For detailed metric descriptions, see the metrics package documentation.

Architecture

The implementation consists of three main components:

Tracker: High-level API for tracking events. Generates Snowplow protocol-compliant payloads with unique IDs and timestamps, and encodes custom event data as base64-encoded self-describing JSON.

Emitter: Handles asynchronous batch sending of events to Snowplow collectors. Events are buffered in memory and sent in batches of up to 100 events. Failed sends are automatically retried by re-queueing events. Uses context cancellation for graceful shutdown.

Storage: Thread-safe in-memory event storage using a FIFO queue. Events are retrieved in order (oldest first) to prevent event stagnation. Implements automatic capacity management with a maximum size of 10,000 events.

Event Processing

Events flow through the system as follows:

  1. Tracker.TrackEvent creates a Snowplow protocol payload.
  2. Emitter.AddAsync stores the event in the FIFO queue.
  3. The emitter batches and sends events automatically (up to 100 per batch).
  4. On failure, events are re-queued at the back of the queue.
  5. FIFO ordering ensures that old events are eventually processed.

Lifecycle Management

The emitter provides lifecycle management methods:

Flush: Triggers immediate sending of buffered events (non-blocking).

Stop: Cancels in-flight requests and waits for completion (blocking). After Stop is called, TrackEvent will return an error (ErrEmitterStopped).

Always call Stop before application shutdown:

tracker.Stop()

Event Structure

Custom events (TrackEvent) are sent as self-describing events with standard Snowplow fields: aid (application ID), dtm (device timestamp), e (event type), eid (event ID), p (platform), stm (sent timestamp), tna (namespace), tv (tracker version), ua (user agent), and ue_px (base64-encoded self-describing JSON payload).

Billing events (TrackBillingEvent, TrackBillingEventWithOptionalInput) are sent as structured events with fields: se_ca (category), se_ac (action/event type), se_la (label/event ID), se_va (value/quantity), and cx (base64-encoded billing context). The billing context follows the billable_usage schema with required fields: event_id, event_type, realm, unit_of_measure, quantity, and timestamp.

Implementation Details

  • Events are sent asynchronously in batches (up to 100 events per batch) to minimize overhead.
  • Failed sends are retried automatically by re-queueing events at the back of the queue.
  • FIFO queue ordering prevents event stagnation (oldest events are sent first).
  • Storage capacity is limited to 10,000 events; when full, the oldest events are dropped.
  • Storage is in-memory only (events are lost on process restart).
  • The HTTP client uses connection pooling with a 5 second timeout.

Thread Safety

All public APIs are thread-safe and can be called concurrently from multiple goroutines. The storage implementation uses mutex synchronization to protect internal state.

Example

Example demonstrates basic usage of the Snowplow event tracker.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Initialize emitter with collector URL.
	// The emitter handles automatic batching and retry logic.
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Create tracker with application ID.
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a simple event
	err = tracker.TrackEvent("user_login", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	// Stop tracker before shutdown
	tracker.Stop()
}

Constants

const (
	// SchemaCustomEvent points to the GitLab custom event schema
	// https://gitlab.com/gitlab-org/iglu/-/blob/master/public/schemas/com.gitlab/custom_event/jsonschema/1-0-0
	SchemaCustomEvent = "iglu:com.gitlab/custom_event/jsonschema/1-0-0"

	// SchemaPayloadData points to the schema for the event envelope (top-level wrapper for the event payload)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4
	SchemaPayloadData = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4"

	// SchemaSelfDescribedEvent points to the schema for a self-describing event (the event includes a link to a schema that describes the payload's structure)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0
	SchemaSelfDescribedEvent = "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0"

	// SchemaContexts points to the schema for contexts wrapper (array of context entities)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1
	SchemaContexts = "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"

	// SchemaBillableUsage points to GitLab billable usage context schema
	// https://gitlab.com/gitlab-org/iglu/-/tree/master/public/schemas/com.gitlab/billable_usage
	SchemaBillableUsage = "iglu:com.gitlab/billable_usage/jsonschema/1-0-2"

	// SchemaGitLabStandard points to the GitLab standard context schema. This
	// is the same context the Rails monolith attaches to every Snowplow event,
	// providing universal identity and environment fields (organization_id,
	// realm, instance_id, etc.) that the Snowflake data warehouse models are
	// built to query.
	// https://gitlab.com/gitlab-org/iglu/-/tree/master/public/schemas/com.gitlab/gitlab_standard
	SchemaGitLabStandard = "iglu:com.gitlab/gitlab_standard/jsonschema/1-1-8"
)

Variables

var ErrEmitterStopped = errors.New("emitter is stopped")

ErrEmitterStopped is returned when AddAsync is called on a stopped emitter.

Functions

This section is empty.

Types

type BillingEventOptionalInput

type BillingEventOptionalInput struct {
	InstanceID       string         `json:"instance_id,omitempty"`
	InstanceVersion  string         `json:"instance_version,omitempty"`
	UniqueInstanceID string         `json:"unique_instance_id,omitempty"`
	HostName         string         `json:"host_name,omitempty"`
	ProjectID        int64          `json:"project_id,omitempty"`
	NamespaceID      int64          `json:"namespace_id,omitempty"`
	OrganizationID   int64          `json:"organization_id,omitempty"`
	RootNamespaceID  int64          `json:"root_namespace_id,omitempty"`
	Subject          string         `json:"subject,omitempty"`
	GlobalUserID     string         `json:"global_user_id,omitempty"`
	SeatIDs          []string       `json:"seat_ids,omitempty"`
	CorrelationID    string         `json:"correlation_id,omitempty"`
	DeploymentType   string         `json:"deployment_type,omitempty"`
	Assignments      []string       `json:"assignments,omitempty"`
	EntityID         string         `json:"entity_id,omitempty"`
	Metadata         map[string]any `json:"metadata,omitempty"`
}

BillingEventOptionalInput contains the fields that are optional when tracking a billing event. To set them, the caller must use TrackBillingEventWithOptionalInput() instead of TrackBillingEvent().

All fields are translated directly to the IGLU billable schema. The exact schema URL is defined by the SchemaBillableUsage constant.

type BillingEventRequiredInput

type BillingEventRequiredInput struct {
	// Category is not part of the billing event schema, but it's used
	// to fill the value expected by Snowplow in the event envelope
	Category string `json:"-"`

	// EventType defines the type name of the event that should be agreed
	// with the Billing Platform
	EventType string `json:"event_type"`

	Realm         string  `json:"realm"`
	UnitOfMeasure string  `json:"unit_of_measure"`
	Quantity      float64 `json:"quantity"`
}

BillingEventRequiredInput contains the fields that must be set when tracking a billing event through TrackBillingEvent() or TrackBillingEventWithOptionalInput().

All fields apart from Category are translated directly to the IGLU billable schema. The exact schema URL is defined by the SchemaBillableUsage constant.

type DeploymentType added in v2.18.0

type DeploymentType string

DeploymentType identifies the GitLab deployment model. Intended to replace Realm in a future schema revision. Use the DeploymentType* constants.

const (
	DeploymentTypeSelfManaged DeploymentType = "self-managed"
	DeploymentTypeCom         DeploymentType = ".com"
	DeploymentTypeDedicated   DeploymentType = "dedicated"
)

DeploymentType values from the gitlab_standard iglu schema (see SchemaGitLabStandard).

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter handles asynchronous batch sending of events to a Snowplow collector.

It runs a single background goroutine that waits for flush signals and drains the event storage in batches. The flush channel (capacity 1) naturally coalesces multiple signals, so concurrent AddAsync calls never spawn extra goroutines.
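
The coalescing behaviour of a capacity-1 signal channel can be sketched as below. The signal helper is hypothetical; it shows the general non-blocking send technique rather than the emitter's own implementation.

```go
package main

import "fmt"

// signal performs a non-blocking send on the flush channel. It returns
// true when a new signal was queued and false when one was already
// pending, in which case the two requests are coalesced.
func signal(flush chan<- struct{}) bool {
	select {
	case flush <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	flush := make(chan struct{}, 1)

	// Three back-to-back requests leave exactly one pending signal.
	for i := 0; i < 3; i++ {
		fmt.Println("queued:", signal(flush))
	}
	fmt.Println("pending signals:", len(flush))

	// Once the send loop consumes the signal, the next request is queued again.
	<-flush
	fmt.Println("queued after drain:", signal(flush))
}
```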

func NewEmitter

func NewEmitter(collectorURL string, opts ...EmitterOption) (*Emitter, error)

NewEmitter creates a new Emitter with the given collector URL. Returns an error if the collector URL is invalid.

Example

ExampleNewEmitter demonstrates how to create a new Snowplow emitter.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter that sends events to a Snowplow collector.
	// The emitter handles automatic batching and retry logic.
	// Events are stored in a FIFO queue to prevent stagnation.
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Use the emitter with a tracker
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("app_started", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Emitter) AddAsync

func (e *Emitter) AddAsync(event Event) error

AddAsync adds an event to the buffer and triggers asynchronous sending. Returns ErrEmitterStopped if the emitter has been stopped.

func (*Emitter) Collector

func (e *Emitter) Collector() prometheus.Collector

Collector returns the internal prometheus.Collector providing details about Emitter's state and performance.

Example

ExampleEmitter_Collector demonstrates how to access and use the metrics collector from an emitter for Prometheus monitoring.

package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter as usual
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Access the Prometheus collector for metrics export
	collector := emitter.Collector()

	// The collector can be registered with a Prometheus registry.
	// The registry can then be exposed over HTTP for Prometheus to
	// scrape the metrics.
	registry := prometheus.NewRegistry()
	registry.MustRegister(collector)

	// Use the emitter normally; metrics are collected automatically
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("page_view", map[string]any{"page": "/home"})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Emitter) Flush

func (e *Emitter) Flush()

Flush triggers sending of any buffered events. This is a non-blocking operation that signals the send loop if not already signaled.

func (*Emitter) Stop

func (e *Emitter) Stop()

Stop initiates graceful shutdown of the emitter and blocks until complete. It cancels the context (which aborts in-flight HTTP requests and causes the send loop to exit), then waits for the send loop goroutine to finish. After Stop returns, the emitter will not accept new events. Stop is safe to call multiple times.
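
A shutdown with these properties (cancel the context, wait for the goroutine, safe to call repeatedly) is commonly built from context.WithCancel, a done channel, and sync.Once. The sketch below shows that pattern with a hypothetical worker type; it is an assumption about the general technique, not a copy of the Emitter's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a component with one background goroutine.
type worker struct {
	cancel context.CancelFunc
	done   chan struct{}
	once   sync.Once
}

// newWorker starts the background goroutine, which exits on cancellation.
func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(w.done)
		<-ctx.Done() // a real send loop would also handle flush signals here
	}()
	return w
}

// Stop cancels the context exactly once and blocks until the goroutine
// has exited. Later calls return immediately because done is closed.
func (w *worker) Stop() {
	w.once.Do(w.cancel)
	<-w.done
}

// stopped reports whether the background goroutine has exited.
func (w *worker) stopped() bool {
	select {
	case <-w.done:
		return true
	default:
		return false
	}
}

func main() {
	w := newWorker()
	w.Stop()
	w.Stop() // safe: the second call is a no-op
	fmt.Println("stopped:", w.stopped())
}
```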

type EmitterOption

type EmitterOption func(*emitterConfig)

EmitterOption configures an Emitter.
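
EmitterOption follows the functional options pattern. The sketch below shows the pattern in isolation; the config fields, the default values, and the withName and withRetries options are hypothetical and do not correspond to real emitter settings.

```go
package main

import "fmt"

// config is a hypothetical configuration struct.
type config struct {
	name    string
	retries int
}

// option mutates a config, in the same shape as func(*emitterConfig).
type option func(*config)

// withName overrides the default name.
func withName(name string) option {
	return func(c *config) { c.name = name }
}

// withRetries overrides the default retry count.
func withRetries(n int) option {
	return func(c *config) { c.retries = n }
}

// newConfig starts from defaults and applies the options in order, so a
// later option wins over an earlier one that sets the same field.
func newConfig(opts ...option) config {
	c := config{name: "default", retries: 3}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withName("billing"), withRetries(5)))
}
```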

func WithCallback

func WithCallback(cb SendCallback) EmitterOption

WithCallback sets a callback for send results.

The callback is invoked synchronously within the send goroutine after each send loop iteration. Callbacks should be non-blocking; otherwise, they will stall event sending.

Example

ExampleWithCallback demonstrates using a callback to handle send results.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter with a callback to handle success and failure results.
	// The callback is invoked once per send loop with aggregated results.
	emitter, err := snowplow.NewEmitter(
		"http://localhost:9093/com.snowplowanalytics.snowplow/tp2",
		snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
			for _, s := range successes {
				log.Printf("Successfully sent %d events (status: %d)", s.EventCount, s.StatusCode)
			}
			for _, f := range failures {
				if f.Err != nil {
					log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
				} else {
					log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
				}
			}
		}),
	)
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("user_action", map[string]any{"action": "click"})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func WithMetricsCollectorOptions

func WithMetricsCollectorOptions(opts ...metrics.CollectorOption) EmitterOption

WithMetricsCollectorOptions passes customization options to the Collector.

A Collector is always created for the Emitter to track its state and performance.

func WithTokenSource

func WithTokenSource(ts TokenSource) EmitterOption

WithTokenSource sets a token source to authenticate requests to the API gateway.

type Event

type Event struct {
	ID             string    `json:"eid"`
	Type           eventType `json:"e"`
	AppID          string    `json:"aid"`
	TrackerVersion string    `json:"tv"`
	UserAgent      string    `json:"ua"`
	Platform       string    `json:"p"`
	Namespace      string    `json:"tna"`
	Timestamp      string    `json:"dtm"`
	SentTimestamp  string    `json:"stm,omitempty"`

	SelfDescribedEncoded string `json:"ue_px,omitempty"`

	ContextEncoded string `json:"cx,omitempty"`

	StructuredCategory string `json:"se_ca,omitempty"`
	StructuredAction   string `json:"se_ac,omitempty"`
	StructuredLabel    string `json:"se_la,omitempty"`
	StructuredProperty string `json:"se_pr,omitempty"`
	StructuredValue    string `json:"se_va,omitempty"`
}

Event is the envelope for events sent to Snowplow.

Fields specific to one event type (ue_px for "ue", se_* for "se") use omitempty so they're absent on the other type. cx is shared by both event types and is omitempty so it's absent when no contexts are attached. Snowplow's atomic schema parses se_va as java.math.BigDecimal and stm as "ms since epoch"; an empty string in either fails validation.


type EventContext added in v2.18.0

type EventContext interface {
	Schema() string
	Data() any
}

EventContext is a self-describing JSON object attached to a Snowplow event as a custom context. Schema returns the iglu schema URI; Data returns the value placed in the `data` field of the context's self-describing JSON. The tracker wraps each context in {schema, data}, places them in a contexts envelope, JSON-encodes the envelope, and base64-encodes the result into the event's `cx` field. Data must therefore be JSON-marshalable.

GitLabStandardContext is the built-in implementation for the gitlab_standard schema. Callers can define service-specific contexts by implementing this interface and passing them to Tracker.TrackEventWithContexts.

Implementations may also implement Validate() error; the tracker calls it before queuing the event and aborts on a non-nil return.

The name avoids shadowing the standard library's context.Context.

type EventStorage

type EventStorage interface {
	// Add stores a single event.
	Add(payload Event)

	// AddBatch stores multiple events atomically.
	AddBatch(payloads []Event)

	// Get removes and returns up to the N oldest events.
	// Events are returned in FIFO order.
	Get(n int) []Event
}

EventStorage defines the interface for storing and retrieving events. Implementations must be thread-safe as events may be added and retrieved concurrently.

func NewStorage

func NewStorage(mc *metrics.Collector) EventStorage

NewStorage creates a new in-memory event storage with default configuration. The storage uses a ring buffer with a fixed capacity of 10,000 events.

type GitLabStandardContext added in v2.18.0

type GitLabStandardContext struct {
	// Required.
	Environment string `json:"environment"`

	// Identity.
	ProjectID                 int64  `json:"project_id,omitempty"`
	NamespaceID               int64  `json:"namespace_id,omitempty"`
	OrganizationID            int64  `json:"organization_id,omitempty"`
	UltimateParentNamespaceID int64  `json:"ultimate_parent_namespace_id,omitempty"`
	UserID                    UserID `json:"user_id,omitempty"`
	UserType                  string `json:"user_type,omitempty"`
	IsGitLabTeamMember        *bool  `json:"is_gitlab_team_member,omitempty"`

	// Source / environment.
	Source            string `json:"source,omitempty"`
	Plan              string `json:"plan,omitempty"`
	GoogleAnalyticsID string `json:"google_analytics_id,omitempty"`

	// Instance.
	Realm            Realm          `json:"realm,omitempty"`
	DeploymentType   DeploymentType `json:"deployment_type,omitempty"`
	InstanceID       string         `json:"instance_id,omitempty"`
	UniqueInstanceID string         `json:"unique_instance_id,omitempty"`
	HostName         string         `json:"host_name,omitempty"`
	InstanceVersion  string         `json:"instance_version,omitempty"`
	GlobalUserID     string         `json:"global_user_id,omitempty"`

	// Request.
	CorrelationID      string `json:"correlation_id,omitempty"`
	ContextGeneratedAt string `json:"context_generated_at,omitempty"`

	// Client.
	Interface     string `json:"interface,omitempty"`
	ClientName    string `json:"client_name,omitempty"`
	ClientVersion string `json:"client_version,omitempty"`
	ClientType    string `json:"client_type,omitempty"`

	// Feature.
	FeatureCategory              string  `json:"feature_category,omitempty"`
	FeatureEnabledByNamespaceIDs []int64 `json:"feature_enabled_by_namespace_ids,omitempty"`
	FeatureEnablementType        string  `json:"feature_enablement_type,omitempty"`

	// AI/ML.
	InputTokens   int64  `json:"input_tokens,omitempty"`
	OutputTokens  int64  `json:"output_tokens,omitempty"`
	TotalTokens   int64  `json:"total_tokens,omitempty"`
	ModelEngine   string `json:"model_engine,omitempty"`
	ModelName     string `json:"model_name,omitempty"`
	ModelProvider string `json:"model_provider,omitempty"`

	// Extensibility.
	Extra map[string]any `json:"extra,omitempty"`
}

GitLabStandardContext is a typed payload for the gitlab_standard iglu schema (see SchemaGitLabStandard), mirroring it field-for-field.

Only Environment is required; other fields are omitted from the JSON payload when zero. IsGitLabTeamMember is a *bool so the payload can distinguish "not set" from false. UserID uses the sealed UserID interface; set it with UserIDString or UserIDInt.
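
The reason for the *bool can be seen with a cut-down struct. The payload type below is a hypothetical three-field subset of GitLabStandardContext that reuses the same JSON tags; it only demonstrates how encoding/json treats omitempty on pointers and integers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is a hypothetical subset of GitLabStandardContext.
type payload struct {
	Environment        string `json:"environment"`
	IsGitLabTeamMember *bool  `json:"is_gitlab_team_member,omitempty"`
	ProjectID          int64  `json:"project_id,omitempty"`
}

// encode marshals p and returns the JSON text.
func encode(p payload) string {
	b, err := json.Marshal(p)
	if err != nil {
		panic(err) // cannot happen for this struct
	}
	return string(b)
}

func main() {
	no := false

	// A nil pointer is omitted: the field is "not set".
	fmt.Println(encode(payload{Environment: "production"}))

	// A pointer to false is kept: the field is explicitly false.
	fmt.Println(encode(payload{Environment: "production", IsGitLabTeamMember: &no}))
}
```

A plain int64 such as ProjectID cannot make this distinction: with omitempty, a zero value is always dropped from the payload.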

GitLabStandardContext satisfies EventContext directly; pass it to Tracker.TrackEventWithContexts without conversion.

Schema evolution: additive revisions update this type in place. Breaking revisions (renames, retypes, removals, tightened constraints) introduce a new type (e.g. GitLabStandardContextV2) so existing callers keep compiling.


func (GitLabStandardContext) Data added in v2.18.0

func (c GitLabStandardContext) Data() any

Data returns the context payload. It is the receiver itself; encoding/json will marshal the exported fields according to their struct tags.

func (GitLabStandardContext) Schema added in v2.18.0

func (c GitLabStandardContext) Schema() string

Schema returns the iglu schema URI for the gitlab_standard context.

func (GitLabStandardContext) Validate added in v2.18.0

func (c GitLabStandardContext) Validate() error

Validate checks fields against the gitlab_standard iglu schema (see SchemaGitLabStandard): required Environment, Realm/DeploymentType enums, UserID variant, string maxLengths, and integer ranges (0..2^31-1). The schema is authoritative; this is a pre-flight check to catch errors before the event is sent.

type Realm added in v2.18.0

type Realm string

Realm identifies the GitLab deployment model. Use the Realm* constants; Validate re-checks the value at runtime to catch explicit casts.

const (
	RealmSelfManaged Realm = "self-managed"
	RealmSaaS        Realm = "saas"
	RealmDedicated   Realm = "dedicated"
)

Realm values from the gitlab_standard iglu schema (see SchemaGitLabStandard).

type SelfDescribingJSON

type SelfDescribingJSON struct {
	Schema string `json:"schema"`
	Data   any    `json:"data"`
}

type SendCallback

type SendCallback func(successes []SendResult, failures []SendResult)

SendCallback is called after each send loop iteration with aggregated results.

type SendResult

type SendResult struct {
	EventCount int
	StatusCode int
	Err        error
}

SendResult represents the outcome of sending a batch of events.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides thread-safe in-memory event storage using a fixed-size ring buffer. It implements a FIFO queue: when full, the oldest event is overwritten. The backing array is allocated once and never grows.

func (*Storage) Add

func (s *Storage) Add(payload Event)

Add stores an event. The event is copied to prevent external modification. If storage is at max capacity, the oldest event is dropped.

Note: The event may be modified by the caller between when this method is called and when the copy is made. Callers should not modify the event concurrently with this call.

func (*Storage) AddBatch

func (s *Storage) AddBatch(payloads []Event)

AddBatch stores multiple events atomically under a single lock. Events are copied to prevent external modification. If adding the batch exceeds capacity, the oldest events are dropped.

func (*Storage) Get

func (s *Storage) Get(n int) []Event

Get removes and returns up to the N oldest events in FIFO order. Returned events are owned by the caller; no further copy is made.
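
The ring-buffer behaviour described for Storage (fixed backing array, FIFO reads, oldest event overwritten when full) can be sketched as follows. The ring type stores strings instead of Event values and is a hypothetical, simplified model; it assumes a capacity greater than zero.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a fixed-capacity FIFO that overwrites the oldest item when full.
type ring struct {
	mu   sync.Mutex
	buf  []string // allocated once, never grows
	head int      // index of the oldest item
	size int      // number of stored items
}

// newRing allocates the backing array. capacity must be greater than zero.
func newRing(capacity int) *ring {
	return &ring{buf: make([]string, capacity)}
}

// add stores v and reports whether the oldest item was dropped to make room.
func (r *ring) add(v string) (dropped bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	tail := (r.head + r.size) % len(r.buf)
	r.buf[tail] = v
	if r.size == len(r.buf) {
		// Full: tail == head, so the oldest item was just overwritten.
		r.head = (r.head + 1) % len(r.buf)
		return true
	}
	r.size++
	return false
}

// get removes and returns up to n of the oldest items in FIFO order.
func (r *ring) get(n int) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	if n > r.size {
		n = r.size
	}
	if n <= 0 {
		return nil
	}
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, r.buf[r.head])
		r.buf[r.head] = "" // clear the slot
		r.head = (r.head + 1) % len(r.buf)
	}
	r.size -= n
	return out
}

func main() {
	r := newRing(3)
	for _, v := range []string{"a", "b", "c", "d"} {
		fmt.Println(v, "dropped oldest:", r.add(v))
	}
	fmt.Println(r.get(2))
	fmt.Println(r.get(5))
}
```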

type TokenSource

type TokenSource interface {
	EnhanceHeader(ctx context.Context, h http.Header, audience string) error
}

TokenSource enables authentication for requests sent by the Emitter to the API gateway. When set, it retrieves or generates a token and adds it to the request header.

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker provides the API for tracking Snowplow events.

func NewTracker

func NewTracker(appId string, emitter *Emitter) (*Tracker, error)

NewTracker creates a new Tracker instance with the given application ID and emitter. Returns an error if emitter is nil or appId is empty.

Example

ExampleNewTracker demonstrates how to create a new Snowplow tracker.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Create a tracker with your application ID.
	// The tracker generates event payloads with unique IDs and timestamps.
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("app_started", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) Flush

func (t *Tracker) Flush()

Flush triggers immediate sending of any buffered events. This is a non-blocking operation. See Emitter.Flush for details.

func (*Tracker) Stop

func (t *Tracker) Stop()

Stop initiates graceful shutdown of the emitter and blocks until complete. See Emitter.Stop for details on shutdown behavior.

Example

ExampleTracker_Stop demonstrates proper shutdown.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track some events
	err = tracker.TrackEvent("event1", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	err = tracker.TrackEvent("event2", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	// Stop cancels in-flight requests and blocks until the send loop has exited.
	tracker.Stop()
}

func (*Tracker) TrackBillingEvent

func (t *Tracker) TrackBillingEvent(requiredInput BillingEventRequiredInput) error

TrackBillingEvent tracks a billing event with the billable_usage context.

The event is sent as a Snowplow structured event with billing context attached. All fields in the requiredInput input parameter must be set. The quantity must be greater than zero.

Example

ExampleTracker_TrackBillingEvent demonstrates tracking a billing event.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a billing event with minimal required fields.
	// The event is sent as a Snowplow structured event with billing context attached.
	// Realm must be one of: "SaaS", "Dedicated", "SM"
	err = tracker.TrackBillingEvent(
		snowplow.BillingEventRequiredInput{
			Category:      "CodeSuggestionsService",
			EventType:     "code_completions",
			Realm:         "SaaS",
			UnitOfMeasure: "tokens",
			Quantity:      150.0,
		},
	)
	if err != nil {
		log.Printf("Failed to track billing event: %v", err)
	}

	tracker.Stop()
}
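
The rules stated for TrackBillingEvent (every required field set, quantity greater than zero, realm drawn from the listed values) can be checked by a caller before the event is submitted. The following is a minimal sketch under assumptions: billingInput is a local stand-in that mirrors the field names in the example, and checkBillingInput is a hypothetical helper, not a function of the snowplow package. The package may apply different or additional checks.

```go
package main

import (
	"errors"
	"fmt"
)

// billingInput is a local stand-in mirroring the field names used in the
// example; it is not the package's BillingEventRequiredInput type.
type billingInput struct {
	Category      string
	EventType     string
	Realm         string
	UnitOfMeasure string
	Quantity      float64
}

// checkBillingInput is a hypothetical pre-flight check for the documented rules.
func checkBillingInput(in billingInput) error {
	switch {
	case in.Category == "":
		return errors.New("category is required")
	case in.EventType == "":
		return errors.New("event type is required")
	case in.UnitOfMeasure == "":
		return errors.New("unit of measure is required")
	case in.Quantity <= 0:
		return errors.New("quantity must be greater than zero")
	}

	// Realm values are taken from the comment in the example.
	switch in.Realm {
	case "SaaS", "Dedicated", "SM":
		return nil
	default:
		return fmt.Errorf("unsupported realm %q", in.Realm)
	}
}

func main() {
	valid := billingInput{
		Category:      "CodeSuggestionsService",
		EventType:     "code_completions",
		Realm:         "SaaS",
		UnitOfMeasure: "tokens",
		Quantity:      150,
	}
	fmt.Println(checkBillingInput(valid))

	invalid := valid
	invalid.Quantity = 0
	fmt.Println(checkBillingInput(invalid))
}
```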

func (*Tracker) TrackBillingEventWithOptionalInput

func (t *Tracker) TrackBillingEventWithOptionalInput(
	requiredInput BillingEventRequiredInput,
	optionalInput BillingEventOptionalInput,
) error

TrackBillingEventWithOptionalInput tracks a billing event with the billable_usage context.

Similar to TrackBillingEvent, but also accepts the optional event input fields through optionalInput.

Example

ExampleTracker_TrackBillingEventWithOptionalInput demonstrates tracking a billing event with context options.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a billing event with full context for billing reconciliation.
	// All optional fields help identify the source and context of the billable usage.
	err = tracker.TrackBillingEventWithOptionalInput(
		snowplow.BillingEventRequiredInput{
			Category:      "CodeSuggestionsService",
			EventType:     "code_generations",
			Realm:         "SaaS",
			UnitOfMeasure: "tokens",
			Quantity:      500.0,
		},
		snowplow.BillingEventOptionalInput{
			InstanceID:      "instance-abc123",
			ProjectID:       12345,
			NamespaceID:     67890,
			OrganizationID:  22222,
			RootNamespaceID: 11111,
			Subject:         "user-456",
			GlobalUserID:    "global-user-789",
			CorrelationID:   "request-correlation-xyz",
			DeploymentType:  "self-managed",
			EntityID:        "entity-abc",
			Metadata: map[string]any{
				"model":    "claude-3",
				"language": "go",
			},
		},
	)
	if err != nil {
		log.Printf("Failed to track billing event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) TrackEvent

func (t *Tracker) TrackEvent(eventName string, props map[string]any) error

TrackEvent tracks a custom event with the given name and optional properties. It is equivalent to TrackEventWithContexts with zero contexts; see that method for the full error contract. To attach typed contexts, use TrackEventWithContexts directly.

Example

ExampleTracker_TrackEvent demonstrates tracking a simple event without properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a simple event with just a name (no properties).
	// Pass nil for events without additional properties.
	err = tracker.TrackEvent("user_login", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

Example (WithNestedProperties)

ExampleTracker_TrackEvent_withNestedProperties demonstrates tracking events with nested properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track an event with nested properties (multi-level structure).
	// Supports nested maps and complex data structures.
	err = tracker.TrackEvent("page_view", map[string]any{
		"title":    "Product Details - Premium Widget",
		"duration": 4500,
		"user": map[string]any{
			"id":       67890,
			"username": "john_doe",
		},
	})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

Example (WithProperties)

ExampleTracker_TrackEvent_withProperties demonstrates tracking events with simple properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track an event with simple properties (flat structure).
	// Supports string, int, float, bool types.
	err = tracker.TrackEvent("button_clicked", map[string]any{
		"button_id": "submit_form",
		"user_id":   12345,
	})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) TrackEventWithContexts added in v2.18.0

func (t *Tracker) TrackEventWithContexts(eventName string, props map[string]any, contexts ...EventContext) error

TrackEventWithContexts tracks a custom event with typed contexts attached. Contexts are serialized into the Snowplow `cx` field so their fields become first-class warehouse columns rather than entries in the freeform props bag.

Each context must satisfy EventContext. If a context also implements Validate() error, the tracker calls it and aborts the event on a non-nil return. With zero contexts the call is equivalent to TrackEvent.
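
The optional Validate() error hook described above follows a familiar Go pattern: assert each value against a small interface and call the method only when it is present. The sketch below shows that pattern with local stand-in types. eventContext is a placeholder (the real EventContext method set is not shown in this section), and planContext, regionContext, and validateContexts are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// eventContext is a placeholder for the package's EventContext interface,
// whose method set is not shown in this section.
type eventContext any

// validator matches the optional Validate() error method.
type validator interface {
	Validate() error
}

// planContext is a hypothetical service-owned context with a validation rule.
type planContext struct{ Plan string }

func (p planContext) Validate() error {
	if p.Plan == "" {
		return errors.New("plan must not be empty")
	}
	return nil
}

// regionContext is a hypothetical context without a Validate method.
type regionContext struct{ Region string }

// validateContexts returns the first validation error and skips contexts
// that do not implement validator.
func validateContexts(contexts ...eventContext) error {
	for _, c := range contexts {
		v, ok := c.(validator)
		if !ok {
			continue
		}
		if err := v.Validate(); err != nil {
			return fmt.Errorf("context %T rejected: %w", c, err)
		}
	}
	return nil
}

func main() {
	// Zero contexts: nothing to validate.
	fmt.Println(validateContexts())

	// One context without Validate and one that passes.
	fmt.Println(validateContexts(regionContext{Region: "eu"}, planContext{Plan: "pro"}))

	// A context whose Validate fails aborts the whole call.
	fmt.Println(validateContexts(planContext{}))
}
```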

type UserID added in v2.18.0

type UserID interface {
	// contains filtered or unexported methods
}

UserID is the type of GitLabStandardContext.UserID. The schema permits a string or an integer; use UserIDString or UserIDInt. The unexported isUserID marker seals the interface — only this package can declare variants.
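
The sealed-interface technique mentioned above can be sketched with local stand-in types. The lowercase names below are illustrative copies and not the package's own declarations. Because everything here lives in one package, the sketch shows the shape of the pattern rather than the cross-package restriction itself.

```go
package main

import "fmt"

// userID carries an unexported marker method. Code in other packages cannot
// write a method with this name, which keeps the set of variants closed.
type userID interface{ isUserID() }

// userIDInt and userIDString are the two variants.
type userIDInt int64

type userIDString string

func (userIDInt) isUserID() {}

func (userIDString) isUserID() {}

// describe handles each variant with a type switch.
func describe(id userID) string {
	switch v := id.(type) {
	case userIDInt:
		return fmt.Sprintf("int:%d", int64(v))
	case userIDString:
		return fmt.Sprintf("string:%s", string(v))
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(userIDInt(42)))
	fmt.Println(describe(userIDString("abc")))
}
```

A closed set of variants lets consumers write a type switch over the known cases, with the default branch reserved for a nil value.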

type UserIDInt added in v2.18.0

type UserIDInt int64

UserIDInt is the integer variant of UserID. The underlying int64 covers the full schema range (0..2^31-1) on every platform.
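
Whether the package rejects integers outside the quoted schema range is not stated here. A caller who wants to guard against out-of-range values before constructing a UserIDInt could use a check like the following sketch; inSchemaRange is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// inSchemaRange reports whether v lies in 0..2^31-1, the range quoted above.
func inSchemaRange(v int64) bool {
	return v >= 0 && v <= math.MaxInt32
}

func main() {
	fmt.Println(inSchemaRange(0))
	fmt.Println(inSchemaRange(math.MaxInt32))
	fmt.Println(inSchemaRange(math.MaxInt32 + 1))
	fmt.Println(inSchemaRange(-1))
}
```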

type UserIDString added in v2.18.0

type UserIDString string

UserIDString is the string variant of UserID.

Directories

Path Synopsis
Package metrics is a module providing observability through Prometheus Metrics for the snowplow events mechanism.
Package oidc provides an implementation of the snowplow.TokenSource interface, allowing snowplow.Emitter to authenticate with the collector endpoint using the OIDC flow.
