Documentation
¶
Overview ¶
Package observable wraps mcp-toolkit handler functions in cold rxgo Observables with configurable retry, exponential backoff, and error classification.
Typical usage — zero config (production defaults):
tool := observable.New("search_web", "Search the web.", myFn)
reg.Add(tool)
Tune options with fluent chaining:
tool := observable.New("search_web", "Search the web.", myFn).
WithMaxRetries(5)
Custom options via With (escape hatch):
tool := observable.New("search_web", "Search the web.", myFn).
WithRetryPolicy(myCircuitBreaker).
With(myCustomOption)
The agent loop's Dispatch function detects observable.Tool and calls ExecuteRx for retry-aware fan-out via rxgo.Merge.
Index ¶
- func DefaultErrorClassifier(err error) error
- func Permanent(err error) error
- type Builder
- func (b *Builder[In, Out]) Definition() *sdkmcp.Tool
- func (b *Builder[In, Out]) Execute(ctx context.Context, rawArgs json.RawMessage) (any, error)
- func (b *Builder[In, Out]) ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
- func (b *Builder[In, Out]) With(opts ...Option) *Builder[In, Out]
- func (b *Builder[In, Out]) WithClassifier(clf ErrorClassifier) *Builder[In, Out]
- func (b *Builder[In, Out]) WithErrorPolicy(p ErrorPolicy) *Builder[In, Out]
- func (b *Builder[In, Out]) WithMaxRetries(n uint64) *Builder[In, Out]
- func (b *Builder[In, Out]) WithOnRetry(fn func(attempt uint64, err error)) *Builder[In, Out]
- func (b *Builder[In, Out]) WithRetryPolicy(p RetryPolicy) *Builder[In, Out]
- type Config
- type ErrorClassifier
- type ErrorPolicy
- type Option
- type RetryPolicy
- type Tool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultErrorClassifier ¶
DefaultErrorClassifier treats every error as transient (always retry up to MaxRetries).
func Permanent ¶
Permanent wraps err so the retry loop stops immediately without consuming any remaining retry budget. Use it in handlers or ErrorClassifiers for deterministic failures that retrying cannot fix (invalid input, not found, auth errors, etc.).
// In a handler:
if in.Query == "" {
return "", observable.Permanent(ErrInvalidQuery)
}
// In a classifier:
observable.WithClassifier(func(err error) error {
if errors.Is(err, ErrNotFound) { return observable.Permanent(err) }
return err
})
errors.Is / errors.As still work through the wrapper because backoff.PermanentError implements Unwrap.
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
func main() {
errNotFound := errors.New("not found")
tool := observable.New("find", "Find by ID.",
func(_ context.Context, in struct {
ID int `json:"id"`
}) (string, error) {
if in.ID == 0 {
// Stop retrying immediately — no point retrying a missing ID.
return "", observable.Permanent(errNotFound)
}
return "found", nil
},
).WithMaxRetries(5)
var execErr error
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"id":0}`)).Observe() {
if item.E != nil {
execErr = item.E
}
}
fmt.Println(errors.Is(execErr, errNotFound))
}
Output: true
Types ¶
type Builder ¶
Builder[In, Out] constructs an observable Tool from a typed handler. Create one with New and chain With* methods before use.
tool := observable.New("search_web", "Search.", myFn).
WithMaxRetries(5).
WithClassifier(myClassifier)
Custom options are still supported via With:
func WithCircuitBreaker(cb RetryPolicy) observable.Option {
return func(c *observable.Config) { c.Retry = cb }
}
tool := observable.New("search_web", "Search.", myFn).With(WithCircuitBreaker(cb))
func New ¶
func New[In any, Out any]( name, description string, fn func(context.Context, In) (Out, error), ) *Builder[In, Out]
New creates a Builder for an observable Tool from a typed handler function. Chain With* methods to configure retry behavior before use.
With no With* methods chained, production defaults apply (3 retries, exponential backoff).
tool := observable.New("search_web", "Search the web.", myFn)
tool := observable.New("search_web", "Search the web.", myFn).WithMaxRetries(5)
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
type calcArgs struct {
A int `json:"a" description:"First operand."`
B int `json:"b" description:"Second operand."`
}
func main() {
tool := observable.New("add", "Add two integers.",
func(_ context.Context, in calcArgs) (int, error) {
return in.A + in.B, nil
},
)
fmt.Println(tool.Definition().Name)
}
Output: add
func (*Builder[In, Out]) Definition ¶
Definition implements model.Tool.
func (*Builder[In, Out]) ExecuteRx ¶
func (b *Builder[In, Out]) ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
ExecuteRx implements observable.Tool.
func (*Builder[In, Out]) With ¶
With appends arbitrary Option values — the escape hatch for custom options.
tool := observable.New("t", "d", fn).With(myCustomOption)
func (*Builder[In, Out]) WithClassifier ¶
func (b *Builder[In, Out]) WithClassifier(clf ErrorClassifier) *Builder[In, Out]
WithClassifier sets the error classifier while keeping the existing retry policy.
func (*Builder[In, Out]) WithErrorPolicy ¶
func (b *Builder[In, Out]) WithErrorPolicy(p ErrorPolicy) *Builder[In, Out]
WithErrorPolicy replaces the entire error-classification strategy.
func (*Builder[In, Out]) WithMaxRetries ¶
WithMaxRetries sets the retry cap while keeping exponential backoff.
func (*Builder[In, Out]) WithOnRetry ¶
WithOnRetry sets a hook called on every transient failure before the next retry.
func (*Builder[In, Out]) WithRetryPolicy ¶
func (b *Builder[In, Out]) WithRetryPolicy(p RetryPolicy) *Builder[In, Out]
WithRetryPolicy replaces the entire retry strategy.
type Config ¶
type Config struct {
// Retry controls when and how many times to retry a failed invocation.
Retry RetryPolicy
// ErrPolicy classifies errors before each retry attempt.
// Return backoff.Permanent(err) to stop retrying immediately.
ErrPolicy ErrorPolicy
// OnRetry is called on every transient failure before the next retry.
// attempt is 1-indexed (1 = first failure). nil = no hook.
OnRetry func(attempt uint64, err error)
}
Config holds the resolved configuration for an observable Tool. It is intentionally exported so that callers can define their own Option functions that target any field — enabling extensibility without modifying this package:
func WithCircuitBreaker(cb RetryPolicy) observable.Option {
return func(c *observable.Config) { c.Retry = cb }
}
type ErrorClassifier ¶
ErrorClassifier is a convenience function type that satisfies ErrorPolicy when wrapped with WithClassifier.
type ErrorPolicy ¶
ErrorPolicy controls error classification. Return backoff.Permanent(err) to stop retrying immediately.
type Option ¶
type Option func(*Config)
Option is a functional option that configures an observable Tool. Options are applied left-to-right; later options override earlier ones.
func DefaultOptions ¶
func DefaultOptions() []Option
DefaultOptions returns the production defaults as a slice so callers can spread and selectively override individual options:
opts := append(observable.DefaultOptions(), observable.WithMaxRetries(5))
tool := observable.New("t", "d", fn).With(opts...)
func WithClassifier ¶
func WithClassifier(clf ErrorClassifier) Option
WithClassifier is a convenience option that sets the error classifier while keeping the existing retry policy.
tool := observable.New("t", "d", fn).With(observable.WithClassifier(func(err error) error {
if isNotFound(err) { return backoff.Permanent(err) }
return err
}))
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
func main() {
errBadInput := errors.New("bad input")
tool := observable.New("process", "Process.",
func(_ context.Context, _ struct{}) (string, error) {
return "", errBadInput
},
).
WithMaxRetries(5).
WithClassifier(func(err error) error {
if errors.Is(err, errBadInput) {
return observable.Permanent(err)
}
return err
})
var execErr error
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{}`)).Observe() {
if item.E != nil {
execErr = item.E
}
}
fmt.Println(errors.Is(execErr, errBadInput))
}
Output: true
func WithErrorPolicy ¶
func WithErrorPolicy(p ErrorPolicy) Option
WithErrorPolicy replaces the entire error-classification strategy.
tool := observable.New("t", "d", fn).With(observable.WithErrorPolicy(myPolicy))
func WithMaxRetries ¶
WithMaxRetries is a convenience option that sets the retry cap while keeping exponential backoff as the backoff strategy.
tool := observable.New("t", "d", fn).With(observable.WithMaxRetries(5))
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
type calcArgs struct {
A int `json:"a" description:"First operand."`
B int `json:"b" description:"Second operand."`
}
func main() {
tool := observable.New("add", "Add.",
func(_ context.Context, in calcArgs) (int, error) { return in.A + in.B, nil },
).WithMaxRetries(0) // no retry
fmt.Println(tool.Definition().Name)
}
Output: add
func WithOnRetry ¶
WithOnRetry sets a hook that is called on every transient failure before the next retry attempt. attempt is 1-indexed (1 = first failure, 2 = second, …). The hook is not called for permanent errors since no retry will follow.
tool := observable.New("t", "d", fn).With(
observable.WithOnRetry(func(attempt uint64, err error) {
log.Printf("retry %d after: %v", attempt, err)
}),
)
Example ¶
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
type calcArgs struct {
A int `json:"a" description:"First operand."`
B int `json:"b" description:"Second operand."`
}
func main() {
attempts := 0
calls := 0
tool := observable.New("flaky", "Flaky tool.",
func(_ context.Context, _ calcArgs) (int, error) {
calls++
if calls < 3 {
return 0, errors.New("transient")
}
return 42, nil
},
).
WithMaxRetries(5).
WithOnRetry(func(attempt uint64, err error) {
attempts++
log.Printf("retry %d: %v", attempt, err)
})
var result any
for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"a":1,"b":2}`)).Observe() {
if item.E == nil {
result = item.V
}
}
fmt.Println(result)
fmt.Println(attempts)
}
Output: 42 2
func WithRetryPolicy ¶
func WithRetryPolicy(p RetryPolicy) Option
WithRetryPolicy replaces the entire retry strategy.
tool := observable.New("t", "d", fn).With(observable.WithRetryPolicy(myCircuitBreaker))
type RetryPolicy ¶
type RetryPolicy interface {
// MaxRetries is the maximum number of retry attempts after the first
// failure. Zero means one attempt only (no retries).
MaxRetries() uint64
// NewBackOff returns a fresh backoff.BackOff instance. It is called once
// per ExecuteRx subscription so concurrent tool calls never share state.
NewBackOff() backoff.BackOff
}
RetryPolicy controls retry behavior. Implement this interface to plug in a circuit breaker, fixed-interval policy, or any custom strategy.
type Tool ¶
type Tool interface {
handler.ExecutableTool
// ExecuteRx wraps Execute in a cold rxgo.Observable that applies the
// configured retry, backoff, and error classification.
//
// The observable emits exactly one item on success, or terminates with
// an error after all retries are exhausted.
ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
}
Tool extends handler.ExecutableTool with a reactive execution path. Dispatch checks for this interface and calls ExecuteRx when available; plain ExecutableTools fall back to a simple rxgo.Defer wrapper (no retry).
func Wrap ¶
func Wrap(inner handler.ExecutableTool, opts ...Option) Tool
Wrap wraps any existing handler.ExecutableTool in an observable Tool.
inner := handler.NewTool("greet", "Greet.", myFn)
tool := observable.Wrap(inner, observable.WithMaxRetries(5))
Example ¶
package main
import (
"context"
"fmt"
"github.com/v8tix/mcp-toolkit/v2/handler"
"github.com/v8tix/mcp-toolkit/v2/observable"
)
type calcArgs struct {
A int `json:"a" description:"First operand."`
B int `json:"b" description:"Second operand."`
}
func main() {
base := handler.NewTool("add", "Add two integers.",
func(_ context.Context, in calcArgs) (int, error) {
return in.A + in.B, nil
},
)
tool := observable.Wrap(base, observable.WithMaxRetries(5))
fmt.Println(tool.Definition().Name)
}
Output: add