Documentation
¶
Index ¶
- Variables
- func GetAttempt(ctx context.Context) int
- func GetErrorClassString(errClass ErrorClassification) string
- func GetHeaders(ctx context.Context) map[string]string
- func GetJobID(ctx context.Context) string
- func GetMaxRetry(ctx context.Context) int
- func GetNodeID(ctx context.Context) string
- func GetRetryAfter(err error) time.Duration
- func GetRunID(ctx context.Context) string
- func IsPanicError(err error) bool
- func ReportProgress(ctx context.Context, data any) error
- func WithAttempt(ctx context.Context, attempt int) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithJobID(ctx context.Context, id string) context.Context
- func WithMaxRetry(ctx context.Context, max int) context.Context
- func WithNodeID(ctx context.Context, id string) context.Context
- func WithPriority(ctx context.Context, p Priority) context.Context
- func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context
- func WithRunID(ctx context.Context, id string) context.Context
- func WithTaskType(ctx context.Context, tt TaskType) context.Context
- type AggregateResult
- type AtTime
- type BatchDefinition
- type BatchFailurePolicy
- type BatchInstance
- type BatchItem
- type BatchItemResult
- type BatchItemState
- type BatchOnetimeState
- type BatchProgress
- type Duration
- type EnqueueGroupOption
- type ErrorClassification
- type EventType
- type GroupAggregator
- type GroupAggregatorFunc
- type GroupConfig
- type GroupHandler
- type GroupMessage
- type HandlerDefinition
- type HandlerFunc
- type HandlerFuncWithResult
- type Job
- type JobEvent
- type JobRun
- type JobStatus
- type MiddlewareFunc
- type MiddlewareWithResultFunc
- type NodeInfo
- type NonRetryableError
- type OverlapPolicy
- type PanicError
- type PeriodicTaskConfig
- type PeriodicTaskConfigProvider
- type PeriodicTaskConfigProviderFunc
- type PoolStats
- type Priority
- type PriorityTier
- type ProgressReporterFunc
- type RateLimitedError
- type RedisOptions
- type RedisSentinelConfig
- type RetryPolicy
- type RetryableError
- type RunStatus
- type Schedule
- type ScheduleEntry
- type ScheduleType
- type TaskType
- type WorkMessage
- type WorkResult
- type WorkflowDefinition
- type WorkflowInstance
- type WorkflowStatus
- type WorkflowTask
- type WorkflowTaskState
Constants ¶
This section is empty.
Variables ¶
var ( ErrJobNotFound = errors.New("dureq: job not found") ErrScheduleNotFound = errors.New("dureq: schedule not found") ErrRunNotFound = errors.New("dureq: run not found") ErrDuplicateJob = errors.New("dureq: duplicate unique key") ErrHandlerNotFound = errors.New("dureq: handler not registered for task type") ErrNotLeader = errors.New("dureq: not the leader node") ErrLockFailed = errors.New("dureq: failed to acquire lock") ErrLockNotHeld = errors.New("dureq: lock not held") ErrServerStopped = errors.New("dureq: server stopped") ErrInvalidSchedule = errors.New("dureq: invalid schedule") ErrInvalidJobStatus = errors.New("dureq: invalid job status transition") ErrRedisNotConnected = errors.New("dureq: Redis not connected") ErrWorkflowNotFound = errors.New("dureq: workflow not found") ErrCyclicDependency = errors.New("dureq: workflow has cyclic dependencies") ErrBatchNotFound = errors.New("dureq: batch not found") ErrExecutionTimedOut = errors.New("dureq: execution timed out") ErrScheduleToStartTimeout = errors.New("dureq: schedule-to-start timeout exceeded") ErrNoProgressReporter = errors.New("dureq: no progress reporter in context (called outside handler?)") )
Sentinel errors.
Functions ¶
func GetAttempt ¶
GetAttempt extracts the attempt number from the context.
func GetErrorClassString ¶
func GetErrorClassString(errClass ErrorClassification) string
GetErrorClassString extracts the error string from an ErrorClassification.
func GetHeaders ¶
GetHeaders extracts the headers map from the context.
func GetMaxRetry ¶
GetMaxRetry extracts the max retry count from the context.
func GetRetryAfter ¶
GetRetryAfter extracts the RetryAfter duration from a RateLimitedError.
func IsPanicError ¶
IsPanicError checks whether err (or any wrapped error) is a PanicError.
func ReportProgress ¶
ReportProgress reports user-defined progress data for the current run. The data is stored alongside the run's heartbeat and is accessible via the monitoring API and client SDK.
func WithAttempt ¶
WithAttempt stores the attempt number in the context.
func WithHeaders ¶
WithHeaders stores the headers map in the context.
func WithMaxRetry ¶
WithMaxRetry stores the max retry count in the context.
func WithNodeID ¶
WithNodeID stores the node ID in the context.
func WithPriority ¶
WithPriority stores the priority in the context.
func WithProgressReporter ¶
func WithProgressReporter(ctx context.Context, fn ProgressReporterFunc) context.Context
WithProgressReporter stores the progress reporter function in the context.
Types ¶
type AggregateResult ¶
type AggregateResult struct {
Group string `json:"group"`
Count int `json:"count"`
AggregatedPayload json.RawMessage `json:"aggregated_payload"`
}
AggregateResult is the outcome of flushing a group.
type AtTime ¶
type AtTime struct {
Hour uint `json:"hour"`
Minute uint `json:"minute"`
Second uint `json:"second"`
}
AtTime represents a time of day as hour, minute, second.
type BatchDefinition ¶
type BatchDefinition struct {
Name string `json:"name"`
// OnetimeTaskType is the handler for shared preprocessing (runs once before items).
OnetimeTaskType *TaskType `json:"onetime_task_type,omitempty"`
OnetimePayload json.RawMessage `json:"onetime_payload,omitempty"`
// ItemTaskType is the handler that processes each batch item.
ItemTaskType TaskType `json:"item_task_type"`
Items []BatchItem `json:"items"`
// FailurePolicy controls partial failure behavior. Default: continue_on_error.
FailurePolicy BatchFailurePolicy `json:"failure_policy,omitempty"`
// ChunkSize controls how many items are dispatched concurrently. Default: 100.
ChunkSize int `json:"chunk_size,omitempty"`
// ItemRetryPolicy for individual item failures.
ItemRetryPolicy *RetryPolicy `json:"item_retry_policy,omitempty"`
// Execution timeout for the entire batch.
ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`
// RetryPolicy for retrying the entire batch on failure.
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
// DefaultPriority for all jobs created by this batch.
DefaultPriority *Priority `json:"default_priority,omitempty"`
}
BatchDefinition describes a batch processing job to submit.
type BatchFailurePolicy ¶
type BatchFailurePolicy string
BatchFailurePolicy controls behavior when a batch item fails.
const ( // BatchFailFast stops the batch immediately on first item failure. BatchFailFast BatchFailurePolicy = "fail_fast" // BatchContinueOnError continues processing remaining items. BatchContinueOnError BatchFailurePolicy = "continue_on_error" )
type BatchInstance ¶
type BatchInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Status WorkflowStatus `json:"status"`
Definition BatchDefinition `json:"definition"`
// Onetime preprocessing state.
OnetimeState *BatchOnetimeState `json:"onetime_state,omitempty"`
// Per-item tracking.
ItemStates map[string]BatchItemState `json:"item_states"`
// Progress counters.
TotalItems int `json:"total_items"`
CompletedItems int `json:"completed_items"`
FailedItems int `json:"failed_items"`
RunningItems int `json:"running_items"`
PendingItems int `json:"pending_items"`
// Chunk tracking — index into Definition.Items for the next chunk.
NextChunkIndex int `json:"next_chunk_index"`
// Timeout and retry tracking.
Deadline *time.Time `json:"deadline,omitempty"`
Attempt int `json:"attempt,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
BatchInstance is the runtime state of a batch being processed.
type BatchItem ¶
type BatchItem struct {
ID string `json:"id"`
Payload json.RawMessage `json:"payload"`
}
BatchItem is a single item within a batch.
type BatchItemResult ¶
type BatchItemResult struct {
BatchID string `json:"batch_id"`
ItemID string `json:"item_id"`
Success bool `json:"success"`
Output json.RawMessage `json:"output,omitempty"`
Error *string `json:"error,omitempty"`
}
BatchItemResult stores the output of a single batch item execution.
type BatchItemState ¶
type BatchItemState struct {
ItemID string `json:"item_id"`
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
BatchItemState tracks an individual item within a batch.
type BatchOnetimeState ¶
type BatchOnetimeState struct {
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
ResultData json.RawMessage `json:"result_data,omitempty"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
BatchOnetimeState tracks the shared preprocessing step.
type BatchProgress ¶
type BatchProgress struct {
BatchID string `json:"batch_id"`
Done int `json:"done"`
Total int `json:"total"`
Failed int `json:"failed"`
Step string `json:"step,omitempty"`
}
BatchProgress is published as an event for progress tracking.
type Duration ¶
Duration is a JSON-friendly time.Duration.
func (Duration) MarshalJSON ¶
func (*Duration) UnmarshalJSON ¶
type EnqueueGroupOption ¶
type EnqueueGroupOption struct {
Group string // the group name (required)
TaskType TaskType
Payload json.RawMessage
}
EnqueueGroupOption configures a grouped task enqueue.
type ErrorClassification ¶
type ErrorClassification int
ErrorClassification categorizes errors for retry decisions.
const ( ErrorClassRetryable ErrorClassification = iota // transient, should retry ErrorClassNonRetryable // permanent, do not retry ErrorClassRateLimited // retry after specific delay )
func ClassifyError ¶
func ClassifyError(err error) ErrorClassification
ClassifyError determines the retry classification of an error.
type EventType ¶
type EventType string
EventType identifies the kind of event published on the DUREQ_EVENTS stream.
const ( EventJobEnqueued EventType = "job.enqueued" EventJobScheduled EventType = "job.scheduled" EventJobDispatched EventType = "job.dispatched" EventJobStarted EventType = "job.started" EventJobCompleted EventType = "job.completed" EventJobFailed EventType = "job.failed" EventJobRetrying EventType = "job.retrying" EventJobDead EventType = "job.dead" EventJobCancelled EventType = "job.cancelled" EventNodeJoined EventType = "node.joined" EventNodeLeft EventType = "node.left" EventLeaderElected EventType = "leader.elected" EventLeaderLost EventType = "leader.lost" EventScheduleCreated EventType = "schedule.created" EventScheduleRemoved EventType = "schedule.removed" EventWorkflowStarted EventType = "workflow.started" EventWorkflowCompleted EventType = "workflow.completed" EventWorkflowFailed EventType = "workflow.failed" EventWorkflowCancelled EventType = "workflow.cancelled" EventWorkflowTaskDispatched EventType = "workflow.task.dispatched" EventWorkflowTaskCompleted EventType = "workflow.task.completed" EventWorkflowTaskFailed EventType = "workflow.task.failed" EventBatchStarted EventType = "batch.started" EventBatchCompleted EventType = "batch.completed" EventBatchFailed EventType = "batch.failed" EventBatchCancelled EventType = "batch.cancelled" EventBatchOnetimeCompleted EventType = "batch.onetime.completed" EventBatchOnetimeFailed EventType = "batch.onetime.failed" EventBatchItemCompleted EventType = "batch.item.completed" EventBatchItemFailed EventType = "batch.item.failed" EventBatchProgress EventType = "batch.progress" EventWorkflowTimedOut EventType = "workflow.timed_out" EventWorkflowRetrying EventType = "workflow.retrying" EventBatchTimedOut EventType = "batch.timed_out" EventBatchRetrying EventType = "batch.retrying" EventJobScheduleToStartTimeout EventType = "job.schedule_to_start_timeout" EventNodeCrashDetected EventType = "node.crash_detected" EventJobAutoRecovered EventType = "job.auto_recovered" )
type GroupAggregator ¶
type GroupAggregator interface {
// Aggregate combines multiple task payloads into a single payload.
// The group name and list of payloads are provided.
Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
}
GroupAggregator defines how a set of grouped tasks should be combined into a single aggregated task for processing.
type GroupAggregatorFunc ¶
type GroupAggregatorFunc func(group string, payloads []json.RawMessage) (json.RawMessage, error)
GroupAggregatorFunc is a convenience adapter for GroupAggregator.
func (GroupAggregatorFunc) Aggregate ¶
func (f GroupAggregatorFunc) Aggregate(group string, payloads []json.RawMessage) (json.RawMessage, error)
type GroupConfig ¶
type GroupConfig struct {
// Aggregator combines grouped tasks into one before processing.
Aggregator GroupAggregator
// GracePeriod is how long to wait for additional tasks after the
// first task arrives in a group. Default: 5s.
GracePeriod time.Duration
// MaxDelay is the maximum time a group can wait before being
// flushed, regardless of GracePeriod. Default: 30s.
MaxDelay time.Duration
// MaxSize is the maximum number of tasks in a group before
// it is flushed immediately. Default: 100.
MaxSize int
}
GroupConfig configures task aggregation for a server.
type GroupHandler ¶
GroupHandler processes an aggregated group of tasks.
type GroupMessage ¶
type GroupMessage struct {
JobID string `json:"job_id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
AddedAt time.Time `json:"added_at"`
}
GroupMessage is stored in the group set pending aggregation.
type HandlerDefinition ¶
type HandlerDefinition struct {
TaskType TaskType
Handler HandlerFunc // original handler (no result)
HandlerWithResult HandlerFuncWithResult // handler that returns result data (optional)
Concurrency int // max concurrent per node (0 = pool default)
Timeout time.Duration // per-execution timeout (0 = no timeout)
RetryPolicy *RetryPolicy // default retry (overridable per-job)
Middlewares []MiddlewareFunc // per-handler middleware (applied after global)
}
HandlerDefinition describes a handler registered on the server.
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, payload json.RawMessage) error
HandlerFunc is the function signature server handlers must implement.
func ChainMiddleware ¶
func ChainMiddleware(handler HandlerFunc, middlewares ...MiddlewareFunc) HandlerFunc
ChainMiddleware applies middlewares to a HandlerFunc in order. The first middleware in the list is the outermost (executed first).
type HandlerFuncWithResult ¶
type HandlerFuncWithResult func(ctx context.Context, payload json.RawMessage) (json.RawMessage, error)
HandlerFuncWithResult is an extended handler that returns output data.
func ChainMiddlewareWithResult ¶
func ChainMiddlewareWithResult(handler HandlerFuncWithResult, middlewares ...MiddlewareWithResultFunc) HandlerFuncWithResult
ChainMiddlewareWithResult applies middlewares to a HandlerFuncWithResult.
type Job ¶
type Job struct {
ID string `json:"id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Schedule Schedule `json:"schedule"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
// State
Status JobStatus `json:"status"`
Attempt int `json:"attempt"`
LastError *string `json:"last_error,omitempty"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// Metadata
Tags []string `json:"tags,omitempty"`
Headers map[string]string `json:"headers,omitempty"` // key-value metadata
UniqueKey *string `json:"unique_key,omitempty"`
DLQAfter *int `json:"dlq_after,omitempty"` // move to DLQ after N failures
// Workflow association (set when this job is part of a workflow).
WorkflowID *string `json:"workflow_id,omitempty"`
WorkflowTask *string `json:"workflow_task,omitempty"`
// Batch association (set when this job is part of a batch).
BatchID *string `json:"batch_id,omitempty"`
BatchItem *string `json:"batch_item,omitempty"` // item ID within batch
BatchRole *string `json:"batch_role,omitempty"` // "onetime" or "item"
// Priority and queue policies.
Priority *Priority `json:"priority,omitempty"`
ScheduleToStartTimeout *Duration `json:"schedule_to_start_timeout,omitempty"`
// HeartbeatTimeout overrides the default 30s stale age for orphan detection.
// If set, a run is considered orphaned if no heartbeat is received within
// this duration.
HeartbeatTimeout *Duration `json:"heartbeat_timeout,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Job is the persisted entity representing a registered job with its schedule.
type JobEvent ¶
type JobEvent struct {
Type EventType `json:"type"`
JobID string `json:"job_id,omitempty"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id,omitempty"`
TaskType TaskType `json:"task_type,omitempty"`
Error *string `json:"error,omitempty"`
Attempt int `json:"attempt,omitempty"`
Timestamp time.Time `json:"timestamp"`
BatchProgress *BatchProgress `json:"batch_progress,omitempty"`
AffectedRunIDs []string `json:"affected_run_ids,omitempty"`
}
JobEvent is the envelope published to the DUREQ_EVENTS stream.
type JobRun ¶
type JobRun struct {
ID string `json:"id"`
JobID string `json:"job_id"`
NodeID string `json:"node_id"`
Status RunStatus `json:"status"`
Attempt int `json:"attempt"`
Error *string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
LastHeartbeatAt *time.Time `json:"last_heartbeat_at,omitempty"`
Progress json.RawMessage `json:"progress,omitempty"` // user-reported progress data
}
JobRun represents an individual execution of a job.
type JobStatus ¶
type JobStatus string
JobStatus represents the lifecycle state of a job definition.
const ( JobStatusPending JobStatus = "pending" JobStatusScheduled JobStatus = "scheduled" JobStatusRunning JobStatus = "running" JobStatusCompleted JobStatus = "completed" JobStatusFailed JobStatus = "failed" JobStatusRetrying JobStatus = "retrying" JobStatusDead JobStatus = "dead" JobStatusCancelled JobStatus = "cancelled" )
func (JobStatus) IsTerminal ¶
type MiddlewareFunc ¶
type MiddlewareFunc func(HandlerFunc) HandlerFunc
MiddlewareFunc wraps a HandlerFunc, allowing pre/post processing.
type MiddlewareWithResultFunc ¶
type MiddlewareWithResultFunc func(HandlerFuncWithResult) HandlerFuncWithResult
MiddlewareWithResultFunc wraps a HandlerFuncWithResult.
func AdaptMiddleware ¶
func AdaptMiddleware(mw MiddlewareFunc) MiddlewareWithResultFunc
AdaptMiddleware converts a MiddlewareFunc for use with HandlerFuncWithResult. The adapted middleware wraps the result handler by converting it to/from a plain handler.
type NodeInfo ¶
type NodeInfo struct {
NodeID string `json:"node_id"`
Address string `json:"address,omitempty"`
TaskTypes []string `json:"task_types"`
StartedAt time.Time `json:"started_at"`
LastHeartbeat time.Time `json:"last_heartbeat"`
PoolStats *PoolStats `json:"pool_stats,omitempty"`
ActiveRunIDs []string `json:"active_run_ids,omitempty"`
}
NodeInfo is registered by each node in the dureq_nodes KV bucket.
type NonRetryableError ¶
type NonRetryableError struct{ Err error }
NonRetryableError wraps an error as non-retryable.
func (*NonRetryableError) Error ¶
func (e *NonRetryableError) Error() string
func (*NonRetryableError) Unwrap ¶
func (e *NonRetryableError) Unwrap() error
type OverlapPolicy ¶
type OverlapPolicy string
OverlapPolicy controls behavior when a scheduled job fires while a previous execution is still running.
const ( // OverlapAllowAll dispatches regardless of active runs (default). OverlapAllowAll OverlapPolicy = "ALLOW_ALL" // OverlapSkip skips the dispatch if any run for this job is active. OverlapSkip OverlapPolicy = "SKIP" // OverlapBufferOne buffers at most one pending dispatch while a run is active. OverlapBufferOne OverlapPolicy = "BUFFER_ONE" // OverlapBufferAll buffers all pending dispatches while a run is active. OverlapBufferAll OverlapPolicy = "BUFFER_ALL" )
type PanicError ¶
type PanicError struct {
Value interface{}
Stacktrace string
}
PanicError wraps a recovered panic value as an error.
func GetPanicError ¶
func GetPanicError(err error) *PanicError
GetPanicError extracts the PanicError from an error chain, if present.
func (*PanicError) Error ¶
func (e *PanicError) Error() string
type PeriodicTaskConfig ¶
type PeriodicTaskConfig struct {
// CronExpr is the cron expression (e.g., "*/5 * * * *").
CronExpr string `json:"cron_expr"`
// TaskType is the handler to invoke.
TaskType TaskType `json:"task_type"`
// Payload is the task payload.
Payload json.RawMessage `json:"payload,omitempty"`
// UniqueKey prevents duplicate schedules for the same logical task.
UniqueKey string `json:"unique_key,omitempty"`
}
PeriodicTaskConfig describes a single periodic task to be managed.
type PeriodicTaskConfigProvider ¶
type PeriodicTaskConfigProvider interface {
GetConfigs() ([]*PeriodicTaskConfig, error)
}
PeriodicTaskConfigProvider is an interface that returns the current set of periodic tasks. The PeriodicTaskManager calls GetConfigs periodically and syncs the returned tasks with the scheduler — adding new ones, removing stale ones, and updating changed ones.
type PeriodicTaskConfigProviderFunc ¶
type PeriodicTaskConfigProviderFunc func() ([]*PeriodicTaskConfig, error)
PeriodicTaskConfigProviderFunc is a convenience adapter.
func (PeriodicTaskConfigProviderFunc) GetConfigs ¶
func (f PeriodicTaskConfigProviderFunc) GetConfigs() ([]*PeriodicTaskConfig, error)
type PoolStats ¶
type PoolStats struct {
RunningWorkers int `json:"running_workers"`
IdleWorkers int `json:"idle_workers"`
MaxConcurrency int `json:"max_concurrency"`
TotalSubmitted uint64 `json:"total_submitted"`
TotalCompleted uint64 `json:"total_completed"`
TotalFailed uint64 `json:"total_failed"`
QueueLength int `json:"queue_length"`
}
PoolStats captures worker pool metrics for a node.
type Priority ¶
type Priority int
Priority represents the execution priority of a job (1-10).
func GetPriority ¶
GetPriority extracts the priority from the context.
type PriorityTier ¶
type PriorityTier string
PriorityTier maps a numeric priority to a Redis Stream tier.
const ( TierHigh PriorityTier = "high" TierNormal PriorityTier = "normal" TierLow PriorityTier = "low" )
func TierForPriority ¶
func TierForPriority(p Priority) PriorityTier
TierForPriority returns the subject tier for a given priority value.
type ProgressReporterFunc ¶
type ProgressReporterFunc func(ctx context.Context, data json.RawMessage) error
ProgressReporterFunc is the callback stored in context for reporting progress.
type RateLimitedError ¶
RateLimitedError wraps an error with a retry-after duration.
func (*RateLimitedError) Error ¶
func (e *RateLimitedError) Error() string
func (*RateLimitedError) Unwrap ¶
func (e *RateLimitedError) Unwrap() error
type RedisOptions ¶
type RedisOptions struct {
// URL is the Redis server URL (e.g., "redis://localhost:6379", "rediss://host:6380/0").
URL string
// Password for Redis authentication. Overridden if set in URL.
Password string
// DB is the Redis database number. Default: 0.
DB int
// PoolSize is the maximum number of connections in the pool. Default: 10.
PoolSize int
// TLSConfig enables TLS for the connection. Set to non-nil to enable.
TLSConfig *tls.Config
// SentinelConfig enables Redis Sentinel failover mode.
// When set, URL is ignored and Sentinel is used instead.
SentinelConfig *RedisSentinelConfig
// ClusterAddrs enables Redis Cluster mode.
// When set, URL is ignored and a cluster client is created.
ClusterAddrs []string
}
RedisOptions holds connection parameters for the Redis client.
type RedisSentinelConfig ¶
type RedisSentinelConfig struct {
// MasterName is the name of the master as configured in Sentinel.
MasterName string
// SentinelAddrs is the list of Sentinel node addresses (host:port).
SentinelAddrs []string
// SentinelPassword is the password for Sentinel nodes (if different from master password).
SentinelPassword string
}
RedisSentinelConfig holds configuration for Redis Sentinel failover.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
InitialDelay time.Duration `json:"initial_delay"`
MaxDelay time.Duration `json:"max_delay"`
Multiplier float64 `json:"multiplier"`
Jitter float64 `json:"jitter"` // 0.0 - 1.0
}
RetryPolicy describes retry behavior.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy.
type RetryableError ¶
type RetryableError struct{ Err error }
RetryableError wraps an error as retryable.
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (e *RetryableError) Unwrap() error
type RunStatus ¶
type RunStatus string
RunStatus represents the state of an individual job execution.
func (RunStatus) IsTerminal ¶
type Schedule ¶
type Schedule struct {
Type ScheduleType `json:"type"`
// ONE_TIME: run once at this time
RunAt *time.Time `json:"run_at,omitempty"`
// DURATION: repeat at fixed interval
Interval *Duration `json:"interval,omitempty"`
// CRON: cron expression
CronExpr *string `json:"cron_expr,omitempty"`
// DAILY/WEEKLY/MONTHLY
RegularInterval *uint `json:"regular_interval,omitempty"` // every N days/weeks/months
AtTimes []AtTime `json:"at_times,omitempty"` // times of day
IncludedDays []int `json:"included_days,omitempty"` // weekdays (0-6) or month days (1-31)
// Common boundaries
StartsAt *time.Time `json:"starts_at,omitempty"`
EndsAt *time.Time `json:"ends_at,omitempty"`
Timezone string `json:"timezone,omitempty"`
// OverlapPolicy controls what happens when a new occurrence fires
// while a previous run is still active. Only for recurring schedules.
OverlapPolicy OverlapPolicy `json:"overlap_policy,omitempty"`
// CatchupWindow defines the maximum age of a missed firing that should
// be backfilled. Zero means no backfill (only the latest due firing).
CatchupWindow *Duration `json:"catchup_window,omitempty"`
// MaxBackfillPerTick limits how many missed firings are enqueued in a
// single scheduler tick to prevent flooding. Default: 10 if CatchupWindow is set.
MaxBackfillPerTick *int `json:"max_backfill_per_tick,omitempty"`
}
Schedule defines "when" a job should run.
type ScheduleEntry ¶
type ScheduleEntry struct {
JobID string `json:"job_id"`
NextRunAt time.Time `json:"next_run_at"`
Schedule Schedule `json:"schedule"`
LastProcessedAt *time.Time `json:"last_processed_at,omitempty"` // last successful processing time
}
ScheduleEntry is the KV entry the leader's scheduler scans for due jobs.
type ScheduleType ¶
type ScheduleType string
ScheduleType defines how a job is scheduled.
const ( ScheduleImmediate ScheduleType = "IMMEDIATE" ScheduleOneTime ScheduleType = "ONE_TIME" ScheduleDuration ScheduleType = "DURATION" ScheduleCron ScheduleType = "CRON" ScheduleDaily ScheduleType = "DAILY" ScheduleWeekly ScheduleType = "WEEKLY" ScheduleMonthly ScheduleType = "MONTHLY" )
type TaskType ¶
type TaskType string
TaskType identifies a registered handler. This is the "how" — what function to call when a job of this type fires.
func GetTaskType ¶
GetTaskType extracts the task type from the context.
type WorkMessage ¶
type WorkMessage struct {
RunID string `json:"run_id"`
JobID string `json:"job_id"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Attempt int `json:"attempt"`
Deadline time.Time `json:"deadline"`
Metadata map[string]string `json:"metadata,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Priority Priority `json:"priority,omitempty"`
DispatchedAt time.Time `json:"dispatched_at,omitempty"`
}
WorkMessage is the message published to DUREQ_WORK stream.
type WorkResult ¶
type WorkResult struct {
RunID string `json:"run_id"`
JobID string `json:"job_id"`
Success bool `json:"success"`
Error *string `json:"error,omitempty"`
Output json.RawMessage `json:"output,omitempty"` // handler output data
}
WorkResult is the outcome of executing a work message.
type WorkflowDefinition ¶
type WorkflowDefinition struct {
Name string `json:"name"`
Tasks []WorkflowTask `json:"tasks"`
ExecutionTimeout *Duration `json:"execution_timeout,omitempty"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
DefaultPriority *Priority `json:"default_priority,omitempty"`
}
WorkflowDefinition describes a DAG of tasks to execute.
type WorkflowInstance ¶
type WorkflowInstance struct {
ID string `json:"id"`
WorkflowName string `json:"workflow_name"`
Status WorkflowStatus `json:"status"`
Tasks map[string]WorkflowTaskState `json:"tasks"`
Definition WorkflowDefinition `json:"definition"`
Input json.RawMessage `json:"input,omitempty"`
Deadline *time.Time `json:"deadline,omitempty"`
Attempt int `json:"attempt,omitempty"`
MaxAttempts int `json:"max_attempts,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
WorkflowInstance is a running instance of a workflow definition.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the lifecycle state of a workflow instance.
const ( WorkflowStatusPending WorkflowStatus = "pending" WorkflowStatusRunning WorkflowStatus = "running" WorkflowStatusCompleted WorkflowStatus = "completed" WorkflowStatusFailed WorkflowStatus = "failed" WorkflowStatusCancelled WorkflowStatus = "cancelled" )
func (WorkflowStatus) IsTerminal ¶
func (s WorkflowStatus) IsTerminal() bool
func (WorkflowStatus) String ¶
func (s WorkflowStatus) String() string
type WorkflowTask ¶
type WorkflowTask struct {
Name string `json:"name"`
TaskType TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
}
WorkflowTask is one step in a workflow DAG.
type WorkflowTaskState ¶
type WorkflowTaskState struct {
Name string `json:"name"`
JobID string `json:"job_id,omitempty"`
Status JobStatus `json:"status"`
Error *string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
WorkflowTaskState tracks the state of one task within a workflow instance.