bucket

package
v1.0.0 Latest
Published: Jan 28, 2026 License: MIT Imports: 17 Imported by: 0

README

Bucket Partitioned Repository

This package provides a generalized, schema-agnostic interface for managing partitioned data across organization buckets in PostgreSQL.

Architecture

The architecture separates bucket/org management from schema-specific operations through dependency injection:

  • PartitionedRepository: Interface for org/bucket management and data operations
  • SchemaHandler: Interface for schema-specific operations (table creation, data copying, scrubbing)
  • PostgresqlPartitionedRepository: Generic implementation that delegates schema operations to a SchemaHandler
  • CachingRepository: Optional wrapper that adds org collection caching to any PartitionedRepository

Usage

Basic Usage Example

import (
    "time"
    "github.com/controlplane-com/libs-go/pkg/database/bucket"
)

// Create the config (only repository-related settings)
config := bucket.Config{
    OrgBucketCount:                256,
    PartitionPreparationThreshold: 90 * 24 * time.Hour,
}

// Create your schema handler (implementation specific)
schemaHandler := myschema.NewMyCustomSchemaHandler()

// Create the repository (pass nil to use default ModulatedHash selector)
repo := bucket.NewPostgresqlPartitionedRepository(config, schemaHandler, nil)

// Optionally, provide a custom bucket index selector
// chooser := func(org *bucket.Org, bucketCount int) int { return 1 /* always bucket 1 */ }
// repo = bucket.NewPostgresqlPartitionedRepository(config, schemaHandler, chooser)

// Initialize with your application's database.Connection
if err := repo.Initialize(dbConnection); err != nil {
    // handle error
}

Enable caching (optional)

Wrap the repository with a CachingRepository and provide a CollectionCache implementation (for example, Redis-backed):

import (
    "time"
    "github.com/controlplane-com/libs-go/pkg/cache"
    "github.com/controlplane-com/libs-go/pkg/database/bucket"
)

redisCfg := cache.RedisClientConfig{Hosts: []string{"localhost:6379"}, Mode: cache.RedisModeStandalone}

// nameOf extracts the cache key (org name) from an Org
nameOf := func(o *bucket.Org) (string, error) { return o.Name, nil }
orgCache := cache.NewRedisCollectionCache[bucket.Org](
    redisCfg,
    100,            // batch size
    "orgs",        // key prefix
    "orgs:index",  // index key for validity TTL
    nameOf,
)

// Wrap with caching (24h validity for the index)
repo = bucket.NewCachingRepository(repo, orgCache, 24*time.Hour)
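
Run bucket maintenance (optional)

To keep partitions prepared ahead of time, the package provides RunBucketMaintenanceLoop and BucketMaintenanceOptions (documented below). A minimal sketch of wiring it up; the LeaderElectorClass and MaintenanceFrequency values here are assumptions to replace with your deployment's settings:

// Run maintenance in a goroutine; only the leader replica does work,
// followers sleep and re-check periodically.
go bucket.RunBucketMaintenanceLoop(bucket.BucketMaintenanceOptions{
    Repository:                    repo,
    LeaderElectorClass:            "bucket-maintenance", // assumption: your leader-elector class name
    MaintenanceFrequency:          time.Hour,            // assumption: check hourly
    PartitionPreparationThreshold: config.PartitionPreparationThreshold,
})
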
Creating a Custom Schema Handler

To use this architecture with a different schema structure:

package myschema

import (
    "time"
    "gorm.io/gorm"
    "github.com/controlplane-com/libs-go/pkg/database/bucket"
)

type MyCustomSchemaHandler struct {
    // Add any configuration or dependencies needed
}

func NewMyCustomSchemaHandler() *MyCustomSchemaHandler {
    return &MyCustomSchemaHandler{}
}

func (h *MyCustomSchemaHandler) EnsureSchema(db *gorm.DB, b *bucket.Bucket) error {
    schema := b.SchemaName()
    // Create your custom tables, indexes, etc.
    return db.Exec(`
        CREATE SCHEMA IF NOT EXISTS ` + schema + `;
        CREATE TABLE IF NOT EXISTS ` + schema + `.my_data (
            id SERIAL PRIMARY KEY,
            org_id VARCHAR,
            data JSONB,
            created_at TIMESTAMPTZ
        );
    `).Error
}

func (h *MyCustomSchemaHandler) EnsurePartitions(db *gorm.DB, b *bucket.Bucket, startTime time.Time, years int) (time.Time, error) {
    // Implement partition creation for your schema
    // Return the next partition time
    nextTime := startTime.AddDate(years, 0, 0)
    return nextTime, nil
}

func (h *MyCustomSchemaHandler) CopyOrgData(db *gorm.DB, org *bucket.Org, startTime time.Time, endTime time.Time, oldBucket *bucket.Bucket, newBucket *bucket.Bucket) error {
    // Implement data copying logic for your schema
    oldSchema := oldBucket.SchemaName()
    newSchema := newBucket.SchemaName()

    query := `
        INSERT INTO ` + newSchema + `.my_data (org_id, data, created_at)
        SELECT org_id, data, created_at
        FROM ` + oldSchema + `.my_data
        WHERE org_id = ? AND created_at >= ? AND created_at < ?
        ON CONFLICT DO NOTHING;
    `
    return db.Exec(query, org.Name, startTime, endTime).Error
}

func (h *MyCustomSchemaHandler) ScrubOrgData(db *gorm.DB, org *bucket.Org, startTime time.Time, endTime time.Time, b *bucket.Bucket) error {
    // Implement data deletion logic for your schema
    schema := b.SchemaName()
    query := `
        DELETE FROM ` + schema + `.my_data
        WHERE org_id = ? AND created_at >= ? AND created_at < ?;
    `
    return db.Exec(query, org.Name, startTime, endTime).Error
}

func (h *MyCustomSchemaHandler) AuditBuckets(db *gorm.DB, oldBucket *bucket.Bucket, newBucket *bucket.Bucket, startTime time.Time, endTime time.Time, orgNames []string) ([]any, error) {
    // Execute a query to audit differences between the two buckets
    // (time-range and org filtering are omitted here for brevity)
    oldSchema := oldBucket.SchemaName()
    newSchema := newBucket.SchemaName()

    query := `
        SELECT o.org_id, o.data AS old_data, n.data AS new_data
        FROM ` + oldSchema + `.my_data o
        FULL OUTER JOIN ` + newSchema + `.my_data n
            ON o.id = n.id
        WHERE o.data IS NULL OR n.data IS NULL OR o.data != n.data;
    `

    var rows []map[string]any
    if err := db.Raw(query).Scan(&rows).Error; err != nil {
        return nil, err
    }

    results := make([]any, len(rows))
    for i, row := range rows {
        results[i] = row
    }
    return results, nil
}

The example above shows how to integrate your custom schema handler with the repository. See the "Basic Usage Example" section for repository setup, and the "Enable caching (optional)" section to add caching.

Key Interfaces

PartitionedRepository

Main interface for interacting with bucket-partitioned data:

  • Org Management: CreateOrg, GetOrg, ListOrgs, FilterOrgs, ListBucketOrgs, ListOrgsByName
  • Bucket Management: GetBucket, ListBuckets, EnsureBucketPartitions, MoveOrgToBucket
  • Data Operations: CopyOrgData, ScrubOrgData, AuditBucket

SchemaHandler

Interface for schema-specific operations that must be implemented for each data structure:

  • EnsureSchema: Create all necessary tables, indexes, and schema structures
  • EnsurePartitions: Create time-based partitions for tables
  • CopyOrgData: Copy organization data between buckets
  • ScrubOrgData: Delete organization data from a bucket
  • AuditBuckets: Execute a query to audit differences between two buckets and return the results

Configuration

Config

Configuration struct for the partitioned repository:

type Config struct {
    OrgBucketCount                int           // Number of buckets for org partitioning
    PartitionPreparationThreshold time.Duration // Time before partition end to create next partition
}

Note: Caching configuration is not part of this Config. To add org caching, wrap your repository with NewCachingRepository and provide a cache.CollectionCache implementation (see "Enable caching (optional)").

Benefits

  1. Schema Flexibility: Easily support different data structures by implementing a new SchemaHandler
  2. Separation of Concerns: Bucket/org management is separated from schema-specific operations
  3. Dependency Injection: Schema behavior can be injected at runtime
  4. Testability: Schema handlers can be mocked for testing repository logic
  5. No External Dependencies: Only depends on libs-go packages (cache, checkpoints, database, etc.)

Types

Org

type Org struct {
    Name     string
    Created  time.Time
    BucketId int
    Bucket   *Bucket
}

Bucket

type Bucket struct {
    Id               int
    PartitionsEnding time.Time
    Created          time.Time
    Orgs             []*Org
}

AuditResult

type AuditResult struct {
    Org              string
    Tags             interface{}
    OriginalValue    *float64
    DestinationValue *float64
    StartTime        time.Time
}

Helper Functions

The package also provides several helper functions in helpers.go:

  • ModulatedHash(input string, space int) int: Converts a string to a number in the range (0, space] for consistent bucket assignment
  • IsPostgresIdentifier(identifier string) bool: Validates PostgreSQL identifier syntax
  • SchemaName(schema string) string: Sanitizes schema names by replacing hyphens with underscores
  • TableName(schema string, tableName string) string: Returns a fully qualified table name
  • IndexName(schema string, tableName string, suffix string) string: Generates an index name
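
For illustration, a few hedged example calls (the input strings are made up; only the behaviors documented above are assumed):

_ = bucket.ModulatedHash("my-org", 256)                    // deterministic value in (0, 256], i.e. 1..256
_ = bucket.SchemaName("bucket-7")                          // "bucket_7" (hyphens become underscores)
_ = bucket.TableName("bucket_7", "my_data")                // fully qualified name for my_data in that schema
_ = bucket.IndexName("bucket_7", "my_data", "created_at")  // index name built from schema, table, and suffix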

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IndexName

func IndexName(schema string, tableName string, suffix string) string

IndexName generates an index name with schema, table, and suffix

func IsPostgresIdentifier

func IsPostgresIdentifier(identifier string) bool

IsPostgresIdentifier checks if a string is a valid PostgreSQL identifier

func ModulatedHash

func ModulatedHash(input string, space int) int

ModulatedHash converts the input string to a number in the half-closed interval (0, space]

func RunBucketMaintenanceLoop

func RunBucketMaintenanceLoop(opts BucketMaintenanceOptions)

RunBucketMaintenanceLoop continuously ensures bucket partitions are prepared ahead of time. This function runs indefinitely and should be called in a goroutine. Only the leader replica will perform maintenance work; followers will sleep and check periodically.

func SchemaName

func SchemaName(schema string) string

SchemaName sanitizes a schema name by replacing hyphens with underscores

func TableName

func TableName(schema string, tableName string) string

TableName returns a fully qualified table name with schema

Types

type Bucket

type Bucket struct {
	Id               int       `json:"id"`
	PartitionsEnding time.Time `json:"partitionsEnding" gorm:"type:timestamptz"`
	Created          time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
	Orgs             []*Org    `json:"orgs,omitempty"`
}

Bucket represents a partition bucket that contains multiple organizations

func (*Bucket) CheckpointName

func (b *Bucket) CheckpointName() string

func (*Bucket) SchemaName

func (b *Bucket) SchemaName() string

type BucketIndexSelector

type BucketIndexSelector func(org *Org, bucketCount int) int

BucketIndexSelector selects the target bucket ID (1..bucketCount) for a given org when creating/assigning it. Implementations must return a value in the inclusive range [1, bucketCount].
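
For illustration, a custom selector might pin one org to a fixed bucket and hash the rest (a sketch; the "acme" org name and the config/schemaHandler values are assumptions carried over from the README example):

selector := func(org *bucket.Org, bucketCount int) int {
	if org.Name == "acme" { // hypothetical org pinned to bucket 1
		return 1
	}
	return bucket.ModulatedHash(org.Name, bucketCount) // hash into [1, bucketCount], like the default selector
}
repo := bucket.NewPostgresqlPartitionedRepository(config, schemaHandler, selector)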

type BucketMaintenanceOptions

type BucketMaintenanceOptions struct {
	Repository                    PartitionedRepository
	LeaderElectorClass            string
	MaintenanceFrequency          time.Duration
	PartitionPreparationThreshold time.Duration
}

BucketMaintenanceOptions configures the bucket maintenance loop

type CachingRepository

type CachingRepository struct {
	PartitionedRepository
	// contains filtered or unexported fields
}

CachingRepository is a wrapper that adds CollectionCache-based caching to any PartitionedRepository implementation.

func (*CachingRepository) CreateOrg

func (r *CachingRepository) CreateOrg(orgName string) error

CreateOrg checks cache for existence to short-circuit, delegates creation, then updates cache.

func (*CachingRepository) FilterOrgs

func (r *CachingRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)

FilterOrgs delegates to inner by default.

func (*CachingRepository) GetOrg

func (r *CachingRepository) GetOrg(name string) (*Org, error)

GetOrg attempts cache, falls back to delegate, and then writes the single org to cache.

func (*CachingRepository) ListBucketOrgs

func (r *CachingRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)

ListBucketOrgs delegates (no collection-level cache here because it's already part of the full orgs cache)

func (*CachingRepository) ListOrgs

func (r *CachingRepository) ListOrgs(pattern string) ([]*Org, error)

ListOrgs serves from cache when valid; otherwise loads from delegate, fills cache, marks valid, then filters.

func (*CachingRepository) ListOrgsByName

func (r *CachingRepository) ListOrgsByName(names []string) ([]*Org, error)

ListOrgsByName reuses ListOrgs with a compiled regex pattern.

func (*CachingRepository) MoveOrgToBucket

func (r *CachingRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error

MoveOrgToBucket clears cache before delegating to ensure consistency.

type Config

type Config struct {
	OrgBucketCount                int
	PartitionPreparationThreshold time.Duration
}

Config holds configuration for PartitionedRepository

type CopyOrgRequest

type CopyOrgRequest struct {
	SourceBucket      int       `json:"sourceBucket"`
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

CopyOrgRequest contains parameters for copying organization data between buckets

type MoveOrgRequest

type MoveOrgRequest struct {
	SourceBucket      int       `json:"sourceBucket"`
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

MoveOrgRequest contains parameters for moving an organization to a different bucket

type Org

type Org struct {
	Name     string    `json:"name" gorm:"primaryKey"`
	Created  time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
	BucketId int       `json:"bucketId"`
	Bucket   *Bucket   `json:"bucket,omitempty"`
}

Org represents an organization that is assigned to a bucket for data partitioning

func (*Org) String

func (o *Org) String() string

type PartitionedRepository

type PartitionedRepository interface {
	checkpoints.CheckpointRepository
	database.Repository

	// Org Management
	CreateOrg(org string) error
	GetOrg(name string) (*Org, error)
	ListOrgs(filter string) ([]*Org, error)
	FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
	ListBucketOrgs(bucket *Bucket) ([]*Org, error)
	ListOrgsByName(names []string) ([]*Org, error)

	// Bucket Management
	GetBucket(id int) (*Bucket, error)
	ListBuckets() ([]*Bucket, error)
	EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error
	MoveOrgToBucket(org *Org, bucket *Bucket) error

	// Data Operations
	CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error
	ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error
	AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)
}

PartitionedRepository is a generalized interface for managing partitioned data across organization buckets. It provides methods for org/bucket management and schema-agnostic data operations.
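
For illustration, a hypothetical org-rebalancing helper built from these methods (the copy → audit → move → scrub ordering, the moveOrg name, and passing the org name as the audit's orgFilter are assumptions, not documented requirements):

// moveOrg copies an org's data into a new bucket, audits the copy,
// reassigns the org, and finally scrubs the data from the old bucket.
func moveOrg(repo bucket.PartitionedRepository, org *bucket.Org, oldBucket, newBucket *bucket.Bucket, start, end time.Time) error {
	if err := repo.CopyOrgData(org, start, end, oldBucket, newBucket); err != nil {
		return err
	}
	diffs, err := repo.AuditBucket(start, end, oldBucket, newBucket, org.Name)
	if err != nil {
		return err
	}
	if len(diffs) > 0 {
		return fmt.Errorf("audit found %d differences for org %s", len(diffs), org.Name)
	}
	if err := repo.MoveOrgToBucket(org, newBucket); err != nil {
		return err
	}
	return repo.ScrubOrgData(org, start, end, oldBucket)
}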

func NewCachingRepository

func NewCachingRepository(inner PartitionedRepository, collectionCache cache.CollectionCache[Org], orgCacheTtl time.Duration) PartitionedRepository

NewCachingRepository wraps an existing PartitionedRepository with caching behavior for org collection operations. If collectionCache is nil, the wrapper behaves as a transparent pass-through.

func NewPostgresqlPartitionedRepository

func NewPostgresqlPartitionedRepository(config Config, schemaHandler SchemaHandler, indexSelector BucketIndexSelector) PartitionedRepository

NewPostgresqlPartitionedRepository creates a new repository with a schema handler

type PostgresqlPartitionedRepository

type PostgresqlPartitionedRepository struct {
	checkpoints.CheckpointRepository
	// contains filtered or unexported fields
}

PostgresqlPartitionedRepository is a generic repository for managing bucket-partitioned data

func (*PostgresqlPartitionedRepository) AuditBucket

func (r *PostgresqlPartitionedRepository) AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)

AuditBucket audits differences between two buckets

func (*PostgresqlPartitionedRepository) CopyOrgData

func (r *PostgresqlPartitionedRepository) CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error

CopyOrgData delegates to SchemaHandler to copy org data between buckets

func (*PostgresqlPartitionedRepository) CreateOrg

func (r *PostgresqlPartitionedRepository) CreateOrg(orgName string) error

CreateOrg creates a new organization and assigns it to a bucket

func (*PostgresqlPartitionedRepository) EnsureBucketPartitions

func (r *PostgresqlPartitionedRepository) EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error

EnsureBucketPartitions ensures that partitions exist for a bucket

func (*PostgresqlPartitionedRepository) FilterOrgs

func (r *PostgresqlPartitionedRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)

FilterOrgs filters a list of organizations by a pattern

func (*PostgresqlPartitionedRepository) GetBucket

func (r *PostgresqlPartitionedRepository) GetBucket(id int) (*Bucket, error)

GetBucket retrieves a bucket by ID

func (*PostgresqlPartitionedRepository) GetOrg

func (r *PostgresqlPartitionedRepository) GetOrg(name string) (*Org, error)

GetOrg retrieves an organization by name

func (*PostgresqlPartitionedRepository) Initialize

func (r *PostgresqlPartitionedRepository) Initialize(connection database.Connection) error

func (*PostgresqlPartitionedRepository) ListBucketOrgs

func (r *PostgresqlPartitionedRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)

ListBucketOrgs lists all organizations in a specific bucket

func (*PostgresqlPartitionedRepository) ListBuckets

func (r *PostgresqlPartitionedRepository) ListBuckets() ([]*Bucket, error)

ListBuckets lists all buckets

func (*PostgresqlPartitionedRepository) ListOrgs

func (r *PostgresqlPartitionedRepository) ListOrgs(pattern string) ([]*Org, error)

ListOrgs lists all organizations matching a filter pattern

func (*PostgresqlPartitionedRepository) ListOrgsByName

func (r *PostgresqlPartitionedRepository) ListOrgsByName(names []string) ([]*Org, error)

ListOrgsByName lists organizations by their names

func (*PostgresqlPartitionedRepository) MoveOrgToBucket

func (r *PostgresqlPartitionedRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error

MoveOrgToBucket moves an organization to a different bucket

func (*PostgresqlPartitionedRepository) ScrubOrgData

func (r *PostgresqlPartitionedRepository) ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error

ScrubOrgData delegates to SchemaHandler to scrub org data from a bucket

type SchemaHandler

type SchemaHandler interface {
	// EnsureSchema creates all necessary tables, indexes, and schema structures for a bucket
	EnsureSchema(db *gorm.DB, bucket *Bucket) error

	// EnsurePartitions creates time-based partitions for a bucket's tables
	EnsurePartitions(db *gorm.DB, bucket *Bucket, startTime time.Time, years int) (nextTime time.Time, err error)

	// CopyOrgData copies all data for an organization from one bucket to another within a time range
	CopyOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error

	// ScrubOrgData deletes all data for an organization from a bucket within a time range
	ScrubOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, bucket *Bucket) error

	// AuditBuckets executes a query to audit differences between two buckets and returns data-model-specific results
	AuditBuckets(db *gorm.DB, oldBucket *Bucket, newBucket *Bucket, startTime time.Time, endTime time.Time, orgNames []string) ([]any, error)
}

SchemaHandler defines the interface for handling schema-specific operations. Implementations of this interface handle creating the database schema structure, copying and scrubbing org data, and managing partitions.

type ScrubOrgRequest

type ScrubOrgRequest struct {
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

ScrubOrgRequest contains parameters for deleting organization data from a bucket
