Documentation
¶
Index ¶
- Constants
- type Command
- type Config
- type ErrorEncoder
- type Options
- type Pool
- type StaticPool
- func (sp *StaticPool) Destroy(ctx context.Context)
- func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error)
- func (sp *StaticPool) GetConfig() interface{}
- func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error
- func (sp *StaticPool) Reset(ctx context.Context) error
- func (sp *StaticPool) Workers() (workers []worker.BaseProcess)
- type Supervised
- type SupervisorConfig
- type Watcher
Constants ¶
View Source
const (
MB = 1024 * 1024
)
View Source
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
NSEC_IN_SEC nanoseconds in second
View Source
const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = `{"stop":true}`
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers uint64 `mapstructure:"num_workers"`
// MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
MaxJobs uint64 `mapstructure:"max_jobs"`
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
// DestroyTimeout defines for how long pool should be waiting for worker to
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
// Supervision config to limit worker and pool memory usage.
Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}
Config .. Pool config Configures the pool behavior.
func (*Config) InitDefaults ¶
func (cfg *Config) InitDefaults()
InitDefaults enables default config values.
type ErrorEncoder ¶
ErrorEncoder encode error or make a decision based on the error type
type Options ¶
type Options func(p *StaticPool)
func WithCustomErrEncoder ¶
func WithCustomErrEncoder(errEnc ErrorEncoder) Options
func WithLogger ¶
type Pool ¶
type Pool interface {
// GetConfig returns pool configuration.
GetConfig() interface{}
// Exec executes task with payload
Exec(rqs *payload.Payload) (*payload.Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
// RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Reset kill all workers inside the watcher and replaces with new
Reset(ctx context.Context) error
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
// contains filtered or unexported methods
}
Pool managed set of inner worker processes.
type StaticPool ¶
type StaticPool struct {
// contains filtered or unexported fields
}
StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
func (*StaticPool) Destroy ¶
func (sp *StaticPool) Destroy(ctx context.Context)
Destroy all underlying stack (but let them complete the task).
func (*StaticPool) GetConfig ¶
func (sp *StaticPool) GetConfig() interface{}
GetConfig returns associated pool configuration. Immutable.
func (*StaticPool) RemoveWorker ¶
func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error
func (*StaticPool) Workers ¶
func (sp *StaticPool) Workers() (workers []worker.BaseProcess)
Workers returns worker list associated with the pool.
type Supervised ¶
type Supervised interface {
Pool
// Start used to start watching process for all pool workers
Start()
}
type SupervisorConfig ¶
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
WatchTick time.Duration `mapstructure:"watch_tick"`
// TTL defines maximum time worker is allowed to live.
TTL time.Duration `mapstructure:"ttl"`
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
IdleTTL time.Duration `mapstructure:"idle_ttl"`
// ExecTTL defines maximum lifetime per job.
ExecTTL time.Duration `mapstructure:"exec_ttl"`
// MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}
func (*SupervisorConfig) InitDefaults ¶
func (cfg *SupervisorConfig) InitDefaults()
InitDefaults enables default config values.
type Watcher ¶
type Watcher interface {
// Watch used to add workers to the container
Watch(workers []worker.BaseProcess) error
// Take takes the first free worker
Take(ctx context.Context) (worker.BaseProcess, error)
// Release releases the worker putting it back to the queue
Release(w worker.BaseProcess)
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
// Destroy destroys the underlying container
Destroy(ctx context.Context)
// Reset will replace container and workers array, kill all workers
Reset(ctx context.Context)
// List return all container w/o removing it from internal storage
List() []worker.BaseProcess
// Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
Watcher is an interface for the Sync workers lifecycle
Click to show internal directories.
Click to hide internal directories.