defs

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2023 License: MIT Imports: 43 Imported by: 0

README

flow diagram

DeFS

Go Reference Go

DeFS(Decentralized file service) 去中心化文件服务,引领了数据存储领域的革新,通过引入动态存储技术,解决了传统静态存储和去中心化存储所面临的挑战。其核心设计理念是为用户提供更加高效、安全且可靠的数据存储解决方案。通过数据的自由流动和动态迁移,在去中心化的同时,又保障了数据的安全性和可靠性。

该项目目前正在积极开发中,处于 Alpha 状态。

文档

该文档正在编写中……

Documentation

Index

Constants

View Source
const (
	// 文件上传检查事件
	EventFileUploadCheck = "defs@event:file/upload/check/1.0.0"
	// 文件片段上传事件
	EventFileSliceUpload = "defs@event:file/slice/upload/1.0.0"
	// 文件下载开始事件
	EventFileDownloadStart = "defs@event:file/download/start/1.0.0"
	// 文件下载检查事件
	EventFileDownloadCheck = "defs@event:file/download/check/1.0.0"
)

事件协议

View Source
const (
	FileHeader = "\x89PNGA\r\n\x1a\n" // 自定义文件头
	Version    = 1                    // 版本控制
)
View Source
const (
	// 文件上传请求主题
	PubsubFileUploadRequestTopic = "defs@pubsub:file/upload/request/1.0.0"
	// 文件上传响应主题
	PubsubFileUploadResponseTopic = "defs@pubsub:file/upload/response/1.0.0"
	// 文件下载请求主题
	PubsubFileDownloadRequestTopic = "defs@pubsub:file/download/request/1.0.0"
	// 文件下载响应主题
	PubsubFileDownloadResponseTopic = "defs@pubsub:file/download/response/1.0.0"
)

订阅主题

View Source
const (
	// 文件片段上传协议
	StreamFileSliceUploadProtocol = "defs@stream:file/slice/upload/1.0.0"
	// 文件下载响应协议
	StreamFileDownloadResponseProtocol = "defs@stream:file/download/response/1.0.0"
)

流协议

Variables

View Source
var (
	RootPath = filepath.Join(ObtainRootPath(), "defsdata")

	// 二级目录
	Files = filepath.Join(RootPath, "files") // 文件目录
	DB    = filepath.Join(RootPath, "db")    // 数据库目录
	Logs  = filepath.Join(RootPath, "logs")  // 日志目录

	// 三级目录
	UploadPath     = filepath.Join(Files, "uploads")   // 上传目录
	SlicePath      = filepath.Join(Files, "slices")    // 切片目录
	DownloadPath   = filepath.Join(Files, "downloads") // 下载目录
	BusinessDbPath = filepath.Join(DB, "businessdbs")  // 业务db目录
)

路径管理器

View Source
var ChunkOffsetLength = make(map[string]int64)

块的偏移位置和长度,用于随机访问

View Source
var ErrFoo = fmt.Errorf("EOF")

EOF 是当没有更多输入可用时 Read 返回的错误。 函数应该仅返回 EOF 以表示输入正常结束。

Functions

func AppendSegmentToFile added in v0.0.4

func AppendSegmentToFile(file *os.File, segmentType string, data []byte, xref *FileXref) error

AppendSegmentToFile 打开现有文件并添加一个新的段(segmentType 和 data),同时更新 xref 表

func AppendSegmentsToFile added in v0.0.4

func AppendSegmentsToFile(file *os.File, segments map[string][]byte, xref *FileXref) error

AppendSegmentsToFile 打开现有文件并批量添加新的段,同时更新 xref 表

func CalculateHash added in v0.0.4

func CalculateHash(data []byte) []byte

计算[]byte的SHA-256 hash值

func CreateTempFile added in v0.0.4

func CreateTempFile(payload []byte) (file *os.File, err error)

func DecodeFromBytes added in v0.0.4

func DecodeFromBytes(data []byte, result interface{}) error

DecodeFromBytes 使用 gob 解码将 []byte 转换为指定的数据结构

func Decrypt added in v0.0.4

func Decrypt(key, ciphertext []byte) ([]byte, error)

Decrypt 使用AES-CTR模式和给定的密钥对密文进行解密

func EncodeToBytes added in v0.0.4

func EncodeToBytes(data interface{}) ([]byte, error)

EncodeToBytes 使用 gob 编码将任意数据转换为 []byte

func Encrypt added in v0.0.4

func Encrypt(key, plaintext []byte) ([]byte, error)

Encrypt 使用AES-CTR模式和给定的密钥对明文进行加密

func FromBytes added in v0.0.4

func FromBytes[T any](data []byte) (T, error)

FromBytes 泛型函数,用于将 []byte 转换回指定类型

func GenerateKeysFromSeed added in v0.0.4

func GenerateKeysFromSeed(seedData []byte, bits int) (*rsa.PrivateKey, *rsa.PublicKey, error)

GenerateKeysFromSeed 使用种子数据生成 RSA 密钥对

func GetDefaultDownloadPath added in v0.0.4

func GetDefaultDownloadPath() string

GetDefaultDownloadPath 返回操作系统的默认下载路径。 它假设用户使用的是操作系统的标准下载文件夹。

func HandleFileDownloadRequestPubSub added in v0.0.4

func HandleFileDownloadRequestPubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, res *streams.RequestMessage)

HandleFileDownloadRequestPubSub 处理文件下载请求的订阅消息

func HandleFileDownloadResponsePubSub added in v0.0.4

func HandleFileDownloadResponsePubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, db *sqlites.SqliteDB, downloadChan chan *downloadChan, registry *eventbus.EventRegistry, pool *MemoryPool, res *streams.RequestMessage)

HandleFileDownloadResponsePubSub 处理文件下载响应的订阅消息

func HandleFileUploadRequestPubSub added in v0.0.4

func HandleFileUploadRequestPubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, res *streams.RequestMessage)

HandleFileUploadRequestPubSub 处理文件上传请求的订阅消息

func HandleFileUploadResponsePubSub added in v0.0.4

func HandleFileUploadResponsePubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, pool *MemoryPool, res *streams.RequestMessage)

HandleFileUploadResponsePubSub 处理文件上传响应的订阅消息

func IsGorunEnv added in v0.0.4

func IsGorunEnv() bool

是否go run运行环境

func MergeFieldsForSigning added in v0.0.4

func MergeFieldsForSigning(fields ...interface{}) ([]byte, error)

MergeFieldsForSigning 接受任意数量和类型的字段,将它们序列化并合并为一个 []byte。

func ObtainRootPath added in v0.0.4

func ObtainRootPath() string

获取根目录

func ProcessDownloadResponseChecklist added in v0.0.4

func ProcessDownloadResponseChecklist(pool *MemoryPool, db *sqlites.SqliteDB, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, payload *FileDownloadResponseChecklistPayload, receiver peer.ID)

ProcessDownloadResponseChecklist 处理文件下载响应清单

func ProcessDownloadResponseContent added in v0.0.4

func ProcessDownloadResponseContent(p2p *dep2p.DeP2P, db *sqlites.SqliteDB, downloadChan chan *downloadChan, registry *eventbus.EventRegistry, pool *MemoryPool, payload *FileDownloadResponseContentPayload)

ProcessDownloadResponseContent 处理文件下载响应内容

func ProcessDownloadResponseGetContent added in v0.0.4

func ProcessDownloadResponseGetContent(p2p *dep2p.DeP2P, db *sqlites.SqliteDB, downloadChan chan *downloadChan, registry *eventbus.EventRegistry, pool *MemoryPool, payload *FileDownloadResponseContentPayload)

ProcessDownloadResponseGetContent 处理文件下载响应内容

func ReadChunk added in v0.0.4

func ReadChunk(file *os.File, chunkType string, key []byte) (string, []byte, error)

ReadChunk 读取块内容

func ReadSegment added in v0.0.4

func ReadSegment(file *os.File, segmentType string, xref *FileXref) ([]byte, error)

ReadSegment 从文件读取段

func ReadSegmentFromBuffer added in v0.0.4

func ReadSegmentFromBuffer(buffer *bytes.Buffer, segmentType string, xref *FileXref) ([]byte, error)

ReadSegmentFromBuffer 从缓冲区读取段

func ReadSegmentToFile added in v0.0.4

func ReadSegmentToFile(file *os.File, segmentType string, xref *FileXref) ([]byte, error)

ReadSegment 从文件读取段

func RegisterEventProtocol added in v0.0.4

func RegisterEventProtocol(lc fx.Lifecycle, input RegisterEventProtocolInput) error

RegisterEvents 注册事件

func RegisterPubsubProtocol added in v0.0.4

func RegisterPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)

RegisterPubsubProtocol 注册订阅

func RegisterStreamProtocol added in v0.0.4

func RegisterStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)

RegisterStreamProtocol 注册流

func SaveAndClose added in v0.0.4

func SaveAndClose(file *os.File, xref *FileXref) error

SaveAndClose 保存 xref 表和关闭文件

func SendDataToPeer added in v0.0.4

func SendDataToPeer(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, protocol, topic, genre string, receiver peer.ID, data interface{}) error

SendDataToPeer 尝试先通过流发送数据,失败后通过订阅发送 protocol 协议 topic 主题 genre 类型 receiver 接收方ID data 内容

func SendDownloadInfo added in v0.0.4

func SendDownloadInfo(downloadChans chan *downloadChan, assetID, sliceHash string, totalPieces, index int)

SendDownloadInfo 向下载通道发送信息

func SendDownloadRequestContents added in v0.0.4

func SendDownloadRequestContents(pool *MemoryPool, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, payload *FileDownloadResponseChecklistPayload, receiver peer.ID)

SendDownloadRequestContents 发送文件下载请求(内容)

func SendFileSliceToNetwork added in v0.0.4

func SendFileSliceToNetwork(opt *Options, p2p *dep2p.DeP2P, uploadChan chan *uploadChan, registry *eventbus.EventRegistry, cache *ristretto.Cache, pool *MemoryPool, assetID, sliceHash string, totalPieces, current int) error

SendFileSliceToNetwork 发送文件片段至网络 assetID 文件资产的唯一标识(外部标识) sliceHash 文件片段的哈希值(外部标识) totalPieces 文件片段的总量 current 当前序列

func SendPubSub added in v0.0.4

func SendPubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, topic, genre string, receiver peer.ID, data interface{}) error

SendPubSub 向指定的节点发送订阅消息 topic 主题 genre 类型 receiver 接收方ID data 内容

func SendUploadInfo added in v0.0.4

func SendUploadInfo(uploadChans chan *uploadChan, assetID, sliceHash string, totalPieces, index int, peerIDs []string)

SendUploadInfo 向上传通道发送信息

func ToBytes added in v0.0.4

func ToBytes[T any](data T) ([]byte, error)

ToBytes 泛型函数,用于将不同类型的数据转换为 []byte

func WriteChunk added in v0.0.4

func WriteChunk(buffer *bytes.Buffer, chunkType string, data []byte, key []byte) error

WriteChunk 写入块到缓冲区

func WriteSegment added in v0.0.4

func WriteSegment(file *os.File, segmentType string, data []byte, xref *FileXref) error

WriteSegment 将段写入文件

func WriteSegmentToBuffer added in v0.0.4

func WriteSegmentToBuffer(buffer *bytes.Buffer, segmentType string, data []byte, xref *FileXref) error

WriteSegmentToBuffer 将段写入缓冲区

func WriteSegmentToFile added in v0.0.4

func WriteSegmentToFile(file *os.File, segmentType string, data []byte, xref *FileXref) error

WriteSegment 将段写入文件

func WriteSegmentsToBuffer added in v0.0.4

func WriteSegmentsToBuffer(buffer *bytes.Buffer, segments map[string][]byte, xref *FileXref) error

WriteSegmentsToBuffer 批量将段写入缓冲区

func WriteSegmentsToFile added in v0.0.4

func WriteSegmentsToFile(file *os.File, segments map[string][]byte, xref *FileXref) error

WriteSegmentsToFile 批量将段写入文件

Types

type BitSet added in v0.0.4

type BitSet struct {
	// contains filtered or unexported fields
}

BitSet 实现

func NewBitSet added in v0.0.4

func NewBitSet(size int) *BitSet

func (*BitSet) Clear added in v0.0.4

func (b *BitSet) Clear(i int)

func (*BitSet) IsSet added in v0.0.4

func (b *BitSet) IsSet(i int) bool

func (*BitSet) Set added in v0.0.4

func (b *BitSet) Set(i int)

type DownloadPieceInfo added in v0.0.4

type DownloadPieceInfo struct {
	Hash    string   // 文件片段的哈希值
	PeerID  []string // 该片段对应的节点ID
	RSCodes bool     // 是否为纠删码
}

DownloadPieceInfo 表示单个文件片段的信息和对应的节点ID

type DownloadTask added in v0.0.4

type DownloadTask struct {
	FileHash    string                     // 文件内容的哈希值(内部标识)
	Name        string                     // 文件的基本名称
	Size        int64                      // 常规文件的长度(以字节为单位)
	TotalPieces int                        // 文件总片数(数据片段和纠删码片段的总数)
	DataPieces  int                        // 数据片段的数量
	Progress    BitSet                     // 文件下载进度
	PieceInfo   map[int]*DownloadPieceInfo // 存储每个文件片段的哈希和对应节点ID
	Mu          sync.RWMutex               // 控制对Progress的并发访问
	Paused      bool                       // 是否暂停上传
}

DownloadTask 表示单个文件资产的下载状态

type FS

type FS struct {
	// contains filtered or unexported fields
}

FS提供了与DeFS交互所需的各种函数

func Open

func Open(opt *Options, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub) (*FS, error)

Open 返回一个新的文件存储对象

func (*FS) ContinueDownloading added in v0.0.4

func (fs *FS) ContinueDownloading(assetID string, fileHash ...string) error

TODO:ContinueDownloading 继续下载文件

func (*FS) DeleteDownloading added in v0.0.4

func (fs *FS) DeleteDownloading(assetID string, fileHash ...string) error

TODO:DeleteDownloading 删除下载文件

func (*FS) Download added in v0.0.4

func (fs *FS) Download(assetID string, fileHash ...string) error

Download 下载文件 暂停后重新下载,需要输入文件ID用于解密,不落盘 assetID string // 文件资产的唯一标识(外部标识) fileHash string // 文件内容的哈希值(内部标识)

func (*FS) GetDownloadChannel added in v0.0.4

func (fs *FS) GetDownloadChannel() chan *downloadChan

GetDownloadChannel 返回下载通道

func (*FS) GetUploadChannel added in v0.0.4

func (fs *FS) GetUploadChannel() chan *uploadChan

GetUploadChannel 返回上传通道

func (*FS) PauseDownloading added in v0.0.4

func (fs *FS) PauseDownloading(assetID string, fileHash ...string) error

TODO:PauseDownloading 停止下载文件

func (*FS) Upload added in v0.0.4

func (fs *FS) Upload(path string) (*struct {
	AssetID  string    // 文件资产的唯一标识(外部标识)
	FileHash string    // 文件内容的哈希值(内部标识)
	Name     string    // 文件的基本名称
	Size     int64     // 常规文件的长度(以字节为单位)
	ModTime  time.Time // 修改时间
	FileType string    // 文件类型或格式
}, error)

Upload 上传新文件 path: 文件路径

type FileDownloadRequestChecklistPayload added in v0.0.4

type FileDownloadRequestChecklistPayload struct {
	AssetID string // 文件资产的唯一标识(外部标识)
}

文件下载请求(清单)

type FileDownloadRequestContentPayload added in v0.0.4

type FileDownloadRequestContentPayload struct {
	AssetID   string // 文件资产的唯一标识(外部标识)
	SliceHash string // 待下载的切片哈希
	Index     int    // 文件片段的索引(该片段在文件中的顺序位置)
}

文件下载请求(内容)

type FileDownloadResponseChecklistPayload added in v0.0.4

type FileDownloadResponseChecklistPayload struct {
	AssetID         string            // 文件资产的唯一标识(外部标识)
	FileHash        string            // 文件内容的哈希值(内部标识)
	Name            string            // 文件的基本名称
	Size            int64             // 常规文件的长度(以字节为单位)
	SliceTable      map[int]HashTable // 文件片段的哈希表
	AvailableSlices map[int]string    // 本地存储的文件片段信息
}

文件下载响应(清单)

type FileDownloadResponseContentPayload added in v0.0.4

type FileDownloadResponseContentPayload struct {
	AssetID      string // 文件资产的唯一标识(外部标识)
	SliceHash    string // 下载的切片哈希
	Index        int    // 文件片段的索引(该片段在文件中的顺序位置)
	SliceContent []byte // 切片内容
}

文件下载响应(内容)

type FileInfo added in v0.0.4

type FileInfo struct {
	// contains filtered or unexported fields
}

FileInfo 描述一个文件

func (*FileInfo) AssetID added in v0.0.4

func (fi *FileInfo) AssetID() string

AssetID 资产的唯一标识

func (*FileInfo) DataShards added in v0.0.4

func (fi *FileInfo) DataShards() int64

DataShards 数据分片

func (*FileInfo) FileHash added in v0.0.4

func (fi *FileInfo) FileHash() string

FileHash 文件内容的哈希值

func (*FileInfo) FileType added in v0.0.4

func (fi *FileInfo) FileType() string

fileType 文件类型或格式

func (*FileInfo) ModTime added in v0.0.4

func (fi *FileInfo) ModTime() time.Time

ModTime 修改时间

func (*FileInfo) Name added in v0.0.4

func (fi *FileInfo) Name() string

Name 文件的基本名称

func (*FileInfo) ParityShards added in v0.0.4

func (fi *FileInfo) ParityShards() int64

ParityShards 奇偶分片

func (*FileInfo) PublicKey added in v0.0.4

func (fi *FileInfo) PublicKey() []byte

publicKey 文件所有者的公钥 func (fi *FileInfo) PublicKey() *rsa.PublicKey {

func (*FileInfo) Size added in v0.0.4

func (fi *FileInfo) Size() int64

Size 常规文件的长度(以字节为单位)

func (*FileInfo) SliceList added in v0.0.4

func (fi *FileInfo) SliceList() []SliceInfo

Slice 切片列表

func (*FileInfo) SliceTable added in v0.0.4

func (fi *FileInfo) SliceTable() map[int]HashTable

SliceTable 切片内容的哈希表

func (*FileInfo) UploadTime added in v0.0.4

func (fi *FileInfo) UploadTime() time.Time

UploadTime 上传时间

type FileUploadRequestCheckPayload added in v0.0.4

type FileUploadRequestCheckPayload struct {
	AssetID    string    // 文件资产的唯一标识(外部标识)
	UploadTime time.Time // 上传时间
}

文件上传请求(检查)

type FileXref added in v0.0.4

type FileXref struct {
	XrefTable map[string]XrefEntry // xref 表
	StartXref int64                // startxref 的位置
	// contains filtered or unexported fields
}

FileXref 结构体用于跟踪单个文件的 xref 表和 startxref 的位置

func LoadXref added in v0.0.4

func LoadXref(file *os.File) (*FileXref, error)

LoadXref 从文件加载 xref 表

func NewFileXref added in v0.0.4

func NewFileXref() *FileXref

NewFileXref 创建一个新的 FileXref 对象,并初始化 xref 表

type HashTable added in v0.0.4

type HashTable struct {
	Hash    string // 文件片段的哈希值
	RsCodes bool   // 是否为纠删码
}

type MemoryPool added in v0.0.4

type MemoryPool struct {
	UploadTasks   map[string]*UploadTask   // 上传任务池
	DownloadTasks map[string]*DownloadTask // 下载任务池
	Mu            sync.RWMutex             // 读写互斥锁
}

MemoryPool 定义了文件上传和下载的内存池

func (*MemoryPool) AddDownloadTask added in v0.0.4

func (pool *MemoryPool) AddDownloadTask(assetID string, fileHash ...string) error

AddDownloadTask 添加一个新的下载任务。如果任务已存在,返回错误。 可选参数 fileHash 用于指定文件内容的哈希值。

func (*MemoryPool) AddUploadTask added in v0.0.4

func (pool *MemoryPool) AddUploadTask(assetID string, totalPieces int) error

AddUploadTask 添加一个新的上传任务

func (*MemoryPool) DeleteDownloadTask added in v0.0.4

func (pool *MemoryPool) DeleteDownloadTask(assetID string)

DeleteDownloadTask 删除指定资产的下载任务

func (*MemoryPool) DeleteUploadTask added in v0.0.4

func (pool *MemoryPool) DeleteUploadTask(assetID string)

DeleteUploadTask 删除指定资产的上传任务

func (*MemoryPool) GetIncompleteDownloadPieces added in v0.0.4

func (pool *MemoryPool) GetIncompleteDownloadPieces(assetID string) []string

GetIncompleteDownloadPieces 获取未完成的下载片段的哈希值

func (*MemoryPool) GetIncompleteUploadPieces added in v0.0.4

func (pool *MemoryPool) GetIncompleteUploadPieces(assetID string) []string

GetIncompleteUploadPieces 获取未完成的上传片段

func (*MemoryPool) IsDownloadComplete added in v0.0.4

func (pool *MemoryPool) IsDownloadComplete(assetID string) bool

IsDownloadComplete 检查指定文件资产的下载是否完成

func (*MemoryPool) IsDownloadTaskPaused added in v0.0.4

func (pool *MemoryPool) IsDownloadTaskPaused(assetID string) (bool, error)

IsDownloadTaskPaused 检查指定的下载任务是否已暂停

func (*MemoryPool) IsUploadComplete added in v0.0.4

func (pool *MemoryPool) IsUploadComplete(assetID string) bool

IsUploadComplete 检查指定文件资产的上传是否完成

func (*MemoryPool) IsUploadTaskPaused added in v0.0.4

func (pool *MemoryPool) IsUploadTaskPaused(assetID string) (bool, error)

IsUploadTaskPaused 检查指定的上传任务是否已暂停

func (*MemoryPool) MarkDownloadPieceComplete added in v0.0.4

func (pool *MemoryPool) MarkDownloadPieceComplete(assetID string, pieceIndex int) bool

MarkDownloadPieceComplete 标记下载任务中的一个片段为完成,并返回是否所有片段都已下载

func (*MemoryPool) MarkDownloadPieceCompleteByHash added in v0.0.4

func (pool *MemoryPool) MarkDownloadPieceCompleteByHash(assetID, pieceHash string) bool

MarkDownloadPieceCompleteByHash 根据文件片段的哈希值标记下载任务中的一个片段为完成,并返回是否所有片段都已下载

func (*MemoryPool) MarkUploadPieceComplete added in v0.0.4

func (pool *MemoryPool) MarkUploadPieceComplete(assetID string, pieceIndex int) bool

MarkUploadPieceComplete 标记上传任务中的一个片段为完成,并返回是否所有片段都已上传

func (*MemoryPool) PauseDownloadTask added in v0.0.4

func (pool *MemoryPool) PauseDownloadTask(assetID string) error

PauseDownloadTask 暂停指定的下载任务

func (*MemoryPool) PauseUploadTask added in v0.0.4

func (pool *MemoryPool) PauseUploadTask(assetID string) error

PauseUploadTask 暂停指定的上传任务

func (*MemoryPool) ResetDownloadTask added in v0.0.4

func (pool *MemoryPool) ResetDownloadTask(assetID string) error

ResetDownloadTask 清除下载任务的所有进度

func (*MemoryPool) ResumeDownloadTask added in v0.0.4

func (pool *MemoryPool) ResumeDownloadTask(assetID string) error

ResumeDownloadTask 恢复指定的下载任务

func (*MemoryPool) ResumeUploadTask added in v0.0.4

func (pool *MemoryPool) ResumeUploadTask(assetID string) error

ResumeUploadTask 恢复指定的上传任务

func (*MemoryPool) RevertDownloadPieceProgress added in v0.0.4

func (pool *MemoryPool) RevertDownloadPieceProgress(assetID, pieceHash string) error

RevertDownloadPieceProgress 回退单个文件片段的下载进度

func (*MemoryPool) UpdateDownloadPieceInfo added in v0.0.4

func (pool *MemoryPool) UpdateDownloadPieceInfo(peerID string, assetID, name string, size int64, sliceTable map[int]HashTable, pieceHashes map[int]string, fileHash ...string)

UpdateDownloadPieceInfo 用于更新下载任务中特定片段的节点信息。 assetID 是文件资产的唯一标识。 sliceTable 是文件片段的哈希表,其中 key 是文件片段的序号,value 是文件片段的哈希。 peerID 是存储文件片段的节点ID。 fileHash 是文件内容的哈希值,如果提供则更新。

func (*MemoryPool) UpdateUploadPieceInfo added in v0.0.4

func (pool *MemoryPool) UpdateUploadPieceInfo(assetID string, pieceHash string, pieceInfo *UploadPieceInfo)

UpdateUploadPieceInfo 更新上传任务中特定片段的信息

type NewEventRegistryOutput added in v0.0.4

type NewEventRegistryOutput struct {
	fx.Out
	Registry *eventbus.EventRegistry // 事件总线
}

func NewEventRegistry added in v0.0.4

func NewEventRegistry(lc fx.Lifecycle) (out NewEventRegistryOutput, err error)

NewEventRegistry 新的事件总线

type NewMemoryPoolOutput added in v0.0.4

type NewMemoryPoolOutput struct {
	fx.Out
	Pool *MemoryPool // 文件上传内存池
}

func NewMemoryPool added in v0.0.4

func NewMemoryPool(lc fx.Lifecycle) (out NewMemoryPoolOutput, err error)

NewMemoryPool 初始化一个新的文件上传内存池

type NewRistrettoCacheOutput added in v0.0.4

type NewRistrettoCacheOutput struct {
	fx.Out
	Cache *ristretto.Cache // 缓存实例
}

func NewRistrettoCache added in v0.0.4

func NewRistrettoCache(lc fx.Lifecycle) (out NewRistrettoCacheOutput, err error)

NewRistrettoCache 新的缓存实例

type Options added in v0.0.4

type Options struct {
	// contains filtered or unexported fields
}

Options 是用于创建文件存储对象的参数

func DefaultOptions added in v0.0.4

func DefaultOptions() *Options

DefaultOptions 设置一个推荐选项列表以获得良好的性能。

func (*Options) BuildDownloadPath added in v0.0.4

func (opt *Options) BuildDownloadPath(path string)

BuildDownloadPath 设置下载路径

func (*Options) BuildLocalStorage added in v0.0.4

func (opt *Options) BuildLocalStorage(isEnable bool)

BuildLocalStorage 设置是否启动本地存储选项

func (*Options) BuildMaxSliceSize added in v0.0.4

func (opt *Options) BuildMaxSliceSize(maxSliceSize int64) error

BuildMaxSliceSize 设置最大切片的大小选项

func (*Options) BuildMinSliceSize added in v0.0.4

func (opt *Options) BuildMinSliceSize(minSliceSize int64) error

BuildMinSliceSize 设置最小切片的大小

func (*Options) BuildRootPath added in v0.0.4

func (opt *Options) BuildRootPath(path string)

BuildRootPath 设置文件根路径

func (*Options) BuildRoutingTableLow added in v0.0.4

func (opt *Options) BuildRoutingTableLow(low int64)

BuildRoutingTableLow 设置路由表中连接的最小节点数量

func (*Options) BuildShardsOptions added in v0.0.4

func (opt *Options) BuildShardsOptions(dataShards, parityShards int64) error

BuildShardsOptions 设置奇偶分片大小选项

func (*Options) BuildSizeAndRatioOptions added in v0.0.4

func (opt *Options) BuildSizeAndRatioOptions(shardSize int64, parityRatio float64) error

BuildSizeAndRatioOptions 设置奇偶分片比例选项 shardSize 以字节为单位

func (*Options) GetShardsOptions added in v0.0.4

func (opt *Options) GetShardsOptions() (int64, int64, bool)

GetShardsOptions 获取奇偶分片大小选项

func (*Options) GetSizeAndRatioOptions added in v0.0.4

func (opt *Options) GetSizeAndRatioOptions() (int64, float64, bool)

GetSizeAndRatioOptions 获取奇偶分片比例选项

type RegisterEventProtocolInput added in v0.0.4

type RegisterEventProtocolInput struct {
	fx.In
	Ctx          context.Context     // 全局上下文
	Opt          *Options            // 文件存储选项配置
	P2P          *dep2p.DeP2P        // DeP2P网络主机
	PubSub       *pubsub.DeP2PPubSub // DeP2P网络订阅
	DB           *sqlites.SqliteDB   // sqlite数据库服务
	UploadChan   chan *uploadChan    // 用于刷新上传的通道
	DownloadChan chan *downloadChan  // 用于刷新下载的通道

	Registry *eventbus.EventRegistry // 事件总线
	Cache    *ristretto.Cache        // 缓存实例
	Pool     *MemoryPool             // 内存池
}

type RegisterPubsubProtocolInput added in v0.0.4

type RegisterPubsubProtocolInput struct {
	fx.In
	Ctx          context.Context     // 全局上下文
	Opt          *Options            // 文件存储选项配置
	P2P          *dep2p.DeP2P        // DeP2P网络主机
	PubSub       *pubsub.DeP2PPubSub // DeP2P网络订阅
	DB           *sqlites.SqliteDB   // sqlite数据库服务
	UploadChan   chan *uploadChan    // 用于刷新上传的通道
	DownloadChan chan *downloadChan  // 用于刷新下载的通道

	Registry *eventbus.EventRegistry // 事件总线
	Cache    *ristretto.Cache        // 缓存实例
	Pool     *MemoryPool             // 内存池
}

type RegisterStreamProtocolInput added in v0.0.4

type RegisterStreamProtocolInput struct {
	fx.In
	Ctx          context.Context     // 全局上下文
	Opt          *Options            // 文件存储选项配置
	P2P          *dep2p.DeP2P        // DeP2P网络主机
	PubSub       *pubsub.DeP2PPubSub // DeP2P网络订阅
	DB           *sqlites.SqliteDB   // sqlite数据库服务
	UploadChan   chan *uploadChan    // 用于刷新上传的通道
	DownloadChan chan *downloadChan  // 用于刷新下载的通道

	Registry *eventbus.EventRegistry // 事件总线
	Cache    *ristretto.Cache        // 缓存实例
	Pool     *MemoryPool             // 内存池
}

type SliceInfo added in v0.0.4

type SliceInfo struct {
	// contains filtered or unexported fields
}

SliceInfo 描述了文件的一个切片信息

func (*SliceInfo) Index added in v0.0.4

func (sl *SliceInfo) Index() int

Index 文件片段的索引(该片段在文件中的顺序位置)

func (*SliceInfo) RSCodes added in v0.0.4

func (sl *SliceInfo) RSCodes() bool

RSCodes 文件片段是否为纠删码

func (*SliceInfo) Signature added in v0.0.4

func (sl *SliceInfo) Signature() []byte

Signature 文件和文件片段的数据签名

func (*SliceInfo) SliceHash added in v0.0.4

func (sl *SliceInfo) SliceHash() string

SliceHash 资产的唯一标识(外部标识)

type StorageMode added in v0.0.4

type StorageMode int

存储模式

const (
	FileMode      StorageMode = iota // 文件模式
	SliceMode                        // 切片模式,将文件分割成有限个切片
	RS_Size                          // 纠删码(大小)模式
	RS_Proportion                    // 纠删码(比例)模式
)

type StreamProtocol added in v0.0.4

type StreamProtocol struct {
	// contains filtered or unexported fields
}

流协议

func (*StreamProtocol) HandleFileDownloadResponseStream added in v0.0.4

func (sp *StreamProtocol) HandleFileDownloadResponseStream(req *streams.RequestMessage, res *streams.ResponseMessage) error

HandleFileDownloadResponseStream 处理文件下载响应的流消息

func (*StreamProtocol) HandleStreamFileSliceUploadStream added in v0.0.4

func (sp *StreamProtocol) HandleStreamFileSliceUploadStream(req *streams.RequestMessage, res *streams.ResponseMessage) error

HandleStreamFileSliceUploadStream 处理文件片段上传的流消息

type UploadPieceInfo added in v0.0.4

type UploadPieceInfo struct {
	Index  int      // 文件片段的序列号
	PeerID []string // 节点的host地址
}

UploadPieceInfo 表示单个文件片段的信息

type UploadTask added in v0.0.4

type UploadTask struct {
	TotalPieces int                         // 文件总片数
	Progress    BitSet                      // 文件上传进度
	PieceInfo   map[string]*UploadPieceInfo // 每个文件片段的详细信息
	Mu          sync.RWMutex                // 控制对Progress的并发访问
	RetryCounts map[int]int                 // 记录失败重试次数的映射
	Paused      bool                        // 是否暂停上传
}

UploadTask 表示单个文件资产的上传状态

type XrefEntry added in v0.0.4

type XrefEntry struct {
	Offset int64  // 偏移量
	Length uint32 // 长度
}

XrefEntry 结构体用于保存每个段的偏移量和长度

Directories

Path Synopsis
定义共享的基类和方法
定义共享的基类和方法

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL