storage

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2024 License: Apache-2.0 Imports: 29 Imported by: 6

Documentation

Index

Constants

View Source
const (
	ExtentOpenOpt  = os.O_CREATE | os.O_RDWR | os.O_EXCL
	ExtentHasClose = -1
	SEEK_DATA      = 3
	SEEK_HOLE      = 4
)
View Source
const (
	ExtCrcHeaderFileName     = "EXTENT_CRC"
	ExtBaseExtentIDFileName  = "EXTENT_META"
	TinyDeleteFileOpt        = os.O_CREATE | os.O_RDWR | os.O_APPEND
	TinyExtDeletedFileName   = "TINYEXTENT_DELETE"
	NormalExtDeletedFileName = "NORMALEXTENT_DELETE"
	MaxExtentCount           = 20000
	TinyExtentCount          = 64
	TinyExtentStartID        = 1
	MinExtentID              = 1024
	DeleteTinyRecordSize     = 24
	UpdateCrcInterval        = 600
	RepairInterval           = 60
	RandomWriteType          = 2
	AppendWriteType          = 1
	AppendRandomWriteType    = 4

	NormalExtentDeleteRetainTime = 3600 * 4
	CacheFlushMinInterval        = 5 * time.Minute
	CacheFlushMaxInterval        = 15 * time.Minute
	ExtentReadDirHint            = "READDIR_HINT"
	ExtentReadDirHintV2          = "READDIR_HINT_V2"
	ExtentReadDirHintTemp        = "READDIR_HINT.tmp"

	StaleExtStoreBackupSuffix = ".old"
	StaleExtStoreTimeFormat   = "20060102150405.000000000"
)
View Source
const (
	BaseExtentIDOffset = 0
)
View Source
const (
	DiskSectorSize = 512
)

Sector size

View Source
const (
	ExtentMaxSize = 1024 * 1024 * 1024 * 1024 * 4 // 4TB
)

Variables

View Source
var (
	ExtentHasBeenDeletedError        = errors.New("extent has been deleted")
	ParameterMismatchError           = errors.New("parameter mismatch error")
	NoAvailableExtentError           = errors.New("no available extent")
	NoBrokenExtentError              = errors.New("no unavailable extent")
	NoSpaceError                     = errors.New("no space left on the device")
	ForbiddenDataPartitionError      = errors.New("the data partition is forbidden")
	ForbiddenMetaPartitionError      = errors.New("meta partition is forbidden")
	TryAgainError                    = errors.New("try again")
	LimitedIoError                   = errors.New("limited io error")
	CrcMismatchError                 = errors.New("packet Crc is incorrect")
	NoLeaderError                    = errors.New("no raft leader")
	ExtentNotFoundError              = errors.New("extent does not exist")
	ExtentExistsError                = errors.New("extent already exists")
	ExtentIsFullError                = errors.New("extent is full")
	BrokenExtentError                = errors.New("extent has been broken")
	BrokenDiskError                  = errors.New("disk has broken")
	ForbidWriteError                 = errors.New("single replica decommission forbid write")
	VerNotConsistentError            = errors.New("ver not consistent")
	SnapshotNeedNewExtentError       = errors.New("snapshot need new extent error")
	NoDiskReadRepairExtentTokenError = errors.New("no disk read repair extent token")
	ReachMaxExtentsCountError        = errors.New("reached max extents count")
)
View Source
var (
	RegexpExtentFile, _ = regexp.Compile(`^(\d)+$`)
	SnapShotFilePool    = &sync.Pool{New: func() interface{} {
		return new(proto.File)
	}}
)
View Source
var (
	NormalExtentFilter = func() ExtentFilter {
		now := time.Now()
		return func(ei *ExtentInfo) bool {
			return !IsTinyExtent(ei.FileID) && now.Unix()-ei.ModifyTime > RepairInterval && !ei.IsDeleted
		}
	}

	TinyExtentFilter = func(filters []uint64) ExtentFilter {
		return func(ei *ExtentInfo) bool {
			if !IsTinyExtent(ei.FileID) {
				return false
			}
			for _, filterID := range filters {
				if filterID == ei.FileID {
					return true
				}
			}
			return false
		}
	}
)

Filters

View Source
var ErrStoreAlreadyClosed = errors.New("extent store already closed")

Functions

func GetSnapShotFileFromPool

func GetSnapShotFileFromPool() (f *proto.File)

func IsAppendRandomWrite

func IsAppendRandomWrite(writeType int) bool

func IsAppendWrite

func IsAppendWrite(writeType int) bool

func IsRandomWrite

func IsRandomWrite(writeType int) bool

func IsTinyExtent

func IsTinyExtent(extentID uint64) bool

IsTinyExtent checks if the given extent is tiny extent.

func MarshalTinyExtent

func MarshalTinyExtent(extentID uint64, offset, size int64) (data []byte)

func MkdirAll

func MkdirAll(name string) (err error)

func PutSnapShotFileToPool

func PutSnapShotFileToPool(f *proto.File)

func UnMarshalTinyExtent

func UnMarshalTinyExtent(data []byte) (extentID, offset, size uint64)

Types

type BlockCrc

type BlockCrc struct {
	BlockNo int
	Crc     uint32
}

type BlockCrcArr

type BlockCrcArr []*BlockCrc

func (BlockCrcArr) Len

func (arr BlockCrcArr) Len() int

func (BlockCrcArr) Less

func (arr BlockCrcArr) Less(i, j int) bool

func (BlockCrcArr) Swap

func (arr BlockCrcArr) Swap(i, j int)

type Extent

type Extent struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Extent is an implementation of Extent for local regular extent file data management. This extent implementation manages all header info and data body in one single entry file. Header of extent include inode value of this extent block and Crc blocks of data blocks.

func NewExtentInCore

func NewExtentInCore(name string, extentID uint64) *Extent

NewExtentInCore create and returns a new extent instance.

func (*Extent) Close

func (e *Extent) Close() (err error)

Close this extent and release FD.

func (*Extent) Exist

func (e *Extent) Exist() (exsit bool)

func (*Extent) Flush

func (e *Extent) Flush() (err error)

Flush synchronizes data to the disk.

func (*Extent) GetCrc

func (e *Extent) GetCrc(blockNo int64) uint32

func (*Extent) GetDataSize

func (e *Extent) GetDataSize(statSize int64) (dataSize int64)

func (*Extent) GetFile

func (e *Extent) GetFile() *os.File

func (*Extent) GetSize

func (e *Extent) GetSize() (int64, uint64)

func (*Extent) HasClosed

func (e *Extent) HasClosed() bool

func (*Extent) InitToFS

func (e *Extent) InitToFS() (err error)

InitToFS init extent data info filesystem. If entry file exist and overwrite is true, this operation will clear all data of exist entry file and initialize extent header data.

func (*Extent) ModifyTime

func (e *Extent) ModifyTime() int64

ModifyTime returns the time when this extent was modified recently.

func (*Extent) Read

func (e *Extent) Read(data []byte, offset, size int64, isRepairRead bool) (crc uint32, err error)

Read reads data from an extent.

func (*Extent) ReadTiny

func (e *Extent) ReadTiny(data []byte, offset, size int64, isRepairRead bool) (crc uint32, err error)

ReadTiny read data from a tiny extent.

func (*Extent) RestoreFromFS

func (e *Extent) RestoreFromFS() (err error)

RestoreFromFS restores the entity data and status from the file stored on the filesystem.

func (*Extent) Size

func (e *Extent) Size() (size int64)

Size returns length of the extent (not including the header).

func (*Extent) String

func (e *Extent) String() string

func (*Extent) TinyExtentRecover

func (e *Extent) TinyExtentRecover(data []byte, offset, size int64, crc uint32, isEmptyPacket bool) (err error)

func (*Extent) Write

func (e *Extent) Write(param *WriteParam, crcFunc UpdateCrcFunc) (status uint8, err error)

Write writes data to an extent.

func (*Extent) WriteTiny

func (e *Extent) WriteTiny(param *WriteParam) (err error)

WriteTiny performs write on a tiny extent.

type ExtentCache

type ExtentCache struct {
	// contains filtered or unexported fields
}

ExtentCache is an implementation of the ExtentCache with LRU support.

func NewExtentCache

func NewExtentCache(capacity int) *ExtentCache

NewExtentCache creates and returns a new ExtentCache instance.

func (*ExtentCache) Clear

func (cache *ExtentCache) Clear()

Clear closes all the extents stored in the cache.

func (*ExtentCache) CopyAndFlush

func (cache *ExtentCache) CopyAndFlush(motifyInterval time.Duration)

func (*ExtentCache) Del

func (cache *ExtentCache) Del(extentID uint64)

Del deletes the extent stored in the cache.

func (*ExtentCache) Flush

func (cache *ExtentCache) Flush()

Flush synchronizes the extent stored in the cache to the disk.

func (*ExtentCache) Get

func (cache *ExtentCache) Get(extentID uint64) (e *Extent, ok bool)

Get gets the extent from the cache.

func (*ExtentCache) Put

func (cache *ExtentCache) Put(e *Extent)

Put puts an extent object into the cache.

func (*ExtentCache) Size

func (cache *ExtentCache) Size() int

Size returns number of extents stored in the cache.

type ExtentDeleted

type ExtentDeleted struct {
	ExtentID uint64 `json:"extentID"`
	Offset   uint64 `json:"offset"`
	Size     uint64 `json:"size"`
}

type ExtentFilter

type ExtentFilter func(info *ExtentInfo) bool

type ExtentInfo

type ExtentInfo struct {
	FileID              uint64 `json:"fileId"`
	Size                uint64 `json:"size"`
	Crc                 uint32 `json:"Crc"`
	IsDeleted           bool   `json:"deleted"`
	ModifyTime          int64  `json:"modTime"` // random write not update modify time
	AccessTime          int64  `json:"accessTime"`
	Source              string `json:"src"`
	SnapshotDataOff     uint64 `json:"snapSize"`
	SnapPreAllocDataOff uint64 `json:"snapPreAllocSize"`
	ApplyID             uint64 `json:"applyID"`
	ApplySize           int64  `json:"ApplySize"`
}

func (*ExtentInfo) MarshalBinary

func (ei *ExtentInfo) MarshalBinary() (v []byte, err error)

func (*ExtentInfo) MarshalBinaryWithBuffer

func (ei *ExtentInfo) MarshalBinaryWithBuffer(buff *bytes.Buffer) (err error)

func (*ExtentInfo) String

func (ei *ExtentInfo) String() (m string)

func (*ExtentInfo) TotalSize

func (ei *ExtentInfo) TotalSize() uint64

func (*ExtentInfo) UnmarshalBinary

func (ei *ExtentInfo) UnmarshalBinary(v []byte) (err error)

func (*ExtentInfo) UnmarshalBinaryWithBuffer

func (ei *ExtentInfo) UnmarshalBinaryWithBuffer(buff *bytes.Buffer) (err error)

func (*ExtentInfo) UpdateExtentInfo

func (ei *ExtentInfo) UpdateExtentInfo(extent *Extent, crc uint32)

type ExtentInfoArr

type ExtentInfoArr []*ExtentInfo

func (ExtentInfoArr) Len

func (arr ExtentInfoArr) Len() int

func (ExtentInfoArr) Less

func (arr ExtentInfoArr) Less(i, j int) bool

func (ExtentInfoArr) Swap

func (arr ExtentInfoArr) Swap(i, j int)

type ExtentMapItem

type ExtentMapItem struct {
	// contains filtered or unexported fields
}

ExtentMapItem stores the extent entity pointer and the element pointer of the extent entity in a cache list.

type ExtentStore

type ExtentStore struct {
	ApplyId uint64
	// contains filtered or unexported fields
}

ExtentStore defines fields used in the storage engine. Packets smaller than 128K are stored in the "tinyExtent", a place to persist the small files. packets larger than or equal to 128K are stored in the normal "extent", a place to persist large files. The difference between them is that the extentID of a tinyExtent starts at 5000000 and ends at 5000128. Multiple small files can be appended to the same tinyExtent. In addition, the deletion of small files is implemented by the punch hole from the underlying file system.

func NewExtentStore

func NewExtentStore(dataDir string, partitionID uint64, storeSize, dpType int, isCreate bool) (s *ExtentStore, err error)

func (*ExtentStore) AvailableTinyExtentCnt

func (s *ExtentStore) AvailableTinyExtentCnt() int

AvailableTinyExtentCnt returns the count of the available tiny extents.

func (*ExtentStore) BackendTask

func (s *ExtentStore) BackendTask()

func (*ExtentStore) BrokenTinyExtentCnt

func (s *ExtentStore) BrokenTinyExtentCnt() int

BrokenTinyExtentCnt returns the count of the broken tiny extents.

func (*ExtentStore) BuildSnapshotExtentCrcMetaFile

func (s *ExtentStore) BuildSnapshotExtentCrcMetaFile(blockNo int) (fp *os.File, err error)

func (*ExtentStore) CanGcDelete

func (s *ExtentStore) CanGcDelete(extId uint64) bool

func (*ExtentStore) Close

func (s *ExtentStore) Close()

Close closes the extent store.

func (*ExtentStore) Create

func (s *ExtentStore) Create(extentID uint64) (err error)

Create creates an extent.

func (*ExtentStore) DeleteBlockCrc

func (s *ExtentStore) DeleteBlockCrc(extentID uint64) (err error)

func (*ExtentStore) DeleteExtentInfo

func (s *ExtentStore) DeleteExtentInfo(id uint64)

func (*ExtentStore) DumpExtents

func (s *ExtentStore) DumpExtents() (extInfos SortedExtentInfos)

func (*ExtentStore) ExtentBatchLockNormalExtent

func (s *ExtentStore) ExtentBatchLockNormalExtent(gcLockEks *proto.GcLockExtents) (err error)

func (*ExtentStore) ExtentBatchUnlockNormalExtent

func (s *ExtentStore) ExtentBatchUnlockNormalExtent(ext []*proto.ExtentKey)

func (*ExtentStore) ExtentID

func (s *ExtentStore) ExtentID(filename string) (extentID uint64, isExtent bool)

ExtentID return the extent ID.

func (*ExtentStore) Flush

func (s *ExtentStore) Flush()

func (*ExtentStore) GetAllExtents

func (s *ExtentStore) GetAllExtents(beforeTime int64) (extents []*ExtentInfo, err error)

func (*ExtentStore) GetAllWatermarks

func (s *ExtentStore) GetAllWatermarks(filter ExtentFilter) (extents []*ExtentInfo, tinyDeleteFileSize int64, err error)

GetAllWatermarks returns all the watermarks.

func (*ExtentStore) GetAvailableTinyExtent

func (s *ExtentStore) GetAvailableTinyExtent() (extentID uint64, err error)

GetAvailableTinyExtent returns the available tiny extent from the channel.

func (*ExtentStore) GetBrokenTinyExtent

func (s *ExtentStore) GetBrokenTinyExtent() (extentID uint64, err error)

GetBrokenTinyExtent returns the first broken extent in the channel.

func (*ExtentStore) GetExtentCount

func (s *ExtentStore) GetExtentCount() (count int)

GetExtentCount returns the number of extents in the extentInfoMap

func (*ExtentStore) GetExtentCountWithoutLock

func (s *ExtentStore) GetExtentCountWithoutLock() (count int)

func (*ExtentStore) GetExtentFinfoSize

func (s *ExtentStore) GetExtentFinfoSize(extentID uint64) (size uint64, err error)

func (*ExtentStore) GetExtentInfo

func (s *ExtentStore) GetExtentInfo(id uint64) (ei *ExtentInfo, ok bool)

func (*ExtentStore) GetExtentInfoCount

func (s *ExtentStore) GetExtentInfoCount() (count int)

func (*ExtentStore) GetExtentInfoFromDisk

func (s *ExtentStore) GetExtentInfoFromDisk(id uint64) (ei *ExtentInfo, err error)

func (*ExtentStore) GetExtentSnapshotModOffset

func (s *ExtentStore) GetExtentSnapshotModOffset(extentID uint64, allocSize uint32) (watermark int64, err error)

GetTinyExtentOffset returns the offset of the given extent.

func (*ExtentStore) GetExtentWithHoleAvailableOffset

func (s *ExtentStore) GetExtentWithHoleAvailableOffset(extentID uint64, offset int64) (newOffset, newEnd int64, err error)

func (*ExtentStore) GetGcFlag

func (s *ExtentStore) GetGcFlag(extId uint64) proto.GcFlag

func (*ExtentStore) GetHasDeleteExtent

func (s *ExtentStore) GetHasDeleteExtent() (extentDes []ExtentDeleted, err error)

func (*ExtentStore) GetHasDeleteTinyRecords

func (s *ExtentStore) GetHasDeleteTinyRecords() (extentDes []ExtentDeleted, err error)

func (*ExtentStore) GetMaxExtentIDAndPartitionSize

func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID, totalSize uint64)

StoreSizeExtentID returns the size of the extent store

func (*ExtentStore) GetPersistenceBaseExtentID

func (s *ExtentStore) GetPersistenceBaseExtentID() (extentID uint64, err error)

func (*ExtentStore) GetPreAllocSpaceExtentIDOnVerifyFile

func (s *ExtentStore) GetPreAllocSpaceExtentIDOnVerifyFile() (extentID uint64)

func (*ExtentStore) GetStoreUsedSize

func (s *ExtentStore) GetStoreUsedSize() (used int64)

func (*ExtentStore) GetTinyExtentOffset

func (s *ExtentStore) GetTinyExtentOffset(extentID uint64) (watermark int64, err error)

GetTinyExtentOffset returns the offset of the given extent.

func (*ExtentStore) HasExtent

func (s *ExtentStore) HasExtent(extentID uint64) (exist bool)

HasExtent tells if the extent store has the extent with the given ID

func (*ExtentStore) IsClosed

func (s *ExtentStore) IsClosed() (closed bool)

func (*ExtentStore) IsDeletedNormalExtent

func (s *ExtentStore) IsDeletedNormalExtent(extentID uint64) (ok bool)

func (*ExtentStore) LoadExtentFromDisk

func (s *ExtentStore) LoadExtentFromDisk(extentID uint64, putCache bool) (e *Extent, err error)

func (*ExtentStore) LoadTinyDeleteFileOffset

func (s *ExtentStore) LoadTinyDeleteFileOffset() (offset int64, err error)

func (*ExtentStore) MarkDelete

func (s *ExtentStore) MarkDelete(extentID uint64, offset, size int64) (err error)

MarkDelete marks the given extent as deleted.

func (*ExtentStore) MoveAllToBrokenTinyExtentC

func (s *ExtentStore) MoveAllToBrokenTinyExtentC(cnt int)

MoveAllToBrokenTinyExtentC moves all the tiny extents to the channel stores the broken extents.

func (*ExtentStore) NextExtentID

func (s *ExtentStore) NextExtentID() (extentID uint64, err error)

NextExtentID returns the next extentID. When the client sends the request to create an extent, this function generates an unique extentID within the current partition. This function can only be called by the leader.

func (*ExtentStore) PersistenceBaseExtentID

func (s *ExtentStore) PersistenceBaseExtentID(extentID uint64) (err error)

func (*ExtentStore) PersistenceBlockCrc

func (s *ExtentStore) PersistenceBlockCrc(e *Extent, blockNo int, blockCrc uint32) (err error)

func (*ExtentStore) PersistenceHasDeleteExtent

func (s *ExtentStore) PersistenceHasDeleteExtent(extentID uint64) (err error)

func (*ExtentStore) PreAllocSpaceOnVerfiyFile

func (s *ExtentStore) PreAllocSpaceOnVerfiyFile(currExtentID uint64)

func (*ExtentStore) PreAllocSpaceOnVerfiyFileForAppend

func (s *ExtentStore) PreAllocSpaceOnVerfiyFileForAppend(idx int)

func (*ExtentStore) PutNormalExtentToDeleteCache

func (s *ExtentStore) PutNormalExtentToDeleteCache(extentID uint64)

func (*ExtentStore) RangeExtentInfo

func (s *ExtentStore) RangeExtentInfo(iter func(id uint64, ei *ExtentInfo) (ok bool, err error)) (err error)

func (*ExtentStore) Read

func (s *ExtentStore) Read(extentID uint64, offset, size int64, nbuf []byte, isRepairRead bool, isBackupRead bool) (crc uint32, err error)

Read reads the extent based on the given id.

func (*ExtentStore) ReadTinyDeleteRecords

func (s *ExtentStore) ReadTinyDeleteRecords(offset, size int64, data []byte) (crc uint32, err error)

func (*ExtentStore) RecordTinyDelete

func (s *ExtentStore) RecordTinyDelete(extentID uint64, offset, size int64) (err error)

func (*ExtentStore) ScanBlocks

func (s *ExtentStore) ScanBlocks(extentID uint64) (bcs []*BlockCrc, err error)

func (*ExtentStore) SendAllToBrokenTinyExtentC

func (s *ExtentStore) SendAllToBrokenTinyExtentC(extentIds []uint64)

SendAllToBrokenTinyExtentC sends all the extents to the channel that stores the broken extents.

func (*ExtentStore) SendToAvailableTinyExtentC

func (s *ExtentStore) SendToAvailableTinyExtentC(extentID uint64)

SendToAvailableTinyExtentC sends the extent to the channel that stores the available tiny extents.

func (*ExtentStore) SendToBrokenTinyExtentC

func (s *ExtentStore) SendToBrokenTinyExtentC(extentID uint64)

SendToBrokenTinyExtentC sends the given extent id to the channel.

func (*ExtentStore) SetExtentInfo

func (s *ExtentStore) SetExtentInfo(id uint64, ei *ExtentInfo)

func (*ExtentStore) SnapShot

func (s *ExtentStore) SnapShot() (files []*proto.File, err error)

SnapShot returns the information of all the extents on the current data partition. When the master sends the loadDataPartition request, the snapshot is used to compare the replicas.

func (*ExtentStore) StoreSizeExtentID

func (s *ExtentStore) StoreSizeExtentID(maxExtentID uint64) (totalSize uint64)

StoreSizeExtentID returns the size of the extent store

func (*ExtentStore) TinyExtentGetFinfoSize

func (s *ExtentStore) TinyExtentGetFinfoSize(extentID uint64) (size uint64, err error)

func (*ExtentStore) TinyExtentRecover

func (s *ExtentStore) TinyExtentRecover(extentID uint64, offset, size int64, data []byte, crc uint32, isEmptyPacket bool) (err error)

func (*ExtentStore) UpdateBaseExtentID

func (s *ExtentStore) UpdateBaseExtentID(id uint64) (err error)

UpdateBaseExtentID updates the base extent ID.

func (*ExtentStore) Watermark

func (s *ExtentStore) Watermark(extentID uint64) (ei *ExtentInfo, err error)

Watermark returns the extent info of the given extent on the record.

func (*ExtentStore) Write

func (s *ExtentStore) Write(param *WriteParam) (status uint8, err error)

Write writes the given extent to the disk.

type GetExtentCrcFunc

type GetExtentCrcFunc func(extentID uint64) (crc uint32, err error)

type SortedExtentInfos

type SortedExtentInfos []*ExtentInfo

SortedExtentInfos defines an array sorted by AccessTime

func (SortedExtentInfos) Len

func (extInfos SortedExtentInfos) Len() int

func (SortedExtentInfos) Less

func (extInfos SortedExtentInfos) Less(i, j int) bool

func (SortedExtentInfos) Swap

func (extInfos SortedExtentInfos) Swap(i, j int)

type UpdateCrcFunc

type UpdateCrcFunc func(e *Extent, blockNo int, crc uint32) (err error)

type WriteParam

type WriteParam struct {
	ExtentID                                uint64
	Offset                                  int64
	Size                                    int64
	Data                                    []byte
	Crc                                     uint32
	WriteType                               int
	IsSync, IsHole, IsRepair, IsBackupWrite bool
}

func (*WriteParam) String

func (wparam *WriteParam) String() (m string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL