store

package
v0.23.2
Published: May 14, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

Module wires the default bun-backed ClaimStore and DeleteQueue implementations into the fx graph, and exposes the high-level storage.Files facade composed over them.

Each store constructor is registered twice through fx.As: once under the framework-internal interface (consumed by storage_resource and the worker), once under the minimal public interface (consumed by storage.NewFiles and any business code that takes the facade as a dependency). Both registrations resolve to the same underlying instance.

Functions

This section is empty.

Types

type ClaimStatus

type ClaimStatus string

ClaimStatus enumerates the lifecycle states of an UploadClaim row.

  • StatusPending: the claim's underlying object is not yet finalized (chunked uploads between init_upload and complete_upload). Business code MUST NOT consume a pending claim; ConsumeMany filters them out.
  • StatusUploaded: the object exists in the backend and the claim is eligible for business consumption (Files.OnCreate).
const (
	ClaimStatusPending  ClaimStatus = "pending"
	ClaimStatusUploaded ClaimStatus = "uploaded"
)

type ClaimStore

type ClaimStore interface {
	// ConsumeMany is the only method business code reaches through the
	// public storage.ClaimConsumer interface. Embedding keeps the two
	// surfaces in lock-step at compile time. Only claims with
	// Status='uploaded' are eligible; pending claims behave as if they
	// did not exist and surface ErrClaimNotFound.
	storage.ClaimConsumer

	// Create persists a new pending claim. Returns an error if a row with
	// the same Key already exists.
	Create(ctx context.Context, claim *UploadClaim) error

	// UpdateUploadID sets the upload_id field on an existing claim row.
	// Used by the upload init flow to attach a backend multipart session
	// ID after the claim row has been persisted (INSERT-first ordering).
	// Returns ErrClaimNotFound when no row matches id.
	UpdateUploadID(ctx context.Context, id, uploadID string) error

	// MarkUploaded flips claim.status from 'pending' to 'uploaded' inside
	// tx so the business layer can consume the claim. Used by the
	// complete_upload (and synchronous single-shot upload) paths once
	// the underlying object is finalized. Returns ErrClaimNotFound when
	// no row matches id.
	MarkUploaded(ctx context.Context, tx orm.DB, id string) error

	// Get returns the claim by ID, or ErrClaimNotFound.
	Get(ctx context.Context, id string) (*UploadClaim, error)

	// CountPendingByOwner returns the number of claims with
	// status='pending' owned by the given principal. Used by init_upload
	// to enforce the per-user in-flight session cap.
	CountPendingByOwner(ctx context.Context, owner string) (int, error)

	// GetByKey returns the claim by object key, or ErrClaimNotFound.
	GetByKey(ctx context.Context, key string) (*UploadClaim, error)

	// Consume deletes the claim row for key, executed inside the supplied
	// business transaction tx. Returns ErrClaimNotFound when no row
	// exists for key OR when the row is still pending. tx must be the
	// same orm.DB instance passed to RunInTX.
	Consume(ctx context.Context, tx orm.DB, key string) error

	// ScanExpired returns up to limit pending claims whose ExpiresAt is
	// before now. Uploaded claims are intentionally excluded: their
	// finalized objects are awaiting business consumption and must not be
	// reaped if Consume happens after the original TTL.
	ScanExpired(ctx context.Context, now timex.DateTime, limit int) ([]UploadClaim, error)

	// DeleteByID removes a single claim row, used after the upload abort
	// path has finished cleaning up the corresponding storage side-effects.
	// Non-transactional; safe to call outside a business transaction.
	DeleteByID(ctx context.Context, id string) error

	// DeleteByIDInTx removes a single claim row inside tx. Used by the
	// abort_upload flow so the part-row cascade and the claim-row delete
	// commit together.
	DeleteByIDInTx(ctx context.Context, tx orm.DB, id string) error

	// DeleteByIDs removes multiple claim rows inside the supplied
	// transaction. Used by the claim sweeper to atomically pair the
	// claim-row removal with the corresponding DeleteQueue.Schedule call,
	// guaranteeing the queue and the claim table commit together.
	DeleteByIDs(ctx context.Context, tx orm.DB, ids []string) error
}

ClaimStore persists upload claims. Implementations are expected to be safe for concurrent use. The interface deliberately splits transactional methods (taking an orm.DB tx parameter) from non-transactional ones (Create, Get*, ScanExpired, DeleteByID) used by the upload init flow and by the claim sweeper worker respectively.

Internal type: business code uses the minimal storage.ClaimConsumer interface (which ClaimStore satisfies via embedding) and the higher-level storage.Files facade. ClaimStore itself is consumed only by the init/abort flow (storage_resource) and the claim sweeper.

func NewClaimStore

func NewClaimStore(db orm.DB) ClaimStore

NewClaimStore returns the default ClaimStore implementation backed by the orm.DB abstraction. The concrete SQL dialect is determined by the underlying orm provider; this package depends only on orm.DB.

The returned value also satisfies the public storage.ClaimConsumer interface; the fx graph exposes both surfaces.

type DeleteQueue

type DeleteQueue interface {
	// Schedule is the only method business code reaches through the
	// public storage.DeleteScheduler interface. Embedding keeps the two
	// surfaces in lock-step at compile time.
	storage.DeleteScheduler

	// Enqueue INSERTs fully-formed rows inside tx. Used by the claim
	// sweeper to forward UploadID + claim_expired reason on a per-row
	// basis. items may be empty (no-op).
	Enqueue(ctx context.Context, tx orm.DB, items []PendingDelete) error

	// Lease atomically claims up to limit rows whose NextAttemptAt <= now,
	// pushing each claimed row's NextAttemptAt to now+leaseDuration.
	// Returned rows are the worker's responsibility until Done or Defer is
	// called or the lease expires. leaseDuration should comfortably exceed
	// expected per-item processing time (e.g. 5 minutes for object delete).
	Lease(ctx context.Context, now timex.DateTime, limit int, leaseDuration time.Duration) ([]PendingDelete, error)

	// Done removes the rows identified by ids in a single batch (DELETE).
	// ids may be empty (no-op).
	Done(ctx context.Context, ids []string) error

	// Defer atomically increments Attempts and sets NextAttemptAt = nextAt
	// for the row identified by id. The worker uses this on transient
	// failure with an exponential-backoff timestamp.
	Defer(ctx context.Context, id string, nextAt timex.DateTime) error
}

DeleteQueue is the durable queue backing background object deletion.

Lifecycle:

  1. The CRUD layer Schedules items inside the business transaction; the INSERT commits atomically with the business write.
  2. The delete worker Leases due rows in batches. Lease atomically pushes each leased row's NextAttemptAt into the future (visibility timeout) so concurrent workers (multi-instance deployments, retried jobs) cannot pick the same row.
  3. On successful object deletion the worker calls Done to remove the row. On transient failure the worker calls Defer with a backoff timestamp; on crash the lease silently expires and the row becomes visible to the next Lease.

Deployment notes:

  • Multi-instance: the visibility timeout protects against double-processing within a single tick, but multiple worker instances will still race for the same set of due rows on every tick. The default implementation uses SELECT ... FOR UPDATE SKIP LOCKED so each worker leases a disjoint slice without leader election; SQLite's single-writer model makes the locking degenerate harmlessly.

  • S3 incomplete multipart cleanup: the upload init flow occasionally leaves orphan multipart sessions on the backend if the database write following InitMultipart fails. The framework reaps these through the claim sweeper for happy-path failures, but operators should still configure an S3 lifecycle rule that aborts incomplete multipart uploads after N days as a defense-in-depth measure.

Internal type: business code uses the minimal storage.DeleteScheduler interface (which DeleteQueue satisfies via embedding). DeleteQueue is consumed directly only by the storage worker (Lease/Done/Defer) and by the claim sweeper (Enqueue, which retains UploadID for abort).

func NewDeleteQueue

func NewDeleteQueue(db orm.DB) DeleteQueue

NewDeleteQueue returns the default DeleteQueue implementation backed by the orm.DB abstraction. The concrete SQL dialect is determined by the underlying orm provider; this package depends only on orm.DB.

The Lease implementation issues SELECT ... FOR UPDATE SKIP LOCKED inside a transaction so multi-instance worker deployments can run without leader election: each worker leases a disjoint slice of due rows, and the visibility-timeout UPDATE pushes them out of sight for the lease window. For SQLite, which lacks row-level locking, the ORM transparently drops the FOR UPDATE clause; the database is single-writer, so there is no race to begin with. MySQL and PostgreSQL execute the lock as expected.

The returned value also satisfies the public storage.DeleteScheduler interface; the fx graph exposes both surfaces.

type PendingDelete

type PendingDelete struct {
	orm.BaseModel `json:"-" bun:"table:sys_storage_pending_delete,alias:spd"`

	ID            string               `json:"id"            bun:"id,pk"`
	Key           string               `json:"key"           bun:"object_key"`
	UploadID      string               `json:"uploadId"      bun:"upload_id"`
	Reason        storage.DeleteReason `json:"reason"        bun:"reason"`
	Attempts      int                  `json:"attempts"      bun:"attempts"`
	NextAttemptAt timex.DateTime       `json:"nextAttemptAt" bun:"next_attempt_at"`
	CreatedAt     timex.DateTime       `json:"createdAt"     bun:"created_at,skipupdate"`
}

PendingDelete is a queued instruction to delete a single object from the storage backend. Rows are inserted by the CRUD layer inside the same business transaction that dereferenced the file, so the queue inherits the atomicity of the business write. The delete worker drains the queue asynchronously with retry/backoff.

UploadID is non-empty only for rows scheduled by the claim sweeper for expired multipart claims; the worker will best-effort abort the dangling multipart session before deleting the object.

Internal type: business code never constructs PendingDelete values directly. The higher-level storage.Files facade and the public storage.DeleteScheduler interface accept (key, reason) pairs and the implementation builds these rows internally. The claim sweeper uses Enqueue to retain control over UploadID + Reason on a per-row basis.

func (*PendingDelete) IsMultipart

func (p *PendingDelete) IsMultipart() bool

IsMultipart reports whether this pending-delete row references a backend multipart session that must be aborted before the object can be deleted.

type UploadClaim

type UploadClaim struct {
	orm.BaseModel `json:"-" bun:"table:sys_storage_upload_claim,alias:suc"`

	ID               string         `json:"id"               bun:"id,pk"`
	CreatedAt        timex.DateTime `json:"createdAt"        bun:"created_at,skipupdate"`
	CreatedBy        string         `json:"createdBy"        bun:"created_by,skipupdate"`
	Key              string         `json:"key"              bun:"object_key"`
	UploadID         string         `json:"uploadId"         bun:"upload_id"`
	Size             int64          `json:"size"             bun:"size"`
	ContentType      string         `json:"contentType"      bun:"content_type"`
	OriginalFilename string         `json:"originalFilename" bun:"original_filename"`
	Status           ClaimStatus    `json:"status"           bun:"status"`
	Public           bool           `json:"public"           bun:"public"`
	PartSize         int64          `json:"partSize"         bun:"part_size"`
	PartCount        int            `json:"partCount"        bun:"part_count"`
	ExpiresAt        timex.DateTime `json:"expiresAt"        bun:"expires_at"`
}

UploadClaim is the in-flight bookkeeping row for an upload that has not yet been adopted by a business transaction. Claims are short-lived: inserted by upload / init_upload, transitioned to 'uploaded' by upload / complete_upload, then deleted by Consume (when business commits) or by ScanExpired + DeleteByID (when the TTL elapses).

The row carries enough context to abort multipart sessions and delete abandoned objects. It is NOT a long-term audit record.

Internal type: only the storage worker, upload flow, and the claim sweeper construct or consume UploadClaim values. Business code interacts with claims indirectly through storage.ClaimConsumer (and the higher-level storage.Files facade).

func (*UploadClaim) IsMultipart

func (c *UploadClaim) IsMultipart() bool

IsMultipart reports whether the claim represents a multipart upload session (UploadID is non-empty).

func (*UploadClaim) IsUploaded

func (c *UploadClaim) IsUploaded() bool

IsUploaded reports whether the claim's underlying object has been finalized in the backend and is eligible for business consumption.

type UploadPart

type UploadPart struct {
	orm.BaseModel `json:"-" bun:"table:sys_storage_upload_part,alias:sup"`

	ID         string         `json:"id"         bun:"id,pk"`
	ClaimID    string         `json:"claimId"    bun:"claim_id"`
	PartNumber int            `json:"partNumber" bun:"part_number"`
	ETag       string         `json:"eTag"       bun:"etag"`
	Size       int64          `json:"size"       bun:"size"`
	CreatedAt  timex.DateTime `json:"createdAt"  bun:"created_at,skipupdate"`
}

UploadPart records one chunk of an in-flight chunked upload. Rows are inserted by storage_resource's upload_part handler after the backend returns the part's ETag and are deleted by complete_upload / abort (or cascaded when the parent claim is deleted by the sweeper).

Internal type: business code never sees parts. The Files facade and public storage.ClaimConsumer interface only deal in finalized claims.

type UploadPartStore

type UploadPartStore interface {
	// Upsert inserts a new part row or overwrites the ETag / Size of an
	// existing (claim_id, part_number) row. Used by upload_part to make
	// part re-upload safe (the backend returns a fresh ETag and the
	// caller persists it without caring about previous attempts).
	Upsert(ctx context.Context, tx orm.DB, part *UploadPart) error

	// ListByClaim returns every part of the given claim, sorted ascending
	// by PartNumber. complete_upload uses the result to assemble the
	// CompletedPart list it hands to the backend.
	ListByClaim(ctx context.Context, claimID string) ([]UploadPart, error)

	// DeleteByClaim removes every part of the given claim inside tx.
	// Used by complete_upload (after the backend assembles the final
	// object) and abort_upload (cleanup). The schema also cascades on
	// claim deletion; this method is the explicit path for the success
	// case where the claim row remains in 'uploaded' state.
	DeleteByClaim(ctx context.Context, tx orm.DB, claimID string) error
}

UploadPartStore persists per-part bookkeeping for chunked uploads.

All transactional methods (Upsert, DeleteByClaim) take an orm.DB tx parameter — they only execute inside the storage_resource handlers, which already own the surrounding transaction. ListByClaim is non-transactional because complete_upload reads parts before opening its commit transaction.

Internal type: not part of the public storage surface.

func NewUploadPartStore

func NewUploadPartStore(db orm.DB) UploadPartStore

NewUploadPartStore returns the default UploadPartStore implementation backed by the orm.DB abstraction. The concrete SQL dialect is determined by the underlying orm provider; this package depends only on orm.DB.
