Documentation
¶
Overview ¶
Package provider implements a generic provider framework using Go generics for swappable backends with runtime switching capabilities.
It provides a registry for managing multiple provider implementations with factory-based instantiation, availability checking, and runtime selection.
The package defines four interaction patterns:
- RequestResponse[I, O]: one input → one output (HTTP, gRPC, subprocess)
- Stream[I, O]: one input → many outputs (SSE, streaming gRPC, piped subprocess)
- Sink[I]: one input → ack (Kafka produce, webhook, push notification)
- Duplex[I, O]: bidirectional (WebSocket, gRPC bidi-stream)
Opt-in lifecycle:
- Initializable: providers that need setup (dial gRPC, validate binary)
- Closeable: providers that hold resources (connections, daemon processes)
Usage ¶
reg := provider.NewRegistry[MyProvider]()
reg.RegisterFactory("default", myFactory)
mgr := provider.NewManager(reg, &provider.HealthCheckSelector[MyProvider]{})
mgr.InitializeWithContext(ctx, "default", cfg)
p, _ := mgr.Get(ctx)
Index ¶
- func ExecuteWithResilience[T any](ctx context.Context, s *ResilienceState, fn func() (T, error)) (T, error)
- type Closeable
- type Duplex
- type DuplexStream
- type Factory
- type HealthCheckSelector
- type Initializable
- type Iterator
- type Manager
- func (m *Manager[T]) Available() []string
- func (m *Manager[T]) CloseAll(ctx context.Context) error
- func (m *Manager[T]) Get(ctx context.Context) (T, error)
- func (m *Manager[T]) GetByName(name string) (T, error)
- func (m *Manager[T]) Initialize(name string, cfg map[string]any) error
- func (m *Manager[T]) InitializeWithContext(ctx context.Context, name string, cfg map[string]any) error
- func (m *Manager[T]) InitializeWithResilience(ctx context.Context, name string, cfg map[string]any, wrap func(T) T) error
- func (m *Manager[T]) Register(name string, factory Factory[T])
- func (m *Manager[T]) SetDefault(name string) error
- type PrioritySelector
- type Provider
- type Registry
- type RequestResponse
- type ResilienceConfig
- type ResilienceState
- type RoundRobinSelector
- type Selector
- type Sink
- type Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExecuteWithResilience ¶ added in v0.1.1
func ExecuteWithResilience[T any](ctx context.Context, s *ResilienceState, fn func() (T, error)) (T, error)
ExecuteWithResilience runs fn through the resilience chain: RateLimiter.Wait → Bulkhead → CircuitBreaker → Retry → fn. Exported so other packages (e.g., process) can reuse the chain. Resilience errors are wrapped as gokit AppError for consistency.
Types ¶
type Closeable ¶ added in v0.1.1
Closeable is optionally implemented by providers that hold resources requiring explicit cleanup (e.g., gRPC connection, daemon process, LDAP bind). The Manager calls Close() automatically during shutdown.
type Duplex ¶ added in v0.1.1
Duplex represents a provider with bidirectional communication. This covers: WebSocket, gRPC bidi-stream, long-running subprocess.
func WithDuplexResilience ¶ added in v0.1.1
func WithDuplexResilience[I, O any](p Duplex[I, O], cfg ResilienceConfig) Duplex[I, O]
WithDuplexResilience wraps a Duplex provider with resilience middleware. Only CircuitBreaker and RateLimiter are applied to the Open call. Retry is not applied — persistent connections should reconnect at a higher level.
type DuplexStream ¶ added in v0.1.1
type DuplexStream[I, O any] interface { // Send writes a value to the remote end. Send(I) error // Recv reads a value from the remote end. Returns io.EOF when closed. Recv() (O, error) // Close terminates the stream. Close() error }
DuplexStream provides bidirectional communication over a single connection.
type HealthCheckSelector ¶
type HealthCheckSelector[T Provider] struct{}
HealthCheckSelector picks the first available provider by calling IsAvailable.
type Initializable ¶ added in v0.1.1
Initializable is optionally implemented by providers that need setup before handling requests (e.g., dial gRPC, validate binary, warm cache). The Manager calls Init() automatically when initializing providers.
type Iterator ¶ added in v0.1.1
type Iterator[T any] interface { // Next returns the next value. Returns (zero, false, nil) when exhausted. Next(ctx context.Context) (T, bool, error) // Close releases any resources held by the iterator. Close() error }
Iterator provides pull-based sequential access to a stream of values. The consumer calls Next() to retrieve values one at a time. Close must be called when done to release resources.
type Manager ¶
type Manager[T Provider] struct { // contains filtered or unexported fields }
Manager provides the main API for working with providers, combining a Registry for storage and a Selector for choosing providers.
func NewManager ¶
NewManager creates a Manager backed by the given registry and selector.
func (*Manager[T]) CloseAll ¶ added in v0.1.1
CloseAll calls Close() on all providers that implement Closeable.
func (*Manager[T]) Initialize ¶
Initialize creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.
func (*Manager[T]) InitializeWithContext ¶ added in v0.1.1
func (m *Manager[T]) InitializeWithContext(ctx context.Context, name string, cfg map[string]any) error
InitializeWithContext creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.
func (*Manager[T]) InitializeWithResilience ¶ added in v0.1.1
func (m *Manager[T]) InitializeWithResilience(ctx context.Context, name string, cfg map[string]any, wrap func(T) T) error
InitializeWithResilience creates a provider from its factory, wraps it with the given middleware function, calls Init(), and stores it for use. The wrap function applies resilience (or any other middleware) to the provider. Example:
mgr.InitializeWithResilience(ctx, "http", nil, func(p MyProvider) MyProvider {
return provider.WithResilience(p, resilienceCfg).(MyProvider)
})
func (*Manager[T]) SetDefault ¶
SetDefault sets the default provider by name.
type PrioritySelector ¶
type PrioritySelector[T Provider] struct { // Priority is the ordered list of provider names to try. Priority []string }
PrioritySelector tries providers in the given priority order and returns the first one that is available.
type Provider ¶
type Provider interface {
// Name returns the provider's unique name.
Name() string
// IsAvailable checks if the provider is ready to handle requests.
IsAvailable(ctx context.Context) bool
}
Provider is the base interface all providers must implement.
type Registry ¶
type Registry[T Provider] struct { // contains filtered or unexported fields }
Registry manages named provider factories and cached instances.
func NewRegistry ¶
NewRegistry creates a new empty Registry.
func (*Registry[T]) RegisterFactory ¶
RegisterFactory registers a named factory for creating providers.
type RequestResponse ¶ added in v0.1.1
type RequestResponse[I, O any] interface { Provider Execute(ctx context.Context, input I) (O, error) }
RequestResponse represents a provider that takes one input and returns one output. This covers: HTTP calls, gRPC unary, subprocess exec, SQL queries, S3 operations.
func WithResilience ¶ added in v0.1.1
func WithResilience[I, O any](p RequestResponse[I, O], cfg ResilienceConfig) RequestResponse[I, O]
WithResilience wraps a RequestResponse provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Execute. Nil config fields are skipped. Empty config returns the provider unchanged.
type ResilienceConfig ¶ added in v0.1.1
type ResilienceConfig struct {
// CircuitBreaker prevents cascading failures by stopping calls after repeated errors.
CircuitBreaker *resilience.CircuitBreakerConfig
// Retry automatically retries failed calls with exponential backoff.
Retry *resilience.RetryConfig
// RateLimiter limits the rate of calls using a token bucket algorithm.
RateLimiter *resilience.RateLimiterConfig
// Bulkhead limits concurrent calls to prevent resource exhaustion.
Bulkhead *resilience.BulkheadConfig
}
ResilienceConfig bundles optional resilience policies for a provider. Nil fields are skipped — zero config means pure passthrough with no overhead.
func (ResilienceConfig) IsEmpty ¶ added in v0.1.1
func (c ResilienceConfig) IsEmpty() bool
IsEmpty returns true if no resilience policies are configured.
type ResilienceState ¶ added in v0.1.1
type ResilienceState struct {
// contains filtered or unexported fields
}
ResilienceState holds initialized resilience primitives built from config.
func BuildResilience ¶ added in v0.1.1
func BuildResilience(cfg ResilienceConfig) *ResilienceState
BuildResilience creates initialized resilience primitives from config.
type RoundRobinSelector ¶
type RoundRobinSelector[T Provider] struct { // contains filtered or unexported fields }
RoundRobinSelector distributes requests across providers.
type Selector ¶
type Selector[T Provider] interface { Select(ctx context.Context, providers map[string]T) (T, error) }
Selector picks a provider from the available options.
type Sink ¶ added in v0.1.1
Sink represents a provider that accepts input with no meaningful output. This covers: Kafka produce, webhook, push notification, logging.
func WithSinkResilience ¶ added in v0.1.1
func WithSinkResilience[I any](p Sink[I], cfg ResilienceConfig) Sink[I]
WithSinkResilience wraps a Sink provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Send.
type Stream ¶ added in v0.1.1
type Stream[I, O any] interface { Provider Execute(ctx context.Context, input I) (Iterator[O], error) }
Stream represents a provider that takes one input and returns multiple outputs. This covers: gRPC server-stream, subprocess stdout pipe, SSE, chunked HTTP.
func WithStreamResilience ¶ added in v0.1.1
func WithStreamResilience[I, O any](p Stream[I, O], cfg ResilienceConfig) Stream[I, O]
WithStreamResilience wraps a Stream provider with resilience middleware. Resilience is applied to the Execute call that opens the stream. Individual Next() calls on the returned Iterator are NOT wrapped.