Documentation
¶
Overview ¶
Package queue provides job queue implementations for workflow execution.
This file defines the JobQueue interface and Job struct for local workflow execution with optional persistence. Unlike the NNG-based WorkQueue (for distributed workers), JobQueue is for single-node execution; persistence depends on the implementation (MemoryJobQueue keeps jobs in memory only, SQLiteJobQueue persists them to SQLite).
Package queue implements NNG-based work distribution for the hybrid architecture.
This package provides:
- Work distribution from control plane to workers (PUSH/PULL)
- Result collection from workers to control plane (PUSH/PULL)
- Worker registration and heartbeat (REQ/REP)
Index ¶
- type Job
- type JobQueue
- type JobResult
- type JobStatus
- type MemoryJobQueue
- func (q *MemoryJobQueue) Ack(jobID string, result *JobResult) error
- func (q *MemoryJobQueue) Close() error
- func (q *MemoryJobQueue) Dequeue(ctx context.Context) (*Job, error)
- func (q *MemoryJobQueue) Enqueue(job *Job) error
- func (q *MemoryJobQueue) GetJob(jobID string) (*Job, error)
- func (q *MemoryJobQueue) GetPendingCount() int
- func (q *MemoryJobQueue) GetRunningCount() int
- func (q *MemoryJobQueue) ListJobs(status *JobStatus, limit int) ([]*Job, error)
- func (q *MemoryJobQueue) Nack(jobID string, err error) error
- type ResultHandler
- type ResultMessage
- type SQLiteJobQueue
- func (q *SQLiteJobQueue) Ack(jobID string, result *JobResult) error
- func (q *SQLiteJobQueue) Close() error
- func (q *SQLiteJobQueue) Dequeue(ctx context.Context) (*Job, error)
- func (q *SQLiteJobQueue) Enqueue(job *Job) error
- func (q *SQLiteJobQueue) GetJob(jobID string) (*Job, error)
- func (q *SQLiteJobQueue) GetPendingCount() int
- func (q *SQLiteJobQueue) GetRunningCount() int
- func (q *SQLiteJobQueue) ListJobs(status *JobStatus, limit int) ([]*Job, error)
- func (q *SQLiteJobQueue) Nack(jobID string, err error) error
- type WorkMessage
- type WorkQueue
- type WorkerPool
- type WorkerRegistration
- type WorkerStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
// Job is a single workflow execution request together with its queue
// bookkeeping: status, retry counters, timestamps and, once finished, its
// result data or error text. Every field is serialized to JSON under the
// tag name shown next to it.
type Job struct {
// ID identifies the job; presumably the jobID accepted by JobQueue.GetJob,
// Ack and Nack — TODO confirm.
ID string `json:"id"`
// WorkflowID and Workflow carry the workflow's identifier and a pointer to
// its definition.
WorkflowID string `json:"workflow_id"`
Workflow *model.Workflow `json:"workflow"`
// InputData is the input passed along with the workflow.
InputData []model.DataItem `json:"input_data"`
Priority int `json:"priority"` // Higher = more urgent
Status JobStatus `json:"status"`
// RetryCount and MaxRetries: presumably the attempts made so far and the
// cap consulted when Nack decides whether to requeue — TODO confirm
// against the queue implementations.
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
// ResultData and Error are left out of the JSON encoding when empty
// (omitempty).
ResultData []model.DataItem `json:"result_data,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
// StartedAt and CompletedAt are pointers, so an unset time is nil and is
// omitted from the JSON encoding.
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
Job represents a workflow execution job
type JobQueue ¶
// JobQueue is the contract shared by the job queue implementations in this
// package; MemoryJobQueue and SQLiteJobQueue each provide every method
// listed here.
type JobQueue interface {
// Enqueue adds a job to the queue.
Enqueue(job *Job) error
// Dequeue retrieves and removes the next job from the queue.
// It blocks until a job is available or ctx is cancelled.
Dequeue(ctx context.Context) (*Job, error)
// Ack marks the job identified by jobID as successfully completed.
// result presumably carries the job's output — TODO confirm.
Ack(jobID string, result *JobResult) error
// Nack marks the job identified by jobID as failed, optionally requeueing
// it for retry. err is the failure being reported.
Nack(jobID string, err error) error
// GetJob retrieves a job by ID.
GetJob(jobID string) (*Job, error)
// GetPendingCount returns the number of pending jobs.
GetPendingCount() int
// GetRunningCount returns the number of running jobs.
GetRunningCount() int
// ListJobs returns jobs matching the given status (nil for all).
// limit presumably caps how many jobs are returned — TODO confirm.
ListJobs(status *JobStatus, limit int) ([]*Job, error)
// Close gracefully shuts down the queue.
Close() error
}
JobQueue defines the interface for job queue implementations
type MemoryJobQueue ¶
type MemoryJobQueue struct {
// contains filtered or unexported fields
}
MemoryJobQueue implements JobQueue with in-memory storage. Jobs are lost on restart; use SQLiteJobQueue for persistence.
func NewMemoryJobQueue ¶
func NewMemoryJobQueue(bufferSize int) *MemoryJobQueue
NewMemoryJobQueue creates a new in-memory job queue
func (*MemoryJobQueue) Ack ¶
func (q *MemoryJobQueue) Ack(jobID string, result *JobResult) error
Ack marks a job as completed
func (*MemoryJobQueue) Close ¶
func (q *MemoryJobQueue) Close() error
Close gracefully shuts down the queue
func (*MemoryJobQueue) Dequeue ¶
func (q *MemoryJobQueue) Dequeue(ctx context.Context) (*Job, error)
Dequeue retrieves the next job from the queue
func (*MemoryJobQueue) Enqueue ¶
func (q *MemoryJobQueue) Enqueue(job *Job) error
Enqueue adds a job to the queue
func (*MemoryJobQueue) GetJob ¶
func (q *MemoryJobQueue) GetJob(jobID string) (*Job, error)
GetJob retrieves a job by ID
func (*MemoryJobQueue) GetPendingCount ¶
func (q *MemoryJobQueue) GetPendingCount() int
GetPendingCount returns the number of pending jobs
func (*MemoryJobQueue) GetRunningCount ¶
func (q *MemoryJobQueue) GetRunningCount() int
GetRunningCount returns the number of running jobs
type ResultHandler ¶
// ResultHandler is the callback signature for received results: it is handed
// the ResultMessage and returns an error. A handler is installed on a
// WorkQueue with SetResultHandler.
type ResultHandler func(result *ResultMessage) error
ResultHandler is called when a result is received
type ResultMessage ¶
// ResultMessage is the JSON payload describing the outcome of one execution
// as reported by a worker. It is the argument passed to a ResultHandler.
type ResultMessage struct {
Type string `json:"type"` // "execution_result"
ExecutionID string `json:"execution_id"` // Execution ID
Status string `json:"status"` // "success", "failed", "timeout"
ResultData []model.DataItem `json:"result_data"` // Output data
Error string `json:"error"` // Error message if failed
// DurationMs is presumably in milliseconds, going by the field name and
// JSON key — TODO confirm.
DurationMs int64 `json:"duration_ms"` // Execution duration
WorkerID string `json:"worker_id"` // Which worker executed it
Timestamp time.Time `json:"timestamp"` // When result was generated
}
ResultMessage represents a result from a worker
type SQLiteJobQueue ¶
type SQLiteJobQueue struct {
// contains filtered or unexported fields
}
SQLiteJobQueue implements JobQueue with SQLite persistence. Jobs are kept in memory for fast access and persisted to SQLite for durability.
func NewSQLiteJobQueue ¶
func NewSQLiteJobQueue(dbPath string, bufferSize int) (*SQLiteJobQueue, error)
NewSQLiteJobQueue creates a new SQLite-backed job queue
func (*SQLiteJobQueue) Ack ¶
func (q *SQLiteJobQueue) Ack(jobID string, result *JobResult) error
Ack marks a job as completed
func (*SQLiteJobQueue) Close ¶
func (q *SQLiteJobQueue) Close() error
Close gracefully shuts down the queue
func (*SQLiteJobQueue) Dequeue ¶
func (q *SQLiteJobQueue) Dequeue(ctx context.Context) (*Job, error)
Dequeue retrieves the next job from the queue
func (*SQLiteJobQueue) Enqueue ¶
func (q *SQLiteJobQueue) Enqueue(job *Job) error
Enqueue adds a job to the queue
func (*SQLiteJobQueue) GetJob ¶
func (q *SQLiteJobQueue) GetJob(jobID string) (*Job, error)
GetJob retrieves a job by ID
func (*SQLiteJobQueue) GetPendingCount ¶
func (q *SQLiteJobQueue) GetPendingCount() int
GetPendingCount returns the number of pending jobs
func (*SQLiteJobQueue) GetRunningCount ¶
func (q *SQLiteJobQueue) GetRunningCount() int
GetRunningCount returns the number of running jobs
type WorkMessage ¶
// WorkMessage is the JSON payload for one unit of work handed to a worker.
// It is the argument accepted by WorkQueue.PushWork.
type WorkMessage struct {
Type string `json:"type"` // "execute_workflow"
ExecutionID string `json:"execution_id"` // Unique execution ID
WorkflowID string `json:"workflow_id"` // Workflow ID
Workflow *model.Workflow `json:"workflow"` // Full workflow definition
InputData []model.DataItem `json:"input_data"` // Initial input data
// Priority is a string label here, whereas Job.Priority is an int.
Priority string `json:"priority"` // "high", "normal", "low"
Timestamp time.Time `json:"timestamp"` // When work was queued
RetryCount int `json:"retry_count"` // Number of retries
}
WorkMessage represents a work item to be executed by a worker
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
WorkQueue manages work distribution via NNG
func NewWorkQueue ¶
NewWorkQueue creates a new work queue for the control plane
func (*WorkQueue) GetWorkerCount ¶
GetWorkerCount returns the number of active workers
func (*WorkQueue) GetWorkers ¶
func (wq *WorkQueue) GetWorkers() []*WorkerStatus
GetWorkers returns the current worker pool status
func (*WorkQueue) PushWork ¶
func (wq *WorkQueue) PushWork(work *WorkMessage) error
PushWork pushes a work item to the queue
func (*WorkQueue) SetResultHandler ¶
func (wq *WorkQueue) SetResultHandler(handler ResultHandler)
SetResultHandler sets the handler for received results
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool processes jobs from a JobQueue using multiple workers
func NewWorkerPool ¶
func NewWorkerPool(queue JobQueue, eng engine.WorkflowEngine, numWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) EnqueueWorkflow ¶
func (p *WorkerPool) EnqueueWorkflow(workflow *model.Workflow, inputData []model.DataItem) (string, error)
EnqueueWorkflow is a helper to enqueue a workflow execution
func (*WorkerPool) GetStats ¶
func (p *WorkerPool) GetStats() map[string]interface{}
GetStats returns worker pool statistics
func (*WorkerPool) Start ¶
func (p *WorkerPool) Start()
Start begins processing jobs with the worker pool
type WorkerRegistration ¶
// WorkerRegistration is the JSON payload a worker uses both to register and
// to send heartbeats; the Type field tells the two apart.
type WorkerRegistration struct {
Type string `json:"type"` // "register" or "heartbeat"
WorkerID string `json:"worker_id"` // Unique worker ID
Capabilities []string `json:"capabilities"` // Supported node types
MaxConcurrent int `json:"max_concurrent"` // Max concurrent executions
ActiveExecutions int `json:"active_executions"` // Current active count
UptimeSeconds int64 `json:"uptime_seconds"` // Worker uptime
}
WorkerRegistration represents a worker registration message
type WorkerStatus ¶
// WorkerStatus is the per-worker status record; WorkQueue.GetWorkers returns
// a slice of pointers to these.
type WorkerStatus struct {
WorkerID string `json:"worker_id"`
Status string `json:"status"` // "active", "offline"
LastHeartbeat time.Time `json:"last_heartbeat"`
// ActiveExecutions and MaxConcurrent share their names with fields of
// WorkerRegistration; presumably they are copied from the worker's latest
// message — TODO confirm.
ActiveExecutions int `json:"active_executions"`
TotalExecutions int64 `json:"total_executions"`
MaxConcurrent int `json:"max_concurrent"`
RegisteredAt time.Time `json:"registered_at"`
}
WorkerStatus represents the status of a worker