Documentation
¶
Overview ¶
Package snowplow provides a lightweight Snowplow event tracking implementation for Go applications.
The package enables asynchronous event collection and batch sending to Snowplow collectors with automatic retry logic and FIFO queue management to prevent event stagnation.
Basic Usage ¶
Create an emitter to handle event sending:
emitter, err := snowplow.NewEmitter("https://collector.example.com/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatal(err)
}
Create a tracker to generate events:
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatal(err)
}
defer tracker.Stop()
Track custom events with or without properties:
// Simple event without properties
tracker.TrackEvent("user_login", nil)
// Event with properties
tracker.TrackEvent("button_clicked", map[string]any{
"button_id": "submit",
"page": "/checkout",
})
// Event with nested properties
tracker.TrackEvent("page_view", map[string]any{
"title": "Product Page",
"user": map[string]any{
"id": 12345,
"role": "customer",
},
})
Custom Contexts ¶
To attach typed custom contexts to a custom event, use TrackEventWithContexts. Custom contexts populate the Snowplow `cx` field and let analysts query their fields as first-class warehouse columns instead of parsing them out of the freeform props bag.
The package ships with GitLabStandardContext, a typed payload for the gitlab_standard iglu schema with typed enums (Realm, DeploymentType) and a sealed UserID variant. Service-owned contexts can be defined by implementing the EventContext interface:
// Universal identity / environment fields shared across GitLab services.
standard := snowplow.GitLabStandardContext{
Environment: "production",
OrganizationID: 12345,
Realm: snowplow.RealmSaaS,
InstanceID: "gitlab-rails-1",
}
// Service-specific context (defined by the caller). Any type satisfying
// the EventContext interface — Schema() string and Data() any — can be
// passed; for example:
//
// type ArtifactRegistryEventContext struct {
// Format string `json:"format"`
// Kind string `json:"kind"`
// }
//
// func (c ArtifactRegistryEventContext) Schema() string {
// return "iglu:com.gitlab/artifact_registry_event/jsonschema/1-0-0"
// }
// func (c ArtifactRegistryEventContext) Data() any { return c }
registryCtx := ArtifactRegistryEventContext{
Format: "container",
Kind: "local",
}
tracker.TrackEventWithContexts(
"artifact_registry_artifact_pushed",
nil,
standard,
registryCtx,
)
Contexts may optionally implement `Validate() error`. If present, the tracker calls it before queuing the event; a validation failure aborts the event with the returned error. GitLabStandardContext.Validate covers the schema constraints: required Environment, Realm/DeploymentType enums, UserID variant, string maxLengths, and integer ranges (0..2^31-1).
TrackEvent is equivalent to TrackEventWithContexts with zero contexts and remains supported for backward compatibility.
Billing Events ¶
Track billing events for GitLab's billable usage tracking. Billing events use the billable_usage schema (iglu:com.gitlab/billable_usage/jsonschema/1-0-2) and are sent as Snowplow structured events with billing context attached.
// Minimal billing event
tracker.TrackBillingEvent(
BillingEventRequiredInput{
Category: "CodeSuggestionsService",
EventType: "code_completions",
Realm: "SaaS",
UnitOfMeasure: "tokens",
Quantity: 150.0,
}
)
// Billing event with full context
tracker.TrackBillingEventWithOptionalInput(
BillingEventRequiredInput{
Category: "CodeSuggestionsService",
EventType: "code_completions",
Realm: "SaaS",
UnitOfMeasure: "tokens",
Quantity: 150.0,
},
BillingEventOptionalInput{
InstanceID: "instance-abc123",
ProjectID: 12345,
NamespaceID: 67890,
RootNamespaceID: 22222,
OrganizationID: 11111,
Subject: "user-456",
GlobalUserID: "global-user-789",
CorrelationID: "request-correlation-xyz",
DeploymentType: "self-managed",
EntityID: "entity-abc",
Metadata: map[string]any{
"model": "claude-3",
},
},
)
Emitter Options ¶
The emitter can be configured using functional options:
emitter, err := snowplow.NewEmitter(
"https://collector.example.com/com.snowplowanalytics.snowplow/tp2",
snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
for _, s := range successes {
log.Printf("Sent %d events (status: %d)", s.EventCount, s.StatusCode)
}
for _, f := range failures {
if f.Err != nil {
log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
} else {
log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
}
}
}),
)
Available options:
WithCallback: Sets a callback function that is invoked after each send loop iteration with aggregated success and failure results. The callback receives two slices of SendResult: successes (HTTP 2xx/3xx responses) and failures (HTTP 4xx/5xx responses or local errors). Each SendResult contains EventCount (number of events in the batch), StatusCode (HTTP status or -1 for local errors), and Err (non-nil for local errors like network failures or marshal errors).
Metrics and Observability ¶
The emitter automatically collects Prometheus metrics tracking event flow and performance. Metrics are accessible via Emitter.Collector(), which returns a prometheus.Collector that can be registered with any Prometheus registry.
Available metrics include:
- Event enqueue counts
- Send success/failure counts
- Batch delivery duration (histogram)
- Queue depth and capacity
- Overflow drop counts
Example:
collector := emitter.Collector() // Register with Prometheus prometheus.MustRegister(collector)
For detailed metric descriptions, see the metrics package documentation.
Architecture ¶
The implementation consists of three main components:
Tracker: High-level API for tracking events. Generates Snowplow protocol-compliant payloads with unique IDs and timestamps, and encodes custom event data as base64-encoded self-describing JSON.
Emitter: Handles asynchronous batch sending of events to Snowplow collectors. Events are buffered in memory and sent in batches of up to 100 events. Failed sends are automatically retried by re-queueing events. Uses context cancellation for graceful shutdown.
Storage: Thread-safe in-memory event storage using a FIFO queue. Events are retrieved in order (oldest first) to prevent event stagnation. Implements automatic capacity management with a maximum size of 10,000 events.
Event Processing ¶
Events flow through the system as follows: Tracker.TrackEvent creates a Snowplow protocol payload, Emitter.AddAsync stores the event in the FIFO queue, Emitter automatically batches and sends events (up to 100 per batch), on failure events are re-queued to the back of the queue, and FIFO ordering ensures old events eventually get processed.
Lifecycle Management ¶
The emitter provides lifecycle management methods:
Flush: Triggers immediate sending of buffered events (non-blocking).
Stop: Cancels in-flight requests and waits for completion (blocking). After Stop is called, TrackEvent will return an error (ErrEmitterStopped).
Always call Stop before application shutdown:
tracker.Stop()
Event Structure ¶
Custom events (TrackEvent) are sent as self-describing events with standard Snowplow fields: aid (application ID), dtm (device timestamp), e (event type), eid (event ID), p (platform), stm (sent timestamp), tna (namespace), tv (tracker version), ua (user agent), and ue_px (base64-encoded self-describing JSON payload).
Billing events (TrackBillingEvent, TrackBillingEventWithOptionalInput) are sent as structured events with fields: se_ca (category), se_ac (action/event type), se_la (label/event ID), se_va (value/quantity), and cx (base64-encoded billing context). The billing context follows the billable_usage schema with required fields: event_id, event_type, realm, unit_of_measure, quantity, and timestamp.
Implementation Details ¶
Events are sent asynchronously in batches (100 events per batch) to minimize overhead. Failed sends automatically retry by re-queueing events to the back of the queue. FIFO queue ordering prevents event stagnation (oldest events are sent first). Storage capacity is limited to 10,000 events; when full, oldest events are dropped. Storage is in-memory only (events are lost on process restart). HTTP client uses connection pooling with a 5 second timeout.
Thread Safety ¶
All public APIs are thread-safe and can be called concurrently from multiple goroutines. The storage implementation uses mutex synchronization to protect internal state.
Example ¶
Example demonstrates basic usage of the Snowplow event tracker.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
// Initialize emitter with collector URL.
// The emitter handles automatic batching (5 events per batch) and retry logic.
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
// Create tracker with application ID.
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track a simple event
err = tracker.TrackEvent("user_login", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
// Stop tracker before shutdown
tracker.Stop()
}
Output:
Index ¶
- Constants
- Variables
- type BillingEventOptionalInput
- type BillingEventRequiredInput
- type DeploymentType
- type Emitter
- type EmitterOption
- type Event
- type EventContext
- type EventStorage
- type GitLabStandardContext
- type Realm
- type SelfDescribingJSON
- type SendCallback
- type SendResult
- type Storage
- type TokenSource
- type Tracker
- func (t *Tracker) Flush()
- func (t *Tracker) Stop()
- func (t *Tracker) TrackBillingEvent(requiredInput BillingEventRequiredInput) error
- func (t *Tracker) TrackBillingEventWithOptionalInput(requiredInput BillingEventRequiredInput, ...) error
- func (t *Tracker) TrackEvent(eventName string, props map[string]any) error
- func (t *Tracker) TrackEventWithContexts(eventName string, props map[string]any, contexts ...EventContext) error
- type UserID
- type UserIDInt
- type UserIDString
Examples ¶
Constants ¶
const ( // SchemaCustomEvent points to the GitLab custom event schema // https://gitlab.com/gitlab-org/iglu/-/blob/master/public/schemas/com.gitlab/custom_event/jsonschema/1-0-0 SchemaCustomEvent = "iglu:com.gitlab/custom_event/jsonschema/1-0-0" // SchemaPayloadData points to the schema for event envelop (top level wrapper for event payload) // https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4 SchemaPayloadData = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4" // SchemaSelfDescribedEvent points to the schema for self-describing event (event includes link to a schema to describe the payloads structure) // https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0 SchemaSelfDescribedEvent = "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0" // SchemaContexts points to the schema for contexts wrapper (array of context entities) // https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1 SchemaContexts = "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1" // SchemaBillableUsage points to GitLab billable usage context schema // https://gitlab.com/gitlab-org/iglu/-/tree/master/public/schemas/com.gitlab/billable_usage SchemaBillableUsage = "iglu:com.gitlab/billable_usage/jsonschema/1-0-2" // SchemaGitLabStandard points to the GitLab standard context schema. This // is the same context the Rails monolith attaches to every Snowplow event, // providing universal identity and environment fields (organization_id, // realm, instance_id, etc.) that the Snowflake data warehouse models are // built to query. // https://gitlab.com/gitlab-org/iglu/-/tree/master/public/schemas/com.gitlab/gitlab_standard SchemaGitLabStandard = "iglu:com.gitlab/gitlab_standard/jsonschema/1-1-8" )
Variables ¶
var ErrEmitterStopped = errors.New("emitter is stopped")
ErrEmitterStopped is returned when AddAsync is called on a stopped emitter.
Functions ¶
This section is empty.
Types ¶
type BillingEventOptionalInput ¶
type BillingEventOptionalInput struct {
InstanceID string `json:"instance_id,omitempty"`
InstanceVersion string `json:"instance_version,omitempty"`
UniqueInstanceID string `json:"unique_instance_id,omitempty"`
HostName string `json:"host_name,omitempty"`
ProjectID int64 `json:"project_id,omitempty"`
NamespaceID int64 `json:"namespace_id,omitempty"`
OrganizationID int64 `json:"organization_id,omitempty"`
RootNamespaceID int64 `json:"root_namespace_id,omitempty"`
Subject string `json:"subject,omitempty"`
GlobalUserID string `json:"global_user_id,omitempty"`
SeatIDs []string `json:"seat_ids,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
DeploymentType string `json:"deployment_type,omitempty"`
Assignments []string `json:"assignments,omitempty"`
EntityID string `json:"entity_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
BillingEventOptionalInput is a struct containing fields that are optional when approaching to track a billing event. To set them the caller must use TrackBillingEventWithOptionalInput() instead of TrackBillingEvent().
All fields are directly translated to the IGLU billable schema. Exact URL of the schema is defined with the SchemaBillableUsage const.
nolint:tagliatelle
type BillingEventRequiredInput ¶
type BillingEventRequiredInput struct {
// Category is not part of the billing event schema, but it's used
// to fill the value expected by Snowplow in the event envelope
Category string `json:"-"`
// EventType defines the type name of the event that should be agreed
// with the Billing Platform
EventType string `json:"event_type"`
Realm string `json:"realm"`
UnitOfMeasure string `json:"unit_of_measure"`
Quantity float64 `json:"quantity"`
}
BillingEventRequiredInput is a struct containing fields that must be set when approaching to track a billing event through TrackBillingEvent() or TrackBillingEventWithOptionalInput().
All fields aside of Category are directly translated to the IGLU billable schema. Exact URL of the schema is defined with the SchemaBillableUsage const.
nolint:tagliatelle
type DeploymentType ¶ added in v2.18.0
type DeploymentType string
DeploymentType identifies the GitLab deployment model. Intended to replace Realm in a future schema revision. Use the DeploymentType* constants.
const ( DeploymentTypeSelfManaged DeploymentType = "self-managed" DeploymentTypeCom DeploymentType = ".com" DeploymentTypeDedicated DeploymentType = "dedicated" )
DeploymentType values from the gitlab_standard iglu schema (see SchemaGitLabStandard).
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
Emitter handles asynchronous batch sending of events to a Snowplow collector.
It runs a single background goroutine that waits for flush signals and drains the event storage in batches. The flush channel (capacity 1) naturally coalesces multiple signals, so concurrent AddAsync calls never spawn extra goroutines.
func NewEmitter ¶
func NewEmitter(collectorURL string, opts ...EmitterOption) (*Emitter, error)
NewEmitter creates a new Emitter with the given collector URL. Returns an error if the collector URL is invalid.
Example ¶
ExampleNewEmitter demonstrates how to create a new Snowplow emitter.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
// Create an emitter that sends events to a Snowplow collector.
// The emitter handles automatic batching and retry logic.
// Events are stored in a FIFO queue to prevent stagnation.
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
// Use the emitter with a tracker
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
err = tracker.TrackEvent("app_started", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
func (*Emitter) AddAsync ¶
AddAsync adds an event to the buffer and triggers asynchronous sending. Returns ErrEmitterStopped if the emitter has been stopped.
func (*Emitter) Collector ¶
func (e *Emitter) Collector() prometheus.Collector
Collector returns the internal prometheus.Collector providing details about Emitter's state and performance.
Example ¶
ExampleEmitter_Collector demonstrates how to access and use the metrics collector from an emitter for Prometheus monitoring.
package main
import (
"log"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
// Create an emitter as usual
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
// Access the Prometheus collector for metrics export
collector := emitter.Collector()
// The collector can be registered with a Prometheus registry.
// Registry can be next exposed with HTTP for Prometheus to
// scrape the metrics.
registry := prometheus.NewRegistry()
registry.MustRegister(collector)
// Use the emitter normally; metrics are collected automatically
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
err = tracker.TrackEvent("page_view", map[string]any{"page": "/home"})
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
func (*Emitter) Flush ¶
func (e *Emitter) Flush()
Flush triggers sending of any buffered events. This is a non-blocking operation that signals the send loop if not already signaled.
func (*Emitter) Stop ¶
func (e *Emitter) Stop()
Stop initiates graceful shutdown of the emitter and blocks until complete. It cancels the context (which aborts in-flight HTTP requests and causes the send loop to exit), then waits for the send loop goroutine to finish. After Stop returns, the emitter will not accept new events. Stop is safe to call multiple times.
type EmitterOption ¶
type EmitterOption func(*emitterConfig)
EmitterOption configures an Emitter.
func WithCallback ¶
func WithCallback(cb SendCallback) EmitterOption
WithCallback sets a callback for send results.
The callback is invoked synchronously within the send goroutine after each send loop iteration. Callbacks should be non-blocking; otherwise, they will stall event sending.
Example ¶
ExampleWithCallback demonstrates using a callback to handle send results.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
// Create an emitter with a callback to handle success and failure results.
// The callback is invoked once per send loop with aggregated results.
emitter, err := snowplow.NewEmitter(
"http://localhost:9093/com.snowplowanalytics.snowplow/tp2",
snowplow.WithCallback(func(successes, failures []snowplow.SendResult) {
for _, s := range successes {
log.Printf("Successfully sent %d events (status: %d)", s.EventCount, s.StatusCode)
}
for _, f := range failures {
if f.Err != nil {
log.Printf("Failed to send %d events: %v", f.EventCount, f.Err)
} else {
log.Printf("Failed to send %d events (status: %d)", f.EventCount, f.StatusCode)
}
}
}),
)
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
err = tracker.TrackEvent("user_action", map[string]any{"action": "click"})
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
func WithMetricsCollectorOptions ¶
func WithMetricsCollectorOptions(opts ...metrics.CollectorOption) EmitterOption
WithMetricsCollectorOptions passes customization options to the Collector.
Collector is always created for the Emitter to tracks its state and performance.
func WithTokenSource ¶
func WithTokenSource(ts TokenSource) EmitterOption
WithTokenSource sets a token source to authenticate requests to the API gateway.
type Event ¶
type Event struct {
ID string `json:"eid"`
Type eventType `json:"e"`
AppID string `json:"aid"`
TrackerVersion string `json:"tv"`
UserAgent string `json:"ua"`
Platform string `json:"p"`
Namespace string `json:"tna"`
Timestamp string `json:"dtm"`
SentTimestamp string `json:"stm,omitempty"`
SelfDescribedEncoded string `json:"ue_px,omitempty"`
ContextEncoded string `json:"cx,omitempty"`
StructuredCategory string `json:"se_ca,omitempty"`
StructuredAction string `json:"se_ac,omitempty"`
StructuredLabel string `json:"se_la,omitempty"`
StructuredProperty string `json:"se_pr,omitempty"`
StructuredValue string `json:"se_va,omitempty"`
}
Event is the envelope for events sent to Snowplow.
Fields specific to one event type (ue_px for "ue", se_* for "se") use omitempty so they're absent on the other type. cx is shared by both event types and is omitempty so it's absent when no contexts are attached. Snowplow's atomic schema parses se_va as java.math.BigDecimal and stm as "ms since epoch"; an empty string in either fails validation.
nolint:tagliatelle
type EventContext ¶ added in v2.18.0
EventContext is a self-describing JSON object attached to a Snowplow event as a custom context. Schema returns the iglu schema URI; Data returns the value placed in the `data` field of the context's self-describing JSON. The tracker wraps each context in {schema, data}, places them in a contexts envelope, JSON-encodes the envelope, and base64-encodes the result into the event's `cx` field. Data must therefore be JSON-marshalable.
GitLabStandardContext is the built-in implementation for the gitlab_standard schema. Callers can define service-specific contexts by implementing this interface and passing them to Tracker.TrackEventWithContexts.
Implementations may also implement Validate() error; the tracker calls it before queuing the event and aborts on a non-nil return.
The name avoids shadowing the standard library's context.Context.
type EventStorage ¶
type EventStorage interface {
// Add stores a single event.
Add(payload Event)
// AddBatch stores multiple events atomically.
AddBatch(payloads []Event)
// Get removes and returns up to the N oldest events.
// Events are returned in FIFO order.
Get(n int) []Event
}
EventStorage defines the interface for storing and retrieving events. Implementations must be thread-safe as events may be added and retrieved concurrently.
func NewStorage ¶
func NewStorage(mc *metrics.Collector) EventStorage
NewStorage creates a new in-memory event storage with default configuration. The storage uses a ring buffer with a fixed capacity of 10,000 events.
type GitLabStandardContext ¶ added in v2.18.0
type GitLabStandardContext struct {
// Required.
Environment string `json:"environment"`
// Identity.
ProjectID int64 `json:"project_id,omitempty"`
NamespaceID int64 `json:"namespace_id,omitempty"`
OrganizationID int64 `json:"organization_id,omitempty"`
UltimateParentNamespaceID int64 `json:"ultimate_parent_namespace_id,omitempty"`
UserID UserID `json:"user_id,omitempty"`
UserType string `json:"user_type,omitempty"`
IsGitLabTeamMember *bool `json:"is_gitlab_team_member,omitempty"`
// Source / environment.
Source string `json:"source,omitempty"`
Plan string `json:"plan,omitempty"`
GoogleAnalyticsID string `json:"google_analytics_id,omitempty"`
// Instance.
Realm Realm `json:"realm,omitempty"`
DeploymentType DeploymentType `json:"deployment_type,omitempty"`
InstanceID string `json:"instance_id,omitempty"`
UniqueInstanceID string `json:"unique_instance_id,omitempty"`
HostName string `json:"host_name,omitempty"`
InstanceVersion string `json:"instance_version,omitempty"`
GlobalUserID string `json:"global_user_id,omitempty"`
// Request.
CorrelationID string `json:"correlation_id,omitempty"`
ContextGeneratedAt string `json:"context_generated_at,omitempty"`
// Client.
Interface string `json:"interface,omitempty"`
ClientName string `json:"client_name,omitempty"`
ClientVersion string `json:"client_version,omitempty"`
ClientType string `json:"client_type,omitempty"`
// Feature.
FeatureCategory string `json:"feature_category,omitempty"`
FeatureEnabledByNamespaceIDs []int64 `json:"feature_enabled_by_namespace_ids,omitempty"`
FeatureEnablementType string `json:"feature_enablement_type,omitempty"`
// AI/ML.
InputTokens int64 `json:"input_tokens,omitempty"`
OutputTokens int64 `json:"output_tokens,omitempty"`
TotalTokens int64 `json:"total_tokens,omitempty"`
ModelEngine string `json:"model_engine,omitempty"`
ModelName string `json:"model_name,omitempty"`
ModelProvider string `json:"model_provider,omitempty"`
// Extensibility.
Extra map[string]any `json:"extra,omitempty"`
}
GitLabStandardContext is a typed payload for the gitlab_standard iglu schema (see SchemaGitLabStandard), mirroring it field-for-field.
Only Environment is required; other fields are omitted from the JSON payload when zero. IsGitLabTeamMember is a *bool so the payload can distinguish "not set" from false. UserID uses the sealed UserID interface; set it with UserIDString or UserIDInt.
GitLabStandardContext satisfies EventContext directly; pass it to Tracker.TrackEventWithContexts without conversion.
Schema evolution: additive revisions update this type in place. Breaking revisions (renames, retypes, removals, tightened constraints) introduce a new type (e.g. GitLabStandardContextV2) so existing callers keep compiling.
nolint:tagliatelle
func (GitLabStandardContext) Data ¶ added in v2.18.0
func (c GitLabStandardContext) Data() any
Data returns the context payload. It is the receiver itself; encoding/json will marshal the exported fields according to their struct tags.
func (GitLabStandardContext) Schema ¶ added in v2.18.0
func (c GitLabStandardContext) Schema() string
Schema returns the iglu schema URI for the gitlab_standard context.
func (GitLabStandardContext) Validate ¶ added in v2.18.0
func (c GitLabStandardContext) Validate() error
Validate checks fields against the gitlab_standard iglu schema (see SchemaGitLabStandard): required Environment, Realm/DeploymentType enums, UserID variant, string maxLengths, and integer ranges (0..2^31-1). The schema is authoritative; this is a pre-flight check to catch errors before the event is sent.
type Realm ¶ added in v2.18.0
type Realm string
Realm identifies the GitLab deployment model. Use the Realm* constants; Validate re-checks the value at runtime to catch explicit casts.
type SelfDescribingJSON ¶
type SendCallback ¶
type SendCallback func(successes []SendResult, failures []SendResult)
SendCallback is called after each send loop iteration with aggregated results.
type SendResult ¶
SendResult represents the outcome of sending a batch of events.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage provides thread-safe in-memory event storage using a fixed-size ring buffer. It implements a FIFO queue: when full, the oldest event is overwritten. The backing array is allocated once and never grows.
func (*Storage) Add ¶
Add stores an event. The event is copied to prevent external modification. If storage is at max capacity, the oldest event is dropped.
Note: The event may be modified by the caller between when this method is called and when the copy is made. Callers should not modify the event map concurrently with this call.
type TokenSource ¶
type TokenSource interface {
EnhanceHeader(ctx context.Context, h http.Header, audience string) error
}
TokenSource is an interface enabling authentication capabilities for requests sent by Emitter to the API gateway. When added, it will perform operations needed to retrieve/generate a token and will next enhance the request header with it.
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker provides the API for tracking Snowplow events.
func NewTracker ¶
NewTracker creates a new Tracker instance with the given application ID and emitter. Returns an error if emitter is nil or appId is empty.
Example ¶
ExampleNewTracker demonstrates how to create a new Snowplow tracker.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
// Create a tracker with your application ID.
// The tracker generates event payloads with unique IDs and timestamps.
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
err = tracker.TrackEvent("app_started", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
func (*Tracker) Flush ¶
func (t *Tracker) Flush()
Flush triggers immediate sending of any buffered events. This is a non-blocking operation. See Emitter.Flush for details.
func (*Tracker) Stop ¶
func (t *Tracker) Stop()
Stop initiates graceful shutdown of the emitter and blocks until complete. See Emitter.Stop for details on shutdown behavior.
Example ¶
ExampleTracker_Stop demonstrates proper shutdown.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track some events
err = tracker.TrackEvent("event1", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
err = tracker.TrackEvent("event2", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
// Stop waits for in-progress sends to complete before shutdown.
tracker.Stop()
}
Output:
func (*Tracker) TrackBillingEvent ¶
func (t *Tracker) TrackBillingEvent(requiredInput BillingEventRequiredInput) error
TrackBillingEvent tracks a billing event with the billable_usage context.
The event is sent as a Snowplow structured event with billing context attached. All fields in the requiredInput input parameter must be set. The quantity must be greater than zero.
Example ¶
ExampleTracker_TrackBillingEvent demonstrates tracking a billing event.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track a billing event with minimal required fields.
// The event is sent as a Snowplow structured event with billing context attached.
// Realm must be one of: "SaaS", "Dedicated", "SM"
err = tracker.TrackBillingEvent(
snowplow.BillingEventRequiredInput{
Category: "CodeSuggestionsService",
EventType: "code_completions",
Realm: "SaaS",
UnitOfMeasure: "tokens",
Quantity: 150.0,
},
)
if err != nil {
log.Printf("Failed to track billing event: %v", err)
}
tracker.Stop()
}
Output:
func (*Tracker) TrackBillingEventWithOptionalInput ¶
func (t *Tracker) TrackBillingEventWithOptionalInput( requiredInput BillingEventRequiredInput, optionalInput BillingEventOptionalInput, ) error
TrackBillingEventWithOptionalInput tracks a billing event with the billable_usage context.
Similar to TrackBillingEvent, but allows to pass also the optional event input fields.
Example ¶
ExampleTracker_TrackBillingEventWithOptionalInput demonstrates tracking a billing event with context options.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track a billing event with full context for billing reconciliation.
// All optional fields help identify the source and context of the billable usage.
err = tracker.TrackBillingEventWithOptionalInput(
snowplow.BillingEventRequiredInput{
Category: "CodeSuggestionsService",
EventType: "code_generations",
Realm: "SaaS",
UnitOfMeasure: "tokens",
Quantity: 500.0,
},
snowplow.BillingEventOptionalInput{
InstanceID: "instance-abc123",
ProjectID: 12345,
NamespaceID: 67890,
OrganizationID: 22222,
RootNamespaceID: 11111,
Subject: "user-456",
GlobalUserID: "global-user-789",
CorrelationID: "request-correlation-xyz",
DeploymentType: "self-managed",
EntityID: "entity-abc",
Metadata: map[string]any{
"model": "claude-3",
"language": "go",
},
},
)
if err != nil {
log.Printf("Failed to track billing event: %v", err)
}
tracker.Stop()
}
Output:
func (*Tracker) TrackEvent ¶
TrackEvent tracks a custom event with the given name and optional properties. It is equivalent to TrackEventWithContexts with zero contexts; see that method for the full error contract. To attach typed contexts, use TrackEventWithContexts directly.
Example ¶
ExampleTracker_TrackEvent demonstrates tracking a simple event without properties.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track a simple event with just a name (no properties).
// Pass nil for events without additional properties.
err = tracker.TrackEvent("user_login", nil)
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
Example (WithNestedProperties) ¶
ExampleTracker_TrackEvent_withNestedProperties demonstrates tracking events with nested properties.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track an event with nested properties (multi-level structure).
// Supports nested maps and complex data structures.
err = tracker.TrackEvent("page_view", map[string]any{
"title": "Product Details - Premium Widget",
"duration": 4500,
"user": map[string]any{
"id": 67890,
"username": "john_doe",
},
})
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
Example (WithProperties) ¶
ExampleTracker_TrackEvent_withProperties demonstrates tracking events with simple properties.
package main
import (
"log"
"gitlab.com/gitlab-org/labkit/v2/events/snowplow"
)
func main() {
emitter, err := snowplow.NewEmitter("http://localhost:9093/com.snowplowanalytics.snowplow/tp2")
if err != nil {
log.Fatalf("Failed to create emitter: %v", err)
}
tracker, err := snowplow.NewTracker("my-application", emitter)
if err != nil {
log.Fatalf("Failed to create tracker: %v", err)
}
// Track an event with simple properties (flat structure).
// Supports string, int, float, bool types.
err = tracker.TrackEvent("button_clicked", map[string]any{
"button_id": "submit_form",
"user_id": 12345,
})
if err != nil {
log.Printf("Failed to track event: %v", err)
}
tracker.Stop()
}
Output:
func (*Tracker) TrackEventWithContexts ¶ added in v2.18.0
func (t *Tracker) TrackEventWithContexts(eventName string, props map[string]any, contexts ...EventContext) error
TrackEventWithContexts tracks a custom event with typed contexts attached. Contexts are serialized into the Snowplow `cx` field so their fields become first-class warehouse columns rather than entries in the freeform props bag.
Each context must satisfy EventContext. If a context also implements Validate() error, the tracker calls it and aborts the event on a non-nil return. With zero contexts the call is equivalent to TrackEvent.
type UserID ¶ added in v2.18.0
type UserID interface {
// contains filtered or unexported methods
}
UserID is the type of GitLabStandardContext.UserID. The schema permits a string or an integer; use UserIDString or UserIDInt. The unexported isUserID marker seals the interface — only this package can declare variants.
type UserIDInt ¶ added in v2.18.0
type UserIDInt int64
UserIDInt is the integer variant of UserID. The underlying int64 covers the full schema range (0..2^31-1) on every platform.
type UserIDString ¶ added in v2.18.0
type UserIDString string
UserIDString is the string variant of UserID.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package metrics is a module providing observability through Prometheus Metrics for the snowplow events mechanism.
|
Package metrics is a module providing observability through Prometheus Metrics for the snowplow events mechanism. |
|
Package oidc provides an implementation of the snowplow.TokenSource interface, allowing snowplow.Emitter to authenticate with the collector endpoint using the OIDC flow.
|
Package oidc provides an implementation of the snowplow.TokenSource interface, allowing snowplow.Emitter to authenticate with the collector endpoint using the OIDC flow. |