worker_pool

package module
v1.0.1-0...-a0cd23e
Published: Jun 19, 2019 License: MIT Imports: 11 Imported by: 0

README

worker-pool

intro:

worker pool is a simple Go library for integrating a limited pool of goroutines into your application. Jobs added to the worker pool are assigned to a worker by a preferred hash algorithm computed over the job's key. This ensures that jobs with the same key are processed by the same worker, thus avoiding race conditions between concurrent jobs that share a key.
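
The key-to-worker assignment described above can be sketched in a few lines; this is a self-contained illustration of the idea (hash the key, take the digest modulo the worker count), not the library's actual implementation:

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// workerFor maps a key to a worker index by hashing the key and taking
// the result modulo the worker count. Because the hash is deterministic,
// the same key always lands on the same worker.
func workerFor(key string, workers int) int {
	sum := sha256.Sum256([]byte(key))
	// interpret the first 8 bytes of the digest as an unsigned integer
	return int(binary.BigEndian.Uint64(sum[:8]) % uint64(workers))
}

func main() {
	// the same key maps to the same worker on every call
	fmt.Println(workerFor("order-42", 100) == workerFor("order-42", 100))
	fmt.Println(workerFor("order-42", 100), workerFor("order-43", 100))
}
```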

Warning: if the process function uses shared resources, the library does not guarantee the above claims, in which case you must implement your own synchronization to avoid race conditions.
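
A minimal, self-contained sketch of that situation: the process function below closes over a map shared by all workers, so it guards every access with a sync.Mutex. The map, mutex, and helper are illustrative, not part of the library.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runShared simulates `workers` concurrent calls of a pool-style process
// function that all touch one shared map. The mutex makes the
// read-modify-write atomic; without it, updates could be lost.
func runShared(workers int) int {
	var (
		mu   sync.Mutex
		seen = map[string]int{} // shared across all workers
	)

	// a process function in the shape the pool expects
	process := func(ctx context.Context, in interface{}) (out interface{}, err error) {
		mu.Lock()
		defer mu.Unlock()
		seen[in.(string)]++
		return seen[in.(string)], nil
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			process(context.Background(), "same-key")
		}()
	}
	wg.Wait()
	return seen["same-key"]
}

func main() {
	fmt.Println(runShared(50)) // 50: no updates were lost
}
```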

usage:
  • import the library into your development environment using go get

        go get github.com/wishperera/worker-pool
    
  • creating a new pool and initializing with a process function

    
    conf := worker_pool.NewPoolConfig()
    conf.Workers = 100
    conf.WorkerBufferSize = 10
    conf.Metrics.NameSpace = "test_namespace"
    conf.Metrics.SubSystem = "test_subsystem"
    conf.HashFunc = worker_pool.SHA256
    
    pool,err := worker_pool.NewPool(conf)
    if err != nil {
        // TODO: handle the error as preferred
    }
    
    pool.Init(context.Background(), func(ctx context.Context, in interface{}) (out interface{}, err error) {
        // TODO: process function body
        return out, nil
    })
    
    
  • conf.Workers is the number of workers (spawned as goroutines) running in parallel, and conf.WorkerBufferSize is the size of each worker's buffered channel.

  • conf.HashFunc selects the bucket-hashing algorithm used by the job manager.

  • conf.Metrics sets the Prometheus metric namespace and subsystem for pool metrics.

  • worker_pool.DefaultConfig can be passed as the config to the worker_pool.NewPool() function, in which case the defaults are as follows

        Workers           = 100
        WorkerBufferSize  = 10
        Metrics.NameSpace = "worker_pool"
        Metrics.SubSystem = "worker_pool"
        HashFunc          =  worker_pool.SHA256
    
  • adding jobs to the pool

     val := 300
     key := "user-42" // illustrative key: jobs sharing a key go to the same worker
     jobID := pool.AddNewJob(context.Background(), val, key)
    
  • val is the input to the process function; it can be a primitive type, or a struct when multiple input parameters are required

  • key can be a string or an integer (8-, 16-, 32-, or 64-bit). Jobs with the same key will be processed by the same worker.

  • the output can be retrieved from the pool.Output channel, which yields a Job structure containing the original input, the output, and any error. The GetID method returns a unique ID for the Job that can be used to match an input with its output.

  • more documentation can be found here
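
The ID-matching pattern from the last bullet can be sketched without the library; the result struct and collect helper below are illustrative stand-ins for the Job type and the pool.Output channel:

```go
package main

import "fmt"

// result stands in for the library's Job type: it carries an ID,
// the original input, and the output of the process function.
type result struct {
	id     int
	input  int
	output int
}

// collect drains the output channel and indexes results by job ID,
// so each output can be aligned with the input that produced it.
func collect(output <-chan result) map[int]result {
	byID := map[int]result{}
	for r := range output {
		byID[r.id] = r
	}
	return byID
}

func main() {
	output := make(chan result, 3) // stands in for pool.Output

	// pretend three jobs finished in an arbitrary order
	for _, r := range []result{{2, 20, 40}, {0, 5, 10}, {1, 7, 14}} {
		output <- r
	}
	close(output)

	byID := collect(output)
	fmt.Println(byID[0].output, byID[2].output) // outputs for jobs 0 and 2
}
```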

Documentation

Overview

worker pool is a simple Go library for integrating a limited pool of goroutines into your application.

Index

Constants

View Source
const (
	SHA256 = iota + 1
	SHA512
	MD5
)

Variables

View Source
var DefaultConfig poolConfig

default pool config

View Source
var (
	PoolMap map[uuid.UUID](*Pool)
)

Redundant for now. Will be useful in a multiple-pool scenario.

Functions

func NewPoolConfig

func NewPoolConfig() poolConfig

returns an empty pool config

Types

type Job

type Job struct {
	// contains filtered or unexported fields
}

func (Job) GetID

func (j Job) GetID() uuid.UUID

returns an ID that uniquely identifies the job

func (Job) GetInput

func (j Job) GetInput() (input interface{})

get the input passed to the process function

func (Job) GetKey

func (j Job) GetKey() interface{}

return the key associated with the job

func (Job) GetOutput

func (j Job) GetOutput() (output interface{}, err error)

get the output and the error returned by the process function

type Pool

type Pool struct {
	Output chan Job
	// contains filtered or unexported fields
}

func NewPool

func NewPool(conf poolConfig) (p *Pool, err error)

returns a pointer to a Pool with the input, output, and error channel sizes set to the configured buffer size and the goroutine count limited to the configured number of workers

func (*Pool) AddNewJob

func (p *Pool) AddNewJob(ctx context.Context, input, key interface{}) (jobID uuid.UUID)

adds a new job to the process queue. Will panic if the pool has not been initialized using pool.Init(). Returns the job ID for future use.

func (*Pool) Close

func (p *Pool) Close(ctx context.Context)

shuts down the pool gracefully after waiting for all worker routines to close.

func (*Pool) Init

func (p *Pool) Init(ctx context.Context, processFunc func(ctx context.Context, in interface{}) (out interface{}, err error))

initializes the pool with a process function that accepts a context and the function's parameters as an interface{}. The parameter can be a single value, or a struct when multiple inputs are expected; the same goes for the output.
