job

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrJobAlreadyExists = errors.New("job already exists")

ErrJobAlreadyExists indicates the job ID already exists in persistent storage.

View Source
var ErrJobNotFound = errors.New("job not found")

ErrJobNotFound indicates the requested job does not exist.

Functions

func RegisterHandler

func RegisterHandler[T any](m *JobManager, ctx context.Context, jobType string, handler TypedHandler[T]) error

RegisterHandler registers a type-safe handler for a job type. The handler will be called with deserialized arguments of type T.

Types

type DefaultLogger

type DefaultLogger struct{}

DefaultLogger is a simple logger implementation using the standard log package

func NewDefaultLogger

func NewDefaultLogger() *DefaultLogger

NewDefaultLogger creates a new DefaultLogger

func (*DefaultLogger) Debug

func (l *DefaultLogger) Debug(msg string, fields ...Field)

Debug logs a debug message

func (*DefaultLogger) Error

func (l *DefaultLogger) Error(msg string, fields ...Field)

Error logs an error message

func (*DefaultLogger) Info

func (l *DefaultLogger) Info(msg string, fields ...Field)

Info logs an info message

func (*DefaultLogger) Warn

func (l *DefaultLogger) Warn(msg string, fields ...Field)

Warn logs a warning message

type DurableJobStore

type DurableJobStore interface {
	CreateJob(ctx context.Context, job *JobEntity) error
}

DurableJobStore provides atomic create semantics for durable-ack submission. Implement this interface to guarantee CreateJob fails when a job ID already exists.

type ErrorHandler

type ErrorHandler func(jobID string, operation string, err error)

ErrorHandler is called when an IO operation fails after all retries

type Field

type Field struct {
	Key   string
	Value any
}

Field represents a key-value pair for structured logging

func F

func F(key string, value any) Field

F creates a new Field with the given key and value

type JSONSerializer

type JSONSerializer struct{}

JSONSerializer uses JSON encoding for serialization.

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSON serializer

func (*JSONSerializer) Deserialize

func (s *JSONSerializer) Deserialize(data []byte, target any) error

func (*JSONSerializer) Name

func (s *JSONSerializer) Name() string

func (*JSONSerializer) Serialize

func (s *JSONSerializer) Serialize(args any) ([]byte, error)

type JobEntity

type JobEntity struct {
	ID        string
	Type      string
	ArgsData  []byte
	Status    JobStatus
	Result    string
	Priority  int
	CreatedAt time.Time
	UpdatedAt time.Time
}

type JobFilter

type JobFilter struct {
	Status JobStatus // Empty means all
	Type   string    // Empty means all
	Limit  int       // 0 means no limit
	Offset int       // Default 0
}

type JobManager

type JobManager struct {
	// contains filtered or unexported fields
}

JobManager manages job lifecycle using a three-layer runner architecture: - Layer 1 (controlRunner): Fast control operations (<100μs, pure memory) - Layer 2 (ioRunner): Sequential IO operations (database, file, network) - Layer 3 (executionRunner): User job execution (may be slow/blocking)

func NewJobManager

func NewJobManager(
	controlRunner taskrunner.TaskRunner,
	ioRunner taskrunner.TaskRunner,
	executionRunner taskrunner.TaskRunner,
	store JobStore,
	serializer JobSerializer,
) *JobManager

NewJobManager creates a new JobManager with the three-layer runner architecture

func (*JobManager) CancelJob

func (m *JobManager) CancelJob(ctx context.Context, id string) error

CancelJob cancels an active job

func (*JobManager) GetActiveJobCount

func (m *JobManager) GetActiveJobCount() int

GetActiveJobCount returns the number of active jobs

func (*JobManager) GetActiveJobs

func (m *JobManager) GetActiveJobs() []*JobEntity

GetActiveJobs returns a snapshot of active jobs

func (*JobManager) GetJob

func (m *JobManager) GetJob(ctx context.Context, id string) (*JobEntity, error)

GetJob retrieves a job by ID

func (*JobManager) GetRetryPolicy

func (m *JobManager) GetRetryPolicy() RetryPolicy

func (*JobManager) ListJobs

func (m *JobManager) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)

ListJobs returns jobs matching the filter (allows slight delay from recent writes)

func (*JobManager) SetErrorHandler

func (m *JobManager) SetErrorHandler(handler ErrorHandler)

SetErrorHandler sets a custom error handler for failed IO operations

func (*JobManager) SetLogger

func (m *JobManager) SetLogger(logger Logger)

SetLogger sets the logger for JobManager

func (*JobManager) SetRetryPolicy

func (m *JobManager) SetRetryPolicy(policy RetryPolicy)

SetRetryPolicy sets the retry policy for IO operations

func (*JobManager) SetShutdownRunners

func (m *JobManager) SetShutdownRunners(own bool)

func (*JobManager) Shutdown

func (m *JobManager) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the JobManager

func (*JobManager) Start

func (m *JobManager) Start(ctx context.Context) error

Start initializes the JobManager and recovers unfinished jobs

func (*JobManager) SubmitDelayedJob

func (m *JobManager) SubmitDelayedJob(
	ctx context.Context,
	id string,
	jobType string,
	args any,
	delay time.Duration,
	traits taskrunner.TaskTraits,
) error

SubmitDelayedJob submits a job with a delay before execution

func (*JobManager) SubmitJob

func (m *JobManager) SubmitJob(ctx context.Context, id string, jobType string, args any, traits taskrunner.TaskTraits) error

SubmitJob submits a job for immediate execution

func (*JobManager) UnregisterHandler

func (m *JobManager) UnregisterHandler(ctx context.Context, jobType string) error

UnregisterHandler removes a previously registered handler for a job type.

type JobSerializer

type JobSerializer interface {
	// Serialize converts a Go value to bytes
	Serialize(args any) ([]byte, error)

	// Deserialize converts bytes back to a Go value
	Deserialize(data []byte, target any) error

	// Name returns the serializer name (for debugging/logging)
	Name() string
}

JobSerializer defines the interface for serializing and deserializing job arguments.

type JobStatus

type JobStatus string
const (
	JobStatusPending   JobStatus = "PENDING"
	JobStatusRunning   JobStatus = "RUNNING"
	JobStatusCompleted JobStatus = "COMPLETED"
	JobStatusFailed    JobStatus = "FAILED"
	JobStatusCanceled  JobStatus = "CANCELED"
)

func (JobStatus) IsValid

func (s JobStatus) IsValid() bool

func (JobStatus) String

func (s JobStatus) String() string

type JobStore

type JobStore interface {
	// SaveJob saves a new job or updates an existing one
	SaveJob(ctx context.Context, job *JobEntity) error

	// UpdateStatus updates the status and result of a job
	UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error

	// GetJob retrieves a job by ID
	GetJob(ctx context.Context, id string) (*JobEntity, error)

	// ListJobs returns jobs matching the filter
	ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)

	// GetRecoverableJobs returns jobs that should be recovered on startup
	// (typically PENDING jobs)
	GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)

	// DeleteJob removes a job from storage
	DeleteJob(ctx context.Context, id string) error
}

JobStore defines the interface for job persistence. Implementations can use in-memory storage, databases, or other backends.

type Logger

type Logger interface {
	// Debug logs a debug message with optional fields
	Debug(msg string, fields ...Field)

	// Info logs an info message with optional fields
	Info(msg string, fields ...Field)

	// Warn logs a warning message with optional fields
	Warn(msg string, fields ...Field)

	// Error logs an error message with optional fields
	Error(msg string, fields ...Field)
}

Logger interface for structured logging Implementations can provide custom logging behavior (e.g., integration with logrus, zap, etc.)

type MemoryJobStore

type MemoryJobStore struct {
	// contains filtered or unexported fields
}

MemoryJobStore is an in-memory implementation of JobStore. It uses sync.Map for concurrent-safe storage.

func NewMemoryJobStore

func NewMemoryJobStore() *MemoryJobStore

NewMemoryJobStore creates a new in-memory job store

func (*MemoryJobStore) Clear

func (s *MemoryJobStore) Clear()

Clear removes all jobs from the store (useful for testing)

func (*MemoryJobStore) Count

func (s *MemoryJobStore) Count() int

Count returns the total number of jobs in the store

func (*MemoryJobStore) CreateJob

func (s *MemoryJobStore) CreateJob(ctx context.Context, job *JobEntity) error

CreateJob inserts a new job atomically. Returns ErrJobAlreadyExists if the ID already exists.

func (*MemoryJobStore) DeleteJob

func (s *MemoryJobStore) DeleteJob(ctx context.Context, id string) error

func (*MemoryJobStore) GetJob

func (s *MemoryJobStore) GetJob(ctx context.Context, id string) (*JobEntity, error)

func (*MemoryJobStore) GetRecoverableJobs

func (s *MemoryJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)

func (*MemoryJobStore) ListJobs

func (s *MemoryJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)

ListJobs returns jobs matching the filter. Results are not ordered; pagination with Offset/Limit may return inconsistent results across calls. Use for testing only.

func (*MemoryJobStore) SaveJob

func (s *MemoryJobStore) SaveJob(ctx context.Context, job *JobEntity) error

func (*MemoryJobStore) UpdateStatus

func (s *MemoryJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a logger that discards all log messages Useful for tests or when logging is not desired

func NewNoOpLogger

func NewNoOpLogger() *NoOpLogger

NewNoOpLogger creates a new NoOpLogger

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(msg string, fields ...Field)

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(msg string, fields ...Field)

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(msg string, fields ...Field)

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(msg string, fields ...Field)

type RawJobHandler

type RawJobHandler func(ctx context.Context, args []byte) error

RawJobHandler is the internal handler type that works with raw bytes

type RetryPolicy

type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts (0 = no retry, 1 = one retry)
	MaxRetries int

	// InitialDelay is the delay before the first retry
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration

	// BackoffRatio is the multiplier for delay after each retry (e.g., 2.0 for exponential)
	// For example, with InitialDelay=100ms and BackoffRatio=2.0:
	// - Retry 1 delay: 100ms
	// - Retry 2 delay: 200ms
	// - Retry 3 delay: 400ms (capped by MaxDelay)
	BackoffRatio float64
}

RetryPolicy defines retry behavior for IO operations

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy

func NoRetry

func NoRetry() RetryPolicy

NoRetry returns a retry policy with no retries

type SQLiteJobStore added in v0.4.1

type SQLiteJobStore struct {
	// contains filtered or unexported fields
}

SQLiteJobStore is a SQLite-backed implementation of JobStore and DurableJobStore.

func NewSQLiteJobStore added in v0.4.1

func NewSQLiteJobStore(sqlDB *sql.DB) (*SQLiteJobStore, error)

NewSQLiteJobStore initialises the jobs table and returns a ready store. Pass an *sql.DB opened with driver "sqlite" (modernc.org/sqlite).

func (*SQLiteJobStore) CreateJob added in v0.4.1

func (s *SQLiteJobStore) CreateJob(ctx context.Context, job *JobEntity) error

CreateJob implements DurableJobStore — fails if the ID already exists.

func (*SQLiteJobStore) DeleteJob added in v0.4.1

func (s *SQLiteJobStore) DeleteJob(ctx context.Context, id string) error

DeleteJob implements JobStore.

func (*SQLiteJobStore) GetJob added in v0.4.1

func (s *SQLiteJobStore) GetJob(ctx context.Context, id string) (*JobEntity, error)

GetJob implements JobStore.

func (*SQLiteJobStore) GetRecoverableJobs added in v0.4.1

func (s *SQLiteJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)

GetRecoverableJobs implements JobStore — returns only PENDING jobs.

func (*SQLiteJobStore) ListJobs added in v0.4.1

func (s *SQLiteJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)

ListJobs implements JobStore with filter dispatching.

func (*SQLiteJobStore) SaveJob added in v0.4.1

func (s *SQLiteJobStore) SaveJob(ctx context.Context, job *JobEntity) error

SaveJob implements JobStore — upserts (insert or replace).

func (*SQLiteJobStore) UpdateStatus added in v0.4.1

func (s *SQLiteJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error

UpdateStatus implements JobStore.

type TypedHandler

type TypedHandler[T any] func(ctx context.Context, args T) error

TypedHandler is a generic handler type for type-safe job handlers

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL