kitchen

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2024 License: MIT Imports: 21 Imported by: 2

README

Kitchen Framework

A golang framework for building progressive backend services.

Introduction

Kitchen is a framework designed for building progressive, scalable services.

The core concept of Kitchen is to create placeholders for all major functions. These placeholders define the input, output, and dependencies required for execution. Function bodies can then be assigned to these placeholders.

At runtime, functions are invoked from the placeholders. This allows for the integration of additional logic such as logging, tracing, metrics, callbacks, and more into the function calls without messing up the code.

Since execution is managed via placeholders, the call is not necessarily executed in local. This enables seamless upgrading of the service from a monolithic architecture to horizontal monoliths, and even splitting its scope to microservices.

Overview

Key Features

  • Tracing / logging / metrics
  • Effortless/Restartless scaling
  • Dependencies management
  • Concurrency management
  • Plugin for serving as web / gRPC API
  • OpenApi schema generation
  • Pipeline for state management
  • Asynchronous call

Introduction

There are few components in the framework that are used to build the API. Let's take an abstract example of a cafe.


// In a cafe, what we focus on is to serve the customer with DISH, like coffee, cake etc.
// DISH is the placeholder for the function

// COOKWARE is the dependency that is required for serving the request.
type coffeeCookware struct {
    grinder *Grinder
    coffeeMachine *CoffeeMachine
}

// MENU is the collection of DISH
// Each MENU will be assigned a COOKWARE
// All the DISHes in the MENU will be using the same COOKWARE to serve
type CoffeeMenu struct {
    kitchen.MenuBase[*CoffeeMenu, coffeeCookware] // Base struct for the menu
    Cappuccino kitchen.Dish[                // Dish placeholder
	    coffeeCookware,                     // Cookware
		*cafeProto.CappuccinoOrder,         // Input, like specifications of the coffee like milk, beans etc.
		*cafeProto.CappuccinoOutput,        // Output, your coffee!
	]
}

// Another cookware for cake
type cakeCookware struct {
    //oven
    //mixer
}

// Cakes need another menu since they use different cookware
type CakeMenu struct {
    kitchen.MenuBase[*CakeMenu, cakeCookware]
    Tiramisu kitchen.Dish[cakeCookware, *cafeProto.TiramisuOrder, *cafeProto.TiramisuOutput]
}

func main() {
    coffeeMenu := kitchen.InitMenu(       // Initialize the menu
		new(CoffeeMenu),                  // Menu prototype
		newCoffeeCookwarePool(),          // Cookware, can be a reusable pointer/sync.Pool
		)
	
	// Assign the Cooker to the Dish, which is the actual function body
    coffeeMenu.Cappuccino.SetCooker(func(
		ctx kitchen.IContext[coffeeCookware],    // Context contain the cookware and lifecycle utils
		input *testProto.CappuccinoOrder) 
	    (*testProto.CappuccinoOutput, error) {
    
        //let's cook!
        grindedBeans := ctx.Cookware().grinder.grindBeans(input.Beans)
        coffeeReady := ctx.Cookware().coffeeMachine.brewCoffee(grindedBeans, input.Milk)
        
        return &testProto.CappuccinoOutput{ Cappuccino: coffeeReady}, nil // return error or the coffee, enjoy!
    })

	// Try a callback after the cooking is done
    coffeeMenu.Cappuccino.AfterCook(func(ctx kitchen.IContext[coffeeCookware], input *testProto.CappuccinoOrder, output *testProto.CappuccinoOutput, err error) {
        ctx.Cookware().coffeeMachine.clean() // clean the machine after brewing
    })

	//cook the coffee
    coffee, res := coffeeMenu.Cappuccino.Cook(context.Background(), &testProto.CappuccinoOrder{Beans: "arabica", Milk: "whole"}
}


Web API Plugin

type WebTest struct {
    kitchen.MenuBase[*WebTest, *WebCookware]
    HelloWorld kitchen.Dish[*WebCookware, string, string]       // url: /hello_world
    Login     kitchen.Dish[*WebCookware, any, int64]            // url: /login
}

//special cookware for web
type WebCookware struct {
    UserId      int64
}

// Implement the IWebCookware to parse the request into cookware with session
func (d WebCookware) RequestParser(action kitchen.IDish, bundle kitchen.IWebBundle) (routerHelper.IWebCookware, error) {
    dd := d
    dd.UserId, _ = strconv.ParseInt(bundle.Headers().Get("UserId"), 10, 64)
    return &dd, nil
}

func main() {
    webTest := kitchen.InitMenu(new(WebTest), newWebCookwarePool())
    webTest.HelloWorld.SetCooker(func(ctx kitchen.IContext[*WebCookware], input string) (string, error) {
        return input+" world!", nil
    })
    webTest.Login.SetCooker(func(ctx kitchen.IContext[*WebCookware], input any) (int64, error) {
        return 1, nil
    })
    
    router := mux.NewRouter()                  // Prepare the router
    helper := muxHelper.NewWrapper(router)     // mux router helper, also available for echo/fasthttp
    helper.AddMenuToRouter(webTest)
    //helper.AddMenuToRouter(anotherMenu)
    
	// Generate the openapi schema
    jsonBytes, err := kitchenWeb.MakeOpenApi(someApiName, []string{"host1.com"}, "/", someVersion, webTest)
    
    http.ListenAndServe(":8080", router)
}
	
	

gRPC API Plugin


//setup the gRPC menu following the proto service signature
type GrpcMenu struct {
    kitchen.MenuBase[*GrpcTest, *AnyGrpcCookware]
    SayHello kitchen.Dish[*DummyWorkerCookware, *helloworld.HelloRequest, *helloworld.HelloReply]
}

func main() {
	
	// initialize the menu as usual
	grpcTest := kitchen.InitMenu[*GrpcMenu, *AnyGrpcCookware](&GrpcTest{}, &AnyGrpcCookware{})
	grpcTest.SayHello.SetCooker(func(ctx kitchen.IContext[*AnyGrpcCookware], input *helloworld.HelloRequest) (output *helloworld.HelloReply, err error) {
		return &helloworld.HelloReply{Response: input.Request + " world"}, nil
	})
	
	// start the gRPC server
	lis, err := net.Listen("tcp", "0.0.0.0:19527")
	if err != nil {
        panic(err)
    }
	s := grpc.NewServer()
	// Register the gRPC server with the plugin
	err = kitchenGrpc.RegisterGrpcServer(s, helloworld.Greeter_ServiceDesc, grpcTest)
	if err != nil {
        panic(err)
    }
    if err := s.Serve(lis); err != nil {
        panic(err)
    }
}

Contributors

Documentation

Overview

Package kitchen A golang framework for building progressive backend services.

Index

Constants

This section is empty.

Variables

View Source
var ErrCookerNotSet = errors.New("cooker not set")
View Source
var ErrInvalidStatus = errors.New("invalid pipeline status")
View Source
var (
	TraceIdGenerator = traceId
)

Functions

func GroupPrefork

func GroupPrefork[D ICookware](ctx context.Context, concurrent, buffer int, preHeatCookwareCanNilMeansUseDefault func() D, dishes ...iDish[D])

GroupPrefork is for preparing for a group of dishes with prefork goroutines if you just want to limit the concurrent number, ConcurrentLimit is always faster prefork is for regulate async goroutines or use preHeatCookware preHeatCookware is useful for delegating resources to each goroutine

func InitMenu

func InitMenu[W iMenu[D], D ICookware](menuPtr W, bundle any) W

InitMenu initializes a menu.

func InitPipeline

func InitPipeline[D IPipelineCookware[M], M IPipelineModel, P iPipeline[D, M]](pipelinePtr P, dep D) P

func NewOtelTraceableCookware

func NewOtelTraceableCookware[D ICookware](t trace.Tracer) *otelTraceableCookware[D]

func NewWebContext

func NewWebContext(ctx context.Context, bundle IWebBundle, cookware ICookware) *webContext

NewWebContext creates a new web context for web router wrappers and prevent the dish context ends after the web request ends.

Types

type AfterListenHandlers

type AfterListenHandlers[D ICookware, I any, O any] func(ctx IContext[D], input I, output O, err error)

type ChainTraceableCookware

type ChainTraceableCookware[D ICookware] []ITraceableCookware[D]

func NewChainTraceableCookware

func NewChainTraceableCookware[D ICookware](deps ...ITraceableCookware[D]) *ChainTraceableCookware[D]

func (ChainTraceableCookware[D]) StartTrace

func (d ChainTraceableCookware[D]) StartTrace(ctx IContext[D], id string, input any) (context.Context, iTraceSpan[D])

type Context

type Context[D ICookware] struct {
	context.Context
	// contains filtered or unexported fields
}

Context is a struct that holds various information and dependencies needed for the dishes.

func (Context[D]) Cookware

func (c Context[D]) Cookware() D

func (Context[D]) Dependency

func (c Context[D]) Dependency() D

func (Context[D]) Dish

func (c Context[D]) Dish() iDish[D]

func (Context[D]) GetCtx

func (c Context[D]) GetCtx() context.Context

func (Context[D]) Menu

func (c Context[D]) Menu() iMenu[D]

func (Context[D]) RawCookware

func (c Context[D]) RawCookware() ICookware

func (*Context[D]) Session

func (c *Context[D]) Session(nodes ...IDishServe) []IDishServe

Session Context will pass through nesting call of dishes, this appends sessions and returns all the session of the context.

func (*Context[D]) SetCtx

func (c *Context[D]) SetCtx(ctx context.Context)

func (Context[D]) Sets

func (c Context[D]) Sets() []iSet[D]

func (*Context[D]) TraceSpan

func (c *Context[D]) TraceSpan() iTraceSpan[D]

TraceSpan returns the trace span of the context, maybe nil if not the cookware not implement any tracer.

func (Context[D]) WebBundle

func (c Context[D]) WebBundle() IWebBundle

WebBundle returns the web bundle of the context, generated from router wrapper.

type ContextForTest

type ContextForTest[D ICookware] struct {
	context.Context
	SessionServed []IDishServe
	DummyCookware D
	DummyDish     iDish[D]
	DummySets     []iSet[D]
	DummyMenu     iMenu[D]
	// contains filtered or unexported fields
}

A ContextForTest is for mocking the Context struct for testing.

func (ContextForTest[D]) Cookware

func (c ContextForTest[D]) Cookware() D

func (ContextForTest[D]) Dependency

func (c ContextForTest[D]) Dependency() D

func (ContextForTest[D]) Dish

func (c ContextForTest[D]) Dish() iDish[D]

func (ContextForTest[D]) GetCtx

func (c ContextForTest[D]) GetCtx() context.Context

func (ContextForTest[D]) Menu

func (c ContextForTest[D]) Menu() iMenu[D]

func (ContextForTest[D]) RawCookware

func (c ContextForTest[D]) RawCookware() ICookware

func (*ContextForTest[D]) Session

func (c *ContextForTest[D]) Session(serve ...IDishServe) []IDishServe

func (ContextForTest[D]) SetCtx

func (c ContextForTest[D]) SetCtx(ctx context.Context)

func (*ContextForTest[D]) SetWebBundle

func (c *ContextForTest[D]) SetWebBundle(body []byte, bundle IWebBundle)

func (ContextForTest[D]) Sets

func (c ContextForTest[D]) Sets() []iSet[D]

func (ContextForTest[D]) TraceSpan

func (c ContextForTest[D]) TraceSpan() iTraceSpan[D]

func (ContextForTest[D]) WebBundle

func (c ContextForTest[D]) WebBundle() IWebBundle

type CounterTraceableCookware

type CounterTraceableCookware[D ICookware] struct {
	Menus []*MenuCounter
	// contains filtered or unexported fields
}

func NewCounterTraceableCookware

func NewCounterTraceableCookware[D ICookware]() *CounterTraceableCookware[D]

func (*CounterTraceableCookware[D]) StartTrace

func (d *CounterTraceableCookware[D]) StartTrace(ctx IContext[D], id string, input any) (context.Context, iTraceSpan[D])

type Dish

type Dish[D ICookware, I any, O any] struct {
	// contains filtered or unexported fields
}

A Dish is the placeholder for a function in the menu.

func (*Dish) AfterCook

func (r *Dish) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (*Dish) AfterCookAsync

func (r *Dish) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (*Dish) AfterExec

func (r *Dish) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (*Dish) AfterExecAsync

func (r *Dish) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (*Dish) ConcurrentLimit

func (r *Dish) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (*Dish[D, I, O]) Cook

func (a *Dish[D, I, O]) Cook(ctx context.Context, input I) (output O, err error)

Cook executes the dish with the input and returns the output and error. Return ErrCookerNotSet if the cooker is not set.

func (*Dish[D, I, O]) CookAny

func (a *Dish[D, I, O]) CookAny(ctx context.Context, input any) (output any, err error)

CookAny is for cooking with any input, mainly for external node calling with network.

func (*Dish[D, I, O]) CookAsync

func (a *Dish[D, I, O]) CookAsync(ctx context.Context, input I, optionalCallback ...func(O, error))

CookAsync executes the dish asynchronously with the input and optional callback.

func (*Dish[D, I, O]) CookWithCookware

func (a *Dish[D, I, O]) CookWithCookware(ctx context.Context, cookware D, input I) (output O, err error)

CookWithCookware executes the dish with the cookware(dependency) and the input, returns the output and error.

func (Dish[D, I, O]) Cookware

func (a Dish[D, I, O]) Cookware() ICookware

Cookware returns the cookware(dependency) of the dish.

func (Dish[D, I, O]) Dependency

func (a Dish[D, I, O]) Dependency() D

Dependency alias of Cookware

func (*Dish[D, I, O]) Exec

func (a *Dish[D, I, O]) Exec(ctx context.Context, input I) (output O, err error)

deprecated use Cook

func (*Dish[D, I, O]) ExecAsync

func (a *Dish[D, I, O]) ExecAsync(ctx context.Context, input I, optionalCallback ...func(O, error))

deprecated use CookAsync

func (*Dish[D, I, O]) ExecWithDep

func (a *Dish[D, I, O]) ExecWithDep(ctx context.Context, dep D, input I) (output O, err error)

deprecated use CookWithCookware

func (Dish[D, I, O]) FullName

func (a Dish[D, I, O]) FullName() string

func (Dish[D, I, O]) IO

func (a Dish[D, I, O]) IO() (any, any)

func (Dish[D, I, O]) Id

func (a Dish[D, I, O]) Id() uint32

func (Dish[D, I, O]) Input

func (a Dish[D, I, O]) Input() any

func (Dish[D, I, O]) Menu

func (a Dish[D, I, O]) Menu() IMenu

func (Dish[D, I, O]) Name

func (a Dish[D, I, O]) Name() string

func (Dish) Nodes

func (b Dish) Nodes() []IInstance

Nodes returns the children of this node.

func (*Dish[D, I, O]) PanicRecover

func (a *Dish[D, I, O]) PanicRecover(recover func(iContext IContext[D], recover any)) *Dish[D, I, O]

PanicRecover sets the recover function after panic, no recovery if nil.

func (*Dish[D, I, O]) Prefork

func (a *Dish[D, I, O]) Prefork(ctx context.Context, concurrent, buffer int, preHeatCookware ...func() D)

Prefork is for regulating async goroutines or using preHeatCookware

func (*Dish[D, I, O]) SetCooker

func (a *Dish[D, I, O]) SetCooker(cooker DishCooker[D, I, O]) *Dish[D, I, O]

SetCooker sets the function body to be executed when the dish is cooked.

func (*Dish[D, I, O]) SetExecer

func (a *Dish[D, I, O]) SetExecer(cooker DishCooker[D, I, O]) *Dish[D, I, O]

SetExecer is an alias of SetCooker

func (Dish[D, I, O]) Sets

func (a Dish[D, I, O]) Sets() []ISet

func (Dish[D, I, O]) Tags

func (a Dish[D, I, O]) Tags() reflect.StructTag

type DishCooker

type DishCooker[D ICookware, I any, O any] func(IContext[D], I) (output O, err error)

type IContext

type IContext[D ICookware] interface {
	IContextWithSession

	Menu() iMenu[D]
	Sets() []iSet[D]
	Dish() iDish[D]
	Dependency() D
	Cookware() D

	TraceSpan() iTraceSpan[D]
	// contains filtered or unexported methods
}

type IContextWithSession

type IContextWithSession interface {
	context.Context
	Session(...IDishServe) []IDishServe
	SetCtx(context.Context)
	RawCookware() ICookware
	WebBundle() IWebBundle
	GetCtx() context.Context
	// contains filtered or unexported methods
}

type ICookware

type ICookware interface {
}

type ICookwareFactory

type ICookwareFactory[D ICookware] interface {
	New() D
	Put(any)
}

type ICookwareInheritable

type ICookwareInheritable interface {
	ICookware
	Inherit(ICookware) ICookware
}

ICookwareInheritable is a cookware that can be inherited by another cookware. Useful when cross menu cooking is needed and pass the cookware around.

type IDbRunner

type IDbRunner interface {
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

type IDbTx

type IDbTx interface {
	IDbRunner
	Rollback() error
	Commit() error
}

type IDish

type IDish interface {
	Name() string
	FullName() string
	Input() any
	IO() (any, any)
	Id() uint32
	Menu() IMenu
	Cookware() ICookware
	Sets() []ISet
	Tags() reflect.StructTag

	// CookAny is for cooking with any input, mainly for external node calling with network.
	CookAny(ctx context.Context, input any) (output any, err error)
	// contains filtered or unexported methods
}

type IDishServe

type IDishServe interface {
	Record() (action IDish, finish bool, input any, output any, err error)
	// contains filtered or unexported methods
}

type IInstance

type IInstance interface {
	Name() string
	Menu() IMenu
	Nodes() []IInstance
}

type IManager

type IManager interface {
	AddMenu(menuInitializer func() IMenu) IManager
	SetMainKitchen(url string, port uint16) IManager

	Init() (IManager, error)
	SelectServeMenus(menuNames ...string) IManager
	DisableMenu(name string) IManager
	// contains filtered or unexported methods
}

IManager is the used for managing menus for scaling.

func NewDeliveryManager

func NewDeliveryManager(localHostUrl string, localRepPort uint16) IManager

NewDeliveryManager creates a new Manager. localHostUrl is the local host url which typically host from docker/kubernetes. localRepPort is the local port for the manager to listen to and exported for foreign call.

type IMenu

type IMenu interface {
	IInstance
	Name() string
	Manager() IManager
	Cookware() ICookware

	ID() uint32
	// contains filtered or unexported methods
}

type IPipeline

type IPipeline interface {
	IMenu
	GetActionsForModel(any) (status string, actions []IPipelineAction)
	GetActionsForStatus(status string) []IPipelineAction
	NewModel() IPipelineModel
}

type IPipelineAction

type IPipelineAction interface {
	IDish
	Status() PipelineStatus
	ModelToMap(model IPipelineModel) map[string]string
	ExecByIdAny(ctx context.Context, input any, ids ...any) (output any, err error)
	WillCreateModel() bool
	Pipeline() IPipeline
}

type IPipelineContext

type IPipelineContext[D IPipelineCookware[M], M IPipelineModel] interface {
	IContext[D]
	Tx() IDbTx
	Pipeline() iPipeline[D, M]
	Stage() iPipelineStage[D, M]
}

type IPipelineCookware

type IPipelineCookware[M IPipelineModel] interface {
	ICookware
	BeginTx(context.Context, ...*sql.TxOptions) (IDbTx, error)
	FinishTx(IDbTx, error) error
	GetModelById(ctx context.Context, pks ...any) (M, error)
	SaveModel(db IDbRunner, model M, oStatus PipelineStatus) error
}

type IPipelineModel

type IPipelineModel interface {
	GetStatus() PipelineStatus
	SetStatus(status PipelineStatus)
	PrimaryKey() any
}

type IPipelineModelCanMap

type IPipelineModelCanMap interface {
	ToMap() map[string]string
}

type IPipelineStage

type IPipelineStage interface {
	ISet
	Status() PipelineStatus
	Actions() []IPipelineAction
}

type ISet

type ISet interface {
	IInstance
	Name() string
	Menu() IMenu
	Tree() []ISet
}

type ITraceSpan

type ITraceSpan interface {
	Detail() any
	End(output any, err error)
	AddEvent(name string, attrSets ...map[string]any)
	SetAttributes(key string, value any)
	Raw() any
}

type ITraceableCookware

type ITraceableCookware[D ICookware] interface {
	StartTrace(ctx IContext[D], id string, input any) (context.Context, iTraceSpan[D])
}

ITraceableCookware is a cookware that trace the life-cycle of cooking can refer to built in tracers.

type IWebBundle

type IWebBundle interface {
	Ctx() context.Context
	Method() string
	Body() ([]byte, error)
	Url() *url.URL
	UrlParams() map[string]string
	Headers() http.Header
	Raw() any
	Response() http.ResponseWriter
}

IWebBundle is generated by web router wrapper, it contains all the information of a web request.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

A Manager is a struct for managing menus for scaling.

func (*Manager) AddMenu

func (m *Manager) AddMenu(menuInitializer func() IMenu) IManager

AddMenu adds a menu to the manager. menuInitializer is a function that returns a menu, TODO should like menu to dispose when disabled.

func (*Manager) DisableMenu

func (m *Manager) DisableMenu(name string) IManager

DisableMenu disables a menu. should call after Init

func (*Manager) Init

func (m *Manager) Init() (IManager, error)

Init initializes the manager and start listening.

func (*Manager) SelectServeMenus

func (m *Manager) SelectServeMenus(menuNamesNilIsAll ...string) IManager

SelectServeMenus selects the menus to serve. not select = serves all should call after Init

func (*Manager) SetMainKitchen

func (m *Manager) SetMainKitchen(url string, port uint16) IManager

SetMainKitchen sets the main host for the manager.

type MenuBase[WPtr iMenu[D], D ICookware] struct {
	// contains filtered or unexported fields
}

MenuBase is a struct for supporting IMenu, should embed to all menus.

func (r *MenuBase) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (r *MenuBase) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (r *MenuBase) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (r *MenuBase) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (r *MenuBase) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (r MenuBase[W, D]) Cookware() ICookware
func (b MenuBase[W, D]) Dependency() D
func (b *MenuBase[W, D]) Dishes() []iDish[D]
func (b *MenuBase[W, D]) ID() uint32
func (b *MenuBase[W, D]) Manager() IManager

Manager returns the manager of the menu, maybe nil if not associated with a manager.

func (b *MenuBase[W, D]) Menu() IMenu
func (b MenuBase[W, D]) Name() string
func (b MenuBase) Nodes() []IInstance

Nodes returns the children of this node.

type MenuCounter struct {
	Ok             *uint64
	Err            *uint64
	SideEffect     *uint64
	DishOk         []*uint64
	DishErr        []*uint64
	DishSideEffect []*uint64
	// contains filtered or unexported fields
}

type PipelineAction

type PipelineAction[D IPipelineCookware[M], M IPipelineModel, I any, O any] struct {
	Dish[D, *PipelineActionInput[M, I], *PipelineActionOutput[M, O]]
	// contains filtered or unexported fields
}

A PipelineAction is a super set of Dish specific for managing workflow of stateful data model.

func (*PipelineAction) AfterCook

func (r *PipelineAction) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (*PipelineAction) AfterCookAsync

func (r *PipelineAction) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (*PipelineAction) AfterExec

func (r *PipelineAction) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (*PipelineAction) AfterExecAsync

func (r *PipelineAction) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (*PipelineAction) ConcurrentLimit

func (r *PipelineAction) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (*PipelineAction[D, M, I, O]) CreateModel

func (p *PipelineAction[D, M, I, O]) CreateModel() *PipelineAction[D, M, I, O]

func (*PipelineAction[D, M, I, O]) ExecById

func (p *PipelineAction[D, M, I, O]) ExecById(ctx context.Context, input I, modelId ...any) (output O, err error)

ExecById executes the action with the input and model id.

func (*PipelineAction[D, M, I, O]) ExecByIdAndDep

func (p *PipelineAction[D, M, I, O]) ExecByIdAndDep(ctx context.Context, dep D, input I, modelId ...any) (output O, err error)

ExecByIdAndDep executes the action with the dependencies and model id.

func (*PipelineAction[D, M, I, O]) ExecByIdAny

func (p *PipelineAction[D, M, I, O]) ExecByIdAny(ctx context.Context, input any, ids ...any) (output any, err error)

ExecByIdAny executes the action with the input and model id.

func (*PipelineAction[D, M, I, O]) ExecWithModel

func (p *PipelineAction[D, M, I, O]) ExecWithModel(ctx context.Context, model M, input I) (output O, err error)

ExecWithModel executes the action with the model and input.

func (*PipelineAction[D, M, I, O]) ExecWithModelAndDep

func (p *PipelineAction[D, M, I, O]) ExecWithModelAndDep(ctx context.Context, dep D, model M, input I) (output O, err error)

ExecWithModelAndDep executes the action with the model, input and dependencies.

func (PipelineAction[D, M, I, O]) ModelToMap

func (p PipelineAction[D, M, I, O]) ModelToMap(model IPipelineModel) map[string]string

ModelToMap converts the model to map for logging.

func (PipelineAction) Nodes

func (b PipelineAction) Nodes() []IInstance

Nodes returns the children of this node.

func (PipelineAction[D, M, I, O]) Pipeline

func (p PipelineAction[D, M, I, O]) Pipeline() IPipeline

func (*PipelineAction[D, M, I, O]) SetCooker

func (p *PipelineAction[D, M, I, O]) SetCooker(cooker PipelineDishCooker[D, M, I, O]) *PipelineAction[D, M, I, O]

SetCooker sets the cooker for the action, will transform to func(i IContext[D], p *PipelineActionInput[M, I]) (output *PipelineActionOutput[M, O], err error).

func (*PipelineAction[D, M, I, O]) SetNextStage

func (p *PipelineAction[D, M, I, O]) SetNextStage(stage IPipelineStage) *PipelineAction[D, M, I, O]

SetNextStage sets the next stage for the action.

func (PipelineAction[D, M, I, O]) Status

func (p PipelineAction[D, M, I, O]) Status() PipelineStatus

func (*PipelineAction[D, M, I, O]) WillCreateModel

func (p *PipelineAction[D, M, I, O]) WillCreateModel() bool

WillCreateModel returns true if the action will create a model, can input nil in Cook.

type PipelineActionInput

type PipelineActionInput[M IPipelineModel, I any] struct {
	Input  I
	Model  M
	Before map[string]string
	Status PipelineStatus
}

func PipelineActionInputToAny

func PipelineActionInputToAny[M IPipelineModel](input any) PipelineActionInput[M, any]

PipelineActionInputToAny transforms the input to PipelineActionInput.

type PipelineActionOutput

type PipelineActionOutput[M IPipelineModel, O any] struct {
	Output O
	Model  M
}

func PipelineActionOutputToAny

func PipelineActionOutputToAny[M IPipelineModel](output any) PipelineActionOutput[M, any]

PipelineActionOutputToAny transforms the output to PipelineActionOutput.

type PipelineBase

type PipelineBase[P iPipeline[D, M], D IPipelineCookware[M], M IPipelineModel] struct {
	MenuBase[P, D]
	StageByStatus map[string]iPipelineStage[D, M]
}

PipelineBase is a super set of MenuBase, should embed to all pipelines.

func (*PipelineBase) AfterCook

func (r *PipelineBase) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (*PipelineBase) AfterCookAsync

func (r *PipelineBase) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (*PipelineBase) AfterExec

func (r *PipelineBase) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (*PipelineBase) AfterExecAsync

func (r *PipelineBase) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (*PipelineBase) ConcurrentLimit

func (r *PipelineBase) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (*PipelineBase[P, D, M]) GetActionsForModel

func (p *PipelineBase[P, D, M]) GetActionsForModel(model any) (string, []IPipelineAction)

GetActionsForModel returns the actions available for the model.

func (*PipelineBase[P, D, M]) GetActionsForStatus

func (p *PipelineBase[P, D, M]) GetActionsForStatus(status string) []IPipelineAction

GetActionsForStatus returns the actions available for the status.

func (PipelineBase[P, D, M]) NewModel

func (p PipelineBase[P, D, M]) NewModel() IPipelineModel

func (PipelineBase) Nodes

func (b PipelineBase) Nodes() []IInstance

Nodes returns the children of this node.

type PipelineContext

type PipelineContext[D IPipelineCookware[M], M IPipelineModel] struct {
	Context[D]
	// contains filtered or unexported fields
}

func (PipelineContext[D, M]) Pipeline

func (b PipelineContext[D, M]) Pipeline() iPipeline[D, M]

func (PipelineContext[D, M]) Stage

func (b PipelineContext[D, M]) Stage() iPipelineStage[D, M]

func (PipelineContext[D, M]) Tx

func (b PipelineContext[D, M]) Tx() IDbTx

type PipelineContextForTest

type PipelineContextForTest[D IPipelineCookware[M], M IPipelineModel] struct {
	ContextForTest[D]
	DummyTx IDbTx
}

func (PipelineContextForTest[D, M]) Pipeline

func (b PipelineContextForTest[D, M]) Pipeline() iPipeline[D, M]

func (PipelineContextForTest[D, M]) Stage

func (b PipelineContextForTest[D, M]) Stage() iPipelineStage[D, M]

func (PipelineContextForTest[D, M]) Tx

func (b PipelineContextForTest[D, M]) Tx() IDbTx

type PipelineDishCooker

type PipelineDishCooker[D IPipelineCookware[M], M IPipelineModel, I any, O any] func(IPipelineContext[D, M], M, I) (output O, toSaveCanNil M, err error)

type PipelineStage

type PipelineStage[D IPipelineCookware[M], M IPipelineModel] struct {
	SetBase[D]
	// contains filtered or unexported fields
}

PipelineStage is a super set of SetBase, should embed to all pipeline stages. example: a order model can have stages like "ordered", "wait for payment", "pending delivery", "delivery", "complete".

func (PipelineStage[D, M]) Actions

func (ps PipelineStage[D, M]) Actions() []IPipelineAction

Actions returns the actions available for the stage.

func (*PipelineStage) AfterCook

func (r *PipelineStage) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (*PipelineStage) AfterCookAsync

func (r *PipelineStage) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (*PipelineStage) AfterExec

func (r *PipelineStage) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (*PipelineStage) AfterExecAsync

func (r *PipelineStage) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (*PipelineStage) ConcurrentLimit

func (r *PipelineStage) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (PipelineStage) Nodes

func (b PipelineStage) Nodes() []IInstance

Nodes returns the children of this node.

func (PipelineStage[D, M]) Status

func (ps PipelineStage[D, M]) Status() PipelineStatus

type PipelineStatus

type PipelineStatus string

type SetBase

type SetBase[D ICookware] struct {
	// contains filtered or unexported fields
}

SetBase is a struct for supporting ISet, can treat as a sub menu.

func (*SetBase) AfterCook

func (r *SetBase) AfterCook(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec registers a handler to be called after the dish has been executed.

func (*SetBase) AfterCookAsync

func (r *SetBase) AfterCookAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync registers a handler to be called after the dish has been executed asynchronously.

func (*SetBase) AfterExec

func (r *SetBase) AfterExec(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExec aliases of AfterCook

func (*SetBase) AfterExecAsync

func (r *SetBase) AfterExecAsync(handler AfterListenHandlers[D, I, O], toLog ...any) *cookbook[D, I, O]

AfterExecAsync aliases of AfterCookAsync

func (*SetBase) ConcurrentLimit

func (r *SetBase) ConcurrentLimit(limit int32)

ConcurrentLimit sets the maximum number of concurrent executions of this node and it's children.

func (SetBase[D]) Menu

func (s SetBase[D]) Menu() IMenu

func (SetBase[D]) Name

func (s SetBase[D]) Name() string

func (SetBase) Nodes

func (b SetBase) Nodes() []IInstance

Nodes returns the children of this node.

func (*SetBase[D]) OverridePath

func (s *SetBase[D]) OverridePath(path string) *SetBase[D]

func (SetBase[D]) Tree

func (s SetBase[D]) Tree() []ISet

type ZeroLogTraceableCookware

type ZeroLogTraceableCookware[D ICookware] struct {
	Logger *zerolog.Logger
}

func NewZeroLogTraceableCookware

func NewZeroLogTraceableCookware[D ICookware](l *zerolog.Logger) *ZeroLogTraceableCookware[D]

func (ZeroLogTraceableCookware[D]) StartTrace

func (d ZeroLogTraceableCookware[D]) StartTrace(ctx IContext[D], id string, input any) (context.Context, iTraceSpan[D])

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL