Documentation
¶
Overview ¶
Package core provides test helper functions for managing test environment
Index ¶
- Constants
- Variables
- func AddDeduplicationToCron(scheduler *CronScheduler)
- func CalculateChecksums(data []byte) (hdrXXH3 int64, fullXXH3 int64, sha256_0 int64, sha256_1 int64, sha256_2 int64, ...)
- func CalculateChecksumsFromReader(reader io.Reader, size int64) (uint64, uint64, int64, int64, int64, int64, error)
- func CleanAllTestData() error
- func CleanRecycleBin(c Ctx, bktID int64, h Handler, ma MetadataAdapter, da DataAdapter, ...) error
- func CleanTestBucketData(bktID int64) error
- func CleanTestDB(bktID int64) error
- func CleanupTestEnv()
- func ClearCompressionKind(kind *uint32)
- func ClearEncryptionKind(kind *uint32)
- func CompressData(data []byte, cmpr archiver.Compressor, kind *uint32) ([]byte, error)
- func CompressReader(reader io.Reader, writer io.Writer, cmpr archiver.Compressor) error
- func CreateCompressor(cmprWay uint32, cmprQlty uint32) archiver.Compressor
- func CreateDecompressor(kind uint32) archiver.Decompressor
- func DecompressData(data []byte, decompr archiver.Decompressor) ([]byte, error)
- func DecompressReader(reader io.Reader, writer io.Writer, decompr archiver.Decompressor) error
- func DecryptData(data []byte, kind uint32, key string) ([]byte, error)
- func DeleteObject(c Ctx, bktID, id int64, ma MetadataAdapter) error
- func EncryptData(data []byte, kind uint32, key string) ([]byte, error)
- func EscapeSQLString(s string) string
- func ExtendDataInfoTable(db *sql.DB) error
- func GetCompressionName(cmprWay uint32) string
- func GetCompressionType(kind uint32) uint32
- func GetDBWithType(connType DBConnectionType, dirPath string) (*sql.DB, error)
- func GetEncryptionName(endecWay uint32) string
- func GetEncryptionType(kind uint32) uint32
- func GetMainDBWithKey(basePathOrKey ...interface{}) (*sql.DB, error)
- func GetReadDB(dirPath string) (*sql.DB, error)
- func GetWriteDB(dirPath string) (*sql.DB, error)
- func HasCompression(kind uint32) bool
- func HasEncryption(kind uint32) bool
- func HashPassword(password string) (string, error)
- func InitBucketDB(dataPath string, bktID int64, key ...string) error
- func InitDB(basePath, key string) error
- func InitDBPool(maxReadConns, maxWriteConns, maxIdleConns int, connMaxLifetime time.Duration)
- func InitIDGen(ig *idgen.IDGen)
- func InitSnapshotTables(db *sql.DB) error
- func InitTestEnv()
- func IsInstantUploadEnabled() bool
- func IsInstantUploadEnabledWithConfig(cfg *InstantUploadConfig) bool
- func IsSparseFile(dataInfo *DataInfo) bool
- func MarkObjectAsDeleted(c Ctx, bktID, id int64, ma MetadataAdapter) error
- func MarkSparseFile(dataInfo *DataInfo)
- func NewID() int64
- func Now() int64
- func NowNano() int64
- func PermanentlyDeleteObject(c Ctx, bktID, id int64, h Handler, ma MetadataAdapter, da DataAdapter) error
- func PermanentlyDeleteObjectBatch(c Ctx, bktID int64, ids []int64, h Handler, ma MetadataAdapter, da DataAdapter) map[int64]error
- func ProcessData(data []byte, kind *uint32, cmprQlty uint32, encryptionKey string, ...) ([]byte, error)
- func SetCompressionKind(kind *uint32, cmprWay uint32)
- func SetEncryptionKind(kind *uint32, endecWay uint32)
- func SetupTestDirs(prefix string) (baseDir, dataDir string, cleanup func())
- func ShouldCompressFile(fileName string, firstChunk []byte) bool
- func ShouldCompressFileByName(fileName string) bool
- func UnprocessData(data []byte, kind uint32, decryptionKey string) ([]byte, error)
- func UpdateFileLatestVersion(c Ctx, bktID int64, ma MetadataAdapter) error
- func ValidateEncryptionKey(endecWay uint32, key string) error
- type ACLMetadataAdapter
- type AccessCtrlMgr
- type Admin
- type BaseMetadataAdapter
- type BucketACL
- type BucketInfo
- type BucketMetadataAdapter
- type BucketSnapshot
- type BucketStatsDelta
- type Config
- type CronJobConfig
- type CronSchedule
- type CronScheduler
- type Ctx
- type DBConnectionType
- type DBPool
- type DataAdapter
- type DataInfo
- type DataInfoMetadataAdapter
- type DataMetadataAdapter
- type DatabasePool
- type DeduplicationConfig
- type DeduplicationJobResult
- type DefaultAccessCtrlMgr
- type DefaultBaseMetadataAdapter
- func (dba *DefaultBaseMetadataAdapter) CheckPermission(c Ctx, bktID int64, uid int64, action int) (bool, error)
- func (dba *DefaultBaseMetadataAdapter) Close()
- func (dba *DefaultBaseMetadataAdapter) DeleteACL(c Ctx, bktID int64, uid int64) error
- func (dba *DefaultBaseMetadataAdapter) DeleteAllACL(c Ctx, bktID int64) error
- func (dba *DefaultBaseMetadataAdapter) DeleteBkt(c Ctx, bktID int64) error
- func (dba *DefaultBaseMetadataAdapter) DeleteUser(c Ctx, userID int64) error
- func (dba *DefaultBaseMetadataAdapter) GetBkt(c Ctx, ids []int64) (o []*BucketInfo, err error)
- func (dba *DefaultBaseMetadataAdapter) GetUsr(c Ctx, ids []int64) (o []*UserInfo, err error)
- func (dba *DefaultBaseMetadataAdapter) GetUsr2(c Ctx, usr string) (o *UserInfo, err error)
- func (dba *DefaultBaseMetadataAdapter) ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
- func (dba *DefaultBaseMetadataAdapter) ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
- func (dba *DefaultBaseMetadataAdapter) ListUsers(c Ctx) (o []*UserInfo, err error)
- func (dba *DefaultBaseMetadataAdapter) PutACL(c Ctx, bktID int64, uid int64, perm int) error
- func (dba *DefaultBaseMetadataAdapter) PutBkt(c Ctx, o []*BucketInfo) error
- func (dba *DefaultBaseMetadataAdapter) PutUsr(c Ctx, u *UserInfo) error
- func (dba *DefaultBaseMetadataAdapter) SetBaseKey(key string)
- func (dba *DefaultBaseMetadataAdapter) SetBasePath(basePath string)
- func (dba *DefaultBaseMetadataAdapter) SetPath(basePath string)
- func (dba *DefaultBaseMetadataAdapter) SetUsr(c Ctx, fields []string, u *UserInfo) error
- func (dba *DefaultBaseMetadataAdapter) UpdateBktQuota(c Ctx, bktID int64, quota int64) error
- type DefaultDataAdapter
- func (dda *DefaultDataAdapter) CleanupUnreferencedData(ctx Ctx, bktID int64) error
- func (dda *DefaultDataAdapter) Close()
- func (dda *DefaultDataAdapter) CopyData(ctx Ctx, bktID, srcDataID, dstDataID int64) error
- func (dda *DefaultDataAdapter) DecrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
- func (dda *DefaultDataAdapter) Delete(c Ctx, bktID, dataID int64, sn int) error
- func (dda *DefaultDataAdapter) IncrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
- func (dda *DefaultDataAdapter) IncrementSnapshotDataRefsTx(ctx Ctx, tx Transaction, snapshotID int64) error
- func (dda *DefaultDataAdapter) MarkDataAsShared(ctx Ctx, bktID, snapshotID int64) error
- func (dda *DefaultDataAdapter) MarkDataAsSharedTx(ctx Ctx, tx Transaction, bktID, snapshotID int64) error
- func (dda *DefaultDataAdapter) Read(c Ctx, bktID, dataID int64, sn int) ([]byte, error)
- func (dda *DefaultDataAdapter) ReadBytes(c Ctx, bktID, dataID int64, sn, offset, size int) ([]byte, error)
- func (dda *DefaultDataAdapter) SetDataPath(dataPath string)
- func (dda *DefaultDataAdapter) Write(c Ctx, bktID, dataID int64, sn int, buf []byte) error
- func (dda *DefaultDataAdapter) WriteWithCOW(ctx Ctx, bktID, objID int64, data []byte) (int64, error)
- type DefaultDataMetadataAdapter
- func (dda *DefaultDataMetadataAdapter) Close()
- func (dda *DefaultDataMetadataAdapter) ListAllBuckets(c Ctx) (o []*BucketInfo, err error)
- func (dda *DefaultDataMetadataAdapter) SetDataKey(key string)
- func (dda *DefaultDataMetadataAdapter) SetDataPath(dataPath string)
- func (dda *DefaultDataMetadataAdapter) SetPath(dataPath string)
- type DefaultMetadataAdapter
- func (dma *DefaultMetadataAdapter) BeginTransaction(ctx Ctx) (Transaction, error)
- func (dma *DefaultMetadataAdapter) Close()
- func (dma *DefaultMetadataAdapter) CopyObjectsToSnapshot(ctx Ctx, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
- func (dma *DefaultMetadataAdapter) CopyObjectsToSnapshotTx(ctx Ctx, tx Transaction, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
- func (dma *DefaultMetadataAdapter) CountDataRefs(c Ctx, bktID int64, dataIDs []int64) (map[int64]int64, error)
- func (dma *DefaultMetadataAdapter) DecBktDedupSavings(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) DecBktLogicalUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) DecBktRealUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) DecBktUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) DeleteBkt(c Ctx, bktID int64) error
- func (dma *DefaultMetadataAdapter) DeleteBucketSnapshot(ctx Ctx, snapshotID int64) error
- func (dma *DefaultMetadataAdapter) DeleteData(c Ctx, bktID int64, dataIDs []int64) error
- func (dma *DefaultMetadataAdapter) DeleteObj(c Ctx, bktID int64, id int64) error
- func (dma *DefaultMetadataAdapter) DeleteSnapshotObjects(ctx Ctx, snapshotID int64) error
- func (dma *DefaultMetadataAdapter) FindDuplicateData(c Ctx, bktID int64, offset, limit int) (groups []DuplicateGroup, total int64, err error)
- func (dma *DefaultMetadataAdapter) FindSmallPackageData(c Ctx, bktID int64, maxSize int64, offset, limit int) (d []*DataInfo, total int64, err error)
- func (dma *DefaultMetadataAdapter) GetAttr(c Ctx, bktID int64, objID int64, key string) ([]byte, error)
- func (dma *DefaultMetadataAdapter) GetBkt(c Ctx, ids []int64) (o []*BucketInfo, err error)
- func (dma *DefaultMetadataAdapter) GetBucketSnapshot(ctx Ctx, snapshotID int64) (*BucketSnapshot, error)
- func (dma *DefaultMetadataAdapter) GetBucketSnapshotByName(ctx Ctx, bktID int64, name string) (*BucketSnapshot, error)
- func (dma *DefaultMetadataAdapter) GetData(c Ctx, bktID, id int64) (d *DataInfo, err error)
- func (dma *DefaultMetadataAdapter) GetObj(c Ctx, bktID int64, ids []int64) (o []*ObjectInfo, err error)
- func (dma *DefaultMetadataAdapter) GetObjByDataID(c Ctx, bktID int64, dataID int64) ([]*ObjectInfo, error)
- func (dma *DefaultMetadataAdapter) GetObjectsByVersion(ctx Ctx, bktID, version int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
- func (dma *DefaultMetadataAdapter) GetSnapshotObjects(ctx Ctx, snapshotID int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
- func (dma *DefaultMetadataAdapter) IncBktDedupSavings(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) IncBktLogicalUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) IncBktRealUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) IncBktUsed(c Ctx, bktID int64, size int64) error
- func (dma *DefaultMetadataAdapter) InsertBucketSnapshot(ctx Ctx, snapshot *BucketSnapshot) (int64, error)
- func (dma *DefaultMetadataAdapter) InsertBucketSnapshotTx(ctx Ctx, tx Transaction, snapshot *BucketSnapshot) (int64, error)
- func (dma *DefaultMetadataAdapter) ListAllData(c Ctx, bktID int64, offset, limit int) (d []*DataInfo, total int64, err error)
- func (dma *DefaultMetadataAdapter) ListAttrs(c Ctx, bktID int64, objID int64) ([]string, error)
- func (dma *DefaultMetadataAdapter) ListBucketSnapshots(ctx Ctx, bktID int64, limit, offset int) ([]*BucketSnapshot, error)
- func (dma *DefaultMetadataAdapter) ListChildren(c Ctx, bktID int64, pid int64, offset, limit int) ([]*ObjectInfo, int64, error)
- func (dma *DefaultMetadataAdapter) ListDeletedObjs(c Ctx, bktID int64, beforeTime int64, limit int) ([]*ObjectInfo, error)
- func (dma *DefaultMetadataAdapter) ListObj(c Ctx, bktID, pid int64, wd, delim, order string, count int) (o []*ObjectInfo, cnt int64, d string, err error)
- func (dma *DefaultMetadataAdapter) ListObjsByType(c Ctx, bktID int64, objType int, offset, limit int) ([]*ObjectInfo, int64, error)
- func (dma *DefaultMetadataAdapter) ListRecycleBin(c Ctx, bktID int64, opt ListOptions) (o []*ObjectInfo, cnt int64, d string, err error)
- func (dma *DefaultMetadataAdapter) ListVersions(c Ctx, bktID int64, fileID int64) ([]*ObjectInfo, error)
- func (dma *DefaultMetadataAdapter) MarkBucketObjectsAsDeleted(ctx Ctx, bktID int64) error
- func (dma *DefaultMetadataAdapter) MarkBucketObjectsAsDeletedTx(ctx Ctx, tx Transaction, bktID int64) error
- func (dma *DefaultMetadataAdapter) PutBkt(c Ctx, o []*BucketInfo) error
- func (dma *DefaultMetadataAdapter) PutData(c Ctx, bktID int64, d []*DataInfo) error
- func (dma *DefaultMetadataAdapter) PutDataAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
- func (dma *DefaultMetadataAdapter) PutObj(c Ctx, bktID int64, o []*ObjectInfo) (ids []int64, err error)
- func (dma *DefaultMetadataAdapter) RefData(c Ctx, bktID int64, d []*DataInfo) ([]int64, error)
- func (dma *DefaultMetadataAdapter) RemoveAttr(c Ctx, bktID int64, objID int64, key string) error
- func (dma *DefaultMetadataAdapter) RestoreObjectsFromSnapshot(ctx Ctx, snapshotID, targetBktID int64) error
- func (dma *DefaultMetadataAdapter) RestoreObjectsFromSnapshotTx(ctx Ctx, tx Transaction, snapshotID, targetBktID int64) error
- func (dma *DefaultMetadataAdapter) SetAttr(c Ctx, bktID int64, objID int64, key string, value []byte) error
- func (dma *DefaultMetadataAdapter) SetBaseKey(key string)
- func (dma *DefaultMetadataAdapter) SetBasePath(basePath string)
- func (dma *DefaultMetadataAdapter) SetDataKey(key string)
- func (dma *DefaultMetadataAdapter) SetDataPath(dataPath string)
- func (dma *DefaultMetadataAdapter) SetObj(c Ctx, bktID int64, fields []string, o *ObjectInfo) error
- func (dma *DefaultMetadataAdapter) UpdateBktQuota(c Ctx, bktID int64, quota int64) error
- func (dma *DefaultMetadataAdapter) UpdateObjDataID(c Ctx, bktID int64, oldDataID, newDataID int64) error
- func (dma *DefaultMetadataAdapter) UpdateSnapshotStats(ctx Ctx, snapshotID int64, fileCount, totalSize int64, status SnapshotStatus) error
- func (dma *DefaultMetadataAdapter) UpdateSnapshotStatsTx(ctx Ctx, tx Transaction, snapshotID int64, fileCount, totalSize int64, ...) error
- func (dma *DefaultMetadataAdapter) UpdateSnapshotStatus(ctx Ctx, snapshotID int64, status SnapshotStatus) error
- type DefragmentResult
- type DirtyDataResult
- type DuplicateGroup
- type Error
- type FixScrubIssuesResult
- type Handler
- type InstantUploadConfig
- type ListOptions
- type LocalAdmin
- func (la *LocalAdmin) Close()
- func (la *LocalAdmin) CreateUser(c Ctx, username, password, name string, role uint32) (*UserInfo, error)
- func (la *LocalAdmin) Defragment(c Ctx, bktID int64) (*DefragmentResult, error)
- func (la *LocalAdmin) DeleteACL(c Ctx, bktID int64, uid int64) error
- func (la *LocalAdmin) DeleteAllACL(c Ctx, bktID int64) error
- func (la *LocalAdmin) DeleteBkt(c Ctx, bktID int64) error
- func (la *LocalAdmin) DeleteUser(c Ctx, userID int64) error
- func (la *LocalAdmin) ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
- func (la *LocalAdmin) ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
- func (la *LocalAdmin) ListUsers(c Ctx) ([]*UserInfo, error)
- func (la *LocalAdmin) MergeDuplicateData(c Ctx, bktID int64) (*MergeDuplicateResult, error)
- func (la *LocalAdmin) New(Admin) Admin
- func (la *LocalAdmin) PutACL(c Ctx, bktID int64, uid int64, perm int) error
- func (la *LocalAdmin) PutBkt(c Ctx, o []*BucketInfo) error
- func (la *LocalAdmin) ScanDirtyData(c Ctx, bktID int64) (*DirtyDataResult, error)
- func (la *LocalAdmin) ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
- func (la *LocalAdmin) Scrub(c Ctx, bktID int64) (*ScrubResult, error)
- func (la *LocalAdmin) SetQuota(c Ctx, bktID int64, quota int64) error
- func (la *LocalAdmin) UpdateUser(c Ctx, userID int64, username, password, name string, role *uint32) error
- type LocalHandler
- func (lh *LocalHandler) AccessCtrlMgr() AccessCtrlMgr
- func (lh *LocalHandler) CleanRecycleBin(c Ctx, bktID int64, targetID int64) error
- func (lh *LocalHandler) Close()
- func (lh *LocalHandler) CreateVersionFromFile(c Ctx, bktID, existingFileID int64) error
- func (lh *LocalHandler) DataAdapter() DataAdapter
- func (lh *LocalHandler) Delete(c Ctx, bktID, id int64) error
- func (lh *LocalHandler) DeleteBatch(c Ctx, bktID int64, ids []int64) map[int64]error
- func (lh *LocalHandler) Get(c Ctx, bktID int64, ids []int64) ([]*ObjectInfo, error)
- func (lh *LocalHandler) GetBktInfo(c Ctx, bktID int64) (*BucketInfo, error)
- func (lh *LocalHandler) GetData(c Ctx, bktID, id int64, sn int, offsetOrSize ...int) ([]byte, error)
- func (lh *LocalHandler) GetDataAdapter() DataAdapter
- func (lh *LocalHandler) GetDataInfo(c Ctx, bktID, id int64) (*DataInfo, error)
- func (lh *LocalHandler) List(c Ctx, bktID, pid int64, opt ListOptions) ([]*ObjectInfo, int64, string, error)
- func (lh *LocalHandler) ListRecycleBin(c Ctx, bktID int64, opt ListOptions) ([]*ObjectInfo, int64, string, error)
- func (lh *LocalHandler) Login(c Ctx, usr, pwd string) (Ctx, *UserInfo, []*BucketInfo, error)
- func (lh *LocalHandler) MetadataAdapter() MetadataAdapter
- func (lh *LocalHandler) MoveTo(c Ctx, bktID, id, pid int64) error
- func (lh *LocalHandler) New(Handler) Handler
- func (lh *LocalHandler) Put(c Ctx, bktID int64, o []*ObjectInfo) ([]int64, error)
- func (lh *LocalHandler) PutData(c Ctx, bktID, dataID int64, sn int, buf []byte) (int64, error)
- func (lh *LocalHandler) PutDataFromReader(c Ctx, bktID, dataID int64, sn int, r io.Reader, size int64) (int64, error)
- func (lh *LocalHandler) PutDataInfo(c Ctx, bktID int64, d []*DataInfo) (ids []int64, err error)
- func (lh *LocalHandler) PutDataInfoAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
- func (lh *LocalHandler) Recycle(c Ctx, bktID, id int64) error
- func (lh *LocalHandler) Ref(c Ctx, bktID int64, d []*DataInfo) ([]int64, error)
- func (lh *LocalHandler) Rename(c Ctx, bktID, id int64, name string) error
- func (lh *LocalHandler) ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
- func (lh *LocalHandler) SetAccessCtrlMgr(acm AccessCtrlMgr)
- func (lh *LocalHandler) SetAdapter(ma MetadataAdapter, da DataAdapter)
- func (lh *LocalHandler) SetDBKey(key string)
- func (lh *LocalHandler) UpdateFileLatestVersion(c Ctx, bktID int64) error
- type MergeDuplicateResult
- type MetadataAdapter
- type NoAuthAccessCtrlMgr
- type ObjectInfo
- type ObjectMetadataAdapter
- type ResourceControlConfig
- type ResourceController
- type ScanOrphanedChunksResult
- type ScrubResult
- type SnapshotConfig
- type SnapshotDataAdapter
- type SnapshotManager
- func (sm *SnapshotManager) CleanupExpiredSnapshots(ctx context.Context, bktID int64) (int, error)
- func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, bktID int64, name, description string, ...) (*BucketSnapshot, error)
- func (sm *SnapshotManager) DeleteSnapshot(ctx context.Context, snapshotID int64) error
- func (sm *SnapshotManager) GetSnapshot(ctx context.Context, snapshotID int64) (*BucketSnapshot, error)
- func (sm *SnapshotManager) GetSnapshotByName(ctx context.Context, bktID int64, name string) (*BucketSnapshot, error)
- func (sm *SnapshotManager) GetSnapshotFiles(ctx context.Context, snapshotID int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
- func (sm *SnapshotManager) ListSnapshots(ctx context.Context, bktID int64, limit, offset int) ([]*BucketSnapshot, error)
- func (sm *SnapshotManager) RestoreSnapshot(ctx context.Context, snapshotID int64, targetBktID int64) error
- type SnapshotMetadataAdapter
- type SnapshotObject
- type SnapshotStatus
- type SnapshotType
- type Transaction
- type UserInfo
- type UserMetadataAdapter
- type WriteBufferConfig
Constants ¶
const ( NA = 0 DR = 1 << (iota - 1) // Data Read DW // Data Write DD // Data Delete MDR // Metadata Read MDW // Metadata Write MDD // Metadata Delete DRW = DR | DW // Data Read/Write MDRW = MDR | MDW // Metadata Read/Write READ = DR | MDR // Data and Metadata Read WRITE = DW | MDW // Data and Metadata Write DELETE = DD | MDD // Data and Metadata Delete ALL = READ | WRITE | DELETE // Data and Metadata Read/Write/Delete )
const ( USER = iota // Regular User ADMIN // Administrator )
const ( REF_LEVEL_OFF = iota // OFF REF_LEVEL_FULL // Read entire file REF_LEVEL_FAST // Read entire file after header check succeeds )
Instant upload level setting
const ( CONFLICT_COVER = iota // Merge or overwrite CONFLICT_RENAME // Rename CONFLICT_THROW // Throw error CONFLICT_SKIP // Skip )
Conflict resolution for same name
const ( ERR_AUTH_FAILED = Error("auth failed") ERR_NEED_LOGIN = Error("need login") ERR_INCORRECT_PWD = Error("incorrect username or password") ERR_NO_PERM = Error("no permission") ERR_NO_ROLE = Error("role mismatch") ERR_OPEN_FILE = Error("open file failed") ERR_READ_FILE = Error("read file failed") ERR_OPEN_DB = Error("open db failed") ERR_QUERY_DB = Error("query db failed") ERR_EXEC_DB = Error("exec db failed") ERR_DUP_KEY = Error("object with same name already exists") ERR_QUOTA_EXCEED = Error("quota exceeded") )
const ( OBJ_TYPE_MALFORMED = iota - 1 OBJ_TYPE_NONE OBJ_TYPE_DIR OBJ_TYPE_FILE OBJ_TYPE_VERSION OBJ_TYPE_JOURNAL // Journal snapshot (lightweight version with incremental changes) )
Object types
const ( DATA_NORMAL = uint32(1 << iota) // Normal DATA_ENDEC_AES256 // Whether AES encrypted DATA_ENDEC_SM4 // Whether SM4 encrypted DATA_ENDEC_RESERVED // Whether reserved encryption DATA_CMPR_SNAPPY // Whether snappy compressed DATA_CMPR_ZSTD // Whether zstd compressed DATA_CMPR_GZIP // Whether gzip compressed DATA_CMPR_BR // Whether brotli compressed DATA_KIND_IMG // Image type DATA_KIND_VIDEO // Video type DATA_KIND_AUDIO // Audio type DATA_KIND_ARCHIVE // Archive type DATA_KIND_DOCS // Document type DATA_KIND_FONT // Font type DATA_KIND_APP // Application type DATA_KIND_RESERVED // Unknown type DATA_SPARSE // Sparse file: chunks may not exist, read as zeros DATA_MALFORMED = 0 // Whether corrupted DATA_ENDEC_MASK = DATA_ENDEC_AES256 | DATA_ENDEC_SM4 | DATA_ENDEC_RESERVED DATA_CMPR_MASK = DATA_CMPR_SNAPPY | DATA_CMPR_ZSTD | DATA_CMPR_GZIP | DATA_CMPR_BR DATA_KIND_MASK = DATA_KIND_IMG | DATA_KIND_VIDEO | DATA_KIND_AUDIO | DATA_KIND_ARCHIVE | DATA_KIND_DOCS | DATA_KIND_FONT | DATA_KIND_APP | DATA_KIND_RESERVED )
Data status
const ( ACL_TBL = "acl" // Access Control List table USR_TBL = "usr" BKT_TBL = "bkt" OBJ_TBL = "obj" DATA_TBL = "data" ATTR_TBL = "attr" // Extended attributes (xattr) table )
const DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024 // 10MB
Default chunk size
const DefaultHdrSize = 102400
DefaultHdrSize is the size of the header data used for HdrXXH3 calculation (the first 100KB).
const DefaultListPageSize = 1000
DefaultListPageSize is the default list page size.
const EmptyDataID = 4708888888888
const (
MODE_NAME_ENCRYPTED uint32 = 1 << 31 // Name field is encrypted (when dataDBKey is set)
)
Mode flags (using high bits to avoid conflict with standard Unix permissions 0-0777)
const WritingVersionName = "0"
WritingVersionName is the special name for the "writing version", which allows direct data block modification. Versions with name="0" can be modified directly without creating new versions.
Variables ¶
var DeleteDelaySeconds = func() int64 { if delay := os.Getenv("ORCAS_DELETE_DELAY"); delay != "" { if d, err := strconv.ParseInt(delay, 10, 64); err == nil && d > 0 { return d } } return 5 }()
DeleteDelaySeconds is the delete delay in seconds: the time to wait before deleting data files. It can be overridden via the environment variable ORCAS_DELETE_DELAY; values that do not parse as a positive integer are ignored. The default is 5 seconds.
Functions ¶
func AddDeduplicationToCron ¶
func AddDeduplicationToCron(scheduler *CronScheduler)
AddDeduplicationToCron adds deduplication job to the given CronScheduler This should be called during CronScheduler initialization
func CalculateChecksums ¶
func CalculateChecksums(data []byte) (hdrXXH3 int64, fullXXH3 int64, sha256_0 int64, sha256_1 int64, sha256_2 int64, sha256_3 int64)
CalculateChecksums calculates HdrXXH3, XXH3, and SHA-256 checksums from data. Returns HdrXXH3, XXH3, SHA256_0, SHA256_1, SHA256_2, and SHA256_3 (this variant does not return an error). This function is used for the instant upload (deduplication) feature.
func CalculateChecksumsFromReader ¶
func CalculateChecksumsFromReader(reader io.Reader, size int64) (uint64, uint64, int64, int64, int64, int64, error)
CalculateChecksumsFromReader calculates checksums by reading from an io.Reader. This is more memory-efficient for large files. Returns HdrXXH3, XXH3, SHA256_0, SHA256_1, SHA256_2, SHA256_3, and error.
func CleanRecycleBin ¶
func CleanRecycleBin(c Ctx, bktID int64, h Handler, ma MetadataAdapter, da DataAdapter, targetID int64) error
CleanRecycleBin cleans up objects marked as deleted in the recycle bin (physically deleting unreferenced data files and metadata). A targetID of 0 means clean all eligible objects; otherwise only the specified object is cleaned. It ensures that only one cleanup operation is executing for the same bktID at a time.
func CleanTestBucketData ¶
CleanTestBucketData cleans up all data files for a specific bucket
func CleanTestDB ¶
CleanTestDB cleans up database files for a specific bucket This also closes connection pools to ensure clean state
func ClearCompressionKind ¶
func ClearCompressionKind(kind *uint32)
ClearCompressionKind removes compression kind flags
func ClearEncryptionKind ¶
func ClearEncryptionKind(kind *uint32)
ClearEncryptionKind removes encryption kind flags
func CompressData ¶
CompressData compresses data using the specified compressor Returns compressed data if compression succeeds and reduces size, otherwise returns original data The kind parameter will be modified to remove compression flag if compression fails or doesn't reduce size
func CompressReader ¶
CompressReader compresses data from reader to writer
func CreateCompressor ¶
func CreateCompressor(cmprWay uint32, cmprQlty uint32) archiver.Compressor
CreateCompressor creates a compressor based on compression type and quality Returns nil if compression is disabled (cmprWay == 0)
func CreateDecompressor ¶
func CreateDecompressor(kind uint32) archiver.Decompressor
CreateDecompressor creates a decompressor based on compression flags in kind Returns nil if no compression is present
func DecompressData ¶
func DecompressData(data []byte, decompr archiver.Decompressor) ([]byte, error)
DecompressData decompresses data using the specified decompressor
func DecompressReader ¶
DecompressReader decompresses data from reader to writer
func DecryptData ¶
DecryptData decrypts data using the specified encryption method Returns decrypted data if decryption succeeds, otherwise returns error
func DeleteObject ¶
func DeleteObject(c Ctx, bktID, id int64, ma MetadataAdapter) error
DeleteObject marks object as deleted (recursively delete child objects)
func EncryptData ¶
EncryptData encrypts data using the specified encryption method Returns encrypted data if encryption succeeds, otherwise returns error
func EscapeSQLString ¶
EscapeSQLString escapes a SQL string, traversing the string only once. It escapes each single quote ' as two single quotes '' to prevent SQL injection.
func ExtendDataInfoTable ¶
ExtendDataInfoTable adds snapshot-related columns to data_info table
func GetCompressionName ¶
GetCompressionName returns human-readable compression algorithm name
func GetCompressionType ¶
GetCompressionType returns the compression type from kind
func GetDBWithType ¶
func GetDBWithType(connType DBConnectionType, dirPath string) (*sql.DB, error)
GetDBWithType gets a database connection from the pool with specified connection type dirPath: path for database directory (empty string defaults to current directory ".")
func GetEncryptionName ¶
GetEncryptionName returns human-readable encryption algorithm name
func GetEncryptionType ¶
GetEncryptionType returns the encryption type from kind
func GetMainDBWithKey ¶
GetMainDBWithKey opens main database with specified encryption key basePath: path for main database directory (empty string defaults to current directory ".") key: encryption key (optional, empty string means unencrypted) Can be called as: GetMainDBWithKey(basePath, key) or GetMainDBWithKey(basePath)
func GetReadDB ¶
GetReadDB gets a read-only database connection dirPath: path for database directory (empty string defaults to current directory ".")
func GetWriteDB ¶
GetWriteDB gets a write database connection dirPath: path for database directory (empty string defaults to current directory ".")
func HasCompression ¶
HasCompression checks if data has compression flag
func HasEncryption ¶
HasEncryption checks if data has encryption flag
func HashPassword ¶
HashPassword hashes the password using PBKDF2. The returned format is iter:salt:hash, where iter is a randomly generated iteration count (between 1000 and 10000) that increases password hash diversity.
func InitBucketDB ¶
InitBucketDB initializes a bucket database dataPath: path for bucket databases (empty string defaults to current directory ".") bktID: bucket ID key: encryption key (optional, empty string means unencrypted)
func InitDB ¶
InitDB initializes the main database basePath: path for main database (empty string defaults to current directory ".") key: encryption key (deprecated, no longer used for database encryption)
func InitDBPool ¶
InitDBPool initializes the global database connection pool
func InitIDGen ¶
InitIDGen initializes the global ID generator If not initialized, a default ID generator will be created on first use
func InitSnapshotTables ¶
InitSnapshotTables initializes snapshot-related database tables
func InitTestEnv ¶
func InitTestEnv()
InitTestEnv initializes the test environment with an in-memory filesystem (tmpfs). This sets up test directories to use /dev/shm (shared memory) for faster tests.
func IsInstantUploadEnabled ¶
func IsInstantUploadEnabled() bool
IsInstantUploadEnabled checks if instant upload is enabled via environment variable
func IsInstantUploadEnabledWithConfig ¶
func IsInstantUploadEnabledWithConfig(cfg *InstantUploadConfig) bool
IsInstantUploadEnabledWithConfig checks if instant upload is enabled with given config
func IsSparseFile ¶
IsSparseFile checks if DataInfo represents a sparse file Sparse files have DATA_SPARSE flag set, meaning chunks may not exist and should be read as zeros
func MarkObjectAsDeleted ¶
func MarkObjectAsDeleted(c Ctx, bktID, id int64, ma MetadataAdapter) error
MarkObjectAsDeleted marks object as deleted without recursively deleting child objects This is used for fast deletion where child objects are deleted asynchronously
func MarkSparseFile ¶
func MarkSparseFile(dataInfo *DataInfo)
MarkSparseFile marks a DataInfo as sparse file
func Now ¶
func Now() int64
Now returns the current Unix timestamp in seconds. It uses the time calibrator's timestamp to avoid creating temporary objects with each time.Now() call. This function can be reused throughout the project to reduce GC pressure. It is named after time.Now(), but returns a Unix timestamp (seconds) instead of a time.Time object.
func PermanentlyDeleteObject ¶
func PermanentlyDeleteObject(c Ctx, bktID, id int64, h Handler, ma MetadataAdapter, da DataAdapter) error
PermanentlyDeleteObject permanently deletes object (physically delete object and data files) Ensure only one delete operation is executing for the same object ID at a time
func PermanentlyDeleteObjectBatch ¶
func PermanentlyDeleteObjectBatch(c Ctx, bktID int64, ids []int64, h Handler, ma MetadataAdapter, da DataAdapter) map[int64]error
PermanentlyDeleteObjectBatch permanently deletes multiple objects in a batch for better performance. Returns a map of objID -> error for any failures (a nil map means all succeeded). It is optimized for bulk deletion by: (1) batching database queries (GetObj, CountDataRefs); (2) processing deletes concurrently with controlled parallelism; (3) batching metadata deletions.
func ProcessData ¶
func ProcessData(data []byte, kind *uint32, cmprQlty uint32, encryptionKey string, isFirstChunk bool) ([]byte, error)
ProcessData applies compression and encryption to data Processing order: Compression -> Encryption Returns processed data and updates kind flags if compression/encryption fails
Parameters:
- data: original data to process
- kind: pointer to kind flags (will be modified if processing fails)
- cmprQlty: compression quality
- encryptionKey: encryption key (can be empty if encryption is not needed)
- isFirstChunk: whether the current chunk is the first chunk
func SetCompressionKind ¶
SetCompressionKind sets compression kind flags based on compression method
func SetEncryptionKind ¶
SetEncryptionKind sets encryption kind flags based on encryption method
func SetupTestDirs ¶
SetupTestDirs creates temporary directories for a test and returns cleanup function Returns baseDir, dataDir, and cleanup function
func ShouldCompressFile ¶
ShouldCompressFile checks if a file should be compressed based on file extension and file header Returns true if the file should be compressed, false otherwise
Algorithm:
- Check file extension first (faster than file header check)
- If extension check passed, check file header using filetype.Match
Empty chunks are allowed to be compressed
func ShouldCompressFileByName ¶
ShouldCompressFileByName determines if a file should be compressed based on its name only This checks the file extension only (no file header check) This is faster but less accurate than ShouldCompressFile
func UnprocessData ¶
UnprocessData applies decryption and decompression to data Processing order: Decryption -> Decompression (reverse of ProcessData)
Parameters:
- data: processed data to unprocess
- kind: kind flags indicating what processing was applied
- decryptionKey: decryption key (can be empty if no encryption)
func UpdateFileLatestVersion ¶
func UpdateFileLatestVersion(c Ctx, bktID int64, ma MetadataAdapter) error
UpdateFileLatestVersion recursively updates directory size and DataID. Assumption: when uploading a new version, the file's DataID and size have already been updated, so only directories need to be updated. Directories are processed recursively until size and DataID stabilize (accumulating from leaf directories upward).
func ValidateEncryptionKey ¶
ValidateEncryptionKey validates if the encryption key is valid for the specified encryption method
Types ¶
type ACLMetadataAdapter ¶
type ACLMetadataAdapter interface {
// CheckPermission checks if a user has the required permission for a bucket
// Returns true if the user's ACL permission covers the required action
CheckPermission(c Ctx, bktID int64, uid int64, action int) (bool, error)
// ListACLByUser lists all buckets accessible by a user
ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
// PutACL adds or updates an ACL entry (bktID, uid pair) with permission
// perm: permission flags (DR, DW, DD, MDR, MDW, MDD, or ALL)
PutACL(c Ctx, bktID int64, uid int64, perm int) error
// ListACL lists all ACL entries for a bucket
ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
// DeleteACL deletes a specific ACL entry (bktID, uid pair)
DeleteACL(c Ctx, bktID int64, uid int64) error
// DeleteAllACL deletes all ACL entries for a bucket
DeleteAllACL(c Ctx, bktID int64) error
}
ACLMetadataAdapter provides ACL CRUD operations
type AccessCtrlMgr ¶
type Admin ¶
type Admin interface {
// New takes the underlying Admin and returns the current one, forming a call chain
New(a Admin) Admin
Close()
PutBkt(c Ctx, o []*BucketInfo) error
DeleteBkt(c Ctx, bktID int64) error
// SetQuota Set bucket quota
SetQuota(c Ctx, bktID int64, quota int64) error
// Audit data integrity: check consistency between metadata and data files
Scrub(c Ctx, bktID int64) (*ScrubResult, error)
// Scan dirty data: check for incomplete data caused by failed uploads or machine power loss
ScanDirtyData(c Ctx, bktID int64) (*DirtyDataResult, error)
// Merge instant-upload duplicate data: find and merge duplicate data that has the same checksums but different DataIDs
MergeDuplicateData(c Ctx, bktID int64) (*MergeDuplicateResult, error)
// Defragment: merge and pack small files offline; within a package, data blocks are moved forward over deleted ones
Defragment(c Ctx, bktID int64) (*DefragmentResult, error)
// Scan orphaned chunks: scan chunk files in the file system and check whether their metadata exists
// delaySeconds: delay (seconds); metadata is checked again before deletion
ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
// User management
CreateUser(c Ctx, username, password, name string, role uint32) (*UserInfo, error)
UpdateUser(c Ctx, userID int64, username, password, name string, role *uint32) error
DeleteUser(c Ctx, userID int64) error
ListUsers(c Ctx) ([]*UserInfo, error)
// PutACL adds or updates an ACL entry (bktID, uid pair) with permission
// perm: permission flags (DR, DW, DD, MDR, MDW, MDD, or ALL)
PutACL(c Ctx, bktID int64, uid int64, perm int) error
// ListACL lists all ACL entries for a bucket
ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
// DeleteACL deletes a specific ACL entry (bktID, uid pair)
DeleteACL(c Ctx, bktID int64, uid int64) error
// DeleteAllACL deletes all ACL entries for a bucket
DeleteAllACL(c Ctx, bktID int64) error
// ListACLByUser lists all buckets accessible by a user
ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
}
func NewAdminWithAccessCtrl ¶
func NewAdminWithAccessCtrl(acm AccessCtrlMgr) Admin
NewAdminWithAccessCtrl creates an Admin with a custom AccessCtrlMgr This allows injecting a custom access control manager for testing or special use cases
func NewAdminWithAdapters ¶
func NewAdminWithAdapters(ma MetadataAdapter, da DataAdapter, acm AccessCtrlMgr) Admin
NewAdminWithAdapters creates an Admin with custom MetadataAdapter, DataAdapter, and AccessCtrlMgr This provides maximum flexibility for testing or custom implementations
func NewLocalAdmin ¶
NewLocalAdmin creates a LocalAdmin with specified paths
func NewNoAuthAdmin ¶
NewNoAuthAdmin creates an Admin that bypasses all authentication and permission checks This bypasses main database operations (user authentication and ACL permission checks), but bucket database operations (data and object metadata) are still performed normally. This is useful for testing, internal operations, or when authentication is handled externally. The admin uses NoAuthAccessCtrlMgr which always allows all operations without querying the main database.
type BaseMetadataAdapter ¶
type BaseMetadataAdapter interface {
Close()
UserMetadataAdapter
ACLMetadataAdapter
SetBasePath(basePath string)
SetBaseKey(key string)
}
BaseMetadataAdapter manages base metadata (users, ACL) stored in main database Note: BucketMetadataAdapter is now part of DataMetadataAdapter since buckets are stored in bucket databases
type BucketACL ¶
type BucketACL struct {
BktID int64 `borm:"bkt_id" json:"bkt_id,omitempty"` // Bucket ID
UID int64 `borm:"uid" json:"uid,omitempty"` // User ID
Perm int `borm:"perm" json:"perm,omitempty"` // Permission flags (DR, DW, DD, MDR, MDW, MDD, or ALL)
}
BucketACL represents access control list for a bucket
type BucketInfo ¶
type BucketInfo struct {
ID int64 `borm:"id" json:"i,omitempty"` // Bucket ID
Name string `borm:"n" json:"n,omitempty"` // Bucket name
Type int `borm:"t" json:"t,omitempty"` // Bucket type, 0: none, 1: normal ...
Quota int64 `borm:"q" json:"q,omitempty"` // Quota, negative means unlimited
Used int64 `borm:"u" json:"s,omitempty"` // Logical usage, counts original size of all versions
RealUsed int64 `borm:"ru" json:"ru,omitempty"` // Actual physical usage, counts actual stored data size
LogicalUsed int64 `borm:"lu" json:"lu,omitempty"` // Logical occupancy, counts logical size of all valid objects (not deleted, PID >= 0) considering deduplication but excluding deleted objects
DedupSavings int64 `borm:"ds" json:"ds,omitempty"` // Instant upload space savings, counts deduplicated data savings (LogicalUsed - unique data block size)
ChunkSize int64 `borm:"cs" json:"cs,omitempty"` // Chunk size (bytes), must be >0 (defaults to system chunk size)
}
type BucketMetadataAdapter ¶
type BucketMetadataAdapter interface {
PutBkt(c Ctx, o []*BucketInfo) error
DeleteBkt(c Ctx, bktID int64) error
GetBkt(c Ctx, ids []int64) ([]*BucketInfo, error)
ListAllBuckets(c Ctx) ([]*BucketInfo, error) // Get all buckets (for scheduled tasks)
// Update bucket quota and usage
UpdateBktQuota(c Ctx, bktID int64, quota int64) error
// Increase bucket's actual usage (when uploading data)
IncBktRealUsed(c Ctx, bktID int64, size int64) error
// Decrease bucket's actual usage (when deleting data)
DecBktRealUsed(c Ctx, bktID int64, size int64) error
// Increase bucket's logical usage (when creating objects, including instant upload)
IncBktUsed(c Ctx, bktID int64, size int64) error
// Decrease bucket's logical usage (when deleting objects)
DecBktUsed(c Ctx, bktID int64, size int64) error
// Increase bucket's logical occupancy (when creating objects, only count valid objects)
IncBktLogicalUsed(c Ctx, bktID int64, size int64) error
// Decrease bucket's logical occupancy (when deleting objects)
DecBktLogicalUsed(c Ctx, bktID int64, size int64) error
// Increase bucket's deduplication savings (saved data size from instant upload)
IncBktDedupSavings(c Ctx, bktID int64, size int64) error
// Decrease bucket's deduplication savings (when deleting objects)
DecBktDedupSavings(c Ctx, bktID int64, size int64) error
}
type BucketSnapshot ¶
type BucketSnapshot struct {
ID int64 `json:"id"`
BucketID int64 `json:"bucket_id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
CreatedAt int64 `json:"created_at"`
MetadataVersion int64 `json:"metadata_version"` // Metadata version number
SnapshotType SnapshotType `json:"snapshot_type"`
Status SnapshotStatus `json:"status"`
FileCount int64 `json:"file_count"`
TotalSize int64 `json:"total_size"`
IsLazy bool `json:"is_lazy"` // Lazy copy mode
ErrorMessage string `json:"error_message,omitempty"`
}
BucketSnapshot represents a bucket snapshot
type BucketStatsDelta ¶
type BucketStatsDelta struct {
// Bucket info cache (may be nil if not loaded yet)
BucketInfo *BucketInfo `json:"-"` // Full bucket info, cached from database
// Incremental changes (atomic)
Used int64 // Incremental change in Used (atomic)
RealUsed int64 // Incremental change in RealUsed (atomic)
LogicalUsed int64 // Incremental change in LogicalUsed (atomic)
DedupSavings int64 // Incremental change in DedupSavings (instant upload space savings) (atomic)
DataPath string // Data path for this bucket
// contains filtered or unexported fields
}
BucketStatsDelta incremental update for bucket space statistics and bucket info cache
func (*BucketStatsDelta) Add ¶
func (bsd *BucketStatsDelta) Add(used, realUsed, logicalUsed, dedupSavings int64)
Add adds incremental changes using atomic operations
func (*BucketStatsDelta) Get ¶
func (bsd *BucketStatsDelta) Get() (used, realUsed, logicalUsed, dedupSavings int64)
Get retrieves current incremental values (for flushing) and resets them atomically
type Config ¶
type Config struct {
UserName string // Username
Password string // Password
NoAuth bool // If true, bypass authentication and permission checks (no user required)
BasePath string // Base path for metadata (database storage location), if empty uses current directory "."
DataPath string // Data path for file data storage location, if empty uses current directory "."
RefLevel uint32 // Instant upload level setting: REF_LEVEL_OFF (default) / REF_LEVEL_FULL: Ref / REF_LEVEL_FAST: TryRef+Ref
PkgThres uint32 // Package count limit, default 1000 if not set
CmprWay uint32 // Compression method (smart compression by default, decides whether to compress based on file type), see DATA_CMPR_MASK
CmprQlty uint32 // Compression level, br:[0,11], gzip:[-3,9], zstd:[0,10]
EndecWay uint32 // Encryption method, see DATA_ENDEC_MASK
EndecKey string // Encryption KEY, SM4 requires exactly 16 characters, AES256 requires more than 16 characters
DontSync string // Filename wildcards to exclude from sync (https://pkg.go.dev/path/filepath#Match), separated by semicolons
Conflict uint32 // Conflict resolution for same name, CONFLICT_COVER: merge or overwrite / CONFLICT_RENAME: rename / CONFLICT_THROW: throw error / CONFLICT_SKIP: skip
NameTmpl string // Rename suffix template, e.g. "%s的副本" (Chinese for "copy of %s"), should contain "%s"
WorkersN uint32 // Concurrent pool size, not less than 16
}
Config represents business layer configuration (not stored in database). These fields are handled at the business layer (cmd/vfs), not in bucket config.
type CronJobConfig ¶
type CronJobConfig struct {
// ScrubEnabled Whether to enable ScrubData scheduled task
ScrubEnabled bool
// ScrubSchedule Cron expression for ScrubData (format: minute hour day month weekday)
ScrubSchedule string
// ScrubFixOrphaned Whether to automatically fix orphaned data (files without metadata references)
ScrubFixOrphaned bool
// ScrubFixCorrupted Whether to automatically fix corrupted data (metadata without files)
ScrubFixCorrupted bool
// ScrubFixMismatchedChecksum Whether to automatically fix mismatched checksum data
ScrubFixMismatchedChecksum bool
// MergeEnabled Whether to enable MergeDuplicateData scheduled task
MergeEnabled bool
// MergeSchedule Cron expression for MergeDuplicateData
MergeSchedule string
// DeduplicationEnabled Whether to enable offline deduplication job
DeduplicationEnabled bool
// DeduplicationSchedule Cron expression for offline deduplication (default: "0 2 * * *")
DeduplicationSchedule string
// DefragmentEnabled Whether to enable Defragment scheduled task
DefragmentEnabled bool
// DefragmentSchedule Cron expression for Defragment
DefragmentSchedule string
// DefragmentMaxSize Maximum file size for Defragment (files smaller than this will be packed)
DefragmentMaxSize int64
// DefragmentAccessWindow Access window time for Defragment (seconds)
DefragmentAccessWindow int64
// DefragmentThreshold Space usage threshold for Defragment (percentage, 0-100)
// Defragmentation will only be performed when fragmentation rate ((Used - RealUsed) / Used * 100) reaches this threshold
DefragmentThreshold int64
}
CronJobConfig holds the scheduled task configuration.
func GetCronJobConfig ¶
func GetCronJobConfig() CronJobConfig
GetCronJobConfig returns the scheduled task configuration.
type CronSchedule ¶
type CronSchedule struct {
Minute []int // Minute (0-59)
Hour []int // Hour (0-23)
Day []int // Day (1-31)
Month []int // Month (1-12)
Weekday []int // Weekday (0-6, 0=Sunday)
}
CronSchedule parses cron expressions and determines if execution is needed Format: minute hour day month weekday Supports wildcard * and number ranges
func ParseCronSchedule ¶
func ParseCronSchedule(schedule string) (*CronSchedule, error)
ParseCronSchedule parses cron expression Format: minute hour day month weekday Example: "0 2 * * *" means every day at 2:00 AM Example: "0 4 * * 0" means every Sunday at 4:00 AM
type CronScheduler ¶
type CronScheduler struct {
// contains filtered or unexported fields
}
CronScheduler is the scheduler for scheduled (cron) tasks.
func NewCronScheduler ¶
func NewCronScheduler(ctx context.Context, config CronJobConfig, h Handler, ma MetadataAdapter, da DataAdapter) *CronScheduler
NewCronScheduler creates a scheduler for scheduled (cron) tasks.
type Ctx ¶
func BucketInfo2Ctx ¶
func BucketInfo2Ctx(c Ctx, bkt *BucketInfo) Ctx
BucketInfo2Ctx sets bucket information to context, including bucket key This allows GetDB to use bucket's key for database encryption
func UserInfo2Ctx ¶
type DBConnectionType ¶
type DBConnectionType int
DBConnectionType represents the type of database connection
const ( // DBRead represents a read-only connection DBRead DBConnectionType = iota // DBWrite represents a write connection DBWrite )
type DBPool ¶
type DBPool struct {
// contains filtered or unexported fields
}
DBPool manages database connection pools with read/write separation
func (*DBPool) Close ¶
func (dp *DBPool) Close()
Close closes all database connections in the pool. CRITICAL: This forcefully closes all connections regardless of reference count. Use this when you need to ensure all database files are released. This allows external components (like the WAL checkpoint manager) to stop first.
func (*DBPool) ForceReleaseDB ¶
ForceReleaseDB forcefully releases a database pool regardless of reference count This is useful when you need to ensure all connections are closed
func (*DBPool) GetDB ¶
GetDB gets a database connection from the pool connType specifies whether to use read or write connection
func (*DBPool) GetDBStats ¶
GetDBStats returns statistics about the connection pool
type DataAdapter ¶
type DataAdapter interface {
Close()
SetDataPath(dataPath string)
Write(c Ctx, bktID, dataID int64, sn int, buf []byte) error
Read(c Ctx, bktID, dataID int64, sn int) ([]byte, error)
ReadBytes(c Ctx, bktID, dataID int64, sn, offset, size int) ([]byte, error)
// Delete deletes a specific data chunk (DataID + sn)
// If the chunk doesn't exist, it returns nil (no error)
Delete(c Ctx, bktID, dataID int64, sn int) error
}
type DataInfo ¶
type DataInfo struct {
ID int64 `borm:"id" json:"i,omitempty"` // Data ID (randomly generated by idgen)
Size int64 `borm:"s" json:"s,omitempty"` // Data size (compressed/encrypted)
OrigSize int64 `borm:"os" json:"r,omitempty"` // Original data size (before compression/encryption)
HdrXXH3 int64 `borm:"h" json:"h,omitempty"` // XXHash3-64bit checksum of first 100KB
XXH3 int64 `borm:"x" json:"x,omitempty"` // XXHash3-64bit checksum of entire original data
SHA256_0 int64 `borm:"s0" json:"s0,omitempty"` // SHA-256 hash of entire original data (bytes 0-7)
SHA256_1 int64 `borm:"s1" json:"s1,omitempty"` // SHA-256 hash of entire original data (bytes 8-15)
SHA256_2 int64 `borm:"s2" json:"s2,omitempty"` // SHA-256 hash of entire original data (bytes 16-23)
SHA256_3 int64 `borm:"s3" json:"s3,omitempty"` // SHA-256 hash of entire original data (bytes 24-31)
Kind uint32 `borm:"k" json:"k,omitempty"` // Data status: normal, corrupted, encrypted, compressed, type (for preview, etc.)
PkgID int64 `borm:"pi" json:"p,omitempty"` // Package data ID (also generated by idgen)
PkgOffset uint32 `borm:"po" json:"g,omitempty"` // Offset position in package data
}
func EmptyDataInfo ¶
func EmptyDataInfo() *DataInfo
type DataInfoMetadataAdapter ¶
type DataInfoMetadataAdapter interface {
RefData(c Ctx, bktID int64, d []*DataInfo) ([]int64, error)
PutData(c Ctx, bktID int64, d []*DataInfo) error
GetData(c Ctx, bktID, id int64) (*DataInfo, error)
// PutDataAndObj writes both DataInfo and ObjectInfo in a single transaction
// This optimization reduces database round trips
PutDataAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
ListAllData(c Ctx, bktID int64, offset, limit int) ([]*DataInfo, int64, error) // offset: offset, limit: page size, returns data and total count
// Find duplicate data: returns DataID groups with same checksums (OrigSize, HdrXXH3, XXH3, SHA256 all same)
FindDuplicateData(c Ctx, bktID int64, offset, limit int) ([]DuplicateGroup, int64, error) // Returns duplicate data groups and total count
// Update object's DataID reference
UpdateObjDataID(c Ctx, bktID int64, oldDataID, newDataID int64) error
// Delete data metadata
DeleteData(c Ctx, bktID int64, dataIDs []int64) error
// Find small file data that can be packaged (for defragmentation)
FindSmallPackageData(c Ctx, bktID int64, maxSize int64, offset, limit int) ([]*DataInfo, int64, error)
}
DataInfoMetadataAdapter manages data metadata stored in bucket databases.
type DataMetadataAdapter ¶
type DataMetadataAdapter interface {
Close()
BucketMetadataAdapter // Buckets are stored in bucket databases, not main database
DataInfoMetadataAdapter
ObjectMetadataAdapter
SetDataPath(dataPath string)
SetDataKey(key string)
}
DataMetadataAdapter manages data and object metadata stored in bucket databases Also manages bucket metadata since buckets are stored in bucket databases (dataPath)
type DatabasePool ¶
type DatabasePool struct {
// contains filtered or unexported fields
}
DatabasePool manages read and write connection pools for a single database
type DeduplicationConfig ¶
type DeduplicationConfig struct {
Enabled bool `json:"enabled"` // Enable deduplication job
Schedule string `json:"schedule"` // Cron expression, default "0 2 * * *" (2:00 AM daily)
PageSize int `json:"page_size"` // Page size for processing, default 100
BatchInterval time.Duration `json:"batch_interval"` // Interval between batches, default 100ms
MaxDuration time.Duration `json:"max_duration"` // Maximum execution time, default 2 hours
MaxItemsPerSecond int `json:"max_items_per_second"` // Max items per second, default 100
ConcurrentBuckets int `json:"concurrent_buckets"` // Number of buckets to process concurrently, default 1
}
DeduplicationConfig configuration for deduplication jobs
func DefaultDeduplicationConfig ¶
func DefaultDeduplicationConfig() DeduplicationConfig
DefaultDeduplicationConfig returns default configuration
func GetDeduplicationConfig ¶
func GetDeduplicationConfig() DeduplicationConfig
GetDeduplicationConfig returns the current deduplication configuration TODO: Load from actual configuration file/database
type DeduplicationJobResult ¶
type DeduplicationJobResult struct {
StartTime time.Time `json:"start_time"` // Start time
EndTime time.Time `json:"end_time"` // End time
Duration time.Duration `json:"duration"` // Execution duration
BucketsScanned int `json:"buckets_scanned"` // Number of buckets scanned
BucketResults map[int64]*MergeDuplicateResult `json:"bucket_results"` // Results for each bucket
TotalFreedSize int64 `json:"total_freed_size"` // Total freed space (bytes)
TotalMergedData int `json:"total_merged_data"` // Total merged data count
Errors []string `json:"errors"` // Error list
}
DeduplicationJobResult result of deduplication job
func GetDeduplicationJobStatus ¶
func GetDeduplicationJobStatus() (running bool, lastResult *DeduplicationJobResult, lastTime time.Time)
GetDeduplicationJobStatus returns the status of the last deduplication job
func RunDeduplicationJob ¶
func RunDeduplicationJob(ctx context.Context, ma MetadataAdapter, da DataAdapter) (*DeduplicationJobResult, error)
RunDeduplicationJob runs deduplication job for all buckets This is the main entry point for scheduled deduplication jobs It iterates through all buckets and calls MergeDuplicateData for each
type DefaultAccessCtrlMgr ¶
type DefaultAccessCtrlMgr struct {
// contains filtered or unexported fields
}
func (*DefaultAccessCtrlMgr) CheckOwn ¶
func (dacm *DefaultAccessCtrlMgr) CheckOwn(c Ctx, bktID int64) error
CheckOwn checks whether the user has access to the bucket (via ACL). This is equivalent to checking whether the user has any permission (ALL).
func (*DefaultAccessCtrlMgr) CheckPermission ¶
func (dacm *DefaultAccessCtrlMgr) CheckPermission(c Ctx, action int, bktID int64) error
func (*DefaultAccessCtrlMgr) CheckRole ¶
func (dacm *DefaultAccessCtrlMgr) CheckRole(c Ctx, role uint32) error
func (*DefaultAccessCtrlMgr) SetAdapter ¶
func (dacm *DefaultAccessCtrlMgr) SetAdapter(ma MetadataAdapter)
type DefaultBaseMetadataAdapter ¶
type DefaultBaseMetadataAdapter struct {
// contains filtered or unexported fields
}
DefaultBaseMetadataAdapter implements BaseMetadataAdapter
func (*DefaultBaseMetadataAdapter) CheckPermission ¶
func (dba *DefaultBaseMetadataAdapter) CheckPermission(c Ctx, bktID int64, uid int64, action int) (bool, error)
CheckPermission checks if a user has the required permission for a bucket Returns true if the user's ACL permission covers the required action
func (*DefaultBaseMetadataAdapter) Close ¶
func (dba *DefaultBaseMetadataAdapter) Close()
func (*DefaultBaseMetadataAdapter) DeleteACL ¶
func (dba *DefaultBaseMetadataAdapter) DeleteACL(c Ctx, bktID int64, uid int64) error
func (*DefaultBaseMetadataAdapter) DeleteAllACL ¶
func (dba *DefaultBaseMetadataAdapter) DeleteAllACL(c Ctx, bktID int64) error
func (*DefaultBaseMetadataAdapter) DeleteBkt ¶
func (dba *DefaultBaseMetadataAdapter) DeleteBkt(c Ctx, bktID int64) error
func (*DefaultBaseMetadataAdapter) DeleteUser ¶
func (dba *DefaultBaseMetadataAdapter) DeleteUser(c Ctx, userID int64) error
func (*DefaultBaseMetadataAdapter) GetBkt ¶
func (dba *DefaultBaseMetadataAdapter) GetBkt(c Ctx, ids []int64) (o []*BucketInfo, err error)
func (*DefaultBaseMetadataAdapter) GetUsr ¶
func (dba *DefaultBaseMetadataAdapter) GetUsr(c Ctx, ids []int64) (o []*UserInfo, err error)
func (*DefaultBaseMetadataAdapter) GetUsr2 ¶
func (dba *DefaultBaseMetadataAdapter) GetUsr2(c Ctx, usr string) (o *UserInfo, err error)
func (*DefaultBaseMetadataAdapter) ListACL ¶
func (dba *DefaultBaseMetadataAdapter) ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
func (*DefaultBaseMetadataAdapter) ListACLByUser ¶
func (dba *DefaultBaseMetadataAdapter) ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
func (*DefaultBaseMetadataAdapter) ListUsers ¶
func (dba *DefaultBaseMetadataAdapter) ListUsers(c Ctx) (o []*UserInfo, err error)
func (*DefaultBaseMetadataAdapter) PutACL ¶
PutACL is part of the ACLMetadataAdapter implementation. ACL is stored in the main database, not in the bucket database.
func (*DefaultBaseMetadataAdapter) PutBkt ¶
func (dba *DefaultBaseMetadataAdapter) PutBkt(c Ctx, o []*BucketInfo) error
func (*DefaultBaseMetadataAdapter) PutUsr ¶
func (dba *DefaultBaseMetadataAdapter) PutUsr(c Ctx, u *UserInfo) error
func (*DefaultBaseMetadataAdapter) SetBaseKey ¶
func (dba *DefaultBaseMetadataAdapter) SetBaseKey(key string)
SetBaseKey sets the encryption key for the adapter (for main database) Note: This is kept for API compatibility but not used for database connections anymore Database encryption is no longer used; only name encryption via dataDBKey is supported
func (*DefaultBaseMetadataAdapter) SetBasePath ¶
func (dba *DefaultBaseMetadataAdapter) SetBasePath(basePath string)
SetBasePath sets the base path for the adapter (for main database)
func (*DefaultBaseMetadataAdapter) SetPath ¶
func (dba *DefaultBaseMetadataAdapter) SetPath(basePath string)
SetPath sets the base path for the adapter (for main database)
func (*DefaultBaseMetadataAdapter) SetUsr ¶
func (dba *DefaultBaseMetadataAdapter) SetUsr(c Ctx, fields []string, u *UserInfo) error
func (*DefaultBaseMetadataAdapter) UpdateBktQuota ¶
func (dba *DefaultBaseMetadataAdapter) UpdateBktQuota(c Ctx, bktID int64, quota int64) error
type DefaultDataAdapter ¶
type DefaultDataAdapter struct {
// contains filtered or unexported fields
}
func (*DefaultDataAdapter) CleanupUnreferencedData ¶
func (dda *DefaultDataAdapter) CleanupUnreferencedData(ctx Ctx, bktID int64) error
CleanupUnreferencedData cleans up unreferenced data blocks
func (*DefaultDataAdapter) Close ¶
func (dda *DefaultDataAdapter) Close()
func (*DefaultDataAdapter) CopyData ¶
func (dda *DefaultDataAdapter) CopyData(ctx Ctx, bktID, srcDataID, dstDataID int64) error
CopyData copies data from one ID to another
func (*DefaultDataAdapter) DecrementSnapshotDataRefs ¶
func (dda *DefaultDataAdapter) DecrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
DecrementSnapshotDataRefs decrements reference counts for snapshot data
func (*DefaultDataAdapter) Delete ¶
func (dda *DefaultDataAdapter) Delete(c Ctx, bktID, dataID int64, sn int) error
Delete deletes a specific data chunk (DataID + sn) If the chunk doesn't exist, it returns nil (no error)
func (*DefaultDataAdapter) IncrementSnapshotDataRefs ¶
func (dda *DefaultDataAdapter) IncrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
IncrementSnapshotDataRefs increments reference counts for snapshot data
func (*DefaultDataAdapter) IncrementSnapshotDataRefsTx ¶
func (dda *DefaultDataAdapter) IncrementSnapshotDataRefsTx(ctx Ctx, tx Transaction, snapshotID int64) error
IncrementSnapshotDataRefsTx increments reference counts within a transaction
func (*DefaultDataAdapter) MarkDataAsShared ¶
func (dda *DefaultDataAdapter) MarkDataAsShared(ctx Ctx, bktID, snapshotID int64) error
MarkDataAsShared marks data as shared (for COW)
func (*DefaultDataAdapter) MarkDataAsSharedTx ¶
func (dda *DefaultDataAdapter) MarkDataAsSharedTx(ctx Ctx, tx Transaction, bktID, snapshotID int64) error
MarkDataAsSharedTx marks data as shared within a transaction
func (*DefaultDataAdapter) SetDataPath ¶
func (dda *DefaultDataAdapter) SetDataPath(dataPath string)
SetDataPath sets the data path for the adapter
func (*DefaultDataAdapter) WriteWithCOW ¶
func (dda *DefaultDataAdapter) WriteWithCOW(ctx Ctx, bktID, objID int64, data []byte) (int64, error)
WriteWithCOW writes data with copy-on-write semantics
type DefaultDataMetadataAdapter ¶
type DefaultDataMetadataAdapter struct {
// contains filtered or unexported fields
}
DefaultDataMetadataAdapter implements DataMetadataAdapter
func (*DefaultDataMetadataAdapter) Close ¶
func (dda *DefaultDataMetadataAdapter) Close()
func (*DefaultDataMetadataAdapter) ListAllBuckets ¶
func (dda *DefaultDataMetadataAdapter) ListAllBuckets(c Ctx) (o []*BucketInfo, err error)
func (*DefaultDataMetadataAdapter) SetDataKey ¶
func (dda *DefaultDataMetadataAdapter) SetDataKey(key string)
SetDataKey sets the encryption key for the adapter (for name encryption only)
func (*DefaultDataMetadataAdapter) SetDataPath ¶
func (dda *DefaultDataMetadataAdapter) SetDataPath(dataPath string)
SetDataPath sets the data path for the adapter (for bucket databases and data files)
func (*DefaultDataMetadataAdapter) SetPath ¶
func (dda *DefaultDataMetadataAdapter) SetPath(dataPath string)
SetPath sets the data path for the adapter (for bucket databases and data files)
type DefaultMetadataAdapter ¶
type DefaultMetadataAdapter struct {
*DefaultBaseMetadataAdapter
*DefaultDataMetadataAdapter
}
DefaultMetadataAdapter combines BaseMetadataAdapter and DataMetadataAdapter for backward compatibility
func NewDefaultMetadataAdapter ¶
func NewDefaultMetadataAdapter() *DefaultMetadataAdapter
func (*DefaultMetadataAdapter) BeginTransaction ¶
func (dma *DefaultMetadataAdapter) BeginTransaction(ctx Ctx) (Transaction, error)
BeginTransaction begins a database transaction
func (*DefaultMetadataAdapter) Close ¶
func (dma *DefaultMetadataAdapter) Close()
func (*DefaultMetadataAdapter) CopyObjectsToSnapshot ¶
func (dma *DefaultMetadataAdapter) CopyObjectsToSnapshot(ctx Ctx, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
CopyObjectsToSnapshot copies objects to snapshot table
func (*DefaultMetadataAdapter) CopyObjectsToSnapshotTx ¶
func (dma *DefaultMetadataAdapter) CopyObjectsToSnapshotTx(ctx Ctx, tx Transaction, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
CopyObjectsToSnapshotTx copies objects to snapshot table within a transaction
func (*DefaultMetadataAdapter) CountDataRefs ¶
func (*DefaultMetadataAdapter) DecBktDedupSavings ¶
func (dma *DefaultMetadataAdapter) DecBktDedupSavings(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) DecBktLogicalUsed ¶
func (dma *DefaultMetadataAdapter) DecBktLogicalUsed(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) DecBktRealUsed ¶
func (dma *DefaultMetadataAdapter) DecBktRealUsed(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) DecBktUsed ¶
func (dma *DefaultMetadataAdapter) DecBktUsed(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) DeleteBkt ¶
func (dma *DefaultMetadataAdapter) DeleteBkt(c Ctx, bktID int64) error
DeleteBkt overrides DefaultBaseMetadataAdapter.DeleteBkt to access dataPath from DefaultDataMetadataAdapter
func (*DefaultMetadataAdapter) DeleteBucketSnapshot ¶
func (dma *DefaultMetadataAdapter) DeleteBucketSnapshot(ctx Ctx, snapshotID int64) error
DeleteBucketSnapshot deletes a snapshot
func (*DefaultMetadataAdapter) DeleteData ¶
func (dma *DefaultMetadataAdapter) DeleteData(c Ctx, bktID int64, dataIDs []int64) error
func (*DefaultMetadataAdapter) DeleteObj ¶
func (dma *DefaultMetadataAdapter) DeleteObj(c Ctx, bktID int64, id int64) error
func (*DefaultMetadataAdapter) DeleteSnapshotObjects ¶
func (dma *DefaultMetadataAdapter) DeleteSnapshotObjects(ctx Ctx, snapshotID int64) error
DeleteSnapshotObjects deletes snapshot objects
func (*DefaultMetadataAdapter) FindDuplicateData ¶
func (dma *DefaultMetadataAdapter) FindDuplicateData(c Ctx, bktID int64, offset, limit int) (groups []DuplicateGroup, total int64, err error)
func (*DefaultMetadataAdapter) FindSmallPackageData ¶
func (*DefaultMetadataAdapter) GetAttr ¶
func (dma *DefaultMetadataAdapter) GetAttr(c Ctx, bktID int64, objID int64, key string) ([]byte, error)
GetAttr gets an extended attribute value for an object
func (*DefaultMetadataAdapter) GetBkt ¶
func (dma *DefaultMetadataAdapter) GetBkt(c Ctx, ids []int64) (o []*BucketInfo, err error)
GetBkt overrides DefaultBaseMetadataAdapter.GetBkt to access dataPath from DefaultDataMetadataAdapter
func (*DefaultMetadataAdapter) GetBucketSnapshot ¶
func (dma *DefaultMetadataAdapter) GetBucketSnapshot(ctx Ctx, snapshotID int64) (*BucketSnapshot, error)
GetBucketSnapshot gets a snapshot by ID
func (*DefaultMetadataAdapter) GetBucketSnapshotByName ¶
func (dma *DefaultMetadataAdapter) GetBucketSnapshotByName(ctx Ctx, bktID int64, name string) (*BucketSnapshot, error)
GetBucketSnapshotByName gets a snapshot by bucket ID and name
func (*DefaultMetadataAdapter) GetData ¶
func (dma *DefaultMetadataAdapter) GetData(c Ctx, bktID, id int64) (d *DataInfo, err error)
func (*DefaultMetadataAdapter) GetObj ¶
func (dma *DefaultMetadataAdapter) GetObj(c Ctx, bktID int64, ids []int64) (o []*ObjectInfo, err error)
func (*DefaultMetadataAdapter) GetObjByDataID ¶
func (dma *DefaultMetadataAdapter) GetObjByDataID(c Ctx, bktID int64, dataID int64) ([]*ObjectInfo, error)
func (*DefaultMetadataAdapter) GetObjectsByVersion ¶
func (dma *DefaultMetadataAdapter) GetObjectsByVersion(ctx Ctx, bktID, version int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
GetObjectsByVersion gets objects by metadata version (for lazy snapshots)
func (*DefaultMetadataAdapter) GetSnapshotObjects ¶
func (dma *DefaultMetadataAdapter) GetSnapshotObjects(ctx Ctx, snapshotID int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
GetSnapshotObjects gets objects in a snapshot
func (*DefaultMetadataAdapter) IncBktDedupSavings ¶
func (dma *DefaultMetadataAdapter) IncBktDedupSavings(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) IncBktLogicalUsed ¶
func (dma *DefaultMetadataAdapter) IncBktLogicalUsed(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) IncBktRealUsed ¶
func (dma *DefaultMetadataAdapter) IncBktRealUsed(c Ctx, bktID int64, size int64) error
Override methods in DefaultMetadataAdapter to pass dataPath
func (*DefaultMetadataAdapter) IncBktUsed ¶
func (dma *DefaultMetadataAdapter) IncBktUsed(c Ctx, bktID int64, size int64) error
func (*DefaultMetadataAdapter) InsertBucketSnapshot ¶
func (dma *DefaultMetadataAdapter) InsertBucketSnapshot(ctx Ctx, snapshot *BucketSnapshot) (int64, error)
InsertBucketSnapshot inserts a new bucket snapshot
func (*DefaultMetadataAdapter) InsertBucketSnapshotTx ¶
func (dma *DefaultMetadataAdapter) InsertBucketSnapshotTx(ctx Ctx, tx Transaction, snapshot *BucketSnapshot) (int64, error)
InsertBucketSnapshotTx inserts a new bucket snapshot within a transaction
func (*DefaultMetadataAdapter) ListAllData ¶
func (*DefaultMetadataAdapter) ListAttrs ¶
ListAttrs lists all extended attribute keys for an object
func (*DefaultMetadataAdapter) ListBucketSnapshots ¶
func (dma *DefaultMetadataAdapter) ListBucketSnapshots(ctx Ctx, bktID int64, limit, offset int) ([]*BucketSnapshot, error)
ListBucketSnapshots lists snapshots for a bucket
func (*DefaultMetadataAdapter) ListChildren ¶
func (dma *DefaultMetadataAdapter) ListChildren(c Ctx, bktID int64, pid int64, offset, limit int) ([]*ObjectInfo, int64, error)
func (*DefaultMetadataAdapter) ListDeletedObjs ¶
func (dma *DefaultMetadataAdapter) ListDeletedObjs(c Ctx, bktID int64, beforeTime int64, limit int) ([]*ObjectInfo, error)
func (*DefaultMetadataAdapter) ListObj ¶
func (dma *DefaultMetadataAdapter) ListObj(c Ctx, bktID, pid int64, wd, delim, order string, count int) (o []*ObjectInfo, cnt int64, d string, err error, )
func (*DefaultMetadataAdapter) ListObjsByType ¶
func (dma *DefaultMetadataAdapter) ListObjsByType(c Ctx, bktID int64, objType int, offset, limit int) ([]*ObjectInfo, int64, error)
func (*DefaultMetadataAdapter) ListRecycleBin ¶
func (dma *DefaultMetadataAdapter) ListRecycleBin(c Ctx, bktID int64, opt ListOptions) (o []*ObjectInfo, cnt int64, d string, err error)
func (*DefaultMetadataAdapter) ListVersions ¶
func (dma *DefaultMetadataAdapter) ListVersions(c Ctx, bktID int64, fileID int64) ([]*ObjectInfo, error)
func (*DefaultMetadataAdapter) MarkBucketObjectsAsDeleted ¶
func (dma *DefaultMetadataAdapter) MarkBucketObjectsAsDeleted(ctx Ctx, bktID int64) error
MarkBucketObjectsAsDeleted marks all objects in a bucket as deleted
func (*DefaultMetadataAdapter) MarkBucketObjectsAsDeletedTx ¶
func (dma *DefaultMetadataAdapter) MarkBucketObjectsAsDeletedTx(ctx Ctx, tx Transaction, bktID int64) error
MarkBucketObjectsAsDeletedTx marks all objects in a bucket as deleted within a transaction
func (*DefaultMetadataAdapter) PutBkt ¶
func (dma *DefaultMetadataAdapter) PutBkt(c Ctx, o []*BucketInfo) error
PutBkt overrides DefaultBaseMetadataAdapter.PutBkt to access dataPath from DefaultDataMetadataAdapter
func (*DefaultMetadataAdapter) PutData ¶
func (dma *DefaultMetadataAdapter) PutData(c Ctx, bktID int64, d []*DataInfo) error
func (*DefaultMetadataAdapter) PutDataAndObj ¶
func (dma *DefaultMetadataAdapter) PutDataAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
PutDataAndObj writes both DataInfo and ObjectInfo in a single transaction. This optimization reduces database round trips by combining two separate writes.
func (*DefaultMetadataAdapter) PutObj ¶
func (dma *DefaultMetadataAdapter) PutObj(c Ctx, bktID int64, o []*ObjectInfo) (ids []int64, err error)
func (*DefaultMetadataAdapter) RemoveAttr ¶
RemoveAttr removes an extended attribute from an object
func (*DefaultMetadataAdapter) RestoreObjectsFromSnapshot ¶
func (dma *DefaultMetadataAdapter) RestoreObjectsFromSnapshot(ctx Ctx, snapshotID, targetBktID int64) error
RestoreObjectsFromSnapshot restores objects from a snapshot
func (*DefaultMetadataAdapter) RestoreObjectsFromSnapshotTx ¶
func (dma *DefaultMetadataAdapter) RestoreObjectsFromSnapshotTx(ctx Ctx, tx Transaction, snapshotID, targetBktID int64) error
RestoreObjectsFromSnapshotTx restores objects from a snapshot within a transaction
func (*DefaultMetadataAdapter) SetAttr ¶
func (dma *DefaultMetadataAdapter) SetAttr(c Ctx, bktID int64, objID int64, key string, value []byte) error
SetAttr sets an extended attribute value for an object
func (*DefaultMetadataAdapter) SetBaseKey ¶
func (dma *DefaultMetadataAdapter) SetBaseKey(key string)
SetBaseKey sets the encryption key for the main database
func (*DefaultMetadataAdapter) SetBasePath ¶
func (dma *DefaultMetadataAdapter) SetBasePath(basePath string)
SetBasePath sets the base path (for main database)
func (*DefaultMetadataAdapter) SetDataKey ¶
func (dma *DefaultMetadataAdapter) SetDataKey(key string)
SetDataKey sets the encryption key for bucket databases
func (*DefaultMetadataAdapter) SetDataPath ¶
func (dma *DefaultMetadataAdapter) SetDataPath(dataPath string)
SetDataPath sets the data path (for bucket databases and data files)
func (*DefaultMetadataAdapter) SetObj ¶
func (dma *DefaultMetadataAdapter) SetObj(c Ctx, bktID int64, fields []string, o *ObjectInfo) error
func (*DefaultMetadataAdapter) UpdateBktQuota ¶
func (dma *DefaultMetadataAdapter) UpdateBktQuota(c Ctx, bktID int64, quota int64) error
UpdateBktQuota overrides DefaultBaseMetadataAdapter.UpdateBktQuota to access dataPath from DefaultDataMetadataAdapter
func (*DefaultMetadataAdapter) UpdateObjDataID ¶
func (dma *DefaultMetadataAdapter) UpdateObjDataID(c Ctx, bktID int64, oldDataID, newDataID int64) error
func (*DefaultMetadataAdapter) UpdateSnapshotStats ¶
func (dma *DefaultMetadataAdapter) UpdateSnapshotStats(ctx Ctx, snapshotID int64, fileCount, totalSize int64, status SnapshotStatus) error
UpdateSnapshotStats updates snapshot statistics
func (*DefaultMetadataAdapter) UpdateSnapshotStatsTx ¶
func (dma *DefaultMetadataAdapter) UpdateSnapshotStatsTx(ctx Ctx, tx Transaction, snapshotID int64, fileCount, totalSize int64, status SnapshotStatus) error
UpdateSnapshotStatsTx updates snapshot statistics within a transaction
func (*DefaultMetadataAdapter) UpdateSnapshotStatus ¶
func (dma *DefaultMetadataAdapter) UpdateSnapshotStatus(ctx Ctx, snapshotID int64, status SnapshotStatus) error
UpdateSnapshotStatus updates snapshot status
type DefragmentResult ¶
// DefragmentResult holds the statistics returned by Defragment.
type DefragmentResult struct {
PackedGroups int64 `json:"packed_groups"` // Number of groups packed
PackedFiles int64 `json:"packed_files"` // Number of files packed
CompactedPkgs int64 `json:"compacted_pkgs"` // Number of package files compacted
FreedSize int64 `json:"freed_size"` // Freed space in bytes
SkippedInUse int64 `json:"skipped_in_use"` // Number of data items skipped because they are in use
}
func Defragment ¶
func Defragment(c Ctx, bktID int64, a Admin, ma MetadataAdapter, da DataAdapter) (*DefragmentResult, error)
Defragment performs defragmentation: it merges and packs small files offline and moves the remaining data blocks forward in packages. Strategy: (1) first, find holes (gaps left by deleted data blocks) in existing package files and fill them with small files; (2) then, pack the remaining small files into new package files. Benefits of the hole-filling approach: it reuses space in existing package files, reducing new package file creation; it reduces fragmentation by consolidating data; and it is more efficient than repacking everything, because it only processes files that fit in holes. Evaluation: hole-filling is beneficial when package files have significant fragmentation (many deleted blocks), when small files can efficiently fill existing holes, and when it reduces total I/O compared to full repacking. maxSize: maximum file size (files smaller than this will be packed). accessWindow: access window time (seconds), a reserved parameter; currently only the reference count is used to determine whether data is in use. If data has references (refCount > 0), it is skipped. If an access time field is added in the future, this parameter can be used. Defragment ensures that only one defragmentation operation is executing for the same bktID at a time.
type DirtyDataResult ¶
// DirtyDataResult holds the findings returned by ScanDirtyData.
type DirtyDataResult struct {
IncompleteChunks []int64 `json:"incomplete_chunks"` // Data with incomplete chunks (power lost during upload)
UnreadableData []int64 `json:"unreadable_data"` // Data files that cannot be read
}
func ScanDirtyData ¶
func ScanDirtyData(c Ctx, bktID int64, ma MetadataAdapter, da DataAdapter) (*DirtyDataResult, error)
ScanDirtyData scans for dirty data (incomplete data caused by power failure or upload failure). Main detection: (1) incomplete chunk data (some chunks missing); (2) unreadable data files (file exists but read fails).
type DuplicateGroup ¶
// DuplicateGroup represents a group of duplicate data entries that share the
// same original size and checksums.
type DuplicateGroup struct {
OrigSize int64 // Original data size
HdrXXH3 int64 // XXHash3-64bit checksum of first 100KB (as int64)
XXH3 int64 // XXHash3-64bit checksum of entire original data (as int64)
SHA256_0 int64 // SHA-256 hash bytes 0-7
SHA256_1 int64 // SHA-256 hash bytes 8-15
SHA256_2 int64 // SHA-256 hash bytes 16-23
SHA256_3 int64 // SHA-256 hash bytes 24-31
DataIDs []int64 // List of DataIDs with same checksums
}
DuplicateGroup represents a group of duplicate data with same checksums
type FixScrubIssuesResult ¶
// FixScrubIssuesResult holds the fix statistics returned by FixScrubIssues.
type FixScrubIssuesResult struct {
FixedCorrupted int `json:"fixed_corrupted"` // Number of corrupted data items fixed
FixedOrphaned int `json:"fixed_orphaned"` // Number of orphaned data items fixed
FixedMismatchedChecksum int `json:"fixed_mismatched_checksum"` // Number of checksum-mismatched data items fixed
FreedSize int64 `json:"freed_size"` // Freed space in bytes
Errors []string `json:"errors"` // Error messages collected during fixing
}
func FixScrubIssues ¶
func FixScrubIssues(c Ctx, bktID int64, result *ScrubResult, ma MetadataAdapter, da DataAdapter, options struct { FixCorrupted bool FixOrphaned bool FixMismatchedChecksum bool }, ) (*FixScrubIssuesResult, error)
FixScrubIssues fixes issues detected by ScrubData. Based on the ScrubResult, it can selectively fix different types of issues. Options:
- FixCorrupted: whether to fix corrupted data (delete metadata without files), default false
- FixOrphaned: whether to fix orphaned data (delete files without references), default false
- FixMismatchedChecksum: whether to fix mismatched checksum data (delete corrupted data), default false
Returns fix statistics
type Handler ¶
// Handler is the object/data access interface of the package: it covers
// adapter wiring, login, data upload/download, object CRUD, the recycle bin
// and orphaned-chunk scanning.
type Handler interface {
// New takes the underlying handler and returns the current one, forming a call chain
New(h Handler) Handler
Close()
// SetAdapter sets custom storage adapters
SetAdapter(ma MetadataAdapter, da DataAdapter)
SetAccessCtrlMgr(acm AccessCtrlMgr)
MetadataAdapter() MetadataAdapter
DataAdapter() DataAdapter
AccessCtrlMgr() AccessCtrlMgr
// Login logs a user in
Login(c Ctx, usr, pwd string) (Ctx, *UserInfo, []*BucketInfo, error)
// With only the file length and HdrXXH3 this is a pre-ref: returns a new DataID on success, 0 on failure
// With the file length, XXH3 and SHA-256: returns the referenced DataID on success, 0 on failure;
// if the client sees that the DataID changed, the data does not need to be uploaded
// If a non-pre-ref call passes DataID 0, the pre-ref step was skipped
Ref(c Ctx, bktID int64, d []*DataInfo) ([]int64, error)
// PutData uploads data chunk, sn starts from 0, if dataID is 0 a new one will be created
PutData(c Ctx, bktID, dataID int64, sn int, buf []byte) (int64, error)
// GetData reads data: one param means sn, two params mean sn+offset, three params mean sn+offset+size
GetData(c Ctx, bktID, id int64, sn int, offsetOrSize ...int) ([]byte, error)
// PutDataInfo uploads data metadata
PutDataInfo(c Ctx, bktID int64, d []*DataInfo) ([]int64, error)
// PutDataInfoAndObj writes both DataInfo and ObjectInfo in a single transaction
// This optimization reduces database round trips for better performance
PutDataInfoAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
// GetDataInfo gets data information
GetDataInfo(c Ctx, bktID, id int64) (*DataInfo, error)
// If Name is not provided, the stringified ID is used as the Name by default
Put(c Ctx, bktID int64, o []*ObjectInfo) ([]int64, error)
Get(c Ctx, bktID int64, ids []int64) ([]*ObjectInfo, error)
List(c Ctx, bktID, pid int64, opt ListOptions) (o []*ObjectInfo, cnt int64, delim string, err error)
Rename(c Ctx, bktID, id int64, name string) error
MoveTo(c Ctx, bktID, id, pid int64) error
// Recycle marks object as deleted to recycle bin
// During garbage collection: data without metadata reference is dirty data (need window time)
// Metadata without data is corrupted data
Recycle(c Ctx, bktID, id int64) error
// Delete permanently deletes object
Delete(c Ctx, bktID, id int64) error
// DeleteBatch permanently deletes multiple objects in batch for better performance
// Returns a map of objID -> error for any failures (nil map means all succeeded)
DeleteBatch(c Ctx, bktID int64, ids []int64) map[int64]error
// CleanRecycleBin physically deletes objects marked as deleted in recycle bin
CleanRecycleBin(c Ctx, bktID int64, targetID int64) error
// ListRecycleBin lists objects in recycle bin
ListRecycleBin(c Ctx, bktID int64, opt ListOptions) (o []*ObjectInfo, cnt int64, delim string, err error)
// UpdateFileLatestVersion updates all files' latest version DataID, update time, size and directory size
UpdateFileLatestVersion(c Ctx, bktID int64) error
// GetBktInfo gets bucket information (for getting bucket configuration)
GetBktInfo(c Ctx, bktID int64) (*BucketInfo, error)
// ScanOrphanedChunks scans orphaned chunks (chunks without metadata)
// delaySeconds: delay time in seconds before re-checking and deleting orphaned chunks
// Returns result with orphaned chunks found and deleted
ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
}
func NewLocalHandler ¶
NewLocalHandler creates a new LocalHandler. basePath: path for main database and bucket databases (empty string for default). dataPath: path for data file storage (empty string for default).
func NewNoAuthHandler ¶
NewNoAuthHandler creates a Handler that bypasses all authentication and permission checks. This bypasses main database operations (user authentication and ACL permission checks), but bucket database operations (data and object metadata) are still performed normally. This is useful for testing, internal operations, or when authentication is handled externally. The handler uses NoAuthAccessCtrlMgr, which always allows all operations without querying the main database. basePath: path for main database and bucket databases (empty string for default). dataPath: path for data file storage (empty string for default).
type InstantUploadConfig ¶
// InstantUploadConfig stores the instant upload (deduplication) configuration.
type InstantUploadConfig struct {
RefLevel uint32 // Instant upload level: 0=OFF, 1=FULL, 2=FAST
}
InstantUploadConfig stores instant upload (deduplication) configuration
func GetBucketInstantUploadConfig ¶
func GetBucketInstantUploadConfig(bucket *BucketInfo) *InstantUploadConfig
GetBucketInstantUploadConfig extracts the instant upload config from bucket info. It returns nil if bucket is nil or the config is not set. Note: RefLevel is no longer stored in the bucket config; it should be provided via core.Config in the business layer.
type ListOptions ¶
// ListOptions carries the filter, paging and ordering options accepted by the
// List and ListRecycleBin calls.
type ListOptions struct {
Word string `json:"w,omitempty"` // Filter word; supports the wildcards * and ?
Delim string `json:"d,omitempty"` // Delimiter returned after each request; pass it back unchanged
Type int `json:"t,omitempty"` // Object type, -1: malformed, 0: no filter (default), 1: dir, 2: file, 3: version, 4: preview(thumb/m3u8/pdf)
Count int `json:"c,omitempty"` // Number of items to query
Order string `json:"o,omitempty"` // Sort order: id/mtime/name/size/type, prefix +: ascending (default), -: descending
Brief int `json:"e,omitempty"` // Return less content (only effective at the network transport layer, to save traffic), 0: FULL(default), 1: without EXT, 2: only ID
}
type LocalAdmin ¶
type LocalAdmin struct {
// contains filtered or unexported fields
}
func (*LocalAdmin) Close ¶
func (la *LocalAdmin) Close()
func (*LocalAdmin) CreateUser ¶
func (*LocalAdmin) Defragment ¶
func (la *LocalAdmin) Defragment(c Ctx, bktID int64) (*DefragmentResult, error)
func (*LocalAdmin) DeleteAllACL ¶
func (la *LocalAdmin) DeleteAllACL(c Ctx, bktID int64) error
func (*LocalAdmin) DeleteUser ¶
func (la *LocalAdmin) DeleteUser(c Ctx, userID int64) error
func (*LocalAdmin) ListACL ¶
func (la *LocalAdmin) ListACL(c Ctx, bktID int64) ([]*BucketACL, error)
func (*LocalAdmin) ListACLByUser ¶
func (la *LocalAdmin) ListACLByUser(c Ctx, uid int64) ([]*BucketACL, error)
func (*LocalAdmin) MergeDuplicateData ¶
func (la *LocalAdmin) MergeDuplicateData(c Ctx, bktID int64) (*MergeDuplicateResult, error)
func (*LocalAdmin) New ¶
func (la *LocalAdmin) New(Admin) Admin
New returns current admin, forming a chain with underlying admin
func (*LocalAdmin) PutACL ¶
ACL management methods (delegate to ACLMetadataAdapter with permission check)
func (*LocalAdmin) PutBkt ¶
func (la *LocalAdmin) PutBkt(c Ctx, o []*BucketInfo) error
func (*LocalAdmin) ScanDirtyData ¶
func (la *LocalAdmin) ScanDirtyData(c Ctx, bktID int64) (*DirtyDataResult, error)
func (*LocalAdmin) ScanOrphanedChunks ¶
func (la *LocalAdmin) ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
func (*LocalAdmin) Scrub ¶
func (la *LocalAdmin) Scrub(c Ctx, bktID int64) (*ScrubResult, error)
func (*LocalAdmin) UpdateUser ¶
type LocalHandler ¶
type LocalHandler struct {
// contains filtered or unexported fields
}
func (*LocalHandler) AccessCtrlMgr ¶
func (lh *LocalHandler) AccessCtrlMgr() AccessCtrlMgr
func (*LocalHandler) CleanRecycleBin ¶
func (lh *LocalHandler) CleanRecycleBin(c Ctx, bktID int64, targetID int64) error
func (*LocalHandler) Close ¶
func (lh *LocalHandler) Close()
func (*LocalHandler) CreateVersionFromFile ¶
func (lh *LocalHandler) CreateVersionFromFile(c Ctx, bktID, existingFileID int64) error
CreateVersionFromFile creates a new version from an existing file. This is used when renaming a file to a name that already exists. The existing file becomes a version, and the version's parent is the existing file itself. After creating the version, the existing file can be replaced by the renamed file.
func (*LocalHandler) DataAdapter ¶
func (lh *LocalHandler) DataAdapter() DataAdapter
func (*LocalHandler) DeleteBatch ¶
func (*LocalHandler) Get ¶
func (lh *LocalHandler) Get(c Ctx, bktID int64, ids []int64) ([]*ObjectInfo, error)
func (*LocalHandler) GetBktInfo ¶
func (lh *LocalHandler) GetBktInfo(c Ctx, bktID int64) (*BucketInfo, error)
func (*LocalHandler) GetData ¶
func (lh *LocalHandler) GetData(c Ctx, bktID, id int64, sn int, offsetOrSize ...int) ([]byte, error)
GetData reads a data chunk. One param means sn, two params mean sn+offset, three params mean sn+offset+size. For sparse files (DATA_SPARSE flag), missing chunks are filled with zeros.
func (*LocalHandler) GetDataAdapter ¶
func (lh *LocalHandler) GetDataAdapter() DataAdapter
GetDataAdapter returns the DataAdapter instance. This allows external packages to access the DataAdapter for operations like Delete.
func (*LocalHandler) GetDataInfo ¶
func (lh *LocalHandler) GetDataInfo(c Ctx, bktID, id int64) (*DataInfo, error)
func (*LocalHandler) List ¶
func (lh *LocalHandler) List(c Ctx, bktID, pid int64, opt ListOptions) ([]*ObjectInfo, int64, string, error)
func (*LocalHandler) ListRecycleBin ¶
func (lh *LocalHandler) ListRecycleBin(c Ctx, bktID int64, opt ListOptions) ([]*ObjectInfo, int64, string, error)
func (*LocalHandler) Login ¶
func (lh *LocalHandler) Login(c Ctx, usr, pwd string) (Ctx, *UserInfo, []*BucketInfo, error)
func (*LocalHandler) MetadataAdapter ¶
func (lh *LocalHandler) MetadataAdapter() MetadataAdapter
func (*LocalHandler) New ¶
func (lh *LocalHandler) New(Handler) Handler
New returns current handler, forming a chain with underlying handler
func (*LocalHandler) Put ¶
func (lh *LocalHandler) Put(c Ctx, bktID int64, o []*ObjectInfo) ([]int64, error)
Put creates objects. During garbage collection: data without a metadata reference is dirty data (needs a window time); metadata without data is corrupted data. PID supports using two's complement to directly reference an object ID that hasn't been uploaded yet.
func (*LocalHandler) PutData ¶
PutData uploads a data chunk. For large files, sn starts from 0. For small files or packaged uploads, use sn=0. If dataID is 0, a new dataID will be created automatically.
func (*LocalHandler) PutDataFromReader ¶
func (lh *LocalHandler) PutDataFromReader(c Ctx, bktID, dataID int64, sn int, r io.Reader, size int64) (int64, error)
PutDataFromReader uploads a data chunk from an io.Reader; sn starts from 0. If dataID is 0, a new dataID will be created automatically. This is more efficient for streaming data as it avoids loading all data into memory.
func (*LocalHandler) PutDataInfo ¶
PutDataInfo creates metadata after data is uploaded
func (*LocalHandler) PutDataInfoAndObj ¶
func (lh *LocalHandler) PutDataInfoAndObj(c Ctx, bktID int64, d []*DataInfo, o []*ObjectInfo) error
PutDataInfoAndObj writes both DataInfo and ObjectInfo in a single transaction. This optimization reduces database round trips for better performance.
func (*LocalHandler) Ref ¶
Ref performs a pre-ref when only the file length and HdrXXH3 are provided: it returns a new DataID on success, 0 on failure. With the file length, XXH3 and SHA-256, it returns the referenced DataID on success, 0 on failure. If the client detects that the DataID changed, the data does not need to be uploaded. If a non-pre-ref DataID is 0, the pre-ref was skipped.
func (*LocalHandler) Rename ¶
func (lh *LocalHandler) Rename(c Ctx, bktID, id int64, name string) error
func (*LocalHandler) ScanOrphanedChunks ¶
func (lh *LocalHandler) ScanOrphanedChunks(c Ctx, bktID int64, delaySeconds int) (*ScanOrphanedChunksResult, error)
func (*LocalHandler) SetAccessCtrlMgr ¶
func (lh *LocalHandler) SetAccessCtrlMgr(acm AccessCtrlMgr)
func (*LocalHandler) SetAdapter ¶
func (lh *LocalHandler) SetAdapter(ma MetadataAdapter, da DataAdapter)
func (*LocalHandler) SetDBKey ¶
func (lh *LocalHandler) SetDBKey(key string)
SetDBKey sets the encryption key for the database (filename encryption). This sets DataDBKey, which is used for encrypting object names in the database.
func (*LocalHandler) UpdateFileLatestVersion ¶
func (lh *LocalHandler) UpdateFileLatestVersion(c Ctx, bktID int64) error
type MergeDuplicateResult ¶
// MergeDuplicateResult holds the statistics returned by MergeDuplicateData.
type MergeDuplicateResult struct {
MergedGroups int64 `json:"merged_groups"` // Number of duplicate data groups merged
MergedData []map[int64]int64 `json:"merged_data"` // Merged data mappings: duplicate DataID -> primary DataID
FreedSize int64 `json:"freed_size"` // Freed space in bytes
}
func MergeDuplicateData ¶
func MergeDuplicateData(c Ctx, bktID int64, ma MetadataAdapter, da DataAdapter) (*MergeDuplicateResult, error)
MergeDuplicateData merges duplicate instant upload data. It finds duplicate data with the same checksum value but different DataIDs and merges them into one DataID. It ensures that only one merge operation is executing for the same bktID at a time.
func RunDeduplicationForBucket ¶
func RunDeduplicationForBucket(ctx context.Context, bktID int64, ma MetadataAdapter, da DataAdapter) (*MergeDuplicateResult, error)
RunDeduplicationForBucket runs deduplication for a single bucket. This wraps the existing MergeDuplicateData function with proper resource control.
type MetadataAdapter ¶
// MetadataAdapter combines BaseMetadataAdapter and DataMetadataAdapter (plus
// Close) for backward compatibility.
type MetadataAdapter interface {
Close()
BaseMetadataAdapter
DataMetadataAdapter
}
MetadataAdapter combines BaseMetadataAdapter and DataMetadataAdapter for backward compatibility
type NoAuthAccessCtrlMgr ¶
// NoAuthAccessCtrlMgr is an AccessCtrlMgr that bypasses all permission checks.
type NoAuthAccessCtrlMgr struct{}
NoAuthAccessCtrlMgr is an AccessCtrlMgr that bypasses all permission checks. This bypasses authentication and authorization checks that require the main database (user table and ACL table), but bucket database operations (data and object metadata) are still performed normally. This is useful for testing, internal operations, or when authentication is handled externally.
func (*NoAuthAccessCtrlMgr) CheckOwn ¶
func (nacm *NoAuthAccessCtrlMgr) CheckOwn(c Ctx, bktID int64) error
func (*NoAuthAccessCtrlMgr) CheckPermission ¶
func (nacm *NoAuthAccessCtrlMgr) CheckPermission(c Ctx, action int, bktID int64) error
func (*NoAuthAccessCtrlMgr) CheckRole ¶
func (nacm *NoAuthAccessCtrlMgr) CheckRole(c Ctx, role uint32) error
func (*NoAuthAccessCtrlMgr) SetAdapter ¶
func (nacm *NoAuthAccessCtrlMgr) SetAdapter(ma MetadataAdapter)
type ObjectInfo ¶
// ObjectInfo is the metadata record of an object; each field carries a borm
// tag and a compact json tag.
type ObjectInfo struct {
ID int64 `borm:"id" json:"i,omitempty"` // Object ID (randomly generated by idgen)
PID int64 `borm:"pid" json:"p,omitempty"` // Parent object ID
MTime int64 `borm:"m" json:"m,omitempty"` // Update time, second-level timestamp
DataID int64 `borm:"did" json:"d,omitempty"` // Data ID, if 0, no data (newly created file, DataID is object ID, serving as first version data)
// NOTE(review): value 4 is documented as "journal" here but as
// "preview(thumb/m3u8/pdf)" on ListOptions.Type — confirm which is current.
Type int `borm:"t" json:"t,omitempty"` // Object type, -1: malformed, 0: none, 1: dir, 2: file, 3: version, 4: journal
Name string `borm:"n" json:"n,omitempty"` // Object name (may be encrypted if MODE_NAME_ENCRYPTED flag is set)
Size int64 `borm:"s" json:"s,omitempty"` // Object size, directory size is child object count, file size is latest version byte count
Mode uint32 `borm:"md" json:"md,omitempty"` // File mode (permissions and flags), 0 means use default. High bit (1<<31) indicates encrypted name
Extra string `borm:"e" json:"e,omitempty"` // Object extended information
}
type ObjectMetadataAdapter ¶
// ObjectMetadataAdapter groups the object-metadata operations: put/get/set/list,
// DataID reference counting, deletion and the recycle bin, listing by type or
// parent, version listing, and extended attributes.
type ObjectMetadataAdapter interface {
PutObj(c Ctx, bktID int64, o []*ObjectInfo) ([]int64, error)
GetObj(c Ctx, bktID int64, ids []int64) ([]*ObjectInfo, error)
SetObj(c Ctx, bktID int64, fields []string, o *ObjectInfo) error
ListObj(c Ctx, bktID, pid int64, wd, delim, order string, count int) ([]*ObjectInfo, int64, string, error)
CountDataRefs(c Ctx, bktID int64, dataIDs []int64) (map[int64]int64, error) // Count DataID references
DeleteObj(c Ctx, bktID int64, id int64) error // Delete object (mark as deleted, flip PID to negative)
ListDeletedObjs(c Ctx, bktID int64, beforeTime int64, limit int) ([]*ObjectInfo, error) // List deleted objects (PID < 0)
ListRecycleBin(c Ctx, bktID int64, opt ListOptions) ([]*ObjectInfo, int64, string, error) // List recycle bin (objects with PID < 0)
// Query all objects of specified type (not deleted, pid >= 0)
// offset: offset, limit: page size, returns data and total count
ListObjsByType(c Ctx, bktID int64, objType int, offset, limit int) ([]*ObjectInfo, int64, error)
// Query all child objects under specified directory (not deleted, pid >= 0)
// offset: offset, limit: page size, returns data and total count
ListChildren(c Ctx, bktID int64, pid int64, offset, limit int) ([]*ObjectInfo, int64, error)
// Query all objects that reference the specified DataID
GetObjByDataID(c Ctx, bktID int64, dataID int64) ([]*ObjectInfo, error)
// Query all versions of a file (sorted by MTime descending, latest first)
ListVersions(c Ctx, bktID int64, fileID int64) ([]*ObjectInfo, error)
// Extended attributes (xattr) operations
GetAttr(c Ctx, bktID int64, objID int64, key string) ([]byte, error) // Get extended attribute value
SetAttr(c Ctx, bktID int64, objID int64, key string, value []byte) error // Set extended attribute value
RemoveAttr(c Ctx, bktID int64, objID int64, key string) error // Remove extended attribute
ListAttrs(c Ctx, bktID int64, objID int64) ([]string, error) // List all extended attribute keys for an object
}
type ResourceControlConfig ¶
// ResourceControlConfig holds the resource control configuration.
type ResourceControlConfig struct {
// BatchInterval Batch interval time (milliseconds), delay between each batch processing
// Configurable via environment variable ORCAS_BATCH_INTERVAL_MS, default 100ms
// NOTE(review): the field is a time.Duration; "milliseconds" presumably describes
// the unit of the environment variable, not of the field — confirm.
BatchInterval time.Duration
// MaxDuration Maximum running duration (seconds), stop processing after this time
// Configurable via environment variable ORCAS_MAX_DURATION_SEC, default 0 means no limit
// NOTE(review): the field is a time.Duration; "seconds" presumably describes
// the unit of the environment variable, not of the field — confirm.
MaxDuration time.Duration
// MaxItemsPerSecond Maximum items processed per second, for rate limiting
// Configurable via environment variable ORCAS_MAX_ITEMS_PER_SEC, default 0 means no limit
MaxItemsPerSecond int
// AdaptiveDelay Whether to enable adaptive delay, dynamically adjust delay based on processed data volume
// Configurable via environment variable ORCAS_ADAPTIVE_DELAY (true/false), default true
AdaptiveDelay bool
// AdaptiveDelayFactor Adaptive delay factor, delay time = BatchInterval * (1 + processed items / factor)
// Configurable via environment variable ORCAS_ADAPTIVE_DELAY_FACTOR, default 1000
AdaptiveDelayFactor int64
}
ResourceControlConfig holds the resource control configuration.
func GetResourceControlConfig ¶
func GetResourceControlConfig() ResourceControlConfig
GetResourceControlConfig gets the resource control configuration.
type ResourceController ¶
type ResourceController struct {
// contains filtered or unexported fields
}
ResourceController is a resource controller used to limit the use of resource-intensive operations.
func NewResourceController ¶
func NewResourceController(config ResourceControlConfig) *ResourceController
NewResourceController creates a new resource controller
func (*ResourceController) GetElapsedTime ¶
func (rc *ResourceController) GetElapsedTime() time.Duration
GetElapsedTime gets the elapsed time
func (*ResourceController) GetProcessedItems ¶
func (rc *ResourceController) GetProcessedItems() int64
GetProcessedItems gets the number of processed items
func (*ResourceController) ShouldStop ¶
func (rc *ResourceController) ShouldStop() bool
ShouldStop checks if processing should stop (exceeds maximum duration)
func (*ResourceController) WaitIfNeeded ¶
func (rc *ResourceController) WaitIfNeeded(itemsProcessed int)
WaitIfNeeded waits between batch processing (implements batch interval and rate limiting). itemsProcessed: number of items processed in this batch.
type ScanOrphanedChunksResult ¶
// ScanOrphanedChunksResult holds the result returned by ScanOrphanedChunks.
type ScanOrphanedChunksResult struct {
TotalScanned int64 `json:"total_scanned"` // Total number of chunks scanned
OrphanedChunks []int64 `json:"orphaned_chunks"` // Orphaned chunks (deduplicated list of dataIDs)
DeletedChunks int `json:"deleted_chunks"` // Number of chunks deleted
FreedSize int64 `json:"freed_size"` // Freed space in bytes
DelayedChunks int `json:"delayed_chunks"` // Number of chunks subject to the delayed check
StillOrphaned int `json:"still_orphaned"` // Number of chunks still orphaned after the delay
Errors []string `json:"errors"` // Error messages collected during the scan
}
func ScanOrphanedChunks ¶
func ScanOrphanedChunks(c Ctx, bktID int64, ma MetadataAdapter, da DataAdapter, delaySeconds int) (*ScanOrphanedChunksResult, error)
ScanOrphanedChunks scans orphaned chunks (chunks without metadata or without object references). It scans all chunk files in the data directory and checks whether the DataInfo exists and whether it is referenced by any ObjectInfo. Every 1000 chunks, it checks metadata and reference counts. If the DataInfo doesn't exist or has no object references, it delays and re-checks. If the chunk is still orphaned after the delay, it deletes the chunk. delaySeconds: delay time in seconds before re-checking and deleting orphaned chunks.
type ScrubResult ¶
// ScrubResult holds the findings returned by ScrubData.
type ScrubResult struct {
TotalData int `json:"total_data"` // Total number of data items
CorruptedData []int64 `json:"corrupted_data"` // Corrupted data: metadata exists but the data file is missing
OrphanedData []int64 `json:"orphaned_data"` // Orphaned data: the data file exists but no metadata references it
MismatchedChecksum []int64 `json:"mismatched_checksum"` // Data whose checksum does not match
}
func ScrubData ¶
func ScrubData(c Ctx, bktID int64, ma MetadataAdapter, da DataAdapter) (*ScrubResult, error)
ScrubData audits data integrity, checks consistency between metadata and data files
type SnapshotConfig ¶
// SnapshotConfig defines the snapshot configuration.
type SnapshotConfig struct {
Enabled bool // Enable snapshot feature
AutoSnapshot bool // Auto snapshot
SnapshotSchedule string // Cron expression
MaxSnapshots int // Max snapshot count
RetentionDays int // Retention days
LazyMode bool // Lazy mode (O(1) creation)
COWEnabled bool // Enable Copy-On-Write
}
SnapshotConfig defines snapshot configuration
func DefaultSnapshotConfig ¶
func DefaultSnapshotConfig() SnapshotConfig
DefaultSnapshotConfig returns default snapshot configuration
type SnapshotDataAdapter ¶
type SnapshotDataAdapter interface {
// COW (copy-on-write) related operations
DecrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
IncrementSnapshotDataRefs(ctx Ctx, snapshotID int64) error
IncrementSnapshotDataRefsTx(ctx Ctx, tx Transaction, snapshotID int64) error
CleanupUnreferencedData(ctx Ctx, bktID int64) error
// COW writes
WriteWithCOW(ctx Ctx, bktID, objID int64, data []byte) (int64, error)
CopyData(ctx Ctx, bktID, srcDataID, dstDataID int64) error
}
SnapshotDataAdapter is the interface for snapshot data operations
type SnapshotManager ¶
// SnapshotManager manages bucket snapshots. Create one with NewSnapshotManager.
type SnapshotManager struct {
// contains filtered or unexported fields
}
SnapshotManager manages bucket snapshots
func NewSnapshotManager ¶
func NewSnapshotManager(sma SnapshotMetadataAdapter, sda SnapshotDataAdapter, config SnapshotConfig) *SnapshotManager
NewSnapshotManager creates a new snapshot manager
func (*SnapshotManager) CleanupExpiredSnapshots ¶
CleanupExpiredSnapshots cleans up expired snapshots
func (*SnapshotManager) CreateSnapshot ¶
func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, bktID int64, name, description string, snapshotType SnapshotType) (*BucketSnapshot, error)
CreateSnapshot creates a new bucket snapshot
func (*SnapshotManager) DeleteSnapshot ¶
func (sm *SnapshotManager) DeleteSnapshot(ctx context.Context, snapshotID int64) error
DeleteSnapshot deletes a snapshot
func (*SnapshotManager) GetSnapshot ¶
func (sm *SnapshotManager) GetSnapshot(ctx context.Context, snapshotID int64) (*BucketSnapshot, error)
GetSnapshot gets snapshot details
func (*SnapshotManager) GetSnapshotByName ¶
func (sm *SnapshotManager) GetSnapshotByName(ctx context.Context, bktID int64, name string) (*BucketSnapshot, error)
GetSnapshotByName gets a snapshot by name
func (*SnapshotManager) GetSnapshotFiles ¶
func (sm *SnapshotManager) GetSnapshotFiles(ctx context.Context, snapshotID int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
GetSnapshotFiles gets the files in a snapshot
func (*SnapshotManager) ListSnapshots ¶
func (sm *SnapshotManager) ListSnapshots(ctx context.Context, bktID int64, limit, offset int) ([]*BucketSnapshot, error)
ListSnapshots lists snapshots
func (*SnapshotManager) RestoreSnapshot ¶
func (sm *SnapshotManager) RestoreSnapshot(ctx context.Context, snapshotID int64, targetBktID int64) error
RestoreSnapshot restores a snapshot
type SnapshotMetadataAdapter ¶
type SnapshotMetadataAdapter interface {
// Transactions
BeginTransaction(ctx Ctx) (Transaction, error)
// Basic snapshot operations
InsertBucketSnapshot(ctx Ctx, snapshot *BucketSnapshot) (int64, error)
UpdateSnapshotStatus(ctx Ctx, snapshotID int64, status SnapshotStatus) error
UpdateSnapshotStats(ctx Ctx, snapshotID int64, fileCount, totalSize int64, status SnapshotStatus) error
GetBucketSnapshot(ctx Ctx, snapshotID int64) (*BucketSnapshot, error)
GetBucketSnapshotByName(ctx Ctx, bktID int64, name string) (*BucketSnapshot, error)
ListBucketSnapshots(ctx Ctx, bktID int64, limit, offset int) ([]*BucketSnapshot, error)
DeleteBucketSnapshot(ctx Ctx, snapshotID int64) error
// Snapshot object operations
CopyObjectsToSnapshot(ctx Ctx, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
GetSnapshotObjects(ctx Ctx, snapshotID int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
GetObjectsByVersion(ctx Ctx, bktID, version int64, prefix string, limit, offset int) ([]*SnapshotObject, error)
DeleteSnapshotObjects(ctx Ctx, snapshotID int64) error
// Restore operations
MarkBucketObjectsAsDeleted(ctx Ctx, bktID int64) error
RestoreObjectsFromSnapshot(ctx Ctx, snapshotID, targetBktID int64) error
// Transactional operations (variants of the methods above that take a Transaction)
InsertBucketSnapshotTx(ctx Ctx, tx Transaction, snapshot *BucketSnapshot) (int64, error)
CopyObjectsToSnapshotTx(ctx Ctx, tx Transaction, snapshotID, bktID int64) (fileCount, totalSize int64, err error)
UpdateSnapshotStatsTx(ctx Ctx, tx Transaction, snapshotID int64, fileCount, totalSize int64, status SnapshotStatus) error
MarkBucketObjectsAsDeletedTx(ctx Ctx, tx Transaction, bktID int64) error
RestoreObjectsFromSnapshotTx(ctx Ctx, tx Transaction, snapshotID, targetBktID int64) error
}
SnapshotMetadataAdapter is the interface for snapshot metadata operations
type SnapshotObject ¶
type SnapshotObject struct {
SnapshotID int64 `json:"snapshot_id"` // presumably the ID of the snapshot this entry belongs to — confirm
ObjID int64 `json:"obj_id"` // presumably the ID of the captured object — confirm
Path string `json:"path"`
DataID int64 `json:"data_id"`
Size int64 `json:"size"` // NOTE(review): unit not visible here, presumably bytes — confirm
MTime int64 `json:"mtime"` // NOTE(review): time unit/epoch not visible here — confirm
Checksum string `json:"checksum,omitempty"` // omitted from JSON output when empty
}
SnapshotObject represents an object in a snapshot
type SnapshotStatus ¶
// SnapshotStatus defines snapshot status.
type SnapshotStatus int32

// Possible SnapshotStatus values. The numeric values are written out
// explicitly so each name keeps exactly the value it was declared with.
const (
	SnapshotStatusInProgress SnapshotStatus = 0 // In progress
	SnapshotStatusComplete   SnapshotStatus = 1 // Complete
	SnapshotStatusDeleting   SnapshotStatus = 2 // Deleting
	SnapshotStatusFailed     SnapshotStatus = 3 // Failed
)
type SnapshotType ¶
// SnapshotType defines snapshot type.
type SnapshotType int32

// Possible SnapshotType values. The numeric values are written out
// explicitly so each name keeps exactly the value it was declared with.
const (
	SnapshotTypeManual    SnapshotType = 0 // Manual
	SnapshotTypeAuto      SnapshotType = 1 // Auto
	SnapshotTypeScheduled SnapshotType = 2 // Scheduled
)
type UserInfo ¶
// UserInfo describes a user: ID, username, password, role, name and avatar.
// Every field carries a borm tag and a short json tag with omitempty.
type UserInfo struct {
ID int64 `borm:"id" json:"i,omitempty"` // User ID
Usr string `borm:"usr" json:"u,omitempty"` // Username
Pwd string `borm:"pwd" json:"p,omitempty"` // Password, processed with PBKDF2-HMAC-SHA256 (a one-way key derivation, not reversible encryption)
Role uint32 `borm:"role" json:"r,omitempty"` // User role: regular user / administrator
Name string `borm:"name" json:"n,omitempty"` // Name
Avatar string `borm:"avatar" json:"a,omitempty"` // Avatar
}
type UserMetadataAdapter ¶
type WriteBufferConfig ¶
type WriteBufferConfig struct {
// MaxBufferSize is the maximum buffer size in bytes; exceeding it triggers an immediate write.
// Configurable via environment variable ORCAS_MAX_WRITE_BUFFER_SIZE, default 8MB.
MaxBufferSize int64
// MaxBufferWrites is the maximum number of buffered writes; exceeding it triggers an immediate write.
// Configurable via environment variable ORCAS_MAX_WRITE_BUFFER_COUNT, default 2048.
MaxBufferWrites int64
// BufferWindow is the buffer time window; multiple writes within the window are merged.
// The field is a time.Duration, while the environment variable is expressed in seconds.
// Configurable via environment variable ORCAS_WRITE_BUFFER_WINDOW_SEC, default 10 seconds.
BufferWindow time.Duration
// BatchWriteEnabled controls whether batch write optimization is enabled.
// When disabled, files are written directly as individual objects.
// Configurable via environment variable ORCAS_BATCH_WRITE_ENABLED, default true.
BatchWriteEnabled bool
// MaxBatchWriteFileSize is the maximum file size in bytes for batch write optimization.
// Files larger than this size use the direct write path.
// Based on performance tests: 1KB files benefit 155%, 1MB files degrade 9.6-65.5%.
// Optimal threshold: 64KB-128KB for best performance.
// Configurable via environment variable ORCAS_MAX_BATCH_WRITE_FILE_SIZE, default 64KB.
MaxBatchWriteFileSize int64
}
WriteBufferConfig is the random write buffer configuration
func GetWriteBufferConfig ¶
func GetWriteBufferConfig() WriteBufferConfig
GetWriteBufferConfig returns the random write buffer configuration