Documentation
¶
Index ¶
- Constants
- Variables
- func DialWithTimeout(ctx context.Context, addr string) (net.Conn, error)
- func GenerateFsID(name string) string
- func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
- func GetLogger() *logger
- func GetPrivateIpAddr() (string, error)
- func GetPublicIpAddr() (string, error)
- func InitLogger(debugMode bool, prettyLogs bool)
- func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error)
- func SHA1StringToUint64(hash string) (uint64, error)
- func ToSlice(v interface{}) []interface{}
- func ToStruct(m map[string]string, out interface{}) error
- func WithClientName(name string) func(*redis.UniversalOptions)
- type BlobCacheClient
- func (c *BlobCacheClient) Cleanup() error
- func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64, opts struct{ ... }) ([]byte, error)
- func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64, opts struct{ ... }) (chan []byte, error)
- func (c *BlobCacheClient) GetNearbyHosts() ([]*BlobCacheHost, error)
- func (c *BlobCacheClient) GetState() error
- func (c *BlobCacheClient) HostsAvailable() bool
- func (c *BlobCacheClient) IsCachedNearby(hash string, routingKey string) (bool, error)
- func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool
- func (c *BlobCacheClient) StoreContent(chunks chan []byte, hash string, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) StoreContentFromFUSE(source struct{ ... }, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) StoreContentFromS3(source struct{ ... }, opts struct{ ... }) (string, error)
- func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
- type BlobCacheClientConfig
- type BlobCacheConfig
- type BlobCacheGlobalConfig
- type BlobCacheHost
- type BlobCacheMetadata
- func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, locality string, ...) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (m *BlobCacheMetadata) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) RemoveFsNode(ctx context.Context, id string) error
- func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
- func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *BlobCacheMetadata) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type BlobCacheMetadataMode
- type BlobCacheMetricsConfig
- type BlobCacheServerConfig
- type BlobCacheServerMode
- type BlobFs
- type BlobFsConfig
- type BlobFsMetadata
- type BlobFsNode
- type BlobFsSystemOpts
- type BlobcacheMetrics
- type BufferPool
- type CacheService
- func (cs *CacheService) AddFsNodeChild(ctx context.Context, req *proto.AddFsNodeChildRequest) (*proto.AddFsNodeChildResponse, error)
- func (cs *CacheService) AddHostToIndex(ctx context.Context, req *proto.AddHostToIndexRequest) (*proto.AddHostToIndexResponse, error)
- func (cs *CacheService) GetAvailableHosts(ctx context.Context, req *proto.GetAvailableHostsRequest) (*proto.GetAvailableHostsResponse, error)
- func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
- func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
- func (cs *CacheService) GetFsNode(ctx context.Context, req *proto.GetFsNodeRequest) (*proto.GetFsNodeResponse, error)
- func (cs *CacheService) GetFsNodeChildren(ctx context.Context, req *proto.GetFsNodeChildrenRequest) (*proto.GetFsNodeChildrenResponse, error)
- func (cs *CacheService) GetRegionConfig(ctx context.Context, req *proto.GetRegionConfigRequest) (*proto.GetRegionConfigResponse, error)
- func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
- func (cs *CacheService) HasContent(ctx context.Context, req *proto.HasContentRequest) (*proto.HasContentResponse, error)
- func (cs *CacheService) HostKeepAlive()
- func (cs *CacheService) RefreshStoreFromContentLock(ctx context.Context, req *proto.RefreshStoreFromContentLockRequest) (*proto.RefreshStoreFromContentLockResponse, error)
- func (cs *CacheService) RemoveClientLock(ctx context.Context, req *proto.RemoveClientLockRequest) (*proto.RemoveClientLockResponse, error)
- func (cs *CacheService) RemoveFsNode(ctx context.Context, req *proto.RemoveFsNodeRequest) (*proto.RemoveFsNodeResponse, error)
- func (cs *CacheService) RemoveFsNodeChild(ctx context.Context, req *proto.RemoveFsNodeChildRequest) (*proto.RemoveFsNodeChildResponse, error)
- func (cs *CacheService) RemoveStoreFromContentLock(ctx context.Context, req *proto.RemoveStoreFromContentLockRequest) (*proto.RemoveStoreFromContentLockResponse, error)
- func (cs *CacheService) SetClientLock(ctx context.Context, req *proto.SetClientLockRequest) (*proto.SetClientLockResponse, error)
- func (cs *CacheService) SetFsNode(ctx context.Context, req *proto.SetFsNodeRequest) (*proto.SetFsNodeResponse, error)
- func (cs *CacheService) SetHostKeepAlive(ctx context.Context, req *proto.SetHostKeepAliveRequest) (*proto.SetHostKeepAliveResponse, error)
- func (cs *CacheService) SetStoreFromContentLock(ctx context.Context, req *proto.SetStoreFromContentLockRequest) (*proto.SetStoreFromContentLockResponse, error)
- func (cs *CacheService) StartServer(port uint) error
- func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
- func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
- func (cs *CacheService) StoreContentFromSourceWithLock(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceWithLockResponse, error)
- func (cs *CacheService) StoreContentInBlobFs(ctx context.Context, path string, hash string, size uint64) error
- type CacheServiceOpts
- type ClientOptions
- type ClientRequest
- type ClientRequestType
- type ConfigFormat
- type ConfigLoaderFunc
- type ConfigManager
- type ContentAddressableStorage
- func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, content []byte) error
- func (cas *ContentAddressableStorage) Cleanup()
- func (cas *ContentAddressableStorage) Exists(hash string) bool
- func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error)
- func (cas *ContentAddressableStorage) GetDiskCacheMetrics() (int64, int64, float64, error)
- type CoordinatorClient
- type CoordinatorClientLocal
- func (c *CoordinatorClientLocal) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientLocal) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (c *CoordinatorClientLocal) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (c *CoordinatorClientLocal) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
- func (c *CoordinatorClientLocal) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientLocal) RemoveClientLock(ctx context.Context, hash string, host string) error
- func (c *CoordinatorClientLocal) RemoveFsNode(ctx context.Context, id string) error
- func (c *CoordinatorClientLocal) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientLocal) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientLocal) SetClientLock(ctx context.Context, hash string, host string) error
- func (c *CoordinatorClientLocal) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (c *CoordinatorClientLocal) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientLocal) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type CoordinatorClientRemote
- func (c *CoordinatorClientRemote) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientRemote) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (c *CoordinatorClientRemote) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (c *CoordinatorClientRemote) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
- func (c *CoordinatorClientRemote) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientRemote) RemoveClientLock(ctx context.Context, hash string, hostId string) error
- func (c *CoordinatorClientRemote) RemoveFsNode(ctx context.Context, id string) error
- func (c *CoordinatorClientRemote) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (c *CoordinatorClientRemote) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (c *CoordinatorClientRemote) SetClientLock(ctx context.Context, hash string, hostId string) error
- func (c *CoordinatorClientRemote) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (c *CoordinatorClientRemote) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (c *CoordinatorClientRemote) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type DiscoveryClient
- type ErrNodeNotFound
- type FSNode
- func (n *FSNode) Create(ctx context.Context, name string, flags uint32, mode uint32, ...) (inode *fs.Inode, fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno
- func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*fs.Inode, syscall.Errno)
- func (n *FSNode) OnAdd(ctx context.Context)
- func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFlags uint32, errno syscall.Errno)
- func (n *FSNode) Opendir(ctx context.Context) syscall.Errno
- func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno)
- func (n *FSNode) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno)
- func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno)
- func (n *FSNode) Rename(ctx context.Context, oldName string, newParent fs.InodeEmbedder, ...) syscall.Errno
- func (n *FSNode) Rmdir(ctx context.Context, name string) syscall.Errno
- func (n *FSNode) Unlink(ctx context.Context, name string) syscall.Errno
- type FileSystem
- type FileSystemOpts
- type FileSystemStorage
- type HostMap
- func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
- func (hm *HostMap) Get(hostId string) *BlobCacheHost
- func (hm *HostMap) GetAll() []*BlobCacheHost
- func (hm *HostMap) Members() mapset.Set[string]
- func (hm *HostMap) Remove(host *BlobCacheHost)
- func (hm *HostMap) Set(host *BlobCacheHost)
- type JuiceFSConfig
- type JuiceFsSource
- type MetadataConfig
- type MockCoordinator
- func (m *MockCoordinator) AddFsNodeChild(ctx context.Context, pid, id string) error
- func (m *MockCoordinator) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *MockCoordinator) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
- func (m *MockCoordinator) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
- func (m *MockCoordinator) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
- func (m *MockCoordinator) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
- func (m *MockCoordinator) RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *MockCoordinator) RemoveClientLock(ctx context.Context, hash string, host string) error
- func (m *MockCoordinator) RemoveFsNode(ctx context.Context, id string) error
- func (m *MockCoordinator) RemoveFsNodeChild(ctx context.Context, pid, id string) error
- func (m *MockCoordinator) RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- func (m *MockCoordinator) SetClientLock(ctx context.Context, hash string, host string) error
- func (m *MockCoordinator) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
- func (m *MockCoordinator) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
- func (m *MockCoordinator) SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
- type MountPointConfig
- type MountPointSource
- type ParserFunc
- type PrefetchState
- type Prefetcher
- type RedisClient
- func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (r *RedisClient) PSubscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error, func())
- func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
- func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
- func (r *RedisClient) ToSlice(v interface{}) []interface{}
- func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error
- type RedisConfig
- type RedisLock
- type RedisLockOption
- type RedisLockOptions
- type RedisMode
- type RegionConfig
- type RendezvousHasher
- type S3Client
- type S3SourceConfig
- type Source
- type SourceConfig
- type StorageLayer
- type ValkeyConfig
- type ValkeyExistingPrimary
Constants ¶
const ( SourceModeJuiceFS string = "juicefs" SourceModeMountPoint string = "mountpoint" )
const ( BlobCacheHostPrefix string = "blobcache-host" BlobCacheVersion string = "dev" )
Variables ¶
var ( BufferSize1MB = 1 * 1024 * 1024 BufferSize4MB = 4 * 1024 * 1024 BufferSize16MB = 16 * 1024 * 1024 )
Standard buffer sizes aligned with typical chunk sizes
var ( ErrHostNotFound = errors.New("host not found") ErrUnableToReachHost = errors.New("unable to reach host") ErrInvalidHostVersion = errors.New("invalid host version") ErrContentNotFound = errors.New("content not found") ErrClientNotFound = errors.New("client not found") ErrCacheLockHeld = errors.New("cache lock held") ErrUnableToPopulateContent = errors.New("unable to populate content from original source") ErrBlobFsMountFailure = errors.New("failed to mount blobfs") ErrUnableToAcquireLock = errors.New("unable to acquire lock") )
var ( ErrChannelClosed = errors.New("redis: channel closed") ErrConnectionIssue = errors.New("redis: connection issue") ErrUnknownRedisMode = errors.New("redis: unknown mode") )
var (
Logger *logger
)
var MetadataKeys = &metadataKeys{}
Functions ¶
func GenerateFsID ¶
Generates a directory ID based on parent ID and name.
func GetConfigParser ¶
func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
func GetPrivateIpAddr ¶
func GetPublicIpAddr ¶
func InitLogger ¶
func SHA1StringToUint64 ¶
SHA1StringToUint64 converts the first 8 bytes of a SHA-1 hash string to a uint64
func ToSlice ¶
func ToSlice(v interface{}) []interface{}
Flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise, they will be ignored.
func ToStruct ¶
Copies the result of HGetAll to a provided struct. If a field cannot be parsed, Go's zero value for that field's type is used. Struct fields must have the redis tag on them; otherwise, they will be ignored.
func WithClientName ¶
func WithClientName(name string) func(*redis.UniversalOptions)
Types ¶
type BlobCacheClient ¶
type BlobCacheClient struct {
// contains filtered or unexported fields
}
func NewBlobCacheClient ¶
func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheClient, error)
func (*BlobCacheClient) Cleanup ¶
func (c *BlobCacheClient) Cleanup() error
func (*BlobCacheClient) GetContent ¶
func (*BlobCacheClient) GetContentStream ¶
func (*BlobCacheClient) GetNearbyHosts ¶
func (c *BlobCacheClient) GetNearbyHosts() ([]*BlobCacheHost, error)
func (*BlobCacheClient) GetState ¶
func (c *BlobCacheClient) GetState() error
func (*BlobCacheClient) HostsAvailable ¶
func (c *BlobCacheClient) HostsAvailable() bool
func (*BlobCacheClient) IsCachedNearby ¶
func (c *BlobCacheClient) IsCachedNearby(hash string, routingKey string) (bool, error)
func (*BlobCacheClient) IsPathCachedNearby ¶
func (c *BlobCacheClient) IsPathCachedNearby(ctx context.Context, path string) bool
func (*BlobCacheClient) StoreContent ¶
func (*BlobCacheClient) StoreContentFromFUSE ¶
func (*BlobCacheClient) StoreContentFromS3 ¶
func (*BlobCacheClient) WaitForHosts ¶
func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error
type BlobCacheClientConfig ¶
type BlobCacheClientConfig struct {
Token string `key:"token" json:"token"`
MinRetryLengthBytes int64 `key:"minRetryLengthBytes" json:"min_retry_length_bytes"`
MaxGetContentAttempts int `key:"maxGetContentAttempts" json:"max_get_content_attempts"`
NTopHosts int `key:"nTopHosts" json:"n_top_hosts"`
BlobFs BlobFsConfig `key:"blobfs" json:"blobfs"`
}
type BlobCacheConfig ¶
type BlobCacheConfig struct {
Server BlobCacheServerConfig `key:"server" json:"server"`
Client BlobCacheClientConfig `key:"client" json:"client"`
Global BlobCacheGlobalConfig `key:"global" json:"global"`
Metrics BlobCacheMetricsConfig `key:"metrics" json:"metrics"`
}
type BlobCacheGlobalConfig ¶
type BlobCacheGlobalConfig struct {
DefaultLocality string `key:"defaultLocality" json:"default_locality"`
CoordinatorHost string `key:"coordinatorHost" json:"coordinator_host"`
ServerPort uint `key:"serverPort" json:"server_port"`
DiscoveryIntervalS int `key:"discoveryIntervalS" json:"discovery_interval_s"`
RoundTripThresholdMilliseconds uint `key:"rttThresholdMilliseconds" json:"rtt_threshold_ms"`
HostStorageCapacityThresholdPct float64 `key:"hostStorageCapacityThresholdPct" json:"host_storage_capacity_threshold_pct"`
GRPCDialTimeoutS int `key:"grpcDialTimeoutS" json:"grpc_dial_timeout_s"`
GRPCMessageSizeBytes int `key:"grpcMessageSizeBytes" json:"grpc_message_size_bytes"`
GRPCInitialWindowSize int `key:"grpcInitialWindowSize" json:"grpc_initial_window_size"`
GRPCInitialConnWindowSize int `key:"grpcInitialConnWindowSize" json:"grpc_initial_conn_window_size"`
GRPCWriteBufferSize int `key:"grpcWriteBufferSize" json:"grpc_write_buffer_size"`
GRPCReadBufferSize int `key:"grpcReadBufferSize" json:"grpc_read_buffer_size"`
GRPCMaxConcurrentStreams int `key:"grpcMaxConcurrentStreams" json:"grpc_max_concurrent_streams"`
GRPCNumStreamWorkers int `key:"grpcNumStreamWorkers" json:"grpc_num_stream_workers"`
DebugMode bool `key:"debugMode" json:"debug_mode"`
PrettyLogs bool `key:"prettyLogs" json:"pretty_logs"`
}
func (*BlobCacheGlobalConfig) GetLocality ¶
func (c *BlobCacheGlobalConfig) GetLocality() string
type BlobCacheHost ¶
type BlobCacheHost struct {
RTT time.Duration `redis:"rtt" json:"rtt"`
HostId string `redis:"host_id" json:"host_id"`
Addr string `redis:"addr" json:"addr"`
PrivateAddr string `redis:"private_addr" json:"private_addr"`
CapacityUsagePct float64 `redis:"capacity_usage_pct" json:"capacity_usage_pct"`
}
func (*BlobCacheHost) Bytes ¶
func (h *BlobCacheHost) Bytes() []byte
Bytes is needed for the rendezvous hasher
func (*BlobCacheHost) ToProto ¶
func (h *BlobCacheHost) ToProto() *proto.BlobCacheHost
type BlobCacheMetadata ¶
type BlobCacheMetadata struct {
// contains filtered or unexported fields
}
func NewBlobCacheMetadata ¶
func NewBlobCacheMetadata(cfg MetadataConfig) (*BlobCacheMetadata, error)
func (*BlobCacheMetadata) AddFsNodeChild ¶
func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*BlobCacheMetadata) AddHostToIndex ¶
func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) GetAvailableHosts ¶
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context, locality string, removeHostCallback func(host *BlobCacheHost)) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) GetFsNode ¶
func (m *BlobCacheMetadata) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetFsNodeChildren ¶
func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*BlobCacheMetadata) GetHostIndex ¶
func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*BlobCacheMetadata) RefreshStoreFromContentLock ¶
func (*BlobCacheMetadata) RemoveClientLock ¶
func (m *BlobCacheMetadata) RemoveClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) RemoveFsNode ¶
func (m *BlobCacheMetadata) RemoveFsNode(ctx context.Context, id string) error
func (*BlobCacheMetadata) RemoveFsNodeChild ¶
func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*BlobCacheMetadata) RemoveHostFromIndex ¶
func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) RemoveStoreFromContentLock ¶
func (*BlobCacheMetadata) SetClientLock ¶
func (m *BlobCacheMetadata) SetClientLock(ctx context.Context, clientId, hash string) error
func (*BlobCacheMetadata) SetFsNode ¶
func (m *BlobCacheMetadata) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*BlobCacheMetadata) SetHostKeepAlive ¶
func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*BlobCacheMetadata) SetStoreFromContentLock ¶
type BlobCacheMetadataMode ¶
type BlobCacheMetadataMode string
const ( BlobCacheMetadataModeDefault BlobCacheMetadataMode = "default" BlobCacheMetadataModeLocal BlobCacheMetadataMode = "local" )
type BlobCacheMetricsConfig ¶
type BlobCacheServerConfig ¶
type BlobCacheServerConfig struct {
Mode BlobCacheServerMode `key:"mode" json:"mode"`
DiskCacheDir string `key:"diskCacheDir" json:"disk_cache_dir"`
DiskCacheMaxUsagePct float64 `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"`
EnableMemoryCache bool `key:"enableMemoryCache" json:"enable_memory_cache"`
Token string `key:"token" json:"token"`
PrettyLogs bool `key:"prettyLogs" json:"pretty_logs"`
ObjectTtlS int `key:"objectTtlS" json:"object_ttl_s"`
MaxCachePct int64 `key:"maxCachePct" json:"max_cache_pct"`
PageSizeBytes int64 `key:"pageSizeBytes" json:"page_size_bytes"`
Metadata MetadataConfig `key:"metadata" json:"metadata"`
Sources []SourceConfig `key:"sources" json:"sources"`
S3DownloadConcurrency int64 `key:"s3DownloadConcurrency" json:"s3_download_concurrency"`
S3DownloadChunkSize int64 `key:"s3DownloadChunkSize" json:"s3_download_chunk_size"`
// Allows a coordinator to override a slave server's config for a specific locality/region
Regions map[string]RegionConfig `key:"regions" json:"regions"`
}
func BlobCacheServerConfigFromProto ¶
func BlobCacheServerConfigFromProto(protoConfig *proto.BlobCacheServerConfig) BlobCacheServerConfig
func (*BlobCacheServerConfig) ToProto ¶
func (c *BlobCacheServerConfig) ToProto() *proto.BlobCacheServerConfig
type BlobCacheServerMode ¶
type BlobCacheServerMode string
const ( BlobCacheServerModeCoordinator BlobCacheServerMode = "coordinator" BlobCacheServerModeSlave BlobCacheServerMode = "slave" )
type BlobFs ¶
type BlobFs struct {
CoordinatorClient CoordinatorClient
Client *BlobCacheClient
Config BlobCacheClientConfig
// contains filtered or unexported fields
}
func NewFileSystem ¶
func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)
NewFileSystem initializes a new BlobFs with root metadata.
type BlobFsConfig ¶
type BlobFsConfig struct {
Enabled bool `key:"enabled" json:"enabled"`
MountPoint string `key:"mountPoint" json:"mount_point"`
MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"`
MaxWriteKB int `key:"maxWriteKB" json:"max_write_kb"`
MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"`
DirectMount bool `key:"directMount" json:"direct_mount"`
DirectIO bool `key:"directIO" json:"direct_io"`
Options []string `key:"options" json:"options"`
}
type BlobFsMetadata ¶
type BlobFsMetadata struct {
PID string `redis:"pid" json:"pid"`
ID string `redis:"id" json:"id"`
Name string `redis:"name" json:"name"`
Path string `redis:"path" json:"path"`
Hash string `redis:"hash" json:"hash"`
Ino uint64 `redis:"ino" json:"ino"`
Size uint64 `redis:"size" json:"size"`
Blocks uint64 `redis:"blocks" json:"blocks"`
Atime uint64 `redis:"atime" json:"atime"`
Mtime uint64 `redis:"mtime" json:"mtime"`
Ctime uint64 `redis:"ctime" json:"ctime"`
Atimensec uint32 `redis:"atimensec" json:"atimensec"`
Mtimensec uint32 `redis:"mtimensec" json:"mtimensec"`
Ctimensec uint32 `redis:"ctimensec" json:"ctimensec"`
Mode uint32 `redis:"mode" json:"mode"`
Nlink uint32 `redis:"nlink" json:"nlink"`
Rdev uint32 `redis:"rdev" json:"rdev"`
Blksize uint32 `redis:"blksize" json:"blksize"`
Padding uint32 `redis:"padding" json:"padding"`
Uid uint32 `redis:"uid" json:"uid"`
Gid uint32 `redis:"gid" json:"gid"`
Gen uint64 `redis:"gen" json:"gen"`
}
func (*BlobFsMetadata) ToProto ¶
func (m *BlobFsMetadata) ToProto() *proto.BlobFsMetadata
type BlobFsNode ¶
type BlobFsSystemOpts ¶
type BlobFsSystemOpts struct {
Verbose bool
CoordinatorClient CoordinatorClient
Config BlobCacheClientConfig
Client *BlobCacheClient
}
type BlobcacheMetrics ¶
type BlobcacheMetrics struct {
DiskCacheUsageMB *metrics.Histogram
DiskCacheUsagePct *metrics.Histogram
MemCacheUsageMB *metrics.Histogram
MemCacheUsagePct *metrics.Histogram
// Cache tier hit ratios
L0HitRatio *metrics.Histogram // In-memory cache hits
L1HitRatio *metrics.Histogram // Disk cache hits
L2MissRatio *metrics.Histogram // Remote fetch required
// Operation counters
L0Hits *metrics.Counter
L1Hits *metrics.Counter
L2Misses *metrics.Counter
TotalReads *metrics.Counter
// Bytes served per tier
L0BytesServed *metrics.Counter
L1BytesServed *metrics.Counter
L2BytesFetched *metrics.Counter
// FUSE operation latencies
FUSEReadLatency *metrics.Histogram
FUSELookupLatency *metrics.Histogram
FUSEGetattrLatency *metrics.Histogram
// Read throughput
ReadThroughputMBps *metrics.Histogram
}
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool provides a pool of reusable byte slices to reduce allocations. Optimized for 1-4MB chunks as recommended in the optimization plan.
func NewBufferPool ¶
func NewBufferPool() *BufferPool
NewBufferPool creates a new buffer pool with predefined size buckets
func (*BufferPool) Get ¶
func (bp *BufferPool) Get(size int) []byte
Get retrieves a buffer of at least the requested size
type CacheService ¶
type CacheService struct {
proto.UnimplementedBlobCacheServer
// contains filtered or unexported fields
}
func NewCacheService ¶
func NewCacheService(ctx context.Context, cfg BlobCacheConfig, locality string) (*CacheService, error)
func (*CacheService) AddFsNodeChild ¶
func (cs *CacheService) AddFsNodeChild(ctx context.Context, req *proto.AddFsNodeChildRequest) (*proto.AddFsNodeChildResponse, error)
func (*CacheService) AddHostToIndex ¶
func (cs *CacheService) AddHostToIndex(ctx context.Context, req *proto.AddHostToIndexRequest) (*proto.AddHostToIndexResponse, error)
func (*CacheService) GetAvailableHosts ¶
func (cs *CacheService) GetAvailableHosts(ctx context.Context, req *proto.GetAvailableHostsRequest) (*proto.GetAvailableHostsResponse, error)
func (*CacheService) GetContent ¶
func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error)
func (*CacheService) GetContentStream ¶
func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error
func (*CacheService) GetFsNode ¶
func (cs *CacheService) GetFsNode(ctx context.Context, req *proto.GetFsNodeRequest) (*proto.GetFsNodeResponse, error)
func (*CacheService) GetFsNodeChildren ¶
func (cs *CacheService) GetFsNodeChildren(ctx context.Context, req *proto.GetFsNodeChildrenRequest) (*proto.GetFsNodeChildrenResponse, error)
func (*CacheService) GetRegionConfig ¶
func (cs *CacheService) GetRegionConfig(ctx context.Context, req *proto.GetRegionConfigRequest) (*proto.GetRegionConfigResponse, error)
func (*CacheService) GetState ¶
func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error)
func (*CacheService) HasContent ¶
func (cs *CacheService) HasContent(ctx context.Context, req *proto.HasContentRequest) (*proto.HasContentResponse, error)
func (*CacheService) HostKeepAlive ¶
func (cs *CacheService) HostKeepAlive()
func (*CacheService) RefreshStoreFromContentLock ¶
func (cs *CacheService) RefreshStoreFromContentLock(ctx context.Context, req *proto.RefreshStoreFromContentLockRequest) (*proto.RefreshStoreFromContentLockResponse, error)
func (*CacheService) RemoveClientLock ¶
func (cs *CacheService) RemoveClientLock(ctx context.Context, req *proto.RemoveClientLockRequest) (*proto.RemoveClientLockResponse, error)
func (*CacheService) RemoveFsNode ¶
func (cs *CacheService) RemoveFsNode(ctx context.Context, req *proto.RemoveFsNodeRequest) (*proto.RemoveFsNodeResponse, error)
func (*CacheService) RemoveFsNodeChild ¶
func (cs *CacheService) RemoveFsNodeChild(ctx context.Context, req *proto.RemoveFsNodeChildRequest) (*proto.RemoveFsNodeChildResponse, error)
func (*CacheService) RemoveStoreFromContentLock ¶
func (cs *CacheService) RemoveStoreFromContentLock(ctx context.Context, req *proto.RemoveStoreFromContentLockRequest) (*proto.RemoveStoreFromContentLockResponse, error)
func (*CacheService) SetClientLock ¶
func (cs *CacheService) SetClientLock(ctx context.Context, req *proto.SetClientLockRequest) (*proto.SetClientLockResponse, error)
func (*CacheService) SetFsNode ¶
func (cs *CacheService) SetFsNode(ctx context.Context, req *proto.SetFsNodeRequest) (*proto.SetFsNodeResponse, error)
func (*CacheService) SetHostKeepAlive ¶
func (cs *CacheService) SetHostKeepAlive(ctx context.Context, req *proto.SetHostKeepAliveRequest) (*proto.SetHostKeepAliveResponse, error)
func (*CacheService) SetStoreFromContentLock ¶
func (cs *CacheService) SetStoreFromContentLock(ctx context.Context, req *proto.SetStoreFromContentLockRequest) (*proto.SetStoreFromContentLockResponse, error)
func (*CacheService) StartServer ¶
func (cs *CacheService) StartServer(port uint) error
func (*CacheService) StoreContent ¶
func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) error
func (*CacheService) StoreContentFromSource ¶
func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error)
func (*CacheService) StoreContentFromSourceWithLock ¶
func (cs *CacheService) StoreContentFromSourceWithLock(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceWithLockResponse, error)
func (*CacheService) StoreContentInBlobFs ¶
type CacheServiceOpts ¶
type CacheServiceOpts struct {
Addr string
}
type ClientOptions ¶
type ClientOptions struct {
RoutingKey string
}
type ClientRequest ¶
type ClientRequest struct {
// contains filtered or unexported fields
}
type ClientRequestType ¶
type ClientRequestType int
const ( ClientRequestTypeStorage ClientRequestType = iota ClientRequestTypeRetrieval )
type ConfigFormat ¶
type ConfigFormat string
var ( JSONConfigFormat ConfigFormat = ".json" YAMLConfigFormat ConfigFormat = ".yaml" YMLConfigFormat ConfigFormat = ".yml" )
type ConfigLoaderFunc ¶
ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.
type ConfigManager ¶
type ConfigManager[T any] struct { // contains filtered or unexported fields }
ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.
func NewConfigManager ¶
func NewConfigManager[T any]() (*ConfigManager[T], error)
NewConfigManager creates a new instance of the ConfigManager[T] type for managing configuration of type 'T'. It initializes the ConfigManager with the specified 'T' type, loads a default configuration, and optionally loads a user configuration if the 'CONFIG_PATH' environment variable is provided. If debug mode is enabled, it prints the current configuration.
func (*ConfigManager[T]) GetConfig ¶
func (cm *ConfigManager[T]) GetConfig() T
GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.
func (*ConfigManager[T]) LoadConfig ¶
func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error
LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.
func (*ConfigManager[T]) Print ¶
func (cm *ConfigManager[T]) Print() string
Print returns a string representation of the current configuration state.
type ContentAddressableStorage ¶
type ContentAddressableStorage struct {
// contains filtered or unexported fields
}
func NewContentAddressableStorage ¶
func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, locality string, coordinator CoordinatorClient, config BlobCacheConfig) (*ContentAddressableStorage, error)
func (*ContentAddressableStorage) Cleanup ¶
func (cas *ContentAddressableStorage) Cleanup()
func (*ContentAddressableStorage) Exists ¶
func (cas *ContentAddressableStorage) Exists(hash string) bool
func (*ContentAddressableStorage) GetDiskCacheMetrics ¶
func (cas *ContentAddressableStorage) GetDiskCacheMetrics() (int64, int64, float64, error)
type CoordinatorClient ¶
type CoordinatorClient interface {
AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
SetClientLock(ctx context.Context, hash string, host string) error
RemoveClientLock(ctx context.Context, hash string, host string) error
SetStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
RemoveStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
RefreshStoreFromContentLock(ctx context.Context, locality string, sourcePath string) error
SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
RemoveFsNode(ctx context.Context, id string) error
RemoveFsNodeChild(ctx context.Context, pid, id string) error
GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
AddFsNodeChild(ctx context.Context, pid, id string) error
}
func NewCoordinatorClientLocal ¶
func NewCoordinatorClientLocal(globalConfig BlobCacheGlobalConfig, serverConfig BlobCacheServerConfig) (CoordinatorClient, error)
func NewCoordinatorClientRemote ¶
func NewCoordinatorClientRemote(cfg BlobCacheGlobalConfig, token string) (CoordinatorClient, error)
type CoordinatorClientLocal ¶
type CoordinatorClientLocal struct {
// contains filtered or unexported fields
}
func (*CoordinatorClientLocal) AddFsNodeChild ¶
func (c *CoordinatorClientLocal) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientLocal) AddHostToIndex ¶
func (c *CoordinatorClientLocal) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientLocal) GetAvailableHosts ¶
func (c *CoordinatorClientLocal) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*CoordinatorClientLocal) GetFsNode ¶
func (c *CoordinatorClientLocal) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*CoordinatorClientLocal) GetFsNodeChildren ¶
func (c *CoordinatorClientLocal) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*CoordinatorClientLocal) GetRegionConfig ¶
func (c *CoordinatorClientLocal) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
func (*CoordinatorClientLocal) RefreshStoreFromContentLock ¶
func (*CoordinatorClientLocal) RemoveClientLock ¶
func (*CoordinatorClientLocal) RemoveFsNode ¶
func (c *CoordinatorClientLocal) RemoveFsNode(ctx context.Context, id string) error
func (*CoordinatorClientLocal) RemoveFsNodeChild ¶
func (c *CoordinatorClientLocal) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientLocal) RemoveStoreFromContentLock ¶
func (*CoordinatorClientLocal) SetClientLock ¶
func (*CoordinatorClientLocal) SetFsNode ¶
func (c *CoordinatorClientLocal) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*CoordinatorClientLocal) SetHostKeepAlive ¶
func (c *CoordinatorClientLocal) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientLocal) SetStoreFromContentLock ¶
type CoordinatorClientRemote ¶
type CoordinatorClientRemote struct {
// contains filtered or unexported fields
}
func (*CoordinatorClientRemote) AddFsNodeChild ¶
func (c *CoordinatorClientRemote) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientRemote) AddHostToIndex ¶
func (c *CoordinatorClientRemote) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientRemote) GetAvailableHosts ¶
func (c *CoordinatorClientRemote) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*CoordinatorClientRemote) GetFsNode ¶
func (c *CoordinatorClientRemote) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*CoordinatorClientRemote) GetFsNodeChildren ¶
func (c *CoordinatorClientRemote) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*CoordinatorClientRemote) GetRegionConfig ¶
func (c *CoordinatorClientRemote) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
func (*CoordinatorClientRemote) RefreshStoreFromContentLock ¶
func (*CoordinatorClientRemote) RemoveClientLock ¶
func (*CoordinatorClientRemote) RemoveFsNode ¶
func (c *CoordinatorClientRemote) RemoveFsNode(ctx context.Context, id string) error
func (*CoordinatorClientRemote) RemoveFsNodeChild ¶
func (c *CoordinatorClientRemote) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*CoordinatorClientRemote) RemoveStoreFromContentLock ¶
func (*CoordinatorClientRemote) SetClientLock ¶
func (*CoordinatorClientRemote) SetFsNode ¶
func (c *CoordinatorClientRemote) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*CoordinatorClientRemote) SetHostKeepAlive ¶
func (c *CoordinatorClientRemote) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*CoordinatorClientRemote) SetStoreFromContentLock ¶
type DiscoveryClient ¶
type DiscoveryClient struct {
// contains filtered or unexported fields
}
func NewDiscoveryClient ¶
func NewDiscoveryClient(cfg BlobCacheGlobalConfig, hostMap *HostMap, coordinator CoordinatorClient, locality string) *DiscoveryClient
func (*DiscoveryClient) GetHostState ¶
func (d *DiscoveryClient) GetHostState(ctx context.Context, host *BlobCacheHost) (*BlobCacheHost, error)
GetHostState attempts to connect to the gRPC service and verifies its availability
type ErrNodeNotFound ¶
type ErrNodeNotFound struct {
Id string
}
func (*ErrNodeNotFound) Error ¶
func (e *ErrNodeNotFound) Error() string
type FSNode ¶
type FileSystem ¶
type FileSystemOpts ¶
type FileSystemOpts struct {
MountPoint string
Verbose bool
Metadata *BlobCacheMetadata
}
BlobFS types
type FileSystemStorage ¶
type HostMap ¶
type HostMap struct {
// contains filtered or unexported fields
}
func NewHostMap ¶
func NewHostMap(cfg BlobCacheGlobalConfig, onHostAdded func(*BlobCacheHost) error) *HostMap
func (*HostMap) Closest ¶
func (hm *HostMap) Closest(timeout time.Duration) (*BlobCacheHost, error)
Closest finds the nearest host within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) ClosestWithCapacity ¶
func (hm *HostMap) ClosestWithCapacity(timeout time.Duration) (*BlobCacheHost, error)
ClosestWithCapacity finds the nearest host with available storage capacity within a given timeout. If no hosts are found, it returns an error.
func (*HostMap) Get ¶
func (hm *HostMap) Get(hostId string) *BlobCacheHost
func (*HostMap) GetAll ¶
func (hm *HostMap) GetAll() []*BlobCacheHost
func (*HostMap) Remove ¶
func (hm *HostMap) Remove(host *BlobCacheHost)
func (*HostMap) Set ¶
func (hm *HostMap) Set(host *BlobCacheHost)
type JuiceFSConfig ¶
type JuiceFSConfig struct {
RedisURI string `key:"redisURI" json:"redis_uri"`
Bucket string `key:"bucket" json:"bucket"`
AccessKey string `key:"accessKey" json:"access_key"`
SecretKey string `key:"secretKey" json:"secret_key"`
CacheSize int64 `key:"cacheSize" json:"cache_size"`
BlockSize int64 `key:"blockSize" json:"block_size"`
Prefetch int64 `key:"prefetch" json:"prefetch"`
BufferSize int64 `key:"bufferSize" json:"buffer_size"`
}
type JuiceFsSource ¶
type JuiceFsSource struct {
// contains filtered or unexported fields
}
func (*JuiceFsSource) Format ¶
func (s *JuiceFsSource) Format(fsName string) error
func (*JuiceFsSource) Mount ¶
func (s *JuiceFsSource) Mount(localPath string) error
func (*JuiceFsSource) Unmount ¶
func (s *JuiceFsSource) Unmount(localPath string) error
type MetadataConfig ¶
type MetadataConfig struct {
Mode BlobCacheMetadataMode `key:"mode" json:"mode"`
ValkeyConfig ValkeyConfig `key:"valkey" json:"valkey"`
// Default config
RedisAddr string `key:"redisAddr" json:"redis_addr"`
RedisPasswd string `key:"redisPasswd" json:"redis_passwd"`
RedisTLSEnabled bool `key:"redisTLSEnabled" json:"redis_tls_enabled"`
RedisMode RedisMode `key:"redisMode" json:"redis_mode"`
RedisMasterName string `key:"redisMasterName" json:"redis_master_name"`
}
type MockCoordinator ¶
type MockCoordinator struct {
// contains filtered or unexported fields
}
MockCoordinator is a simple in-memory coordinator for testing. It does not require Redis or any external dependencies.
func NewMockCoordinator ¶
func NewMockCoordinator() *MockCoordinator
func (*MockCoordinator) AddFsNodeChild ¶
func (m *MockCoordinator) AddFsNodeChild(ctx context.Context, pid, id string) error
func (*MockCoordinator) AddHostToIndex ¶
func (m *MockCoordinator) AddHostToIndex(ctx context.Context, locality string, host *BlobCacheHost) error
func (*MockCoordinator) GetAvailableHosts ¶
func (m *MockCoordinator) GetAvailableHosts(ctx context.Context, locality string) ([]*BlobCacheHost, error)
func (*MockCoordinator) GetFsNode ¶
func (m *MockCoordinator) GetFsNode(ctx context.Context, id string) (*BlobFsMetadata, error)
func (*MockCoordinator) GetFsNodeChildren ¶
func (m *MockCoordinator) GetFsNodeChildren(ctx context.Context, id string) ([]*BlobFsMetadata, error)
func (*MockCoordinator) GetRegionConfig ¶
func (m *MockCoordinator) GetRegionConfig(ctx context.Context, locality string) (BlobCacheServerConfig, error)
func (*MockCoordinator) RefreshStoreFromContentLock ¶
func (*MockCoordinator) RemoveClientLock ¶
func (*MockCoordinator) RemoveFsNode ¶
func (m *MockCoordinator) RemoveFsNode(ctx context.Context, id string) error
func (*MockCoordinator) RemoveFsNodeChild ¶
func (m *MockCoordinator) RemoveFsNodeChild(ctx context.Context, pid, id string) error
func (*MockCoordinator) RemoveStoreFromContentLock ¶
func (*MockCoordinator) SetClientLock ¶
func (*MockCoordinator) SetFsNode ¶
func (m *MockCoordinator) SetFsNode(ctx context.Context, id string, metadata *BlobFsMetadata) error
func (*MockCoordinator) SetHostKeepAlive ¶
func (m *MockCoordinator) SetHostKeepAlive(ctx context.Context, locality string, host *BlobCacheHost) error
func (*MockCoordinator) SetStoreFromContentLock ¶
type MountPointConfig ¶
type MountPointConfig struct {
BucketName string `key:"bucketName" json:"bucket_name"`
AccessKey string `key:"accessKey" json:"access_key"`
SecretKey string `key:"secretKey" json:"secret_key"`
Region string `key:"region" json:"region"`
EndpointURL string `key:"endpointUrl" json:"endpoint_url"`
ForcePathStyle bool `key:"forcePathStyle" json:"force_path_style"`
}
type MountPointSource ¶
type MountPointSource struct {
// contains filtered or unexported fields
}
func (*MountPointSource) Format ¶
func (s *MountPointSource) Format(fsName string) error
func (*MountPointSource) Mount ¶
func (s *MountPointSource) Mount(localPath string) error
func (*MountPointSource) Unmount ¶
func (s *MountPointSource) Unmount(localPath string) error
type ParserFunc ¶
type PrefetchState ¶
type PrefetchState struct {
// contains filtered or unexported fields
}
PrefetchState tracks sequential read patterns per file/hash
type Prefetcher ¶
type Prefetcher struct {
// contains filtered or unexported fields
}
Prefetcher detects sequential reads and prefetches ahead
func NewPrefetcher ¶
func NewPrefetcher(ctx context.Context, cas *ContentAddressableStorage, bufferPool *BufferPool) *Prefetcher
NewPrefetcher creates a new prefetcher instance
func (*Prefetcher) OnRead ¶
func (pf *Prefetcher) OnRead(hash string, offset, length int64)
OnRead should be called on each read to detect patterns
type RedisClient ¶
type RedisClient struct {
redis.UniversalClient
}
func NewRedisClient ¶
func NewRedisClient(config RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)
func (*RedisClient) Keys ¶
Keys gets all keys matching a pattern. It actually runs a SCAN, since the KEYS command locks up the database.
func (*RedisClient) PSubscribe ¶
func (*RedisClient) ToSlice ¶
func (r *RedisClient) ToSlice(v interface{}) []interface{}
type RedisConfig ¶
type RedisConfig struct {
Addrs []string `key:"addrs" json:"addrs"`
Mode RedisMode `key:"mode" json:"mode"`
ClientName string `key:"clientName" json:"client_name"`
EnableTLS bool `key:"enableTLS" json:"enable_tls"`
InsecureSkipVerify bool `key:"insecureSkipVerify" json:"insecure_skip_verify"`
MinIdleConns int `key:"minIdleConns" json:"min_idle_conns"`
MaxIdleConns int `key:"maxIdleConns" json:"max_idle_conns"`
ConnMaxIdleTime time.Duration `key:"connMaxIdleTime" json:"conn_max_idle_time"`
ConnMaxLifetime time.Duration `key:"connMaxLifetime" json:"conn_max_lifetime"`
DialTimeout time.Duration `key:"dialTimeout" json:"dial_timeout"`
ReadTimeout time.Duration `key:"readTimeout" json:"read_timeout"`
WriteTimeout time.Duration `key:"writeTimeout" json:"write_timeout"`
MaxRedirects int `key:"maxRedirects" json:"max_redirects"`
MaxRetries int `key:"maxRetries" json:"max_retries"`
PoolSize int `key:"poolSize" json:"pool_size"`
Username string `key:"username" json:"username"`
Password string `key:"password" json:"password"`
RouteByLatency bool `key:"routeByLatency" json:"route_by_latency"`
MasterName string `key:"masterName" json:"master_name"`
SentinelPassword string `key:"sentinelPassword" json:"sentinel_password"`
}
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
func NewRedisLock ¶
func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock
type RedisLockOption ¶
type RedisLockOption func(*RedisLock)
type RedisLockOptions ¶
type RegionConfig ¶
type RegionConfig struct {
ServerConfig BlobCacheServerConfig `key:"server" json:"server"`
}
type RendezvousHasher ¶
type RendezvousHasher interface {
Add(hosts ...*BlobCacheHost)
Remove(host *BlobCacheHost)
GetN(n int, key string) []*BlobCacheHost
}
type S3Client ¶
type S3Client struct {
Client *s3.Client
Source S3SourceConfig
DownloadConcurrency int64
DownloadChunkSize int64
}
func NewS3Client ¶
func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig, serverConfig BlobCacheServerConfig) (*S3Client, error)
func (*S3Client) BucketName ¶
func (*S3Client) DownloadIntoBuffer ¶
type S3SourceConfig ¶
type Source ¶
type Source interface {
Mount(localPath string) error
Format(fsName string) error
Unmount(localPath string) error
}
func NewJuiceFsSource ¶
func NewJuiceFsSource(config JuiceFSConfig) (Source, error)
func NewMountPointSource ¶
func NewMountPointSource(config MountPointConfig) (Source, error)
func NewSource ¶
func NewSource(config SourceConfig) (Source, error)
type SourceConfig ¶
type SourceConfig struct {
Mode string `key:"mode" json:"mode"`
FilesystemName string `key:"fsName" json:"filesystem_name"`
FilesystemPath string `key:"fsPath" json:"filesystem_path"`
JuiceFS JuiceFSConfig `key:"juicefs" json:"juicefs"`
MountPoint MountPointConfig `key:"mountpoint" json:"mountpoint"`
}
type StorageLayer ¶
type StorageLayer interface {
}
type ValkeyConfig ¶
type ValkeyConfig struct {
PrimaryName string `key:"primaryName" json:"primary_name"`
Password string `key:"password" json:"password"`
TLS bool `key:"tls" json:"tls"`
Host string `key:"host" json:"host"`
Port int `key:"port" json:"port"`
ExistingPrimary ValkeyExistingPrimary `key:"existingPrimary" json:"existingPrimary"`
}
type ValkeyExistingPrimary ¶
Source Files
¶
- blobfs.go
- blobfs_node.go
- buffer_pool.go
- client.go
- config.go
- coordinator_local.go
- coordinator_mock.go
- coordinator_remote.go
- discovery.go
- errors.go
- fadvise.go
- hostmap.go
- logger.go
- metadata.go
- metrics.go
- network.go
- prefetcher.go
- redis.go
- s3_client.go
- server.go
- server_coordinator.go
- source.go
- source_juicefs.go
- source_mountpoint.go
- storage.go
- types.go