provider

package
v0.1.1
Published: Feb 23, 2026 | License: MIT | Imports: 9 | Imported by: 0

README

provider

Generic provider registry, manager, and selection strategies for pluggable backends.

Install

go get github.com/kbukum/gokit

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/kbukum/gokit/provider"
)

// MyProvider implements provider.Provider.
type MyProvider struct{ name string }

func (p *MyProvider) Name() string                         { return p.name }
func (p *MyProvider) IsAvailable(ctx context.Context) bool { return true }

func main() {
    registry := provider.NewRegistry[*MyProvider]()
    // Priority lists the provider names to try, in order.
    selector := &provider.PrioritySelector[*MyProvider]{Priority: []string{"primary"}}
    mgr := provider.NewManager(registry, selector)

    // Register a factory, then build the provider from it.
    mgr.Register("primary", func(cfg map[string]any) (*MyProvider, error) {
        return &MyProvider{name: "primary"}, nil
    })
    if err := mgr.Initialize("primary", nil); err != nil {
        log.Fatal(err)
    }
    if err := mgr.SetDefault("primary"); err != nil {
        log.Fatal(err)
    }

    // Get the best available provider. Check err before using p.
    p, err := mgr.Get(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(p.Name()) // "primary"
}

Key Types & Functions

Name                                                          Description
Provider                                                      Interface: Name() + IsAvailable()
Registry[T]                                                   Generic provider registry with factory support
Manager[T]                                                    Manages provider lifecycle and selection
Factory[T]                                                    Factory function type for creating providers
Selector[T]                                                   Interface for provider selection strategy
PrioritySelector / RoundRobinSelector / HealthCheckSelector   Built-in selection strategies


Documentation

Overview

Package provider implements a generic provider framework using Go generics for swappable backends with runtime switching capabilities.

It provides a registry for managing multiple provider implementations with factory-based instantiation, availability checking, and runtime selection.

The package defines four interaction patterns:

  • RequestResponse[I, O]: one input → one output (HTTP, gRPC, subprocess)
  • Stream[I, O]: one input → many outputs (SSE, streaming gRPC, piped subprocess)
  • Sink[I]: one input → ack (Kafka produce, webhook, push notification)
  • Duplex[I, O]: bidirectional (WebSocket, gRPC bidi-stream)

Opt-in lifecycle:

  • Initializable: providers that need setup (dial gRPC, validate binary)
  • Closeable: providers that hold resources (connections, daemon processes)

Usage

reg := provider.NewRegistry[MyProvider]()
reg.RegisterFactory("default", myFactory)
mgr := provider.NewManager(reg, &provider.HealthCheckSelector[MyProvider]{})
if err := mgr.InitializeWithContext(ctx, "default", cfg); err != nil {
    return err
}
p, err := mgr.Get(ctx) // p is the selected provider
if err != nil {
    return err
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExecuteWithResilience added in v0.1.1

func ExecuteWithResilience[T any](ctx context.Context, s *ResilienceState, fn func() (T, error)) (T, error)

ExecuteWithResilience runs fn through the resilience chain: RateLimiter.Wait → Bulkhead → CircuitBreaker → Retry → fn. Exported so other packages (e.g., process) can reuse the chain. Resilience errors are wrapped as gokit AppError for consistency.

Types

type Closeable added in v0.1.1

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is optionally implemented by providers that hold resources requiring explicit cleanup (e.g., gRPC connection, daemon process, LDAP bind). The Manager calls Close() automatically during shutdown.

type Duplex added in v0.1.1

type Duplex[I, O any] interface {
	Provider
	Open(ctx context.Context) (DuplexStream[I, O], error)
}

Duplex represents a provider with bidirectional communication. This covers: WebSocket, gRPC bidi-stream, long-running subprocess.

func WithDuplexResilience added in v0.1.1

func WithDuplexResilience[I, O any](p Duplex[I, O], cfg ResilienceConfig) Duplex[I, O]

WithDuplexResilience wraps a Duplex provider with resilience middleware. Only CircuitBreaker and RateLimiter are applied to the Open call. Retry is not applied — persistent connections should reconnect at a higher level.

type DuplexStream added in v0.1.1

type DuplexStream[I, O any] interface {
	// Send writes a value to the remote end.
	Send(I) error
	// Recv reads a value from the remote end. Returns io.EOF when closed.
	Recv() (O, error)
	// Close terminates the stream.
	Close() error
}

DuplexStream provides bidirectional communication over a single connection.

type Factory

type Factory[T Provider] func(cfg map[string]any) (T, error)

Factory creates a provider instance from configuration.

type HealthCheckSelector

type HealthCheckSelector[T Provider] struct{}

HealthCheckSelector picks the first available provider by calling IsAvailable.

func (*HealthCheckSelector[T]) Select

func (s *HealthCheckSelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select returns the first provider that reports as available.

type Initializable added in v0.1.1

type Initializable interface {
	Init(ctx context.Context) error
}

Initializable is optionally implemented by providers that need setup before handling requests (e.g., dial gRPC, validate binary, warm cache). The Manager calls Init() automatically when initializing providers.

type Iterator added in v0.1.1

type Iterator[T any] interface {
	// Next returns the next value. Returns (zero, false, nil) when exhausted.
	Next(ctx context.Context) (T, bool, error)
	// Close releases any resources held by the iterator.
	Close() error
}

Iterator provides pull-based sequential access to a stream of values. The consumer calls Next() to retrieve values one at a time. Close must be called when done to release resources.

type Manager

type Manager[T Provider] struct {
	// contains filtered or unexported fields
}

Manager provides the main API for working with providers, combining a Registry for storage and a Selector for choosing providers.

func NewManager

func NewManager[T Provider](registry *Registry[T], selector Selector[T]) *Manager[T]

NewManager creates a Manager backed by the given registry and selector.

func (*Manager[T]) Available

func (m *Manager[T]) Available() []string

Available returns the names of all initialized providers.

func (*Manager[T]) CloseAll added in v0.1.1

func (m *Manager[T]) CloseAll(ctx context.Context) error

CloseAll calls Close() on all providers that implement Closeable.

func (*Manager[T]) Get

func (m *Manager[T]) Get(ctx context.Context) (T, error)

Get returns a provider chosen by the selector, or the default if set.

func (*Manager[T]) GetByName

func (m *Manager[T]) GetByName(name string) (T, error)

GetByName returns a specific provider by name.

func (*Manager[T]) Initialize

func (m *Manager[T]) Initialize(name string, cfg map[string]any) error

Initialize creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.

func (*Manager[T]) InitializeWithContext added in v0.1.1

func (m *Manager[T]) InitializeWithContext(ctx context.Context, name string, cfg map[string]any) error

InitializeWithContext behaves like Initialize but takes a caller-supplied context: it creates a provider from its factory, calls Init() if the provider implements Initializable, and stores it for use.

func (*Manager[T]) InitializeWithResilience added in v0.1.1

func (m *Manager[T]) InitializeWithResilience(ctx context.Context, name string, cfg map[string]any, wrap func(T) T) error

InitializeWithResilience creates a provider from its factory, wraps it with the given middleware function, calls Init(), and stores it for use. The wrap function applies resilience (or any other middleware) to the provider. Example:

mgr.InitializeWithResilience(ctx, "http", nil, func(p MyProvider) MyProvider {
    return provider.WithResilience(p, resilienceCfg).(MyProvider)
})

func (*Manager[T]) Register

func (m *Manager[T]) Register(name string, factory Factory[T])

Register adds a factory to the underlying registry.

func (*Manager[T]) SetDefault

func (m *Manager[T]) SetDefault(name string) error

SetDefault sets the default provider by name.

type PrioritySelector

type PrioritySelector[T Provider] struct {
	// Priority is the ordered list of provider names to try.
	Priority []string
}

PrioritySelector tries providers in the given priority order and returns the first one that is available.

func (*PrioritySelector[T]) Select

func (s *PrioritySelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select returns the first available provider in priority order.

type Provider

type Provider interface {
	// Name returns the provider's unique name.
	Name() string
	// IsAvailable checks if the provider is ready to handle requests.
	IsAvailable(ctx context.Context) bool
}

Provider is the base interface all providers must implement.

type Registry

type Registry[T Provider] struct {
	// contains filtered or unexported fields
}

Registry manages named provider factories and cached instances.

func NewRegistry

func NewRegistry[T Provider]() *Registry[T]

NewRegistry creates a new empty Registry.

func (*Registry[T]) Create

func (r *Registry[T]) Create(name string, cfg map[string]any) (T, error)

Create instantiates a provider using the named factory and config.

func (*Registry[T]) Get

func (r *Registry[T]) Get(name string) (T, bool)

Get returns a cached provider instance by name.

func (*Registry[T]) List

func (r *Registry[T]) List() []string

List returns sorted names of all registered factories.

func (*Registry[T]) RegisterFactory

func (r *Registry[T]) RegisterFactory(name string, factory Factory[T])

RegisterFactory registers a named factory for creating providers.

func (*Registry[T]) Set

func (r *Registry[T]) Set(name string, instance T)

Set caches a provider instance by name.
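The Registry methods above separate two maps: factories, which Create reads, and cached instances, which Set writes and Get reads. The sketch below models that split with a mutex-guarded struct. It is a simplified stand-in, not the package's source: it uses `T any` instead of `T Provider`, and it assumes Create does not cache its result, which this page does not state.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Factory mirrors the documented factory signature.
type Factory[T any] func(cfg map[string]any) (T, error)

// registry is a hypothetical, simplified stand-in for Registry[T].
type registry[T any] struct {
	mu        sync.RWMutex
	factories map[string]Factory[T]
	instances map[string]T
}

func newRegistry[T any]() *registry[T] {
	return &registry[T]{
		factories: make(map[string]Factory[T]),
		instances: make(map[string]T),
	}
}

func (r *registry[T]) RegisterFactory(name string, f Factory[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[name] = f
}

// Create builds a new instance without caching it (an assumption).
func (r *registry[T]) Create(name string, cfg map[string]any) (T, error) {
	r.mu.RLock()
	f, ok := r.factories[name]
	r.mu.RUnlock() // release before calling user code
	if !ok {
		var zero T
		return zero, fmt.Errorf("no factory registered for %q", name)
	}
	return f(cfg)
}

func (r *registry[T]) Set(name string, instance T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.instances[name] = instance
}

func (r *registry[T]) Get(name string) (T, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.instances[name]
	return v, ok
}

// List returns the sorted names of all registered factories.
func (r *registry[T]) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.factories))
	for name := range r.factories {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	reg := newRegistry[string]()
	reg.RegisterFactory("beta", func(map[string]any) (string, error) { return "beta-instance", nil })
	reg.RegisterFactory("alpha", func(map[string]any) (string, error) { return "alpha-instance", nil })

	inst, err := reg.Create("alpha", nil)
	fmt.Println(inst, err) // alpha-instance <nil>

	_, cached := reg.Get("alpha")
	fmt.Println(cached) // false

	reg.Set("alpha", inst)
	_, cached = reg.Get("alpha")
	fmt.Println(cached, reg.List()) // true [alpha beta]
}
```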

type RequestResponse added in v0.1.1

type RequestResponse[I, O any] interface {
	Provider
	Execute(ctx context.Context, input I) (O, error)
}

RequestResponse represents a provider that takes one input and returns one output. This covers: HTTP calls, gRPC unary, subprocess exec, SQL queries, S3 operations.

func WithResilience added in v0.1.1

func WithResilience[I, O any](p RequestResponse[I, O], cfg ResilienceConfig) RequestResponse[I, O]

WithResilience wraps a RequestResponse provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Execute. Nil config fields are skipped. Empty config returns the provider unchanged.

type ResilienceConfig added in v0.1.1

type ResilienceConfig struct {
	// CircuitBreaker prevents cascading failures by stopping calls after repeated errors.
	CircuitBreaker *resilience.CircuitBreakerConfig
	// Retry automatically retries failed calls with exponential backoff.
	Retry *resilience.RetryConfig
	// RateLimiter limits the rate of calls using a token bucket algorithm.
	RateLimiter *resilience.RateLimiterConfig
	// Bulkhead limits concurrent calls to prevent resource exhaustion.
	Bulkhead *resilience.BulkheadConfig
}

ResilienceConfig bundles optional resilience policies for a provider. Nil fields are skipped — zero config means pure passthrough with no overhead.

func (ResilienceConfig) IsEmpty added in v0.1.1

func (c ResilienceConfig) IsEmpty() bool

IsEmpty returns true if no resilience policies are configured.

type ResilienceState added in v0.1.1

type ResilienceState struct {
	// contains filtered or unexported fields
}

ResilienceState holds initialized resilience primitives built from config.

func BuildResilience added in v0.1.1

func BuildResilience(cfg ResilienceConfig) *ResilienceState

BuildResilience creates initialized resilience primitives from config.

type RoundRobinSelector

type RoundRobinSelector[T Provider] struct {
	// contains filtered or unexported fields
}

RoundRobinSelector distributes requests across providers.

func (*RoundRobinSelector[T]) Select

func (s *RoundRobinSelector[T]) Select(ctx context.Context, providers map[string]T) (T, error)

Select picks the next provider using round-robin distribution.

type Selector

type Selector[T Provider] interface {
	Select(ctx context.Context, providers map[string]T) (T, error)
}

Selector picks a provider from the available options.

type Sink added in v0.1.1

type Sink[I any] interface {
	Provider
	Send(ctx context.Context, input I) error
}

Sink represents a provider that accepts input with no meaningful output. This covers: Kafka produce, webhook, push notification, logging.

func WithSinkResilience added in v0.1.1

func WithSinkResilience[I any](p Sink[I], cfg ResilienceConfig) Sink[I]

WithSinkResilience wraps a Sink provider with resilience middleware. Execution chain: RateLimiter → Bulkhead → CircuitBreaker → Retry → Send.

type Stream added in v0.1.1

type Stream[I, O any] interface {
	Provider
	Execute(ctx context.Context, input I) (Iterator[O], error)
}

Stream represents a provider that takes one input and returns multiple outputs. This covers: gRPC server-stream, subprocess stdout pipe, SSE, chunked HTTP.

func WithStreamResilience added in v0.1.1

func WithStreamResilience[I, O any](p Stream[I, O], cfg ResilienceConfig) Stream[I, O]

WithStreamResilience wraps a Stream provider with resilience middleware. Resilience is applied to the Execute call that opens the stream. Individual Next() calls on the returned Iterator are NOT wrapped.
