Documentation ¶
Overview ¶
Package db provides database abstraction for the Broadside load tester.
This package defines the Database interface which encapsulates all operations required for load testing Lookout's database backends. It supports multiple database implementations (PostgreSQL, ClickHouse, and in-memory) behind a common interface, enabling apples-to-apples performance comparisons.
Interface ¶
The Database interface provides methods for:
- Schema initialisation (InitialiseSchema)
- Batch ingestion query execution (ExecuteIngestionQueryBatch)
- Historical job population (PopulateHistoricalJobs)
- Individual record retrieval (GetJobRunDebugMessage, GetJobRunError, GetJobSpec)
- Multiple record retrieval (GetJobs, GetJobGroups)
- Resource cleanup (TearDown, Close)
Historical Job Population ¶
PopulateHistoricalJobs inserts a batch of pre-completed historical jobs for a single (queue, job set) pair. It accepts a HistoricalJobsParams struct that specifies the queue and job set identifiers, the number of jobs to create, and threshold values used to divide jobs into terminal states (succeeded, errored, cancelled, preempted) via modular arithmetic on the job number. Implementations may perform the insertion using server-side SQL generation (e.g. PostgreSQL INSERT...SELECT FROM generate_series) to avoid round-trip overhead, or using in-memory Go loops for the MemoryDatabase.
Ingestion Queries ¶
Batch operations are performed via ExecuteIngestionQueryBatch, which accepts a slice of IngestionQuery values. IngestionQuery is an interface implemented by all ingestion query types.
The interface is sealed by an unexported method, so only the query types defined in this package can implement it; incorrect types are rejected at compile time rather than causing runtime errors. Implementations use a type switch to handle each query type:
switch q := query.(type) {
case db.InsertJob:
    // handle insert job
case db.SetJobCancelled:
    // handle job cancellation
    // ... and so on for the remaining query types
}
The JobIDFromQuery function extracts the job ID from any IngestionQuery type, which is useful for routing queries to workers based on job ID (e.g. for parallel batch execution whilst maintaining per-job ordering).
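The routing idea can be sketched as follows. This is an illustrative stand-in, not the package's code: plain string job IDs substitute for the result of calling db.JobIDFromQuery on each query, and FNV hashing is an assumed distribution function.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shard distributes job IDs across nWorkers buckets. All queries for the
// same job ID land in the same bucket, so per-job ordering is preserved
// while the buckets themselves can execute in parallel.
func shard(jobIDs []string, nWorkers int) [][]string {
	buckets := make([][]string, nWorkers)
	for _, id := range jobIDs {
		h := fnv.New32a()
		h.Write([]byte(id))
		w := int(h.Sum32() % uint32(nWorkers))
		buckets[w] = append(buckets[w], id)
	}
	return buckets
}

func main() {
	// Both "job-a" queries end up on the same worker, in input order.
	buckets := shard([]string{"job-a", "job-b", "job-a", "job-c"}, 2)
	for i, b := range buckets {
		fmt.Println(i, b)
	}
}
```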
Supported ingestion query types:
- InsertJob: Insert a new job record
- InsertJobSpec: Insert job specification
- UpdateJobPriority: Update job priority
- SetJobCancelled: Mark job as cancelled
- SetJobSucceeded: Mark job as succeeded
- InsertJobError: Insert job error
- SetJobPreempted: Mark job as preempted
- SetJobRejected: Mark job as rejected
- SetJobErrored: Mark job as errored
- SetJobRunning: Mark job as running
- SetJobRunStarted: Mark job run as started
- SetJobPending: Mark job as pending
- SetJobRunPending: Mark job run as pending
- SetJobRunCancelled: Mark job run as cancelled
- SetJobRunFailed: Mark job run as failed
- SetJobRunSucceeded: Mark job run as succeeded
- SetJobRunPreempted: Mark job run as preempted
- SetJobLeased: Mark job as leased
- InsertJobRun: Insert a new job run record
Implementations ¶
Three implementations are provided:
- PostgresDatabase: PostgreSQL adapter (placeholder implementation)
- ClickHouseDatabase: ClickHouse adapter (placeholder implementation)
- MemoryDatabase: In-memory adapter for smoke-testing Broadside
The MemoryDatabase implementation stores all data in Go maps protected by a read-write mutex. It mirrors the schema structure used in PostgreSQL (jobs, job runs, annotations, job errors, job specs) and is intended for verifying logical correctness before running load tests against real databases.
Query Methods ¶
GetJobs retrieves jobs matching the provided filters, sorted by the specified order, with pagination via skip and take parameters. It supports filtering on all job fields (jobId, queue, jobSet, owner, namespace, state, cpu, memory, ephemeralStorage, gpu, priority, submitted, lastTransitionTime, priorityClass) as well as job run fields (cluster, node) and annotations. Filter match types include exact, anyOf, startsWith, contains, and numeric comparisons (greaterThan, lessThan, greaterThanOrEqualTo, lessThanOrEqualTo).
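As an illustration of the string match-type semantics (not the package's actual filter evaluator, whose names and signatures are not shown in these docs), a filter might be applied like this:

```go
package main

import (
	"fmt"
	"strings"
)

// matchString sketches the string match types listed above: exact, anyOf,
// startsWith, and contains. Numeric comparisons would follow the same
// pattern with numeric operands.
func matchString(value, match string, args []string) bool {
	switch match {
	case "exact":
		return value == args[0]
	case "anyOf":
		for _, a := range args {
			if value == a {
				return true
			}
		}
		return false
	case "startsWith":
		return strings.HasPrefix(value, args[0])
	case "contains":
		return strings.Contains(value, args[0])
	default:
		return false
	}
}

func main() {
	fmt.Println(matchString("gpu-queue", "startsWith", []string{"gpu"}))   // true
	fmt.Println(matchString("gpu-queue", "anyOf", []string{"cpu", "gpu"})) // false: anyOf is exact equality
}
```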
When the activeJobSets parameter is true, only jobs belonging to "active" job sets are returned. A job set is considered active if it contains at least one job in a non-terminal state (QUEUED, LEASED, PENDING, or RUNNING). This mirrors the behaviour of the Lookout PostgreSQL implementation, which performs an inner join with a subquery selecting distinct (queue, jobSet) pairs that have active jobs.
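The activeJobSets behaviour can be sketched in memory as follows; the job struct and function names are illustrative, and the real PostgreSQL implementation expresses the same logic as an inner join rather than two passes.

```go
package main

import "fmt"

type job struct{ queue, jobSet, state string }

// nonTerminal marks the states that make a job set "active".
var nonTerminal = map[string]bool{
	"QUEUED": true, "LEASED": true, "PENDING": true, "RUNNING": true,
}

// filterActive keeps only jobs whose (queue, jobSet) pair contains at least
// one job in a non-terminal state.
func filterActive(jobs []job) []job {
	active := map[[2]string]bool{}
	for _, j := range jobs {
		if nonTerminal[j.state] {
			active[[2]string{j.queue, j.jobSet}] = true
		}
	}
	var out []job
	for _, j := range jobs {
		if active[[2]string{j.queue, j.jobSet}] {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	jobs := []job{
		{"q1", "setA", "SUCCEEDED"},
		{"q1", "setA", "RUNNING"},   // makes setA active
		{"q1", "setB", "SUCCEEDED"}, // setB has no active jobs
	}
	fmt.Println(len(filterActive(jobs))) // 2: both setA jobs, terminal or not
}
```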
GetJobGroups aggregates jobs by a specified field (queue, jobSet, owner, namespace, state, cluster, node, or an annotation key) and computes aggregate values. The supported aggregates are:
- state: returns a map of state names to job counts
- submitted: returns the earliest submission time in the group
- lastTransitionTime: returns the average last transition time in the group
Usage ¶
Create a database instance using the appropriate constructor (the variable is named database here so that it does not shadow the db package name):
// For PostgreSQL
database := db.NewPostgresDatabase(map[string]string{
    "host":   "localhost",
    "port":   "5432",
    "dbname": "lookout",
})

// For ClickHouse
database := db.NewClickHouseDatabase(map[string]string{
    "host":     "localhost",
    "port":     "9000",
    "database": "lookout",
})

// For in-memory (smoke testing)
database := db.NewMemoryDatabase()

// Initialise the schema
if err := database.InitialiseSchema(ctx); err != nil {
    return err
}

// Execute a batch of ingestion queries
queries := []db.IngestionQuery{
    db.InsertJob{Job: &db.NewJob{...}},
}
if err := database.ExecuteIngestionQueryBatch(ctx, queries); err != nil {
    return err
}

// Clean up
defer database.Close()
Index ¶
- func JobIDFromQuery(q IngestionQuery) string
- type ClickHouseDatabase
- func (c *ClickHouseDatabase) Close()
- func (c *ClickHouseDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
- func (c *ClickHouseDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (c *ClickHouseDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
- func (c *ClickHouseDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
- func (c *ClickHouseDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
- func (c *ClickHouseDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (c *ClickHouseDatabase) InitialiseSchema(ctx context.Context) error
- func (c *ClickHouseDatabase) PopulateHistoricalJobs(_ context.Context, _ HistoricalJobsParams) error
- func (c *ClickHouseDatabase) TearDown(ctx context.Context) error
- type Database
- type HistoricalJobsParams
- type IngestionQuery
- type InsertJob
- type InsertJobError
- type InsertJobRun
- type InsertJobSpec
- type MemoryDatabase
- func (m *MemoryDatabase) Close()
- func (m *MemoryDatabase) ExecuteIngestionQueryBatch(_ context.Context, queries []IngestionQuery) error
- func (m *MemoryDatabase) GetJobGroups(_ *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (m *MemoryDatabase) GetJobRunDebugMessage(_ context.Context, jobRunID string) (string, error)
- func (m *MemoryDatabase) GetJobRunError(_ context.Context, jobRunID string) (string, error)
- func (m *MemoryDatabase) GetJobSpec(_ context.Context, jobID string) (*api.Job, error)
- func (m *MemoryDatabase) GetJobs(_ *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (m *MemoryDatabase) InitialiseSchema(_ context.Context) error
- func (m *MemoryDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
- func (m *MemoryDatabase) TearDown(_ context.Context) error
- type NewJob
- type PostgresDatabase
- func (p *PostgresDatabase) Close()
- func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
- func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (p *PostgresDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
- func (p *PostgresDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
- func (p *PostgresDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
- func (p *PostgresDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error
- func (p *PostgresDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
- func (p *PostgresDatabase) TearDown(ctx context.Context) error
- type SetJobCancelled
- type SetJobErrored
- type SetJobLeased
- type SetJobPending
- type SetJobPreempted
- type SetJobRejected
- type SetJobRunCancelled
- type SetJobRunFailed
- type SetJobRunPending
- type SetJobRunPreempted
- type SetJobRunStarted
- type SetJobRunSucceeded
- type SetJobRunning
- type SetJobSucceeded
- type UpdateJobPriority
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func JobIDFromQuery ¶
func JobIDFromQuery(q IngestionQuery) string
JobIDFromQuery extracts the job ID from any IngestionQuery. For run-based queries, it extracts the job ID prefix from the run ID.
Types ¶
type ClickHouseDatabase ¶
type ClickHouseDatabase struct {
// contains filtered or unexported fields
}
func NewClickHouseDatabase ¶
func NewClickHouseDatabase(config map[string]string) *ClickHouseDatabase
func (*ClickHouseDatabase) Close ¶
func (c *ClickHouseDatabase) Close()
func (*ClickHouseDatabase) ExecuteIngestionQueryBatch ¶
func (c *ClickHouseDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
func (*ClickHouseDatabase) GetJobGroups ¶
func (*ClickHouseDatabase) GetJobRunDebugMessage ¶
func (*ClickHouseDatabase) GetJobRunError ¶
func (*ClickHouseDatabase) GetJobSpec ¶
func (*ClickHouseDatabase) InitialiseSchema ¶
func (c *ClickHouseDatabase) InitialiseSchema(ctx context.Context) error
func (*ClickHouseDatabase) PopulateHistoricalJobs ¶ added in v0.20.34
func (c *ClickHouseDatabase) PopulateHistoricalJobs(_ context.Context, _ HistoricalJobsParams) error
type Database ¶
type Database interface {
InitialiseSchema(ctx context.Context) error
ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
GetJobRunError(ctx context.Context, jobRunID string) (string, error)
GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)
GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)
PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
TearDown(ctx context.Context) error
Close()
}
type HistoricalJobsParams ¶ added in v0.20.34
type HistoricalJobsParams struct {
QueueIdx int
JobSetIdx int
QueueName string
JobSetName string
NJobs int
ChunkSize int
SucceededThreshold int
ErroredThreshold int
CancelledThreshold int
JobSpecBytes []byte
ErrorBytes []byte
DebugBytes []byte
PreemptionBytes []byte
}
HistoricalJobsParams describes a batch of terminal historical jobs to insert.
The threshold fields use a scale of 1000: a job with index i is assigned a state based on i%1000, compared against the thresholds in order. Jobs where i%1000 < SucceededThreshold are succeeded; otherwise, jobs where i%1000 < ErroredThreshold are errored; otherwise, jobs where i%1000 < CancelledThreshold are cancelled; the remainder are preempted. Derive the thresholds cumulatively from the desired proportions:
SucceededThreshold = int(ProportionSucceeded * 1000)
ErroredThreshold   = SucceededThreshold + int(ProportionErrored * 1000)
CancelledThreshold = ErroredThreshold + int(ProportionCancelled * 1000)
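The threshold comparison can be sketched as a small function; the name stateForIndex is illustrative, not an identifier from this package.

```go
package main

import "fmt"

// stateForIndex assigns a terminal state from the job index using the
// cumulative thresholds described above (scale of 1000). The thresholds
// must be checked in order, since each range is defined relative to the
// previous one.
func stateForIndex(i, succeeded, errored, cancelled int) string {
	switch m := i % 1000; {
	case m < succeeded:
		return "SUCCEEDED"
	case m < errored:
		return "ERRORED"
	case m < cancelled:
		return "CANCELLED"
	default:
		return "PREEMPTED"
	}
}

func main() {
	// 60% succeeded, 20% errored, 10% cancelled, 10% preempted.
	fmt.Println(stateForIndex(0, 600, 800, 900))   // SUCCEEDED
	fmt.Println(stateForIndex(750, 600, 800, 900)) // ERRORED
	fmt.Println(stateForIndex(999, 600, 800, 900)) // PREEMPTED
}
```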
type IngestionQuery ¶
type IngestionQuery interface {
// contains filtered or unexported methods
}
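The sealed-interface pattern behind IngestionQuery can be sketched as follows. The method name isIngestionQuery and the miniature query types are hypothetical; the package's actual unexported method is not shown in these docs.

```go
package main

import "fmt"

// ingestionQuery is sealed: the unexported method means only types in the
// defining package can implement it, so the set of query types is closed
// at compile time.
type ingestionQuery interface {
	isIngestionQuery()
}

type insertJob struct{ jobID string }
type setJobCancelled struct{ jobID string }

func (insertJob) isIngestionQuery()       {}
func (setJobCancelled) isIngestionQuery() {}

// describe shows the type-switch dispatch an implementation would use.
func describe(q ingestionQuery) string {
	switch q := q.(type) {
	case insertJob:
		return "insert " + q.jobID
	case setJobCancelled:
		return "cancel " + q.jobID
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(insertJob{jobID: "job-1"}))       // insert job-1
	fmt.Println(describe(setJobCancelled{jobID: "job-2"})) // cancel job-2
}
```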
type InsertJobError ¶
type InsertJobRun ¶
type InsertJobSpec ¶
type MemoryDatabase ¶
type MemoryDatabase struct {
// contains filtered or unexported fields
}
func NewMemoryDatabase ¶
func NewMemoryDatabase() *MemoryDatabase
func (*MemoryDatabase) Close ¶
func (m *MemoryDatabase) Close()
func (*MemoryDatabase) ExecuteIngestionQueryBatch ¶
func (m *MemoryDatabase) ExecuteIngestionQueryBatch(_ context.Context, queries []IngestionQuery) error
func (*MemoryDatabase) GetJobGroups ¶
func (*MemoryDatabase) GetJobRunDebugMessage ¶
func (*MemoryDatabase) GetJobRunError ¶
func (*MemoryDatabase) GetJobSpec ¶
func (*MemoryDatabase) InitialiseSchema ¶
func (m *MemoryDatabase) InitialiseSchema(_ context.Context) error
func (*MemoryDatabase) PopulateHistoricalJobs ¶ added in v0.20.34
func (m *MemoryDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
type PostgresDatabase ¶
type PostgresDatabase struct {
// contains filtered or unexported fields
}
PostgresDatabase implements the Database interface using PostgreSQL. It reuses the Lookout schema and query infrastructure to ensure realistic testing of production query patterns.
func NewPostgresDatabase ¶
func NewPostgresDatabase(config map[string]string) *PostgresDatabase
NewPostgresDatabase creates a new PostgresDatabase instance. The config map should contain connection parameters compatible with libpq:
- host: database host (e.g., "localhost")
- port: database port (e.g., "5433")
- user: database user (e.g., "postgres")
- password: database password
- dbname: database name (e.g., "broadside_test")
- sslmode: SSL mode (e.g., "disable")
func (*PostgresDatabase) Close ¶
func (p *PostgresDatabase) Close()
Close is a no-op: pgxpool.Close can hang on background health-check goroutines. Pool connections will be reclaimed by the OS on process exit.
func (*PostgresDatabase) ExecuteIngestionQueryBatch ¶
func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
ExecuteIngestionQueryBatch executes a batch of ingestion queries. Queries are grouped by type and executed in an order that maintains referential integrity.
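The grouping idea can be sketched as follows, assuming (this is an inference, not stated in the docs) that inserts must run before updates that reference the inserted rows; the miniature types are illustrative.

```go
package main

import "fmt"

type query interface{ kind() string }

type insertJob struct{ id string }
type setSucceeded struct{ id string }

func (insertJob) kind() string    { return "insert" }
func (setSucceeded) kind() string { return "update" }

// partition splits a batch so that inserts can be executed before the
// updates that reference them, preserving referential integrity.
func partition(qs []query) (inserts, updates []query) {
	for _, q := range qs {
		if q.kind() == "insert" {
			inserts = append(inserts, q)
		} else {
			updates = append(updates, q)
		}
	}
	return inserts, updates
}

func main() {
	// Even if the update arrives first in the batch, the insert runs first.
	ins, upd := partition([]query{setSucceeded{"j1"}, insertJob{"j1"}})
	fmt.Println(len(ins), len(upd)) // 1 1
}
```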
func (*PostgresDatabase) GetJobGroups ¶
func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)
GetJobGroups retrieves aggregated job groups using the Lookout repository.
func (*PostgresDatabase) GetJobRunDebugMessage ¶
func (p *PostgresDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
GetJobRunDebugMessage retrieves the debug message for a specific job run.
func (*PostgresDatabase) GetJobRunError ¶
GetJobRunError retrieves the error message for a specific job run.
func (*PostgresDatabase) GetJobSpec ¶
GetJobSpec retrieves the job specification for a specific job.
func (*PostgresDatabase) GetJobs ¶
func (p *PostgresDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)
GetJobs retrieves jobs matching the given filters using the Lookout repository.
func (*PostgresDatabase) InitialiseSchema ¶
func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error
InitialiseSchema opens the connection pool, applies the Lookout database migrations, and initialises the query repository.
func (*PostgresDatabase) PopulateHistoricalJobs ¶ added in v0.20.34
func (p *PostgresDatabase) PopulateHistoricalJobs(ctx context.Context, params HistoricalJobsParams) error
PopulateHistoricalJobs inserts terminal historical jobs in chunks, with automatic resume on restart. Each chunk is a separate transaction so that progress survives interruptions. On entry the method queries the database to find the highest job index already present and resumes from there.
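The chunk-and-resume loop can be sketched as below; a plain map stands in for the database, and the function name is illustrative. In the real implementation each chunk is a transaction and the resume point comes from a query for the highest existing job index.

```go
package main

import "fmt"

// populate inserts job indices [0, nJobs) in chunks, resuming after the
// highest contiguous index already present in store. Returns the number of
// chunks executed, each of which would be one transaction.
func populate(store map[int]bool, nJobs, chunkSize int) int {
	start := 0
	for store[start] {
		start++ // resume after what a previous run already inserted
	}
	chunks := 0
	for lo := start; lo < nJobs; lo += chunkSize {
		hi := lo + chunkSize
		if hi > nJobs {
			hi = nJobs
		}
		for i := lo; i < hi; i++ {
			store[i] = true
		}
		chunks++
	}
	return chunks
}

func main() {
	// A previous run inserted jobs 0-2 before being interrupted.
	store := map[int]bool{0: true, 1: true, 2: true}
	fmt.Println(populate(store, 10, 4)) // 2: resumes at index 3, chunks 3-6 and 7-9
}
```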