connfx

package
v0.7.4
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2025 License: Apache-2.0 Imports: 31 Imported by: 0

README

ajan/connfx

Overview

connfx provides a unified connection management system for all external integrations in the ajan framework. It features centralized OTLP connection management that enables logfx, metricsfx, and tracesfx to share connections efficiently while maintaining separation of concerns.

Key Features
  • 🔗 Unified Connection Management - Single registry for all external connections
  • 🌐 OTLP Adapter - Centralized OpenTelemetry Protocol connections shared across observability packages
  • 🔄 Connection Pooling - Efficient resource sharing and lifecycle management
  • 🛡️ Health Monitoring - Built-in connection health checks and graceful fallbacks
  • 🎛️ Protocol Support - Support for HTTP, Redis, SQL, OTLP, and custom protocols
  • ⚙️ Configuration Management - Environment-based configuration with validation
  • 🔧 Bridge Pattern - Avoid import cycles while enabling package integration

Quick Start

Basic OTLP Connection Setup
package main

import (
    "context"

    "github.com/eser/ajan/connfx"
    "github.com/eser/ajan/logfx"
)

// main registers a single OTLP connection in a connfx registry and logs
// whether the connection reports as healthy.
func main() {
    ctx := context.Background()

    // Create connection registry
    logger := logfx.NewLogger()
    registry := connfx.NewRegistryWithDefaults(logger)

    // Register cleanup immediately so the registry is closed on every exit
    // path of main, including the panic below.
    defer registry.Close(ctx)

    // Configure OTLP connection for observability stack
    _, err := registry.AddConnection(ctx, "otel", &connfx.ConfigTarget{
        Protocol: "otlp",
        DSN:      "otel-collector:4318",
        Properties: map[string]any{
            "service_name":    "my-service",
            "service_version": "1.0.0",
            "insecure":        true,
        },
    })
    if err != nil {
        panic(err)
    }

    // Check connection health
    status := registry.HealthCheck(ctx)
    if status["otel"] == "healthy" {
        logger.Info("OTLP connection established successfully")
    }

    // Connection is now available for all observability packages
}
Complete Observability Stack Integration
package main

import (
    "context"
    "net/http"
    "time"

    "github.com/eser/ajan/connfx"
    "github.com/eser/ajan/httpfx"
    "github.com/eser/ajan/httpfx/middlewares"
    "github.com/eser/ajan/logfx"
    "github.com/eser/ajan/metricsfx"
    "github.com/eser/ajan/tracesfx"
)

// main wires OTLP, Redis and SQL connections into one registry, builds the
// logging/metrics/tracing stack on top of the shared "otel" connection, and
// serves a health endpoint that reports the status of every connection.
func main() {
    ctx := context.Background()

    // Step 1: Create connection registry with multiple connections
    logger := logfx.NewLogger()
    registry := connfx.NewRegistryWithDefaults(logger)

    // Cleanup all connections on shutdown. Registered right away so it also
    // runs when one of the setup steps below panics.
    defer registry.Close(ctx)

    // OTLP connection for observability
    _, err := registry.AddConnection(ctx, "otel", &connfx.ConfigTarget{
        Protocol: "otlp",
        DSN:      "otel-collector:4318",
        Properties: map[string]any{
            "service_name":     "my-api",
            "service_version":  "1.0.0",
            "insecure":         true,
        },
    })
    if err != nil {
        panic(err)
    }

    // Redis connection for caching
    _, err = registry.AddConnection(ctx, "cache", &connfx.ConfigTarget{
        Protocol: "redis",
        URL:      "redis://localhost:6379/0",
    })
    if err != nil {
        panic(err)
    }

    // Database connection
    _, err = registry.AddConnection(ctx, "db", &connfx.ConfigTarget{
        Protocol: "sql",
        URL:      "postgres://user:pass@localhost/mydb",
        Properties: map[string]any{
            "max_open_connections": 25,
            "max_idle_connections": 10,
        },
    })
    if err != nil {
        panic(err)
    }

    // Step 2: Create observability stack using shared OTLP connection

    // All packages reference the same "otel" connection
    logger = logfx.NewLogger(
        logfx.WithConfig(&logfx.Config{
            Level:              "INFO",
            OTLPConnectionName: "otel",
        }),
        logfx.WithRegistry(registry),
    )

    metricsProvider := metricsfx.NewMetricsProvider(&metricsfx.Config{
        ServiceName:        "my-api",
        ServiceVersion:     "1.0.0",
        OTLPConnectionName: "otel",
        ExportInterval:     15 * time.Second,
    }, registry)
    // Fail fast instead of silently running without metrics
    if err := metricsProvider.Init(); err != nil {
        panic(err)
    }

    tracesProvider := tracesfx.NewTracesProvider(&tracesfx.Config{
        ServiceName:        "my-api",
        ServiceVersion:     "1.0.0",
        OTLPConnectionName: "otel",
        SampleRatio:        1.0,
    }, registry)
    // Fail fast instead of silently running without traces
    if err := tracesProvider.Init(); err != nil {
        panic(err)
    }

    // Step 3: Use connections in application
    router := httpfx.NewRouter("/api")

    // Add observability middleware
    tracer := tracesProvider.Tracer("my-api")
    router.Use(middlewares.CorrelationIDMiddleware())
    router.Use(middlewares.TracingMiddleware(tracer))
    router.Use(middlewares.LoggingMiddleware(logger))

    router.Route("GET /health", func(ctx *httpfx.Context) httpfx.Result {
        // Check all connection health
        health := registry.HealthCheck(ctx.Request.Context())

        // Use connections in handler
        // Redis connection automatically available
        // Database connection automatically available
        // OTLP automatically exports logs, metrics, traces

        return ctx.Results.JSON(map[string]any{
            "status":      "healthy",
            "connections": health,
        })
    })

    // ListenAndServe blocks until the server stops and always returns a
    // non-nil error, so report it instead of discarding it.
    if err := http.ListenAndServe(":8080", router.GetMux()); err != nil {
        logger.Error("HTTP server stopped", "error", err)
    }
}

OTLP Adapter - Unified Observability

Why Centralized OTLP Connections?

The OTLP adapter in connfx provides a unified approach to OpenTelemetry connections:

Problem (Before):

// Each package configured separately - duplicated configuration
logger := logfx.NewLogger(logfx.WithOTLP("otel-collector:4318", true))
metrics := metricsfx.NewMetricsProvider(&metricsfx.Config{
    OTLPEndpoint: "otel-collector:4318",
    ServiceName:  "my-service",
})
traces := tracesfx.NewTracesProvider(&tracesfx.Config{
    OTLPEndpoint: "otel-collector:4318",
    ServiceName:  "my-service",
})

Solution (After):

// Single OTLP connection shared across all packages
registry.AddConnection(ctx, "otel", &connfx.ConfigTarget{
    Protocol: "otlp",
    DSN:      "otel-collector:4318",
    Properties: map[string]any{
        "service_name":    "my-service",
        "service_version": "1.0.0",
    },
})

// All packages reference the same connection
logger := logfx.NewLogger(logfx.WithOTLP("otel"), logfx.WithRegistry(registry))
metrics := metricsfx.NewMetricsProvider(&metricsfx.Config{OTLPConnectionName: "otel"}, registry)
traces := tracesfx.NewTracesProvider(&tracesfx.Config{OTLPConnectionName: "otel"}, registry)
OTLP Configuration Options
// Complete OTLP connection configuration
otlpConfig := &connfx.ConfigTarget{
    Protocol: "otlp",
    DSN:      "otel-collector:4318",

    // Connection settings
    TLS:      false,                         // TLS is disabled here; set to true to enable TLS

    Properties: map[string]any{
        // Service identification (applied to all signals)
        "service_name":     "my-service",
        "service_version":  "1.0.0",
        "environment":      "production",

        // Connection settings
        "insecure":         true,             // Use HTTP instead of HTTPS
        "timeout":          30 * time.Second, // Connection timeout

        // Export configuration
        "export_interval":  30 * time.Second, // Metrics export interval
        "batch_timeout":    5 * time.Second,  // Traces batch timeout
        "batch_size":       512,              // Traces batch size
        "sample_ratio":     1.0,              // Traces sampling ratio

        // Resource attributes (applied to all signals)
        "deployment.environment": "production",
        "service.namespace":      "ecommerce",
        "service.instance.id":    "pod-123",
    },
}

_, err := registry.AddConnection(ctx, "otel", otlpConfig)
Environment-Based OTLP Configuration
# OTLP connection configuration
CONN_TARGETS_OTEL_PROTOCOL=otlp
CONN_TARGETS_OTEL_DSN=otel-collector:4318
CONN_TARGETS_OTEL_TLS=false

# Service identification
CONN_TARGETS_OTEL_PROPERTIES_SERVICE_NAME=my-service
CONN_TARGETS_OTEL_PROPERTIES_SERVICE_VERSION=1.0.0
CONN_TARGETS_OTEL_PROPERTIES_ENVIRONMENT=production

# Connection settings
CONN_TARGETS_OTEL_PROPERTIES_INSECURE=true
CONN_TARGETS_OTEL_PROPERTIES_TIMEOUT=30s

# Export configuration
CONN_TARGETS_OTEL_PROPERTIES_EXPORT_INTERVAL=30s
CONN_TARGETS_OTEL_PROPERTIES_BATCH_TIMEOUT=5s
CONN_TARGETS_OTEL_PROPERTIES_BATCH_SIZE=512
CONN_TARGETS_OTEL_PROPERTIES_SAMPLE_RATIO=1.0

# Package configuration (references the connection)
LOG_OTLP_CONNECTION_NAME=otel
METRICS_OTLP_CONNECTION_NAME=otel
TRACES_OTLP_CONNECTION_NAME=otel
Multiple OTLP Environments
// Different OTLP endpoints for different purposes
connections := map[string]*connfx.ConfigTarget{
    // Production telemetry
    "otel-prod": {
        Protocol: "otlp",
        URL:      "https://prod-collector:4317",
        TLS:      true,
        Properties: map[string]any{
            "service_name":    "my-service",
            "environment":     "production",
            "sample_ratio":    0.1,  // 10% sampling for production
        },
    },

    // Development telemetry
    "otel-dev": {
        Protocol: "otlp",
        URL:      "http://dev-collector:4318",
        Properties: map[string]any{
            "service_name":    "my-service-dev",
            "environment":     "development",
            "sample_ratio":    1.0,  // 100% sampling for development
        },
    },

    // Business metrics (separate endpoint)
    "otel-business": {
        Protocol: "otlp",
        URL:      "http://business-metrics:4318",
        Properties: map[string]any{
            "service_name":     "my-service",
            "export_interval":  60 * time.Second,  // Less frequent for business metrics
        },
    },
}

// Add all connections
for name, config := range connections {
    _, err := registry.AddConnection(ctx, name, config)
    if err != nil {
        logger.Error("Failed to add connection", "name", name, "error", err)
    }
}

// Use different connections for different purposes
prodLogger := logfx.NewLogger(logfx.WithOTLP("otel-prod"), logfx.WithRegistry(registry))
devMetrics := metricsfx.NewMetricsProvider(&metricsfx.Config{OTLPConnectionName: "otel-dev"}, registry)
businessMetrics := metricsfx.NewMetricsProvider(&metricsfx.Config{OTLPConnectionName: "otel-business"}, registry)

Protocol Support

HTTP Connections
_, err := registry.AddConnection(ctx, "api", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.external.com",
    Properties: map[string]any{
        "timeout":         30 * time.Second,
        "max_connections": 100,
        "headers": map[string]string{
            "Authorization": "Bearer token",
            "User-Agent":    "my-service/1.0.0",
        },
    },
})
Redis Connections
_, err := registry.AddConnection(ctx, "cache", &connfx.ConfigTarget{
    Protocol: "redis",
    URL:      "redis://localhost:6379/0",
    Properties: map[string]any{
        "max_idle":        10,
        "max_active":      100,
        "idle_timeout":    240 * time.Second,
        "password":        "secret",
    },
})
SQL Database Connections
_, err := registry.AddConnection(ctx, "db", &connfx.ConfigTarget{
    Protocol: "sql",
    URL:      "postgres://user:pass@localhost/mydb?sslmode=disable",
    Properties: map[string]any{
        "max_open_connections": 25,
        "max_idle_connections": 10,
        "connection_max_lifetime": 5 * time.Minute,
    },
})

Connection Management

Health Monitoring
// Check individual connection
client, err := registry.GetConnection(ctx, "otel")
if err != nil {
    logger.Error("OTLP connection unavailable", "error", err)
}

// Check all connections
healthStatus := registry.HealthCheck(ctx)
for name, status := range healthStatus {
    logger.Info("Connection status", "name", name, "status", status)
}

// Health check returns:
// - "healthy": Connection is working properly
// - "unhealthy": Connection has issues
// - "unknown": Connection status cannot be determined
Connection Lifecycle
// Get connection for use
conn, err := registry.GetConnection(ctx, "cache")
if err != nil {
    return fmt.Errorf("cache unavailable: %w", err)
}

// Use type assertion to get specific client
if redisConn, ok := conn.(*redis.Client); ok {
    return redisConn.Set(ctx, key, value, 0).Err()
}

// Connections are released by an explicit Close call during application shutdown.
// NOTE(review): this defer presumably belongs next to the registry's creation
// (e.g. in main), not after the returns above — confirm placement.
defer registry.Close(ctx)  // Closes all connections gracefully
Registry Configuration
// Create registry with custom configuration
config := &connfx.RegistryConfig{
    HealthCheckInterval: 30 * time.Second,
    HealthCheckTimeout:  5 * time.Second,
    MaxConnections:      100,
    EnableLogging:       true,
}

registry := connfx.NewRegistry(config, logger)

// Add connections with validation
_, err := registry.AddConnection(ctx, "otel", otlpConfig)
if err != nil {
    logger.Error("Failed to configure OTLP", "error", err)
    // Registry handles failures gracefully
}

Bridge Pattern Implementation

How Packages Access Connections

The bridge pattern prevents import cycles while enabling packages to access connections:

// In metricsfx/bridge.go
type ConnectionRegistry interface {
    GetConnection(ctx context.Context, name string) (any, error)
}

func getOTLPExporter(registry ConnectionRegistry, connectionName string) (any, error) {
    if registry == nil {
        return nil, ErrNoRegistryProvided
    }

    // Use reflection to avoid direct dependency on connfx
    return getConnectionViaReflection(registry, connectionName, "otlp")
}

// In metricsfx/provider.go
func NewMetricsProvider(config *Config, registry ConnectionRegistry) *MetricsProvider {
    return &MetricsProvider{
        config:   config,
        registry: registry,  // Store registry interface
    }
}

func (p *MetricsProvider) Init() error {
    if p.config.OTLPConnectionName != "" {
        exporter, err := getOTLPExporter(p.registry, p.config.OTLPConnectionName)
        if err != nil {
            return fmt.Errorf("%w (name=%q): %w", ErrFailedToGetOTLPExporter, p.config.OTLPConnectionName, err)
        }
        // Use exporter...
    }
    return nil
}
Benefits of Bridge Pattern
  1. No Import Cycles - Observability packages don't directly import connfx
  2. Interface Segregation - Each package only sees the connection methods it needs
  3. Loose Coupling - Packages work with or without connection registry
  4. Testability - Easy to mock connection registry for tests
  5. Graceful Degradation - Packages continue working when connections are unavailable

Advanced Usage

Custom Connection Adapters
// Implement custom adapter for new protocols
type MyProtocolAdapter struct {
    config *connfx.ConfigTarget
    client *MyProtocolClient
}

func (a *MyProtocolAdapter) Connect(ctx context.Context) error {
    client, err := NewMyProtocolClient(a.config.URL)
    if err != nil {
        return err
    }
    a.client = client
    return nil
}

// Close releases the underlying protocol client.
// It is a no-op when Connect has not succeeded, because a.client is still nil
// in that case and calling Close on it would dereference a nil pointer.
func (a *MyProtocolAdapter) Close(ctx context.Context) error {
    if a.client == nil {
        return nil
    }

    return a.client.Close()
}

func (a *MyProtocolAdapter) HealthCheck(ctx context.Context) (string, error) {
    if err := a.client.Ping(ctx); err != nil {
        return "unhealthy", err
    }
    return "healthy", nil
}

func (a *MyProtocolAdapter) GetClient() any {
    return a.client
}

// Register custom adapter
registry.RegisterAdapter("myprotocol", func(config *connfx.ConfigTarget) connfx.Adapter {
    return &MyProtocolAdapter{config: config}
})
Configuration Validation
// Add connection with validation
err := registry.AddConnection(ctx, "otel", &connfx.ConfigTarget{
    Protocol: "otlp",
    URL:      "invalid-url",  // This will be validated
})

if err != nil {
    // Handle configuration errors
    logger.Error("Invalid OTLP configuration", "error", err)
}
Observability Integration Benefits

The centralized OTLP connection approach provides significant benefits for observability:

  1. Unified Configuration - Configure service name, version, and endpoints once
  2. Consistent Attribution - All telemetry signals have consistent resource attributes
  3. Shared Connection Pooling - Efficient resource usage across all packages
  4. Centralized Health Monitoring - Monitor OTLP connection health from one place
  5. Environment Flexibility - Easy switching between dev/staging/prod collectors
  6. Cost Optimization - Single connection reduces overhead and resource usage
  7. Graceful Degradation - Applications continue working when OTLP is unavailable
  8. Configuration Validation - Catch configuration errors early
  9. Lifecycle Management - Proper connection setup and cleanup
  10. Bridge Pattern Benefits - Clean package separation without import cycles

Best Practices

  1. Centralize OTLP Configuration - Use a single OTLP connection for all observability signals
  2. Environment-Based Config - Use environment variables for different deployment environments
  3. Health Monitoring - Regularly check connection health and handle failures gracefully
  4. Connection Lifecycle - Always call registry.Close(ctx) during application shutdown
  5. Error Handling - Handle connection failures gracefully with fallback behavior
  6. Resource Management - Configure appropriate connection pools and timeouts
  7. Security - Use TLS for production OTLP connections
  8. Monitoring - Monitor connection health and performance metrics
  9. Testing - Mock connection registry for unit tests
  10. Documentation - Document connection dependencies for each service

Resilient HTTP Connections

Overview

The HTTP adapter in connfx now integrates with the httpclient package to provide robust, production-ready HTTP connections with built-in resilience features:

  • Circuit Breaker - Prevents cascading failures by opening the circuit when errors exceed threshold
  • Retry Strategy - Automatically retries failed requests with exponential backoff and jitter
  • Connection Pooling - Efficient HTTP connection management and reuse
  • Health Monitoring - Continuous health checks with detailed status reporting
  • TLS Support - Secure connections with certificate management
Basic HTTP Configuration
// Basic HTTP connection with default resilience settings
_, err := registry.AddConnection(ctx, "api", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.example.com",
    Properties: map[string]any{
        "headers": map[string]string{
            "User-Agent":    "my-service/1.0",
            "Authorization": "Bearer token",
        },
    },
})

// Use the connection
conn, err := registry.GetConnection(ctx, "api")
httpConn := conn.(*connfx.HTTPConnection)

// Get the resilient client
client := httpConn.GetClient() // Returns *httpclient.Client
// Or get standard client
stdClient := httpConn.GetStandardClient() // Returns *http.Client
Circuit Breaker Configuration

Control when connections are temporarily disabled to prevent cascading failures:

_, err := registry.AddConnection(ctx, "api-resilient", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.example.com",
    Properties: map[string]any{
        "circuit_breaker": map[string]any{
            "enabled":                   true,  // Enable circuit breaker
            "failure_threshold":         5,     // Trip after 5 consecutive failures
            "reset_timeout":             30 * time.Second, // Try to reset after 30s
            "half_open_success_needed":  2,     // Need 2 successes to fully close
        },
        "server_error_threshold": 500, // HTTP status codes >= 500 count as failures
    },
})
if err != nil {
    return err
}

// Monitor circuit breaker state. The connection has to be fetched from the
// registry first, since the AddConnection result was discarded above.
conn, err := registry.GetConnection(ctx, "api-resilient")
if err != nil {
    return err
}

httpConn := conn.(*connfx.HTTPConnection)
state := httpConn.GetCircuitBreakerState() // "Closed", "Open", or "HalfOpen"

Circuit Breaker States:

  • Closed - Normal operation, requests pass through
  • Open - Circuit is tripped, requests fail fast without hitting the server
  • HalfOpen - Testing if service has recovered, limited requests allowed
Retry Strategy Configuration

Automatically retry failed requests with intelligent backoff:

_, err := registry.AddConnection(ctx, "api-retry", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.example.com",
    Properties: map[string]any{
        "retry_strategy": map[string]any{
            "enabled":          true,   // Enable automatic retries
            "max_attempts":     3,      // Maximum total attempts (original + retries)
            "initial_interval": 100 * time.Millisecond, // Start with 100ms delay
            "max_interval":     10 * time.Second,       // Cap at 10s delay
            "multiplier":       2.0,    // Double the delay each retry
            "random_factor":    0.1,    // Add 10% jitter to prevent thundering herd
        },
    },
})

Exponential Backoff Example:

  • Attempt 1: No delay
  • Attempt 2: ~100ms delay (100ms + jitter)
  • Attempt 3: ~200ms delay (200ms + jitter)
  • etc.
Advanced HTTP Configuration

Complete configuration with all resilience features:

_, err := registry.AddConnection(ctx, "api-advanced", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.example.com",
    Timeout:  30 * time.Second,
    TLS:      true,

    // TLS certificate authentication (optional)
    CertFile: "/path/to/client.crt",
    KeyFile:  "/path/to/client.key",

    Properties: map[string]any{
        // Default headers for all requests
        "headers": map[string]string{
            "User-Agent":    "my-service/1.0",
            "Accept":        "application/json",
            "Content-Type":  "application/json",
            "Authorization": "Bearer token",
        },

        // Circuit breaker configuration
        "circuit_breaker": map[string]any{
            "enabled":                   true,
            "failure_threshold":         5,     // Trip after 5 failures
            "reset_timeout":             60 * time.Second, // Reset attempt after 1 minute
            "half_open_success_needed":  3,     // Need 3 successes to close circuit
        },

        // Retry strategy configuration
        "retry_strategy": map[string]any{
            "enabled":          true,
            "max_attempts":     4,      // Original + 3 retries
            "initial_interval": 250 * time.Millisecond,
            "max_interval":     30 * time.Second,
            "multiplier":       2.5,    // Aggressive backoff
            "random_factor":    0.2,    // 20% jitter
        },

        // HTTP error classification
        "server_error_threshold": 500, // Status >= 500 triggers circuit breaker
    },
})
Making HTTP Requests

Use the resilient HTTP client for all requests:

conn, err := registry.GetConnection(ctx, "api")
httpConn := conn.(*connfx.HTTPConnection)

// Method 1: Use the convenience method
req, err := httpConn.NewRequest(ctx, "GET", "/users/123", nil)
if err != nil {
    return err
}

client := httpConn.GetClient()
resp, err := client.Do(req)
if err != nil {
    // Error already went through circuit breaker and retry logic
    return fmt.Errorf("request failed after retries: %w", err)
}
defer resp.Body.Close()

// Method 2: Use standard HTTP client interface
stdClient := httpConn.GetStandardClient()
resp, err = stdClient.Get(httpConn.GetBaseURL() + "/health")

Request Body Types:

// String body
req, err := httpConn.NewRequest(ctx, "POST", "/api/data", `{"key": "value"}`)

// Byte slice body
data := []byte(`{"key": "value"}`)
req, err := httpConn.NewRequest(ctx, "POST", "/api/data", data)

// Reader body
body := strings.NewReader(`{"key": "value"}`)
req, err := httpConn.NewRequest(ctx, "POST", "/api/data", body)

// No body
req, err := httpConn.NewRequest(ctx, "GET", "/api/data", nil)
Health Monitoring

Monitor HTTP connection health with detailed status:

// Check individual connection health
status := httpConn.HealthCheck(ctx)
fmt.Printf("State: %s\n", status.State)        // Ready, Live, Connected, Error
fmt.Printf("Latency: %v\n", status.Latency)    // Response time
fmt.Printf("Message: %s\n", status.Message)    // Detailed status message

// Monitor circuit breaker state
cbState := httpConn.GetCircuitBreakerState()   // Closed, Open, HalfOpen
fmt.Printf("Circuit Breaker: %s\n", cbState)

// Check all connections
healthStatus := registry.HealthCheck(ctx)
for name, status := range healthStatus {
    fmt.Printf("Connection '%s': %s\n", name, status)
}

HTTP Health Check Logic:

  • Uses HEAD request to base URL (fast, minimal data transfer)
  • Falls back to GET request if HEAD returns 405 (Method Not Allowed)
  • Maps HTTP status codes to connection states:
    • 2xx: Ready (healthy and ready to serve)
    • 429: Live (healthy but rate-limited)
    • 503: Connected (reachable but temporarily unavailable)
    • Other 4xx: Connected (reachable but configuration issues)
    • Other 5xx: Error (server errors)
Environment Configuration

Configure HTTP connections via environment variables:

# Connection configuration
CONN_TARGETS_API_PROTOCOL=http
CONN_TARGETS_API_URL=https://api.example.com
CONN_TARGETS_API_TIMEOUT=30s
CONN_TARGETS_API_TLS=true

# Circuit breaker configuration
CONN_TARGETS_API_PROPERTIES_CIRCUIT_BREAKER_ENABLED=true
CONN_TARGETS_API_PROPERTIES_CIRCUIT_BREAKER_FAILURE_THRESHOLD=5
CONN_TARGETS_API_PROPERTIES_CIRCUIT_BREAKER_RESET_TIMEOUT=60s
CONN_TARGETS_API_PROPERTIES_CIRCUIT_BREAKER_HALF_OPEN_SUCCESS_NEEDED=2

# Retry strategy configuration
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_ENABLED=true
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_MAX_ATTEMPTS=3
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_INITIAL_INTERVAL=100ms
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_MAX_INTERVAL=10s
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_MULTIPLIER=2.0
CONN_TARGETS_API_PROPERTIES_RETRY_STRATEGY_RANDOM_FACTOR=0.1

# Error threshold
CONN_TARGETS_API_PROPERTIES_SERVER_ERROR_THRESHOLD=500
Error Handling

The resilient HTTP client provides comprehensive error handling:

client := httpConn.GetClient()
resp, err := client.Do(req)

if err != nil {
    // Check for specific error types
    switch {
    case errors.Is(err, httpclient.ErrCircuitOpen):
        // Circuit breaker is open - service is down
        log.Warn("Service temporarily unavailable due to circuit breaker")

    case errors.Is(err, httpclient.ErrMaxRetries):
        // All retry attempts exhausted
        log.Error("Request failed after all retry attempts")

    case errors.Is(err, httpclient.ErrAllRetryAttemptsFailed):
        // All retries failed with transport errors
        log.Error("All retry attempts failed due to transport errors")

    default:
        // Other error (network, timeout, etc.)
        log.Error("Request failed", "error", err)
    }
    return err
}

// Check response status
if resp.StatusCode >= 400 {
    // Handle HTTP error responses
    log.Warn("HTTP error response", "status", resp.StatusCode)
}
Performance Optimization

Optimize HTTP connections for your use case:

// High-throughput configuration
_, err := registry.AddConnection(ctx, "api-high-throughput", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://api.example.com",
    Properties: map[string]any{
        // Disable circuit breaker for high-volume, predictable traffic
        "circuit_breaker": map[string]any{
            "enabled": false,
        },

        // Minimal retries for speed
        "retry_strategy": map[string]any{
            "enabled":          true,
            "max_attempts":     2,  // Only 1 retry
            "initial_interval": 50 * time.Millisecond,
            "multiplier":       1.5,
        },

        // More tolerant error threshold
        "server_error_threshold": 503, // Only status codes >= 503 count as failures (500-502 are tolerated)
    },
})

// Fault-tolerant configuration
_, err = registry.AddConnection(ctx, "api-fault-tolerant", &connfx.ConfigTarget{
    Protocol: "http",
    URL:      "https://unreliable-api.example.com",
    Properties: map[string]any{
        // Aggressive circuit breaker
        "circuit_breaker": map[string]any{
            "enabled":                   true,
            "failure_threshold":         3,  // Trip quickly
            "reset_timeout":             10 * time.Second, // Reset quickly
            "half_open_success_needed":  1,  // Only need 1 success
        },

        // Aggressive retries
        "retry_strategy": map[string]any{
            "enabled":          true,
            "max_attempts":     5,  // Many retries
            "initial_interval": 500 * time.Millisecond,
            "max_interval":     60 * time.Second,
            "multiplier":       3.0, // Aggressive backoff
            "random_factor":    0.3, // High jitter
        },

        // Strict error threshold
        "server_error_threshold": 400, // Even 4xx errors trigger circuit breaker
    },
})
Integration with Other Packages

Use resilient HTTP connections with other ajan packages:

// With httpfx for web servers
func MyHandler(ctx *httpfx.Context) httpfx.Result {
    // Get HTTP connection from registry
    conn, err := registry.GetConnection(ctx.Request.Context(), "external-api")
    if err != nil {
        return ctx.Results.InternalServerError("API unavailable")
    }

    httpConn := conn.(*connfx.HTTPConnection)
    req, err := httpConn.NewRequest(ctx.Request.Context(), "GET", "/data", nil)
    if err != nil {
        return ctx.Results.InternalServerError("Failed to create request")
    }

    client := httpConn.GetClient()
    resp, err := client.Do(req)
    if err != nil {
        if errors.Is(err, httpclient.ErrCircuitOpen) {
            return ctx.Results.ServiceUnavailable("External service temporarily unavailable")
        }
        return ctx.Results.InternalServerError("External API request failed")
    }
    defer resp.Body.Close()

    // Process response...
    return ctx.Results.JSON(data)
}

// With metrics for monitoring
metricsBuilder := metricsProvider.NewBuilder()
httpRequests, _ := metricsBuilder.Counter(
    "http_client_requests_total",
    "Total HTTP client requests",
).Build()

httpLatency, _ := metricsBuilder.Histogram(
    "http_client_duration_seconds",
    "HTTP client request duration",
).WithDurationBuckets().Build()

// Monitor HTTP requests
start := time.Now()
resp, err := client.Do(req)
duration := time.Since(start)

httpRequests.Inc(ctx,
    metricsfx.StringAttr("endpoint", req.URL.Path),
    metricsfx.StringAttr("method", req.Method),
    metricsfx.BoolAttr("success", err == nil),
)

httpLatency.RecordDuration(ctx, duration,
    metricsfx.StringAttr("endpoint", req.URL.Path),
)
Best Practices
  1. Circuit Breaker Tuning:

    • Set failure_threshold based on your service's failure tolerance
    • Use longer reset_timeout for external services, shorter for internal services
    • Require multiple successes (half_open_success_needed) before fully trusting recovery
  2. Retry Strategy:

    • Use fewer retries (max_attempts) for user-facing requests to avoid latency
    • Use more retries for background/batch operations
    • Add jitter (random_factor) to prevent thundering herd effects
  3. Error Classification:

    • Set appropriate server_error_threshold (typically 500)
    • Consider 4xx errors as non-retriable for most cases
    • Monitor circuit breaker states and adjust thresholds based on real traffic
  4. Connection Management:

    • Reuse connections through the registry
    • Use different connection names for different service endpoints
    • Monitor connection health regularly
  5. Testing:

    • Test circuit breaker behavior under load
    • Verify retry logic with network failures
    • Monitor latency impact of resilience features

Documentation

Index

Constants

View Source
const (
	DefaultHTTPTimeout = 30 * time.Second
	HealthCheckTimeout = 2 * time.Second

	// Circuit breaker defaults.
	DefaultCircuitBreakerFailureThreshold     = 5
	DefaultCircuitBreakerResetTimeout         = 10 * time.Second
	DefaultCircuitBreakerHalfOpenSuccessCount = 2

	// Retry strategy defaults.
	DefaultRetryMaxAttempts     = 3
	DefaultRetryInitialInterval = 100 * time.Millisecond
	DefaultRetryMaxInterval     = 10 * time.Second
	DefaultRetryMultiplier      = 2.0
	DefaultRetryRandomFactor    = 0.1

	// HTTP error threshold.
	DefaultServerErrorThreshold = 500
)
View Source
const (
	DefaultOTLPTimeout     = 30 * time.Second
	DefaultBatchTimeout    = 5 * time.Second
	DefaultExportInterval  = 30 * time.Second
	DefaultBatchSize       = 512
	DefaultSampleRatio     = 1.0
	HealthCheckRequestPath = "/v1/traces" // Standard OTLP path for health check
	MinimumReadMemInterval = 15 * time.Second
)
View Source
const (
	DefaultPrefetchCount = 10
	DefaultMaxRetries    = 3
	DefaultBlockTimeout  = 5 * time.Second
)

Default values for consumer configuration.

View Source
const DefaultConnection = "default"

Variables

View Source
var (
	ErrAMQPClientNotInitialized = errors.New("AMQP client not initialized")
	ErrFailedToOpenConnection   = errors.New("failed to open AMQP connection")
	ErrFailedToOpenChannel      = errors.New("failed to open AMQP channel")
	ErrFailedToCloseConnection  = errors.New("failed to close AMQP connection")
	ErrFailedToCloseChannel     = errors.New("failed to close AMQP channel")
	ErrFailedToDeclareQueue     = errors.New("failed to declare queue")
	ErrFailedToPublishMessage   = errors.New("failed to publish message")
	ErrFailedToStartConsuming   = errors.New("failed to start consuming")
	ErrChannelClosed            = errors.New("channel closed")
	ErrFailedToReconnect        = errors.New("failed to reconnect")
	ErrDeliveryChannelClosed    = errors.New("delivery channel closed")
	ErrNoChannelAvailable       = errors.New("no channel available")
	ErrFailedToCloseAMQPClient  = errors.New("failed to close AMQP client")
	ErrAMQPOperation            = errors.New("AMQP operation failed")
	ErrAMQPConnectionFailed     = errors.New("failed to connect to AMQP")
	ErrFailedToCreateAMQPClient = errors.New("failed to create AMQP client")
	ErrAMQPUnsupportedOperation = errors.New("operation not supported by AMQP")
	ErrIntegerOverflow          = errors.New("integer overflow in conversion")
)
View Source
var (
	ErrFailedToCreateHTTPClient      = errors.New("failed to create HTTP client")
	ErrFailedToHealthCheckHTTP       = errors.New("failed to health check HTTP endpoint")
	ErrInvalidConfigTypeHTTP         = errors.New("invalid config type for HTTP connection")
	ErrUnsupportedBodyType           = errors.New("unsupported body type")
	ErrFailedToCreateRequest         = errors.New("failed to create HTTP request")
	ErrFailedToLoadCertificate       = errors.New("failed to load client certificate")
	ErrFailedToCreateHealthCheckReq  = errors.New("failed to create health check request")
	ErrFailedToPerformHealthCheckReq = errors.New("failed to perform health check request")
	ErrFailedToCreateGetRequest      = errors.New("failed to create GET request")
	ErrFailedToPerformGetRequest     = errors.New("failed to perform GET request")
	ErrFailedToCreateResilientClient = errors.New("failed to create resilient HTTP client")
)
View Source
var (
	ErrFailedToCreateOTLPLogExporter    = errors.New("failed to create OTLP log exporter")
	ErrFailedToCreateOTLPMetricExporter = errors.New("failed to create OTLP metric exporter")
	ErrFailedToCreateOTLPTraceExporter  = errors.New("failed to create OTLP trace exporter")
	ErrFailedToCreateResource           = errors.New("failed to create resource")
	ErrFailedToShutdownOTLPClient       = errors.New("failed to shutdown OTLP client")
	ErrInvalidConfigTypeOTLP            = errors.New("invalid config type for OTLP connection")
	ErrOTLPEndpointRequired             = errors.New("OTLP endpoint is required")
	ErrOTLPHealthCheckFailed            = errors.New("OTLP health check failed")
	ErrFailedToShutdownLogProvider      = errors.New("failed to shutdown log provider")
	ErrFailedToShutdownMeterProvider    = errors.New("failed to shutdown meter provider")
	ErrFailedToShutdownTracerProvider   = errors.New("failed to shutdown tracer provider")
	ErrFailedToShutdownLogExporter      = errors.New("failed to shutdown log exporter")
	ErrFailedToShutdownMetricExporter   = errors.New("failed to shutdown metric exporter")
	ErrFailedToShutdownTraceExporter    = errors.New("failed to shutdown trace exporter")
	ErrFailedToCreateTestExporter       = errors.New("failed to create test exporter")
	ErrFailedToMergeResources           = errors.New("failed to merge resources")
)
View Source
var (
	ErrRedisClientNotInitialized   = errors.New("redis client not initialized")
	ErrFailedToCloseRedisClient    = errors.New("failed to close Redis client")
	ErrRedisOperation              = errors.New("redis operation failed")
	ErrRedisConnectionFailed       = errors.New("failed to connect to Redis")
	ErrRedisUnexpectedPingResponse = errors.New("unexpected ping response")
	ErrRedisPoolTimeouts           = errors.New("redis connection pool has timeouts")
	ErrFailedToCreateRedisClient   = errors.New("failed to create Redis client")
)
View Source
var (
	ErrFailedToOpenSQLConnection = errors.New("failed to open SQL connection")
	ErrFailedToPingSQL           = errors.New("failed to ping SQL database")
	ErrInvalidConfigTypeSQL      = errors.New("invalid config type for SQL connection")
	ErrUnsupportedSQLProtocol    = errors.New("unsupported SQL protocol")
	ErrFailedToCloseSQLDB        = errors.New("failed to close SQL database")
	ErrSQLConnectionNil          = errors.New("SQL connection is nil")
)
View Source
var (
	ErrInvalidConnectionBehavior = errors.New("invalid connection behavior")
	ErrInvalidConnectionProtocol = errors.New("invalid connection protocol")
	ErrInvalidDSN                = errors.New("invalid DSN")
	ErrInvalidURL                = errors.New("invalid URL")
	ErrInvalidConfigType         = errors.New("invalid config type")
)
View Source
var (
	ErrConnectionIsNil    = errors.New("connection is nil")
	ErrRawConnectionIsNil = errors.New("raw connection is nil")
	ErrInvalidType        = errors.New("invalid type")
)

Sentinel errors for the GetTypedConnection function.

View Source
var (
	ErrConnectionNotFound       = errors.New("connection not found")
	ErrConnectionAlreadyExists  = errors.New("connection already exists")
	ErrFailedToCreateConnection = errors.New("failed to create connection")
	ErrUnsupportedProtocol      = errors.New("unsupported protocol")
	ErrFailedToCloseConnections = errors.New("failed to close connections")
	ErrFailedToAddConnection    = errors.New("failed to add connection")
	ErrConnectionNotSupported   = errors.New("connection does not support required operations")
	ErrInterfaceNotImplemented  = errors.New("connection does not implement required interface")
)

Functions

func GetTypedConnection

func GetTypedConnection[T any](conn Connection) (T, error)

GetTypedConnection extracts a typed connection from a Connection interface. This provides type-safe access to the underlying connection without manual type assertions.

Example usage:

conn, err := connfx.GetConnection("database")
if err != nil { return err }

db, err := connfx.GetTypedConnection[*sql.DB](conn)
if err != nil { return err }

// Now db is *sql.DB and can be used safely
rows, err := db.Query("SELECT * FROM users")

Types

type AMQPAdapter

type AMQPAdapter struct {
	// contains filtered or unexported fields
}

AMQPAdapter implements the QueueRepository interface for AMQP-based message queues.

func (*AMQPAdapter) AckMessage added in v0.7.1

func (aa *AMQPAdapter) AckMessage(
	ctx context.Context,
	queueName, consumerGroup, receiptHandle string,
) error

func (*AMQPAdapter) ClaimPendingMessages added in v0.7.1

func (aa *AMQPAdapter) ClaimPendingMessages(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	minIdleTime time.Duration,
	count int,
) ([]Message, error)

func (*AMQPAdapter) Consume

func (aa *AMQPAdapter) Consume(
	ctx context.Context,
	queueName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*AMQPAdapter) ConsumeWithGroup added in v0.7.1

func (aa *AMQPAdapter) ConsumeWithGroup(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*AMQPAdapter) DeleteMessage added in v0.7.1

func (aa *AMQPAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error

func (*AMQPAdapter) Publish

func (aa *AMQPAdapter) Publish(ctx context.Context, queueName string, body []byte) error

func (*AMQPAdapter) PublishWithHeaders added in v0.7.1

func (aa *AMQPAdapter) PublishWithHeaders(
	ctx context.Context,
	queueName string,
	body []byte,
	headers map[string]any,
) error

func (*AMQPAdapter) QueueDeclare

func (aa *AMQPAdapter) QueueDeclare(ctx context.Context, name string) (string, error)

QueueRepository interface implementation.

func (*AMQPAdapter) QueueDeclareWithConfig added in v0.7.1

func (aa *AMQPAdapter) QueueDeclareWithConfig(
	ctx context.Context,
	name string,
	config QueueConfig,
) (string, error)

type AMQPConfig added in v0.7.1

type AMQPConfig struct {
	URL string
}

AMQPConfig holds AMQP-specific configuration options.

func NewDefaultAMQPConfig added in v0.7.1

func NewDefaultAMQPConfig() *AMQPConfig

NewDefaultAMQPConfig creates an AMQP configuration with sensible defaults.

type AMQPConnection

type AMQPConnection struct {
	// contains filtered or unexported fields
}

AMQPConnection implements the connfx.Connection interface for AMQP connections.

func NewAMQPConnection

func NewAMQPConnection(protocol string, config *AMQPConfig) *AMQPConnection

NewAMQPConnection creates a new AMQP connection.

func (*AMQPConnection) Close

func (ac *AMQPConnection) Close(ctx context.Context) error

func (*AMQPConnection) GetBehaviors

func (ac *AMQPConnection) GetBehaviors() []ConnectionBehavior

Connection interface implementation.

func (*AMQPConnection) GetCapabilities

func (ac *AMQPConnection) GetCapabilities() []ConnectionCapability

func (*AMQPConnection) GetProtocol

func (ac *AMQPConnection) GetProtocol() string

func (*AMQPConnection) GetRawConnection

func (ac *AMQPConnection) GetRawConnection() any

func (*AMQPConnection) GetState

func (ac *AMQPConnection) GetState() ConnectionState

func (*AMQPConnection) HealthCheck

func (ac *AMQPConnection) HealthCheck(ctx context.Context) *HealthStatus

type AMQPConnectionFactory

type AMQPConnectionFactory struct {
	// contains filtered or unexported fields
}

AMQPConnectionFactory creates AMQP connections.

func NewAMQPConnectionFactory

func NewAMQPConnectionFactory(protocol string) *AMQPConnectionFactory

NewAMQPConnectionFactory creates a new AMQP connection factory for a specific protocol.

func (*AMQPConnectionFactory) CreateConnection

func (f *AMQPConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*AMQPConnectionFactory) GetProtocol

func (f *AMQPConnectionFactory) GetProtocol() string

type CacheRepository

type CacheRepository interface {
	Repository

	// SetWithExpiration stores a value with the given key and expiration time
	SetWithExpiration(ctx context.Context, key string, value []byte, expiration time.Duration) error

	// GetTTL returns the time-to-live for a key
	GetTTL(ctx context.Context, key string) (time.Duration, error)

	// Expire sets an expiration time for an existing key
	Expire(ctx context.Context, key string, expiration time.Duration) error
}

CacheRepository extends Repository with cache-specific operations.

type Config

type Config struct {
	Targets map[string]ConfigTarget `conf:"targets"`
}

Config represents the main configuration for connfx.

type ConfigTarget

type ConfigTarget struct {
	Properties map[string]any `conf:"properties"`

	Protocol string `conf:"protocol"` // e.g., "postgres", "redis", "http"
	DSN      string `conf:"dsn"`
	URL      string `conf:"url"`
	Host     string `conf:"host"`
	CertFile string `conf:"cert_file"`
	KeyFile  string `conf:"key_file"`
	CAFile   string `conf:"ca_file"`

	// Network port and connection timeout
	Port    int           `conf:"port"`
	Timeout time.Duration `conf:"timeout"`

	// Transport security (TLS)
	TLS           bool `conf:"tls"`
	TLSSkipVerify bool `conf:"tls_skip_verify"`
}

ConfigTarget represents the configuration data for a connection.

type Connection

type Connection interface {
	// GetBehaviors returns the connection behaviors this connection supports
	GetBehaviors() []ConnectionBehavior

	// GetCapabilities returns the connection capabilities this connection supports
	GetCapabilities() []ConnectionCapability

	// GetProtocol returns the protocol/technology used (e.g., "postgres", "redis", "http")
	GetProtocol() string

	// GetState returns the current connection state
	GetState() ConnectionState

	// HealthCheck performs a health check and returns the status
	HealthCheck(ctx context.Context) *HealthStatus

	// Close closes the connection
	Close(ctx context.Context) error

	// GetRawConnection returns the underlying connection object
	GetRawConnection() any
}

Connection represents a generic connection interface.

type ConnectionBehavior

type ConnectionBehavior string

ConnectionBehavior represents the behavioral type of connection.

const (
	// ConnectionBehaviorStateful represents persistent connections that maintain state
	// Examples: database connections, persistent TCP connections, connection pools.
	ConnectionBehaviorStateful ConnectionBehavior = "stateful"

	// ConnectionBehaviorStateless represents connections that don't maintain state
	// Examples: HTTP clients, REST APIs, stateless services.
	ConnectionBehaviorStateless ConnectionBehavior = "stateless"

	// ConnectionBehaviorStreaming represents streaming/real-time connections
	// Examples: message queues, event streams, websockets, gRPC streams.
	ConnectionBehaviorStreaming ConnectionBehavior = "streaming"
)

type ConnectionCapability

type ConnectionCapability string
const (
	// ConnectionCapabilityObservability represents general observability behavior.
	ConnectionCapabilityObservability ConnectionCapability = "observability"

	// ConnectionCapabilityLogging represents logging behavior.
	ConnectionCapabilityLogging ConnectionCapability = "logging"

	// ConnectionCapabilityMetrics represents metrics behavior.
	ConnectionCapabilityMetrics ConnectionCapability = "metrics"

	// ConnectionCapabilityTracing represents tracing behavior.
	ConnectionCapabilityTracing ConnectionCapability = "tracing"
)

Connection capabilities for observability: general observability, logging, metrics, and tracing.

const (
	// ConnectionCapabilityKeyValue represents key-value storage behavior.
	ConnectionCapabilityKeyValue ConnectionCapability = "key-value"

	// ConnectionCapabilityDocument represents document storage behavior.
	ConnectionCapabilityDocument ConnectionCapability = "document"

	// ConnectionCapabilityRelational represents relational database behavior.
	ConnectionCapabilityRelational ConnectionCapability = "relational"

	// ConnectionCapabilityTransactional represents transactional behavior.
	ConnectionCapabilityTransactional ConnectionCapability = "transactional"

	// ConnectionCapabilityCache represents caching behavior with expiration support.
	ConnectionCapabilityCache ConnectionCapability = "cache"

	// ConnectionCapabilityQueue represents message queue behavior.
	ConnectionCapabilityQueue ConnectionCapability = "queue"
)

type ConnectionFactory

type ConnectionFactory interface {
	// CreateConnection creates a new connection from configuration
	CreateConnection(ctx context.Context, config *ConfigTarget) (Connection, error)

	// GetProtocol returns the protocol this factory supports (e.g., "postgres", "redis")
	GetProtocol() string
}

ConnectionFactory creates connections from configuration.

type ConnectionState

type ConnectionState int32

ConnectionState represents the current state of a connection.

const (
	ConnectionStateNotInitialized ConnectionState = 0
	ConnectionStateConnected      ConnectionState = 1
	ConnectionStateLive           ConnectionState = 2
	ConnectionStateReady          ConnectionState = 3
	ConnectionStateDisconnected   ConnectionState = 4
	ConnectionStateError          ConnectionState = 5
	ConnectionStateReconnecting   ConnectionState = 6
)

func (ConnectionState) String

func (i ConnectionState) String() string

type ConsumerConfig

type ConsumerConfig struct {
	// Args additional arguments for queue declaration
	Args map[string]any
	// AutoAck when true, the server will automatically acknowledge messages
	AutoAck bool
	// Exclusive when true, only this consumer can access the queue
	Exclusive bool
	// NoLocal when true, the server will not send messages to the connection that published them
	NoLocal bool
	// NoWait when true, the server will not respond to the declare
	NoWait bool
	// PrefetchCount sets how many messages to prefetch
	PrefetchCount int
	// BlockTimeout sets how long to wait for messages
	BlockTimeout time.Duration
	// MaxRetries sets maximum number of retries for failed messages
	MaxRetries int
	// RetryDelay sets delay between retries
	RetryDelay time.Duration
}

ConsumerConfig holds configuration for message consumption.

func DefaultConsumerConfig

func DefaultConsumerConfig() ConsumerConfig

DefaultConsumerConfig returns a default configuration for consuming messages.

type ConsumerGroupInfo added in v0.7.1

type ConsumerGroupInfo struct {
	Name            string `json:"name"`
	LastDeliveredID string `json:"last_delivered_id"`
	Consumers       int64  `json:"consumers"`
	Pending         int64  `json:"pending"`
	EntriesRead     int64  `json:"entries_read"`
	Lag             int64  `json:"lag"`
}

ConsumerGroupInfo provides information about a consumer group.

type ExecuteResult

type ExecuteResult interface {
	// RowsAffected returns the number of rows affected
	RowsAffected() (int64, error)

	// LastInsertId returns the last insert ID (if applicable)
	LastInsertId() (int64, error)
}

ExecuteResult represents execution results.

type HTTPConnection

type HTTPConnection struct {
	// contains filtered or unexported fields
}

HTTPConnection represents an HTTP API connection with resilience features.

func (*HTTPConnection) Close

func (c *HTTPConnection) Close(ctx context.Context) error

func (*HTTPConnection) GetBaseURL

func (c *HTTPConnection) GetBaseURL() string

GetBaseURL returns the base URL for this connection.

func (*HTTPConnection) GetBehaviors

func (c *HTTPConnection) GetBehaviors() []ConnectionBehavior

func (*HTTPConnection) GetCapabilities

func (c *HTTPConnection) GetCapabilities() []ConnectionCapability

func (*HTTPConnection) GetCircuitBreakerState added in v0.7.4

func (c *HTTPConnection) GetCircuitBreakerState() string

GetCircuitBreakerState returns the current state of the circuit breaker.

func (*HTTPConnection) GetClient

func (c *HTTPConnection) GetClient() *httpclient.Client

GetClient returns the underlying resilient HTTP client.

func (*HTTPConnection) GetHeaders

func (c *HTTPConnection) GetHeaders() map[string]string

GetHeaders returns the default headers for this connection.

func (*HTTPConnection) GetProtocol

func (c *HTTPConnection) GetProtocol() string

func (*HTTPConnection) GetRawConnection

func (c *HTTPConnection) GetRawConnection() any

func (*HTTPConnection) GetStandardClient added in v0.7.4

func (c *HTTPConnection) GetStandardClient() *http.Client

GetStandardClient returns the standard HTTP client from the resilient client.

func (*HTTPConnection) GetState

func (c *HTTPConnection) GetState() ConnectionState

func (*HTTPConnection) HealthCheck

func (c *HTTPConnection) HealthCheck(
	ctx context.Context,
) *HealthStatus

func (*HTTPConnection) NewRequest

func (c *HTTPConnection) NewRequest(
	ctx context.Context,
	method string,
	path string,
	body any,
) (*http.Request, error)

NewRequest creates a new HTTP request with the connection's default headers.

type HTTPConnectionFactory

type HTTPConnectionFactory struct {
	// contains filtered or unexported fields
}

HTTPConnectionFactory creates HTTP connections.

func NewHTTPConnectionFactory

func NewHTTPConnectionFactory(protocol string) *HTTPConnectionFactory

NewHTTPConnectionFactory creates a new HTTP connection factory.

func (*HTTPConnectionFactory) CreateConnection

func (f *HTTPConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*HTTPConnectionFactory) GetProtocol

func (f *HTTPConnectionFactory) GetProtocol() string

type HealthStatus

type HealthStatus struct {
	Timestamp time.Time       `json:"timestamp"`
	Error     error           `json:"error,omitempty"`
	Message   string          `json:"message,omitempty"`
	Latency   time.Duration   `json:"latency,omitempty"`
	State     ConnectionState `json:"state"`
}

HealthStatus represents the health check result.

type Message

type Message struct {
	// Timestamp when the message was created
	Timestamp time.Time
	// Headers contains message headers
	Headers map[string]any

	// ReceiptHandle is a unique identifier for the message (for acknowledgment)
	ReceiptHandle string
	// MessageID is the message identifier
	MessageID string
	// ConsumerGroup indicates which consumer group this message belongs to (if applicable)
	ConsumerGroup string
	// StreamName indicates which stream this message came from (for stream-based systems)
	StreamName string
	// Body contains the message payload
	Body []byte
	// DeliveryCount indicates how many times this message has been delivered
	DeliveryCount int
	// contains filtered or unexported fields
}

Message represents a consumed message with its metadata and acknowledgment functions.

func (*Message) Ack

func (m *Message) Ack() error

Ack acknowledges the message.

func (*Message) Nack

func (m *Message) Nack(requeue bool) error

Nack negatively acknowledges the message.

func (*Message) SetAckFunc

func (m *Message) SetAckFunc(ackFunc func() error)

SetAckFunc sets the acknowledgment function.

func (*Message) SetNackFunc

func (m *Message) SetNackFunc(nackFunc func(requeue bool) error)

SetNackFunc sets the negative acknowledgment function.

type OTLPConnection added in v0.7.4

type OTLPConnection struct {
	// contains filtered or unexported fields
}

OTLPConnection represents an OpenTelemetry Protocol connection.

func (*OTLPConnection) Close added in v0.7.4

func (c *OTLPConnection) Close(ctx context.Context) error

func (*OTLPConnection) GetBehaviors added in v0.7.4

func (c *OTLPConnection) GetBehaviors() []ConnectionBehavior

func (*OTLPConnection) GetCapabilities added in v0.7.4

func (c *OTLPConnection) GetCapabilities() []ConnectionCapability

func (*OTLPConnection) GetEndpoint added in v0.7.4

func (c *OTLPConnection) GetEndpoint() string

GetEndpoint returns the OTLP endpoint.

func (*OTLPConnection) GetLogExporter added in v0.7.4

func (c *OTLPConnection) GetLogExporter() *otlploghttp.Exporter

GetLogExporter returns the OTLP log exporter.

func (*OTLPConnection) GetLoggerProvider added in v0.7.4

func (c *OTLPConnection) GetLoggerProvider() *sdklog.LoggerProvider

GetLoggerProvider returns the OpenTelemetry log provider.

func (*OTLPConnection) GetMeterProvider added in v0.7.4

func (c *OTLPConnection) GetMeterProvider() *sdkmetric.MeterProvider

GetMeterProvider returns the OpenTelemetry meter provider.

func (*OTLPConnection) GetMetricExporter added in v0.7.4

func (c *OTLPConnection) GetMetricExporter() *otlpmetrichttp.Exporter

GetMetricExporter returns the OTLP metric exporter.

func (*OTLPConnection) GetProtocol added in v0.7.4

func (c *OTLPConnection) GetProtocol() string

func (*OTLPConnection) GetRawConnection added in v0.7.4

func (c *OTLPConnection) GetRawConnection() any

func (*OTLPConnection) GetResource added in v0.7.4

func (c *OTLPConnection) GetResource() *resource.Resource

GetResource returns the resource used for telemetry attribution.

func (*OTLPConnection) GetState added in v0.7.4

func (c *OTLPConnection) GetState() ConnectionState

func (*OTLPConnection) GetTraceExporter added in v0.7.4

func (c *OTLPConnection) GetTraceExporter() *otlptrace.Exporter

GetTraceExporter returns the OTLP trace exporter.

func (*OTLPConnection) GetTracerProvider added in v0.7.4

func (c *OTLPConnection) GetTracerProvider() *sdktrace.TracerProvider

GetTracerProvider returns the OpenTelemetry tracer provider.

func (*OTLPConnection) HealthCheck added in v0.7.4

func (c *OTLPConnection) HealthCheck(ctx context.Context) *HealthStatus

func (*OTLPConnection) IsInsecure added in v0.7.4

func (c *OTLPConnection) IsInsecure() bool

IsInsecure returns whether the connection uses insecure transport.

type OTLPConnectionFactory added in v0.7.4

type OTLPConnectionFactory struct {
	// contains filtered or unexported fields
}

OTLPConnectionFactory creates OTLP connections.

func NewOTLPConnectionFactory added in v0.7.4

func NewOTLPConnectionFactory(protocol string) *OTLPConnectionFactory

NewOTLPConnectionFactory creates a new OTLP connection factory.

func (*OTLPConnectionFactory) CreateConnection added in v0.7.4

func (f *OTLPConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*OTLPConnectionFactory) GetProtocol added in v0.7.4

func (f *OTLPConnectionFactory) GetProtocol() string

type QueryRepository

type QueryRepository interface {
	// Query executes a query and returns raw results
	Query(ctx context.Context, query string, args ...any) (QueryResult, error)

	// Execute runs a command (INSERT, UPDATE, DELETE)
	Execute(ctx context.Context, command string, args ...any) (ExecuteResult, error)
}

QueryRepository defines the port for query operations (for SQL-like storages).

type QueryResult

type QueryResult interface {
	// Next advances to the next row
	Next() bool

	// Scan scans the current row into destinations
	Scan(dest ...any) error

	// Close closes the result set
	Close() error
}

QueryResult represents query results.

type QueueConfig added in v0.7.1

type QueueConfig struct {
	// Args contains additional queue-specific arguments
	Args map[string]any
	// MaxLength sets maximum number of messages in queue (0 = unlimited)
	MaxLength int64
	// MessageTTL sets default TTL for messages
	MessageTTL time.Duration
	// Durable indicates if the queue should survive server restarts
	Durable bool
	// AutoDelete indicates if the queue should be deleted when no longer in use
	AutoDelete bool
	// Exclusive indicates if the queue is exclusive to one connection
	Exclusive bool
}

QueueConfig holds configuration for queue declaration.

func DefaultQueueConfig added in v0.7.1

func DefaultQueueConfig() QueueConfig

DefaultQueueConfig returns a default configuration for queue declaration.

type QueueRepository

type QueueRepository interface {
	// QueueDeclare declares a queue and returns its name
	QueueDeclare(ctx context.Context, name string) (string, error)

	// QueueDeclareWithConfig declares a queue with specific configuration
	QueueDeclareWithConfig(ctx context.Context, name string, config QueueConfig) (string, error)

	// Publish sends a message to a queue
	Publish(ctx context.Context, queueName string, body []byte) error

	// PublishWithHeaders sends a message with custom headers
	PublishWithHeaders(
		ctx context.Context,
		queueName string,
		body []byte,
		headers map[string]any,
	) error

	// Consume starts consuming messages from a queue
	Consume(
		ctx context.Context,
		queueName string,
		config ConsumerConfig,
	) (<-chan Message, <-chan error)

	// ConsumeWithGroup starts consuming messages as part of a consumer group
	ConsumeWithGroup(
		ctx context.Context,
		queueName string,
		consumerGroup string,
		consumerName string,
		config ConsumerConfig,
	) (<-chan Message, <-chan error)

	// ClaimPendingMessages claims pending messages from a consumer group
	ClaimPendingMessages(
		ctx context.Context,
		queueName string,
		consumerGroup string,
		consumerName string,
		minIdleTime time.Duration,
		count int,
	) ([]Message, error)

	// AckMessage acknowledges a specific message by receipt handle
	AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error

	// DeleteMessage removes a message from the queue (for non-streaming queues)
	DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
}

QueueRepository defines the port for message queue operations.

type QueueStreamRepository added in v0.7.1

type QueueStreamRepository interface {
	QueueRepository

	// CreateConsumerGroup creates a consumer group for a stream
	CreateConsumerGroup(ctx context.Context, streamName, consumerGroup, startID string) error

	// StreamInfo returns information about a stream
	StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)

	// ConsumerGroupInfo returns information about consumer groups
	ConsumerGroupInfo(ctx context.Context, streamName string) ([]ConsumerGroupInfo, error)

	// TrimStream trims a stream to a maximum length
	TrimStream(ctx context.Context, streamName string, maxLen int64) error
}

QueueStreamRepository defines operations for stream-based message systems (Redis Streams, Kafka, etc.).

type RedisAdapter

type RedisAdapter struct {
	// contains filtered or unexported fields
}

RedisAdapter implements Redis operations and wraps the Redis client.

func (*RedisAdapter) AckMessage added in v0.7.1

func (ra *RedisAdapter) AckMessage(
	ctx context.Context,
	queueName, consumerGroup, receiptHandle string,
) error

func (*RedisAdapter) ClaimPendingMessages added in v0.7.1

func (ra *RedisAdapter) ClaimPendingMessages(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	minIdleTime time.Duration,
	count int,
) ([]Message, error)

ClaimPendingMessages claims pending messages from a consumer group.

func (*RedisAdapter) Consume added in v0.7.1

func (ra *RedisAdapter) Consume(
	ctx context.Context,
	queueName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*RedisAdapter) ConsumeWithGroup added in v0.7.1

func (ra *RedisAdapter) ConsumeWithGroup(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*RedisAdapter) ConsumerGroupInfo added in v0.7.1

func (ra *RedisAdapter) ConsumerGroupInfo(
	ctx context.Context,
	streamName string,
) ([]ConsumerGroupInfo, error)

func (*RedisAdapter) CreateConsumerGroup added in v0.7.1

func (ra *RedisAdapter) CreateConsumerGroup(
	ctx context.Context,
	streamName, consumerGroup, startID string,
) error

QueueStreamRepository interface implementation.

func (*RedisAdapter) DeleteMessage added in v0.7.1

func (ra *RedisAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error

func (*RedisAdapter) Exists

func (ra *RedisAdapter) Exists(ctx context.Context, key string) (bool, error)

func (*RedisAdapter) Expire

func (ra *RedisAdapter) Expire(ctx context.Context, key string, expiration time.Duration) error

func (*RedisAdapter) Get

func (ra *RedisAdapter) Get(ctx context.Context, key string) ([]byte, error)

StoreRepository interface implementation.

func (*RedisAdapter) GetTTL

func (ra *RedisAdapter) GetTTL(ctx context.Context, key string) (time.Duration, error)

func (*RedisAdapter) Publish added in v0.7.1

func (ra *RedisAdapter) Publish(ctx context.Context, queueName string, body []byte) error

func (*RedisAdapter) PublishWithHeaders added in v0.7.1

func (ra *RedisAdapter) PublishWithHeaders(
	ctx context.Context,
	queueName string,
	body []byte,
	headers map[string]any,
) error

func (*RedisAdapter) QueueDeclare added in v0.7.1

func (ra *RedisAdapter) QueueDeclare(ctx context.Context, name string) (string, error)

QueueRepository interface implementation for Redis Streams.

func (*RedisAdapter) QueueDeclareWithConfig added in v0.7.1

func (ra *RedisAdapter) QueueDeclareWithConfig(
	ctx context.Context,
	name string,
	config QueueConfig,
) (string, error)

func (*RedisAdapter) Remove

func (ra *RedisAdapter) Remove(ctx context.Context, key string) error

func (*RedisAdapter) Set

func (ra *RedisAdapter) Set(ctx context.Context, key string, value []byte) error

func (*RedisAdapter) SetWithExpiration

func (ra *RedisAdapter) SetWithExpiration(
	ctx context.Context,
	key string,
	value []byte,
	expiration time.Duration,
) error

SetWithExpiration is part of the CacheRepository interface implementation.

func (*RedisAdapter) StreamInfo added in v0.7.1

func (ra *RedisAdapter) StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)

func (*RedisAdapter) TrimStream added in v0.7.1

func (ra *RedisAdapter) TrimStream(ctx context.Context, streamName string, maxLen int64) error

func (*RedisAdapter) Update

func (ra *RedisAdapter) Update(ctx context.Context, key string, value []byte) error

type RedisConfig added in v0.7.1

type RedisConfig struct {
	Address               string
	Password              string
	DB                    int
	PoolSize              int
	MinIdleConns          int
	MaxIdleConns          int
	ConnMaxIdleTime       time.Duration
	PoolTimeout           time.Duration
	MaxRetries            int
	MinRetryBackoff       time.Duration
	MaxRetryBackoff       time.Duration
	TLSEnabled            bool
	TLSInsecureSkipVerify bool
}

RedisConfig holds Redis-specific configuration options.

func NewDefaultRedisConfig added in v0.7.1

func NewDefaultRedisConfig() *RedisConfig

NewDefaultRedisConfig creates a Redis configuration with sensible defaults.

type RedisConnection

type RedisConnection struct {
	// contains filtered or unexported fields
}

RedisConnection implements the connfx.Connection interface.

func NewRedisConnection

func NewRedisConnection(protocol string, config *RedisConfig) *RedisConnection

NewRedisConnection creates a new Redis connection with enhanced configuration.

func (*RedisConnection) Close

func (rc *RedisConnection) Close(ctx context.Context) error

func (*RedisConnection) GetBehaviors

func (rc *RedisConnection) GetBehaviors() []ConnectionBehavior

GetBehaviors is part of the Connection interface implementation.

func (*RedisConnection) GetCapabilities

func (rc *RedisConnection) GetCapabilities() []ConnectionCapability

func (*RedisConnection) GetClient added in v0.7.1

func (rc *RedisConnection) GetClient() *redis.Client

GetClient returns the underlying Redis client for advanced operations.

func (*RedisConnection) GetProtocol

func (rc *RedisConnection) GetProtocol() string

func (*RedisConnection) GetRawConnection

func (rc *RedisConnection) GetRawConnection() any

func (*RedisConnection) GetState

func (rc *RedisConnection) GetState() ConnectionState

func (*RedisConnection) GetStats added in v0.7.1

func (rc *RedisConnection) GetStats() map[string]any

GetStats returns detailed connection and pool statistics.

func (*RedisConnection) HealthCheck

func (rc *RedisConnection) HealthCheck(ctx context.Context) *HealthStatus

type RedisConnectionFactory

type RedisConnectionFactory struct {
	// contains filtered or unexported fields
}

RedisConnectionFactory creates Redis connections with enhanced configuration.

func NewRedisConnectionFactory

func NewRedisConnectionFactory(protocol string) *RedisConnectionFactory

NewRedisConnectionFactory creates a new Redis connection factory for a specific protocol.

func (*RedisConnectionFactory) CreateConnection

func (f *RedisConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*RedisConnectionFactory) GetProtocol

func (f *RedisConnectionFactory) GetProtocol() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages all connections in the system.

func NewRegistry

func NewRegistry(logger *logfx.Logger) *Registry

NewRegistry creates a new connection registry.

func NewRegistryWithDefaults

func NewRegistryWithDefaults(logger *logfx.Logger) *Registry

func (*Registry) AddConnection

func (registry *Registry) AddConnection(
	ctx context.Context,
	name string,
	config *ConfigTarget,
) (Connection, error)

AddConnection adds a new connection to the registry.

func (*Registry) Close

func (registry *Registry) Close(ctx context.Context) error

Close closes all connections in the registry.

func (*Registry) GetByBehavior

func (registry *Registry) GetByBehavior(behavior ConnectionBehavior) []Connection

GetByBehavior returns all connections of a specific behavior.

func (*Registry) GetByCapability

func (registry *Registry) GetByCapability(capability ConnectionCapability) []Connection

GetByCapability returns all connections of a specific capability.

func (*Registry) GetByProtocol

func (registry *Registry) GetByProtocol(protocol string) []Connection

GetByProtocol returns all connections of a specific protocol.

func (*Registry) GetDefault

func (registry *Registry) GetDefault() Connection

GetDefault returns the default connection.

func (*Registry) GetNamed

func (registry *Registry) GetNamed(name string) Connection

GetNamed returns a named connection.

func (*Registry) GetRepository

func (registry *Registry) GetRepository(name string) (Repository, error)

GetRepository returns a Repository from a connection if it supports it.

func (*Registry) HealthCheck

func (registry *Registry) HealthCheck(ctx context.Context) map[string]*HealthStatus

HealthCheck performs health checks on all connections.

func (*Registry) ListConnections

func (registry *Registry) ListConnections() []string

ListConnections returns all connection names.

func (*Registry) ListRegisteredProtocols

func (registry *Registry) ListRegisteredProtocols() []string

ListRegisteredProtocols returns all registered protocols.

func (*Registry) LoadFromConfig

func (registry *Registry) LoadFromConfig(ctx context.Context, config *Config) error

func (*Registry) RegisterFactory

func (registry *Registry) RegisterFactory(factory ConnectionFactory)

RegisterFactory registers a connection factory for a specific protocol.

func (*Registry) RemoveConnection

func (registry *Registry) RemoveConnection(ctx context.Context, name string) error

RemoveConnection removes a connection from the registry.

type Repository

type Repository interface {
	// Get retrieves a value by key
	Get(ctx context.Context, key string) ([]byte, error)

	// Set stores a value with the given key
	Set(ctx context.Context, key string, value []byte) error

	// Remove deletes a value by key
	Remove(ctx context.Context, key string) error

	// Update updates an existing value by key
	Update(ctx context.Context, key string, value []byte) error

	// Exists checks if a key exists
	Exists(ctx context.Context, key string) (bool, error)
}

Repository defines the port for data access operations. This interface will be implemented by adapters in connfx for different storage technologies.

type SQLConnection

type SQLConnection struct {
	// contains filtered or unexported fields
}

SQLConnection represents a SQL database connection.

func (*SQLConnection) Close

func (c *SQLConnection) Close(ctx context.Context) error

func (*SQLConnection) GetBehaviors

func (c *SQLConnection) GetBehaviors() []ConnectionBehavior

func (*SQLConnection) GetCapabilities

func (c *SQLConnection) GetCapabilities() []ConnectionCapability

func (*SQLConnection) GetDB

func (c *SQLConnection) GetDB() *sql.DB

GetDB returns the underlying *sql.DB instance.

func (*SQLConnection) GetProtocol

func (c *SQLConnection) GetProtocol() string

func (*SQLConnection) GetRawConnection

func (c *SQLConnection) GetRawConnection() any

func (*SQLConnection) GetState

func (c *SQLConnection) GetState() ConnectionState

func (*SQLConnection) HealthCheck

func (c *SQLConnection) HealthCheck(ctx context.Context) *HealthStatus

func (*SQLConnection) Stats

func (c *SQLConnection) Stats() sql.DBStats

Stats returns database connection statistics.

type SQLConnectionFactory

type SQLConnectionFactory struct {
	// contains filtered or unexported fields
}

SQLConnectionFactory creates SQL connections.

func NewSQLConnectionFactory

func NewSQLConnectionFactory(protocol string) *SQLConnectionFactory

NewSQLConnectionFactory creates a new SQL connection factory for a specific protocol.

func (*SQLConnectionFactory) CreateConnection

func (f *SQLConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*SQLConnectionFactory) GetProtocol

func (f *SQLConnectionFactory) GetProtocol() string

type StreamEntry added in v0.7.1

type StreamEntry struct {
	Fields map[string]string `json:"fields"`
	ID     string            `json:"id"`
}

StreamEntry represents a single stream entry.

type StreamInfo added in v0.7.1

type StreamInfo struct {
	FirstEntry      *StreamEntry      `json:"first_entry,omitempty"`
	LastEntry       *StreamEntry      `json:"last_entry,omitempty"`
	Metadata        map[string]string `json:"metadata,omitempty"`
	LastGeneratedID string            `json:"last_generated_id"`
	MaxDeletedID    string            `json:"max_deleted_id"`
	RecordedFirstID string            `json:"recorded_first_id"`
	Length          int64             `json:"length"`
	RadixTreeKeys   int64             `json:"radix_tree_keys"`
	RadixTreeNodes  int64             `json:"radix_tree_nodes"`
	Groups          int64             `json:"groups"`
	EntriesAdded    int64             `json:"entries_added"`
}

StreamInfo provides information about a stream.

type TransactionContext

type TransactionContext interface {
	// Commit commits the transaction
	Commit() error

	// Rollback rolls back the transaction
	Rollback() error

	// GetRepository returns a repository bound to this transaction
	GetRepository() Repository
}

TransactionContext represents a transaction context for data operations.

type TransactionalRepository

type TransactionalRepository interface {
	Repository

	// BeginTransaction starts a new transaction
	BeginTransaction(ctx context.Context) (TransactionContext, error)
}

TransactionalRepository extends Repository with transaction support.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL