observable

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package observable wraps mcp-toolkit handler functions in cold rxgo Observables with configurable retry, exponential backoff, and error classification.

Typical usage — zero config (production defaults):

tool := observable.New("search_web", "Search the web.", myFn)
reg.Add(tool)

Tune a single option:

tool := observable.New("search_web", "Search the web.", myFn,
    observable.WithMaxRetries(5),
)

Swap an entire policy:

tool := observable.New("search_web", "Search the web.", myFn,
    observable.WithRetryPolicy(myCircuitBreaker),
    observable.WithClassifier(myClassifier),
)

The agent loop's Dispatch function detects observable.Tool and calls ExecuteRx for retry-aware fan-out via rxgo.Merge.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultErrorClassifier

func DefaultErrorClassifier(err error) error

DefaultErrorClassifier treats every error as transient (always retry up to MaxRetries).

func Permanent

func Permanent(err error) error

Permanent wraps err so the retry loop stops immediately without consuming any remaining retry budget. Use it in handlers or ErrorClassifiers for deterministic failures that retrying cannot fix (invalid input, not found, auth errors, etc.).

// In a handler:
if in.Query == "" {
    return "", observable.Permanent(ErrInvalidQuery)
}

// In a classifier:
observable.WithClassifier(func(err error) error {
    if errors.Is(err, ErrNotFound) { return observable.Permanent(err) }
    return err
})

errors.Is / errors.As still work through the wrapper because backoff.PermanentError implements Unwrap.

Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/v8tix/mcp-toolkit/observable"
)

func main() {
	errNotFound := errors.New("not found")

	tool := observable.New("find", "Find by ID.",
		func(_ context.Context, in struct {
			ID int `json:"id"`
		}) (string, error) {
			if in.ID == 0 {
				// Stop retrying immediately — no point retrying a missing ID.
				return "", observable.Permanent(errNotFound)
			}
			return "found", nil
		},
		observable.WithMaxRetries(5),
	)

	var execErr error
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"id":0}`)).Observe() {
		if item.E != nil {
			execErr = item.E
		}
	}

	fmt.Println(errors.Is(execErr, errNotFound))
}
Output:
true

Types

type Config

type Config struct {
	// Retry controls when and how many times to retry a failed invocation.
	Retry RetryPolicy
	// ErrPolicy classifies errors before each retry attempt.
	// Return backoff.Permanent(err) to stop retrying immediately.
	ErrPolicy ErrorPolicy
	// OnRetry is called on every transient failure before the next retry.
	// attempt is 1-indexed (1 = first failure). nil = no hook.
	OnRetry func(attempt uint64, err error)
}

Config holds the resolved configuration for an observable Tool. It is intentionally exported so that callers can define their own Option functions that target any field — enabling extensibility without modifying this package:

func WithCircuitBreaker(cb RetryPolicy) observable.Option {
    return func(c *observable.Config) { c.Retry = cb }
}

type ErrorClassifier

type ErrorClassifier func(err error) error

ErrorClassifier is a convenience function type that satisfies ErrorPolicy when wrapped with WithClassifier.

type ErrorPolicy

type ErrorPolicy interface {
	Classify(err error) error
}

ErrorPolicy controls error classification. Return backoff.Permanent(err) to stop retrying immediately.

type Option

type Option func(*Config)

Option is a functional option that configures an observable Tool. Options are applied left-to-right; later options override earlier ones.

func DefaultOptions

func DefaultOptions() []Option

DefaultOptions returns the production defaults as a slice so callers can spread and selectively override individual options:

opts := append(observable.DefaultOptions(), observable.WithMaxRetries(5))
tool := observable.New("t", "d", fn, opts...)

func WithClassifier

func WithClassifier(clf ErrorClassifier) Option

WithClassifier is a convenience option that sets the error classifier while keeping the existing retry policy.

tool := observable.New("t", "d", fn, observable.WithClassifier(func(err error) error {
    if isNotFound(err) { return backoff.Permanent(err) }
    return err
}))
Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/v8tix/mcp-toolkit/observable"
)

func main() {
	errBadInput := errors.New("bad input")

	tool := observable.New("process", "Process.",
		func(_ context.Context, _ struct{}) (string, error) {
			return "", errBadInput
		},
		observable.WithMaxRetries(5),
		observable.WithClassifier(func(err error) error {
			if errors.Is(err, errBadInput) {
				return observable.Permanent(err)
			}
			return err
		}),
	)

	var execErr error
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{}`)).Observe() {
		if item.E != nil {
			execErr = item.E
		}
	}

	fmt.Println(errors.Is(execErr, errBadInput))
}
Output:
true

func WithErrorPolicy

func WithErrorPolicy(p ErrorPolicy) Option

WithErrorPolicy replaces the entire error-classification strategy.

tool := observable.New("t", "d", fn, observable.WithErrorPolicy(myPolicy))

func WithMaxRetries

func WithMaxRetries(n uint64) Option

WithMaxRetries is a convenience option that sets the retry cap while keeping exponential backoff as the backoff strategy.

tool := observable.New("t", "d", fn, observable.WithMaxRetries(5))
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/observable"
)

type calcArgs struct {
	A int `json:"a" desc:"First operand."`
	B int `json:"b" desc:"Second operand."`
}

func main() {
	tool := observable.New("add", "Add.",
		func(_ context.Context, in calcArgs) (int, error) { return in.A + in.B, nil },
		observable.WithMaxRetries(0), // no retry
	)

	fmt.Println(tool.Definition().Function.Name)
}
Output:
add

func WithOnRetry

func WithOnRetry(fn func(attempt uint64, err error)) Option

WithOnRetry sets a hook that is called on every transient failure before the next retry attempt. attempt is 1-indexed (1 = first failure, 2 = second, …). The hook is not called for permanent errors since no retry will follow.

tool := observable.New("t", "d", fn,
    observable.WithOnRetry(func(attempt uint64, err error) {
        log.Printf("retry %d after: %v", attempt, err)
    }),
)
Example
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"

	"github.com/v8tix/mcp-toolkit/observable"
)

type calcArgs struct {
	A int `json:"a" desc:"First operand."`
	B int `json:"b" desc:"Second operand."`
}

func main() {
	attempts := 0
	calls := 0

	tool := observable.New("flaky", "Flaky tool.",
		func(_ context.Context, _ calcArgs) (int, error) {
			calls++
			if calls < 3 {
				return 0, errors.New("transient")
			}
			return 42, nil
		},
		observable.WithMaxRetries(5),
		observable.WithOnRetry(func(attempt uint64, err error) {
			attempts++
			log.Printf("retry %d: %v", attempt, err)
		}),
	)

	var result any
	for item := range tool.ExecuteRx(context.Background(), json.RawMessage(`{"a":1,"b":2}`)).Observe() {
		if item.E == nil {
			result = item.V
		}
	}

	fmt.Println(result)
	fmt.Println(attempts)
}
Output:
42
2

func WithRetryPolicy

func WithRetryPolicy(p RetryPolicy) Option

WithRetryPolicy replaces the entire retry strategy.

tool := observable.New("t", "d", fn, observable.WithRetryPolicy(myCircuitBreaker))

type RetryPolicy

type RetryPolicy interface {
	// MaxRetries is the maximum number of retry attempts after the first
	// failure. Zero means one attempt only (no retries).
	MaxRetries() uint64
	// NewBackOff returns a fresh backoff.BackOff instance. It is called once
	// per ExecuteRx subscription so concurrent tool calls never share state.
	NewBackOff() backoff.BackOff
}

RetryPolicy controls retry behavior. Implement this interface to plug in a circuit breaker, fixed-interval policy, or any custom strategy.

type Tool

type Tool interface {
	handler.ExecutableTool

	// ExecuteRx wraps Execute in a cold rxgo.Observable that applies the
	// configured retry, backoff, and error classification.
	//
	// The observable emits exactly one item on success, or terminates with
	// an error after all retries are exhausted.
	ExecuteRx(ctx context.Context, rawArgs json.RawMessage) rxgo.Observable
}

Tool extends handler.ExecutableTool with a reactive execution path. Dispatch checks for this interface and calls ExecuteRx when available; plain ExecutableTools fall back to a simple rxgo.Defer wrapper (no retry).

func New

func New[In any, Out any](
	name, description string,
	fn func(context.Context, In) (Out, error),
	opts ...Option,
) Tool

New creates an observable Tool directly from a typed handler function. This is the primary entry point: define your function, get back a Tool.

Zero options applies production defaults (3 retries, exponential backoff).

tool := observable.New("search_web", "Search the web.", myFn)
tool := observable.New("search_web", "Search the web.", myFn, observable.WithMaxRetries(5))
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/observable"
)

type calcArgs struct {
	A int `json:"a" desc:"First operand."`
	B int `json:"b" desc:"Second operand."`
}

func main() {
	tool := observable.New("add", "Add two integers.",
		func(_ context.Context, in calcArgs) (int, error) {
			return in.A + in.B, nil
		},
	)

	fmt.Println(tool.Definition().Function.Name)
}
Output:
add

func Wrap

func Wrap(inner handler.ExecutableTool, opts ...Option) Tool

Wrap wraps any existing handler.ExecutableTool in an observable Tool.

inner := handler.NewTool("greet", "Greet.", myFn)
tool  := observable.Wrap(inner, observable.WithMaxRetries(5))
Example
package main

import (
	"context"
	"fmt"

	"github.com/v8tix/mcp-toolkit/handler"
	"github.com/v8tix/mcp-toolkit/observable"
)

type calcArgs struct {
	A int `json:"a" desc:"First operand."`
	B int `json:"b" desc:"Second operand."`
}

func main() {
	base := handler.NewTool("add", "Add two integers.",
		func(_ context.Context, in calcArgs) (int, error) {
			return in.A + in.B, nil
		},
	)

	tool := observable.Wrap(base, observable.WithMaxRetries(5))
	fmt.Println(tool.Definition().Function.Name)
}
Output:
add

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL