Documentation
¶
Index ¶
- Constants
- Variables
- func BatchDeadlinesKey(prefix string) string
- func BatchKey(prefix, id string) string
- func BatchPayloadKey(prefix, id string) string
- func BatchResultKey(prefix, batchID, itemID string) string
- func BatchResultsSetKey(prefix, batchID string) string
- func BatchesByCreatedKey(prefix string) string
- func BatchesByStatusKey(prefix, status string) string
- func CancelChannelKey(prefix string) string
- func CancelFlagKey(prefix, runID string) string
- func ConcurrencySlotKey(prefix, concKey string) string
- func DLQStreamKey(prefix string) string
- func DailyStatsKey(prefix, date string) string
- func DedupKey(prefix, runID string) string
- func DelayedKey(prefix, tierName string) string
- func ElectionEpochKey(prefix string) string
- func ElectionLeaderKey(prefix string) string
- func EventsBatchChannel(prefix, batchID string) string
- func EventsPubSubChannel(prefix string) string
- func EventsStreamKey(prefix string) string
- func FiringDedupKey(prefix, firingID string) string
- func GroupMessagesKey(prefix, group string) string
- func GroupMetaKey(prefix, group string) string
- func GroupSetKey(prefix string) string
- func HistoryEventKey(prefix, eventID string) string
- func HistoryEventsByJobKey(prefix, jobID string) string
- func HistoryEventsByTypeKey(prefix, eventType string) string
- func HistoryEventsKey(prefix string) string
- func HistoryRunKey(prefix, runID string) string
- func HistoryRunsByJobKey(prefix, jobID string) string
- func HistoryRunsByStatusKey(prefix, status string) string
- func HistoryRunsKey(prefix string) string
- func JobAuditKey(prefix, jobID string) string
- func JobKey(prefix, id string) string
- func JobNotifyChannel(prefix string) string
- func JobPayloadKey(prefix, id string) string
- func JobsByBatchKey(prefix, batchID string) string
- func JobsByCreatedKey(prefix string) string
- func JobsByStatusKey(prefix, status string) string
- func JobsByTagKey(prefix, tag string) string
- func JobsByTaskTypeKey(prefix, taskType string) string
- func JobsByWorkflowKey(prefix, wfID string) string
- func LockKey(prefix, key string) string
- func LuaExtendLockScript() string
- func LuaFlushGroupScript() string
- func LuaLeaderElectWithEpochScript() string
- func LuaLeaderRefreshScript() string
- func LuaLeaderRefreshWithEpochScript() string
- func LuaUnlockIfOwnerScript() string
- func NodeDrainKey(prefix, nodeID string) string
- func NodeKey(prefix, nodeID string) string
- func NodesAllKey(prefix string) string
- func OverlapBufferListKey(prefix, jobID string) string
- func OverlapBufferedKey(prefix, jobID string) string
- func ParseRedisInfo(raw string) map[string]map[string]string
- func QueuePausedKey(prefix, tierName string) string
- func RequestDedupKey(prefix, reqID string) string
- func ResultKey(prefix, jobID string) string
- func ResultNotifyChannel(prefix, jobID string) string
- func RunKey(prefix, runID string) string
- func RunsActiveByJobKey(prefix, jobID string) string
- func RunsActiveKey(prefix string) string
- func RunsByJobKey(prefix, jobID string) string
- func ScheduleKey(prefix, jobID string) string
- func SchedulesByNextKey(prefix string) string
- func SchedulesDueKey(prefix string) string
- func SideEffectKey(prefix, runID, stepKey string) string
- func SignalDedupKey(prefix, wfID, dedupeKey string) string
- func TiersKey(prefix string) string
- func UniqueKeyKey(prefix, uniqueKey string) string
- func WorkMessageFromStreamValues(values map[string]string) types.WorkMessage
- func WorkStreamKey(prefix, tierName string) string
- func WorkflowArchiveKey(prefix, wfID string) string
- func WorkflowDeadlinesKey(prefix string) string
- func WorkflowEventsKey(prefix, wfID string) string
- func WorkflowKey(prefix, id string) string
- func WorkflowPayloadKey(prefix, id string) string
- func WorkflowSignalStreamKey(prefix, wfID string) string
- func WorkflowsByCreatedKey(prefix string) string
- func WorkflowsByStatusKey(prefix, status string) string
- type AuditEntry
- type BatchFilter
- type CompletionBatch
- type DailyStats
- type DelayedPoller
- type EventFilter
- type JobFilter
- type JobStats
- type RedisStore
- func (s *RedisStore) AckMessage(ctx context.Context, tierName, messageID string) error
- func (s *RedisStore) AckSignals(ctx context.Context, workflowID string, streamIDs []string) error
- func (s *RedisStore) AcquireConcurrencySlot(ctx context.Context, concKey, runID string, maxConcurrency int) (bool, error)
- func (s *RedisStore) AddDelayed(ctx context.Context, tierName string, wm *types.WorkMessage, ...) error
- func (s *RedisStore) AddToGroup(ctx context.Context, group string, msg types.GroupMessage) (int64, error)
- func (s *RedisStore) CancelChannel() string
- func (s *RedisStore) CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)
- func (s *RedisStore) ClaimSideEffect(ctx context.Context, runID, stepKey string, ttlSeconds int) (string, bool, error)
- func (s *RedisStore) ClaimStaleMessages(ctx context.Context, tierName, consumerName string, minIdleTime time.Duration, ...) ([]rueidis.XRangeEntry, error)
- func (s *RedisStore) Cleanup(ctx context.Context, olderThan time.Time) (int64, error)
- func (s *RedisStore) ClearCancelFlag(ctx context.Context, runID string)
- func (s *RedisStore) Client() rueidis.Client
- func (s *RedisStore) Close() error
- func (s *RedisStore) CompactWorkflowTasks(ctx context.Context, wf *types.WorkflowInstance, keep int) (int, error)
- func (s *RedisStore) CompleteRun(ctx context.Context, batch *CompletionBatch) error
- func (s *RedisStore) CompleteSideEffect(ctx context.Context, runID, stepKey string, result string) error
- func (s *RedisStore) ConcurrencySlotCount(ctx context.Context, concKey string) (int64, error)
- func (s *RedisStore) Config() RedisStoreConfig
- func (s *RedisStore) ConsumeSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
- func (s *RedisStore) CreateJob(ctx context.Context, job *types.Job) (uint64, error)
- func (s *RedisStore) DeleteBatch(ctx context.Context, batchID string) error
- func (s *RedisStore) DeleteGroupMessages(ctx context.Context, group string)
- func (s *RedisStore) DeleteJob(ctx context.Context, jobID string) error
- func (s *RedisStore) DeleteJobAuditTrail(ctx context.Context, jobID string)
- func (s *RedisStore) DeleteRun(ctx context.Context, runID string) error
- func (s *RedisStore) DeleteSchedule(ctx context.Context, jobID string) error
- func (s *RedisStore) DeleteSignalStream(ctx context.Context, workflowID string)
- func (s *RedisStore) DeleteUniqueKey(ctx context.Context, uniqueKey string) error
- func (s *RedisStore) DeleteWorkflow(ctx context.Context, id string) error
- func (s *RedisStore) DeleteWorkflowEvents(ctx context.Context, workflowID string)
- func (s *RedisStore) DispatchToDLQ(ctx context.Context, wm *types.WorkMessage) error
- func (s *RedisStore) DispatchWork(ctx context.Context, tierName string, wm *types.WorkMessage) (string, error)
- func (s *RedisStore) EnsureStreams(ctx context.Context) error
- func (s *RedisStore) ExtendResultTTL(ctx context.Context, jobID string, ttl time.Duration) error
- func (s *RedisStore) FindBatchByPayloadPath(ctx context.Context, jsonPath string, value any) (*types.BatchInstance, uint64, error)
- func (s *RedisStore) FindJobByPayloadPath(ctx context.Context, jsonPath string, value any, taskType string) (*types.Job, uint64, error)
- func (s *RedisStore) FindWorkflowByTaskPayloadPath(ctx context.Context, jsonPath string, value any) (*types.WorkflowInstance, uint64, error)
- func (s *RedisStore) FlushGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
- func (s *RedisStore) GetActiveJobStats(ctx context.Context) (map[types.JobStatus]int, error)
- func (s *RedisStore) GetArchivedTask(ctx context.Context, wfID, taskName string) (*types.WorkflowTaskState, error)
- func (s *RedisStore) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
- func (s *RedisStore) GetBatchByID(ctx context.Context, id string) (*types.BatchInstance, error)
- func (s *RedisStore) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
- func (s *RedisStore) GetDailyStats(ctx context.Context, days int) ([]DailyStats, error)
- func (s *RedisStore) GetGroupSize(ctx context.Context, group string) (int64, error)
- func (s *RedisStore) GetHistoryRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
- func (s *RedisStore) GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)
- func (s *RedisStore) GetJobAuditCount(ctx context.Context, jobID string) (int64, error)
- func (s *RedisStore) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
- func (s *RedisStore) GetJobAuditTrail(ctx context.Context, jobID string) ([]AuditEntry, error)
- func (s *RedisStore) GetJobRevision(ctx context.Context, jobID string) (uint64, error)
- func (s *RedisStore) GetJobStats(ctx context.Context, _ StatsFilter) (*JobStats, error)
- func (s *RedisStore) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)
- func (s *RedisStore) GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
- func (s *RedisStore) GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
- func (s *RedisStore) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
- func (s *RedisStore) GetSignalStats(ctx context.Context, workflowID string) (*SignalStats, error)
- func (s *RedisStore) GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
- func (s *RedisStore) GetWorkflowByID(ctx context.Context, id string) (*types.WorkflowInstance, error)
- func (s *RedisStore) GetWorkflowEvents(ctx context.Context, workflowID string) ([]types.JobEvent, error)
- func (s *RedisStore) HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)
- func (s *RedisStore) IncrDailyStat(ctx context.Context, field string)
- func (s *RedisStore) IsCancelRequested(ctx context.Context, runID string) (bool, error)
- func (s *RedisStore) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)
- func (s *RedisStore) IsQueuePaused(ctx context.Context, tierName string) (bool, error)
- func (s *RedisStore) JobStatusCounts(ctx context.Context) (map[types.JobStatus]int, error)
- func (s *RedisStore) ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
- func (s *RedisStore) ListBatchInstances(ctx context.Context, filter BatchFilter) ([]types.BatchInstance, int, error)
- func (s *RedisStore) ListBatchItemResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
- func (s *RedisStore) ListBatches(ctx context.Context) ([]*types.BatchInstance, error)
- func (s *RedisStore) ListBatchesByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.BatchInstance, error)
- func (s *RedisStore) ListDLQ(ctx context.Context, limit int) ([]*types.WorkMessage, error)
- func (s *RedisStore) ListDueBatchDeadlines(ctx context.Context, now time.Time) ([]*types.BatchInstance, error)
- func (s *RedisStore) ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
- func (s *RedisStore) ListDueWorkflowDeadlines(ctx context.Context, now time.Time) ([]*types.WorkflowInstance, error)
- func (s *RedisStore) ListGroups(ctx context.Context) ([]string, error)
- func (s *RedisStore) ListJobEvents(ctx context.Context, filter EventFilter) ([]types.JobEvent, int, error)
- func (s *RedisStore) ListJobRuns(ctx context.Context, filter RunFilter) ([]types.JobRun, int, error)
- func (s *RedisStore) ListJobs(ctx context.Context) ([]*types.Job, error)
- func (s *RedisStore) ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
- func (s *RedisStore) ListJobsPaginated(ctx context.Context, filter JobFilter) ([]types.Job, int, error)
- func (s *RedisStore) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)
- func (s *RedisStore) ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
- func (s *RedisStore) ListPausedQueues(ctx context.Context) ([]string, error)
- func (s *RedisStore) ListReadyGroups(ctx context.Context, gracePeriod, maxDelay time.Duration, maxSize int) ([]string, error)
- func (s *RedisStore) ListRuns(ctx context.Context) ([]*types.JobRun, error)
- func (s *RedisStore) ListSchedules(ctx context.Context) ([]*types.ScheduleEntry, error)
- func (s *RedisStore) ListSchedulesPaginated(ctx context.Context, filter ScheduleFilter) ([]types.ScheduleEntry, int, error)
- func (s *RedisStore) ListWorkflowInstances(ctx context.Context, filter WorkflowFilter) ([]types.WorkflowInstance, int, error)
- func (s *RedisStore) ListWorkflows(ctx context.Context) ([]*types.WorkflowInstance, error)
- func (s *RedisStore) ListWorkflowsByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.WorkflowInstance, error)
- func (s *RedisStore) MoveDelayedToStream(ctx context.Context, tierName string, maxMove int) (int64, error)
- func (s *RedisStore) PauseQueue(ctx context.Context, tierName string) error
- func (s *RedisStore) Ping(ctx context.Context) error
- func (s *RedisStore) PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)
- func (s *RedisStore) PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
- func (s *RedisStore) Prefix() string
- func (s *RedisStore) PublishEvent(ctx context.Context, event types.JobEvent) error
- func (s *RedisStore) PublishJobNotification(ctx context.Context, jobID, taskType string, priority int) error
- func (s *RedisStore) PublishResult(ctx context.Context, result types.WorkResult) error
- func (s *RedisStore) PurgeJobData(ctx context.Context, jobID string) error
- func (s *RedisStore) PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
- func (s *RedisStore) ReadGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
- func (s *RedisStore) ReadSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
- func (s *RedisStore) ReenqueueWork(ctx context.Context, tierName string, wm *types.WorkMessage, redeliveries int) error
- func (s *RedisStore) ReleaseConcurrencySlot(ctx context.Context, concKey, runID string) error
- func (s *RedisStore) RequestCancel(ctx context.Context, runID string) error
- func (s *RedisStore) SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
- func (s *RedisStore) SaveBatchInstance(ctx context.Context, batch *types.BatchInstance) error
- func (s *RedisStore) SaveBatchItemResult(ctx context.Context, result *types.BatchItemResult) error
- func (s *RedisStore) SaveJob(ctx context.Context, job *types.Job) (uint64, error)
- func (s *RedisStore) SaveJobEvent(ctx context.Context, event *types.JobEvent) error
- func (s *RedisStore) SaveJobRun(ctx context.Context, run *types.JobRun) error
- func (s *RedisStore) SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)
- func (s *RedisStore) SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
- func (s *RedisStore) SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)
- func (s *RedisStore) SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
- func (s *RedisStore) SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
- func (s *RedisStore) SaveWorkflowInstance(ctx context.Context, wf *types.WorkflowInstance) error
- func (s *RedisStore) SendSignal(ctx context.Context, signal *types.WorkflowSignal) error
- func (s *RedisStore) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error
- func (s *RedisStore) SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
- func (s *RedisStore) TrimWorkflowEvents(ctx context.Context, wfID string, maxEntries int64) error
- func (s *RedisStore) UnpauseQueue(ctx context.Context, tierName string) error
- func (s *RedisStore) UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
- func (s *RedisStore) UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
- func (s *RedisStore) UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
- type RedisStoreConfig
- type RunFilter
- type ScheduleFilter
- type SignalStats
- type StatsFilter
- type Store
- type TierConfig
- type WorkflowFilter
Constants ¶
const ConsumerGroup = "dureq_workers"
Consumer group names
const OrchestratorConsumerGroup = "dureq_orchestrator"
Variables ¶
var ( ErrCASConflict = errors.New("dureq: CAS conflict") ErrAlreadyExists = errors.New("dureq: already exists") )
Sentinel errors for the Redis store.
Functions ¶
func BatchDeadlinesKey ¶ added in v0.2.0
func BatchPayloadKey ¶
func BatchResultsSetKey ¶
func BatchesByCreatedKey ¶
func BatchesByStatusKey ¶
func CancelFlagKey ¶ added in v0.2.1
Durable Cancel Flag (per-run, SET with EX for auto-cleanup)
func ConcurrencySlotKey ¶ added in v0.2.0
Concurrency slots (sorted set: member=runID, score=acquireTime) Hash-tagged with {concKey} for Redis Cluster slot co-location.
func DelayedKey ¶
func ElectionEpochKey ¶ added in v0.1.3
func ElectionLeaderKey ¶
Election Election keys use {election} hash tag so both land in the same Redis Cluster slot, enabling multi-key Lua scripts (elect + refresh with epoch).
func EventsBatchChannel ¶
func EventsStreamKey ¶
func FiringDedupKey ¶ added in v0.2.1
Firing Dedup (prevents duplicate dispatch of the same schedule firing under leader failover)
func GroupMessagesKey ¶
GroupMessagesKey stores the list of messages for a specific group.
func GroupMetaKey ¶
GroupMetaKey stores group metadata (first_added, last_added timestamps).
func GroupSetKey ¶
GroupSetKey stores the sorted set of groups with their first-added timestamp.
func HistoryEventKey ¶
func HistoryEventsByJobKey ¶
func HistoryEventsByTypeKey ¶
func HistoryEventsKey ¶
func HistoryRunKey ¶
func HistoryRunsByJobKey ¶
func HistoryRunsByStatusKey ¶
func JobAuditKey ¶ added in v0.1.3
Job Audit Trail (per-job state transition history)
func JobNotifyChannel ¶
Job Notification Pub/Sub (dureqv2 actor dispatch)
func JobPayloadKey ¶
JSON Payload mirrors (RedisJSON) — used for JSONPath-based search.
func JobsByBatchKey ¶
func JobsByCreatedKey ¶
func JobsByStatusKey ¶
func JobsByTagKey ¶
func JobsByTaskTypeKey ¶
func JobsByWorkflowKey ¶
func LuaExtendLockScript ¶
func LuaExtendLockScript() string
func LuaFlushGroupScript ¶ added in v0.1.3
func LuaFlushGroupScript() string
func LuaLeaderElectWithEpochScript ¶ added in v0.1.3
func LuaLeaderElectWithEpochScript() string
func LuaLeaderRefreshScript ¶
func LuaLeaderRefreshScript() string
func LuaLeaderRefreshWithEpochScript ¶ added in v0.1.3
func LuaLeaderRefreshWithEpochScript() string
func LuaUnlockIfOwnerScript ¶
func LuaUnlockIfOwnerScript() string
func NodesAllKey ¶
func OverlapBufferListKey ¶
func OverlapBufferedKey ¶
Overlap buffer (schedule overlap policy)
func ParseRedisInfo ¶
ParseRedisInfo parses raw Redis INFO output into a map of sections.
func QueuePausedKey ¶
Queue Pause (hash-tagged with tier for cluster slot co-location)
func RequestDedupKey ¶ added in v0.1.3
func ResultNotifyChannel ¶
func RunsActiveByJobKey ¶
func RunsActiveKey ¶
func RunsByJobKey ¶
func SchedulesByNextKey ¶
func SchedulesDueKey ¶
func SideEffectKey ¶ added in v0.2.0
Side-effect steps (per-run, single-key for Cluster compatibility)
func SignalDedupKey ¶ added in v0.2.0
Signal dedup key (SET NX with TTL)
func UniqueKeyKey ¶
func WorkMessageFromStreamValues ¶
func WorkMessageFromStreamValues(values map[string]string) types.WorkMessage
WorkMessageFromStreamValues constructs a WorkMessage from Redis stream field map.
func WorkStreamKey ¶
Work Streams (per tier) Hash-tagged keys: {tierName} ensures delayed + work + paused co-locate to the same Redis Cluster slot, enabling multi-key Lua scripts.
func WorkflowArchiveKey ¶ added in v0.2.0
Workflow Task Archive (hash: field=taskName, value=taskStateJSON)
func WorkflowDeadlinesKey ¶ added in v0.2.0
Deadline indexes (sorted set: score=deadline unix, member=ID)
func WorkflowEventsKey ¶ added in v0.1.3
Per-Workflow Event Index (sorted set: score=timestamp, member=event stream ID)
func WorkflowPayloadKey ¶
func WorkflowSignalStreamKey ¶ added in v0.1.3
Workflow Signals
func WorkflowsByCreatedKey ¶
func WorkflowsByStatusKey ¶
Types ¶
type AuditEntry ¶ added in v0.1.3
type AuditEntry struct {
ID string `json:"id"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
AuditEntry represents a single state transition in a job's audit trail.
type BatchFilter ¶
type BatchFilter struct {
Name *string
Status *types.WorkflowStatus
Sort string // "newest" or "oldest"
Limit int
Offset int
}
BatchFilter specifies criteria for listing batch instances.
type CompletionBatch ¶
type CompletionBatch struct {
Run *types.JobRun
Event types.JobEvent
Result types.WorkResult
DailyStatField string // "processed" or "failed"
AckTierName string // tier name for XACK (v1 only, empty for v2)
AckMessageID string // stream message ID for XACK (v1 only)
// ResultTTLOverride, if > 0, overrides the default ResultTTL for this result.
// Used by workflow tasks to extend result lifetime to match the workflow deadline.
ResultTTLOverride time.Duration
}
CompletionBatch holds pre-computed data for a batched job completion write.
type DailyStats ¶
type DailyStats struct {
Date string `json:"date"`
Processed int64 `json:"processed"`
Failed int64 `json:"failed"`
}
DailyStats holds processed/failed counts for a single date.
type DelayedPoller ¶
type DelayedPoller struct {
// contains filtered or unexported fields
}
DelayedPoller polls delayed sorted sets and moves ripe messages back to work streams.
func NewDelayedPoller ¶
func NewDelayedPoller(store *RedisStore, logger chainedlog.Logger) *DelayedPoller
NewDelayedPoller creates a delayed retry poller.
func (*DelayedPoller) Start ¶
func (p *DelayedPoller) Start(ctx context.Context)
Start begins polling in the background. Blocks until ctx is cancelled.
type EventFilter ¶
type EventFilter struct {
JobID *string
EventType *types.EventType
Since *time.Time
Until *time.Time
Limit int
Offset int
}
EventFilter specifies criteria for listing job events.
type JobFilter ¶
type JobFilter struct {
Status *types.JobStatus
TaskType *types.TaskType
Tag *string
Search *string // substring match on ID or TaskType
Sort string // "newest" or "oldest"
Limit int
Offset int
}
JobFilter specifies criteria for listing active jobs.
type JobStats ¶
type JobStats struct {
TotalJobs int64 `json:"total_jobs"`
TotalRuns int64 `json:"total_runs"`
ByStatus map[types.JobStatus]int64 `json:"by_status"`
ByTaskType map[types.TaskType]int64 `json:"by_task_type"`
AvgDuration time.Duration `json:"avg_duration"`
SuccessRate float64 `json:"success_rate"`
FailureRate float64 `json:"failure_rate"`
}
JobStats holds aggregated statistics.
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore manages all dureq state in Redis, replacing both NATSStore and HistoryStore.
func NewRedisStore ¶
func NewRedisStore(rdb rueidis.Client, cfg RedisStoreConfig, logger chainedlog.Logger) (*RedisStore, error)
NewRedisStore creates a new Redis store and registers tier configuration.
func (*RedisStore) AckMessage ¶
func (s *RedisStore) AckMessage(ctx context.Context, tierName, messageID string) error
AckMessage acknowledges a message in a tier's consumer group.
func (*RedisStore) AckSignals ¶ added in v0.2.0
AckSignals deletes processed signal entries from the workflow signal stream. Call this after successfully handling signals returned by ReadSignals.
func (*RedisStore) AcquireConcurrencySlot ¶ added in v0.2.0
func (s *RedisStore) AcquireConcurrencySlot(ctx context.Context, concKey, runID string, maxConcurrency int) (bool, error)
AcquireConcurrencySlot tries to acquire a concurrency slot for the given run. Returns true if the slot was acquired, false if at capacity.
func (*RedisStore) AddDelayed ¶
func (s *RedisStore) AddDelayed(ctx context.Context, tierName string, wm *types.WorkMessage, executeAt time.Time) error
AddDelayed adds a work message to the delayed sorted set for future re-dispatch.
func (*RedisStore) AddToGroup ¶
func (s *RedisStore) AddToGroup(ctx context.Context, group string, msg types.GroupMessage) (int64, error)
AddToGroup adds a message to a named group. Returns the current group size.
func (*RedisStore) CancelChannel ¶
func (s *RedisStore) CancelChannel() string
CancelChannel returns the Redis Pub/Sub channel name used for task cancellation signals.
func (*RedisStore) CheckUniqueKey ¶
CheckUniqueKey checks whether a unique key is claimed. Returns (jobID, exists, error).
func (*RedisStore) ClaimSideEffect ¶ added in v0.2.0
func (s *RedisStore) ClaimSideEffect(ctx context.Context, runID, stepKey string, ttlSeconds int) (string, bool, error)
ClaimSideEffect atomically claims a side-effect step for the given run. Returns (result, true) if the step was already completed (cached result), or ("", false) if the step was newly claimed and the caller should execute.
func (*RedisStore) ClaimStaleMessages ¶
func (s *RedisStore) ClaimStaleMessages(ctx context.Context, tierName, consumerName string, minIdleTime time.Duration, count int64) ([]rueidis.XRangeEntry, error)
ClaimStaleMessages reclaims messages that have been pending longer than minIdleTime. This handles worker crashes — other nodes pick up abandoned work.
func (*RedisStore) ClearCancelFlag ¶ added in v0.2.1
func (s *RedisStore) ClearCancelFlag(ctx context.Context, runID string)
ClearCancelFlag removes the durable cancel flag after a run completes.
func (*RedisStore) Client ¶
func (s *RedisStore) Client() rueidis.Client
func (*RedisStore) Close ¶
func (s *RedisStore) Close() error
func (*RedisStore) CompactWorkflowTasks ¶ added in v0.2.0
func (s *RedisStore) CompactWorkflowTasks(ctx context.Context, wf *types.WorkflowInstance, keep int) (int, error)
CompactWorkflowTasks moves completed tasks exceeding the threshold to the archive hash. Only non-terminal + the most recent `keep` completed tasks remain in the main Tasks map.
func (*RedisStore) CompleteRun ¶
func (s *RedisStore) CompleteRun(ctx context.Context, batch *CompletionBatch) error
CompleteRun performs SaveRun + SaveJobRun + DeleteRun + IncrDailyStat + PublishEvent + PublishResult + AckMessage. Slot-targeted commands are pipelined via DoMulti; PUBLISH (no-slot) commands are sent separately for Redis Cluster compatibility.
func (*RedisStore) CompleteSideEffect ¶ added in v0.2.0
func (s *RedisStore) CompleteSideEffect(ctx context.Context, runID, stepKey string, result string) error
CompleteSideEffect marks a side-effect step as done and stores the result.
func (*RedisStore) ConcurrencySlotCount ¶ added in v0.2.0
ConcurrencySlotCount returns the current number of active slots for a concurrency key.
func (*RedisStore) Config ¶
func (s *RedisStore) Config() RedisStoreConfig
func (*RedisStore) ConsumeSignals ¶ added in v0.1.3
func (s *RedisStore) ConsumeSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
ConsumeSignals reads and acknowledges all pending signals for a workflow. Deprecated: Use ReadSignals + AckSignals for crash-safe signal processing. This method is retained for backward compatibility but is lossy — if the caller crashes after this call returns, the signals are permanently lost.
func (*RedisStore) CreateJob ¶
CreateJob atomically creates a job only if it doesn't already exist. Secondary indexes are added individually after creation.
func (*RedisStore) DeleteBatch ¶
func (s *RedisStore) DeleteBatch(ctx context.Context, batchID string) error
DeleteBatch removes a batch instance, all indexes, and associated data (batch item results, JSON mirror).
func (*RedisStore) DeleteGroupMessages ¶ added in v0.1.10
func (s *RedisStore) DeleteGroupMessages(ctx context.Context, group string)
DeleteGroupMessages atomically deletes all messages for a group and cleans up metadata. Should be called only after successful processing of ReadGroup results.
func (*RedisStore) DeleteJob ¶
func (s *RedisStore) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job and cleans all secondary indexes individually.
func (*RedisStore) DeleteJobAuditTrail ¶ added in v0.1.3
func (s *RedisStore) DeleteJobAuditTrail(ctx context.Context, jobID string)
DeleteJobAuditTrail removes the audit stream for a job (cleanup).
func (*RedisStore) DeleteRun ¶
func (s *RedisStore) DeleteRun(ctx context.Context, runID string) error
DeleteRun removes a run and cleans its indexes.
func (*RedisStore) DeleteSchedule ¶
func (s *RedisStore) DeleteSchedule(ctx context.Context, jobID string) error
DeleteSchedule removes a schedule and its index entry.
func (*RedisStore) DeleteSignalStream ¶ added in v0.1.3
func (s *RedisStore) DeleteSignalStream(ctx context.Context, workflowID string)
DeleteSignalStream removes the signal stream for a workflow (cleanup).
func (*RedisStore) DeleteUniqueKey ¶
func (s *RedisStore) DeleteUniqueKey(ctx context.Context, uniqueKey string) error
DeleteUniqueKey releases a unique key.
func (*RedisStore) DeleteWorkflow ¶
func (s *RedisStore) DeleteWorkflow(ctx context.Context, id string) error
DeleteWorkflow removes a workflow instance, all indexes, and associated data (signal stream, event index, task archive, JSON mirror).
func (*RedisStore) DeleteWorkflowEvents ¶ added in v0.1.3
func (s *RedisStore) DeleteWorkflowEvents(ctx context.Context, workflowID string)
DeleteWorkflowEvents removes the per-workflow event index (cleanup).
func (*RedisStore) DispatchToDLQ ¶
func (s *RedisStore) DispatchToDLQ(ctx context.Context, wm *types.WorkMessage) error
DispatchToDLQ adds a message to the dead letter queue stream.
func (*RedisStore) DispatchWork ¶
func (s *RedisStore) DispatchWork(ctx context.Context, tierName string, wm *types.WorkMessage) (string, error)
DispatchWork adds a work message to the tier-specific Redis Stream.
func (*RedisStore) EnsureStreams ¶
func (s *RedisStore) EnsureStreams(ctx context.Context) error
EnsureStreams creates consumer groups for all configured tier streams and the DLQ stream. Idempotent — ignores "BUSYGROUP" errors when group already exists.
func (*RedisStore) ExtendResultTTL ¶ added in v0.2.0
ExtendResultTTL extends the TTL of a stored result. Used to extend result lifetime for workflow-associated jobs so results survive for the workflow's entire execution duration.
func (*RedisStore) FindBatchByPayloadPath ¶
func (s *RedisStore) FindBatchByPayloadPath(ctx context.Context, jsonPath string, value any) (*types.BatchInstance, uint64, error)
FindBatchByPayloadPath scans all batches and returns the first where onetime or any item Payload matches the given JSONPath + value.
func (*RedisStore) FindJobByPayloadPath ¶
func (s *RedisStore) FindJobByPayloadPath(ctx context.Context, jsonPath string, value any, taskType string) (*types.Job, uint64, error)
FindJobByPayloadPath scans jobs and returns the first whose Payload matches the given JSONPath + value. Uses RedisJSON's JSON.GET for native path extraction. If taskType is non-empty, only jobs of that type are scanned (via the by_task_type index).
func (*RedisStore) FindWorkflowByTaskPayloadPath ¶
func (s *RedisStore) FindWorkflowByTaskPayloadPath(ctx context.Context, jsonPath string, value any) (*types.WorkflowInstance, uint64, error)
FindWorkflowByTaskPayloadPath scans all workflows and returns the first where any task's Payload matches the given JSONPath + value.
func (*RedisStore) FlushGroup ¶
func (s *RedisStore) FlushGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
FlushGroup atomically reads and deletes all messages from a group. Uses a single-key Lua script so that concurrent callers on different nodes are safe: only one caller will receive the messages; others get an empty result. Cleanup of the metadata hash and active set is done separately (idempotent).
func (*RedisStore) GetActiveJobStats ¶
GetActiveJobStats returns aggregated counts by job status (alias for monitor API).
func (*RedisStore) GetArchivedTask ¶ added in v0.2.0
func (s *RedisStore) GetArchivedTask(ctx context.Context, wfID, taskName string) (*types.WorkflowTaskState, error)
GetArchivedTask retrieves a single archived task state from the archive hash.
func (*RedisStore) GetBatch ¶
func (s *RedisStore) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
GetBatch retrieves a batch instance by ID.
func (*RedisStore) GetBatchByID ¶
func (s *RedisStore) GetBatchByID(ctx context.Context, id string) (*types.BatchInstance, error)
GetBatchByID retrieves a batch instance by ID (convenience for monitor API).
func (*RedisStore) GetBatchItemResult ¶
func (s *RedisStore) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
GetBatchItemResult retrieves a single batch item result.
func (*RedisStore) GetDailyStats ¶
func (s *RedisStore) GetDailyStats(ctx context.Context, days int) ([]DailyStats, error)
GetDailyStats returns daily stats for the last N days.
func (*RedisStore) GetGroupSize ¶
GetGroupSize returns the number of messages in a group.
func (*RedisStore) GetHistoryRun ¶ added in v0.2.0
func (s *RedisStore) GetHistoryRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
GetHistoryRun retrieves a run from the history store by run ID.
func (*RedisStore) GetJobAuditCount ¶ added in v0.1.3
GetJobAuditCount returns the number of audit entries for a job (XLEN).
func (*RedisStore) GetJobAuditCounts ¶ added in v0.1.3
func (s *RedisStore) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
GetJobAuditCounts returns audit entry counts for multiple jobs via pipelined XLEN.
func (*RedisStore) GetJobAuditTrail ¶ added in v0.1.3
func (s *RedisStore) GetJobAuditTrail(ctx context.Context, jobID string) ([]AuditEntry, error)
GetJobAuditTrail returns the state transition history for a job.
func (*RedisStore) GetJobRevision ¶ added in v0.1.10
GetJobRevision returns only the CAS revision for a job (without full deserialization).
func (*RedisStore) GetJobStats ¶
func (s *RedisStore) GetJobStats(ctx context.Context, _ StatsFilter) (*JobStats, error)
GetJobStats returns aggregated job statistics.
func (*RedisStore) GetNode ¶
GetNode retrieves a node by ID, returns nil if not found or TTL expired.
func (*RedisStore) GetResult ¶
func (s *RedisStore) GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
GetResult retrieves a stored work result by job ID.
func (*RedisStore) GetSchedule ¶
func (s *RedisStore) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
GetSchedule retrieves a schedule entry by job ID.
func (*RedisStore) GetSignalStats ¶ added in v0.2.1
func (s *RedisStore) GetSignalStats(ctx context.Context, workflowID string) (*SignalStats, error)
GetSignalStats returns the number of pending signals and the age of the oldest unacked signal for a workflow. Uses XLEN + XRANGE COUNT 1.
func (*RedisStore) GetWorkflow ¶
func (s *RedisStore) GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
GetWorkflow retrieves a workflow instance by ID.
func (*RedisStore) GetWorkflowByID ¶
func (s *RedisStore) GetWorkflowByID(ctx context.Context, id string) (*types.WorkflowInstance, error)
GetWorkflowByID retrieves a workflow instance by ID (convenience for monitor API).
func (*RedisStore) GetWorkflowEvents ¶ added in v0.1.3
func (s *RedisStore) GetWorkflowEvents(ctx context.Context, workflowID string) ([]types.JobEvent, error)
GetWorkflowEvents returns all indexed events for a workflow, ordered by timestamp.
func (*RedisStore) HasActiveRunForJob ¶
HasActiveRunForJob returns true if any non-terminal run exists for the given jobID. Uses the per-job active run set for O(1) check.
func (*RedisStore) IncrDailyStat ¶
func (s *RedisStore) IncrDailyStat(ctx context.Context, field string)
IncrDailyStat atomically increments the processed or failed counter for today.
func (*RedisStore) IsCancelRequested ¶ added in v0.2.1
IsCancelRequested checks the durable cancel flag for a run.
func (*RedisStore) IsNodeDraining ¶ added in v0.1.3
IsNodeDraining returns true if the node is in drain mode.
func (*RedisStore) IsQueuePaused ¶
IsQueuePaused checks whether a tier/queue is currently paused.
func (*RedisStore) JobStatusCounts ¶
JobStatusCounts returns job counts by status using ZCARD on index sets.
func (*RedisStore) ListActiveRunsByJobID ¶
func (s *RedisStore) ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
ListActiveRunsByJobID returns all active runs belonging to a specific job. Uses the per-job active run set (RunsActiveByJobKey) for O(per-job) lookup.
func (*RedisStore) ListBatchInstances ¶
func (s *RedisStore) ListBatchInstances(ctx context.Context, filter BatchFilter) ([]types.BatchInstance, int, error)
ListBatchInstances returns paginated batch instances.
func (*RedisStore) ListBatchItemResults ¶
func (s *RedisStore) ListBatchItemResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
ListBatchItemResults returns all item results for a batch.
func (*RedisStore) ListBatches ¶
func (s *RedisStore) ListBatches(ctx context.Context) ([]*types.BatchInstance, error)
ListBatches returns all batch instances.
func (*RedisStore) ListBatchesByStatus ¶ added in v0.1.3
func (s *RedisStore) ListBatchesByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.BatchInstance, error)
ListBatchesByStatus returns batch instances with the given status.
func (*RedisStore) ListDLQ ¶
func (s *RedisStore) ListDLQ(ctx context.Context, limit int) ([]*types.WorkMessage, error)
ListDLQ reads recent messages from the DLQ stream.
func (*RedisStore) ListDueBatchDeadlines ¶ added in v0.2.0
func (s *RedisStore) ListDueBatchDeadlines(ctx context.Context, now time.Time) ([]*types.BatchInstance, error)
ListDueBatchDeadlines returns batches whose deadlines have passed. Uses the deadline sorted set for O(due) instead of O(all-running).
func (*RedisStore) ListDueSchedules ¶
func (s *RedisStore) ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
ListDueSchedules returns schedules where NextRunAt <= cutoff. O(log N + M) using sorted set.
func (*RedisStore) ListDueWorkflowDeadlines ¶ added in v0.2.0
func (s *RedisStore) ListDueWorkflowDeadlines(ctx context.Context, now time.Time) ([]*types.WorkflowInstance, error)
ListDueWorkflowDeadlines returns workflows whose deadlines have passed. Uses the deadline sorted set for O(due) instead of O(all-running).
func (*RedisStore) ListGroups ¶
func (s *RedisStore) ListGroups(ctx context.Context) ([]string, error)
ListGroups returns all active group names.
func (*RedisStore) ListJobEvents ¶
func (s *RedisStore) ListJobEvents(ctx context.Context, filter EventFilter) ([]types.JobEvent, int, error)
ListJobEvents returns paginated job events from history.
func (*RedisStore) ListJobRuns ¶
func (s *RedisStore) ListJobRuns(ctx context.Context, filter RunFilter) ([]types.JobRun, int, error)
ListJobRuns returns paginated job runs from history.
func (*RedisStore) ListJobsByStatus ¶ added in v0.1.3
func (s *RedisStore) ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
ListJobsByStatus returns all jobs with the given status.
func (*RedisStore) ListJobsPaginated ¶
func (s *RedisStore) ListJobsPaginated(ctx context.Context, filter JobFilter) ([]types.Job, int, error)
ListJobsPaginated returns a paginated, filtered list of jobs backed by Redis sorted sets.
func (*RedisStore) ListNodes ¶
ListNodes returns all live nodes, pruning stale entries from the tracking set.
func (*RedisStore) ListPausedJobsDueForResume ¶ added in v0.1.10
func (s *RedisStore) ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
ListPausedJobsDueForResume returns paused jobs whose ResumeAt time has passed.
func (*RedisStore) ListPausedQueues ¶
func (s *RedisStore) ListPausedQueues(ctx context.Context) ([]string, error)
ListPausedQueues returns the names of all currently paused queues.
func (*RedisStore) ListReadyGroups ¶
func (s *RedisStore) ListReadyGroups(ctx context.Context, gracePeriod, maxDelay time.Duration, maxSize int) ([]string, error)
ListReadyGroups returns groups that are ready to be flushed based on the given criteria.
func (*RedisStore) ListSchedules ¶
func (s *RedisStore) ListSchedules(ctx context.Context) ([]*types.ScheduleEntry, error)
ListSchedules returns all active schedule entries.
func (*RedisStore) ListSchedulesPaginated ¶
func (s *RedisStore) ListSchedulesPaginated(ctx context.Context, filter ScheduleFilter) ([]types.ScheduleEntry, int, error)
ListSchedulesPaginated returns a paginated list of schedules.
func (*RedisStore) ListWorkflowInstances ¶
func (s *RedisStore) ListWorkflowInstances(ctx context.Context, filter WorkflowFilter) ([]types.WorkflowInstance, int, error)
ListWorkflowInstances returns paginated workflow instances.
func (*RedisStore) ListWorkflows ¶
func (s *RedisStore) ListWorkflows(ctx context.Context) ([]*types.WorkflowInstance, error)
ListWorkflows returns all workflow instances.
func (*RedisStore) ListWorkflowsByStatus ¶ added in v0.1.3
func (s *RedisStore) ListWorkflowsByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.WorkflowInstance, error)
ListWorkflowsByStatus returns workflow instances with the given status.
func (*RedisStore) MoveDelayedToStream ¶
func (s *RedisStore) MoveDelayedToStream(ctx context.Context, tierName string, maxMove int) (int64, error)
MoveDelayedToStream atomically moves ripe delayed messages back to the work stream. Returns the number of messages moved.
func (*RedisStore) PauseQueue ¶
func (s *RedisStore) PauseQueue(ctx context.Context, tierName string) error
PauseQueue marks a tier/queue as paused. Workers will skip paused queues.
func (*RedisStore) PopOverlapBuffer ¶
PopOverlapBuffer retrieves and deletes the buffered dispatch for BUFFER_ONE.
func (*RedisStore) PopOverlapBufferAll ¶
func (s *RedisStore) PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
PopOverlapBufferAll pops the next buffered dispatch from BUFFER_ALL.
func (*RedisStore) Prefix ¶
func (s *RedisStore) Prefix() string
func (*RedisStore) PublishEvent ¶
PublishEvent publishes an event to both Pub/Sub (real-time) and the event stream (history).
func (*RedisStore) PublishJobNotification ¶
func (s *RedisStore) PublishJobNotification(ctx context.Context, jobID, taskType string, priority int) error
PublishJobNotification publishes a lightweight notification to the job:notify Pub/Sub channel. The NotifierActor subscribes to this channel and forwards it to the DispatcherActor.
func (*RedisStore) PublishResult ¶
func (s *RedisStore) PublishResult(ctx context.Context, result types.WorkResult) error
PublishResult stores a work result and notifies waiting clients.
func (*RedisStore) PurgeJobData ¶ added in v0.1.10
func (s *RedisStore) PurgeJobData(ctx context.Context, jobID string) error
PurgeJobData removes the payload and result data from a job while preserving metadata (status, timestamps, task type, etc.). This supports GDPR "right to erasure" by deleting personal data while maintaining audit trails.
func (*RedisStore) PushOverlapBufferAll ¶
func (s *RedisStore) PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
PushOverlapBufferAll appends to the BUFFER_ALL queue.
func (*RedisStore) ReadGroup ¶ added in v0.1.10
func (s *RedisStore) ReadGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
ReadGroup reads all messages from a group WITHOUT deleting them. Use DeleteGroupMessages after successful processing to remove them.
func (*RedisStore) ReadSignals ¶ added in v0.2.0
func (s *RedisStore) ReadSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
ReadSignals reads all pending signals for a workflow without deleting them. Each returned signal has its StreamID populated so the caller can pass processed signals to AckSignals after handling. This prevents signal loss if the caller crashes between read and processing.
func (*RedisStore) ReenqueueWork ¶
func (s *RedisStore) ReenqueueWork(ctx context.Context, tierName string, wm *types.WorkMessage, redeliveries int) error
ReenqueueWork re-adds a work message to the stream without dedup. Used when a worker picks up a message for a task type it doesn't handle.
func (*RedisStore) ReleaseConcurrencySlot ¶ added in v0.2.0
func (s *RedisStore) ReleaseConcurrencySlot(ctx context.Context, concKey, runID string) error
ReleaseConcurrencySlot releases a concurrency slot held by the given run.
func (*RedisStore) RequestCancel ¶ added in v0.2.1
func (s *RedisStore) RequestCancel(ctx context.Context, runID string) error
RequestCancel sets a durable cancel flag for a run and publishes a Pub/Sub signal for immediate delivery. The flag has a 24-hour TTL for auto-cleanup. This dual-write pattern ensures cancellation survives worker disconnects: Pub/Sub is the fast path; the persisted flag is the durable fallback.
func (*RedisStore) SaveBatch ¶
func (s *RedisStore) SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
SaveBatch upserts a batch instance and maintains sorted-set indexes.
func (*RedisStore) SaveBatchInstance ¶
func (s *RedisStore) SaveBatchInstance(ctx context.Context, batch *types.BatchInstance) error
SaveBatchInstance saves a batch instance (alias for history compatibility).
func (*RedisStore) SaveBatchItemResult ¶
func (s *RedisStore) SaveBatchItemResult(ctx context.Context, result *types.BatchItemResult) error
SaveBatchItemResult stores a single batch item result.
func (*RedisStore) SaveJob ¶
SaveJob upserts a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are updated individually (best-effort) for Redis Cluster compatibility.
func (*RedisStore) SaveJobEvent ¶
SaveJobEvent persists a job event to the history indexes.
func (*RedisStore) SaveJobRun ¶
SaveJobRun persists a job run to the history indexes.
func (*RedisStore) SaveOverlapBuffer ¶
func (s *RedisStore) SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
SaveOverlapBuffer stores a buffered dispatch timestamp for BUFFER_ONE policy. Overwrites any previous buffered entry.
func (*RedisStore) SaveRun ¶
SaveRun upserts a job run and maintains the active set and by-job index.
func (*RedisStore) SaveSchedule ¶
func (s *RedisStore) SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
SaveSchedule upserts a schedule and updates the due sorted set.
func (*RedisStore) SaveWorkflow ¶
func (s *RedisStore) SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
SaveWorkflow upserts a workflow instance and maintains sorted-set indexes.
func (*RedisStore) SaveWorkflowInstance ¶
func (s *RedisStore) SaveWorkflowInstance(ctx context.Context, wf *types.WorkflowInstance) error
SaveWorkflowInstance saves a workflow instance (alias for history compatibility).
func (*RedisStore) SendSignal ¶ added in v0.1.3
func (s *RedisStore) SendSignal(ctx context.Context, signal *types.WorkflowSignal) error
SendSignal appends a signal to a workflow's signal stream. If the signal has a DedupeKey, it prevents duplicate signals using SET NX.
func (*RedisStore) SetNodeDrain ¶ added in v0.1.3
SetNodeDrain sets the drain flag for a node. While draining, the worker stops fetching new messages but continues processing in-flight tasks.
func (*RedisStore) SetUniqueKey ¶
func (s *RedisStore) SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
SetUniqueKey atomically claims a unique key for a job ID.
func (*RedisStore) TrimWorkflowEvents ¶ added in v0.2.0
TrimWorkflowEvents trims a workflow's event index to the most recent maxEntries.
func (*RedisStore) UnpauseQueue ¶
func (s *RedisStore) UnpauseQueue(ctx context.Context, tierName string) error
UnpauseQueue removes the pause flag from a tier/queue.
func (*RedisStore) UpdateBatch ¶
func (s *RedisStore) UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
UpdateBatch performs a CAS update on a batch instance.
func (*RedisStore) UpdateJob ¶
func (s *RedisStore) UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
UpdateJob performs a CAS update on a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are diffed and updated individually.
func (*RedisStore) UpdateWorkflow ¶
func (s *RedisStore) UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
UpdateWorkflow performs a CAS update on a workflow instance.
type RedisStoreConfig ¶
type RedisStoreConfig struct {
// KeyPrefix is prepended to all Redis keys (e.g., "prod" → "prod_dureq:...").
KeyPrefix string
// NodeTTL is the TTL for node heartbeat entries. Default: 15s.
NodeTTL time.Duration
// ElectionTTL is the TTL for the leader election key. Default: 10s.
ElectionTTL time.Duration
// LockTTL is the default TTL for distributed lock entries. Default: 30s.
LockTTL time.Duration
// EventStreamMaxLen is the max length of the events stream. Default: 50000.
EventStreamMaxLen int64
// DLQStreamMaxLen is the max length of the DLQ stream. Default: 10000.
DLQStreamMaxLen int64
// ResultTTL is the TTL for job result entries. Default: 1h.
ResultTTL time.Duration
// DedupTTL is the TTL for run ID deduplication keys. Default: 5m.
DedupTTL time.Duration
// RequestDedupTTL is the TTL for client-level request idempotency keys.
// Must exceed the maximum expected client retry window. Default: 1h.
RequestDedupTTL time.Duration
// OverlapBufferTTL is the TTL for buffered overlap dispatch entries.
// Must exceed the longest schedule interval (e.g., monthly cron = 31d).
// Default: 7d.
OverlapBufferTTL time.Duration
// WorkflowResultTTL is the TTL for workflow task result entries.
// Must exceed the longest workflow execution duration. Default: 7d.
WorkflowResultTTL time.Duration
// SideEffectTTL is the TTL for side-effect step cache entries.
// Must exceed the longest handler execution timeout. Default: 24h.
SideEffectTTL time.Duration
// SignalDedupTTL is the TTL for workflow signal deduplication keys.
// Must exceed the longest workflow lifetime. Default: 30d.
SignalDedupTTL time.Duration
// Tiers is the list of user-defined priority tiers. Default: [normal].
Tiers []TierConfig
}
RedisStoreConfig holds configuration for the Redis store.
type RunFilter ¶
type RunFilter struct {
JobID *string
NodeID *string
Status *types.RunStatus
TaskType *types.TaskType
Tags []string
Since *time.Time
Until *time.Time
Limit int
Offset int
Sort string // "newest" (default) or "oldest"
}
RunFilter specifies criteria for listing job runs.
type ScheduleFilter ¶
ScheduleFilter specifies criteria for listing active schedules.
type SignalStats ¶ added in v0.2.1
type SignalStats struct {
PendingCount int64 `json:"pending_count"`
OldestUnackedMs int64 `json:"oldest_unacked_age_ms"`
}
SignalStats holds signal stream statistics for a workflow.
type StatsFilter ¶
type StatsFilter struct {
TaskType *types.TaskType
Tags []string
Since *time.Time
Until *time.Time
}
StatsFilter specifies criteria for aggregated stats queries.
type Store ¶ added in v0.1.10
type Store interface {
// --- Job CRUD ---
SaveJob(ctx context.Context, job *types.Job) (uint64, error)
CreateJob(ctx context.Context, job *types.Job) (uint64, error)
GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)
UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
DeleteJob(ctx context.Context, jobID string) error
ListJobs(ctx context.Context) ([]*types.Job, error)
ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
GetJobRevision(ctx context.Context, jobID string) (uint64, error)
// --- Schedule ---
SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
DeleteSchedule(ctx context.Context, jobID string) error
ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
// --- Run ---
SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)
GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
DeleteRun(ctx context.Context, runID string) error
ListRuns(ctx context.Context) ([]*types.JobRun, error)
ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)
SaveJobRun(ctx context.Context, run *types.JobRun) error
// --- Workflow ---
SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
// --- Batch ---
SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
// --- Node ---
SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)
ListNodes(ctx context.Context) ([]*types.NodeInfo, error)
// --- Events ---
PublishEvent(ctx context.Context, event types.JobEvent) error
PublishResult(ctx context.Context, result types.WorkResult) error
GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
// --- Work dispatch ---
DispatchWork(ctx context.Context, tierName string, msg *types.WorkMessage) (string, error)
AckMessage(ctx context.Context, tierName string, msgID string) error
ReenqueueWork(ctx context.Context, tierName string, msg *types.WorkMessage, redeliveries int) error
AddDelayed(ctx context.Context, tierName string, msg *types.WorkMessage, executeAt time.Time) error
// --- Overlap ---
SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)
PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
// --- Unique keys ---
CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)
SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
DeleteUniqueKey(ctx context.Context, uniqueKey string) error
// --- Stats ---
IncrDailyStat(ctx context.Context, field string)
// --- GDPR ---
PurgeJobData(ctx context.Context, jobID string) error
}
Store defines the core persistence interface for dureq. RedisStore is the primary implementation; this interface enables testing with in-memory stores and future backend alternatives.
type TierConfig ¶
type TierConfig struct {
// Name is the tier identifier (e.g., "urgent", "normal", "background").
Name string
// Weight controls fetch priority. Higher weight = polled first and more frequently.
Weight int
// FetchBatch is the number of messages to read per XREADGROUP call. Default: 10.
FetchBatch int
// RateLimit, if set, limits how many messages per second can be fetched from this tier.
// Uses a distributed token bucket. Zero = no rate limit.
RateLimit float64
// RateBurst is the maximum burst size for the rate limiter. Default: RateLimit.
RateBurst int
}
TierConfig defines a user-customizable priority tier.
func DefaultTiers ¶
func DefaultTiers() []TierConfig
DefaultTiers returns a sensible default tier configuration.
type WorkflowFilter ¶
type WorkflowFilter struct {
WorkflowName *string
Status *types.WorkflowStatus
Sort string // "newest" or "oldest"
Limit int
Offset int
}
WorkflowFilter specifies criteria for listing workflow instances.