jobqueue

package
v0.1.0
Published: May 14, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package jobqueue provides River-based job scheduling for parallel bench runs.

Index

Constants

const MaxJobAttempts = 2

MaxJobAttempts is the default attempt limit for bench scenario jobs (the first run plus any retries). A retry occurs only if RunFunc returns a non-nil error, which should signal an infrastructure failure. Scenario failures (the agent did not fix the issue) should return nil to avoid retries.
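The retry rule above can be illustrated with a small simulation. This is only a sketch, not River's scheduling code: it assumes the constant is used as a total attempt limit, and runWithAttempts and errInfra are hypothetical names introduced for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// maxJobAttempts mirrors the documented MaxJobAttempts constant.
const maxJobAttempts = 2

// errInfra is a hypothetical infrastructure error used by the example.
var errInfra = errors.New("cluster unreachable")

// runWithAttempts is a hypothetical stand-in for the scheduler. It calls fn
// until fn returns nil or the attempt limit is reached, and reports how many
// attempts were made together with the last error.
func runWithAttempts(fn func(attempt int) error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxJobAttempts; attempt++ {
		if err = fn(attempt); err == nil {
			return attempt, nil
		}
	}
	return maxJobAttempts, err
}

func main() {
	// A scenario failure is reported as nil, so the job runs once.
	n, err := runWithAttempts(func(int) error { return nil })
	fmt.Println(n, err)

	// An infrastructure failure on every attempt uses the whole limit.
	n, err = runWithAttempts(func(int) error { return errInfra })
	fmt.Println(n, err)
}
```

Under these assumptions, a job whose run function keeps failing is executed twice and then given up on, while a job that returns nil is never retried.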

Variables

This section is empty.

Functions

This section is empty.

Types

type BenchJobArgs

type BenchJobArgs struct {
	JobID             string `json:"job_id"`
	TenantID          string `json:"tenant_id"` // Reserved for multi-tenant SaaS (Phase 2b). Empty in CLI mode.
	ScenarioID        string `json:"scenario_id"`
	Model             string `json:"model"`
	Provider          string `json:"provider"`
	MCPServer         string `json:"mcp_server,omitempty"`
	ToolServer        string `json:"tool_server,omitempty"`
	ToolServerVersion string `json:"tool_server_version,omitempty"`
	SkillFile         string `json:"skill_file,omitempty"`
	SkillID           string `json:"skill_id,omitempty"`
	SkillVersion      string `json:"skill_version,omitempty"`
	SkillSource       string `json:"skill_source,omitempty"`
	SkillSHA256       string `json:"skill_sha256,omitempty"`
	NamespaceSlot     int    `json:"namespace_slot"` // Round-robin slot for namespace isolation (bench-w0..bench-wN)
	Parallel          int    `json:"parallel"`       // Total worker count; 1 = no namespace isolation
}

BenchJobArgs defines the arguments for a single scenario bench job.
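The struct tags determine how the arguments look once encoded as JSON. The sketch below uses a trimmed local copy of the struct (benchJobArgs, not the package's own type) and hypothetical field values to show that fields tagged omitempty disappear when empty, while tenant_id and namespace_slot are always written.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// benchJobArgs is a trimmed local copy of BenchJobArgs, for illustration only.
type benchJobArgs struct {
	JobID         string `json:"job_id"`
	TenantID      string `json:"tenant_id"`
	ScenarioID    string `json:"scenario_id"`
	Model         string `json:"model"`
	Provider      string `json:"provider"`
	MCPServer     string `json:"mcp_server,omitempty"`
	SkillFile     string `json:"skill_file,omitempty"`
	NamespaceSlot int    `json:"namespace_slot"`
	Parallel      int    `json:"parallel"`
}

// encode returns the JSON form of args as a string.
func encode(args benchJobArgs) (string, error) {
	b, err := json.Marshal(args)
	return string(b), err
}

func main() {
	// All values here are placeholders, not real scenario or model names.
	s, err := encode(benchJobArgs{
		JobID:      "run-1",
		ScenarioID: "example-scenario",
		Model:      "example-model",
		Provider:   "example-provider",
		Parallel:   1,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// mcp_server and skill_file are omitted; tenant_id is present but empty.
	fmt.Println(s)
}
```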

func (BenchJobArgs) InsertOpts

func (BenchJobArgs) InsertOpts() river.InsertOpts

InsertOpts returns default insert options.

func (BenchJobArgs) Kind

func (BenchJobArgs) Kind() string

Kind returns the River job kind identifier.

type BenchWorker

type BenchWorker struct {
	river.WorkerDefaults[BenchJobArgs]
	// contains filtered or unexported fields
}

BenchWorker implements river.Worker for scenario execution.

func NewBenchWorker

func NewBenchWorker(fn RunFunc) *BenchWorker

NewBenchWorker creates a worker with the given run function.

func (*BenchWorker) Work

func (w *BenchWorker) Work(ctx context.Context, job *river.Job[BenchJobArgs]) error

Work executes a single scenario in an isolated namespace.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a River client with bench-specific configuration. Lifecycle: NewClient → Migrate → Insert/InsertBatch → Start → Stop. Migrate must be called before Start. Stop must be called exactly once.
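The lifecycle can be sketched against a small interface that mirrors the documented method signatures. Everything below is illustrative: lifecycle, run, fakeClient, and demo are hypothetical names, the fake only records call order, and calling Stop with a fresh context even when Migrate fails is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// lifecycle lists the documented Client methods this sketch relies on.
type lifecycle interface {
	Migrate(ctx context.Context) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// run drives a client through Migrate, enqueue, Start, and a single Stop.
func run(ctx context.Context, c lifecycle, enqueue func(context.Context) error) (err error) {
	defer func() {
		// ctx is already cancelled once Start returns, so Stop gets its own
		// context with a timeout (the 5s value is arbitrary).
		stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if stopErr := c.Stop(stopCtx); err == nil {
			err = stopErr
		}
	}()
	if err = c.Migrate(ctx); err != nil {
		return fmt.Errorf("migrate: %w", err)
	}
	if err = enqueue(ctx); err != nil {
		return fmt.Errorf("enqueue: %w", err)
	}
	return c.Start(ctx) // blocks until ctx is cancelled
}

// fakeClient records the order of calls instead of talking to PostgreSQL.
type fakeClient struct{ calls []string }

func (f *fakeClient) Migrate(context.Context) error {
	f.calls = append(f.calls, "Migrate")
	return nil
}

func (f *fakeClient) Start(ctx context.Context) error {
	f.calls = append(f.calls, "Start")
	<-ctx.Done()
	return nil
}

func (f *fakeClient) Stop(context.Context) error {
	f.calls = append(f.calls, "Stop")
	return nil
}

// demo runs the lifecycle against the fake and returns the call order.
func demo() string {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	f := &fakeClient{}
	err := run(ctx, f, func(context.Context) error {
		f.calls = append(f.calls, "Insert")
		return nil
	})
	if err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(f.calls, ",")
}

func main() {
	fmt.Println(demo())
}
```

Placing Stop in a deferred function is one way to guarantee it runs exactly once on every return path.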

func NewClient

func NewClient(ctx context.Context, databaseURL string, parallel int, runFn RunFunc) (*Client, error)

NewClient creates a River client connected to PostgreSQL. parallel controls the max concurrent workers.

func (*Client) Insert

func (c *Client) Insert(ctx context.Context, args BenchJobArgs) error

Insert enqueues a bench scenario job.

func (*Client) InsertBatch

func (c *Client) InsertBatch(ctx context.Context, scenarios []string, model, provider, mcpServer, toolServer, toolServerVersion, skillFile, skillID, skillVersion, skillSource, skillSHA256, jobID, tenantID string, parallel int) error

InsertBatch enqueues one job per scenario, assigning namespace slots (worker IDs) round-robin.
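Round-robin assignment can be sketched as a modulo over the scenario index. This is an assumption about how the slots are computed, not the package's actual code; assignSlots and namespaceFor are hypothetical helpers, and the namespace naming follows the bench-w0..bench-wN pattern mentioned in the BenchJobArgs field comment.

```go
package main

import "fmt"

// assignSlots returns one slot per scenario, cycling through 0..parallel-1.
// A parallel value below 1 is treated as 1 so the modulo is well defined.
func assignSlots(numScenarios, parallel int) []int {
	if parallel < 1 {
		parallel = 1
	}
	slots := make([]int, numScenarios)
	for i := range slots {
		slots[i] = i % parallel
	}
	return slots
}

// namespaceFor maps a slot to a namespace name such as bench-w0.
func namespaceFor(slot int) string {
	return fmt.Sprintf("bench-w%d", slot)
}

func main() {
	// Five scenarios spread across two workers.
	for i, slot := range assignSlots(5, 2) {
		fmt.Printf("scenario %d -> %s\n", i, namespaceFor(slot))
	}
}
```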

func (*Client) Migrate

func (c *Client) Migrate(ctx context.Context) error

Migrate runs River's internal migrations to create its job tables.

func (*Client) Start

func (c *Client) Start(ctx context.Context) error

Start begins processing jobs. Blocks until ctx is cancelled.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

Stop gracefully stops the client.

func (*Client) Stopped

func (c *Client) Stopped() <-chan struct{}

Stopped returns a channel that is closed when all workers have stopped.

type RunFunc

type RunFunc func(ctx context.Context, args BenchJobArgs, namespace string) error

RunFunc is the function that executes a single scenario. It receives the job args and the worker namespace. Returning a non-nil error makes River retry the job, up to MaxJobAttempts (2) attempts. Return nil even when the scenario itself fails, so that it is not retried.
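A run function that follows this contract might look like the sketch below. It uses a local stand-in for BenchJobArgs and a matching local function type; newRunFunc, runScenario, record, errInfra, and tryOnce are hypothetical names, and how scenario results are actually stored is not covered by this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// benchJobArgs is a trimmed local stand-in for BenchJobArgs.
type benchJobArgs struct {
	ScenarioID string
}

// runFunc mirrors the documented RunFunc signature with the local args type.
type runFunc func(ctx context.Context, args benchJobArgs, namespace string) error

// errInfra is a hypothetical sentinel for failures of the harness itself.
var errInfra = errors.New("infrastructure failure")

// newRunFunc wraps a hypothetical scenario runner. Infrastructure errors are
// returned so the job is retried; scenario outcomes are recorded and reported
// as nil, whether the agent fixed the scenario or not.
func newRunFunc(
	runScenario func(ctx context.Context, scenarioID, namespace string) (fixed bool, err error),
	record func(scenarioID string, fixed bool),
) runFunc {
	return func(ctx context.Context, args benchJobArgs, namespace string) error {
		fixed, err := runScenario(ctx, args.ScenarioID, namespace)
		if err != nil {
			return fmt.Errorf("scenario %s in %s: %w", args.ScenarioID, namespace, err)
		}
		record(args.ScenarioID, fixed)
		return nil
	}
}

// tryOnce runs the wrapper once with canned results and reports how many
// outcomes were recorded and which error was returned.
func tryOnce(fixed bool, runErr error) (recorded int, err error) {
	rf := newRunFunc(
		func(context.Context, string, string) (bool, error) { return fixed, runErr },
		func(string, bool) { recorded++ },
	)
	err = rf(context.Background(), benchJobArgs{ScenarioID: "example-scenario"}, "bench-w0")
	return recorded, err
}

func main() {
	fmt.Println(tryOnce(false, nil))      // scenario failed: recorded, no retry
	fmt.Println(tryOnce(false, errInfra)) // infra failure: error returned, retried
}
```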
