mr

package
v0.0.0-...-e74f8b9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Serial (0) and Parallel (1) are numbered consecutively from zero.
// NOTE(review): presumably these are the values used for a rule's State
// (compare the RuleA{State: Parallel, Priority: 1} example) — confirm.
const (
	Serial = iota
	Parallel
)

// RuleErrorStatus (1) is the status that marks a rule whose RPC call or
// business logic failed.
const RuleErrorStatus = 1
View Source
// Group type identifiers, numbered consecutively starting at 1.
// NOTE(review): presumably the values stored in WGroup.Type — confirm.
const (
	PriorityParallelType = 1
	ParallelType         = 2
	SerialType           = 3
)

Variables

View Source
// ErrCancelWithNil is a package-level sentinel error whose message is
// "mapreduce cancelled with nil".
// NOTE(review): presumably reported when a cancel func is invoked with a nil
// error — confirm against the implementation.
var ErrCancelWithNil = errors.New("mapreduce cancelled with nil")

Functions

func DoPriorityGroup

func DoPriorityGroup(ruleFlow []interface{})

DoPriorityGroup

func DoSerialGroup

func DoSerialGroup(ruleFlow []interface{})

DoSerialGroup

func Finish

func Finish(fns ...func() error) error

func FinishVoid

func FinishVoid(fns ...func())

func Map

func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}

func MapReduce

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

func MapReduceVoid

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error

func MapReduceWithSource

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
	opts ...Option) (interface{}, error)

func MapVoid

func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

func Run

func Run()

Run explains the run type.

func WorkFlowGroup

func WorkFlowGroup(ruleFlow []interface{})

WorkFlowGroup executes the rule flow in parallel: it sends the requests, then collects the responses.

Types

type GenerateFunc

// GenerateFunc is the signature of the producer callback accepted by Map,
// MapReduce, MapReduceVoid and MapVoid. It is handed a send-only channel,
// source.
type GenerateFunc func(source chan<- interface{})

type IRule

// IRule is the interface for a rule.
type IRule interface {
	// Output takes a request value of any type and returns a pointer to a
	// RuleOutput.
	Output(req interface{}) *RuleOutput
}

type IWorkFlow

// IWorkFlow is the workflow interface: three group-execution methods, each
// returning a slice of RuleOutput. *WGroup declares all three methods.
type IWorkFlow interface {
	DoSerialGroup() (res []RuleOutput)
	DoPriorityGroup() (res []RuleOutput)
	DoParallelGroup() (res []RuleOutput)
}

IWorkFlow is the workflow interface.

type MapFunc

// MapFunc is the signature of the mapper callback accepted by Map. It
// receives one item and a Writer.
type MapFunc func(item interface{}, writer Writer)

type MapperFunc

// MapperFunc is the signature of the mapper callback accepted by MapReduce,
// MapReduceVoid and MapReduceWithSource. It receives one item, a Writer, and
// a cancel function that takes an error.
type MapperFunc func(item interface{}, writer Writer, cancel func(error))

type Option

// Option is a function applied to a *mapReduceOptions. The Map and MapReduce
// family of functions accept a variadic list of Options; WithWorkers returns
// one.
type Option func(opts *mapReduceOptions)

func WithWorkers

func WithWorkers(workers int) Option

type PriorityGroup

// PriorityGroup exposes a single exported field, Optimal; its remaining
// fields are unexported. Its methods Do and IsHit use a pointer receiver.
type PriorityGroup struct {
	// Optimal is a RuleOutput value.
	// NOTE(review): presumably the best-priority output selected so far — confirm.
	Optimal RuleOutput
	// contains filtered or unexported fields
}

func (*PriorityGroup) Do

func (p *PriorityGroup) Do(length uint32, sro *RuleOutput)

With n = length, the value is 2^(n-1) + 2^(n-2) + …; to be optimized with a two-bit comparison (10 & 10).

func (*PriorityGroup) IsHit

func (p *PriorityGroup) IsHit() (bool, *RuleOutput)

type ReducerFunc

// ReducerFunc is the signature of the reducer callback accepted by MapReduce
// and MapReduceWithSource. It receives a receive-only channel (pipe), a
// Writer, and a cancel function that takes an error.
type ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

type RuleBase

// RuleBase is an empty struct.
// NOTE(review): presumably meant to be embedded in concrete rule types — confirm.
type RuleBase struct{}

type RuleOutput

// RuleOutput is the value returned by pointer from IRule.Output and DoRule,
// and by value (in slices) from the group-execution functions and methods.
//
// Fields:
//   - Status: integer status code. NOTE(review): presumably RuleErrorStatus
//     (1) marks a failed rule — confirm.
//   - Priority: unsigned 32-bit priority value.
//   - Result: payload of any type.
//   - Error: error value associated with the output.
type RuleOutput struct {
	Status int
	// Priority. NOTE(review): the original note here read "build"; its
	// meaning is not clear from this listing — confirm.
	Priority uint32
	Result   interface{}
	Error    error
}

RuleOutput

func DoRule

func DoRule(rule interface{}) *RuleOutput

func WorkFlowManager

func WorkFlowManager(ruleFlow []interface{}) (res []RuleOutput)

WorkFlowManager is the manager that controls the groups.

type VoidMapFunc

// VoidMapFunc is the signature of the mapper callback accepted by MapVoid. It
// receives one item and has no Writer and no return value.
type VoidMapFunc func(item interface{})

type VoidReducerFunc

// VoidReducerFunc is the signature of the reducer callback accepted by
// MapReduceVoid. It receives a receive-only channel (pipe) and a cancel
// function that takes an error; unlike ReducerFunc it has no Writer.
type VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))

type WGroup

// WGroup describes one group of rules. *WGroup declares DoSerialGroup,
// DoPriorityGroup and DoParallelGroup, which together form the method set of
// IWorkFlow.
type WGroup struct {
	// Type is the group's kind.
	// NOTE(review): presumably one of PriorityParallelType, ParallelType or
	// SerialType — confirm.
	Type int

	// Rules holds the rules of this group.
	// Example element: RuleA{State: Parallel, Priority: 1}.
	Rules []interface{}

	// Hub selects the return behavior:
	//   0: return once the collected []*RuleOutput is non-empty.
	//   1: not described in the original comment — TODO document.
	Hub int
}

func (*WGroup) DoParallelGroup

func (w *WGroup) DoParallelGroup() (res []RuleOutput)

func (*WGroup) DoPriorityGroup

func (w *WGroup) DoPriorityGroup() (res []RuleOutput)

DoPriorityGroup

func (*WGroup) DoSerialGroup

func (w *WGroup) DoSerialGroup() (res []RuleOutput)

group DoSerialGroup

type Writer

// Writer is the sink passed to MapFunc, MapperFunc and ReducerFunc callbacks.
type Writer interface {
	// Write accepts a single value v of any type.
	Write(v interface{})
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL