Documentation
¶
Index ¶
- Constants
- Variables
- func GetExtentHandlerID() uint64
- type AppendExtentKeyFunc
- type CacheBcacheFunc
- type DirtyExtentList
- type EvictBacheFunc
- type EvictIcacheFunc
- type EvictRequest
- type ExtentCache
- func (cache *ExtentCache) Append(ek *proto.ExtentKey, sync bool) (discardExtents []proto.ExtentKey)
- func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)
- func (cache *ExtentCache) GetEndForAppendWrite(offset uint64, verSeq uint64, needCheck bool) (ret *proto.ExtentKey)
- func (cache *ExtentCache) List() []*proto.ExtentKey
- func (cache *ExtentCache) LogOutPut()
- func (cache *ExtentCache) Max() *proto.ExtentKey
- func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest
- func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest
- func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error
- func (cache *ExtentCache) RefreshForce(inode uint64, force bool, getExtents GetExtentsFunc) error
- func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey)
- func (cache *ExtentCache) SetSize(size uint64, sync bool)
- func (cache *ExtentCache) Size() (size int, gen uint64)
- func (cache *ExtentCache) SplitExtentKey(inodeID uint64, ekPivot *proto.ExtentKey) (err error)
- func (cache *ExtentCache) TruncDiscard(size uint64)
- type ExtentClient
- func (client *ExtentClient) AllocatePreLoadDataPartition(volName string, count int, capacity, ttl uint64, zones string) (err error)
- func (client *ExtentClient) CheckDataPartitionExsit(partitionID uint64) error
- func (client *ExtentClient) Close() error
- func (client *ExtentClient) CloseStream(inode uint64) error
- func (client *ExtentClient) EvictStream(inode uint64) error
- func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)
- func (client *ExtentClient) Flush(inode uint64) error
- func (client *ExtentClient) ForceRefreshExtentsCache(inode uint64) error
- func (client *ExtentClient) GetDataPartitionForWrite() error
- func (client *ExtentClient) GetEnablePosixAcl() bool
- func (client *ExtentClient) GetExtentCacheGen(inode uint64) uint64
- func (client *ExtentClient) GetExtents(inode uint64) []*proto.ExtentKey
- func (client *ExtentClient) GetFlowInfo() (*proto.ClientReportLimitInfo, bool)
- func (client *ExtentClient) GetLatestVer() uint64
- func (client *ExtentClient) GetRate() string
- func (client *ExtentClient) GetReadVer() uint64
- func (client *ExtentClient) GetStreamer(inode uint64) *Streamer
- func (client *ExtentClient) GetVerMgr() *proto.VolVersionInfoList
- func (client *ExtentClient) GetVolumeName() string
- func (client *ExtentClient) IsPreloadMode() bool
- func (client *ExtentClient) OpenStream(inode uint64) error
- func (client *ExtentClient) OpenStreamWithCache(inode uint64, needBCache bool) error
- func (client *ExtentClient) Read(inode uint64, data []byte, offset int, size int) (read int, err error)
- func (client *ExtentClient) ReadExtent(inode uint64, ek *proto.ExtentKey, data []byte, offset int, size int) (read int, err error, isStream bool)
- func (client *ExtentClient) RefreshExtentsCache(inode uint64) error
- func (client *ExtentClient) SetClientID(id uint64) (err error)
- func (client *ExtentClient) SetFileSize(inode uint64, size int, sync bool)
- func (client *ExtentClient) SetReadRate(val int) string
- func (client *ExtentClient) SetWriteRate(val int) string
- func (client *ExtentClient) Truncate(mw *meta.MetaWrapper, parentIno uint64, inode uint64, size int, ...) error
- func (client *ExtentClient) UidIsLimited(uid uint32) bool
- func (client *ExtentClient) UpdateDataPartitionForColdVolume() error
- func (client *ExtentClient) UpdateFlowInfo(limit *proto.LimitRsp2Client)
- func (client *ExtentClient) UpdateLatestVer(verList *proto.VolVersionInfoList) (err error)
- func (client *ExtentClient) UploadFlowInfo(clientInfo wrapper.SimpleClientInfo) (bWork bool, err error)
- func (client *ExtentClient) Write(inode uint64, offset int, data []byte, flags int, checkFunc func() error) (write int, err error)
- type ExtentConfig
- type ExtentHandler
- type ExtentReader
- type ExtentRequest
- type FlushRequest
- type GetExtentsFunc
- type GetReplyFunc
- type LoadBcacheFunc
- type MultiVerMgr
- type OpenRequest
- type Packet
- func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet
- func NewOverwriteByAppendPacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, ...) *Packet
- func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, ...) *Packet
- func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, ...) *Packet
- func NewReply(reqID int64, partitionID uint64, extentID uint64) *Packet
- func NewWritePacket(inode uint64, fileOffset, storeMode int) *Packet
- func NewWriteTinyDirectly(inode uint64, dpID uint64, offset int, dp *wrapper.DataPartition) *Packet
- type ReleaseRequest
- type SplitExtentKeyFunc
- type StreamConn
- type Streamer
- func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)
- func (s *Streamer) GetExtents() error
- func (s *Streamer) GetExtentsForce() error
- func (s *Streamer) GetExtentsForceRefresh() error
- func (s *Streamer) GetStoreMod(offset int, size int) (storeMode int)
- func (s *Streamer) IssueEvictRequest() error
- func (s *Streamer) IssueFlushRequest() error
- func (s *Streamer) IssueOpenRequest() error
- func (s *Streamer) IssueReleaseRequest() error
- func (s *Streamer) IssueTruncRequest(size int, fullPath string) error
- func (s *Streamer) IssueWriteRequest(offset int, data []byte, flags int, checkFunc func() error) (write int, err error)
- func (s *Streamer) SetParentInode(inode uint64)
- func (s *Streamer) String() string
- type TruncRequest
- type TruncateFunc
- type VerUpdateRequest
- type WriteRequest
Constants ¶
const ( MaxMountRetryLimit = 6 MountRetryInterval = time.Second * 5 )
const ( ExtentStatusOpen int32 = iota ExtentStatusClosed ExtentStatusRecovery ExtentStatusError )
State machines
const ( StreamSendMaxRetry = 200 StreamSendSleepInterval = 100 * time.Millisecond StreamSendMaxTimeout = 10 * time.Minute RetryFactor = 12 / 10 )
const ( MaxSelectDataPartitionForWrite = 32 MaxNewHandlerRetry = 3 MaxPacketErrorCount = 128 MaxDirtyListLen = 0 )
const ( StreamerNormal int32 = iota StreamerError LastEKVersionNotEqual )
Variables ¶
var ( TryOtherAddrError = errors.New("TryOtherAddrError") DpDiscardError = errors.New("DpDiscardError") LimitedIoError = errors.New("LimitedIoError") )
var StreamConnPool = util.NewConnectPool()
Functions ¶
func GetExtentHandlerID ¶
func GetExtentHandlerID() uint64
GetExtentHandlerID returns the extent handler ID.
Types ¶
type AppendExtentKeyFunc ¶
type CacheBcacheFunc ¶ added in v1.34.0
type DirtyExtentList ¶
DirtyExtentList defines the struct of the dirty extent list.
func NewDirtyExtentList ¶
func NewDirtyExtentList() *DirtyExtentList
NewDirtyExtentList returns a new DirtyExtentList instance.
func (*DirtyExtentList) Get ¶
func (dl *DirtyExtentList) Get() *list.Element
Get gets the next element in the dirty extent list.
func (*DirtyExtentList) Len ¶
func (dl *DirtyExtentList) Len() int
Len returns the size of the dirty extent list.
func (*DirtyExtentList) Put ¶
func (dl *DirtyExtentList) Put(eh *ExtentHandler)
Put puts a new extent handler into the dirty extent list.
func (*DirtyExtentList) Remove ¶
func (dl *DirtyExtentList) Remove(e *list.Element)
Remove removes the element from the dirty extent list.
type EvictBacheFunc ¶ added in v1.34.0
type EvictIcacheFunc ¶ added in v1.34.0
type EvictIcacheFunc func(inode uint64)
type EvictRequest ¶
type EvictRequest struct {
// contains filtered or unexported fields
}
EvictRequest defines an evict request.
type ExtentCache ¶
ExtentCache defines the struct of the extent cache.
func NewExtentCache ¶
func NewExtentCache(inode uint64) *ExtentCache
NewExtentCache returns a new extent cache.
func (*ExtentCache) Get ¶
func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)
Get returns the extent key based on the given offset.
func (*ExtentCache) GetEndForAppendWrite ¶ added in v1.34.0
func (cache *ExtentCache) GetEndForAppendWrite(offset uint64, verSeq uint64, needCheck bool) (ret *proto.ExtentKey)
GetEndForAppendWrite returns the extent key whose end offset equals the given offset.
func (*ExtentCache) List ¶
func (cache *ExtentCache) List() []*proto.ExtentKey
List returns a list of the extents in the cache.
func (*ExtentCache) LogOutPut ¶ added in v1.34.0
func (cache *ExtentCache) LogOutPut()
func (*ExtentCache) Max ¶
func (cache *ExtentCache) Max() *proto.ExtentKey
Max returns the max extent key in the cache.
func (*ExtentCache) PrepareReadRequests ¶
func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest
PrepareReadRequests classifies the incoming request.
func (*ExtentCache) PrepareWriteRequests ¶
func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest
PrepareWriteRequests TODO explain
func (*ExtentCache) Refresh ¶
func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error
Refresh refreshes the extent cache.
func (*ExtentCache) RefreshForce ¶ added in v1.34.0
func (cache *ExtentCache) RefreshForce(inode uint64, force bool, getExtents GetExtentsFunc) error
func (*ExtentCache) RemoveDiscard ¶ added in v1.34.0
func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey)
func (*ExtentCache) SetSize ¶
func (cache *ExtentCache) SetSize(size uint64, sync bool)
SetSize sets the size of the cache.
func (*ExtentCache) Size ¶
func (cache *ExtentCache) Size() (size int, gen uint64)
Size returns the size of the cache.
func (*ExtentCache) SplitExtentKey ¶ added in v1.34.0
func (cache *ExtentCache) SplitExtentKey(inodeID uint64, ekPivot *proto.ExtentKey) (err error)
SplitExtentKey splits the extent key.
func (*ExtentCache) TruncDiscard ¶ added in v1.34.0
func (cache *ExtentCache) TruncDiscard(size uint64)
type ExtentClient ¶
type ExtentClient struct {
BcacheHealth bool
LimitManager *manager.LimitManager
// contains filtered or unexported fields
}
ExtentClient defines the struct of the extent client.
func NewExtentClient ¶
func NewExtentClient(config *ExtentConfig) (client *ExtentClient, err error)
NewExtentClient returns a new extent client.
func (*ExtentClient) AllocatePreLoadDataPartition ¶ added in v1.34.0
func (*ExtentClient) CheckDataPartitionExsit ¶ added in v1.34.0
func (client *ExtentClient) CheckDataPartitionExsit(partitionID uint64) error
func (*ExtentClient) Close ¶ added in v1.5.0
func (client *ExtentClient) Close() error
func (*ExtentClient) CloseStream ¶
func (client *ExtentClient) CloseStream(inode uint64) error
The release request shall hold the lock until the request is sent to the request channel.
func (*ExtentClient) EvictStream ¶
func (client *ExtentClient) EvictStream(inode uint64) error
The evict request shall hold the lock until the request is sent to the request channel.
func (*ExtentClient) FileSize ¶
func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)
FileSize returns the file size.
func (*ExtentClient) Flush ¶
func (client *ExtentClient) Flush(inode uint64) error
func (*ExtentClient) ForceRefreshExtentsCache ¶ added in v1.34.0
func (client *ExtentClient) ForceRefreshExtentsCache(inode uint64) error
func (*ExtentClient) GetDataPartitionForWrite ¶ added in v1.34.0
func (client *ExtentClient) GetDataPartitionForWrite() error
func (*ExtentClient) GetEnablePosixAcl ¶ added in v1.34.0
func (client *ExtentClient) GetEnablePosixAcl() bool
func (*ExtentClient) GetExtentCacheGen ¶ added in v1.34.0
func (client *ExtentClient) GetExtentCacheGen(inode uint64) uint64
GetExtentCacheGen returns the extent cache generation.
func (*ExtentClient) GetExtents ¶ added in v1.34.0
func (client *ExtentClient) GetExtents(inode uint64) []*proto.ExtentKey
func (*ExtentClient) GetFlowInfo ¶ added in v1.34.0
func (client *ExtentClient) GetFlowInfo() (*proto.ClientReportLimitInfo, bool)
func (*ExtentClient) GetLatestVer ¶ added in v1.34.0
func (client *ExtentClient) GetLatestVer() uint64
func (*ExtentClient) GetRate ¶ added in v1.4.0
func (client *ExtentClient) GetRate() string
func (*ExtentClient) GetReadVer ¶ added in v1.34.0
func (client *ExtentClient) GetReadVer() uint64
func (*ExtentClient) GetStreamer ¶
func (client *ExtentClient) GetStreamer(inode uint64) *Streamer
GetStreamer returns the streamer.
func (*ExtentClient) GetVerMgr ¶ added in v1.34.0
func (client *ExtentClient) GetVerMgr() *proto.VolVersionInfoList
func (*ExtentClient) GetVolumeName ¶ added in v1.34.0
func (client *ExtentClient) GetVolumeName() string
func (*ExtentClient) IsPreloadMode ¶ added in v1.34.0
func (client *ExtentClient) IsPreloadMode() bool
func (*ExtentClient) OpenStream ¶
func (client *ExtentClient) OpenStream(inode uint64) error
The open request shall hold the lock until the request is sent to the request channel.
func (*ExtentClient) OpenStreamWithCache ¶ added in v1.34.0
func (client *ExtentClient) OpenStreamWithCache(inode uint64, needBCache bool) error
The open request shall hold the lock until the request is sent to the request channel.
func (*ExtentClient) ReadExtent ¶ added in v1.34.0
func (*ExtentClient) RefreshExtentsCache ¶
func (client *ExtentClient) RefreshExtentsCache(inode uint64) error
RefreshExtentsCache refreshes the extent cache.
func (*ExtentClient) SetClientID ¶ added in v1.34.0
func (client *ExtentClient) SetClientID(id uint64) (err error)
func (*ExtentClient) SetFileSize ¶
func (client *ExtentClient) SetFileSize(inode uint64, size int, sync bool)
SetFileSize sets the file size.
func (*ExtentClient) SetReadRate ¶ added in v1.4.0
func (client *ExtentClient) SetReadRate(val int) string
func (*ExtentClient) SetWriteRate ¶ added in v1.4.0
func (client *ExtentClient) SetWriteRate(val int) string
func (*ExtentClient) Truncate ¶
func (client *ExtentClient) Truncate(mw *meta.MetaWrapper, parentIno uint64, inode uint64, size int, fullPath string) error
func (*ExtentClient) UidIsLimited ¶ added in v1.34.0
func (client *ExtentClient) UidIsLimited(uid uint32) bool
func (*ExtentClient) UpdateDataPartitionForColdVolume ¶ added in v1.34.0
func (client *ExtentClient) UpdateDataPartitionForColdVolume() error
func (*ExtentClient) UpdateFlowInfo ¶ added in v1.34.0
func (client *ExtentClient) UpdateFlowInfo(limit *proto.LimitRsp2Client)
func (*ExtentClient) UpdateLatestVer ¶ added in v1.34.0
func (client *ExtentClient) UpdateLatestVer(verList *proto.VolVersionInfoList) (err error)
func (*ExtentClient) UploadFlowInfo ¶ added in v1.34.0
func (client *ExtentClient) UploadFlowInfo(clientInfo wrapper.SimpleClientInfo) (bWork bool, err error)
type ExtentConfig ¶ added in v1.34.0
type ExtentConfig struct {
Volume string
VolumeType int
Masters []string
FollowerRead bool
NearRead bool
Preload bool
ReadRate int64
WriteRate int64
BcacheEnable bool
BcacheDir string
MaxStreamerLimit int64
VerReadSeq uint64
OnAppendExtentKey AppendExtentKeyFunc
OnSplitExtentKey SplitExtentKeyFunc
OnGetExtents GetExtentsFunc
OnTruncate TruncateFunc
OnEvictIcache EvictIcacheFunc
OnLoadBcache LoadBcacheFunc
OnCacheBcache CacheBcacheFunc
OnEvictBcache EvictBacheFunc
DisableMetaCache bool
MinWriteAbleDataPartitionCnt int
StreamRetryTimeout int
}
type ExtentHandler ¶
ExtentHandler defines the struct of the extent handler.
func NewExtentHandler ¶
func NewExtentHandler(stream *Streamer, offset int, storeMode int, size int) *ExtentHandler
NewExtentHandler returns a new extent handler.
func (*ExtentHandler) String ¶
func (eh *ExtentHandler) String() string
String returns the string format of the extent handler.
type ExtentReader ¶
type ExtentReader struct {
// contains filtered or unexported fields
}
ExtentReader defines the struct of the extent reader.
func NewExtentReader ¶
func NewExtentReader(inode uint64, key *proto.ExtentKey, dp *wrapper.DataPartition, followerRead bool, retryRead bool) *ExtentReader
NewExtentReader returns a new extent reader.
func (*ExtentReader) Read ¶
func (reader *ExtentReader) Read(req *ExtentRequest) (readBytes int, err error)
Read reads the extent request.
func (*ExtentReader) String ¶
func (reader *ExtentReader) String() (m string)
String returns the string format of the extent reader.
type ExtentRequest ¶
ExtentRequest defines the struct for the request of read or write an extent.
func NewExtentRequest ¶
func NewExtentRequest(offset, size int, data []byte, ek *proto.ExtentKey) *ExtentRequest
NewExtentRequest returns a new extent request.
func (*ExtentRequest) String ¶
func (er *ExtentRequest) String() string
String returns the string format of the extent request.
type FlushRequest ¶
type FlushRequest struct {
// contains filtered or unexported fields
}
FlushRequest defines a flush request.
type GetExtentsFunc ¶
type LoadBcacheFunc ¶ added in v1.34.0
type MultiVerMgr ¶ added in v1.34.0
type OpenRequest ¶
type OpenRequest struct {
// contains filtered or unexported fields
}
OpenRequest defines an open request.
type Packet ¶
Packet defines a wrapper of the packet in proto.
func NewCreateExtentPacket ¶
func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet
NewCreateExtentPacket returns a new packet to create extent.
func NewOverwriteByAppendPacket ¶ added in v1.34.0
func NewOverwriteByAppendPacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, fileOffset int, direct bool, op uint8, ) *Packet
NewOverwriteByAppendPacket returns a new overwrite-by-append packet.
func NewOverwritePacket ¶
func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, fileOffset int) *Packet
NewOverwritePacket returns a new overwrite packet.
func NewReadPacket ¶
func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, followerRead bool) *Packet
NewReadPacket returns a new read packet.
func NewWritePacket ¶
NewWritePacket returns a new write packet.
func NewWriteTinyDirectly ¶ added in v1.34.0
type ReleaseRequest ¶
type ReleaseRequest struct {
// contains filtered or unexported fields
}
ReleaseRequest defines a release request.
type SplitExtentKeyFunc ¶ added in v1.34.0
type StreamConn ¶
type StreamConn struct {
// contains filtered or unexported fields
}
StreamConn defines the struct of the stream connection.
func NewStreamConn ¶
func NewStreamConn(dp *wrapper.DataPartition, follower bool, timeout time.Duration) (sc *StreamConn)
NewStreamConn returns a new stream connection.
func NewStreamConnByHost ¶ added in v1.34.0
func NewStreamConnByHost(host string) *StreamConn
func (*StreamConn) Send ¶
func (sc *StreamConn) Send(retry *bool, req *Packet, getReply GetReplyFunc) (err error)
Send sends the given packet over the network through the stream connection until it succeeds or the maximum number of retries is reached.
func (*StreamConn) String ¶
func (sc *StreamConn) String() string
String returns the string format of the stream connection.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
One inode corresponds to one streamer. All the requests to the same inode will be queued. TODO: rename; "streamer" is not a good name here, as it also handles overwrites, not just stream writes.
func NewStreamer ¶
func NewStreamer(client *ExtentClient, inode uint64) *Streamer
NewStreamer returns a new streamer.
func (*Streamer) GetExtentReader ¶
func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)
GetExtentReader returns the extent reader. TODO: use memory pool
func (*Streamer) GetExtents ¶
TODO should we call it RefreshExtents instead?
func (*Streamer) GetExtentsForce ¶ added in v1.34.0
func (*Streamer) GetExtentsForceRefresh ¶ added in v1.34.0
func (*Streamer) GetStoreMod ¶ added in v1.34.0
func (*Streamer) IssueEvictRequest ¶
func (*Streamer) IssueFlushRequest ¶
func (*Streamer) IssueOpenRequest ¶
The open request shall hold the lock until the request is sent to the request channel.
func (*Streamer) IssueReleaseRequest ¶
func (*Streamer) IssueTruncRequest ¶
func (*Streamer) IssueWriteRequest ¶
func (*Streamer) SetParentInode ¶ added in v1.34.0
type TruncRequest ¶
type TruncRequest struct {
// contains filtered or unexported fields
}
TruncRequest defines a truncate request.
type TruncateFunc ¶
type VerUpdateRequest ¶ added in v1.34.0
type VerUpdateRequest struct {
// contains filtered or unexported fields
}
VerUpdateRequest defines a verseq update request.
type WriteRequest ¶
type WriteRequest struct {
// contains filtered or unexported fields
}
WriteRequest defines a write request.