callbacks

package
v0.9.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: Apache-2.0 Imports: 4 Imported by: 186

Documentation

Overview

Package callbacks provides observability hooks for component execution in Eino.

Callbacks fire at five lifecycle timings around every component invocation:

Attaching Handlers

Global handlers (observe every node in every graph):

callbacks.AppendGlobalHandlers(myHandler) // call once, at startup — NOT thread-safe

Per-invocation handlers (observe one graph run):

runnable.Invoke(ctx, input, compose.WithCallbacks(myHandler))

Target a specific node:

compose.WithCallbacks(myHandler).DesignateNode("nodeName")

Handler inheritance: if the context passed to a graph run already carries handlers (e.g. from a parent graph), those handlers are inherited by the entire child run automatically.

Building Handlers

Option 1 — NewHandlerBuilder: register raw functions for the timings you need. Input/output are untyped; use the component package's ConvCallbackInput helper to cast to a concrete type:

handler := callbacks.NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		// Handle component start
		return ctx
	}).
	OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
		// Handle component end
		return ctx
	}).
	OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context {
		// Handle component error
		return ctx
	}).
	OnStartWithStreamInputFn(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
		defer input.Close() // MUST close — failure causes pipeline goroutine leak
		return ctx
	}).
	OnEndWithStreamOutputFn(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
		defer output.Close() // MUST close
		return ctx
	}).
	Build()

Option 2 — utils/callbacks.NewHandlerHelper: dispatches by component type, so each handler function receives the concrete typed input/output directly:

handler := callbacks.NewHandlerHelper().
	ChatModel(&model.CallbackHandler{
		OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
			log.Printf("Model started: %s, messages: %d", info.Name, len(input.Messages))
			return ctx
		},
	}).
	Prompt(&prompt.CallbackHandler{
		OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
			log.Printf("Prompt completed")
			return ctx
		},
	}).
	Handler()

Passing State Within a Handler

The ctx returned by one timing is passed to the next timing of the SAME handler, enabling OnStart→OnEnd state transfer via context.WithValue:

NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *RunInfo, _ CallbackInput) context.Context {
		return context.WithValue(ctx, startTimeKey{}, time.Now())
	}).
	OnEndFn(func(ctx context.Context, info *RunInfo, _ CallbackOutput) context.Context {
		start := ctx.Value(startTimeKey{}).(time.Time)
		log.Printf("duration: %v", time.Since(start))
		return ctx
	}).Build()

Between DIFFERENT handlers there is no guaranteed execution order and no context chain. To share state between handlers, store it in a concurrency-safe variable in the outermost context instead.

Common Pitfalls

  • Stream copies must be closed: when N handlers register for a streaming timing, the stream is copied N+1 times (one per handler + one for downstream). If any handler's copy is not closed, the original stream cannot be freed and the entire pipeline leaks.

  • Do NOT mutate Input/Output: all downstream nodes and handlers share the same pointer. Mutations cause data races in concurrent graph execution.

  • AppendGlobalHandlers is NOT thread-safe: call only during initialization, never concurrently with graph execution.

  • Stream errors are invisible to OnError: errors that occur while a consumer reads from a StreamReader are not routed through OnError.

  • RunInfo may be nil: always nil-check before dereferencing in handlers, especially when a component is used standalone outside a graph without InitCallbacks being called.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppendGlobalHandlers added in v0.3.16

func AppendGlobalHandlers(handlers ...Handler)

AppendGlobalHandlers appends handlers to the process-wide list of callback handlers. Global handlers run before per-invocation handlers provided via compose.WithCallbacks, giving them higher priority for instrumentation that must observe every component invocation (e.g. distributed tracing, metrics).

This function is NOT thread-safe. Call it once during program initialization (e.g. in main or TestMain), before any graph executions begin. Calling it concurrently with ongoing graph executions leads to data races.

func EnsureRunInfo added in v0.3.27

func EnsureRunInfo(ctx context.Context, typ string, comp components.Component) context.Context

EnsureRunInfo ensures the context carries a RunInfo for the given type and component kind. If the context already has a matching RunInfo, it is returned unchanged. Otherwise, a new callback manager is created that inherits the global handlers plus any handlers already in ctx.

Component implementations that set IsCallbacksEnabled() = true should call this at the start of every public method (Generate, Stream, etc.) before calling OnStart, so that the RunInfo is never missing from callbacks.

func InitCallbackHandlers

func InitCallbackHandlers(handlers []Handler)

InitCallbackHandlers sets the global callback handlers. It should be called BEFORE any callback handler by user. It's useful when you want to inject some basic callbacks to all nodes. Deprecated: Use AppendGlobalHandlers instead.

func InitCallbacks

func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context

InitCallbacks creates a new context with the given RunInfo and handlers, completely replacing any RunInfo and handlers already in ctx.

Use this when running a component standalone outside a Graph — the Graph normally manages RunInfo injection automatically, but standalone callers must set it up themselves:

ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{
    Type:      myModel.GetType(),
    Component: components.ComponentOfChatModel,
    Name:      "my-model",
}, myHandler)

func OnEnd

func OnEnd[T any](ctx context.Context, output T) context.Context

OnEnd invokes the OnEnd timing for all registered handlers. Call this after the component produces a successful result. Handlers run in registration order (first registered = first called).

Do not call both OnEnd and OnError for the same invocation — OnEnd signals success; OnError signals failure.

func OnEndWithStreamOutput

func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnEndWithStreamOutput invokes the OnEndWithStreamOutput timing. Use this when the component produces a streaming output (Stream / Transform paradigms). Like OnStartWithStreamInput, stream copies are made per handler; each handler must close its copy.

Returns the updated context and the StreamReader the component should return to its caller.

func OnError

func OnError(ctx context.Context, err error) context.Context

OnError invokes the OnError timing for all registered handlers. Call this when the component returns an error. Errors that occur mid-stream (after the StreamReader has been returned) are NOT routed through OnError; they surface as errors inside Recv.

Handlers run in registration order (same as OnEnd).

func OnStart

func OnStart[T any](ctx context.Context, input T) context.Context

OnStart invokes the OnStart timing for all registered handlers in the context. This is called by component implementations that manage their own callbacks (i.e. implement components.Checker and return true from IsCallbacksEnabled). The returned context must be propagated to subsequent OnEnd/OnError calls so handlers can correlate start and end events.

Handlers are invoked in reverse registration order (last registered = first called) to match the middleware wrapping convention.

Example — typical usage inside a component's Generate method:

func (m *myChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.Message, error) {
    ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
    resp, err := m.doGenerate(ctx, input, opts...)
    if err != nil {
        callbacks.OnError(ctx, err)
        return nil, err
    }
    callbacks.OnEnd(ctx, &model.CallbackOutput{Message: resp})
    return resp, nil
}

func OnStartWithStreamInput

func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnStartWithStreamInput invokes the OnStartWithStreamInput timing. Use this when the component's input is itself a stream (Collect / Transform paradigms). The framework automatically copies the stream so each handler receives an independent reader; handlers MUST close their copy or the underlying goroutine will leak.

Returns the updated context and a new StreamReader that the component should use going forward (the original is consumed by the framework).

func ReuseHandlers added in v0.3.4

func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context

ReuseHandlers creates a new context that inherits all handlers already present in ctx and sets a new RunInfo. Global handlers are added if ctx carries none yet.

Use this when a component calls another component internally and wants the inner component's callbacks to share the same set of handlers as the outer component, but with the inner component's own identity in RunInfo:

innerCtx := callbacks.ReuseHandlers(ctx, &callbacks.RunInfo{
    Type:      "InnerChatModel",
    Component: components.ComponentOfChatModel,
    Name:      "inner-cm",
})

Types

type CallbackInput

type CallbackInput = callbacks.CallbackInput

CallbackInput is the value passed to OnStart and OnStartWithStreamInput handlers. The concrete type is defined by the component — for example, ChatModel callbacks carry *model.CallbackInput. Use the component package's ConvCallbackInput helper (e.g. model.ConvCallbackInput) to cast safely; it returns nil if the type does not match, so you can ignore irrelevant component types:

modelInput := model.ConvCallbackInput(in)
if modelInput == nil {
    return ctx // not a model invocation, skip
}
log.Printf("prompt: %v", modelInput.Messages)

type CallbackOutput

type CallbackOutput = callbacks.CallbackOutput

CallbackOutput is the value passed to OnEnd and OnEndWithStreamOutput handlers. Like CallbackInput, the concrete type is component-defined. Use the component package's ConvCallbackOutput helper to cast safely.

type CallbackTiming

type CallbackTiming = callbacks.CallbackTiming

CallbackTiming enumerates the lifecycle moments at which a callback handler is invoked. Implement TimingChecker on your handler and return false for timings you do not handle, so the framework skips the overhead of stream copying and goroutine spawning for those timings.

const (
	// TimingOnStart fires just before the component begins processing.
	// Receives a fully-formed input value (non-streaming).
	TimingOnStart CallbackTiming = iota
	// TimingOnEnd fires after the component returns a result successfully.
	// Receives the output value. Only fires on success — not on error.
	TimingOnEnd
	// TimingOnError fires when the component returns a non-nil error.
	// Stream errors (mid-stream panics) are NOT reported here; they surface
	// as errors inside the stream reader.
	TimingOnError
	// TimingOnStartWithStreamInput fires when the component receives a
	// streaming input (Collect / Transform paradigms). The handler receives a
	// copy of the input stream and must close it after reading.
	TimingOnStartWithStreamInput
	// TimingOnEndWithStreamOutput fires after the component returns a
	// streaming output (Stream / Transform paradigms). The handler receives a
	// copy of the output stream and must close it after reading. This is
	// typically where you implement streaming metrics or logging.
	TimingOnEndWithStreamOutput
)

Callback timing constants.

type Handler

type Handler = callbacks.Handler

Handler is the unified callback handler interface. Implement all five methods (OnStart, OnEnd, OnError, OnStartWithStreamInput, OnEndWithStreamOutput) or use NewHandlerBuilder to set only the timings you care about.

Each method receives the context returned by the previous timing of the SAME handler, which lets a single handler pass state between its OnStart and OnEnd calls via context.WithValue. There is NO guaranteed execution order between DIFFERENT handlers, and the context chain does not flow from one handler to the next — do not rely on handler ordering.

Implement TimingChecker (the Needed method) on your handler so the framework can skip timings you have not registered; this avoids unnecessary stream copies and goroutine allocations on every component invocation.

Stream handlers (OnStartWithStreamInput, OnEndWithStreamOutput) receive a *schema.StreamReader that has already been copied; they MUST close their copy after reading. If any handler's copy is not closed, the original stream cannot be freed, causing a goroutine/memory leak for the entire pipeline.

Important: do NOT mutate the Input or Output values. All downstream nodes and handlers share the same pointer (direct assignment, not a deep copy). Mutations cause data races in concurrent graph execution.

type HandlerBuilder

type HandlerBuilder struct {
	// contains filtered or unexported fields
}

HandlerBuilder constructs a Handler by registering callback functions for individual timings. Only set the timings you care about; the built handler implements TimingChecker and returns false for unregistered timings, so the framework skips those timings with no overhead.

The input/output values are untyped (CallbackInput / CallbackOutput). To work with a specific component's payload, use the component package's ConvCallbackInput / ConvCallbackOutput helpers inside your function. For a higher-level API that dispatches by component type automatically, see utils/callbacks.NewHandlerHelper.

Example:

handler := callbacks.NewHandlerBuilder().
    OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
        mi := model.ConvCallbackInput(input)
        if mi != nil {
            log.Printf("[%s] model start: %d messages", info.Name, len(mi.Messages))
        }
        return ctx
    }).
    OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
        mo := model.ConvCallbackOutput(output)
        if mo != nil && mo.Message.ResponseMeta != nil {
            log.Printf("[%s] tokens: %d", info.Name, mo.Message.ResponseMeta.Usage.TotalTokens)
        }
        return ctx
    }).
    Build()

func NewHandlerBuilder

func NewHandlerBuilder() *HandlerBuilder

NewHandlerBuilder creates and returns a new HandlerBuilder instance. HandlerBuilder is used to construct a Handler with custom callback functions

func (*HandlerBuilder) Build added in v0.3.3

func (hb *HandlerBuilder) Build() Handler

Build returns a Handler with the functions set in the builder.

func (*HandlerBuilder) OnEndFn

func (hb *HandlerBuilder) OnEndFn(
	fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder

OnEndFn sets the handler for the end timing.

func (*HandlerBuilder) OnEndWithStreamOutputFn

func (hb *HandlerBuilder) OnEndWithStreamOutputFn(
	fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder

OnEndWithStreamOutputFn sets the callback invoked when a component produces streaming output. Like OnStartWithStreamInputFn, the handler receives a private copy of the stream and MUST close it after reading to prevent goroutine and memory leaks. This is the right place to implement streaming token-usage accounting or streaming log capture.

func (*HandlerBuilder) OnErrorFn

func (hb *HandlerBuilder) OnErrorFn(
	fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder

OnErrorFn sets the handler for the error timing.

func (*HandlerBuilder) OnStartFn

func (hb *HandlerBuilder) OnStartFn(
	fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder

OnStartFn sets the handler for the start timing.

func (*HandlerBuilder) OnStartWithStreamInputFn

func (hb *HandlerBuilder) OnStartWithStreamInputFn(
	fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder

OnStartWithStreamInputFn sets the callback invoked when a component receives streaming input. The handler receives a *schema.StreamReader that is a private copy; it MUST close the reader after consuming it to avoid goroutine and memory leaks.

type RunInfo

type RunInfo = callbacks.RunInfo

RunInfo describes the entity that triggered a callback. Always nil-check before dereferencing — a component that calls OnStart without first calling EnsureRunInfo or InitCallbacks will leave RunInfo absent in the context.

Fields:

  • Name: business-meaningful name specified by the user. For nodes in a graph this is the node name (compose.WithNodeName). For standalone components it must be set explicitly via InitCallbacks or ReuseHandlers; it is empty string if not set.
  • Type: implementation identity, e.g. "OpenAI". Set by the component via components.Typer; falls back to reflection (struct/func name) if the interface is not implemented. Empty for Graph itself.
  • Component: category constant, e.g. components.ComponentOfChatModel. Fixed value "Lambda" for lambdas, "Graph"/"Chain"/"Workflow" for graphs. Use this to branch on component kind without caring about implementation.

Handlers should filter using RunInfo rather than assuming a fixed execution order — there is no guaranteed ordering between different Handlers.

type TimingChecker

type TimingChecker = callbacks.TimingChecker

TimingChecker is an optional interface for Handler implementations. When a handler implements Needed, the framework calls it before each component invocation to decide whether to set up callback infrastructure (stream copying, goroutine allocation) for that timing. Returning false avoids unnecessary overhead.

Handlers built with NewHandlerBuilder or utils/callbacks.NewHandlerHelper automatically implement TimingChecker based on which callback functions were set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL