Documentation
      ¶
    
    
  
    
  
    Index ¶
- Constants
 - Variables
 - func EnsureStatementProfiler(ctx context.Context, from context.Context) context.Context
 - func Get[T any](fs FileService, name string) (res T, err error)
 - func HandleRemoteRead(ctx context.Context, fs FileService, req *pb.Request, resp *pb.CacheResponse) error
 - func JoinPath(serviceName string, path string) string
 - func NewFileWithChecksumOSFile(ctx context.Context, underlying *os.File, blockContentSize int, ...) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])
 - func NewLRUCache(capacity int64, checkOverlaps bool, callbacks *CacheCallbacks) *lrucache.LRU[CacheKey, CacheData]
 - func NewStatementProfiler(ctx context.Context) (newCtx context.Context, end func(fileSuffixFunc func() string))
 - func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)
 - func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)
 - func StatementProfileNewSpan(ctx context.Context) (_ context.Context, end func())
 - type AliyunSDK
 - func (a *AliyunSDK) Delete(ctx context.Context, keys ...string) (err error)
 - func (a *AliyunSDK) Exists(ctx context.Context, key string) (bool, error)
 - func (a *AliyunSDK) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
 - func (a *AliyunSDK) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
 - func (a *AliyunSDK) Stat(ctx context.Context, key string) (size int64, err error)
 - func (a *AliyunSDK) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
 
- type AwsSDKv1
 - func (a *AwsSDKv1) Delete(ctx context.Context, keys ...string) (err error)
 - func (a *AwsSDKv1) Exists(ctx context.Context, key string) (bool, error)
 - func (a *AwsSDKv1) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
 - func (a *AwsSDKv1) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
 - func (a *AwsSDKv1) Stat(ctx context.Context, key string) (size int64, err error)
 - func (a *AwsSDKv1) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
 
- type AwsSDKv2
 - func (a *AwsSDKv2) Delete(ctx context.Context, keys ...string) (err error)
 - func (a *AwsSDKv2) Exists(ctx context.Context, key string) (bool, error)
 - func (a *AwsSDKv2) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
 - func (a *AwsSDKv2) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
 - func (a *AwsSDKv2) Stat(ctx context.Context, key string) (size int64, err error)
 - func (a *AwsSDKv2) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
 
- type Bytes
 - type CacheCallbackFunc
 - type CacheCallbacks
 - type CacheConfig
 - type CacheData
 - type CacheDataAllocator
 - type CacheKey
 - type CachingFileService
 - type Config
 - type DataCache
 - type DirEntry
 - type DiskCache
 - func (d *DiskCache) DeletePaths(ctx context.Context, paths []string) error
 - func (d *DiskCache) Flush()
 - func (d *DiskCache) Read(ctx context.Context, vector *IOVector) (err error)
 - func (d *DiskCache) SetFile(ctx context.Context, path string, ...) error
 - func (d *DiskCache) Update(ctx context.Context, vector *IOVector, async bool) (err error)
 
- type DiskCacheCallbacks
 - type ETLFileService
 - type FileCache
 - type FileLike
 - type FileService
 - type FileServices
 - func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error
 - func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)
 - func (f *FileServices) Name() string
 - func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error
 - func (f *FileServices) Read(ctx context.Context, vector *IOVector) error
 - func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error
 - func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
 - func (f *FileServices) Write(ctx context.Context, vector IOVector) error
 
- type FileWithChecksum
 - func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)
 - func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)
 - func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)
 - func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)
 - func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)
 
- type IOEntry
 - type IOLockKey
 - type IOLocks
 - type IOVector
 - type IOVectorCache
 - type KeyRouter
 - type KeyRouterFactory
 - type LocalETLFS
 - func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error
 - func (l *LocalETLFS) ETLCompatible()
 - func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)
 - func (l *LocalETLFS) Name() string
 - func (l *LocalETLFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)
 - func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error
 - func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error
 - func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error
 - func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
 - func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error
 
- type LocalETLFSMutator
 - type LocalFS
 - func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error
 - func (l *LocalFS) FlushCache()
 - func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)
 - func (l *LocalFS) Name() string
 - func (l *LocalFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)
 - func (l *LocalFS) PrefetchFile(ctx context.Context, filePath string) error
 - func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)
 - func (l *LocalFS) ReadCache(ctx context.Context, vector *IOVector) (err error)
 - func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error
 - func (l *LocalFS) SetAsyncUpdate(b bool)
 - func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
 - func (l *LocalFS) Write(ctx context.Context, vector IOVector) error
 
- type LocalFSMutator
 - type MemCache
 - type MemoryFS
 - func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error
 - func (m *MemoryFS) ETLCompatible()
 - func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)
 - func (m *MemoryFS) Name() string
 - func (m *MemoryFS) PrefetchFile(ctx context.Context, filePath string) error
 - func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) (err error)
 - func (m *MemoryFS) ReadCache(ctx context.Context, vector *IOVector) (err error)
 - func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error
 - func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
 - func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error
 
- type MinioSDK
 - func (a *MinioSDK) Delete(ctx context.Context, keys ...string) (err error)
 - func (a *MinioSDK) Exists(ctx context.Context, key string) (bool, error)
 - func (a *MinioSDK) List(ctx context.Context, prefix string, fn func(bool, string, int64) (bool, error)) error
 - func (a *MinioSDK) Read(ctx context.Context, key string, min *int64, max *int64) (r io.ReadCloser, err error)
 - func (a *MinioSDK) Stat(ctx context.Context, key string) (size int64, err error)
 - func (a *MinioSDK) Write(ctx context.Context, key string, r io.Reader, size int64, expire *time.Time) (err error)
 
- type MutableFileService
 - type Mutator
 - type NewFileServicesFunc
 - type ObjectStorage
 - type ObjectStorageArguments
 - type OnDiskCacheEvictFunc
 - type OnDiskCacheWrittenFunc
 - type Path
 - type Policy
 - type Pool
 - type ProfileHandler
 - type ProfileInfo
 - type ProfileSpan
 - type PutBack
 - type RCBytes
 - type RCPool
 - type RCPoolItem
 - type RemoteCache
 - type ReplaceableFileService
 - type S3FS
 - func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error
 - func (*S3FS) ETLCompatible()
 - func (s *S3FS) FlushCache()
 - func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)
 - func (s *S3FS) Name() string
 - func (s *S3FS) PrefetchFile(ctx context.Context, filePath string) error
 - func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)
 - func (s *S3FS) ReadCache(ctx context.Context, vector *IOVector) (err error)
 - func (s *S3FS) SetAsyncUpdate(b bool)
 - func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)
 - func (s *S3FS) Write(ctx context.Context, vector IOVector) error
 
- type SpanProfiler
 - type TargetCacheKeys
 
Constants ¶
const ( SkipMemoryCacheReads = 1 << iota SkipMemoryCacheWrites SkipDiskCacheReads SkipDiskCacheWrites SkipFullFilePreloads )
const ( SkipCacheReads = SkipMemoryCacheReads | SkipDiskCacheReads SkipCacheWrites = SkipMemoryCacheWrites | SkipDiskCacheWrites SkipDiskCache = SkipDiskCacheReads | SkipDiskCacheWrites SkipMemoryCache = SkipMemoryCacheReads | SkipMemoryCacheWrites SkipAllCache = SkipDiskCache | SkipMemoryCache )
const DisableCacheCapacity = 1
    const ServiceNameSeparator = ":"
    Variables ¶
var CtxKeyDiskCacheCallbacks ctxKeyDiskCacheCallbacks
    var CtxKeyStatementProfiler ctxKeyStatementProfiler
    var DefaultCacheDataAllocator = new(bytesAllocator)
    var DefaultCacheDataAllocator = RCBytesPool
var DisabledCacheConfig = CacheConfig{ MemoryCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity), DiskCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity), }
var ErrNotCacheFile = errorStr("not a cache file")
    var FSProfileHandler = NewProfileHandler()
    var PerStatementProfileDir = os.Getenv("PER_STMT_PROFILE_DIR")
    var PerStatementProfileThreshold = func() time.Duration { str := os.Getenv("PER_STMT_PROFILE_THRESHOLD_MSEC") if str == "" { return time.Millisecond * 500 } n, err := strconv.Atoi(str) if err != nil { panic(err) } return time.Millisecond * time.Duration(n) }()
var RCBytesPool = func() *rcBytesPool { ret := &rcBytesPool{} for size := rcBytesPoolMinCap; size <= rcBytesPoolMaxCap; size *= 2 { size := size ret.sizes = append(ret.sizes, size) ret.pools = append(ret.pools, NewRCPool(func() []byte { return make([]byte, size) })) } return ret }()
RCBytesPool is the global RCBytes pool
Functions ¶
func EnsureStatementProfiler ¶ added in v1.0.0
EnsureStatementProfiler ensures that a statement profiler is set in ctx; if none is set, it copies one from the other context (from).
func HandleRemoteRead ¶ added in v1.0.0
func HandleRemoteRead( ctx context.Context, fs FileService, req *pb.Request, resp *pb.CacheResponse, ) error
func NewFileWithChecksumOSFile ¶ added in v0.8.0
func NewFileWithChecksumOSFile( ctx context.Context, underlying *os.File, blockContentSize int, perfCounterSets []*perfcounter.CounterSet, ) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])
func NewLRUCache ¶ added in v1.0.0
func NewStatementProfiler ¶ added in v1.0.0
func OnDiskCacheEvict ¶ added in v1.0.0
func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)
func OnDiskCacheWritten ¶ added in v1.0.0
func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)
Types ¶
type AliyunSDK ¶ added in v1.0.0
type AliyunSDK struct {
	// contains filtered or unexported fields
}
    func NewAliyunSDK ¶ added in v1.0.0
func NewAliyunSDK( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (_ *AliyunSDK, err error)
type AwsSDKv1 ¶ added in v1.0.0
type AwsSDKv1 struct {
	// contains filtered or unexported fields
}
    func NewAwsSDKv1 ¶ added in v1.0.0
func NewAwsSDKv1( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*AwsSDKv1, error)
type AwsSDKv2 ¶ added in v1.0.0
type AwsSDKv2 struct {
	// contains filtered or unexported fields
}
    func NewAwsSDKv2 ¶ added in v1.0.0
func NewAwsSDKv2( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*AwsSDKv2, error)
type CacheCallbackFunc ¶ added in v1.0.0
type CacheCallbacks ¶ added in v1.0.0
type CacheCallbacks struct {
	PostGet   []CacheCallbackFunc
	PostSet   []CacheCallbackFunc
	PostEvict []CacheCallbackFunc
}
    type CacheConfig ¶ added in v0.6.0
type CacheConfig struct {
	MemoryCapacity       *toml.ByteSize `toml:"memory-capacity" user_setting:"advanced"`
	DiskPath             *string        `toml:"disk-path"`
	DiskCapacity         *toml.ByteSize `toml:"disk-capacity"`
	DiskMinEvictInterval *toml.Duration `toml:"disk-min-evict-interval"`
	DiskEvictTarget      *float64       `toml:"disk-evict-target"`
	RemoteCacheEnabled   bool           `toml:"remote-cache-enabled"`
	RPC                  morpc.Config   `toml:"rpc"`
	CacheClient      client.CacheClient `json:"-"`
	KeyRouterFactory KeyRouterFactory   `json:"-"`
	KeyRouter        KeyRouter          `json:"-"`
	InitKeyRouter    *sync.Once         `json:"-"`
	CacheCallbacks   `json:"-"`
	// contains filtered or unexported fields
}
    func (*CacheConfig) SetRemoteCacheCallback ¶ added in v1.0.0
func (c *CacheConfig) SetRemoteCacheCallback()
type CacheData ¶ added in v1.0.0
func CacheOriginalData ¶ added in v1.0.0
type CacheDataAllocator ¶ added in v1.0.0
type CachingFileService ¶ added in v0.6.0
type CachingFileService interface {
	FileService
	// FlushCache flushes cache
	FlushCache()
	// SetAsyncUpdate sets cache update operation to async mode
	SetAsyncUpdate(bool)
}
    CachingFileService is an extension to the FileService
type Config ¶ added in v0.6.0
type Config struct {
	// Name name of fileservice, describe what an instance of fileservice is used for
	Name string `toml:"name"`
	// Backend fileservice backend. [MEM|DISK|DISK-ETL|S3|MINIO]
	Backend string `toml:"backend"`
	// S3 used to create fileservice using s3 as the backend
	S3 ObjectStorageArguments `toml:"s3"`
	// Cache specifies configs for cache
	Cache CacheConfig `toml:"cache"`
	// DataDir used to create fileservice using DISK as the backend
	DataDir string `toml:"data-dir"`
	// FixMissing indicates that the file service should try its best to fix missing files
	FixMissing bool `toml:"fix-missing"`
}
    Config fileService config
type DataCache ¶ added in v1.0.0
type DataCache interface {
	Set(ctx context.Context, key CacheKey, value CacheData)
	Get(ctx context.Context, key CacheKey) (value CacheData, ok bool)
	//TODO file contents may change, so we still need this s.
	DeletePaths(ctx context.Context, paths []string)
	Flush()
	Capacity() int64
	Used() int64
	Available() int64
}
    DataCache caches IOEntry.CachedData
type DiskCache ¶ added in v0.7.0
type DiskCache struct {
	// contains filtered or unexported fields
}
    func NewDiskCache ¶ added in v0.7.0
func (*DiskCache) DeletePaths ¶ added in v1.1.0
type DiskCacheCallbacks ¶ added in v1.0.0
type DiskCacheCallbacks struct {
	OnWritten []OnDiskCacheWrittenFunc
	OnEvict   []OnDiskCacheEvictFunc
}
    type ETLFileService ¶ added in v0.6.0
type ETLFileService interface {
	FileService
	// ETLCompatible marks the implementation to be compatible to ETL operations
	// implementations must save file contents as-is
	ETLCompatible()
}
    ETLFileService is an extension to the FileService
func GetForETL ¶ added in v0.6.0
func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileService, readPath string, err error)
GetForETL gets or creates a FileService instance for ETL operations.
If the service part of path is empty, a LocalETLFS will be created.
If the service part of path is not empty, an ETLFileService typed instance will be extracted from the fs argument.
If the service part of path carries arguments, a FileService instance will be created dynamically with those arguments.
Supported dynamic file services:
s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
s3-no-key,<endpoint>,<region>,<bucket>,<prefix>
minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>
Key-value pairs can be in any order.
type FileService ¶
type FileService interface {
	// Name is file service's name
	// service name is case-insensitive
	Name() string
	// Write writes a new file
	// returns ErrFileExisted if file already existed
	// returns ErrSizeNotMatch if provided size does not match data
	// entries in vector should be written atomically. if write failed, following reads must not succeed.
	Write(ctx context.Context, vector IOVector) error
	// Read reads a file to fill IOEntries
	// returns ErrFileNotFound if requested file not found
	// returns ErrUnexpectedEOF if less data is read than requested size
	// returns ErrEmptyRange if no data at specified offset and size
	// returns ErrEmptyVector if no IOEntry is passed
	Read(ctx context.Context, vector *IOVector) error
	// ReadCache reads cached data if any
	// if cache hit, IOEntry.CachedData will be set
	ReadCache(ctx context.Context, vector *IOVector) error
	// List lists sub-entries in a dir
	List(ctx context.Context, dirPath string) ([]DirEntry, error)
	// Delete deletes multiple files
	// returns ErrFileNotFound if requested file not found
	Delete(ctx context.Context, filePaths ...string) error
	// StatFile returns information about a file
	// returns ErrFileNotFound if requested file not found
	StatFile(ctx context.Context, filePath string) (*DirEntry, error)
	// PrefetchFile prefetches a file
	PrefetchFile(ctx context.Context, filePath string) error
}
    FileService is a write-once file system
func GetForBackup ¶ added in v1.0.0
func GetForBackup(ctx context.Context, spec string) (res FileService, err error)
GetForBackup creates a FileService instance for backup operations.
If the service part of path is empty, a LocalFS will be created.
If the service part of path carries arguments, a FileService instance will be created dynamically with those arguments.
Supported dynamic file service:
s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>,is-minio=<is-minio>
func NewFileService ¶ added in v0.6.0
func NewFileService( ctx context.Context, cfg Config, perfCounterSets []*perfcounter.CounterSet, ) (FileService, error)
NewFileService creates a file service from config
func SubPath ¶ added in v0.8.0
func SubPath(upstream FileService, path string) FileService
SubPath returns a FileService instance that operates at specified sub path of the upstream instance
type FileServices ¶ added in v0.6.0
type FileServices struct {
	// contains filtered or unexported fields
}
    func NewFileServices ¶ added in v0.6.0
func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)
func (*FileServices) Delete ¶ added in v0.6.0
func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error
func (*FileServices) Name ¶ added in v0.6.0
func (f *FileServices) Name() string
func (*FileServices) PrefetchFile ¶ added in v1.0.1
func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error
func (*FileServices) Read ¶ added in v0.6.0
func (f *FileServices) Read(ctx context.Context, vector *IOVector) error
func (*FileServices) ReadCache ¶ added in v1.0.0
func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error
type FileWithChecksum ¶ added in v0.6.0
type FileWithChecksum[T FileLike] struct { // contains filtered or unexported fields }
FileWithChecksum maps file contents to blocks with checksum
func NewFileWithChecksum ¶ added in v0.6.0
func NewFileWithChecksum[T FileLike]( ctx context.Context, underlying T, blockContentSize int, perfCounterSets []*perfcounter.CounterSet, ) *FileWithChecksum[T]
func (*FileWithChecksum[T]) Read ¶ added in v0.6.0
func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)
func (*FileWithChecksum[T]) ReadAt ¶ added in v0.6.0
func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)
func (*FileWithChecksum[T]) Seek ¶ added in v0.6.0
func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)
type IOEntry ¶
type IOEntry struct {
	// offset in file
	// when writing or mutating, offset can be arbitrary value, gaps between provided data are zero-filled
	// when reading, valid offsets are in range [0, len(file) - 1]
	Offset int64
	// number of bytes to read or write, [1, len(file)]
	// when reading, pass -1 to read to the end of file
	Size int64
	// raw content
	// when reading, if len(Data) < Size, a new Size-lengthed byte slice will be allocated
	Data []byte
	// when reading, if Writer is not nil, write data to it instead of setting Data field
	WriterForRead io.Writer
	// when reading, if ReadCloser is not nil, set an io.ReadCloser instead of setting Data field
	ReadCloserForRead *io.ReadCloser
	// when writing, if Reader is not nil, read data from it instead of reading Data field
	// number of bytes to be read is specified by Size field
	// if number of bytes is unknown, set Size field to -1
	ReaderForWrite io.Reader
	// When reading, if the ToCacheData field is not nil, the returning object's byte slice will be set to this field
	// Data, WriterForRead, ReadCloserForRead may be empty if CachedData is not null
	// if ToCacheData is provided, caller should always read CachedData instead of Data, WriterForRead or ReadCloserForRead
	CachedData CacheData
	// ToCacheData constructs an object byte slice from entry contents
	// reader or data must not be retained after returns
	// reader always contains entry contents
	// data may contain entry contents if available
	// if data is empty, the io.Reader must be fully read before returning nil error
	ToCacheData func(reader io.Reader, data []byte, allocator CacheDataAllocator) (cacheData CacheData, err error)
	// contains filtered or unexported fields
}
    type IOVector ¶
type IOVector struct {
	// FilePath indicates where to find the file
	// a path has two parts, service name and file name, separated by ':'
	// service name is optional, if omitted, the receiver FileService will use the default name of the service
	// file name parts are separated by '/'
	// valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( )
	// and all printable non-ASCII characters
	// example:
	// s3:a/b/c and S3:a/b/c represent the same file 'a/b/c' located in the 'S3' service
	FilePath string
	// io entries
	// empty Entries is not allowed
	// when writing, overlapping Entries is not allowed
	Entries []IOEntry
	// ExpireAt specifies the expire time of the file
	// implementations may or may not delete the file after this time
	// zero value means no expire
	ExpireAt time.Time
	// Policy controls policy for the vector
	Policy Policy
	// Caches indicates extra caches to operate on
	Caches []IOVectorCache
}
    type IOVectorCache ¶ added in v0.8.0
type IOVectorCache interface {
	Read(
		ctx context.Context,
		vector *IOVector,
	) error
	Update(
		ctx context.Context,
		vector *IOVector,
		async bool,
	) error
	Flush()
	//TODO file contents may change, so we still need this s.
	DeletePaths(
		ctx context.Context,
		paths []string,
	) error
}
    IOVectorCache caches IOVector
type KeyRouter ¶ added in v1.0.0
type KeyRouter interface {
	// Target returns the remote cache server service address of
	// the cache key. If the cache does not exist in any node, it
	// returns empty string.
	Target(k CacheKey) string
	// AddItem pushes a cache key item into a queue with a local
	// cache server service address in the item. Gossip will take
	// all the items and send them to other nodes in gossip cluster.
	AddItem(key CacheKey, operation gpb.Operation)
}
    KeyRouter is an interface that manages the remote cache information.
type KeyRouterFactory ¶ added in v1.0.0
type KeyRouterFactory func() KeyRouter
type LocalETLFS ¶ added in v0.6.0
LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations
func NewLocalETLFS ¶ added in v0.6.0
func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)
func (*LocalETLFS) Delete ¶ added in v0.6.0
func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error
func (*LocalETLFS) ETLCompatible ¶ added in v0.6.0
func (l *LocalETLFS) ETLCompatible()
func (*LocalETLFS) Name ¶ added in v0.6.0
func (l *LocalETLFS) Name() string
func (*LocalETLFS) NewMutator ¶ added in v0.6.0
func (*LocalETLFS) PrefetchFile ¶ added in v1.0.1
func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error
func (*LocalETLFS) Read ¶ added in v0.6.0
func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error
func (*LocalETLFS) ReadCache ¶ added in v1.0.0
func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error
type LocalETLFSMutator ¶ added in v0.6.0
type LocalETLFSMutator struct {
	// contains filtered or unexported fields
}
    func (*LocalETLFSMutator) Append ¶ added in v0.6.0
func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error
func (*LocalETLFSMutator) Close ¶ added in v0.6.0
func (l *LocalETLFSMutator) Close() error
type LocalFS ¶
LocalFS is a FileService implementation backed by local file system
func NewLocalFS ¶
func NewLocalFS( ctx context.Context, name string, rootPath string, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, ) (*LocalFS, error)
func (*LocalFS) FlushCache ¶ added in v0.6.0
func (l *LocalFS) FlushCache()
func (*LocalFS) NewMutator ¶ added in v0.6.0
func (*LocalFS) PrefetchFile ¶ added in v1.0.1
func (*LocalFS) SetAsyncUpdate ¶ added in v0.8.0
type LocalFSMutator ¶ added in v0.6.0
type LocalFSMutator struct {
	// contains filtered or unexported fields
}
    func (*LocalFSMutator) Append ¶ added in v0.6.0
func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error
func (*LocalFSMutator) Close ¶ added in v0.6.0
func (l *LocalFSMutator) Close() error
type MemCache ¶ added in v0.6.0
type MemCache struct {
	// contains filtered or unexported fields
}
    func NewMemCache ¶ added in v0.6.0
func NewMemCache( dataCache DataCache, counterSets []*perfcounter.CounterSet, ) *MemCache
func (*MemCache) DeletePaths ¶ added in v1.1.0
type MemoryFS ¶
MemoryFS is an in-memory FileService implementation
func NewMemoryFS ¶
func NewMemoryFS( name string, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, ) (*MemoryFS, error)
func (*MemoryFS) ETLCompatible ¶ added in v0.6.0
func (m *MemoryFS) ETLCompatible()
func (*MemoryFS) PrefetchFile ¶ added in v1.0.1
type MinioSDK ¶ added in v1.0.0
type MinioSDK struct {
	// contains filtered or unexported fields
}
    func NewMinioSDK ¶ added in v1.0.0
func NewMinioSDK( ctx context.Context, args ObjectStorageArguments, perfCounterSets []*perfcounter.CounterSet, ) (*MinioSDK, error)
type MutableFileService ¶
type MutableFileService interface {
	FileService
	// NewMutator creates a new mutator
	NewMutator(ctx context.Context, filePath string) (Mutator, error)
}
    MutableFileService is an extension interface to FileService that allows mutation
type Mutator ¶ added in v0.6.0
type Mutator interface {
	// Mutate mutates file contents
	Mutate(ctx context.Context, entries ...IOEntry) error
	// Append appends data to file
	// all IOEntry.Offset values are based on the end-of-file position
	// for example, passing IOEntry{Offset: 0, Size: 1, Data: []byte("a")} will append "a" to the end of file
	Append(ctx context.Context, entries ...IOEntry) error
	// Close closes the mutator
	// Must be called after finishing mutation
	Close() error
}
    type NewFileServicesFunc ¶ added in v0.6.0
type NewFileServicesFunc = func(defaultName string) (*FileServices, error)
NewFileServicesFunc creates a new *FileServices
type ObjectStorage ¶ added in v1.0.0
type ObjectStorage interface {
	// List lists objects with specified prefix
	List(
		ctx context.Context,
		prefix string,
		fn func(isPrefix bool, key string, size int64) (bool, error),
	) (
		err error,
	)
	// Stat returns information about an object
	Stat(
		ctx context.Context,
		key string,
	) (
		size int64,
		err error,
	)
	// Exists reports whether specified object exists
	Exists(
		ctx context.Context,
		key string,
	) (
		bool,
		error,
	)
	// Write writes an object
	Write(
		ctx context.Context,
		key string,
		r io.Reader,
		size int64,
		expire *time.Time,
	) (
		err error,
	)
	// Read returns an io.ReadCloser for the specified object range
	Read(
		ctx context.Context,
		key string,
		min *int64,
		max *int64,
	) (
		r io.ReadCloser,
		err error,
	)
	// Delete deletes objects
	Delete(
		ctx context.Context,
		keys ...string,
	) (
		err error,
	)
}
    type ObjectStorageArguments ¶ added in v1.0.0
type ObjectStorageArguments struct {
	// misc
	Name                string `toml:"name"`
	KeyPrefix           string `toml:"key-prefix"`
	// s3
	Bucket    string   `toml:"bucket"`
	Endpoint  string   `toml:"endpoint"`
	IsMinio   bool     `toml:"is-minio"`
	Region    string   `toml:"region"`
	CertFiles []string `toml:"cert-files"`
	// credentials
	RoleARN         string `json:"-" toml:"role-arn"`
	BearerToken     string `json:"-" toml:"bearer-token"`
	ExternalID      string `json:"-" toml:"external-id"`
	KeyID           string `json:"-" toml:"key-id"`
	KeySecret       string `json:"-" toml:"key-secret"`
	RAMRole         string `json:"-" toml:"ram-role"`
	RoleSessionName string `json:"-" toml:"role-session-name"`
	SecurityToken   string `json:"-" toml:"security-token"`
	SessionToken    string `json:"-" toml:"session-token"`
}
    func (*ObjectStorageArguments) SetFromString ¶ added in v1.0.0
func (o *ObjectStorageArguments) SetFromString(arguments []string) error
func (ObjectStorageArguments) String ¶ added in v1.0.1
func (o ObjectStorageArguments) String() string
type OnDiskCacheEvictFunc ¶ added in v1.0.0
type OnDiskCacheEvictFunc = func( diskFilePath string, )
type OnDiskCacheWrittenFunc ¶ added in v1.0.0
type Path ¶ added in v0.6.0
func ParsePathAtService ¶ added in v0.6.0
func (Path) ServiceString ¶ added in v0.8.0
type Policy ¶ added in v1.0.0
type Policy uint64
func (Policy) CacheFullFile ¶ added in v1.0.0
func (Policy) CacheIOEntry ¶ added in v1.0.0
type Pool ¶ added in v0.8.0
type Pool[T any] struct { // contains filtered or unexported fields }
type ProfileHandler ¶ added in v0.8.0
type ProfileHandler struct {
	// contains filtered or unexported fields
}
    func NewProfileHandler ¶ added in v0.8.0
func NewProfileHandler() *ProfileHandler
func (*ProfileHandler) AddSample ¶ added in v0.8.0
func (p *ProfileHandler) AddSample(duration time.Duration, tags ...string)
func (*ProfileHandler) ServeHTTP ¶ added in v0.8.0
func (p *ProfileHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (*ProfileHandler) StartProfile ¶ added in v0.8.0
func (p *ProfileHandler) StartProfile() ( write func(w io.Writer), stop func(), )
type ProfileInfo ¶ added in v1.0.0
func NewProfileInfo ¶ added in v1.0.0
func NewProfileInfo() *ProfileInfo
type ProfileSpan ¶ added in v1.0.0
type PutBack ¶ added in v0.8.0
type PutBack[T any] struct { // contains filtered or unexported fields }
type RCBytes ¶ added in v1.0.0
type RCBytes struct {
	*RCPoolItem[[]byte]
}
    RCBytes represents a reference-counted []byte from a pool. A newly created RCBytes' ref count is 1. The owner should call Release to give it back to the pool. A new sharing owner should call Retain to increase the ref count.
type RCPool ¶ added in v1.0.0
type RCPool[T any] struct { // contains filtered or unexported fields }
RCPool represents a pool of reference counting objects
func (*RCPool[T]) Get ¶ added in v1.0.0
func (r *RCPool[T]) Get() *RCPoolItem[T]
type RCPoolItem ¶ added in v1.0.0
type RCPoolItem[T any] struct { Value T // contains filtered or unexported fields }
func (*RCPoolItem[T]) Release ¶ added in v1.0.0
func (r *RCPoolItem[T]) Release()
func (*RCPoolItem[T]) Retain ¶ added in v1.0.0
func (r *RCPoolItem[T]) Retain()
type RemoteCache ¶ added in v1.0.0
type RemoteCache struct {
	// contains filtered or unexported fields
}
    RemoteCache is the cache for remote read.
func NewRemoteCache ¶ added in v1.0.0
func NewRemoteCache(client client.CacheClient, factory KeyRouterFactory) *RemoteCache
func (*RemoteCache) DeletePaths ¶ added in v1.1.0
func (r *RemoteCache) DeletePaths(ctx context.Context, paths []string) error
func (*RemoteCache) Flush ¶ added in v1.0.0
func (r *RemoteCache) Flush()
type ReplaceableFileService ¶ added in v0.6.0
type ReplaceableFileService interface {
	FileService
	Replace(ctx context.Context, vector IOVector) error
}
    ReplaceableFileService is an extension interface to FileService that allows replacing a whole file
type S3FS ¶
type S3FS struct {
	// contains filtered or unexported fields
}
    S3FS is a FileService implementation backed by S3
func NewS3FS ¶
func NewS3FS( ctx context.Context, args ObjectStorageArguments, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, noCache bool, ) (*S3FS, error)
func NewS3FSOnMinio ¶ added in v0.6.0
func NewS3FSOnMinio( ctx context.Context, args ObjectStorageArguments, cacheConfig CacheConfig, perfCounterSets []*perfcounter.CounterSet, noCache bool, ) (*S3FS, error)
NewS3FSOnMinio creates an S3FS on a minio server. This is needed because the URL scheme of a minio server is not compatible with AWS's.
func (*S3FS) ETLCompatible ¶ added in v0.6.0
func (*S3FS) ETLCompatible()
func (*S3FS) FlushCache ¶ added in v0.6.0
func (s *S3FS) FlushCache()
func (*S3FS) PrefetchFile ¶ added in v1.0.1
func (*S3FS) SetAsyncUpdate ¶ added in v0.8.0
type SpanProfiler ¶ added in v1.0.0
type SpanProfiler struct {
	// contains filtered or unexported fields
}
    SpanProfiler profiles a span in one goroutine. Multiple SpanProfilers may share one *profile.Profile.
func NewSpanProfiler ¶ added in v1.0.0
func NewSpanProfiler() *SpanProfiler
NewSpanProfiler creates a new span profiler
func (*SpanProfiler) Begin ¶ added in v1.0.0
func (s *SpanProfiler) Begin(skip int) (profiler *SpanProfiler, end func())
Begin begins a new span. If the calling goroutine does not match s.goID, a new profiler for the calling goroutine will be created. The newly created profiler will share the same profile as s.profile.
type TargetCacheKeys ¶ added in v1.0.0
type TargetCacheKeys map[string][]*pb.RequestCacheKey
      
      Source Files
      ¶
    
- aliyun_sdk.go
 - aws_sdk_v1.go
 - aws_sdk_v2.go
 - bytes.go
 - cache.go
 - caching_file_service.go
 - config.go
 - disk_cache.go
 - disk_cache_callbacks.go
 - error.go
 - etl_file_service.go
 - file_cache.go
 - file_like.go
 - file_service.go
 - file_services.go
 - file_with_checksum.go
 - get.go
 - io.go
 - io_entry.go
 - io_entry_reader.go
 - io_lock.go
 - io_vector.go
 - local_etl_fs.go
 - local_fs.go
 - mem_cache.go
 - memory_fs.go
 - minio_sdk.go
 - mutable_file_service.go
 - object_storage.go
 - object_storage_arguments.go
 - path.go
 - policy.go
 - pool.go
 - profile.go
 - profile_info.go
 - profile_span.go
 - profile_statement.go
 - rc_bytes.go
 - rc_pool.go
 - remote_cache.go
 - replaceable_file_service.go
 - retry.go
 - retryable_reader.go
 - s3_fs.go
 - s3_fs_restore.go
 - sub_path.go
 - utils.go