snowplow

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package snowplow provides a lightweight Snowplow event tracking implementation for Go applications.

The package enables asynchronous event collection and batch sending to Snowplow collectors with automatic retry logic and FIFO queue management to prevent event stagnation.

Basic Usage

Create an emitter to handle event sending:

emitter, err := snowplow.NewEmitter("https://collector.example.com/com.snowplowanalytics.snowplow/tp2")
if err != nil {
	log.Fatal(err)
}

Create a tracker to generate events:

tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
	log.Fatal(err)
}
defer tracker.Stop()

Track custom events with or without properties:

// Simple event without properties
tracker.TrackEvent("user_login", nil)

// Event with properties
tracker.TrackEvent("button_clicked", map[string]any{
	"button_id": "submit",
	"page":      "/checkout",
})

// Event with nested properties
tracker.TrackEvent("page_view", map[string]any{
	"title": "Product Page",
	"user": map[string]any{
		"id":   12345,
		"role": "customer",
	},
})

Billing Events

Track billing events for GitLab's billable usage tracking. Billing events use the billable_usage schema (iglu:com.gitlab/billable_usage/jsonschema/1-0-2) and are sent as Snowplow structured events with billing context attached.

// Minimal billing event
tracker.TrackBillingEvent(
	BillingEventRequiredInput{
		Category:      "CodeSuggestionsService",
		EventType:     "code_completions",
		Realm:         "SaaS",
		UnitOfMeasure: "tokens",
		Quantity:      150.0,
	}
)

// Billing event with full context
tracker.TrackBillingEventWithOptionalInput(
	BillingEventRequiredInput{
		Category:      "CodeSuggestionsService",
		EventType:     "code_completions",
		Realm:         "SaaS",
		UnitOfMeasure: "tokens",
		Quantity:      150.0,
	},
	BillingEventOptionalInput{
		InstanceID:      "instance-abc123",
		ProjectID:       12345,
		NamespaceID:     67890,
		RootNamespaceID: 22222,
		OrganizationID:  11111,
		Subject:         "user-456",
		GlobalUserID:    "global-user-789,
		CorrelationID:   "request-correlation-xyz",
		DeploymentType:  "self-managed",
		EntityID:        "entity-abc",
		Metadata: map[string]any{
			"model": "claude-3",
		},
	},
)

Emitter Options

The emitter can be configured using functional options:

emitter, err := snowplow.NewEmitter(
	"https://collector.example.com/com.snowplowanalytics.snowplow/tp2",
	snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
		for _, s := range successes {
			log.Printf("Sent %d events (status: %d)", s.EventCount, s.StatusCode)
		}
		for _, f := range failures {
			if f.Err != nil {
				log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
			} else {
				log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
			}
		}
	}),
)

Available options:

WithCallback: Sets a callback function that is invoked after each send loop iteration with aggregated success and failure results. The callback receives two slices of SendResult: successes (HTTP 2xx/3xx responses) and failures (HTTP 4xx/5xx responses or local errors). Each SendResult contains EventCount (number of events in the batch), StatusCode (HTTP status or -1 for local errors), and Err (non-nil for local errors like network failures or marshal errors).

Metrics and Observability

The emitter automatically collects Prometheus metrics tracking event flow and performance. Metrics are accessible via Emitter.Collector(), which returns a prometheus.Collector that can be registered with any Prometheus registry.

Available metrics include:

  • Event enqueue counts
  • Send success/failure counts
  • Batch delivery duration (histogram)
  • Queue depth and capacity
  • Overflow drop counts

Example:

collector := emitter.Collector()
// Register with Prometheus
prometheus.MustRegister(collector)

For detailed metric descriptions, see the metrics package documentation.

Architecture

The implementation consists of three main components:

Tracker: High-level API for tracking events. Generates Snowplow protocol-compliant payloads with unique IDs and timestamps, and encodes custom event data as base64-encoded self-describing JSON.

Emitter: Handles asynchronous batch sending of events to Snowplow collectors. Events are buffered in memory and sent in batches (default: 5 events per batch). Failed sends are automatically retried by re-queueing events. Uses context cancellation for graceful shutdown.

Storage: Thread-safe in-memory event storage using a FIFO queue. Events are retrieved in order (oldest first) to prevent event stagnation. Implements automatic capacity management with a maximum size of 10,000 events.

Event Processing

Events flow through the system as follows: Tracker.TrackEvent creates a Snowplow protocol payload, Emitter.AddAsync stores the event in the FIFO queue, Emitter automatically batches and sends events (5 per batch), on failure events are re-queued to the back of the queue, and FIFO ordering ensures old events eventually get processed.

Lifecycle Management

The emitter provides lifecycle management methods:

Flush: Triggers immediate sending of buffered events (non-blocking).

Stop: Cancels in-flight requests and waits for completion (blocking). After Stop is called, TrackEvent will return an error (ErrEmitterStopped).

Always call Stop before application shutdown:

tracker.Stop()

Event Structure

Custom events (TrackEvent) are sent as self-describing events with standard Snowplow fields: aid (application ID), dtm (device timestamp), e (event type), eid (event ID), p (platform), stm (sent timestamp), tna (namespace), tv (tracker version), ua (user agent), and ue_px (base64-encoded self-describing JSON payload).

Billing events (TrackBillingEvent, TrackBillingEventWithOptionalInput) are sent as structured events with fields: se_ca (category), se_ac (action/event type), se_la (label/event ID), se_va (value/quantity), and cx (base64-encoded billing context). The billing context follows the billable_usage schema with required fields: event_id, event_type, realm, unit_of_measure, quantity, and timestamp.

Implementation Details

Events are sent asynchronously in batches (100 events per batch) to minimize overhead. Failed sends automatically retry by re-queueing events to the back of the queue. FIFO queue ordering prevents event stagnation (oldest events are sent first). Storage capacity is limited to 10,000 events; when full, oldest events are dropped. Storage is in-memory only (events are lost on process restart). HTTP client uses connection pooling with a 5 second timeout.

Thread Safety

All public APIs are thread-safe and can be called concurrently from multiple goroutines. The storage implementation uses mutex synchronization to protect internal state.

Example

Example demonstrates basic usage of the Snowplow event tracker.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Initialize emitter with collector URL.
	// The emitter handles automatic batching (5 events per batch) and retry logic.
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Create tracker with application ID.
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a simple event
	err = tracker.TrackEvent("user_login", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	// Stop tracker before shutdown
	tracker.Stop()
}

Index

Examples

Constants

View Source
const (
	// SchemaCustomEvent points to the GitLab custom event schema
	// https://gitlab.com/gitlab-org/iglu/-/blob/master/public/schemas/com.gitlab/custom_event/jsonschema/1-0-0
	SchemaCustomEvent = "iglu:com.gitlab/custom_event/jsonschema/1-0-0"

	// SchemaPayloadData points to the schema for event envelop (top level wrapper for event payload)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4
	SchemaPayloadData = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4"

	// SchemaSelfDescribedEvent points to the schema for self-describing event (event includes link to a schema to describe the payloads structure)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0
	SchemaSelfDescribedEvent = "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0"

	// SchemaContexts points to the schema for contexts wrapper (array of context entities)
	// https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1
	SchemaContexts = "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"

	// SchemaBillableUsage points to GitLab billable usage context schema
	// https://gitlab.com/gitlab-org/iglu/-/tree/master/public/schemas/com.gitlab/billable_usage
	SchemaBillableUsage = "iglu:com.gitlab/billable_usage/jsonschema/1-0-2"
)

Variables

View Source
var ErrEmitterStopped = errors.New("emitter is stopped")

ErrEmitterStopped is returned when AddAsync is called on a stopped emitter.

Functions

This section is empty.

Types

type BillingEventOptionalInput

type BillingEventOptionalInput struct {
	InstanceID       string         `json:"instance_id,omitempty"`
	InstanceVersion  string         `json:"instance_version,omitempty"`
	UniqueInstanceID string         `json:"unique_instance_id,omitempty"`
	HostName         string         `json:"host_name,omitempty"`
	ProjectID        int64          `json:"project_id,omitempty"`
	NamespaceID      int64          `json:"namespace_id,omitempty"`
	OrganizationID   int64          `json:"organization_id,omitempty"`
	RootNamespaceID  int64          `json:"root_namespace_id,omitempty"`
	Subject          string         `json:"subject,omitempty"`
	GlobalUserID     string         `json:"global_user_id,omitempty"`
	SeatIDs          []string       `json:"seat_ids,omitempty"`
	CorrelationID    string         `json:"correlation_id,omitempty"`
	DeploymentType   string         `json:"deployment_type,omitempty"`
	Assignments      []string       `json:"assignments,omitempty"`
	EntityID         string         `json:"entity_id,omitempty"`
	Metadata         map[string]any `json:"metadata,omitempty"`
}

BillingEventOptionalInput is a struct containing fields that are optional when approaching to track a billing event. To set them the caller must use TrackBillingEventWithOptionalInput() instead of TrackBillingEvent().

All fields are directly translated to the IGLU billable schema. Exact URL of the schema is defined with the SchemaBillableUsage const.

nolint:tagliatelle

type BillingEventRequiredInput

type BillingEventRequiredInput struct {
	// Category is not part of the billing event schema, but it's used
	// to fill the value expected by Snowplow in the event envelope
	Category string `json:"-"`

	// EventType defines the type name of the event that should be agreed
	// with the Billing Platform
	EventType string `json:"event_type"`

	Realm         string  `json:"realm"`
	UnitOfMeasure string  `json:"unit_of_measure"`
	Quantity      float64 `json:"quantity"`
}

BillingEventRequiredInput is a struct containing fields that must be set when approaching to track a billing event through TrackBillingEvent() or TrackBillingEventWithOptionalInput().

All fields aside of Category are directly translated to the IGLU billable schema. Exact URL of the schema is defined with the SchemaBillableUsage const.

nolint:tagliatelle

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter handles asynchronous batch sending of events to a Snowplow collector.

It runs a single background goroutine that waits for flush signals and drains the event storage in batches. The flush channel (capacity 1) naturally coalesces multiple signals, so concurrent AddAsync calls never spawn extra goroutines.

func NewEmitter

func NewEmitter(collectorURL string, opts ...EmitterOption) (*Emitter, error)

NewEmitter creates a new Emitter with the given collector URL. Returns an error if the collector URL is invalid.

Example

ExampleNewEmitter demonstrates how to create a new Snowplow emitter.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter that sends events to a Snowplow collector.
	// The emitter handles automatic batching and retry logic.
	// Events are stored in a FIFO queue to prevent stagnation.
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Use the emitter with a tracker
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("app_started", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Emitter) AddAsync

func (e *Emitter) AddAsync(event Event) error

AddAsync adds an event to the buffer and triggers asynchronous sending. Returns ErrEmitterStopped if the emitter has been stopped.

func (*Emitter) Collector

func (e *Emitter) Collector() prometheus.Collector

Collector returns the internal prometheus.Collector providing details about Emitter's state and performance.

Example

ExampleEmitter_Collector demonstrates how to access and use the metrics collector from an emitter for Prometheus monitoring.

package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter as usual
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Access the Prometheus collector for metrics export
	collector := emitter.Collector()

	// The collector can be registered with a Prometheus registry.
	// Registry can be next exposed with HTTP for Prometheus to
	// scrape the metrics.
	registry := prometheus.NewRegistry()
	registry.MustRegister(collector)

	// Use the emitter normally; metrics are collected automatically
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("page_view", map[string]any{"page": "/home"})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Emitter) Flush

func (e *Emitter) Flush()

Flush triggers sending of any buffered events. This is a non-blocking operation that signals the send loop if not already signaled.

func (*Emitter) Stop

func (e *Emitter) Stop()

Stop initiates graceful shutdown of the emitter and blocks until complete. It cancels the context (which aborts in-flight HTTP requests and causes the send loop to exit), then waits for the send loop goroutine to finish. After Stop returns, the emitter will not accept new events. Stop is safe to call multiple times.

type EmitterOption

type EmitterOption func(*emitterConfig)

EmitterOption configures an Emitter.

func WithCallback

func WithCallback(cb SendCallback) EmitterOption

WithCallback sets a callback for send results.

The callback is invoked synchronously within the send goroutine after each send loop iteration. Callbacks should be non-blocking; otherwise, they will stall event sending.

Example

ExampleWithCallback demonstrates using a callback to handle send results.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	// Create an emitter with a callback to handle success and failure results.
	// The callback is invoked once per send loop with aggregated results.
	emitter, err := snowplow.NewEmitter(
		"http://localhost:9093/com.snowplowanalytics.snowplow/tp2",
		snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
			for _, s := range successes {
				log.Printf("Successfully sent %d events (status: %d)", s.EventCount, s.StatusCode)
			}
			for _, f := range failures {
				if f.Err != nil {
					log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
				} else {
					log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
				}
			}
		}),
	)
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("user_action", map[string]any{"action": "click"})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func WithMetricsCollectorOptions

func WithMetricsCollectorOptions(opts ...metrics.CollectorOption) EmitterOption

WithMetricsCollectorOptions passes customization options to the Collector.

Collector is always created for the Emitter to tracks its state and performance.

func WithTokenSource

func WithTokenSource(ts TokenSource) EmitterOption

WithTokenSource sets a token source to authenticate requests to the API gateway.

type Event

type Event struct {
	ID             string    `json:"eid"`
	Type           eventType `json:"e"`
	AppID          string    `json:"aid"`
	TrackerVersion string    `json:"tv"`
	UserAgent      string    `json:"ua"`
	Platform       string    `json:"p"`
	Namespace      string    `json:"tna"`
	Timestamp      string    `json:"dtm"`
	SentTimestamp  string    `json:"stm"`

	SelfDescribedEncoded string `json:"ue_px"`

	ContextEncoded string `json:"cx"`

	StructuredCategory string `json:"se_ca"`
	StructuredAction   string `json:"se_ac"`
	StructuredLabel    string `json:"se_la"`
	StructuredProperty string `json:"se_pr"`
	StructuredValue    string `json:"se_va"`
}

Event defines the structure that's an envelope for events sent to snowplow. This is the implementation of snowplow's API.

nolint:tagliatelle

type EventStorage

type EventStorage interface {
	// Add stores a single event.
	Add(payload Event)

	// AddBatch stores multiple events atomically.
	AddBatch(payloads []Event)

	// Get removes and returns up to the N oldest events.
	// Events are returned in FIFO order.
	Get(n int) []Event
}

EventStorage defines the interface for storing and retrieving events. Implementations must be thread-safe as events may be added and retrieved concurrently.

func NewStorage

func NewStorage(mc *metrics.Collector) EventStorage

NewStorage creates a new in-memory event storage with default configuration. The storage uses a ring buffer with a fixed capacity of 10,000 events.

type SelfDescribingJSON

type SelfDescribingJSON struct {
	Schema string `json:"schema"`
	Data   any    `json:"data"`
}

type SendCallback

type SendCallback func(successes []SendResult, failures []SendResult)

SendCallback is called after each send loop iteration with aggregated results.

type SendResult

type SendResult struct {
	EventCount int
	StatusCode int
	Err        error
}

SendResult represents the outcome of sending a batch of events.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage provides thread-safe in-memory event storage using a fixed-size ring buffer. It implements a FIFO queue: when full, the oldest event is overwritten. The backing array is allocated once and never grows.

func (*Storage) Add

func (s *Storage) Add(payload Event)

Add stores an event. The event is copied to prevent external modification. If storage is at max capacity, the oldest event is dropped.

Note: The event may be modified by the caller between when this method is called and when the copy is made. Callers should not modify the event map concurrently with this call.

func (*Storage) AddBatch

func (s *Storage) AddBatch(payloads []Event)

AddBatch stores multiple events atomically under a single lock. Events are copied to prevent external modification. If adding the batch exceeds capacity, the oldest events are dropped.

func (*Storage) Get

func (s *Storage) Get(n int) []Event

Get removes and returns up to the N oldest events in FIFO order. Returned events are owned by the caller; no further copy is made.

type TokenSource

type TokenSource interface {
	EnhanceHeader(ctx context.Context, h http.Header, audience string) error
}

TokenSource is an interface enabling authentication capabilities for requests sent by Emitter to the API gateway. When added, it will perform operations needed to retrieve/generate a token and will next enhance the request header with it.

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker provides the API for tracking Snowplow events.

func NewTracker

func NewTracker(appId string, emitter *Emitter) (*Tracker, error)

NewTracker creates a new Tracker instance with the given application ID and emitter. Returns an error if emitter is nil or appId is empty.

Example

ExampleNewTracker demonstrates how to create a new Snowplow tracker.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	// Create a tracker with your application ID.
	// The tracker generates event payloads with unique IDs and timestamps.
	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	err = tracker.TrackEvent("app_started", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) Flush

func (t *Tracker) Flush()

Flush triggers immediate sending of any buffered events. This is a non-blocking operation. See Emitter.Flush for details.

func (*Tracker) Stop

func (t *Tracker) Stop()

Stop initiates graceful shutdown of the emitter and blocks until complete. See Emitter.Stop for details on shutdown behavior.

Example

ExampleTracker_Stop demonstrates proper shutdown.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track some events
	err = tracker.TrackEvent("event1", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	err = tracker.TrackEvent("event2", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	// Stop waits for in-progress sends to complete before shutdown.
	tracker.Stop()
}

func (*Tracker) TrackBillingEvent

func (t *Tracker) TrackBillingEvent(requiredInput BillingEventRequiredInput) error

TrackBillingEvent tracks a billing event with the billable_usage context.

The event is sent as a Snowplow structured event with billing context attached. All fields in the requiredInput input parameter must be set. The quantity must be greater than zero.

Example

ExampleTracker_TrackBillingEvent demonstrates tracking a billing event.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a billing event with minimal required fields.
	// The event is sent as a Snowplow structured event with billing context attached.
	// Realm must be one of: "SaaS", "Dedicated", "SM"
	err = tracker.TrackBillingEvent(
		snowplow.BillingEventRequiredInput{
			Category:      "CodeSuggestionsService",
			EventType:     "code_completions",
			Realm:         "SaaS",
			UnitOfMeasure: "tokens",
			Quantity:      150.0,
		},
	)
	if err != nil {
		log.Printf("Failed to track billing event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) TrackBillingEventWithOptionalInput

func (t *Tracker) TrackBillingEventWithOptionalInput(
	requiredInput BillingEventRequiredInput,
	optionalInput BillingEventOptionalInput,
) error

TrackBillingEventWithOptionalInput tracks a billing event with the billable_usage context.

Similar to TrackBillingEvent, but allows to pass also the optional event input fields.

Example

ExampleTracker_TrackBillingEventWithOptionalInput demonstrates tracking a billing event with context options.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a billing event with full context for billing reconciliation.
	// All optional fields help identify the source and context of the billable usage.
	err = tracker.TrackBillingEventWithOptionalInput(
		snowplow.BillingEventRequiredInput{
			Category:      "CodeSuggestionsService",
			EventType:     "code_generations",
			Realm:         "SaaS",
			UnitOfMeasure: "tokens",
			Quantity:      500.0,
		},
		snowplow.BillingEventOptionalInput{
			InstanceID:      "instance-abc123",
			ProjectID:       12345,
			NamespaceID:     67890,
			OrganizationID:  22222,
			RootNamespaceID: 11111,
			Subject:         "user-456",
			GlobalUserID:    "global-user-789",
			CorrelationID:   "request-correlation-xyz",
			DeploymentType:  "self-managed",
			EntityID:        "entity-abc",
			Metadata: map[string]any{
				"model":    "claude-3",
				"language": "go",
			},
		},
	)
	if err != nil {
		log.Printf("Failed to track billing event: %v", err)
	}

	tracker.Stop()
}

func (*Tracker) TrackEvent

func (t *Tracker) TrackEvent(eventName string, props map[string]any) error

TrackEvent tracks a custom event with the given name and properties. Pass nil for props if the event has no additional properties. Returns an error if eventName is empty, if the event cannot be marshaled, or if the tracker has been stopped.

Example

ExampleTracker_TrackEvent demonstrates tracking a simple event without properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track a simple event with just a name (no properties).
	// Pass nil for events without additional properties.
	err = tracker.TrackEvent("user_login", nil)
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}
Example (WithNestedProperties)

ExampleTracker_TrackEvent_withNestedProperties demonstrates tracking events with nested properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track an event with nested properties (multi-level structure).
	// Supports nested maps and complex data structures.
	err = tracker.TrackEvent("page_view", map[string]any{
		"title":    "Product Details - Premium Widget",
		"duration": 4500,
		"user": map[string]any{
			"id":       67890,
			"username": "john_doe",
		},
	})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}
Example (WithProperties)

ExampleTracker_TrackEvent_withProperties demonstrates tracking events with simple properties.

package main

import (
	"log"

	"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)

func main() {
	emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
	if err != nil {
		log.Fatalf("Failed to create emitter: %v", err)
	}

	tracker, err := snowplow.NewTracker("my-application", emitter)
	if err != nil {
		log.Fatalf("Failed to create tracker: %v", err)
	}

	// Track an event with simple properties (flat structure).
	// Supports string, int, float, bool types.
	err = tracker.TrackEvent("button_clicked", map[string]any{
		"button_id": "submit_form",
		"user_id":   12345,
	})
	if err != nil {
		log.Printf("Failed to track event: %v", err)
	}

	tracker.Stop()
}

Directories

Path Synopsis
Package metrics is a module providing observability through Prometheus Metrics for the snowplow events mechanism.
Package metrics is a module providing observability through Prometheus Metrics for the snowplow events mechanism.
Package oidc provides an implementation of the snowplow.TokenSource interface, allowing snowplow.Emitter to authenticate with the collector endpoint using the OIDC flow.
Package oidc provides an implementation of the snowplow.TokenSource interface, allowing snowplow.Emitter to authenticate with the collector endpoint using the OIDC flow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL