bucket

package
v1.1.10 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IndexName

func IndexName(schema string, tableName string, suffix string) string

IndexName generates an index name with schema, table, and suffix

func IsPostgresIdentifier

func IsPostgresIdentifier(identifier string) bool

IsPostgresIdentifier checks if a string is a valid PostgreSQL identifier

func ModulatedHash

func ModulatedHash(input string, space int) int

ModulatedHash converts the input string to an integer in the half-open interval (0, space], i.e. a value from 1 through space inclusive.

func RunBucketMaintenanceLoop

func RunBucketMaintenanceLoop(opts BucketMaintenanceOptions)

RunBucketMaintenanceLoop continuously ensures bucket partitions are prepared ahead of time. This function runs indefinitely and should be called in a goroutine. Only the leader replica will perform maintenance work; followers will sleep and check periodically.

func SchemaName

func SchemaName(schema string) string

SchemaName sanitizes a schema name by replacing hyphens with underscores

func TableName

func TableName(schema string, tableName string) string

TableName returns a fully qualified table name with schema

Types

type Bucket

type Bucket struct {
	Id               int       `json:"id"`
	PartitionsEnding time.Time `json:"partitionsEnding" gorm:"type:timestamptz"`
	Created          time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
	Orgs             []*Org    `json:"orgs,omitempty"`
}

Bucket represents a partition bucket that contains multiple organizations

func (*Bucket) CheckpointName

func (b *Bucket) CheckpointName() string

func (*Bucket) SchemaName

func (b *Bucket) SchemaName() string

type BucketIndexSelector

type BucketIndexSelector func(org *Org, bucketCount int) int

BucketIndexSelector selects the target bucket ID (1..bucketCount) for a given org when creating/assigning it. Implementations must return a value in the inclusive range [1, bucketCount].

type BucketMaintenanceOptions

type BucketMaintenanceOptions struct {
	Repository                    PartitionedRepository
	LeaderElectorClass            string
	MaintenanceFrequency          time.Duration
	PartitionPreparationThreshold time.Duration
}

BucketMaintenanceOptions configures the bucket maintenance loop

type CachingRepository

type CachingRepository struct {
	PartitionedRepository
	// contains filtered or unexported fields
}

CachingRepository is a wrapper that adds CollectionCache-based caching to any PartitionedRepository implementation.

func (*CachingRepository) CreateOrg

func (r *CachingRepository) CreateOrg(orgName string) error

CreateOrg checks the cache for an existing org so it can short-circuit. Otherwise it delegates creation to the wrapped repository and then updates the cache.

func (*CachingRepository) FilterOrgs

func (r *CachingRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)

FilterOrgs delegates directly to the wrapped (inner) repository.

func (*CachingRepository) GetOrg

func (r *CachingRepository) GetOrg(name string) (*Org, error)

GetOrg first tries the cache. On a miss it falls back to the wrapped repository and then writes the retrieved org to the cache.

func (*CachingRepository) ListBucketOrgs

func (r *CachingRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)

ListBucketOrgs delegates to the wrapped repository. It keeps no collection-level cache of its own, because these orgs are already part of the full orgs cache.

func (*CachingRepository) ListOrgs

func (r *CachingRepository) ListOrgs(pattern string) ([]*Org, error)

ListOrgs serves results from the cache when the cache is valid. Otherwise it loads the orgs from the wrapped repository, fills the cache, marks it valid, and then applies the filter pattern.

func (*CachingRepository) ListOrgsByName

func (r *CachingRepository) ListOrgsByName(names []string) ([]*Org, error)

ListOrgsByName reuses ListOrgs with a compiled regex pattern.

func (*CachingRepository) MoveOrgToBucket

func (r *CachingRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error

MoveOrgToBucket clears the cache before delegating to the wrapped repository, so the cache stays consistent with the move.

type Config

type Config struct {
	OrgBucketCount                int
	PartitionPreparationThreshold time.Duration
}

Config holds configuration for PartitionedRepository

type CopyOrgRequest

type CopyOrgRequest struct {
	SourceBucket      int       `json:"sourceBucket"`
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

CopyOrgRequest contains parameters for copying organization data between buckets

type MoveOrgRequest

type MoveOrgRequest struct {
	SourceBucket      int       `json:"sourceBucket"`
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

MoveOrgRequest contains parameters for moving an organization to a different bucket

type Org

type Org struct {
	Name     string    `json:"name" gorm:"primaryKey"`
	Created  time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
	BucketId int       `json:"bucketId"`
	Bucket   *Bucket   `json:"bucket,omitempty"`
}

Org represents an organization that is assigned to a bucket for data partitioning

func (*Org) String

func (o *Org) String() string

type PartitionedRepository

type PartitionedRepository interface {
	checkpoints.CheckpointRepository
	database.Repository

	// Org Management
	CreateOrg(org string) error
	GetOrg(name string) (*Org, error)
	ListOrgs(filter string) ([]*Org, error)
	FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
	ListBucketOrgs(bucket *Bucket) ([]*Org, error)
	ListOrgsByName(names []string) ([]*Org, error)

	// Bucket Management
	GetBucket(id int) (*Bucket, error)
	ListBuckets() ([]*Bucket, error)
	EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error
	MoveOrgToBucket(org *Org, bucket *Bucket) error

	// Data Operations
	CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error
	ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error
	AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)
}

PartitionedRepository is a generalized interface for managing partitioned data across organization buckets. It provides methods for org and bucket management as well as schema-agnostic data operations.

func NewCachingRepository

func NewCachingRepository(inner PartitionedRepository, collectionCache cache.CollectionCache[Org], orgCacheTtl time.Duration) PartitionedRepository

NewCachingRepository wraps an existing PartitionedRepository with caching behavior for org collection operations. If collectionCache is nil, the wrapper behaves as a transparent pass-through.

func NewPostgresqlPartitionedRepository

func NewPostgresqlPartitionedRepository(config Config, schemaHandler SchemaHandler, indexSelector BucketIndexSelector) PartitionedRepository

NewPostgresqlPartitionedRepository creates a new repository with a schema handler

type PostgresqlPartitionedRepository

type PostgresqlPartitionedRepository struct {
	checkpoints.CheckpointRepository
	// contains filtered or unexported fields
}

PostgresqlPartitionedRepository is a generic repository for managing bucket-partitioned data

func (*PostgresqlPartitionedRepository) AuditBucket

func (r *PostgresqlPartitionedRepository) AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)

AuditBucket audits differences between two buckets

func (*PostgresqlPartitionedRepository) CopyOrgData

func (r *PostgresqlPartitionedRepository) CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error

CopyOrgData delegates to SchemaHandler to copy org data between buckets

func (*PostgresqlPartitionedRepository) CreateOrg

func (r *PostgresqlPartitionedRepository) CreateOrg(orgName string) error

CreateOrg creates a new organization and assigns it to a bucket

func (*PostgresqlPartitionedRepository) EnsureBucketPartitions

func (r *PostgresqlPartitionedRepository) EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error

EnsureBucketPartitions ensures that partitions exist for a bucket

func (*PostgresqlPartitionedRepository) FilterOrgs

func (r *PostgresqlPartitionedRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)

FilterOrgs filters a list of organizations by a pattern

func (*PostgresqlPartitionedRepository) GetBucket

func (r *PostgresqlPartitionedRepository) GetBucket(id int) (*Bucket, error)

GetBucket retrieves a bucket by ID

func (*PostgresqlPartitionedRepository) GetOrg

func (r *PostgresqlPartitionedRepository) GetOrg(name string) (*Org, error)

GetOrg retrieves an organization by name

func (*PostgresqlPartitionedRepository) Initialize

func (r *PostgresqlPartitionedRepository) Initialize(connection database.Connection) error

func (*PostgresqlPartitionedRepository) ListBucketOrgs

func (r *PostgresqlPartitionedRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)

ListBucketOrgs lists all organizations in a specific bucket

func (*PostgresqlPartitionedRepository) ListBuckets

func (r *PostgresqlPartitionedRepository) ListBuckets() ([]*Bucket, error)

ListBuckets lists all buckets

func (*PostgresqlPartitionedRepository) ListOrgs

func (r *PostgresqlPartitionedRepository) ListOrgs(pattern string) ([]*Org, error)

ListOrgs lists all organizations matching a filter pattern

func (*PostgresqlPartitionedRepository) ListOrgsByName

func (r *PostgresqlPartitionedRepository) ListOrgsByName(names []string) ([]*Org, error)

ListOrgsByName lists organizations by their names

func (*PostgresqlPartitionedRepository) MoveOrgToBucket

func (r *PostgresqlPartitionedRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error

MoveOrgToBucket moves an organization to a different bucket

func (*PostgresqlPartitionedRepository) ScrubOrgData

func (r *PostgresqlPartitionedRepository) ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error

ScrubOrgData delegates to SchemaHandler to scrub org data from a bucket

type SchemaHandler

type SchemaHandler interface {
	// EnsureSchema creates all necessary tables, indexes, and schema structures for a bucket
	EnsureSchema(db *gorm.DB, bucket *Bucket) error

	// EnsurePartitions creates time-based partitions for a bucket's tables
	EnsurePartitions(db *gorm.DB, bucket *Bucket, startTime time.Time, years int) (nextTime time.Time, err error)

	// CopyOrgData copies all data for an organization from one bucket to another within a time range
	CopyOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error

	// ScrubOrgData deletes all data for an organization from a bucket within a time range
	ScrubOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, bucket *Bucket) error

	// AuditBuckets executes a query to audit differences between two buckets and returns data-model-specific results
	AuditBuckets(db *gorm.DB, oldBucket *Bucket, newBucket *Bucket, startTime time.Time, endTime time.Time, orgNames []string) ([]any, error)
}

SchemaHandler defines the interface for handling schema-specific operations. Implementations of this interface handle creating the database schema structure, copying and scrubbing org data, and managing partitions.

type ScrubOrgRequest

type ScrubOrgRequest struct {
	DestinationBucket int       `json:"destinationBucket"`
	StartTime         time.Time `json:"startTime"`
	EndTime           time.Time `json:"endTime"`
}

ScrubOrgRequest contains parameters for deleting organization data from a bucket

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL