Documentation
Index ¶
- Variables
- func RegisterHandler[T any](m *JobManager, ctx context.Context, jobType string, handler TypedHandler[T]) error
- type DefaultLogger
- type DurableJobStore
- type ErrorHandler
- type Field
- type JSONSerializer
- type JobEntity
- type JobFilter
- type JobManager
- func (m *JobManager) CancelJob(ctx context.Context, id string) error
- func (m *JobManager) GetActiveJobCount() int
- func (m *JobManager) GetActiveJobs() []*JobEntity
- func (m *JobManager) GetJob(ctx context.Context, id string) (*JobEntity, error)
- func (m *JobManager) GetRetryPolicy() RetryPolicy
- func (m *JobManager) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
- func (m *JobManager) SetErrorHandler(handler ErrorHandler)
- func (m *JobManager) SetLogger(logger Logger)
- func (m *JobManager) SetRetryPolicy(policy RetryPolicy)
- func (m *JobManager) SetShutdownRunners(own bool)
- func (m *JobManager) Shutdown(ctx context.Context) error
- func (m *JobManager) Start(ctx context.Context) error
- func (m *JobManager) SubmitDelayedJob(ctx context.Context, id string, jobType string, args any, delay time.Duration, ...) error
- func (m *JobManager) SubmitJob(ctx context.Context, id string, jobType string, args any, ...) error
- func (m *JobManager) UnregisterHandler(ctx context.Context, jobType string) error
- type JobSerializer
- type JobStatus
- type JobStore
- type Logger
- type MemoryJobStore
- func (s *MemoryJobStore) Clear()
- func (s *MemoryJobStore) Count() int
- func (s *MemoryJobStore) CreateJob(ctx context.Context, job *JobEntity) error
- func (s *MemoryJobStore) DeleteJob(ctx context.Context, id string) error
- func (s *MemoryJobStore) GetJob(ctx context.Context, id string) (*JobEntity, error)
- func (s *MemoryJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
- func (s *MemoryJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
- func (s *MemoryJobStore) SaveJob(ctx context.Context, job *JobEntity) error
- func (s *MemoryJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
- type NoOpLogger
- type RawJobHandler
- type RetryPolicy
- type SQLiteJobStore
- func (s *SQLiteJobStore) CreateJob(ctx context.Context, job *JobEntity) error
- func (s *SQLiteJobStore) DeleteJob(ctx context.Context, id string) error
- func (s *SQLiteJobStore) GetJob(ctx context.Context, id string) (*JobEntity, error)
- func (s *SQLiteJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
- func (s *SQLiteJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
- func (s *SQLiteJobStore) SaveJob(ctx context.Context, job *JobEntity) error
- func (s *SQLiteJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
- type TypedHandler
Constants ¶
This section is empty.
Variables ¶
var ErrJobAlreadyExists = errors.New("job already exists")
ErrJobAlreadyExists indicates the job ID already exists in persistent storage.
var ErrJobNotFound = errors.New("job not found")
ErrJobNotFound indicates the requested job does not exist.
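Callers will typically want to branch on these sentinel values. The sketch below redeclares the two errors locally so that it compiles on its own, and the `lookup` helper is hypothetical. This page does not say whether the package wraps its sentinels, so the example uses `errors.Is`, which matches in both the wrapped and unwrapped case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared here only so
// that this sketch is self-contained.
var (
	ErrJobAlreadyExists = errors.New("job already exists")
	ErrJobNotFound      = errors.New("job not found")
)

// lookup is a hypothetical helper that wraps the sentinel with extra context.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrJobNotFound)
}

// classify maps an error to a short label. errors.Is sees through wrapping,
// so it matches both a bare sentinel and a wrapped one.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrJobNotFound):
		return "not-found"
	case errors.Is(err, ErrJobAlreadyExists):
		return "duplicate"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(lookup("job-1")))
}
```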
Functions ¶
func RegisterHandler ¶
func RegisterHandler[T any](m *JobManager, ctx context.Context, jobType string, handler TypedHandler[T]) error
RegisterHandler registers a type-safe handler for a job type. The handler will be called with deserialized arguments of type T.
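One way to picture "called with deserialized arguments of type T" is an adapter that turns a typed handler into one that accepts raw bytes. This is a sketch under stated assumptions: the definitions of TypedHandler and RawJobHandler are not shown on this page, so `typedHandler`, `rawHandler`, `adapt`, and `emailArgs` are hypothetical names, and JSON is assumed as the wire format. It is not the package's actual implementation.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// typedHandler and rawHandler are hypothetical shapes chosen for illustration;
// the package's real TypedHandler and RawJobHandler may differ.
type typedHandler[T any] func(ctx context.Context, args T) error
type rawHandler func(ctx context.Context, data []byte) error

// adapt wraps a typed handler so it can be invoked with serialized bytes.
// Decoding happens once, before the typed handler runs.
func adapt[T any](h typedHandler[T]) rawHandler {
	return func(ctx context.Context, data []byte) error {
		var args T
		if err := json.Unmarshal(data, &args); err != nil {
			return fmt.Errorf("decode args: %w", err)
		}
		return h(ctx, args)
	}
}

// emailArgs is an illustrative argument type.
type emailArgs struct {
	To string `json:"to"`
}

// deliverTo runs an adapted handler against a JSON payload and reports the
// recipient the typed handler observed.
func deliverTo(payload string) (string, error) {
	var seen string
	raw := adapt[emailArgs](func(_ context.Context, a emailArgs) error {
		seen = a.To
		return nil
	})
	err := raw(context.Background(), []byte(payload))
	return seen, err
}

func main() {
	to, err := deliverTo(`{"to":"ops@example.com"}`)
	fmt.Println(to, err)
}
```

The typed handler never sees bytes, so a malformed payload surfaces as a decode error before any handler code runs.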
Types ¶
type DefaultLogger ¶
type DefaultLogger struct{}
DefaultLogger is a simple logger implementation using the standard log package
func NewDefaultLogger ¶
func NewDefaultLogger() *DefaultLogger
NewDefaultLogger creates a new DefaultLogger
func (*DefaultLogger) Debug ¶
func (l *DefaultLogger) Debug(msg string, fields ...Field)
Debug logs a debug message
func (*DefaultLogger) Error ¶
func (l *DefaultLogger) Error(msg string, fields ...Field)
Error logs an error message
func (*DefaultLogger) Info ¶
func (l *DefaultLogger) Info(msg string, fields ...Field)
Info logs an info message
func (*DefaultLogger) Warn ¶
func (l *DefaultLogger) Warn(msg string, fields ...Field)
Warn logs a warning message
type DurableJobStore ¶
DurableJobStore provides atomic create semantics for durable-ack submission. Implement this interface to guarantee CreateJob fails when a job ID already exists.
type ErrorHandler ¶
ErrorHandler is called when an IO operation fails after all retries
type JSONSerializer ¶
type JSONSerializer struct{}
JSONSerializer uses JSON encoding for serialization.
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSON serializer
func (*JSONSerializer) Deserialize ¶
func (s *JSONSerializer) Deserialize(data []byte, target any) error
func (*JSONSerializer) Name ¶
func (s *JSONSerializer) Name() string
type JobManager ¶
type JobManager struct {
// contains filtered or unexported fields
}
JobManager manages job lifecycle using a three-layer runner architecture:
- Layer 1 (controlRunner): Fast control operations (<100μs, pure memory)
- Layer 2 (ioRunner): Sequential IO operations (database, file, network)
- Layer 3 (executionRunner): User job execution (may be slow/blocking)
func NewJobManager ¶
func NewJobManager(
	controlRunner taskrunner.TaskRunner,
	ioRunner taskrunner.TaskRunner,
	executionRunner taskrunner.TaskRunner,
	store JobStore,
	serializer JobSerializer,
) *JobManager
NewJobManager creates a new JobManager with the three-layer runner architecture
func (*JobManager) CancelJob ¶
func (m *JobManager) CancelJob(ctx context.Context, id string) error
CancelJob cancels an active job
func (*JobManager) GetActiveJobCount ¶
func (m *JobManager) GetActiveJobCount() int
GetActiveJobCount returns the number of active jobs
func (*JobManager) GetActiveJobs ¶
func (m *JobManager) GetActiveJobs() []*JobEntity
GetActiveJobs returns a snapshot of active jobs
func (*JobManager) GetRetryPolicy ¶
func (m *JobManager) GetRetryPolicy() RetryPolicy
func (*JobManager) ListJobs ¶
func (m *JobManager) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
ListJobs returns jobs matching the filter. Results may lag slightly behind recent writes.
func (*JobManager) SetErrorHandler ¶
func (m *JobManager) SetErrorHandler(handler ErrorHandler)
SetErrorHandler sets a custom error handler for failed IO operations
func (*JobManager) SetLogger ¶
func (m *JobManager) SetLogger(logger Logger)
SetLogger sets the logger for JobManager
func (*JobManager) SetRetryPolicy ¶
func (m *JobManager) SetRetryPolicy(policy RetryPolicy)
SetRetryPolicy sets the retry policy for IO operations
func (*JobManager) SetShutdownRunners ¶
func (m *JobManager) SetShutdownRunners(own bool)
func (*JobManager) Shutdown ¶
func (m *JobManager) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the JobManager
func (*JobManager) Start ¶
func (m *JobManager) Start(ctx context.Context) error
Start initializes the JobManager and recovers unfinished jobs
func (*JobManager) SubmitDelayedJob ¶
func (m *JobManager) SubmitDelayedJob(
	ctx context.Context,
	id string,
	jobType string,
	args any,
	delay time.Duration,
	traits taskrunner.TaskTraits,
) error
SubmitDelayedJob submits a job with a delay before execution
func (*JobManager) SubmitJob ¶
func (m *JobManager) SubmitJob(ctx context.Context, id string, jobType string, args any, traits taskrunner.TaskTraits) error
SubmitJob submits a job for immediate execution
func (*JobManager) UnregisterHandler ¶
func (m *JobManager) UnregisterHandler(ctx context.Context, jobType string) error
UnregisterHandler removes a previously registered handler for a job type.
type JobSerializer ¶
type JobSerializer interface {
// Serialize converts a Go value to bytes
Serialize(args any) ([]byte, error)
// Deserialize converts bytes back to a Go value
Deserialize(data []byte, target any) error
// Name returns the serializer name (for debugging/logging)
Name() string
}
JobSerializer defines the interface for serializing and deserializing job arguments.
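A custom serializer only needs the three methods above. The following sketch copies the interface locally so it compiles on its own and backs it with `encoding/json`. The names `jsonSerializer`, `resizeArgs`, and `roundTrip`, and the `"json"` string returned by `Name`, are illustrative assumptions and are not taken from the package's own JSONSerializer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobSerializer is copied from the interface above so the sketch is
// self-contained.
type JobSerializer interface {
	Serialize(args any) ([]byte, error)
	Deserialize(data []byte, target any) error
	Name() string
}

// jsonSerializer is an illustrative implementation backed by encoding/json.
type jsonSerializer struct{}

func (jsonSerializer) Serialize(args any) ([]byte, error) { return json.Marshal(args) }

func (jsonSerializer) Deserialize(data []byte, target any) error {
	return json.Unmarshal(data, target)
}

// Name returns a label for logs; the value "json" is an assumption.
func (jsonSerializer) Name() string { return "json" }

// Compile-time check that jsonSerializer satisfies the interface.
var _ JobSerializer = jsonSerializer{}

// resizeArgs is a hypothetical job argument type.
type resizeArgs struct {
	Width  int `json:"width"`
	Height int `json:"height"`
}

// roundTrip serializes in and decodes the bytes back into a fresh value.
func roundTrip(s JobSerializer, in resizeArgs) (resizeArgs, error) {
	var out resizeArgs
	data, err := s.Serialize(in)
	if err != nil {
		return out, err
	}
	err = s.Deserialize(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonSerializer{}, resizeArgs{Width: 640, Height: 480})
	fmt.Println(out, err)
}
```

Deserialize must receive a pointer as `target`; passing a non-pointer value makes `json.Unmarshal` return an error.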
type JobStore ¶
type JobStore interface {
// SaveJob saves a new job or updates an existing one
SaveJob(ctx context.Context, job *JobEntity) error
// UpdateStatus updates the status and result of a job
UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
// GetJob retrieves a job by ID
GetJob(ctx context.Context, id string) (*JobEntity, error)
// ListJobs returns jobs matching the filter
ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
// GetRecoverableJobs returns jobs that should be recovered on startup
// (typically PENDING jobs)
GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
// DeleteJob removes a job from storage
DeleteJob(ctx context.Context, id string) error
}
JobStore defines the interface for job persistence. Implementations can use in-memory storage, databases, or other backends.
type Logger ¶
type Logger interface {
// Debug logs a debug message with optional fields
Debug(msg string, fields ...Field)
// Info logs an info message with optional fields
Info(msg string, fields ...Field)
// Warn logs a warning message with optional fields
Warn(msg string, fields ...Field)
// Error logs an error message with optional fields
Error(msg string, fields ...Field)
}
Logger is the interface for structured logging. Implementations can provide custom logging behavior (e.g., integration with logrus, zap, etc.).
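A custom Logger can be as small as a formatter plus a writer. In the sketch below the Logger interface is copied from above, but the `Field` type is a hypothetical key/value struct, because the package's real Field definition is not shown on this page. `writerLogger` and `formatLine` are likewise illustrative names.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// Field is a hypothetical key/value pair; the package's real Field type may
// look different.
type Field struct {
	Key   string
	Value any
}

// Logger is copied from the interface above so the sketch is self-contained.
type Logger interface {
	Debug(msg string, fields ...Field)
	Info(msg string, fields ...Field)
	Warn(msg string, fields ...Field)
	Error(msg string, fields ...Field)
}

// formatLine renders "LEVEL msg key=value ..." for a single log entry.
func formatLine(level, msg string, fields ...Field) string {
	var b strings.Builder
	b.WriteString(level)
	b.WriteByte(' ')
	b.WriteString(msg)
	for _, f := range fields {
		fmt.Fprintf(&b, " %s=%v", f.Key, f.Value)
	}
	return b.String()
}

// writerLogger writes one formatted line per message to w.
type writerLogger struct{ w io.Writer }

func (l writerLogger) Debug(msg string, fields ...Field) {
	fmt.Fprintln(l.w, formatLine("DEBUG", msg, fields...))
}

func (l writerLogger) Info(msg string, fields ...Field) {
	fmt.Fprintln(l.w, formatLine("INFO", msg, fields...))
}

func (l writerLogger) Warn(msg string, fields ...Field) {
	fmt.Fprintln(l.w, formatLine("WARN", msg, fields...))
}

func (l writerLogger) Error(msg string, fields ...Field) {
	fmt.Fprintln(l.w, formatLine("ERROR", msg, fields...))
}

// Compile-time check that writerLogger satisfies Logger.
var _ Logger = writerLogger{}

func main() {
	var log Logger = writerLogger{w: os.Stdout}
	log.Info("job finished", Field{Key: "id", Value: "job-1"})
}
```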
type MemoryJobStore ¶
type MemoryJobStore struct {
// contains filtered or unexported fields
}
MemoryJobStore is an in-memory implementation of JobStore. It uses sync.Map for concurrent-safe storage.
func NewMemoryJobStore ¶
func NewMemoryJobStore() *MemoryJobStore
NewMemoryJobStore creates a new in-memory job store
func (*MemoryJobStore) Clear ¶
func (s *MemoryJobStore) Clear()
Clear removes all jobs from the store (useful for testing)
func (*MemoryJobStore) Count ¶
func (s *MemoryJobStore) Count() int
Count returns the total number of jobs in the store
func (*MemoryJobStore) CreateJob ¶
func (s *MemoryJobStore) CreateJob(ctx context.Context, job *JobEntity) error
CreateJob inserts a new job atomically. Returns ErrJobAlreadyExists if the ID already exists.
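Atomic create on top of `sync.Map` can be sketched with `LoadOrStore`, which checks for an existing key and inserts in a single step. This is an illustration of the technique, not the package's code: `job` is a hypothetical minimal stand-in for JobEntity (whose fields are not shown here), and `store`, `create`, and `countWinners` are made-up names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-in for the package's sentinel error.
var ErrJobAlreadyExists = errors.New("job already exists")

// job is a hypothetical, minimal stand-in for JobEntity.
type job struct {
	ID string
}

// store keeps jobs in a sync.Map keyed by ID.
type store struct {
	jobs sync.Map
}

// create inserts j only if no job with the same ID is present. LoadOrStore
// performs the existence check and the insert as one atomic operation, so two
// concurrent callers cannot both succeed for the same ID.
func (s *store) create(j *job) error {
	if _, loaded := s.jobs.LoadOrStore(j.ID, j); loaded {
		return ErrJobAlreadyExists
	}
	return nil
}

// countWinners starts n goroutines that all try to create the same ID and
// returns how many of them succeeded.
func countWinners(n int) int {
	s := &store{}
	results := make(chan error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- s.create(&job{ID: "job-1"})
		}()
	}
	wg.Wait()
	close(results)

	winners := 0
	for err := range results {
		if err == nil {
			winners++
		}
	}
	return winners
}

func main() {
	fmt.Println("successful creates:", countWinners(32))
}
```

A separate `Load` followed by `Store` would leave a window in which two callers both see the ID as absent, which is why the combined call matters here.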
func (*MemoryJobStore) DeleteJob ¶
func (s *MemoryJobStore) DeleteJob(ctx context.Context, id string) error
func (*MemoryJobStore) GetRecoverableJobs ¶
func (s *MemoryJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
func (*MemoryJobStore) ListJobs ¶
func (s *MemoryJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
ListJobs returns jobs matching the filter. Results are not ordered, so pagination with Offset/Limit may return inconsistent results across calls. Use for testing only.
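If stable pages are needed on top of an unordered result, one option is to sort before applying the window. The sketch below assumes a hypothetical `job` type with only an ID, and treats a non-positive limit as "no limit", which is an assumption rather than documented JobFilter behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// job is a hypothetical stand-in for JobEntity.
type job struct {
	ID string
}

// page sorts a copy of jobs by ID and then applies offset and limit, so
// repeated calls over the same data return the same window. A limit <= 0 is
// treated as "no limit" (an assumption made for this sketch).
func page(jobs []job, offset, limit int) []job {
	sorted := append([]job(nil), jobs...) // copy so the caller's slice is untouched
	sort.Slice(sorted, func(i, k int) bool { return sorted[i].ID < sorted[k].ID })

	if offset < 0 {
		offset = 0
	}
	if offset >= len(sorted) {
		return nil
	}
	end := offset + limit
	if limit <= 0 || end > len(sorted) {
		end = len(sorted)
	}
	return sorted[offset:end]
}

func main() {
	jobs := []job{{ID: "c"}, {ID: "a"}, {ID: "b"}}
	fmt.Println(page(jobs, 1, 1))
}
```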
func (*MemoryJobStore) SaveJob ¶
func (s *MemoryJobStore) SaveJob(ctx context.Context, job *JobEntity) error
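func (*MemoryJobStore) UpdateStatus ¶
func (s *MemoryJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error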
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger is a logger that discards all log messages. It is useful for tests or when logging is not desired.
func (*NoOpLogger) Debug ¶
func (l *NoOpLogger) Debug(msg string, fields ...Field)
func (*NoOpLogger) Error ¶
func (l *NoOpLogger) Error(msg string, fields ...Field)
func (*NoOpLogger) Info ¶
func (l *NoOpLogger) Info(msg string, fields ...Field)
func (*NoOpLogger) Warn ¶
func (l *NoOpLogger) Warn(msg string, fields ...Field)
type RawJobHandler ¶
RawJobHandler is the internal handler type that works with raw bytes
type RetryPolicy ¶
type RetryPolicy struct {
// MaxRetries is the maximum number of retry attempts (0 = no retry, 1 = one retry)
MaxRetries int
// InitialDelay is the delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// BackoffRatio is the multiplier for delay after each retry (e.g., 2.0 for exponential)
// For example, with InitialDelay=100ms and BackoffRatio=2.0:
// - Retry 1 delay: 100ms
// - Retry 2 delay: 200ms
// - Retry 3 delay: 400ms (capped by MaxDelay)
BackoffRatio float64
}
RetryPolicy defines retry behavior for IO operations
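The delay progression described in the field comments can be written as InitialDelay × BackoffRatio^(retry−1), capped at MaxDelay. The sketch below copies the struct and computes that value. The `delayFor` helper is a hypothetical name, and treating a non-positive MaxDelay as "no cap" is an assumption. How the package itself computes delays is not shown on this page.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy is copied from the struct above so the sketch is self-contained.
type RetryPolicy struct {
	MaxRetries   int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	BackoffRatio float64
}

// delayFor returns the wait before the given retry (1-based):
// InitialDelay * BackoffRatio^(retry-1), capped at MaxDelay.
// A MaxDelay <= 0 is treated as "no cap" (an assumption for this sketch).
func delayFor(p RetryPolicy, retry int) time.Duration {
	if retry < 1 {
		return 0
	}
	d := float64(p.InitialDelay) * math.Pow(p.BackoffRatio, float64(retry-1))
	if p.MaxDelay > 0 && d > float64(p.MaxDelay) {
		return p.MaxDelay
	}
	return time.Duration(d)
}

func main() {
	p := RetryPolicy{
		MaxRetries:   4,
		InitialDelay: 100 * time.Millisecond,
		MaxDelay:     500 * time.Millisecond,
		BackoffRatio: 2.0,
	}
	for retry := 1; retry <= p.MaxRetries; retry++ {
		fmt.Println(retry, delayFor(p, retry))
	}
}
```

With these values the delays are 100ms, 200ms, and 400ms, and the fourth retry is held at the 500ms cap instead of growing to 800ms.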
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy
type SQLiteJobStore ¶ added in v0.4.1
type SQLiteJobStore struct {
// contains filtered or unexported fields
}
SQLiteJobStore is a SQLite-backed implementation of JobStore and DurableJobStore.
func NewSQLiteJobStore ¶ added in v0.4.1
func NewSQLiteJobStore(sqlDB *sql.DB) (*SQLiteJobStore, error)
NewSQLiteJobStore initialises the jobs table and returns a ready store. Pass an *sql.DB opened with driver "sqlite" (modernc.org/sqlite).
func (*SQLiteJobStore) CreateJob ¶ added in v0.4.1
func (s *SQLiteJobStore) CreateJob(ctx context.Context, job *JobEntity) error
CreateJob implements DurableJobStore — fails if the ID already exists.
func (*SQLiteJobStore) DeleteJob ¶ added in v0.4.1
func (s *SQLiteJobStore) DeleteJob(ctx context.Context, id string) error
DeleteJob implements JobStore.
func (*SQLiteJobStore) GetRecoverableJobs ¶ added in v0.4.1
func (s *SQLiteJobStore) GetRecoverableJobs(ctx context.Context) ([]*JobEntity, error)
GetRecoverableJobs implements JobStore — returns only PENDING jobs.
func (*SQLiteJobStore) ListJobs ¶ added in v0.4.1
func (s *SQLiteJobStore) ListJobs(ctx context.Context, filter JobFilter) ([]*JobEntity, error)
ListJobs implements JobStore with filter dispatching.
func (*SQLiteJobStore) SaveJob ¶ added in v0.4.1
func (s *SQLiteJobStore) SaveJob(ctx context.Context, job *JobEntity) error
SaveJob implements JobStore — upserts (insert or replace).
func (*SQLiteJobStore) UpdateStatus ¶ added in v0.4.1
func (s *SQLiteJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, result string) error
UpdateStatus implements JobStore.