uploadqueue

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 2, 2026 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Overview

Package uploadqueue provides an asynchronous upload queue for Storacha CAR uploads. Enqueuing a CAR is synchronous and fast (a local SQLite write); worker goroutines then upload the queued CARs to Storacha in the background.

Functions

func SetExtractBlockPositionsFn

func SetExtractBlockPositionsFn(fn func([]byte) (map[cid.Cid]blobindex.Position, error))

SetExtractBlockPositionsFn registers the function used to extract CAR block positions. It is called once, from the storacha package's init.
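The registration pattern described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `position`, `extractFn`, `setExtractFn`, `extract`, and `errNoExtractor` are hypothetical names, and plain string keys stand in for `cid.Cid`.

```go
package main

import (
	"errors"
	"fmt"
)

// position is a hypothetical stand-in for blobindex.Position.
type position struct {
	Offset, Length uint64
}

// extractFn is a package-level hook. It stays nil until something registers it.
var extractFn func([]byte) (map[string]position, error)

// errNoExtractor is returned when extract is called before registration.
var errNoExtractor = errors.New("no block position extractor registered")

// setExtractFn mirrors the shape of SetExtractBlockPositionsFn
// (string keys replace cid.Cid in this sketch).
func setExtractFn(fn func([]byte) (map[string]position, error)) {
	extractFn = fn
}

// extract calls the registered hook and fails clearly if none was set.
func extract(car []byte) (map[string]position, error) {
	if extractFn == nil {
		return nil, errNoExtractor
	}
	return extractFn(car)
}

func main() {
	if _, err := extract(nil); err != nil {
		fmt.Println("before registration:", err)
	}

	// A toy extractor that reports the whole input as one block.
	setExtractFn(func(car []byte) (map[string]position, error) {
		return map[string]position{"root": {Offset: 0, Length: uint64(len(car))}}, nil
	})

	pos, err := extract([]byte("abc"))
	fmt.Println(pos["root"].Length, err)
}
```

Registering through a setter lets one package supply the extractor without the other importing it directly; whether that is the motivation here is an assumption.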

Types

type Config

type Config struct {
	Store        QueueStore
	Uploader     StorachaUploader
	Parallelism  int           // default: 2
	PollInterval time.Duration // default: 1s
	MaxRetries   int           // default: 5
	Logger       *slog.Logger
}

Config holds configuration for the upload queue manager.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager runs the background workers that drain the async upload queue.

func New

func New(cfg Config) *Manager

New creates a new upload queue manager. Call Start to begin background processing.

func (*Manager) Notify

func (m *Manager) Notify()

Notify wakes a sleeping worker. Call it after enqueuing a new CAR. The notification channel intentionally has a buffer of size 1, so rapid consecutive enqueues coalesce into a single notification. A CAR whose notification was coalesced is picked up on the next poll cycle (after PollInterval) or by a worker that has just finished processing.
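The coalescing behavior can be illustrated with a buffered channel of size 1 and a non-blocking send. This is a sketch of the general Go idiom; the `notifier` type and its field names are hypothetical and not taken from the package.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the part of Manager that
// handles wake-ups.
type notifier struct {
	wake chan struct{}
}

func newNotifier() *notifier {
	// Buffer of 1: at most one wake-up can be pending at a time.
	return &notifier{wake: make(chan struct{}, 1)}
}

// notify never blocks. If a wake-up is already pending, the new one is
// dropped, which is what coalesces rapid consecutive calls.
func (n *notifier) notify() {
	select {
	case n.wake <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	for i := 0; i < 3; i++ {
		n.notify()
	}
	fmt.Println("pending wake-ups:", len(n.wake)) // prints "pending wake-ups: 1"
}
```

Because dropped notifications carry no data, a periodic poll is still needed to catch items whose wake-up was coalesced.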

func (*Manager) Start

func (m *Manager) Start(ctx context.Context)

Start launches the background upload workers.

func (*Manager) Stop

func (m *Manager) Stop()

Stop gracefully shuts down the background workers.

type QueueStore

type QueueStore interface {
	DequeuePendingCARs(ctx context.Context, limit int) ([]storage.PendingCAR, error)
	GetPendingBlobsForCAR(ctx context.Context, queueID int64) ([]storage.PendingBlob, error)
	MarkCARUploaded(ctx context.Context, id int64) error
	MarkCARFailed(ctx context.Context, id int64, errMsg string) error
	MarkBlobUploaded(ctx context.Context, id int64) error
	MarkBlobFailed(ctx context.Context, id int64, errMsg string) error
}

QueueStore is the storage interface required by the upload queue manager. *sqlite.LogStore implements this interface.

type StorachaUploader

type StorachaUploader interface {
	UploadFullStateCAR(ctx context.Context, carData []byte, rootCID cid.Cid, positions map[cid.Cid]blobindex.Position) (string, error)
	UploadFinalizedBlob(ctx context.Context, data []byte) (string, error)
}

StorachaUploader uploads full-state CARs and finalized blobs to Storacha.
