Documentation
¶
Overview ¶
Package archive moves aged data from hot storage (Supabase) to cold storage (R2, S3, B2, etc.) to stay within quota limits.
Index ¶
- func ColdKey(jobID, taskID string) string
- func TaskHTMLObjectPath(jobID, taskID string) string
- type ArchiveCandidate
- type ArchiveSource
- type Archiver
- type ColdStorageProvider
- type Config
- type JobMarkerFunc
- type S3Provider
- func (p *S3Provider) Delete(ctx context.Context, bucket, key string) error
- func (p *S3Provider) Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)
- func (p *S3Provider) Exists(ctx context.Context, bucket, key string) (bool, error)
- func (p *S3Provider) Ping(ctx context.Context, bucket string) error
- func (p *S3Provider) Provider() string
- func (p *S3Provider) Upload(ctx context.Context, bucket, key string, data io.Reader, opts UploadOptions) error
- type TaskHTMLSource
- type UploadOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ColdKey ¶
func ColdKey(jobID, taskID string) string
ColdKey returns the cold-storage object key for the given job and task.
func TaskHTMLObjectPath ¶
TaskHTMLObjectPath returns the canonical object path for a task HTML blob in both hot storage and cold storage.
Types ¶
type ArchiveCandidate ¶
type ArchiveCandidate struct {
TaskID string
JobID string
StorageBucket string
StoragePath string
SHA256 string
CompressedSizeBytes int64
ContentType string
ContentEncoding string
}
ArchiveCandidate represents a single item eligible for archival.
type ArchiveSource ¶
type ArchiveSource interface {
// Name returns a human-readable label, e.g. "task_html".
Name() string
// FindCandidates returns up to batchSize items eligible for archival.
FindCandidates(ctx context.Context, batchSize int) ([]ArchiveCandidate, error)
// OnArchived is called after the candidate has been safely persisted
// in cold storage and verified.
OnArchived(ctx context.Context, candidate ArchiveCandidate, provider, bucket, key string) error
}
ArchiveSource abstracts a category of data that can be archived. Each implementation knows how to find candidates and mark them done.
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
Archiver runs periodic sweeps to move data from hot to cold storage.
func NewArchiver ¶
func NewArchiver(provider ColdStorageProvider, storage storageDownloader, cfg Config, markJobsDone JobMarkerFunc, sources ...ArchiveSource) *Archiver
NewArchiver creates an archiver with the given provider, hot-storage client, and one or more archive sources.
type ColdStorageProvider ¶
type ColdStorageProvider interface {
// Ping verifies that the provider can reach the given bucket.
// Call once at startup to catch bad credentials/endpoints early.
Ping(ctx context.Context, bucket string) error
// Upload writes data to the given bucket/key.
Upload(ctx context.Context, bucket, key string, data io.Reader, opts UploadOptions) error
// Download retrieves an object by bucket/key.
Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)
// Exists returns true if the object exists and is readable.
Exists(ctx context.Context, bucket, key string) (bool, error)
// Provider returns the short name of the backend ("r2", "s3", "b2").
Provider() string
}
ColdStorageProvider abstracts an S3-compatible object store.
func ProviderFromEnv ¶
func ProviderFromEnv() (ColdStorageProvider, error)
ProviderFromEnv builds a ColdStorageProvider from ARCHIVE_* env vars. Returns (nil, nil) if ARCHIVE_PROVIDER is unset.
type Config ¶
type Config struct {
Provider string // "r2", "s3", "b2"
Bucket string // cold-storage bucket name
RetentionJobs int // keep this many recent jobs hot per domain/org
Interval time.Duration // time between archive sweeps
BatchSize int // candidates per sweep
Concurrency int // parallel upload workers
}
Config controls the archiver's runtime behaviour.
func ConfigFromEnv ¶
func ConfigFromEnv() *Config
ConfigFromEnv builds a Config from ARCHIVE_* environment variables. Returns nil if ARCHIVE_PROVIDER is unset (feature disabled).
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns sensible defaults, overridable via environment.
type JobMarkerFunc ¶
JobMarkerFunc marks fully-archived jobs. Called at the start of each sweep.
type S3Provider ¶
type S3Provider struct {
// contains filtered or unexported fields
}
S3Provider implements ColdStorageProvider for any S3-compatible service (Cloudflare R2, Backblaze B2, AWS S3, MinIO, etc.).
func NewS3Provider ¶
func NewS3Provider(endpoint, accessKeyID, secretAccessKey, region, providerName string) (*S3Provider, error)
NewS3Provider creates a provider from explicit credentials.
func (*S3Provider) Delete ¶
func (p *S3Provider) Delete(ctx context.Context, bucket, key string) error
func (*S3Provider) Download ¶
func (p *S3Provider) Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)
func (*S3Provider) Exists ¶
func (p *S3Provider) Exists(ctx context.Context, bucket, key string) (bool, error)
func (*S3Provider) Ping ¶
func (p *S3Provider) Ping(ctx context.Context, bucket string) error
func (*S3Provider) Provider ¶
func (p *S3Provider) Provider() string
func (*S3Provider) Upload ¶
func (p *S3Provider) Upload(ctx context.Context, bucket, key string, data io.Reader, opts UploadOptions) error
type TaskHTMLSource ¶
type TaskHTMLSource struct {
// contains filtered or unexported fields
}
TaskHTMLSource finds and marks task HTML blobs for archival.
func NewTaskHTMLSource ¶
func NewTaskHTMLSource(dbQueue archiveDB, retentionJobs int) *TaskHTMLSource
NewTaskHTMLSource creates an ArchiveSource backed by the given DB queue.
func (*TaskHTMLSource) FindCandidates ¶
func (s *TaskHTMLSource) FindCandidates(ctx context.Context, batchSize int) ([]ArchiveCandidate, error)
func (*TaskHTMLSource) Name ¶
func (s *TaskHTMLSource) Name() string
func (*TaskHTMLSource) OnArchived ¶
func (s *TaskHTMLSource) OnArchived(ctx context.Context, c ArchiveCandidate, provider, bucket, key string) error