events

package
v1.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2025 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregationResult

// AggregationResult is the JSON-serializable result of a usage aggregation,
// as returned by Repository.GetUsage and Repository.GetUsageWithFilters.
//
// It carries the aggregation Type, the EventName, MeterID and PriceID the
// result relates to, a single aggregated Value, an optional list of
// UsageResult entries in Results, and optional string Metadata.
//
// NOTE(review): Value is tagged omitempty, but encoding/json never considers
// a struct value empty. If decimal.Decimal is a struct type, the option has
// no effect and "value" is always emitted — confirm against the decimal
// package in use.
type AggregationResult struct {
	Results   []UsageResult         `json:"results,omitempty"`
	Value     decimal.Decimal       `json:"value,omitempty"`
	EventName string                `json:"event_name"`
	Type      types.AggregationType `json:"type"`
	Metadata  map[string]string     `json:"metadata,omitempty"`
	MeterID   string                `json:"meter_id"`
	PriceID   string                `json:"price_id"`
}

type Aggregator

// Aggregator describes a single aggregation strategy: it reports which
// types.AggregationType it implements and produces the query string for a
// given set of UsageParams.
type Aggregator interface {
	// GetQuery returns the query for this aggregation
	GetQuery(ctx context.Context, params *UsageParams) string

	// GetType returns the aggregation type
	GetType() types.AggregationType
}

type DetailedUsageAnalytic added in v1.0.17

// DetailedUsageAnalytic represents detailed usage and cost data for analytics.
//
// Each value identifies a feature/meter/source combination (FeatureID,
// FeatureName, EventName, Source, MeterID, AggregationType), its display
// units (Unit, UnitPlural), the totals for that combination (TotalUsage,
// TotalCost in Currency, EventCount), and the time-series data points in
// Points. The struct declares no JSON tags.
type DetailedUsageAnalytic struct {
	FeatureID       string
	FeatureName     string
	EventName       string
	Source          string
	MeterID         string
	AggregationType types.AggregationType
	Unit            string
	UnitPlural      string
	TotalUsage      decimal.Decimal
	TotalCost       decimal.Decimal
	Currency        string
	EventCount      uint64 // Number of events that contributed to this aggregation
	Points          []UsageAnalyticPoint
}

DetailedUsageAnalytic represents detailed usage and cost data for analytics

type Event

// Event represents the base event structure.
//
// Fields carry three kinds of struct tags: json (wire name), ch (column
// name, with timezone('UTC') on the time columns) and validate. Only ID,
// TenantID, EventName and Timestamp are tagged validate:"required".
//
// NOTE(review): the "at least one is required" rule for CustomerID and
// ExternalCustomerID is not expressed in the struct tags; presumably it is
// enforced by (*Event).Validate — confirm.
type Event struct {
	// Unique identifier for the event
	ID string `json:"id" ch:"id" validate:"required"`

	// Tenant identifier
	TenantID string `json:"tenant_id" ch:"tenant_id" validate:"required"`

	// Environment identifier
	EnvironmentID string `json:"environment_id" ch:"environment_id"`

	// Event name is an identifier for the event and will be used for filtering and aggregation
	EventName string `json:"event_name" ch:"event_name" validate:"required"`

	// Additional properties
	Properties map[string]interface{} `json:"properties" ch:"properties"`

	// Source of the event
	Source string `json:"source" ch:"source"`

	// Timestamp of the event
	Timestamp time.Time `json:"timestamp" ch:"timestamp,timezone('UTC')" validate:"required"`

	// IngestedAt is the time the event was ingested into the database. It is set automatically to the
	// current time at the ClickHouse server level and does not need to be set by the caller.
	IngestedAt time.Time `json:"ingested_at" ch:"ingested_at,timezone('UTC')"`

	// Subject identifiers - at least one is required
	// CustomerID is the identifier of the customer in Flexprice's system
	CustomerID string `json:"customer_id" ch:"customer_id"`

	// ExternalCustomerID is the identifier of the customer in the external system, e.g. a customer DB or Stripe
	ExternalCustomerID string `json:"external_customer_id" ch:"external_customer_id"`
}

Event represents the base event structure

func NewEvent

func NewEvent(
	eventName, tenantID, externalCustomerID string,
	properties map[string]interface{},
	timestamp time.Time,
	eventID, customerID, source string,
	environmentID string,
) *Event

NewEvent creates a new event with defaults

func (*Event) ToProcessedEvent added in v1.0.17

func (e *Event) ToProcessedEvent() *ProcessedEvent

ToProcessedEvent creates a new ProcessedEvent from this Event with pending status

func (*Event) Validate

func (e *Event) Validate() error

Validate validates the event

type EventIterator

// EventIterator identifies a position in a list of events by the pair
// (Timestamp, ID). GetEventsParams references it through its IterFirst and
// IterLast fields.
//
// NOTE(review): presumably used as a pagination cursor — confirm against the
// Repository.GetEvents implementation.
type EventIterator struct {
	Timestamp time.Time
	ID        string
}

type FilterGroup

// FilterGroup represents a group of filters with priority.
// UsageWithFiltersParams carries an ordered slice of these groups.
type FilterGroup struct {
	// ID is the identifier for the filter group. We are using the price ID
	// as the unique identifier for the filter group as of now
	ID string `json:"id"`

	// Priority is the priority of the filter group for deduping events matching multiple filter groups
	Priority int `json:"priority"`

	// Filters are the actual filters where the key is the $properties.key
	// and the values are all the predefined filter values
	Filters map[string][]string `json:"filters"`
}

FilterGroup represents a group of filters with priority

type FindUnprocessedEventsParams added in v1.0.17

// FindUnprocessedEventsParams contains parameters for finding events that
// haven't been processed. It is the argument of
// Repository.FindUnprocessedEvents.
//
// The first four fields are optional filters. BatchSize sets how many events
// are returned per batch, and LastID together with LastTimestamp form the
// keyset-pagination position of the previous batch.
type FindUnprocessedEventsParams struct {
	ExternalCustomerID string    // Optional filter by external customer ID
	EventName          string    // Optional filter by event name
	StartTime          time.Time // Optional filter by start time
	EndTime            time.Time // Optional filter by end time
	BatchSize          int       // Number of events to return per batch
	LastID             string    // Last event ID for keyset pagination (more efficient than offset)
	LastTimestamp      time.Time // Last event timestamp for keyset pagination
}

FindUnprocessedEventsParams contains parameters for finding events that haven't been processed

type GetEventsParams

// GetEventsParams is the parameter set accepted by Repository.GetEvents.
//
// EventName, StartTime and EndTime are tagged validate:"required"; every
// other field is optional as far as the tags are concerned. Sort and Order
// are pointers, so an unset value is distinguishable from an empty string.
//
// NOTE(review): the struct exposes both iterator-based fields (IterFirst,
// IterLast) and offset-based fields (Offset, PageSize). How the two
// pagination styles interact is not visible here — confirm against the
// implementation. CountTotal presumably controls whether the uint64 total
// returned by GetEvents is computed — confirm.
type GetEventsParams struct {
	ExternalCustomerID string              `json:"external_customer_id"`
	EventName          string              `json:"event_name" validate:"required"`
	EventID            string              `json:"event_id"`
	StartTime          time.Time           `json:"start_time" validate:"required"`
	EndTime            time.Time           `json:"end_time" validate:"required"`
	IterFirst          *EventIterator      `json:"iter_first"`
	IterLast           *EventIterator      `json:"iter_last"`
	PageSize           int                 `json:"page_size"`
	PropertyFilters    map[string][]string `json:"property_filters,omitempty"`
	Offset             int                 `json:"offset"`
	Source             string              `json:"source"`
	Sort               *string             `json:"sort"`
	Order              *string             `json:"order"`
	CountTotal         bool                `json:"count_total"`
}

type GetProcessedEventsParams added in v1.0.17

// GetProcessedEventsParams defines parameters for querying processed events.
// It is the argument of ProcessedEventRepository.GetProcessedEvents.
//
// StartTime and EndTime are tagged validate:"required". The ID fields are
// untagged for validation, and Offset and Limit page through the result.
//
// NOTE(review): CountTotal presumably controls whether the uint64 total
// returned by GetProcessedEvents is computed — confirm.
type GetProcessedEventsParams struct {
	StartTime      time.Time `json:"start_time" validate:"required"`
	EndTime        time.Time `json:"end_time" validate:"required"`
	CustomerID     string    `json:"customer_id"`
	SubscriptionID string    `json:"subscription_id"`
	MeterID        string    `json:"meter_id"`
	FeatureID      string    `json:"feature_id"`
	PriceID        string    `json:"price_id"`
	Offset         int       `json:"offset"`
	Limit          int       `json:"limit"`
	CountTotal     bool      `json:"count_total"`
}

GetProcessedEventsParams defines parameters for querying processed events

type PeriodFeatureTotal added in v1.0.17

// PeriodFeatureTotal represents aggregated usage for a feature in a period.
// ProcessedEventRepository.GetPeriodFeatureTotals returns a slice of these,
// each holding the Quantity, FreeUnits and Cost for one FeatureID.
type PeriodFeatureTotal struct {
	FeatureID string          `json:"feature_id"`
	Quantity  decimal.Decimal `json:"quantity"`
	FreeUnits decimal.Decimal `json:"free_units"`
	Cost      decimal.Decimal `json:"cost"`
}

PeriodFeatureTotal represents aggregated usage for a feature in a period

type ProcessedEvent added in v1.0.17

// ProcessedEvent represents an event that has been processed for billing.
//
// It embeds Event, so all original event fields are promoted onto
// ProcessedEvent, and adds the billing context (subscription, line item,
// price, feature, meter, period, currency), deduplication and quantity/cost
// metrics, audit fields, and processing metadata. Every added field carries
// both a json and a ch struct tag.
//
// NOTE(review): the exact semantics of Version and Sign are not visible
// here; they presumably support row versioning in the storage layer —
// confirm.
type ProcessedEvent struct {
	// Original event fields
	Event
	// Processing fields
	SubscriptionID string `json:"subscription_id" ch:"subscription_id"`
	SubLineItemID  string `json:"sub_line_item_id" ch:"sub_line_item_id"`
	PriceID        string `json:"price_id" ch:"price_id"`
	FeatureID      string `json:"feature_id" ch:"feature_id"`
	MeterID        string `json:"meter_id" ch:"meter_id"`
	PeriodID       uint64 `json:"period_id" ch:"period_id"`
	Currency       string `json:"currency" ch:"currency"`

	// Deduplication and metrics
	UniqueHash     string          `json:"unique_hash" ch:"unique_hash"`
	QtyTotal       decimal.Decimal `json:"qty_total" ch:"qty_total"`
	QtyBillable    decimal.Decimal `json:"qty_billable" ch:"qty_billable"`
	QtyFreeApplied decimal.Decimal `json:"qty_free_applied" ch:"qty_free_applied"`
	TierSnapshot   decimal.Decimal `json:"tier_snapshot" ch:"tier_snapshot"`
	UnitCost       decimal.Decimal `json:"unit_cost" ch:"unit_cost"`
	Cost           decimal.Decimal `json:"cost" ch:"cost"`

	// Audit fields
	Version uint64 `json:"version" ch:"version"`
	Sign    int8   `json:"sign" ch:"sign"`

	// Processing metadata
	ProcessedAt time.Time `json:"processed_at" ch:"processed_at,timezone('UTC')"`
	FinalLagMs  uint32    `json:"final_lag_ms" ch:"final_lag_ms"`
}

ProcessedEvent represents an event that has been processed for billing

type ProcessedEventRepository added in v1.0.17

// ProcessedEventRepository defines operations for processed events.
type ProcessedEventRepository interface {
	// InsertProcessedEvent inserts a single processed event into the events_processed table.
	InsertProcessedEvent(ctx context.Context, event *ProcessedEvent) error

	// BulkInsertProcessedEvents bulk-inserts events into the events_processed table.
	BulkInsertProcessedEvents(ctx context.Context, events []*ProcessedEvent) error

	// GetProcessedEvents returns processed events matching the filtering options in params.
	// NOTE(review): the uint64 result is presumably the total number of matching
	// events (see GetProcessedEventsParams.CountTotal) — confirm.
	GetProcessedEvents(ctx context.Context, params *GetProcessedEventsParams) ([]*ProcessedEvent, uint64, error)

	// IsDuplicate checks for a duplicate event using unique_hash, scoped by
	// subscriptionID, meterID and periodID.
	IsDuplicate(ctx context.Context, subscriptionID, meterID string, periodID uint64, uniqueHash string) (bool, error)

	// GetLineItemUsage returns the free and billable quantity to date for a subscription line item.
	// NOTE(review): whether qty is the total or the billable-only quantity is not
	// visible from the signature — confirm.
	GetLineItemUsage(ctx context.Context, subLineItemID string, periodID uint64) (qty decimal.Decimal, freeUnits decimal.Decimal, err error)

	// GetPeriodCost returns the usage cost to date for a customer's subscription in a billing period.
	GetPeriodCost(ctx context.Context, tenantID, environmentID, customerID, subscriptionID string, periodID uint64) (decimal.Decimal, error)

	// GetPeriodFeatureTotals returns usage totals per feature for invoicing.
	GetPeriodFeatureTotals(ctx context.Context, tenantID, environmentID, customerID, subscriptionID string, periodID uint64) ([]*PeriodFeatureTotal, error)

	// GetUsageAnalytics returns usage analytics for recent events; lookbackHours
	// bounds how far back the analytics reach.
	GetUsageAnalytics(ctx context.Context, tenantID, environmentID, customerID string, lookbackHours int) ([]*UsageAnalytic, error)

	// GetDetailedUsageAnalytics provides comprehensive usage analytics with filtering, grouping, and time-series data
	GetDetailedUsageAnalytics(ctx context.Context, params *UsageAnalyticsParams) ([]*DetailedUsageAnalytic, error)
}

ProcessedEventRepository defines operations for processed events

type Repository

// Repository defines operations on raw events. As the method signatures
// indicate, it covers single and bulk insertion of Event values, usage
// aggregation (GetUsage for one AggregationResult, GetUsageWithFilters for
// one result set driven by filter groups), event listing, and lookup of
// events that have not been processed yet.
//
// NOTE(review): the uint64 returned by GetEvents is presumably the total
// number of matching events (see GetEventsParams.CountTotal) — confirm.
type Repository interface {
	InsertEvent(ctx context.Context, event *Event) error
	BulkInsertEvents(ctx context.Context, events []*Event) error
	GetUsage(ctx context.Context, params *UsageParams) (*AggregationResult, error)
	GetUsageWithFilters(ctx context.Context, params *UsageWithFiltersParams) ([]*AggregationResult, error)
	GetEvents(ctx context.Context, params *GetEventsParams) ([]*Event, uint64, error)
	FindUnprocessedEvents(ctx context.Context, params *FindUnprocessedEventsParams) ([]*Event, error)
}

type ReprocessEventsParams added in v1.0.17

// ReprocessEventsParams contains parameters for event reprocessing.
// All filters are optional; BatchSize is documented as defaulting to 100.
//
// NOTE(review): the default is stated only in the field comment; where it is
// applied is not visible here — confirm.
type ReprocessEventsParams struct {
	ExternalCustomerID string    // Filter by external customer ID (optional)
	EventName          string    // Filter by event name (optional)
	StartTime          time.Time // Filter by start time (optional)
	EndTime            time.Time // Filter by end time (optional)
	BatchSize          int       // Number of events to process per batch (default 100)
}

ReprocessEventsParams contains parameters for event reprocessing

type UsageAnalytic added in v1.0.17

// UsageAnalytic represents usage analytics data grouped by source and
// feature. ProcessedEventRepository.GetUsageAnalytics returns a slice of
// these, each holding the Cost and Usage for one (Source, FeatureID) pair.
type UsageAnalytic struct {
	Source    string          `json:"source"`
	FeatureID string          `json:"feature_id"`
	Cost      decimal.Decimal `json:"cost"`
	Usage     decimal.Decimal `json:"usage"`
}

UsageAnalytic represents usage analytics data grouped by source and feature

type UsageAnalyticPoint added in v1.0.17

// UsageAnalyticPoint represents a data point in a time series. It is the
// element type of DetailedUsageAnalytic.Points and records the Usage, Cost
// and EventCount for the time window identified by Timestamp.
type UsageAnalyticPoint struct {
	Timestamp  time.Time
	Usage      decimal.Decimal
	Cost       decimal.Decimal
	EventCount uint64 // Number of events in this time window
}

UsageAnalyticPoint represents a data point in a time series

type UsageAnalyticsParams added in v1.0.17

// UsageAnalyticsParams defines parameters for detailed usage analytics
// queries. It is the argument of
// ProcessedEventRepository.GetDetailedUsageAnalytics.
//
// The struct scopes the query by tenant, environment and customer, narrows
// it by FeatureIDs, Sources, PropertyFilters and the StartTime/EndTime
// range, and shapes the output through GroupBy and WindowSize.
type UsageAnalyticsParams struct {
	TenantID           string
	EnvironmentID      string
	CustomerID         string
	ExternalCustomerID string
	FeatureIDs         []string
	Sources            []string
	StartTime          time.Time
	EndTime            time.Time
	GroupBy            []string // Allowed values: "source", "feature_id"
	WindowSize         types.WindowSize
	PropertyFilters    map[string][]string
}

UsageAnalyticsParams defines parameters for detailed usage analytics queries

type UsageParams

// UsageParams is the parameter set for a usage aggregation. It is accepted
// by Repository.GetUsage and Aggregator.GetQuery, and is embedded in
// UsageWithFiltersParams.
//
// EventName, PropertyName, AggregationType, StartTime and EndTime are tagged
// validate:"required". Multiplier is optional (nil pointer) and, when set,
// is tagged to be greater than zero.
//
// NOTE(review): the gt=0 rule is applied to a *decimal.Decimal. Whether the
// validator in use can evaluate gt on that type (for example through a
// registered custom type function) is not visible here — confirm.
type UsageParams struct {
	ExternalCustomerID string                `json:"external_customer_id"`
	CustomerID         string                `json:"customer_id"`
	EventName          string                `json:"event_name" validate:"required"`
	PropertyName       string                `json:"property_name" validate:"required"`
	AggregationType    types.AggregationType `json:"aggregation_type" validate:"required"`
	WindowSize         types.WindowSize      `json:"window_size"`
	BucketSize         types.WindowSize      `json:"bucket_size,omitempty"` // For windowed MAX aggregation
	StartTime          time.Time             `json:"start_time" validate:"required"`
	EndTime            time.Time             `json:"end_time" validate:"required"`
	Filters            map[string][]string   `json:"filters"`
	Multiplier         *decimal.Decimal      `json:"multiplier,omitempty" validate:"omitempty,gt=0"`
}

type UsageResult

// UsageResult pairs a point in time with an aggregated Value. It is the
// element type of AggregationResult.Results.
//
// NOTE(review): despite its name and its "window_size" JSON key, WindowSize
// is a time.Time rather than a types.WindowSize; presumably it holds the
// timestamp of the window the Value belongs to — confirm.
type UsageResult struct {
	WindowSize time.Time       `json:"window_size"`
	Value      decimal.Decimal `json:"value"`
}

type UsageSummaryParams added in v1.0.17

// UsageSummaryParams defines parameters for querying pre-computed usage.
// StartTime and EndTime are tagged validate:"required"; the ID fields carry
// no validation tags.
type UsageSummaryParams struct {
	StartTime      time.Time `json:"start_time" validate:"required"`
	EndTime        time.Time `json:"end_time" validate:"required"`
	CustomerID     string    `json:"customer_id"`
	SubscriptionID string    `json:"subscription_id"`
	MeterID        string    `json:"meter_id"`
	PriceID        string    `json:"price_id"`
	FeatureID      string    `json:"feature_id"`
}

UsageSummaryParams defines parameters for querying pre-computed usage

type UsageWithFiltersParams

// UsageWithFiltersParams is the parameter set accepted by
// Repository.GetUsageWithFilters. It extends UsageParams with an ordered
// list of FilterGroup values.
//
// UsageParams is embedded by pointer, so its fields are promoted onto this
// type. Accessing a promoted field while the embedded pointer is nil panics;
// callers must set UsageParams before use.
type UsageWithFiltersParams struct {
	*UsageParams
	FilterGroups []FilterGroup // Ordered list of filter groups, from most specific to least specific
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL