Documentation
¶
Overview ¶
Package callbacks provides callback mechanisms for component execution in Eino.
This package allows you to inject callback handlers at different stages of component execution, such as start, end, and error handling. It's particularly useful for implementing governance capabilities like logging, monitoring, and metrics collection.
The package provides two ways to create callback handlers:
1. Create a callback handler using HandlerBuilder:
handler := callbacks.NewHandlerBuilder().
OnStart(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
// Handle component start
return ctx
}).
OnEnd(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
// Handle component end
return ctx
}).
OnError(func(ctx context.Context, info *RunInfo, err error) context.Context {
// Handle component error
return ctx
}).
OnStartWithStreamInput(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
// Handle component start with stream input
return ctx
}).
OnEndWithStreamOutput(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
// Handle component end with stream output
return ctx
}).
Build()
For this way, you need to convert the callback input types by yourself, and implement the logic for different component types in one handler.
2. Use [template.HandlerHelper] to create a handler:
Package utils/callbacks provides [HandlerHelper] as a convenient way to build callback handlers for different component types. It allows you to set specific handlers for each component type,
e.g.
// Create handlers for specific components
modelHandler := &model.CallbackHandler{
OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
log.Printf("Model execution started: %s", info.ComponentName)
return ctx
},
}
promptHandler := &prompt.CallbackHandler{
OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
log.Printf("Prompt execution completed: %s", output.Result)
return ctx
},
}
// Build the handler using HandlerHelper
handler := callbacks.NewHandlerHelper().
ChatModel(modelHandler).
Prompt(promptHandler).
Fallback(fallbackHandler).
Handler()
[HandlerHelper] supports handlers for various component types including:
- Prompt components (via prompt.CallbackHandler)
- Chat model components (via model.CallbackHandler)
- Embedding components (via embedding.CallbackHandler)
- Indexer components (via indexer.CallbackHandler)
- Retriever components (via retriever.CallbackHandler)
- Document loader components (via loader.CallbackHandler)
- Document transformer components (via transformer.CallbackHandler)
- Tool components (via tool.CallbackHandler)
- Graph (via Handler)
- Chain (via Handler)
- Tools node (via Handler)
- Lambda (via Handler)
Use the handler with a component:
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
Index ¶
- func AppendGlobalHandlers(handlers ...Handler)
- func EnsureRunInfo(ctx context.Context, typ string, comp components.Component) context.Context
- func InitCallbackHandlers(handlers []Handler)
- func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
- func OnEnd[T any](ctx context.Context, output T) context.Context
- func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func OnError(ctx context.Context, err error) context.Context
- func OnStart[T any](ctx context.Context, input T) context.Context
- func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context
- type CallbackInput
- type CallbackOutput
- type CallbackTiming
- type Handler
- type HandlerBuilder
- func (hb *HandlerBuilder) Build() Handler
- func (hb *HandlerBuilder) OnEndFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnEndWithStreamOutputFn(fn func(ctx context.Context, info *RunInfo, ...) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnErrorFn(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartWithStreamInputFn(...) *HandlerBuilder
- type RunInfo
- type TimingChecker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppendGlobalHandlers ¶ added in v0.3.16
func AppendGlobalHandlers(handlers ...Handler)
AppendGlobalHandlers appends the given handlers to the global callback handlers. This is the preferred way to add global callback handlers as it preserves existing handlers. The global callback handlers will be executed for all nodes BEFORE user-specific handlers in CallOption. Note: This function is not thread-safe and should only be called during process initialization.
func EnsureRunInfo ¶ added in v0.3.27
EnsureRunInfo ensures the RunInfo in context matches the given type and component. If the current callback manager doesn't match or doesn't exist, it creates a new one while preserving existing handlers. Will initialize Global callback handlers if none exist in the ctx before.
func InitCallbackHandlers ¶
func InitCallbackHandlers(handlers []Handler)
InitCallbackHandlers sets the global callback handlers. It should be called BEFORE any callback handler by user. It's useful when you want to inject some basic callbacks to all nodes. Deprecated: Use AppendGlobalHandlers instead.
func InitCallbacks ¶
InitCallbacks initializes a new context with the provided RunInfo and handlers. Any previously set RunInfo and Handlers for this ctx will be overwritten.
func OnEnd ¶
OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup and finalization when a process ends. handlers are executed in normal order (compared to add order).
func OnEndWithStreamOutput ¶
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic of the particular, ensuring that every input stream should be closed properly in handler. handlers are executed in normal order (compared to add order).
func OnError ¶
OnError invokes the OnError logic of the particular, notice that error in stream will not represent here. handlers are executed in normal order (compared to add order).
func OnStart ¶
OnStart invokes the OnStart logic for the particular context, ensuring that all registered handlers are executed in reverse order (compared to add order) when a process begins.
func OnStartWithStreamInput ¶
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnStartWithStreamInput invokes the OnStartWithStreamInput logic of the particular context, ensuring that every input stream should be closed properly in handler. handlers are executed in reverse order (compared to add order).
Types ¶
type CallbackInput ¶
type CallbackInput = callbacks.CallbackInput
CallbackInput is the input of the callback. the type of input is defined by the component. using type Assert or convert func to convert the input to the right type you want. e.g.
CallbackInput in components/model/interface.go is:
type CallbackInput struct {
Messages []*schema.Message
Config *Config
Extra map[string]any
}
and provide a func of model.ConvCallbackInput() to convert CallbackInput to *model.CallbackInput
in callback handler, you can use the following code to get the input:
modelCallbackInput := model.ConvCallbackInput(in)
if modelCallbackInput == nil {
// is not a model callback input, just ignore it
return
}
type CallbackOutput ¶
type CallbackOutput = callbacks.CallbackOutput
type CallbackTiming ¶
type CallbackTiming = callbacks.CallbackTiming
CallbackTiming enumerates all the timing of callback aspects.
const ( TimingOnStart CallbackTiming = iota TimingOnEnd TimingOnError TimingOnStartWithStreamInput TimingOnEndWithStreamOutput )
type HandlerBuilder ¶
type HandlerBuilder struct {
// contains filtered or unexported fields
}
func NewHandlerBuilder ¶
func NewHandlerBuilder() *HandlerBuilder
NewHandlerBuilder creates and returns a new HandlerBuilder instance. HandlerBuilder is used to construct a Handler with custom callback functions
func (*HandlerBuilder) Build ¶ added in v0.3.3
func (hb *HandlerBuilder) Build() Handler
Build returns a Handler with the functions set in the builder.
func (*HandlerBuilder) OnEndFn ¶
func (hb *HandlerBuilder) OnEndFn( fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnEndWithStreamOutputFn ¶
func (hb *HandlerBuilder) OnEndWithStreamOutputFn( fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder
OnEndWithStreamOutputFn sets the callback function to be called.
func (*HandlerBuilder) OnErrorFn ¶
func (hb *HandlerBuilder) OnErrorFn( fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnStartFn ¶
func (hb *HandlerBuilder) OnStartFn( fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnStartWithStreamInputFn ¶
func (hb *HandlerBuilder) OnStartWithStreamInputFn( fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder
OnStartWithStreamInputFn sets the callback function to be called.
type TimingChecker ¶
type TimingChecker = callbacks.TimingChecker
TimingChecker checks if the handler is needed for the given callback aspect timing. It's recommended for callback handlers to implement this interface, but not mandatory. If a callback handler is created by using callbacks.HandlerHelper or handlerBuilder, then this interface is automatically implemented. Eino's callback mechanism will try to use this interface to determine whether any handlers are needed for the given timing. Also, the callback handler that is not needed for that timing will be skipped.