queue

package
v0.0.0-...-fcaab98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package queue provides job queue implementations for workflow execution.

This file defines the JobQueue interface and Job struct for local workflow execution with optional persistence. Unlike the NNG-based WorkQueue (for distributed workers), JobQueue is for single-node execution with persistence.

Package queue implements NNG-based work distribution for the hybrid architecture.

This package provides: - Work distribution from control plane to workers (PUSH/PULL) - Result collection from workers to control plane (PUSH/PULL) - Worker registration and heartbeat (REQ/REP)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	ID          string           `json:"id"`
	WorkflowID  string           `json:"workflow_id"`
	Workflow    *model.Workflow  `json:"workflow"`
	InputData   []model.DataItem `json:"input_data"`
	Priority    int              `json:"priority"` // Higher = more urgent
	Status      JobStatus        `json:"status"`
	RetryCount  int              `json:"retry_count"`
	MaxRetries  int              `json:"max_retries"`
	ResultData  []model.DataItem `json:"result_data,omitempty"`
	Error       string           `json:"error,omitempty"`
	CreatedAt   time.Time        `json:"created_at"`
	StartedAt   *time.Time       `json:"started_at,omitempty"`
	CompletedAt *time.Time       `json:"completed_at,omitempty"`
}

Job represents a workflow execution job

type JobQueue

type JobQueue interface {
	// Enqueue adds a job to the queue
	Enqueue(job *Job) error

	// Dequeue retrieves and removes the next job from the queue
	// Blocks until a job is available or context is cancelled
	Dequeue(ctx context.Context) (*Job, error)

	// Ack marks a job as successfully completed
	Ack(jobID string, result *JobResult) error

	// Nack marks a job as failed, optionally requeueing for retry
	Nack(jobID string, err error) error

	// GetJob retrieves a job by ID
	GetJob(jobID string) (*Job, error)

	// GetPendingCount returns the number of pending jobs
	GetPendingCount() int

	// GetRunningCount returns the number of running jobs
	GetRunningCount() int

	// ListJobs returns jobs matching the given status (nil for all)
	ListJobs(status *JobStatus, limit int) ([]*Job, error)

	// Close gracefully shuts down the queue
	Close() error
}

JobQueue defines the interface for job queue implementations

type JobResult

type JobResult struct {
	Data  []model.DataItem
	Error error
}

JobResult contains the result of a job execution

type JobStatus

type JobStatus string

JobStatus represents the status of a job

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
)

type MemoryJobQueue

type MemoryJobQueue struct {
	// contains filtered or unexported fields
}

MemoryJobQueue implements JobQueue with in-memory storage Jobs are lost on restart - use SQLiteJobQueue for persistence

func NewMemoryJobQueue

func NewMemoryJobQueue(bufferSize int) *MemoryJobQueue

NewMemoryJobQueue creates a new in-memory job queue

func (*MemoryJobQueue) Ack

func (q *MemoryJobQueue) Ack(jobID string, result *JobResult) error

Ack marks a job as completed

func (*MemoryJobQueue) Close

func (q *MemoryJobQueue) Close() error

Close gracefully shuts down the queue

func (*MemoryJobQueue) Dequeue

func (q *MemoryJobQueue) Dequeue(ctx context.Context) (*Job, error)

Dequeue retrieves the next job from the queue

func (*MemoryJobQueue) Enqueue

func (q *MemoryJobQueue) Enqueue(job *Job) error

Enqueue adds a job to the queue

func (*MemoryJobQueue) GetJob

func (q *MemoryJobQueue) GetJob(jobID string) (*Job, error)

GetJob retrieves a job by ID

func (*MemoryJobQueue) GetPendingCount

func (q *MemoryJobQueue) GetPendingCount() int

GetPendingCount returns the number of pending jobs

func (*MemoryJobQueue) GetRunningCount

func (q *MemoryJobQueue) GetRunningCount() int

GetRunningCount returns the number of running jobs

func (*MemoryJobQueue) ListJobs

func (q *MemoryJobQueue) ListJobs(status *JobStatus, limit int) ([]*Job, error)

ListJobs returns jobs matching the given status

func (*MemoryJobQueue) Nack

func (q *MemoryJobQueue) Nack(jobID string, err error) error

Nack marks a job as failed, optionally requeueing

type ResultHandler

type ResultHandler func(result *ResultMessage) error

ResultHandler is called when a result is received

type ResultMessage

type ResultMessage struct {
	Type        string           `json:"type"`         // "execution_result"
	ExecutionID string           `json:"execution_id"` // Execution ID
	Status      string           `json:"status"`       // "success", "failed", "timeout"
	ResultData  []model.DataItem `json:"result_data"`  // Output data
	Error       string           `json:"error"`        // Error message if failed
	DurationMs  int64            `json:"duration_ms"`  // Execution duration
	WorkerID    string           `json:"worker_id"`    // Which worker executed it
	Timestamp   time.Time        `json:"timestamp"`    // When result was generated
}

ResultMessage represents a result from a worker

type SQLiteJobQueue

type SQLiteJobQueue struct {
	// contains filtered or unexported fields
}

SQLiteJobQueue implements JobQueue with SQLite persistence Jobs are kept in memory for fast access and persisted to SQLite for durability

func NewSQLiteJobQueue

func NewSQLiteJobQueue(dbPath string, bufferSize int) (*SQLiteJobQueue, error)

NewSQLiteJobQueue creates a new SQLite-backed job queue

func (*SQLiteJobQueue) Ack

func (q *SQLiteJobQueue) Ack(jobID string, result *JobResult) error

Ack marks a job as completed

func (*SQLiteJobQueue) Close

func (q *SQLiteJobQueue) Close() error

Close gracefully shuts down the queue

func (*SQLiteJobQueue) Dequeue

func (q *SQLiteJobQueue) Dequeue(ctx context.Context) (*Job, error)

Dequeue retrieves the next job from the queue

func (*SQLiteJobQueue) Enqueue

func (q *SQLiteJobQueue) Enqueue(job *Job) error

Enqueue adds a job to the queue

func (*SQLiteJobQueue) GetJob

func (q *SQLiteJobQueue) GetJob(jobID string) (*Job, error)

GetJob retrieves a job by ID

func (*SQLiteJobQueue) GetPendingCount

func (q *SQLiteJobQueue) GetPendingCount() int

GetPendingCount returns the number of pending jobs

func (*SQLiteJobQueue) GetRunningCount

func (q *SQLiteJobQueue) GetRunningCount() int

GetRunningCount returns the number of running jobs

func (*SQLiteJobQueue) ListJobs

func (q *SQLiteJobQueue) ListJobs(status *JobStatus, limit int) ([]*Job, error)

ListJobs returns jobs matching the given status

func (*SQLiteJobQueue) Nack

func (q *SQLiteJobQueue) Nack(jobID string, err error) error

Nack marks a job as failed, optionally requeueing

type WorkMessage

type WorkMessage struct {
	Type        string           `json:"type"`         // "execute_workflow"
	ExecutionID string           `json:"execution_id"` // Unique execution ID
	WorkflowID  string           `json:"workflow_id"`  // Workflow ID
	Workflow    *model.Workflow  `json:"workflow"`     // Full workflow definition
	InputData   []model.DataItem `json:"input_data"`   // Initial input data
	Priority    string           `json:"priority"`     // "high", "normal", "low"
	Timestamp   time.Time        `json:"timestamp"`    // When work was queued
	RetryCount  int              `json:"retry_count"`  // Number of retries
}

WorkMessage represents a work item to be executed by a worker

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue manages work distribution via NNG

func NewWorkQueue

func NewWorkQueue(workAddr, resultAddr, registrationAddr string) (*WorkQueue, error)

NewWorkQueue creates a new work queue for the control plane

func (*WorkQueue) Close

func (wq *WorkQueue) Close() error

Close gracefully shuts down the work queue

func (*WorkQueue) GetWorkerCount

func (wq *WorkQueue) GetWorkerCount() (active int, total int)

GetWorkerCount returns the number of active workers

func (*WorkQueue) GetWorkers

func (wq *WorkQueue) GetWorkers() []*WorkerStatus

GetWorkers returns the current worker pool status

func (*WorkQueue) PushWork

func (wq *WorkQueue) PushWork(work *WorkMessage) error

PushWork pushes a work item to the queue

func (*WorkQueue) SetResultHandler

func (wq *WorkQueue) SetResultHandler(handler ResultHandler)

SetResultHandler sets the handler for received results

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool processes jobs from a JobQueue using multiple workers

func NewWorkerPool

func NewWorkerPool(queue JobQueue, eng engine.WorkflowEngine, numWorkers int) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) EnqueueWorkflow

func (p *WorkerPool) EnqueueWorkflow(workflow *model.Workflow, inputData []model.DataItem) (string, error)

EnqueueWorkflow is a helper to enqueue a workflow execution

func (*WorkerPool) GetStats

func (p *WorkerPool) GetStats() map[string]interface{}

GetStats returns worker pool statistics

func (*WorkerPool) Start

func (p *WorkerPool) Start()

Start begins processing jobs with the worker pool

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop gracefully stops the worker pool

type WorkerRegistration

type WorkerRegistration struct {
	Type             string   `json:"type"`              // "register" or "heartbeat"
	WorkerID         string   `json:"worker_id"`         // Unique worker ID
	Capabilities     []string `json:"capabilities"`      // Supported node types
	MaxConcurrent    int      `json:"max_concurrent"`    // Max concurrent executions
	ActiveExecutions int      `json:"active_executions"` // Current active count
	UptimeSeconds    int64    `json:"uptime_seconds"`    // Worker uptime
}

WorkerRegistration represents a worker registration message

type WorkerStatus

type WorkerStatus struct {
	WorkerID         string    `json:"worker_id"`
	Status           string    `json:"status"` // "active", "offline"
	LastHeartbeat    time.Time `json:"last_heartbeat"`
	ActiveExecutions int       `json:"active_executions"`
	TotalExecutions  int64     `json:"total_executions"`
	MaxConcurrent    int       `json:"max_concurrent"`
	RegisteredAt     time.Time `json:"registered_at"`
}

WorkerStatus represents the status of a worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL