repository

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigureDB

func ConfigureDB(db *sql.DB)

ConfigureDB configures database connection pool settings.

Types

type GoalRepository

type GoalRepository interface {
	// GetProgress retrieves a single user's progress for a specific goal.
	// Returns nil if no progress record exists (lazy initialization).
	GetProgress(ctx context.Context, userID, goalID string) (*domain.UserGoalProgress, error)

	// GetUserProgress retrieves all goal progress records for a specific user.
	// Returns empty slice if user has no progress records.
	// M3 Phase 4: activeOnly parameter filters to only is_active = true goals.
	GetUserProgress(ctx context.Context, userID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

	// GetChallengeProgress retrieves all goal progress for a user within a specific challenge.
	// Returns empty slice if user has no progress for this challenge.
	// M3 Phase 4: activeOnly parameter filters to only is_active = true goals.
	GetChallengeProgress(ctx context.Context, userID, challengeID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

	// UpsertProgress creates or updates a single goal progress record.
	// Uses INSERT ... ON CONFLICT (user_id, goal_id) DO UPDATE.
	// Does NOT update if status is 'claimed' (protection against overwrites).
	UpsertProgress(ctx context.Context, progress *domain.UserGoalProgress) error

	// BatchUpsertProgress performs batch upsert for multiple progress records in a single query.
	// This is the key optimization for the buffered event processing (1,000,000x query reduction).
	// Does NOT update records where status is 'claimed'.
	//
	// DEPRECATED: Use BatchUpsertProgressWithCOPY for better performance (5-10x faster).
	// This method is kept for backwards compatibility and testing.
	//
	// USAGE: Use this for absolute goal types where you have the complete progress value.
	// For increment goals, use BatchIncrementProgress instead.
	BatchUpsertProgress(ctx context.Context, updates []*domain.UserGoalProgress) error

	// BatchUpsertProgressWithCOPY performs batch upsert using PostgreSQL COPY protocol.
	// This is 5-10x faster than BatchUpsertProgress (10-20ms vs 62-105ms for 1,000 records).
	// Does NOT update records where status is 'claimed'.
	//
	// USAGE: Use this for production workloads requiring high throughput (500+ EPS).
	// This method solves the Phase 1 database bottleneck by reducing flush time from
	// 62-105ms to 10-20ms, allowing the system to handle 500+ EPS with <1% data loss.
	BatchUpsertProgressWithCOPY(ctx context.Context, updates []*domain.UserGoalProgress) error

	// IncrementProgress atomically increments a user's progress by a delta value.
	// This is used for increment and daily goal types where progress accumulates.
	//
	// For regular increment goals (isDailyIncrement=false):
	//   - Atomically adds delta to current progress (handles concurrency safely)
	//   - Multiple concurrent increments won't lose updates
	//   - Example: progress=5, delta=3 → progress=8
	//
	// For daily increment goals (isDailyIncrement=true):
	//   - Only increments once per day (uses updated_at date from DB)
	//   - If updated_at is today, this is a no-op (progress unchanged)
	//   - If updated_at is before today, increments by delta and updates timestamp
	//   - Example: Day 1 progress=3 → increment(1) → progress=4
	//              Same day → increment(1) → progress=4 (no change)
	//              Next day → increment(1) → progress=5
	//
	// The targetValue parameter is used for status determination:
	//   - If progress >= targetValue, status becomes 'completed'
	//   - Sets completed_at timestamp when threshold reached
	//
	// USAGE: Use this for single increment operations during event processing.
	// For batch operations (flush), use BatchIncrementProgress instead for better performance.
	//
	// Does NOT update if status is 'claimed'.
	IncrementProgress(ctx context.Context, userID, goalID, challengeID, namespace string,
		delta, targetValue int, isDailyIncrement bool) error

	// BatchIncrementProgress performs batch atomic increment for multiple progress records.
	// This is the key optimization for buffered increment event processing (50x better than individual calls).
	//
	// For regular increment goals (IsDailyIncrement=false):
	//   - Atomically adds delta to current progress for each record
	//   - Single SQL query using UNNEST for all increments
	//
	// For daily increment goals (IsDailyIncrement=true):
	//   - Only increments if updated_at date is before today
	//   - Uses DATE(updated_at AT TIME ZONE 'UTC') for timezone-safe comparison
	//   - Updates updated_at timestamp after increment
	//
	// USAGE: Use this during periodic flush to batch all accumulated increments.
	// This reduces 1,000 individual queries to 1 single batch query (50x performance gain).
	//
	// Performance: 1,000 increments in ~20ms (vs 1,000ms for individual calls)
	//
	// Does NOT update if status is 'claimed'.
	BatchIncrementProgress(ctx context.Context, increments []ProgressIncrement) error

	// MarkAsClaimed updates a goal's status to 'claimed' and sets claimed_at timestamp.
	// Used after successfully granting rewards via AGS Platform Service.
	// Returns error if goal is not in 'completed' status or already claimed.
	MarkAsClaimed(ctx context.Context, userID, goalID string) error

	// BeginTx starts a database transaction and returns a transactional repository.
	// Used for claim flow to ensure atomicity (check status + mark claimed + verify).
	BeginTx(ctx context.Context) (TxRepository, error)

	// GetGoalsByIDs retrieves goal progress records for a user across multiple goal IDs.
	// Returns empty slice if none of the goals have progress records.
	// Used by initialization endpoint to check which default goals already exist.
	GetGoalsByIDs(ctx context.Context, userID string, goalIDs []string) ([]*domain.UserGoalProgress, error)

	// BulkInsert creates multiple goal progress records in a single query.
	// Uses INSERT ... ON CONFLICT DO NOTHING for idempotency.
	// Used by initialization endpoint to create default goal assignments.
	BulkInsert(ctx context.Context, progresses []*domain.UserGoalProgress) error

	// UpsertGoalActive creates or updates a goal's is_active status.
	// If row doesn't exist, creates it with is_active and assigned_at fields.
	// If row exists, updates is_active and assigned_at (only when activating).
	// Used by manual activation/deactivation endpoint.
	UpsertGoalActive(ctx context.Context, progress *domain.UserGoalProgress) error

	// GetUserGoalCount returns the total number of goals for a user (active + inactive).
	// Used by initialization endpoint's fast path to quickly check if user is initialized.
	// If count > 0, user has been initialized → use GetActiveGoals() instead of full init.
	// Performance: < 1ms using idx_user_goal_count index.
	GetUserGoalCount(ctx context.Context, userID string) (int, error)

	// GetActiveGoals retrieves only active goal progress records for a user.
	// Returns empty slice if user has no active goals.
	// Used by initialization endpoint's fast path to avoid querying all 500 goal IDs.
	// Performance: < 5ms using idx_user_goal_active_only partial index.
	GetActiveGoals(ctx context.Context, userID string) ([]*domain.UserGoalProgress, error)
}

GoalRepository defines the interface for managing user goal progress in the database. This interface abstracts database operations to allow for testing and different implementations.

type PostgresGoalRepository

type PostgresGoalRepository struct {
	// contains filtered or unexported fields
}

PostgresGoalRepository implements GoalRepository interface using PostgreSQL.

func NewPostgresGoalRepository

func NewPostgresGoalRepository(db *sql.DB) *PostgresGoalRepository

NewPostgresGoalRepository creates a new PostgreSQL-backed goal repository.

func (*PostgresGoalRepository) BatchIncrementProgress

func (r *PostgresGoalRepository) BatchIncrementProgress(ctx context.Context, increments []ProgressIncrement) error

BatchIncrementProgress performs batch atomic increment for multiple progress records. Uses PostgreSQL UNNEST for efficient batch processing (50x faster than individual calls).

func (*PostgresGoalRepository) BatchUpsertProgress

func (r *PostgresGoalRepository) BatchUpsertProgress(ctx context.Context, updates []*domain.UserGoalProgress) error

BatchUpsertProgress performs batch upsert for multiple progress records in a single query. This is the key optimization for buffered event processing (1,000,000x query reduction).

DEPRECATED: Use BatchUpsertProgressWithCOPY for better performance (5-10x faster). This method is kept for backwards compatibility and testing.

func (*PostgresGoalRepository) BatchUpsertProgressWithCOPY added in v0.2.0

func (r *PostgresGoalRepository) BatchUpsertProgressWithCOPY(ctx context.Context, updates []*domain.UserGoalProgress) error

BatchUpsertProgressWithCOPY performs batch upsert using PostgreSQL COPY protocol. This is 5-10x faster than BatchUpsertProgress (10-20ms vs 62-105ms for 1,000 records).

Implementation: 1. Creates temporary table (session-local, auto-dropped) 2. Uses COPY FROM STDIN to bulk load data (bypasses query parser) 3. Merges temp table into main table using INSERT ... SELECT with ON CONFLICT 4. Maintains claimed protection logic (does not update claimed goals)

This method solves the Phase 1 database bottleneck by reducing flush time from 62-105ms to 10-20ms, allowing the system to handle 500+ EPS with <1% data loss.

func (*PostgresGoalRepository) BeginTx

BeginTx starts a database transaction and returns a transactional repository.

func (*PostgresGoalRepository) BulkInsert added in v0.3.0

func (r *PostgresGoalRepository) BulkInsert(ctx context.Context, progresses []*domain.UserGoalProgress) error

BulkInsert creates multiple goal progress records in a single query.

func (*PostgresGoalRepository) GetActiveGoals added in v0.9.0

func (r *PostgresGoalRepository) GetActiveGoals(ctx context.Context, userID string) ([]*domain.UserGoalProgress, error)

GetActiveGoals retrieves only active goal progress records for a user.

func (*PostgresGoalRepository) GetChallengeProgress

func (r *PostgresGoalRepository) GetChallengeProgress(ctx context.Context, userID, challengeID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

GetChallengeProgress retrieves all goal progress for a user within a specific challenge. M3 Phase 4: activeOnly parameter filters to only is_active = true goals.

func (*PostgresGoalRepository) GetGoalsByIDs added in v0.3.0

func (r *PostgresGoalRepository) GetGoalsByIDs(ctx context.Context, userID string, goalIDs []string) ([]*domain.UserGoalProgress, error)

GetGoalsByIDs retrieves goal progress records for a user across multiple goal IDs.

func (*PostgresGoalRepository) GetProgress

func (r *PostgresGoalRepository) GetProgress(ctx context.Context, userID, goalID string) (*domain.UserGoalProgress, error)

GetProgress retrieves a single user's progress for a specific goal.

func (*PostgresGoalRepository) GetUserGoalCount added in v0.9.0

func (r *PostgresGoalRepository) GetUserGoalCount(ctx context.Context, userID string) (int, error)

GetUserGoalCount returns the total number of goals for a user (active + inactive).

func (*PostgresGoalRepository) GetUserProgress

func (r *PostgresGoalRepository) GetUserProgress(ctx context.Context, userID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

GetUserProgress retrieves all goal progress records for a specific user. M3 Phase 4: activeOnly parameter filters to only is_active = true goals.

func (*PostgresGoalRepository) IncrementProgress

func (r *PostgresGoalRepository) IncrementProgress(ctx context.Context, userID, goalID, challengeID, namespace string, delta, targetValue int, isDailyIncrement bool) error

IncrementProgress atomically increments a user's progress by a delta value.

func (*PostgresGoalRepository) MarkAsClaimed

func (r *PostgresGoalRepository) MarkAsClaimed(ctx context.Context, userID, goalID string) error

MarkAsClaimed updates a goal's status to 'claimed' and sets claimed_at timestamp.

func (*PostgresGoalRepository) UpsertGoalActive added in v0.3.0

func (r *PostgresGoalRepository) UpsertGoalActive(ctx context.Context, progress *domain.UserGoalProgress) error

UpsertGoalActive creates or updates a goal's is_active status.

func (*PostgresGoalRepository) UpsertProgress

func (r *PostgresGoalRepository) UpsertProgress(ctx context.Context, progress *domain.UserGoalProgress) error

UpsertProgress creates or updates a single goal progress record.

type PostgresTxRepository

type PostgresTxRepository struct {
	// contains filtered or unexported fields
}

PostgresTxRepository implements TxRepository interface for transactional operations.

func (*PostgresTxRepository) BatchIncrementProgress

func (r *PostgresTxRepository) BatchIncrementProgress(ctx context.Context, increments []ProgressIncrement) error

BatchIncrementProgress performs batch atomic increment within a transaction.

func (*PostgresTxRepository) BatchUpsertProgress

func (r *PostgresTxRepository) BatchUpsertProgress(ctx context.Context, updates []*domain.UserGoalProgress) error

BatchUpsertProgress batch upserts within a transaction. DEPRECATED: Use BatchUpsertProgressWithCOPY for better performance.

func (*PostgresTxRepository) BatchUpsertProgressWithCOPY added in v0.2.0

func (r *PostgresTxRepository) BatchUpsertProgressWithCOPY(ctx context.Context, updates []*domain.UserGoalProgress) error

BatchUpsertProgressWithCOPY performs batch upsert using COPY protocol within a transaction. This is 5-10x faster than BatchUpsertProgress.

func (*PostgresTxRepository) BeginTx

BeginTx is not supported within a transaction.

func (*PostgresTxRepository) BulkInsert added in v0.3.0

func (r *PostgresTxRepository) BulkInsert(ctx context.Context, progresses []*domain.UserGoalProgress) error

BulkInsert creates multiple goal progress records within a transaction.

func (*PostgresTxRepository) Commit

func (r *PostgresTxRepository) Commit() error

Commit commits the transaction.

func (*PostgresTxRepository) GetActiveGoals added in v0.9.0

func (r *PostgresTxRepository) GetActiveGoals(ctx context.Context, userID string) ([]*domain.UserGoalProgress, error)

GetActiveGoals retrieves only active goal progress records for a user within a transaction.

func (*PostgresTxRepository) GetChallengeProgress

func (r *PostgresTxRepository) GetChallengeProgress(ctx context.Context, userID, challengeID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

GetChallengeProgress retrieves challenge progress within a transaction. M3 Phase 4: activeOnly parameter filters to only is_active = true goals.

func (*PostgresTxRepository) GetGoalsByIDs added in v0.3.0

func (r *PostgresTxRepository) GetGoalsByIDs(ctx context.Context, userID string, goalIDs []string) ([]*domain.UserGoalProgress, error)

GetGoalsByIDs retrieves goal progress records within a transaction.

func (*PostgresTxRepository) GetProgress

func (r *PostgresTxRepository) GetProgress(ctx context.Context, userID, goalID string) (*domain.UserGoalProgress, error)

GetProgress retrieves progress within a transaction.

func (*PostgresTxRepository) GetProgressForUpdate

func (r *PostgresTxRepository) GetProgressForUpdate(ctx context.Context, userID, goalID string) (*domain.UserGoalProgress, error)

GetProgressForUpdate retrieves progress with SELECT ... FOR UPDATE (row-level lock).

func (*PostgresTxRepository) GetUserGoalCount added in v0.9.0

func (r *PostgresTxRepository) GetUserGoalCount(ctx context.Context, userID string) (int, error)

GetUserGoalCount returns the total number of goals for a user (active + inactive) within a transaction.

func (*PostgresTxRepository) GetUserProgress

func (r *PostgresTxRepository) GetUserProgress(ctx context.Context, userID string, activeOnly bool) ([]*domain.UserGoalProgress, error)

GetUserProgress retrieves all user progress within a transaction. M3 Phase 4: activeOnly parameter filters to only is_active = true goals.

func (*PostgresTxRepository) IncrementProgress

func (r *PostgresTxRepository) IncrementProgress(ctx context.Context, userID, goalID, challengeID, namespace string, delta, targetValue int, isDailyIncrement bool) error

IncrementProgress atomically increments progress within a transaction.

func (*PostgresTxRepository) MarkAsClaimed

func (r *PostgresTxRepository) MarkAsClaimed(ctx context.Context, userID, goalID string) error

MarkAsClaimed marks a goal as claimed within a transaction.

func (*PostgresTxRepository) Rollback

func (r *PostgresTxRepository) Rollback() error

Rollback rolls back the transaction.

func (*PostgresTxRepository) UpsertGoalActive added in v0.3.0

func (r *PostgresTxRepository) UpsertGoalActive(ctx context.Context, progress *domain.UserGoalProgress) error

UpsertGoalActive creates or updates a goal's is_active status within a transaction.

func (*PostgresTxRepository) UpsertProgress

func (r *PostgresTxRepository) UpsertProgress(ctx context.Context, progress *domain.UserGoalProgress) error

UpsertProgress upserts progress within a transaction.

type ProgressIncrement

type ProgressIncrement struct {
	UserID           string // User ID
	GoalID           string // Goal ID
	ChallengeID      string // Challenge ID
	Namespace        string // Namespace
	Delta            int    // Amount to increment progress by
	TargetValue      int    // Target value for completion check
	IsDailyIncrement bool   // If true, only increments once per day (based on updated_at date)
}

ProgressIncrement represents a single atomic increment operation for batch processing. Used by BatchIncrementProgress to perform multiple increments in a single query.

type TxRepository

type TxRepository interface {
	GoalRepository

	// GetProgressForUpdate retrieves progress with SELECT ... FOR UPDATE (row-level lock).
	// This prevents concurrent claim attempts for the same goal.
	GetProgressForUpdate(ctx context.Context, userID, goalID string) (*domain.UserGoalProgress, error)

	// Commit commits the transaction.
	Commit() error

	// Rollback rolls back the transaction.
	Rollback() error
}

TxRepository represents a transactional repository that supports commit/rollback. This ensures the claim flow is atomic (prevents double claims via row-level locking).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL