Documentation ¶
Overview ¶
Package callbacks provides observability hooks for component execution in Eino.
Callbacks fire at five lifecycle timings around every component invocation:
- TimingOnStart / TimingOnEnd: non-streaming input and output.
- TimingOnStartWithStreamInput / TimingOnEndWithStreamOutput: streaming variants — handlers receive a copy of the stream and MUST close it.
- TimingOnError: component returned a non-nil error (stream-internal errors are NOT reported here).
Attaching Handlers ¶
Global handlers (observe every node in every graph):
callbacks.AppendGlobalHandlers(myHandler) // call once, at startup — NOT thread-safe
Per-invocation handlers (observe one graph run):
runnable.Invoke(ctx, input, compose.WithCallbacks(myHandler))
Target a specific node:
compose.WithCallbacks(myHandler).DesignateNode("nodeName")
Handler inheritance: if the context passed to a graph run already carries handlers (e.g. from a parent graph), those handlers are inherited by the entire child run automatically.
Building Handlers ¶
Option 1 — NewHandlerBuilder: register raw functions for the timings you need. Input/output are untyped; use the component package's ConvCallbackInput helper to cast to a concrete type:
handler := callbacks.NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		// Handle component start
		return ctx
	}).
	OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
		// Handle component end
		return ctx
	}).
	OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context {
		// Handle component error
		return ctx
	}).
	OnStartWithStreamInputFn(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
		defer input.Close() // MUST close; otherwise the pipeline leaks goroutines
		return ctx
	}).
	OnEndWithStreamOutputFn(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
		defer output.Close() // MUST close
		return ctx
	}).
	Build()
Option 2 — utils/callbacks.NewHandlerHelper: dispatches by component type, so each handler function receives the concrete typed input/output directly:
handler := callbacks.NewHandlerHelper().
	ChatModel(&model.CallbackHandler{
		OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
			log.Printf("Model started: %s, messages: %d", info.Name, len(input.Messages))
			return ctx
		},
	}).
	Prompt(&prompt.CallbackHandler{
		OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
			log.Printf("Prompt completed")
			return ctx
		},
	}).
	Handler()
Passing State Within a Handler ¶
The ctx returned by one timing is passed to the next timing of the SAME handler, enabling OnStart→OnEnd state transfer via context.WithValue:
NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *RunInfo, _ CallbackInput) context.Context {
		return context.WithValue(ctx, startTimeKey{}, time.Now())
	}).
	OnEndFn(func(ctx context.Context, info *RunInfo, _ CallbackOutput) context.Context {
		// Use the comma-ok form so a missing start time does not panic.
		if start, ok := ctx.Value(startTimeKey{}).(time.Time); ok {
			log.Printf("duration: %v", time.Since(start))
		}
		return ctx
	}).Build()
Between DIFFERENT handlers there is no guaranteed execution order and no context chain. To share state between handlers, store it in a concurrency-safe variable in the outermost context instead.
Common Pitfalls ¶
Stream copies must be closed: when N handlers register for a streaming timing, the stream is split into N+1 copies (one per handler plus one for downstream). If any handler's copy is not closed, the original stream cannot be freed and the entire pipeline leaks.
Do NOT mutate Input/Output: all downstream nodes and handlers share the same pointer. Mutations cause data races in concurrent graph execution.
AppendGlobalHandlers is NOT thread-safe: call only during initialization, never concurrently with graph execution.
Stream errors are invisible to OnError: errors that occur while a consumer reads from a StreamReader are not routed through OnError.
RunInfo may be nil: always nil-check before dereferencing in handlers, especially when a component is used standalone outside a graph without InitCallbacks being called.
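To make the first pitfall concrete, here is a toy model of the N+1 copy bookkeeping. It is only a sketch under assumptions: source, streamCopy and split are hypothetical types written for this illustration and do not reflect how the framework implements stream copying internally.

```go
package main

import (
	"fmt"
	"sync"
)

// source is a toy stand-in for an upstream stream that holds a resource
// until every copy made from it has been closed.
type source struct {
	mu       sync.Mutex
	open     int  // copies that have not been closed yet
	released bool // true once the last copy is closed
}

// streamCopy is one reader handed to a handler or to the downstream node.
type streamCopy struct {
	src  *source
	once sync.Once
}

// split hands out n copies and remembers how many are outstanding.
func split(src *source, n int) []*streamCopy {
	src.mu.Lock()
	defer src.mu.Unlock()
	src.open = n
	copies := make([]*streamCopy, n)
	for i := range copies {
		copies[i] = &streamCopy{src: src}
	}
	return copies
}

// Close marks this copy as done; the source is released when the count reaches zero.
// Closing the same copy twice is a no-op.
func (c *streamCopy) Close() {
	c.once.Do(func() {
		c.src.mu.Lock()
		defer c.src.mu.Unlock()
		c.src.open--
		if c.src.open == 0 {
			c.src.released = true
		}
	})
}

// isReleased reports whether the source has been freed.
func (s *source) isReleased() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.released
}

func main() {
	handlers := 2
	src := &source{}
	copies := split(src, handlers+1) // N handler copies plus one downstream copy

	copies[0].Close() // handler 1 closes
	copies[2].Close() // downstream closes
	fmt.Println("released with one handler copy still open:", src.isReleased())

	copies[1].Close() // handler 2 finally closes
	fmt.Println("released after all copies closed:", src.isReleased())
}
```

In this model a single forgotten Close keeps the source alive indefinitely, which is the leak the pitfall describes.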
Index ¶
- func AppendGlobalHandlers(handlers ...Handler)
- func EnsureRunInfo(ctx context.Context, typ string, comp components.Component) context.Context
- func InitCallbackHandlers(handlers []Handler)
- func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
- func OnEnd[T any](ctx context.Context, output T) context.Context
- func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func OnError(ctx context.Context, err error) context.Context
- func OnStart[T any](ctx context.Context, input T) context.Context
- func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context
- type CallbackInput
- type CallbackOutput
- type CallbackTiming
- type Handler
- type HandlerBuilder
- func (hb *HandlerBuilder) Build() Handler
- func (hb *HandlerBuilder) OnEndFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnEndWithStreamOutputFn(fn func(ctx context.Context, info *RunInfo, ...) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnErrorFn(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartWithStreamInputFn(...) *HandlerBuilder
- type RunInfo
- type TimingChecker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppendGlobalHandlers ¶ added in v0.3.16
func AppendGlobalHandlers(handlers ...Handler)
AppendGlobalHandlers appends handlers to the process-wide list of callback handlers. Global handlers run before per-invocation handlers provided via compose.WithCallbacks, giving them higher priority for instrumentation that must observe every component invocation (e.g. distributed tracing, metrics).
This function is NOT thread-safe. Call it once during program initialization (e.g. in main or TestMain), before any graph executions begin. Calling it concurrently with ongoing graph executions leads to data races.
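One way to guarantee the "call once during initialization" rule when several setup paths might try to register handlers is to guard the registration with sync.Once. The sketch below is stdlib-only: appendGlobal and globalNames are hypothetical stand-ins for AppendGlobalHandlers and the process-wide handler list, and setupCallbacks is an assumed helper name.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	registerOnce sync.Once
	// globalNames stands in for the process-wide handler list.
	globalNames []string
)

// appendGlobal is a hypothetical stand-in for AppendGlobalHandlers.
// Like the real function, it is not safe for concurrent use on its own.
func appendGlobal(names ...string) {
	globalNames = append(globalNames, names...)
}

// setupCallbacks may be reached from several initialization paths;
// sync.Once ensures the registration body runs exactly once.
func setupCallbacks() {
	registerOnce.Do(func() {
		appendGlobal("tracing", "metrics")
	})
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			setupCallbacks()
		}()
	}
	wg.Wait()

	fmt.Println(len(globalNames))
}
```

This only protects the registration step itself. It is still the caller's job to finish setup before any graph execution starts.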
func EnsureRunInfo ¶ added in v0.3.27
func EnsureRunInfo(ctx context.Context, typ string, comp components.Component) context.Context
EnsureRunInfo ensures the context carries a RunInfo for the given type and component kind. If the context already has a matching RunInfo, it is returned unchanged. Otherwise, a new callback manager is created that inherits the global handlers plus any handlers already in ctx.
Component implementations whose IsCallbacksEnabled() returns true should call this at the start of every public method (Generate, Stream, etc.) before calling OnStart, so that the RunInfo is never missing from callbacks.
func InitCallbackHandlers ¶
func InitCallbackHandlers(handlers []Handler)
InitCallbackHandlers sets the global callback handlers. It should be called before any user-provided callback handler is used. It is useful for injecting basic callbacks into all nodes.
Deprecated: Use AppendGlobalHandlers instead.
func InitCallbacks ¶
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
InitCallbacks creates a new context with the given RunInfo and handlers, completely replacing any RunInfo and handlers already in ctx.
Use this when running a component standalone outside a Graph — the Graph normally manages RunInfo injection automatically, but standalone callers must set it up themselves:
ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{
	Type:      myModel.GetType(),
	Component: components.ComponentOfChatModel,
	Name:      "my-model",
}, myHandler)
func OnEnd ¶
func OnEnd[T any](ctx context.Context, output T) context.Context
OnEnd invokes the OnEnd timing for all registered handlers. Call this after the component produces a successful result. Handlers run in registration order (first registered = first called).
Do not call both OnEnd and OnError for the same invocation — OnEnd signals success; OnError signals failure.
func OnEndWithStreamOutput ¶
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnEndWithStreamOutput invokes the OnEndWithStreamOutput timing. Use this when the component produces a streaming output (Stream / Transform paradigms). Like OnStartWithStreamInput, stream copies are made per handler; each handler must close its copy.
Returns the updated context and the StreamReader the component should return to its caller.
func OnError ¶
func OnError(ctx context.Context, err error) context.Context
OnError invokes the OnError timing for all registered handlers. Call this when the component returns an error. Errors that occur mid-stream (after the StreamReader has been returned) are NOT routed through OnError; they surface as errors inside Recv.
Handlers run in registration order (same as OnEnd).
func OnStart ¶
func OnStart[T any](ctx context.Context, input T) context.Context
OnStart invokes the OnStart timing for all registered handlers in the context. This is called by component implementations that manage their own callbacks (i.e. implement components.Checker and return true from IsCallbacksEnabled). The returned context must be propagated to subsequent OnEnd/OnError calls so handlers can correlate start and end events.
Handlers are invoked in reverse registration order (last registered = first called) to match the middleware wrapping convention.
Example — typical usage inside a component's Generate method:
func (m *myChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.Message, error) {
	ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
	resp, err := m.doGenerate(ctx, input, opts...)
	if err != nil {
		callbacks.OnError(ctx, err)
		return nil, err
	}
	callbacks.OnEnd(ctx, &model.CallbackOutput{Message: resp})
	return resp, nil
}
func OnStartWithStreamInput ¶
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) ( nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnStartWithStreamInput invokes the OnStartWithStreamInput timing. Use this when the component's input is itself a stream (Collect / Transform paradigms). The framework automatically copies the stream so each handler receives an independent reader; handlers MUST close their copy or the underlying goroutine will leak.
Returns the updated context and a new StreamReader that the component should use going forward (the original is consumed by the framework).
func ReuseHandlers ¶ added in v0.3.4
func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context
ReuseHandlers creates a new context that inherits all handlers already present in ctx and sets a new RunInfo. Global handlers are added if ctx carries none yet.
Use this when a component calls another component internally and wants the inner component's callbacks to share the same set of handlers as the outer component, but with the inner component's own identity in RunInfo:
innerCtx := callbacks.ReuseHandlers(ctx, &callbacks.RunInfo{
	Type:      "InnerChatModel",
	Component: components.ComponentOfChatModel,
	Name:      "inner-cm",
})
Types ¶
type CallbackInput ¶
type CallbackInput = callbacks.CallbackInput
CallbackInput is the value passed to OnStart and OnStartWithStreamInput handlers. The concrete type is defined by the component — for example, ChatModel callbacks carry *model.CallbackInput. Use the component package's ConvCallbackInput helper (e.g. model.ConvCallbackInput) to cast safely; it returns nil if the type does not match, so you can ignore irrelevant component types:
modelInput := model.ConvCallbackInput(in)
if modelInput == nil {
	return ctx // not a model invocation, skip
}
log.Printf("prompt: %v", modelInput.Messages)
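The "returns nil if the type does not match" behaviour can be pictured as a type switch over the untyped payload. The following is a standalone sketch, not the package's implementation: callbackInput, modelInput, retrieverInput and convModelInput are hypothetical local names, and treating the payload as any is an assumption made for the illustration.

```go
package main

import "fmt"

// callbackInput mirrors the untyped payload described above (assumed here to be any value).
type callbackInput = any

// modelInput and retrieverInput are hypothetical component payloads.
type modelInput struct{ Messages []string }
type retrieverInput struct{ Query string }

// convModelInput returns the payload as *modelInput,
// or nil when it belongs to another component type.
func convModelInput(in callbackInput) *modelInput {
	switch t := in.(type) {
	case *modelInput:
		return t
	default:
		return nil
	}
}

// describe shows what a handler body might do with the converted value.
func describe(in callbackInput) string {
	mi := convModelInput(in)
	if mi == nil {
		return "skipped" // not a model invocation
	}
	return fmt.Sprintf("model input with %d messages", len(mi.Messages))
}

func main() {
	fmt.Println(describe(&modelInput{Messages: []string{"hi", "there"}}))
	fmt.Println(describe(&retrieverInput{Query: "q"}))
}
```

Returning nil instead of an error lets a single handler be attached to every node and quietly ignore payloads from components it does not care about.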
type CallbackOutput ¶
type CallbackOutput = callbacks.CallbackOutput
CallbackOutput is the value passed to OnEnd and OnEndWithStreamOutput handlers. Like CallbackInput, the concrete type is component-defined. Use the component package's ConvCallbackOutput helper to cast safely.
type CallbackTiming ¶
type CallbackTiming = callbacks.CallbackTiming
CallbackTiming enumerates the lifecycle moments at which a callback handler is invoked. Implement TimingChecker on your handler and return false for timings you do not handle, so the framework skips the overhead of stream copying and goroutine spawning for those timings.
const (
	// TimingOnStart fires just before the component begins processing.
	// Receives a fully-formed input value (non-streaming).
	TimingOnStart CallbackTiming = iota
	// TimingOnEnd fires after the component returns a result successfully.
	// Receives the output value. Only fires on success, not on error.
	TimingOnEnd
	// TimingOnError fires when the component returns a non-nil error.
	// Stream errors (mid-stream panics) are NOT reported here; they surface
	// as errors inside the stream reader.
	TimingOnError
	// TimingOnStartWithStreamInput fires when the component receives a
	// streaming input (Collect / Transform paradigms). The handler receives a
	// copy of the input stream and must close it after reading.
	TimingOnStartWithStreamInput
	// TimingOnEndWithStreamOutput fires after the component returns a
	// streaming output (Stream / Transform paradigms). The handler receives a
	// copy of the output stream and must close it after reading. This is
	// typically where you implement streaming metrics or logging.
	TimingOnEndWithStreamOutput
)
Callback timing constants.
type Handler ¶
Handler is the unified callback handler interface. Implement all five methods (OnStart, OnEnd, OnError, OnStartWithStreamInput, OnEndWithStreamOutput) or use NewHandlerBuilder to set only the timings you care about.
Each method receives the context returned by the previous timing of the SAME handler, which lets a single handler pass state between its OnStart and OnEnd calls via context.WithValue. There is NO guaranteed execution order between DIFFERENT handlers, and the context chain does not flow from one handler to the next — do not rely on handler ordering.
Implement TimingChecker (the Needed method) on your handler so the framework can skip timings you have not registered; this avoids unnecessary stream copies and goroutine allocations on every component invocation.
Stream handlers (OnStartWithStreamInput, OnEndWithStreamOutput) receive a *schema.StreamReader that has already been copied; they MUST close their copy after reading. If any handler's copy is not closed, the original stream cannot be freed, causing a goroutine/memory leak for the entire pipeline.
Important: do NOT mutate the Input or Output values. All downstream nodes and handlers share the same pointer (direct assignment, not a deep copy). Mutations cause data races in concurrent graph execution.
type HandlerBuilder ¶
type HandlerBuilder struct {
// contains filtered or unexported fields
}
HandlerBuilder constructs a Handler by registering callback functions for individual timings. Only set the timings you care about; the built handler implements TimingChecker and returns false for unregistered timings, so the framework skips those timings with no overhead.
The input/output values are untyped (CallbackInput / CallbackOutput). To work with a specific component's payload, use the component package's ConvCallbackInput / ConvCallbackOutput helpers inside your function. For a higher-level API that dispatches by component type automatically, see utils/callbacks.NewHandlerHelper.
Example:
handler := callbacks.NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
		mi := model.ConvCallbackInput(input)
		if mi != nil {
			log.Printf("[%s] model start: %d messages", info.Name, len(mi.Messages))
		}
		return ctx
	}).
	OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
		mo := model.ConvCallbackOutput(output)
		// Guard every pointer on the path to the usage figures before dereferencing.
		if mo != nil && mo.Message != nil && mo.Message.ResponseMeta != nil && mo.Message.ResponseMeta.Usage != nil {
			log.Printf("[%s] tokens: %d", info.Name, mo.Message.ResponseMeta.Usage.TotalTokens)
		}
		return ctx
	}).
	Build()
func NewHandlerBuilder ¶
func NewHandlerBuilder() *HandlerBuilder
NewHandlerBuilder creates and returns a new HandlerBuilder instance. HandlerBuilder is used to construct a Handler with custom callback functions.
func (*HandlerBuilder) Build ¶ added in v0.3.3
func (hb *HandlerBuilder) Build() Handler
Build returns a Handler with the functions set in the builder.
func (*HandlerBuilder) OnEndFn ¶
func (hb *HandlerBuilder) OnEndFn( fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder
OnEndFn sets the handler for the end timing.
func (*HandlerBuilder) OnEndWithStreamOutputFn ¶
func (hb *HandlerBuilder) OnEndWithStreamOutputFn( fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder
OnEndWithStreamOutputFn sets the callback invoked when a component produces streaming output. Like OnStartWithStreamInputFn, the handler receives a private copy of the stream and MUST close it after reading to prevent goroutine and memory leaks. This is the right place to implement streaming token-usage accounting or streaming log capture.
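Streaming token-usage accounting of the kind mentioned above usually means draining the handler's copy in the background and closing it on every exit path. The sketch below shows that shape with a toy reader. chunk, reader, newReader and countTokens are hypothetical names; the only behaviour assumed from the real stream type is that reading ends with io.EOF and that the copy has a Close method.

```go
package main

import (
	"fmt"
	"io"
)

// chunk is a hypothetical streamed output fragment carrying a token count.
type chunk struct{ Tokens int }

// reader is a toy stand-in for a stream copy: Recv returns io.EOF at the end.
type reader struct {
	ch     chan chunk
	closed bool
}

// newReader builds a finished stream that yields the given chunks.
func newReader(chunks ...chunk) *reader {
	ch := make(chan chunk, len(chunks))
	for _, c := range chunks {
		ch <- c
	}
	close(ch)
	return &reader{ch: ch}
}

// Recv returns the next chunk, or io.EOF once the stream is exhausted.
func (r *reader) Recv() (chunk, error) {
	c, ok := <-r.ch
	if !ok {
		return chunk{}, io.EOF
	}
	return c, nil
}

// Close marks the copy as released.
func (r *reader) Close() { r.closed = true }

// countTokens drains the copy in a goroutine so the handler can return immediately.
// The deferred function closes the copy first and then publishes the total,
// so a receiver of the total always observes a closed copy.
func countTokens(r *reader) <-chan int {
	result := make(chan int, 1)
	go func() {
		total := 0
		defer func() {
			r.Close()
			result <- total
		}()
		for {
			c, err := r.Recv()
			if err != nil {
				// io.EOF ends the stream normally; any other error is a
				// mid-stream failure that never reaches OnError.
				return
			}
			total += c.Tokens
		}
	}()
	return result
}

func main() {
	r := newReader(chunk{Tokens: 3}, chunk{Tokens: 4})
	total := <-countTokens(r)
	fmt.Println(total, r.closed)
}
```

Doing the read in a goroutine keeps the handler from blocking the component's caller, while the defer ensures the copy is closed even when the stream ends with an error.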
func (*HandlerBuilder) OnErrorFn ¶
func (hb *HandlerBuilder) OnErrorFn( fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
OnErrorFn sets the handler for the error timing.
func (*HandlerBuilder) OnStartFn ¶
func (hb *HandlerBuilder) OnStartFn( fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder
OnStartFn sets the handler for the start timing.
func (*HandlerBuilder) OnStartWithStreamInputFn ¶
func (hb *HandlerBuilder) OnStartWithStreamInputFn( fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder
OnStartWithStreamInputFn sets the callback invoked when a component receives streaming input. The handler receives a *schema.StreamReader that is a private copy; it MUST close the reader after consuming it to avoid goroutine and memory leaks.
type RunInfo ¶
RunInfo describes the entity that triggered a callback. Always nil-check before dereferencing — a component that calls OnStart without first calling EnsureRunInfo or InitCallbacks will leave RunInfo absent in the context.
Fields:
- Name: business-meaningful name specified by the user. For nodes in a graph this is the node name (compose.WithNodeName). For standalone components it must be set explicitly via InitCallbacks or ReuseHandlers; it is an empty string if not set.
- Type: implementation identity, e.g. "OpenAI". Set by the component via components.Typer; falls back to reflection (struct/func name) if the interface is not implemented. Empty for Graph itself.
- Component: category constant, e.g. components.ComponentOfChatModel. Fixed value "Lambda" for lambdas, "Graph"/"Chain"/"Workflow" for graphs. Use this to branch on component kind without caring about implementation.
Handlers should filter using RunInfo rather than assuming a fixed execution order — there is no guaranteed ordering between different Handlers.
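A nil-tolerant filter along these lines might look as follows. This is a self-contained sketch: runInfo is a local stand-in with the three fields listed above, isChatModel and label are hypothetical helpers, and the "ChatModel" literal is an assumed value for the component constant.

```go
package main

import "fmt"

// runInfo is a local stand-in with the three fields described above.
type runInfo struct {
	Name      string
	Type      string
	Component string
}

// isChatModel reports whether a callback was triggered by a chat model.
// It tolerates a nil RunInfo. "ChatModel" is an assumed constant value.
func isChatModel(info *runInfo) bool {
	return info != nil && info.Component == "ChatModel"
}

// label builds a log prefix without dereferencing a nil RunInfo,
// falling back to Type when no Name was set.
func label(info *runInfo) string {
	if info == nil {
		return "[unknown]"
	}
	if info.Name == "" {
		return "[" + info.Type + "]"
	}
	return "[" + info.Name + "]"
}

func main() {
	fmt.Println(isChatModel(nil), label(nil))
	info := &runInfo{Name: "planner", Type: "OpenAI", Component: "ChatModel"}
	fmt.Println(isChatModel(info), label(info))
}
```

Branching on Component rather than Type keeps the filter independent of which implementation happens to back the node.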
type TimingChecker ¶
type TimingChecker = callbacks.TimingChecker
TimingChecker is an optional interface for Handler implementations. When a handler implements Needed, the framework calls it before each component invocation to decide whether to set up callback infrastructure (stream copying, goroutine allocation) for that timing. Returning false avoids unnecessary overhead.
Handlers built with NewHandlerBuilder or utils/callbacks.NewHandlerHelper automatically implement TimingChecker based on which callback functions were set.
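For hand-written handlers, the optional-interface check can be sketched as below. The types are simplified stand-ins: timing, timingChecker, latencyHandler and shouldInvoke are hypothetical, the one-argument Needed signature is a simplification of the real interface, and treating a handler without Needed as wanting every timing is an assumption.

```go
package main

import "fmt"

// timing is a local stand-in for the five callback timings.
type timing int

const (
	onStart timing = iota
	onEnd
	onError
	onStartWithStreamInput
	onEndWithStreamOutput
)

// timingChecker is the optional interface a dispatcher can probe for.
type timingChecker interface {
	Needed(t timing) bool
}

// latencyHandler only cares about the non-streaming start and end timings.
type latencyHandler struct{}

// Needed returns false for timings this handler does not implement,
// so the dispatcher can skip stream copying for them.
func (latencyHandler) Needed(t timing) bool {
	return t == onStart || t == onEnd
}

// shouldInvoke asks the handler when it implements timingChecker.
// Handlers without Needed are assumed to want every timing.
func shouldInvoke(h any, t timing) bool {
	if tc, ok := h.(timingChecker); ok {
		return tc.Needed(t)
	}
	return true
}

func main() {
	h := latencyHandler{}
	fmt.Println(shouldInvoke(h, onStart), shouldInvoke(h, onEndWithStreamOutput))
}
```

The check runs before any per-timing setup, which is why returning false for streaming timings avoids creating a stream copy that nobody would read or close.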