server

package
v0.1.5

This package is not in the latest version of its module.
Published: Mar 9, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Redis connection options.
	RedisOptions types.RedisOptions

	// NodeID uniquely identifies this server node. Auto-generated if empty.
	NodeID string

	// MaxConcurrency is the overall worker pool size per node. Default: 100.
	MaxConcurrency int

	// SchedulerTickInterval is how often the leader scans for due schedules. Default: 1s.
	SchedulerTickInterval time.Duration

	// HeartbeatInterval is how often nodes report health. Default: 5s.
	HeartbeatInterval time.Duration

	// ElectionTTL is the leader election key TTL. Default: 10s.
	ElectionTTL time.Duration

	// LockTTL is the distributed lock TTL. Default: 30s.
	LockTTL time.Duration

	// LockAutoExtend is the interval for auto-extending locks. Default: LockTTL/3.
	LockAutoExtend time.Duration

	// NodeTTL is the TTL for the node heartbeat KV entry. Default: 15s.
	NodeTTL time.Duration

	// ShutdownTimeout is the grace period for in-flight tasks before abort.
	// After this duration, workers will requeue their messages and exit.
	// Default: 30s.
	ShutdownTimeout time.Duration

	// StoreConfig holds Redis store configuration.
	StoreConfig store.RedisStoreConfig

	// GroupConfig enables task group aggregation. Nil = disabled.
	GroupConfig *types.GroupConfig

	// RetentionPeriod controls how long completed/dead/cancelled jobs are kept.
	// Nil = default (7 days). Set to a pointer to 0 to disable archival.
	RetentionPeriod *time.Duration

	// Logger is the structured logger. Default: slog.Default().
	Logger gochainedlog.Logger
}

Config holds all server configuration.

type HandlerRegistry added in v0.1.3

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry stores registered task type handlers. It supports exact matches and pattern-based routing (e.g., "email:*"). When several patterns match, the longest one wins, similar to net/http.ServeMux. It is safe for concurrent use.

func NewHandlerRegistry added in v0.1.3

func NewHandlerRegistry() *HandlerRegistry

NewHandlerRegistry creates a new empty handler registry.

func (*HandlerRegistry) Get added in v0.1.3

Get returns the handler definition for a task type. It first checks exact matches, then falls back to pattern matching (longest prefix wins).

func (*HandlerRegistry) Len added in v0.1.3

func (r *HandlerRegistry) Len() int

Len returns the number of registered handlers (exact + pattern).

func (*HandlerRegistry) Register added in v0.1.3

func (r *HandlerRegistry) Register(def types.HandlerDefinition) error

Register adds a handler definition to the registry. TaskType can be an exact name ("email.send") or a pattern ("email:*", "email.*"). Patterns ending with "*" match any task type sharing that prefix.
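
The routing rules described for Register and Get (exact match first, then the longest matching "*" prefix) can be sketched as follows. This is a standalone illustration under stated assumptions, not the package's implementation: miniRegistry and its methods are hypothetical, and handlers are reduced to plain strings because types.HandlerDefinition is not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// miniRegistry is a hypothetical stand-in for HandlerRegistry.
type miniRegistry struct {
	mu       sync.RWMutex
	exact    map[string]string // task type -> handler name
	prefixes map[string]string // pattern without trailing "*" -> handler name
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{exact: map[string]string{}, prefixes: map[string]string{}}
}

// register stores a handler under an exact name or a "*"-terminated pattern.
func (r *miniRegistry) register(taskType, handler string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if strings.HasSuffix(taskType, "*") {
		r.prefixes[strings.TrimSuffix(taskType, "*")] = handler
		return
	}
	r.exact[taskType] = handler
}

// get checks exact matches first, then picks the longest matching prefix.
func (r *miniRegistry) get(taskType string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if h, ok := r.exact[taskType]; ok {
		return h, true
	}
	best, handler, found := "", "", false
	for p, h := range r.prefixes {
		if strings.HasPrefix(taskType, p) && (!found || len(p) > len(best)) {
			best, handler, found = p, h, true
		}
	}
	return handler, found
}

func main() {
	r := newMiniRegistry()
	r.register("email.send", "exact")
	r.register("email.*", "wide")
	r.register("email.bulk.*", "narrow")

	for _, t := range []string{"email.send", "email.bulk.weekly", "email.receipt", "sms.send"} {
		h, ok := r.get(t)
		fmt.Println(t, h, ok)
	}
}
```

Here "email.bulk.weekly" matches both patterns and resolves to the handler registered under the longer prefix "email.bulk.", while "sms.send" matches nothing.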

func (*HandlerRegistry) TaskTypes added in v0.1.3

func (r *HandlerRegistry) TaskTypes() []string

TaskTypes returns all registered task type names (exact + pattern).

type Option

type Option func(*Config)

Option configures the server.

func WithElectionTTL added in v0.1.3

func WithElectionTTL(d time.Duration) Option

WithElectionTTL sets Config.ElectionTTL, the leader election key TTL (default: 10s).

func WithGroupAggregation added in v0.1.3

func WithGroupAggregation(cfg types.GroupConfig) Option

WithGroupAggregation enables task group aggregation with the given configuration.

func WithHeartbeatInterval added in v0.1.3

func WithHeartbeatInterval(d time.Duration) Option

WithHeartbeatInterval sets Config.HeartbeatInterval, how often nodes report health (default: 5s).

func WithKeyPrefix

func WithKeyPrefix(prefix string) Option

func WithLockTTL

func WithLockTTL(d time.Duration) Option

WithLockTTL sets Config.LockTTL, the distributed lock TTL (default: 30s).

func WithLogger

func WithLogger(l gochainedlog.Logger) Option

WithLogger sets Config.Logger, the structured logger (default: slog.Default()).

func WithMaxConcurrency

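func WithMaxConcurrency(n int) Option

WithMaxConcurrency sets Config.MaxConcurrency, the overall worker pool size per node (default: 100).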

func WithNodeID

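func WithNodeID(id string) Option

WithNodeID sets Config.NodeID, the unique identifier of this server node. The ID is auto-generated if left empty.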

func WithPriorityTiers added in v0.1.2

func WithPriorityTiers(tiers []store.TierConfig) Option

WithPriorityTiers sets user-defined priority tiers. Each tier becomes its own Redis Stream.

func WithRedisCluster

func WithRedisCluster(addrs []string) Option

WithRedisCluster configures Redis Cluster mode with the given node addresses. Both single-shard and multi-shard clusters are supported. All multi-key Lua scripts use hash-tagged keys to ensure slot co-location.
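
Hash tags work as follows in Redis Cluster: when a key contains "{...}" with at least one character between the first "{" and the next "}", only that substring is hashed to choose the slot, so keys sharing a tag land in the same slot. The sketch below extracts the hashed portion of a key. The key layout built by taggedKey is a hypothetical example; the actual key names used by this package are not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// hashTag returns the part of key that Redis Cluster hashes for slot selection.
func hashTag(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key // no opening brace: the whole key is hashed
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 {
		return key // no closing brace, or empty "{}": the whole key is hashed
	}
	return key[start+1 : start+1+end]
}

// taggedKey builds a key whose prefix is the hash tag (hypothetical layout).
func taggedKey(prefix, name string) string {
	return fmt.Sprintf("{%s}:%s", prefix, name)
}

func main() {
	a := taggedKey("dureq", "jobs")
	b := taggedKey("dureq", "locks")
	fmt.Println(a, b, hashTag(a) == hashTag(b))
}
```

Two keys with equal hash tags can be passed to the same multi-key Lua script without a cross-slot error, which is the property the description above relies on.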

func WithRedisDB

func WithRedisDB(db int) Option

WithRedisDB sets the Redis database number.

func WithRedisPassword

func WithRedisPassword(password string) Option

WithRedisPassword sets the Redis password.

func WithRedisPoolSize

func WithRedisPoolSize(n int) Option

WithRedisPoolSize sets the Redis connection pool size.

func WithRedisSentinel

func WithRedisSentinel(masterName string, sentinelAddrs []string, sentinelPassword string) Option

WithRedisSentinel configures Redis Sentinel failover mode.

func WithRedisURL

func WithRedisURL(url string) Option

WithRedisURL sets the Redis server URL.

func WithRetentionPeriod added in v0.1.3

func WithRetentionPeriod(d time.Duration) Option

WithRetentionPeriod sets how long completed/dead/cancelled jobs are kept before archival cleanup. Default: 7 days. Set to 0 to disable archival.

func WithSchedulerTickInterval added in v0.1.3

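func WithSchedulerTickInterval(d time.Duration) Option

WithSchedulerTickInterval sets Config.SchedulerTickInterval, how often the leader scans for due schedules (default: 1s).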

func WithShutdownTimeout

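func WithShutdownTimeout(d time.Duration) Option

WithShutdownTimeout sets Config.ShutdownTimeout, the grace period for in-flight tasks before workers requeue their messages and exit (default: 30s).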

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main dureq server node. All nodes are workers; the elected leader additionally runs the scheduler and orchestrator.

func New

func New(opts ...Option) (*Server, error)

New creates a new server with the given options. Call RegisterHandler to add task types, then Start to begin.

func (*Server) ClusterTaskTypes added in v0.1.3

func (s *Server) ClusterTaskTypes() map[string]bool

ClusterTaskTypes returns a set of all task types registered across the cluster (from node heartbeats) plus this node's local registry.
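
The returned map[string]bool acts as a set. As a rough illustration of the merge described above, the following standalone sketch unions a local list of task types with per-node lists; the function name and the shape of the heartbeat data are assumptions made for the example, not the package's internal representation.

```go
package main

import "fmt"

// clusterTaskTypes merges local task types with those reported by other nodes.
// The heartbeats argument (node ID -> task types) is a hypothetical shape.
func clusterTaskTypes(local []string, heartbeats map[string][]string) map[string]bool {
	set := make(map[string]bool)
	for _, t := range local {
		set[t] = true
	}
	for _, types := range heartbeats {
		for _, t := range types {
			set[t] = true
		}
	}
	return set
}

func main() {
	local := []string{"email.send"}
	heartbeats := map[string][]string{
		"node-b": {"email.send", "report.build"},
		"node-c": {"image.resize"},
	}
	set := clusterTaskTypes(local, heartbeats)
	fmt.Println(len(set), set["report.build"], set["sms.send"])
}
```

Looking up a missing key in such a map yields false, so membership checks need no separate "ok" test.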

func (*Server) Dispatcher

func (s *Server) Dispatcher() *dispatcher.Dispatcher

Dispatcher returns the dispatcher. Available after Start().

func (*Server) DynConfig added in v0.1.3

func (s *Server) DynConfig() *dynconfig.Manager

DynConfig returns the dynamic configuration manager. Available after Start().

func (*Server) Middlewares

func (s *Server) Middlewares() []types.MiddlewareFunc

Middlewares returns the registered global middleware chain.

func (*Server) RedisClient

func (s *Server) RedisClient() rueidis.Client

RedisClient returns the Redis client. Available after Start().

func (*Server) RegisterHandler

func (s *Server) RegisterHandler(def types.HandlerDefinition) error

RegisterHandler registers a task type handler on this server. Must be called before Start.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start connects to Redis, initializes all subsystems, and begins operation.

func (*Server) Stop

func (s *Server) Stop() error

Stop gracefully shuts down the server. The worker is stopped first to allow graceful drain and requeue of in-flight tasks.

func (*Server) Store

func (s *Server) Store() *store.RedisStore

Store returns the Redis store. Available after Start().

func (*Server) Use

func (s *Server) Use(mw ...types.MiddlewareFunc)

Use adds global middleware that wraps every handler. Must be called before Start. Middleware is applied in registration order: the first one added is the outermost wrapper.

func (*Server) Worker

func (s *Server) Worker() *worker.Worker

Worker returns the worker instance. Available after Start().
