sqlrepo

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: Apache-2.0, MIT Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultCheckpointInterval = 5 * time.Minute

DefaultCheckpointInterval is the default interval for automatic WAL checkpointing.

View Source
const DefaultPreparedStmtCacheSize = 128

Variables

This section is empty.

Functions

func GooseMigrations added in v0.5.1

func GooseMigrations(dialect Dialect) []*goose.Migration

GooseMigrations reads embedded .sql migration files, transforms the SQL for the given dialect, and returns goose Migration objects.

func Null

func Null[T any](v *T) sql.Null[T]

func NullString

func NullString(s *string) sql.NullString

func TransformSQL added in v0.5.1

func TransformSQL(sql string, dialect Dialect) string

TransformSQL adapts SQL written in SQLite dialect for the target dialect. For SQLite, it returns the input unchanged. For Postgres:

  • Removes PRAGMA statements
  • Removes the STRICT keyword from CREATE TABLE
  • Replaces BLOB with BYTEA
  • Replaces INTEGER with BIGINT (SQLite INTEGER is 64-bit; Postgres INTEGER is 32-bit)

Types

type Dialect added in v0.5.1

type Dialect int

Dialect represents the SQL dialect being used.

const (
	DialectSQLite Dialect = iota
	DialectPostgres
)

type FileInfo

type FileInfo struct {
	Path string
	Size uint64
}

type Option added in v0.4.0

type Option func(*Repo)

func WithEventBus added in v0.4.0

func WithEventBus(bus bus.Bus) Option

type Repo

type Repo struct {
	// contains filtered or unexported fields
}

func New

func New(db *sql.DB, opts ...Option) (*Repo, error)

New creates a new Repo instance with the given database connection. Defaults to SQLite dialect.

func NewWithDialect added in v0.5.1

func NewWithDialect(db *sql.DB, dialect Dialect, opts ...Option) (*Repo, error)

NewWithDialect creates a new Repo instance with the given database connection and dialect.

func (*Repo) AddNodeToShard

func (r *Repo) AddNodeToShard(ctx context.Context, shardID id.ShardID, nodeCID cid.Cid, spaceDID did.DID, uploadID id.UploadID, offset uint64, options ...blobs.AddNodeToShardOption) error

AddNodeToShard assigns an existing node_upload record to a shard. Note: the [offset] is NOT the absolute offset within the shard, but the offset into the *new* bytes in the shard CAR where the node data begins--in other words, the length of the length varint + the length of the CID bytes. The node's block will be indexed as appearing at `shard.size + offset`, running for `node.size` bytes, and then the shard size will be increased by `offset + node.size`.

func (*Repo) AddShardToIndex added in v0.4.0

func (r *Repo) AddShardToIndex(ctx context.Context, indexID id.IndexID, shardID id.ShardID) error

func (*Repo) AddSourceToSpace

func (r *Repo) AddSourceToSpace(ctx context.Context, spaceDID did.DID, sourceID id.SourceID) error

AddSourceToSpace adds a source to a space in the repository.

func (*Repo) CIDForFSEntry

func (r *Repo) CIDForFSEntry(ctx context.Context, fsEntryID id.FSEntryID) (cid.Cid, error)

func (*Repo) Checkpoint added in v0.4.0

func (r *Repo) Checkpoint(ctx context.Context) error

Checkpoint forces a WAL checkpoint to transfer data from the write-ahead log to the main database file. This should be called periodically during long operations to prevent unbounded WAL growth. The RESTART mode runs until the WAL is fully checkpointed, so that write can start from the beginning of the file. This is a no-op for PostgreSQL.

func (*Repo) Close

func (r *Repo) Close() error

func (*Repo) CompleteDAGScansForUpload

func (r *Repo) CompleteDAGScansForUpload(ctx context.Context, uploadID id.UploadID) ([]model.DAGScan, error)

func (*Repo) CreateDAGScan

func (r *Repo) CreateDAGScan(ctx context.Context, fsEntryID id.FSEntryID, isDirectory bool, uploadID id.UploadID, spaceDID did.DID) (dagmodel.DAGScan, error)

func (*Repo) CreateDirectoryChildren

func (r *Repo) CreateDirectoryChildren(ctx context.Context, parent *scanmodel.Directory, children []scanmodel.FSEntry) error

CreateDirectoryChildren links a directory to its children in the repository.

func (*Repo) CreateIndex added in v0.4.0

func (r *Repo) CreateIndex(ctx context.Context, uploadID id.UploadID) (*model.Index, error)

func (*Repo) CreateShard

func (r *Repo) CreateShard(ctx context.Context, uploadID id.UploadID, size uint64, digestState, pieceCidState []byte) (*model.Shard, error)

func (*Repo) CreateSource

func (r *Repo) CreateSource(ctx context.Context, name string, path string, options ...sourcemodel.SourceOption) (*sourcemodel.Source, error)

CreateSource creates a new source in the repository with the given name, path, and options.

func (*Repo) DeleteFSEntry

func (r *Repo) DeleteFSEntry(ctx context.Context, spaceDID did.DID, fsEntryID id.FSEntryID) error

func (*Repo) DeleteNodes

func (r *Repo) DeleteNodes(ctx context.Context, spaceDID did.DID, nodeCIDs []cid.Cid) error

func (*Repo) DeleteShard

func (r *Repo) DeleteShard(ctx context.Context, shardID id.ShardID) error

func (*Repo) DeleteSpace

func (r *Repo) DeleteSpace(ctx context.Context, spaceDID did.DID) error

DeleteSpace deletes a space from the repository.

func (*Repo) Dialect added in v0.5.1

func (r *Repo) Dialect() Dialect

Dialect returns the SQL dialect used by this Repo.

func (*Repo) DirectoryChildren

func (r *Repo) DirectoryChildren(ctx context.Context, dir *scanmodel.Directory) ([]scanmodel.FSEntry, error)

DirectoryChildren retrieves the children of a directory from the repository.

func (r *Repo) DirectoryLinks(ctx context.Context, dirScan *model.DirectoryDAGScan) ([]model.LinkParams, error)

DirectoryLinks retrieves link parameters for a given directory scan.

func (*Repo) FilesToDAGScan

func (r *Repo) FilesToDAGScan(ctx context.Context, uploadID id.UploadID, count int) ([]FileInfo, error)

func (*Repo) FindNodeByCIDAndSpaceDID

func (r *Repo) FindNodeByCIDAndSpaceDID(ctx context.Context, c cid.Cid, spaceDID did.DID) (dagsmodel.Node, error)

func (*Repo) FindOrCreateDirectory

func (r *Repo) FindOrCreateDirectory(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, checksum []byte, sourceID id.SourceID, spaceDID did.DID) (*scanmodel.Directory, bool, error)

FindOrCreateDirectory finds or creates a directory entry in the repository with the given parameters. If the directory already exists, it returns the existing directory and false. If the directory does not exist, it creates a new directory entry and returns it along with true.

func (*Repo) FindOrCreateFile

func (r *Repo) FindOrCreateFile(ctx context.Context, path string, lastModified time.Time, mode fs.FileMode, size uint64, checksum []byte, sourceID id.SourceID, spaceDID did.DID) (*scanmodel.File, bool, error)

FindOrCreateFile finds or creates a file entry in the repository with the given parameters. If the file already exists, it returns the existing file and false. If the file does not exist, it creates a new file entry and returns it along with true.

func (*Repo) FindOrCreateRawNode

func (r *Repo) FindOrCreateRawNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, path string, sourceID id.SourceID, offset uint64) (*model.RawNode, bool, error)

FindOrCreateRawNode finds or creates a raw node in the repository. If a node with the same CID, size, path, source ID, and offset already exists, it returns that node. If not, it creates a new raw node with the provided parameters.

func (*Repo) FindOrCreateSpace

func (r *Repo) FindOrCreateSpace(ctx context.Context, did did.DID, name string, options ...spacesmodel.SpaceOption) (*spacesmodel.Space, error)

FindOrCreateSpace finds an existing space or creates a new one in the repository with the given name and options.

func (*Repo) FindOrCreateUnixFSNode

func (r *Repo) FindOrCreateUnixFSNode(ctx context.Context, cid cid.Cid, size uint64, spaceDID did.DID, uploadID id.UploadID, ufsdata []byte, linkParams []model.LinkParams) (*model.UnixFSNode, bool, error)

FindOrCreateUnixFSNode finds or creates a UnixFS node in the repository. If a node with the same CID, size, and ufsdata already exists, it returns that node. If not, it creates a new UnixFS node with the provided parameters.

func (*Repo) FindOrCreateUploads

func (r *Repo) FindOrCreateUploads(ctx context.Context, spaceDID did.DID, sourceIDs []id.SourceID) ([]*model.Upload, error)

FindOrCreateUploads creates uploads for a given space and source IDs.

func (*Repo) ForEachNode

func (r *Repo) ForEachNode(ctx context.Context, shardID id.ShardID, yield func(node dagsmodel.Node, shardOffset uint64) error) error

func (*Repo) GetFileByID

func (r *Repo) GetFileByID(ctx context.Context, fileID id.FSEntryID) (*scanmodel.File, error)

GetFileByID retrieves a file by its unique ID from the repository.

func (*Repo) GetIndexByID added in v0.4.0

func (r *Repo) GetIndexByID(ctx context.Context, indexID id.IndexID) (*model.Index, error)

func (*Repo) GetShardByID

func (r *Repo) GetShardByID(ctx context.Context, shardID id.ShardID) (*model.Shard, error)

func (*Repo) GetSourceByID

func (r *Repo) GetSourceByID(ctx context.Context, sourceID id.SourceID) (*sourcemodel.Source, error)

GetSourceByID retrieves a source by its unique ID from the repository.

func (*Repo) GetSourceByName

func (r *Repo) GetSourceByName(ctx context.Context, name string) (*sourcemodel.Source, error)

GetSourceByName retrieves a source by its name from the repository.

func (*Repo) GetSpaceByDID

func (r *Repo) GetSpaceByDID(ctx context.Context, spaceDID did.DID) (*spacesmodel.Space, error)

GetSpaceByDID retrieves a space by its unique DID from the repository.

func (*Repo) GetSpaceByName

func (r *Repo) GetSpaceByName(ctx context.Context, name string) (*spacesmodel.Space, error)

GetSpaceByName retrieves a space by its name from the repository.

func (*Repo) GetUploadByID

func (r *Repo) GetUploadByID(ctx context.Context, uploadID id.UploadID) (*model.Upload, error)

GetUploadByID retrieves an upload by its unique ID from the repository.

func (*Repo) HasIncompleteChildren

func (r *Repo) HasIncompleteChildren(ctx context.Context, directoryScans *model.DirectoryDAGScan) (bool, error)

HasIncompleteChildren returns whether the given directory scan has at least one child scan that is not completed.

func (*Repo) IncompleteDAGScansForUpload

func (r *Repo) IncompleteDAGScansForUpload(ctx context.Context, uploadID id.UploadID) ([]model.DAGScan, error)

func (*Repo) IndexesForUploadByState added in v0.4.0

func (r *Repo) IndexesForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Index, error)

func (*Repo) LinksForCID

func (r *Repo) LinksForCID(ctx context.Context, c cid.Cid, sd did.DID) ([]*model.Link, error)

func (*Repo) ListSpaceSources

func (r *Repo) ListSpaceSources(ctx context.Context, spaceDID did.DID) ([]id.SourceID, error)

ListSpaceSources lists all sources associated with a given space DID.

func (*Repo) ListSpaces

func (r *Repo) ListSpaces(ctx context.Context) ([]*spacesmodel.Space, error)

ListSpaces lists all spaces in the repository.

func (*Repo) NodesByShard

func (r *Repo) NodesByShard(ctx context.Context, shardID id.ShardID, startOffset uint64) ([]dagsmodel.Node, error)

func (*Repo) NodesNotInShards added in v0.4.0

func (r *Repo) NodesNotInShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) ([]cid.Cid, error)

NodesNotInShards returns CIDs of nodes that are not yet assigned to shards.

func (*Repo) RemoveSourceFromSpace

func (r *Repo) RemoveSourceFromSpace(ctx context.Context, spaceDID did.DID, sourceID id.SourceID) error

RemoveSourceFromSpace removes a source from a space in the repository.

func (*Repo) ShardedFiles

func (r *Repo) ShardedFiles(ctx context.Context, uploadID id.UploadID, count int) ([]FileInfo, error)

func (*Repo) ShardsForIndex added in v0.4.0

func (r *Repo) ShardsForIndex(ctx context.Context, indexID id.IndexID) ([]*model.Shard, error)

func (*Repo) ShardsForUpload added in v0.4.0

func (r *Repo) ShardsForUpload(ctx context.Context, uploadID id.UploadID) ([]*model.Shard, error)

func (*Repo) ShardsForUploadByState

func (r *Repo) ShardsForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Shard, error)

func (*Repo) ShardsNotInIndexes added in v0.4.0

func (r *Repo) ShardsNotInIndexes(ctx context.Context, uploadID id.UploadID) ([]id.ShardID, error)

func (*Repo) StartPeriodicCheckpoint added in v0.4.0

func (r *Repo) StartPeriodicCheckpoint(ctx context.Context, interval time.Duration)

StartPeriodicCheckpoint starts a background goroutine that periodically checkpoints the WAL to prevent unbounded growth during long operations. Call StopPeriodicCheckpoint to stop it, or it will be stopped when Close is called. This is a no-op for PostgreSQL, which manages WAL internally.

func (*Repo) StopPeriodicCheckpoint added in v0.4.0

func (r *Repo) StopPeriodicCheckpoint()

StopPeriodicCheckpoint stops the background checkpoint goroutine if running.

func (*Repo) TotalBytesToScan

func (r *Repo) TotalBytesToScan(ctx context.Context, uploadID id.UploadID) (uint64, error)

func (*Repo) UpdateDAGScan

func (r *Repo) UpdateDAGScan(ctx context.Context, dagScan model.DAGScan) error

UpdateDAGScan updates a DAG scan in the repository.

func (*Repo) UpdateIndex added in v0.4.0

func (r *Repo) UpdateIndex(ctx context.Context, index *model.Index) error

func (*Repo) UpdateShard

func (r *Repo) UpdateShard(ctx context.Context, shard *model.Shard) error

UpdateShard updates a DAG scan in the repository.

func (*Repo) UpdateSource

func (r *Repo) UpdateSource(ctx context.Context, src *sourcemodel.Source) error

UpdateSource updates the given source in the repository.

func (*Repo) UpdateUpload

func (r *Repo) UpdateUpload(ctx context.Context, upload *model.Upload) error

UpdateUpload implements uploads.Repo.

type RowScanner

type RowScanner interface {
	Scan(dest ...any) error
}

RowScanner can scan a row into a set of destinations. It should not be confused with sql.Scanner, which is used to scan a single value.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL