poolmanager

package
v2.7.0-beta.2
Warning: This package is not in the latest version of its module.
Published: Dec 28, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package poolmanager provides multi-tenant support for Midaz services.

This package offers utilities for managing tenant context, validation, and error handling in multi-tenant applications. It provides:

  • Tenant context key for request-scoped tenant identification
  • Standard tenant-related errors for consistent error handling
  • Tenant isolation utilities to prevent cross-tenant data access
  • Connection pool management for PostgreSQL, MongoDB, Valkey, and RabbitMQ


Constants

const (
	// PackageName is the name of this package, used for logging and identification.
	PackageName = "tenants"
)
const (
	// TenantIDHeader is the AMQP header key used to store the tenant identifier.
	// This header is automatically injected by TenantRabbitMQPublisher and can be
	// extracted using ExtractTenantFromMessage.
	TenantIDHeader = "X-Tenant-ID"
)

Variables

var (
	// ErrTenantNotFound indicates that the requested tenant does not exist.
	ErrTenantNotFound = errors.New("tenant not found")

	// ErrTenantAlreadyExists indicates that a tenant with the given identifier already exists.
	ErrTenantAlreadyExists = errors.New("tenant already exists")

	// ErrInvalidTenantID indicates that the provided tenant ID is invalid or malformed.
	ErrInvalidTenantID = errors.New("invalid tenant ID")

	// ErrTenantContextMissing indicates that tenant context is required but not present.
	ErrTenantContextMissing = errors.New("tenant context missing")

	// ErrTenantInactive indicates that the tenant exists but is in an inactive state.
	ErrTenantInactive = errors.New("tenant is inactive")

	// ErrTenantSuspended indicates that the tenant has been suspended.
	ErrTenantSuspended = errors.New("tenant is suspended")

	// ErrTenantDeleted indicates that the tenant has been deleted.
	ErrTenantDeleted = errors.New("tenant is deleted")

	// ErrCrossTenantAccess indicates an attempt to access resources belonging to another tenant.
	ErrCrossTenantAccess = errors.New("cross-tenant access denied")

	// ErrTenantIsolationViolation indicates a breach of tenant isolation boundaries.
	ErrTenantIsolationViolation = errors.New("tenant isolation violation")

	// ErrTenantConnectionRequired is returned when multi-tenant mode is enabled
	// but no tenant connection was found in context. This is a critical error
	// that prevents data from being written to the wrong database.
	ErrTenantConnectionRequired = errors.New("tenant connection required but not found in context")

	// ErrNoConnectionAvailable is returned when no database connection is available.
	ErrNoConnectionAvailable = errors.New("no database connection available")
)

Tenant-related errors for consistent error handling across services.

Functions

func ExtractTenantFromMessage

func ExtractTenantFromMessage(msg amqp.Delivery) (string, error)

ExtractTenantFromMessage extracts the tenant ID from an AMQP message's headers. Returns the tenant ID if the X-Tenant-ID header exists and is a non-empty string. Returns an error if:

  • The message headers are nil
  • The X-Tenant-ID header is missing
  • The header value is not a string
  • The header value is empty or contains only whitespace

Example:

func handleMessage(msg amqp.Delivery) error {
    tenantID, err := ExtractTenantFromMessage(msg)
    if err != nil {
        return fmt.Errorf("failed to extract tenant: %w", err)
    }
    // Use tenantID for tenant-scoped operations
    return nil
}
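The four failure cases listed above can be sketched without an AMQP dependency. This is a minimal illustration, not the package's implementation: extractTenant is a hypothetical stand-in that takes the header table directly (amqp.Table is a map[string]interface{}), and the error messages are invented for the example. Whether the real function trims the returned value is not stated, so the sketch returns it unchanged.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// tenantIDHeader mirrors the documented TenantIDHeader value.
const tenantIDHeader = "X-Tenant-ID"

// extractTenant is a hypothetical stand-in for ExtractTenantFromMessage.
// It checks the documented failure cases in order: nil headers, missing
// header, non-string value, and empty or whitespace-only value.
func extractTenant(headers map[string]interface{}) (string, error) {
	if headers == nil {
		return "", errors.New("message headers are nil")
	}
	raw, ok := headers[tenantIDHeader]
	if !ok {
		return "", fmt.Errorf("%s header is missing", tenantIDHeader)
	}
	id, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("%s header is not a string", tenantIDHeader)
	}
	if strings.TrimSpace(id) == "" {
		return "", fmt.Errorf("%s header is empty", tenantIDHeader)
	}
	return id, nil
}

func main() {
	id, err := extractTenant(map[string]interface{}{tenantIDHeader: "tenant-123"})
	fmt.Println(id, err)

	// A non-string header value is rejected rather than converted.
	_, err = extractTenant(map[string]interface{}{tenantIDHeader: 42})
	fmt.Println(err)
}
```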

func GetDBForTenant

func GetDBForTenant(ctx context.Context, defaultConn *libPostgres.PostgresConnection) (dbresolver.DB, error)

GetDBForTenant returns the appropriate database connection based on multi-tenant mode.

When multi-tenant mode is enabled (a tenant ID is present in the context):

  • Returns the tenant-specific connection from the context
  • Returns an error if no tenant connection is found; it never falls back to the default connection
  • This prevents data from being written to the wrong database

When multi-tenant mode is disabled (no tenant ID in the context):

  • Returns the default connection (single-tenant mode)

Usage in repositories:

func (r *Repository) Create(ctx context.Context, entity *Entity) error {
    db, err := poolmanager.GetDBForTenant(ctx, r.connection)
    if err != nil {
        return err // Don't proceed without correct connection!
    }
    // Use db for queries...
    return nil
}

func GetDBFromContext

func GetDBFromContext(ctx context.Context, dbType DBType) interface{}

GetDBFromContext returns the database configuration for the specified type from context. It supports both standard context.Context and *fiber.Ctx. Returns nil if the context is nil, the database type is unknown, or no configuration is found.

Example:

pgConfig := GetDBFromContext(ctx, DBTypePostgreSQL)
// The comma-ok assertion also covers a nil result, so no separate nil check is needed.
if config, ok := pgConfig.(*PostgreSQLConfig); ok {
    // Use config...
}

func GetTenantID

func GetTenantID(ctx context.Context) string

GetTenantID retrieves the tenant ID from the context. Returns an empty string if not found.

func GetTenantIDFromFiber

func GetTenantIDFromFiber(c *fiber.Ctx) string

GetTenantIDFromFiber extracts the tenant ID from a Fiber context. Returns an empty string if the context is nil or no tenant ID is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    tenantID := GetTenantIDFromFiber(c)
    if tenantID == "" {
        return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "no tenant"})
    }
    // Use tenantID...
    return nil
})

func GetTenantPGConnection

func GetTenantPGConnection(ctx context.Context) (dbresolver.DB, bool)

GetTenantPGConnection retrieves the tenant-specific PostgreSQL connection from the context. Returns the connection and true if found, otherwise nil and false. This returns an actual database connection, not just configuration.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    conn, ok := GetTenantPGConnection(c.UserContext())
    if !ok {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no database connection"})
    }
    // Use conn for database operations...
    return nil
})

func GetTenantPGConnectionFromFiber

func GetTenantPGConnectionFromFiber(c *fiber.Ctx) (dbresolver.DB, bool)

GetTenantPGConnectionFromFiber extracts the tenant PostgreSQL connection from a Fiber context. Returns the connection and true if found, otherwise nil and false.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    conn, ok := GetTenantPGConnectionFromFiber(c)
    if !ok {
        // Fall back to config-based connection
    }
    // Use conn...
    return nil
})

func IsMultiTenantContext

func IsMultiTenantContext(ctx context.Context) bool

IsMultiTenantContext checks if the request is in multi-tenant mode. This is set by the middleware when it processes a request with tenant context.

func WithAllTenantContext

func WithAllTenantContext(
	ctx context.Context,
	tenantID string,
	tenantConfig *TenantConfig,
	pgConfig *PostgreSQLConfig,
	mongoConfig *MongoDBConfig,
	valkeyConfig *ValkeyConfig,
	rabbitConfig *RabbitMQConfig,
) context.Context

WithAllTenantContext returns a new context with all tenant-related values set. This is a convenience function for testing scenarios where full tenant context is needed. Individual configs can be nil if not needed.

Example:

ctx := WithAllTenantContext(
    context.Background(),
    "tenant-123",
    tenantConfig,
    pgConfig,
    mongoConfig,
    valkeyConfig,
    rabbitConfig,
)

func WithTenantConfig

func WithTenantConfig(ctx context.Context, config *TenantConfig) context.Context

WithTenantConfig returns a new context with the tenant configuration set. This is useful for testing and for propagating tenant context in workers.

Example:

config := &TenantConfig{ID: "tenant-123", Status: "active"}
ctx := WithTenantConfig(context.Background(), config)
retrieved := GetTenantConfig(ctx) // Returns config

func WithTenantID

func WithTenantID(ctx context.Context, tenantID string) context.Context

WithTenantID returns a new context with the tenant ID set. This is useful for testing and for propagating tenant context in workers.

Example:

ctx := WithTenantID(context.Background(), "tenant-123")
tenantID := GetTenantID(ctx) // Returns "tenant-123"

func WithTenantMongo

func WithTenantMongo(ctx context.Context, config *MongoDBConfig) context.Context

WithTenantMongo returns a new context with the MongoDB configuration set. This is useful for testing and for propagating tenant context in workers.

Example:

mongoConfig := &MongoDBConfig{URI: "mongodb://localhost:27017", Database: "test"}
ctx := WithTenantMongo(context.Background(), mongoConfig)
retrieved := GetTenantMongoDB(ctx) // Returns mongoConfig

func WithTenantMongoDatabase

func WithTenantMongoDatabase(ctx context.Context, db *TenantDatabase) context.Context

WithTenantMongoDatabase returns a new context with the tenant MongoDB database set. This is useful for testing and for propagating tenant connections in workers.

Example:

ctx := WithTenantMongoDatabase(context.Background(), tenantDB)
db, _ := GetTenantMongoDatabase(ctx) // Returns tenantDB

func WithTenantPG

func WithTenantPG(ctx context.Context, config *PostgreSQLConfig) context.Context

WithTenantPG returns a new context with the PostgreSQL configuration set. This is useful for testing and for propagating tenant context in workers.

Example:

pgConfig := &PostgreSQLConfig{Host: "localhost", Port: 5432}
ctx := WithTenantPG(context.Background(), pgConfig)
retrieved := GetTenantPostgreSQL(ctx) // Returns pgConfig

func WithTenantPGConnection

func WithTenantPGConnection(ctx context.Context, conn dbresolver.DB) context.Context

WithTenantPGConnection returns a new context with the tenant PostgreSQL connection set. This is useful for testing and for propagating tenant connections in workers.

Example:

ctx := WithTenantPGConnection(context.Background(), dbConn)
conn, _ := GetTenantPGConnection(ctx) // Returns dbConn

func WithTenantRabbitMQ

func WithTenantRabbitMQ(ctx context.Context, config *RabbitMQConfig) context.Context

WithTenantRabbitMQ returns a new context with the RabbitMQ configuration set. This is useful for testing and for propagating tenant context in workers.

Example:

rabbitConfig := &RabbitMQConfig{URL: "amqp://localhost:5672"}
ctx := WithTenantRabbitMQ(context.Background(), rabbitConfig)
retrieved := GetTenantRabbitMQ(ctx) // Returns rabbitConfig

func WithTenantValkey

func WithTenantValkey(ctx context.Context, config *ValkeyConfig) context.Context

WithTenantValkey returns a new context with the Valkey configuration set. This is useful for testing and for propagating tenant context in workers.

Example:

valkeyConfig := &ValkeyConfig{Addresses: []string{"localhost:6379"}}
ctx := WithTenantValkey(context.Background(), valkeyConfig)
retrieved := GetTenantValkey(ctx) // Returns valkeyConfig

Types

type AMQPChannel

type AMQPChannel interface {
	// PublishWithContext sends a message to an exchange with the given routing key.
	PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// IsClosed returns true if the channel is marked as closed.
	IsClosed() bool
	// Close closes the channel.
	Close() error
}

AMQPChannel defines the interface for AMQP channel operations needed by the publisher. This interface allows for testing with mock implementations.

type Config

type Config struct {
	// Enabled controls whether tenant functionality is active.
	Enabled bool

	// ApplicationName identifies the application using tenant services.
	ApplicationName string

	// PoolManagerURL is the URL of the pool manager API.
	PoolManagerURL string

	// CacheTTL is the duration for caching tenant data.
	// Default: 24h
	CacheTTL time.Duration

	// TenantClaimKey is the JWT claim key used to extract tenant ID.
	// Default: "tenantId"
	TenantClaimKey string

	// Pool contains connection pool configuration.
	Pool PoolConfig
}

Config holds the configuration for tenant management.

func ConfigFromEnv

func ConfigFromEnv() *Config

ConfigFromEnv creates a Config populated from environment variables. Uses default values when environment variables are not set or invalid.

Environment variables:

  • TENANT_ENABLED: boolean (default: true)
  • TENANT_APPLICATION_NAME: string
  • POOL_MANAGER_URL: string
  • TENANT_CACHE_TTL: duration string (default: 24h)
  • TENANT_CLAIM_KEY: string (default: "tenantId")
  • TENANT_POOL_MAX_SIZE: integer (default: 100)
  • TENANT_POOL_IDLE_TIMEOUT: duration string (default: 30m)

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible default values.

func (*Config) String

func (c *Config) String() string

String returns a human-readable string representation of the Config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the Config is valid. Returns an error if validation fails.

type ConnectionConfig

type ConnectionConfig struct {
	// MaxOpenConnections sets the maximum number of open connections per tenant.
	// Default: 25
	MaxOpenConnections int

	// MaxIdleConnections sets the maximum number of idle connections per tenant.
	// Default: 5
	MaxIdleConnections int
}

ConnectionConfig holds connection pool settings for tenant databases.

type ContextKey

type ContextKey string

ContextKey is the type used for context keys in tenant operations. It provides type safety and prevents collisions with other context keys.

const (
	// TenantIDContextKey is the key for storing tenant ID in context.
	TenantIDContextKey ContextKey = "tenantId"

	// TenantConfigContextKey is the key for storing tenant configuration in context.
	TenantConfigContextKey ContextKey = "tenant_config"

	// TenantPGContextKey is the key for storing PostgreSQL configuration in context.
	TenantPGContextKey ContextKey = "tenant_pg"

	// TenantMongoContextKey is the key for storing MongoDB configuration in context.
	TenantMongoContextKey ContextKey = "tenant_mongo"

	// TenantValkeyContextKey is the key for storing Valkey configuration in context.
	TenantValkeyContextKey ContextKey = "tenant_valkey"

	// TenantRabbitMQContextKey is the key for storing RabbitMQ configuration in context.
	TenantRabbitMQContextKey ContextKey = "tenant_rabbitmq"

	// TenantPGConnectionContextKey is the key for storing actual PostgreSQL connection in context.
	TenantPGConnectionContextKey ContextKey = "tenant_pg_connection"

	// TenantMongoDBConnectionContextKey is the key for storing actual MongoDB database in context.
	TenantMongoDBConnectionContextKey ContextKey = "tenant_mongo_connection"
)

Context keys for storing tenant information. These are exported to allow external packages to reference them if needed, while the actual context operations should use the helper functions.

type DBType

type DBType string

DBType represents the type of database.

const (
	// DBTypePostgreSQL represents PostgreSQL database type.
	DBTypePostgreSQL DBType = "postgresql"

	// DBTypeMongoDB represents MongoDB database type.
	DBTypeMongoDB DBType = "mongodb"
)

Database type constants.

type DatabaseServices

type DatabaseServices struct {
	PostgreSQL *PostgreSQLConfig `json:"postgresql,omitempty"`
	MongoDB    *MongoDBConfig    `json:"mongodb,omitempty"`
}

DatabaseServices holds database configurations for a service.

type Extractor

type Extractor interface {
	// ExtractFromJWT extracts the tenant ID from a JWT token's claims.
	// It parses the token payload and retrieves the value of the configured claim key.
	// Returns an error if the token is invalid or the claim is missing/empty.
	ExtractFromJWT(token string) (string, error)

	// ExtractFromContext extracts the tenant ID from the context.
	// It looks for the TenantIDContextKey in the context and returns its value.
	// Returns an error if the context is nil or the tenant ID is missing/invalid.
	ExtractFromContext(ctx context.Context) (string, error)
}

Extractor defines the interface for extracting tenant IDs from various sources.

func NewExtractor

func NewExtractor(claimKey string, opts ...ExtractorOption) Extractor

NewExtractor creates a new Extractor with the specified claim key. If claimKey is empty, it defaults to "tenantId".

type ExtractorOption

type ExtractorOption func(*extractorImpl)

ExtractorOption is a function that configures an Extractor.

func WithExtractorLogger

func WithExtractorLogger(logger libLog.Logger) ExtractorOption

WithExtractorLogger sets the logger for the extractor.

type Middleware

type Middleware interface {
	// Handler returns the Fiber middleware handler.
	Handler() fiber.Handler
	// IsEnabled returns whether the middleware is enabled.
	IsEnabled() bool
}

Middleware defines the interface for the multi-tenant middleware.

func NewMiddleware

func NewMiddleware(config *Config, resolver Resolver, opts ...MiddlewareOption) Middleware

NewMiddleware creates a new tenant middleware instance. Returns nil if config or resolver is nil.

type MiddlewareOption

type MiddlewareOption func(*middlewareImpl)

MiddlewareOption is a function that configures the middleware.

func WithMiddlewareLogger

func WithMiddlewareLogger(logger libLog.Logger) MiddlewareOption

WithMiddlewareLogger sets the logger for the middleware.

func WithMongoPoolManager

func WithMongoPoolManager(mongoPoolMgr MongoPoolManager) MiddlewareOption

WithMongoPoolManager sets the MongoDB pool manager for the middleware. When set, the middleware will inject actual database connections into the context instead of just configuration. This enables handlers to use GetTenantMongoDatabase() to get tenant-specific MongoDB database handles.

func WithPostgresPoolManager

func WithPostgresPoolManager(pgPoolMgr PostgresPoolManager) MiddlewareOption

WithPostgresPoolManager sets the PostgreSQL pool manager for the middleware. When set, the middleware will inject actual database connections into the context instead of just configuration. This enables handlers to use GetTenantPGConnection() to get tenant-specific database connections.

func WithSkipPaths

func WithSkipPaths(paths ...string) MiddlewareOption

WithSkipPaths adds custom paths to the skip list. These paths will bypass tenant validation.

type MongoDBConfig

type MongoDBConfig struct {
	URI      string `json:"uri"`
	Database string `json:"database"`
}

MongoDBConfig holds MongoDB connection configuration.

func GetTenantMongoDB

func GetTenantMongoDB(ctx context.Context) *MongoDBConfig

GetTenantMongoDB retrieves the MongoDB configuration from the context. Returns nil if not found. Checks both the internal middleware key and the exported TenantMongoContextKey for compatibility.

func GetTenantMongoFromFiber

func GetTenantMongoFromFiber(c *fiber.Ctx) *MongoDBConfig

GetTenantMongoFromFiber extracts the MongoDB configuration from a Fiber context. Returns nil if the context is nil or no MongoDB configuration is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    mongoConfig := GetTenantMongoFromFiber(c)
    if mongoConfig == nil {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no database"})
    }
    // Use mongoConfig...
    return nil
})

type MongoPoolManager

type MongoPoolManager interface {
	// GetClient returns a MongoDB client for the specified tenant and application.
	// In database mode: returns a dedicated client per tenant.
	// In schema mode: returns a shared client (multiple tenants share the same client).
	GetClient(ctx context.Context, tenantID, applicationName string) (*mongo.Client, error)

	// GetDatabase returns a TenantDatabase for the specified tenant and application.
	// TenantDatabase wraps mongo.Database and applies collection prefix for schema mode.
	// In database mode: collection prefix is empty (no prefixing).
	// In schema mode: collection prefix is "tenant_{sanitized_tenant_id}_".
	GetDatabase(ctx context.Context, tenantID, applicationName string) (*TenantDatabase, error)

	// GetDefaultConnection returns the default (single-tenant) MongoConnection.
	// Use this when operating in single-tenant mode.
	GetDefaultConnection() *libMongo.MongoConnection

	// CloseClient closes the MongoDB client for a specific tenant and application.
	CloseClient(tenantID, applicationName string) error

	// CloseAll closes all managed MongoDB clients and stops background cleanup.
	CloseAll(ctx context.Context) error

	// Stats returns statistics for all managed clients.
	Stats() map[string]MongoPoolStats
}

MongoPoolManager defines the interface for managing MongoDB client pools across multiple tenants with support for different isolation modes. It leverages the existing commons/mongo infrastructure for connection management.

func NewMongoPoolManager

func NewMongoPoolManager(resolver Resolver, opts ...MongoPoolManagerOption) MongoPoolManager

NewMongoPoolManager creates a new MongoPoolManager with the given resolver and options.

func NewMongoPoolManagerWithConfig

func NewMongoPoolManagerWithConfig(cfg MongoPoolManagerConfig) (MongoPoolManager, error)

NewMongoPoolManagerWithConfig creates a new MongoPoolManager with explicit configuration. This is the recommended constructor for production use.

type MongoPoolManagerConfig

type MongoPoolManagerConfig struct {
	// DefaultConnection is the default MongoConnection for single-tenant/schema mode.
	// Required for schema mode, optional for database-only mode.
	DefaultConnection *libMongo.MongoConnection

	// Resolver is the tenant resolver for fetching tenant configurations.
	// Required.
	Resolver Resolver

	// Logger is the logger instance.
	// Required.
	Logger libLog.Logger

	// MaxConnections is the maximum number of tenant connections to maintain.
	// Default: 100
	MaxConnections int

	// IdleTimeout is the duration after which idle connections are closed.
	// Default: 30 minutes
	IdleTimeout time.Duration

	// CleanupInterval is the interval for the background cleanup goroutine.
	// Default: 5 minutes
	CleanupInterval time.Duration

	// MaxPoolSize sets the maximum number of connections in each MongoDB pool.
	// Default: 100
	MaxPoolSize uint64
}

MongoPoolManagerConfig holds configuration for the pool manager.

type MongoPoolManagerOption

type MongoPoolManagerOption func(*mongoPoolManagerImpl)

MongoPoolManagerOption is a function that configures a MongoPoolManager.

func WithMongoCleanupInterval

func WithMongoCleanupInterval(interval time.Duration) MongoPoolManagerOption

WithMongoCleanupInterval sets the interval for the background cleanup goroutine. Default is 5 minutes.

func WithMongoDefaultConnection

func WithMongoDefaultConnection(conn *libMongo.MongoConnection) MongoPoolManagerOption

WithMongoDefaultConnection sets the default MongoConnection for schema mode.

func WithMongoIdleTimeout

func WithMongoIdleTimeout(timeout time.Duration) MongoPoolManagerOption

WithMongoIdleTimeout sets the duration after which idle clients are closed. Default is 30 minutes.

func WithMongoLogger

func WithMongoLogger(logger libLog.Logger) MongoPoolManagerOption

WithMongoLogger sets the logger for the pool manager.

func WithMongoMaxClients

func WithMongoMaxClients(maxClients int) MongoPoolManagerOption

WithMongoMaxClients sets the maximum number of MongoDB clients. Default is 100.

func WithMongoMaxPoolSize

func WithMongoMaxPoolSize(size uint64) MongoPoolManagerOption

WithMongoMaxPoolSize sets the maximum pool size for new MongoDB connections. Default is 100.

type MongoPoolStats

type MongoPoolStats struct {
	TenantID        string
	ApplicationName string
	IsolationMode   string
	CreatedAt       time.Time
	LastUsedAt      time.Time
	DatabaseName    string
}

MongoPoolStats contains statistics for a MongoDB client.

type PoolConfig

type PoolConfig struct {
	// MaxSize is the maximum number of connections in the pool.
	// Default: 100
	MaxSize int

	// IdleTimeout is the duration after which idle connections are closed.
	// Default: 30m
	IdleTimeout time.Duration
}

PoolConfig holds connection pool configuration for tenant connections.

func (*PoolConfig) ApplyDefaults

func (p *PoolConfig) ApplyDefaults()

ApplyDefaults sets default values for zero-valued fields in PoolConfig.

type PoolManagerOption

type PoolManagerOption func(*postgresPoolManagerImpl)

PoolManagerOption is a function that configures a PostgresPoolManager.

func WithCleanupInterval

func WithCleanupInterval(interval time.Duration) PoolManagerOption

WithCleanupInterval sets the interval for the background cleanup goroutine. Default is 5 minutes.

func WithConnectionConfig

func WithConnectionConfig(config ConnectionConfig) PoolManagerOption

WithConnectionConfig sets the connection configuration for tenant databases.

func WithDefaultConnection

func WithDefaultConnection(conn *libPostgres.PostgresConnection) PoolManagerOption

WithDefaultConnection sets the default PostgresConnection for schema mode.

func WithIdleTimeout

func WithIdleTimeout(timeout time.Duration) PoolManagerOption

WithIdleTimeout sets the duration after which idle pools are closed. Default is 30 minutes.

func WithLogger

func WithLogger(logger libLog.Logger) PoolManagerOption

WithLogger sets the logger for the pool manager.

func WithMaxPools

func WithMaxPools(maxPools int) PoolManagerOption

WithMaxPools sets the maximum number of connection pools. Default is 100.

type PoolStats

type PoolStats struct {
	TenantID        string
	ApplicationName string
	IsolationMode   string
	CreatedAt       time.Time
	LastUsedAt      time.Time
	OpenConnections int
	IdleConnections int
}

PoolStats contains statistics for a connection pool.

type PostgreSQLConfig

type PostgreSQLConfig struct {
	Host     string `json:"host"`
	Port     int    `json:"port"`
	Database string `json:"database"`
	Username string `json:"username"`
	Password string `json:"password"`
	SSLMode  string `json:"sslmode,omitempty"`
}

PostgreSQLConfig holds PostgreSQL connection configuration.

func GetTenantPGFromFiber

func GetTenantPGFromFiber(c *fiber.Ctx) *PostgreSQLConfig

GetTenantPGFromFiber extracts the PostgreSQL configuration from a Fiber context. Returns nil if the context is nil or no PostgreSQL configuration is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    pgConfig := GetTenantPGFromFiber(c)
    if pgConfig == nil {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no database"})
    }
    // Use pgConfig...
    return nil
})

func GetTenantPostgreSQL

func GetTenantPostgreSQL(ctx context.Context) *PostgreSQLConfig

GetTenantPostgreSQL retrieves the PostgreSQL configuration from the context. Returns nil if not found. Checks both the internal middleware key and the exported TenantPGContextKey for compatibility.

type PostgresPoolManager

type PostgresPoolManager interface {
	// GetConnection returns a database connection for the specified tenant and application.
	// In database mode: returns a connection from a dedicated PostgresConnection per tenant.
	// In schema mode: returns the default connection (caller must SET search_path).
	GetConnection(ctx context.Context, tenantID, applicationName string) (dbresolver.DB, error)

	// GetSchemaConnection returns a connection with search_path set for schema mode.
	// This is a convenience method that gets a connection and sets the search_path.
	GetSchemaConnection(ctx context.Context, tenantID, applicationName string) (dbresolver.DB, error)

	// GetDefaultConnection returns the default (single-tenant) connection.
	// Use this when operating in single-tenant mode.
	GetDefaultConnection() (dbresolver.DB, error)

	// ClosePool closes the connection pool for a specific tenant and application.
	ClosePool(tenantID, applicationName string) error

	// CloseTenant closes all connection pools associated with a tenant.
	CloseTenant(tenantID string) error

	// CloseAll closes all managed connection pools and stops background cleanup.
	CloseAll() error

	// Stats returns statistics for all managed pools.
	Stats() map[string]PoolStats
}

PostgresPoolManager defines the interface for managing PostgreSQL connection pools across multiple tenants with support for different isolation modes. It leverages the existing commons/postgres infrastructure for connection management.

func NewPostgresPoolManager

func NewPostgresPoolManager(resolver Resolver, opts ...PoolManagerOption) PostgresPoolManager

NewPostgresPoolManager creates a new PostgresPoolManager with the given resolver and options.

func NewPostgresPoolManagerWithConfig

func NewPostgresPoolManagerWithConfig(cfg PostgresPoolManagerConfig) (PostgresPoolManager, error)

NewPostgresPoolManagerWithConfig creates a new PostgresPoolManager with explicit configuration. This is the recommended constructor for production use.

type PostgresPoolManagerConfig

type PostgresPoolManagerConfig struct {
	// DefaultConnection is the default PostgresConnection for single-tenant/schema mode.
	// Required for schema mode, optional for database-only mode.
	DefaultConnection *libPostgres.PostgresConnection

	// Resolver is the tenant resolver for fetching tenant configurations.
	// Required.
	Resolver Resolver

	// Logger is the logger instance.
	// Required.
	Logger libLog.Logger

	// MaxConnections is the maximum number of tenant connections to maintain.
	// Default: 100
	MaxConnections int

	// IdleTimeout is the duration after which idle connections are closed.
	// Default: 30 minutes
	IdleTimeout time.Duration

	// CleanupInterval is the interval for the background cleanup goroutine.
	// Default: 5 minutes
	CleanupInterval time.Duration

	// ConnectionConfig holds shared connection settings for tenant connections.
	ConnectionConfig ConnectionConfig
}

PostgresPoolManagerConfig holds configuration for the pool manager.

type RabbitMQConfig

type RabbitMQConfig struct {
	URL string `json:"url"`
}

RabbitMQConfig holds RabbitMQ connection configuration.

func GetTenantRabbitMQ

func GetTenantRabbitMQ(ctx context.Context) *RabbitMQConfig

GetTenantRabbitMQ retrieves the RabbitMQ configuration from the context. Returns nil if not found.

func GetTenantRabbitMQFromFiber

func GetTenantRabbitMQFromFiber(c *fiber.Ctx) *RabbitMQConfig

GetTenantRabbitMQFromFiber extracts the RabbitMQ configuration from a Fiber context. Returns nil if the context is nil or no RabbitMQ configuration is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    rabbitConfig := GetTenantRabbitMQFromFiber(c)
    if rabbitConfig == nil {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no message queue"})
    }
    // Use rabbitConfig...
    return nil
})

type Resolver

type Resolver interface {
	// Resolve retrieves the tenant configuration for the given tenant ID.
	// It returns the cached result if available and not expired.
	Resolve(ctx context.Context, tenantID string) (*TenantConfig, error)

	// ResolveWithService retrieves the tenant configuration filtered by service name.
	// It uses the service query parameter to filter the response.
	ResolveWithService(ctx context.Context, tenantID, serviceName string) (*TenantConfig, error)

	// InvalidateCache removes the cached configuration for a specific tenant.
	InvalidateCache(tenantID string)

	// InvalidateCacheAll removes all cached tenant configurations.
	InvalidateCacheAll()
}

Resolver defines the interface for resolving tenant configurations.

func NewResolver

func NewResolver(serviceURL string, opts ...ResolverOption) Resolver

NewResolver creates a new Resolver with the given service URL and options.

type ResolverOption

type ResolverOption func(*resolverImpl)

ResolverOption is a function that configures a Resolver.

func WithAPIKey

func WithAPIKey(apiKey string) ResolverOption

WithAPIKey sets the API key for authentication with the Tenant Service.

func WithCacheTTL

func WithCacheTTL(ttl time.Duration) ResolverOption

WithCacheTTL sets the cache TTL for the resolver. Default is 24 hours.

func WithHTTPClient

func WithHTTPClient(client *http.Client) ResolverOption

WithHTTPClient sets a custom HTTP client for the resolver.

func WithResolverLogger

func WithResolverLogger(logger libLog.Logger) ResolverOption

WithResolverLogger sets the logger for the resolver.

type TenantConfig

type TenantConfig struct {
	ID            string                      `json:"_id"`
	TenantSlug    string                      `json:"tenant_slug"`
	TenantName    string                      `json:"tenant_name"`
	Status        string                      `json:"status"`
	IsolationMode string                      `json:"isolation_mode"`
	Databases     map[string]DatabaseServices `json:"databases,omitempty"`
	Valkey        *ValkeyConfig               `json:"valkey,omitempty"`
	RabbitMQ      *RabbitMQConfig             `json:"rabbitmq,omitempty"`
}

TenantConfig holds the complete configuration for a tenant. This structure matches the response from the Tenant Service API.

func GetTenantConfig

func GetTenantConfig(ctx context.Context) *TenantConfig

GetTenantConfig retrieves the tenant configuration from the context. Returns nil if not found. Checks both the internal middleware key and the exported TenantConfigContextKey for compatibility.

func GetTenantConfigFromFiber

func GetTenantConfigFromFiber(c *fiber.Ctx) *TenantConfig

GetTenantConfigFromFiber extracts the tenant configuration from a Fiber context. Returns nil if the context is nil or no tenant configuration is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    config := GetTenantConfigFromFiber(c)
    if config == nil {
        return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "no tenant config"})
    }
    // Use config...
    return nil
})

type TenantDatabase

type TenantDatabase struct {
	// contains filtered or unexported fields
}

TenantDatabase wraps mongo.Database and prefixes collection names for schema isolation mode. This provides transparent collection prefixing without changing application code.

func GetMongoDatabaseForTenant

func GetMongoDatabaseForTenant(ctx context.Context, defaultConn *libMongo.MongoConnection) (*TenantDatabase, error)

GetMongoDatabaseForTenant returns the appropriate MongoDB database based on multi-tenant mode.

When multi-tenant mode is enabled (a tenant ID is present in the context):

  • Returns the tenant-specific database from the context.
  • Returns an error if the tenant database is not found. It never falls back to the default connection.

When multi-tenant mode is disabled (no tenant ID in the context):

  • Returns the default database (single-tenant mode).
Usage in repositories:

func (r *Repository) Create(ctx context.Context, entity *Entity) error {
    db, err := poolmanager.GetMongoDatabaseForTenant(ctx, r.connection)
    if err != nil {
        return err // Don't proceed without correct connection!
    }
    collection := db.Collection("mycollection")
    // Use collection for queries...
    _ = collection // placeholder so the example compiles
    return nil
}

func GetTenantMongoDatabase

func GetTenantMongoDatabase(ctx context.Context) (*TenantDatabase, bool)

GetTenantMongoDatabase retrieves the tenant-specific MongoDB database from the context. Returns the TenantDatabase and true if found, otherwise nil and false. This returns an actual database handle with automatic collection prefixing for schema mode.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    db, ok := GetTenantMongoDatabase(c.UserContext())
    if !ok {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no database connection"})
    }
    collection := db.Collection("users") // Auto-prefixed in schema mode
    // Use collection for database operations...
    _ = collection // placeholder so the example compiles
    return nil
})

func GetTenantMongoDatabaseFromFiber

func GetTenantMongoDatabaseFromFiber(c *fiber.Ctx) (*TenantDatabase, bool)

GetTenantMongoDatabaseFromFiber extracts the tenant MongoDB database from a Fiber context. Returns the TenantDatabase and true if found, otherwise nil and false.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    db, ok := GetTenantMongoDatabaseFromFiber(c)
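    if !ok {
        // Fall back to a config-based connection here, or fail the request.
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no database connection"})
    }
    // Use db...
    _ = db // placeholder so the example compiles
    return nil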
})

func (*TenantDatabase) Collection

func (td *TenantDatabase) Collection(name string, opts ...*options.CollectionOptions) *mongo.Collection

Collection returns a handle for the named collection with the tenant prefix applied. In database mode the prefix is empty, so the name is used as-is. In schema mode the name is prefixed with "tenant_{id}_".
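
The naming rule can be sketched as a pair of pure functions. The helpers schemaPrefix and collectionName are hypothetical and exist only for illustration; whether the package sanitizes the tenant ID before building the prefix is not stated here.

```go
package main

import "fmt"

// schemaPrefix builds the "tenant_{id}_" prefix described for schema mode.
func schemaPrefix(tenantID string) string {
	return "tenant_" + tenantID + "_"
}

// collectionName applies the prefix. An empty prefix (database mode)
// leaves the name unchanged.
func collectionName(prefix, name string) string {
	return prefix + name
}

func main() {
	fmt.Println(collectionName("", "users"))                     // users
	fmt.Println(collectionName(schemaPrefix("org123"), "users")) // tenant_org123_users
}
```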

func (*TenantDatabase) Database

func (td *TenantDatabase) Database() *mongo.Database

Database returns the underlying mongo.Database pointer.

func (*TenantDatabase) Prefix

func (td *TenantDatabase) Prefix() string

Prefix returns the collection prefix for this tenant database.

func (*TenantDatabase) TenantID

func (td *TenantDatabase) TenantID() string

TenantID returns the tenant ID associated with this database.

type TenantRabbitMQPublisher

type TenantRabbitMQPublisher interface {
	// Publish sends a message to the specified exchange with the given routing key.
	// The X-Tenant-ID header is automatically injected into the message.
	// Returns an error if the context is nil or if publishing fails.
	Publish(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error

	// GetTenantID returns the tenant identifier associated with this publisher.
	GetTenantID() string
}

TenantRabbitMQPublisher defines the interface for a tenant-aware RabbitMQ publisher. All published messages are automatically enriched with the X-Tenant-ID header containing the tenant identifier.
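
Header injection of this kind can be sketched without a broker. In the sketch below, publishing is a hypothetical stand-in for amqp.Publishing that keeps only the fields used, and withTenantHeader and tenantFromHeaders are illustrative helpers, not the package's implementation. Copying the header map is a design choice of the sketch so that the caller's map is never mutated.

```go
package main

import "fmt"

// tenantIDHeader matches the documented TenantIDHeader constant.
const tenantIDHeader = "X-Tenant-ID"

// publishing is a hypothetical stand-in for amqp.Publishing.
type publishing struct {
	Headers map[string]any
	Body    []byte
}

// withTenantHeader returns a copy of msg whose headers include the tenant ID.
// Existing headers are preserved; an existing X-Tenant-ID is overwritten.
func withTenantHeader(msg publishing, tenantID string) publishing {
	headers := make(map[string]any, len(msg.Headers)+1)
	for k, v := range msg.Headers {
		headers[k] = v
	}
	headers[tenantIDHeader] = tenantID
	msg.Headers = headers
	return msg
}

// tenantFromHeaders reads the tenant ID back on the consumer side.
// It reports false when the header is absent, empty, or not a string.
func tenantFromHeaders(msg publishing) (string, bool) {
	id, ok := msg.Headers[tenantIDHeader].(string)
	return id, ok && id != ""
}

func main() {
	original := publishing{
		Headers: map[string]any{"trace-id": "abc"},
		Body:    []byte(`{"event": "user.created"}`),
	}
	enriched := withTenantHeader(original, "org-123")

	id, ok := tenantFromHeaders(enriched)
	fmt.Println(id, ok)

	// The caller's message is left untouched.
	_, ok = tenantFromHeaders(original)
	fmt.Println(ok)
}
```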

func NewTenantRabbitMQPublisher

func NewTenantRabbitMQPublisher(channel AMQPChannel, tenantID string) TenantRabbitMQPublisher

NewTenantRabbitMQPublisher creates a new tenant-aware RabbitMQ publisher. Returns nil if channel is nil or tenantID is empty or contains only whitespace.

The publisher wraps RabbitMQ publish operations and automatically injects the X-Tenant-ID header into all published messages to ensure tenant context is propagated through message queues.

Example:

publisher := NewTenantRabbitMQPublisher(channel, "org-123")
err := publisher.Publish(ctx, "exchange", "routing.key", amqp.Publishing{
    Body: []byte(`{"event": "user.created"}`),
})
if err != nil {
    return err
}
// Message will have header: X-Tenant-ID: org-123

func NewTenantRabbitMQPublisherWithLogger

func NewTenantRabbitMQPublisherWithLogger(channel AMQPChannel, tenantID string, logger libLog.Logger) TenantRabbitMQPublisher

NewTenantRabbitMQPublisherWithLogger creates a new tenant-aware RabbitMQ publisher with logging support. Returns nil if channel is nil or tenantID is empty or contains only whitespace.

Example:

publisher := NewTenantRabbitMQPublisherWithLogger(channel, "org-123", logger)
err := publisher.Publish(ctx, "exchange", "routing.key", amqp.Publishing{
    Body: []byte(`{"event": "user.created"}`),
})
if err != nil {
    return err
}
// Message will have header: X-Tenant-ID: org-123

type TenantValkeyClient

type TenantValkeyClient interface {
	// Get retrieves the value of a key within the tenant's namespace.
	// Returns redis.Nil error if the key does not exist.
	Get(ctx context.Context, key string) (string, error)

	// Set stores a value for a key within the tenant's namespace.
	// If ttl is 0, the key has no expiration.
	Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error

	// Del removes one or more keys from the tenant's namespace.
	// Returns the number of keys that were removed.
	Del(ctx context.Context, keys ...string) (int64, error)

	// Keys returns all keys matching the pattern within the tenant's namespace.
	// The pattern is automatically prefixed with the tenant prefix.
	// Returned keys have the tenant prefix stripped.
	Keys(ctx context.Context, pattern string) ([]string, error)

	// Exists returns the number of keys that exist within the tenant's namespace.
	Exists(ctx context.Context, keys ...string) (int64, error)

	// GetTenantID returns the tenant ID for this client.
	GetTenantID() string

	// GetPrefix returns the key prefix used by this client (format: "tenant:{id}#").
	GetPrefix() string

	// GetUnderlyingClient returns the underlying Redis client for advanced operations.
	// Use with caution: operations on this client bypass tenant isolation.
	GetUnderlyingClient() redis.Cmdable
}

TenantValkeyClient defines the interface for a tenant-aware Valkey/Redis client. All keys are automatically prefixed with the tenant identifier using the pattern "tenant:{id}#", which isolates tenants at the key level. Operations on the client returned by GetUnderlyingClient bypass this prefixing.
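
The key scheme can be sketched with a few string helpers. The functions tenantPrefix, prefixKey, and stripPrefix are hypothetical and only illustrate the documented "tenant:{id}#" format and the prefix stripping described for Keys. The sketch assumes tenant IDs do not themselves contain "#".

```go
package main

import (
	"fmt"
	"strings"
)

// tenantPrefix builds the documented "tenant:{id}#" prefix.
func tenantPrefix(tenantID string) string {
	return "tenant:" + tenantID + "#"
}

// prefixKey maps an application key into the tenant's namespace.
func prefixKey(tenantID, key string) string {
	return tenantPrefix(tenantID) + key
}

// stripPrefix removes the tenant prefix from a stored key. It reports false
// when the stored key belongs to a different namespace.
func stripPrefix(tenantID, stored string) (string, bool) {
	p := tenantPrefix(tenantID)
	if !strings.HasPrefix(stored, p) {
		return "", false
	}
	return strings.TrimPrefix(stored, p), true
}

func main() {
	// stored simulates a shared keyspace holding keys for two tenants
	// whose IDs share a common leading substring.
	stored := []string{
		prefixKey("org-1", "user:1"),
		prefixKey("org-12", "user:9"),
	}
	for _, k := range stored {
		if key, ok := stripPrefix("org-1", k); ok {
			fmt.Println(key) // only org-1's key is printed
		}
	}
}
```

The trailing "#" delimiter is what keeps "org-1" from matching keys that belong to "org-12".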

func NewTenantValkeyClient

func NewTenantValkeyClient(client redis.Cmdable, tenantID string) TenantValkeyClient

NewTenantValkeyClient creates a new tenant-aware Valkey/Redis client. Returns nil if client is nil or tenantID is empty.

The client wraps common Redis operations and automatically prefixes all keys with "tenant:{tenantID}#" to ensure tenant isolation.

Example:

client := NewTenantValkeyClient(redisClient, "org-123")
if err := client.Set(ctx, "user:1", "data", 0); err != nil {
    return err
}
// Stored as: tenant:org-123#user:1

func NewTenantValkeyClientFromConnection

func NewTenantValkeyClientFromConnection(ctx context.Context, conn *libRedis.RedisConnection, tenantID string) (TenantValkeyClient, error)

NewTenantValkeyClientFromConnection creates a tenant-aware Valkey/Redis client from a RedisConnection. This is the preferred way to create a tenant client when using the commons/redis infrastructure.

The function retrieves the underlying client from the RedisConnection and wraps it with tenant-aware key prefixing.

Example:

conn := &libRedis.RedisConnection{...}
if err := conn.Connect(ctx); err != nil {
    return err
}
client, err := NewTenantValkeyClientFromConnection(ctx, conn, "org-123")
if err != nil {
    return err
}
if err := client.Set(ctx, "user:1", "data", 0); err != nil {
    return err
}
// Stored as: tenant:org-123#user:1

func NewTenantValkeyClientWithLogger

func NewTenantValkeyClientWithLogger(client redis.Cmdable, tenantID string, logger libLog.Logger) TenantValkeyClient

NewTenantValkeyClientWithLogger creates a new tenant-aware Valkey/Redis client with logging support. Returns nil if client is nil or tenantID is empty.

Example:

client := NewTenantValkeyClientWithLogger(redisClient, "org-123", logger)
if err := client.Set(ctx, "user:1", "data", 0); err != nil {
    return err
}
// Stored as: tenant:org-123#user:1

type ValkeyConfig

type ValkeyConfig struct {
	Addresses []string `json:"addresses"`
	Password  string   `json:"password"`
	DB        int      `json:"db"`
}

ValkeyConfig holds Valkey (Redis-compatible) connection configuration.

func GetTenantValkey

func GetTenantValkey(ctx context.Context) *ValkeyConfig

GetTenantValkey retrieves the Valkey configuration from the context. Returns nil if not found.

func GetTenantValkeyFromFiber

func GetTenantValkeyFromFiber(c *fiber.Ctx) *ValkeyConfig

GetTenantValkeyFromFiber extracts the Valkey configuration from a Fiber context. Returns nil if the context is nil or no Valkey configuration is found.

Example:

app.Get("/api/resource", func(c *fiber.Ctx) error {
    valkeyConfig := GetTenantValkeyFromFiber(c)
    if valkeyConfig == nil {
        return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "no cache"})
    }
    // Use valkeyConfig...
    return nil
})
