Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
- type DurableBackend
- type DurableCodec
- type DurableHandlerSpec
- type DurableTask
- type DurableTaskLease
- type GRPCAuthFunc
- type GRPCServer
- func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
- func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
- func (s *GRPCServer) RegisterDurableTasks(ctx context.Context, req *workerpb.RegisterDurableTasksRequest) (*workerpb.RegisterDurableTasksResponse, error)
- func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
- func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error
- type GRPCServerOption
- type HandlerSpec
- type MetricsSnapshot
- type Middleware
- type OTelMetricsOption
- type ProtoDurableCodec
- type RedisDurableBackend
- func (b *RedisDurableBackend) Ack(ctx context.Context, lease DurableTaskLease) error
- func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
- func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error
- func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
- func (b *RedisDurableBackend) Fail(ctx context.Context, lease DurableTaskLease, err error) error
- func (b *RedisDurableBackend) Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
- type RedisDurableOption
- type Result
- type ResultDropPolicy
- type RetentionPolicy
- type Service
- type Task
- func (task *Task) CancelledAt() time.Time
- func (task *Task) CancelledChan() <-chan struct{}
- func (task *Task) CompletedAt() time.Time
- func (task *Task) Error() error
- func (task *Task) IsValid() (err error)
- func (task *Task) Result() any
- func (task *Task) ShouldSchedule() error
- func (task *Task) StartedAt() time.Time
- func (task *Task) Status() TaskStatus
- type TaskFunc
- type TaskHooks
- type TaskManager
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelTask(id uuid.UUID) error
- func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetMetrics() MetricsSnapshot
- func (tm *TaskManager) GetResults() <-chan Result
- func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
- func (tm *TaskManager) GetTasks() []*Task
- func (tm *TaskManager) IsEmpty() bool
- func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error
- func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error
- func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
- func (tm *TaskManager) SetHooks(hooks TaskHooks)
- func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
- func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error
- func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)
- func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
- func (tm *TaskManager) SetTracer(tracer TaskTracer)
- func (tm *TaskManager) StartWorkers(ctx context.Context)
- func (tm *TaskManager) StopGraceful(ctx context.Context) error
- func (tm *TaskManager) StopNow()
- func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
- func (tm *TaskManager) Wait(ctx context.Context) error
- type TaskManagerOption
- func WithDurableBackend(backend DurableBackend) TaskManagerOption
- func WithDurableBatchSize(size int) TaskManagerOption
- func WithDurableCodec(codec DurableCodec) TaskManagerOption
- func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption
- func WithDurableLease(lease time.Duration) TaskManagerOption
- func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption
- func WithDurablePollInterval(interval time.Duration) TaskManagerOption
- func WithMaxRetries(n int) TaskManagerOption
- func WithMaxTasks(n int) TaskManagerOption
- func WithMaxWorkers(n int) TaskManagerOption
- func WithRetryDelay(delay time.Duration) TaskManagerOption
- func WithTasksPerSecond(n float64) TaskManagerOption
- func WithTimeout(timeout time.Duration) TaskManagerOption
- type TaskSpan
- type TaskStatus
- type TaskTracer
Constants ¶
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)
TaskStatus values.
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)
Variables ¶
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
	// ErrDurableLeaseNotFound is returned when a durable lease cannot be renewed.
	ErrDurableLeaseNotFound = ewrap.New("durable lease not found")
)
Errors returned by the TaskManager.
Functions ¶
func RegisterMiddleware ¶
func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
RegisterMiddleware registers middlewares to the provided service.
Types ¶
type DurableBackend ¶ added in v0.1.8
type DurableBackend interface {
Enqueue(ctx context.Context, task DurableTask) error
Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
Ack(ctx context.Context, lease DurableTaskLease) error
Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
Fail(ctx context.Context, lease DurableTaskLease, err error) error
Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
}
DurableBackend provides persistence and leasing for durable tasks.
type DurableCodec ¶ added in v0.1.8
type DurableCodec interface {
Marshal(msg proto.Message) ([]byte, error)
Unmarshal(data []byte, msg proto.Message) error
}
DurableCodec marshals and unmarshals durable task payloads.
type DurableHandlerSpec ¶ added in v0.1.8
type DurableHandlerSpec struct {
Make func() proto.Message
Fn func(ctx context.Context, payload proto.Message) (any, error)
}
DurableHandlerSpec describes a durable task handler.
type DurableTask ¶ added in v0.1.8
type DurableTask struct {
ID uuid.UUID
Handler string
Message proto.Message
Payload []byte
Priority int
Retries int
RetryDelay time.Duration
Metadata map[string]string
}
DurableTask represents a task that can be persisted and rehydrated.
type DurableTaskLease ¶ added in v0.1.8
type DurableTaskLease struct {
Task DurableTask
LeaseID string
Attempts int
MaxRetries int
}
DurableTaskLease represents a leased durable task.
type GRPCAuthFunc ¶ added in v0.1.6
GRPCAuthFunc authorizes a gRPC request before handling it. Return a gRPC status error to control the response code.
type GRPCServer ¶ added in v0.1.1
type GRPCServer struct {
// contains filtered or unexported fields
}
GRPCServer implements the generated WorkerServiceServer interface.
func NewGRPCServer ¶ added in v0.1.1
func NewGRPCServer(svc Service, handlers map[string]HandlerSpec, opts ...GRPCServerOption) *GRPCServer
NewGRPCServer creates a new gRPC server backed by the provided Service.
func (*GRPCServer) CancelTask ¶ added in v0.1.1
func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
CancelTask cancels an active task by its ID.
func (*GRPCServer) GetTask ¶ added in v0.1.1
func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
GetTask returns information about a task by its ID.
func (*GRPCServer) RegisterDurableTasks ¶ added in v0.1.8
func (s *GRPCServer) RegisterDurableTasks( ctx context.Context, req *workerpb.RegisterDurableTasksRequest, ) (*workerpb.RegisterDurableTasksResponse, error)
RegisterDurableTasks registers one or more durable tasks with the underlying service.
func (*GRPCServer) RegisterTasks ¶ added in v0.1.1
func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
RegisterTasks registers one or more tasks with the underlying service.
func (*GRPCServer) StreamResults ¶ added in v0.1.1
func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error
StreamResults streams task results back to the client.
type GRPCServerOption ¶ added in v0.1.6
type GRPCServerOption func(*GRPCServer)
GRPCServerOption configures the gRPC server.
func WithGRPCAuth ¶ added in v0.1.6
func WithGRPCAuth(auth GRPCAuthFunc) GRPCServerOption
WithGRPCAuth installs an authorization hook for gRPC requests.
type HandlerSpec ¶ added in v0.1.1
type HandlerSpec struct {
// Make returns a zero value of the payload message to unmarshal into.
Make func() protoreflect.ProtoMessage
// Fn does the work. Your Task.Execute will call this.
Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}
HandlerSpec describes a single handler for a gRPC method.
type MetricsSnapshot ¶ added in v0.1.0
type MetricsSnapshot struct {
Scheduled int64
Running int64
Completed int64
Failed int64
Cancelled int64
Retried int64
ResultsDropped int64
QueueDepth int
TaskLatencyCount int64
TaskLatencyTotal time.Duration
TaskLatencyMax time.Duration
}
MetricsSnapshot represents a snapshot of task metrics.
type OTelMetricsOption ¶ added in v0.1.6
type OTelMetricsOption func(*otelMetricsConfig)
OTelMetricsOption configures OpenTelemetry metrics.
func WithOTelMeterName ¶ added in v0.1.6
func WithOTelMeterName(name string) OTelMetricsOption
WithOTelMeterName overrides the default OTel meter name.
func WithOTelMeterVersion ¶ added in v0.1.6
func WithOTelMeterVersion(version string) OTelMetricsOption
WithOTelMeterVersion sets the instrumentation version reported by the meter.
type ProtoDurableCodec ¶ added in v0.1.8
type ProtoDurableCodec struct{}
ProtoDurableCodec uses protobuf for serialization.
type RedisDurableBackend ¶ added in v0.1.8
type RedisDurableBackend struct {
// contains filtered or unexported fields
}
RedisDurableBackend implements a durable task backend using Redis.
func NewRedisDurableBackend ¶ added in v0.1.8
func NewRedisDurableBackend(client rueidis.Client, opts ...RedisDurableOption) (*RedisDurableBackend, error)
NewRedisDurableBackend creates a new RedisDurableBackend with the given Redis client and options.
func (*RedisDurableBackend) Ack ¶ added in v0.1.8
func (b *RedisDurableBackend) Ack(ctx context.Context, lease DurableTaskLease) error
Ack acknowledges the successful processing of a durable task.
func (*RedisDurableBackend) Dequeue ¶ added in v0.1.8
func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
Dequeue retrieves a batch of durable tasks from the Redis backend with a lease.
func (*RedisDurableBackend) Enqueue ¶ added in v0.1.8
func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error
Enqueue adds a new durable task to the Redis backend.
func (*RedisDurableBackend) Extend ¶ added in v0.1.8
func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
Extend renews the processing lease for a durable task.
func (*RedisDurableBackend) Fail ¶ added in v0.1.8
func (b *RedisDurableBackend) Fail(ctx context.Context, lease DurableTaskLease, err error) error
Fail marks a durable task as failed and moves it to the dead letter queue.
func (*RedisDurableBackend) Nack ¶ added in v0.1.8
func (b *RedisDurableBackend) Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
Nack negatively acknowledges a durable task, making it available for reprocessing after a delay.
type RedisDurableOption ¶ added in v0.1.8
type RedisDurableOption func(*RedisDurableBackend)
RedisDurableOption defines an option for configuring RedisDurableBackend.
func WithRedisDurableBatchSize ¶ added in v0.1.8
func WithRedisDurableBatchSize(size int) RedisDurableOption
WithRedisDurableBatchSize sets the batch size for dequeuing tasks.
func WithRedisDurablePrefix ¶ added in v0.1.8
func WithRedisDurablePrefix(prefix string) RedisDurableOption
WithRedisDurablePrefix sets the key prefix for Redis durable backend.
type Result ¶ added in v0.0.4
type Result struct {
Task *Task // the task that produced the result
Result any // the result of the task
Error error // the error returned by the task
}
Result is a task result.
type ResultDropPolicy ¶ added in v0.1.6
type ResultDropPolicy uint8
ResultDropPolicy defines how to handle full subscriber buffers.
const (
	// DropNewest drops the new result when the subscriber buffer is full.
	DropNewest ResultDropPolicy = iota
	// DropOldest drops the oldest buffered result to make room for the new one.
	DropOldest
)
type RetentionPolicy ¶ added in v0.1.4
RetentionPolicy controls how completed tasks are kept in the registry.
type Service ¶
type Service interface {
// contains filtered or unexported methods
}
Service is an interface for a task manager.
type Task ¶
type Task struct {
ID uuid.UUID `json:"id"` // ID is the id of the task
Name string `json:"name"` // Name is the name of the task
Description string `json:"description"` // Description is the description of the task
Priority int `json:"priority"` // Priority is the priority of the task
Execute TaskFunc `json:"-"` // Execute is the function that will be executed by the task
Ctx context.Context `json:"-"` // Ctx is the context of the task
CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task
Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks
RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
// contains filtered or unexported fields
}
Task represents a function that can be executed by the task manager.
Note: access Task state via methods to avoid data races.
func (*Task) CancelledAt ¶ added in v0.1.4
func (task *Task) CancelledAt() time.Time
CancelledAt returns the cancellation time if set.
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel which gets closed when the task is cancelled.
func (*Task) CompletedAt ¶ added in v0.1.4
func (task *Task) CompletedAt() time.Time
CompletedAt returns the completion time if set.
func (*Task) ShouldSchedule ¶ added in v0.0.5
func (task *Task) ShouldSchedule() error
ShouldSchedule returns an error if the task should not be scheduled.
func (*Task) Status ¶ added in v0.0.4
func (task *Task) Status() TaskStatus
Status returns the current task status.
type TaskHooks ¶ added in v0.1.5
type TaskHooks struct {
OnQueued func(task *Task)
OnStart func(task *Task)
OnFinish func(task *Task, status TaskStatus, result any, err error)
OnRetry func(task *Task, delay time.Duration, attempt int)
}
TaskHooks defines optional callbacks for task lifecycle events.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is a struct that manages a pool of goroutines that can execute tasks.
func NewTaskManager ¶
func NewTaskManager( ctx context.Context, maxWorkers, maxTasks int, tasksPerSecond float64, timeout, retryDelay time.Duration, maxRetries int, ) *TaskManager
NewTaskManager creates a new task manager.
- ctx is the context for the task manager
- maxWorkers is the number of workers to start, if <=0, the number of CPUs will be used
- maxTasks is the maximum number of tasks that can be queued at once, defaults to 10
- tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting. The limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling.
- timeout is the default timeout for tasks, defaults to 5 minutes
- retryDelay is the default delay between retries, defaults to 1 second
- maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values.
- maxWorkers: runtime.NumCPU()
- maxTasks: 10
- tasksPerSecond: 5
- timeout: 5 minutes
- retryDelay: 1 second
- maxRetries: 3
func NewTaskManagerWithOptions ¶ added in v0.1.8
func NewTaskManagerWithOptions(ctx context.Context, opts ...TaskManagerOption) *TaskManager
NewTaskManagerWithOptions creates a new task manager using functional options.
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID) error
CancelTask cancels a task by its ID.
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
ExecuteTask executes a task given its ID and returns the result.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of running tasks.
func (*TaskManager) GetMetrics ¶ added in v0.1.0
func (tm *TaskManager) GetMetrics() MetricsSnapshot
GetMetrics returns a snapshot of current metrics.
func (*TaskManager) GetResults ¶
func (tm *TaskManager) GetResults() <-chan Result
GetResults returns a results channel (compatibility shim for legacy API). Use SubscribeResults for explicit unsubscription and buffer control.
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
GetTask gets a task by its ID.
func (*TaskManager) IsEmpty ¶ added in v0.0.7
func (tm *TaskManager) IsEmpty() bool
IsEmpty checks if the task scheduler queue is empty.
func (*TaskManager) RegisterDurableTask ¶ added in v0.1.8
func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error
RegisterDurableTask registers a durable task into the configured backend.
func (*TaskManager) RegisterDurableTasks ¶ added in v0.1.8
func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error
RegisterDurableTasks registers multiple durable tasks.
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
RegisterTask registers a new task to the task manager.
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
RegisterTasks registers multiple tasks to the task manager at once.
func (*TaskManager) SetHooks ¶ added in v0.1.5
func (tm *TaskManager) SetHooks(hooks TaskHooks)
SetHooks configures callbacks for task lifecycle events.
func (*TaskManager) SetMaxWorkers ¶ added in v0.1.0
func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.
func (*TaskManager) SetMeterProvider ¶ added in v0.1.6
func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error
SetMeterProvider enables OpenTelemetry metrics collection. Passing nil disables it.
func (*TaskManager) SetResultsDropPolicy ¶ added in v0.1.6
func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)
SetResultsDropPolicy configures how full subscriber buffers are handled.
func (*TaskManager) SetRetentionPolicy ¶ added in v0.1.4
func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
SetRetentionPolicy configures task registry retention.
func (*TaskManager) SetTracer ¶ added in v0.1.6
func (tm *TaskManager) SetTracer(tracer TaskTracer)
SetTracer configures the task tracer.
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers(ctx context.Context)
StartWorkers starts the task manager's workers and scheduler (idempotent).
func (*TaskManager) StopGraceful ¶ added in v0.1.4
func (tm *TaskManager) StopGraceful(ctx context.Context) error
StopGraceful stops accepting new tasks and waits for completion before stopping workers.
func (*TaskManager) StopNow ¶ added in v0.1.4
func (tm *TaskManager) StopNow()
StopNow cancels running tasks and stops workers immediately.
func (*TaskManager) SubscribeResults ¶ added in v0.1.4
func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
SubscribeResults returns a results channel and an unsubscribe function.
type TaskManagerOption ¶ added in v0.1.8
type TaskManagerOption func(*taskManagerConfig)
TaskManagerOption configures a TaskManager.
func WithDurableBackend ¶ added in v0.1.8
func WithDurableBackend(backend DurableBackend) TaskManagerOption
WithDurableBackend sets the durable backend for the TaskManager.
func WithDurableBatchSize ¶ added in v0.1.8
func WithDurableBatchSize(size int) TaskManagerOption
WithDurableBatchSize sets the durable task batch size.
func WithDurableCodec ¶ added in v0.1.8
func WithDurableCodec(codec DurableCodec) TaskManagerOption
WithDurableCodec sets the durable codec for the TaskManager.
func WithDurableHandlers ¶ added in v0.1.8
func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption
WithDurableHandlers sets the durable handlers for the TaskManager.
func WithDurableLease ¶ added in v0.1.8
func WithDurableLease(lease time.Duration) TaskManagerOption
WithDurableLease sets the durable task lease duration.
func WithDurableLeaseRenewalInterval ¶ added in v0.1.8
func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption
WithDurableLeaseRenewalInterval sets the interval for renewing durable task leases. Set to <= 0 to disable renewal.
func WithDurablePollInterval ¶ added in v0.1.8
func WithDurablePollInterval(interval time.Duration) TaskManagerOption
WithDurablePollInterval sets the durable task polling interval.
func WithMaxRetries ¶ added in v0.1.8
func WithMaxRetries(n int) TaskManagerOption
WithMaxRetries sets the maximum number of retries for a task.
func WithMaxTasks ¶ added in v0.1.8
func WithMaxTasks(n int) TaskManagerOption
WithMaxTasks sets the maximum number of tasks in the queue.
func WithMaxWorkers ¶ added in v0.1.8
func WithMaxWorkers(n int) TaskManagerOption
WithMaxWorkers sets the maximum number of workers.
func WithRetryDelay ¶ added in v0.1.8
func WithRetryDelay(delay time.Duration) TaskManagerOption
WithRetryDelay sets the delay between task retries.
func WithTasksPerSecond ¶ added in v0.1.8
func WithTasksPerSecond(n float64) TaskManagerOption
WithTasksPerSecond sets the maximum number of tasks to start per second.
func WithTimeout ¶ added in v0.1.8
func WithTimeout(timeout time.Duration) TaskManagerOption
WithTimeout sets the task execution timeout.
type TaskSpan ¶ added in v0.1.6
type TaskSpan interface {
End(err error)
}
TaskSpan represents an in-flight task span.
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| __examples | |
| durable_dlq_replay (command) | |
| durable_queue_inspect (command) | |
| durable_redis (command) | |
| grpc (command) | |
| grpc_durable (command) | |
| manual (command) | |
| middleware (command) | |
| multi (command) | |
| otel_metrics (command) | |
| otel_metrics_otlp (command) | |
| otel_tracing (command) | |
| tracing (command) | |
| pkg | |