Documentation
¶
Index ¶
- Constants
- func GooseMigrations(dialect Dialect) []*goose.Migration
- func Null[T any](v *T) sql.Null[T]
- func NullString(s *string) sql.NullString
- func TransformSQL(sql string, dialect Dialect) string
- type Dialect
- type FileInfo
- type Option
- type Repo
- func (r *Repo) AddNodeToShard(ctx context.Context, shardID id.ShardID, nodeCID cid.Cid, spaceDID did.DID, ...) error
- func (r *Repo) AddShardToIndex(ctx context.Context, indexID id.IndexID, shardID id.ShardID) error
- func (r *Repo) AddSourceToSpace(ctx context.Context, spaceDID did.DID, sourceID id.SourceID) error
- func (r *Repo) CIDForFSEntry(ctx context.Context, fsEntryID id.FSEntryID) (cid.Cid, error)
- func (r *Repo) Checkpoint(ctx context.Context) error
- func (r *Repo) Close() error
- func (r *Repo) CompleteDAGScansForUpload(ctx context.Context, uploadID id.UploadID) ([]model.DAGScan, error)
- func (r *Repo) CreateDAGScan(ctx context.Context, fsEntryID id.FSEntryID, isDirectory bool, ...) (dagmodel.DAGScan, error)
- func (r *Repo) CreateDirectoryChildren(ctx context.Context, parent *scanmodel.Directory, children []scanmodel.FSEntry) error
- func (r *Repo) CreateIndex(ctx context.Context, uploadID id.UploadID) (*model.Index, error)
- func (r *Repo) CreateShard(ctx context.Context, uploadID id.UploadID, size uint64, ...) (*model.Shard, error)
- func (r *Repo) CreateSource(ctx context.Context, name string, path string, ...) (*sourcemodel.Source, error)
- func (r *Repo) DeleteFSEntry(ctx context.Context, spaceDID did.DID, fsEntryID id.FSEntryID) error
- func (r *Repo) DeleteNodes(ctx context.Context, spaceDID did.DID, nodeCIDs []cid.Cid) error
- func (r *Repo) DeleteShard(ctx context.Context, shardID id.ShardID) error
- func (r *Repo) DeleteSpace(ctx context.Context, spaceDID did.DID) error
- func (r *Repo) Dialect() Dialect
- func (r *Repo) DirectoryChildren(ctx context.Context, dir *scanmodel.Directory) ([]scanmodel.FSEntry, error)
- func (r *Repo) DirectoryLinks(ctx context.Context, dirScan *model.DirectoryDAGScan) ([]model.LinkParams, error)
- func (r *Repo) FilesToDAGScan(ctx context.Context, uploadID id.UploadID, count int) ([]FileInfo, error)
- func (r *Repo) FindNodeByCIDAndSpaceDID(ctx context.Context, c cid.Cid, spaceDID did.DID) (dagsmodel.Node, error)
- func (r *Repo) FindOrCreateDirectory(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, ...) (*scanmodel.Directory, bool, error)
- func (r *Repo) FindOrCreateFile(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, ...) (*scanmodel.File, bool, error)
- func (r *Repo) FindOrCreateRawNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, ...) (*model.RawNode, bool, error)
- func (r *Repo) FindOrCreateSpace(ctx context.Context, did did.DID, name string, ...) (*spacesmodel.Space, error)
- func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, ...) (*model.UnixFSNode, bool, error)
- func (r *Repo) FindOrCreateUploads(ctx context.Context, spaceDID did.DID, sourceIDs []id.SourceID) ([]*model.Upload, error)
- func (r *Repo) ForEachNode(ctx context.Context, shardID id.ShardID, ...) error
- func (r *Repo) GetFileByID(ctx context.Context, fileID id.FSEntryID) (*scanmodel.File, error)
- func (r *Repo) GetIndexByID(ctx context.Context, indexID id.IndexID) (*model.Index, error)
- func (r *Repo) GetShardByID(ctx context.Context, shardID id.ShardID) (*model.Shard, error)
- func (r *Repo) GetSourceByID(ctx context.Context, sourceID id.SourceID) (*sourcemodel.Source, error)
- func (r *Repo) GetSourceByName(ctx context.Context, name string) (*sourcemodel.Source, error)
- func (r *Repo) GetSpaceByDID(ctx context.Context, spaceDID did.DID) (*spacesmodel.Space, error)
- func (r *Repo) GetSpaceByName(ctx context.Context, name string) (*spacesmodel.Space, error)
- func (r *Repo) GetUploadByID(ctx context.Context, uploadID id.UploadID) (*model.Upload, error)
- func (r *Repo) HasIncompleteChildren(ctx context.Context, directoryScans *model.DirectoryDAGScan) (bool, error)
- func (r *Repo) IncompleteDAGScansForUpload(ctx context.Context, uploadID id.UploadID) ([]model.DAGScan, error)
- func (r *Repo) IndexesForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Index, error)
- func (r *Repo) LinksForCID(ctx context.Context, c cid.Cid, sd did.DID) ([]*model.Link, error)
- func (r *Repo) ListSpaceSources(ctx context.Context, spaceDID did.DID) ([]id.SourceID, error)
- func (r *Repo) ListSpaces(ctx context.Context) ([]*spacesmodel.Space, error)
- func (r *Repo) NodesByShard(ctx context.Context, shardID id.ShardID, startOffset uint64) ([]dagsmodel.Node, error)
- func (r *Repo) NodesNotInShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) ([]cid.Cid, error)
- func (r *Repo) RemoveSourceFromSpace(ctx context.Context, spaceDID did.DID, sourceID id.SourceID) error
- func (r *Repo) ShardedFiles(ctx context.Context, uploadID id.UploadID, count int) ([]FileInfo, error)
- func (r *Repo) ShardsForIndex(ctx context.Context, indexID id.IndexID) ([]*model.Shard, error)
- func (r *Repo) ShardsForUpload(ctx context.Context, uploadID id.UploadID) ([]*model.Shard, error)
- func (r *Repo) ShardsForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Shard, error)
- func (r *Repo) ShardsNotInIndexes(ctx context.Context, uploadID id.UploadID) ([]id.ShardID, error)
- func (r *Repo) StartPeriodicCheckpoint(ctx context.Context, interval time.Duration)
- func (r *Repo) StopPeriodicCheckpoint()
- func (r *Repo) TotalBytesToScan(ctx context.Context, uploadID id.UploadID) (uint64, error)
- func (r *Repo) UpdateDAGScan(ctx context.Context, dagScan model.DAGScan) error
- func (r *Repo) UpdateIndex(ctx context.Context, index *model.Index) error
- func (r *Repo) UpdateShard(ctx context.Context, shard *model.Shard) error
- func (r *Repo) UpdateSource(ctx context.Context, src *sourcemodel.Source) error
- func (r *Repo) UpdateUpload(ctx context.Context, upload *model.Upload) error
- type RowScanner
Constants ¶
const DefaultCheckpointInterval = 5 * time.Minute
DefaultCheckpointInterval is the default interval for automatic WAL checkpointing.
const DefaultPreparedStmtCacheSize = 128
Variables ¶
This section is empty.
Functions ¶
func GooseMigrations ¶ added in v0.5.1
GooseMigrations reads embedded .sql migration files, transforms the SQL for the given dialect, and returns goose Migration objects.
func NullString ¶
func NullString(s *string) sql.NullString
func TransformSQL ¶ added in v0.5.1
TransformSQL adapts SQL written in SQLite dialect for the target dialect. For SQLite, it returns the input unchanged. For Postgres:
- Removes PRAGMA statements
- Removes the STRICT keyword from CREATE TABLE
- Replaces BLOB with BYTEA
- Replaces INTEGER with BIGINT (SQLite INTEGER is 64-bit; Postgres INTEGER is 32-bit)
Types ¶
type Repo ¶
type Repo struct {
// contains filtered or unexported fields
}
func New ¶
New creates a new Repo instance with the given database connection. Defaults to SQLite dialect.
func NewWithDialect ¶ added in v0.5.1
NewWithDialect creates a new Repo instance with the given database connection and dialect.
func (*Repo) AddNodeToShard ¶
func (r *Repo) AddNodeToShard(ctx context.Context, shardID id.ShardID, nodeCID cid.Cid, spaceDID did.DID, uploadID id.UploadID, offset uint64, options ...blobs.AddNodeToShardOption) error
AddNodeToShard assigns an existing node_upload record to a shard. Note: the [offset] is NOT the absolute offset within the shard, but the offset into the *new* bytes in the shard CAR where the node data begins--in other words, the length of the length varint + the length of the CID bytes. The node's block will be indexed as appearing at `shard.size + offset`, running for `node.size` bytes, and then the shard size will be increased by `offset + node.size`.
func (*Repo) AddShardToIndex ¶ added in v0.4.0
func (*Repo) AddSourceToSpace ¶
AddSourceToSpace adds a source to a space in the repository.
func (*Repo) CIDForFSEntry ¶
func (*Repo) Checkpoint ¶ added in v0.4.0
Checkpoint forces a WAL checkpoint to transfer data from the write-ahead log to the main database file. This should be called periodically during long operations to prevent unbounded WAL growth. The RESTART mode runs until the WAL is fully checkpointed, so that the next write can start from the beginning of the WAL file. This is a no-op for PostgreSQL.
func (*Repo) CompleteDAGScansForUpload ¶
func (*Repo) CreateDAGScan ¶
func (*Repo) CreateDirectoryChildren ¶
func (r *Repo) CreateDirectoryChildren(ctx context.Context, parent *scanmodel.Directory, children []scanmodel.FSEntry) error
CreateDirectoryChildren links a directory to its children in the repository.
func (*Repo) CreateIndex ¶ added in v0.4.0
func (*Repo) CreateShard ¶
func (*Repo) CreateSource ¶
func (r *Repo) CreateSource(ctx context.Context, name string, path string, options ...sourcemodel.SourceOption) (*sourcemodel.Source, error)
CreateSource creates a new source in the repository with the given name, path, and options.
func (*Repo) DeleteFSEntry ¶
func (*Repo) DeleteNodes ¶
func (*Repo) DeleteShard ¶
func (*Repo) DeleteSpace ¶
DeleteSpace deletes a space from the repository.
func (*Repo) DirectoryChildren ¶
func (r *Repo) DirectoryChildren(ctx context.Context, dir *scanmodel.Directory) ([]scanmodel.FSEntry, error)
DirectoryChildren retrieves the children of a directory from the repository.
func (*Repo) DirectoryLinks ¶
func (r *Repo) DirectoryLinks(ctx context.Context, dirScan *model.DirectoryDAGScan) ([]model.LinkParams, error)
DirectoryLinks retrieves link parameters for a given directory scan.
func (*Repo) FilesToDAGScan ¶
func (*Repo) FindNodeByCIDAndSpaceDID ¶
func (*Repo) FindOrCreateDirectory ¶
func (r *Repo) FindOrCreateDirectory(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, checksum []byte, sourceID id.SourceID, spaceDID did.DID) (*scanmodel.Directory, bool, error)
FindOrCreateDirectory finds or creates a directory entry in the repository with the given parameters. If the directory already exists, it returns the existing directory and false. If the directory does not exist, it creates a new directory entry and returns it along with true.
func (*Repo) FindOrCreateFile ¶
func (r *Repo) FindOrCreateFile(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, size uint64, checksum []byte, sourceID id.SourceID, spaceDID did.DID) (*scanmodel.File, bool, error)
FindOrCreateFile finds or creates a file entry in the repository with the given parameters. If the file already exists, it returns the existing file and false. If the file does not exist, it creates a new file entry and returns it along with true.
func (*Repo) FindOrCreateRawNode ¶
func (r *Repo) FindOrCreateRawNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, path string, sourceID id.SourceID, offset uint64) (*model.RawNode, bool, error)
FindOrCreateRawNode finds or creates a raw node in the repository. If a node with the same CID, size, path, source ID, and offset already exists, it returns that node. If not, it creates a new raw node with the provided parameters.
func (*Repo) FindOrCreateSpace ¶
func (r *Repo) FindOrCreateSpace(ctx context.Context, did did.DID, name string, options ...spacesmodel.SpaceOption) (*spacesmodel.Space, error)
FindOrCreateSpace finds an existing space or creates a new one in the repository with the given name and options.
func (*Repo) FindOrCreateUnixFSNode ¶
func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, ufsdata []byte, linkParams []model.LinkParams) (*model.UnixFSNode, bool, error)
FindOrCreateUnixFSNode finds or creates a UnixFS node in the repository. If a node with the same CID, size, and ufsdata already exists, it returns that node. If not, it creates a new UnixFS node with the provided parameters.
func (*Repo) FindOrCreateUploads ¶
func (r *Repo) FindOrCreateUploads(ctx context.Context, spaceDID did.DID, sourceIDs []id.SourceID) ([]*model.Upload, error)
FindOrCreateUploads finds or creates uploads for the given space and source IDs.
func (*Repo) ForEachNode ¶
func (*Repo) GetFileByID ¶
GetFileByID retrieves a file by its unique ID from the repository.
func (*Repo) GetIndexByID ¶ added in v0.4.0
func (*Repo) GetShardByID ¶
func (*Repo) GetSourceByID ¶
func (r *Repo) GetSourceByID(ctx context.Context, sourceID id.SourceID) (*sourcemodel.Source, error)
GetSourceByID retrieves a source by its unique ID from the repository.
func (*Repo) GetSourceByName ¶
GetSourceByName retrieves a source by its name from the repository.
func (*Repo) GetSpaceByDID ¶
GetSpaceByDID retrieves a space by its unique DID from the repository.
func (*Repo) GetSpaceByName ¶
GetSpaceByName retrieves a space by its name from the repository.
func (*Repo) GetUploadByID ¶
GetUploadByID retrieves an upload by its unique ID from the repository.
func (*Repo) HasIncompleteChildren ¶
func (r *Repo) HasIncompleteChildren(ctx context.Context, directoryScans *model.DirectoryDAGScan) (bool, error)
HasIncompleteChildren returns whether the given directory scan has at least one child scan that is not completed.
func (*Repo) IncompleteDAGScansForUpload ¶
func (*Repo) IndexesForUploadByState ¶ added in v0.4.0
func (*Repo) LinksForCID ¶
func (*Repo) ListSpaceSources ¶
ListSpaceSources lists all sources associated with a given space DID.
func (*Repo) ListSpaces ¶
ListSpaces lists all spaces in the repository.
func (*Repo) NodesByShard ¶
func (*Repo) NodesNotInShards ¶ added in v0.4.0
func (r *Repo) NodesNotInShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) ([]cid.Cid, error)
NodesNotInShards returns CIDs of nodes that are not yet assigned to shards.
func (*Repo) RemoveSourceFromSpace ¶
func (r *Repo) RemoveSourceFromSpace(ctx context.Context, spaceDID did.DID, sourceID id.SourceID) error
RemoveSourceFromSpace removes a source from a space in the repository.
func (*Repo) ShardedFiles ¶
func (*Repo) ShardsForIndex ¶ added in v0.4.0
func (*Repo) ShardsForUpload ¶ added in v0.4.0
func (*Repo) ShardsForUploadByState ¶
func (*Repo) ShardsNotInIndexes ¶ added in v0.4.0
func (*Repo) StartPeriodicCheckpoint ¶ added in v0.4.0
StartPeriodicCheckpoint starts a background goroutine that periodically checkpoints the WAL to prevent unbounded growth during long operations. Call StopPeriodicCheckpoint to stop it, or it will be stopped when Close is called. This is a no-op for PostgreSQL, which manages WAL internally.
func (*Repo) StopPeriodicCheckpoint ¶ added in v0.4.0
func (r *Repo) StopPeriodicCheckpoint()
StopPeriodicCheckpoint stops the background checkpoint goroutine if running.
func (*Repo) TotalBytesToScan ¶
func (*Repo) UpdateDAGScan ¶
UpdateDAGScan updates a DAG scan in the repository.
func (*Repo) UpdateIndex ¶ added in v0.4.0
func (*Repo) UpdateShard ¶
UpdateShard updates a shard in the repository.
func (*Repo) UpdateSource ¶
UpdateSource updates the given source in the repository.
type RowScanner ¶
RowScanner can scan a row into a set of destinations. It should not be confused with sql.Scanner, which is used to scan a single value.