package client

Version: v0.1.11
Published: Mar 11, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the dureq client SDK for enqueuing jobs and querying status. It connects directly to Redis, so no server-side request/reply is needed.

func New

func New(opts ...Option) (*Client, error)

New creates a new dureq client.

func (*Client) BackfillSchedule

func (c *Client) BackfillSchedule(ctx context.Context, jobID string, startTime, endTime time.Time) (int, error)

BackfillSchedule manually triggers all missed firings for a scheduled job within the given time range. Each firing is dispatched as a separate execution. Returns the number of firings dispatched.
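To make "missed firings within the given time range" concrete, here is a minimal sketch that enumerates the firings of a fixed-interval schedule inside a window. It does not use dureq: the helper firingsBetween, the anchor parameter, the fixed-interval schedule, and the inclusive treatment of both ends of the range are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// firingsBetween lists the firing times of a fixed-interval schedule that fall
// within [start, end], both ends inclusive (an assumption of this sketch).
// anchor is the time of the schedule's first firing.
func firingsBetween(anchor time.Time, every time.Duration, start, end time.Time) []time.Time {
	if every <= 0 || end.Before(start) {
		return nil
	}
	t := anchor
	if t.Before(start) {
		// Skip whole intervals, then step once more if still before start.
		t = anchor.Add(start.Sub(anchor) / every * every)
		if t.Before(start) {
			t = t.Add(every)
		}
	}
	var out []time.Time
	for ; !t.After(end); t = t.Add(every) {
		out = append(out, t)
	}
	return out
}

// exampleBackfill counts hourly firings missed between 01:30 and 04:00.
func exampleBackfill() int {
	anchor := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	missed := firingsBetween(anchor, time.Hour, anchor.Add(90*time.Minute), anchor.Add(4*time.Hour))
	return len(missed)
}

func main() {
	// The firings in the window are 02:00, 03:00 and 04:00.
	fmt.Println(exampleBackfill())
}
```

In this sketch each returned time would correspond to one separately dispatched execution, and the slice length to the returned count.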

func (*Client) Cancel

func (c *Client) Cancel(ctx context.Context, jobID string) error

Cancel cancels a job by ID.

func (*Client) CancelBatch

func (c *Client) CancelBatch(ctx context.Context, batchID string) error

CancelBatch cancels a running batch.

func (*Client) CancelByUniqueKey

func (c *Client) CancelByUniqueKey(ctx context.Context, uniqueKey string) error

CancelByUniqueKey cancels a job identified by its unique key.

func (*Client) CancelWorkflow

func (c *Client) CancelWorkflow(ctx context.Context, workflowID string) error

CancelWorkflow cancels a running workflow and all its non-terminal tasks.

func (*Client) Close

func (c *Client) Close()

Close closes the client connection, but only if the client created the Redis client itself.

func (*Client) DrainNode

func (c *Client) DrainNode(ctx context.Context, nodeID string, drain bool) error

DrainNode sets or clears drain mode on a node. While draining, the node stops fetching new messages but continues processing in-flight tasks. Call with drain=false to resume.
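The drain behaviour described above can be sketched with a small in-memory model. The node type and its poll and finishOne methods are hypothetical and only illustrate the rule "no new fetches, in-flight work continues"; they are not dureq's implementation.

```go
package main

import "fmt"

// node is a hypothetical worker node used only for this sketch.
type node struct {
	draining bool
	inFlight []string
}

// poll fetches one task from the queue unless the node is draining.
// It returns the remaining queue.
func (n *node) poll(queue []string) []string {
	if n.draining || len(queue) == 0 {
		return queue
	}
	n.inFlight = append(n.inFlight, queue[0])
	return queue[1:]
}

// finishOne completes the oldest in-flight task; this is allowed while draining.
func (n *node) finishOne() (string, bool) {
	if len(n.inFlight) == 0 {
		return "", false
	}
	t := n.inFlight[0]
	n.inFlight = n.inFlight[1:]
	return t, true
}

// exampleDrain walks through drain and resume and reports what is left.
func exampleDrain() (queued, inFlight int) {
	n := &node{}
	queue := []string{"a", "b", "c"}
	queue = n.poll(queue) // fetches "a"
	n.draining = true     // comparable to calling DrainNode with drain=true
	queue = n.poll(queue) // nothing is fetched while draining
	n.finishOne()         // "a" still completes
	n.draining = false    // comparable to calling DrainNode with drain=false
	queue = n.poll(queue) // fetches "b"
	return len(queue), len(n.inFlight)
}

func main() {
	fmt.Println(exampleDrain())
}
```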

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, taskType types.TaskType, payload any) (*types.Job, error)

Enqueue submits a job for immediate execution.

func (*Client) EnqueueAndWait

func (c *Client) EnqueueAndWait(ctx context.Context, taskType types.TaskType, payload any, timeout time.Duration) (*types.WorkResult, error)

EnqueueAndWait submits a job for immediate execution and waits for the result. It subscribes to the result channel BEFORE enqueuing to prevent a race condition where a fast job completes before the subscription is active.
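The ordering matters because a fire-and-forget publish is lost if nobody is listening yet. The sketch below shows the race with a tiny in-memory pub/sub. The bus type and every function name here are hypothetical stand-ins, not part of dureq, and the "job" completes instantly to represent the worst case.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

const demoTimeout = 20 * time.Millisecond

var errTimeout = errors.New("timed out waiting for result")

// bus drops messages published while nobody is subscribed to the topic.
type bus struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func newBus() *bus { return &bus{subs: make(map[string][]chan string)} }

func (b *bus) subscribe(topic string) <-chan string {
	ch := make(chan string, 1)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

func (b *bus) publish(topic, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
		default: // subscriber buffer is full; drop the message
		}
	}
}

// wait blocks until a result arrives or the timeout elapses.
func wait(results <-chan string, timeout time.Duration) (string, error) {
	select {
	case r := <-results:
		return r, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

// subscribeThenEnqueue is the safe order: the listener exists before the job can finish.
func subscribeThenEnqueue(b *bus, jobID string, timeout time.Duration) (string, error) {
	results := b.subscribe(jobID)
	b.publish(jobID, "done") // stands in for enqueue plus an instantly finishing worker
	return wait(results, timeout)
}

// enqueueThenSubscribe is the racy order: the result is published before anyone listens.
func enqueueThenSubscribe(b *bus, jobID string, timeout time.Duration) (string, error) {
	b.publish(jobID, "done")
	results := b.subscribe(jobID)
	return wait(results, timeout)
}

func main() {
	b := newBus()
	fmt.Println(subscribeThenEnqueue(b, "job-1", demoTimeout))
	fmt.Println(enqueueThenSubscribe(b, "job-2", demoTimeout))
}
```

With the safe order the result is received; with the reversed order the call times out because the message was published to an empty subscriber list.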

func (*Client) EnqueueBatch

func (c *Client) EnqueueBatch(ctx context.Context, def types.BatchDefinition) (*types.BatchInstance, error)

EnqueueBatch submits a batch definition for processing.

func (*Client) EnqueueGroup

func (c *Client) EnqueueGroup(ctx context.Context, opt types.EnqueueGroupOption) (int64, error)

EnqueueGroup adds a task to a named group for aggregated processing. The group is flushed automatically by the server's aggregation processor based on the configured GracePeriod, MaxDelay, or MaxSize. Returns the current group size after adding the message.
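The three flush triggers can be pictured as a single decision function. This sketch assumes that GracePeriod is measured from the newest task, MaxDelay from the oldest task, and MaxSize against the pending count; the documentation above names the settings but does not define them, so these meanings and the groupState and flushPolicy types are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// groupState is a hypothetical snapshot of one pending group.
type groupState struct {
	size      int
	firstSeen time.Time // when the oldest pending task was added
	lastSeen  time.Time // when the newest pending task was added
}

// flushPolicy holds the three triggers; a zero value disables that trigger.
type flushPolicy struct {
	gracePeriod time.Duration // flush after this long with no new tasks (assumed meaning)
	maxDelay    time.Duration // flush once the oldest task has waited this long (assumed meaning)
	maxSize     int           // flush once this many tasks are pending (assumed meaning)
}

// shouldFlush reports whether any trigger has fired at time now.
func shouldFlush(g groupState, p flushPolicy, now time.Time) bool {
	if g.size == 0 {
		return false
	}
	switch {
	case p.maxSize > 0 && g.size >= p.maxSize:
		return true
	case p.maxDelay > 0 && now.Sub(g.firstSeen) >= p.maxDelay:
		return true
	case p.gracePeriod > 0 && now.Sub(g.lastSeen) >= p.gracePeriod:
		return true
	}
	return false
}

// exampleFlush evaluates four groups: full, fresh, too old, and idle.
func exampleFlush() []bool {
	now := time.Date(2026, 3, 11, 12, 0, 0, 0, time.UTC)
	p := flushPolicy{gracePeriod: 5 * time.Second, maxDelay: 30 * time.Second, maxSize: 3}
	ago := func(d time.Duration) time.Time { return now.Add(-d) }
	return []bool{
		shouldFlush(groupState{3, ago(time.Second), ago(time.Second)}, p, now),
		shouldFlush(groupState{1, ago(2 * time.Second), ago(2 * time.Second)}, p, now),
		shouldFlush(groupState{2, ago(40 * time.Second), ago(time.Second)}, p, now),
		shouldFlush(groupState{2, ago(10 * time.Second), ago(6 * time.Second)}, p, now),
	}
}

func main() {
	fmt.Println(exampleFlush())
}
```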

func (*Client) EnqueueScheduled

func (c *Client) EnqueueScheduled(ctx context.Context, req *EnqueueRequest) (*types.Job, error)

EnqueueScheduled submits a job with a schedule.

func (*Client) EnqueueWorkflow

func (c *Client) EnqueueWorkflow(ctx context.Context, def types.WorkflowDefinition, input any) (*types.WorkflowInstance, error)

EnqueueWorkflow submits a workflow definition for execution.

func (*Client) GetBatch

func (c *Client) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)

GetBatch retrieves the current state of a batch instance.

func (*Client) GetBatchByParamJSONPath

func (c *Client) GetBatchByParamJSONPath(ctx context.Context, path string, value any) (*types.BatchInstance, error)

GetBatchByParamJSONPath finds the first batch where the onetime payload or any item payload matches the given JSONPath + value.

func (*Client) GetBatchResults

func (c *Client) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)

GetBatchResults retrieves all item results for a batch.

func (*Client) GetJob

func (c *Client) GetJob(ctx context.Context, jobID string) (*types.Job, error)

GetJob retrieves a job by ID.

func (*Client) GetJobByParamJSONPath

func (c *Client) GetJobByParamJSONPath(ctx context.Context, path string, value any) (*types.Job, error)

GetJobByParamJSONPath finds the first job whose Payload matches the given JSONPath + value. It uses RedisJSON's native JSONPath for efficient field extraction. The path argument is a dot-separated field path (e.g. "user_id", "order.type").
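To illustrate what a dot-separated path such as "order.type" selects, the following sketch performs the same kind of match client-side on a decoded JSON payload. It is only an illustration of the path semantics: the lookup and matches helpers are hypothetical, and the real method delegates the matching to RedisJSON rather than decoding payloads in Go.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// lookup walks a dot-separated path through nested JSON objects.
func lookup(payload []byte, path string) (any, bool) {
	var cur any
	if err := json.Unmarshal(payload, &cur); err != nil {
		return nil, false
	}
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		cur, ok = obj[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// matches reports whether the value at path equals want. want is normalized
// through a JSON round trip so that, for example, int 42 equals the decoded 42.
func matches(payload []byte, path string, want any) bool {
	got, ok := lookup(payload, path)
	if !ok {
		return false
	}
	raw, err := json.Marshal(want)
	if err != nil {
		return false
	}
	var norm any
	if err := json.Unmarshal(raw, &norm); err != nil {
		return false
	}
	return reflect.DeepEqual(got, norm)
}

func main() {
	payload := []byte(`{"user_id": 42, "order": {"type": "refund"}}`)
	fmt.Println(matches(payload, "order.type", "refund"))
	fmt.Println(matches(payload, "user_id", 42))
	fmt.Println(matches(payload, "order.id", 1))
}
```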

func (*Client) GetJobByParamJSONPathWithTaskType (added in v0.1.7)

func (c *Client) GetJobByParamJSONPathWithTaskType(ctx context.Context, path string, value any, taskType string) (*types.Job, error)

GetJobByParamJSONPathWithTaskType finds the first job of the given task type whose Payload matches the given JSONPath + value. It scans only jobs of that task type, so it is significantly faster than GetJobByParamJSONPath when the task type is known.

func (*Client) GetWorkflow

func (c *Client) GetWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)

GetWorkflow retrieves the current state of a workflow instance.

func (*Client) GetWorkflowByParamJSONPath

func (c *Client) GetWorkflowByParamJSONPath(ctx context.Context, path string, value any) (*types.WorkflowInstance, error)

GetWorkflowByParamJSONPath finds the first workflow where any task's Payload matches the given JSONPath + value.

func (*Client) ResumeJob (added in v0.1.10)

func (c *Client) ResumeJob(ctx context.Context, jobID string) error

ResumeJob manually resumes a paused job. The job transitions back to retrying and is immediately dispatched for execution.

func (*Client) Retry

func (c *Client) Retry(ctx context.Context, jobID string) error

Retry re-enqueues a failed or dead job for another attempt.

func (*Client) RetryBatch

func (c *Client) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) (*types.BatchInstance, error)

RetryBatch retries a failed batch. If retryFailedOnly is true, only failed items are re-processed.
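A minimal sketch of the item selection implied by retryFailedOnly follows. The item type and itemsToRetry helper are hypothetical, and the sketch assumes that retryFailedOnly=false means every item is re-processed, which the description above implies but does not state.

```go
package main

import "fmt"

// item is a hypothetical batch item with a simplified outcome flag.
type item struct {
	id     string
	failed bool
}

// itemsToRetry returns the IDs to re-process. With retryFailedOnly set, items
// that succeeded are skipped; otherwise every item is selected (assumption).
func itemsToRetry(items []item, retryFailedOnly bool) []string {
	var ids []string
	for _, it := range items {
		if retryFailedOnly && !it.failed {
			continue
		}
		ids = append(ids, it.id)
	}
	return ids
}

func main() {
	items := []item{{"a", false}, {"b", true}, {"c", true}}
	fmt.Println(itemsToRetry(items, true))
	fmt.Println(itemsToRetry(items, false))
}
```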

func (*Client) RetryWorkflow

func (c *Client) RetryWorkflow(ctx context.Context, workflowID string) (*types.WorkflowInstance, error)

RetryWorkflow retries a failed workflow. Only failed, dead, or cancelled tasks are reset; completed tasks are preserved, so work resumes from the failure point.
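The reset rule can be sketched as a pass over task states. The taskState values and the choice of "pending" as the reset target are assumptions for illustration; the actual state names live in dureq's types package, which is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// taskState and its values are hypothetical names for this sketch.
type taskState string

const (
	statePending   taskState = "pending"
	stateCompleted taskState = "completed"
	stateFailed    taskState = "failed"
	stateDead      taskState = "dead"
	stateCancelled taskState = "cancelled"
)

// resetForRetry moves failed, dead and cancelled tasks back to pending and
// leaves every other task untouched. It returns the reset task names, sorted.
func resetForRetry(tasks map[string]taskState) []string {
	var reset []string
	for name, s := range tasks {
		switch s {
		case stateFailed, stateDead, stateCancelled:
			tasks[name] = statePending
			reset = append(reset, name)
		}
	}
	sort.Strings(reset)
	return reset
}

func main() {
	tasks := map[string]taskState{
		"fetch":     stateCompleted,
		"transform": stateFailed,
		"load":      stateCancelled,
	}
	fmt.Println(resetForRetry(tasks), tasks["fetch"])
}
```

Because "fetch" stays completed, a retry in this model starts again at "transform" rather than from the beginning.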

func (*Client) SignalWorkflow

func (c *Client) SignalWorkflow(ctx context.Context, workflowID string, signalName string, payload any) error

SignalWorkflow sends an asynchronous signal to a running workflow. The signal is appended to a per-workflow signal stream and consumed by the orchestrator on the next event loop tick.
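The append-then-consume flow can be modelled in memory as follows. The orchestrator type with its send and tick methods is a hypothetical stand-in for the per-workflow signal stream and the event loop; it shows only that sending returns immediately and that signals are picked up, in order, on a later tick.

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a hypothetical signal record.
type signal struct{ name, payload string }

// orchestrator keeps one append-only slice of signals per workflow.
type orchestrator struct {
	mu      sync.Mutex
	streams map[string][]signal
}

func newOrchestrator() *orchestrator {
	return &orchestrator{streams: make(map[string][]signal)}
}

// send appends a signal and returns without waiting for it to be handled.
func (o *orchestrator) send(workflowID string, s signal) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.streams[workflowID] = append(o.streams[workflowID], s)
}

// tick drains all pending signals for a workflow in arrival order.
func (o *orchestrator) tick(workflowID string) []signal {
	o.mu.Lock()
	defer o.mu.Unlock()
	pending := o.streams[workflowID]
	delete(o.streams, workflowID)
	return pending
}

// exampleSignals sends two signals and reports how many each tick consumed.
func exampleSignals() (first, second int) {
	o := newOrchestrator()
	o.send("wf-1", signal{"approve", `{"by":"alice"}`})
	o.send("wf-1", signal{"note", `"ship it"`})
	return len(o.tick("wf-1")), len(o.tick("wf-1"))
}

func main() {
	fmt.Println(exampleSignals())
}
```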

func (*Client) Status

func (c *Client) Status(ctx context.Context, jobID string) (*types.Job, error)

Status queries the current status of a job.

func (*Client) Store

func (c *Client) Store() *store.RedisStore

Store returns the underlying RedisStore.

func (*Client) SubscribeBatchProgress

func (c *Client) SubscribeBatchProgress(ctx context.Context, batchID string) (<-chan types.BatchProgress, error)

SubscribeBatchProgress subscribes to batch progress events.
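A receive-only channel like the one returned here is typically consumed in a select loop that also watches the context. The sketch below uses a hypothetical progress struct in place of types.BatchProgress, whose fields are not shown in this documentation, and it assumes the channel is closed when no more events will arrive.

```go
package main

import (
	"context"
	"fmt"
)

// progress is a hypothetical stand-in for types.BatchProgress.
type progress struct{ done, total int }

// watch consumes events until the channel is closed or ctx is cancelled,
// and returns the last event it saw.
func watch(ctx context.Context, events <-chan progress) (progress, error) {
	var last progress
	for {
		select {
		case <-ctx.Done():
			return last, ctx.Err()
		case p, ok := <-events:
			if !ok {
				return last, nil // channel closed: no more events (assumption)
			}
			last = p
			fmt.Printf("%d/%d items done\n", p.done, p.total)
		}
	}
}

// exampleWatch feeds three events through watch and returns the final count.
func exampleWatch() int {
	events := make(chan progress, 3)
	for i := 1; i <= 3; i++ {
		events <- progress{done: i, total: 3}
	}
	close(events)
	last, err := watch(context.Background(), events)
	if err != nil {
		return -1
	}
	return last.done
}

func main() {
	fmt.Println(exampleWatch())
}
```

Passing a context with a deadline or cancel function gives the caller a way to stop watching even if the batch never finishes.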

type EnqueueRequest

type EnqueueRequest struct {
	TaskType         types.TaskType     `json:"task_type"`
	Payload          json.RawMessage    `json:"payload"`
	Schedule         types.Schedule     `json:"schedule"`
	RetryPolicy      *types.RetryPolicy `json:"retry_policy,omitempty"`
	Tags             []string           `json:"tags,omitempty"`
	UniqueKey        *string            `json:"unique_key,omitempty"`
	DLQAfter         *int               `json:"dlq_after,omitempty"`
	Priority         *types.Priority    `json:"priority,omitempty"`
	HeartbeatTimeout *types.Duration    `json:"heartbeat_timeout,omitempty"`
	// RequestID enables idempotent enqueue: if the same RequestID is sent again
	// within 5 minutes, the cached response is returned instead of creating a duplicate job.
	RequestID *string `json:"request_id,omitempty"`
}

EnqueueRequest describes a job to enqueue.
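The RequestID comment in the struct describes a time-boxed deduplication cache. A minimal in-memory sketch of that idea follows. The enqueuer type is hypothetical, the real cache lives on the Redis side, and the sketch assumes the 5-minute window is counted from the first request rather than refreshed on each repeat.

```go
package main

import (
	"fmt"
	"time"
)

const window = 5 * time.Minute

// cached is one remembered response.
type cached struct {
	jobID   string
	expires time.Time
}

// enqueuer is a hypothetical in-memory stand-in for the idempotency cache.
type enqueuer struct {
	byRequestID map[string]cached
	nextID      int
}

func newEnqueuer() *enqueuer { return &enqueuer{byRequestID: make(map[string]cached)} }

// enqueue returns the cached job ID when requestID was seen within the window;
// otherwise it creates a new job. An empty requestID disables deduplication.
func (e *enqueuer) enqueue(requestID string, now time.Time) string {
	if requestID != "" {
		if c, ok := e.byRequestID[requestID]; ok && now.Before(c.expires) {
			return c.jobID
		}
	}
	e.nextID++
	id := fmt.Sprintf("job-%d", e.nextID)
	if requestID != "" {
		e.byRequestID[requestID] = cached{jobID: id, expires: now.Add(window)}
	}
	return id
}

// exampleIdempotent sends the same RequestID at 0, 2 and 6 minutes.
func exampleIdempotent() []string {
	e := newEnqueuer()
	t0 := time.Date(2026, 3, 11, 12, 0, 0, 0, time.UTC)
	return []string{
		e.enqueue("req-1", t0),
		e.enqueue("req-1", t0.Add(2*time.Minute)),
		e.enqueue("req-1", t0.Add(6*time.Minute)),
	}
}

func main() {
	fmt.Println(exampleIdempotent())
}
```

In this model the second call returns the first job's ID, while the third call falls outside the window and creates a new job.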

type Option

type Option func(*clientConfig)

Option configures the client.
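Option follows Go's functional options pattern: each With... function returns a closure that mutates the configuration, and New applies them in order. The sketch below reproduces the pattern with a hypothetical config struct; the fields of dureq's unexported clientConfig and its defaults are not shown in this documentation, so the names and the default URL here are assumptions.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the unexported clientConfig.
type config struct {
	url       string
	keyPrefix string
}

// option mirrors the shape of Option: a function that mutates the config.
type option func(*config)

func withURL(url string) option { return func(c *config) { c.url = url } }

func withKeyPrefix(prefix string) option { return func(c *config) { c.keyPrefix = prefix } }

// newConfig starts from defaults and applies options in order, so a later
// option overrides an earlier one that sets the same field.
func newConfig(opts ...option) *config {
	c := &config{url: "redis://localhost:6379"} // assumed default, for illustration only
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig(withKeyPrefix("tenant-a"), withURL("redis://example:6379"))
	fmt.Println(c.url, c.keyPrefix)
}
```

The pattern lets New keep a stable variadic signature while new settings are added as further With... functions.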

func WithClusterAddrs

func WithClusterAddrs(addrs []string) Option

WithClusterAddrs sets Redis Cluster node addresses for cluster mode.

func WithKeyPrefix

func WithKeyPrefix(prefix string) Option

WithKeyPrefix sets the key prefix for multi-tenant isolation.

func WithPriorityTiers

func WithPriorityTiers(tiers []store.TierConfig) Option

WithPriorityTiers sets the priority tier configuration.

func WithRedisClient

func WithRedisClient(rdb rueidis.Client) Option

WithRedisClient uses a pre-existing rueidis client.

func WithRedisDB

func WithRedisDB(db int) Option

WithRedisDB sets the Redis database number.

func WithRedisPassword

func WithRedisPassword(password string) Option

WithRedisPassword sets the Redis password.

func WithRedisPoolSize

func WithRedisPoolSize(n int) Option

WithRedisPoolSize sets the Redis connection pool size.

func WithRedisURL

func WithRedisURL(url string) Option

WithRedisURL sets the Redis server URL.

func WithRedisUsername (added in v0.1.10)

func WithRedisUsername(username string) Option

WithRedisUsername sets the Redis ACL username (Redis 6+).

func WithStore

func WithStore(s *store.RedisStore) Option

WithStore uses a pre-existing RedisStore (for in-process use with the server).
