Documentation
¶
Index ¶
- Constants
- Variables
- func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (replicate.ReplicaClient, error)
- type Leaser
- func (l *Leaser) AcquireLease(ctx context.Context) (*replicate.Lease, error)
- func (l *Leaser) MaxLeaseTTL() time.Duration
- func (l *Leaser) ReleaseLease(ctx context.Context, lease *replicate.Lease) error
- func (l *Leaser) RenewLease(ctx context.Context, lease *replicate.Lease) (*replicate.Lease, error)
- func (l *Leaser) SetLogger(logger *slog.Logger)
- func (l *Leaser) Type() string
- type ReplicaClient
- func (c *ReplicaClient) DeleteAll(ctx context.Context) error
- func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
- func (c *ReplicaClient) GenerationsV3(ctx context.Context) ([]string, error)
- func (c *ReplicaClient) Init(ctx context.Context) error
- func (c *ReplicaClient) LTXFilePath(level int, minTXID, maxTXID ltx.TXID) string
- func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
- func (c *ReplicaClient) LTXLevelDir(level int) string
- func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
- func (c *ReplicaClient) OpenSnapshotV3(ctx context.Context, generation string, index int) (io.ReadCloser, error)
- func (c *ReplicaClient) OpenWALSegmentV3(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
- func (c *ReplicaClient) Path() string
- func (c *ReplicaClient) SetLogger(logger *slog.Logger)
- func (c *ReplicaClient) SnapshotsV3(ctx context.Context, generation string) ([]replicate.SnapshotInfoV3, error)
- func (c *ReplicaClient) Type() string
- func (c *ReplicaClient) WALSegmentsV3(ctx context.Context, generation string) ([]replicate.WALSegmentInfoV3, error)
- func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, rd io.Reader) (info *ltx.FileInfo, err error)
Constants ¶
const ( DefaultLeaseTTL = 30 * time.Second DefaultLeaseMaxTTL = 30 * time.Second DefaultLeaseFile = "lock.json" LeaserType = "file" )
const ReplicaClientType = "file"
ReplicaClientType is the client type for this package.
Variables ¶
var ( ErrLeaseRequired = errors.New("lease required") ErrLeaseETagRequired = errors.New("lease etag required") ErrLeaseAlreadyReleased = errors.New("lease already released") ErrTTLExceedsMax = errors.New("requested lease TTL exceeds MaxTTL") ErrLeaseCorrupt = errors.New("on-disk lease ExpiresAt exceeds MaxTTL — refusing to honor") )
Functions ¶
func NewReplicaClientFromURL ¶
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (replicate.ReplicaClient, error)
NewReplicaClientFromURL creates a new ReplicaClient from URL components. This is used by the replica client factory registration.
Types ¶
type Leaser ¶ added in v0.7.3
type Leaser struct {
Dir string // directory containing the lock file
Path string // relative path under Dir; empty means DefaultLeaseFile
TTL time.Duration // lease TTL on Acquire/Renew
MaxTTL time.Duration // server-side ceiling; zero means DefaultLeaseMaxTTL
Owner string // string written to the lease body for forensics
// contains filtered or unexported fields
}
Leaser implements replicate.Leaser using POSIX filesystem semantics: O_CREAT|O_EXCL for initial creation, content-hash ETag matching for updates, atomic rename for cutover. Mirrors the S3 leaser's CAS contract so callers can swap backends without code changes.
Production deployments MUST set MaxTTL to a value commensurate with their heartbeat interval (≥ 2 × heartbeat tick is the standard). A buggy or malicious caller that requests TTL > MaxTTL is rejected with ErrTTLExceedsMax — without this, one bad pod can pin the lease for an arbitrary duration regardless of liveness. MaxTTL also bounds the read side: any on-disk ExpiresAt > now + MaxTTL is treated as corrupt/malicious and rejected (defense in depth against a writer that bypassed this API).
func NewLeaser ¶ added in v0.7.3
NewLeaser constructs a file-backed lease coordinator rooted at dir. Lease state is a single JSON file at dir/path, or at dir/lock.json (DefaultLeaseFile) when path is empty. Concurrent processes coordinate via filesystem CAS — O_EXCL for the first writer, content-hash ETag for renew/release.
func (*Leaser) AcquireLease ¶ added in v0.7.3
func (*Leaser) MaxLeaseTTL ¶ added in v0.7.3
MaxLeaseTTL satisfies replicate.Leaser. Always non-zero — defaults to DefaultLeaseMaxTTL when MaxTTL is unset.
func (*Leaser) ReleaseLease ¶ added in v0.7.3
func (*Leaser) RenewLease ¶ added in v0.7.3
type ReplicaClient ¶
ReplicaClient is a client for writing LTX files to disk.
func NewReplicaClient ¶
func NewReplicaClient(path string) *ReplicaClient
NewReplicaClient returns a new instance of ReplicaClient.
func (*ReplicaClient) DeleteAll ¶
func (c *ReplicaClient) DeleteAll(ctx context.Context) error
DeleteAll deletes all LTX files.
func (*ReplicaClient) DeleteLTXFiles ¶
DeleteLTXFiles deletes LTX files.
func (*ReplicaClient) GenerationsV3 ¶
func (c *ReplicaClient) GenerationsV3(ctx context.Context) ([]string, error)
GenerationsV3 returns a list of v0.3.x generation IDs in the replica.
func (*ReplicaClient) Init ¶
func (c *ReplicaClient) Init(ctx context.Context) error
Init is a no-op for the file replica client, as no initialization is required.
func (*ReplicaClient) LTXFilePath ¶
func (c *ReplicaClient) LTXFilePath(level int, minTXID, maxTXID ltx.TXID) string
LTXFilePath returns the path to an LTX file.
func (*ReplicaClient) LTXFiles ¶
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
LTXFiles returns an iterator over all LTX files on the replica for the given level. The useMetadata parameter is ignored for the file backend, as ModTime is always available from readdir.
func (*ReplicaClient) LTXLevelDir ¶
func (c *ReplicaClient) LTXLevelDir(level int) string
LTXLevelDir returns the path to a given level.
func (*ReplicaClient) OpenLTXFile ¶
func (c *ReplicaClient) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
OpenLTXFile returns a reader for an LTX file at the given position. Returns os.ErrNotExist if no matching LTX file is found.
func (*ReplicaClient) OpenSnapshotV3 ¶
func (c *ReplicaClient) OpenSnapshotV3(ctx context.Context, generation string, index int) (io.ReadCloser, error)
OpenSnapshotV3 opens a v0.3.x snapshot file for reading. The returned reader provides LZ4-decompressed data.
func (*ReplicaClient) OpenWALSegmentV3 ¶
func (c *ReplicaClient) OpenWALSegmentV3(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
OpenWALSegmentV3 opens a v0.3.x WAL segment file for reading. The returned reader provides LZ4-decompressed data.
func (*ReplicaClient) Path ¶
func (c *ReplicaClient) Path() string
Path returns the destination path to replicate the database to.
func (*ReplicaClient) SetLogger ¶
func (c *ReplicaClient) SetLogger(logger *slog.Logger)
func (*ReplicaClient) SnapshotsV3 ¶
func (c *ReplicaClient) SnapshotsV3(ctx context.Context, generation string) ([]replicate.SnapshotInfoV3, error)
SnapshotsV3 returns snapshots for a generation, sorted by index.
func (*ReplicaClient) Type ¶
func (c *ReplicaClient) Type() string
Type returns "file" as the client type.
func (*ReplicaClient) WALSegmentsV3 ¶
func (c *ReplicaClient) WALSegmentsV3(ctx context.Context, generation string) ([]replicate.WALSegmentInfoV3, error)
WALSegmentsV3 returns WAL segments for a generation, sorted by index then offset.
func (*ReplicaClient) WriteLTXFile ¶
func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, rd io.Reader) (info *ltx.FileInfo, err error)
WriteLTXFile writes an LTX file to the replica. It extracts the timestamp from the LTX header and sets it as the file's ModTime to preserve the original creation time.