Documentation
¶
Index ¶
- Constants
- Variables
- func AddTypedDurableHandler[T proto.Message](r *TypedDurableRegistry, name string, spec TypedDurableHandlerSpec[T]) error
- func AddTypedHandler[T protoreflect.ProtoMessage](r *TypedHandlerRegistry, name string, spec TypedHandlerSpec[T]) error
- func IsTarballSource(source string) bool
- func JobPayloadFromSpec(spec AdminJobSpec) (*structpb.Struct, error)
- func LoadAdminMTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error)
- func NewAdminGatewayServer(cfg AdminGatewayConfig) (*http.Server, error)
- func NormalizeJobSource(source string) string
- func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
- type AdminActionCounters
- type AdminAuditEvent
- type AdminAuditEventFilter
- type AdminAuditEventPage
- type AdminBackend
- type AdminCoordination
- type AdminDLQEntry
- type AdminDLQEntryDetail
- type AdminDLQFilter
- type AdminDLQPage
- type AdminGatewayConfig
- type AdminGuardrails
- type AdminJob
- type AdminJobEvent
- type AdminJobEventFilter
- type AdminJobEventPage
- type AdminJobEventStore
- type AdminJobSpec
- type AdminObservability
- func (collector *AdminObservability) Middleware(next http.Handler) http.Handler
- func (collector *AdminObservability) Prometheus() string
- func (collector *AdminObservability) RecordGRPC(method string, code codes.Code, duration time.Duration)
- func (collector *AdminObservability) RecordHTTP(method, route string, statusCode int, duration time.Duration)
- func (collector *AdminObservability) RecordJobOutcome(statusCode string, duration time.Duration)
- func (collector *AdminObservability) RecordJobQueued()
- func (collector *AdminObservability) Snapshot() AdminObservabilitySnapshot
- func (collector *AdminObservability) UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type AdminObservabilityJobStat
- type AdminObservabilityMethodStat
- type AdminObservabilitySnapshot
- type AdminOverview
- type AdminQueueSummary
- type AdminSchedule
- type AdminScheduleEvent
- type AdminScheduleEventFilter
- type AdminScheduleEventPage
- type AdminScheduleFactory
- type AdminScheduleSpec
- type CronDurableFactory
- type CronTaskFactory
- type DurableBackend
- type DurableCodec
- type DurableHandlerSpec
- type DurableTask
- type DurableTaskLease
- type FileJobEventStore
- type FileJobEventStoreOption
- type GRPCAuthFunc
- type GRPCServer
- func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
- func (s *GRPCServer) CreateSchedule(ctx context.Context, req *workerpb.CreateScheduleRequest) (*workerpb.CreateScheduleResponse, error)
- func (s *GRPCServer) DeleteJob(ctx context.Context, req *workerpb.DeleteJobRequest) (*workerpb.DeleteJobResponse, error)
- func (s *GRPCServer) DeleteSchedule(ctx context.Context, req *workerpb.DeleteScheduleRequest) (*workerpb.DeleteScheduleResponse, error)
- func (s *GRPCServer) GetDLQEntry(ctx context.Context, req *workerpb.GetDLQEntryRequest) (*workerpb.GetDLQEntryResponse, error)
- func (s *GRPCServer) GetHealth(ctx context.Context, req *workerpb.GetHealthRequest) (*workerpb.GetHealthResponse, error)
- func (s *GRPCServer) GetJob(ctx context.Context, req *workerpb.GetJobRequest) (*workerpb.GetJobResponse, error)
- func (s *GRPCServer) GetOverview(ctx context.Context, req *workerpb.GetOverviewRequest) (*workerpb.GetOverviewResponse, error)
- func (s *GRPCServer) GetQueue(ctx context.Context, req *workerpb.GetQueueRequest) (*workerpb.GetQueueResponse, error)
- func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
- func (s *GRPCServer) ListAuditEvents(ctx context.Context, req *workerpb.ListAuditEventsRequest) (*workerpb.ListAuditEventsResponse, error)
- func (s *GRPCServer) ListDLQ(ctx context.Context, req *workerpb.ListDLQRequest) (*workerpb.ListDLQResponse, error)
- func (s *GRPCServer) ListJobEvents(ctx context.Context, req *workerpb.ListJobEventsRequest) (*workerpb.ListJobEventsResponse, error)
- func (s *GRPCServer) ListJobs(ctx context.Context, req *workerpb.ListJobsRequest) (*workerpb.ListJobsResponse, error)
- func (s *GRPCServer) ListQueues(ctx context.Context, req *workerpb.ListQueuesRequest) (*workerpb.ListQueuesResponse, error)
- func (s *GRPCServer) ListScheduleEvents(ctx context.Context, req *workerpb.ListScheduleEventsRequest) (*workerpb.ListScheduleEventsResponse, error)
- func (s *GRPCServer) ListScheduleFactories(ctx context.Context, req *workerpb.ListScheduleFactoriesRequest) (*workerpb.ListScheduleFactoriesResponse, error)
- func (s *GRPCServer) ListSchedules(ctx context.Context, req *workerpb.ListSchedulesRequest) (*workerpb.ListSchedulesResponse, error)
- func (s *GRPCServer) PauseDequeue(ctx context.Context, req *workerpb.PauseDequeueRequest) (*workerpb.PauseDequeueResponse, error)
- func (s *GRPCServer) PauseQueue(ctx context.Context, req *workerpb.PauseQueueRequest) (*workerpb.PauseQueueResponse, error)
- func (s *GRPCServer) PauseSchedule(ctx context.Context, req *workerpb.PauseScheduleRequest) (*workerpb.PauseScheduleResponse, error)
- func (s *GRPCServer) PauseSchedules(ctx context.Context, req *workerpb.PauseSchedulesRequest) (*workerpb.PauseSchedulesResponse, error)
- func (s *GRPCServer) RegisterDurableTasks(ctx context.Context, req *workerpb.RegisterDurableTasksRequest) (*workerpb.RegisterDurableTasksResponse, error)
- func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
- func (s *GRPCServer) ReplayDLQ(ctx context.Context, req *workerpb.ReplayDLQRequest) (*workerpb.ReplayDLQResponse, error)
- func (s *GRPCServer) ReplayDLQByID(ctx context.Context, req *workerpb.ReplayDLQByIDRequest) (*workerpb.ReplayDLQByIDResponse, error)
- func (s *GRPCServer) ResetQueueWeight(ctx context.Context, req *workerpb.ResetQueueWeightRequest) (*workerpb.ResetQueueWeightResponse, error)
- func (s *GRPCServer) ResumeDequeue(ctx context.Context, req *workerpb.ResumeDequeueRequest) (*workerpb.ResumeDequeueResponse, error)
- func (s *GRPCServer) RunJob(ctx context.Context, req *workerpb.RunJobRequest) (*workerpb.RunJobResponse, error)
- func (s *GRPCServer) RunSchedule(ctx context.Context, req *workerpb.RunScheduleRequest) (*workerpb.RunScheduleResponse, error)
- func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, ...) error
- func (s *GRPCServer) UpdateQueueWeight(ctx context.Context, req *workerpb.UpdateQueueWeightRequest) (*workerpb.UpdateQueueWeightResponse, error)
- func (s *GRPCServer) UpsertJob(ctx context.Context, req *workerpb.UpsertJobRequest) (*workerpb.UpsertJobResponse, error)
- type GRPCServerOption
- type HandlerSpec
- type MetricsSnapshot
- type Middleware
- type OTelMetricsOption
- type ProtoDurableCodec
- type QueueConfigurableBackend
- type RedisDurableBackend
- func (b *RedisDurableBackend) Ack(ctx context.Context, lease DurableTaskLease) error
- func (b *RedisDurableBackend) AdminAuditEvents(ctx context.Context, filter AdminAuditEventFilter) (AdminAuditEventPage, error)
- func (*RedisDurableBackend) AdminCreateSchedule(ctx context.Context, _ AdminScheduleSpec) (AdminSchedule, error)
- func (b *RedisDurableBackend) AdminDLQ(ctx context.Context, filter AdminDLQFilter) (AdminDLQPage, error)
- func (b *RedisDurableBackend) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)
- func (b *RedisDurableBackend) AdminDeleteJob(ctx context.Context, name string) (bool, error)
- func (*RedisDurableBackend) AdminDeleteSchedule(ctx context.Context, _ string) (bool, error)
- func (b *RedisDurableBackend) AdminJob(ctx context.Context, name string) (AdminJob, error)
- func (b *RedisDurableBackend) AdminJobEvents(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
- func (b *RedisDurableBackend) AdminJobs(ctx context.Context) ([]AdminJob, error)
- func (b *RedisDurableBackend) AdminOverview(ctx context.Context) (AdminOverview, error)
- func (b *RedisDurableBackend) AdminPause(ctx context.Context) error
- func (b *RedisDurableBackend) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)
- func (*RedisDurableBackend) AdminPauseSchedule(ctx context.Context, _ string, _ bool) (AdminSchedule, error)
- func (*RedisDurableBackend) AdminPauseSchedules(ctx context.Context, _ bool) (int, error)
- func (b *RedisDurableBackend) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)
- func (b *RedisDurableBackend) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)
- func (b *RedisDurableBackend) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, limit int) error
- func (b *RedisDurableBackend) AdminRecordJobEvent(ctx context.Context, event AdminJobEvent, limit int) error
- func (b *RedisDurableBackend) AdminReplayDLQ(ctx context.Context, limit int) (int, error)
- func (b *RedisDurableBackend) AdminReplayDLQByID(ctx context.Context, ids []string) (int, error)
- func (b *RedisDurableBackend) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)
- func (b *RedisDurableBackend) AdminResume(ctx context.Context) error
- func (*RedisDurableBackend) AdminRunSchedule(ctx context.Context, _ string) (string, error)
- func (*RedisDurableBackend) AdminScheduleEvents(ctx context.Context, _ AdminScheduleEventFilter) (AdminScheduleEventPage, error)
- func (*RedisDurableBackend) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)
- func (*RedisDurableBackend) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)
- func (b *RedisDurableBackend) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)
- func (b *RedisDurableBackend) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)
- func (b *RedisDurableBackend) ConfigureQueues(defaultQueue string, weights map[string]int)
- func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
- func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error
- func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
- func (b *RedisDurableBackend) Fail(ctx context.Context, lease DurableTaskLease, err error) error
- func (b *RedisDurableBackend) Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
- type RedisDurableOption
- func WithRedisAdminJobEventStore(store AdminJobEventStore) RedisDurableOption
- func WithRedisDurableBatchSize(size int) RedisDurableOption
- func WithRedisDurableDefaultQueue(name string) RedisDurableOption
- func WithRedisDurableGlobalRateLimit(rate float64, burst int) RedisDurableOption
- func WithRedisDurableLeaderLock(lease time.Duration) RedisDurableOption
- func WithRedisDurablePrefix(prefix string) RedisDurableOption
- func WithRedisDurableQueueWeights(weights map[string]int) RedisDurableOption
- type Result
- type ResultDropPolicy
- type RetentionPolicy
- type Service
- type Task
- func (task *Task) CancelledAt() time.Time
- func (task *Task) CancelledChan() <-chan struct{}
- func (task *Task) CompletedAt() time.Time
- func (task *Task) Error() error
- func (task *Task) IsValid() (err error)
- func (task *Task) Result() any
- func (task *Task) ShouldSchedule() error
- func (task *Task) StartedAt() time.Time
- func (task *Task) Status() TaskStatus
- type TaskFunc
- type TaskHooks
- type TaskManager
- func (tm *TaskManager) AdminAuditEvents(ctx context.Context, filter AdminAuditEventFilter) (AdminAuditEventPage, error)
- func (tm *TaskManager) AdminCreateSchedule(ctx context.Context, spec AdminScheduleSpec) (AdminSchedule, error)
- func (tm *TaskManager) AdminDLQ(ctx context.Context, filter AdminDLQFilter) (AdminDLQPage, error)
- func (tm *TaskManager) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)
- func (tm *TaskManager) AdminDeleteJob(ctx context.Context, name string) (bool, error)
- func (tm *TaskManager) AdminDeleteSchedule(ctx context.Context, name string) (bool, error)
- func (tm *TaskManager) AdminJob(ctx context.Context, name string) (AdminJob, error)
- func (tm *TaskManager) AdminJobEvents(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
- func (tm *TaskManager) AdminJobs(ctx context.Context) ([]AdminJob, error)
- func (tm *TaskManager) AdminOverview(ctx context.Context) (AdminOverview, error)
- func (tm *TaskManager) AdminPause(ctx context.Context) error
- func (tm *TaskManager) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)
- func (tm *TaskManager) AdminPauseSchedule(ctx context.Context, name string, paused bool) (AdminSchedule, error)
- func (tm *TaskManager) AdminPauseSchedules(ctx context.Context, paused bool) (int, error)
- func (tm *TaskManager) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)
- func (tm *TaskManager) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)
- func (tm *TaskManager) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, _ int) error
- func (tm *TaskManager) AdminReplayDLQ(ctx context.Context, limit int) (int, error)
- func (tm *TaskManager) AdminReplayDLQByID(ctx context.Context, ids []string) (int, error)
- func (tm *TaskManager) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)
- func (tm *TaskManager) AdminResume(ctx context.Context) error
- func (tm *TaskManager) AdminRunJob(ctx context.Context, name string) (string, error)
- func (tm *TaskManager) AdminRunSchedule(ctx context.Context, name string) (string, error)
- func (tm *TaskManager) AdminScheduleEvents(ctx context.Context, filter AdminScheduleEventFilter) (AdminScheduleEventPage, error)
- func (tm *TaskManager) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)
- func (tm *TaskManager) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)
- func (tm *TaskManager) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)
- func (tm *TaskManager) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelTask(id uuid.UUID) error
- func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetMetrics() MetricsSnapshot
- func (tm *TaskManager) GetResults() <-chan Result
- func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
- func (tm *TaskManager) GetTasks() []*Task
- func (tm *TaskManager) IsEmpty() bool
- func (tm *TaskManager) RegisterCronTask(ctx context.Context, name string, spec string, factory CronTaskFactory) error
- func (tm *TaskManager) RegisterDurableCronTask(ctx context.Context, name string, spec string, factory CronDurableFactory) error
- func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error
- func (tm *TaskManager) RegisterDurableTaskAfter(ctx context.Context, task DurableTask, delay time.Duration) error
- func (tm *TaskManager) RegisterDurableTaskAt(ctx context.Context, task DurableTask, runAt time.Time) error
- func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error
- func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
- func (tm *TaskManager) RegisterTaskAfter(ctx context.Context, task *Task, delay time.Duration) error
- func (tm *TaskManager) RegisterTaskAt(ctx context.Context, task *Task, runAt time.Time) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
- func (tm *TaskManager) SetHooks(hooks TaskHooks)
- func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
- func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error
- func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)
- func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
- func (tm *TaskManager) SetTracer(tracer TaskTracer)
- func (tm *TaskManager) StartWorkers(ctx context.Context)
- func (tm *TaskManager) StopGraceful(ctx context.Context) error
- func (tm *TaskManager) StopNow()
- func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
- func (tm *TaskManager) SyncJobFactories(ctx context.Context) error
- func (tm *TaskManager) UnregisterCronTask(name string) bool
- func (tm *TaskManager) Wait(ctx context.Context) error
- type TaskManagerOption
- func WithAdminAuditArchiveDir(dir string) TaskManagerOption
- func WithAdminAuditArchiveInterval(interval time.Duration) TaskManagerOption
- func WithAdminAuditEventLimit(limit int) TaskManagerOption
- func WithAdminAuditRetention(ttl time.Duration) TaskManagerOption
- func WithCronLocation(location *time.Location) TaskManagerOption
- func WithDefaultQueue(name string) TaskManagerOption
- func WithDurableBackend(backend DurableBackend) TaskManagerOption
- func WithDurableBatchSize(size int) TaskManagerOption
- func WithDurableCodec(codec DurableCodec) TaskManagerOption
- func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption
- func WithDurableLease(lease time.Duration) TaskManagerOption
- func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption
- func WithDurablePollInterval(interval time.Duration) TaskManagerOption
- func WithMaxRetries(n int) TaskManagerOption
- func WithMaxTasks(n int) TaskManagerOption
- func WithMaxWorkers(n int) TaskManagerOption
- func WithQueueWeights(weights map[string]int) TaskManagerOption
- func WithRetryDelay(delay time.Duration) TaskManagerOption
- func WithTasksPerSecond(n float64) TaskManagerOption
- func WithTimeout(timeout time.Duration) TaskManagerOption
- type TaskSpan
- type TaskStatus
- type TaskTracer
- type TypedDurableHandlerSpec
- type TypedDurableRegistry
- type TypedHandlerRegistry
- type TypedHandlerSpec
Constants ¶
const (
	// JobSourceGitTag builds a job from a git tag (default).
	JobSourceGitTag = "git_tag"
	// JobSourceTarballURL builds a job from an HTTP(S) tarball URL.
	JobSourceTarballURL = "tarball_url"
	// JobSourceTarballPath builds a job from a local tarball path.
	JobSourceTarballPath = "tarball_path"
)
const (
	// MetadataQueueKey is the metadata key for the queue name.
	MetadataQueueKey = "queue"
	// MetadataWeightKey is the metadata key for the task weight.
	MetadataWeightKey = "weight"
)
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)
TaskStatus values.
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultQueueName is the default queue name when none is provided.
	DefaultQueueName = "default"
	// DefaultQueueWeight is the default scheduling weight for a queue.
	DefaultQueueWeight = 1
	// DefaultTaskWeight is the default scheduling weight for a task.
	DefaultTaskWeight = 1
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)
const JobHandlerName = "job_runner"
JobHandlerName is the durable handler name for containerized jobs.
Variables ¶
var (
	ErrAdminBackendUnavailable = ewrap.New("admin backend unavailable")
	// ErrAdminUnsupported indicates the backend does not support the admin operation.
	ErrAdminUnsupported = ewrap.New("admin operation unsupported")
	// ErrAdminQueueNotFound indicates the queue was not found.
	ErrAdminQueueNotFound = ewrap.New("admin queue not found")
	// ErrAdminQueueNameRequired indicates a queue name is required.
	ErrAdminQueueNameRequired = ewrap.New("admin queue name is required")
	// ErrAdminQueueWeightInvalid indicates a queue weight is invalid.
	ErrAdminQueueWeightInvalid = ewrap.New("admin queue weight is invalid")
	// ErrAdminQueuePauseUnsupported indicates pausing queues is not supported.
	ErrAdminQueuePauseUnsupported = ewrap.New("admin queue pause unsupported")
	// ErrAdminDLQFilterTooLarge indicates the DLQ is too large for filtered queries.
	ErrAdminDLQFilterTooLarge = ewrap.New("DLQ too large for filtered query")
	// ErrAdminDLQEntryIDRequired indicates a DLQ id is required.
	ErrAdminDLQEntryIDRequired = ewrap.New("admin DLQ id is required")
	// ErrAdminDLQEntryNotFound indicates the DLQ entry was not found.
	ErrAdminDLQEntryNotFound = ewrap.New("admin DLQ entry not found")
	// ErrAdminReplayIDsRequired indicates replay-by-id requires at least one id.
	ErrAdminReplayIDsRequired = ewrap.New("admin replay ids are required")
	// ErrAdminReplayIDsTooLarge indicates too many ids were provided.
	ErrAdminReplayIDsTooLarge = ewrap.New("admin replay ids limit exceeded")
	// ErrAdminReplayLimitExceeded indicates replay limit exceeds policy.
	ErrAdminReplayLimitExceeded = ewrap.New("admin replay limit exceeds policy")
	// ErrAdminScheduleRunRateLimited indicates schedule run cap was exceeded.
	ErrAdminScheduleRunRateLimited = ewrap.New("admin schedule run rate limit exceeded")
	// ErrAdminApprovalRequired indicates a policy approval token is required.
	ErrAdminApprovalRequired = ewrap.New("admin approval token is required")
	// ErrAdminApprovalInvalid indicates a policy approval token is invalid.
	ErrAdminApprovalInvalid = ewrap.New("admin approval token is invalid")
	// ErrAdminScheduleNameRequired indicates a schedule name is required.
	ErrAdminScheduleNameRequired = ewrap.New("admin schedule name is required")
	// ErrAdminScheduleSpecRequired indicates a schedule spec is required.
	ErrAdminScheduleSpecRequired = ewrap.New("admin schedule spec is required")
	// ErrAdminScheduleNotFound indicates the schedule was not found.
	ErrAdminScheduleNotFound = ewrap.New("admin schedule not found")
	// ErrAdminScheduleFactoryMissing indicates a factory was not registered.
	ErrAdminScheduleFactoryMissing = ewrap.New("admin schedule factory missing")
	// ErrAdminScheduleDurableMismatch indicates durable flag mismatched.
	ErrAdminScheduleDurableMismatch = ewrap.New("admin schedule durable mismatch")
	// ErrAdminJobNameRequired indicates a job name is required.
	ErrAdminJobNameRequired = ewrap.New("admin job name is required")
	// ErrAdminJobRepoRequired indicates a job repo is required.
	ErrAdminJobRepoRequired = ewrap.New("admin job repo is required")
	// ErrAdminJobTagRequired indicates a job tag is required.
	ErrAdminJobTagRequired = ewrap.New("admin job tag is required")
	// ErrAdminJobSourceInvalid indicates the job source is invalid.
	ErrAdminJobSourceInvalid = ewrap.New("admin job source is invalid")
	// ErrAdminJobTarballURLRequired indicates a tarball URL is required.
	ErrAdminJobTarballURLRequired = ewrap.New("admin job tarball url is required")
	// ErrAdminJobTarballPathRequired indicates a tarball path is required.
	ErrAdminJobTarballPathRequired = ewrap.New("admin job tarball path is required")
	// ErrAdminJobTarballSHAInvalid indicates tarball SHA256 is invalid.
	ErrAdminJobTarballSHAInvalid = ewrap.New("admin job tarball sha256 is invalid")
	// ErrAdminJobCommandRequired indicates a job command is required.
	ErrAdminJobCommandRequired = ewrap.New("admin job command is required")
	// ErrAdminJobNotFound indicates the job was not found.
	ErrAdminJobNotFound = ewrap.New("admin job not found")
)
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
	// ErrDurableLeaseNotFound is returned when a durable lease cannot be renewed.
	ErrDurableLeaseNotFound = ewrap.New("durable lease not found")
)
Errors returned by the TaskManager.
var (
	// ErrHandlerNameRequired indicates a missing handler name during registration.
	ErrHandlerNameRequired = ewrap.New("handler name is required")
	// ErrHandlerSpecInvalid indicates an invalid handler spec during registration.
	ErrHandlerSpecInvalid = ewrap.New("handler spec is invalid")
	// ErrHandlerAlreadyRegistered indicates a duplicate handler name during registration.
	ErrHandlerAlreadyRegistered = ewrap.New("handler already registered")
	// ErrHandlerPayloadTypeMismatch indicates a payload type mismatch during handler invocation.
	ErrHandlerPayloadTypeMismatch = ewrap.New("handler payload type mismatch")
	// ErrHandlerRegistryNil indicates a nil handler registry during registration.
	ErrHandlerRegistryNil = ewrap.New("handler registry is nil")
)
Functions ¶
func AddTypedDurableHandler ¶ added in v0.1.9
func AddTypedDurableHandler[T proto.Message](r *TypedDurableRegistry, name string, spec TypedDurableHandlerSpec[T]) error
AddTypedDurableHandler registers a typed durable handler spec into the registry.
func AddTypedHandler ¶ added in v0.1.9
func AddTypedHandler[T protoreflect.ProtoMessage](r *TypedHandlerRegistry, name string, spec TypedHandlerSpec[T]) error
AddTypedHandler registers a typed handler spec into the registry.
func IsTarballSource ¶ added in v0.2.2
func IsTarballSource(source string) bool
IsTarballSource returns true when the job source expects a tarball.
func JobPayloadFromSpec ¶ added in v0.2.2
func JobPayloadFromSpec(spec AdminJobSpec) (*structpb.Struct, error)
JobPayloadFromSpec converts a job spec into a protobuf Struct payload.
func LoadAdminMTLSConfig ¶ added in v0.2.2
func LoadAdminMTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error)
LoadAdminMTLSConfig loads a server TLS config that enforces mTLS.
func NewAdminGatewayServer ¶ added in v0.2.2
func NewAdminGatewayServer(cfg AdminGatewayConfig) (*http.Server, error)
NewAdminGatewayServer builds an HTTPS gateway that proxies to AdminService.
func NormalizeJobSource ¶ added in v0.2.2
func NormalizeJobSource(source string) string
NormalizeJobSource returns a valid job source, defaulting to git tag.
func RegisterMiddleware ¶
func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T
RegisterMiddleware registers middlewares to the provided service.
Types ¶
type AdminActionCounters ¶ added in v0.2.2
AdminActionCounters tracks admin action counts.
type AdminAuditEvent ¶ added in v0.2.2
type AdminAuditEvent struct {
At time.Time `json:"at"`
Actor string `json:"actor"`
RequestID string `json:"request_id"`
Action string `json:"action"`
Target string `json:"target"`
Status string `json:"status"`
PayloadHash string `json:"payload_hash"`
Detail string `json:"detail"`
Metadata map[string]string `json:"metadata,omitempty"`
}
AdminAuditEvent describes an admin mutation audit record.
type AdminAuditEventFilter ¶ added in v0.2.2
AdminAuditEventFilter filters admin audit records.
type AdminAuditEventPage ¶ added in v0.2.2
type AdminAuditEventPage struct {
Events []AdminAuditEvent
}
AdminAuditEventPage represents admin audit records.
type AdminBackend ¶ added in v0.2.2
type AdminBackend interface {
AdminOverview(ctx context.Context) (AdminOverview, error)
// contains filtered or unexported methods
}
AdminBackend provides admin data and actions for a backend.
type AdminCoordination ¶ added in v0.2.2
AdminCoordination describes coordination state for durable dequeue.
type AdminDLQEntry ¶ added in v0.2.2
AdminDLQEntry represents a DLQ entry.
type AdminDLQEntryDetail ¶ added in v0.2.2
type AdminDLQEntryDetail struct {
ID string
Queue string
Handler string
Attempts int
AgeMs int64
FailedAtMs int64
UpdatedAtMs int64
LastError string
PayloadSize int64
Metadata map[string]string
}
AdminDLQEntryDetail represents a detailed DLQ entry.
type AdminDLQFilter ¶ added in v0.2.2
AdminDLQFilter controls DLQ listing.
type AdminDLQPage ¶ added in v0.2.2
type AdminDLQPage struct {
Entries []AdminDLQEntry
Total int64
}
AdminDLQPage represents a page of DLQ entries.
type AdminGatewayConfig ¶ added in v0.2.2
type AdminGatewayConfig struct {
GRPCAddr string
HTTPAddr string
// GRPCTLS is optional; when nil, the gateway dials gRPC without TLS.
GRPCTLS *tls.Config
// TLSCertFile, TLSKeyFile, and TLSCAFile are required for mTLS.
TLSCertFile string
TLSKeyFile string
TLSCAFile string
JobTarballDir string
AuditExportMax int
ReadTimeout time.Duration
WriteTimeout time.Duration
Observability *AdminObservability
}
AdminGatewayConfig configures the admin HTTP gateway.
type AdminGuardrails ¶ added in v0.2.2
type AdminGuardrails struct {
ReplayLimitMax int
ReplayIDsMax int
ScheduleRunMax int
ScheduleRunWindow time.Duration
RequireApproval bool
ApprovalToken string
}
AdminGuardrails configures safety limits for high-impact admin operations.
type AdminJob ¶ added in v0.2.2
type AdminJob struct {
AdminJobSpec
CreatedAt time.Time
UpdatedAt time.Time
}
AdminJob represents a persisted job definition.
type AdminJobEvent ¶ added in v0.2.2
type AdminJobEvent struct {
TaskID string `json:"task_id"`
Name string `json:"name"`
Status string `json:"status"`
Queue string `json:"queue"`
Repo string `json:"repo"`
Tag string `json:"tag"`
Path string `json:"path"`
Dockerfile string `json:"dockerfile"`
Command string `json:"command"`
ScheduleName string `json:"schedule_name"`
ScheduleSpec string `json:"schedule_spec"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
DurationMs int64 `json:"duration_ms"`
Result string `json:"result"`
Error string `json:"error"`
Metadata map[string]string `json:"metadata"`
}
AdminJobEvent describes a containerized job execution event.
type AdminJobEventFilter ¶ added in v0.2.2
AdminJobEventFilter filters job execution events.
type AdminJobEventPage ¶ added in v0.2.2
type AdminJobEventPage struct {
Events []AdminJobEvent
}
AdminJobEventPage represents job execution events.
type AdminJobEventStore ¶ added in v0.2.2
type AdminJobEventStore interface {
Record(ctx context.Context, event AdminJobEvent) error
List(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
}
AdminJobEventStore persists and lists admin job events.
type AdminJobSpec ¶ added in v0.2.2
type AdminJobSpec struct {
Name string
Description string
Repo string
Tag string
Source string
TarballURL string
TarballPath string
TarballSHA string
Path string
Dockerfile string
Command []string
Env []string
Queue string
Retries int
Timeout time.Duration
}
AdminJobSpec defines a job configuration for containerized execution.
func JobSpecFromPayload ¶ added in v0.2.2
func JobSpecFromPayload(payload *structpb.Struct) (AdminJobSpec, error)
JobSpecFromPayload decodes a protobuf Struct payload into a job spec.
type AdminObservability ¶ added in v0.2.2
type AdminObservability struct {
// contains filtered or unexported fields
}
AdminObservability collects lightweight admin service metrics.
func NewAdminObservability ¶ added in v0.2.2
func NewAdminObservability() *AdminObservability
NewAdminObservability creates an in-memory admin metrics collector.
func (*AdminObservability) Middleware ¶ added in v0.2.2
func (collector *AdminObservability) Middleware(next http.Handler) http.Handler
Middleware records per-route HTTP latency and status metrics.
func (*AdminObservability) Prometheus ¶ added in v0.2.3
func (collector *AdminObservability) Prometheus() string
Prometheus renders the current observability snapshot in Prometheus text format.
func (*AdminObservability) RecordGRPC ¶ added in v0.2.2
func (collector *AdminObservability) RecordGRPC(method string, code codes.Code, duration time.Duration)
RecordGRPC records gRPC method metrics.
func (*AdminObservability) RecordHTTP ¶ added in v0.2.2
func (collector *AdminObservability) RecordHTTP(method, route string, statusCode int, duration time.Duration)
RecordHTTP records HTTP route metrics.
func (*AdminObservability) RecordJobOutcome ¶ added in v0.2.2
func (collector *AdminObservability) RecordJobOutcome(statusCode string, duration time.Duration)
RecordJobOutcome records a final job status and duration.
func (*AdminObservability) RecordJobQueued ¶ added in v0.2.2
func (collector *AdminObservability) RecordJobQueued()
RecordJobQueued increments the running gauge when a job run starts.
func (*AdminObservability) Snapshot ¶ added in v0.2.2
func (collector *AdminObservability) Snapshot() AdminObservabilitySnapshot
Snapshot returns a consistent view of collected metrics.
func (*AdminObservability) UnaryServerInterceptor ¶ added in v0.2.2
func (collector *AdminObservability) UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor records gRPC latency and status metrics.
type AdminObservabilityJobStat ¶ added in v0.2.2
type AdminObservabilityJobStat struct {
Running int64 `json:"running"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
TotalMs int64 `json:"totalMs"`
MaxMs int64 `json:"maxMs"`
AvgMs float64 `json:"avgMs"`
LastCode string `json:"lastCode"`
LastUpdated time.Time `json:"lastUpdated"`
}
AdminObservabilityJobStat describes aggregate job-runner outcomes.
type AdminObservabilityMethodStat ¶ added in v0.2.2
type AdminObservabilityMethodStat struct {
Calls int64 `json:"calls"`
Errors int64 `json:"errors"`
TotalMs int64 `json:"totalMs"`
MaxMs int64 `json:"maxMs"`
AvgMs float64 `json:"avgMs"`
LastCode string `json:"lastCode"`
LastUpdated time.Time `json:"lastUpdated"`
}
AdminObservabilityMethodStat describes aggregate metrics for an HTTP/gRPC method.
type AdminObservabilitySnapshot ¶ added in v0.2.2
type AdminObservabilitySnapshot struct {
StartedAt time.Time `json:"startedAt"`
UptimeSec int64 `json:"uptimeSec"`
HTTP map[string]AdminObservabilityMethodStat `json:"http"`
GRPC map[string]AdminObservabilityMethodStat `json:"grpc"`
Jobs AdminObservabilityJobStat `json:"jobs"`
}
AdminObservabilitySnapshot is a serializable view of admin metrics.
type AdminOverview ¶ added in v0.2.2
type AdminOverview struct {
ActiveWorkers int
QueuedTasks int64
Queues int
AvgLatencyMs int64
P95LatencyMs int64
Coordination AdminCoordination
Actions AdminActionCounters
}
AdminOverview describes the admin overview snapshot.
type AdminQueueSummary ¶ added in v0.2.2
type AdminQueueSummary struct {
Name string
Ready int64
Processing int64
Dead int64
Weight int
Paused bool
}
AdminQueueSummary represents queue counts and weights.
type AdminSchedule ¶ added in v0.2.2
type AdminSchedule struct {
Name string
Spec string
NextRun time.Time
LastRun time.Time
Durable bool
Paused bool
}
AdminSchedule represents a cron schedule entry.
type AdminScheduleEvent ¶ added in v0.2.2
type AdminScheduleEvent struct {
TaskID string
Name string
Spec string
Durable bool
Status string
Queue string
StartedAt time.Time
FinishedAt time.Time
DurationMs int64
Result string
Error string
Metadata map[string]string
}
AdminScheduleEvent describes a cron schedule execution event.
type AdminScheduleEventFilter ¶ added in v0.2.2
AdminScheduleEventFilter filters schedule events.
type AdminScheduleEventPage ¶ added in v0.2.2
type AdminScheduleEventPage struct {
Events []AdminScheduleEvent
}
AdminScheduleEventPage represents schedule events.
type AdminScheduleFactory ¶ added in v0.2.2
AdminScheduleFactory describes a registered schedule factory.
type AdminScheduleSpec ¶ added in v0.2.2
AdminScheduleSpec defines a schedule request.
type CronDurableFactory ¶ added in v0.2.1
type CronDurableFactory func(ctx context.Context) (DurableTask, error)
CronDurableFactory builds a durable task when a cron schedule fires.
type CronTaskFactory ¶ added in v0.2.1
CronTaskFactory builds a task when a cron schedule fires.
type DurableBackend ¶ added in v0.1.8
type DurableBackend interface {
Enqueue(ctx context.Context, task DurableTask) error
Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
Ack(ctx context.Context, lease DurableTaskLease) error
Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
Fail(ctx context.Context, lease DurableTaskLease, err error) error
Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
}
DurableBackend provides persistence and leasing for durable tasks.
type DurableCodec ¶ added in v0.1.8
type DurableCodec interface {
Marshal(msg proto.Message) ([]byte, error)
Unmarshal(data []byte, msg proto.Message) error
}
DurableCodec marshals and unmarshals durable task payloads.
type DurableHandlerSpec ¶ added in v0.1.8
type DurableHandlerSpec struct {
Make func() proto.Message
Fn func(ctx context.Context, payload proto.Message) (any, error)
}
DurableHandlerSpec describes a durable task handler.
func TypedDurableHandler ¶ added in v0.1.9
func TypedDurableHandler[T proto.Message](spec TypedDurableHandlerSpec[T]) DurableHandlerSpec
TypedDurableHandler converts a typed spec into the untyped DurableHandlerSpec.
type DurableTask ¶ added in v0.1.8
type DurableTask struct {
ID uuid.UUID
Handler string
Message proto.Message
Payload []byte
Priority int
RunAt time.Time
Queue string
Weight int
Retries int
RetryDelay time.Duration
Metadata map[string]string
}
DurableTask represents a task that can be persisted and rehydrated.
type DurableTaskLease ¶ added in v0.1.8
type DurableTaskLease struct {
Task DurableTask
LeaseID string
Attempts int
MaxRetries int
}
DurableTaskLease represents a leased durable task.
type FileJobEventStore ¶ added in v0.2.2
type FileJobEventStore struct {
// contains filtered or unexported fields
}
FileJobEventStore stores job events as JSON files on disk.
func NewFileJobEventStore ¶ added in v0.2.2
func NewFileJobEventStore(dir string, opts ...FileJobEventStoreOption) (*FileJobEventStore, error)
NewFileJobEventStore creates a file-backed job event store.
func (*FileJobEventStore) List ¶ added in v0.2.2
func (s *FileJobEventStore) List(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
List returns recent job events.
func (*FileJobEventStore) Record ¶ added in v0.2.2
func (s *FileJobEventStore) Record(ctx context.Context, event AdminJobEvent) error
Record appends a job event record.
type FileJobEventStoreOption ¶ added in v0.2.2
type FileJobEventStoreOption func(*FileJobEventStore)
FileJobEventStoreOption configures FileJobEventStore behavior.
func WithJobEventStoreCacheTTL ¶ added in v0.2.2
func WithJobEventStoreCacheTTL(ttl time.Duration) FileJobEventStoreOption
WithJobEventStoreCacheTTL configures cache TTL for file-backed job events.
func WithJobEventStoreMaxEntries ¶ added in v0.2.2
func WithJobEventStoreMaxEntries(maxEntries int) FileJobEventStoreOption
WithJobEventStoreMaxEntries configures max entries retained per key (0 = unbounded).
type GRPCAuthFunc ¶ added in v0.1.6
GRPCAuthFunc authorizes a gRPC request before handling it. Return a gRPC status error to control the response code.
type GRPCServer ¶ added in v0.1.1
type GRPCServer struct {
// contains filtered or unexported fields
}
GRPCServer implements the generated WorkerServiceServer interface.
func NewGRPCServer ¶ added in v0.1.1
func NewGRPCServer(svc Service, handlers map[string]HandlerSpec, opts ...GRPCServerOption) *GRPCServer
NewGRPCServer creates a new gRPC server backed by the provided Service.
func (*GRPCServer) CancelTask ¶ added in v0.1.1
func (s *GRPCServer) CancelTask(ctx context.Context, req *workerpb.CancelTaskRequest) (*workerpb.CancelTaskResponse, error)
CancelTask cancels an active task by its ID.
func (*GRPCServer) CreateSchedule ¶ added in v0.2.2
func (s *GRPCServer) CreateSchedule(ctx context.Context, req *workerpb.CreateScheduleRequest) (*workerpb.CreateScheduleResponse, error)
CreateSchedule registers or updates a cron schedule.
func (*GRPCServer) DeleteJob ¶ added in v0.2.2
func (s *GRPCServer) DeleteJob(ctx context.Context, req *workerpb.DeleteJobRequest) (*workerpb.DeleteJobResponse, error)
DeleteJob removes a job definition.
func (*GRPCServer) DeleteSchedule ¶ added in v0.2.2
func (s *GRPCServer) DeleteSchedule(ctx context.Context, req *workerpb.DeleteScheduleRequest) (*workerpb.DeleteScheduleResponse, error)
DeleteSchedule removes a cron schedule.
func (*GRPCServer) GetDLQEntry ¶ added in v0.2.2
func (s *GRPCServer) GetDLQEntry(ctx context.Context, req *workerpb.GetDLQEntryRequest) (*workerpb.GetDLQEntryResponse, error)
GetDLQEntry returns a detailed DLQ entry by ID.
func (*GRPCServer) GetHealth ¶ added in v0.2.2
func (s *GRPCServer) GetHealth(ctx context.Context, req *workerpb.GetHealthRequest) (*workerpb.GetHealthResponse, error)
GetHealth returns build and runtime info for the admin service.
func (*GRPCServer) GetJob ¶ added in v0.2.2
func (s *GRPCServer) GetJob(ctx context.Context, req *workerpb.GetJobRequest) (*workerpb.GetJobResponse, error)
GetJob returns a job definition by name.
func (*GRPCServer) GetOverview ¶ added in v0.2.2
func (s *GRPCServer) GetOverview(ctx context.Context, req *workerpb.GetOverviewRequest) (*workerpb.GetOverviewResponse, error)
GetOverview returns the admin overview snapshot.
func (*GRPCServer) GetQueue ¶ added in v0.2.2
func (s *GRPCServer) GetQueue(ctx context.Context, req *workerpb.GetQueueRequest) (*workerpb.GetQueueResponse, error)
GetQueue returns a queue summary by name.
func (*GRPCServer) GetTask ¶ added in v0.1.1
func (s *GRPCServer) GetTask(ctx context.Context, req *workerpb.GetTaskRequest) (*workerpb.GetTaskResponse, error)
GetTask returns information about a task by its ID.
func (*GRPCServer) ListAuditEvents ¶ added in v0.2.2
func (s *GRPCServer) ListAuditEvents( ctx context.Context, req *workerpb.ListAuditEventsRequest, ) (*workerpb.ListAuditEventsResponse, error)
ListAuditEvents returns recent admin mutation audit records.
func (*GRPCServer) ListDLQ ¶ added in v0.2.2
func (s *GRPCServer) ListDLQ(ctx context.Context, req *workerpb.ListDLQRequest) (*workerpb.ListDLQResponse, error)
ListDLQ returns DLQ entries.
func (*GRPCServer) ListJobEvents ¶ added in v0.2.2
func (s *GRPCServer) ListJobEvents( ctx context.Context, req *workerpb.ListJobEventsRequest, ) (*workerpb.ListJobEventsResponse, error)
ListJobEvents returns recent job execution events.
func (*GRPCServer) ListJobs ¶ added in v0.2.2
func (s *GRPCServer) ListJobs(ctx context.Context, req *workerpb.ListJobsRequest) (*workerpb.ListJobsResponse, error)
ListJobs returns admin job definitions.
func (*GRPCServer) ListQueues ¶ added in v0.2.2
func (s *GRPCServer) ListQueues(ctx context.Context, req *workerpb.ListQueuesRequest) (*workerpb.ListQueuesResponse, error)
ListQueues returns queue summaries.
func (*GRPCServer) ListScheduleEvents ¶ added in v0.2.2
func (s *GRPCServer) ListScheduleEvents( ctx context.Context, req *workerpb.ListScheduleEventsRequest, ) (*workerpb.ListScheduleEventsResponse, error)
ListScheduleEvents returns recent cron schedule execution events.
func (*GRPCServer) ListScheduleFactories ¶ added in v0.2.2
func (s *GRPCServer) ListScheduleFactories( ctx context.Context, req *workerpb.ListScheduleFactoriesRequest, ) (*workerpb.ListScheduleFactoriesResponse, error)
ListScheduleFactories returns registered schedule factories.
func (*GRPCServer) ListSchedules ¶ added in v0.2.2
func (s *GRPCServer) ListSchedules(ctx context.Context, req *workerpb.ListSchedulesRequest) (*workerpb.ListSchedulesResponse, error)
ListSchedules returns cron schedule summaries.
func (*GRPCServer) PauseDequeue ¶ added in v0.2.2
func (s *GRPCServer) PauseDequeue(ctx context.Context, req *workerpb.PauseDequeueRequest) (*workerpb.PauseDequeueResponse, error)
PauseDequeue pauses durable dequeue.
func (*GRPCServer) PauseQueue ¶ added in v0.2.2
func (s *GRPCServer) PauseQueue(ctx context.Context, req *workerpb.PauseQueueRequest) (*workerpb.PauseQueueResponse, error)
PauseQueue pauses or resumes a queue.
func (*GRPCServer) PauseSchedule ¶ added in v0.2.2
func (s *GRPCServer) PauseSchedule(ctx context.Context, req *workerpb.PauseScheduleRequest) (*workerpb.PauseScheduleResponse, error)
PauseSchedule pauses or resumes a cron schedule.
func (*GRPCServer) PauseSchedules ¶ added in v0.2.2
func (s *GRPCServer) PauseSchedules( ctx context.Context, req *workerpb.PauseSchedulesRequest, ) (*workerpb.PauseSchedulesResponse, error)
PauseSchedules pauses or resumes all cron schedules.
func (*GRPCServer) RegisterDurableTasks ¶ added in v0.1.8
func (s *GRPCServer) RegisterDurableTasks( ctx context.Context, req *workerpb.RegisterDurableTasksRequest, ) (*workerpb.RegisterDurableTasksResponse, error)
RegisterDurableTasks registers one or more durable tasks with the underlying service.
func (*GRPCServer) RegisterTasks ¶ added in v0.1.1
func (s *GRPCServer) RegisterTasks(ctx context.Context, req *workerpb.RegisterTasksRequest) (*workerpb.RegisterTasksResponse, error)
RegisterTasks registers one or more tasks with the underlying service.
func (*GRPCServer) ReplayDLQ ¶ added in v0.2.2
func (s *GRPCServer) ReplayDLQ(ctx context.Context, req *workerpb.ReplayDLQRequest) (*workerpb.ReplayDLQResponse, error)
ReplayDLQ replays DLQ entries.
func (*GRPCServer) ReplayDLQByID ¶ added in v0.2.2
func (s *GRPCServer) ReplayDLQByID(ctx context.Context, req *workerpb.ReplayDLQByIDRequest) (*workerpb.ReplayDLQByIDResponse, error)
ReplayDLQByID replays DLQ entries by ID.
func (*GRPCServer) ResetQueueWeight ¶ added in v0.2.2
func (s *GRPCServer) ResetQueueWeight( ctx context.Context, req *workerpb.ResetQueueWeightRequest, ) (*workerpb.ResetQueueWeightResponse, error)
ResetQueueWeight resets a queue weight to default.
func (*GRPCServer) ResumeDequeue ¶ added in v0.2.2
func (s *GRPCServer) ResumeDequeue(ctx context.Context, req *workerpb.ResumeDequeueRequest) (*workerpb.ResumeDequeueResponse, error)
ResumeDequeue resumes durable dequeue.
func (*GRPCServer) RunJob ¶ added in v0.2.2
func (s *GRPCServer) RunJob(ctx context.Context, req *workerpb.RunJobRequest) (*workerpb.RunJobResponse, error)
RunJob enqueues a job immediately.
func (*GRPCServer) RunSchedule ¶ added in v0.2.2
func (s *GRPCServer) RunSchedule(ctx context.Context, req *workerpb.RunScheduleRequest) (*workerpb.RunScheduleResponse, error)
RunSchedule triggers a cron schedule immediately.
func (*GRPCServer) StreamResults ¶ added in v0.1.1
func (s *GRPCServer) StreamResults(req *workerpb.StreamResultsRequest, stream workerpb.WorkerService_StreamResultsServer) error
StreamResults streams task results back to the client.
func (*GRPCServer) UpdateQueueWeight ¶ added in v0.2.2
func (s *GRPCServer) UpdateQueueWeight( ctx context.Context, req *workerpb.UpdateQueueWeightRequest, ) (*workerpb.UpdateQueueWeightResponse, error)
UpdateQueueWeight updates a queue weight.
func (*GRPCServer) UpsertJob ¶ added in v0.2.2
func (s *GRPCServer) UpsertJob(ctx context.Context, req *workerpb.UpsertJobRequest) (*workerpb.UpsertJobResponse, error)
UpsertJob creates or updates a job definition.
type GRPCServerOption ¶ added in v0.1.6
type GRPCServerOption func(*GRPCServer)
GRPCServerOption configures the gRPC server.
func WithAdminBackend ¶ added in v0.2.2
func WithAdminBackend(backend AdminBackend) GRPCServerOption
WithAdminBackend sets the admin backend for AdminService.
func WithAdminGuardrails ¶ added in v0.2.2
func WithAdminGuardrails(guardrails AdminGuardrails) GRPCServerOption
WithAdminGuardrails configures safety limits for admin mutations.
func WithGRPCAuth ¶ added in v0.1.6
func WithGRPCAuth(auth GRPCAuthFunc) GRPCServerOption
WithGRPCAuth installs an authorization hook for gRPC requests.
type HandlerSpec ¶ added in v0.1.1
type HandlerSpec struct {
// Make returns a zero value of the payload message to unmarshal into.
Make func() protoreflect.ProtoMessage
// Fn does the work. Your Task.Execute will call this.
Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}
HandlerSpec describes a single handler for a gRPC method.
func TypedHandler ¶ added in v0.1.9
func TypedHandler[T protoreflect.ProtoMessage](spec TypedHandlerSpec[T]) HandlerSpec
TypedHandler converts a typed spec into the untyped HandlerSpec.
type MetricsSnapshot ¶ added in v0.1.0
type MetricsSnapshot struct {
Scheduled int64
Running int64
Completed int64
Failed int64
Cancelled int64
Retried int64
ResultsDropped int64
QueueDepth int
TaskLatencyCount int64
TaskLatencyTotal time.Duration
TaskLatencyMax time.Duration
}
MetricsSnapshot represents a snapshot of task metrics.
type OTelMetricsOption ¶ added in v0.1.6
type OTelMetricsOption func(*otelMetricsConfig)
OTelMetricsOption configures OpenTelemetry metrics.
func WithOTelMeterName ¶ added in v0.1.6
func WithOTelMeterName(name string) OTelMetricsOption
WithOTelMeterName overrides the default OTel meter name.
func WithOTelMeterVersion ¶ added in v0.1.6
func WithOTelMeterVersion(version string) OTelMetricsOption
WithOTelMeterVersion sets the instrumentation version reported by the meter.
type ProtoDurableCodec ¶ added in v0.1.8
type ProtoDurableCodec struct{}
ProtoDurableCodec uses protobuf for serialization.
type QueueConfigurableBackend ¶ added in v0.1.9
type QueueConfigurableBackend interface {
ConfigureQueues(defaultQueue string, weights map[string]int)
}
QueueConfigurableBackend allows TaskManager to propagate queue configuration to backends.
type RedisDurableBackend ¶ added in v0.1.8
type RedisDurableBackend struct {
// contains filtered or unexported fields
}
RedisDurableBackend implements a durable task backend using Redis.
func NewRedisDurableBackend ¶ added in v0.1.8
func NewRedisDurableBackend(client rueidis.Client, opts ...RedisDurableOption) (*RedisDurableBackend, error)
NewRedisDurableBackend creates a new RedisDurableBackend with the given Redis client and options.
func (*RedisDurableBackend) Ack ¶ added in v0.1.8
func (b *RedisDurableBackend) Ack(ctx context.Context, lease DurableTaskLease) error
Ack acknowledges the successful processing of a durable task.
func (*RedisDurableBackend) AdminAuditEvents ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminAuditEvents(ctx context.Context, filter AdminAuditEventFilter) (AdminAuditEventPage, error)
AdminAuditEvents returns recent admin mutation audit events.
func (*RedisDurableBackend) AdminCreateSchedule ¶ added in v0.2.2
func (*RedisDurableBackend) AdminCreateSchedule(ctx context.Context, _ AdminScheduleSpec) (AdminSchedule, error)
AdminCreateSchedule is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminDLQ ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminDLQ(ctx context.Context, filter AdminDLQFilter) (AdminDLQPage, error)
AdminDLQ returns entries from the dead letter queue.
func (*RedisDurableBackend) AdminDLQEntry ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)
AdminDLQEntry returns detailed DLQ entry information.
func (*RedisDurableBackend) AdminDeleteJob ¶ added in v0.2.2
AdminDeleteJob removes a job definition.
func (*RedisDurableBackend) AdminDeleteSchedule ¶ added in v0.2.2
AdminDeleteSchedule is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminJobEvents ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminJobEvents(ctx context.Context, filter AdminJobEventFilter) (AdminJobEventPage, error)
AdminJobEvents returns job execution events from Redis.
func (*RedisDurableBackend) AdminJobs ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminJobs(ctx context.Context) ([]AdminJob, error)
AdminJobs lists registered job definitions.
func (*RedisDurableBackend) AdminOverview ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminOverview(ctx context.Context) (AdminOverview, error)
AdminOverview retrieves an overview of the durable backend status.
func (*RedisDurableBackend) AdminPause ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminPause(ctx context.Context) error
AdminPause pauses dequeueing of tasks.
func (*RedisDurableBackend) AdminPauseQueue ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)
AdminPauseQueue pauses or resumes a specific queue.
func (*RedisDurableBackend) AdminPauseSchedule ¶ added in v0.2.2
func (*RedisDurableBackend) AdminPauseSchedule(ctx context.Context, _ string, _ bool) (AdminSchedule, error)
AdminPauseSchedule is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminPauseSchedules ¶ added in v0.2.2
AdminPauseSchedules is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminQueue ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)
AdminQueue returns a summary for a single queue.
func (*RedisDurableBackend) AdminQueues ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)
AdminQueues returns summaries for all queues.
func (*RedisDurableBackend) AdminRecordAuditEvent ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, limit int) error
AdminRecordAuditEvent persists an admin mutation audit event.
func (*RedisDurableBackend) AdminRecordJobEvent ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminRecordJobEvent(ctx context.Context, event AdminJobEvent, limit int) error
AdminRecordJobEvent persists a job execution event.
func (*RedisDurableBackend) AdminReplayDLQ ¶ added in v0.2.2
AdminReplayDLQ replays tasks from the dead letter queue back to their respective ready queues.
func (*RedisDurableBackend) AdminReplayDLQByID ¶ added in v0.2.2
AdminReplayDLQByID replays specific DLQ entries by ID.
func (*RedisDurableBackend) AdminResetQueueWeight ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)
AdminResetQueueWeight resets a queue weight to the default.
func (*RedisDurableBackend) AdminResume ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminResume(ctx context.Context) error
AdminResume resumes dequeueing of tasks.
func (*RedisDurableBackend) AdminRunSchedule ¶ added in v0.2.2
AdminRunSchedule is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminScheduleEvents ¶ added in v0.2.2
func (*RedisDurableBackend) AdminScheduleEvents(ctx context.Context, _ AdminScheduleEventFilter) (AdminScheduleEventPage, error)
AdminScheduleEvents is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminScheduleFactories ¶ added in v0.2.2
func (*RedisDurableBackend) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)
AdminScheduleFactories is not supported by the Redis durable backend.
func (*RedisDurableBackend) AdminSchedules ¶ added in v0.2.2
func (*RedisDurableBackend) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)
AdminSchedules returns cron schedule data if supported.
func (*RedisDurableBackend) AdminSetQueueWeight ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)
AdminSetQueueWeight updates the scheduler weight for a queue.
func (*RedisDurableBackend) AdminUpsertJob ¶ added in v0.2.2
func (b *RedisDurableBackend) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)
AdminUpsertJob creates or updates a job definition.
func (*RedisDurableBackend) ConfigureQueues ¶ added in v0.1.9
func (b *RedisDurableBackend) ConfigureQueues(defaultQueue string, weights map[string]int)
ConfigureQueues applies default queue and weights for durable scheduling.
func (*RedisDurableBackend) Dequeue ¶ added in v0.1.8
func (b *RedisDurableBackend) Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
Dequeue retrieves a batch of durable tasks from the Redis backend with a lease.
func (*RedisDurableBackend) Enqueue ¶ added in v0.1.8
func (b *RedisDurableBackend) Enqueue(ctx context.Context, task DurableTask) error
Enqueue adds a new durable task to the Redis backend.
func (*RedisDurableBackend) Extend ¶ added in v0.1.8
func (b *RedisDurableBackend) Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
Extend renews the processing lease for a durable task.
func (*RedisDurableBackend) Fail ¶ added in v0.1.8
func (b *RedisDurableBackend) Fail(ctx context.Context, lease DurableTaskLease, err error) error
Fail marks a durable task as failed and moves it to the dead letter queue.
func (*RedisDurableBackend) Nack ¶ added in v0.1.8
func (b *RedisDurableBackend) Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
Nack negatively acknowledges a durable task, making it available for reprocessing after a delay.
type RedisDurableOption ¶ added in v0.1.8
type RedisDurableOption func(*RedisDurableBackend)
RedisDurableOption defines an option for configuring RedisDurableBackend.
func WithRedisAdminJobEventStore ¶ added in v0.2.2
func WithRedisAdminJobEventStore(store AdminJobEventStore) RedisDurableOption
WithRedisAdminJobEventStore configures a job event store for admin events.
func WithRedisDurableBatchSize ¶ added in v0.1.8
func WithRedisDurableBatchSize(size int) RedisDurableOption
WithRedisDurableBatchSize sets the batch size for dequeuing tasks.
func WithRedisDurableDefaultQueue ¶ added in v0.1.9
func WithRedisDurableDefaultQueue(name string) RedisDurableOption
WithRedisDurableDefaultQueue sets the default queue name for durable tasks.
func WithRedisDurableGlobalRateLimit ¶ added in v0.2.1
func WithRedisDurableGlobalRateLimit(rate float64, burst int) RedisDurableOption
WithRedisDurableGlobalRateLimit enables a Redis-based global rate limit for dequeue. The rate is tokens per second and burst is the max token bucket size.
func WithRedisDurableLeaderLock ¶ added in v0.2.1
func WithRedisDurableLeaderLock(lease time.Duration) RedisDurableOption
WithRedisDurableLeaderLock enables a Redis-based leader lock for dequeue coordination.
func WithRedisDurablePrefix ¶ added in v0.1.8
func WithRedisDurablePrefix(prefix string) RedisDurableOption
WithRedisDurablePrefix sets the key prefix for Redis durable backend.
func WithRedisDurableQueueWeights ¶ added in v0.1.9
func WithRedisDurableQueueWeights(weights map[string]int) RedisDurableOption
WithRedisDurableQueueWeights sets queue weights for durable task scheduling.
type Result ¶ added in v0.0.4
type Result struct {
Task *Task // the task that produced the result
Result any // the result of the task
Error error // the error returned by the task
}
Result is a task result.
type ResultDropPolicy ¶ added in v0.1.6
type ResultDropPolicy uint8
ResultDropPolicy defines how to handle full subscriber buffers.
const (
// DropNewest drops the new result when the subscriber buffer is full.
DropNewest ResultDropPolicy = iota
// DropOldest drops the oldest buffered result to make room for the new one.
DropOldest
)
type RetentionPolicy ¶ added in v0.1.4
RetentionPolicy controls how completed tasks are kept in the registry.
type Service ¶
type Service interface {
// contains filtered or unexported methods
}
Service is an interface for a task manager.
type Task ¶
type Task struct {
ID uuid.UUID `json:"id"` // ID is the id of the task
Name string `json:"name"` // Name is the name of the task
Description string `json:"description"` // Description is the description of the task
Priority int `json:"priority"` // Priority is the priority of the task
RunAt time.Time `json:"run_at"` // RunAt is the earliest time the task should execute
Queue string `json:"queue"` // Queue is the queue name for scheduling
Weight int `json:"weight"` // Weight influences scheduling share within a queue
Execute TaskFunc `json:"-"` // Execute is the function that will be executed by the task
Ctx context.Context `json:"-"` // Ctx is the context of the task
CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task
Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks
RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
// contains filtered or unexported fields
}
Task represents a function that can be executed by the task manager.
Note: access Task state via methods to avoid data races.
func (*Task) CancelledAt ¶ added in v0.1.4
CancelledAt returns the cancellation time if set.
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel which gets closed when the task is cancelled.
func (*Task) CompletedAt ¶ added in v0.1.4
CompletedAt returns the completion time if set.
func (*Task) ShouldSchedule ¶ added in v0.0.5
ShouldSchedule returns an error if the task should not be scheduled.
func (*Task) Status ¶ added in v0.0.4
func (task *Task) Status() TaskStatus
Status returns the current task status.
type TaskHooks ¶ added in v0.1.5
type TaskHooks struct {
OnQueued func(task *Task)
OnStart func(task *Task)
OnFinish func(task *Task, status TaskStatus, result any, err error)
OnRetry func(task *Task, delay time.Duration, attempt int)
}
TaskHooks defines optional callbacks for task lifecycle events.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is a struct that manages a pool of goroutines that can execute tasks.
func NewTaskManager ¶
func NewTaskManager( ctx context.Context, maxWorkers, maxTasks int, tasksPerSecond float64, timeout, retryDelay time.Duration, maxRetries int, ) *TaskManager
NewTaskManager creates a new task manager.
- ctx is the context for the task manager
- maxWorkers is the number of workers to start, if <=0, the number of CPUs will be used
- maxTasks is the maximum number of tasks that can be queued at once, defaults to 10
- tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting. The limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling.
- timeout is the default timeout for tasks, defaults to 5 minutes
- retryDelay is the default delay between retries, defaults to 1 second
- maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values.
- maxWorkers: runtime.NumCPU()
- maxTasks: 10
- tasksPerSecond: 5
- timeout: 5 minutes
- retryDelay: 1 second
- maxRetries: 3
func NewTaskManagerWithOptions ¶ added in v0.1.8
func NewTaskManagerWithOptions(ctx context.Context, opts ...TaskManagerOption) *TaskManager
NewTaskManagerWithOptions creates a new task manager using functional options.
func (*TaskManager) AdminAuditEvents ¶ added in v0.2.2
func (tm *TaskManager) AdminAuditEvents( ctx context.Context, filter AdminAuditEventFilter, ) (AdminAuditEventPage, error)
AdminAuditEvents returns recent admin mutation audit records.
func (*TaskManager) AdminCreateSchedule ¶ added in v0.2.2
func (tm *TaskManager) AdminCreateSchedule(ctx context.Context, spec AdminScheduleSpec) (AdminSchedule, error)
AdminCreateSchedule creates or updates a cron schedule by name.
func (*TaskManager) AdminDLQ ¶ added in v0.2.2
func (tm *TaskManager) AdminDLQ(ctx context.Context, filter AdminDLQFilter) (AdminDLQPage, error)
AdminDLQ lists DLQ entries.
func (*TaskManager) AdminDLQEntry ¶ added in v0.2.2
func (tm *TaskManager) AdminDLQEntry(ctx context.Context, id string) (AdminDLQEntryDetail, error)
AdminDLQEntry returns a detailed DLQ entry by ID.
func (*TaskManager) AdminDeleteJob ¶ added in v0.2.2
AdminDeleteJob removes a job definition and its cron factory.
func (*TaskManager) AdminDeleteSchedule ¶ added in v0.2.2
AdminDeleteSchedule removes a cron schedule by name.
func (*TaskManager) AdminJobEvents ¶ added in v0.2.2
func (tm *TaskManager) AdminJobEvents( ctx context.Context, filter AdminJobEventFilter, ) (AdminJobEventPage, error)
AdminJobEvents returns recent job execution events.
func (*TaskManager) AdminJobs ¶ added in v0.2.2
func (tm *TaskManager) AdminJobs(ctx context.Context) ([]AdminJob, error)
AdminJobs lists registered job definitions.
func (*TaskManager) AdminOverview ¶ added in v0.2.2
func (tm *TaskManager) AdminOverview(ctx context.Context) (AdminOverview, error)
AdminOverview returns an admin overview snapshot.
func (*TaskManager) AdminPause ¶ added in v0.2.2
func (tm *TaskManager) AdminPause(ctx context.Context) error
AdminPause pauses durable dequeue.
func (*TaskManager) AdminPauseQueue ¶ added in v0.2.2
func (tm *TaskManager) AdminPauseQueue(ctx context.Context, name string, paused bool) (AdminQueueSummary, error)
AdminPauseQueue pauses or resumes a specific queue.
func (*TaskManager) AdminPauseSchedule ¶ added in v0.2.2
func (tm *TaskManager) AdminPauseSchedule(ctx context.Context, name string, paused bool) (AdminSchedule, error)
AdminPauseSchedule pauses or resumes a cron schedule by name.
func (*TaskManager) AdminPauseSchedules ¶ added in v0.2.2
AdminPauseSchedules pauses or resumes all cron schedules.
func (*TaskManager) AdminQueue ¶ added in v0.2.2
func (tm *TaskManager) AdminQueue(ctx context.Context, name string) (AdminQueueSummary, error)
AdminQueue returns a queue summary by name.
func (*TaskManager) AdminQueues ¶ added in v0.2.2
func (tm *TaskManager) AdminQueues(ctx context.Context) ([]AdminQueueSummary, error)
AdminQueues lists queue summaries.
func (*TaskManager) AdminRecordAuditEvent ¶ added in v0.2.2
func (tm *TaskManager) AdminRecordAuditEvent(ctx context.Context, event AdminAuditEvent, _ int) error
AdminRecordAuditEvent persists an admin mutation audit record.
func (*TaskManager) AdminReplayDLQ ¶ added in v0.2.2
AdminReplayDLQ replays DLQ entries.
func (*TaskManager) AdminReplayDLQByID ¶ added in v0.2.2
AdminReplayDLQByID replays specific DLQ entries by ID.
func (*TaskManager) AdminResetQueueWeight ¶ added in v0.2.2
func (tm *TaskManager) AdminResetQueueWeight(ctx context.Context, name string) (AdminQueueSummary, error)
AdminResetQueueWeight resets a queue weight to default.
func (*TaskManager) AdminResume ¶ added in v0.2.2
func (tm *TaskManager) AdminResume(ctx context.Context) error
AdminResume resumes durable dequeue.
func (*TaskManager) AdminRunJob ¶ added in v0.2.2
AdminRunJob enqueues a job immediately as a durable task.
func (*TaskManager) AdminRunSchedule ¶ added in v0.2.2
AdminRunSchedule triggers a cron schedule immediately.
func (*TaskManager) AdminScheduleEvents ¶ added in v0.2.2
func (tm *TaskManager) AdminScheduleEvents( ctx context.Context, filter AdminScheduleEventFilter, ) (AdminScheduleEventPage, error)
AdminScheduleEvents returns recent cron schedule execution events.
func (*TaskManager) AdminScheduleFactories ¶ added in v0.2.2
func (tm *TaskManager) AdminScheduleFactories(ctx context.Context) ([]AdminScheduleFactory, error)
AdminScheduleFactories lists registered schedule factories.
func (*TaskManager) AdminSchedules ¶ added in v0.2.2
func (tm *TaskManager) AdminSchedules(ctx context.Context) ([]AdminSchedule, error)
AdminSchedules lists cron schedules.
func (*TaskManager) AdminSetQueueWeight ¶ added in v0.2.2
func (tm *TaskManager) AdminSetQueueWeight(ctx context.Context, name string, weight int) (AdminQueueSummary, error)
AdminSetQueueWeight updates queue weight and returns the updated summary.
func (*TaskManager) AdminUpsertJob ¶ added in v0.2.2
func (tm *TaskManager) AdminUpsertJob(ctx context.Context, spec AdminJobSpec) (AdminJob, error)
AdminUpsertJob creates or updates a job definition and registers a cron factory.
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID) error
CancelTask cancels a task by its ID.
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)
ExecuteTask executes a task given its ID and returns the result.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of running tasks.
func (*TaskManager) GetMetrics ¶ added in v0.1.0
func (tm *TaskManager) GetMetrics() MetricsSnapshot
GetMetrics returns a snapshot of current metrics.
func (*TaskManager) GetResults ¶
func (tm *TaskManager) GetResults() <-chan Result
GetResults returns a results channel (compatibility shim for legacy API). Use SubscribeResults for explicit unsubscription and buffer control.
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)
GetTask gets a task by its ID.
func (*TaskManager) IsEmpty ¶ added in v0.0.7
func (tm *TaskManager) IsEmpty() bool
IsEmpty checks if the task scheduler queue is empty.
func (*TaskManager) RegisterCronTask ¶ added in v0.2.1
func (tm *TaskManager) RegisterCronTask( ctx context.Context, name string, spec string, factory CronTaskFactory, ) error
RegisterCronTask registers a cron job that enqueues a task on each tick.
func (*TaskManager) RegisterDurableCronTask ¶ added in v0.2.1
func (tm *TaskManager) RegisterDurableCronTask( ctx context.Context, name string, spec string, factory CronDurableFactory, ) error
RegisterDurableCronTask registers a cron job that enqueues a durable task on each tick.
func (*TaskManager) RegisterDurableTask ¶ added in v0.1.8
func (tm *TaskManager) RegisterDurableTask(ctx context.Context, task DurableTask) error
RegisterDurableTask registers a durable task into the configured backend.
func (*TaskManager) RegisterDurableTaskAfter ¶ added in v0.2.0
func (tm *TaskManager) RegisterDurableTaskAfter(ctx context.Context, task DurableTask, delay time.Duration) error
RegisterDurableTaskAfter registers a durable task to execute after the provided delay.
func (*TaskManager) RegisterDurableTaskAt ¶ added in v0.2.0
func (tm *TaskManager) RegisterDurableTaskAt(ctx context.Context, task DurableTask, runAt time.Time) error
RegisterDurableTaskAt registers a durable task to execute at or after the provided time.
func (*TaskManager) RegisterDurableTasks ¶ added in v0.1.8
func (tm *TaskManager) RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error
RegisterDurableTasks registers multiple durable tasks.
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error
RegisterTask registers a new task to the task manager.
func (*TaskManager) RegisterTaskAfter ¶ added in v0.2.0
func (tm *TaskManager) RegisterTaskAfter(ctx context.Context, task *Task, delay time.Duration) error
RegisterTaskAfter registers a new task to execute after the provided delay.
func (*TaskManager) RegisterTaskAt ¶ added in v0.2.0
RegisterTaskAt registers a new task to execute at or after the provided time.
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error
RegisterTasks registers multiple tasks to the task manager at once.
func (*TaskManager) SetHooks ¶ added in v0.1.5
func (tm *TaskManager) SetHooks(hooks TaskHooks)
SetHooks configures callbacks for task lifecycle events.
func (*TaskManager) SetMaxWorkers ¶ added in v0.1.0
func (tm *TaskManager) SetMaxWorkers(maxWorkers int)
SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.
func (*TaskManager) SetMeterProvider ¶ added in v0.1.6
func (tm *TaskManager) SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error
SetMeterProvider enables OpenTelemetry metrics collection. Passing nil disables it.
func (*TaskManager) SetResultsDropPolicy ¶ added in v0.1.6
func (tm *TaskManager) SetResultsDropPolicy(policy ResultDropPolicy)
SetResultsDropPolicy configures how full subscriber buffers are handled.
func (*TaskManager) SetRetentionPolicy ¶ added in v0.1.4
func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)
SetRetentionPolicy configures task registry retention.
func (*TaskManager) SetTracer ¶ added in v0.1.6
func (tm *TaskManager) SetTracer(tracer TaskTracer)
SetTracer configures the task tracer.
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers(ctx context.Context)
StartWorkers starts the task manager's workers and scheduler (idempotent).
func (*TaskManager) StopGraceful ¶ added in v0.1.4
func (tm *TaskManager) StopGraceful(ctx context.Context) error
StopGraceful stops accepting new tasks and waits for completion before stopping workers.
func (*TaskManager) StopNow ¶ added in v0.1.4
func (tm *TaskManager) StopNow()
StopNow cancels running tasks and stops workers immediately.
func (*TaskManager) SubscribeResults ¶ added in v0.1.4
func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())
SubscribeResults returns a results channel and an unsubscribe function.
func (*TaskManager) SyncJobFactories ¶ added in v0.2.2
func (tm *TaskManager) SyncJobFactories(ctx context.Context) error
SyncJobFactories loads persisted jobs and registers factories for them.
func (*TaskManager) UnregisterCronTask ¶ added in v0.2.1
func (tm *TaskManager) UnregisterCronTask(name string) bool
UnregisterCronTask removes a cron job by name.
type TaskManagerOption ¶ added in v0.1.8
type TaskManagerOption func(*taskManagerConfig)
TaskManagerOption configures a TaskManager.
func WithAdminAuditArchiveDir ¶ added in v0.2.3
func WithAdminAuditArchiveDir(dir string) TaskManagerOption
WithAdminAuditArchiveDir enables file archival for aged-out audit events.
func WithAdminAuditArchiveInterval ¶ added in v0.2.3
func WithAdminAuditArchiveInterval(interval time.Duration) TaskManagerOption
WithAdminAuditArchiveInterval sets the flush interval for audit archival.
func WithAdminAuditEventLimit ¶ added in v0.2.2
func WithAdminAuditEventLimit(limit int) TaskManagerOption
WithAdminAuditEventLimit sets the maximum number of in-memory admin audit events.
func WithAdminAuditRetention ¶ added in v0.2.3
func WithAdminAuditRetention(ttl time.Duration) TaskManagerOption
WithAdminAuditRetention sets a max age for admin audit events. Set to <= 0 to disable age pruning.
func WithCronLocation ¶ added in v0.2.1
func WithCronLocation(location *time.Location) TaskManagerOption
WithCronLocation sets the time zone for cron schedules.
func WithDefaultQueue ¶ added in v0.1.9
func WithDefaultQueue(name string) TaskManagerOption
WithDefaultQueue sets the default queue name for tasks without a queue.
func WithDurableBackend ¶ added in v0.1.8
func WithDurableBackend(backend DurableBackend) TaskManagerOption
WithDurableBackend sets the durable backend for the TaskManager.
func WithDurableBatchSize ¶ added in v0.1.8
func WithDurableBatchSize(size int) TaskManagerOption
WithDurableBatchSize sets the durable task batch size.
func WithDurableCodec ¶ added in v0.1.8
func WithDurableCodec(codec DurableCodec) TaskManagerOption
WithDurableCodec sets the durable codec for the TaskManager.
func WithDurableHandlers ¶ added in v0.1.8
func WithDurableHandlers(handlers map[string]DurableHandlerSpec) TaskManagerOption
WithDurableHandlers sets the durable handlers for the TaskManager.
func WithDurableLease ¶ added in v0.1.8
func WithDurableLease(lease time.Duration) TaskManagerOption
WithDurableLease sets the durable task lease duration.
func WithDurableLeaseRenewalInterval ¶ added in v0.1.8
func WithDurableLeaseRenewalInterval(interval time.Duration) TaskManagerOption
WithDurableLeaseRenewalInterval sets the interval for renewing durable task leases. Set to <= 0 to disable renewal.
func WithDurablePollInterval ¶ added in v0.1.8
func WithDurablePollInterval(interval time.Duration) TaskManagerOption
WithDurablePollInterval sets the durable task polling interval.
func WithMaxRetries ¶ added in v0.1.8
func WithMaxRetries(n int) TaskManagerOption
WithMaxRetries sets the maximum number of retries for a task.
func WithMaxTasks ¶ added in v0.1.8
func WithMaxTasks(n int) TaskManagerOption
WithMaxTasks sets the maximum number of tasks in the queue.
func WithMaxWorkers ¶ added in v0.1.8
func WithMaxWorkers(n int) TaskManagerOption
WithMaxWorkers sets the maximum number of workers.
func WithQueueWeights ¶ added in v0.1.9
func WithQueueWeights(weights map[string]int) TaskManagerOption
WithQueueWeights sets the queue weight map for named queues.
func WithRetryDelay ¶ added in v0.1.8
func WithRetryDelay(delay time.Duration) TaskManagerOption
WithRetryDelay sets the delay between task retries.
func WithTasksPerSecond ¶ added in v0.1.8
func WithTasksPerSecond(n float64) TaskManagerOption
WithTasksPerSecond sets the maximum number of tasks to start per second.
func WithTimeout ¶ added in v0.1.8
func WithTimeout(timeout time.Duration) TaskManagerOption
WithTimeout sets the task execution timeout.
type TaskSpan ¶ added in v0.1.6
type TaskSpan interface {
End(err error)
}
TaskSpan represents an in-flight task span.
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.
type TaskTracer ¶ added in v0.1.6
TaskTracer provides tracing spans around task execution.
type TypedDurableHandlerSpec ¶ added in v0.1.9
type TypedDurableHandlerSpec[T proto.Message] struct { Make func() T Fn func(ctx context.Context, payload T) (any, error) }
TypedDurableHandlerSpec provides a compile-time checked handler signature for durable tasks.
type TypedDurableRegistry ¶ added in v0.1.9
type TypedDurableRegistry struct {
// contains filtered or unexported fields
}
TypedDurableRegistry collects typed durable handlers and exposes the untyped map.
func NewTypedDurableRegistry ¶ added in v0.1.9
func NewTypedDurableRegistry() *TypedDurableRegistry
NewTypedDurableRegistry constructs a registry for typed durable handlers.
func (*TypedDurableRegistry) Add ¶ added in v0.1.9
func (r *TypedDurableRegistry) Add(name string, spec DurableHandlerSpec) error
Add registers an untyped durable handler spec into the registry.
func (*TypedDurableRegistry) Handlers ¶ added in v0.1.9
func (r *TypedDurableRegistry) Handlers() map[string]DurableHandlerSpec
Handlers returns a defensive copy of the registered durable handler map.
type TypedHandlerRegistry ¶ added in v0.1.9
type TypedHandlerRegistry struct {
// contains filtered or unexported fields
}
TypedHandlerRegistry collects typed gRPC handlers and exposes the untyped map.
func NewTypedHandlerRegistry ¶ added in v0.1.9
func NewTypedHandlerRegistry() *TypedHandlerRegistry
NewTypedHandlerRegistry constructs a registry for typed gRPC handlers.
func (*TypedHandlerRegistry) Add ¶ added in v0.1.9
func (r *TypedHandlerRegistry) Add(name string, spec HandlerSpec) error
Add registers an untyped handler spec into the registry.
func (*TypedHandlerRegistry) Handlers ¶ added in v0.1.9
func (r *TypedHandlerRegistry) Handlers() map[string]HandlerSpec
Handlers returns a defensive copy of the registered handler map.
type TypedHandlerSpec ¶ added in v0.1.9
type TypedHandlerSpec[T protoreflect.ProtoMessage] struct { Make func() T Fn func(ctx context.Context, payload T) (any, error) }
TypedHandlerSpec provides a compile-time checked handler signature for gRPC tasks.
Source Files
¶
- admin_actions.go
- admin_artifact_store.go
- admin_audit.go
- admin_audit_archive.go
- admin_backend.go
- admin_consts.go
- admin_gateway.go
- admin_guardrails.go
- admin_job_event_store.go
- admin_observability.go
- admin_observability_prometheus.go
- admin_taskmanager.go
- broadcaster.go
- build_info.go
- cron.go
- cron_events.go
- durable.go
- durable_admin_redis.go
- durable_redis.go
- durable_types.go
- grpc.go
- grpc_admin.go
- heap.go
- hooks.go
- interface.go
- job_events.go
- job_factory.go
- job_payload.go
- job_source.go
- metrics.go
- options.go
- otel_metrics.go
- queue_backend.go
- queue_config.go
- queue_metadata.go
- queue_scheduler.go
- result.go
- results_policy.go
- retention.go
- task.go
- tracer.go
- typed_registry.go
- worker.go
Directories
¶
| Path | Synopsis |
|---|---|
| __examples | |
| durable_dlq_replay (command) | |
| durable_queue_inspect (command) | |
| durable_redis (command) | |
| grpc (command) | |
| grpc_durable (command) | |
| manual (command) | |
| middleware (command) | |
| multi (command) | |
| otel_metrics (command) | |
| otel_metrics_otlp (command) | |
| otel_tracing (command) | |
| tracing (command) | |
| cmd | |
| worker-admin (command) | |
| worker-service (command) | |
| workerctl (command) | |
| pkg | |