Documentation ¶
Overview ¶
Package callbacks provides callback mechanisms for component execution in Eino.
This package allows you to inject callback handlers at different stages of component execution, such as start, end, and error handling. It's particularly useful for implementing governance capabilities like logging, monitoring, and metrics collection.
The package provides two ways to create callback handlers:
1. Create a callback handler using HandlerBuilder:
    handler := callbacks.NewHandlerBuilder().
        OnStart(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
            // Handle component start
            return ctx
        }).
        OnEnd(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
            // Handle component end
            return ctx
        }).
        OnError(func(ctx context.Context, info *RunInfo, err error) context.Context {
            // Handle component error
            return ctx
        }).
        OnStartWithStreamInput(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
            // Handle component start with stream input
            return ctx
        }).
        OnEndWithStreamOutput(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
            // Handle component end with stream output
            return ctx
        }).
        Build()
With this approach, you must convert the callback input and output types yourself and implement the logic for every component type in a single handler.
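A minimal sketch of what such a single handler can look like, assuming local stand-ins for RunInfo and for two component input types (`modelInput`, `retrieverInput`, and the component names used here are hypothetical, not Eino types). One function branches on the component and converts the untyped input itself:

```go
package main

import "fmt"

// RunInfo is a local stand-in for callbacks.RunInfo; Component is a plain
// string here (an assumption made to keep the sketch self-contained).
type RunInfo struct {
	Name      string
	Component string
}

// Hypothetical per-component input types, for illustration only.
type modelInput struct{ Messages []string }
type retrieverInput struct{ Query string }

// describeStart plays the role of one generic OnStart function: it has to
// branch on the component and convert the untyped input on its own.
func describeStart(info *RunInfo, input any) string {
	switch info.Component {
	case "ChatModel":
		in, ok := input.(*modelInput)
		if !ok {
			return info.Name + ": unexpected input for chat model"
		}
		return fmt.Sprintf("%s: %d message(s)", info.Name, len(in.Messages))
	case "Retriever":
		in, ok := input.(*retrieverInput)
		if !ok {
			return info.Name + ": unexpected input for retriever"
		}
		return fmt.Sprintf("%s: query %q", info.Name, in.Query)
	default:
		return info.Name + ": ignored"
	}
}

func main() {
	fmt.Println(describeStart(&RunInfo{Name: "chat", Component: "ChatModel"}, &modelInput{Messages: []string{"hi"}}))
	fmt.Println(describeStart(&RunInfo{Name: "search", Component: "Retriever"}, &retrieverInput{Query: "eino"}))
	fmt.Println(describeStart(&RunInfo{Name: "other", Component: "Lambda"}, 42))
}
```

Every new component type adds another branch to the same function, which is the cost this approach carries.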
2. Use [template.HandlerHelper] to create a handler:
Package template provides [template.HandlerHelper] as a convenient way to build callback handlers for different component types. It allows you to set specific handlers for each component type, and a fallback handler for unmatched components.
For example:
// Create handlers for specific components
    modelHandler := &model.CallbackHandler{
        OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
            // RunInfo exposes the node name through its Name field
            log.Printf("Model execution started: %s", info.Name)
            return ctx
        },
    }

    promptHandler := &prompt.CallbackHandler{
        OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
            log.Printf("Prompt execution completed: %s", output.Result)
            return ctx
        },
    }

    // Create a fallback handler for unmatched components
    fallbackHandler := &DefaultCallbackHandler{
        OnStart: func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
            log.Printf("Generic component started: %s", info.Name)
            return ctx
        },
    }

    // Build the handler using HandlerHelper
    handler := callbacks.NewHandlerHelper().
        ChatModel(modelHandler).
        Prompt(promptHandler).
        Fallback(fallbackHandler).
        Handler()
[HandlerHelper] supports handlers for various component types including:
- Prompt components (via prompt.CallbackHandler)
- Chat model components (via model.CallbackHandler)
- Embedding components (via embedding.CallbackHandler)
- Indexer components (via indexer.CallbackHandler)
- Retriever components (via retriever.CallbackHandler)
- Document loader components (via loader.CallbackHandler)
- Document transformer components (via transformer.CallbackHandler)
- Tool components (via tool.CallbackHandler)
- Graph (via template.DefaultCallbackHandler)
- State graph (via template.DefaultCallbackHandler)
- Chain (via template.DefaultCallbackHandler)
- Passthrough (via template.DefaultCallbackHandler)
- Tools node (via template.DefaultCallbackHandler)
- Lambda (via template.DefaultCallbackHandler)
Use the handler with a component:
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
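The per-component dispatch with a fallback described above can be pictured with the following sketch. It is not the HandlerHelper implementation: `helper`, `On`, and the string component keys are hypothetical names invented for illustration, and only the start timing is modelled.

```go
package main

import "fmt"

// RunInfo is a local stand-in for callbacks.RunInfo (assumption: Component
// is reduced to a string).
type RunInfo struct {
	Name      string
	Component string
}

// startFn is a simplified start callback that returns a log line.
type startFn func(info *RunInfo) string

// helper is a hypothetical miniature of a per-component handler registry.
type helper struct {
	byComponent map[string]startFn
	fallback    startFn
}

func newHelper() *helper {
	return &helper{byComponent: make(map[string]startFn)}
}

// On registers a callback for one component type and returns the helper so
// calls can be chained.
func (h *helper) On(component string, fn startFn) *helper {
	h.byComponent[component] = fn
	return h
}

// Fallback registers the callback used when no component-specific one matches.
func (h *helper) Fallback(fn startFn) *helper {
	h.fallback = fn
	return h
}

// OnStart picks the component-specific callback first, then the fallback,
// and does nothing when neither is set.
func (h *helper) OnStart(info *RunInfo) string {
	if fn, ok := h.byComponent[info.Component]; ok {
		return fn(info)
	}
	if h.fallback != nil {
		return h.fallback(info)
	}
	return ""
}

func main() {
	h := newHelper().
		On("ChatModel", func(info *RunInfo) string { return "model:" + info.Name }).
		Fallback(func(info *RunInfo) string { return "fallback:" + info.Name })

	fmt.Println(h.OnStart(&RunInfo{Name: "chat", Component: "ChatModel"}))
	fmt.Println(h.OnStart(&RunInfo{Name: "split", Component: "Transformer"}))
}
```

Keeping the component-specific callbacks typed and separate is what removes the manual conversion step needed with a single generic handler.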
Index ¶
- func CtxWithManager(ctx context.Context, manager *Manager) context.Context (deprecated)
- func InitCallbackHandlers(handlers []Handler)
- func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
- func NewHandlerBuilder() *handlerBuilder
- func OnEnd(ctx context.Context, output CallbackOutput) context.Context
- func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func OnError(ctx context.Context, err error) context.Context
- func OnStart(ctx context.Context, input CallbackInput) context.Context
- func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func SetRunInfo(ctx context.Context, info *RunInfo) context.Context
- type CallbackInput
- type CallbackOutput
- type CallbackTiming
- type Handler
- type HandlerBuilder (deprecated)
- func (h *HandlerBuilder) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
- func (h *HandlerBuilder) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, ...) context.Context
- func (h *HandlerBuilder) OnError(ctx context.Context, info *RunInfo, err error) context.Context
- func (h *HandlerBuilder) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
- func (h *HandlerBuilder) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
- type Manager
- func (m Manager) Handlers() []Handler
- func (m Manager) OnEnd(ctx context.Context, output CallbackOutput) context.Context
- func (m Manager) OnEndWithStreamOutput(ctx context.Context, output *schema.StreamReader[CallbackOutput]) context.Context
- func (m Manager) OnError(ctx context.Context, err error) context.Context
- func (m Manager) OnStart(ctx context.Context, input CallbackInput) context.Context
- func (m Manager) OnStartWithStreamInput(ctx context.Context, input *schema.StreamReader[CallbackInput]) context.Context
- func (mm *Manager) WithRunInfo(runInfo *RunInfo) *Manager (deprecated)
- type RunInfo
- type TimingChecker
Functions ¶
func InitCallbackHandlers ¶
func InitCallbackHandlers(handlers []Handler)
InitCallbackHandlers sets the global callback handlers. It should be called before any user-supplied callback handler is used. It's useful when you want to inject some basic callbacks into all nodes.
func InitCallbacks ¶
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
InitCallbacks initializes a new context with the provided RunInfo and handlers. Any RunInfo and handlers previously set on this ctx are overwritten.
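The overwrite behaviour can be illustrated with a small model built on context values. This is a sketch under assumptions, not Eino's implementation: `initCallbacks`, `callbackState`, and `ctxKey` are hypothetical names, and handlers are reduced to strings.

```go
package main

import (
	"context"
	"fmt"
)

// RunInfo is a local stand-in for callbacks.RunInfo.
type RunInfo struct{ Name string }

// ctxKey is an unexported key type, so no other package can collide with it.
type ctxKey struct{}

// callbackState bundles what the sketch stores on the context.
type callbackState struct {
	info     *RunInfo
	handlers []string
}

// initCallbacks stores a fresh state on the context. It replaces any earlier
// state instead of merging with it, mirroring the documented overwrite.
func initCallbacks(ctx context.Context, info *RunInfo, handlers ...string) context.Context {
	return context.WithValue(ctx, ctxKey{}, &callbackState{info: info, handlers: handlers})
}

// stateOf returns the most recently stored state, or nil if none was set.
func stateOf(ctx context.Context) *callbackState {
	s, _ := ctx.Value(ctxKey{}).(*callbackState)
	return s
}

// demo initializes callbacks twice on the same context chain and reports
// what the final context carries.
func demo() (name string, handlers []string) {
	ctx := initCallbacks(context.Background(), &RunInfo{Name: "first"}, "logging", "tracing")
	ctx = initCallbacks(ctx, &RunInfo{Name: "second"}, "metrics")
	s := stateOf(ctx)
	return s.info.Name, s.handlers
}

func main() {
	name, handlers := demo()
	fmt.Println(name, handlers) // prints "second [metrics]"
}
```

After the second call, neither the first RunInfo nor the "logging" and "tracing" entries are visible through the returned context.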
func NewHandlerBuilder ¶
func NewHandlerBuilder() *handlerBuilder
func OnEnd ¶
func OnEnd(ctx context.Context, output CallbackOutput) context.Context
OnEnd invokes the OnEnd logic for the particular context, allowing for proper cleanup and finalization when a process ends. Handlers are executed in the order in which they were added.
func OnEndWithStreamOutput ¶
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic for the particular context. Every handler must properly close the stream it receives. Handlers are executed in the order in which they were added.
func OnError ¶
func OnError(ctx context.Context, err error) context.Context
OnError invokes the OnError logic for the particular context. Note that errors occurring inside a stream are not reported here. Handlers are executed in the order in which they were added.
func OnStart ¶
func OnStart(ctx context.Context, input CallbackInput) context.Context
OnStart is a quick way for component developers to inject the callback input and output aspects, e.g.:
    func (t *testchatmodel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
        defer func() {
            if err != nil {
                // report failures through the error aspect rather than OnEnd
                callbacks.OnError(ctx, err)
            }
        }()

        ctx = callbacks.OnStart(ctx, &model.CallbackInput{
            Messages: input,
            Tools:    nil,
            Extra:    nil,
        })

        // do something

        ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
            Message: resp,
            Extra:   nil,
        })

        return resp, nil
    }
OnStart invokes the OnStart logic for the particular context. When a process begins, all registered handlers are executed in reverse order (relative to the order in which they were added).
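The documented ordering (start callbacks in reverse add order, end callbacks in add order) can be sketched as follows. The `handler` type and the two `run` functions are hypothetical stand-ins that only record call order; they are not Eino's dispatch code.

```go
package main

import "fmt"

// handler is a stand-in that only carries a name for tracing call order.
type handler struct{ name string }

// runOnStart walks the handlers from last added to first added.
func runOnStart(handlers []handler) []string {
	calls := make([]string, 0, len(handlers))
	for i := len(handlers) - 1; i >= 0; i-- {
		calls = append(calls, "start:"+handlers[i].name)
	}
	return calls
}

// runOnEnd walks the handlers in the order they were added.
func runOnEnd(handlers []handler) []string {
	calls := make([]string, 0, len(handlers))
	for _, h := range handlers {
		calls = append(calls, "end:"+h.name)
	}
	return calls
}

func main() {
	handlers := []handler{{name: "logging"}, {name: "metrics"}}
	fmt.Println(runOnStart(handlers)) // prints "[start:metrics start:logging]"
	fmt.Println(runOnEnd(handlers))   // prints "[end:logging end:metrics]"
}
```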
func OnStartWithStreamInput ¶
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnStartWithStreamInput invokes the OnStartWithStreamInput logic for the particular context. Every handler must properly close the input stream it receives. Handlers are executed in reverse order (relative to the order in which they were added).
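A handler that receives a stream is responsible for closing it. The sketch below shows one way to guarantee that with `defer`, using a hypothetical `stream` type whose `Recv`/`Close` shape is assumed here to resemble `*schema.StreamReader`; it is a stand-in, not the real reader.

```go
package main

import (
	"fmt"
	"io"
)

// stream is a hypothetical in-memory stand-in for a stream reader.
type stream struct {
	items  []string
	next   int
	closed bool
}

// Recv returns the next chunk, or io.EOF once the stream is exhausted or closed.
func (s *stream) Recv() (string, error) {
	if s.closed || s.next >= len(s.items) {
		return "", io.EOF
	}
	item := s.items[s.next]
	s.next++
	return item, nil
}

// Close marks the stream as released.
func (s *stream) Close() { s.closed = true }

// consume reads every chunk and always closes the stream, including when
// Recv fails early, because Close is deferred before the loop starts.
func consume(s *stream) []string {
	defer s.Close()
	var chunks []string
	for {
		chunk, err := s.Recv()
		if err != nil {
			// io.EOF ends the loop normally; any other error also stops reading.
			return chunks
		}
		chunks = append(chunks, chunk)
	}
}

func main() {
	s := &stream{items: []string{"hel", "lo"}}
	fmt.Println(consume(s), s.closed) // prints "[hel lo] true"
}
```

In a real handler it may be preferable to run this loop in a goroutine so the callback returns promptly; that choice is left out of the sketch.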
Types ¶
type CallbackInput ¶
type CallbackInput any
CallbackInput is the input of the callback. Its concrete type is defined by the component. Use a type assertion or a conversion function to convert the input to the type you need. For example, CallbackInput in components/model/interface.go is:
    type CallbackInput struct {
        Messages []*schema.Message
        Config   *Config
        Extra    map[string]any
    }
The model package also provides model.ConvCallbackInput() to convert a CallbackInput to *model.CallbackInput.
In a callback handler, you can use the following code to get the input:
    modelCallbackInput := model.ConvCallbackInput(in)
    if modelCallbackInput == nil {
        // not a model callback input, so ignore it
        return ctx
    }
type CallbackOutput ¶
type CallbackOutput any
type CallbackTiming ¶
type CallbackTiming uint8
CallbackTiming enumerates the timings at which callback aspects are invoked.
    const (
        TimingOnStart CallbackTiming = iota
        TimingOnEnd
        TimingOnError
        TimingOnStartWithStreamInput
        TimingOnEndWithStreamOutput
    )
type Handler ¶
    type Handler interface {
        OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
        OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
        OnError(ctx context.Context, info *RunInfo, err error) context.Context
        OnStartWithStreamInput(ctx context.Context, info *RunInfo,
            input *schema.StreamReader[CallbackInput]) context.Context
        OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
            output *schema.StreamReader[CallbackOutput]) context.Context
    }
func GetGlobalHandlers ¶
func GetGlobalHandlers() []Handler
type HandlerBuilder (deprecated)
    type HandlerBuilder struct {
        OnStartFn                func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
        OnEndFn                  func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
        OnErrorFn                func(ctx context.Context, info *RunInfo, err error) context.Context
        OnStartWithStreamInputFn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
        OnEndWithStreamOutputFn  func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
    }
HandlerBuilder can be used to build a Handler from callback functions, e.g.:
    handler := &HandlerBuilder{
        // self-defined start callback function
        OnStartFn: func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
            return ctx
        },
    }

    graph := compose.NewGraph[inputType, outputType]()
    runnable, err := graph.Compile()
    if err != nil {...}
    // implement only the functions you want to override
    runnable.Invoke(ctx, params, compose.WithCallbacks(handler))
Deprecated: In most situations, it is preferred to use callbacks.NewHandlerHelper. Otherwise, use NewHandlerBuilder().OnStartFn()...Build().
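The "only implement what you want to override" behaviour of a struct with optional function fields can be sketched like this. The `builder` type is a hypothetical reduction with two timings and string results; it is not the HandlerBuilder source.

```go
package main

import "fmt"

// builder is a hypothetical reduction of a handler built from optional
// function fields. Unset fields stay nil.
type builder struct {
	OnStartFn func(name string) string
	OnEndFn   func(name string) string
}

// OnStart calls the user function when one was supplied and is a no-op otherwise.
func (b *builder) OnStart(name string) string {
	if b.OnStartFn == nil {
		return ""
	}
	return b.OnStartFn(name)
}

// OnEnd follows the same rule for the end timing.
func (b *builder) OnEnd(name string) string {
	if b.OnEndFn == nil {
		return ""
	}
	return b.OnEndFn(name)
}

func main() {
	// Only the start callback is provided; the end callback is left unset.
	b := &builder{
		OnStartFn: func(name string) string { return "start:" + name },
	}
	fmt.Printf("%q %q\n", b.OnStart("node"), b.OnEnd("node")) // prints "start:node" ""
}
```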
func (*HandlerBuilder) OnEnd ¶
func (h *HandlerBuilder) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
func (*HandlerBuilder) OnEndWithStreamOutput ¶
func (h *HandlerBuilder) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
func (*HandlerBuilder) OnStart ¶
func (h *HandlerBuilder) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
func (*HandlerBuilder) OnStartWithStreamInput ¶
func (h *HandlerBuilder) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
type Manager ¶
    type Manager struct {
        // contains filtered or unexported fields
    }
Manager is the callback manager of one running node. Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.
func ManagerFromCtx (deprecated)
func NewManager ¶
NewManager creates a callback manager. It returns a nil manager if no callback handler is provided, so check the return value before using it. Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.
func (Manager) OnEnd ¶
func (m Manager) OnEnd(ctx context.Context, output CallbackOutput) context.Context
func (Manager) OnEndWithStreamOutput ¶
func (m Manager) OnEndWithStreamOutput( ctx context.Context, output *schema.StreamReader[CallbackOutput]) context.Context
func (Manager) OnStart ¶
func (m Manager) OnStart(ctx context.Context, input CallbackInput) context.Context
func (Manager) OnStartWithStreamInput ¶
func (m Manager) OnStartWithStreamInput( ctx context.Context, input *schema.StreamReader[CallbackInput]) context.Context
func (*Manager) WithRunInfo (deprecated)
type RunInfo ¶
    type RunInfo struct {
        Name      string
        Type      string
        Component components.Component
    }
RunInfo describes the node that is being run.
type TimingChecker ¶
    type TimingChecker interface {
        Needed(ctx context.Context, info *RunInfo, timing CallbackTiming) bool
    }
TimingChecker reports whether a handler is needed for a given callback timing. Implementing this interface is recommended for callback handlers, but not mandatory. Handlers created with callbacks.HandlerHelper or handlerBuilder implement it automatically. Eino's callback mechanism uses this interface to determine whether any handler is needed for a given timing, and skips handlers that are not needed for that timing.
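The skipping logic can be sketched with an optional-interface check. The `timingChecker` interface below is a simplified stand-in (the real method also takes ctx and info), `startOnly` and `plain` are hypothetical handlers, and treating handlers that do not implement the interface as always needed is an assumption of this sketch.

```go
package main

import "fmt"

// CallbackTiming mirrors the enumeration listed in this package's docs.
type CallbackTiming uint8

const (
	TimingOnStart CallbackTiming = iota
	TimingOnEnd
	TimingOnError
	TimingOnStartWithStreamInput
	TimingOnEndWithStreamOutput
)

// timingChecker is a simplified stand-in for TimingChecker.
type timingChecker interface {
	Needed(timing CallbackTiming) bool
}

// startOnly is a hypothetical handler that only cares about the start timing.
type startOnly struct{}

func (startOnly) Needed(timing CallbackTiming) bool { return timing == TimingOnStart }

// plain is a hypothetical handler that does not implement the interface.
type plain struct{}

// needed decides whether a handler should be invoked for a timing.
// Assumption: handlers without the optional interface are always invoked.
func needed(h any, timing CallbackTiming) bool {
	if checker, ok := h.(timingChecker); ok {
		return checker.Needed(timing)
	}
	return true
}

func main() {
	fmt.Println(needed(startOnly{}, TimingOnStart)) // prints "true"
	fmt.Println(needed(startOnly{}, TimingOnEnd))   // prints "false"
	fmt.Println(needed(plain{}, TimingOnError))     // prints "true"
}
```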