Documentation ¶
Overview ¶
Package db provides database abstraction for the Broadside load tester.
This package defines the Database interface which encapsulates all operations required for load testing Lookout's database backends. It supports multiple database implementations (PostgreSQL, ClickHouse, and in-memory) behind a common interface, enabling apples-to-apples performance comparisons.
Interface ¶
The Database interface provides methods for:
- Schema initialisation (InitialiseSchema)
- Batch ingestion query execution (ExecuteIngestionQueryBatch)
- Individual record retrieval (GetJobRunDebugMessage, GetJobRunError, GetJobSpec)
- Multiple record retrieval (GetJobs, GetJobGroups)
- Resource cleanup (TearDown, Close)
Ingestion Queries ¶
Batch operations are performed via ExecuteIngestionQueryBatch, which accepts a slice of IngestionQuery values. IngestionQuery is an interface implemented by all ingestion query types.
The interface declares an unexported method, so only types defined in this package can implement it. This seals the set of query types at compile time and prevents runtime errors caused by passing an unsupported type. Implementations use a type switch to handle each query type:
	switch q := query.(type) {
	case db.InsertJob:
		// handle insert job
	case db.SetJobCancelled:
		// handle job cancellation
	// ... etc
	}
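The sealing technique can be sketched in isolation. This is a minimal illustration, not the package's source: the marker method name isIngestionQuery, the JobID fields, and the describe helper are all assumptions made for the example.

```go
package main

import "fmt"

// IngestionQuery is sealed: the unexported marker method means only types
// declared in this package can satisfy the interface.
type IngestionQuery interface {
	isIngestionQuery() // hypothetical marker name
}

// InsertJob and SetJobCancelled stand in for the real query types.
// Their fields are assumptions made for this sketch.
type InsertJob struct{ JobID string }
type SetJobCancelled struct{ JobID string }

func (InsertJob) isIngestionQuery()       {}
func (SetJobCancelled) isIngestionQuery() {}

// describe dispatches on the concrete type, as a backend would.
func describe(q IngestionQuery) string {
	switch v := q.(type) {
	case InsertJob:
		return "insert " + v.JobID
	case SetJobCancelled:
		return "cancel " + v.JobID
	default:
		return "unknown"
	}
}

func main() {
	batch := []IngestionQuery{InsertJob{JobID: "job-1"}, SetJobCancelled{JobID: "job-1"}}
	for _, q := range batch {
		fmt.Println(describe(q))
	}
}
```

A type declared in another package cannot define isIngestionQuery, so it cannot be placed in a []IngestionQuery. The default branch is still worth keeping in case a new query type is added without updating the switch.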
The JobIDFromQuery function extracts the job ID from any IngestionQuery type, which is useful for routing queries to workers based on job ID (e.g. for parallel batch execution whilst maintaining per-job ordering).
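One way to route by job ID is to hash the ID and take it modulo the worker count. The sketch below assumes this approach; workerFor and partition are hypothetical helpers, and in real use the job ID would come from JobIDFromQuery rather than a plain string slice.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor maps a job ID to a worker index in [0, workers). The same job ID
// always maps to the same worker, so that worker sees the job's queries in
// their original order.
func workerFor(jobID string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(jobID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(workers))
}

// partition splits job IDs into per-worker slices, preserving input order
// within each slice.
func partition(jobIDs []string, workers int) [][]string {
	out := make([][]string, workers)
	for _, id := range jobIDs {
		w := workerFor(id, workers)
		out[w] = append(out[w], id)
	}
	return out
}

func main() {
	ids := []string{"job-a", "job-b", "job-a", "job-c", "job-b"}
	for w, batch := range partition(ids, 3) {
		fmt.Println(w, batch)
	}
}
```

Workers can then process their slices in parallel. Ordering across different jobs is not preserved, which is acceptable when only per-job ordering matters.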
Supported ingestion query types:
- InsertJob: Insert a new job record
- InsertJobSpec: Insert job specification
- UpdateJobPriority: Update job priority
- SetJobCancelled: Mark job as cancelled
- SetJobSucceeded: Mark job as succeeded
- InsertJobError: Insert job error
- SetJobPreempted: Mark job as preempted
- SetJobRejected: Mark job as rejected
- SetJobErrored: Mark job as errored
- SetJobRunning: Mark job as running
- SetJobRunStarted: Mark job run as started
- SetJobPending: Mark job as pending
- SetJobRunPending: Mark job run as pending
- SetJobRunCancelled: Mark job run as cancelled
- SetJobRunFailed: Mark job run as failed
- SetJobRunSucceeded: Mark job run as succeeded
- SetJobRunPreempted: Mark job run as preempted
- SetJobLeased: Mark job as leased
- InsertJobRun: Insert a new job run record
Implementations ¶
Three implementations are provided:
- PostgresDatabase: PostgreSQL adapter that reuses the Lookout schema and query infrastructure
- ClickHouseDatabase: ClickHouse adapter (placeholder implementation)
- MemoryDatabase: In-memory adapter for smoke-testing Broadside
The MemoryDatabase implementation stores all data in Go maps protected by a read-write mutex. It mirrors the schema structure used in PostgreSQL (jobs, job runs, annotations, job errors, job specs) and is intended for verifying logical correctness before running load tests against real databases.
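The locking pattern described above can be sketched with a single map. The store type and its methods are hypothetical; MemoryDatabase's actual fields are unexported and not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical miniature of an in-memory backend.
type store struct {
	mu   sync.RWMutex
	jobs map[string]string // job ID -> state
}

func newStore() *store {
	return &store{jobs: make(map[string]string)}
}

// setState takes the write lock because it mutates the map.
func (s *store) setState(jobID, state string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[jobID] = state
}

// state takes the read lock, so concurrent readers do not block each other.
func (s *store) state(jobID string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	st, ok := s.jobs[jobID]
	return st, ok
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.setState(fmt.Sprintf("job-%d", i), "QUEUED")
		}(i)
	}
	wg.Wait()
	fmt.Println(s.state("job-2"))
}
```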
Query Methods ¶
GetJobs retrieves jobs matching the provided filters, sorted by the specified order, with pagination via skip and take parameters. It supports filtering on all job fields (jobId, queue, jobSet, owner, namespace, state, cpu, memory, ephemeralStorage, gpu, priority, submitted, lastTransitionTime, priorityClass) as well as job run fields (cluster, node) and annotations. Filter match types include exact, anyOf, startsWith, contains, and numeric comparisons (greaterThan, lessThan, greaterThanOrEqualTo, lessThanOrEqualTo).
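As a rough illustration of the string match types and of skip/take pagination, the sketch below applies them to plain strings. The matchString and page helpers are hypothetical and do not use the model.Filter type, whose fields are not shown in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// matchString applies one of the string match types named above to a field
// value. It is a hypothetical helper, not part of the package.
func matchString(match, field string, values ...string) bool {
	if len(values) == 0 {
		return false
	}
	switch match {
	case "exact":
		return field == values[0]
	case "anyOf":
		for _, v := range values {
			if field == v {
				return true
			}
		}
		return false
	case "startsWith":
		return strings.HasPrefix(field, values[0])
	case "contains":
		return strings.Contains(field, values[0])
	default:
		return false
	}
}

// page returns at most take items starting at offset skip.
func page(items []string, skip, take int) []string {
	if skip < 0 {
		skip = 0
	}
	if skip >= len(items) || take <= 0 {
		return nil
	}
	end := skip + take
	if end > len(items) {
		end = len(items)
	}
	return items[skip:end]
}

func main() {
	queues := []string{"gpu-a", "cpu-a", "gpu-b", "gpu-c"}
	var matched []string
	for _, q := range queues {
		if matchString("startsWith", q, "gpu") {
			matched = append(matched, q)
		}
	}
	fmt.Println(page(matched, 1, 2))
}
```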
When the activeJobSets parameter is true, only jobs belonging to "active" job sets are returned. A job set is considered active if it contains at least one job in a non-terminal state (QUEUED, LEASED, PENDING, or RUNNING). This mirrors the behaviour of the Lookout PostgreSQL implementation, which performs an inner join with a subquery selecting distinct (queue, jobSet) pairs that have active jobs.
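An in-memory equivalent of that join can be sketched in two passes: first collect the active (queue, jobSet) pairs, then keep every job whose pair is in that set. The job struct and the SUCCEEDED state name used in the sample data are assumptions for this example.

```go
package main

import "fmt"

// job carries only the fields this sketch needs.
type job struct{ ID, Queue, JobSet, State string }

type jobSetKey struct{ Queue, JobSet string }

// nonTerminal lists the states the text above treats as active.
var nonTerminal = map[string]bool{
	"QUEUED": true, "LEASED": true, "PENDING": true, "RUNNING": true,
}

// inActiveJobSets keeps every job whose (queue, jobSet) pair contains at
// least one non-terminal job. Terminal jobs in an active job set are kept.
func inActiveJobSets(jobs []job) []job {
	active := make(map[jobSetKey]bool)
	for _, j := range jobs {
		if nonTerminal[j.State] {
			active[jobSetKey{j.Queue, j.JobSet}] = true
		}
	}
	var out []job
	for _, j := range jobs {
		if active[jobSetKey{j.Queue, j.JobSet}] {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	jobs := []job{
		{"1", "q", "set-1", "RUNNING"},
		{"2", "q", "set-1", "SUCCEEDED"},
		{"3", "q", "set-2", "SUCCEEDED"},
	}
	for _, j := range inActiveJobSets(jobs) {
		fmt.Println(j.ID, j.JobSet, j.State)
	}
}
```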
GetJobGroups aggregates jobs by a specified field (queue, jobSet, owner, namespace, state, cluster, node, or an annotation key) and computes aggregate values. The supported aggregates are:
- state: returns a map of state names to job counts
- submitted: returns the earliest submission time in the group
- lastTransitionTime: returns the average last transition time in the group
Usage ¶
Create a database instance using the appropriate constructor:
// The variable is named database so that it does not shadow the db package.
// Keep only the assignment you need.
var database db.Database

// For PostgreSQL
database = db.NewPostgresDatabase(map[string]string{
	"host":   "localhost",
	"port":   "5432",
	"dbname": "lookout",
})
// For ClickHouse
database = db.NewClickHouseDatabase(map[string]string{
	"host":     "localhost",
	"port":     "9000",
	"database": "lookout",
})
// For in-memory (smoke testing)
database = db.NewMemoryDatabase()

// Clean up when the surrounding function returns
defer database.Close()

// Initialise schema
if err := database.InitialiseSchema(ctx); err != nil {
	return err
}
// Execute batch of ingestion queries
queries := []db.IngestionQuery{
	db.InsertJob{Job: &db.NewJob{...}},
}
if err := database.ExecuteIngestionQueryBatch(ctx, queries); err != nil {
	return err
}
Index ¶
- func JobIDFromQuery(q IngestionQuery) string
- type ClickHouseDatabase
- func (c *ClickHouseDatabase) Close()
- func (c *ClickHouseDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
- func (c *ClickHouseDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (c *ClickHouseDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
- func (c *ClickHouseDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
- func (c *ClickHouseDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
- func (c *ClickHouseDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (c *ClickHouseDatabase) InitialiseSchema(ctx context.Context) error
- func (c *ClickHouseDatabase) TearDown(ctx context.Context) error
- type Database
- type IngestionQuery
- type InsertJob
- type InsertJobError
- type InsertJobRun
- type InsertJobSpec
- type MemoryDatabase
- func (m *MemoryDatabase) Close()
- func (m *MemoryDatabase) ExecuteIngestionQueryBatch(_ context.Context, queries []IngestionQuery) error
- func (m *MemoryDatabase) GetJobGroups(_ *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (m *MemoryDatabase) GetJobRunDebugMessage(_ context.Context, jobRunID string) (string, error)
- func (m *MemoryDatabase) GetJobRunError(_ context.Context, jobRunID string) (string, error)
- func (m *MemoryDatabase) GetJobSpec(_ context.Context, jobID string) (*api.Job, error)
- func (m *MemoryDatabase) GetJobs(_ *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (m *MemoryDatabase) InitialiseSchema(_ context.Context) error
- func (m *MemoryDatabase) TearDown(_ context.Context) error
- type NewJob
- type PostgresDatabase
- func (p *PostgresDatabase) Close()
- func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
- func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, ...) ([]*model.JobGroup, error)
- func (p *PostgresDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
- func (p *PostgresDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
- func (p *PostgresDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
- func (p *PostgresDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, ...) ([]*model.Job, error)
- func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error
- func (p *PostgresDatabase) TearDown(ctx context.Context) error
- type SetJobCancelled
- type SetJobErrored
- type SetJobLeased
- type SetJobPending
- type SetJobPreempted
- type SetJobRejected
- type SetJobRunCancelled
- type SetJobRunFailed
- type SetJobRunPending
- type SetJobRunPreempted
- type SetJobRunStarted
- type SetJobRunSucceeded
- type SetJobRunning
- type SetJobSucceeded
- type UpdateJobPriority
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func JobIDFromQuery ¶
func JobIDFromQuery(q IngestionQuery) string
JobIDFromQuery extracts the job ID from any IngestionQuery. For run-based queries, it extracts the job ID prefix from the run ID.
Types ¶
type ClickHouseDatabase ¶
type ClickHouseDatabase struct {
// contains filtered or unexported fields
}
func NewClickHouseDatabase ¶
func NewClickHouseDatabase(config map[string]string) *ClickHouseDatabase
func (*ClickHouseDatabase) Close ¶
func (c *ClickHouseDatabase) Close()
func (*ClickHouseDatabase) ExecuteIngestionQueryBatch ¶
func (c *ClickHouseDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
func (*ClickHouseDatabase) GetJobGroups ¶
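func (*ClickHouseDatabase) GetJobRunDebugMessage ¶
func (c *ClickHouseDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
func (*ClickHouseDatabase) GetJobRunError ¶
func (c *ClickHouseDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
func (*ClickHouseDatabase) GetJobSpec ¶
func (c *ClickHouseDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)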
func (*ClickHouseDatabase) InitialiseSchema ¶
func (c *ClickHouseDatabase) InitialiseSchema(ctx context.Context) error
type Database ¶
type Database interface {
InitialiseSchema(ctx context.Context) error
ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
GetJobRunError(ctx context.Context, jobRunID string) (string, error)
GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)
GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)
TearDown(ctx context.Context) error
Close()
}
type IngestionQuery ¶
type IngestionQuery interface {
// contains filtered or unexported methods
}
type InsertJobError ¶
type InsertJobRun ¶
type InsertJobSpec ¶
type MemoryDatabase ¶
type MemoryDatabase struct {
// contains filtered or unexported fields
}
func NewMemoryDatabase ¶
func NewMemoryDatabase() *MemoryDatabase
func (*MemoryDatabase) Close ¶
func (m *MemoryDatabase) Close()
func (*MemoryDatabase) ExecuteIngestionQueryBatch ¶
func (m *MemoryDatabase) ExecuteIngestionQueryBatch(_ context.Context, queries []IngestionQuery) error
func (*MemoryDatabase) GetJobGroups ¶
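func (*MemoryDatabase) GetJobRunDebugMessage ¶
func (m *MemoryDatabase) GetJobRunDebugMessage(_ context.Context, jobRunID string) (string, error)
func (*MemoryDatabase) GetJobRunError ¶
func (m *MemoryDatabase) GetJobRunError(_ context.Context, jobRunID string) (string, error)
func (*MemoryDatabase) GetJobSpec ¶
func (m *MemoryDatabase) GetJobSpec(_ context.Context, jobID string) (*api.Job, error)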
func (*MemoryDatabase) InitialiseSchema ¶
func (m *MemoryDatabase) InitialiseSchema(_ context.Context) error
type PostgresDatabase ¶
type PostgresDatabase struct {
// contains filtered or unexported fields
}
PostgresDatabase implements the Database interface using PostgreSQL. It reuses the Lookout schema and query infrastructure to ensure realistic testing of production query patterns.
func NewPostgresDatabase ¶
func NewPostgresDatabase(config map[string]string) *PostgresDatabase
NewPostgresDatabase creates a new PostgresDatabase instance. The config map should contain connection parameters compatible with libpq:
- host: database host (e.g., "localhost")
- port: database port (e.g., "5433")
- user: database user (e.g., "postgres")
- password: database password
- dbname: database name (e.g., "broadside_test")
- sslmode: SSL mode (e.g., "disable")
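How the constructor consumes this map is not documented here. One plausible approach, sketched below under that assumption, is to render the map as a libpq keyword/value connection string; connString is a hypothetical helper and not part of the package.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// connString renders a config map as a libpq keyword/value connection string.
// Keys are sorted so the output is deterministic.
func connString(config map[string]string) string {
	keys := make([]string, 0, len(config))
	for k := range config {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		// libpq accepts single-quoted values; backslashes and single quotes
		// inside the value are escaped with a backslash.
		v := strings.ReplaceAll(config[k], `\`, `\\`)
		v = strings.ReplaceAll(v, `'`, `\'`)
		parts = append(parts, fmt.Sprintf("%s='%s'", k, v))
	}
	return strings.Join(parts, " ")
}

func main() {
	fmt.Println(connString(map[string]string{
		"host":    "localhost",
		"port":    "5433",
		"user":    "postgres",
		"dbname":  "broadside_test",
		"sslmode": "disable",
	}))
}
```

Quoting every value keeps empty passwords and values containing spaces valid without special-casing them.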
func (*PostgresDatabase) Close ¶
func (p *PostgresDatabase) Close()
Close closes the database connection pool.
func (*PostgresDatabase) ExecuteIngestionQueryBatch ¶
func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error
ExecuteIngestionQueryBatch executes a batch of ingestion queries. Queries are grouped by type and applied in an order that maintains referential integrity, so that inserts run before the updates that depend on them.
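The grouping step can be sketched as bucketing queries by kind and then emitting the buckets in a fixed parent-before-child order. The query types, the kind method, and the order shown in applyOrder are assumptions for this example; the actual order used by PostgresDatabase is not documented here.

```go
package main

import "fmt"

// query is a hypothetical stand-in for IngestionQuery.
type query interface{ kind() string }

type insertJob struct{ jobID string }
type insertJobRun struct{ jobID, runID string }
type setJobRunStarted struct{ runID string }

func (insertJob) kind() string        { return "insertJob" }
func (insertJobRun) kind() string     { return "insertJobRun" }
func (setJobRunStarted) kind() string { return "setJobRunStarted" }

// applyOrder lists kinds so that rows are created before anything refers to
// them: jobs, then runs, then run updates.
var applyOrder = []string{"insertJob", "insertJobRun", "setJobRunStarted"}

// groupAndOrder buckets a batch by kind, preserving order within each kind,
// and returns the non-empty buckets in applyOrder.
func groupAndOrder(batch []query) [][]query {
	byKind := make(map[string][]query)
	for _, q := range batch {
		byKind[q.kind()] = append(byKind[q.kind()], q)
	}
	var out [][]query
	for _, k := range applyOrder {
		if qs := byKind[k]; len(qs) > 0 {
			out = append(out, qs)
		}
	}
	return out
}

func main() {
	batch := []query{
		setJobRunStarted{runID: "run-1"},
		insertJobRun{jobID: "job-1", runID: "run-1"},
		insertJob{jobID: "job-1"},
	}
	for _, group := range groupAndOrder(batch) {
		fmt.Println(group[0].kind(), len(group))
	}
}
```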
func (*PostgresDatabase) GetJobGroups ¶
func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.Filter, order *model.Order, groupedField *model.GroupedField, aggregates []string, skip int, take int) ([]*model.JobGroup, error)
GetJobGroups retrieves aggregated job groups using the Lookout repository.
func (*PostgresDatabase) GetJobRunDebugMessage ¶
func (p *PostgresDatabase) GetJobRunDebugMessage(ctx context.Context, jobRunID string) (string, error)
GetJobRunDebugMessage retrieves the debug message for a specific job run.
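func (*PostgresDatabase) GetJobRunError ¶
func (p *PostgresDatabase) GetJobRunError(ctx context.Context, jobRunID string) (string, error)
GetJobRunError retrieves the error message for a specific job run.
func (*PostgresDatabase) GetJobSpec ¶
func (p *PostgresDatabase) GetJobSpec(ctx context.Context, jobID string) (*api.Job, error)
GetJobSpec retrieves the job specification for a specific job.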
func (*PostgresDatabase) GetJobs ¶
func (p *PostgresDatabase) GetJobs(ctx *context.Context, filters []*model.Filter, activeJobSets bool, order *model.Order, skip int, take int) ([]*model.Job, error)
GetJobs retrieves jobs matching the given filters using the Lookout repository.
func (*PostgresDatabase) InitialiseSchema ¶
func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error
InitialiseSchema opens the connection pool, applies the Lookout database migrations, and initialises the query repository.