Documentation
¶
Index ¶
- func IndexName(schema string, tableName string, suffix string) string
- func IsPostgresIdentifier(identifier string) bool
- func ModulatedHash(input string, space int) int
- func RunBucketMaintenanceLoop(opts BucketMaintenanceOptions)
- func SchemaName(schema string) string
- func TableName(schema string, tableName string) string
- type Bucket
- type BucketIndexSelector
- type BucketMaintenanceOptions
- type CachingRepository
- func (r *CachingRepository) CreateOrg(orgName string) error
- func (r *CachingRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
- func (r *CachingRepository) GetOrg(name string) (*Org, error)
- func (r *CachingRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)
- func (r *CachingRepository) ListOrgs(pattern string) ([]*Org, error)
- func (r *CachingRepository) ListOrgsByName(names []string) ([]*Org, error)
- func (r *CachingRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error
- type Config
- type CopyOrgRequest
- type MoveOrgRequest
- type Org
- type PartitionedRepository
- type PostgresqlPartitionedRepository
- func (r *PostgresqlPartitionedRepository) AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, ...) ([]any, error)
- func (r *PostgresqlPartitionedRepository) CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, ...) error
- func (r *PostgresqlPartitionedRepository) CreateOrg(orgName string) error
- func (r *PostgresqlPartitionedRepository) EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error
- func (r *PostgresqlPartitionedRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
- func (r *PostgresqlPartitionedRepository) GetBucket(id int) (*Bucket, error)
- func (r *PostgresqlPartitionedRepository) GetOrg(name string) (*Org, error)
- func (r *PostgresqlPartitionedRepository) Initialize(connection database.Connection) error
- func (r *PostgresqlPartitionedRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)
- func (r *PostgresqlPartitionedRepository) ListBuckets() ([]*Bucket, error)
- func (r *PostgresqlPartitionedRepository) ListOrgs(pattern string) ([]*Org, error)
- func (r *PostgresqlPartitionedRepository) ListOrgsByName(names []string) ([]*Org, error)
- func (r *PostgresqlPartitionedRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error
- func (r *PostgresqlPartitionedRepository) ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error
- type SchemaHandler
- type ScrubOrgRequest
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsPostgresIdentifier ¶
IsPostgresIdentifier checks if a string is a valid PostgreSQL identifier
func ModulatedHash ¶
ModulatedHash converts the input string to a number in the half-closed interval (0, space]
func RunBucketMaintenanceLoop ¶
func RunBucketMaintenanceLoop(opts BucketMaintenanceOptions)
RunBucketMaintenanceLoop continuously ensures bucket partitions are prepared ahead of time. This function runs indefinitely and should be called in a goroutine. Only the leader replica will perform maintenance work; followers will sleep and check periodically.
func SchemaName ¶
SchemaName sanitizes a schema name by replacing hyphens with underscores
Types ¶
type Bucket ¶
type Bucket struct {
Id int `json:"id"`
PartitionsEnding time.Time `json:"partitionsEnding" gorm:"type:timestamptz"`
Created time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
Orgs []*Org `json:"orgs,omitempty"`
}
Bucket represents a partition bucket that contains multiple organizations
func (*Bucket) CheckpointName ¶
func (*Bucket) SchemaName ¶
type BucketIndexSelector ¶
BucketIndexSelector selects the target bucket ID (1..bucketCount) for a given org when creating/assigning it. Implementations must return a value in the inclusive range [1, bucketCount].
type BucketMaintenanceOptions ¶
type BucketMaintenanceOptions struct {
Repository PartitionedRepository
LeaderElectorClass string
MaintenanceFrequency time.Duration
PartitionPreparationThreshold time.Duration
}
BucketMaintenanceOptions configures the bucket maintenance loop
type CachingRepository ¶
type CachingRepository struct {
PartitionedRepository
// contains filtered or unexported fields
}
CachingRepository is a wrapper that adds CollectionCache-based caching to any PartitionedRepository implementation.
func (*CachingRepository) CreateOrg ¶
func (r *CachingRepository) CreateOrg(orgName string) error
CreateOrg checks cache for existence to short-circuit, delegates creation, then updates cache.
func (*CachingRepository) FilterOrgs ¶
func (r *CachingRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
FilterOrgs delegates to inner by default.
func (*CachingRepository) GetOrg ¶
func (r *CachingRepository) GetOrg(name string) (*Org, error)
GetOrg attempts cache, falls back to delegate, and then writes the single org to cache.
func (*CachingRepository) ListBucketOrgs ¶
func (r *CachingRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)
ListBucketOrgs delegates (no collection-level cache here because it's already part of the full orgs cache)
func (*CachingRepository) ListOrgs ¶
func (r *CachingRepository) ListOrgs(pattern string) ([]*Org, error)
ListOrgs serves from cache when valid; otherwise loads from delegate, fills cache, marks valid, then filters.
func (*CachingRepository) ListOrgsByName ¶
func (r *CachingRepository) ListOrgsByName(names []string) ([]*Org, error)
ListOrgsByName reuses ListOrgs with a compiled regex pattern.
func (*CachingRepository) MoveOrgToBucket ¶
func (r *CachingRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error
MoveOrgToBucket clears cache before delegating to ensure consistency.
type CopyOrgRequest ¶
type CopyOrgRequest struct {
SourceBucket int `json:"sourceBucket"`
DestinationBucket int `json:"destinationBucket"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
}
CopyOrgRequest contains parameters for copying organization data between buckets
type MoveOrgRequest ¶
type MoveOrgRequest struct {
SourceBucket int `json:"sourceBucket"`
DestinationBucket int `json:"destinationBucket"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
}
MoveOrgRequest contains parameters for moving an organization to a different bucket
type Org ¶
type Org struct {
Name string `json:"name" gorm:"primaryKey"`
Created time.Time `json:"created" gorm:"type:timestamptz;default:now()"`
BucketId int `json:"bucketId"`
Bucket *Bucket `json:"bucket,omitempty"`
}
Org represents an organization that is assigned to a bucket for data partitioning
type PartitionedRepository ¶
type PartitionedRepository interface {
checkpoints.CheckpointRepository
database.Repository
// Org Management
CreateOrg(org string) error
GetOrg(name string) (*Org, error)
ListOrgs(filter string) ([]*Org, error)
FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
ListBucketOrgs(bucket *Bucket) ([]*Org, error)
ListOrgsByName(names []string) ([]*Org, error)
// Bucket Management
GetBucket(id int) (*Bucket, error)
ListBuckets() ([]*Bucket, error)
EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error
MoveOrgToBucket(org *Org, bucket *Bucket) error
// Data Operations
CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error
ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error
AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)
}
PartitionedRepository is a generalized interface for managing partitioned data across organization buckets. It provides methods for org/bucket management, and schema-agnostic operations.
func NewCachingRepository ¶
func NewCachingRepository(inner PartitionedRepository, collectionCache cache.CollectionCache[Org], orgCacheTtl time.Duration) PartitionedRepository
NewCachingRepository wraps an existing PartitionedRepository with caching behavior for org collection operations. If collectionCache is nil, the wrapper behaves as a transparent pass-through.
func NewPostgresqlPartitionedRepository ¶
func NewPostgresqlPartitionedRepository(config Config, schemaHandler SchemaHandler, indexSelector BucketIndexSelector) PartitionedRepository
NewPostgresqlPartitionedRepository creates a new repository with a schema handler
type PostgresqlPartitionedRepository ¶
type PostgresqlPartitionedRepository struct {
checkpoints.CheckpointRepository
// contains filtered or unexported fields
}
PostgresqlPartitionedRepository is a generic repository for managing bucket-partitioned data
func (*PostgresqlPartitionedRepository) AuditBucket ¶
func (r *PostgresqlPartitionedRepository) AuditBucket(startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket, orgFilter string) ([]any, error)
AuditBucket audits differences between two buckets
func (*PostgresqlPartitionedRepository) CopyOrgData ¶
func (r *PostgresqlPartitionedRepository) CopyOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error
CopyOrgData delegates to SchemaHandler to copy org data between buckets
func (*PostgresqlPartitionedRepository) CreateOrg ¶
func (r *PostgresqlPartitionedRepository) CreateOrg(orgName string) error
CreateOrg creates a new organization and assigns it to a bucket
func (*PostgresqlPartitionedRepository) EnsureBucketPartitions ¶
func (r *PostgresqlPartitionedRepository) EnsureBucketPartitions(b *Bucket, startTime time.Time, years int) error
EnsureBucketPartitions ensures that partitions exist for a bucket
func (*PostgresqlPartitionedRepository) FilterOrgs ¶
func (r *PostgresqlPartitionedRepository) FilterOrgs(orgs []*Org, filter string) ([]*Org, error)
FilterOrgs filters a list of organizations by a pattern
func (*PostgresqlPartitionedRepository) GetBucket ¶
func (r *PostgresqlPartitionedRepository) GetBucket(id int) (*Bucket, error)
GetBucket retrieves a bucket by ID
func (*PostgresqlPartitionedRepository) GetOrg ¶
func (r *PostgresqlPartitionedRepository) GetOrg(name string) (*Org, error)
GetOrg retrieves an organization by name
func (*PostgresqlPartitionedRepository) Initialize ¶
func (r *PostgresqlPartitionedRepository) Initialize(connection database.Connection) error
func (*PostgresqlPartitionedRepository) ListBucketOrgs ¶
func (r *PostgresqlPartitionedRepository) ListBucketOrgs(bucket *Bucket) ([]*Org, error)
ListBucketOrgs lists all organizations in a specific bucket
func (*PostgresqlPartitionedRepository) ListBuckets ¶
func (r *PostgresqlPartitionedRepository) ListBuckets() ([]*Bucket, error)
ListBuckets lists all buckets
func (*PostgresqlPartitionedRepository) ListOrgs ¶
func (r *PostgresqlPartitionedRepository) ListOrgs(pattern string) ([]*Org, error)
ListOrgs lists all organizations matching a filter pattern
func (*PostgresqlPartitionedRepository) ListOrgsByName ¶
func (r *PostgresqlPartitionedRepository) ListOrgsByName(names []string) ([]*Org, error)
ListOrgsByName lists organizations by their names
func (*PostgresqlPartitionedRepository) MoveOrgToBucket ¶
func (r *PostgresqlPartitionedRepository) MoveOrgToBucket(org *Org, bucket *Bucket) error
MoveOrgToBucket moves an organization to a different bucket
func (*PostgresqlPartitionedRepository) ScrubOrgData ¶
func (r *PostgresqlPartitionedRepository) ScrubOrgData(org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket) error
ScrubOrgData delegates to SchemaHandler to scrub org data from a bucket
type SchemaHandler ¶
type SchemaHandler interface {
// EnsureSchema creates all necessary tables, indexes, and schema structures for a bucket
EnsureSchema(db *gorm.DB, bucket *Bucket) error
// EnsurePartitions creates time-based partitions for a bucket's tables
EnsurePartitions(db *gorm.DB, bucket *Bucket, startTime time.Time, years int) (nextTime time.Time, err error)
// CopyOrgData copies all data for an organization from one bucket to another within a time range
CopyOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, oldBucket *Bucket, newBucket *Bucket) error
// ScrubOrgData deletes all data for an organization from a bucket within a time range
ScrubOrgData(db *gorm.DB, org *Org, startTime time.Time, endTime time.Time, bucket *Bucket) error
// AuditBuckets executes a query to audit differences between two buckets and returns data-model-specific results
AuditBuckets(db *gorm.DB, oldBucket *Bucket, newBucket *Bucket, startTime time.Time, endTime time.Time, orgNames []string) ([]any, error)
}
SchemaHandler defines the interface for handling schema-specific operations Implementations of this interface handle creating the database schema structure, copying and scrubbing org data, and managing partitions.