Documentation
¶
Overview ¶
persistent_cache.go implements PersistentCache, the main entry point for the local-cache subsystem. It coordinates:
- Object resolution and download orchestration (resolveObject, downloadObject, performDownload).
- Cache-Control / ETag-based revalidation (revalidateObject).
- Inline (small-file) vs disk (large-file) storage decisions via decisionWriter.
- No-store passthrough streaming when the origin forbids caching.
- Namespace ID assignment and federation token management.
- Background configuration updates from the director.
Supporting types live in other files: CacheDB (database.go), StorageManager / BlockWriter (storage.go), EvictionManager (eviction.go), ConsistencyChecker (consistency.go), and BlockFetcherV2 (block_fetcher.go).
Index ¶
- Constants
- Variables
- func BlockOffset(blockNum uint32) int64
- func BlocksInChunk(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (startBlock, endBlock uint32)
- func CalculateBlockCount(contentLength int64) uint32
- func CalculateChunkCount(contentLength int64, chunkSizeCode ChunkSizeCode) int
- func CalculateFileSize(contentLength int64) int64
- func CheckCacheObjectExists(ctx context.Context, socketPath, objectPath string) (bool, error)
- func CheckCacheObjectIsCached(ctx context.Context, socketPath, objectPath string) (bool, error)
- func ChunkContainsBlock(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int, ...) bool
- func ChunkContentLength(totalContentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) int64
- func ChunkSizeCodeToBytes(code ChunkSizeCode) uint64
- func ContentOffsetToBlock(contentOffset int64) uint32
- func ContentOffsetToChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int
- func ContentOffsetWithinBlock(contentOffset int64) int
- func DefaultFreshness(lastValidated time.Time) time.Duration
- func DiskMappingKey(storageID StorageID) []byte
- func ETagKey(objectHash ObjectHash) []byte
- func FormatChunkSize(code ChunkSizeCode) string
- func FormatContentRange(start, end, total int64) string
- func GetChunkFileSuffix(chunkIndex int) string
- func GetChunkPath(basePath string, chunkIndex int) string
- func GetChunkRange(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (start, end int64)
- func GetInstanceStoragePath(hash InstanceHash) string
- func InitIssuerKeyForTests(t testing.TB)
- func InlineKey(instanceHash InstanceHash) []byte
- func IsRangeRequest(req *http.Request) bool
- func LRUKey(storageID StorageID, namespaceID NamespaceID, timestamp time.Time, ...) []byte
- func MetaKey(instanceHash InstanceHash) []byte
- func NamespaceKey(prefix string) []byte
- func NormalizePelicanURL(urlStr string) string
- func OffsetInChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int64
- func ParseLRUKey(key []byte) (storageID StorageID, namespaceID NamespaceID, timestamp time.Time, ...)
- func ParseUsageKey(key []byte) (storageID StorageID, namespaceID NamespaceID, err error)
- func PurgeFirstKey(instanceHash InstanceHash) []byte
- func RemainingFreshness(lastValidated time.Time) time.Duration
- func StateKey(instanceHash InstanceHash) []byte
- func UsageKey(storageID StorageID, namespaceID NamespaceID) []byte
- func ValidateChunkLocations(contentLength int64, chunkSizeCode ChunkSizeCode, locations []ChunkLocation) error
- type AuthorizationError
- type Batch
- type BlockEncryptor
- func (be *BlockEncryptor) DecryptBlock(blockNum uint32, encryptedBlock []byte) ([]byte, error)
- func (be *BlockEncryptor) DecryptBlockTo(dst []byte, blockNum uint32, encryptedBlock []byte) ([]byte, error)
- func (be *BlockEncryptor) EncryptBlock(blockNum uint32, data []byte) ([]byte, error)
- func (be *BlockEncryptor) EncryptBlockTo(dst []byte, blockNum uint32, data []byte) ([]byte, error)
- type BlockFetcherV2
- func (bf *BlockFetcherV2) AdoptTransfer(ctx context.Context, tc *client.TransferClient, dw *decisionWriter, ...) *fetchOperation
- func (bf *BlockFetcherV2) Close()
- func (bf *BlockFetcherV2) CreateFetchCallback() func(ctx context.Context, startBlock, endBlock uint32) error
- func (bf *BlockFetcherV2) FetchBlocks(ctx context.Context, startBlock, endBlock uint32) error
- func (bf *BlockFetcherV2) FetchBlocksAsync(ctx context.Context, startBlock, endBlock uint32) (*fetchOperation, error)
- func (bf *BlockFetcherV2) GetChunkChannel(op *fetchOperation, chunkIndex int64) <-chan struct{}
- func (bf *BlockFetcherV2) StartPrefetch(ctx context.Context)
- func (bf *BlockFetcherV2) WaitForChunkWithETA(ctx context.Context, op *fetchOperation, chunkIndex int64) (completed bool, err error)
- type BlockFetcherV2Config
- type BlockRange
- type BlockSummary
- type BlockWriter
- type CacheDB
- func (cdb *CacheDB) AddUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error
- func (cdb *CacheDB) ChargeUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error
- func (cdb *CacheDB) ClearBlocks(instanceHash InstanceHash, blocks []uint32) error
- func (cdb *CacheDB) Close() error
- func (cdb *CacheDB) ComputeActualUsage() (map[StorageUsageKey]int64, error)
- func (cdb *CacheDB) ComputeInlineDataSize() (int64, error)
- func (cdb *CacheDB) DeleteBlockState(instanceHash InstanceHash) error
- func (cdb *CacheDB) DeleteLatestETag(objectHash ObjectHash) error
- func (cdb *CacheDB) DeleteMetadata(instanceHash InstanceHash) error
- func (cdb *CacheDB) DeleteObject(instanceHash InstanceHash) error
- func (cdb *CacheDB) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, error)
- func (cdb *CacheDB) FindRecyclableStorageID(mountedDirs map[StorageID]string) (StorageID, error)
- func (cdb *CacheDB) GetAllUsage() (map[StorageUsageKey]int64, error)
- func (cdb *CacheDB) GetBlockState(instanceHash InstanceHash) (*roaring.Bitmap, error)
- func (cdb *CacheDB) GetDirUsage(storageID StorageID) (map[NamespaceID]int64, error)
- func (cdb *CacheDB) GetDownloadedBlockCount(instanceHash InstanceHash) (uint64, error)
- func (cdb *CacheDB) GetEncryptionManager() *EncryptionManager
- func (cdb *CacheDB) GetInlineData(instanceHash InstanceHash) ([]byte, error)
- func (cdb *CacheDB) GetLatestETag(objectHash ObjectHash) (string, bool, error)
- func (cdb *CacheDB) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)
- func (cdb *CacheDB) GetUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)
- func (cdb *CacheDB) HasMetadata(instanceHash InstanceHash) (bool, error)
- func (cdb *CacheDB) InstanceHash(etag string, objectHash ObjectHash) InstanceHash
- func (cdb *CacheDB) IsBlockDownloaded(instanceHash InstanceHash, blockNum uint32) (bool, error)
- func (cdb *CacheDB) IsPurgeFirst(instanceHash InstanceHash) (bool, error)
- func (cdb *CacheDB) LoadDiskMappings() ([]DiskMapping, error)
- func (cdb *CacheDB) LoadNamespaceMappings() (map[string]NamespaceID, NamespaceID, error)
- func (cdb *CacheDB) MarkBlocksDownloaded(instanceHash InstanceHash, startBlock, endBlock uint32, storageID StorageID, ...) error
- func (cdb *CacheDB) MarkPurgeFirst(instanceHash InstanceHash) error
- func (cdb *CacheDB) MergeBlockStateWithUsage(instanceHash InstanceHash, newBlocks *roaring.Bitmap, storageID StorageID, ...) error
- func (cdb *CacheDB) MergeMetadata(instanceHash InstanceHash, incoming *CacheMetadata) error
- func (cdb *CacheDB) NewBatch() *Batch
- func (cdb *CacheDB) ObjectHash(pelicanURL string) ObjectHash
- func (cdb *CacheDB) PurgeStorageID(storageID StorageID) error
- func (cdb *CacheDB) Salt() []byte
- func (cdb *CacheDB) SaveDiskMapping(dm DiskMapping) error
- func (cdb *CacheDB) ScanMetadata(fn func(instanceHash InstanceHash, meta *CacheMetadata) error) error
- func (cdb *CacheDB) ScanMetadataFrom(startKey InstanceHash, ...) error
- func (cdb *CacheDB) SetBlockState(instanceHash InstanceHash, bitmap *roaring.Bitmap) error
- func (cdb *CacheDB) SetInlineData(instanceHash InstanceHash, encryptedData []byte) error
- func (cdb *CacheDB) SetLatestETag(objectHash ObjectHash, etag string, observedAt time.Time) error
- func (cdb *CacheDB) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
- func (cdb *CacheDB) SetNamespaceMapping(prefix string, id NamespaceID) error
- func (cdb *CacheDB) SetUsage(storageID StorageID, namespaceID NamespaceID, value int64) error
- func (cdb *CacheDB) StartGC(ctx context.Context, egrp *errgroup.Group)
- func (cdb *CacheDB) UnmarkPurgeFirst(instanceHash InstanceHash) error
- func (cdb *CacheDB) UpdateLRU(instanceHash InstanceHash, debounceTime time.Duration) error
- type CacheDirectives
- func (cd *CacheDirectives) Flags() uint8
- func (cd *CacheDirectives) Freshness() (time.Duration, bool)
- func (cd *CacheDirectives) HasDirectives() bool
- func (cd *CacheDirectives) HasFreshness() bool
- func (cd *CacheDirectives) IsStale(lastValidated time.Time) bool
- func (cd *CacheDirectives) IsStaleWithDefaults(lastValidated time.Time) bool
- func (cd *CacheDirectives) MaxAge() time.Duration
- func (cd *CacheDirectives) MaxAgeSet() bool
- func (cd *CacheDirectives) MustRevalidate() bool
- func (cd *CacheDirectives) NoCache() bool
- func (cd *CacheDirectives) NoStore() bool
- func (cd *CacheDirectives) Private() bool
- func (cd *CacheDirectives) ShouldStore() bool
- type CacheMetadata
- func (m *CacheMetadata) AllStorageIDs() []StorageID
- func (m *CacheMetadata) ChunkCount() int
- func (m *CacheMetadata) ComputeExpires() time.Time
- func (m *CacheMetadata) EnsureExpires()
- func (m *CacheMetadata) GetCacheControlHeader() string
- func (m *CacheMetadata) GetCacheDirectives() CacheDirectives
- func (m *CacheMetadata) GetChunkInfo() []ChunkInfo
- func (m *CacheMetadata) GetChunkStorageID(chunkIndex int) StorageID
- func (m *CacheMetadata) IsChunkAllocated(chunkIndex int) bool
- func (m *CacheMetadata) IsChunked() bool
- func (m *CacheMetadata) IsDisk() bool
- func (m *CacheMetadata) IsInline() bool
- func (m *CacheMetadata) PerDirectoryBytes() map[StorageID]int64
- func (m *CacheMetadata) ResponseCacheControl() string
- func (m *CacheMetadata) SetCacheControl(header string)
- func (m *CacheMetadata) SetChunkStorageID(chunkIndex int, storageID StorageID)
- type CacheMode
- type CacheStats
- type Checksum
- type ChecksumInfo
- type ChecksumStatus
- type ChecksumType
- type ChunkInfo
- type ChunkInfoSummary
- type ChunkLocation
- type ChunkNotification
- type ChunkSizeCode
- type ConsistencyCheckResult
- type ConsistencyChecker
- func (cc *ConsistencyChecker) GetStats() ConsistencyStats
- func (cc *ConsistencyChecker) RunDataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error
- func (cc *ConsistencyChecker) RunMetadataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error
- func (cc *ConsistencyChecker) Start(ctx context.Context, egrp *errgroup.Group)
- func (cc *ConsistencyChecker) Stop()
- func (cc *ConsistencyChecker) VerifyBlockIntegrity(instanceHash InstanceHash) ([]uint32, error)
- func (cc *ConsistencyChecker) VerifyObject(instanceHash InstanceHash) (bool, error)
- type ConsistencyConfig
- type ConsistencyStats
- type DirDiskStat
- type DirEvictionStats
- type DiskMapping
- type DiskUsageResult
- type EncryptionManager
- func (em *EncryptionManager) DecryptDataKey(encryptedDEK []byte) ([]byte, error)
- func (em *EncryptionManager) DecryptInline(encryptedData, dek, nonce []byte) ([]byte, error)
- func (em *EncryptionManager) DeriveDBKey() ([]byte, error)
- func (em *EncryptionManager) EncryptDataKey(dek []byte) ([]byte, error)
- func (em *EncryptionManager) EncryptInline(data, dek, nonce []byte) ([]byte, error)
- func (em *EncryptionManager) GenerateDataKey() ([]byte, error)
- func (em *EncryptionManager) UpdateMasterKeyEncryption() error
- type EvictionConfig
- type EvictionDirConfig
- type EvictionManager
- func (em *EvictionManager) ChooseDiskStorage() StorageID
- func (em *EvictionManager) ForcePurge() error
- func (em *EvictionManager) ForcePurgeToBytes(targetBytes uint64) (uint64, int64, error)
- func (em *EvictionManager) GetAllNamespaceUsage() (map[StorageUsageKey]int64, error)
- func (em *EvictionManager) GetNamespaceUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)
- func (em *EvictionManager) GetStats() EvictionStats
- func (em *EvictionManager) GetTotalUsage() uint64
- func (em *EvictionManager) HasSpace(needed uint64) bool
- func (em *EvictionManager) MarkPurgeFirst(instanceHash InstanceHash) error
- func (em *EvictionManager) NoteUsageIncrease(storageID StorageID, bytes int64)
- func (em *EvictionManager) RecordAccess(instanceHash InstanceHash) error
- func (em *EvictionManager) Start(ctx context.Context, egrp *errgroup.Group)
- func (em *EvictionManager) TriggerEviction()
- type EvictionStats
- type HeadResult
- type InstanceHash
- type IntrospectAPIOpen
- func (api *IntrospectAPIOpen) Close() error
- func (api *IntrospectAPIOpen) GetCacheStats() (*CacheStats, error)
- func (api *IntrospectAPIOpen) GetDiskUsage() (*DiskUsageResult, error)
- func (api *IntrospectAPIOpen) GetObjectDetails(instanceHash string) (*ObjectDetails, error)
- func (api *IntrospectAPIOpen) GetObjectDetailsByURL(objectURL, etag string) (*ObjectDetails, error)
- func (api *IntrospectAPIOpen) ListAllObjects(limit int, pattern string) ([]ObjectInstance, error)
- func (api *IntrospectAPIOpen) ListObjectInstances(objectURL string) ([]ObjectInstance, error)
- func (api *IntrospectAPIOpen) RunConsistencyCheck(ctx context.Context, metadataScan, dataScan bool) (*ConsistencyCheckResult, error)
- func (api *IntrospectAPIOpen) VerifyChecksum(instanceHash string) (*VerificationResult, error)
- func (api *IntrospectAPIOpen) VerifyChecksumByURL(objectURL, etag string) (*VerificationResult, error)
- type LocalCache
- func (lc *LocalCache) Config(egrp *errgroup.Group) (err error)
- func (sc *LocalCache) Get(ctx context.Context, path, token string) (io.ReadCloser, error)
- func (lc *LocalCache) MarkObjectPurgeFirst(objectPath string) (int, error)
- func (lc *LocalCache) ReconstructCache() error
- func (lc *LocalCache) Stat(path, token string) (uint64, error)
- type LocalCacheOption
- type MasterKeyFile
- type NamespaceID
- type ObjectBlockState
- func (obs *ObjectBlockState) Add(block uint32)
- func (obs *ObjectBlockState) AddRange(start, end uint32)
- func (obs *ObjectBlockState) ClearDownloading()
- func (obs *ObjectBlockState) Clone() *roaring.Bitmap
- func (obs *ObjectBlockState) Contains(block uint32) bool
- func (obs *ObjectBlockState) ContainsRange(start, end uint32) bool
- func (obs *ObjectBlockState) GetCardinality() uint64
- func (obs *ObjectBlockState) LockRepair()
- func (obs *ObjectBlockState) MissingInRange(start, end uint32) []uint32
- func (obs *ObjectBlockState) Remove(block uint32)
- func (obs *ObjectBlockState) RemoveMany(blocks []uint32)
- func (obs *ObjectBlockState) SetDownloading()
- func (obs *ObjectBlockState) UnlockRepair()
- func (obs *ObjectBlockState) WaitForBlock(ctx context.Context, block uint32) bool
- type ObjectDetails
- type ObjectHash
- type ObjectInstance
- type ObjectReader
- func (r *ObjectReader) Close() error
- func (r *ObjectReader) ContentType() string
- func (r *ObjectReader) ETag() string
- func (r *ObjectReader) LastModified() time.Time
- func (r *ObjectReader) Read(p []byte) (n int, err error)
- func (r *ObjectReader) ReadAt(p []byte, off int64) (n int, err error)
- func (r *ObjectReader) Seek(offset int64, whence int) (int64, error)
- func (r *ObjectReader) Size() int64
- func (r *ObjectReader) WriteTo(w io.Writer) (int64, error)
- type PersistentCache
- func (pc *PersistentCache) BackdateObject(objectPath string, age time.Duration) error
- func (pc *PersistentCache) Close() error
- func (pc *PersistentCache) Config(egrp *errgroup.Group) error
- func (pc *PersistentCache) EvictObject(objectPath, token string) error
- func (pc *PersistentCache) EvictPrefix(prefix, token string, immediate bool) (int, error)
- func (pc *PersistentCache) Get(ctx context.Context, objectPath, token string) (io.ReadCloser, error)
- func (pc *PersistentCache) GetCacheStats() (*CacheStats, error)
- func (pc *PersistentCache) GetMetadata(objectPath, token string) (*CacheMetadata, error)
- func (pc *PersistentCache) GetRange(ctx context.Context, objectPath, token, rangeHeader string) (io.ReadCloser, error)
- func (pc *PersistentCache) GetSeekableReader(ctx context.Context, objectPath, bearerToken string, rangeOnly bool) (*SeekableReader, *CacheMetadata, error)
- func (pc *PersistentCache) GetStats() PersistentCacheStats
- func (pc *PersistentCache) HeadObject(objectPath, token string) (*HeadResult, error)
- func (pc *PersistentCache) IsFullyCached(ctx context.Context, objectPath, token string) bool
- func (pc *PersistentCache) KeyChangeCallback() func(ctx context.Context) error
- func (pc *PersistentCache) LaunchListener(ctx context.Context, egrp *errgroup.Group) (err error)
- func (pc *PersistentCache) MarkPurgeFirst(objectPath string) error
- func (pc *PersistentCache) Purge() error
- func (pc *PersistentCache) PurgeToBytes(targetBytes uint64) (uint64, int64, error)
- func (pc *PersistentCache) Register(ctx context.Context, router *gin.RouterGroup)
- func (pc *PersistentCache) RegisterCacheHandlers(engine *gin.Engine, directorEnabled bool) error
- func (pc *PersistentCache) SetFedToken(tok string)
- func (pc *PersistentCache) Stat(objectPath, token string) (uint64, error)
- func (pc *PersistentCache) StatCachedOnly(objectPath, token string) (uint64, error)
- type PersistentCacheConfig
- type PersistentCacheStats
- type PrestageManager
- type RangeReader
- func (rr *RangeReader) Close() error
- func (rr *RangeReader) ContentLength() int64
- func (rr *RangeReader) ContentRange() string
- func (rr *RangeReader) Read(p []byte) (n int, err error)
- func (rr *RangeReader) ReadContext(ctx context.Context, p []byte) (n int, err error)
- func (rr *RangeReader) Seek(offset int64, whence int) (int64, error)
- type RangeRequest
- type ScanProgressEvent
- type SeekableReader
- type StorageDirConfig
- type StorageDirInfo
- type StorageDirStats
- type StorageID
- type StorageManager
- func (sm *StorageManager) AllocateChunk(ctx context.Context, instanceHash InstanceHash, meta *CacheMetadata, ...) (*CacheMetadata, error)
- func (sm *StorageManager) Close()
- func (sm *StorageManager) Delete(instanceHash InstanceHash) error
- func (sm *StorageManager) DirCount() int
- func (sm *StorageManager) DirIDs() []StorageID
- func (sm *StorageManager) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, uint64, error)
- func (sm *StorageManager) GetDirs() map[StorageID]string
- func (sm *StorageManager) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)
- func (sm *StorageManager) GetMissingBlocks(instanceHash InstanceHash) ([]BlockRange, error)
- func (sm *StorageManager) GetObjectSize(instanceHash InstanceHash) (int64, error)
- func (sm *StorageManager) GetSharedBlockState(instanceHash InstanceHash) (*ObjectBlockState, error)
- func (sm *StorageManager) HasObject(instanceHash InstanceHash) (bool, error)
- func (sm *StorageManager) IdentifyCorruptBlocks(instanceHash InstanceHash, startBlock, endBlock uint32) ([]uint32, error)
- func (sm *StorageManager) InitDiskStorage(ctx context.Context, instanceHash InstanceHash, contentLength int64, ...) (*CacheMetadata, error)
- func (sm *StorageManager) InitLazyChunkedStorage(ctx context.Context, instanceHash InstanceHash, contentLength int64, ...) (*CacheMetadata, error)
- func (sm *StorageManager) InlineMaxBytes() int
- func (sm *StorageManager) InvalidateSharedBlockState(instanceHash InstanceHash)
- func (sm *StorageManager) IsComplete(instanceHash InstanceHash) (bool, error)
- func (sm *StorageManager) MergeMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
- func (sm *StorageManager) NewBlockWriter(instanceHash InstanceHash, startBlock uint32, existingBitmap *roaring.Bitmap, ...) (*BlockWriter, error)
- func (sm *StorageManager) NewObjectReader(instanceHash InstanceHash) (*ObjectReader, error)
- func (sm *StorageManager) ReadBlocks(instanceHash InstanceHash, startOffset int64, length int) ([]byte, error)
- func (sm *StorageManager) ReadBlocksInto(dst []byte, instanceHash InstanceHash, startOffset int64) (int, error)
- func (sm *StorageManager) ReadInline(instanceHash InstanceHash) ([]byte, error)
- func (sm *StorageManager) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
- func (sm *StorageManager) StoreInline(ctx context.Context, instanceHash InstanceHash, meta *CacheMetadata, ...) error
- func (sm *StorageManager) WriteBlocks(instanceHash InstanceHash, startOffset int64, data []byte) error
- type StorageUsageKey
- type VerificationResult
Constants ¶
const ( // ChunkSize is the notification granularity - notify waiters every 128KB ChunkSize = 128 * 1024 // DefaultPrefetchTimeout is the default time after which a prefetch with no active // clients will be cancelled DefaultPrefetchTimeout = 20 * time.Second // PrefetchSemaphoreReleaseInterval is how often to release and reacquire the prefetch semaphore PrefetchSemaphoreReleaseInterval = 5 * time.Second // ETAUpdateInterval is how often to update the ETA estimate ETAUpdateInterval = 250 * time.Millisecond // DefaultInitialRate is the initial assumed download rate (1 MB/s) DefaultInitialRate = 1024 * 1024 // ETAStaleThreshold is how long an ETA can be late before waiters give up ETAStaleThreshold = 5 * time.Second )
const ( // PrefixMeta stores CacheMetadata (headers, validation info, storage mode) PrefixMeta = "m:" // PrefixState stores Roaring Bitmap tracking downloaded blocks PrefixState = "s:" // PrefixInline stores encrypted inline data for small objects (< 4KB) PrefixInline = "d:" // PrefixLRU stores sorted index for eviction candidates: l:<storage_id>:<namespace_id>:<ts>:<instance_hash> PrefixLRU = "l:" // PrefixUsage stores total bytes used per storage+namespace: u:<storage_id>:<namespace_id> PrefixUsage = "u:" // PrefixDiskMap stores the mapping of disk IDs to directories PrefixDiskMap = "di:" // PrefixPurgeFirst stores instance hashes marked for priority eviction PrefixPurgeFirst = "pf:" // PrefixETag stores the latest ETag for an object: e:<object_hash> -> etag PrefixETag = "e:" // PrefixNamespace stores namespace prefix -> ID mappings: n:<prefix> -> uint32 PrefixNamespace = "n:" // KeySalt is the single DB key that stores the random salt used when // hashing object/instance names. The salt prevents an attacker with // DB access from correlating hashes with known URLs. KeySalt = "_salt" )
Key prefixes for BadgerDB
const ( // BlockDataSize is the size of data in each encrypted block (before encryption) BlockDataSize = 4080 // AuthTagSize is the size of the AES-GCM authentication tag AuthTagSize = 16 // BlockTotalSize is the total size of an encrypted block on disk BlockTotalSize = BlockDataSize + AuthTagSize // InlineThreshold is the max size for inline storage (< 4KB stored in DB) InlineThreshold = 4096 // NonceSize is the standard size for AES-GCM nonce NonceSize = 12 // KeySize is the size for AES-256 key KeySize = 32 )
Block size constants for encryption and storage
const ( CCFlagNoStore = ccNoStore CCFlagNoCache = ccNoCache CCFlagPrivate = ccPrivate CCFlagMustRevalidate = ccMustRevalidate )
Cache-Control flag bits — canonical definitions are in cache_control.go (ccNoStore, ccNoCache, ccPrivate, ccMustRevalidate). These aliases are kept for any callers that reference the CCFlag* names directly.
const DefaultReadBatchBlocks = 16
DefaultReadBatchBlocks is the default number of blocks to read from disk in a single ReadAt syscall before decrypting block-by-block. 16 blocks × 4096 bytes (BlockTotalSize, i.e. 4080 data bytes + 16-byte auth tag) = 64 KiB.
const NoCacheGracePeriod = 5 * time.Second
NoCacheGracePeriod is the minimum interval between revalidation attempts when the origin sends Cache-Control: no-cache. Strictly speaking, no-cache means "must revalidate before each use," but revalidation in Pelican requires a full GET (no conditional-request support yet), which is expensive. A short grace period prevents a thundering herd of redundant downloads when many clients hit the same object concurrently.
const OptimalReadSize = DefaultReadBatchBlocks * BlockDataSize
OptimalReadSize is the ideal number of plaintext bytes to request per read call. It equals DefaultReadBatchBlocks × BlockDataSize (16 × 4080 = 65 280 B). When callers align their buffer sizes to a multiple of BlockDataSize (4080), every block decrypts directly into the destination buffer (zero-copy). Misaligned sizes cause the last partial block in each batch to be decrypted into a temporary buffer and copied, adding overhead.
const ( // SaltSize is the number of random bytes prepended to object/instance // names before hashing. 32 bytes (256 bits) provides a comfortable // security margin. SaltSize = 32 )
Variables ¶
var ErrInitNoStore = errors.New("object must not be stored (no-store/no-cache/private)")
ErrInitNoStore is a sentinel error returned by initObjectFromStat when the origin's Cache-Control indicates the response must not be stored. The caller should fall back to a full streaming download via downloadObject.
var ErrNoStore = errors.New("origin response has Cache-Control: no-store")
ErrNoStore is returned when the origin sends Cache-Control: no-store (or private). The response data is streamed via persistentDownload.noStoreReader (an io.Pipe) directly to the first caller without persisting to the cache.
var ErrNoStoreRetry = errors.New("no-store download in progress; retry independently")
ErrNoStoreRetry is returned to waiters that attach to an in-flight no-store download. Because the stream can only be consumed once (by the first caller), subsequent callers must retry independently.
var ErrNotCached = errors.New("object not cached")
ErrNotCached is returned when an object is not in the cache
Functions ¶
func BlockOffset ¶
BlockOffset returns the byte offset in the file for a given block number
func BlocksInChunk ¶
func BlocksInChunk(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (startBlock, endBlock uint32)
BlocksInChunk returns the range of blocks (startBlock, endBlock inclusive) that belong to a specific chunk.
func CalculateBlockCount ¶
CalculateBlockCount returns the number of blocks needed for a given content length
func CalculateChunkCount ¶
func CalculateChunkCount(contentLength int64, chunkSizeCode ChunkSizeCode) int
CalculateChunkCount returns the number of chunks needed to store an object of the given size with the specified chunk size code. Returns 1 if chunking is disabled (the entire object is one "chunk").
func CalculateFileSize ¶
CalculateFileSize returns the exact on-disk file size for a given content length. The last block is only as large as the remaining data plus its AES-GCM authentication tag — it is not padded to a full BlockTotalSize.
func CheckCacheObjectExists ¶
CheckCacheObjectExists checks if an object exists in the PersistentCache by making a HEAD request to the cache's unix socket. Returns true if the object exists. NOTE: This will trigger a download if the object is not cached, so it always returns true unless the origin is unreachable. For testing eviction, use CheckCacheObjectIsCached instead.
func CheckCacheObjectIsCached ¶
CheckCacheObjectIsCached checks if an object is actually in the cache (not just accessible). This sends a special header to prevent downloading if the object is not cached. Returns true only if the object is already cached, false if it would need to be downloaded.
func ChunkContainsBlock ¶
func ChunkContainsBlock(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int, blockNum uint32) bool
ChunkContainsBlock returns true if the specified block belongs to the given chunk.
func ChunkContentLength ¶
func ChunkContentLength(totalContentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) int64
ChunkContentLength returns the content length for a specific chunk. This is the size of data within the chunk (not the encrypted file size).
func ChunkSizeCodeToBytes ¶
func ChunkSizeCodeToBytes(code ChunkSizeCode) uint64
ChunkSizeCodeToBytes converts a ChunkSizeCode to the actual chunk size in bytes. Returns 0 if chunking is disabled. The returned size is always rounded down to a multiple of BlockDataSize to ensure blocks don't span chunk boundaries.
func ContentOffsetToBlock ¶
ContentOffsetToBlock converts a content byte offset to a block number
func ContentOffsetToChunk ¶
func ContentOffsetToChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int
ContentOffsetToChunk returns the chunk index for a given content byte offset.
func ContentOffsetWithinBlock ¶
ContentOffsetWithinBlock returns the offset within a block for a content offset
func DefaultFreshness ¶
DefaultFreshness returns the jittered freshness lifetime used when the origin doesn't provide explicit Cache-Control headers. The jitter is deterministic per object (seeded from lastValidated) so that repeated calls for the same object return the same duration.
func DiskMappingKey ¶
DiskMappingKey returns the BadgerDB key for a disk mapping entry.
func ETagKey ¶
func ETagKey(objectHash ObjectHash) []byte
ETagKey returns the BadgerDB key for ETag lookup. It maps objectHash -> latest ETag for that object.
func FormatChunkSize ¶
func FormatChunkSize(code ChunkSizeCode) string
FormatChunkSize returns a human-readable string for a ChunkSizeCode.
func FormatContentRange ¶
FormatContentRange formats a Content-Range header value
func GetChunkFileSuffix ¶
GetChunkFileSuffix returns the file suffix for a chunk. Chunk 0 has no suffix (the base file), chunk 1 is "-2", chunk 2 is "-3", etc.
func GetChunkPath ¶
GetChunkPath returns the full path for a chunk file. basePath is the path for chunk 0 (no suffix).
func GetChunkRange ¶
func GetChunkRange(contentLength int64, chunkSizeCode ChunkSizeCode, chunkIndex int) (start, end int64)
GetChunkRange returns the start and end byte offsets (inclusive) for a specific chunk. chunkIndex is 0-based.
func GetInstanceStoragePath ¶
func GetInstanceStoragePath(hash InstanceHash) string
GetInstanceStoragePath returns the 2-level directory path for storing a file. Given hash "42561abfe18be...", it returns "42/56/1abfe18be...".
func InitIssuerKeyForTests ¶
InitIssuerKeyForTests initializes issuer keys for testing. This must be called before creating a CacheDB or EncryptionManager. It generates a new issuer key in a temporary directory and registers a cleanup.
func InlineKey ¶
func InlineKey(instanceHash InstanceHash) []byte
InlineKey returns the BadgerDB key for inline data
func IsRangeRequest ¶
IsRangeRequest checks if a request contains a Range header
func LRUKey ¶
func LRUKey(storageID StorageID, namespaceID NamespaceID, timestamp time.Time, instanceHash InstanceHash) []byte
LRUKey returns the BadgerDB key for LRU tracking. Format: l:<storage_id>:<namespace_id>:<timestamp_ns>:<instance_hash>
func MetaKey ¶
func MetaKey(instanceHash InstanceHash) []byte
MetaKey returns the BadgerDB key for metadata
func NamespaceKey ¶
NamespaceKey returns the BadgerDB key for a namespace prefix mapping
func NormalizePelicanURL ¶
NormalizePelicanURL normalizes a Pelican URL for consistent hashing. Handles pelican://, osdf://, and bare paths.
func OffsetInChunk ¶
func OffsetInChunk(contentOffset int64, chunkSizeCode ChunkSizeCode) int64
OffsetInChunk converts an absolute content offset to an offset within a chunk. Returns the offset relative to the start of the chunk.
func ParseLRUKey ¶
func ParseLRUKey(key []byte) (storageID StorageID, namespaceID NamespaceID, timestamp time.Time, instanceHash InstanceHash, err error)
ParseLRUKey parses an LRU key and returns storageID, namespaceID, timestamp, and instanceHash
func ParseUsageKey ¶
func ParseUsageKey(key []byte) (storageID StorageID, namespaceID NamespaceID, err error)
ParseUsageKey extracts the storage ID and namespace ID from a usage key
func PurgeFirstKey ¶
func PurgeFirstKey(instanceHash InstanceHash) []byte
PurgeFirstKey returns the BadgerDB key for purge first tracking
func RemainingFreshness ¶
RemainingFreshness returns how much freshness lifetime is left for an object whose origin did not specify Cache-Control, clamped to zero.
func StateKey ¶
func StateKey(instanceHash InstanceHash) []byte
StateKey returns the BadgerDB key for block state bitmap
func UsageKey ¶
func UsageKey(storageID StorageID, namespaceID NamespaceID) []byte
UsageKey returns the BadgerDB key for the per-storage namespace usage counter. Format: u:<storage_id>:<namespace_id>
func ValidateChunkLocations ¶
func ValidateChunkLocations(contentLength int64, chunkSizeCode ChunkSizeCode, locations []ChunkLocation) error
ValidateChunkLocations checks that ChunkLocations has the correct number of entries. Returns nil if valid, error otherwise.
Types ¶
type AuthorizationError ¶
type AuthorizationError struct {
Reason string
}
AuthorizationError wraps authorizationDenied with a human-readable reason. errors.Is(err, authorizationDenied) still works for existing checks.
func (*AuthorizationError) Error ¶
func (e *AuthorizationError) Error() string
func (*AuthorizationError) Is ¶
func (e *AuthorizationError) Is(target error) bool
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch allows batching multiple writes for efficiency
type BlockEncryptor ¶
type BlockEncryptor struct {
// contains filtered or unexported fields
}
BlockEncryptor handles encryption/decryption of individual blocks
func NewBlockEncryptor ¶
func NewBlockEncryptor(dek, baseNonce []byte) (*BlockEncryptor, error)
NewBlockEncryptor creates a new block encryptor with the given DEK and base nonce
func (*BlockEncryptor) DecryptBlock ¶
func (be *BlockEncryptor) DecryptBlock(blockNum uint32, encryptedBlock []byte) ([]byte, error)
DecryptBlock decrypts a block and verifies its authentication tag. The input is BlockTotalSize bytes (ciphertext + auth tag). It returns the decrypted data (up to BlockDataSize bytes).
func (*BlockEncryptor) DecryptBlockTo ¶
func (be *BlockEncryptor) DecryptBlockTo(dst []byte, blockNum uint32, encryptedBlock []byte) ([]byte, error)
DecryptBlockTo decrypts a block directly into the provided destination buffer. dst must have sufficient capacity (at least len(encryptedBlock) - AuthTagSize). The decrypted data is appended to dst and the resulting slice is returned. This avoids allocation when dst has enough capacity.
func (*BlockEncryptor) EncryptBlock ¶
func (be *BlockEncryptor) EncryptBlock(blockNum uint32, data []byte) ([]byte, error)
EncryptBlock encrypts a block of data and returns the ciphertext + auth tag. The input data must be exactly BlockDataSize bytes (or less for the last block). The output is BlockTotalSize bytes (data + 16 byte auth tag).
func (*BlockEncryptor) EncryptBlockTo ¶
EncryptBlockTo encrypts a block of data directly into the provided destination buffer. dst must have sufficient capacity (at least len(data) + AuthTagSize). The encrypted data is appended to dst and the resulting slice is returned. This avoids allocation when dst has enough capacity.
type BlockFetcherV2 ¶
type BlockFetcherV2 struct {
// contains filtered or unexported fields
}
BlockFetcherV2 handles fetching missing blocks from the origin using the Pelican transfer client. It supports:
- Per-128KB notification channels for partial completion
- Client registration for data requests
- Prefetch with semaphore management
- Configurable timeout for prefetch cancellation when no active clients
Each BlockFetcherV2 creates its own TransferClient from the shared TransferEngine. This prevents concurrent doFetch goroutines (e.g. during auto-repair) from stealing results off a shared Results() channel.
func NewBlockFetcherV2 ¶
func NewBlockFetcherV2( storage *StorageManager, instanceHash InstanceHash, originURL, token string, fedToken client.TokenProvider, te *client.TransferEngine, cfg BlockFetcherV2Config, ) (*BlockFetcherV2, error)
NewBlockFetcherV2 creates a new block fetcher using the Pelican transfer client. It creates its own TransferClient from the given TransferEngine to avoid sharing Results() channels with other callers.
func (*BlockFetcherV2) AdoptTransfer ¶
func (bf *BlockFetcherV2) AdoptTransfer( ctx context.Context, tc *client.TransferClient, dw *decisionWriter, resultChan <-chan *client.TransferResults, egrp *errgroup.Group, wg *sync.WaitGroup, onExit func(err error), ) *fetchOperation
AdoptTransfer takes ownership of an already-in-flight full-object transfer initiated by performDownload and drives it to completion using the fetcher's existing idle-timeout and chunk-notification machinery.
Instead of creating a new BlockWriter, AdoptTransfer wraps the decisionWriter's existing BlockWriter with the fetcher's blockWriter adapter via dw.HandoffBlockWriter. All subsequent data written by the transfer engine flows through the adapter, gaining chunk notification and ETA tracking. Blocks already written before the handoff are harmlessly skipped (the shared bitmap check in writeCurrentBlock handles this).
Parameters:
- ctx: context for the transfer (cancelled on idle or cache close)
- tc: the TransferClient that owns the transfer (closed on exit)
- dw: the decisionWriter whose BlockWriter will be wrapped
- resultChan: pre-filtered channel delivering the single matching result
- egrp: errgroup for goroutine lifecycle management (test cleanup)
- wg: waitgroup to track goroutine completion (decremented on exit)
- onExit: called when the adopted transfer exits for any reason (clear downloading flag, close completionDone, etc.)
The method starts a goroutine (managed via egrp) and returns the fetchOperation for chunk notification and ETA queries.
func (*BlockFetcherV2) Close ¶
func (bf *BlockFetcherV2) Close()
Close shuts down the fetcher's dedicated TransferClient. Must be called when the fetcher is no longer needed.
func (*BlockFetcherV2) CreateFetchCallback ¶
func (bf *BlockFetcherV2) CreateFetchCallback() func(ctx context.Context, startBlock, endBlock uint32) error
CreateFetchCallback returns a callback function for the RangeReader
func (*BlockFetcherV2) FetchBlocks ¶
func (bf *BlockFetcherV2) FetchBlocks(ctx context.Context, startBlock, endBlock uint32) error
FetchBlocks fetches the specified range of blocks from the origin. Blocks until all requested blocks are available or an error occurs. Implicitly marks this fetcher as having active client interest.
func (*BlockFetcherV2) FetchBlocksAsync ¶
func (bf *BlockFetcherV2) FetchBlocksAsync(ctx context.Context, startBlock, endBlock uint32) (*fetchOperation, error)
FetchBlocksAsync fetches blocks asynchronously and returns a fetchOperation whose doneCh will be closed when the fetch completes (either successfully or with error). The returned error is non-nil only if the fetch couldn't be started. Check the fetchOperation for the final error after doneCh is closed. Implicitly marks this fetcher as having active client interest.
func (*BlockFetcherV2) GetChunkChannel ¶
func (bf *BlockFetcherV2) GetChunkChannel(op *fetchOperation, chunkIndex int64) <-chan struct{}
GetChunkChannel returns a channel that will be closed when the specified chunk is complete. This is the safe notification pattern - callers wait for close, no values are sent. Returns nil if the chunk is already complete.
func (*BlockFetcherV2) StartPrefetch ¶
func (bf *BlockFetcherV2) StartPrefetch(ctx context.Context)
StartPrefetch starts prefetching the entire object in the background. The prefetch will be cancelled if there are no active clients for the prefetch timeout period.
func (*BlockFetcherV2) WaitForChunkWithETA ¶
func (bf *BlockFetcherV2) WaitForChunkWithETA(ctx context.Context, op *fetchOperation, chunkIndex int64) (completed bool, err error)
WaitForChunkWithETA waits for a chunk to complete, but gives up if the per-chunk ETA becomes stale. Returns true if the chunk completed, false if the ETA became stale (caller should try direct download).
type BlockFetcherV2Config ¶
type BlockFetcherV2Config struct {
PrefetchTimeout time.Duration
// PrefetchSem is a shared semaphore limiting the total number of
// concurrent prefetches across all fetchers and downloads. When
// nil, a per-fetcher semaphore is created with capacity 5.
PrefetchSem chan struct{}
}
BlockFetcherV2Config holds configuration for the block fetcher
type BlockRange ¶
BlockRange represents a contiguous range of blocks
type BlockSummary ¶
type BlockSummary struct {
TotalBlocks uint32 `json:"total_blocks"`
DownloadedBlocks uint32 `json:"downloaded_blocks"`
MissingBlocks []uint32 `json:"missing_blocks,omitempty"` // Up to first 100 missing blocks
IsComplete bool `json:"is_complete"`
PercentComplete float64 `json:"percent_complete"`
}
BlockSummary provides an overview of which blocks have been downloaded.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter is an io.WriteCloser that encrypts and writes blocks directly to disk storage. It buffers incoming data and writes complete blocks as they are filled. Contiguous encrypted blocks are batched (up to writeBatchBlocks) so that multiple blocks can be written to disk in a single WriteAt call, reducing syscall overhead and improving sequential write throughput. This is used for efficient streaming downloads with block-level encryption.
func (*BlockWriter) BytesWritten ¶
func (bw *BlockWriter) BytesWritten() int64
BytesWritten returns the total number of bytes written so far
func (*BlockWriter) Close ¶
func (bw *BlockWriter) Close() error
Close flushes any remaining data and closes the file
func (*BlockWriter) Flush ¶
func (bw *BlockWriter) Flush() error
Flush writes any accumulated batch of encrypted blocks to disk and updates the shared block state so that concurrent readers can see them immediately. This is a no-op when the batch is empty.
Callers that need low-latency streaming (e.g. the blockWriter adapter in AdoptTransfer) should call Flush periodically instead of waiting for the batch to reach writeBatchBlocks.
type CacheDB ¶
type CacheDB struct {
// contains filtered or unexported fields
}
CacheDB wraps BadgerDB with cache-specific operations
func NewCacheDB ¶
NewCacheDB creates and initializes a new cache database.
Requires issuer keys to be initialized via config.GetIssuerPublicJWKS() or InitIssuerKeyForTests() before calling this function.
func OpenCacheDBReadOnly ¶
OpenCacheDBReadOnly opens an existing cache database in read-only mode. This is suitable for CLI introspection tools that need to inspect cache contents without modifying anything.
Like NewCacheDB, this requires issuer keys to be available for decrypting the database encryption key.
func (*CacheDB) AddUsage ¶
func (cdb *CacheDB) AddUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error
AddUsage atomically adjusts the namespace-scoped usage counter by delta bytes. Internally this uses BadgerDB's MergeOperator so that the write is append-only (no read-modify-write cycle) and cannot conflict with concurrent transactions.
func (*CacheDB) ChargeUsage ¶
func (cdb *CacheDB) ChargeUsage(storageID StorageID, namespaceID NamespaceID, delta int64) error
ChargeUsage is an alias for AddUsage for backward compatibility.
func (*CacheDB) ClearBlocks ¶
func (cdb *CacheDB) ClearBlocks(instanceHash InstanceHash, blocks []uint32) error
ClearBlocks removes the specified blocks from the downloaded bitmap so they will be re-fetched on the next read. This is used during auto-repair when corruption is detected.
func (*CacheDB) Close ¶
Close closes the database. All usage MergeOperators are stopped first (blocking until their background goroutines exit) so that accumulated deltas are flushed before the DB is closed.
func (*CacheDB) ComputeActualUsage ¶
func (cdb *CacheDB) ComputeActualUsage() (map[StorageUsageKey]int64, error)
ComputeActualUsage performs a full scan of the metadata table to compute the real byte-level usage per (StorageID, NamespaceID).
Completed objects contribute their full ContentLength. In-progress objects contribute the bytes implied by their block bitmap.
This is an expensive read-only operation. The consistency checker accumulates usage during its metadata scan instead of calling this; it is retained for ad-hoc diagnostics and tests.
func (*CacheDB) ComputeInlineDataSize ¶
ComputeInlineDataSize scans all inline data entries (d: prefix) and sums the stored value sizes. This gives the actual bytes consumed in BadgerDB for inline object data (excluding metadata overhead).
func (*CacheDB) DeleteBlockState ¶
func (cdb *CacheDB) DeleteBlockState(instanceHash InstanceHash) error
DeleteBlockState removes block state for a file
func (*CacheDB) DeleteLatestETag ¶
func (cdb *CacheDB) DeleteLatestETag(objectHash ObjectHash) error
DeleteLatestETag removes the ETag entry for an object
func (*CacheDB) DeleteMetadata ¶
func (cdb *CacheDB) DeleteMetadata(instanceHash InstanceHash) error
DeleteMetadata removes metadata for a file
func (*CacheDB) DeleteObject ¶
func (cdb *CacheDB) DeleteObject(instanceHash InstanceHash) error
DeleteObject removes all data for a cached object. Uses metadata to compute exact LRU key for efficient deletion. Also cleans up ETag table, purge-first marker, and adjusts usage counters. Usage is decremented via AddUsage after the transaction commits so that the write cannot conflict.
func (*CacheDB) EvictByLRU ¶
func (cdb *CacheDB) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, error)
EvictByLRU evicts objects from a storage+namespace combination, draining purge-first items before walking the regular LRU index — all within a single BadgerDB transaction.
Eviction stops when either maxObjects have been removed or maxBytes of content has been freed — whichever comes first. A value of 0 for either limit means "no limit on that dimension". The method is allowed to go one object over the byte threshold so that progress is always made even when only large objects remain.
func (*CacheDB) FindRecyclableStorageID ¶
FindRecyclableStorageID searches persisted disk mappings for the unmounted storageID with the least usage. mountedDirs maps storageID to directory path for IDs currently assigned to live directories — those are excluded. Returns the storageID and nil on success, or an error if no recyclable ID exists.
func (*CacheDB) GetAllUsage ¶
func (cdb *CacheDB) GetAllUsage() (map[StorageUsageKey]int64, error)
GetAllUsage returns usage for all storage+namespace combinations
func (*CacheDB) GetBlockState ¶
func (cdb *CacheDB) GetBlockState(instanceHash InstanceHash) (*roaring.Bitmap, error)
GetBlockState retrieves the bitmap of downloaded blocks for a file.
If the block-state key is absent but the object's metadata indicates a completed download, a fully-populated bitmap is returned. This allows callers to treat completed objects uniformly without requiring a separate completion check. The block-state key is removed on completion to save database space (see BlockWriter.Close).
func (*CacheDB) GetDirUsage ¶
func (cdb *CacheDB) GetDirUsage(storageID StorageID) (map[NamespaceID]int64, error)
GetDirUsage returns usage for all namespaces within a single storage directory.
func (*CacheDB) GetDownloadedBlockCount ¶
func (cdb *CacheDB) GetDownloadedBlockCount(instanceHash InstanceHash) (uint64, error)
GetDownloadedBlockCount returns the number of downloaded blocks
func (*CacheDB) GetEncryptionManager ¶
func (cdb *CacheDB) GetEncryptionManager() *EncryptionManager
GetEncryptionManager returns the encryption manager
func (*CacheDB) GetInlineData ¶
func (cdb *CacheDB) GetInlineData(instanceHash InstanceHash) ([]byte, error)
GetInlineData retrieves encrypted inline data for a small file
func (*CacheDB) GetLatestETag ¶
func (cdb *CacheDB) GetLatestETag(objectHash ObjectHash) (string, bool, error)
GetLatestETag retrieves the latest ETag for an object. Returns (etag, found, err). An object cached without an ETag will return ("", true, nil); an object not in the cache returns ("", false, nil).
func (*CacheDB) GetMetadata ¶
func (cdb *CacheDB) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)
GetMetadata retrieves cache metadata for a file
func (*CacheDB) GetUsage ¶
func (cdb *CacheDB) GetUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)
GetUsage retrieves the total bytes used by a storage+namespace combination. If a MergeOperator is active for the key, its Get() method is used to replay all accumulated deltas; otherwise a raw read is performed.
func (*CacheDB) HasMetadata ¶
func (cdb *CacheDB) HasMetadata(instanceHash InstanceHash) (bool, error)
HasMetadata checks if metadata exists for a file
func (*CacheDB) InstanceHash ¶
func (cdb *CacheDB) InstanceHash(etag string, objectHash ObjectHash) InstanceHash
InstanceHash computes the salted SHA-256 hash for (etag, objectHash).
func (*CacheDB) IsBlockDownloaded ¶
func (cdb *CacheDB) IsBlockDownloaded(instanceHash InstanceHash, blockNum uint32) (bool, error)
IsBlockDownloaded checks if a specific block has been downloaded
func (*CacheDB) IsPurgeFirst ¶
func (cdb *CacheDB) IsPurgeFirst(instanceHash InstanceHash) (bool, error)
IsPurgeFirst checks if a file hash is marked for priority eviction
func (*CacheDB) LoadDiskMappings ¶
func (cdb *CacheDB) LoadDiskMappings() ([]DiskMapping, error)
LoadDiskMappings loads all persisted disk mappings.
func (*CacheDB) LoadNamespaceMappings ¶
func (cdb *CacheDB) LoadNamespaceMappings() (map[string]NamespaceID, NamespaceID, error)
LoadNamespaceMappings loads all persisted namespace mappings and returns them as a map[prefix]->id, along with the highest ID seen (so the caller can resume the counter).
func (*CacheDB) MarkBlocksDownloaded ¶
func (cdb *CacheDB) MarkBlocksDownloaded(instanceHash InstanceHash, startBlock, endBlock uint32, storageID StorageID, namespaceID NamespaceID, contentLength int64) error
MarkBlocksDownloaded marks specific blocks as downloaded and atomically updates usage statistics based on the number of newly-added blocks. Usage tracking requires metadata to be set for the instanceHash; if metadata is not yet available, the bitmap is still updated but usage tracking is skipped.
func (*CacheDB) MarkPurgeFirst ¶
func (cdb *CacheDB) MarkPurgeFirst(instanceHash InstanceHash) error
MarkPurgeFirst marks a file hash for priority eviction
func (*CacheDB) MergeBlockStateWithUsage ¶
func (cdb *CacheDB) MergeBlockStateWithUsage(instanceHash InstanceHash, newBlocks *roaring.Bitmap, storageID StorageID, namespaceID NamespaceID, contentLength int64) error
MergeBlockStateWithUsage atomically merges new blocks into the existing bitmap AND updates usage statistics based on the number of newly-enabled bits.
contentLength controls how the usage delta is calculated:
- If >= 0, the supplied contentLength, storageID, and namespaceID are used directly, avoiding a metadata DB read.
- If < 0 (typically -1), the metadata is read within the transaction to obtain the content length, storage ID, and namespace ID.
The method retries on BadgerDB transaction conflicts, which can occur when multiple concurrent block fetchers write to the same object's bitmap.
func (*CacheDB) MergeMetadata ¶
func (cdb *CacheDB) MergeMetadata(instanceHash InstanceHash, incoming *CacheMetadata) error
MergeMetadata performs an atomic read-modify-update of the metadata for instanceHash. If no metadata exists yet, incoming is written as-is (initial creation).
Field-level merge rules:
- Max-time (LastModified, LastValidated, LastAccessTime, Expires, Completed): keep the later of existing vs incoming.
- Additive (Checksums): union by ChecksumType; if both sides provide the same Type, prefer the OriginVerified entry.
- Last-writer-wins (ContentType, ContentLength, VaryHeaders, CCFlags, CCMaxAge): incoming replaces existing when the incoming value is non-zero / non-empty.
- Set-once (ETag, SourceURL, DataKey, StorageID, NamespaceID): may transition from zero-value to a value, but changing a non-zero value to a different non-zero value returns an error. ETag is set-once because it is part of the instance hash; a changed ETag produces a different instance.
The method retries on BadgerDB transaction conflicts, which can occur when multiple concurrent callers merge metadata for the same instance (e.g. concurrent range-on-miss initialization via initObjectFromStat).
func (*CacheDB) ObjectHash ¶
func (cdb *CacheDB) ObjectHash(pelicanURL string) ObjectHash
ObjectHash computes the salted SHA-256 hash for a pelican URL.
func (*CacheDB) PurgeStorageID ¶
PurgeStorageID removes all database entries associated with a storageID: object metadata, block state, inline data, LRU entries, purge-first markers, ETag entries, usage counters, and the disk mapping itself.
This is used during storage ID recycling to reclaim an ID that was previously assigned to a directory that is no longer mounted.
Objects are deleted in batches to avoid exceeding BadgerDB's transaction size limit.
func (*CacheDB) SaveDiskMapping ¶
func (cdb *CacheDB) SaveDiskMapping(dm DiskMapping) error
SaveDiskMapping persists a single storageID → (UUID, directory) mapping.
func (*CacheDB) ScanMetadata ¶
func (cdb *CacheDB) ScanMetadata(fn func(instanceHash InstanceHash, meta *CacheMetadata) error) error
ScanMetadata iterates over all metadata entries
func (*CacheDB) ScanMetadataFrom ¶
func (cdb *CacheDB) ScanMetadataFrom(startKey InstanceHash, fn func(instanceHash InstanceHash, meta *CacheMetadata) error) error
ScanMetadataFrom scans metadata starting from the given instanceHash (empty string = start from beginning)
func (*CacheDB) SetBlockState ¶
func (cdb *CacheDB) SetBlockState(instanceHash InstanceHash, bitmap *roaring.Bitmap) error
SetBlockState stores the bitmap of downloaded blocks
func (*CacheDB) SetInlineData ¶
func (cdb *CacheDB) SetInlineData(instanceHash InstanceHash, encryptedData []byte) error
SetInlineData stores encrypted inline data for a small file. The caller (StoreInline) is responsible for usage accounting via ChargeUsage; this function does NOT adjust usage counters.
func (*CacheDB) SetLatestETag ¶
SetLatestETag stores the latest ETag for an object, but only if observedAt is more recent than the already-stored timestamp. This prevents a slow download that finishes late from clobbering a newer ETag written by a more recent request.
func (*CacheDB) SetMetadata ¶
func (cdb *CacheDB) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
SetMetadata stores cache metadata for a file, unconditionally replacing any previously stored metadata. Use this only for initial creation of a metadata entry (e.g. InitDiskStorage, StoreInline); for subsequent updates prefer MergeMetadata which applies field-level merge semantics.
func (*CacheDB) SetNamespaceMapping ¶
func (cdb *CacheDB) SetNamespaceMapping(prefix string, id NamespaceID) error
SetNamespaceMapping persists the mapping from a namespace prefix string to a numeric ID. This ensures the IDs survive restarts so that LRU keys and usage counters remain valid.
func (*CacheDB) SetUsage ¶
func (cdb *CacheDB) SetUsage(storageID StorageID, namespaceID NamespaceID, value int64) error
SetUsage sets the absolute usage counter for a (storageID, namespaceID) pair. Any active MergeOperator for the key is stopped first (flushing pending deltas) and then the value is overwritten.
The entry is written with WithDiscard() so that BadgerDB marks all earlier versions of the key as eligible for garbage collection. Without this flag, the MergeOperator's iterateAndMerge would still see the old compacted entry (which carries bitDiscardEarlierVersions) and sum it into the total, causing the counter to include both the new baseline and the old accumulated value — a compounding overcount that grows with every reconciliation cycle.
The next AddUsage call will lazily create a fresh MergeOperator.
func (*CacheDB) UnmarkPurgeFirst ¶
func (cdb *CacheDB) UnmarkPurgeFirst(instanceHash InstanceHash) error
UnmarkPurgeFirst removes the purge first marker for a file hash
func (*CacheDB) UpdateLRU ¶
func (cdb *CacheDB) UpdateLRU(instanceHash InstanceHash, debounceTime time.Duration) error
UpdateLRU updates the LRU access time for a file. It uses debouncing: the access time is only updated if the last access was more than debounceTime ago. This is optimized to avoid iteration by storing the last access time in metadata.
type CacheDirectives ¶
type CacheDirectives struct {
// contains filtered or unexported fields
}
CacheDirectives holds parsed Cache-Control header values that are relevant to the persistent cache.
Boolean directives are packed into a bitfield; use the accessor methods (NoStore, NoCache, etc.) instead of reading fields directly.
max-age and s-maxage are merged into a single freshness lifetime: when both are present the maximum of the two is kept, because the Pelican local cache does not distinguish shared from private.
func ParseCacheControl ¶
func ParseCacheControl(header string) CacheDirectives
ParseCacheControl parses a Cache-Control header value into structured directives. The parsing is case-insensitive for directive names, as required by RFC 7234. When both max-age and s-maxage are present, the maximum of the two durations is kept (the Pelican local cache does not distinguish shared from private).
func (*CacheDirectives) Flags ¶
func (cd *CacheDirectives) Flags() uint8
Flags returns the raw packed bitfield. Callers should prefer the named accessors; this is provided for efficient serialization into CacheMetadata.CCFlags.
func (*CacheDirectives) Freshness ¶
func (cd *CacheDirectives) Freshness() (time.Duration, bool)
Freshness returns the merged freshness lifetime and whether it was set.
func (*CacheDirectives) HasDirectives ¶
func (cd *CacheDirectives) HasDirectives() bool
HasDirectives returns true if any Cache-Control directive was parsed.
func (*CacheDirectives) HasFreshness ¶
func (cd *CacheDirectives) HasFreshness() bool
HasFreshness returns true if the origin provided explicit freshness information (max-age or s-maxage).
func (*CacheDirectives) IsStale ¶
func (cd *CacheDirectives) IsStale(lastValidated time.Time) bool
IsStale checks whether a cached response is stale given when it was last validated (or originally stored). If no freshness information is available (no max-age / s-maxage / no-cache), applies the default cache policy from params with jitter.
func (*CacheDirectives) IsStaleWithDefaults ¶
func (cd *CacheDirectives) IsStaleWithDefaults(lastValidated time.Time) bool
IsStaleWithDefaults checks staleness using configured default values and jitter. This is used when the origin doesn't provide explicit Cache-Control headers.
func (*CacheDirectives) MaxAge ¶
func (cd *CacheDirectives) MaxAge() time.Duration
func (*CacheDirectives) MaxAgeSet ¶
func (cd *CacheDirectives) MaxAgeSet() bool
func (*CacheDirectives) MustRevalidate ¶
func (cd *CacheDirectives) MustRevalidate() bool
func (*CacheDirectives) NoCache ¶
func (cd *CacheDirectives) NoCache() bool
func (*CacheDirectives) NoStore ¶
func (cd *CacheDirectives) NoStore() bool
func (*CacheDirectives) Private ¶
func (cd *CacheDirectives) Private() bool
func (*CacheDirectives) ShouldStore ¶
func (cd *CacheDirectives) ShouldStore() bool
ShouldStore returns true if the response is allowed to be stored in the cache.
type CacheMetadata ¶
type CacheMetadata struct {
// Validation fields
ETag string `msgpack:"etag"` // HTTP ETag header
LastModified time.Time `msgpack:"lm"` // HTTP Last-Modified header
Expires time.Time `msgpack:"exp"` // HTTP Expires header
LastValidated time.Time `msgpack:"lv"` // When we last validated with origin
Completed time.Time `msgpack:"c"` // When download was completed
Checksums []Checksum `msgpack:"ck,omitempty"` // Object checksums
// Identification fields
ContentType string `msgpack:"ct"` // MIME type
ContentLength int64 `msgpack:"cl"` // Total object size in bytes
VaryHeaders []string `msgpack:"vh,omitempty"` // Headers that affect caching
SourceURL string `msgpack:"url,omitempty"` // Original URL including federation
// Cache-Control directives (efficient packed representation)
CCFlags uint8 `msgpack:"ccf,omitempty"` // Bitset: 0x01=no-store, 0x02=no-cache, 0x04=private, 0x08=must-revalidate
CCMaxAge int32 `msgpack:"ccma,omitempty"` // Merged max-age/s-maxage freshness lifetime (seconds, 0 = not set, max of both if both specified)
// Storage fields.
// StorageID encodes both location type and directory identity:
// 0 = inline (data stored directly in BadgerDB)
// 1 .. 255 = disk-backed (directory identified by this ID)
// For chunked objects, this is the location of chunk 0.
StorageID StorageID `msgpack:"sid"`
DataKey []byte `msgpack:"key"` // Encrypted DEK (Data Encryption Key)
// Chunking fields for large objects spread across multiple storage directories.
// ChunkSizeCode encodes the chunk size (0 = chunking disabled, see chunking.go).
// ChunkLocations stores the StorageID for chunks 1, 2, ... (chunk 0 uses StorageID above).
ChunkSizeCode ChunkSizeCode `msgpack:"csc,omitempty"` // 0 = disabled, see ChunkSizeCodeToBytes()
ChunkLocations []ChunkLocation `msgpack:"chl,omitempty"` // Locations for chunks after chunk 0
// Namespace and storage tracking for fairness-aware eviction
NamespaceID NamespaceID `msgpack:"ns"` // ID of the namespace prefix
// LRU tracking
LastAccessTime time.Time `msgpack:"la"` // Last access time for LRU index
}
CacheMetadata stores all metadata about a cached object. It is serialized using MessagePack for efficiency.
Merge semantics (see CacheDB.MergeMetadata) ¶
Fields are classified into groups that govern how concurrent updates are reconciled:
- Max-time: LastModified, LastValidated, LastAccessTime, Expires, Completed — only advance forward (keep the later timestamp).
- Additive: Checksums — union by algorithm; prefer OriginVerified.
- Last-writer-wins: ContentType, ContentLength, VaryHeaders, CCFlags, CCMaxAge — the incoming value always replaces the old one.
- Set-once: ETag, SourceURL, DataKey, StorageID, NamespaceID, ChunkSizeCode, ChunkLocations — may transition from zero-value to set, but changing a non-zero value to a different non-zero value is an error. ETag is set-once because it is part of the instance hash; a changed ETag produces a different instance. Chunking fields are set-once because changing them would invalidate existing chunk files.
func (*CacheMetadata) AllStorageIDs ¶
func (m *CacheMetadata) AllStorageIDs() []StorageID
AllStorageIDs returns a deduplicated list of all StorageIDs used by this object. For non-chunked objects, returns just the base StorageID.
func (*CacheMetadata) ChunkCount ¶
func (m *CacheMetadata) ChunkCount() int
ChunkCount returns the number of chunks for this object. Returns 1 for non-chunked objects.
func (*CacheMetadata) ComputeExpires ¶
func (m *CacheMetadata) ComputeExpires() time.Time
ComputeExpires returns the absolute time at which this object expires. It uses the origin-supplied max-age when available, otherwise the configured default freshness policy. The result is based on LastValidated (or Completed if LastValidated is zero).
func (*CacheMetadata) EnsureExpires ¶
func (m *CacheMetadata) EnsureExpires()
EnsureExpires sets the Expires field if it is currently zero. Call this after SetCacheControl and after LastValidated is set.
func (*CacheMetadata) GetCacheControlHeader ¶
func (m *CacheMetadata) GetCacheControlHeader() string
GetCacheControlHeader reconstructs the Cache-Control header string for HTTP responses. It returns the origin's directives verbatim when present; when the origin did not specify any Cache-Control, it returns "" (callers should use ResponseCacheControl instead to get a header that reflects the default policy).
func (*CacheMetadata) GetCacheDirectives ¶
func (m *CacheMetadata) GetCacheDirectives() CacheDirectives
GetCacheDirectives returns the parsed cache directives
func (*CacheMetadata) GetChunkInfo ¶
func (m *CacheMetadata) GetChunkInfo() []ChunkInfo
GetChunkInfo returns information about all chunks for this object.
func (*CacheMetadata) GetChunkStorageID ¶
func (m *CacheMetadata) GetChunkStorageID(chunkIndex int) StorageID
GetChunkStorageID returns the StorageID for a specific chunk (0-indexed). Chunk 0 uses the base StorageID, chunks 1+ use ChunkLocations. Returns StorageIDInline (0) for unallocated chunks (lazy allocation).
func (*CacheMetadata) IsChunkAllocated ¶
func (m *CacheMetadata) IsChunkAllocated(chunkIndex int) bool
IsChunkAllocated returns true if the specified chunk has a storage ID assigned. For chunked objects, StorageIDInline (0) means the chunk is not yet allocated. For non-chunked objects, always returns true (single storage).
func (*CacheMetadata) IsChunked ¶
func (m *CacheMetadata) IsChunked() bool
IsChunked returns true when the object is stored across multiple chunk files.
func (*CacheMetadata) IsDisk ¶
func (m *CacheMetadata) IsDisk() bool
IsDisk returns true when the object data is (or will be) stored on disk. Chunked objects are always disk-based, even when chunk 0 has not yet been allocated.
func (*CacheMetadata) IsInline ¶
func (m *CacheMetadata) IsInline() bool
IsInline returns true when the object data is stored directly in BadgerDB. A chunked object is never inline, even if its base StorageID is still 0 (unallocated).
func (*CacheMetadata) PerDirectoryBytes ¶
func (m *CacheMetadata) PerDirectoryBytes() map[StorageID]int64
PerDirectoryBytes returns a map from StorageID to the number of content bytes that live in each storage directory. For non-chunked objects the entire ContentLength is attributed to the base StorageID. For chunked objects the byte count is split according to each chunk's assigned directory. Unallocated chunks (StorageID 0) are skipped.
func (*CacheMetadata) ResponseCacheControl ¶
func (m *CacheMetadata) ResponseCacheControl() string
ResponseCacheControl returns the Cache-Control header value the cache should send to downstream clients. When the origin specified directives, those are forwarded. When it did not, the cache advertises the remaining freshness lifetime (derived from LocalCache_DefaultMaxAge + jitter) as max-age so that downstream clients can cache the response without re-contacting the cache until revalidation is due.
func (*CacheMetadata) SetCacheControl ¶
func (m *CacheMetadata) SetCacheControl(header string)
SetCacheControl parses a Cache-Control header and stores the directives efficiently
func (*CacheMetadata) SetChunkStorageID ¶
func (m *CacheMetadata) SetChunkStorageID(chunkIndex int, storageID StorageID)
SetChunkStorageID sets the StorageID for a specific chunk (0-indexed). For chunk 0, sets the base StorageID. For chunks 1+, sets ChunkLocations. The ChunkLocations slice must be pre-allocated to the correct size.
type CacheMode ¶
type CacheMode int
CacheMode distinguishes whether the persistent cache is running as a full cache server (CacheType) or as a client-side local cache (LocalCacheType). The mode determines which configuration namespace is consulted for defaults (Cache.* vs LocalCache.*).
type CacheStats ¶
type CacheStats struct {
TotalInlineBytes int64 `json:"total_inline_bytes"`
TotalMetadataEntries int64 `json:"total_metadata_entries"`
TotalBytesMetadata int64 `json:"total_bytes_metadata"` // Sum of ContentLength from metadata entries
UsageCounters map[string]int64 `json:"usage_counters,omitempty"` // Pre-computed usage from u: prefix keys
StorageBreakdown map[string]*StorageDirStats `json:"storage_breakdown,omitempty"`
DirPaths map[uint8]string `json:"dir_paths,omitempty"` // StorageID → directory path
NamespaceNames map[uint32]string `json:"namespace_names,omitempty"` // NamespaceID → prefix string
}
CacheStats contains aggregate size statistics about the cache.
type Checksum ¶
type Checksum struct {
Type ChecksumType `msgpack:"t"`
Value []byte `msgpack:"v"`
OriginVerified bool `msgpack:"ov"` // True if checksum came from origin
VerifyAttempted bool `msgpack:"va,omitempty"` // True if we tried to get origin checksum
}
Checksum holds a checksum type and its value
func ParseChecksumHeader ¶
ParseChecksumHeader parses a checksum from HTTP headers
type ChecksumInfo ¶
type ChecksumInfo struct {
Type string `json:"type"`
Value string `json:"value"` // Hex-encoded
OriginVerified bool `json:"origin_verified"`
}
ChecksumInfo describes a stored checksum.
type ChecksumStatus ¶
type ChecksumStatus struct {
Type string `json:"type"`
Expected string `json:"expected"` // Hex-encoded
Computed string `json:"computed"` // Hex-encoded (only if verification ran)
Match bool `json:"match"`
}
ChecksumStatus describes the verification status of a single checksum.
type ChecksumType ¶
type ChecksumType uint8
ChecksumType identifies the type of checksum
const ( ChecksumMD5 ChecksumType = 0 ChecksumSHA1 ChecksumType = 1 ChecksumSHA256 ChecksumType = 2 ChecksumCRC32 ChecksumType = 3 ChecksumCRC32C ChecksumType = 4 )
type ChunkInfo ¶
type ChunkInfo struct {
Index int // 0-based chunk index
StorageID StorageID // Storage directory for this chunk
StartOffset int64 // Content byte offset where this chunk starts
EndOffset int64 // Content byte offset where this chunk ends (inclusive)
Size int64 // Size of this chunk in bytes
}
ChunkInfo holds computed information about a chunk.
func GetChunkInfo ¶
func GetChunkInfo(contentLength int64, chunkSizeCode ChunkSizeCode, chunkLocations []ChunkLocation, baseStorageID StorageID) []ChunkInfo
GetChunkInfo returns information about all chunks for an object. chunkLocations should be the ChunkLocations from CacheMetadata (may be nil/empty for non-chunked objects). baseStorageID is the StorageID from CacheMetadata (where chunk 0 is stored).
type ChunkInfoSummary ¶
type ChunkInfoSummary struct {
ChunkSizeBytes int64 `json:"chunk_size_bytes"`
ChunkCount int `json:"chunk_count"`
ChunkLocations []uint8 `json:"chunk_locations"` // StorageID per chunk
}
ChunkInfoSummary describes chunking configuration for large objects.
type ChunkLocation ¶
type ChunkLocation struct {
StorageID StorageID `msgpack:"s"` // Which storage directory holds this chunk
}
ChunkLocation stores the storage location of a single chunk. Chunk 0 is always stored at the StorageID in CacheMetadata. Additional chunks (1, 2, ...) are stored in the ChunkLocations slice.
type ChunkNotification ¶
type ChunkNotification struct {
ChunkIndex int64 // Which chunk this notification is for
Completed bool // True if this chunk is complete
Error error // Non-nil if an error occurred
ETA time.Time // Estimated time of completion (updated atomically)
}
ChunkNotification contains information about a chunk's completion status
type ChunkSizeCode ¶
type ChunkSizeCode uint8
ChunkSizeCode is a compact uint8 encoding of chunk sizes for storage efficiency. The encoding is non-linear to cover a wide range from 2MB to ~57GB:
- 0: Chunking disabled (object stored in a single file)
- 1-6: Doubling: 2^n MB (2, 4, 8, 16, 32, 64 MB)
- 7-21: 64 MB increments starting at 128 MB (128, 192, ..., 1024 MB)
- 22-53: 128 MB increments starting at 1152 MB (1152, 1280, ..., 5120 MB)
- 54-255: 256 MB increments starting at 5376 MB (5376, 5632, ..., ~57 GB)
const ( // ChunkingDisabled indicates the object is stored in a single file ChunkingDisabled ChunkSizeCode = 0 )
func BytesToChunkSizeCode ¶
func BytesToChunkSizeCode(size uint64) ChunkSizeCode
BytesToChunkSizeCode converts a byte size to the nearest ChunkSizeCode that is >= the requested size (rounds up to ensure chunks can hold the data). Returns ChunkingDisabled (0) if size is 0.
func ParseChunkSize ¶
func ParseChunkSize(s string) (ChunkSizeCode, error)
ParseChunkSize parses a human-readable chunk size string (e.g., "64MB", "2GB") and returns the corresponding ChunkSizeCode.
type ConsistencyCheckResult ¶
type ConsistencyCheckResult struct {
MetadataScanRan bool `json:"metadata_scan_ran"`
DataScanRan bool `json:"data_scan_ran"`
OrphanedFiles int64 `json:"orphaned_files"`
OrphanedDBEntries int64 `json:"orphaned_db_entries"`
ChecksumMismatches int64 `json:"checksum_mismatches"`
BytesVerified int64 `json:"bytes_verified"`
ObjectsVerified int64 `json:"objects_verified"`
MetadataScanErrors int64 `json:"metadata_scan_errors"`
DataScanErrors int64 `json:"data_scan_errors"`
Duration string `json:"duration"`
Error string `json:"error,omitempty"`
}
ConsistencyCheckResult contains the result of a consistency check run.
type ConsistencyChecker ¶
type ConsistencyChecker struct {
// contains filtered or unexported fields
}
ConsistencyChecker verifies cache consistency between database and disk. For multi-directory configurations, it scans each storage directory independently.
func NewConsistencyChecker ¶
func NewConsistencyChecker(db *CacheDB, storage *StorageManager, config ConsistencyConfig) *ConsistencyChecker
NewConsistencyChecker creates a new consistency checker.
func (*ConsistencyChecker) GetStats ¶
func (cc *ConsistencyChecker) GetStats() ConsistencyStats
GetStats returns current statistics
func (*ConsistencyChecker) RunDataScan ¶
func (cc *ConsistencyChecker) RunDataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error
RunDataScan performs a full data integrity scan. It verifies checksums of stored objects using block-by-block reading.
func (*ConsistencyChecker) RunMetadataScan ¶
func (cc *ConsistencyChecker) RunMetadataScan(ctx context.Context, progressCh chan<- ScanProgressEvent) error
RunMetadataScan performs a metadata consistency scan. It verifies that database entries match files on disk and vice versa.
func (*ConsistencyChecker) Start ¶
func (cc *ConsistencyChecker) Start(ctx context.Context, egrp *errgroup.Group)
Start begins the background consistency checking goroutines
func (*ConsistencyChecker) Stop ¶
func (cc *ConsistencyChecker) Stop()
Stop stops the consistency checker
func (*ConsistencyChecker) VerifyBlockIntegrity ¶
func (cc *ConsistencyChecker) VerifyBlockIntegrity(instanceHash InstanceHash) ([]uint32, error)
VerifyBlockIntegrity verifies the integrity of individual blocks by delegating to the storage manager's IdentifyCorruptBlocks, which handles crypto setup, file access, and AES-GCM auth-tag verification.
func (*ConsistencyChecker) VerifyObject ¶
func (cc *ConsistencyChecker) VerifyObject(instanceHash InstanceHash) (bool, error)
VerifyObject verifies a single object's integrity. If checksums are present, it verifies them all in a single pass. If no checksums are present, it still reads all data to verify readability (i.e. that decryption tags are valid and all blocks are accessible). Returns (true, nil) if the object is valid, (false, nil) if corrupt.
type ConsistencyConfig ¶
type ConsistencyConfig struct {
// MetadataScanActiveMs limits metadata scan to this many ms per second (default: 100)
MetadataScanActiveMs int64
// DataScanBytesPerSec limits data scan to this many bytes per second (default: 100MB)
DataScanBytesPerSec int64
// MinAgeForCleanup is the minimum age before an entry/file can be cleaned up (default: 5 minutes, 0 for tests)
MinAgeForCleanup time.Duration
// ChecksumTypes specifies which checksums to calculate and verify.
// When empty, defaults to []ChecksumType{ChecksumSHA256}.
ChecksumTypes []ChecksumType
}
ConsistencyConfig holds configuration for the consistency checker
type ConsistencyStats ¶
type ConsistencyStats struct {
LastMetadataScan time.Time
LastDataScan time.Time
MetadataScanErrors int64
DataScanErrors int64
OrphanedFiles int64
OrphanedDBEntries int64
ChecksumMismatches int64
BytesVerified int64
ObjectsVerified int64
}
ConsistencyStats holds statistics from consistency checks
type DirDiskStat ¶
type DirDiskStat struct {
StorageID uint8 `json:"storage_id"`
Path string `json:"path"`
BytesUsed int64 `json:"bytes_used"`
FileCount int64 `json:"file_count"`
}
DirDiskStat holds per-directory disk usage from walking the filesystem.
type DirEvictionStats ¶
DirEvictionStats contains per-directory eviction statistics
type DiskMapping ¶
type DiskMapping struct {
ID StorageID `msgpack:"id"`
UUID string `msgpack:"uuid"`
Directory string `msgpack:"dir"`
}
DiskMapping stores the mapping of a storage ID to its directory path and UUID. The UUID file is dropped in the directory root so that directories can be remounted at different paths and re-associated.
type DiskUsageResult ¶
type DiskUsageResult struct {
TotalBytesOnDisk int64 `json:"total_bytes_on_disk"`
TotalFiles int64 `json:"total_files"`
Directories map[string]*DirDiskStat `json:"directories,omitempty"`
Duration string `json:"duration"` // How long the walk took
}
DiskUsageResult contains the result of an expensive disk walk.
type EncryptionManager ¶
type EncryptionManager struct {
// contains filtered or unexported fields
}
EncryptionManager handles all encryption operations for the cache.
The masterKey is loaded (or generated) once during construction and is never mutated afterwards. Two child keys are derived from it via HKDF so that nothing ever uses the masterKey directly:
- dekWrapKey – wraps/unwraps per-object data encryption keys (DEKs)
- the DB key – returned by DeriveDBKey for BadgerDB encryption
The mu mutex protects only the on-disk serialisation in saveMasterKey (called by UpdateMasterKeyEncryption); reader methods are lock-free because the fields they touch are immutable after construction.
func NewEncryptionManager ¶
func NewEncryptionManager(baseDir string) (*EncryptionManager, error)
NewEncryptionManager creates a new encryption manager. It loads or creates the master key from the specified directory.
Requires issuer keys to be initialized via config.GetIssuerPublicJWKS() or InitIssuerKeyForTests() before calling this function.
func (*EncryptionManager) DecryptDataKey ¶
func (em *EncryptionManager) DecryptDataKey(encryptedDEK []byte) ([]byte, error)
DecryptDataKey decrypts a DEK using the HKDF-derived wrapping key. The wrapping key is immutable after construction, so no lock is needed.
func (*EncryptionManager) DecryptInline ¶
func (em *EncryptionManager) DecryptInline(encryptedData, dek, nonce []byte) ([]byte, error)
DecryptInline decrypts inline data (small objects)
func (*EncryptionManager) DeriveDBKey ¶
func (em *EncryptionManager) DeriveDBKey() ([]byte, error)
DeriveDBKey derives a separate encryption key for BadgerDB using HKDF. This ensures proper key separation: the DEK-wrapping key handles per-object keys, while this derived key encrypts BadgerDB's LSM tree and WAL files (protecting metadata such as ETags, URLs, and timestamps at rest).
masterKey is immutable after construction, so no lock is needed.
func (*EncryptionManager) EncryptDataKey ¶
func (em *EncryptionManager) EncryptDataKey(dek []byte) ([]byte, error)
EncryptDataKey encrypts a DEK using the HKDF-derived wrapping key. The wrapping key is immutable after construction, so no lock is needed.
func (*EncryptionManager) EncryptInline ¶
func (em *EncryptionManager) EncryptInline(data, dek, nonce []byte) ([]byte, error)
EncryptInline encrypts data for inline storage (small objects)
func (*EncryptionManager) GenerateDataKey ¶
func (em *EncryptionManager) GenerateDataKey() ([]byte, error)
GenerateDataKey generates a new random data encryption key (DEK)
func (*EncryptionManager) UpdateMasterKeyEncryption ¶
func (em *EncryptionManager) UpdateMasterKeyEncryption() error
UpdateMasterKeyEncryption re-encrypts the master key with the current issuer keys. This should be called when issuer keys change.
type EvictionConfig ¶
type EvictionConfig struct {
// DirConfigs maps storageID to its eviction limits.
// Each entry describes one storage directory.
DirConfigs map[StorageID]EvictionDirConfig
}
EvictionConfig holds configuration for the eviction manager
type EvictionDirConfig ¶
type EvictionDirConfig struct {
MaxSize uint64 // Maximum cache size in bytes for this directory
HighWaterPercentage int // Percentage at which eviction starts (0 = default 90)
LowWaterPercentage int // Percentage at which eviction stops (0 = default 80)
HighWaterBytes uint64 // Absolute byte threshold (overrides percentage when > 0)
LowWaterBytes uint64 // Absolute byte threshold (overrides percentage when > 0)
}
EvictionDirConfig holds per-directory eviction configuration.
type EvictionManager ¶
type EvictionManager struct {
// contains filtered or unexported fields
}
EvictionManager handles fairness-aware cache eviction. Each storage directory has independent watermarks; eviction is triggered per-directory when a directory exceeds its high-water mark and proceeds until the directory's usage falls to its low-water mark.
func NewEvictionManager ¶
func NewEvictionManager(db *CacheDB, storage *StorageManager, config EvictionConfig) *EvictionManager
NewEvictionManager creates a new eviction manager
func (*EvictionManager) ChooseDiskStorage ¶
func (em *EvictionManager) ChooseDiskStorage() StorageID
ChooseDiskStorage selects a storage directory using a pre-computed, shuffled lookup table. The table is rebuilt at most every 100 ms from the in-memory atomic usage estimates. The hot path is a single atomic increment + array index, so many goroutines can call this concurrently with negligible contention.
func (*EvictionManager) ForcePurge ¶
func (em *EvictionManager) ForcePurge() error
ForcePurge forces an immediate purge down to the low water mark. Directories are processed concurrently (see checkAndEvict).
func (*EvictionManager) ForcePurgeToBytes ¶
func (em *EvictionManager) ForcePurgeToBytes(targetBytes uint64) (uint64, int64, error)
ForcePurgeToBytes forces an immediate purge across all directories until total cache usage drops to targetBytes or below. Unlike ForcePurge, which uses the configured low-water mark, this method accepts an explicit target. Returns (bytes freed, objects evicted, error).
func (*EvictionManager) GetAllNamespaceUsage ¶
func (em *EvictionManager) GetAllNamespaceUsage() (map[StorageUsageKey]int64, error)
GetAllNamespaceUsage returns usage for all storage+namespace combinations
func (*EvictionManager) GetNamespaceUsage ¶
func (em *EvictionManager) GetNamespaceUsage(storageID StorageID, namespaceID NamespaceID) (int64, error)
GetNamespaceUsage returns usage for a specific storage+namespace combination
func (*EvictionManager) GetStats ¶
func (em *EvictionManager) GetStats() EvictionStats
GetStats returns eviction manager statistics
func (*EvictionManager) GetTotalUsage ¶
func (em *EvictionManager) GetTotalUsage() uint64
GetTotalUsage returns the current total cache usage (sum of per-dir atomics).
func (*EvictionManager) HasSpace ¶
func (em *EvictionManager) HasSpace(needed uint64) bool
HasSpace returns true if there's room for more data in at least one directory. Uses the in-memory atomic estimates (no DB query).
func (*EvictionManager) MarkPurgeFirst ¶
func (em *EvictionManager) MarkPurgeFirst(instanceHash InstanceHash) error
MarkPurgeFirst marks an object to be purged first during next eviction
func (*EvictionManager) NoteUsageIncrease ¶
func (em *EvictionManager) NoteUsageIncrease(storageID StorageID, bytes int64)
NoteUsageIncrease updates the in-memory usage estimate and triggers an eviction check if the specified directory's high-water mark appears to be exceeded. It does NOT write to the persistent database — the caller is responsible for ensuring the DB was already updated (e.g., via MergeBlockStateWithUsage which tracks usage atomically alongside the bitmap merge). The database usage updates handle race conditions where multiple writers may be updating the same blocks simultaneously. The in-memory estimate does not, so the estimated usage may be higher than the actual usage.
The per-directory atomic counter is the fast path: if the counter is under the high-water mark no database call is made at all. When the counter indicates a possible threshold crossing we consult the database for the authoritative total, correct the counter, and only then decide whether to trigger eviction.
func (*EvictionManager) RecordAccess ¶
func (em *EvictionManager) RecordAccess(instanceHash InstanceHash) error
RecordAccess records an access to an object, updating LRU
func (*EvictionManager) Start ¶
func (em *EvictionManager) Start(ctx context.Context, egrp *errgroup.Group)
Start begins the background eviction goroutine
func (*EvictionManager) TriggerEviction ¶
func (em *EvictionManager) TriggerEviction()
TriggerEviction triggers an eviction check
type EvictionStats ¶
type EvictionStats struct {
TotalUsage uint64
DirStats map[StorageID]DirEvictionStats
NamespaceUsage map[StorageUsageKey]int64
}
EvictionStats contains eviction manager statistics
type HeadResult ¶
type HeadResult struct {
ContentLength int64
Meta *CacheMetadata // non-nil when the object is cached
}
HeadResult contains the response metadata for a HEAD request.
type InstanceHash ¶
type InstanceHash string
InstanceHash is an HMAC-SHA-256 digest that identifies a specific version (ETag) of an object. Using a dedicated type prevents accidental confusion with ObjectHash or arbitrary strings.
func ComputeInstanceHash ¶
func ComputeInstanceHash(salt []byte, etag string, objectHash ObjectHash) InstanceHash
ComputeInstanceHash computes HMAC-SHA-256(salt, etag + ":" + objectHash). This identifies a specific version of an object. If etag is empty, uses empty string (for objects without ETag support).
func ParseChunkFilename ¶
func ParseChunkFilename(filename string) (baseHash InstanceHash, chunkIndex int, ok bool)
ParseChunkFilename parses a filename (without path) that may be a chunk file. Returns the base instance hash and chunk index. For non-chunked files (no suffix), returns the filename as-is and chunkIndex 0. For chunk files like "deadbeef...-2", returns the base hash and chunkIndex 1. Returns ok=false if the filename doesn't match expected patterns.
type IntrospectAPIOpen ¶
type IntrospectAPIOpen struct {
// contains filtered or unexported fields
}
IntrospectAPIOpen provides read-only introspection access to the cache database. It opens the cache database in read-only mode, suitable for CLI tools that need to inspect cache contents without disturbing a running cache server.
func NewIntrospectAPI ¶
func NewIntrospectAPI(baseDir string) (*IntrospectAPIOpen, error)
NewIntrospectAPI creates a new introspection API by opening the cache database and storage manager in read-only mode.
func (*IntrospectAPIOpen) Close ¶
func (api *IntrospectAPIOpen) Close() error
Close releases resources held by the introspection API.
func (*IntrospectAPIOpen) GetCacheStats ¶
func (api *IntrospectAPIOpen) GetCacheStats() (*CacheStats, error)
GetCacheStats returns aggregate cache size statistics by scanning metadata. This scans all metadata entries and the inline data prefix, but does NOT walk the disk (use GetDiskUsage for that).
func (*IntrospectAPIOpen) GetDiskUsage ¶
func (api *IntrospectAPIOpen) GetDiskUsage() (*DiskUsageResult, error)
GetDiskUsage walks the storage directories to compute actual disk usage. This is an expensive operation that reads every file's size on disk.
func (*IntrospectAPIOpen) GetObjectDetails ¶
func (api *IntrospectAPIOpen) GetObjectDetails(instanceHash string) (*ObjectDetails, error)
GetObjectDetails returns detailed metadata for a specific object instance. The instanceHash can be obtained from ListObjectInstances. To look up an object by URL and ETag instead, use GetObjectDetailsByURL.
func (*IntrospectAPIOpen) GetObjectDetailsByURL ¶
func (api *IntrospectAPIOpen) GetObjectDetailsByURL(objectURL, etag string) (*ObjectDetails, error)
GetObjectDetailsByURL looks up object details by URL and optional ETag. If etag is empty, returns details for the latest version.
func (*IntrospectAPIOpen) ListAllObjects ¶
func (api *IntrospectAPIOpen) ListAllObjects(limit int, pattern string) ([]ObjectInstance, error)
ListAllObjects returns a summary of all cached objects. This can be slow for large caches; consider using pagination in production.
func (*IntrospectAPIOpen) ListObjectInstances ¶
func (api *IntrospectAPIOpen) ListObjectInstances(objectURL string) ([]ObjectInstance, error)
ListObjectInstances returns all cached instances for a given object URL. The URL should be a pelican:// URL (e.g., pelican://host/path/file.dat). If the URL doesn't have a scheme, it will be normalized with the default federation.
func (*IntrospectAPIOpen) RunConsistencyCheck ¶
func (api *IntrospectAPIOpen) RunConsistencyCheck(ctx context.Context, metadataScan, dataScan bool) (*ConsistencyCheckResult, error)
RunConsistencyCheck runs a full consistency check (metadata scan + data scan) and returns statistics. This creates a temporary ConsistencyChecker with rate limiting disabled for the ad-hoc run.
func (*IntrospectAPIOpen) VerifyChecksum ¶
func (api *IntrospectAPIOpen) VerifyChecksum(instanceHash string) (*VerificationResult, error)
VerifyChecksum triggers a checksum verification for the specified instance. Returns detailed verification results including per-checksum status.
func (*IntrospectAPIOpen) VerifyChecksumByURL ¶
func (api *IntrospectAPIOpen) VerifyChecksumByURL(objectURL, etag string) (*VerificationResult, error)
VerifyChecksumByURL verifies checksum for an object by URL and optional ETag.
type LocalCache ¶
type LocalCache struct {
// contains filtered or unexported fields
}
func NewLocalCache ¶
func NewLocalCache(ctx context.Context, egrp *errgroup.Group, options ...LocalCacheOption) (lc *LocalCache, err error)
NewLocalCache creates a local cache object.
Launches background goroutines associated with the cache
func (*LocalCache) Config ¶
func (lc *LocalCache) Config(egrp *errgroup.Group) (err error)
Try to configure the local cache and launch the reconfigure goroutine
func (*LocalCache) Get ¶
func (sc *LocalCache) Get(ctx context.Context, path, token string) (io.ReadCloser, error)
Get path from the cache
func (*LocalCache) MarkObjectPurgeFirst ¶
func (lc *LocalCache) MarkObjectPurgeFirst(objectPath string) (int, error)
MarkObjectPurgeFirst marks the given object path as PURGEFIRST by creating the corresponding sentinel file on disk and updating the in-memory data structures accordingly.
func (*LocalCache) ReconstructCache ¶
func (lc *LocalCache) ReconstructCache() error
ReconstructCache rebuilds the in-memory data structures of the LocalCache based on the files present in the directory specified by "LocalCache.DataLocation".
type LocalCacheOption ¶
func WithDeferConfig ¶
func WithDeferConfig(deferConfig bool) LocalCacheOption
Create an option to defer the configuration of the local cache
Useful in cases where the cache should be created before the web interface is up -- but the web interface is needed to complete configuration.
type MasterKeyFile ¶
type MasterKeyFile struct {
// Keys maps public key fingerprint to encrypted master key
Keys map[string][]byte `json:"keys"`
}
MasterKeyFile represents the encrypted master key file format. The master key is encrypted with each issuer private key.
type NamespaceID ¶
type NamespaceID uint32
NamespaceID identifies a namespace prefix. Each distinct prefix registered in the cache is assigned a monotonically increasing ID for efficient storage and lookup. Using a dedicated type prevents accidental confusion with StorageID or arbitrary uint32 values.
type ObjectBlockState ¶
type ObjectBlockState struct {
// contains filtered or unexported fields
}
ObjectBlockState holds the shared, thread-safe block availability state for a single cached object. All RangeReaders serving the same instanceHash share one instance, ensuring that block additions (downloads) and removals (corruption repairs) are immediately visible to every goroutine.
The repairMu mutex serializes repair operations: when multiple readers detect corruption simultaneously, only the first one performs the clear-fetch-verify cycle. Subsequent callers acquire repairMu, re-check block availability, and skip the repair if it has already been done.
func NewObjectBlockState ¶
func NewObjectBlockState(bitmap *roaring.Bitmap) *ObjectBlockState
NewObjectBlockState wraps an existing bitmap in a thread-safe container.
func (*ObjectBlockState) Add ¶
func (obs *ObjectBlockState) Add(block uint32)
Add marks a single block as downloaded.
func (*ObjectBlockState) AddRange ¶
func (obs *ObjectBlockState) AddRange(start, end uint32)
AddRange marks all blocks in [start, end] as downloaded.
func (*ObjectBlockState) ClearDownloading ¶
func (obs *ObjectBlockState) ClearDownloading()
ClearDownloading marks the background download as finished and wakes any goroutines waiting in WaitForBlock.
func (*ObjectBlockState) Clone ¶
func (obs *ObjectBlockState) Clone() *roaring.Bitmap
Clone returns a point-in-time snapshot of the bitmap. The returned bitmap is independent (mutations to it do not affect the shared state).
func (*ObjectBlockState) Contains ¶
func (obs *ObjectBlockState) Contains(block uint32) bool
Contains returns true if the given block is marked as downloaded.
func (*ObjectBlockState) ContainsRange ¶
func (obs *ObjectBlockState) ContainsRange(start, end uint32) bool
ContainsRange returns true if every block in [start, end] is present. Creates a temporary range bitmap and checks if the intersection has the expected cardinality. This is O(log n) for typical bitmap layouts.
func (*ObjectBlockState) GetCardinality ¶
func (obs *ObjectBlockState) GetCardinality() uint64
GetCardinality returns the number of blocks that are downloaded.
func (*ObjectBlockState) LockRepair ¶
func (obs *ObjectBlockState) LockRepair()
LockRepair acquires the per-object repair mutex. Only one goroutine at a time may run the clear-fetch-verify repair cycle.
func (*ObjectBlockState) MissingInRange ¶
func (obs *ObjectBlockState) MissingInRange(start, end uint32) []uint32
MissingInRange returns the list of blocks in [start, end] that are not present. Uses RoaringBitmap's iterator for efficiency instead of checking each block.
func (*ObjectBlockState) Remove ¶
func (obs *ObjectBlockState) Remove(block uint32)
Remove marks a single block as not-downloaded.
func (*ObjectBlockState) RemoveMany ¶
func (obs *ObjectBlockState) RemoveMany(blocks []uint32)
RemoveMany marks the given blocks as not-downloaded.
func (*ObjectBlockState) SetDownloading ¶
func (obs *ObjectBlockState) SetDownloading()
SetDownloading marks this object as having a background download in progress. WaitForBlock will block while downloading is true.
func (*ObjectBlockState) UnlockRepair ¶
func (obs *ObjectBlockState) UnlockRepair()
UnlockRepair releases the per-object repair mutex.
func (*ObjectBlockState) WaitForBlock ¶
func (obs *ObjectBlockState) WaitForBlock(ctx context.Context, block uint32) bool
WaitForBlock waits until the specified block is available in the bitmap. It returns true if the block is available, false if the context was cancelled or the background download finished without producing the block. This avoids starting duplicate range downloads when a full download is already in progress.
The implementation spawns a goroutine to wait on the sync.Cond (which cannot be interrupted) and selects between it and ctx.Done(). On context cancellation, we signal the goroutine via a done channel and broadcast the cond, then wait for the goroutine to acknowledge exit before returning. This guarantees no goroutine leak.
type ObjectDetails ¶
type ObjectDetails struct {
ObjectInstance
// Additional details
NamespaceID uint16 `json:"namespace_id"`
StorageID uint8 `json:"storage_id"`
LastValidated time.Time `json:"last_validated,omitempty"`
CacheControl string `json:"cache_control,omitempty"`
Checksums []ChecksumInfo `json:"checksums,omitempty"`
BlockSummary *BlockSummary `json:"block_summary,omitempty"` // nil for inline storage
ChunkSummary *ChunkInfoSummary `json:"chunk_info,omitempty"` // nil for non-chunked
}
ObjectDetails provides detailed metadata for a cached object instance.
type ObjectHash ¶
type ObjectHash string
ObjectHash is an HMAC-SHA-256 digest that identifies a logical object (URL) regardless of version or ETag. Using a dedicated type prevents accidental confusion with InstanceHash or arbitrary strings.
func ComputeObjectHash ¶
func ComputeObjectHash(salt []byte, pelicanURL string) ObjectHash
ComputeObjectHash computes HMAC-SHA-256(salt, normalized URL). This identifies the logical object (URL) regardless of version/ETag. The salt is generated once per cache database and prevents offline correlation of hashes with known URLs.
type ObjectInstance ¶
type ObjectInstance struct {
InstanceHash string `json:"instance_hash"`
ETag string `json:"etag"`
SourceURL string `json:"source_url"`
ContentLength int64 `json:"content_length"`
ContentType string `json:"content_type"`
LastModified time.Time `json:"last_modified,omitempty"`
Completed time.Time `json:"completed,omitempty"`
LastAccessed time.Time `json:"last_accessed,omitempty"`
Expires time.Time `json:"expires,omitempty"`
IsLatest bool `json:"is_latest"` // True if this is the latest ETag
IsInline bool `json:"is_inline"` // True if stored inline in database
}
ObjectInstance represents a cached instance of an object with a specific ETag.
type ObjectReader ¶
type ObjectReader struct {
// contains filtered or unexported fields
}
ObjectReader provides a reader interface for cached objects. For disk-backed objects, it caches the BlockEncryptor and ObjectBlockState at construction time so that Read/ReadAt calls do not need to look them up through the TTL caches on every call.
func (*ObjectReader) Close ¶
func (r *ObjectReader) Close() error
Close releases the reader's reference to the underlying file handle. The actual FD is closed only when all references (including the TTL cache entry) have been released.
func (*ObjectReader) ContentType ¶
func (r *ObjectReader) ContentType() string
ContentType returns the content type of the object
func (*ObjectReader) LastModified ¶
func (r *ObjectReader) LastModified() time.Time
LastModified returns the last modified time
func (*ObjectReader) Read ¶
func (r *ObjectReader) Read(p []byte) (n int, err error)
Read implements io.Reader
func (*ObjectReader) ReadAt ¶
func (r *ObjectReader) ReadAt(p []byte, off int64) (n int, err error)
ReadAt implements io.ReaderAt
func (*ObjectReader) Seek ¶
func (r *ObjectReader) Seek(offset int64, whence int) (int64, error)
Seek implements io.Seeker
func (*ObjectReader) Size ¶
func (r *ObjectReader) Size() int64
Size returns the total size of the object
func (*ObjectReader) WriteTo ¶
func (r *ObjectReader) WriteTo(w io.Writer) (int64, error)
WriteTo implements io.WriterTo. When io.Copy detects this interface it calls WriteTo directly instead of issuing many small Read calls with a 32 KB default buffer. This lets the ObjectReader control the batch size, reading and decrypting many blocks at once and writing the plaintext to w in large chunks.
type PersistentCache ¶
type PersistentCache struct {
// contains filtered or unexported fields
}
PersistentCache is the new persistent local cache implementation. It uses BadgerDB for metadata and block tracking, and encrypted files on disk.
func NewPersistentCache ¶
func NewPersistentCache(ctx context.Context, egrp *errgroup.Group, cfg PersistentCacheConfig) (*PersistentCache, error)
NewPersistentCache creates a new persistent cache instance
func (*PersistentCache) BackdateObject ¶
func (pc *PersistentCache) BackdateObject(objectPath string, age time.Duration) error
BackdateObject shifts the Completed timestamp of a cached object backward by the given duration. This is useful for testing the Age response header without waiting for wall-clock time to elapse.
func (*PersistentCache) Close ¶
func (pc *PersistentCache) Close() error
Close shuts down the persistent cache.
Shutdown order:
1. Cancel all in-flight downloads (via downloadCancel).
2. Wait for completeDownload goroutines to finish — each one closes its own TransferClient, so all clients are gone before step 3.
3. Shut down the transfer engine (no live clients remain).
4. Stop the consistency checker.
5. Close the database.
func (*PersistentCache) Config ¶
func (pc *PersistentCache) Config(egrp *errgroup.Group) error
Config configures the cache and starts periodic updates
func (*PersistentCache) EvictObject ¶
func (pc *PersistentCache) EvictObject(objectPath, token string) error
EvictObject is the programmatic API for evicting an object by path. Returns nil on success, an error if the object is in use or the eviction fails.
func (*PersistentCache) EvictPrefix ¶
func (pc *PersistentCache) EvictPrefix(prefix, token string, immediate bool) (int, error)
EvictPrefix evicts all cached objects whose source URL starts with the given path prefix (or matches it exactly). It iterates every metadata entry, selects those with a matching SourceURL, and either deletes them immediately or marks them for priority eviction (purge-first) depending on the immediate flag. Objects that are currently being downloaded are skipped. The returned count reflects objects that were acted on.
func (*PersistentCache) Get ¶
func (pc *PersistentCache) Get(ctx context.Context, objectPath, token string) (io.ReadCloser, error)
Get retrieves an object from the cache, downloading if necessary
func (*PersistentCache) GetCacheStats ¶
func (pc *PersistentCache) GetCacheStats() (*CacheStats, error)
GetCacheStats returns aggregate cache size statistics by scanning metadata and reading usage counters. This is the same data returned by the introspect/stats HTTP endpoint.
func (*PersistentCache) GetMetadata ¶
func (pc *PersistentCache) GetMetadata(objectPath, token string) (*CacheMetadata, error)
GetMetadata returns the cache metadata for an object if it exists. Returns nil, nil if the object is not cached.
func (*PersistentCache) GetRange ¶
func (pc *PersistentCache) GetRange(ctx context.Context, objectPath, token, rangeHeader string) (io.ReadCloser, error)
GetRange retrieves a range of an object from the cache
func (*PersistentCache) GetSeekableReader ¶
func (pc *PersistentCache) GetSeekableReader(ctx context.Context, objectPath, bearerToken string, rangeOnly bool) (*SeekableReader, *CacheMetadata, error)
GetSeekableReader returns a seekable reader for the full object with on-demand block fetching. This is designed for use with http.ServeContent which handles Range requests internally.
When rangeOnly is true and the object is not yet cached, GetSeekableReader uses a lightweight HEAD request to initialise on-disk storage instead of starting a full sequential download. This allows BlockFetcherV2 to fetch only the blocks the caller actually reads, avoiding a potentially expensive full transfer. Callers should set rangeOnly when they know the request is for a sub-range of the object (e.g. an HTTP Range request).
func (*PersistentCache) GetStats ¶
func (pc *PersistentCache) GetStats() PersistentCacheStats
GetStats returns cache statistics
func (*PersistentCache) HeadObject ¶
func (pc *PersistentCache) HeadObject(objectPath, token string) (*HeadResult, error)
HeadObject returns metadata for an object without triggering a download. If the object is cached, the full CacheMetadata is returned. If not cached, the origin is queried via HEAD (DoStat) for the size.
func (*PersistentCache) IsFullyCached ¶
func (pc *PersistentCache) IsFullyCached(ctx context.Context, objectPath, token string) bool
IsFullyCached returns true if the object at the given path is fully downloaded and present in the cache (metadata exists, ContentLength >= 0, and the download has been marked complete).
When the cached entry carries Cache-Control directives and is stale, a revalidation request is made to the origin. If revalidation confirms the same ETag (or fails gracefully, serving stale), the object is still considered fully cached. If the origin provides a new version or responds with no-store, the method returns false so the caller falls through to a full download.
func (*PersistentCache) KeyChangeCallback ¶
func (pc *PersistentCache) KeyChangeCallback() func(ctx context.Context) error
KeyChangeCallback returns a callback function that re-encrypts the master key when issuer keys change. This is used with LaunchIssuerKeysDirRefresh to ensure the cache data remains accessible as long as any issuer key is available. The callback updates the masterkey.json file with the master key encrypted under all current issuer keys.
func (*PersistentCache) LaunchListener ¶
LaunchListener launches the unix socket listener for the persistent cache
func (*PersistentCache) MarkPurgeFirst ¶
func (pc *PersistentCache) MarkPurgeFirst(objectPath string) error
MarkPurgeFirst marks an object to be purged first during next eviction
func (*PersistentCache) Purge ¶
func (pc *PersistentCache) Purge() error
Purge triggers the eviction manager to purge old entries
func (*PersistentCache) PurgeToBytes ¶
func (pc *PersistentCache) PurgeToBytes(targetBytes uint64) (uint64, int64, error)
PurgeToBytes triggers the eviction manager to purge down to the specified total usage target. Returns (bytes freed, objects evicted, error).
func (*PersistentCache) Register ¶
func (pc *PersistentCache) Register(ctx context.Context, router *gin.RouterGroup)
Register registers the control & monitoring routines with Gin
func (*PersistentCache) RegisterCacheHandlers ¶
func (pc *PersistentCache) RegisterCacheHandlers(engine *gin.Engine, directorEnabled bool) error
RegisterCacheHandlers registers HTTP handlers for the persistent cache on the Gin engine. This is used for the XRootD-free cache implementation where the cache serves content directly via the web server instead of using XRootD.
Unlike LaunchListener (which creates a Unix socket), this registers handlers on the existing Gin web server, allowing it to serve cache requests on the standard HTTP/HTTPS ports.
The /api/v1.0/cache/data/:discovery/*path routes are always registered regardless of whether a director is in use. When running standalone (directorEnabled=false), a NoRoute fallback is also registered so that bare object paths (without the /api prefix) are served. The /api namespace is always reserved and never served by the NoRoute fallback.
func (*PersistentCache) SetFedToken ¶
func (pc *PersistentCache) SetFedToken(tok string)
SetFedToken stores the federation token in memory. It is called by cache.LaunchFedTokManager (via the onTokenUpdate callback) whenever the token is created or refreshed, eliminating the need to read the token back from disk on every origin request.
func (*PersistentCache) Stat ¶
func (pc *PersistentCache) Stat(objectPath, token string) (uint64, error)
Stat returns the size of an object, querying the origin if not cached
func (*PersistentCache) StatCachedOnly ¶
func (pc *PersistentCache) StatCachedOnly(objectPath, token string) (uint64, error)
StatCachedOnly returns the size of an object only if it's cached. Returns 0, ErrNotCached if the object is not in the cache.
type PersistentCacheConfig ¶
type PersistentCacheConfig struct {
// Mode selects which configuration namespace to use for defaults.
// CacheModeServer reads Cache.StorageLocation, Cache.HighWaterMark, etc.
// CacheModeLocal (the zero value) reads LocalCache.DataLocation,
// LocalCache.HighWaterMarkPercentage, etc.
Mode CacheMode
// BaseDir is the root directory for the cache. The BadgerDB database
// lives directly under BaseDir. If StorageDirs is empty, a single
// storage directory is created under BaseDir/objects.
BaseDir string
// StorageDirs configures one or more storage directories.
// Each entry describes a directory path and its size limits.
// When empty, a single directory under BaseDir is used with
// MaxSize / HighWaterMarkPercentage / LowWaterMarkPercentage
// as its limits (for backward compatibility).
StorageDirs []StorageDirConfig
// Legacy single-directory fields — used only when StorageDirs is empty.
MaxSize uint64
HighWaterMarkPercentage int
LowWaterMarkPercentage int
// InlineStorageMaxBytes sets the maximum size for objects stored
// inline in BadgerDB. Objects at or below this threshold are stored
// inline; larger objects go to disk. 0 means use the default (4096).
InlineStorageMaxBytes int
DefaultFederation string
// DeferConfig delays the initial director namespace fetch until
// Config() is called explicitly. The server launcher sets this to
// true because the director may not be reachable when the cache is
// first constructed (e.g. cache starts before director discovery).
DeferConfig bool
}
PersistentCacheConfig holds configuration for the persistent cache
type PersistentCacheStats ¶
type PersistentCacheStats struct {
TotalUsage uint64
DirStats map[StorageID]DirEvictionStats
NamespaceUsage map[string]int64
ConsistencyStats ConsistencyStats
}
PersistentCacheStats holds cache statistics
type PrestageManager ¶
type PrestageManager struct {
// contains filtered or unexported fields
}
PrestageManager manages per-identity worker pools for prestage operations.
func NewPrestageManager ¶
func NewPrestageManager(pc *PersistentCache) *PrestageManager
NewPrestageManager creates a prestage manager for the given cache.
func (*PrestageManager) Submit ¶
func (pm *PrestageManager) Submit(ident string, req *prestageRequest) bool
Submit queues a prestage request for the given identity. Returns false if the queue is full (caller should return 429).
type RangeReader ¶
type RangeReader struct {
// contains filtered or unexported fields
}
RangeReader implements io.ReadSeeker over a byte range of a cached object. It is the primary read-path abstraction that ties the block-level storage layer to HTTP response serving.
Callers obtain a RangeReader via PersistentCache.GetRange (for explicit byte-range requests) or via PersistentCache.GetSeekableReader (wrapped in a SeekableReader so that http.ServeContent can handle Range negotiation).
For normal (cached / cacheable) responses the RangeReader reads encrypted blocks from the StorageManager, fetching missing blocks on demand via the fetchBlocks callback wired to a BlockFetcherV2 instance. It also performs a single round of transparent auto-repair when AES-GCM authentication detects on-disk corruption.
For responses marked Cache-Control: no-store, the origin's body is piped directly through the noStoreReader field; the block-storage path is bypassed entirely.
func NewRangeReader ¶
func NewRangeReader( storage *StorageManager, instanceHash InstanceHash, start, end int64, fetchBlocks func(ctx context.Context, startBlock, endBlock uint32) error, ) (*RangeReader, error)
NewRangeReader creates a reader for a range request. fetchBlocks is called when blocks need to be fetched from origin.
func (*RangeReader) ContentLength ¶
func (rr *RangeReader) ContentLength() int64
ContentLength returns the length of the range
func (*RangeReader) ContentRange ¶
func (rr *RangeReader) ContentRange() string
ContentRange returns the Content-Range header value
func (*RangeReader) Read ¶
func (rr *RangeReader) Read(p []byte) (n int, err error)
Read implements io.Reader with on-demand block fetching
func (*RangeReader) ReadContext ¶
ReadContext reads with context for cancellation
type RangeRequest ¶
RangeRequest represents a parsed HTTP Range header
func ParseRangeHeader ¶
func ParseRangeHeader(rangeHeader string, contentLength int64) ([]RangeRequest, error)
ParseRangeHeader parses an HTTP Range header. Supports: "bytes=start-end", "bytes=start-", "bytes=-suffix".
type ScanProgressEvent ¶
type ScanProgressEvent struct {
// Phase is the scan phase: "metadata" or "data".
Phase string `json:"phase"`
// PercentComplete is the estimated completion (0–100).
PercentComplete float64 `json:"percent_complete"`
// DBEntriesScanned is the number of DB entries processed so far (metadata only).
DBEntriesScanned int64 `json:"db_entries_scanned,omitempty"`
// FilesScanned is the number of filesystem entries processed (metadata only).
FilesScanned int64 `json:"files_scanned,omitempty"`
// ObjectsVerified is the number of objects checksummed so far (data only).
ObjectsVerified int64 `json:"objects_verified,omitempty"`
// BytesVerified is the total bytes read for checksumming (data only).
BytesVerified int64 `json:"bytes_verified,omitempty"`
// Message is a human-readable status line.
Message string `json:"message,omitempty"`
}
ScanProgressEvent is sent as an SSE event to report scan progress. The Event field (used as SSE event type) is one of:
- "metadata_progress": periodic update during the metadata scan
- "metadata_done": the metadata scan has finished
- "data_progress": periodic update during the data scan
- "data_done": the data scan has finished
- "error": an error occurred
- "done": all scans complete; the Data field is a ConsistencyCheckResult
type SeekableReader ¶
type SeekableReader struct {
*RangeReader
}
SeekableReader is a reader that supports seeking and on-demand block fetching. It implements io.ReadSeekCloser for use with http.ServeContent.
For no-store streaming responses, the reader is NOT seekable (IsNoStore returns true). In that case the handler must use io.Copy instead of http.ServeContent.
func (*SeekableReader) IsNoStore ¶
func (sr *SeekableReader) IsNoStore() bool
IsNoStore returns true when this reader wraps a streaming no-store response. The caller must NOT use http.ServeContent (which requires seeking); instead it should stream the response with io.Copy and set headers manually.
type StorageDirConfig ¶
type StorageDirConfig struct {
// Path is the directory that will hold an "objects/" subdirectory
// and, for the first directory, the database.
Path string
// MaxSize is the maximum number of bytes stored on this directory.
// If 0, auto-detected from the filesystem at startup.
MaxSize uint64
// HighWaterMarkPercentage overrides the global high-water mark for this
// directory. 0 means use the global default.
HighWaterMarkPercentage int
// LowWaterMarkPercentage overrides the global low-water mark for this
// directory. 0 means use the global default.
LowWaterMarkPercentage int
}
StorageDirConfig describes one disk-backed storage directory. Multiple directories can be configured to spread data across devices; each directory has its own maximum size and optional watermark overrides.
func ParseStorageDirsConfig ¶
func ParseStorageDirsConfig() ([]StorageDirConfig, error)
ParseStorageDirsConfig reads the LocalCache.StorageDirs setting from Viper and returns parsed StorageDirConfig values. It accepts two formats for backward compatibility:
A list of strings (paths only):

    LocalCache:
      StorageDirs:
        - /mnt/cache1
        - /mnt/cache2
A list of objects with per-directory configuration:

    LocalCache:
      StorageDirs:
        - Path: /mnt/cache1
          MaxSize: 500GB
          HighWaterMarkPercentage: 95
          LowWaterMarkPercentage: 85
        - Path: /mnt/cache2
          MaxSize: 2TB
Returns nil (not an error) when the key is unset or empty.
type StorageDirInfo ¶
StorageDirInfo describes a configured storage directory at runtime.
type StorageDirStats ¶
type StorageDirStats struct {
StorageID uint8 `json:"storage_id"`
ObjectCount int64 `json:"object_count"`
TotalBytes int64 `json:"total_bytes"` // Sum of ContentLength for objects in this dir
InlineCount int64 `json:"inline_count"` // Number of inline objects
InlineBytes int64 `json:"inline_bytes"` // Sum of ContentLength for inline objects
OnDiskCount int64 `json:"on_disk_count"` // Number of on-disk objects
OnDiskBytes int64 `json:"on_disk_bytes"` // Actual bytes on disk (content + per-block MAC overhead)
}
StorageDirStats holds per-storage-directory statistics.
type StorageID ¶
type StorageID uint8
StorageID identifies a storage location. 0 means inline (data in BadgerDB), 1–255 mean disk-backed storage in the directory mapped to that ID. Using a dedicated type prevents accidental confusion with NamespaceID or arbitrary uint8 values.
type StorageManager ¶
type StorageManager struct {
// contains filtered or unexported fields
}
func NewStorageManager ¶
func NewStorageManager(db *CacheDB, dirs []string, inlineMax int, egrp *errgroup.Group) (*StorageManager, error)
NewStorageManager creates a new storage manager with UUID-based directory identity. It performs the following steps:
- Load existing storageID → (UUID, path) mappings from the database.
- Scan the supplied directory paths for UUID files.
- Match discovered UUIDs against known mappings, updating paths as needed (so that sysadmins can remount directories at different locations).
- Assign new storage IDs to directories that have no UUID yet and drop a new UUID file in each.
- Persist updated mappings to the database.
dirs is the ordered set of configured directory paths. inlineMax sets the maximum inline object size (0 = use default InlineThreshold).
func NewStorageManagerReadOnly ¶
func NewStorageManagerReadOnly(baseDir string, db *CacheDB) (*StorageManager, error)
NewStorageManagerReadOnly creates a storage manager for read-only introspection. This is a lightweight variant suitable for CLI tools that only need to read metadata and block states, not perform downloads or writes.
func (*StorageManager) AllocateChunk ¶
func (sm *StorageManager) AllocateChunk( ctx context.Context, instanceHash InstanceHash, meta *CacheMetadata, chunkIndex int, ) (*CacheMetadata, error)
AllocateChunk allocates a storage directory for a chunk and creates the chunk file. This must be called before writing to a chunk that has StorageID = 0 (unallocated).
The storage directory is chosen via the pluggable chooseDir function, which defaults to simple round-robin but is replaced in production with EvictionManager.ChooseDiskStorage (weighted by free space).
Returns the updated CacheMetadata (which should be used for subsequent operations).
func (*StorageManager) Close ¶
func (sm *StorageManager) Close()
Close stops TTL cache eviction goroutines and releases cached resources.
func (*StorageManager) Delete ¶
func (sm *StorageManager) Delete(instanceHash InstanceHash) error
Delete removes an object from storage
func (*StorageManager) DirCount ¶
func (sm *StorageManager) DirCount() int
DirCount returns the number of configured storage directories. This is used to determine if chunking should be enabled.
func (*StorageManager) DirIDs ¶
func (sm *StorageManager) DirIDs() []StorageID
DirIDs returns the list of configured storage directory IDs in sorted order.
func (*StorageManager) EvictByLRU ¶
func (sm *StorageManager) EvictByLRU(storageID StorageID, namespaceID NamespaceID, maxObjects int, maxBytes int64) ([]evictedObject, uint64, error)
EvictByLRU walks the LRU index for a given storage+namespace and evicts the oldest objects until either maxObjects have been removed or maxBytes of content has been freed — whichever comes first. A value of 0 for either limit means "no limit on that dimension". The method is allowed to go one object over the byte threshold to prevent starvation.
All DB mutations happen atomically; filesystem deletes follow afterward. Returns the evicted objects, total bytes freed, and any error.
func (*StorageManager) GetDirs ¶
func (sm *StorageManager) GetDirs() map[StorageID]string
GetDirs returns the configured storage directories (storageID → objects dir).
func (*StorageManager) GetMetadata ¶
func (sm *StorageManager) GetMetadata(instanceHash InstanceHash) (*CacheMetadata, error)
GetMetadata retrieves metadata for an object
func (*StorageManager) GetMissingBlocks ¶
func (sm *StorageManager) GetMissingBlocks(instanceHash InstanceHash) ([]BlockRange, error)
GetMissingBlocks returns the ranges of blocks that haven't been downloaded yet
func (*StorageManager) GetObjectSize ¶
func (sm *StorageManager) GetObjectSize(instanceHash InstanceHash) (int64, error)
GetObjectSize returns the content length of a cached object
func (*StorageManager) GetSharedBlockState ¶
func (sm *StorageManager) GetSharedBlockState(instanceHash InstanceHash) (*ObjectBlockState, error)
GetSharedBlockState returns the shared, thread-safe block state for the given instanceHash. The state is loaded from the persistent database on first access and cached in a TTL cache that evicts idle entries after blockStateTTL. Every call touches the entry's TTL so actively-used states remain resident. All callers for the same instanceHash receive the same *ObjectBlockState.
func (*StorageManager) HasObject ¶
func (sm *StorageManager) HasObject(instanceHash InstanceHash) (bool, error)
HasObject checks if an object exists in storage
func (*StorageManager) IdentifyCorruptBlocks ¶
func (sm *StorageManager) IdentifyCorruptBlocks(instanceHash InstanceHash, startBlock, endBlock uint32) ([]uint32, error)
IdentifyCorruptBlocks probes each block in [startBlock, endBlock] on disk and returns the block numbers whose AES-GCM authentication tag fails or whose on-disk data is too short. Blocks that are not in the downloaded bitmap are skipped. A missing or unopenable file returns all requested blocks.
func (*StorageManager) InitDiskStorage ¶
func (sm *StorageManager) InitDiskStorage(ctx context.Context, instanceHash InstanceHash, contentLength int64, storageID StorageID, namespaceID NamespaceID) (*CacheMetadata, error)
InitDiskStorage initializes disk storage for a large object in the specified storage directory. The full contentLength is charged to the usage counter at creation time so that the accounting matches the filesystem's pre-allocation (Truncate). Returns the metadata with encryption keys set up.
func (*StorageManager) InitLazyChunkedStorage ¶
func (sm *StorageManager) InitLazyChunkedStorage( ctx context.Context, instanceHash InstanceHash, contentLength int64, chunkSizeCode ChunkSizeCode, ) (*CacheMetadata, error)
InitLazyChunkedStorage initializes metadata for a chunked object WITHOUT creating chunk files. Files are created lazily when AllocateChunk is called (typically on first write to each chunk). This supports byte-range downloads where chunks may be written out of order.
All chunk StorageIDs are initialized to 0 (unallocated). Use AllocateChunk to assign a storage directory and create the file for each chunk before writing.
Parameters:
- instanceHash: unique identifier for this object version
- contentLength: total object size in bytes
- chunkSizeCode: chunk size encoding (must be non-zero for chunked storage)
func (*StorageManager) InlineMaxBytes ¶
func (sm *StorageManager) InlineMaxBytes() int
InlineMaxBytes returns the configured maximum inline object size.
func (*StorageManager) InvalidateSharedBlockState ¶
func (sm *StorageManager) InvalidateSharedBlockState(instanceHash InstanceHash)
InvalidateSharedBlockState removes the cached block state for an instanceHash, forcing the next GetSharedBlockState call to reload from the database. This should be called when an object is deleted or evicted.
func (*StorageManager) IsComplete ¶
func (sm *StorageManager) IsComplete(instanceHash InstanceHash) (bool, error)
IsComplete checks if all blocks have been downloaded
func (*StorageManager) MergeMetadata ¶
func (sm *StorageManager) MergeMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
MergeMetadata performs an atomic read-modify-update of object metadata
func (*StorageManager) NewBlockWriter ¶
func (sm *StorageManager) NewBlockWriter(instanceHash InstanceHash, startBlock uint32, existingBitmap *roaring.Bitmap, onComplete func()) (*BlockWriter, error)
NewBlockWriter creates a new block writer for streaming encrypted writes to disk. The existingBitmap parameter allows resuming partial downloads by skipping already-downloaded blocks. startBlock specifies which block number the first incoming byte corresponds to (use 0 to start from the beginning).
func (*StorageManager) NewObjectReader ¶
func (sm *StorageManager) NewObjectReader(instanceHash InstanceHash) (*ObjectReader, error)
NewObjectReader creates a reader for a cached object
func (*StorageManager) ReadBlocks ¶
func (sm *StorageManager) ReadBlocks(instanceHash InstanceHash, startOffset int64, length int) ([]byte, error)
ReadBlocks reads and decrypts blocks from disk storage. It uses the shared ObjectBlockState to check block availability.
func (*StorageManager) ReadBlocksInto ¶
func (sm *StorageManager) ReadBlocksInto(dst []byte, instanceHash InstanceHash, startOffset int64) (int, error)
ReadBlocksInto reads and decrypts blocks from disk storage directly into the caller-provided dst buffer. It returns the number of bytes written to dst. This avoids the result allocation and copy that ReadBlocks performs. dst must be large enough to hold the requested data.
func (*StorageManager) ReadInline ¶
func (sm *StorageManager) ReadInline(instanceHash InstanceHash) ([]byte, error)
ReadInline reads small data from inline storage
func (*StorageManager) SetMetadata ¶
func (sm *StorageManager) SetMetadata(instanceHash InstanceHash, meta *CacheMetadata) error
SetMetadata stores metadata for an object (full replace, use for initial creation only)
func (*StorageManager) StoreInline ¶
func (sm *StorageManager) StoreInline(ctx context.Context, instanceHash InstanceHash, meta *CacheMetadata, data []byte) error
StoreInline stores small data inline in BadgerDB
func (*StorageManager) WriteBlocks ¶
func (sm *StorageManager) WriteBlocks(instanceHash InstanceHash, startOffset int64, data []byte) error
WriteBlocks writes encrypted blocks to disk storage. This is the main write path for large objects.
type StorageUsageKey ¶
type StorageUsageKey struct {
StorageID StorageID
NamespaceID NamespaceID
}
StorageUsageKey combines storage ID and namespace ID for usage tracking
type VerificationResult ¶
type VerificationResult struct {
InstanceHash string `json:"instance_hash"`
Valid bool `json:"valid"`
Error string `json:"error,omitempty"`
ChecksumStatus []ChecksumStatus `json:"checksum_status,omitempty"`
}
VerificationResult contains the result of a checksum verification.
Source Files
¶
- block_fetcher.go
- block_state.go
- cache_authz.go
- cache_control.go
- cache_size_unix.go
- chunking.go
- consistency.go
- database.go
- encryption.go
- eviction.go
- introspect.go
- local_cache.go
- persistent_cache.go
- persistent_cache_api.go
- prestage_evict_api.go
- range_reader.go
- schema.go
- storage.go
- storage_fadvise_linux.go
- test_helpers.go