Documentation
¶
Overview ¶
Package jobqueue provides River-based job scheduling for parallel bench runs.
Index ¶
- Constants
- type BenchJobArgs
- type BenchWorker
- type Client
- func (c *Client) Insert(ctx context.Context, args BenchJobArgs) error
- func (c *Client) InsertBatch(ctx context.Context, scenarios []string, ...) error
- func (c *Client) Migrate(ctx context.Context) error
- func (c *Client) Start(ctx context.Context) error
- func (c *Client) Stop(ctx context.Context) error
- func (c *Client) Stopped() <-chan struct{}
- type RunFunc
Constants ¶
const MaxJobAttempts = 2
MaxJobAttempts is the default retry count for bench scenario jobs. A retry occurs only if RunFunc returns a non-nil error (infra failure). Scenario failures (agent didn't fix) should return nil to avoid retries.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BenchJobArgs ¶
type BenchJobArgs struct {
JobID string `json:"job_id"`
TenantID string `json:"tenant_id"` // Reserved for multi-tenant SaaS (Phase 2b). Empty in CLI mode.
ScenarioID string `json:"scenario_id"`
Model string `json:"model"`
Provider string `json:"provider"`
MCPServer string `json:"mcp_server,omitempty"`
ToolServer string `json:"tool_server,omitempty"`
ToolServerVersion string `json:"tool_server_version,omitempty"`
SkillFile string `json:"skill_file,omitempty"`
SkillID string `json:"skill_id,omitempty"`
SkillVersion string `json:"skill_version,omitempty"`
SkillSource string `json:"skill_source,omitempty"`
SkillSHA256 string `json:"skill_sha256,omitempty"`
NamespaceSlot int `json:"namespace_slot"` // Round-robin slot for namespace isolation (bench-w0..bench-wN)
Parallel int `json:"parallel"` // Total worker count; 1 = no namespace isolation
}
BenchJobArgs defines the arguments for a single scenario bench job.
func (BenchJobArgs) InsertOpts ¶
func (BenchJobArgs) InsertOpts() river.InsertOpts
InsertOpts returns default insert options.
func (BenchJobArgs) Kind ¶
func (BenchJobArgs) Kind() string
Kind returns the River job kind identifier.
type BenchWorker ¶
type BenchWorker struct {
river.WorkerDefaults[BenchJobArgs]
// contains filtered or unexported fields
}
BenchWorker implements river.Worker for scenario execution.
func NewBenchWorker ¶
func NewBenchWorker(fn RunFunc) *BenchWorker
NewBenchWorker creates a worker with the given run function.
func (*BenchWorker) Work ¶
func (w *BenchWorker) Work(ctx context.Context, job *river.Job[BenchJobArgs]) error
Work executes a single scenario in an isolated namespace.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps a River client with bench-specific configuration. Client wraps a River client with bench-specific configuration. Lifecycle: NewClient → Migrate → Insert/InsertBatch → Start → Stop. Migrate must be called before Start. Stop must be called exactly once.
func NewClient ¶
func NewClient(ctx context.Context, databaseURL string, parallel int, runFn RunFunc) (*Client, error)
NewClient creates a River client connected to PostgreSQL. parallel controls the max concurrent workers.
func (*Client) Insert ¶
func (c *Client) Insert(ctx context.Context, args BenchJobArgs) error
Insert enqueues a bench scenario job.
func (*Client) InsertBatch ¶
func (c *Client) InsertBatch(ctx context.Context, scenarios []string, model, provider, mcpServer, toolServer, toolServerVersion, skillFile, skillID, skillVersion, skillSource, skillSHA256, jobID, tenantID string, parallel int) error
InsertBatch enqueues multiple scenario jobs, assigning worker IDs round-robin.
type RunFunc ¶
type RunFunc func(ctx context.Context, args BenchJobArgs, namespace string) error
RunFunc is the function that executes a single scenario. It receives the job args and the worker namespace. Returning an error triggers a River retry (MaxAttempts=2). Return nil even on scenario failure to avoid retries.