callbacks

package
v0.3.0
Published: Dec 11, 2024 License: Apache-2.0 Imports: 4 Imported by: 136

Documentation

Overview

Package callbacks provides callback mechanisms for component execution in Eino.

This package allows you to inject callback handlers at different stages of component execution, such as start, end, and error handling. It's particularly useful for implementing governance capabilities like logging, monitoring, and metrics collection.

The package provides two ways to create callback handlers:

1. Create a callback handler using HandlerBuilder:

handler := callbacks.NewHandlerBuilder().
	OnStart(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		// Handle component start
		return ctx
	}).
	OnEnd(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
		// Handle component end
		return ctx
	}).
	OnError(func(ctx context.Context, info *RunInfo, err error) context.Context {
		// Handle component error
		return ctx
	}).
	OnStartWithStreamInput(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
		// Handle component start with stream input
		return ctx
	}).
	OnEndWithStreamOutput(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
		// Handle component end with stream output
		return ctx
	}).
	Build()

With this approach, you must convert the callback input and output types yourself and implement the logic for all component types in a single handler.
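The manual conversion mentioned above can be sketched as a type switch inside a generic handler. This is a minimal, self-contained illustration: `modelInput`, `promptInput`, and `describeInput` are hypothetical stand-ins, not Eino types, and real handlers would switch on types such as *model.CallbackInput.

```go
package main

import "fmt"

// modelInput and promptInput are hypothetical stand-ins for
// component-specific callback input types.
type modelInput struct{ Messages []string }
type promptInput struct{ Variables map[string]any }

// describeInput shows the narrowing a generic OnStart handler has to do:
// the input arrives as an untyped value and is matched with a type switch.
func describeInput(input any) string {
	switch in := input.(type) {
	case *modelInput:
		return fmt.Sprintf("model input with %d message(s)", len(in.Messages))
	case *promptInput:
		return fmt.Sprintf("prompt input with %d variable(s)", len(in.Variables))
	default:
		// Inputs from other component types are ignored.
		return "unrecognized input, ignored"
	}
}

func main() {
	fmt.Println(describeInput(&modelInput{Messages: []string{"hi"}}))
	fmt.Println(describeInput(42))
}
```

Every component type the handler cares about needs its own case, which is the bookkeeping the second approach is meant to remove.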

2. Use [template.HandlerHelper] to create a handler:

Package template provides [template.HandlerHelper] as a convenient way to build callback handlers for different component types. It allows you to set specific handlers for each component type, and a fallback handler for unmatched components.

For example:

// Create handlers for specific components
modelHandler := &model.CallbackHandler{
	OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
		log.Printf("Model execution started: %s", info.Name)
		return ctx
	},
}

promptHandler := &prompt.CallbackHandler{
	OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
		log.Printf("Prompt execution completed: %v", output.Result)
		return ctx
	},
}

// Create a fallback handler for unmatched components
fallbackHandler := &DefaultCallbackHandler{
	OnStart: func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		log.Printf("Generic component started: %s", info.Name)
		return ctx
	},
}

// Build the handler using HandlerHelper
handler := callbacks.NewHandlerHelper().
	ChatModel(modelHandler).
	Prompt(promptHandler).
	Fallback(fallbackHandler).
	Handler()

[HandlerHelper] supports handlers for various component types including:

  • Prompt components (via prompt.CallbackHandler)
  • Chat model components (via model.CallbackHandler)
  • Embedding components (via embedding.CallbackHandler)
  • Indexer components (via indexer.CallbackHandler)
  • Retriever components (via retriever.CallbackHandler)
  • Document loader components (via loader.CallbackHandler)
  • Document transformer components (via transformer.CallbackHandler)
  • Tool components (via tool.CallbackHandler)
  • Graph (via template.DefaultCallbackHandler)
  • State graph (via template.DefaultCallbackHandler)
  • Chain (via template.DefaultCallbackHandler)
  • Passthrough (via template.DefaultCallbackHandler)
  • Tools node (via template.DefaultCallbackHandler)
  • Lambda (via template.DefaultCallbackHandler)
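The idea of routing by component type with a fallback can be sketched as follows. This is a simplified illustration under stated assumptions, not the HandlerHelper implementation: `helper`, `startFunc`, `newDemoHelper`, and the component type strings are all hypothetical.

```go
package main

import "fmt"

// startFunc is a hypothetical, simplified callback signature.
type startFunc func(name string) string

// helper routes a start event to the handler registered for the
// component type, or to the fallback when no handler matches.
type helper struct {
	byType   map[string]startFunc
	fallback startFunc
}

func (h *helper) onStart(componentType, name string) string {
	if fn, ok := h.byType[componentType]; ok {
		return fn(name)
	}
	if h.fallback != nil {
		return h.fallback(name)
	}
	return "" // no handler and no fallback: nothing to do
}

// newDemoHelper registers one typed handler plus a fallback.
func newDemoHelper() *helper {
	return &helper{
		byType: map[string]startFunc{
			"ChatModel": func(name string) string { return "model handler: " + name },
		},
		fallback: func(name string) string { return "fallback handler: " + name },
	}
}

func main() {
	h := newDemoHelper()
	fmt.Println(h.onStart("ChatModel", "my-model"))
	fmt.Println(h.onStart("Custom", "my-component"))
}
```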

Use the handler with a component:

runnable.Invoke(ctx, input, compose.WithCallbacks(handler))

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CtxWithManager deprecated

func CtxWithManager(ctx context.Context, manager *Manager) context.Context

Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.

func InitCallbackHandlers

func InitCallbackHandlers(handlers []Handler)

InitCallbackHandlers sets the global callback handlers. Call it before any user-defined callback handler is used. It is useful when you want to inject basic callbacks into all nodes.

func InitCallbacks

func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context

InitCallbacks initializes a new context with the provided RunInfo and handlers. If successful, it returns a new context containing RunInfo and handlers; otherwise, it returns a context with a nil manager.

func Needed

func Needed(ctx context.Context) bool

Needed checks if any callback handlers exist in this context.

func NeededForTiming

func NeededForTiming(ctx context.Context, timing CallbackTiming) bool

NeededForTiming checks if any callback handlers exist in this context that are needed for this specific timing.

func NewHandlerBuilder

func NewHandlerBuilder() *handlerBuilder

func OnEnd

func OnEnd(ctx context.Context, output CallbackOutput) context.Context

OnEnd invokes the OnEnd logic for the particular context, allowing for proper cleanup and finalization when a process ends. Handlers are executed in the order in which they were added.

func OnEndWithStreamOutput

func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic for the particular context. Each handler must properly close the stream it receives. Handlers are executed in the order in which they were added.

func OnError

func OnError(ctx context.Context, err error) context.Context

OnError invokes the OnError logic for the particular context. Note that errors occurring inside a stream are not reported here. Handlers are executed in the order in which they were added.

func OnStart

func OnStart(ctx context.Context, input CallbackInput) context.Context

OnStart lets component developers quickly inject the callback input and output aspects. For example:

func (t *testchatmodel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
	// Report a failure through the error callback.
	defer func() {
		if err != nil {
			callbacks.OnError(ctx, err)
		}
	}()

	ctx = callbacks.OnStart(ctx, &model.CallbackInput{
		Messages: input,
		Tools:    nil,
		Extra:    nil,
	})

	// do something

	ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
		Message: resp,
		Extra:   nil,
	})

	return resp, nil
}

OnStart invokes the OnStart logic for the particular context when a process begins. Registered handlers are executed in reverse of the order in which they were added.
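The two execution orders described in this section (reverse of add order for start callbacks, add order for end callbacks) can be illustrated with a small sketch. `namedHandler`, `runStart`, and `runEnd` are hypothetical names used only for illustration; this is not how Eino stores or invokes handlers internally.

```go
package main

import (
	"fmt"
	"strings"
)

// namedHandler is a hypothetical stand-in for a callback handler;
// it only carries a name so the visiting order can be observed.
type namedHandler struct{ name string }

// runStart visits handlers in reverse of the order they were added.
func runStart(handlers []namedHandler) string {
	var visited []string
	for i := len(handlers) - 1; i >= 0; i-- {
		visited = append(visited, handlers[i].name)
	}
	return strings.Join(visited, ",")
}

// runEnd visits handlers in the order they were added.
func runEnd(handlers []namedHandler) string {
	var visited []string
	for _, h := range handlers {
		visited = append(visited, h.name)
	}
	return strings.Join(visited, ",")
}

func main() {
	handlers := []namedHandler{{"logging"}, {"metrics"}, {"tracing"}}
	fmt.Println("start order:", runStart(handlers))
	fmt.Println("end order:  ", runEnd(handlers))
}
```

With this ordering, the handler added last sees the start event first and the end event last, so handlers nest around the component like deferred calls.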

func OnStartWithStreamInput

func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnStartWithStreamInput invokes the OnStartWithStreamInput logic for the particular context. Each handler must properly close the input stream it receives. Handlers are executed in reverse of the order in which they were added.

func SwitchRunInfo

func SwitchRunInfo(ctx context.Context, info *RunInfo) context.Context

SwitchRunInfo updates the RunInfo in the context if a previous RunInfo already exists for that context.
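The "update only if one already exists" behavior can be sketched with a plain context value. This is an illustration under assumptions, not Eino's implementation: `runInfo`, `runInfoKey`, `withRunInfo`, `switchRunInfo`, and `nameAfterSwitch` are hypothetical, and the real package keeps its state in its own internal structures.

```go
package main

import (
	"context"
	"fmt"
)

// runInfo is a local stand-in for callbacks.RunInfo.
type runInfo struct {
	Name string
	Type string
}

// runInfoKey is a hypothetical unexported context key.
type runInfoKey struct{}

func withRunInfo(ctx context.Context, info *runInfo) context.Context {
	return context.WithValue(ctx, runInfoKey{}, info)
}

func runInfoFrom(ctx context.Context) (*runInfo, bool) {
	info, ok := ctx.Value(runInfoKey{}).(*runInfo)
	return info, ok
}

// switchRunInfo replaces the stored info only when ctx already carries one;
// otherwise it returns ctx unchanged.
func switchRunInfo(ctx context.Context, info *runInfo) context.Context {
	if _, ok := runInfoFrom(ctx); !ok {
		return ctx
	}
	return withRunInfo(ctx, info)
}

// nameAfterSwitch reports which name is visible after a switch attempt.
func nameAfterSwitch(hasPrevious bool) string {
	ctx := context.Background()
	if hasPrevious {
		ctx = withRunInfo(ctx, &runInfo{Name: "loader", Type: "Loader"})
	}
	ctx = switchRunInfo(ctx, &runInfo{Name: "retriever", Type: "Retriever"})
	if info, ok := runInfoFrom(ctx); ok {
		return info.Name
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", nameAfterSwitch(true))
	fmt.Printf("%q\n", nameAfterSwitch(false))
}
```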

Types

type CallbackInput

type CallbackInput any

CallbackInput is the input of a callback. Its concrete type is defined by the component. Use a type assertion or a conversion function to convert the input to the type you need. For example, CallbackInput in components/model/interface.go is:

	type CallbackInput struct {
		Messages []*schema.Message
		Config   *Config
		Extra    map[string]any
	}

The model package provides model.ConvCallbackInput() to convert a CallbackInput to *model.CallbackInput. In a callback handler, you can get the typed input as follows:

	modelCallbackInput := model.ConvCallbackInput(in)
	if modelCallbackInput == nil {
		// not a model callback input; ignore it
		return ctx
	}
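A conversion helper of this kind can be sketched with a checked type assertion. The generic `convTo` helper, `chatInput`, and `messageCount` below are hypothetical and only illustrate the pattern; model.ConvCallbackInput itself returns nil rather than a boolean when the input does not match.

```go
package main

import "fmt"

// chatInput is a hypothetical stand-in for a component's concrete
// callback input type.
type chatInput struct{ Messages []string }

// convTo attempts to convert an untyped callback input to T and
// reports whether the conversion succeeded.
func convTo[T any](in any) (T, bool) {
	typed, ok := in.(T)
	return typed, ok
}

// messageCount returns the number of messages, or -1 when the input
// does not belong to this component type.
func messageCount(in any) int {
	typed, ok := convTo[*chatInput](in)
	if !ok {
		return -1
	}
	return len(typed.Messages)
}

func main() {
	fmt.Println(messageCount(&chatInput{Messages: []string{"hello", "world"}}))
	fmt.Println(messageCount("not a chat input"))
}
```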

type CallbackOutput

type CallbackOutput any

type CallbackTiming

type CallbackTiming uint8

CallbackTiming enumerates all the timings of callback aspects.

const (
	TimingOnStart CallbackTiming = iota
	TimingOnEnd
	TimingOnError
	TimingOnStartWithStreamInput
	TimingOnEndWithStreamOutput
)

type Handler

type Handler interface {
	OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
	OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context

	OnError(ctx context.Context, info *RunInfo, err error) context.Context

	OnStartWithStreamInput(ctx context.Context, info *RunInfo,
		input *schema.StreamReader[CallbackInput]) context.Context
	OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
		output *schema.StreamReader[CallbackOutput]) context.Context
}

func GetGlobalHandlers

func GetGlobalHandlers() []Handler

type HandlerBuilder deprecated

type HandlerBuilder struct {
	OnStartFn                func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
	OnEndFn                  func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
	OnErrorFn                func(ctx context.Context, info *RunInfo, err error) context.Context
	OnStartWithStreamInputFn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
	OnEndWithStreamOutputFn  func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}

HandlerBuilder can be used to build a Handler with callback functions. For example:

handler := &HandlerBuilder{
	// Self-defined start callback function; implement only the functions you want to override.
	OnStartFn: func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		return ctx
	},
}

graph := compose.NewGraph[inputType, outputType]()
runnable, err := graph.Compile()
if err != nil {...}
runnable.Invoke(ctx, params, compose.WithCallbacks(handler))

Deprecated: In most situations, it is preferred to use template.NewHandlerHelper. Otherwise, use NewHandlerBuilder().OnStartFn()...Build().

func (*HandlerBuilder) OnEnd

func (h *HandlerBuilder) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context

func (*HandlerBuilder) OnEndWithStreamOutput

func (h *HandlerBuilder) OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context

func (*HandlerBuilder) OnError

func (h *HandlerBuilder) OnError(ctx context.Context, info *RunInfo, err error) context.Context

func (*HandlerBuilder) OnStart

func (h *HandlerBuilder) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context

func (*HandlerBuilder) OnStartWithStreamInput

func (h *HandlerBuilder) OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is the callback manager of one running node.

Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.

func ManagerFromCtx deprecated

func ManagerFromCtx(ctx context.Context) (*Manager, bool)

Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.

func NewManager

func NewManager(runInfo *RunInfo, handlers ...Handler) (*Manager, bool)

NewManager creates a callback manager. It returns a nil manager if no callback handler is provided, so check the return value before using it.

Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.

func (Manager) Handlers

func (m Manager) Handlers() []Handler

func (Manager) OnEnd

func (m Manager) OnEnd(ctx context.Context, output CallbackOutput) context.Context

func (Manager) OnEndWithStreamOutput

func (m Manager) OnEndWithStreamOutput(
	ctx context.Context, output *schema.StreamReader[CallbackOutput]) context.Context

func (Manager) OnError

func (m Manager) OnError(ctx context.Context, err error) context.Context

func (Manager) OnStart

func (m Manager) OnStart(ctx context.Context, input CallbackInput) context.Context

func (Manager) OnStartWithStreamInput

func (m Manager) OnStartWithStreamInput(
	ctx context.Context, input *schema.StreamReader[CallbackInput]) context.Context

func (*Manager) WithRunInfo deprecated

func (mm *Manager) WithRunInfo(runInfo *RunInfo) *Manager

Deprecated: Manager will become an internal concept; use the functions in aspect_inject.go instead.

type RunInfo

type RunInfo struct {
	Name      string
	Type      string
	Component components.Component
}

RunInfo holds information about the running node.

type TimingChecker

type TimingChecker interface {
	Needed(ctx context.Context, info *RunInfo, timing CallbackTiming) bool
}

TimingChecker checks whether a handler is needed for the given callback aspect timing. Implementing this interface is recommended for callback handlers, but not mandatory. Handlers created with template.HandlerHelper or handlerBuilder implement it automatically. Eino's callback mechanism tries to use this interface to determine whether any handlers are needed for a given timing, and skips handlers that are not needed for that timing.
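A timing check of this kind can be sketched as follows. The sketch is simplified and rests on assumptions: `timing`, `timingChecker`, `startOnlyHandler`, and `anyNeeded` are hypothetical local names, the `Needed` method here omits the context and RunInfo parameters of the real interface, and treating handlers without a checker as always needed is an assumption drawn from the checker being optional.

```go
package main

import "fmt"

// timing is a local stand-in for callbacks.CallbackTiming.
type timing uint8

const (
	timingOnStart timing = iota
	timingOnEnd
	timingOnError
)

// timingChecker is a simplified version of the TimingChecker idea.
type timingChecker interface {
	Needed(t timing) bool
}

// startOnlyHandler is a hypothetical handler that only cares about
// start events.
type startOnlyHandler struct{}

func (startOnlyHandler) Needed(t timing) bool { return t == timingOnStart }

// anyNeeded reports whether at least one handler wants to run at timing t.
// Handlers that do not implement timingChecker are assumed to be always needed.
func anyNeeded(handlers []any, t timing) bool {
	for _, h := range handlers {
		checker, ok := h.(timingChecker)
		if !ok || checker.Needed(t) {
			return true
		}
	}
	return false
}

func main() {
	handlers := []any{startOnlyHandler{}}
	fmt.Println(anyNeeded(handlers, timingOnStart))
	fmt.Println(anyNeeded(handlers, timingOnEnd))
}
```

A check like this lets the caller skip building callback inputs or copying streams when no handler would consume them.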
