observable

package
v2.1.0
Published: Apr 29, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package observable wraps mcp-toolkit handler functions in cold rxgo Observables with configurable retry, exponential backoff, and error classification.

Typical usage — zero config (production defaults):

tool := observable.New("search_web", "Search the web.", myFn)
reg.Add(tool)

Tune options with fluent chaining:

tool := observable.New("search_web", "Search the web.", myFn).
    WithMaxRetries(5)

Custom options via With (escape hatch):

tool := observable.New("search_web", "Search the web.", myFn).
    WithRetryPolicy(myCircuitBreaker).
    With(myCustomOption)

The agent loop's Dispatch function detects observable.Tool and calls ExecuteRx for retry-aware fan-out via rxgo.Merge.
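The overview calls the Observables cold. As a rough illustration of that term only, here is a stdlib-only sketch in which a channel stands in for rxgo.Observable. `coldSource`, `item`, and `runsAfter` are hypothetical names, not part of this package; the point is simply that building the source does no work and each Observe call triggers a fresh run.

```go
package main

import "fmt"

// item carries a value or an error, loosely mirroring the V and E fields
// read in the package examples.
type item struct {
	V int
	E error
}

// coldSource is a hypothetical stand-in for a cold observable: it only stores
// the work and runs it anew for every Observe call.
type coldSource struct {
	produce func() (int, error)
}

// Observe starts one run of produce and emits its single result.
func (c coldSource) Observe() <-chan item {
	ch := make(chan item, 1)
	go func() {
		defer close(ch)
		v, err := c.produce()
		ch <- item{V: v, E: err}
	}()
	return ch
}

// runsAfter builds a cold source, observes it n times, and reports how many
// times the underlying work ran.
func runsAfter(n int) int {
	runs := 0
	src := coldSource{produce: func() (int, error) {
		runs++
		return runs, nil
	}}
	for i := 0; i < n; i++ {
		for range src.Observe() {
			// Drain the channel; closing it ends the loop.
		}
	}
	return runs
}

func main() {
	fmt.Println(runsAfter(0)) // building the source runs nothing
	fmt.Println(runsAfter(3)) // each Observe triggers a fresh run
}
```

Under this model, a retry wrapper can keep per-run state (such as a backoff timer) inside the run itself, so separate subscriptions do not interfere with each other.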

Functions

func DefaultErrorClassifier

func DefaultErrorClassifier(err error) error

DefaultErrorClassifier treats every error as transient, so a failing call is always retried, up to MaxRetries times.

func Permanent

func Permanent(err error) error

Permanent wraps err so the retry loop stops immediately without consuming any remaining retry budget. Use it in handlers or ErrorClassifiers for deterministic failures that retrying cannot fix (invalid input, not found, auth errors, etc.).

// In a handler:
if in.Query == "" {
    return "", observable.Permanent(ErrInvalidQuery)
}

// In a classifier:
observable.WithClassifier(func(err error) error {
    if errors.Is(err, ErrNotFound) { return observable.Permanent(err) }
    return err
})

errors.Is / errors.As still work through the wrapper because backoff.PermanentError implements Unwrap.
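The unwrapping behavior described above can be illustrated without any external module. In this sketch, `permanentError`, `markPermanent`, and `isPermanent` are hypothetical stand-ins, not the types used by this package or by backoff; they only show why a wrapper that implements Unwrap stays transparent to errors.Is and errors.As.

```go
package main

import (
	"errors"
	"fmt"
)

// permanentError is a hypothetical stand-in for a "do not retry" wrapper.
type permanentError struct{ err error }

func (p *permanentError) Error() string { return p.err.Error() }

// Unwrap exposes the wrapped error so errors.Is and errors.As can reach it.
func (p *permanentError) Unwrap() error { return p.err }

// markPermanent wraps err; a nil error stays nil.
func markPermanent(err error) error {
	if err == nil {
		return nil
	}
	return &permanentError{err: err}
}

// isPermanent reports whether any error in the chain is a permanentError.
func isPermanent(err error) bool {
	var p *permanentError
	return errors.As(err, &p)
}

var errNotFound = errors.New("not found")

func main() {
	err := markPermanent(errNotFound)
	fmt.Println(errors.Is(err, errNotFound)) // the sentinel is still reachable
	fmt.Println(isPermanent(err))            // the wrapper is detectable
	fmt.Println(isPermanent(errNotFound))    // a bare error is not permanent
}
```

A retry loop built this way could check `isPermanent` to decide whether to stop, while callers keep matching on their own sentinel errors.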

Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/v8tix/mcp-toolkit/v2/observable"
)

func main() {
	errNotFound := errors.New("not found")

	tool := observable.New("find", "Find by ID.",
		func(_ context.Context, in struct {
			ID int `json:"id"`
		}) (string, error) {
			if in.ID == 0 {
				// Stop retrying immediately — no point retrying a missing ID.
				return "", observable.Permanent(errNotFound)
			}
			return "found", nil
		},
	).WithMaxRetries(5)

	var execErr error
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"id":0}`)).Observe() {
		if item.E != nil {
			execErr = item.E
		}
	}

	fmt.Println(errors.Is(execErr, errNotFound))
}
Output:
true

Types

type Builder

type Builder[In any, Out any] struct {
	// contains filtered or unexported fields
}

Builder[In, Out] constructs an observable Tool from a typed handler. Create one with New and chain With* methods before use.

tool := observable.New("search_web", "Search.", myFn).
    WithMaxRetries(5).
    WithClassifier(myClassifier)

Custom options are still supported via With:

func WithCircuitBreaker(cb RetryPolicy) observable.Option {
    return func(c *observable.Config) { c.Retry = cb }
}
tool := observable.New("search_web", "Search.", myFn).With(WithCircuitBreaker(cb))

func New

func New[In any, Out any](
	name, description string,
	fn func(context.Context, In) (Out, error),
) *Builder[In, Out]

New creates a Builder for an observable Tool from a typed handler function. Chain With* methods to configure retry behavior before use.

With no With* calls, the production defaults apply (3 retries, exponential backoff).

// Production defaults:
tool := observable.New("search_web", "Search the web.", myFn)

// Defaults with a higher retry cap:
tuned := observable.New("search_web", "Search the web.", myFn).WithMaxRetries(5)
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/v2/observable"
)

type calcArgs struct {
	A int `json:"a" description:"First operand."`
	B int `json:"b" description:"Second operand."`
}

func main() {
	tool := observable.New("add", "Add two integers.",
		func(_ context.Context, in calcArgs) (int, error) {
			return in.A + in.B, nil
		},
	)

	fmt.Println(tool.Definition().Name)
}
Output:
add

func (*Builder[In, Out]) Definition

func (b *Builder[In, Out]) Definition() *sdkmcp.Tool

Definition implements model.Tool.

func (*Builder[In, Out]) Execute

func (b *Builder[In, Out]) Execute(ctx context.Context, rawArgs json.RawMessage) (any, error)

Execute implements handler.ExecutableTool.

func (*Builder[In, Out]) ExecuteRx

func (b *Builder[In, Out]) ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable

ExecuteRx implements observable.Tool.

func (*Builder[In, Out]) With

func (b *Builder[In, Out]) With(opts ...Option) *Builder[In, Out]

With appends arbitrary Option values — the escape hatch for custom options.

tool := observable.New("t", "d", fn).With(myCustomOption)

func (*Builder[In, Out]) WithClassifier

func (b *Builder[In, Out]) WithClassifier(clf ErrorClassifier) *Builder[In, Out]

WithClassifier sets the error classifier while keeping the existing retry policy.

func (*Builder[In, Out]) WithErrorPolicy

func (b *Builder[In, Out]) WithErrorPolicy(p ErrorPolicy) *Builder[In, Out]

WithErrorPolicy replaces the entire error-classification strategy.

func (*Builder[In, Out]) WithMaxRetries

func (b *Builder[In, Out]) WithMaxRetries(n uint64) *Builder[In, Out]

WithMaxRetries sets the retry cap while keeping exponential backoff.
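To make the relationship between the retry cap and exponential backoff concrete, here is a minimal stdlib-only sketch. `backoffSchedule` is a hypothetical helper, and the base interval, doubling factor, and ceiling are illustrative assumptions; the package's real intervals and any jitter are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the wait before each retry: base, base*2, base*4, ...
// capped at maxDelay. maxRetries bounds how many waits are produced.
// All values are illustrative assumptions, not the package's defaults.
func backoffSchedule(maxRetries uint64, base, maxDelay time.Duration) []time.Duration {
	delays := make([]time.Duration, 0, maxRetries)
	d := base
	for i := uint64(0); i < maxRetries; i++ {
		if d > maxDelay {
			d = maxDelay
		}
		delays = append(delays, d)
		d *= 2
	}
	return delays
}

func main() {
	// Five retries: the wait doubles until it reaches the one-second ceiling.
	fmt.Println(backoffSchedule(5, 100*time.Millisecond, time.Second))
	// Zero retries: a single attempt, so there is nothing to wait for.
	fmt.Println(len(backoffSchedule(0, 100*time.Millisecond, time.Second)))
}
```

In this model, changing the cap only changes how many entries the schedule has; the growth rule stays the same.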

func (*Builder[In, Out]) WithOnRetry

func (b *Builder[In, Out]) WithOnRetry(fn func(attempt uint64, err error)) *Builder[In, Out]

WithOnRetry sets a hook called on every transient failure before the next retry.

func (*Builder[In, Out]) WithRetryPolicy

func (b *Builder[In, Out]) WithRetryPolicy(p RetryPolicy) *Builder[In, Out]

WithRetryPolicy replaces the entire retry strategy.

type Config

type Config struct {
	// Retry controls when and how many times to retry a failed invocation.
	Retry RetryPolicy
	// ErrPolicy classifies errors before each retry attempt.
	// Return backoff.Permanent(err) to stop retrying immediately.
	ErrPolicy ErrorPolicy
	// OnRetry is called on every transient failure before the next retry.
	// attempt is 1-indexed (1 = first failure). nil = no hook.
	OnRetry func(attempt uint64, err error)
}

Config holds the resolved configuration for an observable Tool. It is intentionally exported so that callers can define their own Option functions that target any field — enabling extensibility without modifying this package:

func WithCircuitBreaker(cb RetryPolicy) observable.Option {
    return func(c *observable.Config) { c.Retry = cb }
}

type ErrorClassifier

type ErrorClassifier func(err error) error

ErrorClassifier is a convenience function type that satisfies ErrorPolicy when wrapped with WithClassifier.

type ErrorPolicy

type ErrorPolicy interface {
	Classify(err error) error
}

ErrorPolicy controls error classification. Return backoff.Permanent(err) to stop retrying immediately.
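One common way for a function type to satisfy a single-method interface is the adapter idiom known from http.HandlerFunc. Whether WithClassifier is implemented this way is an assumption; `errorPolicy`, `classifierFunc`, and `newPolicy` below are hypothetical stand-ins, and the `permanent:` prefix merely marks the error in place of a real permanent wrapper.

```go
package main

import (
	"errors"
	"fmt"
)

// errorPolicy mirrors the single-method interface shape shown above.
type errorPolicy interface {
	Classify(err error) error
}

// classifierFunc is a hypothetical adapter: a function type that gains a
// Classify method and therefore satisfies errorPolicy.
type classifierFunc func(err error) error

func (f classifierFunc) Classify(err error) error { return f(err) }

var (
	errInvalid = errors.New("invalid input")
	errTimeout = errors.New("timeout")
)

// newPolicy marks errInvalid as non-retryable and passes everything else through.
func newPolicy() errorPolicy {
	return classifierFunc(func(err error) error {
		if errors.Is(err, errInvalid) {
			return fmt.Errorf("permanent: %w", err)
		}
		return err
	})
}

func main() {
	p := newPolicy()
	fmt.Println(p.Classify(errInvalid)) // marked
	fmt.Println(p.Classify(errTimeout)) // unchanged, still retryable
}
```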

type Option

type Option func(*Config)

Option is a functional option that configures an observable Tool. Options are applied left-to-right; later options override earlier ones.
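The left-to-right rule can be sketched with a stripped-down version of the pattern. `config`, `option`, `withMaxRetries`, and `resolve` are hypothetical stand-ins that mirror the Config and Option shapes; only the default of 3 retries is taken from this documentation.

```go
package main

import "fmt"

// config and option are hypothetical stand-ins mirroring the Config/Option shape.
type config struct {
	maxRetries uint64
}

type option func(*config)

func withMaxRetries(n uint64) option {
	return func(c *config) { c.maxRetries = n }
}

// resolve starts from the defaults and applies opts in order, so a later
// option overwrites whatever an earlier one set.
func resolve(opts ...option) config {
	c := config{maxRetries: 3}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Println(resolve().maxRetries)                                     // defaults only
	fmt.Println(resolve(withMaxRetries(5), withMaxRetries(1)).maxRetries) // last one wins
}
```

Because each option is just a function over the config, callers can define their own without touching the package, which is the extensibility the exported Config is meant to allow.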

func DefaultOptions

func DefaultOptions() []Option

DefaultOptions returns the production defaults as a slice so callers can spread and selectively override individual options:

opts := append(observable.DefaultOptions(), observable.WithMaxRetries(5))
tool := observable.New("t", "d", fn).With(opts...)

func WithClassifier

func WithClassifier(clf ErrorClassifier) Option

WithClassifier is a convenience option that sets the error classifier while keeping the existing retry policy.

tool := observable.New("t", "d", fn).With(observable.WithClassifier(func(err error) error {
    if isNotFound(err) { return backoff.Permanent(err) }
    return err
}))
Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/v8tix/mcp-toolkit/v2/observable"
)

func main() {
	errBadInput := errors.New("bad input")

	tool := observable.New("process", "Process.",
		func(_ context.Context, _ struct{}) (string, error) {
			return "", errBadInput
		},
	).
		WithMaxRetries(5).
		WithClassifier(func(err error) error {
			if errors.Is(err, errBadInput) {
				return observable.Permanent(err)
			}
			return err
		})

	var execErr error
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{}`)).Observe() {
		if item.E != nil {
			execErr = item.E
		}
	}

	fmt.Println(errors.Is(execErr, errBadInput))
}
Output:
true

func WithErrorPolicy

func WithErrorPolicy(p ErrorPolicy) Option

WithErrorPolicy replaces the entire error-classification strategy.

tool := observable.New("t", "d", fn).With(observable.WithErrorPolicy(myPolicy))

func WithMaxRetries

func WithMaxRetries(n uint64) Option

WithMaxRetries is a convenience option that sets the retry cap while keeping exponential backoff as the backoff strategy.

tool := observable.New("t", "d", fn).With(observable.WithMaxRetries(5))
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/v2/observable"
)

type calcArgs struct {
	A int `json:"a" description:"First operand."`
	B int `json:"b" description:"Second operand."`
}

func main() {
	tool := observable.New("add", "Add.",
		func(_ context.Context, in calcArgs) (int, error) { return in.A + in.B, nil },
	).WithMaxRetries(0) // no retry

	fmt.Println(tool.Definition().Name)
}
Output:
add

func WithOnRetry

func WithOnRetry(fn func(attempt uint64, err error)) Option

WithOnRetry sets a hook that is called on every transient failure before the next retry attempt. attempt is 1-indexed (1 = first failure, 2 = second, …). The hook is not called for permanent errors since no retry will follow.

tool := observable.New("t", "d", fn).With(
    observable.WithOnRetry(func(attempt uint64, err error) {
        log.Printf("retry %d after: %v", attempt, err)
    }),
)
Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"

	"github.com/v8tix/mcp-toolkit/v2/observable"
)

type calcArgs struct {
	A int `json:"a" description:"First operand."`
	B int `json:"b" description:"Second operand."`
}

func main() {
	attempts := 0
	calls := 0

	tool := observable.New("flaky", "Flaky tool.",
		func(_ context.Context, _ calcArgs) (int, error) {
			calls++
			if calls < 3 {
				return 0, errors.New("transient")
			}
			return 42, nil
		},
	).
		WithMaxRetries(5).
		WithOnRetry(func(attempt uint64, err error) {
			attempts++
			log.Printf("retry %d: %v", attempt, err)
		})

	var result any
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"a":1,"b":2}`)).Observe() {
		if item.E == nil {
			result = item.V
		}
	}

	fmt.Println(result)
	fmt.Println(attempts)
}
Output:
42
2
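The hook semantics described for WithOnRetry can be modeled with a plain loop. This is a sketch under assumptions, not the package's implementation: `runWithRetry`, `flaky`, and the sentinel errors are hypothetical, backoff sleeps are omitted, and the hook is assumed not to fire once the retry budget is exhausted.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	// errPermanent stands in for an error wrapped as permanent.
	errPermanent = errors.New("permanent")
	errTransient = errors.New("transient")
)

// runWithRetry calls fn up to 1+maxRetries times. onRetry fires before each
// retry with a 1-indexed attempt number and is skipped for permanent errors.
func runWithRetry(maxRetries uint64, fn func() error, onRetry func(attempt uint64, err error)) error {
	for attempt := uint64(0); ; attempt++ {
		err := fn()
		if err == nil || errors.Is(err, errPermanent) || attempt == maxRetries {
			return err
		}
		if onRetry != nil {
			onRetry(attempt+1, err)
		}
	}
}

// flaky returns a function that fails with err for the first n calls, then succeeds.
func flaky(n int, err error) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return err
		}
		return nil
	}
}

func main() {
	hooks := 0
	count := func(attempt uint64, err error) { hooks++ }

	// Two transient failures, then success: the hook fires twice.
	err := runWithRetry(5, flaky(2, errTransient), count)
	fmt.Println(err, hooks)

	// A permanent failure stops at once: the hook never fires.
	hooks = 0
	err = runWithRetry(5, flaky(2, errPermanent), count)
	fmt.Println(err, hooks)
}
```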

func WithRetryPolicy

func WithRetryPolicy(p RetryPolicy) Option

WithRetryPolicy replaces the entire retry strategy.

tool := observable.New("t", "d", fn).With(observable.WithRetryPolicy(myCircuitBreaker))

type RetryPolicy

type RetryPolicy interface {
	// MaxRetries is the maximum number of retry attempts after the first
	// failure. Zero means one attempt only (no retries).
	MaxRetries() uint64
	// NewBackOff returns a fresh backoff.BackOff instance. It is called once
	// per ExecuteRx subscription so concurrent tool calls never share state.
	NewBackOff() backoff.BackOff
}

RetryPolicy controls retry behavior. Implement this interface to plug in a circuit breaker, fixed-interval policy, or any custom strategy.
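As an illustration of a custom strategy, a fixed-interval policy might look like the sketch below. To stay free of external modules, `backOff` and `retryPolicy` are local stand-ins declared with the same method sets as the interfaces above (the `NextBackOff`/`Reset` pair is an assumption about backoff.BackOff), and `fixedInterval` and `constantBackOff` are hypothetical types.

```go
package main

import (
	"fmt"
	"time"
)

// backOff and retryPolicy are local stand-ins, declared here only so the
// sketch compiles without external modules.
type backOff interface {
	NextBackOff() time.Duration
	Reset()
}

type retryPolicy interface {
	MaxRetries() uint64
	NewBackOff() backOff
}

// constantBackOff always waits the same interval.
type constantBackOff struct{ interval time.Duration }

func (c *constantBackOff) NextBackOff() time.Duration { return c.interval }
func (c *constantBackOff) Reset()                     {}

// fixedInterval is a hypothetical policy: a set number of retries with a
// constant wait between them.
type fixedInterval struct {
	retries  uint64
	interval time.Duration
}

func (f fixedInterval) MaxRetries() uint64 { return f.retries }

// NewBackOff returns a fresh value per call so concurrent executions share no state.
func (f fixedInterval) NewBackOff() backOff {
	return &constantBackOff{interval: f.interval}
}

func main() {
	var p retryPolicy = fixedInterval{retries: 4, interval: 250 * time.Millisecond}
	a, b := p.NewBackOff(), p.NewBackOff()
	fmt.Println(p.MaxRetries(), a.NextBackOff(), a != b)
}
```

Returning a new value from every NewBackOff call is what keeps stateful strategies, such as exponential backoff, safe across concurrent tool calls.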

type Tool

type Tool interface {
	handler.ExecutableTool

	// ExecuteRx wraps Execute in a cold rxgo.Observable that applies the
	// configured retry, backoff, and error classification.
	//
	// The observable emits exactly one item on success, or terminates with
	// an error after all retries are exhausted.
	ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
}

Tool extends handler.ExecutableTool with a reactive execution path. Dispatch checks for this interface and calls ExecuteRx when available; plain ExecutableTools fall back to a simple rxgo.Defer wrapper (no retry).
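The interface check that Dispatch is described as performing is, in general Go terms, a type assertion against the richer interface. The sketch below shows that idiom only; `executable`, `reactive`, `plainTool`, `retryTool`, and `dispatch` are hypothetical stand-ins with simplified signatures, and a channel takes the place of rxgo.Observable.

```go
package main

import "fmt"

// executable and reactive are hypothetical stand-ins for
// handler.ExecutableTool and observable.Tool.
type executable interface {
	Execute(args string) (string, error)
}

type reactive interface {
	executable
	ExecuteRx(args string) <-chan string
}

type plainTool struct{}

func (plainTool) Execute(args string) (string, error) { return "plain:" + args, nil }

// retryTool embeds plainTool and adds the reactive path.
type retryTool struct{ plainTool }

func (r retryTool) ExecuteRx(args string) <-chan string {
	out := make(chan string, 1)
	res, _ := r.Execute(args)
	out <- "rx:" + res
	close(out)
	return out
}

// dispatch prefers the reactive path when the tool offers one and falls
// back to a direct Execute call otherwise.
func dispatch(t executable, args string) string {
	if rt, ok := t.(reactive); ok {
		return <-rt.ExecuteRx(args)
	}
	res, _ := t.Execute(args)
	return res
}

func main() {
	fmt.Println(dispatch(plainTool{}, "a")) // fallback path
	fmt.Println(dispatch(retryTool{}, "a")) // reactive path
}
```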

func Wrap

func Wrap(inner handler.ExecutableTool, opts ...Option) Tool

Wrap wraps any existing handler.ExecutableTool in an observable Tool.

inner := handler.NewTool("greet", "Greet.", myFn)
tool := observable.Wrap(inner, observable.WithMaxRetries(5))
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/v2/handler"
	"github.com/v8tix/mcp-toolkit/v2/observable"
)

type calcArgs struct {
	A int `json:"a" description:"First operand."`
	B int `json:"b" description:"Second operand."`
}

func main() {
	base := handler.NewTool("add", "Add two integers.",
		func(_ context.Context, in calcArgs) (int, error) {
			return in.A + in.B, nil
		},
	)

	tool := observable.Wrap(base, observable.WithMaxRetries(5))
	fmt.Println(tool.Definition().Name)
}
Output:
add
