providers

package
v1.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotSQLDatabase is returned when attempting SQL operations on a non-SQL database.
	// It is a sentinel value; callers should match it with errors.Is rather than by message text.
	ErrNotSQLDatabase = errors.New("not a SQL database")

	// ErrNotMongoDB is returned when attempting MongoDB operations on a non-MongoDB connection.
	// It is a sentinel value; callers should match it with errors.Is rather than by message text.
	ErrNotMongoDB = errors.New("not a MongoDB connection")
)

Common errors

Functions

This section is empty.

Types

type ConnectionConfig

// ConnectionConfig is a minimal interface for connection configuration.
// It exposes only accessor methods plus BuildDSN; the actual implementation
// is in the dbmanager package.
type ConnectionConfig interface {
	// BuildDSN returns the connection string for this configuration, or an error.
	// NOTE(review): the DSN format is presumably specific to the database type — confirm in dbmanager.
	BuildDSN() (string, error)
	// GetName returns the connection's name.
	GetName() string
	// GetType returns the database type as a string.
	GetType() string
	// GetHost, GetPort, GetUser, GetPassword and GetDatabase return the
	// corresponding connection settings.
	GetHost() string
	GetPort() int
	GetUser() string
	GetPassword() string
	GetDatabase() string
	// GetFilePath returns the configured file path.
	// NOTE(review): presumably only meaningful for file-based databases such as SQLite — confirm.
	GetFilePath() string
	// GetConnectTimeout and GetQueryTimeout return the configured timeouts.
	GetConnectTimeout() time.Duration
	GetQueryTimeout() time.Duration
	// GetEnableLogging and GetEnableMetrics report whether logging and metrics are enabled.
	GetEnableLogging() bool
	GetEnableMetrics() bool
	// The pool settings below are returned as pointers.
	// NOTE(review): a nil result presumably means "not configured, use the default" — TODO confirm.
	GetMaxOpenConns() *int
	GetMaxIdleConns() *int
	GetConnMaxLifetime() *time.Duration
	GetConnMaxIdleTime() *time.Duration
	// GetReadPreference returns the configured read preference as a string.
	// NOTE(review): presumably a MongoDB read preference mode — confirm.
	GetReadPreference() string
}

ConnectionConfig is a minimal interface for configuration. The actual implementation is in the dbmanager package.

type ConnectionStats

// ConnectionStats contains statistics about a database connection: its
// identity, health-check state, and SQL connection pool counters.
type ConnectionStats struct {
	// Name and Type identify the connection the statistics belong to.
	Name              string
	Type              string // Database type as string to avoid circular dependency
	// Connected presumably reports whether the connection was open when the
	// stats were collected — TODO confirm against the providers' Stats methods.
	Connected         bool
	// LastHealthCheck and HealthCheckStatus presumably record the time and
	// outcome of the most recent health check — TODO confirm.
	LastHealthCheck   time.Time
	HealthCheckStatus string

	// SQL connection pool stats.
	// NOTE(review): the field names mirror database/sql.DBStats; presumably they
	// stay at their zero values for non-SQL providers such as MongoDB — confirm.
	OpenConnections   int
	InUse             int
	Idle              int
	WaitCount         int64
	WaitDuration      time.Duration
	MaxIdleClosed     int64
	MaxLifetimeClosed int64
}

ConnectionStats contains statistics about a database connection

type MSSQLProvider

type MSSQLProvider struct {
	// contains filtered or unexported fields
}

MSSQLProvider implements Provider for Microsoft SQL Server databases

func NewMSSQLProvider

func NewMSSQLProvider() *MSSQLProvider

NewMSSQLProvider creates a new MSSQL provider

func (*MSSQLProvider) Close

func (p *MSSQLProvider) Close() error

Close closes the MSSQL connection

func (*MSSQLProvider) Connect

func (p *MSSQLProvider) Connect(ctx context.Context, cfg ConnectionConfig) error

Connect establishes an MSSQL connection

func (*MSSQLProvider) GetMongo

func (p *MSSQLProvider) GetMongo() (*mongo.Client, error)

GetMongo returns an error for MSSQL (not a MongoDB connection)

func (*MSSQLProvider) GetNative

func (p *MSSQLProvider) GetNative() (*sql.DB, error)

GetNative returns the native *sql.DB connection

func (*MSSQLProvider) HealthCheck

func (p *MSSQLProvider) HealthCheck(ctx context.Context) error

HealthCheck verifies the MSSQL connection is alive

func (*MSSQLProvider) Stats

func (p *MSSQLProvider) Stats() *ConnectionStats

Stats returns connection pool statistics

type MongoProvider

type MongoProvider struct {
	// contains filtered or unexported fields
}

MongoProvider implements Provider for MongoDB databases

func NewMongoProvider

func NewMongoProvider() *MongoProvider

NewMongoProvider creates a new MongoDB provider

func (*MongoProvider) Close

func (p *MongoProvider) Close() error

Close closes the MongoDB connection

func (*MongoProvider) Connect

func (p *MongoProvider) Connect(ctx context.Context, cfg ConnectionConfig) error

Connect establishes a MongoDB connection

func (*MongoProvider) GetMongo

func (p *MongoProvider) GetMongo() (*mongo.Client, error)

GetMongo returns the MongoDB client

func (*MongoProvider) GetNative

func (p *MongoProvider) GetNative() (*sql.DB, error)

GetNative returns an error for MongoDB (not a SQL database)

func (*MongoProvider) HealthCheck

func (p *MongoProvider) HealthCheck(ctx context.Context) error

HealthCheck verifies the MongoDB connection is alive

func (*MongoProvider) Stats

func (p *MongoProvider) Stats() *ConnectionStats

Stats returns connection statistics for MongoDB

type NotificationHandler

// NotificationHandler is the callback invoked when a notification is received;
// it is passed the channel name and the notification payload.
type NotificationHandler func(channel, payload string)

NotificationHandler is called when a notification is received

type PostgresListener

type PostgresListener struct {
	// contains filtered or unexported fields
}

PostgresListener manages PostgreSQL LISTEN/NOTIFY functionality

Example (Basic)

ExamplePostgresListener_basic demonstrates basic LISTEN/NOTIFY usage

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bitechdev/ResolveSpec/pkg/dbmanager"
	"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)

// main walks through a basic LISTEN/NOTIFY round trip: connect, subscribe to
// a channel, publish one notification, then unsubscribe.
func main() {
	ctx := context.Background()

	// Connection settings for the example database
	connCfg := &dbmanager.ConnectionConfig{
		Name:           "example",
		Type:           dbmanager.DatabaseTypePostgreSQL,
		Host:           "localhost",
		Port:           5432,
		User:           "postgres",
		Password:       "password",
		Database:       "testdb",
		ConnectTimeout: 10 * time.Second,
		EnableLogging:  true,
	}

	// Open the PostgreSQL provider connection
	pg := providers.NewPostgresProvider()
	if err := pg.Connect(ctx, connCfg); err != nil {
		panic(fmt.Sprintf("Failed to connect: %v", err))
	}
	defer pg.Close()

	// Obtain the provider's listener
	lst, err := pg.GetListener(ctx)
	if err != nil {
		panic(fmt.Sprintf("Failed to get listener: %v", err))
	}

	// Handler that prints every notification it is given
	onUserEvent := func(channel, payload string) {
		fmt.Printf("Received notification on %s: %s\n", channel, payload)
	}

	// Subscribe the handler to the channel
	if err := lst.Listen("user_events", onUserEvent); err != nil {
		panic(fmt.Sprintf("Failed to listen: %v", err))
	}

	// Publish a notification on the same channel
	if err := lst.Notify(ctx, "user_events", `{"event":"user_created","user_id":123}`); err != nil {
		panic(fmt.Sprintf("Failed to notify: %v", err))
	}

	// Give the handler a moment to run before unsubscribing
	time.Sleep(100 * time.Millisecond)

	// Drop the subscription
	if err := lst.Unlisten("user_events"); err != nil {
		panic(fmt.Sprintf("Failed to unlisten: %v", err))
	}
}
Example (ErrorHandling)

ExamplePostgresListener_errorHandling demonstrates error handling and reconnection

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bitechdev/ResolveSpec/pkg/dbmanager"
	"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)

// main demonstrates error handling around the listener: a handler that
// recovers from its own panics, and explicit checks on every listener call
// that returns an error.
func main() {
	cfg := &dbmanager.ConnectionConfig{
		Name:           "example",
		Type:           dbmanager.DatabaseTypePostgreSQL,
		Host:           "localhost",
		Port:           5432,
		User:           "postgres",
		Password:       "password",
		Database:       "testdb",
		ConnectTimeout: 10 * time.Second,
		EnableLogging:  true,
	}

	provider := providers.NewPostgresProvider()
	ctx := context.Background()

	if err := provider.Connect(ctx, cfg); err != nil {
		panic(fmt.Sprintf("Failed to connect: %v", err))
	}
	defer provider.Close()

	listener, err := provider.GetListener(ctx)
	if err != nil {
		panic(fmt.Sprintf("Failed to get listener: %v", err))
	}

	// The listener automatically reconnects if the connection is lost
	// Subscribe with error handling in the callback
	err = listener.Listen("critical_events", func(channel, payload string) {
		// recover only works directly inside a deferred function, so the
		// guard is installed as the handler's first statement.
		defer func() {
			if r := recover(); r != nil {
				fmt.Printf("Handler panic recovered: %v\n", r)
			}
		}()

		// Process the event
		fmt.Printf("Processing critical event: %s\n", payload)

		// If processing fails, the panic will be caught by the defer above
		// The listener will continue to function normally
	})

	if err != nil {
		fmt.Printf("Failed to listen: %v\n", err)
		return
	}

	// Check if listener is connected
	if listener.IsConnected() {
		fmt.Println("Listener is connected and ready")
	}

	// Send a notification; report a failure instead of silently dropping it
	if err := listener.Notify(ctx, "critical_events", "system_alert"); err != nil {
		fmt.Printf("Failed to notify: %v\n", err)
		return
	}

	// Wait for the notification to be processed
	time.Sleep(100 * time.Millisecond)
}
Example (MultipleChannels)

ExamplePostgresListener_multipleChannels demonstrates listening to multiple channels

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bitechdev/ResolveSpec/pkg/dbmanager"
	"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)

// main demonstrates subscribing one listener to several channels, sending a
// notification to each, and listing the channels that are being listened to.
func main() {
	cfg := &dbmanager.ConnectionConfig{
		Name:           "example",
		Type:           dbmanager.DatabaseTypePostgreSQL,
		Host:           "localhost",
		Port:           5432,
		User:           "postgres",
		Password:       "password",
		Database:       "testdb",
		ConnectTimeout: 10 * time.Second,
		EnableLogging:  false,
	}

	provider := providers.NewPostgresProvider()
	ctx := context.Background()

	if err := provider.Connect(ctx, cfg); err != nil {
		panic(fmt.Sprintf("Failed to connect: %v", err))
	}
	defer provider.Close()

	listener, err := provider.GetListener(ctx)
	if err != nil {
		panic(fmt.Sprintf("Failed to get listener: %v", err))
	}

	// Listen to multiple channels. The handler is handed the channel name as
	// an argument, so it does not need to capture the loop variable.
	channels := []string{"orders", "payments", "notifications"}
	for _, name := range channels {
		err := listener.Listen(name, func(channel, payload string) {
			fmt.Printf("[%s] %s\n", channel, payload)
		})
		if err != nil {
			panic(fmt.Sprintf("Failed to listen on %s: %v", name, err))
		}
	}

	// Send notifications to different channels, reporting any failure
	// rather than discarding the returned error.
	outgoing := []struct {
		channel string
		payload string
	}{
		{"orders", "New order #12345"},
		{"payments", "Payment received $99.99"},
		{"notifications", "Welcome email sent"},
	}
	for _, msg := range outgoing {
		if err := listener.Notify(ctx, msg.channel, msg.payload); err != nil {
			fmt.Printf("Failed to notify on %s: %v\n", msg.channel, err)
		}
	}

	// Wait for notifications
	time.Sleep(200 * time.Millisecond)

	// Check active channels
	activeChannels := listener.Channels()
	fmt.Printf("Listening to %d channels: %v\n", len(activeChannels), activeChannels)
}
Example (WithDBManager)

ExamplePostgresListener_withDBManager demonstrates usage with DBManager

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bitechdev/ResolveSpec/pkg/dbmanager"
	"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)

// main is a conceptual example of using the listener alongside DBManager.
// It builds the provider directly, subscribes to two application channels and
// sends a notification to each, checking every returned error.
func main() {
	// This example shows how to use the listener with the full DBManager

	// Assume we have a DBManager instance and get a connection
	// conn, _ := dbMgr.Get("primary")

	// Get the underlying provider (this would need to be exposed via the Connection interface)
	// For now, this is a conceptual example

	ctx := context.Background()

	// Create provider directly for demonstration
	cfg := &dbmanager.ConnectionConfig{
		Name:           "primary",
		Type:           dbmanager.DatabaseTypePostgreSQL,
		Host:           "localhost",
		Port:           5432,
		User:           "postgres",
		Password:       "password",
		Database:       "myapp",
		ConnectTimeout: 10 * time.Second,
	}

	provider := providers.NewPostgresProvider()
	if err := provider.Connect(ctx, cfg); err != nil {
		panic(err)
	}
	defer provider.Close()

	// Get listener
	listener, err := provider.GetListener(ctx)
	if err != nil {
		panic(err)
	}

	// Subscribe to application events. A failed subscription is treated like
	// the other setup failures above, since no handler would ever run.
	err = listener.Listen("cache_invalidation", func(channel, payload string) {
		fmt.Printf("Cache invalidation request: %s\n", payload)
		// Handle cache invalidation logic here
	})
	if err != nil {
		panic(err)
	}

	err = listener.Listen("config_reload", func(channel, payload string) {
		fmt.Printf("Configuration reload request: %s\n", payload)
		// Handle configuration reload logic here
	})
	if err != nil {
		panic(err)
	}

	// Simulate receiving notifications; report failures instead of ignoring them
	if err := listener.Notify(ctx, "cache_invalidation", "user:123"); err != nil {
		fmt.Printf("Failed to notify cache_invalidation: %v\n", err)
	}
	if err := listener.Notify(ctx, "config_reload", "database"); err != nil {
		fmt.Printf("Failed to notify config_reload: %v\n", err)
	}

	// Wait for the notifications to be processed
	time.Sleep(100 * time.Millisecond)
}

func NewPostgresListener

func NewPostgresListener(cfg ConnectionConfig) *PostgresListener

NewPostgresListener creates a new PostgreSQL listener

func (*PostgresListener) Channels

func (l *PostgresListener) Channels() []string

Channels returns the list of channels currently being listened to

func (*PostgresListener) Close

func (l *PostgresListener) Close() error

Close closes the listener and all subscriptions

func (*PostgresListener) Connect

func (l *PostgresListener) Connect(ctx context.Context) error

Connect establishes a dedicated connection for listening

func (*PostgresListener) IsConnected

func (l *PostgresListener) IsConnected() bool

IsConnected returns true if the listener is connected

func (*PostgresListener) Listen

func (l *PostgresListener) Listen(channel string, handler NotificationHandler) error

Listen subscribes to a PostgreSQL notification channel

func (*PostgresListener) Notify

func (l *PostgresListener) Notify(ctx context.Context, channel string, payload string) error

Notify sends a notification to a PostgreSQL channel

func (*PostgresListener) Unlisten

func (l *PostgresListener) Unlisten(channel string) error

Unlisten unsubscribes from a PostgreSQL notification channel

type PostgresProvider

type PostgresProvider struct {
	// contains filtered or unexported fields
}

PostgresProvider implements Provider for PostgreSQL databases

func NewPostgresProvider

func NewPostgresProvider() *PostgresProvider

NewPostgresProvider creates a new PostgreSQL provider

func (*PostgresProvider) Close

func (p *PostgresProvider) Close() error

Close closes the PostgreSQL connection

func (*PostgresProvider) Connect

func (p *PostgresProvider) Connect(ctx context.Context, cfg ConnectionConfig) error

Connect establishes a PostgreSQL connection

func (*PostgresProvider) GetListener

func (p *PostgresProvider) GetListener(ctx context.Context) (*PostgresListener, error)

GetListener returns a PostgreSQL listener for NOTIFY/LISTEN functionality. The listener is lazily initialized on first call and reused for subsequent calls.

func (*PostgresProvider) GetMongo

func (p *PostgresProvider) GetMongo() (*mongo.Client, error)

GetMongo returns an error for PostgreSQL (not a MongoDB connection)

func (*PostgresProvider) GetNative

func (p *PostgresProvider) GetNative() (*sql.DB, error)

GetNative returns the native *sql.DB connection

func (*PostgresProvider) HealthCheck

func (p *PostgresProvider) HealthCheck(ctx context.Context) error

HealthCheck verifies the PostgreSQL connection is alive

func (*PostgresProvider) Stats

func (p *PostgresProvider) Stats() *ConnectionStats

Stats returns connection pool statistics

type Provider

// Provider creates and manages the underlying database connection.
// It is the common interface over SQL and MongoDB backends: exactly one of
// GetNative or GetMongo is expected to succeed for a given implementation.
type Provider interface {
	// Connect establishes the database connection using the settings in cfg.
	Connect(ctx context.Context, cfg ConnectionConfig) error

	// Close closes the connection.
	Close() error

	// HealthCheck verifies the connection is alive.
	HealthCheck(ctx context.Context) error

	// GetNative returns the native *sql.DB (SQL databases only).
	// Returns an error for non-SQL databases.
	// NOTE(review): presumably that error is ErrNotSQLDatabase — confirm in the implementations.
	GetNative() (*sql.DB, error)

	// GetMongo returns the MongoDB client (MongoDB only).
	// Returns an error for non-MongoDB databases.
	// NOTE(review): presumably that error is ErrNotMongoDB — confirm in the implementations.
	GetMongo() (*mongo.Client, error)

	// Stats returns connection statistics.
	Stats() *ConnectionStats
}

Provider creates and manages the underlying database connection

type SQLiteProvider

type SQLiteProvider struct {
	// contains filtered or unexported fields
}

SQLiteProvider implements Provider for SQLite databases

func NewSQLiteProvider

func NewSQLiteProvider() *SQLiteProvider

NewSQLiteProvider creates a new SQLite provider

func (*SQLiteProvider) Close

func (p *SQLiteProvider) Close() error

Close closes the SQLite connection

func (*SQLiteProvider) Connect

func (p *SQLiteProvider) Connect(ctx context.Context, cfg ConnectionConfig) error

Connect establishes a SQLite connection

func (*SQLiteProvider) GetMongo

func (p *SQLiteProvider) GetMongo() (*mongo.Client, error)

GetMongo returns an error for SQLite (not a MongoDB connection)

func (*SQLiteProvider) GetNative

func (p *SQLiteProvider) GetNative() (*sql.DB, error)

GetNative returns the native *sql.DB connection

func (*SQLiteProvider) HealthCheck

func (p *SQLiteProvider) HealthCheck(ctx context.Context) error

HealthCheck verifies the SQLite connection is alive

func (*SQLiteProvider) Stats

func (p *SQLiteProvider) Stats() *ConnectionStats

Stats returns connection pool statistics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL