Documentation ¶
Overview ¶
Package observable wraps mcp-toolkit handler functions in cold rxgo Observables with configurable retry, exponential backoff, and error classification.
Typical usage — zero config (production defaults):
tool := observable.New("search_web", "Search the web.", myFn)
reg.Add(tool)
Tune a single option:
tool := observable.New("search_web", "Search the web.", myFn,
observable.WithMaxRetries(5),
)
Swap an entire policy:
tool := observable.New("search_web", "Search the web.", myFn,
observable.WithRetryPolicy(myCircuitBreaker),
observable.WithClassifier(myClassifier),
)
The agent loop's Dispatch function detects observable.Tool and calls ExecuteRx for retry-aware fan-out via rxgo.Merge.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultErrorClassifier ¶
DefaultErrorClassifier treats every error as transient, so a failed call is always retried until MaxRetries is exhausted.
func Permanent ¶
Permanent wraps err so the retry loop stops immediately without consuming any remaining retry budget. Use it in handlers or ErrorClassifiers for deterministic failures that retrying cannot fix (invalid input, not found, auth errors, etc.).
// In a handler:
if in.Query == "" {
return "", observable.Permanent(ErrInvalidQuery)
}
// In a classifier:
observable.WithClassifier(func(err error) error {
if errors.Is(err, ErrNotFound) { return observable.Permanent(err) }
return err
})
errors.Is / errors.As still work through the wrapper because backoff.PermanentError implements Unwrap.
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/v8tix/mcp-toolkit/observable"
)
func main() {
errNotFound := errors.New("not found")
tool := observable.New("find", "Find by ID.",
func(_ context.Context, in struct {
ID int `json:"id"`
}) (string, error) {
if in.ID == 0 {
// Stop retrying immediately — no point retrying a missing ID.
return "", observable.Permanent(errNotFound)
}
return "found", nil
},
observable.WithMaxRetries(5),
)
var execErr error
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"id":0}`)).Observe() {
if item.E != nil {
execErr = item.E
}
}
fmt.Println(errors.Is(execErr, errNotFound))
}
Output: true
Types ¶
type Config ¶
type Config struct {
// Retry controls when and how many times to retry a failed invocation.
Retry RetryPolicy
// ErrPolicy classifies errors before each retry attempt.
// Return backoff.Permanent(err) to stop retrying immediately.
ErrPolicy ErrorPolicy
// OnRetry is called on every transient failure before the next retry.
// attempt is 1-indexed (1 = first failure). nil = no hook.
OnRetry func(attempt uint64, err error)
}
Config holds the resolved configuration for an observable Tool. It is intentionally exported so that callers can define their own Option functions that target any field — enabling extensibility without modifying this package:
func WithCircuitBreaker(cb RetryPolicy) observable.Option {
return func(c *observable.Config) { c.Retry = cb }
}
type ErrorClassifier ¶
ErrorClassifier is a convenience function type that satisfies ErrorPolicy when wrapped with WithClassifier.
type ErrorPolicy ¶
ErrorPolicy controls error classification. Return observable.Permanent(err) (or backoff.Permanent(err) directly) to stop retrying immediately.
type Option ¶
type Option func(*Config)
Option is a functional option that configures an observable Tool. Options are applied left-to-right; later options override earlier ones.
func DefaultOptions ¶
func DefaultOptions() []Option
DefaultOptions returns the production defaults as a slice so callers can spread and selectively override individual options:
opts := append(observable.DefaultOptions(), observable.WithMaxRetries(5))
tool := observable.New("t", "d", fn, opts...)
func WithClassifier ¶
func WithClassifier(clf ErrorClassifier) Option
WithClassifier is a convenience option that sets the error classifier while keeping the existing retry policy.
tool := observable.New("t", "d", fn, observable.WithClassifier(func(err error) error {
if isNotFound(err) { return backoff.Permanent(err) }
return err
}))
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/v8tix/mcp-toolkit/observable"
)
func main() {
errBadInput := errors.New("bad input")
tool := observable.New("process", "Process.",
func(_ context.Context, _ struct{}) (string, error) {
return "", errBadInput
},
observable.WithMaxRetries(5),
observable.WithClassifier(func(err error) error {
if errors.Is(err, errBadInput) {
return observable.Permanent(err)
}
return err
}),
)
var execErr error
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{}`)).Observe() {
if item.E != nil {
execErr = item.E
}
}
fmt.Println(errors.Is(execErr, errBadInput))
}
Output: true
func WithErrorPolicy ¶
func WithErrorPolicy(p ErrorPolicy) Option
WithErrorPolicy replaces the entire error-classification strategy.
tool := observable.New("t", "d", fn, observable.WithErrorPolicy(myPolicy))
func WithMaxRetries ¶
WithMaxRetries is a convenience option that sets the retry cap while keeping exponential backoff as the backoff strategy.
tool := observable.New("t", "d", fn, observable.WithMaxRetries(5))
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/observable"
)
type calcArgs struct {
A int `json:"a" desc:"First operand."`
B int `json:"b" desc:"Second operand."`
}
func main() {
tool := observable.New("add", "Add.",
func(_ context.Context, in calcArgs) (int, error) { return in.A + in.B, nil },
observable.WithMaxRetries(0), // no retry
)
fmt.Println(tool.Definition().Function.Name)
}
Output: add
func WithOnRetry ¶
WithOnRetry sets a hook that is called on every transient failure before the next retry attempt. attempt is 1-indexed (1 = first failure, 2 = second, …). The hook is not called for permanent errors since no retry will follow.
tool := observable.New("t", "d", fn,
observable.WithOnRetry(func(attempt uint64, err error) {
log.Printf("retry %d after: %v", attempt, err)
}),
)
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"github.com/v8tix/mcp-toolkit/observable"
)
type calcArgs struct {
A int `json:"a" desc:"First operand."`
B int `json:"b" desc:"Second operand."`
}
func main() {
attempts := 0
calls := 0
tool := observable.New("flaky", "Flaky tool.",
func(_ context.Context, _ calcArgs) (int, error) {
calls++
if calls < 3 {
return 0, errors.New("transient")
}
return 42, nil
},
observable.WithMaxRetries(5),
observable.WithOnRetry(func(attempt uint64, err error) {
attempts++
log.Printf("retry %d: %v", attempt, err)
}),
)
var result any
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"a":1,"b":2}`)).Observe() {
if item.E == nil {
result = item.V
}
}
fmt.Println(result)
fmt.Println(attempts)
}
Output:
42
2
func WithRetryPolicy ¶
func WithRetryPolicy(p RetryPolicy) Option
WithRetryPolicy replaces the entire retry strategy.
tool := observable.New("t", "d", fn, observable.WithRetryPolicy(myCircuitBreaker))
type RetryPolicy ¶
type RetryPolicy interface {
// MaxRetries is the maximum number of retry attempts after the first
// failure. Zero means one attempt only (no retries).
MaxRetries() uint64
// NewBackOff returns a fresh backoff.BackOff instance. It is called once
// per ExecuteRx subscription so concurrent tool calls never share state.
NewBackOff() backoff.BackOff
}
RetryPolicy controls retry behavior. Implement this interface to plug in a circuit breaker, fixed-interval policy, or any custom strategy.
type Tool ¶
type Tool interface {
handler.ExecutableTool
// ExecuteRx wraps Execute in a cold rxgo.Observable that applies the
// configured retry, backoff, and error classification.
//
// The observable emits exactly one item on success, or terminates with
// an error after all retries are exhausted.
ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
}
Tool extends handler.ExecutableTool with a reactive execution path. Dispatch checks for this interface and calls ExecuteRx when available; plain ExecutableTools fall back to a simple rxgo.Defer wrapper (no retry).
func New ¶
func New[In any, Out any]( name, description string, fn func(context.Context, In) (Out, error), opts ...Option, ) Tool
New creates an observable Tool directly from a typed handler function. This is the primary entry point: define your function, get back a Tool.
Passing no options applies the production defaults (3 retries, exponential backoff).
tool := observable.New("search_web", "Search the web.", myFn)
tool := observable.New("search_web", "Search the web.", myFn, observable.WithMaxRetries(5))
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/observable"
)
type calcArgs struct {
A int `json:"a" desc:"First operand."`
B int `json:"b" desc:"Second operand."`
}
func main() {
tool := observable.New("add", "Add two integers.",
func(_ context.Context, in calcArgs) (int, error) {
return in.A + in.B, nil
},
)
fmt.Println(tool.Definition().Function.Name)
}
Output: add
func Wrap ¶
func Wrap(inner handler.ExecutableTool, opts ...Option) Tool
Wrap wraps any existing handler.ExecutableTool in an observable Tool.
inner := handler.NewTool("greet", "Greet.", myFn)
tool := observable.Wrap(inner, observable.WithMaxRetries(5))
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/handler"
"github.com/v8tix/mcp-toolkit/observable"
)
type calcArgs struct {
A int `json:"a" desc:"First operand."`
B int `json:"b" desc:"Second operand."`
}
func main() {
base := handler.NewTool("add", "Add two integers.",
func(_ context.Context, in calcArgs) (int, error) {
return in.A + in.B, nil
},
)
tool := observable.Wrap(base, observable.WithMaxRetries(5))
fmt.Println(tool.Definition().Function.Name)
}
Output: add