Documentation
¶
Index ¶
- Constants
- Variables
- func BatchKey(prefix, id string) string
- func BatchPayloadKey(prefix, id string) string
- func BatchResultKey(prefix, batchID, itemID string) string
- func BatchResultsSetKey(prefix, batchID string) string
- func BatchesByCreatedKey(prefix string) string
- func BatchesByStatusKey(prefix, status string) string
- func CancelChannelKey(prefix string) string
- func DLQStreamKey(prefix string) string
- func DailyStatsKey(prefix, date string) string
- func DedupKey(prefix, runID string) string
- func DelayedKey(prefix, tierName string) string
- func ElectionEpochKey(prefix string) string
- func ElectionLeaderKey(prefix string) string
- func EventsBatchChannel(prefix, batchID string) string
- func EventsPubSubChannel(prefix string) string
- func EventsStreamKey(prefix string) string
- func GroupMessagesKey(prefix, group string) string
- func GroupMetaKey(prefix, group string) string
- func GroupSetKey(prefix string) string
- func HistoryEventKey(prefix, eventID string) string
- func HistoryEventsByJobKey(prefix, jobID string) string
- func HistoryEventsByTypeKey(prefix, eventType string) string
- func HistoryEventsKey(prefix string) string
- func HistoryRunKey(prefix, runID string) string
- func HistoryRunsByJobKey(prefix, jobID string) string
- func HistoryRunsByStatusKey(prefix, status string) string
- func HistoryRunsKey(prefix string) string
- func JobAuditKey(prefix, jobID string) string
- func JobKey(prefix, id string) string
- func JobNotifyChannel(prefix string) string
- func JobPayloadKey(prefix, id string) string
- func JobsByBatchKey(prefix, batchID string) string
- func JobsByCreatedKey(prefix string) string
- func JobsByStatusKey(prefix, status string) string
- func JobsByTagKey(prefix, tag string) string
- func JobsByTaskTypeKey(prefix, taskType string) string
- func JobsByWorkflowKey(prefix, wfID string) string
- func LockKey(prefix, key string) string
- func LuaExtendLockScript() string
- func LuaFlushGroupScript() string
- func LuaLeaderElectWithEpochScript() string
- func LuaLeaderRefreshScript() string
- func LuaLeaderRefreshWithEpochScript() string
- func LuaUnlockIfOwnerScript() string
- func NodeDrainKey(prefix, nodeID string) string
- func NodeKey(prefix, nodeID string) string
- func NodesAllKey(prefix string) string
- func OverlapBufferListKey(prefix, jobID string) string
- func OverlapBufferedKey(prefix, jobID string) string
- func ParseRedisInfo(raw string) map[string]map[string]string
- func QueuePausedKey(prefix, tierName string) string
- func RequestDedupKey(prefix, reqID string) string
- func ResultKey(prefix, jobID string) string
- func ResultNotifyChannel(prefix, jobID string) string
- func RunKey(prefix, runID string) string
- func RunsActiveByJobKey(prefix, jobID string) string
- func RunsActiveKey(prefix string) string
- func RunsByJobKey(prefix, jobID string) string
- func ScheduleKey(prefix, jobID string) string
- func SchedulesByNextKey(prefix string) string
- func SchedulesDueKey(prefix string) string
- func TiersKey(prefix string) string
- func UniqueKeyKey(prefix, uniqueKey string) string
- func WorkMessageFromStreamValues(values map[string]string) types.WorkMessage
- func WorkStreamKey(prefix, tierName string) string
- func WorkflowEventsKey(prefix, wfID string) string
- func WorkflowKey(prefix, id string) string
- func WorkflowPayloadKey(prefix, id string) string
- func WorkflowSignalStreamKey(prefix, wfID string) string
- func WorkflowsByCreatedKey(prefix string) string
- func WorkflowsByStatusKey(prefix, status string) string
- type AuditEntry
- type BatchFilter
- type CompletionBatch
- type DailyStats
- type DelayedPoller
- type EventFilter
- type JobFilter
- type JobStats
- type RedisStore
- func (s *RedisStore) AckMessage(ctx context.Context, tierName, messageID string) error
- func (s *RedisStore) AddDelayed(ctx context.Context, tierName string, wm *types.WorkMessage, ...) error
- func (s *RedisStore) AddToGroup(ctx context.Context, group string, msg types.GroupMessage) (int64, error)
- func (s *RedisStore) CancelChannel() string
- func (s *RedisStore) CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)
- func (s *RedisStore) ClaimStaleMessages(ctx context.Context, tierName, consumerName string, minIdleTime time.Duration, ...) ([]rueidis.XRangeEntry, error)
- func (s *RedisStore) Cleanup(ctx context.Context, olderThan time.Time) (int64, error)
- func (s *RedisStore) Client() rueidis.Client
- func (s *RedisStore) Close() error
- func (s *RedisStore) CompleteRun(ctx context.Context, batch *CompletionBatch) error
- func (s *RedisStore) Config() RedisStoreConfig
- func (s *RedisStore) ConsumeSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
- func (s *RedisStore) CreateJob(ctx context.Context, job *types.Job) (uint64, error)
- func (s *RedisStore) DeleteBatch(ctx context.Context, batchID string) error
- func (s *RedisStore) DeleteGroupMessages(ctx context.Context, group string)
- func (s *RedisStore) DeleteJob(ctx context.Context, jobID string) error
- func (s *RedisStore) DeleteJobAuditTrail(ctx context.Context, jobID string)
- func (s *RedisStore) DeleteRun(ctx context.Context, runID string) error
- func (s *RedisStore) DeleteSchedule(ctx context.Context, jobID string) error
- func (s *RedisStore) DeleteSignalStream(ctx context.Context, workflowID string)
- func (s *RedisStore) DeleteUniqueKey(ctx context.Context, uniqueKey string) error
- func (s *RedisStore) DeleteWorkflow(ctx context.Context, id string) error
- func (s *RedisStore) DeleteWorkflowEvents(ctx context.Context, workflowID string)
- func (s *RedisStore) DispatchToDLQ(ctx context.Context, wm *types.WorkMessage) error
- func (s *RedisStore) DispatchWork(ctx context.Context, tierName string, wm *types.WorkMessage) (string, error)
- func (s *RedisStore) EnsureStreams(ctx context.Context) error
- func (s *RedisStore) FindBatchByPayloadPath(ctx context.Context, jsonPath string, value any) (*types.BatchInstance, uint64, error)
- func (s *RedisStore) FindJobByPayloadPath(ctx context.Context, jsonPath string, value any, taskType string) (*types.Job, uint64, error)
- func (s *RedisStore) FindWorkflowByTaskPayloadPath(ctx context.Context, jsonPath string, value any) (*types.WorkflowInstance, uint64, error)
- func (s *RedisStore) FlushGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
- func (s *RedisStore) GetActiveJobStats(ctx context.Context) (map[types.JobStatus]int, error)
- func (s *RedisStore) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
- func (s *RedisStore) GetBatchByID(ctx context.Context, id string) (*types.BatchInstance, error)
- func (s *RedisStore) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
- func (s *RedisStore) GetDailyStats(ctx context.Context, days int) ([]DailyStats, error)
- func (s *RedisStore) GetGroupSize(ctx context.Context, group string) (int64, error)
- func (s *RedisStore) GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)
- func (s *RedisStore) GetJobAuditCount(ctx context.Context, jobID string) (int64, error)
- func (s *RedisStore) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
- func (s *RedisStore) GetJobAuditTrail(ctx context.Context, jobID string) ([]AuditEntry, error)
- func (s *RedisStore) GetJobRevision(ctx context.Context, jobID string) (uint64, error)
- func (s *RedisStore) GetJobStats(ctx context.Context, _ StatsFilter) (*JobStats, error)
- func (s *RedisStore) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)
- func (s *RedisStore) GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
- func (s *RedisStore) GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
- func (s *RedisStore) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
- func (s *RedisStore) GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
- func (s *RedisStore) GetWorkflowByID(ctx context.Context, id string) (*types.WorkflowInstance, error)
- func (s *RedisStore) GetWorkflowEvents(ctx context.Context, workflowID string) ([]types.JobEvent, error)
- func (s *RedisStore) HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)
- func (s *RedisStore) IncrDailyStat(ctx context.Context, field string)
- func (s *RedisStore) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)
- func (s *RedisStore) IsQueuePaused(ctx context.Context, tierName string) (bool, error)
- func (s *RedisStore) JobStatusCounts(ctx context.Context) (map[types.JobStatus]int, error)
- func (s *RedisStore) ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
- func (s *RedisStore) ListBatchInstances(ctx context.Context, filter BatchFilter) ([]types.BatchInstance, int, error)
- func (s *RedisStore) ListBatchItemResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
- func (s *RedisStore) ListBatches(ctx context.Context) ([]*types.BatchInstance, error)
- func (s *RedisStore) ListBatchesByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.BatchInstance, error)
- func (s *RedisStore) ListDLQ(ctx context.Context, limit int) ([]*types.WorkMessage, error)
- func (s *RedisStore) ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
- func (s *RedisStore) ListGroups(ctx context.Context) ([]string, error)
- func (s *RedisStore) ListJobEvents(ctx context.Context, filter EventFilter) ([]types.JobEvent, int, error)
- func (s *RedisStore) ListJobRuns(ctx context.Context, filter RunFilter) ([]types.JobRun, int, error)
- func (s *RedisStore) ListJobs(ctx context.Context) ([]*types.Job, error)
- func (s *RedisStore) ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
- func (s *RedisStore) ListJobsPaginated(ctx context.Context, filter JobFilter) ([]types.Job, int, error)
- func (s *RedisStore) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)
- func (s *RedisStore) ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
- func (s *RedisStore) ListPausedQueues(ctx context.Context) ([]string, error)
- func (s *RedisStore) ListReadyGroups(ctx context.Context, gracePeriod, maxDelay time.Duration, maxSize int) ([]string, error)
- func (s *RedisStore) ListRuns(ctx context.Context) ([]*types.JobRun, error)
- func (s *RedisStore) ListSchedules(ctx context.Context) ([]*types.ScheduleEntry, error)
- func (s *RedisStore) ListSchedulesPaginated(ctx context.Context, filter ScheduleFilter) ([]types.ScheduleEntry, int, error)
- func (s *RedisStore) ListWorkflowInstances(ctx context.Context, filter WorkflowFilter) ([]types.WorkflowInstance, int, error)
- func (s *RedisStore) ListWorkflows(ctx context.Context) ([]*types.WorkflowInstance, error)
- func (s *RedisStore) ListWorkflowsByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.WorkflowInstance, error)
- func (s *RedisStore) MoveDelayedToStream(ctx context.Context, tierName string, maxMove int) (int64, error)
- func (s *RedisStore) PauseQueue(ctx context.Context, tierName string) error
- func (s *RedisStore) Ping(ctx context.Context) error
- func (s *RedisStore) PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)
- func (s *RedisStore) PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
- func (s *RedisStore) Prefix() string
- func (s *RedisStore) PublishEvent(ctx context.Context, event types.JobEvent) error
- func (s *RedisStore) PublishJobNotification(ctx context.Context, jobID, taskType string, priority int) error
- func (s *RedisStore) PublishResult(ctx context.Context, result types.WorkResult) error
- func (s *RedisStore) PurgeJobData(ctx context.Context, jobID string) error
- func (s *RedisStore) PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
- func (s *RedisStore) ReadGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
- func (s *RedisStore) ReenqueueWork(ctx context.Context, tierName string, wm *types.WorkMessage, redeliveries int) error
- func (s *RedisStore) SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
- func (s *RedisStore) SaveBatchInstance(ctx context.Context, batch *types.BatchInstance) error
- func (s *RedisStore) SaveBatchItemResult(ctx context.Context, result *types.BatchItemResult) error
- func (s *RedisStore) SaveJob(ctx context.Context, job *types.Job) (uint64, error)
- func (s *RedisStore) SaveJobEvent(ctx context.Context, event *types.JobEvent) error
- func (s *RedisStore) SaveJobRun(ctx context.Context, run *types.JobRun) error
- func (s *RedisStore) SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)
- func (s *RedisStore) SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
- func (s *RedisStore) SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)
- func (s *RedisStore) SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
- func (s *RedisStore) SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
- func (s *RedisStore) SaveWorkflowInstance(ctx context.Context, wf *types.WorkflowInstance) error
- func (s *RedisStore) SendSignal(ctx context.Context, signal *types.WorkflowSignal) error
- func (s *RedisStore) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error
- func (s *RedisStore) SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
- func (s *RedisStore) UnpauseQueue(ctx context.Context, tierName string) error
- func (s *RedisStore) UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
- func (s *RedisStore) UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
- func (s *RedisStore) UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
- type RedisStoreConfig
- type RunFilter
- type ScheduleFilter
- type StatsFilter
- type Store
- type TierConfig
- type WorkflowFilter
Constants ¶
const ConsumerGroup = "dureq_workers"
Consumer group names
const OrchestratorConsumerGroup = "dureq_orchestrator"
Variables ¶
var ( ErrCASConflict = errors.New("dureq: CAS conflict") ErrAlreadyExists = errors.New("dureq: already exists") )
Sentinel errors for the Redis store.
Functions ¶
func BatchPayloadKey ¶
func BatchResultsSetKey ¶
func BatchesByCreatedKey ¶
func BatchesByStatusKey ¶
func DelayedKey ¶
func ElectionEpochKey ¶ added in v0.1.3
func ElectionLeaderKey ¶
Election: keys use the {election} hash tag so that both the leader key and the epoch key land in the same Redis Cluster slot, enabling multi-key Lua scripts (elect + refresh with epoch).
func EventsBatchChannel ¶
func EventsStreamKey ¶
func GroupMessagesKey ¶
GroupMessagesKey stores the list of messages for a specific group.
func GroupMetaKey ¶
GroupMetaKey stores group metadata (first_added, last_added timestamps).
func GroupSetKey ¶
GroupSetKey stores the sorted set of groups with their first-added timestamp.
func HistoryEventKey ¶
func HistoryEventsByJobKey ¶
func HistoryEventsByTypeKey ¶
func HistoryEventsKey ¶
func HistoryRunKey ¶
func HistoryRunsByJobKey ¶
func HistoryRunsByStatusKey ¶
func JobAuditKey ¶ added in v0.1.3
Job Audit Trail (per-job state transition history)
func JobNotifyChannel ¶
Job Notification Pub/Sub (dureqv2 actor dispatch)
func JobPayloadKey ¶
JSON Payload mirrors (RedisJSON) — used for JSONPath-based search.
func JobsByBatchKey ¶
func JobsByCreatedKey ¶
func JobsByStatusKey ¶
func JobsByTagKey ¶
func JobsByTaskTypeKey ¶
func JobsByWorkflowKey ¶
func LuaExtendLockScript ¶
func LuaExtendLockScript() string
func LuaFlushGroupScript ¶ added in v0.1.3
func LuaFlushGroupScript() string
func LuaLeaderElectWithEpochScript ¶ added in v0.1.3
func LuaLeaderElectWithEpochScript() string
func LuaLeaderRefreshScript ¶
func LuaLeaderRefreshScript() string
func LuaLeaderRefreshWithEpochScript ¶ added in v0.1.3
func LuaLeaderRefreshWithEpochScript() string
func LuaUnlockIfOwnerScript ¶
func LuaUnlockIfOwnerScript() string
func NodesAllKey ¶
func OverlapBufferListKey ¶
func OverlapBufferedKey ¶
Overlap buffer (schedule overlap policy)
func ParseRedisInfo ¶
ParseRedisInfo parses raw Redis INFO output into a map of sections.
func QueuePausedKey ¶
Queue Pause (hash-tagged with tier for cluster slot co-location)
func RequestDedupKey ¶ added in v0.1.3
func ResultNotifyChannel ¶
func RunsActiveByJobKey ¶
func RunsActiveKey ¶
func RunsByJobKey ¶
func SchedulesByNextKey ¶
func SchedulesDueKey ¶
func UniqueKeyKey ¶
func WorkMessageFromStreamValues ¶
func WorkMessageFromStreamValues(values map[string]string) types.WorkMessage
WorkMessageFromStreamValues constructs a WorkMessage from Redis stream field map.
func WorkStreamKey ¶
Work Streams (per tier). Keys are hash-tagged: the {tierName} tag ensures that the delayed, work, and paused keys for a tier are co-located in the same Redis Cluster slot, enabling multi-key Lua scripts.
func WorkflowEventsKey ¶ added in v0.1.3
Per-Workflow Event Index (sorted set: score=timestamp, member=event stream ID)
func WorkflowPayloadKey ¶
func WorkflowSignalStreamKey ¶ added in v0.1.3
Workflow Signals
func WorkflowsByCreatedKey ¶
func WorkflowsByStatusKey ¶
Types ¶
type AuditEntry ¶ added in v0.1.3
type AuditEntry struct {
ID string `json:"id"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
AuditEntry represents a single state transition in a job's audit trail.
type BatchFilter ¶
type BatchFilter struct {
Name *string
Status *types.WorkflowStatus
Sort string // "newest" or "oldest"
Limit int
Offset int
}
BatchFilter specifies criteria for listing batch instances.
type CompletionBatch ¶
type CompletionBatch struct {
Run *types.JobRun
Event types.JobEvent
Result types.WorkResult
DailyStatField string // "processed" or "failed"
AckTierName string // tier name for XACK (v1 only, empty for v2)
AckMessageID string // stream message ID for XACK (v1 only)
}
CompletionBatch holds pre-computed data for a batched job completion write.
type DailyStats ¶
type DailyStats struct {
Date string `json:"date"`
Processed int64 `json:"processed"`
Failed int64 `json:"failed"`
}
DailyStats holds processed/failed counts for a single date.
type DelayedPoller ¶
type DelayedPoller struct {
// contains filtered or unexported fields
}
DelayedPoller polls delayed sorted sets and moves ripe messages back to work streams.
func NewDelayedPoller ¶
func NewDelayedPoller(store *RedisStore, logger gochainedlog.Logger) *DelayedPoller
NewDelayedPoller creates a delayed retry poller.
func (*DelayedPoller) Start ¶
func (p *DelayedPoller) Start(ctx context.Context)
Start begins polling in the background. Blocks until ctx is cancelled.
type EventFilter ¶
type EventFilter struct {
JobID *string
EventType *types.EventType
Since *time.Time
Until *time.Time
Limit int
Offset int
}
EventFilter specifies criteria for listing job events.
type JobFilter ¶
type JobFilter struct {
Status *types.JobStatus
TaskType *types.TaskType
Tag *string
Search *string // substring match on ID or TaskType
Sort string // "newest" or "oldest"
Limit int
Offset int
}
JobFilter specifies criteria for listing active jobs.
type JobStats ¶
type JobStats struct {
TotalJobs int64 `json:"total_jobs"`
TotalRuns int64 `json:"total_runs"`
ByStatus map[types.JobStatus]int64 `json:"by_status"`
ByTaskType map[types.TaskType]int64 `json:"by_task_type"`
AvgDuration time.Duration `json:"avg_duration"`
SuccessRate float64 `json:"success_rate"`
FailureRate float64 `json:"failure_rate"`
}
JobStats holds aggregated statistics.
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore manages all dureq state in Redis, replacing both NATSStore and HistoryStore.
func NewRedisStore ¶
func NewRedisStore(rdb rueidis.Client, cfg RedisStoreConfig, logger gochainedlog.Logger) (*RedisStore, error)
NewRedisStore creates a new Redis store and registers tier configuration.
func (*RedisStore) AckMessage ¶
func (s *RedisStore) AckMessage(ctx context.Context, tierName, messageID string) error
AckMessage acknowledges a message in a tier's consumer group.
func (*RedisStore) AddDelayed ¶
func (s *RedisStore) AddDelayed(ctx context.Context, tierName string, wm *types.WorkMessage, executeAt time.Time) error
AddDelayed adds a work message to the delayed sorted set for future re-dispatch.
func (*RedisStore) AddToGroup ¶
func (s *RedisStore) AddToGroup(ctx context.Context, group string, msg types.GroupMessage) (int64, error)
AddToGroup adds a message to a named group. Returns the current group size.
func (*RedisStore) CancelChannel ¶
func (s *RedisStore) CancelChannel() string
CancelChannel returns the Redis Pub/Sub channel name used for task cancellation signals.
func (*RedisStore) CheckUniqueKey ¶
CheckUniqueKey checks whether a unique key is claimed. Returns (jobID, exists, error).
func (*RedisStore) ClaimStaleMessages ¶
func (s *RedisStore) ClaimStaleMessages(ctx context.Context, tierName, consumerName string, minIdleTime time.Duration, count int64) ([]rueidis.XRangeEntry, error)
ClaimStaleMessages reclaims messages that have been pending longer than minIdleTime. This handles worker crashes — other nodes pick up abandoned work.
func (*RedisStore) Client ¶
func (s *RedisStore) Client() rueidis.Client
func (*RedisStore) Close ¶
func (s *RedisStore) Close() error
func (*RedisStore) CompleteRun ¶
func (s *RedisStore) CompleteRun(ctx context.Context, batch *CompletionBatch) error
CompleteRun performs SaveRun + SaveJobRun + DeleteRun + IncrDailyStat + PublishEvent + PublishResult + AckMessage. Slot-targeted commands are pipelined via DoMulti; PUBLISH (no-slot) commands are sent separately for Redis Cluster compatibility.
func (*RedisStore) Config ¶
func (s *RedisStore) Config() RedisStoreConfig
func (*RedisStore) ConsumeSignals ¶ added in v0.1.3
func (s *RedisStore) ConsumeSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)
ConsumeSignals reads and acknowledges all pending signals for a workflow. Returns the signals and removes them from the stream.
func (*RedisStore) CreateJob ¶
CreateJob atomically creates a job only if it doesn't already exist. Secondary indexes are added individually after creation.
func (*RedisStore) DeleteBatch ¶
func (s *RedisStore) DeleteBatch(ctx context.Context, batchID string) error
DeleteBatch removes a batch instance and its indexes.
func (*RedisStore) DeleteGroupMessages ¶ added in v0.1.10
func (s *RedisStore) DeleteGroupMessages(ctx context.Context, group string)
DeleteGroupMessages atomically deletes all messages for a group and cleans up metadata. Should be called only after successful processing of ReadGroup results.
func (*RedisStore) DeleteJob ¶
func (s *RedisStore) DeleteJob(ctx context.Context, jobID string) error
DeleteJob removes a job and cleans all secondary indexes individually.
func (*RedisStore) DeleteJobAuditTrail ¶ added in v0.1.3
func (s *RedisStore) DeleteJobAuditTrail(ctx context.Context, jobID string)
DeleteJobAuditTrail removes the audit stream for a job (cleanup).
func (*RedisStore) DeleteRun ¶
func (s *RedisStore) DeleteRun(ctx context.Context, runID string) error
DeleteRun removes a run and cleans its indexes.
func (*RedisStore) DeleteSchedule ¶
func (s *RedisStore) DeleteSchedule(ctx context.Context, jobID string) error
DeleteSchedule removes a schedule and its index entry.
func (*RedisStore) DeleteSignalStream ¶ added in v0.1.3
func (s *RedisStore) DeleteSignalStream(ctx context.Context, workflowID string)
DeleteSignalStream removes the signal stream for a workflow (cleanup).
func (*RedisStore) DeleteUniqueKey ¶
func (s *RedisStore) DeleteUniqueKey(ctx context.Context, uniqueKey string) error
DeleteUniqueKey releases a unique key.
func (*RedisStore) DeleteWorkflow ¶
func (s *RedisStore) DeleteWorkflow(ctx context.Context, id string) error
DeleteWorkflow removes a workflow instance and its indexes.
func (*RedisStore) DeleteWorkflowEvents ¶ added in v0.1.3
func (s *RedisStore) DeleteWorkflowEvents(ctx context.Context, workflowID string)
DeleteWorkflowEvents removes the per-workflow event index (cleanup).
func (*RedisStore) DispatchToDLQ ¶
func (s *RedisStore) DispatchToDLQ(ctx context.Context, wm *types.WorkMessage) error
DispatchToDLQ adds a message to the dead letter queue stream.
func (*RedisStore) DispatchWork ¶
func (s *RedisStore) DispatchWork(ctx context.Context, tierName string, wm *types.WorkMessage) (string, error)
DispatchWork adds a work message to the tier-specific Redis Stream.
func (*RedisStore) EnsureStreams ¶
func (s *RedisStore) EnsureStreams(ctx context.Context) error
EnsureStreams creates consumer groups for all configured tier streams and the DLQ stream. Idempotent — ignores "BUSYGROUP" errors when group already exists.
func (*RedisStore) FindBatchByPayloadPath ¶
func (s *RedisStore) FindBatchByPayloadPath(ctx context.Context, jsonPath string, value any) (*types.BatchInstance, uint64, error)
FindBatchByPayloadPath scans all batches and returns the first one whose onetime payload or any item Payload matches the given JSONPath + value.
func (*RedisStore) FindJobByPayloadPath ¶
func (s *RedisStore) FindJobByPayloadPath(ctx context.Context, jsonPath string, value any, taskType string) (*types.Job, uint64, error)
FindJobByPayloadPath scans jobs and returns the first whose Payload matches the given JSONPath + value. Uses RedisJSON's JSON.GET for native path extraction. If taskType is non-empty, only jobs of that type are scanned (via the by_task_type index).
func (*RedisStore) FindWorkflowByTaskPayloadPath ¶
func (s *RedisStore) FindWorkflowByTaskPayloadPath(ctx context.Context, jsonPath string, value any) (*types.WorkflowInstance, uint64, error)
FindWorkflowByTaskPayloadPath scans all workflows and returns the first where any task's Payload matches the given JSONPath + value.
func (*RedisStore) FlushGroup ¶
func (s *RedisStore) FlushGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
FlushGroup atomically reads and deletes all messages from a group. Uses a single-key Lua script so that concurrent callers on different nodes are safe: only one caller will receive the messages; others get an empty result. Cleanup of the metadata hash and active set is done separately (idempotent).
func (*RedisStore) GetActiveJobStats ¶
GetActiveJobStats returns aggregated counts by job status (alias for monitor API).
func (*RedisStore) GetBatch ¶
func (s *RedisStore) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
GetBatch retrieves a batch instance by ID.
func (*RedisStore) GetBatchByID ¶
func (s *RedisStore) GetBatchByID(ctx context.Context, id string) (*types.BatchInstance, error)
GetBatchByID retrieves a batch instance by ID (convenience for monitor API).
func (*RedisStore) GetBatchItemResult ¶
func (s *RedisStore) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
GetBatchItemResult retrieves a single batch item result.
func (*RedisStore) GetDailyStats ¶
func (s *RedisStore) GetDailyStats(ctx context.Context, days int) ([]DailyStats, error)
GetDailyStats returns daily stats for the last N days.
func (*RedisStore) GetGroupSize ¶
GetGroupSize returns the number of messages in a group.
func (*RedisStore) GetJobAuditCount ¶ added in v0.1.3
GetJobAuditCount returns the number of audit entries for a job (XLEN).
func (*RedisStore) GetJobAuditCounts ¶ added in v0.1.3
func (s *RedisStore) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
GetJobAuditCounts returns audit entry counts for multiple jobs via pipelined XLEN.
func (*RedisStore) GetJobAuditTrail ¶ added in v0.1.3
func (s *RedisStore) GetJobAuditTrail(ctx context.Context, jobID string) ([]AuditEntry, error)
GetJobAuditTrail returns the state transition history for a job.
func (*RedisStore) GetJobRevision ¶ added in v0.1.10
GetJobRevision returns only the CAS revision for a job (without full deserialization).
func (*RedisStore) GetJobStats ¶
func (s *RedisStore) GetJobStats(ctx context.Context, _ StatsFilter) (*JobStats, error)
GetJobStats returns aggregated job statistics.
func (*RedisStore) GetNode ¶
GetNode retrieves a node by ID, returns nil if not found or TTL expired.
func (*RedisStore) GetResult ¶
func (s *RedisStore) GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
GetResult retrieves a stored work result by job ID.
func (*RedisStore) GetSchedule ¶
func (s *RedisStore) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
GetSchedule retrieves a schedule entry by job ID.
func (*RedisStore) GetWorkflow ¶
func (s *RedisStore) GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
GetWorkflow retrieves a workflow instance by ID.
func (*RedisStore) GetWorkflowByID ¶
func (s *RedisStore) GetWorkflowByID(ctx context.Context, id string) (*types.WorkflowInstance, error)
GetWorkflowByID retrieves a workflow instance by ID (convenience for monitor API).
func (*RedisStore) GetWorkflowEvents ¶ added in v0.1.3
func (s *RedisStore) GetWorkflowEvents(ctx context.Context, workflowID string) ([]types.JobEvent, error)
GetWorkflowEvents returns all indexed events for a workflow, ordered by timestamp.
func (*RedisStore) HasActiveRunForJob ¶
HasActiveRunForJob returns true if any non-terminal run exists for the given jobID. Uses the per-job active run set for O(1) check.
func (*RedisStore) IncrDailyStat ¶
func (s *RedisStore) IncrDailyStat(ctx context.Context, field string)
IncrDailyStat atomically increments the processed or failed counter for today.
func (*RedisStore) IsNodeDraining ¶ added in v0.1.3
IsNodeDraining returns true if the node is in drain mode.
func (*RedisStore) IsQueuePaused ¶
IsQueuePaused checks whether a tier/queue is currently paused.
func (*RedisStore) JobStatusCounts ¶
JobStatusCounts returns job counts by status using ZCARD on index sets.
func (*RedisStore) ListActiveRunsByJobID ¶
func (s *RedisStore) ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
ListActiveRunsByJobID returns all active runs belonging to a specific job. Uses the per-job active run set (RunsActiveByJobKey), so the lookup is scoped to that job's set.
func (*RedisStore) ListBatchInstances ¶
func (s *RedisStore) ListBatchInstances(ctx context.Context, filter BatchFilter) ([]types.BatchInstance, int, error)
ListBatchInstances returns paginated batch instances.
func (*RedisStore) ListBatchItemResults ¶
func (s *RedisStore) ListBatchItemResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
ListBatchItemResults returns all item results for a batch.
func (*RedisStore) ListBatches ¶
func (s *RedisStore) ListBatches(ctx context.Context) ([]*types.BatchInstance, error)
ListBatches returns all batch instances.
func (*RedisStore) ListBatchesByStatus ¶ added in v0.1.3
func (s *RedisStore) ListBatchesByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.BatchInstance, error)
ListBatchesByStatus returns batch instances with the given status.
func (*RedisStore) ListDLQ ¶
func (s *RedisStore) ListDLQ(ctx context.Context, limit int) ([]*types.WorkMessage, error)
ListDLQ reads recent messages from the DLQ stream.
func (*RedisStore) ListDueSchedules ¶
func (s *RedisStore) ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
ListDueSchedules returns schedules where NextRunAt <= cutoff. O(log N + M) using sorted set.
func (*RedisStore) ListGroups ¶
func (s *RedisStore) ListGroups(ctx context.Context) ([]string, error)
ListGroups returns all active group names.
func (*RedisStore) ListJobEvents ¶
func (s *RedisStore) ListJobEvents(ctx context.Context, filter EventFilter) ([]types.JobEvent, int, error)
ListJobEvents returns paginated job events from history.
func (*RedisStore) ListJobRuns ¶
func (s *RedisStore) ListJobRuns(ctx context.Context, filter RunFilter) ([]types.JobRun, int, error)
ListJobRuns returns paginated job runs from history.
func (*RedisStore) ListJobsByStatus ¶ added in v0.1.3
func (s *RedisStore) ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
ListJobsByStatus returns all jobs with the given status.
func (*RedisStore) ListJobsPaginated ¶
func (s *RedisStore) ListJobsPaginated(ctx context.Context, filter JobFilter) ([]types.Job, int, error)
ListJobsPaginated returns a paginated, filtered list of jobs backed by Redis sorted sets.
func (*RedisStore) ListNodes ¶
ListNodes returns all live nodes, pruning stale entries from the tracking set.
func (*RedisStore) ListPausedJobsDueForResume ¶ added in v0.1.10
func (s *RedisStore) ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
ListPausedJobsDueForResume returns paused jobs whose ResumeAt time has passed.
func (*RedisStore) ListPausedQueues ¶
func (s *RedisStore) ListPausedQueues(ctx context.Context) ([]string, error)
ListPausedQueues returns the names of all currently paused queues.
func (*RedisStore) ListReadyGroups ¶
func (s *RedisStore) ListReadyGroups(ctx context.Context, gracePeriod, maxDelay time.Duration, maxSize int) ([]string, error)
ListReadyGroups returns groups that are ready to be flushed based on the given criteria.
func (*RedisStore) ListSchedules ¶
func (s *RedisStore) ListSchedules(ctx context.Context) ([]*types.ScheduleEntry, error)
ListSchedules returns all active schedule entries.
func (*RedisStore) ListSchedulesPaginated ¶
func (s *RedisStore) ListSchedulesPaginated(ctx context.Context, filter ScheduleFilter) ([]types.ScheduleEntry, int, error)
ListSchedulesPaginated returns a paginated list of schedules.
func (*RedisStore) ListWorkflowInstances ¶
func (s *RedisStore) ListWorkflowInstances(ctx context.Context, filter WorkflowFilter) ([]types.WorkflowInstance, int, error)
ListWorkflowInstances returns paginated workflow instances.
func (*RedisStore) ListWorkflows ¶
func (s *RedisStore) ListWorkflows(ctx context.Context) ([]*types.WorkflowInstance, error)
ListWorkflows returns all workflow instances.
func (*RedisStore) ListWorkflowsByStatus ¶ added in v0.1.3
func (s *RedisStore) ListWorkflowsByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.WorkflowInstance, error)
ListWorkflowsByStatus returns workflow instances with the given status.
func (*RedisStore) MoveDelayedToStream ¶
func (s *RedisStore) MoveDelayedToStream(ctx context.Context, tierName string, maxMove int) (int64, error)
MoveDelayedToStream atomically moves ripe delayed messages back to the work stream. Returns the number of messages moved.
func (*RedisStore) PauseQueue ¶
func (s *RedisStore) PauseQueue(ctx context.Context, tierName string) error
PauseQueue marks a tier/queue as paused. Workers will skip paused queues.
func (*RedisStore) PopOverlapBuffer ¶
PopOverlapBuffer retrieves and deletes the buffered dispatch for BUFFER_ONE.
func (*RedisStore) PopOverlapBufferAll ¶
func (s *RedisStore) PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
PopOverlapBufferAll pops the next buffered dispatch from BUFFER_ALL.
func (*RedisStore) Prefix ¶
func (s *RedisStore) Prefix() string
func (*RedisStore) PublishEvent ¶
PublishEvent publishes an event to both Pub/Sub (real-time) and the event stream (history).
func (*RedisStore) PublishJobNotification ¶
func (s *RedisStore) PublishJobNotification(ctx context.Context, jobID, taskType string, priority int) error
PublishJobNotification publishes a lightweight notification to the job:notify Pub/Sub channel. The NotifierActor subscribes to this channel and forwards each notification to the DispatcherActor.
func (*RedisStore) PublishResult ¶
func (s *RedisStore) PublishResult(ctx context.Context, result types.WorkResult) error
PublishResult stores a work result and notifies waiting clients.
func (*RedisStore) PurgeJobData ¶ added in v0.1.10
func (s *RedisStore) PurgeJobData(ctx context.Context, jobID string) error
PurgeJobData removes the payload and result data from a job while preserving metadata (status, timestamps, task type, etc.). This supports GDPR "right to erasure" by deleting personal data while maintaining audit trails.
func (*RedisStore) PushOverlapBufferAll ¶
func (s *RedisStore) PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
PushOverlapBufferAll appends to the BUFFER_ALL queue.
func (*RedisStore) ReadGroup ¶ added in v0.1.10
func (s *RedisStore) ReadGroup(ctx context.Context, group string) ([]types.GroupMessage, error)
ReadGroup reads all messages from a group WITHOUT deleting them. Use DeleteGroupMessages after successful processing to remove them.
func (*RedisStore) ReenqueueWork ¶
func (s *RedisStore) ReenqueueWork(ctx context.Context, tierName string, wm *types.WorkMessage, redeliveries int) error
ReenqueueWork re-adds a work message to the stream without dedup. Used when a worker picks up a message for a task type it doesn't handle.
func (*RedisStore) SaveBatch ¶
func (s *RedisStore) SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
SaveBatch upserts a batch instance and maintains sorted-set indexes.
func (*RedisStore) SaveBatchInstance ¶
func (s *RedisStore) SaveBatchInstance(ctx context.Context, batch *types.BatchInstance) error
SaveBatchInstance saves a batch instance (alias for history compatibility).
func (*RedisStore) SaveBatchItemResult ¶
func (s *RedisStore) SaveBatchItemResult(ctx context.Context, result *types.BatchItemResult) error
SaveBatchItemResult stores a single batch item result.
func (*RedisStore) SaveJob ¶
SaveJob upserts a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are updated individually (best-effort) for Redis Cluster compatibility.
func (*RedisStore) SaveJobEvent ¶
SaveJobEvent persists a job event to the history indexes.
func (*RedisStore) SaveJobRun ¶
SaveJobRun persists a job run to the history indexes.
func (*RedisStore) SaveOverlapBuffer ¶
func (s *RedisStore) SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
SaveOverlapBuffer stores a buffered dispatch timestamp for BUFFER_ONE policy. Overwrites any previous buffered entry.
func (*RedisStore) SaveRun ¶
SaveRun upserts a job run and maintains the active set and by-job index.
func (*RedisStore) SaveSchedule ¶
func (s *RedisStore) SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
SaveSchedule upserts a schedule and updates the due sorted set.
func (*RedisStore) SaveWorkflow ¶
func (s *RedisStore) SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
SaveWorkflow upserts a workflow instance and maintains sorted-set indexes.
func (*RedisStore) SaveWorkflowInstance ¶
func (s *RedisStore) SaveWorkflowInstance(ctx context.Context, wf *types.WorkflowInstance) error
SaveWorkflowInstance saves a workflow instance (alias for history compatibility).
func (*RedisStore) SendSignal ¶ added in v0.1.3
func (s *RedisStore) SendSignal(ctx context.Context, signal *types.WorkflowSignal) error
SendSignal appends a signal to a workflow's signal stream.
func (*RedisStore) SetNodeDrain ¶ added in v0.1.3
SetNodeDrain sets the drain flag for a node. While draining, the worker stops fetching new messages but continues processing in-flight tasks.
func (*RedisStore) SetUniqueKey ¶
func (s *RedisStore) SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
SetUniqueKey atomically claims a unique key for a job ID.
func (*RedisStore) UnpauseQueue ¶
func (s *RedisStore) UnpauseQueue(ctx context.Context, tierName string) error
UnpauseQueue removes the pause flag from a tier/queue.
func (*RedisStore) UpdateBatch ¶
func (s *RedisStore) UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
UpdateBatch performs a CAS update on a batch instance.
func (*RedisStore) UpdateJob ¶
func (s *RedisStore) UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
UpdateJob performs a CAS update on a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are diffed and updated individually.
func (*RedisStore) UpdateWorkflow ¶
func (s *RedisStore) UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
UpdateWorkflow performs a CAS update on a workflow instance.
type RedisStoreConfig ¶
// RedisStoreConfig holds configuration for the Redis store.
//
// Each field comment states the documented default. How a zero value is
// mapped to that default is not visible in this declaration.
type RedisStoreConfig struct {
// KeyPrefix is prepended to all Redis keys (e.g., "prod" → "prod_dureq:...").
KeyPrefix string
// NodeTTL is the TTL for node heartbeat entries. Default: 15s.
NodeTTL time.Duration
// ElectionTTL is the TTL for the leader election key. Default: 10s.
ElectionTTL time.Duration
// LockTTL is the default TTL for distributed lock entries. Default: 30s.
LockTTL time.Duration
// EventStreamMaxLen is the max length of the events stream. Default: 50000.
EventStreamMaxLen int64
// DLQStreamMaxLen is the max length of the DLQ stream. Default: 10000.
DLQStreamMaxLen int64
// ResultTTL is the TTL for job result entries. Default: 1h.
ResultTTL time.Duration
// DedupTTL is the TTL for run ID deduplication keys. Default: 5m.
DedupTTL time.Duration
// Tiers is the list of user-defined priority tiers. Default: [normal].
Tiers []TierConfig
}
RedisStoreConfig holds configuration for the Redis store.
type RunFilter ¶
// RunFilter specifies criteria for listing job runs. It is the filter
// argument of RedisStore.ListJobRuns, which returns paginated job runs.
//
// NOTE(review): the pointer and slice fields look optional (nil meaning
// "no constraint") — confirm against the ListJobRuns implementation.
type RunFilter struct {
JobID *string // presumably restricts results to runs of this job — TODO confirm
NodeID *string // presumably restricts results to runs on this node — TODO confirm
Status *types.RunStatus // presumably restricts results to this run status — TODO confirm
TaskType *types.TaskType // presumably restricts results to this task type — TODO confirm
Tags []string // NOTE(review): unclear whether tags are matched as ANY or ALL — confirm
Since *time.Time // presumably the lower time bound; inclusivity not shown here — TODO confirm
Until *time.Time // presumably the upper time bound; inclusivity not shown here — TODO confirm
Limit int // pagination: presumably the page size — TODO confirm zero-value behavior
Offset int // pagination: presumably the number of results to skip — TODO confirm
Sort string // "newest" (default) or "oldest"
}
RunFilter specifies criteria for listing job runs.
type ScheduleFilter ¶
ScheduleFilter specifies criteria for listing active schedules.
type StatsFilter ¶
// StatsFilter specifies criteria for aggregated stats queries.
//
// NOTE(review): the pointer and slice fields look optional (nil meaning
// "no constraint") — confirm against the stats query implementation.
type StatsFilter struct {
TaskType *types.TaskType // presumably restricts the aggregation to this task type — TODO confirm
Tags []string // NOTE(review): unclear whether tags are matched as ANY or ALL — confirm
Since *time.Time // presumably the lower time bound; inclusivity not shown here — TODO confirm
Until *time.Time // presumably the upper time bound; inclusivity not shown here — TODO confirm
}
StatsFilter specifies criteria for aggregated stats queries.
type Store ¶ added in v0.1.10
// Store defines the core persistence interface for dureq. RedisStore is the
// primary implementation; the interface exists so tests can use in-memory
// stores and so alternative backends can be added later.
//
// Method comments below restate what the RedisStore implementation documents.
//
// NOTE(review): the uint64 returned by the Save*/Create*/Get*/Update* methods
// appears to be a revision number that pairs with the expectedRev parameters —
// confirm against the implementation.
type Store interface {
// --- Job CRUD ---
// SaveJob upserts a job.
SaveJob(ctx context.Context, job *types.Job) (uint64, error)
CreateJob(ctx context.Context, job *types.Job) (uint64, error)
GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)
// UpdateJob performs a compare-and-swap (CAS) update on a job using expectedRev.
UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
DeleteJob(ctx context.Context, jobID string) error
ListJobs(ctx context.Context) ([]*types.Job, error)
// ListJobsByStatus returns all jobs with the given status.
ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
// ListPausedJobsDueForResume returns paused jobs whose ResumeAt time has passed.
ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
GetJobRevision(ctx context.Context, jobID string) (uint64, error)
// --- Schedule ---
// SaveSchedule upserts a schedule entry.
SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
DeleteSchedule(ctx context.Context, jobID string) error
// ListDueSchedules returns schedules where NextRunAt <= cutoff.
ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)
// --- Run ---
// SaveRun upserts a job run.
SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)
GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
DeleteRun(ctx context.Context, runID string) error
ListRuns(ctx context.Context) ([]*types.JobRun, error)
// ListActiveRunsByJobID returns all active runs belonging to a specific job.
ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)
// SaveJobRun persists a job run to the history indexes (distinct from SaveRun).
SaveJobRun(ctx context.Context, run *types.JobRun) error
// --- Workflow ---
// SaveWorkflow upserts a workflow instance.
SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
// UpdateWorkflow performs a CAS update on a workflow instance using expectedRev.
UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)
// --- Batch ---
// SaveBatch upserts a batch instance.
SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
// UpdateBatch performs a CAS update on a batch instance using expectedRev.
UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)
// --- Node ---
SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)
// ListNodes returns all live nodes.
ListNodes(ctx context.Context) ([]*types.NodeInfo, error)
// --- Events ---
// PublishEvent publishes a job event (RedisStore sends it to both Pub/Sub and the event stream).
PublishEvent(ctx context.Context, event types.JobEvent) error
// PublishResult stores a work result and notifies waiting clients.
PublishResult(ctx context.Context, result types.WorkResult) error
GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)
// --- Work dispatch ---
// NOTE(review): the string returned by DispatchWork is presumably the message ID later passed to AckMessage — confirm.
DispatchWork(ctx context.Context, tierName string, msg *types.WorkMessage) (string, error)
AckMessage(ctx context.Context, tierName string, msgID string) error
// ReenqueueWork re-adds a work message to the stream without dedup. Used when a
// worker picks up a message for a task type it doesn't handle.
ReenqueueWork(ctx context.Context, tierName string, msg *types.WorkMessage, redeliveries int) error
AddDelayed(ctx context.Context, tierName string, msg *types.WorkMessage, executeAt time.Time) error
// --- Overlap ---
// SaveOverlapBuffer stores a buffered dispatch timestamp for the BUFFER_ONE policy,
// overwriting any previous buffered entry.
SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
// PopOverlapBuffer retrieves and deletes the buffered dispatch for BUFFER_ONE.
// NOTE(review): the bool presumably reports whether an entry was present — confirm.
PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)
// PushOverlapBufferAll appends to the BUFFER_ALL queue.
PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
// PopOverlapBufferAll pops the next buffered dispatch from BUFFER_ALL.
// NOTE(review): the bool presumably reports whether an entry was present — confirm.
PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)
// --- Unique keys ---
// NOTE(review): CheckUniqueKey presumably returns the owning job ID and whether the key exists — confirm.
CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)
// SetUniqueKey atomically claims a unique key for a job ID.
SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
DeleteUniqueKey(ctx context.Context, uniqueKey string) error
// --- Stats ---
// IncrDailyStat has no return value, so callers cannot observe a failure.
IncrDailyStat(ctx context.Context, field string)
// --- GDPR ---
// PurgeJobData removes the payload and result data from a job while preserving
// metadata (status, timestamps, task type, etc.).
PurgeJobData(ctx context.Context, jobID string) error
}
Store defines the core persistence interface for dureq. RedisStore is the primary implementation; this interface enables testing with in-memory stores and future backend alternatives.
type TierConfig ¶
// TierConfig defines a user-customizable priority tier.
type TierConfig struct {
// Name is the tier identifier (e.g., "urgent", "normal", "background").
Name string
// Weight controls fetch priority. A higher weight means the tier is polled
// first and more frequently.
Weight int
// FetchBatch is the number of messages to read per XREADGROUP call. Default: 10.
FetchBatch int
// RateLimit, if set, limits how many messages per second can be fetched from this tier.
// Uses a distributed token bucket. Zero = no rate limit.
RateLimit float64
// RateBurst is the maximum burst size for the rate limiter. Default: RateLimit.
RateBurst int
}
TierConfig defines a user-customizable priority tier.
func DefaultTiers ¶
func DefaultTiers() []TierConfig
DefaultTiers returns a sensible default tier configuration.
type WorkflowFilter ¶
// WorkflowFilter specifies criteria for listing workflow instances. It is the
// filter argument of RedisStore.ListWorkflowInstances, which returns paginated
// workflow instances.
//
// NOTE(review): the pointer fields look optional (nil meaning "no
// constraint") — confirm against the ListWorkflowInstances implementation.
type WorkflowFilter struct {
WorkflowName *string // presumably restricts results to workflows with this name — TODO confirm
Status *types.WorkflowStatus // presumably restricts results to this workflow status — TODO confirm
Sort string // "newest" or "oldest"
Limit int // pagination: presumably the page size — TODO confirm zero-value behavior
Offset int // pagination: presumably the number of results to skip — TODO confirm
}
WorkflowFilter specifies criteria for listing workflow instances.