connfx

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

README

ajan/connfx

Overview

connfx provides centralized connection management based on connection behaviors rather than specific technologies. It allows adapters to register themselves and define their own supported behaviors, making the system extensible and non-opinionated about specific connection types.

Features

  • Behavior-Based Design: Connections are categorized by behavior (stateful, stateless, streaming)
  • Provider-Defined Behaviors: Adapters determine their own supported behaviors (no user configuration needed)
  • Multiple Behaviors per Provider: Providers like Redis can support multiple behaviors simultaneously
  • Self-Registering Adapters: Adapters register themselves with the connection registry
  • Extensible Architecture: Easy to add new connection types without modifying core code
  • Health Checks: Built-in health monitoring for all connections
  • Context Support: Full context.Context support for cancellation and timeouts
  • Shared Connections: Multiple modules can share the same connection instance
  • Configuration-driven: Load connections from configuration files
  • Thread Safety: All operations are thread-safe

Connection Behaviors

  • Stateful: Persistent connections that maintain state (databases, connection pools)
  • Stateless: Connections that don't maintain state (HTTP APIs, REST services)
  • Streaming: Real-time/streaming connections (message queues, event streams, websockets)

Quick Start

Basic Usage with Registry
package main

import (
    "context"
    "log"
    "os"

    "github.com/eser/ajan/connfx"
    "github.com/eser/ajan/connfx/adapters"
    "github.com/eser/ajan/logfx"
    "database/sql"
    "net/http"
)

func main() {
    // Create logger using logfx
    logger := logfx.NewLogger(os.Stdout, &logfx.Config{
        Level:      "INFO",
        PrettyMode: true,
    })

    // Create connection registry
    registry := connfx.NewRegistryWithDefaults(logger)

    // Load configuration - no behavior field needed!
    config := &connfx.Config{
        Connections: map[string]connfx.ConfigTarget{
            "default": {
                Protocol: "sqlite",
                Database: ":memory:",
            },
            "api": {
                Protocol: "http",
                URL:     "https://api.example.com",
            },
        },
    }

    ctx := context.Background()
    if err := registry.LoadFromConfig(ctx, config); err != nil {
        log.Fatal(err)
    }

    // Use connections
    dbConn := registry.GetDefault()
    if dbConn == nil {
        log.Fatal("Database connection not found")
    }

    apiConn := registry.GetNamed("api")
    if apiConn == nil {
        log.Fatal("API connection not found")
    }

    // Check behaviors determined by providers
    log.Printf("DB behaviors: %v", dbConn.GetBehaviors())     // [stateful]
    log.Printf("API behaviors: %v", apiConn.GetBehaviors())   // [stateless]

    // Check capabilities determined by providers
    log.Printf("DB capabilities: %v", dbConn.GetCapabilities())     // [transactional relational]
    log.Printf("API capabilities: %v", apiConn.GetBehaviors())   // []

    // Use the connections with type safety
    db, err := connfx.GetTypedConnection[*sql.DB](dbConn)
    if err != nil {
        log.Fatal("Failed to get SQL DB:", err)
    }

    httpClient, err := connfx.GetTypedConnection[*http.Client](apiConn)
    if err != nil {
        log.Fatal("Failed to get HTTP client:", err)
    }

    // Now use the typed connections safely
    _ = db        // *sql.DB
    _ = httpClient // *http.Client
}
Service Integration Pattern
func NewService(logger *logfx.Logger) *Service {
    registry := connfx.NewRegistry(logger)

    // Register required adapters
    registry.RegisterFactory(connfx.NewSQLConnectionFactory("sqlite"))
    registry.RegisterFactory(connfx.NewHTTPConnectionFactory("http"))
    registry.RegisterFactory(connfx.NewRedisConnectionFactory("redis")) // Supports both stateful + streaming

    // Load config
    registry.LoadFromConfig(ctx, config)

    return &Service{
        connRegistry: registry,
    }
}

func (s *Service) DoSomething(ctx context.Context) error {
    conn := s.connRegistry.GetNamed("primary")
    if conn == nil {
        return errors.New("connection not found")
    }

    // Verify protocol if needed
    if conn.GetProtocol() != "postgres" {
        return errors.New("expected postgres connection")
    }

    // Use connection...
    return nil
}
Creating Custom Adapters
package myadapter

import (
    "context"
    "github.com/eser/ajan/connfx"
)

// CustomConnection implements connfx.Connection
type CustomConnection struct {
    name     string
    protocol string
    // ... your connection-specific fields
}

func (c *CustomConnection) GetProtocol() string { return c.protocol }
func (c *CustomConnection) GetBehaviors() []connfx.ConnectionBehavior {
    // Your adapter defines what behaviors it supports
    return []connfx.ConnectionBehavior{
        connfx.ConnectionBehaviorStateful,
        connfx.ConnectionBehaviorStreaming, // Example: supports multiple behaviors
    }
}

func (c *CustomConnection) GetCapabilities() []connfx.ConnectionCapability {
    // Your adapter defines what capabilities it supports
    return []connfx.ConnectionCapability{
        connfx.ConnectionCapabilityRelational,
        connfx.ConnectionCapabilityTransactional, // Example: supports multiple capabilities
    }
}
// ... implement other Connection interface methods

// CustomConnectionFactory implements connfx.ConnectionFactory
type CustomConnectionFactory struct{}

func (f *CustomConnectionFactory) CreateConnection(ctx context.Context, config connfx.ConnectionConfig) (connfx.Connection, error) {
    // Create your custom connection
}

func (f *CustomConnectionFactory) GetProtocol() string { return "myprotocol" }

// Register the adapter
func RegisterCustomAdapter(registry *connfx.Registry) {
    factory := &CustomConnectionFactory{}
    registry.RegisterFactory(factory)
}

Configuration

Provider-Based Configuration
connections:
  primary_db:
    protocol: postgres  # Provider determines this is stateful
    host: localhost
    port: 5432
    database: myapp
    username: user
    password: secret

  cache:
    protocol: redis     # Provider determines this supports stateful + streaming
    host: localhost
    port: 6379

  external_api:
    protocol: http      # Provider determines this is stateless
    url: "https://api.example.com"
    timeout: 30s
    properties:
      headers:
        Authorization: "Bearer TOKEN"

  event_stream:
    protocol: kafka     # Provider determines this is streaming
    host: localhost
    port: 9092
Provider Behavior Examples
  • SQL Databases (postgres/mysql/sqlite): [stateful]
  • HTTP APIs (http/https/graphql): [stateless]
  • Redis: [stateful, streaming] - supports both key-value and pub/sub
  • Message Queues (kafka/rabbitmq): [streaming]

Working with Connections

Basic Connection Retrieval
// Get default connection
defaultConn := registry.GetDefault()
if defaultConn == nil {
    return errors.New("default connection not available")
}

// Get named connection
dbConn := registry.GetNamed("primary_db")
if dbConn == nil {
    return errors.New("database connection not found")
}

// Check connection properties
fmt.Printf("Protocol: %s, Behaviors: %v, Capabilities: %v\n",
    dbConn.GetProtocol(), dbConn.GetBehaviors(), dbConn.GetCapabilities())
Filtering by Behavior
// Get all stateful connections (databases, Redis for key-value, etc.)
statefulConns := registry.GetByBehavior(connfx.ConnectionBehaviorStateful)

// Get all stateless connections (APIs, etc.)
statelessConns := registry.GetByBehavior(connfx.ConnectionBehaviorStateless)

// Get all streaming connections (queues, Redis for pub/sub, etc.)
streamingConns := registry.GetByBehavior(connfx.ConnectionBehaviorStreaming)

// Redis appears in both stateful and streaming lists!
for _, conn := range statefulConns {
    fmt.Printf("Stateful connection: %s\n", conn.GetProtocol())
}
Filtering by Capabilities
// Get all relational connections (databases, etc.)
relationalConns := registry.GetByCapability(connfx.ConnectionCapabilityRelational)

// Get all transactional connections (databases, etc.)
transactionalConns := registry.GetByCapability(connfx.ConnectionCapabilityTransactional)

// Get all cache connections (redis, etc.)
cacheConns := registry.GetByCapability(connfx.ConnectionCapabilityCache)

// Get all queue connections (amqp, etc.)
queueConns := registry.GetByCapability(connfx.ConnectionCapabilityQueue)

// Get all key-value connections (redis, etc.)
keyValueConns := registry.GetByCapability(connfx.ConnectionCapabilityKeyValue)

// Redis appears in cache, queue and key-value lists!
for _, conn := range queueConns {
    fmt.Printf("Queue connection: %s\n", conn.GetProtocol())
}
Filtering by Protocol
// Get all Postgres connections
postgresConns := registry.GetByProtocol("postgres")

// Get all Redis connections (support multiple behaviors and capabilities)
redisConns := registry.GetByProtocol("redis")
for _, conn := range redisConns {
    fmt.Printf("Redis connection supports: %v and %v\n", conn.GetBehaviors(), conn.GetCapabilities())
    // Output: Redis connection supports: [stateful streaming] and [cache queue key-value]
}
Type-Safe Connection Extraction

The GetTypedConnection generic function provides type-safe extraction of raw connections without manual type assertions:

import "database/sql"

// Get connection
conn := registry.GetNamed("database")
if conn == nil {
    return errors.New("database connection not found")
}

// Extract typed connection safely
db, err := connfx.GetTypedConnection[*sql.DB](conn)
if err != nil {
    return fmt.Errorf("failed to get SQL database: %w", err)
}

// Now db is *sql.DB and can be used safely
rows, err := db.QueryContext(ctx, "SELECT * FROM users")
if err != nil {
    return err
}
defer rows.Close()

// Works with any connection type
httpConn := registry.GetNamed("api")
if httpConn == nil {
    return errors.New("API connection not found")
}

client, err := connfx.GetTypedConnection[*http.Client](httpConn)
if err != nil {
    return fmt.Errorf("failed to get HTTP client: %w", err)
}

resp, err := client.Get("https://api.example.com/data")
Combined Usage Pattern
// Get and extract in one pattern
func getDatabase(registry *connfx.Registry, name string) (*sql.DB, error) {
    conn := registry.GetNamed(name)
    if conn == nil {
        return nil, fmt.Errorf("connection %q not found", name)
    }

    return connfx.GetTypedConnection[*sql.DB](conn)
}

// Usage
db, err := getDatabase(registry, "primary")
if err != nil {
    return err
}
// db is now *sql.DB

Health Checks

// Check all connections
statuses := registry.HealthCheck(ctx)
for name, status := range statuses {
    conn := registry.GetNamed(name)
    if conn != nil {
        fmt.Printf("Connection %s (%s/%v): %s (latency: %v)\n",
            name, conn.GetProtocol(), conn.GetBehaviors(),
            status.State, status.Latency)
    }
}

// Check specific connection
conn := registry.GetNamed("primary")
status := conn.HealthCheck(ctx)
fmt.Printf("Status: %s, Message: %s\n", status.State, status.Message)

Available Adapters

SQL Databases (Stateful)
  • Postgres: registry.RegisterFactory(postgresFactory)[stateful]
  • MySQL: registry.RegisterFactory(mysqlFactory)[stateful]
  • SQLite: registry.RegisterFactory(sqliteFactory)[stateful]
HTTP APIs (Stateless)
  • HTTP: registry.RegisterFactory(httpFactory)[stateless]
  • GraphQL: registry.RegisterFactory(graphqlFactory)[stateless]
Multiple Behavior Providers
  • Redis: registry.RegisterFactory(redisFactory)[stateful, streaming]

Integration Examples

datafx Integration
// datafx can get SQL connections by behavior
statefulConns := registry.GetByBehavior(connfx.ConnectionBehaviorStateful)
for _, conn := range statefulConns {
    if conn.GetProtocol() == "postgres" {
        db, err := connfx.GetTypedConnection[*sql.DB](conn)
        if err != nil {
            log.Printf("Failed to get SQL DB: %v", err)
            continue
        }
        // Use db for datafx operations
    }
}
queuefx Integration
// queuefx can get streaming connections
streamingConns := registry.GetByBehavior(connfx.ConnectionBehaviorStreaming)
for _, conn := range streamingConns {
    switch conn.GetProtocol() {
    case "redis":
        // Type-safe Redis connection extraction
        client, err := connfx.GetTypedConnection[redis.Client](conn) // Assuming redis.Client type
        if err != nil {
            log.Printf("Failed to get Redis client: %v", err)
            continue
        }
        // Use client for queue operations
    case "kafka":
        // Type-safe Kafka connection extraction
        kafkaConn, err := connfx.GetTypedConnection[kafka.Connection](conn) // Assuming kafka.Connection type
        if err != nil {
            log.Printf("Failed to get Kafka connection: %v", err)
            continue
        }
        // Handle Kafka connections
    }
}
Multi-Purpose Redis Usage
// Use Redis for both caching (stateful) and pub/sub (streaming)
redisConn := registry.GetNamed("cache")
if redisConn == nil {
    return errors.New("cache connection not found")
}

// Check what behaviors and capabilities this Redis connection supports
behaviors := redisConn.GetBehaviors()
capabilities := redisConn.GetCapabilities()
fmt.Printf("Redis supports: %v and %v\n", behaviors, capabilities) // [stateful streaming] and [cache queue key-value]

// Extract Redis client safely
redisClient, err := connfx.GetTypedConnection[redis.Client](redisConn) // Assuming redis.Client type
if err != nil {
    return fmt.Errorf("failed to get Redis client: %w", err)
}

// Use for caching (stateful behavior)
if hasStateful(behaviors) {
    // Use redisClient for GET/SET operations
    err := redisClient.Set("key", "value")
    if err != nil {
        return err
    }
}

// Use for pub/sub (streaming behavior)
if hasStreaming(behaviors) {
    // Use redisClient for PUBLISH/SUBSCRIBE operations
    err := redisClient.Publish("channel", "message")
    if err != nil {
        return err
    }
}

API Reference

Registry
type Registry struct {
    // ... internal fields
}

// Core methods for connection retrieval
func (r *Registry) GetDefault() Connection
func (r *Registry) GetNamed(name string) Connection

// Behavior and protocol filtering
func (r *Registry) GetByBehavior(behavior ConnectionBehavior) []Connection
func (r *Registry) GetByProtocol(protocol string) []Connection

// Connection management
func (r *Registry) AddConnection(ctx context.Context, config *ConnectionConfig) (Connection, error)
func (r *Registry) RemoveConnection(ctx context.Context, name string) error
func (r *Registry) LoadFromConfig(ctx context.Context, config *Config) error

// Health monitoring
func (r *Registry) HealthCheck(ctx context.Context) map[string]*HealthStatus

// Administrative methods
func (r *Registry) ListConnections() []string
func (r *Registry) ListRegisteredProtocols() []string
func (r *Registry) Close(ctx context.Context) error

// Adapter registration
func (r *Registry) RegisterFactory(factory ConnectionFactory) error
Connection Interface
type Connection interface {
    GetBehaviors() []ConnectionBehavior
    GetCapabilities() []ConnectionCapability
    GetProtocol() string
    GetState() ConnectionState
    HealthCheck(ctx context.Context) *HealthStatus
    Close(ctx context.Context) error
    GetRawConnection() any
}
Type-Safe Connection Extraction
func GetTypedConnection[T any](conn Connection) (T, error)

Error Handling

// Check for nil connections
conn := registry.GetNamed("nonexistent")
if conn == nil {
    // Handle missing connection
    return errors.New("connection not found")
}

// Type extraction errors
db, err := connfx.GetTypedConnection[*sql.DB](conn)
if errors.Is(err, connfx.ErrInvalidType) {
    // Handle type mismatch
}

Best Practices

  1. Register Adapters Early: Register all needed adapters during application startup
  2. Let Providers Define Behaviors: Don't specify behaviors in config - let adapters define them
  3. Use Behavior Filtering: Filter connections by behavior for generic operations
  4. Check for Nil: Always check if GetNamed() returns nil before using connections
  5. Type-Safe Extraction: Use GetTypedConnection[T]() instead of manual type assertions for better error handling
  6. Multi-Behavior Awareness: Remember that some providers (like Redis) support multiple behaviors
  7. Health Monitoring: Regularly check connection health for monitoring
  8. Graceful Shutdown: Call registry.Close(ctx) during shutdown
  9. Configuration: Use configuration files for connection management
  10. Adapter Separation: Keep adapters in separate packages for modularity

Architecture Benefits

  • Open/Closed Principle: Easy to add new connection types without modifying core code
  • Dependency Inversion: Core module doesn't depend on specific connection implementations
  • Single Responsibility: Each adapter handles one protocol/technology
  • Provider Autonomy: Adapters define their own supported behaviors
  • Extensibility: Anyone can create and register custom adapters with any behavior combination
  • Non-Opinionated: Core module doesn't make assumptions about specific technologies
  • Simplified API: Focus on essential operations with clear, predictable behavior

Thread Safety

connfx is fully thread-safe and can be used concurrently from multiple goroutines. All operations on the registry and connections are protected by appropriate synchronization mechanisms.

Documentation

Index

Constants

View Source
const (
	DefaultHTTPTimeout = 30 * time.Second
	HealthCheckTimeout = 2 * time.Second
)
View Source
const (
	DefaultPrefetchCount = 10
	DefaultMaxRetries    = 3
	DefaultBlockTimeout  = 5 * time.Second
)

Default values for consumer configuration.

View Source
const DefaultConnection = "default"

Variables

View Source
var (
	ErrAMQPClientNotInitialized = errors.New("AMQP client not initialized")
	ErrFailedToOpenConnection   = errors.New("failed to open AMQP connection")
	ErrFailedToOpenChannel      = errors.New("failed to open AMQP channel")
	ErrFailedToCloseConnection  = errors.New("failed to close AMQP connection")
	ErrFailedToCloseChannel     = errors.New("failed to close AMQP channel")
	ErrFailedToDeclareQueue     = errors.New("failed to declare queue")
	ErrFailedToPublishMessage   = errors.New("failed to publish message")
	ErrFailedToStartConsuming   = errors.New("failed to start consuming")
	ErrChannelClosed            = errors.New("channel closed")
	ErrFailedToReconnect        = errors.New("failed to reconnect")
	ErrDeliveryChannelClosed    = errors.New("delivery channel closed")
	ErrNoChannelAvailable       = errors.New("no channel available")
	ErrFailedToCloseAMQPClient  = errors.New("failed to close AMQP client")
	ErrAMQPOperation            = errors.New("AMQP operation failed")
	ErrAMQPConnectionFailed     = errors.New("failed to connect to AMQP")
	ErrFailedToCreateAMQPClient = errors.New("failed to create AMQP client")
	ErrAMQPUnsupportedOperation = errors.New("operation not supported by AMQP")
	ErrIntegerOverflow          = errors.New("integer overflow in conversion")
)
View Source
var (
	ErrFailedToCreateHTTPClient = errors.New("failed to create HTTP client")
	ErrFailedToHealthCheckHTTP  = errors.New("failed to health check HTTP endpoint")
	ErrInvalidConfigTypeHTTP    = errors.New("invalid config type for HTTP connection")
	ErrUnsupportedBodyType      = errors.New("unsupported body type")
	ErrFailedToCreateRequest    = errors.New("failed to create HTTP request")
	ErrFailedToLoadCertificate  = errors.New("failed to load client certificate")
)
View Source
var (
	ErrRedisClientNotInitialized   = errors.New("redis client not initialized")
	ErrFailedToCloseRedisClient    = errors.New("failed to close Redis client")
	ErrRedisOperation              = errors.New("redis operation failed")
	ErrRedisConnectionFailed       = errors.New("failed to connect to Redis")
	ErrRedisUnexpectedPingResponse = errors.New("unexpected ping response")
	ErrRedisPoolTimeouts           = errors.New("redis connection pool has timeouts")
	ErrFailedToCreateRedisClient   = errors.New("failed to create Redis client")
)
View Source
var (
	ErrFailedToOpenSQLConnection = errors.New("failed to open SQL connection")
	ErrFailedToPingSQL           = errors.New("failed to ping SQL database")
	ErrInvalidConfigTypeSQL      = errors.New("invalid config type for SQL connection")
	ErrUnsupportedSQLProtocol    = errors.New("unsupported SQL protocol")
	ErrFailedToCloseSQLDB        = errors.New("failed to close SQL database")
	ErrSQLConnectionNil          = errors.New("SQL connection is nil")
)
View Source
var (
	ErrInvalidConnectionBehavior = errors.New("invalid connection behavior")
	ErrInvalidConnectionProtocol = errors.New("invalid connection protocol")
	ErrInvalidDSN                = errors.New("invalid DSN")
	ErrInvalidURL                = errors.New("invalid URL")
	ErrInvalidConfigType         = errors.New("invalid config type")
)
View Source
var (
	ErrConnectionIsNil    = errors.New("connection is nil")
	ErrRawConnectionIsNil = errors.New("raw connection is nil")
	ErrInvalidType        = errors.New("invalid type")
)

Sentinel errors for GetTypedConnection function.

View Source
var (
	ErrConnectionNotFound       = errors.New("connection not found")
	ErrConnectionAlreadyExists  = errors.New("connection already exists")
	ErrFailedToCreateConnection = errors.New("failed to create connection")
	ErrUnsupportedProtocol      = errors.New("unsupported protocol")
	ErrFailedToCloseConnections = errors.New("failed to close connections")
	ErrFailedToAddConnection    = errors.New("failed to add connection")
	ErrConnectionNotSupported   = errors.New("connection does not support required operations")
	ErrInterfaceNotImplemented  = errors.New("connection does not implement required interface")
)

Functions

func GetTypedConnection

func GetTypedConnection[T any](conn Connection) (T, error)

GetTypedConnection extracts a typed connection from a Connection interface. This provides type-safe access to the underlying connection without manual type assertions.

Example usage:

conn, err := connfx.GetConnection("database")
if err != nil { return err }

db, err := connfx.GetTypedConnection[*sql.DB](conn)
if err != nil { return err }

// Now db is *sql.DB and can be used safely
rows, err := db.Query("SELECT * FROM users")

Types

type AMQPAdapter

type AMQPAdapter struct {
	// contains filtered or unexported fields
}

AMQPAdapter implements the QueueRepository interface for AMQP-based message queues.

func (*AMQPAdapter) AckMessage added in v0.7.1

func (aa *AMQPAdapter) AckMessage(
	ctx context.Context,
	queueName, consumerGroup, receiptHandle string,
) error

func (*AMQPAdapter) ClaimPendingMessages added in v0.7.1

func (aa *AMQPAdapter) ClaimPendingMessages(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	minIdleTime time.Duration,
	count int,
) ([]Message, error)

func (*AMQPAdapter) Consume

func (aa *AMQPAdapter) Consume(
	ctx context.Context,
	queueName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*AMQPAdapter) ConsumeWithGroup added in v0.7.1

func (aa *AMQPAdapter) ConsumeWithGroup(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*AMQPAdapter) DeleteMessage added in v0.7.1

func (aa *AMQPAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error

func (*AMQPAdapter) Publish

func (aa *AMQPAdapter) Publish(ctx context.Context, queueName string, body []byte) error

func (*AMQPAdapter) PublishWithHeaders added in v0.7.1

func (aa *AMQPAdapter) PublishWithHeaders(
	ctx context.Context,
	queueName string,
	body []byte,
	headers map[string]any,
) error

func (*AMQPAdapter) QueueDeclare

func (aa *AMQPAdapter) QueueDeclare(ctx context.Context, name string) (string, error)

QueueRepository interface implementation.

func (*AMQPAdapter) QueueDeclareWithConfig added in v0.7.1

func (aa *AMQPAdapter) QueueDeclareWithConfig(
	ctx context.Context,
	name string,
	config QueueConfig,
) (string, error)

type AMQPConfig added in v0.7.1

type AMQPConfig struct {
	URL string
}

AMQPConfig holds AMQP-specific configuration options.

func NewDefaultAMQPConfig added in v0.7.1

func NewDefaultAMQPConfig() *AMQPConfig

NewDefaultAMQPConfig creates an AMQP configuration with sensible defaults.

type AMQPConnection

type AMQPConnection struct {
	// contains filtered or unexported fields
}

AMQPConnection implements the connfx.Connection interface for AMQP connections.

func NewAMQPConnection

func NewAMQPConnection(protocol string, config *AMQPConfig) *AMQPConnection

NewAMQPConnection creates a new AMQP connection.

func (*AMQPConnection) Close

func (ac *AMQPConnection) Close(ctx context.Context) error

func (*AMQPConnection) GetBehaviors

func (ac *AMQPConnection) GetBehaviors() []ConnectionBehavior

Connection interface implementation.

func (*AMQPConnection) GetCapabilities

func (ac *AMQPConnection) GetCapabilities() []ConnectionCapability

func (*AMQPConnection) GetProtocol

func (ac *AMQPConnection) GetProtocol() string

func (*AMQPConnection) GetRawConnection

func (ac *AMQPConnection) GetRawConnection() any

func (*AMQPConnection) GetState

func (ac *AMQPConnection) GetState() ConnectionState

func (*AMQPConnection) HealthCheck

func (ac *AMQPConnection) HealthCheck(ctx context.Context) *HealthStatus

type AMQPConnectionFactory

type AMQPConnectionFactory struct {
	// contains filtered or unexported fields
}

AMQPConnectionFactory creates AMQP connections.

func NewAMQPConnectionFactory

func NewAMQPConnectionFactory(protocol string) *AMQPConnectionFactory

NewAMQPConnectionFactory creates a new AMQP connection factory for a specific protocol.

func (*AMQPConnectionFactory) CreateConnection

func (f *AMQPConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*AMQPConnectionFactory) GetProtocol

func (f *AMQPConnectionFactory) GetProtocol() string

type CacheRepository

type CacheRepository interface {
	Repository

	// SetWithExpiration stores a value with the given key and expiration time
	SetWithExpiration(ctx context.Context, key string, value []byte, expiration time.Duration) error

	// GetTTL returns the time-to-live for a key
	GetTTL(ctx context.Context, key string) (time.Duration, error)

	// Expire sets an expiration time for an existing key
	Expire(ctx context.Context, key string, expiration time.Duration) error
}

CacheRepository extends Repository with cache-specific operations.

type Config

type Config struct {
	Targets map[string]ConfigTarget `conf:"targets"`
}

Config represents the main configuration for connfx.

type ConfigTarget

type ConfigTarget struct {
	Properties map[string]any `conf:"properties"`

	Protocol string `conf:"protocol"` // e.g., "postgres", "redis", "http"
	DSN      string `conf:"dsn"`
	URL      string `conf:"url"`
	Host     string `conf:"host"`
	CertFile string `conf:"cert_file"`
	KeyFile  string `conf:"key_file"`
	CAFile   string `conf:"ca_file"`

	// External credential management
	Port    int           `conf:"port"`
	Timeout time.Duration `conf:"timeout"`

	// Authentication and security
	TLS           bool `conf:"tls"`
	TLSSkipVerify bool `conf:"tls_skip_verify"`
}

ConfigTarget represents the configuration data for a connection.

type Connection

type Connection interface {
	// GetBehaviors returns the connection behaviors this connection supports
	GetBehaviors() []ConnectionBehavior

	// GetCapabilities returns the connection capabilities this connection supports
	GetCapabilities() []ConnectionCapability

	// GetProtocol returns the protocol/technology used (e.g., "postgres", "redis", "http")
	GetProtocol() string

	// GetState returns the current connection state
	GetState() ConnectionState

	// HealthCheck performs a health check and returns the status
	HealthCheck(ctx context.Context) *HealthStatus

	// Close closes the connection
	Close(ctx context.Context) error

	// GetRawConnection returns the underlying connection object
	GetRawConnection() any
}

Connection represents a generic connection interface.

type ConnectionBehavior

type ConnectionBehavior string

ConnectionBehavior represents the behavioral type of connection.

const (
	// ConnectionBehaviorStateful represents persistent connections that maintain state
	// Examples: database connections, persistent TCP connections, connection pools.
	ConnectionBehaviorStateful ConnectionBehavior = "stateful"

	// ConnectionBehaviorStateless represents connections that don't maintain state
	// Examples: HTTP clients, REST APIs, stateless services.
	ConnectionBehaviorStateless ConnectionBehavior = "stateless"

	// ConnectionBehaviorStreaming represents streaming/real-time connections
	// Examples: message queues, event streams, websockets, gRPC streams.
	ConnectionBehaviorStreaming ConnectionBehavior = "streaming"
)

type ConnectionCapability

type ConnectionCapability string
const (
	// ConnectionCapabilityKeyValue represents key-value storage behavior.
	ConnectionCapabilityKeyValue ConnectionCapability = "key-value"

	// ConnectionCapabilityDocument represents document storage behavior.
	ConnectionCapabilityDocument ConnectionCapability = "document"

	// ConnectionCapabilityRelational represents relational database behavior.
	ConnectionCapabilityRelational ConnectionCapability = "relational"

	// ConnectionCapabilityTransactional represents transactional behavior.
	ConnectionCapabilityTransactional ConnectionCapability = "transactional"

	// ConnectionCapabilityCache represents caching behavior with expiration support.
	ConnectionCapabilityCache ConnectionCapability = "cache"

	// ConnectionCapabilityQueue represents message queue behavior.
	ConnectionCapabilityQueue ConnectionCapability = "queue"
)

type ConnectionFactory

type ConnectionFactory interface {
	// CreateConnection creates a new connection from configuration
	CreateConnection(ctx context.Context, config *ConfigTarget) (Connection, error)

	// GetProtocol returns the protocol this factory supports (e.g., "postgres", "redis")
	GetProtocol() string
}

ConnectionFactory creates connections from configuration.

type ConnectionState

type ConnectionState int32

ConnectionState represents the current state of a connection.

const (
	ConnectionStateNotInitialized ConnectionState = 0
	ConnectionStateConnected      ConnectionState = 1
	ConnectionStateLive           ConnectionState = 2
	ConnectionStateReady          ConnectionState = 3
	ConnectionStateDisconnected   ConnectionState = 4
	ConnectionStateError          ConnectionState = 5
	ConnectionStateReconnecting   ConnectionState = 6
)

func (ConnectionState) String

func (i ConnectionState) String() string

type ConsumerConfig

type ConsumerConfig struct {
	// Args additional arguments for queue declaration
	Args map[string]any
	// AutoAck when true, the server will automatically acknowledge messages
	AutoAck bool
	// Exclusive when true, only this consumer can access the queue
	Exclusive bool
	// NoLocal when true, the server will not send messages to the connection that published them
	NoLocal bool
	// NoWait when true, the server will not respond to the declare
	NoWait bool
	// PrefetchCount sets how many messages to prefetch
	PrefetchCount int
	// BlockTimeout sets how long to wait for messages
	BlockTimeout time.Duration
	// MaxRetries sets maximum number of retries for failed messages
	MaxRetries int
	// RetryDelay sets delay between retries
	RetryDelay time.Duration
}

ConsumerConfig holds configuration for message consumption.

func DefaultConsumerConfig

func DefaultConsumerConfig() ConsumerConfig

DefaultConsumerConfig returns a default configuration for consuming messages.

type ConsumerGroupInfo added in v0.7.1

type ConsumerGroupInfo struct {
	Name            string `json:"name"`
	LastDeliveredID string `json:"last_delivered_id"`
	Consumers       int64  `json:"consumers"`
	Pending         int64  `json:"pending"`
	EntriesRead     int64  `json:"entries_read"`
	Lag             int64  `json:"lag"`
}

ConsumerGroupInfo provides information about a consumer group.

type ExecuteResult

type ExecuteResult interface {
	// RowsAffected returns the number of rows affected
	RowsAffected() (int64, error)

	// LastInsertId returns the last insert ID (if applicable)
	LastInsertId() (int64, error)
}

ExecuteResult represents execution results.

type HTTPConnection

type HTTPConnection struct {
	// contains filtered or unexported fields
}

HTTPConnection represents an HTTP API connection.

func (*HTTPConnection) Close

func (c *HTTPConnection) Close(ctx context.Context) error

func (*HTTPConnection) GetBaseURL

func (c *HTTPConnection) GetBaseURL() string

GetBaseURL returns the base URL for this connection.

func (*HTTPConnection) GetBehaviors

func (c *HTTPConnection) GetBehaviors() []ConnectionBehavior

func (*HTTPConnection) GetCapabilities

func (c *HTTPConnection) GetCapabilities() []ConnectionCapability

func (*HTTPConnection) GetClient

func (c *HTTPConnection) GetClient() *http.Client

GetClient returns the underlying HTTP client.

func (*HTTPConnection) GetHeaders

func (c *HTTPConnection) GetHeaders() map[string]string

GetHeaders returns the default headers for this connection.

func (*HTTPConnection) GetProtocol

func (c *HTTPConnection) GetProtocol() string

func (*HTTPConnection) GetRawConnection

func (c *HTTPConnection) GetRawConnection() any

func (*HTTPConnection) GetState

func (c *HTTPConnection) GetState() ConnectionState

func (*HTTPConnection) HealthCheck

func (c *HTTPConnection) HealthCheck(
	ctx context.Context,
) *HealthStatus

func (*HTTPConnection) NewRequest

func (c *HTTPConnection) NewRequest(
	ctx context.Context,
	method string,
	path string,
	body any,
) (*http.Request, error)

NewRequest creates a new HTTP request with the connection's default headers.

type HTTPConnectionFactory

type HTTPConnectionFactory struct {
	// contains filtered or unexported fields
}

HTTPConnectionFactory creates HTTP connections.

func NewHTTPConnectionFactory

func NewHTTPConnectionFactory(protocol string) *HTTPConnectionFactory

NewHTTPConnectionFactory creates a new HTTP connection factory.

func (*HTTPConnectionFactory) CreateConnection

func (f *HTTPConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*HTTPConnectionFactory) GetProtocol

func (f *HTTPConnectionFactory) GetProtocol() string

type HealthStatus

type HealthStatus struct {
	Timestamp time.Time       `json:"timestamp"`
	Error     error           `json:"error,omitempty"`
	Message   string          `json:"message,omitempty"`
	Latency   time.Duration   `json:"latency,omitempty"`
	State     ConnectionState `json:"state"`
}

HealthStatus represents the health check result.

type Message

type Message struct {
	// Timestamp when the message was created
	Timestamp time.Time
	// Headers contains message headers
	Headers map[string]any

	// ReceiptHandle is a unique identifier for the message (for acknowledgment)
	ReceiptHandle string
	// MessageID is the message identifier
	MessageID string
	// ConsumerGroup indicates which consumer group this message belongs to (if applicable)
	ConsumerGroup string
	// StreamName indicates which stream this message came from (for stream-based systems)
	StreamName string
	// Body contains the message payload
	Body []byte
	// DeliveryCount indicates how many times this message has been delivered
	DeliveryCount int
	// contains filtered or unexported fields
}

Message represents a consumed message with its metadata and acknowledgment functions.

func (*Message) Ack

func (m *Message) Ack() error

Ack acknowledges the message.

func (*Message) Nack

func (m *Message) Nack(requeue bool) error

Nack negatively acknowledges the message.

func (*Message) SetAckFunc

func (m *Message) SetAckFunc(ackFunc func() error)

SetAckFunc sets the acknowledgment function.

func (*Message) SetNackFunc

func (m *Message) SetNackFunc(nackFunc func(requeue bool) error)

SetNackFunc sets the negative acknowledgment function.

type QueryRepository

type QueryRepository interface {
	// Query executes a query and returns raw results
	Query(ctx context.Context, query string, args ...any) (QueryResult, error)

	// Execute runs a command (INSERT, UPDATE, DELETE)
	Execute(ctx context.Context, command string, args ...any) (ExecuteResult, error)
}

QueryRepository defines the port for query operations (for SQL-like storages).

type QueryResult

type QueryResult interface {
	// Next advances to the next row
	Next() bool

	// Scan scans the current row into destinations
	Scan(dest ...any) error

	// Close closes the result set
	Close() error
}

QueryResult represents query results.

type QueueConfig added in v0.7.1

type QueueConfig struct {
	// Args contains additional queue-specific arguments
	Args map[string]any
	// MaxLength sets maximum number of messages in queue (0 = unlimited)
	MaxLength int64
	// MessageTTL sets default TTL for messages
	MessageTTL time.Duration
	// Durable indicates if the queue should survive server restarts
	Durable bool
	// AutoDelete indicates if the queue should be deleted when no longer in use
	AutoDelete bool
	// Exclusive indicates if the queue is exclusive to one connection
	Exclusive bool
}

QueueConfig holds configuration for queue declaration.

func DefaultQueueConfig added in v0.7.1

func DefaultQueueConfig() QueueConfig

DefaultQueueConfig returns a default configuration for queue declaration.

type QueueRepository

type QueueRepository interface {
	// QueueDeclare declares a queue and returns its name
	QueueDeclare(ctx context.Context, name string) (string, error)

	// QueueDeclareWithConfig declares a queue with specific configuration
	QueueDeclareWithConfig(ctx context.Context, name string, config QueueConfig) (string, error)

	// Publish sends a message to a queue
	Publish(ctx context.Context, queueName string, body []byte) error

	// PublishWithHeaders sends a message with custom headers
	PublishWithHeaders(
		ctx context.Context,
		queueName string,
		body []byte,
		headers map[string]any,
	) error

	// Consume starts consuming messages from a queue
	Consume(
		ctx context.Context,
		queueName string,
		config ConsumerConfig,
	) (<-chan Message, <-chan error)

	// ConsumeWithGroup starts consuming messages as part of a consumer group
	ConsumeWithGroup(
		ctx context.Context,
		queueName string,
		consumerGroup string,
		consumerName string,
		config ConsumerConfig,
	) (<-chan Message, <-chan error)

	// ClaimPendingMessages claims pending messages from a consumer group
	ClaimPendingMessages(
		ctx context.Context,
		queueName string,
		consumerGroup string,
		consumerName string,
		minIdleTime time.Duration,
		count int,
	) ([]Message, error)

	// AckMessage acknowledges a specific message by receipt handle
	AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error

	// DeleteMessage removes a message from the queue (for non-streaming queues)
	DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
}

QueueRepository defines the port for message queue operations.

type QueueStreamRepository added in v0.7.1

type QueueStreamRepository interface {
	QueueRepository

	// CreateConsumerGroup creates a consumer group for a stream
	CreateConsumerGroup(ctx context.Context, streamName, consumerGroup, startID string) error

	// StreamInfo returns information about a stream
	StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)

	// ConsumerGroupInfo returns information about consumer groups
	ConsumerGroupInfo(ctx context.Context, streamName string) ([]ConsumerGroupInfo, error)

	// TrimStream trims a stream to a maximum length
	TrimStream(ctx context.Context, streamName string, maxLen int64) error
}

QueueStreamRepository defines operations for stream-based message systems (Redis Streams, Kafka, etc.)

type RedisAdapter

type RedisAdapter struct {
	// contains filtered or unexported fields
}

RedisAdapter implements Redis operations and wraps the Redis client.

func (*RedisAdapter) AckMessage added in v0.7.1

func (ra *RedisAdapter) AckMessage(
	ctx context.Context,
	queueName, consumerGroup, receiptHandle string,
) error

func (*RedisAdapter) ClaimPendingMessages added in v0.7.1

func (ra *RedisAdapter) ClaimPendingMessages(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	minIdleTime time.Duration,
	count int,
) ([]Message, error)

ClaimPendingMessages claims pending messages from a consumer group.

func (*RedisAdapter) Consume added in v0.7.1

func (ra *RedisAdapter) Consume(
	ctx context.Context,
	queueName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*RedisAdapter) ConsumeWithGroup added in v0.7.1

func (ra *RedisAdapter) ConsumeWithGroup(
	ctx context.Context,
	queueName string,
	consumerGroup string,
	consumerName string,
	config ConsumerConfig,
) (<-chan Message, <-chan error)

func (*RedisAdapter) ConsumerGroupInfo added in v0.7.1

func (ra *RedisAdapter) ConsumerGroupInfo(
	ctx context.Context,
	streamName string,
) ([]ConsumerGroupInfo, error)

func (*RedisAdapter) CreateConsumerGroup added in v0.7.1

func (ra *RedisAdapter) CreateConsumerGroup(
	ctx context.Context,
	streamName, consumerGroup, startID string,
) error

StreamRepository interface implementation.

func (*RedisAdapter) DeleteMessage added in v0.7.1

func (ra *RedisAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error

func (*RedisAdapter) Exists

func (ra *RedisAdapter) Exists(ctx context.Context, key string) (bool, error)

func (*RedisAdapter) Expire

func (ra *RedisAdapter) Expire(ctx context.Context, key string, expiration time.Duration) error

func (*RedisAdapter) Get

func (ra *RedisAdapter) Get(ctx context.Context, key string) ([]byte, error)

StoreRepository interface implementation.

func (*RedisAdapter) GetTTL

func (ra *RedisAdapter) GetTTL(ctx context.Context, key string) (time.Duration, error)

func (*RedisAdapter) Publish added in v0.7.1

func (ra *RedisAdapter) Publish(ctx context.Context, queueName string, body []byte) error

func (*RedisAdapter) PublishWithHeaders added in v0.7.1

func (ra *RedisAdapter) PublishWithHeaders(
	ctx context.Context,
	queueName string,
	body []byte,
	headers map[string]any,
) error

func (*RedisAdapter) QueueDeclare added in v0.7.1

func (ra *RedisAdapter) QueueDeclare(ctx context.Context, name string) (string, error)

QueueRepository interface implementation for Redis Streams.

func (*RedisAdapter) QueueDeclareWithConfig added in v0.7.1

func (ra *RedisAdapter) QueueDeclareWithConfig(
	ctx context.Context,
	name string,
	config QueueConfig,
) (string, error)

func (*RedisAdapter) Remove

func (ra *RedisAdapter) Remove(ctx context.Context, key string) error

func (*RedisAdapter) Set

func (ra *RedisAdapter) Set(ctx context.Context, key string, value []byte) error

func (*RedisAdapter) SetWithExpiration

func (ra *RedisAdapter) SetWithExpiration(
	ctx context.Context,
	key string,
	value []byte,
	expiration time.Duration,
) error

CacheRepository interface implementation.

func (*RedisAdapter) StreamInfo added in v0.7.1

func (ra *RedisAdapter) StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)

func (*RedisAdapter) TrimStream added in v0.7.1

func (ra *RedisAdapter) TrimStream(ctx context.Context, streamName string, maxLen int64) error

func (*RedisAdapter) Update

func (ra *RedisAdapter) Update(ctx context.Context, key string, value []byte) error

type RedisConfig added in v0.7.1

type RedisConfig struct {
	Address               string
	Password              string
	DB                    int
	PoolSize              int
	MinIdleConns          int
	MaxIdleConns          int
	ConnMaxIdleTime       time.Duration
	PoolTimeout           time.Duration
	MaxRetries            int
	MinRetryBackoff       time.Duration
	MaxRetryBackoff       time.Duration
	TLSEnabled            bool
	TLSInsecureSkipVerify bool
}

RedisConfig holds Redis-specific configuration options.

func NewDefaultRedisConfig added in v0.7.1

func NewDefaultRedisConfig() *RedisConfig

NewDefaultRedisConfig creates a Redis configuration with sensible defaults.

type RedisConnection

type RedisConnection struct {
	// contains filtered or unexported fields
}

RedisConnection implements the connfx.Connection interface.

func NewRedisConnection

func NewRedisConnection(protocol string, config *RedisConfig) *RedisConnection

NewRedisConnection creates a new Redis connection with enhanced configuration.

func (*RedisConnection) Close

func (rc *RedisConnection) Close(ctx context.Context) error

func (*RedisConnection) GetBehaviors

func (rc *RedisConnection) GetBehaviors() []ConnectionBehavior

Connection interface implementation.

func (*RedisConnection) GetCapabilities

func (rc *RedisConnection) GetCapabilities() []ConnectionCapability

func (*RedisConnection) GetClient added in v0.7.1

func (rc *RedisConnection) GetClient() *redis.Client

GetClient returns the underlying Redis client for advanced operations.

func (*RedisConnection) GetProtocol

func (rc *RedisConnection) GetProtocol() string

func (*RedisConnection) GetRawConnection

func (rc *RedisConnection) GetRawConnection() any

func (*RedisConnection) GetState

func (rc *RedisConnection) GetState() ConnectionState

func (*RedisConnection) GetStats added in v0.7.1

func (rc *RedisConnection) GetStats() map[string]any

GetStats returns detailed connection and pool statistics.

func (*RedisConnection) HealthCheck

func (rc *RedisConnection) HealthCheck(ctx context.Context) *HealthStatus

type RedisConnectionFactory

type RedisConnectionFactory struct {
	// contains filtered or unexported fields
}

RedisConnectionFactory creates Redis connections with enhanced configuration.

func NewRedisConnectionFactory

func NewRedisConnectionFactory(protocol string) *RedisConnectionFactory

NewRedisConnectionFactory creates a new Redis connection factory for a specific protocol.

func (*RedisConnectionFactory) CreateConnection

func (f *RedisConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*RedisConnectionFactory) GetProtocol

func (f *RedisConnectionFactory) GetProtocol() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages all connections in the system.

func NewRegistry

func NewRegistry(logger *logfx.Logger) *Registry

NewRegistry creates a new connection registry.

func NewRegistryWithDefaults

func NewRegistryWithDefaults(logger *logfx.Logger) *Registry

func (*Registry) AddConnection

func (registry *Registry) AddConnection(
	ctx context.Context,
	name string,
	config *ConfigTarget,
) (Connection, error)

AddConnection adds a new connection to the registry.

func (*Registry) Close

func (registry *Registry) Close(ctx context.Context) error

Close closes all connections in the registry.

func (*Registry) GetByBehavior

func (registry *Registry) GetByBehavior(behavior ConnectionBehavior) []Connection

GetByBehavior returns all connections of a specific behavior.

func (*Registry) GetByCapability

func (registry *Registry) GetByCapability(capability ConnectionCapability) []Connection

GetByCapability returns all connections of a specific capability.

func (*Registry) GetByProtocol

func (registry *Registry) GetByProtocol(protocol string) []Connection

GetByProtocol returns all connections of a specific protocol.

func (*Registry) GetDefault

func (registry *Registry) GetDefault() Connection

GetDefault returns the default connection.

func (*Registry) GetNamed

func (registry *Registry) GetNamed(name string) Connection

GetNamed returns a named connection.

func (*Registry) GetRepository

func (registry *Registry) GetRepository(name string) (Repository, error)

GetRepository returns a Repository from a connection if it supports it.

func (*Registry) HealthCheck

func (registry *Registry) HealthCheck(ctx context.Context) map[string]*HealthStatus

HealthCheck performs health checks on all connections.

func (*Registry) ListConnections

func (registry *Registry) ListConnections() []string

ListConnections returns all connection names.

func (*Registry) ListRegisteredProtocols

func (registry *Registry) ListRegisteredProtocols() []string

ListRegisteredProtocols returns all registered protocols.

func (*Registry) LoadFromConfig

func (registry *Registry) LoadFromConfig(ctx context.Context, config *Config) error

func (*Registry) RegisterFactory

func (registry *Registry) RegisterFactory(factory ConnectionFactory)

RegisterFactory registers a connection factory for a specific protocol.

func (*Registry) RemoveConnection

func (registry *Registry) RemoveConnection(ctx context.Context, name string) error

RemoveConnection removes a connection from the registry.

type Repository

type Repository interface {
	// Get retrieves a value by key
	Get(ctx context.Context, key string) ([]byte, error)

	// Set stores a value with the given key
	Set(ctx context.Context, key string, value []byte) error

	// Remove deletes a value by key
	Remove(ctx context.Context, key string) error

	// Update updates an existing value by key
	Update(ctx context.Context, key string, value []byte) error

	// Exists checks if a key exists
	Exists(ctx context.Context, key string) (bool, error)
}

Repository defines the port for data access operations. This interface will be implemented by adapters in connfx for different storage technologies.

type SQLConnection

type SQLConnection struct {
	// contains filtered or unexported fields
}

SQLConnection represents a SQL database connection.

func (*SQLConnection) Close

func (c *SQLConnection) Close(ctx context.Context) error

func (*SQLConnection) GetBehaviors

func (c *SQLConnection) GetBehaviors() []ConnectionBehavior

func (*SQLConnection) GetCapabilities

func (c *SQLConnection) GetCapabilities() []ConnectionCapability

func (*SQLConnection) GetDB

func (c *SQLConnection) GetDB() *sql.DB

GetDB returns the underlying *sql.DB instance.

func (*SQLConnection) GetProtocol

func (c *SQLConnection) GetProtocol() string

func (*SQLConnection) GetRawConnection

func (c *SQLConnection) GetRawConnection() any

func (*SQLConnection) GetState

func (c *SQLConnection) GetState() ConnectionState

func (*SQLConnection) HealthCheck

func (c *SQLConnection) HealthCheck(ctx context.Context) *HealthStatus

func (*SQLConnection) Stats

func (c *SQLConnection) Stats() sql.DBStats

Stats returns database connection statistics.

type SQLConnectionFactory

type SQLConnectionFactory struct {
	// contains filtered or unexported fields
}

SQLConnectionFactory creates SQL connections.

func NewSQLConnectionFactory

func NewSQLConnectionFactory(protocol string) *SQLConnectionFactory

NewSQLConnectionFactory creates a new SQL connection factory for a specific protocol.

func (*SQLConnectionFactory) CreateConnection

func (f *SQLConnectionFactory) CreateConnection(
	ctx context.Context,
	config *ConfigTarget,
) (Connection, error)

func (*SQLConnectionFactory) GetProtocol

func (f *SQLConnectionFactory) GetProtocol() string

type StreamEntry added in v0.7.1

type StreamEntry struct {
	Fields map[string]string `json:"fields"`
	ID     string            `json:"id"`
}

StreamEntry represents a single stream entry.

type StreamInfo added in v0.7.1

type StreamInfo struct {
	FirstEntry      *StreamEntry      `json:"first_entry,omitempty"`
	LastEntry       *StreamEntry      `json:"last_entry,omitempty"`
	Metadata        map[string]string `json:"metadata,omitempty"`
	LastGeneratedID string            `json:"last_generated_id"`
	MaxDeletedID    string            `json:"max_deleted_id"`
	RecordedFirstID string            `json:"recorded_first_id"`
	Length          int64             `json:"length"`
	RadixTreeKeys   int64             `json:"radix_tree_keys"`
	RadixTreeNodes  int64             `json:"radix_tree_nodes"`
	Groups          int64             `json:"groups"`
	EntriesAdded    int64             `json:"entries_added"`
}

StreamInfo provides information about a stream.

type TransactionContext

type TransactionContext interface {
	// Commit commits the transaction
	Commit() error

	// Rollback rolls back the transaction
	Rollback() error

	// GetRepository returns a repository bound to this transaction
	GetRepository() Repository
}

TransactionContext represents a transaction context for data operations.

type TransactionalRepository

type TransactionalRepository interface {
	Repository

	// BeginTransaction starts a new transaction
	BeginTransaction(ctx context.Context) (TransactionContext, error)
}

TransactionalRepository extends Repository with transaction support.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL