Documentation ¶
Index ¶
- type Config
- type HandlerRegistry
- func NewHandlerRegistry() *HandlerRegistry
- func (r *HandlerRegistry) Get(taskType types.TaskType) (*types.HandlerDefinition, bool)
- func (r *HandlerRegistry) Len() int
- func (r *HandlerRegistry) Register(def types.HandlerDefinition) error
- func (r *HandlerRegistry) TaskTypes() []string
- type Option
- func WithElectionTTL(d time.Duration) Option
- func WithGroupAggregation(cfg types.GroupConfig) Option
- func WithHeartbeatInterval(d time.Duration) Option
- func WithKeyPrefix(prefix string) Option
- func WithLockTTL(d time.Duration) Option
- func WithLogger(l gochainedlog.Logger) Option
- func WithMaxConcurrency(n int) Option
- func WithNodeID(id string) Option
- func WithPriorityTiers(tiers []store.TierConfig) Option
- func WithRedisCluster(addrs []string) Option
- func WithRedisDB(db int) Option
- func WithRedisPassword(password string) Option
- func WithRedisPoolSize(n int) Option
- func WithRedisSentinel(masterName string, sentinelAddrs []string, sentinelPassword string) Option
- func WithRedisURL(url string) Option
- func WithRetentionPeriod(d time.Duration) Option
- func WithSchedulerTickInterval(d time.Duration) Option
- func WithShutdownTimeout(d time.Duration) Option
- type Server
- func (s *Server) ClusterTaskTypes() map[string]bool
- func (s *Server) Dispatcher() *dispatcher.Dispatcher
- func (s *Server) DynConfig() *dynconfig.Manager
- func (s *Server) Middlewares() []types.MiddlewareFunc
- func (s *Server) RedisClient() rueidis.Client
- func (s *Server) RegisterHandler(def types.HandlerDefinition) error
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop() error
- func (s *Server) Store() *store.RedisStore
- func (s *Server) Use(mw ...types.MiddlewareFunc)
- func (s *Server) Worker() *worker.Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Redis connection options.
RedisOptions types.RedisOptions
// NodeID uniquely identifies this server node. Auto-generated if empty.
NodeID string
// MaxConcurrency is the overall worker pool size per node. Default: 100.
MaxConcurrency int
// SchedulerTickInterval is how often the leader scans for due schedules. Default: 1s.
SchedulerTickInterval time.Duration
// HeartbeatInterval is how often nodes report health. Default: 5s.
HeartbeatInterval time.Duration
// ElectionTTL is the leader election key TTL. Default: 10s.
ElectionTTL time.Duration
// LockTTL is the distributed lock TTL. Default: 30s.
LockTTL time.Duration
// LockAutoExtend is the interval for auto-extending locks. Default: LockTTL/3.
LockAutoExtend time.Duration
// NodeTTL is the TTL for the node heartbeat KV entry. Default: 15s.
NodeTTL time.Duration
// ShutdownTimeout is the grace period for in-flight tasks before abort.
// After this duration, workers will requeue their messages and exit.
// Default: 30s.
ShutdownTimeout time.Duration
// StoreConfig holds Redis store configuration.
StoreConfig store.RedisStoreConfig
// GroupConfig enables task group aggregation. Nil = disabled.
GroupConfig *types.GroupConfig
// RetentionPeriod controls how long completed/dead/cancelled jobs are kept.
// Nil = default (7 days). Set to a pointer to 0 to disable archival.
RetentionPeriod *time.Duration
// Logger is the structured logger. Default: slog.Default().
Logger gochainedlog.Logger
}
Config holds all server configuration.
type HandlerRegistry ¶ added in v0.1.3
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry stores registered task type handlers. It supports exact matches and pattern-based routing (e.g., "email:*"). When several patterns match, the longest one wins, similar to net/http.ServeMux. It is safe for concurrent use.
func NewHandlerRegistry ¶ added in v0.1.3
func NewHandlerRegistry() *HandlerRegistry
NewHandlerRegistry creates a new empty handler registry.
func (*HandlerRegistry) Get ¶ added in v0.1.3
func (r *HandlerRegistry) Get(taskType types.TaskType) (*types.HandlerDefinition, bool)
Get returns the handler definition for a task type. It first checks exact matches, then falls back to pattern matching (longest prefix wins).
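The lookup order described above (exact match first, then the longest matching prefix) can be sketched as follows. The registry type here is a hypothetical stand-in that maps names to string labels instead of types.HandlerDefinition values; it illustrates the routing rule under those assumptions and is not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// registry is a hypothetical stand-in for HandlerRegistry.
type registry struct {
	mu       sync.RWMutex
	exact    map[string]string
	patterns map[string]string // keyed by prefix, trailing "*" removed
}

func newRegistry() *registry {
	return &registry{exact: map[string]string{}, patterns: map[string]string{}}
}

// register stores name as a prefix pattern if it ends in "*", else as exact.
func (r *registry) register(name, handler string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if strings.HasSuffix(name, "*") {
		r.patterns[strings.TrimSuffix(name, "*")] = handler
		return
	}
	r.exact[name] = handler
}

// get checks exact matches first, then picks the longest matching prefix.
func (r *registry) get(taskType string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if h, ok := r.exact[taskType]; ok {
		return h, true
	}
	best, found := "", false
	for prefix := range r.patterns {
		if strings.HasPrefix(taskType, prefix) && (!found || len(prefix) > len(best)) {
			best, found = prefix, true
		}
	}
	if !found {
		return "", false
	}
	return r.patterns[best], true
}

func main() {
	r := newRegistry()
	r.register("email:send", "exact")
	r.register("email:*", "broad")
	r.register("email:bulk:*", "bulk")
	for _, t := range []string{"email:send", "email:bulk:weekly", "email:receipt", "sms:send"} {
		h, ok := r.get(t)
		fmt.Println(t, h, ok)
	}
}
```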
func (*HandlerRegistry) Len ¶ added in v0.1.3
func (r *HandlerRegistry) Len() int
Len returns the number of registered handlers (exact + pattern).
func (*HandlerRegistry) Register ¶ added in v0.1.3
func (r *HandlerRegistry) Register(def types.HandlerDefinition) error
Register adds a handler definition to the registry. TaskType can be an exact name ("email.send") or a pattern ("email:*", "email.*"). Patterns ending with "*" match any task type sharing that prefix.
func (*HandlerRegistry) TaskTypes ¶ added in v0.1.3
func (r *HandlerRegistry) TaskTypes() []string
TaskTypes returns all registered task type names (exact + pattern).
type Option ¶
type Option func(*Config)
Option configures the server.
func WithElectionTTL ¶ added in v0.1.3
func WithElectionTTL(d time.Duration) Option
func WithGroupAggregation ¶ added in v0.1.3
func WithGroupAggregation(cfg types.GroupConfig) Option
WithGroupAggregation enables task group aggregation with the given configuration.
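func WithHeartbeatInterval ¶ added in v0.1.3
func WithHeartbeatInterval(d time.Duration) Option
func WithKeyPrefix ¶
func WithKeyPrefix(prefix string) Option
func WithLockTTL ¶
func WithLockTTL(d time.Duration) Option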
func WithLogger ¶
func WithLogger(l gochainedlog.Logger) Option
func WithMaxConcurrency ¶
func WithMaxConcurrency(n int) Option
func WithNodeID ¶
func WithNodeID(id string) Option
func WithPriorityTiers ¶ added in v0.1.2
func WithPriorityTiers(tiers []store.TierConfig) Option
WithPriorityTiers sets user-defined priority tiers. Each tier becomes its own Redis Stream.
func WithRedisCluster ¶
func WithRedisCluster(addrs []string) Option
WithRedisCluster configures Redis Cluster mode with the given node addresses. Both single-shard and multi-shard clusters are supported. All multi-key Lua scripts use hash-tagged keys to ensure slot co-location.
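Hash tags work because Redis Cluster hashes only the text between the first "{" and the next "}" when that text is non-empty, so keys sharing a tag land in the same slot. The sketch below computes slots as the Redis Cluster specification describes them (CRC16/XMODEM modulo 16384). The key names are hypothetical; the actual key layout used by this package is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum Redis Cluster uses for key slots.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot for key, honoring a non-empty hash tag.
func hashSlot(key string) uint16 {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	// Hypothetical key names sharing the tag "job42".
	a := hashSlot("prefix:{job42}:lock")
	b := hashSlot("prefix:{job42}:state")
	fmt.Println(a == b) // same tag, same slot
}
```

A multi-key Lua script can only run in cluster mode when all its keys map to one slot, which is why tagging related keys matters.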
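func WithRedisPassword ¶
func WithRedisPassword(password string) Option
WithRedisPassword sets the Redis password.
func WithRedisPoolSize ¶
func WithRedisPoolSize(n int) Option
WithRedisPoolSize sets the Redis connection pool size.
func WithRedisSentinel ¶
func WithRedisSentinel(masterName string, sentinelAddrs []string, sentinelPassword string) Option
WithRedisSentinel configures Redis Sentinel failover mode.
func WithRetentionPeriod ¶ added in v0.1.3
func WithRetentionPeriod(d time.Duration) Option
WithRetentionPeriod sets how long completed/dead/cancelled jobs are kept before archival cleanup. Default: 7 days. Set to 0 to disable archival.
func WithSchedulerTickInterval ¶ added in v0.1.3
func WithSchedulerTickInterval(d time.Duration) Option
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) Option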
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main dureq server node. All nodes are workers; the elected leader additionally runs the scheduler and orchestrator.
func New ¶
New creates a new server with the given options. Call RegisterHandler to add task types, then Start to begin.
func (*Server) ClusterTaskTypes ¶ added in v0.1.3
func (s *Server) ClusterTaskTypes() map[string]bool
ClusterTaskTypes returns a set of all task types registered across the cluster (from node heartbeats) plus this node's local registry.
func (*Server) Dispatcher ¶
func (s *Server) Dispatcher() *dispatcher.Dispatcher
Dispatcher returns the dispatcher. Available after Start().
func (*Server) DynConfig ¶ added in v0.1.3
func (s *Server) DynConfig() *dynconfig.Manager
DynConfig returns the dynamic configuration manager. Available after Start().
func (*Server) Middlewares ¶
func (s *Server) Middlewares() []types.MiddlewareFunc
Middlewares returns the registered global middleware chain.
func (*Server) RedisClient ¶
func (s *Server) RedisClient() rueidis.Client
RedisClient returns the Redis client. Available after Start().
func (*Server) RegisterHandler ¶
func (s *Server) RegisterHandler(def types.HandlerDefinition) error
RegisterHandler registers a task type handler on this server. Must be called before Start.
func (*Server) Stop ¶
func (s *Server) Stop() error
Stop gracefully shuts down the server. The worker is stopped first to allow graceful drain and requeue of in-flight tasks.
func (*Server) Store ¶
func (s *Server) Store() *store.RedisStore
Store returns the Redis store. Available after Start().
func (*Server) Use ¶
func (s *Server) Use(mw ...types.MiddlewareFunc)
Use adds global middleware that wraps every handler. Must be called before Start. Middleware is applied in the order added: the first one added is the outermost wrapper.
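The ordering rule for Use can be made concrete with a small sketch. The handlerFunc and middlewareFunc types below are hypothetical shapes chosen for this example; the real types.MiddlewareFunc signature is not shown in this documentation and may differ.

```go
package main

import "fmt"

// Hypothetical handler and middleware shapes for illustration only.
type handlerFunc func(task string) error
type middlewareFunc func(next handlerFunc) handlerFunc

// chain wraps h so that mws[0] ends up outermost: it runs first on the way
// in and last on the way out.
func chain(h handlerFunc, mws ...middlewareFunc) handlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) middlewareFunc {
	return func(next handlerFunc) handlerFunc {
		return func(task string) error {
			*trace = append(*trace, name+":before")
			err := next(task)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

func main() {
	var trace []string
	h := func(task string) error {
		trace = append(trace, "handler")
		return nil
	}
	wrapped := chain(h, tag("first", &trace), tag("second", &trace))
	_ = wrapped("email:send")
	fmt.Println(trace)
}
```

Wrapping from the last middleware to the first is what makes the first one added the outermost layer, so it sees the task before any other middleware and sees the result after all of them.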