provider

package
v0.1.3
Published: Feb 27, 2026 License: MIT Imports: 11 Imported by: 0

README

provider

Generic provider registry, manager, and selection strategies for pluggable backends.

Install

go get github.com/kbukum/gokit

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/kbukum/gokit/provider"
)

// MyProvider implements provider.Provider.
type MyProvider struct{ name string }

func (p *MyProvider) Name() string                         { return p.name }
func (p *MyProvider) IsAvailable(ctx context.Context) bool { return true }

func main() {
    registry := provider.NewRegistry[*MyProvider]()
    selector := &provider.PrioritySelector[*MyProvider]{}
    mgr := provider.NewManager(registry, selector)

    // Register a factory, then create and store the provider it builds.
    mgr.Register("primary", func(cfg map[string]any) (*MyProvider, error) {
        return &MyProvider{name: "primary"}, nil
    })
    if err := mgr.Initialize("primary", nil); err != nil {
        log.Fatal(err)
    }
    if err := mgr.SetDefault("primary"); err != nil {
        log.Fatal(err)
    }

    // Get the best available provider. Check err before using p:
    // calling p.Name() on a nil *MyProvider would panic.
    p, err := mgr.Get(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(p.Name()) // "primary"
}

Key Types & Functions

| Name | Description |
| --- | --- |
| Provider | Interface: Name() + IsAvailable() |
| Registry[T] | Generic provider registry with factory support |
| Manager[T] | Manages provider lifecycle and selection |
| Factory[T] | Factory function type for creating providers |
| Selector[T] | Interface for provider selection strategy |
| PrioritySelector / RoundRobinSelector / HealthCheckSelector | Built-in selection strategies |


Documentation

Overview

Package provider implements a generic provider framework using Go generics for swappable backends with runtime switching capabilities.

It provides a registry for managing multiple provider implementations with factory-based instantiation, availability checking, and runtime selection.

The package defines four interaction patterns:

  • RequestResponse[I, O]: one input → one output (HTTP, gRPC, subprocess)
  • Stream[I, O]: one input → many outputs (SSE, streaming gRPC, piped subprocess)
  • Sink[I]: one input → ack (Kafka produce, webhook, push notification)
  • Duplex[I, O]: bidirectional (WebSocket, gRPC bidi-stream)

Opt-in lifecycle:

  • Initializable: providers that need setup (dial gRPC, validate binary)
  • Closeable: providers that hold resources (connections, daemon processes)

State Management

The Stateful wrapper adds automatic state load/save around provider execution:

store := provider.NewMemoryStore[MyState]()
stateful := provider.NewStateful(provider.StatefulConfig[In, Out, MyState]{
    Inner:   myProvider,
    Store:   store,
    KeyFunc: func(in In) string { return in.SessionID },
    Inject:  func(in In, s *MyState) In { /* enrich input */ },
    Extract: func(in In, out Out) *MyState { /* derive state */ },
    TTL:     5 * time.Minute,
})

ContextStore[C] is the state persistence interface; MemoryStore is the built-in in-memory implementation for testing. Production implementations (e.g., redis.TypedStore) live in sub-modules to avoid dependency coupling.

Middleware

Middleware[I, O] is a function that wraps a RequestResponse provider. Use Chain to compose multiple middlewares:

wrapped := provider.Chain(
    provider.WithLogging[In, Out](log),
    provider.WithMetrics[In, Out](metrics),
    provider.WithTracing[In, Out]("my-service"),
)(rawProvider)

Usage

reg := provider.NewRegistry[MyProvider]()
reg.RegisterFactory("default", myFactory)
mgr := provider.NewManager(reg, &provider.HealthCheckSelector[MyProvider]{})
mgr.InitializeWithContext(ctx, "default", cfg)
p, _ := mgr.Get(ctx)


Functions

func ExecuteWithResilience added in v0.1.1

func ExecuteWithResilience[T any](ctx context.Context, s *ResilienceState, fn func() (T, error)) (T, error)

ExecuteWithResilience runs fn through the resilience chain: RateLimiter.Wait → Bulkhead → CircuitBreaker → Retry → fn. Exported so other packages (e.g., process) can reuse the chain. Resilience errors are wrapped as gokit AppError for consistency.

Types

type Closeable added in v0.1.1

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is optionally implemented by providers that hold resources requiring explicit cleanup (e.g., gRPC connection, daemon process, LDAP bind). The Manager calls Close() automatically during shutdown.

type ContextStore added in v0.1.2

type ContextStore[C any] interface {
	// Load retrieves state. Returns (nil, nil) if key doesn't exist.
	Load(ctx context.Context, key string) (*C, error)
	// Save persists state with optional TTL. TTL of 0 means no expiration.
	Save(ctx context.Context, key string, val *C, ttl time.Duration) error
	// Delete removes state.
	Delete(ctx context.Context, key string) error
}

ContextStore provides typed state persistence for stateful providers. Implementations live in sub-modules (redis, database, etc.) to avoid forcing dependencies on the core.

The key is an opaque string — the consumer decides the key schema. TTL of 0 means no expiration.

type Duplex added in v0.1.1

type Duplex[I, O any] interface {
	Provider
	Open(ctx context.Context) (DuplexStream[I, O], error)
}

Duplex represents a provider with bidirectional communication. This covers: WebSocket, gRPC bidi-stream, long-running subprocess.

func WithDuplexResilience added in v0.1.1

func WithDuplexResilience[I, O any](p Duplex[I, O], cfg ResilienceConfig) Duplex[I, O]

WithDuplexResilience wraps a Duplex provider with resilience middleware. Only CircuitBreaker and RateLimiter are applied to the Open call. Retry is not applied — persistent connections should reconnect at a higher level.

type DuplexStream added in v0.1.1

type DuplexStream[I, O any] interface {
	// Send writes a value to the remote end.
	Send(I) error
	// Recv reads a value from the remote end. Returns io.EOF when closed.
	Recv() (O, error)
	// Close terminates the stream.
	Close() error
}

DuplexStream provides bidirectional communication over a single connection.

type Factory

type Factory[T Provider] func(cfg map[string]any) (T, error)

Factory creates a provider instance from configuration.

type HealthCheckSelector

type HealthCheckSelector[T Provider] struct{}

HealthCheckSelector picks the first available provider by calling IsAvailable.

func (*HealthCheckSelector[T]) Select

func (s *HealthCheckSelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select returns the first provider that reports as available.

type Initializable added in v0.1.1

type Initializable interface {
	Init(ctx context.Context) error
}

Initializable is optionally implemented by providers that need setup before handling requests (e.g., dial gRPC, validate binary, warm cache). The Manager calls Init() automatically when initializing providers.

type Iterator added in v0.1.1

type Iterator[T any] interface {
	// Next returns the next value. Returns (zero, false, nil) when exhausted.
	Next(ctx context.Context) (T, bool, error)
	// Close releases any resources held by the iterator.
	Close() error
}

Iterator provides pull-based sequential access to a stream of values. The consumer calls Next() to retrieve values one at a time. Close must be called when done to release resources.

type Manager

type Manager[T Provider] struct {
	// contains filtered or unexported fields
}

Manager provides the main API for working with providers, combining a Registry for storage and a Selector for choosing providers.

func NewManager

func NewManager[T Provider](registry *Registry[T], selector Selector[T]) *Manager[T]

NewManager creates a Manager backed by the given registry and selector.

func (*Manager[T]) Available

func (m *Manager[T]) Available() []string

Available returns the names of all initialized providers.

func (*Manager[T]) CloseAll added in v0.1.1

func (m *Manager[T]) CloseAll(ctx context.Context) error

CloseAll calls Close() on all providers that implement Closeable.

func (*Manager[T]) Get

func (m *Manager[T]) Get(ctx context.Context) (T, error)

Get returns a provider chosen by the selector, or the default if set.

func (*Manager[T]) GetByName

func (m *Manager[T]) GetByName(name string) (T, error)

GetByName returns a specific provider by name.

func (*Manager[T]) Initialize

func (m *Manager[T]) Initialize(name string, cfg map[string]any) error

Initialize creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.

func (*Manager[T]) InitializeWithContext added in v0.1.1

func (m *Manager[T]) InitializeWithContext(ctx context.Context, name string, cfg map[string]any) error

InitializeWithContext behaves like Initialize but accepts a caller-supplied context: it creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.

func (*Manager[T]) InitializeWithResilience added in v0.1.1

func (m *Manager[T]) InitializeWithResilience(ctx context.Context, name string, cfg map[string]any, wrap func(T) T) error

InitializeWithResilience creates a provider from its factory, wraps it with the given middleware function, calls Init(), and stores it for use. The wrap function applies resilience (or any other middleware) to the provider. Example:

mgr.InitializeWithResilience(ctx, "http", nil, func(p MyProvider) MyProvider {
    return provider.WithResilience(p, resilienceCfg).(MyProvider)
})

func (*Manager[T]) Register

func (m *Manager[T]) Register(name string, factory Factory[T])

Register adds a factory to the underlying registry.

func (*Manager[T]) SetDefault

func (m *Manager[T]) SetDefault(name string) error

SetDefault sets the default provider by name.

type MemoryStore added in v0.1.2

type MemoryStore[C any] struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory ContextStore for testing and development. It enforces TTL expiration on Load for testing fidelity. Not intended for production use (no persistence, no distributed locking).

func NewMemoryStore added in v0.1.2

func NewMemoryStore[C any]() *MemoryStore[C]

NewMemoryStore creates a new in-memory ContextStore.

func (*MemoryStore[C]) Delete added in v0.1.2

func (s *MemoryStore[C]) Delete(_ context.Context, key string) error

Delete removes state.

func (*MemoryStore[C]) Len added in v0.1.2

func (s *MemoryStore[C]) Len() int

Len returns the number of entries (including expired but not yet cleaned up). Useful for test assertions.

func (*MemoryStore[C]) Load added in v0.1.2

func (s *MemoryStore[C]) Load(_ context.Context, key string) (*C, error)

Load retrieves state. Returns (nil, nil) if key doesn't exist or has expired.

func (*MemoryStore[C]) Save added in v0.1.2

func (s *MemoryStore[C]) Save(_ context.Context, key string, val *C, ttl time.Duration) error

Save persists state with optional TTL. TTL of 0 means no expiration.

type Middleware added in v0.1.2

type Middleware[I, O any] func(RequestResponse[I, O]) RequestResponse[I, O]

Middleware transforms a RequestResponse provider by wrapping it. The returned provider typically delegates to the original while adding cross-cutting behavior (logging, metrics, tracing, etc.).

func Chain added in v0.1.2

func Chain[I, O any](middlewares ...Middleware[I, O]) Middleware[I, O]

Chain composes multiple middlewares into one. Middlewares are applied in order: the first middleware is outermost (executes first on the way in, last on the way out).

Chain(a, b, c)(provider) is equivalent to a(b(c(provider))).

func WithLogging added in v0.1.2

func WithLogging[I, O any](log *logger.Logger) Middleware[I, O]

WithLogging returns a Middleware that logs each Execute call. Logs: provider name, duration, and success/error status.

func WithMetrics added in v0.1.2

func WithMetrics[I, O any](metrics *observability.Metrics) Middleware[I, O]

WithMetrics returns a Middleware that records execution metrics using the gokit observability.Metrics instruments. Records: operation count, duration histogram, and errors.

func WithTracing added in v0.1.2

func WithTracing[I, O any](serviceName string) Middleware[I, O]

WithTracing returns a Middleware that creates an OpenTelemetry span around each Execute call using the gokit observability package. The span name is "{serviceName}.{providerName}".

type PrioritySelector

type PrioritySelector[T Provider] struct {
	// Priority is the ordered list of provider names to try.
	Priority []string
}

PrioritySelector tries providers in the given priority order and returns the first one that is available.

func (*PrioritySelector[T]) Select

func (s *PrioritySelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select returns the first available provider in priority order.

type Provider

type Provider interface {
	// Name returns the provider's unique name.
	Name() string
	// IsAvailable checks if the provider is ready to handle requests.
	IsAvailable(ctx context.Context) bool
}

Provider is the base interface all providers must implement.

type Registry

type Registry[T Provider] struct {
	// contains filtered or unexported fields
}

Registry manages named provider factories and cached instances.

func NewRegistry

func NewRegistry[T Provider]() *Registry[T]

NewRegistry creates a new empty Registry.

func (*Registry[T]) Create

func (r *Registry[T]) Create(name string, cfg map[string]any) (T, error)

Create instantiates a provider using the named factory and config.

func (*Registry[T]) Get

func (r *Registry[T]) Get(name string) (T, bool)

Get returns a cached provider instance by name.

func (*Registry[T]) List

func (r *Registry[T]) List() []string

List returns sorted names of all registered factories.

func (*Registry[T]) RegisterFactory

func (r *Registry[T]) RegisterFactory(name string, factory Factory[T])

RegisterFactory registers a named factory for creating providers.

func (*Registry[T]) Set

func (r *Registry[T]) Set(name string, instance T)

Set caches a provider instance by name.

type RequestResponse added in v0.1.1

type RequestResponse[I, O any] interface {
	Provider
	Execute(ctx context.Context, input I) (O, error)
}

RequestResponse represents a provider that takes one input and returns one output. This covers: HTTP calls, gRPC unary, subprocess exec, SQL queries, S3 operations.

func Adapt added in v0.1.3

func Adapt[I, O, BI, BO any](
	inner RequestResponse[BI, BO],
	name string,
	mapIn func(ctx context.Context, input I) (BI, error),
	mapOut func(output BO) (O, error),
) RequestResponse[I, O]

Adapt wraps a RequestResponse provider with input/output type transformation. This bridges a backend service with types [BI, BO] to a domain interface with types [I, O].

mapIn converts the domain input to the backend input. mapOut converts the backend output to the domain output.

Adapt composes naturally with WithResilience, Stateful, and other middleware.

func WithResilience added in v0.1.1

func WithResilience[I, O any](p RequestResponse[I, O], cfg ResilienceConfig) RequestResponse[I, O]

WithResilience wraps a RequestResponse provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Execute. Nil config fields are skipped. Empty config returns the provider unchanged.

type ResilienceConfig added in v0.1.1

type ResilienceConfig struct {
	// CircuitBreaker prevents cascading failures by stopping calls after repeated errors.
	CircuitBreaker *resilience.CircuitBreakerConfig
	// Retry automatically retries failed calls with exponential backoff.
	Retry *resilience.RetryConfig
	// RateLimiter limits the rate of calls using a token bucket algorithm.
	RateLimiter *resilience.RateLimiterConfig
	// Bulkhead limits concurrent calls to prevent resource exhaustion.
	Bulkhead *resilience.BulkheadConfig
}

ResilienceConfig bundles optional resilience policies for a provider. Nil fields are skipped — zero config means pure passthrough with no overhead.

func (ResilienceConfig) IsEmpty added in v0.1.1

func (c ResilienceConfig) IsEmpty() bool

IsEmpty returns true if no resilience policies are configured.

type ResilienceState added in v0.1.1

type ResilienceState struct {
	// contains filtered or unexported fields
}

ResilienceState holds initialized resilience primitives built from config.

func BuildResilience added in v0.1.1

func BuildResilience(cfg ResilienceConfig) *ResilienceState

BuildResilience creates initialized resilience primitives from config.

type RoundRobinSelector

type RoundRobinSelector[T Provider] struct {
	// contains filtered or unexported fields
}

RoundRobinSelector distributes requests across providers.

func (*RoundRobinSelector[T]) Select

func (s *RoundRobinSelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select picks the next provider using round-robin distribution.

type Selector

type Selector[T Provider] interface {
	Select(ctx context.Context, providers map[string]T) (T, error)
}

Selector picks a provider from the available options.

type Sink added in v0.1.1

type Sink[I any] interface {
	Provider
	Send(ctx context.Context, input I) error
}

Sink represents a provider that accepts input with no meaningful output. This covers: Kafka produce, webhook, push notification, logging.

func WithSinkResilience added in v0.1.1

func WithSinkResilience[I any](p Sink[I], cfg ResilienceConfig) Sink[I]

WithSinkResilience wraps a Sink provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Send.

type Stateful added in v0.1.2

type Stateful[I, O, C any] struct {
	// contains filtered or unexported fields
}

Stateful wraps a RequestResponse provider with automatic state load/save around each Execute call.

Before Execute: loads state from store, calls inject to enrich input. After Execute: calls extract to derive updated state from output, saves to store.

If the store returns nil (first call for a key), inject receives nil — the consumer's inject function handles initialization.

Stateful implements RequestResponse[I, O] so it composes with WithResilience and other middleware.

func NewStateful added in v0.1.2

func NewStateful[I, O, C any](cfg StatefulConfig[I, O, C]) *Stateful[I, O, C]

NewStateful creates a Stateful wrapper from configuration.

func (*Stateful[I, O, C]) Execute added in v0.1.2

func (s *Stateful[I, O, C]) Execute(ctx context.Context, input I) (O, error)

Execute loads state, injects it into input, executes inner, extracts state, and saves.

func (*Stateful[I, O, C]) IsAvailable added in v0.1.2

func (s *Stateful[I, O, C]) IsAvailable(ctx context.Context) bool

IsAvailable delegates to the inner provider.

func (*Stateful[I, O, C]) Name added in v0.1.2

func (s *Stateful[I, O, C]) Name() string

Name delegates to the inner provider.

type StatefulConfig added in v0.1.2

type StatefulConfig[I, O, C any] struct {
	// Inner is the wrapped provider.
	Inner RequestResponse[I, O]
	// Store provides state persistence.
	Store ContextStore[C]
	// KeyFunc derives the state key from input.
	KeyFunc func(I) string
	// Inject enriches input with loaded state. Receives nil state on first call.
	Inject func(I, *C) I
	// Extract derives updated state from input and output for persistence.
	Extract func(I, O) *C
	// TTL is the time-to-live for persisted state. Zero means no expiration.
	TTL time.Duration
}

StatefulConfig holds configuration for creating a Stateful wrapper.

type Stream added in v0.1.1

type Stream[I, O any] interface {
	Provider
	Execute(ctx context.Context, input I) (Iterator[O], error)
}

Stream represents a provider that takes one input and returns multiple outputs. This covers: gRPC server-stream, subprocess stdout pipe, SSE, chunked HTTP.

func WithStreamResilience added in v0.1.1

func WithStreamResilience[I, O any](p Stream[I, O], cfg ResilienceConfig) Stream[I, O]

WithStreamResilience wraps a Stream provider with resilience middleware. Resilience is applied to the Execute call that opens the stream. Individual Next() calls on the returned Iterator are NOT wrapped.
