Documentation
¶
Index ¶
- type Client
- func (c *Client) BackfillSchedule(ctx context.Context, jobID string, startTime, endTime time.Time) (int, error)
- func (c *Client) Cancel(ctx context.Context, jobID string) error
- func (c *Client) CancelBatch(ctx context.Context, batchID string) error
- func (c *Client) CancelByUniqueKey(ctx context.Context, uniqueKey string) error
- func (c *Client) CancelWorkflow(ctx context.Context, workflowID string) error
- func (c *Client) Close()
- func (c *Client) DrainNode(ctx context.Context, nodeID string, drain bool) error
- func (c *Client) Enqueue(ctx context.Context, taskType types.TaskType, payload any) (*types.Job, error)
- func (c *Client) EnqueueAndWait(ctx context.Context, taskType types.TaskType, payload any, timeout time.Duration) (*types.WorkResult, error)
- func (c *Client) EnqueueBatch(ctx context.Context, def types.BatchDefinition) (*types.BatchInstance, error)
- func (c *Client) EnqueueGroup(ctx context.Context, opt types.EnqueueGroupOption) (int64, error)
- func (c *Client) EnqueueScheduled(ctx context.Context, req *EnqueueRequest) (*types.Job, error)
- func (c *Client) EnqueueWorkflow(ctx context.Context, def types.WorkflowDefinition, input any) (*types.WorkflowInstance, error)
- func (c *Client) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)
- func (c *Client) GetBatchByParamJSONPath(ctx context.Context, path string, value any) (*types.BatchInstance, error)
- func (c *Client) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
- func (c *Client) GetJob(ctx context.Context, jobID string) (*types.Job, error)
- func (c *Client) GetJobByParamJSONPath(ctx context.Context, path string, value any) (*types.Job, error)
- func (c *Client) GetJobByParamJSONPathWithTaskType(ctx context.Context, path string, value any, taskType string) (*types.Job, error)
- func (c *Client) GetWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)
- func (c *Client) GetWorkflowByParamJSONPath(ctx context.Context, path string, value any) (*types.WorkflowInstance, error)
- func (c *Client) ResumeJob(ctx context.Context, jobID string) error
- func (c *Client) Retry(ctx context.Context, jobID string) error
- func (c *Client) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) (*types.BatchInstance, error)
- func (c *Client) RetryWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)
- func (c *Client) SignalWorkflow(ctx context.Context, workflowID string, signalName string, payload any) error
- func (c *Client) Status(ctx context.Context, jobID string) (*types.Job, error)
- func (c *Client) Store() *store.RedisStore
- func (c *Client) SubscribeBatchProgress(ctx context.Context, batchID string) (<-chan types.BatchProgress, error)
- type EnqueueRequest
- type Option
- func WithClusterAddrs(addrs []string) Option
- func WithKeyPrefix(prefix string) Option
- func WithPriorityTiers(tiers []store.TierConfig) Option
- func WithRedisClient(rdb rueidis.Client) Option
- func WithRedisDB(db int) Option
- func WithRedisPassword(password string) Option
- func WithRedisPoolSize(n int) Option
- func WithRedisURL(url string) Option
- func WithRedisUsername(username string) Option
- func WithStore(s *store.RedisStore) Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the dureq client SDK for enqueuing jobs and querying status. It connects directly to Redis — no server-side request/reply needed.
func (*Client) BackfillSchedule ¶
func (c *Client) BackfillSchedule(ctx context.Context, jobID string, startTime, endTime time.Time) (int, error)
BackfillSchedule manually triggers all missed firings for a scheduled job within the given time range. Each firing is dispatched as a separate execution. Returns the number of firings dispatched.
func (*Client) CancelBatch ¶
func (c *Client) CancelBatch(ctx context.Context, batchID string) error
CancelBatch cancels a running batch.
func (*Client) CancelByUniqueKey ¶
func (c *Client) CancelByUniqueKey(ctx context.Context, uniqueKey string) error
CancelByUniqueKey cancels a job identified by its unique key.
func (*Client) CancelWorkflow ¶
func (c *Client) CancelWorkflow(ctx context.Context, workflowID string) error
CancelWorkflow cancels a running workflow and all its non-terminal tasks.
func (*Client) Close ¶
func (c *Client) Close()
Close closes the client's Redis connection, but only if the client created that connection itself; a Redis client supplied by the caller is left open.
func (*Client) DrainNode ¶
func (c *Client) DrainNode(ctx context.Context, nodeID string, drain bool) error
DrainNode puts a node into drain mode. The node stops fetching new messages but continues processing in-flight tasks. Call with drain=false to resume.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, taskType types.TaskType, payload any) (*types.Job, error)
Enqueue submits a job for immediate execution.
func (*Client) EnqueueAndWait ¶
func (c *Client) EnqueueAndWait(ctx context.Context, taskType types.TaskType, payload any, timeout time.Duration) (*types.WorkResult, error)
EnqueueAndWait submits a job for immediate execution and waits for the result. It subscribes to the result channel BEFORE enqueuing to prevent a race condition where a fast job completes before the subscription is active.
func (*Client) EnqueueBatch ¶
func (c *Client) EnqueueBatch(ctx context.Context, def types.BatchDefinition) (*types.BatchInstance, error)
EnqueueBatch submits a batch definition for processing.
func (*Client) EnqueueGroup ¶
func (c *Client) EnqueueGroup(ctx context.Context, opt types.EnqueueGroupOption) (int64, error)
EnqueueGroup adds a task to a named group for aggregated processing. The group is flushed automatically by the server's aggregation processor based on the configured GracePeriod, MaxDelay, or MaxSize. Returns the current group size after adding the message.
func (*Client) EnqueueScheduled ¶
func (c *Client) EnqueueScheduled(ctx context.Context, req *EnqueueRequest) (*types.Job, error)
EnqueueScheduled submits a job with a schedule.
func (*Client) EnqueueWorkflow ¶
func (c *Client) EnqueueWorkflow(ctx context.Context, def types.WorkflowDefinition, input any) (*types.WorkflowInstance, error)
EnqueueWorkflow submits a workflow definition for execution.
func (*Client) GetBatchByParamJSONPath ¶
func (c *Client) GetBatchByParamJSONPath(ctx context.Context, path string, value any) (*types.BatchInstance, error)
GetBatchByParamJSONPath finds the first batch where the onetime payload or any item payload matches the given JSONPath + value.
func (*Client) GetBatchResults ¶
func (c *Client) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
GetBatchResults retrieves all item results for a batch.
func (*Client) GetJobByParamJSONPath ¶
func (c *Client) GetJobByParamJSONPath(ctx context.Context, path string, value any) (*types.Job, error)
GetJobByParamJSONPath finds the first job whose Payload matches the given JSONPath + value. It uses RedisJSON's native JSONPath for efficient field extraction. The path argument is a dot-separated field path (e.g. "user_id", "order.type").
func (*Client) GetJobByParamJSONPathWithTaskType ¶ added in v0.1.7
func (c *Client) GetJobByParamJSONPathWithTaskType(ctx context.Context, path string, value any, taskType string) (*types.Job, error)
GetJobByParamJSONPathWithTaskType finds the first job of the given task type whose Payload matches the given JSONPath + value. Only scans jobs of that task type, so it is significantly faster than GetJobByParamJSONPath when the task type is known.
func (*Client) GetWorkflow ¶
func (c *Client) GetWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)
GetWorkflow retrieves the current state of a workflow instance.
func (*Client) GetWorkflowByParamJSONPath ¶
func (c *Client) GetWorkflowByParamJSONPath(ctx context.Context, path string, value any) (*types.WorkflowInstance, error)
GetWorkflowByParamJSONPath finds the first workflow where any task's Payload matches the given JSONPath + value.
func (*Client) ResumeJob ¶ added in v0.1.10
func (c *Client) ResumeJob(ctx context.Context, jobID string) error
ResumeJob manually resumes a paused job. The job transitions back to retrying and is immediately dispatched for execution.
func (*Client) RetryBatch ¶
func (c *Client) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) (*types.BatchInstance, error)
RetryBatch retries a failed batch. If retryFailedOnly is true, only failed items are re-processed.
func (*Client) RetryWorkflow ¶
func (c *Client) RetryWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)
RetryWorkflow retries a failed workflow. Only failed/dead/cancelled tasks are reset — completed tasks are preserved so work resumes from the failure point.
func (*Client) SignalWorkflow ¶
func (c *Client) SignalWorkflow(ctx context.Context, workflowID string, signalName string, payload any) error
SignalWorkflow sends an asynchronous signal to a running workflow. The signal is appended to a per-workflow signal stream and consumed by the orchestrator on the next event loop tick.
func (*Client) Store ¶
func (c *Client) Store() *store.RedisStore
Store returns the underlying RedisStore.
func (*Client) SubscribeBatchProgress ¶
func (c *Client) SubscribeBatchProgress(ctx context.Context, batchID string) (<-chan types.BatchProgress, error)
SubscribeBatchProgress subscribes to batch progress events.
type EnqueueRequest ¶
type EnqueueRequest struct {
TaskType types.TaskType `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Schedule types.Schedule `json:"schedule"`
RetryPolicy *types.RetryPolicy `json:"retry_policy,omitempty"`
Tags []string `json:"tags,omitempty"`
UniqueKey *string `json:"unique_key,omitempty"`
DLQAfter *int `json:"dlq_after,omitempty"`
Priority *types.Priority `json:"priority,omitempty"`
HeartbeatTimeout *types.Duration `json:"heartbeat_timeout,omitempty"`
// RequestID enables idempotent enqueue: if the same RequestID is sent again
// within 5 minutes, the cached response is returned instead of creating a duplicate job.
RequestID *string `json:"request_id,omitempty"`
}
EnqueueRequest describes a job to enqueue.
type Option ¶
type Option func(*clientConfig)
Option configures the client.
func WithClusterAddrs ¶
func WithClusterAddrs(addrs []string) Option
WithClusterAddrs sets Redis Cluster node addresses for cluster mode.
func WithKeyPrefix ¶
func WithKeyPrefix(prefix string) Option
WithKeyPrefix sets the key prefix for multi-tenant isolation.
func WithPriorityTiers ¶
func WithPriorityTiers(tiers []store.TierConfig) Option
WithPriorityTiers sets the priority tier configuration.
func WithRedisClient ¶
func WithRedisClient(rdb rueidis.Client) Option
WithRedisClient uses a pre-existing rueidis client.
func WithRedisPassword ¶
func WithRedisPassword(password string) Option
WithRedisPassword sets the Redis password.
func WithRedisPoolSize ¶
func WithRedisPoolSize(n int) Option
WithRedisPoolSize sets the Redis connection pool size.
func WithRedisUsername ¶ added in v0.1.10
func WithRedisUsername(username string) Option
WithRedisUsername sets the Redis ACL username (Redis 6+).
func WithStore ¶
func WithStore(s *store.RedisStore) Option
WithStore uses a pre-existing RedisStore (for in-process use with the server).