Documentation ¶
Index ¶
- Constants
- Variables
- func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (replicate.ReplicaClient, error)
- func ParseHost(host string) (bucket, region, endpoint string, forcePathStyle bool)
- func ParseURL(s, endpoint string) (bucket, region, key string, err error)
- type Leaser
- func (l *Leaser) AcquireLease(ctx context.Context) (*replicate.Lease, error)
- func (l *Leaser) Client() S3API
- func (l *Leaser) ReleaseLease(ctx context.Context, lease *replicate.Lease) error
- func (l *Leaser) RenewLease(ctx context.Context, lease *replicate.Lease) (*replicate.Lease, error)
- func (l *Leaser) SetClient(client S3API)
- func (l *Leaser) SetLogger(logger *slog.Logger)
- func (l *Leaser) Type() string
- type ReplicaClient
- func (c *ReplicaClient) DeleteAll(ctx context.Context) error
- func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
- func (c *ReplicaClient) GenerationsV3(ctx context.Context) ([]string, error)
- func (c *ReplicaClient) Init(ctx context.Context) (err error)
- func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
- func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
- func (c *ReplicaClient) OpenSnapshotV3(ctx context.Context, generation string, index int) (io.ReadCloser, error)
- func (c *ReplicaClient) OpenWALSegmentV3(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
- func (c *ReplicaClient) SetLogger(logger *slog.Logger)
- func (c *ReplicaClient) SnapshotsV3(ctx context.Context, generation string) ([]replicate.SnapshotInfoV3, error)
- func (c *ReplicaClient) Type() string
- func (c *ReplicaClient) WALSegmentsV3(ctx context.Context, generation string) ([]replicate.WALSegmentInfoV3, error)
- func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
- type S3API
Constants ¶
const (
	DefaultLeaseTTL  = 30 * time.Second
	DefaultLeasePath = "lock.json"
	LeaserType       = "s3"
)
const DefaultMetadataConcurrency = 50
DefaultMetadataConcurrency is the default number of concurrent HeadObject calls for fetching accurate timestamps during timestamp-based restore. S3 can handle 5,500+ HEAD requests per second per prefix.
const DefaultR2Concurrency = 2
DefaultR2Concurrency is the default number of concurrent multipart upload parts for Cloudflare R2, which has strict concurrent upload limits.
const DefaultRegion = "us-east-1"
DefaultRegion is the region used if one is not specified.
const MaxKeys = 1000
MaxKeys is the number of keys S3 can operate on per batch.
const MetadataKeyTimestamp = "replicate-timestamp"
MetadataKeyTimestamp is the metadata key for storing LTX file timestamps in S3.
const ReplicaClientType = "s3"
ReplicaClientType is the client type for this package.
Variables ¶
Functions ¶
func NewReplicaClientFromURL ¶
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (replicate.ReplicaClient, error)
NewReplicaClientFromURL creates a new ReplicaClient from URL components. This is used by the replica client factory registration.
func ParseHost ¶
func ParseHost(host string) (bucket, region, endpoint string, forcePathStyle bool)
ParseHost parses the host/endpoint for an S3-like storage system. Endpoints: https://docs.aws.amazon.com/general/latest/gr/s3.html
Types ¶
type Leaser ¶
type Leaser struct {
Bucket string
Path string
TTL time.Duration
Owner string
// contains filtered or unexported fields
}
func (*Leaser) AcquireLease ¶
func (*Leaser) ReleaseLease ¶
func (*Leaser) RenewLease ¶
type ReplicaClient ¶
type ReplicaClient struct {
// AWS authentication keys.
AccessKeyID string
SecretAccessKey string
// S3 bucket information
Region string
Bucket string
Path string
Endpoint string
ForcePathStyle bool
SkipVerify bool
SignPayload bool
RequireContentMD5 bool
// Upload configuration
PartSize int64 // Part size for multipart uploads (default: 5MB)
Concurrency int // Number of concurrent parts to upload (default: 5)
// MetadataConcurrency controls parallel HeadObject calls for timestamp-based restore.
// Higher values improve restore speed for large backup histories.
// Default: 50 (S3 can handle 5,500+ HEAD/s per prefix)
MetadataConcurrency int
// Server-Side Encryption - Customer Provided Keys (SSE-C)
// Works with all S3-compatible providers (AWS, MinIO, Exoscale, etc.)
SSECustomerAlgorithm string // Must be "AES256" if set
SSECustomerKey string // Base64-encoded 256-bit (32 byte) encryption key
SSECustomerKeyMD5 string // Base64-encoded MD5 of key (auto-computed if not set)
// Server-Side Encryption - AWS KMS (SSE-KMS)
// Only works with AWS S3 (not S3-compatible providers)
SSEKMSKeyID string // KMS key ID, ARN, or alias
// contains filtered or unexported fields
}
ReplicaClient is a client for writing LTX files to S3.
func NewReplicaClient ¶
func NewReplicaClient() *ReplicaClient
NewReplicaClient returns a new instance of ReplicaClient.
func (*ReplicaClient) DeleteAll ¶
func (c *ReplicaClient) DeleteAll(ctx context.Context) error
DeleteAll deletes all files.
func (*ReplicaClient) DeleteLTXFiles ¶
func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
DeleteLTXFiles deletes one or more LTX files.
func (*ReplicaClient) GenerationsV3 ¶
func (c *ReplicaClient) GenerationsV3(ctx context.Context) ([]string, error)
GenerationsV3 returns a list of v0.3.x generation IDs in the replica.
func (*ReplicaClient) Init ¶
func (c *ReplicaClient) Init(ctx context.Context) (err error)
Init initializes the connection to S3. No-op if already initialized.
func (*ReplicaClient) LTXFiles ¶
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
LTXFiles returns an iterator over all LTX files on the replica for the given level. When useMetadata is true, it fetches accurate timestamps from S3 metadata via HeadObject, using parallel batched requests (controlled by MetadataConcurrency) to avoid hangs with large backup histories (see issue #930). When useMetadata is false, it uses the fast LastModified timestamps from the LIST operation.
func (*ReplicaClient) OpenLTXFile ¶
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
OpenLTXFile returns a reader for an LTX file. Returns os.ErrNotExist if no matching index/offset is found.
func (*ReplicaClient) OpenSnapshotV3 ¶
func (c *ReplicaClient) OpenSnapshotV3(ctx context.Context, generation string, index int) (io.ReadCloser, error)
OpenSnapshotV3 opens a v0.3.x snapshot file for reading. The returned reader provides LZ4-decompressed data.
func (*ReplicaClient) OpenWALSegmentV3 ¶
func (c *ReplicaClient) OpenWALSegmentV3(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
OpenWALSegmentV3 opens a v0.3.x WAL segment file for reading. The returned reader provides LZ4-decompressed data.
func (*ReplicaClient) SetLogger ¶
func (c *ReplicaClient) SetLogger(logger *slog.Logger)
func (*ReplicaClient) SnapshotsV3 ¶
func (c *ReplicaClient) SnapshotsV3(ctx context.Context, generation string) ([]replicate.SnapshotInfoV3, error)
SnapshotsV3 returns snapshots for a generation, sorted by index.
func (*ReplicaClient) Type ¶
func (c *ReplicaClient) Type() string
Type returns "s3" as the client type.
func (*ReplicaClient) WALSegmentsV3 ¶
func (c *ReplicaClient) WALSegmentsV3(ctx context.Context, generation string) ([]replicate.WALSegmentInfoV3, error)
WALSegmentsV3 returns WAL segments for a generation, sorted by index then offset.
func (*ReplicaClient) WriteLTXFile ¶
func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
WriteLTXFile writes an LTX file to the replica. It extracts the timestamp from the LTX header and stores it in S3 metadata to preserve the original creation time.
type S3API ¶
type S3API interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}
S3API is the interface for S3 operations needed by Leaser.