routine

v1.1.0
Published: May 27, 2019 License: MIT Imports: 13 Imported by: 12

README

routine

Goroutine control with context. Supports Main, Go, Pool, and some useful Executors.

Why do we need to control goroutines?

The go keyword starts a goroutine for a function, but controlling that goroutine takes more work, such as:

  • managing the goroutine object
  • signaling the running goroutine to stop
  • waiting for a signal when the goroutine exits

This work is tedious and repetitive, which is why a goroutine control package is useful.

How to control goroutines?

Use the Executor interface, which uses context.Context to control the goroutine. Depending on the level at which your goroutines run, the following entry points help you control them:

  • Main, which encapsulates the default signal handlers and process-level waiting for goroutines, with prepare and cleanup options
  • Go, a wrapper for the go keyword. You should use it within the Main scope.
  • Pool, the simplest goroutine pool, which implements the Routine interface

Quick Start

Main function

The Main function encapsulates the default signal handlers and process-level waiting for goroutines, with prepare and cleanup options.


import (
	"context"
	"log"

	"github.com/x-mod/routine"
)

func main() {
	err := routine.Main(routine.ExecutorFunc(func(ctx context.Context) error {
		// TODO: your code here
		return nil
	}))
	if err != nil {
		log.Fatal(err)
	}
}

Go function

The Go function wraps the go keyword. It behaves like the go keyword, but adds context-based control of the goroutine.


import (
	"context"
	"log"

	"github.com/x-mod/routine"
)

func main() {
	err := routine.Main(routine.ExecutorFunc(func(ctx context.Context) error {
		// ignore the result error
		routine.Go(ctx, routine.ExecutorFunc(func(ctx context.Context) error {
			// goroutine 1 ...
			return nil
		}))

		// get the result error
		err := <-routine.Go(ctx, routine.ExecutorFunc(func(ctx context.Context) error {
			// goroutine 2 ...
			return nil
		}))
		return err
	}))
	if err != nil {
		log.Fatal(err)
	}
}

Pool routines


import (
	"context"
	"log"

	"github.com/x-mod/routine"
)

func main() {
	// create a pool
	pool := routine.NewPool(routine.NumOfRoutines(4))
	defer pool.Close()

	err := routine.Main(routine.ExecutorFunc(func(ctx context.Context) error {
		// ignore the result error
		pool.Go(ctx, routine.ExecutorFunc(func(ctx context.Context) error {
			// goroutine 1 ...
			return nil
		}))

		// get the result error
		err := <-pool.Go(ctx, routine.ExecutorFunc(func(ctx context.Context) error {
			// goroutine 2 ...
			return nil
		}))
		return err
	}))
	if err != nil {
		// log.Println rather than log.Fatal, so the deferred pool.Close still runs
		log.Println(err)
	}
}

Executors

The package provides some useful executors, such as:

  • retry, which retries your executor when it fails
  • repeat, which repeats your executor
  • crontab, which schedules your executor
  • guarantee, which makes sure your executor never panics

and so on.


import (
	"context"
	"log"
	"time"

	"github.com/x-mod/routine"
)

func main() {
	err := routine.Main(routine.ExecutorFunc(func(ctx context.Context) error {
		// retry
		routine.Go(ctx, routine.Retry(3, routine.ExecutorFunc(func(ctx context.Context) error {
			// goroutine 1 ...
			return nil
		})))

		// guarantee (takes only the executor, see func Guarantee)
		routine.Go(ctx, routine.Guarantee(routine.ExecutorFunc(func(ctx context.Context) error {
			panic("panic")
		})))

		// concurrent
		routine.Go(ctx, routine.Concurrent(10, routine.ExecutorFunc(func(ctx context.Context) error {
			return nil
		})))

		// timeout
		routine.Go(ctx, routine.Timeout(time.Second, routine.ExecutorFunc(func(ctx context.Context) error {
			return nil
		})))
		return nil
	}))
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

var (
	//ErrNoneExecutor error
	ErrNoneExecutor = errors.New("none executor")
	//ErrNoneContext error
	ErrNoneContext = errors.New("none context")
	//ErrNonePlan error
	ErrNonePlan = errors.New("none plan")
)
var DefaultCancelInterruptors []Interruptor

DefaultCancelInterruptors includes the INT, TERM, and KILL signals

Functions

func ArgumentsFrom added in v1.1.0

func ArgumentsFrom(ctx context.Context) ([]interface{}, bool)

ArgumentsFrom extract from context

func Debug

func Debug(ctx context.Context, args ...interface{})

Debug ctx

func EnvironFrom added in v1.1.0

func EnvironFrom(ctx context.Context) []string

EnvironFrom get env

func Error

func Error(ctx context.Context, args ...interface{})

Error ctx

func FromConcurrent

func FromConcurrent(ctx context.Context) int

FromConcurrent current num

func FromCrontab

func FromCrontab(ctx context.Context) time.Time

FromCrontab current crontab time

func FromRepeat

func FromRepeat(ctx context.Context) int

FromRepeat current repeated times

func FromRetry

func FromRetry(ctx context.Context) int

FromRetry current retried times

func Go

func Go(ctx context.Context, exec Executor) chan error

Go wrapper for go keyword, use in MAIN function

func Info

func Info(ctx context.Context, args ...interface{})

Info ctx

func Main

func Main(exec Executor, opts ...Opt) error

Main wrapper for executor with waits & signal interruptors

func StderrFrom added in v1.1.0

func StderrFrom(ctx context.Context) io.Writer

StderrFrom get stderr

func StdinFrom added in v1.1.0

func StdinFrom(ctx context.Context) io.Reader

StdinFrom get stdin

func StdoutFrom added in v1.1.0

func StdoutFrom(ctx context.Context) io.Writer

StdoutFrom get stdout

func Trace

func Trace(ctx context.Context, args ...interface{})

Trace ctx

func Wait

func Wait(ctx context.Context)

Wait should be invoked when an Executor implementation uses Go

func WaitAdd

func WaitAdd(ctx context.Context, delta int)

WaitAdd if context with sync.WaitGroup, wait.Add

func WaitDone

func WaitDone(ctx context.Context)

WaitDone if context with sync.WaitGroup, wait.Done

func Warn

func Warn(ctx context.Context, args ...interface{})

Warn ctx

func WithArgments

func WithArgments(ctx context.Context, args ...interface{}) context.Context

WithArgments inject into context

func WithEnviron

func WithEnviron(ctx context.Context, key string, value string) context.Context

WithEnviron set env

func WithLogger

func WithLogger(ctx context.Context, logger Logger) context.Context

WithLogger context with logger

func WithStderr

func WithStderr(ctx context.Context, out io.Writer) context.Context

WithStderr set stderr

func WithStdin

func WithStdin(ctx context.Context, in io.Reader) context.Context

WithStdin set stdin

func WithStdout

func WithStdout(ctx context.Context, out io.Writer) context.Context

WithStdout set stdout

func WithWait

func WithWait(ctx context.Context) context.Context

WithWait context with sync.WaitGroup, reset WaitGroup
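The Wait, WaitAdd, WaitDone and WithWait entries above describe a sync.WaitGroup carried inside a context. The sketch below shows one way that pattern can work using only the standard library. Every name in it (waitKey, withWait, goTracked, wait, runTracked) is hypothetical, and the package's actual internals may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// waitKey is a private context key (an assumption; the package's key is not documented).
type waitKey struct{}

// withWait stores a fresh WaitGroup in the context.
func withWait(ctx context.Context) context.Context {
	return context.WithValue(ctx, waitKey{}, new(sync.WaitGroup))
}

// waitGroup returns the stored WaitGroup, or nil if there is none.
func waitGroup(ctx context.Context) *sync.WaitGroup {
	wg, _ := ctx.Value(waitKey{}).(*sync.WaitGroup)
	return wg
}

// goTracked starts fn in a goroutine and registers it with the
// context's WaitGroup, if the context carries one.
func goTracked(ctx context.Context, fn func()) {
	wg := waitGroup(ctx)
	if wg != nil {
		wg.Add(1)
	}
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		fn()
	}()
}

// wait blocks until every tracked goroutine has finished.
func wait(ctx context.Context) {
	if wg := waitGroup(ctx); wg != nil {
		wg.Wait()
	}
}

// runTracked starts n goroutines and reports how many had finished
// by the time wait returned.
func runTracked(n int) int32 {
	ctx := withWait(context.Background())
	var finished int32
	for i := 0; i < n; i++ {
		goTracked(ctx, func() { atomic.AddInt32(&finished, 1) })
	}
	wait(ctx)
	return atomic.LoadInt32(&finished)
}

func main() {
	fmt.Println(runTracked(3)) // prints 3
}
```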

Types

type CancelInterruptor

type CancelInterruptor struct {
	// contains filtered or unexported fields
}

CancelInterruptor definition

func NewCancelInterruptor

func NewCancelInterruptor(sig syscall.Signal) *CancelInterruptor

NewCancelInterruptor if fn is nil will cancel context

func (*CancelInterruptor) Interrupt

func (c *CancelInterruptor) Interrupt() InterruptHandler

Interrupt implements the interface

func (*CancelInterruptor) Signal

func (c *CancelInterruptor) Signal() syscall.Signal

Signal implements the interface

type Code

type Code int32

Code for process exit

const (
	// OK is returned on success.
	OK              Code = 0
	GeneralErr      Code = 1
	MisUseErr       Code = 2
	NotExecutable   Code = 126
	IllegalCommand  Code = 127
	InvalidArgments Code = 128
)

func SignalCode

func SignalCode(sig syscall.Signal) Code

SignalCode signal code

func (Code) String

func (i Code) String() string

func (Code) Value

func (c Code) Value() int32

Value of Code

type CommandExecutor

type CommandExecutor struct {
	// contains filtered or unexported fields
}

CommandExecutor struct

func (*CommandExecutor) Execute

func (cmd *CommandExecutor) Execute(ctx context.Context) error

Execute implement Executor

type ConcurrentExecutor

type ConcurrentExecutor struct {
	// contains filtered or unexported fields
}

ConcurrentExecutor struct

func (*ConcurrentExecutor) Execute

func (ce *ConcurrentExecutor) Execute(ctx context.Context) error

Execute implement Executor

type CountReader added in v1.1.0

type CountReader struct {
	// contains filtered or unexported fields
}

CountReader count io reader

func NewCountReader added in v1.1.0

func NewCountReader(rd io.Reader) *CountReader

NewCountReader new

func (*CountReader) Count added in v1.1.0

func (rc *CountReader) Count() int

Count get count

func (*CountReader) Read added in v1.1.0

func (rc *CountReader) Read(p []byte) (int, error)

Read impl

type CountWriter added in v1.1.0

type CountWriter struct {
	// contains filtered or unexported fields
}

CountWriter count io writer

func NewCountWriter added in v1.1.0

func NewCountWriter(wr io.Writer) *CountWriter

NewCountWriter new

func (*CountWriter) Count added in v1.1.0

func (wc *CountWriter) Count() int

Count get count

func (*CountWriter) Write added in v1.1.0

func (wc *CountWriter) Write(p []byte) (int, error)

Write impl

type CrontabExecutor

type CrontabExecutor struct {
	// contains filtered or unexported fields
}

CrontabExecutor struct

func (*CrontabExecutor) Execute

func (c *CrontabExecutor) Execute(ctx context.Context) error

Execute implement Executor

type DeadlineExecutor

type DeadlineExecutor struct {
	// contains filtered or unexported fields
}

DeadlineExecutor struct

func (*DeadlineExecutor) Execute

func (tm *DeadlineExecutor) Execute(ctx context.Context) error

Execute implement Executor

type Executor

type Executor interface {
	//Execute before stopping make sure all subroutines stopped
	Execute(context.Context) error
}

Executor interface definition

func Command

func Command(cmd string, args ...string) Executor

Command new

func Concurrent

func Concurrent(c int, exec Executor) Executor

Concurrent new

func Crontab

func Crontab(plan string, exec Executor) Executor

Crontab new

func Deadline

func Deadline(d time.Time, exec Executor) Executor

Deadline new

func Guarantee

func Guarantee(exec Executor) Executor

Guarantee ensures that exec never panics

func Repeat

func Repeat(repeat int, interval time.Duration, exec Executor) Executor

Repeat new

func Report

func Report(ch chan *Result, exec Executor) Executor

Report new

func Retry

func Retry(retry int, exec Executor) Executor

Retry new

func Timeout

func Timeout(d time.Duration, exec Executor) Executor

Timeout new

func UseExecutorMiddleware

func UseExecutorMiddleware(exec Executor, middleware ...ExecutorMiddleware) Executor

UseExecutorMiddleware wraps a Executor in one or more middleware.

type ExecutorFunc

type ExecutorFunc func(context.Context) error

ExecutorFunc definition

func (ExecutorFunc) Execute

func (f ExecutorFunc) Execute(ctx context.Context) error

Execute ExecutorFunc implementation of Executor

type ExecutorMiddleware

type ExecutorMiddleware func(Executor) Executor

ExecutorMiddleware is a function that middlewares can implement to be able to chain.

type GoFunc

type GoFunc func(context.Context, Executor) chan error

GoFunc definition

func (GoFunc) Go

func (f GoFunc) Go(ctx context.Context, exec Executor) chan error

Go GoFunc implementation of Routine

type GuaranteeExecutor

type GuaranteeExecutor struct {
	// contains filtered or unexported fields
}

GuaranteeExecutor struct, make sure of none error return

func (*GuaranteeExecutor) Execute

func (g *GuaranteeExecutor) Execute(ctx context.Context) (err error)

Execute implement Executor interface

type InterruptHandler

type InterruptHandler func(ctx context.Context) (exit bool)

InterruptHandler definition

type Interruptor

type Interruptor interface {
	Signal() syscall.Signal
	Interrupt() InterruptHandler
}

Interruptor definition

type Logger

type Logger interface {
	Debug(args ...interface{})
	Trace(args ...interface{})
	Info(args ...interface{})
	Error(args ...interface{})
	Warn(args ...interface{})
}

Logger declare

type Opt

type Opt func(*options)

Opt interface

func Arguments

func Arguments(args ...interface{}) Opt

Arguments Opt for Main

func Cleanup

func Cleanup(exec Executor) Opt

Cleanup Opt for Main

func Context added in v1.1.0

func Context(ctx context.Context) Opt

Context Opt

func Interrupts

func Interrupts(ints ...Interruptor) Opt

Interrupts Opt for Main

func Prepare

func Prepare(exec Executor) Opt

Prepare Opt for Main

type Pool added in v1.1.0

type Pool struct {
	// contains filtered or unexported fields
}

Pool struct

func NewPool added in v1.1.0

func NewPool(opts ...PoolOpt) *Pool

NewPool new routine pool

func (*Pool) Close added in v1.1.0

func (p *Pool) Close()

Close running pool

func (*Pool) Go added in v1.1.0

func (p *Pool) Go(ctx context.Context, exec Executor) chan error

Go impl routine interface

func (*Pool) Open added in v1.1.0

func (p *Pool) Open(ctx context.Context)

Open starting the pool routines

type PoolOpt added in v1.1.0

type PoolOpt func(*Pool)

PoolOpt opt for pool

func MaxOfRequestBufferSize added in v1.1.0

func MaxOfRequestBufferSize(n int) PoolOpt

MaxOfRequestBufferSize opt

func NumOfRoutines added in v1.1.0

func NumOfRoutines(n int) PoolOpt

NumOfRoutines opt

type RepeatExecutor

type RepeatExecutor struct {
	// contains filtered or unexported fields
}

RepeatExecutor struct

func (*RepeatExecutor) Execute

func (r *RepeatExecutor) Execute(ctx context.Context) error

Execute implement Executor

type ReportExecutor

type ReportExecutor struct {
	// contains filtered or unexported fields
}

ReportExecutor struct

func (*ReportExecutor) Execute

func (re *ReportExecutor) Execute(ctx context.Context) error

Execute implement Executor

type Result

type Result struct {
	Err           error
	Code          int
	Begin         time.Time
	Duration      time.Duration
	ContentLength int
}

Result struct

type RetryExecutor

type RetryExecutor struct {
	// contains filtered or unexported fields
}

RetryExecutor struct

func (*RetryExecutor) Execute

func (retry *RetryExecutor) Execute(ctx context.Context) error

Execute implement Executor interface

type Routine

type Routine interface {
	Go(context.Context, Executor) chan error
}

Routine for Executors

type TimeoutExecutor

type TimeoutExecutor struct {
	// contains filtered or unexported fields
}

TimeoutExecutor struct

func (*TimeoutExecutor) Execute

func (tm *TimeoutExecutor) Execute(ctx context.Context) error

Execute implement Executor
