Documentation
¶
Overview ¶
Package postgres provides a PostgreSQL/CockroachDB implementation of the db.DB interface.
Index ¶
- func NewPostgres(cfg Config) (db.DB, error)
- type Config
- type Postgres
- func (p *Postgres) AddChunkReplica(ctx context.Context, chunkID, serverID, backendID string) error
- func (p *Postgres) Close() error
- func (p *Postgres) DecrementChunkRefCount(ctx context.Context, chunkID string) error
- func (p *Postgres) DecrementChunkRefCountBatch(ctx context.Context, chunkIDs []string) error
- func (p *Postgres) DeleteBucketLogging(ctx context.Context, bucket string) error
- func (p *Postgres) DeleteChunkRegistry(ctx context.Context, chunkID string) error
- func (p *Postgres) DeleteFederationConfig(ctx context.Context, bucket string) error
- func (p *Postgres) DeleteNotificationConfiguration(ctx context.Context, bucket string) error
- func (p *Postgres) DeleteOwnershipControls(ctx context.Context, bucket string) error
- func (p *Postgres) DeletePublicAccessBlock(ctx context.Context, bucket string) error
- func (p *Postgres) GetBucketLogging(ctx context.Context, bucket string) (*db.BucketLoggingConfig, error)
- func (p *Postgres) GetBucketsNeedingScan(ctx context.Context, minAge time.Duration, limit int) ([]string, error)
- func (p *Postgres) GetChunkRefCount(ctx context.Context, chunkID string) (int, error)
- func (p *Postgres) GetChunkReplicas(ctx context.Context, chunkID string) ([]db.ReplicaInfo, error)
- func (p *Postgres) GetChunksByServer(ctx context.Context, serverID string) ([]string, error)
- func (p *Postgres) GetFederatedBucketsNeedingSync(ctx context.Context, limit int) ([]*s3types.FederationConfig, error)
- func (p *Postgres) GetFederationConfig(ctx context.Context, bucket string) (*s3types.FederationConfig, error)
- func (p *Postgres) GetNotificationConfiguration(ctx context.Context, bucket string) (*s3types.NotificationConfiguration, error)
- func (p *Postgres) GetObjectLegalHold(ctx context.Context, bucket, key string) (*s3types.ObjectLockLegalHold, error)
- func (p *Postgres) GetObjectLockConfiguration(ctx context.Context, bucket string) (*s3types.ObjectLockConfiguration, error)
- func (p *Postgres) GetObjectRetention(ctx context.Context, bucket, key string) (*s3types.ObjectLockRetention, error)
- func (p *Postgres) GetOwnershipControls(ctx context.Context, bucket string) (*s3types.OwnershipControls, error)
- func (p *Postgres) GetPublicAccessBlock(ctx context.Context, bucket string) (*s3types.PublicAccessBlockConfig, error)
- func (p *Postgres) GetScanState(ctx context.Context, bucket string) (*db.LifecycleScanState, error)
- func (p *Postgres) GetZeroRefChunks(ctx context.Context, olderThan time.Time, limit int) ([]db.ZeroRefChunk, error)
- func (p *Postgres) IncrementChunkRefCount(ctx context.Context, chunkID string, size int64) error
- func (p *Postgres) IncrementChunkRefCountBatch(ctx context.Context, chunks []db.ChunkInfo) error
- func (p *Postgres) ListBucketsWithLifecycle(ctx context.Context) ([]string, error)
- func (p *Postgres) ListFederatedBuckets(ctx context.Context) ([]*s3types.FederationConfig, error)
- func (p *Postgres) ListLoggingConfigs(ctx context.Context) ([]*db.BucketLoggingConfig, error)
- func (p *Postgres) Migrate(ctx context.Context) error
- func (p *Postgres) PutObject(ctx context.Context, obj *types.ObjectRef) error
- func (p *Postgres) RemoveChunkReplica(ctx context.Context, chunkID, serverID string) error
- func (p *Postgres) ResetScanState(ctx context.Context, bucket string) error
- func (p *Postgres) SetBucketLogging(ctx context.Context, config *db.BucketLoggingConfig) error
- func (p *Postgres) SetDualWriteEnabled(ctx context.Context, bucket string, enabled bool) error
- func (p *Postgres) SetFederationConfig(ctx context.Context, config *s3types.FederationConfig) error
- func (p *Postgres) SetMigrationPaused(ctx context.Context, bucket string, paused bool) error
- func (p *Postgres) SetNotificationConfiguration(ctx context.Context, bucket string, config *s3types.NotificationConfiguration) error
- func (p *Postgres) SetObjectLegalHold(ctx context.Context, bucket, key string, ...) error
- func (p *Postgres) SetObjectLockConfiguration(ctx context.Context, bucket string, config *s3types.ObjectLockConfiguration) error
- func (p *Postgres) SetObjectRetention(ctx context.Context, bucket, key string, ...) error
- func (p *Postgres) SetOwnershipControls(ctx context.Context, bucket string, controls *s3types.OwnershipControls) error
- func (p *Postgres) SetPublicAccessBlock(ctx context.Context, bucket string, config *s3types.PublicAccessBlockConfig) error
- func (p *Postgres) SqlDB() *sql.DB
- func (p *Postgres) UpdateMigrationProgress(ctx context.Context, bucket string, objectsSynced, bytesSynced int64, ...) error
- func (p *Postgres) UpdateScanState(ctx context.Context, state *db.LifecycleScanState) error
- func (p *Postgres) UsageStore() usage.Store
- func (p *Postgres) WithTx(ctx context.Context, fn func(tx db.TxStore) error) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
// DSN is the data source name (e.g., "postgres://user:pass@host:port/database?sslmode=disable")
DSN string
// Driver is the specific driver type (postgres or cockroachdb)
Driver db.Driver
// Connection pool settings
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
ConnMaxIdleTime time.Duration
}
Config holds PostgreSQL connection configuration
type Postgres ¶
type Postgres struct {
*dbsql.Store // Embedded for shared object operations
// contains filtered or unexported fields
}
Postgres implements db.DB using PostgreSQL/CockroachDB as the backing store
func (*Postgres) AddChunkReplica ¶
AddChunkReplica records that a chunk exists on a specific file server.
func (*Postgres) DecrementChunkRefCount ¶
DecrementChunkRefCount decrements the reference count for a chunk. If ref_count reaches 0, sets zero_ref_since to current time for GC grace period.
func (*Postgres) DecrementChunkRefCountBatch ¶
DecrementChunkRefCountBatch decrements ref counts for multiple chunks atomically.
func (*Postgres) DeleteBucketLogging ¶
DeleteBucketLogging removes the logging configuration for a bucket.
func (*Postgres) DeleteChunkRegistry ¶
DeleteChunkRegistry removes a chunk from the registry. The chunk_replicas entries are automatically deleted via CASCADE.
func (*Postgres) DeleteFederationConfig ¶
DeleteFederationConfig removes the federation config for a bucket.
func (*Postgres) DeleteNotificationConfiguration ¶
DeleteNotificationConfiguration removes the notification config for a bucket.
func (*Postgres) DeleteOwnershipControls ¶
func (*Postgres) DeletePublicAccessBlock ¶
func (*Postgres) GetBucketLogging ¶
func (p *Postgres) GetBucketLogging(ctx context.Context, bucket string) (*db.BucketLoggingConfig, error)
GetBucketLogging retrieves the logging configuration for a bucket.
func (*Postgres) GetBucketsNeedingScan ¶
func (*Postgres) GetChunkRefCount ¶
GetChunkRefCount returns the current reference count for a chunk. Used by GC to re-check ref count within a transaction before deleting.
func (*Postgres) GetChunkReplicas ¶
GetChunkReplicas returns all replica locations for a chunk.
func (*Postgres) GetChunksByServer ¶
GetChunksByServer returns all chunk IDs stored on a specific server. Used for server decommissioning and rebalancing.
func (*Postgres) GetFederatedBucketsNeedingSync ¶
func (p *Postgres) GetFederatedBucketsNeedingSync(ctx context.Context, limit int) ([]*s3types.FederationConfig, error)
GetFederatedBucketsNeedingSync returns buckets that are in migrating mode and not paused. Note: The bucket mode is stored in the Manager's Raft state, not in the metadata DB. This method returns all non-paused federation configs; the caller should check bucket mode.
func (*Postgres) GetFederationConfig ¶
func (p *Postgres) GetFederationConfig(ctx context.Context, bucket string) (*s3types.FederationConfig, error)
GetFederationConfig retrieves the federation config for a bucket.
func (*Postgres) GetNotificationConfiguration ¶
func (p *Postgres) GetNotificationConfiguration(ctx context.Context, bucket string) (*s3types.NotificationConfiguration, error)
GetNotificationConfiguration retrieves the notification config for a bucket.
func (*Postgres) GetObjectLegalHold ¶
func (*Postgres) GetObjectLockConfiguration ¶
func (*Postgres) GetObjectRetention ¶
func (*Postgres) GetOwnershipControls ¶
func (*Postgres) GetPublicAccessBlock ¶
func (*Postgres) GetScanState ¶
func (*Postgres) GetZeroRefChunks ¶
func (p *Postgres) GetZeroRefChunks(ctx context.Context, olderThan time.Time, limit int) ([]db.ZeroRefChunk, error)
GetZeroRefChunks returns chunks with ref_count=0 where zero_ref_since is older than the given time (past the grace period). Includes replica info for deletion.
func (*Postgres) IncrementChunkRefCount ¶
IncrementChunkRefCount increments the reference count for a chunk. If the chunk doesn't exist, it creates it with ref_count=1. Clears zero_ref_since if the chunk was previously at ref_count=0.
func (*Postgres) IncrementChunkRefCountBatch ¶
IncrementChunkRefCountBatch increments ref counts for multiple chunks atomically.
func (*Postgres) ListBucketsWithLifecycle ¶
func (*Postgres) ListFederatedBuckets ¶
ListFederatedBuckets returns all buckets with federation configurations.
func (*Postgres) ListLoggingConfigs ¶
ListLoggingConfigs returns all bucket logging configurations.
func (*Postgres) PutObject ¶
PutObject creates or updates an object within a transaction. This method uses SELECT FOR UPDATE to serialize concurrent writes to the same bucket/key, preventing races where two concurrent calls both set is_latest=TRUE.
func (*Postgres) RemoveChunkReplica ¶
RemoveChunkReplica removes the record of a chunk on a file server.
func (*Postgres) ResetScanState ¶
func (*Postgres) SetBucketLogging ¶
SetBucketLogging stores the logging configuration for a bucket.
func (*Postgres) SetDualWriteEnabled ¶
SetDualWriteEnabled sets the dual_write_enabled flag for a bucket.
func (*Postgres) SetFederationConfig ¶
SetFederationConfig stores or updates the federation config for a bucket.
func (*Postgres) SetMigrationPaused ¶
SetMigrationPaused sets the migration_paused flag for a bucket.
func (*Postgres) SetNotificationConfiguration ¶
func (p *Postgres) SetNotificationConfiguration(ctx context.Context, bucket string, config *s3types.NotificationConfiguration) error
SetNotificationConfiguration stores the notification config for a bucket.
func (*Postgres) SetObjectLegalHold ¶
func (*Postgres) SetObjectLockConfiguration ¶
func (*Postgres) SetObjectRetention ¶
func (*Postgres) SetOwnershipControls ¶
func (*Postgres) SetPublicAccessBlock ¶
func (*Postgres) UpdateMigrationProgress ¶
func (p *Postgres) UpdateMigrationProgress(ctx context.Context, bucket string, objectsSynced, bytesSynced int64, lastSyncKey string) error
UpdateMigrationProgress updates the migration progress counters for a bucket.
func (*Postgres) UpdateScanState ¶
func (*Postgres) UsageStore ¶
UsageStore returns a usage.Store backed by PostgreSQL.