package local_cache

Version: v0.0.0-...-1675cc9 (this package is not in the latest version of its module)
Published: May 19, 2026 | License: Apache-2.0 | Imports: 74 | Imported by: 0

Overview

persistent_cache.go implements PersistentCache, the main entry point for the local-cache subsystem. It coordinates:

  • Object resolution and download orchestration (resolveObject, downloadObject, performDownload).
  • Cache-Control / ETag-based revalidation (revalidateObject).
  • Inline (small-file) vs disk (large-file) storage decisions via decisionWriter.
  • No-store passthrough streaming when the origin forbids caching.
  • Namespace ID assignment and federation token management.
  • Background configuration updates from the director.

Supporting types live in other files: CacheDB (database.go), StorageManager / BlockWriter (storage.go), EvictionManager (eviction.go), ConsistencyChecker (consistency.go), and BlockFetcherV2 (block_fetcher.go).


Constants

const (
	// ChunkSize is the notification granularity: waiters are notified every 128 KiB
	ChunkSize = 128 * 1024

	// DefaultPrefetchTimeout is the default time after which a prefetch with no active
	// clients will be cancelled
	DefaultPrefetchTimeout = 20 * time.Second

	// PrefetchSemaphoreReleaseInterval is how often to release and reacquire the prefetch semaphore
	PrefetchSemaphoreReleaseInterval = 5 * time.Second

	// ETAUpdateInterval is how often to update the ETA estimate
	ETAUpdateInterval = 250 * time.Millisecond

	// DefaultInitialRate is the initial assumed download rate (1 MiB/s)
	DefaultInitialRate = 1024 * 1024

	// ETAStaleThreshold is how long an ETA can be late before waiters give up
	ETAStaleThreshold = 5 * time.Second
)
const (
	// PrefixMeta stores CacheMetadata (headers, validation info, storage mode)
	PrefixMeta = "m:"
	// PrefixState stores Roaring Bitmap tracking downloaded blocks
	PrefixState = "s:"
	// PrefixInline stores encrypted inline data for small objects (< 4KB)
	PrefixInline = "d:"
	// PrefixLRU stores sorted index for eviction candidates: l:<storage_id>:<namespace_id>:<ts>:<instance_hash>
	PrefixLRU = "l:"
	// PrefixUsage stores total bytes used per storage+namespace: u:<storage_id>:<namespace_id>
	PrefixUsage = "u:"
	// PrefixDiskMap stores the mapping of disk IDs to directories
	PrefixDiskMap = "di:"
	// PrefixPurgeFirst stores instance hashes marked for priority eviction
	PrefixPurgeFirst = "pf:"
	// PrefixETag stores the latest ETag for an object: e:<object_hash> -> etag
	PrefixETag = "e:"
	// PrefixNamespace stores namespace prefix -> ID mappings: n:<prefix> -> uint32
	PrefixNamespace = "n:"
	// KeySalt is the single DB key that stores the random salt used when
	// hashing object/instance names.  The salt prevents an attacker with
	// DB access from correlating hashes with known URLs.
	KeySalt = "_salt"
)

Key prefixes for BadgerDB

const (
	// BlockDataSize is the size of data in each encrypted block (before encryption)
	BlockDataSize = 4080
	// AuthTagSize is the size of the AES-GCM authentication tag
	AuthTagSize = 16
	// BlockTotalSize is the total size of an encrypted block on disk
	BlockTotalSize = BlockDataSize + AuthTagSize
	// InlineThreshold is the max size for inline storage (< 4KB stored in DB)
	InlineThreshold = 4096
	// NonceSize is the standard AES-GCM nonce size in bytes
	NonceSize = 12
	// KeySize is the AES-256 key size in bytes
	KeySize = 32
)

Block size constants for encryption and storage

const (
	CCFlagNoStore        = ccNoStore
	CCFlagNoCache        = ccNoCache
	CCFlagPrivate        = ccPrivate
	CCFlagMustRevalidate = ccMustRevalidate
)

Cache-Control flag bits. The canonical definitions are in cache_control.go (ccNoStore, ccNoCache, ccPrivate, ccMustRevalidate); these aliases are kept for any callers that reference the CCFlag* names directly.

const DefaultReadBatchBlocks = 16

DefaultReadBatchBlocks is the default number of blocks to read from disk in a single ReadAt syscall before decrypting block-by-block. 16 encrypted blocks × 4096 bytes (BlockTotalSize) = 64 KiB per read.

const NoCacheGracePeriod = 5 * time.Second

NoCacheGracePeriod is the minimum interval between revalidation attempts when the origin sends Cache-Control: no-cache. Strictly speaking, no-cache means "must revalidate before each use," but revalidation in Pelican requires a full GET (conditional requests are not yet supported), which is expensive. A short grace period prevents a thundering herd of redundant downloads when many clients request the same object concurrently.

OptimalReadSize is the ideal number of plaintext bytes to request per read call. It equals DefaultReadBatchBlocks × BlockDataSize (16 × 4080 = 65,280 bytes). When callers size their buffers as a multiple of BlockDataSize (4080), every block decrypts directly into the destination buffer (zero-copy). With misaligned sizes, the last partial block in each batch is decrypted into a temporary buffer and then copied, which adds overhead.

const (
	// SaltSize is the number of random bytes prepended to object/instance
	// names before hashing.  32 bytes (256 bits) provides a comfortable
	// security margin.
	SaltSize = 32
)

Variables

var ErrInitNoStore = errors.New("object must not be stored (no-store/no-cache/private)")

ErrInitNoStore is a sentinel error returned by initObjectFromStat when the origin's Cache-Control indicates the response must not be stored. The caller should fall back to a full streaming download via downloadObject.

var ErrNoStore = errors.New("origin response has Cache-Control: no-store")

ErrNoStore is returned when the origin sends Cache-Control: no-store (or private). The response data is streamed via persistentDownload.noStoreReader (an io.Pipe) directly to the first caller without persisting to the cache.

var ErrNoStoreRetry = errors.New("no-store download in progress; retry independently")

ErrNoStoreRetry is returned to waiters that attach to an in-flight no-store download. Because the stream can only be consumed once (by the first caller), subsequent callers must retry independently.

var ErrNotCached = errors.New("object not cached")

ErrNotCached is returned when an object is not in the cache

Functions

func BlockOffset

func BlockOffset(blockNum uint32) int64

BlockOffset returns the byte offset in the file for a given block number

func BlocksInChunk

func BlocksInChunk(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (startBlock, endBlock uint32)

BlocksInChunk returns the range of blocks (startBlock, endBlock inclusive) that belong to a specific chunk.

func CalculateBlockCount

func CalculateBlockCount(contentLength int64) uint32

CalculateBlockCount returns the number of blocks needed for a given content length

func CalculateChunkCount

func CalculateChunkCount(contentLength int64, chunkSizeCode ChunkSizeCode) int

CalculateChunkCount returns the number of chunks needed to store an object of the given size with the specified chunk size code. Returns 1 if chunking is disabled (the entire object is one "chunk").

func CalculateFileSize

func CalculateFileSize(contentLength int64) int64

CalculateFileSize returns the exact on-disk file size for a given content length. The last block is only as large as the remaining data plus its AES-GCM authentication tag — it is not padded to a full BlockTotalSize.
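The block-count and file-size rules above can be sketched from the documented constants alone. This is a hypothetical re-implementation (the lowercase helper names are invented), assuming blocks are laid out back to back and that the final block stores only its remaining plaintext plus one AES-GCM tag, as the CalculateFileSize description states.

```go
package main

import "fmt"

// Constants copied from the package documentation.
const (
	BlockDataSize  = 4080
	AuthTagSize    = 16
	BlockTotalSize = BlockDataSize + AuthTagSize
)

// calculateBlockCount (hypothetical): one block per BlockDataSize bytes of
// plaintext, rounding up so a partial final block still counts.
func calculateBlockCount(contentLength int64) uint32 {
	if contentLength <= 0 {
		return 0
	}
	return uint32((contentLength + BlockDataSize - 1) / BlockDataSize)
}

// calculateFileSize (hypothetical): full blocks occupy BlockTotalSize bytes
// on disk; the last block holds only its remaining data plus one auth tag.
func calculateFileSize(contentLength int64) int64 {
	if contentLength <= 0 {
		return 0
	}
	fullBlocks := contentLength / BlockDataSize
	remainder := contentLength % BlockDataSize
	size := fullBlocks * BlockTotalSize
	if remainder > 0 {
		size += remainder + AuthTagSize
	}
	return size
}

func main() {
	for _, n := range []int64{1, 4080, 4081, 10000} {
		fmt.Printf("content=%d blocks=%d file=%d\n", n, calculateBlockCount(n), calculateFileSize(n))
	}
}
```

For example, a 10,000-byte object needs three blocks, and its file is 2 × 4096 + 1840 + 16 = 10,048 bytes rather than 3 × 4096.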

func CheckCacheObjectExists

func CheckCacheObjectExists(ctx context.Context, socketPath, objectPath string) (bool, error)

CheckCacheObjectExists checks if an object exists in the PersistentCache by making a HEAD request to the cache's unix socket. Returns true if the object exists. NOTE: This will trigger a download if the object is not cached, so it always returns true unless the origin is unreachable. For testing eviction, use CheckCacheObjectIsCached instead.

func CheckCacheObjectIsCached

func CheckCacheObjectIsCached(ctx context.Context, socketPath, objectPath string) (bool, error)

CheckCacheObjectIsCached checks if an object is actually in the cache (not just accessible). This sends a special header to prevent downloading if the object is not cached. Returns true only if the object is already cached, false if it would need to be downloaded.

func ChunkContainsBlock

func ChunkContainsBlock(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int, blockNum uint32) bool

ChunkContainsBlock returns true if the specified block belongs to the given chunk.

func ChunkContentLength

func ChunkContentLength(totalContentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) int64

ChunkContentLength returns the content length for a specific chunk. This is the size of data within the chunk (not the encrypted file size).

func ChunkSizeCodeToBytes

func ChunkSizeCodeToBytes(code ChunkSizeCode) uint64

ChunkSizeCodeToBytes converts a ChunkSizeCode to the actual chunk size in bytes. Returns 0 if chunking is disabled. The returned size is always rounded down to a multiple of BlockDataSize to ensure blocks don't span chunk boundaries.

func ContentOffsetToBlock

func ContentOffsetToBlock(contentOffset int64) uint32

ContentOffsetToBlock converts a content byte offset to a block number

func ContentOffsetToChunk

func ContentOffsetToChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int

ContentOffsetToChunk returns the chunk index for a given content byte offset.

func ContentOffsetWithinBlock

func ContentOffsetWithinBlock(contentOffset int64) int

ContentOffsetWithinBlock returns the offset within a block for a content offset
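The three offset helpers (BlockOffset, ContentOffsetToBlock, ContentOffsetWithinBlock) differ only in whether they count plaintext bytes or on-disk bytes. A minimal sketch under that assumption, with invented lowercase names: plaintext offsets divide by BlockDataSize, while file offsets multiply by BlockTotalSize because every stored block carries a 16-byte tag.

```go
package main

import "fmt"

const (
	BlockDataSize  = 4080
	AuthTagSize    = 16
	BlockTotalSize = BlockDataSize + AuthTagSize
)

// blockOffset (hypothetical): on-disk byte offset of an encrypted block,
// assuming blocks are stored contiguously.
func blockOffset(blockNum uint32) int64 {
	return int64(blockNum) * BlockTotalSize
}

// contentOffsetToBlock (hypothetical): which block holds a plaintext byte.
func contentOffsetToBlock(contentOffset int64) uint32 {
	return uint32(contentOffset / BlockDataSize)
}

// contentOffsetWithinBlock (hypothetical): position of that byte inside the
// decrypted block.
func contentOffsetWithinBlock(contentOffset int64) int {
	return int(contentOffset % BlockDataSize)
}

func main() {
	off := int64(10000)
	b := contentOffsetToBlock(off)
	fmt.Printf("byte %d -> block %d (file offset %d), in-block offset %d\n",
		off, b, blockOffset(b), contentOffsetWithinBlock(off))
}
```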

func DefaultFreshness

func DefaultFreshness(lastValidated time.Time) time.Duration

DefaultFreshness returns the jittered freshness lifetime used when the origin doesn't provide explicit Cache-Control headers. The jitter is deterministic per object (seeded from lastValidated) so that repeated calls for the same object return the same duration.

func DiskMappingKey

func DiskMappingKey(storageID StorageID) []byte

DiskMappingKey returns the BadgerDB key for a disk mapping entry.

func ETagKey

func ETagKey(objectHash ObjectHash) []byte

ETagKey returns the BadgerDB key for ETag lookup. It maps objectHash -> latest ETag for that object.

func FormatChunkSize

func FormatChunkSize(code ChunkSizeCode) string

FormatChunkSize returns a human-readable string for a ChunkSizeCode.

func FormatContentRange

func FormatContentRange(start, end, total int64) string

FormatContentRange formats a Content-Range header value
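A Content-Range value uses the RFC 9110 form `bytes <first>-<last>/<complete-length>` with inclusive offsets, which matches the inclusive ranges used elsewhere in this package. A minimal sketch (the helper name is hypothetical, and it assumes the total length is always known):

```go
package main

import "fmt"

// formatContentRange builds "bytes <start>-<end>/<total>"; start and end are
// inclusive byte offsets into the full representation.
func formatContentRange(start, end, total int64) string {
	return fmt.Sprintf("bytes %d-%d/%d", start, end, total)
}

func main() {
	fmt.Println(formatContentRange(0, 499, 1234)) // bytes 0-499/1234
}
```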

func GetChunkFileSuffix

func GetChunkFileSuffix(chunkIndex int) string

GetChunkFileSuffix returns the file suffix for a chunk. Chunk 0 has no suffix (the base file), chunk 1 is "-2", chunk 2 is "-3", etc.

func GetChunkPath

func GetChunkPath(basePath string, chunkIndex int) string

GetChunkPath returns the full path for a chunk file. basePath is the path for chunk 0 (no suffix).
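The suffix rule above is documented; how the suffix is joined to the base path is an assumption here (simple concatenation). A short sketch with invented lowercase names:

```go
package main

import (
	"fmt"
	"strconv"
)

// chunkFileSuffix follows the documented naming: chunk 0 is the base file,
// and chunk i (i >= 1) gets the suffix "-<i+1>".
func chunkFileSuffix(chunkIndex int) string {
	if chunkIndex == 0 {
		return ""
	}
	return "-" + strconv.Itoa(chunkIndex+1)
}

// chunkPath assumes the suffix is appended directly to chunk 0's path.
func chunkPath(basePath string, chunkIndex int) string {
	return basePath + chunkFileSuffix(chunkIndex)
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(chunkPath("ab/cd/ef0123", i))
	}
}
```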

func GetChunkRange

func GetChunkRange(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (start, end int64)

GetChunkRange returns the start and end byte offsets (inclusive) for a specific chunk. chunkIndex is 0-based.
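The chunk helpers can be sketched by taking the chunk size in bytes directly, since the encoding of ChunkSizeCode is not described here. Everything below is hypothetical except the documented rules: sizes are rounded down to a multiple of BlockDataSize, a disabled size (0) means one chunk, and ranges are inclusive.

```go
package main

import "fmt"

const BlockDataSize = 4080

// alignChunkSize rounds a requested size down to a multiple of BlockDataSize
// so that no block spans two chunk files.
func alignChunkSize(requested uint64) uint64 {
	return requested / BlockDataSize * BlockDataSize
}

// chunkCount returns 1 when chunking is disabled (chunkSize == 0); empty
// objects are also treated as a single chunk (an assumption).
func chunkCount(contentLength int64, chunkSize uint64) int {
	if chunkSize == 0 || contentLength <= 0 {
		return 1
	}
	cs := int64(chunkSize)
	return int((contentLength + cs - 1) / cs)
}

// chunkRange returns the inclusive [start, end] content offsets of a chunk.
func chunkRange(contentLength int64, chunkSize uint64, chunkIndex int) (start, end int64) {
	if chunkSize == 0 {
		return 0, contentLength - 1
	}
	cs := int64(chunkSize)
	start = int64(chunkIndex) * cs
	end = start + cs - 1
	if end > contentLength-1 {
		end = contentLength - 1 // the last chunk may be short
	}
	return start, end
}

func main() {
	cs := alignChunkSize(10 * 1024 * 1024) // a hypothetical 10 MiB request
	total := int64(25 * 1024 * 1024)
	for i := 0; i < chunkCount(total, cs); i++ {
		s, e := chunkRange(total, cs, i)
		fmt.Printf("chunk %d: %d-%d\n", i, s, e)
	}
}
```

Rounding down means a "10 MiB" request actually yields 10,485,600-byte chunks (2570 blocks), which keeps every block boundary inside a single chunk file.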

func GetInstanceStoragePath

func GetInstanceStoragePath(hash InstanceHash) string

GetInstanceStoragePath returns the 2-level directory path for storing a file. Given hash "42561abfe18be...", it returns "42/56/1abfe18be...".
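The 2-level fan-out keeps any single directory from accumulating too many files. A minimal sketch, assuming the hash is a hex string (the helper name and the short-input fallback are hypothetical):

```go
package main

import (
	"fmt"
	"path"
)

// instanceStoragePath splits a hex hash into two 2-character directory
// levels plus the remainder as the file name. Inputs too short to split are
// returned unchanged (a defensive assumption, not documented behavior).
func instanceStoragePath(hash string) string {
	if len(hash) < 5 {
		return hash
	}
	return path.Join(hash[:2], hash[2:4], hash[4:])
}

func main() {
	fmt.Println(instanceStoragePath("42561abfe18be")) // 42/56/1abfe18be
}
```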

func InitIssuerKeyForTests

func InitIssuerKeyForTests(t testing.TB)

InitIssuerKeyForTests initializes issuer keys for testing. This must be called before creating a CacheDB or EncryptionManager. It generates a new issuer key in a temporary directory and registers a cleanup.

func InlineKey

func InlineKey(instanceHash InstanceHash) []byte

InlineKey returns the BadgerDB key for inline data

func IsRangeRequest

func IsRangeRequest(req *http.Request) bool

IsRangeRequest checks if a request contains a Range header

func LRUKey

func LRUKey(storageID StorageID, namespaceID NamespaceID, timestamp time.Time, instanceHash InstanceHash) []byte

LRUKey returns the BadgerDB key for LRU tracking. Format: l:<storage_id>:<namespace_id>:<timestamp_ns>:<instance_hash>

func MetaKey

func MetaKey(instanceHash InstanceHash) []byte

MetaKey returns the BadgerDB key for metadata

func NamespaceKey

func NamespaceKey(prefix string) []byte

NamespaceKey returns the BadgerDB key for a namespace prefix mapping

func NormalizePelicanURL

func NormalizePelicanURL(urlStr string) string

NormalizePelicanURL normalizes a Pelican URL for consistent hashing. Handles pelican://, osdf://, and bare paths.

func OffsetInChunk

func OffsetInChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int64

OffsetInChunk converts an absolute content offset to an offset within a chunk. Returns the offset relative to the start of the chunk.

func ParseLRUKey

func ParseLRUKey(key []byte) (storageID StorageID, namespaceID NamespaceID, timestamp time.Time, instanceHash InstanceHash, err error)

ParseLRUKey parses an LRU key and returns storageID, namespaceID, timestamp, and instanceHash

func ParseUsageKey

func ParseUsageKey(key []byte) (storageID StorageID, namespaceID NamespaceID, err error)

ParseUsageKey extracts the storage ID and namespace ID from a usage key

func PurgeFirstKey

func PurgeFirstKey(instanceHash InstanceHash) []byte

PurgeFirstKey returns the BadgerDB key for purge first tracking

func RemainingFreshness

func RemainingFreshness(lastValidated time.Time) time.Duration

RemainingFreshness returns how much freshness lifetime is left for an object whose origin did not specify Cache-Control, clamped to zero.

func StateKey

func StateKey(instanceHash InstanceHash) []byte

StateKey returns the BadgerDB key for block state bitmap

func UsageKey

func UsageKey(storageID StorageID, namespaceID NamespaceID) []byte

UsageKey returns the BadgerDB key for a namespace's usage counter on a given storage. Format: u:<storage_id>:<namespace_id>

func ValidateChunkLocations

func ValidateChunkLocations(contentLength int64, chunkSizeCode ChunkSizeCode, locations []ChunkLocation) error

ValidateChunkLocations checks that ChunkLocations has the correct number of entries. Returns nil if valid, error otherwise.

Types

type AuthorizationError

type AuthorizationError struct {
	Reason string
}

AuthorizationError wraps authorizationDenied with a human-readable reason. errors.Is(err, authorizationDenied) still works for existing checks.

func (*AuthorizationError) Error

func (e *AuthorizationError) Error() string

func (*AuthorizationError) Is

func (e *AuthorizationError) Is(target error) bool

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch allows batching multiple writes for efficiency

func (*Batch) Cancel

func (b *Batch) Cancel()

Cancel discards the batch

func (*Batch) Delete

func (b *Batch) Delete(key []byte) error

Delete adds a delete operation to the batch

func (*Batch) Flush

func (b *Batch) Flush() error

Flush commits the batch

func (*Batch) Set

func (b *Batch) Set(key, value []byte) error

Set adds a key-value pair to the batch

type BlockEncryptor

type BlockEncryptor struct {
	// contains filtered or unexported fields
}

BlockEncryptor handles encryption/decryption of individual blocks

func NewBlockEncryptor

func NewBlockEncryptor(dek, baseNonce []byte) (*BlockEncryptor, error)

NewBlockEncryptor creates a new block encryptor with the given DEK and base nonce

func (*BlockEncryptor) DecryptBlock

func (be *BlockEncryptor) DecryptBlock(blockNum uint32, encryptedBlock []byte) ([]byte, error)

DecryptBlock decrypts a block and verifies its authentication tag. The input is the ciphertext followed by its auth tag (BlockTotalSize bytes for a full block). It returns the decrypted data (up to BlockDataSize bytes).

func (*BlockEncryptor) DecryptBlockTo

func (be *BlockEncryptor) DecryptBlockTo(dst []byte, blockNum uint32, encryptedBlock []byte) ([]byte, error)

DecryptBlockTo decrypts a block directly into the provided destination buffer. dst must have sufficient capacity (at least len(encryptedBlock) - AuthTagSize). The decrypted data is appended to dst and the resulting slice is returned. This avoids allocation when dst has enough capacity.

func (*BlockEncryptor) EncryptBlock

func (be *BlockEncryptor) EncryptBlock(blockNum uint32, data []byte) ([]byte, error)

EncryptBlock encrypts a block of data and returns the ciphertext followed by its auth tag. The input must be exactly BlockDataSize bytes (or fewer for the last block), and the output is len(data) + AuthTagSize bytes, which is BlockTotalSize for a full block.

func (*BlockEncryptor) EncryptBlockTo

func (be *BlockEncryptor) EncryptBlockTo(dst []byte, blockNum uint32, data []byte) ([]byte, error)

EncryptBlockTo encrypts a block of data directly into the provided destination buffer. dst must have sufficient capacity (at least len(data) + AuthTagSize). The encrypted data is appended to dst and the resulting slice is returned. This avoids allocation when dst has enough capacity.
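The BlockEncryptor contract can be sketched with AES-256-GCM from the Go standard library. The nonce derivation here (XOR the big-endian block number into the last 4 bytes of the base nonce) is an assumption, since the documentation does not say how the base nonce and block number are combined or whether the block number is also bound as additional data. The sketch does show why each output is len(data) + AuthTagSize bytes and how the *To variants can append into a caller's buffer: cipher.AEAD.Seal and Open both append to dst.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	BlockDataSize = 4080
	AuthTagSize   = 16
	NonceSize     = 12
	KeySize       = 32
)

// blockEncryptor is a hypothetical stand-in for BlockEncryptor.
type blockEncryptor struct {
	aead      cipher.AEAD
	baseNonce []byte
}

func newBlockEncryptor(dek, baseNonce []byte) (*blockEncryptor, error) {
	if len(dek) != KeySize || len(baseNonce) != NonceSize {
		return nil, errors.New("bad key or nonce length")
	}
	block, err := aes.NewCipher(dek) // a 32-byte key selects AES-256
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block) // 12-byte nonce, 16-byte tag
	if err != nil {
		return nil, err
	}
	return &blockEncryptor{aead: aead, baseNonce: append([]byte(nil), baseNonce...)}, nil
}

// nonce derives a distinct per-block nonce (assumed scheme, see lead-in).
func (be *blockEncryptor) nonce(blockNum uint32) []byte {
	n := append([]byte(nil), be.baseNonce...)
	var ctr [4]byte
	binary.BigEndian.PutUint32(ctr[:], blockNum)
	for i := 0; i < 4; i++ {
		n[NonceSize-4+i] ^= ctr[i]
	}
	return n
}

// encryptBlockTo appends ciphertext+tag (len(data)+AuthTagSize bytes) to dst.
func (be *blockEncryptor) encryptBlockTo(dst []byte, blockNum uint32, data []byte) ([]byte, error) {
	if len(data) > BlockDataSize {
		return nil, errors.New("block too large")
	}
	return be.aead.Seal(dst, be.nonce(blockNum), data, nil), nil
}

// decryptBlockTo verifies the tag and appends the plaintext to dst.
func (be *blockEncryptor) decryptBlockTo(dst []byte, blockNum uint32, enc []byte) ([]byte, error) {
	return be.aead.Open(dst, be.nonce(blockNum), enc, nil)
}

func main() {
	dek := make([]byte, KeySize)
	base := make([]byte, NonceSize)
	rand.Read(dek)
	rand.Read(base)
	be, err := newBlockEncryptor(dek, base)
	if err != nil {
		panic(err)
	}
	enc, _ := be.encryptBlockTo(nil, 7, make([]byte, BlockDataSize))
	fmt.Println(len(enc)) // 4096, i.e. BlockTotalSize
	_, err = be.decryptBlockTo(nil, 8, enc)
	fmt.Println("wrong block number rejected:", err != nil)
}
```

Deriving a distinct nonce per block matters because GCM must never reuse a nonce under the same key; it also means a block copied to the wrong position fails authentication.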

type BlockFetcherV2

type BlockFetcherV2 struct {
	// contains filtered or unexported fields
}

BlockFetcherV2 handles fetching missing blocks from the origin using the Pelican transfer client. It supports:

  • Per-128 KiB notification channels for partial completion
  • Client registration for data requests
  • Prefetch with semaphore management
  • A configurable timeout that cancels a prefetch when there are no active clients

Each BlockFetcherV2 creates its own TransferClient from the shared TransferEngine. This prevents concurrent doFetch goroutines (e.g. during auto-repair) from stealing results off a shared Results() channel.

func NewBlockFetcherV2

func NewBlockFetcherV2(
	storage *StorageManager,
	instanceHash InstanceHash, originURL, token string,
	fedToken client.TokenProvider,
	te *client.TransferEngine,
	cfg BlockFetcherV2Config,
) (*BlockFetcherV2, error)

NewBlockFetcherV2 creates a new block fetcher using the Pelican transfer client. It creates its own TransferClient from the given TransferEngine to avoid sharing Results() channels with other callers.

func (*BlockFetcherV2) AdoptTransfer

func (bf *BlockFetcherV2) AdoptTransfer(
	ctx context.Context,
	tc *client.TransferClient,
	dw *decisionWriter,
	resultChan <-chan *client.TransferResults,
	egrp *errgroup.Group,
	wg *sync.WaitGroup,
	onExit func(err error),
) *fetchOperation

AdoptTransfer takes ownership of an already-in-flight full-object transfer initiated by performDownload and drives it to completion using the fetcher's existing idle-timeout and chunk-notification machinery.

Instead of creating a new BlockWriter, AdoptTransfer wraps the decisionWriter's existing BlockWriter with the fetcher's blockWriter adapter via dw.HandoffBlockWriter. All subsequent data written by the transfer engine flows through the adapter, gaining chunk notification and ETA tracking. Blocks already written before the handoff are harmlessly skipped (the shared bitmap check in writeCurrentBlock handles this).

Parameters:

  • ctx: context for the transfer (cancelled on idle or cache close)
  • tc: the TransferClient that owns the transfer (closed on exit)
  • dw: the decisionWriter whose BlockWriter will be wrapped
  • resultChan: pre-filtered channel delivering the single matching result
  • egrp: errgroup for goroutine lifecycle management (test cleanup)
  • wg: waitgroup to track goroutine completion (decremented on exit)
  • onExit: called when the adopted transfer exits for any reason (clear downloading flag, close completionDone, etc.)

The method starts a goroutine (managed via egrp) and returns the fetchOperation for chunk notification and ETA queries.

func (*BlockFetcherV2) Close

func (bf *BlockFetcherV2) Close()

Close shuts down the fetcher's dedicated TransferClient. Must be called when the fetcher is no longer needed.

func (*BlockFetcherV2) CreateFetchCallback

func (bf *BlockFetcherV2) CreateFetchCallback() func(ctx context.Context, startBlock, endBlock uint32) error

CreateFetchCallback returns a callback function for the RangeReader

func (*BlockFetcherV2) FetchBlocks

func (bf *BlockFetcherV2) FetchBlocks(ctx context.Context, startBlock, endBlock uint32) error

FetchBlocks fetches the specified range of blocks from the origin. Blocks until all requested blocks are available or an error occurs. Implicitly marks this fetcher as having active client interest.

func (*BlockFetcherV2) FetchBlocksAsync

func (bf *BlockFetcherV2) FetchBlocksAsync(ctx context.Context, startBlock, endBlock uint32) (*fetchOperation, error)

FetchBlocksAsync starts fetching blocks asynchronously and returns a *fetchOperation whose doneCh is closed when the fetch completes, either successfully or with an error. The returned error is non-nil only if the fetch could not be started; after doneCh is closed, check the fetchOperation for the final error. Implicitly marks this fetcher as having active client interest.

func (*BlockFetcherV2) GetChunkChannel

func (bf *BlockFetcherV2) GetChunkChannel(op *fetchOperation, chunkIndex int64) <-chan struct{}

GetChunkChannel returns a channel that will be closed when the specified chunk is complete. This is the safe notification pattern - callers wait for close, no values are sent. Returns nil if the chunk is already complete.

func (*BlockFetcherV2) StartPrefetch

func (bf *BlockFetcherV2) StartPrefetch(ctx context.Context)

StartPrefetch starts prefetching the entire object in the background. The prefetch will be cancelled if there are no active clients for the prefetch timeout period.

func (*BlockFetcherV2) WaitForChunkWithETA

func (bf *BlockFetcherV2) WaitForChunkWithETA(ctx context.Context, op *fetchOperation, chunkIndex int64) (completed bool, err error)

WaitForChunkWithETA waits for a chunk to complete, but gives up if the per-chunk ETA becomes stale. Returns true if the chunk completed, false if the ETA became stale (caller should try direct download).

type BlockFetcherV2Config

type BlockFetcherV2Config struct {
	PrefetchTimeout time.Duration
	// PrefetchSem is a shared semaphore limiting the total number of
	// concurrent prefetches across all fetchers and downloads.  When
	// nil, a per-fetcher semaphore is created with capacity 5.
	PrefetchSem chan struct{}
}

BlockFetcherV2Config holds configuration for the block fetcher

type BlockRange

type BlockRange struct {
	Start uint32
	End   uint32
}

BlockRange represents a contiguous range of blocks

type BlockSummary

type BlockSummary struct {
	TotalBlocks      uint32   `json:"total_blocks"`
	DownloadedBlocks uint32   `json:"downloaded_blocks"`
	MissingBlocks    []uint32 `json:"missing_blocks,omitempty"` // Up to first 100 missing blocks
	IsComplete       bool     `json:"is_complete"`
	PercentComplete  float64  `json:"percent_complete"`
}

BlockSummary provides an overview of which blocks have been downloaded.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is an io.WriteCloser that encrypts and writes blocks directly to disk storage. It buffers incoming data and writes complete blocks as they are filled. Contiguous encrypted blocks are batched (up to writeBatchBlocks) so that multiple blocks can be written to disk in a single WriteAt call, reducing syscall overhead and improving sequential write throughput. This is used for efficient streaming downloads with block-level encryption.

func (*BlockWriter) BytesWritten

func (bw *BlockWriter) BytesWritten() int64

BytesWritten returns the total number of bytes written so far

func (*BlockWriter) Close

func (bw *BlockWriter) Close() error

Close flushes any remaining data and closes the file

func (*BlockWriter) Flush

func (bw *BlockWriter) Flush() error

Flush writes any accumulated batch of encrypted blocks to disk and updates the shared block state so that concurrent readers can see them immediately. This is a no-op when the batch is empty.

Callers that need low-latency streaming (e.g. the blockWriter adapter in AdoptTransfer) should call Flush periodically instead of waiting for the batch to reach writeBatchBlocks.

func (*BlockWriter) Write

func (bw *BlockWriter) Write(p []byte) (n int, err error)

Write implements io.Writer. Data is buffered and written to disk in encrypted blocks. Blocks that already exist in the bitmap are skipped.

type CacheDB

type CacheDB struct {
	// contains filtered or unexported fields
}

CacheDB wraps BadgerDB with cache-specific operations

func NewCacheDB

func NewCacheDB(ctx context.Context, baseDir string) (*CacheDB, error)

NewCacheDB creates and initializes a new cache database.

Requires issuer keys to be initialized via config.GetIssuerPublicJWKS() or InitIssuerKeyForTests() before calling this function.

func OpenCacheDBReadOnly

func OpenCacheDBReadOnly(baseDir string) (*CacheDB, error)

OpenCacheDBReadOnly opens an existing cache database in read-only mode. This is suitable for CLI introspection tools that need to inspect cache contents without modifying anything.

Like NewCacheDB, this requires issuer keys to be available for decrypting the database encryption key.

func (*CacheDB) AddUsage

func (cdb *CacheDB) AddUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error

AddUsage atomically adjusts the namespace-scoped usage counter by delta bytes. Internally this uses BadgerDB's MergeOperator so that the write is append-only (no read-modify-write cycle) and cannot conflict with concurrent transactions.
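A merge-based counter stores deltas and folds them together instead of reading, adding, and rewriting a total. The sketch below shows such a fold function using only the standard library; its shape should match BadgerDB's merge function type, func(existingVal, newVal []byte) []byte. The 8-byte big-endian encoding is an assumption, not the package's documented format.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeDelta stores a signed byte delta as 8 big-endian bytes (assumed encoding).
func encodeDelta(v int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(v))
	return b
}

// decodeDelta treats a missing or malformed value as zero usage.
func decodeDelta(b []byte) int64 {
	if len(b) != 8 {
		return 0
	}
	return int64(binary.BigEndian.Uint64(b))
}

// addMerge folds a new delta into the running total. Writers only append
// deltas, so concurrent transactions never conflict on the counter key.
func addMerge(existingVal, newVal []byte) []byte {
	return encodeDelta(decodeDelta(existingVal) + decodeDelta(newVal))
}

func main() {
	total := encodeDelta(0)
	for _, d := range []int64{4096, 8192, -4096} {
		total = addMerge(total, encodeDelta(d))
	}
	fmt.Println(decodeDelta(total)) // 8192
}
```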

func (*CacheDB) ChargeUsage

func (cdb *CacheDB) ChargeUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error

ChargeUsage is an alias for AddUsage for backward compatibility.

func (*CacheDB) ClearBlocks

func (cdb *CacheDB) ClearBlocks(instanceHash InstanceHash, blocks []uint32) error

ClearBlocks removes the specified blocks from the downloaded bitmap so they will be re-fetched on the next read. This is used during auto-repair when corruption is detected.

func (*CacheDB) Close

func (cdb *CacheDB) Close() error

Close closes the database. All usage MergeOperators are stopped first (blocking until their background goroutines exit) so that accumulated deltas are flushed before the DB is closed.

func (*CacheDB) ComputeActualUsage

func (cdb *CacheDB) ComputeActualUsage() (map[StorageUsageKey]int64, error)

ComputeActualUsage performs a full scan of the metadata table to compute the real byte-level usage per (StorageID, NamespaceID).

Completed objects contribute their full ContentLength. In-progress objects contribute the bytes implied by their block bitmap.

This is an expensive read-only operation. The consistency checker accumulates usage during its metadata scan instead of calling this; it is retained for ad-hoc diagnostics and tests.

func (*CacheDB) ComputeInlineDataSize

func (cdb *CacheDB) ComputeInlineDataSize() (int64, error)

ComputeInlineDataSize scans all inline data entries (d: prefix) and sums the stored value sizes. This gives the actual bytes consumed in BadgerDB for inline object data (excluding metadata overhead).

func (*CacheDB) DeleteBlockState

func (cdb *CacheDB) DeleteBlockState(instanceHash InstanceHash) error

DeleteBlockState removes block state for a file

func (*CacheDB) DeleteLatestETag

func (cdb *CacheDB) DeleteLatestETag(objectHash ObjectHash) error

DeleteLatestETag removes the ETag entry for an object

func (*CacheDB) DeleteMetadata

func (cdb *CacheDB) DeleteMetadata(instanceHash InstanceHash) error

DeleteMetadata removes metadata for a file

func (*CacheDB) DeleteObject

func (cdb *CacheDB) DeleteObject(instanceHash InstanceHash) error

DeleteObject removes all data for a cached object. Uses metadata to compute exact LRU key for efficient deletion. Also cleans up ETag table, purge-first marker, and adjusts usage counters. Usage is decremented via AddUsage after the transaction commits so that the write cannot conflict.

func (*CacheDB) EvictByLRU

func (cdb *CacheDB) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, error)

EvictByLRU evicts objects from a storage+namespace combination, draining purge-first items before walking the regular LRU index, all within a single BadgerDB transaction.

Eviction stops when either maxObjects objects have been removed or maxBytes of content has been freed, whichever comes first. A value of 0 for either limit means "no limit on that dimension". The method may exceed the byte threshold by one object so that progress is always made even when only large objects remain.
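The stopping rule can be sketched over a list of candidate sizes already in eviction order (purge-first entries, then oldest LRU entries). This is a hypothetical helper, not the transaction logic itself. Checking the byte limit before each eviction is what allows the final object to overshoot maxBytes.

```go
package main

import "fmt"

// selectForEviction returns how many candidates to evict and the bytes freed.
// A limit of 0 disables that dimension. The byte check runs before each
// eviction, so the last evicted object may push freed past maxBytes.
func selectForEviction(candidates []int64, maxObjects int, maxBytes int64) (n int, freed int64) {
	for _, size := range candidates {
		if maxObjects > 0 && n >= maxObjects {
			break
		}
		if maxBytes > 0 && freed >= maxBytes {
			break
		}
		n++
		freed += size
	}
	return n, freed
}

func main() {
	sizes := []int64{100, 200, 300}
	fmt.Println(selectForEviction(sizes, 0, 250)) // 2 300
	fmt.Println(selectForEviction(sizes, 1, 0))   // 1 100
	fmt.Println(selectForEviction(sizes, 0, 0))   // 3 600
}
```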

func (*CacheDB) FindRecyclableStorageID

func (cdb *CacheDB) FindRecyclableStorageID(mountedDirs map[StorageID]string) (StorageID, error)

FindRecyclableStorageID searches persisted disk mappings for the unmounted storageID with the least usage. mountedDirs maps storageID to directory path for IDs currently assigned to live directories — those are excluded. Returns the storageID and nil on success, or an error if no recyclable ID exists.

func (*CacheDB) GetAllUsage

func (cdb *CacheDB) GetAllUsage() (map[StorageUsageKey]int64, error)

GetAllUsage returns usage for all storage+namespace combinations

func (*CacheDB) GetBlockState

func (cdb *CacheDB) GetBlockState(instanceHash InstanceHash) (*roaring.Bitmap, error)

GetBlockState retrieves the bitmap of downloaded blocks for a file.

If the block-state key is absent but the object's metadata indicates a completed download, a fully-populated bitmap is returned. This allows callers to treat completed objects uniformly without requiring a separate completion check. The block-state key is removed on completion to save database space (see BlockWriter.Close).

func (*CacheDB) GetDirUsage

func (cdb *CacheDB) GetDirUsage(storageID StorageID) (map[NamespaceID]int64, error)

GetDirUsage returns usage for all namespaces within a single storage directory.

func (*CacheDB) GetDownloadedBlockCount

func (cdb *CacheDB) GetDownloadedBlockCount(instanceHash InstanceHash) (uint64, error)

GetDownloadedBlockCount returns the number of downloaded blocks

func (*CacheDB) GetEncryptionManager

func (cdb *CacheDB) GetEncryptionManager() *EncryptionManager

GetEncryptionManager returns the encryption manager

func (*CacheDB) GetInlineData

func (cdb *CacheDB) GetInlineData(instanceHash InstanceHash) ([]byte, error)

GetInlineData retrieves encrypted inline data for a small file

func (*CacheDB) GetLatestETag

func (cdb *CacheDB) GetLatestETag(objectHash ObjectHash) (string, bool, error)

GetLatestETag retrieves the latest ETag for an object. Returns (etag, found, err). An object cached without an ETag will return ("", true, nil); an object not in the cache returns ("", false, nil).

func (*CacheDB) GetMetadata

func (cdb *CacheDB) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)

GetMetadata retrieves cache metadata for a file

func (*CacheDB) GetUsage

func (cdb *CacheDB) GetUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)

GetUsage retrieves the total bytes used by a storage+namespace combination. If a MergeOperator is active for the key, its Get() method is used to replay all accumulated deltas; otherwise a raw read is performed.

func (*CacheDB) HasMetadata

func (cdb *CacheDB) HasMetadata(instanceHash InstanceHash) (bool, error)

HasMetadata checks if metadata exists for a file

func (*CacheDB) InstanceHash

func (cdb *CacheDB) InstanceHash(etag string, objectHash ObjectHash) InstanceHash

InstanceHash computes the salted SHA-256 hash for (etag, objectHash).

func (*CacheDB) IsBlockDownloaded

func (cdb *CacheDB) IsBlockDownloaded(instanceHash InstanceHash, blockNum uint32) (bool, error)

IsBlockDownloaded checks if a specific block has been downloaded

func (*CacheDB) IsPurgeFirst

func (cdb *CacheDB) IsPurgeFirst(instanceHash InstanceHash) (bool, error)

IsPurgeFirst checks if a file hash is marked for priority eviction

func (*CacheDB) LoadDiskMappings

func (cdb *CacheDB) LoadDiskMappings() ([]DiskMapping, error)

LoadDiskMappings loads all persisted disk mappings.

func (*CacheDB) LoadNamespaceMappings

func (cdb *CacheDB) LoadNamespaceMappings() (map[string]NamespaceID, NamespaceID, error)

LoadNamespaceMappings loads all persisted namespace mappings and returns them as a map[prefix]->id, along with the highest ID seen (so the caller can resume the counter).

func (*CacheDB) MarkBlocksDownloaded

func (cdb *CacheDB) MarkBlocksDownloaded(instanceHash InstanceHash, startBlock, endBlock uint32, storageID StorageID, namespaceID NamespaceID, contentLength int64) error

MarkBlocksDownloaded marks specific blocks as downloaded and atomically updates usage statistics based on the number of newly-added blocks. Usage tracking requires metadata to be set for the instanceHash; if metadata is not yet available, the bitmap is still updated but usage tracking is skipped.

func (*CacheDB) MarkPurgeFirst

func (cdb *CacheDB) MarkPurgeFirst(instanceHash InstanceHash) error

MarkPurgeFirst marks a file hash for priority eviction

func (*CacheDB) MergeBlockStateWithUsage

func (cdb *CacheDB) MergeBlockStateWithUsage(instanceHash InstanceHash, newBlocks *roaring.Bitmap, storageID StorageID, namespaceID NamespaceID, contentLength int64) error

MergeBlockStateWithUsage atomically merges new blocks into the existing bitmap AND updates usage statistics based on the number of newly-enabled bits.

contentLength controls how the usage delta is calculated:

  • If >= 0, the supplied contentLength, storageID, and namespaceID are used directly, avoiding a metadata DB read.
  • If < 0 (typically -1), the metadata is read within the transaction to obtain the content length, storage ID, and namespace ID.

The method retries on BadgerDB transaction conflicts, which can occur when multiple concurrent block fetchers write to the same object's bitmap.
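
A minimal sketch of the usage-delta idea. It assumes that usage is charged in content bytes for each newly set block and that blocks have a fixed size. The real block size lives in storage.go and is not shown here, so it is passed in as a parameter. A plain map stands in for the roaring bitmap, and the BadgerDB transaction and its conflict-retry loop are omitted. `blockBytes` and `mergeBlocks` are hypothetical helpers.

```go
package main

import "fmt"

// blockBytes returns how many content bytes block n covers. The fixed
// blockSize is a parameter here; the last block of an object may be partial.
func blockBytes(n uint32, blockSize, contentLength int64) int64 {
	start := int64(n) * blockSize
	if start >= contentLength {
		return 0 // block lies entirely past the end of the object
	}
	if rem := contentLength - start; rem < blockSize {
		return rem
	}
	return blockSize
}

// mergeBlocks ORs newBlocks into existing (a map standing in for the roaring
// bitmap) and returns the usage delta, counting only bits that were not set.
func mergeBlocks(existing map[uint32]bool, newBlocks []uint32, blockSize, contentLength int64) int64 {
	var delta int64
	for _, b := range newBlocks {
		if existing[b] {
			continue // already counted; re-fetching a block must not double-charge
		}
		existing[b] = true
		delta += blockBytes(b, blockSize, contentLength)
	}
	return delta
}

func main() {
	state := map[uint32]bool{0: true} // block 0 was fetched earlier
	// 10,000-byte object with 4,096-byte blocks: block 2 holds the last 1,808 bytes.
	delta := mergeBlocks(state, []uint32{0, 1, 2}, 4096, 10000)
	fmt.Println(delta) // 5904 (4096 + 1808)
}
```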

func (*CacheDB) MergeMetadata

func (cdb *CacheDB) MergeMetadata(instanceHash InstanceHash, incoming *CacheMetadata) error

MergeMetadata performs an atomic read-modify-update of the metadata for instanceHash. If no metadata exists yet, incoming is written as-is (initial creation).

Field-level merge rules:

  • Max-time (LastModified, LastValidated, LastAccessTime, Expires, Completed): keep the later of existing vs incoming.
  • Additive (Checksums): union by ChecksumType; if both sides provide the same Type, prefer the OriginVerified entry.
  • Last-writer-wins (ContentType, ContentLength, VaryHeaders, CCFlags, CCMaxAge): incoming replaces existing when the incoming value is non-zero / non-empty.
  • Set-once (ETag, SourceURL, DataKey, StorageID, NamespaceID): may transition from zero-value to a value, but changing a non-zero value to a different non-zero value returns an error. ETag is set-once because it is part of the instance hash; a changed ETag produces a different instance.

The method retries on BadgerDB transaction conflicts, which can occur when multiple concurrent callers merge metadata for the same instance (e.g. concurrent range-on-miss initialization via initObjectFromStat).

func (*CacheDB) NewBatch

func (cdb *CacheDB) NewBatch() *Batch

NewBatch creates a new write batch

func (*CacheDB) ObjectHash

func (cdb *CacheDB) ObjectHash(pelicanURL string) ObjectHash

ObjectHash computes the salted SHA-256 hash for a pelican URL.
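
A sketch of salted hashing in the spirit of ObjectHash and InstanceHash. The exact byte layout the package hashes is not documented here. This version assumes the salt is written first and that each input is length-prefixed, so different (etag, objectHash) splits cannot produce the same hash input. `saltedHash`, `objectHash`, and `instanceHash` are hypothetical helpers that return hex strings.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// saltedHash hashes salt followed by each part, length-prefixing every
// part so that ("ab", "c") and ("a", "bc") cannot collide.
func saltedHash(salt []byte, parts ...string) string {
	h := sha256.New()
	h.Write(salt)
	var lenBuf [8]byte
	for _, p := range parts {
		binary.BigEndian.PutUint64(lenBuf[:], uint64(len(p)))
		h.Write(lenBuf[:])
		h.Write([]byte(p))
	}
	return hex.EncodeToString(h.Sum(nil))
}

// objectHash identifies an object by URL, independent of its version.
func objectHash(salt []byte, pelicanURL string) string { return saltedHash(salt, pelicanURL) }

// instanceHash identifies one version (ETag) of an object.
func instanceHash(salt []byte, etag, objHash string) string {
	return saltedHash(salt, etag, objHash)
}

func main() {
	salt := []byte("per-database-random-salt") // the real salt is random per database
	obj := objectHash(salt, "pelican://example.org/ns/file.dat")
	fmt.Println(len(obj))                                                          // 64
	fmt.Println(instanceHash(salt, `"v1"`, obj) != instanceHash(salt, `"v2"`, obj)) // true
}
```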

func (*CacheDB) PurgeStorageID

func (cdb *CacheDB) PurgeStorageID(storageID StorageID) error

PurgeStorageID removes all database entries associated with a storageID: object metadata, block state, inline data, LRU entries, purge-first markers, ETag entries, usage counters, and the disk mapping itself.

This is used during storage ID recycling to reclaim an ID that was previously assigned to a directory that is no longer mounted.

Objects are deleted in batches to avoid exceeding BadgerDB's transaction size limit.

func (*CacheDB) Salt

func (cdb *CacheDB) Salt() []byte

Salt returns the per-database random salt used for hashing.

func (*CacheDB) SaveDiskMapping

func (cdb *CacheDB) SaveDiskMapping(dm DiskMapping) error

SaveDiskMapping persists a single storageID → (UUID, directory) mapping.

func (*CacheDB) ScanMetadata

func (cdb *CacheDB) ScanMetadata(fn func(instanceHash InstanceHash, meta *CacheMetadata) error) error

ScanMetadata iterates over all metadata entries

func (*CacheDB) ScanMetadataFrom

func (cdb *CacheDB) ScanMetadataFrom(startKey InstanceHash, fn func(instanceHash InstanceHash, meta *CacheMetadata) error) error

ScanMetadataFrom scans metadata entries starting at startKey; an empty startKey starts from the beginning.

func (*CacheDB) SetBlockState

func (cdb *CacheDB) SetBlockState(instanceHash InstanceHash, bitmap *roaring.Bitmap) error

SetBlockState stores the bitmap of downloaded blocks

func (*CacheDB) SetInlineData

func (cdb *CacheDB) SetInlineData(instanceHash InstanceHash, encryptedData []byte) error

SetInlineData stores encrypted inline data for a small file. The caller (StoreInline) is responsible for usage accounting via ChargeUsage; this function does NOT adjust usage counters.

func (*CacheDB) SetLatestETag

func (cdb *CacheDB) SetLatestETag(objectHash ObjectHash, etag string, observedAt time.Time) error

SetLatestETag stores the latest ETag for an object, but only if observedAt is more recent than the already-stored timestamp. This prevents a slow download that finishes late from clobbering a newer ETag written by a more recent request.

func (*CacheDB) SetMetadata

func (cdb *CacheDB) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error

SetMetadata stores cache metadata for a file, unconditionally replacing any previously stored metadata. Use this only for initial creation of a metadata entry (e.g. InitDiskStorage, StoreInline); for subsequent updates prefer MergeMetadata which applies field-level merge semantics.

func (*CacheDB) SetNamespaceMapping

func (cdb *CacheDB) SetNamespaceMapping(prefix string, id NamespaceID) error

SetNamespaceMapping persists the mapping from a namespace prefix string to a numeric ID. This ensures the IDs survive restarts so that LRU keys and usage counters remain valid.

func (*CacheDB) SetUsage

func (cdb *CacheDB) SetUsage(storageID StorageID, namespaceID NamespaceID, value int64) error

SetUsage sets the absolute usage counter for a (storageID, namespaceID) pair. Any active MergeOperator for the key is stopped first (flushing pending deltas) and then the value is overwritten.

The entry is written with WithDiscard() so that BadgerDB marks all earlier versions of the key as eligible for garbage collection. Without this flag, the MergeOperator's iterateAndMerge would still see the old compacted entry (which carries bitDiscardEarlierVersions) and sum it into the total, causing the counter to include both the new baseline and the old accumulated value — a compounding overcount that grows with every reconciliation cycle.

The next AddUsage call will lazily create a fresh MergeOperator.

func (*CacheDB) StartGC

func (cdb *CacheDB) StartGC(ctx context.Context, egrp *errgroup.Group)

StartGC starts the background garbage collection goroutine

func (*CacheDB) UnmarkPurgeFirst

func (cdb *CacheDB) UnmarkPurgeFirst(instanceHash InstanceHash) error

UnmarkPurgeFirst removes the purge first marker for a file hash

func (*CacheDB) UpdateLRU

func (cdb *CacheDB) UpdateLRU(instanceHash InstanceHash, debounceTime time.Duration) error

UpdateLRU updates the LRU access time for a file. Updates are debounced: the access time is written only if the last recorded access was more than debounceTime ago. Because the last access time is stored in the metadata, this check requires no iteration.

type CacheDirectives

type CacheDirectives struct {
	// contains filtered or unexported fields
}

CacheDirectives holds parsed Cache-Control header values that are relevant to the persistent cache.

Boolean directives are packed into a bitfield; use the accessor methods (NoStore, NoCache, etc.) instead of reading fields directly.

max-age and s-maxage are merged into a single freshness lifetime: when both are present the maximum of the two is kept, because the Pelican local cache does not distinguish shared from private.

func ParseCacheControl

func ParseCacheControl(header string) CacheDirectives

ParseCacheControl parses a Cache-Control header value into structured directives. The parsing is case-insensitive for directive names, as required by RFC 7234. When both max-age and s-maxage are present, the maximum of the two durations is kept (the Pelican local cache does not distinguish shared from private).

func (*CacheDirectives) Flags

func (cd *CacheDirectives) Flags() uint8

Flags returns the raw packed bitfield. Callers should prefer the named accessors; this is provided for efficient serialization into CacheMetadata.CCFlags.

func (*CacheDirectives) Freshness

func (cd *CacheDirectives) Freshness() (time.Duration, bool)

Freshness returns the merged freshness lifetime and whether it was set.

func (*CacheDirectives) HasDirectives

func (cd *CacheDirectives) HasDirectives() bool

HasDirectives returns true if any Cache-Control directive was parsed.

func (*CacheDirectives) HasFreshness

func (cd *CacheDirectives) HasFreshness() bool

HasFreshness returns true if the origin provided explicit freshness information (max-age or s-maxage).

func (*CacheDirectives) IsStale

func (cd *CacheDirectives) IsStale(lastValidated time.Time) bool

IsStale checks whether a cached response is stale, given when it was last validated (or originally stored). If no freshness information is available (no max-age, s-maxage, or no-cache directive), it applies the default cache policy from params, with jitter.

func (*CacheDirectives) IsStaleWithDefaults

func (cd *CacheDirectives) IsStaleWithDefaults(lastValidated time.Time) bool

IsStaleWithDefaults checks staleness using configured default values and jitter. This is used when the origin doesn't provide explicit Cache-Control headers.

func (*CacheDirectives) MaxAge

func (cd *CacheDirectives) MaxAge() time.Duration

func (*CacheDirectives) MaxAgeSet

func (cd *CacheDirectives) MaxAgeSet() bool

func (*CacheDirectives) MustRevalidate

func (cd *CacheDirectives) MustRevalidate() bool

func (*CacheDirectives) NoCache

func (cd *CacheDirectives) NoCache() bool

func (*CacheDirectives) NoStore

func (cd *CacheDirectives) NoStore() bool

func (*CacheDirectives) Private

func (cd *CacheDirectives) Private() bool

func (*CacheDirectives) ShouldStore

func (cd *CacheDirectives) ShouldStore() bool

ShouldStore returns true if the response is allowed to be stored in the cache.

type CacheMetadata

type CacheMetadata struct {
	// Validation fields
	ETag          string     `msgpack:"etag"`         // HTTP ETag header
	LastModified  time.Time  `msgpack:"lm"`           // HTTP Last-Modified header
	Expires       time.Time  `msgpack:"exp"`          // HTTP Expires header
	LastValidated time.Time  `msgpack:"lv"`           // When we last validated with origin
	Completed     time.Time  `msgpack:"c"`            // When download was completed
	Checksums     []Checksum `msgpack:"ck,omitempty"` // Object checksums

	// Identification fields
	ContentType   string   `msgpack:"ct"`            // MIME type
	ContentLength int64    `msgpack:"cl"`            // Total object size in bytes
	VaryHeaders   []string `msgpack:"vh,omitempty"`  // Headers that affect caching
	SourceURL     string   `msgpack:"url,omitempty"` // Original URL including federation

	// Cache-Control directives (efficient packed representation)
	CCFlags  uint8 `msgpack:"ccf,omitempty"`  // Bitset: 0x01=no-store, 0x02=no-cache, 0x04=private, 0x08=must-revalidate
	CCMaxAge int32 `msgpack:"ccma,omitempty"` // Merged max-age/s-maxage freshness lifetime (seconds, 0 = not set, max of both if both specified)

	// Storage fields.
	// StorageID encodes both location type and directory identity:
	//   0         = inline (data stored directly in BadgerDB)
	//   1 .. 255  = disk-backed (directory identified by this ID)
	// For chunked objects, this is the location of chunk 0.
	StorageID StorageID `msgpack:"sid"`
	DataKey   []byte    `msgpack:"key"` // Encrypted DEK (Data Encryption Key)

	// Chunking fields for large objects spread across multiple storage directories.
	// ChunkSizeCode encodes the chunk size (0 = chunking disabled, see chunking.go).
	// ChunkLocations stores the StorageID for chunks 1, 2, ... (chunk 0 uses StorageID above).
	ChunkSizeCode  ChunkSizeCode   `msgpack:"csc,omitempty"` // 0 = disabled, see ChunkSizeCodeToBytes()
	ChunkLocations []ChunkLocation `msgpack:"chl,omitempty"` // Locations for chunks after chunk 0

	// Namespace and storage tracking for fairness-aware eviction
	NamespaceID NamespaceID `msgpack:"ns"` // ID of the namespace prefix

	// LRU tracking
	LastAccessTime time.Time `msgpack:"la"` // Last access time for LRU index
}

CacheMetadata stores all metadata about a cached object. It is serialized using MessagePack for efficiency.

Merge semantics (see CacheDB.MergeMetadata)

Fields are classified into groups that govern how concurrent updates are reconciled:

  • Max-time: LastModified, LastValidated, LastAccessTime, Expires, Completed — only advance forward (keep the later timestamp).
  • Additive: Checksums — union by algorithm; prefer OriginVerified.
  • Last-writer-wins: ContentType, ContentLength, VaryHeaders, CCFlags, CCMaxAge; the incoming value replaces the existing one when it is non-zero / non-empty.
  • Set-once: ETag, SourceURL, DataKey, StorageID, NamespaceID, ChunkSizeCode, ChunkLocations — may transition from zero-value to set, but changing a non-zero value to a different non-zero value is an error. ETag is set-once because it is part of the instance hash; a changed ETag produces a different instance. Chunking fields are set-once because changing them would invalidate existing chunk files.

func (*CacheMetadata) AllStorageIDs

func (m *CacheMetadata) AllStorageIDs() []StorageID

AllStorageIDs returns a deduplicated list of all StorageIDs used by this object. For non-chunked objects, returns just the base StorageID.

func (*CacheMetadata) ChunkCount

func (m *CacheMetadata) ChunkCount() int

ChunkCount returns the number of chunks for this object. Returns 1 for non-chunked objects.

func (*CacheMetadata) ComputeExpires

func (m *CacheMetadata) ComputeExpires() time.Time

ComputeExpires returns the absolute time at which this object expires. It uses the origin-supplied max-age when available, otherwise the configured default freshness policy. The result is based on LastValidated (or Completed if LastValidated is zero).

func (*CacheMetadata) EnsureExpires

func (m *CacheMetadata) EnsureExpires()

EnsureExpires sets the Expires field if it is currently zero. Call this after SetCacheControl and after LastValidated is set.

func (*CacheMetadata) GetCacheControlHeader

func (m *CacheMetadata) GetCacheControlHeader() string

GetCacheControlHeader reconstructs the Cache-Control header string for HTTP responses. It returns the origin's directives verbatim when present; when the origin did not specify any Cache-Control, it returns "" (callers should use ResponseCacheControl instead to get a header that reflects the default policy).

func (*CacheMetadata) GetCacheDirectives

func (m *CacheMetadata) GetCacheDirectives() CacheDirectives

GetCacheDirectives returns the parsed cache directives

func (*CacheMetadata) GetChunkInfo

func (m *CacheMetadata) GetChunkInfo() []ChunkInfo

GetChunkInfo returns information about all chunks for this object.

func (*CacheMetadata) GetChunkStorageID

func (m *CacheMetadata) GetChunkStorageID(chunkIndex int) StorageID

GetChunkStorageID returns the StorageID for a specific chunk (0-indexed). Chunk 0 uses the base StorageID, chunks 1+ use ChunkLocations. Returns StorageIDInline (0) for unallocated chunks (lazy allocation).

func (*CacheMetadata) IsChunkAllocated

func (m *CacheMetadata) IsChunkAllocated(chunkIndex int) bool

IsChunkAllocated returns true if the specified chunk has a storage ID assigned. For chunked objects, StorageIDInline (0) means the chunk is not yet allocated. For non-chunked objects, always returns true (single storage).

func (*CacheMetadata) IsChunked

func (m *CacheMetadata) IsChunked() bool

IsChunked returns true when the object is stored across multiple chunk files.

func (*CacheMetadata) IsDisk

func (m *CacheMetadata) IsDisk() bool

IsDisk returns true when the object data is (or will be) stored on disk. Chunked objects are always disk-based, even when chunk 0 has not yet been allocated.

func (*CacheMetadata) IsInline

func (m *CacheMetadata) IsInline() bool

IsInline returns true when the object data is stored directly in BadgerDB. A chunked object is never inline, even if its base StorageID is still 0 (unallocated).

func (*CacheMetadata) PerDirectoryBytes

func (m *CacheMetadata) PerDirectoryBytes() map[StorageID]int64

PerDirectoryBytes returns a map from StorageID to the number of content bytes that live in each storage directory. For non-chunked objects the entire ContentLength is attributed to the base StorageID. For chunked objects the byte count is split according to each chunk's assigned directory. Unallocated chunks (StorageID 0) are skipped.

func (*CacheMetadata) ResponseCacheControl

func (m *CacheMetadata) ResponseCacheControl() string

ResponseCacheControl returns the Cache-Control header value the cache should send to downstream clients. When the origin specified directives, those are forwarded. When it did not, the cache advertises the remaining freshness lifetime (derived from LocalCache_DefaultMaxAge + jitter) as max-age so that downstream clients can cache the response without re-contacting the cache until revalidation is due.

func (*CacheMetadata) SetCacheControl

func (m *CacheMetadata) SetCacheControl(header string)

SetCacheControl parses a Cache-Control header and stores the directives in the packed CCFlags and CCMaxAge fields.

func (*CacheMetadata) SetChunkStorageID

func (m *CacheMetadata) SetChunkStorageID(chunkIndex int, storageID StorageID)

SetChunkStorageID sets the StorageID for a specific chunk (0-indexed). For chunk 0, sets the base StorageID. For chunks 1+, sets ChunkLocations. The ChunkLocations slice must be pre-allocated to the correct size.

type CacheMode

type CacheMode int

CacheMode distinguishes whether the persistent cache is running as a full cache server (CacheType) or as a client-side local cache (LocalCacheType). The mode determines which configuration namespace is consulted for defaults (Cache.* vs LocalCache.*).

const (
	// CacheModeLocal is the default: use LocalCache.* params.
	CacheModeLocal CacheMode = iota
	// CacheModeServer uses Cache.* params (cache server).
	CacheModeServer
)

type CacheStats

type CacheStats struct {
	TotalInlineBytes     int64                       `json:"total_inline_bytes"`
	TotalMetadataEntries int64                       `json:"total_metadata_entries"`
	TotalBytesMetadata   int64                       `json:"total_bytes_metadata"`     // Sum of ContentLength from metadata entries
	UsageCounters        map[string]int64            `json:"usage_counters,omitempty"` // Pre-computed usage from u: prefix keys
	StorageBreakdown     map[string]*StorageDirStats `json:"storage_breakdown,omitempty"`
	DirPaths             map[uint8]string            `json:"dir_paths,omitempty"`       // StorageID → directory path
	NamespaceNames       map[uint32]string           `json:"namespace_names,omitempty"` // NamespaceID → prefix string
}

CacheStats contains aggregate size statistics about the cache.

type Checksum

type Checksum struct {
	Type            ChecksumType `msgpack:"t"`
	Value           []byte       `msgpack:"v"`
	OriginVerified  bool         `msgpack:"ov"`           // True if checksum came from origin
	VerifyAttempted bool         `msgpack:"va,omitempty"` // True if we tried to get origin checksum
}

Checksum holds a checksum type and its value

func ParseChecksumHeader

func ParseChecksumHeader(headerValue string, headerType string) *Checksum

ParseChecksumHeader parses a checksum from HTTP headers

type ChecksumInfo

type ChecksumInfo struct {
	Type           string `json:"type"`
	Value          string `json:"value"` // Hex-encoded
	OriginVerified bool   `json:"origin_verified"`
}

ChecksumInfo describes a stored checksum.

type ChecksumStatus

type ChecksumStatus struct {
	Type     string `json:"type"`
	Expected string `json:"expected"` // Hex-encoded
	Computed string `json:"computed"` // Hex-encoded (only if verification ran)
	Match    bool   `json:"match"`
}

ChecksumStatus describes the verification status of a single checksum.

type ChecksumType

type ChecksumType uint8

ChecksumType identifies the type of checksum

const (
	ChecksumMD5    ChecksumType = 0
	ChecksumSHA1   ChecksumType = 1
	ChecksumSHA256 ChecksumType = 2
	ChecksumCRC32  ChecksumType = 3
	ChecksumCRC32C ChecksumType = 4
)

type ChunkInfo

type ChunkInfo struct {
	Index       int       // 0-based chunk index
	StorageID   StorageID // Storage directory for this chunk
	StartOffset int64     // Content byte offset where this chunk starts
	EndOffset   int64     // Content byte offset where this chunk ends (inclusive)
	Size        int64     // Size of this chunk in bytes
}

ChunkInfo holds computed information about a chunk.

func GetChunkInfo

func GetChunkInfo(contentLength int64, chunkSizeCode ChunkSizeCode, chunkLocations []ChunkLocation, baseStorageID StorageID) []ChunkInfo

GetChunkInfo returns information about all chunks for an object. chunkLocations should be the ChunkLocations from CacheMetadata (may be nil/empty for non-chunked objects). baseStorageID is the StorageID from CacheMetadata (where chunk 0 is stored).
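
How the chunk layout might be derived, as a minimal sketch. It takes the chunk size in bytes rather than a ChunkSizeCode. `storageID`, `chunk`, and `splitChunks` are local stand-ins for the package's types. Chunk 0 lives at the base storage ID, and chunk i (i >= 1) lives at locations[i-1]. A chunk with no recorded location reports 0, mirroring the lazy-allocation convention of GetChunkStorageID.

```go
package main

import "fmt"

type storageID uint8 // stand-in; 0 means "not yet allocated" for chunks >= 1

type chunk struct {
	Index       int
	StorageID   storageID
	StartOffset int64
	EndOffset   int64 // inclusive
	Size        int64
}

// splitChunks lays contentLength out in chunkSize pieces. chunkSize <= 0 means
// chunking is disabled, so a single chunk covers the whole object.
// An empty object yields no chunks in this sketch.
func splitChunks(contentLength, chunkSize int64, locations []storageID, base storageID) []chunk {
	if chunkSize <= 0 {
		chunkSize = contentLength
	}
	var out []chunk
	for i, start := 0, int64(0); start < contentLength; i, start = i+1, start+chunkSize {
		size := chunkSize
		if rem := contentLength - start; rem < size {
			size = rem // the final chunk may be short
		}
		id := base // chunk 0 always uses the base StorageID
		if i > 0 {
			id = 0 // unallocated unless a location has been recorded
			if i-1 < len(locations) {
				id = locations[i-1]
			}
		}
		out = append(out, chunk{Index: i, StorageID: id, StartOffset: start, EndOffset: start + size - 1, Size: size})
	}
	return out
}

func main() {
	for _, c := range splitChunks(250, 100, []storageID{2}, 1) {
		fmt.Printf("%+v\n", c)
	}
}
```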

type ChunkInfoSummary

type ChunkInfoSummary struct {
	ChunkSizeBytes int64   `json:"chunk_size_bytes"`
	ChunkCount     int     `json:"chunk_count"`
	ChunkLocations []uint8 `json:"chunk_locations"` // StorageID per chunk
}

ChunkInfoSummary describes chunking configuration for large objects.

type ChunkLocation

type ChunkLocation struct {
	StorageID StorageID `msgpack:"s"` // Which storage directory holds this chunk
}

ChunkLocation stores the storage location of a single chunk. Chunk 0 is always stored at the StorageID in CacheMetadata. Additional chunks (1, 2, ...) are stored in the ChunkLocations slice.

type ChunkNotification

type ChunkNotification struct {
	ChunkIndex int64     // Which chunk this notification is for
	Completed  bool      // True if this chunk is complete
	Error      error     // Non-nil if an error occurred
	ETA        time.Time // Estimated time of completion (updated atomically)
}

ChunkNotification contains information about a chunk's completion status

type ChunkSizeCode

type ChunkSizeCode uint8

ChunkSizeCode is a compact uint8 encoding of chunk sizes for storage efficiency. The encoding is non-linear to cover a wide range from 2MB to ~57GB:

  • 0: Chunking disabled (object stored in a single file)
  • 1-6: Doubling: 2^n MB (2, 4, 8, 16, 32, 64 MB)
  • 7-21: 64 MB increments starting at 128 MB (128, 192, ..., 1024 MB)
  • 22-53: 128 MB increments starting at 1152 MB (1152, 1280, ..., 5120 MB)
  • 54-255: 256 MB increments starting at 5376 MB (5376, 5632, ..., ~57 GB)

const (
	// ChunkingDisabled indicates the object is stored in a single file
	ChunkingDisabled ChunkSizeCode = 0
)

func BytesToChunkSizeCode

func BytesToChunkSizeCode(size uint64) ChunkSizeCode

BytesToChunkSizeCode converts a byte size to the nearest ChunkSizeCode that is >= the requested size (rounds up to ensure chunks can hold the data). Returns ChunkingDisabled (0) if size is 0.
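
The encoding table above, decoded and encoded as a sketch. It assumes "MB" means 2^20 bytes and reports requests larger than the biggest chunk through a boolean. The real BytesToChunkSizeCode's behavior for oversized inputs is not documented here. `codeToBytes` and `bytesToCode` are hypothetical helpers.

```go
package main

import "fmt"

const mb = 1 << 20 // assumption: "MB" in the encoding means 2^20 bytes

// codeToBytes decodes a chunk-size code following the documented ranges.
func codeToBytes(c uint8) uint64 {
	switch {
	case c == 0:
		return 0 // chunking disabled
	case c <= 6:
		return (uint64(1) << c) * mb // 2, 4, ..., 64 MB
	case c <= 21:
		return (128 + uint64(c-7)*64) * mb // 128 .. 1024 MB
	case c <= 53:
		return (1152 + uint64(c-22)*128) * mb // 1152 .. 5120 MB
	default:
		return (5376 + uint64(c-54)*256) * mb // 5376 .. 56832 MB
	}
}

// bytesToCode returns the smallest code whose chunk size is >= size (rounding
// up), and false if size exceeds the largest encodable chunk.
func bytesToCode(size uint64) (uint8, bool) {
	if size == 0 {
		return 0, true
	}
	for c := 1; c <= 255; c++ {
		if codeToBytes(uint8(c)) >= size {
			return uint8(c), true
		}
	}
	return 255, false
}

func main() {
	c, _ := bytesToCode(100 * mb)
	fmt.Println(c, codeToBytes(c)/mb) // 7 128
}
```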

func ParseChunkSize

func ParseChunkSize(s string) (ChunkSizeCode, error)

ParseChunkSize parses a human-readable chunk size string (e.g., "64MB", "2GB") and returns the corresponding ChunkSizeCode.

type ConsistencyCheckResult

type ConsistencyCheckResult struct {
	MetadataScanRan    bool   `json:"metadata_scan_ran"`
	DataScanRan        bool   `json:"data_scan_ran"`
	OrphanedFiles      int64  `json:"orphaned_files"`
	OrphanedDBEntries  int64  `json:"orphaned_db_entries"`
	ChecksumMismatches int64  `json:"checksum_mismatches"`
	BytesVerified      int64  `json:"bytes_verified"`
	ObjectsVerified    int64  `json:"objects_verified"`
	MetadataScanErrors int64  `json:"metadata_scan_errors"`
	DataScanErrors     int64  `json:"data_scan_errors"`
	Duration           string `json:"duration"`
	Error              string `json:"error,omitempty"`
}

ConsistencyCheckResult contains the result of a consistency check run.

type ConsistencyChecker

type ConsistencyChecker struct {
	// contains filtered or unexported fields
}

ConsistencyChecker verifies cache consistency between database and disk. For multi-directory configurations, it scans each storage directory independently.

func NewConsistencyChecker

func NewConsistencyChecker(db *CacheDB, storage *StorageManager, config ConsistencyConfig) *ConsistencyChecker

NewConsistencyChecker creates a new consistency checker.

func (*ConsistencyChecker) GetStats

func (cc *ConsistencyChecker) GetStats() ConsistencyStats

GetStats returns current statistics

func (*ConsistencyChecker) RunDataScan

func (cc *ConsistencyChecker) RunDataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error

RunDataScan performs a full data integrity scan. It verifies the checksums of stored objects by reading them block by block.

func (*ConsistencyChecker) RunMetadataScan

func (cc *ConsistencyChecker) RunMetadataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error

RunMetadataScan performs a metadata consistency scan. It verifies that database entries match files on disk and vice versa.

func (*ConsistencyChecker) Start

func (cc *ConsistencyChecker) Start(ctx context.Context, egrp *errgroup.Group)

Start begins the background consistency checking goroutines

func (*ConsistencyChecker) Stop

func (cc *ConsistencyChecker) Stop()

Stop stops the consistency checker

func (*ConsistencyChecker) VerifyBlockIntegrity

func (cc *ConsistencyChecker) VerifyBlockIntegrity(instanceHash InstanceHash) ([]uint32, error)

VerifyBlockIntegrity verifies the integrity of individual blocks by delegating to the storage manager's IdentifyCorruptBlocks, which handles crypto setup, file access, and AES-GCM auth-tag verification.

func (*ConsistencyChecker) VerifyObject

func (cc *ConsistencyChecker) VerifyObject(instanceHash InstanceHash) (bool, error)

VerifyObject verifies a single object's integrity. If checksums are present, it verifies them all in a single pass. If no checksums are present, it still reads all data to verify readability (i.e. that decryption tags are valid and all blocks are accessible). Returns (true, nil) if the object is valid, (false, nil) if corrupt.

type ConsistencyConfig

type ConsistencyConfig struct {
	// MetadataScanActiveMs limits metadata scan to this many ms per second (default: 100)
	MetadataScanActiveMs int64
	// DataScanBytesPerSec limits data scan to this many bytes per second (default: 100MB)
	DataScanBytesPerSec int64
	// MinAgeForCleanup is the minimum age before an entry/file can be cleaned up (default: 5 minutes, 0 for tests)
	MinAgeForCleanup time.Duration
	// ChecksumTypes specifies which checksums to calculate and verify.
	// When empty, defaults to []ChecksumType{ChecksumSHA256}.
	ChecksumTypes []ChecksumType
}

ConsistencyConfig holds configuration for the consistency checker

type ConsistencyStats

type ConsistencyStats struct {
	LastMetadataScan   time.Time
	LastDataScan       time.Time
	MetadataScanErrors int64
	DataScanErrors     int64
	OrphanedFiles      int64
	OrphanedDBEntries  int64
	ChecksumMismatches int64
	BytesVerified      int64
	ObjectsVerified    int64
}

ConsistencyStats holds statistics from consistency checks

type DirDiskStat

type DirDiskStat struct {
	StorageID uint8  `json:"storage_id"`
	Path      string `json:"path"`
	BytesUsed int64  `json:"bytes_used"`
	FileCount int64  `json:"file_count"`
}

DirDiskStat holds per-directory disk usage from walking the filesystem.

type DirEvictionStats

type DirEvictionStats struct {
	MaxSize   uint64
	HighWater uint64
	LowWater  uint64
}

DirEvictionStats contains per-directory eviction statistics

type DiskMapping

type DiskMapping struct {
	ID        StorageID `msgpack:"id"`
	UUID      string    `msgpack:"uuid"`
	Directory string    `msgpack:"dir"`
}

DiskMapping stores the mapping of a storage ID to its directory path and UUID. The UUID file is written to the directory root so that directories can be remounted at different paths and re-associated with their storage ID.

type DiskUsageResult

type DiskUsageResult struct {
	TotalBytesOnDisk int64                   `json:"total_bytes_on_disk"`
	TotalFiles       int64                   `json:"total_files"`
	Directories      map[string]*DirDiskStat `json:"directories,omitempty"`
	Duration         string                  `json:"duration"` // How long the walk took
}

DiskUsageResult contains the result of an expensive disk walk.

type EncryptionManager

type EncryptionManager struct {
	// contains filtered or unexported fields
}

EncryptionManager handles all encryption operations for the cache.

The masterKey is loaded (or generated) once during construction and is never mutated afterwards. Two child keys are derived from it via HKDF so that nothing ever uses the masterKey directly:

  • dekWrapKey – wraps/unwraps per-object data encryption keys (DEKs)
  • the DB key – returned by DeriveDBKey for BadgerDB encryption

The mu mutex protects only the on-disk serialisation in saveMasterKey (called by UpdateMasterKeyEncryption); reader methods are lock-free because the fields they touch are immutable after construction.

func NewEncryptionManager

func NewEncryptionManager(baseDir string) (*EncryptionManager, error)

NewEncryptionManager creates a new encryption manager. It loads the master key from the specified directory, or creates one if none exists.

Requires issuer keys to be initialized via config.GetIssuerPublicJWKS() or InitIssuerKeyForTests() before calling this function.

func (*EncryptionManager) DecryptDataKey

func (em *EncryptionManager) DecryptDataKey(encryptedDEK []byte) ([]byte, error)

DecryptDataKey decrypts a DEK using the HKDF-derived wrapping key. The wrapping key is immutable after construction, so no lock is needed.

func (*EncryptionManager) DecryptInline

func (em *EncryptionManager) DecryptInline(encryptedData, dek, nonce []byte) ([]byte, error)

DecryptInline decrypts inline data (small objects)

func (*EncryptionManager) DeriveDBKey

func (em *EncryptionManager) DeriveDBKey() ([]byte, error)

DeriveDBKey derives a separate encryption key for BadgerDB using HKDF. This ensures proper key separation: the DEK-wrapping key handles per-object keys, while this derived key encrypts BadgerDB's LSM tree and WAL files (protecting metadata such as ETags, URLs, and timestamps at rest).

masterKey is immutable after construction, so no lock is needed.

func (*EncryptionManager) EncryptDataKey

func (em *EncryptionManager) EncryptDataKey(dek []byte) ([]byte, error)

EncryptDataKey encrypts a DEK using the HKDF-derived wrapping key. The wrapping key is immutable after construction, so no lock is needed.

func (*EncryptionManager) EncryptInline

func (em *EncryptionManager) EncryptInline(data, dek, nonce []byte) ([]byte, error)

EncryptInline encrypts data for inline storage (small objects)

func (*EncryptionManager) GenerateDataKey

func (em *EncryptionManager) GenerateDataKey() ([]byte, error)

GenerateDataKey generates a new random data encryption key (DEK)

func (*EncryptionManager) UpdateMasterKeyEncryption

func (em *EncryptionManager) UpdateMasterKeyEncryption() error

UpdateMasterKeyEncryption re-encrypts the master key with the current issuer keys. Call it whenever the issuer keys change.

type EvictionConfig

type EvictionConfig struct {
	// DirConfigs maps storageID to its eviction limits.
	// Each entry describes one storage directory.
	DirConfigs map[StorageID]EvictionDirConfig
}

EvictionConfig holds configuration for the eviction manager

type EvictionDirConfig

type EvictionDirConfig struct {
	MaxSize             uint64 // Maximum cache size in bytes for this directory
	HighWaterPercentage int    // Percentage at which eviction starts (0 = default 90)
	LowWaterPercentage  int    // Percentage at which eviction stops  (0 = default 80)
	HighWaterBytes      uint64 // Absolute byte threshold (overrides percentage when > 0)
	LowWaterBytes       uint64 // Absolute byte threshold (overrides percentage when > 0)
}

EvictionDirConfig holds per-directory eviction configuration.

type EvictionManager

type EvictionManager struct {
	// contains filtered or unexported fields
}

EvictionManager handles fairness-aware cache eviction. Each storage directory has independent watermarks; eviction is triggered per-directory when a directory exceeds its high-water mark and proceeds until the directory's usage falls to its low-water mark.
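A minimal sketch of the per-directory watermark arithmetic. It relies only on the EvictionDirConfig field comments: a zero percentage means the default of 90 or 80, and a non-zero absolute byte threshold overrides the percentage. The names `dirLimits`, `thresholds`, and `bytesToEvict` are hypothetical. The real eviction loop also has to choose which objects to drop, and this sketch does not attempt that.

```go
package main

// dirLimits is a hypothetical local copy of the documented
// EvictionDirConfig fields, used only for this illustration.
type dirLimits struct {
	MaxSize             uint64
	HighWaterPercentage int
	LowWaterPercentage  int
	HighWaterBytes      uint64
	LowWaterBytes       uint64
}

// thresholds resolves the high- and low-water marks in bytes. A zero
// percentage falls back to the documented defaults (90 and 80), and a
// non-zero absolute byte value overrides the percentage.
func thresholds(c dirLimits) (high, low uint64) {
	highPct, lowPct := c.HighWaterPercentage, c.LowWaterPercentage
	if highPct == 0 {
		highPct = 90
	}
	if lowPct == 0 {
		lowPct = 80
	}
	high = c.MaxSize * uint64(highPct) / 100
	low = c.MaxSize * uint64(lowPct) / 100
	if c.HighWaterBytes > 0 {
		high = c.HighWaterBytes
	}
	if c.LowWaterBytes > 0 {
		low = c.LowWaterBytes
	}
	return high, low
}

// bytesToEvict returns 0 while usage is at or below the high-water mark.
// Once that mark is exceeded, eviction must free enough to reach the
// low-water mark.
func bytesToEvict(usage uint64, c dirLimits) uint64 {
	high, low := thresholds(c)
	if usage <= high || usage <= low {
		return 0
	}
	return usage - low
}
```

With MaxSize 1000 and default percentages, a directory at 950 bytes would shed 150 bytes to reach 800. A directory at exactly 900 bytes would not start eviction. The gap between the two marks provides hysteresis, so eviction does not restart after every small write.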

func NewEvictionManager

func NewEvictionManager(db *CacheDB, storage *StorageManager, config EvictionConfig) *EvictionManager

NewEvictionManager creates a new eviction manager

func (*EvictionManager) ChooseDiskStorage

func (em *EvictionManager) ChooseDiskStorage() StorageID

ChooseDiskStorage selects a storage directory using a pre-computed, shuffled lookup table. The table is rebuilt at most every 100 ms from the in-memory atomic usage estimates. The hot path is a single atomic increment + array index, so many goroutines can call this concurrently with negligible contention.

func (*EvictionManager) ForcePurge

func (em *EvictionManager) ForcePurge() error

ForcePurge forces an immediate purge down to the low water mark. Directories are processed concurrently (see checkAndEvict).

func (*EvictionManager) ForcePurgeToBytes

func (em *EvictionManager) ForcePurgeToBytes(targetBytes uint64) (uint64, int64, error)

ForcePurgeToBytes forces an immediate purge across all directories until total cache usage drops to targetBytes or below. Unlike ForcePurge, which uses the configured low-water mark, this method accepts an explicit target. Returns (bytes freed, objects evicted, error).

func (*EvictionManager) GetAllNamespaceUsage

func (em *EvictionManager) GetAllNamespaceUsage() (map[StorageUsageKey]int64, error)

GetAllNamespaceUsage returns usage for all storage+namespace combinations

func (*EvictionManager) GetNamespaceUsage

func (em *EvictionManager) GetNamespaceUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)

GetNamespaceUsage returns usage for a specific storage+namespace combination

func (*EvictionManager) GetStats

func (em *EvictionManager) GetStats() EvictionStats

GetStats returns eviction manager statistics

func (*EvictionManager) GetTotalUsage

func (em *EvictionManager) GetTotalUsage() uint64

GetTotalUsage returns the current total cache usage (sum of per-dir atomics).

func (*EvictionManager) HasSpace

func (em *EvictionManager) HasSpace(needed uint64) bool

HasSpace returns true if there's room for more data in at least one directory. Uses the in-memory atomic estimates (no DB query).

func (*EvictionManager) MarkPurgeFirst

func (em *EvictionManager) MarkPurgeFirst(instanceHash InstanceHash) error

MarkPurgeFirst marks an object to be purged first during next eviction

func (*EvictionManager) NoteUsageIncrease

func (em *EvictionManager) NoteUsageIncrease(storageID StorageID, bytes int64)

NoteUsageIncrease updates the in-memory usage estimate and triggers an eviction check if the specified directory's high-water mark appears to be exceeded. It does NOT write to the persistent database; the caller is responsible for ensuring the DB was already updated (e.g., via MergeBlockStateWithUsage, which tracks usage atomically alongside the bitmap merge). The database usage updates handle race conditions in which multiple writers update the same blocks simultaneously. The in-memory estimate does not account for such races, so it may be higher than the actual usage.

The per-directory atomic counter is the fast path: if the counter is under the high-water mark no database call is made at all. When the counter indicates a possible threshold crossing we consult the database for the authoritative total, correct the counter, and only then decide whether to trigger eviction.
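The fast-path/slow-path split can be sketched with one atomic counter per directory. Everything here is hypothetical. `dirUsage`, `dbTotal` (a stand-in for the authoritative database query), and `trigger` (a stand-in for TriggerEviction) are invented for illustration, and the error handling is a choice made only for this sketch.

```go
package main

import "sync/atomic"

// dirUsage is a hypothetical per-directory tracker that illustrates the
// fast-path/slow-path split.
type dirUsage struct {
	estimate  atomic.Int64 // in-memory estimate; may overcount
	highWater int64
	dbTotal   func() (int64, error) // stand-in for the authoritative DB query
	trigger   func()                // stand-in for TriggerEviction
}

// noteIncrease bumps the estimate and consults the database only when
// the estimate suggests the high-water mark may have been crossed.
func (d *dirUsage) noteIncrease(bytes int64) {
	if d.estimate.Add(bytes) <= d.highWater {
		return // fast path: no database call
	}
	total, err := d.dbTotal()
	if err != nil {
		return // this sketch keeps the estimate and rechecks on a later increase
	}
	// Correct any overcount. An Add racing with this Store can be lost,
	// leaving the estimate low until the next database check.
	d.estimate.Store(total)
	if total > d.highWater {
		d.trigger()
	}
}
```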

func (*EvictionManager) RecordAccess

func (em *EvictionManager) RecordAccess(instanceHash InstanceHash) error

RecordAccess records an access to an object, updating LRU

func (*EvictionManager) Start

func (em *EvictionManager) Start(ctx context.Context, egrp *errgroup.Group)

Start begins the background eviction goroutine

func (*EvictionManager) TriggerEviction

func (em *EvictionManager) TriggerEviction()

TriggerEviction triggers an eviction check

type EvictionStats

type EvictionStats struct {
	TotalUsage     uint64
	DirStats       map[StorageID]DirEvictionStats
	NamespaceUsage map[StorageUsageKey]int64
}

EvictionStats contains eviction manager statistics

type HeadResult

type HeadResult struct {
	ContentLength int64
	Meta          *CacheMetadata // non-nil when the object is cached
}

HeadResult contains the response metadata for a HEAD request.

type InstanceHash

type InstanceHash string

InstanceHash is an HMAC-SHA-256 digest that identifies a specific version (ETag) of an object. Using a dedicated type prevents accidental confusion with ObjectHash or arbitrary strings.

func ComputeInstanceHash

func ComputeInstanceHash(salt []byte, etag string, objectHash ObjectHash) InstanceHash

ComputeInstanceHash computes HMAC-SHA-256(salt, etag + ":" + objectHash). This identifies a specific version of an object. If etag is empty (for objects whose origin does not provide ETags), the empty string is used, so the HMAC input is ":" + objectHash.

func ParseChunkFilename

func ParseChunkFilename(filename string) (baseHash InstanceHash, chunkIndex int, ok bool)

ParseChunkFilename parses a filename (without path) that may be a chunk file. Returns the base instance hash and chunk index. For non-chunked files (no suffix), returns the filename as-is and chunkIndex 0. For chunk files like "deadbeef...-2", returns the base hash and chunkIndex 1. Returns ok=false if the filename doesn't match expected patterns.

type IntrospectAPIOpen

type IntrospectAPIOpen struct {
	// contains filtered or unexported fields
}

IntrospectAPIOpen provides read-only introspection access to the cache database. It opens the cache database in read-only mode, suitable for CLI tools that need to inspect cache contents without disturbing a running cache server.

func NewIntrospectAPI

func NewIntrospectAPI(baseDir string) (*IntrospectAPIOpen, error)

NewIntrospectAPI creates a new introspection API by opening the cache database and storage manager in read-only mode.

func (*IntrospectAPIOpen) Close

func (api *IntrospectAPIOpen) Close() error

Close releases resources held by the introspection API.

func (*IntrospectAPIOpen) GetCacheStats

func (api *IntrospectAPIOpen) GetCacheStats() (*CacheStats, error)

GetCacheStats returns aggregate cache size statistics by scanning metadata. This scans all metadata entries and the inline data prefix, but does NOT walk the disk (use GetDiskUsage for that).

func (*IntrospectAPIOpen) GetDiskUsage

func (api *IntrospectAPIOpen) GetDiskUsage() (*DiskUsageResult, error)

GetDiskUsage walks the storage directories to compute actual disk usage. This is an expensive operation that reads every file's size on disk.

func (*IntrospectAPIOpen) GetObjectDetails

func (api *IntrospectAPIOpen) GetObjectDetails(instanceHash string) (*ObjectDetails, error)

GetObjectDetails returns detailed metadata for a specific object instance. The instanceHash can be obtained from ListObjectInstances. To look up an object by its URL and ETag instead, use GetObjectDetailsByURL.

func (*IntrospectAPIOpen) GetObjectDetailsByURL

func (api *IntrospectAPIOpen) GetObjectDetailsByURL(objectURL, etag string) (*ObjectDetails, error)

GetObjectDetailsByURL looks up object details by URL and optional ETag. If etag is empty, returns details for the latest version.

func (*IntrospectAPIOpen) ListAllObjects

func (api *IntrospectAPIOpen) ListAllObjects(limit int, pattern string) ([]ObjectInstance, error)

ListAllObjects returns a summary of all cached objects. This can be slow for large caches; consider using pagination in production.

func (*IntrospectAPIOpen) ListObjectInstances

func (api *IntrospectAPIOpen) ListObjectInstances(objectURL string) ([]ObjectInstance, error)

ListObjectInstances returns all cached instances for a given object URL. The URL should be a pelican:// URL (e.g., pelican://host/path/file.dat). If the URL doesn't have a scheme, it will be normalized with the default federation.

func (*IntrospectAPIOpen) RunConsistencyCheck

func (api *IntrospectAPIOpen) RunConsistencyCheck(ctx context.Context, metadataScan, dataScan bool) (*ConsistencyCheckResult, error)

RunConsistencyCheck runs an ad-hoc consistency check (the metadata scan and/or the data scan, as selected by metadataScan and dataScan) and returns statistics. It creates a temporary ConsistencyChecker with rate limiting disabled for this run.

func (*IntrospectAPIOpen) VerifyChecksum

func (api *IntrospectAPIOpen) VerifyChecksum(instanceHash string) (*VerificationResult, error)

VerifyChecksum triggers a checksum verification for the specified instance. Returns detailed verification results including per-checksum status.

func (*IntrospectAPIOpen) VerifyChecksumByURL

func (api *IntrospectAPIOpen) VerifyChecksumByURL(objectURL, etag string) (*VerificationResult, error)

VerifyChecksumByURL verifies the checksums of an object identified by URL and optional ETag.

type LocalCache

type LocalCache struct {
	// contains filtered or unexported fields
}

func NewLocalCache

func NewLocalCache(ctx context.Context, egrp *errgroup.Group, options ...LocalCacheOption) (lc *LocalCache, err error)

NewLocalCache creates a local cache object and launches the background goroutines associated with it.

func (*LocalCache) Config

func (lc *LocalCache) Config(egrp *errgroup.Group) (err error)

Config tries to configure the local cache and launches the reconfiguration goroutine.

func (*LocalCache) Get

func (sc *LocalCache) Get(ctx context.Context, path, token string) (io.ReadCloser, error)

Get retrieves path from the cache.

func (*LocalCache) MarkObjectPurgeFirst

func (lc *LocalCache) MarkObjectPurgeFirst(objectPath string) (int, error)

MarkObjectPurgeFirst marks the given object path as PURGEFIRST by creating the corresponding sentinel file on disk and updating the in-memory data structures accordingly.

func (*LocalCache) ReconstructCache

func (lc *LocalCache) ReconstructCache() error

ReconstructCache rebuilds the in-memory data structures of the LocalCache based on the files present in the directory specified by "LocalCache.DataLocation".

func (*LocalCache) Stat

func (lc *LocalCache) Stat(path, token string) (uint64, error)

type LocalCacheOption

type LocalCacheOption = option.Interface

func WithDeferConfig

func WithDeferConfig(deferConfig bool) LocalCacheOption

WithDeferConfig creates an option that defers the configuration of the local cache.

This is useful when the cache must be created before the web interface is up, but the web interface is needed to complete configuration.

type MasterKeyFile

type MasterKeyFile struct {
	// Keys maps public key fingerprint to encrypted master key
	Keys map[string][]byte `json:"keys"`
}

MasterKeyFile represents the format of the encrypted master key file. The master key is encrypted once with each issuer private key.

type NamespaceID

type NamespaceID uint32

NamespaceID identifies a namespace prefix. Each distinct prefix registered in the cache is assigned a monotonically increasing ID for efficient storage and lookup. Using a dedicated type prevents accidental confusion with StorageID or arbitrary uint32 values.

type ObjectBlockState

type ObjectBlockState struct {
	// contains filtered or unexported fields
}

ObjectBlockState holds the shared, thread-safe block availability state for a single cached object. All RangeReaders serving the same instanceHash share one instance, ensuring that block additions (downloads) and removals (corruption repairs) are immediately visible to every goroutine.

The repairMu mutex serializes repair operations: when multiple readers detect corruption simultaneously, only the first one performs the clear-fetch-verify cycle. Subsequent callers acquire repairMu, re-check block availability, and skip the repair if it has already been done.

func NewObjectBlockState

func NewObjectBlockState(bitmap *roaring.Bitmap) *ObjectBlockState

NewObjectBlockState wraps an existing bitmap in a thread-safe container.

func (*ObjectBlockState) Add

func (obs *ObjectBlockState) Add(block uint32)

Add marks a single block as downloaded.

func (*ObjectBlockState) AddRange

func (obs *ObjectBlockState) AddRange(start, end uint32)

AddRange marks all blocks in [start, end] as downloaded.

func (*ObjectBlockState) ClearDownloading

func (obs *ObjectBlockState) ClearDownloading()

ClearDownloading marks the background download as finished and wakes any goroutines waiting in WaitForBlock.

func (*ObjectBlockState) Clone

func (obs *ObjectBlockState) Clone() *roaring.Bitmap

Clone returns a point-in-time snapshot of the bitmap. The returned bitmap is independent (mutations to it do not affect the shared state).

func (*ObjectBlockState) Contains

func (obs *ObjectBlockState) Contains(block uint32) bool

Contains returns true if the given block is marked as downloaded.

func (*ObjectBlockState) ContainsRange

func (obs *ObjectBlockState) ContainsRange(start, end uint32) bool

ContainsRange returns true if every block in [start, end] is present. Creates a temporary range bitmap and checks if the intersection has the expected cardinality. This is O(log n) for typical bitmap layouts.

func (*ObjectBlockState) GetCardinality

func (obs *ObjectBlockState) GetCardinality() uint64

GetCardinality returns the number of blocks that are downloaded.

func (*ObjectBlockState) LockRepair

func (obs *ObjectBlockState) LockRepair()

LockRepair acquires the per-object repair mutex. Only one goroutine at a time may run the clear-fetch-verify repair cycle.

func (*ObjectBlockState) MissingInRange

func (obs *ObjectBlockState) MissingInRange(start, end uint32) []uint32

MissingInRange returns the list of blocks in [start, end] that are not present. Uses RoaringBitmap's iterator for efficiency instead of checking each block.

func (*ObjectBlockState) Remove

func (obs *ObjectBlockState) Remove(block uint32)

Remove marks a single block as not-downloaded.

func (*ObjectBlockState) RemoveMany

func (obs *ObjectBlockState) RemoveMany(blocks []uint32)

RemoveMany marks the given blocks as not-downloaded.

func (*ObjectBlockState) SetDownloading

func (obs *ObjectBlockState) SetDownloading()

SetDownloading marks this object as having a background download in progress. WaitForBlock will block while downloading is true.

func (*ObjectBlockState) UnlockRepair

func (obs *ObjectBlockState) UnlockRepair()

UnlockRepair releases the per-object repair mutex.

func (*ObjectBlockState) WaitForBlock

func (obs *ObjectBlockState) WaitForBlock(ctx context.Context, block uint32) bool

WaitForBlock waits until the specified block is available in the bitmap. It returns true if the block is available, false if the context was cancelled or the background download finished without producing the block. This avoids starting duplicate range downloads when a full download is already in progress.

The implementation spawns a goroutine to wait on the sync.Cond (which cannot be interrupted) and selects between it and ctx.Done(). On context cancellation, we signal the goroutine via a done channel and broadcast the cond, then wait for the goroutine to acknowledge exit before returning. This guarantees no goroutine leak.

type ObjectDetails

type ObjectDetails struct {
	ObjectInstance

	// Additional details
	NamespaceID   uint16            `json:"namespace_id"`
	StorageID     uint8             `json:"storage_id"`
	LastValidated time.Time         `json:"last_validated,omitempty"`
	CacheControl  string            `json:"cache_control,omitempty"`
	Checksums     []ChecksumInfo    `json:"checksums,omitempty"`
	BlockSummary  *BlockSummary     `json:"block_summary,omitempty"` // nil for inline storage
	ChunkSummary  *ChunkInfoSummary `json:"chunk_info,omitempty"`    // nil for non-chunked
}

ObjectDetails provides detailed metadata for a cached object instance.

type ObjectHash

type ObjectHash string

ObjectHash is an HMAC-SHA-256 digest that identifies a logical object (URL) regardless of version or ETag. Using a dedicated type prevents accidental confusion with InstanceHash or arbitrary strings.

func ComputeObjectHash

func ComputeObjectHash(salt []byte, pelicanURL string) ObjectHash

ComputeObjectHash computes HMAC-SHA-256(salt, normalized URL). This identifies the logical object (URL) regardless of version/ETag. The salt is generated once per cache database and prevents offline correlation of hashes with known URLs.
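Both digests are keyed hashes, so the standard library can reproduce the construction described above. In this sketch the hex encoding is an assumption, URL normalization is omitted, and `objectHash` and `instanceHash` are hypothetical helpers rather than the package's functions.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
)

// objectHash computes HMAC-SHA-256(salt, url) and hex-encodes it
// (the encoding is an assumption of this sketch).
func objectHash(salt []byte, normalizedURL string) string {
	mac := hmac.New(sha256.New, salt)
	mac.Write([]byte(normalizedURL))
	return hex.EncodeToString(mac.Sum(nil))
}

// instanceHash computes HMAC-SHA-256(salt, etag + ":" + objectHash).
// An empty etag simply yields the message ":" + objectHash.
func instanceHash(salt []byte, etag, objHash string) string {
	mac := hmac.New(sha256.New, salt)
	mac.Write([]byte(etag + ":" + objHash))
	return hex.EncodeToString(mac.Sum(nil))
}
```

Without the per-database salt, someone who knows a URL cannot compute its hash to check whether it appears in a copied database.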

type ObjectInstance

type ObjectInstance struct {
	InstanceHash  string    `json:"instance_hash"`
	ETag          string    `json:"etag"`
	SourceURL     string    `json:"source_url"`
	ContentLength int64     `json:"content_length"`
	ContentType   string    `json:"content_type"`
	LastModified  time.Time `json:"last_modified,omitempty"`
	Completed     time.Time `json:"completed,omitempty"`
	LastAccessed  time.Time `json:"last_accessed,omitempty"`
	Expires       time.Time `json:"expires,omitempty"`
	IsLatest      bool      `json:"is_latest"` // True if this is the latest ETag
	IsInline      bool      `json:"is_inline"` // True if stored inline in database
}

ObjectInstance represents a cached instance of an object with a specific ETag.

type ObjectReader

type ObjectReader struct {
	// contains filtered or unexported fields
}

ObjectReader provides a reader interface for cached objects. For disk-backed objects, it caches the BlockEncryptor and ObjectBlockState at construction time so that Read/ReadAt calls do not need to look them up through the TTL caches on every call.

func (*ObjectReader) Close

func (r *ObjectReader) Close() error

Close releases the reader's reference to the underlying file handle. The actual FD is closed only when all references (including the TTL cache entry) have been released.

func (*ObjectReader) ContentType

func (r *ObjectReader) ContentType() string

ContentType returns the content type of the object

func (*ObjectReader) ETag

func (r *ObjectReader) ETag() string

ETag returns the ETag

func (*ObjectReader) LastModified

func (r *ObjectReader) LastModified() time.Time

LastModified returns the last modified time

func (*ObjectReader) Read

func (r *ObjectReader) Read(p []byte) (n int, err error)

Read implements io.Reader

func (*ObjectReader) ReadAt

func (r *ObjectReader) ReadAt(p []byte, off int64) (n int, err error)

ReadAt implements io.ReaderAt

func (*ObjectReader) Seek

func (r *ObjectReader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker

func (*ObjectReader) Size

func (r *ObjectReader) Size() int64

Size returns the total size of the object

func (*ObjectReader) WriteTo

func (r *ObjectReader) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo. When io.Copy detects this interface it calls WriteTo directly instead of issuing many small Read calls with a 32 KB default buffer. This lets the ObjectReader control the batch size, reading and decrypting many blocks at once and writing the plaintext to w in large chunks.

type PersistentCache

type PersistentCache struct {
	// contains filtered or unexported fields
}

PersistentCache is the new persistent local cache implementation. It uses BadgerDB for metadata and block tracking, and encrypted files on disk.

func NewPersistentCache

func NewPersistentCache(ctx context.Context, egrp *errgroup.Group, cfg PersistentCacheConfig) (*PersistentCache, error)

NewPersistentCache creates a new persistent cache instance

func (*PersistentCache) BackdateObject

func (pc *PersistentCache) BackdateObject(objectPath string, age time.Duration) error

BackdateObject shifts the Completed timestamp of a cached object backward by the given duration. This is useful for testing the Age response header without waiting for wall-clock time to elapse.

func (*PersistentCache) Close

func (pc *PersistentCache) Close() error

Close shuts down the persistent cache.

Shutdown order:

  1. Cancel all in-flight downloads (via downloadCancel).
  2. Wait for completeDownload goroutines to finish — each one closes its own TransferClient, so all clients are gone before step 3.
  3. Shut down the transfer engine (no live clients remain).
  4. Stop the consistency checker.
  5. Close the database.

func (*PersistentCache) Config

func (pc *PersistentCache) Config(egrp *errgroup.Group) error

Config configures the cache and starts periodic updates

func (*PersistentCache) EvictObject

func (pc *PersistentCache) EvictObject(objectPath, token string) error

EvictObject is the programmatic API for evicting an object by path. Returns nil on success, an error if the object is in use or the eviction fails.

func (*PersistentCache) EvictPrefix

func (pc *PersistentCache) EvictPrefix(prefix, token string, immediate bool) (int, error)

EvictPrefix evicts all cached objects whose source URL starts with the given path prefix (or matches it exactly). It iterates every metadata entry, selects those with a matching SourceURL, and either deletes them immediately or marks them for priority eviction (purge-first) depending on the immediate flag. Objects that are currently being downloaded are skipped. The returned count reflects objects that were acted on.

func (*PersistentCache) Get

func (pc *PersistentCache) Get(ctx context.Context, objectPath, token string) (io.ReadCloser, error)

Get retrieves an object from the cache, downloading if necessary

func (*PersistentCache) GetCacheStats

func (pc *PersistentCache) GetCacheStats() (*CacheStats, error)

GetCacheStats returns aggregate cache size statistics by scanning metadata and reading usage counters. This is the same data returned by the introspect/stats HTTP endpoint.

func (*PersistentCache) GetMetadata

func (pc *PersistentCache) GetMetadata(objectPath, token string) (*CacheMetadata, error)

GetMetadata returns the cache metadata for an object if it exists. Returns nil, nil if the object is not cached.

func (*PersistentCache) GetRange

func (pc *PersistentCache) GetRange(ctx context.Context, objectPath, token, rangeHeader string) (io.ReadCloser, error)

GetRange retrieves a range of an object from the cache

func (*PersistentCache) GetSeekableReader

func (pc *PersistentCache) GetSeekableReader(ctx context.Context, objectPath, bearerToken string, rangeOnly bool) (*SeekableReader, *CacheMetadata, error)

GetSeekableReader returns a seekable reader for the full object with on-demand block fetching. This is designed for use with http.ServeContent which handles Range requests internally.

When rangeOnly is true and the object is not yet cached, GetSeekableReader uses a lightweight HEAD request to initialise on-disk storage instead of starting a full sequential download. This allows BlockFetcherV2 to fetch only the blocks the caller actually reads, avoiding a potentially expensive full transfer. Callers should set rangeOnly when they know the request is for a sub-range of the object (e.g. an HTTP Range request).

func (*PersistentCache) GetStats

func (pc *PersistentCache) GetStats() PersistentCacheStats

GetStats returns cache statistics

func (*PersistentCache) HeadObject

func (pc *PersistentCache) HeadObject(objectPath, token string) (*HeadResult, error)

HeadObject returns metadata for an object without triggering a download. If the object is cached, the full CacheMetadata is returned. If not cached, the origin is queried via HEAD (DoStat) for the size.

func (*PersistentCache) IsFullyCached

func (pc *PersistentCache) IsFullyCached(ctx context.Context, objectPath, token string) bool

IsFullyCached returns true if the object at the given path is fully downloaded and present in the cache (metadata exists, ContentLength >= 0, and the download has been marked complete).

When the cached entry carries Cache-Control directives and is stale, a revalidation request is made to the origin. If revalidation confirms the same ETag (or fails gracefully, serving stale), the object is still considered fully cached. If the origin provides a new version or responds with no-store, the method returns false so the caller falls through to a full download.

func (*PersistentCache) KeyChangeCallback

func (pc *PersistentCache) KeyChangeCallback() func(ctx context.Context) error

KeyChangeCallback returns a callback function that re-encrypts the master key when issuer keys change. This is used with LaunchIssuerKeysDirRefresh to ensure the cache data remains accessible as long as any issuer key is available. The callback updates the masterkey.json file with the master key encrypted under all current issuer keys.

func (*PersistentCache) LaunchListener

func (pc *PersistentCache) LaunchListener(ctx context.Context, egrp *errgroup.Group) (err error)

LaunchListener launches the unix socket listener for the persistent cache

func (*PersistentCache) MarkPurgeFirst

func (pc *PersistentCache) MarkPurgeFirst(objectPath string) error

MarkPurgeFirst marks an object to be purged first during next eviction

func (*PersistentCache) Purge

func (pc *PersistentCache) Purge() error

Purge triggers the eviction manager to purge old entries

func (*PersistentCache) PurgeToBytes

func (pc *PersistentCache) PurgeToBytes(targetBytes uint64) (uint64, int64, error)

PurgeToBytes triggers the eviction manager to purge down to the specified total usage target. Returns (bytes freed, objects evicted, error).

func (*PersistentCache) Register

func (pc *PersistentCache) Register(ctx context.Context, router *gin.RouterGroup)

Register registers the control & monitoring routines with Gin

func (*PersistentCache) RegisterCacheHandlers

func (pc *PersistentCache) RegisterCacheHandlers(engine *gin.Engine, directorEnabled bool) error

RegisterCacheHandlers registers HTTP handlers for the persistent cache on the Gin engine. This is used for the XRootD-free cache implementation where the cache serves content directly via the web server instead of using XRootD.

Unlike LaunchListener (which creates a Unix socket), this registers handlers on the existing Gin web server, allowing it to serve cache requests on the standard HTTP/HTTPS ports.

The /api/v1.0/cache/data/:discovery/*path routes are always registered regardless of whether a director is in use. When running standalone (directorEnabled=false), a NoRoute fallback is also registered so that bare object paths (without the /api prefix) are served. The /api namespace is always reserved and never served by the NoRoute fallback.

func (*PersistentCache) SetFedToken

func (pc *PersistentCache) SetFedToken(tok string)

SetFedToken stores the federation token in memory. It is called by cache.LaunchFedTokManager (via the onTokenUpdate callback) whenever the token is created or refreshed, eliminating the need to read the token back from disk on every origin request.

func (*PersistentCache) Stat

func (pc *PersistentCache) Stat(objectPath, token string) (uint64, error)

Stat returns the size of an object, querying the origin if not cached

func (*PersistentCache) StatCachedOnly

func (pc *PersistentCache) StatCachedOnly(objectPath, token string) (uint64, error)

StatCachedOnly returns the size of an object only if it's cached. Returns 0, ErrNotCached if the object is not in the cache.

type PersistentCacheConfig

type PersistentCacheConfig struct {
	// Mode selects which configuration namespace to use for defaults.
	// CacheModeServer reads Cache.StorageLocation, Cache.HighWaterMark, etc.
	// CacheModeLocal (the zero value) reads LocalCache.DataLocation,
	// LocalCache.HighWaterMarkPercentage, etc.
	Mode CacheMode

	// BaseDir is the root directory for the cache.  The BadgerDB database
	// lives directly under BaseDir.  If StorageDirs is empty, a single
	// storage directory is created under BaseDir/objects.
	BaseDir string

	// StorageDirs configures one or more storage directories.
	// Each entry describes a directory path and its size limits.
	// When empty, a single directory under BaseDir is used with
	// MaxSize / HighWaterMarkPercentage / LowWaterMarkPercentage
	// as its limits (for backward compatibility).
	StorageDirs []StorageDirConfig

	// Legacy single-directory fields — used only when StorageDirs is empty.
	MaxSize                 uint64
	HighWaterMarkPercentage int
	LowWaterMarkPercentage  int

	// InlineStorageMaxBytes sets the maximum size for objects stored
	// inline in BadgerDB.  Objects at or below this threshold are stored
	// inline; larger objects go to disk.  0 means use the default (4096).
	InlineStorageMaxBytes int

	DefaultFederation string

	// DeferConfig delays the initial director namespace fetch until
	// Config() is called explicitly.  The server launcher sets this to
	// true because the director may not be reachable when the cache is
	// first constructed (e.g. cache starts before director discovery).
	DeferConfig bool
}

PersistentCacheConfig holds configuration for the persistent cache

type PersistentCacheStats

type PersistentCacheStats struct {
	TotalUsage       uint64
	DirStats         map[StorageID]DirEvictionStats
	NamespaceUsage   map[string]int64
	ConsistencyStats ConsistencyStats
}

PersistentCacheStats holds cache statistics

type PrestageManager

type PrestageManager struct {
	// contains filtered or unexported fields
}

PrestageManager manages per-identity worker pools for prestage operations.

func NewPrestageManager

func NewPrestageManager(pc *PersistentCache) *PrestageManager

NewPrestageManager creates a prestage manager for the given cache.

func (*PrestageManager) Submit

func (pm *PrestageManager) Submit(ident string, req *prestageRequest) bool

Submit queues a prestage request for the given identity. Returns false if the queue is full (caller should return 429).
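A minimal sketch of per-identity bounded queues with a non-blocking enqueue. It assumes one buffered channel and one worker goroutine per identity. `queueManager`, `submit`, and `statusFor` are hypothetical; only the 429 mapping comes from the documentation above.

```go
package main

import (
	"net/http"
	"sync"
)

// queueManager is a hypothetical sketch of per-identity bounded queues,
// each drained by its own worker goroutine.
type queueManager struct {
	mu       sync.Mutex
	queues   map[string]chan string
	capacity int
	handle   func(path string) // stand-in for the prestage download
}

func newQueueManager(capacity int, handle func(string)) *queueManager {
	return &queueManager{queues: map[string]chan string{}, capacity: capacity, handle: handle}
}

// submit enqueues without blocking and reports false when this identity's
// queue is full, so one busy identity cannot starve the others.
func (m *queueManager) submit(ident, path string) bool {
	m.mu.Lock()
	q, ok := m.queues[ident]
	if !ok {
		q = make(chan string, m.capacity)
		m.queues[ident] = q
		go func() {
			for p := range q {
				m.handle(p)
			}
		}()
	}
	m.mu.Unlock()

	select {
	case q <- path:
		return true
	default:
		return false
	}
}

// statusFor maps the result to an HTTP status. 429 for a full queue follows
// the Submit documentation; 202 for success is an assumption.
func statusFor(accepted bool) int {
	if accepted {
		return http.StatusAccepted
	}
	return http.StatusTooManyRequests
}
```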

type RangeReader

type RangeReader struct {
	// contains filtered or unexported fields
}

RangeReader implements io.ReadSeeker over a byte range of a cached object. It is the primary read-path abstraction that ties the block-level storage layer to HTTP response serving.

Callers obtain a RangeReader via PersistentCache.GetRange (for explicit byte-range requests) or via PersistentCache.GetSeekableReader (wrapped in a SeekableReader so that http.ServeContent can handle Range negotiation).

For normal (cached / cacheable) responses the RangeReader reads encrypted blocks from the StorageManager, fetching missing blocks on demand via the fetchBlocks callback wired to a BlockFetcherV2 instance. It also performs a single round of transparent auto-repair when AES-GCM authentication detects on-disk corruption.

For responses marked Cache-Control: no-store, the origin's body is piped directly through the noStoreReader field; the block-storage path is bypassed entirely.

func NewRangeReader

func NewRangeReader(
	storage *StorageManager,
	instanceHash InstanceHash,
	start, end int64,
	fetchBlocks func(ctx context.Context, startBlock, endBlock uint32) error,
) (*RangeReader, error)

NewRangeReader creates a reader for a range request. fetchBlocks is called when blocks need to be fetched from the origin.

func (*RangeReader) Close

func (rr *RangeReader) Close() error

Close closes the range reader

func (*RangeReader) ContentLength

func (rr *RangeReader) ContentLength() int64

ContentLength returns the length of the range

func (*RangeReader) ContentRange

func (rr *RangeReader) ContentRange() string

ContentRange returns the Content-Range header value

func (*RangeReader) Read

func (rr *RangeReader) Read(p []byte) (n int, err error)

Read implements io.Reader with on-demand block fetching

func (*RangeReader) ReadContext

func (rr *RangeReader) ReadContext(ctx context.Context, p []byte) (n int, err error)

ReadContext reads with context for cancellation

func (*RangeReader) Seek

func (rr *RangeReader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker (limited to within the range)
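The standard library's io.SectionReader offers a useful model for a seek "limited to within the range": offsets are relative to the start of the window, and reads stop at its end. The sketch below is an analogy for the documented behavior. It does not describe how RangeReader is built, and readWindow is a hypothetical helper.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readWindow exposes obj[off:off+n] as its own seekable stream, seeks to the
// window-relative offset seek, and returns everything up to the window end.
func readWindow(obj string, off, n, seek int64) (string, error) {
	section := io.NewSectionReader(strings.NewReader(obj), off, n)
	if _, err := section.Seek(seek, io.SeekStart); err != nil {
		return "", err
	}
	b, err := io.ReadAll(section)
	return string(b), err
}

func main() {
	// The window covers bytes 4..11 ("456789AB"); offset 2 inside it is '6'.
	s, err := readWindow("0123456789ABCDEF", 4, 8, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // 6789AB
}
```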

type RangeRequest

type RangeRequest struct {
	Start int64
	End   int64 // -1 means "to end of file"
}

RangeRequest represents a parsed HTTP Range header

func ParseRangeHeader

func ParseRangeHeader(rangeHeader string, contentLength int64) ([]RangeRequest, error)

ParseRangeHeader parses an HTTP Range header. It supports the forms "bytes=start-end", "bytes=start-", and "bytes=-suffix".
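Below is a hypothetical parser for the three documented forms, including comma-separated lists of them. Open-ended ranges are left as End = -1, following the RangeRequest field comment. This section does not say whether the real ParseRangeHeader resolves them against contentLength instead. Suffix ranges do need contentLength to compute their start. parseRange and rangeRequest are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// rangeRequest mirrors RangeRequest: End == -1 means "to end of file".
type rangeRequest struct {
	Start int64
	End   int64
}

// parseRange handles "bytes=start-end", "bytes=start-" and "bytes=-suffix".
func parseRange(header string, contentLength int64) ([]rangeRequest, error) {
	const prefix = "bytes="
	if !strings.HasPrefix(header, prefix) {
		return nil, errors.New("unsupported range unit")
	}
	var out []rangeRequest
	for _, spec := range strings.Split(header[len(prefix):], ",") {
		spec = strings.TrimSpace(spec)
		first, last, ok := strings.Cut(spec, "-")
		if !ok {
			return nil, fmt.Errorf("malformed range %q", spec)
		}
		switch {
		case first == "": // suffix form: the final N bytes
			n, err := strconv.ParseInt(last, 10, 64)
			if err != nil || n <= 0 {
				return nil, fmt.Errorf("bad suffix %q", spec)
			}
			start := contentLength - n
			if start < 0 {
				start = 0
			}
			out = append(out, rangeRequest{Start: start, End: contentLength - 1})
		case last == "": // open-ended form
			s, err := strconv.ParseInt(first, 10, 64)
			if err != nil || s < 0 {
				return nil, fmt.Errorf("bad start %q", spec)
			}
			out = append(out, rangeRequest{Start: s, End: -1})
		default: // closed form, both bounds inclusive
			s, err1 := strconv.ParseInt(first, 10, 64)
			e, err2 := strconv.ParseInt(last, 10, 64)
			if err1 != nil || err2 != nil || s < 0 || e < s {
				return nil, fmt.Errorf("bad range %q", spec)
			}
			out = append(out, rangeRequest{Start: s, End: e})
		}
	}
	return out, nil
}

func main() {
	rs, err := parseRange("bytes=0-99, 200-, -50", 1000)
	fmt.Println(rs, err) // [{0 99} {200 -1} {950 999}] <nil>
}
```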

type ScanProgressEvent

type ScanProgressEvent struct {
	// Phase is the scan phase: "metadata" or "data".
	Phase string `json:"phase"`

	// PercentComplete is the estimated completion (0–100).
	PercentComplete float64 `json:"percent_complete"`

	// DBEntriesScanned is the number of DB entries processed so far (metadata only).
	DBEntriesScanned int64 `json:"db_entries_scanned,omitempty"`

	// FilesScanned is the number of filesystem entries processed (metadata only).
	FilesScanned int64 `json:"files_scanned,omitempty"`

	// ObjectsVerified is the number of objects checksummed so far (data only).
	ObjectsVerified int64 `json:"objects_verified,omitempty"`

	// BytesVerified is the total bytes read for checksumming (data only).
	BytesVerified int64 `json:"bytes_verified,omitempty"`

	// Message is a human-readable status line.
	Message string `json:"message,omitempty"`
}

ScanProgressEvent is sent as an SSE event to report scan progress. The Event field (used as the SSE event type) is one of:

  • "metadata_progress": periodic update during the metadata scan
  • "metadata_done": the metadata scan has finished
  • "data_progress": periodic update during the data scan
  • "data_done": the data scan has finished
  • "error": an error occurred
  • "done": all scans complete; the Data field is a ConsistencyCheckResult

type SeekableReader

type SeekableReader struct {
	*RangeReader
}

SeekableReader is a reader that supports seeking and on-demand block fetching. It implements io.ReadSeekCloser for use with http.ServeContent.

For no-store streaming responses, the reader is NOT seekable (IsNoStore returns true). In that case the handler must use io.Copy instead of http.ServeContent.

func (*SeekableReader) IsNoStore

func (sr *SeekableReader) IsNoStore() bool

IsNoStore returns true when this reader wraps a streaming no-store response. The caller must NOT use http.ServeContent (which requires seeking); instead it should stream the response with io.Copy and set headers manually.

type StorageDirConfig

type StorageDirConfig struct {
	// Path is the directory that will hold an "objects/" subdirectory
	// and, for the first directory, the database.
	Path string
	// MaxSize is the maximum number of bytes stored on this directory.
	// If 0, auto-detected from the filesystem at startup.
	MaxSize uint64
	// HighWaterMarkPercentage overrides the global high-water mark for this
	// directory.  0 means use the global default.
	HighWaterMarkPercentage int
	// LowWaterMarkPercentage overrides the global low-water mark for this
	// directory.  0 means use the global default.
	LowWaterMarkPercentage int
}

StorageDirConfig describes one disk-backed storage directory. Multiple directories can be configured to spread data across devices; each directory has its own maximum size and optional watermark overrides.

func ParseStorageDirsConfig

func ParseStorageDirsConfig() ([]StorageDirConfig, error)

ParseStorageDirsConfig reads the LocalCache.StorageDirs setting from Viper and returns parsed StorageDirConfig values. It accepts two formats for backward compatibility:

  1. A list of strings (paths only):

       LocalCache:
         StorageDirs:
           - /mnt/cache1
           - /mnt/cache2

  2. A list of objects with per-directory configuration:

       LocalCache:
         StorageDirs:
           - Path: /mnt/cache1
             MaxSize: 500GB
             HighWaterMarkPercentage: 95
             LowWaterMarkPercentage: 85
           - Path: /mnt/cache2
             MaxSize: 2TB

Returns nil (not an error) when the key is unset or empty.
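Once the setting has been decoded into generic values, the element type tells the two accepted shapes apart. The sketch below is hypothetical throughout. It takes a []any instead of calling Viper. It matches keys case-insensitively because config loaders commonly lower-case them. Its parseSize helper assumes binary (1024-based) units, which this section does not specify.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// storageDirConfig mirrors the documented StorageDirConfig fields.
type storageDirConfig struct {
	Path                    string
	MaxSize                 uint64
	HighWaterMarkPercentage int
	LowWaterMarkPercentage  int
}

// lookup finds a key case-insensitively.
func lookup(m map[string]any, key string) (any, bool) {
	for k, v := range m {
		if strings.EqualFold(k, key) {
			return v, true
		}
	}
	return nil, false
}

// toInt accepts numeric types that YAML or JSON decoders commonly produce.
func toInt(v any) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case float64:
		return int(n), true
	}
	return 0, false
}

// parseSize is hypothetical and assumes binary (1024-based) units.
func parseSize(s string) (uint64, error) {
	s = strings.ToUpper(strings.TrimSpace(s))
	units := []struct {
		suffix string
		mult   uint64
	}{{"TB", 1 << 40}, {"GB", 1 << 30}, {"MB", 1 << 20}, {"KB", 1 << 10}, {"B", 1}}
	for _, u := range units {
		if strings.HasSuffix(s, u.suffix) {
			n, err := strconv.ParseUint(strings.TrimSpace(strings.TrimSuffix(s, u.suffix)), 10, 64)
			return n * u.mult, err
		}
	}
	return strconv.ParseUint(s, 10, 64)
}

// parseStorageDirs accepts bare path strings or per-directory objects.
func parseStorageDirs(raw []any) ([]storageDirConfig, error) {
	if len(raw) == 0 {
		return nil, nil // unset or empty: not an error
	}
	out := make([]storageDirConfig, 0, len(raw))
	for i, item := range raw {
		switch v := item.(type) {
		case string: // format 1: path only
			out = append(out, storageDirConfig{Path: v})
		case map[string]any: // format 2: per-directory settings
			var c storageDirConfig
			if p, ok := lookup(v, "Path"); ok {
				c.Path, _ = p.(string)
			}
			if c.Path == "" {
				return nil, fmt.Errorf("StorageDirs[%d]: missing Path", i)
			}
			if s, ok := lookup(v, "MaxSize"); ok {
				if str, isStr := s.(string); isStr {
					n, err := parseSize(str)
					if err != nil {
						return nil, fmt.Errorf("StorageDirs[%d]: bad MaxSize: %w", i, err)
					}
					c.MaxSize = n
				} else if n, isNum := toInt(s); isNum && n >= 0 {
					c.MaxSize = uint64(n)
				}
			}
			if n, ok := lookup(v, "HighWaterMarkPercentage"); ok {
				c.HighWaterMarkPercentage, _ = toInt(n)
			}
			if n, ok := lookup(v, "LowWaterMarkPercentage"); ok {
				c.LowWaterMarkPercentage, _ = toInt(n)
			}
			out = append(out, c)
		default:
			return nil, fmt.Errorf("StorageDirs[%d]: unsupported type %T", i, item)
		}
	}
	return out, nil
}

func main() {
	cfgs, err := parseStorageDirs([]any{
		"/mnt/cache1",
		map[string]any{"path": "/mnt/cache2", "maxsize": "2TB", "highwatermarkpercentage": 95},
	})
	fmt.Println(cfgs, err)
}
```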

type StorageDirInfo

type StorageDirInfo struct {
	StorageID  StorageID
	ObjectsDir string
}

StorageDirInfo describes a configured storage directory at runtime.

type StorageDirStats

type StorageDirStats struct {
	StorageID   uint8 `json:"storage_id"`
	ObjectCount int64 `json:"object_count"`
	TotalBytes  int64 `json:"total_bytes"`   // Sum of ContentLength for objects in this dir
	InlineCount int64 `json:"inline_count"`  // Number of inline objects
	InlineBytes int64 `json:"inline_bytes"`  // Sum of ContentLength for inline objects
	OnDiskCount int64 `json:"on_disk_count"` // Number of on-disk objects
	OnDiskBytes int64 `json:"on_disk_bytes"` // Actual bytes on disk (content + per-block MAC overhead)
}

StorageDirStats holds per-storage-directory statistics.

type StorageID

type StorageID uint8

StorageID identifies a storage location. 0 means inline (data in BadgerDB), 1–255 mean disk-backed storage in the directory mapped to that ID. Using a dedicated type prevents accidental confusion with NamespaceID or arbitrary uint8 values.

const (
	// StorageIDInline is the storage ID for inline data stored in BadgerDB
	StorageIDInline StorageID = 0
	// StorageIDFirstDisk is the storage ID for the first configured disk directory.
	// Additional directories use StorageIDFirstDisk+1, +2, etc.
	StorageIDFirstDisk StorageID = 1
)

type StorageManager

type StorageManager struct {
	// contains filtered or unexported fields
}

func NewStorageManager

func NewStorageManager(db *CacheDB, dirs []string, inlineMax int, egrp *errgroup.Group) (*StorageManager, error)

NewStorageManager creates a new storage manager with UUID-based directory identity. It performs the following steps:

  1. Load existing storageID → (UUID, path) mappings from the database.
  2. Scan the supplied directory paths for UUID files.
  3. Match discovered UUIDs against known mappings, updating paths as needed (so that sysadmins can remount directories at different locations).
  4. Assign new storage IDs to directories that have no UUID yet and drop a new UUID file in each.
  5. Persist updated mappings to the database.

dirs is the ordered set of configured directory paths. inlineMax sets the maximum inline object size (0 = use default InlineThreshold).
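Steps 3 and 4 reconcile what the database remembers with what is found on disk. The following minimal in-memory sketch assumes the UUID-file contents have already been read into a path-to-UUID map (step 2) and leaves persistence (step 5) to the caller. reconcile, mapping, and newUUID are hypothetical names. The design relies on keying by UUID rather than by path, which lets a remounted directory keep its storage ID.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

type storageID uint8

// mapping mirrors a persisted storageID -> (UUID, path) record.
type mapping struct {
	UUID string
	Path string
}

// newUUID returns a random RFC 4122 version-4 UUID string.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// reconcile matches UUIDs found on disk against known mappings (step 3) and
// assigns new IDs and UUIDs to unrecognized directories (step 4).
// discovered maps each configured path to the UUID read from it ("" if none).
func reconcile(known map[storageID]mapping, dirs []string, discovered map[string]string) (map[storageID]mapping, error) {
	out := make(map[storageID]mapping, len(known)+len(dirs))
	byUUID := make(map[string]storageID, len(known))
	next := 1 // ID 0 is reserved for inline storage
	for id, m := range known {
		out[id] = m
		byUUID[m.UUID] = id
		if int(id) >= next {
			next = int(id) + 1
		}
	}
	for _, dir := range dirs {
		u := discovered[dir]
		if id, ok := byUUID[u]; ok && u != "" {
			out[id] = mapping{UUID: u, Path: dir} // same disk, possibly a new mount point
			continue
		}
		if next > 255 {
			return nil, fmt.Errorf("no storage IDs left for %s", dir)
		}
		if u == "" {
			var err error
			if u, err = newUUID(); err != nil {
				return nil, err
			}
			// A real implementation would also write u to a UUID file in dir.
		}
		out[storageID(next)] = mapping{UUID: u, Path: dir}
		byUUID[u] = storageID(next)
		next++
	}
	return out, nil
}

func main() {
	known := map[storageID]mapping{1: {UUID: "known-uuid", Path: "/old/cache1"}}
	found := map[string]string{"/new/cache1": "known-uuid"} // cache2 has no UUID file yet
	got, err := reconcile(known, []string{"/new/cache1", "/mnt/cache2"}, found)
	if err != nil {
		panic(err)
	}
	fmt.Println(got[1].Path, got[2].Path, len(got[2].UUID)) // /new/cache1 /mnt/cache2 36
}
```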

func NewStorageManagerReadOnly

func NewStorageManagerReadOnly(baseDir string, db *CacheDB) (*StorageManager, error)

NewStorageManagerReadOnly creates a storage manager for read-only introspection. This is a lightweight variant suitable for CLI tools that only need to read metadata and block states, not perform downloads or writes.

func (*StorageManager) AllocateChunk

func (sm *StorageManager) AllocateChunk(
	ctx context.Context,
	instanceHash InstanceHash,
	meta *CacheMetadata,
	chunkIndex int,
) (*CacheMetadata, error)

AllocateChunk allocates a storage directory for a chunk and creates the chunk file. This must be called before writing to a chunk that has StorageID = 0 (unallocated).

The storage directory is chosen via the pluggable chooseDir function, which defaults to simple round-robin but is replaced in production with EvictionManager.ChooseDiskStorage (weighted by free space).

Returns the updated CacheMetadata (which should be used for subsequent operations).

func (*StorageManager) Close

func (sm *StorageManager) Close()

Close stops TTL cache eviction goroutines and releases cached resources.

func (*StorageManager) Delete

func (sm *StorageManager) Delete(instanceHash InstanceHash) error

Delete removes an object from storage

func (*StorageManager) DirCount

func (sm *StorageManager) DirCount() int

DirCount returns the number of configured storage directories. This is used to determine if chunking should be enabled.

func (*StorageManager) DirIDs

func (sm *StorageManager) DirIDs() []StorageID

DirIDs returns the list of configured storage directory IDs in sorted order.

func (*StorageManager) EvictByLRU

func (sm *StorageManager) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, uint64, error)

EvictByLRU walks the LRU index for a given storage+namespace and evicts the oldest objects until either maxObjects have been removed or maxBytes of content has been freed — whichever comes first. A value of 0 for either limit means "no limit on that dimension". The method is allowed to go one object over the byte threshold to prevent starvation.

All DB mutations happen atomically; filesystem deletes follow afterward. Returns the evicted objects, total bytes freed, and any error.
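The stopping rule can be expressed independently of the database. In this sketch, selectVictims and object are hypothetical. The byte limit is checked before each pick, so the final pick can push the total past maxBytes by at most one object, which matches the documented allowance.

```go
package main

import "fmt"

// object is a hypothetical LRU index entry.
type object struct {
	Key  string
	Size int64
}

// selectVictims walks lru (oldest first) and stops once maxObjects entries are
// chosen or at least maxBytes are freed, whichever comes first. A zero limit
// disables that dimension. The byte check runs before each pick, so the last
// pick can overshoot maxBytes by one object.
func selectVictims(lru []object, maxObjects int, maxBytes int64) ([]object, int64) {
	var victims []object
	var freed int64
	for _, o := range lru {
		if maxObjects > 0 && len(victims) >= maxObjects {
			break
		}
		if maxBytes > 0 && freed >= maxBytes {
			break
		}
		victims = append(victims, o)
		freed += o.Size
	}
	return victims, freed
}

func main() {
	lru := []object{{"a", 40}, {"b", 40}, {"c", 40}} // oldest first
	v, freed := selectVictims(lru, 0, 50)
	fmt.Println(len(v), freed) // 2 80: the second pick overshoots 50 bytes
}
```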

func (*StorageManager) GetDirs

func (sm *StorageManager) GetDirs() map[StorageID]string

GetDirs returns the configured storage directories (storageID → objects dir).

func (*StorageManager) GetMetadata

func (sm *StorageManager) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)

GetMetadata retrieves metadata for an object

func (*StorageManager) GetMissingBlocks

func (sm *StorageManager) GetMissingBlocks(instanceHash InstanceHash) ([]BlockRange, error)

GetMissingBlocks returns the ranges of blocks that haven't been downloaded yet

func (*StorageManager) GetObjectSize

func (sm *StorageManager) GetObjectSize(instanceHash InstanceHash) (int64, error)

GetObjectSize returns the content length of a cached object

func (*StorageManager) GetSharedBlockState

func (sm *StorageManager) GetSharedBlockState(instanceHash InstanceHash) (*ObjectBlockState, error)

GetSharedBlockState returns the shared, thread-safe block state for the given instanceHash. The state is loaded from the persistent database on first access and cached in a TTL cache that evicts idle entries after blockStateTTL. Every call touches the entry's TTL so actively-used states remain resident. All callers for the same instanceHash receive the same *ObjectBlockState.

func (*StorageManager) HasObject

func (sm *StorageManager) HasObject(instanceHash InstanceHash) (bool, error)

HasObject checks if an object exists in storage

func (*StorageManager) IdentifyCorruptBlocks

func (sm *StorageManager) IdentifyCorruptBlocks(instanceHash InstanceHash, startBlock, endBlock uint32) ([]uint32, error)

IdentifyCorruptBlocks probes each block in [startBlock, endBlock] on disk and returns the block numbers whose AES-GCM authentication tag fails or whose on-disk data is too short. Blocks that are not in the downloaded bitmap are skipped. A missing or unopenable file returns all requested blocks.

func (*StorageManager) InitDiskStorage

func (sm *StorageManager) InitDiskStorage(ctx context.Context, instanceHash InstanceHash, contentLength int64, storageID StorageID, namespaceID NamespaceID) (*CacheMetadata, error)

InitDiskStorage initializes disk storage for a large object in the specified storage directory. The full contentLength is charged to the usage counter at creation time so that the accounting matches the filesystem's pre-allocation (Truncate). Returns the metadata with encryption keys set up.

func (*StorageManager) InitLazyChunkedStorage

func (sm *StorageManager) InitLazyChunkedStorage(
	ctx context.Context,
	instanceHash InstanceHash,
	contentLength int64,
	chunkSizeCode ChunkSizeCode,
) (*CacheMetadata, error)

InitLazyChunkedStorage initializes metadata for a chunked object WITHOUT creating chunk files. Files are created lazily when AllocateChunk is called (typically on first write to each chunk). This supports byte-range downloads where chunks may be written out of order.

All chunk StorageIDs are initialized to 0 (unallocated). Use AllocateChunk to assign a storage directory and create the file for each chunk before writing.

Parameters:

  • instanceHash: unique identifier for this object version
  • contentLength: total object size in bytes
  • chunkSizeCode: chunk size encoding (must be non-zero for chunked storage)
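The lazy pattern can be sketched as follows, assuming the chunk size has already been decoded from chunkSizeCode into bytes (this section does not describe the encoding). chunkedMeta, initLazy, chunkFor, and allocate are hypothetical. Allocation is idempotent: a chunk written out of order gets a directory on its first write and keeps it afterward.

```go
package main

import "fmt"

type storageID uint8

// chunkedMeta stands in for the chunk-related fields of CacheMetadata.
type chunkedMeta struct {
	ChunkSize     int64
	ChunkStorages []storageID // 0 means unallocated
}

// initLazy sizes the chunk table without creating any files.
func initLazy(contentLength, chunkSize int64) *chunkedMeta {
	n := (contentLength + chunkSize - 1) / chunkSize // ceiling division
	return &chunkedMeta{ChunkSize: chunkSize, ChunkStorages: make([]storageID, n)}
}

// chunkFor maps a byte offset to the index of the chunk that holds it.
func (m *chunkedMeta) chunkFor(offset int64) int { return int(offset / m.ChunkSize) }

// allocate assigns a directory on the chunk's first write; later calls keep it.
func (m *chunkedMeta) allocate(idx int, choose func() storageID) storageID {
	if m.ChunkStorages[idx] == 0 {
		m.ChunkStorages[idx] = choose()
		// A real implementation would create the chunk file here.
	}
	return m.ChunkStorages[idx]
}

func main() {
	m := initLazy(10<<20, 4<<20) // 10 MiB object, 4 MiB chunks -> 3 chunks
	idx := m.chunkFor(9 << 20)   // a write near the end arrives first
	id := m.allocate(idx, func() storageID { return 3 })
	fmt.Println(idx, id, m.ChunkStorages) // 2 3 [0 0 3]
}
```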

func (*StorageManager) InlineMaxBytes

func (sm *StorageManager) InlineMaxBytes() int

InlineMaxBytes returns the configured maximum inline object size.

func (*StorageManager) InvalidateSharedBlockState

func (sm *StorageManager) InvalidateSharedBlockState(instanceHash InstanceHash)

InvalidateSharedBlockState removes the cached block state for an instanceHash, forcing the next GetSharedBlockState call to reload from the database. This should be called when an object is deleted or evicted.

func (*StorageManager) IsComplete

func (sm *StorageManager) IsComplete(instanceHash InstanceHash) (bool, error)

IsComplete checks if all blocks have been downloaded

func (*StorageManager) MergeMetadata

func (sm *StorageManager) MergeMetadata(instanceHash InstanceHash, meta *CacheMetadata) error

MergeMetadata performs an atomic read-modify-write of object metadata.

func (*StorageManager) NewBlockWriter

func (sm *StorageManager) NewBlockWriter(instanceHash InstanceHash, startBlock uint32, existingBitmap *roaring.Bitmap, onComplete func()) (*BlockWriter, error)

NewBlockWriter creates a new block writer for streaming encrypted writes to disk. The existingBitmap parameter allows resuming partial downloads by skipping already-downloaded blocks. startBlock specifies which block number the first incoming byte corresponds to (use 0 to start from the beginning).
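To resume, the writer walks the incoming stream block by block and writes only the blocks the bitmap does not already mark as present. To stay within the standard library, the sketch uses a map[uint32]bool in place of *roaring.Bitmap. It also uses a tiny hypothetical block size, and a write callback stands in for encryption and disk I/O. resumeWrite is not part of this package. Skipped blocks are still consumed from the stream, because startBlock fixes which block each incoming byte belongs to.

```go
package main

import "fmt"

const blockSize = 4 // hypothetical, tiny so the example is easy to follow

// resumeWrite splits stream into blocks starting at startBlock and writes only
// blocks absent from have. It returns the block numbers actually written.
func resumeWrite(stream []byte, startBlock uint32, have map[uint32]bool, write func(block uint32, data []byte)) []uint32 {
	var written []uint32
	for off, b := 0, startBlock; off < len(stream); off, b = off+blockSize, b+1 {
		end := off + blockSize
		if end > len(stream) {
			end = len(stream) // final block may be short
		}
		if have[b] {
			continue // already downloaded: skip without rewriting
		}
		write(b, stream[off:end])
		have[b] = true
		written = append(written, b)
	}
	return written
}

func main() {
	store := map[uint32]string{}
	have := map[uint32]bool{3: true} // block 3 survived an earlier, interrupted download
	written := resumeWrite([]byte("AAAABBBBCCCC"), 2, have, func(b uint32, d []byte) {
		store[b] = string(d)
	})
	fmt.Println(written, store[2], store[4]) // [2 4] AAAA CCCC
}
```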

func (*StorageManager) NewObjectReader

func (sm *StorageManager) NewObjectReader(instanceHash InstanceHash) (*ObjectReader, error)

NewObjectReader creates a reader for a cached object

func (*StorageManager) ReadBlocks

func (sm *StorageManager) ReadBlocks(instanceHash InstanceHash, startOffset int64, length int) ([]byte, error)

ReadBlocks reads and decrypts blocks from disk storage. It uses the shared ObjectBlockState to check block availability.

func (*StorageManager) ReadBlocksInto

func (sm *StorageManager) ReadBlocksInto(dst []byte, instanceHash InstanceHash, startOffset int64) (int, error)

ReadBlocksInto reads and decrypts blocks from disk storage directly into the caller-provided dst buffer. It returns the number of bytes written to dst. This avoids the result allocation and copy that ReadBlocks performs. dst must be large enough to hold the requested data.

func (*StorageManager) ReadInline

func (sm *StorageManager) ReadInline(instanceHash InstanceHash) ([]byte, error)

ReadInline reads small data from inline storage

func (*StorageManager) SetMetadata

func (sm *StorageManager) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error

SetMetadata stores metadata for an object. It performs a full replace, so use it only for initial creation.

func (*StorageManager) StoreInline

func (sm *StorageManager) StoreInline(ctx context.Context, instanceHash InstanceHash, meta *CacheMetadata, data []byte) error

StoreInline stores small data inline in BadgerDB

func (*StorageManager) WriteBlocks

func (sm *StorageManager) WriteBlocks(instanceHash InstanceHash, startOffset int64, data []byte) error

WriteBlocks writes encrypted blocks to disk storage. This is the main write path for large objects.

type StorageUsageKey

type StorageUsageKey struct {
	StorageID   StorageID
	NamespaceID NamespaceID
}

StorageUsageKey combines storage ID and namespace ID for usage tracking

type VerificationResult

type VerificationResult struct {
	InstanceHash   string           `json:"instance_hash"`
	Valid          bool             `json:"valid"`
	Error          string           `json:"error,omitempty"`
	ChecksumStatus []ChecksumStatus `json:"checksum_status,omitempty"`
}

VerificationResult contains the result of a checksum verification.
