worker

package
v0.1.11
Published: Mar 11, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	NodeID            string
	Store             *store.RedisStore
	Registry          HandlerRegistry
	Disp              *dispatcher.Dispatcher
	Locker            *lock.Locker
	MaxConcurrency    int
	ShutdownTimeout   time.Duration // Grace period before aborting in-flight tasks. Default: 30s.
	GlobalMiddlewares []types.MiddlewareFunc
	Logger            gochainedlog.Logger
}

Config holds worker configuration.

type HandlerRegistry

type HandlerRegistry interface {
	Get(taskType types.TaskType) (*types.HandlerDefinition, bool)
	TaskTypes() []string
}

HandlerRegistry is the interface for looking up handlers by task type.
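
Any type with these two methods satisfies the interface. The sketch below shows a minimal map-backed registry. `TaskType` and `HandlerDefinition` are local stand-ins for the `types` package definitions, which are not shown on this page; treating `TaskType` as a string, the `Name` field, and the `Register` helper are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// TaskType and HandlerDefinition stand in for types.TaskType and
// types.HandlerDefinition. Their real definitions are assumptions here.
type TaskType string

type HandlerDefinition struct {
	Name string // hypothetical field, for illustration only
}

// mapRegistry is a hypothetical in-memory registry with the same method set
// as HandlerRegistry. It is not safe for concurrent writes.
type mapRegistry struct {
	handlers map[TaskType]*HandlerDefinition
}

func newMapRegistry() *mapRegistry {
	return &mapRegistry{handlers: make(map[TaskType]*HandlerDefinition)}
}

// Register is not part of the documented interface; it exists only so the
// sketch has a way to populate the map.
func (r *mapRegistry) Register(t TaskType, def *HandlerDefinition) {
	r.handlers[t] = def
}

// Get looks up the handler for a task type.
func (r *mapRegistry) Get(t TaskType) (*HandlerDefinition, bool) {
	def, ok := r.handlers[t]
	return def, ok
}

// TaskTypes returns all registered task types in sorted order.
func (r *mapRegistry) TaskTypes() []string {
	out := make([]string, 0, len(r.handlers))
	for t := range r.handlers {
		out = append(out, string(t))
	}
	sort.Strings(out)
	return out
}

func main() {
	reg := newMapRegistry()
	reg.Register("report.build", &HandlerDefinition{Name: "build report"})
	reg.Register("email.send", &HandlerDefinition{Name: "send email"})

	if def, ok := reg.Get("email.send"); ok {
		fmt.Println(def.Name) // send email
	}
	fmt.Println(reg.TaskTypes()) // [email.send report.build]
}
```

Sorting in `TaskTypes` is a choice made here to keep the output deterministic; the interface itself does not promise any order.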

type SyncRetrier

type SyncRetrier struct {
	// contains filtered or unexported fields
}

SyncRetrier retries failed Redis write operations in the background with exponential backoff. Pending/failed counters are exposed for the monitoring UI.

func NewSyncRetrier

func NewSyncRetrier(ctx context.Context, logger gochainedlog.Logger) *SyncRetrier

NewSyncRetrier creates a SyncRetrier and starts its background loop.

func (*SyncRetrier) Stats

func (sr *SyncRetrier) Stats() SyncRetryStats

Stats returns a snapshot of the current retrier state.

func (*SyncRetrier) Submit

func (sr *SyncRetrier) Submit(req *syncRequest)

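Submit enqueues a failed operation for async retry. Because the parameter type syncRequest is unexported, code outside this package cannot construct a request directly.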

type SyncRetryItem

type SyncRetryItem struct {
	Description string    `json:"description"`
	FailedAt    time.Time `json:"failed_at"`
	Retries     int       `json:"retries"`
}

SyncRetryItem represents a single failed operation for display in the UI.

type SyncRetryStats

type SyncRetryStats struct {
	Pending int64           `json:"pending"`
	Failed  int64           `json:"failed"`
	Recent  []SyncRetryItem `json:"recent"`
}

SyncRetryStats exposes retrier state for the monitoring API.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker pulls work messages from Redis Streams and executes them using a panjf2000/ants goroutine pool.

func New

func New(cfg Config) (*Worker, error)

New creates a new worker.

func (*Worker) ActiveRunIDs

func (w *Worker) ActiveRunIDs() []string

ActiveRunIDs returns the list of currently executing run IDs on this worker.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start begins consuming work messages from Redis Streams. It creates an XREADGROUP consumer for each tier and runs a weighted fetch loop across them.
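
This page does not say which tiers exist or how they are weighted. As one possible reading of "weighted fetch loop", the sketch below builds a weighted round-robin order over hypothetical tiers. The tier names, the weights, and the `fetchOrder` function are invented for illustration, and no Redis calls are made.

```go
package main

import "fmt"

// tier is a hypothetical priority tier; the names and weights used below
// do not come from the package.
type tier struct {
	Name   string
	Weight int
}

// fetchOrder returns one cycle of a weighted round-robin. A tier with weight w
// appears w times, and rounds are interleaved so heavier tiers are visited
// more often while lighter tiers still get a turn early in the cycle.
func fetchOrder(tiers []tier) []string {
	maxWeight := 0
	for _, t := range tiers {
		if t.Weight > maxWeight {
			maxWeight = t.Weight
		}
	}
	var order []string
	for round := 0; round < maxWeight; round++ {
		for _, t := range tiers {
			if t.Weight > round {
				order = append(order, t.Name)
			}
		}
	}
	return order
}

func main() {
	tiers := []tier{{"high", 3}, {"normal", 2}, {"low", 1}}
	// In a real loop, each entry would correspond to one read from that
	// tier's stream.
	fmt.Println(fetchOrder(tiers)) // [high normal low high normal high]
}
```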

func (*Worker) Stats

func (w *Worker) Stats() *types.PoolStats

Stats returns current worker pool statistics.

func (*Worker) Stop

func (w *Worker) Stop()

Stop gracefully stops the worker using a two-phase shutdown, followed by a final wait for the pool to drain:

  1. Close the quit channel, so the fetch loop stops accepting new messages.
  2. After ShutdownTimeout, close the abort channel, so in-flight handlers detect the signal and requeue their messages for other workers.
  3. ReleaseTimeout waits for all ants pool workers to return.

func (*Worker) SyncerStats

func (w *Worker) SyncerStats() *SyncRetryStats

SyncerStats returns the current SyncRetrier stats for the monitoring API.
