store

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const ConsumerGroup = "dureq_workers"

Consumer group names

View Source
const OrchestratorConsumerGroup = "dureq_orchestrator"

Variables

View Source
var (
	ErrCASConflict   = errors.New("dureq: CAS conflict")
	ErrAlreadyExists = errors.New("dureq: already exists")
)

Sentinel errors for the Redis store.

Functions

func BatchKey

func BatchKey(prefix, id string) string

Batches

func BatchPayloadKey

func BatchPayloadKey(prefix, id string) string

func BatchResultKey

func BatchResultKey(prefix, batchID, itemID string) string

Batch Results

func BatchResultsSetKey

func BatchResultsSetKey(prefix, batchID string) string

func BatchesByCreatedKey

func BatchesByCreatedKey(prefix string) string

func BatchesByStatusKey

func BatchesByStatusKey(prefix, status string) string

func CancelChannelKey

func CancelChannelKey(prefix string) string

Task Cancellation Pub/Sub

func DLQStreamKey

func DLQStreamKey(prefix string) string

DLQ

func DailyStatsKey

func DailyStatsKey(prefix, date string) string

Daily Stats

func DedupKey

func DedupKey(prefix, runID string) string

Deduplication

func DelayedKey

func DelayedKey(prefix, tierName string) string

func ElectionEpochKey added in v0.1.3

func ElectionEpochKey(prefix string) string

func ElectionLeaderKey

func ElectionLeaderKey(prefix string) string

Election

Election keys use the {election} hash tag so that both keys land in the same Redis Cluster slot, enabling multi-key Lua scripts (elect + refresh with epoch).

func EventsBatchChannel

func EventsBatchChannel(prefix, batchID string) string

func EventsPubSubChannel

func EventsPubSubChannel(prefix string) string

Events

func EventsStreamKey

func EventsStreamKey(prefix string) string

func GroupMessagesKey

func GroupMessagesKey(prefix, group string) string

GroupMessagesKey stores the list of messages for a specific group.

func GroupMetaKey

func GroupMetaKey(prefix, group string) string

GroupMetaKey stores group metadata (first_added, last_added timestamps).

func GroupSetKey

func GroupSetKey(prefix string) string

GroupSetKey stores the sorted set of groups with their first-added timestamp.

func HistoryEventKey

func HistoryEventKey(prefix, eventID string) string

func HistoryEventsByJobKey

func HistoryEventsByJobKey(prefix, jobID string) string

func HistoryEventsByTypeKey

func HistoryEventsByTypeKey(prefix, eventType string) string

func HistoryEventsKey

func HistoryEventsKey(prefix string) string

func HistoryRunKey

func HistoryRunKey(prefix, runID string) string

func HistoryRunsByJobKey

func HistoryRunsByJobKey(prefix, jobID string) string

func HistoryRunsByStatusKey

func HistoryRunsByStatusKey(prefix, status string) string

func HistoryRunsKey

func HistoryRunsKey(prefix string) string

History

func JobAuditKey added in v0.1.3

func JobAuditKey(prefix, jobID string) string

Job Audit Trail (per-job state transition history)

func JobKey

func JobKey(prefix, id string) string

Key patterns for all Redis data structures.

Jobs

func JobNotifyChannel

func JobNotifyChannel(prefix string) string

Job Notification Pub/Sub (dureqv2 actor dispatch)

func JobPayloadKey

func JobPayloadKey(prefix, id string) string

JSON Payload mirrors (RedisJSON) — used for JSONPath-based search.

func JobsByBatchKey

func JobsByBatchKey(prefix, batchID string) string

func JobsByCreatedKey

func JobsByCreatedKey(prefix string) string

func JobsByStatusKey

func JobsByStatusKey(prefix, status string) string

func JobsByTagKey

func JobsByTagKey(prefix, tag string) string

func JobsByTaskTypeKey

func JobsByTaskTypeKey(prefix, taskType string) string

func JobsByWorkflowKey

func JobsByWorkflowKey(prefix, wfID string) string

func LockKey

func LockKey(prefix, key string) string

Locks

func LuaExtendLockScript

func LuaExtendLockScript() string

func LuaFlushGroupScript added in v0.1.3

func LuaFlushGroupScript() string

func LuaLeaderElectWithEpochScript added in v0.1.3

func LuaLeaderElectWithEpochScript() string

func LuaLeaderRefreshScript

func LuaLeaderRefreshScript() string

func LuaLeaderRefreshWithEpochScript added in v0.1.3

func LuaLeaderRefreshWithEpochScript() string

func LuaUnlockIfOwnerScript

func LuaUnlockIfOwnerScript() string

func NodeDrainKey added in v0.1.3

func NodeDrainKey(prefix, nodeID string) string

Node Drain Flag

func NodeKey

func NodeKey(prefix, nodeID string) string

Nodes

func NodesAllKey

func NodesAllKey(prefix string) string

func OverlapBufferListKey

func OverlapBufferListKey(prefix, jobID string) string

func OverlapBufferedKey

func OverlapBufferedKey(prefix, jobID string) string

Overlap buffer (schedule overlap policy)

func ParseRedisInfo

func ParseRedisInfo(raw string) map[string]map[string]string

ParseRedisInfo parses raw Redis INFO output into a map of sections.

func QueuePausedKey

func QueuePausedKey(prefix, tierName string) string

Queue Pause (hash-tagged with tier for cluster slot co-location)

func RequestDedupKey added in v0.1.3

func RequestDedupKey(prefix, reqID string) string

func ResultKey

func ResultKey(prefix, jobID string) string

Results

func ResultNotifyChannel

func ResultNotifyChannel(prefix, jobID string) string

func RunKey

func RunKey(prefix, runID string) string

Runs

func RunsActiveByJobKey

func RunsActiveByJobKey(prefix, jobID string) string

func RunsActiveKey

func RunsActiveKey(prefix string) string

func RunsByJobKey

func RunsByJobKey(prefix, jobID string) string

func ScheduleKey

func ScheduleKey(prefix, jobID string) string

Schedules

func SchedulesByNextKey

func SchedulesByNextKey(prefix string) string

func SchedulesDueKey

func SchedulesDueKey(prefix string) string

func TiersKey

func TiersKey(prefix string) string

func UniqueKeyKey

func UniqueKeyKey(prefix, uniqueKey string) string

func WorkMessageFromStreamValues

func WorkMessageFromStreamValues(values map[string]string) types.WorkMessage

WorkMessageFromStreamValues constructs a WorkMessage from Redis stream field map.

func WorkStreamKey

func WorkStreamKey(prefix, tierName string) string

Work Streams (per tier)

Hash-tagged keys: the {tierName} hash tag ensures that the delayed, work, and paused keys for a tier are co-located in the same Redis Cluster slot, enabling multi-key Lua scripts.

func WorkflowEventsKey added in v0.1.3

func WorkflowEventsKey(prefix, wfID string) string

Per-Workflow Event Index (sorted set: score=timestamp, member=event stream ID)

func WorkflowKey

func WorkflowKey(prefix, id string) string

Workflows

func WorkflowPayloadKey

func WorkflowPayloadKey(prefix, id string) string

func WorkflowSignalStreamKey added in v0.1.3

func WorkflowSignalStreamKey(prefix, wfID string) string

Workflow Signals

func WorkflowsByCreatedKey

func WorkflowsByCreatedKey(prefix string) string

func WorkflowsByStatusKey

func WorkflowsByStatusKey(prefix, status string) string

Types

type AuditEntry added in v0.1.3

type AuditEntry struct {
	ID        string    `json:"id"`
	Status    string    `json:"status"`
	Error     string    `json:"error,omitempty"`
	Timestamp time.Time `json:"timestamp"`
}

AuditEntry represents a single state transition in a job's audit trail.

type BatchFilter

type BatchFilter struct {
	Name   *string
	Status *types.WorkflowStatus
	Sort   string // "newest" or "oldest"
	Limit  int
	Offset int
}

BatchFilter specifies criteria for listing batch instances.

type CompletionBatch

type CompletionBatch struct {
	Run            *types.JobRun
	Event          types.JobEvent
	Result         types.WorkResult
	DailyStatField string // "processed" or "failed"
	AckTierName    string // tier name for XACK (v1 only, empty for v2)
	AckMessageID   string // stream message ID for XACK (v1 only)
}

CompletionBatch holds pre-computed data for a batched job completion write.

type DailyStats

type DailyStats struct {
	Date      string `json:"date"`
	Processed int64  `json:"processed"`
	Failed    int64  `json:"failed"`
}

DailyStats holds processed/failed counts for a single date.

type DelayedPoller

type DelayedPoller struct {
	// contains filtered or unexported fields
}

DelayedPoller polls delayed sorted sets and moves ripe messages back to work streams.

func NewDelayedPoller

func NewDelayedPoller(store *RedisStore, logger gochainedlog.Logger) *DelayedPoller

NewDelayedPoller creates a delayed retry poller.

func (*DelayedPoller) Start

func (p *DelayedPoller) Start(ctx context.Context)

Start begins polling in the background. Blocks until ctx is cancelled.

type EventFilter

type EventFilter struct {
	JobID     *string
	EventType *types.EventType
	Since     *time.Time
	Until     *time.Time
	Limit     int
	Offset    int
}

EventFilter specifies criteria for listing job events.

type JobFilter

type JobFilter struct {
	Status   *types.JobStatus
	TaskType *types.TaskType
	Tag      *string
	Search   *string // substring match on ID or TaskType
	Sort     string  // "newest" or "oldest"
	Limit    int
	Offset   int
}

JobFilter specifies criteria for listing active jobs.

type JobStats

type JobStats struct {
	TotalJobs   int64                     `json:"total_jobs"`
	TotalRuns   int64                     `json:"total_runs"`
	ByStatus    map[types.JobStatus]int64 `json:"by_status"`
	ByTaskType  map[types.TaskType]int64  `json:"by_task_type"`
	AvgDuration time.Duration             `json:"avg_duration"`
	SuccessRate float64                   `json:"success_rate"`
	FailureRate float64                   `json:"failure_rate"`
}

JobStats holds aggregated statistics.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore manages all dureq state in Redis, replacing both NATSStore and HistoryStore.

func NewRedisStore

func NewRedisStore(rdb rueidis.Client, cfg RedisStoreConfig, logger gochainedlog.Logger) (*RedisStore, error)

NewRedisStore creates a new Redis store and registers tier configuration.

func (*RedisStore) AckMessage

func (s *RedisStore) AckMessage(ctx context.Context, tierName, messageID string) error

AckMessage acknowledges a message in a tier's consumer group.

func (*RedisStore) AddDelayed

func (s *RedisStore) AddDelayed(ctx context.Context, tierName string, wm *types.WorkMessage, executeAt time.Time) error

AddDelayed adds a work message to the delayed sorted set for future re-dispatch.

func (*RedisStore) AddToGroup

func (s *RedisStore) AddToGroup(ctx context.Context, group string, msg types.GroupMessage) (int64, error)

AddToGroup adds a message to a named group. Returns the current group size.

func (*RedisStore) CancelChannel

func (s *RedisStore) CancelChannel() string

CancelChannel returns the Redis Pub/Sub channel name used for task cancellation signals.

func (*RedisStore) CheckUniqueKey

func (s *RedisStore) CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)

CheckUniqueKey checks whether a unique key is claimed. Returns (jobID, exists, error).

func (*RedisStore) ClaimStaleMessages

func (s *RedisStore) ClaimStaleMessages(ctx context.Context, tierName, consumerName string, minIdleTime time.Duration, count int64) ([]rueidis.XRangeEntry, error)

ClaimStaleMessages reclaims messages that have been pending longer than minIdleTime. This handles worker crashes — other nodes pick up abandoned work.

func (*RedisStore) Cleanup

func (s *RedisStore) Cleanup(ctx context.Context, olderThan time.Time) (int64, error)

Cleanup removes history entries older than the given time.

func (*RedisStore) Client

func (s *RedisStore) Client() rueidis.Client

func (*RedisStore) Close

func (s *RedisStore) Close() error

func (*RedisStore) CompleteRun

func (s *RedisStore) CompleteRun(ctx context.Context, batch *CompletionBatch) error

CompleteRun performs SaveRun + SaveJobRun + DeleteRun + IncrDailyStat + PublishEvent + PublishResult + AckMessage. Slot-targeted commands are pipelined via DoMulti; PUBLISH (no-slot) commands are sent separately for Redis Cluster compatibility.

func (*RedisStore) Config

func (s *RedisStore) Config() RedisStoreConfig

func (*RedisStore) ConsumeSignals added in v0.1.3

func (s *RedisStore) ConsumeSignals(ctx context.Context, workflowID string) ([]types.WorkflowSignal, error)

ConsumeSignals reads and acknowledges all pending signals for a workflow. Returns the signals and removes them from the stream.

func (*RedisStore) CreateJob

func (s *RedisStore) CreateJob(ctx context.Context, job *types.Job) (uint64, error)

CreateJob atomically creates a job only if it doesn't already exist. Secondary indexes are added individually after creation.

func (*RedisStore) DeleteBatch

func (s *RedisStore) DeleteBatch(ctx context.Context, batchID string) error

DeleteBatch removes a batch instance and its indexes.

func (*RedisStore) DeleteGroupMessages added in v0.1.10

func (s *RedisStore) DeleteGroupMessages(ctx context.Context, group string)

DeleteGroupMessages atomically deletes all messages for a group and cleans up metadata. Should be called only after successful processing of ReadGroup results.

func (*RedisStore) DeleteJob

func (s *RedisStore) DeleteJob(ctx context.Context, jobID string) error

DeleteJob removes a job and cleans all secondary indexes individually.

func (*RedisStore) DeleteJobAuditTrail added in v0.1.3

func (s *RedisStore) DeleteJobAuditTrail(ctx context.Context, jobID string)

DeleteJobAuditTrail removes the audit stream for a job (cleanup).

func (*RedisStore) DeleteRun

func (s *RedisStore) DeleteRun(ctx context.Context, runID string) error

DeleteRun removes a run and cleans its indexes.

func (*RedisStore) DeleteSchedule

func (s *RedisStore) DeleteSchedule(ctx context.Context, jobID string) error

DeleteSchedule removes a schedule and its index entry.

func (*RedisStore) DeleteSignalStream added in v0.1.3

func (s *RedisStore) DeleteSignalStream(ctx context.Context, workflowID string)

DeleteSignalStream removes the signal stream for a workflow (cleanup).

func (*RedisStore) DeleteUniqueKey

func (s *RedisStore) DeleteUniqueKey(ctx context.Context, uniqueKey string) error

DeleteUniqueKey releases a unique key.

func (*RedisStore) DeleteWorkflow

func (s *RedisStore) DeleteWorkflow(ctx context.Context, id string) error

DeleteWorkflow removes a workflow instance and its indexes.

func (*RedisStore) DeleteWorkflowEvents added in v0.1.3

func (s *RedisStore) DeleteWorkflowEvents(ctx context.Context, workflowID string)

DeleteWorkflowEvents removes the per-workflow event index (cleanup).

func (*RedisStore) DispatchToDLQ

func (s *RedisStore) DispatchToDLQ(ctx context.Context, wm *types.WorkMessage) error

DispatchToDLQ adds a message to the dead letter queue stream.

func (*RedisStore) DispatchWork

func (s *RedisStore) DispatchWork(ctx context.Context, tierName string, wm *types.WorkMessage) (string, error)

DispatchWork adds a work message to the tier-specific Redis Stream.

func (*RedisStore) EnsureStreams

func (s *RedisStore) EnsureStreams(ctx context.Context) error

EnsureStreams creates consumer groups for all configured tier streams and the DLQ stream. Idempotent — ignores "BUSYGROUP" errors when group already exists.

func (*RedisStore) FindBatchByPayloadPath

func (s *RedisStore) FindBatchByPayloadPath(ctx context.Context, jsonPath string, value any) (*types.BatchInstance, uint64, error)

FindBatchByPayloadPath scans all batches and returns the first one whose onetime payload or any item payload matches the given JSONPath and value.

func (*RedisStore) FindJobByPayloadPath

func (s *RedisStore) FindJobByPayloadPath(ctx context.Context, jsonPath string, value any, taskType string) (*types.Job, uint64, error)

FindJobByPayloadPath scans jobs and returns the first whose Payload matches the given JSONPath + value. Uses RedisJSON's JSON.GET for native path extraction. If taskType is non-empty, only jobs of that type are scanned (via the by_task_type index).

func (*RedisStore) FindWorkflowByTaskPayloadPath

func (s *RedisStore) FindWorkflowByTaskPayloadPath(ctx context.Context, jsonPath string, value any) (*types.WorkflowInstance, uint64, error)

FindWorkflowByTaskPayloadPath scans all workflows and returns the first where any task's Payload matches the given JSONPath + value.

func (*RedisStore) FlushGroup

func (s *RedisStore) FlushGroup(ctx context.Context, group string) ([]types.GroupMessage, error)

FlushGroup atomically reads and deletes all messages from a group. Uses a single-key Lua script so that concurrent callers on different nodes are safe: only one caller will receive the messages; others get an empty result. Cleanup of the metadata hash and active set is done separately (idempotent).

func (*RedisStore) GetActiveJobStats

func (s *RedisStore) GetActiveJobStats(ctx context.Context) (map[types.JobStatus]int, error)

GetActiveJobStats returns aggregated counts by job status (alias for monitor API).

func (*RedisStore) GetBatch

func (s *RedisStore) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)

GetBatch retrieves a batch instance by ID.

func (*RedisStore) GetBatchByID

func (s *RedisStore) GetBatchByID(ctx context.Context, id string) (*types.BatchInstance, error)

GetBatchByID retrieves a batch instance by ID (convenience for monitor API).

func (*RedisStore) GetBatchItemResult

func (s *RedisStore) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)

GetBatchItemResult retrieves a single batch item result.

func (*RedisStore) GetDailyStats

func (s *RedisStore) GetDailyStats(ctx context.Context, days int) ([]DailyStats, error)

GetDailyStats returns daily stats for the last N days.

func (*RedisStore) GetGroupSize

func (s *RedisStore) GetGroupSize(ctx context.Context, group string) (int64, error)

GetGroupSize returns the number of messages in a group.

func (*RedisStore) GetJob

func (s *RedisStore) GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)

GetJob retrieves a job by ID.

func (*RedisStore) GetJobAuditCount added in v0.1.3

func (s *RedisStore) GetJobAuditCount(ctx context.Context, jobID string) (int64, error)

GetJobAuditCount returns the number of audit entries for a job (XLEN).

func (*RedisStore) GetJobAuditCounts added in v0.1.3

func (s *RedisStore) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)

GetJobAuditCounts returns audit entry counts for multiple jobs via pipelined XLEN.

func (*RedisStore) GetJobAuditTrail added in v0.1.3

func (s *RedisStore) GetJobAuditTrail(ctx context.Context, jobID string) ([]AuditEntry, error)

GetJobAuditTrail returns the state transition history for a job.

func (*RedisStore) GetJobRevision added in v0.1.10

func (s *RedisStore) GetJobRevision(ctx context.Context, jobID string) (uint64, error)

GetJobRevision returns only the CAS revision for a job (without full deserialization).

func (*RedisStore) GetJobStats

func (s *RedisStore) GetJobStats(ctx context.Context, _ StatsFilter) (*JobStats, error)

GetJobStats returns aggregated job statistics.

func (*RedisStore) GetNode

func (s *RedisStore) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)

GetNode retrieves a node by ID. It returns nil if the node is not found or its TTL has expired.

func (*RedisStore) GetResult

func (s *RedisStore) GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)

GetResult retrieves a stored work result by job ID.

func (*RedisStore) GetRun

func (s *RedisStore) GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)

GetRun retrieves a job run by ID.

func (*RedisStore) GetSchedule

func (s *RedisStore) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)

GetSchedule retrieves a schedule entry by job ID.

func (*RedisStore) GetWorkflow

func (s *RedisStore) GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)

GetWorkflow retrieves a workflow instance by ID.

func (*RedisStore) GetWorkflowByID

func (s *RedisStore) GetWorkflowByID(ctx context.Context, id string) (*types.WorkflowInstance, error)

GetWorkflowByID retrieves a workflow instance by ID (convenience for monitor API).

func (*RedisStore) GetWorkflowEvents added in v0.1.3

func (s *RedisStore) GetWorkflowEvents(ctx context.Context, workflowID string) ([]types.JobEvent, error)

GetWorkflowEvents returns all indexed events for a workflow, ordered by timestamp.

func (*RedisStore) HasActiveRunForJob

func (s *RedisStore) HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)

HasActiveRunForJob returns true if any non-terminal run exists for the given jobID. Uses the per-job active run set for O(1) check.

func (*RedisStore) IncrDailyStat

func (s *RedisStore) IncrDailyStat(ctx context.Context, field string)

IncrDailyStat atomically increments the processed or failed counter for today.

func (*RedisStore) IsNodeDraining added in v0.1.3

func (s *RedisStore) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)

IsNodeDraining returns true if the node is in drain mode.

func (*RedisStore) IsQueuePaused

func (s *RedisStore) IsQueuePaused(ctx context.Context, tierName string) (bool, error)

IsQueuePaused checks whether a tier/queue is currently paused.

func (*RedisStore) JobStatusCounts

func (s *RedisStore) JobStatusCounts(ctx context.Context) (map[types.JobStatus]int, error)

JobStatusCounts returns job counts by status using ZCARD on index sets.

func (*RedisStore) ListActiveRunsByJobID

func (s *RedisStore) ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)

ListActiveRunsByJobID returns all active runs belonging to a specific job. Uses the per-job active run set (RunsActiveByJobKey) for O(per-job) lookup.

func (*RedisStore) ListBatchInstances

func (s *RedisStore) ListBatchInstances(ctx context.Context, filter BatchFilter) ([]types.BatchInstance, int, error)

ListBatchInstances returns paginated batch instances.

func (*RedisStore) ListBatchItemResults

func (s *RedisStore) ListBatchItemResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)

ListBatchItemResults returns all item results for a batch.

func (*RedisStore) ListBatches

func (s *RedisStore) ListBatches(ctx context.Context) ([]*types.BatchInstance, error)

ListBatches returns all batch instances.

func (*RedisStore) ListBatchesByStatus added in v0.1.3

func (s *RedisStore) ListBatchesByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.BatchInstance, error)

ListBatchesByStatus returns batch instances with the given status.

func (*RedisStore) ListDLQ

func (s *RedisStore) ListDLQ(ctx context.Context, limit int) ([]*types.WorkMessage, error)

ListDLQ reads recent messages from the DLQ stream.

func (*RedisStore) ListDueSchedules

func (s *RedisStore) ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)

ListDueSchedules returns schedules where NextRunAt <= cutoff. O(log N + M) using sorted set.

func (*RedisStore) ListGroups

func (s *RedisStore) ListGroups(ctx context.Context) ([]string, error)

ListGroups returns all active group names.

func (*RedisStore) ListJobEvents

func (s *RedisStore) ListJobEvents(ctx context.Context, filter EventFilter) ([]types.JobEvent, int, error)

ListJobEvents returns paginated job events from history.

func (*RedisStore) ListJobRuns

func (s *RedisStore) ListJobRuns(ctx context.Context, filter RunFilter) ([]types.JobRun, int, error)

ListJobRuns returns paginated job runs from history.

func (*RedisStore) ListJobs

func (s *RedisStore) ListJobs(ctx context.Context) ([]*types.Job, error)

ListJobs returns all jobs (use sparingly for large datasets).

func (*RedisStore) ListJobsByStatus added in v0.1.3

func (s *RedisStore) ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)

ListJobsByStatus returns all jobs with the given status.

func (*RedisStore) ListJobsPaginated

func (s *RedisStore) ListJobsPaginated(ctx context.Context, filter JobFilter) ([]types.Job, int, error)

ListJobsPaginated returns a paginated, filtered list of jobs backed by Redis sorted sets.

func (*RedisStore) ListNodes

func (s *RedisStore) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)

ListNodes returns all live nodes, pruning stale entries from the tracking set.

func (*RedisStore) ListPausedJobsDueForResume added in v0.1.10

func (s *RedisStore) ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)

ListPausedJobsDueForResume returns paused jobs whose ResumeAt time has passed.

func (*RedisStore) ListPausedQueues

func (s *RedisStore) ListPausedQueues(ctx context.Context) ([]string, error)

ListPausedQueues returns the names of all currently paused queues.

func (*RedisStore) ListReadyGroups

func (s *RedisStore) ListReadyGroups(ctx context.Context, gracePeriod, maxDelay time.Duration, maxSize int) ([]string, error)

ListReadyGroups returns groups that are ready to be flushed based on the given criteria.

func (*RedisStore) ListRuns

func (s *RedisStore) ListRuns(ctx context.Context) ([]*types.JobRun, error)

ListRuns returns all active (non-terminal) runs.

func (*RedisStore) ListSchedules

func (s *RedisStore) ListSchedules(ctx context.Context) ([]*types.ScheduleEntry, error)

ListSchedules returns all active schedule entries.

func (*RedisStore) ListSchedulesPaginated

func (s *RedisStore) ListSchedulesPaginated(ctx context.Context, filter ScheduleFilter) ([]types.ScheduleEntry, int, error)

ListSchedulesPaginated returns a paginated list of schedules.

func (*RedisStore) ListWorkflowInstances

func (s *RedisStore) ListWorkflowInstances(ctx context.Context, filter WorkflowFilter) ([]types.WorkflowInstance, int, error)

ListWorkflowInstances returns paginated workflow instances.

func (*RedisStore) ListWorkflows

func (s *RedisStore) ListWorkflows(ctx context.Context) ([]*types.WorkflowInstance, error)

ListWorkflows returns all workflow instances.

func (*RedisStore) ListWorkflowsByStatus added in v0.1.3

func (s *RedisStore) ListWorkflowsByStatus(ctx context.Context, status types.WorkflowStatus) ([]*types.WorkflowInstance, error)

ListWorkflowsByStatus returns workflow instances with the given status.

func (*RedisStore) MoveDelayedToStream

func (s *RedisStore) MoveDelayedToStream(ctx context.Context, tierName string, maxMove int) (int64, error)

MoveDelayedToStream atomically moves ripe delayed messages back to the work stream. Returns the number of messages moved.

func (*RedisStore) PauseQueue

func (s *RedisStore) PauseQueue(ctx context.Context, tierName string) error

PauseQueue marks a tier/queue as paused. Workers will skip paused queues.

func (*RedisStore) Ping

func (s *RedisStore) Ping(ctx context.Context) error

func (*RedisStore) PopOverlapBuffer

func (s *RedisStore) PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)

PopOverlapBuffer retrieves and deletes the buffered dispatch for BUFFER_ONE.

func (*RedisStore) PopOverlapBufferAll

func (s *RedisStore) PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)

PopOverlapBufferAll pops the next buffered dispatch from BUFFER_ALL.

func (*RedisStore) Prefix

func (s *RedisStore) Prefix() string

func (*RedisStore) PublishEvent

func (s *RedisStore) PublishEvent(ctx context.Context, event types.JobEvent) error

PublishEvent publishes an event to both Pub/Sub (real-time) and the event stream (history).

func (*RedisStore) PublishJobNotification

func (s *RedisStore) PublishJobNotification(ctx context.Context, jobID, taskType string, priority int) error

PublishJobNotification publishes a lightweight notification to the job:notify Pub/Sub channel. The NotifierActor subscribes to this channel and forwards it to the DispatcherActor.

func (*RedisStore) PublishResult

func (s *RedisStore) PublishResult(ctx context.Context, result types.WorkResult) error

PublishResult stores a work result and notifies waiting clients.

func (*RedisStore) PurgeJobData added in v0.1.10

func (s *RedisStore) PurgeJobData(ctx context.Context, jobID string) error

PurgeJobData removes the payload and result data from a job while preserving metadata (status, timestamps, task type, etc.). This supports GDPR "right to erasure" by deleting personal data while maintaining audit trails.

func (*RedisStore) PushOverlapBufferAll

func (s *RedisStore) PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error

PushOverlapBufferAll appends to the BUFFER_ALL queue.

func (*RedisStore) ReadGroup added in v0.1.10

func (s *RedisStore) ReadGroup(ctx context.Context, group string) ([]types.GroupMessage, error)

ReadGroup reads all messages from a group WITHOUT deleting them. Use DeleteGroupMessages after successful processing to remove them.

func (*RedisStore) ReenqueueWork

func (s *RedisStore) ReenqueueWork(ctx context.Context, tierName string, wm *types.WorkMessage, redeliveries int) error

ReenqueueWork re-adds a work message to the stream without dedup. Used when a worker picks up a message for a task type it doesn't handle.

func (*RedisStore) SaveBatch

func (s *RedisStore) SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)

SaveBatch upserts a batch instance and maintains sorted-set indexes.

func (*RedisStore) SaveBatchInstance

func (s *RedisStore) SaveBatchInstance(ctx context.Context, batch *types.BatchInstance) error

SaveBatchInstance saves a batch instance (alias for history compatibility).

func (*RedisStore) SaveBatchItemResult

func (s *RedisStore) SaveBatchItemResult(ctx context.Context, result *types.BatchItemResult) error

SaveBatchItemResult stores a single batch item result.

func (*RedisStore) SaveJob

func (s *RedisStore) SaveJob(ctx context.Context, job *types.Job) (uint64, error)

SaveJob upserts a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are updated individually (best-effort) for Redis Cluster compatibility.

func (*RedisStore) SaveJobEvent

func (s *RedisStore) SaveJobEvent(ctx context.Context, event *types.JobEvent) error

SaveJobEvent persists a job event to the history indexes.

func (*RedisStore) SaveJobRun

func (s *RedisStore) SaveJobRun(ctx context.Context, run *types.JobRun) error

SaveJobRun persists a job run to the history indexes.

func (*RedisStore) SaveNode

func (s *RedisStore) SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)

SaveNode upserts a node with TTL for automatic dead-node cleanup.

func (*RedisStore) SaveOverlapBuffer

func (s *RedisStore) SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error

SaveOverlapBuffer stores a buffered dispatch timestamp for BUFFER_ONE policy. Overwrites any previous buffered entry.

func (*RedisStore) SaveRun

func (s *RedisStore) SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)

SaveRun upserts a job run and maintains the active set and by-job index.

func (*RedisStore) SaveSchedule

func (s *RedisStore) SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)

SaveSchedule upserts a schedule and updates the due sorted set.

func (*RedisStore) SaveWorkflow

func (s *RedisStore) SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)

SaveWorkflow upserts a workflow instance and maintains sorted-set indexes.

func (*RedisStore) SaveWorkflowInstance

func (s *RedisStore) SaveWorkflowInstance(ctx context.Context, wf *types.WorkflowInstance) error

SaveWorkflowInstance saves a workflow instance (alias for history compatibility).

func (*RedisStore) SendSignal added in v0.1.3

func (s *RedisStore) SendSignal(ctx context.Context, signal *types.WorkflowSignal) error

SendSignal appends a signal to a workflow's signal stream.

func (*RedisStore) SetNodeDrain added in v0.1.3

func (s *RedisStore) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error

SetNodeDrain sets the drain flag for a node. While draining, the worker stops fetching new messages but continues processing in-flight tasks.

func (*RedisStore) SetUniqueKey

func (s *RedisStore) SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error

SetUniqueKey atomically claims a unique key for a job ID.

func (*RedisStore) UnpauseQueue

func (s *RedisStore) UnpauseQueue(ctx context.Context, tierName string) error

UnpauseQueue removes the pause flag from a tier/queue.

func (*RedisStore) UpdateBatch

func (s *RedisStore) UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)

UpdateBatch performs a CAS update on a batch instance.

func (*RedisStore) UpdateJob

func (s *RedisStore) UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)

UpdateJob performs a CAS update on a job and maintains all secondary indexes. The job hash is updated atomically via a single-key Lua script; secondary indexes are diffed and updated individually.

func (*RedisStore) UpdateWorkflow

func (s *RedisStore) UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)

UpdateWorkflow performs a CAS update on a workflow instance.

type RedisStoreConfig

// RedisStoreConfig holds configuration for the Redis store.
//
// Each field documents its own default below.
// NOTE(review): the defaults are presumably applied when a field is left at
// its zero value — confirm against the store constructor.
type RedisStoreConfig struct {
	// KeyPrefix is prepended to all Redis keys (e.g., "prod" → "prod_dureq:...").
	KeyPrefix string

	// NodeTTL is the TTL for node heartbeat entries. Default: 15s.
	NodeTTL time.Duration

	// ElectionTTL is the TTL for the leader election key. Default: 10s.
	ElectionTTL time.Duration

	// LockTTL is the default TTL for distributed lock entries. Default: 30s.
	LockTTL time.Duration

	// EventStreamMaxLen is the max length of the events stream. Default: 50000.
	EventStreamMaxLen int64

	// DLQStreamMaxLen is the max length of the DLQ stream. Default: 10000.
	DLQStreamMaxLen int64

	// ResultTTL is the TTL for job result entries. Default: 1h.
	ResultTTL time.Duration

	// DedupTTL is the TTL for run ID deduplication keys. Default: 5m.
	DedupTTL time.Duration

	// Tiers is the list of user-defined priority tiers. Default: [normal].
	Tiers []TierConfig
}

RedisStoreConfig holds configuration for the Redis store.

type RunFilter

// RunFilter specifies criteria for listing job runs.
//
// NOTE(review): the pointer fields and Tags are presumably optional, with
// nil/empty meaning "do not filter on this attribute" — confirm against the
// query implementation, which is not visible here.
type RunFilter struct {
	// JobID, NodeID, Status and TaskType narrow the listing by the
	// correspondingly named run attribute (assumed from the field names).
	JobID    *string
	NodeID   *string
	Status   *types.RunStatus
	TaskType *types.TaskType
	// Tags: whether a run must match all tags or any tag is not shown here — TODO confirm.
	Tags     []string
	// Since and Until presumably bound the time window; which timestamp is
	// compared and whether the bounds are inclusive is not shown here.
	Since    *time.Time
	Until    *time.Time
	// Limit and Offset are presumably pagination controls; the meaning of a
	// zero Limit is not shown here — TODO confirm.
	Limit    int
	Offset   int
	Sort     string // "newest" (default) or "oldest"
}

RunFilter specifies criteria for listing job runs.

type ScheduleFilter

// ScheduleFilter specifies criteria for listing active schedules.
type ScheduleFilter struct {
	// NOTE(review): unlike RunFilter.Sort, no default ordering is documented
	// for an empty Sort here — confirm which one applies.
	Sort   string // "newest" or "oldest"
	// Limit and Offset are presumably pagination controls; the meaning of a
	// zero Limit is not shown here — TODO confirm.
	Limit  int
	Offset int
}

ScheduleFilter specifies criteria for listing active schedules.

type StatsFilter

// StatsFilter specifies criteria for aggregated stats queries.
//
// NOTE(review): all fields are presumably optional, with nil/empty meaning
// "no restriction" — confirm against the stats query implementation.
type StatsFilter struct {
	// TaskType presumably restricts aggregation to a single task type.
	TaskType *types.TaskType
	// Tags: all-vs-any matching semantics are not shown here — TODO confirm.
	Tags     []string
	// Since and Until presumably bound the aggregation window; bound
	// inclusivity is not shown here.
	Since    *time.Time
	Until    *time.Time
}

StatsFilter specifies criteria for aggregated stats queries.

type Store added in v0.1.10

// Store defines the core persistence interface for dureq. RedisStore is the
// primary implementation; this interface enables testing with in-memory
// stores and future backend alternatives.
//
// Method notes below that begin with "RedisStore:" restate the documented
// behavior of the Redis implementation; other implementations are expected
// to match it.
// NOTE(review): the uint64 returned by Save*/Create*/Update* methods is
// presumably the stored revision that Update* later takes as expectedRev —
// confirm.
type Store interface {
	// --- Job CRUD ---
	// RedisStore: upserts a job and maintains all secondary indexes.
	SaveJob(ctx context.Context, job *types.Job) (uint64, error)
	CreateJob(ctx context.Context, job *types.Job) (uint64, error)
	GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)
	// RedisStore: performs a CAS update on a job and maintains all secondary indexes.
	UpdateJob(ctx context.Context, job *types.Job, expectedRev uint64) (uint64, error)
	DeleteJob(ctx context.Context, jobID string) error
	ListJobs(ctx context.Context) ([]*types.Job, error)
	ListJobsByStatus(ctx context.Context, status types.JobStatus) ([]*types.Job, error)
	ListPausedJobsDueForResume(ctx context.Context, now time.Time) ([]*types.Job, error)
	GetJobRevision(ctx context.Context, jobID string) (uint64, error)

	// --- Schedule ---
	// RedisStore: upserts a schedule and updates the due sorted set.
	SaveSchedule(ctx context.Context, entry *types.ScheduleEntry) (uint64, error)
	GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, uint64, error)
	DeleteSchedule(ctx context.Context, jobID string) error
	ListDueSchedules(ctx context.Context, cutoff time.Time) ([]*types.ScheduleEntry, error)

	// --- Run ---
	// RedisStore: upserts a job run and maintains the active set and by-job index.
	SaveRun(ctx context.Context, run *types.JobRun) (uint64, error)
	GetRun(ctx context.Context, runID string) (*types.JobRun, uint64, error)
	DeleteRun(ctx context.Context, runID string) error
	ListRuns(ctx context.Context) ([]*types.JobRun, error)
	ListActiveRunsByJobID(ctx context.Context, jobID string) ([]*types.JobRun, error)
	HasActiveRunForJob(ctx context.Context, jobID string) (bool, error)
	// RedisStore: persists a job run to the history indexes (distinct from SaveRun).
	SaveJobRun(ctx context.Context, run *types.JobRun) error

	// --- Workflow ---
	// RedisStore: upserts a workflow instance and maintains sorted-set indexes.
	SaveWorkflow(ctx context.Context, wf *types.WorkflowInstance) (uint64, error)
	GetWorkflow(ctx context.Context, id string) (*types.WorkflowInstance, uint64, error)
	// RedisStore: performs a CAS update on a workflow instance.
	UpdateWorkflow(ctx context.Context, wf *types.WorkflowInstance, expectedRev uint64) (uint64, error)

	// --- Batch ---
	SaveBatch(ctx context.Context, batch *types.BatchInstance) (uint64, error)
	GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, uint64, error)
	// RedisStore: performs a CAS update on a batch instance.
	UpdateBatch(ctx context.Context, batch *types.BatchInstance, expectedRev uint64) (uint64, error)

	// --- Node ---
	// RedisStore: upserts a node with TTL for automatic dead-node cleanup.
	SaveNode(ctx context.Context, node *types.NodeInfo) (uint64, error)
	ListNodes(ctx context.Context) ([]*types.NodeInfo, error)

	// --- Events ---
	PublishEvent(ctx context.Context, event types.JobEvent) error
	PublishResult(ctx context.Context, result types.WorkResult) error
	GetResult(ctx context.Context, jobID string) (*types.WorkResult, error)

	// --- Work dispatch ---
	DispatchWork(ctx context.Context, tierName string, msg *types.WorkMessage) (string, error)
	AckMessage(ctx context.Context, tierName string, msgID string) error
	ReenqueueWork(ctx context.Context, tierName string, msg *types.WorkMessage, redeliveries int) error
	AddDelayed(ctx context.Context, tierName string, msg *types.WorkMessage, executeAt time.Time) error

	// --- Overlap ---
	// RedisStore: stores a buffered dispatch timestamp for BUFFER_ONE policy,
	// overwriting any previous buffered entry.
	SaveOverlapBuffer(ctx context.Context, jobID string, nextRunAt time.Time) error
	PopOverlapBuffer(ctx context.Context, jobID string) (time.Time, bool, error)
	PushOverlapBufferAll(ctx context.Context, jobID string, nextRunAt time.Time) error
	PopOverlapBufferAll(ctx context.Context, jobID string) (time.Time, bool, error)

	// --- Unique keys ---
	CheckUniqueKey(ctx context.Context, uniqueKey string) (string, bool, error)
	// RedisStore: atomically claims a unique key for a job ID.
	SetUniqueKey(ctx context.Context, uniqueKey, jobID string) error
	DeleteUniqueKey(ctx context.Context, uniqueKey string) error

	// --- Stats ---
	// IncrDailyStat has no return value, so failures are not reported to the caller.
	IncrDailyStat(ctx context.Context, field string)

	// --- GDPR ---
	PurgeJobData(ctx context.Context, jobID string) error
}

Store defines the core persistence interface for dureq. RedisStore is the primary implementation; this interface enables testing with in-memory stores and future backend alternatives.

type TierConfig

// TierConfig defines a user-customizable priority tier.
type TierConfig struct {
	// Name is the tier identifier (e.g., "urgent", "normal", "background").
	Name string

	// Weight controls fetch priority. Higher weight = polled first and more frequently.
	Weight int

	// FetchBatch is the number of messages to read per XREADGROUP call. Default: 10.
	FetchBatch int

	// RateLimit, if set, limits how many messages per second can be fetched from this tier.
	// Uses a distributed token bucket. Zero = no rate limit.
	RateLimit float64

	// RateBurst is the maximum burst size for the rate limiter. Default: RateLimit.
	// NOTE(review): RateBurst is an int while RateLimit is a float64; how a
	// fractional RateLimit is converted for the default is not shown here — confirm.
	RateBurst int
}

TierConfig defines a user-customizable priority tier.

func DefaultTiers

func DefaultTiers() []TierConfig

DefaultTiers returns a sensible default tier configuration.

type WorkflowFilter

// WorkflowFilter specifies criteria for listing workflow instances.
//
// NOTE(review): the pointer fields are presumably optional, with nil meaning
// "do not filter on this attribute" — confirm against the query implementation.
type WorkflowFilter struct {
	WorkflowName *string
	Status       *types.WorkflowStatus
	// NOTE(review): no default ordering is documented for an empty Sort — confirm.
	Sort         string // "newest" or "oldest"
	// Limit and Offset are presumably pagination controls; the meaning of a
	// zero Limit is not shown here — TODO confirm.
	Limit        int
	Offset       int
}

WorkflowFilter specifies criteria for listing workflow instances.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL