Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	NodeID            string
	Store             *store.RedisStore
	Registry          HandlerRegistry
	Disp              *dispatcher.Dispatcher
	Locker            *lock.Locker
	MaxConcurrency    int
	ShutdownTimeout   time.Duration // Grace period before aborting in-flight tasks. Default: 30s.
	GlobalMiddlewares []types.MiddlewareFunc
	Logger            gochainedlog.Logger
}
Config holds worker configuration.
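The comment on ShutdownTimeout documents a 30s default. As a minimal sketch of how a zero value might be replaced by that default, the example below uses a hypothetical stand-in struct (workerConfig) that copies only the stdlib-typed fields of Config, and a hypothetical helper (withDefaults); neither name is part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// workerConfig is a hypothetical stand-in for Config. It keeps only the
// stdlib-typed fields so the example compiles without the real dependencies.
type workerConfig struct {
	NodeID          string
	MaxConcurrency  int
	ShutdownTimeout time.Duration
}

// defaultShutdownTimeout mirrors the documented default of 30s.
const defaultShutdownTimeout = 30 * time.Second

// withDefaults (hypothetical helper) returns a copy of cfg in which an unset
// or non-positive ShutdownTimeout is replaced by the documented default.
func withDefaults(cfg workerConfig) workerConfig {
	if cfg.ShutdownTimeout <= 0 {
		cfg.ShutdownTimeout = defaultShutdownTimeout
	}
	return cfg
}

func main() {
	cfg := withDefaults(workerConfig{NodeID: "node-1", MaxConcurrency: 8})
	fmt.Println(cfg.ShutdownTimeout) // prints "30s"
}
```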
type HandlerRegistry ¶
type HandlerRegistry interface {
	Get(taskType types.TaskType) (*types.HandlerDefinition, bool)
	TaskTypes() []string
}
HandlerRegistry is the interface for looking up handlers by task type.
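A map-backed implementation is one simple way to satisfy an interface of this shape. The sketch below is illustrative only: TaskType and HandlerDefinition are local stand-ins for types.TaskType and types.HandlerDefinition (assumed here to be a string-based type and a plain struct), and mapRegistry, newMapRegistry, and Register are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// TaskType and HandlerDefinition are hypothetical stand-ins for the
// types.TaskType and types.HandlerDefinition used by the real interface.
type TaskType string

type HandlerDefinition struct {
	Name string // illustrative field only
}

// mapRegistry is a hypothetical registry backed by a plain map.
// It is not safe for concurrent registration.
type mapRegistry struct {
	handlers map[TaskType]*HandlerDefinition
}

func newMapRegistry() *mapRegistry {
	return &mapRegistry{handlers: make(map[TaskType]*HandlerDefinition)}
}

// Register associates a handler definition with a task type.
func (r *mapRegistry) Register(t TaskType, def *HandlerDefinition) {
	r.handlers[t] = def
}

// Get looks up the handler for a task type, reporting whether it exists.
func (r *mapRegistry) Get(t TaskType) (*HandlerDefinition, bool) {
	def, ok := r.handlers[t]
	return def, ok
}

// TaskTypes returns the registered task types in sorted order.
func (r *mapRegistry) TaskTypes() []string {
	out := make([]string, 0, len(r.handlers))
	for t := range r.handlers {
		out = append(out, string(t))
	}
	sort.Strings(out)
	return out
}

func main() {
	reg := newMapRegistry()
	reg.Register("send_email", &HandlerDefinition{Name: "email"})
	reg.Register("resize_image", &HandlerDefinition{Name: "resize"})

	if def, ok := reg.Get("send_email"); ok {
		fmt.Println(def.Name)
	}
	fmt.Println(reg.TaskTypes())
}
```

Sorting in TaskTypes is a design choice of this sketch so that the result is deterministic; the source does not say whether the real registry orders its output.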
type SyncRetrier ¶
type SyncRetrier struct {
	// contains filtered or unexported fields
}
SyncRetrier retries failed Redis write operations in the background with exponential backoff. Pending/failed counters are exposed for the monitoring UI.
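To illustrate what exponential backoff means for a retry schedule, here is a minimal sketch of a capped delay calculation. The function name backoffDelay and the base and cap values are assumptions for illustration; the source does not state the delays SyncRetrier actually uses.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay (hypothetical helper) returns base * 2^attempt, capped at max.
// attempt is zero-based: attempt 0 waits for base.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			// Stop doubling once the cap is reached; this also avoids overflow.
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Assumed values: 100ms base delay, 2s cap.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(attempt, 100*time.Millisecond, 2*time.Second))
	}
}
```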
func NewSyncRetrier ¶
func NewSyncRetrier(ctx context.Context, logger gochainedlog.Logger) *SyncRetrier
NewSyncRetrier creates a SyncRetrier and starts its background loop.
func (*SyncRetrier) Stats ¶
func (sr *SyncRetrier) Stats() SyncRetryStats
Stats returns a snapshot of the current retrier state.
func (*SyncRetrier) Submit ¶
func (sr *SyncRetrier) Submit(req *syncRequest)
Submit enqueues a failed operation for async retry.
type SyncRetryItem ¶
type SyncRetryItem struct {
	Description string    `json:"description"`
	FailedAt    time.Time `json:"failed_at"`
	Retries     int       `json:"retries"`
}
SyncRetryItem represents a single failed operation for display in the UI.
type SyncRetryStats ¶
type SyncRetryStats struct {
	Pending int64           `json:"pending"`
	Failed  int64           `json:"failed"`
	Recent  []SyncRetryItem `json:"recent"`
}
SyncRetryStats exposes retrier state for the monitoring API.
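The struct tags above determine the JSON shape a monitoring client would see. The sketch below redeclares the two types locally so that it runs on its own, and encodes a sample value with encoding/json; the helper encodeStats and the sample data are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented types, so the example is self-contained.
type SyncRetryItem struct {
	Description string    `json:"description"`
	FailedAt    time.Time `json:"failed_at"`
	Retries     int       `json:"retries"`
}

type SyncRetryStats struct {
	Pending int64           `json:"pending"`
	Failed  int64           `json:"failed"`
	Recent  []SyncRetryItem `json:"recent"`
}

// encodeStats (hypothetical helper) serializes a stats snapshot to JSON.
func encodeStats(s SyncRetryStats) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	stats := SyncRetryStats{
		Pending: 1,
		Recent: []SyncRetryItem{{
			Description: "sample failed write", // made-up sample data
			FailedAt:    time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
			Retries:     2,
		}},
	}
	out, err := encodeStats(stats)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(out)
	// {"pending":1,"failed":0,"recent":[{"description":"sample failed write","failed_at":"2024-01-02T03:04:05Z","retries":2}]}
}
```

Note that with these tags a nil Recent slice encodes as `"recent":null` rather than `[]`, which clients of such an API may need to handle.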
type Worker ¶
type Worker struct {
	// contains filtered or unexported fields
}
Worker pulls work messages from Redis Streams and executes them using a panjf2000/ants goroutine pool.
func (*Worker) ActiveRunIDs ¶
ActiveRunIDs returns the IDs of the runs currently executing on this worker.
func (*Worker) Start ¶
Start begins consuming work messages from Redis Streams. It creates an XREADGROUP consumer for each tier and runs a weighted fetch loop.
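The source does not specify how the fetch loop weights its tiers. As one possible reading, the sketch below builds a fetch order in which each tier is visited in proportion to its weight within a cycle. The tier names, the weights, and the fetchOrder helper are all hypothetical, and no Redis calls are made.

```go
package main

import "fmt"

// tier is a hypothetical description of a priority tier and its weight.
type tier struct {
	Name   string
	Weight int
}

// fetchOrder (hypothetical helper) returns one cycle of a weighted schedule.
// In round r, every tier whose weight exceeds r is visited once, so a tier
// with weight w appears w times per cycle, interleaved with the others.
func fetchOrder(tiers []tier) []string {
	maxWeight := 0
	for _, t := range tiers {
		if t.Weight > maxWeight {
			maxWeight = t.Weight
		}
	}
	var order []string
	for round := 0; round < maxWeight; round++ {
		for _, t := range tiers {
			if t.Weight > round {
				order = append(order, t.Name)
			}
		}
	}
	return order
}

func main() {
	tiers := []tier{{"high", 3}, {"normal", 2}, {"low", 1}}
	fmt.Println(fetchOrder(tiers)) // [high normal low high normal high]
}
```

A real loop would presumably issue a read against the stream for each entry in such a schedule; interleaving rather than batching keeps low-weight tiers from waiting for a full run of high-weight reads.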
func (*Worker) Stop ¶
func (w *Worker) Stop()
Stop gracefully stops the worker using a two-phase shutdown, then waits for the pool to drain:
- Phase 1: close the quit channel, so the fetch loop stops accepting new messages.
- Phase 2: after ShutdownTimeout, close the abort channel; in-flight handlers detect the signal and requeue their messages for other workers.
- Finally, the ants pool's ReleaseTimeout waits for all pool workers to return.
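The shutdown sequence above can be sketched with two channels and a sync.WaitGroup standing in for the ants pool. This is an illustration under stated assumptions, not the package's implementation: miniWorker, submit, and stop are hypothetical names, requeueing is simulated by recording the task ID, and the sketch skips the abort phase when all handlers finish before the timeout, which the source does not confirm.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// miniWorker is a hypothetical, stdlib-only stand-in for Worker.
type miniWorker struct {
	quit     chan struct{} // closed in phase 1: stop accepting work
	abort    chan struct{} // closed in phase 2: tell handlers to give up
	timeout  time.Duration // plays the role of ShutdownTimeout
	wg       sync.WaitGroup
	mu       sync.Mutex
	requeued []string // IDs of tasks handed back instead of finished
}

func newMiniWorker(timeout time.Duration) *miniWorker {
	return &miniWorker{quit: make(chan struct{}), abort: make(chan struct{}), timeout: timeout}
}

// submit starts a simulated handler that needs `work` to finish.
// It returns false once shutdown has begun.
func (w *miniWorker) submit(id string, work time.Duration) bool {
	select {
	case <-w.quit:
		return false
	default:
	}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		select {
		case <-time.After(work): // handler completed normally
		case <-w.abort: // abort signal: simulate requeueing the message
			w.mu.Lock()
			w.requeued = append(w.requeued, id)
			w.mu.Unlock()
		}
	}()
	return true
}

// stop runs the two phases, then waits for every handler to return.
func (w *miniWorker) stop() {
	close(w.quit) // phase 1
	done := make(chan struct{})
	go func() { w.wg.Wait(); close(done) }()
	select {
	case <-done:
		return // everything finished within the grace period
	case <-time.After(w.timeout):
		close(w.abort) // phase 2
	}
	<-done // drain: wait for aborted handlers to return
}

func main() {
	w := newMiniWorker(100 * time.Millisecond)
	w.submit("fast", time.Millisecond)
	w.submit("slow", 5*time.Second)
	w.stop()
	fmt.Println("requeued:", w.requeued)
	fmt.Println("accepted after stop:", w.submit("late", time.Millisecond))
}
```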
func (*Worker) SyncerStats ¶
func (w *Worker) SyncerStats() *SyncRetryStats
SyncerStats returns the current SyncRetrier stats for the monitoring API.