stream

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2024 License: Apache-2.0 Imports: 28 Imported by: 12

Documentation

Index

Constants

View Source
const (
	MaxMountRetryLimit = 6
	MountRetryInterval = time.Second * 5
)
View Source
const (
	ExtentStatusOpen int32 = iota
	ExtentStatusClosed
	ExtentStatusRecovery
	ExtentStatusError
)

State machines

View Source
const (
	StreamSendMaxRetry      = 200
	StreamSendSleepInterval = 100 * time.Millisecond
	StreamSendMaxTimeout    = 10 * time.Minute
	RetryFactor             = 12 / 10
)
View Source
const (
	MaxSelectDataPartitionForWrite = 32
	MaxNewHandlerRetry             = 3
	MaxPacketErrorCount            = 128
	MaxDirtyListLen                = 0
)
View Source
const (
	StreamerNormal int32 = iota
	StreamerError
	LastEKVersionNotEqual
)

Variables

View Source
var (
	TryOtherAddrError = errors.New("TryOtherAddrError")
	DpDiscardError    = errors.New("DpDiscardError")
	LimitedIoError    = errors.New("LimitedIoError")
)
View Source
var StreamConnPool = util.NewConnectPool()

Functions

func GetExtentHandlerID

func GetExtentHandlerID() uint64

GetExtentHandlerID returns the extent handler ID.

Types

type AppendExtentKeyFunc

type AppendExtentKeyFunc func(parentInode, inode uint64, key proto.ExtentKey, discard []proto.ExtentKey) (int, error)

type CacheBcacheFunc added in v1.34.0

type CacheBcacheFunc func(key string, buf []byte) error

type DirtyExtentList

type DirtyExtentList struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DirtyExtentList defines the struct of the dirty extent list.

func NewDirtyExtentList

func NewDirtyExtentList() *DirtyExtentList

NewDirtyExtentList returns a new DirtyExtentList instance.

func (*DirtyExtentList) Get

func (dl *DirtyExtentList) Get() *list.Element

Get gets the next element in the dirty extent list.

func (*DirtyExtentList) Len

func (dl *DirtyExtentList) Len() int

Len returns the size of the dirty extent list.

func (*DirtyExtentList) Put

func (dl *DirtyExtentList) Put(eh *ExtentHandler)

Put puts a new extent handler into the dirty extent list.

func (*DirtyExtentList) Remove

func (dl *DirtyExtentList) Remove(e *list.Element)

Remove removes the element from the dirty extent list.

type EvictBacheFunc added in v1.34.0

type EvictBacheFunc func(key string) error

type EvictIcacheFunc added in v1.34.0

type EvictIcacheFunc func(inode uint64)

type EvictRequest

type EvictRequest struct {
	// contains filtered or unexported fields
}

EvictRequest defines an evict request.

type ExtentCache

type ExtentCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ExtentCache defines the struct of the extent cache.

func NewExtentCache

func NewExtentCache(inode uint64) *ExtentCache

NewExtentCache returns a new extent cache.

func (*ExtentCache) Append

func (cache *ExtentCache) Append(ek *proto.ExtentKey, sync bool) (discardExtents []proto.ExtentKey)

Append appends an extent key.

func (*ExtentCache) Get

func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)

Get returns the extent key based on the given offset.

func (*ExtentCache) GetEndForAppendWrite added in v1.34.0

func (cache *ExtentCache) GetEndForAppendWrite(offset uint64, verSeq uint64, needCheck bool) (ret *proto.ExtentKey)

GetEndForAppendWrite returns the extent key whose end offset equals the given offset.

func (*ExtentCache) List

func (cache *ExtentCache) List() []*proto.ExtentKey

List returns a list of the extents in the cache.

func (*ExtentCache) LogOutPut added in v1.34.0

func (cache *ExtentCache) LogOutPut()

func (*ExtentCache) Max

func (cache *ExtentCache) Max() *proto.ExtentKey

Max returns the max extent key in the cache.

func (*ExtentCache) PrepareReadRequests

func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest

PrepareReadRequests classifies the incoming request.

func (*ExtentCache) PrepareWriteRequests

func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest

PrepareWriteRequests TODO explain

func (*ExtentCache) Refresh

func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error

Refresh refreshes the extent cache.

func (*ExtentCache) RefreshForce added in v1.34.0

func (cache *ExtentCache) RefreshForce(inode uint64, force bool, getExtents GetExtentsFunc) error

func (*ExtentCache) RemoveDiscard added in v1.34.0

func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey)

func (*ExtentCache) SetSize

func (cache *ExtentCache) SetSize(size uint64, sync bool)

SetSize set the size of the cache.

func (*ExtentCache) Size

func (cache *ExtentCache) Size() (size int, gen uint64)

Size returns the size of the cache.

func (*ExtentCache) SplitExtentKey added in v1.34.0

func (cache *ExtentCache) SplitExtentKey(inodeID uint64, ekPivot *proto.ExtentKey) (err error)

Split extent key.

func (*ExtentCache) TruncDiscard added in v1.34.0

func (cache *ExtentCache) TruncDiscard(size uint64)

type ExtentClient

type ExtentClient struct {
	BcacheHealth bool

	LimitManager *manager.LimitManager
	// contains filtered or unexported fields
}

ExtentClient defines the struct of the extent client.

func NewExtentClient

func NewExtentClient(config *ExtentConfig) (client *ExtentClient, err error)

NewExtentClient returns a new extent client.

func (*ExtentClient) AllocatePreLoadDataPartition added in v1.34.0

func (client *ExtentClient) AllocatePreLoadDataPartition(volName string, count int, capacity, ttl uint64, zones string) (err error)

func (*ExtentClient) CheckDataPartitionExsit added in v1.34.0

func (client *ExtentClient) CheckDataPartitionExsit(partitionID uint64) error

func (*ExtentClient) Close added in v1.5.0

func (client *ExtentClient) Close() error

func (*ExtentClient) CloseStream

func (client *ExtentClient) CloseStream(inode uint64) error

Release request shall grab the lock until request is sent to the request channel

func (*ExtentClient) EvictStream

func (client *ExtentClient) EvictStream(inode uint64) error

Evict request shall grab the lock until request is sent to the request channel

func (*ExtentClient) FileSize

func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)

FileSize returns the file size.

func (*ExtentClient) Flush

func (client *ExtentClient) Flush(inode uint64) error

func (*ExtentClient) ForceRefreshExtentsCache added in v1.34.0

func (client *ExtentClient) ForceRefreshExtentsCache(inode uint64) error

func (*ExtentClient) GetDataPartitionForWrite added in v1.34.0

func (client *ExtentClient) GetDataPartitionForWrite() error

func (*ExtentClient) GetEnablePosixAcl added in v1.34.0

func (client *ExtentClient) GetEnablePosixAcl() bool

func (*ExtentClient) GetExtentCacheGen added in v1.34.0

func (client *ExtentClient) GetExtentCacheGen(inode uint64) uint64

GetExtentCacheGen return extent generation

func (*ExtentClient) GetExtents added in v1.34.0

func (client *ExtentClient) GetExtents(inode uint64) []*proto.ExtentKey

func (*ExtentClient) GetFlowInfo added in v1.34.0

func (client *ExtentClient) GetFlowInfo() (*proto.ClientReportLimitInfo, bool)

func (*ExtentClient) GetLatestVer added in v1.34.0

func (client *ExtentClient) GetLatestVer() uint64

func (*ExtentClient) GetRate added in v1.4.0

func (client *ExtentClient) GetRate() string

func (*ExtentClient) GetReadVer added in v1.34.0

func (client *ExtentClient) GetReadVer() uint64

func (*ExtentClient) GetStreamer

func (client *ExtentClient) GetStreamer(inode uint64) *Streamer

GetStreamer returns the streamer.

func (*ExtentClient) GetVerMgr added in v1.34.0

func (client *ExtentClient) GetVerMgr() *proto.VolVersionInfoList

func (*ExtentClient) GetVolumeName added in v1.34.0

func (client *ExtentClient) GetVolumeName() string

func (*ExtentClient) IsPreloadMode added in v1.34.0

func (client *ExtentClient) IsPreloadMode() bool

func (*ExtentClient) OpenStream

func (client *ExtentClient) OpenStream(inode uint64) error

Open request shall grab the lock until request is sent to the request channel

func (*ExtentClient) OpenStreamWithCache added in v1.34.0

func (client *ExtentClient) OpenStreamWithCache(inode uint64, needBCache bool) error

Open request shall grab the lock until request is sent to the request channel

func (*ExtentClient) Read

func (client *ExtentClient) Read(inode uint64, data []byte, offset int, size int) (read int, err error)

func (*ExtentClient) ReadExtent added in v1.34.0

func (client *ExtentClient) ReadExtent(inode uint64, ek *proto.ExtentKey, data []byte, offset int, size int) (read int, err error, isStream bool)

func (*ExtentClient) RefreshExtentsCache

func (client *ExtentClient) RefreshExtentsCache(inode uint64) error

RefreshExtentsCache refreshes the extent cache.

func (*ExtentClient) SetClientID added in v1.34.0

func (client *ExtentClient) SetClientID(id uint64) (err error)

func (*ExtentClient) SetFileSize

func (client *ExtentClient) SetFileSize(inode uint64, size int, sync bool)

SetFileSize set the file size.

func (*ExtentClient) SetReadRate added in v1.4.0

func (client *ExtentClient) SetReadRate(val int) string

func (*ExtentClient) SetWriteRate added in v1.4.0

func (client *ExtentClient) SetWriteRate(val int) string

func (*ExtentClient) Truncate

func (client *ExtentClient) Truncate(mw *meta.MetaWrapper, parentIno uint64, inode uint64, size int, fullPath string) error

func (*ExtentClient) UidIsLimited added in v1.34.0

func (client *ExtentClient) UidIsLimited(uid uint32) bool

func (*ExtentClient) UpdateDataPartitionForColdVolume added in v1.34.0

func (client *ExtentClient) UpdateDataPartitionForColdVolume() error

func (*ExtentClient) UpdateFlowInfo added in v1.34.0

func (client *ExtentClient) UpdateFlowInfo(limit *proto.LimitRsp2Client)

func (*ExtentClient) UpdateLatestVer added in v1.34.0

func (client *ExtentClient) UpdateLatestVer(verList *proto.VolVersionInfoList) (err error)

func (*ExtentClient) UploadFlowInfo added in v1.34.0

func (client *ExtentClient) UploadFlowInfo(clientInfo wrapper.SimpleClientInfo) (bWork bool, err error)

func (*ExtentClient) Write

func (client *ExtentClient) Write(inode uint64, offset int, data []byte, flags int, checkFunc func() error) (write int, err error)

Write writes the data.

type ExtentConfig added in v1.34.0

type ExtentConfig struct {
	Volume            string
	VolumeType        int
	Masters           []string
	FollowerRead      bool
	NearRead          bool
	Preload           bool
	ReadRate          int64
	WriteRate         int64
	BcacheEnable      bool
	BcacheDir         string
	MaxStreamerLimit  int64
	VerReadSeq        uint64
	OnAppendExtentKey AppendExtentKeyFunc
	OnSplitExtentKey  SplitExtentKeyFunc
	OnGetExtents      GetExtentsFunc
	OnTruncate        TruncateFunc
	OnEvictIcache     EvictIcacheFunc
	OnLoadBcache      LoadBcacheFunc
	OnCacheBcache     CacheBcacheFunc
	OnEvictBcache     EvictBacheFunc

	DisableMetaCache             bool
	MinWriteAbleDataPartitionCnt int
	StreamRetryTimeout           int
}

type ExtentHandler

type ExtentHandler struct {
	sync.Once
	// contains filtered or unexported fields
}

ExtentHandler defines the struct of the extent handler.

func NewExtentHandler

func NewExtentHandler(stream *Streamer, offset int, storeMode int, size int) *ExtentHandler

NewExtentHandler returns a new extent handler.

func (*ExtentHandler) String

func (eh *ExtentHandler) String() string

String returns the string format of the extent handler.

type ExtentReader

type ExtentReader struct {
	// contains filtered or unexported fields
}

ExtentReader defines the struct of the extent reader.

func NewExtentReader

func NewExtentReader(inode uint64, key *proto.ExtentKey, dp *wrapper.DataPartition, followerRead bool, retryRead bool) *ExtentReader

NewExtentReader returns a new extent reader.

func (*ExtentReader) Read

func (reader *ExtentReader) Read(req *ExtentRequest) (readBytes int, err error)

Read reads the extent request.

func (*ExtentReader) String

func (reader *ExtentReader) String() (m string)

String returns the string format of the extent reader.

type ExtentRequest

type ExtentRequest struct {
	FileOffset int
	Size       int
	Data       []byte
	ExtentKey  *proto.ExtentKey
}

ExtentRequest defines the struct for the request of read or write an extent.

func NewExtentRequest

func NewExtentRequest(offset, size int, data []byte, ek *proto.ExtentKey) *ExtentRequest

NewExtentRequest returns a new extent request.

func (*ExtentRequest) String

func (er *ExtentRequest) String() string

String returns the string format of the extent request.

type FlushRequest

type FlushRequest struct {
	// contains filtered or unexported fields
}

FlushRequest defines a flush request.

type GetExtentsFunc

type GetExtentsFunc func(inode uint64) (uint64, uint64, []proto.ExtentKey, error)

type GetReplyFunc

type GetReplyFunc func(conn *net.TCPConn) (err error, again bool)

type LoadBcacheFunc added in v1.34.0

type LoadBcacheFunc func(key string, buf []byte, offset uint64, size uint32) (int, error)

type MultiVerMgr added in v1.34.0

type MultiVerMgr struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type OpenRequest

type OpenRequest struct {
	// contains filtered or unexported fields
}

OpenRequest defines an open request.

type Packet

type Packet struct {
	proto.Packet
	// contains filtered or unexported fields
}

Packet defines a wrapper of the packet in proto.

func NewCreateExtentPacket

func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet

NewCreateExtentPacket returns a new packet to create extent.

func NewOverwriteByAppendPacket added in v1.34.0

func NewOverwriteByAppendPacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int,
	inode uint64, fileOffset int, direct bool, op uint8,
) *Packet

NewOverwritePacket returns a new overwrite packet.

func NewOverwritePacket

func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, fileOffset int) *Packet

NewOverwritePacket returns a new overwrite packet.

func NewReadPacket

func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, followerRead bool) *Packet

NewReadPacket returns a new read packet.

func NewReply

func NewReply(reqID int64, partitionID uint64, extentID uint64) *Packet

NewReply returns a new reply packet. TODO rename to NewReplyPacket?

func NewWritePacket

func NewWritePacket(inode uint64, fileOffset, storeMode int) *Packet

NewWritePacket returns a new write packet.

func NewWriteTinyDirectly added in v1.34.0

func NewWriteTinyDirectly(inode uint64, dpID uint64, offset int, dp *wrapper.DataPartition) *Packet

func (*Packet) String

func (p *Packet) String() string

String returns the string format of the packet.

type ReleaseRequest

type ReleaseRequest struct {
	// contains filtered or unexported fields
}

ReleaseRequest defines a release request.

type SplitExtentKeyFunc added in v1.34.0

type SplitExtentKeyFunc func(parentInode, inode uint64, key proto.ExtentKey) error

type StreamConn

type StreamConn struct {
	// contains filtered or unexported fields
}

StreamConn defines the struct of the stream connection.

func NewStreamConn

func NewStreamConn(dp *wrapper.DataPartition, follower bool, timeout time.Duration) (sc *StreamConn)

NewStreamConn returns a new stream connection.

func NewStreamConnByHost added in v1.34.0

func NewStreamConnByHost(host string) *StreamConn

func (*StreamConn) Send

func (sc *StreamConn) Send(retry *bool, req *Packet, getReply GetReplyFunc) (err error)

Send send the given packet over the network through the stream connection until success or the maximum number of retries is reached.

func (*StreamConn) String

func (sc *StreamConn) String() string

String returns the string format of the stream connection.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

One inode corresponds to one streamer. All the requests to the same inode will be queued. TODO rename streamer here is not a good name as it also handles overwrites, not just stream write.

func NewStreamer

func NewStreamer(client *ExtentClient, inode uint64) *Streamer

NewStreamer returns a new streamer.

func (*Streamer) GetExtentReader

func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)

GetExtentReader returns the extent reader. TODO: use memory pool

func (*Streamer) GetExtents

func (s *Streamer) GetExtents() error

TODO should we call it RefreshExtents instead?

func (*Streamer) GetExtentsForce added in v1.34.0

func (s *Streamer) GetExtentsForce() error

func (*Streamer) GetExtentsForceRefresh added in v1.34.0

func (s *Streamer) GetExtentsForceRefresh() error

func (*Streamer) GetStoreMod added in v1.34.0

func (s *Streamer) GetStoreMod(offset int, size int) (storeMode int)

func (*Streamer) IssueEvictRequest

func (s *Streamer) IssueEvictRequest() error

func (*Streamer) IssueFlushRequest

func (s *Streamer) IssueFlushRequest() error

func (*Streamer) IssueOpenRequest

func (s *Streamer) IssueOpenRequest() error

Open request shall grab the lock until request is sent to the request channel

func (*Streamer) IssueReleaseRequest

func (s *Streamer) IssueReleaseRequest() error

func (*Streamer) IssueTruncRequest

func (s *Streamer) IssueTruncRequest(size int, fullPath string) error

func (*Streamer) IssueWriteRequest

func (s *Streamer) IssueWriteRequest(offset int, data []byte, flags int, checkFunc func() error) (write int, err error)

func (*Streamer) SetParentInode added in v1.34.0

func (s *Streamer) SetParentInode(inode uint64)

func (*Streamer) String

func (s *Streamer) String() string

String returns the string format of the streamer.

type TruncRequest

type TruncRequest struct {
	// contains filtered or unexported fields
}

TruncRequest defines a truncate request.

type TruncateFunc

type TruncateFunc func(inode, size uint64, fullPath string) error

type VerUpdateRequest added in v1.34.0

type VerUpdateRequest struct {
	// contains filtered or unexported fields
}

VerUpdateRequest defines an verseq update request.

type WriteRequest

type WriteRequest struct {
	// contains filtered or unexported fields
}

WriteRequest defines a write request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL