types

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrJobNotFound            = errors.New("dureq: job not found")
	ErrScheduleNotFound       = errors.New("dureq: schedule not found")
	ErrRunNotFound            = errors.New("dureq: run not found")
	ErrDuplicateJob           = errors.New("dureq: duplicate unique key")
	ErrHandlerNotFound        = errors.New("dureq: handler not registered for task type")
	ErrNotLeader              = errors.New("dureq: not the leader node")
	ErrLockFailed             = errors.New("dureq: failed to acquire lock")
	ErrLockNotHeld            = errors.New("dureq: lock not held")
	ErrServerStopped          = errors.New("dureq: server stopped")
	ErrInvalidSchedule        = errors.New("dureq: invalid schedule")
	ErrInvalidJobStatus       = errors.New("dureq: invalid job status transition")
	ErrRedisNotConnected      = errors.New("dureq: Redis not connected")
	ErrWorkflowNotFound       = errors.New("dureq: workflow not found")
	ErrCyclicDependency       = errors.New("dureq: workflow has cyclic dependencies")
	ErrBatchNotFound          = errors.New("dureq: batch not found")
	ErrExecutionTimedOut      = errors.New("dureq: execution timed out")
	ErrScheduleToStartTimeout = errors.New("dureq: schedule-to-start timeout exceeded")
	ErrNoProgressReporter     = errors.New("dureq: no progress reporter in context (called outside handler?)")
)

Sentinel errors.

Functions

func GetAttempt

func GetAttempt(ctx context.Context) int

GetAttempt extracts the attempt number from the context.

func GetErrorClassString

func GetErrorClassString(errClass ErrorClassification) string

GetErrorClassString extracts the error string from an ErrorClassification.

func GetHeaders

func GetHeaders(ctx context.Context) map[string]string

GetHeaders extracts the headers map from the context.

func GetJobID

func GetJobID(ctx context.Context) string

GetJobID extracts the job ID from the context.

func GetMaxRetry

func GetMaxRetry(ctx context.Context) int

GetMaxRetry extracts the max retry count from the context.

func GetNodeID

func GetNodeID(ctx context.Context) string

GetNodeID extracts the node ID from the context.

func GetRetryAfter

func GetRetryAfter(err error) time.Duration

GetRetryAfter extracts the RetryAfter duration from a RateLimitedError.

func GetRunID

func GetRunID(ctx context.Context) string

GetRunID extracts the run ID from the context.

func IsPanicError

func IsPanicError(err error) bool

IsPanicError checks whether err (or any wrapped error) is a PanicError.

func ReportProgress

func ReportProgress(ctx context.Context, data any) error

ReportProgress reports user-defined progress data for the current run. The data is stored alongside the run's heartbeat and is accessible via the monitoring API and client SDK.

func WithAttempt

func WithAttempt(ctx context.Context, attempt int) context.Context

WithAttempt stores the attempt number in the context.

func WithHeaders

func WithHeaders(ctx context.Context, headers map[string]string) context.Context

WithHeaders stores the headers map in the context.

func WithJobID

func WithJobID(ctx context.Context, id string) context.Context

WithJobID stores the job ID in the context.

func WithMaxRetry

func WithMaxRetry(ctx context.Context, max int) context.Context

WithMaxRetry stores the max retry count in the context.

func WithNodeID

func WithNodeID(ctx context.Context, id string) context.Context

WithNodeID stores the node ID in the context.

func WithPriority

func WithPriority(ctx context.Context, p Priority) context.Context

WithPriority stores the priority in the context.

func WithProgressReporter

func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context

WithProgressReporter stores the progress reporter function in the context.

func WithRunID

func WithRunID(ctx context.Context, id string) context.Context

WithRunID stores the run ID in the context.

func WithTaskType

func WithTaskType(ctx context.Context, tt TaskType) context.Context

WithTaskType stores the task type in the context.

Types

type AggregateResult

type AggregateResult struct {
	Group             string          `json:"group"`
	Count             int             `json:"count"`
	AggregatedPayload json.RawMessage `json:"aggregated_payload"`
}

AggregateResult is the outcome of flushing a group.

type AtTime

type AtTime struct {
	Hour   uint `json:"hour"`
	Minute uint `json:"minute"`
	Second uint `json:"second"`
}

AtTime represents a time of day as hour, minute, second.

type BatchDefinition

type BatchDefinition struct {
	Name string `json:"name"`

	// OnetimeTaskType is the handler for shared preprocessing (runs once before items).
	OnetimeTaskType *TaskType       `json:"onetime_task_type,omitempty"`
	OnetimePayload  json.RawMessage `json:"onetime_payload,omitempty"`

	// ItemTaskType is the handler that processes each batch item.
	ItemTaskType TaskType    `json:"item_task_type"`
	Items        []BatchItem `json:"items"`

	// FailurePolicy controls partial failure behavior. Default: continue_on_error.
	FailurePolicy BatchFailurePolicy `json:"failure_policy,omitempty"`

	// ChunkSize controls how many items are dispatched concurrently. Default: 100.
	ChunkSize int `json:"chunk_size,omitempty"`

	// ItemRetryPolicy for individual item failures.
	ItemRetryPolicy *RetryPolicy `json:"item_retry_policy,omitempty"`

	// Execution timeout for the entire batch.
	ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`

	// RetryPolicy for retrying the entire batch on failure.
	RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`

	// DefaultPriority for all jobs created by this batch.
	DefaultPriority *Priority `json:"default_priority,omitempty"`
}

BatchDefinition describes a batch processing job to submit.

type BatchFailurePolicy

type BatchFailurePolicy string

BatchFailurePolicy controls behavior when a batch item fails.

const (
	// BatchFailFast stops the batch immediately on first item failure.
	BatchFailFast BatchFailurePolicy = "fail_fast"
	// BatchContinueOnError continues processing remaining items.
	BatchContinueOnError BatchFailurePolicy = "continue_on_error"
)

type BatchInstance

type BatchInstance struct {
	ID         string          `json:"id"`
	Name       string          `json:"name"`
	Status     WorkflowStatus  `json:"status"`
	Definition BatchDefinition `json:"definition"`

	// Onetime preprocessing state.
	OnetimeState *BatchOnetimeState `json:"onetime_state,omitempty"`

	// Per-item tracking.
	ItemStates map[string]BatchItemState `json:"item_states"`

	// Progress counters.
	TotalItems     int `json:"total_items"`
	CompletedItems int `json:"completed_items"`
	FailedItems    int `json:"failed_items"`
	RunningItems   int `json:"running_items"`
	PendingItems   int `json:"pending_items"`

	// Chunk tracking — index into Definition.Items for the next chunk.
	NextChunkIndex int `json:"next_chunk_index"`

	// Timeout and retry tracking.
	Deadline    *time.Time `json:"deadline,omitempty"`
	Attempt     int        `json:"attempt,omitempty"`
	MaxAttempts int        `json:"max_attempts,omitempty"`

	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}

BatchInstance is the runtime state of a batch being processed.

type BatchItem

type BatchItem struct {
	ID      string          `json:"id"`
	Payload json.RawMessage `json:"payload"`
}

BatchItem is a single item within a batch.

type BatchItemResult

type BatchItemResult struct {
	BatchID string          `json:"batch_id"`
	ItemID  string          `json:"item_id"`
	Success bool            `json:"success"`
	Output  json.RawMessage `json:"output,omitempty"`
	Error   *string         `json:"error,omitempty"`
}

BatchItemResult stores the output of a single batch item execution.

type BatchItemState

type BatchItemState struct {
	ItemID     string     `json:"item_id"`
	JobID      string     `json:"job_id,omitempty"`
	Status     JobStatus  `json:"status"`
	Error      *string    `json:"error,omitempty"`
	StartedAt  *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
}

BatchItemState tracks an individual item within a batch.

type BatchOnetimeState

type BatchOnetimeState struct {
	JobID      string          `json:"job_id,omitempty"`
	Status     JobStatus       `json:"status"`
	ResultData json.RawMessage `json:"result_data,omitempty"`
	Error      *string         `json:"error,omitempty"`
	StartedAt  *time.Time      `json:"started_at,omitempty"`
	FinishedAt *time.Time      `json:"finished_at,omitempty"`
}

BatchOnetimeState tracks the shared preprocessing step.

type BatchProgress

type BatchProgress struct {
	BatchID string `json:"batch_id"`
	Done    int    `json:"done"`
	Total   int    `json:"total"`
	Failed  int    `json:"failed"`
	Step    string `json:"step,omitempty"`
}

BatchProgress is published as an event for progress tracking.

type Duration

type Duration time.Duration

Duration is a JSON-friendly time.Duration.

func (Duration) MarshalJSON

func (d Duration) MarshalJSON() ([]byte, error)

func (Duration) Std

func (d Duration) Std() time.Duration

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) error

type EnqueueGroupOption

type EnqueueGroupOption struct {
	Group    string // the group name (required)
	TaskType TaskType
	Payload  json.RawMessage
}

EnqueueGroupOption configures a grouped task enqueue.

type ErrorClassification

type ErrorClassification int

ErrorClassification categorizes errors for retry decisions.

const (
	ErrorClassRetryable    ErrorClassification = iota // transient, should retry
	ErrorClassNonRetryable                            // permanent, do not retry
	ErrorClassRateLimited                             // retry after specific delay
)

func ClassifyError

func ClassifyError(err error) ErrorClassification

ClassifyError determines the retry classification of an error.

type EventType

type EventType string

EventType identifies the kind of event published on the DUREQ_EVENTS stream.

const (
	EventJobEnqueued   EventType = "job.enqueued"
	EventJobScheduled  EventType = "job.scheduled"
	EventJobDispatched EventType = "job.dispatched"
	EventJobStarted    EventType = "job.started"
	EventJobCompleted  EventType = "job.completed"
	EventJobFailed     EventType = "job.failed"
	EventJobRetrying   EventType = "job.retrying"
	EventJobDead       EventType = "job.dead"
	EventJobCancelled  EventType = "job.cancelled"

	EventNodeJoined    EventType = "node.joined"
	EventNodeLeft      EventType = "node.left"
	EventLeaderElected EventType = "leader.elected"
	EventLeaderLost    EventType = "leader.lost"

	EventScheduleCreated EventType = "schedule.created"
	EventScheduleRemoved EventType = "schedule.removed"

	EventWorkflowStarted        EventType = "workflow.started"
	EventWorkflowCompleted      EventType = "workflow.completed"
	EventWorkflowFailed         EventType = "workflow.failed"
	EventWorkflowCancelled      EventType = "workflow.cancelled"
	EventWorkflowTaskDispatched EventType = "workflow.task.dispatched"
	EventWorkflowTaskCompleted  EventType = "workflow.task.completed"
	EventWorkflowTaskFailed     EventType = "workflow.task.failed"

	EventBatchStarted          EventType = "batch.started"
	EventBatchCompleted        EventType = "batch.completed"
	EventBatchFailed           EventType = "batch.failed"
	EventBatchCancelled        EventType = "batch.cancelled"
	EventBatchOnetimeCompleted EventType = "batch.onetime.completed"
	EventBatchOnetimeFailed    EventType = "batch.onetime.failed"
	EventBatchItemCompleted    EventType = "batch.item.completed"
	EventBatchItemFailed       EventType = "batch.item.failed"
	EventBatchProgress         EventType = "batch.progress"

	EventWorkflowTimedOut          EventType = "workflow.timed_out"
	EventWorkflowRetrying          EventType = "workflow.retrying"
	EventBatchTimedOut             EventType = "batch.timed_out"
	EventBatchRetrying             EventType = "batch.retrying"
	EventJobScheduleToStartTimeout EventType = "job.schedule_to_start_timeout"
	EventNodeCrashDetected         EventType = "node.crash_detected"
	EventJobAutoRecovered          EventType = "job.auto_recovered"
)

type GroupAggregator

type GroupAggregator interface {
	// Aggregate combines multiple task payloads into a single payload.
	// The group name and list of payloads are provided.
	Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
}

GroupAggregator defines how a set of grouped tasks should be combined into a single aggregated task for processing.

type GroupAggregatorFunc

type GroupAggregatorFunc func(group string, payloads []json.RawMessage) (json.RawMessage, error)

GroupAggregatorFunc is a convenience adapter for GroupAggregator.

func (GroupAggregatorFunc) Aggregate

func (f GroupAggregatorFunc) Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)

type GroupConfig

type GroupConfig struct {
	// Aggregator combines grouped tasks into one before processing.
	Aggregator GroupAggregator

	// GracePeriod is how long to wait for additional tasks after the
	// first task arrives in a group. Default: 5s.
	GracePeriod time.Duration

	// MaxDelay is the maximum time a group can wait before being
	// flushed, regardless of GracePeriod. Default: 30s.
	MaxDelay time.Duration

	// MaxSize is the maximum number of tasks in a group before
	// it is flushed immediately. Default: 100.
	MaxSize int
}

GroupConfig configures task aggregation for a server.

type GroupHandler

type GroupHandler func(ctx context.Context, group string, payload json.RawMessage) error

GroupHandler processes an aggregated group of tasks.

type GroupMessage

type GroupMessage struct {
	JobID    string          `json:"job_id"`
	TaskType TaskType        `json:"task_type"`
	Payload  json.RawMessage `json:"payload"`
	AddedAt  time.Time       `json:"added_at"`
}

GroupMessage is stored in the group set pending aggregation.

type HandlerDefinition

type HandlerDefinition struct {
	TaskType          TaskType
	Handler           HandlerFunc           // original handler (no result)
	HandlerWithResult HandlerFuncWithResult // handler that returns result data (optional)
	Concurrency       int                   // max concurrent per node (0 = pool default)
	Timeout           time.Duration         // per-execution timeout (0 = no timeout)
	RetryPolicy       *RetryPolicy          // default retry (overridable per-job)
	Middlewares       []MiddlewareFunc      // per-handler middleware (applied after global)
}

HandlerDefinition describes a handler registered on the server.

type HandlerFunc

type HandlerFunc func(ctx context.Context, payload json.RawMessage) error

HandlerFunc is the function signature server handlers must implement.

func ChainMiddleware

func ChainMiddleware(handler HandlerFunc, middlewares ...MiddlewareFunc) HandlerFunc

ChainMiddleware applies middlewares to a HandlerFunc in order. The first middleware in the list is the outermost (executed first).

type HandlerFuncWithResult

type HandlerFuncWithResult func(ctx context.Context, payload json.RawMessage) (json.RawMessage, error)

HandlerFuncWithResult is an extended handler that returns output data.

func ChainMiddlewareWithResult

func ChainMiddlewareWithResult(handler HandlerFuncWithResult, middlewares ...MiddlewareWithResultFunc) HandlerFuncWithResult

ChainMiddlewareWithResult applies middlewares to a HandlerFuncWithResult.

type Job

type Job struct {
	ID          string          `json:"id"`
	TaskType    TaskType        `json:"task_type"`
	Payload     json.RawMessage `json:"payload"`
	Schedule    Schedule        `json:"schedule"`
	RetryPolicy *RetryPolicy    `json:"retry_policy,omitempty"`

	// State
	Status      JobStatus  `json:"status"`
	Attempt     int        `json:"attempt"`
	LastError   *string    `json:"last_error,omitempty"`
	LastRunAt   *time.Time `json:"last_run_at,omitempty"`
	NextRunAt   *time.Time `json:"next_run_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// Metadata
	Tags      []string          `json:"tags,omitempty"`
	Headers   map[string]string `json:"headers,omitempty"` // key-value metadata
	UniqueKey *string           `json:"unique_key,omitempty"`
	DLQAfter  *int              `json:"dlq_after,omitempty"` // move to DLQ after N failures

	// Workflow association (set when this job is part of a workflow).
	WorkflowID   *string `json:"workflow_id,omitempty"`
	WorkflowTask *string `json:"workflow_task,omitempty"`

	// Batch association (set when this job is part of a batch).
	BatchID   *string `json:"batch_id,omitempty"`
	BatchItem *string `json:"batch_item,omitempty"` // item ID within batch
	BatchRole *string `json:"batch_role,omitempty"` // "onetime" or "item"

	// Priority and queue policies.
	Priority               *Priority `json:"priority,omitempty"`
	ScheduleToStartTimeout *Duration `json:"schedule_to_start_timeout,omitempty"`

	// HeartbeatTimeout overrides the default 30s stale age for orphan detection.
	// If set, a run is considered orphaned if no heartbeat is received within
	// this duration.
	HeartbeatTimeout *Duration `json:"heartbeat_timeout,omitempty"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

Job is the persisted entity representing a registered job with its schedule.

type JobEvent

type JobEvent struct {
	Type           EventType      `json:"type"`
	JobID          string         `json:"job_id,omitempty"`
	RunID          string         `json:"run_id,omitempty"`
	NodeID         string         `json:"node_id,omitempty"`
	TaskType       TaskType       `json:"task_type,omitempty"`
	Error          *string        `json:"error,omitempty"`
	Attempt        int            `json:"attempt,omitempty"`
	Timestamp      time.Time      `json:"timestamp"`
	BatchProgress  *BatchProgress `json:"batch_progress,omitempty"`
	AffectedRunIDs []string       `json:"affected_run_ids,omitempty"`
}

JobEvent is the envelope published to the DUREQ_EVENTS stream.

type JobRun

type JobRun struct {
	ID              string          `json:"id"`
	JobID           string          `json:"job_id"`
	NodeID          string          `json:"node_id"`
	Status          RunStatus       `json:"status"`
	Attempt         int             `json:"attempt"`
	Error           *string         `json:"error,omitempty"`
	StartedAt       time.Time       `json:"started_at"`
	FinishedAt      *time.Time      `json:"finished_at,omitempty"`
	Duration        time.Duration   `json:"duration,omitempty"`
	LastHeartbeatAt *time.Time      `json:"last_heartbeat_at,omitempty"`
	Progress        json.RawMessage `json:"progress,omitempty"` // user-reported progress data
}

JobRun represents an individual execution of a job.

type JobStatus

type JobStatus string

JobStatus represents the lifecycle state of a job definition.

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusScheduled JobStatus = "scheduled"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
	JobStatusRetrying  JobStatus = "retrying"
	JobStatusDead      JobStatus = "dead"
	JobStatusCancelled JobStatus = "cancelled"
)

func (JobStatus) IsTerminal

func (s JobStatus) IsTerminal() bool

func (JobStatus) String

func (s JobStatus) String() string

type MiddlewareFunc

type MiddlewareFunc func(HandlerFunc) HandlerFunc

MiddlewareFunc wraps a HandlerFunc, allowing pre/post processing.

type MiddlewareWithResultFunc

type MiddlewareWithResultFunc func(HandlerFuncWithResult) HandlerFuncWithResult

MiddlewareWithResultFunc wraps a HandlerFuncWithResult.

func AdaptMiddleware

func AdaptMiddleware(mw MiddlewareFunc) MiddlewareWithResultFunc

AdaptMiddleware converts a MiddlewareFunc for use with HandlerFuncWithResult. The adapted middleware wraps the result handler by converting it to/from a plain handler.

type NodeInfo

type NodeInfo struct {
	NodeID        string     `json:"node_id"`
	Address       string     `json:"address,omitempty"`
	TaskTypes     []string   `json:"task_types"`
	StartedAt     time.Time  `json:"started_at"`
	LastHeartbeat time.Time  `json:"last_heartbeat"`
	PoolStats     *PoolStats `json:"pool_stats,omitempty"`
	ActiveRunIDs  []string   `json:"active_run_ids,omitempty"`
}

NodeInfo is registered by each node in the dureq_nodes KV bucket.

type NonRetryableError

type NonRetryableError struct{ Err error }

NonRetryableError wraps an error as non-retryable.

func (*NonRetryableError) Error

func (e *NonRetryableError) Error() string

func (*NonRetryableError) Unwrap

func (e *NonRetryableError) Unwrap() error

type OverlapPolicy

type OverlapPolicy string

OverlapPolicy controls behavior when a scheduled job fires while a previous execution is still running.

const (
	// OverlapAllowAll dispatches regardless of active runs (default).
	OverlapAllowAll OverlapPolicy = "ALLOW_ALL"
	// OverlapSkip skips the dispatch if any run for this job is active.
	OverlapSkip OverlapPolicy = "SKIP"
	// OverlapBufferOne buffers at most one pending dispatch while a run is active.
	OverlapBufferOne OverlapPolicy = "BUFFER_ONE"
	// OverlapBufferAll buffers all pending dispatches while a run is active.
	OverlapBufferAll OverlapPolicy = "BUFFER_ALL"
)

type PanicError

type PanicError struct {
	Value      interface{}
	Stacktrace string
}

PanicError wraps a recovered panic value as an error.

func GetPanicError

func GetPanicError(err error) *PanicError

GetPanicError extracts the PanicError from an error chain, if present.

func (*PanicError) Error

func (e *PanicError) Error() string

type PeriodicTaskConfig

type PeriodicTaskConfig struct {
	// CronExpr is the cron expression (e.g., "*/5 * * * *").
	CronExpr string `json:"cron_expr"`

	// TaskType is the handler to invoke.
	TaskType TaskType `json:"task_type"`

	// Payload is the task payload.
	Payload json.RawMessage `json:"payload,omitempty"`

	// UniqueKey prevents duplicate schedules for the same logical task.
	UniqueKey string `json:"unique_key,omitempty"`
}

PeriodicTaskConfig describes a single periodic task to be managed.

type PeriodicTaskConfigProvider

type PeriodicTaskConfigProvider interface {
	GetConfigs() ([]*PeriodicTaskConfig, error)
}

PeriodicTaskConfigProvider is an interface that returns the current set of periodic tasks. The PeriodicTaskManager calls GetConfigs periodically and syncs the returned tasks with the scheduler — adding new ones, removing stale ones, and updating changed ones.

type PeriodicTaskConfigProviderFunc

type PeriodicTaskConfigProviderFunc func() ([]*PeriodicTaskConfig, error)

PeriodicTaskConfigProviderFunc is a convenience adapter.

func (PeriodicTaskConfigProviderFunc) GetConfigs

func (f PeriodicTaskConfigProviderFunc) GetConfigs() ([]*PeriodicTaskConfig, error)

type PoolStats

type PoolStats struct {
	RunningWorkers int    `json:"running_workers"`
	IdleWorkers    int    `json:"idle_workers"`
	MaxConcurrency int    `json:"max_concurrency"`
	TotalSubmitted uint64 `json:"total_submitted"`
	TotalCompleted uint64 `json:"total_completed"`
	TotalFailed    uint64 `json:"total_failed"`
	QueueLength    int    `json:"queue_length"`
}

PoolStats captures worker pool metrics for a node.

type Priority

type Priority int

Priority represents the execution priority of a job (1-10).

const (
	PriorityLow      Priority = 1
	PriorityNormal   Priority = 5
	PriorityHigh     Priority = 8
	PriorityCritical Priority = 10
)

func GetPriority

func GetPriority(ctx context.Context) Priority

GetPriority extracts the priority from the context.

type PriorityTier

type PriorityTier string

PriorityTier maps a numeric priority to a Redis Stream tier.

const (
	TierHigh   PriorityTier = "high"
	TierNormal PriorityTier = "normal"
	TierLow    PriorityTier = "low"
)

func TierForPriority

func TierForPriority(p Priority) PriorityTier

TierForPriority returns the stream tier for a given priority value.

type ProgressReporterFunc

type ProgressReporterFunc func(ctx context.Context, data json.RawMessage) error

ProgressReporterFunc is the callback stored in context for reporting progress.

type RateLimitedError

type RateLimitedError struct {
	Err        error
	RetryAfter time.Duration
}

RateLimitedError wraps an error with a retry-after duration.

func (*RateLimitedError) Error

func (e *RateLimitedError) Error() string

func (*RateLimitedError) Unwrap

func (e *RateLimitedError) Unwrap() error

type RedisOptions

type RedisOptions struct {
	// URL is the Redis server URL (e.g., "redis://localhost:6379", "rediss://host:6380/0").
	URL string

	// Password for Redis authentication. Overridden if set in URL.
	Password string

	// DB is the Redis database number. Default: 0.
	DB int

	// PoolSize is the maximum number of connections in the pool. Default: 10.
	PoolSize int

	// TLSConfig enables TLS for the connection. Set to non-nil to enable.
	TLSConfig *tls.Config

	// SentinelConfig enables Redis Sentinel failover mode.
	// When set, URL is ignored and Sentinel is used instead.
	SentinelConfig *RedisSentinelConfig

	// ClusterAddrs enables Redis Cluster mode.
	// When set, URL is ignored and a cluster client is created.
	ClusterAddrs []string
}

RedisOptions holds connection parameters for the Redis client.

type RedisSentinelConfig

type RedisSentinelConfig struct {
	// MasterName is the name of the master as configured in Sentinel.
	MasterName string

	// SentinelAddrs is the list of Sentinel node addresses (host:port).
	SentinelAddrs []string

	// SentinelPassword is the password for Sentinel nodes (if different from master password).
	SentinelPassword string
}

RedisSentinelConfig holds configuration for Redis Sentinel failover.

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts  int           `json:"max_attempts"`
	InitialDelay time.Duration `json:"initial_delay"`
	MaxDelay     time.Duration `json:"max_delay"`
	Multiplier   float64       `json:"multiplier"`
	Jitter       float64       `json:"jitter"` // 0.0 - 1.0
}

RetryPolicy describes retry behavior.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy.

type RetryableError

type RetryableError struct{ Err error }

RetryableError wraps an error as retryable.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error

type RunStatus

type RunStatus string

RunStatus represents the state of an individual job execution.

const (
	RunStatusClaimed   RunStatus = "claimed"
	RunStatusRunning   RunStatus = "running"
	RunStatusSucceeded RunStatus = "succeeded"
	RunStatusFailed    RunStatus = "failed"
	RunStatusTimedOut  RunStatus = "timed_out"
	RunStatusCancelled RunStatus = "cancelled"
)

func (RunStatus) IsTerminal

func (s RunStatus) IsTerminal() bool

func (RunStatus) String

func (s RunStatus) String() string

type Schedule

type Schedule struct {
	Type ScheduleType `json:"type"`

	// ONE_TIME: run once at this time
	RunAt *time.Time `json:"run_at,omitempty"`

	// DURATION: repeat at fixed interval
	Interval *Duration `json:"interval,omitempty"`

	// CRON: cron expression
	CronExpr *string `json:"cron_expr,omitempty"`

	// DAILY/WEEKLY/MONTHLY
	RegularInterval *uint    `json:"regular_interval,omitempty"` // every N days/weeks/months
	AtTimes         []AtTime `json:"at_times,omitempty"`         // times of day
	IncludedDays    []int    `json:"included_days,omitempty"`    // weekdays (0-6) or month days (1-31)

	// Common boundaries
	StartsAt *time.Time `json:"starts_at,omitempty"`
	EndsAt   *time.Time `json:"ends_at,omitempty"`
	Timezone string     `json:"timezone,omitempty"`

	// OverlapPolicy controls what happens when a new occurrence fires
	// while a previous run is still active. Only for recurring schedules.
	OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`

	// CatchupWindow defines the maximum age of a missed firing that should
	// be backfilled. Zero means no backfill (only the latest due firing).
	CatchupWindow *Duration `json:"catchup_window,omitempty"`

	// MaxBackfillPerTick limits how many missed firings are enqueued in a
	// single scheduler tick to prevent flooding. Default: 10 if CatchupWindow is set.
	MaxBackfillPerTick *int `json:"max_backfill_per_tick,omitempty"`
}

Schedule defines "when" a job should run.

func (*Schedule) Validate

func (s *Schedule) Validate() error

Validate checks the schedule for consistency.

type ScheduleEntry

type ScheduleEntry struct {
	JobID           string     `json:"job_id"`
	NextRunAt       time.Time  `json:"next_run_at"`
	Schedule        Schedule   `json:"schedule"`
	LastProcessedAt *time.Time `json:"last_processed_at,omitempty"` // last successful processing time
}

ScheduleEntry is the KV entry the leader's scheduler scans for due jobs.

type ScheduleType

type ScheduleType string

ScheduleType defines how a job is scheduled.

const (
	ScheduleImmediate ScheduleType = "IMMEDIATE"
	ScheduleOneTime   ScheduleType = "ONE_TIME"
	ScheduleDuration  ScheduleType = "DURATION"
	ScheduleCron      ScheduleType = "CRON"
	ScheduleDaily     ScheduleType = "DAILY"
	ScheduleWeekly    ScheduleType = "WEEKLY"
	ScheduleMonthly   ScheduleType = "MONTHLY"
)

type TaskType

type TaskType string

TaskType identifies a registered handler. This is the "how" — what function to call when a job of this type fires.

func GetTaskType

func GetTaskType(ctx context.Context) TaskType

GetTaskType extracts the task type from the context.

func (TaskType) String

func (t TaskType) String() string

type WorkMessage

type WorkMessage struct {
	RunID        string            `json:"run_id"`
	JobID        string            `json:"job_id"`
	TaskType     TaskType          `json:"task_type"`
	Payload      json.RawMessage   `json:"payload"`
	Attempt      int               `json:"attempt"`
	Deadline     time.Time         `json:"deadline"`
	Metadata     map[string]string `json:"metadata,omitempty"`
	Headers      map[string]string `json:"headers,omitempty"`
	Priority     Priority          `json:"priority,omitempty"`
	DispatchedAt time.Time         `json:"dispatched_at,omitempty"`
}

WorkMessage is the message published to DUREQ_WORK stream.

type WorkResult

type WorkResult struct {
	RunID   string          `json:"run_id"`
	JobID   string          `json:"job_id"`
	Success bool            `json:"success"`
	Error   *string         `json:"error,omitempty"`
	Output  json.RawMessage `json:"output,omitempty"` // handler output data
}

WorkResult is the outcome of executing a work message.

type WorkflowDefinition

type WorkflowDefinition struct {
	Name             string         `json:"name"`
	Tasks            []WorkflowTask `json:"tasks"`
	ExecutionTimeout *Duration      `json:"execution_timeout,omitempty"`
	RetryPolicy      *RetryPolicy   `json:"retry_policy,omitempty"`
	DefaultPriority  *Priority      `json:"default_priority,omitempty"`
}

WorkflowDefinition describes a DAG of tasks to execute.

type WorkflowInstance

type WorkflowInstance struct {
	ID           string                       `json:"id"`
	WorkflowName string                       `json:"workflow_name"`
	Status       WorkflowStatus               `json:"status"`
	Tasks        map[string]WorkflowTaskState `json:"tasks"`
	Definition   WorkflowDefinition           `json:"definition"`
	Input        json.RawMessage              `json:"input,omitempty"`
	Deadline     *time.Time                   `json:"deadline,omitempty"`
	Attempt      int                          `json:"attempt,omitempty"`
	MaxAttempts  int                          `json:"max_attempts,omitempty"`
	CreatedAt    time.Time                    `json:"created_at"`
	UpdatedAt    time.Time                    `json:"updated_at"`
	CompletedAt  *time.Time                   `json:"completed_at,omitempty"`
}

WorkflowInstance is a running instance of a workflow definition.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the lifecycle state of a workflow instance.

const (
	WorkflowStatusPending   WorkflowStatus = "pending"
	WorkflowStatusRunning   WorkflowStatus = "running"
	WorkflowStatusCompleted WorkflowStatus = "completed"
	WorkflowStatusFailed    WorkflowStatus = "failed"
	WorkflowStatusCancelled WorkflowStatus = "cancelled"
)

func (WorkflowStatus) IsTerminal

func (s WorkflowStatus) IsTerminal() bool

func (WorkflowStatus) String

func (s WorkflowStatus) String() string

type WorkflowTask

type WorkflowTask struct {
	Name      string          `json:"name"`
	TaskType  TaskType        `json:"task_type"`
	Payload   json.RawMessage `json:"payload,omitempty"`
	DependsOn []string        `json:"depends_on,omitempty"`
}

WorkflowTask is one step in a workflow DAG.

type WorkflowTaskState

type WorkflowTaskState struct {
	Name       string     `json:"name"`
	JobID      string     `json:"job_id,omitempty"`
	Status     JobStatus  `json:"status"`
	Error      *string    `json:"error,omitempty"`
	StartedAt  *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
}

WorkflowTaskState tracks the state of one task within a workflow instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL