xprocess

package module
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2021 License: MIT Imports: 15 Imported by: 9

README

xprocess

一个通用的机制去管理goroutine的生命周期

  1. 通过xprocess.Go替换原生的直接启动goroutine的方式
  2. 通过context传递, 管理goroutine的生命周期
  3. GoLoop是对goroutine中有for的封装和替换
  4. Group是一个WaitGroup的wrap,不过增加了goroutine统计和可退出机制
  5. Stack会得到内存中所有在运行的goroutine, 以及数量

Examples

// Print the goroutine stacks and count before anything has been started.
fmt.Println(xprocess.Stack())

// Go accepts a func(ctx context.Context) and returns a context.CancelFunc,
// so the worker has no error to report; it just returns once ctx is done.
cancel := xprocess.Go(func(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            time.Sleep(time.Millisecond * 100)
            fmt.Println("g1")
        }
    }
})

time.Sleep(time.Second)
fmt.Println(xprocess.Stack())
// A context.CancelFunc returns nothing, so it is called rather than printed.
cancel()
time.Sleep(time.Second)
fmt.Println(xprocess.Stack())
for {
    // NOTE(review): the CancelFunc values returned by Go and GoLoop are
    // discarded, so these goroutines are never cancelled explicitly —
    // presumably intentional, to show Stack() growing; confirm.
    xprocess.Go(func(ctx context.Context) {
        time.Sleep(time.Second)
        fmt.Println("g2")
    })
    xprocess.GoLoop(func(ctx context.Context) error {
        time.Sleep(time.Second)
        fmt.Println("g3")
        return ctx.Err()
    })

    // Run three tasks in a group; the last one returns an error.
    g := xprocess.NewGroup()
    g.Go(func(ctx context.Context) error {
        fmt.Println("g4")
        return nil
    })
    g.Go(func(ctx context.Context) error {
        fmt.Println("g5")
        return nil
    })
    g.Go(func(ctx context.Context) error {
        fmt.Println("g6")
        return xerror.Fmt("test error")
    })
    g.Wait()
    fmt.Println(g.Err())

    g.Cancel()

    fmt.Println(xprocess.Stack())
    time.Sleep(time.Second)
}
Timeout
// TestTimeout checks both outcomes of xprocess.Timeout: a nil error when fn
// returns before the deadline and a non-nil error when fn overruns it.
//
// The sleep and the deadline differ by a wide margin in each case so that
// scheduler jitter on a loaded machine cannot flip the result.
func TestTimeout(t *testing.T) {
	// fn finishes well inside the deadline.
	err := xprocess.Timeout(time.Second, func(ctx context.Context) error {
		time.Sleep(time.Millisecond * 100)
		return nil
	})
	assert.Nil(t, err)

	// fn clearly overruns the deadline.
	err = xprocess.Timeout(time.Millisecond*100, func(ctx context.Context) error {
		time.Sleep(time.Millisecond * 500)
		return nil
	})
	assert.NotNil(t, err)
}
Future

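async, await, yield

Note: the example below uses the names `Value`, `IFuture` and `Future(func(y Yield))`. The API documented further down for this version lists `FutureValue`, `IPromise` and `Promise(func(g Future))` instead, so the example may need those names substituted before it compiles.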

package xprocess

import (
	"fmt"
	"net/http"
	"testing"

	"github.com/pubgo/xerror"
)

// handleReq prints the request index, then hands http.Get and the target URL
// to Async and returns the value Async produces.
func handleReq(i int) Value {
	const target = "https://www.cnblogs.com"

	fmt.Println("url", i)
	pending := Async(http.Get, target)
	return pending
}

// getData builds a future that, for i counting down from 10 to 4, awaits one
// request (adding a "test" header to its response) and then yields a second,
// unprocessed request.
func getData() IFuture {
	// tagResponse hands err to xerror.Panic (presumably panicking when it is
	// non-nil — confirm), then adds the header and passes the pair through.
	tagResponse := func(resp *http.Response, err error) (*http.Response, error) {
		xerror.Panic(err)

		resp.Header.Add("test", "11111")
		return resp, err
	}

	return Future(func(y Yield) {
		// The loop ends once i reaches 3, which is equivalent to returning
		// from inside the loop when i <= 3.
		for i := 10; i > 3; i-- {
			y.Await(handleReq(i), tagResponse)

			y.Yield(Async(http.Get, "https://www.cnblogs.com"))
		}
	})
}

// handleData builds a future that consumes every value produced by getData
// and yields the HTTP header of each response.
func handleData() IFuture {
	return Future(func(y Yield) {
		getData().Value(func(resp *http.Response, err error) {
			// http.Get's response is not usable when err is non-nil, so fail
			// with the real error instead of dereferencing a nil response.
			if err != nil {
				panic(err)
			}

			y.Yield(resp.Header)
		})
	})
}

// TestStream drains handleData and prints every header it produces.
func TestStream(t *testing.T) {
	printHeader := func(head http.Header) {
		fmt.Println("dt", head)
	}

	handleData().Value(printHeader)
}

// TestAsync issues four requests through handleReq before reading any result,
// then prints the four results in request order.
func TestAsync(t *testing.T) {
	first, second := handleReq(1), handleReq(2)
	third, fourth := handleReq(3), handleReq(4)

	// Results are collected in the same left-to-right order as the requests.
	results := []interface{}{first.Get(), second.Get(), third.Get(), fourth.Get()}
	fmt.Printf("%#v, %#v, %#v, %#v\n", results...)
}

// TestGetData prints every response delivered by getData; the accompanying
// error is not inspected.
func TestGetData(t *testing.T) {
	printResponse := func(resp *http.Response, _ error) {
		fmt.Println(resp)
	}

	getData().Value(printResponse)
}

// handleData2 builds a future that yields the integers 10 down to 1.
func handleData2() IFuture {
	countdown := func(y Yield) {
		for n := 10; n >= 1; n-- {
			y.Yield(n)
		}
	}

	return Future(countdown)
}

// TestName11w prints every integer produced by handleData2.
func TestName11w(t *testing.T) {
	printInt := func(i int) {
		fmt.Println(i)
	}

	handleData2().Value(printInt)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Break = xerror.New("break")
View Source
var ErrTimeout = xerror.New("timeout")

Functions

func Go

func Go(fn func(ctx context.Context)) context.CancelFunc

Go 启动一个goroutine

func GoDelay added in v0.0.9

func GoDelay(dur time.Duration, fn func(ctx context.Context)) context.CancelFunc

GoDelay 延迟goroutine

func GoLoop

func GoLoop(fn func(ctx context.Context) error) context.CancelFunc

GoLoop 启动一个goroutine loop 是为了替换 `go func() {for{ }}()` 这类的代码

func Map added in v0.1.1

func Map(data interface{}, fn interface{}) interface{}

func NewGroup

func NewGroup(opts ...GroupOption) *group

NewGroup 创建一个group对象, 可以带上默认的Context

func Stack

func Stack() string

Stack 获取正在运行的goroutine的stack和数量

func Timeout added in v0.0.3

func Timeout(dur time.Duration, fn func(ctx context.Context) error) error

Timeout 执行超时函数, 超时后, 函数自动退出

Types

type Future added in v0.1.1

// Future is the handle passed to the callback given to Promise.
// NOTE(review): the method semantics are not visible here beyond the inline
// "async"/"block" notes; confirm against the implementation.
type Future interface {
	// Cancel presumably stops the running promise — TODO confirm.
	Cancel()
	Yield(data interface{}, fn ...interface{}) // async
	Await(val FutureValue, fn interface{})     // block
}

type FutureValue added in v0.1.1

// FutureValue is the result type returned by Async and Await.
type FutureValue interface {
	// Err reports an error associated with the value.
	Err() error
	String() string
	// Get returns the results as reflect values.
	Get() []reflect.Value
	// Value presumably passes the results to fn; the signature fn must have
	// is not visible here — TODO confirm.
	Value(fn interface{})
}

func Async added in v0.1.1

func Async(fn interface{}, args ...interface{}) FutureValue

func Await added in v0.1.1

func Await(val FutureValue, fn interface{}) FutureValue

type Group added in v0.0.3

type Group = group

type GroupOption added in v0.0.8

type GroupOption func(*group)

func WithConcurrency added in v0.0.8

func WithConcurrency(c uint32) GroupOption

type IPromise added in v0.1.1

// IPromise is the value returned by Promise.
// NOTE(review): only the inline "block" note on Value is given here; the
// exact semantics of Wait and Cancelled should be confirmed.
type IPromise interface {
	Wait() error
	Cancelled() bool
	Value(fn interface{}) // block
}

func Promise added in v0.1.1

func Promise(fn func(g Future)) IPromise

type WaitGroup added in v0.1.1

// WaitGroup embeds sync.WaitGroup and adds the exported Check and Concurrent
// fields; the package also defines Add, Count, Dec and Inc methods on it.
// NOTE(review): the meaning of Check and Concurrent is not visible here —
// presumably Concurrent bounds the number of concurrent tasks; confirm.
type WaitGroup struct {
	Check      int8
	Concurrent int16
	sync.WaitGroup
	// contains filtered or unexported fields
}

func (*WaitGroup) Add added in v0.1.1

func (t *WaitGroup) Add(delta int)

func (*WaitGroup) Count added in v0.1.1

func (t *WaitGroup) Count() int16

func (*WaitGroup) Dec added in v0.1.1

func (t *WaitGroup) Dec()

func (*WaitGroup) Inc added in v0.1.1

func (t *WaitGroup) Inc()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL