package db

v0.20.34
Published: Feb 26, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package db provides database abstraction for the Broadside load tester.

This package defines the Database interface which encapsulates all operations required for load testing Lookout's database backends. It supports multiple database implementations (PostgreSQL, ClickHouse, and in-memory) behind a common interface, enabling apples-to-apples performance comparisons.

Interface

The Database interface provides methods for:

  • Schema initialisation (InitialiseSchema)
  • Batch ingestion query execution (ExecuteIngestionQueryBatch)
  • Historical job population (PopulateHistoricalJobs)
  • Individual record retrieval (GetJobRunDebugMessage, GetJobRunError, GetJobSpec)
  • Multiple record retrieval (GetJobs, GetJobGroups)
  • Resource cleanup (TearDown, Close)

Historical Job Population

PopulateHistoricalJobs inserts a batch of pre-completed historical jobs for a single (queue, job set) pair. It accepts a HistoricalJobsParams struct that specifies the queue and job set identifiers, the number of jobs to create, and threshold values used to divide jobs into terminal states (succeeded, errored, cancelled, preempted) via modular arithmetic on the job number. Implementations may perform the insertion using server-side SQL generation (e.g. PostgreSQL INSERT...SELECT FROM generate_series) to avoid round-trip overhead, or using in-memory Go loops for the MemoryDatabase.

Ingestion Queries

Batch operations are performed via ExecuteIngestionQueryBatch, which accepts a slice of IngestionQuery values. IngestionQuery is an interface implemented by all ingestion query types.

The interface embeds an unexported method, so only the query types defined in this package can satisfy it. This makes the set of ingestion queries closed at compile time, preventing runtime errors from unexpected types. Implementations use a type switch to handle each query type:

switch q := query.(type) {
case db.InsertJob:
    // handle insert job
case db.SetJobCancelled:
    // handle job cancellation
// ... etc
default:
    return fmt.Errorf("unsupported ingestion query type: %T", q)
}

The JobIDFromQuery function extracts the job ID from any IngestionQuery type, which is useful for routing queries to workers based on job ID (e.g. for parallel batch execution whilst maintaining per-job ordering).
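The routing idea can be sketched as follows. The worker pool and the FNV hash are illustrative choices, not part of the package; the job ID string here stands in for the result of JobIDFromQuery:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex picks a worker deterministically from a job ID, so every
// query for the same job lands on the same worker and per-job ordering
// is preserved even when batches execute in parallel.
func workerIndex(jobID string, nWorkers int) int {
	h := fnv.New32a()
	h.Write([]byte(jobID))
	return int(h.Sum32() % uint32(nWorkers))
}

func main() {
	// The same job ID always routes to the same worker index.
	fmt.Println(workerIndex("01hq3k", 4) == workerIndex("01hq3k", 4))
}
```

Any stable hash works here; the only requirement is that the mapping from job ID to worker is deterministic for the lifetime of the batch run.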

Supported ingestion query types:

  • InsertJob: Insert a new job record
  • InsertJobSpec: Insert job specification
  • UpdateJobPriority: Update job priority
  • SetJobCancelled: Mark job as cancelled
  • SetJobSucceeded: Mark job as succeeded
  • InsertJobError: Insert job error
  • SetJobPreempted: Mark job as preempted
  • SetJobRejected: Mark job as rejected
  • SetJobErrored: Mark job as errored
  • SetJobRunning: Mark job as running
  • SetJobRunStarted: Mark job run as started
  • SetJobPending: Mark job as pending
  • SetJobRunPending: Mark job run as pending
  • SetJobRunCancelled: Mark job run as cancelled
  • SetJobRunFailed: Mark job run as failed
  • SetJobRunSucceeded: Mark job run as succeeded
  • SetJobRunPreempted: Mark job run as preempted
  • SetJobLeased: Mark job as leased
  • InsertJobRun: Insert a new job run record

Implementations

Three implementations are provided:

  • PostgresDatabase: PostgreSQL adapter (placeholder implementation)
  • ClickHouseDatabase: ClickHouse adapter (placeholder implementation)
  • MemoryDatabase: In-memory adapter for smoke-testing Broadside

The MemoryDatabase implementation stores all data in Go maps protected by a read-write mutex. It mirrors the schema structure used in PostgreSQL (jobs, job runs, annotations, job errors, job specs) and is intended for verifying logical correctness before running load tests against real databases.

Query Methods

GetJobs retrieves jobs matching the provided filters, sorted by the specified order, with pagination via skip and take parameters. It supports filtering on all job fields (jobId, queue, jobSet, owner, namespace, state, cpu, memory, ephemeralStorage, gpu, priority, submitted, lastTransitionTime, priorityClass) as well as job run fields (cluster, node) and annotations. Filter match types include exact, anyOf, startsWith, contains, and numeric comparisons (greaterThan, lessThan, greaterThanOrEqualTo, lessThanOrEqualTo).

When the activeJobSets parameter is true, only jobs belonging to "active" job sets are returned. A job set is considered active if it contains at least one job in a non-terminal state (QUEUED, LEASED, PENDING, or RUNNING). This mirrors the behaviour of the Lookout PostgreSQL implementation, which performs an inner join with a subquery selecting distinct (queue, jobSet) pairs that have active jobs.
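A minimal in-memory sketch of the active-job-set filter, using a reduced stand-in for the job record (field names here are illustrative, not the package's types):

```go
package main

import "fmt"

// job is a reduced stand-in carrying only the fields the filter needs.
type job struct {
	Queue, JobSet, State string
}

// nonTerminal lists the states that keep a job set "active".
var nonTerminal = map[string]bool{
	"QUEUED": true, "LEASED": true, "PENDING": true, "RUNNING": true,
}

// filterActiveJobSets keeps only jobs whose (queue, jobSet) pair has at
// least one job in a non-terminal state, mirroring the inner join
// against a distinct (queue, jobSet) subquery described above.
func filterActiveJobSets(jobs []job) []job {
	type key struct{ queue, jobSet string }
	active := map[key]bool{}
	for _, j := range jobs {
		if nonTerminal[j.State] {
			active[key{j.Queue, j.JobSet}] = true
		}
	}
	var out []job
	for _, j := range jobs {
		if active[key{j.Queue, j.JobSet}] {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	jobs := []job{
		{"q1", "set-a", "SUCCEEDED"}, // kept: set-a also has a RUNNING job
		{"q1", "set-a", "RUNNING"},
		{"q1", "set-b", "FAILED"}, // dropped: set-b is fully terminal
	}
	fmt.Println(len(filterActiveJobSets(jobs)))
}
```

Note that a terminal job is still returned when its job set is active; the filter operates on job sets, not on individual jobs.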

GetJobGroups aggregates jobs by a specified field (queue, jobSet, owner, namespace, state, cluster, node, or an annotation key) and computes aggregate values. The supported aggregates are:

  • state: returns a map of state names to job counts
  • submitted: returns the earliest submission time in the group
  • lastTransitionTime: returns the average last transition time in the group

Usage

Create a database instance using the appropriate constructor:

// For PostgreSQL
database := db.NewPostgresDatabase(map[string]string{
    "host":     "localhost",
    "port":     "5432",
    "database": "lookout",
})

// For ClickHouse
database := db.NewClickHouseDatabase(map[string]string{
    "host":     "localhost",
    "port":     "9000",
    "database": "lookout",
})

// For in-memory (smoke testing)
database := db.NewMemoryDatabase()

// Initialise schema
if err := database.InitialiseSchema(ctx); err != nil {
    return err
}

// Execute batch of ingestion queries
queries := []db.IngestionQuery{
    db.InsertJob{Job: &db.NewJob{...}},
}
if err := database.ExecuteIngestionQueryBatch(ctx, queries); err != nil {
    return err
}

// Clean up
defer database.Close()

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func JobIDFromQuery

func JobIDFromQuery(q IngestionQuery) string

JobIDFromQuery extracts the job ID from any IngestionQuery. For run-based queries, it extracts the job ID prefix from the run ID.
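The prefix extraction can be sketched as below. The run-ID layout used here (`<jobID>-<seq>`) is purely an assumption for illustration; the package's actual run-ID format is not documented in this section:

```go
package main

import (
	"fmt"
	"strings"
)

// jobIDFromRunID extracts the job ID prefix from a run ID, assuming a
// hypothetical "<jobID>-<seq>" layout. IDs without a separator are
// returned unchanged.
func jobIDFromRunID(runID string) string {
	if i := strings.LastIndex(runID, "-"); i >= 0 {
		return runID[:i]
	}
	return runID
}

func main() {
	fmt.Println(jobIDFromRunID("job42-1"))
}
```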

Types

type ClickHouseDatabase

type ClickHouseDatabase struct {
	// contains filtered or unexported fields
}

func NewClickHouseDatabase

func NewClickHouseDatabase(config map[string]string) *ClickHouseDatabase

func (*ClickHouseDatabase) Close

func (c *ClickHouseDatabase) Close()

func (*ClickHouseDatabase) ExecuteIngestionQueryBatch

func (c *ClickHouseDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error

func (*ClickHouseDatabase) GetJobGroups

func (c *ClickHouseDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)

func (*ClickHouseDatabase) GetJobRunDebugMessage

func (c *ClickHouseDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)

func (*ClickHouseDatabase) GetJobRunError

func (c *ClickHouseDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)

func (*ClickHouseDatabase) GetJobSpec

func (c *ClickHouseDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)

func (*ClickHouseDatabase) GetJobs

func (c *ClickHouseDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)

func (*ClickHouseDatabase) InitialiseSchema

func (c *ClickHouseDatabase) InitialiseSchema(ctx context.Context) error

func (*ClickHouseDatabase) PopulateHistoricalJobs added in v0.20.34

func (c *ClickHouseDatabase) PopulateHistoricalJobs(_ context.Context, _ HistoricalJobsParams) error

func (*ClickHouseDatabase) TearDown

func (c *ClickHouseDatabase) TearDown(ctx context.Context) error

type Database

type Database interface {
	InitialiseSchema(ctx context.Context) error
	ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
	GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
	GetJobRunError(ctx context.Context, jobRunID string) (string, error)
	GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
	GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)
	GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)
	PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
	TearDown(ctx context.Context) error
	Close()
}

type HistoricalJobsParams added in v0.20.34

type HistoricalJobsParams struct {
	QueueIdx           int
	JobSetIdx          int
	QueueName          string
	JobSetName         string
	NJobs              int
	ChunkSize          int
	SucceededThreshold int
	ErroredThreshold   int
	CancelledThreshold int
	JobSpecBytes       []byte
	ErrorBytes         []byte
	DebugBytes         []byte
	PreemptionBytes    []byte
}

HistoricalJobsParams describes a batch of terminal historical jobs to insert.

The threshold fields use a scale of 1000: a job with index i is assigned a state based on i%1000. Jobs where i%1000 < SucceededThreshold succeed; otherwise, jobs where i%1000 < ErroredThreshold are errored; otherwise, jobs where i%1000 < CancelledThreshold are cancelled; the remainder are preempted. Derive the thresholds as:

SucceededThreshold = int(ProportionSucceeded * 1000)
ErroredThreshold   = SucceededThreshold + int(ProportionErrored * 1000)
CancelledThreshold = ErroredThreshold   + int(ProportionCancelled * 1000)
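The derivation and the modular-arithmetic state assignment can be written out as a small sketch (the proportion values are examples):

```go
package main

import "fmt"

// thresholds derives the three cutoffs from target proportions, on the
// documented scale of 1000.
func thresholds(pSucceeded, pErrored, pCancelled float64) (int, int, int) {
	s := int(pSucceeded * 1000)
	e := s + int(pErrored*1000)
	c := e + int(pCancelled*1000)
	return s, e, c
}

// stateFor assigns a terminal state to job index i using the documented
// cascading comparison against i%1000.
func stateFor(i, succ, errd, canc int) string {
	switch m := i % 1000; {
	case m < succ:
		return "SUCCEEDED"
	case m < errd:
		return "ERRORED"
	case m < canc:
		return "CANCELLED"
	default:
		return "PREEMPTED"
	}
}

func main() {
	// 90% succeeded, 5% errored, 3% cancelled, 2% preempted.
	s, e, c := thresholds(0.9, 0.05, 0.03)
	fmt.Println(s, e, c)
	fmt.Println(stateFor(0, s, e, c), stateFor(955, s, e, c), stateFor(999, s, e, c))
}
```

Because the comparisons cascade, each job index falls into exactly one state band, and the bands partition [0, 1000).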

type IngestionQuery

type IngestionQuery interface {
	// contains filtered or unexported methods
}

type InsertJob

type InsertJob struct {
	Job *NewJob
}

type InsertJobError

type InsertJobError struct {
	JobID string
	Error []byte
}

type InsertJobRun

type InsertJobRun struct {
	JobRunID string
	JobID    string
	Cluster  string
	Node     string
	Pool     string
	Time     time.Time
}

type InsertJobSpec

type InsertJobSpec struct {
	JobID   string
	JobSpec string
}

type MemoryDatabase

type MemoryDatabase struct {
	// contains filtered or unexported fields
}

func NewMemoryDatabase

func NewMemoryDatabase() *MemoryDatabase

func (*MemoryDatabase) Close

func (m *MemoryDatabase) Close()

func (*MemoryDatabase) ExecuteIngestionQueryBatch

func (m *MemoryDatabase) ExecuteIngestionQueryBatch(_ context.Context, queries []IngestionQuery) error

func (*MemoryDatabase) GetJobGroups

func (m *MemoryDatabase) GetJobGroups(_ *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)

func (*MemoryDatabase) GetJobRunDebugMessage

func (m *MemoryDatabase) GetJobRunDebugMessage(_ context.Context, jobRunID string) (string, error)

func (*MemoryDatabase) GetJobRunError

func (m *MemoryDatabase) GetJobRunError(_ context.Context, jobRunID string) (string, error)

func (*MemoryDatabase) GetJobSpec

func (m *MemoryDatabase) GetJobSpec(_ context.Context, jobID string) (*api.Job, error)

func (*MemoryDatabase) GetJobs

func (m *MemoryDatabase) GetJobs(_ *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)

func (*MemoryDatabase) InitialiseSchema

func (m *MemoryDatabase) InitialiseSchema(_ context.Context) error

func (*MemoryDatabase) PopulateHistoricalJobs added in v0.20.34

func (m *MemoryDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error

func (*MemoryDatabase) TearDown

func (m *MemoryDatabase) TearDown(_ context.Context) error

type NewJob

type NewJob struct {
	JobID            string
	Queue            string
	JobSet           string
	Owner            string
	Namespace        string
	Priority         int64
	PriorityClass    string
	Submitted        time.Time
	Cpu              int64
	Memory           int64
	EphemeralStorage int64
	Gpu              int64
	Annotations      map[string]string
}

type PostgresDatabase

type PostgresDatabase struct {
	// contains filtered or unexported fields
}

PostgresDatabase implements the Database interface using PostgreSQL. It reuses the Lookout schema and query infrastructure to ensure realistic testing of production query patterns.

func NewPostgresDatabase

func NewPostgresDatabase(config map[string]string) *PostgresDatabase

NewPostgresDatabase creates a new PostgresDatabase instance. The config map should contain connection parameters compatible with libpq:

  • host: database host (e.g., "localhost")
  • port: database port (e.g., "5433")
  • user: database user (e.g., "postgres")
  • password: database password
  • dbname: database name (e.g., "broadside_test")
  • sslmode: SSL mode (e.g., "disable")
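A config map like the one above might be flattened into a libpq keyword/value connection string roughly as follows; this is a sketch of the mapping, not the constructor's actual implementation (keys are sorted only to make the output deterministic):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// connString flattens a config map into a libpq-style keyword/value
// connection string, e.g. "dbname=broadside_test host=localhost".
func connString(config map[string]string) string {
	keys := make([]string, 0, len(config))
	for k := range config {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+config[k])
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(connString(map[string]string{
		"host": "localhost", "port": "5433", "dbname": "broadside_test",
	}))
}
```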

func (*PostgresDatabase) Close

func (p *PostgresDatabase) Close()

Close is a no-op: pgxpool.Close can hang on background health-check goroutines. Pool connections will be reclaimed by the OS on process exit.

func (*PostgresDatabase) ExecuteIngestionQueryBatch

func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error

ExecuteIngestionQueryBatch executes a batch of ingestion queries. Jobs are grouped by type and inserted/updated in the appropriate order to maintain referential integrity.
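One way to implement this type-grouped ordering is to bucket queries with a type switch and flush inserts before updates, so a row always exists before anything modifies it. This is a sketch of the idea with minimal stand-in types, not the package's actual implementation:

```go
package main

import "fmt"

// Minimal stand-ins for two of the package's query types.
type insertJob struct{ JobID string }
type setJobSucceeded struct{ JobID string }

// executeBatch buckets queries by type, then applies inserts before
// updates so updates never reference a missing row.
func executeBatch(queries []any) []string {
	var inserts, updates []string
	for _, q := range queries {
		switch q := q.(type) {
		case insertJob:
			inserts = append(inserts, "insert "+q.JobID)
		case setJobSucceeded:
			updates = append(updates, "succeed "+q.JobID)
		}
	}
	return append(inserts, updates...)
}

func main() {
	// The update appears first in the batch but executes after the insert.
	fmt.Println(executeBatch([]any{setJobSucceeded{"j1"}, insertJob{"j1"}}))
}
```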

func (*PostgresDatabase) GetJobGroups

func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)

GetJobGroups retrieves aggregated job groups using the Lookout repository.

func (*PostgresDatabase) GetJobRunDebugMessage

func (p *PostgresDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)

GetJobRunDebugMessage retrieves the debug message for a specific job run.

func (*PostgresDatabase) GetJobRunError

func (p *PostgresDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)

GetJobRunError retrieves the error message for a specific job run.

func (*PostgresDatabase) GetJobSpec

func (p *PostgresDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)

GetJobSpec retrieves the job specification for a specific job.

func (*PostgresDatabase) GetJobs

func (p *PostgresDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)

GetJobs retrieves jobs matching the given filters using the Lookout repository.

func (*PostgresDatabase) InitialiseSchema

func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error

InitialiseSchema opens the connection pool, applies the Lookout database migrations, and initialises the query repository.

func (*PostgresDatabase) PopulateHistoricalJobs added in v0.20.34

func (p *PostgresDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error

PopulateHistoricalJobs inserts terminal historical jobs in chunks, with automatic resume on restart. Each chunk is a separate transaction so that progress survives interruptions. On entry the method queries the database to find the highest job index already present and resumes from there.
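The chunk-per-transaction resume loop can be sketched as below; `insertChunk` is a stand-in for one transactional insert, and `maxExisting` stands in for the highest-index query performed on entry:

```go
package main

import "fmt"

// insertChunk stands in for one transactional chunk insert; here it
// just records which half-open index range was written.
func insertChunk(written *[]string, from, to int) {
	*written = append(*written, fmt.Sprintf("[%d,%d)", from, to))
}

// populate resumes from the highest job index already present
// (maxExisting, -1 when the table is empty) and inserts the remaining
// jobs in chunks, one "transaction" per chunk, so an interrupted run
// loses at most one chunk of progress.
func populate(nJobs, chunkSize, maxExisting int) []string {
	var written []string
	for start := maxExisting + 1; start < nJobs; start += chunkSize {
		end := start + chunkSize
		if end > nJobs {
			end = nJobs
		}
		insertChunk(&written, start, end)
	}
	return written
}

func main() {
	// A previous run completed indices 0..249; this run resumes at 250.
	fmt.Println(populate(1000, 300, 249))
}
```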

func (*PostgresDatabase) TearDown

func (p *PostgresDatabase) TearDown(ctx context.Context) error

TearDown truncates all tables to clean up after a test run. This is faster than dropping and recreating the database, and allows multiple test runs against the same database instance.

type SetJobCancelled

type SetJobCancelled struct {
	JobID        string
	Time         time.Time
	CancelReason string
	CancelUser   string
}

type SetJobErrored

type SetJobErrored struct {
	JobID string
	Time  time.Time
}

type SetJobLeased

type SetJobLeased struct {
	JobID string
	Time  time.Time
	RunID string
}

type SetJobPending

type SetJobPending struct {
	JobID string
	Time  time.Time
	RunID string
}

type SetJobPreempted

type SetJobPreempted struct {
	JobID string
	Time  time.Time
}

type SetJobRejected

type SetJobRejected struct {
	JobID string
	Time  time.Time
}

type SetJobRunCancelled

type SetJobRunCancelled struct {
	JobRunID string
	Time     time.Time
}

type SetJobRunFailed

type SetJobRunFailed struct {
	JobRunID string
	Time     time.Time
	Error    []byte
	Debug    []byte
	ExitCode int32
}

type SetJobRunPending

type SetJobRunPending struct {
	JobRunID string
	Time     time.Time
}

type SetJobRunPreempted

type SetJobRunPreempted struct {
	JobRunID string
	Time     time.Time
	Error    []byte
}

type SetJobRunStarted

type SetJobRunStarted struct {
	JobRunID string
	Time     time.Time
	Node     string
}

type SetJobRunSucceeded

type SetJobRunSucceeded struct {
	JobRunID string
	Time     time.Time
	ExitCode int32
}

type SetJobRunning

type SetJobRunning struct {
	JobID       string
	Time        time.Time
	LatestRunID string
}

type SetJobSucceeded

type SetJobSucceeded struct {
	JobID string
	Time  time.Time
}

type UpdateJobPriority

type UpdateJobPriority struct {
	JobID    string
	Priority int64
}
